refactor(rules): change republish as an output

This commit is contained in:
Shawn 2021-09-26 14:55:19 +08:00
parent f33e28af6d
commit 420ccf0f51
9 changed files with 223 additions and 72 deletions

View File

@ -31,10 +31,21 @@
-type(topic() :: binary()).
-type(bridge_channel_id() :: binary()).
-type selected_data() :: map().
-type envs() :: map().
-type output_type() :: bridge | builtin | func.
-type output_target() :: bridge_channel_id() | atom() | output_fun().
-type output_fun_args() :: map().
-type output() :: #{
type := output_type(),
target := output_target(),
args => output_fun_args()
}.
-type output_fun() :: fun((selected_data(), envs(), output_fun_args()) -> any()).
-type(rule_info() ::
#{ from := list(topic())
, to := list(bridge_channel_id() | fun())
, outputs := [output()]
, sql := binary()
, is_foreach := boolean()
, fields := list()

View File

@ -34,8 +34,12 @@ roots() ->
fields("rule_creation") ->
[ {"id", sc(binary(), #{desc => "The Id of the rule", nullable => false})}
, {"sql", sc(binary(), #{desc => "The SQL of the rule", nullable => false})}
, {"outputs", sc(hoconsc:array(binary()),
#{desc => "The outputs of the rule", default => [<<"console">>]})}
, {"outputs", sc(hoconsc:array(hoconsc:union(
[ ref("bridge_output")
, ref("builtin_output")
])),
#{desc => "The outputs of the rule",
default => []})}
, {"enable", sc(boolean(), #{desc => "Enable or disable the rule", default => true})}
, {"description", sc(binary(), #{desc => "The description of the rule", default => <<>>})}
];
@ -54,6 +58,38 @@ fields("rule_test") ->
, {"sql", sc(binary(), #{desc => "The SQL of the rule for testing", nullable => false})}
];
fields("bridge_output") ->
[ {type, bridge}
, {target, sc(binary(), #{desc => "The Channel ID of the bridge"})}
];
fields("builtin_output") ->
[ {type, builtin}
, {target, sc(binary(), #{desc => "The Name of the built-on output"})}
, {args, sc(map(), #{desc => "The arguments of the built-in output",
default => #{}})}
];
%% TODO: how to use this in "builtin_output".args ?
fields("republish_args") ->
[ {topic, sc(binary(),
#{desc => "The target topic of the re-published message."
" Template with with variables is allowed.",
nullable => false})}
, {qos, sc(binary(),
#{desc => "The qos of the re-published message."
" Template with with variables is allowed. Defaults to ${qos}.",
default => <<"${qos}">> })}
, {retain, sc(binary(),
#{desc => "The retain of the re-published message."
" Template with with variables is allowed. Defaults to ${retain}.",
default => <<"${retain}">> })}
, {payload, sc(binary(),
#{desc => "The payload of the re-published message."
" Template with with variables is allowed. Defaults to ${payload}.",
default => <<"${payload}">>})}
];
fields("ctx_pub") ->
[ {"event_type", sc(message_publish, #{desc => "Event Type", nullable => false})}
, {"id", sc(binary(), #{desc => "Message ID"})}

View File

@ -72,7 +72,7 @@ do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) ->
enabled => maps:get(enabled, Params, true),
sql => Sql,
from => emqx_rule_sqlparser:select_from(Select),
outputs => Outputs,
outputs => parse_outputs(Outputs),
description => maps:get(description, Params, ""),
%% -- calculated fields:
is_foreach => emqx_rule_sqlparser:select_is_foreach(Select),
@ -88,3 +88,31 @@ do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) ->
{ok, Rule};
Reason -> {error, Reason}
end.
parse_outputs(Outputs) ->
[do_parse_outputs(Out) || Out <- Outputs].
do_parse_outputs(#{type := bridge, target := ChId}) ->
#{type => bridge, target => ChId};
do_parse_outputs(#{type := builtin, target := Repub, args := Args})
when Repub == republish; Repub == <<"republish">> ->
#{type => builtin, target => republish, args => pre_process_repub_args(Args)};
do_parse_outputs(#{type := builtin, target := Name} = Output) ->
#{type => builtin, target => Name, args => maps:get(args, Output, #{})}.
pre_process_repub_args(#{<<"topic">> := Topic} = Args) ->
QoS = maps:get(<<"qos">>, Args, <<"${qos}">>),
Retain = maps:get(<<"retain">>, Args, <<"${retain}">>),
Payload = maps:get(<<"payload">>, Args, <<"${payload}">>),
#{topic => Topic, qos => QoS, payload => Payload, retain => Retain,
preprocessed_tmpl => #{
topic => emqx_plugin_libs_rule:preproc_tmpl(Topic),
qos => preproc_vars(QoS),
retain => preproc_vars(Retain),
payload => emqx_plugin_libs_rule:preproc_tmpl(Payload)
}}.
preproc_vars(Data) when is_binary(Data) ->
emqx_plugin_libs_rule:preproc_tmpl(Data);
preproc_vars(Data) ->
Data.

View File

@ -141,8 +141,21 @@ put_req_schema() ->
description => <<"The outputs of the rule">>,
type => array,
items => #{
type => object,
properties => #{
type => #{
type => string,
enum => [<<"bridge">>, <<"builtin">>],
example => <<"builtin">>
},
target => #{
type => string,
example => <<"console">>
},
args => #{
type => object
}
}
}
},
description => #{
@ -190,9 +203,9 @@ rule_test_req_schema() ->
event_type => #{
description => <<"Event Type">>,
type => string,
enum => ["message_publish", "message_acked", "message_delivered",
"message_dropped", "session_subscribed", "session_unsubscribed",
"client_connected", "client_disconnected"],
enum => [<<"message_publish">>, <<"message_acked">>, <<"message_delivered">>,
<<"message_dropped">>, <<"session_subscribed">>, <<"session_unsubscribed">>,
<<"client_connected">>, <<"client_disconnected">>],
example => <<"message_publish">>
},
clientid => #{
@ -295,7 +308,7 @@ format_rule_resp(#rule{id = Id, created_at = CreatedAt,
description := Descr}}) ->
#{id => Id,
from => Topics,
outputs => Output,
outputs => format_output(Output),
sql => SQL,
metrics => get_rule_metrics(Id),
enabled => Enabled,
@ -306,6 +319,16 @@ format_rule_resp(#rule{id = Id, created_at = CreatedAt,
format_datetime(Timestamp, Unit) ->
list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, Unit}])).
format_output(Outputs) ->
[do_format_output(Out) || Out <- Outputs].
do_format_output(#{type := func}) ->
#{type => builtin, target => <<"internal_function">>};
do_format_output(#{type := builtin, target := Name, args := Args}) ->
#{type => builtin, target => Name, args => maps:remove(preprocessed_tmpl, Args)};
do_format_output(#{type := bridge, target := Name}) ->
#{type => bridge, target => Name}.
get_rule_metrics(Id) ->
[maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id]))
|| Node <- ekka_mnesia:running_nodes()].

View File

@ -38,8 +38,6 @@
, contains_topic_match/2
, contains_topic_match/3
, null/0
, republish/3
, republish/4
]).
%% Arithmetic Funcs
@ -309,22 +307,6 @@ find_topic_filter(Filter, TopicFilters, Func) ->
null() ->
undefined.
republish(Topic, Payload, Qos) ->
republish(Topic, Payload, Qos, false).
republish(Topic, Payload, Qos, Retain) ->
Msg = #message{
id = emqx_guid:gen(),
qos = Qos,
from = republish_function,
flags = #{retain => Retain},
headers = #{},
topic = Topic,
payload = Payload,
timestamp = erlang:system_time(millisecond)
},
emqx_broker:safe_publish(Msg).
%%------------------------------------------------------------------------------
%% Arithmetic Funcs
%%------------------------------------------------------------------------------

View File

@ -17,16 +17,66 @@
%% Define the default actions.
-module(emqx_rule_outputs).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx.hrl").
-export([ console/2
, get_selected_data/2
-export([ console/3
, republish/3
]).
-spec console(map(), map()) -> any().
console(Selected, #{metadata := #{rule_id := RuleId}} = Envs) ->
-spec console(map(), map(), map()) -> any().
console(Selected, #{metadata := #{rule_id := RuleId}} = Envs, _Args) ->
?ULOG("[rule output] ~s~n"
"\tOutput Data: ~p~n"
"\tEnvs: ~p~n", [RuleId, Selected, Envs]).
get_selected_data(Selected, _Envs) ->
Selected.
republish(_Selected, #{topic := Topic, headers := #{republish_by := RuleId},
metadata := #{rule_id := RuleId}}, _Args) ->
?LOG(error, "[republish] recursively republish detected, msg topic: ~p", [Topic]);
%% republish a PUBLISH message
republish(Selected, #{flags := Flags, metadata := #{rule_id := RuleId}},
#{preprocessed_tmpl := #{
qos := QoSTks,
retain := RetainTks,
topic := TopicTks,
payload := PayloadTks}}) ->
Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected),
Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected),
QoS = replace_simple_var(QoSTks, Selected),
Retain = replace_simple_var(RetainTks, Selected),
?LOG(debug, "[republish] to: ~p, payload: ~p", [Topic, Payload]),
safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload);
%% in case this is a "$events/" event
republish(Selected, #{metadata := #{rule_id := RuleId}},
#{preprocessed_tmpl := #{
qos := QoSTks,
retain := RetainTks,
topic := TopicTks,
payload := PayloadTks}}) ->
Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected),
Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected),
QoS = replace_simple_var(QoSTks, Selected),
Retain = replace_simple_var(RetainTks, Selected),
?LOG(debug, "[republish] to: ~p, payload: ~p", [Topic, Payload]),
safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload).
safe_publish(RuleId, Topic, QoS, Flags, Payload) ->
Msg = #message{
id = emqx_guid:gen(),
qos = QoS,
from = RuleId,
flags = Flags,
headers = #{republish_by => RuleId},
topic = Topic,
payload = Payload,
timestamp = erlang:system_time(millisecond)
},
_ = emqx_broker:safe_publish(Msg),
emqx_metrics:inc_msg(Msg).
replace_simple_var(Tokens, Data) when is_list(Tokens) ->
[Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}),
Var;
replace_simple_var(Val, _Data) ->
Val.

View File

@ -238,22 +238,24 @@ handle_output(OutId, Selected, Envs) ->
?LOG(warning, "Output to ~p failed, ~p", [OutId, {Err, Reason, ST}])
end.
do_handle_output(<<"bridge:", _/binary>> = _ChannelId, _Selected, _Envs) ->
?LOG(warning, "calling bridge from rules has not been implemented yet!");
do_handle_output(OutputFun, Selected, Envs) when is_function(OutputFun) ->
erlang:apply(OutputFun, [Selected, Envs]);
do_handle_output(BuiltInOutput, Selected, Envs) when is_atom(BuiltInOutput) ->
handle_builtin_output(BuiltInOutput, Selected, Envs);
do_handle_output(BuiltInOutput, Selected, Envs) when is_binary(BuiltInOutput) ->
try binary_to_existing_atom(BuiltInOutput) of
Func -> handle_builtin_output(Func, Selected, Envs)
do_handle_output(#{type := bridge, target := ChannelId}, _Selected, _Envs) ->
?LOG(warning, "calling bridge from rules has not been implemented yet! ~p", [ChannelId]);
do_handle_output(#{type := func, target := Func} = Out, Selected, Envs) ->
erlang:apply(Func, [Selected, Envs, maps:get(args, Out, #{})]);
do_handle_output(#{type := builtin, target := Output} = Out, Selected, Envs)
when is_atom(Output) ->
handle_builtin_output(Output, Selected, Envs, maps:get(args, Out, #{}));
do_handle_output(#{type := builtin, target := Output} = Out, Selected, Envs)
when is_binary(Output) ->
try binary_to_existing_atom(Output) of
Func -> handle_builtin_output(Func, Selected, Envs, maps:get(args, Out, #{}))
catch
error:badarg -> error(not_found)
end.
handle_builtin_output(Func, Selected, Envs) ->
case erlang:function_exported(emqx_rule_outputs, Func, 2) of
true -> erlang:apply(emqx_rule_outputs, Func, [Selected, Envs]);
handle_builtin_output(Func, Selected, Envs, Args) ->
case erlang:function_exported(emqx_rule_outputs, Func, 3) of
true -> erlang:apply(emqx_rule_outputs, Func, [Selected, Envs, Args]);
false -> error(not_found)
end.

View File

@ -19,6 +19,7 @@
-export([ test/1
, echo_action/2
, get_selected_data/3
]).
%% Dialyzer gives up on the generated code.
@ -55,7 +56,7 @@ test_rule(Sql, Select, Context, EventTopics) ->
info = #{
sql => Sql,
from => EventTopics,
outputs => [get_selected_data],
outputs => [#{type => func, target => fun ?MODULE:get_selected_data/3, args => #{}}],
enabled => true,
is_foreach => emqx_rule_sqlparser:select_is_foreach(Select),
fields => emqx_rule_sqlparser:select_fields(Select),
@ -74,6 +75,9 @@ test_rule(Sql, Select, Context, EventTopics) ->
emqx_rule_metrics:clear_rule_metrics(RuleId)
end.
get_selected_data(Selected, _Envs, _Args) ->
Selected.
is_publish_topic(<<"$events/", _/binary>>) -> false;
is_publish_topic(_Topic) -> true.

View File

@ -154,7 +154,11 @@ init_per_testcase(t_events, Config) ->
{ok, Rule} = emqx_rule_engine:create_rule(
#{id => <<"rule:t_events">>,
sql => SQL,
outputs => [console, fun ?MODULE:output_record_triggered_events/2],
outputs => [
#{type => builtin, target => console},
#{type => func, target => fun ?MODULE:output_record_triggered_events/3,
args => #{}}
],
description => <<"to console and record triggered events">>}),
?assertMatch(#rule{id = <<"rule:t_events">>}, Rule),
[{hook_points_rules, Rule} | Config];
@ -175,7 +179,7 @@ t_create_rule(_Config) ->
{ok, #rule{id = Id}} = emqx_rule_engine:create_rule(
#{sql => <<"select * from \"t/a\"">>,
id => <<"t_create_rule">>,
outputs => [console],
outputs => [#{type => builtin, target => console}],
description => <<"debug rule">>}),
ct:pal("======== emqx_rule_registry:get_rules :~p", [emqx_rule_registry:get_rules()]),
?assertMatch({ok, #rule{id = Id, info = #{from := [<<"t/a">>]}}},
@ -193,7 +197,7 @@ t_crud_rule_api(_Config) ->
<<"description">> => <<"A simple rule">>,
<<"enable">> => true,
<<"id">> => RuleID,
<<"outputs">> => [ <<"console">> ],
<<"outputs">> => [#{<<"type">> => <<"builtin">>, <<"target">> => <<"console">>}],
<<"sql">> => <<"SELECT * from \"t/1\"">>
},
{201, Rule} = emqx_rule_engine_api:crud_rules(post, #{body => Params0}),
@ -278,7 +282,7 @@ t_create_existing_rule(_Config) ->
{ok, _} = emqx_rule_engine:create_rule(
#{id => <<"an_existing_rule">>,
sql => <<"select * from \"t/#\"">>,
outputs => [console]
outputs => [#{type => builtin, target => console}]
}),
{ok, #rule{info = #{sql := SQL}}} = emqx_rule_registry:get_rule(<<"an_existing_rule">>),
?assertEqual(<<"select * from \"t/#\"">>, SQL),
@ -427,12 +431,13 @@ message_acked(_Client) ->
ok.
t_match_atom_and_binary(_Config) ->
SQL = "SELECT connected_at as ts, *, republish('t2', 'user:' + ts, 0) "
SQL = "SELECT connected_at as ts, * "
"FROM \"$events/client_connected\" "
"WHERE username = 'emqx2' ",
Repub = republish_output(<<"t2">>, <<"user:${ts}">>),
{ok, TopicRule} = emqx_rule_engine:create_rule(
#{sql => SQL, id => ?TMP_RULEID,
outputs => []}),
outputs => [Repub]}),
{ok, Client} = emqtt:start_link([{username, <<"emqx1">>}]),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
@ -532,12 +537,13 @@ t_sqlselect_00(_Config) ->
topic => <<"t/a">>}})).
t_sqlselect_01(_Config) ->
SQL = "SELECT json_decode(payload) as p, payload, republish('t2', payload, 0) "
SQL = "SELECT json_decode(payload) as p, payload "
"FROM \"t3/#\", \"t1\" "
"WHERE p.x = 1",
Repub = republish_output(<<"t2">>),
{ok, TopicRule1} = emqx_rule_engine:create_rule(
#{sql => SQL, id => ?TMP_RULEID,
outputs => []}),
outputs => [Repub]}),
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
@ -569,12 +575,13 @@ t_sqlselect_01(_Config) ->
emqx_rule_registry:remove_rule(TopicRule1).
t_sqlselect_02(_Config) ->
SQL = "SELECT *, republish('t2', payload, 0) "
SQL = "SELECT * "
"FROM \"t3/#\", \"t1\" "
"WHERE payload.x = 1",
Repub = republish_output(<<"t2">>),
{ok, TopicRule1} = emqx_rule_engine:create_rule(
#{sql => SQL, id => ?TMP_RULEID,
outputs => []}),
outputs => [Repub]}),
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
@ -606,12 +613,13 @@ t_sqlselect_02(_Config) ->
emqx_rule_registry:remove_rule(TopicRule1).
t_sqlselect_1(_Config) ->
SQL = "SELECT json_decode(payload) as p, payload, republish('t2', payload, 0) "
SQL = "SELECT json_decode(payload) as p, payload "
"FROM \"t1\" "
"WHERE p.x = 1 and p.y = 2",
Repub = republish_output(<<"t2">>),
{ok, TopicRule} = emqx_rule_engine:create_rule(
#{sql => SQL, id => ?TMP_RULEID,
outputs => []}),
outputs => [Repub]}),
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
@ -636,11 +644,11 @@ t_sqlselect_1(_Config) ->
t_sqlselect_2(_Config) ->
%% recursively republish to t2
SQL = "SELECT *, republish('t2', payload, 0) "
"FROM \"t2\" ",
SQL = "SELECT * FROM \"t2\" ",
Repub = republish_output(<<"t2">>),
{ok, TopicRule} = emqx_rule_engine:create_rule(
#{sql => SQL, id => ?TMP_RULEID,
outputs => []}),
outputs => [Repub]}),
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
@ -662,12 +670,13 @@ t_sqlselect_2(_Config) ->
t_sqlselect_3(_Config) ->
%% republish the client.connected msg
SQL = "SELECT *, republish('t2', 'clientid=' + clientid, 0) "
SQL = "SELECT * "
"FROM \"$events/client_connected\" "
"WHERE username = 'emqx1'",
Repub = republish_output(<<"t2">>, <<"clientid=${clientid}">>),
{ok, TopicRule} = emqx_rule_engine:create_rule(
#{sql => SQL, id => ?TMP_RULEID,
outputs => []}),
outputs => [Repub]}),
{ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
@ -1337,6 +1346,12 @@ t_sqlparse_nested_get(_Config) ->
%% Internal helpers
%%------------------------------------------------------------------------------
republish_output(Topic) ->
republish_output(Topic, <<"${payload}">>).
republish_output(Topic, Payload) ->
#{type => builtin, target => republish,
args => #{<<"payload">> => Payload, <<"topic">> => Topic, <<"qos">> => 0}}.
make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) ->
SQL = <<"select * from \"simple/topic\"">>,
Topics = [<<"simple/topic">>],
@ -1359,13 +1374,13 @@ make_simple_rule(RuleId, SQL, Topics, Ts) when is_binary(RuleId) ->
fields => [<<"*">>],
is_foreach => false,
conditions => {},
ouputs => [console],
ouputs => [#{type => builtin, target => console}],
description => <<"simple rule">>
},
created_at = Ts
}.
output_record_triggered_events(Data = #{event := EventName}, _Envs) ->
output_record_triggered_events(Data = #{event := EventName}, _Envs, _Args) ->
ct:pal("applying output_record_triggered_events: ~p", [Data]),
ets:insert(events_record_tab, {EventName, Data}).