diff --git a/Makefile b/Makefile index f482050c3..c5033df7b 100644 --- a/Makefile +++ b/Makefile @@ -39,7 +39,7 @@ CT_SUITES = emqx emqx_zone emqx_banned emqx_connection emqx_session emqx_access emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \ emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm \ - emqx_mountpoint emqx_listeners emqx_protocol emqx_pool + emqx_mountpoint emqx_listeners emqx_protocol emqx_pool emqx_shared_sub CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) @@ -84,23 +84,23 @@ rebar-cover: coveralls: @rebar3 coveralls send -cuttlefish: deps - @mv ./deps/cuttlefish/cuttlefish ./cuttlefish -rebar-cuttlefish: rebar-deps - @make -C _build/default/lib/cuttlefish - @mv _build/default/lib/cuttlefish/cuttlefish ./cuttlefish +cuttlefish: rebar-deps + @if [ ! -f cuttlefish ]; then \ + make -C _build/default/lib/cuttlefish; \ + mv _build/default/lib/cuttlefish/cuttlefish ./cuttlefish; \ + fi rebar-deps: @rebar3 get-deps -rebar-eunit: rebar-cuttlefish +rebar-eunit: cuttlefish @rebar3 eunit rebar-compile: @rebar3 compile -rebar-ct: rebar-cuttlefish app.config +rebar-ct: cuttlefish app.config @rebar3 as test compile @ln -s -f '../../../../etc' _build/test/lib/emqx/ @ln -s -f '../../../../data' _build/test/lib/emqx/ diff --git a/priv/emqx.schema b/priv/emqx.schema index 2aa0a35a1..3d9328bf2 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1734,9 +1734,16 @@ end}. {datatype, {enum, [local,one,quorum,all]}} ]}. +%% @doc Shared Subscription Dispatch Strategy. {mapping, "broker.shared_subscription_strategy", "emqx.shared_subscription_strategy", [ - {default, random}, - {datatype, {enum, [random, round_robbin, hash]}} + {default, round_robbin}, + {datatype, + {enum, + [random, %% randomly pick a subscriber + round_robbin, %% round robin alive subscribers one message after another + sticky, %% pick a random subscriber and stick to it + hash %% hash client ID to a group member + ]}} ]}. {mapping, "broker.route_batch_clean", "emqx.route_batch_clean", [ diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 0cbfab60a..096af9243 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -26,7 +26,6 @@ -export([start_link/0]). --export([strategy/0]). -export([subscribe/3, unsubscribe/3]). -export([dispatch/3]). @@ -36,6 +35,7 @@ -define(SERVER, ?MODULE). -define(TAB, emqx_shared_subscription). +-define(ALIVE_SUBS, emqx_alive_shared_subscribers). -record(state, {pmon}). -record(emqx_shared_subscription, {group, topic, subpid}). @@ -62,9 +62,9 @@ mnesia(copy) -> start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). --spec(strategy() -> round_robin | random | hash). +-spec(strategy() -> random | round_robin | stiky | hash). strategy() -> - emqx_config:get_env(shared_subscription_strategy, random). + emqx_config:get_env(shared_subscription_strategy, round_robin). subscribe(undefined, _Topic, _SubPid) -> ok; @@ -80,23 +80,56 @@ unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) -> record(Group, Topic, SubPid) -> #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. -%% TODO: dispatch strategy, ensure the delivery... dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}) -> - case pick(subscribers(Group, Topic)) of + #message{from = ClientId} = Msg, + case pick(strategy(), ClientId, Group, Topic) of false -> Delivery; SubPid -> SubPid ! {dispatch, Topic, Msg}, Delivery#delivery{results = [{dispatch, {Group, Topic}, 1} | Results]} end. -pick([]) -> - false; -pick([SubPid]) -> - SubPid; -pick(SubPids) -> - lists:nth(rand:uniform(length(SubPids)), SubPids). +pick(sticky, ClientId, Group, Topic) -> + Sub0 = erlang:get(shared_sub_sticky), + case is_sub_alive(Sub0) of + true -> + %% the old subscriber is still alive + %% keep using it for sticky strategy + Sub0; + false -> + %% randomly pick one for the first message + Sub = do_pick(random, ClientId, Group, Topic), + %% stick to whatever pick result + erlang:put(shared_sub_sticky, Sub), + Sub + end; +pick(Strategy, ClientId, Group, Topic) -> + do_pick(Strategy, ClientId, Group, Topic). + +do_pick(Strategy, ClientId, Group, Topic) -> + All = subscribers(Group, Topic), + pick_subscriber(Strategy, ClientId, All). + +pick_subscriber(_, _ClientId, []) -> false; +pick_subscriber(_, _ClientId, [Sub]) -> Sub; +pick_subscriber(Strategy, ClientId, Subs) -> + Nth = do_pick_subscriber(Strategy, ClientId, length(Subs)), + lists:nth(Nth, Subs). + +do_pick_subscriber(random, _ClientId, Count) -> + rand:uniform(Count); +do_pick_subscriber(hash, ClientId, Count) -> + 1 + erlang:phash2(ClientId) rem Count; +do_pick_subscriber(round_robin, _ClientId, Count) -> + Rem = case erlang:get(shared_sub_round_robin) of + undefined -> 0; + N -> (N + 1) rem Count + end, + _ = erlang:put(shared_sub_round_robin, Rem), + Rem + 1. subscribers(Group, Topic) -> ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]). + %%----------------------------------------------------------------------------- %% gen_server callbacks %%----------------------------------------------------------------------------- @@ -104,6 +137,7 @@ subscribers(Group, Topic) -> init([]) -> {atomic, PMon} = mnesia:transaction(fun init_monitors/0), mnesia:subscribe({table, ?TAB, simple}), + ets:new(?ALIVE_SUBS, [named_table, {read_concurrency, true}, protected]), {ok, update_stats(#state{pmon = PMon})}. init_monitors() -> @@ -117,8 +151,9 @@ handle_call(Req, _From, State) -> {reply, ignored, State}. handle_cast({monitor, SubPid}, State= #state{pmon = PMon}) -> - {noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; - + NewPmon = emqx_pmon:monitor(SubPid, PMon), + ets:insert(?ALIVE_SUBS, {SubPid}), + {noreply, update_stats(State#state{pmon = NewPmon})}; handle_cast(Msg, State) -> emqx_logger:error("[SharedSub] unexpected cast: ~p", [Msg]), {noreply, State}. @@ -154,6 +189,7 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- cleanup_down(SubPid) -> + ets:delete(?ALIVE_SUBS, SubPid), lists:foreach( fun(Record) -> mnesia:dirty_delete_object(?TAB, Record) @@ -162,3 +198,7 @@ cleanup_down(SubPid) -> update_stats(State) -> emqx_stats:setstat('subscriptions/shared/count', 'subscriptions/shared/max', ets:info(?TAB, size)), State. +%% erlang:is_process_alive/1 is expensive +%% and does not work with remote pids +is_sub_alive(Sub) -> [] =/= ets:lookup(?ALIVE_SUBS, Sub). + diff --git a/test/emqx_mock_client.erl b/test/emqx_mock_client.erl index 633e42b3f..4528239e6 100644 --- a/test/emqx_mock_client.erl +++ b/test/emqx_mock_client.erl @@ -16,14 +16,12 @@ -behaviour(gen_server). --export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/0]). +-export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {clean_start, client_id, client_pid}). - --define(TAB, messages). +-record(state, {clean_start, client_id, client_pid, last_msg}). start_link(ClientId) -> gen_server:start_link(?MODULE, [ClientId], []). @@ -37,19 +35,14 @@ close_session(ClientPid, SessPid) -> stop(CPid) -> gen_server:call(CPid, stop). -get_last_message() -> - [{last_message, Msg}] = ets:lookup(?TAB, last_message), - Msg. +get_last_message(Pid) -> + gen_server:call(Pid, get_last_message). init([ClientId]) -> - Result = lists:member(?TAB, ets:all()), - if Result == false -> - ets:new(?TAB, [set, named_table, public]); - true -> ok - end, - {ok, - #state{clean_start = true, - client_id = ClientId} + {ok, #state{clean_start = true, + client_id = ClientId, + last_msg = undefined + } }. handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) -> @@ -61,28 +54,26 @@ handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) -> expiry_interval => 0 }, {ok, SessPid} = emqx_sm:open_session(Attrs), - {reply, {ok, SessPid}, State#state{ - clean_start = true, - client_id = ClientId, - client_pid = ClientPid - }}; - + {reply, {ok, SessPid}, + State#state{clean_start = true, + client_id = ClientId, + client_pid = ClientPid + }}; handle_call({stop_session, SessPid}, _From, State) -> emqx_sm:close_session(SessPid), {stop, normal, ok, State}; - +handle_call(get_last_message, _From, #state{last_msg = Msg} = State) -> + {reply, Msg, State}; handle_call(stop, _From, State) -> {stop, normal, ok, State}; - handle_call(_Request, _From, State) -> {reply, ok, State}. handle_cast(_Msg, State) -> {noreply, State}. -handle_info({_, Msg}, State) -> - ets:insert(?TAB, {last_message, Msg}), - {noreply, State}; +handle_info({deliver, Msg}, State) -> + {noreply, State#state{last_msg = Msg}}; handle_info(_Info, State) -> {noreply, State}. diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index 2b60b747d..f79b84557 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -40,7 +40,7 @@ t_session_all(_) -> [{<<"topic">>, _}] = emqx:subscriptions({SPid, <<"ClientId">>}), emqx_session:publish(SPid, 1, Message1), timer:sleep(200), - {publish, 1, _} = emqx_mock_client:get_last_message(), + {publish, 1, _} = emqx_mock_client:get_last_message(ConnPid), emqx_session:puback(SPid, 2), emqx_session:puback(SPid, 3, reasoncode), emqx_session:pubrec(SPid, 4), diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl new file mode 100644 index 000000000..8eb309001 --- /dev/null +++ b/test/emqx_shared_sub_SUITE.erl @@ -0,0 +1,198 @@ + +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_shared_sub_SUITE). + +-export([all/0, init_per_suite/1, end_per_suite/1]). +-export([t_random_basic/1, t_random/1, t_round_robin/1, t_sticky/1, t_hash/1, t_not_so_sticky/1]). + +-include("emqx.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(wait(For, Timeout), wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)). + +all() -> [t_random_basic, t_random, t_round_robin, t_sticky, t_hash, t_not_so_sticky]. + +init_per_suite(Config) -> + emqx_ct_broker_helpers:run_setup_steps(), + Config. + +end_per_suite(_Config) -> + emqx_ct_broker_helpers:run_teardown_steps(). + +t_random_basic(_) -> + application:set_env(?APPLICATION, shared_subscription_strategy, random), + ClientId = <<"ClientId">>, + {ok, ConnPid} = emqx_mock_client:start_link(ClientId), + {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal), + Message1 = emqx_message:make(<<"ClientId">>, 2, <<"foo">>, <<"hello">>), + emqx_session:subscribe(SPid, [{<<"foo">>, #{qos => 2, share => <<"group1">>}}]), + %% wait for the subscription to show up + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid) =:= [{SPid}], 1000), + emqx_session:publish(SPid, 1, Message1), + ?wait(case emqx_mock_client:get_last_message(ConnPid) of + {publish, 1, _} -> true; + Other -> Other + end, 1000), + emqx_session:puback(SPid, 2), + emqx_session:puback(SPid, 3, reasoncode), + emqx_session:pubrec(SPid, 4), + emqx_session:pubrec(SPid, 5, reasoncode), + emqx_session:pubrel(SPid, 6, reasoncode), + emqx_session:pubcomp(SPid, 7, reasoncode), + emqx_mock_client:close_session(ConnPid, SPid), + ok. + +t_random(_) -> + test_two_messages(random). + +t_round_robin(_) -> + test_two_messages(round_robin). + +t_sticky(_) -> + test_two_messages(sticky). + +t_hash(_) -> + test_two_messages(hash). + +%% if the original subscriber dies, change to another one alive +t_not_so_sticky(_) -> + application:set_env(?APPLICATION, shared_subscription_strategy, sticky), + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + {ok, ConnPid1} = emqx_mock_client:start_link(ClientId1), + {ok, ConnPid2} = emqx_mock_client:start_link(ClientId2), + {ok, SPid1} = emqx_mock_client:open_session(ConnPid1, ClientId1, internal), + {ok, SPid2} = emqx_mock_client:open_session(ConnPid2, ClientId2, internal), + Message1 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello1">>), + Message2 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello2">>), + emqx_session:subscribe(SPid1, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]), + %% wait for the subscription to show up + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [{SPid1}], 1000), + emqx_session:publish(SPid1, 1, Message1), + ?wait(case emqx_mock_client:get_last_message(ConnPid1) of + {publish, _, #message{payload = <<"hello1">>}} -> true; + Other -> Other + end, 1000), + emqx_mock_client:close_session(ConnPid1, SPid1), + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [], 1000), + emqx_session:subscribe(SPid2, [{<<"foo/#">>, #{qos => 0, share => <<"group1">>}}]), + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid2) =:= [{SPid2}], 1000), + emqx_session:publish(SPid2, 2, Message2), + ?wait(case emqx_mock_client:get_last_message(ConnPid2) of + {publish, _, #message{payload = <<"hello2">>}} -> true; + Other -> Other + end, 1000), + emqx_mock_client:close_session(ConnPid2, SPid2), + ?wait(ets:tab2list(emqx_alive_shared_subscribers) =:= [], 1000), + ok. + +test_two_messages(Strategy) -> + application:set_env(?APPLICATION, shared_subscription_strategy, Strategy), + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + {ok, ConnPid1} = emqx_mock_client:start_link(ClientId1), + {ok, ConnPid2} = emqx_mock_client:start_link(ClientId2), + {ok, SPid1} = emqx_mock_client:open_session(ConnPid1, ClientId1, internal), + {ok, SPid2} = emqx_mock_client:open_session(ConnPid2, ClientId2, internal), + Message1 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello1">>), + Message2 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello2">>), + emqx_session:subscribe(SPid1, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]), + emqx_session:subscribe(SPid2, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]), + %% wait for the subscription to show up + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [{SPid1}] andalso + ets:lookup(emqx_alive_shared_subscribers, SPid2) =:= [{SPid2}], 1000), + emqx_session:publish(SPid1, 1, Message1), + Me = self(), + WaitF = fun(ExpectedPayload) -> + case last_message(ExpectedPayload, [ConnPid1, ConnPid2]) of + {true, Pid} -> + Me ! {subscriber, Pid}, + true; + Other -> + Other + end + end, + ?wait(WaitF(<<"hello1">>), 2000), + UsedSubPid1 = receive {subscriber, P1} -> P1 end, + %% publish both messages with SPid1 + emqx_session:publish(SPid1, 2, Message2), + ?wait(WaitF(<<"hello2">>), 2000), + UsedSubPid2 = receive {subscriber, P2} -> P2 end, + case Strategy of + sticky -> ?assert(UsedSubPid1 =:= UsedSubPid2); + round_robin -> ?assert(UsedSubPid1 =/= UsedSubPid2); + hash -> ?assert(UsedSubPid1 =:= UsedSubPid2); + _ -> ok + end, + emqx_mock_client:close_session(ConnPid1, SPid1), + emqx_mock_client:close_session(ConnPid2, SPid2), + ok. + +last_message(_ExpectedPayload, []) -> <<"not yet?">>; +last_message(ExpectedPayload, [Pid | Pids]) -> + case emqx_mock_client:get_last_message(Pid) of + {publish, _, #message{payload = ExpectedPayload}} -> {true, Pid}; + _Other -> last_message(ExpectedPayload, Pids) + end. + +%%------------------------------------------------------------------------------ +%% help functions +%%------------------------------------------------------------------------------ + +wait_for(Fn, Ln, F, Timeout) -> + {Pid, Mref} = erlang:spawn_monitor(fun() -> wait_loop(F, catch_call(F)) end), + wait_for_down(Fn, Ln, Timeout, Pid, Mref, false). + +wait_for_down(Fn, Ln, Timeout, Pid, Mref, Kill) -> + receive + {'DOWN', Mref, process, Pid, normal} -> + ok; + {'DOWN', Mref, process, Pid, {C, E, S}} -> + erlang:raise(C, {Fn, Ln, E}, S) + after + Timeout -> + case Kill of + true -> + erlang:demonitor(Mref, [flush]), + erlang:exit(Pid, kill), + erlang:error({Fn, Ln, timeout}); + false -> + Pid ! stop, + wait_for_down(Fn, Ln, Timeout, Pid, Mref, true) + end + end. + +wait_loop(_F, true) -> exit(normal); +wait_loop(F, LastRes) -> + Res = catch_call(F), + receive + stop -> erlang:exit(LastRes) + after + 100 -> wait_loop(F, Res) + end. + +catch_call(F) -> + try + case F() of + true -> true; + Other -> erlang:error({unexpected, Other}) + end + catch + C : E : S -> + {C, E, S} + end. +