From 0e7a3f89a9f54a61a57e8e126b00afef216a3850 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Wed, 13 Oct 2021 17:50:22 +0800
Subject: [PATCH] feat(rules): support output functions in <<"Mod:Func">>
format
---
apps/emqx_rule_engine/include/rule_engine.hrl | 20 ++--
.../emqx_rule_engine/src/emqx_rule_engine.erl | 22 +---
.../src/emqx_rule_engine_api.erl | 10 +-
.../src/emqx_rule_engine_schema.erl | 96 +++++++++++++---
.../src/emqx_rule_outputs.erl | 108 +++++++++++++-----
.../src/emqx_rule_runtime.erl | 9 +-
.../src/emqx_rule_sqltester.erl | 2 +-
.../test/emqx_rule_engine_SUITE.erl | 4 +-
8 files changed, 181 insertions(+), 90 deletions(-)
diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl
index 88527a2b9..b7ec37d8e 100644
--- a/apps/emqx_rule_engine/include/rule_engine.hrl
+++ b/apps/emqx_rule_engine/include/rule_engine.hrl
@@ -26,19 +26,21 @@
-type mf() :: {Module::atom(), Fun::atom()}.
-type hook() :: atom() | 'any'.
-
-type topic() :: binary().
--type bridge_channel_id() :: binary().
+
-type selected_data() :: map().
-type envs() :: map().
--type output_target() :: bridge_channel_id() | atom() | output_fun().
--type output_fun_args() :: map().
--type output() :: #{
- function := output_target(),
- args => output_fun_args()
-}.
--type output_fun() :: fun((selected_data(), envs(), output_fun_args()) -> any()).
+-type builtin_output_func() :: republish | console.
+-type builtin_output_module() :: emqx_rule_outputs.
+-type bridge_channel_id() :: binary().
+-type output_fun_args() :: map().
+
+-type output() :: #{
+ mod := builtin_output_module() | module(),
+ func := builtin_output_func() | atom(),
+ args => output_fun_args()
+} | bridge_channel_id().
-type rule() ::
#{ id := rule_id()
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl
index 7ed8d19d6..8b7039401 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl
@@ -260,27 +260,13 @@ do_delete_rule(RuleId) ->
end.
parse_outputs(Outputs) ->
- [do_parse_outputs(Out) || Out <- Outputs].
+ [do_parse_output(Out) || Out <- Outputs].
-do_parse_outputs(#{function := Repub, args := Args})
- when Repub == republish; Repub == <<"republish">> ->
- #{function => republish, args => emqx_rule_outputs:pre_process_repub_args(Args)};
-do_parse_outputs(#{function := Func} = Output) ->
- #{function => parse_output_func(Func), args => maps:get(args, Output, #{})};
-do_parse_outputs(BridgeChannelId) when is_binary(BridgeChannelId) ->
+do_parse_output(Output) when is_map(Output) ->
+ emqx_rule_outputs:parse_output(Output);
+do_parse_output(BridgeChannelId) when is_binary(BridgeChannelId) ->
BridgeChannelId.
-parse_output_func(FuncName) when is_atom(FuncName) ->
- FuncName;
-parse_output_func(BinFunc) when is_binary(BinFunc) ->
- try binary_to_existing_atom(BinFunc) of
- Func -> emqx_rule_outputs:assert_builtin_output(Func)
- catch
- error:badarg -> error({unknown_builtin_function, BinFunc})
- end;
-parse_output_func(Func) when is_function(Func) ->
- Func.
-
get_all_records(Tab) ->
[Rule#{id => Id} || {Id, Rule} <- ets:tab2list(Tab)].
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
index 3c1eaba67..5e6b28b61 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
@@ -330,13 +330,9 @@ format_datetime(Timestamp, Unit) ->
format_output(Outputs) ->
[do_format_output(Out) || Out <- Outputs].
-do_format_output(#{function := Func}) when is_function(Func) ->
- FunInfo = erlang:fun_info(Func),
- FunMod = proplists:get_value(module, FunInfo),
- FunName = proplists:get_value(name, FunInfo),
- #{function => list_to_binary(lists:concat([FunMod,":",FunName]))};
-do_format_output(#{function := Name, args := Args}) ->
- #{function => Name, args => maps:remove(preprocessed_tmpl, Args)};
+do_format_output(#{mod := Mod, func := Func, args := Args}) ->
+ #{function => list_to_binary(lists:concat([Mod,":",Func])),
+ args => maps:remove(preprocessed_tmpl, Args)};
do_format_output(BridgeChannelId) when is_binary(BridgeChannelId) ->
BridgeChannelId.
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
index 41b3693bb..47a86e763 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
@@ -37,24 +37,64 @@ fields("rule_engine") ->
];
fields("rules") ->
- [ {"sql", sc(binary(), #{desc => "The SQL of the rule", nullable => false,
- validator => fun ?MODULE:validate_sql/1})}
+ [ {"sql", sc(binary(),
+ #{ desc => """
+SQL query to transform the messages.
+Example: SELECT * FROM \"test/topic\" WHERE payload.x = 1
+"""
+ , nullable => false
+ , validator => fun ?MODULE:validate_sql/1})}
, {"outputs", sc(hoconsc:array(hoconsc:union(
[ binary()
, ref("builtin_output_republish")
, ref("builtin_output_console")
])),
- #{desc => "The outputs of the rule. An output can be a string refers to the channel Id "
- "of a emqx bridge, or a object refers to a built-in function.",
- default => []})}
+ #{ desc => """
+A list of outputs of the rule.
+An output can be a string that refers to the channel Id of a emqx bridge, or a object
+that refers to a function.
+There a some built-in functions like \"republish\" and \"console\", and we also support user
+provided functions like \":\".
+The outputs in the list is executed one by one in order.
+This means that if one of the output is executing slowly, all of the outputs comes after it will not
+be executed until it returns.
+If one of the output crashed, all other outputs come after it will still be executed, in the
+original order.
+If there's any error when running an output, there will be an error message, and the 'failure'
+counter of the function output or the bridge channel will increase.
+"""
+ , default => []
+ })}
, {"enable", sc(boolean(), #{desc => "Enable or disable the rule", default => true})}
, {"description", sc(binary(), #{desc => "The description of the rule", default => <<>>})}
];
fields("builtin_output_republish") ->
[ {function, sc(republish, #{desc => "Republish the message as a new MQTT message"})}
- , {args, sc(ref("republish_args"), #{desc => "The arguments of the built-in 'republish' output",
- default => #{}})}
+ , {args, sc(ref("republish_args"),
+ #{ desc => """
+The arguments of the built-in 'republish' output.
+We can use variables in the args.
+
+The variables are selected by the rule. For exmaple, if the rule SQL is defined as following:
+
+ SELECT clientid, qos, payload FROM \"t/1\"
+
+Then there are 3 variables available: clientid
, qos
and
+payload
. And if we've set the args to:
+
+ {
+ topic = \"t/${clientid}\"
+ qos = \"${qos}\"
+ payload = \"msg: ${payload}\"
+ }
+
+When the rule is triggered by an MQTT message with payload = \"hello\", qos = 1,
+clientid = \"steve\", the rule will republish a new MQTT message to topic \"t/steve\",
+payload = \"msg: hello\", and qos = 1.
+"""
+ , default => #{}
+ })}
];
fields("builtin_output_console") ->
@@ -66,21 +106,39 @@ fields("builtin_output_console") ->
fields("republish_args") ->
[ {topic, sc(binary(),
- #{desc => "The target topic of the re-published message."
- " Template with with variables is allowed.",
- nullable => false})}
+ #{ desc =>"""
+The target topic of message to be re-published.
+Template with variables is allowed, see description of the 'republish_args'.
+"""
+ , nullable => false
+ })}
, {qos, sc(binary(),
- #{desc => "The qos of the re-published message."
- " Template with with variables is allowed. Defaults to ${qos}.",
- default => <<"${qos}">> })}
+ #{ desc => """
+The qos of the message to be re-published.
+Template with with variables is allowed, see description of the 'republish_args.
+Defaults to ${qos}. If variable ${qos} is not found from the selected result of the rule,
+0 is used.
+"""
+ , default => <<"${qos}">>
+ })}
, {retain, sc(binary(),
- #{desc => "The retain of the re-published message."
- " Template with with variables is allowed. Defaults to ${retain}.",
- default => <<"${retain}">> })}
+ #{ desc => """
+The retain flag of the message to be re-published.
+Template with with variables is allowed, see description of the 'republish_args.
+Defaults to ${retain}. If variable ${retain} is not found from the selected result
+of the rule, false is used.
+"""
+ , default => <<"${retain}">>
+ })}
, {payload, sc(binary(),
- #{desc => "The payload of the re-published message."
- " Template with with variables is allowed. Defaults to ${payload}.",
- default => <<"${payload}">>})}
+ #{ desc => """
+The payload of the message to be re-published.
+Template with with variables is allowed, see description of the 'republish_args.
.
+Defaults to ${payload}. If variable ${payload} is not found from the selected result
+of the rule, then the string \"undefined\" is used.
+"""
+ , default => <<"${payload}">>
+ })}
].
validate_sql(Sql) ->
diff --git a/apps/emqx_rule_engine/src/emqx_rule_outputs.erl b/apps/emqx_rule_engine/src/emqx_rule_outputs.erl
index eccf090e9..61a520e81 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_outputs.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_outputs.erl
@@ -16,28 +16,54 @@
%% Define the default actions.
-module(emqx_rule_outputs).
+
+-include("rule_engine.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx.hrl").
--define(OUTPUT_FUNCS,
- [ console
- , republish
+%% APIs
+-export([ parse_output/1
]).
+%% callbacks of emqx_rule_output
+-export([ pre_process_output_args/2
+ ]).
+
+%% output functions
-export([ console/3
, republish/3
]).
--export([ pre_process_repub_args/1
- , assert_builtin_output/1
- ]).
+-optional_callbacks([ pre_process_output_args/2
+ ]).
-assert_builtin_output(FuncName) ->
- case lists:member(FuncName, ?OUTPUT_FUNCS) of
- true -> FuncName;
- false -> error({unknown_builtin_function, FuncName})
- end.
+-callback pre_process_output_args(FuncName :: atom(), output_fun_args()) -> output_fun_args().
+%%--------------------------------------------------------------------
+%% APIs
+%%--------------------------------------------------------------------
+parse_output(#{function := OutputFunc} = Output) ->
+ {Mod, Func} = parse_output_func(OutputFunc),
+ #{mod => Mod, func => Func,
+ args => pre_process_args(Mod, Func, maps:get(args, Output, #{}))}.
+
+%%--------------------------------------------------------------------
+%% callbacks of emqx_rule_output
+%%--------------------------------------------------------------------
+pre_process_output_args(republish, #{topic := Topic, qos := QoS, retain := Retain,
+ payload := Payload} = Args) ->
+ Args#{preprocessed_tmpl => #{
+ topic => emqx_plugin_libs_rule:preproc_tmpl(Topic),
+ qos => preproc_vars(QoS),
+ retain => preproc_vars(Retain),
+ payload => emqx_plugin_libs_rule:preproc_tmpl(Payload)
+ }};
+pre_process_output_args(_, Args) ->
+ Args.
+
+%%--------------------------------------------------------------------
+%% output functions
+%%--------------------------------------------------------------------
-spec console(map(), map(), map()) -> any().
console(Selected, #{metadata := #{rule_id := RuleId}} = Envs, _Args) ->
?ULOG("[rule output] ~ts~n"
@@ -57,8 +83,8 @@ republish(Selected, #{flags := Flags, metadata := #{rule_id := RuleId}},
payload := PayloadTks}}) ->
Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected),
Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected),
- QoS = replace_simple_var(QoSTks, Selected),
- Retain = replace_simple_var(RetainTks, Selected),
+ QoS = replace_simple_var(QoSTks, Selected, 0),
+ Retain = replace_simple_var(RetainTks, Selected, false),
?SLOG(debug, #{msg => "republish", topic => Topic, payload => Payload}),
safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload);
@@ -71,11 +97,45 @@ republish(Selected, #{metadata := #{rule_id := RuleId}},
payload := PayloadTks}}) ->
Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected),
Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected),
- QoS = replace_simple_var(QoSTks, Selected),
- Retain = replace_simple_var(RetainTks, Selected),
+ QoS = replace_simple_var(QoSTks, Selected, 0),
+ Retain = replace_simple_var(RetainTks, Selected, false),
?SLOG(debug, #{msg => "republish", topic => Topic, payload => Payload}),
safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload).
+%%--------------------------------------------------------------------
+%% internal functions
+%%--------------------------------------------------------------------
+parse_output_func(OutputFunc) ->
+ {Mod, Func} = get_output_mod_func(OutputFunc),
+ assert_function_supported(Mod, Func),
+ {Mod, Func}.
+
+get_output_mod_func(OutputFunc) when is_atom(OutputFunc) ->
+ {emqx_rule_outputs, OutputFunc};
+get_output_mod_func(OutputFunc) when is_binary(OutputFunc) ->
+ ToAtom = fun(Bin) ->
+ try binary_to_existing_atom(Bin) of Atom -> Atom
+ catch error:badarg -> error({unknown_output_function, OutputFunc})
+ end
+ end,
+ case string:split(OutputFunc, ":", all) of
+ [Func1] -> {emqx_rule_outputs, ToAtom(Func1)};
+ [Mod1, Func1] -> {ToAtom(Mod1), ToAtom(Func1)};
+ _ -> error({invalid_output_function, OutputFunc})
+ end.
+
+assert_function_supported(Mod, Func) ->
+ case erlang:function_exported(Mod, Func, 3) of
+ true -> ok;
+ false -> error({output_function_not_supported, Func})
+ end.
+
+pre_process_args(Mod, Func, Args) ->
+ case erlang:function_exported(Mod, pre_process_output_args, 2) of
+ true -> Mod:pre_process_output_args(Func, Args);
+ false -> Args
+ end.
+
safe_publish(RuleId, Topic, QoS, Flags, Payload) ->
Msg = #message{
id = emqx_guid:gen(),
@@ -90,22 +150,16 @@ safe_publish(RuleId, Topic, QoS, Flags, Payload) ->
_ = emqx_broker:safe_publish(Msg),
emqx_metrics:inc_msg(Msg).
-pre_process_repub_args(#{topic := Topic, qos := QoS, retain := Retain,
- payload := Payload} = Args) ->
- Args#{preprocessed_tmpl => #{
- topic => emqx_plugin_libs_rule:preproc_tmpl(Topic),
- qos => preproc_vars(QoS),
- retain => preproc_vars(Retain),
- payload => emqx_plugin_libs_rule:preproc_tmpl(Payload)
- }}.
-
preproc_vars(Data) when is_binary(Data) ->
emqx_plugin_libs_rule:preproc_tmpl(Data);
preproc_vars(Data) ->
Data.
-replace_simple_var(Tokens, Data) when is_list(Tokens) ->
+replace_simple_var(Tokens, Data, Default) when is_list(Tokens) ->
[Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}),
- Var;
-replace_simple_var(Val, _Data) ->
+ case Var of
+ undefined -> Default; %% cannot find the variable from Data
+ _ -> Var
+ end;
+replace_simple_var(Val, _Data, _Default) ->
Val.
diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl
index 5b460456e..dc162665c 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl
@@ -250,13 +250,8 @@ handle_output(OutId, Selected, Envs) ->
do_handle_output(ChannelId, Selected, _Envs) when is_binary(ChannelId) ->
?SLOG(debug, #{msg => "output to bridge", channel_id => ChannelId}),
emqx_bridge:send_message(ChannelId, Selected);
-do_handle_output(#{function := Func} = Out, Selected, Envs) when is_function(Func) ->
- erlang:apply(Func, [Selected, Envs, maps:get(args, Out, #{})]);
-do_handle_output(#{function := Func} = Out, Selected, Envs) when is_atom(Func) ->
- case erlang:function_exported(emqx_rule_outputs, Func, 3) of
- true -> erlang:apply(emqx_rule_outputs, Func, [Selected, Envs, maps:get(args, Out, #{})]);
- false -> error(not_found)
- end.
+do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) ->
+ Mod:Func(Selected, Envs, Args).
eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->
nested_get({path, Path}, may_decode_payload(Payload));
diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl
index 941c82cda..a67e62355 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl
@@ -46,7 +46,7 @@ test_rule(Sql, Select, Context, EventTopics) ->
id => RuleId,
sql => Sql,
from => EventTopics,
- outputs => [#{function => fun ?MODULE:get_selected_data/3, args => #{}}],
+ outputs => [#{mod => ?MODULE, func => get_selected_data, args => #{}}],
enabled => true,
is_foreach => emqx_rule_sqlparser:select_is_foreach(Select),
fields => emqx_rule_sqlparser:select_fields(Select),
diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl
index 812406b2c..659777d56 100644
--- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl
+++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl
@@ -150,7 +150,7 @@ init_per_testcase(t_events, Config) ->
sql => SQL,
outputs => [
#{function => console},
- #{function => fun ?MODULE:output_record_triggered_events/3,
+ #{function => <<"emqx_rule_engine_SUITE:output_record_triggered_events">>,
args => #{}}
],
description => <<"to console and record triggered events">>}),
@@ -1318,7 +1318,7 @@ make_simple_rule(RuleId, SQL, Topics, Ts) when is_binary(RuleId) ->
fields => [<<"*">>],
is_foreach => false,
conditions => {},
- ouputs => [#{function => console}],
+ outputs => [#{mod => emqx_rule_outputs, func => console, args => #{}}],
description => <<"simple rule">>,
created_at => Ts
}.