From 7b7f44b9ac397ed64dca407ed29b1c3ca900b4fd Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 26 Jun 2024 11:26:01 -0300 Subject: [PATCH 1/4] fix(client mgmt api): make bulk subscribe work again in clusters Fixes https://emqx.atlassian.net/browse/EMQX-12337 --- .../src/emqx_management.app.src | 2 +- .../src/emqx_mgmt_api_clients.erl | 18 +++- .../test/emqx_mgmt_api_clients_SUITE.erl | 97 ++++++++++++++++++- changes/ce/fix-13344.en.md | 1 + 4 files changed, 110 insertions(+), 8 deletions(-) create mode 100644 changes/ce/fix-13344.en.md diff --git a/apps/emqx_management/src/emqx_management.app.src b/apps/emqx_management/src/emqx_management.app.src index aeee7cad7..c22793cf0 100644 --- a/apps/emqx_management/src/emqx_management.app.src +++ b/apps/emqx_management/src/emqx_management.app.src @@ -2,7 +2,7 @@ {application, emqx_management, [ {description, "EMQX Management API and CLI"}, % strict semver, bump manually! - {vsn, "5.2.2"}, + {vsn, "5.2.3"}, {modules, []}, {registered, [emqx_management_sup]}, {applications, [ diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 547324925..810ccf08f 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -1224,10 +1224,20 @@ subscribe(#{clientid := ClientID, topic := Topic} = Sub) -> end. subscribe_batch(#{clientid := ClientID, topics := Topics}) -> - %% We use emqx_channel instead of emqx_channel_info (used by the emqx_mgmt:lookup_client/2), - %% as the emqx_channel_info table will only be populated after the hook `client.connected` - %% has returned. So if one want to subscribe topics in this hook, it will fail. 
- case ets:lookup(?CHAN_TAB, ClientID) of + %% On the one hand, we first try to use `emqx_channel' instead of `emqx_channel_info' + %% (used by the `emqx_mgmt:lookup_client/2'), as the `emqx_channel_info' table will + %% only be populated after the hook `client.connected' has returned. So if one wants to + %% subscribe to topics in this hook, it will fail. + %% ... On the other hand, using only `emqx_channel' would render this API unusable if + %% called from a node that doesn't hold the targeted client connection, so we + %% fall back to `emqx_mgmt:lookup_client/2', which consults the global registry. + Result1 = ets:lookup(?CHAN_TAB, ClientID), + Result = + case Result1 of + [] -> emqx_mgmt:lookup_client({clientid, ClientID}, _FormatFn = undefined); + _ -> Result1 + end, + case Result of [] -> {404, ?CLIENTID_NOT_FOUND}; _ -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 9557c3214..b9fcdfe72 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -29,14 +29,18 @@ all() -> AllTCs = emqx_common_test_helpers:all(?MODULE), [ {group, persistent_sessions}, + {group, non_persistent_cluster}, {group, msgs_base64_encoding}, {group, msgs_plain_encoding} - | AllTCs -- (persistent_session_testcases() ++ client_msgs_testcases()) + | AllTCs -- + (persistent_session_testcases() ++ + non_persistent_cluster_testcases() ++ client_msgs_testcases()) ]. groups() -> [ {persistent_sessions, persistent_session_testcases()}, + {non_persistent_cluster, non_persistent_cluster_testcases()}, {msgs_base64_encoding, client_msgs_testcases()}, {msgs_plain_encoding, client_msgs_testcases()} ]. @@ -52,6 +56,10 @@ persistent_session_testcases() -> t_persistent_sessions_subscriptions1, t_list_clients_v2 ]. +non_persistent_cluster_testcases() -> + [ + t_bulk_subscribe + ]. 
client_msgs_testcases() -> [ t_inflight_messages, @@ -96,6 +104,24 @@ init_per_group(persistent_sessions, Config) -> #{work_dir => emqx_cth_suite:work_dir(Config)} ), [{nodes, Nodes} | Config]; +init_per_group(non_persistent_cluster, Config) -> + AppSpecs = [ + emqx, + emqx_conf, + emqx_management + ], + Dashboard = emqx_mgmt_api_test_util:emqx_dashboard( + "dashboard.listeners.http { enable = true, bind = 18084 }" + ), + Cluster = [ + {mgmt_api_clients_SUITE1, #{role => core, apps => AppSpecs ++ [Dashboard]}}, + {mgmt_api_clients_SUITE2, #{role => core, apps => AppSpecs}} + ], + Nodes = emqx_cth_cluster:start( + Cluster, + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{nodes, Nodes} | Config]; init_per_group(msgs_base64_encoding, Config) -> [{payload_encoding, base64} | Config]; init_per_group(msgs_plain_encoding, Config) -> @@ -103,7 +129,10 @@ init_per_group(msgs_plain_encoding, Config) -> init_per_group(_Group, Config) -> Config. -end_per_group(persistent_sessions, Config) -> +end_per_group(Group, Config) when + Group =:= persistent_sessions; + Group =:= non_persistent_cluster +-> Nodes = ?config(nodes, Config), emqx_cth_cluster:stop(Nodes), ok; @@ -1572,6 +1601,42 @@ t_subscribe_shared_topic_nl(_Config) -> PostFun(post, PathFun(["subscribe"]), #{topic => T, qos => 1, nl => 1, rh => 1}) ). +%% Checks that we can use the bulk subscribe API on a different node than the one a client +%% is connected to. 
+t_bulk_subscribe(Config) -> + [N1, N2] = ?config(nodes, Config), + APIPort = 18084, + Port1 = get_mqtt_port(N1, tcp), + Port2 = get_mqtt_port(N2, tcp), + ?check_trace( + begin + ClientId1 = <<"bulk-sub1">>, + _C1 = connect_client(#{port => Port2, clientid => ClientId1, clean_start => true}), + ClientId2 = <<"bulk-sub2">>, + C2 = connect_client(#{port => Port1, clientid => ClientId2, clean_start => true}), + Topic = <<"testtopic">>, + BulkSub = [#{topic => Topic, qos => 1, nl => 1, rh => 1}], + ?assertMatch({200, [_]}, bulk_subscribe_request(APIPort, ClientId1, BulkSub)), + ?assertMatch( + {200, [_]}, + get_subscriptions_request(APIPort, ClientId1, #{simplify_result => true}) + ), + {ok, _} = emqtt:publish(C2, Topic, <<"hi1">>, [{qos, 1}]), + ?assertReceive({publish, #{topic := Topic, payload := <<"hi1">>}}), + BulkUnsub = [#{topic => Topic}], + ?assertMatch({204, _}, bulk_unsubscribe_request(APIPort, ClientId1, BulkUnsub)), + ?assertMatch( + {200, []}, + get_subscriptions_request(APIPort, ClientId1, #{simplify_result => true}) + ), + {ok, _} = emqtt:publish(C2, Topic, <<"hi2">>, [{qos, 1}]), + ?assertNotReceive({publish, _}), + ok + end, + [] + ), + ok. + t_list_clients_v2(Config) -> [N1, N2] = ?config(nodes, Config), APIPort = 18084, @@ -1935,9 +2000,17 @@ maybe_json_decode(X) -> end. get_subscriptions_request(APIPort, ClientId) -> + get_subscriptions_request(APIPort, ClientId, _Opts = #{}). + +get_subscriptions_request(APIPort, ClientId, Opts) -> + Simplify = maps:get(simplify_result, Opts, false), Host = "http://127.0.0.1:" ++ integer_to_list(APIPort), Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId, "subscriptions"]), - request(get, Path, []). + Res = request(get, Path, []), + case Simplify of + true -> simplify_result(Res); + false -> Res + end. 
get_client_request(Port, ClientId) -> Host = "http://127.0.0.1:" ++ integer_to_list(Port), @@ -1952,6 +2025,24 @@ list_request(Port, QueryParams) -> Path = emqx_mgmt_api_test_util:api_path(Host, ["clients"]), request(get, Path, [], QueryParams). +bulk_subscribe_request(Port, ClientId, Body) -> + Host = "http://127.0.0.1:" ++ integer_to_list(Port), + Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId, "subscribe", "bulk"]), + simplify_result(request(post, Path, Body)). + +bulk_unsubscribe_request(Port, ClientId, Body) -> + Host = "http://127.0.0.1:" ++ integer_to_list(Port), + Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId, "unsubscribe", "bulk"]), + simplify_result(request(post, Path, Body)). + +simplify_result(Res) -> + case Res of + {error, {{_, Status, _}, _, Body}} -> + {Status, Body}; + {ok, {{_, Status, _}, _, Body}} -> + {Status, Body} + end. + list_v2_request(Port, QueryParams = #{}) -> Host = "http://127.0.0.1:" ++ integer_to_list(Port), Path = emqx_mgmt_api_test_util:api_path(Host, ["clients_v2"]), diff --git a/changes/ce/fix-13344.en.md b/changes/ce/fix-13344.en.md new file mode 100644 index 000000000..58882651f --- /dev/null +++ b/changes/ce/fix-13344.en.md @@ -0,0 +1 @@ +Fixed an issue that prevented the `POST /clients/:clientid/subscribe/bulk` API from working properly if the node receiving the API request did not hold the connection to the targeted clientid. 
From c49900af502ed9521eebaac4a61b9024c4496d8c Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 27 Jun 2024 09:37:57 -0300 Subject: [PATCH 2/4] perf(mgmt): optimize clientid lookup when registry is enabled --- apps/emqx_management/src/emqx_mgmt.erl | 19 +++++++++++++++---- apps/emqx_management/test/emqx_mgmt_SUITE.erl | 11 ++++++++--- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 95303a1e6..1ea88e9e5 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -691,10 +691,21 @@ delete_banned(Who) -> %%-------------------------------------------------------------------- lookup_running_client(ClientId, FormatFun) -> - lists:append([ - lookup_client(Node, {clientid, ClientId}, FormatFun) - || Node <- emqx:running_nodes() - ]). + case emqx_cm_registry:is_enabled() of + false -> + lists:append([ + lookup_client(Node, {clientid, ClientId}, FormatFun) + || Node <- emqx:running_nodes() + ]); + true -> + case emqx_cm_registry:lookup_channels(ClientId) of + [ChanPid | _] -> + Node = node(ChanPid), + lookup_client(Node, {clientid, ClientId}, FormatFun); + [] -> + [] + end + end. %%-------------------------------------------------------------------- %% Internal Functions. diff --git a/apps/emqx_management/test/emqx_mgmt_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_SUITE.erl index e5de64b5a..3d7924f35 100644 --- a/apps/emqx_management/test/emqx_mgmt_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_SUITE.erl @@ -180,9 +180,14 @@ t_lookup_client(_Config) -> ), ?assertEqual([], emqx_mgmt:lookup_client({clientid, <<"notfound">>}, ?FORMATFUN)), meck:expect(emqx, running_nodes, 0, [node(), 'fake@nonode']), - ?assertMatch( - [_ | {error, nodedown}], emqx_mgmt:lookup_client({clientid, <<"client1">>}, ?FORMATFUN) - ). 
+ try + emqx:update_config([broker, enable_session_registry], false), + ?assertMatch( + [_ | {error, nodedown}], emqx_mgmt:lookup_client({clientid, <<"client1">>}, ?FORMATFUN) + ) + after + emqx:update_config([broker, enable_session_registry], true) + end. t_kickout_client(init, Config) -> process_flag(trap_exit, true), From 0b329dbf063c1e67bdb520a8050096740532c495 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 27 Jun 2024 09:50:04 -0300 Subject: [PATCH 3/4] perf(mgmt): optimize bulk subscribe when registry is enabled --- apps/emqx_management/src/emqx_mgmt.erl | 34 ++++++++++++++++++++------ 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 1ea88e9e5..afb88692e 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -567,7 +567,18 @@ list_subscriptions_via_topic(Node, Topic, _FormatFun = {M, F}) -> %%-------------------------------------------------------------------- subscribe(ClientId, TopicTables) -> - subscribe(emqx:running_nodes(), ClientId, TopicTables). + case emqx_cm_registry:is_enabled() of + false -> + subscribe(emqx:running_nodes(), ClientId, TopicTables); + true -> + with_client_node( + ClientId, + {error, channel_not_found}, + fun(Node) -> + subscribe([Node], ClientId, TopicTables) + end + ) + end. subscribe([Node | Nodes], ClientId, TopicTables) -> case unwrap_rpc(emqx_management_proto_v5:subscribe(Node, ClientId, TopicTables)) of @@ -698,19 +709,26 @@ lookup_running_client(ClientId, FormatFun) -> || Node <- emqx:running_nodes() ]); true -> - case emqx_cm_registry:lookup_channels(ClientId) of - [ChanPid | _] -> - Node = node(ChanPid), - lookup_client(Node, {clientid, ClientId}, FormatFun); - [] -> - [] - end + with_client_node( + ClientId, + _WhenNotFound = [], + fun(Node) -> lookup_client(Node, {clientid, ClientId}, FormatFun) end + ) end. 
%%-------------------------------------------------------------------- %% Internal Functions. %%-------------------------------------------------------------------- +with_client_node(ClientId, WhenNotFound, Fn) -> + case emqx_cm_registry:lookup_channels(ClientId) of + [ChanPid | _] -> + Node = node(ChanPid), + Fn(Node); + [] -> + WhenNotFound + end. + unwrap_rpc({badrpc, Reason}) -> {error, Reason}; unwrap_rpc(Res) -> From e76e94b497ec7af3186d91671148fd94519d65c5 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 27 Jun 2024 09:52:32 -0300 Subject: [PATCH 4/4] perf(mgmt): optimize bulk unsubscribe when registry is enabled --- apps/emqx_management/src/emqx_mgmt.erl | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index afb88692e..7631426c5 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -626,7 +626,18 @@ do_unsubscribe(ClientId, Topic) -> -spec unsubscribe_batch(emqx_types:clientid(), [emqx_types:topic()]) -> {unsubscribe, _} | {error, channel_not_found}. unsubscribe_batch(ClientId, Topics) -> - unsubscribe_batch(emqx:running_nodes(), ClientId, Topics). + case emqx_cm_registry:is_enabled() of + false -> + unsubscribe_batch(emqx:running_nodes(), ClientId, Topics); + true -> + with_client_node( + ClientId, + {error, channel_not_found}, + fun(Node) -> + unsubscribe_batch([Node], ClientId, Topics) + end + ) + end. -spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) -> {unsubscribe_batch, _} | {error, channel_not_found}.