Merge pull request #13227 from keynslug/fix/EMQX-12533/bad-tpl
fix(bridge-s3): validate aggreg key template before adding a channel
This commit is contained in:
commit
2f0eb3560b
|
@ -162,8 +162,13 @@ on_get_status(_InstId, State = #{client_config := Config}) ->
|
||||||
-spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) ->
|
-spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) ->
|
||||||
{ok, state()} | {error, _Reason}.
|
{ok, state()} | {error, _Reason}.
|
||||||
on_add_channel(_InstId, State = #{channels := Channels}, ChannelId, Config) ->
|
on_add_channel(_InstId, State = #{channels := Channels}, ChannelId, Config) ->
|
||||||
ChannelState = start_channel(State, Config),
|
try
|
||||||
{ok, State#{channels => Channels#{ChannelId => ChannelState}}}.
|
ChannelState = start_channel(State, Config),
|
||||||
|
{ok, State#{channels => Channels#{ChannelId => ChannelState}}}
|
||||||
|
catch
|
||||||
|
throw:Reason ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
-spec on_remove_channel(_InstanceId :: resource_id(), state(), channel_id()) ->
|
-spec on_remove_channel(_InstanceId :: resource_id(), state(), channel_id()) ->
|
||||||
{ok, state()}.
|
{ok, state()}.
|
||||||
|
@ -221,9 +226,10 @@ start_channel(State, #{
|
||||||
max_records => MaxRecords,
|
max_records => MaxRecords,
|
||||||
work_dir => work_dir(Type, Name)
|
work_dir => work_dir(Type, Name)
|
||||||
},
|
},
|
||||||
|
Template = ensure_ok(emqx_bridge_s3_upload:mk_key_template(Parameters)),
|
||||||
DeliveryOpts = #{
|
DeliveryOpts = #{
|
||||||
bucket => Bucket,
|
bucket => Bucket,
|
||||||
key => emqx_bridge_s3_upload:mk_key_template(Parameters),
|
key => Template,
|
||||||
container => Container,
|
container => Container,
|
||||||
upload_options => emqx_bridge_s3_upload:mk_upload_options(Parameters),
|
upload_options => emqx_bridge_s3_upload:mk_upload_options(Parameters),
|
||||||
callback_module => ?MODULE,
|
callback_module => ?MODULE,
|
||||||
|
@ -247,6 +253,11 @@ start_channel(State, #{
|
||||||
on_stop => fun() -> ?AGGREG_SUP:delete_child(AggregId) end
|
on_stop => fun() -> ?AGGREG_SUP:delete_child(AggregId) end
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
ensure_ok({ok, V}) ->
|
||||||
|
V;
|
||||||
|
ensure_ok({error, Reason}) ->
|
||||||
|
throw(Reason).
|
||||||
|
|
||||||
upload_options(Parameters) ->
|
upload_options(Parameters) ->
|
||||||
#{acl => maps:get(acl, Parameters, undefined)}.
|
#{acl => maps:get(acl, Parameters, undefined)}.
|
||||||
|
|
||||||
|
|
|
@ -248,17 +248,35 @@ convert_action(Conf = #{<<"parameters">> := Params, <<"resource_opts">> := Resou
|
||||||
|
|
||||||
%% Interpreting options
|
%% Interpreting options
|
||||||
|
|
||||||
-spec mk_key_template(_Parameters :: map()) -> emqx_template:str().
|
-spec mk_key_template(_Parameters :: map()) ->
|
||||||
|
{ok, emqx_template:str()} | {error, _Reason}.
|
||||||
mk_key_template(#{key := Key}) ->
|
mk_key_template(#{key := Key}) ->
|
||||||
Template = emqx_template:parse(Key),
|
Template = emqx_template:parse(Key),
|
||||||
{_, BindingErrors} = emqx_template:render(Template, #{}),
|
case validate_bindings(emqx_template:placeholders(Template)) of
|
||||||
{UsedBindings, _} = lists:unzip(BindingErrors),
|
UsedBindings when is_list(UsedBindings) ->
|
||||||
SuffixTemplate = mk_suffix_template(UsedBindings),
|
SuffixTemplate = mk_suffix_template(UsedBindings),
|
||||||
case emqx_template:is_const(SuffixTemplate) of
|
case emqx_template:is_const(SuffixTemplate) of
|
||||||
true ->
|
true ->
|
||||||
Template;
|
{ok, Template};
|
||||||
false ->
|
false ->
|
||||||
Template ++ SuffixTemplate
|
{ok, Template ++ SuffixTemplate}
|
||||||
|
end;
|
||||||
|
Error = {error, _} ->
|
||||||
|
Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
validate_bindings(Bindings) ->
|
||||||
|
Formats = ["rfc3339", "rfc3339utc", "unix"],
|
||||||
|
AllowedBindings = lists:append([
|
||||||
|
["action", "node", "sequence"],
|
||||||
|
["datetime." ++ F || F <- Formats],
|
||||||
|
["datetime_until." ++ F || F <- Formats]
|
||||||
|
]),
|
||||||
|
case Bindings -- AllowedBindings of
|
||||||
|
[] ->
|
||||||
|
Bindings;
|
||||||
|
Disallowed ->
|
||||||
|
{error, {invalid_key_template, {disallowed_placeholders, Disallowed}}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
mk_suffix_template(UsedBindings) ->
|
mk_suffix_template(UsedBindings) ->
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_connector_aggregator, [
|
{application, emqx_connector_aggregator, [
|
||||||
{description, "EMQX Enterprise Connector Data Aggregator"},
|
{description, "EMQX Enterprise Connector Data Aggregator"},
|
||||||
{vsn, "0.1.0"},
|
{vsn, "0.1.1"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [
|
{applications, [
|
||||||
kernel,
|
kernel,
|
||||||
|
|
|
@ -372,9 +372,13 @@ lookup_current_buffer(Name) ->
|
||||||
%%
|
%%
|
||||||
|
|
||||||
enqueue_delivery(Buffer, St = #st{name = Name, deliveries = Ds}) ->
|
enqueue_delivery(Buffer, St = #st{name = Name, deliveries = Ds}) ->
|
||||||
{ok, Pid} = emqx_connector_aggreg_upload_sup:start_delivery(Name, Buffer),
|
case emqx_connector_aggreg_upload_sup:start_delivery(Name, Buffer) of
|
||||||
MRef = erlang:monitor(process, Pid),
|
{ok, Pid} ->
|
||||||
St#st{deliveries = Ds#{MRef => Buffer}}.
|
MRef = erlang:monitor(process, Pid),
|
||||||
|
St#st{deliveries = Ds#{MRef => Buffer}};
|
||||||
|
{error, _} = Error ->
|
||||||
|
handle_delivery_exit(Buffer, Error, St)
|
||||||
|
end.
|
||||||
|
|
||||||
handle_delivery_exit(Buffer, Normal, St = #st{name = Name}) when
|
handle_delivery_exit(Buffer, Normal, St = #st{name = Name}) when
|
||||||
Normal == normal; Normal == noproc
|
Normal == normal; Normal == noproc
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
-export([parse/2]).
|
-export([parse/2]).
|
||||||
-export([parse_deep/1]).
|
-export([parse_deep/1]).
|
||||||
-export([parse_deep/2]).
|
-export([parse_deep/2]).
|
||||||
|
-export([placeholders/1]).
|
||||||
-export([validate/2]).
|
-export([validate/2]).
|
||||||
-export([is_const/1]).
|
-export([is_const/1]).
|
||||||
-export([unparse/1]).
|
-export([unparse/1]).
|
||||||
|
@ -143,14 +144,19 @@ parse_accessor(Var) ->
|
||||||
Name
|
Name
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec placeholders(t()) -> [varname()].
|
||||||
|
placeholders(Template) when is_list(Template) ->
|
||||||
|
[Name || {var, Name, _} <- Template];
|
||||||
|
placeholders({'$tpl', Template}) ->
|
||||||
|
placeholders_deep(Template).
|
||||||
|
|
||||||
%% @doc Validate a template against a set of allowed variables.
|
%% @doc Validate a template against a set of allowed variables.
|
||||||
%% If the given template contains any variable not in the allowed set, an error
|
%% If the given template contains any variable not in the allowed set, an error
|
||||||
%% is returned.
|
%% is returned.
|
||||||
-spec validate([varname() | {var_namespace, varname()}], t()) ->
|
-spec validate([varname() | {var_namespace, varname()}], t()) ->
|
||||||
ok | {error, [_Error :: {varname(), disallowed}]}.
|
ok | {error, [_Error :: {varname(), disallowed}]}.
|
||||||
validate(Allowed, Template) ->
|
validate(Allowed, Template) ->
|
||||||
{_, Errors} = render(Template, #{}),
|
Used = placeholders(Template),
|
||||||
{Used, _} = lists:unzip(Errors),
|
|
||||||
case find_disallowed(lists:usort(Used), Allowed) of
|
case find_disallowed(lists:usort(Used), Allowed) of
|
||||||
[] ->
|
[] ->
|
||||||
ok;
|
ok;
|
||||||
|
@ -192,10 +198,13 @@ is_allowed(Var, [{var_namespace, VarPrefix} | Allowed]) ->
|
||||||
false ->
|
false ->
|
||||||
is_allowed(Var, Allowed)
|
is_allowed(Var, Allowed)
|
||||||
end;
|
end;
|
||||||
is_allowed(Var, [Var | _Allowed]) ->
|
is_allowed(Var, [VarAllowed | Rest]) ->
|
||||||
|
is_same_varname(Var, VarAllowed) orelse is_allowed(Var, Rest).
|
||||||
|
|
||||||
|
is_same_varname("", ".") ->
|
||||||
true;
|
true;
|
||||||
is_allowed(Var, [_ | Allowed]) ->
|
is_same_varname(V1, V2) ->
|
||||||
is_allowed(Var, Allowed).
|
V1 =:= V2.
|
||||||
|
|
||||||
%% @doc Check if a template is constant with respect to rendering, i.e. does not
|
%% @doc Check if a template is constant with respect to rendering, i.e. does not
|
||||||
%% contain any placeholders.
|
%% contain any placeholders.
|
||||||
|
@ -322,6 +331,22 @@ parse_deep_term(Term, Opts) when is_binary(Term) ->
|
||||||
parse_deep_term(Term, _Opts) ->
|
parse_deep_term(Term, _Opts) ->
|
||||||
Term.
|
Term.
|
||||||
|
|
||||||
|
-spec placeholders_deep(deeptpl()) -> [varname()].
|
||||||
|
placeholders_deep(Template) when is_map(Template) ->
|
||||||
|
maps:fold(
|
||||||
|
fun(KT, VT, Acc) -> placeholders_deep(KT) ++ placeholders_deep(VT) ++ Acc end,
|
||||||
|
[],
|
||||||
|
Template
|
||||||
|
);
|
||||||
|
placeholders_deep({list, Template}) when is_list(Template) ->
|
||||||
|
lists:flatmap(fun placeholders_deep/1, Template);
|
||||||
|
placeholders_deep({tuple, Template}) when is_list(Template) ->
|
||||||
|
lists:flatmap(fun placeholders_deep/1, Template);
|
||||||
|
placeholders_deep(Template) when is_list(Template) ->
|
||||||
|
placeholders(Template);
|
||||||
|
placeholders_deep(_Term) ->
|
||||||
|
[].
|
||||||
|
|
||||||
render_deep(Template, Context, Opts) when is_map(Template) ->
|
render_deep(Template, Context, Opts) when is_map(Template) ->
|
||||||
maps:fold(
|
maps:fold(
|
||||||
fun(KT, VT, {Acc, Errors}) ->
|
fun(KT, VT, {Acc, Errors}) ->
|
||||||
|
|
|
@ -128,6 +128,14 @@ t_render_custom_bindings(_) ->
|
||||||
render_string(Template, {?MODULE, []})
|
render_string(Template, {?MODULE, []})
|
||||||
).
|
).
|
||||||
|
|
||||||
|
t_placeholders(_) ->
|
||||||
|
TString = <<"a:${a},b:${b},c:$${c},d:{${d.d1}},e:${$}{e},lit:${$}{$}">>,
|
||||||
|
Template = emqx_template:parse(TString),
|
||||||
|
?assertEqual(
|
||||||
|
["a", "b", "c", "d.d1"],
|
||||||
|
emqx_template:placeholders(Template)
|
||||||
|
).
|
||||||
|
|
||||||
t_unparse(_) ->
|
t_unparse(_) ->
|
||||||
TString = <<"a:${a},b:${b},c:$${c},d:{${d.d1}},e:${$}{e},lit:${$}{$}">>,
|
TString = <<"a:${a},b:${b},c:$${c},d:{${d.d1}},e:${$}{e},lit:${$}{$}">>,
|
||||||
Template = emqx_template:parse(TString),
|
Template = emqx_template:parse(TString),
|
||||||
|
@ -337,6 +345,16 @@ t_unparse_tmpl_deep(_) ->
|
||||||
Template = emqx_template:parse_deep(Term),
|
Template = emqx_template:parse_deep(Term),
|
||||||
?assertEqual(Term, emqx_template:unparse(Template)).
|
?assertEqual(Term, emqx_template:unparse(Template)).
|
||||||
|
|
||||||
|
t_allow_this(_) ->
|
||||||
|
?assertEqual(
|
||||||
|
{error, [{"", disallowed}]},
|
||||||
|
emqx_template:validate(["d"], emqx_template:parse(<<"this:${}">>))
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
{error, [{"", disallowed}]},
|
||||||
|
emqx_template:validate(["d"], emqx_template:parse(<<"this:${.}">>))
|
||||||
|
).
|
||||||
|
|
||||||
t_allow_var_by_namespace(_) ->
|
t_allow_var_by_namespace(_) ->
|
||||||
Context = #{d => #{d1 => <<"hi">>}},
|
Context = #{d => #{d1 => <<"hi">>}},
|
||||||
Template = emqx_template:parse(<<"d.d1:${d.d1}">>),
|
Template = emqx_template:parse(<<"d.d1:${d.d1}">>),
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fixed an issue with S3 Bridge when running in aggregated mode, where invalid key template in the configuration haven't been reported as an error during bridge setup, but instead caused a storm of hard to recover crashes later.
|
Loading…
Reference in New Issue