diff --git a/apps/emqx_bridge/include/emqx_bridge.hrl b/apps/emqx_bridge/include/emqx_bridge.hrl index d8229cc77..d062b4b7f 100644 --- a/apps/emqx_bridge/include/emqx_bridge.hrl +++ b/apps/emqx_bridge/include/emqx_bridge.hrl @@ -16,19 +16,21 @@ -define(EMPTY_METRICS, ?METRICS( - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ) ). -define(METRICS( Dropped, DroppedOther, + DroppedExpired, DroppedQueueFull, DroppedResourceNotFound, DroppedResourceStopped, Matched, Queued, Retried, + LateReply, SentFailed, SentInflight, SentSucc, @@ -40,12 +42,14 @@ #{ 'dropped' => Dropped, 'dropped.other' => DroppedOther, + 'dropped.expired' => DroppedExpired, 'dropped.queue_full' => DroppedQueueFull, 'dropped.resource_not_found' => DroppedResourceNotFound, 'dropped.resource_stopped' => DroppedResourceStopped, 'matched' => Matched, 'queuing' => Queued, 'retried' => Retried, + 'late_reply' => LateReply, 'failed' => SentFailed, 'inflight' => SentInflight, 'success' => SentSucc, @@ -59,12 +63,14 @@ -define(metrics( Dropped, DroppedOther, + DroppedExpired, DroppedQueueFull, DroppedResourceNotFound, DroppedResourceStopped, Matched, Queued, Retried, + LateReply, SentFailed, SentInflight, SentSucc, @@ -76,12 +82,14 @@ #{ 'dropped' := Dropped, 'dropped.other' := DroppedOther, + 'dropped.expired' := DroppedExpired, 'dropped.queue_full' := DroppedQueueFull, 'dropped.resource_not_found' := DroppedResourceNotFound, 'dropped.resource_stopped' := DroppedResourceStopped, 'matched' := Matched, 'queuing' := Queued, 'retried' := Retried, + 'late_reply' := LateReply, 'failed' := SentFailed, 'inflight' := SentInflight, 'success' := SentSucc, diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 50505effc..2c43ce5d7 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -751,11 +751,11 @@ aggregate_metrics(AllMetrics) -> fun( #{ metrics := ?metrics( - M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15 + M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17 ) }, ?metrics( - N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15 + N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17 ) ) -> ?METRICS( @@ -773,7 +773,9 @@ aggregate_metrics(AllMetrics) -> M12 + N12, M13 + N13, M14 + N14, - M15 + N15 + M15 + N15, + M16 + N16, + M17 + N17 ) end, InitMetrics, @@ -805,11 +807,13 @@ format_metrics(#{ counters := #{ 'dropped' := Dropped, 'dropped.other' := DroppedOther, + 'dropped.expired' := DroppedExpired, 'dropped.queue_full' := DroppedQueueFull, 'dropped.resource_not_found' := DroppedResourceNotFound, 'dropped.resource_stopped' := DroppedResourceStopped, 'matched' := Matched, 'retried' := Retried, + 'late_reply' := LateReply, 'failed' := SentFailed, 'success' := SentSucc, 'received' := Rcvd @@ -824,12 +828,14 @@ format_metrics(#{ ?METRICS( Dropped, DroppedOther, + DroppedExpired, DroppedQueueFull, DroppedResourceNotFound, DroppedResourceStopped, Matched, Queued, Retried, + LateReply, SentFailed, SentInflight, SentSucc, diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 1b4eac73e..1f5b06fab 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -830,7 +830,8 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> <<"resource_opts">> => #{ <<"worker_pool_size">> => 2, <<"query_mode">> => <<"sync">>, - <<"request_timeout">> => <<"500ms">>, + %% using a long time so we can test recovery + <<"request_timeout">> => <<"15s">>, %% to make it check the healthy quickly <<"health_check_interval">> => <<"0.5s">> } @@ -898,8 +899,10 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> ), Payload1 = <<"hello2">>, Payload2 = <<"hello3">>, - emqx:publish(emqx_message:make(LocalTopic, Payload1)), - emqx:publish(emqx_message:make(LocalTopic, Payload2)), + %% we need to to it in other processes because it'll block due to + %% the long timeout + spawn(fun() -> emqx:publish(emqx_message:make(LocalTopic, Payload1)) end), + spawn(fun() -> emqx:publish(emqx_message:make(LocalTopic, Payload2)) end), {ok, _} = snabbkaffe:receive_events(SRef), %% verify the metrics of the bridge, the message should be queued @@ -917,9 +920,9 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> <<"matched">> := Matched, <<"success">> := 1, <<"failed">> := 0, - <<"queuing">> := 1, - <<"inflight">> := 1 - } when Matched >= 3, + <<"queuing">> := Queuing, + <<"inflight">> := Inflight + } when Matched >= 3 andalso Inflight + Queuing == 2, maps:get(<<"metrics">>, DecodedMetrics1) ), diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index d7b080ae8..4f2a4883b 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -29,6 +29,8 @@ -type query_opts() :: #{ %% The key used for picking a resource worker pick_key => term(), + timeout => timeout(), + expire_at => infinity | integer(), async_reply_fun => reply_fun() }. -type resource_data() :: #{ diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 4b16ed3d5..8b79ce5a8 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -60,21 +60,23 @@ -define(COLLECT_REQ_LIMIT, 1000). -define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}). --define(QUERY(FROM, REQUEST, SENT), {query, FROM, REQUEST, SENT}). +-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}). -define(REPLY(FROM, REQUEST, SENT, RESULT), {reply, FROM, REQUEST, SENT, RESULT}). -define(EXPAND(RESULT, BATCH), [ ?REPLY(FROM, REQUEST, SENT, RESULT) - || ?QUERY(FROM, REQUEST, SENT) <- BATCH + || ?QUERY(FROM, REQUEST, SENT, _EXPIRE_AT) <- BATCH ]). -define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef), {Ref, BatchOrQuery, IsRetriable, WorkerMRef} ). +-define(ITEM_IDX, 2). -define(RETRY_IDX, 3). -define(WORKER_MREF_IDX, 4). -type id() :: binary(). -type index() :: pos_integer(). --type queue_query() :: ?QUERY(from(), request(), HasBeenSent :: boolean()). +-type expire_at() :: infinity | integer(). +-type queue_query() :: ?QUERY(from(), request(), HasBeenSent :: boolean(), expire_at()). -type request() :: term(). -type from() :: pid() | reply_fun() | request_from(). -type request_from() :: undefined | gen_statem:from(). @@ -98,14 +100,18 @@ start_link(Id, Index, Opts) -> gen_statem:start_link(?MODULE, {Id, Index, Opts}, []). -spec sync_query(id(), request(), query_opts()) -> Result :: term(). -sync_query(Id, Request, Opts) -> +sync_query(Id, Request, Opts0) -> + Opts1 = ensure_timeout_query_opts(Opts0, sync), + Opts = ensure_expire_at(Opts1), PickKey = maps:get(pick_key, Opts, self()), - Timeout = maps:get(timeout, Opts, timer:seconds(15)), + Timeout = maps:get(timeout, Opts), emqx_resource_metrics:matched_inc(Id), pick_call(Id, PickKey, {query, Request, Opts}, Timeout). -spec async_query(id(), request(), query_opts()) -> Result :: term(). -async_query(Id, Request, Opts) -> +async_query(Id, Request, Opts0) -> + Opts1 = ensure_timeout_query_opts(Opts0, async), + Opts = ensure_expire_at(Opts1), PickKey = maps:get(pick_key, Opts, self()), emqx_resource_metrics:matched_inc(Id), pick_cast(Id, PickKey, {query, Request, Opts}). @@ -120,11 +126,15 @@ simple_sync_query(Id, Request) -> %% would mess up the metrics anyway. `undefined' is ignored by %% `emqx_resource_metrics:*_shift/3'. Index = undefined, - QueryOpts = #{simple_query => true}, + QueryOpts0 = #{simple_query => true, timeout => infinity}, + QueryOpts = #{expire_at := ExpireAt} = ensure_expire_at(QueryOpts0), emqx_resource_metrics:matched_inc(Id), Ref = make_message_ref(), - Result = call_query(sync, Id, Index, Ref, ?QUERY(self(), Request, false), QueryOpts), HasBeenSent = false, + From = self(), + Result = call_query( + sync, Id, Index, Ref, ?QUERY(From, Request, HasBeenSent, ExpireAt), QueryOpts + ), _ = handle_query_result(Id, Result, HasBeenSent), Result. @@ -179,9 +189,14 @@ init({Id, Index, Opts}) -> ?tp(buffer_worker_init, #{id => Id, index => Index}), {ok, running, Data}. -running(enter, _, St) -> +running(enter, _, Data) -> ?tp(buffer_worker_enter_running, #{}), - maybe_flush(St); + %% According to `gen_statem' laws, we mustn't call `maybe_flush' + %% directly because it may decide to return `{next_state, blocked, _}', + %% and that's an invalid response for a state enter call. + %% Returning a next event from a state enter call is also + %% prohibited. + {keep_state, ensure_flush_timer(Data, 0)}; running(cast, resume, _St) -> keep_state_and_data; running(cast, flush, Data) -> @@ -243,9 +258,9 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %%============================================================================== --define(PICK(ID, KEY, EXPR), +-define(PICK(ID, KEY, PID, EXPR), try gproc_pool:pick_worker(ID, KEY) of - Pid when is_pid(Pid) -> + PID when is_pid(PID) -> EXPR; _ -> ?RESOURCE_ERROR(worker_not_created, "resource not created") @@ -258,7 +273,7 @@ code_change(_OldVsn, State, _Extra) -> ). pick_call(Id, Key, Query, Timeout) -> - ?PICK(Id, Key, begin + ?PICK(Id, Key, Pid, begin Caller = self(), MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]), From = {Caller, MRef}, @@ -281,15 +296,21 @@ pick_call(Id, Key, Query, Timeout) -> end). pick_cast(Id, Key, Query) -> - ?PICK(Id, Key, begin + ?PICK(Id, Key, Pid, begin From = undefined, erlang:send(Pid, ?SEND_REQ(From, Query)), ok end). resume_from_blocked(Data) -> - #{inflight_tid := InflightTID} = Data, - case inflight_get_first_retriable(InflightTID) of + ?tp(buffer_worker_resume_from_blocked_enter, #{}), + #{ + id := Id, + index := Index, + inflight_tid := InflightTID + } = Data, + Now = now_(), + case inflight_get_first_retriable(InflightTID, Now) of none -> case is_inflight_full(InflightTID) of true -> @@ -297,14 +318,32 @@ resume_from_blocked(Data) -> false -> {next_state, running, Data} end; - {Ref, FirstQuery} -> + {expired, Ref, Batch} -> + IsAcked = ack_inflight(InflightTID, Ref, Id, Index), + IsAcked andalso emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)), + ?tp(buffer_worker_retry_expired, #{expired => Batch}), + resume_from_blocked(Data); + {single, Ref, Query} -> %% We retry msgs in inflight window sync, as if we send them %% async, they will be appended to the end of inflight window again. case is_inflight_full(InflightTID) of true -> {keep_state, Data}; false -> - retry_inflight_sync(Ref, FirstQuery, Data) + retry_inflight_sync(Ref, Query, Data) + end; + {batch, Ref, NotExpired, Expired} -> + update_inflight_item(InflightTID, Ref, NotExpired), + NumExpired = length(Expired), + emqx_resource_metrics:dropped_expired_inc(Id, NumExpired), + NumExpired > 0 andalso ?tp(buffer_worker_retry_expired, #{expired => Expired}), + %% We retry msgs in inflight window sync, as if we send them + %% async, they will be appended to the end of inflight window again. + case is_inflight_full(InflightTID) of + true -> + {keep_state, Data}; + false -> + retry_inflight_sync(Ref, NotExpired, Data) end end. @@ -320,10 +359,10 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts), ReplyResult = case QueryOrBatch of - ?QUERY(From, CoreReq, HasBeenSent) -> + ?QUERY(From, CoreReq, HasBeenSent, _ExpireAt) -> Reply = ?REPLY(From, CoreReq, HasBeenSent, Result), reply_caller_defer_metrics(Id, Reply, QueryOpts); - [?QUERY(_, _, _) | _] = Batch -> + [?QUERY(_, _, _, _) | _] = Batch -> batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts) end, case ReplyResult of @@ -378,10 +417,12 @@ collect_and_enqueue_query_requests(Request0, Data0) -> (?SEND_REQ(undefined = _From, {query, Req, Opts})) -> ReplyFun = maps:get(async_reply_fun, Opts, undefined), HasBeenSent = false, - ?QUERY(ReplyFun, Req, HasBeenSent); - (?SEND_REQ(From, {query, Req, _Opts})) -> + ExpireAt = maps:get(expire_at, Opts), + ?QUERY(ReplyFun, Req, HasBeenSent, ExpireAt); + (?SEND_REQ(From, {query, Req, Opts})) -> HasBeenSent = false, - ?QUERY(From, Req, HasBeenSent) + ExpireAt = maps:get(expire_at, Opts), + ?QUERY(From, Req, HasBeenSent, ExpireAt) end, Requests ), @@ -406,6 +447,8 @@ maybe_flush(Data0) -> -spec flush(data()) -> gen_statem:event_handler_result(state(), data()). flush(Data0) -> #{ + id := Id, + index := Index, batch_size := BatchSize, inflight_tid := InflightTID, queue := Q0 @@ -419,25 +462,45 @@ flush(Data0) -> Data2 = ensure_flush_timer(Data1), {keep_state, Data2}; {_, false} -> + ?tp(buffer_worker_flush_before_pop, #{}), {Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}), - IsBatch = BatchSize =/= 1, - %% We *must* use the new queue, because we currently can't - %% `nack' a `pop'. - %% Maybe we could re-open the queue? Data2 = Data1#{queue := Q1}, - Ref = make_message_ref(), - do_flush(Data2, #{ - new_queue => Q1, - is_batch => IsBatch, - batch => Batch, - ref => Ref, - ack_ref => QAckRef - }) + ?tp(buffer_worker_flush_before_sieve_expired, #{}), + Now = now_(), + %% if the request has expired, the caller is no longer + %% waiting for a response. + case sieve_expired_requests(Batch, Now) of + all_expired -> + ok = replayq:ack(Q1, QAckRef), + emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)), + emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), + ?tp(buffer_worker_flush_all_expired, #{batch => Batch}), + flush(Data2); + {NotExpired, Expired} -> + NumExpired = length(Expired), + emqx_resource_metrics:dropped_expired_inc(Id, NumExpired), + IsBatch = BatchSize =/= 1, + %% We *must* use the new queue, because we currently can't + %% `nack' a `pop'. + %% Maybe we could re-open the queue? + ?tp( + buffer_worker_flush_potentially_partial, + #{expired => Expired, not_expired => NotExpired} + ), + Ref = make_message_ref(), + do_flush(Data2, #{ + new_queue => Q1, + is_batch => IsBatch, + batch => NotExpired, + ref => Ref, + ack_ref => QAckRef + }) + end end. -spec do_flush(data(), #{ is_batch := boolean(), - batch := [?QUERY(from(), request(), boolean())], + batch := [queue_query()], ack_ref := replayq:ack_ref(), ref := inflight_key(), new_queue := replayq:q() @@ -459,7 +522,7 @@ do_flush( inflight_tid := InflightTID } = Data0, %% unwrap when not batching (i.e., batch size == 1) - [?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch, + [?QUERY(From, CoreReq, HasBeenSent, _ExpireAt) = Request] = Batch, QueryOpts = #{inflight_tid => InflightTID, simple_query => false}, Result = call_query(configured, Id, Index, Ref, Request, QueryOpts), Reply = ?REPLY(From, CoreReq, HasBeenSent, Result), @@ -812,10 +875,10 @@ call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> end ). -apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) -> +apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, _QueryOpts) -> ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}), ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request); -apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> +apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{ id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async }), @@ -834,13 +897,13 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt end, Request ); -apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) -> +apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, _QueryOpts) -> ?tp(call_batch_query, #{ id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync }), - Requests = [Request || ?QUERY(_From, Request, _) <- Batch], + Requests = [Request || ?QUERY(_From, Request, _, _ExpireAt) <- Batch], ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch); -apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> +apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{ id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async }), @@ -850,7 +913,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt begin ReplyFun = fun ?MODULE:batch_reply_after_query/8, ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch, QueryOpts]}, - Requests = [Request || ?QUERY(_From, Request, _) <- Batch], + Requests = [Request || ?QUERY(_From, Request, _, _ExpireAt) <- Batch], IsRetriable = false, WorkerMRef = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), @@ -862,7 +925,41 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt ). reply_after_query( - Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBeenSent), QueryOpts, Result + Pid, + Id, + Index, + InflightTID, + Ref, + ?QUERY(_From, _Request, _HasBeenSent, ExpireAt) = Query, + QueryOpts, + Result +) -> + ?tp( + buffer_worker_reply_after_query_enter, + #{batch_or_query => [Query], ref => Ref} + ), + Now = now_(), + case is_expired(ExpireAt, Now) of + true -> + IsFullBefore = is_inflight_full(InflightTID), + IsAcked = ack_inflight(InflightTID, Ref, Id, Index), + IsAcked andalso emqx_resource_metrics:late_reply_inc(Id), + IsFullBefore andalso ?MODULE:flush_worker(Pid), + ?tp(buffer_worker_reply_after_query_expired, #{expired => [Query]}), + ok; + false -> + do_reply_after_query(Pid, Id, Index, InflightTID, Ref, Query, QueryOpts, Result) + end. + +do_reply_after_query( + Pid, + Id, + Index, + InflightTID, + Ref, + ?QUERY(From, Request, HasBeenSent, _ExpireAt), + QueryOpts, + Result ) -> %% NOTE: 'inflight' is the count of messages that were sent async %% but received no ACK, NOT the number of messages queued in the @@ -875,7 +972,7 @@ reply_after_query( %% Keep retrying. ?tp(buffer_worker_reply_after_query, #{ action => Action, - batch_or_query => ?QUERY(From, Request, HasBeenSent), + batch_or_query => ?QUERY(From, Request, HasBeenSent, _ExpireAt), ref => Ref, result => Result }), @@ -884,7 +981,7 @@ reply_after_query( ack -> ?tp(buffer_worker_reply_after_query, #{ action => Action, - batch_or_query => ?QUERY(From, Request, HasBeenSent), + batch_or_query => ?QUERY(From, Request, HasBeenSent, _ExpireAt), ref => Ref, result => Result }), @@ -896,6 +993,34 @@ reply_after_query( end. batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) -> + ?tp( + buffer_worker_reply_after_query_enter, + #{batch_or_query => Batch, ref => Ref} + ), + Now = now_(), + case sieve_expired_requests(Batch, Now) of + all_expired -> + IsFullBefore = is_inflight_full(InflightTID), + IsAcked = ack_inflight(InflightTID, Ref, Id, Index), + IsAcked andalso emqx_resource_metrics:late_reply_inc(Id), + IsFullBefore andalso ?MODULE:flush_worker(Pid), + ?tp(buffer_worker_reply_after_query_expired, #{expired => Batch}), + ok; + {NotExpired, Expired} -> + NumExpired = length(Expired), + emqx_resource_metrics:late_reply_inc(Id, NumExpired), + NumExpired > 0 andalso + ?tp(buffer_worker_reply_after_query_expired, #{expired => Expired}), + do_batch_reply_after_query( + Pid, Id, Index, InflightTID, Ref, NotExpired, QueryOpts, Result + ) + end. + +do_batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) -> + ?tp( + buffer_worker_reply_after_query_enter, + #{batch_or_query => Batch, ref => Ref} + ), %% NOTE: 'inflight' is the count of messages that were sent async %% but received no ACK, NOT the number of messages queued in the %% inflight window. @@ -986,9 +1111,12 @@ inflight_new(InfltWinSZ, Id, Index) -> ), TableId. --spec inflight_get_first_retriable(ets:tid()) -> - none | {integer(), [?QUERY(_, _, _)] | ?QUERY(_, _, _)}. -inflight_get_first_retriable(InflightTID) -> +-spec inflight_get_first_retriable(ets:tid(), integer()) -> + none + | {expired, inflight_key(), [queue_query()]} + | {single, inflight_key(), queue_query()} + | {batch, inflight_key(), _NotExpired :: [queue_query()], _Expired :: [queue_query()]}. +inflight_get_first_retriable(InflightTID, Now) -> MatchSpec = ets:fun2ms( fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _WorkerMRef)) when @@ -1000,8 +1128,22 @@ inflight_get_first_retriable(InflightTID) -> case ets:select(InflightTID, MatchSpec, _Limit = 1) of '$end_of_table' -> none; - {[{Ref, BatchOrQuery}], _Continuation} -> - {Ref, BatchOrQuery} + {[{Ref, Query = ?QUERY(_From, _CoreReq, _HasBeenSent, ExpireAt)}], _Continuation} -> + case is_expired(ExpireAt, Now) of + true -> + {expired, Ref, [Query]}; + false -> + {single, Ref, Query} + end; + {[{Ref, Batch = [_ | _]}], _Continuation} -> + %% batch is non-empty because we check that in + %% `sieve_expired_requests'. + case sieve_expired_requests(Batch, Now) of + all_expired -> + {expired, Ref, Batch}; + {NotExpired, Expired} -> + {batch, Ref, NotExpired, Expired} + end end. is_inflight_full(undefined) -> @@ -1030,7 +1172,7 @@ inflight_append(undefined, _InflightItem, _Id, _Index) -> ok; inflight_append( InflightTID, - ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch0, IsRetriable, WorkerMRef), + ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, WorkerMRef), Id, Index ) -> @@ -1044,7 +1186,9 @@ inflight_append( ok; inflight_append( InflightTID, - ?INFLIGHT_ITEM(Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, IsRetriable, WorkerMRef), + ?INFLIGHT_ITEM( + Ref, ?QUERY(_From, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, WorkerMRef + ), Id, Index ) -> @@ -1106,9 +1250,9 @@ ack_inflight(undefined, _Ref, _Id, _Index) -> ack_inflight(InflightTID, Ref, Id, Index) -> Count = case ets:take(InflightTID, Ref) of - [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _), _IsRetriable, _WorkerMRef)] -> + [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _WorkerMRef)] -> 1; - [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] -> + [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] -> length(Batch); _ -> 0 @@ -1133,6 +1277,12 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) -> ?tp(buffer_worker_worker_down_update, #{num_affected => _NumAffected}), ok. +%% used to update a batch after dropping expired individual queries. +update_inflight_item(InflightTID, Ref, NewBatch) -> + _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}), + ?tp(buffer_worker_worker_update_inflight_item, #{ref => Ref}), + ok. + %%============================================================================== inc_sent_failed(Id, _HasBeenSent = true) -> @@ -1180,11 +1330,14 @@ clear_disk_queue_dir(Id, Index) -> Res end. -ensure_flush_timer(Data = #{tref := undefined, batch_time := T}) -> +ensure_flush_timer(Data = #{batch_time := T}) -> + ensure_flush_timer(Data, T). + +ensure_flush_timer(Data = #{tref := undefined}, T) -> Ref = make_ref(), TRef = erlang:send_after(T, self(), {flush, Ref}), Data#{tref => {TRef, Ref}}; -ensure_flush_timer(Data) -> +ensure_flush_timer(Data, _T) -> Data. cancel_flush_timer(St = #{tref := undefined}) -> @@ -1195,7 +1348,7 @@ cancel_flush_timer(St = #{tref := {TRef, _Ref}}) -> -spec make_message_ref() -> inflight_key(). make_message_ref() -> - erlang:monotonic_time(nanosecond). + now_(). collect_requests(Acc, Limit) -> Count = length(Acc), @@ -1213,9 +1366,9 @@ do_collect_requests(Acc, Count, Limit) -> mark_as_sent(Batch) when is_list(Batch) -> lists:map(fun mark_as_sent/1, Batch); -mark_as_sent(?QUERY(From, Req, _)) -> +mark_as_sent(?QUERY(From, Req, _HasBeenSent, ExpireAt)) -> HasBeenSent = true, - ?QUERY(From, Req, HasBeenSent). + ?QUERY(From, Req, HasBeenSent, ExpireAt). is_unrecoverable_error({error, {unrecoverable_error, _}}) -> true; @@ -1235,3 +1388,49 @@ is_async_return({async_return, _}) -> true; is_async_return(_) -> false. + +sieve_expired_requests(Batch, Now) -> + {Expired, NotExpired} = + lists:partition( + fun(?QUERY(_From, _CoreReq, _HasBeenSent, ExpireAt)) -> + is_expired(ExpireAt, Now) + end, + Batch + ), + case {NotExpired, Expired} of + {[], []} -> + %% Should be impossible for batch_size >= 1. + all_expired; + {[], [_ | _]} -> + all_expired; + {[_ | _], _} -> + {NotExpired, Expired} + end. + +-spec is_expired(infinity | integer(), integer()) -> boolean(). +is_expired(infinity = _ExpireAt, _Now) -> + false; +is_expired(ExpireAt, Now) -> + Now > ExpireAt. + +now_() -> + erlang:monotonic_time(nanosecond). + +-spec ensure_timeout_query_opts(query_opts(), sync | async) -> query_opts(). +ensure_timeout_query_opts(#{timeout := _} = Opts, _SyncOrAsync) -> + Opts; +ensure_timeout_query_opts(#{} = Opts0, sync) -> + TimeoutMS = timer:seconds(15), + Opts0#{timeout => TimeoutMS}; +ensure_timeout_query_opts(#{} = Opts0, async) -> + Opts0#{timeout => infinity}. + +-spec ensure_expire_at(query_opts()) -> query_opts(). +ensure_expire_at(#{expire_at := _} = Opts) -> + Opts; +ensure_expire_at(#{timeout := infinity} = Opts) -> + Opts#{expire_at => infinity}; +ensure_expire_at(#{timeout := TimeoutMS} = Opts) -> + TimeoutNS = erlang:convert_time_unit(TimeoutMS, millisecond, nanosecond), + ExpireAt = now_() + TimeoutNS, + Opts#{expire_at => ExpireAt}. diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 95d1ed1d2..c01158b0a 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -134,8 +134,10 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> 'retried.success', 'retried.failed', 'success', + 'late_reply', 'failed', 'dropped', + 'dropped.expired', 'dropped.queue_full', 'dropped.resource_not_found', 'dropped.resource_stopped', diff --git a/apps/emqx_resource/src/emqx_resource_metrics.erl b/apps/emqx_resource/src/emqx_resource_metrics.erl index 455be9c22..28507e291 100644 --- a/apps/emqx_resource/src/emqx_resource_metrics.erl +++ b/apps/emqx_resource/src/emqx_resource_metrics.erl @@ -34,6 +34,9 @@ dropped_other_inc/1, dropped_other_inc/2, dropped_other_get/1, + dropped_expired_inc/1, + dropped_expired_inc/2, + dropped_expired_get/1, dropped_queue_full_inc/1, dropped_queue_full_inc/2, dropped_queue_full_get/1, @@ -46,6 +49,9 @@ failed_inc/1, failed_inc/2, failed_get/1, + late_reply_inc/1, + late_reply_inc/2, + late_reply_get/1, matched_inc/1, matched_inc/2, matched_get/1, @@ -75,9 +81,11 @@ events() -> [?TELEMETRY_PREFIX, Event] || Event <- [ dropped_other, + dropped_expired, dropped_queue_full, dropped_resource_not_found, dropped_resource_stopped, + late_reply, failed, inflight, matched, @@ -114,6 +122,9 @@ handle_telemetry_event( dropped_other -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other', Val); + dropped_expired -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.expired', Val); dropped_queue_full -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full', Val); @@ -123,6 +134,8 @@ handle_telemetry_event( dropped_resource_stopped -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped', Val); + late_reply -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'late_reply', Val); failed -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val); matched -> @@ -211,6 +224,30 @@ dropped_other_inc(ID, Val) -> dropped_other_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.other'). +%% @doc Count of messages dropped due to being expired before being sent. +dropped_expired_inc(ID) -> + dropped_expired_inc(ID, 1). + +dropped_expired_inc(ID, Val) -> + telemetry:execute([?TELEMETRY_PREFIX, dropped_expired], #{counter_inc => Val}, #{ + resource_id => ID + }). + +dropped_expired_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.expired'). + +%% @doc Count of messages that were sent but received a late reply. +late_reply_inc(ID) -> + late_reply_inc(ID, 1). + +late_reply_inc(ID, Val) -> + telemetry:execute([?TELEMETRY_PREFIX, late_reply], #{counter_inc => Val}, #{ + resource_id => ID + }). + +late_reply_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'late_reply'). + %% @doc Count of messages dropped because the queue was full dropped_queue_full_inc(ID) -> dropped_queue_full_inc(ID, 1). diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index c2b0c5733..4e3423808 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -260,7 +260,7 @@ counter_loop( ?tp(connector_demo_inc_counter_async, #{n => N}), State#{counter => Num + N}; {big_payload, _Payload, ReplyFun} when Status == blocked -> - apply_reply(ReplyFun, {error, blocked}), + apply_reply(ReplyFun, {error, {recoverable_error, blocked}}), State; {{FromPid, ReqRef}, {inc, N}} when Status == running -> %ct:pal("sync counter recv: ~p", [{inc, N}]), diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index c9325af2b..9b2af74f6 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -232,7 +232,7 @@ t_batch_query_counter(_) -> fun(Result, Trace) -> ?assertMatch({ok, 0}, Result), QueryTrace = ?of_kind(call_batch_query, Trace), - ?assertMatch([#{batch := [{query, _, get_counter, _}]}], QueryTrace) + ?assertMatch([#{batch := [{query, _, get_counter, _, _}]}], QueryTrace) end ), @@ -284,7 +284,7 @@ t_query_counter_async_query(_) -> fun(Trace) -> %% the callback_mode of 'emqx_connector_demo' is 'always_sync'. QueryTrace = ?of_kind(call_query, Trace), - ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace) + ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace) end ), %% simple query ignores the query_mode and batching settings in the resource_worker @@ -295,7 +295,7 @@ t_query_counter_async_query(_) -> ?assertMatch({ok, 1000}, Result), %% the callback_mode if 'emqx_connector_demo' is 'always_sync'. QueryTrace = ?of_kind(call_query, Trace), - ?assertMatch([#{query := {query, _, get_counter, _}}], QueryTrace) + ?assertMatch([#{query := {query, _, get_counter, _, _}}], QueryTrace) end ), {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), @@ -337,7 +337,7 @@ t_query_counter_async_callback(_) -> end, fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), - ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace) + ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace) end ), @@ -348,7 +348,7 @@ t_query_counter_async_callback(_) -> fun(Result, Trace) -> ?assertMatch({ok, 1000}, Result), QueryTrace = ?of_kind(call_query, Trace), - ?assertMatch([#{query := {query, _, get_counter, _}}], QueryTrace) + ?assertMatch([#{query := {query, _, get_counter, _, _}}], QueryTrace) end ), {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), @@ -419,7 +419,7 @@ t_query_counter_async_inflight(_) -> ), fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), - ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace) + ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace) end ), tap_metrics(?LINE), @@ -476,7 +476,7 @@ t_query_counter_async_inflight(_) -> end, fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), - ?assertMatch([#{query := {query, _, {inc_counter, _}, _}} | _], QueryTrace), + ?assertMatch([#{query := {query, _, {inc_counter, _}, _, _}} | _], QueryTrace), ?assertEqual(WindowSize + Num, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), tap_metrics(?LINE), ok @@ -495,7 +495,7 @@ t_query_counter_async_inflight(_) -> ), fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), - ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace) + ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace) end ), @@ -605,8 +605,8 @@ t_query_counter_async_inflight_batch(_) -> [ #{ batch := [ - {query, _, {inc_counter, 1}, _}, - {query, _, {inc_counter, 1}, _} + {query, _, {inc_counter, 1}, _, _}, + {query, _, {inc_counter, 1}, _, _} ] } | _ @@ -687,7 +687,7 @@ t_query_counter_async_inflight_batch(_) -> fun(Trace) -> QueryTrace = ?of_kind(call_batch_query_async, Trace), ?assertMatch( - [#{batch := [{query, _, {inc_counter, _}, _} | _]} | _], + [#{batch := [{query, _, {inc_counter, _}, _, _} | _]} | _], QueryTrace ) end @@ -712,7 +712,7 @@ t_query_counter_async_inflight_batch(_) -> fun(Trace) -> QueryTrace = ?of_kind(call_batch_query_async, Trace), ?assertMatch( - [#{batch := [{query, _, {inc_counter, _}, _} | _]} | _], + [#{batch := [{query, _, {inc_counter, _}, _, _} | _]} | _], QueryTrace ) end @@ -1499,9 +1499,680 @@ t_async_pool_worker_death(_Config) -> ), ok. +t_expiration_sync_before_sending(_Config) -> + emqx_connector_demo:set_callback_mode(always_sync), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => sync, + batch_size => 1, + worker_pool_size => 1, + resume_interval => 1_000 + } + ), + do_t_expiration_before_sending(sync). + +t_expiration_sync_batch_before_sending(_Config) -> + emqx_connector_demo:set_callback_mode(always_sync), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => sync, + batch_size => 2, + batch_time => 100, + worker_pool_size => 1, + resume_interval => 1_000 + } + ), + do_t_expiration_before_sending(sync). + +t_expiration_async_before_sending(_Config) -> + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 1, + worker_pool_size => 1, + resume_interval => 1_000 + } + ), + do_t_expiration_before_sending(async). + +t_expiration_async_batch_before_sending(_Config) -> + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 2, + batch_time => 100, + worker_pool_size => 1, + resume_interval => 1_000 + } + ), + do_t_expiration_before_sending(async). + +do_t_expiration_before_sending(QueryMode) -> + ?check_trace( + begin + ok = emqx_resource:simple_sync_query(?ID, block), + + ?force_ordering( + #{?snk_kind := buffer_worker_flush_before_pop}, + #{?snk_kind := delay_enter} + ), + ?force_ordering( + #{?snk_kind := delay}, + #{?snk_kind := buffer_worker_flush_before_sieve_expired} + ), + + TimeoutMS = 100, + spawn_link(fun() -> + case QueryMode of + sync -> + ?assertError( + timeout, + emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => TimeoutMS}) + ); + async -> + ?assertEqual( + ok, emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => TimeoutMS}) + ) + end + end), + spawn_link(fun() -> + ?tp(delay_enter, #{}), + ct:sleep(2 * TimeoutMS), + ?tp(delay, #{}), + ok + end), + + {ok, _} = ?block_until(#{?snk_kind := buffer_worker_flush_all_expired}, 4 * TimeoutMS), + ok + end, + fun(Trace) -> + ?assertMatch( + [#{batch := [{query, _, {inc_counter, 99}, _, _}]}], + ?of_kind(buffer_worker_flush_all_expired, Trace) + ), + Metrics = tap_metrics(?LINE), + ?assertMatch( + #{ + counters := #{ + matched := 2, + %% the block call + success := 1, + dropped := 1, + 'dropped.expired' := 1, + retried := 0, + failed := 0 + } + }, + Metrics + ), + ok + end + ), + ok. + +t_expiration_sync_before_sending_partial_batch(_Config) -> + emqx_connector_demo:set_callback_mode(always_sync), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => sync, + batch_size => 2, + batch_time => 100, + worker_pool_size => 1, + resume_interval => 1_000 + } + ), + install_telemetry_handler(?FUNCTION_NAME), + do_t_expiration_before_sending_partial_batch(sync). + +t_expiration_async_before_sending_partial_batch(_Config) -> + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 2, + batch_time => 100, + worker_pool_size => 1, + resume_interval => 1_000 + } + ), + install_telemetry_handler(?FUNCTION_NAME), + do_t_expiration_before_sending_partial_batch(async). + +do_t_expiration_before_sending_partial_batch(QueryMode) -> + ?check_trace( + begin + ok = emqx_resource:simple_sync_query(?ID, block), + + ?force_ordering( + #{?snk_kind := buffer_worker_flush_before_pop}, + #{?snk_kind := delay_enter} + ), + ?force_ordering( + #{?snk_kind := delay}, + #{?snk_kind := buffer_worker_flush_before_sieve_expired} + ), + + Pid0 = + spawn_link(fun() -> + ?assertEqual( + ok, emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => infinity}) + ), + ?tp(infinity_query_returned, #{}) + end), + TimeoutMS = 100, + Pid1 = + spawn_link(fun() -> + case QueryMode of + sync -> + ?assertError( + timeout, + emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS}) + ); + async -> + ?assertEqual( + ok, + emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS}) + ) + end + end), + Pid2 = + spawn_link(fun() -> + ?tp(delay_enter, #{}), + ct:sleep(2 * TimeoutMS), + ?tp(delay, #{}), + ok + end), + + {ok, _} = ?block_until( + #{?snk_kind := buffer_worker_flush_potentially_partial}, 4 * TimeoutMS + ), + ok = emqx_resource:simple_sync_query(?ID, resume), + case QueryMode of + async -> + {ok, _} = ?block_until( + #{ + ?snk_kind := buffer_worker_reply_after_query, + action := ack, + batch_or_query := [{query, _, {inc_counter, 99}, _, _}] + }, + 10 * TimeoutMS + ); + sync -> + %% more time because it needs to retry if sync + {ok, _} = ?block_until(#{?snk_kind := infinity_query_returned}, 20 * TimeoutMS) + end, + + lists:foreach( + fun(Pid) -> + unlink(Pid), + exit(Pid, kill) + end, + [Pid0, Pid1, Pid2] + ), + ok + end, + fun(Trace) -> + ?assertMatch( + [ + #{ + expired := [{query, _, {inc_counter, 199}, _, _}], + not_expired := [{query, _, {inc_counter, 99}, _, _}] + } + ], + ?of_kind(buffer_worker_flush_potentially_partial, Trace) + ), + wait_until_gauge_is(inflight, 0, 500), + Metrics = tap_metrics(?LINE), + case QueryMode of + async -> + ?assertMatch( + #{ + counters := #{ + matched := 4, + %% the block call, the request with + %% infinity timeout, and the resume + %% call. + success := 3, + dropped := 1, + 'dropped.expired' := 1, + %% was sent successfully and held by + %% the test connector. + retried := 0, + failed := 0 + } + }, + Metrics + ); + sync -> + ?assertMatch( + #{ + counters := #{ + matched := 4, + %% the block call, the request with + %% infinity timeout, and the resume + %% call. + success := 3, + dropped := 1, + 'dropped.expired' := 1, + %% currently, the test connector + %% replies with an error that may make + %% the buffer worker retry. + retried := Retried, + failed := 0 + } + } when Retried =< 1, + Metrics + ) + end, + ok + end + ), + ok. + +t_expiration_async_after_reply(_Config) -> + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 1, + batch_time => 100, + worker_pool_size => 1, + resume_interval => 1_000 + } + ), + install_telemetry_handler(?FUNCTION_NAME), + do_t_expiration_async_after_reply(single). + +t_expiration_async_batch_after_reply(_Config) -> + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 2, + batch_time => 100, + worker_pool_size => 1, + resume_interval => 2_000 + } + ), + install_telemetry_handler(?FUNCTION_NAME), + do_t_expiration_async_after_reply(batch). + +do_t_expiration_async_after_reply(IsBatch) -> + ?check_trace( + begin + NAcks = + case IsBatch of + batch -> 1; + single -> 2 + end, + ?force_ordering( + #{?snk_kind := buffer_worker_flush_ack}, + NAcks, + #{?snk_kind := delay_enter}, + _Guard = true + ), + ?force_ordering( + #{?snk_kind := delay}, + #{ + ?snk_kind := buffer_worker_reply_after_query_enter, + batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _] + } + ), + + TimeoutMS = 100, + ?assertEqual( + ok, + emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS}) + ), + ?assertEqual( + ok, emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => infinity}) + ), + Pid0 = + spawn_link(fun() -> + ?tp(delay_enter, #{}), + ct:sleep(2 * TimeoutMS), + ?tp(delay, #{}), + ok + end), + + {ok, _} = ?block_until( + #{?snk_kind := buffer_worker_flush_potentially_partial}, 4 * TimeoutMS + ), + {ok, _} = ?block_until( + #{?snk_kind := buffer_worker_reply_after_query_expired}, 10 * TimeoutMS + ), + + unlink(Pid0), + exit(Pid0, kill), + ok + end, + fun(Trace) -> + ?assertMatch( + [ + #{ + expired := [{query, _, {inc_counter, 199}, _, _}] + } + ], + ?of_kind(buffer_worker_reply_after_query_expired, Trace) + ), + wait_telemetry_event(success, #{n_events => 1, timeout => 4_000}), + Metrics = tap_metrics(?LINE), + ?assertMatch( + #{ + counters := #{ + matched := 2, + %% the request with infinity timeout. + success := 1, + dropped := 0, + late_reply := 1, + retried := 0, + failed := 0 + } + }, + Metrics + ), + ok + end + ), + ok. + +t_expiration_batch_all_expired_after_reply(_Config) -> + ResumeInterval = 300, + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 2, + batch_time => 100, + worker_pool_size => 1, + resume_interval => ResumeInterval + } + ), + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := buffer_worker_flush_ack}, + #{?snk_kind := delay_enter} + ), + ?force_ordering( + #{?snk_kind := delay}, + #{ + ?snk_kind := buffer_worker_reply_after_query_enter, + batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _] + } + ), + + TimeoutMS = 200, + ?assertEqual( + ok, + emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS}) + ), + Pid0 = + spawn_link(fun() -> + ?tp(delay_enter, #{}), + ct:sleep(2 * TimeoutMS), + ?tp(delay, #{}), + ok + end), + + {ok, _} = ?block_until( + #{?snk_kind := buffer_worker_reply_after_query_expired}, 10 * TimeoutMS + ), + + unlink(Pid0), + exit(Pid0, kill), + ok + end, + fun(Trace) -> + ?assertMatch( + [ + #{ + expired := [{query, _, {inc_counter, 199}, _, _}] + } + ], + ?of_kind(buffer_worker_reply_after_query_expired, Trace) + ), + Metrics = tap_metrics(?LINE), + ?assertMatch( + #{ + counters := #{ + matched := 1, + success := 0, + dropped := 0, + late_reply := 1, + retried := 0, + failed := 0 + } + }, + Metrics + ), + ok + end + ), + ok. + +t_expiration_retry(_Config) -> + emqx_connector_demo:set_callback_mode(always_sync), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => sync, + batch_size => 1, + batch_time => 100, + worker_pool_size => 1, + resume_interval => 300 + } + ), + do_t_expiration_retry(single). + +t_expiration_retry_batch(_Config) -> + emqx_connector_demo:set_callback_mode(always_sync), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => sync, + batch_size => 2, + batch_time => 100, + worker_pool_size => 1, + resume_interval => 300 + } + ), + do_t_expiration_retry(batch). + +do_t_expiration_retry(IsBatch) -> + ResumeInterval = 300, + ?check_trace( + begin + ok = emqx_resource:simple_sync_query(?ID, block), + + {ok, SRef0} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := buffer_worker_flush_nack}), + 1, + 200 + ), + TimeoutMS = 100, + %% the request that expires must be first, so it's the + %% head of the inflight table (and retriable). + {ok, SRef1} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := buffer_worker_appended_to_queue}), + 1, + ResumeInterval * 2 + ), + spawn_link(fun() -> + ?assertError( + timeout, + emqx_resource:query( + ?ID, + {inc_counter, 1}, + #{timeout => TimeoutMS} + ) + ) + end), + Pid1 = + spawn_link(fun() -> + receive + go -> ok + end, + ?assertEqual( + ok, + emqx_resource:query( + ?ID, + {inc_counter, 2}, + #{timeout => infinity} + ) + ) + end), + {ok, _} = snabbkaffe:receive_events(SRef1), + Pid1 ! go, + {ok, _} = snabbkaffe:receive_events(SRef0), + + {ok, _} = + ?block_until( + #{?snk_kind := buffer_worker_retry_expired}, + ResumeInterval * 10 + ), + + SuccessEventKind = + case IsBatch of + batch -> buffer_worker_retry_inflight_succeeded; + single -> buffer_worker_flush_ack + end, + {ok, {ok, _}} = + ?wait_async_action( + emqx_resource:simple_sync_query(?ID, resume), + #{?snk_kind := SuccessEventKind}, + ResumeInterval * 5 + ), + + ok + end, + fun(Trace) -> + ?assertMatch( + [#{expired := [{query, _, {inc_counter, 1}, _, _}]}], + ?of_kind(buffer_worker_retry_expired, Trace) + ), + ok + end + ), + ok. + +t_expiration_retry_batch_multiple_times(_Config) -> + ResumeInterval = 300, + emqx_connector_demo:set_callback_mode(always_sync), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => sync, + batch_size => 2, + batch_time => 100, + worker_pool_size => 1, + resume_interval => ResumeInterval + } + ), + ?check_trace( + begin + ok = emqx_resource:simple_sync_query(?ID, block), + + {ok, SRef} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := buffer_worker_flush_nack}), + 1, + 200 + ), + TimeoutMS = 100, + spawn_link(fun() -> + ?assertError( + timeout, + emqx_resource:query( + ?ID, + {inc_counter, 1}, + #{timeout => TimeoutMS} + ) + ) + end), + spawn_link(fun() -> + ?assertError( + timeout, + emqx_resource:query( + ?ID, + {inc_counter, 2}, + #{timeout => ResumeInterval + TimeoutMS} + ) + ) + end), + {ok, _} = snabbkaffe:receive_events(SRef), + + {ok, _} = + snabbkaffe:block_until( + ?match_n_events(2, #{?snk_kind := buffer_worker_retry_expired}), + ResumeInterval * 10 + ), + + ok + end, + fun(Trace) -> + ?assertMatch( + [ + #{expired := [{query, _, {inc_counter, 1}, _, _}]}, + #{expired := [{query, _, {inc_counter, 2}, _, _}]} + ], + ?of_kind(buffer_worker_retry_expired, Trace) + ), + ok + end + ), + ok. + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ + inc_counter_in_parallel(N) -> inc_counter_in_parallel(N, #{}). @@ -1564,6 +2235,81 @@ tap_metrics(Line) -> ct:pal("metrics (l. ~b): ~p", [Line, #{counters => C, gauges => G}]), #{counters => C, gauges => G}. +install_telemetry_handler(TestCase) -> + Tid = ets:new(TestCase, [ordered_set, public]), + HandlerId = TestCase, + TestPid = self(), + _ = telemetry:attach_many( + HandlerId, + emqx_resource_metrics:events(), + fun(EventName, Measurements, Metadata, _Config) -> + Data = #{ + name => EventName, + measurements => Measurements, + metadata => Metadata + }, + ets:insert(Tid, {erlang:monotonic_time(), Data}), + TestPid ! {telemetry, Data}, + ok + end, + unused_config + ), + on_exit(fun() -> + telemetry:detach(HandlerId), + ets:delete(Tid) + end), + put({?MODULE, telemetry_table}, Tid), + Tid. + +wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) -> + Events = receive_all_events(GaugeName, Timeout), + case length(Events) > 0 andalso lists:last(Events) of + #{measurements := #{gauge_set := ExpectedValue}} -> + ok; + #{measurements := #{gauge_set := Value}} -> + ct:fail( + "gauge ~p didn't reach expected value ~p; last value: ~p", + [GaugeName, ExpectedValue, Value] + ); + false -> + ct:pal("no ~p gauge events received!", [GaugeName]) + end. + +receive_all_events(EventName, Timeout) -> + receive_all_events(EventName, Timeout, []). + +receive_all_events(EventName, Timeout, Acc) -> + receive + {telemetry, #{name := [_, _, EventName]} = Event} -> + receive_all_events(EventName, Timeout, [Event | Acc]) + after Timeout -> + lists:reverse(Acc) + end. + +wait_telemetry_event(EventName) -> + wait_telemetry_event(EventName, #{timeout => 5_000, n_events => 1}). + +wait_telemetry_event( + EventName, + Opts0 +) -> + DefaultOpts = #{timeout => 5_000, n_events => 1}, + #{timeout := Timeout, n_events := NEvents} = maps:merge(DefaultOpts, Opts0), + wait_n_events(NEvents, Timeout, EventName). + +wait_n_events(NEvents, _Timeout, _EventName) when NEvents =< 0 -> + ok; +wait_n_events(NEvents, Timeout, EventName) -> + TelemetryTable = get({?MODULE, telemetry_table}), + receive + {telemetry, #{name := [_, _, EventName]}} -> + wait_n_events(NEvents - 1, Timeout, EventName) + after Timeout -> + RecordedEvents = ets:tab2list(TelemetryTable), + ct:pal("recorded events: ~p", [RecordedEvents]), + error({timeout_waiting_for_telemetry, EventName}) + end. + assert_sync_retry_fail_then_succeed_inflight(Trace) -> ct:pal(" ~p", [Trace]), ?assert( diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index 47a4646de..c9560739f 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -350,10 +350,12 @@ service_account_json(PrivateKeyPEM) -> metrics_mapping() -> #{ dropped => fun emqx_resource_metrics:dropped_get/1, + dropped_expired => fun emqx_resource_metrics:dropped_expired_get/1, dropped_other => fun emqx_resource_metrics:dropped_other_get/1, dropped_queue_full => fun emqx_resource_metrics:dropped_queue_full_get/1, dropped_resource_not_found => fun emqx_resource_metrics:dropped_resource_not_found_get/1, dropped_resource_stopped => fun emqx_resource_metrics:dropped_resource_stopped_get/1, + late_reply => fun emqx_resource_metrics:late_reply_get/1, failed => fun emqx_resource_metrics:failed_get/1, inflight => fun emqx_resource_metrics:inflight_get/1, matched => fun emqx_resource_metrics:matched_get/1, @@ -1117,9 +1119,6 @@ do_econnrefused_or_timeout_test(Config, Error) -> CurrentMetrics ); {timeout, async} -> - wait_telemetry_event(TelemetryTable, success, ResourceId, #{ - timeout => 10_000, n_events => 2 - }), wait_until_gauge_is(inflight, 0, _Timeout = 400), wait_until_gauge_is(queuing, 0, _Timeout = 400), assert_metrics( @@ -1130,7 +1129,8 @@ do_econnrefused_or_timeout_test(Config, Error) -> matched => 2, queuing => 0, retried => 0, - success => 2 + success => 0, + late_reply => 2 }, ResourceId ); diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl index 041bdec08..898c36fe0 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl @@ -58,7 +58,6 @@ %% emqx_resource API %%------------------------------------------------------------------------------------------------- -%% TODO: check is_buffer_supported() -> false. callback_mode() -> async_if_possible.