From 7d798c10e9e5dbb12687fb3cc988944e6c4f832e Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 17 May 2023 15:23:42 -0300 Subject: [PATCH 1/4] perf(buffer_worker): flush metrics periodically inside buffer worker process Fixes https://emqx.atlassian.net/browse/EMQX-9905 Since calling `telemetry` is costly in a hot path, we instead collect metrics inside the buffer workers state and periodically flush them, rather than immediately as events happen. --- .../test/emqx_bridge_mqtt_SUITE.erl | 26 +- .../test/emqx_bridge_gcp_pubsub_SUITE.erl | 57 +-- apps/emqx_resource/include/emqx_resource.hrl | 4 + .../src/emqx_resource_buffer_worker.erl | 383 +++++++++++------- .../src/schema/emqx_resource_schema.erl | 6 + .../test/emqx_resource_SUITE.erl | 86 +++- 6 files changed, 341 insertions(+), 221 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index bd5cda3f0..f0de07da2 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -100,17 +100,21 @@ ?assertMetrics(Pat, true, BridgeID) ). -define(assertMetrics(Pat, Guard, BridgeID), - ?assertMatch( - #{ - <<"metrics">> := Pat, - <<"node_metrics">> := [ - #{ - <<"node">> := _, - <<"metrics">> := Pat - } - ] - } when Guard, - request_bridge_metrics(BridgeID) + ?retry( + _Sleep = 300, + _Attempts0 = 20, + ?assertMatch( + #{ + <<"metrics">> := Pat, + <<"node_metrics">> := [ + #{ + <<"node">> := _, + <<"metrics">> := Pat + } + ] + } when Guard, + request_bridge_metrics(BridgeID) + ) ) ). diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl index 55527bf1f..49ca57c42 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl @@ -288,6 +288,7 @@ gcp_pubsub_config(Config) -> " pipelining = ~b\n" " resource_opts = {\n" " request_timeout = 500ms\n" + " metrics_flush_interval = 700ms\n" " worker_pool_size = 1\n" " query_mode = ~s\n" " batch_size = ~b\n" @@ -529,12 +530,14 @@ wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) -> end. receive_all_events(EventName, Timeout) -> - receive_all_events(EventName, Timeout, []). + receive_all_events(EventName, Timeout, _MaxEvents = 10, _Count = 0, _Acc = []). -receive_all_events(EventName, Timeout, Acc) -> +receive_all_events(_EventName, _Timeout, MaxEvents, Count, Acc) when Count >= MaxEvents -> + lists:reverse(Acc); +receive_all_events(EventName, Timeout, MaxEvents, Count, Acc) -> receive {telemetry, #{name := [_, _, EventName]} = Event} -> - receive_all_events(EventName, Timeout, [Event | Acc]) + receive_all_events(EventName, Timeout, MaxEvents, Count + 1, [Event | Acc]) after Timeout -> lists:reverse(Acc) end. 
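The gist of the change described in the commit message above is that each buffer worker keeps a plain map of counter deltas in its process state, bumps that map on the hot path, and only calls into the metrics subsystem when a periodic {flush_metrics, Ref} timer fires. A minimal, self-contained sketch of that pattern, with a hypothetical module name and io:format standing in for the real emqx_resource_metrics calls, could look like this:

    -module(metrics_flush_sketch).
    -behaviour(gen_server).

    -export([start_link/1, bump/2]).
    -export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

    start_link(FlushInterval) ->
        gen_server:start_link(?MODULE, FlushInterval, []).

    %% Hot path: only a message send and a map merge, no telemetry call.
    bump(Pid, Delta) when is_map(Delta) ->
        gen_server:cast(Pid, {bump, Delta}).

    init(FlushInterval) ->
        State0 = #{interval => FlushInterval, counters => #{}, tref => undefined},
        {ok, schedule_flush(State0)}.

    handle_call(_Req, _From, State) ->
        {reply, ignored, State}.

    handle_cast({bump, Delta}, State = #{counters := Counters}) ->
        {noreply, State#{counters := merge(Counters, Delta)}}.

    %% Only flush when the ref matches the currently armed timer; a stale
    %% {flush_metrics, _} message from an old timer is ignored.
    handle_info({flush_metrics, Ref}, State = #{tref := {_TRef, Ref}, counters := Counters}) ->
        %% Stand-in for the emqx_resource_metrics:*_inc/2 calls.
        maps:foreach(
            fun(Counter, Val) -> io:format("bump ~p by ~p~n", [Counter, Val]) end,
            Counters
        ),
        {noreply, schedule_flush(State#{counters := #{}, tref := undefined})};
    handle_info({flush_metrics, _StaleRef}, State) ->
        {noreply, State}.

    schedule_flush(State = #{interval := Interval}) ->
        Ref = make_ref(),
        TRef = erlang:send_after(Interval, self(), {flush_metrics, Ref}),
        State#{tref := {TRef, Ref}}.

    merge(Old, Delta) ->
        maps:fold(
            fun(Counter, Val, Acc) ->
                maps:update_with(Counter, fun(X) -> X + Val end, Val, Acc)
            end,
            Old,
            Delta
        ).

Usage would be e.g. bump(Pid, #{success => 1}) from the request path; the per-request cost stays a cast plus a map update, and the telemetry cost is amortised over the flush interval. The real buffer worker below does the same thing inside its gen_statem state, with the interval taken from the new metrics_flush_interval resource option.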
@@ -557,8 +560,9 @@ wait_n_events(_TelemetryTable, _ResourceId, NEvents, _Timeout, _EventName) when ok; wait_n_events(TelemetryTable, ResourceId, NEvents, Timeout, EventName) -> receive - {telemetry, #{name := [_, _, EventName]}} -> - wait_n_events(TelemetryTable, ResourceId, NEvents - 1, Timeout, EventName) + {telemetry, #{name := [_, _, EventName], measurements := #{counter_inc := Inc}} = Event} -> + ct:pal("telemetry event: ~p", [Event]), + wait_n_events(TelemetryTable, ResourceId, NEvents - Inc, Timeout, EventName) after Timeout -> RecordedEvents = ets:tab2list(TelemetryTable), CurrentMetrics = current_metrics(ResourceId), @@ -575,7 +579,6 @@ t_publish_success(Config) -> ResourceId = ?config(resource_id, Config), ServiceAccountJSON = ?config(service_account_json, Config), TelemetryTable = ?config(telemetry_table, Config), - QueryMode = ?config(query_mode, Config), Topic = <<"t/topic">>, ?check_trace( create_bridge(Config), @@ -604,17 +607,6 @@ t_publish_success(Config) -> ), %% to avoid test flakiness wait_telemetry_event(TelemetryTable, success, ResourceId), - ExpectedInflightEvents = - case QueryMode of - sync -> 1; - async -> 3 - end, - wait_telemetry_event( - TelemetryTable, - inflight, - ResourceId, - #{n_events => ExpectedInflightEvents, timeout => 5_000} - ), wait_until_gauge_is(queuing, 0, 500), wait_until_gauge_is(inflight, 0, 500), assert_metrics( @@ -635,7 +627,6 @@ t_publish_success_local_topic(Config) -> ResourceId = ?config(resource_id, Config), ServiceAccountJSON = ?config(service_account_json, Config), TelemetryTable = ?config(telemetry_table, Config), - QueryMode = ?config(query_mode, Config), LocalTopic = <<"local/topic">>, {ok, _} = create_bridge(Config, #{<<"local_topic">> => LocalTopic}), assert_empty_metrics(ResourceId), @@ -654,17 +645,6 @@ t_publish_success_local_topic(Config) -> ), %% to avoid test flakiness wait_telemetry_event(TelemetryTable, success, ResourceId), - ExpectedInflightEvents = - case QueryMode of - sync -> 1; - async -> 3 - end, - wait_telemetry_event( - TelemetryTable, - inflight, - ResourceId, - #{n_events => ExpectedInflightEvents, timeout => 5_000} - ), wait_until_gauge_is(queuing, 0, 500), wait_until_gauge_is(inflight, 0, 500), assert_metrics( @@ -696,7 +676,6 @@ t_publish_templated(Config) -> ResourceId = ?config(resource_id, Config), ServiceAccountJSON = ?config(service_account_json, Config), TelemetryTable = ?config(telemetry_table, Config), - QueryMode = ?config(query_mode, Config), Topic = <<"t/topic">>, PayloadTemplate = << "{\"payload\": \"${payload}\"," @@ -742,17 +721,6 @@ t_publish_templated(Config) -> ), %% to avoid test flakiness wait_telemetry_event(TelemetryTable, success, ResourceId), - ExpectedInflightEvents = - case QueryMode of - sync -> 1; - async -> 3 - end, - wait_telemetry_event( - TelemetryTable, - inflight, - ResourceId, - #{n_events => ExpectedInflightEvents, timeout => 5_000} - ), wait_until_gauge_is(queuing, 0, 500), wait_until_gauge_is(inflight, 0, 500), assert_metrics( @@ -1089,9 +1057,6 @@ do_econnrefused_or_timeout_test(Config, Error) -> %% message as dropped; and since it never considers the %% response expired, this succeeds. econnrefused -> - wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{ - timeout => 10_000, n_events => 1 - }), %% even waiting, hard to avoid flakiness... simpler to just sleep %% a bit until stabilization. 
ct:sleep(200), @@ -1111,8 +1076,8 @@ do_econnrefused_or_timeout_test(Config, Error) -> CurrentMetrics ); timeout -> - wait_until_gauge_is(inflight, 0, _Timeout = 400), - wait_until_gauge_is(queuing, 0, _Timeout = 400), + wait_until_gauge_is(inflight, 0, _Timeout = 1_000), + wait_until_gauge_is(queuing, 0, _Timeout = 1_000), assert_metrics( #{ dropped => 0, diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index e6f86fb59..7f3ac580d 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -103,6 +103,10 @@ -define(HEALTHCHECK_INTERVAL, 15000). -define(HEALTHCHECK_INTERVAL_RAW, <<"15s">>). +%% milliseconds +-define(DEFAULT_METRICS_FLUSH_INTERVAL, 5_000). +-define(DEFAULT_METRICS_FLUSH_INTERVAL_RAW, <<"5s">>). + %% milliseconds -define(START_TIMEOUT, 5000). -define(START_TIMEOUT_RAW, <<"5s">>). diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 6145c3d87..993e69749 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -80,16 +80,30 @@ -type health_check_interval() :: timer:time(). -type state() :: blocked | running. -type inflight_key() :: integer(). +-type counters() :: #{ + dropped_expired => non_neg_integer(), + dropped_queue_full => non_neg_integer(), + dropped_resource_not_found => non_neg_integer(), + dropped_resource_stopped => non_neg_integer(), + success => non_neg_integer(), + failed => non_neg_integer(), + retried_success => non_neg_integer(), + retried_failed => non_neg_integer() +}. +-type inflight_table() :: ets:tid() | atom() | reference(). -type data() :: #{ id := id(), index := index(), - inflight_tid := ets:tid(), + inflight_tid := inflight_table(), async_workers := #{pid() => reference()}, batch_size := pos_integer(), batch_time := timer:time(), + counters := counters(), + metrics_flush_interval := timer:time(), queue := replayq:q(), resume_interval := timer:time(), - tref := undefined | timer:tref() + tref := undefined | {timer:tref() | reference(), reference()}, + metrics_tref := undefined | {timer:tref() | reference(), reference()} }. callback_mode() -> [state_functions, state_enter]. 
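A counters() value as declared above is just a partial map of deltas accumulated since the last flush; merging two of them (see merge_counters/2 further down in this patch) adds the values key by key. With illustrative values only:

    Old   = #{success => 2, dropped_expired => 1},
    Delta = #{success => 3, failed => 1},
    %% merging Delta into Old yields:
    #{success => 5, dropped_expired => 1, failed => 1}
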
@@ -171,24 +185,29 @@ init({Id, Index, Opts}) -> emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)), emqx_resource_metrics:inflight_set(Id, Index, 0), InflightWinSize = maps:get(inflight_window, Opts, ?DEFAULT_INFLIGHT), - InflightTID = inflight_new(InflightWinSize, Id, Index), + InflightTID = inflight_new(InflightWinSize), HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT), BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0), DefaultResumeInterval = default_resume_interval(RequestTimeout, HealthCheckInterval), ResumeInterval = maps:get(resume_interval, Opts, DefaultResumeInterval), - Data = #{ + MetricsFlushInterval = maps:get(metrics_flush_interval, Opts, ?DEFAULT_METRICS_FLUSH_INTERVAL), + Data0 = #{ id => Id, index => Index, inflight_tid => InflightTID, async_workers => #{}, batch_size => BatchSize, batch_time => BatchTime, + counters => #{}, + metrics_flush_interval => MetricsFlushInterval, queue => Queue, resume_interval => ResumeInterval, - tref => undefined + tref => undefined, + metrics_tref => undefined }, + Data = ensure_metrics_flush_timer(Data0), ?tp(buffer_worker_init, #{id => Id, index => Index, queue_opts => QueueOpts}), {ok, running, Data}. @@ -208,11 +227,16 @@ running(cast, block, St) -> {next_state, blocked, St}; running(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data) -> handle_query_requests(Request0, Data); -running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) -> - flush(St#{tref := undefined}); -running(info, {flush, _Ref}, _St) -> +running(info, {flush, Ref}, Data = #{tref := {_TRef, Ref}}) -> + flush(Data#{tref := undefined}); +running(info, {flush, _Ref}, _Data) -> ?tp(discarded_stale_flush, #{}), keep_state_and_data; +running(info, {flush_metrics, Ref}, Data0 = #{metrics_tref := {_TRef, Ref}}) -> + Data = flush_metrics(Data0#{metrics_tref := undefined}), + {keep_state, Data}; +running(info, {flush_metrics, _Ref}, _Data) -> + keep_state_and_data; running(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when is_map_key(Pid, AsyncWorkers0) -> @@ -241,6 +265,11 @@ blocked(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data0) -> blocked(info, {flush, _Ref}, _Data) -> %% ignore stale timer keep_state_and_data; +blocked(info, {flush_metrics, Ref}, Data0 = #{metrics_tref := {_TRef, Ref}}) -> + Data = flush_metrics(Data0#{metrics_tref := undefined}), + {keep_state, Data}; +blocked(info, {flush_metrics, _Ref}, _Data) -> + keep_state_and_data; blocked(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when is_map_key(Pid, AsyncWorkers0) -> @@ -310,11 +339,7 @@ pick_cast(Id, Key, Query) -> resume_from_blocked(Data) -> ?tp(buffer_worker_resume_from_blocked_enter, #{}), - #{ - id := Id, - index := Index, - inflight_tid := InflightTID - } = Data, + #{inflight_tid := InflightTID} = Data, Now = now_(), case inflight_get_first_retriable(InflightTID, Now) of none -> @@ -326,10 +351,15 @@ resume_from_blocked(Data) -> end; {expired, Ref, Batch} -> WorkerPid = self(), - IsAcked = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid), - IsAcked andalso emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)), + IsAcked = ack_inflight(InflightTID, Ref, WorkerPid), + Counters = + case IsAcked of + true -> #{dropped_expired => length(Batch)}; + false -> #{} + end, + NData = aggregate_counters(Data, Counters), 
?tp(buffer_worker_retry_expired, #{expired => Batch}), - resume_from_blocked(Data); + resume_from_blocked(NData); {single, Ref, Query} -> %% We retry msgs in inflight window sync, as if we send them %% async, they will be appended to the end of inflight window again. @@ -339,11 +369,11 @@ resume_from_blocked(Data) -> {batch, Ref, NotExpired, Expired} -> NumExpired = length(Expired), ok = update_inflight_item(InflightTID, Ref, NotExpired, NumExpired), - emqx_resource_metrics:dropped_expired_inc(Id, NumExpired), + NData = aggregate_counters(Data, #{dropped_expired => NumExpired}), ?tp(buffer_worker_retry_expired, #{expired => Expired}), %% We retry msgs in inflight window sync, as if we send them %% async, they will be appended to the end of inflight window again. - retry_inflight_sync(Ref, NotExpired, Data) + retry_inflight_sync(Ref, NotExpired, NData) end. retry_inflight_sync(Ref, QueryOrBatch, Data0) -> @@ -356,7 +386,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> ?tp(buffer_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}), QueryOpts = #{simple_query => false}, Result = call_query(force_sync, Id, Index, Ref, QueryOrBatch, QueryOpts), - ReplyResult = + {ShouldAck, PostFn, DeltaCounters} = case QueryOrBatch of ?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) -> Reply = ?REPLY(ReplyTo, HasBeenSent, Result), @@ -364,9 +394,10 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> [?QUERY(_, _, _, _) | _] = Batch -> batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts) end, - case ReplyResult of + Data1 = aggregate_counters(Data0, DeltaCounters), + case ShouldAck of %% Send failed because resource is down - {nack, PostFn} -> + nack -> PostFn(), ?tp( buffer_worker_retry_inflight_failed, @@ -375,11 +406,11 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> query_or_batch => QueryOrBatch } ), - {keep_state, Data0, {state_timeout, ResumeT, unblock}}; + {keep_state, Data1, {state_timeout, ResumeT, unblock}}; %% Send ok or failed but the resource is working - {ack, PostFn} -> + ack -> WorkerPid = self(), - IsAcked = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid), + IsAcked = ack_inflight(InflightTID, Ref, WorkerPid), %% we need to defer bumping the counters after %% `inflight_drop' to avoid the race condition when an %% inflight request might get completed concurrently with @@ -394,7 +425,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> query_or_batch => QueryOrBatch } ), - resume_from_blocked(Data0) + resume_from_blocked(Data1) end. %% Called during the `running' state only. @@ -426,9 +457,9 @@ collect_and_enqueue_query_requests(Request0, Data0) -> end, Requests ), - {Overflown, NewQ} = append_queue(Id, Index, Q, Queries), + {Overflown, NewQ, DeltaCounters} = append_queue(Id, Index, Q, Queries), ok = reply_overflown(Overflown), - Data0#{queue := NewQ}. + aggregate_counters(Data0#{queue := NewQ}, DeltaCounters). reply_overflown([]) -> ok; @@ -463,8 +494,6 @@ maybe_flush(Data0) -> -spec flush(data()) -> gen_statem:event_handler_result(state(), data()). 
flush(Data0) -> #{ - id := Id, - index := Index, batch_size := BatchSize, inflight_tid := InflightTID, queue := Q0 @@ -497,13 +526,13 @@ flush(Data0) -> case sieve_expired_requests(Batch, Now) of {[], _AllExpired} -> ok = replayq:ack(Q1, QAckRef), - emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)), - emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), + NumExpired = length(Batch), + Data3 = aggregate_counters(Data2, #{dropped_expired => NumExpired}), ?tp(buffer_worker_flush_all_expired, #{batch => Batch}), - flush(Data2); + flush(Data3); {NotExpired, Expired} -> NumExpired = length(Expired), - emqx_resource_metrics:dropped_expired_inc(Id, NumExpired), + Data3 = aggregate_counters(Data2, #{dropped_expired => NumExpired}), IsBatch = (BatchSize > 1), %% We *must* use the new queue, because we currently can't %% `nack' a `pop'. @@ -513,7 +542,7 @@ flush(Data0) -> #{expired => Expired, not_expired => NotExpired} ), Ref = make_request_ref(), - do_flush(Data2, #{ + do_flush(Data3, #{ is_batch => IsBatch, batch => NotExpired, ref => Ref, @@ -548,7 +577,9 @@ do_flush( QueryOpts = #{inflight_tid => InflightTID, simple_query => false}, Result = call_query(async_if_possible, Id, Index, Ref, Request, QueryOpts), Reply = ?REPLY(ReplyTo, HasBeenSent, Result), - case reply_caller(Id, Reply, QueryOpts) of + {ShouldAck, DeltaCounters} = reply_caller(Id, Reply, QueryOpts), + Data1 = aggregate_counters(Data0, DeltaCounters), + case ShouldAck of %% Failed; remove the request from the queue, as we cannot pop %% from it again, but we'll retry it using the inflight table. nack -> @@ -562,11 +593,10 @@ do_flush( %% request will be retried (i.e., it might not have been %% inserted during `call_query' if the resource was down %% and/or if it was a sync request). - inflight_append(InflightTID, InflightItem, Id, Index), + inflight_append(InflightTID, InflightItem), mark_inflight_as_retriable(InflightTID, Ref), - {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), + {Data2, WorkerMRef} = ensure_async_worker_monitored(Data1, Result), store_async_worker_reference(InflightTID, Ref, WorkerMRef), - emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), ?tp( buffer_worker_flush_nack, #{ @@ -576,7 +606,7 @@ do_flush( result => Result } ), - {next_state, blocked, Data1}; + {next_state, blocked, Data2}; %% Success; just ack. 
ack -> ok = replayq:ack(Q1, QAckRef), @@ -588,15 +618,14 @@ do_flush( WorkerPid = self(), case is_async_return(Result) of true when IsUnrecoverableError -> - ack_inflight(InflightTID, Ref, Id, Index, WorkerPid); + ack_inflight(InflightTID, Ref, WorkerPid); true -> ok; false -> - ack_inflight(InflightTID, Ref, Id, Index, WorkerPid) + ack_inflight(InflightTID, Ref, WorkerPid) end, - {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), + {Data2, WorkerMRef} = ensure_async_worker_monitored(Data1, Result), store_async_worker_reference(InflightTID, Ref, WorkerMRef), - emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), ?tp( buffer_worker_flush_ack, #{ @@ -617,7 +646,7 @@ do_flush( }), ok end, - {keep_state, Data1} + {keep_state, Data2} end; do_flush(#{queue := Q1} = Data0, #{ is_batch := true, @@ -633,7 +662,9 @@ do_flush(#{queue := Q1} = Data0, #{ } = Data0, QueryOpts = #{inflight_tid => InflightTID, simple_query => false}, Result = call_query(async_if_possible, Id, Index, Ref, Batch, QueryOpts), - case batch_reply_caller(Id, Result, Batch, QueryOpts) of + {ShouldAck, DeltaCounters} = batch_reply_caller(Id, Result, Batch, QueryOpts), + Data1 = aggregate_counters(Data0, DeltaCounters), + case ShouldAck of %% Failed; remove the request from the queue, as we cannot pop %% from it again, but we'll retry it using the inflight table. nack -> @@ -647,11 +678,10 @@ do_flush(#{queue := Q1} = Data0, #{ %% request will be retried (i.e., it might not have been %% inserted during `call_query' if the resource was down %% and/or if it was a sync request). - inflight_append(InflightTID, InflightItem, Id, Index), + inflight_append(InflightTID, InflightItem), mark_inflight_as_retriable(InflightTID, Ref), - {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), + {Data2, WorkerMRef} = ensure_async_worker_monitored(Data1, Result), store_async_worker_reference(InflightTID, Ref, WorkerMRef), - emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), ?tp( buffer_worker_flush_nack, #{ @@ -661,7 +691,7 @@ do_flush(#{queue := Q1} = Data0, #{ result => Result } ), - {next_state, blocked, Data1}; + {next_state, blocked, Data2}; %% Success; just ack. 
ack -> ok = replayq:ack(Q1, QAckRef), @@ -673,15 +703,14 @@ do_flush(#{queue := Q1} = Data0, #{ WorkerPid = self(), case is_async_return(Result) of true when IsUnrecoverableError -> - ack_inflight(InflightTID, Ref, Id, Index, WorkerPid); + ack_inflight(InflightTID, Ref, WorkerPid); true -> ok; false -> - ack_inflight(InflightTID, Ref, Id, Index, WorkerPid) + ack_inflight(InflightTID, Ref, WorkerPid) end, - {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), + {Data2, WorkerMRef} = ensure_async_worker_monitored(Data1, Result), store_async_worker_reference(InflightTID, Ref, WorkerMRef), - emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), CurrentCount = queue_count(Q1), ?tp( buffer_worker_flush_ack, @@ -691,13 +720,13 @@ do_flush(#{queue := Q1} = Data0, #{ queue_count => CurrentCount } ), - Data2 = + Data3 = case {CurrentCount > 0, CurrentCount >= BatchSize} of {false, _} -> ?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{ inflight => inflight_count(InflightTID) }), - Data1; + Data2; {true, true} -> ?tp(buffer_worker_flush_ack_reflush, #{ batch_or_query => Batch, @@ -706,17 +735,18 @@ do_flush(#{queue := Q1} = Data0, #{ batch_size => BatchSize }), flush_worker(self()), - Data1; + Data2; {true, false} -> - ensure_flush_timer(Data1) + ensure_flush_timer(Data2) end, - {keep_state, Data2} + {keep_state, Data3} end. batch_reply_caller(Id, BatchResult, Batch, QueryOpts) -> - {ShouldBlock, PostFn} = batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts), + {ShouldBlock, PostFn, DeltaCounters} = + batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts), PostFn(), - ShouldBlock. + {ShouldBlock, DeltaCounters}. batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) -> %% the `Mod:on_batch_query/3` returns a single result for a batch, @@ -727,23 +757,25 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) -> end, Batch ), - {ShouldAck, PostFns} = + {ShouldAck, PostFns, Counters} = lists:foldl( - fun(Reply, {_ShouldAck, PostFns}) -> + fun(Reply, {_ShouldAck, PostFns, OldCounters}) -> %% _ShouldAck should be the same as ShouldAck starting from the second reply - {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts), - {ShouldAck, [PostFn | PostFns]} + {ShouldAck, PostFn, DeltaCounters} = reply_caller_defer_metrics( + Id, Reply, QueryOpts + ), + {ShouldAck, [PostFn | PostFns], merge_counters(OldCounters, DeltaCounters)} end, - {ack, []}, + {ack, [], #{}}, Replies ), PostFn = fun() -> lists:foreach(fun(F) -> F() end, lists:reverse(PostFns)) end, - {ShouldAck, PostFn}. + {ShouldAck, PostFn, Counters}. reply_caller(Id, Reply, QueryOpts) -> - {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts), + {ShouldAck, PostFn, DeltaCounters} = reply_caller_defer_metrics(Id, Reply, QueryOpts), PostFn(), - ShouldAck. + {ShouldAck, DeltaCounters}. %% Should only reply to the caller when the decision is final (not %% retriable). See comment on `handle_query_result_pure'. 
@@ -752,7 +784,7 @@ reply_caller_defer_metrics(Id, ?REPLY(undefined, HasBeenSent, Result), _QueryOpt reply_caller_defer_metrics(Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts) -> IsSimpleQuery = maps:get(simple_query, QueryOpts, false), IsUnrecoverableError = is_unrecoverable_error(Result), - {ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent), + {ShouldAck, PostFn, DeltaCounters} = handle_query_result_pure(Id, Result, HasBeenSent), case {ShouldAck, Result, IsUnrecoverableError, IsSimpleQuery} of {ack, {async_return, _}, true, _} -> ok = do_reply_caller(ReplyTo, Result); @@ -765,11 +797,14 @@ reply_caller_defer_metrics(Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts) {ack, _, _, _} -> ok = do_reply_caller(ReplyTo, Result) end, - {ShouldAck, PostFn}. + {ShouldAck, PostFn, DeltaCounters}. +%% This is only called by `simple_{,a}sync_query', so we can bump the +%% counters here. handle_query_result(Id, Result, HasBeenSent) -> - {ShouldBlock, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent), + {ShouldBlock, PostFn, DeltaCounters} = handle_query_result_pure(Id, Result, HasBeenSent), PostFn(), + bump_counters(Id, DeltaCounters), ShouldBlock. %% We should always retry (nack), except when: @@ -778,85 +813,156 @@ handle_query_result(Id, Result, HasBeenSent) -> %% * the result is a success (or at least a delayed result) %% We also retry even sync requests. In that case, we shouldn't reply %% the caller until one of those final results above happen. +-spec handle_query_result_pure(id(), term(), HasBeenSent :: boolean()) -> + {ack | nack, function(), counters()}. handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(exception, Msg), _HasBeenSent) -> PostFn = fun() -> ?SLOG(error, #{msg => resource_exception, info => Msg}), ok end, - {nack, PostFn}; + {nack, PostFn, #{}}; handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent) when NotWorking == not_connected; NotWorking == blocked -> - {nack, fun() -> ok end}; + {nack, fun() -> ok end, #{}}; handle_query_result_pure(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent) -> PostFn = fun() -> ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}), - emqx_resource_metrics:dropped_resource_not_found_inc(Id), ok end, - {ack, PostFn}; + {ack, PostFn, #{dropped_resource_not_found => 1}}; handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent) -> PostFn = fun() -> ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}), - emqx_resource_metrics:dropped_resource_stopped_inc(Id), ok end, - {ack, PostFn}; + {ack, PostFn, #{dropped_resource_stopped => 1}}; handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) -> PostFn = fun() -> ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}), ok end, - {nack, PostFn}; + {nack, PostFn, #{}}; handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent) -> case is_unrecoverable_error(Error) of true -> PostFn = fun() -> ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}), - inc_sent_failed(Id, HasBeenSent), ok end, - {ack, PostFn}; + Counters = + case HasBeenSent of + true -> #{retried_failed => 1}; + false -> #{failed => 1} + end, + {ack, PostFn, Counters}; false -> PostFn = fun() -> ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}), ok end, - {nack, PostFn} + {nack, PostFn, #{}} end; handle_query_result_pure(Id, {async_return, Result}, HasBeenSent) -> handle_query_async_result_pure(Id, Result, HasBeenSent); -handle_query_result_pure(Id, Result, HasBeenSent) 
-> +handle_query_result_pure(_Id, Result, HasBeenSent) -> PostFn = fun() -> assert_ok_result(Result), - inc_sent_success(Id, HasBeenSent), ok end, - {ack, PostFn}. + Counters = + case HasBeenSent of + true -> #{retried_success => 1}; + false -> #{success => 1} + end, + {ack, PostFn, Counters}. +-spec handle_query_async_result_pure(id(), term(), HasBeenSent :: boolean()) -> + {ack | nack, function(), counters()}. handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent) -> case is_unrecoverable_error(Error) of true -> PostFn = fun() -> ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}), - inc_sent_failed(Id, HasBeenSent), ok end, - {ack, PostFn}; + Counters = + case HasBeenSent of + true -> #{retried_failed => 1}; + false -> #{failed => 1} + end, + {ack, PostFn, Counters}; false -> PostFn = fun() -> ?SLOG(error, #{id => Id, msg => async_send_error, reason => Reason}), ok end, - {nack, PostFn} + {nack, PostFn, #{}} end; handle_query_async_result_pure(_Id, {ok, Pid}, _HasBeenSent) when is_pid(Pid) -> - {ack, fun() -> ok end}; + {ack, fun() -> ok end, #{}}; handle_query_async_result_pure(_Id, ok, _HasBeenSent) -> - {ack, fun() -> ok end}. + {ack, fun() -> ok end, #{}}. + +-spec aggregate_counters(data(), counters()) -> data(). +aggregate_counters(Data = #{counters := OldCounters}, DeltaCounters) -> + Counters = merge_counters(OldCounters, DeltaCounters), + Data#{counters := Counters}. + +-spec merge_counters(counters(), counters()) -> counters(). +merge_counters(OldCounters, DeltaCounters) -> + maps:fold( + fun(Metric, Val, Acc) -> + maps:update_with(Metric, fun(X) -> X + Val end, Val, Acc) + end, + OldCounters, + DeltaCounters + ). + +-spec flush_metrics(data()) -> data(). +flush_metrics(Data = #{id := Id, counters := Counters}) -> + bump_counters(Id, Counters), + set_gauges(Data), + ensure_metrics_flush_timer(Data#{counters := #{}}). + +-spec ensure_metrics_flush_timer(data()) -> data(). +ensure_metrics_flush_timer(Data = #{metrics_tref := undefined, metrics_flush_interval := T}) -> + Ref = make_ref(), + TRef = erlang:send_after(T, self(), {flush_metrics, Ref}), + Data#{metrics_tref := {TRef, Ref}}. + +-spec bump_counters(id(), counters()) -> ok. +bump_counters(Id, Counters) -> + maps:foreach( + fun + (dropped_expired, Val) -> + emqx_resource_metrics:dropped_expired_inc(Id, Val); + (dropped_queue_full, Val) -> + emqx_resource_metrics:dropped_queue_full_inc(Id, Val); + (failed, Val) -> + emqx_resource_metrics:failed_inc(Id, Val); + (retried_failed, Val) -> + emqx_resource_metrics:retried_failed_inc(Id, Val); + (success, Val) -> + emqx_resource_metrics:success_inc(Id, Val); + (retried_success, Val) -> + emqx_resource_metrics:retried_success_inc(Id, Val); + (dropped_resource_not_found, Val) -> + emqx_resource_metrics:dropped_resource_not_found_inc(Id, Val); + (dropped_resource_stopped, Val) -> + emqx_resource_metrics:dropped_resource_stopped_inc(Id, Val) + end, + Counters + ). + +-spec set_gauges(data()) -> ok. +set_gauges(_Data = #{id := Id, index := Index, queue := Q, inflight_tid := InflightTID}) -> + emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)), + emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), + ok. 
handle_async_worker_down(Data0, Pid) -> #{async_workers := AsyncWorkers0} = Data0, @@ -942,7 +1048,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, Re IsRetriable = false, WorkerMRef = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef), - ok = inflight_append(InflightTID, InflightItem, Id, Index), + ok = inflight_append(InflightTID, InflightItem), Result = Mod:on_query_async(Id, Request, {ReplyFun, [ReplyContext]}, ResSt), {async_return, Result} end, @@ -978,7 +1084,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re IsRetriable = false, WorkerMRef = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), - ok = inflight_append(InflightTID, InflightItem, Id, Index), + ok = inflight_append(InflightTID, InflightItem), Result = Mod:on_batch_query_async(Id, Requests, {ReplyFun, [ReplyContext]}, ResSt), {async_return, Result} end, @@ -1005,7 +1111,6 @@ handle_async_reply1( request_ref := Ref, inflight_tid := InflightTID, resource_id := Id, - worker_index := Index, buffer_worker := WorkerPid, min_query := ?QUERY(_, _, _, ExpireAt) = _Query } = ReplyContext, @@ -1018,7 +1123,9 @@ handle_async_reply1( Now = now_(), case is_expired(ExpireAt, Now) of true -> - IsAcked = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid), + IsAcked = ack_inflight(InflightTID, Ref, WorkerPid), + %% evalutate metrics call here since we're not inside + %% buffer worker IsAcked andalso emqx_resource_metrics:late_reply_inc(Id), ?tp(handle_async_reply_expired, #{expired => [_Query]}), ok; @@ -1031,7 +1138,6 @@ do_handle_async_reply( query_opts := QueryOpts, resource_id := Id, request_ref := Ref, - worker_index := Index, buffer_worker := WorkerPid, inflight_tid := InflightTID, min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query @@ -1041,7 +1147,7 @@ do_handle_async_reply( %% NOTE: 'inflight' is the count of messages that were sent async %% but received no ACK, NOT the number of messages queued in the %% inflight window. - {Action, PostFn} = reply_caller_defer_metrics( + {Action, PostFn, DeltaCounters} = reply_caller_defer_metrics( Id, ?REPLY(ReplyTo, Sent, Result), QueryOpts ), @@ -1058,7 +1164,7 @@ do_handle_async_reply( ok = ?MODULE:block(WorkerPid), blocked; ack -> - ok = do_async_ack(InflightTID, Ref, Id, Index, WorkerPid, PostFn, QueryOpts) + ok = do_async_ack(InflightTID, Ref, Id, PostFn, WorkerPid, DeltaCounters, QueryOpts) end. 
handle_async_batch_reply( @@ -1110,7 +1216,6 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef) = Inflight, #{ resource_id := Id, - worker_index := Index, buffer_worker := WorkerPid, inflight_tid := InflightTID, request_ref := Ref, @@ -1130,11 +1235,13 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> RealNotExpired0 ), NumExpired = length(RealExpired), + %% evalutate metrics call here since we're not inside buffer + %% worker emqx_resource_metrics:late_reply_inc(Id, NumExpired), case RealNotExpired of [] -> %% all expired, no need to update back the inflight batch - _ = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid), + _ = ack_inflight(InflightTID, Ref, WorkerPid), ok; _ -> %% some queries are not expired, put them back to the inflight batch @@ -1147,7 +1254,6 @@ do_handle_async_batch_reply( #{ buffer_worker := WorkerPid, resource_id := Id, - worker_index := Index, inflight_tid := InflightTID, request_ref := Ref, min_batch := Batch, @@ -1155,7 +1261,9 @@ do_handle_async_batch_reply( }, Result ) -> - {Action, PostFn} = batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts), + {Action, PostFn, DeltaCounters} = batch_reply_caller_defer_metrics( + Id, Result, Batch, QueryOpts + ), ?tp(handle_async_reply, #{ action => Action, batch_or_query => Batch, @@ -1169,16 +1277,18 @@ do_handle_async_batch_reply( ok = ?MODULE:block(WorkerPid), blocked; ack -> - ok = do_async_ack(InflightTID, Ref, Id, Index, WorkerPid, PostFn, QueryOpts) + ok = do_async_ack(InflightTID, Ref, Id, PostFn, WorkerPid, DeltaCounters, QueryOpts) end. -do_async_ack(InflightTID, Ref, Id, Index, WorkerPid, PostFn, QueryOpts) -> - IsKnownRef = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid), +do_async_ack(InflightTID, Ref, Id, PostFn, WorkerPid, DeltaCounters, QueryOpts) -> + IsKnownRef = ack_inflight(InflightTID, Ref, WorkerPid), case maps:get(simple_query, QueryOpts, false) of true -> - PostFn(); + PostFn(), + bump_counters(Id, DeltaCounters); false when IsKnownRef -> - PostFn(); + PostFn(), + bump_counters(Id, DeltaCounters); false -> ok end, @@ -1222,31 +1332,30 @@ estimate_size(QItem) -> erlang:external_size(QItem). -spec append_queue(id(), index(), replayq:q(), [queue_query()]) -> - {[queue_query()], replayq:q()}. + {[queue_query()], replayq:q(), counters()}. append_queue(Id, Index, Q, Queries) -> %% this assertion is to ensure that we never append a raw binary %% because the marshaller will get lost. false = is_binary(hd(Queries)), Q0 = replayq:append(Q, Queries), - {Overflown, Q2} = + {Overflown, Q2, DeltaCounters} = case replayq:overflow(Q0) of OverflownBytes when OverflownBytes =< 0 -> - {[], Q0}; + {[], Q0, #{}}; OverflownBytes -> PopOpts = #{bytes_limit => OverflownBytes, count_limit => 999999999}, {Q1, QAckRef, Items2} = replayq:pop(Q0, PopOpts), ok = replayq:ack(Q1, QAckRef), Dropped = length(Items2), - emqx_resource_metrics:dropped_queue_full_inc(Id, Dropped), + Counters = #{dropped_queue_full => Dropped}, ?SLOG(info, #{ msg => buffer_worker_overflow, resource_id => Id, worker_index => Index, dropped => Dropped }), - {Items2, Q1} + {Items2, Q1, Counters} end, - emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q2)), ?tp( buffer_worker_appended_to_queue, #{ @@ -1256,7 +1365,7 @@ append_queue(Id, Index, Q, Queries) -> overflown => length(Overflown) } ), - {Overflown, Q2}. + {Overflown, Q2, DeltaCounters}. 
%%============================================================================== %% the inflight queue for async query @@ -1266,20 +1375,18 @@ append_queue(Id, Index, Q, Queries) -> -define(INITIAL_TIME_REF, initial_time). -define(INITIAL_MONOTONIC_TIME_REF, initial_monotonic_time). -inflight_new(InfltWinSZ, Id, Index) -> +inflight_new(InfltWinSZ) -> TableId = ets:new( emqx_resource_buffer_worker_inflight_tab, [ordered_set, public, {write_concurrency, true}] ), - inflight_append(TableId, {?MAX_SIZE_REF, InfltWinSZ}, Id, Index), + inflight_append(TableId, {?MAX_SIZE_REF, InfltWinSZ}), %% we use this counter because we might deal with batches as %% elements. - inflight_append(TableId, {?SIZE_REF, 0}, Id, Index), - inflight_append(TableId, {?BATCH_COUNT_REF, 0}, Id, Index), - inflight_append(TableId, {?INITIAL_TIME_REF, erlang:system_time()}, Id, Index), - inflight_append( - TableId, {?INITIAL_MONOTONIC_TIME_REF, make_request_ref()}, Id, Index - ), + inflight_append(TableId, {?SIZE_REF, 0}), + inflight_append(TableId, {?BATCH_COUNT_REF, 0}), + inflight_append(TableId, {?INITIAL_TIME_REF, erlang:system_time()}), + inflight_append(TableId, {?INITIAL_MONOTONIC_TIME_REF, make_request_ref()}), TableId. -spec inflight_get_first_retriable(ets:tid(), integer()) -> @@ -1331,38 +1438,32 @@ inflight_num_msgs(InflightTID) -> [{_, Size}] = ets:lookup(InflightTID, ?SIZE_REF), Size. -inflight_append(undefined, _InflightItem, _Id, _Index) -> +inflight_append(undefined, _InflightItem) -> ok; inflight_append( InflightTID, - ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, WorkerMRef), - Id, - Index + ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, WorkerMRef) ) -> Batch = mark_as_sent(Batch0), InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), IsNew = ets:insert_new(InflightTID, InflightItem), BatchSize = length(Batch), IsNew andalso inc_inflight(InflightTID, BatchSize), - emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), ok; inflight_append( InflightTID, ?INFLIGHT_ITEM( Ref, ?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, WorkerMRef - ), - Id, - Index + ) ) -> Query = mark_as_sent(Query0), InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef), IsNew = ets:insert_new(InflightTID, InflightItem), IsNew andalso inc_inflight(InflightTID, 1), - emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), ok; -inflight_append(InflightTID, {Ref, Data}, _Id, _Index) -> +inflight_append(InflightTID, {Ref, Data}) -> ets:insert(InflightTID, {Ref, Data}), %% this is a metadata row being inserted; therefore, we don't bump %% the inflight metric. @@ -1398,6 +1499,8 @@ ensure_async_worker_monitored( ensure_async_worker_monitored(Data0, _Result) -> {Data0, undefined}. +-spec store_async_worker_reference(undefined | ets:tid(), inflight_key(), undefined | reference()) -> + ok. store_async_worker_reference(undefined = _InflightTID, _Ref, _WorkerMRef) -> ok; store_async_worker_reference(_InflightTID, _Ref, undefined = _WorkerRef) -> @@ -1410,9 +1513,9 @@ store_async_worker_reference(InflightTID, Ref, WorkerMRef) when ), ok. 
-ack_inflight(undefined, _Ref, _Id, _Index, _WorkerPid) -> +ack_inflight(undefined, _Ref, _WorkerPid) -> false; -ack_inflight(InflightTID, Ref, Id, Index, WorkerPid) -> +ack_inflight(InflightTID, Ref, WorkerPid) -> {Count, Removed} = case ets:take(InflightTID, Ref) of [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _WorkerMRef)] -> @@ -1428,12 +1531,6 @@ ack_inflight(InflightTID, Ref, Id, Index, WorkerPid) -> flush -> ?MODULE:flush_worker(WorkerPid) end, IsKnownRef = (Count > 0), - case IsKnownRef of - true -> - emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)); - false -> - ok - end, IsKnownRef. mark_inflight_items_as_retriable(Data, WorkerMRef) -> @@ -1496,16 +1593,6 @@ dec_inflight_update(InflightTID, Count) when Count > 0 -> %%============================================================================== -inc_sent_failed(Id, _HasBeenSent = true) -> - emqx_resource_metrics:retried_failed_inc(Id); -inc_sent_failed(Id, _HasBeenSent) -> - emqx_resource_metrics:failed_inc(Id). - -inc_sent_success(Id, _HasBeenSent = true) -> - emqx_resource_metrics:retried_success_inc(Id); -inc_sent_success(Id, _HasBeenSent) -> - emqx_resource_metrics:success_inc(Id). - call_mode(force_sync, _) -> sync; call_mode(async_if_possible, always_sync) -> sync; call_mode(async_if_possible, async_if_possible) -> async. diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 3b4fb66e5..1de1a6545 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -44,6 +44,7 @@ fields("creation_opts") -> {worker_pool_size, fun worker_pool_size/1}, {health_check_interval, fun health_check_interval/1}, {resume_interval, fun resume_interval/1}, + {metrics_flush_interval, fun metrics_flush_interval/1}, {start_after_created, fun start_after_created/1}, {start_timeout, fun start_timeout/1}, {auto_restart_interval, fun auto_restart_interval/1}, @@ -77,6 +78,11 @@ resume_interval(desc) -> ?DESC("resume_interval"); resume_interval(required) -> false; resume_interval(_) -> undefined. +metrics_flush_interval(type) -> emqx_schema:duration_ms(); +metrics_flush_interval(importance) -> ?IMPORTANCE_HIDDEN; +metrics_flush_interval(required) -> false; +metrics_flush_interval(_) -> undefined. + health_check_interval(type) -> emqx_schema:duration_ms(); health_check_interval(desc) -> ?DESC("health_check_interval"); health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW; diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index fc338b512..b960b0526 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -316,7 +316,11 @@ t_query_counter_async_query(_) -> ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => test_resource, register => true}, - #{query_mode => async, batch_size => 1} + #{ + query_mode => async, + batch_size => 1, + metrics_flush_interval => 50 + } ), ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)), NMsgs = 1_000, @@ -350,7 +354,11 @@ t_query_counter_async_query(_) -> end ), #{counters := C} = emqx_resource:get_metrics(?ID), - ?assertMatch(#{matched := 1002, 'success' := 1002, 'failed' := 0}, C), + ?retry( + _Sleep = 300, + _Attempts0 = 20, + ?assertMatch(#{matched := 1002, 'success' := 1002, 'failed' := 0}, C) + ), ok = emqx_resource:remove_local(?ID). 
t_query_counter_async_callback(_) -> @@ -1171,6 +1179,7 @@ t_unblock_only_required_buffer_workers(_) -> #{ query_mode => async, batch_size => 5, + metrics_flush_interval => 50, batch_time => 100 } ), @@ -1219,6 +1228,7 @@ t_retry_batch(_Config) -> batch_size => 5, batch_time => 100, worker_pool_size => 1, + metrics_flush_interval => 50, resume_interval => 1_000 } ), @@ -1318,6 +1328,7 @@ t_delete_and_re_create_with_same_name(_Config) -> worker_pool_size => NumBufferWorkers, buffer_mode => volatile_offload, buffer_seg_bytes => 100, + metrics_flush_interval => 50, resume_interval => 1_000 } ), @@ -1354,10 +1365,16 @@ t_delete_and_re_create_with_same_name(_Config) -> %% ensure that stuff got enqueued into disk tap_metrics(?LINE), - Queuing1 = emqx_resource_metrics:queuing_get(?ID), - Inflight1 = emqx_resource_metrics:inflight_get(?ID), - ?assert(Queuing1 > 0), - ?assertEqual(2, Inflight1), + ?retry( + _Sleep = 300, + _Attempts0 = 20, + ?assert(emqx_resource_metrics:queuing_get(?ID) > 0) + ), + ?retry( + _Sleep = 300, + _Attempts0 = 20, + ?assertEqual(2, emqx_resource_metrics:inflight_get(?ID)) + ), %% now, we delete the resource process_flag(trap_exit, true), @@ -1409,6 +1426,7 @@ t_always_overflow(_Config) -> batch_size => 1, worker_pool_size => 1, max_buffer_bytes => 1, + metrics_flush_interval => 50, resume_interval => 1_000 } ), @@ -1446,6 +1464,7 @@ t_retry_sync_inflight(_Config) -> query_mode => sync, batch_size => 1, worker_pool_size => 1, + metrics_flush_interval => 50, resume_interval => ResumeInterval } ), @@ -1496,6 +1515,7 @@ t_retry_sync_inflight_batch(_Config) -> batch_size => 2, batch_time => 200, worker_pool_size => 1, + metrics_flush_interval => 50, resume_interval => ResumeInterval } ), @@ -1546,6 +1566,7 @@ t_retry_async_inflight(_Config) -> query_mode => async, batch_size => 1, worker_pool_size => 1, + metrics_flush_interval => 50, resume_interval => ResumeInterval } ), @@ -1590,6 +1611,7 @@ t_retry_async_inflight_full(_Config) -> inflight_window => AsyncInflightWindow, batch_size => 1, worker_pool_size => 1, + metrics_flush_interval => 50, resume_interval => ResumeInterval } ), @@ -1653,6 +1675,7 @@ t_async_reply_multi_eval(_Config) -> batch_size => 3, batch_time => 10, worker_pool_size => 1, + metrics_flush_interval => 50, resume_interval => ResumeInterval } ), @@ -1667,7 +1690,7 @@ t_async_reply_multi_eval(_Config) -> #{} ), ?retry( - ResumeInterval, + 2 * ResumeInterval, TotalTime div ResumeInterval, begin Metrics = tap_metrics(?LINE), @@ -1683,7 +1706,7 @@ t_async_reply_multi_eval(_Config) -> failed := Failed } = Counters, ?assertEqual(TotalQueries, Matched - 1), - ?assertEqual(Matched, Success + Dropped + LateReply + Failed) + ?assertEqual(Matched, Success + Dropped + LateReply + Failed, #{counters => Counters}) end ). 
@@ -1700,6 +1723,7 @@ t_retry_async_inflight_batch(_Config) -> batch_size => 2, batch_time => 200, worker_pool_size => 1, + metrics_flush_interval => 50, resume_interval => ResumeInterval } ), @@ -1745,6 +1769,7 @@ t_async_pool_worker_death(_Config) -> query_mode => async, batch_size => 1, worker_pool_size => NumBufferWorkers, + metrics_refresh_interval => 50, resume_interval => ResumeInterval } ), @@ -1768,8 +1793,11 @@ t_async_pool_worker_death(_Config) -> inc_counter_in_parallel_increasing(NumReqs, 1, ReqOpts), {ok, _} = snabbkaffe:receive_events(SRef0), - Inflight0 = emqx_resource_metrics:inflight_get(?ID), - ?assertEqual(NumReqs, Inflight0), + ?retry( + _Sleep = 300, + _Attempts0 = 20, + ?assertEqual(NumReqs, emqx_resource_metrics:inflight_get(?ID)) + ), %% grab one of the worker pids and kill it {ok, #{pid := Pid0}} = emqx_resource:simple_sync_query(?ID, get_state), @@ -1820,6 +1848,7 @@ t_expiration_sync_before_sending(_Config) -> query_mode => sync, batch_size => 1, worker_pool_size => 1, + metrics_flush_interval => 50, resume_interval => 1_000 } ), @@ -1837,6 +1866,7 @@ t_expiration_sync_batch_before_sending(_Config) -> batch_size => 2, batch_time => 100, worker_pool_size => 1, + metrics_flush_interval => 50, resume_interval => 1_000 } ), @@ -1853,6 +1883,7 @@ t_expiration_async_before_sending(_Config) -> query_mode => async, batch_size => 1, worker_pool_size => 1, + metrics_flush_interval => 50, resume_interval => 1_000 } ), @@ -1870,6 +1901,7 @@ t_expiration_async_batch_before_sending(_Config) -> batch_size => 2, batch_time => 100, worker_pool_size => 1, + metrics_flush_interval => 50, resume_interval => 1_000 } ), @@ -1950,6 +1982,7 @@ t_expiration_sync_before_sending_partial_batch(_Config) -> batch_size => 2, batch_time => 100, worker_pool_size => 1, + metrics_flush_interval => 250, resume_interval => 1_000 } ), @@ -1968,6 +2001,7 @@ t_expiration_async_before_sending_partial_batch(_Config) -> batch_size => 2, batch_time => 100, worker_pool_size => 1, + metrics_flush_interval => 250, resume_interval => 1_000 } ), @@ -2057,7 +2091,14 @@ do_t_expiration_before_sending_partial_batch(QueryMode) -> ], ?of_kind(buffer_worker_flush_potentially_partial, Trace) ), - wait_until_gauge_is(inflight, 0, 500), + wait_until_gauge_is( + inflight, + #{ + expected_value => 0, + timeout => 500, + max_events => 10 + } + ), Metrics = tap_metrics(?LINE), case QueryMode of async -> @@ -2933,8 +2974,15 @@ install_telemetry_handler(TestCase) -> put({?MODULE, telemetry_table}, Tid), Tid. -wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) -> - Events = receive_all_events(GaugeName, Timeout), +wait_until_gauge_is( + GaugeName, + #{ + expected_value := ExpectedValue, + timeout := Timeout, + max_events := MaxEvents + } +) -> + Events = receive_all_events(GaugeName, Timeout, MaxEvents), case length(Events) > 0 andalso lists:last(Events) of #{measurements := #{gauge_set := ExpectedValue}} -> ok; @@ -2948,12 +2996,18 @@ wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) -> end. receive_all_events(EventName, Timeout) -> - receive_all_events(EventName, Timeout, []). + receive_all_events(EventName, Timeout, _MaxEvents = 50, _Count = 0, _Acc = []). -receive_all_events(EventName, Timeout, Acc) -> +receive_all_events(EventName, Timeout, MaxEvents) -> + receive_all_events(EventName, Timeout, MaxEvents, _Count = 0, _Acc = []). 
+ +receive_all_events(_EventName, _Timeout, MaxEvents, Count, Acc) when Count >= MaxEvents -> + lists:reverse(Acc); +receive_all_events(EventName, Timeout, MaxEvents, Count, Acc) -> receive {telemetry, #{name := [_, _, EventName]} = Event} -> - receive_all_events(EventName, Timeout, [Event | Acc]) + ct:pal("telemetry event: ~p", [Event]), + receive_all_events(EventName, Timeout, MaxEvents, Count + 1, [Event | Acc]) after Timeout -> lists:reverse(Acc) end. From c74c93388e5ed440a705f25d85ac2bf101c41b18 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 18 May 2023 09:48:25 -0300 Subject: [PATCH 2/4] refactor: rename some variables and sum type constructors for clarity --- .../src/emqx_resource_buffer_worker.erl | 154 +++++++++--------- 1 file changed, 79 insertions(+), 75 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 993e69749..47769418b 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -63,8 +63,8 @@ -define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}). -define(SIMPLE_QUERY(REQUEST), ?QUERY(undefined, REQUEST, false, infinity)). -define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}). --define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef), - {Ref, BatchOrQuery, IsRetriable, WorkerMRef} +-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef), + {Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef} ). -define(ITEM_IDX, 2). -define(RETRY_IDX, 3). @@ -350,8 +350,8 @@ resume_from_blocked(Data) -> {next_state, running, Data} end; {expired, Ref, Batch} -> - WorkerPid = self(), - IsAcked = ack_inflight(InflightTID, Ref, WorkerPid), + BufferWorkerPid = self(), + IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid), Counters = case IsAcked of true -> #{dropped_expired => length(Batch)}; @@ -409,8 +409,8 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> {keep_state, Data1, {state_timeout, ResumeT, unblock}}; %% Send ok or failed but the resource is working ack -> - WorkerPid = self(), - IsAcked = ack_inflight(InflightTID, Ref, WorkerPid), + BufferWorkerPid = self(), + IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid), %% we need to defer bumping the counters after %% `inflight_drop' to avoid the race condition when an %% inflight request might get completed concurrently with @@ -587,16 +587,16 @@ do_flush( %% we set it atomically just below; a limitation of having %% to use tuples for atomic ets updates IsRetriable = true, - WorkerMRef0 = undefined, - InflightItem = ?INFLIGHT_ITEM(Ref, Request, IsRetriable, WorkerMRef0), + AsyncWorkerMRef0 = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Request, IsRetriable, AsyncWorkerMRef0), %% we must append again to the table to ensure that the %% request will be retried (i.e., it might not have been %% inserted during `call_query' if the resource was down %% and/or if it was a sync request). inflight_append(InflightTID, InflightItem), mark_inflight_as_retriable(InflightTID, Ref), - {Data2, WorkerMRef} = ensure_async_worker_monitored(Data1, Result), - store_async_worker_reference(InflightTID, Ref, WorkerMRef), + {Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result), + store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef), ?tp( buffer_worker_flush_nack, #{ @@ -615,17 +615,17 @@ do_flush( %% must ensure the async worker is being monitored for %% such requests. 
IsUnrecoverableError = is_unrecoverable_error(Result), - WorkerPid = self(), + BufferWorkerPid = self(), case is_async_return(Result) of true when IsUnrecoverableError -> - ack_inflight(InflightTID, Ref, WorkerPid); + ack_inflight(InflightTID, Ref, BufferWorkerPid); true -> ok; false -> - ack_inflight(InflightTID, Ref, WorkerPid) + ack_inflight(InflightTID, Ref, BufferWorkerPid) end, - {Data2, WorkerMRef} = ensure_async_worker_monitored(Data1, Result), - store_async_worker_reference(InflightTID, Ref, WorkerMRef), + {Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result), + store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef), ?tp( buffer_worker_flush_ack, #{ @@ -672,16 +672,16 @@ do_flush(#{queue := Q1} = Data0, #{ %% we set it atomically just below; a limitation of having %% to use tuples for atomic ets updates IsRetriable = true, - WorkerMRef0 = undefined, - InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef0), + AsyncWorkerMRef0 = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef0), %% we must append again to the table to ensure that the %% request will be retried (i.e., it might not have been %% inserted during `call_query' if the resource was down %% and/or if it was a sync request). inflight_append(InflightTID, InflightItem), mark_inflight_as_retriable(InflightTID, Ref), - {Data2, WorkerMRef} = ensure_async_worker_monitored(Data1, Result), - store_async_worker_reference(InflightTID, Ref, WorkerMRef), + {Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result), + store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef), ?tp( buffer_worker_flush_nack, #{ @@ -700,17 +700,17 @@ do_flush(#{queue := Q1} = Data0, #{ %% must ensure the async worker is being monitored for %% such requests. IsUnrecoverableError = is_unrecoverable_error(Result), - WorkerPid = self(), + BufferWorkerPid = self(), case is_async_return(Result) of true when IsUnrecoverableError -> - ack_inflight(InflightTID, Ref, WorkerPid); + ack_inflight(InflightTID, Ref, BufferWorkerPid); true -> ok; false -> - ack_inflight(InflightTID, Ref, WorkerPid) + ack_inflight(InflightTID, Ref, BufferWorkerPid) end, - {Data2, WorkerMRef} = ensure_async_worker_monitored(Data1, Result), - store_async_worker_reference(InflightTID, Ref, WorkerMRef), + {Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result), + store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef), CurrentCount = queue_count(Q1), ?tp( buffer_worker_flush_ack, @@ -966,9 +966,9 @@ set_gauges(_Data = #{id := Id, index := Index, queue := Q, inflight_tid := Infli handle_async_worker_down(Data0, Pid) -> #{async_workers := AsyncWorkers0} = Data0, - {WorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0), + {AsyncWorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0), Data = Data0#{async_workers := AsyncWorkers}, - mark_inflight_items_as_retriable(Data, WorkerMRef), + mark_inflight_items_as_retriable(Data, AsyncWorkerMRef), {keep_state, Data}. -spec call_query(force_sync | async_if_possible, _, _, _, _, _) -> _. 
@@ -1046,8 +1046,8 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, Re min_query => minimize(Query) }, IsRetriable = false, - WorkerMRef = undefined, - InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef), + AsyncWorkerMRef = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef), ok = inflight_append(InflightTID, InflightItem), Result = Mod:on_query_async(Id, Request, {ReplyFun, [ReplyContext]}, ResSt), {async_return, Result} @@ -1082,8 +1082,8 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch ), IsRetriable = false, - WorkerMRef = undefined, - InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), + AsyncWorkerMRef = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef), ok = inflight_append(InflightTID, InflightItem), Result = Mod:on_batch_query_async(Id, Requests, {ReplyFun, [ReplyContext]}, ResSt), {async_return, Result} @@ -1111,7 +1111,7 @@ handle_async_reply1( request_ref := Ref, inflight_tid := InflightTID, resource_id := Id, - buffer_worker := WorkerPid, + buffer_worker := BufferWorkerPid, min_query := ?QUERY(_, _, _, ExpireAt) = _Query } = ReplyContext, Result @@ -1123,7 +1123,7 @@ handle_async_reply1( Now = now_(), case is_expired(ExpireAt, Now) of true -> - IsAcked = ack_inflight(InflightTID, Ref, WorkerPid), + IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid), %% evalutate metrics call here since we're not inside %% buffer worker IsAcked andalso emqx_resource_metrics:late_reply_inc(Id), @@ -1138,7 +1138,7 @@ do_handle_async_reply( query_opts := QueryOpts, resource_id := Id, request_ref := Ref, - buffer_worker := WorkerPid, + buffer_worker := BufferWorkerPid, inflight_tid := InflightTID, min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query }, @@ -1161,10 +1161,12 @@ do_handle_async_reply( nack -> %% Keep retrying. ok = mark_inflight_as_retriable(InflightTID, Ref), - ok = ?MODULE:block(WorkerPid), + ok = ?MODULE:block(BufferWorkerPid), blocked; ack -> - ok = do_async_ack(InflightTID, Ref, Id, PostFn, WorkerPid, DeltaCounters, QueryOpts) + ok = do_async_ack( + InflightTID, Ref, Id, PostFn, BufferWorkerPid, DeltaCounters, QueryOpts + ) end. 
handle_async_batch_reply( @@ -1213,10 +1215,10 @@ handle_async_batch_reply2([], _, _, _) -> %% this usually should never happen unless the async callback is being evaluated concurrently ok; handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> - ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef) = Inflight, + ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _AsyncWorkerMRef) = Inflight, #{ resource_id := Id, - buffer_worker := WorkerPid, + buffer_worker := BufferWorkerPid, inflight_tid := InflightTID, request_ref := Ref, min_batch := Batch @@ -1241,7 +1243,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> case RealNotExpired of [] -> %% all expired, no need to update back the inflight batch - _ = ack_inflight(InflightTID, Ref, WorkerPid), + _ = ack_inflight(InflightTID, Ref, BufferWorkerPid), ok; _ -> %% some queries are not expired, put them back to the inflight batch @@ -1252,7 +1254,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> do_handle_async_batch_reply( #{ - buffer_worker := WorkerPid, + buffer_worker := BufferWorkerPid, resource_id := Id, inflight_tid := InflightTID, request_ref := Ref, @@ -1274,14 +1276,16 @@ do_handle_async_batch_reply( nack -> %% Keep retrying. ok = mark_inflight_as_retriable(InflightTID, Ref), - ok = ?MODULE:block(WorkerPid), + ok = ?MODULE:block(BufferWorkerPid), blocked; ack -> - ok = do_async_ack(InflightTID, Ref, Id, PostFn, WorkerPid, DeltaCounters, QueryOpts) + ok = do_async_ack( + InflightTID, Ref, Id, PostFn, BufferWorkerPid, DeltaCounters, QueryOpts + ) end. -do_async_ack(InflightTID, Ref, Id, PostFn, WorkerPid, DeltaCounters, QueryOpts) -> - IsKnownRef = ack_inflight(InflightTID, Ref, WorkerPid), +do_async_ack(InflightTID, Ref, Id, PostFn, BufferWorkerPid, DeltaCounters, QueryOpts) -> + IsKnownRef = ack_inflight(InflightTID, Ref, BufferWorkerPid), case maps:get(simple_query, QueryOpts, false) of true -> PostFn(), @@ -1397,7 +1401,7 @@ inflight_new(InfltWinSZ) -> inflight_get_first_retriable(InflightTID, Now) -> MatchSpec = ets:fun2ms( - fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _WorkerMRef)) when + fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _AsyncWorkerMRef)) when IsRetriable =:= true -> {Ref, BatchOrQuery} @@ -1442,10 +1446,10 @@ inflight_append(undefined, _InflightItem) -> ok; inflight_append( InflightTID, - ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, WorkerMRef) + ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, AsyncWorkerMRef) ) -> Batch = mark_as_sent(Batch0), - InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), + InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef), IsNew = ets:insert_new(InflightTID, InflightItem), BatchSize = length(Batch), IsNew andalso inc_inflight(InflightTID, BatchSize), @@ -1454,11 +1458,11 @@ inflight_append( inflight_append( InflightTID, ?INFLIGHT_ITEM( - Ref, ?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, WorkerMRef + Ref, ?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, AsyncWorkerMRef ) ) -> Query = mark_as_sent(Query0), - InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef), + InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef), IsNew = ets:insert_new(InflightTID, InflightItem), IsNew andalso inc_inflight(InflightTID, 1), ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), @@ -1481,67 +1485,67 @@ mark_inflight_as_retriable(InflightTID, Ref) -> %% 
Track each worker pid only once. ensure_async_worker_monitored( - Data0 = #{async_workers := AsyncWorkers}, {async_return, {ok, WorkerPid}} = _Result + Data0 = #{async_workers := AsyncWorkers}, {async_return, {ok, AsyncWorkerPid}} = _Result ) when - is_pid(WorkerPid), is_map_key(WorkerPid, AsyncWorkers) + is_pid(AsyncWorkerPid), is_map_key(AsyncWorkerPid, AsyncWorkers) -> - WorkerMRef = maps:get(WorkerPid, AsyncWorkers), - {Data0, WorkerMRef}; + AsyncWorkerMRef = maps:get(AsyncWorkerPid, AsyncWorkers), + {Data0, AsyncWorkerMRef}; ensure_async_worker_monitored( - Data0 = #{async_workers := AsyncWorkers0}, {async_return, {ok, WorkerPid}} + Data0 = #{async_workers := AsyncWorkers0}, {async_return, {ok, AsyncWorkerPid}} ) when - is_pid(WorkerPid) + is_pid(AsyncWorkerPid) -> - WorkerMRef = monitor(process, WorkerPid), - AsyncWorkers = AsyncWorkers0#{WorkerPid => WorkerMRef}, + AsyncWorkerMRef = monitor(process, AsyncWorkerPid), + AsyncWorkers = AsyncWorkers0#{AsyncWorkerPid => AsyncWorkerMRef}, Data = Data0#{async_workers := AsyncWorkers}, - {Data, WorkerMRef}; + {Data, AsyncWorkerMRef}; ensure_async_worker_monitored(Data0, _Result) -> {Data0, undefined}. -spec store_async_worker_reference(undefined | ets:tid(), inflight_key(), undefined | reference()) -> ok. -store_async_worker_reference(undefined = _InflightTID, _Ref, _WorkerMRef) -> +store_async_worker_reference(undefined = _InflightTID, _Ref, _AsyncWorkerMRef) -> ok; store_async_worker_reference(_InflightTID, _Ref, undefined = _WorkerRef) -> ok; -store_async_worker_reference(InflightTID, Ref, WorkerMRef) when - is_reference(WorkerMRef) +store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef) when + is_reference(AsyncWorkerMRef) -> _ = ets:update_element( - InflightTID, Ref, {?WORKER_MREF_IDX, WorkerMRef} + InflightTID, Ref, {?WORKER_MREF_IDX, AsyncWorkerMRef} ), ok. -ack_inflight(undefined, _Ref, _WorkerPid) -> +ack_inflight(undefined, _Ref, _BufferWorkerPid) -> false; -ack_inflight(InflightTID, Ref, WorkerPid) -> +ack_inflight(InflightTID, Ref, BufferWorkerPid) -> {Count, Removed} = case ets:take(InflightTID, Ref) of - [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _WorkerMRef)] -> + [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _AsyncWorkerMRef)] -> {1, true}; - [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] -> + [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _AsyncWorkerMRef)] -> {length(Batch), true}; [] -> {0, false} end, FlushCheck = dec_inflight_remove(InflightTID, Count, Removed), case FlushCheck of - continue -> ok; - flush -> ?MODULE:flush_worker(WorkerPid) + no_flush -> ok; + flush -> ?MODULE:flush_worker(BufferWorkerPid) end, IsKnownRef = (Count > 0), IsKnownRef. -mark_inflight_items_as_retriable(Data, WorkerMRef) -> +mark_inflight_items_as_retriable(Data, AsyncWorkerMRef) -> #{inflight_tid := InflightTID} = Data, IsRetriable = true, MatchSpec = ets:fun2ms( - fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, _IsRetriable, WorkerMRef0)) when - WorkerMRef =:= WorkerMRef0 + fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, _IsRetriable, AsyncWorkerMRef0)) when + AsyncWorkerMRef =:= AsyncWorkerMRef0 -> - ?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef0) + ?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef0) end ), _NumAffected = ets:select_replace(InflightTID, MatchSpec), @@ -1559,9 +1563,9 @@ inc_inflight(InflightTID, Count) -> ok. -spec dec_inflight_remove(undefined | ets:tid(), non_neg_integer(), Removed :: boolean()) -> - continue | flush. 
+ no_flush | flush. dec_inflight_remove(_InflightTID, _Count = 0, _Removed = false) -> - continue; + no_flush; dec_inflight_remove(InflightTID, _Count = 0, _Removed = true) -> NewValue = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}), MaxValue = emqx_utils_ets:lookup_value(InflightTID, ?MAX_SIZE_REF, 0), @@ -1570,7 +1574,7 @@ dec_inflight_remove(InflightTID, _Count = 0, _Removed = true) -> %% make it continue flushing. case NewValue =:= MaxValue - 1 of true -> flush; - false -> continue + false -> no_flush end; dec_inflight_remove(InflightTID, Count, _Removed = true) when Count > 0 -> %% If Count > 0, it must have been removed @@ -1582,7 +1586,7 @@ dec_inflight_remove(InflightTID, Count, _Removed = true) when Count > 0 -> %% make it continue flushing. case NewValue =:= MaxValue - 1 of true -> flush; - false -> continue + false -> no_flush end. dec_inflight_update(_InflightTID, _Count = 0) -> From 9c71f4ecbd13591fe1bec0b5f3238ac734c6db2f Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 19 May 2023 09:22:54 -0300 Subject: [PATCH 3/4] test: fix flaky test --- apps/emqx_bridge_clickhouse/etc/emqx_bridge_clickhouse.conf | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 apps/emqx_bridge_clickhouse/etc/emqx_bridge_clickhouse.conf diff --git a/apps/emqx_bridge_clickhouse/etc/emqx_bridge_clickhouse.conf b/apps/emqx_bridge_clickhouse/etc/emqx_bridge_clickhouse.conf new file mode 100644 index 000000000..e69de29bb From 0559d6f6396630e3ad03afd6471ceac5dfaec7af Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 19 May 2023 16:39:03 -0300 Subject: [PATCH 4/4] refactor(buffer_worker): use static fn for bumping counters --- .../src/emqx_resource_buffer_worker.erl | 49 +++++++++++-------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 47769418b..35761822d 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -936,27 +936,34 @@ ensure_metrics_flush_timer(Data = #{metrics_tref := undefined, metrics_flush_int -spec bump_counters(id(), counters()) -> ok. bump_counters(Id, Counters) -> - maps:foreach( - fun - (dropped_expired, Val) -> - emqx_resource_metrics:dropped_expired_inc(Id, Val); - (dropped_queue_full, Val) -> - emqx_resource_metrics:dropped_queue_full_inc(Id, Val); - (failed, Val) -> - emqx_resource_metrics:failed_inc(Id, Val); - (retried_failed, Val) -> - emqx_resource_metrics:retried_failed_inc(Id, Val); - (success, Val) -> - emqx_resource_metrics:success_inc(Id, Val); - (retried_success, Val) -> - emqx_resource_metrics:retried_success_inc(Id, Val); - (dropped_resource_not_found, Val) -> - emqx_resource_metrics:dropped_resource_not_found_inc(Id, Val); - (dropped_resource_stopped, Val) -> - emqx_resource_metrics:dropped_resource_stopped_inc(Id, Val) - end, - Counters - ). + Iter = maps:iterator(Counters), + do_bump_counters(Iter, Id). + +do_bump_counters(Iter, Id) -> + case maps:next(Iter) of + {Key, Val, NIter} -> + do_bump_counters1(Key, Val, Id), + do_bump_counters(NIter, Id); + none -> + ok + end. 
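%% Editorial note (not part of the patch): a plausible motivation for the
%% rewrite above is that maps:foreach/2 builds a fresh anonymous fun capturing
%% Id on every metrics flush, whereas maps:iterator/1 + maps:next/1 driving the
%% static do_bump_counters1/3 clauses below avoids that closure in this hot
%% path and makes each counter key an explicit clause. Map iteration order is
%% unspecified, but every key is handled independently, so order is irrelevant.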
+ +do_bump_counters1(dropped_expired, Val, Id) -> + emqx_resource_metrics:dropped_expired_inc(Id, Val); +do_bump_counters1(dropped_queue_full, Val, Id) -> + emqx_resource_metrics:dropped_queue_full_inc(Id, Val); +do_bump_counters1(failed, Val, Id) -> + emqx_resource_metrics:failed_inc(Id, Val); +do_bump_counters1(retried_failed, Val, Id) -> + emqx_resource_metrics:retried_failed_inc(Id, Val); +do_bump_counters1(success, Val, Id) -> + emqx_resource_metrics:success_inc(Id, Val); +do_bump_counters1(retried_success, Val, Id) -> + emqx_resource_metrics:retried_success_inc(Id, Val); +do_bump_counters1(dropped_resource_not_found, Val, Id) -> + emqx_resource_metrics:dropped_resource_not_found_inc(Id, Val); +do_bump_counters1(dropped_resource_stopped, Val, Id) -> + emqx_resource_metrics:dropped_resource_stopped_inc(Id, Val). -spec set_gauges(data()) -> ok. set_gauges(_Data = #{id := Id, index := Index, queue := Q, inflight_tid := InflightTID}) ->