refactor(config): rework - config struct for zones and listeners
``` listeners.tcp.default { bind = "0.0.0.0:1883" acceptors = 16 max_connections = 1024000 access_rules = [ "allow all" ] proxy_protocol = false proxy_protocol_timeout = 3s mountpoint = "" tcp.backlog = 1024 tcp.buffer = 4KB } listeners.ssl.default { bind = "0.0.0.0:8883" acceptors = 16 max_connections = 512000 access_rules = [ "allow all" ] proxy_protocol = false proxy_protocol_timeout = 3s mountpoint = "" ssl.versions = ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"] ssl.keyfile = "etc/certs/key.pem" ssl.certfile = "etc/certs/cert.pem" ssl.cacertfile = "etc/certs/cacert.pem" tcp.backlog = 1024 tcp.buffer = 4KB } listeners.quic.default { bind = "0.0.0.0:14567" acceptors = 16 max_connections = 1024000 keyfile = "etc/certs/key.pem" certfile = "etc/certs/cert.pem" mountpoint = "" } listeners.ws.default { bind = "0.0.0.0:8083" acceptors = 16 max_connections = 1024000 access_rules = [ "allow all" ] proxy_protocol = false proxy_protocol_timeout = 3s mountpoint = "" tcp.backlog = 1024 tcp.buffer = 4KB websocket.idle_timeout = 86400s } listeners.wss.default { bind = "0.0.0.0:8084" acceptors = 16 max_connections = 512000 access_rules = [ "allow all" ] proxy_protocol = false proxy_protocol_timeout = 3s mountpoint = "" ssl.keyfile = "etc/certs/key.pem" ssl.certfile = "etc/certs/cert.pem" ssl.cacertfile = "etc/certs/cacert.pem" tcp.backlog = 1024 tcp.buffer = 4KB websocket.idle_timeout = 86400s } ``` ``` zones.default { } ```
This commit is contained in:
parent
4be58ae759
commit
c5f0091b5d
File diff suppressed because it is too large
Load Diff
|
@ -58,7 +58,6 @@
|
||||||
-export([ get_zone_conf/2
|
-export([ get_zone_conf/2
|
||||||
, get_zone_conf/3
|
, get_zone_conf/3
|
||||||
, put_zone_conf/3
|
, put_zone_conf/3
|
||||||
, find_zone_conf/2
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ get_listener_conf/3
|
-export([ get_listener_conf/3
|
||||||
|
@ -72,7 +71,7 @@
|
||||||
-define(PERSIS_SCHEMA_MODS, {?MODULE, schema_mods}).
|
-define(PERSIS_SCHEMA_MODS, {?MODULE, schema_mods}).
|
||||||
-define(PERSIS_KEY(TYPE, ROOT), {?MODULE, TYPE, ROOT}).
|
-define(PERSIS_KEY(TYPE, ROOT), {?MODULE, TYPE, ROOT}).
|
||||||
-define(ZONE_CONF_PATH(ZONE, PATH), [zones, ZONE | PATH]).
|
-define(ZONE_CONF_PATH(ZONE, PATH), [zones, ZONE | PATH]).
|
||||||
-define(LISTENER_CONF_PATH(ZONE, LISTENER, PATH), [zones, ZONE, listeners, LISTENER | PATH]).
|
-define(LISTENER_CONF_PATH(TYPE, LISTENER, PATH), [listeners, TYPE, LISTENER | PATH]).
|
||||||
|
|
||||||
-define(ATOM_CONF_PATH(PATH, EXP, EXP_ON_FAIL),
|
-define(ATOM_CONF_PATH(PATH, EXP, EXP_ON_FAIL),
|
||||||
try [atom(Key) || Key <- PATH] of
|
try [atom(Key) || Key <- PATH] of
|
||||||
|
@ -151,37 +150,40 @@ find_raw(KeyPath) ->
|
||||||
|
|
||||||
-spec get_zone_conf(atom(), emqx_map_lib:config_key_path()) -> term().
|
-spec get_zone_conf(atom(), emqx_map_lib:config_key_path()) -> term().
|
||||||
get_zone_conf(Zone, KeyPath) ->
|
get_zone_conf(Zone, KeyPath) ->
|
||||||
?MODULE:get(?ZONE_CONF_PATH(Zone, KeyPath)).
|
case find(?ZONE_CONF_PATH(Zone, KeyPath)) of
|
||||||
|
{not_found, _, _} -> %% not found in zones, try to find the global config
|
||||||
|
?MODULE:get(KeyPath);
|
||||||
|
{ok, Value} -> Value
|
||||||
|
end.
|
||||||
|
|
||||||
-spec get_zone_conf(atom(), emqx_map_lib:config_key_path(), term()) -> term().
|
-spec get_zone_conf(atom(), emqx_map_lib:config_key_path(), term()) -> term().
|
||||||
get_zone_conf(Zone, KeyPath, Default) ->
|
get_zone_conf(Zone, KeyPath, Default) ->
|
||||||
?MODULE:get(?ZONE_CONF_PATH(Zone, KeyPath), Default).
|
case find(?ZONE_CONF_PATH(Zone, KeyPath)) of
|
||||||
|
{not_found, _, _} -> %% not found in zones, try to find the global config
|
||||||
|
?MODULE:get(KeyPath, Default);
|
||||||
|
{ok, Value} -> Value
|
||||||
|
end.
|
||||||
|
|
||||||
-spec put_zone_conf(atom(), emqx_map_lib:config_key_path(), term()) -> ok.
|
-spec put_zone_conf(atom(), emqx_map_lib:config_key_path(), term()) -> ok.
|
||||||
put_zone_conf(Zone, KeyPath, Conf) ->
|
put_zone_conf(Zone, KeyPath, Conf) ->
|
||||||
?MODULE:put(?ZONE_CONF_PATH(Zone, KeyPath), Conf).
|
?MODULE:put(?ZONE_CONF_PATH(Zone, KeyPath), Conf).
|
||||||
|
|
||||||
-spec find_zone_conf(atom(), emqx_map_lib:config_key_path()) ->
|
|
||||||
{ok, term()} | {not_found, emqx_map_lib:config_key_path(), term()}.
|
|
||||||
find_zone_conf(Zone, KeyPath) ->
|
|
||||||
find(?ZONE_CONF_PATH(Zone, KeyPath)).
|
|
||||||
|
|
||||||
-spec get_listener_conf(atom(), atom(), emqx_map_lib:config_key_path()) -> term().
|
-spec get_listener_conf(atom(), atom(), emqx_map_lib:config_key_path()) -> term().
|
||||||
get_listener_conf(Zone, Listener, KeyPath) ->
|
get_listener_conf(Type, Listener, KeyPath) ->
|
||||||
?MODULE:get(?LISTENER_CONF_PATH(Zone, Listener, KeyPath)).
|
?MODULE:get(?LISTENER_CONF_PATH(Type, Listener, KeyPath)).
|
||||||
|
|
||||||
-spec get_listener_conf(atom(), atom(), emqx_map_lib:config_key_path(), term()) -> term().
|
-spec get_listener_conf(atom(), atom(), emqx_map_lib:config_key_path(), term()) -> term().
|
||||||
get_listener_conf(Zone, Listener, KeyPath, Default) ->
|
get_listener_conf(Type, Listener, KeyPath, Default) ->
|
||||||
?MODULE:get(?LISTENER_CONF_PATH(Zone, Listener, KeyPath), Default).
|
?MODULE:get(?LISTENER_CONF_PATH(Type, Listener, KeyPath), Default).
|
||||||
|
|
||||||
-spec put_listener_conf(atom(), atom(), emqx_map_lib:config_key_path(), term()) -> ok.
|
-spec put_listener_conf(atom(), atom(), emqx_map_lib:config_key_path(), term()) -> ok.
|
||||||
put_listener_conf(Zone, Listener, KeyPath, Conf) ->
|
put_listener_conf(Type, Listener, KeyPath, Conf) ->
|
||||||
?MODULE:put(?LISTENER_CONF_PATH(Zone, Listener, KeyPath), Conf).
|
?MODULE:put(?LISTENER_CONF_PATH(Type, Listener, KeyPath), Conf).
|
||||||
|
|
||||||
-spec find_listener_conf(atom(), atom(), emqx_map_lib:config_key_path()) ->
|
-spec find_listener_conf(atom(), atom(), emqx_map_lib:config_key_path()) ->
|
||||||
{ok, term()} | {not_found, emqx_map_lib:config_key_path(), term()}.
|
{ok, term()} | {not_found, emqx_map_lib:config_key_path(), term()}.
|
||||||
find_listener_conf(Zone, Listener, KeyPath) ->
|
find_listener_conf(Type, Listener, KeyPath) ->
|
||||||
find(?LISTENER_CONF_PATH(Zone, Listener, KeyPath)).
|
find(?LISTENER_CONF_PATH(Type, Listener, KeyPath)).
|
||||||
|
|
||||||
-spec put(map()) -> ok.
|
-spec put(map()) -> ok.
|
||||||
put(Config) ->
|
put(Config) ->
|
||||||
|
|
|
@ -43,18 +43,14 @@ list() ->
|
||||||
[{listener_id(ZoneName, LName), LConf} || {ZoneName, LName, LConf} <- do_list()].
|
[{listener_id(ZoneName, LName), LConf} || {ZoneName, LName, LConf} <- do_list()].
|
||||||
|
|
||||||
do_list() ->
|
do_list() ->
|
||||||
Zones = maps:to_list(emqx:get_config([zones], #{})),
|
Listeners = maps:to_list(emqx:get_config([listeners], #{})),
|
||||||
lists:append([list(ZoneName, ZoneConf) || {ZoneName, ZoneConf} <- Zones]).
|
lists:append([list(Type, maps:to_list(Conf)) || {Type, Conf} <- Listeners]).
|
||||||
|
|
||||||
list(ZoneName, ZoneConf) ->
|
list(Type, Conf) ->
|
||||||
Listeners = maps:to_list(maps:get(listeners, ZoneConf, #{})),
|
[begin
|
||||||
[
|
Running = is_running(Type, listener_id(Type, LName), LConf),
|
||||||
begin
|
{Type, LName, maps:put(running, Running, LConf)}
|
||||||
Conf = merge_zone_and_listener_confs(ZoneConf, LConf),
|
end || {LName, LConf} <- Conf, is_map(LConf)].
|
||||||
Running = is_running(listener_id(ZoneName, LName), Conf),
|
|
||||||
{ZoneName , LName, maps:put(running, Running, Conf)}
|
|
||||||
end
|
|
||||||
|| {LName, LConf} <- Listeners, is_map(LConf)].
|
|
||||||
|
|
||||||
-spec is_running(ListenerId :: atom()) -> boolean() | {error, no_found}.
|
-spec is_running(ListenerId :: atom()) -> boolean() | {error, no_found}.
|
||||||
is_running(ListenerId) ->
|
is_running(ListenerId) ->
|
||||||
|
@ -65,7 +61,7 @@ is_running(ListenerId) ->
|
||||||
[] -> {error, not_found}
|
[] -> {error, not_found}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
is_running(ListenerId, #{type := tcp, bind := ListenOn})->
|
is_running(Type, ListenerId, #{bind := ListenOn}) when Type =:= tcp; Type =:= ssl ->
|
||||||
try esockd:listener({ListenerId, ListenOn}) of
|
try esockd:listener({ListenerId, ListenOn}) of
|
||||||
Pid when is_pid(Pid)->
|
Pid when is_pid(Pid)->
|
||||||
true
|
true
|
||||||
|
@ -73,7 +69,7 @@ is_running(ListenerId, #{type := tcp, bind := ListenOn})->
|
||||||
false
|
false
|
||||||
end;
|
end;
|
||||||
|
|
||||||
is_running(ListenerId, #{type := ws})->
|
is_running(Type, ListenerId, _Conf) when Type =:= ws; Type =:= wss ->
|
||||||
try
|
try
|
||||||
Info = ranch:info(ListenerId),
|
Info = ranch:info(ListenerId),
|
||||||
proplists:get_value(status, Info) =:= running
|
proplists:get_value(status, Info) =:= running
|
||||||
|
@ -81,7 +77,7 @@ is_running(ListenerId, #{type := ws})->
|
||||||
false
|
false
|
||||||
end;
|
end;
|
||||||
|
|
||||||
is_running(_ListenerId, #{type := quic})->
|
is_running(quic, _ListenerId, _Conf)->
|
||||||
%% TODO: quic support
|
%% TODO: quic support
|
||||||
{error, no_found}.
|
{error, no_found}.
|
||||||
|
|
||||||
|
@ -95,23 +91,56 @@ start_listener(ListenerId) ->
|
||||||
apply_on_listener(ListenerId, fun start_listener/3).
|
apply_on_listener(ListenerId, fun start_listener/3).
|
||||||
|
|
||||||
-spec start_listener(atom(), atom(), map()) -> ok | {error, term()}.
|
-spec start_listener(atom(), atom(), map()) -> ok | {error, term()}.
|
||||||
start_listener(ZoneName, ListenerName, #{type := Type, bind := Bind} = Conf) ->
|
start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
|
||||||
case do_start_listener(ZoneName, ListenerName, Conf) of
|
case do_start_listener(Type, ListenerName, Conf) of
|
||||||
{ok, {skipped, Reason}} when Reason =:= listener_disabled;
|
{ok, {skipped, Reason}} when Reason =:= listener_disabled;
|
||||||
Reason =:= quic_app_missing ->
|
Reason =:= quic_app_missing ->
|
||||||
console_print("- Skip - starting ~s listener ~s on ~s ~n due to ~p",
|
console_print("- Skip - starting listener ~s on ~s ~n due to ~p",
|
||||||
[Type, listener_id(ZoneName, ListenerName), format(Bind), Reason]);
|
[listener_id(Type, ListenerName), format_addr(Bind), Reason]);
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
console_print("Start ~s listener ~s on ~s successfully.~n",
|
console_print("Start listener ~s on ~s successfully.~n",
|
||||||
[Type, listener_id(ZoneName, ListenerName), format(Bind)]);
|
[listener_id(Type, ListenerName), format_addr(Bind)]);
|
||||||
{error, {already_started, Pid}} ->
|
{error, {already_started, Pid}} ->
|
||||||
{error, {already_started, Pid}};
|
{error, {already_started, Pid}};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?ELOG("Failed to start ~s listener ~s on ~s: ~0p~n",
|
?ELOG("Failed to start listener ~s on ~s: ~0p~n",
|
||||||
[Type, listener_id(ZoneName, ListenerName), format(Bind), Reason]),
|
[listener_id(Type, ListenerName), format_addr(Bind), Reason]),
|
||||||
error(Reason)
|
error(Reason)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% @doc Restart all listeners
|
||||||
|
-spec(restart() -> ok).
|
||||||
|
restart() ->
|
||||||
|
foreach_listeners(fun restart_listener/3).
|
||||||
|
|
||||||
|
-spec(restart_listener(atom()) -> ok | {error, term()}).
|
||||||
|
restart_listener(ListenerId) ->
|
||||||
|
apply_on_listener(ListenerId, fun restart_listener/3).
|
||||||
|
|
||||||
|
-spec(restart_listener(atom(), atom(), map()) -> ok | {error, term()}).
|
||||||
|
restart_listener(Type, ListenerName, Conf) ->
|
||||||
|
case stop_listener(Type, ListenerName, Conf) of
|
||||||
|
ok -> start_listener(Type, ListenerName, Conf);
|
||||||
|
Error -> Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% @doc Stop all listeners.
|
||||||
|
-spec(stop() -> ok).
|
||||||
|
stop() ->
|
||||||
|
foreach_listeners(fun stop_listener/3).
|
||||||
|
|
||||||
|
-spec(stop_listener(atom()) -> ok | {error, term()}).
|
||||||
|
stop_listener(ListenerId) ->
|
||||||
|
apply_on_listener(ListenerId, fun stop_listener/3).
|
||||||
|
|
||||||
|
-spec(stop_listener(atom(), atom(), map()) -> ok | {error, term()}).
|
||||||
|
stop_listener(Type, ListenerName, #{type := tcp, bind := ListenOn}) ->
|
||||||
|
esockd:close(listener_id(Type, ListenerName), ListenOn);
|
||||||
|
stop_listener(Type, ListenerName, #{type := ws}) ->
|
||||||
|
cowboy:stop_listener(listener_id(Type, ListenerName));
|
||||||
|
stop_listener(Type, ListenerName, #{type := quic}) ->
|
||||||
|
quicer:stop_listener(listener_id(Type, ListenerName)).
|
||||||
|
|
||||||
-ifndef(TEST).
|
-ifndef(TEST).
|
||||||
console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
|
console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
|
||||||
-else.
|
-else.
|
||||||
|
@ -121,27 +150,28 @@ console_print(_Fmt, _Args) -> ok.
|
||||||
%% Start MQTT/TCP listener
|
%% Start MQTT/TCP listener
|
||||||
-spec(do_start_listener(atom(), atom(), map())
|
-spec(do_start_listener(atom(), atom(), map())
|
||||||
-> {ok, pid() | {skipped, atom()}} | {error, term()}).
|
-> {ok, pid() | {skipped, atom()}} | {error, term()}).
|
||||||
do_start_listener(_ZoneName, _ListenerName, #{enabled := false}) ->
|
do_start_listener(_Type, _ListenerName, #{enabled := false}) ->
|
||||||
{ok, {skipped, listener_disabled}};
|
{ok, {skipped, listener_disabled}};
|
||||||
do_start_listener(ZoneName, ListenerName, #{type := tcp, bind := ListenOn} = Opts) ->
|
do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts)
|
||||||
esockd:open(listener_id(ZoneName, ListenerName), ListenOn, merge_default(esockd_opts(Opts)),
|
when Type == tcp; Type == ssl ->
|
||||||
|
esockd:open(listener_id(Type, ListenerName), ListenOn, merge_default(esockd_opts(Type, Opts)),
|
||||||
{emqx_connection, start_link,
|
{emqx_connection, start_link,
|
||||||
[#{zone => ZoneName, listener => ListenerName}]});
|
[#{type => Type, listener => ListenerName,
|
||||||
|
zone => zone(Opts)}]});
|
||||||
|
|
||||||
%% Start MQTT/WS listener
|
%% Start MQTT/WS listener
|
||||||
do_start_listener(ZoneName, ListenerName, #{type := ws, bind := ListenOn} = Opts) ->
|
do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts)
|
||||||
Id = listener_id(ZoneName, ListenerName),
|
when Type == ws; Type == wss ->
|
||||||
RanchOpts = ranch_opts(ListenOn, Opts),
|
Id = listener_id(Type, ListenerName),
|
||||||
WsOpts = ws_opts(ZoneName, ListenerName, Opts),
|
RanchOpts = ranch_opts(Type, ListenOn, Opts),
|
||||||
case is_ssl(Opts) of
|
WsOpts = ws_opts(Type, ListenerName, Opts),
|
||||||
false ->
|
case Type of
|
||||||
cowboy:start_clear(Id, RanchOpts, WsOpts);
|
ws -> cowboy:start_clear(Id, RanchOpts, WsOpts);
|
||||||
true ->
|
wss -> cowboy:start_tls(Id, RanchOpts, WsOpts)
|
||||||
cowboy:start_tls(Id, RanchOpts, WsOpts)
|
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% Start MQTT/QUIC listener
|
%% Start MQTT/QUIC listener
|
||||||
do_start_listener(ZoneName, ListenerName, #{type := quic, bind := ListenOn} = Opts) ->
|
do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) ->
|
||||||
case [ A || {quicer, _, _} = A<-application:which_applications() ] of
|
case [ A || {quicer, _, _} = A<-application:which_applications() ] of
|
||||||
[_] ->
|
[_] ->
|
||||||
%% @fixme unsure why we need reopen lib and reopen config.
|
%% @fixme unsure why we need reopen lib and reopen config.
|
||||||
|
@ -152,48 +182,48 @@ do_start_listener(ZoneName, ListenerName, #{type := quic, bind := ListenOn} = Op
|
||||||
, {key, maps:get(keyfile, Opts)}
|
, {key, maps:get(keyfile, Opts)}
|
||||||
, {alpn, ["mqtt"]}
|
, {alpn, ["mqtt"]}
|
||||||
, {conn_acceptors, maps:get(acceptors, Opts, DefAcceptors)}
|
, {conn_acceptors, maps:get(acceptors, Opts, DefAcceptors)}
|
||||||
, {idle_timeout_ms, emqx_config:get_zone_conf(ZoneName, [mqtt, idle_timeout])}
|
, {idle_timeout_ms, emqx_config:get_zone_conf(zone(Opts),
|
||||||
|
[mqtt, idle_timeout])}
|
||||||
],
|
],
|
||||||
ConnectionOpts = #{conn_callback => emqx_quic_connection
|
ConnectionOpts = #{conn_callback => emqx_quic_connection
|
||||||
, peer_unidi_stream_count => 1
|
, peer_unidi_stream_count => 1
|
||||||
, peer_bidi_stream_count => 10
|
, peer_bidi_stream_count => 10
|
||||||
, zone => ZoneName
|
, zone => zone(Opts)
|
||||||
|
, type => quic
|
||||||
, listener => ListenerName
|
, listener => ListenerName
|
||||||
},
|
},
|
||||||
StreamOpts = [],
|
StreamOpts = [],
|
||||||
quicer:start_listener(listener_id(ZoneName, ListenerName),
|
quicer:start_listener(listener_id(quic, ListenerName),
|
||||||
port(ListenOn), {ListenOpts, ConnectionOpts, StreamOpts});
|
port(ListenOn), {ListenOpts, ConnectionOpts, StreamOpts});
|
||||||
[] ->
|
[] ->
|
||||||
{ok, {skipped, quic_app_missing}}
|
{ok, {skipped, quic_app_missing}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
esockd_opts(Opts0) ->
|
esockd_opts(Type, Opts0) ->
|
||||||
Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
|
Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
|
||||||
Opts2 = case emqx_map_lib:deep_get([rate_limit, max_conn_rate], Opts0) of
|
Opts2 = case emqx_map_lib:deep_get([rate_limit, max_conn_rate], Opts0) of
|
||||||
infinity -> Opts1;
|
infinity -> Opts1;
|
||||||
Rate -> Opts1#{max_conn_rate => Rate}
|
Rate -> Opts1#{max_conn_rate => Rate}
|
||||||
end,
|
end,
|
||||||
Opts3 = Opts2#{access_rules => esockd_access_rules(maps:get(access_rules, Opts0, []))},
|
Opts3 = Opts2#{access_rules => esockd_access_rules(maps:get(access_rules, Opts0, []))},
|
||||||
maps:to_list(case is_ssl(Opts0) of
|
maps:to_list(case Type of
|
||||||
false ->
|
tcp -> Opts3#{tcp_options => tcp_opts(Opts0)};
|
||||||
Opts3#{tcp_options => tcp_opts(Opts0)};
|
ssl -> Opts3#{ssl_options => ssl_opts(Opts0), tcp_options => tcp_opts(Opts0)}
|
||||||
true ->
|
|
||||||
Opts3#{ssl_options => ssl_opts(Opts0), tcp_options => tcp_opts(Opts0)}
|
|
||||||
end).
|
end).
|
||||||
|
|
||||||
ws_opts(ZoneName, ListenerName, Opts) ->
|
ws_opts(Type, ListenerName, Opts) ->
|
||||||
WsPaths = [{maps:get(mqtt_path, Opts, "/mqtt"), emqx_ws_connection,
|
WsPaths = [{maps:get(mqtt_path, Opts, "/mqtt"), emqx_ws_connection,
|
||||||
#{zone => ZoneName, listener => ListenerName}}],
|
#{zone => zone(Opts), type => Type, listener => ListenerName}}],
|
||||||
Dispatch = cowboy_router:compile([{'_', WsPaths}]),
|
Dispatch = cowboy_router:compile([{'_', WsPaths}]),
|
||||||
ProxyProto = maps:get(proxy_protocol, Opts, false),
|
ProxyProto = maps:get(proxy_protocol, Opts, false),
|
||||||
#{env => #{dispatch => Dispatch}, proxy_header => ProxyProto}.
|
#{env => #{dispatch => Dispatch}, proxy_header => ProxyProto}.
|
||||||
|
|
||||||
ranch_opts(ListenOn, Opts) ->
|
ranch_opts(Type, ListenOn, Opts) ->
|
||||||
NumAcceptors = maps:get(acceptors, Opts, 4),
|
NumAcceptors = maps:get(acceptors, Opts, 4),
|
||||||
MaxConnections = maps:get(max_connections, Opts, 1024),
|
MaxConnections = maps:get(max_connections, Opts, 1024),
|
||||||
SocketOpts = case is_ssl(Opts) of
|
SocketOpts = case Type of
|
||||||
true -> tcp_opts(Opts) ++ proplists:delete(handshake_timeout, ssl_opts(Opts));
|
wss -> tcp_opts(Opts) ++ proplists:delete(handshake_timeout, ssl_opts(Opts));
|
||||||
false -> tcp_opts(Opts)
|
ws -> tcp_opts(Opts)
|
||||||
end,
|
end,
|
||||||
#{num_acceptors => NumAcceptors,
|
#{num_acceptors => NumAcceptors,
|
||||||
max_connections => MaxConnections,
|
max_connections => MaxConnections,
|
||||||
|
@ -217,39 +247,6 @@ esockd_access_rules(StrRules) ->
|
||||||
end,
|
end,
|
||||||
[Access(R) || R <- StrRules].
|
[Access(R) || R <- StrRules].
|
||||||
|
|
||||||
%% @doc Restart all listeners
|
|
||||||
-spec(restart() -> ok).
|
|
||||||
restart() ->
|
|
||||||
foreach_listeners(fun restart_listener/3).
|
|
||||||
|
|
||||||
-spec(restart_listener(atom()) -> ok | {error, term()}).
|
|
||||||
restart_listener(ListenerId) ->
|
|
||||||
apply_on_listener(ListenerId, fun restart_listener/3).
|
|
||||||
|
|
||||||
-spec(restart_listener(atom(), atom(), map()) -> ok | {error, term()}).
|
|
||||||
restart_listener(ZoneName, ListenerName, Conf) ->
|
|
||||||
case stop_listener(ZoneName, ListenerName, Conf) of
|
|
||||||
ok -> start_listener(ZoneName, ListenerName, Conf);
|
|
||||||
Error -> Error
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% @doc Stop all listeners.
|
|
||||||
-spec(stop() -> ok).
|
|
||||||
stop() ->
|
|
||||||
foreach_listeners(fun stop_listener/3).
|
|
||||||
|
|
||||||
-spec(stop_listener(atom()) -> ok | {error, term()}).
|
|
||||||
stop_listener(ListenerId) ->
|
|
||||||
apply_on_listener(ListenerId, fun stop_listener/3).
|
|
||||||
|
|
||||||
-spec(stop_listener(atom(), atom(), map()) -> ok | {error, term()}).
|
|
||||||
stop_listener(ZoneName, ListenerName, #{type := tcp, bind := ListenOn}) ->
|
|
||||||
esockd:close(listener_id(ZoneName, ListenerName), ListenOn);
|
|
||||||
stop_listener(ZoneName, ListenerName, #{type := ws}) ->
|
|
||||||
cowboy:stop_listener(listener_id(ZoneName, ListenerName));
|
|
||||||
stop_listener(ZoneName, ListenerName, #{type := quic}) ->
|
|
||||||
quicer:stop_listener(listener_id(ZoneName, ListenerName)).
|
|
||||||
|
|
||||||
merge_default(Options) ->
|
merge_default(Options) ->
|
||||||
case lists:keytake(tcp_options, 1, Options) of
|
case lists:keytake(tcp_options, 1, Options) of
|
||||||
{value, {tcp_options, TcpOpts}, Options1} ->
|
{value, {tcp_options, TcpOpts}, Options1} ->
|
||||||
|
@ -258,15 +255,15 @@ merge_default(Options) ->
|
||||||
[{tcp_options, ?MQTT_SOCKOPTS} | Options]
|
[{tcp_options, ?MQTT_SOCKOPTS} | Options]
|
||||||
end.
|
end.
|
||||||
|
|
||||||
format(Port) when is_integer(Port) ->
|
format_addr(Port) when is_integer(Port) ->
|
||||||
io_lib:format("0.0.0.0:~w", [Port]);
|
io_lib:format("0.0.0.0:~w", [Port]);
|
||||||
format({Addr, Port}) when is_list(Addr) ->
|
format_addr({Addr, Port}) when is_list(Addr) ->
|
||||||
io_lib:format("~s:~w", [Addr, Port]);
|
io_lib:format("~s:~w", [Addr, Port]);
|
||||||
format({Addr, Port}) when is_tuple(Addr) ->
|
format_addr({Addr, Port}) when is_tuple(Addr) ->
|
||||||
io_lib:format("~s:~w", [inet:ntoa(Addr), Port]).
|
io_lib:format("~s:~w", [inet:ntoa(Addr), Port]).
|
||||||
|
|
||||||
listener_id(ZoneName, ListenerName) ->
|
listener_id(Type, ListenerName) ->
|
||||||
list_to_atom(lists:append([atom_to_list(ZoneName), ":", atom_to_list(ListenerName)])).
|
list_to_atom(lists:append([atom_to_list(Type), ":", atom_to_list(ListenerName)])).
|
||||||
|
|
||||||
decode_listener_id(Id) ->
|
decode_listener_id(Id) ->
|
||||||
try
|
try
|
||||||
|
@ -276,6 +273,9 @@ decode_listener_id(Id) ->
|
||||||
_ : _ -> error({invalid_listener_id, Id})
|
_ : _ -> error({invalid_listener_id, Id})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
zone(Opts) ->
|
||||||
|
maps:get(zone, Opts, undefined).
|
||||||
|
|
||||||
ssl_opts(Opts) ->
|
ssl_opts(Opts) ->
|
||||||
maps:to_list(
|
maps:to_list(
|
||||||
emqx_tls_lib:drop_tls13_for_old_otp(
|
emqx_tls_lib:drop_tls13_for_old_otp(
|
||||||
|
@ -287,9 +287,6 @@ tcp_opts(Opts) ->
|
||||||
maps:without([active_n],
|
maps:without([active_n],
|
||||||
maps:get(tcp, Opts, #{}))).
|
maps:get(tcp, Opts, #{}))).
|
||||||
|
|
||||||
is_ssl(Opts) ->
|
|
||||||
emqx_map_lib:deep_get([ssl, enable], Opts, false).
|
|
||||||
|
|
||||||
foreach_listeners(Do) ->
|
foreach_listeners(Do) ->
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun({ZoneName, LName, LConf}) ->
|
fun({ZoneName, LName, LConf}) ->
|
||||||
|
@ -298,21 +295,13 @@ foreach_listeners(Do) ->
|
||||||
|
|
||||||
has_enabled_listener_conf_by_type(Type) ->
|
has_enabled_listener_conf_by_type(Type) ->
|
||||||
lists:any(
|
lists:any(
|
||||||
fun({_Zone, _LName, LConf}) when is_map(LConf) ->
|
fun({Type0, _LName, LConf}) when is_map(LConf) ->
|
||||||
Type =:= maps:get(type, LConf) andalso
|
Type =:= Type0 andalso maps:get(enabled, LConf, true)
|
||||||
maps:get(enabled, LConf, true)
|
|
||||||
end, do_list()).
|
end, do_list()).
|
||||||
|
|
||||||
%% merge the configs in zone and listeners in a manner that
|
|
||||||
%% all config entries in the listener are prior to the ones in the zone.
|
|
||||||
merge_zone_and_listener_confs(ZoneConf, ListenerConf) ->
|
|
||||||
ConfsInZonesOnly = [listeners, overall_max_connections],
|
|
||||||
BaseConf = maps:without(ConfsInZonesOnly, ZoneConf),
|
|
||||||
emqx_map_lib:deep_merge(BaseConf, ListenerConf).
|
|
||||||
|
|
||||||
apply_on_listener(ListenerId, Do) ->
|
apply_on_listener(ListenerId, Do) ->
|
||||||
{ZoneName, ListenerName} = decode_listener_id(ListenerId),
|
{Type, ListenerName} = decode_listener_id(ListenerId),
|
||||||
case emqx_config:find_listener_conf(ZoneName, ListenerName, []) of
|
case emqx_config:find_listener_conf(Type, ListenerName, []) of
|
||||||
{not_found, _, _} -> error({listener_config_not_found, ZoneName, ListenerName});
|
{not_found, _, _} -> error({listener_config_not_found, Type, ListenerName});
|
||||||
{ok, Conf} -> Do(ZoneName, ListenerName, Conf)
|
{ok, Conf} -> Do(Type, ListenerName, Conf)
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -70,18 +70,17 @@
|
||||||
-export([conf_get/2, conf_get/3, keys/2, filter/1]).
|
-export([conf_get/2, conf_get/3, keys/2, filter/1]).
|
||||||
-export([ssl/1]).
|
-export([ssl/1]).
|
||||||
|
|
||||||
structs() -> ["zones", "listeners", "broker", "plugins", "sysmon", "alarm", "authorization"].
|
structs() -> ["zones", "mqtt", "flapping_detect", "force_shutdown", "force_gc",
|
||||||
|
"conn_congestion", "rate_limit", "quota", "listeners", "broker", "plugins",
|
||||||
|
"sysmon", "alarm", "authorization"].
|
||||||
|
|
||||||
fields("stats") ->
|
fields("stats") ->
|
||||||
[ {"enable", t(boolean(), undefined, true)}
|
[ {"enable", t(boolean(), undefined, true)}
|
||||||
];
|
];
|
||||||
|
|
||||||
fields("auth") ->
|
|
||||||
[ {"enable", t(boolean(), undefined, false)}
|
|
||||||
];
|
|
||||||
|
|
||||||
fields("authorization") ->
|
fields("authorization") ->
|
||||||
[ {"no_match", t(union(allow, deny), undefined, allow)}
|
[ {"no_match", t(union(allow, deny), undefined, allow)}
|
||||||
|
, {"enable", t(boolean(), undefined, true)}
|
||||||
, {"deny_action", t(union(ignore, disconnect), undefined, ignore)}
|
, {"deny_action", t(union(ignore, disconnect), undefined, ignore)}
|
||||||
, {"cache", ref("authorization_cache")}
|
, {"cache", ref("authorization_cache")}
|
||||||
];
|
];
|
||||||
|
@ -93,8 +92,7 @@ fields("authorization_cache") ->
|
||||||
];
|
];
|
||||||
|
|
||||||
fields("mqtt") ->
|
fields("mqtt") ->
|
||||||
[ {"mountpoint", t(binary(), undefined, <<>>)}
|
[ {"idle_timeout", maybe_infinity(duration(), "15s")}
|
||||||
, {"idle_timeout", maybe_infinity(duration(), "15s")}
|
|
||||||
, {"max_packet_size", t(bytesize(), undefined, "1MB")}
|
, {"max_packet_size", t(bytesize(), undefined, "1MB")}
|
||||||
, {"max_clientid_len", t(range(23, 65535), undefined, 65535)}
|
, {"max_clientid_len", t(range(23, 65535), undefined, 65535)}
|
||||||
, {"max_topic_levels", t(range(1, 65535), undefined, 65535)}
|
, {"max_topic_levels", t(range(1, 65535), undefined, 65535)}
|
||||||
|
@ -129,13 +127,11 @@ fields("zones") ->
|
||||||
|
|
||||||
fields("zone_settings") ->
|
fields("zone_settings") ->
|
||||||
[ {"mqtt", ref("mqtt")}
|
[ {"mqtt", ref("mqtt")}
|
||||||
, {"auth", ref("auth")}
|
|
||||||
, {"stats", ref("stats")}
|
, {"stats", ref("stats")}
|
||||||
, {"flapping_detect", ref("flapping_detect")}
|
, {"flapping_detect", ref("flapping_detect")}
|
||||||
, {"force_shutdown", ref("force_shutdown")}
|
, {"force_shutdown", ref("force_shutdown")}
|
||||||
, {"conn_congestion", ref("conn_congestion")}
|
, {"conn_congestion", ref("conn_congestion")}
|
||||||
, {"force_gc", ref("force_gc")}
|
, {"force_gc", ref("force_gc")}
|
||||||
, {"overall_max_connections", maybe_infinity(integer())}
|
|
||||||
, {"listeners", t("listeners")}
|
, {"listeners", t("listeners")}
|
||||||
];
|
];
|
||||||
|
|
||||||
|
@ -143,10 +139,10 @@ fields("rate_limit") ->
|
||||||
[ {"max_conn_rate", maybe_infinity(integer(), 1000)}
|
[ {"max_conn_rate", maybe_infinity(integer(), 1000)}
|
||||||
, {"conn_messages_in", maybe_infinity(comma_separated_list())}
|
, {"conn_messages_in", maybe_infinity(comma_separated_list())}
|
||||||
, {"conn_bytes_in", maybe_infinity(comma_separated_list())}
|
, {"conn_bytes_in", maybe_infinity(comma_separated_list())}
|
||||||
, {"quota", ref("rate_limit_quota")}
|
, {"quota", ref("quota")}
|
||||||
];
|
];
|
||||||
|
|
||||||
fields("rate_limit_quota") ->
|
fields("quota") ->
|
||||||
[ {"conn_messages_routing", maybe_infinity(comma_separated_list())}
|
[ {"conn_messages_routing", maybe_infinity(comma_separated_list())}
|
||||||
, {"overall_messages_routing", maybe_infinity(comma_separated_list())}
|
, {"overall_messages_routing", maybe_infinity(comma_separated_list())}
|
||||||
];
|
];
|
||||||
|
@ -190,30 +186,51 @@ fields("force_gc") ->
|
||||||
];
|
];
|
||||||
|
|
||||||
fields("listeners") ->
|
fields("listeners") ->
|
||||||
[ {"$name", hoconsc:union(
|
[ {"tcp", ref("t_tcp_listeners")}
|
||||||
[ disabled
|
, {"ssl", ref("t_ssl_listeners")}
|
||||||
, hoconsc:ref("mqtt_tcp_listener")
|
, {"ws", ref("t_ws_listeners")}
|
||||||
, hoconsc:ref("mqtt_ws_listener")
|
, {"wss", ref("t_wss_listeners")}
|
||||||
, hoconsc:ref("mqtt_quic_listener")
|
, {"quic", ref("t_quic_listeners")}
|
||||||
])}
|
];
|
||||||
|
|
||||||
|
fields("t_tcp_listeners") ->
|
||||||
|
[ {"$name", ref("mqtt_tcp_listener")}
|
||||||
|
];
|
||||||
|
fields("t_ssl_listeners") ->
|
||||||
|
[ {"$name", ref("mqtt_ssl_listener")}
|
||||||
|
];
|
||||||
|
fields("t_ws_listeners") ->
|
||||||
|
[ {"$name", ref("mqtt_ws_listener")}
|
||||||
|
];
|
||||||
|
fields("t_wss_listeners") ->
|
||||||
|
[ {"$name", ref("mqtt_wss_listener")}
|
||||||
|
];
|
||||||
|
fields("t_quic_listeners") ->
|
||||||
|
[ {"$name", ref("mqtt_quic_listener")}
|
||||||
];
|
];
|
||||||
|
|
||||||
fields("mqtt_tcp_listener") ->
|
fields("mqtt_tcp_listener") ->
|
||||||
[ {"type", t(tcp)}
|
[ {"tcp", ref("tcp_opts")}
|
||||||
, {"tcp", ref("tcp_opts")}
|
] ++ mqtt_listener();
|
||||||
|
|
||||||
|
fields("mqtt_ssl_listener") ->
|
||||||
|
[ {"tcp", ref("tcp_opts")}
|
||||||
, {"ssl", ref("ssl_opts")}
|
, {"ssl", ref("ssl_opts")}
|
||||||
] ++ mqtt_listener();
|
] ++ mqtt_listener();
|
||||||
|
|
||||||
fields("mqtt_ws_listener") ->
|
fields("mqtt_ws_listener") ->
|
||||||
[ {"type", t(ws)}
|
[ {"tcp", ref("tcp_opts")}
|
||||||
, {"tcp", ref("tcp_opts")}
|
, {"websocket", ref("ws_opts")}
|
||||||
|
] ++ mqtt_listener();
|
||||||
|
|
||||||
|
fields("mqtt_wss_listener") ->
|
||||||
|
[ {"tcp", ref("tcp_opts")}
|
||||||
, {"ssl", ref("ssl_opts")}
|
, {"ssl", ref("ssl_opts")}
|
||||||
, {"websocket", ref("ws_opts")}
|
, {"websocket", ref("ws_opts")}
|
||||||
] ++ mqtt_listener();
|
] ++ mqtt_listener();
|
||||||
|
|
||||||
fields("mqtt_quic_listener") ->
|
fields("mqtt_quic_listener") ->
|
||||||
[ {"enabled", t(boolean(), undefined, true)}
|
[ {"enabled", t(boolean(), undefined, true)}
|
||||||
, {"type", t(quic)}
|
|
||||||
, {"certfile", t(string(), undefined, undefined)}
|
, {"certfile", t(string(), undefined, undefined)}
|
||||||
, {"keyfile", t(string(), undefined, undefined)}
|
, {"keyfile", t(string(), undefined, undefined)}
|
||||||
, {"ciphers", t(comma_separated_list(), undefined, "TLS_AES_256_GCM_SHA384,"
|
, {"ciphers", t(comma_separated_list(), undefined, "TLS_AES_256_GCM_SHA384,"
|
||||||
|
@ -332,6 +349,8 @@ base_listener() ->
|
||||||
, {"acceptors", t(integer(), undefined, 16)}
|
, {"acceptors", t(integer(), undefined, 16)}
|
||||||
, {"max_connections", maybe_infinity(integer(), infinity)}
|
, {"max_connections", maybe_infinity(integer(), infinity)}
|
||||||
, {"rate_limit", ref("rate_limit")}
|
, {"rate_limit", ref("rate_limit")}
|
||||||
|
, {"mountpoint", t(binary(), undefined, <<>>)}
|
||||||
|
, {"zone", t(binary(), undefined, undefined)}
|
||||||
].
|
].
|
||||||
|
|
||||||
%% utils
|
%% utils
|
||||||
|
|
|
@ -153,13 +153,11 @@ param_path_node() ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
param_path_id() ->
|
param_path_id() ->
|
||||||
{Example,_} = hd(emqx_mgmt:list_listeners(node())),
|
|
||||||
#{
|
#{
|
||||||
name => id,
|
name => id,
|
||||||
in => path,
|
in => path,
|
||||||
schema => #{type => string},
|
schema => #{type => string},
|
||||||
required => true,
|
required => true
|
||||||
example => Example
|
|
||||||
}.
|
}.
|
||||||
|
|
||||||
param_path_operation()->
|
param_path_operation()->
|
||||||
|
|
Loading…
Reference in New Issue