From 82e74d02015fce9ade5ed5c87ed0d29070af0041 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 21 Nov 2023 20:06:27 +0100 Subject: [PATCH 1/5] feat(ds): Add a flag that forces all sessions to become durable --- apps/emqx/src/emqx_persistent_message.erl | 8 +++++++- apps/emqx/src/emqx_session.erl | 18 ++++++++++++------ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index 30ebe7417..2a852627d 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -19,7 +19,7 @@ -include("emqx.hrl"). -export([init/0]). --export([is_persistence_enabled/0]). +-export([is_persistence_enabled/0, force_ds/0]). %% Message persistence -export([ @@ -54,6 +54,12 @@ is_persistence_enabled() -> storage_backend() -> storage_backend(emqx_config:get([session_persistence, storage])). +%% Dev-only option: force all messages to go through +%% `emqx_persistent_session_ds': +-spec force_ds() -> boolean(). +force_ds() -> + emqx_config:get([session_persistence, force_ds]). + storage_backend(#{ builtin := #{enable := true, n_shards := NShards, replication_factor := ReplicationFactor} }) -> diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index ba49d3f85..64ef2e30d 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -626,12 +626,18 @@ choose_impl_candidates(#{expiry_interval := EI}) -> choose_impl_candidates(_, _IsPSStoreEnabled = false) -> [emqx_session_mem]; choose_impl_candidates(0, _IsPSStoreEnabled = true) -> - %% NOTE - %% If ExpiryInterval is 0, the natural choice is `emqx_session_mem`. Yet we still - %% need to look the existing session up in the `emqx_persistent_session_ds` store - %% first, because previous connection may have set ExpiryInterval to a non-zero - %% value. - [emqx_session_mem, emqx_persistent_session_ds]; + case emqx_persistent_message:force_ds() of + false -> + %% NOTE + %% If ExpiryInterval is 0, the natural choice is + %% `emqx_session_mem'. Yet we still need to look the + %% existing session up in the `emqx_persistent_session_ds' + %% store first, because previous connection may have set + %% ExpiryInterval to a non-zero value. + [emqx_session_mem, emqx_persistent_session_ds]; + true -> + [emqx_persistent_session_ds] + end; choose_impl_candidates(EI, _IsPSStoreEnabled = true) when EI > 0 -> [emqx_persistent_session_ds]. From 1ced8786fdea13c51ad47016d9dd6eec0367ab74 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 22 Nov 2023 01:55:58 +0100 Subject: [PATCH 2/5] feat(ds): Make session poll interval configurable --- apps/emqx/src/emqx_persistent_message.erl | 2 +- apps/emqx/src/emqx_persistent_session_ds.erl | 4 ++-- apps/emqx/src/emqx_schema.erl | 17 +++++++++++++++++ 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index 2a852627d..c8ad490f8 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -58,7 +58,7 @@ storage_backend() -> %% `emqx_persistent_session_ds': -spec force_ds() -> boolean(). force_ds() -> - emqx_config:get([session_persistence, force_ds]). + emqx_config:get([session_persistence, force_persistence]). storage_backend(#{ builtin := #{enable := true, n_shards := NShards, replication_factor := ReplicationFactor} diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 928115a52..420defb18 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -349,11 +349,11 @@ handle_timeout( Session = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum} ) -> {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(Id, Inflight0, ReceiveMaximum), - %% TODO: make these values configurable: + IdlePollInterval = emqx_config:get([session_persistence, idle_poll_interval]), Timeout = case Publishes of [] -> - 100; + IdlePollInterval; [_ | _] -> 0 end, diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 8e401a442..2986950e7 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1772,6 +1772,23 @@ fields("session_persistence") -> <<"builtin">> => #{} } } + )}, + {"idle_poll_interval", + sc( + duration(), + #{ + default => 100, + desc => ?DESC(session_ds_idle_poll_interval) + } + )}, + {"force_persistence", + sc( + boolean(), + #{ + default => false, + %% Only for testing, shall remain hidden + importance => ?IMPORTANCE_HIDDEN + } )} ]; fields("session_storage_backend") -> From c5bb86db67a06c8b7c4415ddd5a86ea8c4703f2d Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 15 Nov 2023 13:24:49 +0100 Subject: [PATCH 3/5] feat(ds): Support QoS 0 --- apps/emqx/src/emqx_message.erl | 4 +- .../emqx_persistent_message_ds_replayer.erl | 37 +++++++++++-------- apps/emqx/src/emqx_persistent_session_ds.erl | 2 +- apps/emqx/src/emqx_schema.erl | 4 +- .../test/emqx_persistent_messages_SUITE.erl | 29 ++++++++++++++- .../src/emqx_ds_replication_layer_meta.erl | 33 ++++++++++++++++- 6 files changed, 87 insertions(+), 22 deletions(-) diff --git a/apps/emqx/src/emqx_message.erl b/apps/emqx/src/emqx_message.erl index 4ff36504d..b65c8360f 100644 --- a/apps/emqx/src/emqx_message.erl +++ b/apps/emqx/src/emqx_message.erl @@ -301,7 +301,9 @@ update_expiry(Msg) -> Msg. %% @doc Message to PUBLISH Packet. --spec to_packet(emqx_types:packet_id(), emqx_types:message()) -> +%% +%% When QoS=0 then packet id must be `undefined' +-spec to_packet(emqx_types:packet_id() | undefined, emqx_types:message()) -> emqx_types:packet(). to_packet( PacketId, diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index 64b9cabb4..d622444e9 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -27,6 +27,7 @@ -export_type([inflight/0, seqno/0]). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_utils/include/emqx_message.hrl"). -include("emqx_persistent_session_ds.hrl"). -ifdef(TEST). @@ -176,9 +177,12 @@ fetch(SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0 -> #inflight{next_seqno = FirstSeqno, offset_ranges = Ranges} = Inflight0, ItBegin = get_last_iterator(DSStream, Ranges), {ok, ItEnd, Messages} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, N), - {Publishes, UntilSeqno} = publish(FirstSeqno, Messages), - case range_size(FirstSeqno, UntilSeqno) of - Size when Size > 0 -> + case Messages of + [] -> + fetch(SessionId, Inflight0, Streams, N, Acc); + _ -> + {Publishes, UntilSeqno} = publish(FirstSeqno, Messages, _PreserveQoS0 = true), + Size = range_size(FirstSeqno, UntilSeqno), %% We need to preserve the iterator pointing to the beginning of the %% range, so that we can replay it if needed. Range0 = #ds_pubrange{ @@ -197,9 +201,7 @@ fetch(SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0 -> next_seqno = UntilSeqno, offset_ranges = Ranges ++ [Range] }, - fetch(SessionId, Inflight, Streams, N - Size, [Publishes | Acc]); - 0 -> - fetch(SessionId, Inflight0, Streams, N, Acc) + fetch(SessionId, Inflight, Streams, N - Size, [Publishes | Acc]) end; fetch(_SessionId, Inflight, _Streams, _N, Acc) -> Publishes = lists:append(lists:reverse(Acc)), @@ -268,7 +270,7 @@ replay_range( end, MessagesReplay = [emqx_message:set_flag(dup, true, Msg) || Msg <- MessagesUnacked], %% Asserting that range is consistent with the message storage state. - {Replies, Until} = publish(FirstUnacked, MessagesReplay), + {Replies, Until} = publish(FirstUnacked, MessagesReplay, _PreserveQoS0 = false), %% Again, we need to keep the iterator pointing past the end of the %% range, so that we can pick up where we left off. Range = Range0#ds_pubrange{iterator = ItNext}, @@ -276,15 +278,18 @@ replay_range( replay_range(Range0 = #ds_pubrange{type = checkpoint}, _AckedUntil, Acc) -> {Range0, Acc}. -publish(FirstSeqno, Messages) -> - lists:mapfoldl( - fun(Message, Seqno) -> - PacketId = seqno_to_packet_id(Seqno), - {{PacketId, Message}, next_seqno(Seqno)} - end, - FirstSeqno, - Messages - ). +publish(FirstSeqNo, Messages, PreserveQos0) -> + do_publish(FirstSeqNo, Messages, PreserveQos0, []). + +do_publish(SeqNo, [], _, Acc) -> + {lists:reverse(Acc), SeqNo}; +do_publish(SeqNo, [#message{qos = 0} | Messages], false, Acc) -> + do_publish(SeqNo, Messages, false, Acc); +do_publish(SeqNo, [#message{qos = 0} = Message | Messages], true, Acc) -> + do_publish(SeqNo, Messages, true, [{undefined, Message} | Acc]); +do_publish(SeqNo, [Message | Messages], PreserveQos0, Acc) -> + PacketId = seqno_to_packet_id(SeqNo), + do_publish(next_seqno(SeqNo), Messages, PreserveQos0, [{PacketId, Message} | Acc]). -spec preserve_range(ds_pubrange()) -> ok. preserve_range(Range = #ds_pubrange{type = inflight}) -> diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 420defb18..6df8de892 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -338,7 +338,7 @@ pubcomp(_ClientInfo, _PacketId, _Session = #{}) -> -spec deliver(clientinfo(), [emqx_types:deliver()], session()) -> {ok, replies(), session()}. deliver(_ClientInfo, _Delivers, Session) -> - %% TODO: QoS0 and system messages end up here. + %% TODO: system messages end up here. {ok, [], Session}. -spec handle_timeout(clientinfo(), _Timeout, session()) -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 2986950e7..045a9acb3 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1775,9 +1775,9 @@ fields("session_persistence") -> )}, {"idle_poll_interval", sc( - duration(), + timeout_duration(), #{ - default => 100, + default => <<"100ms">>, desc => ?DESC(session_ds_idle_poll_interval) } )}, diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 922d7248f..e750b1251 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -233,6 +233,31 @@ t_session_subscription_iterators(Config) -> ), ok. +t_qos0(Config) -> + Sub = connect(<>, true, 30), + Pub = connect(<>, true, 0), + try + {ok, _, [1]} = emqtt:subscribe(Sub, <<"t/#">>, qos1), + + Messages = [ + {<<"t/1">>, <<"1">>, 0}, + {<<"t/1">>, <<"2">>, 1}, + {<<"t/1">>, <<"3">>, 0} + ], + [emqtt:publish(Pub, Topic, Payload, Qos) || {Topic, Payload, Qos} <- Messages], + ?assertMatch( + [ + #{qos := 0, topic := <<"t/1">>, payload := <<"1">>}, + #{qos := 1, topic := <<"t/1">>, payload := <<"2">>}, + #{qos := 0, topic := <<"t/1">>, payload := <<"3">>} + ], + receive_messages(3) + ) + after + emqtt:stop(Sub), + emqtt:stop(Pub) + end. + %% connect(ClientId, CleanStart, EI) -> @@ -273,7 +298,7 @@ consume(It) -> end. receive_messages(Count) -> - receive_messages(Count, []). + lists:reverse(receive_messages(Count, [])). receive_messages(0, Msgs) -> Msgs; @@ -307,4 +332,6 @@ get_mqtt_port(Node, Type) -> clear_db() -> ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB), + mria:stop(), + ok = mnesia:delete_schema([node()]), ok. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index f7dbc828f..077df28d0 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -34,7 +34,8 @@ drop_db/1, shard_leader/2, this_site/0, - set_leader/3 + set_leader/3, + print_status/0 ]). %% gen_server @@ -100,6 +101,35 @@ %% API funcions %%================================================================================ +-spec print_status() -> ok. +print_status() -> + io:format("THIS SITE:~n~s~n", [base64:encode(this_site())]), + io:format("~nSITES:~n", []), + Nodes = [node() | nodes()], + lists:foreach( + fun(#?NODE_TAB{site = Site, node = Node}) -> + Status = + case lists:member(Node, Nodes) of + true -> up; + false -> down + end, + io:format("~s ~p ~p~n", [base64:encode(Site), Node, Status]) + end, + eval_qlc(mnesia:table(?NODE_TAB)) + ), + io:format("~nSHARDS~n", []), + lists:foreach( + fun(#?SHARD_TAB{shard = {DB, Shard}, leader = Leader}) -> + Status = + case lists:member(Leader, Nodes) of + true -> up; + false -> down + end, + io:format("~p/~s ~p ~p~n", [DB, Shard, Leader, Status]) + end, + eval_qlc(mnesia:table(?SHARD_TAB)) + ). + -spec this_site() -> site(). this_site() -> persistent_term:get(?emqx_ds_builtin_site). @@ -297,6 +327,7 @@ ensure_site() -> ok; _ -> Site = crypto:strong_rand_bytes(8), + logger:notice("Creating a new site with ID=~s", [base64:encode(Site)]), ok = filelib:ensure_dir(Filename), {ok, FD} = file:open(Filename, [write]), io:format(FD, "~p.", [Site]), From a158f25a403fe077aec4c224b8219363e9248c9f Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 23 Nov 2023 20:08:55 +0100 Subject: [PATCH 4/5] fix(ds): Fix return type of emqx_persistent_session_ds:publish --- apps/emqx/src/emqx_persistent_session_ds.erl | 4 +-- .../test/emqx_persistent_messages_SUITE.erl | 25 +++++++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 6df8de892..eb4eb0b1b 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -274,12 +274,12 @@ get_subscription(TopicFilter, #{subscriptions := Subs}) -> %%-------------------------------------------------------------------- -spec publish(emqx_types:packet_id(), emqx_types:message(), session()) -> - {ok, emqx_types:publish_result(), replies(), session()} + {ok, emqx_types:publish_result(), session()} | {error, emqx_types:reason_code()}. publish(_PacketId, Msg, Session) -> %% TODO: QoS2 Result = emqx_broker:publish(Msg), - {ok, Result, [], Session}. + {ok, Result, Session}. %%-------------------------------------------------------------------- %% Client -> Broker: PUBACK diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index e750b1251..f8f7baaf1 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -258,6 +258,31 @@ t_qos0(Config) -> emqtt:stop(Pub) end. +t_publish_as_persistent(Config) -> + Sub = connect(<>, true, 30), + Pub = connect(<>, true, 30), + try + {ok, _, [1]} = emqtt:subscribe(Sub, <<"t/#">>, qos1), + Messages = [ + {<<"t/1">>, <<"1">>, 0}, + {<<"t/1">>, <<"2">>, 1}, + {<<"t/1">>, <<"3">>, 2} + ], + [emqtt:publish(Pub, Topic, Payload, Qos) || {Topic, Payload, Qos} <- Messages], + ?assertMatch( + [ + #{qos := 0, topic := <<"t/1">>, payload := <<"1">>}, + #{qos := 1, topic := <<"t/1">>, payload := <<"2">>} + %% TODO: QoS 2 + %% #{qos := 2, topic := <<"t/1">>, payload := <<"3">>} + ], + receive_messages(3) + ) + after + emqtt:stop(Sub), + emqtt:stop(Pub) + end. + %% connect(ClientId, CleanStart, EI) -> From 449bafc27ef1029f32ed403df1cb1df1a1045bf0 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 23 Nov 2023 20:29:13 +0100 Subject: [PATCH 5/5] fix(ds): LTS trie handles empty topic levels --- apps/emqx_durable_storage/src/emqx_ds_lts.erl | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_lts.erl index d06854fd0..d148e8cbc 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_lts.erl @@ -119,7 +119,7 @@ trie_restore(Options, Dump) -> Trie. %% @doc Lookup the topic key. Create a new one, if not found. --spec topic_key(trie(), threshold_fun(), [binary()]) -> msg_storage_key(). +-spec topic_key(trie(), threshold_fun(), [binary() | '']) -> msg_storage_key(). topic_key(Trie, ThresholdFun, Tokens) -> do_topic_key(Trie, ThresholdFun, 0, ?PREFIX, Tokens, []). @@ -363,12 +363,12 @@ emanating(#trie{trie = Tab}, State, ?EOT) -> [#trans{next = Next}] -> [{?EOT, Next}]; [] -> [] end; -emanating(#trie{trie = Tab}, State, Bin) when is_binary(Bin) -> +emanating(#trie{trie = Tab}, State, Token) when is_binary(Token); Token =:= '' -> [ {Edge, Next} || #trans{key = {_, Edge}, next = Next} <- ets:lookup(Tab, {State, ?PLUS}) ++ - ets:lookup(Tab, {State, Bin}) + ets:lookup(Tab, {State, Token}) ]. %%================================================================================ @@ -533,6 +533,7 @@ topic_match_test() -> {S11, []} = test_key(T, ThresholdFun, [1, 1]), {S12, []} = test_key(T, ThresholdFun, [1, 2]), {S111, []} = test_key(T, ThresholdFun, [1, 1, 1]), + {S11e, []} = test_key(T, ThresholdFun, [1, 1, '']), %% Match concrete topics: assert_match_topics(T, [1], [{S1, []}]), assert_match_topics(T, [1, 1], [{S11, []}]), @@ -540,14 +541,16 @@ topic_match_test() -> %% Match topics with +: assert_match_topics(T, [1, '+'], [{S11, []}, {S12, []}]), assert_match_topics(T, [1, '+', 1], [{S111, []}]), + assert_match_topics(T, [1, '+', ''], [{S11e, []}]), %% Match topics with #: assert_match_topics(T, [1, '#'], [{S1, []}, {S11, []}, {S12, []}, - {S111, []}]), + {S111, []}, {S11e, []}]), assert_match_topics(T, [1, 1, '#'], [{S11, []}, - {S111, []}]), + {S111, []}, + {S11e, []}]), %% Now add learned wildcards: {S21, []} = test_key(T, ThresholdFun, [2, 1]), {S22, []} = test_key(T, ThresholdFun, [2, 2]), @@ -587,7 +590,10 @@ assert_match_topics(Trie, Filter0, Expected) -> %% erlfmt-ignore test_key(Trie, Threshold, Topic0) -> - Topic = [integer_to_binary(I) || I <- Topic0], + Topic = lists:map(fun('') -> ''; + (I) -> integer_to_binary(I) + end, + Topic0), Ret = topic_key(Trie, Threshold, Topic), %% Test idempotency: Ret1 = topic_key(Trie, Threshold, Topic),