diff --git a/apps/emqx_management/src/emqx_management.app.src b/apps/emqx_management/src/emqx_management.app.src index aeee7cad7..c22793cf0 100644 --- a/apps/emqx_management/src/emqx_management.app.src +++ b/apps/emqx_management/src/emqx_management.app.src @@ -2,7 +2,7 @@ {application, emqx_management, [ {description, "EMQX Management API and CLI"}, % strict semver, bump manually! - {vsn, "5.2.2"}, + {vsn, "5.2.3"}, {modules, []}, {registered, [emqx_management_sup]}, {applications, [ diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 547324925..810ccf08f 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -1224,10 +1224,20 @@ subscribe(#{clientid := ClientID, topic := Topic} = Sub) -> end. subscribe_batch(#{clientid := ClientID, topics := Topics}) -> - %% We use emqx_channel instead of emqx_channel_info (used by the emqx_mgmt:lookup_client/2), - %% as the emqx_channel_info table will only be populated after the hook `client.connected` - %% has returned. So if one want to subscribe topics in this hook, it will fail. - case ets:lookup(?CHAN_TAB, ClientID) of + %% On the one hand, we first try to use `emqx_channel' instead of `emqx_channel_info' + %% (used by the `emqx_mgmt:lookup_client/2'), as the `emqx_channel_info' table will + %% only be populated after the hook `client.connected' has returned. So if one want to + %% subscribe topics in this hook, it will fail. + %% ... On the other hand, using only `emqx_channel' would render this API unusable if + %% called from a node that doesn't have hold the targeted client connection, so we + %% fall back to `emqx_mgmt:lookup_client/2', which consults the global registry. + Result1 = ets:lookup(?CHAN_TAB, ClientID), + Result = + case Result1 of + [] -> emqx_mgmt:lookup_client({clientid, ClientID}, _FormatFn = undefined); + _ -> Result1 + end, + case Result of [] -> {404, ?CLIENTID_NOT_FOUND}; _ -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 9557c3214..b9fcdfe72 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -29,14 +29,18 @@ all() -> AllTCs = emqx_common_test_helpers:all(?MODULE), [ {group, persistent_sessions}, + {group, non_persistent_cluster}, {group, msgs_base64_encoding}, {group, msgs_plain_encoding} - | AllTCs -- (persistent_session_testcases() ++ client_msgs_testcases()) + | AllTCs -- + (persistent_session_testcases() ++ + non_persistent_cluster_testcases() ++ client_msgs_testcases()) ]. groups() -> [ {persistent_sessions, persistent_session_testcases()}, + {non_persistent_cluster, non_persistent_cluster_testcases()}, {msgs_base64_encoding, client_msgs_testcases()}, {msgs_plain_encoding, client_msgs_testcases()} ]. @@ -52,6 +56,10 @@ persistent_session_testcases() -> t_persistent_sessions_subscriptions1, t_list_clients_v2 ]. +non_persistent_cluster_testcases() -> + [ + t_bulk_subscribe + ]. client_msgs_testcases() -> [ t_inflight_messages, @@ -96,6 +104,24 @@ init_per_group(persistent_sessions, Config) -> #{work_dir => emqx_cth_suite:work_dir(Config)} ), [{nodes, Nodes} | Config]; +init_per_group(non_persistent_cluster, Config) -> + AppSpecs = [ + emqx, + emqx_conf, + emqx_management + ], + Dashboard = emqx_mgmt_api_test_util:emqx_dashboard( + "dashboard.listeners.http { enable = true, bind = 18084 }" + ), + Cluster = [ + {mgmt_api_clients_SUITE1, #{role => core, apps => AppSpecs ++ [Dashboard]}}, + {mgmt_api_clients_SUITE2, #{role => core, apps => AppSpecs}} + ], + Nodes = emqx_cth_cluster:start( + Cluster, + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{nodes, Nodes} | Config]; init_per_group(msgs_base64_encoding, Config) -> [{payload_encoding, base64} | Config]; init_per_group(msgs_plain_encoding, Config) -> @@ -103,7 +129,10 @@ init_per_group(msgs_plain_encoding, Config) -> init_per_group(_Group, Config) -> Config. -end_per_group(persistent_sessions, Config) -> +end_per_group(Group, Config) when + Group =:= persistent_sessions; + Group =:= non_persistent_cluster +-> Nodes = ?config(nodes, Config), emqx_cth_cluster:stop(Nodes), ok; @@ -1572,6 +1601,42 @@ t_subscribe_shared_topic_nl(_Config) -> PostFun(post, PathFun(["subscribe"]), #{topic => T, qos => 1, nl => 1, rh => 1}) ). +%% Checks that we can use the bulk subscribe API on a different node than the one a client +%% is connected to. +t_bulk_subscribe(Config) -> + [N1, N2] = ?config(nodes, Config), + APIPort = 18084, + Port1 = get_mqtt_port(N1, tcp), + Port2 = get_mqtt_port(N2, tcp), + ?check_trace( + begin + ClientId1 = <<"bulk-sub1">>, + _C1 = connect_client(#{port => Port2, clientid => ClientId1, clean_start => true}), + ClientId2 = <<"bulk-sub2">>, + C2 = connect_client(#{port => Port1, clientid => ClientId2, clean_start => true}), + Topic = <<"testtopic">>, + BulkSub = [#{topic => Topic, qos => 1, nl => 1, rh => 1}], + ?assertMatch({200, [_]}, bulk_subscribe_request(APIPort, ClientId1, BulkSub)), + ?assertMatch( + {200, [_]}, + get_subscriptions_request(APIPort, ClientId1, #{simplify_result => true}) + ), + {ok, _} = emqtt:publish(C2, Topic, <<"hi1">>, [{qos, 1}]), + ?assertReceive({publish, #{topic := Topic, payload := <<"hi1">>}}), + BulkUnsub = [#{topic => Topic}], + ?assertMatch({204, _}, bulk_unsubscribe_request(APIPort, ClientId1, BulkUnsub)), + ?assertMatch( + {200, []}, + get_subscriptions_request(APIPort, ClientId1, #{simplify_result => true}) + ), + {ok, _} = emqtt:publish(C2, Topic, <<"hi2">>, [{qos, 1}]), + ?assertNotReceive({publish, _}), + ok + end, + [] + ), + ok. + t_list_clients_v2(Config) -> [N1, N2] = ?config(nodes, Config), APIPort = 18084, @@ -1935,9 +2000,17 @@ maybe_json_decode(X) -> end. get_subscriptions_request(APIPort, ClientId) -> + get_subscriptions_request(APIPort, ClientId, _Opts = #{}). + +get_subscriptions_request(APIPort, ClientId, Opts) -> + Simplify = maps:get(simplify_result, Opts, false), Host = "http://127.0.0.1:" ++ integer_to_list(APIPort), Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId, "subscriptions"]), - request(get, Path, []). + Res = request(get, Path, []), + case Simplify of + true -> simplify_result(Res); + false -> Res + end. get_client_request(Port, ClientId) -> Host = "http://127.0.0.1:" ++ integer_to_list(Port), @@ -1952,6 +2025,24 @@ list_request(Port, QueryParams) -> Path = emqx_mgmt_api_test_util:api_path(Host, ["clients"]), request(get, Path, [], QueryParams). +bulk_subscribe_request(Port, ClientId, Body) -> + Host = "http://127.0.0.1:" ++ integer_to_list(Port), + Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId, "subscribe", "bulk"]), + simplify_result(request(post, Path, Body)). + +bulk_unsubscribe_request(Port, ClientId, Body) -> + Host = "http://127.0.0.1:" ++ integer_to_list(Port), + Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId, "unsubscribe", "bulk"]), + simplify_result(request(post, Path, Body)). + +simplify_result(Res) -> + case Res of + {error, {{_, Status, _}, _, Body}} -> + {Status, Body}; + {ok, {{_, Status, _}, _, Body}} -> + {Status, Body} + end. + list_v2_request(Port, QueryParams = #{}) -> Host = "http://127.0.0.1:" ++ integer_to_list(Port), Path = emqx_mgmt_api_test_util:api_path(Host, ["clients_v2"]), diff --git a/changes/ce/fix-13344.en.md b/changes/ce/fix-13344.en.md new file mode 100644 index 000000000..58882651f --- /dev/null +++ b/changes/ce/fix-13344.en.md @@ -0,0 +1 @@ +Fixed an issue that prevented the `POST /clients/:clientid/subscribe/bulk` API from working properly if the node receiving the API request did not hold the connection to the targeted clientid.