feat(tpl): use `emqx_connector_template` in `emqx_rule_engine` app

This commit is contained in:
Andrew Mayorov 2023-04-18 17:42:39 +03:00
parent e1bca5844f
commit b812f9af5a
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
3 changed files with 57 additions and 52 deletions

View File

@ -30,6 +30,7 @@
-export([render_strict/2]). -export([render_strict/2]).
-export([render_strict/3]). -export([render_strict/3]).
-export([lookup_var/2]).
-export([to_string/1]). -export([to_string/1]).
-export_type([t/0]). -export_type([t/0]).

View File

@ -65,23 +65,18 @@ pre_process_action_args(
qos := QoS, qos := QoS,
retain := Retain, retain := Retain,
payload := Payload, payload := Payload,
mqtt_properties := MQTTPropertiesTemplate0, mqtt_properties := MQTTProperties,
user_properties := UserPropertiesTemplate user_properties := UserProperties
} = Args } = Args
) -> ) ->
MQTTPropertiesTemplate =
maps:map(
fun(_Key, V) -> emqx_placeholder:preproc_tmpl(V) end,
MQTTPropertiesTemplate0
),
Args#{ Args#{
preprocessed_tmpl => #{ preprocessed_tmpl => #{
topic => emqx_placeholder:preproc_tmpl(Topic), topic => emqx_connector_template:parse(Topic),
qos => preproc_vars(QoS), qos => parse_vars(QoS),
retain => preproc_vars(Retain), retain => parse_vars(Retain),
payload => emqx_placeholder:preproc_tmpl(Payload), payload => parse_payload(Payload),
mqtt_properties => MQTTPropertiesTemplate, mqtt_properties => parse_mqtt_properties(MQTTProperties),
user_properties => preproc_user_properties(UserPropertiesTemplate) user_properties => parse_user_properties(UserProperties)
} }
}; };
pre_process_action_args(_, Args) -> pre_process_action_args(_, Args) ->
@ -114,25 +109,27 @@ republish(
#{metadata := #{rule_id := RuleId}} = Env, #{metadata := #{rule_id := RuleId}} = Env,
#{ #{
preprocessed_tmpl := #{ preprocessed_tmpl := #{
qos := QoSTks, qos := QoSTemplate,
retain := RetainTks, retain := RetainTemplate,
topic := TopicTks, topic := TopicTemplate,
payload := PayloadTks, payload := PayloadTemplate,
mqtt_properties := MQTTPropertiesTemplate, mqtt_properties := MQTTPropertiesTemplate,
user_properties := UserPropertiesTks user_properties := UserPropertiesTemplate
} }
} }
) -> ) ->
Topic = emqx_placeholder:proc_tmpl(TopicTks, Selected), Topic = unicode:characters_to_binary(
Payload = format_msg(PayloadTks, Selected), emqx_connector_template:render_strict(TopicTemplate, Selected)
QoS = replace_simple_var(QoSTks, Selected, 0), ),
Retain = replace_simple_var(RetainTks, Selected, false), Payload = emqx_connector_template:render_strict(PayloadTemplate, Selected),
QoS = render_simple_var(QoSTemplate, Selected, 0),
Retain = render_simple_var(RetainTemplate, Selected, false),
%% 'flags' is set for message re-publishes or message related %% 'flags' is set for message re-publishes or message related
%% events such as message.acked and message.dropped %% events such as message.acked and message.dropped
Flags0 = maps:get(flags, Env, #{}), Flags0 = maps:get(flags, Env, #{}),
Flags = Flags0#{retain => Retain}, Flags = Flags0#{retain => Retain},
PubProps0 = format_pub_props(UserPropertiesTks, Selected, Env), PubProps0 = render_pub_props(UserPropertiesTemplate, Selected, Env),
MQTTProps = format_mqtt_properties(MQTTPropertiesTemplate, Selected, Env), MQTTProps = render_mqtt_properties(MQTTPropertiesTemplate, Selected, Env),
PubProps = maps:merge(PubProps0, MQTTProps), PubProps = maps:merge(PubProps0, MQTTProps),
?TRACE( ?TRACE(
"RULE", "RULE",
@ -203,58 +200,66 @@ safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) ->
_ = emqx_broker:safe_publish(Msg), _ = emqx_broker:safe_publish(Msg),
emqx_metrics:inc_msg(Msg). emqx_metrics:inc_msg(Msg).
preproc_vars(Data) when is_binary(Data) -> parse_vars(Data) when is_binary(Data) ->
emqx_placeholder:preproc_tmpl(Data); emqx_connector_template:parse(Data);
preproc_vars(Data) -> parse_vars(Data) ->
Data. {const, Data}.
preproc_user_properties(<<"${pub_props.'User-Property'}">>) -> parse_mqtt_properties(MQTTPropertiesTemplate) ->
maps:map(
fun(_Key, V) -> emqx_connector_template:parse(V) end,
MQTTPropertiesTemplate
).
parse_user_properties(<<"${pub_props.'User-Property'}">>) ->
%% keep the original %% keep the original
%% avoid processing this special variable because %% avoid processing this special variable because
%% we do not want to force users to select the value %% we do not want to force users to select the value
%% the value will be taken from Env.pub_props directly %% the value will be taken from Env.pub_props directly
?ORIGINAL_USER_PROPERTIES; ?ORIGINAL_USER_PROPERTIES;
preproc_user_properties(<<"${", _/binary>> = V) -> parse_user_properties(<<"${", _/binary>> = V) ->
%% use a variable %% use a variable
emqx_placeholder:preproc_tmpl(V); emqx_connector_template:parse(V);
preproc_user_properties(_) -> parse_user_properties(_) ->
%% invalid, discard %% invalid, discard
undefined. undefined.
replace_simple_var(Tokens, Data, Default) when is_list(Tokens) -> render_simple_var([{var, Name}], Data, Default) ->
[Var] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}), case emqx_connector_template:lookup_var(Name, Data) of
case Var of {ok, Var} -> Var;
%% cannot find the variable from Data %% cannot find the variable from Data
undefined -> Default; {error, _} -> Default
_ -> Var
end; end;
replace_simple_var(Val, _Data, _Default) -> render_simple_var({const, Val}, _Data, _Default) ->
Val. Val.
format_msg([], Selected) -> parse_payload(Payload) ->
emqx_utils_json:encode(Selected); case string:is_empty(Payload) of
format_msg(Tokens, Selected) -> false -> emqx_connector_template:parse(Payload);
emqx_placeholder:proc_tmpl(Tokens, Selected). true -> emqx_connector_template:parse("${.}")
end.
format_pub_props(UserPropertiesTks, Selected, Env) -> render_pub_props(UserPropertiesTemplate, Selected, Env) ->
UserProperties = UserProperties =
case UserPropertiesTks of case UserPropertiesTemplate of
?ORIGINAL_USER_PROPERTIES -> ?ORIGINAL_USER_PROPERTIES ->
maps:get('User-Property', maps:get(pub_props, Env, #{}), #{}); maps:get('User-Property', maps:get(pub_props, Env, #{}), #{});
undefined -> undefined ->
#{}; #{};
_ -> _ ->
replace_simple_var(UserPropertiesTks, Selected, #{}) render_simple_var(UserPropertiesTemplate, Selected, #{})
end, end,
#{'User-Property' => UserProperties}. #{'User-Property' => UserProperties}.
format_mqtt_properties(MQTTPropertiesTemplate, Selected, Env) -> render_mqtt_properties(MQTTPropertiesTemplate, Selected, Env) ->
#{metadata := #{rule_id := RuleId}} = Env, #{metadata := #{rule_id := RuleId}} = Env,
MQTTProperties0 = MQTTProperties =
maps:fold( maps:fold(
fun(K, Template, Acc) -> fun(K, Template, Acc) ->
try try
V = emqx_placeholder:proc_tmpl(Template, Selected), V = unicode:characters_to_binary(
emqx_connector_template:render_strict(Template, Selected)
),
Acc#{K => V} Acc#{K => V}
catch catch
Kind:Error -> Kind:Error ->
@ -275,7 +280,7 @@ format_mqtt_properties(MQTTPropertiesTemplate, Selected, Env) ->
#{}, #{},
MQTTPropertiesTemplate MQTTPropertiesTemplate
), ),
coerce_properties_values(MQTTProperties0, Env). coerce_properties_values(MQTTProperties, Env).
ensure_int(B) when is_binary(B) -> ensure_int(B) when is_binary(B) ->
try try

View File

@ -1364,14 +1364,13 @@ t_sqlselect_inject_props(_Config) ->
actions => [Repub] actions => [Repub]
} }
), ),
Props = user_properties(#{<<"inject_key">> => <<"inject_val">>}),
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}, {proto_ver, v5}]), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(Client), {ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
emqtt:publish(Client, <<"t1">>, #{}, <<"{\"x\":1}">>, [{qos, 0}]), emqtt:publish(Client, <<"t1">>, #{}, <<"{\"x\":1}">>, [{qos, 0}]),
receive receive
{publish, #{topic := T, payload := Payload, properties := Props2}} -> {publish, #{topic := T, payload := Payload, properties := Props}} ->
?assertEqual(Props, Props2), ?assertEqual(user_properties(#{<<"inject_key">> => <<"inject_val">>}), Props),
?assertEqual(<<"t2">>, T), ?assertEqual(<<"t2">>, T),
?assertEqual(<<"{\"x\":1}">>, Payload) ?assertEqual(<<"{\"x\":1}">>, Payload)
after 2000 -> after 2000 ->