Merge pull request #13332 from keynslug/ft/EMQX-12571/error-mapping
feat(bridge-s3): provide more meaningful error details in status
This commit is contained in:
commit
d8963c836e
|
@ -146,29 +146,22 @@ on_stop(InstId, _State = #{pool_name := PoolName}) ->
|
||||||
on_get_status(_InstId, State = #{client_config := Config}) ->
|
on_get_status(_InstId, State = #{client_config := Config}) ->
|
||||||
case emqx_s3_client:aws_config(Config) of
|
case emqx_s3_client:aws_config(Config) of
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{?status_disconnected, State, Reason};
|
{?status_disconnected, State, map_error_details(Reason)};
|
||||||
AWSConfig ->
|
AWSConfig ->
|
||||||
try erlcloud_s3:list_buckets(AWSConfig) of
|
try erlcloud_s3:list_buckets(AWSConfig) of
|
||||||
Props when is_list(Props) ->
|
Props when is_list(Props) ->
|
||||||
?status_connected
|
?status_connected
|
||||||
catch
|
catch
|
||||||
error:{aws_error, {http_error, _Code, _, Reason}} ->
|
error:Error ->
|
||||||
{?status_disconnected, State, Reason};
|
{?status_disconnected, State, map_error_details(Error)}
|
||||||
error:{aws_error, {socket_error, Reason}} ->
|
|
||||||
{?status_disconnected, State, Reason}
|
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) ->
|
-spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) ->
|
||||||
{ok, state()} | {error, _Reason}.
|
{ok, state()} | {error, _Reason}.
|
||||||
on_add_channel(_InstId, State = #{channels := Channels}, ChannelId, Config) ->
|
on_add_channel(_InstId, State = #{channels := Channels}, ChannelId, Config) ->
|
||||||
try
|
ChannelState = start_channel(State, Config),
|
||||||
ChannelState = start_channel(State, Config),
|
{ok, State#{channels => Channels#{ChannelId => ChannelState}}}.
|
||||||
{ok, State#{channels => Channels#{ChannelId => ChannelState}}}
|
|
||||||
catch
|
|
||||||
throw:Reason ->
|
|
||||||
{error, Reason}
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec on_remove_channel(_InstanceId :: resource_id(), state(), channel_id()) ->
|
-spec on_remove_channel(_InstanceId :: resource_id(), state(), channel_id()) ->
|
||||||
{ok, state()}.
|
{ok, state()}.
|
||||||
|
@ -217,7 +210,8 @@ start_channel(State, #{
|
||||||
max_records := MaxRecords
|
max_records := MaxRecords
|
||||||
},
|
},
|
||||||
container := Container,
|
container := Container,
|
||||||
bucket := Bucket
|
bucket := Bucket,
|
||||||
|
key := Key
|
||||||
}
|
}
|
||||||
}) ->
|
}) ->
|
||||||
AggregId = {Type, Name},
|
AggregId = {Type, Name},
|
||||||
|
@ -226,7 +220,7 @@ start_channel(State, #{
|
||||||
max_records => MaxRecords,
|
max_records => MaxRecords,
|
||||||
work_dir => work_dir(Type, Name)
|
work_dir => work_dir(Type, Name)
|
||||||
},
|
},
|
||||||
Template = ensure_ok(emqx_bridge_s3_upload:mk_key_template(Parameters)),
|
Template = emqx_bridge_s3_upload:mk_key_template(Key),
|
||||||
DeliveryOpts = #{
|
DeliveryOpts = #{
|
||||||
bucket => Bucket,
|
bucket => Bucket,
|
||||||
key => Template,
|
key => Template,
|
||||||
|
@ -253,11 +247,6 @@ start_channel(State, #{
|
||||||
on_stop => fun() -> ?AGGREG_SUP:delete_child(AggregId) end
|
on_stop => fun() -> ?AGGREG_SUP:delete_child(AggregId) end
|
||||||
}.
|
}.
|
||||||
|
|
||||||
ensure_ok({ok, V}) ->
|
|
||||||
V;
|
|
||||||
ensure_ok({error, Reason}) ->
|
|
||||||
throw(Reason).
|
|
||||||
|
|
||||||
upload_options(Parameters) ->
|
upload_options(Parameters) ->
|
||||||
#{acl => maps:get(acl, Parameters, undefined)}.
|
#{acl => maps:get(acl, Parameters, undefined)}.
|
||||||
|
|
||||||
|
@ -285,7 +274,7 @@ channel_status(#{mode := aggregated, aggreg_id := AggregId, bucket := Bucket}, S
|
||||||
check_bucket_accessible(Bucket, #{client_config := Config}) ->
|
check_bucket_accessible(Bucket, #{client_config := Config}) ->
|
||||||
case emqx_s3_client:aws_config(Config) of
|
case emqx_s3_client:aws_config(Config) of
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
throw({unhealthy_target, Reason});
|
throw({unhealthy_target, map_error_details(Reason)});
|
||||||
AWSConfig ->
|
AWSConfig ->
|
||||||
try erlcloud_s3:list_objects(Bucket, [{max_keys, 1}], AWSConfig) of
|
try erlcloud_s3:list_objects(Bucket, [{max_keys, 1}], AWSConfig) of
|
||||||
Props when is_list(Props) ->
|
Props when is_list(Props) ->
|
||||||
|
@ -293,8 +282,8 @@ check_bucket_accessible(Bucket, #{client_config := Config}) ->
|
||||||
catch
|
catch
|
||||||
error:{aws_error, {http_error, 404, _, _Reason}} ->
|
error:{aws_error, {http_error, 404, _, _Reason}} ->
|
||||||
throw({unhealthy_target, "Bucket does not exist"});
|
throw({unhealthy_target, "Bucket does not exist"});
|
||||||
error:{aws_error, {socket_error, Reason}} ->
|
error:Error ->
|
||||||
throw({unhealthy_target, emqx_utils:format(Reason)})
|
throw({unhealthy_target, map_error_details(Error)})
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -304,8 +293,7 @@ check_aggreg_upload_errors(AggregId) ->
|
||||||
%% TODO
|
%% TODO
|
||||||
%% This approach means that, for example, 3 upload failures will cause
|
%% This approach means that, for example, 3 upload failures will cause
|
||||||
%% the channel to be marked as unhealthy for 3 consecutive health checks.
|
%% the channel to be marked as unhealthy for 3 consecutive health checks.
|
||||||
ErrorMessage = emqx_utils:format(Error),
|
throw({unhealthy_target, map_error_details(Error)});
|
||||||
throw({unhealthy_target, ErrorMessage});
|
|
||||||
[] ->
|
[] ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
@ -384,16 +372,38 @@ run_aggregated_upload(InstId, ChannelID, Records, #{aggreg_id := AggregId}) ->
|
||||||
?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => AggregId}),
|
?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => AggregId}),
|
||||||
ok;
|
ok;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{error, {unrecoverable_error, Reason}}
|
{error, {unrecoverable_error, emqx_utils:explain_posix(Reason)}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
map_error({socket_error, _} = Reason) ->
|
map_error(Error) ->
|
||||||
{recoverable_error, Reason};
|
{map_error_class(Error), map_error_details(Error)}.
|
||||||
map_error(Reason = {aws_error, Status, _, _Body}) when Status >= 500 ->
|
|
||||||
|
map_error_class({s3_error, _, _}) ->
|
||||||
|
unrecoverable_error;
|
||||||
|
map_error_class({aws_error, Error}) ->
|
||||||
|
map_error_class(Error);
|
||||||
|
map_error_class({socket_error, _}) ->
|
||||||
|
recoverable_error;
|
||||||
|
map_error_class({http_error, Status, _, _}) when Status >= 500 ->
|
||||||
%% https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
|
%% https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
|
||||||
{recoverable_error, Reason};
|
recoverable_error;
|
||||||
map_error(Reason) ->
|
map_error_class(_Error) ->
|
||||||
{unrecoverable_error, Reason}.
|
unrecoverable_error.
|
||||||
|
|
||||||
|
map_error_details({s3_error, Code, Message}) ->
|
||||||
|
emqx_utils:format("S3 error: ~s ~s", [Code, Message]);
|
||||||
|
map_error_details({aws_error, Error}) ->
|
||||||
|
map_error_details(Error);
|
||||||
|
map_error_details({socket_error, Reason}) ->
|
||||||
|
emqx_utils:format("Socket error: ~s", [emqx_utils:readable_error_msg(Reason)]);
|
||||||
|
map_error_details({http_error, _, _, _} = Error) ->
|
||||||
|
emqx_utils:format("AWS error: ~s", [map_aws_error_details(Error)]);
|
||||||
|
map_error_details({failed_to_obtain_credentials, Error}) ->
|
||||||
|
emqx_utils:format("Unable to obtain AWS credentials: ~s", [map_error_details(Error)]);
|
||||||
|
map_error_details({upload_failed, Error}) ->
|
||||||
|
map_error_details(Error);
|
||||||
|
map_error_details(Error) ->
|
||||||
|
Error.
|
||||||
|
|
||||||
render_bucket(Template, Data) ->
|
render_bucket(Template, Data) ->
|
||||||
case emqx_template:render(Template, {emqx_jsonish, Data}) of
|
case emqx_template:render(Template, {emqx_jsonish, Data}) of
|
||||||
|
@ -416,6 +426,32 @@ render_content(Template, Data) ->
|
||||||
iolist_to_string(IOList) ->
|
iolist_to_string(IOList) ->
|
||||||
unicode:characters_to_list(IOList).
|
unicode:characters_to_list(IOList).
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
|
-include_lib("xmerl/include/xmerl.hrl").
|
||||||
|
|
||||||
|
-spec map_aws_error_details(_AWSError) ->
|
||||||
|
unicode:chardata().
|
||||||
|
map_aws_error_details({http_error, _Status, _, Body}) ->
|
||||||
|
try xmerl_scan:string(unicode:characters_to_list(Body), [{quiet, true}]) of
|
||||||
|
{Error = #xmlElement{name = 'Error'}, _} ->
|
||||||
|
map_aws_error_details(Error);
|
||||||
|
_ ->
|
||||||
|
Body
|
||||||
|
catch
|
||||||
|
exit:_ ->
|
||||||
|
Body
|
||||||
|
end;
|
||||||
|
map_aws_error_details(#xmlElement{content = Content}) ->
|
||||||
|
Code = extract_xml_text(lists:keyfind('Code', #xmlElement.name, Content)),
|
||||||
|
Message = extract_xml_text(lists:keyfind('Message', #xmlElement.name, Content)),
|
||||||
|
[Code, $:, $\s | Message].
|
||||||
|
|
||||||
|
extract_xml_text(#xmlElement{content = Content}) ->
|
||||||
|
[Fragment || #xmlText{value = Fragment} <- Content];
|
||||||
|
extract_xml_text(false) ->
|
||||||
|
[].
|
||||||
|
|
||||||
%% `emqx_connector_aggreg_delivery` APIs
|
%% `emqx_connector_aggreg_delivery` APIs
|
||||||
|
|
||||||
-spec init_transfer_state(buffer_map(), map()) -> emqx_s3_upload:t().
|
-spec init_transfer_state(buffer_map(), map()) -> emqx_s3_upload:t().
|
||||||
|
|
|
@ -29,7 +29,10 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% Internal exports
|
%% Internal exports
|
||||||
-export([convert_actions/2]).
|
-export([
|
||||||
|
convert_actions/2,
|
||||||
|
validate_key_template/1
|
||||||
|
]).
|
||||||
|
|
||||||
-define(DEFAULT_AGGREG_BATCH_SIZE, 100).
|
-define(DEFAULT_AGGREG_BATCH_SIZE, 100).
|
||||||
-define(DEFAULT_AGGREG_BATCH_TIME, <<"10ms">>).
|
-define(DEFAULT_AGGREG_BATCH_TIME, <<"10ms">>).
|
||||||
|
@ -137,7 +140,10 @@ fields(s3_aggregated_upload_parameters) ->
|
||||||
)}
|
)}
|
||||||
],
|
],
|
||||||
emqx_resource_schema:override(emqx_s3_schema:fields(s3_upload), [
|
emqx_resource_schema:override(emqx_s3_schema:fields(s3_upload), [
|
||||||
{key, #{desc => ?DESC(s3_aggregated_upload_key)}}
|
{key, #{
|
||||||
|
desc => ?DESC(s3_aggregated_upload_key),
|
||||||
|
validator => fun ?MODULE:validate_key_template/1
|
||||||
|
}}
|
||||||
]),
|
]),
|
||||||
emqx_s3_schema:fields(s3_uploader)
|
emqx_s3_schema:fields(s3_uploader)
|
||||||
]);
|
]);
|
||||||
|
@ -246,23 +252,13 @@ convert_action(Conf = #{<<"parameters">> := Params, <<"resource_opts">> := Resou
|
||||||
Conf#{<<"resource_opts">> := NResourceOpts}
|
Conf#{<<"resource_opts">> := NResourceOpts}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% Interpreting options
|
validate_key_template(Conf) ->
|
||||||
|
Template = emqx_template:parse(Conf),
|
||||||
-spec mk_key_template(_Parameters :: map()) ->
|
|
||||||
{ok, emqx_template:str()} | {error, _Reason}.
|
|
||||||
mk_key_template(#{key := Key}) ->
|
|
||||||
Template = emqx_template:parse(Key),
|
|
||||||
case validate_bindings(emqx_template:placeholders(Template)) of
|
case validate_bindings(emqx_template:placeholders(Template)) of
|
||||||
UsedBindings when is_list(UsedBindings) ->
|
Bindings when is_list(Bindings) ->
|
||||||
SuffixTemplate = mk_suffix_template(UsedBindings),
|
ok;
|
||||||
case emqx_template:is_const(SuffixTemplate) of
|
{error, {disallowed_placeholders, Disallowed}} ->
|
||||||
true ->
|
{error, emqx_utils:format("Template placeholders are disallowed: ~p", [Disallowed])}
|
||||||
{ok, Template};
|
|
||||||
false ->
|
|
||||||
{ok, Template ++ SuffixTemplate}
|
|
||||||
end;
|
|
||||||
Error = {error, _} ->
|
|
||||||
Error
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
validate_bindings(Bindings) ->
|
validate_bindings(Bindings) ->
|
||||||
|
@ -276,7 +272,22 @@ validate_bindings(Bindings) ->
|
||||||
[] ->
|
[] ->
|
||||||
Bindings;
|
Bindings;
|
||||||
Disallowed ->
|
Disallowed ->
|
||||||
{error, {invalid_key_template, {disallowed_placeholders, Disallowed}}}
|
{error, {disallowed_placeholders, Disallowed}}
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% Interpreting options
|
||||||
|
|
||||||
|
-spec mk_key_template(unicode:chardata()) ->
|
||||||
|
emqx_template:str().
|
||||||
|
mk_key_template(Key) ->
|
||||||
|
Template = emqx_template:parse(Key),
|
||||||
|
UsedBindings = emqx_template:placeholders(Template),
|
||||||
|
SuffixTemplate = mk_suffix_template(UsedBindings),
|
||||||
|
case emqx_template:is_const(SuffixTemplate) of
|
||||||
|
true ->
|
||||||
|
Template;
|
||||||
|
false ->
|
||||||
|
Template ++ SuffixTemplate
|
||||||
end.
|
end.
|
||||||
|
|
||||||
mk_suffix_template(UsedBindings) ->
|
mk_suffix_template(UsedBindings) ->
|
||||||
|
|
|
@ -134,6 +134,22 @@ action_config(Name, ConnectorId) ->
|
||||||
t_start_stop(Config) ->
|
t_start_stop(Config) ->
|
||||||
emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped).
|
emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped).
|
||||||
|
|
||||||
|
t_create_unavailable_credentials(Config) ->
|
||||||
|
ConnectorName = ?config(connector_name, Config),
|
||||||
|
ConnectorType = ?config(connector_type, Config),
|
||||||
|
ConnectorConfig = maps:without(
|
||||||
|
[<<"access_key_id">>, <<"secret_access_key">>],
|
||||||
|
?config(connector_config, Config)
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{ok,
|
||||||
|
{{_HTTP, 201, _}, _, #{
|
||||||
|
<<"status_reason">> :=
|
||||||
|
<<"Unable to obtain AWS credentials:", _/bytes>>
|
||||||
|
}}},
|
||||||
|
emqx_bridge_v2_testlib:create_connector_api(ConnectorName, ConnectorType, ConnectorConfig)
|
||||||
|
).
|
||||||
|
|
||||||
t_ignore_batch_opts(Config) ->
|
t_ignore_batch_opts(Config) ->
|
||||||
{ok, {_Status, _, Bridge}} = emqx_bridge_v2_testlib:create_bridge_api(Config),
|
{ok, {_Status, _, Bridge}} = emqx_bridge_v2_testlib:create_bridge_api(Config),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
|
@ -159,6 +175,13 @@ t_start_broken_update_restart(Config) ->
|
||||||
_Attempts = 20,
|
_Attempts = 20,
|
||||||
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ConnectorId))
|
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ConnectorId))
|
||||||
),
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{ok,
|
||||||
|
{{_HTTP, 200, _}, _, #{
|
||||||
|
<<"status_reason">> := <<"AWS error: SignatureDoesNotMatch:", _/bytes>>
|
||||||
|
}}},
|
||||||
|
emqx_bridge_v2_testlib:get_connector_api(Type, Name)
|
||||||
|
),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, {{_HTTP, 200, _}, _, _}},
|
{ok, {{_HTTP, 200, _}, _, _}},
|
||||||
emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConf)
|
emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConf)
|
||||||
|
|
|
@ -177,6 +177,27 @@ t_create_invalid_config(Config) ->
|
||||||
)
|
)
|
||||||
).
|
).
|
||||||
|
|
||||||
|
t_create_invalid_config_key_template(Config) ->
|
||||||
|
?assertMatch(
|
||||||
|
{error,
|
||||||
|
{_Status, _, #{
|
||||||
|
<<"code">> := <<"BAD_REQUEST">>,
|
||||||
|
<<"message">> := #{
|
||||||
|
<<"kind">> := <<"validation_error">>,
|
||||||
|
<<"reason">> := <<"Template placeholders are disallowed:", _/bytes>>,
|
||||||
|
<<"path">> := <<"root.parameters.key">>
|
||||||
|
}
|
||||||
|
}}},
|
||||||
|
emqx_bridge_v2_testlib:create_bridge_api(
|
||||||
|
Config,
|
||||||
|
_Overrides = #{
|
||||||
|
<<"parameters">> => #{
|
||||||
|
<<"key">> => <<"${action}/${foo}:${bar.rfc3339}">>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
).
|
||||||
|
|
||||||
t_update_invalid_config(Config) ->
|
t_update_invalid_config(Config) ->
|
||||||
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
|
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
|
|
|
@ -65,6 +65,7 @@
|
||||||
flattermap/2,
|
flattermap/2,
|
||||||
tcp_keepalive_opts/4,
|
tcp_keepalive_opts/4,
|
||||||
format/1,
|
format/1,
|
||||||
|
format/2,
|
||||||
call_first_defined/1,
|
call_first_defined/1,
|
||||||
ntoa/1,
|
ntoa/1,
|
||||||
foldl_while/3,
|
foldl_while/3,
|
||||||
|
@ -565,6 +566,9 @@ tcp_keepalive_opts(OS, _Idle, _Interval, _Probes) ->
|
||||||
format(Term) ->
|
format(Term) ->
|
||||||
iolist_to_binary(io_lib:format("~0p", [Term])).
|
iolist_to_binary(io_lib:format("~0p", [Term])).
|
||||||
|
|
||||||
|
format(Fmt, Args) ->
|
||||||
|
unicode:characters_to_binary(io_lib:format(Fmt, Args)).
|
||||||
|
|
||||||
-spec call_first_defined(list({module(), atom(), list()})) -> term() | no_return().
|
-spec call_first_defined(list({module(), atom(), list()})) -> term() | no_return().
|
||||||
call_first_defined([{Module, Function, Args} | Rest]) ->
|
call_first_defined([{Module, Function, Args} | Rest]) ->
|
||||||
try
|
try
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
When an S3 Bridge is improperly configured, error messages now contain more informative and easy to read details.
|
||||||
|
|
||||||
|
## Breaking changes
|
||||||
|
* S3 Bridge configuration with invalid aggregated upload key template will no longer work. Before this change, such configuration was considered valid but the bridge would never work anyway.
|
Loading…
Reference in New Issue