diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 992905e48..5f5c4a60f 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -461,6 +461,14 @@ set_keepalive(_ClientId, _Interval) -> %% @private call_client(ClientId, Req) -> + case emqx_cm_registry:is_enabled() of + true -> + do_call_client(ClientId, Req); + false -> + call_client_on_all_nodes(ClientId, Req) + end. + +call_client_on_all_nodes(ClientId, Req) -> Results = [call_client(Node, ClientId, Req) || Node <- emqx:running_nodes()], Expected = lists:filter( fun diff --git a/apps/emqx_management/test/emqx_mgmt_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_SUITE.erl index 9ce737353..44d87c8be 100644 --- a/apps/emqx_management/test/emqx_mgmt_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_SUITE.erl @@ -28,14 +28,19 @@ all() -> [ {group, persistence_disabled}, - {group, persistence_enabled} + {group, persistence_enabled}, + {group, cm_registry_enabled}, + {group, cm_registry_disabled} ]. groups() -> - TCs = emqx_common_test_helpers:all(?MODULE), + CMRegistryTCs = [t_call_client_cluster], + TCs = emqx_common_test_helpers:all(?MODULE) -- CMRegistryTCs, [ {persistence_disabled, [], TCs}, - {persistence_enabled, [], [t_persist_list_subs]} + {persistence_enabled, [], [t_persist_list_subs]}, + {cm_registry_enabled, CMRegistryTCs}, + {cm_registry_disabled, CMRegistryTCs} ]. init_per_group(persistence_disabled, Config) -> @@ -66,10 +71,17 @@ init_per_group(persistence_enabled, Config) -> [ {apps, Apps} | Config - ]. + ]; +init_per_group(cm_registry_enabled, Config) -> + [{emqx_config, "broker.enable_session_registry = true"} | Config]; +init_per_group(cm_registry_disabled, Config) -> + [{emqx_config, "broker.enable_session_registry = false"} | Config]. end_per_group(_Grp, Config) -> - emqx_cth_suite:stop(?config(apps, Config)). + case ?config(apps, Config) of + undefined -> ok; + Apps -> emqx_cth_suite:stop(Apps) + end. init_per_suite(Config) -> Config. @@ -447,6 +459,38 @@ t_persist_list_subs(_) -> %% clients: VerifySubs(). +t_call_client_cluster(Config) -> + [Node1, Node2] = ?config(cluster, Config), + {ok, Node1Client, Node1ClientId} = connect_client(Node1), + {ok, Node2Client, Node2ClientId} = connect_client(Node2), + ?assertMatch( + {[], #{}}, rpc:call(Node1, emqx_mgmt, list_client_msgs, client_msgs_args(Node1ClientId)) + ), + ?assertMatch( + {[], #{}}, rpc:call(Node2, emqx_mgmt, list_client_msgs, client_msgs_args(Node2ClientId)) + ), + ?assertMatch( + {[], #{}}, rpc:call(Node1, emqx_mgmt, list_client_msgs, client_msgs_args(Node2ClientId)) + ), + ?assertMatch( + {[], #{}}, rpc:call(Node2, emqx_mgmt, list_client_msgs, client_msgs_args(Node1ClientId)) + ), + _ = emqtt:stop(Node1Client), + _ = emqtt:stop(Node2Client). + +t_call_client_cluster(init, Config) -> + Apps = [{emqx, ?config(emqx_config, Config)}, emqx_management], + Cluster = emqx_cth_cluster:start( + [ + {list_to_atom(atom_to_list(?MODULE) ++ "1"), #{role => core, apps => Apps}}, + {list_to_atom(atom_to_list(?MODULE) ++ "2"), #{role => core, apps => Apps}} + ], + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} + ), + [{cluster, Cluster} | Config]; +t_call_client_cluster('end', Config) -> + emqx_cth_cluster:stop(?config(cluster, Config)). + %%% helpers ident(Arg) -> Arg. @@ -462,3 +506,21 @@ setup_clients(Config) -> disconnect_clients(Config) -> Clients = ?config(clients, Config), lists:foreach(fun emqtt:disconnect/1, Clients). + +get_mqtt_port(Node) -> + {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]), + Port. + +connect_client(Node) -> + Port = get_mqtt_port(Node), + ClientId = <<(atom_to_binary(Node))/binary, "_client">>, + {ok, Client} = emqtt:start_link([ + {port, Port}, + {proto_ver, v5}, + {clientid, ClientId} + ]), + {ok, _} = emqtt:connect(Client), + {ok, Client, ClientId}. + +client_msgs_args(ClientId) -> + [mqueue_msgs, ClientId, #{limit => 10, continuation => none}].