From 6685a3c5a846e23ccf332705a0e4ecbf19d8f5fc Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 17 Jun 2022 15:37:23 +0800 Subject: [PATCH] fix: remove the 'headers' field from the rule events --- apps/emqx_bridge/src/emqx_bridge.erl | 2 +- .../src/mqtt/emqx_connector_mqtt_msg.erl | 3 +- .../emqx_rule_engine/src/emqx_rule_events.erl | 114 ++++----- .../src/emqx_rule_runtime.erl | 217 +++++++++--------- .../src/emqx_rule_sqltester.erl | 2 +- .../test/emqx_rule_engine_SUITE.erl | 4 +- .../test/emqx_rule_funcs_SUITE.erl | 7 +- 7 files changed, 181 insertions(+), 168 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 200c8c2a9..ba6c64dbc 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -118,7 +118,7 @@ unload_hook() -> on_message_publish(Message = #message{topic = Topic, flags = Flags}) -> case maps:get(sys, Flags, false) of false -> - Msg = emqx_rule_events:eventmsg_publish(Message), + {Msg, _} = emqx_rule_events:eventmsg_publish(Message), send_to_matched_egress_bridges(Topic, Msg); true -> ok diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl index 8cc582512..581d2670f 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -63,7 +63,8 @@ make_pub_vars(Mountpoint, Conf) when is_map(Conf) -> exp_msg(). to_remote_msg(#message{flags = Flags0} = Msg, Vars) -> Retain0 = maps:get(retain, Flags0, false), - MapMsg = maps:put(retain, Retain0, emqx_rule_events:eventmsg_publish(Msg)), + {Columns, _} = emqx_rule_events:eventmsg_publish(Msg), + MapMsg = maps:put(retain, Retain0, Columns), to_remote_msg(MapMsg, Vars); to_remote_msg(MapMsg, #{ remote_topic := TopicToken, diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index a25ad0e2e..0ffa2e684 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -107,36 +107,41 @@ unload(Topic) -> %%-------------------------------------------------------------------- %% Callbacks %%-------------------------------------------------------------------- -on_message_publish(Message = #message{topic = Topic}, _Env) -> +on_message_publish(Message = #message{topic = Topic}, _Conf) -> case ignore_sys_message(Message) of true -> ok; false -> case emqx_rule_engine:get_rules_for_topic(Topic) of - [] -> ok; - Rules -> emqx_rule_runtime:apply_rules(Rules, eventmsg_publish(Message)) + [] -> + ok; + Rules -> + %% ENVs are the fields that can't be refereced by the SQL, but can be used + %% from actions. e.g. The 'headers' field in the internal record `#message{}`. + {Columns, Envs} = eventmsg_publish(Message), + emqx_rule_runtime:apply_rules(Rules, Columns, Envs) end end, {ok, Message}. -on_bridge_message_received(Message, Env = #{event_topic := BridgeTopic}) -> - apply_event(BridgeTopic, fun() -> with_basic_columns(BridgeTopic, Message) end, Env). +on_bridge_message_received(Message, Conf = #{event_topic := BridgeTopic}) -> + apply_event(BridgeTopic, fun() -> with_basic_columns(BridgeTopic, Message, #{}) end, Conf). -on_client_connected(ClientInfo, ConnInfo, Env) -> +on_client_connected(ClientInfo, ConnInfo, Conf) -> apply_event( 'client.connected', fun() -> eventmsg_connected(ClientInfo, ConnInfo) end, - Env + Conf ). -on_client_connack(ConnInfo, Reason, _, Env) -> +on_client_connack(ConnInfo, Reason, _, Conf) -> apply_event( 'client.connack', fun() -> eventmsg_connack(ConnInfo, Reason) end, - Env + Conf ). -on_client_check_authz_complete(ClientInfo, PubSub, Topic, Result, AuthzSource, Env) -> +on_client_check_authz_complete(ClientInfo, PubSub, Topic, Result, AuthzSource, Conf) -> apply_event( 'client.check_authz_complete', fun() -> @@ -148,35 +153,35 @@ on_client_check_authz_complete(ClientInfo, PubSub, Topic, Result, AuthzSource, E AuthzSource ) end, - Env + Conf ). -on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) -> +on_client_disconnected(ClientInfo, Reason, ConnInfo, Conf) -> apply_event( 'client.disconnected', fun() -> eventmsg_disconnected(ClientInfo, ConnInfo, Reason) end, - Env + Conf ). -on_session_subscribed(ClientInfo, Topic, SubOpts, Env) -> +on_session_subscribed(ClientInfo, Topic, SubOpts, Conf) -> apply_event( 'session.subscribed', fun() -> eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts) end, - Env + Conf ). -on_session_unsubscribed(ClientInfo, Topic, SubOpts, Env) -> +on_session_unsubscribed(ClientInfo, Topic, SubOpts, Conf) -> apply_event( 'session.unsubscribed', fun() -> eventmsg_sub_or_unsub('session.unsubscribed', ClientInfo, Topic, SubOpts) end, - Env + Conf ). -on_message_dropped(Message, _, Reason, Env) -> +on_message_dropped(Message, _, Reason, Conf) -> case ignore_sys_message(Message) of true -> ok; @@ -184,12 +189,12 @@ on_message_dropped(Message, _, Reason, Env) -> apply_event( 'message.dropped', fun() -> eventmsg_dropped(Message, Reason) end, - Env + Conf ) end, {ok, Message}. -on_message_delivered(ClientInfo, Message, Env) -> +on_message_delivered(ClientInfo, Message, Conf) -> case ignore_sys_message(Message) of true -> ok; @@ -197,12 +202,12 @@ on_message_delivered(ClientInfo, Message, Env) -> apply_event( 'message.delivered', fun() -> eventmsg_delivered(ClientInfo, Message) end, - Env + Conf ) end, {ok, Message}. -on_message_acked(ClientInfo, Message, Env) -> +on_message_acked(ClientInfo, Message, Conf) -> case ignore_sys_message(Message) of true -> ok; @@ -210,12 +215,12 @@ on_message_acked(ClientInfo, Message, Env) -> apply_event( 'message.acked', fun() -> eventmsg_acked(ClientInfo, Message) end, - Env + Conf ) end, {ok, Message}. -on_delivery_dropped(ClientInfo, Message, Reason, Env) -> +on_delivery_dropped(ClientInfo, Message, Reason, Conf) -> case ignore_sys_message(Message) of true -> ok; @@ -223,7 +228,7 @@ on_delivery_dropped(ClientInfo, Message, Reason, Env) -> apply_event( 'delivery.dropped', fun() -> eventmsg_delivery_dropped(ClientInfo, Message, Reason) end, - Env + Conf ) end, {ok, Message}. @@ -256,10 +261,9 @@ eventmsg_publish( qos => QoS, flags => Flags, pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), - %% the column 'headers' will be removed in the next major release - headers => printable_maps(Headers), publish_received_at => Timestamp - } + }, + #{headers => Headers} ). eventmsg_connected( @@ -299,7 +303,8 @@ eventmsg_connected( is_bridge => IsBridge, conn_props => printable_maps(ConnProps), connected_at => ConnectedAt - } + }, + #{} ). eventmsg_disconnected( @@ -328,7 +333,8 @@ eventmsg_disconnected( proto_ver => ProtoVer, disconn_props => printable_maps(maps:get(disconn_props, ConnInfo, #{})), disconnected_at => DisconnectedAt - } + }, + #{} ). eventmsg_connack( @@ -360,7 +366,8 @@ eventmsg_connack( keepalive => Keepalive, expiry_interval => ExpiryInterval, conn_props => printable_maps(ConnProps) - } + }, + #{} ). eventmsg_check_authz_complete( @@ -384,7 +391,8 @@ eventmsg_check_authz_complete( action => PubSub, authz_source => AuthzSource, result => Result - } + }, + #{} ). eventmsg_sub_or_unsub( @@ -407,7 +415,8 @@ eventmsg_sub_or_unsub( PropKey => printable_maps(maps:get(PropKey, SubOpts, #{})), topic => Topic, qos => QoS - } + }, + #{} ). eventmsg_dropped( @@ -435,11 +444,10 @@ eventmsg_dropped( topic => Topic, qos => QoS, flags => Flags, - %% the column 'headers' will be removed in the next major release - headers => printable_maps(Headers), pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), publish_received_at => Timestamp - } + }, + #{headers => Headers} ). eventmsg_delivered( @@ -472,11 +480,10 @@ eventmsg_delivered( topic => Topic, qos => QoS, flags => Flags, - %% the column 'headers' will be removed in the next major release - headers => printable_maps(Headers), pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), publish_received_at => Timestamp - } + }, + #{headers => Headers} ). eventmsg_acked( @@ -509,12 +516,11 @@ eventmsg_acked( topic => Topic, qos => QoS, flags => Flags, - %% the column 'headers' will be removed in the next major release - headers => printable_maps(Headers), pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), puback_props => printable_maps(emqx_message:get_header(puback_props, Message, #{})), publish_received_at => Timestamp - } + }, + #{headers => Headers} ). eventmsg_delivery_dropped( @@ -549,34 +555,37 @@ eventmsg_delivery_dropped( topic => Topic, qos => QoS, flags => Flags, - %% the column 'headers' will be removed in the next major release - headers => printable_maps(Headers), pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), publish_received_at => Timestamp - } + }, + #{headers => Headers} ). sub_unsub_prop_key('session.subscribed') -> sub_props; sub_unsub_prop_key('session.unsubscribed') -> unsub_props. -with_basic_columns(EventName, Data) when is_map(Data) -> - Data#{ - event => EventName, - timestamp => erlang:system_time(millisecond), - node => node() +with_basic_columns(EventName, Columns, Envs) when is_map(Columns) -> + { + Columns#{ + event => EventName, + timestamp => erlang:system_time(millisecond), + node => node() + }, + Envs }. %%-------------------------------------------------------------------- %% rules applying %%-------------------------------------------------------------------- -apply_event(EventName, GenEventMsg, _Env) -> +apply_event(EventName, GenEventMsg, _Conf) -> EventTopic = event_topic(EventName), case emqx_rule_engine:get_rules_for_topic(EventTopic) of [] -> ok; Rules -> %% delay the generating of eventmsg after we have found some rules to apply - emqx_rule_runtime:apply_rules(Rules, GenEventMsg()) + {Columns, Envs} = GenEventMsg(), + emqx_rule_runtime:apply_rules(Rules, Columns, Envs) end. %%-------------------------------------------------------------------- @@ -777,7 +786,6 @@ columns_with_exam('message.publish') -> {<<"topic">>, <<"t/a">>}, {<<"qos">>, 1}, {<<"flags">>, #{}}, - {<<"headers">>, undefined}, {<<"publish_received_at">>, erlang:system_time(millisecond)}, columns_example_props(pub_props), {<<"timestamp">>, erlang:system_time(millisecond)}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 5d78d7ab8..a0d1c464a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -21,8 +21,8 @@ -include_lib("emqx/include/logger.hrl"). -export([ - apply_rule/2, - apply_rules/2, + apply_rule/3, + apply_rules/3, clear_rule_payload/0 ]). @@ -37,7 +37,7 @@ -compile({no_auto_import, [alias/1]}). --type input() :: map(). +-type columns() :: map(). -type alias() :: atom(). -type collection() :: {alias(), [term()]}. @@ -50,24 +50,24 @@ %%------------------------------------------------------------------------------ %% Apply rules %%------------------------------------------------------------------------------ --spec apply_rules(list(rule()), input()) -> ok. -apply_rules([], _Input) -> +-spec apply_rules(list(rule()), columns(), envs()) -> ok. +apply_rules([], _Columns, _Envs) -> ok; -apply_rules([#{enable := false} | More], Input) -> - apply_rules(More, Input); -apply_rules([Rule | More], Input) -> - apply_rule_discard_result(Rule, Input), - apply_rules(More, Input). +apply_rules([#{enable := false} | More], Columns, Envs) -> + apply_rules(More, Columns, Envs); +apply_rules([Rule | More], Columns, Envs) -> + apply_rule_discard_result(Rule, Columns, Envs), + apply_rules(More, Columns, Envs). -apply_rule_discard_result(Rule, Input) -> - _ = apply_rule(Rule, Input), +apply_rule_discard_result(Rule, Columns, Envs) -> + _ = apply_rule(Rule, Columns, Envs), ok. -apply_rule(Rule = #{id := RuleID}, Input) -> +apply_rule(Rule = #{id := RuleID}, Columns, Envs) -> ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'matched'), clear_rule_payload(), try - do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})) + do_apply_rule(Rule, add_metadata(Columns, #{rule_id => RuleID}), Envs) catch %% ignore the errors if select or match failed _:Reason = {select_and_transform_error, Error} -> @@ -124,13 +124,14 @@ do_apply_rule( conditions := Conditions, actions := Actions }, - Input + Columns, + Envs ) -> {Selected, Collection} = ?RAISE( - select_and_collect(Fields, Input), + select_and_collect(Fields, Columns), {select_and_collect_error, {_EXCLASS_, _EXCPTION_, _ST_}} ), - ColumnsAndSelected = maps:merge(Input, Selected), + ColumnsAndSelected = maps:merge(Columns, Selected), case ?RAISE( match_conditions(Conditions, ColumnsAndSelected), @@ -138,14 +139,15 @@ do_apply_rule( ) of true -> - Collection2 = filter_collection(Input, InCase, DoEach, Collection), + Collection2 = filter_collection(Columns, InCase, DoEach, Collection), case Collection2 of [] -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'); _ -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed') end, - {ok, [handle_action_list(RuleId, Actions, Coll, Input) || Coll <- Collection2]}; + NewEnvs = maps:merge(Columns, Envs), + {ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- Collection2]}; false -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'), {error, nomatch} @@ -158,21 +160,22 @@ do_apply_rule( conditions := Conditions, actions := Actions }, - Input + Columns, + Envs ) -> Selected = ?RAISE( - select_and_transform(Fields, Input), + select_and_transform(Fields, Columns), {select_and_transform_error, {_EXCLASS_, _EXCPTION_, _ST_}} ), case ?RAISE( - match_conditions(Conditions, maps:merge(Input, Selected)), + match_conditions(Conditions, maps:merge(Columns, Selected)), {match_conditions_error, {_EXCLASS_, _EXCPTION_, _ST_}} ) of true -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'), - {ok, handle_action_list(RuleId, Actions, Selected, Input)}; + {ok, handle_action_list(RuleId, Actions, Selected, maps:merge(Columns, Envs))}; false -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'), {error, nomatch} @@ -182,73 +185,73 @@ clear_rule_payload() -> erlang:erase(rule_payload). %% SELECT Clause -select_and_transform(Fields, Input) -> - select_and_transform(Fields, Input, #{}). +select_and_transform(Fields, Columns) -> + select_and_transform(Fields, Columns, #{}). -select_and_transform([], _Input, Action) -> +select_and_transform([], _Columns, Action) -> Action; -select_and_transform(['*' | More], Input, Action) -> - select_and_transform(More, Input, maps:merge(Action, Input)); -select_and_transform([{as, Field, Alias} | More], Input, Action) -> - Val = eval(Field, Input), +select_and_transform(['*' | More], Columns, Action) -> + select_and_transform(More, Columns, maps:merge(Action, Columns)); +select_and_transform([{as, Field, Alias} | More], Columns, Action) -> + Val = eval(Field, Columns), select_and_transform( More, - nested_put(Alias, Val, Input), + nested_put(Alias, Val, Columns), nested_put(Alias, Val, Action) ); -select_and_transform([Field | More], Input, Action) -> - Val = eval(Field, Input), +select_and_transform([Field | More], Columns, Action) -> + Val = eval(Field, Columns), Key = alias(Field), select_and_transform( More, - nested_put(Key, Val, Input), + nested_put(Key, Val, Columns), nested_put(Key, Val, Action) ). %% FOREACH Clause --spec select_and_collect(list(), input()) -> {input(), collection()}. -select_and_collect(Fields, Input) -> - select_and_collect(Fields, Input, {#{}, {'item', []}}). +-spec select_and_collect(list(), columns()) -> {columns(), collection()}. +select_and_collect(Fields, Columns) -> + select_and_collect(Fields, Columns, {#{}, {'item', []}}). -select_and_collect([{as, Field, {_, A} = Alias}], Input, {Action, _}) -> - Val = eval(Field, Input), +select_and_collect([{as, Field, {_, A} = Alias}], Columns, {Action, _}) -> + Val = eval(Field, Columns), {nested_put(Alias, Val, Action), {A, ensure_list(Val)}}; -select_and_collect([{as, Field, Alias} | More], Input, {Action, LastKV}) -> - Val = eval(Field, Input), +select_and_collect([{as, Field, Alias} | More], Columns, {Action, LastKV}) -> + Val = eval(Field, Columns), select_and_collect( More, - nested_put(Alias, Val, Input), + nested_put(Alias, Val, Columns), {nested_put(Alias, Val, Action), LastKV} ); -select_and_collect([Field], Input, {Action, _}) -> - Val = eval(Field, Input), +select_and_collect([Field], Columns, {Action, _}) -> + Val = eval(Field, Columns), Key = alias(Field), {nested_put(Key, Val, Action), {'item', ensure_list(Val)}}; -select_and_collect([Field | More], Input, {Action, LastKV}) -> - Val = eval(Field, Input), +select_and_collect([Field | More], Columns, {Action, LastKV}) -> + Val = eval(Field, Columns), Key = alias(Field), select_and_collect( More, - nested_put(Key, Val, Input), + nested_put(Key, Val, Columns), {nested_put(Key, Val, Action), LastKV} ). %% Filter each item got from FOREACH -filter_collection(Input, InCase, DoEach, {CollKey, CollVal}) -> +filter_collection(Columns, InCase, DoEach, {CollKey, CollVal}) -> lists:filtermap( fun(Item) -> - InputAndItem = maps:merge(Input, #{CollKey => Item}), + ColumnsAndItem = maps:merge(Columns, #{CollKey => Item}), case ?RAISE( - match_conditions(InCase, InputAndItem), + match_conditions(InCase, ColumnsAndItem), {match_incase_error, {_EXCLASS_, _EXCPTION_, _ST_}} ) of - true when DoEach == [] -> {true, InputAndItem}; + true when DoEach == [] -> {true, ColumnsAndItem}; true -> {true, ?RAISE( - select_and_transform(DoEach, InputAndItem), + select_and_transform(DoEach, ColumnsAndItem), {doeach_error, {_EXCLASS_, _EXCPTION_, _ST_}} )}; false -> @@ -356,41 +359,41 @@ eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) -> nested_get({path, Path}, may_decode_payload(Payload)); eval({path, [{key, <<"payload">>} | Path]}, #{<<"payload">> := Payload}) -> nested_get({path, Path}, may_decode_payload(Payload)); -eval({path, _} = Path, Input) -> - nested_get(Path, Input); -eval({range, {Begin, End}}, _Input) -> +eval({path, _} = Path, Columns) -> + nested_get(Path, Columns); +eval({range, {Begin, End}}, _Columns) -> range_gen(Begin, End); -eval({get_range, {Begin, End}, Data}, Input) -> - range_get(Begin, End, eval(Data, Input)); -eval({var, _} = Var, Input) -> - nested_get(Var, Input); -eval({const, Val}, _Input) -> +eval({get_range, {Begin, End}, Data}, Columns) -> + range_get(Begin, End, eval(Data, Columns)); +eval({var, _} = Var, Columns) -> + nested_get(Var, Columns); +eval({const, Val}, _Columns) -> Val; %% unary add -eval({'+', L}, Input) -> - eval(L, Input); +eval({'+', L}, Columns) -> + eval(L, Columns); %% unary subtract -eval({'-', L}, Input) -> - -(eval(L, Input)); -eval({Op, L, R}, Input) when ?is_arith(Op) -> - apply_func(Op, [eval(L, Input), eval(R, Input)], Input); -eval({Op, L, R}, Input) when ?is_comp(Op) -> - compare(Op, eval(L, Input), eval(R, Input)); -eval({list, List}, Input) -> - [eval(L, Input) || L <- List]; -eval({'case', <<>>, CaseClauses, ElseClauses}, Input) -> - eval_case_clauses(CaseClauses, ElseClauses, Input); -eval({'case', CaseOn, CaseClauses, ElseClauses}, Input) -> - eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Input); -eval({'fun', {_, Name}, Args}, Input) -> - apply_func(Name, [eval(Arg, Input) || Arg <- Args], Input). +eval({'-', L}, Columns) -> + -(eval(L, Columns)); +eval({Op, L, R}, Columns) when ?is_arith(Op) -> + apply_func(Op, [eval(L, Columns), eval(R, Columns)], Columns); +eval({Op, L, R}, Columns) when ?is_comp(Op) -> + compare(Op, eval(L, Columns), eval(R, Columns)); +eval({list, List}, Columns) -> + [eval(L, Columns) || L <- List]; +eval({'case', <<>>, CaseClauses, ElseClauses}, Columns) -> + eval_case_clauses(CaseClauses, ElseClauses, Columns); +eval({'case', CaseOn, CaseClauses, ElseClauses}, Columns) -> + eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Columns); +eval({'fun', {_, Name}, Args}, Columns) -> + apply_func(Name, [eval(Arg, Columns) || Arg <- Args], Columns). -handle_alias({path, [{key, <<"payload">>} | _]}, #{payload := Payload} = Input) -> - Input#{payload => may_decode_payload(Payload)}; -handle_alias({path, [{key, <<"payload">>} | _]}, #{<<"payload">> := Payload} = Input) -> - Input#{<<"payload">> => may_decode_payload(Payload)}; -handle_alias(_, Input) -> - Input. +handle_alias({path, [{key, <<"payload">>} | _]}, #{payload := Payload} = Columns) -> + Columns#{payload => may_decode_payload(Payload)}; +handle_alias({path, [{key, <<"payload">>} | _]}, #{<<"payload">> := Payload} = Columns) -> + Columns#{<<"payload">> => may_decode_payload(Payload)}; +handle_alias(_, Columns) -> + Columns. alias({var, Var}) -> {var, Var}; @@ -417,55 +420,55 @@ alias({'fun', Name, _}) -> alias(_) -> ?ephemeral_alias(unknown, unknown). -eval_case_clauses([], ElseClauses, Input) -> +eval_case_clauses([], ElseClauses, Columns) -> case ElseClauses of {} -> undefined; - _ -> eval(ElseClauses, Input) + _ -> eval(ElseClauses, Columns) end; -eval_case_clauses([{Cond, Clause} | CaseClauses], ElseClauses, Input) -> - case match_conditions(Cond, Input) of +eval_case_clauses([{Cond, Clause} | CaseClauses], ElseClauses, Columns) -> + case match_conditions(Cond, Columns) of true -> - eval(Clause, Input); + eval(Clause, Columns); _ -> - eval_case_clauses(CaseClauses, ElseClauses, Input) + eval_case_clauses(CaseClauses, ElseClauses, Columns) end. -eval_switch_clauses(_CaseOn, [], ElseClauses, Input) -> +eval_switch_clauses(_CaseOn, [], ElseClauses, Columns) -> case ElseClauses of {} -> undefined; - _ -> eval(ElseClauses, Input) + _ -> eval(ElseClauses, Columns) end; -eval_switch_clauses(CaseOn, [{Cond, Clause} | CaseClauses], ElseClauses, Input) -> - ConResult = eval(Cond, Input), - case eval(CaseOn, Input) of +eval_switch_clauses(CaseOn, [{Cond, Clause} | CaseClauses], ElseClauses, Columns) -> + ConResult = eval(Cond, Columns), + case eval(CaseOn, Columns) of ConResult -> - eval(Clause, Input); + eval(Clause, Columns); _ -> - eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Input) + eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Columns) end. -apply_func(Name, Args, Input) when is_atom(Name) -> - do_apply_func(Name, Args, Input); -apply_func(Name, Args, Input) when is_binary(Name) -> +apply_func(Name, Args, Columns) when is_atom(Name) -> + do_apply_func(Name, Args, Columns); +apply_func(Name, Args, Columns) when is_binary(Name) -> FunName = try binary_to_existing_atom(Name, utf8) catch error:badarg -> error({sql_function_not_supported, Name}) end, - do_apply_func(FunName, Args, Input). + do_apply_func(FunName, Args, Columns). -do_apply_func(Name, Args, Input) -> +do_apply_func(Name, Args, Columns) -> case erlang:apply(emqx_rule_funcs, Name, Args) of Func when is_function(Func) -> - erlang:apply(Func, [Input]); + erlang:apply(Func, [Columns]); Result -> Result end. -add_metadata(Input, Metadata) when is_map(Input), is_map(Metadata) -> - NewMetadata = maps:merge(maps:get(metadata, Input, #{}), Metadata), - Input#{metadata => NewMetadata}. +add_metadata(Columns, Metadata) when is_map(Columns), is_map(Metadata) -> + NewMetadata = maps:merge(maps:get(metadata, Columns, #{}), Metadata), + Columns#{metadata => NewMetadata}. %%------------------------------------------------------------------------------ %% Internal Functions @@ -495,6 +498,6 @@ safe_decode_and_cache(MaybeJson) -> ensure_list(List) when is_list(List) -> List; ensure_list(_NotList) -> []. -nested_put(Alias, Val, Input0) -> - Input = handle_alias(Alias, Input0), - emqx_rule_maps:nested_put(Alias, Val, Input). +nested_put(Alias, Val, Columns0) -> + Columns = handle_alias(Alias, Columns0), + emqx_rule_maps:nested_put(Alias, Val, Columns). diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 8c1d0cb1d..c333bb80e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -61,7 +61,7 @@ test_rule(Sql, Select, Context, EventTopics) -> created_at => erlang:system_time(millisecond) }, FullContext = fill_default_values(hd(EventTopics), emqx_rule_maps:atom_key_map(Context)), - try emqx_rule_runtime:apply_rule(Rule, FullContext) of + try emqx_rule_runtime:apply_rule(Rule, FullContext, #{}) of {ok, Data} -> {ok, flatten(Data)}; {error, Reason} -> {error, Reason} after diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index a5834d5df..4e0c7dfe3 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -243,7 +243,7 @@ t_add_get_remove_rule(_Config) -> ok. t_add_get_remove_rules(_Config) -> - delete_rules_by_ids(emqx_rule_engine:get_rules()), + delete_rules_by_ids([Id || #{id := Id} <- emqx_rule_engine:get_rules()]), ok = insert_rules( [ make_simple_rule(<<"rule-debug-1">>), @@ -2386,7 +2386,6 @@ verify_event_fields('message.publish', Fields) -> topic := Topic, qos := QoS, flags := Flags, - headers := Headers, pub_props := Properties, timestamp := Timestamp, publish_received_at := EventAt @@ -2402,7 +2401,6 @@ verify_event_fields('message.publish', Fields) -> ?assertEqual(<<"t1">>, Topic), ?assertEqual(1, QoS), ?assert(is_map(Flags)), - ?assert(is_map(Headers)), ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties), ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000), ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000), diff --git a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl index 6f6e06ff9..35ff60831 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl @@ -23,8 +23,6 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --import(emqx_rule_events, [eventmsg_publish/1]). - -define(PROPTEST(F), ?assert(proper:quickcheck(F()))). %%-define(PROPTEST(F), ?assert(proper:quickcheck(F(), [{on_output, fun ct:print/2}]))). @@ -36,6 +34,11 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ok. + +eventmsg_publish(Msg) -> + {Columns, _} = emqx_rule_events:eventmsg_publish(Msg), + Columns. + %%------------------------------------------------------------------------------ %% Test cases for IoT Funcs %%------------------------------------------------------------------------------