Merge pull request #13344 from thalesmg/20240626-r572-multi-node-bulk-subscribe

fix(client mgmt api): make bulk subscribe work again in clusters
This commit is contained in:
Thales Macedo Garitezi 2024-06-28 12:08:16 -03:00 committed by GitHub
commit 063e7657b5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 164 additions and 17 deletions

View File

@ -2,7 +2,7 @@
{application, emqx_management, [ {application, emqx_management, [
{description, "EMQX Management API and CLI"}, {description, "EMQX Management API and CLI"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "5.2.2"}, {vsn, "5.2.3"},
{modules, []}, {modules, []},
{registered, [emqx_management_sup]}, {registered, [emqx_management_sup]},
{applications, [ {applications, [

View File

@ -567,7 +567,18 @@ list_subscriptions_via_topic(Node, Topic, _FormatFun = {M, F}) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
subscribe(ClientId, TopicTables) -> subscribe(ClientId, TopicTables) ->
subscribe(emqx:running_nodes(), ClientId, TopicTables). case emqx_cm_registry:is_enabled() of
false ->
subscribe(emqx:running_nodes(), ClientId, TopicTables);
true ->
with_client_node(
ClientId,
{error, channel_not_found},
fun(Node) ->
subscribe([Node], ClientId, TopicTables)
end
)
end.
subscribe([Node | Nodes], ClientId, TopicTables) -> subscribe([Node | Nodes], ClientId, TopicTables) ->
case unwrap_rpc(emqx_management_proto_v5:subscribe(Node, ClientId, TopicTables)) of case unwrap_rpc(emqx_management_proto_v5:subscribe(Node, ClientId, TopicTables)) of
@ -615,7 +626,18 @@ do_unsubscribe(ClientId, Topic) ->
-spec unsubscribe_batch(emqx_types:clientid(), [emqx_types:topic()]) -> -spec unsubscribe_batch(emqx_types:clientid(), [emqx_types:topic()]) ->
{unsubscribe, _} | {error, channel_not_found}. {unsubscribe, _} | {error, channel_not_found}.
unsubscribe_batch(ClientId, Topics) -> unsubscribe_batch(ClientId, Topics) ->
unsubscribe_batch(emqx:running_nodes(), ClientId, Topics). case emqx_cm_registry:is_enabled() of
false ->
unsubscribe_batch(emqx:running_nodes(), ClientId, Topics);
true ->
with_client_node(
ClientId,
{error, channel_not_found},
fun(Node) ->
unsubscribe_batch([Node], ClientId, Topics)
end
)
end.
-spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) -> -spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) ->
{unsubscribe_batch, _} | {error, channel_not_found}. {unsubscribe_batch, _} | {error, channel_not_found}.
@ -691,15 +713,33 @@ delete_banned(Who) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
lookup_running_client(ClientId, FormatFun) -> lookup_running_client(ClientId, FormatFun) ->
case emqx_cm_registry:is_enabled() of
false ->
lists:append([ lists:append([
lookup_client(Node, {clientid, ClientId}, FormatFun) lookup_client(Node, {clientid, ClientId}, FormatFun)
|| Node <- emqx:running_nodes() || Node <- emqx:running_nodes()
]). ]);
true ->
with_client_node(
ClientId,
_WhenNotFound = [],
fun(Node) -> lookup_client(Node, {clientid, ClientId}, FormatFun) end
)
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal Functions. %% Internal Functions.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Runs `Fn' against the node that hosts the first channel pid registered for `ClientId'.
%%
%% The channel pids are fetched with `emqx_cm_registry:lookup_channels/1'. When the
%% registry returns no pid for `ClientId', `WhenNotFound' is returned as-is and `Fn' is
%% never called. Otherwise the result of `Fn(Node)' is returned, where `Node' is the
%% node of the first pid in the returned list.
with_client_node(ClientId, WhenNotFound, Fn) ->
    Channels = emqx_cm_registry:lookup_channels(ClientId),
    case Channels of
        [] ->
            WhenNotFound;
        [FirstPid | _] ->
            Fn(node(FirstPid))
    end.
unwrap_rpc({badrpc, Reason}) -> unwrap_rpc({badrpc, Reason}) ->
{error, Reason}; {error, Reason};
unwrap_rpc(Res) -> unwrap_rpc(Res) ->

View File

@ -1224,10 +1224,20 @@ subscribe(#{clientid := ClientID, topic := Topic} = Sub) ->
end. end.
subscribe_batch(#{clientid := ClientID, topics := Topics}) -> subscribe_batch(#{clientid := ClientID, topics := Topics}) ->
%% We use emqx_channel instead of emqx_channel_info (used by the emqx_mgmt:lookup_client/2), %% On the one hand, we first try to use `emqx_channel' instead of `emqx_channel_info'
%% as the emqx_channel_info table will only be populated after the hook `client.connected` %% (used by the `emqx_mgmt:lookup_client/2'), as the `emqx_channel_info' table will
%% has returned. So if one want to subscribe topics in this hook, it will fail. %% only be populated after the hook `client.connected' has returned. So if one want to
case ets:lookup(?CHAN_TAB, ClientID) of %% subscribe topics in this hook, it will fail.
%% ... On the other hand, using only `emqx_channel' would render this API unusable if
%% called from a node that doesn't hold the targeted client connection, so we
%% fall back to `emqx_mgmt:lookup_client/2', which consults the global registry.
Result1 = ets:lookup(?CHAN_TAB, ClientID),
Result =
case Result1 of
[] -> emqx_mgmt:lookup_client({clientid, ClientID}, _FormatFn = undefined);
_ -> Result1
end,
case Result of
[] -> [] ->
{404, ?CLIENTID_NOT_FOUND}; {404, ?CLIENTID_NOT_FOUND};
_ -> _ ->

View File

@ -180,9 +180,14 @@ t_lookup_client(_Config) ->
), ),
?assertEqual([], emqx_mgmt:lookup_client({clientid, <<"notfound">>}, ?FORMATFUN)), ?assertEqual([], emqx_mgmt:lookup_client({clientid, <<"notfound">>}, ?FORMATFUN)),
meck:expect(emqx, running_nodes, 0, [node(), 'fake@nonode']), meck:expect(emqx, running_nodes, 0, [node(), 'fake@nonode']),
try
emqx:update_config([broker, enable_session_registry], false),
?assertMatch( ?assertMatch(
[_ | {error, nodedown}], emqx_mgmt:lookup_client({clientid, <<"client1">>}, ?FORMATFUN) [_ | {error, nodedown}], emqx_mgmt:lookup_client({clientid, <<"client1">>}, ?FORMATFUN)
). )
after
emqx:update_config([broker, enable_session_registry], true)
end.
t_kickout_client(init, Config) -> t_kickout_client(init, Config) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),

View File

@ -29,14 +29,18 @@ all() ->
AllTCs = emqx_common_test_helpers:all(?MODULE), AllTCs = emqx_common_test_helpers:all(?MODULE),
[ [
{group, persistent_sessions}, {group, persistent_sessions},
{group, non_persistent_cluster},
{group, msgs_base64_encoding}, {group, msgs_base64_encoding},
{group, msgs_plain_encoding} {group, msgs_plain_encoding}
| AllTCs -- (persistent_session_testcases() ++ client_msgs_testcases()) | AllTCs --
(persistent_session_testcases() ++
non_persistent_cluster_testcases() ++ client_msgs_testcases())
]. ].
groups() -> groups() ->
[ [
{persistent_sessions, persistent_session_testcases()}, {persistent_sessions, persistent_session_testcases()},
{non_persistent_cluster, non_persistent_cluster_testcases()},
{msgs_base64_encoding, client_msgs_testcases()}, {msgs_base64_encoding, client_msgs_testcases()},
{msgs_plain_encoding, client_msgs_testcases()} {msgs_plain_encoding, client_msgs_testcases()}
]. ].
@ -52,6 +56,10 @@ persistent_session_testcases() ->
t_persistent_sessions_subscriptions1, t_persistent_sessions_subscriptions1,
t_list_clients_v2 t_list_clients_v2
]. ].
%% Test cases that belong to the `non_persistent_cluster' group.
non_persistent_cluster_testcases() -> [t_bulk_subscribe].
client_msgs_testcases() -> client_msgs_testcases() ->
[ [
t_inflight_messages, t_inflight_messages,
@ -96,6 +104,24 @@ init_per_group(persistent_sessions, Config) ->
#{work_dir => emqx_cth_suite:work_dir(Config)} #{work_dir => emqx_cth_suite:work_dir(Config)}
), ),
[{nodes, Nodes} | Config]; [{nodes, Nodes} | Config];
%% Group setup for `non_persistent_cluster': starts two `core' nodes through
%% `emqx_cth_cluster:start/2'. Only the first node spec carries the dashboard app
%% (HTTP listener bound to 18084). The value returned by the cluster helper is stored
%% in `Config' under the `nodes' key.
init_per_group(non_persistent_cluster, Config) ->
    DashboardSpec = emqx_mgmt_api_test_util:emqx_dashboard(
        "dashboard.listeners.http { enable = true, bind = 18084 }"
    ),
    BaseApps = [emqx, emqx_conf, emqx_management],
    NodeSpecs = [
        {mgmt_api_clients_SUITE1, #{role => core, apps => BaseApps ++ [DashboardSpec]}},
        {mgmt_api_clients_SUITE2, #{role => core, apps => BaseApps}}
    ],
    StartOpts = #{work_dir => emqx_cth_suite:work_dir(Config)},
    Nodes = emqx_cth_cluster:start(NodeSpecs, StartOpts),
    [{nodes, Nodes} | Config];
init_per_group(msgs_base64_encoding, Config) -> init_per_group(msgs_base64_encoding, Config) ->
[{payload_encoding, base64} | Config]; [{payload_encoding, base64} | Config];
init_per_group(msgs_plain_encoding, Config) -> init_per_group(msgs_plain_encoding, Config) ->
@ -103,7 +129,10 @@ init_per_group(msgs_plain_encoding, Config) ->
init_per_group(_Group, Config) -> init_per_group(_Group, Config) ->
Config. Config.
end_per_group(persistent_sessions, Config) -> end_per_group(Group, Config) when
Group =:= persistent_sessions;
Group =:= non_persistent_cluster
->
Nodes = ?config(nodes, Config), Nodes = ?config(nodes, Config),
emqx_cth_cluster:stop(Nodes), emqx_cth_cluster:stop(Nodes),
ok; ok;
@ -1572,6 +1601,42 @@ t_subscribe_shared_topic_nl(_Config) ->
PostFun(post, PathFun(["subscribe"]), #{topic => T, qos => 1, nl => 1, rh => 1}) PostFun(post, PathFun(["subscribe"]), #{topic => T, qos => 1, nl => 1, rh => 1})
). ).
%% Checks that we can use the bulk subscribe API on a different node than the one a client
%% is connected to.
%%
%% Flow: connect one client through N2's MQTT port and a publisher through N1's MQTT
%% port, bulk-subscribe the first client over the HTTP API on port 18084, verify the
%% subscription is listed and a publish is received, then bulk-unsubscribe and verify the
%% subscription list is empty and no further publish arrives.
t_bulk_subscribe(Config) ->
[N1, N2] = ?config(nodes, Config),
%% Same port as the dashboard HTTP listener configured in
%% init_per_group(non_persistent_cluster, _), where only the first node spec includes
%% the dashboard app.
%% NOTE(review): presumably that first spec is N1 -- confirm the node order returned by
%% the cluster helper.
APIPort = 18084,
Port1 = get_mqtt_port(N1, tcp),
Port2 = get_mqtt_port(N2, tcp),
?check_trace(
begin
%% Client that will be (un)subscribed through the API; it connects via N2's MQTT port.
ClientId1 = <<"bulk-sub1">>,
_C1 = connect_client(#{port => Port2, clientid => ClientId1, clean_start => true}),
%% Publisher client; it connects via N1's MQTT port.
ClientId2 = <<"bulk-sub2">>,
C2 = connect_client(#{port => Port1, clientid => ClientId2, clean_start => true}),
Topic = <<"testtopic">>,
BulkSub = [#{topic => Topic, qos => 1, nl => 1, rh => 1}],
%% Bulk subscribe must answer 200 with exactly one element in the body.
?assertMatch({200, [_]}, bulk_subscribe_request(APIPort, ClientId1, BulkSub)),
%% The subscription listing for the client must now contain exactly one entry.
?assertMatch(
{200, [_]},
get_subscriptions_request(APIPort, ClientId1, #{simplify_result => true})
),
{ok, _} = emqtt:publish(C2, Topic, <<"hi1">>, [{qos, 1}]),
%% NOTE(review): presumably the publish reaches this test process because it owns the
%% ClientId1 connection -- confirm against connect_client.
?assertReceive({publish, #{topic := Topic, payload := <<"hi1">>}}),
BulkUnsub = [#{topic => Topic}],
%% Bulk unsubscribe must answer 204; afterwards the subscription listing is empty.
?assertMatch({204, _}, bulk_unsubscribe_request(APIPort, ClientId1, BulkUnsub)),
?assertMatch(
{200, []},
get_subscriptions_request(APIPort, ClientId1, #{simplify_result => true})
),
%% A publish after the unsubscribe must not be delivered.
{ok, _} = emqtt:publish(C2, Topic, <<"hi2">>, [{qos, 1}]),
?assertNotReceive({publish, _}),
ok
end,
%% Empty second argument to ?check_trace (presumably: no additional trace checks).
[]
),
ok.
t_list_clients_v2(Config) -> t_list_clients_v2(Config) ->
[N1, N2] = ?config(nodes, Config), [N1, N2] = ?config(nodes, Config),
APIPort = 18084, APIPort = 18084,
@ -1935,9 +2000,17 @@ maybe_json_decode(X) ->
end. end.
get_subscriptions_request(APIPort, ClientId) -> get_subscriptions_request(APIPort, ClientId) ->
get_subscriptions_request(APIPort, ClientId, _Opts = #{}).
get_subscriptions_request(APIPort, ClientId, Opts) ->
Simplify = maps:get(simplify_result, Opts, false),
Host = "http://127.0.0.1:" ++ integer_to_list(APIPort), Host = "http://127.0.0.1:" ++ integer_to_list(APIPort),
Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId, "subscriptions"]), Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId, "subscriptions"]),
request(get, Path, []). Res = request(get, Path, []),
case Simplify of
true -> simplify_result(Res);
false -> Res
end.
get_client_request(Port, ClientId) -> get_client_request(Port, ClientId) ->
Host = "http://127.0.0.1:" ++ integer_to_list(Port), Host = "http://127.0.0.1:" ++ integer_to_list(Port),
@ -1952,6 +2025,24 @@ list_request(Port, QueryParams) ->
Path = emqx_mgmt_api_test_util:api_path(Host, ["clients"]), Path = emqx_mgmt_api_test_util:api_path(Host, ["clients"]),
request(get, Path, [], QueryParams). request(get, Path, [], QueryParams).
%% Sends a POST with `Body' to the `clients/ClientId/subscribe/bulk' API path on
%% 127.0.0.1:`Port' and returns the response reduced by `simplify_result/1' to
%% `{Status, ResponseBody}'.
bulk_subscribe_request(Port, ClientId, Body) ->
    BaseUrl = "http://127.0.0.1:" ++ integer_to_list(Port),
    Segments = ["clients", ClientId, "subscribe", "bulk"],
    Url = emqx_mgmt_api_test_util:api_path(BaseUrl, Segments),
    Response = request(post, Url, Body),
    simplify_result(Response).
%% Sends a POST with `Body' to the `clients/ClientId/unsubscribe/bulk' API path on
%% 127.0.0.1:`Port' and returns the response reduced by `simplify_result/1' to
%% `{Status, ResponseBody}'.
bulk_unsubscribe_request(Port, ClientId, Body) ->
    BaseUrl = "http://127.0.0.1:" ++ integer_to_list(Port),
    Segments = ["clients", ClientId, "unsubscribe", "bulk"],
    Url = emqx_mgmt_api_test_util:api_path(BaseUrl, Segments),
    Response = request(post, Url, Body),
    simplify_result(Response).
%% Reduces a request result of the form `{ok | error, {{_, Status, _}, _, Body}}' to
%% `{Status, Body}'. Both the `ok' and `error' tags are handled the same way; the
%% second element of the inner tuple is ignored. Any other shape raises `case_clause'.
simplify_result(Res) ->
    case Res of
        {Tag, {{_, Status, _}, _, Body}} when Tag =:= error; Tag =:= ok ->
            {Status, Body}
    end.
list_v2_request(Port, QueryParams = #{}) -> list_v2_request(Port, QueryParams = #{}) ->
Host = "http://127.0.0.1:" ++ integer_to_list(Port), Host = "http://127.0.0.1:" ++ integer_to_list(Port),
Path = emqx_mgmt_api_test_util:api_path(Host, ["clients_v2"]), Path = emqx_mgmt_api_test_util:api_path(Host, ["clients_v2"]),

View File

@ -0,0 +1 @@
Fixed an issue that prevented the `POST /clients/:clientid/subscribe/bulk` API from working properly if the node receiving the API request did not hold the connection to the targeted clientid.