InfluxDB 2.x - Dashboard für eine PV-Anlage

Mit ** gekennzeichnete Links auf dieser Seite sind Affiliatelinks.

InfluxDB 2.x - Dashboard für eine PV-Anlage
InfluxDB 2.x - Dashboard für eine PV-Anlage
  • 11.05.2023
  • Visualisierung
  • Datenbanken

In einem ersten Video bin ich auf die InfluxDB 2.x Grundlagen eingegangen. Dort habe ich alle nötigen Begriffe und das generelle Vorgehen erklärt, sodass wird nun gemeinsam in die Praxis gehen können. Ich möchte ein Dashboard in Grafana erstellen, welches mir viele Kennzahlen liefert. Warum das Ganze? Die App von SolarEdge ist zwar ganz nett, aber viele Werte stehen mir dort einfach nicht zur Verfügung. So fehlen zum Beispiel alle finanziellen Aspekte (beispielsweise gesparte Kosten durch selbst genutzte PV-Energie). Und spätestens wenn dann noch ein Batteriespeicher dazu kommt (welcher nichts mit SolarEdge zu tun hat), wird es richtig undurchsichtig in der App.

Was Du benötigst?

  • Eine bestehende InfluxDB 2.x Installation (Im Video wird 2.6.1 verwendet)
  • Die Daten Deiner PV-Anlage im ioBroker (z.B. via Modbus angebunden)

Mein Standard-Bucket heißt smarthome und für die Langzeitspeicherung (Downsampling) heißt das Bucket smarthome-history. Schau am besten das komplette Video einmal an.

Benötigte Werte

  • Solarproduktion [Wh] - generatedWh
  • Einspeisung [Wh] - exportedWh
  • Zukauf [Wh] - importedWh

Video

Hausbau-Kurs

Werte errechnen

  • Eigenverbrauch [Wh] = Solarproduktion [Wh] - Einspeisung [Wh]
  • Eigenverbrauch [%] = (Eigenverbrauch [Wh] / Solarproduktion [Wh]) * 100
  • Hausverbrauch [Wh] = Zukauf [Wh] + Eigenverbrauch [Wh]
  • Autarkie [%] = (Eigenverbrauch [Wh] / Hausverbrauch [Wh]) * 100
  • Autarkie [%] = ((Solarproduktion [Wh] - Einspeisung [Wh]) / (Zukauf [Wh] + Solarproduktion [Wh] - Einspeisung [Wh])) * 100

Daten in InfluxDB schreiben

Um die Daten zu schreiben, wird ein HTTP-Post mit Daten im Line-Protocol zusammengebaut. Dafür habe ich das folgende Script erstellt. Du musst nur die Object-IDs anpassen und das Token für die InfluxDB (dieses kann aus Sicherheitsgründen nicht direkt aus den Instanzeinstellungen ausgelesen werden).

// v0.3
const influxDbInstance = 'influxdb.0';
const token = 'yyy-xxx==';
const measurement = 'energy-stats';

const loggingTemplate = {
    '0_userdata.0.energy.contract.electricity.kWhPrice': 'priceIn',
    '0_userdata.0.energy.contract.electricity.kWhPriceExport': 'priceOut',
    'alias.0.energy.electricity.meter.totalIn': 'importedWh',
    'alias.0.energy.electricity.meter.totalOut': 'exportedWh',
    'alias.0.energy.electricity.photovoltaic.total': 'generatedWh',
    'alias.0.energy.electricity.wallbox.total': 'wallboxWh',
};

const loggingObj = {};

async function start() {
    const influxDbInstanceConfig = await getObjectAsync(`system.adapter.${influxDbInstance}`);

    const protocol = influxDbInstanceConfig.native.protocol;
    const host = influxDbInstanceConfig.native.host;
    const port = influxDbInstanceConfig.native.port;
    const org = influxDbInstanceConfig.native.organization;
    const bucket = influxDbInstanceConfig.native.dbname;

    console.log(`Starting "${measurement}" logging to ${protocol}://${host}:${port} into bucket "${bucket}" by org ${org}`);

    // Init loggingObj with current values
    for (let [objId, key] of Object.entries(loggingTemplate)) {
        const state = await getStateAsync(objId);
        if (state && !isNaN(state.val)) {
            loggingObj[key] = state.val;
        } else {
            loggingObj[key] = 0;
        }
    }

    on({ id: Object.keys(loggingTemplate), change: 'ne' }, async (obj) => {
        // Update value in loggingObj
        const key = loggingTemplate[obj.id];
        loggingObj[key] = obj.state.val;

        // Save Data
        const data = `${measurement} ${Object.keys(loggingObj)
            .filter(key => !isNaN(loggingObj[key]))
            .map((key) => `${key}=${loggingObj[key]}`)
            .join(',')}`;

        if (data) {
            // console.log(`Saving "${data}" to InfluxDB @ ${protocol}://${host}:${port}/`);

            httpPostAsync(`${protocol}://${host}:${port}/api/v2/write?bucket=${bucket}&org=${org}`, data, {
                headers: {
                    'Content-Type': 'text/plain',
                    'Authorization': `Token ${token}`
                }
            }).catch(err => console.error(err));
        }
    });
}
start();

Alte Version des Scripts (ohne httpPost-Funktion über axios)

// v0.1
const axios = require('axios').default;

const influxDbInstance = 'influxdb.0';
const token = 'yyy-xxx==';
const measurement = 'energy-stats';

const loggingTemplate = {
    '0_userdata.0.energy.contract.electricity.kWhPrice': 'priceIn',
    '0_userdata.0.energy.contract.electricity.kWhPriceExport': 'priceOut',
    'alias.0.energy.electricity.meter.totalIn': 'importedWh',
    'alias.0.energy.electricity.meter.totalOut': 'exportedWh',
    'alias.0.energy.electricity.photovoltaic.total': 'generatedWh',
    'alias.0.energy.electricity.wallbox.total': 'wallboxWh',
};

const loggingObj = {};

async function start() {
    const influxDbInstanceConfig = await getObjectAsync(`system.adapter.${influxDbInstance}`);

    const protocol = influxDbInstanceConfig.native.protocol;
    const host = influxDbInstanceConfig.native.host;
    const port = influxDbInstanceConfig.native.port;
    const org = influxDbInstanceConfig.native.organization;
    const bucket = influxDbInstanceConfig.native.dbname;

    console.log(`Starting "${measurement}" logging to ${protocol}://${host}:${port} into bucket "${bucket}" by org ${org}`);

    // Init loggingObj with current values
    for (let [objId, key] of Object.entries(loggingTemplate)) {
        const state = await getStateAsync(objId);
        if (state && !isNaN(state.val)) {
            loggingObj[key] = state.val;
        } else {
            loggingObj[key] = 0;
        }
    }

    on({ id: Object.keys(loggingTemplate), change: 'ne' }, async (obj) => {
        // Update value in loggingObj
        const key = loggingTemplate[obj.id];
        loggingObj[key] = obj.state.val;

        // Save Data
        const data = `${measurement} ${Object.keys(loggingObj)
            .filter(key => !isNaN(loggingObj[key]))
            .map((key) => `${key}=${loggingObj[key]}`)
            .join(',')}`;

        if (data) {
            // console.log(`Saving "${data}" to InfluxDB @ ${protocol}://${host}:${port}/`);

            axios.post(`${protocol}://${host}:${port}/api/v2/write?bucket=${bucket}&org=${org}`, data, {
                headers: {
                    'Content-Type': 'text/plain',
                    'Authorization': `Token ${token}`
                }
            }).catch(err => {
                console.error(err);
            });
        }
    });
}
start();

Leistungsdaten

Hausverbrauch [W]

from(bucket: "smarthome")
  |> range(start: v.timeRangeStart)
  |> filter(fn: (r) => r._measurement == "power-stats")
  |> filter(fn: (r) => r._field == "meterOutW" or r._field == "generatorW" or r._field == "meterInW")
  |> last()
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => ({ _value: r.generatorW - r.meterOutW + r.meterInW, _time: r._time, _field: "Hausverbrauch" }))

Wallbox [W]

from(bucket: "smarthome")
  |> range(start: v.timeRangeStart)
  |> filter(fn: (r) => r._measurement == "power-stats")
  |> filter(fn: (r) => r._field == "wallboxW")
  |> last()

Solarproduktion [W]

from(bucket: "smarthome")
  |> range(start: v.timeRangeStart)
  |> filter(fn: (r) => r._measurement == "power-stats")
  |> filter(fn: (r) => r._field == "generatorW")
  |> last()

Einspeisung [W]

from(bucket: "smarthome")
  |> range(start: v.timeRangeStart)
  |> filter(fn: (r) => r._measurement == "power-stats")
  |> filter(fn: (r) => r._field == "meterOutW")
  |> last()

Zukauf [W]

from(bucket: "smarthome")
  |> range(start: v.timeRangeStart)
  |> filter(fn: (r) => r._measurement == "power-stats")
  |> filter(fn: (r) => r._field == "meterInW")
  |> last()

Energiedaten

Hausverbrauch [Wh]

from(bucket: "smarthome")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "exportedWh" or r._field == "generatedWh" or r._field == "importedWh")
  |> difference()
  |> sum()
  |> pivot(rowKey: ["_start"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => ({r with _value: r.generatedWh - r.exportedWh + r.importedWh}))
  |> keep(columns: ["_value"])

oder

from(bucket: "smarthome")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "exportedWh" or r._field == "generatedWh" or r._field == "importedWh")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> difference(columns: ["exportedWh", "generatedWh", "importedWh"])
  |> map(fn: (r) => ({r with _value: r.generatedWh - r.exportedWh + r.importedWh}))
  |> sum()

Und aus den “historischen Daten” aus dem zweiten Bucket (Tasks erforderlich - siehe unten):

from(bucket: "smarthome-history")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "houseWh")
  |> sum()

Wallbox [Wh]

from(bucket: "smarthome")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "wallboxWh")
  |> difference()
  |> sum()

Und aus den “historischen Daten” aus dem zweiten Bucket (Tasks erforderlich - siehe unten):

from(bucket: "smarthome-history")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "wallboxWh")
  |> sum()

Solarproduktion [Wh]

from(bucket: "smarthome")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "generatedWh")
  |> difference()
  |> sum()

Und aus den “historischen Daten” aus dem zweiten Bucket (Tasks erforderlich - siehe unten):

from(bucket: "smarthome-history")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "generatedWh")
  |> sum()

Autarkie [%]

Bei der Autarkie möchte ich nicht auf einzelnen Zeilen arbeiten, sondern direkt die Differenz der Werte bilden. Ansonsten bekäme man hunderte Zeilen zurück, wo die Autarkie dauerhaft 0% oder 100% ist und man müsste daraus irgendwie den Durchschnitt bilden. Das wäre unnötig kompliziert.

from(bucket: "smarthome")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "exportedWh" or r._field == "generatedWh" or r._field == "importedWh")
  |> difference()
  |> sum()
  |> pivot(rowKey: ["_start"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => ({r with _value: (r.generatedWh - r.exportedWh) / (r.importedWh + r.generatedWh - r.exportedWh) * 100.0}))
  |> keep(columns: ["_value"])

Und aus den “historischen Daten” aus dem zweiten Bucket (Tasks erforderlich - siehe unten):

from(bucket: "smarthome-history")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "ownusedWh" or r._field == "houseWh")
  |> sum()
  |> pivot(rowKey: ["_start"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => ({r with _value: r.ownusedWh / r.houseWh * 100.0}))
  |> keep(columns: ["_value"])

Eigenverbrauch [Wh]

from(bucket: "smarthome")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "exportedWh" or r._field == "generatedWh")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> difference(columns: ["exportedWh", "generatedWh"])
  |> map(fn: (r) => ({r with _value: r.generatedWh - r.exportedWh}))
  |> sum()

Eigenverbrauch [%]

from(bucket: "smarthome")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "exportedWh" or r._field == "generatedWh")
  |> difference()
  |> sum()
  |> pivot(rowKey: ["_start"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => ({r with _value: if r.generatedWh > 0 then (r.generatedWh - r.exportedWh) / r.generatedWh * 100.0 else 0.0}))
  |> keep(columns: ["_value"])

Und aus den “historischen Daten” aus dem zweiten Bucket (Tasks erforderlich - siehe unten):

from(bucket: "smarthome-history")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "ownusedWh")
  |> sum()

Gespart [€]

from(bucket: "smarthome")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "exportedWh" or r._field == "generatedWh" or r._field == "priceIn")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> difference(columns: ["exportedWh", "generatedWh"])
  |> map(fn: (r) => ({r with _value: (r.generatedWh - r.exportedWh) / 1000.0 * r.priceIn}))
  |> sum()

Und aus den “historischen Daten” aus dem zweiten Bucket (Tasks erforderlich - siehe unten):

from(bucket: "smarthome-history")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "savedEuro")
  |> sum()

Einspeisung [Wh]

from(bucket: "smarthome")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "exportedWh")
  |> difference()
  |> sum()

Und aus den “historischen Daten” aus dem zweiten Bucket (Tasks erforderlich - siehe unten):

from(bucket: "smarthome-history")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "exportedWh")
  |> sum()

Einspeisung [€]

from(bucket: "smarthome")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "exportedWh" or r._field == "priceOut")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> difference(columns: ["exportedWh"])
  |> map(fn: (r) => ({r with _value: r.exportedWh / 1000.0 * r.priceOut}))
  |> sum()

Und aus den “historischen Daten” aus dem zweiten Bucket (Tasks erforderlich - siehe unten):

from(bucket: "smarthome-history")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "exportedEuro")
  |> sum()

Alternativ könnte man sich auch die Erträge des aktuell laufenden Jahres (z.B. als Tabelle) darstellen:

import "timezone"
import "date"

option location = timezone.location(name: "Europe/Berlin")

from(bucket: "smarthome-history")
  |> range(start: date.truncate(t: today(), unit: 1y))
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "exportedEuro")
  |> aggregateWindow(every: 1mo, fn: sum, createEmpty: true, timeSrc: "_start")
  |> sort(columns: ["_time"])

Einspeisung [%]

from(bucket: "smarthome")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "exportedWh" or r._field == "generatedWh")
  |> difference()
  |> sum()
  |> pivot(rowKey: ["_start"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => ({r with _value: if r.generatedWh > 0 then 100.0 - (r.generatedWh - r.exportedWh) / r.generatedWh * 100.0 else 0.0}))
  |> keep(columns: ["_value"])

Und aus den “historischen Daten” aus dem zweiten Bucket (Tasks erforderlich - siehe unten):

from(bucket: "smarthome-history")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "exportedWh" or r._field == "generatedWh")
  |> sum()
  |> pivot(rowKey: ["_start"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => ({r with _value: if r.generatedWh > 0 then 100.0 - (r.generatedWh - r.exportedWh) / r.generatedWh * 100.0 else 0.0}))
  |> keep(columns: ["_value"])

Zukauf [Wh]

from(bucket: "smarthome")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "importedWh")
  |> difference()
  |> sum()

Und aus den “historischen Daten” aus dem zweiten Bucket (Tasks erforderlich - siehe unten):

from(bucket: "smarthome-history")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "importedWh")
  |> sum()

Zukauf [€]

from(bucket: "smarthome")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "importedWh" or r._field == "priceIn")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> difference(columns: ["importedWh"])
  |> map(fn: (r) => ({r with _value: r.importedWh / 1000.0 * r.priceIn}))
  |> sum()

Und aus den “historischen Daten” aus dem zweiten Bucket (Tasks erforderlich - siehe unten):

from(bucket: "smarthome-history")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "importedEuro")
  |> sum()

Verlauf letzte 7 Tage

import "timezone"
import "date"

option location = timezone.location(name: "Europe/Berlin")

data = from(bucket: "smarthome")
  |> range(start: date.add(d: -7d, to: today()), stop: date.truncate(t: today(), unit: 1d))
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "exportedWh" or r._field == "generatedWh" or r._field == "importedWh")
  |> difference()
  |> aggregateWindow(every: 1d, fn: sum, createEmpty: true, timeSrc: "_start")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => ({r with houseWh: r.generatedWh - r.exportedWh + r.importedWh}))
  |> drop(columns: ["_start", "_stop", "_measurement"])

generatedWh = data |> map(fn: (r) => ({ _value: r.generatedWh, _time: r._time, _field: "generatedWh" }))
exportedWh = data |> map(fn: (r) => ({ _value: r.exportedWh, _time: r._time, _field: "exportedWh" }))
importedWh = data |> map(fn: (r) => ({ _value: r.importedWh, _time: r._time, _field: "importedWh" }))
houseWh = data |> map(fn: (r) => ({ _value: r.houseWh, _time: r._time, _field: "houseWh" }))

union(tables: [generatedWh, exportedWh, importedWh, houseWh])
  |> group(columns:["_field"])

Oder mit Wallbox:

import "timezone"
import "date"

option location = timezone.location(name: "Europe/Berlin")

data = from(bucket: "smarthome")
  |> range(start: date.add(d: -7d, to: today()), stop: date.truncate(t: today(), unit: 1d))
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "exportedWh" or r._field == "generatedWh" or r._field == "importedWh" or r._field == "wallboxWh")
  |> difference()
  |> aggregateWindow(every: 1d, fn: sum, createEmpty: true, timeSrc: "_start")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => ({r with houseWh: r.generatedWh - r.exportedWh + r.importedWh - r.wallboxWh}))
  |> drop(columns: ["_start", "_stop", "_measurement"])

generatedWh = data |> map(fn: (r) => ({ _value: r.generatedWh, _time: r._time, _field: "generatedWh" }))
exportedWh = data |> map(fn: (r) => ({ _value: r.exportedWh, _time: r._time, _field: "exportedWh" }))
importedWh = data |> map(fn: (r) => ({ _value: r.importedWh, _time: r._time, _field: "importedWh" }))
houseWh = data |> map(fn: (r) => ({ _value: r.houseWh, _time: r._time, _field: "houseWh" }))
wallboxWh  = data |> map(fn: (r) => ({ _value: r.wallboxWh, _time: r._time, _field: "wallboxWh" }))

union(tables: [generatedWh, exportedWh, importedWh, houseWh, wallboxWh])
  |> group(columns:["_field"])

Und aus den “historischen Daten” aus dem zweiten Bucket (Tasks erforderlich - siehe unten):

import "timezone"
import "date"

option location = timezone.location(name: "Europe/Berlin")

from(bucket: "smarthome-history")
  |> range(start: date.add(d: -7d, to: today()), stop: date.truncate(t: today(), unit: 1d))
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "exportedWh" or r._field == "generatedWh" or r._field == "importedWh" or r._field == "wallboxWh" or r._field == "houseWh")
  |> aggregateWindow(every: 1d, fn: sum, createEmpty: true, timeSrc: "_start")

Verlauf letzte 6 Monate

import "timezone"
import "date"

option location = timezone.location(name: "Europe/Berlin")

data = from(bucket: "smarthome")
  |> range(start: date.truncate(t: date.add(d: -6mo, to: today()), unit: 1mo), stop: date.truncate(t: today(), unit: 1mo))
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "exportedWh" or r._field == "generatedWh" or r._field == "importedWh")
  |> difference()
  |> aggregateWindow(every: 1mo, fn: sum, createEmpty: true, timeSrc: "_start")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => ({r with houseWh: r.generatedWh - r.exportedWh + r.importedWh))
  |> drop(columns: ["_start", "_stop", "_measurement"])

generatedWh = data |> map(fn: (r) => ({ _value: r.generatedWh, _time: r._time, _field: "generatedWh" }))
exportedWh = data |> map(fn: (r) => ({ _value: r.exportedWh, _time: r._time, _field: "exportedWh" }))
importedWh = data |> map(fn: (r) => ({ _value: r.importedWh, _time: r._time, _field: "importedWh" }))
houseWh = data |> map(fn: (r) => ({ _value: r.houseWh, _time: r._time, _field: "houseWh" }))

union(tables: [generatedWh, exportedWh, importedWh, houseWh])
  |> group(columns:["_field"])

Oder mit Wallbox:

import "timezone"
import "date"

option location = timezone.location(name: "Europe/Berlin")

data = from(bucket: "smarthome")
  |> range(start: date.truncate(t: date.add(d: -6mo, to: today()), unit: 1mo), stop: date.truncate(t: today(), unit: 1mo))
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "exportedWh" or r._field == "generatedWh" or r._field == "importedWh" or r._field == "wallboxWh")
  |> difference()
  |> aggregateWindow(every: 1mo, fn: sum, createEmpty: true, timeSrc: "_start")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => ({r with houseWh: r.generatedWh - r.exportedWh + r.importedWh - r.wallboxWh}))
  |> drop(columns: ["_start", "_stop", "_measurement"])

generatedWh = data |> map(fn: (r) => ({ _value: r.generatedWh, _time: r._time, _field: "generatedWh" }))
exportedWh = data |> map(fn: (r) => ({ _value: r.exportedWh, _time: r._time, _field: "exportedWh" }))
importedWh = data |> map(fn: (r) => ({ _value: r.importedWh, _time: r._time, _field: "importedWh" }))
houseWh = data |> map(fn: (r) => ({ _value: r.houseWh, _time: r._time, _field: "houseWh" }))
wallboxWh  = data |> map(fn: (r) => ({ _value: r.wallboxWh, _time: r._time, _field: "wallboxWh" }))

union(tables: [generatedWh, exportedWh, importedWh, houseWh, wallboxWh])
  |> group(columns:["_field"])

Und aus den “historischen Daten” aus dem zweiten Bucket (Tasks erforderlich - siehe unten):

import "timezone"
import "date"

option location = timezone.location(name: "Europe/Berlin")

from(bucket: "smarthome-history")
  |> range(start: date.truncate(t: date.add(d: -6mo, to: today()), unit: 1mo), stop: date.truncate(t: today(), unit: 1mo))
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "exportedWh" or r._field == "generatedWh" or r._field == "importedWh" or r._field == "houseWh" or r._field == "wallboxWh")
  |> aggregateWindow(every: 1mo, fn: sum, createEmpty: true, timeSrc: "_start")

Verlauf letzte 6 Wochen

Aus den “historischen Daten” aus dem zweiten Bucket (Tasks erforderlich - siehe unten):

import "timezone"
import "date"

option location = timezone.location(name: "Europe/Berlin")

tempDate = date.add(d: 3d, to: today())
weekStart = date.sub(d: 3d, from: date.truncate(t: tempDate, unit: 1w))

from(bucket: "smarthome-history")
  |> range(start: date.sub(d: 28d, from: weekStart), stop: weekStart)
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "exportedWh" or r._field == "generatedWh" or r._field == "importedWh" or r._field == "wallboxWh" or r._field == "houseWh")
  |> aggregateWindow(every: 1w, offset: -3d, fn: sum, timeSrc: "_start")

Ertragreichste Tage aktuelles Jahr

Aus den “historischen Daten” aus dem zweiten Bucket (Tasks erforderlich - siehe unten):

import "timezone"
import "date"

option location = timezone.location(name: "Europe/Berlin")

from(bucket: "smarthome-history")
  |> range(start: date.truncate(t: today(), unit: 1y))
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "generatedWh")
  |> aggregateWindow(every: 1d, fn: sum, createEmpty: false, timeSrc: "_start")
  |> sort(columns: ["_value"], desc: true)
  |> limit(n: 10)

Oder der letzten 365 Tage:

import "timezone"
import "date"

option location = timezone.location(name: "Europe/Berlin")

from(bucket: "smarthome-history")
  |> range(start: date.sub(d: 1y, from: today()))
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "generatedWh")
  |> aggregateWindow(every: 1d, fn: sum, createEmpty: false, timeSrc: "_start")
  |> sort(columns: ["_value"], desc: true)
  |> limit(n: 10)

Optimierung mit Tasks

Jetzt wäre es sinnvoll, die Menge an Daten regelmäßig in ein zweites Bucket zu übertragen und dabei die Daten zu aggregieren. Ansonsten werden die Statements von Zeit zu Zeit immer langsamer und das System braucht für große Zeiträume extrem lange um ein Ergebnis zu liefern. Außerdem wird die Datenmenge auf der Festplatte reduziert.

Alle Tasks sind so konfiguriert, dass sie die Daten aus der vergangenen Stunde holen und in 15min Abschnitten aggregieren.

  • Mit date.truncate(t: now(), unit: 1h) wird der aktuelle Zeitpunkt genommen und auf die letzte volle Stunde gerundet.
  • Mit date.sub(from: fullHourTime, d: 1h) wird eine Stunde von der Startzeit abgezogen. Das ist die Endzeit.

Solarproduktion [Wh]

import "date"

fullHourTime = date.truncate(t: now(), unit: 1h)
startTime = date.sub(from: fullHourTime, d: 1h)

from(bucket: "smarthome")
  |> range(start: startTime, stop: fullHourTime)
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "generatedWh")
  |> filter(fn: (r) => r._value > 0)
  |> difference()
  |> aggregateWindow(every: 15m, fn: sum, createEmpty: true)
  |> to(
    bucket: "smarthome-history",
    fieldFn: (r) => ({ "generatedWh": r._value })
  )

Einspeisung [Wh]

import "date"

fullHourTime = date.truncate(t: now(), unit: 1h)
startTime = date.sub(from: fullHourTime, d: 1h)

from(bucket: "smarthome")
  |> range(start: startTime, stop: fullHourTime)
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "exportedWh")
  |> filter(fn: (r) => r._value > 0)
  |> difference()
  |> aggregateWindow(every: 15m, fn: sum, createEmpty: true)
  |> to(
    bucket: "smarthome-history",
    fieldFn: (r) => ({ "exportedWh": r._value })
  )

Zukauf [Wh]

import "date"

fullHourTime = date.truncate(t: now(), unit: 1h)
startTime = date.sub(from: fullHourTime, d: 1h)

from(bucket: "smarthome")
  |> range(start: startTime, stop: fullHourTime)
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "importedWh")
  |> filter(fn: (r) => r._value > 0)
  |> difference()
  |> aggregateWindow(every: 15m, fn: sum, createEmpty: true)
  |> to(
    bucket: "smarthome-history",
    fieldFn: (r) => ({ "importedWh": r._value })
  )

Wallbox [Wh]

import "date"

fullHourTime = date.truncate(t: now(), unit: 1h)
startTime = date.sub(from: fullHourTime, d: 1h)

from(bucket: "smarthome")
  |> range(start: startTime, stop: fullHourTime)
  |> filter(fn: (r) => r._measurement == "energy-stats" and r._field == "wallboxWh")
  |> filter(fn: (r) => r._value > 0)
  |> difference()
  |> aggregateWindow(every: 15m, fn: sum, createEmpty: true)
  |> to(
    bucket: "smarthome-history",
    fieldFn: (r) => ({ "wallboxWh": r._value })
  )

Eigenverbrauch [Wh]

Das neue Feld heißt ownusedWh. Diesen Wert könnte man theoretisch auch aus den anderen errechnen. Muss nicht zwingend gespeichert werden.

import "date"

fullHourTime = date.truncate(t: now(), unit: 1h)
startTime = date.sub(from: fullHourTime, d: 1h)

from(bucket: "smarthome")
  |> range(start: startTime, stop: fullHourTime)
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "exportedWh" or r._field == "generatedWh")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> filter(fn: (r) => r.exportedWh > 0 and r.generatedWh > 0)
  |> difference(columns: ["exportedWh", "generatedWh"])
  |> map(fn: (r) => ({r with _value: r.generatedWh - r.exportedWh}))
  |> aggregateWindow(every: 15m, fn: sum, createEmpty: true)
  |> to(
    bucket: "smarthome-history",
    fieldFn: (r) => ({ "ownusedWh": r._value })
  )

Hausverbrauch [Wh]

Das neue Feld heißt houseWh. Diesen Wert könnte man theoretisch auch aus den anderen errechnen. Muss nicht zwingend gespeichert werden.

import "date"

fullHourTime = date.truncate(t: now(), unit: 1h)
startTime = date.sub(from: fullHourTime, d: 1h)

from(bucket: "smarthome")
  |> range(start: startTime, stop: fullHourTime)
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "exportedWh" or r._field == "generatedWh" or r._field == "importedWh")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> filter(fn: (r) => r.exportedWh > 0 and r.generatedWh > 0 and r.importedWh > 0)
  |> difference(columns: ["exportedWh", "generatedWh", "importedWh"])
  |> map(fn: (r) => ({r with _value: r.generatedWh - r.exportedWh + r.importedWh}))
  |> aggregateWindow(every: 15m, fn: sum, createEmpty: true)
  |> to(
    bucket: "smarthome-history",
    fieldFn: (r) => ({ "houseWh": r._value })
  )

Einspeisung [€]

Das neue Feld heißt exportedEuro.

import "date"

fullHourTime = date.truncate(t: now(), unit: 1h)
startTime = date.sub(from: fullHourTime, d: 1h)

from(bucket: "smarthome")
  |> range(start: startTime, stop: fullHourTime)
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "exportedWh" or r._field == "priceOut")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> filter(fn: (r) => r.exportedWh > 0)
  |> difference(columns: ["exportedWh"])
  |> map(fn: (r) => ({r with _value: r.exportedWh / 1000.0 * r.priceOut}))
  |> aggregateWindow(every: 15m, fn: sum, createEmpty: true)
  |> to(
    bucket: "smarthome-history",
    fieldFn: (r) => ({ "exportedEuro": r._value })
  )

Zukauf [€]

Das neue Feld heißt importedEuro.

import "date"

fullHourTime = date.truncate(t: now(), unit: 1h)
startTime = date.sub(from: fullHourTime, d: 1h)

from(bucket: "smarthome")
  |> range(start: startTime, stop: fullHourTime)
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "importedWh" or r._field == "priceIn")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> filter(fn: (r) => r.importedWh > 0)
  |> difference(columns: ["importedWh"])
  |> map(fn: (r) => ({r with _value: r.importedWh / 1000.0 * r.priceIn}))
  |> aggregateWindow(every: 15m, fn: sum, createEmpty: true)
  |> to(
    bucket: "smarthome-history",
    fieldFn: (r) => ({ "importedEuro": r._value })
  )

Gespart [€]

Das neue Feld heißt savedEuro.

import "date"

fullHourTime = date.truncate(t: now(), unit: 1h)
startTime = date.sub(from: fullHourTime, d: 1h)

from(bucket: "smarthome")
  |> range(start: startTime, stop: fullHourTime)
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "exportedWh" or r._field == "generatedWh" or r._field == "priceIn")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> filter(fn: (r) => r.exportedWh > 0 and r.generatedWh > 0)
  |> difference(columns: ["exportedWh", "generatedWh"])
  |> map(fn: (r) => ({r with _value: (r.generatedWh - r.exportedWh) / 1000.0 * r.priceIn}))
  |> aggregateWindow(every: 15m, fn: sum, createEmpty: true)
  |> to(
    bucket: "smarthome-history",
    fieldFn: (r) => ({ "savedEuro": r._value })
  )

Alle historischen Daten berechnen / übernehmen

Um jetzt alle Daten aus der Vergangenheit zu berechnen und in das andere Bucket zu übertragen, nimmst Du die Statements aus den Tasks und gibst ein fixes Start-Datum vor. Als Tipp kann ich noch mitgeben, dass Du createEmpty in der aggregateFunction dann am besten auf false stellst, um nicht so viele “Leerzeilen” zu erzeugen. Das Statement führe ich dann einfach im Data Explorer aus.

Beispiel:

import "date"

fullHourTime = date.truncate(t: now(), unit: 1h)

from(bucket: "smarthome")
  |> range(start: 2023-01-01T00:00:00.000Z, stop: fullHourTime)
  |> filter(fn: (r) => r._measurement == "energy-stats")
  |> filter(fn: (r) => r._field == "exportedWh" or r._field == "priceOut")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> difference(columns: ["exportedWh"])
  |> map(fn: (r) => ({r with _value: r.exportedWh / 1000.0 * r.priceOut}))
  |> aggregateWindow(every: 15m, fn: sum, createEmpty: false)
  |> to(
    bucket: "smarthome-history",
    fieldFn: (r) => ({ "exportedEuro": r._value })
  )
Du willst mehr?

Smart-Home-Trainings von A-Z

Steig' noch tiefer in die Themen ein und meistere Deine Projekte! Über 14.000 Teilnehmer konnten sich schon von der Qualität der Online-Kurse überzeugen.

ioBroker-Master-Kurs

ioBroker-Master-Kurs

Mehr Infos
Hausbau-Kurs

Hausbau mit KNX

Mehr Infos
Lox-Kurs

Lox-Kurs

Mehr Infos
Node-RED-Master-Kurs

Node-RED-Master-Kurs

Mehr Infos