feat(sessds): Add support for the retainer

Note: this is currently not ideal. Retained messages won't be
redelivered.
This commit is contained in:
ieQu1 2024-02-08 00:53:53 +01:00
parent 94254ec05b
commit c781240459
2 changed files with 158 additions and 9 deletions

View File

@ -241,8 +241,10 @@ info(mqueue_dropped, _Session) ->
%% seqno_diff(?QOS_2, ?rec, ?committed(?QOS_2), S); %% seqno_diff(?QOS_2, ?rec, ?committed(?QOS_2), S);
info(awaiting_rel_max, #{props := Conf}) -> info(awaiting_rel_max, #{props := Conf}) ->
maps:get(max_awaiting_rel, Conf); maps:get(max_awaiting_rel, Conf);
info(await_rel_timeout, #{props := Conf}) -> info(await_rel_timeout, #{props := _Conf}) ->
maps:get(await_rel_timeout, Conf). %% TODO: currently this setting is ignored:
%% maps:get(await_rel_timeout, Conf).
0.
-spec stats(session()) -> emqx_types:stats(). -spec stats(session()) -> emqx_types:stats().
stats(Session) -> stats(Session) ->
@ -438,9 +440,13 @@ pubcomp(_ClientInfo, PacketId, Session0) ->
-spec deliver(clientinfo(), [emqx_types:deliver()], session()) -> -spec deliver(clientinfo(), [emqx_types:deliver()], session()) ->
{ok, replies(), session()}. {ok, replies(), session()}.
deliver(_ClientInfo, _Delivers, Session) -> deliver(ClientInfo, Delivers, Session0) ->
%% TODO: system messages end up here. %% Durable sessions still have to handle some transient messages.
{ok, [], Session}. %% For example, retainer sends messages to the session directly.
Session = lists:foldl(
fun(Msg, Acc) -> enqueue_transient(ClientInfo, Msg, Acc) end, Session0, Delivers
),
{ok, [], pull_now(Session)}.
-spec handle_timeout(clientinfo(), _Timeout, session()) -> -spec handle_timeout(clientinfo(), _Timeout, session()) ->
{ok, replies(), session()} | {ok, replies(), timeout(), session()}. {ok, replies(), session()} | {ok, replies(), timeout(), session()}.
@ -481,8 +487,8 @@ handle_timeout(_ClientInfo, #req_sync{from = From, ref = Ref}, Session = #{s :=
S = emqx_persistent_session_ds_state:commit(S0), S = emqx_persistent_session_ds_state:commit(S0),
From ! Ref, From ! Ref,
{ok, [], Session#{s => S}}; {ok, [], Session#{s => S}};
handle_timeout(_ClientInfo, expire_awaiting_rel, Session) -> handle_timeout(_ClientInfo, Timeout, Session) ->
%% TODO: stub ?SLOG(warning, #{msg => "unknown_ds_timeout", timeout => Timeout}),
{ok, [], Session}. {ok, [], Session}.
bump_last_alive(S0) -> bump_last_alive(S0) ->
@ -871,6 +877,54 @@ process_batch(
IsReplay, Session, ClientInfo, LastSeqNoQos1, LastSeqNoQos2, Messages, Inflight IsReplay, Session, ClientInfo, LastSeqNoQos1, LastSeqNoQos2, Messages, Inflight
). ).
%%--------------------------------------------------------------------
%% Transient messages
%%--------------------------------------------------------------------
enqueue_transient(ClientInfo, Msg0, Session = #{s := S, props := #{upgrade_qos := UpgradeQoS}}) ->
%% TODO: Such messages won't be retransmitted, should the session
%% reconnect before transient messages are acked.
%%
%% Proper solution could look like this: session publishes
%% transient messages to a separate DS DB that serves as a queue,
%% then subscribes to a special system topic that contains the
%% queued messages. Since streams in this DB are exclusive to the
%% session, messages from the queue can be dropped as soon as they
%% are acked.
Subs = emqx_persistent_session_ds_state:get_subscriptions(S),
Msgs = [
Msg
|| SubMatch <- emqx_topic_gbt:matches(Msg0#message.topic, Subs, []),
Msg <- begin
#{props := SubOpts} = emqx_topic_gbt:get_record(SubMatch, Subs),
emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS)
end
],
lists:foldl(fun do_enqueue_transient/2, Session, Msgs).
do_enqueue_transient(Msg = #message{qos = Qos}, Session = #{inflight := Inflight0, s := S0}) ->
case Qos of
?QOS_0 ->
S = S0,
Inflight = emqx_persistent_session_ds_inflight:push({undefined, Msg}, Inflight0);
?QOS_1 ->
SeqNo = inc_seqno(
?QOS_1, emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), S0)
),
S = emqx_persistent_session_ds_state:put_seqno(?next(?QOS_1), SeqNo, S0),
Inflight = emqx_persistent_session_ds_inflight:push({SeqNo, Msg}, Inflight0);
?QOS_2 ->
SeqNo = inc_seqno(
?QOS_2, emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S0)
),
S = emqx_persistent_session_ds_state:put_seqno(?next(?QOS_2), SeqNo, S0),
Inflight = emqx_persistent_session_ds_inflight:push({SeqNo, Msg}, Inflight0)
end,
Session#{
inflight => Inflight,
s => S
}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Buffer drain %% Buffer drain
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -53,7 +53,7 @@ all() ->
groups() -> groups() ->
TCs = emqx_common_test_helpers:all(?MODULE), TCs = emqx_common_test_helpers:all(?MODULE),
TCsNonGeneric = [t_choose_impl], TCsNonGeneric = [t_choose_impl, t_transient],
TCGroups = [{group, tcp}, {group, quic}, {group, ws}], TCGroups = [{group, tcp}, {group, quic}, {group, ws}],
[ [
{persistence_disabled, TCGroups}, {persistence_disabled, TCGroups},
@ -265,7 +265,15 @@ messages(Topic, Payloads) ->
messages(Topic, Payloads, ?QOS_2). messages(Topic, Payloads, ?QOS_2).
messages(Topic, Payloads, QoS) -> messages(Topic, Payloads, QoS) ->
[#mqtt_msg{topic = Topic, payload = P, qos = QoS} || P <- Payloads]. lists:map(
fun
(Bin) when is_binary(Bin) ->
#mqtt_msg{topic = Topic, payload = Bin, qos = QoS};
(Msg = #mqtt_msg{}) ->
Msg#mqtt_msg{topic = Topic}
end,
Payloads
).
publish(Topic, Payload) -> publish(Topic, Payload) ->
publish(Topic, Payload, ?QOS_2). publish(Topic, Payload, ?QOS_2).
@ -1103,6 +1111,93 @@ t_unsubscribe_replay(Config) ->
), ),
ok = emqtt:disconnect(Sub1). ok = emqtt:disconnect(Sub1).
%% This testcase verifies that persistent sessions handle "transient"
%% mesages correctly.
%%
%% Transient messages are delivered to the channel directly, bypassing
%% the broker code that decides whether the messages should be
%% persisted or not, and therefore they are not persisted.
%%
%% `emqx_retainer' is an example of application that uses this
%% mechanism.
%%
%% This testcase creates the conditions when the transient messages
%% appear in the middle of the replay, to make sure the durable
%% session doesn't get confused and/or stuck if retained messages are
%% changed while the session was down.
t_transient(Config) ->
ConnFun = ?config(conn_fun, Config),
TopicPrefix = ?config(topic, Config),
ClientId = atom_to_binary(?FUNCTION_NAME),
ClientOpts = [
{proto_ver, v5},
{clientid, ClientId},
{properties, #{'Session-Expiry-Interval' => 30, 'Receive-Maximum' => 100}},
{max_inflight, 100}
| Config
],
Deliver = fun(Topic, Payload, QoS) ->
[Pid] = emqx_cm:lookup_channels(ClientId),
Msg = emqx_message:make(_From = <<"test">>, QoS, Topic, Payload),
Pid ! {deliver, Topic, Msg}
end,
Topic1 = <<TopicPrefix/binary, "/1">>,
Topic2 = <<TopicPrefix/binary, "/2">>,
Topic3 = <<TopicPrefix/binary, "/3">>,
%% 1. Start the client and subscribe to the topic:
{ok, Sub} = emqtt:start_link([{clean_start, true}, {auto_ack, never} | ClientOpts]),
?assertMatch({ok, _}, emqtt:ConnFun(Sub)),
?assertMatch({ok, _, _}, emqtt:subscribe(Sub, <<TopicPrefix/binary, "/#">>, qos2)),
%% 2. Publish regular messages:
publish(Topic1, <<"1">>, ?QOS_1),
publish(Topic1, <<"2">>, ?QOS_2),
Msgs1 = receive_messages(2),
[#{payload := <<"1">>, packet_id := PI1}, #{payload := <<"2">>, packet_id := PI2}] = Msgs1,
%% 3. Publish and recieve transient messages:
Deliver(Topic2, <<"3">>, ?QOS_0),
Deliver(Topic2, <<"4">>, ?QOS_1),
Deliver(Topic2, <<"5">>, ?QOS_2),
Msgs2 = receive_messages(3),
?assertMatch(
[
#{payload := <<"3">>, qos := ?QOS_0},
#{payload := <<"4">>, qos := ?QOS_1},
#{payload := <<"5">>, qos := ?QOS_2}
],
Msgs2
),
%% 4. Publish more regular messages:
publish(Topic3, <<"6">>, ?QOS_1),
publish(Topic3, <<"7">>, ?QOS_2),
Msgs3 = receive_messages(2),
[#{payload := <<"6">>, packet_id := PI6}, #{payload := <<"7">>, packet_id := PI7}] = Msgs3,
%% 5. Reconnect the client:
ok = emqtt:disconnect(Sub),
{ok, Sub1} = emqtt:start_link([{clean_start, false}, {auto_ack, true} | ClientOpts]),
?assertMatch({ok, _}, emqtt:ConnFun(Sub1)),
%% 6. Recieve the historic messages and check that their packet IDs didn't change:
%% Note: durable session currenty WON'T replay transient messages.
ProcessMessage = fun(#{payload := P, packet_id := ID}) -> {ID, P} end,
?assertMatch(
#{
Topic1 := [{PI1, <<"1">>}, {PI2, <<"2">>}],
Topic3 := [{PI6, <<"6">>}, {PI7, <<"7">>}]
},
maps:groups_from_list(fun get_msgpub_topic/1, ProcessMessage, receive_messages(7, 5_000))
),
%% 7. Finish off by sending messages to all the topics to make
%% sure none of the streams are blocked:
[publish(T, <<"fin">>, ?QOS_2) || T <- [Topic1, Topic2, Topic3]],
?assertMatch(
#{
Topic1 := [<<"fin">>],
Topic2 := [<<"fin">>],
Topic3 := [<<"fin">>]
},
get_topicwise_order(receive_messages(3))
),
ok = emqtt:disconnect(Sub1).
t_multiple_subscription_matches(Config) -> t_multiple_subscription_matches(Config) ->
ConnFun = ?config(conn_fun, Config), ConnFun = ?config(conn_fun, Config),
Topic = ?config(topic, Config), Topic = ?config(topic, Config),