diff --git a/gateway/src/Connection.go b/gateway/src/Connection.go index f2ce776..63b9322 100644 --- a/gateway/src/Connection.go +++ b/gateway/src/Connection.go @@ -2,6 +2,7 @@ package main import ( _ "crypto/md5" + "encoding/json" _ "gateway-softweng/docs" paho "github.com/eclipse/paho.mqtt.golang" "github.com/gin-contrib/cors" @@ -24,6 +25,12 @@ type Gateway struct { accounts gin.Accounts influx influxdb2.Client influxApi api.WriteAPIBlocking + + // Fields to track pending POST requests + pendingUser string + pendingRoom string + pendingDevice string + pendingChan chan string } // @Summary Ping test endpoint @@ -96,9 +103,20 @@ func (gh *Gateway) createRestGateway() { // Define the route for the publish command gh.restGroup.POST("", func(c *gin.Context) { - ret := gh.publishCommand(c) + response, ret := gh.publishCommand(c) if ret == nil { - c.JSON(http.StatusOK, gin.H{"status": "ok"}) + if response != "" { + // Try to parse the response as JSON + var measurements []map[string]interface{} + if err := json.Unmarshal([]byte(response), &measurements); err == nil { + // If parsing succeeds, return the measurements directly + c.JSON(http.StatusOK, measurements) + } else { + c.JSON(http.StatusOK, gin.H{"status": "ok"}) + } + } else { + c.JSON(http.StatusNoContent, gin.H{"status": "ok"}) + } } else { c.JSON(http.StatusInternalServerError, gin.H{"error": ret.Error()}) } @@ -222,6 +240,9 @@ func NewGateway() (*Gateway, error) { var gh = Gateway{} var err error + // Initialize the pending request channel + gh.pendingChan = make(chan string, 1) + // Initialize the InfluxDB client gh.createInfluxGateway() diff --git a/gateway/src/ProcessUpdate.go b/gateway/src/ProcessUpdate.go index ce0fd05..b832f75 100644 --- a/gateway/src/ProcessUpdate.go +++ b/gateway/src/ProcessUpdate.go @@ -41,6 +41,7 @@ func (m *Gateway) processUpdate(_ paho.Client, message paho.Message) { err := json.Unmarshal(message.Payload(), &event) if err != nil { log.Errorf("invalid update event payload: %s", string(message.Payload())) + return } // Skip processing if no data points are provided. @@ -48,6 +49,39 @@ func (m *Gateway) processUpdate(_ paho.Client, message paho.Message) { return } + // Check if there's a pending request for this user, room, and device + if m.pendingUser == user && m.pendingRoom == room && m.pendingDevice == device { + // Create a slice to hold the measurements + var measurements []map[string]interface{} + currentTime := time.Now().Format(time.RFC3339Nano) + + // Convert each data point to a measurement + for field, value := range event.DataPoints { + // Create a measurement for this field + measurement := map[string]interface{}{ + "time": currentTime, + "type": field, + "value": value, + } + measurements = append(measurements, measurement) + } + + // Convert measurements to JSON + if len(measurements) > 0 { + responseData, err := json.Marshal(measurements) + if err == nil { + // Send the data as a response + select { + case m.pendingChan <- string(responseData): + // Response sent successfully + default: + // Channel is full or closed, log an error + log.Errorf("failed to send response to pending channel") + } + } + } + } + // Prepare the InfluxDB point with tags and fields. MEASUREMENT_NAME, ok := os.LookupEnv("MEASUREMENT_NAME") if !ok { diff --git a/gateway/src/PublishCommand.go b/gateway/src/PublishCommand.go index f2d3d79..3c96864 100644 --- a/gateway/src/PublishCommand.go +++ b/gateway/src/PublishCommand.go @@ -3,6 +3,7 @@ package main import ( "errors" _ "gateway-softweng/docs" + "time" _ "github.com/eclipse/paho.mqtt.golang" "github.com/gin-gonic/gin" @@ -24,25 +25,25 @@ type Command struct { // @Param room query string true "Room name" example(Bedroom) // @Param device query string true "Device name" example(DoorSensor) // @Param command body main.Command true "Command to publish" -// @Success 200 {object} map[string]string "status:ok" +// @Success 200 {object} map[string]interface{} "response or status:ok" // @Failure 500 {object} gin.H // @Router /raclette [post] -func (gh *Gateway) publishCommand(c *gin.Context) error { +func (gh *Gateway) publishCommand(c *gin.Context) (string, error) { // Get the user from the authenticated context //userid := c.MustGet(gin.AuthUserKey).(string) // Get user, room and device from the query parameters user, ret := c.GetQuery("user") if !ret { - return errors.New("no user found") + return "", errors.New("no user found") } room, ret := c.GetQuery("room") if !ret { - return errors.New(`no room found`) + return "", errors.New(`no room found`) } device, ret := c.GetQuery("device") if !ret { - return errors.New(`no device found`) + return "", errors.New(`no device found`) } // Define the JSON structure for the command @@ -51,14 +52,38 @@ func (gh *Gateway) publishCommand(c *gin.Context) error { // Bind the JSON payload to the structure err := c.Bind(&json) if err != nil { - return err + return "", err } + // Set the pending request fields to track this request + gh.pendingUser = user + gh.pendingRoom = room + gh.pendingDevice = device + // Publish the command to the MQTT broker - topic := user + "/" + room + "/" + device + "/cmd/" + json.Command - token := gh.mqtt.Publish(topic, 1, false, "") + cmdTopic := user + "/" + room + "/" + device + "/cmd/" + json.Command + token := gh.mqtt.Publish(cmdTopic, 1, false, "") if token.Wait() && token.Error() != nil { - return token.Error() + // Clear pending request fields + gh.pendingUser = "" + gh.pendingRoom = "" + gh.pendingDevice = "" + return "", token.Error() + } + + // Wait for response with a 1-second timeout + select { + case response := <-gh.pendingChan: + // Clear pending request fields + gh.pendingUser = "" + gh.pendingRoom = "" + gh.pendingDevice = "" + return response, nil + case <-time.After(1 * time.Second): + // Timeout occurred, clear pending request fields + gh.pendingUser = "" + gh.pendingRoom = "" + gh.pendingDevice = "" + return "", nil } - return nil } diff --git a/gateway/src/docs/docs.go b/gateway/src/docs/docs.go index 78f2084..1fd6f44 100644 --- a/gateway/src/docs/docs.go +++ b/gateway/src/docs/docs.go @@ -152,12 +152,10 @@ const docTemplate = `{ ], "responses": { "200": { - "description": "status:ok", + "description": "response or status:ok", "schema": { "type": "object", - "additionalProperties": { - "type": "string" - } + "additionalProperties": true } }, "500": { diff --git a/gateway/src/docs/swagger.json b/gateway/src/docs/swagger.json index 86a6c32..1d8137a 100644 --- a/gateway/src/docs/swagger.json +++ b/gateway/src/docs/swagger.json @@ -146,12 +146,10 @@ ], "responses": { "200": { - "description": "status:ok", + "description": "response or status:ok", "schema": { "type": "object", - "additionalProperties": { - "type": "string" - } + "additionalProperties": true } }, "500": { diff --git a/gateway/src/docs/swagger.yaml b/gateway/src/docs/swagger.yaml index a703ded..a2ba6b6 100644 --- a/gateway/src/docs/swagger.yaml +++ b/gateway/src/docs/swagger.yaml @@ -108,10 +108,9 @@ paths: - application/json responses: "200": - description: status:ok + description: response or status:ok schema: - additionalProperties: - type: string + additionalProperties: true type: object "500": description: Internal Server Error