From 29fc30ea69041ad0e0f726d23cde52a1ce259dbe Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 11 Jun 2024 11:46:34 +0200 Subject: [PATCH] fix(bridge-s3): validate aggreg key template before adding a channel --- .../src/emqx_bridge_s3_connector.erl | 17 +++++++-- .../src/emqx_bridge_s3_upload.erl | 36 ++++++++++++++----- 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index bc9f37935..00c03fd3a 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -162,8 +162,13 @@ on_get_status(_InstId, State = #{client_config := Config}) -> -spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) -> {ok, state()} | {error, _Reason}. on_add_channel(_InstId, State = #{channels := Channels}, ChannelId, Config) -> - ChannelState = start_channel(State, Config), - {ok, State#{channels => Channels#{ChannelId => ChannelState}}}. + try + ChannelState = start_channel(State, Config), + {ok, State#{channels => Channels#{ChannelId => ChannelState}}} + catch + throw:Reason -> + {error, Reason} + end. -spec on_remove_channel(_InstanceId :: resource_id(), state(), channel_id()) -> {ok, state()}. @@ -221,9 +226,10 @@ start_channel(State, #{ max_records => MaxRecords, work_dir => work_dir(Type, Name) }, + Template = ensure_ok(emqx_bridge_s3_upload:mk_key_template(Parameters)), DeliveryOpts = #{ bucket => Bucket, - key => emqx_bridge_s3_upload:mk_key_template(Parameters), + key => Template, container => Container, upload_options => emqx_bridge_s3_upload:mk_upload_options(Parameters), callback_module => ?MODULE, @@ -247,6 +253,11 @@ start_channel(State, #{ on_stop => fun() -> ?AGGREG_SUP:delete_child(AggregId) end }. +ensure_ok({ok, V}) -> + V; +ensure_ok({error, Reason}) -> + throw(Reason). + upload_options(Parameters) -> #{acl => maps:get(acl, Parameters, undefined)}. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl index 6c5ee5d0e..2bf12f24b 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl @@ -248,17 +248,35 @@ convert_action(Conf = #{<<"parameters">> := Params, <<"resource_opts">> := Resou %% Interpreting options --spec mk_key_template(_Parameters :: map()) -> emqx_template:str(). +-spec mk_key_template(_Parameters :: map()) -> + {ok, emqx_template:str()} | {error, _Reason}. mk_key_template(#{key := Key}) -> Template = emqx_template:parse(Key), - {_, BindingErrors} = emqx_template:render(Template, #{}), - {UsedBindings, _} = lists:unzip(BindingErrors), - SuffixTemplate = mk_suffix_template(UsedBindings), - case emqx_template:is_const(SuffixTemplate) of - true -> - Template; - false -> - Template ++ SuffixTemplate + case validate_bindings(emqx_template:placeholders(Template)) of + UsedBindings when is_list(UsedBindings) -> + SuffixTemplate = mk_suffix_template(UsedBindings), + case emqx_template:is_const(SuffixTemplate) of + true -> + {ok, Template}; + false -> + {ok, Template ++ SuffixTemplate} + end; + Error = {error, _} -> + Error + end. + +validate_bindings(Bindings) -> + Formats = ["rfc3339", "rfc3339utc", "unix"], + AllowedBindings = lists:append([ + ["action", "node", "sequence"], + ["datetime." ++ F || F <- Formats], + ["datetime_until." ++ F || F <- Formats] + ]), + case Bindings -- AllowedBindings of + [] -> + Bindings; + Disallowed -> + {error, {invalid_key_template, {disallowed_placeholders, Disallowed}}} end. mk_suffix_template(UsedBindings) ->