Merge pull request #12557 from keynslug/fix/s3-bridge-startup
fix(s3-bridge): anticipate repeated connector start requests
This commit is contained in:
commit
7feb444e31
|
@ -320,8 +320,7 @@ create_kind_api(Config, Overrides) ->
|
||||||
PathRoot = api_path_root(Kind),
|
PathRoot = api_path_root(Kind),
|
||||||
Path = emqx_mgmt_api_test_util:api_path([PathRoot]),
|
Path = emqx_mgmt_api_test_util:api_path([PathRoot]),
|
||||||
ct:pal("creating bridge (~s, http):\n ~p", [Kind, Params]),
|
ct:pal("creating bridge (~s, http):\n ~p", [Kind, Params]),
|
||||||
Method = post,
|
Res = request(post, Path, Params),
|
||||||
Res = request(Method, Path, Params),
|
|
||||||
ct:pal("bridge create (~s, http) result:\n ~p", [Kind, Res]),
|
ct:pal("bridge create (~s, http) result:\n ~p", [Kind, Res]),
|
||||||
Res.
|
Res.
|
||||||
|
|
||||||
|
@ -332,15 +331,33 @@ create_connector_api(Config, Overrides) ->
|
||||||
ConnectorConfig0 = ?config(connector_config, Config),
|
ConnectorConfig0 = ?config(connector_config, Config),
|
||||||
ConnectorName = ?config(connector_name, Config),
|
ConnectorName = ?config(connector_name, Config),
|
||||||
ConnectorType = ?config(connector_type, Config),
|
ConnectorType = ?config(connector_type, Config),
|
||||||
Method = post,
|
|
||||||
Path = emqx_mgmt_api_test_util:api_path(["connectors"]),
|
|
||||||
ConnectorConfig = emqx_utils_maps:deep_merge(ConnectorConfig0, Overrides),
|
ConnectorConfig = emqx_utils_maps:deep_merge(ConnectorConfig0, Overrides),
|
||||||
|
create_connector_api(ConnectorName, ConnectorType, ConnectorConfig).
|
||||||
|
|
||||||
|
create_connector_api(ConnectorName, ConnectorType, ConnectorConfig) ->
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["connectors"]),
|
||||||
Params = ConnectorConfig#{<<"type">> => ConnectorType, <<"name">> => ConnectorName},
|
Params = ConnectorConfig#{<<"type">> => ConnectorType, <<"name">> => ConnectorName},
|
||||||
ct:pal("creating connector (http):\n ~p", [Params]),
|
ct:pal("creating connector (http):\n ~p", [Params]),
|
||||||
Res = request(Method, Path, Params),
|
Res = request(post, Path, Params),
|
||||||
ct:pal("connector create (http) result:\n ~p", [Res]),
|
ct:pal("connector create (http) result:\n ~p", [Res]),
|
||||||
Res.
|
Res.
|
||||||
|
|
||||||
|
update_connector_api(ConnectorName, ConnectorType, ConnectorConfig) ->
|
||||||
|
ConnectorId = emqx_connector_resource:connector_id(ConnectorType, ConnectorName),
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["connectors", ConnectorId]),
|
||||||
|
ct:pal("updating connector ~s (http):\n ~p", [ConnectorId, ConnectorConfig]),
|
||||||
|
Res = request(put, Path, ConnectorConfig),
|
||||||
|
ct:pal("connector update (http) result:\n ~p", [Res]),
|
||||||
|
Res.
|
||||||
|
|
||||||
|
start_connector_api(ConnectorName, ConnectorType) ->
|
||||||
|
ConnectorId = emqx_connector_resource:connector_id(ConnectorType, ConnectorName),
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["connectors", ConnectorId, "start"]),
|
||||||
|
ct:pal("starting connector ~s (http)", [ConnectorId]),
|
||||||
|
Res = request(post, Path, #{}),
|
||||||
|
ct:pal("connector update (http) result:\n ~p", [Res]),
|
||||||
|
Res.
|
||||||
|
|
||||||
create_action_api(Config) ->
|
create_action_api(Config) ->
|
||||||
create_action_api(Config, _Overrides = #{}).
|
create_action_api(Config, _Overrides = #{}).
|
||||||
|
|
||||||
|
|
|
@ -71,6 +71,7 @@ on_start(InstId, Config) ->
|
||||||
channels => #{}
|
channels => #{}
|
||||||
},
|
},
|
||||||
HttpConfig = emqx_s3_profile_conf:http_config(Config),
|
HttpConfig = emqx_s3_profile_conf:http_config(Config),
|
||||||
|
_ = ehttpc_sup:stop_pool(PoolName),
|
||||||
case ehttpc_sup:start_pool(PoolName, HttpConfig) of
|
case ehttpc_sup:start_pool(PoolName, HttpConfig) of
|
||||||
{ok, Pid} ->
|
{ok, Pid} ->
|
||||||
?SLOG(info, #{msg => "s3_connector_start_http_pool_success", pool_name => PoolName}),
|
?SLOG(info, #{msg => "s3_connector_start_http_pool_success", pool_name => PoolName}),
|
||||||
|
|
|
@ -82,7 +82,7 @@ connector_config(Name, _Config) ->
|
||||||
parse_and_check_config(<<"connectors">>, ?CONNECTOR_TYPE, Name, #{
|
parse_and_check_config(<<"connectors">>, ?CONNECTOR_TYPE, Name, #{
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
<<"description">> => <<"S3 Connector">>,
|
<<"description">> => <<"S3 Connector">>,
|
||||||
<<"host">> => maps:get(<<"host">>, BaseConf),
|
<<"host">> => emqx_utils_conv:bin(maps:get(<<"host">>, BaseConf)),
|
||||||
<<"port">> => maps:get(<<"port">>, BaseConf),
|
<<"port">> => maps:get(<<"port">>, BaseConf),
|
||||||
<<"access_key_id">> => maps:get(<<"access_key_id">>, BaseConf),
|
<<"access_key_id">> => maps:get(<<"access_key_id">>, BaseConf),
|
||||||
<<"secret_access_key">> => maps:get(<<"secret_access_key">>, BaseConf),
|
<<"secret_access_key">> => maps:get(<<"secret_access_key">>, BaseConf),
|
||||||
|
@ -144,6 +144,38 @@ parse_and_check_config(Root, Type, Name, ConfigIn) ->
|
||||||
t_start_stop(Config) ->
|
t_start_stop(Config) ->
|
||||||
emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped).
|
emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped).
|
||||||
|
|
||||||
|
t_start_broken_update_restart(Config) ->
|
||||||
|
Name = ?config(connector_name, Config),
|
||||||
|
Type = ?config(connector_type, Config),
|
||||||
|
ConnectorConf = ?config(connector_config, Config),
|
||||||
|
ConnectorConfBroken = maps:merge(
|
||||||
|
ConnectorConf,
|
||||||
|
#{<<"secret_access_key">> => <<"imnotanadmin">>}
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{ok, {{_HTTP, 201, _}, _, _}},
|
||||||
|
emqx_bridge_v2_testlib:create_connector_api(Name, Type, ConnectorConfBroken)
|
||||||
|
),
|
||||||
|
ConnectorId = emqx_connector_resource:resource_id(Type, Name),
|
||||||
|
?retry(
|
||||||
|
_Sleep = 1_000,
|
||||||
|
_Attempts = 20,
|
||||||
|
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ConnectorId))
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{ok, {{_HTTP, 200, _}, _, _}},
|
||||||
|
emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConf)
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{ok, {{_HTTP, 204, _}, _, _}},
|
||||||
|
emqx_bridge_v2_testlib:start_connector_api(Name, Type)
|
||||||
|
),
|
||||||
|
?retry(
|
||||||
|
1_000,
|
||||||
|
20,
|
||||||
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ConnectorId))
|
||||||
|
).
|
||||||
|
|
||||||
t_create_via_http(Config) ->
|
t_create_via_http(Config) ->
|
||||||
emqx_bridge_v2_testlib:t_create_via_http(Config).
|
emqx_bridge_v2_testlib:t_create_via_http(Config).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue