Improve shared sub dispatch implementation. (#2144)
Before this change, when shared dispatch ack is enabled (in config), the message was simply discarded if all subscribers were offline (i.e. all sessions gave a negative ack). With this change, when no subscriber is online, one session is still picked according to the configured dispatch strategy, and messages dispatched in this scenario are queued in that session's state.
This commit is contained in:
parent
55ec358cd6
commit
a6f138b55c
|
@ -184,7 +184,7 @@ info(State = #state{conn_pid = ConnPid,
|
|||
{upgrade_qos, UpgradeQoS},
|
||||
{inflight, Inflight},
|
||||
{retry_interval, RetryInterval},
|
||||
{mqueue_len, MQueue},
|
||||
{mqueue_len, emqx_mqueue:len(MQueue)},
|
||||
{awaiting_rel, AwaitingRel},
|
||||
{max_awaiting_rel, MaxAwaitingRel},
|
||||
{await_rel_timeout, AwaitRelTimeout}].
|
||||
|
|
|
@ -91,18 +91,12 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}, F
|
|||
case pick(strategy(), ClientId, Group, Topic, FailedSubs) of
|
||||
false ->
|
||||
Delivery;
|
||||
SubPid ->
|
||||
case do_dispatch(SubPid, Topic, Msg) of
|
||||
{Type, SubPid} ->
|
||||
case do_dispatch(SubPid, Topic, Msg, Type) of
|
||||
ok ->
|
||||
Delivery#delivery{results = [{dispatch, {Group, Topic}, 1} | Results]};
|
||||
{error, _Reason} ->
|
||||
%% failed to dispatch to this sub, try next
|
||||
%% 'Reason' is discarded so far, meaning for QoS1/2 messages
|
||||
%% if all subscribers are offline, the dispatch would fail
|
||||
%% even if there are sessions not expired yet.
|
||||
%% If required, we can make use of the 'no_connection' reason to perform
|
||||
%% retry without requiring acks, so the messages can be delivered
|
||||
%% to sessions of offline clients
|
||||
%% Failed to dispatch to this sub, try next.
|
||||
dispatch(Group, Topic, Delivery, [SubPid | FailedSubs])
|
||||
end
|
||||
end.
|
||||
|
@ -115,19 +109,23 @@ strategy() ->
|
|||
ack_enabled() ->
|
||||
emqx_config:get_env(shared_dispatch_ack_enabled, false).
|
||||
|
||||
do_dispatch(SubPid, Topic, Msg) when SubPid =:= self() ->
|
||||
do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() ->
|
||||
%% Deadlock otherwise
|
||||
_ = erlang:send(SubPid, {dispatch, Topic, Msg}),
|
||||
ok;
|
||||
do_dispatch(SubPid, Topic, Msg) ->
|
||||
dispatch_per_qos(SubPid, Topic, Msg).
|
||||
do_dispatch(SubPid, Topic, Msg, Type) ->
|
||||
dispatch_per_qos(SubPid, Topic, Msg, Type).
|
||||
|
||||
%% return either 'ok' (when everything is fine) or 'error'
|
||||
dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg) ->
|
||||
dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
|
||||
%% For QoS 0 message, send it as regular dispatch
|
||||
_ = erlang:send(SubPid, {dispatch, Topic, Msg}),
|
||||
ok;
|
||||
dispatch_per_qos(SubPid, Topic, Msg) ->
|
||||
dispatch_per_qos(SubPid, Topic, Msg, retry) ->
|
||||
%% Retry implies all subscribers nack:ed, send again without ack
|
||||
_ = erlang:send(SubPid, {dispatch, Topic, Msg}),
|
||||
ok;
|
||||
dispatch_per_qos(SubPid, Topic, Msg, fresh) ->
|
||||
case ack_enabled() of
|
||||
true ->
|
||||
dispatch_with_ack(SubPid, Topic, Msg);
|
||||
|
@ -211,24 +209,32 @@ pick(sticky, ClientId, Group, Topic, FailedSubs) ->
|
|||
true ->
|
||||
%% the old subscriber is still alive
|
||||
%% keep using it for sticky strategy
|
||||
Sub0;
|
||||
{fresh, Sub0};
|
||||
false ->
|
||||
%% randomly pick one for the first message
|
||||
Sub = do_pick(random, ClientId, Group, Topic, FailedSubs),
|
||||
{Type, Sub} = do_pick(random, ClientId, Group, Topic, [Sub0 | FailedSubs]),
|
||||
%% stick to whatever pick result
|
||||
erlang:put({shared_sub_sticky, Group, Topic}, Sub),
|
||||
Sub
|
||||
{Type, Sub}
|
||||
end;
|
||||
pick(Strategy, ClientId, Group, Topic, FailedSubs) ->
|
||||
do_pick(Strategy, ClientId, Group, Topic, FailedSubs).
|
||||
|
||||
do_pick(Strategy, ClientId, Group, Topic, FailedSubs) ->
|
||||
case subscribers(Group, Topic) -- FailedSubs of
|
||||
[] -> false;
|
||||
[Sub] -> Sub;
|
||||
All -> pick_subscriber(Group, Topic, Strategy, ClientId, All)
|
||||
All = subscribers(Group, Topic),
|
||||
case All -- FailedSubs of
|
||||
[] when FailedSubs =:= [] ->
|
||||
%% Genuinely no subscriber
|
||||
false;
|
||||
[] ->
|
||||
%% All offline? pick one anyway
|
||||
{retry, pick_subscriber(Group, Topic, Strategy, ClientId, All)};
|
||||
Subs ->
|
||||
%% More than one available
|
||||
{fresh, pick_subscriber(Group, Topic, Strategy, ClientId, Subs)}
|
||||
end.
|
||||
|
||||
pick_subscriber(_Group, _Topic, _Strategy, _ClientId, [Sub]) -> Sub;
|
||||
pick_subscriber(Group, Topic, Strategy, ClientId, Subs) ->
|
||||
Nth = do_pick_subscriber(Group, Topic, Strategy, ClientId, length(Subs)),
|
||||
lists:nth(Nth, Subs).
|
||||
|
|
|
@ -72,6 +72,9 @@ t_random_basic(_) ->
|
|||
%% out which member it picked, then close its connection
|
||||
%% send the second message, the message should be 'nack'ed
|
||||
%% by the sticky session and delivered to the 2nd session.
|
||||
%% After the connection for the 2nd session is also closed,
|
||||
%% i.e. when all clients are offline, the following message(s)
|
||||
%% should be delivered randomly.
|
||||
t_no_connection_nack(_) ->
|
||||
ok = ensure_config(sticky),
|
||||
Publisher = <<"publisher">>,
|
||||
|
@ -117,7 +120,7 @@ t_no_connection_nack(_) ->
|
|||
%% sleep then make synced calls to session processes to ensure that
|
||||
%% the connection pid's 'EXIT' message is propagated to the session process
|
||||
%% also to be sure sessions are still alive
|
||||
timer:sleep(5),
|
||||
timer:sleep(2),
|
||||
_ = emqx_session:info(SPid1),
|
||||
_ = emqx_session:info(SPid2),
|
||||
%% Now we know what is the other still alive connection
|
||||
|
@ -128,11 +131,21 @@ t_no_connection_nack(_) ->
|
|||
SendF(Id),
|
||||
?wait(Received(Id, TheOtherConnPid), 1000)
|
||||
end, PacketIdList),
|
||||
%% Now close the 2nd (last connection)
|
||||
emqx_mock_client:stop(TheOtherConnPid),
|
||||
timer:sleep(2),
|
||||
%% both sessions should have conn_pid = undefined
|
||||
?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid1))),
|
||||
?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid2))),
|
||||
%% send more messages, but all should be queued in session state
|
||||
lists:foreach(fun(Id) -> SendF(Id) end, PacketIdList),
|
||||
{_, L1} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid1)),
|
||||
{_, L2} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid2)),
|
||||
?assertEqual(length(PacketIdList), L1 + L2),
|
||||
%% clean up
|
||||
emqx_mock_client:close_session(PubConnPid),
|
||||
emqx_sm:close_session(SPid1),
|
||||
emqx_sm:close_session(SPid2),
|
||||
emqx_mock_client:close_session(TheOtherConnPid),
|
||||
ok.
|
||||
|
||||
t_random(_) ->
|
||||
|
|
Loading…
Reference in New Issue