diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index c89621d59..4ffeee71f 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -570,9 +570,9 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> <<"success">> := 3, <<"failed">> := 0, <<"queuing">> := 0, - <<"retried">> := R + <<"retried">> := _ } - } when R > 0, + }, jsx:decode(BridgeStr2) ), %% also verify the 2 messages have been sent to the remote broker diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 3430b3964..8d61832e3 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -131,6 +131,8 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> [ 'matched', 'retried', + 'retried.success', + 'retried.failed', 'success', 'failed', 'dropped', diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 25032c29c..4f7ac63d8 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -56,9 +56,12 @@ -define(Q_ITEM(REQUEST), {q_item, REQUEST}). --define(QUERY(FROM, REQUEST), {query, FROM, REQUEST}). --define(REPLY(FROM, REQUEST, RESULT), {reply, FROM, REQUEST, RESULT}). --define(EXPAND(RESULT, BATCH), [?REPLY(FROM, REQUEST, RESULT) || ?QUERY(FROM, REQUEST) <- BATCH]). +-define(QUERY(FROM, REQUEST, SENT), {query, FROM, REQUEST, SENT}). +-define(REPLY(FROM, REQUEST, SENT, RESULT), {reply, FROM, REQUEST, SENT, RESULT}). +-define(EXPAND(RESULT, BATCH), [ + ?REPLY(FROM, REQUEST, SENT, RESULT) + || ?QUERY(FROM, REQUEST, SENT) <- BATCH +]). -type id() :: binary(). -type query() :: {query, from(), request()}. @@ -89,16 +92,16 @@ async_query(Id, Request, Opts) -> %% simple query the resource without batching and queuing messages. -spec simple_sync_query(id(), request()) -> Result :: term(). simple_sync_query(Id, Request) -> - Result = call_query(sync, Id, ?QUERY(self(), Request), #{}), + Result = call_query(sync, Id, ?QUERY(self(), Request, false), #{}), ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), - _ = handle_query_result(Id, Result, false), + _ = handle_query_result(Id, Result, false, false), Result. -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term(). simple_async_query(Id, Request, ReplyFun) -> - Result = call_query(async, Id, ?QUERY(ReplyFun, Request), #{}), + Result = call_query(async, Id, ?QUERY(ReplyFun, Request, false), #{}), ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), - _ = handle_query_result(Id, Result, false), + _ = handle_query_result(Id, Result, false, false), Result. -spec block(pid() | atom()) -> ok. @@ -156,7 +159,7 @@ running(cast, resume, _St) -> keep_state_and_data; running(cast, block, St) -> {next_state, blocked, St}; -running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{id := Id, queue := Q} = St) when +running(cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, queue := Q} = St) when is_list(Batch) -> Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]), @@ -178,7 +181,7 @@ blocked(enter, _, #{resume_interval := ResumeT} = _St) -> {keep_state_and_data, {state_timeout, ResumeT, resume}}; blocked(cast, block, _St) -> keep_state_and_data; -blocked(cast, {block, [?QUERY(_, _) | _] = Batch}, #{id := Id, queue := Q} = St) when +blocked(cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, queue := Q} = St) when is_list(Batch) -> Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]), @@ -189,13 +192,15 @@ blocked(state_timeout, resume, St) -> do_resume(St); blocked({call, From}, {query, Request, _Opts}, #{id := Id, queue := Q} = St) -> Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), - _ = reply_caller(Id, ?REPLY(From, Request, Error)), - {keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(From, Request))])}}; + _ = reply_caller(Id, ?REPLY(From, Request, false, Error)), + {keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(From, Request, false))])}}; blocked(cast, {query, Request, Opts}, #{id := Id, queue := Q} = St) -> ReplayFun = maps:get(async_reply_fun, Opts, undefined), Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), - _ = reply_caller(Id, ?REPLY(ReplayFun, Request, Error)), - {keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(ReplayFun, Request))])}}. + _ = reply_caller(Id, ?REPLY(ReplayFun, Request, false, Error)), + {keep_state, St#{ + queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(ReplayFun, Request, false))]) + }}. terminate(_Reason, #{id := Id, index := Index}) -> gproc_pool:disconnect_worker(Id, {Id, Index}). @@ -250,10 +255,11 @@ retry_first_from_queue(Q, Id, St) -> retry_first_sync(Id, FirstQuery, undefined, undefined, Q, St) end. -retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = St0) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'), - Result = call_query(sync, Id, FirstQuery, #{}), - case handle_query_result(Id, Result, false) of +retry_first_sync( + Id, ?QUERY(_, _, HasSent) = Query, Name, Ref, Q, #{resume_interval := ResumeT} = St0 +) -> + Result = call_query(sync, Id, Query, #{}), + case handle_query_result(Id, Result, HasSent, false) of %% Send failed because resource down true -> {keep_state, St0, {state_timeout, ResumeT, resume}}; @@ -279,7 +285,7 @@ drop_head(Id, Q) -> Q1. query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) -> - Acc1 = [?QUERY(From, Request) | Acc], + Acc1 = [?QUERY(From, Request, false) | Acc], emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching'), St = St0#{acc := Acc1, acc_left := Left - 1}, case Left =< 1 of @@ -290,10 +296,10 @@ query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id} = St) QueryOpts = #{ inflight_name => maps:get(name, St) }, - Result = call_query(configured, Id, ?QUERY(From, Request), QueryOpts), - case reply_caller(Id, ?REPLY(From, Request, Result)) of + Result = call_query(configured, Id, ?QUERY(From, Request, false), QueryOpts), + case reply_caller(Id, ?REPLY(From, Request, false, Result)) of true -> - Query = ?QUERY(From, Request), + Query = ?QUERY(From, Request, 1), {next_state, blocked, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(Query)])}}; false -> {keep_state, St} @@ -361,63 +367,65 @@ batch_reply_caller(Id, BatchResult, Batch) -> reply_caller(Id, Reply) -> reply_caller(Id, Reply, false). -reply_caller(Id, ?REPLY(undefined, _, Result), BlockWorker) -> - handle_query_result(Id, Result, BlockWorker); -reply_caller(Id, ?REPLY({ReplyFun, Args}, _, Result), BlockWorker) when is_function(ReplyFun) -> +reply_caller(Id, ?REPLY(undefined, _, HasSent, Result), BlockWorker) -> + handle_query_result(Id, Result, HasSent, BlockWorker); +reply_caller(Id, ?REPLY({ReplyFun, Args}, _, HasSent, Result), BlockWorker) when + is_function(ReplyFun) +-> _ = case Result of {async_return, _} -> no_reply_for_now; _ -> apply(ReplyFun, Args ++ [Result]) end, - handle_query_result(Id, Result, BlockWorker); -reply_caller(Id, ?REPLY(From, _, Result), BlockWorker) -> + handle_query_result(Id, Result, HasSent, BlockWorker); +reply_caller(Id, ?REPLY(From, _, HasSent, Result), BlockWorker) -> gen_statem:reply(From, Result), - handle_query_result(Id, Result, BlockWorker). + handle_query_result(Id, Result, HasSent, BlockWorker). -handle_query_result(Id, ?RESOURCE_ERROR_M(exception, Msg), BlockWorker) -> +handle_query_result(Id, ?RESOURCE_ERROR_M(exception, Msg), HasSent, BlockWorker) -> ?SLOG(error, #{msg => resource_exception, info => Msg}), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'), + inc_sent_failed(Id, HasSent), BlockWorker; -handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _) when +handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasSent, _) when NotWorking == not_connected; NotWorking == blocked -> true; -handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), BlockWorker) -> +handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_not_found'), BlockWorker; -handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), BlockWorker) -> +handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_stopped'), BlockWorker; -handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) -> +handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), _HasSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'), BlockWorker; -handle_query_result(Id, {error, {recoverable_error, Reason}}, _BlockWorker) -> +handle_query_result(Id, {error, {recoverable_error, Reason}}, _HasSent, _BlockWorker) -> %% the message will be queued in replayq or inflight window, %% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not %% sent this message. ?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}), true; -handle_query_result(Id, {error, Reason}, BlockWorker) -> +handle_query_result(Id, {error, Reason}, HasSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'), + inc_sent_failed(Id, HasSent), BlockWorker; -handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) -> +handle_query_result(_Id, {async_return, inflight_full}, _HasSent, _BlockWorker) -> true; -handle_query_result(Id, {async_return, {error, Msg}}, BlockWorker) -> +handle_query_result(Id, {async_return, {error, Msg}}, HasSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'), + inc_sent_failed(Id, HasSent), BlockWorker; -handle_query_result(_Id, {async_return, ok}, BlockWorker) -> +handle_query_result(_Id, {async_return, ok}, _HasSent, BlockWorker) -> BlockWorker; -handle_query_result(Id, Result, BlockWorker) -> +handle_query_result(Id, Result, HasSent, BlockWorker) -> assert_ok_result(Result), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'success'), + inc_sent_success(Id, HasSent), BlockWorker. call_query(QM0, Id, Query, QueryOpts) -> @@ -456,10 +464,10 @@ call_query(QM0, Id, Query, QueryOpts) -> end ). -apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) -> +apply_query_fun(sync, Mod, Id, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) -> ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}), ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request); -apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> +apply_query_fun(async, Mod, Id, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), Name = maps:get(inflight_name, QueryOpts, undefined), ?APPLY_RESOURCE( @@ -478,11 +486,11 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> end, Request ); -apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) -> +apply_query_fun(sync, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) -> ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), - Requests = [Request || ?QUERY(_From, Request) <- Batch], + Requests = [Request || ?QUERY(_From, Request, _) <- Batch], ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch); -apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> +apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Name = maps:get(inflight_name, QueryOpts, undefined), ?APPLY_RESOURCE( @@ -496,7 +504,7 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> ReplyFun = fun ?MODULE:batch_reply_after_query/6, Ref = make_message_ref(), Args = {ReplyFun, [self(), Id, Name, Ref, Batch]}, - Requests = [Request || ?QUERY(_From, Request) <- Batch], + Requests = [Request || ?QUERY(_From, Request, _) <- Batch], ok = inflight_append(Name, Ref, Batch), Result = Mod:on_batch_query_async(Id, Requests, Args, ResSt), {async_return, Result} @@ -504,18 +512,18 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> Batch ). -reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) -> +reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request, HasSent), Result) -> %% NOTE: 'inflight' is message count that sent async but no ACK received, %% NOT the message number ququed in the inflight window. emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', -1), - case reply_caller(Id, ?REPLY(From, Request, Result)) of + case reply_caller(Id, ?REPLY(From, Request, HasSent, Result)) of true -> %% we marked these messages are 'queuing' although they are actually %% keeped in inflight window, not replayq emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'), ?MODULE:block(Pid); false -> - inflight_drop(Name, Ref) + drop_inflight_and_resume(Pid, Name, Ref) end. batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) -> @@ -529,9 +537,19 @@ batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) -> %% keeped in inflight window, not replayq emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', BatchLen), ?MODULE:block(Pid); + false -> + drop_inflight_and_resume(Pid, Name, Ref) + end. + +drop_inflight_and_resume(Pid, Name, Ref) -> + case inflight_is_full(Name) of + true -> + inflight_drop(Name, Ref), + ?MODULE:resume(Pid); false -> inflight_drop(Name, Ref) end. + %%============================================================================== %% the inflight queue for async query -define(SIZE_REF, -1). @@ -565,8 +583,14 @@ inflight_is_full(Name) -> inflight_append(undefined, _Ref, _Query) -> ok; -inflight_append(Name, Ref, Query) -> - ets:insert(Name, {Ref, Query}), +inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch) -> + ets:insert(Name, {Ref, [?QUERY(From, Req, true) || ?QUERY(From, Req, _) <- Batch]}), + ok; +inflight_append(Name, Ref, ?QUERY(From, Req, _)) -> + ets:insert(Name, {Ref, ?QUERY(From, Req, true)}), + ok; +inflight_append(Name, Ref, Data) -> + ets:insert(Name, {Ref, Data}), ok. inflight_drop(undefined, _) -> @@ -576,6 +600,21 @@ inflight_drop(Name, Ref) -> ok. %%============================================================================== + +inc_sent_failed(Id, true) -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried.failed'); +inc_sent_failed(Id, _HasSent) -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'). + +inc_sent_success(Id, true) -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'success'), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried.success'); +inc_sent_success(Id, _HasSent) -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'success'). + call_mode(sync, _) -> sync; call_mode(async, always_sync) -> sync; call_mode(async, async_if_possible) -> async. diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index a645af7d8..3b83cf7ed 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -170,9 +170,11 @@ counter_loop(#{counter := Num, status := Status} = State) -> ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]), State#{status => running}; {inc, N, ReplyFun} when Status == running -> + %ct:pal("async counter recv: ~p", [{inc, N}]), apply_reply(ReplyFun, ok), State#{counter => Num + N}; {{FromPid, ReqRef}, {inc, N}} when Status == running -> + %ct:pal("sync counter recv: ~p", [{inc, N}]), FromPid ! {ReqRef, ok}, State#{counter => Num + N}; {{FromPid, ReqRef}, {inc, _N}} when Status == blocked -> diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index a9d274168..2446c8102 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -211,7 +211,7 @@ t_batch_query_counter(_) -> ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => test_resource, register => true}, - #{enable_batch => true} + #{enable_batch => true, query_mode => sync} ), ?check_trace( @@ -220,7 +220,7 @@ t_batch_query_counter(_) -> fun(Result, Trace) -> ?assertMatch({ok, 0}, Result), QueryTrace = ?of_kind(call_batch_query, Trace), - ?assertMatch([#{batch := [{query, _, get_counter}]}], QueryTrace) + ?assertMatch([#{batch := [{query, _, get_counter, _}]}], QueryTrace) end ), @@ -251,7 +251,7 @@ t_query_counter_async_query(_) -> fun(Trace) -> %% the callback_mode if 'emqx_connector_demo' is 'always_sync'. QueryTrace = ?of_kind(call_query, Trace), - ?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace) + ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace) end ), %% wait for 1s to make sure all the aysnc query is sent to the resource. @@ -264,7 +264,7 @@ t_query_counter_async_query(_) -> ?assertMatch({ok, 1000}, Result), %% the callback_mode if 'emqx_connector_demo' is 'always_sync'. QueryTrace = ?of_kind(call_query, Trace), - ?assertMatch([#{query := {query, _, get_counter}}], QueryTrace) + ?assertMatch([#{query := {query, _, get_counter, _}}], QueryTrace) end ), {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), @@ -292,7 +292,7 @@ t_query_counter_async_callback(_) -> inc_counter_in_parallel(1000, ReqOpts), fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), - ?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace) + ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace) end ), @@ -305,7 +305,7 @@ t_query_counter_async_callback(_) -> fun(Result, Trace) -> ?assertMatch({ok, 1000}, Result), QueryTrace = ?of_kind(call_query, Trace), - ?assertMatch([#{query := {query, _, get_counter}}], QueryTrace) + ?assertMatch([#{query := {query, _, get_counter, _}}], QueryTrace) end ), {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), @@ -341,7 +341,8 @@ t_query_counter_async_inflight(_) -> enable_batch => false, async_inflight_window => WindowSize, worker_pool_size => 1, - resume_interval => 300 + resume_interval => 300, + enable_queue => false } ), ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)), @@ -355,11 +356,11 @@ t_query_counter_async_inflight(_) -> inc_counter_in_parallel(WindowSize, ReqOpts), fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), - ?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace) + ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace) end ), - %% this will block the resource_worker as the inflight windown is full now + %% this will block the resource_worker as the inflight window is full now ok = emqx_resource:query(?ID, {inc_counter, 1}), ?assertMatch(0, ets:info(Tab0, size)), %% sleep to make the resource_worker resume some times @@ -387,7 +388,7 @@ t_query_counter_async_inflight(_) -> inc_counter_in_parallel(Num, ReqOpts), fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), - ?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace) + ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace) end ), timer:sleep(1000), @@ -401,7 +402,7 @@ t_query_counter_async_inflight(_) -> inc_counter_in_parallel(WindowSize, ReqOpts), fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), - ?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace) + ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace) end ), @@ -415,13 +416,13 @@ t_query_counter_async_inflight(_) -> {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter), ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]), - ?assert(Sent == Counter), + ?assert(Sent =< Counter), {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), ct:pal("metrics: ~p", [C]), ?assertMatch( - #{matched := M, success := Ss, dropped := D} when - M == Ss + D, + #{matched := M, success := Ss, dropped := Dp, 'retried.success' := Rs} when + M == Ss + Dp - Rs, C ), ?assert(