Merge pull request #8992 from terry-xiaoyu/fast_resume_from_inflight_full

Fast resume from inflight full
This commit is contained in:
Xinyu Liu 2022-09-18 17:49:04 +08:00 committed by GitHub
commit cc327629c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 135 additions and 102 deletions

View File

@ -570,9 +570,9 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
<<"success">> := 3, <<"success">> := 3,
<<"failed">> := 0, <<"failed">> := 0,
<<"queuing">> := 0, <<"queuing">> := 0,
<<"retried">> := R <<"retried">> := _
} }
} when R > 0, },
jsx:decode(BridgeStr2) jsx:decode(BridgeStr2)
), ),
%% also verify the 2 messages have been sent to the remote broker %% also verify the 2 messages have been sent to the remote broker

View File

@ -131,6 +131,8 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
[ [
'matched', 'matched',
'retried', 'retried',
'retried.success',
'retried.failed',
'success', 'success',
'failed', 'failed',
'dropped', 'dropped',

View File

@ -56,9 +56,12 @@
-define(Q_ITEM(REQUEST), {q_item, REQUEST}). -define(Q_ITEM(REQUEST), {q_item, REQUEST}).
-define(QUERY(FROM, REQUEST), {query, FROM, REQUEST}). -define(QUERY(FROM, REQUEST, SENT), {query, FROM, REQUEST, SENT}).
-define(REPLY(FROM, REQUEST, RESULT), {reply, FROM, REQUEST, RESULT}). -define(REPLY(FROM, REQUEST, SENT, RESULT), {reply, FROM, REQUEST, SENT, RESULT}).
-define(EXPAND(RESULT, BATCH), [?REPLY(FROM, REQUEST, RESULT) || ?QUERY(FROM, REQUEST) <- BATCH]). -define(EXPAND(RESULT, BATCH), [
?REPLY(FROM, REQUEST, SENT, RESULT)
|| ?QUERY(FROM, REQUEST, SENT) <- BATCH
]).
-type id() :: binary(). -type id() :: binary().
-type query() :: {query, from(), request()}. -type query() :: {query, from(), request()}.
@ -89,16 +92,16 @@ async_query(Id, Request, Opts) ->
%% simple query the resource without batching and queuing messages. %% simple query the resource without batching and queuing messages.
-spec simple_sync_query(id(), request()) -> Result :: term(). -spec simple_sync_query(id(), request()) -> Result :: term().
simple_sync_query(Id, Request) -> simple_sync_query(Id, Request) ->
Result = call_query(sync, Id, ?QUERY(self(), Request), #{}), Result = call_query(sync, Id, ?QUERY(self(), Request, false), #{}),
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
_ = handle_query_result(Id, Result, false), _ = handle_query_result(Id, Result, false, false),
Result. Result.
-spec simple_async_query(id(), request(), reply_fun()) -> Result :: term(). -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term().
simple_async_query(Id, Request, ReplyFun) -> simple_async_query(Id, Request, ReplyFun) ->
Result = call_query(async, Id, ?QUERY(ReplyFun, Request), #{}), Result = call_query(async, Id, ?QUERY(ReplyFun, Request, false), #{}),
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
_ = handle_query_result(Id, Result, false), _ = handle_query_result(Id, Result, false, false),
Result. Result.
-spec block(pid() | atom()) -> ok. -spec block(pid() | atom()) -> ok.
@ -132,13 +135,13 @@ init({Id, Index, Opts}) ->
undefined undefined
end, end,
emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', queue_count(Queue)), emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', queue_count(Queue)),
ok = inflight_new(Name), InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
ok = inflight_new(Name, InfltWinSZ),
HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
St = #{ St = #{
id => Id, id => Id,
index => Index, index => Index,
name => Name, name => Name,
async_inflight_window => maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
enable_batch => maps:get(enable_batch, Opts, false), enable_batch => maps:get(enable_batch, Opts, false),
batch_size => BatchSize, batch_size => BatchSize,
batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
@ -156,7 +159,7 @@ running(cast, resume, _St) ->
keep_state_and_data; keep_state_and_data;
running(cast, block, St) -> running(cast, block, St) ->
{next_state, blocked, St}; {next_state, blocked, St};
running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{id := Id, queue := Q} = St) when running(cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, queue := Q} = St) when
is_list(Batch) is_list(Batch)
-> ->
Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]), Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]),
@ -178,7 +181,7 @@ blocked(enter, _, #{resume_interval := ResumeT} = _St) ->
{keep_state_and_data, {state_timeout, ResumeT, resume}}; {keep_state_and_data, {state_timeout, ResumeT, resume}};
blocked(cast, block, _St) -> blocked(cast, block, _St) ->
keep_state_and_data; keep_state_and_data;
blocked(cast, {block, [?QUERY(_, _) | _] = Batch}, #{id := Id, queue := Q} = St) when blocked(cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, queue := Q} = St) when
is_list(Batch) is_list(Batch)
-> ->
Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]), Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]),
@ -189,13 +192,15 @@ blocked(state_timeout, resume, St) ->
do_resume(St); do_resume(St);
blocked({call, From}, {query, Request, _Opts}, #{id := Id, queue := Q} = St) -> blocked({call, From}, {query, Request, _Opts}, #{id := Id, queue := Q} = St) ->
Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
_ = reply_caller(Id, ?REPLY(From, Request, Error)), _ = reply_caller(Id, ?REPLY(From, Request, false, Error)),
{keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(From, Request))])}}; {keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(From, Request, false))])}};
blocked(cast, {query, Request, Opts}, #{id := Id, queue := Q} = St) -> blocked(cast, {query, Request, Opts}, #{id := Id, queue := Q} = St) ->
ReplayFun = maps:get(async_reply_fun, Opts, undefined), ReplayFun = maps:get(async_reply_fun, Opts, undefined),
Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
_ = reply_caller(Id, ?REPLY(ReplayFun, Request, Error)), _ = reply_caller(Id, ?REPLY(ReplayFun, Request, false, Error)),
{keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(ReplayFun, Request))])}}. {keep_state, St#{
queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(ReplayFun, Request, false))])
}}.
terminate(_Reason, #{id := Id, index := Index}) -> terminate(_Reason, #{id := Id, index := Index}) ->
gproc_pool:disconnect_worker(Id, {Id, Index}). gproc_pool:disconnect_worker(Id, {Id, Index}).
@ -250,10 +255,11 @@ retry_first_from_queue(Q, Id, St) ->
retry_first_sync(Id, FirstQuery, undefined, undefined, Q, St) retry_first_sync(Id, FirstQuery, undefined, undefined, Q, St)
end. end.
retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = St0) -> retry_first_sync(
emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'), Id, ?QUERY(_, _, HasSent) = Query, Name, Ref, Q, #{resume_interval := ResumeT} = St0
Result = call_query(sync, Id, FirstQuery, #{}), ) ->
case handle_query_result(Id, Result, false) of Result = call_query(sync, Id, Query, #{}),
case handle_query_result(Id, Result, HasSent, false) of
%% Send failed because resource down %% Send failed because resource down
true -> true ->
{keep_state, St0, {state_timeout, ResumeT, resume}}; {keep_state, St0, {state_timeout, ResumeT, resume}};
@ -279,7 +285,7 @@ drop_head(Id, Q) ->
Q1. Q1.
query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) -> query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) ->
Acc1 = [?QUERY(From, Request) | Acc], Acc1 = [?QUERY(From, Request, false) | Acc],
emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching'),
St = St0#{acc := Acc1, acc_left := Left - 1}, St = St0#{acc := Acc1, acc_left := Left - 1},
case Left =< 1 of case Left =< 1 of
@ -288,13 +294,12 @@ query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left
end; end;
query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id} = St) -> query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id} = St) ->
QueryOpts = #{ QueryOpts = #{
inflight_name => maps:get(name, St), inflight_name => maps:get(name, St)
inflight_window => maps:get(async_inflight_window, St)
}, },
Result = call_query(configured, Id, ?QUERY(From, Request), QueryOpts), Result = call_query(configured, Id, ?QUERY(From, Request, false), QueryOpts),
case reply_caller(Id, ?REPLY(From, Request, Result)) of case reply_caller(Id, ?REPLY(From, Request, false, Result)) of
true -> true ->
Query = ?QUERY(From, Request), Query = ?QUERY(From, Request, 1),
{next_state, blocked, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(Query)])}}; {next_state, blocked, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(Query)])}};
false -> false ->
{keep_state, St} {keep_state, St}
@ -312,8 +317,7 @@ flush(
) -> ) ->
Batch = lists:reverse(Batch0), Batch = lists:reverse(Batch0),
QueryOpts = #{ QueryOpts = #{
inflight_name => maps:get(name, St), inflight_name => maps:get(name, St)
inflight_window => maps:get(async_inflight_window, St)
}, },
emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching', -length(Batch)), emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching', -length(Batch)),
Result = call_query(configured, Id, Batch, QueryOpts), Result = call_query(configured, Id, Batch, QueryOpts),
@ -363,63 +367,65 @@ batch_reply_caller(Id, BatchResult, Batch) ->
reply_caller(Id, Reply) -> reply_caller(Id, Reply) ->
reply_caller(Id, Reply, false). reply_caller(Id, Reply, false).
reply_caller(Id, ?REPLY(undefined, _, Result), BlockWorker) -> reply_caller(Id, ?REPLY(undefined, _, HasSent, Result), BlockWorker) ->
handle_query_result(Id, Result, BlockWorker); handle_query_result(Id, Result, HasSent, BlockWorker);
reply_caller(Id, ?REPLY({ReplyFun, Args}, _, Result), BlockWorker) when is_function(ReplyFun) -> reply_caller(Id, ?REPLY({ReplyFun, Args}, _, HasSent, Result), BlockWorker) when
is_function(ReplyFun)
->
_ = _ =
case Result of case Result of
{async_return, _} -> no_reply_for_now; {async_return, _} -> no_reply_for_now;
_ -> apply(ReplyFun, Args ++ [Result]) _ -> apply(ReplyFun, Args ++ [Result])
end, end,
handle_query_result(Id, Result, BlockWorker); handle_query_result(Id, Result, HasSent, BlockWorker);
reply_caller(Id, ?REPLY(From, _, Result), BlockWorker) -> reply_caller(Id, ?REPLY(From, _, HasSent, Result), BlockWorker) ->
gen_statem:reply(From, Result), gen_statem:reply(From, Result),
handle_query_result(Id, Result, BlockWorker). handle_query_result(Id, Result, HasSent, BlockWorker).
handle_query_result(Id, ?RESOURCE_ERROR_M(exception, Msg), BlockWorker) -> handle_query_result(Id, ?RESOURCE_ERROR_M(exception, Msg), HasSent, BlockWorker) ->
?SLOG(error, #{msg => resource_exception, info => Msg}), ?SLOG(error, #{msg => resource_exception, info => Msg}),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'), inc_sent_failed(Id, HasSent),
BlockWorker; BlockWorker;
handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _) when handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasSent, _) when
NotWorking == not_connected; NotWorking == blocked NotWorking == not_connected; NotWorking == blocked
-> ->
true; true;
handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), BlockWorker) -> handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasSent, BlockWorker) ->
?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}), ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_not_found'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_not_found'),
BlockWorker; BlockWorker;
handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), BlockWorker) -> handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasSent, BlockWorker) ->
?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}), ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_stopped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_stopped'),
BlockWorker; BlockWorker;
handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) -> handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), _HasSent, BlockWorker) ->
?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}), ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'),
BlockWorker; BlockWorker;
handle_query_result(Id, {error, {recoverable_error, Reason}}, _BlockWorker) -> handle_query_result(Id, {error, {recoverable_error, Reason}}, _HasSent, _BlockWorker) ->
%% the message will be queued in replayq or inflight window, %% the message will be queued in replayq or inflight window,
%% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not %% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not
%% sent this message. %% sent this message.
?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}), ?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}),
true; true;
handle_query_result(Id, {error, Reason}, BlockWorker) -> handle_query_result(Id, {error, Reason}, HasSent, BlockWorker) ->
?SLOG(error, #{id => Id, msg => send_error, reason => Reason}), ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'), inc_sent_failed(Id, HasSent),
BlockWorker; BlockWorker;
handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) -> handle_query_result(_Id, {async_return, inflight_full}, _HasSent, _BlockWorker) ->
true; true;
handle_query_result(Id, {async_return, {error, Msg}}, BlockWorker) -> handle_query_result(Id, {async_return, {error, Msg}}, HasSent, BlockWorker) ->
?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}), ?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'), inc_sent_failed(Id, HasSent),
BlockWorker; BlockWorker;
handle_query_result(_Id, {async_return, ok}, BlockWorker) -> handle_query_result(_Id, {async_return, ok}, _HasSent, BlockWorker) ->
BlockWorker; BlockWorker;
handle_query_result(Id, Result, BlockWorker) -> handle_query_result(Id, Result, HasSent, BlockWorker) ->
assert_ok_result(Result), assert_ok_result(Result),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'success'), inc_sent_success(Id, HasSent),
BlockWorker. BlockWorker.
call_query(QM0, Id, Query, QueryOpts) -> call_query(QM0, Id, Query, QueryOpts) ->
@ -458,18 +464,16 @@ call_query(QM0, Id, Query, QueryOpts) ->
end end
). ).
apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) -> apply_query_fun(sync, Mod, Id, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) ->
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}), ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}),
?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request); ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request);
apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> apply_query_fun(async, Mod, Id, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
Name = maps:get(inflight_name, QueryOpts, undefined), Name = maps:get(inflight_name, QueryOpts, undefined),
WinSize = maps:get(inflight_window, QueryOpts, undefined),
?APPLY_RESOURCE( ?APPLY_RESOURCE(
call_query_async, call_query_async,
case inflight_is_full(Name, WinSize) of case inflight_is_full(Name) of
true -> true ->
?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}),
{async_return, inflight_full}; {async_return, inflight_full};
false -> false ->
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight'), ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight'),
@ -482,19 +486,17 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
end, end,
Request Request
); );
apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) -> apply_query_fun(sync, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) ->
?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
Requests = [Request || ?QUERY(_From, Request) <- Batch], Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch); ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch);
apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) ->
?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
Name = maps:get(inflight_name, QueryOpts, undefined), Name = maps:get(inflight_name, QueryOpts, undefined),
WinSize = maps:get(inflight_window, QueryOpts, undefined),
?APPLY_RESOURCE( ?APPLY_RESOURCE(
call_batch_query_async, call_batch_query_async,
case inflight_is_full(Name, WinSize) of case inflight_is_full(Name) of
true -> true ->
?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}),
{async_return, inflight_full}; {async_return, inflight_full};
false -> false ->
BatchLen = length(Batch), BatchLen = length(Batch),
@ -502,7 +504,7 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
ReplyFun = fun ?MODULE:batch_reply_after_query/6, ReplyFun = fun ?MODULE:batch_reply_after_query/6,
Ref = make_message_ref(), Ref = make_message_ref(),
Args = {ReplyFun, [self(), Id, Name, Ref, Batch]}, Args = {ReplyFun, [self(), Id, Name, Ref, Batch]},
Requests = [Request || ?QUERY(_From, Request) <- Batch], Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
ok = inflight_append(Name, Ref, Batch), ok = inflight_append(Name, Ref, Batch),
Result = Mod:on_batch_query_async(Id, Requests, Args, ResSt), Result = Mod:on_batch_query_async(Id, Requests, Args, ResSt),
{async_return, Result} {async_return, Result}
@ -510,18 +512,18 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
Batch Batch
). ).
reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) -> reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request, HasSent), Result) ->
%% NOTE: 'inflight' is message count that sent async but no ACK received, %% NOTE: 'inflight' is message count that sent async but no ACK received,
%% NOT the message number ququed in the inflight window. %% NOT the message number ququed in the inflight window.
emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', -1), emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', -1),
case reply_caller(Id, ?REPLY(From, Request, Result)) of case reply_caller(Id, ?REPLY(From, Request, HasSent, Result)) of
true -> true ->
%% we marked these messages are 'queuing' although they are actually %% we marked these messages are 'queuing' although they are actually
%% keeped in inflight window, not replayq %% keeped in inflight window, not replayq
emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'),
?MODULE:block(Pid); ?MODULE:block(Pid);
false -> false ->
inflight_drop(Name, Ref) drop_inflight_and_resume(Pid, Name, Ref)
end. end.
batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) -> batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) ->
@ -535,39 +537,60 @@ batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) ->
%% keeped in inflight window, not replayq %% keeped in inflight window, not replayq
emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', BatchLen), emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', BatchLen),
?MODULE:block(Pid); ?MODULE:block(Pid);
false ->
drop_inflight_and_resume(Pid, Name, Ref)
end.
drop_inflight_and_resume(Pid, Name, Ref) ->
case inflight_is_full(Name) of
true ->
inflight_drop(Name, Ref),
?MODULE:resume(Pid);
false -> false ->
inflight_drop(Name, Ref) inflight_drop(Name, Ref)
end. end.
%%============================================================================== %%==============================================================================
%% the inflight queue for async query %% the inflight queue for async query
-define(SIZE_REF, -1).
inflight_new(Name) -> inflight_new(Name, InfltWinSZ) ->
_ = ets:new(Name, [named_table, ordered_set, public, {write_concurrency, true}]), _ = ets:new(Name, [named_table, ordered_set, public, {write_concurrency, true}]),
inflight_append(Name, ?SIZE_REF, {size, InfltWinSZ}),
ok. ok.
inflight_get_first(Name) -> inflight_get_first(Name) ->
case ets:first(Name) of case ets:next(Name, ?SIZE_REF) of
'$end_of_table' -> '$end_of_table' ->
empty; empty;
Ref -> Ref ->
case ets:lookup(Name, Ref) of case ets:lookup(Name, Ref) of
[Object] -> Object; [Object] ->
[] -> inflight_get_first(Name) Object;
[] ->
%% it might have been dropped
inflight_get_first(Name)
end end
end. end.
inflight_is_full(undefined, _) -> inflight_is_full(undefined) ->
false; false;
inflight_is_full(Name, MaxSize) -> inflight_is_full(Name) ->
[{_, {size, MaxSize}}] = ets:lookup(Name, ?SIZE_REF),
case ets:info(Name, size) of case ets:info(Name, size) of
Size when Size >= MaxSize -> true; Size when Size > MaxSize -> true;
_ -> false _ -> false
end. end.
inflight_append(undefined, _Ref, _Query) -> inflight_append(undefined, _Ref, _Query) ->
ok; ok;
inflight_append(Name, Ref, Query) -> inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch) ->
ets:insert(Name, {Ref, Query}), ets:insert(Name, {Ref, [?QUERY(From, Req, true) || ?QUERY(From, Req, _) <- Batch]}),
ok;
inflight_append(Name, Ref, ?QUERY(From, Req, _)) ->
ets:insert(Name, {Ref, ?QUERY(From, Req, true)}),
ok;
inflight_append(Name, Ref, Data) ->
ets:insert(Name, {Ref, Data}),
ok. ok.
inflight_drop(undefined, _) -> inflight_drop(undefined, _) ->
@ -577,6 +600,21 @@ inflight_drop(Name, Ref) ->
ok. ok.
%%============================================================================== %%==============================================================================
inc_sent_failed(Id, true) ->
emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried.failed');
inc_sent_failed(Id, _HasSent) ->
emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed').
inc_sent_success(Id, true) ->
emqx_metrics_worker:inc(?RES_METRICS, Id, 'success'),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried.success');
inc_sent_success(Id, _HasSent) ->
emqx_metrics_worker:inc(?RES_METRICS, Id, 'success').
call_mode(sync, _) -> sync; call_mode(sync, _) -> sync;
call_mode(async, always_sync) -> sync; call_mode(async, always_sync) -> sync;
call_mode(async, async_if_possible) -> async. call_mode(async, async_if_possible) -> async.

View File

@ -170,9 +170,11 @@ counter_loop(#{counter := Num, status := Status} = State) ->
ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]), ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]),
State#{status => running}; State#{status => running};
{inc, N, ReplyFun} when Status == running -> {inc, N, ReplyFun} when Status == running ->
%ct:pal("async counter recv: ~p", [{inc, N}]),
apply_reply(ReplyFun, ok), apply_reply(ReplyFun, ok),
State#{counter => Num + N}; State#{counter => Num + N};
{{FromPid, ReqRef}, {inc, N}} when Status == running -> {{FromPid, ReqRef}, {inc, N}} when Status == running ->
%ct:pal("sync counter recv: ~p", [{inc, N}]),
FromPid ! {ReqRef, ok}, FromPid ! {ReqRef, ok},
State#{counter => Num + N}; State#{counter => Num + N};
{{FromPid, ReqRef}, {inc, _N}} when Status == blocked -> {{FromPid, ReqRef}, {inc, _N}} when Status == blocked ->

View File

@ -211,7 +211,7 @@ t_batch_query_counter(_) ->
?DEFAULT_RESOURCE_GROUP, ?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE, ?TEST_RESOURCE,
#{name => test_resource, register => true}, #{name => test_resource, register => true},
#{enable_batch => true} #{enable_batch => true, query_mode => sync}
), ),
?check_trace( ?check_trace(
@ -220,7 +220,7 @@ t_batch_query_counter(_) ->
fun(Result, Trace) -> fun(Result, Trace) ->
?assertMatch({ok, 0}, Result), ?assertMatch({ok, 0}, Result),
QueryTrace = ?of_kind(call_batch_query, Trace), QueryTrace = ?of_kind(call_batch_query, Trace),
?assertMatch([#{batch := [{query, _, get_counter}]}], QueryTrace) ?assertMatch([#{batch := [{query, _, get_counter, _}]}], QueryTrace)
end end
), ),
@ -242,7 +242,7 @@ t_query_counter_async_query(_) ->
?DEFAULT_RESOURCE_GROUP, ?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE, ?TEST_RESOURCE,
#{name => test_resource, register => true}, #{name => test_resource, register => true},
#{query_mode => async} #{query_mode => async, enable_batch => false}
), ),
?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)), ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
?check_trace( ?check_trace(
@ -251,7 +251,7 @@ t_query_counter_async_query(_) ->
fun(Trace) -> fun(Trace) ->
%% the callback_mode if 'emqx_connector_demo' is 'always_sync'. %% the callback_mode if 'emqx_connector_demo' is 'always_sync'.
QueryTrace = ?of_kind(call_query, Trace), QueryTrace = ?of_kind(call_query, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace) ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
end end
), ),
%% wait for 1s to make sure all the aysnc query is sent to the resource. %% wait for 1s to make sure all the aysnc query is sent to the resource.
@ -264,7 +264,7 @@ t_query_counter_async_query(_) ->
?assertMatch({ok, 1000}, Result), ?assertMatch({ok, 1000}, Result),
%% the callback_mode if 'emqx_connector_demo' is 'always_sync'. %% the callback_mode if 'emqx_connector_demo' is 'always_sync'.
QueryTrace = ?of_kind(call_query, Trace), QueryTrace = ?of_kind(call_query, Trace),
?assertMatch([#{query := {query, _, get_counter}}], QueryTrace) ?assertMatch([#{query := {query, _, get_counter, _}}], QueryTrace)
end end
), ),
{ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
@ -284,7 +284,7 @@ t_query_counter_async_callback(_) ->
?DEFAULT_RESOURCE_GROUP, ?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE, ?TEST_RESOURCE,
#{name => test_resource, register => true}, #{name => test_resource, register => true},
#{query_mode => async, async_inflight_window => 1000000} #{query_mode => async, enable_batch => false, async_inflight_window => 1000000}
), ),
?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)), ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
?check_trace( ?check_trace(
@ -292,7 +292,7 @@ t_query_counter_async_callback(_) ->
inc_counter_in_parallel(1000, ReqOpts), inc_counter_in_parallel(1000, ReqOpts),
fun(Trace) -> fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace), QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace) ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
end end
), ),
@ -305,7 +305,7 @@ t_query_counter_async_callback(_) ->
fun(Result, Trace) -> fun(Result, Trace) ->
?assertMatch({ok, 1000}, Result), ?assertMatch({ok, 1000}, Result),
QueryTrace = ?of_kind(call_query, Trace), QueryTrace = ?of_kind(call_query, Trace),
?assertMatch([#{query := {query, _, get_counter}}], QueryTrace) ?assertMatch([#{query := {query, _, get_counter, _}}], QueryTrace)
end end
), ),
{ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
@ -338,9 +338,11 @@ t_query_counter_async_inflight(_) ->
#{name => test_resource, register => true}, #{name => test_resource, register => true},
#{ #{
query_mode => async, query_mode => async,
enable_batch => false,
async_inflight_window => WindowSize, async_inflight_window => WindowSize,
worker_pool_size => 1, worker_pool_size => 1,
resume_interval => 300 resume_interval => 300,
enable_queue => false
} }
), ),
?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)), ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
@ -354,11 +356,11 @@ t_query_counter_async_inflight(_) ->
inc_counter_in_parallel(WindowSize, ReqOpts), inc_counter_in_parallel(WindowSize, ReqOpts),
fun(Trace) -> fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace), QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace) ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
end end
), ),
%% this will block the resource_worker %% this will block the resource_worker as the inflight window is full now
ok = emqx_resource:query(?ID, {inc_counter, 1}), ok = emqx_resource:query(?ID, {inc_counter, 1}),
?assertMatch(0, ets:info(Tab0, size)), ?assertMatch(0, ets:info(Tab0, size)),
%% sleep to make the resource_worker resume some times %% sleep to make the resource_worker resume some times
@ -386,7 +388,7 @@ t_query_counter_async_inflight(_) ->
inc_counter_in_parallel(Num, ReqOpts), inc_counter_in_parallel(Num, ReqOpts),
fun(Trace) -> fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace), QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace) ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
end end
), ),
timer:sleep(1000), timer:sleep(1000),
@ -400,7 +402,7 @@ t_query_counter_async_inflight(_) ->
inc_counter_in_parallel(WindowSize, ReqOpts), inc_counter_in_parallel(WindowSize, ReqOpts),
fun(Trace) -> fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace), QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace) ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
end end
), ),
@ -414,13 +416,13 @@ t_query_counter_async_inflight(_) ->
{ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter), {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]), ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]),
?assert(Sent == Counter), ?assert(Sent =< Counter),
{ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
ct:pal("metrics: ~p", [C]), ct:pal("metrics: ~p", [C]),
?assertMatch( ?assertMatch(
#{matched := M, success := Ss, dropped := D} when #{matched := M, success := Ss, dropped := Dp, 'retried.success' := Rs} when
M == Ss + D, M == Ss + Dp - Rs,
C C
), ),
?assert( ?assert(
@ -683,17 +685,6 @@ inc_counter_in_parallel(N, Opts) ->
|| Pid <- Pids || Pid <- Pids
]. ].
% verify_inflight_full(WindowSize) ->
% ?check_trace(
% ?TRACE_OPTS,
% emqx_resource:query(?ID, {inc_counter, 1}),
% fun(Return, Trace) ->
% QueryTrace = ?of_kind(inflight_full, Trace),
% ?assertMatch([#{wind_size := WindowSize} | _], QueryTrace),
% ?assertMatch(ok, Return)
% end
% ).
bin_config() -> bin_config() ->
<<"\"name\": \"test_resource\"">>. <<"\"name\": \"test_resource\"">>.