feat(bridge-s3): make validation errors more readable
And also turn them into schema-level validations, instead of bridge-level error conditions.
This commit is contained in:
parent
a2e8d49847
commit
98e4ea6fde
|
@ -162,13 +162,8 @@ on_get_status(_InstId, State = #{client_config := Config}) ->
|
|||
-spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) ->
|
||||
{ok, state()} | {error, _Reason}.
|
||||
on_add_channel(_InstId, State = #{channels := Channels}, ChannelId, Config) ->
|
||||
try
|
||||
ChannelState = start_channel(State, Config),
|
||||
{ok, State#{channels => Channels#{ChannelId => ChannelState}}}
|
||||
catch
|
||||
throw:Reason ->
|
||||
{error, Reason}
|
||||
end.
|
||||
ChannelState = start_channel(State, Config),
|
||||
{ok, State#{channels => Channels#{ChannelId => ChannelState}}}.
|
||||
|
||||
-spec on_remove_channel(_InstanceId :: resource_id(), state(), channel_id()) ->
|
||||
{ok, state()}.
|
||||
|
@ -217,7 +212,8 @@ start_channel(State, #{
|
|||
max_records := MaxRecords
|
||||
},
|
||||
container := Container,
|
||||
bucket := Bucket
|
||||
bucket := Bucket,
|
||||
key := Key
|
||||
}
|
||||
}) ->
|
||||
AggregId = {Type, Name},
|
||||
|
@ -226,7 +222,7 @@ start_channel(State, #{
|
|||
max_records => MaxRecords,
|
||||
work_dir => work_dir(Type, Name)
|
||||
},
|
||||
Template = ensure_ok(emqx_bridge_s3_upload:mk_key_template(Parameters)),
|
||||
Template = emqx_bridge_s3_upload:mk_key_template(Key),
|
||||
DeliveryOpts = #{
|
||||
bucket => Bucket,
|
||||
key => Template,
|
||||
|
@ -253,11 +249,6 @@ start_channel(State, #{
|
|||
on_stop => fun() -> ?AGGREG_SUP:delete_child(AggregId) end
|
||||
}.
|
||||
|
||||
ensure_ok({ok, V}) ->
|
||||
V;
|
||||
ensure_ok({error, Reason}) ->
|
||||
throw(Reason).
|
||||
|
||||
upload_options(Parameters) ->
|
||||
#{acl => maps:get(acl, Parameters, undefined)}.
|
||||
|
||||
|
|
|
@ -29,7 +29,10 @@
|
|||
]).
|
||||
|
||||
%% Internal exports
|
||||
-export([convert_actions/2]).
|
||||
-export([
|
||||
convert_actions/2,
|
||||
validate_key_template/1
|
||||
]).
|
||||
|
||||
-define(DEFAULT_AGGREG_BATCH_SIZE, 100).
|
||||
-define(DEFAULT_AGGREG_BATCH_TIME, <<"10ms">>).
|
||||
|
@ -137,7 +140,10 @@ fields(s3_aggregated_upload_parameters) ->
|
|||
)}
|
||||
],
|
||||
emqx_resource_schema:override(emqx_s3_schema:fields(s3_upload), [
|
||||
{key, #{desc => ?DESC(s3_aggregated_upload_key)}}
|
||||
{key, #{
|
||||
desc => ?DESC(s3_aggregated_upload_key),
|
||||
validator => fun ?MODULE:validate_key_template/1
|
||||
}}
|
||||
]),
|
||||
emqx_s3_schema:fields(s3_uploader)
|
||||
]);
|
||||
|
@ -246,23 +252,13 @@ convert_action(Conf = #{<<"parameters">> := Params, <<"resource_opts">> := Resou
|
|||
Conf#{<<"resource_opts">> := NResourceOpts}
|
||||
end.
|
||||
|
||||
%% Interpreting options
|
||||
|
||||
-spec mk_key_template(_Parameters :: map()) ->
|
||||
{ok, emqx_template:str()} | {error, _Reason}.
|
||||
mk_key_template(#{key := Key}) ->
|
||||
Template = emqx_template:parse(Key),
|
||||
validate_key_template(Conf) ->
|
||||
Template = emqx_template:parse(Conf),
|
||||
case validate_bindings(emqx_template:placeholders(Template)) of
|
||||
UsedBindings when is_list(UsedBindings) ->
|
||||
SuffixTemplate = mk_suffix_template(UsedBindings),
|
||||
case emqx_template:is_const(SuffixTemplate) of
|
||||
true ->
|
||||
{ok, Template};
|
||||
false ->
|
||||
{ok, Template ++ SuffixTemplate}
|
||||
end;
|
||||
Error = {error, _} ->
|
||||
Error
|
||||
Bindings when is_list(Bindings) ->
|
||||
ok;
|
||||
{error, {disallowed_placeholders, Disallowed}} ->
|
||||
{error, emqx_utils:format("Template placeholders are disallowed: ~p", [Disallowed])}
|
||||
end.
|
||||
|
||||
validate_bindings(Bindings) ->
|
||||
|
@ -276,7 +272,22 @@ validate_bindings(Bindings) ->
|
|||
[] ->
|
||||
Bindings;
|
||||
Disallowed ->
|
||||
{error, {invalid_key_template, {disallowed_placeholders, Disallowed}}}
|
||||
{error, {disallowed_placeholders, Disallowed}}
|
||||
end.
|
||||
|
||||
%% Interpreting options
|
||||
|
||||
-spec mk_key_template(unicode:chardata()) ->
|
||||
emqx_template:str().
|
||||
mk_key_template(Key) ->
|
||||
Template = emqx_template:parse(Key),
|
||||
UsedBindings = emqx_template:placeholders(Template),
|
||||
SuffixTemplate = mk_suffix_template(UsedBindings),
|
||||
case emqx_template:is_const(SuffixTemplate) of
|
||||
true ->
|
||||
Template;
|
||||
false ->
|
||||
Template ++ SuffixTemplate
|
||||
end.
|
||||
|
||||
mk_suffix_template(UsedBindings) ->
|
||||
|
|
|
@ -177,6 +177,27 @@ t_create_invalid_config(Config) ->
|
|||
)
|
||||
).
|
||||
|
||||
t_create_invalid_config_key_template(Config) ->
|
||||
?assertMatch(
|
||||
{error,
|
||||
{_Status, _, #{
|
||||
<<"code">> := <<"BAD_REQUEST">>,
|
||||
<<"message">> := #{
|
||||
<<"kind">> := <<"validation_error">>,
|
||||
<<"reason">> := <<"Template placeholders are disallowed:", _/bytes>>,
|
||||
<<"path">> := <<"root.parameters.key">>
|
||||
}
|
||||
}}},
|
||||
emqx_bridge_v2_testlib:create_bridge_api(
|
||||
Config,
|
||||
_Overrides = #{
|
||||
<<"parameters">> => #{
|
||||
<<"key">> => <<"${action}/${foo}:${bar.rfc3339}">>
|
||||
}
|
||||
}
|
||||
)
|
||||
).
|
||||
|
||||
t_update_invalid_config(Config) ->
|
||||
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
|
||||
?assertMatch(
|
||||
|
|
|
@ -65,6 +65,7 @@
|
|||
flattermap/2,
|
||||
tcp_keepalive_opts/4,
|
||||
format/1,
|
||||
format/2,
|
||||
call_first_defined/1,
|
||||
ntoa/1,
|
||||
foldl_while/3,
|
||||
|
@ -565,6 +566,9 @@ tcp_keepalive_opts(OS, _Idle, _Interval, _Probes) ->
|
|||
format(Term) ->
|
||||
iolist_to_binary(io_lib:format("~0p", [Term])).
|
||||
|
||||
format(Fmt, Args) ->
|
||||
unicode:characters_to_binary(io_lib:format(Fmt, Args)).
|
||||
|
||||
-spec call_first_defined(list({module(), atom(), list()})) -> term() | no_return().
|
||||
call_first_defined([{Module, Function, Args} | Rest]) ->
|
||||
try
|
||||
|
|
Loading…
Reference in New Issue