refactor(resource): save inflight size into the ETS table

This commit is contained in:
Shawn 2022-09-16 16:52:08 +08:00
parent 7242ffd713
commit 8307f04c2e
2 changed files with 23 additions and 34 deletions

View File

@ -132,13 +132,13 @@ init({Id, Index, Opts}) ->
undefined
end,
emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', queue_count(Queue)),
ok = inflight_new(Name),
InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
ok = inflight_new(Name, InfltWinSZ),
HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
St = #{
id => Id,
index => Index,
name => Name,
async_inflight_window => maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
enable_batch => maps:get(enable_batch, Opts, false),
batch_size => BatchSize,
batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
@ -288,8 +288,7 @@ query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left
end;
query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id} = St) ->
QueryOpts = #{
inflight_name => maps:get(name, St),
inflight_window => maps:get(async_inflight_window, St)
inflight_name => maps:get(name, St)
},
Result = call_query(configured, Id, ?QUERY(From, Request), QueryOpts),
case reply_caller(Id, ?REPLY(From, Request, Result)) of
@ -312,8 +311,7 @@ flush(
) ->
Batch = lists:reverse(Batch0),
QueryOpts = #{
inflight_name => maps:get(name, St),
inflight_window => maps:get(async_inflight_window, St)
inflight_name => maps:get(name, St)
},
emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching', -length(Batch)),
Result = call_query(configured, Id, Batch, QueryOpts),
@ -464,12 +462,10 @@ apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) -
apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
Name = maps:get(inflight_name, QueryOpts, undefined),
WinSize = maps:get(inflight_window, QueryOpts, undefined),
?APPLY_RESOURCE(
call_query_async,
case inflight_is_full(Name, WinSize) of
case inflight_is_full(Name) of
true ->
?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}),
{async_return, inflight_full};
false ->
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight'),
@ -489,12 +485,10 @@ apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) ->
apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
Name = maps:get(inflight_name, QueryOpts, undefined),
WinSize = maps:get(inflight_window, QueryOpts, undefined),
?APPLY_RESOURCE(
call_batch_query_async,
case inflight_is_full(Name, WinSize) of
case inflight_is_full(Name) of
true ->
?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}),
{async_return, inflight_full};
false ->
BatchLen = length(Batch),
@ -540,27 +534,32 @@ batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) ->
end.
%%==============================================================================
%% the inflight queue for async query
inflight_new(Name) ->
-define(SIZE_REF, -1).
inflight_new(Name, InfltWinSZ) ->
_ = ets:new(Name, [named_table, ordered_set, public, {write_concurrency, true}]),
inflight_append(Name, ?SIZE_REF, {size, InfltWinSZ}),
ok.
inflight_get_first(Name) ->
case ets:first(Name) of
case ets:next(Name, ?SIZE_REF) of
'$end_of_table' ->
empty;
Ref ->
case ets:lookup(Name, Ref) of
[Object] -> Object;
[] -> inflight_get_first(Name)
[Object] ->
Object;
[] ->
%% it might have been dropped
inflight_get_first(Name)
end
end.
inflight_is_full(undefined, _) ->
inflight_is_full(undefined) ->
false;
inflight_is_full(Name, MaxSize) ->
inflight_is_full(Name) ->
[{_, {size, MaxSize}}] = ets:lookup(Name, ?SIZE_REF),
case ets:info(Name, size) of
Size when Size >= MaxSize -> true;
Size when Size > MaxSize -> true;
_ -> false
end.

View File

@ -242,7 +242,7 @@ t_query_counter_async_query(_) ->
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, register => true},
#{query_mode => async}
#{query_mode => async, enable_batch => false}
),
?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
?check_trace(
@ -284,7 +284,7 @@ t_query_counter_async_callback(_) ->
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, register => true},
#{query_mode => async, async_inflight_window => 1000000}
#{query_mode => async, enable_batch => false, async_inflight_window => 1000000}
),
?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
?check_trace(
@ -338,6 +338,7 @@ t_query_counter_async_inflight(_) ->
#{name => test_resource, register => true},
#{
query_mode => async,
enable_batch => false,
async_inflight_window => WindowSize,
worker_pool_size => 1,
resume_interval => 300
@ -358,7 +359,7 @@ t_query_counter_async_inflight(_) ->
end
),
%% this will block the resource_worker
%% this will block the resource_worker as the inflight windown is full now
ok = emqx_resource:query(?ID, {inc_counter, 1}),
?assertMatch(0, ets:info(Tab0, size)),
%% sleep to make the resource_worker resume some times
@ -683,17 +684,6 @@ inc_counter_in_parallel(N, Opts) ->
|| Pid <- Pids
].
% verify_inflight_full(WindowSize) ->
% ?check_trace(
% ?TRACE_OPTS,
% emqx_resource:query(?ID, {inc_counter, 1}),
% fun(Return, Trace) ->
% QueryTrace = ?of_kind(inflight_full, Trace),
% ?assertMatch([#{wind_size := WindowSize} | _], QueryTrace),
% ?assertMatch(ok, Return)
% end
% ).
bin_config() ->
<<"\"name\": \"test_resource\"">>.