From dcf70e0e68348d24cecdbd30b66637c206c32d14 Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Thu, 16 Feb 2023 14:10:42 +0100 Subject: [PATCH 1/3] refactor(emqx_resource): add more trace points for flushing --- .../src/emqx_resource_buffer_worker.erl | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index b25725c41..bb4eee57d 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -468,7 +468,10 @@ flush(Data0) -> queue := Q0 } = Data0, Data1 = cancel_flush_timer(Data0), - case {queue_count(Q0), is_inflight_full(InflightTID)} of + CurrentCount = queue_count(Q0), + IsFull = is_inflight_full(InflightTID), + ?tp(buffer_worker_flush, #{queue_count => CurrentCount, is_full => IsFull}), + case {CurrentCount, IsFull} of {0, _} -> {keep_state, Data1}; {_, true} -> @@ -595,8 +598,12 @@ do_flush( result => Result } ), - case queue_count(Q1) > 0 of + CurrentCount = queue_count(Q1), + case CurrentCount > 0 of true -> + ?tp(buffer_worker_flush_ack_reflush, #{ + batch_or_query => Request, result => Result, queue_count => CurrentCount + }), flush_worker(self()); false -> ok @@ -666,19 +673,26 @@ do_flush(Data0, #{ {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), store_async_worker_reference(InflightTID, Ref, WorkerMRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), + CurrentCount = queue_count(Q1), ?tp( buffer_worker_flush_ack, #{ batch_or_query => Batch, - result => Result + result => Result, + queue_count => CurrentCount } ), - CurrentCount = queue_count(Q1), Data2 = case {CurrentCount > 0, CurrentCount >= BatchSize} of {false, _} -> Data1; {true, true} -> + ?tp(buffer_worker_flush_ack_reflush, #{ + batch_or_query => Batch, + result => Result, + queue_count => CurrentCount, + batch_size => BatchSize + }), flush_worker(self()), Data1; {true, false} -> From 2442a4dea7d04eba36ec75f112dd419427fae6e6 Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Thu, 16 Feb 2023 14:14:29 +0100 Subject: [PATCH 2/3] test(emqx_resource): add regression test for recursive flushing --- .../test/emqx_resource_SUITE.erl | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index a042bfb67..984b3b04a 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -2291,6 +2291,67 @@ t_expiration_retry_batch_multiple_times(_Config) -> ), ok. +t_recursive_flush(_Config) -> + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 1, + batch_time => 10_000, + worker_pool_size => 1 + } + ), + do_t_recursive_flush(). + +t_recursive_flush_batch(_Config) -> + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 2, + batch_time => 10_000, + worker_pool_size => 1 + } + ), + do_t_recursive_flush(). + +do_t_recursive_flush() -> + ?check_trace( + begin + Timeout = 1_000, + Pid = spawn_link(fun S() -> + emqx_resource:query(?ID, {inc_counter, 1}), + S() + end), + %% we want two reflushes to happen before we analyze the + %% trace, so that we get a single full interaction + {ok, _} = snabbkaffe:block_until( + ?match_n_events(2, #{?snk_kind := buffer_worker_flush_ack_reflush}), Timeout + ), + unlink(Pid), + exit(Pid, kill), + ok + end, + fun(Trace) -> + %% check that a recursive flush leads to a new call to flush/1 + Pairs = ?find_pairs( + #{?snk_kind := buffer_worker_flush_ack_reflush}, + #{?snk_kind := buffer_worker_flush}, + Trace + ), + ?assert(lists:any(fun(E) -> E end, [true || {pair, _, _} <- Pairs])) + end + ), + ok. + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ From 056bc71af26d486b220cb5780151e1d8ecef3f76 Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Thu, 16 Feb 2023 15:05:38 +0100 Subject: [PATCH 3/3] chore: bump VSN version --- apps/emqx_resource/src/emqx_resource.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index 4618e94a6..cb26c7f09 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.7"}, + {vsn, "0.1.8"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [