Merge pull request #9821 from thalesmg/buffer-worker-expiry-v50

feat(buffer_worker): add expiration time to requests
This commit is contained in:
Zaiming (Stone) Shi 2023-01-24 13:54:04 +01:00 committed by GitHub
commit 8fde169abb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 1307 additions and 229 deletions

View File

@ -16,19 +16,21 @@
-define(EMPTY_METRICS, -define(EMPTY_METRICS,
?METRICS( ?METRICS(
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
) )
). ).
-define(METRICS( -define(METRICS(
Dropped, Dropped,
DroppedOther, DroppedOther,
DroppedExpired,
DroppedQueueFull, DroppedQueueFull,
DroppedResourceNotFound, DroppedResourceNotFound,
DroppedResourceStopped, DroppedResourceStopped,
Matched, Matched,
Queued, Queued,
Retried, Retried,
LateReply,
SentFailed, SentFailed,
SentInflight, SentInflight,
SentSucc, SentSucc,
@ -40,12 +42,14 @@
#{ #{
'dropped' => Dropped, 'dropped' => Dropped,
'dropped.other' => DroppedOther, 'dropped.other' => DroppedOther,
'dropped.expired' => DroppedExpired,
'dropped.queue_full' => DroppedQueueFull, 'dropped.queue_full' => DroppedQueueFull,
'dropped.resource_not_found' => DroppedResourceNotFound, 'dropped.resource_not_found' => DroppedResourceNotFound,
'dropped.resource_stopped' => DroppedResourceStopped, 'dropped.resource_stopped' => DroppedResourceStopped,
'matched' => Matched, 'matched' => Matched,
'queuing' => Queued, 'queuing' => Queued,
'retried' => Retried, 'retried' => Retried,
'late_reply' => LateReply,
'failed' => SentFailed, 'failed' => SentFailed,
'inflight' => SentInflight, 'inflight' => SentInflight,
'success' => SentSucc, 'success' => SentSucc,
@ -59,12 +63,14 @@
-define(metrics( -define(metrics(
Dropped, Dropped,
DroppedOther, DroppedOther,
DroppedExpired,
DroppedQueueFull, DroppedQueueFull,
DroppedResourceNotFound, DroppedResourceNotFound,
DroppedResourceStopped, DroppedResourceStopped,
Matched, Matched,
Queued, Queued,
Retried, Retried,
LateReply,
SentFailed, SentFailed,
SentInflight, SentInflight,
SentSucc, SentSucc,
@ -76,12 +82,14 @@
#{ #{
'dropped' := Dropped, 'dropped' := Dropped,
'dropped.other' := DroppedOther, 'dropped.other' := DroppedOther,
'dropped.expired' := DroppedExpired,
'dropped.queue_full' := DroppedQueueFull, 'dropped.queue_full' := DroppedQueueFull,
'dropped.resource_not_found' := DroppedResourceNotFound, 'dropped.resource_not_found' := DroppedResourceNotFound,
'dropped.resource_stopped' := DroppedResourceStopped, 'dropped.resource_stopped' := DroppedResourceStopped,
'matched' := Matched, 'matched' := Matched,
'queuing' := Queued, 'queuing' := Queued,
'retried' := Retried, 'retried' := Retried,
'late_reply' := LateReply,
'failed' := SentFailed, 'failed' := SentFailed,
'inflight' := SentInflight, 'inflight' := SentInflight,
'success' := SentSucc, 'success' := SentSucc,

View File

@ -65,7 +65,7 @@ load() ->
fun({Type, NamedConf}) -> fun({Type, NamedConf}) ->
lists:foreach( lists:foreach(
fun({Name, Conf}) -> fun({Name, Conf}) ->
%% fetch opts for `emqx_resource_worker` %% fetch opts for `emqx_resource_buffer_worker`
ResOpts = emqx_resource:fetch_creation_opts(Conf), ResOpts = emqx_resource:fetch_creation_opts(Conf),
safe_load_bridge(Type, Name, Conf, ResOpts) safe_load_bridge(Type, Name, Conf, ResOpts)
end, end,

View File

@ -751,11 +751,11 @@ aggregate_metrics(AllMetrics) ->
fun( fun(
#{ #{
metrics := ?metrics( metrics := ?metrics(
M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15 M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17
) )
}, },
?metrics( ?metrics(
N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15 N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17
) )
) -> ) ->
?METRICS( ?METRICS(
@ -773,7 +773,9 @@ aggregate_metrics(AllMetrics) ->
M12 + N12, M12 + N12,
M13 + N13, M13 + N13,
M14 + N14, M14 + N14,
M15 + N15 M15 + N15,
M16 + N16,
M17 + N17
) )
end, end,
InitMetrics, InitMetrics,
@ -805,11 +807,13 @@ format_metrics(#{
counters := #{ counters := #{
'dropped' := Dropped, 'dropped' := Dropped,
'dropped.other' := DroppedOther, 'dropped.other' := DroppedOther,
'dropped.expired' := DroppedExpired,
'dropped.queue_full' := DroppedQueueFull, 'dropped.queue_full' := DroppedQueueFull,
'dropped.resource_not_found' := DroppedResourceNotFound, 'dropped.resource_not_found' := DroppedResourceNotFound,
'dropped.resource_stopped' := DroppedResourceStopped, 'dropped.resource_stopped' := DroppedResourceStopped,
'matched' := Matched, 'matched' := Matched,
'retried' := Retried, 'retried' := Retried,
'late_reply' := LateReply,
'failed' := SentFailed, 'failed' := SentFailed,
'success' := SentSucc, 'success' := SentSucc,
'received' := Rcvd 'received' := Rcvd
@ -824,12 +828,14 @@ format_metrics(#{
?METRICS( ?METRICS(
Dropped, Dropped,
DroppedOther, DroppedOther,
DroppedExpired,
DroppedQueueFull, DroppedQueueFull,
DroppedResourceNotFound, DroppedResourceNotFound,
DroppedResourceStopped, DroppedResourceStopped,
Matched, Matched,
Queued, Queued,
Retried, Retried,
LateReply,
SentFailed, SentFailed,
SentInflight, SentInflight,
SentSucc, SentSucc,

View File

@ -830,7 +830,8 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
<<"resource_opts">> => #{ <<"resource_opts">> => #{
<<"worker_pool_size">> => 2, <<"worker_pool_size">> => 2,
<<"query_mode">> => <<"sync">>, <<"query_mode">> => <<"sync">>,
<<"request_timeout">> => <<"500ms">>, %% using a long time so we can test recovery
<<"request_timeout">> => <<"15s">>,
%% to make it check the healthy quickly %% to make it check the healthy quickly
<<"health_check_interval">> => <<"0.5s">> <<"health_check_interval">> => <<"0.5s">>
} }
@ -886,9 +887,9 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
{ok, SRef} = {ok, SRef} =
snabbkaffe:subscribe( snabbkaffe:subscribe(
fun fun
(#{?snk_kind := resource_worker_retry_inflight_failed}) -> (#{?snk_kind := buffer_worker_retry_inflight_failed}) ->
true; true;
(#{?snk_kind := resource_worker_flush_nack}) -> (#{?snk_kind := buffer_worker_flush_nack}) ->
true; true;
(_) -> (_) ->
false false
@ -898,8 +899,10 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
), ),
Payload1 = <<"hello2">>, Payload1 = <<"hello2">>,
Payload2 = <<"hello3">>, Payload2 = <<"hello3">>,
emqx:publish(emqx_message:make(LocalTopic, Payload1)), %% we need to to it in other processes because it'll block due to
emqx:publish(emqx_message:make(LocalTopic, Payload2)), %% the long timeout
spawn(fun() -> emqx:publish(emqx_message:make(LocalTopic, Payload1)) end),
spawn(fun() -> emqx:publish(emqx_message:make(LocalTopic, Payload2)) end),
{ok, _} = snabbkaffe:receive_events(SRef), {ok, _} = snabbkaffe:receive_events(SRef),
%% verify the metrics of the bridge, the message should be queued %% verify the metrics of the bridge, the message should be queued
@ -917,9 +920,9 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
<<"matched">> := Matched, <<"matched">> := Matched,
<<"success">> := 1, <<"success">> := 1,
<<"failed">> := 0, <<"failed">> := 0,
<<"queuing">> := 1, <<"queuing">> := Queuing,
<<"inflight">> := 1 <<"inflight">> := Inflight
} when Matched >= 3, } when Matched >= 3 andalso Inflight + Queuing == 2,
maps:get(<<"metrics">>, DecodedMetrics1) maps:get(<<"metrics">>, DecodedMetrics1)
), ),
@ -954,12 +957,11 @@ assert_mqtt_msg_received(Topic, Payload) ->
receive receive
{deliver, Topic, #message{payload = Payload}} -> {deliver, Topic, #message{payload = Payload}} ->
ct:pal("Got mqtt message: ~p on topic ~p", [Payload, Topic]), ct:pal("Got mqtt message: ~p on topic ~p", [Payload, Topic]),
ok; ok
Msg -> after 300 ->
ct:pal("Unexpected Msg: ~p", [Msg]), {messages, Messages} = process_info(self(), messages),
assert_mqtt_msg_received(Topic, Payload) Msg = io_lib:format("timeout waiting for ~p on topic ~p", [Payload, Topic]),
after 100 -> error({Msg, #{messages => Messages}})
ct:fail("timeout waiting for ~p on topic ~p", [Payload, Topic])
end. end.
request(Method, Url, Body) -> request(Method, Url, Body) ->

View File

@ -189,8 +189,8 @@ on_batch_query(
Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas], Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas],
St = maps:get(BinKey, Sts), St = maps:get(BinKey, Sts),
case on_sql_query(InstId, PoolName, execute_batch, St, Datas2) of case on_sql_query(InstId, PoolName, execute_batch, St, Datas2) of
{error, Error} -> {error, _Error} = Result ->
{error, Error}; handle_result(Result);
{_Column, Results} -> {_Column, Results} ->
handle_batch_result(Results, 0) handle_batch_result(Results, 0)
end end
@ -417,6 +417,8 @@ to_bin(Bin) when is_binary(Bin) ->
to_bin(Atom) when is_atom(Atom) -> to_bin(Atom) when is_atom(Atom) ->
erlang:atom_to_binary(Atom). erlang:atom_to_binary(Atom).
handle_result({error, disconnected}) ->
{error, {recoverable_error, disconnected}};
handle_result({error, Error}) -> handle_result({error, Error}) ->
{error, {unrecoverable_error, Error}}; {error, {unrecoverable_error, Error}};
handle_result(Res) -> handle_result(Res) ->

View File

@ -29,6 +29,8 @@
-type query_opts() :: #{ -type query_opts() :: #{
%% The key used for picking a resource worker %% The key used for picking a resource worker
pick_key => term(), pick_key => term(),
timeout => timeout(),
expire_at => infinity | integer(),
async_reply_fun => reply_fun() async_reply_fun => reply_fun()
}. }.
-type resource_data() :: #{ -type resource_data() :: #{

View File

@ -255,7 +255,7 @@ reset_metrics(ResId) ->
query(ResId, Request) -> query(ResId, Request) ->
query(ResId, Request, #{}). query(ResId, Request, #{}).
-spec query(resource_id(), Request :: term(), emqx_resource_worker:query_opts()) -> -spec query(resource_id(), Request :: term(), emqx_resource_buffer_worker:query_opts()) ->
Result :: term(). Result :: term().
query(ResId, Request, Opts) -> query(ResId, Request, Opts) ->
case emqx_resource_manager:ets_lookup(ResId) of case emqx_resource_manager:ets_lookup(ResId) of
@ -263,11 +263,11 @@ query(ResId, Request, Opts) ->
IsBufferSupported = is_buffer_supported(Module), IsBufferSupported = is_buffer_supported(Module),
case {IsBufferSupported, QM} of case {IsBufferSupported, QM} of
{true, _} -> {true, _} ->
emqx_resource_worker:simple_sync_query(ResId, Request); emqx_resource_buffer_worker:simple_sync_query(ResId, Request);
{false, sync} -> {false, sync} ->
emqx_resource_worker:sync_query(ResId, Request, Opts); emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
{false, async} -> {false, async} ->
emqx_resource_worker:async_query(ResId, Request, Opts) emqx_resource_buffer_worker:async_query(ResId, Request, Opts)
end; end;
{error, not_found} -> {error, not_found} ->
?RESOURCE_ERROR(not_found, "resource not found") ?RESOURCE_ERROR(not_found, "resource not found")
@ -275,7 +275,7 @@ query(ResId, Request, Opts) ->
-spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term(). -spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term().
simple_sync_query(ResId, Request) -> simple_sync_query(ResId, Request) ->
emqx_resource_worker:simple_sync_query(ResId, Request). emqx_resource_buffer_worker:simple_sync_query(ResId, Request).
-spec start(resource_id()) -> ok | {error, Reason :: term()}. -spec start(resource_id()) -> ok | {error, Reason :: term()}.
start(ResId) -> start(ResId) ->

View File

@ -17,7 +17,7 @@
%% This module implements async message sending, disk message queuing, %% This module implements async message sending, disk message queuing,
%% and message batching using ReplayQ. %% and message batching using ReplayQ.
-module(emqx_resource_worker). -module(emqx_resource_buffer_worker).
-include("emqx_resource.hrl"). -include("emqx_resource.hrl").
-include("emqx_resource_utils.hrl"). -include("emqx_resource_utils.hrl").
@ -60,21 +60,23 @@
-define(COLLECT_REQ_LIMIT, 1000). -define(COLLECT_REQ_LIMIT, 1000).
-define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}). -define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}).
-define(QUERY(FROM, REQUEST, SENT), {query, FROM, REQUEST, SENT}). -define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}).
-define(REPLY(FROM, REQUEST, SENT, RESULT), {reply, FROM, REQUEST, SENT, RESULT}). -define(REPLY(FROM, REQUEST, SENT, RESULT), {reply, FROM, REQUEST, SENT, RESULT}).
-define(EXPAND(RESULT, BATCH), [ -define(EXPAND(RESULT, BATCH), [
?REPLY(FROM, REQUEST, SENT, RESULT) ?REPLY(FROM, REQUEST, SENT, RESULT)
|| ?QUERY(FROM, REQUEST, SENT) <- BATCH || ?QUERY(FROM, REQUEST, SENT, _EXPIRE_AT) <- BATCH
]). ]).
-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef), -define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef),
{Ref, BatchOrQuery, IsRetriable, WorkerMRef} {Ref, BatchOrQuery, IsRetriable, WorkerMRef}
). ).
-define(ITEM_IDX, 2).
-define(RETRY_IDX, 3). -define(RETRY_IDX, 3).
-define(WORKER_MREF_IDX, 4). -define(WORKER_MREF_IDX, 4).
-type id() :: binary(). -type id() :: binary().
-type index() :: pos_integer(). -type index() :: pos_integer().
-type queue_query() :: ?QUERY(from(), request(), HasBeenSent :: boolean()). -type expire_at() :: infinity | integer().
-type queue_query() :: ?QUERY(from(), request(), HasBeenSent :: boolean(), expire_at()).
-type request() :: term(). -type request() :: term().
-type from() :: pid() | reply_fun() | request_from(). -type from() :: pid() | reply_fun() | request_from().
-type request_from() :: undefined | gen_statem:from(). -type request_from() :: undefined | gen_statem:from().
@ -98,14 +100,18 @@ start_link(Id, Index, Opts) ->
gen_statem:start_link(?MODULE, {Id, Index, Opts}, []). gen_statem:start_link(?MODULE, {Id, Index, Opts}, []).
-spec sync_query(id(), request(), query_opts()) -> Result :: term(). -spec sync_query(id(), request(), query_opts()) -> Result :: term().
sync_query(Id, Request, Opts) -> sync_query(Id, Request, Opts0) ->
Opts1 = ensure_timeout_query_opts(Opts0, sync),
Opts = ensure_expire_at(Opts1),
PickKey = maps:get(pick_key, Opts, self()), PickKey = maps:get(pick_key, Opts, self()),
Timeout = maps:get(timeout, Opts, timer:seconds(15)), Timeout = maps:get(timeout, Opts),
emqx_resource_metrics:matched_inc(Id), emqx_resource_metrics:matched_inc(Id),
pick_call(Id, PickKey, {query, Request, Opts}, Timeout). pick_call(Id, PickKey, {query, Request, Opts}, Timeout).
-spec async_query(id(), request(), query_opts()) -> Result :: term(). -spec async_query(id(), request(), query_opts()) -> Result :: term().
async_query(Id, Request, Opts) -> async_query(Id, Request, Opts0) ->
Opts1 = ensure_timeout_query_opts(Opts0, async),
Opts = ensure_expire_at(Opts1),
PickKey = maps:get(pick_key, Opts, self()), PickKey = maps:get(pick_key, Opts, self()),
emqx_resource_metrics:matched_inc(Id), emqx_resource_metrics:matched_inc(Id),
pick_cast(Id, PickKey, {query, Request, Opts}). pick_cast(Id, PickKey, {query, Request, Opts}).
@ -120,11 +126,15 @@ simple_sync_query(Id, Request) ->
%% would mess up the metrics anyway. `undefined' is ignored by %% would mess up the metrics anyway. `undefined' is ignored by
%% `emqx_resource_metrics:*_shift/3'. %% `emqx_resource_metrics:*_shift/3'.
Index = undefined, Index = undefined,
QueryOpts = #{simple_query => true}, QueryOpts0 = #{simple_query => true, timeout => infinity},
QueryOpts = #{expire_at := ExpireAt} = ensure_expire_at(QueryOpts0),
emqx_resource_metrics:matched_inc(Id), emqx_resource_metrics:matched_inc(Id),
Ref = make_message_ref(), Ref = make_message_ref(),
Result = call_query(sync, Id, Index, Ref, ?QUERY(self(), Request, false), QueryOpts),
HasBeenSent = false, HasBeenSent = false,
From = self(),
Result = call_query(
sync, Id, Index, Ref, ?QUERY(From, Request, HasBeenSent, ExpireAt), QueryOpts
),
_ = handle_query_result(Id, Result, HasBeenSent), _ = handle_query_result(Id, Result, HasBeenSent),
Result. Result.
@ -176,12 +186,17 @@ init({Id, Index, Opts}) ->
resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval), resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval),
tref => undefined tref => undefined
}, },
?tp(resource_worker_init, #{id => Id, index => Index}), ?tp(buffer_worker_init, #{id => Id, index => Index}),
{ok, running, Data}. {ok, running, Data}.
running(enter, _, St) -> running(enter, _, Data) ->
?tp(resource_worker_enter_running, #{}), ?tp(buffer_worker_enter_running, #{}),
maybe_flush(St); %% According to `gen_statem' laws, we mustn't call `maybe_flush'
%% directly because it may decide to return `{next_state, blocked, _}',
%% and that's an invalid response for a state enter call.
%% Returning a next event from a state enter call is also
%% prohibited.
{keep_state, ensure_flush_timer(Data, 0)};
running(cast, resume, _St) -> running(cast, resume, _St) ->
keep_state_and_data; keep_state_and_data;
running(cast, flush, Data) -> running(cast, flush, Data) ->
@ -206,7 +221,7 @@ running(info, Info, _St) ->
keep_state_and_data. keep_state_and_data.
blocked(enter, _, #{resume_interval := ResumeT} = _St) -> blocked(enter, _, #{resume_interval := ResumeT} = _St) ->
?tp(resource_worker_enter_blocked, #{}), ?tp(buffer_worker_enter_blocked, #{}),
{keep_state_and_data, {state_timeout, ResumeT, unblock}}; {keep_state_and_data, {state_timeout, ResumeT, unblock}};
blocked(cast, block, _St) -> blocked(cast, block, _St) ->
keep_state_and_data; keep_state_and_data;
@ -243,9 +258,9 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
%%============================================================================== %%==============================================================================
-define(PICK(ID, KEY, EXPR), -define(PICK(ID, KEY, PID, EXPR),
try gproc_pool:pick_worker(ID, KEY) of try gproc_pool:pick_worker(ID, KEY) of
Pid when is_pid(Pid) -> PID when is_pid(PID) ->
EXPR; EXPR;
_ -> _ ->
?RESOURCE_ERROR(worker_not_created, "resource not created") ?RESOURCE_ERROR(worker_not_created, "resource not created")
@ -258,7 +273,7 @@ code_change(_OldVsn, State, _Extra) ->
). ).
pick_call(Id, Key, Query, Timeout) -> pick_call(Id, Key, Query, Timeout) ->
?PICK(Id, Key, begin ?PICK(Id, Key, Pid, begin
Caller = self(), Caller = self(),
MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]), MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
From = {Caller, MRef}, From = {Caller, MRef},
@ -281,15 +296,21 @@ pick_call(Id, Key, Query, Timeout) ->
end). end).
pick_cast(Id, Key, Query) -> pick_cast(Id, Key, Query) ->
?PICK(Id, Key, begin ?PICK(Id, Key, Pid, begin
From = undefined, From = undefined,
erlang:send(Pid, ?SEND_REQ(From, Query)), erlang:send(Pid, ?SEND_REQ(From, Query)),
ok ok
end). end).
resume_from_blocked(Data) -> resume_from_blocked(Data) ->
#{inflight_tid := InflightTID} = Data, ?tp(buffer_worker_resume_from_blocked_enter, #{}),
case inflight_get_first_retriable(InflightTID) of #{
id := Id,
index := Index,
inflight_tid := InflightTID
} = Data,
Now = now_(),
case inflight_get_first_retriable(InflightTID, Now) of
none -> none ->
case is_inflight_full(InflightTID) of case is_inflight_full(InflightTID) of
true -> true ->
@ -297,14 +318,32 @@ resume_from_blocked(Data) ->
false -> false ->
{next_state, running, Data} {next_state, running, Data}
end; end;
{Ref, FirstQuery} -> {expired, Ref, Batch} ->
IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
IsAcked andalso emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)),
?tp(buffer_worker_retry_expired, #{expired => Batch}),
resume_from_blocked(Data);
{single, Ref, Query} ->
%% We retry msgs in inflight window sync, as if we send them %% We retry msgs in inflight window sync, as if we send them
%% async, they will be appended to the end of inflight window again. %% async, they will be appended to the end of inflight window again.
case is_inflight_full(InflightTID) of case is_inflight_full(InflightTID) of
true -> true ->
{keep_state, Data}; {keep_state, Data};
false -> false ->
retry_inflight_sync(Ref, FirstQuery, Data) retry_inflight_sync(Ref, Query, Data)
end;
{batch, Ref, NotExpired, Expired} ->
update_inflight_item(InflightTID, Ref, NotExpired),
NumExpired = length(Expired),
emqx_resource_metrics:dropped_expired_inc(Id, NumExpired),
NumExpired > 0 andalso ?tp(buffer_worker_retry_expired, #{expired => Expired}),
%% We retry msgs in inflight window sync, as if we send them
%% async, they will be appended to the end of inflight window again.
case is_inflight_full(InflightTID) of
true ->
{keep_state, Data};
false ->
retry_inflight_sync(Ref, NotExpired, Data)
end end
end. end.
@ -315,15 +354,15 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
index := Index, index := Index,
resume_interval := ResumeT resume_interval := ResumeT
} = Data0, } = Data0,
?tp(resource_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}), ?tp(buffer_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}),
QueryOpts = #{simple_query => false}, QueryOpts = #{simple_query => false},
Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts), Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
ReplyResult = ReplyResult =
case QueryOrBatch of case QueryOrBatch of
?QUERY(From, CoreReq, HasBeenSent) -> ?QUERY(From, CoreReq, HasBeenSent, _ExpireAt) ->
Reply = ?REPLY(From, CoreReq, HasBeenSent, Result), Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
reply_caller_defer_metrics(Id, Reply, QueryOpts); reply_caller_defer_metrics(Id, Reply, QueryOpts);
[?QUERY(_, _, _) | _] = Batch -> [?QUERY(_, _, _, _) | _] = Batch ->
batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts) batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts)
end, end,
case ReplyResult of case ReplyResult of
@ -331,7 +370,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
{nack, PostFn} -> {nack, PostFn} ->
PostFn(), PostFn(),
?tp( ?tp(
resource_worker_retry_inflight_failed, buffer_worker_retry_inflight_failed,
#{ #{
ref => Ref, ref => Ref,
query_or_batch => QueryOrBatch query_or_batch => QueryOrBatch
@ -349,7 +388,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
%% we bump the counter when removing it from the table. %% we bump the counter when removing it from the table.
IsAcked andalso PostFn(), IsAcked andalso PostFn(),
?tp( ?tp(
resource_worker_retry_inflight_succeeded, buffer_worker_retry_inflight_succeeded,
#{ #{
ref => Ref, ref => Ref,
query_or_batch => QueryOrBatch query_or_batch => QueryOrBatch
@ -378,10 +417,12 @@ collect_and_enqueue_query_requests(Request0, Data0) ->
(?SEND_REQ(undefined = _From, {query, Req, Opts})) -> (?SEND_REQ(undefined = _From, {query, Req, Opts})) ->
ReplyFun = maps:get(async_reply_fun, Opts, undefined), ReplyFun = maps:get(async_reply_fun, Opts, undefined),
HasBeenSent = false, HasBeenSent = false,
?QUERY(ReplyFun, Req, HasBeenSent); ExpireAt = maps:get(expire_at, Opts),
(?SEND_REQ(From, {query, Req, _Opts})) -> ?QUERY(ReplyFun, Req, HasBeenSent, ExpireAt);
(?SEND_REQ(From, {query, Req, Opts})) ->
HasBeenSent = false, HasBeenSent = false,
?QUERY(From, Req, HasBeenSent) ExpireAt = maps:get(expire_at, Opts),
?QUERY(From, Req, HasBeenSent, ExpireAt)
end, end,
Requests Requests
), ),
@ -406,6 +447,8 @@ maybe_flush(Data0) ->
-spec flush(data()) -> gen_statem:event_handler_result(state(), data()). -spec flush(data()) -> gen_statem:event_handler_result(state(), data()).
flush(Data0) -> flush(Data0) ->
#{ #{
id := Id,
index := Index,
batch_size := BatchSize, batch_size := BatchSize,
inflight_tid := InflightTID, inflight_tid := InflightTID,
queue := Q0 queue := Q0
@ -415,29 +458,49 @@ flush(Data0) ->
{0, _} -> {0, _} ->
{keep_state, Data1}; {keep_state, Data1};
{_, true} -> {_, true} ->
?tp(resource_worker_flush_but_inflight_full, #{}), ?tp(buffer_worker_flush_but_inflight_full, #{}),
Data2 = ensure_flush_timer(Data1), Data2 = ensure_flush_timer(Data1),
{keep_state, Data2}; {keep_state, Data2};
{_, false} -> {_, false} ->
?tp(buffer_worker_flush_before_pop, #{}),
{Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}), {Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}),
IsBatch = BatchSize =/= 1,
%% We *must* use the new queue, because we currently can't
%% `nack' a `pop'.
%% Maybe we could re-open the queue?
Data2 = Data1#{queue := Q1}, Data2 = Data1#{queue := Q1},
Ref = make_message_ref(), ?tp(buffer_worker_flush_before_sieve_expired, #{}),
do_flush(Data2, #{ Now = now_(),
new_queue => Q1, %% if the request has expired, the caller is no longer
is_batch => IsBatch, %% waiting for a response.
batch => Batch, case sieve_expired_requests(Batch, Now) of
ref => Ref, all_expired ->
ack_ref => QAckRef ok = replayq:ack(Q1, QAckRef),
}) emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
?tp(buffer_worker_flush_all_expired, #{batch => Batch}),
flush(Data2);
{NotExpired, Expired} ->
NumExpired = length(Expired),
emqx_resource_metrics:dropped_expired_inc(Id, NumExpired),
IsBatch = BatchSize =/= 1,
%% We *must* use the new queue, because we currently can't
%% `nack' a `pop'.
%% Maybe we could re-open the queue?
?tp(
buffer_worker_flush_potentially_partial,
#{expired => Expired, not_expired => NotExpired}
),
Ref = make_message_ref(),
do_flush(Data2, #{
new_queue => Q1,
is_batch => IsBatch,
batch => NotExpired,
ref => Ref,
ack_ref => QAckRef
})
end
end. end.
-spec do_flush(data(), #{ -spec do_flush(data(), #{
is_batch := boolean(), is_batch := boolean(),
batch := [?QUERY(from(), request(), boolean())], batch := [queue_query()],
ack_ref := replayq:ack_ref(), ack_ref := replayq:ack_ref(),
ref := inflight_key(), ref := inflight_key(),
new_queue := replayq:q() new_queue := replayq:q()
@ -459,7 +522,7 @@ do_flush(
inflight_tid := InflightTID inflight_tid := InflightTID
} = Data0, } = Data0,
%% unwrap when not batching (i.e., batch size == 1) %% unwrap when not batching (i.e., batch size == 1)
[?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch, [?QUERY(From, CoreReq, HasBeenSent, _ExpireAt) = Request] = Batch,
QueryOpts = #{inflight_tid => InflightTID, simple_query => false}, QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
Result = call_query(configured, Id, Index, Ref, Request, QueryOpts), Result = call_query(configured, Id, Index, Ref, Request, QueryOpts),
Reply = ?REPLY(From, CoreReq, HasBeenSent, Result), Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
@ -483,7 +546,7 @@ do_flush(
store_async_worker_reference(InflightTID, Ref, WorkerMRef), store_async_worker_reference(InflightTID, Ref, WorkerMRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
?tp( ?tp(
resource_worker_flush_nack, buffer_worker_flush_nack,
#{ #{
ref => Ref, ref => Ref,
is_retriable => IsRetriable, is_retriable => IsRetriable,
@ -512,7 +575,7 @@ do_flush(
store_async_worker_reference(InflightTID, Ref, WorkerMRef), store_async_worker_reference(InflightTID, Ref, WorkerMRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
?tp( ?tp(
resource_worker_flush_ack, buffer_worker_flush_ack,
#{ #{
batch_or_query => Request, batch_or_query => Request,
result => Result result => Result
@ -560,7 +623,7 @@ do_flush(Data0, #{
store_async_worker_reference(InflightTID, Ref, WorkerMRef), store_async_worker_reference(InflightTID, Ref, WorkerMRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
?tp( ?tp(
resource_worker_flush_nack, buffer_worker_flush_nack,
#{ #{
ref => Ref, ref => Ref,
is_retriable => IsRetriable, is_retriable => IsRetriable,
@ -589,7 +652,7 @@ do_flush(Data0, #{
store_async_worker_reference(InflightTID, Ref, WorkerMRef), store_async_worker_reference(InflightTID, Ref, WorkerMRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
?tp( ?tp(
resource_worker_flush_ack, buffer_worker_flush_ack,
#{ #{
batch_or_query => Batch, batch_or_query => Batch,
result => Result result => Result
@ -720,45 +783,26 @@ handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) ->
ok ok
end, end,
{nack, PostFn}; {nack, PostFn};
handle_query_result_pure(Id, {error, {unrecoverable_error, Reason}}, HasBeenSent) -> handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
PostFn = fun() -> case is_unrecoverable_error(Error) of
?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}), true ->
inc_sent_failed(Id, HasBeenSent), PostFn =
ok fun() ->
end, ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}),
{ack, PostFn}; inc_sent_failed(Id, HasBeenSent),
handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent) -> ok
%% the message will be queued in replayq or inflight window, end,
%% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not {ack, PostFn};
%% sent this message. false ->
PostFn = fun() -> PostFn =
?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}), fun() ->
ok ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}),
end, ok
{nack, PostFn}; end,
handle_query_result_pure(Id, {error, Reason}, _HasBeenSent) -> {nack, PostFn}
PostFn = fun() -> end;
?SLOG(error, #{id => Id, msg => send_error, reason => Reason}), handle_query_result_pure(Id, {async_return, Result}, HasBeenSent) ->
ok handle_query_async_result_pure(Id, Result, HasBeenSent);
end,
{nack, PostFn};
handle_query_result_pure(Id, {async_return, {error, {unrecoverable_error, Reason}}}, HasBeenSent) ->
PostFn = fun() ->
?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}),
inc_sent_failed(Id, HasBeenSent),
ok
end,
{ack, PostFn};
handle_query_result_pure(Id, {async_return, {error, Msg}}, _HasBeenSent) ->
PostFn = fun() ->
?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}),
ok
end,
{nack, PostFn};
handle_query_result_pure(_Id, {async_return, ok}, _HasBeenSent) ->
{ack, fun() -> ok end};
handle_query_result_pure(_Id, {async_return, {ok, Pid}}, _HasBeenSent) when is_pid(Pid) ->
{ack, fun() -> ok end};
handle_query_result_pure(Id, Result, HasBeenSent) -> handle_query_result_pure(Id, Result, HasBeenSent) ->
PostFn = fun() -> PostFn = fun() ->
assert_ok_result(Result), assert_ok_result(Result),
@ -767,6 +811,28 @@ handle_query_result_pure(Id, Result, HasBeenSent) ->
end, end,
{ack, PostFn}. {ack, PostFn}.
handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
case is_unrecoverable_error(Error) of
true ->
PostFn =
fun() ->
?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}),
inc_sent_failed(Id, HasBeenSent),
ok
end,
{ack, PostFn};
false ->
PostFn = fun() ->
?SLOG(error, #{id => Id, msg => async_send_error, reason => Reason}),
ok
end,
{nack, PostFn}
end;
handle_query_async_result_pure(_Id, {ok, Pid}, _HasBeenSent) when is_pid(Pid) ->
{ack, fun() -> ok end};
handle_query_async_result_pure(_Id, ok, _HasBeenSent) ->
{ack, fun() -> ok end}.
handle_async_worker_down(Data0, Pid) -> handle_async_worker_down(Data0, Pid) ->
#{async_workers := AsyncWorkers0} = Data0, #{async_workers := AsyncWorkers0} = Data0,
{WorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0), {WorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0),
@ -812,10 +878,10 @@ call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
end end
). ).
apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) -> apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, _QueryOpts) ->
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}), ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}),
?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request); ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request);
apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, QueryOpts) ->
?tp(call_query_async, #{ ?tp(call_query_async, #{
id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
}), }),
@ -834,13 +900,13 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt
end, end,
Request Request
); );
apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) -> apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, _QueryOpts) ->
?tp(call_batch_query, #{ ?tp(call_batch_query, #{
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
}), }),
Requests = [Request || ?QUERY(_From, Request, _) <- Batch], Requests = [Request || ?QUERY(_From, Request, _, _ExpireAt) <- Batch],
?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch); ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch);
apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) ->
?tp(call_batch_query_async, #{ ?tp(call_batch_query_async, #{
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async
}), }),
@ -850,7 +916,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt
begin begin
ReplyFun = fun ?MODULE:batch_reply_after_query/8, ReplyFun = fun ?MODULE:batch_reply_after_query/8,
ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch, QueryOpts]}, ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch, QueryOpts]},
Requests = [Request || ?QUERY(_From, Request, _) <- Batch], Requests = [Request || ?QUERY(_From, Request, _, _ExpireAt) <- Batch],
IsRetriable = false, IsRetriable = false,
WorkerMRef = undefined, WorkerMRef = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
@ -862,7 +928,41 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt
). ).
reply_after_query( reply_after_query(
Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBeenSent), QueryOpts, Result Pid,
Id,
Index,
InflightTID,
Ref,
?QUERY(_From, _Request, _HasBeenSent, ExpireAt) = Query,
QueryOpts,
Result
) ->
?tp(
buffer_worker_reply_after_query_enter,
#{batch_or_query => [Query], ref => Ref}
),
Now = now_(),
case is_expired(ExpireAt, Now) of
true ->
IsFullBefore = is_inflight_full(InflightTID),
IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
IsFullBefore andalso ?MODULE:flush_worker(Pid),
?tp(buffer_worker_reply_after_query_expired, #{expired => [Query]}),
ok;
false ->
do_reply_after_query(Pid, Id, Index, InflightTID, Ref, Query, QueryOpts, Result)
end.
do_reply_after_query(
Pid,
Id,
Index,
InflightTID,
Ref,
?QUERY(From, Request, HasBeenSent, _ExpireAt),
QueryOpts,
Result
) -> ) ->
%% NOTE: 'inflight' is the count of messages that were sent async %% NOTE: 'inflight' is the count of messages that were sent async
%% but received no ACK, NOT the number of messages queued in the %% but received no ACK, NOT the number of messages queued in the
@ -873,18 +973,18 @@ reply_after_query(
case Action of case Action of
nack -> nack ->
%% Keep retrying. %% Keep retrying.
?tp(resource_worker_reply_after_query, #{ ?tp(buffer_worker_reply_after_query, #{
action => Action, action => Action,
batch_or_query => ?QUERY(From, Request, HasBeenSent), batch_or_query => ?QUERY(From, Request, HasBeenSent, _ExpireAt),
ref => Ref, ref => Ref,
result => Result result => Result
}), }),
mark_inflight_as_retriable(InflightTID, Ref), mark_inflight_as_retriable(InflightTID, Ref),
?MODULE:block(Pid); ?MODULE:block(Pid);
ack -> ack ->
?tp(resource_worker_reply_after_query, #{ ?tp(buffer_worker_reply_after_query, #{
action => Action, action => Action,
batch_or_query => ?QUERY(From, Request, HasBeenSent), batch_or_query => ?QUERY(From, Request, HasBeenSent, _ExpireAt),
ref => Ref, ref => Ref,
result => Result result => Result
}), }),
@ -896,6 +996,34 @@ reply_after_query(
end. end.
batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) -> batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) ->
?tp(
buffer_worker_reply_after_query_enter,
#{batch_or_query => Batch, ref => Ref}
),
Now = now_(),
case sieve_expired_requests(Batch, Now) of
all_expired ->
IsFullBefore = is_inflight_full(InflightTID),
IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
IsFullBefore andalso ?MODULE:flush_worker(Pid),
?tp(buffer_worker_reply_after_query_expired, #{expired => Batch}),
ok;
{NotExpired, Expired} ->
NumExpired = length(Expired),
emqx_resource_metrics:late_reply_inc(Id, NumExpired),
NumExpired > 0 andalso
?tp(buffer_worker_reply_after_query_expired, #{expired => Expired}),
do_batch_reply_after_query(
Pid, Id, Index, InflightTID, Ref, NotExpired, QueryOpts, Result
)
end.
do_batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) ->
?tp(
buffer_worker_reply_after_query_enter,
#{batch_or_query => Batch, ref => Ref}
),
%% NOTE: 'inflight' is the count of messages that were sent async %% NOTE: 'inflight' is the count of messages that were sent async
%% but received no ACK, NOT the number of messages queued in the %% but received no ACK, NOT the number of messages queued in the
%% inflight window. %% inflight window.
@ -903,7 +1031,7 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Resu
case Action of case Action of
nack -> nack ->
%% Keep retrying. %% Keep retrying.
?tp(resource_worker_reply_after_query, #{ ?tp(buffer_worker_reply_after_query, #{
action => nack, action => nack,
batch_or_query => Batch, batch_or_query => Batch,
ref => Ref, ref => Ref,
@ -912,7 +1040,7 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Resu
mark_inflight_as_retriable(InflightTID, Ref), mark_inflight_as_retriable(InflightTID, Ref),
?MODULE:block(Pid); ?MODULE:block(Pid);
ack -> ack ->
?tp(resource_worker_reply_after_query, #{ ?tp(buffer_worker_reply_after_query, #{
action => ack, action => ack,
batch_or_query => Batch, batch_or_query => Batch,
ref => Ref, ref => Ref,
@ -955,7 +1083,7 @@ append_queue(Id, Index, Q, Queries) when not is_binary(Q) ->
end, end,
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q2)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q2)),
?tp( ?tp(
resource_worker_appended_to_queue, buffer_worker_appended_to_queue,
#{ #{
id => Id, id => Id,
items => Queries, items => Queries,
@ -973,7 +1101,7 @@ append_queue(Id, Index, Q, Queries) when not is_binary(Q) ->
inflight_new(InfltWinSZ, Id, Index) -> inflight_new(InfltWinSZ, Id, Index) ->
TableId = ets:new( TableId = ets:new(
emqx_resource_worker_inflight_tab, emqx_resource_buffer_worker_inflight_tab,
[ordered_set, public, {write_concurrency, true}] [ordered_set, public, {write_concurrency, true}]
), ),
inflight_append(TableId, {?MAX_SIZE_REF, InfltWinSZ}, Id, Index), inflight_append(TableId, {?MAX_SIZE_REF, InfltWinSZ}, Id, Index),
@ -986,9 +1114,12 @@ inflight_new(InfltWinSZ, Id, Index) ->
), ),
TableId. TableId.
-spec inflight_get_first_retriable(ets:tid()) -> -spec inflight_get_first_retriable(ets:tid(), integer()) ->
none | {integer(), [?QUERY(_, _, _)] | ?QUERY(_, _, _)}. none
inflight_get_first_retriable(InflightTID) -> | {expired, inflight_key(), [queue_query()]}
| {single, inflight_key(), queue_query()}
| {batch, inflight_key(), _NotExpired :: [queue_query()], _Expired :: [queue_query()]}.
inflight_get_first_retriable(InflightTID, Now) ->
MatchSpec = MatchSpec =
ets:fun2ms( ets:fun2ms(
fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _WorkerMRef)) when fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _WorkerMRef)) when
@ -1000,8 +1131,22 @@ inflight_get_first_retriable(InflightTID) ->
case ets:select(InflightTID, MatchSpec, _Limit = 1) of case ets:select(InflightTID, MatchSpec, _Limit = 1) of
'$end_of_table' -> '$end_of_table' ->
none; none;
{[{Ref, BatchOrQuery}], _Continuation} -> {[{Ref, Query = ?QUERY(_From, _CoreReq, _HasBeenSent, ExpireAt)}], _Continuation} ->
{Ref, BatchOrQuery} case is_expired(ExpireAt, Now) of
true ->
{expired, Ref, [Query]};
false ->
{single, Ref, Query}
end;
{[{Ref, Batch = [_ | _]}], _Continuation} ->
%% batch is non-empty because we check that in
%% `sieve_expired_requests'.
case sieve_expired_requests(Batch, Now) of
all_expired ->
{expired, Ref, Batch};
{NotExpired, Expired} ->
{batch, Ref, NotExpired, Expired}
end
end. end.
is_inflight_full(undefined) -> is_inflight_full(undefined) ->
@ -1030,7 +1175,7 @@ inflight_append(undefined, _InflightItem, _Id, _Index) ->
ok; ok;
inflight_append( inflight_append(
InflightTID, InflightTID,
?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch0, IsRetriable, WorkerMRef), ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, WorkerMRef),
Id, Id,
Index Index
) -> ) ->
@ -1040,11 +1185,13 @@ inflight_append(
BatchSize = length(Batch), BatchSize = length(Batch),
IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}), IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}),
emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
?tp(resource_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
ok; ok;
inflight_append( inflight_append(
InflightTID, InflightTID,
?INFLIGHT_ITEM(Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, IsRetriable, WorkerMRef), ?INFLIGHT_ITEM(
Ref, ?QUERY(_From, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, WorkerMRef
),
Id, Id,
Index Index
) -> ) ->
@ -1053,7 +1200,7 @@ inflight_append(
IsNew = ets:insert_new(InflightTID, InflightItem), IsNew = ets:insert_new(InflightTID, InflightItem),
IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}), IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}),
emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
?tp(resource_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
ok; ok;
inflight_append(InflightTID, {Ref, Data}, _Id, _Index) -> inflight_append(InflightTID, {Ref, Data}, _Id, _Index) ->
ets:insert(InflightTID, {Ref, Data}), ets:insert(InflightTID, {Ref, Data}),
@ -1106,9 +1253,9 @@ ack_inflight(undefined, _Ref, _Id, _Index) ->
ack_inflight(InflightTID, Ref, Id, Index) -> ack_inflight(InflightTID, Ref, Id, Index) ->
Count = Count =
case ets:take(InflightTID, Ref) of case ets:take(InflightTID, Ref) of
[?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _), _IsRetriable, _WorkerMRef)] -> [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _WorkerMRef)] ->
1; 1;
[?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] -> [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] ->
length(Batch); length(Batch);
_ -> _ ->
0 0
@ -1130,7 +1277,13 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) ->
end end
), ),
_NumAffected = ets:select_replace(InflightTID, MatchSpec), _NumAffected = ets:select_replace(InflightTID, MatchSpec),
?tp(resource_worker_worker_down_update, #{num_affected => _NumAffected}), ?tp(buffer_worker_worker_down_update, #{num_affected => _NumAffected}),
ok.
%% used to update a batch after dropping expired individual queries.
update_inflight_item(InflightTID, Ref, NewBatch) ->
_ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}),
?tp(buffer_worker_worker_update_inflight_item, #{ref => Ref}),
ok. ok.
%%============================================================================== %%==============================================================================
@ -1180,11 +1333,14 @@ clear_disk_queue_dir(Id, Index) ->
Res Res
end. end.
ensure_flush_timer(Data = #{tref := undefined, batch_time := T}) -> ensure_flush_timer(Data = #{batch_time := T}) ->
ensure_flush_timer(Data, T).
ensure_flush_timer(Data = #{tref := undefined}, T) ->
Ref = make_ref(), Ref = make_ref(),
TRef = erlang:send_after(T, self(), {flush, Ref}), TRef = erlang:send_after(T, self(), {flush, Ref}),
Data#{tref => {TRef, Ref}}; Data#{tref => {TRef, Ref}};
ensure_flush_timer(Data) -> ensure_flush_timer(Data, _T) ->
Data. Data.
cancel_flush_timer(St = #{tref := undefined}) -> cancel_flush_timer(St = #{tref := undefined}) ->
@ -1195,7 +1351,7 @@ cancel_flush_timer(St = #{tref := {TRef, _Ref}}) ->
-spec make_message_ref() -> inflight_key(). -spec make_message_ref() -> inflight_key().
make_message_ref() -> make_message_ref() ->
erlang:monotonic_time(nanosecond). now_().
collect_requests(Acc, Limit) -> collect_requests(Acc, Limit) ->
Count = length(Acc), Count = length(Acc),
@ -1213,9 +1369,9 @@ do_collect_requests(Acc, Count, Limit) ->
mark_as_sent(Batch) when is_list(Batch) -> mark_as_sent(Batch) when is_list(Batch) ->
lists:map(fun mark_as_sent/1, Batch); lists:map(fun mark_as_sent/1, Batch);
mark_as_sent(?QUERY(From, Req, _)) -> mark_as_sent(?QUERY(From, Req, _HasBeenSent, ExpireAt)) ->
HasBeenSent = true, HasBeenSent = true,
?QUERY(From, Req, HasBeenSent). ?QUERY(From, Req, HasBeenSent, ExpireAt).
is_unrecoverable_error({error, {unrecoverable_error, _}}) -> is_unrecoverable_error({error, {unrecoverable_error, _}}) ->
true; true;
@ -1235,3 +1391,49 @@ is_async_return({async_return, _}) ->
true; true;
is_async_return(_) -> is_async_return(_) ->
false. false.
sieve_expired_requests(Batch, Now) ->
{Expired, NotExpired} =
lists:partition(
fun(?QUERY(_From, _CoreReq, _HasBeenSent, ExpireAt)) ->
is_expired(ExpireAt, Now)
end,
Batch
),
case {NotExpired, Expired} of
{[], []} ->
%% Should be impossible for batch_size >= 1.
all_expired;
{[], [_ | _]} ->
all_expired;
{[_ | _], _} ->
{NotExpired, Expired}
end.
-spec is_expired(infinity | integer(), integer()) -> boolean().
is_expired(infinity = _ExpireAt, _Now) ->
false;
is_expired(ExpireAt, Now) ->
Now > ExpireAt.
now_() ->
erlang:monotonic_time(nanosecond).
-spec ensure_timeout_query_opts(query_opts(), sync | async) -> query_opts().
ensure_timeout_query_opts(#{timeout := _} = Opts, _SyncOrAsync) ->
Opts;
ensure_timeout_query_opts(#{} = Opts0, sync) ->
TimeoutMS = timer:seconds(15),
Opts0#{timeout => TimeoutMS};
ensure_timeout_query_opts(#{} = Opts0, async) ->
Opts0#{timeout => infinity}.
-spec ensure_expire_at(query_opts()) -> query_opts().
ensure_expire_at(#{expire_at := _} = Opts) ->
Opts;
ensure_expire_at(#{timeout := infinity} = Opts) ->
Opts#{expire_at => infinity};
ensure_expire_at(#{timeout := TimeoutMS} = Opts) ->
TimeoutNS = erlang:convert_time_unit(TimeoutMS, millisecond, nanosecond),
ExpireAt = now_() + TimeoutNS,
Opts#{expire_at => ExpireAt}.

View File

@ -13,7 +13,7 @@
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_resource_worker_sup). -module(emqx_resource_buffer_worker_sup).
-behaviour(supervisor). -behaviour(supervisor).
%%%============================================================================= %%%=============================================================================
@ -99,7 +99,7 @@ ensure_worker_added(ResId, Idx) ->
-define(CHILD_ID(MOD, RESID, INDEX), {MOD, RESID, INDEX}). -define(CHILD_ID(MOD, RESID, INDEX), {MOD, RESID, INDEX}).
ensure_worker_started(ResId, Idx, Opts) -> ensure_worker_started(ResId, Idx, Opts) ->
Mod = emqx_resource_worker, Mod = emqx_resource_buffer_worker,
Spec = #{ Spec = #{
id => ?CHILD_ID(Mod, ResId, Idx), id => ?CHILD_ID(Mod, ResId, Idx),
start => {Mod, start_link, [ResId, Idx, Opts]}, start => {Mod, start_link, [ResId, Idx, Opts]},
@ -116,7 +116,7 @@ ensure_worker_started(ResId, Idx, Opts) ->
end. end.
ensure_worker_removed(ResId, Idx) -> ensure_worker_removed(ResId, Idx) ->
ChildId = ?CHILD_ID(emqx_resource_worker, ResId, Idx), ChildId = ?CHILD_ID(emqx_resource_buffer_worker, ResId, Idx),
case supervisor:terminate_child(?SERVER, ChildId) of case supervisor:terminate_child(?SERVER, ChildId) of
ok -> ok ->
Res = supervisor:delete_child(?SERVER, ChildId), Res = supervisor:delete_child(?SERVER, ChildId),
@ -129,7 +129,7 @@ ensure_worker_removed(ResId, Idx) ->
end. end.
ensure_disk_queue_dir_absent(ResourceId, Index) -> ensure_disk_queue_dir_absent(ResourceId, Index) ->
ok = emqx_resource_worker:clear_disk_queue_dir(ResourceId, Index), ok = emqx_resource_buffer_worker:clear_disk_queue_dir(ResourceId, Index),
ok. ok.
ensure_worker_pool_removed(ResId) -> ensure_worker_pool_removed(ResId) ->

View File

@ -129,8 +129,10 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
'retried.success', 'retried.success',
'retried.failed', 'retried.failed',
'success', 'success',
'late_reply',
'failed', 'failed',
'dropped', 'dropped',
'dropped.expired',
'dropped.queue_full', 'dropped.queue_full',
'dropped.resource_not_found', 'dropped.resource_not_found',
'dropped.resource_stopped', 'dropped.resource_stopped',
@ -145,7 +147,7 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
%% buffer, so there is no need for resource workers %% buffer, so there is no need for resource workers
ok; ok;
false -> false ->
ok = emqx_resource_worker_sup:start_workers(ResId, Opts), ok = emqx_resource_buffer_worker_sup:start_workers(ResId, Opts),
case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
true -> true ->
wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT)); wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT));
@ -468,7 +470,7 @@ retry_actions(Data) ->
handle_remove_event(From, ClearMetrics, Data) -> handle_remove_event(From, ClearMetrics, Data) ->
stop_resource(Data), stop_resource(Data),
ok = emqx_resource_worker_sup:stop_workers(Data#data.id, Data#data.opts), ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts),
case ClearMetrics of case ClearMetrics of
true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id);
false -> ok false -> ok
@ -582,9 +584,9 @@ maybe_alarm(_Status, ResId) ->
maybe_resume_resource_workers(connected) -> maybe_resume_resource_workers(connected) ->
lists:foreach( lists:foreach(
fun({_, Pid, _, _}) -> fun({_, Pid, _, _}) ->
emqx_resource_worker:resume(Pid) emqx_resource_buffer_worker:resume(Pid)
end, end,
supervisor:which_children(emqx_resource_worker_sup) supervisor:which_children(emqx_resource_buffer_worker_sup)
); );
maybe_resume_resource_workers(_) -> maybe_resume_resource_workers(_) ->
ok. ok.

View File

@ -34,6 +34,9 @@
dropped_other_inc/1, dropped_other_inc/1,
dropped_other_inc/2, dropped_other_inc/2,
dropped_other_get/1, dropped_other_get/1,
dropped_expired_inc/1,
dropped_expired_inc/2,
dropped_expired_get/1,
dropped_queue_full_inc/1, dropped_queue_full_inc/1,
dropped_queue_full_inc/2, dropped_queue_full_inc/2,
dropped_queue_full_get/1, dropped_queue_full_get/1,
@ -46,6 +49,9 @@
failed_inc/1, failed_inc/1,
failed_inc/2, failed_inc/2,
failed_get/1, failed_get/1,
late_reply_inc/1,
late_reply_inc/2,
late_reply_get/1,
matched_inc/1, matched_inc/1,
matched_inc/2, matched_inc/2,
matched_get/1, matched_get/1,
@ -75,9 +81,11 @@ events() ->
[?TELEMETRY_PREFIX, Event] [?TELEMETRY_PREFIX, Event]
|| Event <- [ || Event <- [
dropped_other, dropped_other,
dropped_expired,
dropped_queue_full, dropped_queue_full,
dropped_resource_not_found, dropped_resource_not_found,
dropped_resource_stopped, dropped_resource_stopped,
late_reply,
failed, failed,
inflight, inflight,
matched, matched,
@ -114,6 +122,9 @@ handle_telemetry_event(
dropped_other -> dropped_other ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other', Val); emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other', Val);
dropped_expired ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.expired', Val);
dropped_queue_full -> dropped_queue_full ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full', Val); emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full', Val);
@ -123,6 +134,8 @@ handle_telemetry_event(
dropped_resource_stopped -> dropped_resource_stopped ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped', Val); emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped', Val);
late_reply ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'late_reply', Val);
failed -> failed ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val); emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val);
matched -> matched ->
@ -211,6 +224,30 @@ dropped_other_inc(ID, Val) ->
dropped_other_get(ID) -> dropped_other_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.other'). emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.other').
%% @doc Count of messages dropped due to being expired before being sent.
dropped_expired_inc(ID) ->
dropped_expired_inc(ID, 1).
dropped_expired_inc(ID, Val) ->
telemetry:execute([?TELEMETRY_PREFIX, dropped_expired], #{counter_inc => Val}, #{
resource_id => ID
}).
dropped_expired_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.expired').
%% @doc Count of messages that were sent but received a late reply.
late_reply_inc(ID) ->
late_reply_inc(ID, 1).
late_reply_inc(ID, Val) ->
telemetry:execute([?TELEMETRY_PREFIX, late_reply], #{counter_inc => Val}, #{
resource_id => ID
}).
late_reply_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'late_reply').
%% @doc Count of messages dropped because the queue was full %% @doc Count of messages dropped because the queue was full
dropped_queue_full_inc(ID) -> dropped_queue_full_inc(ID) ->
dropped_queue_full_inc(ID, 1). dropped_queue_full_inc(ID, 1).

View File

@ -39,8 +39,8 @@ init([]) ->
modules => [emqx_resource_manager_sup] modules => [emqx_resource_manager_sup]
}, },
WorkerSup = #{ WorkerSup = #{
id => emqx_resource_worker_sup, id => emqx_resource_buffer_worker_sup,
start => {emqx_resource_worker_sup, start_link, []}, start => {emqx_resource_buffer_worker_sup, start_link, []},
restart => permanent, restart => permanent,
shutdown => infinity, shutdown => infinity,
type => supervisor type => supervisor

View File

@ -260,7 +260,7 @@ counter_loop(
?tp(connector_demo_inc_counter_async, #{n => N}), ?tp(connector_demo_inc_counter_async, #{n => N}),
State#{counter => Num + N}; State#{counter => Num + N};
{big_payload, _Payload, ReplyFun} when Status == blocked -> {big_payload, _Payload, ReplyFun} when Status == blocked ->
apply_reply(ReplyFun, {error, blocked}), apply_reply(ReplyFun, {error, {recoverable_error, blocked}}),
State; State;
{{FromPid, ReqRef}, {inc, N}} when Status == running -> {{FromPid, ReqRef}, {inc, N}} when Status == running ->
%ct:pal("sync counter recv: ~p", [{inc, N}]), %ct:pal("sync counter recv: ~p", [{inc, N}]),

File diff suppressed because it is too large Load Diff

View File

@ -95,7 +95,8 @@ fields("config") ->
} }
)} )}
] ++ ] ++
emqx_connector_mysql:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields(); (emqx_connector_mysql:fields(config) --
emqx_connector_schema_lib:prepare_statement_fields());
fields("creation_opts") -> fields("creation_opts") ->
Opts = emqx_resource_schema:fields("creation_opts"), Opts = emqx_resource_schema:fields("creation_opts"),
[O || {Field, _} = O <- Opts, not is_hidden_opts(Field)]; [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)];

View File

@ -97,7 +97,8 @@ fields("config") ->
} }
)} )}
] ++ ] ++
emqx_connector_pgsql:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields(); (emqx_connector_pgsql:fields(config) --
emqx_connector_schema_lib:prepare_statement_fields());
fields("creation_opts") -> fields("creation_opts") ->
Opts = emqx_resource_schema:fields("creation_opts"), Opts = emqx_resource_schema:fields("creation_opts"),
[O || {Field, _} = O <- Opts, not is_hidden_opts(Field)]; [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)];

View File

@ -350,10 +350,12 @@ service_account_json(PrivateKeyPEM) ->
metrics_mapping() -> metrics_mapping() ->
#{ #{
dropped => fun emqx_resource_metrics:dropped_get/1, dropped => fun emqx_resource_metrics:dropped_get/1,
dropped_expired => fun emqx_resource_metrics:dropped_expired_get/1,
dropped_other => fun emqx_resource_metrics:dropped_other_get/1, dropped_other => fun emqx_resource_metrics:dropped_other_get/1,
dropped_queue_full => fun emqx_resource_metrics:dropped_queue_full_get/1, dropped_queue_full => fun emqx_resource_metrics:dropped_queue_full_get/1,
dropped_resource_not_found => fun emqx_resource_metrics:dropped_resource_not_found_get/1, dropped_resource_not_found => fun emqx_resource_metrics:dropped_resource_not_found_get/1,
dropped_resource_stopped => fun emqx_resource_metrics:dropped_resource_stopped_get/1, dropped_resource_stopped => fun emqx_resource_metrics:dropped_resource_stopped_get/1,
late_reply => fun emqx_resource_metrics:late_reply_get/1,
failed => fun emqx_resource_metrics:failed_get/1, failed => fun emqx_resource_metrics:failed_get/1,
inflight => fun emqx_resource_metrics:inflight_get/1, inflight => fun emqx_resource_metrics:inflight_get/1,
matched => fun emqx_resource_metrics:matched_get/1, matched => fun emqx_resource_metrics:matched_get/1,
@ -1117,9 +1119,6 @@ do_econnrefused_or_timeout_test(Config, Error) ->
CurrentMetrics CurrentMetrics
); );
{timeout, async} -> {timeout, async} ->
wait_telemetry_event(TelemetryTable, success, ResourceId, #{
timeout => 10_000, n_events => 2
}),
wait_until_gauge_is(inflight, 0, _Timeout = 400), wait_until_gauge_is(inflight, 0, _Timeout = 400),
wait_until_gauge_is(queuing, 0, _Timeout = 400), wait_until_gauge_is(queuing, 0, _Timeout = 400),
assert_metrics( assert_metrics(
@ -1130,13 +1129,12 @@ do_econnrefused_or_timeout_test(Config, Error) ->
matched => 2, matched => 2,
queuing => 0, queuing => 0,
retried => 0, retried => 0,
success => 2 success => 0,
late_reply => 2
}, },
ResourceId ResourceId
); );
{_, sync} -> {_, sync} ->
%% even waiting, hard to avoid flakiness... simpler to just sleep
%% a bit until stabilization.
wait_until_gauge_is(queuing, 0, 500), wait_until_gauge_is(queuing, 0, 500),
wait_until_gauge_is(inflight, 1, 500), wait_until_gauge_is(inflight, 1, 500),
assert_metrics( assert_metrics(
@ -1336,12 +1334,19 @@ t_unrecoverable_error(Config) ->
), ),
wait_until_gauge_is(queuing, 0, _Timeout = 400), wait_until_gauge_is(queuing, 0, _Timeout = 400),
wait_until_gauge_is(inflight, 1, _Timeout = 400), %% TODO: once temporary clause in
%% `emqx_resource_buffer_worker:is_unrecoverable_error'
%% that marks all unknown errors as unrecoverable is
%% removed, this inflight should be 1, because we retry if
%% the worker is killed.
wait_until_gauge_is(inflight, 0, _Timeout = 400),
assert_metrics( assert_metrics(
#{ #{
dropped => 0, dropped => 0,
failed => 0, %% FIXME: see comment above; failed should be 0
inflight => 1, %% and inflight should be 1.
failed => 1,
inflight => 0,
matched => 1, matched => 1,
queuing => 0, queuing => 0,
retried => 0, retried => 0,

View File

@ -277,6 +277,7 @@ influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) ->
" precision = ns\n" " precision = ns\n"
" write_syntax = \"~s\"\n" " write_syntax = \"~s\"\n"
" resource_opts = {\n" " resource_opts = {\n"
" request_timeout = 1s\n"
" query_mode = ~s\n" " query_mode = ~s\n"
" batch_size = ~b\n" " batch_size = ~b\n"
" }\n" " }\n"
@ -313,6 +314,7 @@ influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) ->
" precision = ns\n" " precision = ns\n"
" write_syntax = \"~s\"\n" " write_syntax = \"~s\"\n"
" resource_opts = {\n" " resource_opts = {\n"
" request_timeout = 1s\n"
" query_mode = ~s\n" " query_mode = ~s\n"
" batch_size = ~b\n" " batch_size = ~b\n"
" }\n" " }\n"
@ -906,25 +908,48 @@ t_write_failure(Config) ->
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
case QueryMode of case QueryMode of
sync -> sync ->
?assertError(timeout, send_message(Config, SentData)); {_, {ok, _}} =
?wait_async_action(
try
send_message(Config, SentData)
catch
error:timeout ->
{error, timeout}
end,
#{?snk_kind := buffer_worker_flush_nack},
1_000
);
async -> async ->
?assertEqual(ok, send_message(Config, SentData)) ?wait_async_action(
?assertEqual(ok, send_message(Config, SentData)),
#{?snk_kind := buffer_worker_reply_after_query},
1_000
)
end end
end), end),
fun(Trace0) -> fun(Trace0) ->
case QueryMode of case QueryMode of
sync -> sync ->
Trace = ?of_kind(resource_worker_flush_nack, Trace0), Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
?assertMatch([_ | _], Trace), ?assertMatch([_ | _], Trace),
[#{result := Result} | _] = Trace, [#{result := Result} | _] = Trace,
?assert( ?assert(
{error, {error, {closed, "The connection was lost."}}} =:= Result orelse {error, {error, {closed, "The connection was lost."}}} =:= Result orelse
{error, {error, closed}} =:= Result orelse {error, {error, closed}} =:= Result orelse
{error, {error, econnrefused}} =:= Result, {error, {recoverable_error, {error, econnrefused}}} =:= Result,
#{got => Result} #{got => Result}
); );
async -> async ->
ok Trace = ?of_kind(buffer_worker_reply_after_query, Trace0),
?assertMatch([#{action := nack} | _], Trace),
[#{result := Result} | _] = Trace,
?assert(
{error, {recoverable_error, {closed, "The connection was lost."}}} =:=
Result orelse
{error, {error, closed}} =:= Result orelse
{error, {recoverable_error, econnrefused}} =:= Result,
#{got => Result}
)
end, end,
ok ok
end end

View File

@ -228,7 +228,7 @@ query_resource(Config, Request) ->
Name = ?config(mysql_name, Config), Name = ?config(mysql_name, Config),
BridgeType = ?config(mysql_bridge_type, Config), BridgeType = ?config(mysql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
emqx_resource:query(ResourceID, Request). emqx_resource:query(ResourceID, Request, #{timeout => 500}).
unprepare(Config, Key) -> unprepare(Config, Key) ->
Name = ?config(mysql_name, Config), Name = ?config(mysql_name, Config),
@ -413,7 +413,7 @@ t_write_failure(Config) ->
end), end),
fun(Trace0) -> fun(Trace0) ->
ct:pal("trace: ~p", [Trace0]), ct:pal("trace: ~p", [Trace0]),
Trace = ?of_kind(resource_worker_flush_nack, Trace0), Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
?assertMatch([#{result := {error, _}} | _], Trace), ?assertMatch([#{result := {error, _}} | _], Trace),
[#{result := {error, Error}} | _] = Trace, [#{result := {error, Error}} | _] = Trace,
case Error of case Error of

View File

@ -249,7 +249,7 @@ query_resource(Config, Request) ->
Name = ?config(pgsql_name, Config), Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config), BridgeType = ?config(pgsql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
emqx_resource:query(ResourceID, Request). emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
connect_direct_pgsql(Config) -> connect_direct_pgsql(Config) ->
Opts = #{ Opts = #{
@ -422,22 +422,32 @@ t_write_failure(Config) ->
SentData = #{payload => Val, timestamp => 1668602148000}, SentData = #{payload => Val, timestamp => 1668602148000},
?check_trace( ?check_trace(
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
case QueryMode of {_, {ok, _}} =
sync -> ?wait_async_action(
?assertError(timeout, send_message(Config, SentData)); case QueryMode of
async -> sync ->
send_message(Config, SentData) try
end send_message(Config, SentData)
catch
error:timeout ->
{error, timeout}
end;
async ->
send_message(Config, SentData)
end,
#{?snk_kind := buffer_worker_flush_nack},
1_000
)
end), end),
fun(Trace0) -> fun(Trace0) ->
ct:pal("trace: ~p", [Trace0]), ct:pal("trace: ~p", [Trace0]),
Trace = ?of_kind(resource_worker_flush_nack, Trace0), Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
?assertMatch([#{result := {error, _}} | _], Trace), ?assertMatch([#{result := {error, _}} | _], Trace),
[#{result := {error, Error}} | _] = Trace, [#{result := {error, Error}} | _] = Trace,
case Error of case Error of
{resource_error, _} -> {resource_error, _} ->
ok; ok;
disconnected -> {recoverable_error, disconnected} ->
ok; ok;
_ -> _ ->
ct:fail("unexpected error: ~p", [Error]) ct:fail("unexpected error: ~p", [Error])

View File

@ -58,7 +58,6 @@
%% emqx_resource API %% emqx_resource API
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
%% TODO: check
is_buffer_supported() -> false. is_buffer_supported() -> false.
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.

View File

@ -26,6 +26,7 @@
on_batch_query_async/4, on_batch_query_async/4,
on_get_status/2 on_get_status/2
]). ]).
-export([reply_callback/2]).
-export([ -export([
namespace/0, namespace/0,
@ -353,7 +354,12 @@ do_query(InstId, Client, Points) ->
connector => InstId, connector => InstId,
reason => Reason reason => Reason
}), }),
Err case is_unrecoverable_error(Err) of
true ->
{error, {unrecoverable_error, Reason}};
false ->
{error, {recoverable_error, Reason}}
end
end. end.
do_async_query(InstId, Client, Points, ReplyFunAndArgs) -> do_async_query(InstId, Client, Points, ReplyFunAndArgs) ->
@ -362,7 +368,20 @@ do_async_query(InstId, Client, Points, ReplyFunAndArgs) ->
connector => InstId, connector => InstId,
points => Points points => Points
}), }),
{ok, _WorkerPid} = influxdb:write_async(Client, Points, ReplyFunAndArgs). WrappedReplyFunAndArgs = {fun ?MODULE:reply_callback/2, [ReplyFunAndArgs]},
{ok, _WorkerPid} = influxdb:write_async(Client, Points, WrappedReplyFunAndArgs).
reply_callback(ReplyFunAndArgs, {error, Reason} = Error) ->
case is_unrecoverable_error(Error) of
true ->
Result = {error, {unrecoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
false ->
Result = {error, {recoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
end;
reply_callback(ReplyFunAndArgs, Result) ->
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% Tags & Fields Config Trans %% Tags & Fields Config Trans
@ -583,6 +602,17 @@ str(B) when is_binary(B) ->
str(S) when is_list(S) -> str(S) when is_list(S) ->
S. S.
is_unrecoverable_error({error, {unrecoverable_error, _}}) ->
true;
is_unrecoverable_error({error, {recoverable_error, _}}) ->
false;
is_unrecoverable_error({error, {error, econnrefused}}) ->
false;
is_unrecoverable_error({error, econnrefused}) ->
false;
is_unrecoverable_error(_) ->
false.
%%=================================================================== %%===================================================================
%% eunit tests %% eunit tests
%%=================================================================== %%===================================================================