From c5bb86db67a06c8b7c4415ddd5a86ea8c4703f2d Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 15 Nov 2023 13:24:49 +0100 Subject: [PATCH] feat(ds): Support QoS 0 --- apps/emqx/src/emqx_message.erl | 4 +- .../emqx_persistent_message_ds_replayer.erl | 37 +++++++++++-------- apps/emqx/src/emqx_persistent_session_ds.erl | 2 +- apps/emqx/src/emqx_schema.erl | 4 +- .../test/emqx_persistent_messages_SUITE.erl | 29 ++++++++++++++- .../src/emqx_ds_replication_layer_meta.erl | 33 ++++++++++++++++- 6 files changed, 87 insertions(+), 22 deletions(-) diff --git a/apps/emqx/src/emqx_message.erl b/apps/emqx/src/emqx_message.erl index 4ff36504d..b65c8360f 100644 --- a/apps/emqx/src/emqx_message.erl +++ b/apps/emqx/src/emqx_message.erl @@ -301,7 +301,9 @@ update_expiry(Msg) -> Msg. %% @doc Message to PUBLISH Packet. --spec to_packet(emqx_types:packet_id(), emqx_types:message()) -> +%% +%% When QoS=0 then packet id must be `undefined' +-spec to_packet(emqx_types:packet_id() | undefined, emqx_types:message()) -> emqx_types:packet(). to_packet( PacketId, diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index 64b9cabb4..d622444e9 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -27,6 +27,7 @@ -export_type([inflight/0, seqno/0]). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_utils/include/emqx_message.hrl"). -include("emqx_persistent_session_ds.hrl"). -ifdef(TEST). @@ -176,9 +177,12 @@ fetch(SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0 -> #inflight{next_seqno = FirstSeqno, offset_ranges = Ranges} = Inflight0, ItBegin = get_last_iterator(DSStream, Ranges), {ok, ItEnd, Messages} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, N), - {Publishes, UntilSeqno} = publish(FirstSeqno, Messages), - case range_size(FirstSeqno, UntilSeqno) of - Size when Size > 0 -> + case Messages of + [] -> + fetch(SessionId, Inflight0, Streams, N, Acc); + _ -> + {Publishes, UntilSeqno} = publish(FirstSeqno, Messages, _PreserveQoS0 = true), + Size = range_size(FirstSeqno, UntilSeqno), %% We need to preserve the iterator pointing to the beginning of the %% range, so that we can replay it if needed. Range0 = #ds_pubrange{ @@ -197,9 +201,7 @@ fetch(SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0 -> next_seqno = UntilSeqno, offset_ranges = Ranges ++ [Range] }, - fetch(SessionId, Inflight, Streams, N - Size, [Publishes | Acc]); - 0 -> - fetch(SessionId, Inflight0, Streams, N, Acc) + fetch(SessionId, Inflight, Streams, N - Size, [Publishes | Acc]) end; fetch(_SessionId, Inflight, _Streams, _N, Acc) -> Publishes = lists:append(lists:reverse(Acc)), @@ -268,7 +270,7 @@ replay_range( end, MessagesReplay = [emqx_message:set_flag(dup, true, Msg) || Msg <- MessagesUnacked], %% Asserting that range is consistent with the message storage state. - {Replies, Until} = publish(FirstUnacked, MessagesReplay), + {Replies, Until} = publish(FirstUnacked, MessagesReplay, _PreserveQoS0 = false), %% Again, we need to keep the iterator pointing past the end of the %% range, so that we can pick up where we left off. Range = Range0#ds_pubrange{iterator = ItNext}, @@ -276,15 +278,18 @@ replay_range( replay_range(Range0 = #ds_pubrange{type = checkpoint}, _AckedUntil, Acc) -> {Range0, Acc}. -publish(FirstSeqno, Messages) -> - lists:mapfoldl( - fun(Message, Seqno) -> - PacketId = seqno_to_packet_id(Seqno), - {{PacketId, Message}, next_seqno(Seqno)} - end, - FirstSeqno, - Messages - ). +publish(FirstSeqNo, Messages, PreserveQos0) -> + do_publish(FirstSeqNo, Messages, PreserveQos0, []). + +do_publish(SeqNo, [], _, Acc) -> + {lists:reverse(Acc), SeqNo}; +do_publish(SeqNo, [#message{qos = 0} | Messages], false, Acc) -> + do_publish(SeqNo, Messages, false, Acc); +do_publish(SeqNo, [#message{qos = 0} = Message | Messages], true, Acc) -> + do_publish(SeqNo, Messages, true, [{undefined, Message} | Acc]); +do_publish(SeqNo, [Message | Messages], PreserveQos0, Acc) -> + PacketId = seqno_to_packet_id(SeqNo), + do_publish(next_seqno(SeqNo), Messages, PreserveQos0, [{PacketId, Message} | Acc]). -spec preserve_range(ds_pubrange()) -> ok. preserve_range(Range = #ds_pubrange{type = inflight}) -> diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 420defb18..6df8de892 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -338,7 +338,7 @@ pubcomp(_ClientInfo, _PacketId, _Session = #{}) -> -spec deliver(clientinfo(), [emqx_types:deliver()], session()) -> {ok, replies(), session()}. deliver(_ClientInfo, _Delivers, Session) -> - %% TODO: QoS0 and system messages end up here. + %% TODO: system messages end up here. {ok, [], Session}. -spec handle_timeout(clientinfo(), _Timeout, session()) -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 2986950e7..045a9acb3 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1775,9 +1775,9 @@ fields("session_persistence") -> )}, {"idle_poll_interval", sc( - duration(), + timeout_duration(), #{ - default => 100, + default => <<"100ms">>, desc => ?DESC(session_ds_idle_poll_interval) } )}, diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 922d7248f..e750b1251 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -233,6 +233,31 @@ t_session_subscription_iterators(Config) -> ), ok. +t_qos0(Config) -> + Sub = connect(<>, true, 30), + Pub = connect(<>, true, 0), + try + {ok, _, [1]} = emqtt:subscribe(Sub, <<"t/#">>, qos1), + + Messages = [ + {<<"t/1">>, <<"1">>, 0}, + {<<"t/1">>, <<"2">>, 1}, + {<<"t/1">>, <<"3">>, 0} + ], + [emqtt:publish(Pub, Topic, Payload, Qos) || {Topic, Payload, Qos} <- Messages], + ?assertMatch( + [ + #{qos := 0, topic := <<"t/1">>, payload := <<"1">>}, + #{qos := 1, topic := <<"t/1">>, payload := <<"2">>}, + #{qos := 0, topic := <<"t/1">>, payload := <<"3">>} + ], + receive_messages(3) + ) + after + emqtt:stop(Sub), + emqtt:stop(Pub) + end. + %% connect(ClientId, CleanStart, EI) -> @@ -273,7 +298,7 @@ consume(It) -> end. receive_messages(Count) -> - receive_messages(Count, []). + lists:reverse(receive_messages(Count, [])). receive_messages(0, Msgs) -> Msgs; @@ -307,4 +332,6 @@ get_mqtt_port(Node, Type) -> clear_db() -> ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB), + mria:stop(), + ok = mnesia:delete_schema([node()]), ok. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index f7dbc828f..077df28d0 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -34,7 +34,8 @@ drop_db/1, shard_leader/2, this_site/0, - set_leader/3 + set_leader/3, + print_status/0 ]). %% gen_server @@ -100,6 +101,35 @@ %% API funcions %%================================================================================ +-spec print_status() -> ok. +print_status() -> + io:format("THIS SITE:~n~s~n", [base64:encode(this_site())]), + io:format("~nSITES:~n", []), + Nodes = [node() | nodes()], + lists:foreach( + fun(#?NODE_TAB{site = Site, node = Node}) -> + Status = + case lists:member(Node, Nodes) of + true -> up; + false -> down + end, + io:format("~s ~p ~p~n", [base64:encode(Site), Node, Status]) + end, + eval_qlc(mnesia:table(?NODE_TAB)) + ), + io:format("~nSHARDS~n", []), + lists:foreach( + fun(#?SHARD_TAB{shard = {DB, Shard}, leader = Leader}) -> + Status = + case lists:member(Leader, Nodes) of + true -> up; + false -> down + end, + io:format("~p/~s ~p ~p~n", [DB, Shard, Leader, Status]) + end, + eval_qlc(mnesia:table(?SHARD_TAB)) + ). + -spec this_site() -> site(). this_site() -> persistent_term:get(?emqx_ds_builtin_site). @@ -297,6 +327,7 @@ ensure_site() -> ok; _ -> Site = crypto:strong_rand_bytes(8), + logger:notice("Creating a new site with ID=~s", [base64:encode(Site)]), ok = filelib:ensure_dir(Filename), {ok, FD} = file:open(Filename, [write]), io:format(FD, "~p.", [Site]),