refactor(metrics): use absolute gauge values rather than deltas

https://emqx.atlassian.net/browse/EMQX-8548

Currently, we face several issues trying to keep resource metrics
reasonable.  For example, when a resource is re-created, its metrics
are reset, but its durable queue then resumes its previous work,
which leads to strange (often negative) metrics.

Instead of using `counters` shared by more than one worker to manage
gauges, we introduce an ETS table whose key is scoped not only by the
Resource ID as before, but also by the worker ID.  This way, when a
worker starts or terminates, it sets its own gauges to their correct
values (often 0, or `replayq:count` when resuming from a queue).
With this scoping and initialization procedure, we should avoid
hitting those strange metrics scenarios and have better control over
the gauges.
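
To illustrate the resulting flow, here is a minimal sketch (not part
of the diff below) of how a buffer worker would initialize only its
own slice of the gauges on startup, with readers summing the
per-worker entries.  `init_worker_gauges/3` is a hypothetical helper
introduced for the example; `emqx_resource_metrics:queuing_set/3`,
`inflight_set/3`, `batching_set/3` and
`emqx_metrics_worker:get_gauge/3` are the functions added in this
change, and `?RES_METRICS` is the handler name that
`emqx_resource_metrics` reports under.

%% Hypothetical helper (example only): a buffer worker setting its own
%% gauge slice.  Id is the resource id, Index the worker id, Queue the
%% worker's replayq handle.
init_worker_gauges(Id, Index, Queue) ->
    emqx_resource_metrics:queuing_set(Id, Index, replayq:count(Queue)),
    emqx_resource_metrics:inflight_set(Id, Index, 0),
    emqx_resource_metrics:batching_set(Id, Index, 0).

%% Reading back sums the per-worker entries: with two workers holding
%% 5 and 7 inflight requests,
%%   12 = emqx_metrics_worker:get_gauge(?RES_METRICS, Id, 'inflight').

Because each ETS entry is keyed by {Id, Metric, WorkerId}, a
restarting worker only overwrites its own values instead of
incrementing a shared counter, which is what previously produced the
negative gauges.
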
Author: Thales Macedo Garitezi
Date:   2022-12-27 10:28:08 -03:00
Parent: 24bae2641e
Commit: 8b060a75f1
11 changed files with 625 additions and 179 deletions


@ -18,6 +18,8 @@
-behaviour(gen_server).
-include_lib("stdlib/include/ms_transform.hrl").
%% API functions
-export([
start_link/1,
@ -30,6 +32,11 @@
inc/3,
inc/4,
get/3,
get_gauge/3,
set_gauge/5,
shift_gauge/5,
get_gauges/2,
delete_gauges/2,
get_rate/2,
get_counters/2,
create_metrics/3,
@ -68,14 +75,21 @@
last5m := float()
}.
-type metrics() :: #{
counters := #{atom() => integer()},
rate := #{atom() => rate()}
counters := #{metric_name() => integer()},
gauges := #{metric_name() => integer()},
rate := #{metric_name() => rate()}
}.
-type handler_name() :: atom().
%% metric_id() is actually a resource id
-type metric_id() :: binary() | atom().
-type metric_name() :: atom().
-type worker_id() :: term().
-define(CntrRef(Name), {?MODULE, Name}).
-define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)).
-define(GAUGE_TABLE(NAME),
list_to_atom(atom_to_list(?MODULE) ++ "_" ++ atom_to_list(NAME) ++ "_gauge")
).
-record(rate, {
max = 0 :: number(),
@ -112,11 +126,12 @@ child_spec(ChldName, Name) ->
modules => [emqx_metrics_worker]
}.
-spec create_metrics(handler_name(), metric_id(), [atom()]) -> ok | {error, term()}.
-spec create_metrics(handler_name(), metric_id(), [metric_name()]) -> ok | {error, term()}.
create_metrics(Name, Id, Metrics) ->
create_metrics(Name, Id, Metrics, Metrics).
-spec create_metrics(handler_name(), metric_id(), [atom()], [atom()]) -> ok | {error, term()}.
-spec create_metrics(handler_name(), metric_id(), [metric_name()], [metric_name()]) ->
ok | {error, term()}.
create_metrics(Name, Id, Metrics, RateMetrics) ->
gen_server:call(Name, {create_metrics, Id, Metrics, RateMetrics}).
@ -135,7 +150,7 @@ has_metrics(Name, Id) ->
_ -> true
end.
-spec get(handler_name(), metric_id(), atom() | integer()) -> number().
-spec get(handler_name(), metric_id(), metric_name() | integer()) -> number().
get(Name, Id, Metric) ->
case get_ref(Name, Id) of
not_found ->
@ -167,16 +182,102 @@ reset_counters(Name, Id) ->
-spec get_metrics(handler_name(), metric_id()) -> metrics().
get_metrics(Name, Id) ->
#{rate => get_rate(Name, Id), counters => get_counters(Name, Id)}.
#{
rate => get_rate(Name, Id),
counters => get_counters(Name, Id),
gauges => get_gauges(Name, Id)
}.
-spec inc(handler_name(), metric_id(), atom()) -> ok.
inc(Name, Id, Metric) ->
inc(Name, Id, Metric, 1).
-spec inc(handler_name(), metric_id(), atom(), integer()) -> ok.
-spec inc(handler_name(), metric_id(), metric_name(), integer()) -> ok.
inc(Name, Id, Metric, Val) ->
counters:add(get_ref(Name, Id), idx_metric(Name, Id, Metric), Val).
-spec set_gauge(handler_name(), metric_id(), worker_id(), metric_name(), integer()) -> ok.
set_gauge(Name, Id, WorkerId, Metric, Val) ->
Table = ?GAUGE_TABLE(Name),
try
true = ets:insert(Table, {{Id, Metric, WorkerId}, Val}),
ok
catch
error:badarg ->
ok
end.
-spec shift_gauge(handler_name(), metric_id(), worker_id(), metric_name(), integer()) -> ok.
shift_gauge(Name, Id, WorkerId, Metric, Val) ->
Table = ?GAUGE_TABLE(Name),
try
_ = ets:update_counter(
Table,
{Id, Metric, WorkerId},
Val,
{{Id, Metric, WorkerId}, 0}
),
ok
catch
error:badarg ->
ok
end.
-spec get_gauge(handler_name(), metric_id(), metric_name()) -> integer().
get_gauge(Name, Id, Metric) ->
Table = ?GAUGE_TABLE(Name),
MatchSpec =
ets:fun2ms(
fun({{Id0, Metric0, _WorkerId}, Val}) when Id0 =:= Id, Metric0 =:= Metric ->
Val
end
),
try
lists:sum(ets:select(Table, MatchSpec))
catch
error:badarg ->
0
end.
-spec get_gauges(handler_name(), metric_id()) -> map().
get_gauges(Name, Id) ->
Table = ?GAUGE_TABLE(Name),
MatchSpec =
ets:fun2ms(
fun({{Id0, Metric, _WorkerId}, Val}) when Id0 =:= Id ->
{Metric, Val}
end
),
try
lists:foldr(
fun({Metric, Val}, Acc) ->
maps:update_with(Metric, fun(X) -> X + Val end, Val, Acc)
end,
#{},
ets:select(Table, MatchSpec)
)
catch
error:badarg ->
#{}
end.
-spec delete_gauges(handler_name(), metric_id()) -> ok.
delete_gauges(Name, Id) ->
Table = ?GAUGE_TABLE(Name),
MatchSpec =
ets:fun2ms(
fun({{Id0, _Metric, _WorkerId}, _Val}) when Id0 =:= Id ->
true
end
),
try
_ = ets:select_delete(Table, MatchSpec),
ok
catch
error:badarg ->
ok
end.
start_link(Name) ->
gen_server:start_link({local, Name}, ?MODULE, Name, []).
@ -185,6 +286,7 @@ init(Name) ->
%% the rate metrics
erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
persistent_term:put(?CntrRef(Name), #{}),
_ = ets:new(?GAUGE_TABLE(Name), [named_table, ordered_set, public, {write_concurrency, true}]),
{ok, #state{}}.
handle_call({get_rate, _Id}, _From, State = #state{rates = undefined}) ->
@ -220,7 +322,10 @@ handle_call(
_From,
State = #state{metric_ids = MIDs, rates = Rates}
) ->
{reply, delete_counters(get_self_name(), Id), State#state{
Name = get_self_name(),
delete_counters(Name, Id),
delete_gauges(Name, Id),
{reply, ok, State#state{
metric_ids = sets:del_element(Id, MIDs),
rates =
case Rates of
@ -233,7 +338,9 @@ handle_call(
_From,
State = #state{rates = Rates}
) ->
{reply, reset_counters(get_self_name(), Id), State#state{
Name = get_self_name(),
delete_gauges(Name, Id),
{reply, reset_counters(Name, Id), State#state{
rates =
case Rates of
undefined ->


@ -47,7 +47,8 @@ end_per_testcase(_, _Config) ->
t_get_metrics(_) ->
Metrics = [a, b, c],
ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, Metrics),
Id = <<"testid">>,
ok = emqx_metrics_worker:create_metrics(?NAME, Id, Metrics),
%% all the metrics are set to zero at start
?assertMatch(
#{
@ -56,18 +57,22 @@ t_get_metrics(_) ->
b := #{current := 0.0, max := 0.0, last5m := 0.0},
c := #{current := 0.0, max := 0.0, last5m := 0.0}
},
gauges := #{},
counters := #{
a := 0,
b := 0,
c := 0
}
},
emqx_metrics_worker:get_metrics(?NAME, <<"testid">>)
emqx_metrics_worker:get_metrics(?NAME, Id)
),
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a),
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b),
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c),
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c),
ok = emqx_metrics_worker:inc(?NAME, Id, a),
ok = emqx_metrics_worker:inc(?NAME, Id, b),
ok = emqx_metrics_worker:inc(?NAME, Id, c),
ok = emqx_metrics_worker:inc(?NAME, Id, c),
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5),
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7),
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9),
ct:sleep(1500),
?LET(
#{
@ -76,27 +81,73 @@ t_get_metrics(_) ->
b := #{current := CurrB, max := MaxB, last5m := _},
c := #{current := CurrC, max := MaxC, last5m := _}
},
gauges := #{
inflight := Inflight,
queuing := Queuing
},
counters := #{
a := 1,
b := 1,
c := 2
}
},
emqx_metrics_worker:get_metrics(?NAME, <<"testid">>),
emqx_metrics_worker:get_metrics(?NAME, Id),
{
?assert(CurrA > 0),
?assert(CurrB > 0),
?assert(CurrC > 0),
?assert(MaxA > 0),
?assert(MaxB > 0),
?assert(MaxC > 0)
?assert(MaxC > 0),
?assert(Inflight == 12),
?assert(Queuing == 9)
}
),
ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>).
ok = emqx_metrics_worker:clear_metrics(?NAME, Id).
t_clear_metrics(_Config) ->
Metrics = [a, b, c],
Id = <<"testid">>,
ok = emqx_metrics_worker:create_metrics(?NAME, Id, Metrics),
?assertMatch(
#{
rate := #{
a := #{current := 0.0, max := 0.0, last5m := 0.0},
b := #{current := 0.0, max := 0.0, last5m := 0.0},
c := #{current := 0.0, max := 0.0, last5m := 0.0}
},
gauges := #{},
counters := #{
a := 0,
b := 0,
c := 0
}
},
emqx_metrics_worker:get_metrics(?NAME, Id)
),
ok = emqx_metrics_worker:inc(?NAME, Id, a),
ok = emqx_metrics_worker:inc(?NAME, Id, b),
ok = emqx_metrics_worker:inc(?NAME, Id, c),
ok = emqx_metrics_worker:inc(?NAME, Id, c),
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5),
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7),
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9),
ct:sleep(1500),
ok = emqx_metrics_worker:clear_metrics(?NAME, Id),
?assertEqual(
#{
counters => #{},
gauges => #{},
rate => #{current => 0.0, last5m => 0.0, max => 0.0}
},
emqx_metrics_worker:get_metrics(?NAME, Id)
),
ok.
t_reset_metrics(_) ->
Metrics = [a, b, c],
ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, Metrics),
Id = <<"testid">>,
ok = emqx_metrics_worker:create_metrics(?NAME, Id, Metrics),
%% all the metrics are set to zero at start
?assertMatch(
#{
@ -105,20 +156,24 @@ t_reset_metrics(_) ->
b := #{current := 0.0, max := 0.0, last5m := 0.0},
c := #{current := 0.0, max := 0.0, last5m := 0.0}
},
gauges := #{},
counters := #{
a := 0,
b := 0,
c := 0
}
},
emqx_metrics_worker:get_metrics(?NAME, <<"testid">>)
emqx_metrics_worker:get_metrics(?NAME, Id)
),
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a),
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b),
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c),
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c),
ok = emqx_metrics_worker:inc(?NAME, Id, a),
ok = emqx_metrics_worker:inc(?NAME, Id, b),
ok = emqx_metrics_worker:inc(?NAME, Id, c),
ok = emqx_metrics_worker:inc(?NAME, Id, c),
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5),
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7),
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9),
ct:sleep(1500),
ok = emqx_metrics_worker:reset_metrics(?NAME, <<"testid">>),
ok = emqx_metrics_worker:reset_metrics(?NAME, Id),
?LET(
#{
rate := #{
@ -126,68 +181,83 @@ t_reset_metrics(_) ->
b := #{current := CurrB, max := MaxB, last5m := _},
c := #{current := CurrC, max := MaxC, last5m := _}
},
gauges := Gauges,
counters := #{
a := 0,
b := 0,
c := 0
}
},
emqx_metrics_worker:get_metrics(?NAME, <<"testid">>),
emqx_metrics_worker:get_metrics(?NAME, Id),
{
?assert(CurrA == 0),
?assert(CurrB == 0),
?assert(CurrC == 0),
?assert(MaxA == 0),
?assert(MaxB == 0),
?assert(MaxC == 0)
?assert(MaxC == 0),
?assertEqual(#{}, Gauges)
}
),
ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>).
ok = emqx_metrics_worker:clear_metrics(?NAME, Id).
t_get_metrics_2(_) ->
Metrics = [a, b, c],
Id = <<"testid">>,
ok = emqx_metrics_worker:create_metrics(
?NAME,
<<"testid">>,
Id,
Metrics,
[a]
),
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a),
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b),
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c),
ok = emqx_metrics_worker:inc(?NAME, Id, a),
ok = emqx_metrics_worker:inc(?NAME, Id, b),
ok = emqx_metrics_worker:inc(?NAME, Id, c),
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5),
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7),
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9),
?assertMatch(
#{
rate := Rate = #{
a := #{current := _, max := _, last5m := _}
},
gauges := #{},
counters := #{
a := 1,
b := 1,
c := 1
}
} when map_size(Rate) =:= 1,
emqx_metrics_worker:get_metrics(?NAME, <<"testid">>)
emqx_metrics_worker:get_metrics(?NAME, Id)
),
ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>).
ok = emqx_metrics_worker:clear_metrics(?NAME, Id).
t_recreate_metrics(_) ->
ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, [a]),
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a),
Id = <<"testid">>,
ok = emqx_metrics_worker:create_metrics(?NAME, Id, [a]),
ok = emqx_metrics_worker:inc(?NAME, Id, a),
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5),
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7),
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9),
?assertMatch(
#{
rate := R = #{
a := #{current := _, max := _, last5m := _}
},
gauges := #{
inflight := 12,
queuing := 9
},
counters := C = #{
a := 1
}
} when map_size(R) == 1 andalso map_size(C) == 1,
emqx_metrics_worker:get_metrics(?NAME, <<"testid">>)
emqx_metrics_worker:get_metrics(?NAME, Id)
),
%% we create the metrics again, to add some counters
ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, [a, b, c]),
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b),
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c),
ok = emqx_metrics_worker:create_metrics(?NAME, Id, [a, b, c]),
ok = emqx_metrics_worker:inc(?NAME, Id, b),
ok = emqx_metrics_worker:inc(?NAME, Id, c),
?assertMatch(
#{
rate := R = #{
@ -195,13 +265,17 @@ t_recreate_metrics(_) ->
b := #{current := _, max := _, last5m := _},
c := #{current := _, max := _, last5m := _}
},
gauges := #{
inflight := 12,
queuing := 9
},
counters := C = #{
a := 1, b := 1, c := 1
}
} when map_size(R) == 3 andalso map_size(C) == 3,
emqx_metrics_worker:get_metrics(?NAME, <<"testid">>)
emqx_metrics_worker:get_metrics(?NAME, Id)
),
ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>).
ok = emqx_metrics_worker:clear_metrics(?NAME, Id).
t_inc_matched(_) ->
Metrics = ['rules.matched'],
@ -238,3 +312,102 @@ t_rate(_) ->
ct:sleep(3000),
ok = emqx_metrics_worker:clear_metrics(?NAME, <<"rule1">>),
ok = emqx_metrics_worker:clear_metrics(?NAME, <<"rule:2">>).
t_get_gauge(_Config) ->
Metric = 'queueing',
%% unknown handler name (nonexistent table)
?assertEqual(0, emqx_metrics_worker:get_gauge(unknown_name, unknown_id, Metric)),
%% unknown resource id
?assertEqual(0, emqx_metrics_worker:get_gauge(?NAME, unknown_id, Metric)),
Id = <<"some id">>,
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 2),
?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)),
?assertEqual(0, emqx_metrics_worker:get_gauge(?NAME, unknown, Metric)),
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, Metric, 3),
?assertEqual(5, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)),
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 1),
?assertEqual(4, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)),
?assertEqual(0, emqx_metrics_worker:get_gauge(?NAME, Id, another_metric)),
ok.
t_get_gauges(_Config) ->
%% unknown handler name (nonexistent table)
?assertEqual(#{}, emqx_metrics_worker:get_gauges(unknown_name, unknown_id)),
%% unknown resource id
?assertEqual(#{}, emqx_metrics_worker:get_gauges(?NAME, unknown_id)),
Metric = 'queuing',
Id = <<"some id">>,
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 2),
?assertEqual(#{queuing => 2}, emqx_metrics_worker:get_gauges(?NAME, Id)),
?assertEqual(#{}, emqx_metrics_worker:get_gauges(?NAME, unknown)),
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, Metric, 3),
?assertEqual(#{queuing => 5}, emqx_metrics_worker:get_gauges(?NAME, Id)),
AnotherMetric = 'inflight',
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 1),
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, AnotherMetric, 10),
?assertEqual(#{queuing => 4, inflight => 10}, emqx_metrics_worker:get_gauges(?NAME, Id)),
ok.
t_delete_gauge(_Config) ->
%% unknown handler name (nonexistent table)
?assertEqual(ok, emqx_metrics_worker:delete_gauges(unknown_name, unknown_id)),
%% unknown resource id
?assertEqual(ok, emqx_metrics_worker:delete_gauges(?NAME, unknown_id)),
Metric = 'queuing',
AnotherMetric = 'inflight',
Id = <<"some id">>,
AnotherId = <<"another id">>,
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 2),
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, Metric, 3),
ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, AnotherMetric, 10),
ok = emqx_metrics_worker:set_gauge(?NAME, AnotherId, worker_id1, AnotherMetric, 10),
?assertEqual(#{queuing => 5, inflight => 10}, emqx_metrics_worker:get_gauges(?NAME, Id)),
?assertEqual(ok, emqx_metrics_worker:delete_gauges(?NAME, Id)),
?assertEqual(#{}, emqx_metrics_worker:get_gauges(?NAME, Id)),
?assertEqual(#{inflight => 10}, emqx_metrics_worker:get_gauges(?NAME, AnotherId)),
ok.
t_shift_gauge(_Config) ->
Metric = 'queueing',
Id = <<"some id">>,
AnotherId = <<"another id">>,
%% unknown handler name (nonexistent table)
?assertEqual(
ok, emqx_metrics_worker:shift_gauge(unknown_name, unknown_id, worker_id0, Metric, 2)
),
?assertEqual(0, emqx_metrics_worker:get_gauge(unknown_name, unknown_id, Metric)),
%% empty resource id
?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, Id, worker_id0, Metric, 2)),
?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, AnotherId, worker_id0, Metric, 2)),
?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)),
?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, AnotherId, Metric)),
?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, Id, worker_id0, Metric, 3)),
?assertEqual(5, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)),
?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, Id, worker_id1, Metric, 10)),
?assertEqual(15, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)),
?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, Id, worker_id1, Metric, -4)),
?assertEqual(11, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)),
?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, AnotherId, Metric)),
ok.


@ -195,7 +195,7 @@ emqx_bridge_schema {
metric_sent_inflight {
desc {
en: """Count of messages that were sent asynchronously but ACKs are not received."""
en: """Count of messages that were sent asynchronously but ACKs are not yet received."""
zh: """已异步地发送但没有收到 ACK 的消息个数。"""
}
label: {


@ -686,7 +686,6 @@ format_resp(
format_metrics(#{
counters := #{
'batching' := Batched,
'dropped' := Dropped,
'dropped.other' := DroppedOther,
'dropped.queue_full' := DroppedQueueFull,
@ -694,17 +693,19 @@ format_metrics(#{
'dropped.resource_not_found' := DroppedResourceNotFound,
'dropped.resource_stopped' := DroppedResourceStopped,
'matched' := Matched,
'queuing' := Queued,
'retried' := Retried,
'failed' := SentFailed,
'inflight' := SentInflight,
'success' := SentSucc,
'received' := Rcvd
},
gauges := Gauges,
rate := #{
matched := #{current := Rate, last5m := Rate5m, max := RateMax}
}
}) ->
Batched = maps:get('batching', Gauges, 0),
Queued = maps:get('queuing', Gauges, 0),
SentInflight = maps:get('inflight', Gauges, 0),
?METRICS(
Batched,
Dropped,


@ -19,6 +19,7 @@
-compile(export_all).
-import(emqx_dashboard_api_test_helpers, [request/4, uri/1]).
-import(emqx_common_test_helpers, [on_exit/1]).
-include("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
@ -124,6 +125,7 @@ init_per_testcase(_, Config) ->
Config.
end_per_testcase(_, _Config) ->
clear_resources(),
emqx_common_test_helpers:call_janitor(),
ok.
clear_resources() ->
@ -672,6 +674,12 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
<<"name">> := ?BRIDGE_NAME_EGRESS
} = jsx:decode(Bridge),
BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
on_exit(fun() ->
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
ok
end),
%% we now test if the bridge works as expected
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
@ -733,15 +741,20 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
%% verify the metrics of the bridge, the message should be queued
{ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []),
Decoded1 = jsx:decode(BridgeStr1),
?assertMatch(
Status when (Status == <<"connected">> orelse Status == <<"connecting">>),
maps:get(<<"status">>, Decoded1)
),
%% matched >= 3 because of possible retries.
?assertMatch(
#{
<<"status">> := Status,
<<"metrics">> := #{
<<"matched">> := Matched, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2
}
} when Matched >= 3 andalso (Status == <<"connected">> orelse Status == <<"connecting">>),
jsx:decode(BridgeStr1)
<<"matched">> := Matched,
<<"success">> := 1,
<<"failed">> := 0,
<<"queuing">> := 2
} when Matched >= 3,
maps:get(<<"metrics">>, Decoded1)
),
%% start the listener 1883 to make the bridge reconnected
@ -766,9 +779,6 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
%% also verify the 2 messages have been sent to the remote broker
assert_mqtt_msg_received(RemoteTopic, Payload1),
assert_mqtt_msg_received(RemoteTopic, Payload2),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
ok.
assert_mqtt_msg_received(Topic, Payload) ->


@ -141,9 +141,6 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
'dropped.resource_not_found',
'dropped.resource_stopped',
'dropped.other',
'queuing',
'batching',
'inflight',
'received'
],
[matched]


@ -24,11 +24,12 @@
]).
-export([
batching_change/2,
batching_set/3,
batching_shift/3,
batching_get/1,
inflight_change/2,
inflight_set/3,
inflight_get/1,
queuing_change/2,
queuing_set/3,
queuing_get/1,
dropped_inc/1,
dropped_inc/2,
@ -114,8 +115,6 @@ handle_telemetry_event(
_HandlerConfig
) ->
case Event of
batching ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'batching', Val);
dropped_other ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other', Val);
@ -133,12 +132,8 @@ handle_telemetry_event(
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped', Val);
failed ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val);
inflight ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'inflight', Val);
matched ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched', Val);
queuing ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'queuing', Val);
retried_failed ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val),
@ -152,6 +147,34 @@ handle_telemetry_event(
_ ->
ok
end;
handle_telemetry_event(
[?TELEMETRY_PREFIX, Event],
_Measurements = #{gauge_set := Val},
_Metadata = #{resource_id := ID, worker_id := WorkerID},
_HandlerConfig
) ->
case Event of
batching ->
emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'batching', Val);
inflight ->
emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'inflight', Val);
queuing ->
emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'queuing', Val);
_ ->
ok
end;
handle_telemetry_event(
[?TELEMETRY_PREFIX, Event],
_Measurements = #{gauge_shift := Val},
_Metadata = #{resource_id := ID, worker_id := WorkerID},
_HandlerConfig
) ->
case Event of
batching ->
emqx_metrics_worker:shift_gauge(?RES_METRICS, ID, WorkerID, 'batching', Val);
_ ->
ok
end;
handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) ->
ok.
@ -160,26 +183,48 @@ handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) ->
%% @doc Count of messages that are currently accumulated in memory waiting for
%% being sent in one batch
batching_change(ID, Val) ->
telemetry:execute([?TELEMETRY_PREFIX, batching], #{counter_inc => Val}, #{resource_id => ID}).
batching_set(ID, WorkerID, Val) ->
telemetry:execute(
[?TELEMETRY_PREFIX, batching],
#{gauge_set => Val},
#{resource_id => ID, worker_id => WorkerID}
).
batching_shift(_ID, _WorkerID = undefined, _Val) ->
ok;
batching_shift(ID, WorkerID, Val) ->
telemetry:execute(
[?TELEMETRY_PREFIX, batching],
#{gauge_shift => Val},
#{resource_id => ID, worker_id => WorkerID}
).
batching_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'batching').
emqx_metrics_worker:get_gauge(?RES_METRICS, ID, 'batching').
%% @doc Count of messages that are currently queuing. [Gauge]
queuing_change(ID, Val) ->
telemetry:execute([?TELEMETRY_PREFIX, queuing], #{counter_inc => Val}, #{resource_id => ID}).
%% @doc Count of batches of messages that are currently
%% queuing. [Gauge]
queuing_set(ID, WorkerID, Val) ->
telemetry:execute(
[?TELEMETRY_PREFIX, queuing],
#{gauge_set => Val},
#{resource_id => ID, worker_id => WorkerID}
).
queuing_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'queuing').
emqx_metrics_worker:get_gauge(?RES_METRICS, ID, 'queuing').
%% @doc Count of messages that were sent asynchronously but ACKs are not
%% received. [Gauge]
inflight_change(ID, Val) ->
telemetry:execute([?TELEMETRY_PREFIX, inflight], #{counter_inc => Val}, #{resource_id => ID}).
%% @doc Count of batches of messages that were sent asynchronously but
%% ACKs are not yet received. [Gauge]
inflight_set(ID, WorkerID, Val) ->
telemetry:execute(
[?TELEMETRY_PREFIX, inflight],
#{gauge_set => Val},
#{resource_id => ID, worker_id => WorkerID}
).
inflight_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'inflight').
emqx_metrics_worker:get_gauge(?RES_METRICS, ID, 'inflight').
%% Counters (value can only got up):
%% --------------------------------------


@ -52,7 +52,7 @@
-export([queue_item_marshaller/1, estimate_size/1]).
-export([reply_after_query/6, batch_reply_after_query/6]).
-export([reply_after_query/7, batch_reply_after_query/7]).
-define(Q_ITEM(REQUEST), {q_item, REQUEST}).
@ -90,13 +90,27 @@ async_query(Id, Request, Opts) ->
%% simple query the resource without batching and queuing messages.
-spec simple_sync_query(id(), request()) -> Result :: term().
simple_sync_query(Id, Request) ->
Result = call_query(sync, Id, ?QUERY(self(), Request, false), #{}),
%% Note: since calling this function means bypassing the
%% buffer workers, and each buffer worker index is used when
%% collecting gauge metrics, we use this dummy index. If this
%% call ends up calling buffering functions, that's a bug and
%% would mess up the metrics anyway. `undefined' is ignored by
%% `emqx_resource_metrics:*_shift/3'.
Index = undefined,
Result = call_query(sync, Id, Index, ?QUERY(self(), Request, false), #{}),
_ = handle_query_result(Id, Result, false, false),
Result.
-spec simple_async_query(id(), request(), reply_fun()) -> Result :: term().
simple_async_query(Id, Request, ReplyFun) ->
Result = call_query(async, Id, ?QUERY(ReplyFun, Request, false), #{}),
%% Note: since calling this function means bypassing the
%% buffer workers, and each buffer worker index is used when
%% collecting gauge metrics, we use this dummy index. If this
%% call ends up calling buffering functions, that's a bug and
%% would mess up the metrics anyway. `undefined' is ignored by
%% `emqx_resource_metrics:*_shift/3'.
Index = undefined,
Result = call_query(async, Id, Index, ?QUERY(ReplyFun, Request, false), #{}),
_ = handle_query_result(Id, Result, false, false),
Result.
@ -133,9 +147,11 @@ init({Id, Index, Opts}) ->
false ->
undefined
end,
emqx_resource_metrics:queuing_change(Id, queue_count(Queue)),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)),
emqx_resource_metrics:batching_set(Id, Index, 0),
emqx_resource_metrics:inflight_set(Id, Index, 0),
InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
ok = inflight_new(Name, InfltWinSZ),
ok = inflight_new(Name, InfltWinSZ, Id, Index),
HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
St = #{
id => Id,
@ -158,10 +174,12 @@ running(cast, resume, _St) ->
keep_state_and_data;
running(cast, block, St) ->
{next_state, blocked, St};
running(cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, queue := Q} = St) when
running(
cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, index := Index, queue := Q} = St
) when
is_list(Batch)
->
Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]),
Q1 = maybe_append_queue(Id, Index, Q, [?Q_ITEM(Query) || Query <- Batch]),
{next_state, blocked, St#{queue := Q1}};
running({call, From}, {query, Request, _Opts}, St) ->
query_or_acc(From, Request, St);
@ -180,28 +198,39 @@ blocked(enter, _, #{resume_interval := ResumeT} = _St) ->
{keep_state_and_data, {state_timeout, ResumeT, resume}};
blocked(cast, block, _St) ->
keep_state_and_data;
blocked(cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, queue := Q} = St) when
blocked(
cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, index := Index, queue := Q} = St
) when
is_list(Batch)
->
Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]),
Q1 = maybe_append_queue(Id, Index, Q, [?Q_ITEM(Query) || Query <- Batch]),
{keep_state, St#{queue := Q1}};
blocked(cast, resume, St) ->
do_resume(St);
blocked(state_timeout, resume, St) ->
do_resume(St);
blocked({call, From}, {query, Request, _Opts}, #{id := Id, queue := Q} = St) ->
blocked({call, From}, {query, Request, _Opts}, #{id := Id, index := Index, queue := Q} = St) ->
Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
_ = reply_caller(Id, ?REPLY(From, Request, false, Error)),
{keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(From, Request, false))])}};
blocked(cast, {query, Request, Opts}, #{id := Id, queue := Q} = St) ->
{keep_state, St#{
queue := maybe_append_queue(Id, Index, Q, [?Q_ITEM(?QUERY(From, Request, false))])
}};
blocked(cast, {query, Request, Opts}, #{id := Id, index := Index, queue := Q} = St) ->
ReplyFun = maps:get(async_reply_fun, Opts, undefined),
Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
_ = reply_caller(Id, ?REPLY(ReplyFun, Request, false, Error)),
{keep_state, St#{
queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(ReplyFun, Request, false))])
queue := maybe_append_queue(Id, Index, Q, [?Q_ITEM(?QUERY(ReplyFun, Request, false))])
}}.
terminate(_Reason, #{id := Id, index := Index}) ->
terminate(_Reason, #{id := Id, index := Index, queue := Q}) ->
GaugeFns =
[
fun emqx_resource_metrics:batching_set/3,
fun emqx_resource_metrics:inflight_set/3
],
lists:foreach(fun(Fn) -> Fn(Id, Index, 0) end, GaugeFns),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)),
gproc_pool:disconnect_worker(Id, {Id, Index}).
code_change(_OldVsn, State, _Extra) ->
@ -240,24 +269,33 @@ do_resume(#{id := Id, name := Name} = St) ->
retry_queue(#{queue := undefined} = St) ->
{next_state, running, St};
retry_queue(#{queue := Q, id := Id, enable_batch := false, resume_interval := ResumeT} = St) ->
retry_queue(
#{
queue := Q,
id := Id,
index := Index,
enable_batch := false,
resume_interval := ResumeT
} = St
) ->
case get_first_n_from_queue(Q, 1) of
[] ->
{next_state, running, St};
[?QUERY(_, Request, HasSent) = Query] ->
QueryOpts = #{inflight_name => maps:get(name, St)},
Result = call_query(configured, Id, Query, QueryOpts),
Result = call_query(configured, Id, Index, Query, QueryOpts),
case reply_caller(Id, ?REPLY(undefined, Request, HasSent, Result)) of
true ->
{keep_state, St, {state_timeout, ResumeT, resume}};
false ->
retry_queue(St#{queue := drop_head(Q, Id)})
retry_queue(St#{queue := drop_head(Q, Id, Index)})
end
end;
retry_queue(
#{
queue := Q,
id := Id,
index := Index,
enable_batch := true,
batch_size := BatchSize,
resume_interval := ResumeT
@ -268,7 +306,7 @@ retry_queue(
{next_state, running, St};
Batch0 ->
QueryOpts = #{inflight_name => maps:get(name, St)},
Result = call_query(configured, Id, Batch0, QueryOpts),
Result = call_query(configured, Id, Index, Batch0, QueryOpts),
%% The caller has been replied with ?RESOURCE_ERROR(blocked, _) before saving into the queue,
%% we now change the 'from' field to 'undefined' so it will not reply the caller again.
Batch = [?QUERY(undefined, Request, HasSent) || ?QUERY(_, Request, HasSent) <- Batch0],
@ -276,41 +314,55 @@ retry_queue(
true ->
{keep_state, St, {state_timeout, ResumeT, resume}};
false ->
retry_queue(St#{queue := drop_first_n_from_queue(Q, length(Batch), Id)})
retry_queue(St#{queue := drop_first_n_from_queue(Q, length(Batch), Id, Index)})
end
end.
retry_inflight_sync(
Id, Ref, ?QUERY(_, _, HasSent) = Query, Name, #{resume_interval := ResumeT} = St0
Id,
Ref,
?QUERY(_, _, HasSent) = Query,
Name,
#{index := Index, resume_interval := ResumeT} = St0
) ->
Result = call_query(sync, Id, Query, #{}),
Result = call_query(sync, Id, Index, Query, #{}),
case handle_query_result(Id, Result, HasSent, false) of
%% Send failed because resource down
true ->
{keep_state, St0, {state_timeout, ResumeT, resume}};
%% Send ok or failed but the resource is working
false ->
inflight_drop(Name, Ref),
inflight_drop(Name, Ref, Id, Index),
do_resume(St0)
end.
query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) ->
query_or_acc(
From,
Request,
#{
enable_batch := true,
acc := Acc,
acc_left := Left,
index := Index,
id := Id
} = St0
) ->
Acc1 = [?QUERY(From, Request, false) | Acc],
emqx_resource_metrics:batching_change(Id, 1),
emqx_resource_metrics:batching_shift(Id, Index, 1),
St = St0#{acc := Acc1, acc_left := Left - 1},
case Left =< 1 of
true -> flush(St);
false -> {keep_state, ensure_flush_timer(St)}
end;
query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id} = St) ->
query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id, index := Index} = St) ->
QueryOpts = #{
inflight_name => maps:get(name, St)
},
Result = call_query(configured, Id, ?QUERY(From, Request, false), QueryOpts),
Result = call_query(configured, Id, Index, ?QUERY(From, Request, false), QueryOpts),
case reply_caller(Id, ?REPLY(From, Request, false, Result)) of
true ->
Query = ?QUERY(From, Request, false),
{next_state, blocked, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(Query)])}};
{next_state, blocked, St#{queue := maybe_append_queue(Id, Index, Q, [?Q_ITEM(Query)])}};
false ->
{keep_state, St}
end.
@ -320,6 +372,7 @@ flush(#{acc := []} = St) ->
flush(
#{
id := Id,
index := Index,
acc := Batch0,
batch_size := Size,
queue := Q0
@ -329,12 +382,12 @@ flush(
QueryOpts = #{
inflight_name => maps:get(name, St)
},
emqx_resource_metrics:batching_change(Id, -length(Batch)),
Result = call_query(configured, Id, Batch, QueryOpts),
emqx_resource_metrics:batching_shift(Id, Index, -length(Batch)),
Result = call_query(configured, Id, Index, Batch, QueryOpts),
St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}),
case batch_reply_caller(Id, Result, Batch) of
true ->
Q1 = maybe_append_queue(Id, Q0, [?Q_ITEM(Query) || Query <- Batch]),
Q1 = maybe_append_queue(Id, Index, Q0, [?Q_ITEM(Query) || Query <- Batch]),
{next_state, blocked, St1#{queue := Q1}};
false ->
{keep_state, St1}
@ -412,7 +465,7 @@ handle_query_result(Id, Result, HasSent, BlockWorker) ->
inc_sent_success(Id, HasSent),
BlockWorker.
call_query(QM0, Id, Query, QueryOpts) ->
call_query(QM0, Id, Index, Query, QueryOpts) ->
?tp(call_query_enter, #{id => Id, query => Query}),
case emqx_resource_manager:ets_lookup(Id) of
{ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} ->
@ -423,7 +476,7 @@ call_query(QM0, Id, Query, QueryOpts) ->
end,
CM = maps:get(callback_mode, Data),
emqx_resource_metrics:matched_inc(Id),
apply_query_fun(call_mode(QM, CM), Mod, Id, Query, ResSt, QueryOpts);
apply_query_fun(call_mode(QM, CM), Mod, Id, Index, Query, ResSt, QueryOpts);
{ok, _Group, #{status := stopped}} ->
emqx_resource_metrics:matched_inc(Id),
?RESOURCE_ERROR(stopped, "resource stopped or disabled");
@ -452,10 +505,10 @@ call_query(QM0, Id, Query, QueryOpts) ->
end
).
apply_query_fun(sync, Mod, Id, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) ->
apply_query_fun(sync, Mod, Id, _Index, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) ->
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}),
?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request);
apply_query_fun(async, Mod, Id, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
apply_query_fun(async, Mod, Id, Index, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
Name = maps:get(inflight_name, QueryOpts, undefined),
?APPLY_RESOURCE(
@ -464,21 +517,20 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts)
true ->
{async_return, inflight_full};
false ->
ok = emqx_resource_metrics:inflight_change(Id, 1),
ReplyFun = fun ?MODULE:reply_after_query/6,
ReplyFun = fun ?MODULE:reply_after_query/7,
Ref = make_message_ref(),
Args = [self(), Id, Name, Ref, Query],
ok = inflight_append(Name, Ref, Query),
Args = [self(), Id, Index, Name, Ref, Query],
ok = inflight_append(Name, Ref, Query, Id, Index),
Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt),
{async_return, Result}
end,
Request
);
apply_query_fun(sync, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) ->
apply_query_fun(sync, Mod, Id, _Index, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) ->
?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch);
apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) ->
apply_query_fun(async, Mod, Id, Index, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) ->
?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
Name = maps:get(inflight_name, QueryOpts, undefined),
?APPLY_RESOURCE(
@ -487,55 +539,46 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts)
true ->
{async_return, inflight_full};
false ->
BatchLen = length(Batch),
ok = emqx_resource_metrics:inflight_change(Id, BatchLen),
ReplyFun = fun ?MODULE:batch_reply_after_query/6,
ReplyFun = fun ?MODULE:batch_reply_after_query/7,
Ref = make_message_ref(),
Args = {ReplyFun, [self(), Id, Name, Ref, Batch]},
Args = {ReplyFun, [self(), Id, Index, Name, Ref, Batch]},
Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
ok = inflight_append(Name, Ref, Batch),
ok = inflight_append(Name, Ref, Batch, Id, Index),
Result = Mod:on_batch_query_async(Id, Requests, Args, ResSt),
{async_return, Result}
end,
Batch
).
reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request, HasSent), Result) ->
%% NOTE: 'inflight' is message count that sent async but no ACK received,
%% NOT the message number ququed in the inflight window.
emqx_resource_metrics:inflight_change(Id, -1),
reply_after_query(Pid, Id, Index, Name, Ref, ?QUERY(From, Request, HasSent), Result) ->
%% NOTE: 'inflight' is the count of messages that were sent async
%% but received no ACK, NOT the number of messages queued in the
%% inflight window.
case reply_caller(Id, ?REPLY(From, Request, HasSent, Result)) of
true ->
%% we marked these messages are 'queuing' although they are actually
%% keeped in inflight window, not replayq
emqx_resource_metrics:queuing_change(Id, 1),
?MODULE:block(Pid);
false ->
drop_inflight_and_resume(Pid, Name, Ref)
drop_inflight_and_resume(Pid, Name, Ref, Id, Index)
end.
batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) ->
%% NOTE: 'inflight' is message count that sent async but no ACK received,
%% NOT the message number ququed in the inflight window.
BatchLen = length(Batch),
emqx_resource_metrics:inflight_change(Id, -BatchLen),
batch_reply_after_query(Pid, Id, Index, Name, Ref, Batch, Result) ->
%% NOTE: 'inflight' is the count of messages that were sent async
%% but received no ACK, NOT the number of messages queued in the
%% inflight window.
case batch_reply_caller(Id, Result, Batch) of
true ->
%% we marked these messages are 'queuing' although they are actually
%% kept in inflight window, not replayq
emqx_resource_metrics:queuing_change(Id, BatchLen),
?MODULE:block(Pid);
false ->
drop_inflight_and_resume(Pid, Name, Ref)
drop_inflight_and_resume(Pid, Name, Ref, Id, Index)
end.
drop_inflight_and_resume(Pid, Name, Ref) ->
drop_inflight_and_resume(Pid, Name, Ref, Id, Index) ->
case inflight_is_full(Name) of
true ->
inflight_drop(Name, Ref),
inflight_drop(Name, Ref, Id, Index),
?MODULE:resume(Pid);
false ->
inflight_drop(Name, Ref)
inflight_drop(Name, Ref, Id, Index)
end.
%%==============================================================================
@ -548,10 +591,10 @@ queue_item_marshaller(Bin) when is_binary(Bin) ->
estimate_size(QItem) ->
size(queue_item_marshaller(QItem)).
maybe_append_queue(Id, undefined, _Items) ->
maybe_append_queue(Id, _Index, undefined, _Items) ->
emqx_resource_metrics:dropped_queue_not_enabled_inc(Id),
undefined;
maybe_append_queue(Id, Q, Items) ->
maybe_append_queue(Id, Index, Q, Items) ->
Q2 =
case replayq:overflow(Q) of
Overflow when Overflow =< 0 ->
@ -561,13 +604,13 @@ maybe_append_queue(Id, Q, Items) ->
{Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts),
ok = replayq:ack(Q1, QAckRef),
Dropped = length(Items2),
emqx_resource_metrics:queuing_change(Id, -Dropped),
emqx_resource_metrics:dropped_queue_full_inc(Id),
?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
Q1
end,
emqx_resource_metrics:queuing_change(Id, 1),
replayq:append(Q2, Items).
Q3 = replayq:append(Q2, Items),
emqx_resource_metrics:queuing_set(Id, Index, replayq:count(Q3)),
Q3.
get_first_n_from_queue(Q, N) ->
get_first_n_from_queue(Q, N, []).
@ -580,23 +623,23 @@ get_first_n_from_queue(Q, N, Acc) when N > 0 ->
?Q_ITEM(Query) -> get_first_n_from_queue(Q, N - 1, [Query | Acc])
end.
drop_first_n_from_queue(Q, 0, _Id) ->
drop_first_n_from_queue(Q, 0, _Id, _Index) ->
Q;
drop_first_n_from_queue(Q, N, Id) when N > 0 ->
drop_first_n_from_queue(drop_head(Q, Id), N - 1, Id).
drop_first_n_from_queue(Q, N, Id, Index) when N > 0 ->
drop_first_n_from_queue(drop_head(Q, Id, Index), N - 1, Id, Index).
drop_head(Q, Id) ->
{Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}),
ok = replayq:ack(Q1, AckRef),
emqx_resource_metrics:queuing_change(Id, -1),
Q1.
drop_head(Q, Id, Index) ->
{NewQ, AckRef, _} = replayq:pop(Q, #{count_limit => 1}),
ok = replayq:ack(NewQ, AckRef),
emqx_resource_metrics:queuing_set(Id, Index, replayq:count(NewQ)),
NewQ.
%%==============================================================================
%% the inflight queue for async query
-define(SIZE_REF, -1).
inflight_new(Name, InfltWinSZ) ->
inflight_new(Name, InfltWinSZ, Id, Index) ->
_ = ets:new(Name, [named_table, ordered_set, public, {write_concurrency, true}]),
inflight_append(Name, ?SIZE_REF, {max_size, InfltWinSZ}),
inflight_append(Name, ?SIZE_REF, {max_size, InfltWinSZ}, Id, Index),
ok.
inflight_get_first(Name) ->
@ -617,27 +660,39 @@ inflight_is_full(undefined) ->
false;
inflight_is_full(Name) ->
[{_, {max_size, MaxSize}}] = ets:lookup(Name, ?SIZE_REF),
Size = inflight_size(Name),
Size >= MaxSize.
inflight_size(Name) ->
%% Note: we subtract 1 because there's a metadata row that hold
%% the maximum size value.
MetadataRowCount = 1,
case ets:info(Name, size) of
Size when Size > MaxSize -> true;
_ -> false
undefined -> 0;
Size -> max(0, Size - MetadataRowCount)
end.
inflight_append(undefined, _Ref, _Query) ->
inflight_append(undefined, _Ref, _Query, _Id, _Index) ->
ok;
inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch) ->
inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch, Id, Index) ->
ets:insert(Name, {Ref, [?QUERY(From, Req, true) || ?QUERY(From, Req, _) <- Batch]}),
emqx_resource_metrics:inflight_set(Id, Index, inflight_size(Name)),
ok;
inflight_append(Name, Ref, ?QUERY(From, Req, _)) ->
inflight_append(Name, Ref, ?QUERY(From, Req, _), Id, Index) ->
ets:insert(Name, {Ref, ?QUERY(From, Req, true)}),
emqx_resource_metrics:inflight_set(Id, Index, inflight_size(Name)),
ok;
inflight_append(Name, Ref, Data) ->
inflight_append(Name, Ref, Data, _Id, _Index) ->
ets:insert(Name, {Ref, Data}),
%% this is a metadata row being inserted; therefore, we don't bump
%% the inflight metric.
ok.
inflight_drop(undefined, _) ->
inflight_drop(undefined, _, _Id, _Index) ->
ok;
inflight_drop(Name, Ref) ->
inflight_drop(Name, Ref, Id, Index) ->
ets:delete(Name, Ref),
emqx_resource_metrics:inflight_set(Id, Index, inflight_size(Name)),
ok.
%%==============================================================================


@ -39,6 +39,7 @@ groups() ->
init_per_testcase(_, Config) ->
emqx_connector_demo:set_callback_mode(always_sync),
Config.
end_per_testcase(_, _Config) ->
_ = emqx_resource:remove(?ID).
@ -503,7 +504,10 @@ t_stop_start(_) ->
),
%% add some metrics to test their persistence
emqx_resource_metrics:batching_change(?ID, 5),
WorkerID0 = <<"worker:0">>,
WorkerID1 = <<"worker:1">>,
emqx_resource_metrics:batching_set(?ID, WorkerID0, 2),
emqx_resource_metrics:batching_set(?ID, WorkerID1, 3),
?assertEqual(5, emqx_resource_metrics:batching_get(?ID)),
{ok, _} = emqx_resource:check_and_recreate(
@ -537,7 +541,8 @@ t_stop_start(_) ->
?assert(is_process_alive(Pid1)),
%% now stop while resetting the metrics
emqx_resource_metrics:batching_change(?ID, 5),
emqx_resource_metrics:batching_set(?ID, WorkerID0, 1),
emqx_resource_metrics:batching_set(?ID, WorkerID1, 4),
?assertEqual(5, emqx_resource_metrics:batching_get(?ID)),
ok = emqx_resource:stop(?ID),
?assertEqual(0, emqx_resource_metrics:batching_get(?ID)),


@ -315,11 +315,11 @@ handle_telemetry_event(
emqx_resource_metrics:dropped_queue_full_inc(ID, Val);
handle_telemetry_event(
[wolff, queuing],
#{counter_inc := Val},
#{bridge_id := ID},
#{counter_inc := _Val},
#{bridge_id := ID, partition_id := PartitionID},
_HandlerConfig
) when is_integer(Val) ->
emqx_resource_metrics:queuing_change(ID, Val);
) when is_integer(_Val) ->
emqx_resource_metrics:queuing_set(ID, PartitionID, 0);
handle_telemetry_event(
[wolff, retried],
#{counter_inc := Val},
@ -336,11 +336,11 @@ handle_telemetry_event(
emqx_resource_metrics:failed_inc(ID, Val);
handle_telemetry_event(
[wolff, inflight],
#{counter_inc := Val},
#{bridge_id := ID},
#{counter_inc := _Val},
#{bridge_id := ID, partition_id := PartitionID},
_HandlerConfig
) when is_integer(Val) ->
emqx_resource_metrics:inflight_change(ID, Val);
) when is_integer(_Val) ->
emqx_resource_metrics:inflight_set(ID, PartitionID, 0);
handle_telemetry_event(
[wolff, retried_failed],
#{counter_inc := Val},


@ -124,6 +124,7 @@ init_per_testcase(TestCase, Config0) when
delete_all_bridges(),
Tid = install_telemetry_handler(TestCase),
Config = generate_config(Config0),
put(telemetry_table, Tid),
[{telemetry_table, Tid} | Config];
false ->
{skip, no_batching}
@ -133,6 +134,7 @@ init_per_testcase(TestCase, Config0) ->
delete_all_bridges(),
Tid = install_telemetry_handler(TestCase),
Config = generate_config(Config0),
put(telemetry_table, Tid),
[{telemetry_table, Tid} | Config].
end_per_testcase(_TestCase, _Config) ->
@ -393,7 +395,11 @@ assert_metrics(ExpectedMetrics, ResourceId) ->
maps:keys(ExpectedMetrics)
),
CurrentMetrics = current_metrics(ResourceId),
?assertEqual(ExpectedMetrics, Metrics, #{current_metrics => CurrentMetrics}),
TelemetryTable = get(telemetry_table),
RecordedEvents = ets:tab2list(TelemetryTable),
?assertEqual(ExpectedMetrics, Metrics, #{
current_metrics => CurrentMetrics, recorded_events => RecordedEvents
}),
ok.
assert_empty_metrics(ResourceId) ->
@ -554,6 +560,7 @@ t_publish_success(Config) ->
ResourceId = ?config(resource_id, Config),
ServiceAccountJSON = ?config(service_account_json, Config),
TelemetryTable = ?config(telemetry_table, Config),
QueryMode = ?config(query_mode, Config),
Topic = <<"t/topic">>,
?check_trace(
create_bridge(Config),
@ -582,6 +589,17 @@ t_publish_success(Config) ->
),
%% to avoid test flakiness
wait_telemetry_event(TelemetryTable, success, ResourceId),
ExpectedInflightEvents =
case QueryMode of
sync -> 1;
async -> 3
end,
wait_telemetry_event(
TelemetryTable,
inflight,
ResourceId,
#{n_events => ExpectedInflightEvents, timeout => 5_000}
),
assert_metrics(
#{
batching => 0,
@ -601,6 +619,7 @@ t_publish_success_local_topic(Config) ->
ResourceId = ?config(resource_id, Config),
ServiceAccountJSON = ?config(service_account_json, Config),
TelemetryTable = ?config(telemetry_table, Config),
QueryMode = ?config(query_mode, Config),
LocalTopic = <<"local/topic">>,
{ok, _} = create_bridge(Config, #{<<"local_topic">> => LocalTopic}),
assert_empty_metrics(ResourceId),
@ -619,6 +638,17 @@ t_publish_success_local_topic(Config) ->
),
%% to avoid test flakiness
wait_telemetry_event(TelemetryTable, success, ResourceId),
ExpectedInflightEvents =
case QueryMode of
sync -> 1;
async -> 3
end,
wait_telemetry_event(
TelemetryTable,
inflight,
ResourceId,
#{n_events => ExpectedInflightEvents, timeout => 5_000}
),
assert_metrics(
#{
batching => 0,
@ -649,6 +679,7 @@ t_publish_templated(Config) ->
ResourceId = ?config(resource_id, Config),
ServiceAccountJSON = ?config(service_account_json, Config),
TelemetryTable = ?config(telemetry_table, Config),
QueryMode = ?config(query_mode, Config),
Topic = <<"t/topic">>,
PayloadTemplate = <<
"{\"payload\": \"${payload}\","
@ -694,6 +725,17 @@ t_publish_templated(Config) ->
),
%% to avoid test flakiness
wait_telemetry_event(TelemetryTable, success, ResourceId),
ExpectedInflightEvents =
case QueryMode of
sync -> 1;
async -> 3
end,
wait_telemetry_event(
TelemetryTable,
inflight,
ResourceId,
#{n_events => ExpectedInflightEvents, timeout => 5_000}
),
assert_metrics(
#{
batching => 0,
@ -1053,7 +1095,7 @@ do_econnrefused_or_timeout_test(Config, Error) ->
%% response expired, this succeeds.
{econnrefused, async, _} ->
wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{
timeout => 10_000, n_events => 2
timeout => 10_000, n_events => 1
}),
CurrentMetrics = current_metrics(ResourceId),
RecordedEvents = ets:tab2list(TelemetryTable),
@ -1290,6 +1332,17 @@ t_unrecoverable_error(Config) ->
end
),
wait_telemetry_event(TelemetryTable, failed, ResourceId),
ExpectedInflightEvents =
case QueryMode of
sync -> 1;
async -> 3
end,
wait_telemetry_event(
TelemetryTable,
inflight,
ResourceId,
#{n_events => ExpectedInflightEvents, timeout => 5_000}
),
assert_metrics(
#{
batching => 0,