fix: cluster conf must be readonly; add dashboard listener update hook

This commit is contained in:
Zhongwen Deng 2022-03-25 16:35:41 +08:00
parent 9a4a8ec022
commit 86f455fd3a
6 changed files with 122 additions and 19 deletions

View File

@ -148,7 +148,8 @@ code_change(_OldVsn, State, _Extra) ->
deep_put_handler([], Handlers, Mod) -> deep_put_handler([], Handlers, Mod) ->
{ok, Handlers#{?MOD => Mod}}; {ok, Handlers#{?MOD => Mod}};
deep_put_handler([Key | KeyPath], Handlers, Mod) -> deep_put_handler([Key0 | KeyPath], Handlers, Mod) ->
Key = atom(Key0),
SubHandlers = maps:get(Key, Handlers, #{}), SubHandlers = maps:get(Key, Handlers, #{}),
case deep_put_handler(KeyPath, SubHandlers, Mod) of case deep_put_handler(KeyPath, SubHandlers, Mod) of
{ok, NewSubHandlers} -> {ok, NewSubHandlers} ->

View File

@ -117,17 +117,20 @@ fields("cluster") ->
#{ mapping => "ekka.cluster_name" #{ mapping => "ekka.cluster_name"
, default => emqxcl , default => emqxcl
, desc => "Human-friendly name of the EMQX cluster." , desc => "Human-friendly name of the EMQX cluster."
, 'readOnly' => true
})} })}
, {"discovery_strategy", , {"discovery_strategy",
sc(hoconsc:enum([manual, static, mcast, dns, etcd, k8s]), sc(hoconsc:enum([manual, static, mcast, dns, etcd, k8s]),
#{ default => manual #{ default => manual
, desc => "Service discovery method for the cluster nodes." , desc => "Service discovery method for the cluster nodes."
, 'readOnly' => true
})} })}
, {"autoclean", , {"autoclean",
sc(emqx_schema:duration(), sc(emqx_schema:duration(),
#{ mapping => "ekka.cluster_autoclean" #{ mapping => "ekka.cluster_autoclean"
, default => "5m" , default => "5m"
, desc => "Remove disconnected nodes from the cluster after this interval." , desc => "Remove disconnected nodes from the cluster after this interval."
, 'readOnly' => true
})} })}
, {"autoheal", , {"autoheal",
sc(boolean(), sc(boolean(),
@ -135,12 +138,14 @@ fields("cluster") ->
, default => true , default => true
, desc => "If <code>true</code>, the node will try to heal network partitions , desc => "If <code>true</code>, the node will try to heal network partitions
automatically." automatically."
})} , 'readOnly' => true
})}
, {"proto_dist", , {"proto_dist",
sc(hoconsc:enum([inet_tcp, inet6_tcp, inet_tls]), sc(hoconsc:enum([inet_tcp, inet6_tcp, inet_tls]),
#{ mapping => "ekka.proto_dist" #{ mapping => "ekka.proto_dist"
, default => inet_tcp , default => inet_tcp
})} , 'readOnly' => true
})}
, {"static", , {"static",
sc(ref(cluster_static), sc(ref(cluster_static),
#{ desc => "Service discovery via static nodes. The new node joins the cluster by #{ desc => "Service discovery via static nodes. The new node joins the cluster by
@ -169,7 +174,8 @@ fields(cluster_static) ->
sc(hoconsc:array(atom()), sc(hoconsc:array(atom()),
#{ default => [] #{ default => []
, desc => "List EMQX node names in the static cluster. See <code>node.name</code>." , desc => "List EMQX node names in the static cluster. See <code>node.name</code>."
})} , 'readOnly' => true
})}
]; ];
fields(cluster_mcast) -> fields(cluster_mcast) ->
@ -177,10 +183,12 @@ fields(cluster_mcast) ->
sc(string(), sc(string(),
#{ default => "239.192.0.1" #{ default => "239.192.0.1"
, desc => "Multicast IPv4 address." , desc => "Multicast IPv4 address."
})} , 'readOnly' => true
})}
, {"ports", , {"ports",
sc(hoconsc:array(integer()), sc(hoconsc:array(integer()),
#{ default => [4369, 4370] #{ default => [4369, 4370]
, 'readOnly' => true
, desc => "List of UDP ports used for service discovery.<br/> , desc => "List of UDP ports used for service discovery.<br/>
Note: probe messages are broadcast to all the specified ports." Note: probe messages are broadcast to all the specified ports."
})} })}
@ -188,32 +196,38 @@ Note: probe messages are broadcast to all the specified ports."
sc(string(), sc(string(),
#{ default => "0.0.0.0" #{ default => "0.0.0.0"
, desc => "Local IP address the node discovery service needs to bind to." , desc => "Local IP address the node discovery service needs to bind to."
})} , 'readOnly' => true
})}
, {"ttl", , {"ttl",
sc(range(0, 255), sc(range(0, 255),
#{ default => 255 #{ default => 255
, desc => "Time-to-live (TTL) for the outgoing UDP datagrams." , desc => "Time-to-live (TTL) for the outgoing UDP datagrams."
})} , 'readOnly' => true
})}
, {"loop", , {"loop",
sc(boolean(), sc(boolean(),
#{ default => true #{ default => true
, desc => "If <code>true</code>, loop UDP datagrams back to the local socket." , desc => "If <code>true</code>, loop UDP datagrams back to the local socket."
})} , 'readOnly' => true
})}
, {"sndbuf", , {"sndbuf",
sc(emqx_schema:bytesize(), sc(emqx_schema:bytesize(),
#{ default => "16KB" #{ default => "16KB"
, desc => "Size of the kernel-level buffer for outgoing datagrams." , desc => "Size of the kernel-level buffer for outgoing datagrams."
})} , 'readOnly' => true
})}
, {"recbuf", , {"recbuf",
sc(emqx_schema:bytesize(), sc(emqx_schema:bytesize(),
#{ default => "16KB" #{ default => "16KB"
, desc => "Size of the kernel-level buffer for incoming datagrams." , desc => "Size of the kernel-level buffer for incoming datagrams."
})} , 'readOnly' => true
})}
, {"buffer", , {"buffer",
sc(emqx_schema:bytesize(), sc(emqx_schema:bytesize(),
#{ default =>"32KB" #{ default =>"32KB"
, desc => "Size of the user-level buffer." , desc => "Size of the user-level buffer."
})} , 'readOnly' => true
})}
]; ];
fields(cluster_dns) -> fields(cluster_dns) ->
@ -221,11 +235,13 @@ fields(cluster_dns) ->
sc(string(), sc(string(),
#{ default => "localhost" #{ default => "localhost"
, desc => "The domain name of the EMQX cluster." , desc => "The domain name of the EMQX cluster."
})} , 'readOnly' => true
})}
, {"app", , {"app",
sc(string(), sc(string(),
#{ default => "emqx" #{ default => "emqx"
, desc => "The symbolic name of the EMQX service." , desc => "The symbolic name of the EMQX service."
, 'readOnly' => true
})} })}
]; ];
@ -233,21 +249,25 @@ fields(cluster_etcd) ->
[ {"server", [ {"server",
sc(emqx_schema:comma_separated_list(), sc(emqx_schema:comma_separated_list(),
#{ desc => "List of endpoint URLs of the etcd cluster" #{ desc => "List of endpoint URLs of the etcd cluster"
, 'readOnly' => true
})} })}
, {"prefix", , {"prefix",
sc(string(), sc(string(),
#{ default => "emqxcl" #{ default => "emqxcl"
, desc => "Key prefix used for EMQX service discovery." , desc => "Key prefix used for EMQX service discovery."
, 'readOnly' => true
})} })}
, {"node_ttl", , {"node_ttl",
sc(emqx_schema:duration(), sc(emqx_schema:duration(),
#{ default => "1m" #{ default => "1m"
, 'readOnly' => true
, desc => "Expiration time of the etcd key associated with the node. , desc => "Expiration time of the etcd key associated with the node.
It is refreshed automatically, as long as the node is alive." It is refreshed automatically, as long as the node is alive."
})} })}
, {"ssl", , {"ssl",
sc(hoconsc:ref(emqx_schema, ssl_client_opts), sc(hoconsc:ref(emqx_schema, ssl_client_opts),
#{ desc => "Options for the TLS connection to the etcd cluster." #{ desc => "Options for the TLS connection to the etcd cluster."
, 'readOnly' => true
})} })}
]; ];
@ -255,19 +275,23 @@ fields(cluster_k8s) ->
[ {"apiserver", [ {"apiserver",
sc(string(), sc(string(),
#{ desc => "Kubernetes API endpoint URL." #{ desc => "Kubernetes API endpoint URL."
, 'readOnly' => true
})} })}
, {"service_name", , {"service_name",
sc(string(), sc(string(),
#{ default => "emqx" #{ default => "emqx"
, desc => "EMQX broker service name." , desc => "EMQX broker service name."
, 'readOnly' => true
})} })}
, {"address_type", , {"address_type",
sc(hoconsc:enum([ip, dns, hostname]), sc(hoconsc:enum([ip, dns, hostname]),
#{ desc => "Address type used for connecting to the discovered nodes." #{ desc => "Address type used for connecting to the discovered nodes."
, 'readOnly' => true
})} })}
, {"app_name", , {"app_name",
sc(string(), sc(string(),
#{ default => "emqx" #{ default => "emqx"
, 'readOnly' => true
, desc => "This parameter should be set to the part of the <code>node.name</code> , desc => "This parameter should be set to the part of the <code>node.name</code>
before the '@'.<br/> before the '@'.<br/>
For example, if the <code>node.name</code> is <code>emqx@127.0.0.1</code>, then this parameter For example, if the <code>node.name</code> is <code>emqx@127.0.0.1</code>, then this parameter
@ -277,10 +301,12 @@ should be set to <code>emqx</code>."
sc(string(), sc(string(),
#{ default => "default" #{ default => "default"
, desc => "Kubernetes namespace." , desc => "Kubernetes namespace."
, 'readOnly' => true
})} })}
, {"suffix", , {"suffix",
sc(string(), sc(string(),
#{ default => "pod.local" #{ default => "pod.local"
, 'readOnly' => true
, desc => "Node name suffix.<br/> , desc => "Node name suffix.<br/>
Note: this parameter is only relevant when <code>address_type</code> is <code>dns</code> Note: this parameter is only relevant when <code>address_type</code> is <code>dns</code>
or <code>hostname</code>." or <code>hostname</code>."
@ -290,14 +316,16 @@ or <code>hostname</code>."
fields("node") -> fields("node") ->
[ {"name", [ {"name",
sc(string(), sc(string(),
#{ default => "emqx@127.0.0.1", #{ default => "emqx@127.0.0.1"
desc => "Unique name of the EMQX node. It must follow <code>%name%@FQDN</code> or , 'readOnly' => true
, desc => "Unique name of the EMQX node. It must follow <code>%name%@FQDN</code> or
<code>%name%@IPv4</code> format." <code>%name%@IPv4</code> format."
})} })}
, {"cookie", , {"cookie",
sc(string(), sc(string(),
#{ mapping => "vm_args.-setcookie", #{ mapping => "vm_args.-setcookie",
default => "emqxsecretcookie", default => "emqxsecretcookie",
'readOnly' => true,
sensitive => true, sensitive => true,
desc => "Secret cookie is a random string that should be the same on all nodes in desc => "Secret cookie is a random string that should be the same on all nodes in
the given EMQX cluster, but unique per EMQX cluster. It is used to prevent EMQX nodes that the given EMQX cluster, but unique per EMQX cluster. It is used to prevent EMQX nodes that
@ -306,6 +334,7 @@ fields("node") ->
, {"data_dir", , {"data_dir",
sc(string(), sc(string(),
#{ required => true, #{ required => true,
'readOnly' => true,
mapping => "emqx.data_dir", mapping => "emqx.data_dir",
desc => desc =>
""" """
@ -327,6 +356,7 @@ Possible auto-created subdirectories are:
sc(list(string()), sc(list(string()),
#{ mapping => "emqx.config_files" #{ mapping => "emqx.config_files"
, default => undefined , default => undefined
, 'readOnly' => true
, desc => "List of configuration files that are read during startup. The order is , desc => "List of configuration files that are read during startup. The order is
significant: later configuration files override the previous ones." significant: later configuration files override the previous ones."
})} })}
@ -335,11 +365,13 @@ Possible auto-created subdirectories are:
#{ mapping => "emqx_machine.global_gc_interval" #{ mapping => "emqx_machine.global_gc_interval"
, default => "15m" , default => "15m"
, desc => "Periodic garbage collection interval." , desc => "Periodic garbage collection interval."
, 'readOnly' => true
})} })}
, {"crash_dump_file", , {"crash_dump_file",
sc(file(), sc(file(),
#{ mapping => "vm_args.-env ERL_CRASH_DUMP" #{ mapping => "vm_args.-env ERL_CRASH_DUMP"
, desc => "Location of the crash dump file" , desc => "Location of the crash dump file"
, 'readOnly' => true
})} })}
, {"crash_dump_seconds", , {"crash_dump_seconds",
sc(emqx_schema:duration_s(), sc(emqx_schema:duration_s(),
@ -347,17 +379,20 @@ Possible auto-created subdirectories are:
, default => "30s" , default => "30s"
, desc => "The number of seconds that the broker is allowed to spend writing , desc => "The number of seconds that the broker is allowed to spend writing
a crash dump" a crash dump"
, 'readOnly' => true
})} })}
, {"crash_dump_bytes", , {"crash_dump_bytes",
sc(emqx_schema:bytesize(), sc(emqx_schema:bytesize(),
#{ mapping => "vm_args.-env ERL_CRASH_DUMP_BYTES" #{ mapping => "vm_args.-env ERL_CRASH_DUMP_BYTES"
, default => "100MB" , default => "100MB"
, desc => "The maximum size of a crash dump file in bytes." , desc => "The maximum size of a crash dump file in bytes."
, 'readOnly' => true
})} })}
, {"dist_net_ticktime", , {"dist_net_ticktime",
sc(emqx_schema:duration(), sc(emqx_schema:duration(),
#{ mapping => "vm_args.-kernel net_ticktime" #{ mapping => "vm_args.-kernel net_ticktime"
, default => "2m" , default => "2m"
, 'readOnly' => true
, desc => "This is the approximate time an EMQX node may be unresponsive " , desc => "This is the approximate time an EMQX node may be unresponsive "
"until it is considered down and thereby disconnected." "until it is considered down and thereby disconnected."
})} })}
@ -365,6 +400,7 @@ a crash dump"
sc(integer(), sc(integer(),
#{ mapping => "emqx_machine.backtrace_depth" #{ mapping => "emqx_machine.backtrace_depth"
, default => 23 , default => 23
, 'readOnly' => true
, desc => "Maximum depth of the call stack printed in error messages and , desc => "Maximum depth of the call stack printed in error messages and
<code>process_info</code>." <code>process_info</code>."
})} })}
@ -372,18 +408,21 @@ a crash dump"
sc(emqx_schema:comma_separated_atoms(), sc(emqx_schema:comma_separated_atoms(),
#{ mapping => "emqx_machine.applications" #{ mapping => "emqx_machine.applications"
, default => [] , default => []
, 'readOnly' => true
, desc => "List of Erlang applications that shall be rebooted when the EMQX broker joins , desc => "List of Erlang applications that shall be rebooted when the EMQX broker joins
the cluster." the cluster."
})} })}
, {"etc_dir", , {"etc_dir",
sc(string(), sc(string(),
#{ desc => "<code>etc</code> dir for the node" #{ desc => "<code>etc</code> dir for the node"
, 'readOnly' => true
} }
)} )}
, {"cluster_call", , {"cluster_call",
sc(ref("cluster_call"), sc(ref("cluster_call"),
#{ desc => "Options for the 'cluster call' feature that allows to execute a callback #{ desc => "Options for the 'cluster call' feature that allows to execute a callback
on all nodes in the cluster." on all nodes in the cluster."
, 'readOnly' => true
} }
)} )}
]; ];
@ -393,6 +432,7 @@ fields("db") ->
sc(hoconsc:enum([mnesia, rlog]), sc(hoconsc:enum([mnesia, rlog]),
#{ mapping => "mria.db_backend" #{ mapping => "mria.db_backend"
, default => rlog , default => rlog
, 'readOnly' => true
, desc => """ , desc => """
Select the backend for the embedded database.<br/> Select the backend for the embedded database.<br/>
<code>rlog</code> is the default backend, a new experimental backend <code>rlog</code> is the default backend, a new experimental backend
@ -404,6 +444,7 @@ that is suitable for very large clusters.<br/>
sc(hoconsc:enum([core, replicant]), sc(hoconsc:enum([core, replicant]),
#{ mapping => "mria.node_role" #{ mapping => "mria.node_role"
, default => core , default => core
, 'readOnly' => true
, desc => """ , desc => """
Select a node role.<br/> Select a node role.<br/>
<code>core</code> nodes provide durability of the data, and take care of writes. <code>core</code> nodes provide durability of the data, and take care of writes.
@ -419,6 +460,7 @@ to <code>rlog</code>.
sc(emqx_schema:comma_separated_atoms(), sc(emqx_schema:comma_separated_atoms(),
#{ mapping => "mria.core_nodes" #{ mapping => "mria.core_nodes"
, default => [] , default => []
, 'readOnly' => true
, desc => """ , desc => """
List of core nodes that the replicant will connect to.<br/> List of core nodes that the replicant will connect to.<br/>
Note: this parameter only takes effect when the <code>backend</code> is set Note: this parameter only takes effect when the <code>backend</code> is set
@ -432,6 +474,7 @@ there is no need to set this value.
sc(hoconsc:enum([gen_rpc, rpc]), sc(hoconsc:enum([gen_rpc, rpc]),
#{ mapping => "mria.rlog_rpc_module" #{ mapping => "mria.rlog_rpc_module"
, default => gen_rpc , default => gen_rpc
, 'readOnly' => true
, desc => """ , desc => """
Protocol used for pushing transaction logs to the replicant nodes. Protocol used for pushing transaction logs to the replicant nodes.
""" """
@ -440,6 +483,7 @@ Protocol used for pushing transaction logs to the replicant nodes.
sc(hoconsc:enum([sync, async]), sc(hoconsc:enum([sync, async]),
#{ mapping => "mria.tlog_push_mode" #{ mapping => "mria.tlog_push_mode"
, default => async , default => async
, 'readOnly' => true
, desc => """ , desc => """
In sync mode the core node waits for an ack from the replicant nodes before sending the next In sync mode the core node waits for an ack from the replicant nodes before sending the next
transaction log entry. transaction log entry.

View File

@ -20,6 +20,8 @@
-export([ start_listeners/0 -export([ start_listeners/0
, start_listeners/1
, stop_listeners/1
, stop_listeners/0]). , stop_listeners/0]).
%% Authorization %% Authorization
@ -37,6 +39,14 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
start_listeners() -> start_listeners() ->
Listeners = emqx_conf:get([dashboard, listeners], []),
start_listeners(Listeners).
stop_listeners() ->
Listeners = emqx_conf:get([dashboard, listeners], []),
stop_listeners(Listeners).
start_listeners(Listeners) ->
{ok, _} = application:ensure_all_started(minirest), {ok, _} = application:ensure_all_started(minirest),
Authorization = {?MODULE, authorize}, Authorization = {?MODULE, authorize},
GlobalSpec = #{ GlobalSpec = #{
@ -73,13 +83,13 @@ start_listeners() ->
%% Don't record the reason because minirest already does(too much logs noise). %% Don't record the reason because minirest already does(too much logs noise).
[Name | Acc] [Name | Acc]
end end
end, [], listeners()), end, [], listeners(Listeners)),
case Res of case Res of
[] -> ok; [] -> ok;
_ -> {error, Res} _ -> {error, Res}
end. end.
stop_listeners() -> stop_listeners(Listeners) ->
[begin [begin
case minirest:stop(Name) of case minirest:stop(Name) of
ok -> ok ->
@ -87,7 +97,8 @@ stop_listeners() ->
{error, not_found} -> {error, not_found} ->
?SLOG(warning, #{msg => "stop_listener_failed", name => Name, port => Port}) ?SLOG(warning, #{msg => "stop_listener_failed", name => Name, port => Port})
end end
end || {Name, _, Port, _} <- listeners()]. end || {Name, _, Port, _} <- listeners(Listeners)],
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% internal %% internal
@ -99,14 +110,14 @@ apps() ->
_ -> false _ -> false
end]. end].
listeners() -> listeners(Listeners) ->
[begin [begin
Protocol = maps:get(protocol, ListenerOption0, http), Protocol = maps:get(protocol, ListenerOption0, http),
{ListenerOption, Bind} = ip_port(ListenerOption0), {ListenerOption, Bind} = ip_port(ListenerOption0),
Name = listener_name(Protocol, ListenerOption), Name = listener_name(Protocol, ListenerOption),
RanchOptions = ranch_opts(maps:without([protocol], ListenerOption)), RanchOptions = ranch_opts(maps:without([protocol], ListenerOption)),
{Name, Protocol, Bind, RanchOptions} {Name, Protocol, Bind, RanchOptions}
end || ListenerOption0 <- emqx_conf:get([dashboard, listeners], [])]. end || ListenerOption0 <- Listeners].
ip_port(Opts) -> ip_port(maps:take(bind, Opts), Opts). ip_port(Opts) -> ip_port(maps:take(bind, Opts), Opts).

View File

@ -31,10 +31,12 @@ start(_StartType, _StartArgs) ->
ok -> ok ->
emqx_dashboard_cli:load(), emqx_dashboard_cli:load(),
{ok, _Result} = emqx_dashboard_admin:add_default_user(), {ok, _Result} = emqx_dashboard_admin:add_default_user(),
ok = emqx_dashboard_config:add_handler(),
{ok, Sup}; {ok, Sup};
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end. end.
stop(_State) -> stop(_State) ->
ok = emqx_dashboard_config:remove_handler(),
emqx_dashboard_cli:unload(), emqx_dashboard_cli:unload(),
emqx_dashboard:stop_listeners(). emqx_dashboard:stop_listeners().

View File

@ -0,0 +1,43 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_dashboard_config).
-behaviour(emqx_config_handler).
%% API
-export([add_handler/0, remove_handler/0]).
-export([post_config_update/5]).
add_handler() ->
Roots = emqx_dashboard_schema:roots(),
ok = emqx_config_handler:add_handler(Roots, ?MODULE),
ok.
remove_handler() ->
Roots = emqx_dashboard_schema:roots(),
ok = emqx_config_handler:remove_handler(Roots),
ok.
post_config_update(_, _Req, NewConf, OldConf, _AppEnvs) ->
#{listeners := NewListeners} = NewConf,
#{listeners := OldListeners} = OldConf,
case NewListeners =:= OldListeners of
true -> ok;
false ->
ok = emqx_dashboard:stop_listeners(OldListeners),
ok = emqx_dashboard:start_listeners(NewListeners)
end,
ok.

View File

@ -105,11 +105,13 @@ default_username(type) -> string();
default_username(default) -> "admin"; default_username(default) -> "admin";
default_username(required) -> true; default_username(required) -> true;
default_username(desc) -> "The default username of the automatically created dashboard user."; default_username(desc) -> "The default username of the automatically created dashboard user.";
default_username('readOnly') -> true;
default_username(_) -> undefined. default_username(_) -> undefined.
default_password(type) -> string(); default_password(type) -> string();
default_password(default) -> "public"; default_password(default) -> "public";
default_password(required) -> true; default_password(required) -> true;
default_password('readOnly') -> true;
default_password(sensitive) -> true; default_password(sensitive) -> true;
default_password(desc) -> """ default_password(desc) -> """
The initial default password for dashboard 'admin' user. The initial default password for dashboard 'admin' user.