From a6f138b55c2a308abf246094c714f335ddd38292 Mon Sep 17 00:00:00 2001 From: spring2maz <40776645+spring2maz@users.noreply.github.com> Date: Tue, 22 Jan 2019 02:57:37 +0100 Subject: [PATCH] Improve shared sub dispatch implementation. (#2144) Before this change, when shared dispatch ack is enabled (in config) in case all subscribers are offline (all sessions gave negative ack) the message is simply discarded. In this change, it is ensured to have one session picked according to configured dispatch strategy when no subscriber is online. The messages dispatched in such scenario are then queued in session state. --- src/emqx_session.erl | 2 +- src/emqx_shared_sub.erl | 48 +++++++++++++++++++--------------- test/emqx_shared_sub_SUITE.erl | 17 ++++++++++-- 3 files changed, 43 insertions(+), 24 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 098d76c52..689d84b29 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -184,7 +184,7 @@ info(State = #state{conn_pid = ConnPid, {upgrade_qos, UpgradeQoS}, {inflight, Inflight}, {retry_interval, RetryInterval}, - {mqueue_len, MQueue}, + {mqueue_len, emqx_mqueue:len(MQueue)}, {awaiting_rel, AwaitingRel}, {max_awaiting_rel, MaxAwaitingRel}, {await_rel_timeout, AwaitRelTimeout}]. diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 67f03a93c..e14963bc7 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -91,18 +91,12 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}, F case pick(strategy(), ClientId, Group, Topic, FailedSubs) of false -> Delivery; - SubPid -> - case do_dispatch(SubPid, Topic, Msg) of + {Type, SubPid} -> + case do_dispatch(SubPid, Topic, Msg, Type) of ok -> Delivery#delivery{results = [{dispatch, {Group, Topic}, 1} | Results]}; {error, _Reason} -> - %% failed to dispatch to this sub, try next - %% 'Reason' is discarded so far, meaning for QoS1/2 messages - %% if all subscribers are off line, the dispatch would faile - %% even if there are sessions not expired yet. - %% If required, we can make use of the 'no_connection' reason to perform - %% retry without requiring acks, so the messages can be delivered - %% to sessions of offline clients + %% Failed to dispatch to this sub, try next. dispatch(Group, Topic, Delivery, [SubPid | FailedSubs]) end end. @@ -115,19 +109,23 @@ strategy() -> ack_enabled() -> emqx_config:get_env(shared_dispatch_ack_enabled, false). -do_dispatch(SubPid, Topic, Msg) when SubPid =:= self() -> +do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() -> %% Deadlock otherwise _ = erlang:send(SubPid, {dispatch, Topic, Msg}), ok; -do_dispatch(SubPid, Topic, Msg) -> - dispatch_per_qos(SubPid, Topic, Msg). +do_dispatch(SubPid, Topic, Msg, Type) -> + dispatch_per_qos(SubPid, Topic, Msg, Type). %% return either 'ok' (when everything is fine) or 'error' -dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg) -> +dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> %% For QoS 0 message, send it as regular dispatch _ = erlang:send(SubPid, {dispatch, Topic, Msg}), ok; -dispatch_per_qos(SubPid, Topic, Msg) -> +dispatch_per_qos(SubPid, Topic, Msg, retry) -> + %% Retry implies all subscribers nack:ed, send again without ack + _ = erlang:send(SubPid, {dispatch, Topic, Msg}), + ok; +dispatch_per_qos(SubPid, Topic, Msg, fresh) -> case ack_enabled() of true -> dispatch_with_ack(SubPid, Topic, Msg); @@ -211,24 +209,32 @@ pick(sticky, ClientId, Group, Topic, FailedSubs) -> true -> %% the old subscriber is still alive %% keep using it for sticky strategy - Sub0; + {fresh, Sub0}; false -> %% randomly pick one for the first message - Sub = do_pick(random, ClientId, Group, Topic, FailedSubs), + {Type, Sub} = do_pick(random, ClientId, Group, Topic, [Sub0 | FailedSubs]), %% stick to whatever pick result erlang:put({shared_sub_sticky, Group, Topic}, Sub), - Sub + {Type, Sub} end; pick(Strategy, ClientId, Group, Topic, FailedSubs) -> do_pick(Strategy, ClientId, Group, Topic, FailedSubs). do_pick(Strategy, ClientId, Group, Topic, FailedSubs) -> - case subscribers(Group, Topic) -- FailedSubs of - [] -> false; - [Sub] -> Sub; - All -> pick_subscriber(Group, Topic, Strategy, ClientId, All) + All = subscribers(Group, Topic), + case All -- FailedSubs of + [] when FailedSubs =:= [] -> + %% Genuinely no subscriber + false; + [] -> + %% All offline? pick one anyway + {retry, pick_subscriber(Group, Topic, Strategy, ClientId, All)}; + Subs -> + %% More than one available + {fresh, pick_subscriber(Group, Topic, Strategy, ClientId, Subs)} end. +pick_subscriber(_Group, _Topic, _Strategy, _ClientId, [Sub]) -> Sub; pick_subscriber(Group, Topic, Strategy, ClientId, Subs) -> Nth = do_pick_subscriber(Group, Topic, Strategy, ClientId, length(Subs)), lists:nth(Nth, Subs). diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index dd0e20ad1..cc5a5f515 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -72,6 +72,9 @@ t_random_basic(_) -> %% out which member it picked, then close its connection %% send the second message, the message should be 'nack'ed %% by the sticky session and delivered to the 2nd session. +%% After the connection for the 2nd session is also closed, +%% i.e. when all clients are offline, the following message(s) +%% should be delivered randomly. t_no_connection_nack(_) -> ok = ensure_config(sticky), Publisher = <<"publisher">>, @@ -117,7 +120,7 @@ t_no_connection_nack(_) -> %% sleep then make synced calls to session processes to ensure that %% the connection pid's 'EXIT' message is propagated to the session process %% also to be sure sessions are still alive - timer:sleep(5), + timer:sleep(2), _ = emqx_session:info(SPid1), _ = emqx_session:info(SPid2), %% Now we know what is the other still alive connection @@ -128,11 +131,21 @@ t_no_connection_nack(_) -> SendF(Id), ?wait(Received(Id, TheOtherConnPid), 1000) end, PacketIdList), + %% Now close the 2nd (last connection) + emqx_mock_client:stop(TheOtherConnPid), + timer:sleep(2), + %% both sessions should have conn_pid = undefined + ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid1))), + ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid2))), + %% send more messages, but all should be queued in session state + lists:foreach(fun(Id) -> SendF(Id) end, PacketIdList), + {_, L1} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid1)), + {_, L2} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid2)), + ?assertEqual(length(PacketIdList), L1 + L2), %% clean up emqx_mock_client:close_session(PubConnPid), emqx_sm:close_session(SPid1), emqx_sm:close_session(SPid2), - emqx_mock_client:close_session(TheOtherConnPid), ok. t_random(_) ->