package main import ( "context" "encoding/json" "os" "strings" "time" paho "github.com/eclipse/paho.mqtt.golang" influxdb "github.com/influxdata/influxdb-client-go/v2" "github.com/labstack/gommon/log" ) type dataPoints = map[string]interface{} type updateEvent struct { //State string `json:"state"` DataPoints dataPoints `json:"values"` } // processUpdate processes device state updates received via MQTT and writes data points to InfluxDB. // // Parameters: // - _ : The MQTT client instance (unused). // - message: The MQTT message containing the topic and payload. func (m *Gateway) processUpdate(_ paho.Client, message paho.Message) { // Parse the MQTT topic to extract user, room, and device information. topic := strings.Split(message.Topic(), "/") if len(topic) != 4 { log.Errorf("invalid topic: %s", message.Topic()) return } user := topic[0] room := topic[1] device := topic[2] // Decode the message payload into an updateEvent structure. var event updateEvent err := json.Unmarshal(message.Payload(), &event) if err != nil { log.Errorf("invalid update event payload: %s", string(message.Payload())) } // Skip processing if no data points are provided. if event.DataPoints == nil || len(event.DataPoints) == 0 { return } // Prepare the InfluxDB point with tags and fields. MEASUREMENT_NAME, ok := os.LookupEnv("MEASUREMENT_NAME") if !ok { log.Error("MEASUREMENT_NAME not set, using default value: softweng") MEASUREMENT_NAME = "softweng" } point := influxdb.NewPointWithMeasurement(MEASUREMENT_NAME). SetTime(time.Now()). AddTag("user", user). AddTag("room", room). AddTag("device", device) for n, dp := range event.DataPoints { point.AddField(n, dp) } // Write the point to InfluxDB and handle errors. err = m.influxApi.WritePoint(context.Background(), point) if err != nil { log.Errorf("failed to write data points to influx: %v", err) } // Flush the InfluxDB client to ensure data is written. err = m.influxApi.Flush(context.Background()) if err != nil { log.Errorf("failed to flush data points to influx: %v", err) } }