diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index f02836b22..591bfa8c3 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -213,16 +213,15 @@ maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopi format_msg_received(#{dup := Dup, payload := Payload, properties := Props, qos := QoS, retain := Retain, topic := Topic}) -> - #{event => '$bridges/mqtt', - id => emqx_guid:to_hexstr(emqx_guid:gen()), - payload => Payload, - topic => Topic, - qos => QoS, - dup => Dup, - retain => Retain, - pub_props => printable_maps(Props), - timestamp => erlang:system_time(millisecond) - }. + #{ id => emqx_guid:to_hexstr(emqx_guid:gen()) + , payload => Payload + , topic => Topic + , qos => QoS + , dup => Dup + , retain => Retain + , pub_props => printable_maps(Props) + , message_received_at => erlang:system_time(millisecond) + }. printable_maps(undefined) -> #{}; printable_maps(Headers) -> diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 7d2b55275..6490e21fe 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -537,7 +537,7 @@ t_ingress_mqtt_bridge_with_rules(_) -> } = jsx:decode(Rule1), %% we also check if the outputs of the rule is triggered ?assertMatch(#{inspect := #{ - event := '$bridges/mqtt', + event := <<"$bridges/mqtt", _/binary>>, id := MsgId, payload := Payload, topic := RemoteTopic, diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index a0a33470b..1b45ddb0f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -72,12 +72,9 @@ reload() -> ok = emqx_rule_engine:load_hooks_for_rule(Rule) end, emqx_rule_engine:get_rules()). -load(<<"$bridges/", _BridgeId/binary>> = BridgeTopic) -> - emqx_hooks:put(BridgeTopic, {?MODULE, on_bridge_message_received, - [#{bridge_topic => BridgeTopic}]}); load(Topic) -> HookPoint = event_name(Topic), - emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [[]]}). + emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [#{event_topic => Topic}]}). unload() -> lists:foreach(fun(HookPoint) -> @@ -91,12 +88,6 @@ unload(Topic) -> %%-------------------------------------------------------------------- %% Callbacks %%-------------------------------------------------------------------- -on_bridge_message_received(Message, #{bridge_topic := BridgeTopic}) -> - case emqx_rule_engine:get_rules_for_topic(BridgeTopic) of - [] -> ok; - Rules -> emqx_rule_runtime:apply_rules(Rules, Message) - end. - on_message_publish(Message = #message{topic = Topic}, _Env) -> case ignore_sys_message(Message) of true -> ok; @@ -108,6 +99,9 @@ on_message_publish(Message = #message{topic = Topic}, _Env) -> end, {ok, Message}. +on_bridge_message_received(Message, Env = #{event_topic := BridgeTopic}) -> + apply_event(BridgeTopic, fun() -> with_basic_columns(BridgeTopic, Message) end, Env). + on_client_connected(ClientInfo, ConnInfo, Env) -> apply_event('client.connected', fun() -> eventmsg_connected(ClientInfo, ConnInfo) end, Env). @@ -364,7 +358,9 @@ apply_event(EventName, GenEventMsg, _Env) -> EventTopic = event_topic(EventName), case emqx_rule_engine:get_rules_for_topic(EventTopic) of [] -> ok; - Rules -> emqx_rule_runtime:apply_rules(Rules, GenEventMsg()) + Rules -> + %% delay the generating of eventmsg after we have found some rules to apply + emqx_rule_runtime:apply_rules(Rules, GenEventMsg()) end. %%-------------------------------------------------------------------- @@ -383,6 +379,7 @@ event_info() -> , event_info_session_subscribed() , event_info_session_unsubscribed() , event_info_delivery_dropped() + , event_info_bridge_mqtt() ]. event_info_message_publish() -> @@ -449,6 +446,13 @@ event_info_session_unsubscribed() -> {<<"session unsubscribed">>, <<"会话取消订阅完成"/utf8>>}, <<"SELECT * FROM \"$events/session_unsubscribed\" WHERE topic =~ 't/#'">> ). +event_info_bridge_mqtt()-> + event_info_common( + <<"$bridges/mqtt:*">>, + {<<"MQTT bridge message">>, <<"MQTT 桥接消息"/utf8>>}, + {<<"received a message from MQTT bridge">>, <<"收到来自 MQTT 桥接的消息"/utf8>>}, + <<"SELECT * FROM \"$bridges/mqtt:my_mqtt_bridge\" WHERE topic =~ 't/#'">> + ). event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) -> #{event => event_topic(Event), @@ -460,46 +464,51 @@ event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) -> }. test_columns('message.dropped') -> - [ {<<"reason">>, <<"no_subscribers">>} + [ {<<"reason">>, [<<"no_subscribers">>, <<"the reason of dropping">>]} ] ++ test_columns('message.publish'); test_columns('message.publish') -> - [ {<<"clientid">>, <<"c_emqx">>} - , {<<"username">>, <<"u_emqx">>} - , {<<"topic">>, <<"t/a">>} - , {<<"qos">>, 1} - , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} + [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid of the sender">>]} + , {<<"username">>, [<<"u_emqx">>, <<"the username of the sender">>]} + , {<<"topic">>, [<<"t/a">>, <<"the topic of the MQTT message">>]} + , {<<"qos">>, [1, <<"the QoS of the MQTT message">>]} + , {<<"payload">>, [<<"{\"msg\": \"hello\"}">>, <<"the payload of the MQTT message">>]} ]; test_columns('delivery.dropped') -> - [ {<<"reason">>, <<"queue_full">>} + [ {<<"reason">>, [<<"queue_full">>, <<"the reason of dropping">>]} ] ++ test_columns('message.delivered'); test_columns('message.acked') -> test_columns('message.delivered'); test_columns('message.delivered') -> - [ {<<"from_clientid">>, <<"c_emqx_1">>} - , {<<"from_username">>, <<"u_emqx_1">>} - , {<<"clientid">>, <<"c_emqx_2">>} - , {<<"username">>, <<"u_emqx_2">>} - , {<<"topic">>, <<"t/a">>} - , {<<"qos">>, 1} - , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} + [ {<<"from_clientid">>, [<<"c_emqx_1">>, <<"the clientid of the sender">>]} + , {<<"from_username">>, [<<"u_emqx_1">>, <<"the username of the sender">>]} + , {<<"clientid">>, [<<"c_emqx_2">>, <<"the clientid of the receiver">>]} + , {<<"username">>, [<<"u_emqx_2">>, <<"the username of the receiver">>]} + , {<<"topic">>, [<<"t/a">>, <<"the topic of the MQTT message">>]} + , {<<"qos">>, [1, <<"the QoS of the MQTT message">>]} + , {<<"payload">>, [<<"{\"msg\": \"hello\"}">>, <<"the payload of the MQTT message">>]} ]; test_columns('client.connected') -> - [ {<<"clientid">>, <<"c_emqx">>} - , {<<"username">>, <<"u_emqx">>} - , {<<"peername">>, <<"127.0.0.1:52918">>} + [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]} + , {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]} + , {<<"peername">>, [<<"127.0.0.1:52918">>, <<"the IP address and port of the client">>]} ]; test_columns('client.disconnected') -> - [ {<<"clientid">>, <<"c_emqx">>} - , {<<"username">>, <<"u_emqx">>} - , {<<"reason">>, <<"normal">>} + [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]} + , {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]} + , {<<"reason">>, [<<"normal">>, <<"the reason for shutdown">>]} ]; test_columns('session.unsubscribed') -> test_columns('session.subscribed'); test_columns('session.subscribed') -> - [ {<<"clientid">>, <<"c_emqx">>} - , {<<"username">>, <<"u_emqx">>} - , {<<"topic">>, <<"t/a">>} - , {<<"qos">>, 1} + [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]} + , {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]} + , {<<"topic">>, [<<"t/a">>, <<"the topic of the MQTT message">>]} + , {<<"qos">>, [1, <<"the QoS of the MQTT message">>]} + ]; +test_columns(<<"$bridges/mqtt", _/binary>>) -> + [ {<<"topic">>, [<<"t/a">>, <<"the topic of the MQTT message">>]} + , {<<"qos">>, [1, <<"the QoS of the MQTT message">>]} + , {<<"payload">>, [<<"{\"msg\": \"hello\"}">>, <<"the payload of the MQTT message">>]} ]. columns_with_exam('message.publish') -> @@ -514,12 +523,15 @@ columns_with_exam('message.publish') -> , {<<"flags">>, #{}} , {<<"headers">>, undefined} , {<<"publish_received_at">>, erlang:system_time(millisecond)} + , columns_example_props(pub_props) , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} ]; columns_with_exam('message.delivered') -> columns_message_ack_delivered('message.delivered'); columns_with_exam('message.acked') -> + [ columns_example_props(puback_props) + ] ++ columns_message_ack_delivered('message.acked'); columns_with_exam('message.dropped') -> [ {<<"event">>, 'message.dropped'} @@ -533,6 +545,7 @@ columns_with_exam('message.dropped') -> , {<<"qos">>, 1} , {<<"flags">>, #{}} , {<<"publish_received_at">>, erlang:system_time(millisecond)} + , columns_example_props(pub_props) , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} ]; @@ -549,6 +562,7 @@ columns_with_exam('delivery.dropped') -> , {<<"topic">>, <<"t/a">>} , {<<"qos">>, 1} , {<<"flags">>, #{}} + , columns_example_props(pub_props) , {<<"publish_received_at">>, erlang:system_time(millisecond)} , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} @@ -566,6 +580,7 @@ columns_with_exam('client.connected') -> , {<<"clean_start">>, true} , {<<"expiry_interval">>, 3600} , {<<"is_bridge">>, false} + , columns_example_props(conn_props) , {<<"connected_at">>, erlang:system_time(millisecond)} , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} @@ -577,14 +592,33 @@ columns_with_exam('client.disconnected') -> , {<<"username">>, <<"u_emqx">>} , {<<"peername">>, <<"192.168.0.10:56431">>} , {<<"sockname">>, <<"0.0.0.0:1883">>} + , columns_example_props(disconn_props) , {<<"disconnected_at">>, erlang:system_time(millisecond)} , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} ]; columns_with_exam('session.subscribed') -> - columns_message_sub_unsub('session.subscribed'); + [ columns_example_props(sub_props) + ] ++ columns_message_sub_unsub('session.subscribed'); columns_with_exam('session.unsubscribed') -> - columns_message_sub_unsub('session.unsubscribed'). + [ columns_example_props(unsub_props) + ] ++ columns_message_sub_unsub('session.unsubscribed'); +columns_with_exam(<<"$bridges/mqtt", _/binary>> = EventName) -> + [ {<<"event">>, EventName} + , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())} + , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} + , {<<"peerhost">>, <<"192.168.0.10">>} + , {<<"topic">>, <<"t/a">>} + , {<<"qos">>, 1} + , {<<"dup">>, false} + , {<<"retain">>, false} + , columns_example_props(pub_props) + %% the time that we receiced the message from remote broker + , {<<"message_received_at">>, erlang:system_time(millisecond)} + %% the time that the rule is triggered + , {<<"timestamp">>, erlang:system_time(millisecond)} + , {<<"node">>, node()} + ]. columns_message_sub_unsub(EventName) -> [ {<<"event">>, EventName} @@ -610,14 +644,48 @@ columns_message_ack_delivered(EventName) -> , {<<"qos">>, 1} , {<<"flags">>, #{}} , {<<"publish_received_at">>, erlang:system_time(millisecond)} + , columns_example_props(pub_props) , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} ]. +columns_example_props(PropType) -> + Props = columns_example_props_specific(PropType), + UserProps = #{ + 'User-Property' => #{<<"foo">> => <<"bar">>}, + 'User-Property-Pairs' => [ + #{key => <<"foo">>}, #{value => <<"bar">>} + ] + }, + {PropType, maps:merge(Props, UserProps)}. + +columns_example_props_specific(pub_props) -> + #{ 'Payload-Format-Indicator' => 0 + , 'Message-Expiry-Interval' => 30 + }; +columns_example_props_specific(puback_props) -> + #{ 'Reason-String' => <<"OK">> + }; +columns_example_props_specific(conn_props) -> + #{ 'Session-Expiry-Interval' => 7200 + , 'Receive-Maximum' => 32 + }; +columns_example_props_specific(disconn_props) -> + #{ 'Session-Expiry-Interval' => 7200 + , 'Reason-String' => <<"Redirect to another server">> + , 'Server Reference' => <<"192.168.22.129">> + }; +columns_example_props_specific(sub_props) -> + #{}; +columns_example_props_specific(unsub_props) -> + #{}. + %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- +hook_fun(<<"$bridges/", _/binary>>) -> + on_bridge_message_received; hook_fun(Event) -> case string:split(atom_to_list(Event), ".") of [Prefix, Name] -> @@ -646,6 +714,7 @@ event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered'; event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked'; event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped'; event_name(<<"$events/delivery_dropped", _/binary>>) -> 'delivery.dropped'; +event_name(<<"$bridges/", _/binary>> = Topic) -> Topic; event_name(_) -> 'message.publish'. event_topic('client.connected') -> <<"$events/client_connected">>; @@ -656,7 +725,8 @@ event_topic('message.delivered') -> <<"$events/message_delivered">>; event_topic('message.acked') -> <<"$events/message_acked">>; event_topic('message.dropped') -> <<"$events/message_dropped">>; event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>; -event_topic('message.publish') -> <<"$events/message_publish">>. +event_topic('message.publish') -> <<"$events/message_publish">>; +event_topic(<<"$bridges/", _/binary>> = Topic) -> Topic. printable_maps(undefined) -> #{}; printable_maps(Headers) -> @@ -665,6 +735,10 @@ printable_maps(Headers) -> AccIn#{K => ntoa(V0)}; ('User-Property', V0, AccIn) when is_list(V0) -> AccIn#{ + %% The 'User-Property' field is for the convenience of querying properties + %% using the '.' syntax, e.g. "SELECT 'User-Property'.foo as foo" + %% However, this does not allow duplicate property keys. To allow + %% duplicate keys, we have to use the 'User-Property-Pairs' field instead. 'User-Property' => maps:from_list(V0), 'User-Property-Pairs' => [#{ key => Key, diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 4aa517fef..4f036cf49 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -69,6 +69,7 @@ get_selected_data(Selected, _Envs, _Args) -> Selected. is_publish_topic(<<"$events/", _/binary>>) -> false; +is_publish_topic(<<"$bridges/", _/binary>>) -> false; is_publish_topic(_Topic) -> true. flatten([]) -> []; @@ -83,21 +84,8 @@ echo_action(Data, Envs) -> fill_default_values(Event, Context) -> maps:merge(envs_examp(Event), Context). -envs_examp(<<"$events/", _/binary>> = EVENT_TOPIC) -> - EventName = emqx_rule_events:event_name(EVENT_TOPIC), +envs_examp(EventTopic) -> + EventName = emqx_rule_events:event_name(EventTopic), emqx_rule_maps:atom_key_map( maps:from_list( - emqx_rule_events:columns_with_exam(EventName))); -envs_examp(_) -> - #{id => emqx_guid:to_hexstr(emqx_guid:gen()), - clientid => <<"c_emqx">>, - username => <<"u_emqx">>, - payload => <<"{\"id\": 1, \"name\": \"ha\"}">>, - peerhost => <<"127.0.0.1">>, - topic => <<"t/a">>, - qos => 1, - flags => #{sys => true, event => true}, - publish_received_at => emqx_plugin_libs_rule:now_ms(), - timestamp => emqx_plugin_libs_rule:now_ms(), - node => node() - }. + emqx_rule_events:columns_with_exam(EventName))).