diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 36f8cf0f5..2f15bbcd7 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -57,22 +57,95 @@ end ). --define(METRICS(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX), #{ - matched => MATCH, - success => SUCC, - failed => FAILED, - rate => RATE, - rate_last5m => RATE_5, - rate_max => RATE_MAX -}). --define(metrics(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX), #{ - matched := MATCH, - success := SUCC, - failed := FAILED, - rate := RATE, - rate_last5m := RATE_5, - rate_max := RATE_MAX -}). +-define(EMPTY_METRICS, + ?METRICS( + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 + ) +). + +-define(METRICS( + Batched, + Dropped, + DroppedOther, + DroppedQueueFull, + DroppedQueueNotEnabled, + DroppedResourceNotFound, + DroppedResourceStopped, + Matched, + Queued, + Retried, + Sent, + SentExcpt, + SentFailed, + SentInflight, + SentSucc, + RATE, + RATE_5, + RATE_MAX +), + #{ + 'batched' => Batched, + 'dropped' => Dropped, + 'dropped.other' => DroppedOther, + 'dropped.queue_full' => DroppedQueueFull, + 'dropped.queue_not_enabled' => DroppedQueueNotEnabled, + 'dropped.resource_not_found' => DroppedResourceNotFound, + 'dropped.resource_stopped' => DroppedResourceStopped, + 'matched' => Matched, + 'queued' => Queued, + 'retried' => Retried, + 'sent' => Sent, + 'sent.exception' => SentExcpt, + 'sent.failed' => SentFailed, + 'sent.inflight' => SentInflight, + 'sent.success' => SentSucc, + rate => RATE, + rate_last5m => RATE_5, + rate_max => RATE_MAX + } +). + +-define(metrics( + Batched, + Dropped, + DroppedOther, + DroppedQueueFull, + DroppedQueueNotEnabled, + DroppedResourceNotFound, + DroppedResourceStopped, + Matched, + Queued, + Retried, + Sent, + SentExcpt, + SentFailed, + SentInflight, + SentSucc, + RATE, + RATE_5, + RATE_MAX +), + #{ + 'batched' := Batched, + 'dropped' := Dropped, + 'dropped.other' := DroppedOther, + 'dropped.queue_full' := DroppedQueueFull, + 'dropped.queue_not_enabled' := DroppedQueueNotEnabled, + 'dropped.resource_not_found' := DroppedResourceNotFound, + 'dropped.resource_stopped' := DroppedResourceStopped, + 'matched' := Matched, + 'queued' := Queued, + 'retried' := Retried, + 'sent' := Sent, + 'sent.exception' := SentExcpt, + 'sent.failed' := SentFailed, + 'sent.inflight' := SentInflight, + 'sent.success' := SentSucc, + rate := RATE, + rate_last5m := RATE_5, + rate_max := RATE_MAX + } +). namespace() -> "bridge". @@ -193,11 +266,11 @@ method_example(_Type, put) -> maybe_with_metrics_example(TypeNameExam, get) -> TypeNameExam#{ - metrics => ?METRICS(0, 0, 0, 0, 0, 0), + metrics => ?EMPTY_METRICS, node_metrics => [ #{ node => node(), - metrics => ?METRICS(0, 0, 0, 0, 0, 0) + metrics => ?EMPTY_METRICS } ] }; @@ -217,7 +290,16 @@ info_example_basic(webhook) -> ssl => #{enable => false}, local_topic => <<"emqx_webhook/#">>, method => post, - body => <<"${payload}">> + body => <<"${payload}">>, + resource_opts => #{ + worker_pool_size => 1, + health_check_interval => 15000, + auto_restart_interval => 15000, + query_mode => sync, + async_inflight_window => 100, + enable_queue => true, + max_queue_bytes => 1024 * 1024 * 1024 + } }; info_example_basic(mqtt) -> (mqtt_main_example())#{ @@ -619,19 +701,37 @@ collect_metrics(Bridges) -> [maps:with([node, metrics], B) || B <- Bridges]. aggregate_metrics(AllMetrics) -> - InitMetrics = ?METRICS(0, 0, 0, 0, 0, 0), + InitMetrics = ?EMPTY_METRICS, lists:foldl( fun( - #{metrics := ?metrics(Match1, Succ1, Failed1, Rate1, Rate5m1, RateMax1)}, - ?metrics(Match0, Succ0, Failed0, Rate0, Rate5m0, RateMax0) + #{ + metrics := ?metrics( + M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17, M18 + ) + }, + ?metrics( + N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17, N18 + ) ) -> ?METRICS( - Match1 + Match0, - Succ1 + Succ0, - Failed1 + Failed0, - Rate1 + Rate0, - Rate5m1 + Rate5m0, - RateMax1 + RateMax0 + M1 + N1, + M2 + N2, + M3 + N3, + M4 + N4, + M5 + N5, + M6 + N6, + M7 + N7, + M8 + N8, + M9 + N9, + M10 + N10, + M11 + N11, + M12 + N12, + M13 + N13, + M14 + N14, + M15 + N15, + M16 + N16, + M17 + N17, + M18 + N18 ) end, InitMetrics, @@ -660,12 +760,47 @@ format_resp( }. format_metrics(#{ - counters := #{failed := Failed, exception := Ex, matched := Match, success := Succ}, + counters := #{ + 'batched' := Batched, + 'dropped' := Dropped, + 'dropped.other' := DroppedOther, + 'dropped.queue_full' := DroppedQueueFull, + 'dropped.queue_not_enabled' := DroppedQueueNotEnabled, + 'dropped.resource_not_found' := DroppedResourceNotFound, + 'dropped.resource_stopped' := DroppedResourceStopped, + 'matched' := Matched, + 'queued' := Queued, + 'retried' := Retried, + 'sent' := Sent, + 'sent.exception' := SentExcpt, + 'sent.failed' := SentFailed, + 'sent.inflight' := SentInflight, + 'sent.success' := SentSucc + }, rate := #{ matched := #{current := Rate, last5m := Rate5m, max := RateMax} } }) -> - ?METRICS(Match, Succ, Failed + Ex, Rate, Rate5m, RateMax). + ?METRICS( + Batched, + Dropped, + DroppedOther, + DroppedQueueFull, + DroppedQueueNotEnabled, + DroppedResourceNotFound, + DroppedResourceStopped, + Matched, + Queued, + Retried, + Sent, + SentExcpt, + SentFailed, + SentInflight, + SentSucc, + Rate, + Rate5m, + RateMax + ). fill_defaults(Type, RawConf) -> PackedConf = pack_bridge_conf(Type, RawConf), diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index f63873fe3..2562bf272 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -286,6 +286,13 @@ on_query( Retry ) of + {error, econnrefused} -> + ?SLOG(warning, #{ + msg => "http_connector_do_request_failed", + reason => econnrefused, + connector => InstId + }), + {recoverable_error, econnrefused}; {error, Reason} = Result -> ?SLOG(error, #{ msg => "http_connector_do_request_failed",