diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index d9cee1e04..20e2aef77 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -104,6 +104,7 @@ activate(Name, Details) -> activate(Name, Details, Message) -> gen_server:call(?MODULE, {activate_alarm, Name, Details, Message}). +-spec deactivate(binary() | atom()) -> ok | {error, not_found}. deactivate(Name) -> deactivate(Name, no_details, <<"">>). @@ -113,12 +114,14 @@ deactivate(Name, Details) -> deactivate(Name, Details, Message) -> gen_server:call(?MODULE, {deactivate_alarm, Name, Details, Message}). +-spec delete_all_deactivated_alarms() -> ok. delete_all_deactivated_alarms() -> gen_server:call(?MODULE, delete_all_deactivated_alarms). get_alarms() -> get_alarms(all). +-spec get_alarms(all | activated | deactivated) -> [map()]. get_alarms(all) -> gen_server:call(?MODULE, {get_alarms, all}); diff --git a/apps/emqx/src/emqx_alarm_handler.erl b/apps/emqx/src/emqx_alarm_handler.erl index 37f0591f3..e19084cde 100644 --- a/apps/emqx/src/emqx_alarm_handler.erl +++ b/apps/emqx/src/emqx_alarm_handler.erl @@ -72,11 +72,11 @@ handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) -> {ok, State}; handle_event({clear_alarm, system_memory_high_watermark}, State) -> - emqx_alarm:deactivate(high_system_memory_usage), + _ = emqx_alarm:deactivate(high_system_memory_usage), {ok, State}; handle_event({clear_alarm, process_memory_high_watermark}, State) -> - emqx_alarm:deactivate(high_process_memory_usage), + _ = emqx_alarm:deactivate(high_process_memory_usage), {ok, State}; handle_event({set_alarm, {?LC_ALARM_ID_RUNQ, Info}}, State) -> @@ -86,7 +86,7 @@ handle_event({set_alarm, {?LC_ALARM_ID_RUNQ, Info}}, State) -> {ok, State}; handle_event({clear_alarm, ?LC_ALARM_ID_RUNQ}, State) -> - emqx_alarm:deactivate(runq_overload), + _ = emqx_alarm:deactivate(runq_overload), {ok, State}; handle_event(_, State) -> diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index b93bb92c8..4afc4f99d 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -44,6 +44,7 @@ %% PubSub Infos -export([ subscriptions/1 + , subscriptions_via_topic/1 , subscribers/1 , subscribed/2 ]). @@ -359,6 +360,11 @@ subscriptions(SubId) -> undefined -> [] end. +-spec(subscriptions_via_topic(emqx_types:topic()) -> [emqx_types:subopts()]). +subscriptions_via_topic(Topic) -> + MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=', '$1', Topic}], ['$_']}], + ets:select(?SUBOPTION, MatchSpec). + -spec(subscribed(pid() | emqx_types:subid(), emqx_types:topic()) -> boolean()). subscribed(SubPid, Topic) when is_pid(SubPid) -> ets:member(?SUBOPTION, {SubPid, Topic}); diff --git a/apps/emqx/src/proto/emqx_broker_proto_v1.erl b/apps/emqx/src/proto/emqx_broker_proto_v1.erl index 539ca9962..d55fef88f 100644 --- a/apps/emqx/src/proto/emqx_broker_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_broker_proto_v1.erl @@ -22,10 +22,15 @@ , forward/3 , forward_async/3 - , client_subscriptions/2 + , list_client_subscriptions/2 + , list_subscriptions_via_topic/2 , lookup_client/2 , kickout_client/2 + + , start_listener/2 + , stop_listener/2 + , restart_listener/2 ]). -include("bpapi.hrl"). @@ -43,12 +48,6 @@ forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]). --spec client_subscriptions(node(), emqx_types:clientid()) -> - [{emqx_types:topic(), emqx_types:subopts()}] - | emqx_rpc:badrpc(). -client_subscriptions(Node, ClientId) -> - rpc:call(Node, emqx_broker, subscriptions, [ClientId]). - -spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}. kickout_client(Node, ClientId) -> rpc:call(Node, emqx_cm, kick_session, [ClientId]). @@ -57,3 +56,25 @@ kickout_client(Node, ClientId) -> [emqx_cm:channel_info()] | {badrpc, _}. lookup_client(Node, Key) -> rpc:call(Node, emqx_cm, lookup_client, [Key]). + +-spec list_client_subscriptions(node(), emqx_types:clientid()) -> + [{emqx_types:topic(), emqx_types:subopts()}] + | emqx_rpc:badrpc(). +list_client_subscriptions(Node, ClientId) -> + rpc:call(Node, emqx_broker, subscriptions, [ClientId]). + +-spec list_subscriptions_via_topic(node(), emqx_types:topic()) -> [emqx_types:subopts()]. +list_subscriptions_via_topic(Node, Topic) -> + rpc:call(Node, emqx_broker, subscriptions_via_topic, [Topic]). + +-spec start_listener(node(), atom()) -> ok | {error, _} | {badrpc, _}. +start_listener(Node, Id) -> + rpc:call(Node, emqx_listeners, start_listener, [Id]). + +-spec stop_listener(node(), atom()) -> ok | {error, _} | {badrpc, _}. +stop_listener(Node, Id) -> + rpc:call(Node, emqx_listeners, stop_listener, [Id]). + +-spec restart_listener(node(), atom()) -> ok | {error, _} | {badrpc, _}. +restart_listener(Node, Id) -> + rpc:call(Node, emqx_listeners, restart_listener, [Id]). diff --git a/apps/emqx/src/proto/emqx_proto_v1.erl b/apps/emqx/src/proto/emqx_proto_v1.erl index e7d273756..a92561e8c 100644 --- a/apps/emqx/src/proto/emqx_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_proto_v1.erl @@ -24,9 +24,13 @@ , is_running/1 + , get_alarms/2 , get_stats/1 , get_metrics/1 + , deactivate_alarm/2 + , delete_all_deactivated_alarms/1 + , clean_authz_cache/1 , clean_authz_cache/2 ]). @@ -38,6 +42,10 @@ introduced_in() -> is_running(Node) -> rpc:call(Node, emqx, is_running, []). +-spec get_alarms(node(), all | activated | deactivated) -> [map()]. +get_alarms(Node, Type) -> + rpc:call(Node, emqx_alarm, get_alarms, [Type]). + -spec get_stats(node()) -> emqx_stats:stats() | {badrpc, _}. get_stats(Node) -> rpc:call(Node, emqx_stats, getstats, []). @@ -56,3 +64,12 @@ clean_authz_cache(Node, ClientId) -> -spec clean_authz_cache(node()) -> ok | {badrpc, _}. clean_authz_cache(Node) -> rpc:call(Node, emqx_authz_cache, drain_cache, []). + +-spec deactivate_alarm(node(), binary() | atom()) -> + ok | {error, not_found} | {badrpc, _}. +deactivate_alarm(Node, Name) -> + rpc:call(Node, emqx_alarm, deactivate, [Name]). + +-spec delete_all_deactivated_alarms(node()) -> ok | {badrpc, _}. +delete_all_deactivated_alarms(Node) -> + rpc:call(Node, emqx_alarm, delete_all_deactivated_alarms, []). diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 1adc2f61f..2bf92a3c7 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -59,7 +59,7 @@ ]). %% Internal funcs --export([call_client/3]). +-export([do_call_client/2]). %% Subscriptions -export([ list_subscriptions/1 @@ -90,8 +90,10 @@ , list_listeners_by_id/1 , get_listener/2 , manage_listener/2 + , do_update_listener/2 , update_listener/2 , update_listener/3 + , do_remove_listener/1 , remove_listener/1 , remove_listener/2 ]). @@ -121,12 +123,6 @@ -elvis([{elvis_style, god_modules, disable}]). --export_type([listener_manage_op/0]). - --type listener_manage_op() :: start_listener - | stop_listener - | restart_listener. - %% TODO: remove these function after all api use minirest version 1.X return() -> ok. @@ -280,7 +276,7 @@ list_client_subscriptions(ClientId) -> end. client_subscriptions(Node, ClientId) -> - {Node, wrap_rpc(emqx_broker_proto_v1:client_subscriptions(Node, ClientId))}. + {Node, wrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId))}. clean_authz_cache(ClientId) -> Results = [clean_authz_cache(Node, ClientId) || Node <- mria_mnesia:running_nodes()], @@ -305,7 +301,7 @@ set_ratelimit_policy(ClientId, Policy) -> set_quota_policy(ClientId, Policy) -> call_client(ClientId, {quota, Policy}). -set_keepalive(ClientId, Interval)when Interval >= 0 andalso Interval =< 65535 -> +set_keepalive(ClientId, Interval) when Interval >= 0 andalso Interval =< 65535 -> call_client(ClientId, {keepalive, Interval}); set_keepalive(_ClientId, _Interval) -> {error, <<"mqtt3.1.1 specification: keepalive must between 0~65535">>}. @@ -322,7 +318,8 @@ call_client(ClientId, Req) -> end. %% @private -call_client(Node, ClientId, Req) when Node =:= node() -> +-spec do_call_client(emqx_types:clientid(), term()) -> term(). +do_call_client(ClientId, Req) -> case emqx_cm:lookup_channels(ClientId) of [] -> {error, not_found}; Pids when is_list(Pids) -> @@ -332,9 +329,11 @@ call_client(Node, ClientId, Req) when Node =:= node() -> erlang:apply(ConnMod, call, [Pid, Req]); undefined -> {error, not_found} end - end; + end. + +%% @private call_client(Node, ClientId, Req) -> - rpc_call(Node, call_client, [Node, ClientId, Req]). + wrap_rpc(emqx_management_proto_v1:call_client(Node, ClientId, Req)). %%-------------------------------------------------------------------- %% Subscriptions @@ -354,26 +353,17 @@ list_subscriptions_via_topic(Topic, FormatFun) -> lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun) || Node <- mria_mnesia:running_nodes()]). - -list_subscriptions_via_topic(Node, Topic, {M,F}) when Node =:= node() -> - MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=','$1', Topic}], ['$_']}], - erlang:apply(M, F, [ets:select(emqx_suboption, MatchSpec)]); - -list_subscriptions_via_topic(Node, Topic, FormatFun) -> - rpc_call(Node, list_subscriptions_via_topic, [Node, Topic, FormatFun]). +list_subscriptions_via_topic(Node, Topic, _FormatFun = {M, F}) -> + case wrap_rpc(emqx_broker_proto_v1:list_subscriptions_via_topic(Node, Topic)) of + {error, Reason} -> {error, Reason}; + Result -> M:F(Result) + end. lookup_subscriptions(ClientId) -> lists:append([lookup_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()]). -lookup_subscriptions(Node, ClientId) when Node =:= node() -> - case ets:lookup(emqx_subid, ClientId) of - [] -> []; - [{_, Pid}] -> - ets:match_object(emqx_suboption, {{Pid, '_'}, '_'}) - end; - lookup_subscriptions(Node, ClientId) -> - rpc_call(Node, lookup_subscriptions, [Node, ClientId]). + wrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId)). %%-------------------------------------------------------------------- %% Routes @@ -390,7 +380,7 @@ subscribe(ClientId, TopicTables) -> subscribe(mria_mnesia:running_nodes(), ClientId, TopicTables). subscribe([Node | Nodes], ClientId, TopicTables) -> - case rpc_call(Node, do_subscribe, [ClientId, TopicTables]) of + case wrap_rpc(emqx_management_proto_v1:subscribe(Node, ClientId, TopicTables)) of {error, _} -> subscribe(Nodes, ClientId, TopicTables); Re -> Re end; @@ -398,6 +388,8 @@ subscribe([Node | Nodes], ClientId, TopicTables) -> subscribe([], _ClientId, _TopicTables) -> {error, channel_not_found}. +-spec do_subscribe(emqx_types:clientid(), emqx_types:topic_filters()) -> + {subscribe, _} | {error, atom()}. do_subscribe(ClientId, TopicTables) -> case ets:lookup(emqx_channel, ClientId) of [] -> {error, channel_not_found}; @@ -410,18 +402,23 @@ publish(Msg) -> emqx_metrics:inc_msg(Msg), emqx:publish(Msg). +-spec unsubscribe(emqx_types:clientid(), emqx_types:topic()) -> + {unsubscribe, _} | {error, channel_not_found}. unsubscribe(ClientId, Topic) -> unsubscribe(mria_mnesia:running_nodes(), ClientId, Topic). +-spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) -> + {unsubscribe, _} | {error, channel_not_found}. unsubscribe([Node | Nodes], ClientId, Topic) -> - case rpc_call(Node, do_unsubscribe, [ClientId, Topic]) of + case wrap_rpc(emqx_management_proto_v1:unsubscribe(Node, ClientId, Topic)) of {error, _} -> unsubscribe(Nodes, ClientId, Topic); Re -> Re end; - unsubscribe([], _ClientId, _Topic) -> {error, channel_not_found}. +-spec do_unsubscribe(emqx_types:clientid(), emqx_types:topic()) -> + {unsubscribe, _} | {error, _}. do_unsubscribe(ClientId, Topic) -> case ets:lookup(emqx_channel, ClientId) of [] -> {error, channel_not_found}; @@ -457,44 +454,51 @@ listener_id_filter(Id, Listeners) -> Filter = fun(#{id := Id0}) -> Id0 =:= Id end, lists:filter(Filter, Listeners). --spec manage_listener( listener_manage_op() - , Param :: map()) -> - ok | {error, Reason :: term()}. -manage_listener(Operation, #{id := ID, node := Node}) when Node =:= node()-> - erlang:apply(emqx_listeners, Operation, [ID]); -manage_listener(Operation, Param = #{node := Node}) -> - rpc_call(Node, manage_listener, [Operation, Param]). +-spec manage_listener( start_listener | stop_listener | restart_listener + , #{id := atom(), node := node()} + ) -> ok | {error, Reason :: term()}. +manage_listener(start_listener, #{id := ID, node := Node}) -> + wrap_rpc(emqx_broker_proto_v1:start_listener(Node, ID)); +manage_listener(stop_listener, #{id := ID, node := Node}) -> + wrap_rpc(emqx_broker_proto_v1:stop_listener(Node, ID)); +manage_listener(restart_listener, #{id := ID, node := Node}) -> + wrap_rpc(emqx_broker_proto_v1:restart_listener(Node, ID)). -update_listener(Id, Config) -> - [update_listener(Node, Id, Config) || Node <- mria_mnesia:running_nodes()]. - -update_listener(Node, Id, Config) when Node =:= node() -> +-spec do_update_listener(string(), emqx_config:update_request()) -> + map() | {error, _}. +do_update_listener(Id, Config) -> case emqx_listeners:parse_listener_id(Id) of {error, {invalid_listener_id, Id}} -> {error, {invalid_listener_id, Id}}; {Type, Name} -> case emqx:update_config([listeners, Type, Name], Config, #{}) of {ok, #{raw_config := RawConf}} -> - RawConf#{node => Node, id => Id, running => true}; + RawConf#{node => node(), id => Id, running => true}; {error, Reason} -> {error, Reason} end - end; + end. + +update_listener(Id, Config) -> + [update_listener(Node, Id, Config) || Node <- mria_mnesia:running_nodes()]. + update_listener(Node, Id, Config) -> - rpc_call(Node, update_listener, [Node, Id, Config]). + wrap_rpc(emqx_management_proto_v1:update_listener(Node, Id, Config)). remove_listener(Id) -> [remove_listener(Node, Id) || Node <- mria_mnesia:running_nodes()]. -remove_listener(Node, Id) when Node =:= node() -> +-spec do_remove_listener(string()) -> ok. +do_remove_listener(Id) -> {Type, Name} = emqx_listeners:parse_listener_id(Id), case emqx:remove_config([listeners, Type, Name], #{}) of {ok, _} -> ok; {error, Reason} -> error(Reason) - end; + end. + remove_listener(Node, Id) -> - rpc_call(Node, remove_listener, [Node, Id]). + wrap_rpc(emqx_management_proto_v1:remove_listener(Node, Id)). %%-------------------------------------------------------------------- %% Get Alarms @@ -503,23 +507,17 @@ remove_listener(Node, Id) -> get_alarms(Type) -> [{Node, get_alarms(Node, Type)} || Node <- mria_mnesia:running_nodes()]. -get_alarms(Node, Type) when Node =:= node() -> - add_duration_field(emqx_alarm:get_alarms(Type)); get_alarms(Node, Type) -> - rpc_call(Node, get_alarms, [Node, Type]). + add_duration_field(wrap_rpc(emqx_proto_v1:get_alarms(Node, Type))). -deactivate(Node, Name) when Node =:= node() -> - emqx_alarm:deactivate(Name); deactivate(Node, Name) -> - rpc_call(Node, deactivate, [Node, Name]). + wrap_rpc(emqx_proto_v1:deactivate_alarm(Node, Name)). delete_all_deactivated_alarms() -> [delete_all_deactivated_alarms(Node) || Node <- mria_mnesia:running_nodes()]. -delete_all_deactivated_alarms(Node) when Node =:= node() -> - emqx_alarm:delete_all_deactivated_alarms(); delete_all_deactivated_alarms(Node) -> - rpc_call(Node, delete_deactivated_alarms, [Node]). + wrap_rpc(emqx_proto_v1:delete_all_deactivated_alarms(Node)). add_duration_field(Alarms) -> Now = erlang:system_time(microsecond), @@ -562,12 +560,6 @@ item(route, {Topic, Node}) -> %% Internal Functions. %%-------------------------------------------------------------------- -rpc_call(Node, Fun, Args) -> - case rpc:call(Node, ?MODULE, Fun, Args) of - {badrpc, Reason} -> {error, Reason}; - Res -> Res - end. - wrap_rpc({badrpc, Reason}) -> {error, Reason}; wrap_rpc(Res) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index be38e401d..62292c362 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -579,9 +579,6 @@ unsubscribe(#{clientid := ClientID, topic := Topic}) -> case do_unsubscribe(ClientID, Topic) of {error, channel_not_found} -> {404, ?CLIENT_ID_NOT_FOUND}; - {error, Reason} -> - Message = list_to_binary(io_lib:format("~p", [Reason])), - {500, #{code => <<"UNKNOW_ERROR">>, message => Message}}; {unsubscribe, [{Topic, #{}}]} -> {200} end. @@ -608,6 +605,8 @@ do_subscribe(ClientID, Topic0, Qos) -> end end. +-spec do_unsubscribe(emqx_types:clientid(), emqx_types:topic()) -> + {unsubscribe, _} | {error, channel_not_found}. do_unsubscribe(ClientID, Topic) -> case emqx_mgmt:unsubscribe(ClientID, Topic) of {error, Reason} -> diff --git a/apps/emqx_management/src/proto/emqx_management_proto_v1.erl b/apps/emqx_management/src/proto/emqx_management_proto_v1.erl index 0620f957d..d3d8ff989 100644 --- a/apps/emqx_management/src/proto/emqx_management_proto_v1.erl +++ b/apps/emqx_management/src/proto/emqx_management_proto_v1.erl @@ -25,6 +25,13 @@ , list_subscriptions/1 , list_listeners/1 + , remove_listener/2 + + , update_listener/3 + , subscribe/3 + , unsubscribe/3 + + , call_client/3 ]). -include_lib("emqx/include/bpapi.hrl"). @@ -47,3 +54,26 @@ list_subscriptions(Node) -> -spec list_listeners(node()) -> [map()] | {badrpc, _}. list_listeners(Node) -> rpc:call(Node, emqx_mgmt, do_list_listeners, []). + +-spec remove_listener(node(), string()) -> ok | {badrpc, _}. +remove_listener(Node, Id) -> + rpc:call(Node, emqx_mgmt, do_remove_listener, [Id]). + +-spec update_listener(node(), string(), emqx_config:update_request()) -> + map() | {error, _} | {badrpc, _}. +update_listener(Node, Id, Config) -> + rpc:call(Node, emqx_mgmt, do_update_listener, [Id, Config]). + +-spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) -> + {subscribe, _} | {error, atom()} | {badrpc, _}. +subscribe(Node, ClientId, TopicTables) -> + rpc:call(Node, emqx_mgmt, do_subscribe, [ClientId, TopicTables]). + +-spec unsubscribe(node(), emqx_types:clientid(), emqx_types:topic()) -> + {unsubscribe, _} | {error, _} | {badrpc, _}. +unsubscribe(Node, ClientId, Topic) -> + rpc:call(Node, emqx_mgmt, do_unsubscribe, [ClientId, Topic]). + +-spec call_client(node(), emqx_types:clientid(), term()) -> term(). +call_client(Node, ClientId, Req) -> + rpc:call(Node, emqx_mgmt, do_call_client, [ClientId, Req]). diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 3c4864ab7..3282ea8c4 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -89,7 +89,7 @@ t_clients(_) -> {ok, _} = emqx_mgmt_api_test_util:request_api(post, SubscribePath, "", AuthHeader, SubscribeBody), timer:sleep(100), - [{{_, AfterSubTopic}, #{qos := AfterSubQos}}] = emqx_mgmt:lookup_subscriptions(ClientId1), + [{AfterSubTopic, #{qos := AfterSubQos}}] = emqx_mgmt:lookup_subscriptions(ClientId1), ?assertEqual(AfterSubTopic, Topic), ?assertEqual(AfterSubQos, Qos),