Merge pull request #8260 from terry-xiaoyu/remove_headers_field_from_rule_events
fix: remove the 'headers' field from the rule events
This commit is contained in:
commit
89a51ace51
|
@ -118,7 +118,7 @@ unload_hook() ->
|
||||||
on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
|
on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
|
||||||
case maps:get(sys, Flags, false) of
|
case maps:get(sys, Flags, false) of
|
||||||
false ->
|
false ->
|
||||||
Msg = emqx_rule_events:eventmsg_publish(Message),
|
{Msg, _} = emqx_rule_events:eventmsg_publish(Message),
|
||||||
send_to_matched_egress_bridges(Topic, Msg);
|
send_to_matched_egress_bridges(Topic, Msg);
|
||||||
true ->
|
true ->
|
||||||
ok
|
ok
|
||||||
|
|
|
@ -63,7 +63,8 @@ make_pub_vars(Mountpoint, Conf) when is_map(Conf) ->
|
||||||
exp_msg().
|
exp_msg().
|
||||||
to_remote_msg(#message{flags = Flags0} = Msg, Vars) ->
|
to_remote_msg(#message{flags = Flags0} = Msg, Vars) ->
|
||||||
Retain0 = maps:get(retain, Flags0, false),
|
Retain0 = maps:get(retain, Flags0, false),
|
||||||
MapMsg = maps:put(retain, Retain0, emqx_rule_events:eventmsg_publish(Msg)),
|
{Columns, _} = emqx_rule_events:eventmsg_publish(Msg),
|
||||||
|
MapMsg = maps:put(retain, Retain0, Columns),
|
||||||
to_remote_msg(MapMsg, Vars);
|
to_remote_msg(MapMsg, Vars);
|
||||||
to_remote_msg(MapMsg, #{
|
to_remote_msg(MapMsg, #{
|
||||||
remote_topic := TopicToken,
|
remote_topic := TopicToken,
|
||||||
|
|
|
@ -25,6 +25,8 @@
|
||||||
|
|
||||||
-import(hoconsc, [mk/2, ref/2, array/1]).
|
-import(hoconsc, [mk/2, ref/2, array/1]).
|
||||||
|
|
||||||
|
-export([printable_function_name/2]).
|
||||||
|
|
||||||
%% Swagger specs from hocon schema
|
%% Swagger specs from hocon schema
|
||||||
-export([api_spec/0, paths/0, schema/1, namespace/0]).
|
-export([api_spec/0, paths/0, schema/1, namespace/0]).
|
||||||
|
|
||||||
|
|
|
@ -84,10 +84,17 @@ pretty_print_rule(ID) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% erlfmt-ignore
|
%% erlfmt-ignore
|
||||||
format_action(#{func := Func, args := Args}) ->
|
format_action(#{mod := Mod, func := Func, args := Args}) ->
|
||||||
io_lib:format("Function:\n ~p\n"
|
Name = emqx_rule_engine_api:printable_function_name(Mod, Func),
|
||||||
"Args:\n ~p\n"
|
io_lib:format("- Name: ~s\n"
|
||||||
,[Func, maps:without([preprocessed_tmpl], Args)]
|
" Type: function\n"
|
||||||
|
" Args: ~p\n"
|
||||||
|
,[Name, maps:without([preprocessed_tmpl], Args)]
|
||||||
|
);
|
||||||
|
format_action(BridgeChannelId) when is_binary(BridgeChannelId) ->
|
||||||
|
io_lib:format("- Name: ~s\n"
|
||||||
|
" Type: data-bridge\n"
|
||||||
|
,[BridgeChannelId]
|
||||||
).
|
).
|
||||||
|
|
||||||
left_pad(Str) ->
|
left_pad(Str) ->
|
||||||
|
|
|
@ -107,36 +107,41 @@ unload(Topic) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Callbacks
|
%% Callbacks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
on_message_publish(Message = #message{topic = Topic}, _Env) ->
|
on_message_publish(Message = #message{topic = Topic}, _Conf) ->
|
||||||
case ignore_sys_message(Message) of
|
case ignore_sys_message(Message) of
|
||||||
true ->
|
true ->
|
||||||
ok;
|
ok;
|
||||||
false ->
|
false ->
|
||||||
case emqx_rule_engine:get_rules_for_topic(Topic) of
|
case emqx_rule_engine:get_rules_for_topic(Topic) of
|
||||||
[] -> ok;
|
[] ->
|
||||||
Rules -> emqx_rule_runtime:apply_rules(Rules, eventmsg_publish(Message))
|
ok;
|
||||||
|
Rules ->
|
||||||
|
%% ENVs are the fields that can't be refereced by the SQL, but can be used
|
||||||
|
%% from actions. e.g. The 'headers' field in the internal record `#message{}`.
|
||||||
|
{Columns, Envs} = eventmsg_publish(Message),
|
||||||
|
emqx_rule_runtime:apply_rules(Rules, Columns, Envs)
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
{ok, Message}.
|
{ok, Message}.
|
||||||
|
|
||||||
on_bridge_message_received(Message, Env = #{event_topic := BridgeTopic}) ->
|
on_bridge_message_received(Message, Conf = #{event_topic := BridgeTopic}) ->
|
||||||
apply_event(BridgeTopic, fun() -> with_basic_columns(BridgeTopic, Message) end, Env).
|
apply_event(BridgeTopic, fun() -> with_basic_columns(BridgeTopic, Message, #{}) end, Conf).
|
||||||
|
|
||||||
on_client_connected(ClientInfo, ConnInfo, Env) ->
|
on_client_connected(ClientInfo, ConnInfo, Conf) ->
|
||||||
apply_event(
|
apply_event(
|
||||||
'client.connected',
|
'client.connected',
|
||||||
fun() -> eventmsg_connected(ClientInfo, ConnInfo) end,
|
fun() -> eventmsg_connected(ClientInfo, ConnInfo) end,
|
||||||
Env
|
Conf
|
||||||
).
|
).
|
||||||
|
|
||||||
on_client_connack(ConnInfo, Reason, _, Env) ->
|
on_client_connack(ConnInfo, Reason, _, Conf) ->
|
||||||
apply_event(
|
apply_event(
|
||||||
'client.connack',
|
'client.connack',
|
||||||
fun() -> eventmsg_connack(ConnInfo, Reason) end,
|
fun() -> eventmsg_connack(ConnInfo, Reason) end,
|
||||||
Env
|
Conf
|
||||||
).
|
).
|
||||||
|
|
||||||
on_client_check_authz_complete(ClientInfo, PubSub, Topic, Result, AuthzSource, Env) ->
|
on_client_check_authz_complete(ClientInfo, PubSub, Topic, Result, AuthzSource, Conf) ->
|
||||||
apply_event(
|
apply_event(
|
||||||
'client.check_authz_complete',
|
'client.check_authz_complete',
|
||||||
fun() ->
|
fun() ->
|
||||||
|
@ -148,35 +153,35 @@ on_client_check_authz_complete(ClientInfo, PubSub, Topic, Result, AuthzSource, E
|
||||||
AuthzSource
|
AuthzSource
|
||||||
)
|
)
|
||||||
end,
|
end,
|
||||||
Env
|
Conf
|
||||||
).
|
).
|
||||||
|
|
||||||
on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) ->
|
on_client_disconnected(ClientInfo, Reason, ConnInfo, Conf) ->
|
||||||
apply_event(
|
apply_event(
|
||||||
'client.disconnected',
|
'client.disconnected',
|
||||||
fun() -> eventmsg_disconnected(ClientInfo, ConnInfo, Reason) end,
|
fun() -> eventmsg_disconnected(ClientInfo, ConnInfo, Reason) end,
|
||||||
Env
|
Conf
|
||||||
).
|
).
|
||||||
|
|
||||||
on_session_subscribed(ClientInfo, Topic, SubOpts, Env) ->
|
on_session_subscribed(ClientInfo, Topic, SubOpts, Conf) ->
|
||||||
apply_event(
|
apply_event(
|
||||||
'session.subscribed',
|
'session.subscribed',
|
||||||
fun() ->
|
fun() ->
|
||||||
eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts)
|
eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts)
|
||||||
end,
|
end,
|
||||||
Env
|
Conf
|
||||||
).
|
).
|
||||||
|
|
||||||
on_session_unsubscribed(ClientInfo, Topic, SubOpts, Env) ->
|
on_session_unsubscribed(ClientInfo, Topic, SubOpts, Conf) ->
|
||||||
apply_event(
|
apply_event(
|
||||||
'session.unsubscribed',
|
'session.unsubscribed',
|
||||||
fun() ->
|
fun() ->
|
||||||
eventmsg_sub_or_unsub('session.unsubscribed', ClientInfo, Topic, SubOpts)
|
eventmsg_sub_or_unsub('session.unsubscribed', ClientInfo, Topic, SubOpts)
|
||||||
end,
|
end,
|
||||||
Env
|
Conf
|
||||||
).
|
).
|
||||||
|
|
||||||
on_message_dropped(Message, _, Reason, Env) ->
|
on_message_dropped(Message, _, Reason, Conf) ->
|
||||||
case ignore_sys_message(Message) of
|
case ignore_sys_message(Message) of
|
||||||
true ->
|
true ->
|
||||||
ok;
|
ok;
|
||||||
|
@ -184,12 +189,12 @@ on_message_dropped(Message, _, Reason, Env) ->
|
||||||
apply_event(
|
apply_event(
|
||||||
'message.dropped',
|
'message.dropped',
|
||||||
fun() -> eventmsg_dropped(Message, Reason) end,
|
fun() -> eventmsg_dropped(Message, Reason) end,
|
||||||
Env
|
Conf
|
||||||
)
|
)
|
||||||
end,
|
end,
|
||||||
{ok, Message}.
|
{ok, Message}.
|
||||||
|
|
||||||
on_message_delivered(ClientInfo, Message, Env) ->
|
on_message_delivered(ClientInfo, Message, Conf) ->
|
||||||
case ignore_sys_message(Message) of
|
case ignore_sys_message(Message) of
|
||||||
true ->
|
true ->
|
||||||
ok;
|
ok;
|
||||||
|
@ -197,12 +202,12 @@ on_message_delivered(ClientInfo, Message, Env) ->
|
||||||
apply_event(
|
apply_event(
|
||||||
'message.delivered',
|
'message.delivered',
|
||||||
fun() -> eventmsg_delivered(ClientInfo, Message) end,
|
fun() -> eventmsg_delivered(ClientInfo, Message) end,
|
||||||
Env
|
Conf
|
||||||
)
|
)
|
||||||
end,
|
end,
|
||||||
{ok, Message}.
|
{ok, Message}.
|
||||||
|
|
||||||
on_message_acked(ClientInfo, Message, Env) ->
|
on_message_acked(ClientInfo, Message, Conf) ->
|
||||||
case ignore_sys_message(Message) of
|
case ignore_sys_message(Message) of
|
||||||
true ->
|
true ->
|
||||||
ok;
|
ok;
|
||||||
|
@ -210,12 +215,12 @@ on_message_acked(ClientInfo, Message, Env) ->
|
||||||
apply_event(
|
apply_event(
|
||||||
'message.acked',
|
'message.acked',
|
||||||
fun() -> eventmsg_acked(ClientInfo, Message) end,
|
fun() -> eventmsg_acked(ClientInfo, Message) end,
|
||||||
Env
|
Conf
|
||||||
)
|
)
|
||||||
end,
|
end,
|
||||||
{ok, Message}.
|
{ok, Message}.
|
||||||
|
|
||||||
on_delivery_dropped(ClientInfo, Message, Reason, Env) ->
|
on_delivery_dropped(ClientInfo, Message, Reason, Conf) ->
|
||||||
case ignore_sys_message(Message) of
|
case ignore_sys_message(Message) of
|
||||||
true ->
|
true ->
|
||||||
ok;
|
ok;
|
||||||
|
@ -223,7 +228,7 @@ on_delivery_dropped(ClientInfo, Message, Reason, Env) ->
|
||||||
apply_event(
|
apply_event(
|
||||||
'delivery.dropped',
|
'delivery.dropped',
|
||||||
fun() -> eventmsg_delivery_dropped(ClientInfo, Message, Reason) end,
|
fun() -> eventmsg_delivery_dropped(ClientInfo, Message, Reason) end,
|
||||||
Env
|
Conf
|
||||||
)
|
)
|
||||||
end,
|
end,
|
||||||
{ok, Message}.
|
{ok, Message}.
|
||||||
|
@ -256,10 +261,9 @@ eventmsg_publish(
|
||||||
qos => QoS,
|
qos => QoS,
|
||||||
flags => Flags,
|
flags => Flags,
|
||||||
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
||||||
%% the column 'headers' will be removed in the next major release
|
|
||||||
headers => printable_maps(Headers),
|
|
||||||
publish_received_at => Timestamp
|
publish_received_at => Timestamp
|
||||||
}
|
},
|
||||||
|
#{headers => Headers}
|
||||||
).
|
).
|
||||||
|
|
||||||
eventmsg_connected(
|
eventmsg_connected(
|
||||||
|
@ -299,7 +303,8 @@ eventmsg_connected(
|
||||||
is_bridge => IsBridge,
|
is_bridge => IsBridge,
|
||||||
conn_props => printable_maps(ConnProps),
|
conn_props => printable_maps(ConnProps),
|
||||||
connected_at => ConnectedAt
|
connected_at => ConnectedAt
|
||||||
}
|
},
|
||||||
|
#{}
|
||||||
).
|
).
|
||||||
|
|
||||||
eventmsg_disconnected(
|
eventmsg_disconnected(
|
||||||
|
@ -328,7 +333,8 @@ eventmsg_disconnected(
|
||||||
proto_ver => ProtoVer,
|
proto_ver => ProtoVer,
|
||||||
disconn_props => printable_maps(maps:get(disconn_props, ConnInfo, #{})),
|
disconn_props => printable_maps(maps:get(disconn_props, ConnInfo, #{})),
|
||||||
disconnected_at => DisconnectedAt
|
disconnected_at => DisconnectedAt
|
||||||
}
|
},
|
||||||
|
#{}
|
||||||
).
|
).
|
||||||
|
|
||||||
eventmsg_connack(
|
eventmsg_connack(
|
||||||
|
@ -360,7 +366,8 @@ eventmsg_connack(
|
||||||
keepalive => Keepalive,
|
keepalive => Keepalive,
|
||||||
expiry_interval => ExpiryInterval,
|
expiry_interval => ExpiryInterval,
|
||||||
conn_props => printable_maps(ConnProps)
|
conn_props => printable_maps(ConnProps)
|
||||||
}
|
},
|
||||||
|
#{}
|
||||||
).
|
).
|
||||||
|
|
||||||
eventmsg_check_authz_complete(
|
eventmsg_check_authz_complete(
|
||||||
|
@ -384,7 +391,8 @@ eventmsg_check_authz_complete(
|
||||||
action => PubSub,
|
action => PubSub,
|
||||||
authz_source => AuthzSource,
|
authz_source => AuthzSource,
|
||||||
result => Result
|
result => Result
|
||||||
}
|
},
|
||||||
|
#{}
|
||||||
).
|
).
|
||||||
|
|
||||||
eventmsg_sub_or_unsub(
|
eventmsg_sub_or_unsub(
|
||||||
|
@ -407,7 +415,8 @@ eventmsg_sub_or_unsub(
|
||||||
PropKey => printable_maps(maps:get(PropKey, SubOpts, #{})),
|
PropKey => printable_maps(maps:get(PropKey, SubOpts, #{})),
|
||||||
topic => Topic,
|
topic => Topic,
|
||||||
qos => QoS
|
qos => QoS
|
||||||
}
|
},
|
||||||
|
#{}
|
||||||
).
|
).
|
||||||
|
|
||||||
eventmsg_dropped(
|
eventmsg_dropped(
|
||||||
|
@ -435,11 +444,10 @@ eventmsg_dropped(
|
||||||
topic => Topic,
|
topic => Topic,
|
||||||
qos => QoS,
|
qos => QoS,
|
||||||
flags => Flags,
|
flags => Flags,
|
||||||
%% the column 'headers' will be removed in the next major release
|
|
||||||
headers => printable_maps(Headers),
|
|
||||||
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
||||||
publish_received_at => Timestamp
|
publish_received_at => Timestamp
|
||||||
}
|
},
|
||||||
|
#{headers => Headers}
|
||||||
).
|
).
|
||||||
|
|
||||||
eventmsg_delivered(
|
eventmsg_delivered(
|
||||||
|
@ -472,11 +480,10 @@ eventmsg_delivered(
|
||||||
topic => Topic,
|
topic => Topic,
|
||||||
qos => QoS,
|
qos => QoS,
|
||||||
flags => Flags,
|
flags => Flags,
|
||||||
%% the column 'headers' will be removed in the next major release
|
|
||||||
headers => printable_maps(Headers),
|
|
||||||
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
||||||
publish_received_at => Timestamp
|
publish_received_at => Timestamp
|
||||||
}
|
},
|
||||||
|
#{headers => Headers}
|
||||||
).
|
).
|
||||||
|
|
||||||
eventmsg_acked(
|
eventmsg_acked(
|
||||||
|
@ -509,12 +516,11 @@ eventmsg_acked(
|
||||||
topic => Topic,
|
topic => Topic,
|
||||||
qos => QoS,
|
qos => QoS,
|
||||||
flags => Flags,
|
flags => Flags,
|
||||||
%% the column 'headers' will be removed in the next major release
|
|
||||||
headers => printable_maps(Headers),
|
|
||||||
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
||||||
puback_props => printable_maps(emqx_message:get_header(puback_props, Message, #{})),
|
puback_props => printable_maps(emqx_message:get_header(puback_props, Message, #{})),
|
||||||
publish_received_at => Timestamp
|
publish_received_at => Timestamp
|
||||||
}
|
},
|
||||||
|
#{headers => Headers}
|
||||||
).
|
).
|
||||||
|
|
||||||
eventmsg_delivery_dropped(
|
eventmsg_delivery_dropped(
|
||||||
|
@ -549,34 +555,37 @@ eventmsg_delivery_dropped(
|
||||||
topic => Topic,
|
topic => Topic,
|
||||||
qos => QoS,
|
qos => QoS,
|
||||||
flags => Flags,
|
flags => Flags,
|
||||||
%% the column 'headers' will be removed in the next major release
|
|
||||||
headers => printable_maps(Headers),
|
|
||||||
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
||||||
publish_received_at => Timestamp
|
publish_received_at => Timestamp
|
||||||
}
|
},
|
||||||
|
#{headers => Headers}
|
||||||
).
|
).
|
||||||
|
|
||||||
sub_unsub_prop_key('session.subscribed') -> sub_props;
|
sub_unsub_prop_key('session.subscribed') -> sub_props;
|
||||||
sub_unsub_prop_key('session.unsubscribed') -> unsub_props.
|
sub_unsub_prop_key('session.unsubscribed') -> unsub_props.
|
||||||
|
|
||||||
with_basic_columns(EventName, Data) when is_map(Data) ->
|
with_basic_columns(EventName, Columns, Envs) when is_map(Columns) ->
|
||||||
Data#{
|
{
|
||||||
|
Columns#{
|
||||||
event => EventName,
|
event => EventName,
|
||||||
timestamp => erlang:system_time(millisecond),
|
timestamp => erlang:system_time(millisecond),
|
||||||
node => node()
|
node => node()
|
||||||
|
},
|
||||||
|
Envs
|
||||||
}.
|
}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% rules applying
|
%% rules applying
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
apply_event(EventName, GenEventMsg, _Env) ->
|
apply_event(EventName, GenEventMsg, _Conf) ->
|
||||||
EventTopic = event_topic(EventName),
|
EventTopic = event_topic(EventName),
|
||||||
case emqx_rule_engine:get_rules_for_topic(EventTopic) of
|
case emqx_rule_engine:get_rules_for_topic(EventTopic) of
|
||||||
[] ->
|
[] ->
|
||||||
ok;
|
ok;
|
||||||
Rules ->
|
Rules ->
|
||||||
%% delay the generating of eventmsg after we have found some rules to apply
|
%% delay the generating of eventmsg after we have found some rules to apply
|
||||||
emqx_rule_runtime:apply_rules(Rules, GenEventMsg())
|
{Columns, Envs} = GenEventMsg(),
|
||||||
|
emqx_rule_runtime:apply_rules(Rules, Columns, Envs)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -777,7 +786,6 @@ columns_with_exam('message.publish') ->
|
||||||
{<<"topic">>, <<"t/a">>},
|
{<<"topic">>, <<"t/a">>},
|
||||||
{<<"qos">>, 1},
|
{<<"qos">>, 1},
|
||||||
{<<"flags">>, #{}},
|
{<<"flags">>, #{}},
|
||||||
{<<"headers">>, undefined},
|
|
||||||
{<<"publish_received_at">>, erlang:system_time(millisecond)},
|
{<<"publish_received_at">>, erlang:system_time(millisecond)},
|
||||||
columns_example_props(pub_props),
|
columns_example_props(pub_props),
|
||||||
{<<"timestamp">>, erlang:system_time(millisecond)},
|
{<<"timestamp">>, erlang:system_time(millisecond)},
|
||||||
|
|
|
@ -21,8 +21,8 @@
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
apply_rule/2,
|
apply_rule/3,
|
||||||
apply_rules/2,
|
apply_rules/3,
|
||||||
clear_rule_payload/0
|
clear_rule_payload/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -37,7 +37,7 @@
|
||||||
|
|
||||||
-compile({no_auto_import, [alias/1]}).
|
-compile({no_auto_import, [alias/1]}).
|
||||||
|
|
||||||
-type input() :: map().
|
-type columns() :: map().
|
||||||
-type alias() :: atom().
|
-type alias() :: atom().
|
||||||
-type collection() :: {alias(), [term()]}.
|
-type collection() :: {alias(), [term()]}.
|
||||||
|
|
||||||
|
@ -50,24 +50,24 @@
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Apply rules
|
%% Apply rules
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec apply_rules(list(rule()), input()) -> ok.
|
-spec apply_rules(list(rule()), columns(), envs()) -> ok.
|
||||||
apply_rules([], _Input) ->
|
apply_rules([], _Columns, _Envs) ->
|
||||||
ok;
|
ok;
|
||||||
apply_rules([#{enable := false} | More], Input) ->
|
apply_rules([#{enable := false} | More], Columns, Envs) ->
|
||||||
apply_rules(More, Input);
|
apply_rules(More, Columns, Envs);
|
||||||
apply_rules([Rule | More], Input) ->
|
apply_rules([Rule | More], Columns, Envs) ->
|
||||||
apply_rule_discard_result(Rule, Input),
|
apply_rule_discard_result(Rule, Columns, Envs),
|
||||||
apply_rules(More, Input).
|
apply_rules(More, Columns, Envs).
|
||||||
|
|
||||||
apply_rule_discard_result(Rule, Input) ->
|
apply_rule_discard_result(Rule, Columns, Envs) ->
|
||||||
_ = apply_rule(Rule, Input),
|
_ = apply_rule(Rule, Columns, Envs),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
apply_rule(Rule = #{id := RuleID}, Input) ->
|
apply_rule(Rule = #{id := RuleID}, Columns, Envs) ->
|
||||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'matched'),
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'matched'),
|
||||||
clear_rule_payload(),
|
clear_rule_payload(),
|
||||||
try
|
try
|
||||||
do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID}))
|
do_apply_rule(Rule, add_metadata(Columns, #{rule_id => RuleID}), Envs)
|
||||||
catch
|
catch
|
||||||
%% ignore the errors if select or match failed
|
%% ignore the errors if select or match failed
|
||||||
_:Reason = {select_and_transform_error, Error} ->
|
_:Reason = {select_and_transform_error, Error} ->
|
||||||
|
@ -124,13 +124,14 @@ do_apply_rule(
|
||||||
conditions := Conditions,
|
conditions := Conditions,
|
||||||
actions := Actions
|
actions := Actions
|
||||||
},
|
},
|
||||||
Input
|
Columns,
|
||||||
|
Envs
|
||||||
) ->
|
) ->
|
||||||
{Selected, Collection} = ?RAISE(
|
{Selected, Collection} = ?RAISE(
|
||||||
select_and_collect(Fields, Input),
|
select_and_collect(Fields, Columns),
|
||||||
{select_and_collect_error, {_EXCLASS_, _EXCPTION_, _ST_}}
|
{select_and_collect_error, {_EXCLASS_, _EXCPTION_, _ST_}}
|
||||||
),
|
),
|
||||||
ColumnsAndSelected = maps:merge(Input, Selected),
|
ColumnsAndSelected = maps:merge(Columns, Selected),
|
||||||
case
|
case
|
||||||
?RAISE(
|
?RAISE(
|
||||||
match_conditions(Conditions, ColumnsAndSelected),
|
match_conditions(Conditions, ColumnsAndSelected),
|
||||||
|
@ -138,14 +139,15 @@ do_apply_rule(
|
||||||
)
|
)
|
||||||
of
|
of
|
||||||
true ->
|
true ->
|
||||||
Collection2 = filter_collection(Input, InCase, DoEach, Collection),
|
Collection2 = filter_collection(Columns, InCase, DoEach, Collection),
|
||||||
case Collection2 of
|
case Collection2 of
|
||||||
[] ->
|
[] ->
|
||||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result');
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result');
|
||||||
_ ->
|
_ ->
|
||||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed')
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed')
|
||||||
end,
|
end,
|
||||||
{ok, [handle_action_list(RuleId, Actions, Coll, Input) || Coll <- Collection2]};
|
NewEnvs = maps:merge(Columns, Envs),
|
||||||
|
{ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- Collection2]};
|
||||||
false ->
|
false ->
|
||||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
|
||||||
{error, nomatch}
|
{error, nomatch}
|
||||||
|
@ -158,21 +160,22 @@ do_apply_rule(
|
||||||
conditions := Conditions,
|
conditions := Conditions,
|
||||||
actions := Actions
|
actions := Actions
|
||||||
},
|
},
|
||||||
Input
|
Columns,
|
||||||
|
Envs
|
||||||
) ->
|
) ->
|
||||||
Selected = ?RAISE(
|
Selected = ?RAISE(
|
||||||
select_and_transform(Fields, Input),
|
select_and_transform(Fields, Columns),
|
||||||
{select_and_transform_error, {_EXCLASS_, _EXCPTION_, _ST_}}
|
{select_and_transform_error, {_EXCLASS_, _EXCPTION_, _ST_}}
|
||||||
),
|
),
|
||||||
case
|
case
|
||||||
?RAISE(
|
?RAISE(
|
||||||
match_conditions(Conditions, maps:merge(Input, Selected)),
|
match_conditions(Conditions, maps:merge(Columns, Selected)),
|
||||||
{match_conditions_error, {_EXCLASS_, _EXCPTION_, _ST_}}
|
{match_conditions_error, {_EXCLASS_, _EXCPTION_, _ST_}}
|
||||||
)
|
)
|
||||||
of
|
of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'),
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'),
|
||||||
{ok, handle_action_list(RuleId, Actions, Selected, Input)};
|
{ok, handle_action_list(RuleId, Actions, Selected, maps:merge(Columns, Envs))};
|
||||||
false ->
|
false ->
|
||||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
|
||||||
{error, nomatch}
|
{error, nomatch}
|
||||||
|
@ -182,73 +185,73 @@ clear_rule_payload() ->
|
||||||
erlang:erase(rule_payload).
|
erlang:erase(rule_payload).
|
||||||
|
|
||||||
%% SELECT Clause
|
%% SELECT Clause
|
||||||
select_and_transform(Fields, Input) ->
|
select_and_transform(Fields, Columns) ->
|
||||||
select_and_transform(Fields, Input, #{}).
|
select_and_transform(Fields, Columns, #{}).
|
||||||
|
|
||||||
select_and_transform([], _Input, Action) ->
|
select_and_transform([], _Columns, Action) ->
|
||||||
Action;
|
Action;
|
||||||
select_and_transform(['*' | More], Input, Action) ->
|
select_and_transform(['*' | More], Columns, Action) ->
|
||||||
select_and_transform(More, Input, maps:merge(Action, Input));
|
select_and_transform(More, Columns, maps:merge(Action, Columns));
|
||||||
select_and_transform([{as, Field, Alias} | More], Input, Action) ->
|
select_and_transform([{as, Field, Alias} | More], Columns, Action) ->
|
||||||
Val = eval(Field, Input),
|
Val = eval(Field, Columns),
|
||||||
select_and_transform(
|
select_and_transform(
|
||||||
More,
|
More,
|
||||||
nested_put(Alias, Val, Input),
|
nested_put(Alias, Val, Columns),
|
||||||
nested_put(Alias, Val, Action)
|
nested_put(Alias, Val, Action)
|
||||||
);
|
);
|
||||||
select_and_transform([Field | More], Input, Action) ->
|
select_and_transform([Field | More], Columns, Action) ->
|
||||||
Val = eval(Field, Input),
|
Val = eval(Field, Columns),
|
||||||
Key = alias(Field),
|
Key = alias(Field),
|
||||||
select_and_transform(
|
select_and_transform(
|
||||||
More,
|
More,
|
||||||
nested_put(Key, Val, Input),
|
nested_put(Key, Val, Columns),
|
||||||
nested_put(Key, Val, Action)
|
nested_put(Key, Val, Action)
|
||||||
).
|
).
|
||||||
|
|
||||||
%% FOREACH Clause
|
%% FOREACH Clause
|
||||||
-spec select_and_collect(list(), input()) -> {input(), collection()}.
|
-spec select_and_collect(list(), columns()) -> {columns(), collection()}.
|
||||||
select_and_collect(Fields, Input) ->
|
select_and_collect(Fields, Columns) ->
|
||||||
select_and_collect(Fields, Input, {#{}, {'item', []}}).
|
select_and_collect(Fields, Columns, {#{}, {'item', []}}).
|
||||||
|
|
||||||
select_and_collect([{as, Field, {_, A} = Alias}], Input, {Action, _}) ->
|
select_and_collect([{as, Field, {_, A} = Alias}], Columns, {Action, _}) ->
|
||||||
Val = eval(Field, Input),
|
Val = eval(Field, Columns),
|
||||||
{nested_put(Alias, Val, Action), {A, ensure_list(Val)}};
|
{nested_put(Alias, Val, Action), {A, ensure_list(Val)}};
|
||||||
select_and_collect([{as, Field, Alias} | More], Input, {Action, LastKV}) ->
|
select_and_collect([{as, Field, Alias} | More], Columns, {Action, LastKV}) ->
|
||||||
Val = eval(Field, Input),
|
Val = eval(Field, Columns),
|
||||||
select_and_collect(
|
select_and_collect(
|
||||||
More,
|
More,
|
||||||
nested_put(Alias, Val, Input),
|
nested_put(Alias, Val, Columns),
|
||||||
{nested_put(Alias, Val, Action), LastKV}
|
{nested_put(Alias, Val, Action), LastKV}
|
||||||
);
|
);
|
||||||
select_and_collect([Field], Input, {Action, _}) ->
|
select_and_collect([Field], Columns, {Action, _}) ->
|
||||||
Val = eval(Field, Input),
|
Val = eval(Field, Columns),
|
||||||
Key = alias(Field),
|
Key = alias(Field),
|
||||||
{nested_put(Key, Val, Action), {'item', ensure_list(Val)}};
|
{nested_put(Key, Val, Action), {'item', ensure_list(Val)}};
|
||||||
select_and_collect([Field | More], Input, {Action, LastKV}) ->
|
select_and_collect([Field | More], Columns, {Action, LastKV}) ->
|
||||||
Val = eval(Field, Input),
|
Val = eval(Field, Columns),
|
||||||
Key = alias(Field),
|
Key = alias(Field),
|
||||||
select_and_collect(
|
select_and_collect(
|
||||||
More,
|
More,
|
||||||
nested_put(Key, Val, Input),
|
nested_put(Key, Val, Columns),
|
||||||
{nested_put(Key, Val, Action), LastKV}
|
{nested_put(Key, Val, Action), LastKV}
|
||||||
).
|
).
|
||||||
|
|
||||||
%% Filter each item got from FOREACH
|
%% Filter each item got from FOREACH
|
||||||
filter_collection(Input, InCase, DoEach, {CollKey, CollVal}) ->
|
filter_collection(Columns, InCase, DoEach, {CollKey, CollVal}) ->
|
||||||
lists:filtermap(
|
lists:filtermap(
|
||||||
fun(Item) ->
|
fun(Item) ->
|
||||||
InputAndItem = maps:merge(Input, #{CollKey => Item}),
|
ColumnsAndItem = maps:merge(Columns, #{CollKey => Item}),
|
||||||
case
|
case
|
||||||
?RAISE(
|
?RAISE(
|
||||||
match_conditions(InCase, InputAndItem),
|
match_conditions(InCase, ColumnsAndItem),
|
||||||
{match_incase_error, {_EXCLASS_, _EXCPTION_, _ST_}}
|
{match_incase_error, {_EXCLASS_, _EXCPTION_, _ST_}}
|
||||||
)
|
)
|
||||||
of
|
of
|
||||||
true when DoEach == [] -> {true, InputAndItem};
|
true when DoEach == [] -> {true, ColumnsAndItem};
|
||||||
true ->
|
true ->
|
||||||
{true,
|
{true,
|
||||||
?RAISE(
|
?RAISE(
|
||||||
select_and_transform(DoEach, InputAndItem),
|
select_and_transform(DoEach, ColumnsAndItem),
|
||||||
{doeach_error, {_EXCLASS_, _EXCPTION_, _ST_}}
|
{doeach_error, {_EXCLASS_, _EXCPTION_, _ST_}}
|
||||||
)};
|
)};
|
||||||
false ->
|
false ->
|
||||||
|
@ -356,41 +359,41 @@ eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->
|
||||||
nested_get({path, Path}, may_decode_payload(Payload));
|
nested_get({path, Path}, may_decode_payload(Payload));
|
||||||
eval({path, [{key, <<"payload">>} | Path]}, #{<<"payload">> := Payload}) ->
|
eval({path, [{key, <<"payload">>} | Path]}, #{<<"payload">> := Payload}) ->
|
||||||
nested_get({path, Path}, may_decode_payload(Payload));
|
nested_get({path, Path}, may_decode_payload(Payload));
|
||||||
eval({path, _} = Path, Input) ->
|
eval({path, _} = Path, Columns) ->
|
||||||
nested_get(Path, Input);
|
nested_get(Path, Columns);
|
||||||
eval({range, {Begin, End}}, _Input) ->
|
eval({range, {Begin, End}}, _Columns) ->
|
||||||
range_gen(Begin, End);
|
range_gen(Begin, End);
|
||||||
eval({get_range, {Begin, End}, Data}, Input) ->
|
eval({get_range, {Begin, End}, Data}, Columns) ->
|
||||||
range_get(Begin, End, eval(Data, Input));
|
range_get(Begin, End, eval(Data, Columns));
|
||||||
eval({var, _} = Var, Input) ->
|
eval({var, _} = Var, Columns) ->
|
||||||
nested_get(Var, Input);
|
nested_get(Var, Columns);
|
||||||
eval({const, Val}, _Input) ->
|
eval({const, Val}, _Columns) ->
|
||||||
Val;
|
Val;
|
||||||
%% unary add
|
%% unary add
|
||||||
eval({'+', L}, Input) ->
|
eval({'+', L}, Columns) ->
|
||||||
eval(L, Input);
|
eval(L, Columns);
|
||||||
%% unary subtract
|
%% unary subtract
|
||||||
eval({'-', L}, Input) ->
|
eval({'-', L}, Columns) ->
|
||||||
-(eval(L, Input));
|
-(eval(L, Columns));
|
||||||
eval({Op, L, R}, Input) when ?is_arith(Op) ->
|
eval({Op, L, R}, Columns) when ?is_arith(Op) ->
|
||||||
apply_func(Op, [eval(L, Input), eval(R, Input)], Input);
|
apply_func(Op, [eval(L, Columns), eval(R, Columns)], Columns);
|
||||||
eval({Op, L, R}, Input) when ?is_comp(Op) ->
|
eval({Op, L, R}, Columns) when ?is_comp(Op) ->
|
||||||
compare(Op, eval(L, Input), eval(R, Input));
|
compare(Op, eval(L, Columns), eval(R, Columns));
|
||||||
eval({list, List}, Input) ->
|
eval({list, List}, Columns) ->
|
||||||
[eval(L, Input) || L <- List];
|
[eval(L, Columns) || L <- List];
|
||||||
eval({'case', <<>>, CaseClauses, ElseClauses}, Input) ->
|
eval({'case', <<>>, CaseClauses, ElseClauses}, Columns) ->
|
||||||
eval_case_clauses(CaseClauses, ElseClauses, Input);
|
eval_case_clauses(CaseClauses, ElseClauses, Columns);
|
||||||
eval({'case', CaseOn, CaseClauses, ElseClauses}, Input) ->
|
eval({'case', CaseOn, CaseClauses, ElseClauses}, Columns) ->
|
||||||
eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Input);
|
eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Columns);
|
||||||
eval({'fun', {_, Name}, Args}, Input) ->
|
eval({'fun', {_, Name}, Args}, Columns) ->
|
||||||
apply_func(Name, [eval(Arg, Input) || Arg <- Args], Input).
|
apply_func(Name, [eval(Arg, Columns) || Arg <- Args], Columns).
|
||||||
|
|
||||||
handle_alias({path, [{key, <<"payload">>} | _]}, #{payload := Payload} = Input) ->
|
handle_alias({path, [{key, <<"payload">>} | _]}, #{payload := Payload} = Columns) ->
|
||||||
Input#{payload => may_decode_payload(Payload)};
|
Columns#{payload => may_decode_payload(Payload)};
|
||||||
handle_alias({path, [{key, <<"payload">>} | _]}, #{<<"payload">> := Payload} = Input) ->
|
handle_alias({path, [{key, <<"payload">>} | _]}, #{<<"payload">> := Payload} = Columns) ->
|
||||||
Input#{<<"payload">> => may_decode_payload(Payload)};
|
Columns#{<<"payload">> => may_decode_payload(Payload)};
|
||||||
handle_alias(_, Input) ->
|
handle_alias(_, Columns) ->
|
||||||
Input.
|
Columns.
|
||||||
|
|
||||||
alias({var, Var}) ->
|
alias({var, Var}) ->
|
||||||
{var, Var};
|
{var, Var};
|
||||||
|
@ -417,55 +420,55 @@ alias({'fun', Name, _}) ->
|
||||||
alias(_) ->
|
alias(_) ->
|
||||||
?ephemeral_alias(unknown, unknown).
|
?ephemeral_alias(unknown, unknown).
|
||||||
|
|
||||||
eval_case_clauses([], ElseClauses, Input) ->
|
eval_case_clauses([], ElseClauses, Columns) ->
|
||||||
case ElseClauses of
|
case ElseClauses of
|
||||||
{} -> undefined;
|
{} -> undefined;
|
||||||
_ -> eval(ElseClauses, Input)
|
_ -> eval(ElseClauses, Columns)
|
||||||
end;
|
end;
|
||||||
eval_case_clauses([{Cond, Clause} | CaseClauses], ElseClauses, Input) ->
|
eval_case_clauses([{Cond, Clause} | CaseClauses], ElseClauses, Columns) ->
|
||||||
case match_conditions(Cond, Input) of
|
case match_conditions(Cond, Columns) of
|
||||||
true ->
|
true ->
|
||||||
eval(Clause, Input);
|
eval(Clause, Columns);
|
||||||
_ ->
|
_ ->
|
||||||
eval_case_clauses(CaseClauses, ElseClauses, Input)
|
eval_case_clauses(CaseClauses, ElseClauses, Columns)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
eval_switch_clauses(_CaseOn, [], ElseClauses, Input) ->
|
eval_switch_clauses(_CaseOn, [], ElseClauses, Columns) ->
|
||||||
case ElseClauses of
|
case ElseClauses of
|
||||||
{} -> undefined;
|
{} -> undefined;
|
||||||
_ -> eval(ElseClauses, Input)
|
_ -> eval(ElseClauses, Columns)
|
||||||
end;
|
end;
|
||||||
eval_switch_clauses(CaseOn, [{Cond, Clause} | CaseClauses], ElseClauses, Input) ->
|
eval_switch_clauses(CaseOn, [{Cond, Clause} | CaseClauses], ElseClauses, Columns) ->
|
||||||
ConResult = eval(Cond, Input),
|
ConResult = eval(Cond, Columns),
|
||||||
case eval(CaseOn, Input) of
|
case eval(CaseOn, Columns) of
|
||||||
ConResult ->
|
ConResult ->
|
||||||
eval(Clause, Input);
|
eval(Clause, Columns);
|
||||||
_ ->
|
_ ->
|
||||||
eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Input)
|
eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Columns)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
apply_func(Name, Args, Input) when is_atom(Name) ->
|
apply_func(Name, Args, Columns) when is_atom(Name) ->
|
||||||
do_apply_func(Name, Args, Input);
|
do_apply_func(Name, Args, Columns);
|
||||||
apply_func(Name, Args, Input) when is_binary(Name) ->
|
apply_func(Name, Args, Columns) when is_binary(Name) ->
|
||||||
FunName =
|
FunName =
|
||||||
try
|
try
|
||||||
binary_to_existing_atom(Name, utf8)
|
binary_to_existing_atom(Name, utf8)
|
||||||
catch
|
catch
|
||||||
error:badarg -> error({sql_function_not_supported, Name})
|
error:badarg -> error({sql_function_not_supported, Name})
|
||||||
end,
|
end,
|
||||||
do_apply_func(FunName, Args, Input).
|
do_apply_func(FunName, Args, Columns).
|
||||||
|
|
||||||
do_apply_func(Name, Args, Input) ->
|
do_apply_func(Name, Args, Columns) ->
|
||||||
case erlang:apply(emqx_rule_funcs, Name, Args) of
|
case erlang:apply(emqx_rule_funcs, Name, Args) of
|
||||||
Func when is_function(Func) ->
|
Func when is_function(Func) ->
|
||||||
erlang:apply(Func, [Input]);
|
erlang:apply(Func, [Columns]);
|
||||||
Result ->
|
Result ->
|
||||||
Result
|
Result
|
||||||
end.
|
end.
|
||||||
|
|
||||||
add_metadata(Input, Metadata) when is_map(Input), is_map(Metadata) ->
|
add_metadata(Columns, Metadata) when is_map(Columns), is_map(Metadata) ->
|
||||||
NewMetadata = maps:merge(maps:get(metadata, Input, #{}), Metadata),
|
NewMetadata = maps:merge(maps:get(metadata, Columns, #{}), Metadata),
|
||||||
Input#{metadata => NewMetadata}.
|
Columns#{metadata => NewMetadata}.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Internal Functions
|
%% Internal Functions
|
||||||
|
@ -495,6 +498,6 @@ safe_decode_and_cache(MaybeJson) ->
|
||||||
ensure_list(List) when is_list(List) -> List;
|
ensure_list(List) when is_list(List) -> List;
|
||||||
ensure_list(_NotList) -> [].
|
ensure_list(_NotList) -> [].
|
||||||
|
|
||||||
nested_put(Alias, Val, Input0) ->
|
nested_put(Alias, Val, Columns0) ->
|
||||||
Input = handle_alias(Alias, Input0),
|
Columns = handle_alias(Alias, Columns0),
|
||||||
emqx_rule_maps:nested_put(Alias, Val, Input).
|
emqx_rule_maps:nested_put(Alias, Val, Columns).
|
||||||
|
|
|
@ -61,7 +61,7 @@ test_rule(Sql, Select, Context, EventTopics) ->
|
||||||
created_at => erlang:system_time(millisecond)
|
created_at => erlang:system_time(millisecond)
|
||||||
},
|
},
|
||||||
FullContext = fill_default_values(hd(EventTopics), emqx_rule_maps:atom_key_map(Context)),
|
FullContext = fill_default_values(hd(EventTopics), emqx_rule_maps:atom_key_map(Context)),
|
||||||
try emqx_rule_runtime:apply_rule(Rule, FullContext) of
|
try emqx_rule_runtime:apply_rule(Rule, FullContext, #{}) of
|
||||||
{ok, Data} -> {ok, flatten(Data)};
|
{ok, Data} -> {ok, flatten(Data)};
|
||||||
{error, Reason} -> {error, Reason}
|
{error, Reason} -> {error, Reason}
|
||||||
after
|
after
|
||||||
|
|
|
@ -243,7 +243,7 @@ t_add_get_remove_rule(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_add_get_remove_rules(_Config) ->
|
t_add_get_remove_rules(_Config) ->
|
||||||
delete_rules_by_ids(emqx_rule_engine:get_rules()),
|
delete_rules_by_ids([Id || #{id := Id} <- emqx_rule_engine:get_rules()]),
|
||||||
ok = insert_rules(
|
ok = insert_rules(
|
||||||
[
|
[
|
||||||
make_simple_rule(<<"rule-debug-1">>),
|
make_simple_rule(<<"rule-debug-1">>),
|
||||||
|
@ -2386,7 +2386,6 @@ verify_event_fields('message.publish', Fields) ->
|
||||||
topic := Topic,
|
topic := Topic,
|
||||||
qos := QoS,
|
qos := QoS,
|
||||||
flags := Flags,
|
flags := Flags,
|
||||||
headers := Headers,
|
|
||||||
pub_props := Properties,
|
pub_props := Properties,
|
||||||
timestamp := Timestamp,
|
timestamp := Timestamp,
|
||||||
publish_received_at := EventAt
|
publish_received_at := EventAt
|
||||||
|
@ -2402,7 +2401,6 @@ verify_event_fields('message.publish', Fields) ->
|
||||||
?assertEqual(<<"t1">>, Topic),
|
?assertEqual(<<"t1">>, Topic),
|
||||||
?assertEqual(1, QoS),
|
?assertEqual(1, QoS),
|
||||||
?assert(is_map(Flags)),
|
?assert(is_map(Flags)),
|
||||||
?assert(is_map(Headers)),
|
|
||||||
?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
|
?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
|
||||||
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
|
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
|
||||||
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
|
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
|
||||||
|
|
|
@ -23,8 +23,6 @@
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
-import(emqx_rule_events, [eventmsg_publish/1]).
|
|
||||||
|
|
||||||
-define(PROPTEST(F), ?assert(proper:quickcheck(F()))).
|
-define(PROPTEST(F), ?assert(proper:quickcheck(F()))).
|
||||||
%%-define(PROPTEST(F), ?assert(proper:quickcheck(F(), [{on_output, fun ct:print/2}]))).
|
%%-define(PROPTEST(F), ?assert(proper:quickcheck(F(), [{on_output, fun ct:print/2}]))).
|
||||||
|
|
||||||
|
@ -36,6 +34,11 @@ init_per_suite(Config) ->
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
eventmsg_publish(Msg) ->
|
||||||
|
{Columns, _} = emqx_rule_events:eventmsg_publish(Msg),
|
||||||
|
Columns.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Test cases for IoT Funcs
|
%% Test cases for IoT Funcs
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue