diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 492caabc8..c21412970 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -526,12 +526,15 @@ columns_with_exam('message.publish') -> , {<<"flags">>, #{}} , {<<"headers">>, undefined} , {<<"publish_received_at">>, erlang:system_time(millisecond)} + , columns_example_props(pub_props) , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} ]; columns_with_exam('message.delivered') -> columns_message_ack_delivered('message.delivered'); columns_with_exam('message.acked') -> + [ columns_example_props(puback_props) + ] ++ columns_message_ack_delivered('message.acked'); columns_with_exam('message.dropped') -> [ {<<"event">>, 'message.dropped'} @@ -545,6 +548,7 @@ columns_with_exam('message.dropped') -> , {<<"qos">>, 1} , {<<"flags">>, #{}} , {<<"publish_received_at">>, erlang:system_time(millisecond)} + , columns_example_props(pub_props) , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} ]; @@ -561,6 +565,7 @@ columns_with_exam('delivery.dropped') -> , {<<"topic">>, <<"t/a">>} , {<<"qos">>, 1} , {<<"flags">>, #{}} + , columns_example_props(pub_props) , {<<"publish_received_at">>, erlang:system_time(millisecond)} , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} @@ -578,6 +583,7 @@ columns_with_exam('client.connected') -> , {<<"clean_start">>, true} , {<<"expiry_interval">>, 3600} , {<<"is_bridge">>, false} + , columns_example_props(conn_props) , {<<"connected_at">>, erlang:system_time(millisecond)} , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} @@ -589,14 +595,17 @@ columns_with_exam('client.disconnected') -> , {<<"username">>, <<"u_emqx">>} , {<<"peername">>, <<"192.168.0.10:56431">>} , {<<"sockname">>, <<"0.0.0.0:1883">>} + , columns_example_props(disconn_props) , {<<"disconnected_at">>, erlang:system_time(millisecond)} , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} ]; columns_with_exam('session.subscribed') -> - columns_message_sub_unsub('session.subscribed'); + [ columns_example_props(sub_props) + ] ++ columns_message_sub_unsub('session.subscribed'); columns_with_exam('session.unsubscribed') -> - columns_message_sub_unsub('session.unsubscribed'); + [ columns_example_props(unsub_props) + ] ++ columns_message_sub_unsub('session.unsubscribed'); columns_with_exam(<<"$bridges/mqtt">>) -> [ {<<"event">>, <<"$bridges/mqtt">>} , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())} @@ -606,6 +615,7 @@ columns_with_exam(<<"$bridges/mqtt">>) -> , {<<"qos">>, 1} , {<<"dup">>, false} , {<<"retain">>, false} + , columns_example_props(pub_props) %% the time that we receiced the message from remote broker , {<<"message_received_at">>, erlang:system_time(millisecond)} %% the time that the rule is triggered @@ -637,10 +647,42 @@ columns_message_ack_delivered(EventName) -> , {<<"qos">>, 1} , {<<"flags">>, #{}} , {<<"publish_received_at">>, erlang:system_time(millisecond)} + , columns_example_props(pub_props) , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} ]. +columns_example_props(PropType) -> + Props = columns_example_props_specific(PropType), + UserProps = #{ + 'User-Property' => #{<<"foo">> => <<"bar">>}, + 'User-Property-Pairs' => [ + #{key => <<"foo">>}, #{value => <<"bar">>} + ] + }, + {PropType, maps:merge(Props, UserProps)}. + +columns_example_props_specific(pub_props) -> + #{ 'Payload-Format-Indicator' => 0 + , 'Message-Expiry-Interval' => 30 + }; +columns_example_props_specific(puback_props) -> + #{ 'Reason-String' => <<"OK">> + }; +columns_example_props_specific(conn_props) -> + #{ 'Session-Expiry-Interval' => 7200 + , 'Receive-Maximum' => 32 + }; +columns_example_props_specific(disconn_props) -> + #{ 'Session-Expiry-Interval' => 7200 + , 'Reason-String' => <<"Redirect to another server">> + , 'Server Reference' => <<"192.168.22.129">> + }; +columns_example_props_specific(sub_props) -> + #{}; +columns_example_props_specific(unsub_props) -> + #{}. + %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- @@ -694,6 +736,10 @@ printable_maps(Headers) -> AccIn#{K => ntoa(V0)}; ('User-Property', V0, AccIn) when is_list(V0) -> AccIn#{ + %% The 'User-Property' field is for the convenience of querying properties + %% using the '.' syntax, e.g. "SELECT 'User-Property'.foo as foo" + %% However, this does not allow duplicate property keys. To allow + %% duplicate keys, we have to use the 'User-Property-Pairs' field instead. 'User-Property' => maps:from_list(V0), 'User-Property-Pairs' => [#{ key => Key,