feat(ds): Support QoS 0

This commit is contained in:
ieQu1 2023-11-15 13:24:49 +01:00
parent 1ced8786fd
commit c5bb86db67
6 changed files with 87 additions and 22 deletions

View File

@ -301,7 +301,9 @@ update_expiry(Msg) ->
Msg.
%% @doc Message to PUBLISH Packet.
-spec to_packet(emqx_types:packet_id(), emqx_types:message()) ->
%%
%% When QoS=0 then packet id must be `undefined'
-spec to_packet(emqx_types:packet_id() | undefined, emqx_types:message()) ->
emqx_types:packet().
to_packet(
PacketId,

View File

@ -27,6 +27,7 @@
-export_type([inflight/0, seqno/0]).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_utils/include/emqx_message.hrl").
-include("emqx_persistent_session_ds.hrl").
-ifdef(TEST).
@ -176,9 +177,12 @@ fetch(SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0 ->
#inflight{next_seqno = FirstSeqno, offset_ranges = Ranges} = Inflight0,
ItBegin = get_last_iterator(DSStream, Ranges),
{ok, ItEnd, Messages} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, N),
{Publishes, UntilSeqno} = publish(FirstSeqno, Messages),
case range_size(FirstSeqno, UntilSeqno) of
Size when Size > 0 ->
case Messages of
[] ->
fetch(SessionId, Inflight0, Streams, N, Acc);
_ ->
{Publishes, UntilSeqno} = publish(FirstSeqno, Messages, _PreserveQoS0 = true),
Size = range_size(FirstSeqno, UntilSeqno),
%% We need to preserve the iterator pointing to the beginning of the
%% range, so that we can replay it if needed.
Range0 = #ds_pubrange{
@ -197,9 +201,7 @@ fetch(SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0 ->
next_seqno = UntilSeqno,
offset_ranges = Ranges ++ [Range]
},
fetch(SessionId, Inflight, Streams, N - Size, [Publishes | Acc]);
0 ->
fetch(SessionId, Inflight0, Streams, N, Acc)
fetch(SessionId, Inflight, Streams, N - Size, [Publishes | Acc])
end;
fetch(_SessionId, Inflight, _Streams, _N, Acc) ->
Publishes = lists:append(lists:reverse(Acc)),
@ -268,7 +270,7 @@ replay_range(
end,
MessagesReplay = [emqx_message:set_flag(dup, true, Msg) || Msg <- MessagesUnacked],
%% Asserting that range is consistent with the message storage state.
{Replies, Until} = publish(FirstUnacked, MessagesReplay),
{Replies, Until} = publish(FirstUnacked, MessagesReplay, _PreserveQoS0 = false),
%% Again, we need to keep the iterator pointing past the end of the
%% range, so that we can pick up where we left off.
Range = Range0#ds_pubrange{iterator = ItNext},
@ -276,15 +278,18 @@ replay_range(
replay_range(Range0 = #ds_pubrange{type = checkpoint}, _AckedUntil, Acc) ->
{Range0, Acc}.
publish(FirstSeqno, Messages) ->
lists:mapfoldl(
fun(Message, Seqno) ->
PacketId = seqno_to_packet_id(Seqno),
{{PacketId, Message}, next_seqno(Seqno)}
end,
FirstSeqno,
Messages
).
publish(FirstSeqNo, Messages, PreserveQos0) ->
do_publish(FirstSeqNo, Messages, PreserveQos0, []).
do_publish(SeqNo, [], _, Acc) ->
{lists:reverse(Acc), SeqNo};
do_publish(SeqNo, [#message{qos = 0} | Messages], false, Acc) ->
do_publish(SeqNo, Messages, false, Acc);
do_publish(SeqNo, [#message{qos = 0} = Message | Messages], true, Acc) ->
do_publish(SeqNo, Messages, true, [{undefined, Message} | Acc]);
do_publish(SeqNo, [Message | Messages], PreserveQos0, Acc) ->
PacketId = seqno_to_packet_id(SeqNo),
do_publish(next_seqno(SeqNo), Messages, PreserveQos0, [{PacketId, Message} | Acc]).
-spec preserve_range(ds_pubrange()) -> ok.
preserve_range(Range = #ds_pubrange{type = inflight}) ->

View File

@ -338,7 +338,7 @@ pubcomp(_ClientInfo, _PacketId, _Session = #{}) ->
-spec deliver(clientinfo(), [emqx_types:deliver()], session()) ->
{ok, replies(), session()}.
deliver(_ClientInfo, _Delivers, Session) ->
%% TODO: QoS0 and system messages end up here.
%% TODO: system messages end up here.
{ok, [], Session}.
-spec handle_timeout(clientinfo(), _Timeout, session()) ->

View File

@ -1775,9 +1775,9 @@ fields("session_persistence") ->
)},
{"idle_poll_interval",
sc(
duration(),
timeout_duration(),
#{
default => 100,
default => <<"100ms">>,
desc => ?DESC(session_ds_idle_poll_interval)
}
)},

View File

@ -233,6 +233,31 @@ t_session_subscription_iterators(Config) ->
),
ok.
t_qos0(Config) ->
Sub = connect(<<?MODULE_STRING "1">>, true, 30),
Pub = connect(<<?MODULE_STRING "2">>, true, 0),
try
{ok, _, [1]} = emqtt:subscribe(Sub, <<"t/#">>, qos1),
Messages = [
{<<"t/1">>, <<"1">>, 0},
{<<"t/1">>, <<"2">>, 1},
{<<"t/1">>, <<"3">>, 0}
],
[emqtt:publish(Pub, Topic, Payload, Qos) || {Topic, Payload, Qos} <- Messages],
?assertMatch(
[
#{qos := 0, topic := <<"t/1">>, payload := <<"1">>},
#{qos := 1, topic := <<"t/1">>, payload := <<"2">>},
#{qos := 0, topic := <<"t/1">>, payload := <<"3">>}
],
receive_messages(3)
)
after
emqtt:stop(Sub),
emqtt:stop(Pub)
end.
%%
connect(ClientId, CleanStart, EI) ->
@ -273,7 +298,7 @@ consume(It) ->
end.
receive_messages(Count) ->
receive_messages(Count, []).
lists:reverse(receive_messages(Count, [])).
receive_messages(0, Msgs) ->
Msgs;
@ -307,4 +332,6 @@ get_mqtt_port(Node, Type) ->
clear_db() ->
ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB),
mria:stop(),
ok = mnesia:delete_schema([node()]),
ok.

View File

@ -34,7 +34,8 @@
drop_db/1,
shard_leader/2,
this_site/0,
set_leader/3
set_leader/3,
print_status/0
]).
%% gen_server
@ -100,6 +101,35 @@
%% API funcions
%%================================================================================
-spec print_status() -> ok.
print_status() ->
io:format("THIS SITE:~n~s~n", [base64:encode(this_site())]),
io:format("~nSITES:~n", []),
Nodes = [node() | nodes()],
lists:foreach(
fun(#?NODE_TAB{site = Site, node = Node}) ->
Status =
case lists:member(Node, Nodes) of
true -> up;
false -> down
end,
io:format("~s ~p ~p~n", [base64:encode(Site), Node, Status])
end,
eval_qlc(mnesia:table(?NODE_TAB))
),
io:format("~nSHARDS~n", []),
lists:foreach(
fun(#?SHARD_TAB{shard = {DB, Shard}, leader = Leader}) ->
Status =
case lists:member(Leader, Nodes) of
true -> up;
false -> down
end,
io:format("~p/~s ~p ~p~n", [DB, Shard, Leader, Status])
end,
eval_qlc(mnesia:table(?SHARD_TAB))
).
-spec this_site() -> site().
this_site() ->
persistent_term:get(?emqx_ds_builtin_site).
@ -297,6 +327,7 @@ ensure_site() ->
ok;
_ ->
Site = crypto:strong_rand_bytes(8),
logger:notice("Creating a new site with ID=~s", [base64:encode(Site)]),
ok = filelib:ensure_dir(Filename),
{ok, FD} = file:open(Filename, [write]),
io:format(FD, "~p.", [Site]),