init: inital commit
This commit is contained in:
@@ -0,0 +1,101 @@
|
||||
defmodule Localiser.MQTT.Connection do
|
||||
@moduledoc """
|
||||
GenServer responsible for maintaining a connection to the MQTT broker.
|
||||
"""
|
||||
|
||||
use GenServer
|
||||
require Logger
|
||||
|
||||
@broker_host Application.compile_env(:localiser, :mqtt_host, "localhost")
|
||||
@broker_port Application.compile_env(:localiser, :mqtt_port, 1883)
|
||||
|
||||
@rssi_topic "localiser/sensor/+/rssi"
|
||||
@announce_topic "localiser/sensor/+/announce"
|
||||
|
||||
def start_link(opts) do
|
||||
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
|
||||
end
|
||||
|
||||
def publish(topic, payload) do
|
||||
GenServer.call(__MODULE__, {:publish, topic, payload})
|
||||
end
|
||||
|
||||
# Server Callbacks
|
||||
|
||||
@impl true
|
||||
def init(_opts) do
|
||||
Process.flag(:trap_exit, true)
|
||||
send(self(), :connect)
|
||||
{:ok, %{client_id: nil, connected: false, client_pid: nil}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info(:connect, state) do
|
||||
client_id = "localiser_#{:rand.uniform(1000)}"
|
||||
Logger.info("[MQTT.Connection] Attempting to connect to MQTT broker at #{@broker_host}:#{@broker_port} with client ID #{client_id}")
|
||||
|
||||
case do_connect(client_id) do
|
||||
{:ok, pid} ->
|
||||
Logger.info("[MQTT.Connection] Connected to MQTT broker with client ID #{client_id}")
|
||||
{:noreply, %{state | client_id: client_id, connected: true, client_pid: pid}}
|
||||
|
||||
{:error, reason} ->
|
||||
# TODO: Implement exponential backoff for retries
|
||||
Logger.error("[MQTT.Connection] Failed to connect to MQTT broker: #{inspect(reason)}. Retrying in 5 seconds...")
|
||||
Process.send_after(self(), :connect, 5_000)
|
||||
{:noreply, %{state | connected: false, client_pid: nil}}
|
||||
end
|
||||
end
|
||||
|
||||
# Emqtt process exited (broker dropped connection or startup failure)
|
||||
@impl true
|
||||
def handle_info({:EXIT, pid, reason}, %{client_pid: pid} = state) do
|
||||
Logger.warning("[MQTT.Connection] MQTT client exited (#{inspect(reason)}). Reconnecting in 5 seconds...")
|
||||
Process.send_after(self(), :connect, 5_000)
|
||||
{:noreply, %{state | connected: false, client_pid: nil}}
|
||||
end
|
||||
|
||||
def handle_info({:EXIT, _pid, _reason}, state), do: {:noreply, state}
|
||||
|
||||
def handle_info({:publish, %{:topic => topic, :payload => payload}}, state) do
|
||||
Localiser.MQTT.Router.route(topic, payload)
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:publish, _topic, _payload}, _from, %{connected: false} = state) do
|
||||
{:reply, {:error, :not_connected}, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:publish, topic, payload}, _from, %{client_pid: pid} = state) do
|
||||
result = :emqtt.publish(pid, topic, payload)
|
||||
{:reply, result, state}
|
||||
end
|
||||
|
||||
# Private
|
||||
|
||||
defp do_connect(client_id) do
|
||||
case :emqtt.start_link(host: @broker_host, port: @broker_port, clientid: client_id) do
|
||||
{:ok, pid} -> do_connect_pid(pid)
|
||||
{:error, reason} -> {:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
defp do_connect_pid(pid) do
|
||||
case :emqtt.connect(pid) do
|
||||
{:ok, _props} ->
|
||||
:emqtt.subscribe(pid, {@rssi_topic, 1})
|
||||
:emqtt.subscribe(pid, {@announce_topic, 1})
|
||||
{:ok, pid}
|
||||
|
||||
{:error, reason} ->
|
||||
if Process.alive?(pid), do: :emqtt.stop(pid)
|
||||
{:error, reason}
|
||||
end
|
||||
catch
|
||||
:exit, reason ->
|
||||
if Process.alive?(pid), do: :emqtt.stop(pid)
|
||||
{:error, reason}
|
||||
end
|
||||
end
|
||||
@@ -0,0 +1,83 @@
|
||||
defmodule Localiser.MQTT.Router do
|
||||
@moduledoc """
|
||||
Module responsible for routing incoming MQTT messages to the appropriate handlers.
|
||||
"""
|
||||
|
||||
use GenServer
|
||||
require Logger
|
||||
|
||||
alias Localiser.Domain.Sensors
|
||||
|
||||
def start_link(opts) do
|
||||
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
|
||||
end
|
||||
|
||||
def route(topic, payload) do
|
||||
GenServer.cast(__MODULE__, {:route, topic, payload})
|
||||
end
|
||||
|
||||
# GenServer Callbacks
|
||||
|
||||
@impl true
|
||||
def init(_opts) do
|
||||
{:ok, %{}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:route, topic, payload}, state) do
|
||||
case parse_topic(topic) do
|
||||
{:rssi, sensor_id} ->
|
||||
handle_rssi(sensor_id, payload)
|
||||
|
||||
{:announce, sensor_id} ->
|
||||
handle_announce(sensor_id)
|
||||
|
||||
{:error, :invalid_topic} ->
|
||||
Logger.debug("[MQTT.Router] Received message with invalid topic: #{topic}")
|
||||
end
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
# Private helper functions
|
||||
|
||||
defp parse_topic(topic) do
|
||||
# Example topic: "localiser/sensor/device123/rssi"
|
||||
case String.split(topic, "/") do
|
||||
["localiser", "sensor", sensor_id, "rssi"] -> {:rssi, sensor_id}
|
||||
["localiser", "sensor", sensor_id, "announce"] -> {:announce, sensor_id}
|
||||
_ -> {:error, :invalid_topic}
|
||||
end
|
||||
end
|
||||
|
||||
defp handle_rssi(sensor_id, payload) do
|
||||
with {:ok, %{"tag_id" => tag_id, "rssi" => rssi}} <-
|
||||
Jason.decode(payload) do
|
||||
|
||||
reading = %{
|
||||
sensor_id: sensor_id,
|
||||
tag_id: tag_id,
|
||||
rssi: rssi,
|
||||
timestamp: DateTime.utc_now()
|
||||
}
|
||||
|
||||
Localiser.MQTT.Telemetry.count_reading()
|
||||
Localiser.RSSI.Buffer.push(reading)
|
||||
|
||||
else
|
||||
{:error, reason} ->
|
||||
Logger.error("[MQTT.Router] Bad payload from #{sensor_id}: #{inspect(reason)}")
|
||||
Localiser.MQTT.Telemetry.count_error()
|
||||
end
|
||||
end
|
||||
|
||||
defp handle_announce(sensor_id) do
|
||||
case Sensors.upsert_announced(sensor_id) do
|
||||
{:ok, _sensor} ->
|
||||
Logger.info("[MQTT.Router] Sensor announced: #{sensor_id}")
|
||||
|
||||
{:error, reason} ->
|
||||
Logger.error("[MQTT.Router] Failed to register announced sensor #{sensor_id}: #{inspect(reason)}")
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -0,0 +1,22 @@
|
||||
defmodule Localiser.MQTT.Supervisor do
|
||||
@moduledoc """
|
||||
Supervisor responsible for managing MQTT-related processes.
|
||||
"""
|
||||
|
||||
use Supervisor
|
||||
|
||||
def start_link(opts) do
|
||||
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init(_opts) do
|
||||
children = [
|
||||
Localiser.MQTT.Connection,
|
||||
Localiser.MQTT.Router,
|
||||
Localiser.MQTT.Telemetry
|
||||
]
|
||||
|
||||
Supervisor.init(children, strategy: :rest_for_one)
|
||||
end
|
||||
end
|
||||
@@ -0,0 +1,59 @@
|
||||
defmodule Localiser.MQTT.Telemetry do
|
||||
@moduledoc """
|
||||
GenServer responsible for tracking telemetry data related to MQTT messages.
|
||||
"""
|
||||
|
||||
use GenServer
|
||||
require Logger
|
||||
|
||||
def start_link(opts) do
|
||||
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
|
||||
end
|
||||
|
||||
def count_reading() do
|
||||
GenServer.cast(__MODULE__, :count_reading)
|
||||
end
|
||||
|
||||
def count_error() do
|
||||
GenServer.cast(__MODULE__, :count_error)
|
||||
end
|
||||
|
||||
@log_interval 60_000
|
||||
|
||||
# GenServer Callbacks
|
||||
|
||||
@impl true
|
||||
def init(_opts) do
|
||||
schedule_log()
|
||||
{:ok, %{total_readings: 0, total_errors: 0}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast(:count_reading, state) do
|
||||
new_count = state.total_readings + 1
|
||||
{:noreply, %{state | total_readings: new_count}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast(:count_error, state) do
|
||||
new_count = state.total_errors + 1
|
||||
{:noreply, %{state | total_errors: new_count}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info(:log_stats, state) do
|
||||
rate = if state.total_readings > 0 do
|
||||
Float.round(state.total_errors / state.total_readings * 100, 2)
|
||||
else
|
||||
0.0
|
||||
end
|
||||
Logger.info("[MQTT.Telemetry] #{state.total_readings} readings, #{state.total_errors} errors (#{rate} err rate) in last #{@log_interval / 1000}s.")
|
||||
schedule_log()
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
# Private helper functions
|
||||
defp schedule_log() do
|
||||
Process.send_after(self(), :log_stats, @log_interval)
|
||||
end
|
||||
end
|
||||
Reference in New Issue
Block a user