From b35d37c92d03ed127c21dd17b5f440a9ee134843 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Thu, 13 Sep 2018 18:20:41 +0200 Subject: [PATCH] Add new shared subscription dispatch strategy 'random' was already there before this change Added two new strategies: 'sticky' and 'round_robin' 'sticky' is made default as it is the cheapest --- Makefile | 16 +-- priv/emqx.schema | 11 +- src/emqx_shared_sub.erl | 66 +++++++++--- test/emqx_mock_client.erl | 9 +- test/emqx_shared_sub_SUITE.erl | 186 +++++++++++++++++++++++++++++++++ 5 files changed, 264 insertions(+), 24 deletions(-) create mode 100644 test/emqx_shared_sub_SUITE.erl diff --git a/Makefile b/Makefile index 54b13727a..bb866a898 100644 --- a/Makefile +++ b/Makefile @@ -39,7 +39,7 @@ CT_SUITES = emqx emqx_zone emqx_banned emqx_connection emqx_session emqx_access emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \ emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm \ - emqx_mountpoint emqx_listeners emqx_protocol emqx_pool + emqx_mountpoint emqx_listeners emqx_protocol emqx_pool emqx_shared_sub CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) @@ -84,23 +84,23 @@ rebar-cover: coveralls: @rebar3 coveralls send -cuttlefish: deps - @mv ./deps/cuttlefish/cuttlefish ./cuttlefish -rebar-cuttlefish: rebar-deps - @make -C _build/default/lib/cuttlefish - @mv _build/default/lib/cuttlefish/cuttlefish ./cuttlefish +cuttlefish: rebar-deps + @if [ ! -f cuttlefish ]; then \ + make -C _build/default/lib/cuttlefish; \ + mv _build/default/lib/cuttlefish/cuttlefish ./cuttlefish; \ + fi rebar-deps: @rebar3 get-deps -rebar-eunit: rebar-cuttlefish +rebar-eunit: cuttlefish @rebar3 eunit rebar-compile: @rebar3 compile -rebar-ct: rebar-cuttlefish app.config +rebar-ct: cuttlefish app.config @rebar3 as test compile @ln -s -f '../../../../etc' _build/test/lib/emqx/ @ln -s -f '../../../../data' _build/test/lib/emqx/ diff --git a/priv/emqx.schema b/priv/emqx.schema index c5afd9b43..d0a54c6e0 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1726,9 +1726,16 @@ end}. {datatype, {enum, [local,one,quorum,all]}} ]}. +%% @doc Shared Subscription Dispatch Strategy. {mapping, "broker.shared_subscription_strategy", "emqx.shared_subscription_strategy", [ - {default, random}, - {datatype, {enum, [random, round_robbin, hash]}} + {default, round_robbin}, + {datatype, + {enum, + [random, %% randomly pick a subscriber + round_robbin, %% round robin alive subscribers one message after another + sticky, %% pick a random subscriber and stick to it + hash %% hash client ID to a group member + ]}} ]}. {mapping, "broker.route_batch_clean", "emqx.route_batch_clean", [ diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 0cbfab60a..096af9243 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -26,7 +26,6 @@ -export([start_link/0]). --export([strategy/0]). -export([subscribe/3, unsubscribe/3]). -export([dispatch/3]). @@ -36,6 +35,7 @@ -define(SERVER, ?MODULE). -define(TAB, emqx_shared_subscription). +-define(ALIVE_SUBS, emqx_alive_shared_subscribers). -record(state, {pmon}). -record(emqx_shared_subscription, {group, topic, subpid}). @@ -62,9 +62,9 @@ mnesia(copy) -> start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). --spec(strategy() -> round_robin | random | hash). +-spec(strategy() -> random | round_robin | stiky | hash). strategy() -> - emqx_config:get_env(shared_subscription_strategy, random). + emqx_config:get_env(shared_subscription_strategy, round_robin). subscribe(undefined, _Topic, _SubPid) -> ok; @@ -80,23 +80,56 @@ unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) -> record(Group, Topic, SubPid) -> #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. -%% TODO: dispatch strategy, ensure the delivery... dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}) -> - case pick(subscribers(Group, Topic)) of + #message{from = ClientId} = Msg, + case pick(strategy(), ClientId, Group, Topic) of false -> Delivery; SubPid -> SubPid ! {dispatch, Topic, Msg}, Delivery#delivery{results = [{dispatch, {Group, Topic}, 1} | Results]} end. -pick([]) -> - false; -pick([SubPid]) -> - SubPid; -pick(SubPids) -> - lists:nth(rand:uniform(length(SubPids)), SubPids). +pick(sticky, ClientId, Group, Topic) -> + Sub0 = erlang:get(shared_sub_sticky), + case is_sub_alive(Sub0) of + true -> + %% the old subscriber is still alive + %% keep using it for sticky strategy + Sub0; + false -> + %% randomly pick one for the first message + Sub = do_pick(random, ClientId, Group, Topic), + %% stick to whatever pick result + erlang:put(shared_sub_sticky, Sub), + Sub + end; +pick(Strategy, ClientId, Group, Topic) -> + do_pick(Strategy, ClientId, Group, Topic). + +do_pick(Strategy, ClientId, Group, Topic) -> + All = subscribers(Group, Topic), + pick_subscriber(Strategy, ClientId, All). + +pick_subscriber(_, _ClientId, []) -> false; +pick_subscriber(_, _ClientId, [Sub]) -> Sub; +pick_subscriber(Strategy, ClientId, Subs) -> + Nth = do_pick_subscriber(Strategy, ClientId, length(Subs)), + lists:nth(Nth, Subs). + +do_pick_subscriber(random, _ClientId, Count) -> + rand:uniform(Count); +do_pick_subscriber(hash, ClientId, Count) -> + 1 + erlang:phash2(ClientId) rem Count; +do_pick_subscriber(round_robin, _ClientId, Count) -> + Rem = case erlang:get(shared_sub_round_robin) of + undefined -> 0; + N -> (N + 1) rem Count + end, + _ = erlang:put(shared_sub_round_robin, Rem), + Rem + 1. subscribers(Group, Topic) -> ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]). + %%----------------------------------------------------------------------------- %% gen_server callbacks %%----------------------------------------------------------------------------- @@ -104,6 +137,7 @@ subscribers(Group, Topic) -> init([]) -> {atomic, PMon} = mnesia:transaction(fun init_monitors/0), mnesia:subscribe({table, ?TAB, simple}), + ets:new(?ALIVE_SUBS, [named_table, {read_concurrency, true}, protected]), {ok, update_stats(#state{pmon = PMon})}. init_monitors() -> @@ -117,8 +151,9 @@ handle_call(Req, _From, State) -> {reply, ignored, State}. handle_cast({monitor, SubPid}, State= #state{pmon = PMon}) -> - {noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; - + NewPmon = emqx_pmon:monitor(SubPid, PMon), + ets:insert(?ALIVE_SUBS, {SubPid}), + {noreply, update_stats(State#state{pmon = NewPmon})}; handle_cast(Msg, State) -> emqx_logger:error("[SharedSub] unexpected cast: ~p", [Msg]), {noreply, State}. @@ -154,6 +189,7 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- cleanup_down(SubPid) -> + ets:delete(?ALIVE_SUBS, SubPid), lists:foreach( fun(Record) -> mnesia:dirty_delete_object(?TAB, Record) @@ -162,3 +198,7 @@ cleanup_down(SubPid) -> update_stats(State) -> emqx_stats:setstat('subscriptions/shared/count', 'subscriptions/shared/max', ets:info(?TAB, size)), State. +%% erlang:is_process_alive/1 is expensive +%% and does not work with remote pids +is_sub_alive(Sub) -> [] =/= ets:lookup(?ALIVE_SUBS, Sub). + diff --git a/test/emqx_mock_client.erl b/test/emqx_mock_client.erl index 633e42b3f..ad1e2d22d 100644 --- a/test/emqx_mock_client.erl +++ b/test/emqx_mock_client.erl @@ -16,7 +16,8 @@ -behaviour(gen_server). --export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/0]). +-export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/0, + try_get_last_message/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -41,6 +42,12 @@ get_last_message() -> [{last_message, Msg}] = ets:lookup(?TAB, last_message), Msg. +try_get_last_message() -> + case ets:lookup(?TAB, last_message) of + [{last_message, Msg}] -> Msg; + [] -> false + end. + init([ClientId]) -> Result = lists:member(?TAB, ets:all()), if Result == false -> diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl new file mode 100644 index 000000000..b44a00680 --- /dev/null +++ b/test/emqx_shared_sub_SUITE.erl @@ -0,0 +1,186 @@ + +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_shared_sub_SUITE). + +-export([all/0, init_per_suite/1, end_per_suite/1]). +-export([t_random_basic/1, t_random/1, t_round_robin/1, t_sticky/1, t_hash/1, t_not_so_sticky/1]). + +-include("emqx.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(wait(For, Timeout), wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)). + +all() -> [t_random_basic, t_random, t_round_robin, t_sticky, t_hash, t_not_so_sticky]. + +init_per_suite(Config) -> + emqx_ct_broker_helpers:run_setup_steps(), + Config. + +end_per_suite(_Config) -> + emqx_ct_broker_helpers:run_teardown_steps(). + +t_random(_) -> + application:set_env(?APPLICATION, shared_subscription_strategy, random), + ClientId = <<"ClientId">>, + {ok, ConnPid} = emqx_mock_client:start_link(ClientId), + {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal), + Message1 = emqx_message:make(<<"ClientId">>, 2, <<"foo">>, <<"hello">>), + emqx_session:subscribe(SPid, [{<<"foo">>, #{qos => 2, share => <<"group1">>}}]), + %% wait for the subscription to show up + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid) =:= [{SPid}], 1000), + emqx_session:publish(SPid, 1, Message1), + ?wait(case emqx_mock_client:try_get_last_message() of + {publish, 1, _} -> true; + Other -> Other + end, 1000), + emqx_session:puback(SPid, 2), + emqx_session:puback(SPid, 3, reasoncode), + emqx_session:pubrec(SPid, 4), + emqx_session:pubrec(SPid, 5, reasoncode), + emqx_session:pubrel(SPid, 6, reasoncode), + emqx_session:pubcomp(SPid, 7, reasoncode), + emqx_mock_client:close_session(ConnPid, SPid), + ok. + +t_round_robin(_) -> + test_two_messages(round_robin). + +t_sticky(_) -> + test_two_messages(sticky). + +t_hash(_) -> + test_two_messages(hash). + +%% if the original subscriber dies, change to another one alive +t_not_so_sticky(_) -> + application:set_env(?APPLICATION, shared_subscription_strategy, sticky), + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + {ok, ConnPid1} = emqx_mock_client:start_link(ClientId1), + {ok, ConnPid2} = emqx_mock_client:start_link(ClientId2), + {ok, SPid1} = emqx_mock_client:open_session(ConnPid1, ClientId1, internal), + {ok, SPid2} = emqx_mock_client:open_session(ConnPid2, ClientId2, internal), + Message1 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello1">>), + Message2 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello2">>), + emqx_session:subscribe(SPid1, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]), + emqx_session:subscribe(SPid2, [{<<"foo/#">>, #{qos => 0, share => <<"group1">>}}]), + %% wait for the subscription to show up + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [{SPid1}] andalso + ets:lookup(emqx_alive_shared_subscribers, SPid2) =:= [{SPid2}], 1000), + emqx_session:publish(SPid1, 1, Message1), + ?wait(case emqx_mock_client:try_get_last_message() of + {publish, _, #message{payload = <<"hello1">>}} -> true; + Other -> Other + end, 1000), + emqx_session:publish(SPid1, 2, Message2), + ?wait(case emqx_mock_client:try_get_last_message() of + {publish, _, #message{payload = <<"hello2">>}} -> true; + Other -> Other + end, 1000), + emqx_mock_client:close_session(ConnPid2, SPid2), + ?wait(ets:tab2list(emqx_alive_shared_subscribers) =:= [], 1000), + ok. + +test_two_messages(Strategy) -> + application:set_env(?APPLICATION, shared_subscription_strategy, Strategy), + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + {ok, ConnPid1} = emqx_mock_client:start_link(ClientId1), + {ok, ConnPid2} = emqx_mock_client:start_link(ClientId2), + {ok, SPid1} = emqx_mock_client:open_session(ConnPid1, ClientId1, internal), + {ok, SPid2} = emqx_mock_client:open_session(ConnPid2, ClientId2, internal), + Message1 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello1">>), + Message2 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello2">>), + emqx_session:subscribe(SPid1, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]), + emqx_session:subscribe(SPid2, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]), + %% wait for the subscription to show up + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [{SPid1}] andalso + ets:lookup(emqx_alive_shared_subscribers, SPid2) =:= [{SPid2}], 1000), + emqx_session:publish(SPid1, 1, Message1), + Me = self(), + WaitF = fun(ExpectedPayload) -> + case last_message(ExpectedPayload, [ConnPid1, ConnPid2]) of + {true, Pid} -> + Me ! {subscriber, Pid}, + true; + Other -> + Other + end + end, + ?wait(WaitF(<<"hello1">>), 2000), + UsedSubPid1 = receive {subscriber, P1} -> P1 end, + %% publish both messages with SPid1 + emqx_session:publish(SPid1, 2, Message2), + ?wait(WaitF(<<"hello2">>), 2000), + UsedSubPid2 = receive {subscriber, P2} -> P2 end, + case Strategy of + sticky -> ?assert(UsedSubPid1 =:= UsedSubPid2); + round_robin -> ?assert(UsedSubPid1 =/= UsedSubPid2); + hash -> ?assert(UsedSubPid1 =:= UsedSubPid2); + _ -> ok + end, +>>>>>>> 38d0d409... Add 'hash' option for shared subscription + emqx_mock_client:close_session(ConnPid1, SPid1), + emqx_mock_client:close_session(ConnPid2, SPid2), + ok. + +%%------------------------------------------------------------------------------ +%% help functions +%%------------------------------------------------------------------------------ + +wait_for(Fn, Ln, F, Timeout) -> + {Pid, Mref} = erlang:spawn_monitor(fun() -> wait_loop(F, catch_call(F)) end), + wait_for_down(Fn, Ln, Timeout, Pid, Mref, false). + +wait_for_down(Fn, Ln, Timeout, Pid, Mref, Kill) -> + receive + {'DOWN', Mref, process, Pid, normal} -> + ok; + {'DOWN', Mref, process, Pid, {C, E, S}} -> + erlang:raise(C, {Fn, Ln, E}, S) + after + Timeout -> + case Kill of + true -> + erlang:demonitor(Mref, [flush]), + erlang:exit(Pid, kill), + erlang:error({Fn, Ln, timeout}); + false -> + Pid ! stop, + wait_for_down(Fn, Ln, Timeout, Pid, Mref, true) + end + end. + +wait_loop(_F, true) -> exit(normal); +wait_loop(F, LastRes) -> + Res = catch_call(F), + receive + stop -> erlang:exit(LastRes) + after + 100 -> wait_loop(F, Res) + end. + +catch_call(F) -> + try + case F() of + true -> true; + Other -> erlang:error({unexpected, Other}) + end + catch + C : E : S -> + {C, E, S} + end. +