%% emqx/apps/emqx_rule_engine/include/rule_events.hrl
%%
%% 286 lines
%% 13 KiB
%% Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% Column names available for EVENT, in the order ?EG_COLUMNS(EVENT) lists
%% them. Only the first element of each {Name, ExampleValue} pair is kept.
-define(COLUMNS(EVENT),
    [Key || {Key, _} <- ?EG_COLUMNS(EVENT)]
).
%% Example columns for the event named by the atom EVENT, as an ordered list
%% of {ColumnName, ExampleValue} pairs (column names are binaries).
%%
%% Message ids, timestamps and the node name are computed every time the
%% expansion is evaluated. An EVENT that is not listed below raises
%% error({unknown_rule_type, EVENT}).
%%
%% The expansion is a plain `case` expression, so the variable `RuleType`
%% used by the fallback clause belongs to the scope of the calling function.
-define(EG_COLUMNS(EVENT),
    case EVENT of
        %% ---- message events ----
        'message.publish' ->
            [
                {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())},
                {<<"clientid">>, <<"c_emqx">>},
                {<<"username">>, <<"u_emqx">>},
                {<<"payload">>, <<"{\"msg\": \"hello\"}">>},
                {<<"peerhost">>, <<"192.168.0.10">>},
                {<<"topic">>, <<"t/a">>},
                {<<"qos">>, 1},
                {<<"flags">>, #{}},
                {<<"headers">>, undefined},
                {<<"publish_received_at">>, erlang:system_time(millisecond)},
                {<<"timestamp">>, erlang:system_time(millisecond)},
                {<<"node">>, node()}
            ];
        'message.delivered' ->
            [
                {<<"event">>, 'message.delivered'},
                {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())},
                {<<"from_clientid">>, <<"c_emqx_1">>},
                {<<"from_username">>, <<"u_emqx_1">>},
                {<<"clientid">>, <<"c_emqx_2">>},
                {<<"username">>, <<"u_emqx_2">>},
                {<<"payload">>, <<"{\"msg\": \"hello\"}">>},
                {<<"peerhost">>, <<"192.168.0.10">>},
                {<<"topic">>, <<"t/a">>},
                {<<"qos">>, 1},
                {<<"flags">>, #{}},
                {<<"publish_received_at">>, erlang:system_time(millisecond)},
                {<<"timestamp">>, erlang:system_time(millisecond)},
                {<<"node">>, node()}
            ];
        'message.acked' ->
            [
                {<<"event">>, 'message.acked'},
                {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())},
                {<<"from_clientid">>, <<"c_emqx_1">>},
                {<<"from_username">>, <<"u_emqx_1">>},
                {<<"clientid">>, <<"c_emqx_2">>},
                {<<"username">>, <<"u_emqx_2">>},
                {<<"payload">>, <<"{\"msg\": \"hello\"}">>},
                {<<"peerhost">>, <<"192.168.0.10">>},
                {<<"topic">>, <<"t/a">>},
                {<<"qos">>, 1},
                {<<"flags">>, #{}},
                {<<"publish_received_at">>, erlang:system_time(millisecond)},
                {<<"timestamp">>, erlang:system_time(millisecond)},
                {<<"node">>, node()}
            ];
        'message.dropped' ->
            [
                {<<"event">>, 'message.dropped'},
                {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())},
                {<<"reason">>, no_subscribers},
                {<<"clientid">>, <<"c_emqx">>},
                {<<"username">>, <<"u_emqx">>},
                {<<"payload">>, <<"{\"msg\": \"hello\"}">>},
                {<<"peerhost">>, <<"192.168.0.10">>},
                {<<"topic">>, <<"t/a">>},
                {<<"qos">>, 1},
                {<<"flags">>, #{}},
                {<<"publish_received_at">>, erlang:system_time(millisecond)},
                {<<"timestamp">>, erlang:system_time(millisecond)},
                {<<"node">>, node()}
            ];
        %% ---- client events ----
        'client.connected' ->
            [
                {<<"event">>, 'client.connected'},
                {<<"clientid">>, <<"c_emqx">>},
                {<<"username">>, <<"u_emqx">>},
                {<<"mountpoint">>, undefined},
                {<<"peername">>, <<"192.168.0.10:56431">>},
                {<<"sockname">>, <<"0.0.0.0:1883">>},
                {<<"proto_name">>, <<"MQTT">>},
                {<<"proto_ver">>, 5},
                {<<"keepalive">>, 60},
                {<<"clean_start">>, true},
                {<<"expiry_interval">>, 3600},
                {<<"is_bridge">>, false},
                {<<"connected_at">>, erlang:system_time(millisecond)},
                {<<"timestamp">>, erlang:system_time(millisecond)},
                {<<"node">>, node()}
            ];
        'client.disconnected' ->
            [
                {<<"event">>, 'client.disconnected'},
                {<<"reason">>, normal},
                {<<"clientid">>, <<"c_emqx">>},
                {<<"username">>, <<"u_emqx">>},
                {<<"peername">>, <<"192.168.0.10:56431">>},
                {<<"sockname">>, <<"0.0.0.0:1883">>},
                {<<"disconnected_at">>, erlang:system_time(millisecond)},
                {<<"timestamp">>, erlang:system_time(millisecond)},
                {<<"node">>, node()}
            ];
        %% ---- session events ----
        'session.subscribed' ->
            [
                {<<"event">>, 'session.subscribed'},
                {<<"clientid">>, <<"c_emqx">>},
                {<<"username">>, <<"u_emqx">>},
                {<<"peerhost">>, <<"192.168.0.10">>},
                {<<"topic">>, <<"t/a">>},
                {<<"qos">>, 1},
                {<<"timestamp">>, erlang:system_time(millisecond)},
                {<<"node">>, node()}
            ];
        'session.unsubscribed' ->
            [
                {<<"event">>, 'session.unsubscribed'},
                {<<"clientid">>, <<"c_emqx">>},
                {<<"username">>, <<"u_emqx">>},
                {<<"peerhost">>, <<"192.168.0.10">>},
                {<<"topic">>, <<"t/a">>},
                {<<"qos">>, 1},
                {<<"timestamp">>, erlang:system_time(millisecond)},
                {<<"node">>, node()}
            ];
        %% Anything else is a programming error: fail loudly.
        RuleType ->
            error({unknown_rule_type, RuleType})
    end
).
%% Test columns shared by the 'message.publish' and 'message.dropped' clauses
%% of ?TEST_COLUMNS: an ordered list of {ColumnName, SampleValue} pairs.
%% (The macro name is misspelled; it is kept as-is because it is the name
%% other code refers to.)
-define(TEST_COLUMNS_MESSGE, [
    {<<"clientid">>, <<"c_emqx">>},
    {<<"username">>, <<"u_emqx">>},
    {<<"topic">>, <<"t/a">>},
    {<<"qos">>, 1},
    {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
]).
%% Test columns shared by the 'message.delivered' and 'message.acked' clauses
%% of ?TEST_COLUMNS. Besides the receiving client (clientid/username) the
%% list also carries the from_clientid/from_username pair.
-define(TEST_COLUMNS_MESSGE_DELIVERED_ACKED, [
    {<<"from_clientid">>, <<"c_emqx_1">>},
    {<<"from_username">>, <<"u_emqx_1">>},
    {<<"clientid">>, <<"c_emqx_2">>},
    {<<"username">>, <<"u_emqx_2">>},
    {<<"topic">>, <<"t/a">>},
    {<<"qos">>, 1},
    {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
]).
%% Test columns for the event named by the atom EVENT: an ordered list of
%% {ColumnName, SampleValue} pairs. This is the value stored under
%% `test_columns` in the ?EVENT_INFO_* maps.
%%
%% An EVENT that is not listed raises error({unknown_rule_type, EVENT}).
%% The expansion is a plain `case` expression, so the variable `RuleType`
%% used by the fallback clause belongs to the scope of the calling function.
-define(TEST_COLUMNS(EVENT),
    case EVENT of
        %% Message events reuse the two shared column lists.
        'message.publish' ->
            ?TEST_COLUMNS_MESSGE;
        'message.dropped' ->
            ?TEST_COLUMNS_MESSGE;
        'message.delivered' ->
            ?TEST_COLUMNS_MESSGE_DELIVERED_ACKED;
        'message.acked' ->
            ?TEST_COLUMNS_MESSGE_DELIVERED_ACKED;
        'client.connected' ->
            [
                {<<"clientid">>, <<"c_emqx">>},
                {<<"username">>, <<"u_emqx">>},
                {<<"peername">>, <<"127.0.0.1:52918">>}
            ];
        'client.disconnected' ->
            [
                {<<"clientid">>, <<"c_emqx">>},
                {<<"username">>, <<"u_emqx">>},
                {<<"reason">>, <<"normal">>}
            ];
        'session.subscribed' ->
            [
                {<<"clientid">>, <<"c_emqx">>},
                {<<"username">>, <<"u_emqx">>},
                {<<"topic">>, <<"t/a">>},
                {<<"qos">>, 1}
            ];
        'session.unsubscribed' ->
            [
                {<<"clientid">>, <<"c_emqx">>},
                {<<"username">>, <<"u_emqx">>},
                {<<"topic">>, <<"t/a">>},
                {<<"qos">>, 1}
            ];
        %% Anything else is a programming error: fail loudly.
        RuleType ->
            error({unknown_rule_type, RuleType})
    end
).
%% Descriptor map for the message-publish event: the event topic atom, a
%% title and description in English (`en`) and Chinese (`zh`), the test
%% columns, the column names and an example SQL statement.
-define(EVENT_INFO_MESSAGE_PUBLISH, #{
    event => '$events/message_publish',
    title => #{en => <<"message publish">>, zh => <<"消息发布"/utf8>>},
    description => #{en => <<"message publish">>, zh => <<"消息发布"/utf8>>},
    test_columns => ?TEST_COLUMNS('message.publish'),
    columns => ?COLUMNS('message.publish'),
    sql_example => <<"SELECT payload.msg as msg FROM \"t/#\" WHERE msg = 'hello'">>
}).
%% Descriptor map for the message-delivered event: the event topic atom, a
%% title and description in English (`en`) and Chinese (`zh`), the test
%% columns, the column names and an example SQL statement.
-define(EVENT_INFO_MESSAGE_DELIVER, #{
    event => '$events/message_delivered',
    title => #{en => <<"message delivered">>, zh => <<"消息投递"/utf8>>},
    description => #{en => <<"message delivered">>, zh => <<"消息投递"/utf8>>},
    test_columns => ?TEST_COLUMNS('message.delivered'),
    columns => ?COLUMNS('message.delivered'),
    sql_example => <<"SELECT * FROM \"$events/message_delivered\" WHERE topic =~ 't/#'">>
}).
%% Descriptor map for the message-acked event: the event topic atom, a
%% title and description in English (`en`) and Chinese (`zh`), the test
%% columns, the column names and an example SQL statement.
-define(EVENT_INFO_MESSAGE_ACKED, #{
    event => '$events/message_acked',
    title => #{en => <<"message acked">>, zh => <<"消息应答"/utf8>>},
    description => #{en => <<"message acked">>, zh => <<"消息应答"/utf8>>},
    test_columns => ?TEST_COLUMNS('message.acked'),
    columns => ?COLUMNS('message.acked'),
    sql_example => <<"SELECT * FROM \"$events/message_acked\" WHERE topic =~ 't/#'">>
}).
%% Descriptor map for the message-dropped event: the event topic atom, a
%% title and description in English (`en`) and Chinese (`zh`), the test
%% columns, the column names and an example SQL statement.
-define(EVENT_INFO_MESSAGE_DROPPED, #{
    event => '$events/message_dropped',
    title => #{en => <<"message dropped">>, zh => <<"消息丢弃"/utf8>>},
    description => #{en => <<"message dropped">>, zh => <<"消息丢弃"/utf8>>},
    test_columns => ?TEST_COLUMNS('message.dropped'),
    columns => ?COLUMNS('message.dropped'),
    sql_example => <<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">>
}).
%% Descriptor map for the client-connected event: the event topic atom, a
%% title and description in English (`en`) and Chinese (`zh`), the test
%% columns, the column names and an example SQL statement.
-define(EVENT_INFO_CLIENT_CONNECTED, #{
    event => '$events/client_connected',
    title => #{en => <<"client connected">>, zh => <<"连接建立"/utf8>>},
    description => #{en => <<"client connected">>, zh => <<"连接建立"/utf8>>},
    test_columns => ?TEST_COLUMNS('client.connected'),
    columns => ?COLUMNS('client.connected'),
    sql_example => <<"SELECT * FROM \"$events/client_connected\"">>
}).
%% Descriptor map for the client-disconnected event: the event topic atom, a
%% title and description in English (`en`) and Chinese (`zh`), the test
%% columns, the column names and an example SQL statement.
-define(EVENT_INFO_CLIENT_DISCONNECTED, #{
    event => '$events/client_disconnected',
    title => #{en => <<"client disconnected">>, zh => <<"连接断开"/utf8>>},
    description => #{en => <<"client disconnected">>, zh => <<"连接断开"/utf8>>},
    test_columns => ?TEST_COLUMNS('client.disconnected'),
    columns => ?COLUMNS('client.disconnected'),
    sql_example => <<"SELECT * FROM \"$events/client_disconnected\"">>
}).
%% Descriptor map for the session-subscribed event: the event topic atom, a
%% title and description in English (`en`) and Chinese (`zh`), the test
%% columns, the column names and an example SQL statement.
-define(EVENT_INFO_SESSION_SUBSCRIBED, #{
    event => '$events/session_subscribed',
    title => #{en => <<"session subscribed">>, zh => <<"会话订阅完成"/utf8>>},
    description => #{en => <<"session subscribed">>, zh => <<"会话订阅完成"/utf8>>},
    test_columns => ?TEST_COLUMNS('session.subscribed'),
    columns => ?COLUMNS('session.subscribed'),
    sql_example => <<"SELECT * FROM \"$events/session_subscribed\" WHERE topic =~ 't/#'">>
}).
%% Descriptor map for the session-unsubscribed event: the event topic atom, a
%% title and description in English (`en`) and Chinese (`zh`), the test
%% columns, the column names and an example SQL statement.
-define(EVENT_INFO_SESSION_UNSUBSCRIBED, #{
    event => '$events/session_unsubscribed',
    title => #{en => <<"session unsubscribed">>, zh => <<"会话取消订阅完成"/utf8>>},
    description => #{en => <<"session unsubscribed">>, zh => <<"会话取消订阅完成"/utf8>>},
    test_columns => ?TEST_COLUMNS('session.unsubscribed'),
    columns => ?COLUMNS('session.unsubscribed'),
    sql_example => <<"SELECT * FROM \"$events/session_unsubscribed\" WHERE topic =~ 't/#'">>
}).
%% All event descriptor maps defined in this header, as one list:
%% the four message events, then the two client events, then the two
%% session events.
-define(EVENT_INFO, [
    ?EVENT_INFO_MESSAGE_PUBLISH,
    ?EVENT_INFO_MESSAGE_DELIVER,
    ?EVENT_INFO_MESSAGE_ACKED,
    ?EVENT_INFO_MESSAGE_DROPPED,
    ?EVENT_INFO_CLIENT_CONNECTED,
    ?EVENT_INFO_CLIENT_DISCONNECTED,
    ?EVENT_INFO_SESSION_SUBSCRIBED,
    ?EVENT_INFO_SESSION_UNSUBSCRIBED
]).
%% Example environment map (atom keys) for the topic EVENT_TOPIC.
%%
%% * A binary starting with `$events/` is resolved to an event name with
%%   emqx_rule_events:event_name/1; the ?EG_COLUMNS list of that event is
%%   turned into a map and passed through emqx_rule_maps:atom_key_map/1
%%   (presumably converts the binary column names to atom keys - confirm
%%   against that module).
%% * Any other value yields a fixed sample of a published message.
%%
%% EVENT_TOPIC appears twice in the expansion on the `$events/` path, so pass
%% a variable or another side-effect-free expression.
%%
%% The macro no longer binds helper variables of its own (`EventName`,
%% `_PublishTopic`): macro variables live in the scope of the calling
%% function, where a variable that is already bound under the same name
%% would turn the binding into a match (badmatch / wrong clause chosen).
%% The nested ?EG_COLUMNS expansion still introduces `RuleType`.
-define(EG_ENVS(EVENT_TOPIC),
    case EVENT_TOPIC of
        <<"$events/", _/binary>> ->
            emqx_rule_maps:atom_key_map(
                maps:from_list(
                    ?EG_COLUMNS(emqx_rule_events:event_name(EVENT_TOPIC))
                )
            );
        _ ->
            #{
                id => emqx_guid:to_hexstr(emqx_guid:gen()),
                clientid => <<"c_emqx">>,
                username => <<"u_emqx">>,
                payload => <<"{\"id\": 1, \"name\": \"ha\"}">>,
                peerhost => <<"127.0.0.1">>,
                topic => <<"t/a">>,
                qos => 1,
                flags => #{sys => true, event => true},
                publish_received_at => emqx_rule_utils:now_ms(),
                timestamp => emqx_rule_utils:now_ms(),
                node => node()
            }
    end
).