diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 02422fe55..c6fef6d6f 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -89,7 +89,7 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}) - end. pick(sticky, ClientId, Group, Topic) -> - Sub0 = erlang:get(shared_sub_sticky), + Sub0 = erlang:get({shared_sub_sticky, Group, Topic}), case is_sub_alive(Sub0) of true -> %% the old subscriber is still alive @@ -99,32 +99,33 @@ pick(sticky, ClientId, Group, Topic) -> %% randomly pick one for the first message Sub = do_pick(random, ClientId, Group, Topic), %% stick to whatever pick result - erlang:put(shared_sub_sticky, Sub), + erlang:put({shared_sub_sticky, Group, Topic}, Sub), Sub end; pick(Strategy, ClientId, Group, Topic) -> do_pick(Strategy, ClientId, Group, Topic). do_pick(Strategy, ClientId, Group, Topic) -> - All = subscribers(Group, Topic), - pick_subscriber(Strategy, ClientId, All). + case subscribers(Group, Topic) of + [] -> false; + [Sub] -> Sub; + All -> pick_subscriber(Group, Topic, Strategy, ClientId, All) + end. -pick_subscriber(_, _ClientId, []) -> false; -pick_subscriber(_, _ClientId, [Sub]) -> Sub; -pick_subscriber(Strategy, ClientId, Subs) -> - Nth = do_pick_subscriber(Strategy, ClientId, length(Subs)), +pick_subscriber(Group, Topic, Strategy, ClientId, Subs) -> + Nth = do_pick_subscriber(Group, Topic, Strategy, ClientId, length(Subs)), lists:nth(Nth, Subs). -do_pick_subscriber(random, _ClientId, Count) -> +do_pick_subscriber(_Group, _Topic, random, _ClientId, Count) -> rand:uniform(Count); -do_pick_subscriber(hash, ClientId, Count) -> +do_pick_subscriber(_Group, _Topic, hash, ClientId, Count) -> 1 + erlang:phash2(ClientId) rem Count; -do_pick_subscriber(round_robin, _ClientId, Count) -> - Rem = case erlang:get(shared_sub_round_robin) of +do_pick_subscriber(Group, Topic, round_robin, _ClientId, Count) -> + Rem = case erlang:get({shared_sub_round_robin, Group, Topic}) of undefined -> 0; N -> (N + 1) rem Count end, - _ = erlang:put(shared_sub_round_robin, Rem), + _ = erlang:put({shared_sub_round_robin, Group, Topic}, Rem), Rem + 1. subscribers(Group, Topic) ->