%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

%% This module implements async message sending, disk message queuing,
%% and message batching using ReplayQ.
%%
%% Each buffer worker is a `gen_statem' (state functions `running/3' and
%% `blocked/3') that queues incoming requests in a ReplayQ, sends them to
%% the resource (optionally in batches) and tracks in-flight requests so
%% they can be retried after the resource recovers.

-module(emqx_resource_buffer_worker).

-include("emqx_resource.hrl").
-include("emqx_resource_errors.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").

-behaviour(gen_statem).

%% Public API used by callers that go through the buffer workers.
-export([
    start_link/3,
    sync_query/3,
    async_query/3,
    block/1,
    resume/1,
    flush_worker/1
]).

%% "Simple" queries bypass buffering and batching entirely.
-export([
    simple_sync_query/2,
    simple_sync_query/3,
    simple_async_query/3,
    simple_sync_internal_buffer_query/3
]).

-export([
    callback_mode/0,
    init/1,
    terminate/2,
    code_change/3
]).

%% gen_statem state functions.
-export([running/3, blocked/3]).

-export([queue_item_marshaller/1, estimate_size/1]).

%% Reply callbacks (invoked via `?MODULE:...' funs handed to other processes).
-export([
    handle_async_reply/2,
    handle_async_batch_reply/2,
    reply_call/2,
    reply_call_internal_buffer/3
]).

-export([clear_disk_queue_dir/2]).

-elvis([{elvis_style, dont_repeat_yourself, disable}]).

%% Limit passed to `collect_requests/2' when draining pending `?SEND_REQ'
%% messages from the mailbox in one go.
-define(COLLECT_REQ_LIMIT, 1000).
%% Message sent by `pick_call/4' / `pick_cast/3' to a buffer worker.
-define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}).
%% A queued request: reply target, request payload, whether it has been
%% sent before (used to pick retried_* vs plain counters) and its expiry.
-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}).
%% Query used by the simple_* API: never sent before, `infinity' expiry.
-define(SIMPLE_QUERY(FROM, REQUEST), ?QUERY(FROM, REQUEST, false, infinity)).
%% A reply to be delivered to `FROM' for a request; `SENT' mirrors the
%% HasBeenSent flag of the originating `?QUERY'.
-define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}).
%% Row stored in the inflight table. Kept as a plain tuple so single
%% fields can be updated atomically by position (see the *_IDX macros).
-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef),
    {Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef}
).
%% 1-based tuple positions inside `?INFLIGHT_ITEM'.
-define(ITEM_IDX, 2).
-define(RETRY_IDX, 3).
-define(WORKER_MREF_IDX, 4).

-type id() :: binary().
-type index() :: pos_integer().
-type expire_at() :: infinity | integer().
-type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()).
-type request() :: term().
-type request_from() :: undefined | gen_statem:from().
-type timeout_ms() :: emqx_schema:timeout_duration_ms().
-type request_ttl() :: emqx_schema:timeout_duration_ms().
-type health_check_interval() :: pos_integer().
-type state() :: blocked | running.
-type inflight_key() :: integer().
%% Metric deltas accumulated in the worker state and flushed periodically
%% by `flush_metrics/1'.
-type counters() :: #{
    dropped_expired => non_neg_integer(),
    dropped_queue_full => non_neg_integer(),
    dropped_resource_not_found => non_neg_integer(),
    dropped_resource_stopped => non_neg_integer(),
    success => non_neg_integer(),
    failed => non_neg_integer(),
    retried_success => non_neg_integer(),
    retried_failed => non_neg_integer()
}.
-type inflight_table() :: ets:tid() | atom() | reference().
%% gen_statem data. `tref' / `metrics_tref' pair a timer reference with the
%% token carried by the `{flush, Ref}' / `{flush_metrics, Ref}' message so
%% stale timer messages can be recognised and ignored.
-type data() :: #{
    id := id(),
    index := index(),
    inflight_tid := inflight_table(),
    async_workers := #{pid() => reference()},
    batch_size := pos_integer(),
    batch_time := timeout_ms(),
    counters := counters(),
    metrics_flush_interval := timeout_ms(),
    queue := replayq:q(),
    resume_interval := timeout_ms(),
    tref := undefined | {reference(), reference()},
    metrics_tref := undefined | {reference(), reference()}
}.

%% State functions with state-enter calls (see `running(enter, ...)').
callback_mode() -> [state_functions, state_enter].

%% Start buffer worker number `Index' for resource `Id'.
start_link(Id, Index, Opts) ->
    gen_statem:start_link(?MODULE, {Id, Index, Opts}, []).

-spec sync_query(id(), request(), query_opts()) -> Result :: term().
%% Send `Request' to a buffer worker and block until it replies or the
%% (normalised) `timeout' query option elapses. The worker is chosen by the
%% `pick_key' option, which defaults to the calling process.
sync_query(Id, Request, Opts0) ->
    ?tp(sync_query, #{id => Id, request => Request, query_opts => Opts0}),
    QueryOpts = ensure_expire_at(ensure_timeout_query_opts(Opts0, sync)),
    WorkerKey = maps:get(pick_key, QueryOpts, self()),
    CallTimeout = maps:get(timeout, QueryOpts),
    emqx_resource_metrics:matched_inc(Id),
    pick_call(Id, WorkerKey, {query, Request, QueryOpts}, CallTimeout).

-spec async_query(id(), request(), query_opts()) -> Result :: term().
%% Fire-and-forget variant of `sync_query/3': the request is handed to a
%% buffer worker and this function returns without waiting for the result.
async_query(Id, Request, Opts0) ->
    ?tp(async_query, #{id => Id, request => Request, query_opts => Opts0}),
    QueryOpts = ensure_expire_at(ensure_timeout_query_opts(Opts0, async)),
    WorkerKey = maps:get(pick_key, QueryOpts, self()),
    emqx_resource_metrics:matched_inc(Id),
    pick_cast(Id, WorkerKey, {query, Request, QueryOpts}).

%% simple query the resource without batching and queuing.
-spec simple_sync_query(id(), request()) -> term().
simple_sync_query(Id, Request) ->
    simple_sync_query(Id, Request, #{}).

-spec simple_sync_query(id(), request(), query_opts()) -> term().
%% Query the resource synchronously from the calling process, skipping the
%% buffer workers altogether.
%%
%% The buffer worker index normally keys the gauge metrics; as no worker is
%% involved here, `undefined' is passed instead (it is ignored by
%% `emqx_resource_metrics:*_shift/3'). Reaching any buffering code from this
%% path would be a bug that also corrupts those metrics.
simple_sync_query(Id, Request, QueryOpts0) ->
    ?tp(simple_sync_query, #{id => Id, request => Request}),
    MergedOpts = maps:merge(simple_query_opts(), QueryOpts0),
    emqx_resource_metrics:matched_inc(Id),
    RequestRef = make_request_ref(),
    ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
    Query = ?SIMPLE_QUERY(ReplyTo, Request),
    Result = call_query(force_sync, Id, undefined, RequestRef, Query, MergedOpts),
    _ = handle_query_result(Id, Result, false),
    Result.

%% simple async-query the resource without batching and queuing.
-spec simple_async_query(id(), request(), query_opts()) -> term().
%% Query the resource from the calling process without buffering or
%% batching, letting the connector answer asynchronously if it can
%% (`async_if_possible'). Metrics are bumped immediately via
%% `handle_query_result/3' and the raw result is returned.
simple_async_query(Id, Request, QueryOpts0) ->
    ?tp(simple_async_query, #{id => Id, request => Request, query_opts => QueryOpts0}),
    %% No buffer worker is involved; see the note in `simple_sync_query/3'.
    Index = undefined,
    QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
    emqx_resource_metrics:matched_inc(Id),
    Ref = make_request_ref(),
    ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
    Result = call_query(
        async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts
    ),
    _ = handle_query_result(Id, Result, _HasBeenSent = false),
    Result.

%% This is a hack to handle cases where the underlying connector has internal buffering
%% (e.g.: Kafka and Pulsar producers).  Since the message may be internally retried at a
%% later time, we can't bump metrics immediately if the return value is not a success
%% (e.g.: if the call timed out, but the message was enqueued nevertheless).
%%
%% Implementation: the query is issued with `simple_async_query/3' and a
%% `reply_to' that routes the final result to a process alias
%% (`reply_call_internal_buffer/3', presumably sending `{ReplyAlias, Response}';
%% its body is not shown here). The caller then waits for that message for
%% `timeout' ms (from `QueryOpts0', defaulting to `infinity' via
%% `simple_query_opts/0'). Any `reply_to' supplied by the caller is passed
%% along to the callback as `MaybeReplyTo'.
-spec simple_sync_internal_buffer_query(id(), request(), query_opts()) -> term().
simple_sync_internal_buffer_query(Id, Request, QueryOpts0) ->
    ?tp(simple_sync_internal_buffer_query, #{id => Id, request => Request, query_opts => QueryOpts0}),
    ReplyAlias = alias([reply]),
    try
        MaybeReplyTo = maps:get(reply_to, QueryOpts0, undefined),
        QueryOpts1 = QueryOpts0#{
            reply_to =>
                {fun ?MODULE:reply_call_internal_buffer/3, [ReplyAlias, MaybeReplyTo]}
        },
        QueryOpts = #{timeout := Timeout} = maps:merge(simple_query_opts(), QueryOpts1),
        case simple_async_query(Id, Request, QueryOpts) of
            {error, _} = Error ->
                Error;
            {async_return, {error, _} = Error} ->
                Error;
            {async_return, {ok, _Pid}} ->
                receive
                    {ReplyAlias, Response} ->
                        Response
                after Timeout ->
                    %% Deactivate the alias first so no new reply can arrive,
                    %% then pick up a reply that may have raced with the timeout.
                    _ = unalias(ReplyAlias),
                    receive
                        {ReplyAlias, Response} ->
                            Response
                    after 0 -> {error, timeout}
                    end
                end
        end
    after
        %% Always release the alias, whatever path was taken above.
        _ = unalias(ReplyAlias)
    end.

%% Default options for the simple_* queries: flagged as simple, no timeout,
%% plus whatever `ensure_expire_at/1' adds.
simple_query_opts() ->
    ensure_expire_at(#{simple_query => true, timeout => infinity}).

%% Ask a buffer worker to move to the `blocked' state (asynchronous cast).
-spec block(pid()) -> ok.
block(ServerRef) ->
    gen_statem:cast(ServerRef, block).

%% Ask a buffer worker to resume; while `blocked' this triggers
%% `resume_from_blocked/1' (asynchronous cast).
-spec resume(pid()) -> ok.
resume(ServerRef) ->
    gen_statem:cast(ServerRef, resume).
%% Ask a buffer worker to flush its queue (asynchronous cast). While
%% `blocked', this attempts a resume instead (see `blocked/3').
-spec flush_worker(pid()) -> ok.
flush_worker(ServerRef) ->
    gen_statem:cast(ServerRef, flush).

%% Initialise a buffer worker: trap exits, join the resource's gproc pool
%% as `{Id, Index}', open the ReplayQ, create the inflight table, derive the
%% batch time and resume interval from the options, arm the metrics flush
%% timer and start in `running'.
-spec init({id(), pos_integer(), map()}) -> gen_statem:init_result(state(), data()).
init({Id, Index, Opts}) ->
    process_flag(trap_exit, true),
    true = gproc_pool:connect_worker(Id, {Id, Index}),
    BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
    QueueOpts = replayq_opts(Id, Index, Opts),
    Queue = replayq:open(QueueOpts),
    %% The queue may be non-empty right away (e.g. a reopened disk queue).
    emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)),
    emqx_resource_metrics:inflight_set(Id, Index, 0),
    InflightWinSize = maps:get(inflight_window, Opts, ?DEFAULT_INFLIGHT),
    InflightTID = inflight_new(InflightWinSize),
    HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
    RequestTTL = maps:get(request_ttl, Opts, ?DEFAULT_REQUEST_TTL),
    BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
    BatchTime = adjust_batch_time(Id, RequestTTL, BatchTime0),
    DefaultResumeInterval = default_resume_interval(RequestTTL, HealthCheckInterval),
    ResumeInterval = maps:get(resume_interval, Opts, DefaultResumeInterval),
    MetricsFlushInterval = maps:get(metrics_flush_interval, Opts, ?DEFAULT_METRICS_FLUSH_INTERVAL),
    Data0 = #{
        id => Id,
        index => Index,
        inflight_tid => InflightTID,
        async_workers => #{},
        batch_size => BatchSize,
        batch_time => BatchTime,
        counters => #{},
        metrics_flush_interval => MetricsFlushInterval,
        queue => Queue,
        resume_interval => ResumeInterval,
        tref => undefined,
        metrics_tref => undefined
    },
    Data = ensure_metrics_flush_timer(Data0),
    ?tp(buffer_worker_init, #{id => Id, index => Index, queue_opts => QueueOpts}),
    {ok, running, Data}.

%% `running' state: requests are enqueued and flushed to the resource.
%% Clause order matters: the `{flush, Ref}' / `{flush_metrics, Ref}' clauses
%% that match the current timer token must precede the stale-token clauses,
%% and the catch-all `info' clause must stay last.
running(enter, _, #{tref := _Tref} = Data) ->
    ?tp(buffer_worker_enter_running, #{id => maps:get(id, Data), tref => _Tref}),
    %% According to `gen_statem' laws, we mustn't call `maybe_flush'
    %% directly because it may decide to return `{next_state, blocked, _}',
    %% and that's an invalid response for a state enter call.
    %% Returning a next event from a state enter call is also
    %% prohibited.
    %% Instead, schedule a flush timer (presumably with a 0 ms delay;
    %% `ensure_flush_timer/2' is not shown here).
    {keep_state, ensure_flush_timer(Data, 0)};
running(cast, resume, _St) ->
    keep_state_and_data;
running(cast, flush, Data) ->
    flush(Data);
running(cast, block, St) ->
    {next_state, blocked, St};
running(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data) ->
    handle_query_requests(Request0, Data);
running(info, {flush, Ref}, Data = #{tref := {_TRef, Ref}}) ->
    %% Our own current flush timer fired: it is consumed, so clear it.
    flush(Data#{tref := undefined});
running(info, {flush, _Ref}, _Data) ->
    ?tp(discarded_stale_flush, #{}),
    keep_state_and_data;
running(info, {flush_metrics, Ref}, Data0 = #{metrics_tref := {_TRef, Ref}}) ->
    Data = flush_metrics(Data0#{metrics_tref := undefined}),
    {keep_state, Data};
running(info, {flush_metrics, _Ref}, _Data) ->
    keep_state_and_data;
running(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
    is_map_key(Pid, AsyncWorkers0)
->
    %% A monitored async worker died; its inflight items must be retried.
    ?SLOG(info, #{msg => "async_worker_died", state => running, reason => Reason}),
    handle_async_worker_down(Data0, Pid);
running(info, Info, _St) ->
    ?SLOG(error, #{msg => "unexpected_msg", state => running, info => Info}),
    keep_state_and_data.
%% `blocked' state: the resource is considered unable to take requests.
%% New requests are still enqueued (but not flushed). Every `resume_interval'
%% ms (state timeout), or on `resume' / `flush' casts, `resume_from_blocked/1'
%% retries the inflight requests. Clause order matters as in `running/3'.
blocked(enter, _, #{resume_interval := ResumeT} = St0) ->
    ?tp(buffer_worker_enter_blocked, #{buffer_worker => self()}),
    %% discard the old timer, new timer will be started when entering running state again
    St = cancel_flush_timer(St0),
    {keep_state, St, {state_timeout, ResumeT, unblock}};
blocked(cast, block, _St) ->
    keep_state_and_data;
blocked(cast, resume, St) ->
    resume_from_blocked(St);
blocked(cast, flush, St) ->
    resume_from_blocked(St);
blocked(state_timeout, unblock, St) ->
    resume_from_blocked(St);
blocked(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data0) ->
    %% Only enqueue; flushing happens once we are `running' again.
    Data = collect_and_enqueue_query_requests(Request0, Data0),
    {keep_state, Data};
blocked(info, {flush, _Ref}, _Data) ->
    %% ignore stale timer
    keep_state_and_data;
blocked(info, {flush_metrics, Ref}, Data0 = #{metrics_tref := {_TRef, Ref}}) ->
    Data = flush_metrics(Data0#{metrics_tref := undefined}),
    {keep_state, Data};
blocked(info, {flush_metrics, _Ref}, _Data) ->
    keep_state_and_data;
blocked(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
    is_map_key(Pid, AsyncWorkers0)
->
    ?SLOG(info, #{msg => "async_worker_died", state => blocked, reason => Reason}),
    handle_async_worker_down(Data0, Pid);
blocked(info, Info, _Data) ->
    ?SLOG(error, #{msg => "unexpected_msg", state => blocked, info => Info}),
    keep_state_and_data.

%% Close the queue, zero the worker's gauges and leave the gproc pool.
terminate(_Reason, #{id := Id, index := Index, queue := Q}) ->
    _ = replayq:close(Q),
    emqx_resource_metrics:inflight_set(Id, Index, 0),
    %% since we want volatile queues, this will be 0 after
    %% termination.
    emqx_resource_metrics:queuing_set(Id, Index, 0),
    gproc_pool:disconnect_worker(Id, {Id, Index}),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
%%==============================================================================
%% Pick a buffer worker from the gproc pool `ID' using `KEY', bind it to
%% `PID' and evaluate `EXPR'. If no worker is available (no pid returned, or
%% `gproc_pool:pick_worker/2' raises `badarg'), a `worker_not_created'
%% resource error is returned. An `error:timeout' raised by `EXPR' itself
%% (see `pick_call/4') is turned into a `timeout' resource error; any other
%% exception propagates.
-define(PICK(ID, KEY, PID, EXPR),
    try
        case gproc_pool:pick_worker(ID, KEY) of
            PID when is_pid(PID) ->
                EXPR;
            _ ->
                ?RESOURCE_ERROR(worker_not_created, "resource not created")
        end
    catch
        error:badarg ->
            ?RESOURCE_ERROR(worker_not_created, "resource not created");
        error:timeout ->
            ?RESOURCE_ERROR(timeout, "call resource timeout")
    end
).

%% Synchronous request to a buffer worker. The monitor reference doubles
%% as a process alias (`{alias, reply_demonitor}'), so the reply
%% `{MRef, Response}' is delivered through it and the reply fun
%% `reply_call/2' only needs `MRef'.
%%
%% If the worker dies first, this raises `{worker_down, Reason}' (not
%% caught by `?PICK'). On timeout, the monitor/alias is removed and a reply
%% that raced with the timeout is still accepted; otherwise `error(timeout)'
%% is raised and converted by `?PICK' into a timeout resource error.
pick_call(Id, Key, Query = {_, _, QueryOpts}, Timeout) ->
    ?PICK(Id, Key, Pid, begin
        MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
        ReplyTo = {fun ?MODULE:reply_call/2, [MRef]},
        erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)),
        receive
            {MRef, Response} ->
                erlang:demonitor(MRef, [flush]),
                maybe_reply_to(Response, QueryOpts);
            {'DOWN', MRef, process, Pid, Reason} ->
                error({worker_down, Reason})
        after Timeout ->
            erlang:demonitor(MRef, [flush]),
            receive
                {MRef, Response} ->
                    maybe_reply_to(Response, QueryOpts)
            after 0 ->
                error(timeout)
            end
        end
    end).

%% Asynchronous request to a buffer worker: the caller's own `reply_to'
%% (if any) travels with the request; returns `ok' once sent.
pick_cast(Id, Key, Query = {query, _Request, QueryOpts}) ->
    ?PICK(Id, Key, Pid, begin
        ReplyTo = maps:get(reply_to, QueryOpts, undefined),
        erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)),
        ok
    end).
%% Work through the inflight table after a `blocked' period, one entry at
%% a time.
%%
%% `inflight_get_first_retriable/2' yields:
%%   * `none' - nothing left to retry: switch to `running', unless the
%%     inflight window is still full, in which case stay `blocked';
%%   * `{expired, Ref, Batch}' - every request of the entry has expired:
%%     remove it and continue with the next entry;
%%   * `{single, ...}' / `{batch, ...}' - retry it synchronously through
%%     `retry_inflight_sync/3', after dropping the expired part of a batch.
%%
%% For fully expired entries, only the process that actually removes the
%% entry from the table (`ack_inflight/3' returns `true') accounts for the
%% drop and notifies the callers. If the ack fails, the entry was already
%% completed concurrently and whoever removed it is responsible for the
%% reply; notifying again would deliver a duplicate `request_expired' and
%% skew the callers' own metrics.
resume_from_blocked(Data) ->
    ?tp(buffer_worker_resume_from_blocked_enter, #{}),
    #{inflight_tid := InflightTID} = Data,
    Now = now_(),
    case inflight_get_first_retriable(InflightTID, Now) of
        none ->
            case is_inflight_full(InflightTID) of
                true ->
                    {keep_state, Data};
                false ->
                    {next_state, running, Data}
            end;
        {expired, Ref, Batch} ->
            BufferWorkerPid = self(),
            IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid),
            NData =
                case IsAcked of
                    true ->
                        batch_reply_dropped(Batch, {error, request_expired}),
                        aggregate_counters(Data, #{dropped_expired => length(Batch)});
                    false ->
                        Data
                end,
            ?tp(buffer_worker_retry_expired, #{expired => Batch}),
            resume_from_blocked(NData);
        {single, Ref, Query} ->
            %% We retry msgs in inflight window sync, as if we send them
            %% async, they will be appended to the end of inflight window again.
            retry_inflight_sync(Ref, Query, Data);
        {batch, Ref, NotExpired, []} ->
            retry_inflight_sync(Ref, NotExpired, Data);
        {batch, Ref, NotExpired, Expired} ->
            NumExpired = length(Expired),
            %% Shrink the stored batch to the still-valid requests before retrying.
            ok = update_inflight_item(InflightTID, Ref, NotExpired, NumExpired),
            batch_reply_dropped(Expired, {error, request_expired}),
            NData = aggregate_counters(Data, #{dropped_expired => NumExpired}),
            ?tp(buffer_worker_retry_expired, #{expired => Expired}),
            %% We retry msgs in inflight window sync, as if we send them
            %% async, they will be appended to the end of inflight window again.
            retry_inflight_sync(Ref, NotExpired, NData)
    end.
%% Synchronously re-send the inflight query or batch stored under `Ref'
%% while the worker is `blocked'.
%%
%% On `nack' (the resource still cannot take it), the entry stays in the
%% inflight table and the `unblock' state timeout is re-armed. On `ack',
%% the entry is removed from the inflight table. Only if this process was
%% the one that removed it (`ack_inflight/3' returns `true') are the
%% deferred post-actions run and the counter deltas aggregated. Then the
%% next retriable entry is processed.
%%
%% Gating the counters on the ack avoids double counting. An inflight
%% request may complete concurrently with the retry, and both share the
%% same `Ref', so whichever side removes the entry from the table is the
%% one that accounts for it.
retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
    #{
        id := Id,
        inflight_tid := InflightTID,
        index := Index,
        resume_interval := ResumeT
    } = Data0,
    ?tp(buffer_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}),
    QueryOpts = #{simple_query => false},
    Result = call_query(force_sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
    {ShouldAck, PostFn, DeltaCounters} =
        case QueryOrBatch of
            ?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) ->
                Reply = ?REPLY(ReplyTo, HasBeenSent, Result),
                reply_caller_defer_metrics(Id, Reply, QueryOpts);
            [?QUERY(_, _, _, _) | _] = Batch ->
                batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts)
        end,
    case ShouldAck of
        %% Send failed because resource is down
        nack ->
            PostFn(),
            Data1 = aggregate_counters(Data0, DeltaCounters),
            ?tp(
                buffer_worker_retry_inflight_failed,
                #{
                    ref => Ref,
                    query_or_batch => QueryOrBatch,
                    result => Result
                }
            ),
            {keep_state, Data1, {state_timeout, ResumeT, unblock}};
        %% Send ok or failed but the resource is working
        ack ->
            BufferWorkerPid = self(),
            IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid),
            Data1 =
                case IsAcked of
                    true ->
                        PostFn(),
                        aggregate_counters(Data0, DeltaCounters);
                    false ->
                        %% Completed concurrently by someone else, who
                        %% already accounted for it.
                        Data0
                end,
            ?tp(
                buffer_worker_retry_inflight_succeeded,
                #{
                    ref => Ref,
                    query_or_batch => QueryOrBatch
                }
            ),
            resume_from_blocked(Data1)
    end.

%% Called during the `running' state only.
%% Enqueue this request (plus any others already waiting in the mailbox)
%% and flush if a full batch is available.
-spec handle_query_requests(?SEND_REQ(request_from(), request()), data()) ->
    gen_statem:event_handler_result(state(), data()).
handle_query_requests(Request0, Data0) ->
    Data = collect_and_enqueue_query_requests(Request0, Data0),
    maybe_flush(Data).
%% Drain up to `?COLLECT_REQ_LIMIT' pending requests (starting with
%% `Request0'), convert them to `?QUERY' tuples and append them to the
%% queue. Requests rejected because the queue is full are answered with
%% `{error, buffer_overflow}'.
collect_and_enqueue_query_requests(Request0, Data0) ->
    #{
        id := Id,
        index := Index,
        queue := Q
    } = Data0,
    Requests = collect_requests([Request0], ?COLLECT_REQ_LIMIT),
    Queries =
        lists:map(
            fun
                (?SEND_REQ(undefined = _ReplyTo, {query, Req, Opts})) ->
                    %% No direct reply target: fall back to the
                    %% `async_reply_fun' query option, if any.
                    ReplyFun = maps:get(async_reply_fun, Opts, undefined),
                    HasBeenSent = false,
                    ExpireAt = maps:get(expire_at, Opts),
                    ?QUERY(ReplyFun, Req, HasBeenSent, ExpireAt);
                (?SEND_REQ(ReplyTo, {query, Req, Opts})) ->
                    HasBeenSent = false,
                    ExpireAt = maps:get(expire_at, Opts),
                    ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt)
            end,
            Requests
        ),
    {Overflown, NewQ, DeltaCounters} = append_queue(Id, Index, Q, Queries),
    ok = reply_overflown(Overflown),
    aggregate_counters(Data0#{queue := NewQ}, DeltaCounters).

reply_overflown([]) ->
    ok;
reply_overflown([?QUERY(ReplyTo, _Req, _HasBeenSent, _ExpireAt) | More]) ->
    do_reply_caller(ReplyTo, {error, buffer_overflow}),
    reply_overflown(More).

%% Invoke the caller's reply callback with `Result' appended to its args.
%%
%% `{async_return, R}' is an early return to an async caller, so the retry
%% decision has to be made by the caller. It is unwrapped to `R' for both
%% reply-target shapes, `{F, Args}' and `{F, Args, Context}', so every
%% callback sees the same result form.
do_reply_caller(undefined, _Result) ->
    ok;
do_reply_caller({F, Args}, {async_return, Result}) ->
    do_reply_caller({F, Args}, Result);
do_reply_caller({F, Args}, Result) when is_function(F) ->
    _ = erlang:apply(F, Args ++ [Result]),
    ok;
do_reply_caller({F, Args, Context}, {async_return, Result}) ->
    do_reply_caller({F, Args, Context}, Result);
do_reply_caller({F, Args, _Context}, Result) when is_function(F) ->
    _ = erlang:apply(F, Args ++ [Result]),
    ok.

%% Flush right away once a full batch is queued; otherwise make sure a
%% flush timer is armed so a partial batch is sent later.
maybe_flush(Data0) ->
    #{
        batch_size := BatchSize,
        queue := Q
    } = Data0,
    QueueCount = queue_count(Q),
    case QueueCount >= BatchSize of
        true ->
            flush(Data0);
        false ->
            {keep_state, ensure_flush_timer(Data0)}
    end.

%% Called during the `running' state only.
-spec flush(data()) -> gen_statem:event_handler_result(state(), data()).
%% Pop up to `batch_size' requests from the queue and send them.
%%
%% Nothing is popped when the queue is empty or the inflight window is
%% full. Expired requests in the popped batch are answered with
%% `{error, request_expired}' and counted as `dropped_expired'; if all of
%% them expired, the pop is acked and the next batch is tried. The rest
%% are handed to `do_flush/2' with a fresh inflight `Ref'.
flush(Data0) ->
    #{
        batch_size := BatchSize,
        inflight_tid := InflightTID,
        queue := Q0
    } = Data0,
    Data1 = cancel_flush_timer(Data0),
    CurrentCount = queue_count(Q0),
    IsFull = is_inflight_full(InflightTID),
    ?tp_ignore_side_effects_in_prod(buffer_worker_flush, #{
        queued => CurrentCount,
        is_inflight_full => IsFull,
        inflight => inflight_count(InflightTID)
    }),
    case {CurrentCount, IsFull} of
        {0, _} ->
            ?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{
                inflight => inflight_count(InflightTID)
            }),
            {keep_state, Data1};
        {_, true} ->
            ?tp(buffer_worker_flush_but_inflight_full, #{}),
            {keep_state, Data1};
        {_, false} ->
            ?tp(buffer_worker_flush_before_pop, #{}),
            {Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}),
            Data2 = Data1#{queue := Q1},
            ?tp(buffer_worker_flush_before_sieve_expired, #{}),
            Now = now_(),
            %% if the request has expired, the caller is no longer
            %% waiting for a response.
            case sieve_expired_requests(Batch, Now) of
                {[], _AllExpired} ->
                    ok = replayq:ack(Q1, QAckRef),
                    NumExpired = length(Batch),
                    batch_reply_dropped(Batch, {error, request_expired}),
                    Data3 = aggregate_counters(Data2, #{dropped_expired => NumExpired}),
                    ?tp(buffer_worker_flush_all_expired, #{batch => Batch}),
                    flush(Data3);
                {NotExpired, Expired} ->
                    NumExpired = length(Expired),
                    batch_reply_dropped(Expired, {error, request_expired}),
                    Data3 = aggregate_counters(Data2, #{dropped_expired => NumExpired}),
                    IsBatch = (BatchSize > 1),
                    %% We *must* use the new queue, because we currently can't
                    %% `nack' a `pop'.
                    %% Maybe we could re-open the queue?
                    ?tp(
                        buffer_worker_flush_potentially_partial,
                        #{expired => Expired, not_expired => NotExpired}
                    ),
                    Ref = make_request_ref(),
                    do_flush(Data3, #{
                        is_batch => IsBatch,
                        batch => NotExpired,
                        ref => Ref,
                        ack_ref => QAckRef
                    })
            end
    end.

-spec do_flush(data(), #{
    is_batch := boolean(),
    batch := [queue_query()],
    ack_ref := replayq:ack_ref(),
    ref := inflight_key()
}) ->
    gen_statem:event_handler_result(state(), data()).
%% Send the popped (non-expired) requests to the resource using
%% `async_if_possible', then settle the outcome.
%%
%% In both outcomes the ReplayQ pop is acked, because a pop cannot be
%% nacked; from here on, retries are driven by the inflight table.
%%   * `nack': (re)insert the request/batch into the inflight table,
%%     marked retriable, record the async worker monitor (if any) and go
%%     to `blocked'.
%%   * `ack': remove the entry from the inflight table now, unless the
%%     result is an async return that is not an unrecoverable error (its
%%     callback acks later). Then re-trigger a flush while requests remain.
%%     In batch mode with only a partial batch left, arm the flush timer
%%     instead.
%%
%% First clause: non-batch mode (`batch_size' == 1), single request.
do_flush(
    #{queue := Q1} = Data0,
    #{
        is_batch := false,
        batch := Batch,
        ref := Ref,
        ack_ref := QAckRef
    }
) ->
    #{
        id := Id,
        index := Index,
        inflight_tid := InflightTID
    } = Data0,
    %% unwrap when not batching (i.e., batch size == 1)
    [?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) = Request] = Batch,
    QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
    Result = call_query(async_if_possible, Id, Index, Ref, Request, QueryOpts),
    Reply = ?REPLY(ReplyTo, HasBeenSent, Result),
    {ShouldAck, DeltaCounters} = reply_caller(Id, Reply, QueryOpts),
    Data1 = aggregate_counters(Data0, DeltaCounters),
    case ShouldAck of
        %% Failed; remove the request from the queue, as we cannot pop
        %% from it again, but we'll retry it using the inflight table.
        nack ->
            ok = replayq:ack(Q1, QAckRef),
            %% we set it atomically just below; a limitation of having
            %% to use tuples for atomic ets updates
            IsRetriable = true,
            AsyncWorkerMRef0 = undefined,
            InflightItem = ?INFLIGHT_ITEM(Ref, Request, IsRetriable, AsyncWorkerMRef0),
            %% we must append again to the table to ensure that the
            %% request will be retried (i.e., it might not have been
            %% inserted during `call_query' if the resource was down
            %% and/or if it was a sync request).
            inflight_append(InflightTID, InflightItem),
            mark_inflight_as_retriable(InflightTID, Ref),
            {Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result),
            store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef),
            ?tp(
                buffer_worker_flush_nack,
                #{
                    ref => Ref,
                    is_retriable => IsRetriable,
                    batch_or_query => Request,
                    result => Result
                }
            ),
            {next_state, blocked, Data2};
        %% Success; just ack.
        ack ->
            ok = replayq:ack(Q1, QAckRef),
            %% Async requests are acked later when the async worker
            %% calls the corresponding callback function.  Also, we
            %% must ensure the async worker is being monitored for
            %% such requests.
            IsUnrecoverableError = is_unrecoverable_error(Result),
            BufferWorkerPid = self(),
            case is_async_return(Result) of
                true when IsUnrecoverableError ->
                    ack_inflight(InflightTID, Ref, BufferWorkerPid);
                true ->
                    ok;
                false ->
                    ack_inflight(InflightTID, Ref, BufferWorkerPid)
            end,
            {Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result),
            store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef),
            ?tp(
                buffer_worker_flush_ack,
                #{
                    batch_or_query => Request,
                    result => Result
                }
            ),
            CurrentCount = queue_count(Q1),
            case CurrentCount > 0 of
                true ->
                    ?tp(buffer_worker_flush_ack_reflush, #{
                        batch_or_query => Request, result => Result, queue_count => CurrentCount
                    }),
                    %% Cast `flush' to ourselves to keep draining the queue.
                    flush_worker(self());
                false ->
                    ?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{
                        inflight => inflight_count(InflightTID)
                    }),
                    ok
            end,
            {keep_state, Data2}
    end;
%% Second clause: batch mode; the whole batch goes out in one `call_query'.
do_flush(#{queue := Q1} = Data0, #{
    is_batch := true,
    batch := Batch,
    ref := Ref,
    ack_ref := QAckRef
}) ->
    #{
        id := Id,
        index := Index,
        batch_size := BatchSize,
        inflight_tid := InflightTID
    } = Data0,
    QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
    Result = call_query(async_if_possible, Id, Index, Ref, Batch, QueryOpts),
    {ShouldAck, DeltaCounters} = batch_reply_caller(Id, Result, Batch, QueryOpts),
    Data1 = aggregate_counters(Data0, DeltaCounters),
    case ShouldAck of
        %% Failed; remove the request from the queue, as we cannot pop
        %% from it again, but we'll retry it using the inflight table.
        nack ->
            ok = replayq:ack(Q1, QAckRef),
            %% we set it atomically just below; a limitation of having
            %% to use tuples for atomic ets updates
            IsRetriable = true,
            AsyncWorkerMRef0 = undefined,
            InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef0),
            %% we must append again to the table to ensure that the
            %% request will be retried (i.e., it might not have been
            %% inserted during `call_query' if the resource was down
            %% and/or if it was a sync request).
            inflight_append(InflightTID, InflightItem),
            mark_inflight_as_retriable(InflightTID, Ref),
            {Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result),
            store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef),
            ?tp(
                buffer_worker_flush_nack,
                #{
                    ref => Ref,
                    is_retriable => IsRetriable,
                    batch_or_query => Batch,
                    result => Result
                }
            ),
            {next_state, blocked, Data2};
        %% Success; just ack.
        ack ->
            ok = replayq:ack(Q1, QAckRef),
            %% Async requests are acked later when the async worker
            %% calls the corresponding callback function.  Also, we
            %% must ensure the async worker is being monitored for
            %% such requests.
            IsUnrecoverableError = is_unrecoverable_error(Result),
            BufferWorkerPid = self(),
            case is_async_return(Result) of
                true when IsUnrecoverableError ->
                    ack_inflight(InflightTID, Ref, BufferWorkerPid);
                true ->
                    ok;
                false ->
                    ack_inflight(InflightTID, Ref, BufferWorkerPid)
            end,
            {Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result),
            store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef),
            CurrentCount = queue_count(Q1),
            ?tp(
                buffer_worker_flush_ack,
                #{
                    batch_or_query => Batch,
                    result => Result,
                    queue_count => CurrentCount
                }
            ),
            %% Re-flush immediately only when a full batch is waiting;
            %% a partial batch waits for the flush timer.
            Data3 =
                case {CurrentCount > 0, CurrentCount >= BatchSize} of
                    {false, _} ->
                        ?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{
                            inflight => inflight_count(InflightTID)
                        }),
                        Data2;
                    {true, true} ->
                        ?tp(buffer_worker_flush_ack_reflush, #{
                            batch_or_query => Batch,
                            result => Result,
                            queue_count => CurrentCount,
                            batch_size => BatchSize
                        }),
                        flush_worker(self()),
                        Data2;
                    {true, false} ->
                        ensure_flush_timer(Data2)
                end,
            {keep_state, Data3}
    end.

%% Reply to every caller in `Batch' with the shared `BatchResult', running
%% the deferred post-actions immediately; returns the ack/nack decision and
%% the counter deltas.
batch_reply_caller(Id, BatchResult, Batch, QueryOpts) ->
    {ShouldBlock, PostFn, DeltaCounters} =
        batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts),
    PostFn(),
    {ShouldBlock, DeltaCounters}.
%% Fan a single batch-level result out to every query in `Batch'
%% (`Mod:on_batch_query/3' answers the whole batch with one result) and
%% pass each through `reply_caller_defer_metrics/3', in batch order.
%%
%% Returns the ack/nack decision of the last reply (all replies share the
%% same result, so they are expected to agree), a fun running every
%% deferred post-action in batch order, and the summed counter deltas.
batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
    ToReply = fun(?QUERY(From, _Request, Sent, _ExpireAt)) ->
        ?REPLY(From, Sent, BatchResult)
    end,
    Replies = lists:map(ToReply, Batch),
    {PostFns, {ShouldAck, Counters}} =
        lists:mapfoldl(
            fun(Reply, {_PrevAck, CountersAcc}) ->
                {Ack, PostFn, Delta} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
                {PostFn, {Ack, merge_counters(CountersAcc, Delta)}}
            end,
            {ack, #{}},
            Replies
        ),
    RunAll = fun() -> lists:foreach(fun(F) -> F() end, PostFns) end,
    {ShouldAck, RunAll, Counters}.

%% Like `reply_caller_defer_metrics/3', but runs the post-action right away.
reply_caller(Id, Reply, QueryOpts) ->
    {Ack, RunPost, Delta} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
    _ = RunPost(),
    {Ack, Delta}.

%% Classify `Result' and reply to the caller only once the outcome is
%% final (see `handle_query_result_pure/3'):
%%   * an acked async return is answered now only if it is an
%%     unrecoverable error (otherwise the async callback answers later);
%%   * simple queries are always answered immediately;
%%   * otherwise answer on `ack' and stay silent on `nack' (it will be retried).
reply_caller_defer_metrics(Id, ?REPLY(undefined, HasBeenSent, Result), _QueryOpts) ->
    handle_query_result_pure(Id, Result, HasBeenSent);
reply_caller_defer_metrics(Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts) ->
    IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
    IsUnrecoverableError = is_unrecoverable_error(Result),
    {ShouldAck, _, _} = Outcome = handle_query_result_pure(Id, Result, HasBeenSent),
    ReplyNow =
        case Result of
            {async_return, _} when ShouldAck =:= ack ->
                IsUnrecoverableError =:= true;
            _ ->
                IsSimpleQuery =:= true orelse ShouldAck =:= ack
        end,
    case ReplyNow of
        true -> ok = do_reply_caller(ReplyTo, Result);
        false -> ok
    end,
    Outcome.

%% This is basically used only by rule actions.  To avoid rule action metrics from
%% becoming inconsistent when we drop messages, we need a way to signal rule engine that
%% this action has reached a conclusion.
-spec reply_dropped(reply_fun(), {error, late_reply | request_expired}) -> ok.
reply_dropped({Fn, Args, #{reply_dropped := true}}, Result) when
    is_function(Fn), is_list(Args)
->
    %% Run the callback in the pool: bumping metrics inside the buffer
    %% worker itself is costly.
    emqx_pool:async_submit(Fn, Args ++ [Result]),
    ok;
reply_dropped(_ReplyTo, _Result) ->
    ok.

%% Notify every caller in `Batch' (in order) that its request was dropped.
-spec batch_reply_dropped([queue_query()], {error, late_reply | request_expired}) -> ok.
batch_reply_dropped([], _Result) ->
    ok;
batch_reply_dropped([?QUERY(ReplyTo, _CoreReq, _HasBeenSent, _ExpireAt) | Rest], Result) ->
    reply_dropped(ReplyTo, Result),
    batch_reply_dropped(Rest, Result).

%% Only `simple_{,a}sync_query' call this, i.e. outside any buffer worker,
%% so the counters are bumped directly here.
handle_query_result(Id, Result, HasBeenSent) ->
    {Decision, RunPost, Delta} = handle_query_result_pure(Id, Result, HasBeenSent),
    _ = RunPost(),
    bump_counters(Id, Delta),
    Decision.

%% We should always retry (nack), except when:
%%   * resource is not found
%%   * resource is stopped
%%   * the result is a success (or at least a delayed result)
%% We also retry even sync requests.  In that case, we shouldn't reply to
%% the caller until one of those final results above happens.
-spec handle_query_result_pure(id(), term(), HasBeenSent :: boolean()) ->
    {ack | nack, function(), counters()}.
%% Classify a query result into an ack/nack decision, a deferred
%% post-action (logging or result assertion) and the counter deltas.
%%
%% Clause order matters: the specific `?RESOURCE_ERROR_M' shapes must be
%% matched before the generic `{error, Reason}' clause.
%%
%% The `resource_exception' log carries `id => Id' like every other error
%% log here; without it, the exception cannot be attributed to a resource.
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(exception, Msg), _HasBeenSent) ->
    PostFn = fun() ->
        ?SLOG(error, #{id => Id, msg => "resource_exception", info => emqx_utils:redact(Msg)}),
        ok
    end,
    {nack, PostFn, #{}};
handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent) when
    NotWorking =:= not_connected; NotWorking =:= blocked
->
    {nack, fun() -> ok end, #{}};
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent) ->
    PostFn = fun() ->
        ?SLOG(error, #{id => Id, msg => "resource_not_found", info => Msg}),
        ok
    end,
    {ack, PostFn, #{dropped_resource_not_found => 1}};
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent) ->
    PostFn = fun() ->
        ?SLOG(error, #{id => Id, msg => "resource_stopped", info => Msg}),
        ok
    end,
    {ack, PostFn, #{dropped_resource_stopped => 1}};
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) ->
    PostFn = fun() ->
        ?SLOG(error, #{id => Id, msg => "other_resource_error", reason => Reason}),
        ok
    end,
    {nack, PostFn, #{}};
handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
    case is_unrecoverable_error(Error) of
        true ->
            %% Final failure: ack (drop) and count it.
            PostFn = fun() ->
                ?SLOG(error, #{id => Id, msg => "unrecoverable_error", reason => Reason}),
                ok
            end,
            Counters =
                case HasBeenSent of
                    true -> #{retried_failed => 1};
                    false -> #{failed => 1}
                end,
            {ack, PostFn, Counters};
        false ->
            %% Recoverable: nack so the request is retried.
            PostFn = fun() ->
                ?SLOG(error, #{id => Id, msg => "send_error", reason => Reason}),
                ok
            end,
            {nack, PostFn, #{}}
    end;
handle_query_result_pure(Id, {async_return, Result}, HasBeenSent) ->
    handle_query_async_result_pure(Id, Result, HasBeenSent);
handle_query_result_pure(_Id, Result, HasBeenSent) ->
    %% Anything else is treated as success; `assert_ok_result/1' checks it.
    PostFn = fun() ->
        assert_ok_result(Result),
        ok
    end,
    Counters =
        case HasBeenSent of
            true -> #{retried_success => 1};
            false -> #{success => 1}
        end,
    {ack, PostFn, Counters}.

-spec handle_query_async_result_pure(id(), term(), HasBeenSent :: boolean()) ->
    {ack | nack, function(), counters()}.
%% Classify the immediate return of an async call. `ok' and `{ok, Pid}'
%% are acked with no counters, since the final outcome is reported later.
%% Errors are acked and counted when unrecoverable, and nacked for retry
%% otherwise.
handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
    case is_unrecoverable_error(Error) of
        false ->
            LogSendError = fun() ->
                ?SLOG(error, #{id => Id, msg => "async_send_error", reason => Reason}),
                ok
            end,
            {nack, LogSendError, #{}};
        true ->
            LogUnrecoverable = fun() ->
                ?SLOG(error, #{id => Id, msg => "unrecoverable_error", reason => Reason}),
                ok
            end,
            FailedKey =
                case HasBeenSent of
                    true -> retried_failed;
                    false -> failed
                end,
            {ack, LogUnrecoverable, #{FailedKey => 1}}
    end;
handle_query_async_result_pure(_Id, ok, _HasBeenSent) ->
    {ack, fun() -> ok end, #{}};
handle_query_async_result_pure(_Id, {ok, Pid}, _HasBeenSent) when is_pid(Pid) ->
    {ack, fun() -> ok end, #{}}.

%% Add `DeltaCounters' to the counters accumulated in the worker data.
-spec aggregate_counters(data(), counters()) -> data().
aggregate_counters(Data = #{counters := OldCounters}, DeltaCounters) ->
    Data#{counters := merge_counters(OldCounters, DeltaCounters)}.

%% Key-wise sum of two counter maps; keys present in only one map keep
%% their value.
-spec merge_counters(counters(), counters()) -> counters().
merge_counters(OldCounters, DeltaCounters) ->
    maps:merge_with(fun(_Metric, Old, Delta) -> Old + Delta end, OldCounters, DeltaCounters).

%% Push the accumulated counters and current gauges to the metrics
%% registry, log expired drops, reset the counters and re-arm the timer.
-spec flush_metrics(data()) -> data().
flush_metrics(Data = #{id := Id, counters := Pending}) ->
    bump_counters(Id, Pending),
    set_gauges(Data),
    log_expired_message_count(Data),
    Reset = Data#{counters := #{}},
    ensure_metrics_flush_timer(Reset).

%% Arm the periodic `{flush_metrics, Token}' timer; only valid when no
%% metrics timer is currently registered.
-spec ensure_metrics_flush_timer(data()) -> data().
ensure_metrics_flush_timer(Data = #{metrics_tref := undefined, metrics_flush_interval := Interval}) ->
    Token = make_ref(),
    TimerRef = erlang:send_after(Interval, self(), {flush_metrics, Token}),
    Data#{metrics_tref := {TimerRef, Token}}.

%% Increment the resource metric matching each counter key.
-spec bump_counters(id(), counters()) -> ok.
bump_counters(Id, Counters) ->
    do_bump_counters(maps:iterator(Counters), Id).

do_bump_counters(Iter0, Id) ->
    case maps:next(Iter0) of
        none ->
            ok;
        {Metric, Amount, Iter} ->
            do_bump_counters1(Metric, Amount, Id),
            do_bump_counters(Iter, Id)
    end.
%% Forward one counter delta to the matching `emqx_resource_metrics'
%% incrementer. Counter names not listed here raise `function_clause'.
do_bump_counters1(success, Delta, ResourceId) ->
    emqx_resource_metrics:success_inc(ResourceId, Delta);
do_bump_counters1(retried_success, Delta, ResourceId) ->
    emqx_resource_metrics:retried_success_inc(ResourceId, Delta);
do_bump_counters1(failed, Delta, ResourceId) ->
    emqx_resource_metrics:failed_inc(ResourceId, Delta);
do_bump_counters1(retried_failed, Delta, ResourceId) ->
    emqx_resource_metrics:retried_failed_inc(ResourceId, Delta);
do_bump_counters1(dropped_expired, Delta, ResourceId) ->
    emqx_resource_metrics:dropped_expired_inc(ResourceId, Delta);
do_bump_counters1(dropped_queue_full, Delta, ResourceId) ->
    emqx_resource_metrics:dropped_queue_full_inc(ResourceId, Delta);
do_bump_counters1(dropped_resource_not_found, Delta, ResourceId) ->
    emqx_resource_metrics:dropped_resource_not_found_inc(ResourceId, Delta);
do_bump_counters1(dropped_resource_stopped, Delta, ResourceId) ->
    emqx_resource_metrics:dropped_resource_stopped_inc(ResourceId, Delta).

%% Emit an info log when requests were dropped because they expired since
%% the last metrics flush.
-spec log_expired_message_count(data()) -> ok.
log_expired_message_count(#{id := Id, index := Index, counters := Counters}) ->
    case maps:get(dropped_expired, Counters, 0) of
        ExpiredCount when ExpiredCount > 0 ->
            ?SLOG(info, #{
                msg => "buffer_worker_dropped_expired_messages",
                resource_id => Id,
                worker_index => Index,
                expired_count => ExpiredCount
            }),
            ok;
        _ ->
            ok
    end.

%% Publish the current queue length and inflight message count as gauges
%% for this worker.
-spec set_gauges(data()) -> ok.
set_gauges(#{id := Id, index := Index, queue := Queue, inflight_tid := InflightTID}) ->
    QueueLen = queue_count(Queue),
    emqx_resource_metrics:queuing_set(Id, Index, QueueLen),
    InflightMsgs = inflight_num_msgs(InflightTID),
    emqx_resource_metrics:inflight_set(Id, Index, InflightMsgs),
    ok.

%% An async worker died: forget its monitor, make every inflight item it
%% owned retriable, and move to the `blocked' state.
handle_async_worker_down(Data0, Pid) ->
    #{async_workers := KnownWorkers0} = Data0,
    {DeadWorkerMRef, KnownWorkers} = maps:take(Pid, KnownWorkers0),
    Data = Data0#{async_workers := KnownWorkers},
    mark_inflight_items_as_retriable(Data, DeadWorkerMRef),
    {next_state, blocked, Data}.
-spec call_query(force_sync | async_if_possible, _, _, _, _, _) -> _.
%% Entry point for executing a query (or a batch) against a resource.
%%
%% Looks up the cached state of the connector that owns `Id'. The query is
%% refused early when that connector is stopped, has an unhealthy target,
%% or is unknown. Otherwise the query is handed to do_call_query/7.
call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
    ?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM}),
    ConnectorId = extract_connector_id(Id),
    case emqx_resource_manager:lookup_cached(ConnectorId) of
        %% This seems to be the only place where the `rm_status_stopped'
        %% status matters, to distinguish it from the `disconnected' status.
        {ok, _Group, #{status := ?rm_status_stopped}} ->
            ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
        {ok, _Group, #{status := ?status_connecting, error := unhealthy_target}} ->
            {error, {unrecoverable_error, unhealthy_target}};
        {ok, _Group, Resource} ->
            do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource);
        {error, not_found} ->
            ?RESOURCE_ERROR(not_found, "resource not found")
    end.

%% Map a channel id to the id of the connector it belongs to, e.g.
%%   action:kafka_producer:myproducer1:connector:kafka_producer:myclient1
%% becomes `connector:kafka_producer:myclient1'. Any other id (including
%% non-binary ids) is returned unchanged.
extract_connector_id(Id) when is_binary(Id) ->
    Segments = binary:split(Id, <<":">>, [global]),
    case Segments of
        [
            _ChannelGlobalType,
            _ChannelSubType,
            _ChannelName,
            <<"connector">>,
            ConnectorType,
            ConnectorName
        ] ->
            <<"connector:", ConnectorType/binary, ":", ConnectorName/binary>>;
        _ ->
            Id
    end;
extract_connector_id(Id) ->
    Id.

%% An id is a channel id exactly when extract_connector_id/1 rewrites it.
is_channel_id(Id) ->
    not (extract_connector_id(Id) =:= Id).

%% Check whether the channel is installed in the connector state. There is
%% no need to query the connector if the channel is not installed, because
%% the query would fail anyway.
pre_query_channel_check({Id, _} = _Request, Channels, QueryOpts) when
    is_map_key(Id, Channels)
->
    #{Id := ChannelStatus} = Channels,
    IsAdded = emqx_resource_manager:channel_status_is_channel_added(ChannelStatus),
    case IsAdded of
        true -> ok;
        false -> error_if_channel_is_not_installed(Id, QueryOpts)
    end;
pre_query_channel_check({Id, _} = _Request, _Channels, QueryOpts) ->
    error_if_channel_is_not_installed(Id, QueryOpts);
pre_query_channel_check(_Request, _Channels, _QueryOpts) ->
    ok.

%% Fail with a recoverable error if the channel is not installed and there
%% are buffer workers involved, so that the operation can be retried.
%% Otherwise (simple queries) the error is unrecoverable. It is
%% emqx_resource_manager's responsibility to ensure that the channel
%% installation is retried. Non-channel ids always pass.
error_if_channel_is_not_installed(Id, QueryOpts) ->
    IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
    case is_channel_id(Id) of
        false ->
            ok;
        true ->
            Reason = iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id])),
            case IsSimpleQuery of
                true -> {error, {unrecoverable_error, Reason}};
                _ -> {error, {recoverable_error, Reason}}
            end
    end.

%% Apply the query to the resource when it is allowed to receive it:
%% either the query or the resource itself uses an internal-buffer query
%% mode (then it is sent regardless of the connection status), or the
%% resource is connected. Otherwise a `not_connected' resource error is
%% returned.
do_call_query(QM, Id, Index, Ref, Query, #{query_mode := ReqQM} = QueryOpts, Resource) when
    ReqQM =:= simple_sync_internal_buffer; ReqQM =:= simple_async_internal_buffer
->
    %% The query overrides the query mode of the resource: send even in
    %% disconnected state.
    ?tp(simple_query_override, #{query_mode => ReqQM}),
    #{
        mod := CbMod,
        state := ResState,
        callback_mode := CbMode,
        added_channels := AddedChannels
    } = Resource,
    apply_query_fun(
        call_mode(QM, CbMode), CbMod, Id, Index, Ref, Query, ResState, AddedChannels, QueryOpts
    );
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when
    ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer
->
    %% The connector buffers internally: send even in disconnected state.
    #{
        mod := CbMod,
        state := ResState,
        callback_mode := CbMode,
        added_channels := AddedChannels
    } = Resource,
    apply_query_fun(
        call_mode(QM, CbMode), CbMod, Id, Index, Ref, Query, ResState, AddedChannels, QueryOpts
    );
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{status := connected} = Resource) ->
    %% Buffer worker calls and other simple queries are only applied when
    %% the resource is connected.
    #{
        mod := CbMod,
        state := ResState,
        callback_mode := CbMode,
        added_channels := AddedChannels
    } = Resource,
    apply_query_fun(
        call_mode(QM, CbMode), CbMod, Id, Index, Ref, Query, ResState, AddedChannels, QueryOpts
    );
do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) ->
    ?RESOURCE_ERROR(not_connected, "resource not connected").
%% Evaluate EXPR (a call into the connector callback module) and turn any
%% exception it raises into an error return value instead of letting it
%% propagate.
%%
%% NAME and REQ are only used to enrich the `exception' resource error.
%% NOTE: the expansion also references a variable named `Id', which must be
%% bound at every expansion site (all uses in apply_query_fun/9 bind it as a
%% function parameter).
-define(APPLY_RESOURCE(NAME, EXPR, REQ),
    try
        %% if the callback module (connector) wants to return an error that
        %% makes the current resource go into the `blocked` state, it should
        %% return `{error, {recoverable_error, Reason}}`
        EXPR
    catch
        %% For convenience, and to keep the callback code cleaner, an error
        %% exception in one of the two following formats is translated to the
        %% corresponding return value. The receiver of the return value
        %% recognizes these special formats and uses them to decide whether a
        %% request should be retried.
        error:{unrecoverable_error, Msg} ->
            {error, {unrecoverable_error, Msg}};
        error:{recoverable_error, Msg} ->
            {error, {recoverable_error, Msg}};
        %% Any other exception becomes an `exception' resource error.
        ERR:REASON:STACKTRACE ->
            ?RESOURCE_ERROR(exception, #{
                name => NAME,
                id => Id,
                request => REQ,
                error => {ERR, REASON},
                stacktrace => STACKTRACE
            })
    end
).

%% Invoke the connector callback for a single query or a batch, in sync or
%% async call mode.
%%
%% Sync clauses call `Mod:on_query/3' or `Mod:on_batch_query/3' and forward
%% the result to the `reply_to' callback (if any) through maybe_reply_to/2.
%%
%% Async clauses first record the request in the inflight table (a no-op
%% when `inflight_tid' is undefined). They then call `Mod:on_query_async/4'
%% or `Mod:on_batch_query_async/4' with a reply callback whose context holds
%% a minimized copy of the query or batch. The connector's return value is
%% wrapped as `{async_return, Result}'.
%%
%% In every clause the channel is checked first (pre_query_channel_check/3),
%% and its error is returned without calling the connector.
%% NOTE(review): in the async clauses the inflight entry is appended before
%% the channel check. When the check fails, this function does not remove
%% the entry; presumably the caller acks or nacks it by `Ref'. Confirm.
apply_query_fun(
    sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, Channels, QueryOpts
) ->
    ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}),
    maybe_reply_to(
        ?APPLY_RESOURCE(
            call_query,
            begin
                case pre_query_channel_check(Request, Channels, QueryOpts) of
                    ok ->
                        Mod:on_query(extract_connector_id(Id), Request, ResSt);
                    Error ->
                        Error
                end
            end,
            Request
        ),
        QueryOpts
    );
apply_query_fun(
    async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, Channels, QueryOpts
) ->
    ?tp(call_query_async, #{
        id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
    }),
    InflightTID = maps:get(inflight_tid, QueryOpts, undefined),
    ?APPLY_RESOURCE(
        call_query_async,
        begin
            ReplyFun = fun ?MODULE:handle_async_reply/2,
            %% Everything the reply handler needs; the query payload is
            %% stripped by minimize/1 (except in TEST builds).
            ReplyContext = #{
                buffer_worker => self(),
                resource_id => Id,
                worker_index => Index,
                inflight_tid => InflightTID,
                request_ref => Ref,
                query_opts => QueryOpts,
                min_query => minimize(Query)
            },
            %% Not retriable and not bound to any async worker yet.
            IsRetriable = false,
            AsyncWorkerMRef = undefined,
            InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef),
            ok = inflight_append(InflightTID, InflightItem),
            case pre_query_channel_check(Request, Channels, QueryOpts) of
                ok ->
                    Result = Mod:on_query_async(
                        extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, ResSt
                    ),
                    {async_return, Result};
                Error ->
                    maybe_reply_to(Error, QueryOpts)
            end
        end,
        Request
    );
apply_query_fun(
    sync,
    Mod,
    Id,
    _Index,
    _Ref,
    [?QUERY(_, FirstRequest, _, _) | _] = Batch,
    ResSt,
    Channels,
    QueryOpts
) ->
    ?tp(call_batch_query, #{
        id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
    }),
    Requests = lists:map(fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch),
    maybe_reply_to(
        ?APPLY_RESOURCE(
            call_batch_query,
            begin
                %% The channel is checked using the first request of the batch.
                case pre_query_channel_check(FirstRequest, Channels, QueryOpts) of
                    ok ->
                        Mod:on_batch_query(extract_connector_id(Id), Requests, ResSt);
                    Error ->
                        Error
                end
            end,
            Batch
        ),
        QueryOpts
    );
apply_query_fun(
    async,
    Mod,
    Id,
    Index,
    Ref,
    [?QUERY(_, FirstRequest, _, _) | _] = Batch,
    ResSt,
    Channels,
    QueryOpts
) ->
    ?tp(call_batch_query_async, #{
        id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async
    }),
    InflightTID = maps:get(inflight_tid, QueryOpts, undefined),
    ?APPLY_RESOURCE(
        call_batch_query_async,
        begin
            ReplyFun = fun ?MODULE:handle_async_batch_reply/2,
            ReplyContext = #{
                buffer_worker => self(),
                resource_id => Id,
                worker_index => Index,
                inflight_tid => InflightTID,
                request_ref => Ref,
                query_opts => QueryOpts,
                min_batch => minimize(Batch)
            },
            Requests = lists:map(
                fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch
            ),
            IsRetriable = false,
            AsyncWorkerMRef = undefined,
            InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef),
            ok = inflight_append(InflightTID, InflightItem),
            case pre_query_channel_check(FirstRequest, Channels, QueryOpts) of
                ok ->
                    Result = Mod:on_batch_query_async(
                        extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, ResSt
                    ),
                    {async_return, Result};
                Error ->
                    maybe_reply_to(Error, QueryOpts)
            end
        end,
        Batch
    ).
%% If the query options carry a `reply_to' callback, send `Result' to it.
%% `Result' is returned unchanged either way.
maybe_reply_to(Result, QueryOpts) ->
    case QueryOpts of
        #{reply_to := ReplyTo} ->
            do_reply_caller(ReplyTo, Result),
            Result;
        _ ->
            Result
    end.

%% Reply callback handed to `Mod:on_query_async/4'. It runs in the process
%% that invokes the callback, not necessarily the buffer worker. Replies
%% for refs unknown to the inflight table are discarded.
handle_async_reply(
    #{
        request_ref := Ref,
        inflight_tid := InflightTID,
        query_opts := Opts
    } = ReplyContext,
    Result
) ->
    Verdict = maybe_handle_unknown_async_reply(InflightTID, Ref, Opts),
    case Verdict of
        continue -> handle_async_reply1(ReplyContext, Result);
        discard -> ok
    end.

%% Drop replies that arrive after the request's deadline. Otherwise hand
%% over to do_handle_async_reply/2.
handle_async_reply1(
    #{
        request_ref := Ref,
        inflight_tid := InflightTID,
        resource_id := Id,
        buffer_worker := BufferWorkerPid,
        min_query := ?QUERY(ReplyTo, _, _, ExpireAt) = _Query
    } = ReplyContext,
    Result
) ->
    ?tp(
        handle_async_reply_enter,
        #{batch_or_query => [_Query], ref => Ref, result => Result}
    ),
    case is_expired(ExpireAt, now_()) of
        false ->
            do_handle_async_reply(ReplyContext, Result);
        true ->
            %% Only the call that actually removed the inflight entry bumps
            %% the metric and notifies the caller.
            case ack_inflight(InflightTID, Ref, BufferWorkerPid) of
                true ->
                    %% Metrics are bumped directly here because this code
                    %% does not run inside the buffer worker.
                    emqx_resource_metrics:late_reply_inc(Id),
                    reply_dropped(ReplyTo, {error, late_reply});
                false ->
                    ok
            end,
            ?tp(handle_async_reply_expired, #{expired => [_Query]}),
            ok
    end.

%% Classify the reply. On `nack' the inflight item is marked retriable and
%% the buffer worker is blocked (returns `blocked'). On `ack' the item is
%% removed and the deferred side effects are run (returns `ok').
do_handle_async_reply(
    #{
        query_opts := QueryOpts,
        resource_id := Id,
        request_ref := Ref,
        buffer_worker := BufferWorkerPid,
        inflight_tid := InflightTID,
        min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query
    },
    Result
) ->
    %% NOTE: 'inflight' is the count of messages that were sent async
    %% but received no ACK, NOT the number of messages queued in the
    %% inflight window.
    {Action, PostFn, DeltaCounters} =
        reply_caller_defer_metrics(Id, ?REPLY(ReplyTo, Sent, Result), QueryOpts),
    ?tp(handle_async_reply, #{
        action => Action,
        batch_or_query => [_Query],
        ref => Ref,
        result => Result
    }),
    case Action of
        ack ->
            ok = do_async_ack(
                InflightTID, Ref, Id, PostFn, BufferWorkerPid, DeltaCounters, QueryOpts
            );
        nack ->
            %% Keep retrying.
            ok = mark_inflight_as_retriable(InflightTID, Ref),
            ok = ?MODULE:block(BufferWorkerPid),
            blocked
    end.
%% Reply callback handed to `Mod:on_batch_query_async/4'. Replies for refs
%% unknown to the inflight table are discarded.
handle_async_batch_reply(
    #{
        inflight_tid := InflightTID,
        request_ref := Ref,
        query_opts := Opts
    } = ReplyContext,
    Result
) ->
    Verdict = maybe_handle_unknown_async_reply(InflightTID, Ref, Opts),
    case Verdict of
        continue -> handle_async_batch_reply1(ReplyContext, Result);
        discard -> ok
    end.

%% Fast path when nothing in the (minimized) batch has expired. Otherwise
%% the full batch is looked up in the inflight table and
%% handle_async_batch_reply2/4 deals with the expired part.
handle_async_batch_reply1(
    #{
        inflight_tid := InflightTID,
        request_ref := Ref,
        min_batch := Batch
    } = ReplyContext,
    Result
) ->
    ?tp(
        handle_async_reply_enter,
        #{batch_or_query => Batch, ref => Ref, result => Result}
    ),
    Now = now_(),
    {_NotExpired, Expired} = sieve_expired_requests(Batch, Now),
    case Expired of
        [] ->
            %% This is the critical code path: avoid an ets:lookup here
            %% because the batch can be quite big.
            do_handle_async_batch_reply(ReplyContext, Result);
        [_ | _] ->
            %% At least one request has expired. The batch in the reply
            %% context is minimized, so it cannot be written back to the
            %% inflight table; fetch the real batch instead.
            ?tp(handle_async_reply_expired, #{expired => Expired}),
            InflightRows = ets:lookup(InflightTID, Ref),
            handle_async_batch_reply2(InflightRows, ReplyContext, Result, Now)
    end.
%% Handle a batch reply in which some requests have expired.
%%
%% The first argument is the result of `ets:lookup/2' on the inflight
%% table for the batch's ref. Expired requests are counted as late replies
%% and their callers get `{error, late_reply}'. If every request has
%% expired, the inflight entry is acked. Otherwise the entry is shrunk to
%% the non-expired requests, which are then processed by
%% do_handle_async_batch_reply/2.
handle_async_batch_reply2([], _, _, _) ->
    %% this usually should never happen unless the async callback is being evaluated concurrently
    ok;
handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
    ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _AsyncWorkerMRef) = Inflight,
    #{
        resource_id := Id,
        buffer_worker := BufferWorkerPid,
        inflight_tid := InflightTID,
        request_ref := Ref,
        min_batch := Batch
    } = ReplyContext,
    %% All batch items share the same HasBeenSent flag.
    %% So we take the original flag from the ReplyContext batch and put
    %% it back into the batch found in the inflight table, whose flag was
    %% forced to `true' when it was inserted (inflight_append/2 applies
    %% mark_as_sent/1).
    [?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt) | _] = Batch,
    {RealNotExpired0, RealExpired} = sieve_expired_requests(RealBatch, Now),
    RealNotExpired =
        lists:map(
            fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt)) ->
                ?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt)
            end,
            RealNotExpired0
        ),
    NumExpired = length(RealExpired),
    %% evaluate metrics calls here since we're not inside the buffer
    %% worker
    emqx_resource_metrics:late_reply_inc(Id, NumExpired),
    batch_reply_dropped(RealExpired, {error, late_reply}),
    case RealNotExpired of
        [] ->
            %% all expired, no need to update back the inflight batch
            _ = ack_inflight(InflightTID, Ref, BufferWorkerPid),
            ok;
        _ ->
            %% some queries are not expired, put them back to the inflight batch
            %% so it can be either acked now or retried later
            ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired),
            ?tp_ignore_side_effects_in_prod(
                handle_async_reply_partially_expired,
                #{
                    inflight_count => inflight_count(InflightTID),
                    num_inflight_messages => inflight_num_msgs(InflightTID)
                }
            ),
            %% Continue with the (non-minimized) surviving requests.
            do_handle_async_batch_reply(ReplyContext#{min_batch := RealNotExpired}, Result)
    end.
%% Classify a batch reply. On `nack' the inflight batch is marked
%% retriable and the buffer worker is blocked (returns `blocked'). On `ack'
%% the batch is removed and the deferred side effects are run (returns
%% `ok').
do_handle_async_batch_reply(
    #{
        buffer_worker := BufferWorkerPid,
        resource_id := Id,
        inflight_tid := InflightTID,
        request_ref := Ref,
        min_batch := Batch,
        query_opts := QueryOpts
    },
    Result
) ->
    {Action, PostFn, DeltaCounters} =
        batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts),
    ?tp(handle_async_reply, #{
        action => Action,
        batch_or_query => Batch,
        ref => Ref,
        result => Result
    }),
    case Action of
        ack ->
            ok = do_async_ack(
                InflightTID, Ref, Id, PostFn, BufferWorkerPid, DeltaCounters, QueryOpts
            );
        nack ->
            %% Keep retrying.
            ok = mark_inflight_as_retriable(InflightTID, Ref),
            ok = ?MODULE:block(BufferWorkerPid),
            blocked
    end.

%% Remove the request from the inflight table, then run the deferred side
%% effects and bump the counters. For buffered queries this only happens
%% if this call actually removed a known entry; for simple queries it
%% always happens.
do_async_ack(InflightTID, Ref, Id, PostFn, BufferWorkerPid, DeltaCounters, QueryOpts) ->
    IsKnownRef = ack_inflight(InflightTID, Ref, BufferWorkerPid),
    RunDeferred =
        case maps:get(simple_query, QueryOpts, false) of
            true -> true;
            false -> IsKnownRef
        end,
    case RunDeferred of
        true ->
            PostFn(),
            bump_counters(Id, DeltaCounters);
        false ->
            ok
    end,
    ok.

%% Check whether an async reply is still wanted, e.g. when a connector
%% evaluates the callback more than once:
%% 1. If the request was already removed from the inflight table (because
%%    it succeeded earlier or expired), emit a warning trace point and
%%    return `discard'.
%% 2. If the request previously failed and is now pending a retry, return
%%    `continue', since there is no way to tell whether this reply is stale.
%% Simple queries without an inflight table always continue.
maybe_handle_unknown_async_reply(undefined, _Ref, #{simple_query := true}) ->
    continue;
maybe_handle_unknown_async_reply(InflightTID, Ref, #{}) ->
    IsPending =
        try
            ets:member(InflightTID, Ref)
        catch
            %% presumably the table is gone (shutdown?)
            error:badarg -> table_gone
        end,
    case IsPending of
        true ->
            continue;
        false ->
            ?tp(
                warning,
                unknown_async_reply_discarded,
                #{inflight_key => Ref}
            ),
            discard;
        table_gone ->
            discard
    end.
%%==============================================================================
%% operations for queue

%% ReplayQ marshaller: binaries read back from the queue are decoded, any
%% other term is encoded before being stored.
%% NOTE(review): binary_to_term/1 is used without `[safe]'; this presumes
%% the queue only ever holds terms written by this module.
queue_item_marshaller(Bin) when is_binary(Bin) ->
    binary_to_term(Bin);
queue_item_marshaller(Item) ->
    term_to_binary(Item).

%% ReplayQ sizer: the external term format size of a queue item.
estimate_size(QItem) ->
    erlang:external_size(QItem).

%% Append queries to the replay queue. If the queue then exceeds its byte
%% limit, the overflowing items are popped and acked (i.e. dropped) right
%% away. Returns the dropped items, the new queue and the counters delta.
-spec append_queue(id(), index(), replayq:q(), [queue_query()]) ->
    {[queue_query()], replayq:q(), counters()}.
append_queue(Id, Index, Q, Queries) ->
    %% this assertion is to ensure that we never append a raw binary
    %% because the marshaller will get lost.
    false = is_binary(hd(Queries)),
    Q0 = replayq:append(Q, Queries),
    OverflowBytes = replayq:overflow(Q0),
    {Overflown, Q2, DeltaCounters} =
        if
            OverflowBytes =< 0 ->
                {[], Q0, #{}};
            true ->
                PopOpts = #{bytes_limit => OverflowBytes, count_limit => 999999999},
                {Q1, QAckRef, DroppedItems} = replayq:pop(Q0, PopOpts),
                ok = replayq:ack(Q1, QAckRef),
                NumDropped = length(DroppedItems),
                ?SLOG(info, #{
                    msg => "buffer_worker_overflow",
                    resource_id => Id,
                    worker_index => Index,
                    dropped => NumDropped
                }),
                {DroppedItems, Q1, #{dropped_queue_full => NumDropped}}
        end,
    ?tp(
        buffer_worker_appended_to_queue,
        #{
            id => Id,
            items => Queries,
            queue_count => queue_count(Q2),
            overflown => length(Overflown)
        }
    ),
    {Overflown, Q2, DeltaCounters}.

%%==============================================================================
%% the inflight queue for async query
-define(MAX_SIZE_REF, max_size).
-define(SIZE_REF, size).
-define(BATCH_COUNT_REF, batch_count).
-define(INITIAL_TIME_REF, initial_time).
-define(INITIAL_MONOTONIC_TIME_REF, initial_monotonic_time).

%% Create the inflight table and seed its metadata rows.
inflight_new(InfltWinSZ) ->
    TableId = ets:new(
        emqx_resource_buffer_worker_inflight_tab,
        [ordered_set, public, {write_concurrency, true}]
    ),
    MetadataRows = [
        {?MAX_SIZE_REF, InfltWinSZ},
        %% number of inflight messages: we need this counter because a
        %% single inflight entry may be a batch of several messages
        {?SIZE_REF, 0},
        %% number of inflight entries (a batch counts as one)
        {?BATCH_COUNT_REF, 0},
        {?INITIAL_TIME_REF, erlang:system_time()},
        {?INITIAL_MONOTONIC_TIME_REF, make_request_ref()}
    ],
    lists:foreach(fun(Row) -> inflight_append(TableId, Row) end, MetadataRows),
    TableId.

%% Return the first inflight item flagged as retriable (in key order),
%% split by expiry relative to `Now'.
-spec inflight_get_first_retriable(ets:tid(), integer()) ->
    none
    | {expired, inflight_key(), [queue_query()]}
    | {single, inflight_key(), queue_query()}
    | {batch, inflight_key(), _NotExpired :: [queue_query()], _Expired :: [queue_query()]}.
inflight_get_first_retriable(InflightTID, Now) ->
    MatchSpec =
        ets:fun2ms(
            fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _AsyncWorkerMRef)) when
                IsRetriable =:= true
            ->
                {Ref, BatchOrQuery}
            end
        ),
    case ets:select(InflightTID, MatchSpec, _Limit = 1) of
        '$end_of_table' ->
            none;
        {[{Ref, ?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt) = Query}], _Continuation} ->
            case is_expired(ExpireAt, Now) of
                false -> {single, Ref, Query};
                true -> {expired, Ref, [Query]}
            end;
        {[{Ref, [_ | _] = Batch}], _Continuation} ->
            case sieve_expired_requests(Batch, Now) of
                {[], _AllExpired} -> {expired, Ref, Batch};
                {NotExpired, Expired} -> {batch, Ref, NotExpired, Expired}
            end
    end.

%% The window is measured in inflight entries rather than messages,
%% because one batch request may hold several messages.
is_inflight_full(undefined) ->
    false;
is_inflight_full(InflightTID) ->
    [{_, MaxSize}] = ets:lookup(InflightTID, ?MAX_SIZE_REF),
    inflight_count(InflightTID) >= MaxSize.

%% Number of inflight entries (batches count as one).
inflight_count(InflightTID) ->
    emqx_utils_ets:lookup_value(InflightTID, ?BATCH_COUNT_REF, 0).

%% Number of inflight messages (batches count as their length).
inflight_num_msgs(InflightTID) ->
    [{_, NumMsgs}] = ets:lookup(InflightTID, ?SIZE_REF),
    NumMsgs.
%% Insert an entry into the inflight table.
%%
%% Queries and batches are stored with their `HasBeenSent' flag forced to
%% `true' (mark_as_sent/1). The entry/message counters are bumped only if
%% the key was not present yet. Any other `{Key, Value}' tuple is a
%% metadata row; it is written unconditionally and leaves the counters
%% untouched. Without an inflight table this is a no-op.
inflight_append(undefined, _InflightItem) ->
    ok;
inflight_append(
    InflightTID,
    ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, AsyncWorkerMRef)
) ->
    Batch = mark_as_sent(Batch0),
    InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef),
    IsNew = ets:insert_new(InflightTID, InflightItem),
    BatchSize = length(Batch),
    IsNew andalso inc_inflight(InflightTID, BatchSize),
    ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
    ok;
inflight_append(
    InflightTID,
    ?INFLIGHT_ITEM(
        Ref, ?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, AsyncWorkerMRef
    )
) ->
    Query = mark_as_sent(Query0),
    InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef),
    IsNew = ets:insert_new(InflightTID, InflightItem),
    IsNew andalso inc_inflight(InflightTID, 1),
    ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
    ok;
inflight_append(InflightTID, {Ref, Data}) ->
    ets:insert(InflightTID, {Ref, Data}),
    %% this is a metadata row being inserted; therefore, we don't bump
    %% the inflight metric.
    ok.

%% A request was already appended and was originally not retriable, but an
%% error occurred and it is now retriable.
mark_inflight_as_retriable(undefined, _Ref) ->
    ok;
mark_inflight_as_retriable(InflightTID, Ref) ->
    %% Set the retry flag and detach the old async worker monitor in a single
    %% atomic update. This way a concurrent reader never sees the item as
    %% retriable while it is still bound to the old worker's monitor
    %% reference, so the old worker's DOWN no longer affects it.
    _ = ets:update_element(
        InflightTID, Ref, [{?RETRY_IDX, true}, {?WORKER_MREF_IDX, erased}]
    ),
    ok.

%% Track each worker pid only once.
%% When an async call returned `{ok, AsyncWorkerPid}', make sure that pid is
%% monitored exactly once. An existing monitor reference is reused if the
%% pid is already tracked. Returns the (possibly updated) data and the
%% monitor reference, or `undefined' for any other result.
ensure_async_worker_monitored(
    #{async_workers := KnownWorkers} = Data0,
    {async_return, {ok, WorkerPid}} = _Result
) when is_pid(WorkerPid) ->
    case KnownWorkers of
        #{WorkerPid := ExistingMRef} ->
            {Data0, ExistingMRef};
        #{} ->
            NewMRef = monitor(process, WorkerPid),
            {Data0#{async_workers := KnownWorkers#{WorkerPid => NewMRef}}, NewMRef}
    end;
ensure_async_worker_monitored(Data0, _Result) ->
    {Data0, undefined}.

%% Record which async worker (by monitor reference) is handling the
%% inflight item `Ref'. Nothing is stored without a table or without a
%% worker reference.
-spec store_async_worker_reference(undefined | ets:tid(), inflight_key(), undefined | reference()) ->
    ok.
store_async_worker_reference(undefined = _InflightTID, _Ref, _AsyncWorkerMRef) ->
    ok;
store_async_worker_reference(_InflightTID, _Ref, undefined = _WorkerRef) ->
    ok;
store_async_worker_reference(InflightTID, Ref, WorkerMRef) when is_reference(WorkerMRef) ->
    _ = ets:update_element(InflightTID, Ref, {?WORKER_MREF_IDX, WorkerMRef}),
    ok.

%% Remove the inflight item `Ref' and update the counters. If that frees a
%% slot in a previously full window, poke the buffer worker to resume
%% flushing. Returns `true' when an item was actually removed.
ack_inflight(undefined, _Ref, _BufferWorkerPid) ->
    false;
ack_inflight(InflightTID, Ref, BufferWorkerPid) ->
    NumMsgs =
        case ets:take(InflightTID, Ref) of
            [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _AsyncWorkerMRef)] ->
                1;
            [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _AsyncWorkerMRef)] ->
                length(Batch);
            [] ->
                0
        end,
    WasRemoved = NumMsgs > 0,
    case dec_inflight_remove(InflightTID, NumMsgs, WasRemoved) of
        flush -> ?MODULE:flush_worker(BufferWorkerPid);
        no_flush -> ok
    end,
    WasRemoved.
%% Called when an async worker goes down: mark every inflight item bound
%% to that worker's monitor reference as retriable.
mark_inflight_items_as_retriable(Data, AsyncWorkerMRef) ->
    #{inflight_tid := InflightTID} = Data,
    IsRetriable = true,
    MatchSpec =
        ets:fun2ms(
            fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, _IsRetriable, AsyncWorkerMRef0)) when
                AsyncWorkerMRef =:= AsyncWorkerMRef0
            ->
                ?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef0)
            end
        ),
    _NumAffected = ets:select_replace(InflightTID, MatchSpec),
    ?tp(buffer_worker_async_agent_down, #{num_affected => _NumAffected, buffer_worker => self()}),
    ok.

%% Used to update a batch after dropping expired individual queries.
update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) ->
    _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}),
    ok = dec_inflight_update(InflightTID, NumExpired).

%% Account for a newly inserted inflight entry carrying `Count' messages.
inc_inflight(InflightTID, Count) ->
    _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, Count}),
    _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, 1}),
    ok.

%% Account for a removed inflight entry that carried `Count' messages.
%% Counters are clamped at 0. Returns `flush' when the removal just brought
%% the entry count from the maximum down to `Max - 1'.
-spec dec_inflight_remove(undefined | ets:tid(), non_neg_integer(), Removed :: boolean()) ->
    no_flush | flush.
dec_inflight_remove(_InflightTID, _Count = 0, _Removed = false) ->
    no_flush;
dec_inflight_remove(InflightTID, _Count = 0, _Removed = true) ->
    NewValue = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
    MaxValue = emqx_utils_ets:lookup_value(InflightTID, ?MAX_SIZE_REF, 0),
    %% if the new value is Max - 1, it means that we've just made room
    %% in the inflight table, so we should poke the buffer worker to
    %% make it continue flushing.
    case NewValue =:= MaxValue - 1 of
        true -> flush;
        false -> no_flush
    end;
dec_inflight_remove(InflightTID, Count, _Removed = true) when Count > 0 ->
    %% If Count > 0, it must have been removed
    NewValue = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
    _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
    MaxValue = emqx_utils_ets:lookup_value(InflightTID, ?MAX_SIZE_REF, 0),
    %% if the new value is Max - 1, it means that we've just made room
    %% in the inflight table, so we should poke the buffer worker to
    %% make it continue flushing.
    case NewValue =:= MaxValue - 1 of
        true -> flush;
        false -> no_flush
    end.

%% Account for `Count' messages dropped from an entry that stays inflight.
dec_inflight_update(_InflightTID, _Count = 0) ->
    ok;
dec_inflight_update(InflightTID, Count) when Count > 0 ->
    _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
    ok.

%%==============================================================================

%% Effective call mode: forced sync, or async only if the connector allows it.
call_mode(force_sync, _) -> sync;
call_mode(async_if_possible, always_sync) -> sync;
call_mode(async_if_possible, async_if_possible) -> async.

%% Assert that a (possibly `async_return'-wrapped) result means success:
%% either the atom `ok' or a non-empty tuple whose first element is `ok'.
%% Any other value (including the empty tuple `{}', which previously
%% crashed with `badarg') raises `error({not_ok_result, R})'.
assert_ok_result(ok) ->
    true;
assert_ok_result({async_return, R}) ->
    assert_ok_result(R);
assert_ok_result(R) when tuple_size(R) > 0, element(1, R) =:= ok ->
    ok;
assert_ok_result(R) ->
    error({not_ok_result, R}).

queue_count(Q) ->
    replayq:count(Q).

%% Directory of the on-disk replay queue for worker `Index' of resource `Id'.
disk_queue_dir(Id, Index) ->
    QDir0 = binary_to_list(Id) ++ ":" ++ integer_to_list(Index),
    QDir = filename:join([emqx:data_dir(), "bufs", node(), QDir0]),
    emqx_utils:safe_filename(QDir).

%% Remove the on-disk replay queue directory; a missing directory is fine.
clear_disk_queue_dir(Id, Index) ->
    ReplayQDir = disk_queue_dir(Id, Index),
    case file:del_dir_r(ReplayQDir) of
        {error, enoent} ->
            ok;
        Res ->
            Res
    end.

ensure_flush_timer(Data = #{batch_time := T}) ->
    ensure_flush_timer(Data, T).

%% Schedule a `{flush, Ref}' message unless a flush is already pending.
ensure_flush_timer(Data = #{tref := undefined}, 0) ->
    %% if the batch_time is 0, we don't need to start a timer, which
    %% can be costly at high rates.
    Ref = make_ref(),
    self() ! {flush, Ref},
    Data#{tref => {Ref, Ref}};
ensure_flush_timer(Data = #{tref := undefined}, T) ->
    Ref = make_ref(),
    TRef = erlang:send_after(T, self(), {flush, Ref}),
    Data#{tref => {TRef, Ref}};
ensure_flush_timer(Data, _T) ->
    Data.

%% Cancel a pending flush timer. For the batch_time = 0 case the stored
%% "timer" is a plain reference; cancelling it is a harmless no-op.
cancel_flush_timer(St = #{tref := undefined}) ->
    St;
cancel_flush_timer(St = #{tref := {TRef, _Ref}}) ->
    _ = erlang:cancel_timer(TRef),
    St#{tref => undefined}.

-spec make_request_ref() -> inflight_key().
make_request_ref() ->
    now_().

%% Drain already-queued `?SEND_REQ' messages from the mailbox, without
%% waiting, until `Acc' holds `Limit' requests. Order is preserved.
collect_requests(Acc, Limit) ->
    Count = length(Acc),
    do_collect_requests(Acc, Count, Limit).

do_collect_requests(Acc, Count, Limit) when Count >= Limit ->
    lists:reverse(Acc);
do_collect_requests(Acc, Count, Limit) ->
    receive
        ?SEND_REQ(_ReplyTo, _Req) = Request ->
            do_collect_requests([Request | Acc], Count + 1, Limit)
    after 0 ->
        lists:reverse(Acc)
    end.

mark_as_sent(Batch) when is_list(Batch) ->
    lists:map(fun mark_as_sent/1, Batch);
mark_as_sent(?QUERY(ReplyTo, Req, _HasBeenSent, ExpireAt)) ->
    HasBeenSent = true,
    ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt).

is_unrecoverable_error({error, {unrecoverable_error, _}}) ->
    true;
is_unrecoverable_error({error, {recoverable_error, _}}) ->
    false;
is_unrecoverable_error({async_return, Result}) ->
    is_unrecoverable_error(Result);
is_unrecoverable_error({error, _}) ->
    %% TODO: delete this clause.
    %% Ideally all errors except for 'unrecoverable_error' should be
    %% retried, including DB schema errors.
    true;
is_unrecoverable_error(_) ->
    false.

is_async_return({async_return, _}) ->
    true;
is_async_return(_) ->
    false.

%% Split queries into `{NotExpired, Expired}' relative to `Now'.
sieve_expired_requests(Batch, Now) ->
    lists:partition(
        fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)) ->
            not is_expired(ExpireAt, Now)
        end,
        Batch
    ).

-spec is_expired(infinity | integer(), integer()) -> boolean().
is_expired(infinity = _ExpireAt, _Now) ->
    false;
is_expired(ExpireAt, Now) ->
    Now > ExpireAt.

now_() ->
    erlang:monotonic_time(nanosecond).

-spec ensure_timeout_query_opts(query_opts(), sync | async) ->
    query_opts().
%% Fill in a default `timeout' when the caller did not set one: sync
%% queries get `?DEFAULT_REQUEST_TTL', async queries never time out.
ensure_timeout_query_opts(#{timeout := _} = QueryOpts, _SyncOrAsync) ->
    QueryOpts;
ensure_timeout_query_opts(#{} = QueryOpts, async) ->
    QueryOpts#{timeout => infinity};
ensure_timeout_query_opts(#{} = QueryOpts, sync) ->
    QueryOpts#{timeout => ?DEFAULT_REQUEST_TTL}.

%% Derive the absolute deadline `expire_at' (monotonic time, nanoseconds)
%% from the relative `timeout' (milliseconds), unless it is already set.
-spec ensure_expire_at(query_opts()) -> query_opts().
ensure_expire_at(#{expire_at := _} = QueryOpts) ->
    QueryOpts;
ensure_expire_at(#{timeout := infinity} = QueryOpts) ->
    QueryOpts#{expire_at => infinity};
ensure_expire_at(#{timeout := TimeoutMS} = QueryOpts) ->
    TimeoutNS = erlang:convert_time_unit(TimeoutMS, millisecond, nanosecond),
    QueryOpts#{expire_at => now_() + TimeoutNS}.

%% The async reply handler does not need the request payload, so it is
%% stripped (except in TEST builds) before being kept in the reply context.
minimize(?QUERY(_, _, _, _) = Query) ->
    do_minimize(Query);
minimize(Batch) when is_list(Batch) ->
    [do_minimize(Query) || Query <- Batch].

-ifdef(TEST).
do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt) = Query) -> Query.
-else.
do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, ExpireAt).
-endif.

%% To avoid message loss due to misconfiguration, adjust `batch_time'
%% based on `request_ttl'. If `batch_time' > `request_ttl', all requests
%% time out before being sent when the message rate is low, and it gets
%% worse as `pool_size' grows. As a rule of thumb, `batch_time' is capped
%% at `request_ttl div 2' (and never goes below 0).
adjust_batch_time(_Id, _RequestTTL = infinity, BatchTime0) ->
    BatchTime0;
adjust_batch_time(Id, RequestTTL, BatchTime0) ->
    BatchTime = max(0, min(BatchTime0, RequestTTL div 2)),
    case BatchTime of
        BatchTime0 ->
            ok;
        _ ->
            ?SLOG(info, #{
                id => Id,
                msg => "adjusting_buffer_worker_batch_time",
                new_batch_time => BatchTime
            })
    end,
    BatchTime.
%% Build the ReplayQ options for worker `Index' of resource `Id'.
%% `memory_only' keeps the queue in memory. `volatile_offload' offloads to
%% a disk directory that is not retained across resource restarts, with
%% segments no larger than the total byte limit. Any other buffer mode
%% raises `case_clause'.
replayq_opts(Id, Index, Opts) ->
    BufferMode = maps:get(buffer_mode, Opts, memory_only),
    TotalBytes = maps:get(max_buffer_bytes, Opts, ?DEFAULT_BUFFER_BYTES),
    Common = #{
        marshaller => fun ?MODULE:queue_item_marshaller/1,
        max_total_bytes => TotalBytes,
        sizer => fun ?MODULE:estimate_size/1
    },
    ModeSpecific =
        case BufferMode of
            memory_only ->
                #{mem_only => true};
            volatile_offload ->
                SegBytes = min(maps:get(buffer_seg_bytes, Opts, TotalBytes), TotalBytes),
                #{
                    dir => disk_queue_dir(Id, Index),
                    %% we don't want to retain the queue after
                    %% resource restarts.
                    offload => {true, volatile},
                    seg_bytes => SegBytes
                }
        end,
    maps:merge(Common, ModeSpecific).

%% The request timeout should be greater than the resume interval, as the
%% latter defines how often the buffer worker tries to unblock. If the
%% request timeout is <= the resume interval and the buffer worker is ever
%% blocked, then all queued requests will basically fail without being
%% attempted. Hence: the health check interval, capped at a third of the
%% request TTL, and never below 1 ms.
-spec default_resume_interval(request_ttl(), health_check_interval()) -> timeout_ms().
default_resume_interval(RequestTTL, HealthCheckInterval) ->
    Candidate =
        case RequestTTL of
            infinity -> HealthCheckInterval;
            _ -> min(HealthCheckInterval, RequestTTL div 3)
        end,
    max(1, Candidate).

%% Send `Response' to a caller waiting on the alias `Alias'.
%% Since the alias is created with `{alias, reply_demonitor}', once the
%% caller demonitors it (e.g. after a timeout), no further messages it no
%% longer expects are delivered. Using `gen_statem:reply({pid(),
%% reference()}, _)' would still send a late reply even after the
%% demonitor.
-spec reply_call(reference(), term()) -> ok.
reply_call(Alias, Response) ->
    Reply = {Alias, Response},
    erlang:send(Alias, Reply),
    ok.

%% Used by `simple_sync_internal_buffer_query' to reply and chain existing
%% `reply_to' callbacks.
reply_call_internal_buffer(ReplyAlias, MaybeReplyTo, Response) ->
    ?MODULE:reply_call(ReplyAlias, Response),
    do_reply_caller(MaybeReplyTo, Response).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

%% adjust_batch_time/3 caps batch_time at request_ttl div 2, clamps it at
%% 0, and leaves it untouched when the TTL is infinite.
adjust_batch_time_test_() ->
    %% just for logging
    Id = some_id,
    [
        {"batch time smaller than request_time/2",
            ?_assertEqual(
                100,
                adjust_batch_time(Id, 500, 100)
            )},
        {"batch time equal to request_time/2",
            ?_assertEqual(
                100,
                adjust_batch_time(Id, 200, 100)
            )},
        {"batch time greater than request_time/2",
            ?_assertEqual(
                50,
                adjust_batch_time(Id, 100, 100)
            )},
        {"request_time/2 rounds down to zero (lower bound)",
            ?_assertEqual(
                0,
                adjust_batch_time(Id, 1, 100)
            )},
        {"batch time smaller than request_time/2 (request_time = infinity)",
            ?_assertEqual(
                100,
                adjust_batch_time(Id, infinity, 100)
            )}
    ].
-endif.