From f7513b900ae75b4e1f6c3b72ed8da268ed0f7bc2 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 7 Jul 2023 11:05:26 +0800 Subject: [PATCH] fix: set load config done after update tnx_id --- apps/emqx_conf/src/emqx_cluster_rpc.erl | 29 +++++++++---------------- apps/emqx_conf/src/emqx_conf_app.erl | 27 ++++++++++++----------- apps/emqx_conf/src/emqx_conf_sup.erl | 10 ++++----- 3 files changed, 29 insertions(+), 37 deletions(-) diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 072b92347..bb154f8b5 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -17,7 +17,7 @@ -behaviour(gen_server). %% API --export([start_link/1, mnesia/1]). +-export([start_link/0, mnesia/1]). %% Note: multicall functions are statically checked by %% `emqx_bapi_trans' and `emqx_bpapi_static_checks' modules. Don't @@ -30,7 +30,8 @@ skip_failed_commit/1, fast_forward_to_commit/2, on_mria_stop/1, - wait_for_cluster_rpc/0 + wait_for_cluster_rpc/0, + maybe_init_tnx_id/2 ]). -export([ commit/2, @@ -69,12 +70,6 @@ -compile(export_all). -compile(nowarn_export_all). -start_link() -> - start_link(-1). - -start_link(Node, Name, RetryMs) -> - start_link(-1, Node, Name, RetryMs). - -endif. -define(INITIATE(MFA), {initiate, MFA}). @@ -115,11 +110,11 @@ mnesia(boot) -> {attributes, record_info(fields, cluster_rpc_commit)} ]). -start_link(TnxId) -> - start_link(TnxId, node(), ?MODULE, get_retry_ms()). +start_link() -> + start_link(node(), ?MODULE, get_retry_ms()). -start_link(TnxId, Node, Name, RetryMs) -> - case gen_server:start_link({local, Name}, ?MODULE, [TnxId, Node, RetryMs], []) of +start_link(Node, Name, RetryMs) -> + case gen_server:start_link({local, Name}, ?MODULE, [Node, RetryMs], []) of {ok, Pid} -> {ok, Pid}; {error, {already_started, Pid}} -> @@ -303,26 +298,22 @@ wait_for_cluster_rpc() -> %%%=================================================================== %% @private -init([TnxId, Node, RetryMs]) -> +init([Node, RetryMs]) -> register_mria_stop_cb(fun ?MODULE:on_mria_stop/1), {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), State = #{node => Node, retry_interval => RetryMs, is_leaving => false}, %% Now continue with the normal catch-up process %% That is: apply the missing transactions after the config %% was copied until now. - {ok, State, {continue, {?CATCH_UP, TnxId}}}. + {ok, State, {continue, {?CATCH_UP, init}}}. %% @private -handle_continue({?CATCH_UP, TnxId}, State = #{node := Node}) -> +handle_continue({?CATCH_UP, init}, State) -> %% emqx app must be started before %% trying to catch up the rpc commit logs ok = wait_for_emqx_ready(), ok = wait_for_cluster_rpc(), - %% The init transaction ID is set in emqx_conf_app after - %% it has fetched the latest config from one of the core nodes - ok = maybe_init_tnx_id(Node, TnxId), {noreply, State, catch_up(State)}; -%% @private handle_continue(?CATCH_UP, State) -> {noreply, State, catch_up(State)}. diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index 08fe73e69..3c9af9393 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -31,17 +31,16 @@ -define(DEFAULT_INIT_TXN_ID, -1). start(_StartType, _StartArgs) -> - {ok, TnxId} = - try - {ok, _} = init_conf() - catch - C:E:St -> - %% logger is not quite ready. - io:format(standard_error, "Failed to load config~n~p~n~p~n~p~n", [C, E, St]), - init:stop(1) - end, + try + ok = init_conf() + catch + C:E:St -> + %% logger is not quite ready. + io:format(standard_error, "Failed to load config~n~p~n~p~n~p~n", [C, E, St]), + init:stop(1) + end, ok = emqx_config_logger:refresh_config(), - emqx_conf_sup:start_link(TnxId). + emqx_conf_sup:start_link(). stop(_State) -> ok. @@ -94,10 +93,12 @@ sync_data_from_node() -> %% Internal functions %% ------------------------------------------------------------------------------ -init_load() -> +init_load(TnxId) -> case emqx_app:get_config_loader() of Module when Module == emqx; Module == emqx_conf -> ok = emqx_config:init_load(emqx_conf:schema_module()), + %% Set load config done after update(init) tnx_id. + ok = emqx_cluster_rpc:maybe_init_tnx_id(node(), TnxId), ok = emqx_app:set_config_loader(emqx_conf), ok; Module -> @@ -115,8 +116,8 @@ init_load_done() -> init_conf() -> emqx_cluster_rpc:wait_for_cluster_rpc(), {ok, TnxId} = sync_cluster_conf(), - ok = init_load(), - {ok, TnxId}. + ok = init_load(TnxId), + ok. cluster_nodes() -> mria:cluster_nodes(cores) -- [node()]. diff --git a/apps/emqx_conf/src/emqx_conf_sup.erl b/apps/emqx_conf/src/emqx_conf_sup.erl index d224db28e..6a3d795ae 100644 --- a/apps/emqx_conf/src/emqx_conf_sup.erl +++ b/apps/emqx_conf/src/emqx_conf_sup.erl @@ -18,16 +18,16 @@ -behaviour(supervisor). --export([start_link/1]). +-export([start_link/0]). -export([init/1]). -define(SERVER, ?MODULE). -start_link(TnxId) -> - supervisor:start_link({local, ?SERVER}, ?MODULE, [TnxId]). +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). -init([TnxId]) -> +init([]) -> SupFlags = #{ strategy => one_for_all, intensity => 10, @@ -35,7 +35,7 @@ init([TnxId]) -> }, ChildSpecs = [ - child_spec(emqx_cluster_rpc, [TnxId]), + child_spec(emqx_cluster_rpc, []), child_spec(emqx_cluster_rpc_cleaner, []) ], {ok, {SupFlags, ChildSpecs}}.