From 98e4ea6fde9bd9cbed41079a83de68f7df94e291 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 25 Jun 2024 13:49:25 +0200 Subject: [PATCH 1/6] feat(bridge-s3): make validation errors more readable And also turn them into schema-level validations, instead of bridge-level error conditions. --- .../src/emqx_bridge_s3_connector.erl | 19 ++----- .../src/emqx_bridge_s3_upload.erl | 49 ++++++++++++------- .../emqx_bridge_s3_aggreg_upload_SUITE.erl | 21 ++++++++ apps/emqx_utils/src/emqx_utils.erl | 4 ++ 4 files changed, 60 insertions(+), 33 deletions(-) diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index 00c03fd3a..c9e23a934 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -162,13 +162,8 @@ on_get_status(_InstId, State = #{client_config := Config}) -> -spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) -> {ok, state()} | {error, _Reason}. on_add_channel(_InstId, State = #{channels := Channels}, ChannelId, Config) -> - try - ChannelState = start_channel(State, Config), - {ok, State#{channels => Channels#{ChannelId => ChannelState}}} - catch - throw:Reason -> - {error, Reason} - end. + ChannelState = start_channel(State, Config), + {ok, State#{channels => Channels#{ChannelId => ChannelState}}}. -spec on_remove_channel(_InstanceId :: resource_id(), state(), channel_id()) -> {ok, state()}. @@ -217,7 +212,8 @@ start_channel(State, #{ max_records := MaxRecords }, container := Container, - bucket := Bucket + bucket := Bucket, + key := Key } }) -> AggregId = {Type, Name}, @@ -226,7 +222,7 @@ start_channel(State, #{ max_records => MaxRecords, work_dir => work_dir(Type, Name) }, - Template = ensure_ok(emqx_bridge_s3_upload:mk_key_template(Parameters)), + Template = emqx_bridge_s3_upload:mk_key_template(Key), DeliveryOpts = #{ bucket => Bucket, key => Template, @@ -253,11 +249,6 @@ start_channel(State, #{ on_stop => fun() -> ?AGGREG_SUP:delete_child(AggregId) end }. -ensure_ok({ok, V}) -> - V; -ensure_ok({error, Reason}) -> - throw(Reason). - upload_options(Parameters) -> #{acl => maps:get(acl, Parameters, undefined)}. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl index 2bf12f24b..bedefc7c5 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl @@ -29,7 +29,10 @@ ]). %% Internal exports --export([convert_actions/2]). +-export([ + convert_actions/2, + validate_key_template/1 +]). -define(DEFAULT_AGGREG_BATCH_SIZE, 100). -define(DEFAULT_AGGREG_BATCH_TIME, <<"10ms">>). @@ -137,7 +140,10 @@ fields(s3_aggregated_upload_parameters) -> )} ], emqx_resource_schema:override(emqx_s3_schema:fields(s3_upload), [ - {key, #{desc => ?DESC(s3_aggregated_upload_key)}} + {key, #{ + desc => ?DESC(s3_aggregated_upload_key), + validator => fun ?MODULE:validate_key_template/1 + }} ]), emqx_s3_schema:fields(s3_uploader) ]); @@ -246,23 +252,13 @@ convert_action(Conf = #{<<"parameters">> := Params, <<"resource_opts">> := Resou Conf#{<<"resource_opts">> := NResourceOpts} end. -%% Interpreting options - --spec mk_key_template(_Parameters :: map()) -> - {ok, emqx_template:str()} | {error, _Reason}. -mk_key_template(#{key := Key}) -> - Template = emqx_template:parse(Key), +validate_key_template(Conf) -> + Template = emqx_template:parse(Conf), case validate_bindings(emqx_template:placeholders(Template)) of - UsedBindings when is_list(UsedBindings) -> - SuffixTemplate = mk_suffix_template(UsedBindings), - case emqx_template:is_const(SuffixTemplate) of - true -> - {ok, Template}; - false -> - {ok, Template ++ SuffixTemplate} - end; - Error = {error, _} -> - Error + Bindings when is_list(Bindings) -> + ok; + {error, {disallowed_placeholders, Disallowed}} -> + {error, emqx_utils:format("Template placeholders are disallowed: ~p", [Disallowed])} end. validate_bindings(Bindings) -> @@ -276,7 +272,22 @@ validate_bindings(Bindings) -> [] -> Bindings; Disallowed -> - {error, {invalid_key_template, {disallowed_placeholders, Disallowed}}} + {error, {disallowed_placeholders, Disallowed}} + end. + +%% Interpreting options + +-spec mk_key_template(unicode:chardata()) -> + emqx_template:str(). +mk_key_template(Key) -> + Template = emqx_template:parse(Key), + UsedBindings = emqx_template:placeholders(Template), + SuffixTemplate = mk_suffix_template(UsedBindings), + case emqx_template:is_const(SuffixTemplate) of + true -> + Template; + false -> + Template ++ SuffixTemplate end. mk_suffix_template(UsedBindings) -> diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl index b7c17bbaa..345c2e9aa 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl @@ -177,6 +177,27 @@ t_create_invalid_config(Config) -> ) ). +t_create_invalid_config_key_template(Config) -> + ?assertMatch( + {error, + {_Status, _, #{ + <<"code">> := <<"BAD_REQUEST">>, + <<"message">> := #{ + <<"kind">> := <<"validation_error">>, + <<"reason">> := <<"Template placeholders are disallowed:", _/bytes>>, + <<"path">> := <<"root.parameters.key">> + } + }}}, + emqx_bridge_v2_testlib:create_bridge_api( + Config, + _Overrides = #{ + <<"parameters">> => #{ + <<"key">> => <<"${action}/${foo}:${bar.rfc3339}">> + } + } + ) + ). + t_update_invalid_config(Config) -> ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), ?assertMatch( diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index 536b427b3..e4f0d91d1 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -65,6 +65,7 @@ flattermap/2, tcp_keepalive_opts/4, format/1, + format/2, call_first_defined/1, ntoa/1, foldl_while/3, @@ -565,6 +566,9 @@ tcp_keepalive_opts(OS, _Idle, _Interval, _Probes) -> format(Term) -> iolist_to_binary(io_lib:format("~0p", [Term])). +format(Fmt, Args) -> + unicode:characters_to_binary(io_lib:format(Fmt, Args)). + -spec call_first_defined(list({module(), atom(), list()})) -> term() | no_return(). call_first_defined([{Module, Function, Args} | Rest]) -> try From f3ffbd47106b585c3b34b603c5f95fad44752928 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 25 Jun 2024 14:45:51 +0200 Subject: [PATCH 2/6] feat(bridge-s3): provide more meaningful error details in status --- .../src/emqx_bridge_s3_connector.erl | 68 +++++++++++++++---- .../test/emqx_bridge_s3_SUITE.erl | 7 ++ 2 files changed, 62 insertions(+), 13 deletions(-) diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index c9e23a934..204a84a65 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -146,16 +146,14 @@ on_stop(InstId, _State = #{pool_name := PoolName}) -> on_get_status(_InstId, State = #{client_config := Config}) -> case emqx_s3_client:aws_config(Config) of {error, Reason} -> - {?status_disconnected, State, Reason}; + {?status_disconnected, State, map_error_details(Reason)}; AWSConfig -> try erlcloud_s3:list_buckets(AWSConfig) of Props when is_list(Props) -> ?status_connected catch - error:{aws_error, {http_error, _Code, _, Reason}} -> - {?status_disconnected, State, Reason}; - error:{aws_error, {socket_error, Reason}} -> - {?status_disconnected, State, Reason} + error:Error -> + {?status_disconnected, State, map_error_details(Error)} end end. @@ -284,8 +282,8 @@ check_bucket_accessible(Bucket, #{client_config := Config}) -> catch error:{aws_error, {http_error, 404, _, _Reason}} -> throw({unhealthy_target, "Bucket does not exist"}); - error:{aws_error, {socket_error, Reason}} -> - throw({unhealthy_target, emqx_utils:format(Reason)}) + error:Error -> + throw({unhealthy_target, map_error_details(Error)}) end end. @@ -378,13 +376,31 @@ run_aggregated_upload(InstId, ChannelID, Records, #{aggreg_id := AggregId}) -> {error, {unrecoverable_error, Reason}} end. -map_error({socket_error, _} = Reason) -> - {recoverable_error, Reason}; -map_error(Reason = {aws_error, Status, _, _Body}) when Status >= 500 -> +map_error(Error) -> + {map_error_class(Error), map_error_details(Error)}. + +map_error_class({s3_error, _, _}) -> + unrecoverable_error; +map_error_class({aws_error, Error}) -> + map_error_class(Error); +map_error_class({socket_error, _}) -> + recoverable_error; +map_error_class({http_error, Status, _, _}) when Status >= 500 -> %% https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList - {recoverable_error, Reason}; -map_error(Reason) -> - {unrecoverable_error, Reason}. + recoverable_error; +map_error_class(_Error) -> + unrecoverable_error. + +map_error_details({s3_error, Code, Message}) -> + emqx_utils:format("S3 error: ~s ~s", [Code, Message]); +map_error_details({aws_error, Error}) -> + map_error_details(Error); +map_error_details({socket_error, Reason}) -> + emqx_utils:format("Socket error: ~s", [emqx_utils:readable_error_msg(Reason)]); +map_error_details({http_error, _, _, _} = Error) -> + emqx_utils:format("AWS error: ~s", [map_aws_error_details(Error)]); +map_error_details(Error) -> + Error. render_bucket(Template, Data) -> case emqx_template:render(Template, {emqx_jsonish, Data}) of @@ -407,6 +423,32 @@ render_content(Template, Data) -> iolist_to_string(IOList) -> unicode:characters_to_list(IOList). +%% + +-include_lib("xmerl/include/xmerl.hrl"). + +-spec map_aws_error_details(_AWSError) -> + unicode:chardata(). +map_aws_error_details({http_error, _Status, _, Body}) -> + try xmerl_scan:string(unicode:characters_to_list(Body), [{quiet, true}]) of + {Error = #xmlElement{name = 'Error'}, _} -> + map_aws_error_details(Error); + _ -> + Body + catch + exit:_ -> + Body + end; +map_aws_error_details(#xmlElement{content = Content}) -> + Code = extract_xml_text(lists:keyfind('Code', #xmlElement.name, Content)), + Message = extract_xml_text(lists:keyfind('Message', #xmlElement.name, Content)), + [Code, $:, $\s | Message]. + +extract_xml_text(#xmlElement{content = Content}) -> + [Fragment || #xmlText{value = Fragment} <- Content]; +extract_xml_text(false) -> + []. + %% `emqx_connector_aggreg_delivery` APIs -spec init_transfer_state(buffer_map(), map()) -> emqx_s3_upload:t(). diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl index f8eaa1b3a..4771f9d04 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl @@ -159,6 +159,13 @@ t_start_broken_update_restart(Config) -> _Attempts = 20, ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ConnectorId)) ), + ?assertMatch( + {ok, + {{_HTTP, 200, _}, _, #{ + <<"status_reason">> := <<"AWS error: SignatureDoesNotMatch:", _/bytes>> + }}}, + emqx_bridge_v2_testlib:get_connector_api(Type, Name) + ), ?assertMatch( {ok, {{_HTTP, 200, _}, _, _}}, emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConf) From fb9afd8313915fab78ce89a89889de98d888006e Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 25 Jun 2024 14:56:57 +0200 Subject: [PATCH 3/6] feat(bridge-s3): beautify posix write errors --- apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index 204a84a65..60e2c9b87 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -373,7 +373,7 @@ run_aggregated_upload(InstId, ChannelID, Records, #{aggreg_id := AggregId}) -> ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => AggregId}), ok; {error, Reason} -> - {error, {unrecoverable_error, Reason}} + {error, {unrecoverable_error, emqx_utils:explain_posix(Reason)}} end. map_error(Error) -> From 486a041adfeba1a96d38fe1c5ceefbd78aca9662 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 25 Jun 2024 15:15:06 +0200 Subject: [PATCH 4/6] feat(bridge-s3): also map credentials / aggreg upload errors --- .../src/emqx_bridge_s3_connector.erl | 9 ++++++--- .../emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl | 16 ++++++++++++++++ 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index 60e2c9b87..fdc6d255b 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -274,7 +274,7 @@ channel_status(#{mode := aggregated, aggreg_id := AggregId, bucket := Bucket}, S check_bucket_accessible(Bucket, #{client_config := Config}) -> case emqx_s3_client:aws_config(Config) of {error, Reason} -> - throw({unhealthy_target, Reason}); + throw({unhealthy_target, map_error_details(Reason)}); AWSConfig -> try erlcloud_s3:list_objects(Bucket, [{max_keys, 1}], AWSConfig) of Props when is_list(Props) -> @@ -293,8 +293,7 @@ check_aggreg_upload_errors(AggregId) -> %% TODO %% This approach means that, for example, 3 upload failures will cause %% the channel to be marked as unhealthy for 3 consecutive health checks. - ErrorMessage = emqx_utils:format(Error), - throw({unhealthy_target, ErrorMessage}); + throw({unhealthy_target, map_error_details(Error)}); [] -> ok end. @@ -399,6 +398,10 @@ map_error_details({socket_error, Reason}) -> emqx_utils:format("Socket error: ~s", [emqx_utils:readable_error_msg(Reason)]); map_error_details({http_error, _, _, _} = Error) -> emqx_utils:format("AWS error: ~s", [map_aws_error_details(Error)]); +map_error_details({failed_to_obtain_credentials, Error}) -> + emqx_utils:format("Unable to obtain AWS credentials: ~s", [map_error_details(Error)]); +map_error_details({upload_failed, Error}) -> + map_error_details(Error); map_error_details(Error) -> Error. diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl index 4771f9d04..fa3205456 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl @@ -134,6 +134,22 @@ action_config(Name, ConnectorId) -> t_start_stop(Config) -> emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped). +t_create_unavailable_credentials(Config) -> + ConnectorName = ?config(connector_name, Config), + ConnectorType = ?config(connector_type, Config), + ConnectorConfig = maps:without( + [<<"access_key_id">>, <<"secret_access_key">>], + ?config(connector_config, Config) + ), + ?assertMatch( + {ok, + {{_HTTP, 201, _}, _, #{ + <<"status_reason">> := + <<"Unable to obtain AWS credentials: Socket error:", _/bytes>> + }}}, + emqx_bridge_v2_testlib:create_connector_api(ConnectorName, ConnectorType, ConnectorConfig) + ). + t_ignore_batch_opts(Config) -> {ok, {_Status, _, Bridge}} = emqx_bridge_v2_testlib:create_bridge_api(Config), ?assertMatch( From 0eedab3d7639f9caf9d905a409b2589a3498f2f1 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 25 Jun 2024 15:25:34 +0200 Subject: [PATCH 5/6] chore: add changelog entry --- changes/ee/breaking-13332.en.md | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 changes/ee/breaking-13332.en.md diff --git a/changes/ee/breaking-13332.en.md b/changes/ee/breaking-13332.en.md new file mode 100644 index 000000000..0b5bf5896 --- /dev/null +++ b/changes/ee/breaking-13332.en.md @@ -0,0 +1,4 @@ +When an S3 Bridge is improperly configured, error messages now contain more informative and easy to read details. + +## Breaking changes +* S3 Bridge configuration with invalid aggregated upload key template will no longer work. Before this change, such configuration was considered valid but the bridge would never work anyway. From da214be5a13cbd4e009c8a79b7ca0c4fd948787f Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 25 Jun 2024 17:12:14 +0200 Subject: [PATCH 6/6] test(bridge-s3): adapt testcase to different CI environment --- apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl index fa3205456..ea69a346f 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl @@ -145,7 +145,7 @@ t_create_unavailable_credentials(Config) -> {ok, {{_HTTP, 201, _}, _, #{ <<"status_reason">> := - <<"Unable to obtain AWS credentials: Socket error:", _/bytes>> + <<"Unable to obtain AWS credentials:", _/bytes>> }}}, emqx_bridge_v2_testlib:create_connector_api(ConnectorName, ConnectorType, ConnectorConfig) ).