feat(gateway): added first implement of the Gateway (MQTT, Influx, REST)
Signed-off-by: Klagarge <remi@heredero.ch>
This commit is contained in:
77
gateway/src/ProcessUpdate.go
Normal file
77
gateway/src/ProcessUpdate.go
Normal file
@@ -0,0 +1,77 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
paho "github.com/eclipse/paho.mqtt.golang"
|
||||
influxdb "github.com/influxdata/influxdb-client-go/v2"
|
||||
"github.com/labstack/gommon/log"
|
||||
)
|
||||
|
||||
type dataPoints = map[string]interface{}
|
||||
|
||||
type updateEvent struct {
|
||||
State string `json:"state"`
|
||||
DataPoints dataPoints `json:"values"`
|
||||
}
|
||||
|
||||
// processUpdate processes device state updates received via MQTT and writes data points to InfluxDB.
|
||||
//
|
||||
// Parameters:
|
||||
// - _ : The MQTT client instance (unused).
|
||||
// - message: The MQTT message containing the topic and payload.
|
||||
func (m *Gateway) processUpdate(_ paho.Client, message paho.Message) {
|
||||
// Parse the MQTT topic to extract user, room, and device information.
|
||||
topic := strings.Split(message.Topic(), "/")
|
||||
if len(topic) != 4 {
|
||||
log.Errorf("invalid topic: %s", message.Topic())
|
||||
return
|
||||
}
|
||||
|
||||
user := topic[0]
|
||||
room := topic[1]
|
||||
device := topic[2]
|
||||
|
||||
// Decode the message payload into an updateEvent structure.
|
||||
var event updateEvent
|
||||
err := json.Unmarshal(message.Payload(), &event)
|
||||
if err != nil {
|
||||
log.Errorf("invalid update event payload: %s", string(message.Payload()))
|
||||
}
|
||||
|
||||
// Skip processing if no data points are provided.
|
||||
if event.DataPoints == nil || len(event.DataPoints) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Prepare the InfluxDB point with tags and fields.
|
||||
MEASUREMENT_NAME, ok := os.LookupEnv("MEASUREMENT_NAME")
|
||||
if !ok {
|
||||
log.Error("MEASUREMENT_NAME not set, using default value: softweng")
|
||||
MEASUREMENT_NAME = "softweng"
|
||||
}
|
||||
point := influxdb.NewPointWithMeasurement(MEASUREMENT_NAME).
|
||||
SetTime(time.Now()).
|
||||
AddTag("user", user).
|
||||
AddTag("room", room).
|
||||
AddTag("device", device)
|
||||
for n, dp := range event.DataPoints {
|
||||
point.AddField(n, dp)
|
||||
}
|
||||
|
||||
// Write the point to InfluxDB and handle errors.
|
||||
err = m.influxApi.WritePoint(context.Background(), point)
|
||||
if err != nil {
|
||||
log.Errorf("failed to write data points to influx: %v", err)
|
||||
}
|
||||
|
||||
// Flush the InfluxDB client to ensure data is written.
|
||||
err = m.influxApi.Flush(context.Background())
|
||||
if err != nil {
|
||||
log.Errorf("failed to flush data points to influx: %v", err)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user