Add new shared subscription dispatch strategy
'random' was already there before this change Added two new strategies: 'sticky' and 'round_robin' 'sticky' is made default as it is the cheapest
This commit is contained in:
parent
8f35d13e17
commit
b35d37c92d
16
Makefile
16
Makefile
|
@ -39,7 +39,7 @@ CT_SUITES = emqx emqx_zone emqx_banned emqx_connection emqx_session emqx_access
|
|||
emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \
|
||||
emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
|
||||
emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm \
|
||||
emqx_mountpoint emqx_listeners emqx_protocol emqx_pool
|
||||
emqx_mountpoint emqx_listeners emqx_protocol emqx_pool emqx_shared_sub
|
||||
|
||||
CT_NODE_NAME = emqxct@127.0.0.1
|
||||
CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)
|
||||
|
@ -84,23 +84,23 @@ rebar-cover:
|
|||
coveralls:
|
||||
@rebar3 coveralls send
|
||||
|
||||
cuttlefish: deps
|
||||
@mv ./deps/cuttlefish/cuttlefish ./cuttlefish
|
||||
|
||||
rebar-cuttlefish: rebar-deps
|
||||
@make -C _build/default/lib/cuttlefish
|
||||
@mv _build/default/lib/cuttlefish/cuttlefish ./cuttlefish
|
||||
cuttlefish: rebar-deps
|
||||
@if [ ! -f cuttlefish ]; then \
|
||||
make -C _build/default/lib/cuttlefish; \
|
||||
mv _build/default/lib/cuttlefish/cuttlefish ./cuttlefish; \
|
||||
fi
|
||||
|
||||
rebar-deps:
|
||||
@rebar3 get-deps
|
||||
|
||||
rebar-eunit: rebar-cuttlefish
|
||||
rebar-eunit: cuttlefish
|
||||
@rebar3 eunit
|
||||
|
||||
rebar-compile:
|
||||
@rebar3 compile
|
||||
|
||||
rebar-ct: rebar-cuttlefish app.config
|
||||
rebar-ct: cuttlefish app.config
|
||||
@rebar3 as test compile
|
||||
@ln -s -f '../../../../etc' _build/test/lib/emqx/
|
||||
@ln -s -f '../../../../data' _build/test/lib/emqx/
|
||||
|
|
|
@ -1726,9 +1726,16 @@ end}.
|
|||
{datatype, {enum, [local,one,quorum,all]}}
|
||||
]}.
|
||||
|
||||
%% @doc Shared Subscription Dispatch Strategy.
|
||||
{mapping, "broker.shared_subscription_strategy", "emqx.shared_subscription_strategy", [
|
||||
{default, random},
|
||||
{datatype, {enum, [random, round_robbin, hash]}}
|
||||
{default, round_robbin},
|
||||
{datatype,
|
||||
{enum,
|
||||
[random, %% randomly pick a subscriber
|
||||
round_robbin, %% round robin alive subscribers one message after another
|
||||
sticky, %% pick a random subscriber and stick to it
|
||||
hash %% hash client ID to a group member
|
||||
]}}
|
||||
]}.
|
||||
|
||||
{mapping, "broker.route_batch_clean", "emqx.route_batch_clean", [
|
||||
|
|
|
@ -26,7 +26,6 @@
|
|||
|
||||
-export([start_link/0]).
|
||||
|
||||
-export([strategy/0]).
|
||||
-export([subscribe/3, unsubscribe/3]).
|
||||
-export([dispatch/3]).
|
||||
|
||||
|
@ -36,6 +35,7 @@
|
|||
|
||||
-define(SERVER, ?MODULE).
|
||||
-define(TAB, emqx_shared_subscription).
|
||||
-define(ALIVE_SUBS, emqx_alive_shared_subscribers).
|
||||
|
||||
-record(state, {pmon}).
|
||||
-record(emqx_shared_subscription, {group, topic, subpid}).
|
||||
|
@ -62,9 +62,9 @@ mnesia(copy) ->
|
|||
start_link() ->
|
||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||
|
||||
-spec(strategy() -> round_robin | random | hash).
|
||||
-spec(strategy() -> random | round_robin | stiky | hash).
|
||||
strategy() ->
|
||||
emqx_config:get_env(shared_subscription_strategy, random).
|
||||
emqx_config:get_env(shared_subscription_strategy, round_robin).
|
||||
|
||||
subscribe(undefined, _Topic, _SubPid) ->
|
||||
ok;
|
||||
|
@ -80,23 +80,56 @@ unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
|
|||
record(Group, Topic, SubPid) ->
|
||||
#emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
|
||||
|
||||
%% TODO: dispatch strategy, ensure the delivery...
|
||||
dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}) ->
|
||||
case pick(subscribers(Group, Topic)) of
|
||||
#message{from = ClientId} = Msg,
|
||||
case pick(strategy(), ClientId, Group, Topic) of
|
||||
false -> Delivery;
|
||||
SubPid -> SubPid ! {dispatch, Topic, Msg},
|
||||
Delivery#delivery{results = [{dispatch, {Group, Topic}, 1} | Results]}
|
||||
end.
|
||||
|
||||
pick([]) ->
|
||||
false;
|
||||
pick([SubPid]) ->
|
||||
SubPid;
|
||||
pick(SubPids) ->
|
||||
lists:nth(rand:uniform(length(SubPids)), SubPids).
|
||||
pick(sticky, ClientId, Group, Topic) ->
|
||||
Sub0 = erlang:get(shared_sub_sticky),
|
||||
case is_sub_alive(Sub0) of
|
||||
true ->
|
||||
%% the old subscriber is still alive
|
||||
%% keep using it for sticky strategy
|
||||
Sub0;
|
||||
false ->
|
||||
%% randomly pick one for the first message
|
||||
Sub = do_pick(random, ClientId, Group, Topic),
|
||||
%% stick to whatever pick result
|
||||
erlang:put(shared_sub_sticky, Sub),
|
||||
Sub
|
||||
end;
|
||||
pick(Strategy, ClientId, Group, Topic) ->
|
||||
do_pick(Strategy, ClientId, Group, Topic).
|
||||
|
||||
do_pick(Strategy, ClientId, Group, Topic) ->
|
||||
All = subscribers(Group, Topic),
|
||||
pick_subscriber(Strategy, ClientId, All).
|
||||
|
||||
pick_subscriber(_, _ClientId, []) -> false;
|
||||
pick_subscriber(_, _ClientId, [Sub]) -> Sub;
|
||||
pick_subscriber(Strategy, ClientId, Subs) ->
|
||||
Nth = do_pick_subscriber(Strategy, ClientId, length(Subs)),
|
||||
lists:nth(Nth, Subs).
|
||||
|
||||
do_pick_subscriber(random, _ClientId, Count) ->
|
||||
rand:uniform(Count);
|
||||
do_pick_subscriber(hash, ClientId, Count) ->
|
||||
1 + erlang:phash2(ClientId) rem Count;
|
||||
do_pick_subscriber(round_robin, _ClientId, Count) ->
|
||||
Rem = case erlang:get(shared_sub_round_robin) of
|
||||
undefined -> 0;
|
||||
N -> (N + 1) rem Count
|
||||
end,
|
||||
_ = erlang:put(shared_sub_round_robin, Rem),
|
||||
Rem + 1.
|
||||
|
||||
subscribers(Group, Topic) ->
|
||||
ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]).
|
||||
|
||||
%%-----------------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
%%-----------------------------------------------------------------------------
|
||||
|
@ -104,6 +137,7 @@ subscribers(Group, Topic) ->
|
|||
init([]) ->
|
||||
{atomic, PMon} = mnesia:transaction(fun init_monitors/0),
|
||||
mnesia:subscribe({table, ?TAB, simple}),
|
||||
ets:new(?ALIVE_SUBS, [named_table, {read_concurrency, true}, protected]),
|
||||
{ok, update_stats(#state{pmon = PMon})}.
|
||||
|
||||
init_monitors() ->
|
||||
|
@ -117,8 +151,9 @@ handle_call(Req, _From, State) ->
|
|||
{reply, ignored, State}.
|
||||
|
||||
handle_cast({monitor, SubPid}, State= #state{pmon = PMon}) ->
|
||||
{noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
|
||||
|
||||
NewPmon = emqx_pmon:monitor(SubPid, PMon),
|
||||
ets:insert(?ALIVE_SUBS, {SubPid}),
|
||||
{noreply, update_stats(State#state{pmon = NewPmon})};
|
||||
handle_cast(Msg, State) ->
|
||||
emqx_logger:error("[SharedSub] unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
@ -154,6 +189,7 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
cleanup_down(SubPid) ->
|
||||
ets:delete(?ALIVE_SUBS, SubPid),
|
||||
lists:foreach(
|
||||
fun(Record) ->
|
||||
mnesia:dirty_delete_object(?TAB, Record)
|
||||
|
@ -162,3 +198,7 @@ cleanup_down(SubPid) ->
|
|||
update_stats(State) ->
|
||||
emqx_stats:setstat('subscriptions/shared/count', 'subscriptions/shared/max', ets:info(?TAB, size)), State.
|
||||
|
||||
%% erlang:is_process_alive/1 is expensive
|
||||
%% and does not work with remote pids
|
||||
is_sub_alive(Sub) -> [] =/= ets:lookup(?ALIVE_SUBS, Sub).
|
||||
|
||||
|
|
|
@ -16,7 +16,8 @@
|
|||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/0]).
|
||||
-export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/0,
|
||||
try_get_last_message/0]).
|
||||
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
terminate/2, code_change/3]).
|
||||
|
@ -41,6 +42,12 @@ get_last_message() ->
|
|||
[{last_message, Msg}] = ets:lookup(?TAB, last_message),
|
||||
Msg.
|
||||
|
||||
try_get_last_message() ->
|
||||
case ets:lookup(?TAB, last_message) of
|
||||
[{last_message, Msg}] -> Msg;
|
||||
[] -> false
|
||||
end.
|
||||
|
||||
init([ClientId]) ->
|
||||
Result = lists:member(?TAB, ets:all()),
|
||||
if Result == false ->
|
||||
|
|
|
@ -0,0 +1,186 @@
|
|||
|
||||
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
|
||||
-module(emqx_shared_sub_SUITE).
|
||||
|
||||
-export([all/0, init_per_suite/1, end_per_suite/1]).
|
||||
-export([t_random_basic/1, t_random/1, t_round_robin/1, t_sticky/1, t_hash/1, t_not_so_sticky/1]).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
-define(wait(For, Timeout), wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
|
||||
|
||||
all() -> [t_random_basic, t_random, t_round_robin, t_sticky, t_hash, t_not_so_sticky].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_broker_helpers:run_setup_steps(),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_broker_helpers:run_teardown_steps().
|
||||
|
||||
t_random(_) ->
|
||||
application:set_env(?APPLICATION, shared_subscription_strategy, random),
|
||||
ClientId = <<"ClientId">>,
|
||||
{ok, ConnPid} = emqx_mock_client:start_link(ClientId),
|
||||
{ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal),
|
||||
Message1 = emqx_message:make(<<"ClientId">>, 2, <<"foo">>, <<"hello">>),
|
||||
emqx_session:subscribe(SPid, [{<<"foo">>, #{qos => 2, share => <<"group1">>}}]),
|
||||
%% wait for the subscription to show up
|
||||
?wait(ets:lookup(emqx_alive_shared_subscribers, SPid) =:= [{SPid}], 1000),
|
||||
emqx_session:publish(SPid, 1, Message1),
|
||||
?wait(case emqx_mock_client:try_get_last_message() of
|
||||
{publish, 1, _} -> true;
|
||||
Other -> Other
|
||||
end, 1000),
|
||||
emqx_session:puback(SPid, 2),
|
||||
emqx_session:puback(SPid, 3, reasoncode),
|
||||
emqx_session:pubrec(SPid, 4),
|
||||
emqx_session:pubrec(SPid, 5, reasoncode),
|
||||
emqx_session:pubrel(SPid, 6, reasoncode),
|
||||
emqx_session:pubcomp(SPid, 7, reasoncode),
|
||||
emqx_mock_client:close_session(ConnPid, SPid),
|
||||
ok.
|
||||
|
||||
t_round_robin(_) ->
|
||||
test_two_messages(round_robin).
|
||||
|
||||
t_sticky(_) ->
|
||||
test_two_messages(sticky).
|
||||
|
||||
t_hash(_) ->
|
||||
test_two_messages(hash).
|
||||
|
||||
%% if the original subscriber dies, change to another one alive
|
||||
t_not_so_sticky(_) ->
|
||||
application:set_env(?APPLICATION, shared_subscription_strategy, sticky),
|
||||
ClientId1 = <<"ClientId1">>,
|
||||
ClientId2 = <<"ClientId2">>,
|
||||
{ok, ConnPid1} = emqx_mock_client:start_link(ClientId1),
|
||||
{ok, ConnPid2} = emqx_mock_client:start_link(ClientId2),
|
||||
{ok, SPid1} = emqx_mock_client:open_session(ConnPid1, ClientId1, internal),
|
||||
{ok, SPid2} = emqx_mock_client:open_session(ConnPid2, ClientId2, internal),
|
||||
Message1 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello1">>),
|
||||
Message2 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello2">>),
|
||||
emqx_session:subscribe(SPid1, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]),
|
||||
emqx_session:subscribe(SPid2, [{<<"foo/#">>, #{qos => 0, share => <<"group1">>}}]),
|
||||
%% wait for the subscription to show up
|
||||
?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [{SPid1}] andalso
|
||||
ets:lookup(emqx_alive_shared_subscribers, SPid2) =:= [{SPid2}], 1000),
|
||||
emqx_session:publish(SPid1, 1, Message1),
|
||||
?wait(case emqx_mock_client:try_get_last_message() of
|
||||
{publish, _, #message{payload = <<"hello1">>}} -> true;
|
||||
Other -> Other
|
||||
end, 1000),
|
||||
emqx_session:publish(SPid1, 2, Message2),
|
||||
?wait(case emqx_mock_client:try_get_last_message() of
|
||||
{publish, _, #message{payload = <<"hello2">>}} -> true;
|
||||
Other -> Other
|
||||
end, 1000),
|
||||
emqx_mock_client:close_session(ConnPid2, SPid2),
|
||||
?wait(ets:tab2list(emqx_alive_shared_subscribers) =:= [], 1000),
|
||||
ok.
|
||||
|
||||
test_two_messages(Strategy) ->
|
||||
application:set_env(?APPLICATION, shared_subscription_strategy, Strategy),
|
||||
ClientId1 = <<"ClientId1">>,
|
||||
ClientId2 = <<"ClientId2">>,
|
||||
{ok, ConnPid1} = emqx_mock_client:start_link(ClientId1),
|
||||
{ok, ConnPid2} = emqx_mock_client:start_link(ClientId2),
|
||||
{ok, SPid1} = emqx_mock_client:open_session(ConnPid1, ClientId1, internal),
|
||||
{ok, SPid2} = emqx_mock_client:open_session(ConnPid2, ClientId2, internal),
|
||||
Message1 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello1">>),
|
||||
Message2 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello2">>),
|
||||
emqx_session:subscribe(SPid1, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]),
|
||||
emqx_session:subscribe(SPid2, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]),
|
||||
%% wait for the subscription to show up
|
||||
?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [{SPid1}] andalso
|
||||
ets:lookup(emqx_alive_shared_subscribers, SPid2) =:= [{SPid2}], 1000),
|
||||
emqx_session:publish(SPid1, 1, Message1),
|
||||
Me = self(),
|
||||
WaitF = fun(ExpectedPayload) ->
|
||||
case last_message(ExpectedPayload, [ConnPid1, ConnPid2]) of
|
||||
{true, Pid} ->
|
||||
Me ! {subscriber, Pid},
|
||||
true;
|
||||
Other ->
|
||||
Other
|
||||
end
|
||||
end,
|
||||
?wait(WaitF(<<"hello1">>), 2000),
|
||||
UsedSubPid1 = receive {subscriber, P1} -> P1 end,
|
||||
%% publish both messages with SPid1
|
||||
emqx_session:publish(SPid1, 2, Message2),
|
||||
?wait(WaitF(<<"hello2">>), 2000),
|
||||
UsedSubPid2 = receive {subscriber, P2} -> P2 end,
|
||||
case Strategy of
|
||||
sticky -> ?assert(UsedSubPid1 =:= UsedSubPid2);
|
||||
round_robin -> ?assert(UsedSubPid1 =/= UsedSubPid2);
|
||||
hash -> ?assert(UsedSubPid1 =:= UsedSubPid2);
|
||||
_ -> ok
|
||||
end,
|
||||
>>>>>>> 38d0d409... Add 'hash' option for shared subscription
|
||||
emqx_mock_client:close_session(ConnPid1, SPid1),
|
||||
emqx_mock_client:close_session(ConnPid2, SPid2),
|
||||
ok.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% help functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
wait_for(Fn, Ln, F, Timeout) ->
|
||||
{Pid, Mref} = erlang:spawn_monitor(fun() -> wait_loop(F, catch_call(F)) end),
|
||||
wait_for_down(Fn, Ln, Timeout, Pid, Mref, false).
|
||||
|
||||
wait_for_down(Fn, Ln, Timeout, Pid, Mref, Kill) ->
|
||||
receive
|
||||
{'DOWN', Mref, process, Pid, normal} ->
|
||||
ok;
|
||||
{'DOWN', Mref, process, Pid, {C, E, S}} ->
|
||||
erlang:raise(C, {Fn, Ln, E}, S)
|
||||
after
|
||||
Timeout ->
|
||||
case Kill of
|
||||
true ->
|
||||
erlang:demonitor(Mref, [flush]),
|
||||
erlang:exit(Pid, kill),
|
||||
erlang:error({Fn, Ln, timeout});
|
||||
false ->
|
||||
Pid ! stop,
|
||||
wait_for_down(Fn, Ln, Timeout, Pid, Mref, true)
|
||||
end
|
||||
end.
|
||||
|
||||
wait_loop(_F, true) -> exit(normal);
|
||||
wait_loop(F, LastRes) ->
|
||||
Res = catch_call(F),
|
||||
receive
|
||||
stop -> erlang:exit(LastRes)
|
||||
after
|
||||
100 -> wait_loop(F, Res)
|
||||
end.
|
||||
|
||||
catch_call(F) ->
|
||||
try
|
||||
case F() of
|
||||
true -> true;
|
||||
Other -> erlang:error({unexpected, Other})
|
||||
end
|
||||
catch
|
||||
C : E : S ->
|
||||
{C, E, S}
|
||||
end.
|
||||
|
Loading…
Reference in New Issue