Merge pull request #7365 from lafirest/fix/emqx_exhook_metrics

fix(exhook): use emqx_hook_metrics to count hook call information
This commit is contained in:
lafirest 2022-03-22 17:47:24 +08:00 committed by GitHub
commit c257b0acc0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 21 additions and 11 deletions

View File

@ -20,7 +20,6 @@
-define(APP, emqx_exhook). -define(APP, emqx_exhook).
-define(HOOKS_REF_COUNTER, emqx_exhook_ref_counter). -define(HOOKS_REF_COUNTER, emqx_exhook_ref_counter).
-define(HOOKS_METRICS, emqx_exhook_metrics). -define(HOOKS_METRICS, emqx_exhook_metrics).
-define(METRICS_PRECISION, 1).
-define(ENABLED_HOOKS, -define(ENABLED_HOOKS,
[ {'client.connect', {emqx_exhook_handler, on_client_connect, []}} [ {'client.connect', {emqx_exhook_handler, on_client_connect, []}}

View File

@ -206,7 +206,8 @@ new_metrics_info() ->
-spec calc_metric(non_neg_integer(), non_neg_integer()) -> non_neg_integer(). -spec calc_metric(non_neg_integer(), non_neg_integer()) -> non_neg_integer().
calc_metric(Val, Interval) -> calc_metric(Val, Interval) ->
erlang:ceil(Val * ?METRICS_PRECISION / Interval). %% the base unit of interval is milliseconds, but the rate is seconds
erlang:ceil(Val * 1000 / Interval).
-spec metrics_add(metrics_info(), metrics_info()) -> metrics_info(). -spec metrics_add(metrics_info(), metrics_info()) -> metrics_info().
metrics_add(#{succeed := S1, failed := F1, rate := R1, max_rate := M1} metrics_add(#{succeed := S1, failed := F1, rate := R1, max_rate := M1}

View File

@ -153,7 +153,7 @@ unload(#{name := Name, options := ReqOpts, hookspec := HookSpecs}) ->
ok. ok.
do_deinit(Name, ReqOpts) -> do_deinit(Name, ReqOpts) ->
_ = do_call(Name, 'on_provider_unloaded', #{}, ReqOpts), _ = do_call(Name, undefined, 'on_provider_unloaded', #{}, ReqOpts),
ok. ok.
do_init(ChannName, ReqOpts) -> do_init(ChannName, ReqOpts) ->
@ -161,7 +161,7 @@ do_init(ChannName, ReqOpts) ->
BrokerInfo = maps:with([version, sysdescr, uptime, datetime], BrokerInfo = maps:with([version, sysdescr, uptime, datetime],
maps:from_list(emqx_sys:info())), maps:from_list(emqx_sys:info())),
Req = #{broker => BrokerInfo}, Req = #{broker => BrokerInfo},
case do_call(ChannName, 'on_provider_loaded', Req, ReqOpts) of case do_call(ChannName, undefined, 'on_provider_loaded', Req, ReqOpts) of
{ok, InitialResp} -> {ok, InitialResp} ->
try try
{ok, resolve_hookspec(maps:get(hooks, InitialResp, []))} {ok, resolve_hookspec(maps:get(hooks, InitialResp, []))}
@ -255,7 +255,6 @@ hooks(#{hookspec := Hooks}) ->
| {error, term()}. | {error, term()}.
call(Hookpoint, Req, #{name := ChannName, options := ReqOpts, call(Hookpoint, Req, #{name := ChannName, options := ReqOpts,
hookspec := Hooks, prefix := Prefix}) -> hookspec := Hooks, prefix := Prefix}) ->
GrpcFunc = hk2func(Hookpoint),
case maps:get(Hookpoint, Hooks, undefined) of case maps:get(Hookpoint, Hooks, undefined) of
undefined -> ignore; undefined -> ignore;
Opts -> Opts ->
@ -269,7 +268,8 @@ call(Hookpoint, Req, #{name := ChannName, options := ReqOpts,
false -> ignore; false -> ignore;
_ -> _ ->
inc_metrics(Prefix, Hookpoint), inc_metrics(Prefix, Hookpoint),
do_call(ChannName, GrpcFunc, Req, ReqOpts) GrpcFun = hk2func(Hookpoint),
do_call(ChannName, Hookpoint, GrpcFun, Req, ReqOpts)
end end
end. end.
@ -287,29 +287,39 @@ match_topic_filter(_, []) ->
match_topic_filter(TopicName, TopicFilter) -> match_topic_filter(TopicName, TopicFilter) ->
lists:any(fun(F) -> emqx_topic:match(TopicName, F) end, TopicFilter). lists:any(fun(F) -> emqx_topic:match(TopicName, F) end, TopicFilter).
-spec do_call(binary(), atom(), map(), map()) -> {ok, map()} | {error, term()}. -spec do_call(binary(), atom(), atom(), map(), map()) -> {ok, map()} | {error, term()}.
do_call(ChannName, Fun, Req, ReqOpts) -> do_call(ChannName, Hookpoint, Fun, Req, ReqOpts) ->
Options = ReqOpts#{channel => ChannName}, Options = ReqOpts#{channel => ChannName},
?SLOG(debug, #{msg => "do_call", module => ?PB_CLIENT_MOD, function => Fun, ?SLOG(debug, #{msg => "do_call", module => ?PB_CLIENT_MOD, function => Fun,
req => Req, options => Options}), req => Req, options => Options}),
case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of
{ok, Resp, Metadata} -> {ok, Resp, Metadata} ->
?SLOG(debug, #{msg => "do_call_ok", resp => Resp, metadata => Metadata}), ?SLOG(debug, #{msg => "do_call_ok", resp => Resp, metadata => Metadata}),
update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:succeed/2),
{ok, Resp}; {ok, Resp};
{error, {Code, Msg}, _Metadata} -> {error, {Code, Msg}, _Metadata} ->
?SLOG(error, #{msg => "exhook_call_error", module => ?PB_CLIENT_MOD, function => Fun, ?SLOG(error, #{msg => "exhook_call_error", module => ?PB_CLIENT_MOD, function => Fun,
req => Req, options => Options, code => Code, packet => Msg}), req => Req, options => Options, code => Code, packet => Msg}),
update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2),
{error, {Code, Msg}}; {error, {Code, Msg}};
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{msg => "exhook_call_error", module => ?PB_CLIENT_MOD, function => Fun, ?SLOG(error, #{msg => "exhook_call_error", module => ?PB_CLIENT_MOD, function => Fun,
req => Req, options => Options, reason => Reason}), req => Req, options => Options, reason => Reason}),
update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2),
{error, Reason}; {error, Reason};
{'EXIT', {Reason, Stk}} -> {'EXIT', {Reason, Stk}} ->
?SLOG(error, #{msg => "exhook_call_exception", module => ?PB_CLIENT_MOD, function => Fun, ?SLOG(error, #{msg => "exhook_call_exception", module => ?PB_CLIENT_MOD, function => Fun,
req => Req, options => Options, stacktrace => Stk}), req => Req, options => Options, stacktrace => Stk}),
update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2),
{error, Reason} {error, Reason}
end. end.
update_metrics(undefined, _ChannName, _Fun) ->
ok;
update_metrics(Hookpoint, ChannName, Fun) ->
Fun(ChannName, Hookpoint).
failed_action(#{options := Opts}) -> failed_action(#{options := Opts}) ->
maps:get(failed_action, Opts). maps:get(failed_action, Opts).