fix(bridge-s3): validate aggreg key template before adding a channel

This commit is contained in:
Andrew Mayorov 2024-06-11 11:46:34 +02:00
parent fb0da9848c
commit 29fc30ea69
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 41 additions and 12 deletions

View File

@ -162,8 +162,13 @@ on_get_status(_InstId, State = #{client_config := Config}) ->
-spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) -> -spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) ->
{ok, state()} | {error, _Reason}. {ok, state()} | {error, _Reason}.
on_add_channel(_InstId, State = #{channels := Channels}, ChannelId, Config) -> on_add_channel(_InstId, State = #{channels := Channels}, ChannelId, Config) ->
ChannelState = start_channel(State, Config), try
{ok, State#{channels => Channels#{ChannelId => ChannelState}}}. ChannelState = start_channel(State, Config),
{ok, State#{channels => Channels#{ChannelId => ChannelState}}}
catch
throw:Reason ->
{error, Reason}
end.
-spec on_remove_channel(_InstanceId :: resource_id(), state(), channel_id()) -> -spec on_remove_channel(_InstanceId :: resource_id(), state(), channel_id()) ->
{ok, state()}. {ok, state()}.
@ -221,9 +226,10 @@ start_channel(State, #{
max_records => MaxRecords, max_records => MaxRecords,
work_dir => work_dir(Type, Name) work_dir => work_dir(Type, Name)
}, },
Template = ensure_ok(emqx_bridge_s3_upload:mk_key_template(Parameters)),
DeliveryOpts = #{ DeliveryOpts = #{
bucket => Bucket, bucket => Bucket,
key => emqx_bridge_s3_upload:mk_key_template(Parameters), key => Template,
container => Container, container => Container,
upload_options => emqx_bridge_s3_upload:mk_upload_options(Parameters), upload_options => emqx_bridge_s3_upload:mk_upload_options(Parameters),
callback_module => ?MODULE, callback_module => ?MODULE,
@ -247,6 +253,11 @@ start_channel(State, #{
on_stop => fun() -> ?AGGREG_SUP:delete_child(AggregId) end on_stop => fun() -> ?AGGREG_SUP:delete_child(AggregId) end
}. }.
ensure_ok({ok, V}) ->
V;
ensure_ok({error, Reason}) ->
throw(Reason).
upload_options(Parameters) -> upload_options(Parameters) ->
#{acl => maps:get(acl, Parameters, undefined)}. #{acl => maps:get(acl, Parameters, undefined)}.

View File

@ -248,17 +248,35 @@ convert_action(Conf = #{<<"parameters">> := Params, <<"resource_opts">> := Resou
%% Interpreting options %% Interpreting options
-spec mk_key_template(_Parameters :: map()) -> emqx_template:str(). -spec mk_key_template(_Parameters :: map()) ->
{ok, emqx_template:str()} | {error, _Reason}.
mk_key_template(#{key := Key}) -> mk_key_template(#{key := Key}) ->
Template = emqx_template:parse(Key), Template = emqx_template:parse(Key),
{_, BindingErrors} = emqx_template:render(Template, #{}), case validate_bindings(emqx_template:placeholders(Template)) of
{UsedBindings, _} = lists:unzip(BindingErrors), UsedBindings when is_list(UsedBindings) ->
SuffixTemplate = mk_suffix_template(UsedBindings), SuffixTemplate = mk_suffix_template(UsedBindings),
case emqx_template:is_const(SuffixTemplate) of case emqx_template:is_const(SuffixTemplate) of
true -> true ->
Template; {ok, Template};
false -> false ->
Template ++ SuffixTemplate {ok, Template ++ SuffixTemplate}
end;
Error = {error, _} ->
Error
end.
validate_bindings(Bindings) ->
Formats = ["rfc3339", "rfc3339utc", "unix"],
AllowedBindings = lists:append([
["action", "node", "sequence"],
["datetime." ++ F || F <- Formats],
["datetime_until." ++ F || F <- Formats]
]),
case Bindings -- AllowedBindings of
[] ->
Bindings;
Disallowed ->
{error, {invalid_key_template, {disallowed_placeholders, Disallowed}}}
end. end.
mk_suffix_template(UsedBindings) -> mk_suffix_template(UsedBindings) ->