emqx/apps/emqx_resource/src/emqx_resource_buffer_worker...

2109 lines
78 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% This module implements async message sending, disk message queuing,
%% and message batching using ReplayQ.
-module(emqx_resource_buffer_worker).
-include("emqx_resource.hrl").
-include("emqx_resource_errors.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-behaviour(gen_statem).
-export([
start_link/3,
sync_query/3,
async_query/3,
block/1,
resume/1,
flush_worker/1
]).
-export([
simple_sync_query/2,
simple_sync_query/3,
simple_async_query/3,
simple_sync_internal_buffer_query/3
]).
-export([
callback_mode/0,
init/1,
terminate/2,
code_change/3
]).
-export([running/3, blocked/3]).
-export([queue_item_marshaller/1, estimate_size/1]).
-export([
handle_async_reply/2, handle_async_batch_reply/2, reply_call/2, reply_call_internal_buffer/3
]).
-export([clear_disk_queue_dir/2]).
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
-define(COLLECT_REQ_LIMIT, 1000).
-define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}).
-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}).
-define(SIMPLE_QUERY(FROM, REQUEST), ?QUERY(FROM, REQUEST, false, infinity)).
-define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}).
-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef),
{Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef}
).
-define(ITEM_IDX, 2).
-define(RETRY_IDX, 3).
-define(WORKER_MREF_IDX, 4).
-type id() :: binary().
-type index() :: pos_integer().
-type expire_at() :: infinity | integer().
-type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()).
-type request() :: term().
-type request_from() :: undefined | gen_statem:from().
-type timeout_ms() :: emqx_schema:timeout_duration_ms().
-type request_ttl() :: emqx_schema:timeout_duration_ms().
-type health_check_interval() :: pos_integer().
-type state() :: blocked | running.
-type inflight_key() :: integer().
-type counters() :: #{
dropped_expired => non_neg_integer(),
dropped_queue_full => non_neg_integer(),
dropped_resource_not_found => non_neg_integer(),
dropped_resource_stopped => non_neg_integer(),
success => non_neg_integer(),
failed => non_neg_integer(),
retried_success => non_neg_integer(),
retried_failed => non_neg_integer()
}.
-type inflight_table() :: ets:tid() | atom() | reference().
-type data() :: #{
id := id(),
index := index(),
inflight_tid := inflight_table(),
async_workers := #{pid() => reference()},
batch_size := pos_integer(),
batch_time := timeout_ms(),
counters := counters(),
metrics_flush_interval := timeout_ms(),
queue := replayq:q(),
resume_interval := timeout_ms(),
tref := undefined | {reference(), reference()},
metrics_tref := undefined | {reference(), reference()}
}.
callback_mode() -> [state_functions, state_enter].
start_link(Id, Index, Opts) ->
gen_statem:start_link(?MODULE, {Id, Index, Opts}, []).
-spec sync_query(id(), request(), query_opts()) -> Result :: term().
sync_query(Id, Request, Opts0) ->
?tp(sync_query, #{id => Id, request => Request, query_opts => Opts0}),
Opts1 = ensure_timeout_query_opts(Opts0, sync),
Opts = ensure_expire_at(Opts1),
PickKey = maps:get(pick_key, Opts, self()),
Timeout = maps:get(timeout, Opts),
emqx_resource_metrics:matched_inc(Id),
pick_call(Id, PickKey, {query, Request, Opts}, Timeout).
-spec async_query(id(), request(), query_opts()) -> Result :: term().
async_query(Id, Request, Opts0) ->
?tp(async_query, #{id => Id, request => Request, query_opts => Opts0}),
Opts1 = ensure_timeout_query_opts(Opts0, async),
Opts = ensure_expire_at(Opts1),
PickKey = maps:get(pick_key, Opts, self()),
emqx_resource_metrics:matched_inc(Id),
pick_cast(Id, PickKey, {query, Request, Opts}).
%% simple query the resource without batching and queuing.
-spec simple_sync_query(id(), request()) -> term().
simple_sync_query(Id, Request) ->
simple_sync_query(Id, Request, #{}).
-spec simple_sync_query(id(), request(), query_opts()) -> term().
simple_sync_query(Id, Request, QueryOpts0) ->
%% Note: since calling this function implies in bypassing the
%% buffer workers, and each buffer worker index is used when
%% collecting gauge metrics, we use this dummy index. If this
%% call ends up calling buffering functions, that's a bug and
%% would mess up the metrics anyway. `undefined' is ignored by
%% `emqx_resource_metrics:*_shift/3'.
?tp(simple_sync_query, #{id => Id, request => Request}),
Index = undefined,
QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
emqx_resource_metrics:matched_inc(Id),
Ref = make_request_ref(),
ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
Result = call_query(force_sync, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts),
_ = handle_query_result(Id, Result, _HasBeenSent = false),
Result.
%% simple async-query the resource without batching and queuing.
-spec simple_async_query(id(), request(), query_opts()) -> term().
simple_async_query(Id, Request, QueryOpts0) ->
?tp(simple_async_query, #{id => Id, request => Request, query_opts => QueryOpts0}),
Index = undefined,
QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
emqx_resource_metrics:matched_inc(Id),
Ref = make_request_ref(),
ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
Result = call_query(
async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts
),
_ = handle_query_result(Id, Result, _HasBeenSent = false),
Result.
%% This is a hack to handle cases where the underlying connector has internal buffering
%% (e.g.: Kafka and Pulsar producers). Since the message may be inernally retried at a
%% later time, we can't bump metrics immediatelly if the return value is not a success
%% (e.g.: if the call timed out, but the message was enqueued nevertheless).
-spec simple_sync_internal_buffer_query(id(), request(), query_opts()) -> term().
simple_sync_internal_buffer_query(Id, Request, QueryOpts0) ->
?tp(simple_sync_internal_buffer_query, #{id => Id, request => Request, query_opts => QueryOpts0}),
ReplyAlias = alias([reply]),
try
MaybeReplyTo = maps:get(reply_to, QueryOpts0, undefined),
QueryOpts1 = QueryOpts0#{
reply_to => {fun ?MODULE:reply_call_internal_buffer/3, [ReplyAlias, MaybeReplyTo]}
},
QueryOpts = #{timeout := Timeout} = maps:merge(simple_query_opts(), QueryOpts1),
case simple_async_query(Id, Request, QueryOpts) of
{error, _} = Error ->
Error;
{async_return, {error, _} = Error} ->
Error;
{async_return, {ok, _Pid}} ->
receive
{ReplyAlias, Response} ->
Response
after Timeout ->
_ = unalias(ReplyAlias),
receive
{ReplyAlias, Response} ->
Response
after 0 -> {error, timeout}
end
end
end
after
_ = unalias(ReplyAlias)
end.
simple_query_opts() ->
ensure_expire_at(#{simple_query => true, timeout => infinity}).
-spec block(pid()) -> ok.
block(ServerRef) ->
gen_statem:cast(ServerRef, block).
-spec resume(pid()) -> ok.
resume(ServerRef) ->
gen_statem:cast(ServerRef, resume).
-spec flush_worker(pid()) -> ok.
flush_worker(ServerRef) ->
gen_statem:cast(ServerRef, flush).
-spec init({id(), pos_integer(), map()}) -> gen_statem:init_result(state(), data()).
init({Id, Index, Opts}) ->
process_flag(trap_exit, true),
true = gproc_pool:connect_worker(Id, {Id, Index}),
BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
QueueOpts = replayq_opts(Id, Index, Opts),
Queue = replayq:open(QueueOpts),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)),
emqx_resource_metrics:inflight_set(Id, Index, 0),
InflightWinSize = maps:get(inflight_window, Opts, ?DEFAULT_INFLIGHT),
InflightTID = inflight_new(InflightWinSize),
HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
RequestTTL = maps:get(request_ttl, Opts, ?DEFAULT_REQUEST_TTL),
BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
BatchTime = adjust_batch_time(Id, RequestTTL, BatchTime0),
DefaultResumeInterval = default_resume_interval(RequestTTL, HealthCheckInterval),
ResumeInterval = maps:get(resume_interval, Opts, DefaultResumeInterval),
MetricsFlushInterval = maps:get(metrics_flush_interval, Opts, ?DEFAULT_METRICS_FLUSH_INTERVAL),
Data0 = #{
id => Id,
index => Index,
inflight_tid => InflightTID,
async_workers => #{},
batch_size => BatchSize,
batch_time => BatchTime,
counters => #{},
metrics_flush_interval => MetricsFlushInterval,
queue => Queue,
resume_interval => ResumeInterval,
tref => undefined,
metrics_tref => undefined
},
Data = ensure_metrics_flush_timer(Data0),
?tp(buffer_worker_init, #{id => Id, index => Index, queue_opts => QueueOpts}),
{ok, running, Data}.
running(enter, _, #{tref := _Tref} = Data) ->
?tp(buffer_worker_enter_running, #{id => maps:get(id, Data), tref => _Tref}),
%% According to `gen_statem' laws, we mustn't call `maybe_flush'
%% directly because it may decide to return `{next_state, blocked, _}',
%% and that's an invalid response for a state enter call.
%% Returning a next event from a state enter call is also
%% prohibited.
{keep_state, ensure_flush_timer(Data, 0)};
running(cast, resume, _St) ->
keep_state_and_data;
running(cast, flush, Data) ->
flush(Data);
running(cast, block, St) ->
{next_state, blocked, St};
running(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data) ->
handle_query_requests(Request0, Data);
running(info, {flush, Ref}, Data = #{tref := {_TRef, Ref}}) ->
flush(Data#{tref := undefined});
running(info, {flush, _Ref}, _Data) ->
?tp(discarded_stale_flush, #{}),
keep_state_and_data;
running(info, {flush_metrics, Ref}, Data0 = #{metrics_tref := {_TRef, Ref}}) ->
Data = flush_metrics(Data0#{metrics_tref := undefined}),
{keep_state, Data};
running(info, {flush_metrics, _Ref}, _Data) ->
keep_state_and_data;
running(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
is_map_key(Pid, AsyncWorkers0)
->
?SLOG(info, #{msg => "async_worker_died", state => running, reason => Reason}),
handle_async_worker_down(Data0, Pid);
running(info, Info, _St) ->
?SLOG(error, #{msg => "unexpected_msg", state => running, info => Info}),
keep_state_and_data.
blocked(enter, _, #{resume_interval := ResumeT} = St0) ->
?tp(buffer_worker_enter_blocked, #{buffer_worker => self()}),
%% discard the old timer, new timer will be started when entering running state again
St = cancel_flush_timer(St0),
{keep_state, St, {state_timeout, ResumeT, unblock}};
blocked(cast, block, _St) ->
keep_state_and_data;
blocked(cast, resume, St) ->
resume_from_blocked(St);
blocked(cast, flush, St) ->
resume_from_blocked(St);
blocked(state_timeout, unblock, St) ->
resume_from_blocked(St);
blocked(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data0) ->
Data = collect_and_enqueue_query_requests(Request0, Data0),
{keep_state, Data};
blocked(info, {flush, _Ref}, _Data) ->
%% ignore stale timer
keep_state_and_data;
blocked(info, {flush_metrics, Ref}, Data0 = #{metrics_tref := {_TRef, Ref}}) ->
Data = flush_metrics(Data0#{metrics_tref := undefined}),
{keep_state, Data};
blocked(info, {flush_metrics, _Ref}, _Data) ->
keep_state_and_data;
blocked(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
is_map_key(Pid, AsyncWorkers0)
->
?SLOG(info, #{msg => "async_worker_died", state => blocked, reason => Reason}),
handle_async_worker_down(Data0, Pid);
blocked(info, Info, _Data) ->
?SLOG(error, #{msg => "unexpected_msg", state => blocked, info => Info}),
keep_state_and_data.
terminate(_Reason, #{id := Id, index := Index, queue := Q}) ->
_ = replayq:close(Q),
emqx_resource_metrics:inflight_set(Id, Index, 0),
%% since we want volatile queues, this will be 0 after
%% termination.
emqx_resource_metrics:queuing_set(Id, Index, 0),
gproc_pool:disconnect_worker(Id, {Id, Index}),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%==============================================================================
-define(PICK(ID, KEY, PID, EXPR),
try
case gproc_pool:pick_worker(ID, KEY) of
PID when is_pid(PID) ->
EXPR;
_ ->
?RESOURCE_ERROR(worker_not_created, "resource not created")
end
catch
error:badarg ->
?RESOURCE_ERROR(worker_not_created, "resource not created");
error:timeout ->
?RESOURCE_ERROR(timeout, "call resource timeout")
end
).
pick_call(Id, Key, Query = {_, _, QueryOpts}, Timeout) ->
?PICK(Id, Key, Pid, begin
MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
ReplyTo = {fun ?MODULE:reply_call/2, [MRef]},
erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)),
receive
{MRef, Response} ->
erlang:demonitor(MRef, [flush]),
maybe_reply_to(Response, QueryOpts);
{'DOWN', MRef, process, Pid, Reason} ->
error({worker_down, Reason})
after Timeout ->
erlang:demonitor(MRef, [flush]),
receive
{MRef, Response} ->
maybe_reply_to(Response, QueryOpts)
after 0 ->
error(timeout)
end
end
end).
pick_cast(Id, Key, Query = {query, _Request, QueryOpts}) ->
?PICK(Id, Key, Pid, begin
ReplyTo = maps:get(reply_to, QueryOpts, undefined),
erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)),
ok
end).
resume_from_blocked(Data) ->
?tp(buffer_worker_resume_from_blocked_enter, #{}),
#{inflight_tid := InflightTID} = Data,
Now = now_(),
case inflight_get_first_retriable(InflightTID, Now) of
none ->
case is_inflight_full(InflightTID) of
true ->
{keep_state, Data};
false ->
{next_state, running, Data}
end;
{expired, Ref, Batch} ->
BufferWorkerPid = self(),
IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid),
Counters =
case IsAcked of
true -> #{dropped_expired => length(Batch)};
false -> #{}
end,
batch_reply_dropped(Batch, {error, request_expired}),
NData = aggregate_counters(Data, Counters),
?tp(buffer_worker_retry_expired, #{expired => Batch}),
resume_from_blocked(NData);
{single, Ref, Query} ->
%% We retry msgs in inflight window sync, as if we send them
%% async, they will be appended to the end of inflight window again.
retry_inflight_sync(Ref, Query, Data);
{batch, Ref, NotExpired, []} ->
retry_inflight_sync(Ref, NotExpired, Data);
{batch, Ref, NotExpired, Expired} ->
NumExpired = length(Expired),
ok = update_inflight_item(InflightTID, Ref, NotExpired, NumExpired),
batch_reply_dropped(Expired, {error, request_expired}),
NData = aggregate_counters(Data, #{dropped_expired => NumExpired}),
?tp(buffer_worker_retry_expired, #{expired => Expired}),
%% We retry msgs in inflight window sync, as if we send them
%% async, they will be appended to the end of inflight window again.
retry_inflight_sync(Ref, NotExpired, NData)
end.
retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
#{
id := Id,
inflight_tid := InflightTID,
index := Index,
resume_interval := ResumeT
} = Data0,
?tp(buffer_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}),
QueryOpts = #{simple_query => false},
Result = call_query(force_sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
{ShouldAck, PostFn, DeltaCounters} =
case QueryOrBatch of
?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) ->
Reply = ?REPLY(ReplyTo, HasBeenSent, Result),
reply_caller_defer_metrics(Id, Reply, QueryOpts);
[?QUERY(_, _, _, _) | _] = Batch ->
batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts)
end,
Data1 = aggregate_counters(Data0, DeltaCounters),
case ShouldAck of
%% Send failed because resource is down
nack ->
PostFn(),
?tp(
buffer_worker_retry_inflight_failed,
#{
ref => Ref,
query_or_batch => QueryOrBatch,
result => Result
}
),
{keep_state, Data1, {state_timeout, ResumeT, unblock}};
%% Send ok or failed but the resource is working
ack ->
BufferWorkerPid = self(),
IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid),
%% we need to defer bumping the counters after
%% `inflight_drop' to avoid the race condition when an
%% inflight request might get completed concurrently with
%% the retry, bumping them twice. Since both inflight
%% requests (repeated and original) have the safe `Ref',
%% we bump the counter when removing it from the table.
IsAcked andalso PostFn(),
?tp(
buffer_worker_retry_inflight_succeeded,
#{
ref => Ref,
query_or_batch => QueryOrBatch
}
),
resume_from_blocked(Data1)
end.
%% Called during the `running' state only.
-spec handle_query_requests(?SEND_REQ(request_from(), request()), data()) ->
gen_statem:event_handler_result(state(), data()).
handle_query_requests(Request0, Data0) ->
Data = collect_and_enqueue_query_requests(Request0, Data0),
maybe_flush(Data).
collect_and_enqueue_query_requests(Request0, Data0) ->
#{
id := Id,
index := Index,
queue := Q
} = Data0,
Requests = collect_requests([Request0], ?COLLECT_REQ_LIMIT),
Queries =
lists:map(
fun
(?SEND_REQ(undefined = _ReplyTo, {query, Req, Opts})) ->
ReplyFun = maps:get(async_reply_fun, Opts, undefined),
HasBeenSent = false,
ExpireAt = maps:get(expire_at, Opts),
?QUERY(ReplyFun, Req, HasBeenSent, ExpireAt);
(?SEND_REQ(ReplyTo, {query, Req, Opts})) ->
HasBeenSent = false,
ExpireAt = maps:get(expire_at, Opts),
?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt)
end,
Requests
),
{Overflown, NewQ, DeltaCounters} = append_queue(Id, Index, Q, Queries),
ok = reply_overflown(Overflown),
aggregate_counters(Data0#{queue := NewQ}, DeltaCounters).
reply_overflown([]) ->
ok;
reply_overflown([?QUERY(ReplyTo, _Req, _HasBeenSent, _ExpireAt) | More]) ->
do_reply_caller(ReplyTo, {error, buffer_overflow}),
reply_overflown(More).
do_reply_caller(undefined, _Result) ->
ok;
do_reply_caller({F, Args}, {async_return, Result}) ->
%% this is an early return to async caller, the retry
%% decision has to be made by the caller
do_reply_caller({F, Args}, Result);
do_reply_caller({F, Args}, Result) when is_function(F) ->
_ = erlang:apply(F, Args ++ [Result]),
ok;
do_reply_caller({F, Args, _Context}, Result) when is_function(F) ->
_ = erlang:apply(F, Args ++ [Result]),
ok.
maybe_flush(Data0) ->
#{
batch_size := BatchSize,
queue := Q
} = Data0,
QueueCount = queue_count(Q),
case QueueCount >= BatchSize of
true ->
flush(Data0);
false ->
{keep_state, ensure_flush_timer(Data0)}
end.
%% Called during the `running' state only.
-spec flush(data()) -> gen_statem:event_handler_result(state(), data()).
flush(Data0) ->
#{
batch_size := BatchSize,
inflight_tid := InflightTID,
queue := Q0
} = Data0,
Data1 = cancel_flush_timer(Data0),
CurrentCount = queue_count(Q0),
IsFull = is_inflight_full(InflightTID),
?tp_ignore_side_effects_in_prod(buffer_worker_flush, #{
queued => CurrentCount,
is_inflight_full => IsFull,
inflight => inflight_count(InflightTID)
}),
case {CurrentCount, IsFull} of
{0, _} ->
?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{
inflight => inflight_count(InflightTID)
}),
{keep_state, Data1};
{_, true} ->
?tp(buffer_worker_flush_but_inflight_full, #{}),
{keep_state, Data1};
{_, false} ->
?tp(buffer_worker_flush_before_pop, #{}),
{Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}),
Data2 = Data1#{queue := Q1},
?tp(buffer_worker_flush_before_sieve_expired, #{}),
Now = now_(),
%% if the request has expired, the caller is no longer
%% waiting for a response.
case sieve_expired_requests(Batch, Now) of
{[], _AllExpired} ->
ok = replayq:ack(Q1, QAckRef),
NumExpired = length(Batch),
batch_reply_dropped(Batch, {error, request_expired}),
Data3 = aggregate_counters(Data2, #{dropped_expired => NumExpired}),
?tp(buffer_worker_flush_all_expired, #{batch => Batch}),
flush(Data3);
{NotExpired, Expired} ->
NumExpired = length(Expired),
batch_reply_dropped(Expired, {error, request_expired}),
Data3 = aggregate_counters(Data2, #{dropped_expired => NumExpired}),
IsBatch = (BatchSize > 1),
%% We *must* use the new queue, because we currently can't
%% `nack' a `pop'.
%% Maybe we could re-open the queue?
?tp(
buffer_worker_flush_potentially_partial,
#{expired => Expired, not_expired => NotExpired}
),
Ref = make_request_ref(),
do_flush(Data3, #{
is_batch => IsBatch,
batch => NotExpired,
ref => Ref,
ack_ref => QAckRef
})
end
end.
-spec do_flush(data(), #{
is_batch := boolean(),
batch := [queue_query()],
ack_ref := replayq:ack_ref(),
ref := inflight_key()
}) ->
gen_statem:event_handler_result(state(), data()).
do_flush(
#{queue := Q1} = Data0,
#{
is_batch := false,
batch := Batch,
ref := Ref,
ack_ref := QAckRef
}
) ->
#{
id := Id,
index := Index,
inflight_tid := InflightTID
} = Data0,
%% unwrap when not batching (i.e., batch size == 1)
[?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) = Request] = Batch,
QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
Result = call_query(async_if_possible, Id, Index, Ref, Request, QueryOpts),
Reply = ?REPLY(ReplyTo, HasBeenSent, Result),
{ShouldAck, DeltaCounters} = reply_caller(Id, Reply, QueryOpts),
Data1 = aggregate_counters(Data0, DeltaCounters),
case ShouldAck of
%% Failed; remove the request from the queue, as we cannot pop
%% from it again, but we'll retry it using the inflight table.
nack ->
ok = replayq:ack(Q1, QAckRef),
%% we set it atomically just below; a limitation of having
%% to use tuples for atomic ets updates
IsRetriable = true,
AsyncWorkerMRef0 = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Request, IsRetriable, AsyncWorkerMRef0),
%% we must append again to the table to ensure that the
%% request will be retried (i.e., it might not have been
%% inserted during `call_query' if the resource was down
%% and/or if it was a sync request).
inflight_append(InflightTID, InflightItem),
mark_inflight_as_retriable(InflightTID, Ref),
{Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result),
store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef),
?tp(
buffer_worker_flush_nack,
#{
ref => Ref,
is_retriable => IsRetriable,
batch_or_query => Request,
result => Result
}
),
{next_state, blocked, Data2};
%% Success; just ack.
ack ->
ok = replayq:ack(Q1, QAckRef),
%% Async requests are acked later when the async worker
%% calls the corresponding callback function. Also, we
%% must ensure the async worker is being monitored for
%% such requests.
IsUnrecoverableError = is_unrecoverable_error(Result),
BufferWorkerPid = self(),
case is_async_return(Result) of
true when IsUnrecoverableError ->
ack_inflight(InflightTID, Ref, BufferWorkerPid);
true ->
ok;
false ->
ack_inflight(InflightTID, Ref, BufferWorkerPid)
end,
{Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result),
store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef),
?tp(
buffer_worker_flush_ack,
#{
batch_or_query => Request,
result => Result
}
),
CurrentCount = queue_count(Q1),
case CurrentCount > 0 of
true ->
?tp(buffer_worker_flush_ack_reflush, #{
batch_or_query => Request, result => Result, queue_count => CurrentCount
}),
flush_worker(self());
false ->
?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{
inflight => inflight_count(InflightTID)
}),
ok
end,
{keep_state, Data2}
end;
do_flush(#{queue := Q1} = Data0, #{
is_batch := true,
batch := Batch,
ref := Ref,
ack_ref := QAckRef
}) ->
#{
id := Id,
index := Index,
batch_size := BatchSize,
inflight_tid := InflightTID
} = Data0,
QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
Result = call_query(async_if_possible, Id, Index, Ref, Batch, QueryOpts),
{ShouldAck, DeltaCounters} = batch_reply_caller(Id, Result, Batch, QueryOpts),
Data1 = aggregate_counters(Data0, DeltaCounters),
case ShouldAck of
%% Failed; remove the request from the queue, as we cannot pop
%% from it again, but we'll retry it using the inflight table.
nack ->
ok = replayq:ack(Q1, QAckRef),
%% we set it atomically just below; a limitation of having
%% to use tuples for atomic ets updates
IsRetriable = true,
AsyncWorkerMRef0 = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef0),
%% we must append again to the table to ensure that the
%% request will be retried (i.e., it might not have been
%% inserted during `call_query' if the resource was down
%% and/or if it was a sync request).
inflight_append(InflightTID, InflightItem),
mark_inflight_as_retriable(InflightTID, Ref),
{Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result),
store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef),
?tp(
buffer_worker_flush_nack,
#{
ref => Ref,
is_retriable => IsRetriable,
batch_or_query => Batch,
result => Result
}
),
{next_state, blocked, Data2};
%% Success; just ack.
ack ->
ok = replayq:ack(Q1, QAckRef),
%% Async requests are acked later when the async worker
%% calls the corresponding callback function. Also, we
%% must ensure the async worker is being monitored for
%% such requests.
IsUnrecoverableError = is_unrecoverable_error(Result),
BufferWorkerPid = self(),
case is_async_return(Result) of
true when IsUnrecoverableError ->
ack_inflight(InflightTID, Ref, BufferWorkerPid);
true ->
ok;
false ->
ack_inflight(InflightTID, Ref, BufferWorkerPid)
end,
{Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result),
store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef),
CurrentCount = queue_count(Q1),
?tp(
buffer_worker_flush_ack,
#{
batch_or_query => Batch,
result => Result,
queue_count => CurrentCount
}
),
Data3 =
case {CurrentCount > 0, CurrentCount >= BatchSize} of
{false, _} ->
?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{
inflight => inflight_count(InflightTID)
}),
Data2;
{true, true} ->
?tp(buffer_worker_flush_ack_reflush, #{
batch_or_query => Batch,
result => Result,
queue_count => CurrentCount,
batch_size => BatchSize
}),
flush_worker(self()),
Data2;
{true, false} ->
ensure_flush_timer(Data2)
end,
{keep_state, Data3}
end.
batch_reply_caller(Id, BatchResult, Batch, QueryOpts) ->
{ShouldBlock, PostFn, DeltaCounters} =
batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts),
PostFn(),
{ShouldBlock, DeltaCounters}.
batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
%% the `Mod:on_batch_query/3` returns a single result for a batch,
%% so we need to expand
Replies = lists:map(
fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT)) ->
?REPLY(FROM, SENT, BatchResult)
end,
Batch
),
{ShouldAck, PostFns, Counters} =
lists:foldl(
fun(Reply, {_ShouldAck, PostFns, OldCounters}) ->
%% _ShouldAck should be the same as ShouldAck starting from the second reply
{ShouldAck, PostFn, DeltaCounters} = reply_caller_defer_metrics(
Id, Reply, QueryOpts
),
{ShouldAck, [PostFn | PostFns], merge_counters(OldCounters, DeltaCounters)}
end,
{ack, [], #{}},
Replies
),
PostFn = fun() -> lists:foreach(fun(F) -> F() end, lists:reverse(PostFns)) end,
{ShouldAck, PostFn, Counters}.
reply_caller(Id, Reply, QueryOpts) ->
{ShouldAck, PostFn, DeltaCounters} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
PostFn(),
{ShouldAck, DeltaCounters}.
%% Should only reply to the caller when the decision is final (not
%% retriable). See comment on `handle_query_result_pure'.
reply_caller_defer_metrics(Id, ?REPLY(undefined, HasBeenSent, Result), _QueryOpts) ->
handle_query_result_pure(Id, Result, HasBeenSent);
reply_caller_defer_metrics(Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts) ->
IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
IsUnrecoverableError = is_unrecoverable_error(Result),
{ShouldAck, PostFn, DeltaCounters} = handle_query_result_pure(Id, Result, HasBeenSent),
case {ShouldAck, Result, IsUnrecoverableError, IsSimpleQuery} of
{ack, {async_return, _}, true, _} ->
ok = do_reply_caller(ReplyTo, Result);
{ack, {async_return, _}, false, _} ->
ok;
{_, _, _, true} ->
ok = do_reply_caller(ReplyTo, Result);
{nack, _, _, _} ->
ok;
{ack, _, _, _} ->
ok = do_reply_caller(ReplyTo, Result)
end,
{ShouldAck, PostFn, DeltaCounters}.
%% This is basically used only by rule actions. To avoid rule action metrics from
%% becoming inconsistent when we drop messages, we need a way to signal rule engine that
%% this action has reached a conclusion.
-spec reply_dropped(reply_fun(), {error, late_reply | request_expired}) -> ok.
reply_dropped(_ReplyTo = {Fn, Args, #{reply_dropped := true}}, Result) when
is_function(Fn), is_list(Args)
->
%% We want to avoid bumping metrics inside the buffer worker, since it's costly.
emqx_pool:async_submit(Fn, Args ++ [Result]),
ok;
reply_dropped(_ReplyTo, _Result) ->
ok.
-spec batch_reply_dropped([queue_query()], {error, late_reply | request_expired}) -> ok.
batch_reply_dropped(Batch, Result) ->
lists:foreach(
fun(?QUERY(ReplyTo, _CoreReq, _HasBeenSent, _ExpireAt)) ->
reply_dropped(ReplyTo, Result)
end,
Batch
).
%% This is only called by `simple_{,a}sync_query', so we can bump the
%% counters here.
handle_query_result(Id, Result, HasBeenSent) ->
{ShouldBlock, PostFn, DeltaCounters} = handle_query_result_pure(Id, Result, HasBeenSent),
PostFn(),
bump_counters(Id, DeltaCounters),
ShouldBlock.
%% We should always retry (nack), except when:
%% * resource is not found
%% * resource is stopped
%% * the result is a success (or at least a delayed result)
%% We also retry even sync requests. In that case, we shouldn't reply
%% the caller until one of those final results above happen.
-spec handle_query_result_pure(id(), term(), HasBeenSent :: boolean()) ->
{ack | nack, function(), counters()}.
handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(exception, Msg), _HasBeenSent) ->
PostFn = fun() ->
?SLOG(error, #{msg => "resource_exception", info => emqx_utils:redact(Msg)}),
ok
end,
{nack, PostFn, #{}};
handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent) when
NotWorking == not_connected; NotWorking == blocked
->
{nack, fun() -> ok end, #{}};
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent) ->
PostFn = fun() ->
?SLOG(error, #{id => Id, msg => "resource_not_found", info => Msg}),
ok
end,
{ack, PostFn, #{dropped_resource_not_found => 1}};
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent) ->
PostFn = fun() ->
?SLOG(error, #{id => Id, msg => "resource_stopped", info => Msg}),
ok
end,
{ack, PostFn, #{dropped_resource_stopped => 1}};
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) ->
PostFn = fun() ->
?SLOG(error, #{id => Id, msg => "other_resource_error", reason => Reason}),
ok
end,
{nack, PostFn, #{}};
handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
case is_unrecoverable_error(Error) of
true ->
PostFn =
fun() ->
?SLOG(error, #{id => Id, msg => "unrecoverable_error", reason => Reason}),
ok
end,
Counters =
case HasBeenSent of
true -> #{retried_failed => 1};
false -> #{failed => 1}
end,
{ack, PostFn, Counters};
false ->
PostFn =
fun() ->
?SLOG(error, #{id => Id, msg => "send_error", reason => Reason}),
ok
end,
{nack, PostFn, #{}}
end;
handle_query_result_pure(Id, {async_return, Result}, HasBeenSent) ->
handle_query_async_result_pure(Id, Result, HasBeenSent);
handle_query_result_pure(_Id, Result, HasBeenSent) ->
PostFn = fun() ->
assert_ok_result(Result),
ok
end,
Counters =
case HasBeenSent of
true -> #{retried_success => 1};
false -> #{success => 1}
end,
{ack, PostFn, Counters}.
-spec handle_query_async_result_pure(id(), term(), HasBeenSent :: boolean()) ->
{ack | nack, function(), counters()}.
handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
case is_unrecoverable_error(Error) of
true ->
PostFn =
fun() ->
?SLOG(error, #{id => Id, msg => "unrecoverable_error", reason => Reason}),
ok
end,
Counters =
case HasBeenSent of
true -> #{retried_failed => 1};
false -> #{failed => 1}
end,
{ack, PostFn, Counters};
false ->
PostFn = fun() ->
?SLOG(error, #{id => Id, msg => "async_send_error", reason => Reason}),
ok
end,
{nack, PostFn, #{}}
end;
handle_query_async_result_pure(_Id, {ok, Pid}, _HasBeenSent) when is_pid(Pid) ->
{ack, fun() -> ok end, #{}};
handle_query_async_result_pure(_Id, ok, _HasBeenSent) ->
{ack, fun() -> ok end, #{}}.
-spec aggregate_counters(data(), counters()) -> data().
aggregate_counters(Data = #{counters := OldCounters}, DeltaCounters) ->
Counters = merge_counters(OldCounters, DeltaCounters),
Data#{counters := Counters}.
-spec merge_counters(counters(), counters()) -> counters().
merge_counters(OldCounters, DeltaCounters) ->
maps:fold(
fun(Metric, Val, Acc) ->
maps:update_with(Metric, fun(X) -> X + Val end, Val, Acc)
end,
OldCounters,
DeltaCounters
).
-spec flush_metrics(data()) -> data().
flush_metrics(Data = #{id := Id, counters := Counters}) ->
bump_counters(Id, Counters),
set_gauges(Data),
log_expired_message_count(Data),
ensure_metrics_flush_timer(Data#{counters := #{}}).
-spec ensure_metrics_flush_timer(data()) -> data().
ensure_metrics_flush_timer(Data = #{metrics_tref := undefined, metrics_flush_interval := T}) ->
Ref = make_ref(),
TRef = erlang:send_after(T, self(), {flush_metrics, Ref}),
Data#{metrics_tref := {TRef, Ref}}.
-spec bump_counters(id(), counters()) -> ok.
bump_counters(Id, Counters) ->
Iter = maps:iterator(Counters),
do_bump_counters(Iter, Id).
do_bump_counters(Iter, Id) ->
case maps:next(Iter) of
{Key, Val, NIter} ->
do_bump_counters1(Key, Val, Id),
do_bump_counters(NIter, Id);
none ->
ok
end.
do_bump_counters1(dropped_expired, Val, Id) ->
emqx_resource_metrics:dropped_expired_inc(Id, Val);
do_bump_counters1(dropped_queue_full, Val, Id) ->
emqx_resource_metrics:dropped_queue_full_inc(Id, Val);
do_bump_counters1(failed, Val, Id) ->
emqx_resource_metrics:failed_inc(Id, Val);
do_bump_counters1(retried_failed, Val, Id) ->
emqx_resource_metrics:retried_failed_inc(Id, Val);
do_bump_counters1(success, Val, Id) ->
emqx_resource_metrics:success_inc(Id, Val);
do_bump_counters1(retried_success, Val, Id) ->
emqx_resource_metrics:retried_success_inc(Id, Val);
do_bump_counters1(dropped_resource_not_found, Val, Id) ->
emqx_resource_metrics:dropped_resource_not_found_inc(Id, Val);
do_bump_counters1(dropped_resource_stopped, Val, Id) ->
emqx_resource_metrics:dropped_resource_stopped_inc(Id, Val).
-spec log_expired_message_count(data()) -> ok.
log_expired_message_count(_Data = #{id := Id, index := Index, counters := Counters}) ->
ExpiredCount = maps:get(dropped_expired, Counters, 0),
case ExpiredCount > 0 of
false ->
ok;
true ->
?SLOG(info, #{
msg => "buffer_worker_dropped_expired_messages",
resource_id => Id,
worker_index => Index,
expired_count => ExpiredCount
}),
ok
end.
-spec set_gauges(data()) -> ok.
set_gauges(_Data = #{id := Id, index := Index, queue := Q, inflight_tid := InflightTID}) ->
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)),
emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
ok.
handle_async_worker_down(Data0, Pid) ->
#{async_workers := AsyncWorkers0} = Data0,
{AsyncWorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0),
Data = Data0#{async_workers := AsyncWorkers},
mark_inflight_items_as_retriable(Data, AsyncWorkerMRef),
{next_state, blocked, Data}.
-spec call_query(force_sync | async_if_possible, _, _, _, _, _) -> _.
call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM}),
case emqx_resource_manager:lookup_cached(extract_connector_id(Id)) of
%% This seems to be the only place where the `rm_status_stopped' status matters,
%% to distinguish from the `disconnected' status.
{ok, _Group, #{status := ?rm_status_stopped}} ->
?RESOURCE_ERROR(stopped, "resource stopped or disabled");
{ok, _Group, #{status := ?status_connecting, error := unhealthy_target}} ->
{error, {unrecoverable_error, unhealthy_target}};
{ok, _Group, Resource} ->
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource);
{error, not_found} ->
?RESOURCE_ERROR(not_found, "resource not found")
end.
%% action:kafka_producer:myproducer1:connector:kafka_producer:mykakfaclient1
extract_connector_id(Id) when is_binary(Id) ->
case binary:split(Id, <<":">>, [global]) of
[
_ChannelGlobalType,
_ChannelSubType,
_ChannelName,
<<"connector">>,
ConnectorType,
ConnectorName
] ->
<<"connector:", ConnectorType/binary, ":", ConnectorName/binary>>;
_ ->
Id
end;
extract_connector_id(Id) ->
Id.
is_channel_id(Id) ->
extract_connector_id(Id) =/= Id.
%% Check if channel is installed in the connector state.
%% There is no need to query the conncector if the channel is not
%% installed as the query will fail anyway.
pre_query_channel_check({Id, _} = _Request, Channels, QueryOpts) when
is_map_key(Id, Channels)
->
ChannelStatus = maps:get(Id, Channels),
case emqx_resource_manager:channel_status_is_channel_added(ChannelStatus) of
true ->
ok;
false ->
error_if_channel_is_not_installed(Id, QueryOpts)
end;
pre_query_channel_check({Id, _} = _Request, _Channels, QueryOpts) ->
error_if_channel_is_not_installed(Id, QueryOpts);
pre_query_channel_check(_Request, _Channels, _QueryOpts) ->
ok.
error_if_channel_is_not_installed(Id, QueryOpts) ->
%% Fail with a recoverable error if the channel is not installed and there are buffer
%% workers involved so that the operation can be retried. Otherwise, this is
%% unrecoverable. It is emqx_resource_manager's responsibility to ensure that the
%% channel installation is retried.
IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
case is_channel_id(Id) of
true when IsSimpleQuery ->
{error,
{unrecoverable_error,
iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))}};
true ->
{error,
{recoverable_error,
iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))}};
false ->
ok
end.
do_call_query(QM, Id, Index, Ref, Query, #{query_mode := ReqQM} = QueryOpts, Resource) when
ReqQM =:= simple_sync_internal_buffer; ReqQM =:= simple_async_internal_buffer
->
%% The query overrides the query mode of the resource, send even in disconnected state
?tp(simple_query_override, #{query_mode => ReqQM}),
#{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource,
CallMode = call_mode(QM, CBM),
apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts);
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when
ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer
->
%% The connector supports buffer, send even in disconnected state
#{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource,
CallMode = call_mode(QM, CBM),
apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts);
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{status := connected} = Resource) ->
%% when calling from the buffer worker or other simple queries,
%% only apply the query fun when it's at connected status
#{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource,
CallMode = call_mode(QM, CBM),
apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts);
do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) ->
?RESOURCE_ERROR(not_connected, "resource not connected").
-define(APPLY_RESOURCE(NAME, EXPR, REQ),
try
%% if the callback module (connector) wants to return an error that
%% makes the current resource goes into the `blocked` state, it should
%% return `{error, {recoverable_error, Reason}}`
EXPR
catch
%% For convenience and to make the code in the callbacks cleaner an
%% error exception with the two following formats are translated to the
%% corresponding return values. The receiver of the return values
%% recognizes these special return formats and use them to decided if a
%% request should be retried.
error:{unrecoverable_error, Msg} ->
{error, {unrecoverable_error, Msg}};
error:{recoverable_error, Msg} ->
{error, {recoverable_error, Msg}};
ERR:REASON:STACKTRACE ->
?RESOURCE_ERROR(exception, #{
name => NAME,
id => Id,
request => REQ,
error => {ERR, REASON},
stacktrace => STACKTRACE
})
end
).
apply_query_fun(
sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, Channels, QueryOpts
) ->
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}),
maybe_reply_to(
?APPLY_RESOURCE(
call_query,
begin
case pre_query_channel_check(Request, Channels, QueryOpts) of
ok ->
Mod:on_query(extract_connector_id(Id), Request, ResSt);
Error ->
Error
end
end,
Request
),
QueryOpts
);
apply_query_fun(
async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, Channels, QueryOpts
) ->
?tp(call_query_async, #{
id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
}),
InflightTID = maps:get(inflight_tid, QueryOpts, undefined),
?APPLY_RESOURCE(
call_query_async,
begin
ReplyFun = fun ?MODULE:handle_async_reply/2,
ReplyContext = #{
buffer_worker => self(),
resource_id => Id,
worker_index => Index,
inflight_tid => InflightTID,
request_ref => Ref,
query_opts => QueryOpts,
min_query => minimize(Query)
},
IsRetriable = false,
AsyncWorkerMRef = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef),
ok = inflight_append(InflightTID, InflightItem),
case pre_query_channel_check(Request, Channels, QueryOpts) of
ok ->
Result = Mod:on_query_async(
extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, ResSt
),
{async_return, Result};
Error ->
maybe_reply_to(Error, QueryOpts)
end
end,
Request
);
apply_query_fun(
sync,
Mod,
Id,
_Index,
_Ref,
[?QUERY(_, FirstRequest, _, _) | _] = Batch,
ResSt,
Channels,
QueryOpts
) ->
?tp(call_batch_query, #{
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
}),
Requests = lists:map(fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch),
maybe_reply_to(
?APPLY_RESOURCE(
call_batch_query,
begin
case pre_query_channel_check(FirstRequest, Channels, QueryOpts) of
ok ->
Mod:on_batch_query(extract_connector_id(Id), Requests, ResSt);
Error ->
Error
end
end,
Batch
),
QueryOpts
);
apply_query_fun(
async,
Mod,
Id,
Index,
Ref,
[?QUERY(_, FirstRequest, _, _) | _] = Batch,
ResSt,
Channels,
QueryOpts
) ->
?tp(call_batch_query_async, #{
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async
}),
InflightTID = maps:get(inflight_tid, QueryOpts, undefined),
?APPLY_RESOURCE(
call_batch_query_async,
begin
ReplyFun = fun ?MODULE:handle_async_batch_reply/2,
ReplyContext = #{
buffer_worker => self(),
resource_id => Id,
worker_index => Index,
inflight_tid => InflightTID,
request_ref => Ref,
query_opts => QueryOpts,
min_batch => minimize(Batch)
},
Requests = lists:map(
fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch
),
IsRetriable = false,
AsyncWorkerMRef = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef),
ok = inflight_append(InflightTID, InflightItem),
case pre_query_channel_check(FirstRequest, Channels, QueryOpts) of
ok ->
Result = Mod:on_batch_query_async(
extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, ResSt
),
{async_return, Result};
Error ->
maybe_reply_to(Error, QueryOpts)
end
end,
Batch
).
maybe_reply_to(Result, #{reply_to := ReplyTo}) ->
do_reply_caller(ReplyTo, Result),
Result;
maybe_reply_to(Result, _) ->
Result.
handle_async_reply(
#{
request_ref := Ref,
inflight_tid := InflightTID,
query_opts := Opts
} = ReplyContext,
Result
) ->
case maybe_handle_unknown_async_reply(InflightTID, Ref, Opts) of
discard ->
ok;
continue ->
handle_async_reply1(ReplyContext, Result)
end.
handle_async_reply1(
#{
request_ref := Ref,
inflight_tid := InflightTID,
resource_id := Id,
buffer_worker := BufferWorkerPid,
min_query := ?QUERY(ReplyTo, _, _, ExpireAt) = _Query
} = ReplyContext,
Result
) ->
?tp(
handle_async_reply_enter,
#{batch_or_query => [_Query], ref => Ref, result => Result}
),
Now = now_(),
case is_expired(ExpireAt, Now) of
true ->
IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid),
%% evalutate metrics call here since we're not inside
%% buffer worker
IsAcked andalso
begin
emqx_resource_metrics:late_reply_inc(Id),
reply_dropped(ReplyTo, {error, late_reply})
end,
?tp(handle_async_reply_expired, #{expired => [_Query]}),
ok;
false ->
do_handle_async_reply(ReplyContext, Result)
end.
do_handle_async_reply(
#{
query_opts := QueryOpts,
resource_id := Id,
request_ref := Ref,
buffer_worker := BufferWorkerPid,
inflight_tid := InflightTID,
min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query
},
Result
) ->
%% NOTE: 'inflight' is the count of messages that were sent async
%% but received no ACK, NOT the number of messages queued in the
%% inflight window.
{Action, PostFn, DeltaCounters} = reply_caller_defer_metrics(
Id, ?REPLY(ReplyTo, Sent, Result), QueryOpts
),
?tp(handle_async_reply, #{
action => Action,
batch_or_query => [_Query],
ref => Ref,
result => Result
}),
case Action of
nack ->
%% Keep retrying.
ok = mark_inflight_as_retriable(InflightTID, Ref),
ok = ?MODULE:block(BufferWorkerPid),
blocked;
ack ->
ok = do_async_ack(
InflightTID, Ref, Id, PostFn, BufferWorkerPid, DeltaCounters, QueryOpts
)
end.
handle_async_batch_reply(
#{
inflight_tid := InflightTID,
request_ref := Ref,
query_opts := Opts
} = ReplyContext,
Result
) ->
case maybe_handle_unknown_async_reply(InflightTID, Ref, Opts) of
discard ->
ok;
continue ->
handle_async_batch_reply1(ReplyContext, Result)
end.
handle_async_batch_reply1(
#{
inflight_tid := InflightTID,
request_ref := Ref,
min_batch := Batch
} = ReplyContext,
Result
) ->
?tp(
handle_async_reply_enter,
#{batch_or_query => Batch, ref => Ref, result => Result}
),
Now = now_(),
case sieve_expired_requests(Batch, Now) of
{_NotExpired, []} ->
%% this is the critical code path,
%% we try not to do ets:lookup in this case
%% because the batch can be quite big
do_handle_async_batch_reply(ReplyContext, Result);
{_NotExpired, _Expired} ->
%% at least one is expired
%% the batch from reply context is minimized, so it cannot be used
%% to update the inflight items, hence discard Batch and lookup the RealBatch
?tp(handle_async_reply_expired, #{expired => _Expired}),
handle_async_batch_reply2(ets:lookup(InflightTID, Ref), ReplyContext, Result, Now)
end.
handle_async_batch_reply2([], _, _, _) ->
%% this usually should never happen unless the async callback is being evaluated concurrently
ok;
handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _AsyncWorkerMRef) = Inflight,
#{
resource_id := Id,
buffer_worker := BufferWorkerPid,
inflight_tid := InflightTID,
request_ref := Ref,
min_batch := Batch
} = ReplyContext,
%% All batch items share the same HasBeenSent flag
%% So we just take the original flag from the ReplyContext batch
%% and put it back to the batch found in inflight table
%% which must have already been set to `false`
[?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt) | _] = Batch,
{RealNotExpired0, RealExpired} = sieve_expired_requests(RealBatch, Now),
RealNotExpired =
lists:map(
fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt)) ->
?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt)
end,
RealNotExpired0
),
NumExpired = length(RealExpired),
%% evalutate metrics call here since we're not inside buffer
%% worker
emqx_resource_metrics:late_reply_inc(Id, NumExpired),
batch_reply_dropped(RealExpired, {error, late_reply}),
case RealNotExpired of
[] ->
%% all expired, no need to update back the inflight batch
_ = ack_inflight(InflightTID, Ref, BufferWorkerPid),
ok;
_ ->
%% some queries are not expired, put them back to the inflight batch
%% so it can be either acked now or retried later
ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired),
?tp_ignore_side_effects_in_prod(
handle_async_reply_partially_expired,
#{
inflight_count => inflight_count(InflightTID),
num_inflight_messages => inflight_num_msgs(InflightTID)
}
),
do_handle_async_batch_reply(ReplyContext#{min_batch := RealNotExpired}, Result)
end.
do_handle_async_batch_reply(
#{
buffer_worker := BufferWorkerPid,
resource_id := Id,
inflight_tid := InflightTID,
request_ref := Ref,
min_batch := Batch,
query_opts := QueryOpts
},
Result
) ->
{Action, PostFn, DeltaCounters} = batch_reply_caller_defer_metrics(
Id, Result, Batch, QueryOpts
),
?tp(handle_async_reply, #{
action => Action,
batch_or_query => Batch,
ref => Ref,
result => Result
}),
case Action of
nack ->
%% Keep retrying.
ok = mark_inflight_as_retriable(InflightTID, Ref),
ok = ?MODULE:block(BufferWorkerPid),
blocked;
ack ->
ok = do_async_ack(
InflightTID, Ref, Id, PostFn, BufferWorkerPid, DeltaCounters, QueryOpts
)
end.
do_async_ack(InflightTID, Ref, Id, PostFn, BufferWorkerPid, DeltaCounters, QueryOpts) ->
IsKnownRef = ack_inflight(InflightTID, Ref, BufferWorkerPid),
case maps:get(simple_query, QueryOpts, false) of
true ->
PostFn(),
bump_counters(Id, DeltaCounters);
false when IsKnownRef ->
PostFn(),
bump_counters(Id, DeltaCounters);
false ->
ok
end,
ok.
%% check if the async reply is valid.
%% e.g. if a connector evaluates the callback more than once:
%% 1. If the request was previously deleted from inflight table due to
%% either succeeded previously or expired, this function logs a
%% warning message and returns 'discard' instruction.
%% 2. If the request was previously failed and now pending on a retry,
%% then this function will return 'continue' as there is no way to
%% tell if this reply is stae or not.
maybe_handle_unknown_async_reply(undefined, _Ref, #{simple_query := true}) ->
continue;
maybe_handle_unknown_async_reply(InflightTID, Ref, #{}) ->
try ets:member(InflightTID, Ref) of
true ->
continue;
false ->
?tp(
warning,
unknown_async_reply_discarded,
#{inflight_key => Ref}
),
discard
catch
error:badarg ->
%% shutdown ?
discard
end.
%%==============================================================================
%% operations for queue
queue_item_marshaller(Bin) when is_binary(Bin) ->
binary_to_term(Bin);
queue_item_marshaller(Item) ->
term_to_binary(Item).
estimate_size(QItem) ->
erlang:external_size(QItem).
-spec append_queue(id(), index(), replayq:q(), [queue_query()]) ->
{[queue_query()], replayq:q(), counters()}.
append_queue(Id, Index, Q, Queries) ->
%% this assertion is to ensure that we never append a raw binary
%% because the marshaller will get lost.
false = is_binary(hd(Queries)),
Q0 = replayq:append(Q, Queries),
{Overflown, Q2, DeltaCounters} =
case replayq:overflow(Q0) of
OverflownBytes when OverflownBytes =< 0 ->
{[], Q0, #{}};
OverflownBytes ->
PopOpts = #{bytes_limit => OverflownBytes, count_limit => 999999999},
{Q1, QAckRef, Items2} = replayq:pop(Q0, PopOpts),
ok = replayq:ack(Q1, QAckRef),
Dropped = length(Items2),
Counters = #{dropped_queue_full => Dropped},
?SLOG(info, #{
msg => "buffer_worker_overflow",
resource_id => Id,
worker_index => Index,
dropped => Dropped
}),
{Items2, Q1, Counters}
end,
?tp(
buffer_worker_appended_to_queue,
#{
id => Id,
items => Queries,
queue_count => queue_count(Q2),
overflown => length(Overflown)
}
),
{Overflown, Q2, DeltaCounters}.
%%==============================================================================
%% the inflight queue for async query
-define(MAX_SIZE_REF, max_size).
-define(SIZE_REF, size).
-define(BATCH_COUNT_REF, batch_count).
-define(INITIAL_TIME_REF, initial_time).
-define(INITIAL_MONOTONIC_TIME_REF, initial_monotonic_time).
inflight_new(InfltWinSZ) ->
TableId = ets:new(
emqx_resource_buffer_worker_inflight_tab,
[ordered_set, public, {write_concurrency, true}]
),
inflight_append(TableId, {?MAX_SIZE_REF, InfltWinSZ}),
%% we use this counter because we might deal with batches as
%% elements.
inflight_append(TableId, {?SIZE_REF, 0}),
inflight_append(TableId, {?BATCH_COUNT_REF, 0}),
inflight_append(TableId, {?INITIAL_TIME_REF, erlang:system_time()}),
inflight_append(TableId, {?INITIAL_MONOTONIC_TIME_REF, make_request_ref()}),
TableId.
-spec inflight_get_first_retriable(ets:tid(), integer()) ->
none
| {expired, inflight_key(), [queue_query()]}
| {single, inflight_key(), queue_query()}
| {batch, inflight_key(), _NotExpired :: [queue_query()], _Expired :: [queue_query()]}.
inflight_get_first_retriable(InflightTID, Now) ->
MatchSpec =
ets:fun2ms(
fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _AsyncWorkerMRef)) when
IsRetriable =:= true
->
{Ref, BatchOrQuery}
end
),
case ets:select(InflightTID, MatchSpec, _Limit = 1) of
'$end_of_table' ->
none;
{[{Ref, Query = ?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)}], _Continuation} ->
case is_expired(ExpireAt, Now) of
true ->
{expired, Ref, [Query]};
false ->
{single, Ref, Query}
end;
{[{Ref, Batch = [_ | _]}], _Continuation} ->
case sieve_expired_requests(Batch, Now) of
{[], _AllExpired} ->
{expired, Ref, Batch};
{NotExpired, Expired} ->
{batch, Ref, NotExpired, Expired}
end
end.
is_inflight_full(undefined) ->
false;
is_inflight_full(InflightTID) ->
[{_, MaxSize}] = ets:lookup(InflightTID, ?MAX_SIZE_REF),
%% we consider number of batches rather than number of messages
%% because one batch request may hold several messages.
Size = inflight_count(InflightTID),
Size >= MaxSize.
inflight_count(InflightTID) ->
emqx_utils_ets:lookup_value(InflightTID, ?BATCH_COUNT_REF, 0).
inflight_num_msgs(InflightTID) ->
[{_, Size}] = ets:lookup(InflightTID, ?SIZE_REF),
Size.
inflight_append(undefined, _InflightItem) ->
ok;
inflight_append(
InflightTID,
?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, AsyncWorkerMRef)
) ->
Batch = mark_as_sent(Batch0),
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef),
IsNew = ets:insert_new(InflightTID, InflightItem),
BatchSize = length(Batch),
IsNew andalso inc_inflight(InflightTID, BatchSize),
?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
ok;
inflight_append(
InflightTID,
?INFLIGHT_ITEM(
Ref, ?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, AsyncWorkerMRef
)
) ->
Query = mark_as_sent(Query0),
InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef),
IsNew = ets:insert_new(InflightTID, InflightItem),
IsNew andalso inc_inflight(InflightTID, 1),
?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
ok;
inflight_append(InflightTID, {Ref, Data}) ->
ets:insert(InflightTID, {Ref, Data}),
%% this is a metadata row being inserted; therefore, we don't bump
%% the inflight metric.
ok.
%% a request was already appended and originally not retriable, but an
%% error occurred and it is now retriable.
mark_inflight_as_retriable(undefined, _Ref) ->
ok;
mark_inflight_as_retriable(InflightTID, Ref) ->
_ = ets:update_element(InflightTID, Ref, {?RETRY_IDX, true}),
%% the old worker's DOWN should not affect this inflight any more
_ = ets:update_element(InflightTID, Ref, {?WORKER_MREF_IDX, erased}),
ok.
%% Track each worker pid only once.
ensure_async_worker_monitored(
Data0 = #{async_workers := AsyncWorkers}, {async_return, {ok, AsyncWorkerPid}} = _Result
) when
is_pid(AsyncWorkerPid), is_map_key(AsyncWorkerPid, AsyncWorkers)
->
AsyncWorkerMRef = maps:get(AsyncWorkerPid, AsyncWorkers),
{Data0, AsyncWorkerMRef};
ensure_async_worker_monitored(
Data0 = #{async_workers := AsyncWorkers0}, {async_return, {ok, AsyncWorkerPid}}
) when
is_pid(AsyncWorkerPid)
->
AsyncWorkerMRef = monitor(process, AsyncWorkerPid),
AsyncWorkers = AsyncWorkers0#{AsyncWorkerPid => AsyncWorkerMRef},
Data = Data0#{async_workers := AsyncWorkers},
{Data, AsyncWorkerMRef};
ensure_async_worker_monitored(Data0, _Result) ->
{Data0, undefined}.
-spec store_async_worker_reference(undefined | ets:tid(), inflight_key(), undefined | reference()) ->
ok.
store_async_worker_reference(undefined = _InflightTID, _Ref, _AsyncWorkerMRef) ->
ok;
store_async_worker_reference(_InflightTID, _Ref, undefined = _WorkerRef) ->
ok;
store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef) when
is_reference(AsyncWorkerMRef)
->
_ = ets:update_element(
InflightTID, Ref, {?WORKER_MREF_IDX, AsyncWorkerMRef}
),
ok.
ack_inflight(undefined, _Ref, _BufferWorkerPid) ->
false;
ack_inflight(InflightTID, Ref, BufferWorkerPid) ->
{Count, Removed} =
case ets:take(InflightTID, Ref) of
[?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _AsyncWorkerMRef)] ->
{1, true};
[?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _AsyncWorkerMRef)] ->
{length(Batch), true};
[] ->
{0, false}
end,
FlushCheck = dec_inflight_remove(InflightTID, Count, Removed),
case FlushCheck of
no_flush -> ok;
flush -> ?MODULE:flush_worker(BufferWorkerPid)
end,
IsKnownRef = (Count > 0),
IsKnownRef.
mark_inflight_items_as_retriable(Data, AsyncWorkerMRef) ->
#{inflight_tid := InflightTID} = Data,
IsRetriable = true,
MatchSpec =
ets:fun2ms(
fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, _IsRetriable, AsyncWorkerMRef0)) when
AsyncWorkerMRef =:= AsyncWorkerMRef0
->
?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef0)
end
),
_NumAffected = ets:select_replace(InflightTID, MatchSpec),
?tp(buffer_worker_async_agent_down, #{num_affected => _NumAffected, buffer_worker => self()}),
ok.
%% used to update a batch after dropping expired individual queries.
update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) ->
_ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}),
ok = dec_inflight_update(InflightTID, NumExpired).
inc_inflight(InflightTID, Count) ->
_ = ets:update_counter(InflightTID, ?SIZE_REF, {2, Count}),
_ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, 1}),
ok.
-spec dec_inflight_remove(undefined | ets:tid(), non_neg_integer(), Removed :: boolean()) ->
no_flush | flush.
dec_inflight_remove(_InflightTID, _Count = 0, _Removed = false) ->
no_flush;
dec_inflight_remove(InflightTID, _Count = 0, _Removed = true) ->
NewValue = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
MaxValue = emqx_utils_ets:lookup_value(InflightTID, ?MAX_SIZE_REF, 0),
%% if the new value is Max - 1, it means that we've just made room
%% in the inflight table, so we should poke the buffer worker to
%% make it continue flushing.
case NewValue =:= MaxValue - 1 of
true -> flush;
false -> no_flush
end;
dec_inflight_remove(InflightTID, Count, _Removed = true) when Count > 0 ->
%% If Count > 0, it must have been removed
NewValue = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
_ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
MaxValue = emqx_utils_ets:lookup_value(InflightTID, ?MAX_SIZE_REF, 0),
%% if the new value is Max - 1, it means that we've just made room
%% in the inflight table, so we should poke the buffer worker to
%% make it continue flushing.
case NewValue =:= MaxValue - 1 of
true -> flush;
false -> no_flush
end.
dec_inflight_update(_InflightTID, _Count = 0) ->
ok;
dec_inflight_update(InflightTID, Count) when Count > 0 ->
_ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
ok.
%%==============================================================================
call_mode(force_sync, _) -> sync;
call_mode(async_if_possible, always_sync) -> sync;
call_mode(async_if_possible, async_if_possible) -> async.
assert_ok_result(ok) ->
true;
assert_ok_result({async_return, R}) ->
assert_ok_result(R);
assert_ok_result(R) when is_tuple(R) ->
try
ok = erlang:element(1, R)
catch
error:{badmatch, _} ->
error({not_ok_result, R})
end;
assert_ok_result(R) ->
error({not_ok_result, R}).
queue_count(Q) ->
replayq:count(Q).
disk_queue_dir(Id, Index) ->
QDir0 = binary_to_list(Id) ++ ":" ++ integer_to_list(Index),
QDir = filename:join([emqx:data_dir(), "bufs", node(), QDir0]),
emqx_utils:safe_filename(QDir).
clear_disk_queue_dir(Id, Index) ->
ReplayQDir = disk_queue_dir(Id, Index),
case file:del_dir_r(ReplayQDir) of
{error, enoent} ->
ok;
Res ->
Res
end.
ensure_flush_timer(Data = #{batch_time := T}) ->
ensure_flush_timer(Data, T).
ensure_flush_timer(Data = #{tref := undefined}, 0) ->
%% if the batch_time is 0, we don't need to start a timer, which
%% can be costly at high rates.
Ref = make_ref(),
self() ! {flush, Ref},
Data#{tref => {Ref, Ref}};
ensure_flush_timer(Data = #{tref := undefined}, T) ->
Ref = make_ref(),
TRef = erlang:send_after(T, self(), {flush, Ref}),
Data#{tref => {TRef, Ref}};
ensure_flush_timer(Data, _T) ->
Data.
cancel_flush_timer(St = #{tref := undefined}) ->
St;
cancel_flush_timer(St = #{tref := {TRef, _Ref}}) ->
_ = erlang:cancel_timer(TRef),
St#{tref => undefined}.
-spec make_request_ref() -> inflight_key().
make_request_ref() ->
now_().
collect_requests(Acc, Limit) ->
Count = length(Acc),
do_collect_requests(Acc, Count, Limit).
do_collect_requests(Acc, Count, Limit) when Count >= Limit ->
lists:reverse(Acc);
do_collect_requests(Acc, Count, Limit) ->
receive
?SEND_REQ(_ReplyTo, _Req) = Request ->
do_collect_requests([Request | Acc], Count + 1, Limit)
after 0 ->
lists:reverse(Acc)
end.
mark_as_sent(Batch) when is_list(Batch) ->
lists:map(fun mark_as_sent/1, Batch);
mark_as_sent(?QUERY(ReplyTo, Req, _HasBeenSent, ExpireAt)) ->
HasBeenSent = true,
?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt).
is_unrecoverable_error({error, {unrecoverable_error, _}}) ->
true;
is_unrecoverable_error({error, {recoverable_error, _}}) ->
false;
is_unrecoverable_error({async_return, Result}) ->
is_unrecoverable_error(Result);
is_unrecoverable_error({error, _}) ->
%% TODO: delete this clause.
%% Ideally all errors except for 'unrecoverable_error' should be
%% retried, including DB schema errors.
true;
is_unrecoverable_error(_) ->
false.
is_async_return({async_return, _}) ->
true;
is_async_return(_) ->
false.
sieve_expired_requests(Batch, Now) ->
lists:partition(
fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)) ->
not is_expired(ExpireAt, Now)
end,
Batch
).
-spec is_expired(infinity | integer(), integer()) -> boolean().
is_expired(infinity = _ExpireAt, _Now) ->
false;
is_expired(ExpireAt, Now) ->
Now > ExpireAt.
now_() ->
erlang:monotonic_time(nanosecond).
-spec ensure_timeout_query_opts(query_opts(), sync | async) -> query_opts().
ensure_timeout_query_opts(#{timeout := _} = Opts, _SyncOrAsync) ->
Opts;
ensure_timeout_query_opts(#{} = Opts0, sync) ->
Opts0#{timeout => ?DEFAULT_REQUEST_TTL};
ensure_timeout_query_opts(#{} = Opts0, async) ->
Opts0#{timeout => infinity}.
-spec ensure_expire_at(query_opts()) -> query_opts().
ensure_expire_at(#{expire_at := _} = Opts) ->
Opts;
ensure_expire_at(#{timeout := infinity} = Opts) ->
Opts#{expire_at => infinity};
ensure_expire_at(#{timeout := TimeoutMS} = Opts) ->
TimeoutNS = erlang:convert_time_unit(TimeoutMS, millisecond, nanosecond),
ExpireAt = now_() + TimeoutNS,
Opts#{expire_at => ExpireAt}.
%% no need to keep the request for async reply handler
minimize(?QUERY(_, _, _, _) = Q) ->
do_minimize(Q);
minimize(L) when is_list(L) ->
lists:map(fun do_minimize/1, L).
-ifdef(TEST).
do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt) = Query) -> Query.
-else.
do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, ExpireAt).
-endif.
%% To avoid message loss due to misconfigurations, we adjust
%% `batch_time' based on `request_ttl'. If `batch_time' >
%% `request_ttl', all requests will timeout before being sent if
%% the message rate is low. Even worse if `pool_size' is high.
%% We cap `batch_time' at `request_ttl div 2' as a rule of thumb.
adjust_batch_time(_Id, _RequestTTL = infinity, BatchTime0) ->
BatchTime0;
adjust_batch_time(Id, RequestTTL, BatchTime0) ->
BatchTime = max(0, min(BatchTime0, RequestTTL div 2)),
case BatchTime =:= BatchTime0 of
false ->
?SLOG(info, #{
id => Id,
msg => "adjusting_buffer_worker_batch_time",
new_batch_time => BatchTime
});
true ->
ok
end,
BatchTime.
replayq_opts(Id, Index, Opts) ->
BufferMode = maps:get(buffer_mode, Opts, memory_only),
TotalBytes = maps:get(max_buffer_bytes, Opts, ?DEFAULT_BUFFER_BYTES),
case BufferMode of
memory_only ->
#{
mem_only => true,
marshaller => fun ?MODULE:queue_item_marshaller/1,
max_total_bytes => TotalBytes,
sizer => fun ?MODULE:estimate_size/1
};
volatile_offload ->
SegBytes0 = maps:get(buffer_seg_bytes, Opts, TotalBytes),
SegBytes = min(SegBytes0, TotalBytes),
#{
dir => disk_queue_dir(Id, Index),
marshaller => fun ?MODULE:queue_item_marshaller/1,
max_total_bytes => TotalBytes,
%% we don't want to retain the queue after
%% resource restarts.
offload => {true, volatile},
seg_bytes => SegBytes,
sizer => fun ?MODULE:estimate_size/1
}
end.
%% The request timeout should be greater than the resume interval, as
%% it defines how often the buffer worker tries to unblock. If request
%% timeout is <= resume interval and the buffer worker is ever
%% blocked, than all queued requests will basically fail without being
%% attempted.
-spec default_resume_interval(request_ttl(), health_check_interval()) -> timeout_ms().
default_resume_interval(_RequestTTL = infinity, HealthCheckInterval) ->
max(1, HealthCheckInterval);
default_resume_interval(RequestTTL, HealthCheckInterval) ->
max(1, min(HealthCheckInterval, RequestTTL div 3)).
-spec reply_call(reference(), term()) -> ok.
reply_call(Alias, Response) ->
%% Since we use a reference created with `{alias,
%% reply_demonitor}', after we `demonitor' it in case of a
%% timeout, we won't send any more messages that the caller is not
%% expecting anymore. Using `gen_statem:reply({pid(),
%% reference()}, _)' would still send a late reply even after the
%% demonitor.
erlang:send(Alias, {Alias, Response}),
ok.
%% Used by `simple_sync_internal_buffer_query' to reply and chain existing `reply_to'
%% callbacks.
reply_call_internal_buffer(ReplyAlias, MaybeReplyTo, Response) ->
?MODULE:reply_call(ReplyAlias, Response),
do_reply_caller(MaybeReplyTo, Response).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
adjust_batch_time_test_() ->
%% just for logging
Id = some_id,
[
{"batch time smaller than request_time/2",
?_assertEqual(
100,
adjust_batch_time(Id, 500, 100)
)},
{"batch time equal to request_time/2",
?_assertEqual(
100,
adjust_batch_time(Id, 200, 100)
)},
{"batch time greater than request_time/2",
?_assertEqual(
50,
adjust_batch_time(Id, 100, 100)
)},
{"batch time smaller than request_time/2 (request_time = infinity)",
?_assertEqual(
100,
adjust_batch_time(Id, infinity, 100)
)}
].
-endif.