emqx/test/emqx_shared_sub_SUITE.erl

260 lines
11 KiB
Erlang

%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%% Common Test suite exercising shared-subscription dispatch with the
%% strategies configured below (random, round_robin, sticky, hash).
-module(emqx_shared_sub_SUITE).
%% Common Test callbacks.
-export([all/0, init_per_suite/1, end_per_suite/1]).
%% Test cases; the same names are returned by all/0.
-export([t_random_basic/1,
t_random/1,
t_round_robin/1,
t_sticky/1,
t_hash/1,
t_not_so_sticky/1,
t_no_connection_nack/1
]).
%% NOTE(review): presumably supplies the #message{} record matched in the
%% test cases below -- confirm against emqx.hrl.
-include("emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
%% ?wait(For, Timeout): wraps the expression For in a zero-arity fun and passes
%% it, together with the calling function name, the source line and Timeout, to
%% emqx_ct_helpers:wait_for/4.
%% NOTE(review): the call sites suggest wait_for/4 re-evaluates the fun until it
%% yields 'true' or Timeout (ms) elapses, and that the fun may run outside the
%% test process (results are sent back by message) -- confirm in emqx_ct_helpers.
-define(wait(For, Timeout), emqx_ct_helpers:wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
%% Common Test callback: the test cases of this suite, in execution order.
all() ->
    [
        t_random_basic,
        t_random,
        t_round_robin,
        t_sticky,
        t_hash,
        t_not_so_sticky,
        t_no_connection_nack
    ].
%% Common Test callback: starts the applications through emqx_ct_helpers
%% (no additional applications requested) and returns the config unchanged.
init_per_suite(Cfg) ->
    emqx_ct_helpers:start_apps([]),
    Cfg.
%% Common Test callback: stops what init_per_suite/1 started; the result of
%% emqx_ct_helpers:stop_apps/1 is returned as-is.
end_per_suite(_Cfg) ->
    emqx_ct_helpers:stop_apps([]).
%% Single client, 'random' strategy: the client share-subscribes to "foo" in
%% group "group1", publishes a QoS 2 message to that topic through its own
%% session, and must see it delivered back on its own connection.
t_random_basic(_Config) ->
    ok = ensure_config(random),
    ClientId = <<"ClientId">>,
    Group = <<"group1">>,
    Topic = <<"foo">>,
    {ok, ConnPid} = emqx_mock_client:start_link(ClientId),
    {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal),
    Msg = emqx_message:make(ClientId, 2, Topic, <<"hello">>),
    SubOpts = #{qos => 2, share => Group},
    emqx_session:subscribe(SPid, [{Topic, SubOpts}]),
    %% Block until the session is listed as a member of the group.
    ?wait(subscribed(Group, Topic, SPid), 1000),
    PacketId = 1,
    emqx_session:publish(SPid, PacketId, Msg),
    %% Any value other than 'true' is handed back to the wait helper.
    ?wait(case emqx_mock_client:get_last_message(ConnPid) of
              [{publish, 1, _}] -> true;
              Unexpected -> Unexpected
          end, 1000),
    %% Complete the QoS 2 exchange before tearing the session down.
    emqx_session:pubrec(SPid, PacketId, reasoncode),
    emqx_session:pubcomp(SPid, PacketId, reasoncode),
    emqx_mock_client:close_session(ConnPid),
    ok.
%% Two subscribers share-subscribe to "$share/g1/foo/bar" under the 'sticky'
%% dispatch strategy. The first message reveals which member the broker
%% picked; that member's connection is then stopped. The following messages
%% are expected to be nack'ed by the (now connection-less) sticky session and
%% delivered to the second session instead. Once the second connection is
%% stopped as well, i.e. when all clients are offline, further messages are
%% expected to end up queued in the sessions.
%%
%% Synchronisation with the session processes is done by polling their
%% conn_pid (see conn_pid_cleared/1) rather than by sleeping for a fixed
%% time, so the test does not depend on how quickly the 'EXIT' of a stopped
%% connection reaches its session.
t_no_connection_nack(_) ->
    ok = ensure_config(sticky),
    Publisher = <<"publisher">>,
    Subscriber1 = <<"Subscriber1">>,
    Subscriber2 = <<"Subscriber2">>,
    QoS = 1,
    Group = <<"g1">>,
    Topic = <<"foo/bar">>,
    {ok, PubConnPid} = emqx_mock_client:start_link(Publisher),
    {ok, SubConnPid1} = emqx_mock_client:start_link(Subscriber1),
    {ok, SubConnPid2} = emqx_mock_client:start_link(Subscriber2),
    %% allow session to persist after connection shutdown
    Attrs = #{expiry_interval => timer:seconds(30)},
    {ok, P_Pid} = emqx_mock_client:open_session(PubConnPid, Publisher, internal, Attrs),
    {ok, SPid1} = emqx_mock_client:open_session(SubConnPid1, Subscriber1, internal, Attrs),
    {ok, SPid2} = emqx_mock_client:open_session(SubConnPid2, Subscriber2, internal, Attrs),
    emqx_session:subscribe(SPid1, [{Topic, #{qos => QoS, share => Group}}]),
    emqx_session:subscribe(SPid2, [{Topic, #{qos => QoS, share => Group}}]),
    %% wait for the subscriptions to show up
    ?wait(subscribed(Group, Topic, SPid1), 1000),
    ?wait(subscribed(Group, Topic, SPid2), 1000),
    MkPayload = fun(PacketId) -> iolist_to_binary(["hello-", integer_to_list(PacketId)]) end,
    SendF = fun(PacketId) ->
                emqx_session:publish(P_Pid, PacketId,
                                     emqx_message:make(Publisher, QoS, Topic, MkPayload(PacketId)))
            end,
    SendF(1),
    Ref = make_ref(),
    CasePid = self(),
    %% True when ConnPid's last message carries the payload of PacketId; the
    %% hit is also reported to the test process, tagged with Ref.
    Received =
        fun(PacketId, ConnPid) ->
            Payload = MkPayload(PacketId),
            case emqx_mock_client:get_last_message(ConnPid) of
                [{publish, _, #message{payload = Payload}}] ->
                    CasePid ! {Ref, PacketId, ConnPid},
                    true;
                _Other ->
                    false
            end
        end,
    ?wait(Received(1, SubConnPid1) orelse Received(1, SubConnPid2), 1000),
    %% This is the connection which was picked by broker to dispatch (sticky) for 1st message
    ConnPid = receive {Ref, 1, Pid} -> Pid after 1000 -> error(timeout) end,
    %% Pair the picked connection with its session, and identify the
    %% connection that stays alive. Crashes if ConnPid is neither of the two.
    {PickedSPid, TheOtherConnPid} =
        case ConnPid of
            SubConnPid1 -> {SPid1, SubConnPid2};
            SubConnPid2 -> {SPid2, SubConnPid1}
        end,
    %% Now kill the connection, expect all following messages to be delivered to the other subscriber.
    emqx_mock_client:stop(ConnPid),
    %% Wait until the session has noticed that its connection is gone.
    ?wait(conn_pid_cleared(PickedSPid), 1000),
    %% Synced calls to be sure both sessions are still alive.
    _ = emqx_session:info(SPid1),
    _ = emqx_session:info(SPid2),
    %% Send some more messages
    PacketIdList = lists:seq(2, 10),
    lists:foreach(fun(Id) ->
                      SendF(Id),
                      ?wait(Received(Id, TheOtherConnPid), 1000)
                  end, PacketIdList),
    %% Now close the 2nd (last connection)
    emqx_mock_client:stop(TheOtherConnPid),
    ?wait(conn_pid_cleared(SPid1) andalso conn_pid_cleared(SPid2), 1000),
    %% both sessions should have conn_pid = undefined
    ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid1))),
    ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid2))),
    %% send more messages, but all should be queued in session state
    lists:foreach(fun(Id) -> SendF(Id) end, PacketIdList),
    {_, L1} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid1)),
    {_, L2} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid2)),
    ?assertEqual(length(PacketIdList), L1 + L2),
    %% clean up
    emqx_mock_client:close_session(PubConnPid),
    emqx_sm:close_session(SPid1),
    emqx_sm:close_session(SPid2),
    ok.

%% True when the session's info list reports conn_pid as 'undefined'.
conn_pid_cleared(SPid) ->
    {conn_pid, undefined} =:= lists:keyfind(conn_pid, 1, emqx_session:info(SPid)).
%% Two-message scenario under the 'random' strategy (dispatch ack enabled).
t_random(_Config) ->
    test_two_messages(random).
%% Two-message scenario under the 'round_robin' strategy (dispatch ack enabled).
t_round_robin(_Config) ->
    test_two_messages(round_robin).
%% Two-message scenario under the 'sticky' strategy (dispatch ack enabled).
t_sticky(_Config) ->
    test_two_messages(sticky).
%% Two-message scenario under the 'hash' strategy; unlike the other strategy
%% cases, this one runs with dispatch ack disabled.
t_hash(_Config) ->
    test_two_messages(hash, false).
%% 'sticky' must not stay pinned to a subscriber that is gone: the first
%% session receives "hello1" and is then closed; once it has left the group,
%% a second session joins the same group (with a wildcard filter) and must
%% receive "hello2".
t_not_so_sticky(_Config) ->
    ok = ensure_config(sticky),
    Group = <<"group1">>,
    Topic = <<"foo/bar">>,
    Filter = <<"foo/#">>,
    ClientId1 = <<"ClientId1">>,
    ClientId2 = <<"ClientId2">>,
    {ok, ConnPid1} = emqx_mock_client:start_link(ClientId1),
    {ok, ConnPid2} = emqx_mock_client:start_link(ClientId2),
    {ok, SPid1} = emqx_mock_client:open_session(ConnPid1, ClientId1, internal),
    {ok, SPid2} = emqx_mock_client:open_session(ConnPid2, ClientId2, internal),
    Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
    Message2 = emqx_message:make(ClientId1, 0, Topic, <<"hello2">>),

    %% Phase 1: only the first session is a group member.
    emqx_session:subscribe(SPid1, [{Topic, #{qos => 0, share => Group}}]),
    ?wait(subscribed(Group, Topic, SPid1), 1000),
    emqx_session:publish(SPid1, 1, Message1),
    ?wait(case emqx_mock_client:get_last_message(ConnPid1) of
              [{publish, _, #message{payload = <<"hello1">>}}] -> true;
              Unexpected1 -> Unexpected1
          end, 1000),
    emqx_mock_client:close_session(ConnPid1),
    ?wait(not subscribed(Group, Topic, SPid1), 1000),

    %% Phase 2: the second session takes over the group.
    emqx_session:subscribe(SPid2, [{Filter, #{qos => 0, share => Group}}]),
    ?wait(subscribed(Group, Filter, SPid2), 1000),
    emqx_session:publish(SPid2, 2, Message2),
    ?wait(case emqx_mock_client:get_last_message(ConnPid2) of
              [{publish, _, #message{payload = <<"hello2">>}}] -> true;
              Unexpected2 -> Unexpected2
          end, 1000),
    emqx_mock_client:close_session(ConnPid2),
    ?wait(not subscribed(Group, Filter, SPid2), 1000),
    ok.
%% Runs the two-message scenario for Strategy with dispatch ack enabled.
test_two_messages(Strategy) ->
    WithAck = true,
    test_two_messages(Strategy, WithAck).
%% Two sessions share-subscribe to "foo/bar" in group "group1"; two messages
%% are published through the broker and the connection that received each
%% one is recorded. Depending on Strategy:
%%   sticky, hash -> both messages must reach the same subscriber
%%   round_robin  -> the messages must reach different subscribers
%%   anything else (e.g. random) -> no constraint is checked
%% WithAck is stored as the shared_dispatch_ack_enabled setting.
test_two_messages(Strategy, WithAck) ->
    ok = ensure_config(Strategy, WithAck),
    Topic = <<"foo/bar">>,
    ClientId1 = <<"ClientId1">>,
    ClientId2 = <<"ClientId2">>,
    {ok, ConnPid1} = emqx_mock_client:start_link(ClientId1),
    {ok, ConnPid2} = emqx_mock_client:start_link(ClientId2),
    {ok, SPid1} = emqx_mock_client:open_session(ConnPid1, ClientId1, internal),
    {ok, SPid2} = emqx_mock_client:open_session(ConnPid2, ClientId2, internal),
    Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
    Message2 = emqx_message:make(ClientId1, 0, Topic, <<"hello2">>),
    emqx_session:subscribe(SPid1, [{Topic, #{qos => 0, share => <<"group1">>}}]),
    emqx_session:subscribe(SPid2, [{Topic, #{qos => 0, share => <<"group1">>}}]),
    %% wait for the subscription to show up
    ?wait(subscribed(<<"group1">>, Topic, SPid1) andalso
          subscribed(<<"group1">>, Topic, SPid2), 1000),
    emqx_broker:publish(Message1),
    Me = self(),
    %% Tag the reports so that only messages of this run are picked up.
    Ref = make_ref(),
    %% The receiving connection is reported back by message.
    %% NOTE(review): presumably because the wait fun may be evaluated outside
    %% the test process -- confirm in emqx_ct_helpers:wait_for/4.
    WaitF = fun(ExpectedPayload) ->
                case last_message(ExpectedPayload, [ConnPid1, ConnPid2]) of
                    {true, Pid} ->
                        Me ! {Ref, ExpectedPayload, Pid},
                        true;
                    Other ->
                        Other
                end
            end,
    ?wait(WaitF(<<"hello1">>), 2000),
    UsedSubPid1 = await_subscriber(Ref, <<"hello1">>),
    emqx_broker:publish(Message2),
    ?wait(WaitF(<<"hello2">>), 2000),
    UsedSubPid2 = await_subscriber(Ref, <<"hello2">>),
    case Strategy of
        sticky -> ?assertEqual(UsedSubPid1, UsedSubPid2);
        round_robin -> ?assertNotEqual(UsedSubPid1, UsedSubPid2);
        hash -> ?assertEqual(UsedSubPid1, UsedSubPid2);
        _ -> ok
    end,
    emqx_mock_client:close_session(ConnPid1),
    emqx_mock_client:close_session(ConnPid2),
    ok.

%% Returns the connection pid reported for Payload under tag Ref. Raises
%% error({timeout, Payload}) if no report arrives within one second, instead
%% of blocking the test case indefinitely.
await_subscriber(Ref, Payload) ->
    receive
        {Ref, Payload, Pid} -> Pid
    after 1000 ->
        error({timeout, Payload})
    end.
%% Looks, in list order, for the first connection whose last message is a
%% single publish carrying ExpectedPayload. Returns {true, Pid} for that
%% connection, or the binary <<"not yet?">> when none matches. Connections
%% after the first match are not queried.
last_message(ExpectedPayload, Pids) ->
    Mismatch =
        fun(Pid) ->
            case emqx_mock_client:get_last_message(Pid) of
                [{publish, _, #message{payload = Payload}}]
                  when Payload =:= ExpectedPayload ->
                    false;
                _Other ->
                    true
            end
        end,
    case lists:dropwhile(Mismatch, Pids) of
        [Found | _] -> {true, Found};
        [] -> <<"not yet?">>
    end.
%%------------------------------------------------------------------------------
%% Helper functions
%%------------------------------------------------------------------------------
%% Selects the dispatch strategy with dispatch ack enabled.
ensure_config(Strategy) ->
    AckEnabled = true,
    ensure_config(Strategy, AckEnabled).
%% Stores Strategy and AckEnabled in the 'emqx' application environment as
%% shared_subscription_strategy and shared_dispatch_ack_enabled. Returns ok.
ensure_config(Strategy, AckEnabled) ->
    Settings = [{shared_subscription_strategy, Strategy},
                {shared_dispatch_ack_enabled, AckEnabled}],
    lists:foreach(fun({Key, Value}) ->
                      application:set_env(emqx, Key, Value)
                  end, Settings).
%% True when Pid is among the subscribers that emqx_shared_sub reports for
%% Topic in Group.
subscribed(Group, Topic, Pid) ->
    Members = emqx_shared_sub:subscribers(Group, Topic),
    lists:member(Pid, Members).