feat(shared_sub): Support hashing from source topic.
This commit is contained in:
parent
5aa63ed203
commit
0087303b25
|
@ -2122,7 +2122,9 @@ broker.session_locking_strategy = quorum
|
|||
## - random
|
||||
## - round_robin
|
||||
## - sticky
|
||||
## - hash
|
||||
## - hash # same as hash_clientid
|
||||
## - hash_clientid
|
||||
## - hash_topic
|
||||
broker.shared_subscription_strategy = random
|
||||
|
||||
## Enable/disable shared dispatch acknowledgement for QoS1 and QoS2 messages
|
||||
|
|
|
@ -58,6 +58,15 @@
|
|||
, code_change/3
|
||||
]).
|
||||
|
||||
-export_type([strategy/0]).
|
||||
|
||||
-type strategy() :: random
|
||||
| round_robin
|
||||
| sticky
|
||||
| hash %% same as hash_clientid, backward compatible
|
||||
| hash_clientid
|
||||
| hash_topic.
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
-define(TAB, emqx_shared_subscription).
|
||||
-define(SHARED_SUBS, emqx_shared_subscriber).
|
||||
|
@ -111,8 +120,8 @@ dispatch(Group, Topic, Delivery) ->
|
|||
dispatch(Group, Topic, Delivery, _FailedSubs = []).
|
||||
|
||||
dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
|
||||
#message{from = ClientId} = Msg,
|
||||
case pick(strategy(), ClientId, Group, Topic, FailedSubs) of
|
||||
#message{from = ClientId, topic = SourceTopic} = Msg,
|
||||
case pick(strategy(), ClientId, SourceTopic, Group, Topic, FailedSubs) of
|
||||
false ->
|
||||
{error, no_subscribers};
|
||||
{Type, SubPid} ->
|
||||
|
@ -124,7 +133,7 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
|
|||
end
|
||||
end.
|
||||
|
||||
-spec(strategy() -> random | round_robin | sticky | hash).
|
||||
-spec(strategy() -> strategy()).
|
||||
strategy() ->
|
||||
emqx:get_env(shared_subscription_strategy, random).
|
||||
|
||||
|
@ -226,7 +235,7 @@ maybe_ack(Msg) ->
|
|||
without_ack_ref(Msg)
|
||||
end.
|
||||
|
||||
pick(sticky, ClientId, Group, Topic, FailedSubs) ->
|
||||
pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
|
||||
Sub0 = erlang:get({shared_sub_sticky, Group, Topic}),
|
||||
case is_active_sub(Sub0, FailedSubs) of
|
||||
true ->
|
||||
|
@ -235,15 +244,15 @@ pick(sticky, ClientId, Group, Topic, FailedSubs) ->
|
|||
{fresh, Sub0};
|
||||
false ->
|
||||
%% randomly pick one for the first message
|
||||
{Type, Sub} = do_pick(random, ClientId, Group, Topic, [Sub0 | FailedSubs]),
|
||||
{Type, Sub} = do_pick(random, ClientId, SourceTopic, Group, Topic, [Sub0 | FailedSubs]),
|
||||
%% stick to whatever pick result
|
||||
erlang:put({shared_sub_sticky, Group, Topic}, Sub),
|
||||
{Type, Sub}
|
||||
end;
|
||||
pick(Strategy, ClientId, Group, Topic, FailedSubs) ->
|
||||
do_pick(Strategy, ClientId, Group, Topic, FailedSubs).
|
||||
pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
|
||||
do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs).
|
||||
|
||||
do_pick(Strategy, ClientId, Group, Topic, FailedSubs) ->
|
||||
do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
|
||||
All = subscribers(Group, Topic),
|
||||
case All -- FailedSubs of
|
||||
[] when All =:= [] ->
|
||||
|
@ -251,22 +260,27 @@ do_pick(Strategy, ClientId, Group, Topic, FailedSubs) ->
|
|||
false;
|
||||
[] ->
|
||||
%% All offline? pick one anyway
|
||||
{retry, pick_subscriber(Group, Topic, Strategy, ClientId, All)};
|
||||
{retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, All)};
|
||||
Subs ->
|
||||
%% More than one available
|
||||
{fresh, pick_subscriber(Group, Topic, Strategy, ClientId, Subs)}
|
||||
{fresh, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Subs)}
|
||||
end.
|
||||
|
||||
pick_subscriber(_Group, _Topic, _Strategy, _ClientId, [Sub]) -> Sub;
|
||||
pick_subscriber(Group, Topic, Strategy, ClientId, Subs) ->
|
||||
Nth = do_pick_subscriber(Group, Topic, Strategy, ClientId, length(Subs)),
|
||||
pick_subscriber(_Group, _Topic, _Strategy, _ClientId, _SourceTopic, [Sub]) -> Sub;
|
||||
pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Subs) ->
|
||||
Nth = do_pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, length(Subs)),
|
||||
lists:nth(Nth, Subs).
|
||||
|
||||
do_pick_subscriber(_Group, _Topic, random, _ClientId, Count) ->
|
||||
do_pick_subscriber(_Group, _Topic, random, _ClientId, _SourceTopic, Count) ->
|
||||
rand:uniform(Count);
|
||||
do_pick_subscriber(_Group, _Topic, hash, ClientId, Count) ->
|
||||
do_pick_subscriber(Group, Topic, hash, ClientId, SourceTopic, Count) ->
|
||||
%% backward compatible
|
||||
do_pick_subscriber(Group, Topic, hash_clientid, ClientId, SourceTopic, Count);
|
||||
do_pick_subscriber(_Group, _Topic, hash_clientid, ClientId, _SourceTopic, Count) ->
|
||||
1 + erlang:phash2(ClientId) rem Count;
|
||||
do_pick_subscriber(Group, Topic, round_robin, _ClientId, Count) ->
|
||||
do_pick_subscriber(_Group, _Topic, hash_topic, _ClientId, SourceTopic, Count) ->
|
||||
1 + erlang:phash2(SourceTopic) rem Count;
|
||||
do_pick_subscriber(Group, Topic, round_robin, _ClientId, _SourceTopic, Count) ->
|
||||
Rem = case erlang:get({shared_sub_round_robin, Group, Topic}) of
|
||||
undefined -> 0;
|
||||
N -> (N + 1) rem Count
|
||||
|
|
|
@ -169,6 +169,47 @@ t_sticky(_) ->
|
|||
t_hash(_) ->
|
||||
test_two_messages(hash, false).
|
||||
|
||||
t_hash_clinetid(_) ->
|
||||
test_two_messages(hash_clientid, false).
|
||||
|
||||
t_hash_topic(_) ->
|
||||
ok = ensure_config(hash_topic, false),
|
||||
ClientId1 = <<"ClientId1">>,
|
||||
ClientId2 = <<"ClientId2">>,
|
||||
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
|
||||
{ok, _} = emqtt:connect(ConnPid1),
|
||||
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
|
||||
{ok, _} = emqtt:connect(ConnPid2),
|
||||
|
||||
Topic1 = <<"foo/bar1">>,
|
||||
Topic2 = <<"foo/bar2">>,
|
||||
?assert(erlang:phash2(Topic1) rem 2 =/= erlang:phash2(Topic2) rem 2),
|
||||
Message1 = emqx_message:make(ClientId1, 0, Topic1, <<"hello1">>),
|
||||
Message2 = emqx_message:make(ClientId1, 0, Topic2, <<"hello2">>),
|
||||
emqtt:subscribe(ConnPid1, {<<"$share/group1/foo/#">>, 0}),
|
||||
emqtt:subscribe(ConnPid2, {<<"$share/group1/foo/#">>, 0}),
|
||||
ct:sleep(100),
|
||||
emqx:publish(Message1),
|
||||
Me = self(),
|
||||
WaitF = fun(ExpectedPayload) ->
|
||||
case last_message(ExpectedPayload, [ConnPid1, ConnPid2]) of
|
||||
{true, Pid} ->
|
||||
Me ! {subscriber, Pid},
|
||||
true;
|
||||
Other ->
|
||||
Other
|
||||
end
|
||||
end,
|
||||
WaitF(<<"hello1">>),
|
||||
UsedSubPid1 = receive {subscriber, P1} -> P1 end,
|
||||
emqx_broker:publish(Message2),
|
||||
WaitF(<<"hello2">>),
|
||||
UsedSubPid2 = receive {subscriber, P2} -> P2 end,
|
||||
?assert(UsedSubPid1 =/= UsedSubPid2),
|
||||
emqtt:stop(ConnPid1),
|
||||
emqtt:stop(ConnPid2),
|
||||
ok.
|
||||
|
||||
%% if the original subscriber dies, change to another one alive
|
||||
t_not_so_sticky(_) ->
|
||||
ok = ensure_config(sticky),
|
||||
|
|
Loading…
Reference in New Issue