From a0e11f75d93ac6ed155d64f6cd7d5fdc09281bfd Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 14 Dec 2022 10:39:22 -0300 Subject: [PATCH 1/9] refactor(docs): use var for output filepath --- scripts/merge-i18n.escript | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/scripts/merge-i18n.escript b/scripts/merge-i18n.escript index 751ca841c..816cbe182 100755 --- a/scripts/merge-i18n.escript +++ b/scripts/merge-i18n.escript @@ -10,8 +10,9 @@ main(_) -> Conf = [merge(Conf0, Cfgs1), io_lib:nl() ], - ok = filelib:ensure_dir("apps/emqx_dashboard/priv/i18n.conf"), - ok = file:write_file("apps/emqx_dashboard/priv/i18n.conf", Conf). + OutputFile = "apps/emqx_dashboard/priv/i18n.conf", + ok = filelib:ensure_dir(OutputFile), + ok = file:write_file(OutputFile, Conf). merge(BaseConf, Cfgs) -> lists:foldl( From 6fdcba641e40c25778e83740a1055f28b6ef5149 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 14 Dec 2022 10:39:58 -0300 Subject: [PATCH 2/9] test(refactor): no need for monitor the janitor is already linked to the parent --- apps/emqx/test/emqx_test_janitor.erl | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/apps/emqx/test/emqx_test_janitor.erl b/apps/emqx/test/emqx_test_janitor.erl index b7d2c3507..6e81fa81d 100644 --- a/apps/emqx/test/emqx_test_janitor.erl +++ b/apps/emqx/test/emqx_test_janitor.erl @@ -49,8 +49,7 @@ push_on_exit_callback(Server, Callback) when is_function(Callback, 0) -> init(Parent) -> process_flag(trap_exit, true), - Ref = monitor(process, Parent), - {ok, #{callbacks => [], owner => {Ref, Parent}}}. + {ok, #{callbacks => [], owner => Parent}}. terminate(_Reason, #{callbacks := Callbacks}) -> lists:foreach(fun(Fun) -> Fun() end, Callbacks). @@ -63,7 +62,7 @@ handle_call(_Req, _From, State) -> handle_cast(_Req, State) -> {noreply, State}. -handle_info({'DOWN', Ref, process, Parent, _Reason}, State = #{owner := {Ref, Parent}}) -> +handle_info({'EXIT', Parent, _Reason}, State = #{owner := Parent}) -> {stop, normal, State}; handle_info(_Msg, State) -> {noreply, State}. From 4819794401d3bde69e3b0c83903c4d84244d6140 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 16 Dec 2022 17:18:40 -0300 Subject: [PATCH 3/9] test(refactor): decrease test teardown noise --- apps/emqx/test/emqx_common_test_helpers.erl | 10 +++++++++- .../test/emqx_ee_bridge_gcp_pubsub_SUITE.erl | 1 + 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index ac8659735..d6a44df15 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -65,7 +65,8 @@ -export([clear_screen/0]). -export([with_mock/4]). -export([ - on_exit/1 + on_exit/1, + call_janitor/0 ]). %% Toxiproxy API @@ -933,6 +934,13 @@ latency_up_proxy(off, Name, ProxyHost, ProxyPort) -> %% Testcase teardown utilities %%------------------------------------------------------------------------------- +%% stop the janitor gracefully to ensure proper cleanup order and less +%% noise in the logs. +call_janitor() -> + Janitor = get_or_spawn_janitor(), + exit(Janitor, normal), + ok. 
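%% [Illustrative sketch, not part of the patch]
%% What it shows: how a suite is meant to combine on_exit/1 with the new
%% call_janitor/0 teardown, mirroring the GCP PubSub suite change later in
%% this patch. Assumption: t_some_case/1 and the spawned process are
%% hypothetical examples.
t_some_case(_Config) ->
    Pid = spawn(fun() -> receive stop -> ok end end),
    emqx_common_test_helpers:on_exit(fun() -> exit(Pid, kill) end),
    ok.

end_per_testcase(_TestCase, _Config) ->
    emqx_common_test_helpers:call_janitor(),
    ok.
%% [end of sketch]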
+ get_or_spawn_janitor() -> case get({?MODULE, janitor_proc}) of undefined -> diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index b84b7d74b..0f4500a7d 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -139,6 +139,7 @@ end_per_testcase(_TestCase, _Config) -> ok = snabbkaffe:stop(), delete_all_bridges(), ok = emqx_connector_web_hook_server:stop(), + emqx_common_test_helpers:call_janitor(), ok. %%------------------------------------------------------------------------------ From ce43e6b3d6ac563c92f09573d9ee93c196a1e2be Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 27 Dec 2022 10:26:43 -0300 Subject: [PATCH 4/9] chore: upgrade kafka_protocol, wolff, brod --- lib-ee/emqx_ee_bridge/rebar.config | 6 +++--- mix.exs | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/rebar.config b/lib-ee/emqx_ee_bridge/rebar.config index d551bb1b5..ace2673e5 100644 --- a/lib-ee/emqx_ee_bridge/rebar.config +++ b/lib-ee/emqx_ee_bridge/rebar.config @@ -1,9 +1,9 @@ {erl_opts, [debug_info]}. {deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.33.0"}}} - , {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.0"}}} - , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.0"}}} + , {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.3"}}} + , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.2"}}} , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}} - , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.4"}}} + , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.7"}}} , {emqx_connector, {path, "../../apps/emqx_connector"}} , {emqx_resource, {path, "../../apps/emqx_resource"}} , {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/mix.exs b/mix.exs index 1bf30486e..92f6e8a69 100644 --- a/mix.exs +++ b/mix.exs @@ -132,10 +132,10 @@ defmodule EMQXUmbrella.MixProject do [ {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.4", override: true}, - {:wolff, github: "kafka4beam/wolff", tag: "1.7.0"}, - {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.0", override: true}, + {:wolff, github: "kafka4beam/wolff", tag: "1.7.3"}, + {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.2", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"}, - {:brod, github: "kafka4beam/brod", tag: "3.16.4"}, + {:brod, github: "kafka4beam/brod", tag: "3.16.7"}, {:snappyer, "1.2.8", override: true}, {:supervisor3, "1.1.11", override: true} ] From 24bae2641e07aaac6cf3e75f3d8b65984449bc41 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 29 Dec 2022 16:57:47 -0300 Subject: [PATCH 5/9] chore: upgrade wolff -> 1.7.4 --- lib-ee/emqx_ee_bridge/rebar.config | 2 +- mix.exs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/rebar.config b/lib-ee/emqx_ee_bridge/rebar.config index ace2673e5..2bd4036e0 100644 --- a/lib-ee/emqx_ee_bridge/rebar.config +++ b/lib-ee/emqx_ee_bridge/rebar.config @@ -1,6 +1,6 @@ {erl_opts, [debug_info]}. 
{deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.33.0"}}} - , {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.3"}}} + , {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.4"}}} , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.2"}}} , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}} , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.7"}}} diff --git a/mix.exs b/mix.exs index 92f6e8a69..5c54e6662 100644 --- a/mix.exs +++ b/mix.exs @@ -132,7 +132,7 @@ defmodule EMQXUmbrella.MixProject do [ {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.4", override: true}, - {:wolff, github: "kafka4beam/wolff", tag: "1.7.3"}, + {:wolff, github: "kafka4beam/wolff", tag: "1.7.4"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.2", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"}, {:brod, github: "kafka4beam/brod", tag: "3.16.7"}, From 8b060a75f1fb813498acaa114877937dd595d615 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 27 Dec 2022 10:28:08 -0300 Subject: [PATCH 6/9] refactor(metrics): use absolute gauge values rather than deltas https://emqx.atlassian.net/browse/EMQX-8548 Currently, we face several issues trying to keep resource metrics reasonable. For example, when a resource is re-created and has its metrics reset, but then its durable queue resumes its previous work and leads to strange (often negative) metrics. Instead using `counters` that are shared by more than one worker to manage gauges, we introduce an ETS table whose key is not only scoped by the Resource ID as before, but also by the worker ID. This way, when a worker starts/terminates, they should set their own gauges to their values (often 0 or `replayq:count` when resuming off a queue). With this scoping and initialization procedure, we'll hopefully avoid hitting those strange metrics scenarios and have better control over the gauges. --- apps/emqx/src/emqx_metrics_worker.erl | 125 ++++++++- apps/emqx/test/emqx_metrics_worker_SUITE.erl | 239 +++++++++++++++--- apps/emqx_bridge/i18n/emqx_bridge_schema.conf | 2 +- apps/emqx_bridge/src/emqx_bridge_api.erl | 7 +- .../test/emqx_bridge_mqtt_SUITE.erl | 28 +- .../src/emqx_resource_manager.erl | 3 - .../src/emqx_resource_metrics.erl | 87 +++++-- .../src/emqx_resource_worker.erl | 231 ++++++++++------- .../test/emqx_resource_SUITE.erl | 9 +- .../kafka/emqx_bridge_impl_kafka_producer.erl | 16 +- .../test/emqx_ee_bridge_gcp_pubsub_SUITE.erl | 57 ++++- 11 files changed, 625 insertions(+), 179 deletions(-) diff --git a/apps/emqx/src/emqx_metrics_worker.erl b/apps/emqx/src/emqx_metrics_worker.erl index ab6a0b1a6..47ecca408 100644 --- a/apps/emqx/src/emqx_metrics_worker.erl +++ b/apps/emqx/src/emqx_metrics_worker.erl @@ -18,6 +18,8 @@ -behaviour(gen_server). +-include_lib("stdlib/include/ms_transform.hrl"). + %% API functions -export([ start_link/1, @@ -30,6 +32,11 @@ inc/3, inc/4, get/3, + get_gauge/3, + set_gauge/5, + shift_gauge/5, + get_gauges/2, + delete_gauges/2, get_rate/2, get_counters/2, create_metrics/3, @@ -68,14 +75,21 @@ last5m := float() }. -type metrics() :: #{ - counters := #{atom() => integer()}, - rate := #{atom() => rate()} + counters := #{metric_name() => integer()}, + gauges := #{metric_name() => integer()}, + rate := #{metric_name() => rate()} }. -type handler_name() :: atom(). 
+%% metric_id() is actually a resource id -type metric_id() :: binary() | atom(). +-type metric_name() :: atom(). +-type worker_id() :: term(). -define(CntrRef(Name), {?MODULE, Name}). -define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)). +-define(GAUGE_TABLE(NAME), + list_to_atom(atom_to_list(?MODULE) ++ "_" ++ atom_to_list(NAME) ++ "_gauge") +). -record(rate, { max = 0 :: number(), @@ -112,11 +126,12 @@ child_spec(ChldName, Name) -> modules => [emqx_metrics_worker] }. --spec create_metrics(handler_name(), metric_id(), [atom()]) -> ok | {error, term()}. +-spec create_metrics(handler_name(), metric_id(), [metric_name()]) -> ok | {error, term()}. create_metrics(Name, Id, Metrics) -> create_metrics(Name, Id, Metrics, Metrics). --spec create_metrics(handler_name(), metric_id(), [atom()], [atom()]) -> ok | {error, term()}. +-spec create_metrics(handler_name(), metric_id(), [metric_name()], [metric_name()]) -> + ok | {error, term()}. create_metrics(Name, Id, Metrics, RateMetrics) -> gen_server:call(Name, {create_metrics, Id, Metrics, RateMetrics}). @@ -135,7 +150,7 @@ has_metrics(Name, Id) -> _ -> true end. --spec get(handler_name(), metric_id(), atom() | integer()) -> number(). +-spec get(handler_name(), metric_id(), metric_name() | integer()) -> number(). get(Name, Id, Metric) -> case get_ref(Name, Id) of not_found -> @@ -167,16 +182,102 @@ reset_counters(Name, Id) -> -spec get_metrics(handler_name(), metric_id()) -> metrics(). get_metrics(Name, Id) -> - #{rate => get_rate(Name, Id), counters => get_counters(Name, Id)}. + #{ + rate => get_rate(Name, Id), + counters => get_counters(Name, Id), + gauges => get_gauges(Name, Id) + }. -spec inc(handler_name(), metric_id(), atom()) -> ok. inc(Name, Id, Metric) -> inc(Name, Id, Metric, 1). --spec inc(handler_name(), metric_id(), atom(), integer()) -> ok. +-spec inc(handler_name(), metric_id(), metric_name(), integer()) -> ok. inc(Name, Id, Metric, Val) -> counters:add(get_ref(Name, Id), idx_metric(Name, Id, Metric), Val). +-spec set_gauge(handler_name(), metric_id(), worker_id(), metric_name(), integer()) -> ok. +set_gauge(Name, Id, WorkerId, Metric, Val) -> + Table = ?GAUGE_TABLE(Name), + try + true = ets:insert(Table, {{Id, Metric, WorkerId}, Val}), + ok + catch + error:badarg -> + ok + end. + +-spec shift_gauge(handler_name(), metric_id(), worker_id(), metric_name(), integer()) -> ok. +shift_gauge(Name, Id, WorkerId, Metric, Val) -> + Table = ?GAUGE_TABLE(Name), + try + _ = ets:update_counter( + Table, + {Id, Metric, WorkerId}, + Val, + {{Id, Metric, WorkerId}, 0} + ), + ok + catch + error:badarg -> + ok + end. + +-spec get_gauge(handler_name(), metric_id(), metric_name()) -> integer(). +get_gauge(Name, Id, Metric) -> + Table = ?GAUGE_TABLE(Name), + MatchSpec = + ets:fun2ms( + fun({{Id0, Metric0, _WorkerId}, Val}) when Id0 =:= Id, Metric0 =:= Metric -> + Val + end + ), + try + lists:sum(ets:select(Table, MatchSpec)) + catch + error:badarg -> + 0 + end. + +-spec get_gauges(handler_name(), metric_id()) -> map(). +get_gauges(Name, Id) -> + Table = ?GAUGE_TABLE(Name), + MatchSpec = + ets:fun2ms( + fun({{Id0, Metric, _WorkerId}, Val}) when Id0 =:= Id -> + {Metric, Val} + end + ), + try + lists:foldr( + fun({Metric, Val}, Acc) -> + maps:update_with(Metric, fun(X) -> X + Val end, Val, Acc) + end, + #{}, + ets:select(Table, MatchSpec) + ) + catch + error:badarg -> + #{} + end. + +-spec delete_gauges(handler_name(), metric_id()) -> ok. 
+delete_gauges(Name, Id) -> + Table = ?GAUGE_TABLE(Name), + MatchSpec = + ets:fun2ms( + fun({{Id0, _Metric, _WorkerId}, _Val}) when Id0 =:= Id -> + true + end + ), + try + _ = ets:select_delete(Table, MatchSpec), + ok + catch + error:badarg -> + ok + end. + start_link(Name) -> gen_server:start_link({local, Name}, ?MODULE, Name, []). @@ -185,6 +286,7 @@ init(Name) -> %% the rate metrics erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), persistent_term:put(?CntrRef(Name), #{}), + _ = ets:new(?GAUGE_TABLE(Name), [named_table, ordered_set, public, {write_concurrency, true}]), {ok, #state{}}. handle_call({get_rate, _Id}, _From, State = #state{rates = undefined}) -> @@ -220,7 +322,10 @@ handle_call( _From, State = #state{metric_ids = MIDs, rates = Rates} ) -> - {reply, delete_counters(get_self_name(), Id), State#state{ + Name = get_self_name(), + delete_counters(Name, Id), + delete_gauges(Name, Id), + {reply, ok, State#state{ metric_ids = sets:del_element(Id, MIDs), rates = case Rates of @@ -233,7 +338,9 @@ handle_call( _From, State = #state{rates = Rates} ) -> - {reply, reset_counters(get_self_name(), Id), State#state{ + Name = get_self_name(), + delete_gauges(Name, Id), + {reply, reset_counters(Name, Id), State#state{ rates = case Rates of undefined -> diff --git a/apps/emqx/test/emqx_metrics_worker_SUITE.erl b/apps/emqx/test/emqx_metrics_worker_SUITE.erl index 326b0be1e..3f48010c6 100644 --- a/apps/emqx/test/emqx_metrics_worker_SUITE.erl +++ b/apps/emqx/test/emqx_metrics_worker_SUITE.erl @@ -47,7 +47,8 @@ end_per_testcase(_, _Config) -> t_get_metrics(_) -> Metrics = [a, b, c], - ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, Metrics), + Id = <<"testid">>, + ok = emqx_metrics_worker:create_metrics(?NAME, Id, Metrics), %% all the metrics are set to zero at start ?assertMatch( #{ @@ -56,18 +57,22 @@ t_get_metrics(_) -> b := #{current := 0.0, max := 0.0, last5m := 0.0}, c := #{current := 0.0, max := 0.0, last5m := 0.0} }, + gauges := #{}, counters := #{ a := 0, b := 0, c := 0 } }, - emqx_metrics_worker:get_metrics(?NAME, <<"testid">>) + emqx_metrics_worker:get_metrics(?NAME, Id) ), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), + ok = emqx_metrics_worker:inc(?NAME, Id, a), + ok = emqx_metrics_worker:inc(?NAME, Id, b), + ok = emqx_metrics_worker:inc(?NAME, Id, c), + ok = emqx_metrics_worker:inc(?NAME, Id, c), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9), ct:sleep(1500), ?LET( #{ @@ -76,27 +81,73 @@ t_get_metrics(_) -> b := #{current := CurrB, max := MaxB, last5m := _}, c := #{current := CurrC, max := MaxC, last5m := _} }, + gauges := #{ + inflight := Inflight, + queuing := Queuing + }, counters := #{ a := 1, b := 1, c := 2 } }, - emqx_metrics_worker:get_metrics(?NAME, <<"testid">>), + emqx_metrics_worker:get_metrics(?NAME, Id), { ?assert(CurrA > 0), ?assert(CurrB > 0), ?assert(CurrC > 0), ?assert(MaxA > 0), ?assert(MaxB > 0), - ?assert(MaxC > 0) + ?assert(MaxC > 0), + ?assert(Inflight == 12), + ?assert(Queuing == 9) } ), - ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>). + ok = emqx_metrics_worker:clear_metrics(?NAME, Id). 
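%% [Illustrative sketch, not part of the patch]
%% What it shows: the intent of the new gauge API in one place: counters stay
%% shared per resource id, while gauges are absolute values keyed by worker id
%% and summed on read. Assumptions: my_metrics, <<"bridge:demo">> and the
%% worker ids are example names; the handler would normally be started via
%% child_spec/2 under a supervisor rather than start_link/1 directly.
{ok, _Pid} = emqx_metrics_worker:start_link(my_metrics),
ok = emqx_metrics_worker:create_metrics(my_metrics, <<"bridge:demo">>, [matched]),
ok = emqx_metrics_worker:inc(my_metrics, <<"bridge:demo">>, matched),
ok = emqx_metrics_worker:set_gauge(my_metrics, <<"bridge:demo">>, worker_1, inflight, 3),
ok = emqx_metrics_worker:set_gauge(my_metrics, <<"bridge:demo">>, worker_2, inflight, 4),
%% reads aggregate the per-worker rows:
7 = emqx_metrics_worker:get_gauge(my_metrics, <<"bridge:demo">>, inflight),
#{counters := #{matched := 1}, gauges := #{inflight := 7}} =
    emqx_metrics_worker:get_metrics(my_metrics, <<"bridge:demo">>).
%% [end of sketch]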
+ +t_clear_metrics(_Config) -> + Metrics = [a, b, c], + Id = <<"testid">>, + ok = emqx_metrics_worker:create_metrics(?NAME, Id, Metrics), + ?assertMatch( + #{ + rate := #{ + a := #{current := 0.0, max := 0.0, last5m := 0.0}, + b := #{current := 0.0, max := 0.0, last5m := 0.0}, + c := #{current := 0.0, max := 0.0, last5m := 0.0} + }, + gauges := #{}, + counters := #{ + a := 0, + b := 0, + c := 0 + } + }, + emqx_metrics_worker:get_metrics(?NAME, Id) + ), + ok = emqx_metrics_worker:inc(?NAME, Id, a), + ok = emqx_metrics_worker:inc(?NAME, Id, b), + ok = emqx_metrics_worker:inc(?NAME, Id, c), + ok = emqx_metrics_worker:inc(?NAME, Id, c), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9), + ct:sleep(1500), + ok = emqx_metrics_worker:clear_metrics(?NAME, Id), + ?assertEqual( + #{ + counters => #{}, + gauges => #{}, + rate => #{current => 0.0, last5m => 0.0, max => 0.0} + }, + emqx_metrics_worker:get_metrics(?NAME, Id) + ), + ok. t_reset_metrics(_) -> Metrics = [a, b, c], - ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, Metrics), + Id = <<"testid">>, + ok = emqx_metrics_worker:create_metrics(?NAME, Id, Metrics), %% all the metrics are set to zero at start ?assertMatch( #{ @@ -105,20 +156,24 @@ t_reset_metrics(_) -> b := #{current := 0.0, max := 0.0, last5m := 0.0}, c := #{current := 0.0, max := 0.0, last5m := 0.0} }, + gauges := #{}, counters := #{ a := 0, b := 0, c := 0 } }, - emqx_metrics_worker:get_metrics(?NAME, <<"testid">>) + emqx_metrics_worker:get_metrics(?NAME, Id) ), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), + ok = emqx_metrics_worker:inc(?NAME, Id, a), + ok = emqx_metrics_worker:inc(?NAME, Id, b), + ok = emqx_metrics_worker:inc(?NAME, Id, c), + ok = emqx_metrics_worker:inc(?NAME, Id, c), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9), ct:sleep(1500), - ok = emqx_metrics_worker:reset_metrics(?NAME, <<"testid">>), + ok = emqx_metrics_worker:reset_metrics(?NAME, Id), ?LET( #{ rate := #{ @@ -126,68 +181,83 @@ t_reset_metrics(_) -> b := #{current := CurrB, max := MaxB, last5m := _}, c := #{current := CurrC, max := MaxC, last5m := _} }, + gauges := Gauges, counters := #{ a := 0, b := 0, c := 0 } }, - emqx_metrics_worker:get_metrics(?NAME, <<"testid">>), + emqx_metrics_worker:get_metrics(?NAME, Id), { ?assert(CurrA == 0), ?assert(CurrB == 0), ?assert(CurrC == 0), ?assert(MaxA == 0), ?assert(MaxB == 0), - ?assert(MaxC == 0) + ?assert(MaxC == 0), + ?assertEqual(#{}, Gauges) } ), - ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>). + ok = emqx_metrics_worker:clear_metrics(?NAME, Id). 
t_get_metrics_2(_) -> Metrics = [a, b, c], + Id = <<"testid">>, ok = emqx_metrics_worker:create_metrics( ?NAME, - <<"testid">>, + Id, Metrics, [a] ), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), + ok = emqx_metrics_worker:inc(?NAME, Id, a), + ok = emqx_metrics_worker:inc(?NAME, Id, b), + ok = emqx_metrics_worker:inc(?NAME, Id, c), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9), ?assertMatch( #{ rate := Rate = #{ a := #{current := _, max := _, last5m := _} }, + gauges := #{}, counters := #{ a := 1, b := 1, c := 1 } } when map_size(Rate) =:= 1, - emqx_metrics_worker:get_metrics(?NAME, <<"testid">>) + emqx_metrics_worker:get_metrics(?NAME, Id) ), - ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>). + ok = emqx_metrics_worker:clear_metrics(?NAME, Id). t_recreate_metrics(_) -> - ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, [a]), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a), + Id = <<"testid">>, + ok = emqx_metrics_worker:create_metrics(?NAME, Id, [a]), + ok = emqx_metrics_worker:inc(?NAME, Id, a), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9), ?assertMatch( #{ rate := R = #{ a := #{current := _, max := _, last5m := _} }, + gauges := #{ + inflight := 12, + queuing := 9 + }, counters := C = #{ a := 1 } } when map_size(R) == 1 andalso map_size(C) == 1, - emqx_metrics_worker:get_metrics(?NAME, <<"testid">>) + emqx_metrics_worker:get_metrics(?NAME, Id) ), %% we create the metrics again, to add some counters - ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, [a, b, c]), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b), - ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), + ok = emqx_metrics_worker:create_metrics(?NAME, Id, [a, b, c]), + ok = emqx_metrics_worker:inc(?NAME, Id, b), + ok = emqx_metrics_worker:inc(?NAME, Id, c), ?assertMatch( #{ rate := R = #{ @@ -195,13 +265,17 @@ t_recreate_metrics(_) -> b := #{current := _, max := _, last5m := _}, c := #{current := _, max := _, last5m := _} }, + gauges := #{ + inflight := 12, + queuing := 9 + }, counters := C = #{ a := 1, b := 1, c := 1 } } when map_size(R) == 3 andalso map_size(C) == 3, - emqx_metrics_worker:get_metrics(?NAME, <<"testid">>) + emqx_metrics_worker:get_metrics(?NAME, Id) ), - ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>). + ok = emqx_metrics_worker:clear_metrics(?NAME, Id). t_inc_matched(_) -> Metrics = ['rules.matched'], @@ -238,3 +312,102 @@ t_rate(_) -> ct:sleep(3000), ok = emqx_metrics_worker:clear_metrics(?NAME, <<"rule1">>), ok = emqx_metrics_worker:clear_metrics(?NAME, <<"rule:2">>). 
+ +t_get_gauge(_Config) -> + Metric = 'queueing', + %% unknown handler name (inexistent table) + ?assertEqual(0, emqx_metrics_worker:get_gauge(unknown_name, unknown_id, Metric)), + %% unknown resource id + ?assertEqual(0, emqx_metrics_worker:get_gauge(?NAME, unknown_id, Metric)), + + Id = <<"some id">>, + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 2), + + ?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)), + ?assertEqual(0, emqx_metrics_worker:get_gauge(?NAME, unknown, Metric)), + + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, Metric, 3), + ?assertEqual(5, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)), + + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 1), + ?assertEqual(4, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)), + + ?assertEqual(0, emqx_metrics_worker:get_gauge(?NAME, Id, another_metric)), + + ok. + +t_get_gauges(_Config) -> + %% unknown handler name (inexistent table) + ?assertEqual(#{}, emqx_metrics_worker:get_gauges(unknown_name, unknown_id)), + %% unknown resource id + ?assertEqual(#{}, emqx_metrics_worker:get_gauges(?NAME, unknown_id)), + + Metric = 'queuing', + Id = <<"some id">>, + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 2), + + ?assertEqual(#{queuing => 2}, emqx_metrics_worker:get_gauges(?NAME, Id)), + ?assertEqual(#{}, emqx_metrics_worker:get_gauges(?NAME, unknown)), + + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, Metric, 3), + ?assertEqual(#{queuing => 5}, emqx_metrics_worker:get_gauges(?NAME, Id)), + + AnotherMetric = 'inflight', + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 1), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, AnotherMetric, 10), + ?assertEqual(#{queuing => 4, inflight => 10}, emqx_metrics_worker:get_gauges(?NAME, Id)), + + ok. + +t_delete_gauge(_Config) -> + %% unknown handler name (inexistent table) + ?assertEqual(ok, emqx_metrics_worker:delete_gauges(unknown_name, unknown_id)), + %% unknown resource id + ?assertEqual(ok, emqx_metrics_worker:delete_gauges(?NAME, unknown_id)), + + Metric = 'queuing', + AnotherMetric = 'inflight', + Id = <<"some id">>, + AnotherId = <<"another id">>, + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 2), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, Metric, 3), + ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, AnotherMetric, 10), + ok = emqx_metrics_worker:set_gauge(?NAME, AnotherId, worker_id1, AnotherMetric, 10), + ?assertEqual(#{queuing => 5, inflight => 10}, emqx_metrics_worker:get_gauges(?NAME, Id)), + + ?assertEqual(ok, emqx_metrics_worker:delete_gauges(?NAME, Id)), + + ?assertEqual(#{}, emqx_metrics_worker:get_gauges(?NAME, Id)), + ?assertEqual(#{inflight => 10}, emqx_metrics_worker:get_gauges(?NAME, AnotherId)), + + ok. 
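%% [Illustrative sketch, not part of the patch]
%% What it shows: the division of labour between the two gauge writers:
%% set_gauge/5 publishes an absolute value (as the buffer worker does on init
%% and terminate), while shift_gauge/5 applies a relative delta for hot-path
%% updates such as the `batching' accumulator in emqx_resource_worker; the
%% t_shift_gauge case below exercises the same behaviour. Assumption: this
%% continues the my_metrics sketch above (handler already started) and the
%% names are example values.
ok = emqx_metrics_worker:set_gauge(my_metrics, <<"bridge:demo">>, worker_1, batching, 0),
ok = emqx_metrics_worker:shift_gauge(my_metrics, <<"bridge:demo">>, worker_1, batching, 2),
ok = emqx_metrics_worker:shift_gauge(my_metrics, <<"bridge:demo">>, worker_1, batching, -1),
1 = emqx_metrics_worker:get_gauge(my_metrics, <<"bridge:demo">>, batching).
%% [end of sketch]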
+ +t_shift_gauge(_Config) -> + Metric = 'queueing', + Id = <<"some id">>, + AnotherId = <<"another id">>, + + %% unknown handler name (inexistent table) + ?assertEqual( + ok, emqx_metrics_worker:shift_gauge(unknown_name, unknown_id, worker_id0, Metric, 2) + ), + ?assertEqual(0, emqx_metrics_worker:get_gauge(unknown_name, unknown_id, Metric)), + + %% empty resource id + ?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, Id, worker_id0, Metric, 2)), + ?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, AnotherId, worker_id0, Metric, 2)), + ?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)), + ?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, AnotherId, Metric)), + + ?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, Id, worker_id0, Metric, 3)), + ?assertEqual(5, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)), + + ?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, Id, worker_id1, Metric, 10)), + ?assertEqual(15, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)), + + ?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, Id, worker_id1, Metric, -4)), + ?assertEqual(11, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)), + + ?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, AnotherId, Metric)), + + ok. diff --git a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf index c465ef242..b62a2ee68 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf @@ -195,7 +195,7 @@ emqx_bridge_schema { metric_sent_inflight { desc { - en: """Count of messages that were sent asynchronously but ACKs are not received.""" + en: """Count of messages that were sent asynchronously but ACKs are not yet received.""" zh: """已异步地发送但没有收到 ACK 的消息个数。""" } label: { diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 6b5e307d8..fc58bafa0 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -686,7 +686,6 @@ format_resp( format_metrics(#{ counters := #{ - 'batching' := Batched, 'dropped' := Dropped, 'dropped.other' := DroppedOther, 'dropped.queue_full' := DroppedQueueFull, @@ -694,17 +693,19 @@ format_metrics(#{ 'dropped.resource_not_found' := DroppedResourceNotFound, 'dropped.resource_stopped' := DroppedResourceStopped, 'matched' := Matched, - 'queuing' := Queued, 'retried' := Retried, 'failed' := SentFailed, - 'inflight' := SentInflight, 'success' := SentSucc, 'received' := Rcvd }, + gauges := Gauges, rate := #{ matched := #{current := Rate, last5m := Rate5m, max := RateMax} } }) -> + Batched = maps:get('batching', Gauges, 0), + Queued = maps:get('queuing', Gauges, 0), + SentInflight = maps:get('inflight', Gauges, 0), ?METRICS( Batched, Dropped, diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 1bf156ed4..342384593 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -19,6 +19,7 @@ -compile(export_all). -import(emqx_dashboard_api_test_helpers, [request/4, uri/1]). +-import(emqx_common_test_helpers, [on_exit/1]). -include("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -124,6 +125,7 @@ init_per_testcase(_, Config) -> Config. end_per_testcase(_, _Config) -> clear_resources(), + emqx_common_test_helpers:call_janitor(), ok. 
clear_resources() -> @@ -672,6 +674,12 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> <<"name">> := ?BRIDGE_NAME_EGRESS } = jsx:decode(Bridge), BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), + on_exit(fun() -> + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + ok + end), %% we now test if the bridge works as expected LocalTopic = <>, RemoteTopic = <>, @@ -733,15 +741,20 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> %% verify the metrics of the bridge, the message should be queued {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []), + Decoded1 = jsx:decode(BridgeStr1), + ?assertMatch( + Status when (Status == <<"connected">> orelse Status == <<"connecting">>), + maps:get(<<"status">>, Decoded1) + ), %% matched >= 3 because of possible retries. ?assertMatch( #{ - <<"status">> := Status, - <<"metrics">> := #{ - <<"matched">> := Matched, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2 - } - } when Matched >= 3 andalso (Status == <<"connected">> orelse Status == <<"connecting">>), - jsx:decode(BridgeStr1) + <<"matched">> := Matched, + <<"success">> := 1, + <<"failed">> := 0, + <<"queuing">> := 2 + } when Matched >= 3, + maps:get(<<"metrics">>, Decoded1) ), %% start the listener 1883 to make the bridge reconnected @@ -766,9 +779,6 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> %% also verify the 2 messages have been sent to the remote broker assert_mqtt_msg_received(RemoteTopic, Payload1), assert_mqtt_msg_received(RemoteTopic, Payload2), - %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), ok. assert_mqtt_msg_received(Topic, Payload) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 10c501865..b5f14780d 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -141,9 +141,6 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> 'dropped.resource_not_found', 'dropped.resource_stopped', 'dropped.other', - 'queuing', - 'batching', - 'inflight', 'received' ], [matched] diff --git a/apps/emqx_resource/src/emqx_resource_metrics.erl b/apps/emqx_resource/src/emqx_resource_metrics.erl index e6637b68f..921373c47 100644 --- a/apps/emqx_resource/src/emqx_resource_metrics.erl +++ b/apps/emqx_resource/src/emqx_resource_metrics.erl @@ -24,11 +24,12 @@ ]). 
-export([ - batching_change/2, + batching_set/3, + batching_shift/3, batching_get/1, - inflight_change/2, + inflight_set/3, inflight_get/1, - queuing_change/2, + queuing_set/3, queuing_get/1, dropped_inc/1, dropped_inc/2, @@ -114,8 +115,6 @@ handle_telemetry_event( _HandlerConfig ) -> case Event of - batching -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'batching', Val); dropped_other -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other', Val); @@ -133,12 +132,8 @@ handle_telemetry_event( emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped', Val); failed -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val); - inflight -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'inflight', Val); matched -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched', Val); - queuing -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'queuing', Val); retried_failed -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val), emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val), @@ -152,6 +147,34 @@ handle_telemetry_event( _ -> ok end; +handle_telemetry_event( + [?TELEMETRY_PREFIX, Event], + _Measurements = #{gauge_set := Val}, + _Metadata = #{resource_id := ID, worker_id := WorkerID}, + _HandlerConfig +) -> + case Event of + batching -> + emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'batching', Val); + inflight -> + emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'inflight', Val); + queuing -> + emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'queuing', Val); + _ -> + ok + end; +handle_telemetry_event( + [?TELEMETRY_PREFIX, Event], + _Measurements = #{gauge_shift := Val}, + _Metadata = #{resource_id := ID, worker_id := WorkerID}, + _HandlerConfig +) -> + case Event of + batching -> + emqx_metrics_worker:shift_gauge(?RES_METRICS, ID, WorkerID, 'batching', Val); + _ -> + ok + end; handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) -> ok. @@ -160,26 +183,48 @@ handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) -> %% @doc Count of messages that are currently accumulated in memory waiting for %% being sent in one batch -batching_change(ID, Val) -> - telemetry:execute([?TELEMETRY_PREFIX, batching], #{counter_inc => Val}, #{resource_id => ID}). +batching_set(ID, WorkerID, Val) -> + telemetry:execute( + [?TELEMETRY_PREFIX, batching], + #{gauge_set => Val}, + #{resource_id => ID, worker_id => WorkerID} + ). + +batching_shift(_ID, _WorkerID = undefined, _Val) -> + ok; +batching_shift(ID, WorkerID, Val) -> + telemetry:execute( + [?TELEMETRY_PREFIX, batching], + #{gauge_shift => Val}, + #{resource_id => ID, worker_id => WorkerID} + ). batching_get(ID) -> - emqx_metrics_worker:get(?RES_METRICS, ID, 'batching'). + emqx_metrics_worker:get_gauge(?RES_METRICS, ID, 'batching'). -%% @doc Count of messages that are currently queuing. [Gauge] -queuing_change(ID, Val) -> - telemetry:execute([?TELEMETRY_PREFIX, queuing], #{counter_inc => Val}, #{resource_id => ID}). +%% @doc Count of batches of messages that are currently +%% queuing. [Gauge] +queuing_set(ID, WorkerID, Val) -> + telemetry:execute( + [?TELEMETRY_PREFIX, queuing], + #{gauge_set => Val}, + #{resource_id => ID, worker_id => WorkerID} + ). queuing_get(ID) -> - emqx_metrics_worker:get(?RES_METRICS, ID, 'queuing'). + emqx_metrics_worker:get_gauge(?RES_METRICS, ID, 'queuing'). -%% @doc Count of messages that were sent asynchronously but ACKs are not -%% received. 
[Gauge] -inflight_change(ID, Val) -> - telemetry:execute([?TELEMETRY_PREFIX, inflight], #{counter_inc => Val}, #{resource_id => ID}). +%% @doc Count of batches of messages that were sent asynchronously but +%% ACKs are not yet received. [Gauge] +inflight_set(ID, WorkerID, Val) -> + telemetry:execute( + [?TELEMETRY_PREFIX, inflight], + #{gauge_set => Val}, + #{resource_id => ID, worker_id => WorkerID} + ). inflight_get(ID) -> - emqx_metrics_worker:get(?RES_METRICS, ID, 'inflight'). + emqx_metrics_worker:get_gauge(?RES_METRICS, ID, 'inflight'). %% Counters (value can only got up): %% -------------------------------------- diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 482c82f6a..44cfec1b1 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -52,7 +52,7 @@ -export([queue_item_marshaller/1, estimate_size/1]). --export([reply_after_query/6, batch_reply_after_query/6]). +-export([reply_after_query/7, batch_reply_after_query/7]). -define(Q_ITEM(REQUEST), {q_item, REQUEST}). @@ -90,13 +90,27 @@ async_query(Id, Request, Opts) -> %% simple query the resource without batching and queuing messages. -spec simple_sync_query(id(), request()) -> Result :: term(). simple_sync_query(Id, Request) -> - Result = call_query(sync, Id, ?QUERY(self(), Request, false), #{}), + %% Note: since calling this function implies in bypassing the + %% buffer workers, and each buffer worker index is used when + %% collecting gauge metrics, we use this dummy index. If this + %% call ends up calling buffering functions, that's a bug and + %% would mess up the metrics anyway. `undefined' is ignored by + %% `emqx_resource_metrics:*_shift/3'. + Index = undefined, + Result = call_query(sync, Id, Index, ?QUERY(self(), Request, false), #{}), _ = handle_query_result(Id, Result, false, false), Result. -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term(). simple_async_query(Id, Request, ReplyFun) -> - Result = call_query(async, Id, ?QUERY(ReplyFun, Request, false), #{}), + %% Note: since calling this function implies in bypassing the + %% buffer workers, and each buffer worker index is used when + %% collecting gauge metrics, we use this dummy index. If this + %% call ends up calling buffering functions, that's a bug and + %% would mess up the metrics anyway. `undefined' is ignored by + %% `emqx_resource_metrics:*_shift/3'. + Index = undefined, + Result = call_query(async, Id, Index, ?QUERY(ReplyFun, Request, false), #{}), _ = handle_query_result(Id, Result, false, false), Result. 
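%% [Illustrative note, not part of the patch]
%% What it shows: the no-op the comments above rely on: a shift against an
%% `undefined' worker id is dropped by emqx_resource_metrics:batching_shift/3,
%% so the dummy Index used by the simple_* paths never creates gauge rows.
%% Assumption: the resource id is an example value.
ok = emqx_resource_metrics:batching_shift(<<"bridge:demo">>, undefined, 1).
%% [end of note]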
@@ -133,9 +147,11 @@ init({Id, Index, Opts}) -> false -> undefined end, - emqx_resource_metrics:queuing_change(Id, queue_count(Queue)), + emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)), + emqx_resource_metrics:batching_set(Id, Index, 0), + emqx_resource_metrics:inflight_set(Id, Index, 0), InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT), - ok = inflight_new(Name, InfltWinSZ), + ok = inflight_new(Name, InfltWinSZ, Id, Index), HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), St = #{ id => Id, @@ -158,10 +174,12 @@ running(cast, resume, _St) -> keep_state_and_data; running(cast, block, St) -> {next_state, blocked, St}; -running(cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, queue := Q} = St) when +running( + cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, index := Index, queue := Q} = St +) when is_list(Batch) -> - Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]), + Q1 = maybe_append_queue(Id, Index, Q, [?Q_ITEM(Query) || Query <- Batch]), {next_state, blocked, St#{queue := Q1}}; running({call, From}, {query, Request, _Opts}, St) -> query_or_acc(From, Request, St); @@ -180,28 +198,39 @@ blocked(enter, _, #{resume_interval := ResumeT} = _St) -> {keep_state_and_data, {state_timeout, ResumeT, resume}}; blocked(cast, block, _St) -> keep_state_and_data; -blocked(cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, queue := Q} = St) when +blocked( + cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, index := Index, queue := Q} = St +) when is_list(Batch) -> - Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]), + Q1 = maybe_append_queue(Id, Index, Q, [?Q_ITEM(Query) || Query <- Batch]), {keep_state, St#{queue := Q1}}; blocked(cast, resume, St) -> do_resume(St); blocked(state_timeout, resume, St) -> do_resume(St); -blocked({call, From}, {query, Request, _Opts}, #{id := Id, queue := Q} = St) -> +blocked({call, From}, {query, Request, _Opts}, #{id := Id, index := Index, queue := Q} = St) -> Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), _ = reply_caller(Id, ?REPLY(From, Request, false, Error)), - {keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(From, Request, false))])}}; -blocked(cast, {query, Request, Opts}, #{id := Id, queue := Q} = St) -> + {keep_state, St#{ + queue := maybe_append_queue(Id, Index, Q, [?Q_ITEM(?QUERY(From, Request, false))]) + }}; +blocked(cast, {query, Request, Opts}, #{id := Id, index := Index, queue := Q} = St) -> ReplyFun = maps:get(async_reply_fun, Opts, undefined), Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), _ = reply_caller(Id, ?REPLY(ReplyFun, Request, false, Error)), {keep_state, St#{ - queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(ReplyFun, Request, false))]) + queue := maybe_append_queue(Id, Index, Q, [?Q_ITEM(?QUERY(ReplyFun, Request, false))]) }}. -terminate(_Reason, #{id := Id, index := Index}) -> +terminate(_Reason, #{id := Id, index := Index, queue := Q}) -> + GaugeFns = + [ + fun emqx_resource_metrics:batching_set/3, + fun emqx_resource_metrics:inflight_set/3 + ], + lists:foreach(fun(Fn) -> Fn(Id, Index, 0) end, GaugeFns), + emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)), gproc_pool:disconnect_worker(Id, {Id, Index}). 
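%% [Illustrative sketch, not part of the patch]
%% What it shows: the gauge lifecycle this patch establishes for a buffer
%% worker: init/1 publishes absolute starting values (queuing from
%% replayq:count/1, batching and inflight at 0) and terminate/2 zeroes the
%% same gauges again. Because every value is keyed by the worker's Index, a
%% restarted worker overwrites its own rows instead of applying deltas to a
%% shared counter, which is what previously produced negative metrics.
%% Assumption: Id, Index and Q stand for an example resource id, worker index
%% and replayq handle.
restart_gauges_sketch(Id, Index, Q) ->
    emqx_resource_metrics:queuing_set(Id, Index, replayq:count(Q)),
    emqx_resource_metrics:batching_set(Id, Index, 0),
    emqx_resource_metrics:inflight_set(Id, Index, 0).
%% [end of sketch]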
code_change(_OldVsn, State, _Extra) -> @@ -240,24 +269,33 @@ do_resume(#{id := Id, name := Name} = St) -> retry_queue(#{queue := undefined} = St) -> {next_state, running, St}; -retry_queue(#{queue := Q, id := Id, enable_batch := false, resume_interval := ResumeT} = St) -> +retry_queue( + #{ + queue := Q, + id := Id, + index := Index, + enable_batch := false, + resume_interval := ResumeT + } = St +) -> case get_first_n_from_queue(Q, 1) of [] -> {next_state, running, St}; [?QUERY(_, Request, HasSent) = Query] -> QueryOpts = #{inflight_name => maps:get(name, St)}, - Result = call_query(configured, Id, Query, QueryOpts), + Result = call_query(configured, Id, Index, Query, QueryOpts), case reply_caller(Id, ?REPLY(undefined, Request, HasSent, Result)) of true -> {keep_state, St, {state_timeout, ResumeT, resume}}; false -> - retry_queue(St#{queue := drop_head(Q, Id)}) + retry_queue(St#{queue := drop_head(Q, Id, Index)}) end end; retry_queue( #{ queue := Q, id := Id, + index := Index, enable_batch := true, batch_size := BatchSize, resume_interval := ResumeT @@ -268,7 +306,7 @@ retry_queue( {next_state, running, St}; Batch0 -> QueryOpts = #{inflight_name => maps:get(name, St)}, - Result = call_query(configured, Id, Batch0, QueryOpts), + Result = call_query(configured, Id, Index, Batch0, QueryOpts), %% The caller has been replied with ?RESOURCE_ERROR(blocked, _) before saving into the queue, %% we now change the 'from' field to 'undefined' so it will not reply the caller again. Batch = [?QUERY(undefined, Request, HasSent) || ?QUERY(_, Request, HasSent) <- Batch0], @@ -276,41 +314,55 @@ retry_queue( true -> {keep_state, St, {state_timeout, ResumeT, resume}}; false -> - retry_queue(St#{queue := drop_first_n_from_queue(Q, length(Batch), Id)}) + retry_queue(St#{queue := drop_first_n_from_queue(Q, length(Batch), Id, Index)}) end end. retry_inflight_sync( - Id, Ref, ?QUERY(_, _, HasSent) = Query, Name, #{resume_interval := ResumeT} = St0 + Id, + Ref, + ?QUERY(_, _, HasSent) = Query, + Name, + #{index := Index, resume_interval := ResumeT} = St0 ) -> - Result = call_query(sync, Id, Query, #{}), + Result = call_query(sync, Id, Index, Query, #{}), case handle_query_result(Id, Result, HasSent, false) of %% Send failed because resource down true -> {keep_state, St0, {state_timeout, ResumeT, resume}}; %% Send ok or failed but the resource is working false -> - inflight_drop(Name, Ref), + inflight_drop(Name, Ref, Id, Index), do_resume(St0) end. 
-query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) -> +query_or_acc( + From, + Request, + #{ + enable_batch := true, + acc := Acc, + acc_left := Left, + index := Index, + id := Id + } = St0 +) -> Acc1 = [?QUERY(From, Request, false) | Acc], - emqx_resource_metrics:batching_change(Id, 1), + emqx_resource_metrics:batching_shift(Id, Index, 1), St = St0#{acc := Acc1, acc_left := Left - 1}, case Left =< 1 of true -> flush(St); false -> {keep_state, ensure_flush_timer(St)} end; -query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id} = St) -> +query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id, index := Index} = St) -> QueryOpts = #{ inflight_name => maps:get(name, St) }, - Result = call_query(configured, Id, ?QUERY(From, Request, false), QueryOpts), + Result = call_query(configured, Id, Index, ?QUERY(From, Request, false), QueryOpts), case reply_caller(Id, ?REPLY(From, Request, false, Result)) of true -> Query = ?QUERY(From, Request, false), - {next_state, blocked, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(Query)])}}; + {next_state, blocked, St#{queue := maybe_append_queue(Id, Index, Q, [?Q_ITEM(Query)])}}; false -> {keep_state, St} end. @@ -320,6 +372,7 @@ flush(#{acc := []} = St) -> flush( #{ id := Id, + index := Index, acc := Batch0, batch_size := Size, queue := Q0 @@ -329,12 +382,12 @@ flush( QueryOpts = #{ inflight_name => maps:get(name, St) }, - emqx_resource_metrics:batching_change(Id, -length(Batch)), - Result = call_query(configured, Id, Batch, QueryOpts), + emqx_resource_metrics:batching_shift(Id, Index, -length(Batch)), + Result = call_query(configured, Id, Index, Batch, QueryOpts), St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}), case batch_reply_caller(Id, Result, Batch) of true -> - Q1 = maybe_append_queue(Id, Q0, [?Q_ITEM(Query) || Query <- Batch]), + Q1 = maybe_append_queue(Id, Index, Q0, [?Q_ITEM(Query) || Query <- Batch]), {next_state, blocked, St1#{queue := Q1}}; false -> {keep_state, St1} @@ -412,7 +465,7 @@ handle_query_result(Id, Result, HasSent, BlockWorker) -> inc_sent_success(Id, HasSent), BlockWorker. -call_query(QM0, Id, Query, QueryOpts) -> +call_query(QM0, Id, Index, Query, QueryOpts) -> ?tp(call_query_enter, #{id => Id, query => Query}), case emqx_resource_manager:ets_lookup(Id) of {ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} -> @@ -423,7 +476,7 @@ call_query(QM0, Id, Query, QueryOpts) -> end, CM = maps:get(callback_mode, Data), emqx_resource_metrics:matched_inc(Id), - apply_query_fun(call_mode(QM, CM), Mod, Id, Query, ResSt, QueryOpts); + apply_query_fun(call_mode(QM, CM), Mod, Id, Index, Query, ResSt, QueryOpts); {ok, _Group, #{status := stopped}} -> emqx_resource_metrics:matched_inc(Id), ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); @@ -452,10 +505,10 @@ call_query(QM0, Id, Query, QueryOpts) -> end ). 
-apply_query_fun(sync, Mod, Id, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) -> +apply_query_fun(sync, Mod, Id, _Index, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) -> ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}), ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request); -apply_query_fun(async, Mod, Id, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> +apply_query_fun(async, Mod, Id, Index, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), Name = maps:get(inflight_name, QueryOpts, undefined), ?APPLY_RESOURCE( @@ -464,21 +517,20 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) true -> {async_return, inflight_full}; false -> - ok = emqx_resource_metrics:inflight_change(Id, 1), - ReplyFun = fun ?MODULE:reply_after_query/6, + ReplyFun = fun ?MODULE:reply_after_query/7, Ref = make_message_ref(), - Args = [self(), Id, Name, Ref, Query], - ok = inflight_append(Name, Ref, Query), + Args = [self(), Id, Index, Name, Ref, Query], + ok = inflight_append(Name, Ref, Query, Id, Index), Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt), {async_return, Result} end, Request ); -apply_query_fun(sync, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) -> +apply_query_fun(sync, Mod, Id, _Index, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) -> ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Requests = [Request || ?QUERY(_From, Request, _) <- Batch], ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch); -apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> +apply_query_fun(async, Mod, Id, Index, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Name = maps:get(inflight_name, QueryOpts, undefined), ?APPLY_RESOURCE( @@ -487,55 +539,46 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) true -> {async_return, inflight_full}; false -> - BatchLen = length(Batch), - ok = emqx_resource_metrics:inflight_change(Id, BatchLen), - ReplyFun = fun ?MODULE:batch_reply_after_query/6, + ReplyFun = fun ?MODULE:batch_reply_after_query/7, Ref = make_message_ref(), - Args = {ReplyFun, [self(), Id, Name, Ref, Batch]}, + Args = {ReplyFun, [self(), Id, Index, Name, Ref, Batch]}, Requests = [Request || ?QUERY(_From, Request, _) <- Batch], - ok = inflight_append(Name, Ref, Batch), + ok = inflight_append(Name, Ref, Batch, Id, Index), Result = Mod:on_batch_query_async(Id, Requests, Args, ResSt), {async_return, Result} end, Batch ). -reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request, HasSent), Result) -> - %% NOTE: 'inflight' is message count that sent async but no ACK received, - %% NOT the message number ququed in the inflight window. - emqx_resource_metrics:inflight_change(Id, -1), +reply_after_query(Pid, Id, Index, Name, Ref, ?QUERY(From, Request, HasSent), Result) -> + %% NOTE: 'inflight' is the count of messages that were sent async + %% but received no ACK, NOT the number of messages queued in the + %% inflight window. 
case reply_caller(Id, ?REPLY(From, Request, HasSent, Result)) of true -> - %% we marked these messages are 'queuing' although they are actually - %% keeped in inflight window, not replayq - emqx_resource_metrics:queuing_change(Id, 1), ?MODULE:block(Pid); false -> - drop_inflight_and_resume(Pid, Name, Ref) + drop_inflight_and_resume(Pid, Name, Ref, Id, Index) end. -batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) -> - %% NOTE: 'inflight' is message count that sent async but no ACK received, - %% NOT the message number ququed in the inflight window. - BatchLen = length(Batch), - emqx_resource_metrics:inflight_change(Id, -BatchLen), +batch_reply_after_query(Pid, Id, Index, Name, Ref, Batch, Result) -> + %% NOTE: 'inflight' is the count of messages that were sent async + %% but received no ACK, NOT the number of messages queued in the + %% inflight window. case batch_reply_caller(Id, Result, Batch) of true -> - %% we marked these messages are 'queuing' although they are actually - %% kept in inflight window, not replayq - emqx_resource_metrics:queuing_change(Id, BatchLen), ?MODULE:block(Pid); false -> - drop_inflight_and_resume(Pid, Name, Ref) + drop_inflight_and_resume(Pid, Name, Ref, Id, Index) end. -drop_inflight_and_resume(Pid, Name, Ref) -> +drop_inflight_and_resume(Pid, Name, Ref, Id, Index) -> case inflight_is_full(Name) of true -> - inflight_drop(Name, Ref), + inflight_drop(Name, Ref, Id, Index), ?MODULE:resume(Pid); false -> - inflight_drop(Name, Ref) + inflight_drop(Name, Ref, Id, Index) end. %%============================================================================== @@ -548,10 +591,10 @@ queue_item_marshaller(Bin) when is_binary(Bin) -> estimate_size(QItem) -> size(queue_item_marshaller(QItem)). -maybe_append_queue(Id, undefined, _Items) -> +maybe_append_queue(Id, _Index, undefined, _Items) -> emqx_resource_metrics:dropped_queue_not_enabled_inc(Id), undefined; -maybe_append_queue(Id, Q, Items) -> +maybe_append_queue(Id, Index, Q, Items) -> Q2 = case replayq:overflow(Q) of Overflow when Overflow =< 0 -> @@ -561,13 +604,13 @@ maybe_append_queue(Id, Q, Items) -> {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts), ok = replayq:ack(Q1, QAckRef), Dropped = length(Items2), - emqx_resource_metrics:queuing_change(Id, -Dropped), emqx_resource_metrics:dropped_queue_full_inc(Id), ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), Q1 end, - emqx_resource_metrics:queuing_change(Id, 1), - replayq:append(Q2, Items). + Q3 = replayq:append(Q2, Items), + emqx_resource_metrics:queuing_set(Id, Index, replayq:count(Q3)), + Q3. get_first_n_from_queue(Q, N) -> get_first_n_from_queue(Q, N, []). @@ -580,23 +623,23 @@ get_first_n_from_queue(Q, N, Acc) when N > 0 -> ?Q_ITEM(Query) -> get_first_n_from_queue(Q, N - 1, [Query | Acc]) end. -drop_first_n_from_queue(Q, 0, _Id) -> +drop_first_n_from_queue(Q, 0, _Id, _Index) -> Q; -drop_first_n_from_queue(Q, N, Id) when N > 0 -> - drop_first_n_from_queue(drop_head(Q, Id), N - 1, Id). +drop_first_n_from_queue(Q, N, Id, Index) when N > 0 -> + drop_first_n_from_queue(drop_head(Q, Id, Index), N - 1, Id, Index). -drop_head(Q, Id) -> - {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}), - ok = replayq:ack(Q1, AckRef), - emqx_resource_metrics:queuing_change(Id, -1), - Q1. +drop_head(Q, Id, Index) -> + {NewQ, AckRef, _} = replayq:pop(Q, #{count_limit => 1}), + ok = replayq:ack(NewQ, AckRef), + emqx_resource_metrics:queuing_set(Id, Index, replayq:count(NewQ)), + NewQ. 
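%% [Illustrative note, not part of the patch]
%% What it shows: the bookkeeping convention used by the inflight window
%% defined in the section below. The table holds one metadata row
%% (?SIZE_REF -> {max_size, N}) plus one row per in-flight request or batch,
%% so inflight_size/1 subtracts the metadata row and a batch occupies a single
%% slot; that is also why the `inflight' gauge is documented as counting
%% batches. Example rows for a window of size 2 that is currently full
%% (refs and requests are placeholders):
%%   {-1,   {max_size, 2}}
%%   {Ref1, ?QUERY(From1, Req1, true)}
%%   {Ref2, [?QUERY(From2, Req2, true), ?QUERY(From3, Req3, true)]}
%% [end of note]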
%%============================================================================== %% the inflight queue for async query -define(SIZE_REF, -1). -inflight_new(Name, InfltWinSZ) -> +inflight_new(Name, InfltWinSZ, Id, Index) -> _ = ets:new(Name, [named_table, ordered_set, public, {write_concurrency, true}]), - inflight_append(Name, ?SIZE_REF, {max_size, InfltWinSZ}), + inflight_append(Name, ?SIZE_REF, {max_size, InfltWinSZ}, Id, Index), ok. inflight_get_first(Name) -> @@ -617,27 +660,39 @@ inflight_is_full(undefined) -> false; inflight_is_full(Name) -> [{_, {max_size, MaxSize}}] = ets:lookup(Name, ?SIZE_REF), + Size = inflight_size(Name), + Size >= MaxSize. + +inflight_size(Name) -> + %% Note: we subtract 1 because there's a metadata row that hold + %% the maximum size value. + MetadataRowCount = 1, case ets:info(Name, size) of - Size when Size > MaxSize -> true; - _ -> false + undefined -> 0; + Size -> max(0, Size - MetadataRowCount) end. -inflight_append(undefined, _Ref, _Query) -> +inflight_append(undefined, _Ref, _Query, _Id, _Index) -> ok; -inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch) -> +inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch, Id, Index) -> ets:insert(Name, {Ref, [?QUERY(From, Req, true) || ?QUERY(From, Req, _) <- Batch]}), + emqx_resource_metrics:inflight_set(Id, Index, inflight_size(Name)), ok; -inflight_append(Name, Ref, ?QUERY(From, Req, _)) -> +inflight_append(Name, Ref, ?QUERY(From, Req, _), Id, Index) -> ets:insert(Name, {Ref, ?QUERY(From, Req, true)}), + emqx_resource_metrics:inflight_set(Id, Index, inflight_size(Name)), ok; -inflight_append(Name, Ref, Data) -> +inflight_append(Name, Ref, Data, _Id, _Index) -> ets:insert(Name, {Ref, Data}), + %% this is a metadata row being inserted; therefore, we don't bump + %% the inflight metric. ok. -inflight_drop(undefined, _) -> +inflight_drop(undefined, _, _Id, _Index) -> ok; -inflight_drop(Name, Ref) -> +inflight_drop(Name, Ref, Id, Index) -> ets:delete(Name, Ref), + emqx_resource_metrics:inflight_set(Id, Index, inflight_size(Name)), ok. %%============================================================================== diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 107ca2a93..8a8b5aec2 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -39,6 +39,7 @@ groups() -> init_per_testcase(_, Config) -> emqx_connector_demo:set_callback_mode(always_sync), Config. + end_per_testcase(_, _Config) -> _ = emqx_resource:remove(?ID). 
@@ -503,7 +504,10 @@ t_stop_start(_) -> ), %% add some metrics to test their persistence - emqx_resource_metrics:batching_change(?ID, 5), + WorkerID0 = <<"worker:0">>, + WorkerID1 = <<"worker:1">>, + emqx_resource_metrics:batching_set(?ID, WorkerID0, 2), + emqx_resource_metrics:batching_set(?ID, WorkerID1, 3), ?assertEqual(5, emqx_resource_metrics:batching_get(?ID)), {ok, _} = emqx_resource:check_and_recreate( @@ -537,7 +541,8 @@ t_stop_start(_) -> ?assert(is_process_alive(Pid1)), %% now stop while resetting the metrics - emqx_resource_metrics:batching_change(?ID, 5), + emqx_resource_metrics:batching_set(?ID, WorkerID0, 1), + emqx_resource_metrics:batching_set(?ID, WorkerID1, 4), ?assertEqual(5, emqx_resource_metrics:batching_get(?ID)), ok = emqx_resource:stop(?ID), ?assertEqual(0, emqx_resource_metrics:batching_get(?ID)), diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 6145716f2..a4ee994b5 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -315,11 +315,11 @@ handle_telemetry_event( emqx_resource_metrics:dropped_queue_full_inc(ID, Val); handle_telemetry_event( [wolff, queuing], - #{counter_inc := Val}, - #{bridge_id := ID}, + #{counter_inc := _Val}, + #{bridge_id := ID, partition_id := PartitionID}, _HandlerConfig -) when is_integer(Val) -> - emqx_resource_metrics:queuing_change(ID, Val); +) when is_integer(_Val) -> + emqx_resource_metrics:queuing_set(ID, PartitionID, 0); handle_telemetry_event( [wolff, retried], #{counter_inc := Val}, @@ -336,11 +336,11 @@ handle_telemetry_event( emqx_resource_metrics:failed_inc(ID, Val); handle_telemetry_event( [wolff, inflight], - #{counter_inc := Val}, - #{bridge_id := ID}, + #{counter_inc := _Val}, + #{bridge_id := ID, partition_id := PartitionID}, _HandlerConfig -) when is_integer(Val) -> - emqx_resource_metrics:inflight_change(ID, Val); +) when is_integer(_Val) -> + emqx_resource_metrics:inflight_set(ID, PartitionID, 0); handle_telemetry_event( [wolff, retried_failed], #{counter_inc := Val}, diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index 0f4500a7d..4912dca30 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -124,6 +124,7 @@ init_per_testcase(TestCase, Config0) when delete_all_bridges(), Tid = install_telemetry_handler(TestCase), Config = generate_config(Config0), + put(telemetry_table, Tid), [{telemetry_table, Tid} | Config]; false -> {skip, no_batching} @@ -133,6 +134,7 @@ init_per_testcase(TestCase, Config0) -> delete_all_bridges(), Tid = install_telemetry_handler(TestCase), Config = generate_config(Config0), + put(telemetry_table, Tid), [{telemetry_table, Tid} | Config]. end_per_testcase(_TestCase, _Config) -> @@ -393,7 +395,11 @@ assert_metrics(ExpectedMetrics, ResourceId) -> maps:keys(ExpectedMetrics) ), CurrentMetrics = current_metrics(ResourceId), - ?assertEqual(ExpectedMetrics, Metrics, #{current_metrics => CurrentMetrics}), + TelemetryTable = get(telemetry_table), + RecordedEvents = ets:tab2list(TelemetryTable), + ?assertEqual(ExpectedMetrics, Metrics, #{ + current_metrics => CurrentMetrics, recorded_events => RecordedEvents + }), ok. 
 
 assert_empty_metrics(ResourceId) ->
@@ -554,6 +560,7 @@ t_publish_success(Config) ->
     ResourceId = ?config(resource_id, Config),
     ServiceAccountJSON = ?config(service_account_json, Config),
     TelemetryTable = ?config(telemetry_table, Config),
+    QueryMode = ?config(query_mode, Config),
     Topic = <<"t/topic">>,
     ?check_trace(
         create_bridge(Config),
@@ -582,6 +589,17 @@ t_publish_success(Config) ->
     ),
     %% to avoid test flakiness
     wait_telemetry_event(TelemetryTable, success, ResourceId),
+    ExpectedInflightEvents =
+        case QueryMode of
+            sync -> 1;
+            async -> 3
+        end,
+    wait_telemetry_event(
+        TelemetryTable,
+        inflight,
+        ResourceId,
+        #{n_events => ExpectedInflightEvents, timeout => 5_000}
+    ),
     assert_metrics(
         #{
             batching => 0,
@@ -601,6 +619,7 @@ t_publish_success_local_topic(Config) ->
     ResourceId = ?config(resource_id, Config),
     ServiceAccountJSON = ?config(service_account_json, Config),
     TelemetryTable = ?config(telemetry_table, Config),
+    QueryMode = ?config(query_mode, Config),
     LocalTopic = <<"local/topic">>,
     {ok, _} = create_bridge(Config, #{<<"local_topic">> => LocalTopic}),
     assert_empty_metrics(ResourceId),
@@ -619,6 +638,17 @@ t_publish_success_local_topic(Config) ->
     ),
     %% to avoid test flakiness
     wait_telemetry_event(TelemetryTable, success, ResourceId),
+    ExpectedInflightEvents =
+        case QueryMode of
+            sync -> 1;
+            async -> 3
+        end,
+    wait_telemetry_event(
+        TelemetryTable,
+        inflight,
+        ResourceId,
+        #{n_events => ExpectedInflightEvents, timeout => 5_000}
+    ),
     assert_metrics(
         #{
             batching => 0,
@@ -649,6 +679,7 @@ t_publish_templated(Config) ->
     ResourceId = ?config(resource_id, Config),
     ServiceAccountJSON = ?config(service_account_json, Config),
     TelemetryTable = ?config(telemetry_table, Config),
+    QueryMode = ?config(query_mode, Config),
     Topic = <<"t/topic">>,
     PayloadTemplate = <<
         "{\"payload\": \"${payload}\","
@@ -694,6 +725,17 @@ t_publish_templated(Config) ->
     ),
     %% to avoid test flakiness
     wait_telemetry_event(TelemetryTable, success, ResourceId),
+    ExpectedInflightEvents =
+        case QueryMode of
+            sync -> 1;
+            async -> 3
+        end,
+    wait_telemetry_event(
+        TelemetryTable,
+        inflight,
+        ResourceId,
+        #{n_events => ExpectedInflightEvents, timeout => 5_000}
+    ),
     assert_metrics(
         #{
             batching => 0,
@@ -1053,7 +1095,7 @@ do_econnrefused_or_timeout_test(Config, Error) ->
             %% response expired, this succeeds.
             {econnrefused, async, _} ->
                 wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{
-                    timeout => 10_000, n_events => 2
+                    timeout => 10_000, n_events => 1
                 }),
                 CurrentMetrics = current_metrics(ResourceId),
                 RecordedEvents = ets:tab2list(TelemetryTable),
@@ -1290,6 +1332,17 @@ t_unrecoverable_error(Config) ->
         end
     ),
     wait_telemetry_event(TelemetryTable, failed, ResourceId),
+    ExpectedInflightEvents =
+        case QueryMode of
+            sync -> 1;
+            async -> 3
+        end,
+    wait_telemetry_event(
+        TelemetryTable,
+        inflight,
+        ResourceId,
+        #{n_events => ExpectedInflightEvents, timeout => 5_000}
+    ),
     assert_metrics(
         #{
             batching => 0,

From 61246c43c48f124e8e4ff97e2fc1c3cad61d61ba Mon Sep 17 00:00:00 2001
From: Thales Macedo Garitezi
Date: Thu, 29 Dec 2022 17:02:14 -0300
Subject: [PATCH 7/9] fix(kafka_producer): prevent multiple producers from
 multiplying each other's metrics

---
 .../kafka/emqx_bridge_impl_kafka_producer.erl | 61 +++++++++++--------
 .../emqx_bridge_impl_kafka_producer_SUITE.erl |  4 +-
 2 files changed, 39 insertions(+), 26 deletions(-)

diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl
index a4ee994b5..94aff0714 100644
--- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl
+++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl
@@ -33,7 +33,10 @@ on_start(InstId, Config) ->
         authentication := Auth,
         ssl := SSL
     } = Config,
-    _ = maybe_install_wolff_telemetry_handlers(InstId),
+    %% TODO: change this to `kafka_producer` after refactoring for kafka_consumer
+    BridgeType = kafka,
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
+    _ = maybe_install_wolff_telemetry_handlers(InstId, ResourceID),
     %% it's a bug if producer config is not found
     %% the caller should not try to start a producer if
     %% there is no producer config
@@ -137,7 +140,7 @@ on_query(_InstId, {send_message, Message}, #{message_template := Template, produ
     %% If the producer process is down when sending, this function would
     %% raise an error exception which is to be caught by the caller of this callback
     {_Partition, _Pid} = wolff:send(Producers, [KafkaMessage], {fun ?MODULE:on_kafka_ack/3, [#{}]}),
-    ok.
+    {async_return, ok}.
 
 compile_message_template(#{
     key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate
@@ -299,62 +302,72 @@ get_required(Field, Config, Throw) ->
     Value =:= none andalso throw(Throw),
     Value.
+%% we *must* match the bridge id in the event metadata with that in
+%% the handler config; otherwise, multiple kafka producer bridges will
+%% install multiple handlers to the same wolff events, multiplying the
+%% metric counts.
 handle_telemetry_event(
     [wolff, dropped],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:dropped_inc(ID, Val);
 handle_telemetry_event(
     [wolff, dropped_queue_full],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:dropped_queue_full_inc(ID, Val);
 handle_telemetry_event(
     [wolff, queuing],
-    #{counter_inc := _Val},
+    #{gauge_set := Val},
     #{bridge_id := ID, partition_id := PartitionID},
-    _HandlerConfig
-) when is_integer(_Val) ->
-    emqx_resource_metrics:queuing_set(ID, PartitionID, 0);
+    #{bridge_id := ID}
+) when is_integer(Val) ->
+    emqx_resource_metrics:queuing_set(ID, PartitionID, Val);
 handle_telemetry_event(
     [wolff, retried],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:retried_inc(ID, Val);
 handle_telemetry_event(
     [wolff, failed],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:failed_inc(ID, Val);
 handle_telemetry_event(
     [wolff, inflight],
-    #{counter_inc := _Val},
+    #{gauge_set := Val},
     #{bridge_id := ID, partition_id := PartitionID},
-    _HandlerConfig
-) when is_integer(_Val) ->
-    emqx_resource_metrics:inflight_set(ID, PartitionID, 0);
+    #{bridge_id := ID}
+) when is_integer(Val) ->
+    emqx_resource_metrics:inflight_set(ID, PartitionID, Val);
 handle_telemetry_event(
     [wolff, retried_failed],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:retried_failed_inc(ID, Val);
 handle_telemetry_event(
     [wolff, retried_success],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:retried_success_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, success],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    #{bridge_id := ID}
+) when is_integer(Val) ->
+    emqx_resource_metrics:success_inc(ID, Val);
 handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) ->
     %% Event that we do not handle
     ok.
 
@@ -367,17 +380,12 @@ uninstall_telemetry_handlers(InstanceID) ->
     HandlerID = telemetry_handler_id(InstanceID),
     telemetry:detach(HandlerID).
 
-maybe_install_wolff_telemetry_handlers(InstanceID) ->
+maybe_install_wolff_telemetry_handlers(InstanceID, ResourceID) ->
     %% Attach event handlers for Kafka telemetry events. If a handler with the
     %% handler id already exists, the attach_many function does nothing
     telemetry:attach_many(
         %% unique handler id
         telemetry_handler_id(InstanceID),
-        %% Note: we don't handle `[wolff, success]' because,
-        %% currently, we already increment the success counter for
-        %% this resource at `emqx_rule_runtime:handle_action' when
-        %% the response is `ok' and we would double increment it
-        %% here.
         [
             [wolff, dropped],
             [wolff, dropped_queue_full],
             [wolff, queuing],
             [wolff, retried],
             [wolff, failed],
             [wolff, inflight],
             [wolff, retried_failed],
-            [wolff, retried_success]
+            [wolff, retried_success],
+            [wolff, success]
         ],
         fun ?MODULE:handle_telemetry_event/4,
-        []
+        %% we *must* keep track of the same id that is handed down to
+        %% wolff producers; otherwise, multiple kafka producer bridges
+        %% will install multiple handlers to the same wolff events,
+        %% multiplying the metric counts...
+        #{bridge_id => ResourceID}
     ).
diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl
index 44149826d..8ada818e2 100644
--- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl
+++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl
@@ -390,7 +390,7 @@ t_failed_creation_then_fix(_Config) ->
     },
     {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
     ct:pal("base offset before testing ~p", [Offset]),
-    ?assertEqual(ok, ?PRODUCER:on_query(ResourceId, {send_message, Msg}, State)),
+    ?assertEqual({async_return, ok}, ?PRODUCER:on_query(ResourceId, {send_message, Msg}, State)),
     {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
     ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
     %% TODO: refactor those into init/end per testcase
@@ -455,7 +455,7 @@ publish_helper(#{
     StartRes = ?PRODUCER:on_start(InstId, Conf),
     {ok, State} = StartRes,
     OnQueryRes = ?PRODUCER:on_query(InstId, {send_message, Msg}, State),
-    ok = OnQueryRes,
+    {async_return, ok} = OnQueryRes,
     {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
     ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
     ok = ?PRODUCER:on_stop(InstId, State),

From 305ed68916aef0e579e847bacd6febb742cd2ffe Mon Sep 17 00:00:00 2001
From: Thales Macedo Garitezi
Date: Thu, 29 Dec 2022 18:04:54 -0300
Subject: [PATCH 8/9] chore: bump app vsns

---
 apps/emqx/src/emqx.app.src                   | 2 +-
 apps/emqx_bridge/src/emqx_bridge.app.src     | 2 +-
 apps/emqx_resource/src/emqx_resource.app.src | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src
index c281b11cc..bd7617e74 100644
--- a/apps/emqx/src/emqx.app.src
+++ b/apps/emqx/src/emqx.app.src
@@ -3,7 +3,7 @@
     {id, "emqx"},
     {description, "EMQX Core"},
     % strict semver, bump manually!
-    {vsn, "5.0.13"},
+    {vsn, "5.0.14"},
     {modules, []},
     {registered, []},
     {applications, [
diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src
index 86ab01a97..89fb7adaf 100644
--- a/apps/emqx_bridge/src/emqx_bridge.app.src
+++ b/apps/emqx_bridge/src/emqx_bridge.app.src
@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge, [
     {description, "EMQX bridges"},
-    {vsn, "0.1.7"},
+    {vsn, "0.1.8"},
     {registered, []},
     {mod, {emqx_bridge_app, []}},
     {applications, [
diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src
index 78f5d8342..00389261b 100644
--- a/apps/emqx_resource/src/emqx_resource.app.src
+++ b/apps/emqx_resource/src/emqx_resource.app.src
@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_resource, [
     {description, "Manager for all external resources"},
-    {vsn, "0.1.3"},
+    {vsn, "0.1.4"},
     {registered, []},
     {mod, {emqx_resource_app, []}},
     {applications, [

From 69ac6b9e0dbeb8858019fd92dc33ed700b5e3cf4 Mon Sep 17 00:00:00 2001
From: Thales Macedo Garitezi
Date: Mon, 2 Jan 2023 09:23:38 -0300
Subject: [PATCH 9/9] fix(kafka): fix handling of `dropped.queue_full` event
 from wolff

https://emqx.atlassian.net/browse/EMQX-8530

https://github.com/kafka4beam/wolff/blob/cd20a37e658f4ae3d1468ca20e7d302822ee85dd/src/wolff_producer.erl#L772-L773

Wolff emits 2 events related to dropped messages when replayq reports
overflow. Since on EMQX's side we also bump `dropped` when
`dropped_queue_full` happens, that was leading to wrong metrics.
---
 .../src/kafka/emqx_bridge_impl_kafka_producer.erl | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl
index 94aff0714..ada443019 100644
--- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl
+++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl
@@ -318,7 +318,14 @@ handle_telemetry_event(
     #{bridge_id := ID},
     #{bridge_id := ID}
 ) when is_integer(Val) ->
-    emqx_resource_metrics:dropped_queue_full_inc(ID, Val);
+    %% When wolff emits a `dropped_queue_full' event due to replayq
+    %% overflow, it also emits a `dropped' event (at the time of
+    %% writing, wolff is 1.7.4). Since we already bump `dropped' when
+    %% `dropped.queue_full' occurs, we have to correct it here. This
+    %% correction will have to be dropped if wolff stops also emitting
+    %% `dropped'.
+    emqx_resource_metrics:dropped_queue_full_inc(ID, Val),
+    emqx_resource_metrics:dropped_inc(ID, -Val);
 handle_telemetry_event(
     [wolff, queuing],
     #{gauge_set := Val},