248 lines
7.1 KiB
Go
248 lines
7.1 KiB
Go
package main
|
|
|
|
import (
|
|
_ "crypto/md5"
|
|
_ "gateway-softweng/docs"
|
|
paho "github.com/eclipse/paho.mqtt.golang"
|
|
"github.com/gin-contrib/cors"
|
|
"github.com/gin-gonic/gin"
|
|
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
|
"github.com/influxdata/influxdb-client-go/v2/api"
|
|
"github.com/labstack/gommon/log"
|
|
swaggerFiles "github.com/swaggo/files"
|
|
ginSwagger "github.com/swaggo/gin-swagger"
|
|
"net/http"
|
|
"os"
|
|
)
|
|
|
|
type Gateway struct {
|
|
username string
|
|
password string
|
|
mqtt paho.Client
|
|
rest *gin.Engine
|
|
restGroup *gin.RouterGroup
|
|
accounts gin.Accounts
|
|
influx influxdb2.Client
|
|
influxApi api.WriteAPIBlocking
|
|
}
|
|
|
|
// @Summary Ping test endpoint
|
|
// @Description get ping response
|
|
// @Tags ping
|
|
// @Accept json
|
|
// @Produce json
|
|
// @Success 200 {string} string "pong"
|
|
// @Router /ping [get]
|
|
func pingHandler(c *gin.Context) {
|
|
c.String(http.StatusOK, "pong")
|
|
}
|
|
|
|
// getUsers initializes user account.
|
|
//
|
|
// It performs the following actions:
|
|
// - Creates user accounts with usernames and password provided by environnement variable
|
|
// - Stores the credentials for basic authentication.
|
|
func (gh *Gateway) getUser() {
|
|
// Create username
|
|
ok := false
|
|
gh.username, ok = os.LookupEnv("REST_USERNAME")
|
|
if !ok {
|
|
log.Fatal("REST_USERNAME not set")
|
|
}
|
|
|
|
gh.password, ok = os.LookupEnv("REST_PASSWORD")
|
|
if !ok {
|
|
log.Fatal("REST_PASSWORD not set")
|
|
}
|
|
|
|
gh.accounts = gin.Accounts{}
|
|
gh.accounts[gh.username] = gh.password
|
|
|
|
}
|
|
|
|
// createRestGateway sets up the REST API using the Gin framework.
|
|
// It defines the following routes:
|
|
// - A public ping test endpoint for health checks.
|
|
// - A Swagger documentation endpoint for API documentation.
|
|
// - A secured group of endpoints for commands and data requests, protected by basic authentication.
|
|
//
|
|
// Behavior:
|
|
// - The `/ping` endpoint responds with "pong" to test server availability.
|
|
// - The `/swagger/*any` endpoint serves the Swagger UI for API documentation.
|
|
// - The `/raclette` group includes:
|
|
// - A POST route for publishing commands to MQTT.
|
|
// - A GET route for requesting data from InfluxDB.
|
|
//
|
|
// Security:
|
|
// - The `/raclette` group requires basic authentication using credentials generated for each user.
|
|
func (gh *Gateway) createRestGateway() {
|
|
// Create a new Gin router
|
|
gh.rest = gin.Default()
|
|
|
|
// Configure CORS middleware to allow all origins
|
|
config := cors.DefaultConfig()
|
|
config.AllowAllOrigins = true
|
|
config.AllowMethods = []string{"GET", "POST"}
|
|
config.AllowHeaders = []string{"Origin", "Content-Length", "Content-Type", "Authorization"}
|
|
gh.rest.Use(cors.New(config))
|
|
|
|
gh.rest.GET("/ping", pingHandler)
|
|
|
|
// Swagger documentation route
|
|
gh.rest.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))
|
|
|
|
// Create a new router group with basic authentication for /raclette
|
|
gh.restGroup = gh.rest.Group("/raclette", gin.BasicAuth(gh.accounts))
|
|
|
|
// Define the route for the publish command
|
|
gh.restGroup.POST("", func(c *gin.Context) {
|
|
ret := gh.publishCommand(c)
|
|
if ret == nil {
|
|
c.JSON(http.StatusOK, gin.H{"status": "ok"})
|
|
} else {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": ret.Error()})
|
|
}
|
|
})
|
|
|
|
// Define the route for the publish command (GET)
|
|
gh.restGroup.GET("", func(c *gin.Context) {
|
|
ret := gh.requestInflux(c)
|
|
if ret != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": ret.Error()})
|
|
}
|
|
})
|
|
}
|
|
|
|
// createMQTTGateway initializes the MQTT client and sets up subscriptions.
|
|
//
|
|
// It subscribes to the following topics:
|
|
// - "+/+/+/update": For device state updates.
|
|
//
|
|
// The OnConnect callback ensures that subscriptions are re-established after reconnecting.
|
|
//
|
|
// Returns:
|
|
// - error: An error if the MQTT client fails to connect or subscribe to topics.
|
|
func (gh *Gateway) createMQTTGateway() error {
|
|
|
|
// Get env variables and set default values if not set
|
|
MQTT_URL, ok := os.LookupEnv("MQTT_URL")
|
|
if !ok {
|
|
log.Error("MQTT_URL not set, using default value: mqtt://mqtt.mse.kb28.ch:1883")
|
|
MQTT_URL = "mqtt://mqtt.mse.kb28.ch:1883"
|
|
}
|
|
|
|
CLIENT_ID, ok := os.LookupEnv("CLIENT_ID")
|
|
if !ok {
|
|
log.Error("CLIENT_ID not set, using default value: Gateway-SoftwEng")
|
|
CLIENT_ID = "Gateway-SoftwEng"
|
|
}
|
|
|
|
// Get env variables for MQTT credentials, panic if not set
|
|
MQTT_USERNAME, ok := os.LookupEnv("MQTT_USERNAME")
|
|
if !ok {
|
|
log.Fatal("MQTT_USERNAME not set")
|
|
}
|
|
MQTT_PASSWORD, ok := os.LookupEnv("MQTT_PASSWORD")
|
|
if !ok {
|
|
log.Fatal("MQTT_PASSWORD not set")
|
|
}
|
|
|
|
// Create MQTT client
|
|
option := paho.NewClientOptions()
|
|
option.AddBroker(MQTT_URL)
|
|
option.SetClientID(CLIENT_ID)
|
|
option.SetUsername(MQTT_USERNAME)
|
|
option.SetPassword(MQTT_PASSWORD)
|
|
option.SetAutoReconnect(true)
|
|
option.OnConnect = func(client paho.Client) {
|
|
|
|
// Subscribe to update
|
|
token := gh.mqtt.Subscribe("+/+/+/update", 0, gh.processUpdate)
|
|
if token.Wait() && token.Error() != nil {
|
|
log.Error(token.Error())
|
|
client.Disconnect(1000) // Disconnect from the broker after 1000ms
|
|
}
|
|
|
|
}
|
|
gh.mqtt = paho.NewClient(option)
|
|
token := gh.mqtt.Connect()
|
|
if token.Wait() && token.Error() != nil {
|
|
return token.Error()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// createInfluxGateway initializes the InfluxDB client.
|
|
func (gh *Gateway) createInfluxGateway() {
|
|
|
|
// Get env variables and set default values if not set
|
|
INFLUXDB_ORG, ok := os.LookupEnv("INFLUXDB_ORG")
|
|
if !ok {
|
|
log.Error("INFLUXDB_ORG not set, using default value: raclette")
|
|
INFLUXDB_ORG = "raclette"
|
|
}
|
|
|
|
INFLUXDB_BUCKET, ok := os.LookupEnv("INFLUXDB_BUCKET")
|
|
if !ok {
|
|
log.Error("INFLUXDB_BUCKET not set, using default value: raclette")
|
|
INFLUXDB_BUCKET = "raclette"
|
|
}
|
|
|
|
// Get env variables and set default values if not set
|
|
INFLUXDB_URL, ok := os.LookupEnv("INFLUXDB_URL")
|
|
if !ok {
|
|
log.Error("INFLUXDB_URL not set, using default value: http://influx.mse.kb28.ch")
|
|
INFLUXDB_URL = "http://influx.mse.kb28.ch"
|
|
}
|
|
|
|
// Get env variables for InfluxDB credentials, panic if not set
|
|
INFLUXDB_TOKEN, ok := os.LookupEnv("INFLUXDB_TOKEN")
|
|
if !ok {
|
|
log.Fatal("INFLUXDB_TOKEN not set")
|
|
}
|
|
|
|
// Initialize the InfluxDB client
|
|
gh.influx = influxdb2.NewClient(INFLUXDB_URL, INFLUXDB_TOKEN)
|
|
gh.influxApi = gh.influx.WriteAPIBlocking(INFLUXDB_ORG, INFLUXDB_BUCKET)
|
|
gh.influxApi.EnableBatching()
|
|
}
|
|
|
|
// NewGateway initializes the Gateway instance by setting up the necessary components.
|
|
//
|
|
// It performs the following actions:
|
|
// - Initializes the InfluxDB client using the provided token.
|
|
// - Creates user account with unique credentials and InfluxDB write APIs.
|
|
// - Sets up the REST API server with routes for health checks, Swagger documentation, and secured endpoints.
|
|
// - Connects to the MQTT broker and subscribes to relevant topics.
|
|
//
|
|
// Returns:
|
|
// - *Gateway: A pointer to the initialized Gateway instance.
|
|
// - error: An error if any of the initialization steps fail.
|
|
func NewGateway() (*Gateway, error) {
|
|
var gh = Gateway{}
|
|
var err error
|
|
|
|
// Initialize the InfluxDB client
|
|
gh.createInfluxGateway()
|
|
|
|
// Generate user accounts and credentials
|
|
gh.getUser()
|
|
|
|
// Set up the REST API server
|
|
gh.createRestGateway()
|
|
|
|
// Initialize the MQTT client and subscriptions
|
|
err = gh.createMQTTGateway()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Start the HTTP server
|
|
err = gh.rest.Run(":8080")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &gh, nil
|
|
}
|