Merge pull request #7205 from HJianBo/redesign-event-msg

Refactor `event_messages` with `sys_topics`
This commit is contained in:
JianBo He 2022-03-09 21:58:27 +08:00 committed by GitHub
commit 98d2f80506
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 249 additions and 67 deletions

View File

@ -1009,22 +1009,6 @@ zones.default {
## Broker
##==================================================================
broker {
## System interval of publishing $SYS messages.
##
## @doc broker.sys_msg_interval
## ValueType: Duration | disabled
## Default: 1m
sys_msg_interval = 1m
## System heartbeat interval of publishing following heart beat message:
## - "$SYS/brokers/<node>/uptime"
## - "$SYS/brokers/<node>/datetime"
##
## @doc broker.sys_heartbeat_interval
## ValueType: Duration
## Default: 30s | disabled
sys_heartbeat_interval = 30s
## Session locking strategy in a cluster.
##
## @doc broker.session_locking_strategy
@ -1095,6 +1079,51 @@ broker {
perf.trie_compaction = true
}
##==================================================================
## System Topic
##==================================================================
sys_topics {
## System interval of publishing $SYS messages.
##
## @doc broker.sys_msg_interval
## ValueType: Duration | disabled
## Default: 1m
sys_msg_interval = 1m
## System heartbeat interval of publishing following heart beat message:
## - "$SYS/brokers/<node>/uptime"
## - "$SYS/brokers/<node>/datetime"
##
## @doc broker.sys_heartbeat_interval
## ValueType: Duration
## Default: 30s | disabled
sys_heartbeat_interval = 30s
## Whether to enable Client lifecycle event messages publish.
## The following options are not only for enabling MQTT client event messages
## publish but also for Gateway clients. However, these kinds of clients type
## are distinguished by the Topic prefix:
## - For the MQTT client, its event topic format is:
## $SYS/broker/<node>/clients/<clientid>/<event>
## - For the Gateway client, it is
## $SYS/broker/<node>/gateway/<gateway-name>/clients/<clientid>/<event>
sys_event_messages {
## Enable to publish client connected event messages.
## - Topic: "$SYS/broker/<node>/clients/<clientid>/connected"
client_connected = true
## Enable to publish client disconnected event messages.
## - Topic: "$SYS/broker/<node>/clients/<clientid>/disconnected"
client_disconnected = true
## Enable to publish event message that client subscribed a topic successfully.
## - Topic: "$SYS/broker/<node>/clients/<clientid>/subscribed"
client_subscribed = false
## Enable to publish event message that client unsubscribed a topic successfully.
## - Topic: "$SYS/broker/<node>/clients/<clientid>/unsubscribed"
client_unsubscribed = false
}
}
##==================================================================
## System Monitor
##==================================================================

View File

@ -144,6 +144,9 @@ roots(medium) ->
[ {"broker",
sc(ref("broker"),
#{})}
, {"sys_topics",
sc(ref("sys_topics"),
#{})}
, {"rate_limit",
sc(ref("rate_limit"),
#{})}
@ -857,17 +860,7 @@ fields("deflate_opts") ->
];
fields("broker") ->
[ {"sys_msg_interval",
sc(hoconsc:union([disabled, duration()]),
#{ default => "1m"
})
}
, {"sys_heartbeat_interval",
sc(hoconsc:union([disabled, duration()]),
#{ default => "30s"
})
}
, {"enable_session_registry",
[ {"enable_session_registry",
sc(boolean(),
#{ default => true
})
@ -909,6 +902,55 @@ fields("broker_perf") ->
})}
];
fields("sys_topics") ->
[ {"sys_msg_interval",
sc(hoconsc:union([disabled, duration()]),
#{ default => "1m"
})
}
, {"sys_heartbeat_interval",
sc(hoconsc:union([disabled, duration()]),
#{ default => "30s"
})
}
, {"sys_event_messages",
sc(ref("event_names"),
#{ desc =>
"""Whether to enable Client lifecycle event messages publish.<br/>
The following options are not only for enabling MQTT client event messages
publish but also for Gateway clients. However, these kinds of clients type
are distinguished by the Topic prefix:
- For the MQTT client, its event topic format is:<br/>
<code>$SYS/broker/<node>/clients/<clientid>/<event></code><br/>
- For the Gateway client, it is
<code>$SYS/broker/<node>/gateway/<gateway-name>/clients/<clientid>/<event></code>"""
})
}
];
fields("event_names") ->
[ {"client_connected",
sc(boolean(),
#{default => true
})
}
, {"client_disconnected",
sc(boolean(),
#{default => true
})
}
, {"client_subscribed",
sc(boolean(),
#{default => false
})
}
, {"client_unsubscribed",
sc(boolean(),
#{default => false
})
}
];
fields("sysmon") ->
[ {"vm",
sc(ref("sysmon_vm"),

View File

@ -43,6 +43,12 @@
, terminate/2
]).
-export([ on_client_connected/2
, on_client_disconnected/3
, on_client_subscribed/3
, on_client_unsubscribed/3
]).
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
@ -67,9 +73,9 @@
, sysdescr % Broker description
]).
%%------------------------------------------------------------------------------
%%--------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
%%--------------------------------------------------------------------
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
start_link() ->
@ -101,10 +107,13 @@ datetime() ->
"~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])).
sys_interval() ->
emqx:get_config([broker, sys_msg_interval]).
emqx:get_config([sys_topics, sys_msg_interval]).
sys_heatbeat_interval() ->
emqx:get_config([broker, sys_heartbeat_interval]).
emqx:get_config([sys_topics, sys_heartbeat_interval]).
sys_event_messages() ->
emqx:get_config([sys_topics, sys_event_messages]).
%% @doc Get sys info
-spec(info() -> list(tuple())).
@ -114,12 +123,13 @@ info() ->
{uptime, uptime()},
{datetime, datetime()}].
%%------------------------------------------------------------------------------
%%--------------------------------------------------------------------
%% gen_server callbacks
%%------------------------------------------------------------------------------
%%--------------------------------------------------------------------
init([]) ->
State = #state{sysdescr = iolist_to_binary(sysdescr())},
load_event_hooks(),
{ok, heartbeat(tick(State))}.
heartbeat(State) ->
@ -127,6 +137,14 @@ heartbeat(State) ->
tick(State) ->
State#state{ticker = start_timer(sys_interval(), tick)}.
load_event_hooks() ->
lists:foreach(
fun({_, false}) -> ok;
({K, true}) ->
{HookPoint, Fun} = hook_and_fun(K),
emqx_hooks:put(HookPoint, {?MODULE, Fun, []})
end, maps:to_list(sys_event_messages())).
handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}.
@ -153,11 +171,81 @@ handle_info(Info, State) ->
{noreply, State}.
terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) ->
unload_event_hooks(),
lists:foreach(fun emqx_misc:cancel_timer/1, [TRef1, TRef2]).
%%-----------------------------------------------------------------------------
unload_event_hooks() ->
lists:foreach(fun({K, _}) ->
{HookPoint, Fun} = hook_and_fun(K),
emqx_hooks:del(HookPoint, {?MODULE, Fun})
end, maps:to_list(sys_event_messages())).
%%--------------------------------------------------------------------
%% hook callbacks
%%--------------------------------------------------------------------
on_client_connected(ClientInfo, ConnInfo) ->
Payload0 = common_infos(ClientInfo, ConnInfo),
Payload = Payload0#{
keepalive => maps:get(keepalive, ConnInfo, 0),
clean_start => maps:get(clean_start, ConnInfo, true),
expiry_interval => maps:get(expiry_interval, ConnInfo, 0)
},
publish(connected, Payload).
on_client_disconnected(ClientInfo, Reason,
ConnInfo = #{disconnected_at := DisconnectedAt}) ->
Payload0 = common_infos(ClientInfo, ConnInfo),
Payload = Payload0#{
reason => reason(Reason),
disconnected_at => DisconnectedAt
},
publish(disconnected, Payload).
-compile({inline, [reason/1]}).
reason(Reason) when is_atom(Reason) -> Reason;
reason({shutdown, Reason}) when is_atom(Reason) -> Reason;
reason({Error, _}) when is_atom(Error) -> Error;
reason(_) -> internal_error.
on_client_subscribed(_ClientInfo = #{clientid := ClientId,
username := Username,
protocol := Protocol},
Topic, SubOpts) ->
Payload = #{clientid => ClientId,
username => Username,
protocol => Protocol,
topic => Topic,
subopts => SubOpts,
ts => erlang:system_time(millisecond)
},
publish(subscribed, Payload).
on_client_unsubscribed(_ClientInfo = #{clientid := ClientId,
username := Username,
protocol := Protocol},
Topic, _SubOpts) ->
Payload = #{clientid => ClientId,
username => Username,
protocol => Protocol,
topic => Topic,
ts => erlang:system_time(millisecond)
},
publish(unsubscribed, Payload).
%%--------------------------------------------------------------------
%% Internal functions
%%-----------------------------------------------------------------------------
%%--------------------------------------------------------------------
hook_and_fun(client_connected) ->
{'client.connected', on_client_connected};
hook_and_fun(client_disconnected) ->
{'client.disconnected', on_client_disconnected};
hook_and_fun(client_subscribed) ->
{'session.subscribed', on_client_subscribed};
hook_and_fun(client_unsubscribed) ->
{'session.unsubscribed', on_client_unsubscribed}.
publish_any(Name, Value) ->
_ = publish(Name, Value),
@ -179,7 +267,11 @@ publish(stats, Stats) ->
|| {Stat, Val} <- Stats, is_atom(Stat), is_integer(Val)];
publish(metrics, Metrics) ->
[safe_publish(systop(metric_topic(Name)), integer_to_binary(Val))
|| {Name, Val} <- Metrics, is_atom(Name), is_integer(Val)].
|| {Name, Val} <- Metrics, is_atom(Name), is_integer(Val)];
publish(Event, Payload) when Event == connected; Event == disconnected;
Event == subscribed; Event == unsubscribed ->
Topic = event_topic(Event, Payload),
safe_publish(Topic, emqx_json:encode(Payload)).
metric_topic(Name) ->
lists:concat(["metrics/", string:replace(atom_to_list(Name), ".", "/", all)]).
@ -191,3 +283,39 @@ safe_publish(Topic, Flags, Payload) ->
emqx_message:set_flags(
maps:merge(#{sys => true}, Flags),
emqx_message:make(?SYS, Topic, iolist_to_binary(Payload)))).
common_infos(
_ClientInfo = #{clientid := ClientId,
username := Username,
peerhost := PeerHost,
sockport := SockPort,
protocol := Protocol
},
_ConnInfo = #{proto_name := ProtoName,
proto_ver := ProtoVer,
connected_at := ConnectedAt
}) ->
#{clientid => ClientId,
username => Username,
ipaddress => ntoa(PeerHost),
sockport => SockPort,
protocol => Protocol,
proto_name => ProtoName,
proto_ver => ProtoVer,
connected_at => ConnectedAt,
ts => erlang:system_time(millisecond)
}.
ntoa(undefined) -> undefined;
ntoa({IpAddr, Port}) ->
iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]);
ntoa(IpAddr) ->
iolist_to_binary(inet:ntoa(IpAddr)).
event_topic(Event, #{clientid := ClientId, protocol := mqtt}) ->
iolist_to_binary(
[systop("clients"), "/", ClientId, "/", atom_to_binary(Event)]);
event_topic(Event, #{clientid := ClientId, protocol := GwName}) ->
iolist_to_binary(
[systop("gateway"), "/", atom_to_binary(GwName),
"/clients/", ClientId, "/", atom_to_binary(Event)]).

View File

@ -45,9 +45,7 @@ t_fill_default_values(_) ->
<<"route_batch_clean">> := false,
<<"session_locking_strategy">> := quorum,
<<"shared_dispatch_ack_enabled">> := false,
<<"shared_subscription_strategy">> := round_robin,
<<"sys_heartbeat_interval">> := <<"30s">>,
<<"sys_msg_interval">> := <<"1m">>}}, WithDefaults),
<<"shared_subscription_strategy">> := round_robin}}, WithDefaults),
%% ensure JSON compatible
_ = emqx_json:encode(WithDefaults),
ok.

View File

@ -32,21 +32,3 @@ end_per_suite(_Config) ->
application:unload(emqx),
ok = emqx_logger:set_log_level(error),
ok.
% t_version(_) ->
% error('TODO').
% t_sysdescr(_) ->
% error('TODO').
% t_datetime(_) ->
% error('TODO').
% t_sys_interval(_) ->
% error('TODO').
% t_sys_heatbeat_interval(_) ->
% error('TODO').
% t_info(_) ->
% error('TODO').

View File

@ -30,6 +30,7 @@
, emqx_stats
, emqx_broker
, mria_mnesia
, emqx_hooks
]).
-define(ALL(Vars, Types, Exprs),
@ -59,8 +60,11 @@ prop_sys() ->
do_setup() ->
ok = emqx_logger:set_log_level(emergency),
emqx_config:put([broker, sys_msg_interval], 60000),
emqx_config:put([broker, sys_heartbeat_interval], 30000),
emqx_config:put([sys_topics, sys_msg_interval], 60000),
emqx_config:put([sys_topics, sys_heartbeat_interval], 30000),
emqx_config:put([sys_topics, sys_event_messages],
#{client_connected => true, client_disconnected => true,
client_subscribed => true, client_unsubscribed => true}),
[mock(Mod) || Mod <- ?mock_modules],
ok.
@ -83,7 +87,10 @@ do_mock(emqx_stats) ->
do_mock(mria_mnesia) ->
meck:expect(mria_mnesia, running_nodes, fun() -> [node()] end);
do_mock(emqx_metrics) ->
meck:expect(emqx_metrics, all, fun() -> [{hello, 3}] end).
meck:expect(emqx_metrics, all, fun() -> [{hello, 3}] end);
do_mock(emqx_hooks) ->
meck:expect(emqx_hooks, put, fun(_HookPoint, _MFA) -> ok end),
meck:expect(emqx_hooks, del, fun(_HookPoint, _MF) -> ok end).
%%--------------------------------------------------------------------
%% MODEL

View File

@ -90,12 +90,6 @@ end_per_testcase(_, Config) ->
%% Test cases
%%--------------------------------------------------------------------
t_noserver_nohook(_) ->
emqx_exhook_mgr:disable(<<"default">>),
?assertEqual([], ets:tab2list(emqx_hooks)),
{ok, _} = emqx_exhook_mgr:enable(<<"default">>),
?assertNotEqual([], ets:tab2list(emqx_hooks)).
t_access_failed_if_no_server_running(_) ->
emqx_exhook_mgr:disable(<<"default">>),
ClientInfo = #{clientid => <<"user-id-1">>,
@ -152,7 +146,9 @@ t_error_update_conf(_) ->
ErrorAnd = #{<<"name">> => Name, <<"url">> => <<"http://127.0.0.1:9001">>},
{ok, _} = emqx_exhook_mgr:update_config(Path, {add, ErrorAnd}),
DisableAnd = #{<<"name">> => Name, <<"url">> => <<"http://127.0.0.1:9001">>, <<"enable">> => false},
DisableAnd = #{<<"name">> => Name,
<<"url">> => <<"http://127.0.0.1:9001">>,
<<"enable">> => false},
{ok, _} = emqx_exhook_mgr:update_config(Path, {add, DisableAnd}),
{ok, _} = emqx_exhook_mgr:update_config(Path, {delete, <<"error">>}),