diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index 7bd8cfe73..e47e8d04e 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -1009,22 +1009,6 @@ zones.default { ## Broker ##================================================================== broker { - ## System interval of publishing $SYS messages. - ## - ## @doc broker.sys_msg_interval - ## ValueType: Duration | disabled - ## Default: 1m - sys_msg_interval = 1m - - ## System heartbeat interval of publishing following heart beat message: - ## - "$SYS/brokers//uptime" - ## - "$SYS/brokers//datetime" - ## - ## @doc broker.sys_heartbeat_interval - ## ValueType: Duration - ## Default: 30s | disabled - sys_heartbeat_interval = 30s - ## Session locking strategy in a cluster. ## ## @doc broker.session_locking_strategy @@ -1095,6 +1079,51 @@ broker { perf.trie_compaction = true } +##================================================================== +## System Topic +##================================================================== + +sys_topic { + ## System interval of publishing $SYS messages. + ## + ## @doc broker.sys_msg_interval + ## ValueType: Duration | disabled + ## Default: 1m + sys_msg_interval = 1m + + ## System heartbeat interval of publishing following heart beat message: + ## - "$SYS/brokers//uptime" + ## - "$SYS/brokers//datetime" + ## + ## @doc broker.sys_heartbeat_interval + ## ValueType: Duration + ## Default: 30s | disabled + sys_heartbeat_interval = 30s + + ## Whether to enable Client lifecycle event messages publish. + ## The following options are not only for enabling MQTT client event messages + ## publish but also for Gateway clients. However, these kinds of clients type + ## are distinguished by the Topic prefix: + ## - For the MQTT client, its event topic format is: + ## $SYS/broker//clients// + ## - For the Gateway client, it is + ## $SYS/broker//gateway//clients// + sys_event_messages { + ## Enable to publish client connected event messages. + ## - Topic: "$SYS/broker//clients//connected" + client_connected = true + ## Enable to publish client disconnected event messages. + ## - Topic: "$SYS/broker//clients//disconnected" + client_disconnected = true + ## Enable to publish event message that client subscribed a topic successfully. + ## - Topic: "$SYS/broker//clients//subscribed" + client_subscribed = false + ## Enable to publish event message that client unsubscribed a topic successfully. + ## - Topic: "$SYS/broker//clients//unsubscribed" + client_unsubscribed = false + } +} + ##================================================================== ## System Monitor ##================================================================== diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index d09acd25e..01b93cc06 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -144,6 +144,9 @@ roots(medium) -> [ {"broker", sc(ref("broker"), #{})} + , {"sys_topic", + sc(ref("sys_topic"), + #{})} , {"rate_limit", sc(ref("rate_limit"), #{})} @@ -857,17 +860,7 @@ fields("deflate_opts") -> ]; fields("broker") -> - [ {"sys_msg_interval", - sc(hoconsc:union([disabled, duration()]), - #{ default => "1m" - }) - } - , {"sys_heartbeat_interval", - sc(hoconsc:union([disabled, duration()]), - #{ default => "30s" - }) - } - , {"enable_session_registry", + [ {"enable_session_registry", sc(boolean(), #{ default => true }) @@ -909,6 +902,55 @@ fields("broker_perf") -> })} ]; +fields("sys_topic") -> + [ {"sys_msg_interval", + sc(hoconsc:union([disabled, duration()]), + #{ default => "1m" + }) + } + , {"sys_heartbeat_interval", + sc(hoconsc:union([disabled, duration()]), + #{ default => "30s" + }) + } + , {"sys_event_messages", + sc(ref("event_names"), + #{ desc => + """Whether to enable Client lifecycle event messages publish.
+The following options are not only for enabling MQTT client event messages +publish but also for Gateway clients. However, these kinds of clients type +are distinguished by the Topic prefix: +- For the MQTT client, its event topic format is:
+ $SYS/broker//clients//
+- For the Gateway client, it is + $SYS/broker//gateway//clients//""" + }) + } + ]; + +fields("event_names") -> + [ {"client_connected", + sc(boolean(), + #{default => true + }) + } + , {"client_disconnected", + sc(boolean(), + #{default => true + }) + } + , {"client_subscribed", + sc(boolean(), + #{default => false + }) + } + , {"client_unsubscribed", + sc(boolean(), + #{default => false + }) + } + ]; + fields("sysmon") -> [ {"vm", sc(ref("sysmon_vm"), diff --git a/apps/emqx/src/emqx_sys.erl b/apps/emqx/src/emqx_sys.erl index 1beac2092..0d3a78fef 100644 --- a/apps/emqx/src/emqx_sys.erl +++ b/apps/emqx/src/emqx_sys.erl @@ -43,6 +43,12 @@ , terminate/2 ]). +-export([ on_client_connected/2 + , on_client_disconnected/3 + , on_client_subscribed/3 + , on_client_unsubscribed/3 + ]). + -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). @@ -67,9 +73,9 @@ , sysdescr % Broker description ]). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% APIs -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- -spec(start_link() -> {ok, pid()} | ignore | {error, any()}). start_link() -> @@ -101,10 +107,13 @@ datetime() -> "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). sys_interval() -> - emqx:get_config([broker, sys_msg_interval]). + emqx:get_config([sys_topic, sys_msg_interval]). sys_heatbeat_interval() -> - emqx:get_config([broker, sys_heartbeat_interval]). + emqx:get_config([sys_topic, sys_heartbeat_interval]). + +sys_event_message() -> + emqx:get_config([sys_topic, sys_event_messages]). %% @doc Get sys info -spec(info() -> list(tuple())). @@ -114,12 +123,13 @@ info() -> {uptime, uptime()}, {datetime, datetime()}]. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% gen_server callbacks -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- init([]) -> State = #state{sysdescr = iolist_to_binary(sysdescr())}, + load_event_hooks(), {ok, heartbeat(tick(State))}. heartbeat(State) -> @@ -127,6 +137,14 @@ heartbeat(State) -> tick(State) -> State#state{ticker = start_timer(sys_interval(), tick)}. +load_event_hooks() -> + maps:foreach( + fun(_, false) -> ok; + (K, true) -> + {HookPoint, Fun} = hook_and_fun(K), + emqx_hooks:put(HookPoint, {?MODULE, Fun, []}) + end, sys_event_message()). + handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. @@ -153,11 +171,81 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) -> + unload_event_hooks(), lists:foreach(fun emqx_misc:cancel_timer/1, [TRef1, TRef2]). -%%----------------------------------------------------------------------------- +unload_event_hooks() -> + lists:foreach(fun(K, _) -> + {HookPoint, Fun} = hook_and_fun(K), + emqx_hooks:del(HookPoint, {?MODULE, Fun}) + end, sys_event_message()). + +%%-------------------------------------------------------------------- +%% hook callbacks +%%-------------------------------------------------------------------- + +on_client_connected(ClientInfo, ConnInfo) -> + Payload0 = common_infos(ClientInfo, ConnInfo), + Payload = Payload0#{ + keepalive => maps:get(keepalive, ConnInfo, 0), + clean_start => maps:get(clean_start, ConnInfo, true), + expiry_interval => maps:get(expiry_interval, ConnInfo, 0) + }, + publish(connected, Payload). + +on_client_disconnected(ClientInfo, Reason, + ConnInfo = #{disconnected_at := DisconnectedAt}) -> + + Payload0 = common_infos(ClientInfo, ConnInfo), + Payload = Payload0#{ + reason => reason(Reason), + disconnected_at => DisconnectedAt + }, + publish(disconnected, Payload). + +-compile({inline, [reason/1]}). +reason(Reason) when is_atom(Reason) -> Reason; +reason({shutdown, Reason}) when is_atom(Reason) -> Reason; +reason({Error, _}) when is_atom(Error) -> Error; +reason(_) -> internal_error. + +on_client_subscribed(_ClientInfo = #{clientid := ClientId, + username := Username, + protocol := Protocol}, + Topic, SubOpts) -> + Payload = #{clientid => ClientId, + username => Username, + protocol => Protocol, + topic => Topic, + subopts => SubOpts, + ts => erlang:system_time(millisecond) + }, + publish(subscribed, Payload). + +on_client_unsubscribed(_ClientInfo = #{clientid := ClientId, + username := Username, + protocol := Protocol}, + Topic, _SubOpts) -> + Payload = #{clientid => ClientId, + username => Username, + protocol => Protocol, + topic => Topic, + ts => erlang:system_time(millisecond) + }, + publish(unsubscribed, Payload). + +%%-------------------------------------------------------------------- %% Internal functions -%%----------------------------------------------------------------------------- +%%-------------------------------------------------------------------- + +hook_and_fun(client_connected) -> + {'client.connected', on_client_connected}; +hook_and_fun(client_disconnected) -> + {'client.disconnected', on_client_disconnected}; +hook_and_fun(client_subscribed) -> + {'session.subscribed', on_client_subscribed}; +hook_and_fun(client_unsubscribed) -> + {'session.unsubscribed', on_client_unsubscribed}. publish_any(Name, Value) -> _ = publish(Name, Value), @@ -179,7 +267,11 @@ publish(stats, Stats) -> || {Stat, Val} <- Stats, is_atom(Stat), is_integer(Val)]; publish(metrics, Metrics) -> [safe_publish(systop(metric_topic(Name)), integer_to_binary(Val)) - || {Name, Val} <- Metrics, is_atom(Name), is_integer(Val)]. + || {Name, Val} <- Metrics, is_atom(Name), is_integer(Val)]; +publish(Event, Payload) when Event == connected; Event == disconnected; + Event == subscribed; Event == unsubscribed -> + Topic = event_topic(Event, Payload), + safe_publish(Topic, emqx_json:encode(Payload)). metric_topic(Name) -> lists:concat(["metrics/", string:replace(atom_to_list(Name), ".", "/", all)]). @@ -191,3 +283,39 @@ safe_publish(Topic, Flags, Payload) -> emqx_message:set_flags( maps:merge(#{sys => true}, Flags), emqx_message:make(?SYS, Topic, iolist_to_binary(Payload)))). + +common_infos( + _ClientInfo = #{clientid := ClientId, + username := Username, + peerhost := PeerHost, + sockport := SockPort, + protocol := Protocol + }, + _ConnInfo = #{proto_name := ProtoName, + proto_ver := ProtoVer, + connected_at := ConnectedAt + }) -> + #{clientid => ClientId, + username => Username, + ipaddress => ntoa(PeerHost), + sockport => SockPort, + protocol => Protocol, + proto_name => ProtoName, + proto_ver => ProtoVer, + connected_at => ConnectedAt, + ts => erlang:system_time(millisecond) + }. + +ntoa(undefined) -> undefined; +ntoa({IpAddr, Port}) -> + iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]); +ntoa(IpAddr) -> + iolist_to_binary(inet:ntoa(IpAddr)). + +event_topic(Event, #{clientid := ClientId, protocol := mqtt}) -> + iolist_to_binary( + [systop("clients"), "/", ClientId, "/", atom_to_binary(Event)]); +event_topic(Event, #{clientid := ClientId, protocol := GwName}) -> + iolist_to_binary( + [systop("gateway"), "/", atom_to_binary(GwName), + "/clients/", ClientId, "/", atom_to_binary(Event)]).