diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 5b8985fdb..051ba7b11 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -97,6 +97,7 @@ -define(NACK(Reason), {shared_sub_nack, Reason}). -define(NO_ACK, no_ack). -define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}). +-define(SUBSCRIBER_DOWN, noproc). -type redispatch_to() :: ?REDISPATCH_TO(emqx_topic:group(), emqx_topic:topic()). @@ -262,7 +263,8 @@ redispatch_shared_message(#message{} = Msg) -> %% Note that dispatch is called with self() in failed subs %% This is done to avoid dispatching back to caller Delivery = #delivery{sender = self(), message = Msg}, - FailedSubs = #{self() => sender}, + %% Self is terminating, it makes no sense to loop-back the dispatch + FailedSubs = #{self() => ?SUBSCRIBER_DOWN}, dispatch(Group, Topic, Delivery, FailedSubs). %% @hidden Return the `redispatch_to` group-topic in the message header. @@ -308,7 +310,7 @@ maybe_ack(Msg) -> pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> Sub0 = erlang:get({shared_sub_sticky, Group, Topic}), - All = subscribers(Group, Topic), + All = subscribers(Group, Topic, FailedSubs), case is_active_sub(Sub0, FailedSubs, All) of true -> %% the old subscriber is still alive @@ -316,27 +318,25 @@ pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> {fresh, Sub0}; false -> %% randomly pick one for the first message - {Type, Sub} = do_pick( - random, - ClientId, - SourceTopic, - Group, - Topic, - FailedSubs#{Sub0 => noproc} - ), - %% stick to whatever pick result - erlang:put({shared_sub_sticky, Group, Topic}, Sub), - {Type, Sub} + FailedSubs1 = FailedSubs#{Sub0 => ?SUBSCRIBER_DOWN}, + Res = do_pick(All, random, ClientId, SourceTopic, Group, Topic, FailedSubs1), + case Res of + {_, Sub} -> + %% stick to whatever pick result + erlang:put({shared_sub_sticky, Group, Topic}, Sub); + _ -> + ok + end, + Res end; pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> - do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs). + All = subscribers(Group, Topic, FailedSubs), + do_pick(All, Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs). -do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> - All = subscribers(Group, Topic), +do_pick([], _Strategy, _ClientId, _SourceTopic, _Group, _Topic, _FailedSubs) -> + false; +do_pick(All, Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> case lists:filter(fun(Sub) -> not maps:is_key(Sub, FailedSubs) end, All) of - [] when All =:= [] -> - %% Genuinely no subscriber - false; [] -> %% All offline? pick one anyway {retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, All)}; @@ -383,6 +383,16 @@ do_pick_subscriber(Group, Topic, round_robin_per_group, _ClientId, _SourceTopic, {Group, Topic}, 0 }). +%% Select ETS table to get all subscriber pids which are not down. +subscribers(Group, Topic, FailedSubs) -> + lists:filter( + fun(P) -> + ?SUBSCRIBER_DOWN =/= maps:get(P, FailedSubs, false) + end, + subscribers(Group, Topic) + ). + +%% Select ETS table to get all subscriber pids. subscribers(Group, Topic) -> ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]). @@ -523,8 +533,8 @@ update_stats(State) -> %% Return 'true' if the subscriber process is alive AND not in the failed list is_active_sub(Pid, FailedSubs, All) -> lists:member(Pid, All) andalso - is_alive_sub(Pid) andalso - (not maps:is_key(Pid, FailedSubs)). + (not maps:is_key(Pid, FailedSubs)) andalso + is_alive_sub(Pid). %% erlang:is_process_alive/1 does not work with remote pid. is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) ->