From 4a71aa58ceb68ca1e388acb957bcb7d092acbd16 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 12 Dec 2023 10:04:15 -0300 Subject: [PATCH] fix(connector_api): return status reason when status is inconsistent Fixes https://emqx.atlassian.net/browse/EMQX-11581 --- .../emqx_connector/src/emqx_connector_api.erl | 14 ++- .../test/emqx_connector_api_SUITE.erl | 108 ++++++++++-------- 2 files changed, 75 insertions(+), 47 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index af5721585..26387b844 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -589,14 +589,24 @@ pick_connectors_by_id(Type, Name, ConnectorsAllNodes) -> format_connector_info([FirstConnector | _] = Connectors) -> Res = maps:remove(node, FirstConnector), NodeStatus = node_status(Connectors), - redact(Res#{ + StatusReason = first_status_reason(Connectors), + Info0 = Res#{ status => aggregate_status(NodeStatus), node_status => NodeStatus - }). + }, + Info = emqx_utils_maps:put_if(Info0, status_reason, StatusReason, StatusReason =/= undefined), + redact(Info). node_status(Connectors) -> [maps:with([node, status, status_reason], B) || B <- Connectors]. +first_status_reason(Connectors) -> + StatusReasons = [Reason || #{status_reason := Reason} <- Connectors, Reason =/= undefined], + case StatusReasons of + [Reason | _] -> Reason; + _ -> undefined + end. + aggregate_status(AllStatus) -> Head = fun([A | _]) -> A end, HeadVal = maps:get(status, Head(AllStatus), connecting), diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 92fd1de6d..01f340662 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -23,6 +23,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/test_macros.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). -define(CONNECTOR_NAME, (atom_to_binary(?FUNCTION_NAME))). -define(RESOURCE(NAME, TYPE), #{ @@ -103,48 +104,6 @@ }). -define(KAFKA_BRIDGE(Name), ?KAFKA_BRIDGE(Name, ?CONNECTOR_NAME)). -%% -define(CONNECTOR_TYPE_MQTT, <<"mqtt">>). -%% -define(MQTT_CONNECTOR(SERVER, NAME), ?CONNECTOR(NAME, ?CONNECTOR_TYPE_MQTT)#{ -%% <<"server">> => SERVER, -%% <<"username">> => <<"user1">>, -%% <<"password">> => <<"">>, -%% <<"proto_ver">> => <<"v5">>, -%% <<"egress">> => #{ -%% <<"remote">> => #{ -%% <<"topic">> => <<"emqx/${topic}">>, -%% <<"qos">> => <<"${qos}">>, -%% <<"retain">> => false -%% } -%% } -%% }). -%% -define(MQTT_CONNECTOR(SERVER), ?MQTT_CONNECTOR(SERVER, <<"mqtt_egress_test_connector">>)). - -%% -define(CONNECTOR_TYPE_HTTP, <<"kafka_producer">>). -%% -define(HTTP_CONNECTOR(URL, NAME), ?CONNECTOR(NAME, ?CONNECTOR_TYPE_HTTP)#{ -%% <<"url">> => URL, -%% <<"local_topic">> => <<"emqx_webhook/#">>, -%% <<"method">> => <<"post">>, -%% <<"body">> => <<"${payload}">>, -%% <<"headers">> => #{ -%% % NOTE -%% % The Pascal-Case is important here. -%% % The reason is kinda ridiculous: `emqx_connector_resource:create_dry_run/2` converts -%% % connector config keys into atoms, and the atom 'Content-Type' exists in the ERTS -%% % when this happens (while the 'content-type' does not). -%% <<"Content-Type">> => <<"application/json">> -%% } -%% }). -%% -define(HTTP_CONNECTOR(URL), ?HTTP_CONNECTOR(URL, ?CONNECTOR_NAME)). - -%% -define(URL(PORT, PATH), -%% list_to_binary( -%% io_lib:format( -%% "http://localhost:~s/~s", -%% [integer_to_list(PORT), PATH] -%% ) -%% ) -%% ). - -define(APPSPECS, [ emqx_conf, emqx, @@ -178,11 +137,14 @@ groups() -> t_fail_delete_with_action, t_actions_field ], + ClusterOnlyTests = [ + t_inconsistent_state + ], ClusterLaterJoinOnlyTCs = [ % t_cluster_later_join_metrics ], [ - {single, [], AllTCs -- ClusterLaterJoinOnlyTCs}, + {single, [], (AllTCs -- ClusterLaterJoinOnlyTCs) -- ClusterOnlyTests}, {cluster_later_join, [], ClusterLaterJoinOnlyTCs}, {cluster, [], (AllTCs -- SingleOnlyTests) -- ClusterLaterJoinOnlyTCs} ]. @@ -268,6 +230,8 @@ init_mocks(_TestCase) -> fun (<<"connector:", ?CONNECTOR_TYPE_STR, ":bad_", _/binary>>, _C) -> {ok, bad_connector_state}; + (_I, #{bootstrap_hosts := <<"nope:9092">>}) -> + {ok, worst_connector_state}; (_I, _C) -> {ok, connector_state} end @@ -277,8 +241,17 @@ init_mocks(_TestCase) -> ?CONNECTOR_IMPL, on_get_status, fun - (_, bad_connector_state) -> connecting; - (_, _) -> connected + (_, bad_connector_state) -> + connecting; + (_, worst_connector_state) -> + {?status_disconnected, worst_connector_state, [ + #{ + host => <<"nope:9092">>, + reason => unresolvable_hostname + } + ]}; + (_, _) -> + connected end ), meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}), @@ -858,6 +831,51 @@ t_raw_config_response_defaults(Config) -> ), ok. +t_inconsistent_state(Config) -> + [_, Node2] = ?config(cluster_nodes, Config), + Params = ?KAFKA_CONNECTOR(?CONNECTOR_NAME), + ?assertMatch( + {ok, 201, #{<<"enable">> := true, <<"resource_opts">> := #{}}}, + request_json( + post, + uri(["connectors"]), + Params, + Config + ) + ), + BadParams = maps:without( + [<<"name">>, <<"type">>], + Params#{<<"bootstrap_hosts">> := <<"nope:9092">>} + ), + {ok, _} = erpc:call( + Node2, + emqx, + update_config, + [[connectors, ?CONNECTOR_TYPE, ?CONNECTOR_NAME], BadParams, #{}] + ), + + ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, ?CONNECTOR_NAME), + ?assertMatch( + {ok, 200, #{ + <<"status">> := <<"inconsistent">>, + <<"node_status">> := [ + #{<<"status">> := <<"connected">>}, + #{ + <<"status">> := <<"disconnected">>, + <<"status_reason">> := _ + } + ], + <<"status_reason">> := _ + }}, + request_json( + get, + uri(["connectors", ConnectorID]), + Config + ) + ), + + ok. + %%% helpers listen_on_random_port() -> SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],