Files
localiserd/lib/localiser/mqtt/connection.ex
T

102 lines
3.1 KiB
Elixir

defmodule Localiser.MQTT.Connection do
  @moduledoc """
  GenServer responsible for maintaining a connection to the MQTT broker.

  On start the server connects to the broker configured for the `:localiserd`
  application (`:mqtt_host` / `:mqtt_port`, read at compile time), subscribes
  to the sensor RSSI and announce topics, and hands every received message to
  `Localiser.MQTT.Router.route/2`.

  A failed connection attempt, a failed subscription, or an exit of the
  linked MQTT client each schedule a new attempt after a fixed delay.
  """

  use GenServer

  require Logger

  @broker_host Application.compile_env(:localiserd, :mqtt_host, "localhost")
  @broker_port Application.compile_env(:localiserd, :mqtt_port, 1883)
  @rssi_topic "localiser/sensor/+/rssi"
  @announce_topic "localiser/sensor/+/announce"

  # Delay between connection attempts, in milliseconds.
  # TODO: Implement exponential backoff for retries
  @reconnect_delay_ms 5_000

  # Client API

  @doc """
  Starts the connection server, registered under the module name.

  `opts` is accepted for supervisor compatibility and is currently unused.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Publishes `payload` on `topic` through the current broker connection.

  Returns `{:error, :not_connected}` while no connection is established;
  otherwise returns the result of `:emqtt.publish/3`.
  """
  def publish(topic, payload) do
    GenServer.call(__MODULE__, {:publish, topic, payload})
  end

  # Server Callbacks

  @impl true
  def init(_opts) do
    # The MQTT client is linked to this process; trapping exits turns its
    # death into an {:EXIT, pid, reason} message instead of killing the server.
    Process.flag(:trap_exit, true)
    # Connect asynchronously so that init/1 returns immediately.
    send(self(), :connect)
    {:ok, %{client_id: nil, connected: false, client_pid: nil}}
  end

  @impl true
  def handle_info(:connect, state) do
    client_id = "localiser_#{:rand.uniform(1000)}"

    Logger.info(
      "[MQTT.Connection] Attempting to connect to MQTT broker at #{@broker_host}:#{@broker_port} with client ID #{client_id}"
    )

    case do_connect(client_id) do
      {:ok, pid} ->
        Logger.info("[MQTT.Connection] Connected to MQTT broker with client ID #{client_id}")
        {:noreply, %{state | client_id: client_id, connected: true, client_pid: pid}}

      {:error, reason} ->
        Logger.error(
          "[MQTT.Connection] Failed to connect to MQTT broker: #{inspect(reason)}. Retrying in 5 seconds..."
        )

        schedule_reconnect()
        {:noreply, %{state | connected: false, client_pid: nil}}
    end
  end

  # The current MQTT client exited (broker dropped the connection or the
  # client failed): mark the connection as down and retry later.
  def handle_info({:EXIT, pid, reason}, %{client_pid: pid} = state) do
    Logger.warning(
      "[MQTT.Connection] MQTT client exited (#{inspect(reason)}). Reconnecting in 5 seconds..."
    )

    schedule_reconnect()
    {:noreply, %{state | connected: false, client_pid: nil}}
  end

  # Exit of any other linked process, e.g. a client that was already given up
  # on after a failed connection attempt; a retry is already scheduled then.
  def handle_info({:EXIT, _pid, _reason}, state), do: {:noreply, state}

  # Message delivered by the MQTT client for a subscribed topic.
  def handle_info({:publish, %{:topic => topic, :payload => payload}}, state) do
    Localiser.MQTT.Router.route(topic, payload)
    {:noreply, state}
  end

  # Catch-all: without it any other message raises FunctionClauseError and
  # takes the connection down. The MQTT client presumably also sends
  # notifications such as {:disconnected, reason_code, properties} to its
  # owner - confirm against the emqtt version in use.
  def handle_info(message, state) do
    Logger.debug("[MQTT.Connection] Ignoring unexpected message: #{inspect(message)}")
    {:noreply, state}
  end

  @impl true
  def handle_call({:publish, _topic, _payload}, _from, %{connected: false} = state) do
    {:reply, {:error, :not_connected}, state}
  end

  def handle_call({:publish, topic, payload}, _from, %{client_pid: pid} = state) do
    result = :emqtt.publish(pid, topic, payload)
    {:reply, result, state}
  end

  # Private

  # Arms the timer that triggers the next connection attempt.
  defp schedule_reconnect do
    Process.send_after(self(), :connect, @reconnect_delay_ms)
  end

  # Starts a linked MQTT client process and connects it to the broker.
  defp do_connect(client_id) do
    client_opts = [host: emqtt_host(@broker_host), port: @broker_port, clientid: client_id]

    case :emqtt.start_link(client_opts) do
      {:ok, pid} -> do_connect_pid(pid)
      {:error, reason} -> {:error, reason}
    end
  end

  # Connects the client and subscribes to both sensor topics. Any failure,
  # including an exit raised by one of the emqtt calls, stops the client and
  # is reported as {:error, reason}.
  defp do_connect_pid(pid) do
    with {:ok, _props} <- :emqtt.connect(pid),
         :ok <- subscribe(pid, @rssi_topic),
         :ok <- subscribe(pid, @announce_topic) do
      {:ok, pid}
    else
      {:error, reason} ->
        stop_client(pid)
        {:error, reason}
    end
  catch
    :exit, reason ->
      stop_client(pid)
      {:error, reason}
  end

  # Subscribes to `topic` with QoS 1. An {:error, reason} reply is surfaced so
  # the server does not report itself connected while receiving nothing; any
  # other reply is treated as success.
  defp subscribe(pid, topic) do
    case :emqtt.subscribe(pid, {topic, 1}) do
      {:error, reason} -> {:error, {:subscribe_failed, topic, reason}}
      _granted -> :ok
    end
  end

  # Best-effort stop: the client may exit between the liveness check and the
  # stop call, and that exit must not escape into the caller.
  defp stop_client(pid) do
    if Process.alive?(pid), do: :emqtt.stop(pid)
    :ok
  catch
    :exit, _reason -> :ok
  end

  # emqtt's `host` option is specified as an Erlang hostname or IP address, so
  # a binary host from the configuration is converted to a charlist. Other
  # values are passed through unchanged.
  defp emqtt_host(host) when is_binary(host), do: String.to_charlist(host)
  defp emqtt_host(host), do: host
end