From b29c5ad23c25668ae309bb13da714af6f8cbff7a Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 5 Jun 2023 13:03:47 +0200 Subject: [PATCH] feat(emqx_ds): Add API draft for logic layer --- apps/emqx_durable_storage/IMPLEMENTATION.md | 75 ++++++++ apps/emqx_durable_storage/src/emqx_ds.erl | 177 ++++++++++++++++++ apps/emqx_durable_storage/src/emqx_ds_app.erl | 15 ++ apps/emqx_durable_storage/src/emqx_ds_int.hrl | 27 +++ ...s.app.src => emqx_durable_storage.app.src} | 0 5 files changed, 294 insertions(+) create mode 100644 apps/emqx_durable_storage/IMPLEMENTATION.md create mode 100644 apps/emqx_durable_storage/src/emqx_ds.erl create mode 100644 apps/emqx_durable_storage/src/emqx_ds_int.hrl rename apps/emqx_durable_storage/src/{emqx_ds.app.src => emqx_durable_storage.app.src} (100%) diff --git a/apps/emqx_durable_storage/IMPLEMENTATION.md b/apps/emqx_durable_storage/IMPLEMENTATION.md new file mode 100644 index 000000000..4c78b8cc8 --- /dev/null +++ b/apps/emqx_durable_storage/IMPLEMENTATION.md @@ -0,0 +1,75 @@ +# General concepts + +In the logic layer we don't speak about replication. +This is because we could use an external DB with its own replication logic. + +On the other hand, we introduce notion of shard right here at the logic. +This is because shared subscription logic needs to be aware of it to some extend, as it has to split work between subscribers somehow. + +# Tables + +## Message storage + +Data is written every time a message matching certain pattern is published. +This pattern is not part of the logic layer spec. + +Write throughput: very high +Data size: very high +Write pattern: append only +Read pattern: pseudoserial + +Number of records: O(total write throughput * retention time) + +## Session storage + +Data there is updated when: + +- A new client connects with clean session = false +- Client subscribes to a topic +- Client unsubscribes to a topic +- Garbage collection is performed + +Write throughput: low + +Data is read when a client connects and replay agents are started + +Read throughput: low + +Data format: + +`#session{clientId = "foobar", iterators = [ItKey1, ItKey2, ItKey3, ...]}` + +Number of records: O(N clients) +Size of record: O(N subscriptions per clients) + +## Iterator storage + +Data is written every time a client acks a message. +Data is read when a client reconnects and we restart replay agents. + +`#iterator{key = IterKey, data = Blob}` + +Number of records: O(N clients * N subscriptions per client) +Size of record: O(1) +Write throughput: high, lots of small updates +Write pattern: mostly key overwrite +Read throughput: low +Read pattern: random + +# Push vs. Pull model + +In push model we have replay agents iterating over the dataset in the shards. + +In pull model the the client processes work with iterators. + +## Push pros: +- Lower latency: message can be dispatched to the client as soon as it's persisted +- Less worry about buffering + +## Push cons: +- Need pushback logic +- It's not entirely justified when working with external DB that may not provide streaming API + +## Pull pros: +- No need for pushback: client advances iterators at its own tempo +- diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl new file mode 100644 index 000000000..b08d5e186 --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -0,0 +1,177 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_ds). + +%% API: +%% Messages: +-export([message_store/2, message_store/1, message_stats/0]). +%% Iterator: +-export([iterator_update/2, iterator_next/1, iterator_stats/0]). +%% Session: +-export([ + session_open/1, + session_drop/1, + session_suspend/1, + session_add_iterator/2, + session_del_iterator/2, + session_stats/0 +]). + +%% internal exports: +-export([]). + +-export_type([ + message_id/0, + message_stats/0, + message_store_opts/0, + session_id/0, + iterator_id/0, + iterator/0 +]). + +-include("emqx_ds_int.hrl"). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-type session_id() :: emqx_types:clientid(). + +-type iterator() :: term(). + +-opaque iterator_id() :: binary(). + +%%-type session() :: #session{}. + +-type message_store_opts() :: #{}. + +-type message_stats() :: #{}. + +-type message_id() :: binary(). + +%%================================================================================ +%% API funcions +%%================================================================================ + +%%-------------------------------------------------------------------------------- +%% Message +%%-------------------------------------------------------------------------------- +-spec message_store([emqx_types:message()], message_store_opts()) -> + {ok, [message_id()]} | {error, _}. +message_store(_Msg, _Opts) -> + %% TODO + ok. + +-spec message_store([emqx_types:message()]) -> {ok, [message_id()]} | {error, _}. +message_store(Msg) -> + %% TODO + message_store(Msg, #{}). + +-spec message_stats() -> message_stats(). +message_stats() -> + #{}. + +%%-------------------------------------------------------------------------------- +%% Session +%%-------------------------------------------------------------------------------- + +%% @doc Called when a client connects. This function looks up a +%% session or creates a new one if previous one couldn't be found. +%% +%% This function also spawns replay agents for each iterator. +%% +%% Note: session API doesn't handle session takeovers, it's the job of +%% the broker. +-spec session_open(emqx_types:clientid()) -> {_New :: boolean(), session_id(), [iterator_id()]}. +session_open(ClientID) -> + {atomic, Ret} = + mria:transaction( + ?DS_SHARD, + fun() -> + case mnesia:read(?SESSION_TAB, ClientID) of + [#session{iterators = Iterators}] -> + {false, ClientID, Iterators}; + [] -> + Session = #session{id = ClientID, iterators = []}, + mnesia:write(?SESSION_TAB, Session), + {true, ClientID, []} + end + end + ), + Ret. + +%% @doc Called when a client reconnects with `clean session=true' or +%% during session GC +-spec session_drop(emqx_types:clientid()) -> ok. +session_drop(ClientID) -> + {atomic, ok} = mnesia:transaction( + ?DS_SHARD, + fun() -> + mnesia:delete(?SESSION_TAB, ClientID) + end + ), + ok. + +%% @doc Called when a client disconnects. This function terminates all +%% active processes related to the session. +-spec session_suspend(session_id()) -> ok | {error, session_not_found}. +session_suspend(_SessionId) -> + %% TODO + ok. + +%% @doc Called when a client subscribes to a topic. Idempotent. +-spec session_add_iterator(session_id(), emqx_topic:words()) -> + {ok, iterator_id()} | {error, session_not_found}. +session_add_iterator(_SessionId, _TopicFilter) -> + %% TODO + {ok, <<"">>}. + +%% @doc Called when a client unsubscribes from a topic. Returns `true' +%% if the session contained the subscription or `false' if it wasn't +%% subscribed. +-spec session_del_iterator(session_id(), emqx_topic:words()) -> + {ok, boolean()} | {error, session_not_found}. +session_del_iterator(_SessionId, _TopicFilter) -> + %% TODO + false. + +-spec session_stats() -> #{}. +session_stats() -> + #{}. + +%%-------------------------------------------------------------------------------- +%% Iterator (pull API) +%%-------------------------------------------------------------------------------- + +%% @doc Called when a client acks a message +-spec iterator_update(iterator_id(), iterator()) -> ok. +iterator_update(_IterId, _Iter) -> + %% TODO + ok. + +%% @doc Called when a client acks a message +-spec iterator_next(iterator()) -> {value, emqx_types:message(), iterator()} | none | {error, _}. +iterator_next(_Iter) -> + %% TODO + ok. + +-spec iterator_stats() -> #{}. +iterator_stats() -> + #{}. + +%%================================================================================ +%% Internal functions +%%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_app.erl b/apps/emqx_durable_storage/src/emqx_ds_app.erl index 858855b6f..fb4d487e9 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_app.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_app.erl @@ -6,5 +6,20 @@ -export([start/2]). +-include("emqx_ds_int.hrl"). + start(_Type, _Args) -> + init_mnesia(), emqx_ds_sup:start_link(). + +init_mnesia() -> + ok = mria:create_table( + ?SESSION_TAB, + [ + {rlog_shard, ?DS_SHARD}, + {type, set}, + {storage, rocksdb_copies}, + {record_name, session}, + {attributes, record_info(fields, session)} + ] + ). diff --git a/apps/emqx_durable_storage/src/emqx_ds_int.hrl b/apps/emqx_durable_storage/src/emqx_ds_int.hrl new file mode 100644 index 000000000..96688ede6 --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_int.hrl @@ -0,0 +1,27 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-ifndef(EMQX_DS_HRL). +-define(EMQX_DS_HRL, true). + +-define(SESSION_TAB, emqx_ds_session). +-define(DS_SHARD, emqx_ds_shard). + +-record(session, { + id :: emqx_ds:session_id(), + iterators :: [{emqx_topic:words(), emqx_ds:iterator_id()}] +}). + +-endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds.app.src b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src similarity index 100% rename from apps/emqx_durable_storage/src/emqx_ds.app.src rename to apps/emqx_durable_storage/src/emqx_durable_storage.app.src