diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index fb8170904..2bd312561 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -169,7 +169,8 @@ commit_offset( -spec poll(reply_fun(), emqx_persistent_session_ds:id(), inflight(), pos_integer()) -> {emqx_session:replies(), inflight()}. poll(ReplyFun, SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSize < ?EPOCH_SIZE -> - FetchThreshold = max(1, WindowSize div 2), + MinBatchSize = emqx_config:get([session_persistence, min_batch_size]), + FetchThreshold = min(MinBatchSize, ceil(WindowSize / 2)), FreeSpace = WindowSize - n_inflight(Inflight0), case FreeSpace >= FetchThreshold of false -> diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 9844e6d48..b50ac8c64 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -96,6 +96,12 @@ props := map(), extra := map() }. + +-define(TIMER_PULL, timer_pull). +-define(TIMER_GET_STREAMS, timer_get_streams). +-define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at). +-type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT. + -type session() :: #{ %% Client ID id := id(), @@ -111,6 +117,8 @@ receive_maximum := pos_integer(), %% Connection Info conninfo := emqx_types:conninfo(), + %% Timers + timer() => reference(), %% props := map() }. @@ -120,7 +128,6 @@ -type clientinfo() :: emqx_types:clientinfo(). -type conninfo() :: emqx_session:conninfo(). -type replies() :: emqx_session:replies(). --type timer() :: pull | get_streams | bump_last_alive_at. -define(STATS_KEYS, [ subscriptions_cnt, @@ -144,8 +151,7 @@ session(). create(#{clientid := ClientID}, ConnInfo, Conf) -> % TODO: expiration - ensure_timers(), - ensure_session(ClientID, ConnInfo, Conf). + ensure_timers(ensure_session(ClientID, ConnInfo, Conf)). -spec open(clientinfo(), conninfo()) -> {_IsPresent :: true, session(), []} | false. @@ -159,10 +165,9 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo) -> ok = emqx_cm:discard_session(ClientID), case session_open(ClientID, ConnInfo) of Session0 = #{} -> - ensure_timers(), ReceiveMaximum = receive_maximum(ConnInfo), Session = Session0#{receive_maximum => ReceiveMaximum}, - {true, Session, []}; + {true, ensure_timers(Session), []}; false -> false end. @@ -333,9 +338,9 @@ publish(_PacketId, Msg, Session) -> puback(_ClientInfo, PacketId, Session = #{id := Id, inflight := Inflight0}) -> case emqx_persistent_message_ds_replayer:commit_offset(Id, ack, PacketId, Inflight0) of {true, Inflight} -> - %% TODO + %% TODO: we pass a bogus message into the hook: Msg = emqx_message:make(Id, <<>>, <<>>), - {ok, Msg, [], Session#{inflight => Inflight}}; + {ok, Msg, [], pull_now(Session#{inflight => Inflight})}; {false, _} -> %% Invalid Packet Id {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} @@ -351,9 +356,9 @@ puback(_ClientInfo, PacketId, Session = #{id := Id, inflight := Inflight0}) -> pubrec(PacketId, Session = #{id := Id, inflight := Inflight0}) -> case emqx_persistent_message_ds_replayer:commit_offset(Id, rec, PacketId, Inflight0) of {true, Inflight} -> - %% TODO + %% TODO: we pass a bogus message into the hook: Msg = emqx_message:make(Id, <<>>, <<>>), - {ok, Msg, Session#{inflight => Inflight}}; + {ok, Msg, pull_now(Session#{inflight => Inflight})}; {false, _} -> %% Invalid Packet Id {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} @@ -399,9 +404,11 @@ deliver(_ClientInfo, _Delivers, Session) -> {ok, replies(), session()} | {ok, replies(), timeout(), session()}. handle_timeout( _ClientInfo, - pull, - Session = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum} + ?TIMER_PULL, + Session0 = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum} ) -> + MaxBatchSize = emqx_config:get([session_persistence, max_batch_size]), + BatchSize = min(ReceiveMaximum, MaxBatchSize), {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll( fun (_Seqno, Message = #message{qos = ?QOS_0}) -> @@ -412,7 +419,7 @@ handle_timeout( end, Id, Inflight0, - ReceiveMaximum + BatchSize ), IdlePollInterval = emqx_config:get([session_persistence, idle_poll_interval]), Timeout = @@ -422,13 +429,12 @@ handle_timeout( [_ | _] -> 0 end, - ensure_timer(pull, Timeout), - {ok, Publishes, Session#{inflight := Inflight}}; -handle_timeout(_ClientInfo, get_streams, Session) -> + Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session0#{inflight := Inflight}), + {ok, Publishes, Session}; +handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session) -> renew_streams(Session), - ensure_timer(get_streams), - {ok, [], Session}; -handle_timeout(_ClientInfo, bump_last_alive_at, Session0) -> + {ok, [], emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session)}; +handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0) -> %% Note: we take a pessimistic approach here and assume that the client will be alive %% until the next bump timeout. With this, we avoid garbage collecting this session %% too early in case the session/connection/node crashes earlier without having time @@ -436,8 +442,8 @@ handle_timeout(_ClientInfo, bump_last_alive_at, Session0) -> BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]), EstimatedLastAliveAt = now_ms() + BumpInterval, Session = session_set_last_alive_at_trans(Session0, EstimatedLastAliveAt), - ensure_timer(bump_last_alive_at), - {ok, [], Session}. + BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]), + {ok, [], emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, BumpInterval, Session)}. -spec replay(clientinfo(), [], session()) -> {ok, replies(), session()}. @@ -957,22 +963,15 @@ export_record(_, _, [], Acc) -> %% TODO: find a more reliable way to perform actions that have side %% effects. Add `CBM:init' callback to the session behavior? -ensure_timers() -> - ensure_timer(pull), - ensure_timer(get_streams), - ensure_timer(bump_last_alive_at). +-spec ensure_timers(session()) -> session(). +ensure_timers(Session0) -> + Session1 = emqx_session:ensure_timer(?TIMER_PULL, 100, Session0), + Session2 = emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session1), + emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, 100, Session2). --spec ensure_timer(timer()) -> ok. -ensure_timer(bump_last_alive_at = Type) -> - BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]), - ensure_timer(Type, BumpInterval); -ensure_timer(Type) -> - ensure_timer(Type, 100). - --spec ensure_timer(timer(), non_neg_integer()) -> ok. -ensure_timer(Type, Timeout) -> - _ = emqx_utils:start_timer(Timeout, {emqx_session, Type}), - ok. +-spec pull_now(session()) -> session(). +pull_now(Session) -> + emqx_session:reset_timer(?TIMER_PULL, 0, Session). -spec receive_maximum(conninfo()) -> pos_integer(). receive_maximum(ConnInfo) -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index f46387d3b..cdb1035df 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1773,6 +1773,22 @@ fields("session_persistence") -> } } )}, + {"max_batch_size", + sc( + pos_integer(), + #{ + default => 1000, + desc => ?DESC(session_ds_max_batch_size) + } + )}, + {"min_batch_size", + sc( + pos_integer(), + #{ + default => 100, + desc => ?DESC(session_ds_min_batch_size) + } + )}, {"idle_poll_interval", sc( timeout_duration(), diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 108e8ec09..bf12c933f 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -111,8 +111,7 @@ reply/0, replies/0, common_timer_name/0, - custom_timer_name/0, - timerset/0 + custom_timer_name/0 ]). -type session_id() :: _TODO. @@ -154,8 +153,6 @@ emqx_session_mem:session() | emqx_persistent_session_ds:session(). --type timerset() :: #{custom_timer_name() => _TimerRef :: reference()}. - -define(INFO_KEYS, [ id, created_at, @@ -477,28 +474,26 @@ handle_timeout(ClientInfo, Timer, Session) -> %%-------------------------------------------------------------------- --spec ensure_timer(custom_timer_name(), timeout(), timerset()) -> - timerset(). -ensure_timer(Name, _Time, Timers = #{}) when is_map_key(Name, Timers) -> - Timers; -ensure_timer(Name, Time, Timers = #{}) when Time > 0 -> +-spec ensure_timer(custom_timer_name(), timeout(), map()) -> + map(). +ensure_timer(Name, Time, Timers = #{}) when Time >= 0 -> TRef = emqx_utils:start_timer(Time, {?MODULE, Name}), Timers#{Name => TRef}. --spec reset_timer(custom_timer_name(), timeout(), timerset()) -> - timerset(). -reset_timer(Name, Time, Channel) -> - ensure_timer(Name, Time, cancel_timer(Name, Channel)). +-spec reset_timer(custom_timer_name(), timeout(), map()) -> + map(). +reset_timer(Name, Time, Timers) -> + ensure_timer(Name, Time, cancel_timer(Name, Timers)). --spec cancel_timer(custom_timer_name(), timerset()) -> - timerset(). -cancel_timer(Name, Timers) -> - case maps:take(Name, Timers) of - {TRef, NTimers} -> +-spec cancel_timer(custom_timer_name(), map()) -> + map(). +cancel_timer(Name, Timers0) -> + case maps:take(Name, Timers0) of + {TRef, Timers} -> ok = emqx_utils:cancel_timer(TRef), - NTimers; + Timers; error -> - Timers + Timers0 end. %%-------------------------------------------------------------------- diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index 97616c947..03a48e174 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -145,15 +145,16 @@ assert_messages_missed(Ls1, Ls2) -> assert_messages_order([], []) -> ok; -assert_messages_order([Msg | Ls1], [#{payload := No} | Ls2]) -> - case emqx_message:payload(Msg) == No of - false -> +assert_messages_order([Msg | Expected], Received) -> + %% Account for duplicate messages: + case lists:splitwith(fun(#{payload := P}) -> emqx_message:payload(Msg) == P end, Received) of + {[], [#{payload := Mismatch} | _]} -> ct:fail("Message order is not correct, expected: ~p, received: ~p", [ - emqx_message:payload(Msg), No + emqx_message:payload(Msg), Mismatch ]), error; - true -> - assert_messages_order(Ls1, Ls2) + {_Matching, Rest} -> + assert_messages_order(Expected, Rest) end. messages(Offset, Cnt) -> diff --git a/apps/emqx_durable_storage/README.md b/apps/emqx_durable_storage/README.md index f01af0c37..bc8eae2d0 100644 --- a/apps/emqx_durable_storage/README.md +++ b/apps/emqx_durable_storage/README.md @@ -1,36 +1,50 @@ # EMQX Replay -`emqx_ds` is a generic durable storage for MQTT messages within EMQX. +`emqx_ds` is an application implementing durable storage for MQTT messages within EMQX. -Concepts: +# Features +- Streams. Stream is an abstraction that encompasses topics, shards, different data layouts, etc. + The client application must only aware of the streams. +- Batching. All the API functions are batch-oriented. -> 0. App overview introduction -> 1. let people know what your project can do specifically. Is it a base -> library dependency, or what kind of functionality is provided to the user? -> 2. Provide context and add a link to any reference visitors might be -> unfamiliar with. -> 3. Design details, implementation technology architecture, Roadmap, etc. +- Iterators. Iterators can be stored durably or transferred over network. + They take relatively small space. -# [Features] - [Optional] -> A List of features your application provided. If the feature is quite simple, just -> list in the previous section. +- Support for various backends. Almost any DBMS that supports range + queries can serve as a `emqx_durable_storage` backend. + +- Builtin backend based on RocksDB. + - Changing storage layout on the fly: it's achieved by creating a + new set of tables (known as "generation") and the schema. + - Sharding based on publisher's client ID # Limitation -TBD + +- Builtin backend currently doesn't replicate data across different sites +- There is no local cache of messages, which may result in transferring the same data multiple times # Documentation links TBD # Usage -TBD + +Currently it's only used to implement persistent sessions. + +In the future it can serve as a storage for retained messages or as a generic message buffering layer for the bridges. # Configurations -TBD + +`emqx_durable_storage` doesn't have any configurable parameters. +Instead, it relies on the upper-level business applications to create +a correct configuration and pass it to `emqx_ds:open_db(DBName, Config)` +function according to its needs. # HTTP APIs +None + # Other TBD diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index 0d7972466..3b7c36082 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -69,7 +69,7 @@ make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) -> | {ok, end_of_stream} | {error, _}. next(Node, DB, Shard, Iter, BatchSize) -> - erpc:call(Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]). + emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]). -spec store_batch( node(), @@ -80,7 +80,9 @@ next(Node, DB, Shard, Iter, BatchSize) -> ) -> emqx_ds:store_batch_result(). store_batch(Node, DB, Shard, Batch, Options) -> - erpc:call(Node, emqx_ds_replication_layer, do_store_batch_v1, [DB, Shard, Batch, Options]). + emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_store_batch_v1, [ + DB, Shard, Batch, Options + ]). %%================================================================================ %% behavior callbacks diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index 2a6fb03ba..c7a73ec33 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1577,4 +1577,16 @@ session_ds_session_gc_interval.desc: session_ds_session_gc_batch_size.desc: """The size of each batch of expired persistent sessions to be garbage collected per iteration.""" +session_ds_max_batch_size.desc: +"""This value affects the flow control for the persistent sessions. +The session queries the DB for the new messages in batches. +Size of the batch doesn't exceed this value or `ReceiveMaximum`, whichever is smaller.""" + +session_ds_min_batch_size.desc: +"""This value affects the flow control for the persistent sessions. +The session will query the DB for the new messages when the value of `FreeSpace` variable is larger than this value or `ReceiveMaximum` / 2, whichever is smaller. + +`FreeSpace` is calculated as `ReceiveMaximum` for the session - number of inflight messages.""" + + }