From a64386ef8201c6359a50545a9d30a45582283124 Mon Sep 17 00:00:00 2001 From: Paulo Zulato Date: Mon, 14 Aug 2023 18:11:01 -0300 Subject: [PATCH] fix(kinesis): return error message on access denied Fixes https://emqx.atlassian.net/browse/EMQX-10764 --- .../src/emqx_bridge_kinesis.app.src | 2 +- .../emqx_bridge_kinesis_connector_client.erl | 9 ++- .../src/emqx_bridge_kinesis_impl_producer.erl | 7 ++- ...mqx_bridge_kinesis_impl_producer_SUITE.erl | 55 +++++++++++++++++-- changes/ee/fix-11444.en.md | 1 + 5 files changed, 67 insertions(+), 7 deletions(-) create mode 100644 changes/ee/fix-11444.en.md diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src index 36f6c8b0b..3eb923b5d 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_kinesis, [ {description, "EMQX Enterprise Amazon Kinesis Bridge"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl index bb1000e5f..d9dc0220f 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl @@ -111,7 +111,14 @@ init(#{ erlcloud_config:configure( to_str(AwsAccessKey), to_str(AwsSecretAccessKey), Host, Port, Scheme, New ), - {ok, State}. + % check the connection + case erlcloud_kinesis:list_streams() of + {ok, _} -> + {ok, State}; + {error, Reason} -> + ?tp(kinesis_init_failed, #{instance_id => InstanceId, reason => Reason}), + {stop, Reason} + end. handle_call(connection_status, _From, #{stream_name := StreamName} = State) -> Status = diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl index 7948581b5..1e07ae96e 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl @@ -114,7 +114,12 @@ on_get_status(_InstanceId, #{pool_name := Pool} = State) -> false -> disconnected end end; - {error, _} -> + {error, Reason} -> + ?SLOG(error, #{ + msg => "kinesis_producer_get_status_failed", + state => State, + reason => Reason + }), disconnected end. diff --git a/apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl b/apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl index 114f324a9..d0fe4a1b4 100644 --- a/apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl @@ -796,7 +796,9 @@ t_publish_connection_down(Config0) -> ok. t_wrong_server(Config) -> + TypeBin = ?BRIDGE_TYPE_BIN, Name = ?config(kinesis_name, Config), + KinesisConfig0 = ?config(kinesis_config, Config), ResourceId = ?config(resource_id, Config), Overrides = #{ @@ -806,12 +808,57 @@ t_wrong_server(Config) -> <<"health_check_interval">> => <<"60s">> } }, + % probe + KinesisConfig = emqx_utils_maps:deep_merge(KinesisConfig0, Overrides), + Params = KinesisConfig#{<<"type">> => TypeBin, <<"name">> => Name}, + ProbePath = emqx_mgmt_api_test_util:api_path(["bridges_probe"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(post, ProbePath, "", AuthHeader, Params) + ), + % create ?wait_async_action( create_bridge(Config, Overrides), - #{?snk_kind := emqx_bridge_kinesis_impl_producer_start_ok}, + #{?snk_kind := start_pool_failed}, 30_000 ), - ?assertEqual({error, timeout}, emqx_resource_manager:health_check(ResourceId)), - emqx_bridge_resource:stop(?BRIDGE_TYPE, Name), - emqx_bridge_resource:remove(?BRIDGE_TYPE, Name), + ?assertMatch( + {ok, _, #{error := {start_pool_failed, ResourceId, _}}}, + emqx_resource_manager:lookup_cached(ResourceId) + ), + ok. + +t_access_denied(Config) -> + TypeBin = ?BRIDGE_TYPE_BIN, + Name = ?config(kinesis_name, Config), + KinesisConfig = ?config(kinesis_config, Config), + ResourceId = ?config(resource_id, Config), + AccessError = {<<"AccessDeniedException">>, <<>>}, + Params = KinesisConfig#{<<"type">> => TypeBin, <<"name">> => Name}, + ProbePath = emqx_mgmt_api_test_util:api_path(["bridges_probe"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + emqx_common_test_helpers:with_mock( + erlcloud_kinesis, + list_streams, + fun() -> {error, AccessError} end, + fun() -> + % probe + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(post, ProbePath, "", AuthHeader, Params) + ), + % create + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := kinesis_init_failed}, + 30_000 + ), + ?assertMatch( + {ok, _, #{error := {start_pool_failed, ResourceId, AccessError}}}, + emqx_resource_manager:lookup_cached(ResourceId) + ), + ok + end + ), ok. diff --git a/changes/ee/fix-11444.en.md b/changes/ee/fix-11444.en.md new file mode 100644 index 000000000..c8e80946d --- /dev/null +++ b/changes/ee/fix-11444.en.md @@ -0,0 +1 @@ +Fixed error information when Kinesis bridge fails to connect to endpoint.