package main

import (
	"context"
	"encoding/json"
	"os"
	"strings"
	"time"

	paho "github.com/eclipse/paho.mqtt.golang"
	influxdb "github.com/influxdata/influxdb-client-go/v2"
	"github.com/labstack/gommon/log"
)

// dataPoints maps a field name to the value reported for that field.
type dataPoints = map[string]interface{}

// updateEvent is the JSON payload of an update message. Only the "values"
// object is decoded.
type updateEvent struct {
	//State string `json:"state"`
	DataPoints dataPoints `json:"values"`
}

// processUpdate handles an update message received via MQTT: it forwards the
// reported values to a matching pending request (if any) and writes them as a
// single point to InfluxDB.
//
// The topic must consist of exactly four "/"-separated segments. The first
// three are read as user, room and device; the fourth is not inspected.
// Messages with a malformed topic, an undecodable payload, or no data points
// are dropped (the first two cases are logged).
//
// Parameters:
//   - _ : The MQTT client instance (unused).
//   - message: The MQTT message containing the topic and payload.
func (m *Gateway) processUpdate(_ paho.Client, message paho.Message) {
	// Parse the MQTT topic to extract user, room, and device information.
	topic := strings.Split(message.Topic(), "/")
	if len(topic) != 4 {
		log.Errorf("invalid topic: %s", message.Topic())
		return
	}
	user, room, device := topic[0], topic[1], topic[2]

	// Decode the message payload into an updateEvent structure.
	var event updateEvent
	if err := json.Unmarshal(message.Payload(), &event); err != nil {
		log.Errorf("invalid update event payload: %s: %v", string(message.Payload()), err)
		return
	}

	// Skip processing if no data points are provided (len of a nil map is 0).
	if len(event.DataPoints) == 0 {
		return
	}

	// Sample the clock once so the pending response and the stored point
	// carry the same timestamp for this update.
	now := time.Now()

	// Check if there's a pending request for this user, room, and device.
	if m.pendingUser == user && m.pendingRoom == room && m.pendingDevice == device {
		currentTime := now.Format(time.RFC3339Nano)

		// One measurement per data point. Map iteration order is random, so
		// the order of the measurements in the response is unspecified.
		measurements := make([]map[string]interface{}, 0, len(event.DataPoints))
		for field, value := range event.DataPoints {
			measurements = append(measurements, map[string]interface{}{
				"time":  currentTime,
				"type":  field,
				"value": value,
			})
		}

		responseData, err := json.Marshal(measurements)
		if err != nil {
			log.Errorf("failed to encode measurements for pending request: %v", err)
		} else {
			// Non-blocking send: the default branch runs when the send cannot
			// proceed immediately (no ready receiver / full buffer).
			// NOTE(review): a send on a closed channel panics even inside a
			// select, so pendingChan must not be closed while updates arrive.
			select {
			case m.pendingChan <- string(responseData):
				// Response sent successfully.
			default:
				log.Errorf("failed to send response to pending channel")
			}
		}
	}

	// Prepare the InfluxDB point with tags and fields.
	measurementName, ok := os.LookupEnv("MEASUREMENT_NAME")
	if !ok {
		log.Error("MEASUREMENT_NAME not set, using default value: softweng")
		measurementName = "softweng"
	}

	point := influxdb.NewPointWithMeasurement(measurementName).
		SetTime(now).
		AddTag("user", user).
		AddTag("room", room).
		AddTag("device", device)
	for name, value := range event.DataPoints {
		point.AddField(name, value)
	}

	ctx := context.Background()

	// Write the point to InfluxDB and handle errors.
	if err := m.influxApi.WritePoint(ctx, point); err != nil {
		log.Errorf("failed to write data points to influx: %v", err)
	}

	// Flush is attempted even when the write failed, so anything buffered
	// earlier is still pushed out.
	if err := m.influxApi.Flush(ctx); err != nil {
		log.Errorf("failed to flush data points to influx: %v", err)
	}
}