fix(kinesis): return error message on access denied

Fixes https://emqx.atlassian.net/browse/EMQX-10764
This commit is contained in:
Paulo Zulato 2023-08-14 18:11:01 -03:00
parent a6e277c77a
commit a64386ef82
5 changed files with 67 additions and 7 deletions

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_kinesis, [
{description, "EMQX Enterprise Amazon Kinesis Bridge"},
{vsn, "0.1.0"},
{vsn, "0.1.1"},
{registered, []},
{applications, [
kernel,

View File

@ -111,7 +111,14 @@ init(#{
erlcloud_config:configure(
to_str(AwsAccessKey), to_str(AwsSecretAccessKey), Host, Port, Scheme, New
),
{ok, State}.
% check the connection
case erlcloud_kinesis:list_streams() of
{ok, _} ->
{ok, State};
{error, Reason} ->
?tp(kinesis_init_failed, #{instance_id => InstanceId, reason => Reason}),
{stop, Reason}
end.
handle_call(connection_status, _From, #{stream_name := StreamName} = State) ->
Status =

View File

@ -114,7 +114,12 @@ on_get_status(_InstanceId, #{pool_name := Pool} = State) ->
false -> disconnected
end
end;
{error, _} ->
{error, Reason} ->
?SLOG(error, #{
msg => "kinesis_producer_get_status_failed",
state => State,
reason => Reason
}),
disconnected
end.

View File

@ -796,7 +796,9 @@ t_publish_connection_down(Config0) ->
ok.
t_wrong_server(Config) ->
TypeBin = ?BRIDGE_TYPE_BIN,
Name = ?config(kinesis_name, Config),
KinesisConfig0 = ?config(kinesis_config, Config),
ResourceId = ?config(resource_id, Config),
Overrides =
#{
@ -806,12 +808,57 @@ t_wrong_server(Config) ->
<<"health_check_interval">> => <<"60s">>
}
},
% probe
KinesisConfig = emqx_utils_maps:deep_merge(KinesisConfig0, Overrides),
Params = KinesisConfig#{<<"type">> => TypeBin, <<"name">> => Name},
ProbePath = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(post, ProbePath, "", AuthHeader, Params)
),
% create
?wait_async_action(
create_bridge(Config, Overrides),
#{?snk_kind := emqx_bridge_kinesis_impl_producer_start_ok},
#{?snk_kind := start_pool_failed},
30_000
),
?assertEqual({error, timeout}, emqx_resource_manager:health_check(ResourceId)),
emqx_bridge_resource:stop(?BRIDGE_TYPE, Name),
emqx_bridge_resource:remove(?BRIDGE_TYPE, Name),
?assertMatch(
{ok, _, #{error := {start_pool_failed, ResourceId, _}}},
emqx_resource_manager:lookup_cached(ResourceId)
),
ok.
t_access_denied(Config) ->
TypeBin = ?BRIDGE_TYPE_BIN,
Name = ?config(kinesis_name, Config),
KinesisConfig = ?config(kinesis_config, Config),
ResourceId = ?config(resource_id, Config),
AccessError = {<<"AccessDeniedException">>, <<>>},
Params = KinesisConfig#{<<"type">> => TypeBin, <<"name">> => Name},
ProbePath = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
emqx_common_test_helpers:with_mock(
erlcloud_kinesis,
list_streams,
fun() -> {error, AccessError} end,
fun() ->
% probe
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(post, ProbePath, "", AuthHeader, Params)
),
% create
?wait_async_action(
create_bridge(Config),
#{?snk_kind := kinesis_init_failed},
30_000
),
?assertMatch(
{ok, _, #{error := {start_pool_failed, ResourceId, AccessError}}},
emqx_resource_manager:lookup_cached(ResourceId)
),
ok
end
),
ok.

View File

@ -0,0 +1 @@
Fixed error information when Kinesis bridge fails to connect to endpoint.