fix(s3-bridge): anticipate repeated connector start requests

Starting disconnected connector manually implies that there's no
attempt to stop it first.
This commit is contained in:
Andrew Mayorov 2024-02-21 13:52:55 +01:00
parent 529211b9ac
commit 6b97983303
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
3 changed files with 56 additions and 6 deletions

View File

@ -283,8 +283,7 @@ create_kind_api(Config, Overrides) ->
PathRoot = api_path_root(Kind),
Path = emqx_mgmt_api_test_util:api_path([PathRoot]),
ct:pal("creating bridge (~s, http):\n ~p", [Kind, Params]),
Method = post,
Res = request(Method, Path, Params),
Res = request(post, Path, Params),
ct:pal("bridge create (~s, http) result:\n ~p", [Kind, Res]),
Res.
@ -295,15 +294,33 @@ create_connector_api(Config, Overrides) ->
ConnectorConfig0 = ?config(connector_config, Config),
ConnectorName = ?config(connector_name, Config),
ConnectorType = ?config(connector_type, Config),
Method = post,
Path = emqx_mgmt_api_test_util:api_path(["connectors"]),
ConnectorConfig = emqx_utils_maps:deep_merge(ConnectorConfig0, Overrides),
create_connector_api(ConnectorName, ConnectorType, ConnectorConfig).
create_connector_api(ConnectorName, ConnectorType, ConnectorConfig) ->
Path = emqx_mgmt_api_test_util:api_path(["connectors"]),
Params = ConnectorConfig#{<<"type">> => ConnectorType, <<"name">> => ConnectorName},
ct:pal("creating connector (http):\n ~p", [Params]),
Res = request(Method, Path, Params),
Res = request(post, Path, Params),
ct:pal("connector create (http) result:\n ~p", [Res]),
Res.
update_connector_api(ConnectorName, ConnectorType, ConnectorConfig) ->
ConnectorId = emqx_connector_resource:connector_id(ConnectorType, ConnectorName),
Path = emqx_mgmt_api_test_util:api_path(["connectors", ConnectorId]),
ct:pal("updating connector ~s (http):\n ~p", [ConnectorId, ConnectorConfig]),
Res = request(put, Path, ConnectorConfig),
ct:pal("connector update (http) result:\n ~p", [Res]),
Res.
start_connector_api(ConnectorName, ConnectorType) ->
ConnectorId = emqx_connector_resource:connector_id(ConnectorType, ConnectorName),
Path = emqx_mgmt_api_test_util:api_path(["connectors", ConnectorId, "start"]),
ct:pal("starting connector ~s (http)", [ConnectorId]),
Res = request(post, Path, #{}),
ct:pal("connector update (http) result:\n ~p", [Res]),
Res.
create_action_api(Config) ->
create_action_api(Config, _Overrides = #{}).

View File

@ -71,6 +71,7 @@ on_start(InstId, Config) ->
channels => #{}
},
HttpConfig = emqx_s3_profile_conf:http_config(Config),
_ = ehttpc_sup:stop_pool(PoolName),
case ehttpc_sup:start_pool(PoolName, HttpConfig) of
{ok, Pid} ->
?SLOG(info, #{msg => "s3_connector_start_http_pool_success", pool_name => PoolName}),

View File

@ -82,7 +82,7 @@ connector_config(Name, _Config) ->
parse_and_check_config(<<"connectors">>, ?CONNECTOR_TYPE, Name, #{
<<"enable">> => true,
<<"description">> => <<"S3 Connector">>,
<<"host">> => maps:get(<<"host">>, BaseConf),
<<"host">> => emqx_utils_conv:bin(maps:get(<<"host">>, BaseConf)),
<<"port">> => maps:get(<<"port">>, BaseConf),
<<"access_key_id">> => maps:get(<<"access_key_id">>, BaseConf),
<<"secret_access_key">> => maps:get(<<"secret_access_key">>, BaseConf),
@ -144,6 +144,38 @@ parse_and_check_config(Root, Type, Name, ConfigIn) ->
t_start_stop(Config) ->
emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped).
t_start_broken_update_restart(Config) ->
Name = ?config(connector_name, Config),
Type = ?config(connector_type, Config),
ConnectorConf = ?config(connector_config, Config),
ConnectorConfBroken = maps:merge(
ConnectorConf,
#{<<"secret_access_key">> => <<"imnotanadmin">>}
),
?assertMatch(
{ok, {{_HTTP, 201, _}, _, _}},
emqx_bridge_v2_testlib:create_connector_api(Name, Type, ConnectorConfBroken)
),
ConnectorId = emqx_connector_resource:resource_id(Type, Name),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ConnectorId))
),
?assertMatch(
{ok, {{_HTTP, 200, _}, _, _}},
emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConf)
),
?assertMatch(
{ok, {{_HTTP, 204, _}, _, _}},
emqx_bridge_v2_testlib:start_connector_api(Name, Type)
),
?retry(
1_000,
20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ConnectorId))
).
t_create_via_http(Config) ->
emqx_bridge_v2_testlib:t_create_via_http(Config).