InfluxDB 2.x - Ergebnisses per join zusammenführen

Mit ** gekennzeichnete Links auf dieser Seite sind Affiliatelinks.

InfluxDB 2.x - Ergebnisses per join zusammenführen
InfluxDB 2.x - Ergebnisses per join zusammenführen
  • 24.08.2023
  • Visualisierung
  • Datenbanken

In einem ersten Video bin ich auf die InfluxDB 2.x Grundlagen eingegangen. Dann folge ein Praixs-Video zu meinem PV-Dashboard. Diese beiden Videos haben jede Menge Mails und Anfragen mit Sonderwünschen generiert. Auf einen davon möchte ich hier näher eingehen. Die Anforderunge war, dass aus dem Zählerstand ein Line-Graph gebaut wird, welcher jeden Tag bei 0 anfängt und dann ansteigt.

Was Du benötigst?

  • Eine bestehende InfluxDB 2.x Installation
  • Das Grundwissen aus den vorigen Videos

Video

Hausbau-Kurs

Statements

Die Grundlage war ein Statement, welches die Entwicklung des Zählerstandes zeigt. Das Problem dabei ist, dass der erste Wert im Ergebnis natürlich sehr hoch ist (je nachdem wieviel Energie schon generiert wurde).

import "timezone"
import "date"

option location = timezone.location(name: "Europe/Berlin")

stopDate = date.truncate(t: today(), unit: 1d)
startDate = date.sub(from: stopDate, d: 1d)

from(bucket: "smarthome")
  |> range(start: startDate, stop: stopDate)
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "generatedWh")
  |> aggregateWindow(every: 5m, fn: first, createEmpty: false, timeSrc: "_start")

Das sieht dann so aus:

Entwicklung Zählerstand

»Entwicklung Zählerstand«

Was ist nun aber, wenn der Graph bei 0 beginnen soll? Dann müssten wir ja den ersten Wert des Ergebnisses als Referenz nehmen und diesen von jedem weiteren Wert abziehen. Logischerweise ist der erste Wert im Ergebnis immer der kleinste - der Zähler zählt ja nicht rückwärts.

Dabei hilft die Funktion findRecord. Ich hole mir also aus dem gleichen Zeitraum den ersten Wert des Ergebnisses und speichere diese Ergebniszeile in einer Variablen namens firstRow (wie die heißt ist natürlich total egal). Jedenfalls kann dann mit einer Map-Funktion darauf zugegriffen werden und von jedem Wert des Ergebnisses dieser Wert abgezogen werden.

import "timezone"
import "date"

option location = timezone.location(name: "Europe/Berlin")

stopDate = date.truncate(t: today(), unit: 1d)
startDate = date.sub(from: stopDate, d: 1d)

firstRow = from(bucket: "smarthome")
  |> range(start: startDate, stop: stopDate)
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "generatedWh")
  |> first()
  |> findRecord(
    fn: (key) => key._field == "generatedWh",
    idx: 0,
  )

from(bucket: "smarthome")
  |> range(start: startDate, stop: stopDate)
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "generatedWh")
  |> aggregateWindow(every: 5m, fn: last, createEmpty: false, timeSrc: "_start")
  |> map(fn: (r) => ({r with _value: r._value - firstRow._value}))

Entwicklung Zählerstand - Start bei 0

»Entwicklung Zählerstand - Start bei 0«

Das klappt also wunderbar, solange man sich nur innerhalb eines Tages befindet. Kommen nun mehrere Tage in das Ergebnis, bildet sich eine Treppe (weil ja jeder Wert als Referenz die erste Zeile des Ergebnisses nutzt). Logisch soweit, oder?

Entwicklung Zählerstand über 7 Tage - Start bei 0

»Entwicklung Zählerstand über 7 Tage - Start bei 0«

Was müsste man also tun? Im ersten Schritt müsste für jeden Tag im Zeitraum der kleinste (bzw. erste) Wert ermittelt werden. Das geht ja relativ einfach:

dayStartValues = from(bucket: "smarthome")
  |> range(start: startDate, stop: stopDate)
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "generatedWh")
  |> aggregateWindow(every: 1d, fn: first, createEmpty: false, timeSrc: "_start")

Nun bekommen wir genau so viele Ergebnisse zurück, wie im angefragten Zeitraum Tage enthalten sind. Nur wie verknüpfen wir das mit unserem Ergebnis? Jetzt müsste man ja praktisch aus jeder Zeile des Ergebnisses eine andere Zeile suchen, wo der Tag übereinstimmt.

Startwerte für jeden Tag im Ergebnis

»Startwerte für jeden Tag im Ergebnis«

Mein erster Versuch sah so aus:

import "timezone"
import "date"

option location = timezone.location(name: "Europe/Berlin")

stopDate = date.truncate(t: today(), unit: 1d)
startDate = date.sub(from: stopDate, d: 7d)

dayStartValues = from(bucket: "smarthome")
  |> range(start: startDate, stop: stopDate)
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "generatedWh")
  |> aggregateWindow(every: 1d, fn: first, createEmpty: false, timeSrc: "_start")

getStartValue = (time) => (dayStartValues |> findRecord(
    fn: (key) => key._time == time,
    idx: 0
  ))._value

from(bucket: "smarthome")
  |> range(start: startDate, stop: stopDate)
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "generatedWh")
  |> aggregateWindow(every: 1h, fn: last, createEmpty: false, timeSrc: "_start")
  |> map(fn: (r) => ({r with dayStart: date.truncate(t: r._time, unit: 1d) }))
  |> map(fn: (r) => ({r with _value: r._value - getStartValue(time: r.dayStart) }))

Das funktioniert zwar, ist aber extrem langsam. Hier wird eine Funktion definiert, welche aus dem anderen Stream für jede einzelne Zeile ein Ergebnis sucht, wo die Zeit übereinstimmt. Dafür brauchte ich natürlich erstmal den Tagesanfang in jeder Zeile, welcher mit einer map-Funktion hinzugefügt wurde. Das könnte man auch deutlich verkürzen und das date.truncate mit in die Funktion dayStartValues packen oder in den Aufruf. Aber: Das ist extrem langsam. Erst nach gut 100 Sekunden (also fast 2 Minuten) hatte ich die Werte. Das ist also in der Praxis völlig unbrauchbar.

Besser wäre es, die beiden Ergebnisse mit einem inner join zu verbinden (wir können ja sicherstellen, dass es zu jedem Datum eine Zeile im anderen Ergebnis gibt):

import "timezone"
import "date"
import "join"

option location = timezone.location(name: "Europe/Berlin")

stopDate = date.truncate(t: today(), unit: 1d)
startDate = date.sub(from: stopDate, d: 7d)

dayStartValues = from(bucket: "smarthome")
  |> range(start: startDate, stop: stopDate)
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "generatedWh")
  |> aggregateWindow(every: 1d, fn: first, createEmpty: false, timeSrc: "_start")

graphValues = from(bucket: "smarthome")
  |> range(start: startDate, stop: stopDate)
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "generatedWh")
  |> aggregateWindow(every: 15m, fn: last, createEmpty: false, timeSrc: "_start")
  |> map(fn: (r) => ({r with dayStart: date.truncate(t: r._time, unit: 1d) }))

join.inner(
    left: graphValues,
    right: dayStartValues,
    on: (l, r) => l.dayStart == r._time,
    as: (l, r) => ({l with dayStartValue: r._value}),
)
|> map(fn: (r) => ({r with _value: r._value - r.dayStartValue }))
|> drop(columns: ["dayStartValue"])

Entwicklung Zählerstand - Jeden Tag bei 0 starten

»Entwicklung Zählerstand - Jeden Tag bei 0 starten«

Zack - das ist genau das richtige Ergebnis! Jetzt habe ich als Zwischenergebnis in jeder Zeile die neue Spalte dayStartValue zur Verfügung. Das ist die _value-Spalte aus dayStartValues. Diese musste einen neuen Namen bekommen, da es sonst ja zwei Spalten mit dem gleichen Namen gäbe.

Und der Rest war einfach: Mit einer Map-Funktion wieder eine Subtration anstoßen. Fertig. So einfach kann man zwei Ergebnisse zusammenführen und damit weiter arbeiten. Flux gefällt mir immer besser! Schade, die Sprache in InfluxDB 3.x wieder raus fliegen wird. Aber dazu vllt. bald ein weiteres Video.

Mich ärgert etwas, dass ich da nicht direkt drauf gekommen bin. Immerhin gehören inner, left, right, left-inner joins usw. zum Alltag bei der Arbeit mit Datenbanken. Und SQL nutze ich z.B. schon über 20 Jahre.

Du willst mehr?

Smart-Home-Trainings von A-Z

Steig' noch tiefer in die Themen ein und meistere Deine Projekte! Über 15.000 Teilnehmer konnten sich schon von der Qualität der Online-Kurse überzeugen.