refactor(emqx_resource): ingress bridge counter
Unify code paths for resource metrics by removing emqx_resource:inc_received/1 and adding emqx_resource_metrics:received_inc/1 & friends.
This commit is contained in:
parent
7e02eac3bc
commit
b9d012e072
|
@ -136,7 +136,7 @@ drop_bridge(Name) ->
|
||||||
%% When use this bridge as a data source, ?MODULE:on_message_received will be called
|
%% When use this bridge as a data source, ?MODULE:on_message_received will be called
|
||||||
%% if the bridge received msgs from the remote broker.
|
%% if the bridge received msgs from the remote broker.
|
||||||
on_message_received(Msg, HookPoint, ResId) ->
|
on_message_received(Msg, HookPoint, ResId) ->
|
||||||
emqx_resource:inc_received(ResId),
|
emqx_resource_metrics:received_inc(ResId),
|
||||||
emqx:run_hook(HookPoint, [Msg]).
|
emqx:run_hook(HookPoint, [Msg]).
|
||||||
|
|
||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
|
|
|
@ -111,7 +111,7 @@
|
||||||
list_group_instances/1
|
list_group_instances/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([inc_received/1, apply_reply_fun/2]).
|
-export([apply_reply_fun/2]).
|
||||||
|
|
||||||
-optional_callbacks([
|
-optional_callbacks([
|
||||||
on_query/3,
|
on_query/3,
|
||||||
|
@ -467,8 +467,5 @@ apply_reply_fun(From, Result) ->
|
||||||
|
|
||||||
%% =================================================================================
|
%% =================================================================================
|
||||||
|
|
||||||
inc_received(ResId) ->
|
|
||||||
emqx_metrics_worker:inc(?RES_METRICS, ResId, 'received').
|
|
||||||
|
|
||||||
filter_instances(Filter) ->
|
filter_instances(Filter) ->
|
||||||
[Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
|
[Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
|
||||||
|
|
|
@ -55,6 +55,9 @@
|
||||||
matched_inc/1,
|
matched_inc/1,
|
||||||
matched_inc/2,
|
matched_inc/2,
|
||||||
matched_get/1,
|
matched_get/1,
|
||||||
|
received_inc/1,
|
||||||
|
received_inc/2,
|
||||||
|
received_get/1,
|
||||||
retried_inc/1,
|
retried_inc/1,
|
||||||
retried_inc/2,
|
retried_inc/2,
|
||||||
retried_get/1,
|
retried_get/1,
|
||||||
|
@ -87,6 +90,7 @@ events() ->
|
||||||
inflight,
|
inflight,
|
||||||
matched,
|
matched,
|
||||||
queuing,
|
queuing,
|
||||||
|
received,
|
||||||
retried_failed,
|
retried_failed,
|
||||||
retried_success,
|
retried_success,
|
||||||
success
|
success
|
||||||
|
@ -134,6 +138,8 @@ handle_telemetry_event(
|
||||||
emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val);
|
emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val);
|
||||||
matched ->
|
matched ->
|
||||||
emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched', Val);
|
emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched', Val);
|
||||||
|
received ->
|
||||||
|
emqx_metrics_worker:inc(?RES_METRICS, ID, 'received', Val);
|
||||||
retried_failed ->
|
retried_failed ->
|
||||||
emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val),
|
emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val),
|
||||||
emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val),
|
emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val),
|
||||||
|
@ -309,6 +315,16 @@ matched_inc(ID, Val) ->
|
||||||
matched_get(ID) ->
|
matched_get(ID) ->
|
||||||
emqx_metrics_worker:get(?RES_METRICS, ID, 'matched').
|
emqx_metrics_worker:get(?RES_METRICS, ID, 'matched').
|
||||||
|
|
||||||
|
%% @doc The number of messages that have been received from a bridge
|
||||||
|
received_inc(ID) ->
|
||||||
|
received_inc(ID, 1).
|
||||||
|
|
||||||
|
received_inc(ID, Val) ->
|
||||||
|
telemetry:execute([?TELEMETRY_PREFIX, received], #{counter_inc => Val}, #{resource_id => ID}).
|
||||||
|
|
||||||
|
received_get(ID) ->
|
||||||
|
emqx_metrics_worker:get(?RES_METRICS, ID, 'received').
|
||||||
|
|
||||||
%% @doc The number of times message sends have been retried
|
%% @doc The number of times message sends have been retried
|
||||||
retried_inc(ID) ->
|
retried_inc(ID) ->
|
||||||
retried_inc(ID, 1).
|
retried_inc(ID, 1).
|
||||||
|
|
Loading…
Reference in New Issue