93 lines
2.4 KiB
Elixir
93 lines
2.4 KiB
Elixir
defmodule Localiser.MQTT.Router do
  @moduledoc """
  Routes incoming MQTT messages to the appropriate handlers.

  Messages are submitted with `route/2` and processed asynchronously by a
  single GenServer registered under this module's name. Two topic shapes are
  recognised:

    * `localiser/sensor/<sensor_id>/rssi` - an RSSI reading, decoded from JSON
      and pushed to `Localiser.RSSI.Buffer`
    * `localiser/sensor/<sensor_id>/announce` - a sensor announcement, recorded
      through `Localiser.Domain.Sensors.upsert_announced/2`

  Messages on any other topic are logged at debug level and dropped.
  """

  use GenServer
  require Logger

  alias Localiser.Domain.Sensors

  @doc """
  Starts the router and registers it under the module name.

  `opts` is handed to `init/1`, which currently ignores it.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Queues an MQTT message for routing.

  This is a cast: it returns `:ok` immediately and the message is handled
  later inside the router process. `topic` is split on `"/"`, so it must be a
  binary.
  """
  @spec route(String.t(), term()) :: :ok
  def route(topic, payload) do
    GenServer.cast(__MODULE__, {:route, topic, payload})
  end

  # GenServer Callbacks

  @impl true
  def init(_opts) do
    # The router keeps no state of its own; an empty map is a placeholder.
    {:ok, %{}}
  end

  @impl true
  def handle_cast({:route, topic, payload}, state) do
    case parse_topic(topic) do
      {:rssi, sensor_id} ->
        handle_rssi(sensor_id, payload)

      {:announce, sensor_id} ->
        handle_announce(sensor_id, payload)

      {:error, :invalid_topic} ->
        Logger.debug("[MQTT.Router] Received message with invalid topic: #{topic}")
    end

    # Handler return values are deliberately ignored; state never changes.
    {:noreply, state}
  end

  # Private helper functions

  # Classifies a topic string.
  #
  # Returns `{:rssi, sensor_id}` or `{:announce, sensor_id}` for the two
  # supported four-segment topics, and `{:error, :invalid_topic}` otherwise.
  #
  # Example topic: "localiser/sensor/device123/rssi"
  defp parse_topic(topic) do
    case String.split(topic, "/") do
      ["localiser", "sensor", sensor_id, "rssi"] -> {:rssi, sensor_id}
      ["localiser", "sensor", sensor_id, "announce"] -> {:announce, sensor_id}
      _ -> {:error, :invalid_topic}
    end
  end

  # Decodes an RSSI payload and forwards it to the RSSI buffer.
  #
  # The payload must be a JSON object with "id", "rssi" and "type" keys;
  # "tx_power" is optional and defaults to nil. The reading is stamped with
  # the time it was processed here, not a time taken from the payload.
  #
  # Both undecodable JSON and decodable JSON of the wrong shape are logged and
  # counted as errors, so a single malformed message cannot crash the router.
  defp handle_rssi(sensor_id, payload) do
    # NOTE(review): hard-coded per-device diagnostic logged at error level;
    # looks like a debugging leftover - confirm whether it is still needed.
    if sensor_id == "anchor_73f460" do
      Logger.error("Sensor send payload: #{payload}")
    end

    case Jason.decode(payload) do
      {:ok, %{"id" => id, "rssi" => rssi, "type" => type} = decoded} ->
        reading = %{
          type: type,
          sensor_id: sensor_id,
          tag_id: "#{type}:#{id}",
          rssi: rssi,
          tx_power: Map.get(decoded, "tx_power"),
          timestamp: DateTime.utc_now()
        }

        Localiser.MQTT.Telemetry.count_reading()
        Localiser.RSSI.Buffer.push(reading)

      {:ok, other} ->
        # Valid JSON but not an object carrying the required keys. Without this
        # clause the `case` raises CaseClauseError and takes the GenServer down.
        Logger.error(
          "[MQTT.Router] Bad payload from #{sensor_id}: " <>
            "missing id/rssi/type in #{inspect(other)}"
        )

        Localiser.MQTT.Telemetry.count_error()

      {:error, reason} ->
        Logger.error("[MQTT.Router] Bad payload from #{sensor_id}: #{inspect(reason)}")
        Localiser.MQTT.Telemetry.count_error()
    end
  end

  # Records an announced sensor, extracting an optional version string.
  #
  # The version is taken from the payload only when it decodes to a JSON
  # object whose "version" is a string; any other payload (including invalid
  # JSON) yields a nil version and the sensor is still upserted.
  defp handle_announce(sensor_id, payload) do
    version =
      case Jason.decode(payload) do
        {:ok, %{"version" => v}} when is_binary(v) -> v
        _ -> nil
      end

    case Sensors.upsert_announced(sensor_id, version) do
      {:ok, _sensor} ->
        Logger.info("[MQTT.Router] Sensor announced: #{sensor_id}")

      {:error, reason} ->
        Logger.error(
          "[MQTT.Router] Failed to register announced sensor #{sensor_id}: #{inspect(reason)}"
        )
    end
  end
end
|