diff --git a/apps/emqx/src/emqx_authz_cache.erl b/apps/emqx/src/emqx_authz_cache.erl index 0012ad7db..41e4831f5 100644 --- a/apps/emqx/src/emqx_authz_cache.erl +++ b/apps/emqx/src/emqx_authz_cache.erl @@ -28,6 +28,7 @@ , get_cache_ttl/0 , is_enabled/0 , drain_cache/0 + , drain_cache/1 ]). %% export for test @@ -154,6 +155,16 @@ drain_cache() -> _ = persistent_term:put(drain_k(), time_now()), ok. +-spec drain_cache(emqx_types:clientid()) -> ok | {error, not_found}. +drain_cache(ClientId) -> + case emqx_cm:lookup_channels(ClientId) of + [] -> + {error, not_found}; + Pids when is_list(Pids) -> + erlang:send(lists:last(Pids), clean_authz_cache), + ok + end. + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index a84fe112c..6c889e000 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -56,6 +56,8 @@ -export([ lookup_channels/1 , lookup_channels/2 + + , lookup_client/1 ]). %% Test/debug interface @@ -80,8 +82,16 @@ , get_connected_client_count/0 ]). +-export_type([ channel_info/0 + ]). + -type(chan_pid() :: pid()). +-type(channel_info() :: { _Chan :: {emqx_types:clientid(), pid()} + , _Info :: emqx_types:infos() + , _Stats :: emqx_types:stats() + }). + %% Tables for channel management. -define(CHAN_TAB, emqx_channel). -define(CHAN_CONN_TAB, emqx_channel_conn). @@ -502,6 +512,18 @@ lookup_channels(global, ClientId) -> lookup_channels(local, ClientId) -> [ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)]. +-spec lookup_client({clientid, emqx_types:clientid()} | {username, emqx_types:username()}) -> + [channel_info()]. +lookup_client({username, Username}) -> + MatchSpec = [{ {'_', #{clientinfo => #{username => '$1'}}, '_'} + , [{'=:=','$1', Username}] + , ['$_'] + }], + ets:select(emqx_channel_info, MatchSpec); +lookup_client({clientid, ClientId}) -> + [Rec || Key <- ets:lookup(emqx_channel, ClientId) + , Rec <- ets:lookup(emqx_channel_info, Key)]. + %% @private rpc_call(Node, Fun, Args, Timeout) -> case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of diff --git a/apps/emqx/src/proto/emqx_broker_proto_v1.erl b/apps/emqx/src/proto/emqx_broker_proto_v1.erl index 7643fa40b..539ca9962 100644 --- a/apps/emqx/src/proto/emqx_broker_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_broker_proto_v1.erl @@ -19,8 +19,13 @@ -behaviour(emqx_bpapi). -export([ introduced_in/0 + , forward/3 , forward_async/3 + , client_subscriptions/2 + + , lookup_client/2 + , kickout_client/2 ]). -include("bpapi.hrl"). @@ -37,3 +42,18 @@ forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> -spec forward_async(node(), emqx_types:topic(), emqx_types:delivery()) -> true. forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]). + +-spec client_subscriptions(node(), emqx_types:clientid()) -> + [{emqx_types:topic(), emqx_types:subopts()}] + | emqx_rpc:badrpc(). +client_subscriptions(Node, ClientId) -> + rpc:call(Node, emqx_broker, subscriptions, [ClientId]). + +-spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}. +kickout_client(Node, ClientId) -> + rpc:call(Node, emqx_cm, kick_session, [ClientId]). + +-spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) -> + [emqx_cm:channel_info()] | {badrpc, _}. +lookup_client(Node, Key) -> + rpc:call(Node, emqx_cm, lookup_client, [Key]). diff --git a/apps/emqx/src/proto/emqx_proto_v1.erl b/apps/emqx/src/proto/emqx_proto_v1.erl index bcfb187a0..e7d273756 100644 --- a/apps/emqx/src/proto/emqx_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_proto_v1.erl @@ -21,7 +21,14 @@ -include("bpapi.hrl"). -export([ introduced_in/0 + , is_running/1 + + , get_stats/1 + , get_metrics/1 + + , clean_authz_cache/1 + , clean_authz_cache/2 ]). introduced_in() -> @@ -30,3 +37,22 @@ introduced_in() -> -spec is_running(node()) -> boolean() | {badrpc, term()}. is_running(Node) -> rpc:call(Node, emqx, is_running, []). + +-spec get_stats(node()) -> emqx_stats:stats() | {badrpc, _}. +get_stats(Node) -> + rpc:call(Node, emqx_stats, getstats, []). + +-spec get_metrics(node()) -> [{emqx_metrics:metric_name(), non_neg_integer()}] | {badrpc, _}. +get_metrics(Node) -> + rpc:call(Node, emqx_metrics, all, []). + +-spec clean_authz_cache(node(), emqx_types:clientid()) -> + ok + | {error, not_found} + | {badrpc, _}. +clean_authz_cache(Node, ClientId) -> + rpc:call(Node, emqx_authz_cache, drain_cache, [ClientId]). + +-spec clean_authz_cache(node()) -> ok | {badrpc, _}. +clean_authz_cache(Node) -> + rpc:call(Node, emqx_authz_cache, drain_cache, []). diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 3ab37fb4a..1adc2f61f 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -29,7 +29,9 @@ , lookup_node/1 , list_brokers/0 , lookup_broker/1 + , node_info/0 , node_info/1 + , broker_info/0 , broker_info/1 ]). @@ -65,6 +67,8 @@ , list_subscriptions_via_topic/3 , lookup_subscriptions/1 , lookup_subscriptions/2 + + , do_list_subscriptions/0 ]). %% Routes @@ -80,7 +84,8 @@ ]). %% Listeners --export([ list_listeners/0 +-export([ do_list_listeners/0 + , list_listeners/0 , list_listeners/1 , list_listeners_by_id/1 , get_listener/2 @@ -116,6 +121,12 @@ -elvis([{elvis_style, god_modules, disable}]). +-export_type([listener_manage_op/0]). + +-type listener_manage_op() :: start_listener + | stop_listener + | restart_listener. + %% TODO: remove these function after all api use minirest version 1.X return() -> ok. @@ -134,7 +145,7 @@ list_nodes() -> lookup_node(Node) -> node_info(Node). -node_info(Node) when Node =:= node() -> +node_info() -> Memory = emqx_vm:get_memory(), Info = maps:from_list([{K, list_to_binary(V)} || {K, V} <- emqx_vm:loads()]), BrokerInfo = emqx_sys:info(), @@ -151,9 +162,10 @@ node_info(Node) when Node =:= node() -> node_status => 'Running', uptime => proplists:get_value(uptime, BrokerInfo), version => iolist_to_binary(proplists:get_value(version, BrokerInfo)) - }; + }. + node_info(Node) -> - rpc_call(Node, node_info, [Node]). + wrap_rpc(emqx_management_proto_v1:node_info(Node)). stopped_node_info(Node) -> #{name => Node, node_status => 'Stopped'}. @@ -168,12 +180,12 @@ list_brokers() -> lookup_broker(Node) -> broker_info(Node). -broker_info(Node) when Node =:= node() -> +broker_info() -> Info = maps:from_list([{K, iolist_to_binary(V)} || {K, V} <- emqx_sys:info()]), - Info#{node => Node, otp_release => iolist_to_binary(otp_rel()), node_status => 'Running'}; + Info#{node => node(), otp_release => iolist_to_binary(otp_rel()), node_status => 'Running'}. broker_info(Node) -> - rpc_call(Node, broker_info, [Node]). + wrap_rpc(emqx_management_proto_v1:broker_info(Node)). %%-------------------------------------------------------------------- %% Metrics and Stats @@ -182,10 +194,8 @@ broker_info(Node) -> get_metrics() -> nodes_info_count([get_metrics(Node) || Node <- mria_mnesia:running_nodes()]). -get_metrics(Node) when Node =:= node() -> - emqx_metrics:all(); get_metrics(Node) -> - rpc_call(Node, get_metrics, [Node]). + wrap_rpc(emqx_proto_v1:get_metrics(Node)). get_stats() -> GlobalStatsKeys = @@ -209,10 +219,8 @@ delete_keys(List, []) -> delete_keys(List, [Key | Keys]) -> delete_keys(proplists:delete(Key, List), Keys). -get_stats(Node) when Node =:= node() -> - emqx_stats:getstats(); get_stats(Node) -> - rpc_call(Node, get_stats, [Node]). + wrap_rpc(emqx_proto_v1:get_stats(Node)). nodes_info_count(PropList) -> NodeCount = @@ -239,24 +247,11 @@ lookup_client({username, Username}, FormatFun) -> lists:append([lookup_client(Node, {username, Username}, FormatFun) || Node <- mria_mnesia:running_nodes()]). -lookup_client(Node, {clientid, ClientId}, {M,F}) when Node =:= node() -> - lists:append(lists:map( - fun(Key) -> - lists:map(fun M:F/1, ets:lookup(emqx_channel_info, Key)) - end, ets:lookup(emqx_channel, ClientId))); - -lookup_client(Node, {clientid, ClientId}, FormatFun) -> - rpc_call(Node, lookup_client, [Node, {clientid, ClientId}, FormatFun]); - -lookup_client(Node, {username, Username}, {M,F}) when Node =:= node() -> - MatchSpec = [{ {'_', #{clientinfo => #{username => '$1'}}, '_'} - , [{'=:=','$1', Username}] - , ['$_'] - }], - lists:map(fun M:F/1, ets:select(emqx_channel_info, MatchSpec)); - -lookup_client(Node, {username, Username}, FormatFun) -> - rpc_call(Node, lookup_client, [Node, {username, Username}, FormatFun]). +lookup_client(Node, Key, {M, F}) -> + case wrap_rpc(emqx_broker_proto_v1:lookup_client(Node, Key)) of + {error, Err} -> {error, Err}; + L -> lists:map(fun M:F/1, L) + end. kickout_client({ClientID, FormatFun}) -> case lookup_client({clientid, ClientID}, FormatFun) of @@ -267,11 +262,8 @@ kickout_client({ClientID, FormatFun}) -> check_results(Results) end. -kickout_client(Node, ClientId) when Node =:= node() -> - emqx_cm:kick_session(ClientId); - kickout_client(Node, ClientId) -> - rpc_call(Node, kickout_client, [Node, ClientId]). + wrap_rpc(emqx_broker_proto_v1:kickout_client(Node, ClientId)). list_authz_cache(ClientId) -> call_client(ClientId, list_authz_cache). @@ -287,27 +279,15 @@ list_client_subscriptions(ClientId) -> [Result | _] -> Result end. -client_subscriptions(Node, ClientId) when Node =:= node() -> - {Node, emqx_broker:subscriptions(ClientId)}; - client_subscriptions(Node, ClientId) -> - rpc_call(Node, client_subscriptions, [Node, ClientId]). + {Node, wrap_rpc(emqx_broker_proto_v1:client_subscriptions(Node, ClientId))}. clean_authz_cache(ClientId) -> Results = [clean_authz_cache(Node, ClientId) || Node <- mria_mnesia:running_nodes()], check_results(Results). - -clean_authz_cache(Node, ClientId) when Node =:= node() -> - case emqx_cm:lookup_channels(ClientId) of - [] -> - {error, not_found}; - Pids when is_list(Pids) -> - erlang:send(lists:last(Pids), clean_authz_cache), - ok - end; clean_authz_cache(Node, ClientId) -> - rpc_call(Node, clean_authz_cache, [Node, ClientId]). + wrap_rpc(emqx_proto_v1:clean_authz_cache(Node, ClientId)). clean_authz_cache_all() -> Results = [{Node, clean_authz_cache_all(Node)} || Node <- mria_mnesia:running_nodes()], @@ -316,11 +296,8 @@ clean_authz_cache_all() -> BadNodes -> {error, BadNodes} end. -clean_authz_cache_all(Node) when Node =:= node() -> - emqx_authz_cache:drain_cache(); - clean_authz_cache_all(Node) -> - rpc_call(Node, clean_authz_cache_all, [Node]). + wrap_rpc(emqx_proto_v1:clean_authz_cache(Node)). set_ratelimit_policy(ClientId, Policy) -> call_client(ClientId, {ratelimit, Policy}). @@ -363,14 +340,15 @@ call_client(Node, ClientId, Req) -> %% Subscriptions %%-------------------------------------------------------------------- -list_subscriptions(Node) when Node =:= node() -> +-spec do_list_subscriptions() -> [map()]. +do_list_subscriptions() -> case check_row_limit([mqtt_subproperty]) of false -> throw(max_row_limit); ok -> [item(subscription, Sub) || Sub <- ets:tab2list(mqtt_subproperty)] - end; + end. list_subscriptions(Node) -> - rpc_call(Node, list_subscriptions, [Node]). + wrap_rpc(emqx_management_proto_v1:list_subscriptions(Node)). list_subscriptions_via_topic(Topic, FormatFun) -> lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun) @@ -455,14 +433,14 @@ do_unsubscribe(ClientId, Topic) -> %% Listeners %%-------------------------------------------------------------------- +do_list_listeners() -> + [Conf#{node => node(), id => Id} || {Id, Conf} <- emqx_listeners:list()]. + list_listeners() -> lists:append([list_listeners(Node) || Node <- mria_mnesia:running_nodes()]). -list_listeners(Node) when Node =:= node() -> - [Conf#{node => Node, id => Id} || {Id, Conf} <- emqx_listeners:list()]; - list_listeners(Node) -> - rpc_call(Node, list_listeners, [Node]). + wrap_rpc(emqx_management_proto_v1:list_listeners(Node)). list_listeners_by_id(Id) -> listener_id_filter(Id, list_listeners()). @@ -479,10 +457,7 @@ listener_id_filter(Id, Listeners) -> Filter = fun(#{id := Id0}) -> Id0 =:= Id end, lists:filter(Filter, Listeners). - --spec manage_listener( Operation :: start_listener - | stop_listener - | restart_listener +-spec manage_listener( listener_manage_op() , Param :: map()) -> ok | {error, Reason :: term()}. manage_listener(Operation, #{id := ID, node := Node}) when Node =:= node()-> @@ -593,6 +568,11 @@ rpc_call(Node, Fun, Args) -> Res -> Res end. +wrap_rpc({badrpc, Reason}) -> + {error, Reason}; +wrap_rpc(Res) -> + Res. + otp_rel() -> lists:concat([emqx_vm:get_otp_version(), "/", erlang:system_info(version)]). @@ -610,7 +590,7 @@ check_row_limit([Tab | Tables], Limit) -> check_results(Results) -> case lists:any(fun(Item) -> Item =:= ok end, Results) of true -> ok; - false -> lists:last(Results) + false -> wrap_rpc(lists:last(Results)) end. max_row_limit() -> diff --git a/apps/emqx_management/src/proto/emqx_management_proto_v1.erl b/apps/emqx_management/src/proto/emqx_management_proto_v1.erl new file mode 100644 index 000000000..0620f957d --- /dev/null +++ b/apps/emqx_management/src/proto/emqx_management_proto_v1.erl @@ -0,0 +1,49 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_management_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ introduced_in/0 + + , node_info/1 + , broker_info/1 + , list_subscriptions/1 + + , list_listeners/1 + ]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.0". + +-spec node_info(node()) -> map() | {badrpc, _}. +node_info(Node) -> + rpc:call(Node, emqx_mgmt, node_info, []). + +-spec broker_info(node()) -> map() | {badrpc, _}. +broker_info(Node) -> + rpc:call(Node, emqx_mgmt, broker_info, []). + +-spec list_subscriptions(node()) -> [map()] | {badrpc, _}. +list_subscriptions(Node) -> + rpc:call(Node, emqx_mgmt, do_list_subscriptions, []). + +-spec list_listeners(node()) -> [map()] | {badrpc, _}. +list_listeners(Node) -> + rpc:call(Node, emqx_mgmt, do_list_listeners, []).