From 8307f04c2efe145f48fc6df9ac7734a4a5bc73d5 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 16 Sep 2022 16:52:08 +0800 Subject: [PATCH] refactor(resource): save inflight size into the ETS table --- .../src/emqx_resource_worker.erl | 39 +++++++++---------- .../test/emqx_resource_SUITE.erl | 18 ++------- 2 files changed, 23 insertions(+), 34 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 1f6c4f599..25032c29c 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -132,13 +132,13 @@ init({Id, Index, Opts}) -> undefined end, emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', queue_count(Queue)), - ok = inflight_new(Name), + InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT), + ok = inflight_new(Name, InfltWinSZ), HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), St = #{ id => Id, index => Index, name => Name, - async_inflight_window => maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT), enable_batch => maps:get(enable_batch, Opts, false), batch_size => BatchSize, batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), @@ -288,8 +288,7 @@ query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left end; query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id} = St) -> QueryOpts = #{ - inflight_name => maps:get(name, St), - inflight_window => maps:get(async_inflight_window, St) + inflight_name => maps:get(name, St) }, Result = call_query(configured, Id, ?QUERY(From, Request), QueryOpts), case reply_caller(Id, ?REPLY(From, Request, Result)) of @@ -312,8 +311,7 @@ flush( ) -> Batch = lists:reverse(Batch0), QueryOpts = #{ - inflight_name => maps:get(name, St), - inflight_window => maps:get(async_inflight_window, St) + inflight_name => maps:get(name, St) }, emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching', -length(Batch)), Result = call_query(configured, Id, Batch, QueryOpts), @@ -464,12 +462,10 @@ apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) - apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), Name = maps:get(inflight_name, QueryOpts, undefined), - WinSize = maps:get(inflight_window, QueryOpts, undefined), ?APPLY_RESOURCE( call_query_async, - case inflight_is_full(Name, WinSize) of + case inflight_is_full(Name) of true -> - ?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}), {async_return, inflight_full}; false -> ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight'), @@ -489,12 +485,10 @@ apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) -> apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Name = maps:get(inflight_name, QueryOpts, undefined), - WinSize = maps:get(inflight_window, QueryOpts, undefined), ?APPLY_RESOURCE( call_batch_query_async, - case inflight_is_full(Name, WinSize) of + case inflight_is_full(Name) of true -> - ?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}), {async_return, inflight_full}; false -> BatchLen = length(Batch), @@ -540,27 +534,32 @@ batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) -> end. %%============================================================================== %% the inflight queue for async query - -inflight_new(Name) -> +-define(SIZE_REF, -1). +inflight_new(Name, InfltWinSZ) -> _ = ets:new(Name, [named_table, ordered_set, public, {write_concurrency, true}]), + inflight_append(Name, ?SIZE_REF, {size, InfltWinSZ}), ok. inflight_get_first(Name) -> - case ets:first(Name) of + case ets:next(Name, ?SIZE_REF) of '$end_of_table' -> empty; Ref -> case ets:lookup(Name, Ref) of - [Object] -> Object; - [] -> inflight_get_first(Name) + [Object] -> + Object; + [] -> + %% it might have been dropped + inflight_get_first(Name) end end. -inflight_is_full(undefined, _) -> +inflight_is_full(undefined) -> false; -inflight_is_full(Name, MaxSize) -> +inflight_is_full(Name) -> + [{_, {size, MaxSize}}] = ets:lookup(Name, ?SIZE_REF), case ets:info(Name, size) of - Size when Size >= MaxSize -> true; + Size when Size > MaxSize -> true; _ -> false end. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 3f42850ad..a9d274168 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -242,7 +242,7 @@ t_query_counter_async_query(_) -> ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => test_resource, register => true}, - #{query_mode => async} + #{query_mode => async, enable_batch => false} ), ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)), ?check_trace( @@ -284,7 +284,7 @@ t_query_counter_async_callback(_) -> ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => test_resource, register => true}, - #{query_mode => async, async_inflight_window => 1000000} + #{query_mode => async, enable_batch => false, async_inflight_window => 1000000} ), ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)), ?check_trace( @@ -338,6 +338,7 @@ t_query_counter_async_inflight(_) -> #{name => test_resource, register => true}, #{ query_mode => async, + enable_batch => false, async_inflight_window => WindowSize, worker_pool_size => 1, resume_interval => 300 @@ -358,7 +359,7 @@ t_query_counter_async_inflight(_) -> end ), - %% this will block the resource_worker + %% this will block the resource_worker as the inflight windown is full now ok = emqx_resource:query(?ID, {inc_counter, 1}), ?assertMatch(0, ets:info(Tab0, size)), %% sleep to make the resource_worker resume some times @@ -683,17 +684,6 @@ inc_counter_in_parallel(N, Opts) -> || Pid <- Pids ]. -% verify_inflight_full(WindowSize) -> -% ?check_trace( -% ?TRACE_OPTS, -% emqx_resource:query(?ID, {inc_counter, 1}), -% fun(Return, Trace) -> -% QueryTrace = ?of_kind(inflight_full, Trace), -% ?assertMatch([#{wind_size := WindowSize} | _], QueryTrace), -% ?assertMatch(ok, Return) -% end -% ). - bin_config() -> <<"\"name\": \"test_resource\"">>.