diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 16b6db8a9..7494aca95 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -241,8 +241,10 @@ info(mqueue_dropped, _Session) -> %% seqno_diff(?QOS_2, ?rec, ?committed(?QOS_2), S); info(awaiting_rel_max, #{props := Conf}) -> maps:get(max_awaiting_rel, Conf); -info(await_rel_timeout, #{props := Conf}) -> - maps:get(await_rel_timeout, Conf). +info(await_rel_timeout, #{props := _Conf}) -> + %% TODO: currently this setting is ignored: + %% maps:get(await_rel_timeout, Conf). + 0. -spec stats(session()) -> emqx_types:stats(). stats(Session) -> @@ -389,7 +391,7 @@ publish(_PacketId, Msg, Session) -> puback(_ClientInfo, PacketId, Session0) -> case update_seqno(puback, PacketId, Session0) of {ok, Msg, Session} -> - {ok, Msg, [], inc_send_quota(Session)}; + {ok, Msg, [], pull_now(Session)}; Error -> Error end. @@ -429,7 +431,7 @@ pubrel(_PacketId, Session = #{}) -> pubcomp(_ClientInfo, PacketId, Session0) -> case update_seqno(pubcomp, PacketId, Session0) of {ok, Msg, Session} -> - {ok, Msg, [], inc_send_quota(Session)}; + {ok, Msg, [], pull_now(Session)}; Error = {error, _} -> Error end. @@ -438,9 +440,13 @@ pubcomp(_ClientInfo, PacketId, Session0) -> -spec deliver(clientinfo(), [emqx_types:deliver()], session()) -> {ok, replies(), session()}. -deliver(_ClientInfo, _Delivers, Session) -> - %% TODO: system messages end up here. - {ok, [], Session}. +deliver(ClientInfo, Delivers, Session0) -> + %% Durable sessions still have to handle some transient messages. + %% For example, retainer sends messages to the session directly. + Session = lists:foldl( + fun(Msg, Acc) -> enqueue_transient(ClientInfo, Msg, Acc) end, Session0, Delivers + ), + {ok, [], pull_now(Session)}. -spec handle_timeout(clientinfo(), _Timeout, session()) -> {ok, replies(), session()} | {ok, replies(), timeout(), session()}. 
@@ -481,8 +487,8 @@ handle_timeout(_ClientInfo, #req_sync{from = From, ref = Ref}, Session = #{s := S = emqx_persistent_session_ds_state:commit(S0), From ! Ref, {ok, [], Session#{s => S}}; -handle_timeout(_ClientInfo, expire_awaiting_rel, Session) -> - %% TODO: stub +handle_timeout(_ClientInfo, Timeout, Session) -> + ?SLOG(warning, #{msg => "unknown_ds_timeout", timeout => Timeout}), {ok, [], Session}. bump_last_alive(S0) -> @@ -871,6 +877,48 @@ process_batch( IsReplay, Session, ClientInfo, LastSeqNoQos1, LastSeqNoQos2, Messages, Inflight ). +%%-------------------------------------------------------------------- +%% Transient messages +%%-------------------------------------------------------------------- + +enqueue_transient(ClientInfo, Msg0, Session = #{s := S, props := #{upgrade_qos := UpgradeQoS}}) -> + %% TODO: Such messages won't be retransmitted, should the session + %% reconnect before transient messages are acked. + %% + %% Proper solution could look like this: session publishes + %% transient messages to a separate DS DB that serves as a queue, + %% then subscribes to a special system topic that contains the + %% queued messages. Since streams in this DB are exclusive to the + %% session, messages from the queue can be dropped as soon as they + %% are acked. + Subs = emqx_persistent_session_ds_state:get_subscriptions(S), + Msgs = [ + Msg + || SubMatch <- emqx_topic_gbt:matches(Msg0#message.topic, Subs, []), + Msg <- begin + #{props := SubOpts} = emqx_topic_gbt:get_record(SubMatch, Subs), + emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS) + end + ], + lists:foldl(fun do_enqueue_transient/2, Session, Msgs). 
+ +do_enqueue_transient(Msg = #message{qos = Qos}, Session = #{inflight := Inflight0, s := S0}) -> + case Qos of + ?QOS_0 -> + S = S0, + Inflight = emqx_persistent_session_ds_inflight:push({undefined, Msg}, Inflight0); + QoS when QoS =:= ?QOS_1; QoS =:= ?QOS_2 -> + SeqNo = inc_seqno( + QoS, emqx_persistent_session_ds_state:get_seqno(?next(QoS), S0) + ), + S = emqx_persistent_session_ds_state:put_seqno(?next(QoS), SeqNo, S0), + Inflight = emqx_persistent_session_ds_inflight:push({SeqNo, Msg}, Inflight0) + end, + Session#{ + inflight => Inflight, + s => S + }. + %%-------------------------------------------------------------------- %% Buffer drain %%-------------------------------------------------------------------- @@ -907,11 +955,6 @@ ensure_timers(Session0) -> Session2 = emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session1), emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, 100, Session2). --spec inc_send_quota(session()) -> session(). -inc_send_quota(Session = #{inflight := Inflight0}) -> - Inflight = emqx_persistent_session_ds_inflight:inc_send_quota(Inflight0), - pull_now(Session#{inflight => Inflight}). - -spec pull_now(session()) -> session(). pull_now(Session) -> emqx_session:reset_timer(?TIMER_PULL, 0, Session). @@ -957,26 +1000,28 @@ try_get_live_session(ClientId) -> -spec update_seqno(puback | pubrec | pubcomp, emqx_types:packet_id(), session()) -> {ok, emqx_types:message(), session()} | {error, _}. 
-update_seqno(Track, PacketId, Session = #{id := SessionId, s := S}) -> +update_seqno(Track, PacketId, Session = #{id := SessionId, s := S, inflight := Inflight0}) -> SeqNo = packet_id_to_seqno(PacketId, S), case Track of puback -> - QoS = ?QOS_1, - SeqNoKey = ?committed(?QOS_1); + SeqNoKey = ?committed(?QOS_1), + Result = emqx_persistent_session_ds_inflight:puback(SeqNo, Inflight0); pubrec -> - QoS = ?QOS_2, - SeqNoKey = ?rec; + SeqNoKey = ?rec, + Result = emqx_persistent_session_ds_inflight:pubrec(SeqNo, Inflight0); pubcomp -> - QoS = ?QOS_2, - SeqNoKey = ?committed(?QOS_2) + SeqNoKey = ?committed(?QOS_2), + Result = emqx_persistent_session_ds_inflight:pubcomp(SeqNo, Inflight0) end, - Current = emqx_persistent_session_ds_state:get_seqno(SeqNoKey, S), - case inc_seqno(QoS, Current) of - SeqNo -> + case Result of + {ok, Inflight} -> %% TODO: we pass a bogus message into the hook: Msg = emqx_message:make(SessionId, <<>>, <<>>), - {ok, Msg, Session#{s => emqx_persistent_session_ds_state:put_seqno(SeqNoKey, SeqNo, S)}}; - Expected -> + {ok, Msg, Session#{ + s => emqx_persistent_session_ds_state:put_seqno(SeqNoKey, SeqNo, S), + inflight => Inflight + }}; + {error, Expected} -> ?SLOG(warning, #{ msg => "out-of-order_commit", track => Track, diff --git a/apps/emqx/src/emqx_persistent_session_ds_inflight.erl b/apps/emqx/src/emqx_persistent_session_ds_inflight.erl index 1a603abde..21194c8c2 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_inflight.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_inflight.erl @@ -22,7 +22,9 @@ pop/1, n_buffered/2, n_inflight/1, - inc_send_quota/1, + puback/2, + pubrec/2, + pubcomp/2, receive_maximum/1 ]). @@ -34,13 +36,28 @@ -include("emqx.hrl"). -include("emqx_mqtt.hrl"). +-ifdef(TEST). +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-endif. 
+ %%================================================================================ %% Type declarations %%================================================================================ +-type payload() :: + {emqx_persistent_session_ds:seqno() | undefined, emqx_types:message()} + | {pubrel, emqx_persistent_session_ds:seqno()}. + -record(inflight, { - queue :: queue:queue(), receive_maximum :: pos_integer(), + %% Main queue: + queue :: queue:queue(payload()), + %% Queues that are used to track sequence numbers of ack tracks: + puback_queue :: iqueue(), + pubrec_queue :: iqueue(), + pubcomp_queue :: iqueue(), + %% Counters: n_inflight = 0 :: non_neg_integer(), n_qos0 = 0 :: non_neg_integer(), n_qos1 = 0 :: non_neg_integer(), @@ -49,17 +66,19 @@ -type t() :: #inflight{}. --type payload() :: - {emqx_persistent_session_ds:seqno() | undefined, emqx_types:message()} - | {pubrel, emqx_persistent_session_ds:seqno()}. - %%================================================================================ %% API funcions %%================================================================================ -spec new(non_neg_integer()) -> t(). new(ReceiveMaximum) when ReceiveMaximum > 0 -> - #inflight{queue = queue:new(), receive_maximum = ReceiveMaximum}. + #inflight{ + receive_maximum = ReceiveMaximum, + queue = queue:new(), + puback_queue = iqueue_new(), + pubrec_queue = iqueue_new(), + pubcomp_queue = iqueue_new() + }. -spec receive_maximum(t()) -> pos_integer(). 
receive_maximum(#inflight{receive_maximum = ReceiveMaximum}) -> @@ -86,6 +105,9 @@ pop(Rec0) -> receive_maximum = ReceiveMaximum, n_inflight = NInflight, queue = Q0, + puback_queue = QAck, + pubrec_queue = QRec, + pubcomp_queue = QComp, n_qos0 = NQos0, n_qos1 = NQos1, n_qos2 = NQos2 @@ -96,17 +118,24 @@ pop(Rec0) -> case Payload of {pubrel, _} -> Rec0#inflight{queue = Q}; - {_, #message{qos = Qos}} -> + {SeqNo, #message{qos = Qos}} -> case Qos of ?QOS_0 -> Rec0#inflight{queue = Q, n_qos0 = NQos0 - 1}; ?QOS_1 -> Rec0#inflight{ - queue = Q, n_qos1 = NQos1 - 1, n_inflight = NInflight + 1 + queue = Q, + n_qos1 = NQos1 - 1, + n_inflight = NInflight + 1, + puback_queue = ipush(SeqNo, QAck) }; ?QOS_2 -> Rec0#inflight{ - queue = Q, n_qos2 = NQos2 - 1, n_inflight = NInflight + 1 + queue = Q, + n_qos2 = NQos2 - 1, + n_inflight = NInflight + 1, + pubrec_queue = ipush(SeqNo, QRec), + pubcomp_queue = ipush(SeqNo, QComp) } end end, @@ -129,12 +158,190 @@ n_buffered(all, #inflight{n_qos0 = NQos0, n_qos1 = NQos1, n_qos2 = NQos2}) -> n_inflight(#inflight{n_inflight = NInflight}) -> NInflight. +-spec puback(emqx_persistent_session_ds:seqno(), t()) -> {ok, t()} | {error, Expected} when + Expected :: emqx_persistent_session_ds:seqno() | undefined. +puback(SeqNo, Rec = #inflight{puback_queue = Q0, n_inflight = N}) -> + case ipop(Q0) of + {{value, SeqNo}, Q} -> + {ok, Rec#inflight{ + puback_queue = Q, + n_inflight = max(0, N - 1) + }}; + {{value, Expected}, _} -> + {error, Expected}; + _ -> + {error, undefined} + end. + +-spec pubcomp(emqx_persistent_session_ds:seqno(), t()) -> {ok, t()} | {error, Expected} when + Expected :: emqx_persistent_session_ds:seqno() | undefined. +pubcomp(SeqNo, Rec = #inflight{pubcomp_queue = Q0, n_inflight = N}) -> + case ipop(Q0) of + {{value, SeqNo}, Q} -> + {ok, Rec#inflight{ + pubcomp_queue = Q, + n_inflight = max(0, N - 1) + }}; + {{value, Expected}, _} -> + {error, Expected}; + _ -> + {error, undefined} + end. 
+ +%% PUBREC doesn't affect inflight window: %% https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Flow_Control --spec inc_send_quota(t()) -> t(). -inc_send_quota(Rec = #inflight{n_inflight = NInflight0}) -> - NInflight = max(NInflight0 - 1, 0), - Rec#inflight{n_inflight = NInflight}. +-spec pubrec(emqx_persistent_session_ds:seqno(), t()) -> {ok, t()} | {error, Expected} when + Expected :: emqx_persistent_session_ds:seqno() | undefined. +pubrec(SeqNo, Rec = #inflight{pubrec_queue = Q0}) -> + case ipop(Q0) of + {{value, SeqNo}, Q} -> + {ok, Rec#inflight{ + pubrec_queue = Q + }}; + {{value, Expected}, _} -> + {error, Expected}; + _ -> + {error, undefined} + end. %%================================================================================ %% Internal functions %%================================================================================ + +%%%% Interval queue: + +%% "Interval queue": a data structure that represents a queue of +%% monotonically increasing non-negative integers in a compact manner. +%% It is functionally equivalent to a `queue:queue(integer())'. +-record(iqueue, { + %% Head interval: + head = 0 :: integer(), + head_end = 0 :: integer(), + %% Intermediate ranges: + queue :: queue:queue({integer(), integer()}), + %% End interval: + tail = 0 :: integer(), + tail_end = 0 :: integer() +}). + +-type iqueue() :: #iqueue{}. + +iqueue_new() -> + #iqueue{ + queue = queue:new() + }. + +%% @doc Push a value into the interval queue: +-spec ipush(integer(), iqueue()) -> iqueue(). +ipush(Val, Q = #iqueue{tail_end = Val, head_end = Val}) -> + %% Optimization: head and tail intervals overlap, and the newly + %% inserted value extends both. 
Attach it to both intervals, to + %% avoid `queue:out' in `ipop': + Q#iqueue{ + tail_end = Val + 1, + head_end = Val + 1 + }; +ipush(Val, Q = #iqueue{tail_end = Val}) -> + %% Extend tail interval: + Q#iqueue{ + tail_end = Val + 1 + }; +ipush(Val, Q = #iqueue{tail = Tl, tail_end = End, queue = IQ0}) when is_number(Val), Val > End -> + IQ = queue:in({Tl, End}, IQ0), + %% Begin a new interval: + Q#iqueue{ + queue = IQ, + tail = Val, + tail_end = Val + 1 + }. + +-spec ipop(iqueue()) -> {{value, integer()}, iqueue()} | {empty, iqueue()}. +ipop(Q = #iqueue{head = Hd, head_end = HdEnd}) when Hd < HdEnd -> + %% Head interval is not empty. Consume a value from it: + {{value, Hd}, Q#iqueue{head = Hd + 1}}; +ipop(Q = #iqueue{head_end = End, tail_end = End}) -> + %% Head interval is fully consumed, and it overlaps with the + %% tail interval. It means the queue is empty: + {empty, Q}; +ipop(Q = #iqueue{head = Hd0, tail = Tl, tail_end = TlEnd, queue = IQ0}) -> + %% Head interval is fully consumed, and it doesn't overlap with + %% the tail interval. Replace the head interval with the next + %% interval from the queue or with the tail interval: + case queue:out(IQ0) of + {{value, {Hd, HdEnd}}, IQ} -> + ipop(Q#iqueue{head = max(Hd0, Hd), head_end = HdEnd, queue = IQ}); + {empty, _} -> + ipop(Q#iqueue{head = max(Hd0, Tl), head_end = TlEnd}) + end. + +-ifdef(TEST). + +%% Test that behavior of iqueue is identical to that of a regular queue of integers: +iqueue_compat_test_() -> + Props = [iqueue_compat()], + Opts = [{numtests, 1000}, {to_file, user}, {max_size, 100}], + {timeout, 30, [?_assert(proper:quickcheck(Prop, Opts)) || Prop <- Props]}. 
+ +%% Generate a sequence of pops and pushes with monotonically +%% increasing arguments, and verify replaying produces equivalent +%% results for the optimized and the reference implementation: +iqueue_compat() -> + ?FORALL( + Cmds, + iqueue_commands(), + begin + lists:foldl( + fun + ({push, N}, {IQ, Q, Acc}) -> + {ipush(N, IQ), queue:in(N, Q), [N | Acc]}; + (pop, {IQ0, Q0, Acc}) -> + {Ret, IQ} = ipop(IQ0), + {Expected, Q} = queue:out(Q0), + ?assertEqual( + Expected, + Ret, + #{ + sequence => lists:reverse(Acc), + q => queue:to_list(Q0), + iq0 => iqueue_print(IQ0), + iq => iqueue_print(IQ) + } + ), + {IQ, Q, [pop | Acc]} + end, + {iqueue_new(), queue:new(), []}, + Cmds + ), + true + end + ). + +iqueue_cmd() -> + oneof([ + pop, + {push, range(1, 3)} + ]). + +iqueue_commands() -> + ?LET( + Cmds, + list(iqueue_cmd()), + process_test_cmds(Cmds, 0) + ). + +process_test_cmds([], _) -> + []; +process_test_cmds([pop | Tl], Cnt) -> + [pop | process_test_cmds(Tl, Cnt)]; +process_test_cmds([{push, N} | Tl], Cnt0) -> + Cnt = Cnt0 + N, + [{push, Cnt} | process_test_cmds(Tl, Cnt)]. + +iqueue_print(I = #iqueue{head = Hd, head_end = HdEnd, queue = Q, tail = Tl, tail_end = TlEnd}) -> + #{ + hd => {Hd, HdEnd}, + tl => {Tl, TlEnd}, + q => queue:to_list(Q) + }. + +-endif. diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index f8ee11c08..a5c171f67 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -53,7 +53,7 @@ all() -> groups() -> TCs = emqx_common_test_helpers:all(?MODULE), - TCsNonGeneric = [t_choose_impl], + TCsNonGeneric = [t_choose_impl, t_transient], TCGroups = [{group, tcp}, {group, quic}, {group, ws}], [ {persistence_disabled, TCGroups}, @@ -265,7 +265,15 @@ messages(Topic, Payloads) -> messages(Topic, Payloads, ?QOS_2). messages(Topic, Payloads, QoS) -> - [#mqtt_msg{topic = Topic, payload = P, qos = QoS} || P <- Payloads]. 
+ lists:map( + fun + (Bin) when is_binary(Bin) -> + #mqtt_msg{topic = Topic, payload = Bin, qos = QoS}; + (Msg = #mqtt_msg{}) -> + Msg#mqtt_msg{topic = Topic} + end, + Payloads + ). publish(Topic, Payload) -> publish(Topic, Payload, ?QOS_2). @@ -1103,6 +1111,93 @@ t_unsubscribe_replay(Config) -> ), ok = emqtt:disconnect(Sub1). +%% This testcase verifies that persistent sessions handle "transient" +%% messages correctly. +%% +%% Transient messages are delivered to the channel directly, bypassing +%% the broker code that decides whether the messages should be +%% persisted or not, and therefore they are not persisted. +%% +%% `emqx_retainer' is an example of an application that uses this +%% mechanism. +%% +%% This testcase creates the conditions when the transient messages +%% appear in the middle of the replay, to make sure the durable +%% session doesn't get confused and/or stuck if retained messages are +%% changed while the session was down. +t_transient(Config) -> + ConnFun = ?config(conn_fun, Config), + TopicPrefix = ?config(topic, Config), + ClientId = atom_to_binary(?FUNCTION_NAME), + ClientOpts = [ + {proto_ver, v5}, + {clientid, ClientId}, + {properties, #{'Session-Expiry-Interval' => 30, 'Receive-Maximum' => 100}}, + {max_inflight, 100} + | Config + ], + Deliver = fun(Topic, Payload, QoS) -> + [Pid] = emqx_cm:lookup_channels(ClientId), + Msg = emqx_message:make(_From = <<"test">>, QoS, Topic, Payload), + Pid ! {deliver, Topic, Msg} + end, + Topic1 = <<TopicPrefix/binary, "/1">>, + Topic2 = <<TopicPrefix/binary, "/2">>, + Topic3 = <<TopicPrefix/binary, "/3">>, + %% 1. Start the client and subscribe to the topic: + {ok, Sub} = emqtt:start_link([{clean_start, true}, {auto_ack, never} | ClientOpts]), + ?assertMatch({ok, _}, emqtt:ConnFun(Sub)), + ?assertMatch({ok, _, _}, emqtt:subscribe(Sub, <<TopicPrefix/binary, "/#">>, qos2)), + %% 2. Publish regular messages: + publish(Topic1, <<"1">>, ?QOS_1), + publish(Topic1, <<"2">>, ?QOS_2), + Msgs1 = receive_messages(2), + [#{payload := <<"1">>, packet_id := PI1}, #{payload := <<"2">>, packet_id := PI2}] = Msgs1, + %% 3. 
Publish and receive transient messages: + Deliver(Topic2, <<"3">>, ?QOS_0), + Deliver(Topic2, <<"4">>, ?QOS_1), + Deliver(Topic2, <<"5">>, ?QOS_2), + Msgs2 = receive_messages(3), + ?assertMatch( + [ + #{payload := <<"3">>, qos := ?QOS_0}, + #{payload := <<"4">>, qos := ?QOS_1}, + #{payload := <<"5">>, qos := ?QOS_2} + ], + Msgs2 + ), + %% 4. Publish more regular messages: + publish(Topic3, <<"6">>, ?QOS_1), + publish(Topic3, <<"7">>, ?QOS_2), + Msgs3 = receive_messages(2), + [#{payload := <<"6">>, packet_id := PI6}, #{payload := <<"7">>, packet_id := PI7}] = Msgs3, + %% 5. Reconnect the client: + ok = emqtt:disconnect(Sub), + {ok, Sub1} = emqtt:start_link([{clean_start, false}, {auto_ack, true} | ClientOpts]), + ?assertMatch({ok, _}, emqtt:ConnFun(Sub1)), + %% 6. Receive the historic messages and check that their packet IDs didn't change: + %% Note: durable session currently WON'T replay transient messages. + ProcessMessage = fun(#{payload := P, packet_id := ID}) -> {ID, P} end, + ?assertMatch( + #{ + Topic1 := [{PI1, <<"1">>}, {PI2, <<"2">>}], + Topic3 := [{PI6, <<"6">>}, {PI7, <<"7">>}] + }, + maps:groups_from_list(fun get_msgpub_topic/1, ProcessMessage, receive_messages(7, 5_000)) + ), + %% 7. Finish off by sending messages to all the topics to make + %% sure none of the streams are blocked: + [publish(T, <<"fin">>, ?QOS_2) || T <- [Topic1, Topic2, Topic3]], + ?assertMatch( + #{ + Topic1 := [<<"fin">>], + Topic2 := [<<"fin">>], + Topic3 := [<<"fin">>] + }, + get_topicwise_order(receive_messages(3)) + ), + ok = emqtt:disconnect(Sub1). + t_multiple_subscription_matches(Config) -> ConnFun = ?config(conn_fun, Config), Topic = ?config(topic, Config),