Merge pull request #7882 from zhongwencool/dashboard-listener-start-async
feat: start dashboard listener asynchronously
This commit is contained in:
commit
b51936b7f3
|
@ -209,7 +209,7 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
|
|||
Msg = lists:flatten(
|
||||
io_lib:format(
|
||||
"~ts(~ts) : ~p",
|
||||
[ListenerId, BindStr, element(1, Reason)]
|
||||
[ListenerId, BindStr, filter_stacktrace(Reason)]
|
||||
)
|
||||
),
|
||||
{error, {failed_to_start, Msg}}
|
||||
|
@ -514,7 +514,8 @@ foreach_listeners(Do) ->
|
|||
{ok, #{type := Type, name := Name}} = parse_listener_id(Id),
|
||||
case Do(Type, Name, LConf) of
|
||||
{error, {failed_to_start, _} = Reason} -> error(Reason);
|
||||
_ -> ok
|
||||
{error, {already_started, _}} -> ok;
|
||||
ok -> ok
|
||||
end
|
||||
end,
|
||||
list()
|
||||
|
@ -568,3 +569,6 @@ convert_certs(CertsDir, Conf) ->
|
|||
clear_certs(CertsDir, Conf) ->
|
||||
OldSSL = maps:get(<<"ssl">>, Conf, undefined),
|
||||
emqx_tls_lib:delete_ssl_files(CertsDir, undefined, OldSSL).
|
||||
|
||||
filter_stacktrace({Reason, _Stacktrace}) -> Reason;
|
||||
filter_stacktrace(Reason) -> Reason.
|
||||
|
|
|
@ -99,11 +99,20 @@ all_ciphers(['tlsv1.3']) ->
|
|||
ssl:cipher_suites(exclusive, 'tlsv1.3', openssl);
|
||||
all_ciphers(Versions) ->
|
||||
%% assert non-empty
|
||||
[_ | _] = dedup(lists:append([ssl:cipher_suites(all, V, openssl) || V <- Versions])).
|
||||
List = lists:append([ssl:cipher_suites(all, V, openssl) || V <- Versions]),
|
||||
[_ | _] = dedup(List).
|
||||
|
||||
%% @doc All Pre-selected TLS ciphers.
|
||||
%% ssl:cipher_suites(all, V, openssl) is too slow. so we cache default ciphers.
|
||||
default_ciphers() ->
|
||||
selected_ciphers(available_versions()).
|
||||
case persistent_term:get(default_ciphers, undefined) of
|
||||
undefined ->
|
||||
Default = selected_ciphers(available_versions()),
|
||||
persistent_term:put(default_ciphers, Default),
|
||||
Default;
|
||||
Default ->
|
||||
Default
|
||||
end.
|
||||
|
||||
%% @doc Pre-selected TLS ciphers for given versions..
|
||||
selected_ciphers(Vsns) ->
|
||||
|
@ -205,8 +214,20 @@ default_versions(_) ->
|
|||
lists:delete('tlsv1.3', proplists:get_value(available, ssl:versions())).
|
||||
|
||||
%% Deduplicate a list without re-ordering the elements.
|
||||
dedup([]) -> [];
|
||||
dedup([H | T]) -> [H | dedup([I || I <- T, I =/= H])].
|
||||
dedup([]) ->
|
||||
[];
|
||||
dedup(List0) ->
|
||||
List = lists:foldl(
|
||||
fun(L, Acc) ->
|
||||
case lists:member(L, Acc) of
|
||||
false -> [L | Acc];
|
||||
true -> Acc
|
||||
end
|
||||
end,
|
||||
[],
|
||||
List0
|
||||
),
|
||||
lists:reverse(List).
|
||||
|
||||
%% parse comma separated tls version strings
|
||||
parse_versions(Versions) ->
|
||||
|
|
|
@ -193,8 +193,11 @@ start_app(App, Schema, ConfigFile, SpecAppConfig) ->
|
|||
copy_certs(App, RenderedConfigFile),
|
||||
SpecAppConfig(App),
|
||||
case application:ensure_all_started(App) of
|
||||
{ok, _} -> ok;
|
||||
{error, Reason} -> error({failed_to_start_app, App, Reason})
|
||||
{ok, _} ->
|
||||
ok = ensure_dashboard_listeners_started(App),
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
error({failed_to_start_app, App, Reason})
|
||||
end.
|
||||
|
||||
render_config_file(ConfigFile, Vars0) ->
|
||||
|
@ -494,3 +497,9 @@ start_ekka() ->
|
|||
application:set_env(mria, db_backend, mnesia),
|
||||
ekka:start()
|
||||
end.
|
||||
|
||||
ensure_dashboard_listeners_started(emqx_dashboard) ->
|
||||
ok = gen_server:call(emqx_dashboard_listener, sync),
|
||||
ok;
|
||||
ensure_dashboard_listeners_started(_App) ->
|
||||
ok.
|
||||
|
|
|
@ -443,6 +443,7 @@ fields("node") ->
|
|||
#{
|
||||
mapping => "vm_args.-env ERL_CRASH_DUMP",
|
||||
desc => ?DESC(node_crash_dump_file),
|
||||
default => "log/erl_crash.dump",
|
||||
'readOnly' => true
|
||||
}
|
||||
)},
|
||||
|
|
|
@ -30,6 +30,8 @@ convert_certs(RltvDir, NewConfig) ->
|
|||
{error, {bad_ssl_config, Reason}}
|
||||
end.
|
||||
|
||||
clear_certs(_RltvDir, undefined) ->
|
||||
ok;
|
||||
clear_certs(RltvDir, Config) ->
|
||||
OldSSL = map_get_oneof([<<"ssl">>, ssl], Config, undefined),
|
||||
ok = emqx_tls_lib:delete_ssl_files(RltvDir, undefined, OldSSL).
|
||||
|
|
|
@ -22,7 +22,8 @@
|
|||
start_listeners/0,
|
||||
start_listeners/1,
|
||||
stop_listeners/1,
|
||||
stop_listeners/0
|
||||
stop_listeners/0,
|
||||
list_listeners/0
|
||||
]).
|
||||
|
||||
-export([
|
||||
|
@ -54,7 +55,6 @@ stop_listeners() ->
|
|||
|
||||
start_listeners(Listeners) ->
|
||||
{ok, _} = application:ensure_all_started(minirest),
|
||||
init_i18n(),
|
||||
Authorization = {?MODULE, authorize},
|
||||
GlobalSpec = #{
|
||||
openapi => "3.0.0",
|
||||
|
@ -81,7 +81,7 @@ start_listeners(Listeners) ->
|
|||
security => [#{'basicAuth' => []}, #{'bearerAuth' => []}],
|
||||
swagger_global_spec => GlobalSpec,
|
||||
dispatch => Dispatch,
|
||||
middlewares => [cowboy_router, ?EMQX_MIDDLE, cowboy_handler]
|
||||
middlewares => [?EMQX_MIDDLE, cowboy_router, cowboy_handler]
|
||||
},
|
||||
Res =
|
||||
lists:foldl(
|
||||
|
@ -101,7 +101,6 @@ start_listeners(Listeners) ->
|
|||
[],
|
||||
listeners(Listeners)
|
||||
),
|
||||
clear_i18n(),
|
||||
case Res of
|
||||
[] -> ok;
|
||||
_ -> {error, Res}
|
||||
|
@ -164,6 +163,9 @@ listeners(Listeners) ->
|
|||
maps:to_list(Listeners)
|
||||
).
|
||||
|
||||
list_listeners() ->
|
||||
listeners(listeners()).
|
||||
|
||||
ip_port(Opts) -> ip_port(maps:take(bind, Opts), Opts).
|
||||
|
||||
ip_port(error, Opts) -> {Opts#{port => 18083}, 18083};
|
||||
|
|
|
@ -26,19 +26,18 @@
|
|||
-include("emqx_dashboard.hrl").
|
||||
|
||||
start(_StartType, _StartArgs) ->
|
||||
{ok, Sup} = emqx_dashboard_sup:start_link(),
|
||||
ok = mria_rlog:wait_for_shards([?DASHBOARD_SHARD], infinity),
|
||||
{ok, Sup} = emqx_dashboard_sup:start_link(),
|
||||
case emqx_dashboard:start_listeners() of
|
||||
ok ->
|
||||
emqx_dashboard_cli:load(),
|
||||
{ok, _Result} = emqx_dashboard_admin:add_default_user(),
|
||||
ok = emqx_dashboard_config:add_handler(),
|
||||
{ok, _} = emqx_dashboard_admin:add_default_user(),
|
||||
{ok, Sup};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
stop(_State) ->
|
||||
ok = emqx_dashboard_config:remove_handler(),
|
||||
ok = emqx_dashboard:stop_listeners(),
|
||||
emqx_dashboard_cli:unload(),
|
||||
emqx_dashboard:stop_listeners().
|
||||
ok.
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_dashboard_config).
|
||||
-module(emqx_dashboard_listener).
|
||||
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-behaviour(emqx_config_handler).
|
||||
|
@ -21,38 +21,87 @@
|
|||
%% API
|
||||
-export([add_handler/0, remove_handler/0]).
|
||||
-export([pre_config_update/3, post_config_update/5]).
|
||||
-export([regenerate_minirest_dispatch/0]).
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-export([start_link/0]).
|
||||
-export([start_link/0, is_ready/1]).
|
||||
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
-export([
|
||||
init/1,
|
||||
handle_continue/2,
|
||||
handle_call/3,
|
||||
handle_cast/2,
|
||||
handle_info/2,
|
||||
terminate/2,
|
||||
code_change/3
|
||||
]).
|
||||
|
||||
is_ready(Timeout) ->
|
||||
ready =:= gen_server:call(?MODULE, is_ready, Timeout).
|
||||
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
init([]) ->
|
||||
{ok, #{}, hibernate}.
|
||||
erlang:process_flag(trap_exit, true),
|
||||
ok = add_handler(),
|
||||
{ok, undefined, {continue, regenerate_dispatch}}.
|
||||
|
||||
handle_continue(regenerate_dispatch, _State) ->
|
||||
NewState = regenerate_minirest_dispatch(),
|
||||
{noreply, NewState, hibernate}.
|
||||
|
||||
handle_call(is_ready, _From, retry) ->
|
||||
NewState = regenerate_minirest_dispatch(),
|
||||
{reply, NewState, NewState, hibernate};
|
||||
handle_call(is_ready, _From, State) ->
|
||||
{reply, State, State, hibernate};
|
||||
handle_call(_Request, _From, State) ->
|
||||
{reply, ok, State}.
|
||||
{reply, ok, State, hibernate}.
|
||||
|
||||
handle_cast(_Request, State) ->
|
||||
{noreply, State}.
|
||||
{noreply, State, hibernate}.
|
||||
|
||||
handle_info({update_listeners, OldListeners, NewListeners}, State) ->
|
||||
handle_info({update_listeners, OldListeners, NewListeners}, _State) ->
|
||||
ok = emqx_dashboard:stop_listeners(OldListeners),
|
||||
ok = emqx_dashboard:start_listeners(NewListeners),
|
||||
{noreply, State};
|
||||
NewState = regenerate_minirest_dispatch(),
|
||||
{noreply, NewState, hibernate};
|
||||
handle_info(_Info, State) ->
|
||||
{noreply, State}.
|
||||
{noreply, State, hibernate}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
ok = remove_handler(),
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%% generate dispatch is very slow.
|
||||
regenerate_minirest_dispatch() ->
|
||||
try
|
||||
emqx_dashboard:init_i18n(),
|
||||
lists:foreach(
|
||||
fun(Listener) ->
|
||||
minirest:update_dispatch(element(1, Listener))
|
||||
end,
|
||||
emqx_dashboard:list_listeners()
|
||||
),
|
||||
ready
|
||||
catch
|
||||
T:E:S ->
|
||||
?SLOG(error, #{
|
||||
msg => "regenerate_minirest_dispatch_failed",
|
||||
reason => E,
|
||||
type => T,
|
||||
stacktrace => S
|
||||
}),
|
||||
retry
|
||||
after
|
||||
emqx_dashboard:clear_i18n()
|
||||
end.
|
||||
|
||||
add_handler() ->
|
||||
Roots = emqx_dashboard_schema:roots(),
|
||||
ok = emqx_config_handler:add_handler(Roots, ?MODULE),
|
|
@ -21,13 +21,27 @@
|
|||
-export([execute/2]).
|
||||
|
||||
execute(Req, Env) ->
|
||||
case check_dispatch_ready(Env) of
|
||||
true -> add_cors_flag(Req, Env);
|
||||
false -> {stop, cowboy_req:reply(503, Req)}
|
||||
end.
|
||||
|
||||
add_cors_flag(Req, Env) ->
|
||||
CORS = emqx_conf:get([dashboard, cors], false),
|
||||
case CORS andalso cowboy_req:header(<<"origin">>, Req, undefined) of
|
||||
Origin = cowboy_req:header(<<"origin">>, Req, undefined),
|
||||
case CORS andalso Origin =/= undefined of
|
||||
false ->
|
||||
{ok, Req, Env};
|
||||
undefined ->
|
||||
{ok, Req, Env};
|
||||
_ ->
|
||||
true ->
|
||||
Req2 = cowboy_req:set_resp_header(<<"Access-Control-Allow-Origin">>, <<"*">>, Req),
|
||||
{ok, Req2, Env}
|
||||
end.
|
||||
|
||||
check_dispatch_ready(Env) ->
|
||||
case maps:is_key(options, Env) of
|
||||
false ->
|
||||
true;
|
||||
true ->
|
||||
%% dashboard should always ready, if not, is_ready/1 will block until ready.
|
||||
emqx_dashboard_listener:is_ready(timer:seconds(15))
|
||||
end.
|
||||
|
|
|
@ -29,8 +29,8 @@ start_link() ->
|
|||
|
||||
init([]) ->
|
||||
{ok,
|
||||
{{one_for_one, 10, 100}, [
|
||||
{{one_for_one, 5, 100}, [
|
||||
?CHILD(emqx_dashboard_listener),
|
||||
?CHILD(emqx_dashboard_token),
|
||||
?CHILD(emqx_dashboard_monitor),
|
||||
?CHILD(emqx_dashboard_config)
|
||||
?CHILD(emqx_dashboard_monitor)
|
||||
]}}.
|
||||
|
|
|
@ -35,26 +35,7 @@ end_per_suite(_) ->
|
|||
ok.
|
||||
|
||||
set_special_configs(emqx_dashboard) ->
|
||||
Config = #{
|
||||
default_username => <<"admin">>,
|
||||
default_password => <<"public">>,
|
||||
listeners =>
|
||||
#{
|
||||
http =>
|
||||
#{
|
||||
backlog => 512,
|
||||
bind => 18083,
|
||||
enable => true,
|
||||
inet6 => false,
|
||||
ipv6_v6only => false,
|
||||
max_connections => 512,
|
||||
num_acceptors => 4,
|
||||
send_timeout => 5000
|
||||
}
|
||||
}
|
||||
},
|
||||
emqx_config:put([dashboard], Config),
|
||||
ok;
|
||||
emqx_dashboard_api_test_helpers:set_default_config();
|
||||
set_special_configs(_App) ->
|
||||
ok.
|
||||
|
||||
|
|
2
mix.exs
2
mix.exs
|
@ -57,7 +57,7 @@ defmodule EMQXUmbrella.MixProject do
|
|||
{:mria, github: "emqx/mria", tag: "0.2.4", override: true},
|
||||
{:ekka, github: "emqx/ekka", tag: "0.12.5", override: true},
|
||||
{:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
|
||||
{:minirest, github: "emqx/minirest", tag: "1.2.13", override: true},
|
||||
{:minirest, github: "emqx/minirest", tag: "1.3.2", override: true},
|
||||
{:ecpool, github: "emqx/ecpool", tag: "0.5.2"},
|
||||
{:replayq, "0.3.4", override: true},
|
||||
{:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
|
||||
|
|
|
@ -56,7 +56,7 @@
|
|||
, {mria, {git, "https://github.com/emqx/mria", {tag, "0.2.4"}}}
|
||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.12.5"}}}
|
||||
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
|
||||
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.13"}}}
|
||||
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.2"}}}
|
||||
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
|
||||
, {replayq, "0.3.4"}
|
||||
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
|
||||
|
|
Loading…
Reference in New Issue