Merge pull request #12685 from SergeTupchiy/refactor-emqx-mgmt-call-client
Refactor emqx mgmt call client
This commit is contained in:
commit
e35e8847b9
|
@ -39,6 +39,7 @@
|
||||||
{emqx_management,2}.
|
{emqx_management,2}.
|
||||||
{emqx_management,3}.
|
{emqx_management,3}.
|
||||||
{emqx_management,4}.
|
{emqx_management,4}.
|
||||||
|
{emqx_management,5}.
|
||||||
{emqx_metrics,1}.
|
{emqx_metrics,1}.
|
||||||
{emqx_mgmt_api_plugins,1}.
|
{emqx_mgmt_api_plugins,1}.
|
||||||
{emqx_mgmt_api_plugins,2}.
|
{emqx_mgmt_api_plugins,2}.
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
-include("emqx_mgmt.hrl").
|
-include("emqx_mgmt.hrl").
|
||||||
-include_lib("emqx/include/emqx_cm.hrl").
|
-include_lib("emqx/include/emqx_cm.hrl").
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
-elvis([{elvis_style, invalid_dynamic_call, disable}]).
|
-elvis([{elvis_style, invalid_dynamic_call, disable}]).
|
||||||
-elvis([{elvis_style, god_modules, disable}]).
|
-elvis([{elvis_style, god_modules, disable}]).
|
||||||
|
@ -117,6 +118,13 @@
|
||||||
|
|
||||||
-elvis([{elvis_style, god_modules, disable}]).
|
-elvis([{elvis_style, god_modules, disable}]).
|
||||||
|
|
||||||
|
-define(maybe_log_node_errors(LogData, Errors),
|
||||||
|
case Errors of
|
||||||
|
[] -> ok;
|
||||||
|
_ -> ?SLOG(error, (LogData)#{node_errors => Errors})
|
||||||
|
end
|
||||||
|
).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Node Info
|
%% Node Info
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -185,7 +193,7 @@ get_sys_memory() ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
node_info(Nodes) ->
|
node_info(Nodes) ->
|
||||||
emqx_rpc:unwrap_erpc(emqx_management_proto_v4:node_info(Nodes)).
|
emqx_rpc:unwrap_erpc(emqx_management_proto_v5:node_info(Nodes)).
|
||||||
|
|
||||||
stopped_node_info(Node) ->
|
stopped_node_info(Node) ->
|
||||||
{Node, #{node => Node, node_status => 'stopped', role => core}}.
|
{Node, #{node => Node, node_status => 'stopped', role => core}}.
|
||||||
|
@ -248,7 +256,7 @@ convert_broker_info({K, V}, M) ->
|
||||||
M#{K => iolist_to_binary(V)}.
|
M#{K => iolist_to_binary(V)}.
|
||||||
|
|
||||||
broker_info(Nodes) ->
|
broker_info(Nodes) ->
|
||||||
emqx_rpc:unwrap_erpc(emqx_management_proto_v4:broker_info(Nodes)).
|
emqx_rpc:unwrap_erpc(emqx_management_proto_v5:broker_info(Nodes)).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Metrics and Stats
|
%% Metrics and Stats
|
||||||
|
@ -361,7 +369,7 @@ kickout_client(Node, ClientId) ->
|
||||||
|
|
||||||
kickout_clients(ClientIds) when is_list(ClientIds) ->
|
kickout_clients(ClientIds) when is_list(ClientIds) ->
|
||||||
F = fun(Node) ->
|
F = fun(Node) ->
|
||||||
emqx_management_proto_v4:kickout_clients(Node, ClientIds)
|
emqx_management_proto_v5:kickout_clients(Node, ClientIds)
|
||||||
end,
|
end,
|
||||||
Results = lists:map(F, emqx:running_nodes()),
|
Results = lists:map(F, emqx:running_nodes()),
|
||||||
case lists:filter(fun(Res) -> Res =/= ok end, Results) of
|
case lists:filter(fun(Res) -> Res =/= ok end, Results) of
|
||||||
|
@ -461,17 +469,34 @@ set_keepalive(_ClientId, _Interval) ->
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
call_client(ClientId, Req) ->
|
call_client(ClientId, Req) ->
|
||||||
Results = [call_client(Node, ClientId, Req) || Node <- emqx:running_nodes()],
|
case emqx_cm_registry:is_enabled() of
|
||||||
Expected = lists:filter(
|
true ->
|
||||||
|
do_call_client(ClientId, Req);
|
||||||
|
false ->
|
||||||
|
call_client_on_all_nodes(ClientId, Req)
|
||||||
|
end.
|
||||||
|
|
||||||
|
call_client_on_all_nodes(ClientId, Req) ->
|
||||||
|
Nodes = emqx:running_nodes(),
|
||||||
|
Results = call_client(Nodes, ClientId, Req),
|
||||||
|
{Expected, Errs} = lists:foldr(
|
||||||
fun
|
fun
|
||||||
({error, _}) -> false;
|
({_N, {error, not_found}}, Acc) -> Acc;
|
||||||
(_) -> true
|
({_N, {error, _}} = Err, {OkAcc, ErrAcc}) -> {OkAcc, [Err | ErrAcc]};
|
||||||
|
({_N, OkRes}, {OkAcc, ErrAcc}) -> {[OkRes | OkAcc], ErrAcc}
|
||||||
end,
|
end,
|
||||||
Results
|
{[], []},
|
||||||
|
lists:zip(Nodes, Results)
|
||||||
),
|
),
|
||||||
|
?maybe_log_node_errors(#{msg => "call_client_failed", request => Req}, Errs),
|
||||||
case Expected of
|
case Expected of
|
||||||
|
[] ->
|
||||||
|
case Errs of
|
||||||
[] -> {error, not_found};
|
[] -> {error, not_found};
|
||||||
[Result | _] -> Result
|
[{_Node, FirstErr} | _] -> FirstErr
|
||||||
|
end;
|
||||||
|
[Result | _] ->
|
||||||
|
Result
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
|
@ -491,8 +516,8 @@ do_call_client(ClientId, Req) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
call_client(Node, ClientId, Req) ->
|
call_client(Nodes, ClientId, Req) ->
|
||||||
unwrap_rpc(emqx_management_proto_v4:call_client(Node, ClientId, Req)).
|
emqx_rpc:unwrap_erpc(emqx_management_proto_v5:call_client(Nodes, ClientId, Req)).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Subscriptions
|
%% Subscriptions
|
||||||
|
@ -505,7 +530,7 @@ do_list_subscriptions() ->
|
||||||
throw(not_implemented).
|
throw(not_implemented).
|
||||||
|
|
||||||
list_subscriptions(Node) ->
|
list_subscriptions(Node) ->
|
||||||
unwrap_rpc(emqx_management_proto_v4:list_subscriptions(Node)).
|
unwrap_rpc(emqx_management_proto_v5:list_subscriptions(Node)).
|
||||||
|
|
||||||
list_subscriptions_via_topic(Topic, FormatFun) ->
|
list_subscriptions_via_topic(Topic, FormatFun) ->
|
||||||
lists:append([
|
lists:append([
|
||||||
|
@ -527,7 +552,7 @@ subscribe(ClientId, TopicTables) ->
|
||||||
subscribe(emqx:running_nodes(), ClientId, TopicTables).
|
subscribe(emqx:running_nodes(), ClientId, TopicTables).
|
||||||
|
|
||||||
subscribe([Node | Nodes], ClientId, TopicTables) ->
|
subscribe([Node | Nodes], ClientId, TopicTables) ->
|
||||||
case unwrap_rpc(emqx_management_proto_v4:subscribe(Node, ClientId, TopicTables)) of
|
case unwrap_rpc(emqx_management_proto_v5:subscribe(Node, ClientId, TopicTables)) of
|
||||||
{error, _} -> subscribe(Nodes, ClientId, TopicTables);
|
{error, _} -> subscribe(Nodes, ClientId, TopicTables);
|
||||||
{subscribe, Res} -> {subscribe, Res, Node}
|
{subscribe, Res} -> {subscribe, Res, Node}
|
||||||
end;
|
end;
|
||||||
|
@ -554,7 +579,7 @@ unsubscribe(ClientId, Topic) ->
|
||||||
-spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) ->
|
-spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) ->
|
||||||
{unsubscribe, _} | {error, channel_not_found}.
|
{unsubscribe, _} | {error, channel_not_found}.
|
||||||
unsubscribe([Node | Nodes], ClientId, Topic) ->
|
unsubscribe([Node | Nodes], ClientId, Topic) ->
|
||||||
case unwrap_rpc(emqx_management_proto_v4:unsubscribe(Node, ClientId, Topic)) of
|
case unwrap_rpc(emqx_management_proto_v5:unsubscribe(Node, ClientId, Topic)) of
|
||||||
{error, _} -> unsubscribe(Nodes, ClientId, Topic);
|
{error, _} -> unsubscribe(Nodes, ClientId, Topic);
|
||||||
Re -> Re
|
Re -> Re
|
||||||
end;
|
end;
|
||||||
|
@ -577,7 +602,7 @@ unsubscribe_batch(ClientId, Topics) ->
|
||||||
-spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) ->
|
-spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) ->
|
||||||
{unsubscribe_batch, _} | {error, channel_not_found}.
|
{unsubscribe_batch, _} | {error, channel_not_found}.
|
||||||
unsubscribe_batch([Node | Nodes], ClientId, Topics) ->
|
unsubscribe_batch([Node | Nodes], ClientId, Topics) ->
|
||||||
case unwrap_rpc(emqx_management_proto_v4:unsubscribe_batch(Node, ClientId, Topics)) of
|
case unwrap_rpc(emqx_management_proto_v5:unsubscribe_batch(Node, ClientId, Topics)) of
|
||||||
{error, _} -> unsubscribe_batch(Nodes, ClientId, Topics);
|
{error, _} -> unsubscribe_batch(Nodes, ClientId, Topics);
|
||||||
Re -> Re
|
Re -> Re
|
||||||
end;
|
end;
|
||||||
|
@ -656,6 +681,7 @@ lookup_running_client(ClientId, FormatFun) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal Functions.
|
%% Internal Functions.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
unwrap_rpc({badrpc, Reason}) ->
|
unwrap_rpc({badrpc, Reason}) ->
|
||||||
{error, Reason};
|
{error, Reason};
|
||||||
unwrap_rpc(Res) ->
|
unwrap_rpc(Res) ->
|
||||||
|
|
|
@ -407,7 +407,7 @@ get_configs_v1(QueryStr) ->
|
||||||
Node = maps:get(<<"node">>, QueryStr, node()),
|
Node = maps:get(<<"node">>, QueryStr, node()),
|
||||||
case
|
case
|
||||||
lists:member(Node, emqx:running_nodes()) andalso
|
lists:member(Node, emqx:running_nodes()) andalso
|
||||||
emqx_management_proto_v4:get_full_config(Node)
|
emqx_management_proto_v5:get_full_config(Node)
|
||||||
of
|
of
|
||||||
false ->
|
false ->
|
||||||
Message = list_to_binary(io_lib:format("Bad node ~p, reason not found", [Node])),
|
Message = list_to_binary(io_lib:format("Bad node ~p, reason not found", [Node])),
|
||||||
|
|
|
@ -516,7 +516,7 @@ list_listeners() ->
|
||||||
lists:map(fun list_listeners/1, [Self | lists:delete(Self, emqx:running_nodes())]).
|
lists:map(fun list_listeners/1, [Self | lists:delete(Self, emqx:running_nodes())]).
|
||||||
|
|
||||||
list_listeners(Node) ->
|
list_listeners(Node) ->
|
||||||
wrap_rpc(emqx_management_proto_v4:list_listeners(Node)).
|
wrap_rpc(emqx_management_proto_v5:list_listeners(Node)).
|
||||||
|
|
||||||
listener_status_by_id(NodeL) ->
|
listener_status_by_id(NodeL) ->
|
||||||
Listeners = maps:to_list(listener_status_by_id(NodeL, #{})),
|
Listeners = maps:to_list(listener_status_by_id(NodeL, #{})),
|
||||||
|
|
|
@ -0,0 +1,86 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_management_proto_v5).
|
||||||
|
|
||||||
|
-behaviour(emqx_bpapi).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
introduced_in/0,
|
||||||
|
|
||||||
|
node_info/1,
|
||||||
|
broker_info/1,
|
||||||
|
list_subscriptions/1,
|
||||||
|
|
||||||
|
list_listeners/1,
|
||||||
|
subscribe/3,
|
||||||
|
unsubscribe/3,
|
||||||
|
unsubscribe_batch/3,
|
||||||
|
|
||||||
|
call_client/3,
|
||||||
|
|
||||||
|
get_full_config/1,
|
||||||
|
|
||||||
|
kickout_clients/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/bpapi.hrl").
|
||||||
|
|
||||||
|
introduced_in() ->
|
||||||
|
"5.6.0".
|
||||||
|
|
||||||
|
-spec unsubscribe_batch(node(), emqx_types:clientid(), [emqx_types:topic()]) ->
|
||||||
|
{unsubscribe, _} | {error, _} | {badrpc, _}.
|
||||||
|
unsubscribe_batch(Node, ClientId, Topics) ->
|
||||||
|
rpc:call(Node, emqx_mgmt, do_unsubscribe_batch, [ClientId, Topics]).
|
||||||
|
|
||||||
|
-spec node_info([node()]) -> emqx_rpc:erpc_multicall(map()).
|
||||||
|
node_info(Nodes) ->
|
||||||
|
erpc:multicall(Nodes, emqx_mgmt, node_info, [], 30000).
|
||||||
|
|
||||||
|
-spec broker_info([node()]) -> emqx_rpc:erpc_multicall(map()).
|
||||||
|
broker_info(Nodes) ->
|
||||||
|
erpc:multicall(Nodes, emqx_mgmt, broker_info, [], 30000).
|
||||||
|
|
||||||
|
-spec list_subscriptions(node()) -> [map()] | {badrpc, _}.
|
||||||
|
list_subscriptions(Node) ->
|
||||||
|
rpc:call(Node, emqx_mgmt, do_list_subscriptions, []).
|
||||||
|
|
||||||
|
-spec list_listeners(node()) -> map() | {badrpc, _}.
|
||||||
|
list_listeners(Node) ->
|
||||||
|
rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []).
|
||||||
|
|
||||||
|
-spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) ->
|
||||||
|
{subscribe, _} | {error, atom()} | {badrpc, _}.
|
||||||
|
subscribe(Node, ClientId, TopicTables) ->
|
||||||
|
rpc:call(Node, emqx_mgmt, do_subscribe, [ClientId, TopicTables]).
|
||||||
|
|
||||||
|
-spec unsubscribe(node(), emqx_types:clientid(), emqx_types:topic()) ->
|
||||||
|
{unsubscribe, _} | {error, _} | {badrpc, _}.
|
||||||
|
unsubscribe(Node, ClientId, Topic) ->
|
||||||
|
rpc:call(Node, emqx_mgmt, do_unsubscribe, [ClientId, Topic]).
|
||||||
|
|
||||||
|
-spec call_client([node()], emqx_types:clientid(), term()) -> emqx_rpc:erpc_multicall(term()).
|
||||||
|
call_client(Nodes, ClientId, Req) ->
|
||||||
|
erpc:multicall(Nodes, emqx_mgmt, do_call_client, [ClientId, Req], 30000).
|
||||||
|
|
||||||
|
-spec get_full_config(node()) -> map() | list() | {badrpc, _}.
|
||||||
|
get_full_config(Node) ->
|
||||||
|
rpc:call(Node, emqx_mgmt_api_configs, get_full_config, []).
|
||||||
|
|
||||||
|
-spec kickout_clients(node(), [emqx_types:clientid()]) -> ok | {badrpc, _}.
|
||||||
|
kickout_clients(Node, ClientIds) ->
|
||||||
|
rpc:call(Node, emqx_mgmt, do_kickout_clients, [ClientIds]).
|
|
@ -28,14 +28,19 @@
|
||||||
all() ->
|
all() ->
|
||||||
[
|
[
|
||||||
{group, persistence_disabled},
|
{group, persistence_disabled},
|
||||||
{group, persistence_enabled}
|
{group, persistence_enabled},
|
||||||
|
{group, cm_registry_enabled},
|
||||||
|
{group, cm_registry_disabled}
|
||||||
].
|
].
|
||||||
|
|
||||||
groups() ->
|
groups() ->
|
||||||
TCs = emqx_common_test_helpers:all(?MODULE),
|
CMRegistryTCs = [t_call_client_cluster],
|
||||||
|
TCs = emqx_common_test_helpers:all(?MODULE) -- CMRegistryTCs,
|
||||||
[
|
[
|
||||||
{persistence_disabled, [], TCs},
|
{persistence_disabled, [], TCs},
|
||||||
{persistence_enabled, [], [t_persist_list_subs]}
|
{persistence_enabled, [], [t_persist_list_subs]},
|
||||||
|
{cm_registry_enabled, CMRegistryTCs},
|
||||||
|
{cm_registry_disabled, CMRegistryTCs}
|
||||||
].
|
].
|
||||||
|
|
||||||
init_per_group(persistence_disabled, Config) ->
|
init_per_group(persistence_disabled, Config) ->
|
||||||
|
@ -66,10 +71,17 @@ init_per_group(persistence_enabled, Config) ->
|
||||||
[
|
[
|
||||||
{apps, Apps}
|
{apps, Apps}
|
||||||
| Config
|
| Config
|
||||||
].
|
];
|
||||||
|
init_per_group(cm_registry_enabled, Config) ->
|
||||||
|
[{emqx_config, "broker.enable_session_registry = true"} | Config];
|
||||||
|
init_per_group(cm_registry_disabled, Config) ->
|
||||||
|
[{emqx_config, "broker.enable_session_registry = false"} | Config].
|
||||||
|
|
||||||
end_per_group(_Grp, Config) ->
|
end_per_group(_Grp, Config) ->
|
||||||
emqx_cth_suite:stop(?config(apps, Config)).
|
case ?config(apps, Config) of
|
||||||
|
undefined -> ok;
|
||||||
|
Apps -> emqx_cth_suite:stop(Apps)
|
||||||
|
end.
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
@ -447,6 +459,83 @@ t_persist_list_subs(_) ->
|
||||||
%% clients:
|
%% clients:
|
||||||
VerifySubs().
|
VerifySubs().
|
||||||
|
|
||||||
|
t_call_client_cluster(Config) ->
|
||||||
|
[Node1, Node2] = ?config(cluster, Config),
|
||||||
|
[Node1ClientId, Node2ClientId] = ?config(client_ids, Config),
|
||||||
|
?assertMatch(
|
||||||
|
{[], #{}}, rpc:call(Node1, emqx_mgmt, list_client_msgs, client_msgs_args(Node1ClientId))
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{[], #{}}, rpc:call(Node2, emqx_mgmt, list_client_msgs, client_msgs_args(Node2ClientId))
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{[], #{}}, rpc:call(Node1, emqx_mgmt, list_client_msgs, client_msgs_args(Node2ClientId))
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{[], #{}}, rpc:call(Node2, emqx_mgmt, list_client_msgs, client_msgs_args(Node1ClientId))
|
||||||
|
),
|
||||||
|
|
||||||
|
case proplists:get_value(name, ?config(tc_group_properties, Config)) of
|
||||||
|
cm_registry_disabled ->
|
||||||
|
%% Simulating crashes that must be handled by erpc multicall
|
||||||
|
?assertMatch(
|
||||||
|
{error, _},
|
||||||
|
rpc:call(Node1, emqx_mgmt, list_client_msgs, client_msgs_bad_args(Node2ClientId))
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{error, _},
|
||||||
|
rpc:call(Node2, emqx_mgmt, list_client_msgs, client_msgs_bad_args(Node1ClientId))
|
||||||
|
);
|
||||||
|
cm_registry_enabled ->
|
||||||
|
%% Direct call to remote pid is expected to crash
|
||||||
|
?assertMatch(
|
||||||
|
{badrpc, {'EXIT', _}},
|
||||||
|
rpc:call(Node1, emqx_mgmt, list_client_msgs, client_msgs_bad_args(Node1ClientId))
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{badrpc, {'EXIT', _}},
|
||||||
|
rpc:call(Node2, emqx_mgmt, list_client_msgs, client_msgs_bad_args(Node2ClientId))
|
||||||
|
);
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
|
||||||
|
NotFoundClientId = <<"no_such_client_id">>,
|
||||||
|
?assertEqual(
|
||||||
|
{error, not_found},
|
||||||
|
rpc:call(Node2, emqx_mgmt, list_client_msgs, client_msgs_args(NotFoundClientId))
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
{error, not_found},
|
||||||
|
rpc:call(Node2, emqx_mgmt, list_client_msgs, client_msgs_args(NotFoundClientId))
|
||||||
|
).
|
||||||
|
|
||||||
|
t_call_client_cluster(init, Config) ->
|
||||||
|
Apps = [{emqx, ?config(emqx_config, Config)}, emqx_management],
|
||||||
|
[Node1, Node2] =
|
||||||
|
Cluster = emqx_cth_cluster:start(
|
||||||
|
[
|
||||||
|
{list_to_atom(atom_to_list(?MODULE) ++ "1"), #{role => core, apps => Apps}},
|
||||||
|
{list_to_atom(atom_to_list(?MODULE) ++ "2"), #{role => core, apps => Apps}}
|
||||||
|
],
|
||||||
|
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
|
||||||
|
),
|
||||||
|
{ok, Node1Client, Node1ClientId} = connect_client(Node1),
|
||||||
|
{ok, Node2Client, Node2ClientId} = connect_client(Node2),
|
||||||
|
%% They may exit during the test due to simulated crashes
|
||||||
|
unlink(Node1Client),
|
||||||
|
unlink(Node2Client),
|
||||||
|
[
|
||||||
|
{cluster, Cluster},
|
||||||
|
{client_ids, [Node1ClientId, Node2ClientId]},
|
||||||
|
{client_pids, [Node1Client, Node2Client]}
|
||||||
|
| Config
|
||||||
|
];
|
||||||
|
t_call_client_cluster('end', Config) ->
|
||||||
|
emqx_cth_cluster:stop(?config(cluster, Config)),
|
||||||
|
[exit(ClientPid, kill) || ClientPid <- ?config(client_pids, Config)],
|
||||||
|
ok.
|
||||||
|
|
||||||
%%% helpers
|
%%% helpers
|
||||||
ident(Arg) ->
|
ident(Arg) ->
|
||||||
Arg.
|
Arg.
|
||||||
|
@ -462,3 +551,24 @@ setup_clients(Config) ->
|
||||||
disconnect_clients(Config) ->
|
disconnect_clients(Config) ->
|
||||||
Clients = ?config(clients, Config),
|
Clients = ?config(clients, Config),
|
||||||
lists:foreach(fun emqtt:disconnect/1, Clients).
|
lists:foreach(fun emqtt:disconnect/1, Clients).
|
||||||
|
|
||||||
|
get_mqtt_port(Node) ->
|
||||||
|
{_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]),
|
||||||
|
Port.
|
||||||
|
|
||||||
|
connect_client(Node) ->
|
||||||
|
Port = get_mqtt_port(Node),
|
||||||
|
ClientId = <<(atom_to_binary(Node))/binary, "_client">>,
|
||||||
|
{ok, Client} = emqtt:start_link([
|
||||||
|
{port, Port},
|
||||||
|
{proto_ver, v5},
|
||||||
|
{clientid, ClientId}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
{ok, Client, ClientId}.
|
||||||
|
|
||||||
|
client_msgs_args(ClientId) ->
|
||||||
|
[mqueue_msgs, ClientId, #{limit => 10, continuation => none}].
|
||||||
|
|
||||||
|
client_msgs_bad_args(ClientId) ->
|
||||||
|
[mqueue_msgs, ClientId, "bad_page_params"].
|
||||||
|
|
|
@ -287,12 +287,12 @@ t_configs_node({'init', Config}) ->
|
||||||
(other_node, _) -> <<"log=2">>;
|
(other_node, _) -> <<"log=2">>;
|
||||||
(bad_node, _) -> {badrpc, bad}
|
(bad_node, _) -> {badrpc, bad}
|
||||||
end,
|
end,
|
||||||
meck:expect(emqx_management_proto_v4, get_full_config, F),
|
meck:expect(emqx_management_proto_v5, get_full_config, F),
|
||||||
meck:expect(emqx_conf_proto_v3, get_hocon_config, F2),
|
meck:expect(emqx_conf_proto_v3, get_hocon_config, F2),
|
||||||
meck:expect(hocon_pp, do, fun(Conf, _) -> Conf end),
|
meck:expect(hocon_pp, do, fun(Conf, _) -> Conf end),
|
||||||
Config;
|
Config;
|
||||||
t_configs_node({'end', _}) ->
|
t_configs_node({'end', _}) ->
|
||||||
meck:unload([emqx, emqx_management_proto_v4, emqx_conf_proto_v3, hocon_pp]);
|
meck:unload([emqx, emqx_management_proto_v5, emqx_conf_proto_v3, hocon_pp]);
|
||||||
t_configs_node(_) ->
|
t_configs_node(_) ->
|
||||||
Node = atom_to_list(node()),
|
Node = atom_to_list(node()),
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue