diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index 7bd8cfe73..e6249a039 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -1009,22 +1009,6 @@ zones.default { ## Broker ##================================================================== broker { - ## System interval of publishing $SYS messages. - ## - ## @doc broker.sys_msg_interval - ## ValueType: Duration | disabled - ## Default: 1m - sys_msg_interval = 1m - - ## System heartbeat interval of publishing following heart beat message: - ## - "$SYS/brokers//uptime" - ## - "$SYS/brokers//datetime" - ## - ## @doc broker.sys_heartbeat_interval - ## ValueType: Duration - ## Default: 30s | disabled - sys_heartbeat_interval = 30s - ## Session locking strategy in a cluster. ## ## @doc broker.session_locking_strategy @@ -1095,6 +1079,51 @@ broker { perf.trie_compaction = true } +##================================================================== +## System Topic +##================================================================== + +sys_topics { + ## System interval of publishing $SYS messages. + ## + ## @doc broker.sys_msg_interval + ## ValueType: Duration | disabled + ## Default: 1m + sys_msg_interval = 1m + + ## System heartbeat interval of publishing following heart beat message: + ## - "$SYS/brokers//uptime" + ## - "$SYS/brokers//datetime" + ## + ## @doc broker.sys_heartbeat_interval + ## ValueType: Duration + ## Default: 30s | disabled + sys_heartbeat_interval = 30s + + ## Whether to enable Client lifecycle event messages publish. + ## The following options are not only for enabling MQTT client event messages + ## publish but also for Gateway clients. However, these kinds of clients type + ## are distinguished by the Topic prefix: + ## - For the MQTT client, its event topic format is: + ## $SYS/broker//clients// + ## - For the Gateway client, it is + ## $SYS/broker//gateway//clients// + sys_event_messages { + ## Enable to publish client connected event messages. + ## - Topic: "$SYS/broker//clients//connected" + client_connected = true + ## Enable to publish client disconnected event messages. + ## - Topic: "$SYS/broker//clients//disconnected" + client_disconnected = true + ## Enable to publish event message that client subscribed a topic successfully. + ## - Topic: "$SYS/broker//clients//subscribed" + client_subscribed = false + ## Enable to publish event message that client unsubscribed a topic successfully. + ## - Topic: "$SYS/broker//clients//unsubscribed" + client_unsubscribed = false + } +} + ##================================================================== ## System Monitor ##================================================================== diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index d09acd25e..94de42e31 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -144,6 +144,9 @@ roots(medium) -> [ {"broker", sc(ref("broker"), #{})} + , {"sys_topics", + sc(ref("sys_topics"), + #{})} , {"rate_limit", sc(ref("rate_limit"), #{})} @@ -857,17 +860,7 @@ fields("deflate_opts") -> ]; fields("broker") -> - [ {"sys_msg_interval", - sc(hoconsc:union([disabled, duration()]), - #{ default => "1m" - }) - } - , {"sys_heartbeat_interval", - sc(hoconsc:union([disabled, duration()]), - #{ default => "30s" - }) - } - , {"enable_session_registry", + [ {"enable_session_registry", sc(boolean(), #{ default => true }) @@ -909,6 +902,55 @@ fields("broker_perf") -> })} ]; +fields("sys_topics") -> + [ {"sys_msg_interval", + sc(hoconsc:union([disabled, duration()]), + #{ default => "1m" + }) + } + , {"sys_heartbeat_interval", + sc(hoconsc:union([disabled, duration()]), + #{ default => "30s" + }) + } + , {"sys_event_messages", + sc(ref("event_names"), + #{ desc => + """Whether to enable Client lifecycle event messages publish.
+The following options are not only for enabling MQTT client event messages +publish but also for Gateway clients. However, these kinds of clients type +are distinguished by the Topic prefix: +- For the MQTT client, its event topic format is:
+ $SYS/broker//clients//
+- For the Gateway client, it is + $SYS/broker//gateway//clients//""" + }) + } + ]; + +fields("event_names") -> + [ {"client_connected", + sc(boolean(), + #{default => true + }) + } + , {"client_disconnected", + sc(boolean(), + #{default => true + }) + } + , {"client_subscribed", + sc(boolean(), + #{default => false + }) + } + , {"client_unsubscribed", + sc(boolean(), + #{default => false + }) + } + ]; + fields("sysmon") -> [ {"vm", sc(ref("sysmon_vm"), diff --git a/apps/emqx/src/emqx_sys.erl b/apps/emqx/src/emqx_sys.erl index 1beac2092..2deb250d8 100644 --- a/apps/emqx/src/emqx_sys.erl +++ b/apps/emqx/src/emqx_sys.erl @@ -43,6 +43,12 @@ , terminate/2 ]). +-export([ on_client_connected/2 + , on_client_disconnected/3 + , on_client_subscribed/3 + , on_client_unsubscribed/3 + ]). + -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). @@ -67,9 +73,9 @@ , sysdescr % Broker description ]). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% APIs -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- -spec(start_link() -> {ok, pid()} | ignore | {error, any()}). start_link() -> @@ -101,10 +107,13 @@ datetime() -> "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). sys_interval() -> - emqx:get_config([broker, sys_msg_interval]). + emqx:get_config([sys_topics, sys_msg_interval]). sys_heatbeat_interval() -> - emqx:get_config([broker, sys_heartbeat_interval]). + emqx:get_config([sys_topics, sys_heartbeat_interval]). + +sys_event_messages() -> + emqx:get_config([sys_topics, sys_event_messages]). %% @doc Get sys info -spec(info() -> list(tuple())). @@ -114,12 +123,13 @@ info() -> {uptime, uptime()}, {datetime, datetime()}]. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% gen_server callbacks -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- init([]) -> State = #state{sysdescr = iolist_to_binary(sysdescr())}, + load_event_hooks(), {ok, heartbeat(tick(State))}. heartbeat(State) -> @@ -127,6 +137,14 @@ heartbeat(State) -> tick(State) -> State#state{ticker = start_timer(sys_interval(), tick)}. +load_event_hooks() -> + lists:foreach( + fun({_, false}) -> ok; + ({K, true}) -> + {HookPoint, Fun} = hook_and_fun(K), + emqx_hooks:put(HookPoint, {?MODULE, Fun, []}) + end, maps:to_list(sys_event_messages())). + handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. @@ -153,11 +171,81 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) -> + unload_event_hooks(), lists:foreach(fun emqx_misc:cancel_timer/1, [TRef1, TRef2]). -%%----------------------------------------------------------------------------- +unload_event_hooks() -> + lists:foreach(fun({K, _}) -> + {HookPoint, Fun} = hook_and_fun(K), + emqx_hooks:del(HookPoint, {?MODULE, Fun}) + end, maps:to_list(sys_event_messages())). + +%%-------------------------------------------------------------------- +%% hook callbacks +%%-------------------------------------------------------------------- + +on_client_connected(ClientInfo, ConnInfo) -> + Payload0 = common_infos(ClientInfo, ConnInfo), + Payload = Payload0#{ + keepalive => maps:get(keepalive, ConnInfo, 0), + clean_start => maps:get(clean_start, ConnInfo, true), + expiry_interval => maps:get(expiry_interval, ConnInfo, 0) + }, + publish(connected, Payload). + +on_client_disconnected(ClientInfo, Reason, + ConnInfo = #{disconnected_at := DisconnectedAt}) -> + + Payload0 = common_infos(ClientInfo, ConnInfo), + Payload = Payload0#{ + reason => reason(Reason), + disconnected_at => DisconnectedAt + }, + publish(disconnected, Payload). + +-compile({inline, [reason/1]}). +reason(Reason) when is_atom(Reason) -> Reason; +reason({shutdown, Reason}) when is_atom(Reason) -> Reason; +reason({Error, _}) when is_atom(Error) -> Error; +reason(_) -> internal_error. + +on_client_subscribed(_ClientInfo = #{clientid := ClientId, + username := Username, + protocol := Protocol}, + Topic, SubOpts) -> + Payload = #{clientid => ClientId, + username => Username, + protocol => Protocol, + topic => Topic, + subopts => SubOpts, + ts => erlang:system_time(millisecond) + }, + publish(subscribed, Payload). + +on_client_unsubscribed(_ClientInfo = #{clientid := ClientId, + username := Username, + protocol := Protocol}, + Topic, _SubOpts) -> + Payload = #{clientid => ClientId, + username => Username, + protocol => Protocol, + topic => Topic, + ts => erlang:system_time(millisecond) + }, + publish(unsubscribed, Payload). + +%%-------------------------------------------------------------------- %% Internal functions -%%----------------------------------------------------------------------------- +%%-------------------------------------------------------------------- + +hook_and_fun(client_connected) -> + {'client.connected', on_client_connected}; +hook_and_fun(client_disconnected) -> + {'client.disconnected', on_client_disconnected}; +hook_and_fun(client_subscribed) -> + {'session.subscribed', on_client_subscribed}; +hook_and_fun(client_unsubscribed) -> + {'session.unsubscribed', on_client_unsubscribed}. publish_any(Name, Value) -> _ = publish(Name, Value), @@ -179,7 +267,11 @@ publish(stats, Stats) -> || {Stat, Val} <- Stats, is_atom(Stat), is_integer(Val)]; publish(metrics, Metrics) -> [safe_publish(systop(metric_topic(Name)), integer_to_binary(Val)) - || {Name, Val} <- Metrics, is_atom(Name), is_integer(Val)]. + || {Name, Val} <- Metrics, is_atom(Name), is_integer(Val)]; +publish(Event, Payload) when Event == connected; Event == disconnected; + Event == subscribed; Event == unsubscribed -> + Topic = event_topic(Event, Payload), + safe_publish(Topic, emqx_json:encode(Payload)). metric_topic(Name) -> lists:concat(["metrics/", string:replace(atom_to_list(Name), ".", "/", all)]). @@ -191,3 +283,39 @@ safe_publish(Topic, Flags, Payload) -> emqx_message:set_flags( maps:merge(#{sys => true}, Flags), emqx_message:make(?SYS, Topic, iolist_to_binary(Payload)))). + +common_infos( + _ClientInfo = #{clientid := ClientId, + username := Username, + peerhost := PeerHost, + sockport := SockPort, + protocol := Protocol + }, + _ConnInfo = #{proto_name := ProtoName, + proto_ver := ProtoVer, + connected_at := ConnectedAt + }) -> + #{clientid => ClientId, + username => Username, + ipaddress => ntoa(PeerHost), + sockport => SockPort, + protocol => Protocol, + proto_name => ProtoName, + proto_ver => ProtoVer, + connected_at => ConnectedAt, + ts => erlang:system_time(millisecond) + }. + +ntoa(undefined) -> undefined; +ntoa({IpAddr, Port}) -> + iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]); +ntoa(IpAddr) -> + iolist_to_binary(inet:ntoa(IpAddr)). + +event_topic(Event, #{clientid := ClientId, protocol := mqtt}) -> + iolist_to_binary( + [systop("clients"), "/", ClientId, "/", atom_to_binary(Event)]); +event_topic(Event, #{clientid := ClientId, protocol := GwName}) -> + iolist_to_binary( + [systop("gateway"), "/", atom_to_binary(GwName), + "/clients/", ClientId, "/", atom_to_binary(Event)]). diff --git a/apps/emqx/test/emqx_config_SUITE.erl b/apps/emqx/test/emqx_config_SUITE.erl index a9ee5645f..46ef7cba8 100644 --- a/apps/emqx/test/emqx_config_SUITE.erl +++ b/apps/emqx/test/emqx_config_SUITE.erl @@ -45,9 +45,7 @@ t_fill_default_values(_) -> <<"route_batch_clean">> := false, <<"session_locking_strategy">> := quorum, <<"shared_dispatch_ack_enabled">> := false, - <<"shared_subscription_strategy">> := round_robin, - <<"sys_heartbeat_interval">> := <<"30s">>, - <<"sys_msg_interval">> := <<"1m">>}}, WithDefaults), + <<"shared_subscription_strategy">> := round_robin}}, WithDefaults), %% ensure JSON compatible _ = emqx_json:encode(WithDefaults), ok. diff --git a/apps/emqx/test/emqx_sys_SUITE.erl b/apps/emqx/test/emqx_sys_SUITE.erl index 094778c0e..6af42ca85 100644 --- a/apps/emqx/test/emqx_sys_SUITE.erl +++ b/apps/emqx/test/emqx_sys_SUITE.erl @@ -32,21 +32,3 @@ end_per_suite(_Config) -> application:unload(emqx), ok = emqx_logger:set_log_level(error), ok. - -% t_version(_) -> -% error('TODO'). - -% t_sysdescr(_) -> -% error('TODO'). - -% t_datetime(_) -> -% error('TODO'). - -% t_sys_interval(_) -> -% error('TODO'). - -% t_sys_heatbeat_interval(_) -> -% error('TODO'). - -% t_info(_) -> -% error('TODO'). diff --git a/apps/emqx/test/props/prop_emqx_sys.erl b/apps/emqx/test/props/prop_emqx_sys.erl index 780992951..c65383d81 100644 --- a/apps/emqx/test/props/prop_emqx_sys.erl +++ b/apps/emqx/test/props/prop_emqx_sys.erl @@ -30,6 +30,7 @@ , emqx_stats , emqx_broker , mria_mnesia + , emqx_hooks ]). -define(ALL(Vars, Types, Exprs), @@ -59,8 +60,11 @@ prop_sys() -> do_setup() -> ok = emqx_logger:set_log_level(emergency), - emqx_config:put([broker, sys_msg_interval], 60000), - emqx_config:put([broker, sys_heartbeat_interval], 30000), + emqx_config:put([sys_topics, sys_msg_interval], 60000), + emqx_config:put([sys_topics, sys_heartbeat_interval], 30000), + emqx_config:put([sys_topics, sys_event_messages], + #{client_connected => true, client_disconnected => true, + client_subscribed => true, client_unsubscribed => true}), [mock(Mod) || Mod <- ?mock_modules], ok. @@ -83,7 +87,10 @@ do_mock(emqx_stats) -> do_mock(mria_mnesia) -> meck:expect(mria_mnesia, running_nodes, fun() -> [node()] end); do_mock(emqx_metrics) -> - meck:expect(emqx_metrics, all, fun() -> [{hello, 3}] end). + meck:expect(emqx_metrics, all, fun() -> [{hello, 3}] end); +do_mock(emqx_hooks) -> + meck:expect(emqx_hooks, put, fun(_HookPoint, _MFA) -> ok end), + meck:expect(emqx_hooks, del, fun(_HookPoint, _MF) -> ok end). %%-------------------------------------------------------------------- %% MODEL diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index 26fe652d9..ed3e529f6 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -90,12 +90,6 @@ end_per_testcase(_, Config) -> %% Test cases %%-------------------------------------------------------------------- -t_noserver_nohook(_) -> - emqx_exhook_mgr:disable(<<"default">>), - ?assertEqual([], ets:tab2list(emqx_hooks)), - {ok, _} = emqx_exhook_mgr:enable(<<"default">>), - ?assertNotEqual([], ets:tab2list(emqx_hooks)). - t_access_failed_if_no_server_running(_) -> emqx_exhook_mgr:disable(<<"default">>), ClientInfo = #{clientid => <<"user-id-1">>, @@ -152,7 +146,9 @@ t_error_update_conf(_) -> ErrorAnd = #{<<"name">> => Name, <<"url">> => <<"http://127.0.0.1:9001">>}, {ok, _} = emqx_exhook_mgr:update_config(Path, {add, ErrorAnd}), - DisableAnd = #{<<"name">> => Name, <<"url">> => <<"http://127.0.0.1:9001">>, <<"enable">> => false}, + DisableAnd = #{<<"name">> => Name, + <<"url">> => <<"http://127.0.0.1:9001">>, + <<"enable">> => false}, {ok, _} = emqx_exhook_mgr:update_config(Path, {add, DisableAnd}), {ok, _} = emqx_exhook_mgr:update_config(Path, {delete, <<"error">>}),