Merge pull request #8859 from terry-xiaoyu/refactor_counters_for_resources

Some improvements and bug fixes to emqx_resources
This commit is contained in:
Xinyu Liu 2022-09-03 12:16:49 +08:00 committed by GitHub
commit 7fed4faac3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 108 additions and 77 deletions

View File

@ -78,7 +78,7 @@ emqx_bridge_schema {
} }
} }
metric_batched { metric_batching {
desc { desc {
en: """Count of messages that are currently accumulated in memory waiting for sending in one batch.""" en: """Count of messages that are currently accumulated in memory waiting for sending in one batch."""
zh: """当前积压在内存里,等待批量发送的消息个数""" zh: """当前积压在内存里,等待批量发送的消息个数"""
@ -161,9 +161,9 @@ emqx_bridge_schema {
} }
} }
metric_queued { metric_queuing {
desc { desc {
en: """Count of messages that are currently queued.""" en: """Count of messages that are currently queuing."""
zh: """当前被缓存到磁盘队列的消息个数。""" zh: """当前被缓存到磁盘队列的消息个数。"""
} }
label: { label: {
@ -181,16 +181,6 @@ emqx_bridge_schema {
zh: "已发送" zh: "已发送"
} }
} }
metric_sent_exception {
desc {
en: """Count of messages that were sent but exceptions occur."""
zh: """发送出现异常的消息个数。"""
}
label: {
en: "Sent Exception"
zh: "发送异常"
}
}
metric_sent_failed { metric_sent_failed {
desc { desc {

View File

@ -1,6 +1,6 @@
-define(EMPTY_METRICS, -define(EMPTY_METRICS,
?METRICS( ?METRICS(
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
) )
). ).
@ -15,7 +15,6 @@
Matched, Matched,
Queued, Queued,
Sent, Sent,
SentExcpt,
SentFailed, SentFailed,
SentInflight, SentInflight,
SentSucc, SentSucc,
@ -25,7 +24,7 @@
Rcvd Rcvd
), ),
#{ #{
'batched' => Batched, 'batching' => Batched,
'dropped' => Dropped, 'dropped' => Dropped,
'dropped.other' => DroppedOther, 'dropped.other' => DroppedOther,
'dropped.queue_full' => DroppedQueueFull, 'dropped.queue_full' => DroppedQueueFull,
@ -33,9 +32,8 @@
'dropped.resource_not_found' => DroppedResourceNotFound, 'dropped.resource_not_found' => DroppedResourceNotFound,
'dropped.resource_stopped' => DroppedResourceStopped, 'dropped.resource_stopped' => DroppedResourceStopped,
'matched' => Matched, 'matched' => Matched,
'queued' => Queued, 'queuing' => Queued,
'sent' => Sent, 'sent' => Sent,
'sent.exception' => SentExcpt,
'sent.failed' => SentFailed, 'sent.failed' => SentFailed,
'sent.inflight' => SentInflight, 'sent.inflight' => SentInflight,
'sent.success' => SentSucc, 'sent.success' => SentSucc,
@ -57,7 +55,6 @@
Matched, Matched,
Queued, Queued,
Sent, Sent,
SentExcpt,
SentFailed, SentFailed,
SentInflight, SentInflight,
SentSucc, SentSucc,
@ -67,7 +64,7 @@
Rcvd Rcvd
), ),
#{ #{
'batched' := Batched, 'batching' := Batched,
'dropped' := Dropped, 'dropped' := Dropped,
'dropped.other' := DroppedOther, 'dropped.other' := DroppedOther,
'dropped.queue_full' := DroppedQueueFull, 'dropped.queue_full' := DroppedQueueFull,
@ -75,9 +72,8 @@
'dropped.resource_not_found' := DroppedResourceNotFound, 'dropped.resource_not_found' := DroppedResourceNotFound,
'dropped.resource_stopped' := DroppedResourceStopped, 'dropped.resource_stopped' := DroppedResourceStopped,
'matched' := Matched, 'matched' := Matched,
'queued' := Queued, 'queuing' := Queued,
'sent' := Sent, 'sent' := Sent,
'sent.exception' := SentExcpt,
'sent.failed' := SentFailed, 'sent.failed' := SentFailed,
'sent.inflight' := SentInflight, 'sent.inflight' := SentInflight,
'sent.success' := SentSucc, 'sent.success' := SentSucc,

View File

@ -206,7 +206,7 @@ info_example_basic(webhook) ->
worker_pool_size => 1, worker_pool_size => 1,
health_check_interval => 15000, health_check_interval => 15000,
auto_restart_interval => 15000, auto_restart_interval => 15000,
query_mode => sync, query_mode => async,
async_inflight_window => 100, async_inflight_window => 100,
enable_queue => true, enable_queue => true,
max_queue_bytes => 1024 * 1024 * 1024 max_queue_bytes => 1024 * 1024 * 1024
@ -624,11 +624,11 @@ aggregate_metrics(AllMetrics) ->
fun( fun(
#{ #{
metrics := ?metrics( metrics := ?metrics(
M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17, M18 M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17
) )
}, },
?metrics( ?metrics(
N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17, N18 N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17
) )
) -> ) ->
?METRICS( ?METRICS(
@ -648,8 +648,7 @@ aggregate_metrics(AllMetrics) ->
M14 + N14, M14 + N14,
M15 + N15, M15 + N15,
M16 + N16, M16 + N16,
M17 + N17, M17 + N17
M18 + N18
) )
end, end,
InitMetrics, InitMetrics,
@ -679,7 +678,7 @@ format_resp(
format_metrics(#{ format_metrics(#{
counters := #{ counters := #{
'batched' := Batched, 'batching' := Batched,
'dropped' := Dropped, 'dropped' := Dropped,
'dropped.other' := DroppedOther, 'dropped.other' := DroppedOther,
'dropped.queue_full' := DroppedQueueFull, 'dropped.queue_full' := DroppedQueueFull,
@ -687,9 +686,8 @@ format_metrics(#{
'dropped.resource_not_found' := DroppedResourceNotFound, 'dropped.resource_not_found' := DroppedResourceNotFound,
'dropped.resource_stopped' := DroppedResourceStopped, 'dropped.resource_stopped' := DroppedResourceStopped,
'matched' := Matched, 'matched' := Matched,
'queued' := Queued, 'queuing' := Queued,
'sent' := Sent, 'sent' := Sent,
'sent.exception' := SentExcpt,
'sent.failed' := SentFailed, 'sent.failed' := SentFailed,
'sent.inflight' := SentInflight, 'sent.inflight' := SentInflight,
'sent.success' := SentSucc, 'sent.success' := SentSucc,
@ -710,7 +708,6 @@ format_metrics(#{
Matched, Matched,
Queued, Queued,
Sent, Sent,
SentExcpt,
SentFailed, SentFailed,
SentInflight, SentInflight,
SentSucc, SentSucc,

View File

@ -102,7 +102,7 @@ fields(bridges) ->
] ++ ee_fields_bridges(); ] ++ ee_fields_bridges();
fields("metrics") -> fields("metrics") ->
[ [
{"batched", mk(integer(), #{desc => ?DESC("metric_batched")})}, {"batching", mk(integer(), #{desc => ?DESC("metric_batching")})},
{"dropped", mk(integer(), #{desc => ?DESC("metric_dropped")})}, {"dropped", mk(integer(), #{desc => ?DESC("metric_dropped")})},
{"dropped.other", mk(integer(), #{desc => ?DESC("metric_dropped_other")})}, {"dropped.other", mk(integer(), #{desc => ?DESC("metric_dropped_other")})},
{"dropped.queue_full", mk(integer(), #{desc => ?DESC("metric_dropped_queue_full")})}, {"dropped.queue_full", mk(integer(), #{desc => ?DESC("metric_dropped_queue_full")})},
@ -113,9 +113,8 @@ fields("metrics") ->
{"dropped.resource_stopped", {"dropped.resource_stopped",
mk(integer(), #{desc => ?DESC("metric_dropped_resource_stopped")})}, mk(integer(), #{desc => ?DESC("metric_dropped_resource_stopped")})},
{"matched", mk(integer(), #{desc => ?DESC("metric_matched")})}, {"matched", mk(integer(), #{desc => ?DESC("metric_matched")})},
{"queued", mk(integer(), #{desc => ?DESC("metric_queued")})}, {"queuing", mk(integer(), #{desc => ?DESC("metric_queuing")})},
{"sent", mk(integer(), #{desc => ?DESC("metric_sent")})}, {"sent", mk(integer(), #{desc => ?DESC("metric_sent")})},
{"sent.exception", mk(integer(), #{desc => ?DESC("metric_sent_exception")})},
{"sent.failed", mk(integer(), #{desc => ?DESC("metric_sent_failed")})}, {"sent.failed", mk(integer(), #{desc => ?DESC("metric_sent_failed")})},
{"sent.inflight", mk(integer(), #{desc => ?DESC("metric_sent_inflight")})}, {"sent.inflight", mk(integer(), #{desc => ?DESC("metric_sent_inflight")})},
{"sent.success", mk(integer(), #{desc => ?DESC("metric_sent_success")})}, {"sent.success", mk(integer(), #{desc => ?DESC("metric_sent_success")})},

View File

@ -31,7 +31,8 @@
on_stop/2, on_stop/2,
on_query/3, on_query/3,
on_query_async/4, on_query_async/4,
on_get_status/2 on_get_status/2,
reply_delegator/2
]). ]).
-type url() :: emqx_http_lib:uri_map(). -type url() :: emqx_http_lib:uri_map().
@ -46,7 +47,7 @@
namespace/0 namespace/0
]). ]).
-export([check_ssl_opts/2]). -export([check_ssl_opts/2, validate_method/1]).
-type connect_timeout() :: emqx_schema:duration() | infinity. -type connect_timeout() :: emqx_schema:duration() | infinity.
-type pool_type() :: random | hash. -type pool_type() :: random | hash.
@ -137,8 +138,10 @@ fields(config) ->
fields("request") -> fields("request") ->
[ [
{method, {method,
hoconsc:mk(hoconsc:enum([post, put, get, delete]), #{ hoconsc:mk(binary(), #{
required => false, desc => ?DESC("method") required => false,
desc => ?DESC("method"),
validator => fun ?MODULE:validate_method/1
})}, })},
{path, hoconsc:mk(binary(), #{required => false, desc => ?DESC("path")})}, {path, hoconsc:mk(binary(), #{required => false, desc => ?DESC("path")})},
{body, hoconsc:mk(binary(), #{required => false, desc => ?DESC("body")})}, {body, hoconsc:mk(binary(), #{required => false, desc => ?DESC("body")})},
@ -171,6 +174,17 @@ desc(_) ->
validations() -> validations() ->
[{check_ssl_opts, fun check_ssl_opts/1}]. [{check_ssl_opts, fun check_ssl_opts/1}].
validate_method(M) when M =:= <<"post">>; M =:= <<"put">>; M =:= <<"get">>; M =:= <<"delete">> ->
ok;
validate_method(M) ->
case string:find(M, "${") of
nomatch ->
{error,
<<"Invalid method, should be one of 'post', 'put', 'get', 'delete' or variables in ${field} format.">>};
_ ->
ok
end.
sc(Type, Meta) -> hoconsc:mk(Type, Meta). sc(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Field) -> hoconsc:ref(?MODULE, Field). ref(Field) -> hoconsc:ref(?MODULE, Field).
@ -286,13 +300,13 @@ on_query(
Retry Retry
) )
of of
{error, econnrefused} -> {error, Reason} when Reason =:= econnrefused; Reason =:= timeout ->
?SLOG(warning, #{ ?SLOG(warning, #{
msg => "http_connector_do_request_failed", msg => "http_connector_do_request_failed",
reason => econnrefused, reason => Reason,
connector => InstId connector => InstId
}), }),
{recoverable_error, econnrefused}; {error, {recoverable_error, Reason}};
{error, Reason} = Result -> {error, Reason} = Result ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "http_connector_do_request_failed", msg => "http_connector_do_request_failed",
@ -365,7 +379,7 @@ on_query_async(
Method, Method,
NRequest, NRequest,
Timeout, Timeout,
ReplyFunAndArgs {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs]}
). ).
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) -> on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
@ -521,3 +535,12 @@ bin(Str) when is_list(Str) ->
list_to_binary(Str); list_to_binary(Str);
bin(Atom) when is_atom(Atom) -> bin(Atom) when is_atom(Atom) ->
atom_to_binary(Atom, utf8). atom_to_binary(Atom, utf8).
reply_delegator(ReplyFunAndArgs, Result) ->
case Result of
{error, Reason} when Reason =:= econnrefused; Reason =:= timeout ->
Result1 = {error, {recoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1);
_ ->
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
end.

View File

@ -420,7 +420,7 @@ on_sql_query(
error, error,
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason} LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}
), ),
{recoverable_error, Reason}; {error, {recoverable_error, Reason}};
{error, Reason} -> {error, Reason} ->
?SLOG( ?SLOG(
error, error,

View File

@ -76,8 +76,8 @@
-type query_result() :: -type query_result() ::
ok ok
| {ok, term()} | {ok, term()}
| {error, term()} | {error, {recoverable_error, term()}}
| {recoverable_error, term()}. | {error, term()}.
-define(WORKER_POOL_SIZE, 16). -define(WORKER_POOL_SIZE, 16).

View File

@ -110,7 +110,7 @@
list_group_instances/1 list_group_instances/1
]). ]).
-export([inc_received/1]). -export([inc_received/1, apply_reply_fun/2]).
-optional_callbacks([ -optional_callbacks([
on_query/3, on_query/3,
@ -441,6 +441,12 @@ check_and_do(ResourceType, RawConfig, Do) when is_function(Do) ->
Error -> Error Error -> Error
end. end.
apply_reply_fun({F, A}, Result) when is_function(F) ->
_ = erlang:apply(F, A ++ [Result]),
ok;
apply_reply_fun(From, Result) ->
gen_server:reply(From, Result).
%% ================================================================================= %% =================================================================================
inc_received(ResId) -> inc_received(ResId) ->

View File

@ -132,11 +132,10 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
'matched', 'matched',
'sent', 'sent',
'dropped', 'dropped',
'queued', 'queuing',
'batched', 'batching',
'sent.success', 'sent.success',
'sent.failed', 'sent.failed',
'sent.exception',
'sent.inflight', 'sent.inflight',
'dropped.queue_not_enabled', 'dropped.queue_not_enabled',
'dropped.queue_full', 'dropped.queue_full',

View File

@ -131,7 +131,7 @@ init({Id, Index, Opts}) ->
false -> false ->
undefined undefined
end, end,
emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', queue_count(Queue)), emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', queue_count(Queue)),
ok = inflight_new(Name), ok = inflight_new(Name),
St = #{ St = #{
id => Id, id => Id,
@ -154,12 +154,12 @@ running(enter, _, _St) ->
running(cast, resume, _St) -> running(cast, resume, _St) ->
keep_state_and_data; keep_state_and_data;
running(cast, block, St) -> running(cast, block, St) ->
{next_state, block, St}; {next_state, blocked, St};
running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{id := Id, queue := Q} = St) when running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{id := Id, queue := Q} = St) when
is_list(Batch) is_list(Batch)
-> ->
Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]), Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]),
{next_state, block, St#{queue := Q1}}; {next_state, blocked, St#{queue := Q1}};
running({call, From}, {query, Request, _Opts}, St) -> running({call, From}, {query, Request, _Opts}, St) ->
query_or_acc(From, Request, St); query_or_acc(From, Request, St);
running(cast, {query, Request, Opts}, St) -> running(cast, {query, Request, Opts}, St) ->
@ -273,12 +273,12 @@ retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = S
drop_head(Id, Q) -> drop_head(Id, Q) ->
{Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}), {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}),
ok = replayq:ack(Q1, AckRef), ok = replayq:ack(Q1, AckRef),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -1), emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -1),
Q1. Q1.
query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) -> query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) ->
Acc1 = [?QUERY(From, Request) | Acc], Acc1 = [?QUERY(From, Request) | Acc],
emqx_metrics_worker:inc(?RES_METRICS, Id, 'batched'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching'),
St = St0#{acc := Acc1, acc_left := Left - 1}, St = St0#{acc := Acc1, acc_left := Left - 1},
case Left =< 1 of case Left =< 1 of
true -> flush(St); true -> flush(St);
@ -303,16 +303,17 @@ flush(#{acc := []} = St) ->
flush( flush(
#{ #{
id := Id, id := Id,
acc := Batch, acc := Batch0,
batch_size := Size, batch_size := Size,
queue := Q0 queue := Q0
} = St } = St
) -> ) ->
Batch = lists:reverse(Batch0),
QueryOpts = #{ QueryOpts = #{
inflight_name => maps:get(name, St), inflight_name => maps:get(name, St),
inflight_window => maps:get(async_inflight_window, St) inflight_window => maps:get(async_inflight_window, St)
}, },
emqx_metrics_worker:inc(?RES_METRICS, Id, 'batched', -length(Batch)), emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching', -length(Batch)),
Result = call_query(configured, Id, Batch, QueryOpts), Result = call_query(configured, Id, Batch, QueryOpts),
St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}), St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}),
case batch_reply_caller(Id, Result, Batch) of case batch_reply_caller(Id, Result, Batch) of
@ -337,13 +338,13 @@ maybe_append_queue(Id, Q, Items) ->
{Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts), {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts),
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
Dropped = length(Items2), Dropped = length(Items2),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -Dropped), emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -Dropped),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'),
?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
Q1 Q1
end, end,
emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'),
replayq:append(Q2, Items). replayq:append(Q2, Items).
batch_reply_caller(Id, BatchResult, Batch) -> batch_reply_caller(Id, BatchResult, Batch) ->
@ -365,7 +366,7 @@ reply_caller(Id, ?REPLY(undefined, _, Result), BlockWorker) ->
reply_caller(Id, ?REPLY({ReplyFun, Args}, _, Result), BlockWorker) when is_function(ReplyFun) -> reply_caller(Id, ?REPLY({ReplyFun, Args}, _, Result), BlockWorker) when is_function(ReplyFun) ->
_ = _ =
case Result of case Result of
{async_return, _} -> ok; {async_return, _} -> no_reply_for_now;
_ -> apply(ReplyFun, Args ++ [Result]) _ -> apply(ReplyFun, Args ++ [Result])
end, end,
handle_query_result(Id, Result, BlockWorker); handle_query_result(Id, Result, BlockWorker);
@ -374,7 +375,7 @@ reply_caller(Id, ?REPLY(From, _, Result), BlockWorker) ->
handle_query_result(Id, Result, BlockWorker). handle_query_result(Id, Result, BlockWorker).
handle_query_result(Id, ?RESOURCE_ERROR_M(exception, _), BlockWorker) -> handle_query_result(Id, ?RESOURCE_ERROR_M(exception, _), BlockWorker) ->
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.exception'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'),
BlockWorker; BlockWorker;
handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _) when handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _) when
NotWorking == not_connected; NotWorking == blocked NotWorking == not_connected; NotWorking == blocked
@ -393,12 +394,15 @@ handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) ->
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'),
BlockWorker; BlockWorker;
handle_query_result(Id, {error, {recoverable_error, _}}, _BlockWorker) ->
%% the message will be queued in replayq or inflight window,
%% i.e. the counter 'queuing' will increase, so we pretend that we have not
%% sent this message.
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', -1),
true;
handle_query_result(Id, {error, _}, BlockWorker) -> handle_query_result(Id, {error, _}, BlockWorker) ->
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'),
BlockWorker; BlockWorker;
handle_query_result(Id, {recoverable_error, _}, _BlockWorker) ->
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', -1),
true;
handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) -> handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) ->
true; true;
handle_query_result(Id, {async_return, {error, _}}, BlockWorker) -> handle_query_result(Id, {async_return, {error, _}}, BlockWorker) ->
@ -433,7 +437,7 @@ call_query(QM0, Id, Query, QueryOpts) ->
try try
%% if the callback module (connector) wants to return an error that %% if the callback module (connector) wants to return an error that
%% makes the current resource goes into the `blocked` state, it should %% makes the current resource goes into the `blocked` state, it should
%% return `{recoverable_error, Reason}` %% return `{error, {recoverable_error, Reason}}`
EXPR EXPR
catch catch
ERR:REASON:STACKTRACE -> ERR:REASON:STACKTRACE ->
@ -449,7 +453,10 @@ call_query(QM0, Id, Query, QueryOpts) ->
apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) -> apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) ->
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}), ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}),
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'), ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'),
?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request); ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'),
Result = ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request),
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1),
Result;
apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
Name = maps:get(inflight_name, QueryOpts, undefined), Name = maps:get(inflight_name, QueryOpts, undefined),
@ -457,7 +464,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
?APPLY_RESOURCE( ?APPLY_RESOURCE(
case inflight_is_full(Name, WinSize) of case inflight_is_full(Name, WinSize) of
true -> true ->
?tp(inflight_full, #{id => Id, wind_size => WinSize}), ?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}),
{async_return, inflight_full}; {async_return, inflight_full};
false -> false ->
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'), ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'),
@ -474,8 +481,12 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) -> apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) ->
?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
Requests = [Request || ?QUERY(_From, Request) <- Batch], Requests = [Request || ?QUERY(_From, Request) <- Batch],
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)), BatchLen = length(Batch),
?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch); ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', BatchLen),
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', BatchLen),
Result = ?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch),
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -BatchLen),
Result;
apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
Name = maps:get(inflight_name, QueryOpts, undefined), Name = maps:get(inflight_name, QueryOpts, undefined),
@ -483,11 +494,12 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
?APPLY_RESOURCE( ?APPLY_RESOURCE(
case inflight_is_full(Name, WinSize) of case inflight_is_full(Name, WinSize) of
true -> true ->
?tp(inflight_full, #{id => Id, wind_size => WinSize}), ?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}),
{async_return, inflight_full}; {async_return, inflight_full};
false -> false ->
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)), BatchLen = length(Batch),
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'), ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', BatchLen),
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', BatchLen),
ReplyFun = fun ?MODULE:batch_reply_after_query/6, ReplyFun = fun ?MODULE:batch_reply_after_query/6,
Ref = make_message_ref(), Ref = make_message_ref(),
Args = {ReplyFun, [self(), Id, Name, Ref, Batch]}, Args = {ReplyFun, [self(), Id, Name, Ref, Batch]},
@ -500,20 +512,29 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
). ).
reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) -> reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) ->
%% NOTE: 'sent.inflight' is message count that sent but no ACK received,
%% NOT the message number ququed in the inflight window.
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1),
case reply_caller(Id, ?REPLY(From, Request, Result)) of case reply_caller(Id, ?REPLY(From, Request, Result)) of
true -> true ->
%% we marked these messages are 'queuing' although they are in inflight window
emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'),
?MODULE:block(Pid); ?MODULE:block(Pid);
false -> false ->
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1),
inflight_drop(Name, Ref) inflight_drop(Name, Ref)
end. end.
batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) -> batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) ->
%% NOTE: 'sent.inflight' is message count that sent but no ACK received,
%% NOT the message number ququed in the inflight window.
BatchLen = length(Batch),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -BatchLen),
case batch_reply_caller(Id, Result, Batch) of case batch_reply_caller(Id, Result, Batch) of
true -> true ->
%% we marked these messages are 'queuing' although they are in inflight window
emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', BatchLen),
?MODULE:block(Pid); ?MODULE:block(Pid);
false -> false ->
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -length(Batch)),
inflight_drop(Name, Ref) inflight_drop(Name, Ref)
end. end.
%%============================================================================== %%==============================================================================

View File

@ -76,7 +76,7 @@ auto_restart_interval(_) -> undefined.
query_mode(type) -> enum([sync, async]); query_mode(type) -> enum([sync, async]);
query_mode(desc) -> ?DESC("query_mode"); query_mode(desc) -> ?DESC("query_mode");
query_mode(default) -> sync; query_mode(default) -> async;
query_mode(required) -> false; query_mode(required) -> false;
query_mode(_) -> undefined. query_mode(_) -> undefined.

View File

@ -96,7 +96,7 @@ on_query(_InstId, {inc_counter, N}, #{pid := Pid}) ->
Pid ! {From, {inc, N}}, Pid ! {From, {inc, N}},
receive receive
{ReqRef, ok} -> ok; {ReqRef, ok} -> ok;
{ReqRef, incorrect_status} -> {recoverable_error, incorrect_status} {ReqRef, incorrect_status} -> {error, {recoverable_error, incorrect_status}}
after 1000 -> after 1000 ->
{error, timeout} {error, timeout}
end; end;

View File

@ -510,7 +510,7 @@ nested_put(Alias, Val, Columns0) ->
-define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found). -define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found).
inc_action_metrics(ok, RuleId) -> inc_action_metrics(ok, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success'); emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success');
inc_action_metrics({recoverable_error, _}, RuleId) -> inc_action_metrics({error, {recoverable_error, _}}, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) -> inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');

View File

@ -61,7 +61,7 @@ values(post) ->
enable_batch => false, enable_batch => false,
batch_size => ?DEFAULT_BATCH_SIZE, batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME, batch_time => ?DEFAULT_BATCH_TIME,
query_mode => sync, query_mode => async,
enable_queue => false, enable_queue => false,
max_queue_bytes => ?DEFAULT_QUEUE_SIZE max_queue_bytes => ?DEFAULT_QUEUE_SIZE
} }