Merge pull request #9104 from zmstone/1005-fix-shared-sub-dispatch

fix(shared): re-dispatch inflight (QoS1) and mqueue messages
This commit is contained in:
Zaiming (Stone) Shi 2022-10-09 11:41:30 +02:00 committed by GitHub
commit 63774ba5d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 322 additions and 105 deletions

View File

@ -11,6 +11,7 @@
* Fix GET /listeners API crash When some nodes still in initial configuration. [#9002](https://github.com/emqx/emqx/pull/9002)
* Fix empty variable interpolation in authentication and authorization. Placeholders for undefined variables are rendered now as empty strings and do not cause errors anymore. [#8963](https://github.com/emqx/emqx/pull/8963)
* Fix the latency statistics error of the slow subscription module when `stats_type` is `internal` or `response`. [#8986](https://github.com/emqx/emqx/pull/8986)
* Redispatch shared subscription messages. [#9104](https://github.com/emqx/emqx/pull/9104)
# 5.0.8

View File

@ -997,8 +997,13 @@ maybe_nack(Delivers) ->
lists:filter(fun not_nacked/1, Delivers).
not_nacked({deliver, _Topic, Msg}) ->
not (emqx_shared_sub:is_ack_required(Msg) andalso
(ok == emqx_shared_sub:nack_no_connection(Msg))).
case emqx_shared_sub:is_ack_required(Msg) of
true ->
ok = emqx_shared_sub:nack_no_connection(Msg),
false;
false ->
true
end.
maybe_mark_as_delivered(Session, Delivers) ->
case emqx_session:info(is_persistent, Session) of
@ -1222,6 +1227,8 @@ handle_call(
ChanInfo1 = info(NChannel),
emqx_cm:set_chan_info(ClientId, ChanInfo1#{sockinfo => SockInfo}),
reply(ok, reset_timer(alive_timer, NChannel));
handle_call(get_mqueue, Channel) ->
reply({ok, get_mqueue(Channel)}, Channel);
handle_call(Req, Channel) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
reply(ignored, Channel).
@ -2224,3 +2231,6 @@ get_mqtt_conf(Zone, Key, Default) ->
set_field(Name, Value, Channel) ->
Pos = emqx_misc:index_of(Name, record_info(fields, channel)),
setelement(Pos + 1, Channel, Value).
get_mqueue(#channel{session = Session}) ->
emqx_session:get_mqueue(Session).

View File

@ -66,7 +66,8 @@
in/2,
out/1,
stats/1,
dropped/1
dropped/1,
to_list/1
]).
-define(NO_PRIORITY_TABLE, disabled).
@ -109,7 +110,7 @@
dropped = 0 :: count(),
p_table = ?NO_PRIORITY_TABLE :: p_table(),
default_p = ?LOWEST_PRIORITY :: priority(),
q = ?PQUEUE:new() :: pq(),
q = emqx_pqueue:new() :: pq(),
shift_opts :: #shift_opts{},
last_prio :: non_neg_integer() | undefined,
p_credit :: non_neg_integer() | undefined
@ -118,7 +119,7 @@
-type mqueue() :: #mqueue{}.
-spec init(options()) -> mqueue().
init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
init(Opts = #{max_len := MaxLen0, store_qos0 := Qos0}) ->
MaxLen =
case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of
true -> MaxLen0;
@ -126,7 +127,7 @@ init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
end,
#mqueue{
max_len = MaxLen,
store_qos0 = QoS_0,
store_qos0 = Qos0,
p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE),
default_p = get_priority_opt(Opts),
shift_opts = get_shift_opt(Opts)
@ -152,6 +153,19 @@ len(#mqueue{len = Len}) -> Len.
max_len(#mqueue{max_len = MaxLen}) -> MaxLen.
%% @doc Return all queued items in a list.
-spec to_list(mqueue()) -> list().
to_list(MQ) ->
to_list(MQ, []).
to_list(MQ, Acc) ->
case out(MQ) of
{empty, _MQ} ->
lists:reverse(Acc);
{{value, Msg}, Q1} ->
to_list(Q1, [Msg | Acc])
end.
%% @doc Return number of dropped messages.
-spec dropped(mqueue()) -> count().
dropped(#mqueue{dropped = Dropped}) -> Dropped.

View File

@ -60,7 +60,8 @@
info/2,
is_session/1,
stats/1,
obtain_next_pkt_id/1
obtain_next_pkt_id/1,
get_mqueue/1
]).
-export([
@ -801,6 +802,7 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) ->
-spec terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok.
terminate(ClientInfo, Reason, Session) ->
run_terminate_hooks(ClientInfo, Reason, Session),
Reason =/= takenover andalso
redispatch_shared_messages(Session),
ok.
@ -811,29 +813,20 @@ run_terminate_hooks(ClientInfo, takenover, Session) ->
run_terminate_hooks(ClientInfo, Reason, Session) ->
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
redispatch_shared_messages(#session{inflight = Inflight}) ->
InflightList = emqx_inflight:to_list(Inflight),
lists:foreach(
fun
%% Only QoS1 messages get redispatched, because QoS2 messages
%% must be sent to the same client, once they're in flight
({_, #inflight_data{message = #message{qos = ?QOS_2} = Msg}}) ->
?SLOG(warning, #{msg => qos2_lost_no_redispatch}, #{message => Msg});
({_, #inflight_data{message = #message{topic = Topic, qos = ?QOS_1} = Msg}}) ->
case emqx_shared_sub:get_group(Msg) of
{ok, Group} ->
%% Note that dispatch is called with self() in failed subs
%% This is done to avoid dispatching back to caller
Delivery = #delivery{sender = self(), message = Msg},
emqx_shared_sub:dispatch(Group, Topic, Delivery, [self()]);
_ ->
redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
AllInflights = emqx_inflight:to_list(fun sort_fun/2, Inflight),
F = fun
({_PacketId, #inflight_data{message = #message{qos = ?QOS_1} = Msg}}) ->
%% For QoS 2, here is what the spec says:
%% If the Client's Session terminates before the Client reconnects,
%% the Server MUST NOT send the Application Message to any other
%% subscribed Client [MQTT-4.8.2-5].
{true, Msg};
({_PacketId, #inflight_data{}}) ->
false
end;
(_) ->
ok
end,
InflightList
).
InflightList = lists:filtermap(F, AllInflights),
emqx_shared_sub:redispatch(InflightList ++ emqx_mqueue:to_list(Q)).
-compile({inline, [run_hook/2]}).
run_hook(Name, Args) ->
@ -925,3 +918,6 @@ age(Now, Ts) -> Now - Ts.
set_field(Name, Value, Session) ->
Pos = emqx_misc:index_of(Name, record_info(fields, session)),
setelement(Pos + 1, Session, Value).
get_mqueue(#session{mqueue = Q}) ->
emqx_mqueue:to_list(Q).

View File

@ -39,15 +39,15 @@
-export([
dispatch/3,
dispatch/4,
do_dispatch_with_ack/4
do_dispatch_with_ack/4,
redispatch/1
]).
-export([
maybe_ack/1,
maybe_nack_dropped/1,
nack_no_connection/1,
is_ack_required/1,
get_group/1
is_ack_required/1
]).
%% for testing
@ -96,6 +96,9 @@
-define(ACK, shared_sub_ack).
-define(NACK(Reason), {shared_sub_nack, Reason}).
-define(NO_ACK, no_ack).
-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}).
-type redispatch_to() :: ?REDISPATCH_TO(emqx_topic:group(), emqx_topic:topic()).
-record(state, {pmon}).
@ -144,7 +147,8 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
false ->
{error, no_subscribers};
{Type, SubPid} ->
case do_dispatch(SubPid, Group, Topic, Msg, Type) of
Msg1 = with_redispatch_to(Msg, Group, Topic),
case do_dispatch(SubPid, Group, Topic, Msg1, Type) of
ok ->
{ok, 1};
{error, _Reason} ->
@ -223,16 +227,53 @@ without_group_ack(Msg) ->
get_group_ack(Msg) ->
emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).
with_redispatch_to(#message{qos = ?QOS_0} = Msg, _Group, _Topic) ->
Msg;
with_redispatch_to(Msg, Group, Topic) ->
emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg).
%% @hidden Redispatch is neede only for the messages with redispatch_to header added.
is_redispatch_needed(#message{} = Msg) ->
case get_redispatch_to(Msg) of
?REDISPATCH_TO(_, _) ->
true;
_ ->
false
end.
%% @doc Redispatch shared deliveries to other members in the group.
redispatch(Messages0) ->
Messages = lists:filter(fun is_redispatch_needed/1, Messages0),
case length(Messages) of
L when L > 0 ->
?SLOG(info, #{
msg => "redispatching_shared_subscription_message",
count => L
}),
lists:foreach(fun redispatch_shared_message/1, Messages);
_ ->
ok
end.
redispatch_shared_message(#message{} = Msg) ->
%% As long as it's still a #message{} record in inflight,
%% we should try to re-dispatch
?REDISPATCH_TO(Group, Topic) = get_redispatch_to(Msg),
%% Note that dispatch is called with self() in failed subs
%% This is done to avoid dispatching back to caller
Delivery = #delivery{sender = self(), message = Msg},
dispatch(Group, Topic, Delivery, [self()]).
%% @hidden Return the `redispatch_to` group-topic in the message header.
%% `false` is returned if the message is not a shared dispatch.
%% or when it's a QoS 0 message.
-spec get_redispatch_to(emqx_types:message()) -> redispatch_to() | false.
get_redispatch_to(Msg) ->
emqx_message:get_header(redispatch_to, Msg, false).
-spec is_ack_required(emqx_types:message()) -> boolean().
is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
-spec get_group(emqx_types:message()) -> {ok, any()} | error.
get_group(Msg) ->
case get_group_ack(Msg) of
?NO_ACK -> error;
{Group, _Sender, _Ref} -> {ok, Group}
end.
%% @doc Negative ack dropped message due to inflight window or message queue being full.
-spec maybe_nack_dropped(emqx_types:message()) -> boolean().
maybe_nack_dropped(Msg) ->

View File

@ -25,10 +25,20 @@
-define(SUITE, ?MODULE).
-define(wait(For, Timeout),
emqx_common_test_helpers:wait_for(
?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout
)
-define(WAIT(TIMEOUT, PATTERN, Res),
(fun() ->
receive
PATTERN ->
Res;
Other ->
ct:fail(#{
expected => ??PATTERN,
got => Other
})
after TIMEOUT ->
ct:fail({timeout, ??PATTERN})
end
end)()
).
-define(ack, shared_sub_ack).
@ -45,10 +55,26 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
t_is_ack_required(_) ->
init_per_testcase(Case, Config) ->
try
?MODULE:Case({'init', Config})
catch
error:function_clause ->
Config
end.
end_per_testcase(Case, Config) ->
try
?MODULE:Case({'end', Config})
catch
error:function_clause ->
ok
end.
t_is_ack_required(Config) when is_list(Config) ->
?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})).
t_maybe_nack_dropped(_) ->
t_maybe_nack_dropped(Config) when is_list(Config) ->
?assertEqual(false, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
?assertEqual(true, emqx_shared_sub:maybe_nack_dropped(Msg)),
@ -60,7 +86,7 @@ t_maybe_nack_dropped(_) ->
end
).
t_nack_no_connection(_) ->
t_nack_no_connection(Config) when is_list(Config) ->
Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)),
?assertEqual(
@ -71,7 +97,7 @@ t_nack_no_connection(_) ->
end
).
t_maybe_ack(_) ->
t_maybe_ack(Config) when is_list(Config) ->
?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
?assertEqual(
@ -86,10 +112,7 @@ t_maybe_ack(_) ->
end
).
% t_subscribers(_) ->
% error('TODO').
t_random_basic(_) ->
t_random_basic(Config) when is_list(Config) ->
ok = ensure_config(random),
ClientId = <<"ClientId">>,
Topic = <<"foo">>,
@ -121,7 +144,7 @@ t_random_basic(_) ->
%% After the connection for the 2nd session is also closed,
%% i.e. when all clients are offline, the following message(s)
%% should be delivered randomly.
t_no_connection_nack(_) ->
t_no_connection_nack(Config) when is_list(Config) ->
ok = ensure_config(sticky),
Publisher = <<"publisher">>,
Subscriber1 = <<"Subscriber1">>,
@ -153,54 +176,22 @@ t_no_connection_nack(_) ->
%% This is the connection which was picked by broker to dispatch (sticky) for 1st message
?assertMatch([#{packet_id := 1}], recv_msgs(1)),
%% Now kill the connection, expect all following messages to be delivered to the other
%% subscriber.
%emqx_mock_client:stop(ConnPid),
%% sleep then make synced calls to session processes to ensure that
%% the connection pid's 'EXIT' message is propagated to the session process
%% also to be sure sessions are still alive
% timer:sleep(2),
% _ = emqx_session:info(SPid1),
% _ = emqx_session:info(SPid2),
% %% Now we know what is the other still alive connection
% [TheOtherConnPid] = [SubConnPid1, SubConnPid2] -- [ConnPid],
% %% Send some more messages
% PacketIdList = lists:seq(2, 10),
% lists:foreach(fun(Id) ->
% SendF(Id),
% ?wait(Received(Id, TheOtherConnPid), 1000)
% end, PacketIdList),
% %% Now close the 2nd (last connection)
% emqx_mock_client:stop(TheOtherConnPid),
% timer:sleep(2),
% %% both sessions should have conn_pid = undefined
% ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid1))),
% ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid2))),
% %% send more messages, but all should be queued in session state
% lists:foreach(fun(Id) -> SendF(Id) end, PacketIdList),
% {_, L1} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid1)),
% {_, L2} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid2)),
% ?assertEqual(length(PacketIdList), L1 + L2),
% %% clean up
% emqx_mock_client:close_session(PubConnPid),
% emqx_sm:close_session(SPid1),
% emqx_sm:close_session(SPid2),
ok.
t_random(_) ->
t_random(Config) when is_list(Config) ->
ok = ensure_config(random, true),
test_two_messages(random).
t_round_robin(_) ->
t_round_robin(Config) when is_list(Config) ->
ok = ensure_config(round_robin, true),
test_two_messages(round_robin).
t_round_robin_per_group(_) ->
t_round_robin_per_group(Config) when is_list(Config) ->
ok = ensure_config(round_robin_per_group, true),
test_two_messages(round_robin_per_group).
%% this would fail if executed with the standard round_robin strategy
t_round_robin_per_group_even_distribution_one_group(_) ->
t_round_robin_per_group_even_distribution_one_group(Config) when is_list(Config) ->
ok = ensure_config(round_robin_per_group, true),
Topic = <<"foo/bar">>,
Group = <<"group1">>,
@ -264,7 +255,7 @@ t_round_robin_per_group_even_distribution_one_group(_) ->
),
ok.
t_round_robin_per_group_even_distribution_two_groups(_) ->
t_round_robin_per_group_even_distribution_two_groups(Config) when is_list(Config) ->
ok = ensure_config(round_robin_per_group, true),
Topic = <<"foo/bar">>,
{ok, ConnPid1} = emqtt:start_link([{clientid, <<"C0">>}]),
@ -350,19 +341,19 @@ t_round_robin_per_group_even_distribution_two_groups(_) ->
),
ok.
t_sticky(_) ->
t_sticky(Config) when is_list(Config) ->
ok = ensure_config(sticky, true),
test_two_messages(sticky).
t_hash(_) ->
t_hash(Config) when is_list(Config) ->
ok = ensure_config(hash, false),
test_two_messages(hash).
t_hash_clinetid(_) ->
t_hash_clinetid(Config) when is_list(Config) ->
ok = ensure_config(hash_clientid, false),
test_two_messages(hash_clientid).
t_hash_topic(_) ->
t_hash_topic(Config) when is_list(Config) ->
ok = ensure_config(hash_topic, false),
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
@ -407,7 +398,7 @@ t_hash_topic(_) ->
ok.
%% if the original subscriber dies, change to another one alive
t_not_so_sticky(_) ->
t_not_so_sticky(Config) when is_list(Config) ->
ok = ensure_config(sticky),
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
@ -481,7 +472,7 @@ last_message(ExpectedPayload, Pids, Timeout) ->
<<"not yet?">>
end.
t_dispatch(_) ->
t_dispatch(Config) when is_list(Config) ->
ok = ensure_config(random),
Topic = <<"foo">>,
?assertEqual(
@ -494,13 +485,13 @@ t_dispatch(_) ->
emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})
).
t_uncovered_func(_) ->
t_uncovered_func(Config) when is_list(Config) ->
ignored = gen_server:call(emqx_shared_sub, ignored),
ok = gen_server:cast(emqx_shared_sub, ignored),
ignored = emqx_shared_sub ! ignored,
{mnesia_table_event, []} = emqx_shared_sub ! {mnesia_table_event, []}.
t_per_group_config(_) ->
t_per_group_config(Config) when is_list(Config) ->
ok = ensure_group_config(#{
<<"local_group">> => local,
<<"round_robin_group">> => round_robin,
@ -521,7 +512,7 @@ t_per_group_config(_) ->
test_two_messages(round_robin_per_group, <<"round_robin_per_group_group">>),
test_two_messages(round_robin_per_group, <<"round_robin_per_group_group">>).
t_local(_) ->
t_local(Config) when is_list(Config) ->
GroupConfig = #{
<<"local_group">> => local,
<<"round_robin_group">> => round_robin,
@ -567,7 +558,7 @@ t_local(_) ->
?assertNotEqual(UsedSubPid1, UsedSubPid2),
ok.
t_remote(_) ->
t_remote(Config) when is_list(Config) ->
%% This testcase verifies dispatching of shared messages to the remote nodes via backplane API.
%%
%% In this testcase we start two EMQX nodes: local and remote.
@ -620,7 +611,7 @@ t_remote(_) ->
stop_slave(Node)
end.
t_local_fallback(_) ->
t_local_fallback(Config) when is_list(Config) ->
ok = ensure_group_config(#{
<<"local_group">> => local,
<<"round_robin_group">> => round_robin,
@ -653,9 +644,14 @@ t_local_fallback(_) ->
%% This one tests that broker tries to select another shared subscriber
%% If the first one doesn't return an ACK
t_redispatch(_) ->
ok = ensure_config(sticky, true),
t_redispatch_qos1_with_ack(Config) when is_list(Config) ->
test_redispatch_qos1(Config, true).
t_redispatch_qos1_no_ack(Config) when is_list(Config) ->
test_redispatch_qos1(Config, false).
test_redispatch_qos1(_Config, AckEnabled) ->
ok = ensure_config(sticky, AckEnabled),
Group = <<"group1">>,
Topic = <<"foo/bar">>,
ClientId1 = <<"ClientId1">>,
@ -682,10 +678,169 @@ t_redispatch(_) ->
emqtt:stop(UsedSubPid2),
ok.
t_qos1_random_dispatch_if_all_members_are_down(Config) when is_list(Config) ->
ok = ensure_config(sticky, true),
Group = <<"group1">>,
Topic = <<"foo/bar">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
SubOpts = [{clean_start, false}],
{ok, ConnPub} = emqtt:start_link([{clientid, <<"pub">>}]),
{ok, _} = emqtt:connect(ConnPub),
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1} | SubOpts]),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2} | SubOpts]),
{ok, _} = emqtt:connect(ConnPid1),
{ok, _} = emqtt:connect(ConnPid2),
emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar">>, 1}),
emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar">>, 1}),
ok = emqtt:stop(ConnPid1),
ok = emqtt:stop(ConnPid2),
[Pid1, Pid2] = emqx_shared_sub:subscribers(Group, Topic),
?assert(is_process_alive(Pid1)),
?assert(is_process_alive(Pid2)),
{ok, _} = emqtt:publish(ConnPub, Topic, <<"hello11">>, 1),
ct:sleep(100),
{ok, Msgs1} = gen_server:call(Pid1, get_mqueue),
{ok, Msgs2} = gen_server:call(Pid2, get_mqueue),
%% assert the message is in mqueue (because socket is closed)
?assertMatch([#message{payload = <<"hello11">>}], Msgs1 ++ Msgs2),
emqtt:stop(ConnPub),
ok.
%% No ack, QoS 2 subscriptions,
%% client1 receives one message, send pubrec, then suspend
%% client2 acts normal (auto_ack=true)
%% Expected behaviour:
%% the messages sent to client1's inflight and mq are re-dispatched after client1 is down
t_dispatch_qos2({init, Config}) when is_list(Config) ->
emqx_config:put_zone_conf(default, [mqtt, max_inflight], 1),
Config;
t_dispatch_qos2({'end', Config}) when is_list(Config) ->
emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0);
t_dispatch_qos2(Config) when is_list(Config) ->
ok = ensure_config(round_robin, _AckEnabled = false),
Topic = <<"foo/bar/1">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
{ok, _} = emqtt:connect(ConnPid1),
{ok, _} = emqtt:connect(ConnPid2),
emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 2}),
emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 2}),
Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
ct:sleep(100),
ok = sys:suspend(ConnPid1),
%% One message is inflight
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
%% assert hello2 > hello1 or hello4 > hello3
?assert(MsgRec2 > MsgRec1),
sys:resume(ConnPid1),
%% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false
%% so it will never send PUBCOMP, hence EMQX should not attempt to send
%% the 4th message yet since max_inflight is 1.
MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3),
ct:sleep(100),
%% no message expected
?assertEqual([], collect_msgs(0)),
%% now kill client 1
kill_process(ConnPid1),
%% client 2 should receive the message
MsgRec4 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P4}}, P4),
%% assert hello2 > hello1 or hello4 > hello3
?assert(MsgRec4 > MsgRec3),
emqtt:stop(ConnPid2),
ok.
t_dispatch_qos0({init, Config}) when is_list(Config) ->
Config;
t_dispatch_qos0({'end', Config}) when is_list(Config) ->
ok;
t_dispatch_qos0(Config) when is_list(Config) ->
ok = ensure_config(round_robin, _AckEnabled = false),
Topic = <<"foo/bar/1">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
{ok, _} = emqtt:connect(ConnPid1),
{ok, _} = emqtt:connect(ConnPid2),
%% subscribe with QoS 0
emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 0}),
emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 0}),
%% publish with QoS 2, but should be downgraded to 0 as the subscribers
%% subscribe with QoS 0
Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
ct:sleep(100),
ok = sys:suspend(ConnPid1),
?assertMatch([_], emqx:publish(Message1)),
?assertMatch([_], emqx:publish(Message2)),
?assertMatch([_], emqx:publish(Message3)),
?assertMatch([_], emqx:publish(Message4)),
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
%% assert hello2 > hello1 or hello4 > hello3
?assert(MsgRec2 > MsgRec1),
kill_process(ConnPid1),
%% expect no redispatch
?assertEqual([], collect_msgs(timer:seconds(2))),
emqtt:stop(ConnPid2),
ok.
%%--------------------------------------------------------------------
%% help functions
%%--------------------------------------------------------------------
kill_process(Pid) ->
_ = unlink(Pid),
_ = monitor(process, Pid),
erlang:exit(Pid, kill),
receive
{'DOWN', _, process, Pid, _} ->
ok
end.
collect_msgs(Timeout) ->
collect_msgs([], Timeout).
collect_msgs(Acc, Timeout) ->
receive
Msg ->
collect_msgs([Msg | Acc], Timeout)
after Timeout ->
lists:reverse(Acc)
end.
ensure_config(Strategy) ->
ensure_config(Strategy, _AckEnabled = true).