From 8b060a75f1fb813498acaa114877937dd595d615 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 27 Dec 2022 10:28:08 -0300 Subject: [PATCH] refactor(metrics): use absolute gauge values rather than deltas https://emqx.atlassian.net/browse/EMQX-8548 Currently, we face several issues trying to keep resource metrics reasonable. For example, when a resource is re-created and has its metrics reset, but then its durable queue resumes its previous work and leads to strange (often negative) metrics. Instead using `counters` that are shared by more than one worker to manage gauges, we introduce an ETS table whose key is not only scoped by the Resource ID as before, but also by the worker ID. This way, when a worker starts/terminates, they should set their own gauges to their values (often 0 or `replayq:count` when resuming off a queue). With this scoping and initialization procedure, we'll hopefully avoid hitting those strange metrics scenarios and have better control over the gauges. --- apps/emqx/src/emqx_metrics_worker.erl | 125 ++++++++- apps/emqx/test/emqx_metrics_worker_SUITE.erl | 239 +++++++++++++++--- apps/emqx_bridge/i18n/emqx_bridge_schema.conf | 2 +- apps/emqx_bridge/src/emqx_bridge_api.erl | 7 +- .../test/emqx_bridge_mqtt_SUITE.erl | 28 +- .../src/emqx_resource_manager.erl | 3 - .../src/emqx_resource_metrics.erl | 87 +++++-- .../src/emqx_resource_worker.erl | 231 ++++++++++------- .../test/emqx_resource_SUITE.erl | 9 +- .../kafka/emqx_bridge_impl_kafka_producer.erl | 16 +- .../test/emqx_ee_bridge_gcp_pubsub_SUITE.erl | 57 ++++- 11 files changed, 625 insertions(+), 179 deletions(-) diff --git a/apps/emqx/src/emqx_metrics_worker.erl b/apps/emqx/src/emqx_metrics_worker.erl index ab6a0b1a6..47ecca408 100644 --- a/apps/emqx/src/emqx_metrics_worker.erl +++ b/apps/emqx/src/emqx_metrics_worker.erl @@ -18,6 +18,8 @@ -behaviour(gen_server). +-include_lib("stdlib/include/ms_transform.hrl"). + %% API functions -export([ start_link/1, @@ -30,6 +32,11 @@ inc/3, inc/4, get/3, + get_gauge/3, + set_gauge/5, + shift_gauge/5, + get_gauges/2, + delete_gauges/2, get_rate/2, get_counters/2, create_metrics/3, @@ -68,14 +75,21 @@ last5m := float() }. -type metrics() :: #{ - counters := #{atom() => integer()}, - rate := #{atom() => rate()} + counters := #{metric_name() => integer()}, + gauges := #{metric_name() => integer()}, + rate := #{metric_name() => rate()} }. -type handler_name() :: atom(). +%% metric_id() is actually a resource id -type metric_id() :: binary() | atom(). +-type metric_name() :: atom(). +-type worker_id() :: term(). -define(CntrRef(Name), {?MODULE, Name}). -define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)). +-define(GAUGE_TABLE(NAME), + list_to_atom(atom_to_list(?MODULE) ++ "_" ++ atom_to_list(NAME) ++ "_gauge") +). -record(rate, { max = 0 :: number(), @@ -112,11 +126,12 @@ child_spec(ChldName, Name) -> modules => [emqx_metrics_worker] }. --spec create_metrics(handler_name(), metric_id(), [atom()]) -> ok | {error, term()}. +-spec create_metrics(handler_name(), metric_id(), [metric_name()]) -> ok | {error, term()}. create_metrics(Name, Id, Metrics) -> create_metrics(Name, Id, Metrics, Metrics). --spec create_metrics(handler_name(), metric_id(), [atom()], [atom()]) -> ok | {error, term()}. +-spec create_metrics(handler_name(), metric_id(), [metric_name()], [metric_name()]) -> + ok | {error, term()}. create_metrics(Name, Id, Metrics, RateMetrics) -> gen_server:call(Name, {create_metrics, Id, Metrics, RateMetrics}). @@ -135,7 +150,7 @@ has_metrics(Name, Id) -> _ -> true end. --spec get(handler_name(), metric_id(), atom() | integer()) -> number(). +-spec get(handler_name(), metric_id(), metric_name() | integer()) -> number(). get(Name, Id, Metric) -> case get_ref(Name, Id) of not_found -> @@ -167,16 +182,102 @@ reset_counters(Name, Id) -> -spec get_metrics(handler_name(), metric_id()) -> metrics(). get_metrics(Name, Id) -> - #{rate => get_rate(Name, Id), counters => get_counters(Name, Id)}. + #{ + rate => get_rate(Name, Id), + counters => get_counters(Name, Id), + gauges => get_gauges(Name, Id) + }. -spec inc(handler_name(), metric_id(), atom()) -> ok. inc(Name, Id, Metric) -> inc(Name, Id, Metric, 1). --spec inc(handler_name(), metric_id(), atom(), integer()) -> ok. +-spec inc(handler_name(), metric_id(), metric_name(), integer()) -> ok. inc(Name, Id, Metric, Val) -> counters:add(get_ref(Name, Id), idx_metric(Name, Id, Metric), Val). +-spec set_gauge(handler_name(), metric_id(), worker_id(), metric_name(), integer()) -> ok. +set_gauge(Name, Id, WorkerId, Metric, Val) -> + Table = ?GAUGE_TABLE(Name), + try + true = ets:insert(Table, {{Id, Metric, WorkerId}, Val}), + ok + catch + error:badarg -> + ok + end. + +-spec shift_gauge(handler_name(), metric_id(), worker_id(), metric_name(), integer()) -> ok. +shift_gauge(Name, Id, WorkerId, Metric, Val) -> + Table = ?GAUGE_TABLE(Name), + try + _ = ets:update_counter( + Table, + {Id, Metric, WorkerId}, + Val, + {{Id, Metric, WorkerId}, 0} + ), + ok + catch + error:badarg -> + ok + end. + +-spec get_gauge(handler_name(), metric_id(), metric_name()) -> integer(). +get_gauge(Name, Id, Metric) -> + Table = ?GAUGE_TABLE(Name), + MatchSpec = + ets:fun2ms( + fun({{Id0, Metric0, _WorkerId}, Val}) when Id0 =:= Id, Metric0 =:= Metric -> + Val + end + ), + try + lists:sum(ets:select(Table, MatchSpec)) + catch + error:badarg -> + 0 + end. + +-spec get_gauges(handler_name(), metric_id()) -> map(). +get_gauges(Name, Id) -> + Table = ?GAUGE_TABLE(Name), + MatchSpec = + ets:fun2ms( + fun({{Id0, Metric, _WorkerId}, Val}) when Id0 =:= Id -> + {Metric, Val} + end + ), + try + lists:foldr( + fun({Metric, Val}, Acc) -> + maps:update_with(Metric, fun(X) -> X + Val end, Val, Acc) + end, + #{}, + ets:select(Table, MatchSpec) + ) + catch + error:badarg -> + #{} + end. + +-spec delete_gauges(handler_name(), metric_id()) -> ok. +delete_gauges(Name, Id) -> + Table = ?GAUGE_TABLE(Name), + MatchSpec = + ets:fun2ms( + fun({{Id0, _Metric, _WorkerId}, _Val}) when Id0 =:= Id -> + true + end + ), + try + _ = ets:select_delete(Table, MatchSpec), + ok + catch + error:badarg -> + ok + end. + start_link(Name) -> gen_server:start_link({local, Name}, ?MODULE, Name, []). @@ -185,6 +286,7 @@ init(Name) -> %% the rate metrics erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), persistent_term:put(?CntrRef(Name), #{}), + _ = ets:new(?GAUGE_TABLE(Name), [named_table, ordered_set, public, {write_concurrency, true}]), {ok, #state{}}. handle_call({get_rate, _Id}, _From, State = #state{rates = undefined}) -> @@ -220,7 +322,10 @@ handle_call( _From, State = #state{metric_ids = MIDs, rates = Rates} ) -> - {reply, delete_counters(get_self_name(), Id), State#state{ + Name = get_self_name(), + delete_counters(Name, Id), + delete_gauges(Name, Id), + {reply, ok, State#state{ metric_ids = sets:del_element(Id, MIDs), rates = case Rates of @@ -233,7 +338,9 @@ handle_call( _From, State = #state{rates = Rates} ) -> - {reply, reset_counters(get_self_name(), Id), State#state{ + Name = get_self_name(), + delete_gauges(Name, Id), + {reply, reset_counters(Name, Id), State#state{ rates = case Rates of undefined -> diff --git a/apps/emqx/test/emqx_metrics_worker_SUITE.erl b/apps/emqx/test/emqx_metrics_worker_SUITE.erl index 326b0be1e..3f48010c6 100644 --- a/apps/emqx/test/emqx_metrics_worker_SUITE.erl +++ b/apps/emqx/test/emqx_metrics_worker_SUITE.erl @@ -47,7 +47,8 @@ end_per_testcase(_, _Config) -> t_get_metrics(_) -> Metrics = [a, b, c], - ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, Metrics), + Id = <<"testid">>, + ok = emqx_metrics_worker:create_metrics(?NAME, Id, Metrics), %% all the metrics are set to zero at start ?assertMatch( #{ @@ -56,18 +57,22 @@ t_get_metrics(_) -> b := #{current := 0.0, max := 0.0, last5m := 0.0}, c := #{current := 0.0, max := 0.0, last5m := 0.0} }, + gauges := #{}, counters := #{ a := 0, b := 0, c := 0 } }, - emqx_metrics_worker:get_metrics(?NAME, <<"testid">>) + emqx_metrics_worker:get_metrics(?NAME, Id) ), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), + ok = emqx_metrics_worker:inc(?NAME, Id, a), + ok = emqx_metrics_worker:inc(?NAME, Id, b), + ok = emqx_metrics_worker:inc(?NAME, Id, c), + ok = emqx_metrics_worker:inc(?NAME, Id, c), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9), ct:sleep(1500), ?LET( #{ @@ -76,27 +81,73 @@ t_get_metrics(_) -> b := #{current := CurrB, max := MaxB, last5m := _}, c := #{current := CurrC, max := MaxC, last5m := _} }, + gauges := #{ + inflight := Inflight, + queuing := Queuing + }, counters := #{ a := 1, b := 1, c := 2 } }, - emqx_metrics_worker:get_metrics(?NAME, <<"testid">>), + emqx_metrics_worker:get_metrics(?NAME, Id), { ?assert(CurrA > 0), ?assert(CurrB > 0), ?assert(CurrC > 0), ?assert(MaxA > 0), ?assert(MaxB > 0), - ?assert(MaxC > 0) + ?assert(MaxC > 0), + ?assert(Inflight == 12), + ?assert(Queuing == 9) } ), - ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>). + ok = emqx_metrics_worker:clear_metrics(?NAME, Id). + +t_clear_metrics(_Config) -> + Metrics = [a, b, c], + Id = <<"testid">>, + ok = emqx_metrics_worker:create_metrics(?NAME, Id, Metrics), + ?assertMatch( + #{ + rate := #{ + a := #{current := 0.0, max := 0.0, last5m := 0.0}, + b := #{current := 0.0, max := 0.0, last5m := 0.0}, + c := #{current := 0.0, max := 0.0, last5m := 0.0} + }, + gauges := #{}, + counters := #{ + a := 0, + b := 0, + c := 0 + } + }, + emqx_metrics_worker:get_metrics(?NAME, Id) + ), + ok = emqx_metrics_worker:inc(?NAME, Id, a), + ok = emqx_metrics_worker:inc(?NAME, Id, b), + ok = emqx_metrics_worker:inc(?NAME, Id, c), + ok = emqx_metrics_worker:inc(?NAME, Id, c), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9), + ct:sleep(1500), + ok = emqx_metrics_worker:clear_metrics(?NAME, Id), + ?assertEqual( + #{ + counters => #{}, + gauges => #{}, + rate => #{current => 0.0, last5m => 0.0, max => 0.0} + }, + emqx_metrics_worker:get_metrics(?NAME, Id) + ), + ok. t_reset_metrics(_) -> Metrics = [a, b, c], - ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, Metrics), + Id = <<"testid">>, + ok = emqx_metrics_worker:create_metrics(?NAME, Id, Metrics), %% all the metrics are set to zero at start ?assertMatch( #{ @@ -105,20 +156,24 @@ t_reset_metrics(_) -> b := #{current := 0.0, max := 0.0, last5m := 0.0}, c := #{current := 0.0, max := 0.0, last5m := 0.0} }, + gauges := #{}, counters := #{ a := 0, b := 0, c := 0 } }, - emqx_metrics_worker:get_metrics(?NAME, <<"testid">>) + emqx_metrics_worker:get_metrics(?NAME, Id) ), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), + ok = emqx_metrics_worker:inc(?NAME, Id, a), + ok = emqx_metrics_worker:inc(?NAME, Id, b), + ok = emqx_metrics_worker:inc(?NAME, Id, c), + ok = emqx_metrics_worker:inc(?NAME, Id, c), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9), ct:sleep(1500), - ok = emqx_metrics_worker:reset_metrics(?NAME, <<"testid">>), + ok = emqx_metrics_worker:reset_metrics(?NAME, Id), ?LET( #{ rate := #{ @@ -126,68 +181,83 @@ t_reset_metrics(_) -> b := #{current := CurrB, max := MaxB, last5m := _}, c := #{current := CurrC, max := MaxC, last5m := _} }, + gauges := Gauges, counters := #{ a := 0, b := 0, c := 0 } }, - emqx_metrics_worker:get_metrics(?NAME, <<"testid">>), + emqx_metrics_worker:get_metrics(?NAME, Id), { ?assert(CurrA == 0), ?assert(CurrB == 0), ?assert(CurrC == 0), ?assert(MaxA == 0), ?assert(MaxB == 0), - ?assert(MaxC == 0) + ?assert(MaxC == 0), + ?assertEqual(#{}, Gauges) } ), - ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>). + ok = emqx_metrics_worker:clear_metrics(?NAME, Id). t_get_metrics_2(_) -> Metrics = [a, b, c], + Id = <<"testid">>, ok = emqx_metrics_worker:create_metrics( ?NAME, - <<"testid">>, + Id, Metrics, [a] ), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), + ok = emqx_metrics_worker:inc(?NAME, Id, a), + ok = emqx_metrics_worker:inc(?NAME, Id, b), + ok = emqx_metrics_worker:inc(?NAME, Id, c), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9), ?assertMatch( #{ rate := Rate = #{ a := #{current := _, max := _, last5m := _} }, + gauges := #{}, counters := #{ a := 1, b := 1, c := 1 } } when map_size(Rate) =:= 1, - emqx_metrics_worker:get_metrics(?NAME, <<"testid">>) + emqx_metrics_worker:get_metrics(?NAME, Id) ), - ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>). + ok = emqx_metrics_worker:clear_metrics(?NAME, Id). t_recreate_metrics(_) -> - ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, [a]), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a), + Id = <<"testid">>, + ok = emqx_metrics_worker:create_metrics(?NAME, Id, [a]), + ok = emqx_metrics_worker:inc(?NAME, Id, a), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9), ?assertMatch( #{ rate := R = #{ a := #{current := _, max := _, last5m := _} }, + gauges := #{ + inflight := 12, + queuing := 9 + }, counters := C = #{ a := 1 } } when map_size(R) == 1 andalso map_size(C) == 1, - emqx_metrics_worker:get_metrics(?NAME, <<"testid">>) + emqx_metrics_worker:get_metrics(?NAME, Id) ), %% we create the metrics again, to add some counters - ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, [a, b, c]), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), + ok = emqx_metrics_worker:create_metrics(?NAME, Id, [a, b, c]), + ok = emqx_metrics_worker:inc(?NAME, Id, b), + ok = emqx_metrics_worker:inc(?NAME, Id, c), ?assertMatch( #{ rate := R = #{ @@ -195,13 +265,17 @@ t_recreate_metrics(_) -> b := #{current := _, max := _, last5m := _}, c := #{current := _, max := _, last5m := _} }, + gauges := #{ + inflight := 12, + queuing := 9 + }, counters := C = #{ a := 1, b := 1, c := 1 } } when map_size(R) == 3 andalso map_size(C) == 3, - emqx_metrics_worker:get_metrics(?NAME, <<"testid">>) + emqx_metrics_worker:get_metrics(?NAME, Id) ), - ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>). + ok = emqx_metrics_worker:clear_metrics(?NAME, Id). t_inc_matched(_) -> Metrics = ['rules.matched'], @@ -238,3 +312,102 @@ t_rate(_) -> ct:sleep(3000), ok = emqx_metrics_worker:clear_metrics(?NAME, <<"rule1">>), ok = emqx_metrics_worker:clear_metrics(?NAME, <<"rule:2">>). + +t_get_gauge(_Config) -> + Metric = 'queueing', + %% unknown handler name (inexistent table) + ?assertEqual(0, emqx_metrics_worker:get_gauge(unknown_name, unknown_id, Metric)), + %% unknown resource id + ?assertEqual(0, emqx_metrics_worker:get_gauge(?NAME, unknown_id, Metric)), + + Id = <<"some id">>, + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 2), + + ?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)), + ?assertEqual(0, emqx_metrics_worker:get_gauge(?NAME, unknown, Metric)), + + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, Metric, 3), + ?assertEqual(5, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)), + + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 1), + ?assertEqual(4, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)), + + ?assertEqual(0, emqx_metrics_worker:get_gauge(?NAME, Id, another_metric)), + + ok. + +t_get_gauges(_Config) -> + %% unknown handler name (inexistent table) + ?assertEqual(#{}, emqx_metrics_worker:get_gauges(unknown_name, unknown_id)), + %% unknown resource id + ?assertEqual(#{}, emqx_metrics_worker:get_gauges(?NAME, unknown_id)), + + Metric = 'queuing', + Id = <<"some id">>, + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 2), + + ?assertEqual(#{queuing => 2}, emqx_metrics_worker:get_gauges(?NAME, Id)), + ?assertEqual(#{}, emqx_metrics_worker:get_gauges(?NAME, unknown)), + + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, Metric, 3), + ?assertEqual(#{queuing => 5}, emqx_metrics_worker:get_gauges(?NAME, Id)), + + AnotherMetric = 'inflight', + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 1), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, AnotherMetric, 10), + ?assertEqual(#{queuing => 4, inflight => 10}, emqx_metrics_worker:get_gauges(?NAME, Id)), + + ok. + +t_delete_gauge(_Config) -> + %% unknown handler name (inexistent table) + ?assertEqual(ok, emqx_metrics_worker:delete_gauges(unknown_name, unknown_id)), + %% unknown resource id + ?assertEqual(ok, emqx_metrics_worker:delete_gauges(?NAME, unknown_id)), + + Metric = 'queuing', + AnotherMetric = 'inflight', + Id = <<"some id">>, + AnotherId = <<"another id">>, + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 2), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, Metric, 3), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, AnotherMetric, 10), + ok = emqx_metrics_worker:set_gauge(?NAME, AnotherId, worker_id1, AnotherMetric, 10), + ?assertEqual(#{queuing => 5, inflight => 10}, emqx_metrics_worker:get_gauges(?NAME, Id)), + + ?assertEqual(ok, emqx_metrics_worker:delete_gauges(?NAME, Id)), + + ?assertEqual(#{}, emqx_metrics_worker:get_gauges(?NAME, Id)), + ?assertEqual(#{inflight => 10}, emqx_metrics_worker:get_gauges(?NAME, AnotherId)), + + ok. + +t_shift_gauge(_Config) -> + Metric = 'queueing', + Id = <<"some id">>, + AnotherId = <<"another id">>, + + %% unknown handler name (inexistent table) + ?assertEqual( + ok, emqx_metrics_worker:shift_gauge(unknown_name, unknown_id, worker_id0, Metric, 2) + ), + ?assertEqual(0, emqx_metrics_worker:get_gauge(unknown_name, unknown_id, Metric)), + + %% empty resource id + ?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, Id, worker_id0, Metric, 2)), + ?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, AnotherId, worker_id0, Metric, 2)), + ?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)), + ?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, AnotherId, Metric)), + + ?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, Id, worker_id0, Metric, 3)), + ?assertEqual(5, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)), + + ?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, Id, worker_id1, Metric, 10)), + ?assertEqual(15, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)), + + ?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, Id, worker_id1, Metric, -4)), + ?assertEqual(11, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)), + + ?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, AnotherId, Metric)), + + ok. diff --git a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf index c465ef242..b62a2ee68 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf @@ -195,7 +195,7 @@ emqx_bridge_schema { metric_sent_inflight { desc { - en: """Count of messages that were sent asynchronously but ACKs are not received.""" + en: """Count of messages that were sent asynchronously but ACKs are not yet received.""" zh: """已异步地发送但没有收到 ACK 的消息个数。""" } label: { diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 6b5e307d8..fc58bafa0 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -686,7 +686,6 @@ format_resp( format_metrics(#{ counters := #{ - 'batching' := Batched, 'dropped' := Dropped, 'dropped.other' := DroppedOther, 'dropped.queue_full' := DroppedQueueFull, @@ -694,17 +693,19 @@ format_metrics(#{ 'dropped.resource_not_found' := DroppedResourceNotFound, 'dropped.resource_stopped' := DroppedResourceStopped, 'matched' := Matched, - 'queuing' := Queued, 'retried' := Retried, 'failed' := SentFailed, - 'inflight' := SentInflight, 'success' := SentSucc, 'received' := Rcvd }, + gauges := Gauges, rate := #{ matched := #{current := Rate, last5m := Rate5m, max := RateMax} } }) -> + Batched = maps:get('batching', Gauges, 0), + Queued = maps:get('queuing', Gauges, 0), + SentInflight = maps:get('inflight', Gauges, 0), ?METRICS( Batched, Dropped, diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 1bf156ed4..342384593 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -19,6 +19,7 @@ -compile(export_all). -import(emqx_dashboard_api_test_helpers, [request/4, uri/1]). +-import(emqx_common_test_helpers, [on_exit/1]). -include("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -124,6 +125,7 @@ init_per_testcase(_, Config) -> Config. end_per_testcase(_, _Config) -> clear_resources(), + emqx_common_test_helpers:call_janitor(), ok. clear_resources() -> @@ -672,6 +674,12 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> <<"name">> := ?BRIDGE_NAME_EGRESS } = jsx:decode(Bridge), BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), + on_exit(fun() -> + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + ok + end), %% we now test if the bridge works as expected LocalTopic = <>, RemoteTopic = <>, @@ -733,15 +741,20 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> %% verify the metrics of the bridge, the message should be queued {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []), + Decoded1 = jsx:decode(BridgeStr1), + ?assertMatch( + Status when (Status == <<"connected">> orelse Status == <<"connecting">>), + maps:get(<<"status">>, Decoded1) + ), %% matched >= 3 because of possible retries. ?assertMatch( #{ - <<"status">> := Status, - <<"metrics">> := #{ - <<"matched">> := Matched, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2 - } - } when Matched >= 3 andalso (Status == <<"connected">> orelse Status == <<"connecting">>), - jsx:decode(BridgeStr1) + <<"matched">> := Matched, + <<"success">> := 1, + <<"failed">> := 0, + <<"queuing">> := 2 + } when Matched >= 3, + maps:get(<<"metrics">>, Decoded1) ), %% start the listener 1883 to make the bridge reconnected @@ -766,9 +779,6 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> %% also verify the 2 messages have been sent to the remote broker assert_mqtt_msg_received(RemoteTopic, Payload1), assert_mqtt_msg_received(RemoteTopic, Payload2), - %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), ok. assert_mqtt_msg_received(Topic, Payload) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 10c501865..b5f14780d 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -141,9 +141,6 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> 'dropped.resource_not_found', 'dropped.resource_stopped', 'dropped.other', - 'queuing', - 'batching', - 'inflight', 'received' ], [matched] diff --git a/apps/emqx_resource/src/emqx_resource_metrics.erl b/apps/emqx_resource/src/emqx_resource_metrics.erl index e6637b68f..921373c47 100644 --- a/apps/emqx_resource/src/emqx_resource_metrics.erl +++ b/apps/emqx_resource/src/emqx_resource_metrics.erl @@ -24,11 +24,12 @@ ]). -export([ - batching_change/2, + batching_set/3, + batching_shift/3, batching_get/1, - inflight_change/2, + inflight_set/3, inflight_get/1, - queuing_change/2, + queuing_set/3, queuing_get/1, dropped_inc/1, dropped_inc/2, @@ -114,8 +115,6 @@ handle_telemetry_event( _HandlerConfig ) -> case Event of - batching -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'batching', Val); dropped_other -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other', Val); @@ -133,12 +132,8 @@ handle_telemetry_event( emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped', Val); failed -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val); - inflight -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'inflight', Val); matched -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched', Val); - queuing -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'queuing', Val); retried_failed -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val), emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val), @@ -152,6 +147,34 @@ handle_telemetry_event( _ -> ok end; +handle_telemetry_event( + [?TELEMETRY_PREFIX, Event], + _Measurements = #{gauge_set := Val}, + _Metadata = #{resource_id := ID, worker_id := WorkerID}, + _HandlerConfig +) -> + case Event of + batching -> + emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'batching', Val); + inflight -> + emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'inflight', Val); + queuing -> + emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'queuing', Val); + _ -> + ok + end; +handle_telemetry_event( + [?TELEMETRY_PREFIX, Event], + _Measurements = #{gauge_shift := Val}, + _Metadata = #{resource_id := ID, worker_id := WorkerID}, + _HandlerConfig +) -> + case Event of + batching -> + emqx_metrics_worker:shift_gauge(?RES_METRICS, ID, WorkerID, 'batching', Val); + _ -> + ok + end; handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) -> ok. @@ -160,26 +183,48 @@ handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) -> %% @doc Count of messages that are currently accumulated in memory waiting for %% being sent in one batch -batching_change(ID, Val) -> - telemetry:execute([?TELEMETRY_PREFIX, batching], #{counter_inc => Val}, #{resource_id => ID}). +batching_set(ID, WorkerID, Val) -> + telemetry:execute( + [?TELEMETRY_PREFIX, batching], + #{gauge_set => Val}, + #{resource_id => ID, worker_id => WorkerID} + ). + +batching_shift(_ID, _WorkerID = undefined, _Val) -> + ok; +batching_shift(ID, WorkerID, Val) -> + telemetry:execute( + [?TELEMETRY_PREFIX, batching], + #{gauge_shift => Val}, + #{resource_id => ID, worker_id => WorkerID} + ). batching_get(ID) -> - emqx_metrics_worker:get(?RES_METRICS, ID, 'batching'). + emqx_metrics_worker:get_gauge(?RES_METRICS, ID, 'batching'). -%% @doc Count of messages that are currently queuing. [Gauge] -queuing_change(ID, Val) -> - telemetry:execute([?TELEMETRY_PREFIX, queuing], #{counter_inc => Val}, #{resource_id => ID}). +%% @doc Count of batches of messages that are currently +%% queuing. [Gauge] +queuing_set(ID, WorkerID, Val) -> + telemetry:execute( + [?TELEMETRY_PREFIX, queuing], + #{gauge_set => Val}, + #{resource_id => ID, worker_id => WorkerID} + ). queuing_get(ID) -> - emqx_metrics_worker:get(?RES_METRICS, ID, 'queuing'). + emqx_metrics_worker:get_gauge(?RES_METRICS, ID, 'queuing'). -%% @doc Count of messages that were sent asynchronously but ACKs are not -%% received. [Gauge] -inflight_change(ID, Val) -> - telemetry:execute([?TELEMETRY_PREFIX, inflight], #{counter_inc => Val}, #{resource_id => ID}). +%% @doc Count of batches of messages that were sent asynchronously but +%% ACKs are not yet received. [Gauge] +inflight_set(ID, WorkerID, Val) -> + telemetry:execute( + [?TELEMETRY_PREFIX, inflight], + #{gauge_set => Val}, + #{resource_id => ID, worker_id => WorkerID} + ). inflight_get(ID) -> - emqx_metrics_worker:get(?RES_METRICS, ID, 'inflight'). + emqx_metrics_worker:get_gauge(?RES_METRICS, ID, 'inflight'). %% Counters (value can only got up): %% -------------------------------------- diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 482c82f6a..44cfec1b1 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -52,7 +52,7 @@ -export([queue_item_marshaller/1, estimate_size/1]). --export([reply_after_query/6, batch_reply_after_query/6]). +-export([reply_after_query/7, batch_reply_after_query/7]). -define(Q_ITEM(REQUEST), {q_item, REQUEST}). @@ -90,13 +90,27 @@ async_query(Id, Request, Opts) -> %% simple query the resource without batching and queuing messages. -spec simple_sync_query(id(), request()) -> Result :: term(). simple_sync_query(Id, Request) -> - Result = call_query(sync, Id, ?QUERY(self(), Request, false), #{}), + %% Note: since calling this function implies in bypassing the + %% buffer workers, and each buffer worker index is used when + %% collecting gauge metrics, we use this dummy index. If this + %% call ends up calling buffering functions, that's a bug and + %% would mess up the metrics anyway. `undefined' is ignored by + %% `emqx_resource_metrics:*_shift/3'. + Index = undefined, + Result = call_query(sync, Id, Index, ?QUERY(self(), Request, false), #{}), _ = handle_query_result(Id, Result, false, false), Result. -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term(). simple_async_query(Id, Request, ReplyFun) -> - Result = call_query(async, Id, ?QUERY(ReplyFun, Request, false), #{}), + %% Note: since calling this function implies in bypassing the + %% buffer workers, and each buffer worker index is used when + %% collecting gauge metrics, we use this dummy index. If this + %% call ends up calling buffering functions, that's a bug and + %% would mess up the metrics anyway. `undefined' is ignored by + %% `emqx_resource_metrics:*_shift/3'. + Index = undefined, + Result = call_query(async, Id, Index, ?QUERY(ReplyFun, Request, false), #{}), _ = handle_query_result(Id, Result, false, false), Result. @@ -133,9 +147,11 @@ init({Id, Index, Opts}) -> false -> undefined end, - emqx_resource_metrics:queuing_change(Id, queue_count(Queue)), + emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)), + emqx_resource_metrics:batching_set(Id, Index, 0), + emqx_resource_metrics:inflight_set(Id, Index, 0), InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT), - ok = inflight_new(Name, InfltWinSZ), + ok = inflight_new(Name, InfltWinSZ, Id, Index), HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), St = #{ id => Id, @@ -158,10 +174,12 @@ running(cast, resume, _St) -> keep_state_and_data; running(cast, block, St) -> {next_state, blocked, St}; -running(cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, queue := Q} = St) when +running( + cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, index := Index, queue := Q} = St +) when is_list(Batch) -> - Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]), + Q1 = maybe_append_queue(Id, Index, Q, [?Q_ITEM(Query) || Query <- Batch]), {next_state, blocked, St#{queue := Q1}}; running({call, From}, {query, Request, _Opts}, St) -> query_or_acc(From, Request, St); @@ -180,28 +198,39 @@ blocked(enter, _, #{resume_interval := ResumeT} = _St) -> {keep_state_and_data, {state_timeout, ResumeT, resume}}; blocked(cast, block, _St) -> keep_state_and_data; -blocked(cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, queue := Q} = St) when +blocked( + cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, index := Index, queue := Q} = St +) when is_list(Batch) -> - Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]), + Q1 = maybe_append_queue(Id, Index, Q, [?Q_ITEM(Query) || Query <- Batch]), {keep_state, St#{queue := Q1}}; blocked(cast, resume, St) -> do_resume(St); blocked(state_timeout, resume, St) -> do_resume(St); -blocked({call, From}, {query, Request, _Opts}, #{id := Id, queue := Q} = St) -> +blocked({call, From}, {query, Request, _Opts}, #{id := Id, index := Index, queue := Q} = St) -> Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), _ = reply_caller(Id, ?REPLY(From, Request, false, Error)), - {keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(From, Request, false))])}}; -blocked(cast, {query, Request, Opts}, #{id := Id, queue := Q} = St) -> + {keep_state, St#{ + queue := maybe_append_queue(Id, Index, Q, [?Q_ITEM(?QUERY(From, Request, false))]) + }}; +blocked(cast, {query, Request, Opts}, #{id := Id, index := Index, queue := Q} = St) -> ReplyFun = maps:get(async_reply_fun, Opts, undefined), Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), _ = reply_caller(Id, ?REPLY(ReplyFun, Request, false, Error)), {keep_state, St#{ - queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(ReplyFun, Request, false))]) + queue := maybe_append_queue(Id, Index, Q, [?Q_ITEM(?QUERY(ReplyFun, Request, false))]) }}. -terminate(_Reason, #{id := Id, index := Index}) -> +terminate(_Reason, #{id := Id, index := Index, queue := Q}) -> + GaugeFns = + [ + fun emqx_resource_metrics:batching_set/3, + fun emqx_resource_metrics:inflight_set/3 + ], + lists:foreach(fun(Fn) -> Fn(Id, Index, 0) end, GaugeFns), + emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)), gproc_pool:disconnect_worker(Id, {Id, Index}). code_change(_OldVsn, State, _Extra) -> @@ -240,24 +269,33 @@ do_resume(#{id := Id, name := Name} = St) -> retry_queue(#{queue := undefined} = St) -> {next_state, running, St}; -retry_queue(#{queue := Q, id := Id, enable_batch := false, resume_interval := ResumeT} = St) -> +retry_queue( + #{ + queue := Q, + id := Id, + index := Index, + enable_batch := false, + resume_interval := ResumeT + } = St +) -> case get_first_n_from_queue(Q, 1) of [] -> {next_state, running, St}; [?QUERY(_, Request, HasSent) = Query] -> QueryOpts = #{inflight_name => maps:get(name, St)}, - Result = call_query(configured, Id, Query, QueryOpts), + Result = call_query(configured, Id, Index, Query, QueryOpts), case reply_caller(Id, ?REPLY(undefined, Request, HasSent, Result)) of true -> {keep_state, St, {state_timeout, ResumeT, resume}}; false -> - retry_queue(St#{queue := drop_head(Q, Id)}) + retry_queue(St#{queue := drop_head(Q, Id, Index)}) end end; retry_queue( #{ queue := Q, id := Id, + index := Index, enable_batch := true, batch_size := BatchSize, resume_interval := ResumeT @@ -268,7 +306,7 @@ retry_queue( {next_state, running, St}; Batch0 -> QueryOpts = #{inflight_name => maps:get(name, St)}, - Result = call_query(configured, Id, Batch0, QueryOpts), + Result = call_query(configured, Id, Index, Batch0, QueryOpts), %% The caller has been replied with ?RESOURCE_ERROR(blocked, _) before saving into the queue, %% we now change the 'from' field to 'undefined' so it will not reply the caller again. Batch = [?QUERY(undefined, Request, HasSent) || ?QUERY(_, Request, HasSent) <- Batch0], @@ -276,41 +314,55 @@ retry_queue( true -> {keep_state, St, {state_timeout, ResumeT, resume}}; false -> - retry_queue(St#{queue := drop_first_n_from_queue(Q, length(Batch), Id)}) + retry_queue(St#{queue := drop_first_n_from_queue(Q, length(Batch), Id, Index)}) end end. retry_inflight_sync( - Id, Ref, ?QUERY(_, _, HasSent) = Query, Name, #{resume_interval := ResumeT} = St0 + Id, + Ref, + ?QUERY(_, _, HasSent) = Query, + Name, + #{index := Index, resume_interval := ResumeT} = St0 ) -> - Result = call_query(sync, Id, Query, #{}), + Result = call_query(sync, Id, Index, Query, #{}), case handle_query_result(Id, Result, HasSent, false) of %% Send failed because resource down true -> {keep_state, St0, {state_timeout, ResumeT, resume}}; %% Send ok or failed but the resource is working false -> - inflight_drop(Name, Ref), + inflight_drop(Name, Ref, Id, Index), do_resume(St0) end. -query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) -> +query_or_acc( + From, + Request, + #{ + enable_batch := true, + acc := Acc, + acc_left := Left, + index := Index, + id := Id + } = St0 +) -> Acc1 = [?QUERY(From, Request, false) | Acc], - emqx_resource_metrics:batching_change(Id, 1), + emqx_resource_metrics:batching_shift(Id, Index, 1), St = St0#{acc := Acc1, acc_left := Left - 1}, case Left =< 1 of true -> flush(St); false -> {keep_state, ensure_flush_timer(St)} end; -query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id} = St) -> +query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id, index := Index} = St) -> QueryOpts = #{ inflight_name => maps:get(name, St) }, - Result = call_query(configured, Id, ?QUERY(From, Request, false), QueryOpts), + Result = call_query(configured, Id, Index, ?QUERY(From, Request, false), QueryOpts), case reply_caller(Id, ?REPLY(From, Request, false, Result)) of true -> Query = ?QUERY(From, Request, false), - {next_state, blocked, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(Query)])}}; + {next_state, blocked, St#{queue := maybe_append_queue(Id, Index, Q, [?Q_ITEM(Query)])}}; false -> {keep_state, St} end. @@ -320,6 +372,7 @@ flush(#{acc := []} = St) -> flush( #{ id := Id, + index := Index, acc := Batch0, batch_size := Size, queue := Q0 @@ -329,12 +382,12 @@ flush( QueryOpts = #{ inflight_name => maps:get(name, St) }, - emqx_resource_metrics:batching_change(Id, -length(Batch)), - Result = call_query(configured, Id, Batch, QueryOpts), + emqx_resource_metrics:batching_shift(Id, Index, -length(Batch)), + Result = call_query(configured, Id, Index, Batch, QueryOpts), St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}), case batch_reply_caller(Id, Result, Batch) of true -> - Q1 = maybe_append_queue(Id, Q0, [?Q_ITEM(Query) || Query <- Batch]), + Q1 = maybe_append_queue(Id, Index, Q0, [?Q_ITEM(Query) || Query <- Batch]), {next_state, blocked, St1#{queue := Q1}}; false -> {keep_state, St1} @@ -412,7 +465,7 @@ handle_query_result(Id, Result, HasSent, BlockWorker) -> inc_sent_success(Id, HasSent), BlockWorker. -call_query(QM0, Id, Query, QueryOpts) -> +call_query(QM0, Id, Index, Query, QueryOpts) -> ?tp(call_query_enter, #{id => Id, query => Query}), case emqx_resource_manager:ets_lookup(Id) of {ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} -> @@ -423,7 +476,7 @@ call_query(QM0, Id, Query, QueryOpts) -> end, CM = maps:get(callback_mode, Data), emqx_resource_metrics:matched_inc(Id), - apply_query_fun(call_mode(QM, CM), Mod, Id, Query, ResSt, QueryOpts); + apply_query_fun(call_mode(QM, CM), Mod, Id, Index, Query, ResSt, QueryOpts); {ok, _Group, #{status := stopped}} -> emqx_resource_metrics:matched_inc(Id), ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); @@ -452,10 +505,10 @@ call_query(QM0, Id, Query, QueryOpts) -> end ). -apply_query_fun(sync, Mod, Id, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) -> +apply_query_fun(sync, Mod, Id, _Index, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) -> ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}), ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request); -apply_query_fun(async, Mod, Id, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> +apply_query_fun(async, Mod, Id, Index, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), Name = maps:get(inflight_name, QueryOpts, undefined), ?APPLY_RESOURCE( @@ -464,21 +517,20 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) true -> {async_return, inflight_full}; false -> - ok = emqx_resource_metrics:inflight_change(Id, 1), - ReplyFun = fun ?MODULE:reply_after_query/6, + ReplyFun = fun ?MODULE:reply_after_query/7, Ref = make_message_ref(), - Args = [self(), Id, Name, Ref, Query], - ok = inflight_append(Name, Ref, Query), + Args = [self(), Id, Index, Name, Ref, Query], + ok = inflight_append(Name, Ref, Query, Id, Index), Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt), {async_return, Result} end, Request ); -apply_query_fun(sync, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) -> +apply_query_fun(sync, Mod, Id, _Index, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) -> ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Requests = [Request || ?QUERY(_From, Request, _) <- Batch], ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch); -apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> +apply_query_fun(async, Mod, Id, Index, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Name = maps:get(inflight_name, QueryOpts, undefined), ?APPLY_RESOURCE( @@ -487,55 +539,46 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) true -> {async_return, inflight_full}; false -> - BatchLen = length(Batch), - ok = emqx_resource_metrics:inflight_change(Id, BatchLen), - ReplyFun = fun ?MODULE:batch_reply_after_query/6, + ReplyFun = fun ?MODULE:batch_reply_after_query/7, Ref = make_message_ref(), - Args = {ReplyFun, [self(), Id, Name, Ref, Batch]}, + Args = {ReplyFun, [self(), Id, Index, Name, Ref, Batch]}, Requests = [Request || ?QUERY(_From, Request, _) <- Batch], - ok = inflight_append(Name, Ref, Batch), + ok = inflight_append(Name, Ref, Batch, Id, Index), Result = Mod:on_batch_query_async(Id, Requests, Args, ResSt), {async_return, Result} end, Batch ). -reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request, HasSent), Result) -> - %% NOTE: 'inflight' is message count that sent async but no ACK received, - %% NOT the message number ququed in the inflight window. - emqx_resource_metrics:inflight_change(Id, -1), +reply_after_query(Pid, Id, Index, Name, Ref, ?QUERY(From, Request, HasSent), Result) -> + %% NOTE: 'inflight' is the count of messages that were sent async + %% but received no ACK, NOT the number of messages queued in the + %% inflight window. case reply_caller(Id, ?REPLY(From, Request, HasSent, Result)) of true -> - %% we marked these messages are 'queuing' although they are actually - %% keeped in inflight window, not replayq - emqx_resource_metrics:queuing_change(Id, 1), ?MODULE:block(Pid); false -> - drop_inflight_and_resume(Pid, Name, Ref) + drop_inflight_and_resume(Pid, Name, Ref, Id, Index) end. -batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) -> - %% NOTE: 'inflight' is message count that sent async but no ACK received, - %% NOT the message number ququed in the inflight window. - BatchLen = length(Batch), - emqx_resource_metrics:inflight_change(Id, -BatchLen), +batch_reply_after_query(Pid, Id, Index, Name, Ref, Batch, Result) -> + %% NOTE: 'inflight' is the count of messages that were sent async + %% but received no ACK, NOT the number of messages queued in the + %% inflight window. case batch_reply_caller(Id, Result, Batch) of true -> - %% we marked these messages are 'queuing' although they are actually - %% kept in inflight window, not replayq - emqx_resource_metrics:queuing_change(Id, BatchLen), ?MODULE:block(Pid); false -> - drop_inflight_and_resume(Pid, Name, Ref) + drop_inflight_and_resume(Pid, Name, Ref, Id, Index) end. -drop_inflight_and_resume(Pid, Name, Ref) -> +drop_inflight_and_resume(Pid, Name, Ref, Id, Index) -> case inflight_is_full(Name) of true -> - inflight_drop(Name, Ref), + inflight_drop(Name, Ref, Id, Index), ?MODULE:resume(Pid); false -> - inflight_drop(Name, Ref) + inflight_drop(Name, Ref, Id, Index) end. %%============================================================================== @@ -548,10 +591,10 @@ queue_item_marshaller(Bin) when is_binary(Bin) -> estimate_size(QItem) -> size(queue_item_marshaller(QItem)). -maybe_append_queue(Id, undefined, _Items) -> +maybe_append_queue(Id, _Index, undefined, _Items) -> emqx_resource_metrics:dropped_queue_not_enabled_inc(Id), undefined; -maybe_append_queue(Id, Q, Items) -> +maybe_append_queue(Id, Index, Q, Items) -> Q2 = case replayq:overflow(Q) of Overflow when Overflow =< 0 -> @@ -561,13 +604,13 @@ maybe_append_queue(Id, Q, Items) -> {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts), ok = replayq:ack(Q1, QAckRef), Dropped = length(Items2), - emqx_resource_metrics:queuing_change(Id, -Dropped), emqx_resource_metrics:dropped_queue_full_inc(Id), ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), Q1 end, - emqx_resource_metrics:queuing_change(Id, 1), - replayq:append(Q2, Items). + Q3 = replayq:append(Q2, Items), + emqx_resource_metrics:queuing_set(Id, Index, replayq:count(Q3)), + Q3. get_first_n_from_queue(Q, N) -> get_first_n_from_queue(Q, N, []). @@ -580,23 +623,23 @@ get_first_n_from_queue(Q, N, Acc) when N > 0 -> ?Q_ITEM(Query) -> get_first_n_from_queue(Q, N - 1, [Query | Acc]) end. -drop_first_n_from_queue(Q, 0, _Id) -> +drop_first_n_from_queue(Q, 0, _Id, _Index) -> Q; -drop_first_n_from_queue(Q, N, Id) when N > 0 -> - drop_first_n_from_queue(drop_head(Q, Id), N - 1, Id). +drop_first_n_from_queue(Q, N, Id, Index) when N > 0 -> + drop_first_n_from_queue(drop_head(Q, Id, Index), N - 1, Id, Index). -drop_head(Q, Id) -> - {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}), - ok = replayq:ack(Q1, AckRef), - emqx_resource_metrics:queuing_change(Id, -1), - Q1. +drop_head(Q, Id, Index) -> + {NewQ, AckRef, _} = replayq:pop(Q, #{count_limit => 1}), + ok = replayq:ack(NewQ, AckRef), + emqx_resource_metrics:queuing_set(Id, Index, replayq:count(NewQ)), + NewQ. %%============================================================================== %% the inflight queue for async query -define(SIZE_REF, -1). -inflight_new(Name, InfltWinSZ) -> +inflight_new(Name, InfltWinSZ, Id, Index) -> _ = ets:new(Name, [named_table, ordered_set, public, {write_concurrency, true}]), - inflight_append(Name, ?SIZE_REF, {max_size, InfltWinSZ}), + inflight_append(Name, ?SIZE_REF, {max_size, InfltWinSZ}, Id, Index), ok. inflight_get_first(Name) -> @@ -617,27 +660,39 @@ inflight_is_full(undefined) -> false; inflight_is_full(Name) -> [{_, {max_size, MaxSize}}] = ets:lookup(Name, ?SIZE_REF), + Size = inflight_size(Name), + Size >= MaxSize. + +inflight_size(Name) -> + %% Note: we subtract 1 because there's a metadata row that hold + %% the maximum size value. + MetadataRowCount = 1, case ets:info(Name, size) of - Size when Size > MaxSize -> true; - _ -> false + undefined -> 0; + Size -> max(0, Size - MetadataRowCount) end. -inflight_append(undefined, _Ref, _Query) -> +inflight_append(undefined, _Ref, _Query, _Id, _Index) -> ok; -inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch) -> +inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch, Id, Index) -> ets:insert(Name, {Ref, [?QUERY(From, Req, true) || ?QUERY(From, Req, _) <- Batch]}), + emqx_resource_metrics:inflight_set(Id, Index, inflight_size(Name)), ok; -inflight_append(Name, Ref, ?QUERY(From, Req, _)) -> +inflight_append(Name, Ref, ?QUERY(From, Req, _), Id, Index) -> ets:insert(Name, {Ref, ?QUERY(From, Req, true)}), + emqx_resource_metrics:inflight_set(Id, Index, inflight_size(Name)), ok; -inflight_append(Name, Ref, Data) -> +inflight_append(Name, Ref, Data, _Id, _Index) -> ets:insert(Name, {Ref, Data}), + %% this is a metadata row being inserted; therefore, we don't bump + %% the inflight metric. ok. -inflight_drop(undefined, _) -> +inflight_drop(undefined, _, _Id, _Index) -> ok; -inflight_drop(Name, Ref) -> +inflight_drop(Name, Ref, Id, Index) -> ets:delete(Name, Ref), + emqx_resource_metrics:inflight_set(Id, Index, inflight_size(Name)), ok. %%============================================================================== diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 107ca2a93..8a8b5aec2 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -39,6 +39,7 @@ groups() -> init_per_testcase(_, Config) -> emqx_connector_demo:set_callback_mode(always_sync), Config. + end_per_testcase(_, _Config) -> _ = emqx_resource:remove(?ID). @@ -503,7 +504,10 @@ t_stop_start(_) -> ), %% add some metrics to test their persistence - emqx_resource_metrics:batching_change(?ID, 5), + WorkerID0 = <<"worker:0">>, + WorkerID1 = <<"worker:1">>, + emqx_resource_metrics:batching_set(?ID, WorkerID0, 2), + emqx_resource_metrics:batching_set(?ID, WorkerID1, 3), ?assertEqual(5, emqx_resource_metrics:batching_get(?ID)), {ok, _} = emqx_resource:check_and_recreate( @@ -537,7 +541,8 @@ t_stop_start(_) -> ?assert(is_process_alive(Pid1)), %% now stop while resetting the metrics - emqx_resource_metrics:batching_change(?ID, 5), + emqx_resource_metrics:batching_set(?ID, WorkerID0, 1), + emqx_resource_metrics:batching_set(?ID, WorkerID1, 4), ?assertEqual(5, emqx_resource_metrics:batching_get(?ID)), ok = emqx_resource:stop(?ID), ?assertEqual(0, emqx_resource_metrics:batching_get(?ID)), diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 6145716f2..a4ee994b5 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -315,11 +315,11 @@ handle_telemetry_event( emqx_resource_metrics:dropped_queue_full_inc(ID, Val); handle_telemetry_event( [wolff, queuing], - #{counter_inc := Val}, - #{bridge_id := ID}, + #{counter_inc := _Val}, + #{bridge_id := ID, partition_id := PartitionID}, _HandlerConfig -) when is_integer(Val) -> - emqx_resource_metrics:queuing_change(ID, Val); +) when is_integer(_Val) -> + emqx_resource_metrics:queuing_set(ID, PartitionID, 0); handle_telemetry_event( [wolff, retried], #{counter_inc := Val}, @@ -336,11 +336,11 @@ handle_telemetry_event( emqx_resource_metrics:failed_inc(ID, Val); handle_telemetry_event( [wolff, inflight], - #{counter_inc := Val}, - #{bridge_id := ID}, + #{counter_inc := _Val}, + #{bridge_id := ID, partition_id := PartitionID}, _HandlerConfig -) when is_integer(Val) -> - emqx_resource_metrics:inflight_change(ID, Val); +) when is_integer(_Val) -> + emqx_resource_metrics:inflight_set(ID, PartitionID, 0); handle_telemetry_event( [wolff, retried_failed], #{counter_inc := Val}, diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index 0f4500a7d..4912dca30 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -124,6 +124,7 @@ init_per_testcase(TestCase, Config0) when delete_all_bridges(), Tid = install_telemetry_handler(TestCase), Config = generate_config(Config0), + put(telemetry_table, Tid), [{telemetry_table, Tid} | Config]; false -> {skip, no_batching} @@ -133,6 +134,7 @@ init_per_testcase(TestCase, Config0) -> delete_all_bridges(), Tid = install_telemetry_handler(TestCase), Config = generate_config(Config0), + put(telemetry_table, Tid), [{telemetry_table, Tid} | Config]. end_per_testcase(_TestCase, _Config) -> @@ -393,7 +395,11 @@ assert_metrics(ExpectedMetrics, ResourceId) -> maps:keys(ExpectedMetrics) ), CurrentMetrics = current_metrics(ResourceId), - ?assertEqual(ExpectedMetrics, Metrics, #{current_metrics => CurrentMetrics}), + TelemetryTable = get(telemetry_table), + RecordedEvents = ets:tab2list(TelemetryTable), + ?assertEqual(ExpectedMetrics, Metrics, #{ + current_metrics => CurrentMetrics, recorded_events => RecordedEvents + }), ok. assert_empty_metrics(ResourceId) -> @@ -554,6 +560,7 @@ t_publish_success(Config) -> ResourceId = ?config(resource_id, Config), ServiceAccountJSON = ?config(service_account_json, Config), TelemetryTable = ?config(telemetry_table, Config), + QueryMode = ?config(query_mode, Config), Topic = <<"t/topic">>, ?check_trace( create_bridge(Config), @@ -582,6 +589,17 @@ t_publish_success(Config) -> ), %% to avoid test flakiness wait_telemetry_event(TelemetryTable, success, ResourceId), + ExpectedInflightEvents = + case QueryMode of + sync -> 1; + async -> 3 + end, + wait_telemetry_event( + TelemetryTable, + inflight, + ResourceId, + #{n_events => ExpectedInflightEvents, timeout => 5_000} + ), assert_metrics( #{ batching => 0, @@ -601,6 +619,7 @@ t_publish_success_local_topic(Config) -> ResourceId = ?config(resource_id, Config), ServiceAccountJSON = ?config(service_account_json, Config), TelemetryTable = ?config(telemetry_table, Config), + QueryMode = ?config(query_mode, Config), LocalTopic = <<"local/topic">>, {ok, _} = create_bridge(Config, #{<<"local_topic">> => LocalTopic}), assert_empty_metrics(ResourceId), @@ -619,6 +638,17 @@ t_publish_success_local_topic(Config) -> ), %% to avoid test flakiness wait_telemetry_event(TelemetryTable, success, ResourceId), + ExpectedInflightEvents = + case QueryMode of + sync -> 1; + async -> 3 + end, + wait_telemetry_event( + TelemetryTable, + inflight, + ResourceId, + #{n_events => ExpectedInflightEvents, timeout => 5_000} + ), assert_metrics( #{ batching => 0, @@ -649,6 +679,7 @@ t_publish_templated(Config) -> ResourceId = ?config(resource_id, Config), ServiceAccountJSON = ?config(service_account_json, Config), TelemetryTable = ?config(telemetry_table, Config), + QueryMode = ?config(query_mode, Config), Topic = <<"t/topic">>, PayloadTemplate = << "{\"payload\": \"${payload}\"," @@ -694,6 +725,17 @@ t_publish_templated(Config) -> ), %% to avoid test flakiness wait_telemetry_event(TelemetryTable, success, ResourceId), + ExpectedInflightEvents = + case QueryMode of + sync -> 1; + async -> 3 + end, + wait_telemetry_event( + TelemetryTable, + inflight, + ResourceId, + #{n_events => ExpectedInflightEvents, timeout => 5_000} + ), assert_metrics( #{ batching => 0, @@ -1053,7 +1095,7 @@ do_econnrefused_or_timeout_test(Config, Error) -> %% response expired, this succeeds. {econnrefused, async, _} -> wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{ - timeout => 10_000, n_events => 2 + timeout => 10_000, n_events => 1 }), CurrentMetrics = current_metrics(ResourceId), RecordedEvents = ets:tab2list(TelemetryTable), @@ -1290,6 +1332,17 @@ t_unrecoverable_error(Config) -> end ), wait_telemetry_event(TelemetryTable, failed, ResourceId), + ExpectedInflightEvents = + case QueryMode of + sync -> 1; + async -> 3 + end, + wait_telemetry_event( + TelemetryTable, + inflight, + ResourceId, + #{n_events => ExpectedInflightEvents, timeout => 5_000} + ), assert_metrics( #{ batching => 0,