From b063b6f2530cefcdafd9ab400198c8cb8dad11c9 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 11 Oct 2021 14:36:58 +0800 Subject: [PATCH] feat(rules): support configure rules in config file --- .../etc/emqx_rule_engine.conf | 15 ++ apps/emqx_rule_engine/include/rule_engine.hrl | 25 ++- .../src/emqx_rule_api_schema.erl | 43 +----- .../emqx_rule_engine/src/emqx_rule_engine.erl | 91 +++++------ .../src/emqx_rule_engine_api.erl | 28 ++-- .../src/emqx_rule_engine_app.erl | 3 +- .../src/emqx_rule_engine_schema.erl | 53 ++++++- .../emqx_rule_engine/src/emqx_rule_events.erl | 2 +- .../src/emqx_rule_outputs.erl | 32 ++++ .../src/emqx_rule_registry.erl | 145 +++++++----------- .../src/emqx_rule_runtime.erl | 44 ++---- .../src/emqx_rule_sqltester.erl | 26 ++-- .../test/emqx_rule_engine_SUITE.erl | 98 +++--------- .../test/emqx_rule_engine_api_SUITE.erl | 64 ++++++++ 14 files changed, 343 insertions(+), 326 deletions(-) create mode 100644 apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl diff --git a/apps/emqx_rule_engine/etc/emqx_rule_engine.conf b/apps/emqx_rule_engine/etc/emqx_rule_engine.conf index a4344cda8..aff89b77e 100644 --- a/apps/emqx_rule_engine/etc/emqx_rule_engine.conf +++ b/apps/emqx_rule_engine/etc/emqx_rule_engine.conf @@ -3,4 +3,19 @@ ##==================================================================== rule_engine { ignore_sys_message = true + #rules.my_republish_rule { + # description = "A simple rule that republishs MQTT messages from topic 't/1' to 't/2'" + # enable = true + # sql = "SELECT * FROM \"t/1\"" + # outputs = [ + # { + # function = republish + # args = { + # topic = "t/2" + # qos = "${qos}" + # payload = "${payload}" + # } + # } + # ] + #} } diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index c230aa8c3..7cb5d28d4 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -31,35 +31,30 @@ -type bridge_channel_id() :: binary(). -type selected_data() :: map(). -type envs() :: map(). --type output_type() :: bridge | builtin | func. -type output_target() :: bridge_channel_id() | atom() | output_fun(). -type output_fun_args() :: map(). -type output() :: #{ - type := output_type(), - target := output_target(), + function := output_target(), args => output_fun_args() }. + -type output_fun() :: fun((selected_data(), envs(), output_fun_args()) -> any()). --type rule_info() :: - #{ from := list(topic()) - , outputs := [output()] +-type rule() :: + #{ id := rule_id() , sql := binary() + , outputs := [output()] + , enabled := boolean() + , description => binary() + , created_at := integer() %% epoch in millisecond precision + , from := list(topic()) , is_foreach := boolean() , fields := list() , doeach := term() , incase := term() , conditions := tuple() - , enabled := boolean() - , description => binary() }. --record(rule, - { id :: rule_id() - , created_at :: integer() %% epoch in millisecond precision - , info :: rule_info() - }). - %% Arithmetic operators -define(is_arith(Op), (Op =:= '+' orelse Op =:= '-' orelse @@ -94,5 +89,3 @@ %% Tables -define(RULE_TAB, emqx_rule). - --define(RULE_ENGINE_SHARD, emqx_rule_engine_shard). diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index c96e82ecb..448f63138 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -37,16 +37,7 @@ roots() -> fields("rule_creation") -> [ {"id", sc(binary(), #{desc => "The Id of the rule", nullable => false})} - , {"sql", sc(binary(), #{desc => "The SQL of the rule", nullable => false})} - , {"outputs", sc(hoconsc:array(hoconsc:union( - [ ref("bridge_output") - , ref("builtin_output") - ])), - #{desc => "The outputs of the rule", - default => []})} - , {"enable", sc(boolean(), #{desc => "Enable or disable the rule", default => true})} - , {"description", sc(binary(), #{desc => "The description of the rule", default => <<>>})} - ]; + ] ++ emqx_rule_engine_schema:fields("rules"); fields("rule_test") -> [ {"context", sc(hoconsc:union([ ref("ctx_pub") @@ -62,38 +53,6 @@ fields("rule_test") -> , {"sql", sc(binary(), #{desc => "The SQL of the rule for testing", nullable => false})} ]; -fields("bridge_output") -> - [ {type, bridge} - , {target, sc(binary(), #{desc => "The Channel ID of the bridge"})} - ]; - -fields("builtin_output") -> - [ {type, builtin} - , {target, sc(binary(), #{desc => "The Name of the built-on output"})} - , {args, sc(map(), #{desc => "The arguments of the built-in output", - default => #{}})} - ]; - -%% TODO: how to use this in "builtin_output".args ? -fields("republish_args") -> - [ {topic, sc(binary(), - #{desc => "The target topic of the re-published message." - " Template with with variables is allowed.", - nullable => false})} - , {qos, sc(binary(), - #{desc => "The qos of the re-published message." - " Template with with variables is allowed. Defaults to ${qos}.", - default => <<"${qos}">> })} - , {retain, sc(binary(), - #{desc => "The retain of the re-published message." - " Template with with variables is allowed. Defaults to ${retain}.", - default => <<"${retain}">> })} - , {payload, sc(binary(), - #{desc => "The payload of the re-published message." - " Template with with variables is allowed. Defaults to ${payload}.", - default => <<"${payload}">>})} - ]; - fields("ctx_pub") -> [ {"event_type", sc(message_publish, #{desc => "Event Type", nullable => false})} , {"id", sc(binary(), #{desc => "Message ID"})} diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 04d35931a..244952a14 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -19,21 +19,24 @@ -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). +-export([ load_rules/0 + ]). + -export([ create_rule/1 , update_rule/1 , delete_rule/1 ]). --export_type([rule/0]). - --type rule() :: #rule{}. - --define(T_RETRY, 60000). - %%------------------------------------------------------------------------------ %% APIs for rules and resources %%------------------------------------------------------------------------------ +-spec load_rules() -> ok. +load_rules() -> + lists:foreach(fun({Id, Rule}) -> + {ok, _} = create_rule(Rule#{id => Id}) + end, maps:to_list(emqx:get_config([rule_engine, rules], #{}))). + -spec create_rule(map()) -> {ok, rule()} | {error, term()}. create_rule(Params = #{id := RuleId}) -> case emqx_rule_registry:get_rule(RuleId) of @@ -52,9 +55,7 @@ update_rule(Params = #{id := RuleId}) -> delete_rule(RuleId) -> case emqx_rule_registry:get_rule(RuleId) of {ok, Rule} -> - ok = emqx_rule_registry:remove_rule(Rule), - _ = emqx_plugin_libs_rule:cluster_call(emqx_rule_metrics, clear_rule_metrics, [RuleId]), - ok; + emqx_rule_registry:remove_rule(Rule); not_found -> {error, not_found} end. @@ -66,26 +67,23 @@ delete_rule(RuleId) -> do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) -> case emqx_rule_sqlparser:parse(Sql) of {ok, Select} -> - Rule = #rule{ - id = RuleId, - created_at = erlang:system_time(millisecond), - info = #{ - enabled => maps:get(enabled, Params, true), - sql => Sql, - from => emqx_rule_sqlparser:select_from(Select), - outputs => parse_outputs(Outputs), - description => maps:get(description, Params, ""), - %% -- calculated fields: - is_foreach => emqx_rule_sqlparser:select_is_foreach(Select), - fields => emqx_rule_sqlparser:select_fields(Select), - doeach => emqx_rule_sqlparser:select_doeach(Select), - incase => emqx_rule_sqlparser:select_incase(Select), - conditions => emqx_rule_sqlparser:select_where(Select) - %% -- calculated fields end - } + Rule = #{ + id => RuleId, + created_at => erlang:system_time(millisecond), + enabled => maps:get(enabled, Params, true), + sql => Sql, + outputs => parse_outputs(Outputs), + description => maps:get(description, Params, ""), + %% -- calculated fields: + from => emqx_rule_sqlparser:select_from(Select), + is_foreach => emqx_rule_sqlparser:select_is_foreach(Select), + fields => emqx_rule_sqlparser:select_fields(Select), + doeach => emqx_rule_sqlparser:select_doeach(Select), + incase => emqx_rule_sqlparser:select_incase(Select), + conditions => emqx_rule_sqlparser:select_where(Select) + %% -- calculated fields end }, ok = emqx_rule_registry:add_rule(Rule), - _ = emqx_plugin_libs_rule:cluster_call(emqx_rule_metrics, create_rule_metrics, [RuleId]), {ok, Rule}; {error, Reason} -> {error, Reason} end. @@ -93,28 +91,21 @@ do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) -> parse_outputs(Outputs) -> [do_parse_outputs(Out) || Out <- Outputs]. -do_parse_outputs(#{type := bridge, target := ChId}) -> - #{type => bridge, target => ChId}; -do_parse_outputs(#{type := builtin, target := Repub, args := Args}) +do_parse_outputs(#{function := Repub, args := Args}) when Repub == republish; Repub == <<"republish">> -> - #{type => builtin, target => republish, args => pre_process_repub_args(Args)}; -do_parse_outputs(#{type := Type, target := Name} = Output) - when Type == func; Type == builtin -> - #{type => Type, target => Name, args => maps:get(args, Output, #{})}. + #{function => republish, args => emqx_rule_outputs:pre_process_repub_args(Args)}; +do_parse_outputs(#{function := Func} = Output) -> + #{function => parse_output_func(Func), args => maps:get(args, Output, #{})}; +do_parse_outputs(BridgeChannelId) when is_binary(BridgeChannelId) -> + BridgeChannelId. -pre_process_repub_args(#{<<"topic">> := Topic} = Args) -> - QoS = maps:get(<<"qos">>, Args, <<"${qos}">>), - Retain = maps:get(<<"retain">>, Args, <<"${retain}">>), - Payload = maps:get(<<"payload">>, Args, <<"${payload}">>), - #{topic => Topic, qos => QoS, payload => Payload, retain => Retain, - preprocessed_tmpl => #{ - topic => emqx_plugin_libs_rule:preproc_tmpl(Topic), - qos => preproc_vars(QoS), - retain => preproc_vars(Retain), - payload => emqx_plugin_libs_rule:preproc_tmpl(Payload) - }}. - -preproc_vars(Data) when is_binary(Data) -> - emqx_plugin_libs_rule:preproc_tmpl(Data); -preproc_vars(Data) -> - Data. +parse_output_func(FuncName) when is_atom(FuncName) -> + FuncName; +parse_output_func(BinFunc) when is_binary(BinFunc) -> + try binary_to_existing_atom(BinFunc) of + Func -> emqx_rule_outputs:assert_builtin_output(Func) + catch + error:badarg -> error({unknown_builtin_function, BinFunc}) + end; +parse_output_func(Func) when is_function(Func) -> + Func. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index c1635b76f..63c00ed62 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -295,13 +295,12 @@ err_msg(Msg) -> format_rule_resp(Rules) when is_list(Rules) -> [format_rule_resp(R) || R <- Rules]; -format_rule_resp(#rule{id = Id, created_at = CreatedAt, - info = #{ - from := Topics, - outputs := Output, - sql := SQL, - enabled := Enabled, - description := Descr}}) -> +format_rule_resp(#{ id := Id, created_at := CreatedAt, + from := Topics, + outputs := Output, + sql := SQL, + enabled := Enabled, + description := Descr}) -> #{id => Id, from => Topics, outputs => format_output(Output), @@ -318,12 +317,15 @@ format_datetime(Timestamp, Unit) -> format_output(Outputs) -> [do_format_output(Out) || Out <- Outputs]. -do_format_output(#{type := func}) -> - #{type => func, target => <<"internal_function">>}; -do_format_output(#{type := builtin, target := Name, args := Args}) -> - #{type => builtin, target => Name, args => maps:remove(preprocessed_tmpl, Args)}; -do_format_output(#{type := bridge, target := Name}) -> - #{type => bridge, target => Name}. +do_format_output(#{function := Func}) when is_function(Func) -> + FunInfo = erlang:fun_info(Func), + FunMod = proplists:get_value(module, FunInfo), + FunName = proplists:get_value(name, FunInfo), + #{function => list_to_binary(lists:concat([FunMod,":",FunName]))}; +do_format_output(#{function := Name, args := Args}) -> + #{function => Name, args => maps:remove(preprocessed_tmpl, Args)}; +do_format_output(BridgeChannelId) when is_binary(BridgeChannelId) -> + BridgeChannelId. get_rule_metrics(Id) -> [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id])) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl index e3e959222..4f41ba334 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl @@ -25,8 +25,9 @@ -export([stop/1]). start(_Type, _Args) -> - ok = ekka_rlog:wait_for_shards([?RULE_ENGINE_SHARD], infinity), + ets:new(?RULE_TAB, [named_table, public, set, {read_concurrency, true}]), ok = emqx_rule_events:reload(), + ok = emqx_rule_engine:load_rules(), emqx_rule_engine_sup:start_link(). stop(_State) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl index 2614fb8b1..197412a9e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl @@ -29,4 +29,55 @@ namespace() -> rule_engine. roots() -> ["rule_engine"]. fields("rule_engine") -> - [{ignore_sys_message, hoconsc:mk(boolean(), #{default => true})}]. + [ {ignore_sys_message, sc(boolean(), #{default => true})} + , {rules, sc(hoconsc:map("id", ref("rules")), #{desc => "The rules", default => #{}})} + ]; + +fields("rules") -> + [ {"sql", sc(binary(), #{desc => "The SQL of the rule", nullable => false})} + , {"outputs", sc(hoconsc:array(hoconsc:union( + [ binary() + , ref("builtin_output_republish") + , ref("builtin_output_console") + ])), + #{desc => "The outputs of the rule. An output can be a string refers to the channel Id " + "of a emqx bridge, or a object refers to a built-in function.", + default => []})} + , {"enable", sc(boolean(), #{desc => "Enable or disable the rule", default => true})} + , {"description", sc(binary(), #{desc => "The description of the rule", default => <<>>})} + ]; + +fields("builtin_output_republish") -> + [ {function, sc(republish, #{desc => "Republish the message as a new MQTT message"})} + , {args, sc(ref("republish_args"), #{desc => "The arguments of the built-in 'republish' output", + default => #{}})} + ]; + +fields("builtin_output_console") -> + [ {function, sc(console, #{desc => "Print the outputs to the console"})} + %% we may support some args for the console output in the future + %, {args, sc(map(), #{desc => "The arguments of the built-in 'console' output", + % default => #{}})} + ]; + +fields("republish_args") -> + [ {topic, sc(binary(), + #{desc => "The target topic of the re-published message." + " Template with with variables is allowed.", + nullable => false})} + , {qos, sc(binary(), + #{desc => "The qos of the re-published message." + " Template with with variables is allowed. Defaults to ${qos}.", + default => <<"${qos}">> })} + , {retain, sc(binary(), + #{desc => "The retain of the re-published message." + " Template with with variables is allowed. Defaults to ${retain}.", + default => <<"${retain}">> })} + , {payload, sc(binary(), + #{desc => "The payload of the re-published message." + " Template with with variables is allowed. Defaults to ${payload}.", + default => <<"${payload}">>})} + ]. + +sc(Type, Meta) -> hoconsc:mk(Type, Meta). +ref(Field) -> hoconsc:ref(?MODULE, Field). diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index d030917ef..a2f5439b2 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -64,7 +64,7 @@ -endif. reload() -> - emqx_rule_registry:load_hooks_for_rule(emqx_rule_registry:get_rules()). + emqx_rule_registry:load_hooks_for_rules(emqx_rule_registry:get_rules()). load(<<"$bridges/", _ChannelId/binary>> = BridgeTopic) -> emqx_hooks:put(BridgeTopic, {?MODULE, on_bridge_message_received, diff --git a/apps/emqx_rule_engine/src/emqx_rule_outputs.erl b/apps/emqx_rule_engine/src/emqx_rule_outputs.erl index 1571561b7..021f72ca6 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_outputs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_outputs.erl @@ -19,10 +19,25 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx.hrl"). +-define(OUTPUT_FUNCS, + [ console + , republish + ]). + -export([ console/3 , republish/3 ]). +-export([ pre_process_repub_args/1 + , assert_builtin_output/1 + ]). + +assert_builtin_output(FuncName) -> + case lists:member(FuncName, ?OUTPUT_FUNCS) of + true -> FuncName; + false -> error({unknown_builtin_function, FuncName}) + end. + -spec console(map(), map(), map()) -> any(). console(Selected, #{metadata := #{rule_id := RuleId}} = Envs, _Args) -> ?ULOG("[rule output] ~ts~n" @@ -75,6 +90,23 @@ safe_publish(RuleId, Topic, QoS, Flags, Payload) -> _ = emqx_broker:safe_publish(Msg), emqx_metrics:inc_msg(Msg). +pre_process_repub_args(#{<<"topic">> := Topic} = Args) -> + QoS = maps:get(<<"qos">>, Args, <<"${qos}">>), + Retain = maps:get(<<"retain">>, Args, <<"${retain}">>), + Payload = maps:get(<<"payload">>, Args, <<"${payload}">>), + #{topic => Topic, qos => QoS, payload => Payload, retain => Retain, + preprocessed_tmpl => #{ + topic => emqx_plugin_libs_rule:preproc_tmpl(Topic), + qos => preproc_vars(QoS), + retain => preproc_vars(Retain), + payload => emqx_plugin_libs_rule:preproc_tmpl(Payload) + }}. + +preproc_vars(Data) when is_binary(Data) -> + emqx_plugin_libs_rule:preproc_tmpl(Data); +preproc_vars(Data) -> + Data. + replace_simple_var(Tokens, Data) when is_list(Tokens) -> [Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}), Var; diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index aa2c97c76..ae7302673 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -36,12 +36,15 @@ , remove_rules/1 ]). --export([ load_hooks_for_rule/1 - , unload_hooks_for_rule/1 +-export([ do_remove_rules/1 + , do_add_rules/1 ]). -%% for debug purposes --export([dump/0]). +-export([ load_hooks_for_rules/1 + , unload_hooks_for_rule/1 + , add_metrics_for_rules/1 + , clear_metrics_for_rules/1 + ]). %% gen_server Callbacks -export([ init/1 @@ -52,39 +55,10 @@ , code_change/3 ]). -%% Mnesia bootstrap --export([mnesia/1]). - --boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). - -define(REGISTRY, ?MODULE). -define(T_CALL, 10000). -%%------------------------------------------------------------------------------ -%% Mnesia bootstrap -%%------------------------------------------------------------------------------ - -%% @doc Create or replicate tables. --spec(mnesia(boot | copy) -> ok). -mnesia(boot) -> - %% Optimize storage - StoreProps = [{ets, [{read_concurrency, true}]}], - %% Rule table - ok = ekka_mnesia:create_table(?RULE_TAB, [ - {rlog_shard, ?RULE_ENGINE_SHARD}, - {disc_copies, [node()]}, - {record_name, rule}, - {attributes, record_info(fields, rule)}, - {storage_properties, StoreProps}]); - -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?RULE_TAB, disc_copies). - -dump() -> - ?ULOG("Rules: ~p~n", [ets:tab2list(?RULE_TAB)]). - %%------------------------------------------------------------------------------ %% Start the registry %%------------------------------------------------------------------------------ @@ -97,90 +71,102 @@ start_link() -> %% Rule Management %%------------------------------------------------------------------------------ --spec(get_rules() -> list(emqx_rule_engine:rule())). +-spec(get_rules() -> [rule()]). get_rules() -> get_all_records(?RULE_TAB). get_rules_ordered_by_ts() -> - F = fun() -> - Query = qlc:q([E || E <- mnesia:table(?RULE_TAB)]), - qlc:e(qlc:keysort(#rule.created_at, Query, [{order, ascending}])) - end, - {atomic, List} = ekka_mnesia:transaction(?RULE_ENGINE_SHARD, F), - List. + lists:sort(fun(#{created_at := CreatedA}, #{created_at := CreatedB}) -> + CreatedA =< CreatedB + end, get_rules()). --spec(get_rules_for_topic(Topic :: binary()) -> list(emqx_rule_engine:rule())). +-spec(get_rules_for_topic(Topic :: binary()) -> [rule()]). get_rules_for_topic(Topic) -> - [Rule || Rule = #rule{info = #{from := From}} <- get_rules(), + [Rule || Rule = #{from := From} <- get_rules(), emqx_plugin_libs_rule:can_topic_match_oneof(Topic, From)]. --spec(get_rules_with_same_event(Topic :: binary()) -> list(emqx_rule_engine:rule())). +-spec(get_rules_with_same_event(Topic :: binary()) -> [rule()]). get_rules_with_same_event(Topic) -> EventName = emqx_rule_events:event_name(Topic), - [Rule || Rule = #rule{info = #{from := From}} <- get_rules(), + [Rule || Rule = #{from := From} <- get_rules(), lists:any(fun(T) -> is_of_event_name(EventName, T) end, From)]. is_of_event_name(EventName, Topic) -> EventName =:= emqx_rule_events:event_name(Topic). --spec(get_rule(Id :: rule_id()) -> {ok, emqx_rule_engine:rule()} | not_found). +-spec(get_rule(Id :: rule_id()) -> {ok, rule()} | not_found). get_rule(Id) -> - case mnesia:dirty_read(?RULE_TAB, Id) of - [Rule] -> {ok, Rule}; + case ets:lookup(?RULE_TAB, Id) of + [{Id, Rule}] -> {ok, Rule#{id => Id}}; [] -> not_found end. --spec(add_rule(emqx_rule_engine:rule()) -> ok). -add_rule(Rule) when is_record(Rule, rule) -> +-spec(add_rule(rule()) -> ok). +add_rule(Rule) -> add_rules([Rule]). --spec(add_rules(list(emqx_rule_engine:rule())) -> ok). +-spec(add_rules([rule()]) -> ok). add_rules(Rules) -> gen_server:call(?REGISTRY, {add_rules, Rules}, ?T_CALL). --spec(remove_rule(emqx_rule_engine:rule() | rule_id()) -> ok). +-spec(remove_rule(rule() | rule_id()) -> ok). remove_rule(RuleOrId) -> remove_rules([RuleOrId]). --spec(remove_rules(list(emqx_rule_engine:rule()) | list(rule_id())) -> ok). +-spec(remove_rules([rule()] | list(rule_id())) -> ok). remove_rules(Rules) -> gen_server:call(?REGISTRY, {remove_rules, Rules}, ?T_CALL). %% @private -insert_rules([]) -> ok; -insert_rules(Rules) -> - _ = emqx_plugin_libs_rule:cluster_call(?MODULE, load_hooks_for_rule, [Rules]), - [mnesia:write(?RULE_TAB, Rule, write) ||Rule <- Rules]. +do_add_rules([]) -> ok; +do_add_rules(Rules) -> + load_hooks_for_rules(Rules), + add_metrics_for_rules(Rules), + ets:insert(?RULE_TAB, [{Id, maps:remove(id, R)} || #{id := Id} = R <- Rules]), + ok. %% @private -delete_rules([]) -> ok; -delete_rules(Rules = [R|_]) when is_binary(R) -> +do_remove_rules([]) -> ok; +do_remove_rules(RuleIds = [Id|_]) when is_binary(Id) -> RuleRecs = lists:foldl(fun(RuleId, Acc) -> case get_rule(RuleId) of {ok, Rule} -> [Rule|Acc]; not_found -> Acc end - end, [], Rules), - delete_rules_unload_hooks(RuleRecs); -delete_rules(Rules = [Rule|_]) when is_record(Rule, rule) -> - delete_rules_unload_hooks(Rules). + end, [], RuleIds), + remove_rules_unload_hooks(RuleRecs); +do_remove_rules(Rules = [Rule|_]) when is_map(Rule) -> + remove_rules_unload_hooks(Rules). -delete_rules_unload_hooks(Rules) -> - _ = emqx_plugin_libs_rule:cluster_call(?MODULE, unload_hooks_for_rule, [Rules]), - [mnesia:delete_object(?RULE_TAB, Rule, write) ||Rule <- Rules]. +remove_rules_unload_hooks(Rules) -> + unload_hooks_for_rule(Rules), + clear_metrics_for_rules(Rules), + lists:foreach(fun(#{id := Id}) -> + ets:delete(?RULE_TAB, Id) + end, Rules). -load_hooks_for_rule(Rules) -> - lists:foreach(fun(#rule{info = #{from := Topics}}) -> +load_hooks_for_rules(Rules) -> + lists:foreach(fun(#{from := Topics}) -> lists:foreach(fun emqx_rule_events:load/1, Topics) end, Rules). +add_metrics_for_rules(Rules) -> + lists:foreach(fun(#{id := Id}) -> + ok = emqx_rule_metrics:create_rule_metrics(Id) + end, Rules). + +clear_metrics_for_rules(Rules) -> + lists:foreach(fun(#{id := Id}) -> + ok = emqx_rule_metrics:clear_rule_metrics(Id) + end, Rules). + unload_hooks_for_rule(Rules) -> - lists:foreach(fun(#rule{id = Id, info = #{from := Topics}}) -> + lists:foreach(fun(#{id := Id, from := Topics}) -> lists:foreach(fun(Topic) -> case get_rules_with_same_event(Topic) of - [#rule{id = Id0}] when Id0 == Id -> %% we are now deleting the last rule + [#{id := Id0}] when Id0 == Id -> %% we are now deleting the last rule emqx_rule_events:unload(Topic); _ -> ok end @@ -197,11 +183,11 @@ init([]) -> {ok, #{}}. handle_call({add_rules, Rules}, _From, State) -> - trans(fun insert_rules/1, [Rules]), + _ = emqx_plugin_libs_rule:cluster_call(?MODULE, do_add_rules, [Rules]), {reply, ok, State}; handle_call({remove_rules, Rules}, _From, State) -> - trans(fun delete_rules/1, [Rules]), + _ = emqx_plugin_libs_rule:cluster_call(?MODULE, do_remove_rules, [Rules]), {reply, ok, State}; handle_call(Req, _From, State) -> @@ -227,19 +213,4 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ get_all_records(Tab) -> - %mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)). - %% Wrapping ets to a transaction to avoid reading inconsistent - %% ( nest cluster_call transaction, no a r/o transaction) - %% data during shard bootstrap - {atomic, Ret} = - ekka_mnesia:transaction(?RULE_ENGINE_SHARD, - fun() -> - ets:tab2list(Tab) - end), - Ret. - -trans(Fun, Args) -> - case ekka_mnesia:transaction(?RULE_ENGINE_SHARD, Fun, Args) of - {atomic, Result} -> Result; - {aborted, Reason} -> error(Reason) - end. + [Rule#{id => Id} || {Id, Rule} <- ets:tab2list(Tab)]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 1e2ab2536..5b460456e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -45,12 +45,12 @@ %%------------------------------------------------------------------------------ %% Apply rules %%------------------------------------------------------------------------------ --spec(apply_rules(list(emqx_rule_engine:rule()), input()) -> ok). +-spec(apply_rules(list(rule()), input()) -> ok). apply_rules([], _Input) -> ok; -apply_rules([#rule{info = #{enabled := false}}|More], Input) -> +apply_rules([#{enabled := false}|More], Input) -> apply_rules(More, Input); -apply_rules([Rule = #rule{id = RuleID}|More], Input) -> +apply_rules([Rule = #{id := RuleID}|More], Input) -> try apply_rule_discard_result(Rule, Input) catch %% ignore the errors if select or match failed @@ -80,18 +80,19 @@ apply_rule_discard_result(Rule, Input) -> _ = apply_rule(Rule, Input), ok. -apply_rule(Rule = #rule{id = RuleID}, Input) -> +apply_rule(Rule = #{id := RuleID}, Input) -> clear_rule_payload(), do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})). -do_apply_rule(#rule{id = RuleId, info = #{ +do_apply_rule(#{ + id := RuleId, is_foreach := true, fields := Fields, doeach := DoEach, incase := InCase, conditions := Conditions, outputs := Outputs - }}, Input) -> + }, Input) -> {Selected, Collection} = ?RAISE(select_and_collect(Fields, Input), {select_and_collect_error, {_EXCLASS_,_EXCPTION_,_ST_}}), ColumnsAndSelected = maps:merge(Input, Selected), @@ -105,12 +106,12 @@ do_apply_rule(#rule{id = RuleId, info = #{ {error, nomatch} end; -do_apply_rule(#rule{id = RuleId, info = #{ - is_foreach := false, - fields := Fields, - conditions := Conditions, - outputs := Outputs - }}, Input) -> +do_apply_rule(#{id := RuleId, + is_foreach := false, + fields := Fields, + conditions := Conditions, + outputs := Outputs + }, Input) -> Selected = ?RAISE(select_and_transform(Fields, Input), {select_and_transform_error, {_EXCLASS_,_EXCPTION_,_ST_}}), case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)), @@ -246,25 +247,14 @@ handle_output(OutId, Selected, Envs) -> }) end. -do_handle_output(#{type := bridge, target := ChannelId}, Selected, _Envs) -> +do_handle_output(ChannelId, Selected, _Envs) when is_binary(ChannelId) -> ?SLOG(debug, #{msg => "output to bridge", channel_id => ChannelId}), emqx_bridge:send_message(ChannelId, Selected); -do_handle_output(#{type := func, target := Func} = Out, Selected, Envs) -> +do_handle_output(#{function := Func} = Out, Selected, Envs) when is_function(Func) -> erlang:apply(Func, [Selected, Envs, maps:get(args, Out, #{})]); -do_handle_output(#{type := builtin, target := Output} = Out, Selected, Envs) - when is_atom(Output) -> - handle_builtin_output(Output, Selected, Envs, maps:get(args, Out, #{})); -do_handle_output(#{type := builtin, target := Output} = Out, Selected, Envs) - when is_binary(Output) -> - try binary_to_existing_atom(Output) of - Func -> handle_builtin_output(Func, Selected, Envs, maps:get(args, Out, #{})) - catch - error:badarg -> error(not_found) - end. - -handle_builtin_output(Func, Selected, Envs, Args) -> +do_handle_output(#{function := Func} = Out, Selected, Envs) when is_atom(Func) -> case erlang:function_exported(emqx_rule_outputs, Func, 3) of - true -> erlang:apply(emqx_rule_outputs, Func, [Selected, Envs, Args]); + true -> erlang:apply(emqx_rule_outputs, Func, [Selected, Envs, maps:get(args, Out, #{})]); false -> error(not_found) end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index ec263b35a..941c82cda 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -42,20 +42,18 @@ test(#{sql := Sql, context := Context}) -> test_rule(Sql, Select, Context, EventTopics) -> RuleId = iolist_to_binary(["sql_tester:", emqx_misc:gen_id(16)]), ok = emqx_rule_metrics:create_rule_metrics(RuleId), - Rule = #rule{ - id = RuleId, - info = #{ - sql => Sql, - from => EventTopics, - outputs => [#{type => func, target => fun ?MODULE:get_selected_data/3, args => #{}}], - enabled => true, - is_foreach => emqx_rule_sqlparser:select_is_foreach(Select), - fields => emqx_rule_sqlparser:select_fields(Select), - doeach => emqx_rule_sqlparser:select_doeach(Select), - incase => emqx_rule_sqlparser:select_incase(Select), - conditions => emqx_rule_sqlparser:select_where(Select) - }, - created_at = erlang:system_time(millisecond) + Rule = #{ + id => RuleId, + sql => Sql, + from => EventTopics, + outputs => [#{function => fun ?MODULE:get_selected_data/3, args => #{}}], + enabled => true, + is_foreach => emqx_rule_sqlparser:select_is_foreach(Select), + fields => emqx_rule_sqlparser:select_fields(Select), + doeach => emqx_rule_sqlparser:select_doeach(Select), + incase => emqx_rule_sqlparser:select_incase(Select), + conditions => emqx_rule_sqlparser:select_where(Select), + created_at => erlang:system_time(millisecond) }, FullContext = fill_default_values(hd(EventTopics), emqx_rule_maps:atom_key_map(Context)), try diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 45b30223a..348d6055c 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -30,7 +30,6 @@ all() -> [ {group, engine} - , {group, api} , {group, funcs} , {group, registry} , {group, runtime} @@ -45,9 +44,6 @@ groups() -> [{engine, [sequence], [t_create_rule ]}, - {api, [], - [t_crud_rule_api - ]}, {funcs, [], [t_kv_store ]}, @@ -108,8 +104,6 @@ groups() -> init_per_suite(Config) -> application:load(emqx_machine), - ok = ekka_mnesia:start(), - ok = emqx_rule_registry:mnesia(boot), ok = emqx_ct_helpers:start_apps([emqx_rule_engine]), Config. @@ -155,12 +149,12 @@ init_per_testcase(t_events, Config) -> #{id => <<"rule:t_events">>, sql => SQL, outputs => [ - #{type => builtin, target => console}, - #{type => func, target => fun ?MODULE:output_record_triggered_events/3, + #{function => console}, + #{function => fun ?MODULE:output_record_triggered_events/3, args => #{}} ], description => <<"to console and record triggered events">>}), - ?assertMatch(#rule{id = <<"rule:t_events">>}, Rule), + ?assertMatch(#{id := <<"rule:t_events">>}, Rule), [{hook_points_rules, Rule} | Config]; init_per_testcase(_TestCase, Config) -> emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), @@ -176,59 +170,17 @@ end_per_testcase(_TestCase, _Config) -> %% Test cases for rule engine %%------------------------------------------------------------------------------ t_create_rule(_Config) -> - {ok, #rule{id = Id}} = emqx_rule_engine:create_rule( + {ok, #{id := Id}} = emqx_rule_engine:create_rule( #{sql => <<"select * from \"t/a\"">>, id => <<"t_create_rule">>, - outputs => [#{type => builtin, target => console}], + outputs => [#{function => console}], description => <<"debug rule">>}), ct:pal("======== emqx_rule_registry:get_rules :~p", [emqx_rule_registry:get_rules()]), - ?assertMatch({ok, #rule{id = Id, info = #{from := [<<"t/a">>]}}}, + ?assertMatch({ok, #{id := Id, from := [<<"t/a">>]}}, emqx_rule_registry:get_rule(Id)), emqx_rule_registry:remove_rule(Id), ok. -%%------------------------------------------------------------------------------ -%% Test cases for rule engine api -%%------------------------------------------------------------------------------ - -t_crud_rule_api(_Config) -> - RuleID = <<"my_rule">>, - Params0 = #{ - <<"description">> => <<"A simple rule">>, - <<"enable">> => true, - <<"id">> => RuleID, - <<"outputs">> => [#{<<"type">> => <<"builtin">>, <<"target">> => <<"console">>}], - <<"sql">> => <<"SELECT * from \"t/1\"">> - }, - {201, Rule} = emqx_rule_engine_api:crud_rules(post, #{body => Params0}), - - ?assertEqual(RuleID, maps:get(id, Rule)), - {200, Rules} = emqx_rule_engine_api:crud_rules(get, #{}), - ct:pal("RList : ~p", [Rules]), - ?assert(length(Rules) > 0), - - {200, Rule1} = emqx_rule_engine_api:crud_rules_by_id(get, #{bindings => #{id => RuleID}}), - ct:pal("RShow : ~p", [Rule1]), - ?assertEqual(Rule, Rule1), - - {200, Rule2} = emqx_rule_engine_api:crud_rules_by_id(put, #{ - bindings => #{id => RuleID}, - body => Params0#{<<"sql">> => <<"select * from \"t/b\"">>} - }), - - {200, Rule3} = emqx_rule_engine_api:crud_rules_by_id(get, #{bindings => #{id => RuleID}}), - %ct:pal("RShow : ~p", [Rule3]), - ?assertEqual(Rule3, Rule2), - ?assertEqual(<<"select * from \"t/b\"">>, maps:get(sql, Rule3)), - - ?assertMatch({200}, emqx_rule_engine_api:crud_rules_by_id(delete, - #{bindings => #{id => RuleID}})), - - %ct:pal("Show After Deleted: ~p", [NotFound]), - ?assertMatch({404, #{code := _, message := _Message}}, - emqx_rule_engine_api:crud_rules_by_id(get, #{bindings => #{id => RuleID}})), - ok. - %%------------------------------------------------------------------------------ %% Test cases for rule funcs %%------------------------------------------------------------------------------ @@ -248,14 +200,14 @@ t_kv_store(_) -> t_add_get_remove_rule(_Config) -> RuleId0 = <<"rule-debug-0">>, ok = emqx_rule_registry:add_rule(make_simple_rule(RuleId0)), - ?assertMatch({ok, #rule{id = RuleId0}}, emqx_rule_registry:get_rule(RuleId0)), + ?assertMatch({ok, #{id := RuleId0}}, emqx_rule_registry:get_rule(RuleId0)), ok = emqx_rule_registry:remove_rule(RuleId0), ?assertEqual(not_found, emqx_rule_registry:get_rule(RuleId0)), RuleId1 = <<"rule-debug-1">>, Rule1 = make_simple_rule(RuleId1), ok = emqx_rule_registry:add_rule(Rule1), - ?assertMatch({ok, #rule{id = RuleId1}}, emqx_rule_registry:get_rule(RuleId1)), + ?assertMatch({ok, #{id := RuleId1}}, emqx_rule_registry:get_rule(RuleId1)), ok = emqx_rule_registry:remove_rule(Rule1), ?assertEqual(not_found, emqx_rule_registry:get_rule(RuleId1)), ok. @@ -282,9 +234,9 @@ t_create_existing_rule(_Config) -> {ok, _} = emqx_rule_engine:create_rule( #{id => <<"an_existing_rule">>, sql => <<"select * from \"t/#\"">>, - outputs => [#{type => builtin, target => console}] + outputs => [#{function => console}] }), - {ok, #rule{info = #{sql := SQL}}} = emqx_rule_registry:get_rule(<<"an_existing_rule">>), + {ok, #{sql := SQL}} = emqx_rule_registry:get_rule(<<"an_existing_rule">>), ?assertEqual(<<"select * from \"t/#\"">>, SQL), ok = emqx_rule_engine:delete_rule(<<"an_existing_rule">>), @@ -308,9 +260,9 @@ t_get_rules_ordered_by_ts(_Config) -> make_simple_rule_with_ts(<<"rule-debug-2">>, Now()) ]), ?assertMatch([ - #rule{id = <<"rule-debug-0">>}, - #rule{id = <<"rule-debug-1">>}, - #rule{id = <<"rule-debug-2">>} + #{id := <<"rule-debug-0">>}, + #{id := <<"rule-debug-1">>}, + #{id := <<"rule-debug-2">>} ], emqx_rule_registry:get_rules_ordered_by_ts()). t_get_rules_for_topic_2(_Config) -> @@ -1349,7 +1301,7 @@ t_sqlparse_nested_get(_Config) -> republish_output(Topic) -> republish_output(Topic, <<"${payload}">>). republish_output(Topic, Payload) -> - #{type => builtin, target => republish, + #{function => republish, args => #{<<"payload">> => Payload, <<"topic">> => Topic, <<"qos">> => 0}}. make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) -> @@ -1366,18 +1318,16 @@ make_simple_rule(RuleId, SQL, Topics) when is_binary(RuleId) -> make_simple_rule(RuleId, SQL, Topics, erlang:system_time(millisecond)). make_simple_rule(RuleId, SQL, Topics, Ts) when is_binary(RuleId) -> - #rule{ - id = RuleId, - info = #{ - sql => SQL, - from => Topics, - fields => [<<"*">>], - is_foreach => false, - conditions => {}, - ouputs => [#{type => builtin, target => console}], - description => <<"simple rule">> - }, - created_at = Ts + #{ + id => RuleId, + sql => SQL, + from => Topics, + fields => [<<"*">>], + is_foreach => false, + conditions => {}, + ouputs => [#{function => console}], + description => <<"simple rule">>, + created_at => Ts }. output_record_triggered_events(Data = #{event := EventName}, _Envs, _Args) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl new file mode 100644 index 000000000..0e679f0ca --- /dev/null +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl @@ -0,0 +1,64 @@ +-module(emqx_rule_engine_api_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +all() -> + emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + application:load(emqx_machine), + ok = emqx_ct_helpers:start_apps([emqx_rule_engine]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([emqx_rule_engine]), + ok. + +init_per_testcase(_, Config) -> + {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), + Config. + +end_per_testcase(_, _Config) -> + ok. + +t_crud_rule_api(_Config) -> + RuleID = <<"my_rule">>, + Params0 = #{ + <<"description">> => <<"A simple rule">>, + <<"enable">> => true, + <<"id">> => RuleID, + <<"outputs">> => [#{<<"function">> => <<"console">>}], + <<"sql">> => <<"SELECT * from \"t/1\"">> + }, + {201, Rule} = emqx_rule_engine_api:crud_rules(post, #{body => Params0}), + + ?assertEqual(RuleID, maps:get(id, Rule)), + {200, Rules} = emqx_rule_engine_api:crud_rules(get, #{}), + ct:pal("RList : ~p", [Rules]), + ?assert(length(Rules) > 0), + + {200, Rule1} = emqx_rule_engine_api:crud_rules_by_id(get, #{bindings => #{id => RuleID}}), + ct:pal("RShow : ~p", [Rule1]), + ?assertEqual(Rule, Rule1), + + {200, Rule2} = emqx_rule_engine_api:crud_rules_by_id(put, #{ + bindings => #{id => RuleID}, + body => Params0#{<<"sql">> => <<"select * from \"t/b\"">>} + }), + + {200, Rule3} = emqx_rule_engine_api:crud_rules_by_id(get, #{bindings => #{id => RuleID}}), + %ct:pal("RShow : ~p", [Rule3]), + ?assertEqual(Rule3, Rule2), + ?assertEqual(<<"select * from \"t/b\"">>, maps:get(sql, Rule3)), + + ?assertMatch({200}, emqx_rule_engine_api:crud_rules_by_id(delete, + #{bindings => #{id => RuleID}})), + + %ct:pal("Show After Deleted: ~p", [NotFound]), + ?assertMatch({404, #{code := _, message := _Message}}, + emqx_rule_engine_api:crud_rules_by_id(get, #{bindings => #{id => RuleID}})), + ok.