defmodule Localiser.Localisation.Sensor.Server do use GenServer require Logger alias Localiser.Domain.Sensors alias Localiser.Domain.Schema.{Sensor, SensorCalibration} alias Localiser.Localisation.Calibration alias Localiser.MQTT.Connection, as: MQTTConnection @default_rssi_ref -59 @default_path_loss_exp 2.0 @default_samples 30 # mode: # :ok # {:calibration_mode, completed_stages} # {:calibrating_stage, distance, samples, completed_stages} # # completed_stages :: [%{distance: float, mean_rssi: float, readings: [{rssi, is_outlier}]}] defstruct [:sensor_id, :sensor_db_id, :floor_x, :floor_y, :rssi_ref, :path_loss_exp, mode: :ok] def start_link({sensor, room}) do GenServer.start_link(__MODULE__, {sensor, room}, name: via(sensor.sensor_id)) end def via(sensor_id) do {:via, Registry, {Localiser.Registry, {:sensor, sensor_id}}} end def measure(sensor_id, rssi, tx_power \\ nil) do GenServer.call(via(sensor_id), {:measure, rssi, tx_power}) end # Returns true only when a stage is actively collecting (used by RSSI.Buffer). def calibrating?(sensor_id) do GenServer.call(via(sensor_id), :calibrating?) end # Returns a JSON-serialisable snapshot of the current calibration state for channel join. def calibration_state(sensor_id) do GenServer.call(via(sensor_id), :calibration_state) end # Feeds a raw RSSI value into the active stage buffer. Ignored between stages. def calibration_reading(sensor_id, rssi) do GenServer.cast(via(sensor_id), {:calibration_reading, rssi}) end # Puts the sensor into calibration mode (between-stages). Returns :ok. def begin_calibration_mode(sensor_id) do GenServer.call(via(sensor_id), :begin_calibration_mode) end # Starts collecting samples for a given distance. Returns {:ok, samples_needed} or {:error, reason}. def start_stage(sensor_id, distance) do GenServer.call(via(sensor_id), {:start_stage, distance}) end # Runs OLS regression over completed stages and saves the result. Requires >= 2 stages. # Returns {:ok, %{rssi_ref: integer, path_loss_exp: float}} or {:error, reason}. def finish_calibration(sensor_id) do GenServer.call(via(sensor_id), :finish_calibration) end # Aborts calibration from any state, discarding all stages. def abort_calibration(sensor_id) do GenServer.cast(via(sensor_id), :abort_calibration) end @impl true def init({sensor, room}) do Phoenix.PubSub.subscribe(Localiser.PubSub, "sensors") calibration = Sensors.latest_calibration(sensor) {rssi_ref, path_loss_exp} = calibration_params(calibration) state = %__MODULE__{ sensor_id: sensor.sensor_id, sensor_db_id: sensor.id, floor_x: (room.x || 0.0) + (sensor.x || 0.0), floor_y: (room.y || 0.0) + (sensor.y || 0.0), rssi_ref: rssi_ref, path_loss_exp: path_loss_exp } {:ok, state} end @impl true def handle_call({:measure, rssi, tx_power}, _from, state) do distance = rssi_to_distance(rssi, state.rssi_ref, state.path_loss_exp) measurement = %{ sensor_id: state.sensor_id, floor_x: state.floor_x, floor_y: state.floor_y, distance: distance, rssi: rssi, tx_power: tx_power } {:reply, measurement, state} end @impl true def handle_call(:calibrating?, _from, state) do {:reply, match?({:calibrating_stage, _, _, _}, state.mode), state} end @impl true def handle_call(:calibration_state, _from, state) do snapshot = case state.mode do :ok -> %{status: "idle"} {:calibration_mode, completed} -> %{ status: "calibration_mode", samples_needed: samples_needed(), completed_stages: Enum.map(completed, &render_stage/1) } {:calibrating_stage, distance, samples, completed} -> %{ status: "stage_active", distance: distance, samples_needed: samples_needed(), stage_progress: {length(samples), samples_needed()}, completed_stages: Enum.map(completed, &render_stage/1) } end {:reply, snapshot, state} end @impl true def handle_call(:begin_calibration_mode, _from, state) do MQTTConnection.publish("localiser/sensor/#{state.sensor_id}/cmd", ~s({"action":"calibrate_start"})) broadcast_calibration(state.sensor_id, {:calibration_mode_entered, state.sensor_id, samples_needed()}) {:reply, :ok, %{state | mode: {:calibration_mode, []}}} end @impl true def handle_call({:start_stage, _distance}, _from, %{mode: {:calibrating_stage, _, _, _}} = state) do {:reply, {:error, :already_active}, state} end def handle_call({:start_stage, _distance}, _from, %{mode: :ok} = state) do {:reply, {:error, :not_in_calibration_mode}, state} end def handle_call({:start_stage, distance}, _from, %{mode: {:calibration_mode, completed}} = state) do n = samples_needed() broadcast_calibration(state.sensor_id, {:stage_started, state.sensor_id, distance, length(completed)}) {:reply, {:ok, n}, %{state | mode: {:calibrating_stage, distance, [], completed}}} end @impl true def handle_call(:finish_calibration, _from, %{mode: {:calibration_mode, completed}} = state) when length(completed) >= 2 do case Calibration.least_squares(completed) do {:ok, {rssi_ref, path_loss_exp}} -> sensor_struct = %Sensor{id: state.sensor_db_id, sensor_id: state.sensor_id} case Sensors.add_calibration(sensor_struct, %{ rssi_ref: rssi_ref, path_loss_exp: path_loss_exp, calibrated_at: DateTime.utc_now() }) do {:ok, _} -> Logger.info("[Sensor.Server] Calibration finished for #{state.sensor_id}: rssi_ref=#{rssi_ref} n=#{path_loss_exp}") MQTTConnection.publish("localiser/sensor/#{state.sensor_id}/cmd", ~s({"action":"calibrate_stop"})) broadcast_calibration(state.sensor_id, {:calibration_finished, state.sensor_id, rssi_ref, path_loss_exp}) Phoenix.PubSub.broadcast(Localiser.PubSub, "sensors", {:calibration_complete, state.sensor_id, rssi_ref, path_loss_exp}) result = %{rssi_ref: rssi_ref, path_loss_exp: path_loss_exp} {:reply, {:ok, result}, %{state | rssi_ref: rssi_ref, path_loss_exp: path_loss_exp, mode: :ok}} {:error, reason} -> Logger.error("[Sensor.Server] Failed to save calibration for #{state.sensor_id}: #{inspect(reason)}") {:reply, {:error, :save_failed}, state} end {:error, reason} -> {:reply, {:error, reason}, state} end end def handle_call(:finish_calibration, _from, %{mode: {:calibration_mode, _}} = state) do {:reply, {:error, :insufficient_stages}, state} end def handle_call(:finish_calibration, _from, %{mode: {:calibrating_stage, _, _, _}} = state) do {:reply, {:error, :stage_active}, state} end def handle_call(:finish_calibration, _from, state) do {:reply, {:error, :not_in_calibration_mode}, state} end @impl true def handle_cast(:abort_calibration, state) do case state.mode do :ok -> {:noreply, state} _ -> MQTTConnection.publish("localiser/sensor/#{state.sensor_id}/cmd", ~s({"action":"calibrate_stop"})) broadcast_calibration(state.sensor_id, {:calibration_cancelled, state.sensor_id}) {:noreply, %{state | mode: :ok}} end end @impl true def handle_cast({:calibration_reading, rssi}, %{mode: {:calibrating_stage, distance, samples, completed}} = state) do n = samples_needed() is_outlier = Calibration.outlier?(rssi, samples) new_samples = [rssi | samples] progress = {length(new_samples), n} broadcast_calibration(state.sensor_id, {:calibration_reading, state.sensor_id, rssi, is_outlier, %{stage: progress}}) if length(new_samples) >= n do classified = Calibration.classify_outliers(new_samples) clean = for {r, false} <- classified, do: r mean_rssi = if clean == [] do mean(new_samples) else mean(clean) end stage = %{distance: distance, mean_rssi: mean_rssi, readings: classified} new_completed = [stage | Enum.reject(completed, &(&1.distance == distance))] broadcast_calibration(state.sensor_id, {:stage_complete, state.sensor_id, distance, classified, mean_rssi}) {:noreply, %{state | mode: {:calibration_mode, new_completed}}} else {:noreply, %{state | mode: {:calibrating_stage, distance, new_samples, completed}}} end end def handle_cast({:calibration_reading, _rssi}, state), do: {:noreply, state} @impl true def handle_info({:sensor_enrolled, %Sensor{sensor_id: sid} = sensor, room}, %{sensor_id: sid} = state) do floor_x = (room.x || 0.0) + (sensor.x || 0.0) floor_y = (room.y || 0.0) + (sensor.y || 0.0) {:noreply, %{state | floor_x: floor_x, floor_y: floor_y}} end def handle_info(_msg, state), do: {:noreply, state} # Private defp broadcast_calibration(sensor_id, message) do Phoenix.PubSub.broadcast(Localiser.PubSub, "calibration:#{sensor_id}", message) end defp render_stage(%{distance: d, mean_rssi: r, readings: readings}) do %{distance: d, mean_rssi: r, readings: Enum.map(readings, fn {rssi, outlier} -> %{rssi: rssi, outlier: outlier} end)} end defp samples_needed do Application.get_env(:localiser, :calibration_samples, @default_samples) end defp mean(list) do Enum.sum(list) / length(list) end defp rssi_to_distance(rssi, rssi_ref, path_loss_exp) do :math.pow(10.0, (rssi_ref - rssi) / (10.0 * path_loss_exp)) end defp calibration_params(nil), do: {@default_rssi_ref, @default_path_loss_exp} defp calibration_params(%SensorCalibration{rssi_ref: rssi_ref, path_loss_exp: path_loss_exp}) do {rssi_ref, path_loss_exp} end end