From e8279a02ef2d931c7045b703e975bf4eaea1a1a2 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 5 Oct 2022 21:27:42 +0200 Subject: [PATCH 1/2] fix(shared): re-dispatch inflight (QoS1) and mqueue messages when session terminates (not due to take over) shared delivery should be re-dispatched to other members in the group --- CHANGES-5.0.md | 1 + apps/emqx/src/emqx_mqueue.erl | 22 +- apps/emqx/src/emqx_session.erl | 40 ++-- apps/emqx/src/emqx_shared_sub.erl | 53 ++++- apps/emqx/test/emqx_shared_sub_SUITE.erl | 247 +++++++++++++++++------ 5 files changed, 270 insertions(+), 93 deletions(-) diff --git a/CHANGES-5.0.md b/CHANGES-5.0.md index 20d972096..af8aaecf1 100644 --- a/CHANGES-5.0.md +++ b/CHANGES-5.0.md @@ -10,6 +10,7 @@ * Fix GET /listeners API crash When some nodes still in initial configuration. [#9002](https://github.com/emqx/emqx/pull/9002) * Fix empty variable interpolation in authentication and authorization. Placeholders for undefined variables are rendered now as empty strings and do not cause errors anymore. [#8963](https://github.com/emqx/emqx/pull/8963) * Fix the latency statistics error of the slow subscription module when `stats_type` is `internal` or `response`. [#8986](https://github.com/emqx/emqx/pull/8986) +* Redispatch shared subscription messages. # 5.0.8 diff --git a/apps/emqx/src/emqx_mqueue.erl b/apps/emqx/src/emqx_mqueue.erl index f8556bc39..ae55d4b6e 100644 --- a/apps/emqx/src/emqx_mqueue.erl +++ b/apps/emqx/src/emqx_mqueue.erl @@ -66,7 +66,8 @@ in/2, out/1, stats/1, - dropped/1 + dropped/1, + to_list/1 ]). -define(NO_PRIORITY_TABLE, disabled). @@ -109,7 +110,7 @@ dropped = 0 :: count(), p_table = ?NO_PRIORITY_TABLE :: p_table(), default_p = ?LOWEST_PRIORITY :: priority(), - q = ?PQUEUE:new() :: pq(), + q = emqx_pqueue:new() :: pq(), shift_opts :: #shift_opts{}, last_prio :: non_neg_integer() | undefined, p_credit :: non_neg_integer() | undefined @@ -118,7 +119,7 @@ -type mqueue() :: #mqueue{}. -spec init(options()) -> mqueue(). -init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) -> +init(Opts = #{max_len := MaxLen0, store_qos0 := Qos0}) -> MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of true -> MaxLen0; @@ -126,7 +127,7 @@ init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) -> end, #mqueue{ max_len = MaxLen, - store_qos0 = QoS_0, + store_qos0 = Qos0, p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE), default_p = get_priority_opt(Opts), shift_opts = get_shift_opt(Opts) @@ -152,6 +153,19 @@ len(#mqueue{len = Len}) -> Len. max_len(#mqueue{max_len = MaxLen}) -> MaxLen. +%% @doc Return all queued items in a list. +-spec to_list(mqueue()) -> list(). +to_list(MQ) -> + to_list(MQ, []). + +to_list(MQ, Acc) -> + case out(MQ) of + {empty, _MQ} -> + lists:reverse(Acc); + {{value, Msg}, Q1} -> + to_list(Q1, [Msg | Acc]) + end. + %% @doc Return number of dropped messages. -spec dropped(mqueue()) -> count(). dropped(#mqueue{dropped = Dropped}) -> Dropped. diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 8ce8a1802..2e79bcfb1 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -801,7 +801,8 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) -> -spec terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok. terminate(ClientInfo, Reason, Session) -> run_terminate_hooks(ClientInfo, Reason, Session), - redispatch_shared_messages(Session), + Reason =/= takenover andalso + redispatch_shared_messages(Session), ok. 
 run_terminate_hooks(ClientInfo, discarded, Session) ->
@@ -811,29 +812,20 @@ run_terminate_hooks(ClientInfo, takenover, Session) ->
 run_terminate_hooks(ClientInfo, Reason, Session) ->
     run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
 
-redispatch_shared_messages(#session{inflight = Inflight}) ->
-    InflightList = emqx_inflight:to_list(Inflight),
-    lists:foreach(
-        fun
-            %% Only QoS1 messages get redispatched, because QoS2 messages
-            %% must be sent to the same client, once they're in flight
-            ({_, #inflight_data{message = #message{qos = ?QOS_2} = Msg}}) ->
-                ?SLOG(warning, #{msg => qos2_lost_no_redispatch}, #{message => Msg});
-            ({_, #inflight_data{message = #message{topic = Topic, qos = ?QOS_1} = Msg}}) ->
-                case emqx_shared_sub:get_group(Msg) of
-                    {ok, Group} ->
-                        %% Note that dispatch is called with self() in failed subs
-                        %% This is done to avoid dispatching back to caller
-                        Delivery = #delivery{sender = self(), message = Msg},
-                        emqx_shared_sub:dispatch(Group, Topic, Delivery, [self()]);
-                    _ ->
-                        false
-                end;
-            (_) ->
-                ok
-        end,
-        InflightList
-    ).
+redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
+    AllInflights = emqx_inflight:to_list(fun sort_fun/2, Inflight),
+    F = fun
+        ({_PacketId, #inflight_data{message = #message{qos = ?QOS_1} = Msg}}) ->
+            %% For QoS 2, here is what the spec says:
+            %% If the Client's Session terminates before the Client reconnects,
+            %% the Server MUST NOT send the Application Message to any other
+            %% subscribed Client [MQTT-4.8.2-5].
+            {true, Msg};
+        ({_PacketId, #inflight_data{}}) ->
+            false
+    end,
+    InflightList = lists:filtermap(F, AllInflights),
+    emqx_shared_sub:redispatch(InflightList ++ emqx_mqueue:to_list(Q)).
 
 -compile({inline, [run_hook/2]}).
 run_hook(Name, Args) ->
diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl
index 0527cbfe7..0f7e082eb 100644
--- a/apps/emqx/src/emqx_shared_sub.erl
+++ b/apps/emqx/src/emqx_shared_sub.erl
@@ -39,7 +39,8 @@
 -export([
     dispatch/3,
     dispatch/4,
-    do_dispatch_with_ack/4
+    do_dispatch_with_ack/4,
+    redispatch/1
 ]).
 
 -export([
@@ -96,6 +97,9 @@
 -define(ACK, shared_sub_ack).
 -define(NACK(Reason), {shared_sub_nack, Reason}).
 -define(NO_ACK, no_ack).
+-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}).
+
+-type redispatch_to() :: ?REDISPATCH_TO(emqx_topic:group(), emqx_topic:topic()).
 
 -record(state, {pmon}).
 
@@ -144,7 +148,8 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
         false ->
             {error, no_subscribers};
         {Type, SubPid} ->
-            case do_dispatch(SubPid, Group, Topic, Msg, Type) of
+            Msg1 = with_redispatch_to(Msg, Group, Topic),
+            case do_dispatch(SubPid, Group, Topic, Msg1, Type) of
                 ok ->
                     {ok, 1};
                 {error, _Reason} ->
@@ -223,6 +228,50 @@ without_group_ack(Msg) ->
 get_group_ack(Msg) ->
     emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).
 
+with_redispatch_to(#message{qos = ?QOS_0} = Msg, _Group, _Topic) ->
+    Msg;
+with_redispatch_to(Msg, Group, Topic) ->
+    emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg).
+
+%% @hidden Redispatch is needed only for messages with the redispatch_to header added.
+is_redispatch_needed(#message{} = Msg) ->
+    case get_redispatch_to(Msg) of
+        ?REDISPATCH_TO(_, _) ->
+            true;
+        _ ->
+            false
+    end.
+
+%% @doc Redispatch shared deliveries to other members in the group.
+redispatch(Messages0) -> + Messages = lists:filter(fun is_redispatch_needed/1, Messages0), + case length(Messages) of + L when L > 0 -> + ?SLOG(info, #{ + msg => "redispatching_shared_subscription_message", + count => L + }), + lists:foreach(fun redispatch_shared_message/1, Messages); + _ -> + ok + end. + +redispatch_shared_message(#message{} = Msg) -> + %% As long as it's still a #message{} record in inflight, + %% we should try to re-dispatch + ?REDISPATCH_TO(Group, Topic) = get_redispatch_to(Msg), + %% Note that dispatch is called with self() in failed subs + %% This is done to avoid dispatching back to caller + Delivery = #delivery{sender = self(), message = Msg}, + dispatch(Group, Topic, Delivery, [self()]). + +%% @hidden Return the `redispatch_to` group-topic in the message header. +%% `false` is returned if the message is not a shared dispatch. +%% or when it's a QoS 0 message. +-spec get_redispatch_to(emqx_types:message()) -> redispatch_to() | false. +get_redispatch_to(Msg) -> + emqx_message:get_header(redispatch_to, Msg, false). + -spec is_ack_required(emqx_types:message()) -> boolean(). is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg). diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index 8616028ca..5089a3a24 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -25,10 +25,20 @@ -define(SUITE, ?MODULE). --define(wait(For, Timeout), - emqx_common_test_helpers:wait_for( - ?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout - ) +-define(WAIT(TIMEOUT, PATTERN, Res), + (fun() -> + receive + PATTERN -> + Res; + Other -> + ct:fail(#{ + expected => ??PATTERN, + got => Other + }) + after TIMEOUT -> + ct:fail({timeout, ??PATTERN}) + end + end)() ). -define(ack, shared_sub_ack). @@ -45,10 +55,26 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_common_test_helpers:stop_apps([]). -t_is_ack_required(_) -> +init_per_testcase(Case, Config) -> + try + ?MODULE:Case({'init', Config}) + catch + error:function_clause -> + Config + end. + +end_per_testcase(Case, Config) -> + try + ?MODULE:Case({'end', Config}) + catch + error:function_clause -> + ok + end. + +t_is_ack_required(Config) when is_list(Config) -> ?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})). -t_maybe_nack_dropped(_) -> +t_maybe_nack_dropped(Config) when is_list(Config) -> ?assertEqual(false, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})), Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, ?assertEqual(true, emqx_shared_sub:maybe_nack_dropped(Msg)), @@ -60,7 +86,7 @@ t_maybe_nack_dropped(_) -> end ). -t_nack_no_connection(_) -> +t_nack_no_connection(Config) when is_list(Config) -> Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, ?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)), ?assertEqual( @@ -71,7 +97,7 @@ t_nack_no_connection(_) -> end ). -t_maybe_ack(_) -> +t_maybe_ack(Config) when is_list(Config) -> ?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})), Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, ?assertEqual( @@ -86,10 +112,7 @@ t_maybe_ack(_) -> end ). -% t_subscribers(_) -> -% error('TODO'). 
- -t_random_basic(_) -> +t_random_basic(Config) when is_list(Config) -> ok = ensure_config(random), ClientId = <<"ClientId">>, Topic = <<"foo">>, @@ -121,7 +144,7 @@ t_random_basic(_) -> %% After the connection for the 2nd session is also closed, %% i.e. when all clients are offline, the following message(s) %% should be delivered randomly. -t_no_connection_nack(_) -> +t_no_connection_nack(Config) when is_list(Config) -> ok = ensure_config(sticky), Publisher = <<"publisher">>, Subscriber1 = <<"Subscriber1">>, @@ -153,54 +176,22 @@ t_no_connection_nack(_) -> %% This is the connection which was picked by broker to dispatch (sticky) for 1st message ?assertMatch([#{packet_id := 1}], recv_msgs(1)), - %% Now kill the connection, expect all following messages to be delivered to the other - %% subscriber. - %emqx_mock_client:stop(ConnPid), - %% sleep then make synced calls to session processes to ensure that - %% the connection pid's 'EXIT' message is propagated to the session process - %% also to be sure sessions are still alive - % timer:sleep(2), - % _ = emqx_session:info(SPid1), - % _ = emqx_session:info(SPid2), - % %% Now we know what is the other still alive connection - % [TheOtherConnPid] = [SubConnPid1, SubConnPid2] -- [ConnPid], - % %% Send some more messages - % PacketIdList = lists:seq(2, 10), - % lists:foreach(fun(Id) -> - % SendF(Id), - % ?wait(Received(Id, TheOtherConnPid), 1000) - % end, PacketIdList), - % %% Now close the 2nd (last connection) - % emqx_mock_client:stop(TheOtherConnPid), - % timer:sleep(2), - % %% both sessions should have conn_pid = undefined - % ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid1))), - % ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid2))), - % %% send more messages, but all should be queued in session state - % lists:foreach(fun(Id) -> SendF(Id) end, PacketIdList), - % {_, L1} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid1)), - % {_, L2} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid2)), - % ?assertEqual(length(PacketIdList), L1 + L2), - % %% clean up - % emqx_mock_client:close_session(PubConnPid), - % emqx_sm:close_session(SPid1), - % emqx_sm:close_session(SPid2), ok. -t_random(_) -> +t_random(Config) when is_list(Config) -> ok = ensure_config(random, true), test_two_messages(random). -t_round_robin(_) -> +t_round_robin(Config) when is_list(Config) -> ok = ensure_config(round_robin, true), test_two_messages(round_robin). -t_round_robin_per_group(_) -> +t_round_robin_per_group(Config) when is_list(Config) -> ok = ensure_config(round_robin_per_group, true), test_two_messages(round_robin_per_group). %% this would fail if executed with the standard round_robin strategy -t_round_robin_per_group_even_distribution_one_group(_) -> +t_round_robin_per_group_even_distribution_one_group(Config) when is_list(Config) -> ok = ensure_config(round_robin_per_group, true), Topic = <<"foo/bar">>, Group = <<"group1">>, @@ -264,7 +255,7 @@ t_round_robin_per_group_even_distribution_one_group(_) -> ), ok. -t_round_robin_per_group_even_distribution_two_groups(_) -> +t_round_robin_per_group_even_distribution_two_groups(Config) when is_list(Config) -> ok = ensure_config(round_robin_per_group, true), Topic = <<"foo/bar">>, {ok, ConnPid1} = emqtt:start_link([{clientid, <<"C0">>}]), @@ -350,19 +341,19 @@ t_round_robin_per_group_even_distribution_two_groups(_) -> ), ok. -t_sticky(_) -> +t_sticky(Config) when is_list(Config) -> ok = ensure_config(sticky, true), test_two_messages(sticky). 
-t_hash(_) -> +t_hash(Config) when is_list(Config) -> ok = ensure_config(hash, false), test_two_messages(hash). -t_hash_clinetid(_) -> +t_hash_clinetid(Config) when is_list(Config) -> ok = ensure_config(hash_clientid, false), test_two_messages(hash_clientid). -t_hash_topic(_) -> +t_hash_topic(Config) when is_list(Config) -> ok = ensure_config(hash_topic, false), ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, @@ -407,7 +398,7 @@ t_hash_topic(_) -> ok. %% if the original subscriber dies, change to another one alive -t_not_so_sticky(_) -> +t_not_so_sticky(Config) when is_list(Config) -> ok = ensure_config(sticky), ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, @@ -481,7 +472,7 @@ last_message(ExpectedPayload, Pids, Timeout) -> <<"not yet?">> end. -t_dispatch(_) -> +t_dispatch(Config) when is_list(Config) -> ok = ensure_config(random), Topic = <<"foo">>, ?assertEqual( @@ -494,13 +485,13 @@ t_dispatch(_) -> emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}}) ). -t_uncovered_func(_) -> +t_uncovered_func(Config) when is_list(Config) -> ignored = gen_server:call(emqx_shared_sub, ignored), ok = gen_server:cast(emqx_shared_sub, ignored), ignored = emqx_shared_sub ! ignored, {mnesia_table_event, []} = emqx_shared_sub ! {mnesia_table_event, []}. -t_per_group_config(_) -> +t_per_group_config(Config) when is_list(Config) -> ok = ensure_group_config(#{ <<"local_group">> => local, <<"round_robin_group">> => round_robin, @@ -521,7 +512,7 @@ t_per_group_config(_) -> test_two_messages(round_robin_per_group, <<"round_robin_per_group_group">>), test_two_messages(round_robin_per_group, <<"round_robin_per_group_group">>). -t_local(_) -> +t_local(Config) when is_list(Config) -> GroupConfig = #{ <<"local_group">> => local, <<"round_robin_group">> => round_robin, @@ -567,7 +558,7 @@ t_local(_) -> ?assertNotEqual(UsedSubPid1, UsedSubPid2), ok. -t_remote(_) -> +t_remote(Config) when is_list(Config) -> %% This testcase verifies dispatching of shared messages to the remote nodes via backplane API. %% %% In this testcase we start two EMQX nodes: local and remote. @@ -620,7 +611,7 @@ t_remote(_) -> stop_slave(Node) end. -t_local_fallback(_) -> +t_local_fallback(Config) when is_list(Config) -> ok = ensure_group_config(#{ <<"local_group">> => local, <<"round_robin_group">> => round_robin, @@ -653,9 +644,14 @@ t_local_fallback(_) -> %% This one tests that broker tries to select another shared subscriber %% If the first one doesn't return an ACK -t_redispatch(_) -> - ok = ensure_config(sticky, true), +t_redispatch_qos1_with_ack(Config) when is_list(Config) -> + test_redispatch_qos1(Config, true). +t_redispatch_qos1_no_ack(Config) when is_list(Config) -> + test_redispatch_qos1(Config, false). + +test_redispatch_qos1(_Config, AckEnabled) -> + ok = ensure_config(sticky, AckEnabled), Group = <<"group1">>, Topic = <<"foo/bar">>, ClientId1 = <<"ClientId1">>, @@ -682,10 +678,135 @@ t_redispatch(_) -> emqtt:stop(UsedSubPid2), ok. 
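(Not part of the patch.) For orientation, here is a condensed emqtt-level sketch of the scenario the redispatch tests above exercise: two subscribers share one group, the member holding an unacknowledged QoS 1 delivery is terminated (not taken over), and the broker is expected to re-dispatch that message to the surviving member. The client IDs, group and topic below are illustrative only.

    %% illustrative sketch only; names, group and topic are made up
    {ok, Pub} = emqtt:start_link([{clientid, <<"pub">>}]),
    {ok, _} = emqtt:connect(Pub),
    {ok, Sub1} = emqtt:start_link([{clientid, <<"sub1">>}, {auto_ack, false}]),
    {ok, Sub2} = emqtt:start_link([{clientid, <<"sub2">>}, {auto_ack, true}]),
    {ok, _} = emqtt:connect(Sub1),
    {ok, _} = emqtt:connect(Sub2),
    emqtt:subscribe(Sub1, {<<"$share/group1/foo/bar">>, 1}),
    emqtt:subscribe(Sub2, {<<"$share/group1/foo/bar">>, 1}),
    %% publish one QoS 1 message; suppose the broker picks Sub1, which never acks
    {ok, _} = emqtt:publish(Pub, <<"foo/bar">>, <<"hello">>, 1),
    %% killing Sub1 (a termination, not a takeover) should now make the broker
    %% re-dispatch the pending message to Sub2
    _ = unlink(Sub1),
    exit(Sub1, kill).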
+%% No ack, QoS 2 subscriptions,
+%% client1 receives one message, sends PUBREC, then is suspended
+%% client2 acts normal (auto_ack=true)
+%% Expected behaviour:
+%% the messages sent to client1's inflight and mq are re-dispatched after client1 is down
+t_dispatch_qos2({init, Config}) when is_list(Config) ->
+    emqx_config:put_zone_conf(default, [mqtt, max_inflight], 1),
+    Config;
+t_dispatch_qos2({'end', Config}) when is_list(Config) ->
+    emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0);
+t_dispatch_qos2(Config) when is_list(Config) ->
+    ok = ensure_config(round_robin, _AckEnabled = false),
+    Topic = <<"foo/bar/1">>,
+    ClientId1 = <<"ClientId1">>,
+    ClientId2 = <<"ClientId2">>,
+
+    {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
+    {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
+    {ok, _} = emqtt:connect(ConnPid1),
+    {ok, _} = emqtt:connect(ConnPid2),
+
+    emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 2}),
+    emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 2}),
+
+    Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
+    Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
+    Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
+    Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
+    ct:sleep(100),
+
+    ok = sys:suspend(ConnPid1),
+
+    %% One message is inflight
+    ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)),
+    ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
+    ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
+    ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),
+
+    MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
+    MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
+    %% assert hello2 > hello1 or hello4 > hello3
+    ?assert(MsgRec2 > MsgRec1),
+
+    sys:resume(ConnPid1),
+    %% The emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false
+    %% it will never send PUBCOMP, hence EMQX should not attempt to send
+    %% the 4th message yet, since max_inflight is 1.
+    MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3),
+    ct:sleep(100),
+    %% no message expected
+    ?assertEqual([], collect_msgs(0)),
+    %% now kill client 1
+    kill_process(ConnPid1),
+    %% client 2 should receive the message
+    MsgRec4 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P4}}, P4),
+    %% assert hello2 > hello1 or hello4 > hello3
+    ?assert(MsgRec4 > MsgRec3),
+    emqtt:stop(ConnPid2),
+    ok.
+ +t_dispatch_qos0({init, Config}) when is_list(Config) -> + Config; +t_dispatch_qos0({'end', Config}) when is_list(Config) -> + ok; +t_dispatch_qos0(Config) when is_list(Config) -> + ok = ensure_config(round_robin, _AckEnabled = false), + Topic = <<"foo/bar/1">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + %% subscribe with QoS 0 + emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 0}), + emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 0}), + + %% publish with QoS 2, but should be downgraded to 0 as the subscribers + %% subscribe with QoS 0 + Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>), + Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>), + Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>), + Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>), + ct:sleep(100), + + ok = sys:suspend(ConnPid1), + + ?assertMatch([_], emqx:publish(Message1)), + ?assertMatch([_], emqx:publish(Message2)), + ?assertMatch([_], emqx:publish(Message3)), + ?assertMatch([_], emqx:publish(Message4)), + + MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1), + MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2), + %% assert hello2 > hello1 or hello4 > hello3 + ?assert(MsgRec2 > MsgRec1), + + kill_process(ConnPid1), + %% expect no redispatch + ?assertEqual([], collect_msgs(timer:seconds(2))), + emqtt:stop(ConnPid2), + ok. + %%-------------------------------------------------------------------- %% help functions %%-------------------------------------------------------------------- +kill_process(Pid) -> + _ = unlink(Pid), + _ = monitor(process, Pid), + erlang:exit(Pid, kill), + receive + {'DOWN', _, process, Pid, _} -> + ok + end. + +collect_msgs(Timeout) -> + collect_msgs([], Timeout). + +collect_msgs(Acc, Timeout) -> + receive + Msg -> + collect_msgs([Msg | Acc], Timeout) + after Timeout -> + lists:reverse(Acc) + end. + ensure_config(Strategy) -> ensure_config(Strategy, _AckEnabled = true). From 1c29e2806a941a2240443511f01f2fe2c56a8e61 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 5 Oct 2022 22:23:07 +0200 Subject: [PATCH 2/2] test(shared): add a test case to ensure shared dispatch retry to ensure retry will not enter a dead loop --- CHANGES-5.0.md | 2 +- apps/emqx/src/emqx_channel.erl | 14 ++++++++-- apps/emqx/src/emqx_session.erl | 6 ++++- apps/emqx/src/emqx_shared_sub.erl | 10 +------ apps/emqx/test/emqx_shared_sub_SUITE.erl | 34 ++++++++++++++++++++++++ 5 files changed, 53 insertions(+), 13 deletions(-) diff --git a/CHANGES-5.0.md b/CHANGES-5.0.md index af8aaecf1..1a08e41e8 100644 --- a/CHANGES-5.0.md +++ b/CHANGES-5.0.md @@ -10,7 +10,7 @@ * Fix GET /listeners API crash When some nodes still in initial configuration. [#9002](https://github.com/emqx/emqx/pull/9002) * Fix empty variable interpolation in authentication and authorization. Placeholders for undefined variables are rendered now as empty strings and do not cause errors anymore. [#8963](https://github.com/emqx/emqx/pull/8963) * Fix the latency statistics error of the slow subscription module when `stats_type` is `internal` or `response`. [#8986](https://github.com/emqx/emqx/pull/8986) -* Redispatch shared subscription messages. 
+* Redispatch shared subscription messages. [#9104](https://github.com/emqx/emqx/pull/9104) # 5.0.8 diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index d1a111dc5..742868694 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -997,8 +997,13 @@ maybe_nack(Delivers) -> lists:filter(fun not_nacked/1, Delivers). not_nacked({deliver, _Topic, Msg}) -> - not (emqx_shared_sub:is_ack_required(Msg) andalso - (ok == emqx_shared_sub:nack_no_connection(Msg))). + case emqx_shared_sub:is_ack_required(Msg) of + true -> + ok = emqx_shared_sub:nack_no_connection(Msg), + false; + false -> + true + end. maybe_mark_as_delivered(Session, Delivers) -> case emqx_session:info(is_persistent, Session) of @@ -1222,6 +1227,8 @@ handle_call( ChanInfo1 = info(NChannel), emqx_cm:set_chan_info(ClientId, ChanInfo1#{sockinfo => SockInfo}), reply(ok, reset_timer(alive_timer, NChannel)); +handle_call(get_mqueue, Channel) -> + reply({ok, get_mqueue(Channel)}, Channel); handle_call(Req, Channel) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), reply(ignored, Channel). @@ -2224,3 +2231,6 @@ get_mqtt_conf(Zone, Key, Default) -> set_field(Name, Value, Channel) -> Pos = emqx_misc:index_of(Name, record_info(fields, channel)), setelement(Pos + 1, Channel, Value). + +get_mqueue(#channel{session = Session}) -> + emqx_session:get_mqueue(Session). diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 2e79bcfb1..b285d0a88 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -60,7 +60,8 @@ info/2, is_session/1, stats/1, - obtain_next_pkt_id/1 + obtain_next_pkt_id/1, + get_mqueue/1 ]). -export([ @@ -917,3 +918,6 @@ age(Now, Ts) -> Now - Ts. set_field(Name, Value, Session) -> Pos = emqx_misc:index_of(Name, record_info(fields, session)), setelement(Pos + 1, Session, Value). + +get_mqueue(#session{mqueue = Q}) -> + emqx_mqueue:to_list(Q). diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 0f7e082eb..975b403b9 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -47,8 +47,7 @@ maybe_ack/1, maybe_nack_dropped/1, nack_no_connection/1, - is_ack_required/1, - get_group/1 + is_ack_required/1 ]). %% for testing @@ -275,13 +274,6 @@ get_redispatch_to(Msg) -> -spec is_ack_required(emqx_types:message()) -> boolean(). is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg). --spec get_group(emqx_types:message()) -> {ok, any()} | error. -get_group(Msg) -> - case get_group_ack(Msg) of - ?NO_ACK -> error; - {Group, _Sender, _Ref} -> {ok, Group} - end. - %% @doc Negative ack dropped message due to inflight window or message queue being full. -spec maybe_nack_dropped(emqx_types:message()) -> boolean(). maybe_nack_dropped(Msg) -> diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index 5089a3a24..291286aa2 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -678,6 +678,40 @@ test_redispatch_qos1(_Config, AckEnabled) -> emqtt:stop(UsedSubPid2), ok. 
+t_qos1_random_dispatch_if_all_members_are_down(Config) when is_list(Config) -> + ok = ensure_config(sticky, true), + Group = <<"group1">>, + Topic = <<"foo/bar">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + SubOpts = [{clean_start, false}], + {ok, ConnPub} = emqtt:start_link([{clientid, <<"pub">>}]), + {ok, _} = emqtt:connect(ConnPub), + + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1} | SubOpts]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2} | SubOpts]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar">>, 1}), + emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar">>, 1}), + + ok = emqtt:stop(ConnPid1), + ok = emqtt:stop(ConnPid2), + + [Pid1, Pid2] = emqx_shared_sub:subscribers(Group, Topic), + ?assert(is_process_alive(Pid1)), + ?assert(is_process_alive(Pid2)), + + {ok, _} = emqtt:publish(ConnPub, Topic, <<"hello11">>, 1), + ct:sleep(100), + {ok, Msgs1} = gen_server:call(Pid1, get_mqueue), + {ok, Msgs2} = gen_server:call(Pid2, get_mqueue), + %% assert the message is in mqueue (because socket is closed) + ?assertMatch([#message{payload = <<"hello11">>}], Msgs1 ++ Msgs2), + emqtt:stop(ConnPub), + ok. + %% No ack, QoS 2 subscriptions, %% client1 receives one message, send pubrec, then suspend %% client2 acts normal (auto_ack=true)
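(Not part of the patch.) As a rough usage note: the get_mqueue debug call added in the second patch can be combined with emqx_shared_sub:subscribers/2, as the test above does, to inspect from a broker console what each member of a share group currently has queued. The group and topic values below are placeholders.

    %% hypothetical console sketch; assumes a broker running the code in this patch
    Group = <<"group1">>,
    Topic = <<"foo/bar">>,
    Members = emqx_shared_sub:subscribers(Group, Topic),
    %% each channel process replies {ok, Messages} with its current mqueue contents
    [{Pid, gen_server:call(Pid, get_mqueue)} || Pid <- Members].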