73 lines
2.1 KiB
Elixir
73 lines
2.1 KiB
Elixir
defmodule Localiser.RSSI.Buffer do
|
|
use GenServer
|
|
|
|
alias Localiser.Localisation.Tag.Filter, as: TagFilter
|
|
alias Localiser.Localisation.Sensor.Server, as: SensorServer
|
|
|
|
@flush_interval_ms 500
|
|
|
|
# reading :: %{sensor_id: String.t(), tag_id: String.t(), rssi: integer(), tx_power: integer() | nil}
|
|
def push(reading) do
|
|
GenServer.cast(__MODULE__, {:push, reading})
|
|
end
|
|
|
|
def start_link(_args) do
|
|
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
|
|
end
|
|
|
|
@impl true
|
|
def init(state) do
|
|
schedule_flush()
|
|
{:ok, state}
|
|
end
|
|
|
|
@impl true
|
|
def handle_cast({:push, %{tag_id: tag_id} = reading}, state) do
|
|
{:noreply, Map.update(state, tag_id, [reading], &[reading | &1])}
|
|
end
|
|
|
|
@impl true
|
|
def handle_info(:flush, state) do
|
|
flush_batches(state)
|
|
schedule_flush()
|
|
{:noreply, %{}}
|
|
end
|
|
|
|
defp flush_batches(batches) do
|
|
Enum.each(batches, fn {tag_id, readings} ->
|
|
# Calibration routing runs regardless of whether the tag has a filter registered,
|
|
# so scan-mode readings from unknown tags still reach the sensor server.
|
|
normal = Enum.flat_map(readings, &route_reading(tag_id, &1))
|
|
|
|
if normal != [] do
|
|
case Registry.lookup(Localiser.Registry, {:filter, tag_id}) do
|
|
[{_pid, _}] -> TagFilter.ingest(tag_id, normal)
|
|
[] -> :ok
|
|
end
|
|
end
|
|
end)
|
|
end
|
|
|
|
# Routes one reading. If the sensor is in any calibration state, forwards to Sensor.Server
|
|
# (with tag_id so scan-mode can advertise it) and returns [] to exclude from Tag.Filter.
|
|
# Otherwise returns a normal measurement map.
|
|
defp route_reading(tag_id, %{sensor_id: sensor_id, rssi: rssi, tx_power: tx_power}) do
|
|
case Registry.lookup(Localiser.Registry, {:sensor, sensor_id}) do
|
|
[{_pid, _}] ->
|
|
if SensorServer.in_calibration_mode?(sensor_id) do
|
|
SensorServer.calibration_reading(sensor_id, tag_id, rssi)
|
|
[]
|
|
else
|
|
[SensorServer.measure(sensor_id, rssi, tx_power)]
|
|
end
|
|
|
|
[] ->
|
|
[]
|
|
end
|
|
end
|
|
|
|
defp schedule_flush do
|
|
Process.send_after(self(), :flush, @flush_interval_ms)
|
|
end
|
|
end
|