From 0087303b251baa549921599a23b66524daaaad8f Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Wed, 20 Jan 2021 08:42:31 +0100 Subject: [PATCH] feat(shared_sub): Support hashing from source topic. --- etc/emqx.conf | 4 ++- src/emqx_shared_sub.erl | 46 ++++++++++++++++++++++------------ test/emqx_shared_sub_SUITE.erl | 43 ++++++++++++++++++++++++++++++- 3 files changed, 75 insertions(+), 18 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index bf6e0470f..343622336 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -2122,7 +2122,9 @@ broker.session_locking_strategy = quorum ## - random ## - round_robin ## - sticky -## - hash +## - hash # same as hash_clientid +## - hash_clientid +## - hash_topic broker.shared_subscription_strategy = random ## Enable/disable shared dispatch acknowledgement for QoS1 and QoS2 messages diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index f4b9bf78c..edc276f2a 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -58,6 +58,15 @@ , code_change/3 ]). +-export_type([strategy/0]). + +-type strategy() :: random + | round_robin + | sticky + | hash %% same as hash_clientid, backward compatible + | hash_clientid + | hash_topic. + -define(SERVER, ?MODULE). -define(TAB, emqx_shared_subscription). -define(SHARED_SUBS, emqx_shared_subscriber). @@ -111,8 +120,8 @@ dispatch(Group, Topic, Delivery) -> dispatch(Group, Topic, Delivery, _FailedSubs = []). dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> - #message{from = ClientId} = Msg, - case pick(strategy(), ClientId, Group, Topic, FailedSubs) of + #message{from = ClientId, topic = SourceTopic} = Msg, + case pick(strategy(), ClientId, SourceTopic, Group, Topic, FailedSubs) of false -> {error, no_subscribers}; {Type, SubPid} -> @@ -124,7 +133,7 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> end end. --spec(strategy() -> random | round_robin | sticky | hash). +-spec(strategy() -> strategy()). strategy() -> emqx:get_env(shared_subscription_strategy, random). @@ -226,7 +235,7 @@ maybe_ack(Msg) -> without_ack_ref(Msg) end. -pick(sticky, ClientId, Group, Topic, FailedSubs) -> +pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> Sub0 = erlang:get({shared_sub_sticky, Group, Topic}), case is_active_sub(Sub0, FailedSubs) of true -> @@ -235,15 +244,15 @@ pick(sticky, ClientId, Group, Topic, FailedSubs) -> {fresh, Sub0}; false -> %% randomly pick one for the first message - {Type, Sub} = do_pick(random, ClientId, Group, Topic, [Sub0 | FailedSubs]), + {Type, Sub} = do_pick(random, ClientId, SourceTopic, Group, Topic, [Sub0 | FailedSubs]), %% stick to whatever pick result erlang:put({shared_sub_sticky, Group, Topic}, Sub), {Type, Sub} end; -pick(Strategy, ClientId, Group, Topic, FailedSubs) -> - do_pick(Strategy, ClientId, Group, Topic, FailedSubs). +pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> + do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs). -do_pick(Strategy, ClientId, Group, Topic, FailedSubs) -> +do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> All = subscribers(Group, Topic), case All -- FailedSubs of [] when All =:= [] -> @@ -251,22 +260,27 @@ do_pick(Strategy, ClientId, Group, Topic, FailedSubs) -> false; [] -> %% All offline? pick one anyway - {retry, pick_subscriber(Group, Topic, Strategy, ClientId, All)}; + {retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, All)}; Subs -> %% More than one available - {fresh, pick_subscriber(Group, Topic, Strategy, ClientId, Subs)} + {fresh, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Subs)} end. -pick_subscriber(_Group, _Topic, _Strategy, _ClientId, [Sub]) -> Sub; -pick_subscriber(Group, Topic, Strategy, ClientId, Subs) -> - Nth = do_pick_subscriber(Group, Topic, Strategy, ClientId, length(Subs)), +pick_subscriber(_Group, _Topic, _Strategy, _ClientId, _SourceTopic, [Sub]) -> Sub; +pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Subs) -> + Nth = do_pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, length(Subs)), lists:nth(Nth, Subs). -do_pick_subscriber(_Group, _Topic, random, _ClientId, Count) -> +do_pick_subscriber(_Group, _Topic, random, _ClientId, _SourceTopic, Count) -> rand:uniform(Count); -do_pick_subscriber(_Group, _Topic, hash, ClientId, Count) -> +do_pick_subscriber(Group, Topic, hash, ClientId, SourceTopic, Count) -> + %% backward compatible + do_pick_subscriber(Group, Topic, hash_clientid, ClientId, SourceTopic, Count); +do_pick_subscriber(_Group, _Topic, hash_clientid, ClientId, _SourceTopic, Count) -> 1 + erlang:phash2(ClientId) rem Count; -do_pick_subscriber(Group, Topic, round_robin, _ClientId, Count) -> +do_pick_subscriber(_Group, _Topic, hash_topic, _ClientId, SourceTopic, Count) -> + 1 + erlang:phash2(SourceTopic) rem Count; +do_pick_subscriber(Group, Topic, round_robin, _ClientId, _SourceTopic, Count) -> Rem = case erlang:get({shared_sub_round_robin, Group, Topic}) of undefined -> 0; N -> (N + 1) rem Count diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 71f25e96e..c2d7468d9 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -169,6 +169,47 @@ t_sticky(_) -> t_hash(_) -> test_two_messages(hash, false). +t_hash_clinetid(_) -> + test_two_messages(hash_clientid, false). + +t_hash_topic(_) -> + ok = ensure_config(hash_topic, false), + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]), + {ok, _} = emqtt:connect(ConnPid2), + + Topic1 = <<"foo/bar1">>, + Topic2 = <<"foo/bar2">>, + ?assert(erlang:phash2(Topic1) rem 2 =/= erlang:phash2(Topic2) rem 2), + Message1 = emqx_message:make(ClientId1, 0, Topic1, <<"hello1">>), + Message2 = emqx_message:make(ClientId1, 0, Topic2, <<"hello2">>), + emqtt:subscribe(ConnPid1, {<<"$share/group1/foo/#">>, 0}), + emqtt:subscribe(ConnPid2, {<<"$share/group1/foo/#">>, 0}), + ct:sleep(100), + emqx:publish(Message1), + Me = self(), + WaitF = fun(ExpectedPayload) -> + case last_message(ExpectedPayload, [ConnPid1, ConnPid2]) of + {true, Pid} -> + Me ! {subscriber, Pid}, + true; + Other -> + Other + end + end, + WaitF(<<"hello1">>), + UsedSubPid1 = receive {subscriber, P1} -> P1 end, + emqx_broker:publish(Message2), + WaitF(<<"hello2">>), + UsedSubPid2 = receive {subscriber, P2} -> P2 end, + ?assert(UsedSubPid1 =/= UsedSubPid2), + emqtt:stop(ConnPid1), + emqtt:stop(ConnPid2), + ok. + %% if the original subscriber dies, change to another one alive t_not_so_sticky(_) -> ok = ensure_config(sticky), @@ -246,7 +287,7 @@ last_message(ExpectedPayload, Pids) -> after 100 -> <<"not yet?">> end. - + t_dispatch(_) -> ok = ensure_config(random), Topic = <<"foo">>,