Merge pull request #6700 from lafirest/feat/exhook_metrics

feat(emqx_exhooks): add metrics
This commit is contained in:
lafirest 2022-01-18 09:53:11 +08:00 committed by GitHub
commit 8f9ecf3e90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 541 additions and 128 deletions

View File

@ -18,6 +18,9 @@
-define(EMQX_EXHOOK_HRL, true). -define(EMQX_EXHOOK_HRL, true).
-define(APP, emqx_exhook). -define(APP, emqx_exhook).
-define(HOOKS_REF_COUNTER, emqx_exhook_ref_counter).
-define(HOOKS_METRICS, emqx_exhook_metrics).
-define(METRICS_PRECISION, 1).
-define(ENABLED_HOOKS, -define(ENABLED_HOOKS,
[ {'client.connect', {emqx_exhook_handler, on_client_connect, []}} [ {'client.connect', {emqx_exhook_handler, on_client_connect, []}}

View File

@ -23,33 +23,44 @@
-export([api_spec/0, paths/0, schema/1, fields/1, namespace/0]). -export([api_spec/0, paths/0, schema/1, fields/1, namespace/0]).
-export([exhooks/2, action_with_name/2, move/2]). -export([exhooks/2, action_with_name/2, move/2, server_hooks/2]).
-import(hoconsc, [mk/2, ref/1, enum/1, array/1]). -import(hoconsc, [mk/2, ref/1, enum/1, array/1, map/2]).
-import(emqx_dashboard_swagger, [schema_with_example/2, error_codes/2]). -import(emqx_dashboard_swagger, [schema_with_example/2, error_codes/2]).
-define(TAGS, [<<"exhooks">>]). -define(TAGS, [<<"exhooks">>]).
-define(BAD_REQUEST, 'BAD_REQUEST'). -define(BAD_REQUEST, 'BAD_REQUEST').
-define(BAD_RPC, 'BAD_RPC'). -define(BAD_RPC, 'BAD_RPC').
-type rpc_result() :: {error, any()}
| any().
-dialyzer([{nowarn_function, [ fill_cluster_server_info/5
, nodes_server_info/5
, fill_server_hooks_info/4
]}]).
%%--------------------------------------------------------------------
%% schema
%%--------------------------------------------------------------------
namespace() -> "exhook". namespace() -> "exhook".
api_spec() -> api_spec() ->
emqx_dashboard_swagger:spec(?MODULE). emqx_dashboard_swagger:spec(?MODULE).
paths() -> ["/exhooks", "/exhooks/:name", "/exhooks/:name/move"]. paths() -> ["/exhooks", "/exhooks/:name", "/exhooks/:name/move", "/exhooks/:name/hooks"].
schema(("/exhooks")) -> schema(("/exhooks")) ->
#{ #{
'operationId' => exhooks, 'operationId' => exhooks,
get => #{tags => ?TAGS, get => #{tags => ?TAGS,
description => <<"List all servers">>, description => <<"List all servers">>,
responses => #{200 => mk(array(ref(detailed_server_info)), #{})} responses => #{200 => mk(array(ref(list_server_info)), #{})}
}, },
post => #{tags => ?TAGS, post => #{tags => ?TAGS,
description => <<"Add a servers">>, description => <<"Add a servers">>,
'requestBody' => server_conf_schema(), 'requestBody' => server_conf_schema(),
responses => #{201 => mk(ref(detailed_server_info), #{}), responses => #{201 => mk(ref(detail_server_info), #{}),
500 => error_codes([?BAD_RPC], <<"Bad RPC">>) 500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
} }
} }
@ -60,7 +71,7 @@ schema("/exhooks/:name") ->
get => #{tags => ?TAGS, get => #{tags => ?TAGS,
description => <<"Get the detail information of server">>, description => <<"Get the detail information of server">>,
parameters => params_server_name_in_path(), parameters => params_server_name_in_path(),
responses => #{200 => mk(ref(detailed_server_info), #{}), responses => #{200 => mk(ref(detail_server_info), #{}),
400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>)
} }
}, },
@ -77,7 +88,17 @@ schema("/exhooks/:name") ->
description => <<"Delete the server">>, description => <<"Delete the server">>,
parameters => params_server_name_in_path(), parameters => params_server_name_in_path(),
responses => #{204 => <<>>, responses => #{204 => <<>>,
500 => error_codes([?BAD_RPC], <<"Bad RPC">>) 500 => error_codes([?BAD_RPC], <<"Bad RPC">>) }
}
};
schema("/exhooks/:name/hooks") ->
#{'operationId' => server_hooks,
get => #{tags => ?TAGS,
description => <<"Get the hooks information of server">>,
parameters => params_server_name_in_path(),
responses => #{200 => mk(array(ref(list_hook_info)), #{}),
400 => error_codes([?BAD_REQUEST], <<"Bad Request">>)
} }
} }
}; };
@ -96,23 +117,54 @@ schema("/exhooks/:name/move") ->
}. }.
fields(move_req) -> fields(move_req) ->
[ [ {position, mk(enum([top, bottom, before, 'after']), #{})}
{position, mk(enum([top, bottom, before, 'after']), #{})}, , {related, mk(string(), #{desc => <<"Relative position of movement">>,
{related, mk(string(), #{desc => <<"Relative position of movement">>,
default => <<>>, default => <<>>,
example => <<>> example => <<>>
})} })}
]; ];
fields(detailed_server_info) -> fields(list_server_info) ->
[ {status, mk(enum([running, waiting, stopped]), #{})} [ {metrics, mk(ref(metrics), #{})}
, {hooks, mk(array(string()), #{default => []})} , {node_metrics, mk(array(ref(node_metrics)), #{})}
, {node_status, mk(ref(node_status), #{})} , {node_status, mk(array(ref(node_status)), #{})}
, {hooks, mk(array(ref(hook_info)), #{})}
] ++ emqx_exhook_schema:server_config(); ] ++ emqx_exhook_schema:server_config();
fields(detail_server_info) ->
[ {metrics, mk(ref(metrics), #{})}
, {node_metrics, mk(array(ref(node_metrics)), #{})}
, {node_status, mk(array(ref(node_status)), #{})}
, {hooks, mk(array(ref(hook_info)), #{})}
] ++ emqx_exhook_schema:server_config();
fields(list_hook_info) ->
[ {name, mk(binary(), #{})}
, {params, mk(map(name, binary()), #{})}
, {metrics, mk(ref(metrics), #{})}
, {node_metrics, mk(array(ref(node_metrics)), #{})}
];
fields(node_metrics) ->
[ {node, mk(string(), #{})}
, {metrics, mk(ref(metrics), #{})}
];
fields(node_status) -> fields(node_status) ->
[ {node, mk(string(), #{})} [ {node, mk(string(), #{})}
, {status, mk(enum([running, waiting, stopped, not_found, error]), #{})} , {status, mk(enum([running, waiting, stopped, error]), #{})}
];
fields(hook_info) ->
[ {name, mk(binary(), #{})}
, {params, mk(map(name, binary()), #{})}
];
fields(metrics) ->
[ {succeed, mk(integer(), #{})}
, {failed, mk(integer(), #{})}
, {rate, mk(integer(), #{})}
, {max_rate, mk(integer(), #{})}
]; ];
fields(server_config) -> fields(server_config) ->
@ -140,16 +192,19 @@ server_conf_schema() ->
} }
}). }).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
exhooks(get, _) -> exhooks(get, _) ->
ServerL = emqx_exhook_mgr:list(), Confs = emqx:get_config([exhook, servers]),
ServerL2 = nodes_all_server_status(ServerL), Infos = nodes_all_server_info(Confs),
{200, ServerL2}; {200, Infos};
exhooks(post, #{body := Body}) -> exhooks(post, #{body := Body}) ->
case emqx_exhook_mgr:update_config([exhook, servers], {add, Body}) of case emqx_exhook_mgr:update_config([exhook, servers], {add, Body}) of
{ok, Result} -> {ok, _} ->
{201, Result}; #{<<"name">> := Name} = Body,
get_nodes_server_info(Name);
{error, Error} -> {error, Error} ->
{500, #{code => <<"BAD_RPC">>, {500, #{code => <<"BAD_RPC">>,
message => Error message => Error
@ -157,16 +212,7 @@ exhooks(post, #{body := Body}) ->
end. end.
action_with_name(get, #{bindings := #{name := Name}}) -> action_with_name(get, #{bindings := #{name := Name}}) ->
Result = emqx_exhook_mgr:lookup(Name), get_nodes_server_info(Name);
NodeStatus = nodes_server_status(Name),
case Result of
not_found ->
{400, #{code => <<"BAD_REQUEST">>,
message => <<"Server not found">>
}};
ServerInfo ->
{200, ServerInfo#{node_status => NodeStatus}}
end;
action_with_name(put, #{bindings := #{name := Name}, body := Body}) -> action_with_name(put, #{bindings := #{name := Name}, body := Body}) ->
case emqx_exhook_mgr:update_config([exhook, servers], case emqx_exhook_mgr:update_config([exhook, servers],
@ -177,8 +223,7 @@ action_with_name(put, #{bindings := #{name := Name}, body := Body}) ->
}}; }};
{ok, {error, Reason}} -> {ok, {error, Reason}} ->
{400, #{code => <<"BAD_REQUEST">>, {400, #{code => <<"BAD_REQUEST">>,
message => unicode:characters_to_binary( message => unicode:characters_to_binary(io_lib:format("Error Reason:~p~n", [Reason]))
io_lib:format("Error Reason:~p~n", [Reason]))
}}; }};
{ok, _} -> {ok, _} ->
{200}; {200};
@ -216,63 +261,141 @@ move(post, #{bindings := #{name := Name}, body := Body}) ->
}} }}
end. end.
nodes_server_status(Name) -> server_hooks(get, #{bindings := #{name := Name}}) ->
StatusL = call_cluster(emqx_exhook_mgr, server_status, [Name]), Confs = emqx:get_config([exhook, servers]),
case lists:search(fun(#{name := CfgName}) -> CfgName =:= Name end, Confs) of
false ->
{400, #{code => <<"BAD_REQUEST">>,
message => <<"Server not found">>
}};
_ ->
Info = get_nodes_server_hooks_info(Name),
{200, Info}
end.
Handler = fun({Node, {error, _}}) -> get_nodes_server_info(Name) ->
#{node => Node, Confs = emqx:get_config([exhook, servers]),
status => error case lists:search(fun(#{name := CfgName}) -> CfgName =:= Name end, Confs) of
}; false ->
({Node, Status}) -> {400, #{code => <<"BAD_REQUEST">>,
#{node => Node, message => <<"Server not found">>
status => Status }};
} {value, Conf} ->
end, NodeStatus = nodes_server_info(Name),
{200, maps:merge(Conf, NodeStatus)}
end.
lists:map(Handler, StatusL). %%--------------------------------------------------------------------
%% GET /exhooks
%%--------------------------------------------------------------------
nodes_all_server_info(ConfL) ->
AllInfos = call_cluster(emqx_exhook_mgr, all_servers_info, []),
Default = emqx_exhook_metrics:new_metrics_info(),
node_all_server_info(ConfL, AllInfos, Default, []).
nodes_all_server_status(ServerL) -> node_all_server_info([#{name := ServerName} = Conf | T], AllInfos, Default, Acc) ->
AllStatusL = call_cluster(emqx_exhook_mgr, all_servers_status, []), Info = fill_cluster_server_info(AllInfos, [], [], ServerName, Default),
AllInfo = maps:merge(Conf, Info),
node_all_server_info(T, AllInfos, Default, [AllInfo | Acc]);
AggreMap = lists:foldl(fun(#{name := Name}, Acc) -> node_all_server_info([], _, _, Acc) ->
Acc#{Name => []} lists:reverse(Acc).
end,
#{},
ServerL),
AddToMap = fun(Servers, Node, Status, Map) -> fill_cluster_server_info([{Node, {error, _}} | T], StatusL, MetricsL, ServerName, Default) ->
lists:foldl(fun(Name, Acc) -> fill_cluster_server_info(T,
StatusL = maps:get(Name, Acc), [#{node => Node, status => error} | StatusL],
StatusL2 = [#{node => Node, [#{node => Node, metrics => Default} | MetricsL],
status => Status ServerName,
} | StatusL], Default);
Acc#{Name := StatusL2}
end,
Map,
Servers)
end,
AggreMap2 = lists:foldl(fun({Node, #{running := Running, fill_cluster_server_info([{Node, Result} | T], StatusL, MetricsL, ServerName, Default) ->
waiting := Waiting, #{status := Status, metrics := Metrics} = Result,
stopped := Stopped}}, fill_cluster_server_info(T,
Acc) -> [#{node => Node, status => maps:get(ServerName, Status, error)} | StatusL],
AddToMap(Stopped, Node, stopped, [#{node => Node, metrics => maps:get(ServerName, Metrics, Default)} | MetricsL],
AddToMap(Waiting, Node, waiting, ServerName,
AddToMap(Running, Node, running, Acc))) Default);
end,
AggreMap,
AllStatusL),
Handler = fun(#{name := Name} = Server) -> fill_cluster_server_info([], StatusL, MetricsL, ServerName, _) ->
Server#{node_status => maps:get(Name, AggreMap2)} Metrics = emqx_exhook_metrics:metrics_aggregate_by_key(metrics, MetricsL),
end, #{metrics => Metrics,
node_metrics => MetricsL,
node_status => StatusL,
hooks => emqx_exhook_mgr:hooks(ServerName)
}.
lists:map(Handler, ServerL). %%--------------------------------------------------------------------
%% GET /exhooks/{name}
%%--------------------------------------------------------------------
nodes_server_info(Name) ->
InfoL = call_cluster(emqx_exhook_mgr, server_info, [Name]),
Default = emqx_exhook_metrics:new_metrics_info(),
nodes_server_info(InfoL, Name, Default, [], []).
nodes_server_info([{Node, {error, _}} | T], Name, Default, StatusL, MetricsL) ->
nodes_server_info(T,
Name,
Default,
[#{node => Node, status => error} | StatusL],
[#{node => Node, metrics => Default} | MetricsL]
);
nodes_server_info([{Node, Result} | T], Name, Default, StatusL, MetricsL) ->
#{status := Status, metrics := Metrics} = Result,
nodes_server_info(T,
Name,
Default,
[#{node => Node, status => Status} | StatusL],
[#{node => Node, metrics => Metrics} | MetricsL]
);
nodes_server_info([], Name, _, StatusL, MetricsL) ->
#{metrics => emqx_exhook_metrics:metrics_aggregate_by_key(metrics, MetricsL),
node_status => StatusL,
node_metrics => MetricsL,
hooks => emqx_exhook_mgr:hooks(Name)
}.
%%--------------------------------------------------------------------
%% GET /exhooks/{name}/hooks
%%--------------------------------------------------------------------
get_nodes_server_hooks_info(Name) ->
case emqx_exhook_mgr:hooks(Name) of
[] -> [];
Hooks ->
AllInfos = call_cluster(emqx_exhook_mgr, server_hooks_metrics, [Name]),
Default = emqx_exhook_metrics:new_metrics_info(),
get_nodes_server_hooks_info(Hooks, AllInfos, Default, [])
end.
get_nodes_server_hooks_info([#{name := Name} = Spec | T], AllInfos, Default, Acc) ->
Info = fill_server_hooks_info(AllInfos, Name, Default, []),
AllInfo = maps:merge(Spec, Info),
get_nodes_server_hooks_info(T, AllInfos, Default, [AllInfo | Acc]);
get_nodes_server_hooks_info([], _, _, Acc) ->
Acc.
fill_server_hooks_info([{_, {error, _}} | T], Name, Default, MetricsL) ->
fill_server_hooks_info(T, Name, Default, MetricsL);
fill_server_hooks_info([{Node, MetricsMap} | T], Name, Default, MetricsL) ->
Metrics = maps:get(Name, MetricsMap, Default),
NodeMetrics = #{node => Node, metrics => Metrics},
fill_server_hooks_info(T, Name, Default, [NodeMetrics | MetricsL]);
fill_server_hooks_info([], _Name, _Default, MetricsL) ->
Metrics = emqx_exhook_metrics:metrics_aggregate_by_key(metrics, MetricsL),
#{metrics => Metrics, node_metrics => MetricsL}.
%%--------------------------------------------------------------------
%% cluster call
%%--------------------------------------------------------------------
call_cluster(Module, Fun, Args) -> call_cluster(Module, Fun, Args) ->
Nodes = mria_mnesia:running_nodes(), Nodes = mria_mnesia:running_nodes(),
[{Node, rpc_call(Node, Module, Fun, Args)} || Node <- Nodes]. [{Node, rpc_call(Node, Module, Fun, Args)} || Node <- Nodes].
-spec rpc_call(node(), atom(), atom(), list()) -> rpc_result().
rpc_call(Node, Module, Fun, Args) when Node =:= node() -> rpc_call(Node, Module, Fun, Args) when Node =:= node() ->
erlang:apply(Module, Fun, Args); erlang:apply(Module, Fun, Args);

View File

@ -0,0 +1,234 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_exhook_metrics).
-include("emqx_exhook.hrl").
%% API
-export([ init/0, succeed/2, failed/2
, update/1, new_metrics_info/0, servers_metrics/0
, on_server_deleted/1, server_metrics/1, hooks_metrics/1
, metrics_aggregate/1, metrics_aggregate_by_key/2
, metrics_aggregate_by/2
]).
-record(metrics, { index :: index()
, succeed = 0 :: non_neg_integer()
, failed = 0 :: non_neg_integer()
, rate = 0 :: non_neg_integer()
, max_rate = 0 :: non_neg_integer()
, window_rate :: integer()
}).
-type server_name() :: emqx_exhook_mgr:server_name().
-type hookpoint() :: emqx_exhook_server:hookpoint().
-type index() :: {server_name(), hookpoint()}.
-type hooks_metrics() :: #{hookpoint() => metrics_info()}.
-type servers_metrics() :: #{server_name() => metrics_info()}.
-type metrics_info() :: #{ succeed := non_neg_integer()
, failed := non_neg_integer()
, rate := number()
, max_rate := number()
}.
-define(INDEX(ServerName, HookPoint), {ServerName, HookPoint}).
-export_type([metrics_info/0, servers_metrics/0, hooks_metrics/0]).
%%--------------------------------------------------------------------
%%% API
%%--------------------------------------------------------------------
init() ->
_ = ets:new(?HOOKS_METRICS,
[ set, named_table, public
, {keypos, #metrics.index}, {write_concurrency, true}
, {read_concurrency, true}
]),
ok.
-spec new_metric_info() -> metrics_info().
new_metric_info() ->
#{succeed => 0,
failed => 0,
rate => 0,
max_rate => 0
}.
-spec succeed(server_name(), hookpoint()) -> ok.
succeed(Server, Hook) ->
inc(Server, Hook, #metrics.succeed,
#metrics{index = {Server, Hook}
,window_rate = 1
,succeed = 1
}).
-spec failed(server_name(), hookpoint()) -> ok.
failed(Server, Hook) ->
inc(Server, Hook, #metrics.failed,
#metrics{index = {Server, Hook}
,window_rate = 1
,failed = 1
}).
-spec update(pos_integer()) -> true.
update(Interval) ->
Fun = fun(#metrics{rate = Rate,
window_rate = WindowRate,
max_rate = MaxRate} = Metrics,
_) ->
case calc_metric(WindowRate, Interval) of
Rate -> true;
NewRate ->
MaxRate2 = erlang:max(MaxRate, NewRate),
Metrics2 = Metrics#metrics{rate = NewRate,
window_rate = 0,
max_rate = MaxRate2},
ets:insert(?HOOKS_METRICS, Metrics2)
end
end,
ets:foldl(Fun, true, ?HOOKS_METRICS).
-spec on_server_deleted(server_name()) -> true.
on_server_deleted(Name) ->
ets:match_delete(?HOOKS_METRICS,
{metrics, {Name, '_'}, '_', '_', '_', '_', '_'}).
-spec server_metrics(server_name()) -> metrics_info().
server_metrics(SvrName) ->
Hooks = ets:match(?HOOKS_METRICS,
{metrics, {SvrName, '_'}, '_', '_', '_', '_', '_'}),
Fold = fun(#metrics{succeed = Succeed,
failed = Failed,
rate = Rate,
max_rate = MaxRate},
Acc) ->
[#{ succeed => Succeed
, failed => Failed
, rate => Rate
, max_rate => MaxRate
} | Acc]
end,
AllMetrics = lists:foldl(Fold, [], Hooks),
metrics_aggregate(AllMetrics).
-spec servers_metrics() -> servers_metrics().
servers_metrics() ->
AllMetrics = ets:tab2list(?HOOKS_METRICS),
GroupFun = fun(#metrics{index = ?INDEX(ServerName, _),
succeed = Succeed,
failed = Failed,
rate = Rate,
max_rate = MaxRate
},
Acc) ->
SvrGroup = maps:get(ServerName, Acc, []),
Metrics = #{ succeed => Succeed
, failed => Failed
, rate => Rate
, max_rate => MaxRate
},
Acc#{ServerName => [Metrics | SvrGroup]}
end,
GroupBySever = lists:foldl(GroupFun, #{}, AllMetrics),
MapFun = fun(_SvrName, Group) -> metrics_aggregate(Group) end,
maps:map(MapFun, GroupBySever).
-spec hooks_metrics(server_name()) -> hooks_metrics().
hooks_metrics(SvrName) ->
Hooks = ets:match(?HOOKS_METRICS,
{metrics, {SvrName, '_'}, '_', '_', '_', '_', '_'}),
Fold = fun(#metrics{index = ?INDEX(_, HookPoint),
succeed = Succeed,
failed = Failed,
rate = Rate,
max_rate = MaxRate},
Acc) ->
Acc#{HookPoint => #{ succeed => Succeed
, failed => Failed
, rate => Rate
, max_rate => MaxRate
}}
end,
lists:foldl(Fold, #{}, Hooks).
-spec metrics_aggregate(list(metrics_info())) -> metrics_info().
metrics_aggregate(MetricsL) ->
metrics_aggregate_by(fun(X) -> X end, MetricsL).
-spec metrics_aggregate_by_key(Key, list(HasMetrics)) -> metrics_info()
when Key :: any(),
HasMetrics :: #{Key => metrics_info()}.
metrics_aggregate_by_key(Key, MetricsL) ->
metrics_aggregate_by(fun(X) -> maps:get(Key, X, new_metrics_info()) end,
MetricsL).
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
-spec inc(server_name(), hookpoint(), pos_integer(), #metrics{}) -> ok.
inc(Server, Hook, Pos, Default) ->
Index = {Server, Hook},
_ = ets:update_counter(?HOOKS_METRICS,
Index,
[{#metrics.window_rate, 1}, {Pos, 1}],
Default),
ok.
-spec new_metrics_info() -> metrics_info().
new_metrics_info() ->
#{ succeed => 0
, failed => 0
, rate => 0
, max_rate => 0
}.
-spec calc_metric(non_neg_integer(), non_neg_integer()) -> non_neg_integer().
calc_metric(Val, Interval) ->
erlang:ceil(Val * ?METRICS_PRECISION / Interval).
-spec metrics_add(metrics_info(), metrics_info()) -> metrics_info().
metrics_add(#{succeed := S1, failed := F1, rate := R1, max_rate := M1}
, #{succeed := S2, failed := F2, rate := R2, max_rate := M2} = Acc) ->
Acc#{ succeed := S1 + S2
, failed := F1 + F2
, rate := R1 + R2
, max_rate := M1 + M2
}.
-spec metrics_aggregate_by(fun((X) -> metrics_info()), list(X)) -> metrics_info()
when X :: any().
metrics_aggregate_by(_, []) ->
new_metric_info();
metrics_aggregate_by(Fun, MetricsL) ->
Fold = fun(E, Acc) -> metrics_add(Fun(E), Acc) end,
#{rate := Rate,
max_rate := MaxRate} = Result = lists:foldl(Fold, new_metric_info(), MetricsL),
Len = erlang:length(MetricsL),
Result#{rate := Rate div Len,
max_rate := MaxRate div Len
}.

View File

@ -30,14 +30,16 @@
, lookup/1 , lookup/1
, enable/1 , enable/1
, disable/1 , disable/1
, server_status/1 , server_info/1
, all_servers_status/0 , all_servers_info/0
, server_hooks_metrics/1
]). ]).
%% Helper funcs %% Helper funcs
-export([ running/0 -export([ running/0
, server/1 , server/1
, init_counter_table/0 , hooks/1
, init_ref_counter_table/0
]). ]).
-export([ update_config/2 -export([ update_config/2
@ -86,9 +88,9 @@
}. }.
-define(DEFAULT_TIMEOUT, 60000). -define(DEFAULT_TIMEOUT, 60000).
-define(CNTER, emqx_exhook_counter). -define(REFRESH_INTERVAL, timer:seconds(5)).
-export_type([server_info/0]). -export_type([servers/0, server/0, server_info/0]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% APIs %% APIs
@ -113,17 +115,20 @@ enable(Name) ->
disable(Name) -> disable(Name) ->
update_config([exhook, servers], {enable, Name, false}). update_config([exhook, servers], {enable, Name, false}).
server_status(Name) -> server_info(Name) ->
call({server_status, Name}). call({?FUNCTION_NAME, Name}).
all_servers_status() -> all_servers_info() ->
call(all_servers_status). call(?FUNCTION_NAME).
server_hooks_metrics(Name) ->
call({?FUNCTION_NAME, Name}).
call(Req) -> call(Req) ->
gen_server:call(?MODULE, Req, ?DEFAULT_TIMEOUT). gen_server:call(?MODULE, Req, ?DEFAULT_TIMEOUT).
init_counter_table() -> init_ref_counter_table() ->
_ = ets:new(?CNTER, [named_table, public]). _ = ets:new(?HOOKS_REF_COUNTER, [named_table, public]).
%%===================================================================== %%=====================================================================
%% Hocon schema %% Hocon schema
@ -180,6 +185,7 @@ init([]) ->
ServerL = emqx:get_config([exhook, servers]), ServerL = emqx:get_config([exhook, servers]),
{Waiting, Running, Stopped} = load_all_servers(ServerL), {Waiting, Running, Stopped} = load_all_servers(ServerL),
Orders = reorder(ServerL), Orders = reorder(ServerL),
refresh_tick(),
{ok, ensure_reload_timer( {ok, ensure_reload_timer(
#{waiting => Waiting, #{waiting => Waiting,
running => Running, running => Running,
@ -235,6 +241,8 @@ handle_call({update_config, {delete, ToDelete}, _}, _From, State) ->
orders := maps:remove(ToDelete, Orders) orders := maps:remove(ToDelete, Orders)
}, },
emqx_exhook_metrics:on_server_deleted(ToDelete),
{reply, ok, State3}; {reply, ok, State3};
handle_call({update_config, {add, RawConf}, NewConfL}, handle_call({update_config, {add, RawConf}, NewConfL},
@ -245,32 +253,22 @@ handle_call({update_config, {add, RawConf}, NewConfL},
case emqx_exhook_server:load(Name, Conf) of case emqx_exhook_server:load(Name, Conf) of
{ok, ServerState} -> {ok, ServerState} ->
save(Name, ServerState), save(Name, ServerState),
Status = running,
Hooks = hooks(Name),
State2 = State#{running := Running#{Name => Conf}}; State2 = State#{running := Running#{Name => Conf}};
{error, _} -> {error, _} ->
Status = running,
Hooks = [],
StateT = State#{waiting := Waitting#{Name => Conf}}, StateT = State#{waiting := Waitting#{Name => Conf}},
State2 = ensure_reload_timer(StateT); State2 = ensure_reload_timer(StateT);
disable -> disable ->
Status = stopped,
Hooks = [],
State2 = State#{stopped := Stopped#{Name => Conf}} State2 = State#{stopped := Stopped#{Name => Conf}}
end, end,
Orders = reorder(NewConfL), Orders = reorder(NewConfL),
Resulte = maps:merge(Conf, #{status => Status, hooks => Hooks}), {reply, ok, State2#{orders := Orders}};
{reply, Resulte, State2#{orders := Orders}};
handle_call({lookup, Name}, _From, State) -> handle_call({lookup, Name}, _From, State) ->
case where_is_server(Name, State) of case where_is_server(Name, State) of
not_found -> not_found ->
Result = not_found; Result = not_found;
{Where, #{Name := Conf}} -> {Where, #{Name := Conf}} ->
Result = maps:merge(Conf, Result = maps:merge(Conf, #{status => Where})
#{ status => Where
, hooks => hooks(Name)
})
end, end,
{reply, Result, State}; {reply, Result, State};
@ -282,21 +280,41 @@ handle_call({update_config, {enable, Name, _Enable}, NewConfL}, _From, State) ->
{Result, State2} = restart_server(Name, NewConfL, State), {Result, State2} = restart_server(Name, NewConfL, State),
{reply, Result, State2}; {reply, Result, State2};
handle_call({server_status, Name}, _From, State) -> handle_call({server_info, Name}, _From, State) ->
case where_is_server(Name, State) of case where_is_server(Name, State) of
not_found -> not_found ->
Result = not_found; Result = not_found;
{Status, _} -> {Status, _} ->
Result = Status HooksMetrics = emqx_exhook_metrics:server_metrics(Name),
Result = #{ status => Status
, metrics => HooksMetrics
}
end, end,
{reply, Result, State}; {reply, Result, State};
handle_call(all_servers_status, _From, #{running := Running, handle_call(all_servers_info, _From, #{running := Running,
waiting := Waiting, waiting := Waiting,
stopped := Stopped} = State) -> stopped := Stopped} = State) ->
{reply, #{running => maps:keys(Running), MakeStatus = fun(Status, Servers, Acc) ->
waiting => maps:keys(Waiting), lists:foldl(fun(Name, IAcc) -> IAcc#{Name => Status} end,
stopped => maps:keys(Stopped)}, State}; Acc,
maps:keys(Servers))
end,
Status = lists:foldl(fun({Status, Servers}, Acc) -> MakeStatus(Status, Servers, Acc) end,
#{},
[{running, Running}, {waiting, Waiting}, {stopped, Stopped}]),
Metrics = emqx_exhook_metrics:servers_metrics(),
Result = #{ status => Status
, metrics => Metrics
},
{reply, Result, State};
handle_call({server_hooks_metrics, Name}, _From, State) ->
Result = emqx_exhook_metrics:hooks_metrics(Name),
{reply, Result, State};
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
Reply = ok, Reply = ok,
@ -318,6 +336,11 @@ handle_info({timeout, _Ref, {reload, Name}}, State) ->
{noreply, ensure_reload_timer(NState)} {noreply, ensure_reload_timer(NState)}
end; end;
handle_info(refresh_tick, State) ->
refresh_tick(),
emqx_exhook_metrics:update(?REFRESH_INTERVAL),
{noreply, State};
handle_info(_Info, State) -> handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.
@ -490,7 +513,6 @@ get_servers_info(Status, Map) ->
end, end,
maps:fold(Fold, [], Map). maps:fold(Fold, [], Map).
where_is_server(Name, #{running := Running}) when is_map_key(Name, Running) -> where_is_server(Name, #{running := Running}) when is_map_key(Name, Running) ->
{running, Running}; {running, Running};
@ -549,6 +571,10 @@ sort_name_by_order(Names, Orders) ->
maps:get(A, Orders) < maps:get(B, Orders) maps:get(A, Orders) < maps:get(B, Orders)
end, end,
Names). Names).
refresh_tick() ->
erlang:send_after(?REFRESH_INTERVAL, self(), ?FUNCTION_NAME).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Server state persistent %% Server state persistent
save(Name, ServerState) -> save(Name, ServerState) ->
@ -590,5 +616,5 @@ hooks(Name) ->
undefined -> undefined ->
[]; [];
Service -> Service ->
emqx_exhook_server:hookpoints(Service) emqx_exhook_server:hooks(Service)
end. end.

View File

@ -19,8 +19,6 @@
-include("emqx_exhook.hrl"). -include("emqx_exhook.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-define(CNTER, emqx_exhook_counter).
-define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client). -define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client).
%% Load/Unload %% Load/Unload
@ -33,7 +31,7 @@
%% Infos %% Infos
-export([ name/1 -export([ name/1
, hookpoints/1 , hooks/1
, format/1 , format/1
, failed_action/1 , failed_action/1
]). ]).
@ -72,7 +70,7 @@
| 'message.acked' | 'message.acked'
| 'message.dropped'. | 'message.dropped'.
-export_type([server/0]). -export_type([server/0, hookpoint/0]).
-dialyzer({nowarn_function, [inc_metrics/2]}). -dialyzer({nowarn_function, [inc_metrics/2]}).
@ -215,20 +213,20 @@ ensure_hooks(HookSpecs) ->
?SLOG(error, #{msg => "skipped_unknown_hookpoint", hookpoint => Hookpoint}); ?SLOG(error, #{msg => "skipped_unknown_hookpoint", hookpoint => Hookpoint});
{Hookpoint, {M, F, A}} -> {Hookpoint, {M, F, A}} ->
emqx_hooks:put(Hookpoint, {M, F, A}), emqx_hooks:put(Hookpoint, {M, F, A}),
ets:update_counter(?CNTER, Hookpoint, {2, 1}, {Hookpoint, 0}) ets:update_counter(?HOOKS_REF_COUNTER, Hookpoint, {2, 1}, {Hookpoint, 0})
end end
end, maps:keys(HookSpecs)). end, maps:keys(HookSpecs)).
may_unload_hooks(HookSpecs) -> may_unload_hooks(HookSpecs) ->
lists:foreach(fun(Hookpoint) -> lists:foreach(fun(Hookpoint) ->
case ets:update_counter(?CNTER, Hookpoint, {2, -1}, {Hookpoint, 0}) of case ets:update_counter(?HOOKS_REF_COUNTER, Hookpoint, {2, -1}, {Hookpoint, 0}) of
Cnt when Cnt =< 0 -> Cnt when Cnt =< 0 ->
case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of
{Hookpoint, {M, F, _A}} -> {Hookpoint, {M, F, _A}} ->
emqx_hooks:del(Hookpoint, {M, F}); emqx_hooks:del(Hookpoint, {M, F});
_ -> ok _ -> ok
end, end,
ets:delete(?CNTER, Hookpoint); ets:delete(?HOOKS_REF_COUNTER, Hookpoint);
_ -> ok _ -> ok
end end
end, maps:keys(HookSpecs)). end, maps:keys(HookSpecs)).
@ -244,8 +242,13 @@ format(#{name := Name, hookspec := Hooks}) ->
name(#{name := Name}) -> name(#{name := Name}) ->
Name. Name.
hookpoints(#{hookspec := Hooks}) -> hooks(#{hookspec := Hooks}) ->
maps:keys(Hooks). FoldFun = fun(Hook, Params, Acc) ->
[#{ name => Hook
, params => Params
} | Acc]
end,
maps:fold(FoldFun, [], Hooks).
-spec call(hookpoint(), map(), server()) -> ignore -spec call(hookpoint(), map(), server()) -> ignore
| {ok, Resp :: term()} | {ok, Resp :: term()}

View File

@ -42,7 +42,8 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) -> init([]) ->
_ = emqx_exhook_mgr:init_counter_table(), _ = emqx_exhook_metrics:init(),
_ = emqx_exhook_mgr:init_ref_counter_table(),
Mngr = ?CHILD(emqx_exhook_mgr, worker, []), Mngr = ?CHILD(emqx_exhook_mgr, worker, []),
{ok, {{one_for_one, 10, 100}, [Mngr]}}. {ok, {{one_for_one, 10, 100}, [Mngr]}}.

View File

@ -37,7 +37,7 @@ exhook {
">>). ">>).
all() -> all() ->
[t_list, t_get, t_add, t_move_1, t_move_2, t_delete, t_update]. [t_list, t_get, t_add, t_move_1, t_move_2, t_delete, t_hooks, t_update].
init_per_suite(Config) -> init_per_suite(Config) ->
application:load(emqx_conf), application:load(emqx_conf),
@ -94,7 +94,11 @@ t_list(_) ->
[Svr] = List, [Svr] = List,
?assertMatch(#{name := <<"default">>, ?assertMatch(#{name := <<"default">>,
status := <<"running">>}, Svr). metrics := _,
node_metrics := _,
node_status := _,
hooks := _
}, Svr).
t_get(_) -> t_get(_) ->
{ok, Data} = request_api(get, api_path(["exhooks", "default"]), "", {ok, Data} = request_api(get, api_path(["exhooks", "default"]), "",
@ -103,7 +107,11 @@ t_get(_) ->
Svr = decode_json(Data), Svr = decode_json(Data),
?assertMatch(#{name := <<"default">>, ?assertMatch(#{name := <<"default">>,
status := <<"running">>}, Svr). metrics := _,
node_metrics := _,
node_status := _,
hooks := _
}, Svr).
t_add(Cfg) -> t_add(Cfg) ->
Template = proplists:get_value(template, Cfg), Template = proplists:get_value(template, Cfg),
@ -116,7 +124,10 @@ t_add(Cfg) ->
Svr = decode_json(Data), Svr = decode_json(Data),
?assertMatch(#{name := <<"test1">>, ?assertMatch(#{name := <<"test1">>,
status := <<"running">>}, Svr), metrics := _,
node_metrics := _,
node_status := _,
hooks := _}, Svr),
?assertMatch([<<"default">>, <<"test1">>], emqx_exhook_mgr:running()). ?assertMatch([<<"default">>, <<"test1">>], emqx_exhook_mgr:running()).
@ -143,6 +154,18 @@ t_delete(_) ->
?assertMatch({ok, <<>>}, Result), ?assertMatch({ok, <<>>}, Result),
?assertMatch([<<"default">>], emqx_exhook_mgr:running()). ?assertMatch([<<"default">>], emqx_exhook_mgr:running()).
t_hooks(_Cfg) ->
{ok, Data} = request_api(get, api_path(["exhooks", "default", "hooks"]), "",
auth_header_()),
[Hook1 | _] = decode_json(Data),
?assertMatch(#{name := _,
params := _,
metrics := _,
node_metrics := _
}, Hook1).
t_update(Cfg) -> t_update(Cfg) ->
Template = proplists:get_value(template, Cfg), Template = proplists:get_value(template, Cfg),
Instance = Template#{enable => false}, Instance = Template#{enable => false},