diff --git a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf index 06cc41a91..de9e10e74 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf @@ -78,7 +78,7 @@ emqx_bridge_schema { } } - metric_batched { + metric_batching { desc { en: """Count of messages that are currently accumulated in memory waiting for sending in one batch.""" zh: """当前积压在内存里,等待批量发送的消息个数""" @@ -161,9 +161,9 @@ emqx_bridge_schema { } } - metric_queued { + metric_queuing { desc { - en: """Count of messages that are currently queued.""" + en: """Count of messages that are currently queuing.""" zh: """当前被缓存到磁盘队列的消息个数。""" } label: { @@ -181,16 +181,6 @@ emqx_bridge_schema { zh: "已发送" } } - metric_sent_exception { - desc { - en: """Count of messages that were sent but exceptions occur.""" - zh: """发送出现异常的消息个数。""" - } - label: { - en: "Sent Exception" - zh: "发送异常" - } - } metric_sent_failed { desc { diff --git a/apps/emqx_bridge/include/emqx_bridge.hrl b/apps/emqx_bridge/include/emqx_bridge.hrl index bb8ee6e29..2b64dba70 100644 --- a/apps/emqx_bridge/include/emqx_bridge.hrl +++ b/apps/emqx_bridge/include/emqx_bridge.hrl @@ -1,6 +1,6 @@ -define(EMPTY_METRICS, ?METRICS( - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ) ). @@ -15,7 +15,6 @@ Matched, Queued, Sent, - SentExcpt, SentFailed, SentInflight, SentSucc, @@ -25,7 +24,7 @@ Rcvd ), #{ - 'batched' => Batched, + 'batching' => Batched, 'dropped' => Dropped, 'dropped.other' => DroppedOther, 'dropped.queue_full' => DroppedQueueFull, @@ -33,9 +32,8 @@ 'dropped.resource_not_found' => DroppedResourceNotFound, 'dropped.resource_stopped' => DroppedResourceStopped, 'matched' => Matched, - 'queued' => Queued, + 'queuing' => Queued, 'sent' => Sent, - 'sent.exception' => SentExcpt, 'sent.failed' => SentFailed, 'sent.inflight' => SentInflight, 'sent.success' => SentSucc, @@ -57,7 +55,6 @@ Matched, Queued, Sent, - SentExcpt, SentFailed, SentInflight, SentSucc, @@ -67,7 +64,7 @@ Rcvd ), #{ - 'batched' := Batched, + 'batching' := Batched, 'dropped' := Dropped, 'dropped.other' := DroppedOther, 'dropped.queue_full' := DroppedQueueFull, @@ -75,9 +72,8 @@ 'dropped.resource_not_found' := DroppedResourceNotFound, 'dropped.resource_stopped' := DroppedResourceStopped, 'matched' := Matched, - 'queued' := Queued, + 'queuing' := Queued, 'sent' := Sent, - 'sent.exception' := SentExcpt, 'sent.failed' := SentFailed, 'sent.inflight' := SentInflight, 'sent.success' := SentSucc, diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index bd892edcd..aa6a6af89 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -206,7 +206,7 @@ info_example_basic(webhook) -> worker_pool_size => 1, health_check_interval => 15000, auto_restart_interval => 15000, - query_mode => sync, + query_mode => async, async_inflight_window => 100, enable_queue => true, max_queue_bytes => 1024 * 1024 * 1024 @@ -624,11 +624,11 @@ aggregate_metrics(AllMetrics) -> fun( #{ metrics := ?metrics( - M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17, M18 + M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17 ) }, ?metrics( - N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17, N18 + N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17 ) ) -> ?METRICS( @@ -648,8 +648,7 @@ aggregate_metrics(AllMetrics) -> M14 + N14, M15 + N15, M16 + N16, - M17 + N17, - M18 + N18 + M17 + N17 ) end, InitMetrics, @@ -679,7 +678,7 @@ format_resp( format_metrics(#{ counters := #{ - 'batched' := Batched, + 'batching' := Batched, 'dropped' := Dropped, 'dropped.other' := DroppedOther, 'dropped.queue_full' := DroppedQueueFull, @@ -687,9 +686,8 @@ format_metrics(#{ 'dropped.resource_not_found' := DroppedResourceNotFound, 'dropped.resource_stopped' := DroppedResourceStopped, 'matched' := Matched, - 'queued' := Queued, + 'queuing' := Queued, 'sent' := Sent, - 'sent.exception' := SentExcpt, 'sent.failed' := SentFailed, 'sent.inflight' := SentInflight, 'sent.success' := SentSucc, @@ -710,7 +708,6 @@ format_metrics(#{ Matched, Queued, Sent, - SentExcpt, SentFailed, SentInflight, SentSucc, diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index f55ac840e..1a20da6d6 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -102,7 +102,7 @@ fields(bridges) -> ] ++ ee_fields_bridges(); fields("metrics") -> [ - {"batched", mk(integer(), #{desc => ?DESC("metric_batched")})}, + {"batching", mk(integer(), #{desc => ?DESC("metric_batching")})}, {"dropped", mk(integer(), #{desc => ?DESC("metric_dropped")})}, {"dropped.other", mk(integer(), #{desc => ?DESC("metric_dropped_other")})}, {"dropped.queue_full", mk(integer(), #{desc => ?DESC("metric_dropped_queue_full")})}, @@ -113,9 +113,8 @@ fields("metrics") -> {"dropped.resource_stopped", mk(integer(), #{desc => ?DESC("metric_dropped_resource_stopped")})}, {"matched", mk(integer(), #{desc => ?DESC("metric_matched")})}, - {"queued", mk(integer(), #{desc => ?DESC("metric_queued")})}, + {"queuing", mk(integer(), #{desc => ?DESC("metric_queuing")})}, {"sent", mk(integer(), #{desc => ?DESC("metric_sent")})}, - {"sent.exception", mk(integer(), #{desc => ?DESC("metric_sent_exception")})}, {"sent.failed", mk(integer(), #{desc => ?DESC("metric_sent_failed")})}, {"sent.inflight", mk(integer(), #{desc => ?DESC("metric_sent_inflight")})}, {"sent.success", mk(integer(), #{desc => ?DESC("metric_sent_success")})}, diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 2562bf272..ed5897a59 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -31,7 +31,8 @@ on_stop/2, on_query/3, on_query_async/4, - on_get_status/2 + on_get_status/2, + reply_delegator/2 ]). -type url() :: emqx_http_lib:uri_map(). @@ -46,7 +47,7 @@ namespace/0 ]). --export([check_ssl_opts/2]). +-export([check_ssl_opts/2, validate_method/1]). -type connect_timeout() :: emqx_schema:duration() | infinity. -type pool_type() :: random | hash. @@ -137,8 +138,10 @@ fields(config) -> fields("request") -> [ {method, - hoconsc:mk(hoconsc:enum([post, put, get, delete]), #{ - required => false, desc => ?DESC("method") + hoconsc:mk(binary(), #{ + required => false, + desc => ?DESC("method"), + validator => fun ?MODULE:validate_method/1 })}, {path, hoconsc:mk(binary(), #{required => false, desc => ?DESC("path")})}, {body, hoconsc:mk(binary(), #{required => false, desc => ?DESC("body")})}, @@ -171,6 +174,17 @@ desc(_) -> validations() -> [{check_ssl_opts, fun check_ssl_opts/1}]. +validate_method(M) when M =:= <<"post">>; M =:= <<"put">>; M =:= <<"get">>; M =:= <<"delete">> -> + ok; +validate_method(M) -> + case string:find(M, "${") of + nomatch -> + {error, + <<"Invalid method, should be one of 'post', 'put', 'get', 'delete' or variables in ${field} format.">>}; + _ -> + ok + end. + sc(Type, Meta) -> hoconsc:mk(Type, Meta). ref(Field) -> hoconsc:ref(?MODULE, Field). @@ -286,13 +300,13 @@ on_query( Retry ) of - {error, econnrefused} -> + {error, Reason} when Reason =:= econnrefused; Reason =:= timeout -> ?SLOG(warning, #{ msg => "http_connector_do_request_failed", - reason => econnrefused, + reason => Reason, connector => InstId }), - {recoverable_error, econnrefused}; + {error, {recoverable_error, Reason}}; {error, Reason} = Result -> ?SLOG(error, #{ msg => "http_connector_do_request_failed", @@ -365,7 +379,7 @@ on_query_async( Method, NRequest, Timeout, - ReplyFunAndArgs + {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs]} ). on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) -> @@ -521,3 +535,12 @@ bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). + +reply_delegator(ReplyFunAndArgs, Result) -> + case Result of + {error, Reason} when Reason =:= econnrefused; Reason =:= timeout -> + Result1 = {error, {recoverable_error, Reason}}, + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1); + _ -> + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result) + end. diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index b9c200316..802427134 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -420,7 +420,7 @@ on_sql_query( error, LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason} ), - {recoverable_error, Reason}; + {error, {recoverable_error, Reason}}; {error, Reason} -> ?SLOG( error, diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 2409a7069..aab0129d1 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -76,8 +76,8 @@ -type query_result() :: ok | {ok, term()} - | {error, term()} - | {recoverable_error, term()}. + | {error, {recoverable_error, term()}} + | {error, term()}. -define(WORKER_POOL_SIZE, 16). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index c1d500e8b..309c34195 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -110,7 +110,7 @@ list_group_instances/1 ]). --export([inc_received/1]). +-export([inc_received/1, apply_reply_fun/2]). -optional_callbacks([ on_query/3, @@ -441,6 +441,12 @@ check_and_do(ResourceType, RawConfig, Do) when is_function(Do) -> Error -> Error end. +apply_reply_fun({F, A}, Result) when is_function(F) -> + _ = erlang:apply(F, A ++ [Result]), + ok; +apply_reply_fun(From, Result) -> + gen_server:reply(From, Result). + %% ================================================================================= inc_received(ResId) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 2581a3001..c8c097c30 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -132,11 +132,10 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> 'matched', 'sent', 'dropped', - 'queued', - 'batched', + 'queuing', + 'batching', 'sent.success', 'sent.failed', - 'sent.exception', 'sent.inflight', 'dropped.queue_not_enabled', 'dropped.queue_full', diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 3a12a6b91..c5841b7ef 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -131,7 +131,7 @@ init({Id, Index, Opts}) -> false -> undefined end, - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', queue_count(Queue)), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', queue_count(Queue)), ok = inflight_new(Name), St = #{ id => Id, @@ -154,12 +154,12 @@ running(enter, _, _St) -> running(cast, resume, _St) -> keep_state_and_data; running(cast, block, St) -> - {next_state, block, St}; + {next_state, blocked, St}; running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{id := Id, queue := Q} = St) when is_list(Batch) -> Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]), - {next_state, block, St#{queue := Q1}}; + {next_state, blocked, St#{queue := Q1}}; running({call, From}, {query, Request, _Opts}, St) -> query_or_acc(From, Request, St); running(cast, {query, Request, Opts}, St) -> @@ -273,12 +273,12 @@ retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = S drop_head(Id, Q) -> {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}), ok = replayq:ack(Q1, AckRef), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -1), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -1), Q1. query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) -> Acc1 = [?QUERY(From, Request) | Acc], - emqx_metrics_worker:inc(?RES_METRICS, Id, 'batched'), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching'), St = St0#{acc := Acc1, acc_left := Left - 1}, case Left =< 1 of true -> flush(St); @@ -303,16 +303,17 @@ flush(#{acc := []} = St) -> flush( #{ id := Id, - acc := Batch, + acc := Batch0, batch_size := Size, queue := Q0 } = St ) -> + Batch = lists:reverse(Batch0), QueryOpts = #{ inflight_name => maps:get(name, St), inflight_window => maps:get(async_inflight_window, St) }, - emqx_metrics_worker:inc(?RES_METRICS, Id, 'batched', -length(Batch)), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching', -length(Batch)), Result = call_query(configured, Id, Batch, QueryOpts), St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}), case batch_reply_caller(Id, Result, Batch) of @@ -337,13 +338,13 @@ maybe_append_queue(Id, Q, Items) -> {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts), ok = replayq:ack(Q1, QAckRef), Dropped = length(Items2), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -Dropped), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -Dropped), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'), ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), Q1 end, - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued'), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'), replayq:append(Q2, Items). batch_reply_caller(Id, BatchResult, Batch) -> @@ -365,7 +366,7 @@ reply_caller(Id, ?REPLY(undefined, _, Result), BlockWorker) -> reply_caller(Id, ?REPLY({ReplyFun, Args}, _, Result), BlockWorker) when is_function(ReplyFun) -> _ = case Result of - {async_return, _} -> ok; + {async_return, _} -> no_reply_for_now; _ -> apply(ReplyFun, Args ++ [Result]) end, handle_query_result(Id, Result, BlockWorker); @@ -374,7 +375,7 @@ reply_caller(Id, ?REPLY(From, _, Result), BlockWorker) -> handle_query_result(Id, Result, BlockWorker). handle_query_result(Id, ?RESOURCE_ERROR_M(exception, _), BlockWorker) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.exception'), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'), BlockWorker; handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _) when NotWorking == not_connected; NotWorking == blocked @@ -393,12 +394,15 @@ handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) -> emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'), BlockWorker; +handle_query_result(Id, {error, {recoverable_error, _}}, _BlockWorker) -> + %% the message will be queued in replayq or inflight window, + %% i.e. the counter 'queuing' will increase, so we pretend that we have not + %% sent this message. + emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', -1), + true; handle_query_result(Id, {error, _}, BlockWorker) -> emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'), BlockWorker; -handle_query_result(Id, {recoverable_error, _}, _BlockWorker) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', -1), - true; handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) -> true; handle_query_result(Id, {async_return, {error, _}}, BlockWorker) -> @@ -433,7 +437,7 @@ call_query(QM0, Id, Query, QueryOpts) -> try %% if the callback module (connector) wants to return an error that %% makes the current resource goes into the `blocked` state, it should - %% return `{recoverable_error, Reason}` + %% return `{error, {recoverable_error, Reason}}` EXPR catch ERR:REASON:STACKTRACE -> @@ -449,7 +453,10 @@ call_query(QM0, Id, Query, QueryOpts) -> apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) -> ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}), ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'), - ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request); + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'), + Result = ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1), + Result; apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), Name = maps:get(inflight_name, QueryOpts, undefined), @@ -457,7 +464,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> ?APPLY_RESOURCE( case inflight_is_full(Name, WinSize) of true -> - ?tp(inflight_full, #{id => Id, wind_size => WinSize}), + ?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}), {async_return, inflight_full}; false -> ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'), @@ -474,8 +481,12 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) -> ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Requests = [Request || ?QUERY(_From, Request) <- Batch], - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)), - ?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch); + BatchLen = length(Batch), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', BatchLen), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', BatchLen), + Result = ?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -BatchLen), + Result; apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Name = maps:get(inflight_name, QueryOpts, undefined), @@ -483,11 +494,12 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> ?APPLY_RESOURCE( case inflight_is_full(Name, WinSize) of true -> - ?tp(inflight_full, #{id => Id, wind_size => WinSize}), + ?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}), {async_return, inflight_full}; false -> - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'), + BatchLen = length(Batch), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', BatchLen), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', BatchLen), ReplyFun = fun ?MODULE:batch_reply_after_query/6, Ref = make_message_ref(), Args = {ReplyFun, [self(), Id, Name, Ref, Batch]}, @@ -500,20 +512,29 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> ). reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) -> + %% NOTE: 'sent.inflight' is message count that sent but no ACK received, + %% NOT the message number ququed in the inflight window. + emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1), case reply_caller(Id, ?REPLY(From, Request, Result)) of true -> + %% we marked these messages are 'queuing' although they are in inflight window + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'), ?MODULE:block(Pid); false -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1), inflight_drop(Name, Ref) end. batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) -> + %% NOTE: 'sent.inflight' is message count that sent but no ACK received, + %% NOT the message number ququed in the inflight window. + BatchLen = length(Batch), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -BatchLen), case batch_reply_caller(Id, Result, Batch) of true -> + %% we marked these messages are 'queuing' although they are in inflight window + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', BatchLen), ?MODULE:block(Pid); false -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -length(Batch)), inflight_drop(Name, Ref) end. %%============================================================================== diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 9e54c8a7b..df284bbe8 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -76,7 +76,7 @@ auto_restart_interval(_) -> undefined. query_mode(type) -> enum([sync, async]); query_mode(desc) -> ?DESC("query_mode"); -query_mode(default) -> sync; +query_mode(default) -> async; query_mode(required) -> false; query_mode(_) -> undefined. diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 4999b9410..a645af7d8 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -96,7 +96,7 @@ on_query(_InstId, {inc_counter, N}, #{pid := Pid}) -> Pid ! {From, {inc, N}}, receive {ReqRef, ok} -> ok; - {ReqRef, incorrect_status} -> {recoverable_error, incorrect_status} + {ReqRef, incorrect_status} -> {error, {recoverable_error, incorrect_status}} after 1000 -> {error, timeout} end; diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index b1d563291..29f2c6bf6 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -510,7 +510,7 @@ nested_put(Alias, Val, Columns0) -> -define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found). inc_action_metrics(ok, RuleId) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success'); -inc_action_metrics({recoverable_error, _}, RuleId) -> +inc_action_metrics({error, {recoverable_error, _}}, RuleId) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl index f76eb388e..616842292 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl @@ -61,7 +61,7 @@ values(post) -> enable_batch => false, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, - query_mode => sync, + query_mode => async, enable_queue => false, max_queue_bytes => ?DEFAULT_QUEUE_SIZE }