diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 5f33714ef..ed5897a59 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -382,15 +382,6 @@ on_query_async( {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs]} ). -reply_delegator(ReplyFunAndArgs, Result) -> - case Result of - {error, Reason} when Reason =:= econnrefused; Reason =:= timeout -> - Result1 = {error, {recoverable_error, Reason}}, - emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1); - _ -> - emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result) - end. - on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) -> case do_get_status(PoolName, Timeout) of true -> @@ -544,3 +535,12 @@ bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). + +reply_delegator(ReplyFunAndArgs, Result) -> + case Result of + {error, Reason} when Reason =:= econnrefused; Reason =:= timeout -> + Result1 = {error, {recoverable_error, Reason}}, + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1); + _ -> + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result) + end. 
diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index a0c7ebcdb..c5841b7ef 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -154,12 +154,12 @@ running(enter, _, _St) -> running(cast, resume, _St) -> keep_state_and_data; running(cast, block, St) -> - {next_state, block, St}; + {next_state, blocked, St}; running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{id := Id, queue := Q} = St) when is_list(Batch) -> Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]), - {next_state, block, St#{queue := Q1}}; + {next_state, blocked, St#{queue := Q1}}; running({call, From}, {query, Request, _Opts}, St) -> query_or_acc(From, Request, St); running(cast, {query, Request, Opts}, St) -> @@ -366,7 +366,7 @@ reply_caller(Id, ?REPLY(undefined, _, Result), BlockWorker) -> reply_caller(Id, ?REPLY({ReplyFun, Args}, _, Result), BlockWorker) when is_function(ReplyFun) -> _ = case Result of - {async_return, _} -> ok; + {async_return, _} -> no_reply_for_now; _ -> apply(ReplyFun, Args ++ [Result]) end, handle_query_result(Id, Result, BlockWorker); @@ -395,6 +395,9 @@ handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) -> emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'), BlockWorker; handle_query_result(Id, {error, {recoverable_error, _}}, _BlockWorker) -> + %% the message will be queued in replayq or inflight window, + %% i.e. the counter 'queuing' will increase, so we pretend that we have not + %% sent this message. 
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', -1), true; handle_query_result(Id, {error, _}, BlockWorker) -> @@ -450,15 +453,10 @@ call_query(QM0, Id, Query, QueryOpts) -> apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) -> ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}), ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'), - ?APPLY_RESOURCE( - begin - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'), - Result = Mod:on_query(Id, Request, ResSt), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1), - Result - end, - Request - ); + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'), + Result = ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1), + Result; apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), Name = maps:get(inflight_name, QueryOpts, undefined), @@ -483,16 +481,12 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) -> ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Requests = [Request || ?QUERY(_From, Request) <- Batch], - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)), - ?APPLY_RESOURCE( - begin - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'), - Result = Mod:on_batch_query(Id, Requests, ResSt), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1), - Result - end, - Batch - ); + BatchLen = length(Batch), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', BatchLen), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', BatchLen), + Result = ?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch), + ok = 
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -BatchLen), + Result; apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Name = maps:get(inflight_name, QueryOpts, undefined), @@ -503,8 +497,9 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}), {async_return, inflight_full}; false -> - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'), + BatchLen = length(Batch), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', BatchLen), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', BatchLen), ReplyFun = fun ?MODULE:batch_reply_after_query/6, Ref = make_message_ref(), Args = {ReplyFun, [self(), Id, Name, Ref, Batch]}, @@ -517,20 +512,29 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> ). reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) -> + %% NOTE: 'sent.inflight' is the count of messages sent but with no ACK received yet, + %% NOT the number of messages queued in the inflight window. + emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1), case reply_caller(Id, ?REPLY(From, Request, Result)) of true -> + %% we mark these messages as 'queuing' although they are in the inflight window + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'), ?MODULE:block(Pid); false -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1), inflight_drop(Name, Ref) end. batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) -> + %% NOTE: 'sent.inflight' is the count of messages sent but with no ACK received yet, + %% NOT the number of messages queued in the inflight window. 
+ BatchLen = length(Batch), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -BatchLen), case batch_reply_caller(Id, Result, Batch) of true -> + %% we mark these messages as 'queuing' although they are in the inflight window + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', BatchLen), ?MODULE:block(Pid); false -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -length(Batch)), inflight_drop(Name, Ref) end. %%============================================================================== diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 4999b9410..a645af7d8 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -96,7 +96,7 @@ on_query(_InstId, {inc_counter, N}, #{pid := Pid}) -> Pid ! {From, {inc, N}}, receive {ReqRef, ok} -> ok; - {ReqRef, incorrect_status} -> {recoverable_error, incorrect_status} + {ReqRef, incorrect_status} -> {error, {recoverable_error, incorrect_status}} after 1000 -> {error, timeout} end;