diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 098d76c52..689d84b29 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -184,7 +184,7 @@ info(State = #state{conn_pid = ConnPid, {upgrade_qos, UpgradeQoS}, {inflight, Inflight}, {retry_interval, RetryInterval}, - {mqueue_len, MQueue}, + {mqueue_len, emqx_mqueue:len(MQueue)}, {awaiting_rel, AwaitingRel}, {max_awaiting_rel, MaxAwaitingRel}, {await_rel_timeout, AwaitRelTimeout}]. diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 67f03a93c..e14963bc7 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -91,18 +91,12 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}, F case pick(strategy(), ClientId, Group, Topic, FailedSubs) of false -> Delivery; - SubPid -> - case do_dispatch(SubPid, Topic, Msg) of + {Type, SubPid} -> + case do_dispatch(SubPid, Topic, Msg, Type) of ok -> Delivery#delivery{results = [{dispatch, {Group, Topic}, 1} | Results]}; {error, _Reason} -> - %% failed to dispatch to this sub, try next - %% 'Reason' is discarded so far, meaning for QoS1/2 messages - %% if all subscribers are off line, the dispatch would faile - %% even if there are sessions not expired yet. - %% If required, we can make use of the 'no_connection' reason to perform - %% retry without requiring acks, so the messages can be delivered - %% to sessions of offline clients + %% Failed to dispatch to this sub, try next. dispatch(Group, Topic, Delivery, [SubPid | FailedSubs]) end end. @@ -115,19 +109,23 @@ strategy() -> ack_enabled() -> emqx_config:get_env(shared_dispatch_ack_enabled, false). -do_dispatch(SubPid, Topic, Msg) when SubPid =:= self() -> +do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() -> %% Deadlock otherwise _ = erlang:send(SubPid, {dispatch, Topic, Msg}), ok; -do_dispatch(SubPid, Topic, Msg) -> - dispatch_per_qos(SubPid, Topic, Msg). +do_dispatch(SubPid, Topic, Msg, Type) -> + dispatch_per_qos(SubPid, Topic, Msg, Type). %% return either 'ok' (when everything is fine) or 'error' -dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg) -> +dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> %% For QoS 0 message, send it as regular dispatch _ = erlang:send(SubPid, {dispatch, Topic, Msg}), ok; -dispatch_per_qos(SubPid, Topic, Msg) -> +dispatch_per_qos(SubPid, Topic, Msg, retry) -> + %% Retry implies all subscribers nack:ed, send again without ack + _ = erlang:send(SubPid, {dispatch, Topic, Msg}), + ok; +dispatch_per_qos(SubPid, Topic, Msg, fresh) -> case ack_enabled() of true -> dispatch_with_ack(SubPid, Topic, Msg); @@ -211,24 +209,32 @@ pick(sticky, ClientId, Group, Topic, FailedSubs) -> true -> %% the old subscriber is still alive %% keep using it for sticky strategy - Sub0; + {fresh, Sub0}; false -> %% randomly pick one for the first message - Sub = do_pick(random, ClientId, Group, Topic, FailedSubs), + {Type, Sub} = do_pick(random, ClientId, Group, Topic, [Sub0 | FailedSubs]), %% stick to whatever pick result erlang:put({shared_sub_sticky, Group, Topic}, Sub), - Sub + {Type, Sub} end; pick(Strategy, ClientId, Group, Topic, FailedSubs) -> do_pick(Strategy, ClientId, Group, Topic, FailedSubs). do_pick(Strategy, ClientId, Group, Topic, FailedSubs) -> - case subscribers(Group, Topic) -- FailedSubs of - [] -> false; - [Sub] -> Sub; - All -> pick_subscriber(Group, Topic, Strategy, ClientId, All) + All = subscribers(Group, Topic), + case All -- FailedSubs of + [] when FailedSubs =:= [] -> + %% Genuinely no subscriber + false; + [] -> + %% All offline? pick one anyway + {retry, pick_subscriber(Group, Topic, Strategy, ClientId, All)}; + Subs -> + %% More than one available + {fresh, pick_subscriber(Group, Topic, Strategy, ClientId, Subs)} end. +pick_subscriber(_Group, _Topic, _Strategy, _ClientId, [Sub]) -> Sub; pick_subscriber(Group, Topic, Strategy, ClientId, Subs) -> Nth = do_pick_subscriber(Group, Topic, Strategy, ClientId, length(Subs)), lists:nth(Nth, Subs). diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index dd0e20ad1..cc5a5f515 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -72,6 +72,9 @@ t_random_basic(_) -> %% out which member it picked, then close its connection %% send the second message, the message should be 'nack'ed %% by the sticky session and delivered to the 2nd session. +%% After the connection for the 2nd session is also closed, +%% i.e. when all clients are offline, the following message(s) +%% should be delivered randomly. t_no_connection_nack(_) -> ok = ensure_config(sticky), Publisher = <<"publisher">>, @@ -117,7 +120,7 @@ t_no_connection_nack(_) -> %% sleep then make synced calls to session processes to ensure that %% the connection pid's 'EXIT' message is propagated to the session process %% also to be sure sessions are still alive - timer:sleep(5), + timer:sleep(2), _ = emqx_session:info(SPid1), _ = emqx_session:info(SPid2), %% Now we know what is the other still alive connection @@ -128,11 +131,21 @@ t_no_connection_nack(_) -> SendF(Id), ?wait(Received(Id, TheOtherConnPid), 1000) end, PacketIdList), + %% Now close the 2nd (last connection) + emqx_mock_client:stop(TheOtherConnPid), + timer:sleep(2), + %% both sessions should have conn_pid = undefined + ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid1))), + ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid2))), + %% send more messages, but all should be queued in session state + lists:foreach(fun(Id) -> SendF(Id) end, PacketIdList), + {_, L1} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid1)), + {_, L2} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid2)), + ?assertEqual(length(PacketIdList), L1 + L2), %% clean up emqx_mock_client:close_session(PubConnPid), emqx_sm:close_session(SPid1), emqx_sm:close_session(SPid2), - emqx_mock_client:close_session(TheOtherConnPid), ok. t_random(_) ->