diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl index 6188d8030..77ece1c60 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -72,9 +72,13 @@ set_init_config_load_done() -> get_init_config_load_done() -> application:get_env(emqx, init_config_load_done, false). +%% @doc Set the transaction id from which this node should start applying after boot. +%% The transaction ID is received from the core node which we just copied the latest +%% config from. set_init_tnx_id(TnxId) -> application:set_env(emqx, cluster_rpc_init_tnx_id, TnxId). +%% @doc Get the transaction id from which this node should start applying after boot. get_init_tnx_id() -> application:get_env(emqx, cluster_rpc_init_tnx_id, -1). diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 89f678554..f7c34031c 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -275,8 +275,13 @@ init([Node, RetryMs]) -> _ = mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]), {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), State = #{node => Node, retry_interval => RetryMs}, + %% The init transaction ID is set in emqx_conf_app after + %% it has fetched the latest config from one of the core nodes TnxId = emqx_app:get_init_tnx_id(), ok = maybe_init_tnx_id(Node, TnxId), + %% Now continue with the normal catch-up process + %% That is: apply the missing transactions after the config + %% was copied until now. {ok, State, {continue, ?CATCH_UP}}. %% @private @@ -396,6 +401,7 @@ get_cluster_tnx_id() -> Id -> Id end. +%% The entry point of a config change transaction. init_mfa(Node, MFA) -> mnesia:write_lock_table(?CLUSTER_MFA), LatestId = get_cluster_tnx_id(), diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index 09478e304..0896eb718 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -38,7 +38,7 @@ start(_StartType, _StartArgs) -> reason => E, stacktrace => St }), - init:stop() + init:stop(1) end, ok = emqx_config_logger:refresh_config(), emqx_conf_sup:start_link().