From 9a6a8a778b9394f3ab172db4d865e9ffe25aecef Mon Sep 17 00:00:00 2001 From: lafirest Date: Tue, 11 Jan 2022 17:16:02 +0800 Subject: [PATCH 1/2] feat(emqx_exhook): add metrics --- apps/emqx_exhook/include/emqx_exhook.hrl | 3 + apps/emqx_exhook/src/emqx_exhook_api.erl | 274 +++++++++++++----- apps/emqx_exhook/src/emqx_exhook_metrics.erl | 247 ++++++++++++++++ apps/emqx_exhook/src/emqx_exhook_mgr.erl | 92 +++--- apps/emqx_exhook/src/emqx_exhook_server.erl | 21 +- apps/emqx_exhook/src/emqx_exhook_sup.erl | 3 +- .../test/emqx_exhook_api_SUITE.erl | 31 +- 7 files changed, 544 insertions(+), 127 deletions(-) create mode 100644 apps/emqx_exhook/src/emqx_exhook_metrics.erl diff --git a/apps/emqx_exhook/include/emqx_exhook.hrl b/apps/emqx_exhook/include/emqx_exhook.hrl index 74f2e552e..e478c8cc7 100644 --- a/apps/emqx_exhook/include/emqx_exhook.hrl +++ b/apps/emqx_exhook/include/emqx_exhook.hrl @@ -18,6 +18,9 @@ -define(EMQX_EXHOOK_HRL, true). -define(APP, emqx_exhook). +-define(HOOKS_REF_COUNTER, emqx_exhook_ref_counter). +-define(HOOKS_METRICS, emqx_exhook_metrics). +-define(METRICS_PRECISION, 1). -define(ENABLED_HOOKS, [ {'client.connect', {emqx_exhook_handler, on_client_connect, []}} diff --git a/apps/emqx_exhook/src/emqx_exhook_api.erl b/apps/emqx_exhook/src/emqx_exhook_api.erl index 7581dd17b..02264c143 100644 --- a/apps/emqx_exhook/src/emqx_exhook_api.erl +++ b/apps/emqx_exhook/src/emqx_exhook_api.erl @@ -23,33 +23,36 @@ -export([api_spec/0, paths/0, schema/1, fields/1, namespace/0]). --export([exhooks/2, action_with_name/2, move/2]). +-export([exhooks/2, action_with_name/2, move/2, server_hooks/2]). --import(hoconsc, [mk/2, ref/1, enum/1, array/1]). +-import(hoconsc, [mk/2, ref/1, enum/1, array/1, map/2]). -import(emqx_dashboard_swagger, [schema_with_example/2, error_codes/2]). -define(TAGS, [<<"exhooks">>]). -define(BAD_REQUEST, 'BAD_REQUEST'). -define(BAD_RPC, 'BAD_RPC'). +%%-------------------------------------------------------------------- +%% schema +%%-------------------------------------------------------------------- namespace() -> "exhook". api_spec() -> emqx_dashboard_swagger:spec(?MODULE). -paths() -> ["/exhooks", "/exhooks/:name", "/exhooks/:name/move"]. +paths() -> ["/exhooks", "/exhooks/:name", "/exhooks/:name/move", "/exhooks/:name/hooks"]. schema(("/exhooks")) -> #{ 'operationId' => exhooks, get => #{tags => ?TAGS, description => <<"List all servers">>, - responses => #{200 => mk(array(ref(detailed_server_info)), #{})} + responses => #{200 => mk(array(ref(list_server_info)), #{})} }, post => #{tags => ?TAGS, description => <<"Add a servers">>, 'requestBody' => server_conf_schema(), - responses => #{201 => mk(ref(detailed_server_info), #{}), + responses => #{201 => mk(ref(detail_server_info), #{}), 500 => error_codes([?BAD_RPC], <<"Bad RPC">>) } } @@ -60,7 +63,7 @@ schema("/exhooks/:name") -> get => #{tags => ?TAGS, description => <<"Get the detail information of server">>, parameters => params_server_name_in_path(), - responses => #{200 => mk(ref(detailed_server_info), #{}), + responses => #{200 => mk(ref(detail_server_info), #{}), 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) } }, @@ -77,11 +80,21 @@ schema("/exhooks/:name") -> description => <<"Delete the server">>, parameters => params_server_name_in_path(), responses => #{204 => <<>>, - 500 => error_codes([?BAD_RPC], <<"Bad RPC">>) - } + 500 => error_codes([?BAD_RPC], <<"Bad RPC">>) } } }; +schema("/exhooks/:name/hooks") -> + #{'operationId' => server_hooks, + get => #{tags => ?TAGS, + description => <<"Get the hooks information of server">>, + parameters => params_server_name_in_path(), + responses => #{200 => mk(array(ref(list_hook_info)), #{}), + 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) + } + } + }; + schema("/exhooks/:name/move") -> #{'operationId' => move, post => #{tags => ?TAGS, @@ -96,25 +109,56 @@ schema("/exhooks/:name/move") -> }. fields(move_req) -> - [ - {position, mk(enum([top, bottom, before, 'after']), #{})}, - {related, mk(string(), #{desc => <<"Relative position of movement">>, - default => <<>>, - example => <<>> - })} + [ {position, mk(enum([top, bottom, before, 'after']), #{})} + , {related, mk(string(), #{desc => <<"Relative position of movement">>, + default => <<>>, + example => <<>> + })} ]; -fields(detailed_server_info) -> - [ {status, mk(enum([running, waiting, stopped]), #{})} - , {hooks, mk(array(string()), #{default => []})} - , {node_status, mk(ref(node_status), #{})} +fields(list_server_info) -> + [ {metrics, mk(ref(metrics), #{})} + , {node_metrics, mk(array(ref(node_metrics)), #{})} + , {node_status, mk(array(ref(node_status)), #{})} + , {hooks, mk(array(ref(hook_info)), #{})} ] ++ emqx_exhook_schema:server_config(); +fields(detail_server_info) -> + [ {metrics, mk(ref(metrics), #{})} + , {node_metrics, mk(array(ref(node_metrics)), #{})} + , {node_status, mk(array(ref(node_status)), #{})} + , {hooks, mk(array(ref(hook_info)), #{})} + ] ++ emqx_exhook_schema:server_config(); + +fields(list_hook_info) -> + [ {name, mk(binary(), #{})} + , {params, mk(map(name, binary()), #{})} + , {metrics, mk(ref(metrics), #{})} + , {node_metrics, mk(array(ref(node_metrics)), #{})} + ]; + +fields(node_metrics) -> + [ {node, mk(string(), #{})} + , {metrics, mk(ref(metrics), #{})} + ]; + fields(node_status) -> [ {node, mk(string(), #{})} , {status, mk(enum([running, waiting, stopped, not_found, error]), #{})} ]; +fields(hook_info) -> + [ {name, mk(binary(), #{})} + , {params, mk(map(name, binary()), #{})} + ]; + +fields(metrics) -> + [ {succeed, mk(integer(), #{})} + , {failed, mk(integer(), #{})} + , {rate, mk(integer(), #{})} + , {max_rate, mk(integer(), #{})} + ]; + fields(server_config) -> emqx_exhook_schema:server_config(). @@ -140,16 +184,19 @@ server_conf_schema() -> } }). - +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- exhooks(get, _) -> - ServerL = emqx_exhook_mgr:list(), - ServerL2 = nodes_all_server_status(ServerL), - {200, ServerL2}; + Confs = emqx:get_config([exhook, servers]), + Infos = nodes_all_server_info(Confs), + {200, Infos}; exhooks(post, #{body := Body}) -> case emqx_exhook_mgr:update_config([exhook, servers], {add, Body}) of - {ok, Result} -> - {201, Result}; + {ok, _} -> + #{<<"name">> := Name} = Body, + get_nodes_server_info(Name); {error, Error} -> {500, #{code => <<"BAD_RPC">>, message => Error @@ -157,16 +204,7 @@ exhooks(post, #{body := Body}) -> end. action_with_name(get, #{bindings := #{name := Name}}) -> - Result = emqx_exhook_mgr:lookup(Name), - NodeStatus = nodes_server_status(Name), - case Result of - not_found -> - {400, #{code => <<"BAD_REQUEST">>, - message => <<"Server not found">> - }}; - ServerInfo -> - {200, ServerInfo#{node_status => NodeStatus}} - end; + get_nodes_server_info(Name); action_with_name(put, #{bindings := #{name := Name}, body := Body}) -> case emqx_exhook_mgr:update_config([exhook, servers], @@ -177,8 +215,7 @@ action_with_name(put, #{bindings := #{name := Name}, body := Body}) -> }}; {ok, {error, Reason}} -> {400, #{code => <<"BAD_REQUEST">>, - message => unicode:characters_to_binary( - io_lib:format("Error Reason:~p~n", [Reason])) + message => unicode:characters_to_binary(io_lib:format("Error Reason:~p~n", [Reason])) }}; {ok, _} -> {200}; @@ -216,59 +253,136 @@ move(post, #{bindings := #{name := Name}, body := Body}) -> }} end. -nodes_server_status(Name) -> - StatusL = call_cluster(emqx_exhook_mgr, server_status, [Name]), +server_hooks(get, #{bindings := #{name := Name}}) -> + Confs = emqx:get_config([exhook, servers]), + case lists:search(fun(#{name := CfgName}) -> CfgName =:= Name end, Confs) of + false -> + {400, #{code => <<"BAD_REQUEST">>, + message => <<"Server not found">> + }}; + _ -> + Info = get_nodes_server_hooks_info(Name), + {200, Info} + end. - Handler = fun({Node, {error, _}}) -> - #{node => Node, - status => error - }; - ({Node, Status}) -> - #{node => Node, - status => Status - } - end, +get_nodes_server_info(Name) -> + Confs = emqx:get_config([exhook, servers]), + case lists:search(fun(#{name := CfgName}) -> CfgName =:= Name end, Confs) of + false -> + {400, #{code => <<"BAD_REQUEST">>, + message => <<"Server not found">> + }}; + {value, Conf} -> + NodeStatus = nodes_server_info(Name), + {200, maps:merge(Conf, NodeStatus)} + end. - lists:map(Handler, StatusL). +%%-------------------------------------------------------------------- +%% GET /exhooks +%%-------------------------------------------------------------------- +nodes_all_server_info(ConfL) -> + AllInfos = call_cluster(emqx_exhook_mgr, all_servers_info, []), + Default = emqx_exhook_metrics:new_metrics_info(), + node_all_server_info(ConfL, AllInfos, Default, []). -nodes_all_server_status(ServerL) -> - AllStatusL = call_cluster(emqx_exhook_mgr, all_servers_status, []), +node_all_server_info([#{name := ServerName} = Conf | T], AllInfos, Default, Acc) -> + Info = fill_cluster_server_info(AllInfos, [], [], ServerName, Default), + AllInfo = maps:merge(Conf, Info), + node_all_server_info(T, AllInfos, Default, [AllInfo | Acc]); - AggreMap = lists:foldl(fun(#{name := Name}, Acc) -> - Acc#{Name => []} - end, - #{}, - ServerL), +node_all_server_info([], _, _, Acc) -> + lists:reverse(Acc). - AddToMap = fun(Servers, Node, Status, Map) -> - lists:foldl(fun(Name, Acc) -> - StatusL = maps:get(Name, Acc), - StatusL2 = [#{node => Node, - status => Status - } | StatusL], - Acc#{Name := StatusL2} - end, - Map, - Servers) - end, +fill_cluster_server_info([{Node, {error, _}} | T], StatusL, MetricsL, ServerName, Default) -> + fill_cluster_server_info(T, + [#{node => Node, status => error} | StatusL], + [#{node => Node, metrics => Default} | MetricsL], + ServerName, + Default); - AggreMap2 = lists:foldl(fun({Node, #{running := Running, - waiting := Waiting, - stopped := Stopped}}, - Acc) -> - AddToMap(Stopped, Node, stopped, - AddToMap(Waiting, Node, waiting, - AddToMap(Running, Node, running, Acc))) - end, - AggreMap, - AllStatusL), +fill_cluster_server_info([{Node, Result} | T], StatusL, MetricsL, ServerName, Default) -> + #{status := Status, metrics := Metrics} = Result, + fill_cluster_server_info(T, + [#{node => Node, status => maps:get(ServerName, Status, error)} | StatusL], + [#{node => Node, metrics => maps:get(ServerName, Metrics, Default)} | MetricsL], + ServerName, + Default); - Handler = fun(#{name := Name} = Server) -> - Server#{node_status => maps:get(Name, AggreMap2)} - end, +fill_cluster_server_info([], StatusL, MetricsL, ServerName, _) -> + Metrics = emqx_exhook_metrics:metrics_aggregate_by_key(metrics, MetricsL), + #{metrics => Metrics, + node_metrics => MetricsL, + node_status => StatusL, + hooks => emqx_exhook_mgr:hooks(ServerName) + }. - lists:map(Handler, ServerL). +%%-------------------------------------------------------------------- +%% GET /exhooks/{name} +%%-------------------------------------------------------------------- +nodes_server_info(Name) -> + InfoL = call_cluster(emqx_exhook_mgr, server_info, [Name]), + Default = emqx_exhook_metrics:new_metrics_info(), + nodes_server_info(InfoL, Name, Default, [], []). +nodes_server_info([{Node, {error, _}} | T], Name, Default, StatusL, MetricsL) -> + nodes_server_info(T, + Name, + Default, + [#{node => Node, status => error} | StatusL], + [#{node => Node, metrics => Default} | MetricsL] + ); + +nodes_server_info([{Node, Result} | T], Name, Default, StatusL, MetricsL) -> + #{status := Status, metrics := Metrics} = Result, + nodes_server_info(T, + Name, + Default, + [#{node => Node, status => Status} | StatusL], + [#{node => Node, metrics => Metrics} | MetricsL] + ); + +nodes_server_info([], Name, _, StatusL, MetricsL) -> + #{metrics => emqx_exhook_metrics:metrics_aggregate_by_key(metrics, MetricsL), + node_status => StatusL, + node_metrics => MetricsL, + hooks => emqx_exhook_mgr:hooks(Name) + }. + +%%-------------------------------------------------------------------- +%% GET /exhooks/{name}/hooks +%%-------------------------------------------------------------------- +get_nodes_server_hooks_info(Name) -> + case emqx_exhook_mgr:hooks(Name) of + [] -> []; + Hooks -> + AllInfos = call_cluster(emqx_exhook_mgr, server_hooks_metrics, [Name]), + Default = emqx_exhook_metrics:new_metrics_info(), + get_nodes_server_hooks_info(Hooks, AllInfos, Default, []) + end. + +get_nodes_server_hooks_info([#{name := Name} = Spec | T], AllInfos, Default, Acc) -> + Info = fill_server_hooks_info(AllInfos, Name, Default, []), + AllInfo = maps:merge(Spec, Info), + get_nodes_server_hooks_info(T, AllInfos, Default, [AllInfo | Acc]); + +get_nodes_server_hooks_info([], _, _, Acc) -> + Acc. + +fill_server_hooks_info([{_, {error, _}} | T], Name, Default, MetricsL) -> + fill_server_hooks_info(T, Name, Default, MetricsL); + +fill_server_hooks_info([{Node, MetricsMap} | T], Name, Default, MetricsL) -> + Metrics = maps:get(Name, MetricsMap, Default), + NodeMetrics = #{node => Node, metrics => Metrics}, + fill_server_hooks_info(T, Name, Default, [NodeMetrics | MetricsL]); + +fill_server_hooks_info([], _Name, _Default, MetricsL) -> + Metrics = emqx_exhook_metrics:metrics_aggregate_by_key(metrics, MetricsL), + #{metrics => Metrics, node_metrics => MetricsL}. + +%%-------------------------------------------------------------------- +%% cluster call +%%-------------------------------------------------------------------- call_cluster(Module, Fun, Args) -> Nodes = mria_mnesia:running_nodes(), [{Node, rpc_call(Node, Module, Fun, Args)} || Node <- Nodes]. diff --git a/apps/emqx_exhook/src/emqx_exhook_metrics.erl b/apps/emqx_exhook/src/emqx_exhook_metrics.erl new file mode 100644 index 000000000..ce92bb4db --- /dev/null +++ b/apps/emqx_exhook/src/emqx_exhook_metrics.erl @@ -0,0 +1,247 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exhook_metrics). + +-include("emqx_exhook.hrl"). + +%% API +-export([ init/0, succeed/2, failed/2 + , update/1, new_metrics_info/0, servers_metrics/0 + , delete_server/1, server_metrics/1, hooks_metrics/1 + , metrics_aggregate/1, metrics_aggregate_by_key/2, hooks_metrics_aggregate/1 + , metrics_aggregate_by/2 + ]). + +-record(metrics, { + index :: index() + ,succeed = 0 :: non_neg_integer() + ,failed = 0 :: non_neg_integer() + ,rate = 0 :: non_neg_integer() + ,max_rate = 0 :: non_neg_integer() + ,window_rate :: integer() + }). + +-type server_name() :: emqx_exhook_mgr:server_name(). +-type hookpoint() :: emqx_exhook_server:hookpoint(). +-type index() :: {server_name(), hookpoint()}. +-type hooks_metrics() :: #{hookpoint() => metrics_info()}. +-type servers_metrics() :: #{server_name() => metrics_info()}. + +-type metrics_info() :: #{ succeed := non_neg_integer() + , failed := non_neg_integer() + , rate := number() + , max_rate := number() + }. + +-define(INDEX(ServerName, HookPoint), {ServerName, HookPoint}). +-export_type([metrics_info/0, servers_metrics/0, hooks_metrics/0]). + +%%-------------------------------------------------------------------- +%%% API +%%-------------------------------------------------------------------- +init() -> + _ = ets:new(?HOOKS_METRICS, + [ set, named_table, public + , {keypos, #metrics.index}, {write_concurrency, true} + , {read_concurrency, true} + ]), + ok. + +-spec new_metric_info() -> metrics_info(). +new_metric_info() -> + #{succeed => 0, + failed => 0, + rate => 0, + max_rate => 0 + }. + +-spec succeed(server_name(), hookpoint()) -> integer(). +succeed(Server, Hook) -> + inc(Server, Hook, #metrics.succeed, + #metrics{index = {Server, Hook} + ,window_rate = 1 + ,succeed = 1 + }). + +-spec failed(server_name(), hookpoint()) -> integer(). +failed(Server, Hook) -> + inc(Server, Hook, #metrics.failed, + #metrics{index = {Server, Hook} + ,window_rate = 1 + ,failed = 1 + }). + +-spec update(pos_integer()) -> true. +update(Interval) -> + Fun = fun(#metrics{rate = Rate, + window_rate = WindowRate, + max_rate = MaxRate} = Metrics, + _) -> + case calc_metric(WindowRate, Interval) of + Rate -> true; + NewRate -> + MaxRate2 = erlang:max(MaxRate, NewRate), + Metrics2 = Metrics#metrics{rate = NewRate, + window_rate = 0, + max_rate = MaxRate2}, + ets:insert(?HOOKS_METRICS, Metrics2) + end + end, + + ets:foldl(Fun, true, ?HOOKS_METRICS). + +-spec delete_server(server_name()) -> true. +delete_server(Name) -> + ets:match_delete(?HOOKS_METRICS, + {metrics, {Name, '_'}, '_', '_', '_', '_', '_'}). + +-spec server_metrics(server_name()) -> metrics_info(). +server_metrics(SvrName) -> + Hooks = ets:match(?HOOKS_METRICS, + {metrics, {SvrName, '_'}, '_', '_', '_', '_', '_'}), + + Fold = fun(#metrics{succeed = Succeed, + failed = Failed, + rate = Rate, + max_rate = MaxRate}, + Acc) -> + [#{ succeed => Succeed + , failed => Failed + , rate => Rate + , max_rate => MaxRate + } | Acc] + end, + + AllMetrics = lists:foldl(Fold, [], Hooks), + metrics_aggregate(AllMetrics). + +-spec servers_metrics() -> servers_metrics(). +servers_metrics() -> + AllMetrics = ets:tab2list(?HOOKS_METRICS), + + GroupFun = fun(#metrics{index = ?INDEX(ServerName, _), + succeed = Succeed, + failed = Failed, + rate = Rate, + max_rate = MaxRate + }, + Acc) -> + SvrGroup = maps:get(ServerName, Acc, []), + Metrics = #{ succeed => Succeed + , failed => Failed + , rate => Rate + , max_rate => MaxRate + }, + Acc#{ServerName => [Metrics | SvrGroup]} + end, + + GroupBySever = lists:foldl(GroupFun, #{}, AllMetrics), + + MapFun = fun(_SvrName, Group) -> metrics_aggregate(Group) end, + maps:map(MapFun, GroupBySever). + +-spec hooks_metrics(server_name()) -> hooks_metrics(). +hooks_metrics(SvrName) -> + Hooks = ets:match(?HOOKS_METRICS, + {metrics, {SvrName, '_'}, '_', '_', '_', '_', '_'}), + + Fold = fun(#metrics{index = ?INDEX(_, HookPoint), + succeed = Succeed, + failed = Failed, + rate = Rate, + max_rate = MaxRate}, + Acc) -> + Acc#{HookPoint => #{ succeed => Succeed + , failed => Failed + , rate => Rate + , max_rate => MaxRate + }} + end, + + lists:foldl(Fold, #{}, Hooks). + +-spec metrics_aggregate(list(metrics_info())) -> metrics_info(). +metrics_aggregate(MetricsL) -> + metrics_aggregate_by(fun(X) -> X end, MetricsL). + +-spec metrics_aggregate_by_key(Key, list(HasMetrics)) -> metrics_info() + when Key :: any(), + HasMetrics :: #{Key => metrics_info()}. +metrics_aggregate_by_key(Key, MetricsL) -> + metrics_aggregate_by(fun(X) -> maps:get(Key, X, new_metrics_info()) end, + MetricsL). + +-spec hooks_metrics_aggregate(list(hooks_metrics())) -> hooks_metrics(). +hooks_metrics_aggregate([]) -> + #{}; + +hooks_metrics_aggregate([H | _] = MapL) -> + Hooks = maps:keys(H), + + Fold = fun(Hook, Acc) -> + Metrics = metrics_aggregate_by_key(Hook, MapL), + Acc#{Hook => Metrics} + end, + + lists:foldl(Fold, #{}, Hooks). + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- +-spec inc(server_name(), hookpoint(), pos_integer(), #metrics{}) -> integer(). +inc(Server, Hook, Pos, Default) -> + Index = {Server, Hook}, + ets:update_counter(?HOOKS_METRICS, + Index, + [{#metrics.window_rate, 1}, {Pos, 1}], + Default). + +-spec new_metrics_info() -> metrics_info(). +new_metrics_info() -> + #{ succeed => 0 + , failed => 0 + , rate => 0 + , max_rate => 0 + }. + +-spec calc_metric(non_neg_integer(), non_neg_integer()) -> non_neg_integer(). +calc_metric(Val, Interval) -> + erlang:ceil(Val * ?METRICS_PRECISION / Interval). + +-spec metrics_add(metrics_info(), metrics_info()) -> metrics_info(). +metrics_add(#{succeed := S1, failed := F1, rate := R1, max_rate := M1} + , #{succeed := S2, failed := F2, rate := R2, max_rate := M2} = Acc) -> + Acc#{ succeed := S1 + S2 + , failed := F1 + F2 + , rate := R1 + R2 + , max_rate := M1 + M2 + }. + +-spec metrics_aggregate_by(fun((any()) -> metrics_info()), list(metrics_info())) -> metrics_info(). +metrics_aggregate_by(_, []) -> + new_metric_info(); + +metrics_aggregate_by(Fun, MetricsL) -> + Fold = fun(E, Acc) -> metrics_add(Fun(E), Acc) end, + #{rate := Rate, + max_rate := MaxRate} = Result = lists:foldl(Fold, new_metric_info(), MetricsL), + + Len = erlang:length(MetricsL), + + Result#{rate := Rate div Len, + max_rate := MaxRate div Len + }. diff --git a/apps/emqx_exhook/src/emqx_exhook_mgr.erl b/apps/emqx_exhook/src/emqx_exhook_mgr.erl index d3d429021..6123cd468 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mgr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mgr.erl @@ -30,14 +30,16 @@ , lookup/1 , enable/1 , disable/1 - , server_status/1 - , all_servers_status/0 + , server_info/1 + , all_servers_info/0 + , server_hooks_metrics/1 ]). %% Helper funcs -export([ running/0 , server/1 - , init_counter_table/0 + , hooks/1 + , init_ref_counter_table/0 ]). -export([ update_config/2 @@ -86,9 +88,9 @@ }. -define(DEFAULT_TIMEOUT, 60000). --define(CNTER, emqx_exhook_counter). +-define(REFRESH_INTERVAL, timer:seconds(5)). --export_type([server_info/0]). +-export_type([servers/0, server/0, server_info/0]). %%-------------------------------------------------------------------- %% APIs @@ -113,17 +115,20 @@ enable(Name) -> disable(Name) -> update_config([exhook, servers], {enable, Name, false}). -server_status(Name) -> - call({server_status, Name}). +server_info(Name) -> + call({?FUNCTION_NAME, Name}). -all_servers_status() -> - call(all_servers_status). +all_servers_info() -> + call(?FUNCTION_NAME). + +server_hooks_metrics(Name) -> + call({?FUNCTION_NAME, Name}). call(Req) -> gen_server:call(?MODULE, Req, ?DEFAULT_TIMEOUT). -init_counter_table() -> - _ = ets:new(?CNTER, [named_table, public]). +init_ref_counter_table() -> + _ = ets:new(?HOOKS_REF_COUNTER, [named_table, public]). %%===================================================================== %% Hocon schema @@ -180,6 +185,7 @@ init([]) -> ServerL = emqx:get_config([exhook, servers]), {Waiting, Running, Stopped} = load_all_servers(ServerL), Orders = reorder(ServerL), + refresh_tick(), {ok, ensure_reload_timer( #{waiting => Waiting, running => Running, @@ -235,6 +241,8 @@ handle_call({update_config, {delete, ToDelete}, _}, _From, State) -> orders := maps:remove(ToDelete, Orders) }, + emqx_exhook_metrics:delete_server(ToDelete), + {reply, ok, State3}; handle_call({update_config, {add, RawConf}, NewConfL}, @@ -245,32 +253,22 @@ handle_call({update_config, {add, RawConf}, NewConfL}, case emqx_exhook_server:load(Name, Conf) of {ok, ServerState} -> save(Name, ServerState), - Status = running, - Hooks = hooks(Name), State2 = State#{running := Running#{Name => Conf}}; {error, _} -> - Status = running, - Hooks = [], StateT = State#{waiting := Waitting#{Name => Conf}}, State2 = ensure_reload_timer(StateT); disable -> - Status = stopped, - Hooks = [], State2 = State#{stopped := Stopped#{Name => Conf}} end, Orders = reorder(NewConfL), - Resulte = maps:merge(Conf, #{status => Status, hooks => Hooks}), - {reply, Resulte, State2#{orders := Orders}}; + {reply, ok, State2#{orders := Orders}}; handle_call({lookup, Name}, _From, State) -> case where_is_server(Name, State) of not_found -> Result = not_found; {Where, #{Name := Conf}} -> - Result = maps:merge(Conf, - #{ status => Where - , hooks => hooks(Name) - }) + Result = maps:merge(Conf, #{status => Where}) end, {reply, Result, State}; @@ -282,21 +280,41 @@ handle_call({update_config, {enable, Name, _Enable}, NewConfL}, _From, State) -> {Result, State2} = restart_server(Name, NewConfL, State), {reply, Result, State2}; -handle_call({server_status, Name}, _From, State) -> +handle_call({server_info, Name}, _From, State) -> case where_is_server(Name, State) of not_found -> Result = not_found; {Status, _} -> - Result = Status + HooksMetrics = emqx_exhook_metrics:server_metrics(Name), + Result = #{ status => Status + , metrics => HooksMetrics + } end, {reply, Result, State}; -handle_call(all_servers_status, _From, #{running := Running, - waiting := Waiting, - stopped := Stopped} = State) -> - {reply, #{running => maps:keys(Running), - waiting => maps:keys(Waiting), - stopped => maps:keys(Stopped)}, State}; +handle_call(all_servers_info, _From, #{running := Running, + waiting := Waiting, + stopped := Stopped} = State) -> + MakeStatus = fun(Status, Servers, Acc) -> + lists:foldl(fun(Name, IAcc) -> IAcc#{Name => Status} end, + Acc, + maps:keys(Servers)) + end, + Status = lists:foldl(fun({Status, Servers}, Acc) -> MakeStatus(Status, Servers, Acc) end, + #{}, + [{running, Running}, {waiting, Waiting}, {stopped, Stopped}]), + + Metrics = emqx_exhook_metrics:servers_metrics(), + + Result = #{ status => Status + , metrics => Metrics + }, + + {reply, Result, State}; + +handle_call({server_hooks_metrics, Name}, _From, State) -> + Result = emqx_exhook_metrics:hooks_metrics(Name), + {reply, Result, State}; handle_call(_Request, _From, State) -> Reply = ok, @@ -318,6 +336,11 @@ handle_info({timeout, _Ref, {reload, Name}}, State) -> {noreply, ensure_reload_timer(NState)} end; +handle_info(refresh_tick, State) -> + refresh_tick(), + emqx_exhook_metrics:update(?REFRESH_INTERVAL), + {noreply, State}; + handle_info(_Info, State) -> {noreply, State}. @@ -490,7 +513,6 @@ get_servers_info(Status, Map) -> end, maps:fold(Fold, [], Map). - where_is_server(Name, #{running := Running}) when is_map_key(Name, Running) -> {running, Running}; @@ -549,6 +571,10 @@ sort_name_by_order(Names, Orders) -> maps:get(A, Orders) < maps:get(B, Orders) end, Names). + +refresh_tick() -> + erlang:send_after(?REFRESH_INTERVAL, self(), ?FUNCTION_NAME). + %%-------------------------------------------------------------------- %% Server state persistent save(Name, ServerState) -> @@ -590,5 +616,5 @@ hooks(Name) -> undefined -> []; Service -> - emqx_exhook_server:hookpoints(Service) + emqx_exhook_server:hooks(Service) end. diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index 3d0f6397e..9821f739b 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -19,8 +19,6 @@ -include("emqx_exhook.hrl"). -include_lib("emqx/include/logger.hrl"). - --define(CNTER, emqx_exhook_counter). -define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client). %% Load/Unload @@ -33,7 +31,7 @@ %% Infos -export([ name/1 - , hookpoints/1 + , hooks/1 , format/1 , failed_action/1 ]). @@ -72,7 +70,7 @@ | 'message.acked' | 'message.dropped'. --export_type([server/0]). +-export_type([server/0, hookpoint/0]). -dialyzer({nowarn_function, [inc_metrics/2]}). @@ -215,20 +213,20 @@ ensure_hooks(HookSpecs) -> ?SLOG(error, #{msg => "skipped_unknown_hookpoint", hookpoint => Hookpoint}); {Hookpoint, {M, F, A}} -> emqx_hooks:put(Hookpoint, {M, F, A}), - ets:update_counter(?CNTER, Hookpoint, {2, 1}, {Hookpoint, 0}) + ets:update_counter(?HOOKS_REF_COUNTER, Hookpoint, {2, 1}, {Hookpoint, 0}) end end, maps:keys(HookSpecs)). may_unload_hooks(HookSpecs) -> lists:foreach(fun(Hookpoint) -> - case ets:update_counter(?CNTER, Hookpoint, {2, -1}, {Hookpoint, 0}) of + case ets:update_counter(?HOOKS_REF_COUNTER, Hookpoint, {2, -1}, {Hookpoint, 0}) of Cnt when Cnt =< 0 -> case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of {Hookpoint, {M, F, _A}} -> emqx_hooks:del(Hookpoint, {M, F}); _ -> ok end, - ets:delete(?CNTER, Hookpoint); + ets:delete(?HOOKS_REF_COUNTER, Hookpoint); _ -> ok end end, maps:keys(HookSpecs)). @@ -244,8 +242,13 @@ format(#{name := Name, hookspec := Hooks}) -> name(#{name := Name}) -> Name. -hookpoints(#{hookspec := Hooks}) -> - maps:keys(Hooks). +hooks(#{hookspec := Hooks}) -> + FoldFun = fun(Hook, Params, Acc) -> + [#{ name => Hook + , params => Params + } | Acc] + end, + maps:fold(FoldFun, [], Hooks). -spec call(hookpoint(), map(), server()) -> ignore | {ok, Resp :: term()} diff --git a/apps/emqx_exhook/src/emqx_exhook_sup.erl b/apps/emqx_exhook/src/emqx_exhook_sup.erl index 0675b3279..513ca783e 100644 --- a/apps/emqx_exhook/src/emqx_exhook_sup.erl +++ b/apps/emqx_exhook/src/emqx_exhook_sup.erl @@ -42,7 +42,8 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - _ = emqx_exhook_mgr:init_counter_table(), + _ = emqx_exhook_metrics:init(), + _ = emqx_exhook_mgr:init_ref_counter_table(), Mngr = ?CHILD(emqx_exhook_mgr, worker, []), {ok, {{one_for_one, 10, 100}, [Mngr]}}. diff --git a/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl index b82451714..2b0a42e27 100644 --- a/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl @@ -37,7 +37,7 @@ exhook { ">>). all() -> - [t_list, t_get, t_add, t_move_1, t_move_2, t_delete, t_update]. + [t_list, t_get, t_add, t_move_1, t_move_2, t_delete, t_hooks, t_update]. init_per_suite(Config) -> application:load(emqx_conf), @@ -94,7 +94,11 @@ t_list(_) -> [Svr] = List, ?assertMatch(#{name := <<"default">>, - status := <<"running">>}, Svr). + metrics := _, + node_metrics := _, + node_status := _, + hooks := _ + }, Svr). t_get(_) -> {ok, Data} = request_api(get, api_path(["exhooks", "default"]), "", @@ -103,7 +107,11 @@ t_get(_) -> Svr = decode_json(Data), ?assertMatch(#{name := <<"default">>, - status := <<"running">>}, Svr). + metrics := _, + node_metrics := _, + node_status := _, + hooks := _ + }, Svr). t_add(Cfg) -> Template = proplists:get_value(template, Cfg), @@ -116,7 +124,10 @@ t_add(Cfg) -> Svr = decode_json(Data), ?assertMatch(#{name := <<"test1">>, - status := <<"running">>}, Svr), + metrics := _, + node_metrics := _, + node_status := _, + hooks := _}, Svr), ?assertMatch([<<"default">>, <<"test1">>], emqx_exhook_mgr:running()). @@ -143,6 +154,18 @@ t_delete(_) -> ?assertMatch({ok, <<>>}, Result), ?assertMatch([<<"default">>], emqx_exhook_mgr:running()). +t_hooks(_Cfg) -> + {ok, Data} = request_api(get, api_path(["exhooks", "default", "hooks"]), "", + auth_header_()), + + [Hook1 | _] = decode_json(Data), + + ?assertMatch(#{name := _, + params := _, + metrics := _, + node_metrics := _ + }, Hook1). + t_update(Cfg) -> Template = proplists:get_value(template, Cfg), Instance = Template#{enable => false}, From 86af3a9b8fd73013fef173757848b8d2fb6e2a8a Mon Sep 17 00:00:00 2001 From: lafirest Date: Mon, 17 Jan 2022 17:16:58 +0800 Subject: [PATCH 2/2] fix(emqx_exhook): fix some error of indent and dialyzer --- apps/emqx_exhook/src/emqx_exhook_api.erl | 11 +++- apps/emqx_exhook/src/emqx_exhook_metrics.erl | 53 ++++++++------------ apps/emqx_exhook/src/emqx_exhook_mgr.erl | 2 +- 3 files changed, 31 insertions(+), 35 deletions(-) diff --git a/apps/emqx_exhook/src/emqx_exhook_api.erl b/apps/emqx_exhook/src/emqx_exhook_api.erl index 02264c143..2df2176d3 100644 --- a/apps/emqx_exhook/src/emqx_exhook_api.erl +++ b/apps/emqx_exhook/src/emqx_exhook_api.erl @@ -32,6 +32,14 @@ -define(BAD_REQUEST, 'BAD_REQUEST'). -define(BAD_RPC, 'BAD_RPC'). +-type rpc_result() :: {error, any()} + | any(). + +-dialyzer([{nowarn_function, [ fill_cluster_server_info/5 + , nodes_server_info/5 + , fill_server_hooks_info/4 + ]}]). + %%-------------------------------------------------------------------- %% schema %%-------------------------------------------------------------------- @@ -144,7 +152,7 @@ fields(node_metrics) -> fields(node_status) -> [ {node, mk(string(), #{})} - , {status, mk(enum([running, waiting, stopped, not_found, error]), #{})} + , {status, mk(enum([running, waiting, stopped, error]), #{})} ]; fields(hook_info) -> @@ -387,6 +395,7 @@ call_cluster(Module, Fun, Args) -> Nodes = mria_mnesia:running_nodes(), [{Node, rpc_call(Node, Module, Fun, Args)} || Node <- Nodes]. +-spec rpc_call(node(), atom(), atom(), list()) -> rpc_result(). rpc_call(Node, Module, Fun, Args) when Node =:= node() -> erlang:apply(Module, Fun, Args); diff --git a/apps/emqx_exhook/src/emqx_exhook_metrics.erl b/apps/emqx_exhook/src/emqx_exhook_metrics.erl index ce92bb4db..abe592a97 100644 --- a/apps/emqx_exhook/src/emqx_exhook_metrics.erl +++ b/apps/emqx_exhook/src/emqx_exhook_metrics.erl @@ -21,18 +21,17 @@ %% API -export([ init/0, succeed/2, failed/2 , update/1, new_metrics_info/0, servers_metrics/0 - , delete_server/1, server_metrics/1, hooks_metrics/1 - , metrics_aggregate/1, metrics_aggregate_by_key/2, hooks_metrics_aggregate/1 + , on_server_deleted/1, server_metrics/1, hooks_metrics/1 + , metrics_aggregate/1, metrics_aggregate_by_key/2 , metrics_aggregate_by/2 ]). --record(metrics, { - index :: index() - ,succeed = 0 :: non_neg_integer() - ,failed = 0 :: non_neg_integer() - ,rate = 0 :: non_neg_integer() - ,max_rate = 0 :: non_neg_integer() - ,window_rate :: integer() +-record(metrics, { index :: index() + , succeed = 0 :: non_neg_integer() + , failed = 0 :: non_neg_integer() + , rate = 0 :: non_neg_integer() + , max_rate = 0 :: non_neg_integer() + , window_rate :: integer() }). -type server_name() :: emqx_exhook_mgr:server_name(). @@ -69,7 +68,7 @@ new_metric_info() -> max_rate => 0 }. --spec succeed(server_name(), hookpoint()) -> integer(). +-spec succeed(server_name(), hookpoint()) -> ok. succeed(Server, Hook) -> inc(Server, Hook, #metrics.succeed, #metrics{index = {Server, Hook} @@ -77,7 +76,7 @@ succeed(Server, Hook) -> ,succeed = 1 }). --spec failed(server_name(), hookpoint()) -> integer(). +-spec failed(server_name(), hookpoint()) -> ok. failed(Server, Hook) -> inc(Server, Hook, #metrics.failed, #metrics{index = {Server, Hook} @@ -104,8 +103,8 @@ update(Interval) -> ets:foldl(Fun, true, ?HOOKS_METRICS). --spec delete_server(server_name()) -> true. -delete_server(Name) -> +-spec on_server_deleted(server_name()) -> true. +on_server_deleted(Name) -> ets:match_delete(?HOOKS_METRICS, {metrics, {Name, '_'}, '_', '_', '_', '_', '_'}). @@ -185,30 +184,17 @@ metrics_aggregate_by_key(Key, MetricsL) -> metrics_aggregate_by(fun(X) -> maps:get(Key, X, new_metrics_info()) end, MetricsL). --spec hooks_metrics_aggregate(list(hooks_metrics())) -> hooks_metrics(). -hooks_metrics_aggregate([]) -> - #{}; - -hooks_metrics_aggregate([H | _] = MapL) -> - Hooks = maps:keys(H), - - Fold = fun(Hook, Acc) -> - Metrics = metrics_aggregate_by_key(Hook, MapL), - Acc#{Hook => Metrics} - end, - - lists:foldl(Fold, #{}, Hooks). - %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- --spec inc(server_name(), hookpoint(), pos_integer(), #metrics{}) -> integer(). +-spec inc(server_name(), hookpoint(), pos_integer(), #metrics{}) -> ok. inc(Server, Hook, Pos, Default) -> Index = {Server, Hook}, - ets:update_counter(?HOOKS_METRICS, - Index, - [{#metrics.window_rate, 1}, {Pos, 1}], - Default). + _ = ets:update_counter(?HOOKS_METRICS, + Index, + [{#metrics.window_rate, 1}, {Pos, 1}], + Default), + ok. -spec new_metrics_info() -> metrics_info(). new_metrics_info() -> @@ -231,7 +217,8 @@ metrics_add(#{succeed := S1, failed := F1, rate := R1, max_rate := M1} , max_rate := M1 + M2 }. --spec metrics_aggregate_by(fun((any()) -> metrics_info()), list(metrics_info())) -> metrics_info(). +-spec metrics_aggregate_by(fun((X) -> metrics_info()), list(X)) -> metrics_info() + when X :: any(). metrics_aggregate_by(_, []) -> new_metric_info(); diff --git a/apps/emqx_exhook/src/emqx_exhook_mgr.erl b/apps/emqx_exhook/src/emqx_exhook_mgr.erl index 6123cd468..300858807 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mgr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mgr.erl @@ -241,7 +241,7 @@ handle_call({update_config, {delete, ToDelete}, _}, _From, State) -> orders := maps:remove(ToDelete, Orders) }, - emqx_exhook_metrics:delete_server(ToDelete), + emqx_exhook_metrics:on_server_deleted(ToDelete), {reply, ok, State3};