From 3903575435a2d68c0317da5f08477cf8d903d26e Mon Sep 17 00:00:00 2001 From: Turtle Date: Thu, 29 Jul 2021 20:47:20 +0800 Subject: [PATCH] feat(event-topic): support publish broker event topic --- apps/emqx_modules/etc/emqx_modules.conf | 13 +- apps/emqx_modules/src/emqx_event_topic.erl | 246 ++++++++++++++++++ apps/emqx_modules/src/emqx_modules_app.erl | 4 +- apps/emqx_modules/src/emqx_modules_schema.erl | 26 +- apps/emqx_modules/src/emqx_presence.erl | 122 --------- extension_schemas.config | 2 +- 6 files changed, 283 insertions(+), 130 deletions(-) create mode 100644 apps/emqx_modules/src/emqx_event_topic.erl delete mode 100644 apps/emqx_modules/src/emqx_presence.erl diff --git a/apps/emqx_modules/etc/emqx_modules.conf b/apps/emqx_modules/etc/emqx_modules.conf index 9049ca6c7..636346288 100644 --- a/apps/emqx_modules/etc/emqx_modules.conf +++ b/apps/emqx_modules/etc/emqx_modules.conf @@ -11,8 +11,17 @@ telemetry: { enable: true } -presence: { - enable: true + +event_topic: { + topics: [ + "$event/client_connected", + "$event/client_disconnected", + "$event/session_subscribed", + "$event/session_unsubscribed", + "$event/message_delivered", + "$event/message_acked", + "$event/message_dropped" + ] } topic_metrics:{ diff --git a/apps/emqx_modules/src/emqx_event_topic.erl b/apps/emqx_modules/src/emqx_event_topic.erl new file mode 100644 index 000000000..e84a3d0ec --- /dev/null +++ b/apps/emqx_modules/src/emqx_event_topic.erl @@ -0,0 +1,246 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_event_topic). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-export([ enable/0 + , disable/0 + ]). + +-export([ on_client_connected/2 + , on_client_disconnected/3 + , on_session_subscribed/3 + , on_session_unsubscribed/3 + , on_message_dropped/3 + , on_message_delivered/2 + , on_message_acked/2 + ]). + +-ifdef(TEST). +-export([reason/1]). +-endif. + +enable() -> + emqx_hooks:put('client.connected', {?MODULE, on_client_connected, []}), + emqx_hooks:put('client.disconnected', {?MODULE, on_client_disconnected, []}), + emqx_hooks:put('session.subscribed', {?MODULE, on_session_subscribed, []}), + emqx_hooks:put('session.unsubscribed', {?MODULE, on_session_unsubscribed, []}), + emqx_hooks:put('message.delivered', {?MODULE, on_message_delivered, []}), + emqx_hooks:put('message.acked', {?MODULE, on_message_acked, []}), + emqx_hooks:put('message.dropped', {?MODULE, on_message_dropped, []}). + +disable() -> + emqx_hooks:del('client.connected', {?MODULE, on_client_connected}), + emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}), + emqx_hooks:del('session.subscribed', {?MODULE, on_session_subscribed}), + emqx_hooks:del('session.unsubscribed', {?MODULE, session_unsubscribed}), + emqx_hooks:del('message.delivered', {?MODULE, on_message_delivered}), + emqx_hooks:del('message.acked', {?MODULE, on_message_acked}), + emqx_hooks:del('message.dropped', {?MODULE, on_message_dropped}). + +%%-------------------------------------------------------------------- +%% Callbacks +%%-------------------------------------------------------------------- + +on_client_connected(ClientInfo, ConnInfo) -> + Payload0 = connected_payload(ClientInfo, ConnInfo), + emqx_broker:safe_publish( + make_msg(<<"$event/client_connected">>, + emqx_json:encode(Payload0))). + +on_client_disconnected(_ClientInfo = #{clientid := ClientId, username := Username}, + Reason, _ConnInfo = #{disconnected_at := DisconnectedAt}) -> + Payload0 = #{clientid => ClientId, + username => Username, + reason => reason(Reason), + disconnected_at => DisconnectedAt, + ts => erlang:system_time(millisecond) + }, + emqx_broker:safe_publish( + make_msg(<<"$event/client_connected">>, + emqx_json:encode(Payload0))). + +on_session_subscribed(_ClientInfo = #{clientid := ClientId, + username := Username}, + Topic, SubOpts) -> + Payload0 = #{clientid => ClientId, + username => Username, + topic => Topic, + subopts => SubOpts, + ts => erlang:system_time(millisecond) + }, + emqx_broker:safe_publish( + make_msg(<<"$event/session_subscribed">>, + emqx_json:encode(Payload0))). + +on_session_unsubscribed(_ClientInfo = #{clientid := ClientId, + username := Username}, + Topic, _SubOpts) -> + Payload0 = #{clientid => ClientId, + username => Username, + topic => Topic, + ts => erlang:system_time(millisecond) + }, + emqx_broker:safe_publish( + make_msg(<<"$event/session_unsubscribed">>, + emqx_json:encode(Payload0))). + +on_message_dropped(Message = #message{from = ClientId}, _, Reason) -> + case ignore_sys_message(Message) of + true -> ok; + false -> + Payload0 = base_message(Message), + Payload1 = Payload0#{ + reason => Reason, + clientid => ClientId, + username => emqx_message:get_header(username, Message, undefined), + peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)) + }, + emqx_broker:safe_publish( + make_msg(<<"$event/message_dropped">>, emqx_json:encode(Payload1))) + end, + {ok, Message}. + +on_message_delivered(_ClientInfo = #{ + peerhost := PeerHost, + clientid := ReceiverCId, + username := ReceiverUsername}, + #message{from = ClientId} = Message) -> + case ignore_sys_message(Message) of + true -> ok; + false -> + Payload0 = base_message(Message), + Payload1 = Payload0#{ + from_clientid => ClientId, + from_username => emqx_message:get_header(username, Message, undefined), + clientid => ReceiverCId, + username => ReceiverUsername, + peerhost => ntoa(PeerHost) + }, + emqx_broker:safe_publish( + make_msg(<<"$event/message_delivered">>, emqx_json:encode(Payload1))) + end, + {ok, Message}. + +on_message_acked(_ClientInfo = #{ + peerhost := PeerHost, + clientid := ReceiverCId, + username := ReceiverUsername}, + #message{from = ClientId} = Message) -> + case ignore_sys_message(Message) of + true -> ok; + false -> + Payload0 = base_message(Message), + Payload1 = Payload0#{ + from_clientid => ClientId, + from_username => emqx_message:get_header(username, Message, undefined), + clientid => ReceiverCId, + username => ReceiverUsername, + peerhost => ntoa(PeerHost) + }, + emqx_broker:safe_publish( + make_msg(<<"$event/message_acked">>, emqx_json:encode(Payload1))) + end, + {ok, Message}. + +%%-------------------------------------------------------------------- +%% Helper functions +%%-------------------------------------------------------------------- + +connected_payload(#{peerhost := PeerHost, + sockport := SockPort, + clientid := ClientId, + username := Username + }, + #{clean_start := CleanStart, + proto_name := ProtoName, + proto_ver := ProtoVer, + keepalive := Keepalive, + connected_at := ConnectedAt, + expiry_interval := ExpiryInterval + }) -> + #{clientid => ClientId, + username => Username, + ipaddress => ntoa(PeerHost), + sockport => SockPort, + proto_name => ProtoName, + proto_ver => ProtoVer, + keepalive => Keepalive, + connack => 0, %% Deprecated? + clean_start => CleanStart, + expiry_interval => ExpiryInterval div 1000, + connected_at => ConnectedAt, + ts => erlang:system_time(millisecond) + }. + +make_msg(Topic, Payload) -> + emqx_message:set_flag( + sys, emqx_message:make( + ?MODULE, 0, Topic, iolist_to_binary(Payload))). + +-compile({inline, [reason/1]}). +reason(Reason) when is_atom(Reason) -> Reason; +reason({shutdown, Reason}) when is_atom(Reason) -> Reason; +reason({Error, _}) when is_atom(Error) -> Error; +reason(_) -> internal_error. + +ntoa(undefined) -> undefined; +ntoa({IpAddr, Port}) -> + iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]); +ntoa(IpAddr) -> + iolist_to_binary(inet:ntoa(IpAddr)). + +printable_maps(undefined) -> #{}; +printable_maps(Headers) -> + maps:fold( + fun (K, V0, AccIn) when K =:= peerhost; K =:= peername; K =:= sockname -> + AccIn#{K => ntoa(V0)}; + ('User-Property', V0, AccIn) when is_list(V0) -> + AccIn#{ + 'User-Property' => maps:from_list(V0), + 'User-Property-Pairs' => [#{ + key => Key, + value => Value + } || {Key, Value} <- V0] + }; + (K, V0, AccIn) -> AccIn#{K => V0} + end, #{}, Headers). + +base_message(Message) -> + #message{ + id = Id, + qos = QoS, + flags = Flags, + topic = Topic, + headers = Headers, + payload = Payload, + timestamp = Timestamp} = Message, + #{ + id => emqx_guid:to_hexstr(Id), + payload => Payload, + topic => Topic, + qos => QoS, + flags => Flags, + headers => printable_maps(Headers), + pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), + publish_received_at => Timestamp + }. + +ignore_sys_message(#message{flags = Flags}) -> + maps:get(sys, Flags, false). diff --git a/apps/emqx_modules/src/emqx_modules_app.erl b/apps/emqx_modules/src/emqx_modules_app.erl index 2b0ee482d..d52021942 100644 --- a/apps/emqx_modules/src/emqx_modules_app.erl +++ b/apps/emqx_modules/src/emqx_modules_app.erl @@ -33,16 +33,16 @@ stop(_State) -> maybe_enable_modules() -> emqx_config:get([delayed, enable], true) andalso emqx_delayed:enable(), - emqx_config:get([presence, enable], true) andalso emqx_presence:enable(), emqx_config:get([telemetry, enable], true) andalso emqx_telemetry:enable(), emqx_config:get([recon, enable], true) andalso emqx_recon:enable(), + emqx_event_topic:enable(), emqx_rewrite:enable(), emqx_topic_metrics:enable(). maybe_disable_modules() -> emqx_config:get([delayed, enable], true) andalso emqx_delayed:disable(), - emqx_config:get([presence, enable], true) andalso emqx_presence:disable(), emqx_config:get([telemetry, enable], true) andalso emqx_telemetry:disable(), emqx_config:get([recon, enable], true) andalso emqx_recon:disable(), + emqx_event_topic:disable(), emqx_rewrite:disable(), emqx_topic_metrics:disable(). diff --git a/apps/emqx_modules/src/emqx_modules_schema.erl b/apps/emqx_modules/src/emqx_modules_schema.erl index e7b8742e4..4c3a0c992 100644 --- a/apps/emqx_modules/src/emqx_modules_schema.erl +++ b/apps/emqx_modules/src/emqx_modules_schema.erl @@ -27,13 +27,12 @@ structs() -> ["delayed", "recon", "telemetry", - "presence", + "event_topic", "rewrite", "topic_metrics"]. fields(Name) when Name =:= "recon"; - Name =:= "telemetry"; - Name =:= "presence" -> + Name =:= "telemetry" -> [ {enable, emqx_schema:t(boolean(), undefined, false)} ]; @@ -46,6 +45,10 @@ fields("rewrite") -> [ {rules, hoconsc:array(hoconsc:ref(?MODULE, "rules"))} ]; +fields("event_topic") -> + [ {topics, fun topics/1} + ]; + fields("topic_metrics") -> [ {topics, hoconsc:array(binary())} ]; @@ -56,3 +59,20 @@ fields("rules") -> , {re, emqx_schema:t(binary())} , {dest_topic, emqx_schema:t(binary())} ]. + +topics(type) -> hoconsc:array(binary()); +topics(default) -> []; +% topics(validator) -> [ +% fun(Conf) -> +% case lists:member(Conf, ["$event/client_connected", +% "$event/client_disconnected", +% "$event/session_subscribed", +% "$event/session_unsubscribed", +% "$event/message_delivered", +% "$event/message_acked", +% "$event/message_dropped"]) of +% true -> ok; +% false -> {error, "Bad event topic"} +% end +% end]; +topics(_) -> undefined. diff --git a/apps/emqx_modules/src/emqx_presence.erl b/apps/emqx_modules/src/emqx_presence.erl deleted file mode 100644 index c85d951e1..000000000 --- a/apps/emqx_modules/src/emqx_presence.erl +++ /dev/null @@ -1,122 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_presence). - --include_lib("emqx/include/emqx.hrl"). --include_lib("emqx/include/logger.hrl"). - --logger_header("[Presence]"). - --export([ enable/0 - , disable/0 - ]). - --export([ on_client_connected/2 - , on_client_disconnected/3 - ]). - --ifdef(TEST). --export([reason/1]). --endif. - -enable() -> - emqx_hooks:put('client.connected', {?MODULE, on_client_connected, []}), - emqx_hooks:put('client.disconnected', {?MODULE, on_client_disconnected, []}). - -disable() -> - emqx_hooks:del('client.connected', {?MODULE, on_client_connected}), - emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}). - -%%-------------------------------------------------------------------- -%% Callbacks -%%-------------------------------------------------------------------- - -on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo) -> - Presence = connected_presence(ClientInfo, ConnInfo), - case emqx_json:safe_encode(Presence) of - {ok, Payload} -> - emqx_broker:safe_publish( - make_msg(topic(connected, ClientId), Payload)); - {error, _Reason} -> - ?LOG(error, "Failed to encode 'connected' presence: ~p", [Presence]) - end. - -on_client_disconnected(_ClientInfo = #{clientid := ClientId, username := Username}, - Reason, _ConnInfo = #{disconnected_at := DisconnectedAt}) -> - Presence = #{clientid => ClientId, - username => Username, - reason => reason(Reason), - disconnected_at => DisconnectedAt, - ts => erlang:system_time(millisecond) - }, - case emqx_json:safe_encode(Presence) of - {ok, Payload} -> - emqx_broker:safe_publish( - make_msg(topic(disconnected, ClientId), Payload)); - {error, _Reason} -> - ?LOG(error, "Failed to encode 'disconnected' presence: ~p", [Presence]) - end. - -%%-------------------------------------------------------------------- -%% Helper functions -%%-------------------------------------------------------------------- - -connected_presence(#{peerhost := PeerHost, - sockport := SockPort, - clientid := ClientId, - username := Username - }, - #{clean_start := CleanStart, - proto_name := ProtoName, - proto_ver := ProtoVer, - keepalive := Keepalive, - connected_at := ConnectedAt, - expiry_interval := ExpiryInterval - }) -> - #{clientid => ClientId, - username => Username, - ipaddress => ntoa(PeerHost), - sockport => SockPort, - proto_name => ProtoName, - proto_ver => ProtoVer, - keepalive => Keepalive, - connack => 0, %% Deprecated? - clean_start => CleanStart, - expiry_interval => ExpiryInterval div 1000, - connected_at => ConnectedAt, - ts => erlang:system_time(millisecond) - }. - -make_msg(Topic, Payload) -> - emqx_message:set_flag( - sys, emqx_message:make( - ?MODULE, 0, Topic, iolist_to_binary(Payload))). - -topic(connected, ClientId) -> - emqx_topic:systop(iolist_to_binary(["clients/", ClientId, "/connected"])); -topic(disconnected, ClientId) -> - emqx_topic:systop(iolist_to_binary(["clients/", ClientId, "/disconnected"])). - --compile({inline, [reason/1]}). -reason(Reason) when is_atom(Reason) -> Reason; -reason({shutdown, Reason}) when is_atom(Reason) -> Reason; -reason({Error, _}) when is_atom(Error) -> Error; -reason(_) -> internal_error. - --compile({inline, [ntoa/1]}). -ntoa(IpAddr) -> iolist_to_binary(inet:ntoa(IpAddr)). - diff --git a/extension_schemas.config b/extension_schemas.config index 4880dfa86..725c9cd6c 100644 --- a/extension_schemas.config +++ b/extension_schemas.config @@ -16,7 +16,7 @@ , {"delayed", emqx_modules_schema} , {"recon", emqx_modules_schema} , {"telemetry", emqx_modules_schema} -, {"presence", emqx_modules_schema} +, {"event_topic", emqx_modules_schema} , {"rewrite", emqx_modules_schema} , {"topic_metrics", emqx_modules_schema} ].