feat(gateway): add test routes
Return 100 last values to be sure that test have some values Need to be refactor to merge both function. Now it's clearly a mess with duplicate code. Signed-off-by: Klagarge <remi@heredero.ch>
This commit is contained in:
@@ -22,6 +22,7 @@ type Gateway struct {
|
|||||||
mqtt paho.Client
|
mqtt paho.Client
|
||||||
rest *gin.Engine
|
rest *gin.Engine
|
||||||
restGroup *gin.RouterGroup
|
restGroup *gin.RouterGroup
|
||||||
|
testGroup *gin.RouterGroup
|
||||||
accounts gin.Accounts
|
accounts gin.Accounts
|
||||||
influx influxdb2.Client
|
influx influxdb2.Client
|
||||||
influxApi api.WriteAPIBlocking
|
influxApi api.WriteAPIBlocking
|
||||||
@@ -100,6 +101,7 @@ func (gh *Gateway) createRestGateway() {
|
|||||||
|
|
||||||
// Create a new router group with basic authentication for /raclette
|
// Create a new router group with basic authentication for /raclette
|
||||||
gh.restGroup = gh.rest.Group("/raclette", gin.BasicAuth(gh.accounts))
|
gh.restGroup = gh.rest.Group("/raclette", gin.BasicAuth(gh.accounts))
|
||||||
|
gh.testGroup = gh.rest.Group("/test", gin.BasicAuth(gh.accounts))
|
||||||
|
|
||||||
// Define the route for the publish command
|
// Define the route for the publish command
|
||||||
gh.restGroup.POST("", func(c *gin.Context) {
|
gh.restGroup.POST("", func(c *gin.Context) {
|
||||||
@@ -129,6 +131,14 @@ func (gh *Gateway) createRestGateway() {
|
|||||||
c.JSON(http.StatusInternalServerError, gin.H{"error": ret.Error()})
|
c.JSON(http.StatusInternalServerError, gin.H{"error": ret.Error()})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// Define the route for the publish command (GET)
|
||||||
|
gh.testGroup.GET("", func(c *gin.Context) {
|
||||||
|
ret := gh.requestInfluxTest(c)
|
||||||
|
if ret != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": ret.Error()})
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// createMQTTGateway initializes the MQTT client and sets up subscriptions.
|
// createMQTTGateway initializes the MQTT client and sets up subscriptions.
|
||||||
|
|||||||
@@ -68,6 +68,7 @@ func (gh *Gateway) requestInflux(c *gin.Context) error {
|
|||||||
|> filter(fn: (r) => r["user"] == %q)
|
|> filter(fn: (r) => r["user"] == %q)
|
||||||
|> filter(fn: (r) => r["room"] == %q)
|
|> filter(fn: (r) => r["room"] == %q)
|
||||||
|> filter(fn: (r) => r["device"] == %q)
|
|> filter(fn: (r) => r["device"] == %q)
|
||||||
|
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
|
||||||
|> sort(columns: ["_time"], desc: true)
|
|> sort(columns: ["_time"], desc: true)
|
||||||
`, INFLUXDB_BUCKET, MEASUREMENT_NAME, user, room, device)
|
`, INFLUXDB_BUCKET, MEASUREMENT_NAME, user, room, device)
|
||||||
results, err := queryAPI.Query(context.Background(), query)
|
results, err := queryAPI.Query(context.Background(), query)
|
||||||
@@ -101,3 +102,83 @@ func (gh *Gateway) requestInflux(c *gin.Context) error {
|
|||||||
c.JSON(http.StatusOK, values)
|
c.JSON(http.StatusOK, values)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (gh *Gateway) requestInfluxTest(c *gin.Context) error {
|
||||||
|
// Get the user from the authenticated context
|
||||||
|
//userId := c.MustGet(gin.AuthUserKey).(string)
|
||||||
|
|
||||||
|
// Get room and device from the query parameters
|
||||||
|
user, ret := c.GetQuery("user")
|
||||||
|
if !ret {
|
||||||
|
return errors.New("no user found")
|
||||||
|
}
|
||||||
|
room, ret := c.GetQuery("room")
|
||||||
|
if !ret {
|
||||||
|
return errors.New(`no room found`)
|
||||||
|
}
|
||||||
|
device, ret := c.GetQuery("device")
|
||||||
|
if !ret {
|
||||||
|
return errors.New(`no device found`)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get env variables and set default values if not set
|
||||||
|
INFLUXDB_ORG, ok := os.LookupEnv("INFLUXDB_ORG")
|
||||||
|
if !ok {
|
||||||
|
log.Error("INFLUXDB_ORG not set, using default value: raclette")
|
||||||
|
INFLUXDB_ORG = "raclette"
|
||||||
|
}
|
||||||
|
|
||||||
|
INFLUXDB_BUCKET, ok := os.LookupEnv("INFLUXDB_BUCKET")
|
||||||
|
if !ok {
|
||||||
|
log.Error("INFLUXDB_BUCKET not set, using default value: raclette")
|
||||||
|
INFLUXDB_BUCKET = "raclette"
|
||||||
|
}
|
||||||
|
|
||||||
|
queryAPI := gh.influx.QueryAPI(INFLUXDB_ORG)
|
||||||
|
MEASUREMENT_NAME, ok := os.LookupEnv("MEASUREMENT_NAME")
|
||||||
|
if !ok {
|
||||||
|
log.Error("MEASUREMENT_NAME not set, using default value: softweng")
|
||||||
|
MEASUREMENT_NAME = "THC"
|
||||||
|
}
|
||||||
|
// The Flux query uses a large range (-1000d) and aggregates the latest values.
|
||||||
|
// This ensures we always get the most recent data, even if the database contains old entries.
|
||||||
|
query := fmt.Sprintf(`from(bucket: %q)
|
||||||
|
|> range(start: -1000d)
|
||||||
|
|> filter(fn: (r) => r["_measurement"] == %q)
|
||||||
|
|> filter(fn: (r) => r["user"] == %q)
|
||||||
|
|> filter(fn: (r) => r["room"] == %q)
|
||||||
|
|> filter(fn: (r) => r["device"] == %q)
|
||||||
|
|> sort(columns: ["_time"], desc: true)
|
||||||
|
|> limit(n: 100)
|
||||||
|
`, INFLUXDB_BUCKET, MEASUREMENT_NAME, user, room, device)
|
||||||
|
results, err := queryAPI.Query(context.Background(), query)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
type measurement = map[string]interface{}
|
||||||
|
var values []measurement
|
||||||
|
|
||||||
|
for results.Next() {
|
||||||
|
m := measurement{}
|
||||||
|
record := results.Record()
|
||||||
|
field := record.Field()
|
||||||
|
value := record.Value()
|
||||||
|
m["time"] = record.Time()
|
||||||
|
m["value"] = value.(float64)
|
||||||
|
if field == "temperature" {
|
||||||
|
m["type"] = "temperature"
|
||||||
|
}
|
||||||
|
if field == "humidity" {
|
||||||
|
m["type"] = "humidity"
|
||||||
|
}
|
||||||
|
values = append(values, m)
|
||||||
|
fmt.Println(results.Record())
|
||||||
|
}
|
||||||
|
if err := results.Err(); err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.JSON(http.StatusOK, values)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user