From b35d37c92d03ed127c21dd17b5f440a9ee134843 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Thu, 13 Sep 2018 18:20:41 +0200 Subject: [PATCH 1/2] Add new shared subscription dispatch strategy 'random' was already there before this change Added two new strategies: 'sticky' and 'round_robin' 'sticky' is made default as it is the cheapest --- Makefile | 16 +-- priv/emqx.schema | 11 +- src/emqx_shared_sub.erl | 66 +++++++++--- test/emqx_mock_client.erl | 9 +- test/emqx_shared_sub_SUITE.erl | 186 +++++++++++++++++++++++++++++++++ 5 files changed, 264 insertions(+), 24 deletions(-) create mode 100644 test/emqx_shared_sub_SUITE.erl diff --git a/Makefile b/Makefile index 54b13727a..bb866a898 100644 --- a/Makefile +++ b/Makefile @@ -39,7 +39,7 @@ CT_SUITES = emqx emqx_zone emqx_banned emqx_connection emqx_session emqx_access emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \ emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm \ - emqx_mountpoint emqx_listeners emqx_protocol emqx_pool + emqx_mountpoint emqx_listeners emqx_protocol emqx_pool emqx_shared_sub CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) @@ -84,23 +84,23 @@ rebar-cover: coveralls: @rebar3 coveralls send -cuttlefish: deps - @mv ./deps/cuttlefish/cuttlefish ./cuttlefish -rebar-cuttlefish: rebar-deps - @make -C _build/default/lib/cuttlefish - @mv _build/default/lib/cuttlefish/cuttlefish ./cuttlefish +cuttlefish: rebar-deps + @if [ ! -f cuttlefish ]; then \ + make -C _build/default/lib/cuttlefish; \ + mv _build/default/lib/cuttlefish/cuttlefish ./cuttlefish; \ + fi rebar-deps: @rebar3 get-deps -rebar-eunit: rebar-cuttlefish +rebar-eunit: cuttlefish @rebar3 eunit rebar-compile: @rebar3 compile -rebar-ct: rebar-cuttlefish app.config +rebar-ct: cuttlefish app.config @rebar3 as test compile @ln -s -f '../../../../etc' _build/test/lib/emqx/ @ln -s -f '../../../../data' _build/test/lib/emqx/ diff --git a/priv/emqx.schema b/priv/emqx.schema index c5afd9b43..d0a54c6e0 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1726,9 +1726,16 @@ end}. {datatype, {enum, [local,one,quorum,all]}} ]}. +%% @doc Shared Subscription Dispatch Strategy. {mapping, "broker.shared_subscription_strategy", "emqx.shared_subscription_strategy", [ - {default, random}, - {datatype, {enum, [random, round_robbin, hash]}} + {default, round_robbin}, + {datatype, + {enum, + [random, %% randomly pick a subscriber + round_robbin, %% round robin alive subscribers one message after another + sticky, %% pick a random subscriber and stick to it + hash %% hash client ID to a group member + ]}} ]}. {mapping, "broker.route_batch_clean", "emqx.route_batch_clean", [ diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 0cbfab60a..096af9243 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -26,7 +26,6 @@ -export([start_link/0]). --export([strategy/0]). -export([subscribe/3, unsubscribe/3]). -export([dispatch/3]). @@ -36,6 +35,7 @@ -define(SERVER, ?MODULE). -define(TAB, emqx_shared_subscription). +-define(ALIVE_SUBS, emqx_alive_shared_subscribers). -record(state, {pmon}). -record(emqx_shared_subscription, {group, topic, subpid}). @@ -62,9 +62,9 @@ mnesia(copy) -> start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). --spec(strategy() -> round_robin | random | hash). +-spec(strategy() -> random | round_robin | stiky | hash). strategy() -> - emqx_config:get_env(shared_subscription_strategy, random). + emqx_config:get_env(shared_subscription_strategy, round_robin). subscribe(undefined, _Topic, _SubPid) -> ok; @@ -80,23 +80,56 @@ unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) -> record(Group, Topic, SubPid) -> #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. -%% TODO: dispatch strategy, ensure the delivery... dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}) -> - case pick(subscribers(Group, Topic)) of + #message{from = ClientId} = Msg, + case pick(strategy(), ClientId, Group, Topic) of false -> Delivery; SubPid -> SubPid ! {dispatch, Topic, Msg}, Delivery#delivery{results = [{dispatch, {Group, Topic}, 1} | Results]} end. -pick([]) -> - false; -pick([SubPid]) -> - SubPid; -pick(SubPids) -> - lists:nth(rand:uniform(length(SubPids)), SubPids). +pick(sticky, ClientId, Group, Topic) -> + Sub0 = erlang:get(shared_sub_sticky), + case is_sub_alive(Sub0) of + true -> + %% the old subscriber is still alive + %% keep using it for sticky strategy + Sub0; + false -> + %% randomly pick one for the first message + Sub = do_pick(random, ClientId, Group, Topic), + %% stick to whatever pick result + erlang:put(shared_sub_sticky, Sub), + Sub + end; +pick(Strategy, ClientId, Group, Topic) -> + do_pick(Strategy, ClientId, Group, Topic). + +do_pick(Strategy, ClientId, Group, Topic) -> + All = subscribers(Group, Topic), + pick_subscriber(Strategy, ClientId, All). + +pick_subscriber(_, _ClientId, []) -> false; +pick_subscriber(_, _ClientId, [Sub]) -> Sub; +pick_subscriber(Strategy, ClientId, Subs) -> + Nth = do_pick_subscriber(Strategy, ClientId, length(Subs)), + lists:nth(Nth, Subs). + +do_pick_subscriber(random, _ClientId, Count) -> + rand:uniform(Count); +do_pick_subscriber(hash, ClientId, Count) -> + 1 + erlang:phash2(ClientId) rem Count; +do_pick_subscriber(round_robin, _ClientId, Count) -> + Rem = case erlang:get(shared_sub_round_robin) of + undefined -> 0; + N -> (N + 1) rem Count + end, + _ = erlang:put(shared_sub_round_robin, Rem), + Rem + 1. subscribers(Group, Topic) -> ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]). + %%----------------------------------------------------------------------------- %% gen_server callbacks %%----------------------------------------------------------------------------- @@ -104,6 +137,7 @@ subscribers(Group, Topic) -> init([]) -> {atomic, PMon} = mnesia:transaction(fun init_monitors/0), mnesia:subscribe({table, ?TAB, simple}), + ets:new(?ALIVE_SUBS, [named_table, {read_concurrency, true}, protected]), {ok, update_stats(#state{pmon = PMon})}. init_monitors() -> @@ -117,8 +151,9 @@ handle_call(Req, _From, State) -> {reply, ignored, State}. handle_cast({monitor, SubPid}, State= #state{pmon = PMon}) -> - {noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; - + NewPmon = emqx_pmon:monitor(SubPid, PMon), + ets:insert(?ALIVE_SUBS, {SubPid}), + {noreply, update_stats(State#state{pmon = NewPmon})}; handle_cast(Msg, State) -> emqx_logger:error("[SharedSub] unexpected cast: ~p", [Msg]), {noreply, State}. @@ -154,6 +189,7 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- cleanup_down(SubPid) -> + ets:delete(?ALIVE_SUBS, SubPid), lists:foreach( fun(Record) -> mnesia:dirty_delete_object(?TAB, Record) @@ -162,3 +198,7 @@ cleanup_down(SubPid) -> update_stats(State) -> emqx_stats:setstat('subscriptions/shared/count', 'subscriptions/shared/max', ets:info(?TAB, size)), State. +%% erlang:is_process_alive/1 is expensive +%% and does not work with remote pids +is_sub_alive(Sub) -> [] =/= ets:lookup(?ALIVE_SUBS, Sub). + diff --git a/test/emqx_mock_client.erl b/test/emqx_mock_client.erl index 633e42b3f..ad1e2d22d 100644 --- a/test/emqx_mock_client.erl +++ b/test/emqx_mock_client.erl @@ -16,7 +16,8 @@ -behaviour(gen_server). --export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/0]). +-export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/0, + try_get_last_message/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -41,6 +42,12 @@ get_last_message() -> [{last_message, Msg}] = ets:lookup(?TAB, last_message), Msg. +try_get_last_message() -> + case ets:lookup(?TAB, last_message) of + [{last_message, Msg}] -> Msg; + [] -> false + end. + init([ClientId]) -> Result = lists:member(?TAB, ets:all()), if Result == false -> diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl new file mode 100644 index 000000000..b44a00680 --- /dev/null +++ b/test/emqx_shared_sub_SUITE.erl @@ -0,0 +1,186 @@ + +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_shared_sub_SUITE). + +-export([all/0, init_per_suite/1, end_per_suite/1]). +-export([t_random_basic/1, t_random/1, t_round_robin/1, t_sticky/1, t_hash/1, t_not_so_sticky/1]). + +-include("emqx.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(wait(For, Timeout), wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)). + +all() -> [t_random_basic, t_random, t_round_robin, t_sticky, t_hash, t_not_so_sticky]. + +init_per_suite(Config) -> + emqx_ct_broker_helpers:run_setup_steps(), + Config. + +end_per_suite(_Config) -> + emqx_ct_broker_helpers:run_teardown_steps(). + +t_random(_) -> + application:set_env(?APPLICATION, shared_subscription_strategy, random), + ClientId = <<"ClientId">>, + {ok, ConnPid} = emqx_mock_client:start_link(ClientId), + {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal), + Message1 = emqx_message:make(<<"ClientId">>, 2, <<"foo">>, <<"hello">>), + emqx_session:subscribe(SPid, [{<<"foo">>, #{qos => 2, share => <<"group1">>}}]), + %% wait for the subscription to show up + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid) =:= [{SPid}], 1000), + emqx_session:publish(SPid, 1, Message1), + ?wait(case emqx_mock_client:try_get_last_message() of + {publish, 1, _} -> true; + Other -> Other + end, 1000), + emqx_session:puback(SPid, 2), + emqx_session:puback(SPid, 3, reasoncode), + emqx_session:pubrec(SPid, 4), + emqx_session:pubrec(SPid, 5, reasoncode), + emqx_session:pubrel(SPid, 6, reasoncode), + emqx_session:pubcomp(SPid, 7, reasoncode), + emqx_mock_client:close_session(ConnPid, SPid), + ok. + +t_round_robin(_) -> + test_two_messages(round_robin). + +t_sticky(_) -> + test_two_messages(sticky). + +t_hash(_) -> + test_two_messages(hash). + +%% if the original subscriber dies, change to another one alive +t_not_so_sticky(_) -> + application:set_env(?APPLICATION, shared_subscription_strategy, sticky), + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + {ok, ConnPid1} = emqx_mock_client:start_link(ClientId1), + {ok, ConnPid2} = emqx_mock_client:start_link(ClientId2), + {ok, SPid1} = emqx_mock_client:open_session(ConnPid1, ClientId1, internal), + {ok, SPid2} = emqx_mock_client:open_session(ConnPid2, ClientId2, internal), + Message1 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello1">>), + Message2 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello2">>), + emqx_session:subscribe(SPid1, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]), + emqx_session:subscribe(SPid2, [{<<"foo/#">>, #{qos => 0, share => <<"group1">>}}]), + %% wait for the subscription to show up + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [{SPid1}] andalso + ets:lookup(emqx_alive_shared_subscribers, SPid2) =:= [{SPid2}], 1000), + emqx_session:publish(SPid1, 1, Message1), + ?wait(case emqx_mock_client:try_get_last_message() of + {publish, _, #message{payload = <<"hello1">>}} -> true; + Other -> Other + end, 1000), + emqx_session:publish(SPid1, 2, Message2), + ?wait(case emqx_mock_client:try_get_last_message() of + {publish, _, #message{payload = <<"hello2">>}} -> true; + Other -> Other + end, 1000), + emqx_mock_client:close_session(ConnPid2, SPid2), + ?wait(ets:tab2list(emqx_alive_shared_subscribers) =:= [], 1000), + ok. + +test_two_messages(Strategy) -> + application:set_env(?APPLICATION, shared_subscription_strategy, Strategy), + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + {ok, ConnPid1} = emqx_mock_client:start_link(ClientId1), + {ok, ConnPid2} = emqx_mock_client:start_link(ClientId2), + {ok, SPid1} = emqx_mock_client:open_session(ConnPid1, ClientId1, internal), + {ok, SPid2} = emqx_mock_client:open_session(ConnPid2, ClientId2, internal), + Message1 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello1">>), + Message2 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello2">>), + emqx_session:subscribe(SPid1, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]), + emqx_session:subscribe(SPid2, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]), + %% wait for the subscription to show up + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [{SPid1}] andalso + ets:lookup(emqx_alive_shared_subscribers, SPid2) =:= [{SPid2}], 1000), + emqx_session:publish(SPid1, 1, Message1), + Me = self(), + WaitF = fun(ExpectedPayload) -> + case last_message(ExpectedPayload, [ConnPid1, ConnPid2]) of + {true, Pid} -> + Me ! {subscriber, Pid}, + true; + Other -> + Other + end + end, + ?wait(WaitF(<<"hello1">>), 2000), + UsedSubPid1 = receive {subscriber, P1} -> P1 end, + %% publish both messages with SPid1 + emqx_session:publish(SPid1, 2, Message2), + ?wait(WaitF(<<"hello2">>), 2000), + UsedSubPid2 = receive {subscriber, P2} -> P2 end, + case Strategy of + sticky -> ?assert(UsedSubPid1 =:= UsedSubPid2); + round_robin -> ?assert(UsedSubPid1 =/= UsedSubPid2); + hash -> ?assert(UsedSubPid1 =:= UsedSubPid2); + _ -> ok + end, +>>>>>>> 38d0d409... Add 'hash' option for shared subscription + emqx_mock_client:close_session(ConnPid1, SPid1), + emqx_mock_client:close_session(ConnPid2, SPid2), + ok. + +%%------------------------------------------------------------------------------ +%% help functions +%%------------------------------------------------------------------------------ + +wait_for(Fn, Ln, F, Timeout) -> + {Pid, Mref} = erlang:spawn_monitor(fun() -> wait_loop(F, catch_call(F)) end), + wait_for_down(Fn, Ln, Timeout, Pid, Mref, false). + +wait_for_down(Fn, Ln, Timeout, Pid, Mref, Kill) -> + receive + {'DOWN', Mref, process, Pid, normal} -> + ok; + {'DOWN', Mref, process, Pid, {C, E, S}} -> + erlang:raise(C, {Fn, Ln, E}, S) + after + Timeout -> + case Kill of + true -> + erlang:demonitor(Mref, [flush]), + erlang:exit(Pid, kill), + erlang:error({Fn, Ln, timeout}); + false -> + Pid ! stop, + wait_for_down(Fn, Ln, Timeout, Pid, Mref, true) + end + end. + +wait_loop(_F, true) -> exit(normal); +wait_loop(F, LastRes) -> + Res = catch_call(F), + receive + stop -> erlang:exit(LastRes) + after + 100 -> wait_loop(F, Res) + end. + +catch_call(F) -> + try + case F() of + true -> true; + Other -> erlang:error({unexpected, Other}) + end + catch + C : E : S -> + {C, E, S} + end. + From 3b9247994d018e4c665b74a44b09152fa02ddfe2 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Sat, 15 Sep 2018 15:02:16 +0200 Subject: [PATCH 2/2] Refine emqx_mock_client Before this change, eqmx_mock_client uses a shared ets table to store last received message, this causes troulbe when we want to start / stop two or more clients in one test case the ets table gets owned by the first spanwed client and gets closed when the owner client dies. Now it keeps the last received message in process state and a gen_server call is added to retrieve it for verification Along with this change in emqx_mock_client, it made possible to write test case to verify the actual subscriber pid used in shared subscription strategy, so test cases were added (and modified) to verify different strategies --- test/emqx_mock_client.erl | 50 ++++++++++++---------------------- test/emqx_session_SUITE.erl | 2 +- test/emqx_shared_sub_SUITE.erl | 30 ++++++++++++++------ 3 files changed, 39 insertions(+), 43 deletions(-) diff --git a/test/emqx_mock_client.erl b/test/emqx_mock_client.erl index ad1e2d22d..4528239e6 100644 --- a/test/emqx_mock_client.erl +++ b/test/emqx_mock_client.erl @@ -16,15 +16,12 @@ -behaviour(gen_server). --export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/0, - try_get_last_message/0]). +-export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {clean_start, client_id, client_pid}). - --define(TAB, messages). +-record(state, {clean_start, client_id, client_pid, last_msg}). start_link(ClientId) -> gen_server:start_link(?MODULE, [ClientId], []). @@ -38,25 +35,14 @@ close_session(ClientPid, SessPid) -> stop(CPid) -> gen_server:call(CPid, stop). -get_last_message() -> - [{last_message, Msg}] = ets:lookup(?TAB, last_message), - Msg. - -try_get_last_message() -> - case ets:lookup(?TAB, last_message) of - [{last_message, Msg}] -> Msg; - [] -> false - end. +get_last_message(Pid) -> + gen_server:call(Pid, get_last_message). init([ClientId]) -> - Result = lists:member(?TAB, ets:all()), - if Result == false -> - ets:new(?TAB, [set, named_table, public]); - true -> ok - end, - {ok, - #state{clean_start = true, - client_id = ClientId} + {ok, #state{clean_start = true, + client_id = ClientId, + last_msg = undefined + } }. handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) -> @@ -68,28 +54,26 @@ handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) -> expiry_interval => 0 }, {ok, SessPid} = emqx_sm:open_session(Attrs), - {reply, {ok, SessPid}, State#state{ - clean_start = true, - client_id = ClientId, - client_pid = ClientPid - }}; - + {reply, {ok, SessPid}, + State#state{clean_start = true, + client_id = ClientId, + client_pid = ClientPid + }}; handle_call({stop_session, SessPid}, _From, State) -> emqx_sm:close_session(SessPid), {stop, normal, ok, State}; - +handle_call(get_last_message, _From, #state{last_msg = Msg} = State) -> + {reply, Msg, State}; handle_call(stop, _From, State) -> {stop, normal, ok, State}; - handle_call(_Request, _From, State) -> {reply, ok, State}. handle_cast(_Msg, State) -> {noreply, State}. -handle_info({_, Msg}, State) -> - ets:insert(?TAB, {last_message, Msg}), - {noreply, State}; +handle_info({deliver, Msg}, State) -> + {noreply, State#state{last_msg = Msg}}; handle_info(_Info, State) -> {noreply, State}. diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index 2b60b747d..f79b84557 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -40,7 +40,7 @@ t_session_all(_) -> [{<<"topic">>, _}] = emqx:subscriptions({SPid, <<"ClientId">>}), emqx_session:publish(SPid, 1, Message1), timer:sleep(200), - {publish, 1, _} = emqx_mock_client:get_last_message(), + {publish, 1, _} = emqx_mock_client:get_last_message(ConnPid), emqx_session:puback(SPid, 2), emqx_session:puback(SPid, 3, reasoncode), emqx_session:pubrec(SPid, 4), diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index b44a00680..8eb309001 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -19,6 +19,7 @@ -export([t_random_basic/1, t_random/1, t_round_robin/1, t_sticky/1, t_hash/1, t_not_so_sticky/1]). -include("emqx.hrl"). +-include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -define(wait(For, Timeout), wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)). @@ -32,7 +33,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_broker_helpers:run_teardown_steps(). -t_random(_) -> +t_random_basic(_) -> application:set_env(?APPLICATION, shared_subscription_strategy, random), ClientId = <<"ClientId">>, {ok, ConnPid} = emqx_mock_client:start_link(ClientId), @@ -42,7 +43,7 @@ t_random(_) -> %% wait for the subscription to show up ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid) =:= [{SPid}], 1000), emqx_session:publish(SPid, 1, Message1), - ?wait(case emqx_mock_client:try_get_last_message() of + ?wait(case emqx_mock_client:get_last_message(ConnPid) of {publish, 1, _} -> true; Other -> Other end, 1000), @@ -55,6 +56,9 @@ t_random(_) -> emqx_mock_client:close_session(ConnPid, SPid), ok. +t_random(_) -> + test_two_messages(random). + t_round_robin(_) -> test_two_messages(round_robin). @@ -76,17 +80,19 @@ t_not_so_sticky(_) -> Message1 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello1">>), Message2 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello2">>), emqx_session:subscribe(SPid1, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]), - emqx_session:subscribe(SPid2, [{<<"foo/#">>, #{qos => 0, share => <<"group1">>}}]), %% wait for the subscription to show up - ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [{SPid1}] andalso - ets:lookup(emqx_alive_shared_subscribers, SPid2) =:= [{SPid2}], 1000), + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [{SPid1}], 1000), emqx_session:publish(SPid1, 1, Message1), - ?wait(case emqx_mock_client:try_get_last_message() of + ?wait(case emqx_mock_client:get_last_message(ConnPid1) of {publish, _, #message{payload = <<"hello1">>}} -> true; Other -> Other end, 1000), - emqx_session:publish(SPid1, 2, Message2), - ?wait(case emqx_mock_client:try_get_last_message() of + emqx_mock_client:close_session(ConnPid1, SPid1), + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [], 1000), + emqx_session:subscribe(SPid2, [{<<"foo/#">>, #{qos => 0, share => <<"group1">>}}]), + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid2) =:= [{SPid2}], 1000), + emqx_session:publish(SPid2, 2, Message2), + ?wait(case emqx_mock_client:get_last_message(ConnPid2) of {publish, _, #message{payload = <<"hello2">>}} -> true; Other -> Other end, 1000), @@ -132,11 +138,17 @@ test_two_messages(Strategy) -> hash -> ?assert(UsedSubPid1 =:= UsedSubPid2); _ -> ok end, ->>>>>>> 38d0d409... Add 'hash' option for shared subscription emqx_mock_client:close_session(ConnPid1, SPid1), emqx_mock_client:close_session(ConnPid2, SPid2), ok. +last_message(_ExpectedPayload, []) -> <<"not yet?">>; +last_message(ExpectedPayload, [Pid | Pids]) -> + case emqx_mock_client:get_last_message(Pid) of + {publish, _, #message{payload = ExpectedPayload}} -> {true, Pid}; + _Other -> last_message(ExpectedPayload, Pids) + end. + %%------------------------------------------------------------------------------ %% help functions %%------------------------------------------------------------------------------