diff --git a/apps/emqx_exhook/include/emqx_exhook.hrl b/apps/emqx_exhook/include/emqx_exhook.hrl index 466146212..4ed9e16cc 100644 --- a/apps/emqx_exhook/include/emqx_exhook.hrl +++ b/apps/emqx_exhook/include/emqx_exhook.hrl @@ -20,7 +20,6 @@ -define(APP, emqx_exhook). -define(HOOKS_REF_COUNTER, emqx_exhook_ref_counter). -define(HOOKS_METRICS, emqx_exhook_metrics). --define(METRICS_PRECISION, 1). -define(ENABLED_HOOKS, [ {'client.connect', {emqx_exhook_handler, on_client_connect, []}} diff --git a/apps/emqx_exhook/src/emqx_exhook_metrics.erl b/apps/emqx_exhook/src/emqx_exhook_metrics.erl index b530f1035..ae161fc4b 100644 --- a/apps/emqx_exhook/src/emqx_exhook_metrics.erl +++ b/apps/emqx_exhook/src/emqx_exhook_metrics.erl @@ -206,7 +206,8 @@ new_metrics_info() -> -spec calc_metric(non_neg_integer(), non_neg_integer()) -> non_neg_integer(). calc_metric(Val, Interval) -> - erlang:ceil(Val * ?METRICS_PRECISION / Interval). + %% the base unit of interval is milliseconds, but the rate is seconds + erlang:ceil(Val * 1000 / Interval). -spec metrics_add(metrics_info(), metrics_info()) -> metrics_info(). metrics_add(#{succeed := S1, failed := F1, rate := R1, max_rate := M1} diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index 19fd7ab5b..d9e75693b 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -153,7 +153,7 @@ unload(#{name := Name, options := ReqOpts, hookspec := HookSpecs}) -> ok. do_deinit(Name, ReqOpts) -> - _ = do_call(Name, 'on_provider_unloaded', #{}, ReqOpts), + _ = do_call(Name, undefined, 'on_provider_unloaded', #{}, ReqOpts), ok. do_init(ChannName, ReqOpts) -> @@ -161,7 +161,7 @@ do_init(ChannName, ReqOpts) -> BrokerInfo = maps:with([version, sysdescr, uptime, datetime], maps:from_list(emqx_sys:info())), Req = #{broker => BrokerInfo}, - case do_call(ChannName, 'on_provider_loaded', Req, ReqOpts) of + case do_call(ChannName, undefined, 'on_provider_loaded', Req, ReqOpts) of {ok, InitialResp} -> try {ok, resolve_hookspec(maps:get(hooks, InitialResp, []))} @@ -255,7 +255,6 @@ hooks(#{hookspec := Hooks}) -> | {error, term()}. call(Hookpoint, Req, #{name := ChannName, options := ReqOpts, hookspec := Hooks, prefix := Prefix}) -> - GrpcFunc = hk2func(Hookpoint), case maps:get(Hookpoint, Hooks, undefined) of undefined -> ignore; Opts -> @@ -269,7 +268,8 @@ call(Hookpoint, Req, #{name := ChannName, options := ReqOpts, false -> ignore; _ -> inc_metrics(Prefix, Hookpoint), - do_call(ChannName, GrpcFunc, Req, ReqOpts) + GrpcFun = hk2func(Hookpoint), + do_call(ChannName, Hookpoint, GrpcFun, Req, ReqOpts) end end. @@ -287,29 +287,39 @@ match_topic_filter(_, []) -> match_topic_filter(TopicName, TopicFilter) -> lists:any(fun(F) -> emqx_topic:match(TopicName, F) end, TopicFilter). --spec do_call(binary(), atom(), map(), map()) -> {ok, map()} | {error, term()}. -do_call(ChannName, Fun, Req, ReqOpts) -> +-spec do_call(binary(), atom(), atom(), map(), map()) -> {ok, map()} | {error, term()}. +do_call(ChannName, Hookpoint, Fun, Req, ReqOpts) -> Options = ReqOpts#{channel => ChannName}, ?SLOG(debug, #{msg => "do_call", module => ?PB_CLIENT_MOD, function => Fun, req => Req, options => Options}), case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of {ok, Resp, Metadata} -> ?SLOG(debug, #{msg => "do_call_ok", resp => Resp, metadata => Metadata}), + update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:succeed/2), {ok, Resp}; {error, {Code, Msg}, _Metadata} -> ?SLOG(error, #{msg => "exhook_call_error", module => ?PB_CLIENT_MOD, function => Fun, - req => Req, options => Options, code => Code, packet => Msg}), + req => Req, options => Options, code => Code, packet => Msg}), + update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2), {error, {Code, Msg}}; {error, Reason} -> ?SLOG(error, #{msg => "exhook_call_error", module => ?PB_CLIENT_MOD, function => Fun, - req => Req, options => Options, reason => Reason}), + req => Req, options => Options, reason => Reason}), + update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2), {error, Reason}; {'EXIT', {Reason, Stk}} -> ?SLOG(error, #{msg => "exhook_call_exception", module => ?PB_CLIENT_MOD, function => Fun, - req => Req, options => Options, stacktrace => Stk}), + req => Req, options => Options, stacktrace => Stk}), + update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2), {error, Reason} end. +update_metrics(undefined, _ChannName, _Fun) -> + ok; + +update_metrics(Hookpoint, ChannName, Fun) -> + Fun(ChannName, Hookpoint). + failed_action(#{options := Opts}) -> maps:get(failed_action, Opts).