diff --git a/apps/emqx/src/emqx_metrics_worker.erl b/apps/emqx/src/emqx_metrics_worker.erl index ab6a0b1a6..47ecca408 100644 --- a/apps/emqx/src/emqx_metrics_worker.erl +++ b/apps/emqx/src/emqx_metrics_worker.erl @@ -18,6 +18,8 @@ -behaviour(gen_server). +-include_lib("stdlib/include/ms_transform.hrl"). + %% API functions -export([ start_link/1, @@ -30,6 +32,11 @@ inc/3, inc/4, get/3, + get_gauge/3, + set_gauge/5, + shift_gauge/5, + get_gauges/2, + delete_gauges/2, get_rate/2, get_counters/2, create_metrics/3, @@ -68,14 +75,21 @@ last5m := float() }. -type metrics() :: #{ - counters := #{atom() => integer()}, - rate := #{atom() => rate()} + counters := #{metric_name() => integer()}, + gauges := #{metric_name() => integer()}, + rate := #{metric_name() => rate()} }. -type handler_name() :: atom(). +%% metric_id() is actually a resource id -type metric_id() :: binary() | atom(). +-type metric_name() :: atom(). +-type worker_id() :: term(). -define(CntrRef(Name), {?MODULE, Name}). -define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)). +-define(GAUGE_TABLE(NAME), + list_to_atom(atom_to_list(?MODULE) ++ "_" ++ atom_to_list(NAME) ++ "_gauge") +). -record(rate, { max = 0 :: number(), @@ -112,11 +126,12 @@ child_spec(ChldName, Name) -> modules => [emqx_metrics_worker] }. --spec create_metrics(handler_name(), metric_id(), [atom()]) -> ok | {error, term()}. +-spec create_metrics(handler_name(), metric_id(), [metric_name()]) -> ok | {error, term()}. create_metrics(Name, Id, Metrics) -> create_metrics(Name, Id, Metrics, Metrics). --spec create_metrics(handler_name(), metric_id(), [atom()], [atom()]) -> ok | {error, term()}. +-spec create_metrics(handler_name(), metric_id(), [metric_name()], [metric_name()]) -> + ok | {error, term()}. create_metrics(Name, Id, Metrics, RateMetrics) -> gen_server:call(Name, {create_metrics, Id, Metrics, RateMetrics}). @@ -135,7 +150,7 @@ has_metrics(Name, Id) -> _ -> true end. --spec get(handler_name(), metric_id(), atom() | integer()) -> number(). 
+-spec get(handler_name(), metric_id(), metric_name() | integer()) -> number(). get(Name, Id, Metric) -> case get_ref(Name, Id) of not_found -> @@ -167,16 +182,102 @@ reset_counters(Name, Id) -> -spec get_metrics(handler_name(), metric_id()) -> metrics(). get_metrics(Name, Id) -> - #{rate => get_rate(Name, Id), counters => get_counters(Name, Id)}. + #{ + rate => get_rate(Name, Id), + counters => get_counters(Name, Id), + gauges => get_gauges(Name, Id) + }. -spec inc(handler_name(), metric_id(), atom()) -> ok. inc(Name, Id, Metric) -> inc(Name, Id, Metric, 1). --spec inc(handler_name(), metric_id(), atom(), integer()) -> ok. +-spec inc(handler_name(), metric_id(), metric_name(), integer()) -> ok. inc(Name, Id, Metric, Val) -> counters:add(get_ref(Name, Id), idx_metric(Name, Id, Metric), Val). +-spec set_gauge(handler_name(), metric_id(), worker_id(), metric_name(), integer()) -> ok. +set_gauge(Name, Id, WorkerId, Metric, Val) -> + Table = ?GAUGE_TABLE(Name), + try + true = ets:insert(Table, {{Id, Metric, WorkerId}, Val}), + ok + catch + error:badarg -> + ok + end. + +-spec shift_gauge(handler_name(), metric_id(), worker_id(), metric_name(), integer()) -> ok. +shift_gauge(Name, Id, WorkerId, Metric, Val) -> + Table = ?GAUGE_TABLE(Name), + try + _ = ets:update_counter( + Table, + {Id, Metric, WorkerId}, + Val, + {{Id, Metric, WorkerId}, 0} + ), + ok + catch + error:badarg -> + ok + end. + +-spec get_gauge(handler_name(), metric_id(), metric_name()) -> integer(). +get_gauge(Name, Id, Metric) -> + Table = ?GAUGE_TABLE(Name), + MatchSpec = + ets:fun2ms( + fun({{Id0, Metric0, _WorkerId}, Val}) when Id0 =:= Id, Metric0 =:= Metric -> + Val + end + ), + try + lists:sum(ets:select(Table, MatchSpec)) + catch + error:badarg -> + 0 + end. + +-spec get_gauges(handler_name(), metric_id()) -> map(). 
+get_gauges(Name, Id) -> + Table = ?GAUGE_TABLE(Name), + MatchSpec = + ets:fun2ms( + fun({{Id0, Metric, _WorkerId}, Val}) when Id0 =:= Id -> + {Metric, Val} + end + ), + try + lists:foldr( + fun({Metric, Val}, Acc) -> + maps:update_with(Metric, fun(X) -> X + Val end, Val, Acc) + end, + #{}, + ets:select(Table, MatchSpec) + ) + catch + error:badarg -> + #{} + end. + +-spec delete_gauges(handler_name(), metric_id()) -> ok. +delete_gauges(Name, Id) -> + Table = ?GAUGE_TABLE(Name), + MatchSpec = + ets:fun2ms( + fun({{Id0, _Metric, _WorkerId}, _Val}) when Id0 =:= Id -> + true + end + ), + try + _ = ets:select_delete(Table, MatchSpec), + ok + catch + error:badarg -> + ok + end. + start_link(Name) -> gen_server:start_link({local, Name}, ?MODULE, Name, []). @@ -185,6 +286,7 @@ init(Name) -> %% the rate metrics erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), persistent_term:put(?CntrRef(Name), #{}), + _ = ets:new(?GAUGE_TABLE(Name), [named_table, ordered_set, public, {write_concurrency, true}]), {ok, #state{}}. 
handle_call({get_rate, _Id}, _From, State = #state{rates = undefined}) -> @@ -220,7 +322,10 @@ handle_call( _From, State = #state{metric_ids = MIDs, rates = Rates} ) -> - {reply, delete_counters(get_self_name(), Id), State#state{ + Name = get_self_name(), + delete_counters(Name, Id), + delete_gauges(Name, Id), + {reply, ok, State#state{ metric_ids = sets:del_element(Id, MIDs), rates = case Rates of @@ -233,7 +338,9 @@ handle_call( _From, State = #state{rates = Rates} ) -> - {reply, reset_counters(get_self_name(), Id), State#state{ + Name = get_self_name(), + delete_gauges(Name, Id), + {reply, reset_counters(Name, Id), State#state{ rates = case Rates of undefined -> diff --git a/apps/emqx/test/emqx_metrics_worker_SUITE.erl b/apps/emqx/test/emqx_metrics_worker_SUITE.erl index 326b0be1e..3f48010c6 100644 --- a/apps/emqx/test/emqx_metrics_worker_SUITE.erl +++ b/apps/emqx/test/emqx_metrics_worker_SUITE.erl @@ -47,7 +47,8 @@ end_per_testcase(_, _Config) -> t_get_metrics(_) -> Metrics = [a, b, c], - ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, Metrics), + Id = <<"testid">>, + ok = emqx_metrics_worker:create_metrics(?NAME, Id, Metrics), %% all the metrics are set to zero at start ?assertMatch( #{ @@ -56,18 +57,22 @@ t_get_metrics(_) -> b := #{current := 0.0, max := 0.0, last5m := 0.0}, c := #{current := 0.0, max := 0.0, last5m := 0.0} }, + gauges := #{}, counters := #{ a := 0, b := 0, c := 0 } }, - emqx_metrics_worker:get_metrics(?NAME, <<"testid">>) + emqx_metrics_worker:get_metrics(?NAME, Id) ), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), + ok = emqx_metrics_worker:inc(?NAME, Id, a), + ok = emqx_metrics_worker:inc(?NAME, Id, b), + ok = emqx_metrics_worker:inc(?NAME, Id, c), + ok = emqx_metrics_worker:inc(?NAME, Id, c), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, 
inflight, 5), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9), ct:sleep(1500), ?LET( #{ @@ -76,27 +81,73 @@ t_get_metrics(_) -> b := #{current := CurrB, max := MaxB, last5m := _}, c := #{current := CurrC, max := MaxC, last5m := _} }, + gauges := #{ + inflight := Inflight, + queuing := Queuing + }, counters := #{ a := 1, b := 1, c := 2 } }, - emqx_metrics_worker:get_metrics(?NAME, <<"testid">>), + emqx_metrics_worker:get_metrics(?NAME, Id), { ?assert(CurrA > 0), ?assert(CurrB > 0), ?assert(CurrC > 0), ?assert(MaxA > 0), ?assert(MaxB > 0), - ?assert(MaxC > 0) + ?assert(MaxC > 0), + ?assert(Inflight == 12), + ?assert(Queuing == 9) } ), - ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>). + ok = emqx_metrics_worker:clear_metrics(?NAME, Id). + +t_clear_metrics(_Config) -> + Metrics = [a, b, c], + Id = <<"testid">>, + ok = emqx_metrics_worker:create_metrics(?NAME, Id, Metrics), + ?assertMatch( + #{ + rate := #{ + a := #{current := 0.0, max := 0.0, last5m := 0.0}, + b := #{current := 0.0, max := 0.0, last5m := 0.0}, + c := #{current := 0.0, max := 0.0, last5m := 0.0} + }, + gauges := #{}, + counters := #{ + a := 0, + b := 0, + c := 0 + } + }, + emqx_metrics_worker:get_metrics(?NAME, Id) + ), + ok = emqx_metrics_worker:inc(?NAME, Id, a), + ok = emqx_metrics_worker:inc(?NAME, Id, b), + ok = emqx_metrics_worker:inc(?NAME, Id, c), + ok = emqx_metrics_worker:inc(?NAME, Id, c), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9), + ct:sleep(1500), + ok = emqx_metrics_worker:clear_metrics(?NAME, Id), + ?assertEqual( + #{ + counters => #{}, + gauges => #{}, + rate => #{current => 0.0, last5m => 0.0, max => 0.0} + }, + emqx_metrics_worker:get_metrics(?NAME, Id) + ), + ok. 
t_reset_metrics(_) -> Metrics = [a, b, c], - ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, Metrics), + Id = <<"testid">>, + ok = emqx_metrics_worker:create_metrics(?NAME, Id, Metrics), %% all the metrics are set to zero at start ?assertMatch( #{ @@ -105,20 +156,24 @@ t_reset_metrics(_) -> b := #{current := 0.0, max := 0.0, last5m := 0.0}, c := #{current := 0.0, max := 0.0, last5m := 0.0} }, + gauges := #{}, counters := #{ a := 0, b := 0, c := 0 } }, - emqx_metrics_worker:get_metrics(?NAME, <<"testid">>) + emqx_metrics_worker:get_metrics(?NAME, Id) ), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), + ok = emqx_metrics_worker:inc(?NAME, Id, a), + ok = emqx_metrics_worker:inc(?NAME, Id, b), + ok = emqx_metrics_worker:inc(?NAME, Id, c), + ok = emqx_metrics_worker:inc(?NAME, Id, c), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9), ct:sleep(1500), - ok = emqx_metrics_worker:reset_metrics(?NAME, <<"testid">>), + ok = emqx_metrics_worker:reset_metrics(?NAME, Id), ?LET( #{ rate := #{ @@ -126,68 +181,83 @@ t_reset_metrics(_) -> b := #{current := CurrB, max := MaxB, last5m := _}, c := #{current := CurrC, max := MaxC, last5m := _} }, + gauges := Gauges, counters := #{ a := 0, b := 0, c := 0 } }, - emqx_metrics_worker:get_metrics(?NAME, <<"testid">>), + emqx_metrics_worker:get_metrics(?NAME, Id), { ?assert(CurrA == 0), ?assert(CurrB == 0), ?assert(CurrC == 0), ?assert(MaxA == 0), ?assert(MaxB == 0), - ?assert(MaxC == 0) + ?assert(MaxC == 0), + ?assertEqual(#{}, Gauges) } ), - ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>). + ok = emqx_metrics_worker:clear_metrics(?NAME, Id). 
t_get_metrics_2(_) -> Metrics = [a, b, c], + Id = <<"testid">>, ok = emqx_metrics_worker:create_metrics( ?NAME, - <<"testid">>, + Id, Metrics, [a] ), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), + ok = emqx_metrics_worker:inc(?NAME, Id, a), + ok = emqx_metrics_worker:inc(?NAME, Id, b), + ok = emqx_metrics_worker:inc(?NAME, Id, c), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9), ?assertMatch( #{ rate := Rate = #{ a := #{current := _, max := _, last5m := _} }, + gauges := #{}, counters := #{ a := 1, b := 1, c := 1 } } when map_size(Rate) =:= 1, - emqx_metrics_worker:get_metrics(?NAME, <<"testid">>) + emqx_metrics_worker:get_metrics(?NAME, Id) ), - ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>). + ok = emqx_metrics_worker:clear_metrics(?NAME, Id). 
t_recreate_metrics(_) -> - ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, [a]), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a), + Id = <<"testid">>, + ok = emqx_metrics_worker:create_metrics(?NAME, Id, [a]), + ok = emqx_metrics_worker:inc(?NAME, Id, a), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9), ?assertMatch( #{ rate := R = #{ a := #{current := _, max := _, last5m := _} }, + gauges := #{ + inflight := 12, + queuing := 9 + }, counters := C = #{ a := 1 } } when map_size(R) == 1 andalso map_size(C) == 1, - emqx_metrics_worker:get_metrics(?NAME, <<"testid">>) + emqx_metrics_worker:get_metrics(?NAME, Id) ), %% we create the metrics again, to add some counters - ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, [a, b, c]), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), + ok = emqx_metrics_worker:create_metrics(?NAME, Id, [a, b, c]), + ok = emqx_metrics_worker:inc(?NAME, Id, b), + ok = emqx_metrics_worker:inc(?NAME, Id, c), ?assertMatch( #{ rate := R = #{ @@ -195,13 +265,17 @@ t_recreate_metrics(_) -> b := #{current := _, max := _, last5m := _}, c := #{current := _, max := _, last5m := _} }, + gauges := #{ + inflight := 12, + queuing := 9 + }, counters := C = #{ a := 1, b := 1, c := 1 } } when map_size(R) == 3 andalso map_size(C) == 3, - emqx_metrics_worker:get_metrics(?NAME, <<"testid">>) + emqx_metrics_worker:get_metrics(?NAME, Id) ), - ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>). + ok = emqx_metrics_worker:clear_metrics(?NAME, Id). t_inc_matched(_) -> Metrics = ['rules.matched'], @@ -238,3 +312,102 @@ t_rate(_) -> ct:sleep(3000), ok = emqx_metrics_worker:clear_metrics(?NAME, <<"rule1">>), ok = emqx_metrics_worker:clear_metrics(?NAME, <<"rule:2">>). 
+ +t_get_gauge(_Config) -> + Metric = 'queueing', + %% unknown handler name (inexistent table) + ?assertEqual(0, emqx_metrics_worker:get_gauge(unknown_name, unknown_id, Metric)), + %% unknown resource id + ?assertEqual(0, emqx_metrics_worker:get_gauge(?NAME, unknown_id, Metric)), + + Id = <<"some id">>, + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 2), + + ?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)), + ?assertEqual(0, emqx_metrics_worker:get_gauge(?NAME, unknown, Metric)), + + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, Metric, 3), + ?assertEqual(5, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)), + + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 1), + ?assertEqual(4, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)), + + ?assertEqual(0, emqx_metrics_worker:get_gauge(?NAME, Id, another_metric)), + + ok. + +t_get_gauges(_Config) -> + %% unknown handler name (inexistent table) + ?assertEqual(#{}, emqx_metrics_worker:get_gauges(unknown_name, unknown_id)), + %% unknown resource id + ?assertEqual(#{}, emqx_metrics_worker:get_gauges(?NAME, unknown_id)), + + Metric = 'queuing', + Id = <<"some id">>, + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 2), + + ?assertEqual(#{queuing => 2}, emqx_metrics_worker:get_gauges(?NAME, Id)), + ?assertEqual(#{}, emqx_metrics_worker:get_gauges(?NAME, unknown)), + + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, Metric, 3), + ?assertEqual(#{queuing => 5}, emqx_metrics_worker:get_gauges(?NAME, Id)), + + AnotherMetric = 'inflight', + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 1), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, AnotherMetric, 10), + ?assertEqual(#{queuing => 4, inflight => 10}, emqx_metrics_worker:get_gauges(?NAME, Id)), + + ok. 
+ +t_delete_gauge(_Config) -> + %% unknown handler name (inexistent table) + ?assertEqual(ok, emqx_metrics_worker:delete_gauges(unknown_name, unknown_id)), + %% unknown resource id + ?assertEqual(ok, emqx_metrics_worker:delete_gauges(?NAME, unknown_id)), + + Metric = 'queuing', + AnotherMetric = 'inflight', + Id = <<"some id">>, + AnotherId = <<"another id">>, + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 2), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, Metric, 3), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, AnotherMetric, 10), + ok = emqx_metrics_worker:set_gauge(?NAME, AnotherId, worker_id1, AnotherMetric, 10), + ?assertEqual(#{queuing => 5, inflight => 10}, emqx_metrics_worker:get_gauges(?NAME, Id)), + + ?assertEqual(ok, emqx_metrics_worker:delete_gauges(?NAME, Id)), + + ?assertEqual(#{}, emqx_metrics_worker:get_gauges(?NAME, Id)), + ?assertEqual(#{inflight => 10}, emqx_metrics_worker:get_gauges(?NAME, AnotherId)), + + ok. + +t_shift_gauge(_Config) -> + Metric = 'queueing', + Id = <<"some id">>, + AnotherId = <<"another id">>, + + %% unknown handler name (inexistent table) + ?assertEqual( + ok, emqx_metrics_worker:shift_gauge(unknown_name, unknown_id, worker_id0, Metric, 2) + ), + ?assertEqual(0, emqx_metrics_worker:get_gauge(unknown_name, unknown_id, Metric)), + + %% empty resource id + ?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, Id, worker_id0, Metric, 2)), + ?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, AnotherId, worker_id0, Metric, 2)), + ?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)), + ?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, AnotherId, Metric)), + + ?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, Id, worker_id0, Metric, 3)), + ?assertEqual(5, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)), + + ?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, Id, worker_id1, Metric, 10)), + ?assertEqual(15, emqx_metrics_worker:get_gauge(?NAME, Id, 
Metric)), + + ?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, Id, worker_id1, Metric, -4)), + ?assertEqual(11, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)), + + ?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, AnotherId, Metric)), + + ok. diff --git a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf index c465ef242..b62a2ee68 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf @@ -195,7 +195,7 @@ emqx_bridge_schema { metric_sent_inflight { desc { - en: """Count of messages that were sent asynchronously but ACKs are not received.""" + en: """Count of messages that were sent asynchronously but ACKs are not yet received.""" zh: """已异步地发送但没有收到 ACK 的消息个数。""" } label: { diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 6b5e307d8..fc58bafa0 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -686,7 +686,6 @@ format_resp( format_metrics(#{ counters := #{ - 'batching' := Batched, 'dropped' := Dropped, 'dropped.other' := DroppedOther, 'dropped.queue_full' := DroppedQueueFull, @@ -694,17 +693,19 @@ format_metrics(#{ 'dropped.resource_not_found' := DroppedResourceNotFound, 'dropped.resource_stopped' := DroppedResourceStopped, 'matched' := Matched, - 'queuing' := Queued, 'retried' := Retried, 'failed' := SentFailed, - 'inflight' := SentInflight, 'success' := SentSucc, 'received' := Rcvd }, + gauges := Gauges, rate := #{ matched := #{current := Rate, last5m := Rate5m, max := RateMax} } }) -> + Batched = maps:get('batching', Gauges, 0), + Queued = maps:get('queuing', Gauges, 0), + SentInflight = maps:get('inflight', Gauges, 0), ?METRICS( Batched, Dropped, diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 1bf156ed4..342384593 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ 
b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -19,6 +19,7 @@ -compile(export_all). -import(emqx_dashboard_api_test_helpers, [request/4, uri/1]). +-import(emqx_common_test_helpers, [on_exit/1]). -include("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -124,6 +125,7 @@ init_per_testcase(_, Config) -> Config. end_per_testcase(_, _Config) -> clear_resources(), + emqx_common_test_helpers:call_janitor(), ok. clear_resources() -> @@ -672,6 +674,12 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> <<"name">> := ?BRIDGE_NAME_EGRESS } = jsx:decode(Bridge), BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), + on_exit(fun() -> + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + ok + end), %% we now test if the bridge works as expected LocalTopic = <>, RemoteTopic = <>, @@ -733,15 +741,20 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> %% verify the metrics of the bridge, the message should be queued {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []), + Decoded1 = jsx:decode(BridgeStr1), + ?assertMatch( + Status when (Status == <<"connected">> orelse Status == <<"connecting">>), + maps:get(<<"status">>, Decoded1) + ), %% matched >= 3 because of possible retries. 
?assertMatch( #{ - <<"status">> := Status, - <<"metrics">> := #{ - <<"matched">> := Matched, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2 - } - } when Matched >= 3 andalso (Status == <<"connected">> orelse Status == <<"connecting">>), - jsx:decode(BridgeStr1) + <<"matched">> := Matched, + <<"success">> := 1, + <<"failed">> := 0, + <<"queuing">> := 2 + } when Matched >= 3, + maps:get(<<"metrics">>, Decoded1) ), %% start the listener 1883 to make the bridge reconnected @@ -766,9 +779,6 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> %% also verify the 2 messages have been sent to the remote broker assert_mqtt_msg_received(RemoteTopic, Payload1), assert_mqtt_msg_received(RemoteTopic, Payload2), - %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), ok. assert_mqtt_msg_received(Topic, Payload) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 10c501865..b5f14780d 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -141,9 +141,6 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> 'dropped.resource_not_found', 'dropped.resource_stopped', 'dropped.other', - 'queuing', - 'batching', - 'inflight', 'received' ], [matched] diff --git a/apps/emqx_resource/src/emqx_resource_metrics.erl b/apps/emqx_resource/src/emqx_resource_metrics.erl index e6637b68f..921373c47 100644 --- a/apps/emqx_resource/src/emqx_resource_metrics.erl +++ b/apps/emqx_resource/src/emqx_resource_metrics.erl @@ -24,11 +24,12 @@ ]). 
-export([ - batching_change/2, + batching_set/3, + batching_shift/3, batching_get/1, - inflight_change/2, + inflight_set/3, inflight_get/1, - queuing_change/2, + queuing_set/3, queuing_get/1, dropped_inc/1, dropped_inc/2, @@ -114,8 +115,6 @@ handle_telemetry_event( _HandlerConfig ) -> case Event of - batching -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'batching', Val); dropped_other -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other', Val); @@ -133,12 +132,8 @@ handle_telemetry_event( emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped', Val); failed -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val); - inflight -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'inflight', Val); matched -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched', Val); - queuing -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'queuing', Val); retried_failed -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val), emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val), @@ -152,6 +147,34 @@ handle_telemetry_event( _ -> ok end; +handle_telemetry_event( + [?TELEMETRY_PREFIX, Event], + _Measurements = #{gauge_set := Val}, + _Metadata = #{resource_id := ID, worker_id := WorkerID}, + _HandlerConfig +) -> + case Event of + batching -> + emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'batching', Val); + inflight -> + emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'inflight', Val); + queuing -> + emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'queuing', Val); + _ -> + ok + end; +handle_telemetry_event( + [?TELEMETRY_PREFIX, Event], + _Measurements = #{gauge_shift := Val}, + _Metadata = #{resource_id := ID, worker_id := WorkerID}, + _HandlerConfig +) -> + case Event of + batching -> + emqx_metrics_worker:shift_gauge(?RES_METRICS, ID, WorkerID, 'batching', Val); + _ -> + ok + end; handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) -> ok. 
@@ -160,26 +183,48 @@ handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) -> %% @doc Count of messages that are currently accumulated in memory waiting for %% being sent in one batch -batching_change(ID, Val) -> - telemetry:execute([?TELEMETRY_PREFIX, batching], #{counter_inc => Val}, #{resource_id => ID}). +batching_set(ID, WorkerID, Val) -> + telemetry:execute( + [?TELEMETRY_PREFIX, batching], + #{gauge_set => Val}, + #{resource_id => ID, worker_id => WorkerID} + ). + +batching_shift(_ID, _WorkerID = undefined, _Val) -> + ok; +batching_shift(ID, WorkerID, Val) -> + telemetry:execute( + [?TELEMETRY_PREFIX, batching], + #{gauge_shift => Val}, + #{resource_id => ID, worker_id => WorkerID} + ). batching_get(ID) -> - emqx_metrics_worker:get(?RES_METRICS, ID, 'batching'). + emqx_metrics_worker:get_gauge(?RES_METRICS, ID, 'batching'). -%% @doc Count of messages that are currently queuing. [Gauge] -queuing_change(ID, Val) -> - telemetry:execute([?TELEMETRY_PREFIX, queuing], #{counter_inc => Val}, #{resource_id => ID}). +%% @doc Count of batches of messages that are currently +%% queuing. [Gauge] +queuing_set(ID, WorkerID, Val) -> + telemetry:execute( + [?TELEMETRY_PREFIX, queuing], + #{gauge_set => Val}, + #{resource_id => ID, worker_id => WorkerID} + ). queuing_get(ID) -> - emqx_metrics_worker:get(?RES_METRICS, ID, 'queuing'). + emqx_metrics_worker:get_gauge(?RES_METRICS, ID, 'queuing'). -%% @doc Count of messages that were sent asynchronously but ACKs are not -%% received. [Gauge] -inflight_change(ID, Val) -> - telemetry:execute([?TELEMETRY_PREFIX, inflight], #{counter_inc => Val}, #{resource_id => ID}). +%% @doc Count of batches of messages that were sent asynchronously but +%% ACKs are not yet received. [Gauge] +inflight_set(ID, WorkerID, Val) -> + telemetry:execute( + [?TELEMETRY_PREFIX, inflight], + #{gauge_set => Val}, + #{resource_id => ID, worker_id => WorkerID} + ). 
inflight_get(ID) -> - emqx_metrics_worker:get(?RES_METRICS, ID, 'inflight'). + emqx_metrics_worker:get_gauge(?RES_METRICS, ID, 'inflight'). %% Counters (value can only got up): %% -------------------------------------- diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 482c82f6a..44cfec1b1 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -52,7 +52,7 @@ -export([queue_item_marshaller/1, estimate_size/1]). --export([reply_after_query/6, batch_reply_after_query/6]). +-export([reply_after_query/7, batch_reply_after_query/7]). -define(Q_ITEM(REQUEST), {q_item, REQUEST}). @@ -90,13 +90,27 @@ async_query(Id, Request, Opts) -> %% simple query the resource without batching and queuing messages. -spec simple_sync_query(id(), request()) -> Result :: term(). simple_sync_query(Id, Request) -> - Result = call_query(sync, Id, ?QUERY(self(), Request, false), #{}), + %% Note: since calling this function implies bypassing the + %% buffer workers, and each buffer worker index is used when + %% collecting gauge metrics, we use this dummy index. If this + %% call ends up calling buffering functions, that's a bug and + %% would mess up the metrics anyway. `undefined' is ignored by + %% `emqx_resource_metrics:*_shift/3'. + Index = undefined, + Result = call_query(sync, Id, Index, ?QUERY(self(), Request, false), #{}), _ = handle_query_result(Id, Result, false, false), Result. -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term(). simple_async_query(Id, Request, ReplyFun) -> - Result = call_query(async, Id, ?QUERY(ReplyFun, Request, false), #{}), + %% Note: since calling this function implies bypassing the + %% buffer workers, and each buffer worker index is used when + %% collecting gauge metrics, we use this dummy index.
If this + %% call ends up calling buffering functions, that's a bug and + %% would mess up the metrics anyway. `undefined' is ignored by + %% `emqx_resource_metrics:*_shift/3'. + Index = undefined, + Result = call_query(async, Id, Index, ?QUERY(ReplyFun, Request, false), #{}), _ = handle_query_result(Id, Result, false, false), Result. @@ -133,9 +147,11 @@ init({Id, Index, Opts}) -> false -> undefined end, - emqx_resource_metrics:queuing_change(Id, queue_count(Queue)), + emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)), + emqx_resource_metrics:batching_set(Id, Index, 0), + emqx_resource_metrics:inflight_set(Id, Index, 0), InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT), - ok = inflight_new(Name, InfltWinSZ), + ok = inflight_new(Name, InfltWinSZ, Id, Index), HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), St = #{ id => Id, @@ -158,10 +174,12 @@ running(cast, resume, _St) -> keep_state_and_data; running(cast, block, St) -> {next_state, blocked, St}; -running(cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, queue := Q} = St) when +running( + cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, index := Index, queue := Q} = St +) when is_list(Batch) -> - Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]), + Q1 = maybe_append_queue(Id, Index, Q, [?Q_ITEM(Query) || Query <- Batch]), {next_state, blocked, St#{queue := Q1}}; running({call, From}, {query, Request, _Opts}, St) -> query_or_acc(From, Request, St); @@ -180,28 +198,39 @@ blocked(enter, _, #{resume_interval := ResumeT} = _St) -> {keep_state_and_data, {state_timeout, ResumeT, resume}}; blocked(cast, block, _St) -> keep_state_and_data; -blocked(cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, queue := Q} = St) when +blocked( + cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, index := Index, queue := Q} = St +) when is_list(Batch) -> - Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]), + Q1 = 
maybe_append_queue(Id, Index, Q, [?Q_ITEM(Query) || Query <- Batch]), {keep_state, St#{queue := Q1}}; blocked(cast, resume, St) -> do_resume(St); blocked(state_timeout, resume, St) -> do_resume(St); -blocked({call, From}, {query, Request, _Opts}, #{id := Id, queue := Q} = St) -> +blocked({call, From}, {query, Request, _Opts}, #{id := Id, index := Index, queue := Q} = St) -> Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), _ = reply_caller(Id, ?REPLY(From, Request, false, Error)), - {keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(From, Request, false))])}}; -blocked(cast, {query, Request, Opts}, #{id := Id, queue := Q} = St) -> + {keep_state, St#{ + queue := maybe_append_queue(Id, Index, Q, [?Q_ITEM(?QUERY(From, Request, false))]) + }}; +blocked(cast, {query, Request, Opts}, #{id := Id, index := Index, queue := Q} = St) -> ReplyFun = maps:get(async_reply_fun, Opts, undefined), Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), _ = reply_caller(Id, ?REPLY(ReplyFun, Request, false, Error)), {keep_state, St#{ - queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(ReplyFun, Request, false))]) + queue := maybe_append_queue(Id, Index, Q, [?Q_ITEM(?QUERY(ReplyFun, Request, false))]) }}. -terminate(_Reason, #{id := Id, index := Index}) -> +terminate(_Reason, #{id := Id, index := Index, queue := Q}) -> + GaugeFns = + [ + fun emqx_resource_metrics:batching_set/3, + fun emqx_resource_metrics:inflight_set/3 + ], + lists:foreach(fun(Fn) -> Fn(Id, Index, 0) end, GaugeFns), + emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)), gproc_pool:disconnect_worker(Id, {Id, Index}). 
code_change(_OldVsn, State, _Extra) -> @@ -240,24 +269,33 @@ do_resume(#{id := Id, name := Name} = St) -> retry_queue(#{queue := undefined} = St) -> {next_state, running, St}; -retry_queue(#{queue := Q, id := Id, enable_batch := false, resume_interval := ResumeT} = St) -> +retry_queue( + #{ + queue := Q, + id := Id, + index := Index, + enable_batch := false, + resume_interval := ResumeT + } = St +) -> case get_first_n_from_queue(Q, 1) of [] -> {next_state, running, St}; [?QUERY(_, Request, HasSent) = Query] -> QueryOpts = #{inflight_name => maps:get(name, St)}, - Result = call_query(configured, Id, Query, QueryOpts), + Result = call_query(configured, Id, Index, Query, QueryOpts), case reply_caller(Id, ?REPLY(undefined, Request, HasSent, Result)) of true -> {keep_state, St, {state_timeout, ResumeT, resume}}; false -> - retry_queue(St#{queue := drop_head(Q, Id)}) + retry_queue(St#{queue := drop_head(Q, Id, Index)}) end end; retry_queue( #{ queue := Q, id := Id, + index := Index, enable_batch := true, batch_size := BatchSize, resume_interval := ResumeT @@ -268,7 +306,7 @@ retry_queue( {next_state, running, St}; Batch0 -> QueryOpts = #{inflight_name => maps:get(name, St)}, - Result = call_query(configured, Id, Batch0, QueryOpts), + Result = call_query(configured, Id, Index, Batch0, QueryOpts), %% The caller has been replied with ?RESOURCE_ERROR(blocked, _) before saving into the queue, %% we now change the 'from' field to 'undefined' so it will not reply the caller again. Batch = [?QUERY(undefined, Request, HasSent) || ?QUERY(_, Request, HasSent) <- Batch0], @@ -276,41 +314,55 @@ retry_queue( true -> {keep_state, St, {state_timeout, ResumeT, resume}}; false -> - retry_queue(St#{queue := drop_first_n_from_queue(Q, length(Batch), Id)}) + retry_queue(St#{queue := drop_first_n_from_queue(Q, length(Batch), Id, Index)}) end end. 
retry_inflight_sync( - Id, Ref, ?QUERY(_, _, HasSent) = Query, Name, #{resume_interval := ResumeT} = St0 + Id, + Ref, + ?QUERY(_, _, HasSent) = Query, + Name, + #{index := Index, resume_interval := ResumeT} = St0 ) -> - Result = call_query(sync, Id, Query, #{}), + Result = call_query(sync, Id, Index, Query, #{}), case handle_query_result(Id, Result, HasSent, false) of %% Send failed because resource down true -> {keep_state, St0, {state_timeout, ResumeT, resume}}; %% Send ok or failed but the resource is working false -> - inflight_drop(Name, Ref), + inflight_drop(Name, Ref, Id, Index), do_resume(St0) end. -query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) -> +query_or_acc( + From, + Request, + #{ + enable_batch := true, + acc := Acc, + acc_left := Left, + index := Index, + id := Id + } = St0 +) -> Acc1 = [?QUERY(From, Request, false) | Acc], - emqx_resource_metrics:batching_change(Id, 1), + emqx_resource_metrics:batching_shift(Id, Index, 1), St = St0#{acc := Acc1, acc_left := Left - 1}, case Left =< 1 of true -> flush(St); false -> {keep_state, ensure_flush_timer(St)} end; -query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id} = St) -> +query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id, index := Index} = St) -> QueryOpts = #{ inflight_name => maps:get(name, St) }, - Result = call_query(configured, Id, ?QUERY(From, Request, false), QueryOpts), + Result = call_query(configured, Id, Index, ?QUERY(From, Request, false), QueryOpts), case reply_caller(Id, ?REPLY(From, Request, false, Result)) of true -> Query = ?QUERY(From, Request, false), - {next_state, blocked, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(Query)])}}; + {next_state, blocked, St#{queue := maybe_append_queue(Id, Index, Q, [?Q_ITEM(Query)])}}; false -> {keep_state, St} end. 
@@ -320,6 +372,7 @@ flush(#{acc := []} = St) -> flush( #{ id := Id, + index := Index, acc := Batch0, batch_size := Size, queue := Q0 @@ -329,12 +382,12 @@ flush( QueryOpts = #{ inflight_name => maps:get(name, St) }, - emqx_resource_metrics:batching_change(Id, -length(Batch)), - Result = call_query(configured, Id, Batch, QueryOpts), + emqx_resource_metrics:batching_shift(Id, Index, -length(Batch)), + Result = call_query(configured, Id, Index, Batch, QueryOpts), St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}), case batch_reply_caller(Id, Result, Batch) of true -> - Q1 = maybe_append_queue(Id, Q0, [?Q_ITEM(Query) || Query <- Batch]), + Q1 = maybe_append_queue(Id, Index, Q0, [?Q_ITEM(Query) || Query <- Batch]), {next_state, blocked, St1#{queue := Q1}}; false -> {keep_state, St1} @@ -412,7 +465,7 @@ handle_query_result(Id, Result, HasSent, BlockWorker) -> inc_sent_success(Id, HasSent), BlockWorker. -call_query(QM0, Id, Query, QueryOpts) -> +call_query(QM0, Id, Index, Query, QueryOpts) -> ?tp(call_query_enter, #{id => Id, query => Query}), case emqx_resource_manager:ets_lookup(Id) of {ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} -> @@ -423,7 +476,7 @@ call_query(QM0, Id, Query, QueryOpts) -> end, CM = maps:get(callback_mode, Data), emqx_resource_metrics:matched_inc(Id), - apply_query_fun(call_mode(QM, CM), Mod, Id, Query, ResSt, QueryOpts); + apply_query_fun(call_mode(QM, CM), Mod, Id, Index, Query, ResSt, QueryOpts); {ok, _Group, #{status := stopped}} -> emqx_resource_metrics:matched_inc(Id), ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); @@ -452,10 +505,10 @@ call_query(QM0, Id, Query, QueryOpts) -> end ). 
-apply_query_fun(sync, Mod, Id, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) -> +apply_query_fun(sync, Mod, Id, _Index, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) -> ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}), ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request); -apply_query_fun(async, Mod, Id, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> +apply_query_fun(async, Mod, Id, Index, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), Name = maps:get(inflight_name, QueryOpts, undefined), ?APPLY_RESOURCE( @@ -464,21 +517,20 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) true -> {async_return, inflight_full}; false -> - ok = emqx_resource_metrics:inflight_change(Id, 1), - ReplyFun = fun ?MODULE:reply_after_query/6, + ReplyFun = fun ?MODULE:reply_after_query/7, Ref = make_message_ref(), - Args = [self(), Id, Name, Ref, Query], - ok = inflight_append(Name, Ref, Query), + Args = [self(), Id, Index, Name, Ref, Query], + ok = inflight_append(Name, Ref, Query, Id, Index), Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt), {async_return, Result} end, Request ); -apply_query_fun(sync, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) -> +apply_query_fun(sync, Mod, Id, _Index, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) -> ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Requests = [Request || ?QUERY(_From, Request, _) <- Batch], ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch); -apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> +apply_query_fun(async, Mod, Id, Index, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Name = maps:get(inflight_name, QueryOpts, undefined), 
?APPLY_RESOURCE( @@ -487,55 +539,46 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) true -> {async_return, inflight_full}; false -> - BatchLen = length(Batch), - ok = emqx_resource_metrics:inflight_change(Id, BatchLen), - ReplyFun = fun ?MODULE:batch_reply_after_query/6, + ReplyFun = fun ?MODULE:batch_reply_after_query/7, Ref = make_message_ref(), - Args = {ReplyFun, [self(), Id, Name, Ref, Batch]}, + Args = {ReplyFun, [self(), Id, Index, Name, Ref, Batch]}, Requests = [Request || ?QUERY(_From, Request, _) <- Batch], - ok = inflight_append(Name, Ref, Batch), + ok = inflight_append(Name, Ref, Batch, Id, Index), Result = Mod:on_batch_query_async(Id, Requests, Args, ResSt), {async_return, Result} end, Batch ). -reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request, HasSent), Result) -> - %% NOTE: 'inflight' is message count that sent async but no ACK received, - %% NOT the message number ququed in the inflight window. - emqx_resource_metrics:inflight_change(Id, -1), +reply_after_query(Pid, Id, Index, Name, Ref, ?QUERY(From, Request, HasSent), Result) -> + %% NOTE: 'inflight' is the count of messages that were sent async + %% but received no ACK, NOT the number of messages queued in the + %% inflight window. case reply_caller(Id, ?REPLY(From, Request, HasSent, Result)) of true -> - %% we marked these messages are 'queuing' although they are actually - %% keeped in inflight window, not replayq - emqx_resource_metrics:queuing_change(Id, 1), ?MODULE:block(Pid); false -> - drop_inflight_and_resume(Pid, Name, Ref) + drop_inflight_and_resume(Pid, Name, Ref, Id, Index) end. -batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) -> - %% NOTE: 'inflight' is message count that sent async but no ACK received, - %% NOT the message number ququed in the inflight window. 
- BatchLen = length(Batch), - emqx_resource_metrics:inflight_change(Id, -BatchLen), +batch_reply_after_query(Pid, Id, Index, Name, Ref, Batch, Result) -> + %% NOTE: 'inflight' is the count of messages that were sent async + %% but received no ACK, NOT the number of messages queued in the + %% inflight window. case batch_reply_caller(Id, Result, Batch) of true -> - %% we marked these messages are 'queuing' although they are actually - %% kept in inflight window, not replayq - emqx_resource_metrics:queuing_change(Id, BatchLen), ?MODULE:block(Pid); false -> - drop_inflight_and_resume(Pid, Name, Ref) + drop_inflight_and_resume(Pid, Name, Ref, Id, Index) end. -drop_inflight_and_resume(Pid, Name, Ref) -> +drop_inflight_and_resume(Pid, Name, Ref, Id, Index) -> case inflight_is_full(Name) of true -> - inflight_drop(Name, Ref), + inflight_drop(Name, Ref, Id, Index), ?MODULE:resume(Pid); false -> - inflight_drop(Name, Ref) + inflight_drop(Name, Ref, Id, Index) end. %%============================================================================== @@ -548,10 +591,10 @@ queue_item_marshaller(Bin) when is_binary(Bin) -> estimate_size(QItem) -> size(queue_item_marshaller(QItem)). -maybe_append_queue(Id, undefined, _Items) -> +maybe_append_queue(Id, _Index, undefined, _Items) -> emqx_resource_metrics:dropped_queue_not_enabled_inc(Id), undefined; -maybe_append_queue(Id, Q, Items) -> +maybe_append_queue(Id, Index, Q, Items) -> Q2 = case replayq:overflow(Q) of Overflow when Overflow =< 0 -> @@ -561,13 +604,13 @@ maybe_append_queue(Id, Q, Items) -> {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts), ok = replayq:ack(Q1, QAckRef), Dropped = length(Items2), - emqx_resource_metrics:queuing_change(Id, -Dropped), emqx_resource_metrics:dropped_queue_full_inc(Id), ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), Q1 end, - emqx_resource_metrics:queuing_change(Id, 1), - replayq:append(Q2, Items). 
+ Q3 = replayq:append(Q2, Items), + emqx_resource_metrics:queuing_set(Id, Index, replayq:count(Q3)), + Q3. get_first_n_from_queue(Q, N) -> get_first_n_from_queue(Q, N, []). @@ -580,23 +623,23 @@ get_first_n_from_queue(Q, N, Acc) when N > 0 -> ?Q_ITEM(Query) -> get_first_n_from_queue(Q, N - 1, [Query | Acc]) end. -drop_first_n_from_queue(Q, 0, _Id) -> +drop_first_n_from_queue(Q, 0, _Id, _Index) -> Q; -drop_first_n_from_queue(Q, N, Id) when N > 0 -> - drop_first_n_from_queue(drop_head(Q, Id), N - 1, Id). +drop_first_n_from_queue(Q, N, Id, Index) when N > 0 -> + drop_first_n_from_queue(drop_head(Q, Id, Index), N - 1, Id, Index). -drop_head(Q, Id) -> - {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}), - ok = replayq:ack(Q1, AckRef), - emqx_resource_metrics:queuing_change(Id, -1), - Q1. +drop_head(Q, Id, Index) -> + {NewQ, AckRef, _} = replayq:pop(Q, #{count_limit => 1}), + ok = replayq:ack(NewQ, AckRef), + emqx_resource_metrics:queuing_set(Id, Index, replayq:count(NewQ)), + NewQ. %%============================================================================== %% the inflight queue for async query -define(SIZE_REF, -1). -inflight_new(Name, InfltWinSZ) -> +inflight_new(Name, InfltWinSZ, Id, Index) -> _ = ets:new(Name, [named_table, ordered_set, public, {write_concurrency, true}]), - inflight_append(Name, ?SIZE_REF, {max_size, InfltWinSZ}), + inflight_append(Name, ?SIZE_REF, {max_size, InfltWinSZ}, Id, Index), ok. inflight_get_first(Name) -> @@ -617,27 +660,39 @@ inflight_is_full(undefined) -> false; inflight_is_full(Name) -> [{_, {max_size, MaxSize}}] = ets:lookup(Name, ?SIZE_REF), + Size = inflight_size(Name), + Size >= MaxSize. + +inflight_size(Name) -> + %% Note: we subtract 1 because there's a metadata row that hold + %% the maximum size value. + MetadataRowCount = 1, case ets:info(Name, size) of - Size when Size > MaxSize -> true; - _ -> false + undefined -> 0; + Size -> max(0, Size - MetadataRowCount) end. 
-inflight_append(undefined, _Ref, _Query) -> +inflight_append(undefined, _Ref, _Query, _Id, _Index) -> ok; -inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch) -> +inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch, Id, Index) -> ets:insert(Name, {Ref, [?QUERY(From, Req, true) || ?QUERY(From, Req, _) <- Batch]}), + emqx_resource_metrics:inflight_set(Id, Index, inflight_size(Name)), ok; -inflight_append(Name, Ref, ?QUERY(From, Req, _)) -> +inflight_append(Name, Ref, ?QUERY(From, Req, _), Id, Index) -> ets:insert(Name, {Ref, ?QUERY(From, Req, true)}), + emqx_resource_metrics:inflight_set(Id, Index, inflight_size(Name)), ok; -inflight_append(Name, Ref, Data) -> +inflight_append(Name, Ref, Data, _Id, _Index) -> ets:insert(Name, {Ref, Data}), + %% this is a metadata row being inserted; therefore, we don't bump + %% the inflight metric. ok. -inflight_drop(undefined, _) -> +inflight_drop(undefined, _, _Id, _Index) -> ok; -inflight_drop(Name, Ref) -> +inflight_drop(Name, Ref, Id, Index) -> ets:delete(Name, Ref), + emqx_resource_metrics:inflight_set(Id, Index, inflight_size(Name)), ok. %%============================================================================== diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 107ca2a93..8a8b5aec2 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -39,6 +39,7 @@ groups() -> init_per_testcase(_, Config) -> emqx_connector_demo:set_callback_mode(always_sync), Config. + end_per_testcase(_, _Config) -> _ = emqx_resource:remove(?ID). 
@@ -503,7 +504,10 @@ t_stop_start(_) -> ), %% add some metrics to test their persistence - emqx_resource_metrics:batching_change(?ID, 5), + WorkerID0 = <<"worker:0">>, + WorkerID1 = <<"worker:1">>, + emqx_resource_metrics:batching_set(?ID, WorkerID0, 2), + emqx_resource_metrics:batching_set(?ID, WorkerID1, 3), ?assertEqual(5, emqx_resource_metrics:batching_get(?ID)), {ok, _} = emqx_resource:check_and_recreate( @@ -537,7 +541,8 @@ t_stop_start(_) -> ?assert(is_process_alive(Pid1)), %% now stop while resetting the metrics - emqx_resource_metrics:batching_change(?ID, 5), + emqx_resource_metrics:batching_set(?ID, WorkerID0, 1), + emqx_resource_metrics:batching_set(?ID, WorkerID1, 4), ?assertEqual(5, emqx_resource_metrics:batching_get(?ID)), ok = emqx_resource:stop(?ID), ?assertEqual(0, emqx_resource_metrics:batching_get(?ID)), diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 6145716f2..a4ee994b5 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -315,11 +315,11 @@ handle_telemetry_event( emqx_resource_metrics:dropped_queue_full_inc(ID, Val); handle_telemetry_event( [wolff, queuing], - #{counter_inc := Val}, - #{bridge_id := ID}, + #{counter_inc := _Val}, + #{bridge_id := ID, partition_id := PartitionID}, _HandlerConfig -) when is_integer(Val) -> - emqx_resource_metrics:queuing_change(ID, Val); +) when is_integer(_Val) -> + emqx_resource_metrics:queuing_set(ID, PartitionID, 0); handle_telemetry_event( [wolff, retried], #{counter_inc := Val}, @@ -336,11 +336,11 @@ handle_telemetry_event( emqx_resource_metrics:failed_inc(ID, Val); handle_telemetry_event( [wolff, inflight], - #{counter_inc := Val}, - #{bridge_id := ID}, + #{counter_inc := _Val}, + #{bridge_id := ID, partition_id := PartitionID}, _HandlerConfig -) when is_integer(Val) -> - 
emqx_resource_metrics:inflight_change(ID, Val); +) when is_integer(_Val) -> + emqx_resource_metrics:inflight_set(ID, PartitionID, 0); handle_telemetry_event( [wolff, retried_failed], #{counter_inc := Val}, diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index 0f4500a7d..4912dca30 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -124,6 +124,7 @@ init_per_testcase(TestCase, Config0) when delete_all_bridges(), Tid = install_telemetry_handler(TestCase), Config = generate_config(Config0), + put(telemetry_table, Tid), [{telemetry_table, Tid} | Config]; false -> {skip, no_batching} @@ -133,6 +134,7 @@ init_per_testcase(TestCase, Config0) -> delete_all_bridges(), Tid = install_telemetry_handler(TestCase), Config = generate_config(Config0), + put(telemetry_table, Tid), [{telemetry_table, Tid} | Config]. end_per_testcase(_TestCase, _Config) -> @@ -393,7 +395,11 @@ assert_metrics(ExpectedMetrics, ResourceId) -> maps:keys(ExpectedMetrics) ), CurrentMetrics = current_metrics(ResourceId), - ?assertEqual(ExpectedMetrics, Metrics, #{current_metrics => CurrentMetrics}), + TelemetryTable = get(telemetry_table), + RecordedEvents = ets:tab2list(TelemetryTable), + ?assertEqual(ExpectedMetrics, Metrics, #{ + current_metrics => CurrentMetrics, recorded_events => RecordedEvents + }), ok. 
assert_empty_metrics(ResourceId) -> @@ -554,6 +560,7 @@ t_publish_success(Config) -> ResourceId = ?config(resource_id, Config), ServiceAccountJSON = ?config(service_account_json, Config), TelemetryTable = ?config(telemetry_table, Config), + QueryMode = ?config(query_mode, Config), Topic = <<"t/topic">>, ?check_trace( create_bridge(Config), @@ -582,6 +589,17 @@ t_publish_success(Config) -> ), %% to avoid test flakiness wait_telemetry_event(TelemetryTable, success, ResourceId), + ExpectedInflightEvents = + case QueryMode of + sync -> 1; + async -> 3 + end, + wait_telemetry_event( + TelemetryTable, + inflight, + ResourceId, + #{n_events => ExpectedInflightEvents, timeout => 5_000} + ), assert_metrics( #{ batching => 0, @@ -601,6 +619,7 @@ t_publish_success_local_topic(Config) -> ResourceId = ?config(resource_id, Config), ServiceAccountJSON = ?config(service_account_json, Config), TelemetryTable = ?config(telemetry_table, Config), + QueryMode = ?config(query_mode, Config), LocalTopic = <<"local/topic">>, {ok, _} = create_bridge(Config, #{<<"local_topic">> => LocalTopic}), assert_empty_metrics(ResourceId), @@ -619,6 +638,17 @@ t_publish_success_local_topic(Config) -> ), %% to avoid test flakiness wait_telemetry_event(TelemetryTable, success, ResourceId), + ExpectedInflightEvents = + case QueryMode of + sync -> 1; + async -> 3 + end, + wait_telemetry_event( + TelemetryTable, + inflight, + ResourceId, + #{n_events => ExpectedInflightEvents, timeout => 5_000} + ), assert_metrics( #{ batching => 0, @@ -649,6 +679,7 @@ t_publish_templated(Config) -> ResourceId = ?config(resource_id, Config), ServiceAccountJSON = ?config(service_account_json, Config), TelemetryTable = ?config(telemetry_table, Config), + QueryMode = ?config(query_mode, Config), Topic = <<"t/topic">>, PayloadTemplate = << "{\"payload\": \"${payload}\"," @@ -694,6 +725,17 @@ t_publish_templated(Config) -> ), %% to avoid test flakiness wait_telemetry_event(TelemetryTable, success, ResourceId), + 
ExpectedInflightEvents = + case QueryMode of + sync -> 1; + async -> 3 + end, + wait_telemetry_event( + TelemetryTable, + inflight, + ResourceId, + #{n_events => ExpectedInflightEvents, timeout => 5_000} + ), assert_metrics( #{ batching => 0, @@ -1053,7 +1095,7 @@ do_econnrefused_or_timeout_test(Config, Error) -> %% response expired, this succeeds. {econnrefused, async, _} -> wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{ - timeout => 10_000, n_events => 2 + timeout => 10_000, n_events => 1 }), CurrentMetrics = current_metrics(ResourceId), RecordedEvents = ets:tab2list(TelemetryTable), @@ -1290,6 +1332,17 @@ t_unrecoverable_error(Config) -> end ), wait_telemetry_event(TelemetryTable, failed, ResourceId), + ExpectedInflightEvents = + case QueryMode of + sync -> 1; + async -> 3 + end, + wait_telemetry_event( + TelemetryTable, + inflight, + ResourceId, + #{n_events => ExpectedInflightEvents, timeout => 5_000} + ), assert_metrics( #{ batching => 0,