From 67718ef84f55b29c9c858954c39f58b314f60b4a Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 19 Dec 2022 17:07:52 +0100 Subject: [PATCH 1/6] docs: prepare to deprecate broker.shared_dispatch_ack_enabled --- apps/emqx/i18n/emqx_schema_i18n.conf | 6 ++++-- apps/emqx/src/emqx_schema.erl | 4 ++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/apps/emqx/i18n/emqx_schema_i18n.conf b/apps/emqx/i18n/emqx_schema_i18n.conf index e7c4890f7..750c0c2cd 100644 --- a/apps/emqx/i18n/emqx_schema_i18n.conf +++ b/apps/emqx/i18n/emqx_schema_i18n.conf @@ -1078,11 +1078,13 @@ Supported configurations are the following: broker_shared_dispatch_ack_enabled { desc { - en: """Enable/disable shared dispatch acknowledgement for QoS 1 and QoS 2 messages. + en: """Deprecated, will be removed in 5.1. +Enable/disable shared dispatch acknowledgement for QoS 1 and QoS 2 messages. This should allow messages to be dispatched to a different subscriber in the group in case the picked (based on `shared_subscription_strategy`) subscriber is offline. """ - zh: """启用/禁用 QoS 1 和 QoS 2 消息的共享派发确认。 + zh: """该配置项已废弃,会在 5.1 中移除。 +启用/禁用 QoS 1 和 QoS 2 消息的共享派发确认。 开启后,允许将消息从未及时回复 ACK 的订阅者 (例如,客户端离线)重新派发给另外一个订阅者。 """ } diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 171b6dc42..b1afcac7f 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1198,6 +1198,10 @@ fields("broker") -> sc( boolean(), #{ + %% TODO: deprecated => {since, "5.1.0"} + %% in favor of session message re-dispatch at termination + %% we will stop supporting dispatch acks for shared + %% subscriptions. default => false, desc => ?DESC(broker_shared_dispatch_ack_enabled) } From 29f394aa70b3a0965c38c1754d9b16965591f4bc Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 19 Dec 2022 19:01:13 +0100 Subject: [PATCH 2/6] fix(shared): handle unsubscribe for sticky strategy prior to this change, the message is dispatched to a shared subscriber even after unsubscribed --- apps/emqx/src/emqx_shared_sub.erl | 29 +++++++++++----- apps/emqx/test/emqx_session_SUITE.erl | 2 +- apps/emqx/test/emqx_shared_sub_SUITE.erl | 33 +++++++++++++++++++ .../emqx_connector/src/emqx_connector_jwt.erl | 2 +- .../src/emqx_connector_jwt_sup.erl | 2 +- 5 files changed, 56 insertions(+), 12 deletions(-) diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 975b403b9..5b8985fdb 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -139,7 +139,7 @@ record(Group, Topic, SubPid) -> -spec dispatch(emqx_types:group(), emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result(). dispatch(Group, Topic, Delivery) -> - dispatch(Group, Topic, Delivery, _FailedSubs = []). + dispatch(Group, Topic, Delivery, _FailedSubs = #{}). dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> #message{from = ClientId, topic = SourceTopic} = Msg, @@ -151,9 +151,9 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> case do_dispatch(SubPid, Group, Topic, Msg1, Type) of ok -> {ok, 1}; - {error, _Reason} -> + {error, Reason} -> %% Failed to dispatch to this sub, try next. - dispatch(Group, Topic, Delivery, [SubPid | FailedSubs]) + dispatch(Group, Topic, Delivery, FailedSubs#{SubPid => Reason}) end end. @@ -262,7 +262,8 @@ redispatch_shared_message(#message{} = Msg) -> %% Note that dispatch is called with self() in failed subs %% This is done to avoid dispatching back to caller Delivery = #delivery{sender = self(), message = Msg}, - dispatch(Group, Topic, Delivery, [self()]). + FailedSubs = #{self() => sender}, + dispatch(Group, Topic, Delivery, FailedSubs). %% @hidden Return the `redispatch_to` group-topic in the message header. %% `false` is returned if the message is not a shared dispatch. @@ -307,14 +308,22 @@ maybe_ack(Msg) -> pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> Sub0 = erlang:get({shared_sub_sticky, Group, Topic}), - case is_active_sub(Sub0, FailedSubs) of + All = subscribers(Group, Topic), + case is_active_sub(Sub0, FailedSubs, All) of true -> %% the old subscriber is still alive %% keep using it for sticky strategy {fresh, Sub0}; false -> %% randomly pick one for the first message - {Type, Sub} = do_pick(random, ClientId, SourceTopic, Group, Topic, [Sub0 | FailedSubs]), + {Type, Sub} = do_pick( + random, + ClientId, + SourceTopic, + Group, + Topic, + FailedSubs#{Sub0 => noproc} + ), %% stick to whatever pick result erlang:put({shared_sub_sticky, Group, Topic}, Sub), {Type, Sub} @@ -324,7 +333,7 @@ pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> All = subscribers(Group, Topic), - case All -- FailedSubs of + case lists:filter(fun(Sub) -> not maps:is_key(Sub, FailedSubs) end, All) of [] when All =:= [] -> %% Genuinely no subscriber false; @@ -512,8 +521,10 @@ update_stats(State) -> State. %% Return 'true' if the subscriber process is alive AND not in the failed list -is_active_sub(Pid, FailedSubs) -> - is_alive_sub(Pid) andalso not lists:member(Pid, FailedSubs). +is_active_sub(Pid, FailedSubs, All) -> + lists:member(Pid, All) andalso + is_alive_sub(Pid) andalso + (not maps:is_key(Pid, FailedSubs)). %% erlang:is_process_alive/1 does not work with remote pid. is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) -> diff --git a/apps/emqx/test/emqx_session_SUITE.erl b/apps/emqx/test/emqx_session_SUITE.erl index c171382c7..91bf4659b 100644 --- a/apps/emqx/test/emqx_session_SUITE.erl +++ b/apps/emqx/test/emqx_session_SUITE.erl @@ -190,7 +190,7 @@ t_publish_qos2_with_error_return(_) -> begin Msg2 = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload2">>), - {ok, [], Session1} = emqx_session:publish(clientinfo(), PacketId2 = 2, Msg2, Session), + {ok, [], Session1} = emqx_session:publish(clientinfo(), _PacketId2 = 2, Msg2, Session), ?assertEqual(2, emqx_session:info(awaiting_rel_cnt, Session1)), {error, RC2 = ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish( clientinfo(), _PacketId3 = 3, Msg2, Session1 diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index a428a6fa9..1beca6d5a 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -344,6 +344,39 @@ t_sticky(Config) when is_list(Config) -> ok = ensure_config(sticky, true), test_two_messages(sticky). +%% two subscribers in one shared group +%% one unsubscribe after receiving a message +%% the other one in the group should receive the next message +t_sticky_unsubscribe(Config) when is_list(Config) -> + ok = ensure_config(sticky, false), + Topic = <<"foo/bar/sticky-unsub">>, + ClientId1 = <<"c1-sticky-unsub">>, + ClientId2 = <<"c2-sticky-unsub">>, + Group = <<"gsu">>, + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + ShareTopic = <<"$share/", Group/binary, "/", Topic/binary>>, + emqtt:subscribe(ConnPid1, {ShareTopic, 0}), + emqtt:subscribe(ConnPid2, {ShareTopic, 0}), + + Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>), + Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>), + ct:sleep(100), + + emqx:publish(Message1), + {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]), + emqtt:unsubscribe(UsedSubPid1, ShareTopic), + emqx:publish(Message2), + {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1, ConnPid2]), + ?assertNotEqual(UsedSubPid1, UsedSubPid2), + + kill_process(ConnPid1, fun(_) -> emqtt:stop(ConnPid1) end), + kill_process(ConnPid2, fun(_) -> emqtt:stop(ConnPid2) end), + ok. + t_hash(Config) when is_list(Config) -> ok = ensure_config(hash, false), test_two_messages(hash). diff --git a/apps/emqx_connector/src/emqx_connector_jwt.erl b/apps/emqx_connector/src/emqx_connector_jwt.erl index 3dc39c675..c5cd54cb9 100644 --- a/apps/emqx_connector/src/emqx_connector_jwt.erl +++ b/apps/emqx_connector/src/emqx_connector_jwt.erl @@ -16,7 +16,7 @@ -module(emqx_connector_jwt). --include("emqx_connector_tables.hrl"). +-include_lib("emqx_connector/include/emqx_connector_tables.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). %% API diff --git a/apps/emqx_connector/src/emqx_connector_jwt_sup.erl b/apps/emqx_connector/src/emqx_connector_jwt_sup.erl index 611e152bd..ac1d22b71 100644 --- a/apps/emqx_connector/src/emqx_connector_jwt_sup.erl +++ b/apps/emqx_connector/src/emqx_connector_jwt_sup.erl @@ -18,7 +18,7 @@ -behaviour(supervisor). --include("emqx_connector_tables.hrl"). +-include_lib("emqx_connector/include/emqx_connector_tables.hrl"). -export([ start_link/0, From 9d1e41bdb14a699e98c107fc37e163cc0b543dc6 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 19 Dec 2022 19:07:12 +0100 Subject: [PATCH 3/6] docs: update change logs --- changes/v5.0.13-en.md | 3 +++ changes/v5.0.13-zh.md | 3 +++ 2 files changed, 6 insertions(+) diff --git a/changes/v5.0.13-en.md b/changes/v5.0.13-en.md index 1691e3b81..acd210e71 100644 --- a/changes/v5.0.13-en.md +++ b/changes/v5.0.13-en.md @@ -17,3 +17,6 @@ ## Bug fixes - Trigger `message.dropped` hook when QoS2 message is resend by client with a same packet id, or 'awaiting_rel' queue is full [#9487](https://github.com/emqx/emqx/pull/9487). + +- Fix shared subscription 'sticky' strategy [#9578](https://github.com/emqx/emqx/pull/9578). + Prior to this change, a 'sticky' subscriber may continue to receive messages after unsubscribing. diff --git a/changes/v5.0.13-zh.md b/changes/v5.0.13-zh.md index 3f5d5b3a0..9b74dc5f0 100644 --- a/changes/v5.0.13-zh.md +++ b/changes/v5.0.13-zh.md @@ -17,3 +17,6 @@ ## 修复 - 当 QoS2 消息被重发(使用相同 Packet ID),或当 'awaiting_rel' 队列已满时,触发消息丢弃钩子(`message.dropped`)及计数器 [#9487](https://github.com/emqx/emqx/pull/9487)。 + +- 修复共享订阅的 'sticky' 策略 [#9578](https://github.com/emqx/emqx/pull/9578)。 + 在此修复前,使用 'sticky' 策略的订阅客户端可能在取消订阅后继续收到消息。 From 4f1fb0b6290df3be1817f71efc185ff124b583a8 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 19 Dec 2022 20:04:33 +0100 Subject: [PATCH 4/6] refactor(shared_sub): improve sticky strategy performance * avoid calling ets:select twice for sticky strategy * when the calling process is terminating, no loop-back dispatch --- apps/emqx/src/emqx_shared_sub.erl | 52 ++++++++++++++++++------------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 5b8985fdb..051ba7b11 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -97,6 +97,7 @@ -define(NACK(Reason), {shared_sub_nack, Reason}). -define(NO_ACK, no_ack). -define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}). +-define(SUBSCRIBER_DOWN, noproc). -type redispatch_to() :: ?REDISPATCH_TO(emqx_topic:group(), emqx_topic:topic()). @@ -262,7 +263,8 @@ redispatch_shared_message(#message{} = Msg) -> %% Note that dispatch is called with self() in failed subs %% This is done to avoid dispatching back to caller Delivery = #delivery{sender = self(), message = Msg}, - FailedSubs = #{self() => sender}, + %% Self is terminating, it makes no sense to loop-back the dispatch + FailedSubs = #{self() => ?SUBSCRIBER_DOWN}, dispatch(Group, Topic, Delivery, FailedSubs). %% @hidden Return the `redispatch_to` group-topic in the message header. @@ -308,7 +310,7 @@ maybe_ack(Msg) -> pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> Sub0 = erlang:get({shared_sub_sticky, Group, Topic}), - All = subscribers(Group, Topic), + All = subscribers(Group, Topic, FailedSubs), case is_active_sub(Sub0, FailedSubs, All) of true -> %% the old subscriber is still alive @@ -316,27 +318,25 @@ pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> {fresh, Sub0}; false -> %% randomly pick one for the first message - {Type, Sub} = do_pick( - random, - ClientId, - SourceTopic, - Group, - Topic, - FailedSubs#{Sub0 => noproc} - ), - %% stick to whatever pick result - erlang:put({shared_sub_sticky, Group, Topic}, Sub), - {Type, Sub} + FailedSubs1 = FailedSubs#{Sub0 => ?SUBSCRIBER_DOWN}, + Res = do_pick(All, random, ClientId, SourceTopic, Group, Topic, FailedSubs1), + case Res of + {_, Sub} -> + %% stick to whatever pick result + erlang:put({shared_sub_sticky, Group, Topic}, Sub); + _ -> + ok + end, + Res end; pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> - do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs). + All = subscribers(Group, Topic, FailedSubs), + do_pick(All, Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs). -do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> - All = subscribers(Group, Topic), +do_pick([], _Strategy, _ClientId, _SourceTopic, _Group, _Topic, _FailedSubs) -> + false; +do_pick(All, Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> case lists:filter(fun(Sub) -> not maps:is_key(Sub, FailedSubs) end, All) of - [] when All =:= [] -> - %% Genuinely no subscriber - false; [] -> %% All offline? pick one anyway {retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, All)}; @@ -383,6 +383,16 @@ do_pick_subscriber(Group, Topic, round_robin_per_group, _ClientId, _SourceTopic, {Group, Topic}, 0 }). +%% Select ETS table to get all subscriber pids which are not down. +subscribers(Group, Topic, FailedSubs) -> + lists:filter( + fun(P) -> + ?SUBSCRIBER_DOWN =/= maps:get(P, FailedSubs, false) + end, + subscribers(Group, Topic) + ). + +%% Select ETS table to get all subscriber pids. subscribers(Group, Topic) -> ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]). @@ -523,8 +533,8 @@ update_stats(State) -> %% Return 'true' if the subscriber process is alive AND not in the failed list is_active_sub(Pid, FailedSubs, All) -> lists:member(Pid, All) andalso - is_alive_sub(Pid) andalso - (not maps:is_key(Pid, FailedSubs)). + (not maps:is_key(Pid, FailedSubs)) andalso + is_alive_sub(Pid). %% erlang:is_process_alive/1 does not work with remote pid. is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) -> From ad3a7939102b6822d2f324ce694916034ef1725b Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 19 Dec 2022 20:12:38 +0100 Subject: [PATCH 5/6] fix(shared_sub): insert alive pid table at mnesia table event --- apps/emqx/src/emqx_shared_sub.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 051ba7b11..556347f89 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -446,6 +446,7 @@ handle_info( {mnesia_table_event, {write, #emqx_shared_subscription{subpid = SubPid}, _}}, State = #state{pmon = PMon} ) -> + ok = maybe_insert_alive_tab(SubPid), {noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; %% The subscriber may have subscribed multiple topics, so we need to keep monitoring the PID until %% it `unsubscribed` the last topic. From e932569f342da4b64cb51dc6ae27db8ac3df5bfd Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 19 Dec 2022 20:19:32 +0100 Subject: [PATCH 6/6] refactor: delete stale code 'hash' strategy has been removed from config schema hence no need to keep the compatibility code --- apps/emqx/src/emqx_shared_sub.erl | 5 ----- apps/emqx/test/emqx_shared_sub_SUITE.erl | 6 +++--- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 556347f89..e20297703 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -81,8 +81,6 @@ | round_robin_per_group | sticky | local - %% same as hash_clientid, backward compatible - | hash | hash_clientid | hash_topic. @@ -360,9 +358,6 @@ pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Subs) -> do_pick_subscriber(_Group, _Topic, random, _ClientId, _SourceTopic, Count) -> rand:uniform(Count); -do_pick_subscriber(Group, Topic, hash, ClientId, SourceTopic, Count) -> - %% backward compatible - do_pick_subscriber(Group, Topic, hash_clientid, ClientId, SourceTopic, Count); do_pick_subscriber(_Group, _Topic, hash_clientid, ClientId, _SourceTopic, Count) -> 1 + erlang:phash2(ClientId) rem Count; do_pick_subscriber(_Group, _Topic, hash_topic, _ClientId, SourceTopic, Count) -> diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index 1beca6d5a..3455c41f9 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -378,8 +378,8 @@ t_sticky_unsubscribe(Config) when is_list(Config) -> ok. t_hash(Config) when is_list(Config) -> - ok = ensure_config(hash, false), - test_two_messages(hash). + ok = ensure_config(hash_clientid, false), + test_two_messages(hash_clientid). t_hash_clinetid(Config) when is_list(Config) -> ok = ensure_config(hash_clientid, false), @@ -486,7 +486,7 @@ test_two_messages(Strategy, Group) -> sticky -> ?assertEqual(UsedSubPid1, UsedSubPid2); round_robin -> ?assertNotEqual(UsedSubPid1, UsedSubPid2); round_robin_per_group -> ?assertNotEqual(UsedSubPid1, UsedSubPid2); - hash -> ?assertEqual(UsedSubPid1, UsedSubPid2); + hash_clientid -> ?assertEqual(UsedSubPid1, UsedSubPid2); _ -> ok end, ok.