From 420ccf0f51ed10e957fa77234bf85805aa4f9da9 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sun, 26 Sep 2021 14:55:19 +0800 Subject: [PATCH] refactor(rules): change republish as an output --- apps/emqx_rule_engine/include/rule_engine.hrl | 13 +++- .../src/emqx_rule_api_schema.erl | 40 +++++++++++- .../emqx_rule_engine/src/emqx_rule_engine.erl | 30 ++++++++- .../src/emqx_rule_engine_api.erl | 35 ++++++++-- apps/emqx_rule_engine/src/emqx_rule_funcs.erl | 18 ----- .../src/emqx_rule_outputs.erl | 62 ++++++++++++++++-- .../src/emqx_rule_runtime.erl | 26 ++++---- .../src/emqx_rule_sqltester.erl | 6 +- .../test/emqx_rule_engine_SUITE.erl | 65 ++++++++++++------- 9 files changed, 223 insertions(+), 72 deletions(-) diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index 29d21b7cc..b46d9149c 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -31,10 +31,21 @@ -type(topic() :: binary()). -type(bridge_channel_id() :: binary()). +-type selected_data() :: map(). +-type envs() :: map(). +-type output_type() :: bridge | builtin | func. +-type output_target() :: bridge_channel_id() | atom() | output_fun(). +-type output_fun_args() :: map(). +-type output() :: #{ + type := output_type(), + target := output_target(), + args => output_fun_args() +}. +-type output_fun() :: fun((selected_data(), envs(), output_fun_args()) -> any()). -type(rule_info() :: #{ from := list(topic()) - , to := list(bridge_channel_id() | fun()) + , outputs := [output()] , sql := binary() , is_foreach := boolean() , fields := list() diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index 051624b4a..9a78f27d4 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -34,8 +34,12 @@ roots() -> fields("rule_creation") -> [ {"id", sc(binary(), #{desc => "The Id of the rule", nullable => false})} , {"sql", sc(binary(), #{desc => "The SQL of the rule", nullable => false})} - , {"outputs", sc(hoconsc:array(binary()), - #{desc => "The outputs of the rule", default => [<<"console">>]})} + , {"outputs", sc(hoconsc:array(hoconsc:union( + [ ref("bridge_output") + , ref("builtin_output") + ])), + #{desc => "The outputs of the rule", + default => []})} , {"enable", sc(boolean(), #{desc => "Enable or disable the rule", default => true})} , {"description", sc(binary(), #{desc => "The description of the rule", default => <<>>})} ]; @@ -54,6 +58,38 @@ fields("rule_test") -> , {"sql", sc(binary(), #{desc => "The SQL of the rule for testing", nullable => false})} ]; +fields("bridge_output") -> + [ {type, bridge} + , {target, sc(binary(), #{desc => "The Channel ID of the bridge"})} + ]; + +fields("builtin_output") -> + [ {type, builtin} + , {target, sc(binary(), #{desc => "The Name of the built-on output"})} + , {args, sc(map(), #{desc => "The arguments of the built-in output", + default => #{}})} + ]; + +%% TODO: how to use this in "builtin_output".args ? +fields("republish_args") -> + [ {topic, sc(binary(), + #{desc => "The target topic of the re-published message." + " Template with with variables is allowed.", + nullable => false})} + , {qos, sc(binary(), + #{desc => "The qos of the re-published message." + " Template with with variables is allowed. Defaults to ${qos}.", + default => <<"${qos}">> })} + , {retain, sc(binary(), + #{desc => "The retain of the re-published message." + " Template with with variables is allowed. Defaults to ${retain}.", + default => <<"${retain}">> })} + , {payload, sc(binary(), + #{desc => "The payload of the re-published message." + " Template with with variables is allowed. Defaults to ${payload}.", + default => <<"${payload}">>})} + ]; + fields("ctx_pub") -> [ {"event_type", sc(message_publish, #{desc => "Event Type", nullable => false})} , {"id", sc(binary(), #{desc => "Message ID"})} diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 3775b5e4d..707513d0c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -72,7 +72,7 @@ do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) -> enabled => maps:get(enabled, Params, true), sql => Sql, from => emqx_rule_sqlparser:select_from(Select), - outputs => Outputs, + outputs => parse_outputs(Outputs), description => maps:get(description, Params, ""), %% -- calculated fields: is_foreach => emqx_rule_sqlparser:select_is_foreach(Select), @@ -88,3 +88,31 @@ do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) -> {ok, Rule}; Reason -> {error, Reason} end. + +parse_outputs(Outputs) -> + [do_parse_outputs(Out) || Out <- Outputs]. + +do_parse_outputs(#{type := bridge, target := ChId}) -> + #{type => bridge, target => ChId}; +do_parse_outputs(#{type := builtin, target := Repub, args := Args}) + when Repub == republish; Repub == <<"republish">> -> + #{type => builtin, target => republish, args => pre_process_repub_args(Args)}; +do_parse_outputs(#{type := builtin, target := Name} = Output) -> + #{type => builtin, target => Name, args => maps:get(args, Output, #{})}. + +pre_process_repub_args(#{<<"topic">> := Topic} = Args) -> + QoS = maps:get(<<"qos">>, Args, <<"${qos}">>), + Retain = maps:get(<<"retain">>, Args, <<"${retain}">>), + Payload = maps:get(<<"payload">>, Args, <<"${payload}">>), + #{topic => Topic, qos => QoS, payload => Payload, retain => Retain, + preprocessed_tmpl => #{ + topic => emqx_plugin_libs_rule:preproc_tmpl(Topic), + qos => preproc_vars(QoS), + retain => preproc_vars(Retain), + payload => emqx_plugin_libs_rule:preproc_tmpl(Payload) + }}. + +preproc_vars(Data) when is_binary(Data) -> + emqx_plugin_libs_rule:preproc_tmpl(Data); +preproc_vars(Data) -> + Data. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index b1cb7b778..337c07cb0 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -141,8 +141,21 @@ put_req_schema() -> description => <<"The outputs of the rule">>, type => array, items => #{ - type => string, - example => <<"console">> + type => object, + properties => #{ + type => #{ + type => string, + enum => [<<"bridge">>, <<"builtin">>], + example => <<"builtin">> + }, + target => #{ + type => string, + example => <<"console">> + }, + args => #{ + type => object + } + } } }, description => #{ @@ -190,9 +203,9 @@ rule_test_req_schema() -> event_type => #{ description => <<"Event Type">>, type => string, - enum => ["message_publish", "message_acked", "message_delivered", - "message_dropped", "session_subscribed", "session_unsubscribed", - "client_connected", "client_disconnected"], + enum => [<<"message_publish">>, <<"message_acked">>, <<"message_delivered">>, + <<"message_dropped">>, <<"session_subscribed">>, <<"session_unsubscribed">>, + <<"client_connected">>, <<"client_disconnected">>], example => <<"message_publish">> }, clientid => #{ @@ -295,7 +308,7 @@ format_rule_resp(#rule{id = Id, created_at = CreatedAt, description := Descr}}) -> #{id => Id, from => Topics, - outputs => Output, + outputs => format_output(Output), sql => SQL, metrics => get_rule_metrics(Id), enabled => Enabled, @@ -306,6 +319,16 @@ format_rule_resp(#rule{id = Id, created_at = CreatedAt, format_datetime(Timestamp, Unit) -> list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, Unit}])). +format_output(Outputs) -> + [do_format_output(Out) || Out <- Outputs]. + +do_format_output(#{type := func}) -> + #{type => builtin, target => <<"internal_function">>}; +do_format_output(#{type := builtin, target := Name, args := Args}) -> + #{type => builtin, target => Name, args => maps:remove(preprocessed_tmpl, Args)}; +do_format_output(#{type := bridge, target := Name}) -> + #{type => bridge, target => Name}. + get_rule_metrics(Id) -> [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id])) || Node <- ekka_mnesia:running_nodes()]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index bc3ef6f24..fd922db86 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -38,8 +38,6 @@ , contains_topic_match/2 , contains_topic_match/3 , null/0 - , republish/3 - , republish/4 ]). %% Arithmetic Funcs @@ -309,22 +307,6 @@ find_topic_filter(Filter, TopicFilters, Func) -> null() -> undefined. -republish(Topic, Payload, Qos) -> - republish(Topic, Payload, Qos, false). - -republish(Topic, Payload, Qos, Retain) -> - Msg = #message{ - id = emqx_guid:gen(), - qos = Qos, - from = republish_function, - flags = #{retain => Retain}, - headers = #{}, - topic = Topic, - payload = Payload, - timestamp = erlang:system_time(millisecond) - }, - emqx_broker:safe_publish(Msg). - %%------------------------------------------------------------------------------ %% Arithmetic Funcs %%------------------------------------------------------------------------------ diff --git a/apps/emqx_rule_engine/src/emqx_rule_outputs.erl b/apps/emqx_rule_engine/src/emqx_rule_outputs.erl index 6f8e3908e..e322aba9b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_outputs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_outputs.erl @@ -17,16 +17,66 @@ %% Define the default actions. -module(emqx_rule_outputs). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx.hrl"). --export([ console/2 - , get_selected_data/2 +-export([ console/3 + , republish/3 ]). --spec console(map(), map()) -> any(). -console(Selected, #{metadata := #{rule_id := RuleId}} = Envs) -> +-spec console(map(), map(), map()) -> any(). +console(Selected, #{metadata := #{rule_id := RuleId}} = Envs, _Args) -> ?ULOG("[rule output] ~s~n" "\tOutput Data: ~p~n" "\tEnvs: ~p~n", [RuleId, Selected, Envs]). -get_selected_data(Selected, _Envs) -> - Selected. +republish(_Selected, #{topic := Topic, headers := #{republish_by := RuleId}, + metadata := #{rule_id := RuleId}}, _Args) -> + ?LOG(error, "[republish] recursively republish detected, msg topic: ~p", [Topic]); + +%% republish a PUBLISH message +republish(Selected, #{flags := Flags, metadata := #{rule_id := RuleId}}, + #{preprocessed_tmpl := #{ + qos := QoSTks, + retain := RetainTks, + topic := TopicTks, + payload := PayloadTks}}) -> + Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected), + Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected), + QoS = replace_simple_var(QoSTks, Selected), + Retain = replace_simple_var(RetainTks, Selected), + ?LOG(debug, "[republish] to: ~p, payload: ~p", [Topic, Payload]), + safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload); + +%% in case this is a "$events/" event +republish(Selected, #{metadata := #{rule_id := RuleId}}, + #{preprocessed_tmpl := #{ + qos := QoSTks, + retain := RetainTks, + topic := TopicTks, + payload := PayloadTks}}) -> + Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected), + Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected), + QoS = replace_simple_var(QoSTks, Selected), + Retain = replace_simple_var(RetainTks, Selected), + ?LOG(debug, "[republish] to: ~p, payload: ~p", [Topic, Payload]), + safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload). + +safe_publish(RuleId, Topic, QoS, Flags, Payload) -> + Msg = #message{ + id = emqx_guid:gen(), + qos = QoS, + from = RuleId, + flags = Flags, + headers = #{republish_by => RuleId}, + topic = Topic, + payload = Payload, + timestamp = erlang:system_time(millisecond) + }, + _ = emqx_broker:safe_publish(Msg), + emqx_metrics:inc_msg(Msg). + +replace_simple_var(Tokens, Data) when is_list(Tokens) -> + [Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}), + Var; +replace_simple_var(Val, _Data) -> + Val. \ No newline at end of file diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 1da870816..9d8c9eece 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -238,22 +238,24 @@ handle_output(OutId, Selected, Envs) -> ?LOG(warning, "Output to ~p failed, ~p", [OutId, {Err, Reason, ST}]) end. -do_handle_output(<<"bridge:", _/binary>> = _ChannelId, _Selected, _Envs) -> - ?LOG(warning, "calling bridge from rules has not been implemented yet!"); -do_handle_output(OutputFun, Selected, Envs) when is_function(OutputFun) -> - erlang:apply(OutputFun, [Selected, Envs]); -do_handle_output(BuiltInOutput, Selected, Envs) when is_atom(BuiltInOutput) -> - handle_builtin_output(BuiltInOutput, Selected, Envs); -do_handle_output(BuiltInOutput, Selected, Envs) when is_binary(BuiltInOutput) -> - try binary_to_existing_atom(BuiltInOutput) of - Func -> handle_builtin_output(Func, Selected, Envs) +do_handle_output(#{type := bridge, target := ChannelId}, _Selected, _Envs) -> + ?LOG(warning, "calling bridge from rules has not been implemented yet! ~p", [ChannelId]); +do_handle_output(#{type := func, target := Func} = Out, Selected, Envs) -> + erlang:apply(Func, [Selected, Envs, maps:get(args, Out, #{})]); +do_handle_output(#{type := builtin, target := Output} = Out, Selected, Envs) + when is_atom(Output) -> + handle_builtin_output(Output, Selected, Envs, maps:get(args, Out, #{})); +do_handle_output(#{type := builtin, target := Output} = Out, Selected, Envs) + when is_binary(Output) -> + try binary_to_existing_atom(Output) of + Func -> handle_builtin_output(Func, Selected, Envs, maps:get(args, Out, #{})) catch error:badarg -> error(not_found) end. -handle_builtin_output(Func, Selected, Envs) -> - case erlang:function_exported(emqx_rule_outputs, Func, 2) of - true -> erlang:apply(emqx_rule_outputs, Func, [Selected, Envs]); +handle_builtin_output(Func, Selected, Envs, Args) -> + case erlang:function_exported(emqx_rule_outputs, Func, 3) of + true -> erlang:apply(emqx_rule_outputs, Func, [Selected, Envs, Args]); false -> error(not_found) end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 4a46f24bb..620361c0c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -19,6 +19,7 @@ -export([ test/1 , echo_action/2 + , get_selected_data/3 ]). %% Dialyzer gives up on the generated code. @@ -55,7 +56,7 @@ test_rule(Sql, Select, Context, EventTopics) -> info = #{ sql => Sql, from => EventTopics, - outputs => [get_selected_data], + outputs => [#{type => func, target => fun ?MODULE:get_selected_data/3, args => #{}}], enabled => true, is_foreach => emqx_rule_sqlparser:select_is_foreach(Select), fields => emqx_rule_sqlparser:select_fields(Select), @@ -74,6 +75,9 @@ test_rule(Sql, Select, Context, EventTopics) -> emqx_rule_metrics:clear_rule_metrics(RuleId) end. +get_selected_data(Selected, _Envs, _Args) -> + Selected. + is_publish_topic(<<"$events/", _/binary>>) -> false; is_publish_topic(_Topic) -> true. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 3b949baa3..918460ac4 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -152,10 +152,14 @@ init_per_testcase(t_events, Config) -> "\"$events/message_dropped\", " "\"t1\"", {ok, Rule} = emqx_rule_engine:create_rule( - #{id => <<"rule:t_events">>, - sql => SQL, - outputs => [console, fun ?MODULE:output_record_triggered_events/2], - description => <<"to console and record triggered events">>}), + #{id => <<"rule:t_events">>, + sql => SQL, + outputs => [ + #{type => builtin, target => console}, + #{type => func, target => fun ?MODULE:output_record_triggered_events/3, + args => #{}} + ], + description => <<"to console and record triggered events">>}), ?assertMatch(#rule{id = <<"rule:t_events">>}, Rule), [{hook_points_rules, Rule} | Config]; init_per_testcase(_TestCase, Config) -> @@ -175,7 +179,7 @@ t_create_rule(_Config) -> {ok, #rule{id = Id}} = emqx_rule_engine:create_rule( #{sql => <<"select * from \"t/a\"">>, id => <<"t_create_rule">>, - outputs => [console], + outputs => [#{type => builtin, target => console}], description => <<"debug rule">>}), ct:pal("======== emqx_rule_registry:get_rules :~p", [emqx_rule_registry:get_rules()]), ?assertMatch({ok, #rule{id = Id, info = #{from := [<<"t/a">>]}}}, @@ -193,7 +197,7 @@ t_crud_rule_api(_Config) -> <<"description">> => <<"A simple rule">>, <<"enable">> => true, <<"id">> => RuleID, - <<"outputs">> => [ <<"console">> ], + <<"outputs">> => [#{<<"type">> => <<"builtin">>, <<"target">> => <<"console">>}], <<"sql">> => <<"SELECT * from \"t/1\"">> }, {201, Rule} = emqx_rule_engine_api:crud_rules(post, #{body => Params0}), @@ -278,7 +282,7 @@ t_create_existing_rule(_Config) -> {ok, _} = emqx_rule_engine:create_rule( #{id => <<"an_existing_rule">>, sql => <<"select * from \"t/#\"">>, - outputs => [console] + outputs => [#{type => builtin, target => console}] }), {ok, #rule{info = #{sql := SQL}}} = emqx_rule_registry:get_rule(<<"an_existing_rule">>), ?assertEqual(<<"select * from \"t/#\"">>, SQL), @@ -427,12 +431,13 @@ message_acked(_Client) -> ok. t_match_atom_and_binary(_Config) -> - SQL = "SELECT connected_at as ts, *, republish('t2', 'user:' + ts, 0) " + SQL = "SELECT connected_at as ts, * " "FROM \"$events/client_connected\" " "WHERE username = 'emqx2' ", + Repub = republish_output(<<"t2">>, <<"user:${ts}">>), {ok, TopicRule} = emqx_rule_engine:create_rule( - #{sql => SQL, id => ?TMP_RULEID, - outputs => []}), + #{sql => SQL, id => ?TMP_RULEID, + outputs => [Repub]}), {ok, Client} = emqtt:start_link([{username, <<"emqx1">>}]), {ok, _} = emqtt:connect(Client), {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), @@ -532,12 +537,13 @@ t_sqlselect_00(_Config) -> topic => <<"t/a">>}})). t_sqlselect_01(_Config) -> - SQL = "SELECT json_decode(payload) as p, payload, republish('t2', payload, 0) " + SQL = "SELECT json_decode(payload) as p, payload " "FROM \"t3/#\", \"t1\" " "WHERE p.x = 1", + Repub = republish_output(<<"t2">>), {ok, TopicRule1} = emqx_rule_engine:create_rule( #{sql => SQL, id => ?TMP_RULEID, - outputs => []}), + outputs => [Repub]}), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), {ok, _} = emqtt:connect(Client), {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), @@ -569,12 +575,13 @@ t_sqlselect_01(_Config) -> emqx_rule_registry:remove_rule(TopicRule1). t_sqlselect_02(_Config) -> - SQL = "SELECT *, republish('t2', payload, 0) " + SQL = "SELECT * " "FROM \"t3/#\", \"t1\" " "WHERE payload.x = 1", + Repub = republish_output(<<"t2">>), {ok, TopicRule1} = emqx_rule_engine:create_rule( #{sql => SQL, id => ?TMP_RULEID, - outputs => []}), + outputs => [Repub]}), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), {ok, _} = emqtt:connect(Client), {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), @@ -606,12 +613,13 @@ t_sqlselect_02(_Config) -> emqx_rule_registry:remove_rule(TopicRule1). t_sqlselect_1(_Config) -> - SQL = "SELECT json_decode(payload) as p, payload, republish('t2', payload, 0) " + SQL = "SELECT json_decode(payload) as p, payload " "FROM \"t1\" " "WHERE p.x = 1 and p.y = 2", + Repub = republish_output(<<"t2">>), {ok, TopicRule} = emqx_rule_engine:create_rule( #{sql => SQL, id => ?TMP_RULEID, - outputs => []}), + outputs => [Repub]}), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), {ok, _} = emqtt:connect(Client), {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), @@ -636,11 +644,11 @@ t_sqlselect_1(_Config) -> t_sqlselect_2(_Config) -> %% recursively republish to t2 - SQL = "SELECT *, republish('t2', payload, 0) " - "FROM \"t2\" ", + SQL = "SELECT * FROM \"t2\" ", + Repub = republish_output(<<"t2">>), {ok, TopicRule} = emqx_rule_engine:create_rule( #{sql => SQL, id => ?TMP_RULEID, - outputs => []}), + outputs => [Repub]}), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), {ok, _} = emqtt:connect(Client), {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), @@ -662,12 +670,13 @@ t_sqlselect_2(_Config) -> t_sqlselect_3(_Config) -> %% republish the client.connected msg - SQL = "SELECT *, republish('t2', 'clientid=' + clientid, 0) " - "FROM \"$events/client_connected\" " - "WHERE username = 'emqx1'", + SQL = "SELECT * " + "FROM \"$events/client_connected\" " + "WHERE username = 'emqx1'", + Repub = republish_output(<<"t2">>, <<"clientid=${clientid}">>), {ok, TopicRule} = emqx_rule_engine:create_rule( #{sql => SQL, id => ?TMP_RULEID, - outputs => []}), + outputs => [Repub]}), {ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]), {ok, _} = emqtt:connect(Client), {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), @@ -1337,6 +1346,12 @@ t_sqlparse_nested_get(_Config) -> %% Internal helpers %%------------------------------------------------------------------------------ +republish_output(Topic) -> + republish_output(Topic, <<"${payload}">>). +republish_output(Topic, Payload) -> + #{type => builtin, target => republish, + args => #{<<"payload">> => Payload, <<"topic">> => Topic, <<"qos">> => 0}}. + make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) -> SQL = <<"select * from \"simple/topic\"">>, Topics = [<<"simple/topic">>], @@ -1359,13 +1374,13 @@ make_simple_rule(RuleId, SQL, Topics, Ts) when is_binary(RuleId) -> fields => [<<"*">>], is_foreach => false, conditions => {}, - ouputs => [console], + ouputs => [#{type => builtin, target => console}], description => <<"simple rule">> }, created_at = Ts }. -output_record_triggered_events(Data = #{event := EventName}, _Envs) -> +output_record_triggered_events(Data = #{event := EventName}, _Envs, _Args) -> ct:pal("applying output_record_triggered_events: ~p", [Data]), ets:insert(events_record_tab, {EventName, Data}).