fix: update the counters for data bridges

This commit is contained in:
Shawn 2022-08-30 12:28:01 +08:00
parent e0a6a61d73
commit 65dfa63324
2 changed files with 172 additions and 30 deletions

View File

@ -57,22 +57,95 @@
end end
). ).
-define(METRICS(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX), #{ -define(EMPTY_METRICS,
matched => MATCH, ?METRICS(
success => SUCC, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
failed => FAILED, )
).
-define(METRICS(
Batched,
Dropped,
DroppedOther,
DroppedQueueFull,
DroppedQueueNotEnabled,
DroppedResourceNotFound,
DroppedResourceStopped,
Matched,
Queued,
Retried,
Sent,
SentExcpt,
SentFailed,
SentInflight,
SentSucc,
RATE,
RATE_5,
RATE_MAX
),
#{
'batched' => Batched,
'dropped' => Dropped,
'dropped.other' => DroppedOther,
'dropped.queue_full' => DroppedQueueFull,
'dropped.queue_not_enabled' => DroppedQueueNotEnabled,
'dropped.resource_not_found' => DroppedResourceNotFound,
'dropped.resource_stopped' => DroppedResourceStopped,
'matched' => Matched,
'queued' => Queued,
'retried' => Retried,
'sent' => Sent,
'sent.exception' => SentExcpt,
'sent.failed' => SentFailed,
'sent.inflight' => SentInflight,
'sent.success' => SentSucc,
rate => RATE, rate => RATE,
rate_last5m => RATE_5, rate_last5m => RATE_5,
rate_max => RATE_MAX rate_max => RATE_MAX
}). }
-define(metrics(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX), #{ ).
matched := MATCH,
success := SUCC, -define(metrics(
failed := FAILED, Batched,
Dropped,
DroppedOther,
DroppedQueueFull,
DroppedQueueNotEnabled,
DroppedResourceNotFound,
DroppedResourceStopped,
Matched,
Queued,
Retried,
Sent,
SentExcpt,
SentFailed,
SentInflight,
SentSucc,
RATE,
RATE_5,
RATE_MAX
),
#{
'batched' := Batched,
'dropped' := Dropped,
'dropped.other' := DroppedOther,
'dropped.queue_full' := DroppedQueueFull,
'dropped.queue_not_enabled' := DroppedQueueNotEnabled,
'dropped.resource_not_found' := DroppedResourceNotFound,
'dropped.resource_stopped' := DroppedResourceStopped,
'matched' := Matched,
'queued' := Queued,
'retried' := Retried,
'sent' := Sent,
'sent.exception' := SentExcpt,
'sent.failed' := SentFailed,
'sent.inflight' := SentInflight,
'sent.success' := SentSucc,
rate := RATE, rate := RATE,
rate_last5m := RATE_5, rate_last5m := RATE_5,
rate_max := RATE_MAX rate_max := RATE_MAX
}). }
).
namespace() -> "bridge". namespace() -> "bridge".
@ -193,11 +266,11 @@ method_example(_Type, put) ->
maybe_with_metrics_example(TypeNameExam, get) -> maybe_with_metrics_example(TypeNameExam, get) ->
TypeNameExam#{ TypeNameExam#{
metrics => ?METRICS(0, 0, 0, 0, 0, 0), metrics => ?EMPTY_METRICS,
node_metrics => [ node_metrics => [
#{ #{
node => node(), node => node(),
metrics => ?METRICS(0, 0, 0, 0, 0, 0) metrics => ?EMPTY_METRICS
} }
] ]
}; };
@ -217,7 +290,16 @@ info_example_basic(webhook) ->
ssl => #{enable => false}, ssl => #{enable => false},
local_topic => <<"emqx_webhook/#">>, local_topic => <<"emqx_webhook/#">>,
method => post, method => post,
body => <<"${payload}">> body => <<"${payload}">>,
resource_opts => #{
worker_pool_size => 1,
health_check_interval => 15000,
auto_restart_interval => 15000,
query_mode => sync,
async_inflight_window => 100,
enable_queue => true,
max_queue_bytes => 1024 * 1024 * 1024
}
}; };
info_example_basic(mqtt) -> info_example_basic(mqtt) ->
(mqtt_main_example())#{ (mqtt_main_example())#{
@ -619,19 +701,37 @@ collect_metrics(Bridges) ->
[maps:with([node, metrics], B) || B <- Bridges]. [maps:with([node, metrics], B) || B <- Bridges].
aggregate_metrics(AllMetrics) -> aggregate_metrics(AllMetrics) ->
InitMetrics = ?METRICS(0, 0, 0, 0, 0, 0), InitMetrics = ?EMPTY_METRICS,
lists:foldl( lists:foldl(
fun( fun(
#{metrics := ?metrics(Match1, Succ1, Failed1, Rate1, Rate5m1, RateMax1)}, #{
?metrics(Match0, Succ0, Failed0, Rate0, Rate5m0, RateMax0) metrics := ?metrics(
M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17, M18
)
},
?metrics(
N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17, N18
)
) -> ) ->
?METRICS( ?METRICS(
Match1 + Match0, M1 + N1,
Succ1 + Succ0, M2 + N2,
Failed1 + Failed0, M3 + N3,
Rate1 + Rate0, M4 + N4,
Rate5m1 + Rate5m0, M5 + N5,
RateMax1 + RateMax0 M6 + N6,
M7 + N7,
M8 + N8,
M9 + N9,
M10 + N10,
M11 + N11,
M12 + N12,
M13 + N13,
M14 + N14,
M15 + N15,
M16 + N16,
M17 + N17,
M18 + N18
) )
end, end,
InitMetrics, InitMetrics,
@ -660,12 +760,47 @@ format_resp(
}. }.
format_metrics(#{ format_metrics(#{
counters := #{failed := Failed, exception := Ex, matched := Match, success := Succ}, counters := #{
'batched' := Batched,
'dropped' := Dropped,
'dropped.other' := DroppedOther,
'dropped.queue_full' := DroppedQueueFull,
'dropped.queue_not_enabled' := DroppedQueueNotEnabled,
'dropped.resource_not_found' := DroppedResourceNotFound,
'dropped.resource_stopped' := DroppedResourceStopped,
'matched' := Matched,
'queued' := Queued,
'retried' := Retried,
'sent' := Sent,
'sent.exception' := SentExcpt,
'sent.failed' := SentFailed,
'sent.inflight' := SentInflight,
'sent.success' := SentSucc
},
rate := #{ rate := #{
matched := #{current := Rate, last5m := Rate5m, max := RateMax} matched := #{current := Rate, last5m := Rate5m, max := RateMax}
} }
}) -> }) ->
?METRICS(Match, Succ, Failed + Ex, Rate, Rate5m, RateMax). ?METRICS(
Batched,
Dropped,
DroppedOther,
DroppedQueueFull,
DroppedQueueNotEnabled,
DroppedResourceNotFound,
DroppedResourceStopped,
Matched,
Queued,
Retried,
Sent,
SentExcpt,
SentFailed,
SentInflight,
SentSucc,
Rate,
Rate5m,
RateMax
).
fill_defaults(Type, RawConf) -> fill_defaults(Type, RawConf) ->
PackedConf = pack_bridge_conf(Type, RawConf), PackedConf = pack_bridge_conf(Type, RawConf),

View File

@ -286,6 +286,13 @@ on_query(
Retry Retry
) )
of of
{error, econnrefused} ->
?SLOG(warning, #{
msg => "http_connector_do_request_failed",
reason => econnrefused,
connector => InstId
}),
{recoverable_error, econnrefused};
{error, Reason} = Result -> {error, Reason} = Result ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "http_connector_do_request_failed", msg => "http_connector_do_request_failed",