diff --git a/gateway/src/Connection.go b/gateway/src/Connection.go index 63b9322..34b78cb 100644 --- a/gateway/src/Connection.go +++ b/gateway/src/Connection.go @@ -22,6 +22,7 @@ type Gateway struct { mqtt paho.Client rest *gin.Engine restGroup *gin.RouterGroup + testGroup *gin.RouterGroup accounts gin.Accounts influx influxdb2.Client influxApi api.WriteAPIBlocking @@ -100,6 +101,7 @@ func (gh *Gateway) createRestGateway() { // Create a new router group with basic authentication for /raclette gh.restGroup = gh.rest.Group("/raclette", gin.BasicAuth(gh.accounts)) + gh.testGroup = gh.rest.Group("/test", gin.BasicAuth(gh.accounts)) // Define the route for the publish command gh.restGroup.POST("", func(c *gin.Context) { @@ -129,6 +131,14 @@ func (gh *Gateway) createRestGateway() { c.JSON(http.StatusInternalServerError, gin.H{"error": ret.Error()}) } }) + + // Define the route for the publish command (GET) + gh.testGroup.GET("", func(c *gin.Context) { + ret := gh.requestInfluxTest(c) + if ret != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": ret.Error()}) + } + }) } // createMQTTGateway initializes the MQTT client and sets up subscriptions. diff --git a/gateway/src/RequestInflux.go b/gateway/src/RequestInflux.go index 356ef1a..4dfa91f 100644 --- a/gateway/src/RequestInflux.go +++ b/gateway/src/RequestInflux.go @@ -68,6 +68,7 @@ func (gh *Gateway) requestInflux(c *gin.Context) error { |> filter(fn: (r) => r["user"] == %q) |> filter(fn: (r) => r["room"] == %q) |> filter(fn: (r) => r["device"] == %q) + |> aggregateWindow(every: 5m, fn: mean, createEmpty: false) |> sort(columns: ["_time"], desc: true) `, INFLUXDB_BUCKET, MEASUREMENT_NAME, user, room, device) results, err := queryAPI.Query(context.Background(), query) @@ -101,3 +102,83 @@ func (gh *Gateway) requestInflux(c *gin.Context) error { c.JSON(http.StatusOK, values) return nil } + +func (gh *Gateway) requestInfluxTest(c *gin.Context) error { + // Get the user from the authenticated context + //userId := c.MustGet(gin.AuthUserKey).(string) + + // Get room and device from the query parameters + user, ret := c.GetQuery("user") + if !ret { + return errors.New("no user found") + } + room, ret := c.GetQuery("room") + if !ret { + return errors.New(`no room found`) + } + device, ret := c.GetQuery("device") + if !ret { + return errors.New(`no device found`) + } + + // Get env variables and set default values if not set + INFLUXDB_ORG, ok := os.LookupEnv("INFLUXDB_ORG") + if !ok { + log.Error("INFLUXDB_ORG not set, using default value: raclette") + INFLUXDB_ORG = "raclette" + } + + INFLUXDB_BUCKET, ok := os.LookupEnv("INFLUXDB_BUCKET") + if !ok { + log.Error("INFLUXDB_BUCKET not set, using default value: raclette") + INFLUXDB_BUCKET = "raclette" + } + + queryAPI := gh.influx.QueryAPI(INFLUXDB_ORG) + MEASUREMENT_NAME, ok := os.LookupEnv("MEASUREMENT_NAME") + if !ok { + log.Error("MEASUREMENT_NAME not set, using default value: softweng") + MEASUREMENT_NAME = "THC" + } + // The Flux query uses a large range (-1000d) and aggregates the latest values. + // This ensures we always get the most recent data, even if the database contains old entries. + query := fmt.Sprintf(`from(bucket: %q) + |> range(start: -1000d) + |> filter(fn: (r) => r["_measurement"] == %q) + |> filter(fn: (r) => r["user"] == %q) + |> filter(fn: (r) => r["room"] == %q) + |> filter(fn: (r) => r["device"] == %q) + |> sort(columns: ["_time"], desc: true) + |> limit(n: 100) + `, INFLUXDB_BUCKET, MEASUREMENT_NAME, user, room, device) + results, err := queryAPI.Query(context.Background(), query) + if err != nil { + log.Fatal(err) + } + + type measurement = map[string]interface{} + var values []measurement + + for results.Next() { + m := measurement{} + record := results.Record() + field := record.Field() + value := record.Value() + m["time"] = record.Time() + m["value"] = value.(float64) + if field == "temperature" { + m["type"] = "temperature" + } + if field == "humidity" { + m["type"] = "humidity" + } + values = append(values, m) + fmt.Println(results.Record()) + } + if err := results.Err(); err != nil { + log.Fatal(err) + } + + c.JSON(http.StatusOK, values) + return nil +}