From 53825b9abace7225d52458c9e0c04aaa4b934ff9 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 15 Mar 2023 14:50:40 +0100 Subject: [PATCH] fix(emqx_bridge): propagate connection error to resource status --- apps/emqx_bridge/i18n/emqx_bridge_schema.conf | 11 ++ apps/emqx_bridge/src/emqx_bridge_api.erl | 102 +++++++++-------- .../src/schema/emqx_bridge_schema.erl | 14 ++- .../test/emqx_bridge_api_SUITE.erl | 107 +++++++++++------- apps/emqx_resource/include/emqx_resource.hrl | 1 + .../src/emqx_resource_manager.erl | 1 + changes/ce/fix-10145.en.md | 1 + 7 files changed, 152 insertions(+), 85 deletions(-) create mode 100644 changes/ce/fix-10145.en.md diff --git a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf index 901f25455..de4ceb0d5 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf @@ -54,6 +54,17 @@ emqx_bridge_schema { } } + desc_status_reason { + desc { + en: "This is the reason given in case a bridge is failing to connect." + zh: "桥接连接失败的原因。" + } + label: { + en: "Failure reason" + zh: "失败原因" + } + } + desc_node_status { desc { en: """The status of the bridge for each node. diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 8ac3e476a..c0e69ce83 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -748,7 +748,7 @@ pick_bridges_by_id(Type, Name, BridgesAllNodes) -> format_bridge_info_with_metrics([FirstBridge | _] = Bridges) -> Res = maps:remove(node, FirstBridge), - NodeStatus = collect_status(Bridges), + NodeStatus = node_status(Bridges), NodeMetrics = collect_metrics(Bridges), redact(Res#{ status => aggregate_status(NodeStatus), @@ -765,8 +765,8 @@ format_bridge_metrics(Bridges) -> Res = format_bridge_info_with_metrics(Bridges), maps:with([metrics, node_metrics], Res). -collect_status(Bridges) -> - [maps:with([node, status], B) || B <- Bridges]. +node_status(Bridges) -> + [maps:with([node, status, status_reason], B) || B <- Bridges]. aggregate_status(AllStatus) -> Head = fun([A | _]) -> A end, @@ -837,52 +837,63 @@ format_resource( ) ). -format_resource_data(#{status := Status, metrics := Metrics}) -> - #{status => Status, metrics => format_metrics(Metrics)}; -format_resource_data(#{status := Status}) -> - #{status => Status}. +format_resource_data(ResData) -> + maps:fold(fun format_resource_data/3, #{}, maps:with([status, metrics, error], ResData)). -format_metrics(#{ - counters := #{ - 'dropped' := Dropped, - 'dropped.other' := DroppedOther, - 'dropped.expired' := DroppedExpired, - 'dropped.queue_full' := DroppedQueueFull, - 'dropped.resource_not_found' := DroppedResourceNotFound, - 'dropped.resource_stopped' := DroppedResourceStopped, - 'matched' := Matched, - 'retried' := Retried, - 'late_reply' := LateReply, - 'failed' := SentFailed, - 'success' := SentSucc, - 'received' := Rcvd +format_resource_data(error, undefined, Result) -> + Result; +format_resource_data(error, Error, Result) -> + Result#{status_reason => emqx_misc:readable_error_msg(Error)}; +format_resource_data( + metrics, + #{ + counters := #{ + 'dropped' := Dropped, + 'dropped.other' := DroppedOther, + 'dropped.expired' := DroppedExpired, + 'dropped.queue_full' := DroppedQueueFull, + 'dropped.resource_not_found' := DroppedResourceNotFound, + 'dropped.resource_stopped' := DroppedResourceStopped, + 'matched' := Matched, + 'retried' := Retried, + 'late_reply' := LateReply, + 'failed' := SentFailed, + 'success' := SentSucc, + 'received' := Rcvd + }, + gauges := Gauges, + rate := #{ + matched := #{current := Rate, last5m := Rate5m, max := RateMax} + } }, - gauges := Gauges, - rate := #{ - matched := #{current := Rate, last5m := Rate5m, max := RateMax} - } -}) -> + Result +) -> Queued = maps:get('queuing', Gauges, 0), SentInflight = maps:get('inflight', Gauges, 0), - ?METRICS( - Dropped, - DroppedOther, - DroppedExpired, - DroppedQueueFull, - DroppedResourceNotFound, - DroppedResourceStopped, - Matched, - Queued, - Retried, - LateReply, - SentFailed, - SentInflight, - SentSucc, - Rate, - Rate5m, - RateMax, - Rcvd - ). + Result#{ + metrics => + ?METRICS( + Dropped, + DroppedOther, + DroppedExpired, + DroppedQueueFull, + DroppedResourceNotFound, + DroppedResourceStopped, + Matched, + Queued, + Retried, + LateReply, + SentFailed, + SentInflight, + SentSucc, + Rate, + Rate5m, + RateMax, + Rcvd + ) + }; +format_resource_data(K, V, Result) -> + Result#{K => V}. fill_defaults(Type, RawConf) -> PackedConf = pack_bridge_conf(Type, RawConf), @@ -924,6 +935,7 @@ filter_out_request_body(Conf) -> <<"type">>, <<"name">>, <<"status">>, + <<"error">>, <<"node_status">>, <<"node_metrics">>, <<"metrics">>, diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index 74d2a5ca1..6c278a5ec 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -106,6 +106,12 @@ common_bridge_fields() -> status_fields() -> [ {"status", mk(status(), #{desc => ?DESC("desc_status")})}, + {"status_reason", + mk(binary(), #{ + required => false, + desc => ?DESC("desc_status_reason"), + example => <<"Connection refused">> + })}, {"node_status", mk( hoconsc:array(ref(?MODULE, "node_status")), @@ -190,7 +196,13 @@ fields("node_metrics") -> fields("node_status") -> [ node_name(), - {"status", mk(status(), #{})} + {"status", mk(status(), #{})}, + {"status_reason", + mk(binary(), #{ + required => false, + desc => ?DESC("desc_status_reason"), + example => <<"Connection refused">> + })} ]. desc(bridges) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 8b388a771..b919dba6b 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -23,7 +23,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -define(CONF_DEFAULT, <<"bridges: {}">>). --define(BRIDGE_TYPE, <<"webhook">>). +-define(BRIDGE_TYPE_HTTP, <<"webhook">>). -define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))). -define(URL(PORT, PATH), list_to_binary( @@ -48,7 +48,7 @@ }). -define(MQTT_BRIDGE(SERVER), ?MQTT_BRIDGE(SERVER, <<"mqtt_egress_test_bridge">>)). --define(HTTP_BRIDGE(URL, TYPE, NAME), ?BRIDGE(NAME, TYPE)#{ +-define(HTTP_BRIDGE(URL, NAME), ?BRIDGE(NAME, ?BRIDGE_TYPE_HTTP)#{ <<"url">> => URL, <<"local_topic">> => <<"emqx_webhook/#">>, <<"method">> => <<"post">>, @@ -57,6 +57,7 @@ <<"content-type">> => <<"application/json">> } }). +-define(HTTP_BRIDGE(URL), ?HTTP_BRIDGE(URL, ?BRIDGE_NAME)). all() -> emqx_common_test_helpers:all(?MODULE). @@ -206,12 +207,12 @@ t_http_crud_apis(Config) -> {ok, 201, Bridge} = request( post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) + ?HTTP_BRIDGE(URL1, Name) ), %ct:pal("---bridge: ~p", [Bridge]), #{ - <<"type">> := ?BRIDGE_TYPE, + <<"type">> := ?BRIDGE_TYPE_HTTP, <<"name">> := Name, <<"enable">> := true, <<"status">> := _, @@ -219,7 +220,7 @@ t_http_crud_apis(Config) -> <<"url">> := URL1 } = emqx_json:decode(Bridge, [return_maps]), - BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), %% send an message to emqx and the message should be forwarded to the HTTP server Body = <<"my msg">>, emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)), @@ -243,11 +244,11 @@ t_http_crud_apis(Config) -> {ok, 200, Bridge2} = request( put, uri(["bridges", BridgeID]), - ?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, Name) + ?HTTP_BRIDGE(URL2, Name) ), ?assertMatch( #{ - <<"type">> := ?BRIDGE_TYPE, + <<"type">> := ?BRIDGE_TYPE_HTTP, <<"name">> := Name, <<"enable">> := true, <<"status">> := _, @@ -262,7 +263,7 @@ t_http_crud_apis(Config) -> ?assertMatch( [ #{ - <<"type">> := ?BRIDGE_TYPE, + <<"type">> := ?BRIDGE_TYPE_HTTP, <<"name">> := Name, <<"enable">> := true, <<"status">> := _, @@ -279,7 +280,7 @@ t_http_crud_apis(Config) -> {ok, 200, Bridge3Str} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch( #{ - <<"type">> := ?BRIDGE_TYPE, + <<"type">> := ?BRIDGE_TYPE_HTTP, <<"name">> := Name, <<"enable">> := true, <<"status">> := _, @@ -311,7 +312,7 @@ t_http_crud_apis(Config) -> {ok, 404, ErrMsg2} = request( put, uri(["bridges", BridgeID]), - ?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, Name) + ?HTTP_BRIDGE(URL2, Name) ), ?assertMatch( #{ @@ -340,6 +341,34 @@ t_http_crud_apis(Config) -> }, emqx_json:decode(ErrMsg3, [return_maps]) ), + + %% Create non working bridge + BrokenURL = ?URL(Port + 1, "/foo"), + {ok, 201, BrokenBridge} = request( + post, + uri(["bridges"]), + ?HTTP_BRIDGE(BrokenURL, Name) + ), + #{ + <<"type">> := ?BRIDGE_TYPE_HTTP, + <<"name">> := Name, + <<"enable">> := true, + <<"status">> := <<"disconnected">>, + <<"status_reason">> := <<"Connection refused">>, + <<"node_status">> := [ + #{<<"status">> := <<"disconnected">>, <<"status_reason">> := <<"Connection refused">>} + | _ + ], + <<"url">> := BrokenURL + } = emqx_json:decode(BrokenBridge, [return_maps]), + {ok, 200, FixedBridgeResponse} = request(put, uri(["bridges", BridgeID]), ?HTTP_BRIDGE(URL1)), + #{ + <<"status">> := <<"connected">>, + <<"node_status">> := [FixedNodeStatus = #{<<"status">> := <<"connected">>} | _] + } = FixedBridge = emqx_json:decode(FixedBridgeResponse, [return_maps]), + ?assert(not maps:is_key(<<"status_reason">>, FixedBridge)), + ?assert(not maps:is_key(<<"status_reason">>, FixedNodeStatus)), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), ok. t_http_bridges_local_topic(Config) -> @@ -356,16 +385,16 @@ t_http_bridges_local_topic(Config) -> {ok, 201, _} = request( post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name1) + ?HTTP_BRIDGE(URL1, Name1) ), %% and we create another one without local_topic {ok, 201, _} = request( post, uri(["bridges"]), - maps:remove(<<"local_topic">>, ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name2)) + maps:remove(<<"local_topic">>, ?HTTP_BRIDGE(URL1, Name2)) ), - BridgeID1 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name1), - BridgeID2 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name2), + BridgeID1 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name1), + BridgeID2 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name2), %% Send an message to emqx and the message should be forwarded to the HTTP server. %% This is to verify we can have 2 bridges with and without local_topic fields %% at the same time. @@ -400,11 +429,11 @@ t_check_dependent_actions_on_delete(Config) -> %% POST /bridges/ will create a bridge URL1 = ?URL(Port, "path1"), Name = <<"t_http_crud_apis">>, - BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), {ok, 201, _} = request( post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) + ?HTTP_BRIDGE(URL1, Name) ), {ok, 201, Rule} = request( post, @@ -438,11 +467,11 @@ t_cascade_delete_actions(Config) -> %% POST /bridges/ will create a bridge URL1 = ?URL(Port, "path1"), Name = <<"t_http_crud_apis">>, - BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), {ok, 201, _} = request( post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) + ?HTTP_BRIDGE(URL1, Name) ), {ok, 201, Rule} = request( post, @@ -472,7 +501,7 @@ t_cascade_delete_actions(Config) -> {ok, 201, _} = request( post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) + ?HTTP_BRIDGE(URL1, Name) ), {ok, 201, _} = request( post, @@ -496,9 +525,9 @@ t_broken_bpapi_vsn(Config) -> {ok, 201, _Bridge} = request( post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) + ?HTTP_BRIDGE(URL1, Name) ), - BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), %% still works since we redirect to 'restart' {ok, 501, <<>>} = request(post, operation_path(cluster, start, BridgeID), <<"">>), {ok, 501, <<>>} = request(post, operation_path(node, start, BridgeID), <<"">>), @@ -511,9 +540,9 @@ t_old_bpapi_vsn(Config) -> {ok, 201, _Bridge} = request( post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) + ?HTTP_BRIDGE(URL1, Name) ), - BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), {ok, 204, <<>>} = request(post, operation_path(cluster, stop, BridgeID), <<"">>), {ok, 204, <<>>} = request(post, operation_path(node, stop, BridgeID), <<"">>), %% still works since we redirect to 'restart' @@ -551,18 +580,18 @@ do_start_stop_bridges(Type, Config) -> {ok, 201, Bridge} = request( post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) + ?HTTP_BRIDGE(URL1, Name) ), %ct:pal("the bridge ==== ~p", [Bridge]), #{ - <<"type">> := ?BRIDGE_TYPE, + <<"type">> := ?BRIDGE_TYPE_HTTP, <<"name">> := Name, <<"enable">> := true, <<"status">> := <<"connected">>, <<"node_status">> := [_ | _], <<"url">> := URL1 } = emqx_json:decode(Bridge, [return_maps]), - BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), %% stop it {ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []), @@ -633,18 +662,18 @@ t_enable_disable_bridges(Config) -> {ok, 201, Bridge} = request( post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) + ?HTTP_BRIDGE(URL1, Name) ), %ct:pal("the bridge ==== ~p", [Bridge]), #{ - <<"type">> := ?BRIDGE_TYPE, + <<"type">> := ?BRIDGE_TYPE_HTTP, <<"name">> := Name, <<"enable">> := true, <<"status">> := <<"connected">>, <<"node_status">> := [_ | _], <<"url">> := URL1 } = emqx_json:decode(Bridge, [return_maps]), - BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), %% disable it {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>), {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []), @@ -690,18 +719,18 @@ t_reset_bridges(Config) -> {ok, 201, Bridge} = request( post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) + ?HTTP_BRIDGE(URL1, Name) ), %ct:pal("the bridge ==== ~p", [Bridge]), #{ - <<"type">> := ?BRIDGE_TYPE, + <<"type">> := ?BRIDGE_TYPE_HTTP, <<"name">> := Name, <<"enable">> := true, <<"status">> := <<"connected">>, <<"node_status">> := [_ | _], <<"url">> := URL1 } = emqx_json:decode(Bridge, [return_maps]), - BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), {ok, 204, <<>>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []), %% delete the bridge @@ -748,20 +777,20 @@ t_bridges_probe(Config) -> {ok, 204, <<>>} = request( post, uri(["bridges_probe"]), - ?HTTP_BRIDGE(URL, ?BRIDGE_TYPE, ?BRIDGE_NAME) + ?HTTP_BRIDGE(URL) ), %% second time with same name is ok since no real bridge created {ok, 204, <<>>} = request( post, uri(["bridges_probe"]), - ?HTTP_BRIDGE(URL, ?BRIDGE_TYPE, ?BRIDGE_NAME) + ?HTTP_BRIDGE(URL) ), {ok, 400, NxDomain} = request( post, uri(["bridges_probe"]), - ?HTTP_BRIDGE(<<"http://203.0.113.3:1234/foo">>, ?BRIDGE_TYPE, ?BRIDGE_NAME) + ?HTTP_BRIDGE(<<"http://203.0.113.3:1234/foo">>) ), ?assertMatch( #{ @@ -882,12 +911,12 @@ t_metrics(Config) -> {ok, 201, Bridge} = request( post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) + ?HTTP_BRIDGE(URL1, Name) ), %ct:pal("---bridge: ~p", [Bridge]), #{ - <<"type">> := ?BRIDGE_TYPE, + <<"type">> := ?BRIDGE_TYPE_HTTP, <<"name">> := Name, <<"enable">> := true, <<"status">> := _, @@ -895,7 +924,7 @@ t_metrics(Config) -> <<"url">> := URL1 } = emqx_json:decode(Bridge, [return_maps]), - BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), %% check for empty bridge metrics {ok, 200, Bridge1Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []), @@ -963,7 +992,7 @@ t_inconsistent_webhook_request_timeouts(Config) -> Name = ?BRIDGE_NAME, BadBridgeParams = emqx_map_lib:deep_merge( - ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name), + ?HTTP_BRIDGE(URL1, Name), #{ <<"request_timeout">> => <<"1s">>, <<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>} diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 41be9e8a0..ae22e27e0 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -41,6 +41,7 @@ callback_mode := callback_mode(), query_mode := query_mode(), config := resource_config(), + error := term(), state := resource_state(), status := resource_status(), metrics => emqx_metrics_worker:metrics() diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 40f9fe1ab..1bc6a3b6a 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -666,6 +666,7 @@ maybe_reply(Actions, From, Reply) -> data_record_to_external_map(Data) -> #{ id => Data#data.id, + error => Data#data.error, mod => Data#data.mod, callback_mode => Data#data.callback_mode, query_mode => Data#data.query_mode, diff --git a/changes/ce/fix-10145.en.md b/changes/ce/fix-10145.en.md new file mode 100644 index 000000000..bddbd9085 --- /dev/null +++ b/changes/ce/fix-10145.en.md @@ -0,0 +1 @@ +Fix `bridges` API to report error conditions for a failing bridge as `status_reason`.