Files
MSE-SoftwEng/gateway/src/ProcessUpdate.go
2025-04-15 10:43:29 +02:00

78 lines
2.1 KiB
Go

package main
import (
"context"
"encoding/json"
"os"
"strings"
"time"
paho "github.com/eclipse/paho.mqtt.golang"
influxdb "github.com/influxdata/influxdb-client-go/v2"
"github.com/labstack/gommon/log"
)
type (
	// dataPoints is an alias for a generic decoded JSON object: each key is a
	// field name and each value is whatever encoding/json produced for it.
	dataPoints = map[string]interface{}

	// updateEvent is the decoded form of an update payload. Only the "values"
	// member of the JSON document is mapped onto the struct.
	updateEvent struct {
		// Decoding of the "state" member is currently disabled:
		// State string `json:"state"`

		DataPoints dataPoints `json:"values"`
	}
)
// processUpdate handles a device update received via MQTT and writes its data
// points to InfluxDB as a single point.
//
// The topic must consist of exactly four "/"-separated segments. The first
// three are used as the "user", "room" and "device" tags; the fourth is not
// inspected here. The payload must be a JSON object whose "values" member
// holds the fields to store.
//
// A message with an invalid topic or an undecodable payload is logged and
// dropped. A message without any values is dropped silently.
//
// The measurement name is read from the MEASUREMENT_NAME environment variable
// on every call and falls back to "softweng" when the variable is not set.
//
// Parameters:
//   - _ : The MQTT client instance (unused).
//   - message: The MQTT message containing the topic and payload.
func (m *Gateway) processUpdate(_ paho.Client, message paho.Message) {
	// Parse the MQTT topic to extract user, room, and device information.
	topic := strings.Split(message.Topic(), "/")
	if len(topic) != 4 {
		log.Errorf("invalid topic: %s", message.Topic())
		return
	}
	user, room, device := topic[0], topic[1], topic[2]

	// Decode the message payload into an updateEvent structure.
	var event updateEvent
	if err := json.Unmarshal(message.Payload(), &event); err != nil {
		// Stop here: encoding/json may leave the target partially populated
		// on error, and data from a rejected payload must not be stored.
		log.Errorf("invalid update event payload: %s: %v", string(message.Payload()), err)
		return
	}

	// Skip processing if no data points are provided (len of a nil map is 0).
	if len(event.DataPoints) == 0 {
		return
	}

	// Resolve the measurement name, falling back to the default when unset.
	measurementName, ok := os.LookupEnv("MEASUREMENT_NAME")
	if !ok {
		log.Error("MEASUREMENT_NAME not set, using default value: softweng")
		measurementName = "softweng"
	}

	// Prepare the InfluxDB point with tags and fields. The timestamp is the
	// time of processing, not a time taken from the payload.
	point := influxdb.NewPointWithMeasurement(measurementName).
		SetTime(time.Now()).
		AddTag("user", user).
		AddTag("room", room).
		AddTag("device", device)
	for name, value := range event.DataPoints {
		point.AddField(name, value)
	}

	ctx := context.Background()

	// Write the point to InfluxDB. A failure is logged only; there is no
	// caller to report it to.
	if err := m.influxApi.WritePoint(ctx, point); err != nil {
		log.Errorf("failed to write data points to influx: %v", err)
	}

	// Flush after every message, even when the write above failed.
	// NOTE(review): presumably the write API may buffer points — confirm
	// against how m.influxApi is configured.
	if err := m.influxApi.Flush(ctx); err != nil {
		log.Errorf("failed to flush data points to influx: %v", err)
	}
}