Merge pull request #6783 from k32/bpapi-mgmt2
refactor(emqx_mgmt): Decorate RPCs
This commit is contained in:
commit
664be3b7d1
|
@ -104,6 +104,7 @@ activate(Name, Details) ->
|
||||||
activate(Name, Details, Message) ->
|
activate(Name, Details, Message) ->
|
||||||
gen_server:call(?MODULE, {activate_alarm, Name, Details, Message}).
|
gen_server:call(?MODULE, {activate_alarm, Name, Details, Message}).
|
||||||
|
|
||||||
|
-spec deactivate(binary() | atom()) -> ok | {error, not_found}.
|
||||||
deactivate(Name) ->
|
deactivate(Name) ->
|
||||||
deactivate(Name, no_details, <<"">>).
|
deactivate(Name, no_details, <<"">>).
|
||||||
|
|
||||||
|
@ -113,12 +114,14 @@ deactivate(Name, Details) ->
|
||||||
deactivate(Name, Details, Message) ->
|
deactivate(Name, Details, Message) ->
|
||||||
gen_server:call(?MODULE, {deactivate_alarm, Name, Details, Message}).
|
gen_server:call(?MODULE, {deactivate_alarm, Name, Details, Message}).
|
||||||
|
|
||||||
|
-spec delete_all_deactivated_alarms() -> ok.
|
||||||
delete_all_deactivated_alarms() ->
|
delete_all_deactivated_alarms() ->
|
||||||
gen_server:call(?MODULE, delete_all_deactivated_alarms).
|
gen_server:call(?MODULE, delete_all_deactivated_alarms).
|
||||||
|
|
||||||
get_alarms() ->
|
get_alarms() ->
|
||||||
get_alarms(all).
|
get_alarms(all).
|
||||||
|
|
||||||
|
-spec get_alarms(all | activated | deactivated) -> [map()].
|
||||||
get_alarms(all) ->
|
get_alarms(all) ->
|
||||||
gen_server:call(?MODULE, {get_alarms, all});
|
gen_server:call(?MODULE, {get_alarms, all});
|
||||||
|
|
||||||
|
|
|
@ -72,11 +72,11 @@ handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) ->
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
handle_event({clear_alarm, system_memory_high_watermark}, State) ->
|
handle_event({clear_alarm, system_memory_high_watermark}, State) ->
|
||||||
emqx_alarm:deactivate(high_system_memory_usage),
|
_ = emqx_alarm:deactivate(high_system_memory_usage),
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
handle_event({clear_alarm, process_memory_high_watermark}, State) ->
|
handle_event({clear_alarm, process_memory_high_watermark}, State) ->
|
||||||
emqx_alarm:deactivate(high_process_memory_usage),
|
_ = emqx_alarm:deactivate(high_process_memory_usage),
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
handle_event({set_alarm, {?LC_ALARM_ID_RUNQ, Info}}, State) ->
|
handle_event({set_alarm, {?LC_ALARM_ID_RUNQ, Info}}, State) ->
|
||||||
|
@ -86,7 +86,7 @@ handle_event({set_alarm, {?LC_ALARM_ID_RUNQ, Info}}, State) ->
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
handle_event({clear_alarm, ?LC_ALARM_ID_RUNQ}, State) ->
|
handle_event({clear_alarm, ?LC_ALARM_ID_RUNQ}, State) ->
|
||||||
emqx_alarm:deactivate(runq_overload),
|
_ = emqx_alarm:deactivate(runq_overload),
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
handle_event(_, State) ->
|
handle_event(_, State) ->
|
||||||
|
|
|
@ -44,6 +44,7 @@
|
||||||
|
|
||||||
%% PubSub Infos
|
%% PubSub Infos
|
||||||
-export([ subscriptions/1
|
-export([ subscriptions/1
|
||||||
|
, subscriptions_via_topic/1
|
||||||
, subscribers/1
|
, subscribers/1
|
||||||
, subscribed/2
|
, subscribed/2
|
||||||
]).
|
]).
|
||||||
|
@ -359,6 +360,11 @@ subscriptions(SubId) ->
|
||||||
undefined -> []
|
undefined -> []
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec(subscriptions_via_topic(emqx_types:topic()) -> [emqx_types:subopts()]).
|
||||||
|
subscriptions_via_topic(Topic) ->
|
||||||
|
MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=', '$1', Topic}], ['$_']}],
|
||||||
|
ets:select(?SUBOPTION, MatchSpec).
|
||||||
|
|
||||||
-spec(subscribed(pid() | emqx_types:subid(), emqx_types:topic()) -> boolean()).
|
-spec(subscribed(pid() | emqx_types:subid(), emqx_types:topic()) -> boolean()).
|
||||||
subscribed(SubPid, Topic) when is_pid(SubPid) ->
|
subscribed(SubPid, Topic) when is_pid(SubPid) ->
|
||||||
ets:member(?SUBOPTION, {SubPid, Topic});
|
ets:member(?SUBOPTION, {SubPid, Topic});
|
||||||
|
|
|
@ -22,10 +22,15 @@
|
||||||
|
|
||||||
, forward/3
|
, forward/3
|
||||||
, forward_async/3
|
, forward_async/3
|
||||||
, client_subscriptions/2
|
, list_client_subscriptions/2
|
||||||
|
, list_subscriptions_via_topic/2
|
||||||
|
|
||||||
, lookup_client/2
|
, lookup_client/2
|
||||||
, kickout_client/2
|
, kickout_client/2
|
||||||
|
|
||||||
|
, start_listener/2
|
||||||
|
, stop_listener/2
|
||||||
|
, restart_listener/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include("bpapi.hrl").
|
-include("bpapi.hrl").
|
||||||
|
@ -43,12 +48,6 @@ forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
|
||||||
forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
|
forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
|
||||||
emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]).
|
emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]).
|
||||||
|
|
||||||
-spec client_subscriptions(node(), emqx_types:clientid()) ->
|
|
||||||
[{emqx_types:topic(), emqx_types:subopts()}]
|
|
||||||
| emqx_rpc:badrpc().
|
|
||||||
client_subscriptions(Node, ClientId) ->
|
|
||||||
rpc:call(Node, emqx_broker, subscriptions, [ClientId]).
|
|
||||||
|
|
||||||
-spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}.
|
-spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}.
|
||||||
kickout_client(Node, ClientId) ->
|
kickout_client(Node, ClientId) ->
|
||||||
rpc:call(Node, emqx_cm, kick_session, [ClientId]).
|
rpc:call(Node, emqx_cm, kick_session, [ClientId]).
|
||||||
|
@ -57,3 +56,25 @@ kickout_client(Node, ClientId) ->
|
||||||
[emqx_cm:channel_info()] | {badrpc, _}.
|
[emqx_cm:channel_info()] | {badrpc, _}.
|
||||||
lookup_client(Node, Key) ->
|
lookup_client(Node, Key) ->
|
||||||
rpc:call(Node, emqx_cm, lookup_client, [Key]).
|
rpc:call(Node, emqx_cm, lookup_client, [Key]).
|
||||||
|
|
||||||
|
-spec list_client_subscriptions(node(), emqx_types:clientid()) ->
|
||||||
|
[{emqx_types:topic(), emqx_types:subopts()}]
|
||||||
|
| emqx_rpc:badrpc().
|
||||||
|
list_client_subscriptions(Node, ClientId) ->
|
||||||
|
rpc:call(Node, emqx_broker, subscriptions, [ClientId]).
|
||||||
|
|
||||||
|
-spec list_subscriptions_via_topic(node(), emqx_types:topic()) -> [emqx_types:subopts()].
|
||||||
|
list_subscriptions_via_topic(Node, Topic) ->
|
||||||
|
rpc:call(Node, emqx_broker, subscriptions_via_topic, [Topic]).
|
||||||
|
|
||||||
|
-spec start_listener(node(), atom()) -> ok | {error, _} | {badrpc, _}.
|
||||||
|
start_listener(Node, Id) ->
|
||||||
|
rpc:call(Node, emqx_listeners, start_listener, [Id]).
|
||||||
|
|
||||||
|
-spec stop_listener(node(), atom()) -> ok | {error, _} | {badrpc, _}.
|
||||||
|
stop_listener(Node, Id) ->
|
||||||
|
rpc:call(Node, emqx_listeners, stop_listener, [Id]).
|
||||||
|
|
||||||
|
-spec restart_listener(node(), atom()) -> ok | {error, _} | {badrpc, _}.
|
||||||
|
restart_listener(Node, Id) ->
|
||||||
|
rpc:call(Node, emqx_listeners, restart_listener, [Id]).
|
||||||
|
|
|
@ -24,9 +24,13 @@
|
||||||
|
|
||||||
, is_running/1
|
, is_running/1
|
||||||
|
|
||||||
|
, get_alarms/2
|
||||||
, get_stats/1
|
, get_stats/1
|
||||||
, get_metrics/1
|
, get_metrics/1
|
||||||
|
|
||||||
|
, deactivate_alarm/2
|
||||||
|
, delete_all_deactivated_alarms/1
|
||||||
|
|
||||||
, clean_authz_cache/1
|
, clean_authz_cache/1
|
||||||
, clean_authz_cache/2
|
, clean_authz_cache/2
|
||||||
]).
|
]).
|
||||||
|
@ -38,6 +42,10 @@ introduced_in() ->
|
||||||
is_running(Node) ->
|
is_running(Node) ->
|
||||||
rpc:call(Node, emqx, is_running, []).
|
rpc:call(Node, emqx, is_running, []).
|
||||||
|
|
||||||
|
-spec get_alarms(node(), all | activated | deactivated) -> [map()].
|
||||||
|
get_alarms(Node, Type) ->
|
||||||
|
rpc:call(Node, emqx_alarm, get_alarms, [Type]).
|
||||||
|
|
||||||
-spec get_stats(node()) -> emqx_stats:stats() | {badrpc, _}.
|
-spec get_stats(node()) -> emqx_stats:stats() | {badrpc, _}.
|
||||||
get_stats(Node) ->
|
get_stats(Node) ->
|
||||||
rpc:call(Node, emqx_stats, getstats, []).
|
rpc:call(Node, emqx_stats, getstats, []).
|
||||||
|
@ -56,3 +64,12 @@ clean_authz_cache(Node, ClientId) ->
|
||||||
-spec clean_authz_cache(node()) -> ok | {badrpc, _}.
|
-spec clean_authz_cache(node()) -> ok | {badrpc, _}.
|
||||||
clean_authz_cache(Node) ->
|
clean_authz_cache(Node) ->
|
||||||
rpc:call(Node, emqx_authz_cache, drain_cache, []).
|
rpc:call(Node, emqx_authz_cache, drain_cache, []).
|
||||||
|
|
||||||
|
-spec deactivate_alarm(node(), binary() | atom()) ->
|
||||||
|
ok | {error, not_found} | {badrpc, _}.
|
||||||
|
deactivate_alarm(Node, Name) ->
|
||||||
|
rpc:call(Node, emqx_alarm, deactivate, [Name]).
|
||||||
|
|
||||||
|
-spec delete_all_deactivated_alarms(node()) -> ok | {badrpc, _}.
|
||||||
|
delete_all_deactivated_alarms(Node) ->
|
||||||
|
rpc:call(Node, emqx_alarm, delete_all_deactivated_alarms, []).
|
||||||
|
|
|
@ -59,7 +59,7 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% Internal funcs
|
%% Internal funcs
|
||||||
-export([call_client/3]).
|
-export([do_call_client/2]).
|
||||||
|
|
||||||
%% Subscriptions
|
%% Subscriptions
|
||||||
-export([ list_subscriptions/1
|
-export([ list_subscriptions/1
|
||||||
|
@ -90,8 +90,10 @@
|
||||||
, list_listeners_by_id/1
|
, list_listeners_by_id/1
|
||||||
, get_listener/2
|
, get_listener/2
|
||||||
, manage_listener/2
|
, manage_listener/2
|
||||||
|
, do_update_listener/2
|
||||||
, update_listener/2
|
, update_listener/2
|
||||||
, update_listener/3
|
, update_listener/3
|
||||||
|
, do_remove_listener/1
|
||||||
, remove_listener/1
|
, remove_listener/1
|
||||||
, remove_listener/2
|
, remove_listener/2
|
||||||
]).
|
]).
|
||||||
|
@ -121,12 +123,6 @@
|
||||||
|
|
||||||
-elvis([{elvis_style, god_modules, disable}]).
|
-elvis([{elvis_style, god_modules, disable}]).
|
||||||
|
|
||||||
-export_type([listener_manage_op/0]).
|
|
||||||
|
|
||||||
-type listener_manage_op() :: start_listener
|
|
||||||
| stop_listener
|
|
||||||
| restart_listener.
|
|
||||||
|
|
||||||
%% TODO: remove these function after all api use minirest version 1.X
|
%% TODO: remove these function after all api use minirest version 1.X
|
||||||
return() ->
|
return() ->
|
||||||
ok.
|
ok.
|
||||||
|
@ -280,7 +276,7 @@ list_client_subscriptions(ClientId) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
client_subscriptions(Node, ClientId) ->
|
client_subscriptions(Node, ClientId) ->
|
||||||
{Node, wrap_rpc(emqx_broker_proto_v1:client_subscriptions(Node, ClientId))}.
|
{Node, wrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId))}.
|
||||||
|
|
||||||
clean_authz_cache(ClientId) ->
|
clean_authz_cache(ClientId) ->
|
||||||
Results = [clean_authz_cache(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
|
Results = [clean_authz_cache(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
|
||||||
|
@ -322,7 +318,8 @@ call_client(ClientId, Req) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
call_client(Node, ClientId, Req) when Node =:= node() ->
|
-spec do_call_client(emqx_types:clientid(), term()) -> term().
|
||||||
|
do_call_client(ClientId, Req) ->
|
||||||
case emqx_cm:lookup_channels(ClientId) of
|
case emqx_cm:lookup_channels(ClientId) of
|
||||||
[] -> {error, not_found};
|
[] -> {error, not_found};
|
||||||
Pids when is_list(Pids) ->
|
Pids when is_list(Pids) ->
|
||||||
|
@ -332,9 +329,11 @@ call_client(Node, ClientId, Req) when Node =:= node() ->
|
||||||
erlang:apply(ConnMod, call, [Pid, Req]);
|
erlang:apply(ConnMod, call, [Pid, Req]);
|
||||||
undefined -> {error, not_found}
|
undefined -> {error, not_found}
|
||||||
end
|
end
|
||||||
end;
|
end.
|
||||||
|
|
||||||
|
%% @private
|
||||||
call_client(Node, ClientId, Req) ->
|
call_client(Node, ClientId, Req) ->
|
||||||
rpc_call(Node, call_client, [Node, ClientId, Req]).
|
wrap_rpc(emqx_management_proto_v1:call_client(Node, ClientId, Req)).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Subscriptions
|
%% Subscriptions
|
||||||
|
@ -354,26 +353,17 @@ list_subscriptions_via_topic(Topic, FormatFun) ->
|
||||||
lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun)
|
lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun)
|
||||||
|| Node <- mria_mnesia:running_nodes()]).
|
|| Node <- mria_mnesia:running_nodes()]).
|
||||||
|
|
||||||
|
list_subscriptions_via_topic(Node, Topic, _FormatFun = {M, F}) ->
|
||||||
list_subscriptions_via_topic(Node, Topic, {M,F}) when Node =:= node() ->
|
case wrap_rpc(emqx_broker_proto_v1:list_subscriptions_via_topic(Node, Topic)) of
|
||||||
MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=','$1', Topic}], ['$_']}],
|
{error, Reason} -> {error, Reason};
|
||||||
erlang:apply(M, F, [ets:select(emqx_suboption, MatchSpec)]);
|
Result -> M:F(Result)
|
||||||
|
end.
|
||||||
list_subscriptions_via_topic(Node, Topic, FormatFun) ->
|
|
||||||
rpc_call(Node, list_subscriptions_via_topic, [Node, Topic, FormatFun]).
|
|
||||||
|
|
||||||
lookup_subscriptions(ClientId) ->
|
lookup_subscriptions(ClientId) ->
|
||||||
lists:append([lookup_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()]).
|
lists:append([lookup_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()]).
|
||||||
|
|
||||||
lookup_subscriptions(Node, ClientId) when Node =:= node() ->
|
|
||||||
case ets:lookup(emqx_subid, ClientId) of
|
|
||||||
[] -> [];
|
|
||||||
[{_, Pid}] ->
|
|
||||||
ets:match_object(emqx_suboption, {{Pid, '_'}, '_'})
|
|
||||||
end;
|
|
||||||
|
|
||||||
lookup_subscriptions(Node, ClientId) ->
|
lookup_subscriptions(Node, ClientId) ->
|
||||||
rpc_call(Node, lookup_subscriptions, [Node, ClientId]).
|
wrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId)).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Routes
|
%% Routes
|
||||||
|
@ -390,7 +380,7 @@ subscribe(ClientId, TopicTables) ->
|
||||||
subscribe(mria_mnesia:running_nodes(), ClientId, TopicTables).
|
subscribe(mria_mnesia:running_nodes(), ClientId, TopicTables).
|
||||||
|
|
||||||
subscribe([Node | Nodes], ClientId, TopicTables) ->
|
subscribe([Node | Nodes], ClientId, TopicTables) ->
|
||||||
case rpc_call(Node, do_subscribe, [ClientId, TopicTables]) of
|
case wrap_rpc(emqx_management_proto_v1:subscribe(Node, ClientId, TopicTables)) of
|
||||||
{error, _} -> subscribe(Nodes, ClientId, TopicTables);
|
{error, _} -> subscribe(Nodes, ClientId, TopicTables);
|
||||||
Re -> Re
|
Re -> Re
|
||||||
end;
|
end;
|
||||||
|
@ -398,6 +388,8 @@ subscribe([Node | Nodes], ClientId, TopicTables) ->
|
||||||
subscribe([], _ClientId, _TopicTables) ->
|
subscribe([], _ClientId, _TopicTables) ->
|
||||||
{error, channel_not_found}.
|
{error, channel_not_found}.
|
||||||
|
|
||||||
|
-spec do_subscribe(emqx_types:clientid(), emqx_types:topic_filters()) ->
|
||||||
|
{subscribe, _} | {error, atom()}.
|
||||||
do_subscribe(ClientId, TopicTables) ->
|
do_subscribe(ClientId, TopicTables) ->
|
||||||
case ets:lookup(emqx_channel, ClientId) of
|
case ets:lookup(emqx_channel, ClientId) of
|
||||||
[] -> {error, channel_not_found};
|
[] -> {error, channel_not_found};
|
||||||
|
@ -410,18 +402,23 @@ publish(Msg) ->
|
||||||
emqx_metrics:inc_msg(Msg),
|
emqx_metrics:inc_msg(Msg),
|
||||||
emqx:publish(Msg).
|
emqx:publish(Msg).
|
||||||
|
|
||||||
|
-spec unsubscribe(emqx_types:clientid(), emqx_types:topic()) ->
|
||||||
|
{unsubscribe, _} | {error, channel_not_found}.
|
||||||
unsubscribe(ClientId, Topic) ->
|
unsubscribe(ClientId, Topic) ->
|
||||||
unsubscribe(mria_mnesia:running_nodes(), ClientId, Topic).
|
unsubscribe(mria_mnesia:running_nodes(), ClientId, Topic).
|
||||||
|
|
||||||
|
-spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) ->
|
||||||
|
{unsubscribe, _} | {error, channel_not_found}.
|
||||||
unsubscribe([Node | Nodes], ClientId, Topic) ->
|
unsubscribe([Node | Nodes], ClientId, Topic) ->
|
||||||
case rpc_call(Node, do_unsubscribe, [ClientId, Topic]) of
|
case wrap_rpc(emqx_management_proto_v1:unsubscribe(Node, ClientId, Topic)) of
|
||||||
{error, _} -> unsubscribe(Nodes, ClientId, Topic);
|
{error, _} -> unsubscribe(Nodes, ClientId, Topic);
|
||||||
Re -> Re
|
Re -> Re
|
||||||
end;
|
end;
|
||||||
|
|
||||||
unsubscribe([], _ClientId, _Topic) ->
|
unsubscribe([], _ClientId, _Topic) ->
|
||||||
{error, channel_not_found}.
|
{error, channel_not_found}.
|
||||||
|
|
||||||
|
-spec do_unsubscribe(emqx_types:clientid(), emqx_types:topic()) ->
|
||||||
|
{unsubscribe, _} | {error, _}.
|
||||||
do_unsubscribe(ClientId, Topic) ->
|
do_unsubscribe(ClientId, Topic) ->
|
||||||
case ets:lookup(emqx_channel, ClientId) of
|
case ets:lookup(emqx_channel, ClientId) of
|
||||||
[] -> {error, channel_not_found};
|
[] -> {error, channel_not_found};
|
||||||
|
@ -457,44 +454,51 @@ listener_id_filter(Id, Listeners) ->
|
||||||
Filter = fun(#{id := Id0}) -> Id0 =:= Id end,
|
Filter = fun(#{id := Id0}) -> Id0 =:= Id end,
|
||||||
lists:filter(Filter, Listeners).
|
lists:filter(Filter, Listeners).
|
||||||
|
|
||||||
-spec manage_listener( listener_manage_op()
|
-spec manage_listener( start_listener | stop_listener | restart_listener
|
||||||
, Param :: map()) ->
|
, #{id := atom(), node := node()}
|
||||||
ok | {error, Reason :: term()}.
|
) -> ok | {error, Reason :: term()}.
|
||||||
manage_listener(Operation, #{id := ID, node := Node}) when Node =:= node()->
|
manage_listener(start_listener, #{id := ID, node := Node}) ->
|
||||||
erlang:apply(emqx_listeners, Operation, [ID]);
|
wrap_rpc(emqx_broker_proto_v1:start_listener(Node, ID));
|
||||||
manage_listener(Operation, Param = #{node := Node}) ->
|
manage_listener(stop_listener, #{id := ID, node := Node}) ->
|
||||||
rpc_call(Node, manage_listener, [Operation, Param]).
|
wrap_rpc(emqx_broker_proto_v1:stop_listener(Node, ID));
|
||||||
|
manage_listener(restart_listener, #{id := ID, node := Node}) ->
|
||||||
|
wrap_rpc(emqx_broker_proto_v1:restart_listener(Node, ID)).
|
||||||
|
|
||||||
update_listener(Id, Config) ->
|
-spec do_update_listener(string(), emqx_config:update_request()) ->
|
||||||
[update_listener(Node, Id, Config) || Node <- mria_mnesia:running_nodes()].
|
map() | {error, _}.
|
||||||
|
do_update_listener(Id, Config) ->
|
||||||
update_listener(Node, Id, Config) when Node =:= node() ->
|
|
||||||
case emqx_listeners:parse_listener_id(Id) of
|
case emqx_listeners:parse_listener_id(Id) of
|
||||||
{error, {invalid_listener_id, Id}} ->
|
{error, {invalid_listener_id, Id}} ->
|
||||||
{error, {invalid_listener_id, Id}};
|
{error, {invalid_listener_id, Id}};
|
||||||
{Type, Name} ->
|
{Type, Name} ->
|
||||||
case emqx:update_config([listeners, Type, Name], Config, #{}) of
|
case emqx:update_config([listeners, Type, Name], Config, #{}) of
|
||||||
{ok, #{raw_config := RawConf}} ->
|
{ok, #{raw_config := RawConf}} ->
|
||||||
RawConf#{node => Node, id => Id, running => true};
|
RawConf#{node => node(), id => Id, running => true};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end
|
end
|
||||||
end;
|
end.
|
||||||
|
|
||||||
|
update_listener(Id, Config) ->
|
||||||
|
[update_listener(Node, Id, Config) || Node <- mria_mnesia:running_nodes()].
|
||||||
|
|
||||||
update_listener(Node, Id, Config) ->
|
update_listener(Node, Id, Config) ->
|
||||||
rpc_call(Node, update_listener, [Node, Id, Config]).
|
wrap_rpc(emqx_management_proto_v1:update_listener(Node, Id, Config)).
|
||||||
|
|
||||||
remove_listener(Id) ->
|
remove_listener(Id) ->
|
||||||
[remove_listener(Node, Id) || Node <- mria_mnesia:running_nodes()].
|
[remove_listener(Node, Id) || Node <- mria_mnesia:running_nodes()].
|
||||||
|
|
||||||
remove_listener(Node, Id) when Node =:= node() ->
|
-spec do_remove_listener(string()) -> ok.
|
||||||
|
do_remove_listener(Id) ->
|
||||||
{Type, Name} = emqx_listeners:parse_listener_id(Id),
|
{Type, Name} = emqx_listeners:parse_listener_id(Id),
|
||||||
case emqx:remove_config([listeners, Type, Name], #{}) of
|
case emqx:remove_config([listeners, Type, Name], #{}) of
|
||||||
{ok, _} -> ok;
|
{ok, _} -> ok;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
error(Reason)
|
error(Reason)
|
||||||
end;
|
end.
|
||||||
|
|
||||||
remove_listener(Node, Id) ->
|
remove_listener(Node, Id) ->
|
||||||
rpc_call(Node, remove_listener, [Node, Id]).
|
wrap_rpc(emqx_management_proto_v1:remove_listener(Node, Id)).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Get Alarms
|
%% Get Alarms
|
||||||
|
@ -503,23 +507,17 @@ remove_listener(Node, Id) ->
|
||||||
get_alarms(Type) ->
|
get_alarms(Type) ->
|
||||||
[{Node, get_alarms(Node, Type)} || Node <- mria_mnesia:running_nodes()].
|
[{Node, get_alarms(Node, Type)} || Node <- mria_mnesia:running_nodes()].
|
||||||
|
|
||||||
get_alarms(Node, Type) when Node =:= node() ->
|
|
||||||
add_duration_field(emqx_alarm:get_alarms(Type));
|
|
||||||
get_alarms(Node, Type) ->
|
get_alarms(Node, Type) ->
|
||||||
rpc_call(Node, get_alarms, [Node, Type]).
|
add_duration_field(wrap_rpc(emqx_proto_v1:get_alarms(Node, Type))).
|
||||||
|
|
||||||
deactivate(Node, Name) when Node =:= node() ->
|
|
||||||
emqx_alarm:deactivate(Name);
|
|
||||||
deactivate(Node, Name) ->
|
deactivate(Node, Name) ->
|
||||||
rpc_call(Node, deactivate, [Node, Name]).
|
wrap_rpc(emqx_proto_v1:deactivate_alarm(Node, Name)).
|
||||||
|
|
||||||
delete_all_deactivated_alarms() ->
|
delete_all_deactivated_alarms() ->
|
||||||
[delete_all_deactivated_alarms(Node) || Node <- mria_mnesia:running_nodes()].
|
[delete_all_deactivated_alarms(Node) || Node <- mria_mnesia:running_nodes()].
|
||||||
|
|
||||||
delete_all_deactivated_alarms(Node) when Node =:= node() ->
|
|
||||||
emqx_alarm:delete_all_deactivated_alarms();
|
|
||||||
delete_all_deactivated_alarms(Node) ->
|
delete_all_deactivated_alarms(Node) ->
|
||||||
rpc_call(Node, delete_deactivated_alarms, [Node]).
|
wrap_rpc(emqx_proto_v1:delete_all_deactivated_alarms(Node)).
|
||||||
|
|
||||||
add_duration_field(Alarms) ->
|
add_duration_field(Alarms) ->
|
||||||
Now = erlang:system_time(microsecond),
|
Now = erlang:system_time(microsecond),
|
||||||
|
@ -562,12 +560,6 @@ item(route, {Topic, Node}) ->
|
||||||
%% Internal Functions.
|
%% Internal Functions.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
rpc_call(Node, Fun, Args) ->
|
|
||||||
case rpc:call(Node, ?MODULE, Fun, Args) of
|
|
||||||
{badrpc, Reason} -> {error, Reason};
|
|
||||||
Res -> Res
|
|
||||||
end.
|
|
||||||
|
|
||||||
wrap_rpc({badrpc, Reason}) ->
|
wrap_rpc({badrpc, Reason}) ->
|
||||||
{error, Reason};
|
{error, Reason};
|
||||||
wrap_rpc(Res) ->
|
wrap_rpc(Res) ->
|
||||||
|
|
|
@ -579,9 +579,6 @@ unsubscribe(#{clientid := ClientID, topic := Topic}) ->
|
||||||
case do_unsubscribe(ClientID, Topic) of
|
case do_unsubscribe(ClientID, Topic) of
|
||||||
{error, channel_not_found} ->
|
{error, channel_not_found} ->
|
||||||
{404, ?CLIENT_ID_NOT_FOUND};
|
{404, ?CLIENT_ID_NOT_FOUND};
|
||||||
{error, Reason} ->
|
|
||||||
Message = list_to_binary(io_lib:format("~p", [Reason])),
|
|
||||||
{500, #{code => <<"UNKNOW_ERROR">>, message => Message}};
|
|
||||||
{unsubscribe, [{Topic, #{}}]} ->
|
{unsubscribe, [{Topic, #{}}]} ->
|
||||||
{200}
|
{200}
|
||||||
end.
|
end.
|
||||||
|
@ -608,6 +605,8 @@ do_subscribe(ClientID, Topic0, Qos) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec do_unsubscribe(emqx_types:clientid(), emqx_types:topic()) ->
|
||||||
|
{unsubscribe, _} | {error, channel_not_found}.
|
||||||
do_unsubscribe(ClientID, Topic) ->
|
do_unsubscribe(ClientID, Topic) ->
|
||||||
case emqx_mgmt:unsubscribe(ClientID, Topic) of
|
case emqx_mgmt:unsubscribe(ClientID, Topic) of
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
|
|
@ -25,6 +25,13 @@
|
||||||
, list_subscriptions/1
|
, list_subscriptions/1
|
||||||
|
|
||||||
, list_listeners/1
|
, list_listeners/1
|
||||||
|
, remove_listener/2
|
||||||
|
|
||||||
|
, update_listener/3
|
||||||
|
, subscribe/3
|
||||||
|
, unsubscribe/3
|
||||||
|
|
||||||
|
, call_client/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include_lib("emqx/include/bpapi.hrl").
|
-include_lib("emqx/include/bpapi.hrl").
|
||||||
|
@ -47,3 +54,26 @@ list_subscriptions(Node) ->
|
||||||
-spec list_listeners(node()) -> [map()] | {badrpc, _}.
|
-spec list_listeners(node()) -> [map()] | {badrpc, _}.
|
||||||
list_listeners(Node) ->
|
list_listeners(Node) ->
|
||||||
rpc:call(Node, emqx_mgmt, do_list_listeners, []).
|
rpc:call(Node, emqx_mgmt, do_list_listeners, []).
|
||||||
|
|
||||||
|
-spec remove_listener(node(), string()) -> ok | {badrpc, _}.
|
||||||
|
remove_listener(Node, Id) ->
|
||||||
|
rpc:call(Node, emqx_mgmt, do_remove_listener, [Id]).
|
||||||
|
|
||||||
|
-spec update_listener(node(), string(), emqx_config:update_request()) ->
|
||||||
|
map() | {error, _} | {badrpc, _}.
|
||||||
|
update_listener(Node, Id, Config) ->
|
||||||
|
rpc:call(Node, emqx_mgmt, do_update_listener, [Id, Config]).
|
||||||
|
|
||||||
|
-spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) ->
|
||||||
|
{subscribe, _} | {error, atom()} | {badrpc, _}.
|
||||||
|
subscribe(Node, ClientId, TopicTables) ->
|
||||||
|
rpc:call(Node, emqx_mgmt, do_subscribe, [ClientId, TopicTables]).
|
||||||
|
|
||||||
|
-spec unsubscribe(node(), emqx_types:clientid(), emqx_types:topic()) ->
|
||||||
|
{unsubscribe, _} | {error, _} | {badrpc, _}.
|
||||||
|
unsubscribe(Node, ClientId, Topic) ->
|
||||||
|
rpc:call(Node, emqx_mgmt, do_unsubscribe, [ClientId, Topic]).
|
||||||
|
|
||||||
|
-spec call_client(node(), emqx_types:clientid(), term()) -> term().
|
||||||
|
call_client(Node, ClientId, Req) ->
|
||||||
|
rpc:call(Node, emqx_mgmt, do_call_client, [ClientId, Req]).
|
||||||
|
|
|
@ -89,7 +89,7 @@ t_clients(_) ->
|
||||||
{ok, _} = emqx_mgmt_api_test_util:request_api(post, SubscribePath,
|
{ok, _} = emqx_mgmt_api_test_util:request_api(post, SubscribePath,
|
||||||
"", AuthHeader, SubscribeBody),
|
"", AuthHeader, SubscribeBody),
|
||||||
timer:sleep(100),
|
timer:sleep(100),
|
||||||
[{{_, AfterSubTopic}, #{qos := AfterSubQos}}] = emqx_mgmt:lookup_subscriptions(ClientId1),
|
[{AfterSubTopic, #{qos := AfterSubQos}}] = emqx_mgmt:lookup_subscriptions(ClientId1),
|
||||||
?assertEqual(AfterSubTopic, Topic),
|
?assertEqual(AfterSubTopic, Topic),
|
||||||
?assertEqual(AfterSubQos, Qos),
|
?assertEqual(AfterSubQos, Qos),
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue