diff --git a/apps/emqx/include/emqx_session_mem.hrl b/apps/emqx/include/emqx_session_mem.hrl index bacb28bfb..9874a9018 100644 --- a/apps/emqx/include/emqx_session_mem.hrl +++ b/apps/emqx/include/emqx_session_mem.hrl @@ -49,10 +49,7 @@ %% Awaiting PUBREL Timeout (Unit: millisecond) await_rel_timeout :: timeout(), %% Created at - created_at :: pos_integer(), - - %% Timers - timers :: #{_Name => reference()} + created_at :: pos_integer() }). -endif. diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index d6b6f0698..2b89170b9 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -130,6 +130,10 @@ -define(IS_MQTT_V5, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}). +-define(IS_COMMON_SESSION_TIMER(N), + ((N == retry_delivery) orelse (N == expire_awaiting_rel)) +). + -define(LIMITER_ROUTING, message_routing). -dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}). @@ -723,8 +727,9 @@ do_publish( {ok, PubRes, NSession} -> RC = pubrec_reason_code(PubRes), NChannel0 = Channel#channel{session = NSession}, - NChannel1 = ensure_quota(PubRes, NChannel0), - handle_out(pubrec, {PacketId, RC}, NChannel1); + NChannel1 = ensure_timer(expire_awaiting_rel, NChannel0), + NChannel2 = ensure_quota(PubRes, NChannel1), + handle_out(pubrec, {PacketId, RC}, NChannel2); {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> ok = emqx_metrics:inc('packets.publish.inuse'), handle_out(pubrec, {PacketId, RC}, Channel); @@ -953,7 +958,7 @@ handle_deliver( {ok, Channel#channel{session = NSession}}; {ok, Publishes, NSession} -> NChannel = Channel#channel{session = NSession}, - handle_out(publish, Publishes, NChannel) + handle_out(publish, Publishes, ensure_timer(retry_delivery, NChannel)) end. %% Nack delivers from shared subscription @@ -1067,6 +1072,10 @@ return_connack(AckPacket, Channel) -> }, {Packets, NChannel2} = do_deliver(Publishes, NChannel1), Outgoing = [?REPLY_OUTGOING(Packets) || length(Packets) > 0], + % NOTE + % Session timers are not restored here, so there's a tiny chance that + % the session becomes stuck, when it already has no place to track new + % messages. {ok, Replies ++ Outgoing, NChannel2} end. @@ -1307,14 +1316,27 @@ handle_timeout( end; handle_timeout( _TRef, - {emqx_session, Name}, + Name, + Channel = #channel{conn_state = disconnected} +) when ?IS_COMMON_SESSION_TIMER(Name) -> + {ok, Channel}; +handle_timeout( + _TRef, + Name, Channel = #channel{session = Session, clientinfo = ClientInfo} -) -> +) when ?IS_COMMON_SESSION_TIMER(Name) -> + % NOTE + % Responsibility for these timers is smeared across both this module and the + % `emqx_session` module: the latter holds configured timer intervals, and is + % responsible for the actual timeout logic. Yet they are managed here, since + % they are kind of common to all session implementations. case emqx_session:handle_timeout(ClientInfo, Name, Session) of - {ok, [], NSession} -> - {ok, Channel#channel{session = NSession}}; - {ok, Replies, NSession} -> - handle_out(publish, Replies, Channel#channel{session = NSession}) + {ok, Publishes, NSession} -> + NChannel = Channel#channel{session = NSession}, + handle_out(publish, Publishes, clean_timer(Name, NChannel)); + {ok, Publishes, Timeout, NSession} -> + NChannel = Channel#channel{session = NSession}, + handle_out(publish, Publishes, reset_timer(Name, Timeout, NChannel)) end; handle_timeout(_TRef, expire_session, Channel) -> shutdown(expired, Channel); @@ -1369,11 +1391,18 @@ ensure_timer(Name, Time, Channel = #channel{timers = Timers}) -> reset_timer(Name, Channel) -> ensure_timer(Name, clean_timer(Name, Channel)). +reset_timer(Name, Time, Channel) -> + ensure_timer(Name, Time, clean_timer(Name, Channel)). + clean_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. interval(keepalive, #channel{keepalive = KeepAlive}) -> emqx_keepalive:info(interval, KeepAlive); +interval(retry_delivery, #channel{session = Session}) -> + emqx_session:info(retry_interval, Session); +interval(expire_awaiting_rel, #channel{session = Session}) -> + emqx_session:info(await_rel_timeout, Session); interval(expire_session, #channel{conninfo = ConnInfo}) -> maps:get(expiry_interval, ConnInfo); interval(will_message, #channel{will_msg = WillMsg}) -> diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 6e0884d95..0572009f9 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -95,13 +95,6 @@ % Foreign session implementations -export([enrich_delivers/3]). -% Timers --export([ - ensure_timer/3, - reset_timer/3, - cancel_timer/2 -]). - % Utilities -export([should_discard/1]). @@ -113,7 +106,8 @@ conf/0, conninfo/0, reply/0, - replies/0 + replies/0, + common_timer_name/0 ]). -type session_id() :: _TODO. @@ -127,6 +121,8 @@ expiry_interval => non_neg_integer() }. +-type common_timer_name() :: retry_delivery | expire_awaiting_rel. + -type message() :: emqx_types:message(). -type publish() :: {maybe(emqx_types:packet_id()), emqx_types:message()}. -type pubrel() :: {pubrel, emqx_types:packet_id()}. @@ -415,33 +411,14 @@ enrich_subopts(_Opt, _V, Msg, _) -> %% Timeouts %%-------------------------------------------------------------------- --spec handle_timeout(clientinfo(), atom(), t()) -> - {ok, t()} | {ok, replies(), t()}. +-spec handle_timeout(clientinfo(), common_timer_name(), t()) -> + {ok, replies(), t()} + | {ok, replies(), timeout(), t()}. handle_timeout(ClientInfo, Timer, Session) -> ?IMPL(Session):handle_timeout(ClientInfo, Timer, Session). %%-------------------------------------------------------------------- -ensure_timer(Name, _Time, Timers = #{}) when is_map_key(Name, Timers) -> - Timers; -ensure_timer(Name, Time, Timers = #{}) when Time > 0 -> - TRef = emqx_utils:start_timer(Time, {?MODULE, Name}), - Timers#{Name => TRef}. - -reset_timer(Name, Time, Channel) -> - ensure_timer(Name, Time, cancel_timer(Name, Channel)). - -cancel_timer(Name, Timers) -> - case maps:take(Name, Timers) of - {TRef, NTimers} -> - ok = emqx_utils:cancel_timer(TRef), - NTimers; - error -> - Timers - end. - -%%-------------------------------------------------------------------- - -spec disconnect(clientinfo(), t()) -> {idle | shutdown, t()}. disconnect(_ClientInfo, Session) -> diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index f8276a369..9a59f960a 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -164,7 +164,6 @@ create(#{zone := Zone, clientid := ClientId}, #{expiry_interval := EI}, Conf) -> mqueue = emqx_mqueue:init(QueueOpts), next_pkt_id = 1, awaiting_rel = #{}, - timers = #{}, max_subscriptions = maps:get(max_subscriptions, Conf), max_awaiting_rel = maps:get(max_awaiting_rel, Conf), upgrade_qos = maps:get(upgrade_qos, Conf), @@ -339,7 +338,7 @@ get_subscription(Topic, #session{subscriptions = Subs}) -> publish( PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}, - Session = #session{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout} + Session = #session{awaiting_rel = AwaitingRel} ) -> case is_awaiting_full(Session) of false -> @@ -347,8 +346,7 @@ publish( false -> Results = emqx_broker:publish(Msg), AwaitingRel1 = maps:put(PacketId, Ts, AwaitingRel), - Session1 = ensure_timer(expire_awaiting_rel, Timeout, Session), - {ok, Results, Session1#session{awaiting_rel = AwaitingRel1}}; + {ok, Results, Session#session{awaiting_rel = AwaitingRel1}}; true -> {error, ?RC_PACKET_IDENTIFIER_IN_USE} end; @@ -417,7 +415,7 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) -> case maps:take(PacketId, AwaitingRel) of {_Ts, AwaitingRel1} -> NSession = Session#session{awaiting_rel = AwaitingRel1}, - {ok, reconcile_expire_timer(NSession)}; + {ok, NSession}; error -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end. @@ -449,7 +447,7 @@ pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> dequeue(ClientInfo, Session = #session{inflight = Inflight, mqueue = Q}) -> case emqx_mqueue:is_empty(Q) of true -> - {ok, [], reconcile_retry_timer(Session)}; + {ok, [], Session}; false -> {Msgs, Q1} = dequeue(ClientInfo, batch_n(Inflight), [], Q), do_deliver(ClientInfo, Msgs, [], Session#session{mqueue = Q1}) @@ -484,7 +482,7 @@ deliver(ClientInfo, Msgs, Session) -> do_deliver(ClientInfo, Msgs, [], Session). do_deliver(_ClientInfo, [], Publishes, Session) -> - {ok, lists:reverse(Publishes), reconcile_retry_timer(Session)}; + {ok, lists:reverse(Publishes), Session}; do_deliver(ClientInfo, [Msg | More], Acc, Session) -> case deliver_msg(ClientInfo, Msg, Session) of {ok, [], Session1} -> @@ -557,12 +555,13 @@ mark_begin_deliver(Msg) -> %% Timeouts %%-------------------------------------------------------------------- --spec handle_timeout(clientinfo(), atom(), session()) -> - {ok, replies(), session()}. -handle_timeout(ClientInfo, retry_delivery = Name, Session) -> - retry(ClientInfo, clean_timer(Name, Session)); -handle_timeout(ClientInfo, expire_awaiting_rel = Name, Session) -> - expire(ClientInfo, clean_timer(Name, Session)). +%% @doc Handle timeout events +-spec handle_timeout(clientinfo(), emqx_session:common_timer_name(), session()) -> + {ok, replies(), session()} | {ok, replies(), timeout(), session()}. +handle_timeout(ClientInfo, retry_delivery, Session) -> + retry(ClientInfo, Session); +handle_timeout(ClientInfo, expire_awaiting_rel, Session) -> + expire(ClientInfo, Session). %%-------------------------------------------------------------------- %% Retry Delivery @@ -585,8 +584,8 @@ retry(ClientInfo, Session = #session{inflight = Inflight}) -> ) end. -retry_delivery(_ClientInfo, [], Acc, _Now, Session) -> - {ok, lists:reverse(Acc), reconcile_retry_timer(Session)}; +retry_delivery(_ClientInfo, [], Acc, _, Session = #session{retry_interval = Interval}) -> + {ok, lists:reverse(Acc), Interval, Session}; retry_delivery( ClientInfo, [{PacketId, #inflight_data{timestamp = Ts} = Data} | More], @@ -599,8 +598,7 @@ retry_delivery( {Acc1, Inflight1} = do_retry_delivery(ClientInfo, PacketId, Data, Now, Acc, Inflight), retry_delivery(ClientInfo, More, Acc1, Now, Session#session{inflight = Inflight1}); false -> - NSession = ensure_timer(retry_delivery, Interval - max(0, Age), Session), - {ok, lists:reverse(Acc), NSession} + {ok, lists:reverse(Acc), Interval - max(0, Age), Session} end. do_retry_delivery( @@ -638,8 +636,7 @@ expire(ClientInfo, Session = #session{awaiting_rel = AwaitingRel}) -> {ok, [], Session}; _ -> Now = erlang:system_time(millisecond), - NSession = expire_awaiting_rel(ClientInfo, Now, Session), - {ok, [], reconcile_expire_timer(NSession)} + expire_awaiting_rel(ClientInfo, Now, Session) end. expire_awaiting_rel( @@ -651,7 +648,11 @@ expire_awaiting_rel( AwaitingRel1 = maps:filter(NotExpired, AwaitingRel), ExpiredCnt = maps:size(AwaitingRel) - maps:size(AwaitingRel1), _ = emqx_session_events:handle_event(ClientInfo, {expired_rel, ExpiredCnt}), - Session#session{awaiting_rel = AwaitingRel1}. + Session1 = Session#session{awaiting_rel = AwaitingRel1}, + case maps:size(AwaitingRel1) of + 0 -> {ok, [], Session1}; + _ -> {ok, [], Timeout, Session1} + end. %%-------------------------------------------------------------------- %% Takeover, Resume and Replay @@ -673,7 +674,7 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions = ), ok = emqx_metrics:inc('session.resumed'), ok = emqx_hooks:run('session.resumed', [ClientInfo, emqx_session:info(Session)]), - Session#session{timers = #{}}. + Session. -spec replay(emqx_types:clientinfo(), [emqx_types:message()], session()) -> {ok, replies(), session()}. @@ -705,7 +706,7 @@ replay(ClientInfo, Session) -> emqx_inflight:to_list(Session#session.inflight) ), {ok, More, Session1} = dequeue(ClientInfo, Session), - {ok, append(PubsResend, More), reconcile_expire_timer(Session1)}. + {ok, append(PubsResend, More), Session1}. append(L1, []) -> L1; append(L1, L2) -> L1 ++ L2. @@ -715,7 +716,7 @@ append(L1, L2) -> L1 ++ L2. -spec disconnect(session()) -> {idle, session()}. disconnect(Session = #session{}) -> % TODO: isolate expiry timer / timeout handling here? - {idle, cancel_timers(Session)}. + {idle, Session}. -spec terminate(Reason :: term(), session()) -> ok. terminate(Reason, Session) -> @@ -780,40 +781,6 @@ with_ts(Msg) -> age(Now, Ts) -> Now - Ts. -%%-------------------------------------------------------------------- - -reconcile_retry_timer(Session = #session{inflight = Inflight}) -> - case emqx_inflight:is_empty(Inflight) of - false -> - ensure_timer(retry_delivery, Session#session.retry_interval, Session); - true -> - cancel_timer(retry_delivery, Session) - end. - -reconcile_expire_timer(Session = #session{awaiting_rel = AwaitingRel}) -> - case maps:size(AwaitingRel) of - 0 -> - cancel_timer(expire_awaiting_rel, Session); - _ -> - ensure_timer(expire_awaiting_rel, Session#session.await_rel_timeout, Session) - end. - -%%-------------------------------------------------------------------- - -ensure_timer(Name, Timeout, Session = #session{timers = Timers}) -> - NTimers = emqx_session:ensure_timer(Name, Timeout, Timers), - Session#session{timers = NTimers}. - -clean_timer(Name, Session = #session{timers = Timers}) -> - Session#session{timers = maps:remove(Name, Timers)}. - -cancel_timers(Session = #session{timers = Timers}) -> - ok = maps:foreach(fun(_Name, TRef) -> emqx_utils:cancel_timer(TRef) end, Timers), - Session#session{timers = #{}}. - -cancel_timer(Name, Session = #session{timers = Timers}) -> - Session#session{timers = emqx_session:cancel_timer(Name, Timers)}. - %%-------------------------------------------------------------------- %% For CT tests %%-------------------------------------------------------------------- diff --git a/apps/emqx/test/emqx_proper_types.erl b/apps/emqx/test/emqx_proper_types.erl index 20b123b6c..0a66b3628 100644 --- a/apps/emqx/test/emqx_proper_types.erl +++ b/apps/emqx/test/emqx_proper_types.erl @@ -147,8 +147,7 @@ sessioninfo() -> awaiting_rel = awaiting_rel(), max_awaiting_rel = non_neg_integer(), await_rel_timeout = safty_timeout(), - created_at = timestamp(), - timers = #{} + created_at = timestamp() }, emqx_session:info(Session) ). diff --git a/apps/emqx/test/emqx_session_mem_SUITE.erl b/apps/emqx/test/emqx_session_mem_SUITE.erl index 514bbbf9c..a906c15b8 100644 --- a/apps/emqx/test/emqx_session_mem_SUITE.erl +++ b/apps/emqx/test/emqx_session_mem_SUITE.erl @@ -20,7 +20,6 @@ -compile(nowarn_export_all). -include_lib("emqx/include/emqx_mqtt.hrl"). --include_lib("emqx/include/asserts.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). @@ -38,13 +37,6 @@ all() -> emqx_common_test_helpers:all(?MODULE). %% CT callbacks %%-------------------------------------------------------------------- --define(assertTimerSet(NAME, TIMEOUT), - ?assertReceive({timer, NAME, TIMEOUT} when is_integer(TIMEOUT)) -). --define(assertTimerCancel(NAME), - ?assertReceive({timer, NAME, cancel}) -). - init_per_suite(Config) -> ok = meck:new( [emqx_broker, emqx_hooks, emqx_session], @@ -65,26 +57,6 @@ end_per_suite(Config) -> ok = emqx_cth_suite:stop(?config(suite_apps, Config)), meck:unload([emqx_broker, emqx_hooks]). -init_per_testcase(_TestCase, Config) -> - Pid = self(), - ok = meck:expect( - emqx_session, ensure_timer, fun(Name, Timeout, Timers) -> - _ = Pid ! {timer, Name, Timeout}, - meck:passthrough([Name, Timeout, Timers]) - end - ), - ok = meck:expect( - emqx_session, cancel_timer, fun(Name, Timers) -> - _ = Pid ! {timer, Name, cancel}, - meck:passthrough([Name, Timers]) - end - ), - Config. - -end_per_testcase(_TestCase, Config) -> - ok = meck:delete(emqx_session, ensure_timer, 3), - Config. - %%-------------------------------------------------------------------- %% Test cases for session init %%-------------------------------------------------------------------- @@ -191,7 +163,6 @@ t_publish_qos2(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>), {ok, [], Session} = emqx_session_mem:publish(1, Msg, session()), - ?assertTimerSet(expire_awaiting_rel, _Timeout), ?assertEqual(1, emqx_session_mem:info(awaiting_rel_cnt, Session)), {ok, Session1} = emqx_session_mem:pubrel(1, Session), ?assertEqual(0, emqx_session_mem:info(awaiting_rel_cnt, Session1)), @@ -266,7 +237,6 @@ t_puback_with_dequeue(_) -> {_, Q} = emqx_mqueue:in(Msg2, mqueue(#{max_len => 10})), Session = session(#{inflight => Inflight, mqueue => Q}), {ok, Msg1, [{_, Msg3}], Session1} = emqx_session_mem:puback(clientinfo(), 1, Session), - ?assertTimerSet(retry_delivery, _Timeout), ?assertEqual(1, emqx_session_mem:info(inflight_cnt, Session1)), ?assertEqual(0, emqx_session_mem:info(mqueue_len, Session1)), ?assertEqual(<<"t2">>, emqx_message:topic(Msg3)). @@ -335,7 +305,6 @@ t_dequeue(_) -> Session1 = emqx_session_mem:enqueue(clientinfo(), Msgs, Session), {ok, [{undefined, Msg0}, {1, Msg1}, {2, Msg2}], Session2} = emqx_session_mem:dequeue(clientinfo(), Session1), - ?assertTimerSet(retry_delivery, _Timeout), ?assertEqual(0, emqx_session_mem:info(mqueue_len, Session2)), ?assertEqual(2, emqx_session_mem:info(inflight_cnt, Session2)), ?assertEqual(<<"t0">>, emqx_message:topic(Msg0)), @@ -349,7 +318,6 @@ t_deliver_qos0(_) -> Deliveries = enrich([delivery(?QOS_0, T) || T <- [<<"t0">>, <<"t1">>]], Session1), {ok, [{undefined, Msg1}, {undefined, Msg2}], Session1} = emqx_session_mem:deliver(clientinfo(), Deliveries, Session1), - ?assertTimerCancel(retry_delivery), ?assertEqual(<<"t0">>, emqx_message:topic(Msg1)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg2)). @@ -361,7 +329,6 @@ t_deliver_qos1(_) -> Delivers = enrich([delivery(?QOS_1, T) || T <- [<<"t1">>, <<"t2">>]], Session), {ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session_mem:deliver(clientinfo(), Delivers, Session), - ?assertTimerSet(retry_delivery, _Timeout), ?assertEqual(2, emqx_session_mem:info(inflight_cnt, Session1)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)), ?assertEqual(<<"t2">>, emqx_message:topic(Msg2)), @@ -378,7 +345,6 @@ t_deliver_qos2(_) -> Delivers = enrich([delivery(?QOS_2, <<"t0">>), delivery(?QOS_2, <<"t1">>)], Session), {ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session_mem:deliver(clientinfo(), Delivers, Session), - ?assertTimerSet(retry_delivery, _Timeout), ?assertEqual(2, emqx_session_mem:info(inflight_cnt, Session1)), ?assertEqual(<<"t0">>, emqx_message:topic(Msg1)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg2)). @@ -390,7 +356,6 @@ t_deliver_one_msg(_) -> enrich(delivery(?QOS_1, <<"t1">>), Session), Session ), - ?assertTimerSet(retry_delivery, _Timeout), ?assertEqual(1, emqx_session_mem:info(inflight_cnt, Session1)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg)). @@ -399,13 +364,11 @@ t_deliver_when_inflight_is_full(_) -> Delivers = enrich([delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], Session), {ok, Publishes, Session1} = emqx_session_mem:deliver(clientinfo(), Delivers, Session), - {timer, _, Timeout} = ?assertTimerSet(retry_delivery, _Timeout), ?assertEqual(1, length(Publishes)), ?assertEqual(1, emqx_session_mem:info(inflight_cnt, Session1)), ?assertEqual(1, emqx_session_mem:info(mqueue_len, Session1)), {ok, Msg1, [{2, Msg2}], Session2} = emqx_session_mem:puback(clientinfo(), 1, Session1), - ?assertTimerSet(retry_delivery, Timeout), ?assertEqual(1, emqx_session_mem:info(inflight_cnt, Session2)), ?assertEqual(0, emqx_session_mem:info(mqueue_len, Session2)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)), @@ -456,14 +419,16 @@ t_retry(_) -> RetryIntervalMs = 100, Session = session(#{retry_interval => RetryIntervalMs}), Delivers = enrich([delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], Session), - {ok, Pubs, Session1} = emqx_session_mem:deliver(clientinfo(), Delivers, Session), - {timer, Name, _} = ?assertTimerSet(_Name, RetryIntervalMs), + {ok, Pubs, Session1} = emqx_session_mem:deliver( + clientinfo(), Delivers, Session + ), %% 0.2s ElapseMs = 200, ok = timer:sleep(ElapseMs), Msgs1 = [{I, with_ts(wait_ack, emqx_message:set_flag(dup, Msg))} || {I, Msg} <- Pubs], - {ok, Msgs1T, Session2} = emqx_session_mem:handle_timeout(clientinfo(), Name, Session1), - ?assertTimerSet(Name, RetryIntervalMs), + {ok, Msgs1T, RetryIntervalMs, Session2} = emqx_session_mem:handle_timeout( + clientinfo(), retry_delivery, Session1 + ), ?assertEqual(inflight_data_to_msg(Msgs1), remove_deliver_flag(Msgs1T)), ?assertEqual(2, emqx_session_mem:info(inflight_cnt, Session2)). @@ -504,17 +469,20 @@ t_replay(_) -> ?assertEqual(6, emqx_session_mem:info(inflight_cnt, Session3)). t_expire_awaiting_rel(_) -> - {ok, [], Session} = emqx_session_mem:expire(clientinfo(), session()), - Timeout = emqx_session_mem:info(await_rel_timeout, Session), - Session1 = emqx_session_mem:set_field(awaiting_rel, #{1 => Ts = ts(millisecond)}, Session), - {ok, [], Session2} = emqx_session_mem:expire(clientinfo(), Session1), - ?assertTimerSet(expire_awaiting_rel, Timeout), - ?assertEqual(#{1 => Ts}, emqx_session_mem:info(awaiting_rel, Session2)). + Now = ts(millisecond), + AwaitRelTimeout = 10000, + Session = session(#{await_rel_timeout => AwaitRelTimeout}), + Ts1 = Now - 1000, + Ts2 = Now - 20000, + {ok, [], Session1} = emqx_session_mem:expire(clientinfo(), Session), + Session2 = emqx_session_mem:set_field(awaiting_rel, #{1 => Ts1, 2 => Ts2}, Session1), + {ok, [], Timeout, Session3} = emqx_session_mem:expire(clientinfo(), Session2), + ?assertEqual(#{1 => Ts1}, emqx_session_mem:info(awaiting_rel, Session3)), + ?assert(Timeout =< AwaitRelTimeout). t_expire_awaiting_rel_all(_) -> Session = session(#{awaiting_rel => #{1 => 1, 2 => 2}}), {ok, [], Session1} = emqx_session_mem:expire(clientinfo(), Session), - ?assertTimerCancel(expire_awaiting_rel), ?assertEqual(#{}, emqx_session_mem:info(awaiting_rel, Session1)). %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl index 95fa229bb..6c0163e4f 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl @@ -1155,7 +1155,11 @@ do_publish( ) -> case emqx_mqttsn_session:publish(ClientInfo, MsgId, Msg, Session) of {ok, _PubRes, NSession} -> - handle_out(pubrec, MsgId, Channel#channel{session = NSession}); + NChannel1 = ensure_timer( + expire_awaiting_rel, + Channel#channel{session = NSession} + ), + handle_out(pubrec, MsgId, NChannel1); {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> ok = metrics_inc(Ctx, 'packets.publish.inuse'), %% XXX: Use PUBACK to reply a PUBLISH Error Code @@ -1980,7 +1984,11 @@ handle_deliver( of {ok, Publishes, NSession} -> NChannel = Channel#channel{session = NSession}, - handle_out(publish, Publishes, NChannel); + handle_out( + publish, + Publishes, + ensure_timer(retry_delivery, NChannel) + ); {ok, NSession} -> {ok, Channel#channel{session = NSession}} end. @@ -2046,27 +2054,41 @@ handle_timeout( end; handle_timeout( _TRef, - {emqx_session, _Name}, + retry_delivery, Channel = #channel{conn_state = disconnected} ) -> {ok, Channel}; handle_timeout( _TRef, - {emqx_session, _Name}, + retry_delivery, Channel = #channel{conn_state = asleep} +) -> + {ok, reset_timer(retry_delivery, Channel)}; +handle_timeout( + _TRef, + _Name = expire_awaiting_rel, + Channel = #channel{conn_state = disconnected} ) -> {ok, Channel}; handle_timeout( _TRef, - {emqx_session, Name}, - Channel = #channel{session = Session, clientinfo = ClientInfo} + Name = expire_awaiting_rel, + Channel = #channel{conn_state = asleep} ) -> + {ok, reset_timer(Name, Channel)}; +handle_timeout( + _TRef, + Name, + Channel = #channel{session = Session, clientinfo = ClientInfo} +) when Name == retry_delivery; Name == expire_awaiting_rel -> case emqx_mqttsn_session:handle_timeout(ClientInfo, Name, Session) of - {ok, [], NSession} -> - {ok, Channel#channel{session = NSession}}; {ok, Publishes, NSession} -> + NChannel = Channel#channel{session = NSession}, + handle_out(publish, Publishes, clean_timer(Name, NChannel)); + {ok, Publishes, Timeout, NSession} -> + NChannel = Channel#channel{session = NSession}, %% XXX: These replay messages should awaiting register acked? - handle_out(publish, Publishes, Channel#channel{session = NSession}) + handle_out(publish, Publishes, reset_timer(Name, Timeout, NChannel)) end; handle_timeout( _TRef, @@ -2195,12 +2217,18 @@ ensure_timer(Name, Time, Channel = #channel{timers = Timers}) -> reset_timer(Name, Channel) -> ensure_timer(Name, clean_timer(Name, Channel)). +reset_timer(Name, Time, Channel) -> + ensure_timer(Name, Time, clean_timer(Name, Channel)). + clean_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. interval(keepalive, #channel{keepalive = KeepAlive}) -> - emqx_keepalive:info(interval, KeepAlive). - + emqx_keepalive:info(interval, KeepAlive); +interval(retry_delivery, #channel{session = Session}) -> + emqx_mqttsn_session:info(retry_interval, Session); +interval(expire_awaiting_rel, #channel{session = Session}) -> + emqx_mqttsn_session:info(await_rel_timeout, Session). %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl index 27adf61a6..3621aa627 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl @@ -131,7 +131,7 @@ with_sess(Fun, Args, Session = #{session := Sess}) -> %% for publish / pubrec / pubcomp / deliver {ok, ResultReplies, Sess1} -> {ok, ResultReplies, Session#{session := Sess1}}; - %% for puback + %% for puback / handle_timeout {ok, Msgs, Replies, Sess1} -> {ok, Msgs, Replies, Session#{session := Sess1}}; %% for any errors