Merge pull request #5409 from zmstone/refactor-listener-impl

Refactor: listeners listing & quic listener start
This commit is contained in:
Zaiming (Stone) Shi 2021-08-05 14:10:12 +02:00 committed by GitHub
commit 70e49ab629
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 50 additions and 38 deletions

View File

@ -18,7 +18,7 @@ IsQuicSupp = fun() ->
end, end,
Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {branch, "0.6.0"}}}, Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {branch, "0.6.0"}}},
Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {branch, "main"}}}, Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {branch, "0.0.7"}}},
ExtraDeps = fun(C) -> ExtraDeps = fun(C) ->
{deps, Deps0} = lists:keyfind(deps, 1, C), {deps, Deps0} = lists:keyfind(deps, 1, C),

View File

@ -96,14 +96,23 @@ maybe_start_listeners() ->
end. end.
maybe_start_quicer() -> maybe_start_quicer() ->
case os:getenv("EMQX_NO_QUIC") of case is_quicer_app_present() andalso is_quic_listener_configured() of
X when X =:= "1" orelse X =:= "true" -> true -> {ok, _} = application:ensure_all_started(quicer), ok;
ok; false -> ok
_ ->
{ok, _} = application:ensure_all_started(quicer),
ok
end. end.
is_quicer_app_present() ->
case application:load(quicer) of
ok -> true;
{error, {already_loaded, _}} -> true;
_ ->
?SLOG(info, #{msg => "quicer_app_not_found"}),
false
end.
is_quic_listener_configured() ->
emqx_listeners:has_enabled_listener_conf_by_type(quic).
get_description() -> get_description() ->
{ok, Descr0} = application:get_key(?APP, description), {ok, Descr0} = application:get_key(?APP, description),
case os:getenv("EMQX_DESCRIPTION") of case os:getenv("EMQX_DESCRIPTION") of

View File

@ -34,10 +34,15 @@
, stop_listener/3 , stop_listener/3
, restart_listener/1 , restart_listener/1
, restart_listener/3 , restart_listener/3
, has_enabled_listener_conf_by_type/1
]). ]).
%% @doc List configured listeners.
-spec(list() -> [{ListenerId :: atom(), ListenerConf :: map()}]). -spec(list() -> [{ListenerId :: atom(), ListenerConf :: map()}]).
list() -> list() ->
[{listener_id(ZoneName, LName), LConf} || {ZoneName, LName, LConf} <- do_list()].
do_list() ->
Zones = maps:to_list(emqx_config:get([zones], #{})), Zones = maps:to_list(emqx_config:get([zones], #{})),
lists:append([list(ZoneName, ZoneConf) || {ZoneName, ZoneConf} <- Zones]). lists:append([list(ZoneName, ZoneConf) || {ZoneName, ZoneConf} <- Zones]).
@ -45,26 +50,19 @@ list(ZoneName, ZoneConf) ->
Listeners = maps:to_list(maps:get(listeners, ZoneConf, #{})), Listeners = maps:to_list(maps:get(listeners, ZoneConf, #{})),
[ [
begin begin
ListenerId = listener_id(ZoneName, LName),
Running = is_running(ListenerId),
Conf = merge_zone_and_listener_confs(ZoneConf, LConf), Conf = merge_zone_and_listener_confs(ZoneConf, LConf),
{ListenerId, maps:put(running, Running, Conf)} Running = is_running(listener_id(ZoneName, LName), Conf),
{ZoneName , LName, maps:put(running, Running, Conf)}
end end
|| {LName, LConf} <- Listeners]. || {LName, LConf} <- Listeners, is_map(LConf)].
-spec is_running(ListenerId :: atom()) -> boolean() | {error, no_found}. -spec is_running(ListenerId :: atom()) -> boolean() | {error, no_found}.
is_running(ListenerId) -> is_running(ListenerId) ->
Zones = maps:to_list(emqx_config:get([zones], #{})), case lists:filtermap(fun({_Zone, Id, #{running := IsRunning}}) ->
Listeners = lists:append( Id =:= ListenerId andalso {true, IsRunning}
[ end, do_list()) of
[{listener_id(ZoneName, LName),merge_zone_and_listener_confs(ZoneConf, LConf)} [IsRunning] -> IsRunning;
|| {LName, LConf} <- maps:to_list(maps:get(listeners, ZoneConf, #{}))] [] -> {error, not_found}
|| {ZoneName, ZoneConf} <- Zones]),
case proplists:get_value(ListenerId, Listeners, undefined) of
undefined ->
{error, no_found};
Conf ->
is_running(ListenerId, Conf)
end. end.
is_running(ListenerId, #{type := tcp, bind := ListenOn})-> is_running(ListenerId, #{type := tcp, bind := ListenOn})->
@ -271,9 +269,11 @@ listener_id(ZoneName, ListenerName) ->
list_to_atom(lists:append([atom_to_list(ZoneName), ":", atom_to_list(ListenerName)])). list_to_atom(lists:append([atom_to_list(ZoneName), ":", atom_to_list(ListenerName)])).
decode_listener_id(Id) -> decode_listener_id(Id) ->
case string:split(atom_to_list(Id), ":", leading) of try
[Zone, Listen] -> {list_to_atom(Zone), list_to_atom(Listen)}; [Zone, Listen] = string:split(atom_to_list(Id), ":", leading),
_ -> error({invalid_listener_id, Id}) {list_to_existing_atom(Zone), list_to_existing_atom(Listen)}
catch
_ : _ -> error({invalid_listener_id, Id})
end. end.
ssl_opts(Opts) -> ssl_opts(Opts) ->
@ -291,11 +291,17 @@ is_ssl(Opts) ->
emqx_map_lib:deep_get([ssl, enable], Opts, false). emqx_map_lib:deep_get([ssl, enable], Opts, false).
foreach_listeners(Do) -> foreach_listeners(Do) ->
lists:foreach(fun({ZoneName, ZoneConf}) -> lists:foreach(
lists:foreach(fun({LName, LConf}) -> fun({ZoneName, LName, LConf}) ->
Do(ZoneName, LName, merge_zone_and_listener_confs(ZoneConf, LConf)) Do(ZoneName, LName, LConf)
end, maps:to_list(maps:get(listeners, ZoneConf, #{}))) end, do_list()).
end, maps:to_list(emqx_config:get([zones], #{}))).
has_enabled_listener_conf_by_type(Type) ->
lists:any(
fun({_Zone, _LName, LConf}) when is_map(LConf) ->
Type =:= maps:get(type, LConf) andalso
maps:get(enabled, LConf, true)
end, do_list()).
%% merge the configs in zone and listeners in a manner that %% merge the configs in zone and listeners in a manner that
%% all config entries in the listener are prior to the ones in the zone. %% all config entries in the listener are prior to the ones in the zone.

View File

@ -192,7 +192,8 @@ fields("force_gc") ->
fields("listeners") -> fields("listeners") ->
[ {"$name", hoconsc:union( [ {"$name", hoconsc:union(
[ hoconsc:ref("mqtt_tcp_listener") [ disabled
, hoconsc:ref("mqtt_tcp_listener")
, hoconsc:ref("mqtt_ws_listener") , hoconsc:ref("mqtt_ws_listener")
, hoconsc:ref("mqtt_quic_listener") , hoconsc:ref("mqtt_quic_listener")
])} ])}

View File

@ -267,7 +267,7 @@ message BrokerInfo {
string sysdescr = 2; string sysdescr = 2;
string uptime = 3; int64 uptime = 3;
string datetime = 4; string datetime = 4;
} }

View File

@ -127,15 +127,11 @@ start_one_app(App) ->
%% 1. due to static static config change %% 1. due to static static config change
%% 2. after join a cluster %% 2. after join a cluster
reboot_apps() -> reboot_apps() ->
[gproc, esockd, ranch, cowboy, ekka, quicer, emqx | ?EMQX_DEP_APPS]. [gproc, esockd, ranch, cowboy, ekka, emqx | ?EMQX_DEP_APPS].
%% quicer can not be added to emqx's .app because it might be opted out at build time
implicit_deps() ->
[{emqx, [quicer]}].
sorted_reboot_apps() -> sorted_reboot_apps() ->
Apps = [{App, app_deps(App)} || App <- reboot_apps()], Apps = [{App, app_deps(App)} || App <- reboot_apps()],
sorted_reboot_apps(Apps ++ implicit_deps()). sorted_reboot_apps(Apps).
app_deps(App) -> app_deps(App) ->
case application:get_key(App, applications) of case application:get_key(App, applications) of