Merge pull request #12155 from thalesmg/fix-conn-inconsistent-status-r54-20231212

fix(connector_api): return status reason when status is inconsistent
This commit is contained in:
Thales Macedo Garitezi 2023-12-12 17:23:22 -03:00 committed by GitHub
commit 7f0cbcc323
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 75 additions and 47 deletions

View File

@ -589,14 +589,24 @@ pick_connectors_by_id(Type, Name, ConnectorsAllNodes) ->
format_connector_info([FirstConnector | _] = Connectors) ->
Res = maps:remove(node, FirstConnector),
NodeStatus = node_status(Connectors),
redact(Res#{
StatusReason = first_status_reason(Connectors),
Info0 = Res#{
status => aggregate_status(NodeStatus),
node_status => NodeStatus
}).
},
Info = emqx_utils_maps:put_if(Info0, status_reason, StatusReason, StatusReason =/= undefined),
redact(Info).
node_status(Connectors) ->
[maps:with([node, status, status_reason], B) || B <- Connectors].
first_status_reason(Connectors) ->
StatusReasons = [Reason || #{status_reason := Reason} <- Connectors, Reason =/= undefined],
case StatusReasons of
[Reason | _] -> Reason;
_ -> undefined
end.
aggregate_status(AllStatus) ->
Head = fun([A | _]) -> A end,
HeadVal = maps:get(status, Head(AllStatus), connecting),

View File

@ -23,6 +23,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/test_macros.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-define(CONNECTOR_NAME, (atom_to_binary(?FUNCTION_NAME))).
-define(RESOURCE(NAME, TYPE), #{
@ -103,48 +104,6 @@
}).
-define(KAFKA_BRIDGE(Name), ?KAFKA_BRIDGE(Name, ?CONNECTOR_NAME)).
%% -define(CONNECTOR_TYPE_MQTT, <<"mqtt">>).
%% -define(MQTT_CONNECTOR(SERVER, NAME), ?CONNECTOR(NAME, ?CONNECTOR_TYPE_MQTT)#{
%% <<"server">> => SERVER,
%% <<"username">> => <<"user1">>,
%% <<"password">> => <<"">>,
%% <<"proto_ver">> => <<"v5">>,
%% <<"egress">> => #{
%% <<"remote">> => #{
%% <<"topic">> => <<"emqx/${topic}">>,
%% <<"qos">> => <<"${qos}">>,
%% <<"retain">> => false
%% }
%% }
%% }).
%% -define(MQTT_CONNECTOR(SERVER), ?MQTT_CONNECTOR(SERVER, <<"mqtt_egress_test_connector">>)).
%% -define(CONNECTOR_TYPE_HTTP, <<"kafka_producer">>).
%% -define(HTTP_CONNECTOR(URL, NAME), ?CONNECTOR(NAME, ?CONNECTOR_TYPE_HTTP)#{
%% <<"url">> => URL,
%% <<"local_topic">> => <<"emqx_webhook/#">>,
%% <<"method">> => <<"post">>,
%% <<"body">> => <<"${payload}">>,
%% <<"headers">> => #{
%% % NOTE
%% % The Pascal-Case is important here.
%% % The reason is kinda ridiculous: `emqx_connector_resource:create_dry_run/2` converts
%% % connector config keys into atoms, and the atom 'Content-Type' exists in the ERTS
%% % when this happens (while the 'content-type' does not).
%% <<"Content-Type">> => <<"application/json">>
%% }
%% }).
%% -define(HTTP_CONNECTOR(URL), ?HTTP_CONNECTOR(URL, ?CONNECTOR_NAME)).
%% -define(URL(PORT, PATH),
%% list_to_binary(
%% io_lib:format(
%% "http://localhost:~s/~s",
%% [integer_to_list(PORT), PATH]
%% )
%% )
%% ).
-define(APPSPECS, [
emqx_conf,
emqx,
@ -178,11 +137,14 @@ groups() ->
t_fail_delete_with_action,
t_actions_field
],
ClusterOnlyTests = [
t_inconsistent_state
],
ClusterLaterJoinOnlyTCs = [
% t_cluster_later_join_metrics
],
[
{single, [], AllTCs -- ClusterLaterJoinOnlyTCs},
{single, [], (AllTCs -- ClusterLaterJoinOnlyTCs) -- ClusterOnlyTests},
{cluster_later_join, [], ClusterLaterJoinOnlyTCs},
{cluster, [], (AllTCs -- SingleOnlyTests) -- ClusterLaterJoinOnlyTCs}
].
@ -268,6 +230,8 @@ init_mocks(_TestCase) ->
fun
(<<"connector:", ?CONNECTOR_TYPE_STR, ":bad_", _/binary>>, _C) ->
{ok, bad_connector_state};
(_I, #{bootstrap_hosts := <<"nope:9092">>}) ->
{ok, worst_connector_state};
(_I, _C) ->
{ok, connector_state}
end
@ -277,8 +241,17 @@ init_mocks(_TestCase) ->
?CONNECTOR_IMPL,
on_get_status,
fun
(_, bad_connector_state) -> connecting;
(_, _) -> connected
(_, bad_connector_state) ->
connecting;
(_, worst_connector_state) ->
{?status_disconnected, worst_connector_state, [
#{
host => <<"nope:9092">>,
reason => unresolvable_hostname
}
]};
(_, _) ->
connected
end
),
meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}),
@ -891,6 +864,51 @@ t_raw_config_response_defaults(Config) ->
),
ok.
t_inconsistent_state(Config) ->
[_, Node2] = ?config(cluster_nodes, Config),
Params = ?KAFKA_CONNECTOR(?CONNECTOR_NAME),
?assertMatch(
{ok, 201, #{<<"enable">> := true, <<"resource_opts">> := #{}}},
request_json(
post,
uri(["connectors"]),
Params,
Config
)
),
BadParams = maps:without(
[<<"name">>, <<"type">>],
Params#{<<"bootstrap_hosts">> := <<"nope:9092">>}
),
{ok, _} = erpc:call(
Node2,
emqx,
update_config,
[[connectors, ?CONNECTOR_TYPE, ?CONNECTOR_NAME], BadParams, #{}]
),
ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, ?CONNECTOR_NAME),
?assertMatch(
{ok, 200, #{
<<"status">> := <<"inconsistent">>,
<<"node_status">> := [
#{<<"status">> := <<"connected">>},
#{
<<"status">> := <<"disconnected">>,
<<"status_reason">> := _
}
],
<<"status_reason">> := _
}},
request_json(
get,
uri(["connectors", ConnectorID]),
Config
)
),
ok.
%%% helpers
listen_on_random_port() ->
SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],