diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index bc9f37935..00c03fd3a 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -162,8 +162,13 @@ on_get_status(_InstId, State = #{client_config := Config}) -> -spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) -> {ok, state()} | {error, _Reason}. on_add_channel(_InstId, State = #{channels := Channels}, ChannelId, Config) -> - ChannelState = start_channel(State, Config), - {ok, State#{channels => Channels#{ChannelId => ChannelState}}}. + try + ChannelState = start_channel(State, Config), + {ok, State#{channels => Channels#{ChannelId => ChannelState}}} + catch + throw:Reason -> + {error, Reason} + end. -spec on_remove_channel(_InstanceId :: resource_id(), state(), channel_id()) -> {ok, state()}. @@ -221,9 +226,10 @@ start_channel(State, #{ max_records => MaxRecords, work_dir => work_dir(Type, Name) }, + Template = ensure_ok(emqx_bridge_s3_upload:mk_key_template(Parameters)), DeliveryOpts = #{ bucket => Bucket, - key => emqx_bridge_s3_upload:mk_key_template(Parameters), + key => Template, container => Container, upload_options => emqx_bridge_s3_upload:mk_upload_options(Parameters), callback_module => ?MODULE, @@ -247,6 +253,11 @@ start_channel(State, #{ on_stop => fun() -> ?AGGREG_SUP:delete_child(AggregId) end }. +ensure_ok({ok, V}) -> + V; +ensure_ok({error, Reason}) -> + throw(Reason). + upload_options(Parameters) -> #{acl => maps:get(acl, Parameters, undefined)}. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl index 6c5ee5d0e..2bf12f24b 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl @@ -248,17 +248,35 @@ convert_action(Conf = #{<<"parameters">> := Params, <<"resource_opts">> := Resou %% Interpreting options --spec mk_key_template(_Parameters :: map()) -> emqx_template:str(). +-spec mk_key_template(_Parameters :: map()) -> + {ok, emqx_template:str()} | {error, _Reason}. mk_key_template(#{key := Key}) -> Template = emqx_template:parse(Key), - {_, BindingErrors} = emqx_template:render(Template, #{}), - {UsedBindings, _} = lists:unzip(BindingErrors), - SuffixTemplate = mk_suffix_template(UsedBindings), - case emqx_template:is_const(SuffixTemplate) of - true -> - Template; - false -> - Template ++ SuffixTemplate + case validate_bindings(emqx_template:placeholders(Template)) of + UsedBindings when is_list(UsedBindings) -> + SuffixTemplate = mk_suffix_template(UsedBindings), + case emqx_template:is_const(SuffixTemplate) of + true -> + {ok, Template}; + false -> + {ok, Template ++ SuffixTemplate} + end; + Error = {error, _} -> + Error + end. + +validate_bindings(Bindings) -> + Formats = ["rfc3339", "rfc3339utc", "unix"], + AllowedBindings = lists:append([ + ["action", "node", "sequence"], + ["datetime." ++ F || F <- Formats], + ["datetime_until." ++ F || F <- Formats] + ]), + case Bindings -- AllowedBindings of + [] -> + Bindings; + Disallowed -> + {error, {invalid_key_template, {disallowed_placeholders, Disallowed}}} end. mk_suffix_template(UsedBindings) -> diff --git a/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.app.src b/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.app.src index 6562958ee..b79cba2b2 100644 --- a/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.app.src +++ b/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.app.src @@ -1,6 +1,6 @@ {application, emqx_connector_aggregator, [ {description, "EMQX Enterprise Connector Data Aggregator"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl b/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl index f3936fd54..935fa6b52 100644 --- a/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl +++ b/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl @@ -372,9 +372,13 @@ lookup_current_buffer(Name) -> %% enqueue_delivery(Buffer, St = #st{name = Name, deliveries = Ds}) -> - {ok, Pid} = emqx_connector_aggreg_upload_sup:start_delivery(Name, Buffer), - MRef = erlang:monitor(process, Pid), - St#st{deliveries = Ds#{MRef => Buffer}}. + case emqx_connector_aggreg_upload_sup:start_delivery(Name, Buffer) of + {ok, Pid} -> + MRef = erlang:monitor(process, Pid), + St#st{deliveries = Ds#{MRef => Buffer}}; + {error, _} = Error -> + handle_delivery_exit(Buffer, Error, St) + end. handle_delivery_exit(Buffer, Normal, St = #st{name = Name}) when Normal == normal; Normal == noproc diff --git a/apps/emqx_utils/src/emqx_template.erl b/apps/emqx_utils/src/emqx_template.erl index 1383f90e1..02e18017e 100644 --- a/apps/emqx_utils/src/emqx_template.erl +++ b/apps/emqx_utils/src/emqx_template.erl @@ -20,6 +20,7 @@ -export([parse/2]). -export([parse_deep/1]). -export([parse_deep/2]). +-export([placeholders/1]). -export([validate/2]). -export([is_const/1]). -export([unparse/1]). @@ -143,14 +144,19 @@ parse_accessor(Var) -> Name end. +-spec placeholders(t()) -> [varname()]. +placeholders(Template) when is_list(Template) -> + [Name || {var, Name, _} <- Template]; +placeholders({'$tpl', Template}) -> + placeholders_deep(Template). + %% @doc Validate a template against a set of allowed variables. %% If the given template contains any variable not in the allowed set, an error %% is returned. -spec validate([varname() | {var_namespace, varname()}], t()) -> ok | {error, [_Error :: {varname(), disallowed}]}. validate(Allowed, Template) -> - {_, Errors} = render(Template, #{}), - {Used, _} = lists:unzip(Errors), + Used = placeholders(Template), case find_disallowed(lists:usort(Used), Allowed) of [] -> ok; @@ -192,10 +198,13 @@ is_allowed(Var, [{var_namespace, VarPrefix} | Allowed]) -> false -> is_allowed(Var, Allowed) end; -is_allowed(Var, [Var | _Allowed]) -> +is_allowed(Var, [VarAllowed | Rest]) -> + is_same_varname(Var, VarAllowed) orelse is_allowed(Var, Rest). + +is_same_varname("", ".") -> true; -is_allowed(Var, [_ | Allowed]) -> - is_allowed(Var, Allowed). +is_same_varname(V1, V2) -> + V1 =:= V2. %% @doc Check if a template is constant with respect to rendering, i.e. does not %% contain any placeholders. @@ -322,6 +331,22 @@ parse_deep_term(Term, Opts) when is_binary(Term) -> parse_deep_term(Term, _Opts) -> Term. +-spec placeholders_deep(deeptpl()) -> [varname()]. +placeholders_deep(Template) when is_map(Template) -> + maps:fold( + fun(KT, VT, Acc) -> placeholders_deep(KT) ++ placeholders_deep(VT) ++ Acc end, + [], + Template + ); +placeholders_deep({list, Template}) when is_list(Template) -> + lists:flatmap(fun placeholders_deep/1, Template); +placeholders_deep({tuple, Template}) when is_list(Template) -> + lists:flatmap(fun placeholders_deep/1, Template); +placeholders_deep(Template) when is_list(Template) -> + placeholders(Template); +placeholders_deep(_Term) -> + []. + render_deep(Template, Context, Opts) when is_map(Template) -> maps:fold( fun(KT, VT, {Acc, Errors}) -> diff --git a/apps/emqx_utils/test/emqx_template_SUITE.erl b/apps/emqx_utils/test/emqx_template_SUITE.erl index 0a3273170..a049ebfbc 100644 --- a/apps/emqx_utils/test/emqx_template_SUITE.erl +++ b/apps/emqx_utils/test/emqx_template_SUITE.erl @@ -128,6 +128,14 @@ t_render_custom_bindings(_) -> render_string(Template, {?MODULE, []}) ). +t_placeholders(_) -> + TString = <<"a:${a},b:${b},c:$${c},d:{${d.d1}},e:${$}{e},lit:${$}{$}">>, + Template = emqx_template:parse(TString), + ?assertEqual( + ["a", "b", "c", "d.d1"], + emqx_template:placeholders(Template) + ). + t_unparse(_) -> TString = <<"a:${a},b:${b},c:$${c},d:{${d.d1}},e:${$}{e},lit:${$}{$}">>, Template = emqx_template:parse(TString), @@ -337,6 +345,16 @@ t_unparse_tmpl_deep(_) -> Template = emqx_template:parse_deep(Term), ?assertEqual(Term, emqx_template:unparse(Template)). +t_allow_this(_) -> + ?assertEqual( + {error, [{"", disallowed}]}, + emqx_template:validate(["d"], emqx_template:parse(<<"this:${}">>)) + ), + ?assertEqual( + {error, [{"", disallowed}]}, + emqx_template:validate(["d"], emqx_template:parse(<<"this:${.}">>)) + ). + t_allow_var_by_namespace(_) -> Context = #{d => #{d1 => <<"hi">>}}, Template = emqx_template:parse(<<"d.d1:${d.d1}">>), diff --git a/changes/ee/fix-13227.en.md b/changes/ee/fix-13227.en.md new file mode 100644 index 000000000..f2c1c2f38 --- /dev/null +++ b/changes/ee/fix-13227.en.md @@ -0,0 +1 @@ +Fixed an issue with S3 Bridge when running in aggregated mode, where invalid key template in the configuration haven't been reported as an error during bridge setup, but instead caused a storm of hard to recover crashes later.