From d9e7544070eb9f57cf948d73a997e2693dce574f Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 30 Nov 2023 23:23:55 +0100 Subject: [PATCH 01/10] refactor(sessds): Introduce macros for the timers --- apps/emqx/src/emqx_persistent_session_ds.erl | 24 ++++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 9844e6d48..fa94e656e 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -120,7 +120,11 @@ -type clientinfo() :: emqx_types:clientinfo(). -type conninfo() :: emqx_session:conninfo(). -type replies() :: emqx_session:replies(). --type timer() :: pull | get_streams | bump_last_alive_at. + +-define(TIMER_PULL, timer_pull). +-define(TIMER_GET_STREAMS, timer_get_streams). +-define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at). +-type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT. -define(STATS_KEYS, [ subscriptions_cnt, @@ -399,7 +403,7 @@ deliver(_ClientInfo, _Delivers, Session) -> {ok, replies(), session()} | {ok, replies(), timeout(), session()}. 
handle_timeout( _ClientInfo, - pull, + ?TIMER_PULL, Session = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum} ) -> {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll( @@ -422,13 +426,13 @@ handle_timeout( [_ | _] -> 0 end, - ensure_timer(pull, Timeout), + ensure_timer(?TIMER_PULL, Timeout), {ok, Publishes, Session#{inflight := Inflight}}; -handle_timeout(_ClientInfo, get_streams, Session) -> +handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session) -> renew_streams(Session), - ensure_timer(get_streams), + ensure_timer(?TIMER_GET_STREAMS), {ok, [], Session}; -handle_timeout(_ClientInfo, bump_last_alive_at, Session0) -> +handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0) -> %% Note: we take a pessimistic approach here and assume that the client will be alive %% until the next bump timeout. With this, we avoid garbage collecting this session %% too early in case the session/connection/node crashes earlier without having time @@ -436,7 +440,7 @@ handle_timeout(_ClientInfo, bump_last_alive_at, Session0) -> BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]), EstimatedLastAliveAt = now_ms() + BumpInterval, Session = session_set_last_alive_at_trans(Session0, EstimatedLastAliveAt), - ensure_timer(bump_last_alive_at), + ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT), {ok, [], Session}. -spec replay(clientinfo(), [], session()) -> @@ -958,9 +962,9 @@ export_record(_, _, [], Acc) -> %% TODO: find a more reliable way to perform actions that have side %% effects. Add `CBM:init' callback to the session behavior? ensure_timers() -> - ensure_timer(pull), - ensure_timer(get_streams), - ensure_timer(bump_last_alive_at). + ensure_timer(?TIMER_PULL), + ensure_timer(?TIMER_GET_STREAMS), + ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT). -spec ensure_timer(timer()) -> ok. 
ensure_timer(bump_last_alive_at = Type) -> From 1897e5c31bee3ff389fae04482789d6b8d9a09c3 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 1 Dec 2023 00:56:38 +0100 Subject: [PATCH 02/10] refactor(session): Use common naming conventions --- apps/emqx/src/emqx_session.erl | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 108e8ec09..7c34dbc65 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -492,13 +492,13 @@ reset_timer(Name, Time, Channel) -> -spec cancel_timer(custom_timer_name(), timerset()) -> timerset(). -cancel_timer(Name, Timers) -> - case maps:take(Name, Timers) of - {TRef, NTimers} -> +cancel_timer(Name, Timers0) -> + case maps:take(Name, Timers0) of + {TRef, Timers} -> ok = emqx_utils:cancel_timer(TRef), - NTimers; + Timers; error -> - Timers + Timers0 end. %%-------------------------------------------------------------------- From 69f1ca43c36d44c72d30171a79c084849265e6b1 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 1 Dec 2023 00:59:09 +0100 Subject: [PATCH 03/10] fix(sessds): Create a timer even if it's present in the map --- apps/emqx/src/emqx_session.erl | 2 -- 1 file changed, 2 deletions(-) diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 7c34dbc65..239919179 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -479,8 +479,6 @@ handle_timeout(ClientInfo, Timer, Session) -> -spec ensure_timer(custom_timer_name(), timeout(), timerset()) -> timerset(). -ensure_timer(Name, _Time, Timers = #{}) when is_map_key(Name, Timers) -> - Timers; ensure_timer(Name, Time, Timers = #{}) when Time > 0 -> TRef = emqx_utils:start_timer(Time, {?MODULE, Name}), Timers#{Name => TRef}. 
From 38800c0260004f5fc600f507e80a23072459bde8 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 1 Dec 2023 02:20:12 +0100 Subject: [PATCH 04/10] refactor(sessds): Store timers in the session --- apps/emqx/src/emqx_persistent_session_ds.erl | 53 ++++++++------------ apps/emqx/src/emqx_session.erl | 23 ++++----- 2 files changed, 31 insertions(+), 45 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index fa94e656e..6b94f3d74 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -96,6 +96,12 @@ props := map(), extra := map() }. + +-define(TIMER_PULL, timer_pull). +-define(TIMER_GET_STREAMS, timer_get_streams). +-define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at). +-type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT. + -type session() :: #{ %% Client ID id := id(), @@ -111,6 +117,8 @@ receive_maximum := pos_integer(), %% Connection Info conninfo := emqx_types:conninfo(), + %% Timers + timer() => reference(), %% props := map() }. @@ -121,11 +129,6 @@ -type conninfo() :: emqx_session:conninfo(). -type replies() :: emqx_session:replies(). --define(TIMER_PULL, timer_pull). --define(TIMER_GET_STREAMS, timer_get_streams). --define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at). --type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT. - -define(STATS_KEYS, [ subscriptions_cnt, subscriptions_max, @@ -148,8 +151,7 @@ session(). create(#{clientid := ClientID}, ConnInfo, Conf) -> % TODO: expiration - ensure_timers(), - ensure_session(ClientID, ConnInfo, Conf). + ensure_timers(ensure_session(ClientID, ConnInfo, Conf)). -spec open(clientinfo(), conninfo()) -> {_IsPresent :: true, session(), []} | false. 
@@ -163,10 +165,9 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo) -> ok = emqx_cm:discard_session(ClientID), case session_open(ClientID, ConnInfo) of Session0 = #{} -> - ensure_timers(), ReceiveMaximum = receive_maximum(ConnInfo), Session = Session0#{receive_maximum => ReceiveMaximum}, - {true, Session, []}; + {true, ensure_timers(Session), []}; false -> false end. @@ -404,7 +405,7 @@ deliver(_ClientInfo, _Delivers, Session) -> handle_timeout( _ClientInfo, ?TIMER_PULL, - Session = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum} + Session0 = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum} ) -> {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll( fun @@ -426,12 +427,11 @@ handle_timeout( [_ | _] -> 0 end, - ensure_timer(?TIMER_PULL, Timeout), - {ok, Publishes, Session#{inflight := Inflight}}; + Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session0#{inflight := Inflight}), + {ok, Publishes, Session}; handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session) -> renew_streams(Session), - ensure_timer(?TIMER_GET_STREAMS), - {ok, [], Session}; + {ok, [], emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session)}; handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0) -> %% Note: we take a pessimistic approach here and assume that the client will be alive %% until the next bump timeout. With this, we avoid garbage collecting this session @@ -440,8 +440,8 @@ handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0) -> BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]), EstimatedLastAliveAt = now_ms() + BumpInterval, Session = session_set_last_alive_at_trans(Session0, EstimatedLastAliveAt), - ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT), - {ok, [], Session}. + BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]), + {ok, [], emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, BumpInterval, Session)}. 
-spec replay(clientinfo(), [], session()) -> {ok, replies(), session()}. @@ -961,22 +961,11 @@ export_record(_, _, [], Acc) -> %% TODO: find a more reliable way to perform actions that have side %% effects. Add `CBM:init' callback to the session behavior? -ensure_timers() -> - ensure_timer(?TIMER_PULL), - ensure_timer(?TIMER_GET_STREAMS), - ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT). - --spec ensure_timer(timer()) -> ok. -ensure_timer(bump_last_alive_at = Type) -> - BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]), - ensure_timer(Type, BumpInterval); -ensure_timer(Type) -> - ensure_timer(Type, 100). - --spec ensure_timer(timer(), non_neg_integer()) -> ok. -ensure_timer(Type, Timeout) -> - _ = emqx_utils:start_timer(Timeout, {emqx_session, Type}), - ok. +-spec ensure_timers(session()) -> session(). +ensure_timers(Session0) -> + Session1 = emqx_session:ensure_timer(?TIMER_PULL, 100, Session0), + Session2 = emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session1), + emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, 100, Session2). -spec receive_maximum(conninfo()) -> pos_integer(). receive_maximum(ConnInfo) -> diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 239919179..bf12c933f 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -111,8 +111,7 @@ reply/0, replies/0, common_timer_name/0, - custom_timer_name/0, - timerset/0 + custom_timer_name/0 ]). -type session_id() :: _TODO. @@ -154,8 +153,6 @@ emqx_session_mem:session() | emqx_persistent_session_ds:session(). --type timerset() :: #{custom_timer_name() => _TimerRef :: reference()}. - -define(INFO_KEYS, [ id, created_at, @@ -477,19 +474,19 @@ handle_timeout(ClientInfo, Timer, Session) -> %%-------------------------------------------------------------------- --spec ensure_timer(custom_timer_name(), timeout(), timerset()) -> - timerset(). 
-ensure_timer(Name, Time, Timers = #{}) when Time > 0 -> +-spec ensure_timer(custom_timer_name(), timeout(), map()) -> + map(). +ensure_timer(Name, Time, Timers = #{}) when Time >= 0 -> TRef = emqx_utils:start_timer(Time, {?MODULE, Name}), Timers#{Name => TRef}. --spec reset_timer(custom_timer_name(), timeout(), timerset()) -> - timerset(). -reset_timer(Name, Time, Channel) -> - ensure_timer(Name, Time, cancel_timer(Name, Channel)). +-spec reset_timer(custom_timer_name(), timeout(), map()) -> + map(). +reset_timer(Name, Time, Timers) -> + ensure_timer(Name, Time, cancel_timer(Name, Timers)). --spec cancel_timer(custom_timer_name(), timerset()) -> - timerset(). +-spec cancel_timer(custom_timer_name(), map()) -> + map(). cancel_timer(Name, Timers0) -> case maps:take(Name, Timers0) of {TRef, Timers} -> From 4717e56fb6ef40e8c5a1b7f28e47e685b5b6864f Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 1 Dec 2023 02:45:15 +0100 Subject: [PATCH 05/10] fix(sessds): Schedule poll immediately upon receiving an ack This commit affects the flow control, and improves the throughput by removing a delay between freeing up space in the in-flight window and polling new messages. 
--- apps/emqx/src/emqx_persistent_session_ds.erl | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 6b94f3d74..11801b098 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -338,9 +338,9 @@ publish(_PacketId, Msg, Session) -> puback(_ClientInfo, PacketId, Session = #{id := Id, inflight := Inflight0}) -> case emqx_persistent_message_ds_replayer:commit_offset(Id, ack, PacketId, Inflight0) of {true, Inflight} -> - %% TODO + %% TODO: we pass a bogus message into the hook: Msg = emqx_message:make(Id, <<>>, <<>>), - {ok, Msg, [], Session#{inflight => Inflight}}; + {ok, Msg, [], pull_now(Session#{inflight => Inflight})}; {false, _} -> %% Invalid Packet Id {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} @@ -356,9 +356,9 @@ puback(_ClientInfo, PacketId, Session = #{id := Id, inflight := Inflight0}) -> pubrec(PacketId, Session = #{id := Id, inflight := Inflight0}) -> case emqx_persistent_message_ds_replayer:commit_offset(Id, rec, PacketId, Inflight0) of {true, Inflight} -> - %% TODO + %% TODO: we pass a bogus message into the hook: Msg = emqx_message:make(Id, <<>>, <<>>), - {ok, Msg, Session#{inflight => Inflight}}; + {ok, Msg, pull_now(Session#{inflight => Inflight})}; {false, _} -> %% Invalid Packet Id {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} @@ -967,6 +967,10 @@ ensure_timers(Session0) -> Session2 = emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session1), emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, 100, Session2). +-spec pull_now(session()) -> session(). +pull_now(Session) -> + emqx_session:reset_timer(?TIMER_PULL, 0, Session). + -spec receive_maximum(conninfo()) -> pos_integer(). 
receive_maximum(ConnInfo) -> %% Note: the default value should be always set by the channel From 0e625d814a13e9144d8e60c730e8d0a2a11131e2 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 1 Dec 2023 03:29:41 +0100 Subject: [PATCH 06/10] feat(sessds): Make batch size configurable This change affects flow control. It allows configuring the maximum size of a batch, as well as the fetch threshold. --- .../src/emqx_persistent_message_ds_replayer.erl | 3 ++- apps/emqx/src/emqx_persistent_session_ds.erl | 4 +++- apps/emqx/src/emqx_schema.erl | 16 ++++++++++++++++ rel/i18n/emqx_schema.hocon | 12 ++++++++++++ 4 files changed, 33 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index fb8170904..2bd312561 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -169,7 +169,8 @@ commit_offset( -spec poll(reply_fun(), emqx_persistent_session_ds:id(), inflight(), pos_integer()) -> {emqx_session:replies(), inflight()}. 
poll(ReplyFun, SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSize < ?EPOCH_SIZE -> - FetchThreshold = max(1, WindowSize div 2), + MinBatchSize = emqx_config:get([session_persistence, min_batch_size]), + FetchThreshold = min(MinBatchSize, ceil(WindowSize / 2)), FreeSpace = WindowSize - n_inflight(Inflight0), case FreeSpace >= FetchThreshold of false -> diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 11801b098..b50ac8c64 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -407,6 +407,8 @@ handle_timeout( ?TIMER_PULL, Session0 = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum} ) -> + MaxBatchSize = emqx_config:get([session_persistence, max_batch_size]), + BatchSize = min(ReceiveMaximum, MaxBatchSize), {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll( fun (_Seqno, Message = #message{qos = ?QOS_0}) -> @@ -417,7 +419,7 @@ handle_timeout( end, Id, Inflight0, - ReceiveMaximum + BatchSize ), IdlePollInterval = emqx_config:get([session_persistence, idle_poll_interval]), Timeout = diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index f46387d3b..cdb1035df 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1773,6 +1773,22 @@ fields("session_persistence") -> } } )}, + {"max_batch_size", + sc( + pos_integer(), + #{ + default => 1000, + desc => ?DESC(session_ds_max_batch_size) + } + )}, + {"min_batch_size", + sc( + pos_integer(), + #{ + default => 100, + desc => ?DESC(session_ds_min_batch_size) + } + )}, {"idle_poll_interval", sc( timeout_duration(), diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index 2a6fb03ba..96c9c5824 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1577,4 +1577,16 @@ session_ds_session_gc_interval.desc: session_ds_session_gc_batch_size.desc: """The size of each batch of expired 
persistent sessions to be garbage collected per iteration.""" +session_ds_max_batch_size.desc: +"""This value affects the flow control for the persistent sessions. +The session queries the DB for the new messages in batches. +Size of the batch doesn't exceed this value or `RecieveMaximum`, whichever is smaller.""" + +session_ds_min_batch_size.desc: +"""This value affects the flow control for the persistent sessions. +The session will query the DB for the new messages when the value of `FreeSpace` variable is larger than this value or `ReceiveMaximum` / 2, whichever is smaller. + +FreeSpace is calculated as `ReceiveMaximum` for the session - number of inflight messages.""" + + } From 0ae618d010216e20acc2a80153939f04fe987e21 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 1 Dec 2023 03:41:38 +0100 Subject: [PATCH 07/10] fix(ds): Use emqx_rpc for calls that work with large binaries --- apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index 0d7972466..3b7c36082 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -69,7 +69,7 @@ make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) -> | {ok, end_of_stream} | {error, _}. next(Node, DB, Shard, Iter, BatchSize) -> - erpc:call(Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]). + emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]). -spec store_batch( node(), @@ -80,7 +80,9 @@ next(Node, DB, Shard, Iter, BatchSize) -> ) -> emqx_ds:store_batch_result(). store_batch(Node, DB, Shard, Batch, Options) -> - erpc:call(Node, emqx_ds_replication_layer, do_store_batch_v1, [DB, Shard, Batch, Options]). 
+ emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_store_batch_v1, [ + DB, Shard, Batch, Options + ]). %%================================================================================ %% behavior callbacks From e238602533d6a52b3ad0dc33c52af92280e7353a Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 1 Dec 2023 06:10:28 +0100 Subject: [PATCH 08/10] fix(ds): Update README --- apps/emqx_durable_storage/README.md | 42 +++++++++++++++++++---------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/apps/emqx_durable_storage/README.md b/apps/emqx_durable_storage/README.md index f01af0c37..bc8eae2d0 100644 --- a/apps/emqx_durable_storage/README.md +++ b/apps/emqx_durable_storage/README.md @@ -1,36 +1,50 @@ # EMQX Replay -`emqx_ds` is a generic durable storage for MQTT messages within EMQX. +`emqx_ds` is an application implementing durable storage for MQTT messages within EMQX. -Concepts: +# Features +- Streams. A stream is an abstraction that encompasses topics, shards, different data layouts, etc. + The client application only needs to be aware of the streams. +- Batching. All the API functions are batch-oriented. -> 0. App overview introduction -> 1. let people know what your project can do specifically. Is it a base -> library dependency, or what kind of functionality is provided to the user? -> 2. Provide context and add a link to any reference visitors might be -> unfamiliar with. -> 3. Design details, implementation technology architecture, Roadmap, etc. +- Iterators. Iterators can be stored durably or transferred over the network. + They take up relatively little space. -# [Features] - [Optional] -> A List of features your application provided. If the feature is quite simple, just -> list in the previous section. +- Support for various backends. Almost any DBMS that supports range + queries can serve as an `emqx_durable_storage` backend. + +- Builtin backend based on RocksDB. 
+ - Changing storage layout on the fly: it's achieved by creating a + new set of tables (known as "generation") and the schema. + - Sharding based on publisher's client ID # Limitation -TBD + +- Builtin backend currently doesn't replicate data across different sites +- There is no local cache of messages, which may result in transferring the same data multiple times # Documentation links TBD # Usage -TBD + +Currently it's only used to implement persistent sessions. + +In the future it can serve as a storage for retained messages or as a generic message buffering layer for the bridges. # Configurations -TBD + +`emqx_durable_storage` doesn't have any configurable parameters. +Instead, it relies on the upper-level business applications to create +a correct configuration and pass it to `emqx_ds:open_db(DBName, Config)` +function according to its needs. # HTTP APIs +None + # Other TBD From e1ec560639122214559588be3ead3a3dd52c6c3c Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 1 Dec 2023 10:06:18 +0100 Subject: [PATCH 09/10] test(emqx): Fix flaky emqx_takeover_SUITE:t_takeover testcase This testcase uses QoS1, so it must account for possible duplication of the messages. 
--- apps/emqx/test/emqx_takeover_SUITE.erl | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index 97616c947..03a48e174 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -145,15 +145,16 @@ assert_messages_missed(Ls1, Ls2) -> assert_messages_order([], []) -> ok; -assert_messages_order([Msg | Ls1], [#{payload := No} | Ls2]) -> - case emqx_message:payload(Msg) == No of - false -> +assert_messages_order([Msg | Expected], Received) -> + %% Account for duplicate messages: + case lists:splitwith(fun(#{payload := P}) -> emqx_message:payload(Msg) == P end, Received) of + {[], [#{payload := Mismatch} | _]} -> ct:fail("Message order is not correct, expected: ~p, received: ~p", [ - emqx_message:payload(Msg), No + emqx_message:payload(Msg), Mismatch ]), error; - true -> - assert_messages_order(Ls1, Ls2) + {_Matching, Rest} -> + assert_messages_order(Expected, Rest) end. messages(Offset, Cnt) -> From 2f282dd3f1467a7cd9a24e49b579f7facad986e4 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 1 Dec 2023 16:53:47 +0100 Subject: [PATCH 10/10] fix(sessds): Apply suggestions from code review to the docs Co-authored-by: Thales Macedo Garitezi --- rel/i18n/emqx_schema.hocon | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index 96c9c5824..c7a73ec33 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1580,13 +1580,13 @@ session_ds_session_gc_batch_size.desc: session_ds_max_batch_size.desc: """This value affects the flow control for the persistent sessions. The session queries the DB for the new messages in batches. 
-Size of the batch doesn't exceed this value or `RecieveMaximum`, whichever is smaller.""" +Size of the batch doesn't exceed this value or `ReceiveMaximum`, whichever is smaller.""" session_ds_min_batch_size.desc: """This value affects the flow control for the persistent sessions. The session will query the DB for the new messages when the value of `FreeSpace` variable is larger than this value or `ReceiveMaximum` / 2, whichever is smaller. -FreeSpace is calculated as `ReceiveMaximum` for the session - number of inflight messages.""" +`FreeSpace` is calculated as `ReceiveMaximum` for the session - number of inflight messages.""" }