%%-------------------------------------------------------------------- %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_rule_events). -include("rule_engine.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). -include_lib("emqx/include/emqx_access_control.hrl"). -include_lib("emqx_bridge/include/emqx_bridge_resource.hrl"). -export([ reload/0, load/1, unload/0, unload/1, event_names/0, event_name/1, event_topics_enum/0, event_topic/1, eventmsg_publish/1 ]). -export([ on_client_connected/3, on_client_disconnected/4, on_client_connack/4, on_client_check_authz_complete/6, on_client_check_authn_complete/3, on_session_subscribed/4, on_session_unsubscribed/4, on_message_publish/2, on_message_dropped/4, on_message_transformation_failed/3, on_schema_validation_failed/3, on_message_delivered/3, on_message_acked/3, on_delivery_dropped/4, on_bridge_message_received/2 ]). -export([ event_info/0, columns/1, columns_with_exam/1 ]). -ifdef(TEST). -export([ reason/1, hook_fun/1, printable_maps/1 ]). -endif. -elvis([{elvis_style, dont_repeat_yourself, disable}]). event_names() -> [ 'client.connected', 'client.disconnected', 'client.connack', 'client.check_authz_complete', 'session.subscribed', 'session.unsubscribed', 'message.publish', 'message.delivered', 'message.acked', 'message.dropped', 'message.transformation_failed', 'schema.validation_failed', 'delivery.dropped' ]. %% for documentation purposes event_topics_enum() -> [ '$events/client_connected', '$events/client_disconnected', '$events/client_connack', '$events/client_check_authz_complete', '$events/session_subscribed', '$events/session_unsubscribed', '$events/message_delivered', '$events/message_acked', '$events/message_dropped', '$events/message_transformation_failed', '$events/schema_validation_failed', '$events/delivery_dropped' % '$events/message_publish' % not possible to use in SELECT FROM ]. reload() -> lists:foreach( fun(Rule) -> ok = emqx_rule_engine:load_hooks_for_rule(Rule) end, emqx_rule_engine:get_rules() ). load(Topic) -> HookPoint = event_name(Topic), HookFun = hook_fun_name(HookPoint), emqx_hooks:put( HookPoint, {?MODULE, HookFun, [#{event_topic => Topic}]}, ?HP_RULE_ENGINE ). unload() -> lists:foreach( fun(HookPoint) -> emqx_hooks:del(HookPoint, {?MODULE, hook_fun_name(HookPoint)}) end, event_names() ). unload(Topic) -> HookPoint = event_name(Topic), emqx_hooks:del(HookPoint, {?MODULE, hook_fun_name(HookPoint)}). %%-------------------------------------------------------------------- %% Callbacks %%-------------------------------------------------------------------- on_message_publish(Message = #message{topic = Topic}, _Conf) -> case ignore_sys_message(Message) of true -> ok; false -> case emqx_rule_engine:get_rules_for_topic(Topic) of [] -> ok; Rules -> %% ENVs are the fields that can't be refereced by the SQL, but can be used %% from actions. e.g. The 'headers' field in the internal record `#message{}`. {Columns, Envs} = eventmsg_publish(Message), emqx_rule_runtime:apply_rules(Rules, Columns, Envs) end end, {ok, Message}. on_bridge_message_received(Message, Conf = #{event_topic := BridgeTopic}) -> apply_event(BridgeTopic, fun() -> with_basic_columns(BridgeTopic, Message, #{}) end, Conf). on_client_connected(ClientInfo, ConnInfo, Conf) -> apply_event( 'client.connected', fun() -> eventmsg_connected(ClientInfo, ConnInfo) end, Conf ). on_client_connack(ConnInfo, Reason, _, Conf) -> apply_event( 'client.connack', fun() -> eventmsg_connack(ConnInfo, Reason) end, Conf ). %% TODO: support full action in major release on_client_check_authz_complete( ClientInfo, ?authz_action(PubSub), Topic, Result, AuthzSource, Conf ) -> apply_event( 'client.check_authz_complete', fun() -> eventmsg_check_authz_complete( ClientInfo, PubSub, Topic, Result, AuthzSource ) end, Conf ). on_client_check_authn_complete(ClientInfo, Result, Conf) -> apply_event( 'client.check_authn_complete', fun() -> eventmsg_check_authn_complete( ClientInfo, Result ) end, Conf ). on_client_disconnected(ClientInfo, Reason, ConnInfo, Conf) -> apply_event( 'client.disconnected', fun() -> eventmsg_disconnected(ClientInfo, ConnInfo, Reason) end, Conf ). on_session_subscribed(ClientInfo, Topic, SubOpts, Conf) -> apply_event( 'session.subscribed', fun() -> eventmsg_sub_or_unsub( 'session.subscribed', ClientInfo, emqx_topic:maybe_format_share(Topic), SubOpts ) end, Conf ). on_session_unsubscribed(ClientInfo, Topic, SubOpts, Conf) -> apply_event( 'session.unsubscribed', fun() -> eventmsg_sub_or_unsub( 'session.unsubscribed', ClientInfo, emqx_topic:maybe_format_share(Topic), SubOpts ) end, Conf ). on_message_dropped(Message, _, Reason, Conf) -> case ignore_sys_message(Message) of true -> ok; false -> apply_event( 'message.dropped', fun() -> eventmsg_dropped(Message, Reason) end, Conf ) end, {ok, Message}. on_message_transformation_failed(Message, TransformationContext, Conf) -> case ignore_sys_message(Message) of true -> ok; false -> apply_event( 'message.transformation_failed', fun() -> eventmsg_transformation_failed(Message, TransformationContext) end, Conf ) end, {ok, Message}. on_schema_validation_failed(Message, ValidationContext, Conf) -> case ignore_sys_message(Message) of true -> ok; false -> apply_event( 'schema.validation_failed', fun() -> eventmsg_validation_failed(Message, ValidationContext) end, Conf ) end, {ok, Message}. on_message_delivered(ClientInfo, Message, Conf) -> case ignore_sys_message(Message) of true -> ok; false -> apply_event( 'message.delivered', fun() -> eventmsg_delivered(ClientInfo, Message) end, Conf ) end, {ok, Message}. on_message_acked(ClientInfo, Message, Conf) -> case ignore_sys_message(Message) of true -> ok; false -> apply_event( 'message.acked', fun() -> eventmsg_acked(ClientInfo, Message) end, Conf ) end, {ok, Message}. on_delivery_dropped(ClientInfo, Message, Reason, Conf) -> case ignore_sys_message(Message) of true -> ok; false -> apply_event( 'delivery.dropped', fun() -> eventmsg_delivery_dropped(ClientInfo, Message, Reason) end, Conf ) end, {ok, Message}. %%-------------------------------------------------------------------- %% Event Messages %%-------------------------------------------------------------------- eventmsg_publish( Message = #message{ id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp } ) -> with_basic_columns( 'message.publish', #{ id => emqx_guid:to_hexstr(Id), clientid => ClientId, username => emqx_message:get_header(username, Message, undefined), payload => Payload, peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)), topic => Topic, qos => QoS, flags => Flags, pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), publish_received_at => Timestamp }, #{headers => Headers} ). eventmsg_connected( _ClientInfo = #{ clientid := ClientId, username := Username, is_bridge := IsBridge, mountpoint := Mountpoint }, ConnInfo = #{ peername := PeerName, sockname := SockName, clean_start := CleanStart, proto_name := ProtoName, proto_ver := ProtoVer, connected_at := ConnectedAt } ) -> Keepalive = maps:get(keepalive, ConnInfo, 0), ConnProps = maps:get(conn_props, ConnInfo, #{}), RcvMax = maps:get(receive_maximum, ConnInfo, 0), ExpiryInterval = maps:get(expiry_interval, ConnInfo, 0), with_basic_columns( 'client.connected', #{ clientid => ClientId, username => Username, mountpoint => Mountpoint, peername => ntoa(PeerName), sockname => ntoa(SockName), proto_name => ProtoName, proto_ver => ProtoVer, keepalive => Keepalive, clean_start => CleanStart, receive_maximum => RcvMax, expiry_interval => ExpiryInterval div 1000, is_bridge => IsBridge, conn_props => printable_maps(ConnProps), connected_at => ConnectedAt }, #{} ). eventmsg_disconnected( _ClientInfo = #{ clientid := ClientId, username := Username }, ConnInfo = #{ peername := PeerName, sockname := SockName, proto_name := ProtoName, proto_ver := ProtoVer, disconnected_at := DisconnectedAt }, Reason ) -> with_basic_columns( 'client.disconnected', #{ reason => reason(Reason), clientid => ClientId, username => Username, peername => ntoa(PeerName), sockname => ntoa(SockName), proto_name => ProtoName, proto_ver => ProtoVer, disconn_props => printable_maps(maps:get(disconn_props, ConnInfo, #{})), disconnected_at => DisconnectedAt }, #{} ). eventmsg_connack( ConnInfo = #{ clientid := ClientId, clean_start := CleanStart, username := Username, peername := PeerName, sockname := SockName, proto_name := ProtoName, proto_ver := ProtoVer }, Reason ) -> Keepalive = maps:get(keepalive, ConnInfo, 0), ConnProps = maps:get(conn_props, ConnInfo, #{}), ExpiryInterval = maps:get(expiry_interval, ConnInfo, 0), with_basic_columns( 'client.connack', #{ reason_code => reason(Reason), clientid => ClientId, clean_start => CleanStart, username => Username, peername => ntoa(PeerName), sockname => ntoa(SockName), proto_name => ProtoName, proto_ver => ProtoVer, keepalive => Keepalive, expiry_interval => ExpiryInterval, conn_props => printable_maps(ConnProps) }, #{} ). eventmsg_check_authz_complete( _ClientInfo = #{ clientid := ClientId, username := Username, peerhost := PeerHost }, PubSub, Topic, Result, AuthzSource ) -> with_basic_columns( 'client.check_authz_complete', #{ clientid => ClientId, username => Username, peerhost => ntoa(PeerHost), topic => Topic, action => PubSub, authz_source => AuthzSource, result => Result }, #{} ). eventmsg_check_authn_complete( _ClientInfo = #{ clientid := ClientId, username := Username, peerhost := PeerHost, peerport := PeerPort }, Result ) -> #{ reason_code := Reason, is_superuser := IsSuperuser, is_anonymous := IsAnonymous } = maps:merge( #{is_anonymous => false, is_superuser => false}, Result ), with_basic_columns( 'client.check_authn_complete', #{ clientid => ClientId, username => Username, peername => ntoa({PeerHost, PeerPort}), reason_code => force_to_bin(Reason), is_anonymous => IsAnonymous, is_superuser => IsSuperuser }, #{} ). eventmsg_sub_or_unsub( Event, _ClientInfo = #{ clientid := ClientId, username := Username, peerhost := PeerHost }, Topic, SubOpts = #{qos := QoS} ) -> PropKey = sub_unsub_prop_key(Event), with_basic_columns( Event, #{ clientid => ClientId, username => Username, peerhost => ntoa(PeerHost), PropKey => printable_maps(maps:get(PropKey, SubOpts, #{})), topic => Topic, qos => QoS }, #{} ). eventmsg_dropped( Message = #message{ id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp }, Reason ) -> with_basic_columns( 'message.dropped', #{ id => emqx_guid:to_hexstr(Id), reason => Reason, clientid => ClientId, username => emqx_message:get_header(username, Message, undefined), payload => Payload, peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)), topic => Topic, qos => QoS, flags => Flags, pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), publish_received_at => Timestamp }, #{headers => Headers} ). eventmsg_transformation_failed( Message = #message{ id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp }, TransformationContext ) -> #{name := TransformationName} = TransformationContext, with_basic_columns( 'message.transformation_failed', #{ id => emqx_guid:to_hexstr(Id), transformation => TransformationName, clientid => ClientId, username => emqx_message:get_header(username, Message, undefined), payload => Payload, peername => ntoa(emqx_message:get_header(peername, Message, undefined)), topic => Topic, qos => QoS, flags => Flags, pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), publish_received_at => Timestamp }, #{headers => Headers} ). eventmsg_validation_failed( Message = #message{ id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp }, ValidationContext ) -> #{name := ValidationName} = ValidationContext, with_basic_columns( 'schema.validation_failed', #{ id => emqx_guid:to_hexstr(Id), validation => ValidationName, clientid => ClientId, username => emqx_message:get_header(username, Message, undefined), payload => Payload, peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)), topic => Topic, qos => QoS, flags => Flags, pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), publish_received_at => Timestamp }, #{headers => Headers} ). eventmsg_delivered( _ClientInfo = #{ peerhost := PeerHost, clientid := ReceiverCId, username := ReceiverUsername }, Message = #message{ id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp } ) -> with_basic_columns( 'message.delivered', #{ id => emqx_guid:to_hexstr(Id), from_clientid => ClientId, from_username => emqx_message:get_header(username, Message, undefined), clientid => ReceiverCId, username => ReceiverUsername, payload => Payload, peerhost => ntoa(PeerHost), topic => Topic, qos => QoS, flags => Flags, pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), publish_received_at => Timestamp }, #{headers => Headers} ). eventmsg_acked( _ClientInfo = #{ peerhost := PeerHost, clientid := ReceiverCId, username := ReceiverUsername }, Message = #message{ id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp } ) -> with_basic_columns( 'message.acked', #{ id => emqx_guid:to_hexstr(Id), from_clientid => ClientId, from_username => emqx_message:get_header(username, Message, undefined), clientid => ReceiverCId, username => ReceiverUsername, payload => Payload, peerhost => ntoa(PeerHost), topic => Topic, qos => QoS, flags => Flags, pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), puback_props => printable_maps(emqx_message:get_header(puback_props, Message, #{})), publish_received_at => Timestamp }, #{headers => Headers} ). eventmsg_delivery_dropped( _ClientInfo = #{ peerhost := PeerHost, clientid := ReceiverCId, username := ReceiverUsername }, Message = #message{ id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp }, Reason ) -> with_basic_columns( 'delivery.dropped', #{ id => emqx_guid:to_hexstr(Id), reason => Reason, from_clientid => ClientId, from_username => emqx_message:get_header(username, Message, undefined), clientid => ReceiverCId, username => ReceiverUsername, payload => Payload, peerhost => ntoa(PeerHost), topic => Topic, qos => QoS, flags => Flags, pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), publish_received_at => Timestamp }, #{headers => Headers} ). sub_unsub_prop_key('session.subscribed') -> sub_props; sub_unsub_prop_key('session.unsubscribed') -> unsub_props. with_basic_columns(EventName, Columns, Envs) when is_map(Columns) -> { Columns#{ event => EventName, timestamp => erlang:system_time(millisecond), node => node() }, Envs }. %%-------------------------------------------------------------------- %% rules applying %%-------------------------------------------------------------------- apply_event(EventName, GenEventMsg, _Conf) -> EventTopic = event_topic(EventName), case emqx_rule_engine:get_rules_for_topic(EventTopic) of [] -> ok; Rules -> %% delay the generating of eventmsg after we have found some rules to apply {Columns, Envs} = GenEventMsg(), emqx_rule_runtime:apply_rules(Rules, Columns, Envs) end. %%-------------------------------------------------------------------- %% Columns %%-------------------------------------------------------------------- columns(Event) -> [Key || {Key, _ExampleVal} <- columns_with_exam(Event)]. event_info() -> [ event_info_message_publish(), event_info_message_deliver(), event_info_message_acked(), event_info_message_dropped(), event_info_client_connected(), event_info_client_disconnected(), event_info_client_connack(), event_info_client_check_authz_complete(), event_info_client_check_authn_complete(), event_info_session_subscribed(), event_info_session_unsubscribed(), event_info_delivery_dropped(), event_info_bridge_mqtt() ] ++ ee_event_info(). -if(?EMQX_RELEASE_EDITION == ee). %% ELSE (?EMQX_RELEASE_EDITION == ee). event_info_schema_validation_failed() -> event_info_common( 'schema.validation_failed', {<<"schema validation failed">>, <<"schema 验证失败"/utf8>>}, {<<"messages that do not pass configured validations">>, <<"未通过验证的消息"/utf8>>}, <<"SELECT * FROM \"$events/schema_validation_failed\" WHERE topic =~ 't/#'">> ). event_info_message_transformation_failed() -> event_info_common( 'message.transformation_failed', {<<"message transformation failed">>, <<"message 验证失败"/utf8>>}, {<<"messages that do not pass configured transformation">>, <<"未通过验证的消息"/utf8>>}, <<"SELECT * FROM \"$events/message_transformation_failed\" WHERE topic =~ 't/#'">> ). ee_event_info() -> [ event_info_schema_validation_failed(), event_info_message_transformation_failed() ]. -else. %% END (?EMQX_RELEASE_EDITION == ee). ee_event_info() -> []. -endif. event_info_message_publish() -> event_info_common( 'message.publish', {<<"message publish">>, <<"消息发布"/utf8>>}, {<<"message publish">>, <<"消息发布"/utf8>>}, <<"SELECT payload.msg as msg FROM \"t/#\" WHERE msg = 'hello'">> ). event_info_message_deliver() -> event_info_common( 'message.delivered', {<<"message delivered">>, <<"消息已投递"/utf8>>}, {<<"message delivered">>, <<"消息已投递"/utf8>>}, <<"SELECT * FROM \"$events/message_delivered\" WHERE topic =~ 't/#'">> ). event_info_message_acked() -> event_info_common( 'message.acked', {<<"message acked">>, <<"消息应答"/utf8>>}, {<<"message acked">>, <<"消息应答"/utf8>>}, <<"SELECT * FROM \"$events/message_acked\" WHERE topic =~ 't/#'">> ). event_info_message_dropped() -> event_info_common( 'message.dropped', {<<"message routing-drop">>, <<"消息转发丢弃"/utf8>>}, {<<"messages are discarded during routing, usually because there are no subscribers">>, <<"消息在转发的过程中被丢弃,一般是由于没有订阅者"/utf8>>}, <<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">> ). event_info_delivery_dropped() -> event_info_common( 'delivery.dropped', {<<"message delivery-drop">>, <<"消息投递丢弃"/utf8>>}, {<<"messages are discarded during delivery, i.e. because the message queue is full">>, <<"消息在投递的过程中被丢弃,比如由于消息队列已满"/utf8>>}, <<"SELECT * FROM \"$events/delivery_dropped\" WHERE topic =~ 't/#'">> ). event_info_client_connected() -> event_info_common( 'client.connected', {<<"client connected">>, <<"连接建立"/utf8>>}, {<<"client connected">>, <<"连接建立"/utf8>>}, <<"SELECT * FROM \"$events/client_connected\"">> ). event_info_client_disconnected() -> event_info_common( 'client.disconnected', {<<"client disconnected">>, <<"连接断开"/utf8>>}, {<<"client disconnected">>, <<"连接断开"/utf8>>}, <<"SELECT * FROM \"$events/client_disconnected\" WHERE topic =~ 't/#'">> ). event_info_client_connack() -> event_info_common( 'client.connack', {<<"client connack">>, <<"连接确认"/utf8>>}, {<<"client connack">>, <<"连接确认"/utf8>>}, <<"SELECT * FROM \"$events/client_connack\"">> ). event_info_client_check_authz_complete() -> event_info_common( 'client.check_authz_complete', {<<"client check authz complete">>, <<"授权结果"/utf8>>}, {<<"client check authz complete">>, <<"授权结果"/utf8>>}, <<"SELECT * FROM \"$events/client_check_authz_complete\"">> ). event_info_client_check_authn_complete() -> event_info_common( 'client.check_authn_complete', {<<"client check authn complete">>, <<"认证结果"/utf8>>}, {<<"client check authn complete">>, <<"认证结果"/utf8>>}, <<"SELECT * FROM \"$events/client_check_authn_complete\"">> ). event_info_session_subscribed() -> event_info_common( 'session.subscribed', {<<"session subscribed">>, <<"会话订阅完成"/utf8>>}, {<<"session subscribed">>, <<"会话订阅完成"/utf8>>}, <<"SELECT * FROM \"$events/session_subscribed\" WHERE topic =~ 't/#'">> ). event_info_session_unsubscribed() -> event_info_common( 'session.unsubscribed', {<<"session unsubscribed">>, <<"会话取消订阅完成"/utf8>>}, {<<"session unsubscribed">>, <<"会话取消订阅完成"/utf8>>}, <<"SELECT * FROM \"$events/session_unsubscribed\" WHERE topic =~ 't/#'">> ). event_info_bridge_mqtt() -> event_info_common( <<"$bridges/mqtt:*">>, {<<"MQTT bridge message">>, <<"MQTT 桥接消息"/utf8>>}, {<<"received a message from MQTT bridge">>, <<"收到来自 MQTT 桥接的消息"/utf8>>}, <<"SELECT * FROM \"$bridges/mqtt:my_mqtt_bridge\" WHERE topic =~ 't/#'">> ). event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) -> #{ event => event_topic(Event), title => #{en => TitleEN, zh => TitleZH}, description => #{en => DescrEN, zh => DescrZH}, test_columns => test_columns(Event), columns => columns(Event), sql_example => SqlExam }. test_columns('message.dropped') -> [{<<"reason">>, [<<"no_subscribers">>, <<"the reason of dropping">>]}] ++ test_columns('message.publish'); test_columns('message.publish') -> [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid of the sender">>]}, {<<"username">>, [<<"u_emqx">>, <<"the username of the sender">>]}, {<<"topic">>, [<<"t/a">>, <<"the topic of the MQTT message">>]}, {<<"qos">>, [1, <<"the QoS of the MQTT message">>]}, {<<"payload">>, [<<"{\"msg\": \"hello\"}">>, <<"the payload of the MQTT message">>]} ]; test_columns('delivery.dropped') -> [{<<"reason">>, [<<"queue_full">>, <<"the reason of dropping">>]}] ++ test_columns('message.delivered'); test_columns('message.acked') -> test_columns('message.delivered'); test_columns('message.delivered') -> [ {<<"from_clientid">>, [<<"c_emqx_1">>, <<"the clientid of the sender">>]}, {<<"from_username">>, [<<"u_emqx_1">>, <<"the username of the sender">>]}, {<<"clientid">>, [<<"c_emqx_2">>, <<"the clientid of the receiver">>]}, {<<"username">>, [<<"u_emqx_2">>, <<"the username of the receiver">>]}, {<<"topic">>, [<<"t/a">>, <<"the topic of the MQTT message">>]}, {<<"qos">>, [1, <<"the QoS of the MQTT message">>]}, {<<"payload">>, [<<"{\"msg\": \"hello\"}">>, <<"the payload of the MQTT message">>]} ]; test_columns('client.connected') -> [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]}, {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]}, {<<"peername">>, [<<"127.0.0.1:52918">>, <<"the IP address and port of the client">>]} ]; test_columns('client.disconnected') -> [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]}, {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]}, {<<"reason">>, [<<"normal">>, <<"the reason for shutdown">>]} ]; test_columns('client.connack') -> [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]}, {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]}, {<<"reason_code">>, [<<"success">>, <<"the reason code">>]} ]; test_columns('client.check_authz_complete') -> [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]}, {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]}, {<<"topic">>, [<<"t/1">>, <<"the topic of the MQTT message">>]}, {<<"action">>, [<<"publish">>, <<"the action of publish or subscribe">>]}, {<<"result">>, [<<"allow">>, <<"the authz check complete result">>]} ]; test_columns('client.check_authn_complete') -> [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]}, {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]}, {<<"reason_code">>, [<<"success">>, <<"the reason code">>]}, {<<"is_superuser">>, [true, <<"Whether this is a superuser">>]}, {<<"is_anonymous">>, [false, <<"Whether this is a superuser">>]} ]; test_columns('session.unsubscribed') -> test_columns('session.subscribed'); test_columns('session.subscribed') -> [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]}, {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]}, {<<"topic">>, [<<"t/a">>, <<"the topic of the MQTT message">>]}, {<<"qos">>, [1, <<"the QoS of the MQTT message">>]} ]; test_columns(<<"$bridges/mqtt", _/binary>>) -> [ {<<"topic">>, [<<"t/a">>, <<"the topic of the MQTT message">>]}, {<<"qos">>, [1, <<"the QoS of the MQTT message">>]}, {<<"payload">>, [<<"{\"msg\": \"hello\"}">>, <<"the payload of the MQTT message">>]} ]; test_columns(Event) -> ee_test_columns(Event). -if(?EMQX_RELEASE_EDITION == ee). ee_test_columns('schema.validation_failed') -> [{<<"validation">>, <<"myvalidation">>}] ++ test_columns('message.publish'); ee_test_columns('message.transformation_failed') -> [{<<"transformation">>, <<"mytransformation">>}] ++ test_columns('message.publish'). %% ELSE (?EMQX_RELEASE_EDITION == ee). -else. -spec ee_test_columns(_) -> no_return(). ee_test_columns(Event) -> error({unknown_event, Event}). %% END (?EMQX_RELEASE_EDITION == ee). -endif. columns_with_exam('message.publish') -> [ {<<"event">>, 'message.publish'}, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}, {<<"clientid">>, <<"c_emqx">>}, {<<"username">>, <<"u_emqx">>}, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}, {<<"peerhost">>, <<"192.168.0.10">>}, {<<"topic">>, <<"t/a">>}, {<<"qos">>, 1}, {<<"flags">>, #{}}, {<<"publish_received_at">>, erlang:system_time(millisecond)}, columns_example_props(pub_props), {<<"timestamp">>, erlang:system_time(millisecond)}, {<<"node">>, node()} ]; columns_with_exam('message.delivered') -> columns_message_ack_delivered('message.delivered'); columns_with_exam('message.acked') -> [columns_example_props(puback_props)] ++ columns_message_ack_delivered('message.acked'); columns_with_exam('message.dropped') -> [ {<<"event">>, 'message.dropped'}, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}, {<<"reason">>, no_subscribers}, {<<"clientid">>, <<"c_emqx">>}, {<<"username">>, <<"u_emqx">>}, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}, {<<"peerhost">>, <<"192.168.0.10">>}, {<<"topic">>, <<"t/a">>}, {<<"qos">>, 1}, {<<"flags">>, #{}}, {<<"publish_received_at">>, erlang:system_time(millisecond)}, columns_example_props(pub_props), {<<"timestamp">>, erlang:system_time(millisecond)}, {<<"node">>, node()} ]; columns_with_exam('schema.validation_failed') -> [ {<<"event">>, 'schema.validation_failed'}, {<<"validation">>, <<"my_validation">>}, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}, {<<"clientid">>, <<"c_emqx">>}, {<<"username">>, <<"u_emqx">>}, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}, {<<"peerhost">>, <<"192.168.0.10">>}, {<<"topic">>, <<"t/a">>}, {<<"qos">>, 1}, {<<"flags">>, #{}}, {<<"publish_received_at">>, erlang:system_time(millisecond)}, columns_example_props(pub_props), {<<"timestamp">>, erlang:system_time(millisecond)}, {<<"node">>, node()} ]; columns_with_exam('message.transformation_failed') -> [ {<<"event">>, 'message.transformation_failed'}, {<<"validation">>, <<"my_transformation">>}, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}, {<<"clientid">>, <<"c_emqx">>}, {<<"username">>, <<"u_emqx">>}, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}, {<<"peername">>, <<"192.168.0.10:56431">>}, {<<"topic">>, <<"t/a">>}, {<<"qos">>, 1}, {<<"flags">>, #{}}, {<<"publish_received_at">>, erlang:system_time(millisecond)}, columns_example_props(pub_props), {<<"timestamp">>, erlang:system_time(millisecond)}, {<<"node">>, node()} ]; columns_with_exam('delivery.dropped') -> [ {<<"event">>, 'delivery.dropped'}, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}, {<<"reason">>, queue_full}, {<<"from_clientid">>, <<"c_emqx_1">>}, {<<"from_username">>, <<"u_emqx_1">>}, {<<"clientid">>, <<"c_emqx_2">>}, {<<"username">>, <<"u_emqx_2">>}, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}, {<<"peerhost">>, <<"192.168.0.10">>}, {<<"topic">>, <<"t/a">>}, {<<"qos">>, 1}, {<<"flags">>, #{}}, columns_example_props(pub_props), {<<"publish_received_at">>, erlang:system_time(millisecond)}, {<<"timestamp">>, erlang:system_time(millisecond)}, {<<"node">>, node()} ]; columns_with_exam('client.connected') -> [ {<<"event">>, 'client.connected'}, {<<"clientid">>, <<"c_emqx">>}, {<<"username">>, <<"u_emqx">>}, {<<"mountpoint">>, undefined}, {<<"peername">>, <<"192.168.0.10:56431">>}, {<<"sockname">>, <<"0.0.0.0:1883">>}, {<<"proto_name">>, <<"MQTT">>}, {<<"proto_ver">>, 5}, {<<"keepalive">>, 60}, {<<"clean_start">>, true}, {<<"expiry_interval">>, 3600}, {<<"is_bridge">>, false}, columns_example_props(conn_props), {<<"connected_at">>, erlang:system_time(millisecond)}, {<<"timestamp">>, erlang:system_time(millisecond)}, {<<"node">>, node()} ]; columns_with_exam('client.disconnected') -> [ {<<"event">>, 'client.disconnected'}, {<<"reason">>, normal}, {<<"clientid">>, <<"c_emqx">>}, {<<"username">>, <<"u_emqx">>}, {<<"peername">>, <<"192.168.0.10:56431">>}, {<<"sockname">>, <<"0.0.0.0:1883">>}, {<<"proto_name">>, <<"MQTT">>}, {<<"proto_ver">>, 5}, columns_example_props(disconn_props), {<<"disconnected_at">>, erlang:system_time(millisecond)}, {<<"timestamp">>, erlang:system_time(millisecond)}, {<<"node">>, node()} ]; columns_with_exam('client.connack') -> [ {<<"event">>, 'client.connected'}, {<<"reason_code">>, success}, {<<"clientid">>, <<"c_emqx">>}, {<<"username">>, <<"u_emqx">>}, {<<"peername">>, <<"192.168.0.10:56431">>}, {<<"sockname">>, <<"0.0.0.0:1883">>}, {<<"proto_name">>, <<"MQTT">>}, {<<"proto_ver">>, 5}, {<<"keepalive">>, 60}, {<<"clean_start">>, true}, {<<"expiry_interval">>, 3600}, {<<"connected_at">>, erlang:system_time(millisecond)}, columns_example_props(conn_props), {<<"timestamp">>, erlang:system_time(millisecond)}, {<<"node">>, node()} ]; columns_with_exam('client.check_authz_complete') -> [ {<<"event">>, 'client.check_authz_complete'}, {<<"clientid">>, <<"c_emqx">>}, {<<"username">>, <<"u_emqx">>}, {<<"peerhost">>, <<"192.168.0.10">>}, {<<"topic">>, <<"t/a">>}, {<<"action">>, <<"publish">>}, {<<"authz_source">>, <<"cache">>}, {<<"result">>, <<"allow">>}, {<<"timestamp">>, erlang:system_time(millisecond)}, {<<"node">>, node()} ]; columns_with_exam('client.check_authn_complete') -> [ {<<"event">>, 'client.check_authz_complete'}, {<<"clientid">>, <<"c_emqx">>}, {<<"username">>, <<"u_emqx">>}, {<<"peername">>, <<"192.168.0.10:56431">>}, {<<"reason_code">>, <<"success">>}, {<<"is_superuser">>, true}, {<<"is_anonymous">>, false}, {<<"timestamp">>, erlang:system_time(millisecond)}, {<<"node">>, node()} ]; columns_with_exam('session.subscribed') -> [columns_example_props(sub_props)] ++ columns_message_sub_unsub('session.subscribed'); columns_with_exam('session.unsubscribed') -> [columns_example_props(unsub_props)] ++ columns_message_sub_unsub('session.unsubscribed'); columns_with_exam(<<"$bridges/mqtt", _/binary>> = EventName) -> [ {<<"event">>, EventName}, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}, {<<"server">>, <<"192.168.0.10:1883">>}, {<<"topic">>, <<"t/a">>}, {<<"qos">>, 1}, {<<"dup">>, false}, {<<"retain">>, false}, columns_example_props(pub_props), %% the time that we receiced the message from remote broker {<<"message_received_at">>, erlang:system_time(millisecond)}, %% the time that the rule is triggered {<<"timestamp">>, erlang:system_time(millisecond)}, {<<"node">>, node()} ]. columns_message_sub_unsub(EventName) -> [ {<<"event">>, EventName}, {<<"clientid">>, <<"c_emqx">>}, {<<"username">>, <<"u_emqx">>}, {<<"peerhost">>, <<"192.168.0.10">>}, {<<"topic">>, <<"t/a">>}, {<<"qos">>, 1}, {<<"timestamp">>, erlang:system_time(millisecond)}, {<<"node">>, node()} ]. columns_message_ack_delivered(EventName) -> [ {<<"event">>, EventName}, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}, {<<"from_clientid">>, <<"c_emqx_1">>}, {<<"from_username">>, <<"u_emqx_1">>}, {<<"clientid">>, <<"c_emqx_2">>}, {<<"username">>, <<"u_emqx_2">>}, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}, {<<"peerhost">>, <<"192.168.0.10">>}, {<<"topic">>, <<"t/a">>}, {<<"qos">>, 1}, {<<"flags">>, #{}}, {<<"publish_received_at">>, erlang:system_time(millisecond)}, columns_example_props(pub_props), {<<"timestamp">>, erlang:system_time(millisecond)}, {<<"node">>, node()} ]. columns_example_props(PropType) -> Props = columns_example_props_specific(PropType), UserProps = #{ 'User-Property' => #{<<"foo">> => <<"bar">>}, 'User-Property-Pairs' => [ #{key => <<"foo">>}, #{value => <<"bar">>} ] }, {PropType, maps:merge(Props, UserProps)}. columns_example_props_specific(pub_props) -> #{ 'Payload-Format-Indicator' => 0, 'Message-Expiry-Interval' => 30 }; columns_example_props_specific(puback_props) -> #{'Reason-String' => <<"OK">>}; columns_example_props_specific(conn_props) -> #{ 'Session-Expiry-Interval' => 7200, 'Receive-Maximum' => 32 }; columns_example_props_specific(disconn_props) -> #{ 'Session-Expiry-Interval' => 7200, 'Reason-String' => <<"Redirect to another server">>, 'Server Reference' => <<"192.168.22.129">> }; columns_example_props_specific(sub_props) -> #{}; columns_example_props_specific(unsub_props) -> #{}. %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- hook_fun_name(HookPoint) -> HookFun = hook_fun(HookPoint), {name, HookFunName} = erlang:fun_info(HookFun, name), HookFunName. %% return static function references to help static code checks hook_fun(?BRIDGE_HOOKPOINT(_)) -> fun ?MODULE:on_bridge_message_received/2; hook_fun('client.connected') -> fun ?MODULE:on_client_connected/3; hook_fun('client.disconnected') -> fun ?MODULE:on_client_disconnected/4; hook_fun('client.connack') -> fun ?MODULE:on_client_connack/4; hook_fun('client.check_authz_complete') -> fun ?MODULE:on_client_check_authz_complete/6; hook_fun('client.check_authn_complete') -> fun ?MODULE:on_client_check_authn_complete/3; hook_fun('session.subscribed') -> fun ?MODULE:on_session_subscribed/4; hook_fun('session.unsubscribed') -> fun ?MODULE:on_session_unsubscribed/4; hook_fun('message.delivered') -> fun ?MODULE:on_message_delivered/3; hook_fun('message.acked') -> fun ?MODULE:on_message_acked/3; hook_fun('message.dropped') -> fun ?MODULE:on_message_dropped/4; hook_fun('message.transformation_failed') -> fun ?MODULE:on_message_transformation_failed/3; hook_fun('schema.validation_failed') -> fun ?MODULE:on_schema_validation_failed/3; hook_fun('delivery.dropped') -> fun ?MODULE:on_delivery_dropped/4; hook_fun('message.publish') -> fun ?MODULE:on_message_publish/2; hook_fun(Event) -> error({invalid_event, Event}). reason(Reason) when is_atom(Reason) -> Reason; reason({shutdown, Reason}) when is_atom(Reason) -> Reason; reason({Error, _}) when is_atom(Error) -> Error; reason(_) -> internal_error. force_to_bin(Bin) when is_binary(Bin) -> Bin; force_to_bin(Term) -> try emqx_utils_conv:bin(Term) catch _:_ -> emqx_utils_conv:bin(lists:flatten(io_lib:format("~p", [Term]))) end. ntoa(undefined) -> undefined; ntoa(IpOrIpPort) -> iolist_to_binary(emqx_utils:ntoa(IpOrIpPort)). event_name(?BRIDGE_HOOKPOINT(_) = Bridge) -> Bridge; event_name(<<"$events/client_connected">>) -> 'client.connected'; event_name(<<"$events/client_disconnected">>) -> 'client.disconnected'; event_name(<<"$events/client_connack">>) -> 'client.connack'; event_name(<<"$events/client_check_authz_complete">>) -> 'client.check_authz_complete'; event_name(<<"$events/client_check_authn_complete">>) -> 'client.check_authn_complete'; event_name(<<"$events/session_subscribed">>) -> 'session.subscribed'; event_name(<<"$events/session_unsubscribed">>) -> 'session.unsubscribed'; event_name(<<"$events/message_delivered">>) -> 'message.delivered'; event_name(<<"$events/message_acked">>) -> 'message.acked'; event_name(<<"$events/message_dropped">>) -> 'message.dropped'; event_name(<<"$events/message_transformation_failed">>) -> 'message.transformation_failed'; event_name(<<"$events/schema_validation_failed">>) -> 'schema.validation_failed'; event_name(<<"$events/delivery_dropped">>) -> 'delivery.dropped'; event_name(_) -> 'message.publish'. event_topic(?BRIDGE_HOOKPOINT(_) = Bridge) -> Bridge; event_topic('client.connected') -> <<"$events/client_connected">>; event_topic('client.disconnected') -> <<"$events/client_disconnected">>; event_topic('client.connack') -> <<"$events/client_connack">>; event_topic('client.check_authz_complete') -> <<"$events/client_check_authz_complete">>; event_topic('client.check_authn_complete') -> <<"$events/client_check_authn_complete">>; event_topic('session.subscribed') -> <<"$events/session_subscribed">>; event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>; event_topic('message.delivered') -> <<"$events/message_delivered">>; event_topic('message.acked') -> <<"$events/message_acked">>; event_topic('message.dropped') -> <<"$events/message_dropped">>; event_topic('message.transformation_failed') -> <<"$events/message_transformation_failed">>; event_topic('schema.validation_failed') -> <<"$events/schema_validation_failed">>; event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>; event_topic('message.publish') -> <<"$events/message_publish">>. printable_maps(undefined) -> #{}; printable_maps(Headers) -> maps:fold( fun (K, V0, AccIn) when K =:= peerhost; K =:= peername; K =:= sockname -> AccIn#{K => ntoa(V0)}; ('User-Property', V0, AccIn) when is_list(V0) -> AccIn#{ %% The 'User-Property' field is for the convenience of querying properties %% using the '.' syntax, e.g. "SELECT 'User-Property'.foo as foo" %% However, this does not allow duplicate property keys. To allow %% duplicate keys, we have to use the 'User-Property-Pairs' field instead. 'User-Property' => maps:from_list(V0), 'User-Property-Pairs' => [ #{ key => Key, value => Value } || {Key, Value} <- V0 ] }; (_K, V, AccIn) when is_tuple(V) -> %% internal headers AccIn; (K, V, AccIn) -> AccIn#{K => V} end, #{'User-Property' => #{}}, Headers ). ignore_sys_message(#message{flags = Flags}) -> ConfigRootKey = emqx_rule_engine_schema:namespace(), maps:get(sys, Flags, false) andalso emqx:get_config([ConfigRootKey, ignore_sys_message]).