feat(gateway): added reply to the POST request with the latest measurement

Signed-off-by: Klagarge <remi@heredero.ch>
This commit is contained in:
2025-05-25 12:06:41 +02:00
parent bf3d7cbb59
commit cebe5e4930
6 changed files with 98 additions and 23 deletions

View File

@@ -2,6 +2,7 @@ package main
import ( import (
_ "crypto/md5" _ "crypto/md5"
"encoding/json"
_ "gateway-softweng/docs" _ "gateway-softweng/docs"
paho "github.com/eclipse/paho.mqtt.golang" paho "github.com/eclipse/paho.mqtt.golang"
"github.com/gin-contrib/cors" "github.com/gin-contrib/cors"
@@ -24,6 +25,12 @@ type Gateway struct {
accounts gin.Accounts accounts gin.Accounts
influx influxdb2.Client influx influxdb2.Client
influxApi api.WriteAPIBlocking influxApi api.WriteAPIBlocking
// Fields to track pending POST requests
pendingUser string
pendingRoom string
pendingDevice string
pendingChan chan string
} }
// @Summary Ping test endpoint // @Summary Ping test endpoint
@@ -96,9 +103,20 @@ func (gh *Gateway) createRestGateway() {
// Define the route for the publish command // Define the route for the publish command
gh.restGroup.POST("", func(c *gin.Context) { gh.restGroup.POST("", func(c *gin.Context) {
ret := gh.publishCommand(c) response, ret := gh.publishCommand(c)
if ret == nil { if ret == nil {
c.JSON(http.StatusOK, gin.H{"status": "ok"}) if response != "" {
// Try to parse the response as JSON
var measurements []map[string]interface{}
if err := json.Unmarshal([]byte(response), &measurements); err == nil {
// If parsing succeeds, return the measurements directly
c.JSON(http.StatusOK, measurements)
} else {
c.JSON(http.StatusOK, gin.H{"status": "ok"})
}
} else {
c.JSON(http.StatusNoContent, gin.H{"status": "ok"})
}
} else { } else {
c.JSON(http.StatusInternalServerError, gin.H{"error": ret.Error()}) c.JSON(http.StatusInternalServerError, gin.H{"error": ret.Error()})
} }
@@ -222,6 +240,9 @@ func NewGateway() (*Gateway, error) {
var gh = Gateway{} var gh = Gateway{}
var err error var err error
// Initialize the pending request channel
gh.pendingChan = make(chan string, 1)
// Initialize the InfluxDB client // Initialize the InfluxDB client
gh.createInfluxGateway() gh.createInfluxGateway()

View File

@@ -41,6 +41,7 @@ func (m *Gateway) processUpdate(_ paho.Client, message paho.Message) {
err := json.Unmarshal(message.Payload(), &event) err := json.Unmarshal(message.Payload(), &event)
if err != nil { if err != nil {
log.Errorf("invalid update event payload: %s", string(message.Payload())) log.Errorf("invalid update event payload: %s", string(message.Payload()))
return
} }
// Skip processing if no data points are provided. // Skip processing if no data points are provided.
@@ -48,6 +49,39 @@ func (m *Gateway) processUpdate(_ paho.Client, message paho.Message) {
return return
} }
// Check if there's a pending request for this user, room, and device
if m.pendingUser == user && m.pendingRoom == room && m.pendingDevice == device {
// Create a slice to hold the measurements
var measurements []map[string]interface{}
currentTime := time.Now().Format(time.RFC3339Nano)
// Convert each data point to a measurement
for field, value := range event.DataPoints {
// Create a measurement for this field
measurement := map[string]interface{}{
"time": currentTime,
"type": field,
"value": value,
}
measurements = append(measurements, measurement)
}
// Convert measurements to JSON
if len(measurements) > 0 {
responseData, err := json.Marshal(measurements)
if err == nil {
// Send the data as a response
select {
case m.pendingChan <- string(responseData):
// Response sent successfully
default:
// Channel is full or closed, log an error
log.Errorf("failed to send response to pending channel")
}
}
}
}
// Prepare the InfluxDB point with tags and fields. // Prepare the InfluxDB point with tags and fields.
MEASUREMENT_NAME, ok := os.LookupEnv("MEASUREMENT_NAME") MEASUREMENT_NAME, ok := os.LookupEnv("MEASUREMENT_NAME")
if !ok { if !ok {

View File

@@ -3,6 +3,7 @@ package main
import ( import (
"errors" "errors"
_ "gateway-softweng/docs" _ "gateway-softweng/docs"
"time"
_ "github.com/eclipse/paho.mqtt.golang" _ "github.com/eclipse/paho.mqtt.golang"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@@ -24,25 +25,25 @@ type Command struct {
// @Param room query string true "Room name" example(Bedroom) // @Param room query string true "Room name" example(Bedroom)
// @Param device query string true "Device name" example(DoorSensor) // @Param device query string true "Device name" example(DoorSensor)
// @Param command body main.Command true "Command to publish" // @Param command body main.Command true "Command to publish"
// @Success 200 {object} map[string]string "status:ok" // @Success 200 {object} map[string]interface{} "response or status:ok"
// @Failure 500 {object} gin.H // @Failure 500 {object} gin.H
// @Router /raclette [post] // @Router /raclette [post]
func (gh *Gateway) publishCommand(c *gin.Context) error { func (gh *Gateway) publishCommand(c *gin.Context) (string, error) {
// Get the user from the authenticated context // Get the user from the authenticated context
//userid := c.MustGet(gin.AuthUserKey).(string) //userid := c.MustGet(gin.AuthUserKey).(string)
// Get user, room and device from the query parameters // Get user, room and device from the query parameters
user, ret := c.GetQuery("user") user, ret := c.GetQuery("user")
if !ret { if !ret {
return errors.New("no user found") return "", errors.New("no user found")
} }
room, ret := c.GetQuery("room") room, ret := c.GetQuery("room")
if !ret { if !ret {
return errors.New(`no room found`) return "", errors.New(`no room found`)
} }
device, ret := c.GetQuery("device") device, ret := c.GetQuery("device")
if !ret { if !ret {
return errors.New(`no device found`) return "", errors.New(`no device found`)
} }
// Define the JSON structure for the command // Define the JSON structure for the command
@@ -51,14 +52,38 @@ func (gh *Gateway) publishCommand(c *gin.Context) error {
// Bind the JSON payload to the structure // Bind the JSON payload to the structure
err := c.Bind(&json) err := c.Bind(&json)
if err != nil { if err != nil {
return err return "", err
} }
// Set the pending request fields to track this request
gh.pendingUser = user
gh.pendingRoom = room
gh.pendingDevice = device
// Publish the command to the MQTT broker // Publish the command to the MQTT broker
topic := user + "/" + room + "/" + device + "/cmd/" + json.Command cmdTopic := user + "/" + room + "/" + device + "/cmd/" + json.Command
token := gh.mqtt.Publish(topic, 1, false, "") token := gh.mqtt.Publish(cmdTopic, 1, false, "")
if token.Wait() && token.Error() != nil { if token.Wait() && token.Error() != nil {
return token.Error() // Clear pending request fields
gh.pendingUser = ""
gh.pendingRoom = ""
gh.pendingDevice = ""
return "", token.Error()
}
// Wait for response with a 1-second timeout
select {
case response := <-gh.pendingChan:
// Clear pending request fields
gh.pendingUser = ""
gh.pendingRoom = ""
gh.pendingDevice = ""
return response, nil
case <-time.After(1 * time.Second):
// Timeout occurred, clear pending request fields
gh.pendingUser = ""
gh.pendingRoom = ""
gh.pendingDevice = ""
return "", nil
} }
return nil
} }

View File

@@ -152,12 +152,10 @@ const docTemplate = `{
], ],
"responses": { "responses": {
"200": { "200": {
"description": "status:ok", "description": "response or status:ok",
"schema": { "schema": {
"type": "object", "type": "object",
"additionalProperties": { "additionalProperties": true
"type": "string"
}
} }
}, },
"500": { "500": {

View File

@@ -146,12 +146,10 @@
], ],
"responses": { "responses": {
"200": { "200": {
"description": "status:ok", "description": "response or status:ok",
"schema": { "schema": {
"type": "object", "type": "object",
"additionalProperties": { "additionalProperties": true
"type": "string"
}
} }
}, },
"500": { "500": {

View File

@@ -108,10 +108,9 @@ paths:
- application/json - application/json
responses: responses:
"200": "200":
description: status:ok description: response or status:ok
schema: schema:
additionalProperties: additionalProperties: true
type: string
type: object type: object
"500": "500":
description: Internal Server Error description: Internal Server Error