853 lines
32 KiB
Erlang
853 lines
32 KiB
Erlang
%%--------------------------------------------------------------------
%% Copyright (c) 2018-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_shared_sub_SUITE).
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
|
%% Name of this suite, handed to the test-case enumeration helper.
-define(SUITE, ?MODULE).

%% Terms used by the ack-related unit tests: `?ack' is the message the test
%% process expects after emqx_shared_sub:maybe_ack/1, `?no_ack' is the header
%% value expected on the message returned by that call.
-define(ack, shared_sub_ack).
-define(no_ack, no_ack).

%% ?WAIT(TIMEOUT, PATTERN, Res)
%% Takes the next message from the caller's mailbox.
%%   * If it matches PATTERN, the macro evaluates to Res (Res may use
%%     variables bound by PATTERN).
%%   * Any other message fails the test case, reporting the stringified
%%     pattern and the message that was received instead.
%%   * If nothing arrives within TIMEOUT milliseconds the test case fails
%%     with {timeout, PatternAsString}.
%% The receive is wrapped in a fun so that variables bound by PATTERN do not
%% leak into the calling function.
-define(WAIT(TIMEOUT, PATTERN, Res),
        (fun() ->
                 receive
                     PATTERN ->
                         Res;
                     Unexpected ->
                         ct:fail(#{expected => ??PATTERN,
                                   got => Unexpected
                                  })
                 after TIMEOUT ->
                         ct:fail({timeout, ??PATTERN})
                 end
         end)()).
|
|
|
|
%% Common Test callback: the list of test cases is computed by
%% emqx_ct:all/1 from this suite's module name.
all() ->
    Suite = ?SUITE,
    emqx_ct:all(Suite).
|
|
|
|
%% Suite-level setup.
%% Starts distribution under the long name 'master@127.0.0.1', boots the
%% broker modules, switches gen_rpc to `stateless' port discovery and starts
%% the applications. The previous gen_rpc `port_discovery' lookup result
%% (`undefined' or `{ok, Value}') is stored in Config under `port_discovery'
%% so that end_per_suite/1 can put it back.
init_per_suite(Config) ->
    net_kernel:start(['master@127.0.0.1', longnames]),
    emqx_ct_helpers:boot_modules(all),
    SavedPortDiscovery = application:get_env(gen_rpc, port_discovery),
    application:set_env(gen_rpc, port_discovery, stateless),
    application:ensure_all_started(gen_rpc),
    %% emqx_modules has to be loaded before the apps are started so that
    %% its mnesia tables get created
    ok = load_app(emqx_modules),
    emqx_ct_helpers:start_apps([]),
    [{port_discovery, SavedPortDiscovery}] ++ Config.
|
|
|
|
%% Suite-level teardown.
%% Stops the applications and restores the gen_rpc `port_discovery' setting
%% to what init_per_suite/1 found:
%%   * `{ok, OldValue}' - the old value is written back;
%%   * `undefined'      - the key did not exist before the suite ran, so the
%%                        `stateless' value set by init_per_suite/1 is removed
%%                        again instead of being left behind.
%% Always returns `ok'.
end_per_suite(Config) ->
    emqx_ct_helpers:stop_apps([gen_rpc]),
    case proplists:get_value(port_discovery, Config) of
        {ok, OldValue} ->
            application:set_env(gen_rpc, port_discovery, OldValue);
        undefined ->
            application:unset_env(gen_rpc, port_discovery)
    end.
|
|
|
|
%% Per-test-case setup.
%% A test case may provide its own setup by accepting `{init, Config}' and
%% returning the Config to use. Test cases without such a clause raise
%% `function_clause', in which case the incoming Config is used unchanged.
init_per_testcase(Case, Config) ->
    try ?MODULE:Case({init, Config}) of
        CaseConfig ->
            CaseConfig
    catch
        error:function_clause ->
            Config
    end.
|
|
|
|
%% Per-test-case teardown.
%% Calls the test case with `{'end', Config}' when it has a matching clause;
%% a `function_clause' error (no such clause) is treated as "nothing to
%% clean up" and yields `ok'.
end_per_testcase(Case, Config) ->
    try ?MODULE:Case({'end', Config}) of
        CleanupResult ->
            CleanupResult
    catch
        error:function_clause ->
            ok
    end.
|
|
|
|
%% A message with empty headers must not require a shared-dispatch ack.
t_is_ack_required(Config) when is_list(Config) ->
    PlainMsg = #message{headers = #{}},
    ?assertEqual(false, emqx_shared_sub:is_ack_required(PlainMsg)).
|
|
|
|
%% maybe_nack_dropped/1 answers `store' for a message without the
%% `shared_dispatch_ack' header. With the header present it answers `drop'
%% and the test process (named as sender in the header) must receive a
%% `{for_test, {shared_sub_nack, dropped}}' message within 100 ms.
t_maybe_nack_dropped(Config) when is_list(Config) ->
    PlainMsg = #message{headers = #{}},
    ?assertEqual(store, emqx_shared_sub:maybe_nack_dropped(PlainMsg)),
    AckInfo = {self(), {fresh, <<"group">>, for_test}},
    SharedMsg = #message{headers = #{shared_dispatch_ack => AckInfo}},
    ?assertEqual(drop, emqx_shared_sub:maybe_nack_dropped(SharedMsg)),
    NackSeen = receive
                   {for_test, {shared_sub_nack, dropped}} -> ok
               after 100 ->
                       timeout
               end,
    ?assertEqual(ok, NackSeen).
|
|
|
|
%% nack_no_connection/1 returns `ok' and the sender named in the
%% `shared_dispatch_ack' header (the test process) must receive
%% `{for_test, {shared_sub_nack, no_connection}}' within 100 ms.
t_nack_no_connection(Config) when is_list(Config) ->
    AckInfo = {self(), {fresh, <<"group">>, for_test}},
    SharedMsg = #message{headers = #{shared_dispatch_ack => AckInfo}},
    ?assertEqual(ok, emqx_shared_sub:nack_no_connection(SharedMsg)),
    NackSeen = receive
                   {for_test, {shared_sub_nack, no_connection}} -> ok
               after 100 ->
                       timeout
               end,
    ?assertEqual(ok, NackSeen).
|
|
|
|
%% maybe_ack/1 leaves a message without ack header untouched. For a message
%% carrying `shared_dispatch_ack' it must return the message with the header
%% replaced by `?no_ack' and send `{for_test, ?ack}' to the test process.
t_maybe_ack(Config) when is_list(Config) ->
    Untouched = #message{headers = #{}},
    ?assertEqual(Untouched, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
    AckInfo = {self(), {fresh, <<"group">>, for_test}},
    SharedMsg = #message{headers = #{shared_dispatch_ack => AckInfo}},
    ?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}},
                 emqx_shared_sub:maybe_ack(SharedMsg)),
    AckSeen = receive
                  {for_test, ?ack} -> ok
              after 100 ->
                      timeout
              end,
    ?assertEqual(ok, AckSeen).
|
|
|
|
%% With the `random' strategy, the test process itself joins group
%% "group1" on topic "foo", publishes one QoS 2 message and expects a
%% `{deliver, Topic, Message}' for it within one second.
t_random_basic(Config) when is_list(Config) ->
    ok = ensure_config(random),
    Group = <<"group1">>,
    ClientId = <<"ClientId">>,
    Topic = <<"foo">>,
    Payload = <<"hello">>,
    emqx:subscribe(Topic, #{qos => 2, share => Group}),
    MsgQoS2 = emqx_message:make(ClientId, 2, Topic, Payload),
    %% give the subscription time to become visible
    ct:sleep(200),
    ?assertEqual(true, subscribed(Group, Topic, self())),
    emqx:publish(MsgQoS2),
    receive
        {deliver, GotTopic, #message{from = GotFrom, payload = GotPayload}} = Delivery ->
            ct:pal("==== received: ~p", [Delivery]),
            ?assertEqual(Topic, GotTopic),
            ?assertEqual(ClientId, GotFrom),
            ?assertEqual(Payload, GotPayload)
    after 1000 ->
            ct:fail(waiting_basic_failed)
    end,
    ok.
|
|
|
|
%% Two clients connect and both join "$share/g1/foo/bar" while the 'sticky'
%% dispatch strategy is configured. One QoS 1 message is published and the
%% test checks that a delivery carrying packet id 1 reaches the test process.
%%
%% NOTE(review): the scenario this test was originally described with
%% (closing the connection of the sticky member, expecting the next message
%% to be nack'ed and delivered to the second session, then random delivery
%% once all clients are offline) is not exercised by the code below.
t_no_connection_nack(Config) when is_list(Config) ->
    ok = ensure_config(sticky),
    Publisher = <<"publisher">>,
    Subscriber1 = <<"Subscriber1">>,
    Subscriber2 = <<"Subscriber2">>,
    QoS = 1,
    Group = <<"g1">>,
    Topic = <<"foo/bar">>,
    ShareTopic = <<"$share/", Group/binary, $/, Topic/binary>>,

    ExpProp = [{properties, #{'Session-Expiry-Interval' => timer:seconds(30)}}],
    {ok, SubConnPid1} = emqtt:start_link([{clientid, Subscriber1}] ++ ExpProp),
    {ok, _} = emqtt:connect(SubConnPid1),
    {ok, SubConnPid2} = emqtt:start_link([{clientid, Subscriber2}] ++ ExpProp),
    {ok, _} = emqtt:connect(SubConnPid2),
    emqtt:subscribe(SubConnPid1, ShareTopic, QoS),
    %% the second member of the group must be the second client
    %% (previously the first client was subscribed twice)
    emqtt:subscribe(SubConnPid2, ShareTopic, QoS),

    %% wait for the subscriptions to show up
    ct:sleep(200),
    MkPayload = fun(PacketId) ->
                        iolist_to_binary(["hello-", integer_to_list(PacketId)])
                end,
    SendF = fun(PacketId) ->
                    M = emqx_message:make(Publisher, QoS, Topic, MkPayload(PacketId)),
                    emqx:publish(M#message{id = PacketId})
            end,
    SendF(1),
    ct:sleep(200),
    %% whichever member the broker picked, it is that client's first
    %% received packet, hence packet id 1
    ?assertMatch([#{packet_id := 1}], recv_msgs(1)),
    %% do not leave the clients running once the test is over
    emqtt:stop(SubConnPid1),
    emqtt:stop(SubConnPid2),
    ok.
|
|
|
|
%% Two-message scenario under the `random' strategy with acks enabled.
t_random(Config) when is_list(Config) ->
    Strategy = random,
    ok = ensure_config(Strategy, true),
    test_two_messages(Strategy).
|
|
|
|
%% Two-message scenario under the `round_robin' strategy with acks enabled.
t_round_robin(Config) when is_list(Config) ->
    Strategy = round_robin,
    ok = ensure_config(Strategy, true),
    test_two_messages(Strategy).
|
|
|
|
%% Two-message scenario under the `sticky' strategy with acks enabled.
t_sticky(Config) when is_list(Config) ->
    Strategy = sticky,
    ok = ensure_config(Strategy, true),
    test_two_messages(Strategy).
|
|
|
|
%% Two-message scenario under the `hash' strategy with acks disabled.
t_hash(Config) when is_list(Config) ->
    Strategy = hash,
    ok = ensure_config(Strategy, false),
    test_two_messages(Strategy).
|
|
|
|
%% Two-message scenario under the `hash_clientid' strategy with acks
%% disabled. The function name (sic, "clinetid") is kept as is because it is
%% the test case's identity.
t_hash_clinetid(Config) when is_list(Config) ->
    Strategy = hash_clientid,
    ok = ensure_config(Strategy, false),
    test_two_messages(Strategy).
|
|
|
|
%% Under the `hash_topic' strategy two topics whose phash2 values differ
%% modulo 2 must be dispatched to different members of a two-member group.
%%
%% Each delivery is awaited with last_message/2 and matched directly, so a
%% missing delivery fails the test with a badmatch once last_message/2 times
%% out instead of blocking in a `receive' without `after'.
t_hash_topic(Config) when is_list(Config) ->
    ok = ensure_config(hash_topic, false),
    ClientId1 = <<"ClientId1">>,
    ClientId2 = <<"ClientId2">>,
    {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
    {ok, _} = emqtt:connect(ConnPid1),
    {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
    {ok, _} = emqtt:connect(ConnPid2),

    Topic1 = <<"foo/bar1">>,
    Topic2 = <<"foo/bar2">>,
    %% precondition: the two topics hash to different slots of a
    %% two-member group
    ?assert(erlang:phash2(Topic1) rem 2 =/= erlang:phash2(Topic2) rem 2),
    Message1 = emqx_message:make(ClientId1, 0, Topic1, <<"hello1">>),
    Message2 = emqx_message:make(ClientId1, 0, Topic2, <<"hello2">>),
    emqtt:subscribe(ConnPid1, {<<"$share/group1/foo/#">>, 0}),
    emqtt:subscribe(ConnPid2, {<<"$share/group1/foo/#">>, 0}),
    ct:sleep(100),

    emqx:publish(Message1),
    {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]),
    emqx_broker:publish(Message2),
    {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1, ConnPid2]),
    ?assert(UsedSubPid1 =/= UsedSubPid2),

    emqtt:stop(ConnPid1),
    emqtt:stop(ConnPid2),
    ok.
|
|
|
|
%% With the `sticky' strategy: client C1 joins "$share/group1/foo/bar" and
%% receives "hello1" published by C2. C1 then drops that subscription, joins
%% "$share/group1/foo/#" instead and must still receive "hello2" published
%% on "foo/bar".
t_not_so_sticky(Config) when is_list(Config) ->
    ok = ensure_config(sticky),
    ClientId1 = <<"ClientId1">>,
    ClientId2 = <<"ClientId2">>,
    {ok, C1} = emqtt:start_link([{clientid, ClientId1}]),
    {ok, _} = emqtt:connect(C1),
    {ok, C2} = emqtt:start_link([{clientid, ClientId2}]),
    {ok, _} = emqtt:connect(C2),

    ExactFilter = <<"$share/group1/foo/bar">>,
    WildcardFilter = <<"$share/group1/foo/#">>,
    PubTopic = <<"foo/bar">>,

    %% first round: exact topic filter
    emqtt:subscribe(C1, {ExactFilter, 0}),
    timer:sleep(50),
    emqtt:publish(C2, PubTopic, <<"hello1">>),
    ?assertMatch([#{payload := <<"hello1">>}], recv_msgs(1)),

    %% second round: same group, wildcard topic filter
    emqtt:unsubscribe(C1, ExactFilter),
    timer:sleep(50),
    emqtt:subscribe(C1, {WildcardFilter, 0}),
    timer:sleep(50),
    emqtt:publish(C2, PubTopic, <<"hello2">>),
    ?assertMatch([#{payload := <<"hello2">>}], recv_msgs(1)),

    emqtt:disconnect(C1),
    emqtt:disconnect(C2),
    ok.
|
|
|
|
%% Runs the two-message scenario in the default group "group1".
test_two_messages(Strategy) ->
    DefaultGroup = <<"group1">>,
    test_two_messages(Strategy, DefaultGroup).

%% Two clients join "$share/<Group>/foo/bar" with QoS 0, two messages are
%% published on "foo/bar" one after the other and the receiving client of
%% each is recorded. Depending on Strategy:
%%   * sticky, hash - both messages must reach the same client;
%%   * round_robin  - the messages must reach different clients;
%%   * anything else - no constraint is checked.
%% The caller is responsible for configuring the strategy beforehand.
test_two_messages(Strategy, Group) ->
    Topic = <<"foo/bar">>,
    ClientId1 = <<"ClientId1">>,
    ClientId2 = <<"ClientId2">>,
    {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
    {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
    {ok, _} = emqtt:connect(ConnPid1),
    {ok, _} = emqtt:connect(ConnPid2),
    Members = [ConnPid1, ConnPid2],

    ShareFilter = <<"$share/", Group/binary, "/foo/bar">>,
    lists:foreach(fun(Member) -> emqtt:subscribe(Member, {ShareFilter, 0}) end,
                  Members),

    Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
    Message2 = emqx_message:make(ClientId1, 0, Topic, <<"hello2">>),
    ct:sleep(100),

    emqx:publish(Message1),
    {true, FirstReceiver} = last_message(<<"hello1">>, Members),

    emqx:publish(Message2),
    {true, SecondReceiver} = last_message(<<"hello2">>, Members),

    lists:foreach(fun emqtt:stop/1, Members),

    case Strategy of
        round_robin ->
            ?assertNotEqual(FirstReceiver, SecondReceiver);
        _ when Strategy =:= sticky; Strategy =:= hash ->
            ?assertEqual(FirstReceiver, SecondReceiver);
        _ ->
            ok
    end,
    ok.
|
|
|
|
%% Same as last_message/3 with a timeout of 6000 ms.
last_message(ExpectedPayload, Pids) ->
    DefaultTimeout = 6000,
    last_message(ExpectedPayload, Pids, DefaultTimeout).

%% Waits up to Timeout ms for a `{publish, #{client_pid := Pid, payload :=
%% ExpectedPayload}}' message (other messages stay in the mailbox).
%% Returns `{true, Pid}' after asserting that Pid is one of Pids. On timeout
%% it logs "not yet" and returns the binary <<"not yet?">>, which callers
%% matching on `{true, _}' turn into a badmatch.
last_message(ExpectedPayload, Pids, Timeout) ->
    receive
        {publish, #{client_pid := ClientPid, payload := ExpectedPayload}} ->
            IsKnownClient = lists:member(ClientPid, Pids),
            ?assert(IsKnownClient),
            {true, ClientPid}
    after Timeout ->
            ct:pal("not yet"),
            <<"not yet?">>
    end.
|
|
|
|
%% emqx_shared_sub:dispatch/3 reports `{error, no_subscribers}' while the
%% group has no member and `{ok, 1}' once the test process has joined it.
t_dispatch(Config) when is_list(Config) ->
    ok = ensure_config(random),
    Group = <<"group1">>,
    Topic = <<"foo">>,
    Dispatch = fun() ->
                       Delivery = #delivery{message = #message{}},
                       emqx_shared_sub:dispatch(Group, Topic, Delivery)
               end,
    ?assertEqual({error, no_subscribers}, Dispatch()),
    emqx:subscribe(Topic, #{qos => 2, share => Group}),
    ?assertEqual({ok, 1}, Dispatch()).
|
|
|
|
%% Pokes the emqx_shared_sub server with requests it does not handle
%% specifically: an unknown call must be answered with `ignored', an unknown
%% cast and unknown info messages must be accepted.
t_uncovered_func(Config) when is_list(Config) ->
    Server = emqx_shared_sub,
    ignored = gen_server:call(Server, ignored),
    ok = gen_server:cast(Server, ignored),
    %% `!' evaluates to the message that was sent
    ignored = Server ! ignored,
    TableEvent = {mnesia_table_event, []},
    TableEvent = Server ! TableEvent.
|
|
|
|
%% Per-group strategy overrides: "sticky_group" must behave sticky and
%% "round_robin_group" must behave round-robin.
%% Every scenario is run four times (eight runs in total): a random pick
%% could satisfy a single run by chance, so repeating brings the odds of a
%% random strategy passing all of them down to 1/256.
t_per_group_config(Config) when is_list(Config) ->
    ok = ensure_group_config(#{
        <<"local_group_fallback">> => local,
        <<"local_group">> => local,
        <<"round_robin_group">> => round_robin,
        <<"sticky_group">> => sticky
    }),
    Sticky = {sticky, <<"sticky_group">>},
    RoundRobin = {round_robin, <<"round_robin_group">>},
    Runs = [Sticky, Sticky, RoundRobin, RoundRobin,
            Sticky, Sticky, RoundRobin, RoundRobin],
    lists:foreach(fun({Strategy, Group}) ->
                          test_two_messages(Strategy, Group)
                  end, Runs).
|
|
|
|
%% `local' strategy across two nodes.
%% init: starts a slave node with an MQTT listener on port 21884 and applies
%%       the same per-group strategy map on the slave and on this node.
%% end:  stops the slave by name.
%% test: client 1 connects to port 21884 (the slave's listener), client 2
%%       connects with default options; both join "$share/local_group/...".
%%       A message published on this node and one published on the slave
%%       (via rpc) must end up at different clients, and both nodes must
%%       report `local' as the strategy of "local_group".
%% NOTE(review): the slave is stopped in the test body (by node) and again
%% in the 'end' clause (by name) - confirm emqx_node_helpers:stop_slave/1
%% accepts both forms and tolerates the second call.
t_local({'init', Config}) ->
    SlaveNode = start_slave(local_shared_sub_test19, 21884),
    Group2Strategy = #{
        <<"local_group_fallback">> => local,
        <<"local_group">> => local,
        <<"round_robin_group">> => round_robin,
        <<"sticky_group">> => sticky
    },
    ok = ensure_group_config(SlaveNode, Group2Strategy),
    ok = ensure_group_config(Group2Strategy),
    [{slave_node, SlaveNode} | Config];
t_local({'end', _Config}) ->
    ok = stop_slave(local_shared_sub_test19);
t_local(Config) when is_list(Config) ->
    SlaveNode = proplists:get_value(slave_node, Config),
    Topic = <<"local_foo1/bar">>,
    ClientId1 = <<"ClientId1">>,
    ClientId2 = <<"ClientId2">>,

    {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {port, 21884}]),
    {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
    Members = [ConnPid1, ConnPid2],

    {ok, _} = emqtt:connect(ConnPid1),
    {ok, _} = emqtt:connect(ConnPid2),

    ShareFilter = <<"$share/local_group/", Topic/binary>>,
    emqtt:subscribe(ConnPid1, {ShareFilter, 0}),
    emqtt:subscribe(ConnPid2, {ShareFilter, 0}),

    ct:sleep(100),

    Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
    Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),

    %% published on this node
    emqx:publish(Message1),
    {true, LocalReceiver} = last_message(<<"hello1">>, Members),

    %% published on the slave node
    rpc:call(SlaveNode, emqx, publish, [Message2]),
    {true, RemoteReceiver} = last_message(<<"hello2">>, Members),
    RemoteStrategy = rpc:call(SlaveNode, emqx_shared_sub, strategy, [<<"local_group">>]),

    emqtt:stop(ConnPid1),
    emqtt:stop(ConnPid2),
    emqx_node_helpers:stop_slave(SlaveNode),

    ?assertEqual(local, emqx_shared_sub:strategy(<<"local_group">>)),
    ?assertEqual(local, RemoteStrategy),

    ?assertNotEqual(LocalReceiver, RemoteReceiver),
    ok.
|
|
|
|
%% `local' strategy when the publishing node has no group member of its own.
%% init: applies the per-group strategy map on this node and starts a slave
%%       node with a listener on port 11885.
%% end:  stops the slave by name.
%% test: a single client joins "$share/local_group_fallback/..." on this
%%       node. A message published here and one published on the slave (via
%%       rpc) must both be reported as dispatched (`{ok, 1}') and both reach
%%       that one client.
t_local_fallback({'init', Config}) ->
    ok = ensure_group_config(#{
        <<"local_group_fallback">> => local,
        <<"local_group">> => local,
        <<"round_robin_group">> => round_robin,
        <<"sticky_group">> => sticky
    }),

    SlaveNode = start_slave(local_fallback_shared_sub_test19, 11885),
    [{slave_node, SlaveNode} | Config];
t_local_fallback({'end', _}) ->
    ok = stop_slave(local_fallback_shared_sub_test19);
t_local_fallback(Config) when is_list(Config) ->
    Topic = <<"local_foo2/bar">>,
    ClientId1 = <<"ClientId1">>,
    ClientId2 = <<"ClientId2">>,
    SlaveNode = proplists:get_value(slave_node, Config),

    {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
    {ok, _} = emqtt:connect(ConnPid1),
    Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
    Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),

    ShareFilter = <<"$share/local_group_fallback/", Topic/binary>>,
    emqtt:subscribe(ConnPid1, {ShareFilter, 0}),
    %% both nodes must know the route before anything is published
    Nodes = [node(), SlaveNode],
    ok = emqx_node_helpers:wait_for_synced_routes(Nodes, Topic, timer:seconds(10)),

    [{share, Topic, {ok, 1}}] = emqx:publish(Message1),
    {true, LocalReceiver} = last_message(<<"hello1">>, [ConnPid1]),

    [{share, Topic, {ok, 1}}] = rpc:call(SlaveNode, emqx, publish, [Message2]),
    {true, RemoteReceiver} = last_message(<<"hello2">>, [ConnPid1]),

    emqtt:stop(ConnPid1),
    emqx_node_helpers:stop_slave(SlaveNode),

    ?assertEqual(LocalReceiver, RemoteReceiver),
    ok.
|
|
|
|
%% Checks that the broker selects another shared subscriber when the first
%% one goes away without acknowledging - variant run with AckEnabled = true.
t_redispatch_with_ack(Config) when is_list(Config) ->
    AckEnabled = true,
    test_redispatch(Config, AckEnabled).
|
|
|
|
%% Redispatch scenario of test_redispatch/2, run with AckEnabled = false.
t_redispatch_no_ack(Config) when is_list(Config) ->
    AckEnabled = false,
    test_redispatch(Config, AckEnabled).
|
|
|
|
%% Redispatch scenario for an exact topic filter.
%% Two clients (auto_ack = false) join "$share/group1/foo/bar" with QoS 1
%% under the `sticky' strategy. One QoS 1 message is published; the client
%% that received it is stopped without having acked, and the same payload
%% must then arrive at the other client.
%%
%% AckEnabled is the value configured for `shared_dispatch_ack_enabled'.
%% The setting is applied once via ensure_config/2 and not overridden
%% afterwards, so the "no ack" caller really runs with acks disabled.
test_redispatch(_Config, AckEnabled) ->
    ok = ensure_config(sticky, AckEnabled),

    Group = <<"group1">>,

    Topic = <<"foo/bar">>,
    ClientId1 = <<"ClientId1">>,
    ClientId2 = <<"ClientId2">>,
    {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
    {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, false}]),
    {ok, _} = emqtt:connect(ConnPid1),
    {ok, _} = emqtt:connect(ConnPid2),

    emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar">>, 1}),
    emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar">>, 1}),

    Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>),

    emqx:publish(Message),

    {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]),
    %% the receiver goes away without acking
    ok = emqtt:stop(UsedSubPid1),

    %% the same payload must now show up at the other member
    Res = last_message(<<"hello1">>, [ConnPid1, ConnPid2], 6000),
    ?assertMatch({true, Pid} when Pid =/= UsedSubPid1, Res),

    {true, UsedSubPid2} = Res,
    emqtt:stop(UsedSubPid2),
    ok.
|
|
|
|
%% Wildcard redispatch scenario, run with AckEnabled = true.
t_redispatch_wildcard_with_ack(Config) when is_list(Config) ->
    AckEnabled = true,
    redispatch_wildcard(Config, AckEnabled).
|
|
|
|
%% Wildcard redispatch scenario, run with AckEnabled = false.
t_redispatch_wildcard_no_ack(Config) when is_list(Config) ->
    AckEnabled = false,
    redispatch_wildcard(Config, AckEnabled).
|
|
|
|
%% Checks that the broker redispatches to another member of the group when
%% the first receiver disconnects before acking (auto_ack is false on both
%% clients). Both members subscribe with the wildcard filter
%% "$share/group1/foo/bar/#"; the message is published on "foo/bar/1".
%% AckEnabled is the value configured for `shared_dispatch_ack_enabled'.
redispatch_wildcard(_Config, AckEnabled) ->
    ok = ensure_config(sticky, AckEnabled),

    Group = <<"group1">>,
    Topic = <<"foo/bar/1">>,
    ClientId1 = <<"ClientId1">>,
    ClientId2 = <<"ClientId2">>,
    {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
    {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, false}]),
    {ok, _} = emqtt:connect(ConnPid1),
    {ok, _} = emqtt:connect(ConnPid2),
    Members = [ConnPid1, ConnPid2],

    ShareFilter = <<"$share/", Group/binary, "/foo/bar/#">>,
    emqtt:subscribe(ConnPid1, {ShareFilter, 1}),
    emqtt:subscribe(ConnPid2, {ShareFilter, 1}),

    Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>),
    emqx:publish(Message),

    {true, FirstReceiver} = last_message(<<"hello1">>, Members),
    %% the first receiver goes away without acking
    ok = emqtt:stop(FirstReceiver),

    Redelivery = last_message(<<"hello1">>, Members, 6000),
    ?assertMatch({true, Pid} when Pid =/= FirstReceiver, Redelivery),

    {true, SecondReceiver} = Redelivery,
    emqtt:stop(SecondReceiver),
    ok.
|
|
|
|
%% Dispatch when every member's inflight window is full.
%% init: mocks emqx_zone:max_inflight/1 to return 1 so the broker never
%%       pushes more than one inflight message per client.
%% end:  unloads the mock.
%% test: two QoS 2 members under `round_robin' with acks enabled. Both
%%       clients are suspended and each gets one message; client 1 is then
%%       killed and two more messages are published. After client 2 is
%%       resumed, "hello3" and "hello4" must both reach client 2.
t_dispatch_when_inflights_are_full({init, Config}) ->
    meck:new(emqx_zone, [passthrough, no_history]),
    meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end),
    Config;
t_dispatch_when_inflights_are_full({'end', _Config}) ->
    meck:unload(emqx_zone);
t_dispatch_when_inflights_are_full(Config) when is_list(Config) ->
    ok = ensure_config(round_robin, _AckEnabled = true),
    Topic = <<"foo/bar">>,
    ClientId1 = <<"ClientId1">>,
    ClientId2 = <<"ClientId2">>,

    {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
    {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
    {ok, _} = emqtt:connect(ConnPid1),
    {ok, _} = emqtt:connect(ConnPid2),
    Members = [ConnPid1, ConnPid2],

    ShareFilter = <<"$share/group/foo/bar">>,
    emqtt:subscribe(ConnPid1, {ShareFilter, 2}),
    emqtt:subscribe(ConnPid2, {ShareFilter, 2}),

    MkMsg = fun(Payload) -> emqx_message:make(ClientId1, 2, Topic, Payload) end,
    [Message1, Message2, Message3, Message4] =
        [MkMsg(Payload) || Payload <- [<<"hello1">>, <<"hello2">>,
                                       <<"hello3">>, <<"hello4">>]],
    ct:sleep(100),

    sys:suspend(ConnPid1),
    sys:suspend(ConnPid2),

    %% one message each: fills the inflight window of the first client ...
    ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)),

    %% ... and of the second client
    ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),

    %% take one of the clients down
    ok = kill_process(ConnPid1),

    %% publishing must still be reported as dispatched
    ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
    ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),

    %% both messages go to the client that is still alive, even though its
    %% inflight window is full
    sys:resume(ConnPid2),
    ct:sleep(100),
    ?assertMatch({true, ConnPid2}, last_message(<<"hello3">>, Members)),
    ?assertMatch({true, ConnPid2}, last_message(<<"hello4">>, Members)),

    emqtt:stop(ConnPid2),
    ok.
|
|
|
|
%% Acks disabled, QoS 2 subscriptions, max_inflight mocked to 1.
%% Client 1 (auto_ack = false) is suspended while four messages are
%% published; client 2 (auto_ack = true) behaves normally.
%% Expected behaviour: client 2 gets its two round-robin messages, client 1
%% gets one message after being resumed, and the message still queued for
%% client 1 is re-dispatched to client 2 once client 1 is killed.
t_dispatch_qos2({init, Config}) when is_list(Config) ->
    meck:new(emqx_zone, [passthrough, no_history]),
    meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end),
    Config;
t_dispatch_qos2({'end', Config}) when is_list(Config) ->
    meck:unload(emqx_zone);
t_dispatch_qos2(Config) when is_list(Config) ->
    ok = ensure_config(round_robin, _AckEnabled = false),
    Topic = <<"foo/bar/1">>,
    ClientId1 = <<"ClientId1">>,
    ClientId2 = <<"ClientId2">>,

    {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
    {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
    {ok, _} = emqtt:connect(ConnPid1),
    {ok, _} = emqtt:connect(ConnPid2),

    ShareFilter = <<"$share/group/foo/bar/#">>,
    emqtt:subscribe(ConnPid1, {ShareFilter, 2}),
    emqtt:subscribe(ConnPid2, {ShareFilter, 2}),

    MkMsg = fun(Payload) -> emqx_message:make(ClientId1, 2, Topic, Payload) end,
    Messages = [MkMsg(Payload) || Payload <- [<<"hello1">>, <<"hello2">>,
                                              <<"hello3">>, <<"hello4">>]],
    ct:sleep(100),

    ok = sys:suspend(ConnPid1),

    %% all four are accepted; for client 1 only one of them can be inflight
    lists:foreach(fun(Message) ->
                          ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message))
                  end, Messages),

    %% client 2 receives two messages: either 1 and 3, or 2 and 4, depending
    %% on which member round_robin picked first
    MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
    MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
    case MsgRec2 of
        <<"hello3">> ->
            ?assertEqual(<<"hello1">>, MsgRec1);
        <<"hello4">> ->
            ?assertEqual(<<"hello2">>, MsgRec1)
    end,
    sys:resume(ConnPid1),
    %% the emqtt subscriber sends PUBREC on its own, but with auto_ack set to
    %% false it never sends PUBCOMP, so with max_inflight = 1 the broker must
    %% not send client 1 its second message yet
    MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3),
    ct:sleep(100),
    %% nothing else may have arrived
    ?assertEqual([], collect_msgs(0)),
    %% take client 1 down ...
    kill_process(ConnPid1),
    %% ... and its pending message must be handed to client 2
    MsgRec4 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P4}}, P4),
    case MsgRec2 of
        <<"hello3">> ->
            ?assertEqual(<<"hello2">>, MsgRec3),
            ?assertEqual(<<"hello4">>, MsgRec4);
        <<"hello4">> ->
            ?assertEqual(<<"hello1">>, MsgRec3),
            ?assertEqual(<<"hello3">>, MsgRec4)
    end,
    emqtt:stop(ConnPid2),
    ok.
|
|
|
|
%% QoS 0 subscriptions with acks disabled: messages handed to a member that
%% later dies are not re-dispatched.
%% Client 1 is suspended while four messages are published; client 2 must
%% receive two of them in publish order. After client 1 is killed, no
%% further message may arrive for two seconds.
t_dispatch_qos0({init, Config}) when is_list(Config) ->
    Config;
t_dispatch_qos0({'end', Config}) when is_list(Config) ->
    ok;
t_dispatch_qos0(Config) when is_list(Config) ->
    ok = ensure_config(round_robin, _AckEnabled = false),
    Topic = <<"foo/bar/1">>,
    ClientId1 = <<"ClientId1">>,
    ClientId2 = <<"ClientId2">>,

    {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
    {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
    {ok, _} = emqtt:connect(ConnPid1),
    {ok, _} = emqtt:connect(ConnPid2),

    %% both members subscribe with QoS 0
    ShareFilter = <<"$share/group/foo/bar/#">>,
    emqtt:subscribe(ConnPid1, {ShareFilter, 0}),
    emqtt:subscribe(ConnPid2, {ShareFilter, 0}),

    %% the messages are published with QoS 2 but should be downgraded to 0,
    %% the QoS of the subscriptions
    MkMsg = fun(Payload) -> emqx_message:make(ClientId1, 2, Topic, Payload) end,
    Messages = [MkMsg(Payload) || Payload <- [<<"hello1">>, <<"hello2">>,
                                              <<"hello3">>, <<"hello4">>]],
    ct:sleep(100),

    ok = sys:suspend(ConnPid1),

    lists:foreach(fun(Message) ->
                          ?assertMatch([_], emqx:publish(Message))
                  end, Messages),

    MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
    MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
    %% publish order is kept: the second payload sorts after the first
    ?assert(MsgRec2 > MsgRec1),

    kill_process(ConnPid1),
    %% no redispatch is expected
    ?assertEqual([], collect_msgs(timer:seconds(2))),
    emqtt:stop(ConnPid2),
    ok.
|
|
|
|
%% Session takeover with a shared subscription.
%% Two emqtt clients are created with the same client id (MQTT v5,
%% clean_start = false, Session-Expiry-Interval = 60). The first one
%% connects, joins "$share/t1/t1/a" and receives "hello1". It is then
%% stopped and "hello2" is published while nobody is connected. Once the
%% second client connects, it must receive "hello2" as well as the newly
%% published "hello3" and "hello4", and nothing else for two seconds.
t_session_takeover({init, Config}) when is_list(Config) ->
    Config;
t_session_takeover({'end', Config}) when is_list(Config) ->
    ok;
t_session_takeover(Config) when is_list(Config) ->
    Topic = <<"t1/a">>,
    ClientId = iolist_to_binary("c" ++ integer_to_list(erlang:system_time())),
    Opts = [{clientid, ClientId},
            {auto_ack, true},
            {proto_ver, v5},
            {clean_start, false},
            {properties, #{'Session-Expiry-Interval' => 60}}
           ],
    {ok, ConnPid1} = emqtt:start_link(Opts),
    %% a second client process sharing the client id of the first
    {ok, ConnPid2} = emqtt:start_link(Opts),
    {ok, _} = emqtt:connect(ConnPid1),
    emqtt:subscribe(ConnPid1, {<<"$share/t1/", Topic/binary>>, _QoS = 1}),
    MkMsg = fun(Payload) -> emqx_message:make(<<"dummypub">>, 2, Topic, Payload) end,
    [Message1, Message2, Message3, Message4] =
        [MkMsg(Payload) || Payload <- [<<"hello1">>, <<"hello2">>,
                                       <<"hello3">>, <<"hello4">>]],
    %% sanity check: client 1 receives messages
    ?assertMatch([_], emqx:publish(Message1)),
    {true, _} = last_message(<<"hello1">>, [ConnPid1]),
    %% client 1 goes away
    emqtt:stop(ConnPid1),
    %% this one should end up in the session left behind by client 1
    ?assertMatch([_], emqx:publish(Message2)),
    %% connecting client 2 under the same client id should trigger the
    %% session takeover
    {ok, _} = emqtt:connect(ConnPid2),
    ?assertMatch([_], emqx:publish(Message3)),
    ?assertMatch([_], emqx:publish(Message4)),
    {true, _} = last_message(<<"hello2">>, [ConnPid2]),
    {true, _} = last_message(<<"hello3">>, [ConnPid2]),
    {true, _} = last_message(<<"hello4">>, [ConnPid2]),
    ?assertEqual([], collect_msgs(timer:seconds(2))),
    emqtt:stop(ConnPid2),
    ok.
|
|
|
|
|
|
%% Kicked sessions do not get their pending messages re-dispatched.
%% Setup is the same as in the QoS 2 dispatch test (acks disabled,
%% round_robin, max_inflight mocked to 1, client 1 with auto_ack = false
%% suspended during publishing). After client 1 has received its first
%% message its session is kicked via emqx_cm:kick_session/1; client 2 must
%% NOT receive the message that was still pending for client 1.
t_session_kicked({init, Config}) when is_list(Config) ->
    meck:new(emqx_zone, [passthrough, no_history]),
    meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end),
    Config;
t_session_kicked({'end', Config}) when is_list(Config) ->
    meck:unload(emqx_zone);
t_session_kicked(Config) when is_list(Config) ->
    ok = ensure_config(round_robin, _AckEnabled = false),
    Topic = <<"foo/bar/1">>,
    ClientId1 = <<"ClientId1">>,
    ClientId2 = <<"ClientId2">>,

    {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
    {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
    {ok, _} = emqtt:connect(ConnPid1),
    {ok, _} = emqtt:connect(ConnPid2),

    ShareFilter = <<"$share/group/foo/bar/#">>,
    emqtt:subscribe(ConnPid1, {ShareFilter, 2}),
    emqtt:subscribe(ConnPid2, {ShareFilter, 2}),

    MkMsg = fun(Payload) -> emqx_message:make(ClientId1, 2, Topic, Payload) end,
    Messages = [MkMsg(Payload) || Payload <- [<<"hello1">>, <<"hello2">>,
                                              <<"hello3">>, <<"hello4">>]],
    ct:sleep(100),

    ok = sys:suspend(ConnPid1),

    %% all four are accepted; for client 1 only one of them can be inflight
    lists:foreach(fun(Message) ->
                          ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message))
                  end, Messages),

    %% client 2 receives two messages: either 1 and 3, or 2 and 4, depending
    %% on which member round_robin picked first
    MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
    MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
    case MsgRec2 of
        <<"hello3">> ->
            ?assertEqual(<<"hello1">>, MsgRec1);
        <<"hello4">> ->
            ?assertEqual(<<"hello2">>, MsgRec1)
    end,
    sys:resume(ConnPid1),
    %% the emqtt subscriber sends PUBREC on its own, but with auto_ack set to
    %% false it never sends PUBCOMP, so with max_inflight = 1 the broker must
    %% not send client 1 its second message yet
    MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3),
    case MsgRec2 of
        <<"hello3">> ->
            ?assertEqual(<<"hello2">>, MsgRec3);
        <<"hello4">> ->
            ?assertEqual(<<"hello1">>, MsgRec3)
    end,
    %% nothing else may have arrived
    ?assertEqual([], collect_msgs(0)),
    %% kick the session of client 1 and wait for its client process to exit
    kill_process(ConnPid1, fun(_Pid) -> emqx_cm:kick_session(ClientId1) end),
    %% client 2 must NOT be given the pending message
    ?assertEqual([], collect_msgs(1000)),
    emqtt:stop(ConnPid2),
    ?assertEqual([], collect_msgs(0)),
    ok.
|
|
|
|
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
|
|
|
|
%% Kills Pid with an untrappable `kill' exit signal and waits until it is
%% down. See kill_process/2.
kill_process(Pid) ->
    KillFun = fun(Target) -> erlang:exit(Target, kill) end,
    kill_process(Pid, KillFun).

%% Unlinks from Pid (so its death does not take the caller down), monitors
%% it, applies WithFun to Pid to make it terminate, and blocks until the
%% 'DOWN' message for Pid arrives. Returns `ok'; raises `error(timeout)' if
%% the process is still alive after 10 seconds.
kill_process(Pid, WithFun) ->
    _ = unlink(Pid),
    _ = monitor(process, Pid),
    _ = WithFun(Pid),
    MaxWait = 10_000,
    receive
        {'DOWN', _MonitorRef, process, Pid, _Reason} ->
            ok
    after MaxWait ->
            error(timeout)
    end.
|
|
|
|
%% Drains the caller's mailbox: returns every message received until no
%% message arrives for Timeout ms, in arrival order.
collect_msgs(Timeout) ->
    NothingYet = [],
    collect_msgs(NothingYet, Timeout).

%% Worker for collect_msgs/1. Acc holds the messages seen so far, newest
%% first; the Timeout restarts after every received message.
collect_msgs(Acc, Timeout) ->
    receive
        Next ->
            collect_msgs([Next | Acc], Timeout)
    after Timeout ->
            lists:reverse(Acc)
    end.
|
|
|
|
%% Configures the given dispatch strategy with acks enabled.
ensure_config(Strategy) ->
    AckEnabled = true,
    ensure_config(Strategy, AckEnabled).

%% Writes the shared-subscription settings into the `emqx' application
%% environment: `shared_subscription_strategy' and
%% `shared_dispatch_ack_enabled'. Returns `ok'.
ensure_config(Strategy, AckEnabled) ->
    Settings = [{shared_subscription_strategy, Strategy},
                {shared_dispatch_ack_enabled, AckEnabled}],
    lists:foreach(fun({Key, Value}) ->
                          application:set_env(emqx, Key, Value)
                  end, Settings).
|
|
|
|
%% Sets the per-group strategy map on another node via rpc. Returns whatever
%% the rpc call returns (`ok' on success).
ensure_group_config(Node, Group2Strategy) ->
    Args = [emqx, shared_subscription_strategy_per_group, Group2Strategy],
    rpc:call(Node, application, set_env, Args).

%% Sets the per-group strategy map (group name => strategy) in the local
%% `emqx' application environment.
ensure_group_config(Group2Strategy) ->
    Key = shared_subscription_strategy_per_group,
    application:set_env(emqx, Key, Group2Strategy).
|
|
|
|
%% True when Pid is among the subscribers emqx_shared_sub reports for the
%% given group and topic.
subscribed(Group, Topic, Pid) ->
    Members = emqx_shared_sub:subscribers(Group, Topic),
    lists:member(Pid, Members).
|
|
|
|
%% Receives up to Count `{publish, Msg}' messages; see recv_msgs/2.
recv_msgs(Count) ->
    recv_msgs(Count, []).

%% Collects `{publish, Msg}' payloads until Count of them were seen or no
%% message arrives for 100 ms. Messages of any other shape are consumed and
%% discarded. The result lists the most recently received message first.
recv_msgs(0, Msgs) ->
    Msgs;
recv_msgs(Count, Msgs) ->
    receive
        {publish, Msg} ->
            Remaining = Count - 1,
            recv_msgs(Remaining, [Msg | Msgs]);
        _Other ->
            %%TODO:: remove the branch?
            recv_msgs(Count, Msgs)
    after 100 ->
            Msgs
    end.
|
|
|
|
%% Starts emqx_modules locally, then starts a slave node called Name through
%% emqx_node_helpers with one TCP listener on 127.0.0.1:Port (listener name
%% "internal", zone `internal'). Returns what
%% emqx_node_helpers:start_slave/2 returns (used by callers as the node).
start_slave(Name, Port) ->
    ok = emqx_ct_helpers:start_apps([emqx_modules]),
    Listener = #{listen_on => {{127,0,0,1}, Port},
                 start_apps => [emqx, emqx_modules],
                 name => "internal",
                 opts => [{zone,internal}],
                 proto => tcp},
    SlaveOpts = #{listeners => [Listener]},
    emqx_node_helpers:start_slave(Name, SlaveOpts).
|
|
|
|
%% Stops the slave node identified by Name; thin wrapper around
%% emqx_node_helpers:stop_slave/1.
stop_slave(Name) ->
    Result = emqx_node_helpers:stop_slave(Name),
    Result.
|
|
|
|
%% Loads the application App without starting it. Returns `ok' when the
%% application was loaded now or had been loaded before; any other failure
%% raises `error({failed_to_load_app, App, Reason})'.
load_app(App) ->
    LoadResult = application:load(App),
    case LoadResult of
        {error, {already_loaded, _}} ->
            ok;
        {error, Reason} ->
            error({failed_to_load_app, App, Reason});
        ok ->
            ok
    end.
|