From 34ddbe669e3c56dc70578d9cb5516efe37f2b1ba Mon Sep 17 00:00:00 2001 From: dvdrw Date: Thu, 16 Apr 2026 15:46:00 +0200 Subject: [PATCH] init: inital commit --- .formatter.exs | 4 + .gitignore | 26 +++ README.md | 21 +++ config/config.exs | 7 + lib/localiser/application.ex | 19 ++ lib/localiser/domain/floors.ex | 34 ++++ lib/localiser/domain/rooms.ex | 34 ++++ lib/localiser/domain/schema/floor.ex | 21 +++ lib/localiser/domain/schema/room.ex | 28 +++ lib/localiser/domain/schema/sensor.ex | 27 +++ .../domain/schema/sensor_calibration.ex | 26 +++ lib/localiser/domain/schema/tag.ex | 21 +++ lib/localiser/domain/schema/user.ex | 28 +++ lib/localiser/domain/sensors.ex | 118 ++++++++++++ lib/localiser/domain/tags.ex | 30 ++++ lib/localiser/domain/users.ex | 32 ++++ .../localisation/filter/behaviour.ex | 32 ++++ .../filter/discrete_recursive_bayes.ex | 21 +++ lib/localiser/localisation/filter/kalman.ex | 21 +++ lib/localiser/localisation/filter/particle.ex | 21 +++ .../localisation/filter/supervisor.ex | 32 ++++ lib/localiser/localisation/floor/server.ex | 24 +++ .../localisation/floor_supervisor.ex | 32 ++++ lib/localiser/localisation/room/server.ex | 75 ++++++++ lib/localiser/localisation/room/supervisor.ex | 41 +++++ lib/localiser/localisation/sensor/manager.ex | 54 ++++++ lib/localiser/localisation/sensor/server.ex | 170 ++++++++++++++++++ .../localisation/sensor/supervisor.ex | 44 +++++ lib/localiser/localisation/tag/filter.ex | 93 ++++++++++ lib/localiser/mqtt/connection.ex | 101 +++++++++++ lib/localiser/mqtt/router.ex | 83 +++++++++ lib/localiser/mqtt/supervisor.ex | 22 +++ lib/localiser/mqtt/telemetry.ex | 59 ++++++ lib/localiser/repo.ex | 5 + lib/localiser/rssi_buffer.ex | 72 ++++++++ lib/localiserd.ex | 18 ++ mix.exs | 32 ++++ mix.lock | 19 ++ test/localiserd_test.exs | 8 + test/test_helper.exs | 1 + 40 files changed, 1556 insertions(+) create mode 100644 .formatter.exs create mode 100644 .gitignore create mode 100644 README.md create mode 100644 config/config.exs create mode 100644 lib/localiser/application.ex create mode 100644 lib/localiser/domain/floors.ex create mode 100644 lib/localiser/domain/rooms.ex create mode 100644 lib/localiser/domain/schema/floor.ex create mode 100644 lib/localiser/domain/schema/room.ex create mode 100644 lib/localiser/domain/schema/sensor.ex create mode 100644 lib/localiser/domain/schema/sensor_calibration.ex create mode 100644 lib/localiser/domain/schema/tag.ex create mode 100644 lib/localiser/domain/schema/user.ex create mode 100644 lib/localiser/domain/sensors.ex create mode 100644 lib/localiser/domain/tags.ex create mode 100644 lib/localiser/domain/users.ex create mode 100644 lib/localiser/localisation/filter/behaviour.ex create mode 100644 lib/localiser/localisation/filter/discrete_recursive_bayes.ex create mode 100644 lib/localiser/localisation/filter/kalman.ex create mode 100644 lib/localiser/localisation/filter/particle.ex create mode 100644 lib/localiser/localisation/filter/supervisor.ex create mode 100644 lib/localiser/localisation/floor/server.ex create mode 100644 lib/localiser/localisation/floor_supervisor.ex create mode 100644 lib/localiser/localisation/room/server.ex create mode 100644 lib/localiser/localisation/room/supervisor.ex create mode 100644 lib/localiser/localisation/sensor/manager.ex create mode 100644 lib/localiser/localisation/sensor/server.ex create mode 100644 lib/localiser/localisation/sensor/supervisor.ex create mode 100644 lib/localiser/localisation/tag/filter.ex create mode 100644 lib/localiser/mqtt/connection.ex create mode 100644 lib/localiser/mqtt/router.ex create mode 100644 lib/localiser/mqtt/supervisor.ex create mode 100644 lib/localiser/mqtt/telemetry.ex create mode 100644 lib/localiser/repo.ex create mode 100644 lib/localiser/rssi_buffer.ex create mode 100644 lib/localiserd.ex create mode 100644 mix.exs create mode 100644 mix.lock create mode 100644 test/localiserd_test.exs create mode 100644 test/test_helper.exs diff --git a/.formatter.exs b/.formatter.exs new file mode 100644 index 0000000..d2cda26 --- /dev/null +++ b/.formatter.exs @@ -0,0 +1,4 @@ +# Used by "mix format" +[ + inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] +] diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..be8f04b --- /dev/null +++ b/.gitignore @@ -0,0 +1,26 @@ +# The directory Mix will write compiled artifacts to. +/_build/ + +# If you run "mix test --cover", coverage assets end up here. +/cover/ + +# The directory Mix downloads your dependencies sources to. +/deps/ + +# Where third-party dependencies like ExDoc output generated docs. +/doc/ + +# Temporary files, for example, from tests. +/tmp/ + +# If the VM crashes, it generates a dump, let's ignore it too. +erl_crash.dump + +# Also ignore archive artifacts (built via "mix archive.build"). +*.ez + +# Ignore package tarball (built via "mix hex.build"). +localiserd-*.tar + +# sqlite database files +priv/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..5192244 --- /dev/null +++ b/README.md @@ -0,0 +1,21 @@ +# Localiserd + +**TODO: Add description** + +## Installation + +If [available in Hex](https://hex.pm/docs/publish), the package can be installed +by adding `localiserd` to your list of dependencies in `mix.exs`: + +```elixir +def deps do + [ + {:localiserd, "~> 0.1.0"} + ] +end +``` + +Documentation can be generated with [ExDoc](https://github.com/elixir-lang/ex_doc) +and published on [HexDocs](https://hexdocs.pm). Once published, the docs can +be found at . + diff --git a/config/config.exs b/config/config.exs new file mode 100644 index 0000000..c658316 --- /dev/null +++ b/config/config.exs @@ -0,0 +1,7 @@ +import Config + +config :localiserd, Localiser.Repo, + database: Path.expand("../priv/db/localiser.db", Path.dirname(__ENV__.file)), + pool_size: 5 + +config :localiserd, ecto_repos: [Localiser.Repo] diff --git a/lib/localiser/application.ex b/lib/localiser/application.ex new file mode 100644 index 0000000..62ba46f --- /dev/null +++ b/lib/localiser/application.ex @@ -0,0 +1,19 @@ +defmodule Localiser.Application do + use Application + + @impl true + def start(_type, _args) do + children = [ + Localiser.Repo, + {Registry, keys: :unique, name: Localiser.Registry}, + {Phoenix.PubSub, name: Localiser.PubSub}, + Localiser.MQTT.Supervisor, + Localiser.RSSI.Buffer, + Localiser.Localisation.Filter.Supervisor, + Localiser.Localisation.Floor.Supervisor + ] + + opts = [strategy: :one_for_one, name: Localiser.Supervisor] + Supervisor.start_link(children, opts) + end +end diff --git a/lib/localiser/domain/floors.ex b/lib/localiser/domain/floors.ex new file mode 100644 index 0000000..5d3ff3d --- /dev/null +++ b/lib/localiser/domain/floors.ex @@ -0,0 +1,34 @@ +defmodule Localiser.Domain.Floors do + import Ecto.Query + + alias Localiser.Repo + alias Localiser.Domain.Schema.Floor + + def list_floors do + Repo.all(Floor) + end + + def get_floor!(id), do: Repo.get!(Floor, id) + + def create_floor(attrs) do + %Floor{} + |> Floor.changeset(attrs) + |> Repo.insert() + end + + def update_floor(%Floor{} = floor, attrs) do + floor + |> Floor.changeset(attrs) + |> Repo.update() + end + + def delete_floor(%Floor{} = floor) do + Repo.delete(floor) + end + + def list_floors_with_rooms do + Floor + |> preload(:rooms) + |> Repo.all() + end +end diff --git a/lib/localiser/domain/rooms.ex b/lib/localiser/domain/rooms.ex new file mode 100644 index 0000000..f3941e2 --- /dev/null +++ b/lib/localiser/domain/rooms.ex @@ -0,0 +1,34 @@ +defmodule Localiser.Domain.Rooms do + import Ecto.Query + + alias Localiser.Repo + alias Localiser.Domain.Schema.Room + + def list_rooms do + Repo.all(Room) + end + + def list_rooms_for_floor(floor_id) do + Room + |> where([r], r.floor_id == ^floor_id) + |> Repo.all() + end + + def get_room!(id), do: Repo.get!(Room, id) + + def create_room(attrs) do + %Room{} + |> Room.changeset(attrs) + |> Repo.insert() + end + + def update_room(%Room{} = room, attrs) do + room + |> Room.changeset(attrs) + |> Repo.update() + end + + def delete_room(%Room{} = room) do + Repo.delete(room) + end +end diff --git a/lib/localiser/domain/schema/floor.ex b/lib/localiser/domain/schema/floor.ex new file mode 100644 index 0000000..71066f4 --- /dev/null +++ b/lib/localiser/domain/schema/floor.ex @@ -0,0 +1,21 @@ +defmodule Localiser.Domain.Schema.Floor do + use Ecto.Schema + import Ecto.Changeset + + alias Localiser.Domain.Schema.Room + + schema "floors" do + field :name, :string + + has_many :rooms, Room + + timestamps(type: :utc_datetime) + end + + @doc false + def changeset(floor, attrs) do + floor + |> cast(attrs, [:name]) + |> validate_required([:name]) + end +end diff --git a/lib/localiser/domain/schema/room.ex b/lib/localiser/domain/schema/room.ex new file mode 100644 index 0000000..75f4754 --- /dev/null +++ b/lib/localiser/domain/schema/room.ex @@ -0,0 +1,28 @@ +defmodule Localiser.Domain.Schema.Room do + use Ecto.Schema + import Ecto.Changeset + + alias Localiser.Domain.Schema.Floor + alias Localiser.Domain.Schema.Sensor + + schema "rooms" do + field :name, :string + field :width, :float + field :height, :float + field :offset_x, :float + field :offset_y, :float + + belongs_to :floor, Floor + has_many :sensors, Sensor + + timestamps(type: :utc_datetime) + end + + @doc false + def changeset(room, attrs) do + room + |> cast(attrs, [:name, :floor_id, :width, :height, :offset_x, :offset_y]) + |> validate_required([:name, :floor_id]) + |> assoc_constraint(:floor) + end +end diff --git a/lib/localiser/domain/schema/sensor.ex b/lib/localiser/domain/schema/sensor.ex new file mode 100644 index 0000000..1d317fe --- /dev/null +++ b/lib/localiser/domain/schema/sensor.ex @@ -0,0 +1,27 @@ +defmodule Localiser.Domain.Schema.Sensor do + use Ecto.Schema + import Ecto.Changeset + + alias Localiser.Domain.Schema.Room + alias Localiser.Domain.Schema.SensorCalibration + + schema "sensors" do + field :sensor_id, :string + field :x, :float + field :y, :float + + belongs_to :room, Room + has_many :calibrations, SensorCalibration + + timestamps(type: :utc_datetime) + end + + @doc false + def changeset(sensor, attrs) do + sensor + |> cast(attrs, [:sensor_id, :room_id, :x, :y]) + |> validate_required([:sensor_id]) + |> unique_constraint(:sensor_id) + |> assoc_constraint(:room) + end +end diff --git a/lib/localiser/domain/schema/sensor_calibration.ex b/lib/localiser/domain/schema/sensor_calibration.ex new file mode 100644 index 0000000..ed6f350 --- /dev/null +++ b/lib/localiser/domain/schema/sensor_calibration.ex @@ -0,0 +1,26 @@ +defmodule Localiser.Domain.Schema.SensorCalibration do + use Ecto.Schema + import Ecto.Changeset + + alias Localiser.Domain.Schema.Sensor + + @timestamps_opts [inserted_at: :inserted_at, updated_at: false, type: :utc_datetime] + + schema "sensor_calibrations" do + field :rssi_ref, :integer + field :path_loss_exp, :float + field :calibrated_at, :utc_datetime + + belongs_to :sensor, Sensor + + timestamps(@timestamps_opts) + end + + @doc false + def changeset(calibration, attrs) do + calibration + |> cast(attrs, [:sensor_id, :rssi_ref, :path_loss_exp, :calibrated_at]) + |> validate_required([:sensor_id, :rssi_ref, :path_loss_exp, :calibrated_at]) + |> assoc_constraint(:sensor) + end +end diff --git a/lib/localiser/domain/schema/tag.ex b/lib/localiser/domain/schema/tag.ex new file mode 100644 index 0000000..7237fab --- /dev/null +++ b/lib/localiser/domain/schema/tag.ex @@ -0,0 +1,21 @@ +defmodule Localiser.Domain.Schema.Tag do + use Ecto.Schema + import Ecto.Changeset + + schema "tags" do + field :tag_id, :string + field :name, :string + field :color, :string + field :metadata, :map + + timestamps(type: :utc_datetime) + end + + @doc false + def changeset(tag, attrs) do + tag + |> cast(attrs, [:tag_id, :name, :color, :metadata]) + |> validate_required([:tag_id]) + |> unique_constraint(:tag_id) + end +end diff --git a/lib/localiser/domain/schema/user.ex b/lib/localiser/domain/schema/user.ex new file mode 100644 index 0000000..36991c8 --- /dev/null +++ b/lib/localiser/domain/schema/user.ex @@ -0,0 +1,28 @@ +defmodule Localiser.Domain.Schema.User do + use Ecto.Schema + import Ecto.Changeset + + schema "users" do + field :username, :string + field :password_hash, :string, redact: true + field :password, :string, virtual: true, redact: true + + timestamps(type: :utc_datetime) + end + + @doc false + def changeset(user, attrs) do + user + |> cast(attrs, [:username, :password]) + |> validate_required([:username, :password]) + |> validate_length(:password, min: 8) + |> unique_constraint(:username) + |> hash_password() + end + + defp hash_password(%Ecto.Changeset{valid?: true, changes: %{password: pw}} = changeset) do + put_change(changeset, :password_hash, Argon2.hash_pwd_salt(pw)) + end + + defp hash_password(changeset), do: changeset +end diff --git a/lib/localiser/domain/sensors.ex b/lib/localiser/domain/sensors.ex new file mode 100644 index 0000000..02ce811 --- /dev/null +++ b/lib/localiser/domain/sensors.ex @@ -0,0 +1,118 @@ +defmodule Localiser.Domain.Sensors do + import Ecto.Query + + alias Localiser.Repo + alias Localiser.Domain.Schema.Sensor + alias Localiser.Domain.Schema.SensorCalibration + + def list_sensors do + Repo.all(Sensor) + end + + def list_sensors_for_room(room_id) do + Sensor + |> where([s], s.room_id == ^room_id) + |> Repo.all() + end + + def list_sensors_for_floor(floor_id) do + Sensor + |> join(:inner, [s], r in assoc(s, :room)) + |> where([_s, r], r.floor_id == ^floor_id) + |> Repo.all() + end + + def get_sensor!(id), do: Repo.get!(Sensor, id) + + def get_sensor_by_sensor_id(sensor_id) do + Repo.get_by(Sensor, sensor_id: sensor_id) + end + + def create_sensor(attrs) do + %Sensor{} + |> Sensor.changeset(attrs) + |> Repo.insert() + end + + def update_sensor(%Sensor{} = sensor, attrs) do + sensor + |> Sensor.changeset(attrs) + |> Repo.update() + end + + def delete_sensor(%Sensor{} = sensor) do + Repo.delete(sensor) + end + + def enroll_sensor(%Sensor{} = sensor, room_id) do + sensor + |> Sensor.changeset(%{room_id: room_id}) + |> Repo.update() + end + + def add_calibration(%Sensor{} = sensor, attrs) do + attrs = Map.put(attrs, :sensor_id, sensor.id) + + %SensorCalibration{} + |> SensorCalibration.changeset(attrs) + |> Repo.insert() + end + + def list_calibrations(%Sensor{} = sensor) do + SensorCalibration + |> where([c], c.sensor_id == ^sensor.id) + |> order_by([c], desc: c.calibrated_at) + |> Repo.all() + end + + def latest_calibration(%Sensor{} = sensor) do + SensorCalibration + |> where([c], c.sensor_id == ^sensor.id) + |> order_by([c], desc: c.calibrated_at) + |> limit(1) + |> Repo.one() + end + + # Sensor lifecycle / enrollment helpers + + # Called when an ESP board self-announces on MQTT. Inserts a new unplaced sensor + # record, or bumps updated_at if the sensor_id already exists. + def upsert_announced(sensor_id) do + result = + %Sensor{} + |> Sensor.changeset(%{sensor_id: sensor_id}) + |> Repo.insert( + on_conflict: [set: [updated_at: DateTime.utc_now()]], + conflict_target: :sensor_id, + returning: true + ) + + case result do + {:ok, sensor} -> + Phoenix.PubSub.broadcast(Localiser.PubSub, "sensors", {:sensor_announced, sensor}) + {:ok, sensor} + + error -> + error + end + end + + # Places (or re-places) a sensor at a specific position within a room. + # Broadcasts {:sensor_enrolled, sensor, room} for Sensor.Supervisor / Sensor.Server. + def place_sensor(%Sensor{} = sensor, room_id, {x, y}) do + with {:ok, sensor} <- update_sensor(sensor, %{room_id: room_id, x: x, y: y}) do + room = Repo.preload(sensor, :room).room + Phoenix.PubSub.broadcast(Localiser.PubSub, "sensors", {:sensor_enrolled, sensor, room}) + {:ok, sensor} + end + end + + # Removes a sensor from the room layout without deleting the DB record. + # Broadcasts {:sensor_unenrolled, sensor_id} for Sensor.Supervisor. + def remove_from_layout(%Sensor{} = sensor) do + with {:ok, sensor} <- update_sensor(sensor, %{room_id: nil, x: nil, y: nil}) do + Phoenix.PubSub.broadcast(Localiser.PubSub, "sensors", {:sensor_unenrolled, sensor.sensor_id}) + {:ok, sensor} + end + end +end diff --git a/lib/localiser/domain/tags.ex b/lib/localiser/domain/tags.ex new file mode 100644 index 0000000..3b1955f --- /dev/null +++ b/lib/localiser/domain/tags.ex @@ -0,0 +1,30 @@ +defmodule Localiser.Domain.Tags do + alias Localiser.Repo + alias Localiser.Domain.Schema.Tag + + def list_tags do + Repo.all(Tag) + end + + def get_tag!(id), do: Repo.get!(Tag, id) + + def get_tag_by_tag_id(tag_id) do + Repo.get_by(Tag, tag_id: tag_id) + end + + def create_tag(attrs) do + %Tag{} + |> Tag.changeset(attrs) + |> Repo.insert() + end + + def update_tag(%Tag{} = tag, attrs) do + tag + |> Tag.changeset(attrs) + |> Repo.update() + end + + def delete_tag(%Tag{} = tag) do + Repo.delete(tag) + end +end diff --git a/lib/localiser/domain/users.ex b/lib/localiser/domain/users.ex new file mode 100644 index 0000000..031e2ae --- /dev/null +++ b/lib/localiser/domain/users.ex @@ -0,0 +1,32 @@ +defmodule Localiser.Domain.Users do + alias Localiser.Repo + alias Localiser.Domain.Schema.User + + def get_user!(id), do: Repo.get!(User, id) + + def get_user_by_username(username) do + Repo.get_by(User, username: username) + end + + def create_user(attrs) do + %User{} + |> User.changeset(attrs) + |> Repo.insert() + end + + def authenticate_user(username, password) do + user = get_user_by_username(username) + + cond do + user && Argon2.verify_pass(password, user.password_hash) -> + {:ok, user} + + user -> + {:error, :invalid_credentials} + + true -> + Argon2.no_user_verify() + {:error, :invalid_credentials} + end + end +end diff --git a/lib/localiser/localisation/filter/behaviour.ex b/lib/localiser/localisation/filter/behaviour.ex new file mode 100644 index 0000000..872371d --- /dev/null +++ b/lib/localiser/localisation/filter/behaviour.ex @@ -0,0 +1,32 @@ +defmodule Localiser.Localisation.Filter.Behaviour do + @moduledoc """ + Behaviour for localisation filter implementations. + + Filters receive a batch of resolved sensor measurements (RSSI already converted + to distance) and produce an estimated `{x, y}` position in floor coordinate space. + """ + + @type sensor_measurement :: %{ + sensor_id: String.t(), + floor_x: float(), + floor_y: float(), + distance: float() + } + + @type position :: {float(), float()} + + @type confidence :: float() + + @doc "Initialise the filter. `sensors` is a list of all enrolled sensor structs." + @callback init(sensors :: list(), opts :: keyword()) :: {:ok, state :: term()} + + @doc """ + Feed a batch of measurements into the filter and return the updated position + estimate and new filter state. + """ + @callback update(state :: term(), measurements :: [sensor_measurement()]) :: + {:ok, position(), new_state :: term()} + + @doc "Return the current best position estimate and a confidence score in [0.0, 1.0]." + @callback estimate(state :: term()) :: {position(), confidence()} +end diff --git a/lib/localiser/localisation/filter/discrete_recursive_bayes.ex b/lib/localiser/localisation/filter/discrete_recursive_bayes.ex new file mode 100644 index 0000000..b64acf3 --- /dev/null +++ b/lib/localiser/localisation/filter/discrete_recursive_bayes.ex @@ -0,0 +1,21 @@ +defmodule Localiser.Localisation.Filter.DiscreteRecursiveBayes do + @behaviour Localiser.Localisation.Filter.Behaviour + + @impl true + def init(_sensors, _opts) do + # TODO: discretise floor into a grid, initialise uniform prior over cells + {:ok, %{}} + end + + @impl true + def update(state, _measurements) do + # TODO: motion model (convolution), measurement update (pointwise multiply + normalise) + {:ok, {0.0, 0.0}, state} + end + + @impl true + def estimate(_state) do + # TODO: return MAP cell centre; confidence = max cell probability + {{0.0, 0.0}, 0.0} + end +end diff --git a/lib/localiser/localisation/filter/kalman.ex b/lib/localiser/localisation/filter/kalman.ex new file mode 100644 index 0000000..b5f748d --- /dev/null +++ b/lib/localiser/localisation/filter/kalman.ex @@ -0,0 +1,21 @@ +defmodule Localiser.Localisation.Filter.Kalman do + @behaviour Localiser.Localisation.Filter.Behaviour + + @impl true + def init(_sensors, _opts) do + # TODO: initialise state vector [x, y, vx, vy], covariance P, Q, R matrices + {:ok, %{}} + end + + @impl true + def update(state, _measurements) do + # TODO: predict step (F * x), update step (Kalman gain, residual) + {:ok, {0.0, 0.0}, state} + end + + @impl true + def estimate(_state) do + # TODO: return state vector position and trace of P as inverse-confidence proxy + {{0.0, 0.0}, 0.0} + end +end diff --git a/lib/localiser/localisation/filter/particle.ex b/lib/localiser/localisation/filter/particle.ex new file mode 100644 index 0000000..e6dc2d2 --- /dev/null +++ b/lib/localiser/localisation/filter/particle.ex @@ -0,0 +1,21 @@ +defmodule Localiser.Localisation.Filter.Particle do + @behaviour Localiser.Localisation.Filter.Behaviour + + @impl true + def init(_sensors, _opts) do + # TODO: initialise particle set, weights, noise parameters + {:ok, %{}} + end + + @impl true + def update(state, _measurements) do + # TODO: predict step, weight update by likelihood, resample + {:ok, {0.0, 0.0}, state} + end + + @impl true + def estimate(_state) do + # TODO: weighted mean of particle set + {{0.0, 0.0}, 0.0} + end +end diff --git a/lib/localiser/localisation/filter/supervisor.ex b/lib/localiser/localisation/filter/supervisor.ex new file mode 100644 index 0000000..f76c15c --- /dev/null +++ b/lib/localiser/localisation/filter/supervisor.ex @@ -0,0 +1,32 @@ +defmodule Localiser.Localisation.Filter.Supervisor do + use DynamicSupervisor + + alias Localiser.Domain.Tags + alias Localiser.Localisation.Tag.Filter, as: TagFilter + + def start_link(_args) do + case DynamicSupervisor.start_link(__MODULE__, :ok, name: __MODULE__) do + {:ok, pid} -> + Tags.list_tags() |> Enum.each(&start_tag_filter/1) + {:ok, pid} + + error -> + error + end + end + + @impl true + def init(:ok) do + DynamicSupervisor.init(strategy: :one_for_one) + end + + def start_tag_filter(tag) do + child_spec = %{ + id: {TagFilter, tag.tag_id}, + start: {TagFilter, :start_link, [tag]}, + restart: :transient + } + + DynamicSupervisor.start_child(__MODULE__, child_spec) + end +end diff --git a/lib/localiser/localisation/floor/server.ex b/lib/localiser/localisation/floor/server.ex new file mode 100644 index 0000000..46b1490 --- /dev/null +++ b/lib/localiser/localisation/floor/server.ex @@ -0,0 +1,24 @@ +defmodule Localiser.Localisation.Floor.Server do + use Supervisor + + alias Localiser.Localisation.Room + alias Localiser.Localisation.Sensor + def start_link(floor) do + Supervisor.start_link(__MODULE__, floor, name: via(floor.id)) + end + + def via(floor_id) do + {:via, Registry, {Localiser.Registry, {:floor_server, floor_id}}} + end + + @impl true + def init(floor) do + children = [ + {Room.Supervisor, floor}, + {Sensor.Supervisor, floor}, + {Sensor.Manager, floor} + ] + + Supervisor.init(children, strategy: :one_for_one) + end +end diff --git a/lib/localiser/localisation/floor_supervisor.ex b/lib/localiser/localisation/floor_supervisor.ex new file mode 100644 index 0000000..07e3261 --- /dev/null +++ b/lib/localiser/localisation/floor_supervisor.ex @@ -0,0 +1,32 @@ +defmodule Localiser.Localisation.Floor.Supervisor do + use DynamicSupervisor + + alias Localiser.Domain.Floors + alias Localiser.Localisation.Floor.Server + + def start_link(_args) do + case DynamicSupervisor.start_link(__MODULE__, :ok, name: __MODULE__) do + {:ok, pid} -> + Floors.list_floors() |> Enum.each(&start_floor_server/1) + {:ok, pid} + + error -> + error + end + end + + @impl true + def init(:ok) do + DynamicSupervisor.init(strategy: :one_for_one) + end + + def start_floor_server(floor) do + child_spec = %{ + id: {Server, floor.id}, + start: {Server, :start_link, [floor]}, + restart: :transient + } + + DynamicSupervisor.start_child(__MODULE__, child_spec) + end +end diff --git a/lib/localiser/localisation/room/server.ex b/lib/localiser/localisation/room/server.ex new file mode 100644 index 0000000..5ad836a --- /dev/null +++ b/lib/localiser/localisation/room/server.ex @@ -0,0 +1,75 @@ +defmodule Localiser.Localisation.Room.Server do + use GenServer + + @pubsub Localiser.PubSub + + defstruct [:id, :name, :offset_x, :offset_y, :width, :height, occupants: MapSet.new()] + + def start_link(room) do + GenServer.start_link(__MODULE__, room, name: via(room.id)) + end + + def via(room_id) do + {:via, Registry, {Localiser.Registry, {:room, room_id}}} + end + + def tag_entered(room_id, tag_id) do + GenServer.cast(via(room_id), {:tag_entered, tag_id}) + end + + def tag_left(room_id, tag_id) do + GenServer.cast(via(room_id), {:tag_left, tag_id}) + end + + def get_occupants(room_id) do + GenServer.call(via(room_id), :get_occupants) + end + + @impl true + def init(room) do + state = %__MODULE__{ + id: room.id, + name: room.name, + offset_x: room.offset_x || 0.0, + offset_y: room.offset_y || 0.0, + width: room.width || 0.0, + height: room.height || 0.0 + } + + {:ok, state} + end + + @impl true + def handle_cast({:tag_entered, tag_id}, state) do + new_occupants = MapSet.put(state.occupants, tag_id) + + if new_occupants != state.occupants do + broadcast(state.id, new_occupants) + end + + {:noreply, %{state | occupants: new_occupants}} + end + + def handle_cast({:tag_left, tag_id}, state) do + new_occupants = MapSet.delete(state.occupants, tag_id) + + if new_occupants != state.occupants do + broadcast(state.id, new_occupants) + end + + {:noreply, %{state | occupants: new_occupants}} + end + + @impl true + def handle_call(:get_occupants, _from, state) do + {:reply, state.occupants, state} + end + + defp broadcast(room_id, occupants) do + Phoenix.PubSub.broadcast( + @pubsub, + "room:#{room_id}", + {:room_occupancy_changed, room_id, occupants} + ) + end +end diff --git a/lib/localiser/localisation/room/supervisor.ex b/lib/localiser/localisation/room/supervisor.ex new file mode 100644 index 0000000..9cfbd57 --- /dev/null +++ b/lib/localiser/localisation/room/supervisor.ex @@ -0,0 +1,41 @@ +defmodule Localiser.Localisation.Room.Supervisor do + use DynamicSupervisor + + alias Localiser.Domain.Rooms + alias Localiser.Localisation.Room.Server + + def start_link(floor) do + case DynamicSupervisor.start_link(__MODULE__, :ok, name: via(floor.id)) do + {:ok, pid} -> + seed_rooms(floor.id) + {:ok, pid} + + error -> + error + end + end + + def via(floor_id) do + {:via, Registry, {Localiser.Registry, {:room_supervisor, floor_id}}} + end + + @impl true + def init(:ok) do + DynamicSupervisor.init(strategy: :one_for_one) + end + + def start_room_server(floor_id, room) do + child_spec = %{ + id: {Server, room.id}, + start: {Server, :start_link, [room]}, + restart: :transient + } + + DynamicSupervisor.start_child(via(floor_id), child_spec) + end + + defp seed_rooms(floor_id) do + Rooms.list_rooms_for_floor(floor_id) + |> Enum.each(&start_room_server(floor_id, &1)) + end +end diff --git a/lib/localiser/localisation/sensor/manager.ex b/lib/localiser/localisation/sensor/manager.ex new file mode 100644 index 0000000..0212a86 --- /dev/null +++ b/lib/localiser/localisation/sensor/manager.ex @@ -0,0 +1,54 @@ +defmodule Localiser.Localisation.Sensor.Manager do + @moduledoc """ + GenServer that subscribes to the "sensors" PubSub topic and manages + Sensor.Server lifecycle for a specific floor. + + - {:sensor_enrolled, sensor, room} → start Sensor.Server if not already running + - {:sensor_unenrolled, sensor_id} → terminate the Sensor.Server for this floor + """ + + use GenServer + + alias Localiser.Localisation.Sensor + + def start_link(floor) do + GenServer.start_link(__MODULE__, floor.id, name: via(floor.id)) + end + + def via(floor_id) do + {:via, Registry, {Localiser.Registry, {:sensor_manager, floor_id}}} + end + + @impl true + def init(floor_id) do + Phoenix.PubSub.subscribe(Localiser.PubSub, "sensors") + {:ok, %{floor_id: floor_id}} + end + + # Sensor placed (or moved) onto this floor's layout. + @impl true + def handle_info({:sensor_enrolled, sensor, %{floor_id: floor_id} = room}, %{floor_id: floor_id} = state) do + case Registry.lookup(Localiser.Registry, {:sensor, sensor.sensor_id}) do + [] -> Sensor.Supervisor.start_sensor_server(floor_id, sensor, room) + _ -> :ok # Sensor.Server is already running and handles position via its own PubSub sub + end + + {:noreply, state} + end + + # Sensor removed from layout — stop the server for this floor if it's running. + def handle_info({:sensor_unenrolled, sensor_id}, %{floor_id: floor_id} = state) do + case Registry.lookup(Localiser.Registry, {:sensor, sensor_id}) do + [{pid, _}] -> + DynamicSupervisor.terminate_child(Sensor.Supervisor.via(floor_id), pid) + + [] -> + :ok + end + + {:noreply, state} + end + + # Ignore all other PubSub broadcasts (sensor announced, calibration_complete, etc.) + def handle_info(_msg, state), do: {:noreply, state} +end diff --git a/lib/localiser/localisation/sensor/server.ex b/lib/localiser/localisation/sensor/server.ex new file mode 100644 index 0000000..f595122 --- /dev/null +++ b/lib/localiser/localisation/sensor/server.ex @@ -0,0 +1,170 @@ +defmodule Localiser.Localisation.Sensor.Server do + use GenServer + + require Logger + + alias Localiser.Domain.Sensors + alias Localiser.Domain.Schema.{Sensor, SensorCalibration} + alias Localiser.MQTT.Connection, as: MQTTConnection + + @default_rssi_ref -59 + @default_path_loss_exp 2.0 + + # mode: :ok | {:calibrating, buffer :: [integer()], target :: pos_integer()} + defstruct [:sensor_id, :sensor_db_id, :floor_x, :floor_y, :rssi_ref, :path_loss_exp, mode: :ok] + + def start_link({sensor, room}) do + GenServer.start_link(__MODULE__, {sensor, room}, name: via(sensor.sensor_id)) + end + + def via(sensor_id) do + {:via, Registry, {Localiser.Registry, {:sensor, sensor_id}}} + end + + # Returns %{sensor_id, floor_x, floor_y, distance} for a raw RSSI reading. + def measure(sensor_id, rssi) do + GenServer.call(via(sensor_id), {:measure, rssi}) + end + + # Returns true if the sensor is currently collecting calibration samples. + def calibrating?(sensor_id) do + GenServer.call(via(sensor_id), :calibrating?) + end + + # Feeds a raw RSSI value into the calibration buffer. + def calibration_reading(sensor_id, rssi) do + GenServer.cast(via(sensor_id), {:calibration_reading, rssi}) + end + + # Starts calibration mode. sample_target: number of RSSI samples to collect. + def begin_calibration(sensor_id, sample_target \\ 50) do + GenServer.cast(via(sensor_id), {:begin_calibration, sample_target}) + end + + # Aborts an in-progress calibration without saving. + def abort_calibration(sensor_id) do + GenServer.cast(via(sensor_id), :abort_calibration) + end + + @impl true + def init({sensor, room}) do + Phoenix.PubSub.subscribe(Localiser.PubSub, "sensors") + + calibration = Sensors.latest_calibration(sensor) + {rssi_ref, path_loss_exp} = calibration_params(calibration) + + state = %__MODULE__{ + sensor_id: sensor.sensor_id, + sensor_db_id: sensor.id, + floor_x: (room.offset_x || 0.0) + (sensor.x || 0.0), + floor_y: (room.offset_y || 0.0) + (sensor.y || 0.0), + rssi_ref: rssi_ref, + path_loss_exp: path_loss_exp + } + + {:ok, state} + end + + @impl true + def handle_call({:measure, rssi}, _from, state) do + distance = rssi_to_distance(rssi, state.rssi_ref, state.path_loss_exp) + + measurement = %{ + sensor_id: state.sensor_id, + floor_x: state.floor_x, + floor_y: state.floor_y, + distance: distance + } + + {:reply, measurement, state} + end + + @impl true + def handle_call(:calibrating?, _from, state) do + {:reply, match?({:calibrating, _, _}, state.mode), state} + end + + @impl true + def handle_cast({:begin_calibration, target}, state) do + MQTTConnection.publish("localiser/sensor/#{state.sensor_id}/cmd", ~s({"action":"calibrate_start"})) + {:noreply, %{state | mode: {:calibrating, [], target}}} + end + + @impl true + def handle_cast(:abort_calibration, %{mode: {:calibrating, _, _}} = state) do + MQTTConnection.publish("localiser/sensor/#{state.sensor_id}/cmd", ~s({"action":"calibrate_stop"})) + {:noreply, %{state | mode: :ok}} + end + + def handle_cast(:abort_calibration, state), do: {:noreply, state} + + @impl true + def handle_cast({:calibration_reading, rssi}, %{mode: {:calibrating, buffer, target}} = state) do + buffer = [rssi | buffer] + + if length(buffer) >= target do + finalize_calibration(buffer, state) + else + {:noreply, %{state | mode: {:calibrating, buffer, target}}} + end + end + + def handle_cast({:calibration_reading, _rssi}, state), do: {:noreply, state} + + # Position updated (sensor dragged in layout). + @impl true + def handle_info({:sensor_enrolled, %Sensor{sensor_id: sid} = sensor, room}, %{sensor_id: sid} = state) do + floor_x = (room.offset_x || 0.0) + (sensor.x || 0.0) + floor_y = (room.offset_y || 0.0) + (sensor.y || 0.0) + {:noreply, %{state | floor_x: floor_x, floor_y: floor_y}} + end + + # Ignore PubSub messages not relevant to this server. + def handle_info(_msg, state), do: {:noreply, state} + + # Private + + defp finalize_calibration(buffer, state) do + rssi_ref = median(buffer) + sensor_struct = %Sensor{id: state.sensor_db_id, sensor_id: state.sensor_id} + + case Sensors.add_calibration(sensor_struct, %{ + rssi_ref: rssi_ref, + path_loss_exp: state.path_loss_exp, + calibrated_at: DateTime.utc_now() + }) do + {:ok, _calibration} -> + Logger.info("[Sensor.Server] Calibration complete for #{state.sensor_id}: rssi_ref=#{rssi_ref}") + MQTTConnection.publish("localiser/sensor/#{state.sensor_id}/cmd", ~s({"action":"calibrate_stop"})) + Phoenix.PubSub.broadcast(Localiser.PubSub, "sensors", {:calibration_complete, state.sensor_id}) + {:noreply, %{state | rssi_ref: rssi_ref, mode: :ok}} + + {:error, reason} -> + Logger.error("[Sensor.Server] Failed to save calibration for #{state.sensor_id}: #{inspect(reason)}") + {:noreply, %{state | mode: :ok}} + end + end + + defp median(list) do + sorted = Enum.sort(list) + len = length(sorted) + mid = div(len, 2) + + if rem(len, 2) == 0 do + round((Enum.at(sorted, mid - 1) + Enum.at(sorted, mid)) / 2) + else + Enum.at(sorted, mid) + end + end + + # d = 10 ^ ((rssi_ref - rssi) / (10 * n)) + defp rssi_to_distance(rssi, rssi_ref, path_loss_exp) do + :math.pow(10.0, (rssi_ref - rssi) / (10.0 * path_loss_exp)) + end + + defp calibration_params(nil), do: {@default_rssi_ref, @default_path_loss_exp} + + defp calibration_params(%SensorCalibration{rssi_ref: rssi_ref, path_loss_exp: path_loss_exp}) do + {rssi_ref, path_loss_exp} + end +end diff --git a/lib/localiser/localisation/sensor/supervisor.ex b/lib/localiser/localisation/sensor/supervisor.ex new file mode 100644 index 0000000..6164289 --- /dev/null +++ b/lib/localiser/localisation/sensor/supervisor.ex @@ -0,0 +1,44 @@ +defmodule Localiser.Localisation.Sensor.Supervisor do + use DynamicSupervisor + + alias Localiser.Domain.{Rooms, Sensors} + alias Localiser.Localisation.Sensor.Server + + def start_link(floor) do + case DynamicSupervisor.start_link(__MODULE__, :ok, name: via(floor.id)) do + {:ok, pid} -> + seed_sensors(floor.id) + {:ok, pid} + + error -> + error + end + end + + def via(floor_id) do + {:via, Registry, {Localiser.Registry, {:sensor_supervisor, floor_id}}} + end + + @impl true + def init(:ok) do + DynamicSupervisor.init(strategy: :one_for_one) + end + + def start_sensor_server(floor_id, sensor, room) do + child_spec = %{ + id: {Server, sensor.id}, + start: {Server, :start_link, [{sensor, room}]}, + restart: :transient + } + + DynamicSupervisor.start_child(via(floor_id), child_spec) + end + + defp seed_sensors(floor_id) do + Rooms.list_rooms_for_floor(floor_id) + |> Enum.each(fn room -> + Sensors.list_sensors_for_room(room.id) + |> Enum.each(&start_sensor_server(floor_id, &1, room)) + end) + end +end diff --git a/lib/localiser/localisation/tag/filter.ex b/lib/localiser/localisation/tag/filter.ex new file mode 100644 index 0000000..ab1efb1 --- /dev/null +++ b/lib/localiser/localisation/tag/filter.ex @@ -0,0 +1,93 @@ +defmodule Localiser.Localisation.Tag.Filter do + use GenServer + + alias Localiser.Domain.Floors + alias Localiser.Localisation.Room.Server, as: RoomServer + + @default_filter Localiser.Localisation.Filter.Particle + + defstruct [ + :tag_id, + :filter_module, + :filter_state, + :rooms, + current_room_id: nil + ] + + def start_link(tag) do + GenServer.start_link(__MODULE__, tag, name: via(tag.tag_id)) + end + + def via(tag_id) do + {:via, Registry, {Localiser.Registry, {:filter, tag_id}}} + end + + # Deliver a batch of resolved measurements to this tag's filter. Called by RSSI.Buffer. + # measurements :: [%{sensor_id, floor_x, floor_y, distance}] + def ingest(tag_id, measurements) do + GenServer.cast(via(tag_id), {:ingest, measurements}) + end + + # Hot-swap the filter module. Reinitialises filter state with new module. + def swap_filter(tag_id, new_module, opts \\ []) do + GenServer.call(via(tag_id), {:swap_filter, new_module, opts}) + end + + @impl true + def init(tag) do + filter_module = Application.get_env(:localiserd, :default_filter, @default_filter) + rooms = load_rooms() + {:ok, filter_state} = filter_module.init([], []) + + state = %__MODULE__{ + tag_id: tag.tag_id, + filter_module: filter_module, + filter_state: filter_state, + rooms: rooms + } + + {:ok, state} + end + + @impl true + def handle_cast({:ingest, []}, state), do: {:noreply, state} + + def handle_cast({:ingest, measurements}, state) do + {:ok, {x, y}, new_filter_state} = state.filter_module.update(state.filter_state, measurements) + + new_room = find_room(state.rooms, x, y) + new_room_id = if new_room, do: new_room.id, else: nil + + if new_room_id != state.current_room_id do + if state.current_room_id, do: RoomServer.tag_left(state.current_room_id, state.tag_id) + if new_room_id, do: RoomServer.tag_entered(new_room_id, state.tag_id) + end + + {:noreply, %{state | filter_state: new_filter_state, current_room_id: new_room_id}} + end + + @impl true + def handle_call({:swap_filter, new_module, opts}, _from, state) do + case new_module.init([], opts) do + {:ok, new_filter_state} -> + {:reply, :ok, %{state | filter_module: new_module, filter_state: new_filter_state}} + + {:error, reason} -> + {:reply, {:error, reason}, state} + end + end + + defp load_rooms do + Floors.list_floors_with_rooms() + |> Enum.flat_map(& &1.rooms) + end + + defp find_room(rooms, x, y) do + Enum.find(rooms, fn room -> + ox = room.offset_x || 0.0 + oy = room.offset_y || 0.0 + x >= ox and x < ox + (room.width || 0.0) and + y >= oy and y < oy + (room.height || 0.0) + end) + end +end diff --git a/lib/localiser/mqtt/connection.ex b/lib/localiser/mqtt/connection.ex new file mode 100644 index 0000000..a49af6a --- /dev/null +++ b/lib/localiser/mqtt/connection.ex @@ -0,0 +1,101 @@ +defmodule Localiser.MQTT.Connection do + @moduledoc """ + GenServer responsible for maintaining a connection to the MQTT broker. + """ + + use GenServer + require Logger + + @broker_host Application.compile_env(:localiser, :mqtt_host, "localhost") + @broker_port Application.compile_env(:localiser, :mqtt_port, 1883) + + @rssi_topic "localiser/sensor/+/rssi" + @announce_topic "localiser/sensor/+/announce" + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + def publish(topic, payload) do + GenServer.call(__MODULE__, {:publish, topic, payload}) + end + + # Server Callbacks + + @impl true + def init(_opts) do + Process.flag(:trap_exit, true) + send(self(), :connect) + {:ok, %{client_id: nil, connected: false, client_pid: nil}} + end + + @impl true + def handle_info(:connect, state) do + client_id = "localiser_#{:rand.uniform(1000)}" + Logger.info("[MQTT.Connection] Attempting to connect to MQTT broker at #{@broker_host}:#{@broker_port} with client ID #{client_id}") + + case do_connect(client_id) do + {:ok, pid} -> + Logger.info("[MQTT.Connection] Connected to MQTT broker with client ID #{client_id}") + {:noreply, %{state | client_id: client_id, connected: true, client_pid: pid}} + + {:error, reason} -> + # TODO: Implement exponential backoff for retries + Logger.error("[MQTT.Connection] Failed to connect to MQTT broker: #{inspect(reason)}. Retrying in 5 seconds...") + Process.send_after(self(), :connect, 5_000) + {:noreply, %{state | connected: false, client_pid: nil}} + end + end + + # Emqtt process exited (broker dropped connection or startup failure) + @impl true + def handle_info({:EXIT, pid, reason}, %{client_pid: pid} = state) do + Logger.warning("[MQTT.Connection] MQTT client exited (#{inspect(reason)}). Reconnecting in 5 seconds...") + Process.send_after(self(), :connect, 5_000) + {:noreply, %{state | connected: false, client_pid: nil}} + end + + def handle_info({:EXIT, _pid, _reason}, state), do: {:noreply, state} + + def handle_info({:publish, %{:topic => topic, :payload => payload}}, state) do + Localiser.MQTT.Router.route(topic, payload) + {:noreply, state} + end + + @impl true + def handle_call({:publish, _topic, _payload}, _from, %{connected: false} = state) do + {:reply, {:error, :not_connected}, state} + end + + @impl true + def handle_call({:publish, topic, payload}, _from, %{client_pid: pid} = state) do + result = :emqtt.publish(pid, topic, payload) + {:reply, result, state} + end + + # Private + + defp do_connect(client_id) do + case :emqtt.start_link(host: @broker_host, port: @broker_port, clientid: client_id) do + {:ok, pid} -> do_connect_pid(pid) + {:error, reason} -> {:error, reason} + end + end + + defp do_connect_pid(pid) do + case :emqtt.connect(pid) do + {:ok, _props} -> + :emqtt.subscribe(pid, {@rssi_topic, 1}) + :emqtt.subscribe(pid, {@announce_topic, 1}) + {:ok, pid} + + {:error, reason} -> + if Process.alive?(pid), do: :emqtt.stop(pid) + {:error, reason} + end + catch + :exit, reason -> + if Process.alive?(pid), do: :emqtt.stop(pid) + {:error, reason} + end +end diff --git a/lib/localiser/mqtt/router.ex b/lib/localiser/mqtt/router.ex new file mode 100644 index 0000000..eb1560b --- /dev/null +++ b/lib/localiser/mqtt/router.ex @@ -0,0 +1,83 @@ +defmodule Localiser.MQTT.Router do + @moduledoc """ + Module responsible for routing incoming MQTT messages to the appropriate handlers. + """ + + use GenServer + require Logger + + alias Localiser.Domain.Sensors + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + def route(topic, payload) do + GenServer.cast(__MODULE__, {:route, topic, payload}) + end + + # GenServer Callbacks + + @impl true + def init(_opts) do + {:ok, %{}} + end + + @impl true + def handle_cast({:route, topic, payload}, state) do + case parse_topic(topic) do + {:rssi, sensor_id} -> + handle_rssi(sensor_id, payload) + + {:announce, sensor_id} -> + handle_announce(sensor_id) + + {:error, :invalid_topic} -> + Logger.debug("[MQTT.Router] Received message with invalid topic: #{topic}") + end + + {:noreply, state} + end + + # Private helper functions + + defp parse_topic(topic) do + # Example topic: "localiser/sensor/device123/rssi" + case String.split(topic, "/") do + ["localiser", "sensor", sensor_id, "rssi"] -> {:rssi, sensor_id} + ["localiser", "sensor", sensor_id, "announce"] -> {:announce, sensor_id} + _ -> {:error, :invalid_topic} + end + end + + defp handle_rssi(sensor_id, payload) do + with {:ok, %{"tag_id" => tag_id, "rssi" => rssi}} <- + Jason.decode(payload) do + + reading = %{ + sensor_id: sensor_id, + tag_id: tag_id, + rssi: rssi, + timestamp: DateTime.utc_now() + } + + Localiser.MQTT.Telemetry.count_reading() + Localiser.RSSI.Buffer.push(reading) + + else + {:error, reason} -> + Logger.error("[MQTT.Router] Bad payload from #{sensor_id}: #{inspect(reason)}") + Localiser.MQTT.Telemetry.count_error() + end + end + + defp handle_announce(sensor_id) do + case Sensors.upsert_announced(sensor_id) do + {:ok, _sensor} -> + Logger.info("[MQTT.Router] Sensor announced: #{sensor_id}") + + {:error, reason} -> + Logger.error("[MQTT.Router] Failed to register announced sensor #{sensor_id}: #{inspect(reason)}") + end + end +end diff --git a/lib/localiser/mqtt/supervisor.ex b/lib/localiser/mqtt/supervisor.ex new file mode 100644 index 0000000..b18036b --- /dev/null +++ b/lib/localiser/mqtt/supervisor.ex @@ -0,0 +1,22 @@ +defmodule Localiser.MQTT.Supervisor do + @moduledoc """ + Supervisor responsible for managing MQTT-related processes. + """ + + use Supervisor + + def start_link(opts) do + Supervisor.start_link(__MODULE__, opts, name: __MODULE__) + end + + @impl true + def init(_opts) do + children = [ + Localiser.MQTT.Connection, + Localiser.MQTT.Router, + Localiser.MQTT.Telemetry + ] + + Supervisor.init(children, strategy: :rest_for_one) + end +end diff --git a/lib/localiser/mqtt/telemetry.ex b/lib/localiser/mqtt/telemetry.ex new file mode 100644 index 0000000..ecfd04e --- /dev/null +++ b/lib/localiser/mqtt/telemetry.ex @@ -0,0 +1,59 @@ +defmodule Localiser.MQTT.Telemetry do + @moduledoc """ + GenServer responsible for tracking telemetry data related to MQTT messages. + """ + + use GenServer + require Logger + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + def count_reading() do + GenServer.cast(__MODULE__, :count_reading) + end + + def count_error() do + GenServer.cast(__MODULE__, :count_error) + end + + @log_interval 60_000 + + # GenServer Callbacks + + @impl true + def init(_opts) do + schedule_log() + {:ok, %{total_readings: 0, total_errors: 0}} + end + + @impl true + def handle_cast(:count_reading, state) do + new_count = state.total_readings + 1 + {:noreply, %{state | total_readings: new_count}} + end + + @impl true + def handle_cast(:count_error, state) do + new_count = state.total_errors + 1 + {:noreply, %{state | total_errors: new_count}} + end + + @impl true + def handle_info(:log_stats, state) do + rate = if state.total_readings > 0 do + Float.round(state.total_errors / state.total_readings * 100, 2) + else + 0.0 + end + Logger.info("[MQTT.Telemetry] #{state.total_readings} readings, #{state.total_errors} errors (#{rate} err rate) in last #{@log_interval / 1000}s.") + schedule_log() + {:noreply, state} + end + + # Private helper functions + defp schedule_log() do + Process.send_after(self(), :log_stats, @log_interval) + end +end diff --git a/lib/localiser/repo.ex b/lib/localiser/repo.ex new file mode 100644 index 0000000..9c36dd2 --- /dev/null +++ b/lib/localiser/repo.ex @@ -0,0 +1,5 @@ +defmodule Localiser.Repo do + use Ecto.Repo, + otp_app: :localiserd, + adapter: Ecto.Adapters.SQLite3 +end diff --git a/lib/localiser/rssi_buffer.ex b/lib/localiser/rssi_buffer.ex new file mode 100644 index 0000000..f207727 --- /dev/null +++ b/lib/localiser/rssi_buffer.ex @@ -0,0 +1,72 @@ +defmodule Localiser.RSSI.Buffer do + use GenServer + + alias Localiser.Localisation + alias Localiser.Localisation.Tag.Filter, as: TagFilter + alias Localiser.Localisation.Sensor.Server, as: SensorServer + + @flush_interval_ms 500 + + # reading :: %{sensor_id: String.t(), tag_id: String.t(), rssi: integer()} + def push(reading) do + GenServer.cast(__MODULE__, {:push, reading}) + end + + def start_link(_args) do + GenServer.start_link(__MODULE__, %{}, name: __MODULE__) + end + + @impl true + def init(state) do + schedule_flush() + {:ok, state} + end + + @impl true + def handle_cast({:push, %{tag_id: tag_id} = reading}, state) do + {:noreply, Map.update(state, tag_id, [reading], &[reading | &1])} + end + + @impl true + def handle_info(:flush, state) do + flush_batches(state) + schedule_flush() + {:noreply, %{}} + end + + defp flush_batches(batches) do + Enum.each(batches, fn {tag_id, readings} -> + case Registry.lookup(Localiser.Registry, {:filter, tag_id}) do + [{_pid, _}] -> + measurements = Enum.flat_map(readings, &resolve_measurement/1) + if measurements != [], do: TagFilter.ingest(tag_id, measurements) + + [] -> + :ok + end + end) + end + + # Resolves a raw RSSI reading to a measurement with sensor location and distance. + # If the sensor is in calibration mode, feeds the reading to Sensor.Server instead + # and returns [] so the sample is excluded from Tag.Filter measurements. + # If the sensor server isn't running, returns []. + defp resolve_measurement(%{sensor_id: sensor_id, rssi: rssi}) do + case Registry.lookup(Localiser.Registry, {:sensor, sensor_id}) do + [{_pid, _}] -> + if SensorServer.calibrating?(sensor_id) do + SensorServer.calibration_reading(sensor_id, rssi) + [] + else + [SensorServer.measure(sensor_id, rssi)] + end + + [] -> + [] + end + end + + defp schedule_flush do + Process.send_after(self(), :flush, @flush_interval_ms) + end +end diff --git a/lib/localiserd.ex b/lib/localiserd.ex new file mode 100644 index 0000000..de8a002 --- /dev/null +++ b/lib/localiserd.ex @@ -0,0 +1,18 @@ +defmodule Localiserd do + @moduledoc """ + Documentation for `Localiserd`. + """ + + @doc """ + Hello world. + + ## Examples + + iex> Localiserd.hello() + :world + + """ + def hello do + :world + end +end diff --git a/mix.exs b/mix.exs new file mode 100644 index 0000000..7310fcd --- /dev/null +++ b/mix.exs @@ -0,0 +1,32 @@ +defmodule Localiserd.MixProject do + use Mix.Project + + def project do + [ + app: :localiserd, + version: "0.1.0", + elixir: "~> 1.19", + start_permanent: Mix.env() == :prod, + deps: deps() + ] + end + + # Run "mix help compile.app" to learn about applications. + def application do + [ + extra_applications: [:logger], + mod: {Localiser.Application, []} + ] + end + + # Run "mix help deps" to learn about dependencies. + defp deps do + [ + {:emqtt, github: "emqx/emqtt", tag: "1.14.6", system_env: [{"BUILD_WITHOUT_QUIC", "1"}]}, + {:jason, "~> 1.4"}, + {:ecto_sqlite3, "~> 0.18"}, + {:argon2_elixir, "~> 4.0"}, + {:phoenix_pubsub, "~> 2.1"} + ] + end +end diff --git a/mix.lock b/mix.lock new file mode 100644 index 0000000..3b13dfe --- /dev/null +++ b/mix.lock @@ -0,0 +1,19 @@ +%{ + "argon2_elixir": {:hex, :argon2_elixir, "4.1.3", "4f28318286f89453364d7fbb53e03d4563fd7ed2438a60237eba5e426e97785f", [:make, :mix], [{:comeonin, "~> 5.3", [hex: :comeonin, repo: "hexpm", optional: false]}, {:elixir_make, "~> 0.6", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "7c295b8d8e0eaf6f43641698f962526cdf87c6feb7d14bd21e599271b510608c"}, + "cc_precompiler": {:hex, :cc_precompiler, "0.1.11", "8c844d0b9fb98a3edea067f94f616b3f6b29b959b6b3bf25fee94ffe34364768", [:mix], [{:elixir_make, "~> 0.7", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "3427232caf0835f94680e5bcf082408a70b48ad68a5f5c0b02a3bea9f3a075b9"}, + "comeonin": {:hex, :comeonin, "5.5.1", "5113e5f3800799787de08a6e0db307133850e635d34e9fab23c70b6501669510", [:mix], [], "hexpm", "65aac8f19938145377cee73973f192c5645873dcf550a8a6b18187d17c13ccdb"}, + "cowlib": {:hex, :cowlib, "2.13.0", "db8f7505d8332d98ef50a3ef34b34c1afddec7506e4ee4dd4a3a266285d282ca", [:make, :rebar3], [], "hexpm", "e1e1284dc3fc030a64b1ad0d8382ae7e99da46c3246b815318a4b848873800a4"}, + "db_connection": {:hex, :db_connection, "2.9.0", "a6a97c5c958a2d7091a58a9be40caf41ab496b0701d21e1d1abff3fa27a7f371", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "17d502eacaf61829db98facf6f20808ed33da6ccf495354a41e64fe42f9c509c"}, + "decimal": {:hex, :decimal, "2.3.0", "3ad6255aa77b4a3c4f818171b12d237500e63525c2fd056699967a3e7ea20f62", [:mix], [], "hexpm", "a4d66355cb29cb47c3cf30e71329e58361cfcb37c34235ef3bf1d7bf3773aeac"}, + "ecto": {:hex, :ecto, "3.13.5", "9d4a69700183f33bf97208294768e561f5c7f1ecf417e0fa1006e4a91713a834", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "df9efebf70cf94142739ba357499661ef5dbb559ef902b68ea1f3c1fabce36de"}, + "ecto_sql": {:hex, :ecto_sql, "3.13.5", "2f8282b2ad97bf0f0d3217ea0a6fff320ead9e2f8770f810141189d182dc304e", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.13.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.7", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.19 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "aa36751f4e6a2b56ae79efb0e088042e010ff4935fc8684e74c23b1f49e25fdc"}, + "ecto_sqlite3": {:hex, :ecto_sqlite3, "0.22.0", "edab2d0f701b7dd05dcf7e2d97769c106aff62b5cfddc000d1dd6f46b9cbd8c3", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.13.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:ecto_sql, "~> 3.13.0", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:exqlite, "~> 0.22", [hex: :exqlite, repo: "hexpm", optional: false]}], "hexpm", "5af9e031bffcc5da0b7bca90c271a7b1e7c04a93fecf7f6cd35bc1b1921a64bd"}, + "elixir_make": {:hex, :elixir_make, "0.9.0", "6484b3cd8c0cee58f09f05ecaf1a140a8c97670671a6a0e7ab4dc326c3109726", [:mix], [], "hexpm", "db23d4fd8b757462ad02f8aa73431a426fe6671c80b200d9710caf3d1dd0ffdb"}, + "emqtt": {:git, "https://github.com/emqx/emqtt.git", "249600337261dd004a848381db19bf1986687f28", [tag: "1.14.6"]}, + "exqlite": {:hex, :exqlite, "0.36.0", "07b4f95d61cb82b8d52946d0639497fa7d32117e09b2c8d25e24a38723c295cb", [:make, :mix], [{:cc_precompiler, "~> 0.1", [hex: :cc_precompiler, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:elixir_make, "~> 0.8", [hex: :elixir_make, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "cbeca3ce781f9ff07cfa9a87486f3ebd512a143ad6a14ed5c9fca21fe0bf3ae7"}, + "getopt": {:hex, :getopt, "1.0.3", "4f3320c1f6f26b2bec0f6c6446b943eb927a1e6428ea279a1c6c534906ee79f1", [:rebar3], [], "hexpm", "7e01de90ac540f21494ff72792b1e3162d399966ebbfc674b4ce52cb8f49324f"}, + "gun": {:hex, :gun, "2.1.0", "b4e4cbbf3026d21981c447e9e7ca856766046eff693720ba43114d7f5de36e87", [:make, :rebar3], [{:cowlib, "2.13.0", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "52fc7fc246bfc3b00e01aea1c2854c70a366348574ab50c57dfe796d24a0101d"}, + "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, + "phoenix_pubsub": {:hex, :phoenix_pubsub, "2.2.0", "ff3a5616e1bed6804de7773b92cbccfc0b0f473faf1f63d7daf1206c7aeaaa6f", [:mix], [], "hexpm", "adc313a5bf7136039f63cfd9668fde73bba0765e0614cba80c06ac9460ff3e96"}, + "telemetry": {:hex, :telemetry, "1.4.1", "ab6de178e2b29b58e8256b92b382ea3f590a47152ca3651ea857a6cae05ac423", [:rebar3], [], "hexpm", "2172e05a27531d3d31dd9782841065c50dd5c3c7699d95266b2edd54c2dafa1c"}, +} diff --git a/test/localiserd_test.exs b/test/localiserd_test.exs new file mode 100644 index 0000000..25a30a0 --- /dev/null +++ b/test/localiserd_test.exs @@ -0,0 +1,8 @@ +defmodule LocaliserdTest do + use ExUnit.Case + doctest Localiserd + + test "greets the world" do + assert Localiserd.hello() == :world + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs new file mode 100644 index 0000000..869559e --- /dev/null +++ b/test/test_helper.exs @@ -0,0 +1 @@ +ExUnit.start()