From 221f6eba0611c7424bc80e9f6207ece7a2fdb7da Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 6 Jul 2023 23:56:15 +0800 Subject: [PATCH 1/3] fix: bad tnx-id when rejoin cluster --- apps/emqx/src/emqx_app.erl | 14 +----- apps/emqx_conf/src/emqx_cluster_rpc.erl | 60 ++++++++++++++++++------- apps/emqx_conf/src/emqx_conf.app.src | 2 +- apps/emqx_conf/src/emqx_conf_app.erl | 27 ++++++----- apps/emqx_conf/src/emqx_conf_schema.erl | 6 +-- apps/emqx_conf/src/emqx_conf_sup.erl | 10 ++--- 6 files changed, 67 insertions(+), 52 deletions(-) diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl index ffb4e3d1e..038c93283 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -25,9 +25,7 @@ get_description/0, get_release/0, set_config_loader/1, - get_config_loader/0, - set_init_tnx_id/1, - get_init_tnx_id/0 + get_config_loader/0 ]). -include("logger.hrl"). @@ -65,16 +63,6 @@ set_config_loader(Module) when is_atom(Module) -> get_config_loader() -> application:get_env(emqx, config_loader, emqx). -%% @doc Set the transaction id from which this node should start applying after boot. -%% The transaction ID is received from the core node which we just copied the latest -%% config from. -set_init_tnx_id(TnxId) -> - application:set_env(emqx, cluster_rpc_init_tnx_id, TnxId). - -%% @doc Get the transaction id from which this node should start applying after boot. -get_init_tnx_id() -> - application:get_env(emqx, cluster_rpc_init_tnx_id, -1). - maybe_load_config() -> case get_config_loader() of emqx -> diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 599b8474b..003851420 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -17,7 +17,7 @@ -behaviour(gen_server). %% API --export([start_link/0, mnesia/1]). +-export([start_link/1, mnesia/1]). %% Note: multicall functions are statically checked by %% `emqx_bapi_trans' and `emqx_bpapi_static_checks' modules. Don't @@ -29,7 +29,8 @@ status/0, skip_failed_commit/1, fast_forward_to_commit/2, - on_mria_stop/1 + on_mria_stop/1, + wait_for_cluster_rpc/0 ]). -export([ commit/2, @@ -62,6 +63,10 @@ -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). + +start_link() -> + start_link(-1). + -endif. -boot_mnesia({mnesia, [boot]}). @@ -107,11 +112,11 @@ mnesia(boot) -> {attributes, record_info(fields, cluster_rpc_commit)} ]). -start_link() -> - start_link(node(), ?MODULE, get_retry_ms()). +start_link(TnxId) -> + start_link(TnxId, node(), ?MODULE, get_retry_ms()). -start_link(Node, Name, RetryMs) -> - case gen_server:start_link({local, Name}, ?MODULE, [Node, RetryMs], []) of +start_link(TnxId, Node, Name, RetryMs) -> + case gen_server:start_link({local, Name}, ?MODULE, [TnxId, Node, RetryMs], []) of {ok, Pid} -> {ok, Pid}; {error, {already_started, Pid}} -> @@ -276,29 +281,46 @@ on_mria_stop(leave) -> on_mria_stop(_) -> ok. +wait_for_cluster_rpc() -> + %% Workaround for https://github.com/emqx/mria/issues/94: + Msg1 = #{msg => "wait_for_cluster_rpc_shard"}, + case mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], 1500) of + ok -> ?SLOG(info, Msg1#{result => ok}); + Error0 -> ?SLOG(error, Msg1#{result => Error0}) + end, + Msg2 = #{msg => "wait_for_cluster_rpc_tables"}, + case mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]) of + ok -> ?SLOG(info, Msg2#{result => ok}); + Error1 -> ?SLOG(error, Msg2#{result => Error1}) + end, + ok. + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== %% @private -init([Node, RetryMs]) -> +init([TnxId, Node, RetryMs]) -> register_mria_stop_cb(fun ?MODULE:on_mria_stop/1), {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), State = #{node => Node, retry_interval => RetryMs, is_leaving => false}, - %% The init transaction ID is set in emqx_conf_app after - %% it has fetched the latest config from one of the core nodes - TnxId = emqx_app:get_init_tnx_id(), - ok = maybe_init_tnx_id(Node, TnxId), %% Now continue with the normal catch-up process %% That is: apply the missing transactions after the config %% was copied until now. - {ok, State, {continue, ?CATCH_UP}}. + {ok, State, {continue, {?CATCH_UP, TnxId}}}. %% @private -handle_continue(?CATCH_UP, State) -> +handle_continue({?CATCH_UP, TnxId}, State = #{node := Node}) -> %% emqx app must be started before %% trying to catch up the rpc commit logs ok = wait_for_emqx_ready(), + ok = wait_for_cluster_rpc(), + %% The init transaction ID is set in emqx_conf_app after + %% it has fetched the latest config from one of the core nodes + ok = maybe_init_tnx_id(Node, TnxId), + {noreply, State, catch_up(State)}; +%% @private +handle_continue(?CATCH_UP, State) -> {noreply, State, catch_up(State)}. handle_call(reset, _From, State) -> @@ -388,7 +410,8 @@ read_next_mfa(Node) -> }), TnxId; [#cluster_rpc_commit{tnx_id = LastAppliedID}] -> - LastAppliedID + 1 + OldestId = get_oldest_mfa_id(), + max(LastAppliedID + 1, OldestId) end, case mnesia:read(?CLUSTER_MFA, NextId) of [] -> caught_up; @@ -404,8 +427,7 @@ do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) -> true -> NodeId; false -> - {atomic, LatestId} = transaction(fun ?MODULE:get_cluster_tnx_id/0, []), - case LatestId =< NodeId of + case latest_tnx_id() =< NodeId of true -> NodeId; false -> @@ -420,6 +442,12 @@ get_cluster_tnx_id() -> Id -> Id end. +get_oldest_mfa_id() -> + case mnesia:first(?CLUSTER_MFA) of + '$end_of_table' -> 0; + Id -> Id + end. + %% The entry point of a config change transaction. init_mfa(Node, MFA) -> mnesia:write_lock_table(?CLUSTER_MFA), diff --git a/apps/emqx_conf/src/emqx_conf.app.src b/apps/emqx_conf/src/emqx_conf.app.src index d8ee672f3..3c1e5592f 100644 --- a/apps/emqx_conf/src/emqx_conf.app.src +++ b/apps/emqx_conf/src/emqx_conf.app.src @@ -1,6 +1,6 @@ {application, emqx_conf, [ {description, "EMQX configuration management"}, - {vsn, "0.1.23"}, + {vsn, "0.1.24"}, {registered, []}, {mod, {emqx_conf_app, []}}, {applications, [kernel, stdlib, emqx_ctl]}, diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index c92c28971..08fe73e69 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -31,16 +31,17 @@ -define(DEFAULT_INIT_TXN_ID, -1). start(_StartType, _StartArgs) -> - try - ok = init_conf() - catch - C:E:St -> - %% logger is not quite ready. - io:format(standard_error, "Failed to load config~n~p~n~p~n~p~n", [C, E, St]), - init:stop(1) - end, + {ok, TnxId} = + try + {ok, _} = init_conf() + catch + C:E:St -> + %% logger is not quite ready. + io:format(standard_error, "Failed to load config~n~p~n~p~n~p~n", [C, E, St]), + init:stop(1) + end, ok = emqx_config_logger:refresh_config(), - emqx_conf_sup:start_link(). + emqx_conf_sup:start_link(TnxId). stop(_State) -> ok. @@ -112,12 +113,10 @@ init_load_done() -> emqx_app:get_config_loader() =/= emqx. init_conf() -> - %% Workaround for https://github.com/emqx/mria/issues/94: - _ = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], 1000), - _ = mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]), + emqx_cluster_rpc:wait_for_cluster_rpc(), {ok, TnxId} = sync_cluster_conf(), - _ = emqx_app:set_init_tnx_id(TnxId), - ok = init_load(). + ok = init_load(), + {ok, TnxId}. cluster_nodes() -> mria:cluster_nodes(cores) -- [node()]. diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index d5d92920d..816e2f454 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -684,10 +684,10 @@ fields("cluster_call") -> )}, {"max_history", sc( - range(1, 500), + range(100, 10240), #{ desc => ?DESC(cluster_call_max_history), - default => 100 + default => 1024 } )}, {"cleanup_interval", @@ -695,7 +695,7 @@ fields("cluster_call") -> emqx_schema:duration(), #{ desc => ?DESC(cluster_call_cleanup_interval), - default => <<"5m">> + default => <<"24h">> } )} ]; diff --git a/apps/emqx_conf/src/emqx_conf_sup.erl b/apps/emqx_conf/src/emqx_conf_sup.erl index 6a3d795ae..d224db28e 100644 --- a/apps/emqx_conf/src/emqx_conf_sup.erl +++ b/apps/emqx_conf/src/emqx_conf_sup.erl @@ -18,16 +18,16 @@ -behaviour(supervisor). --export([start_link/0]). +-export([start_link/1]). -export([init/1]). -define(SERVER, ?MODULE). -start_link() -> - supervisor:start_link({local, ?SERVER}, ?MODULE, []). +start_link(TnxId) -> + supervisor:start_link({local, ?SERVER}, ?MODULE, [TnxId]). -init([]) -> +init([TnxId]) -> SupFlags = #{ strategy => one_for_all, intensity => 10, @@ -35,7 +35,7 @@ init([]) -> }, ChildSpecs = [ - child_spec(emqx_cluster_rpc, []), + child_spec(emqx_cluster_rpc, [TnxId]), child_spec(emqx_cluster_rpc_cleaner, []) ], {ok, {SupFlags, ChildSpecs}}. From 9f57ba510ebe6275a82457a74d97f04829715a8f Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 7 Jul 2023 08:10:33 +0800 Subject: [PATCH 2/3] chore: add 11214 changelog --- apps/emqx_conf/src/emqx_cluster_rpc.erl | 13 ++++++++----- changes/ce/fix-11214.en.md | 1 + 2 files changed, 9 insertions(+), 5 deletions(-) create mode 100644 changes/ce/fix-11214.en.md diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 003851420..072b92347 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -60,6 +60,11 @@ -export_type([tnx_id/0, succeed_num/0]). +-boot_mnesia({mnesia, [boot]}). + +-include_lib("emqx/include/logger.hrl"). +-include("emqx_conf.hrl"). + -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). @@ -67,13 +72,11 @@ start_link() -> start_link(-1). +start_link(Node, Name, RetryMs) -> + start_link(-1, Node, Name, RetryMs). + -endif. --boot_mnesia({mnesia, [boot]}). - --include_lib("emqx/include/logger.hrl"). --include("emqx_conf.hrl"). - -define(INITIATE(MFA), {initiate, MFA}). -define(CATCH_UP, catch_up). -define(TIMEOUT, timer:minutes(1)). diff --git a/changes/ce/fix-11214.en.md b/changes/ce/fix-11214.en.md new file mode 100644 index 000000000..35a33970a --- /dev/null +++ b/changes/ce/fix-11214.en.md @@ -0,0 +1 @@ +Fix a bug where node configuration may fail to synchronize correctly when joining the cluster. From f7513b900ae75b4e1f6c3b72ed8da268ed0f7bc2 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 7 Jul 2023 11:05:26 +0800 Subject: [PATCH 3/3] fix: set load config done after update tnx_id --- apps/emqx_conf/src/emqx_cluster_rpc.erl | 29 +++++++++---------------- apps/emqx_conf/src/emqx_conf_app.erl | 27 ++++++++++++----------- apps/emqx_conf/src/emqx_conf_sup.erl | 10 ++++----- 3 files changed, 29 insertions(+), 37 deletions(-) diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 072b92347..bb154f8b5 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -17,7 +17,7 @@ -behaviour(gen_server). %% API --export([start_link/1, mnesia/1]). +-export([start_link/0, mnesia/1]). %% Note: multicall functions are statically checked by %% `emqx_bapi_trans' and `emqx_bpapi_static_checks' modules. Don't @@ -30,7 +30,8 @@ skip_failed_commit/1, fast_forward_to_commit/2, on_mria_stop/1, - wait_for_cluster_rpc/0 + wait_for_cluster_rpc/0, + maybe_init_tnx_id/2 ]). -export([ commit/2, @@ -69,12 +70,6 @@ -compile(export_all). -compile(nowarn_export_all). -start_link() -> - start_link(-1). - -start_link(Node, Name, RetryMs) -> - start_link(-1, Node, Name, RetryMs). - -endif. -define(INITIATE(MFA), {initiate, MFA}). @@ -115,11 +110,11 @@ mnesia(boot) -> {attributes, record_info(fields, cluster_rpc_commit)} ]). -start_link(TnxId) -> - start_link(TnxId, node(), ?MODULE, get_retry_ms()). +start_link() -> + start_link(node(), ?MODULE, get_retry_ms()). -start_link(TnxId, Node, Name, RetryMs) -> - case gen_server:start_link({local, Name}, ?MODULE, [TnxId, Node, RetryMs], []) of +start_link(Node, Name, RetryMs) -> + case gen_server:start_link({local, Name}, ?MODULE, [Node, RetryMs], []) of {ok, Pid} -> {ok, Pid}; {error, {already_started, Pid}} -> @@ -303,26 +298,22 @@ wait_for_cluster_rpc() -> %%%=================================================================== %% @private -init([TnxId, Node, RetryMs]) -> +init([Node, RetryMs]) -> register_mria_stop_cb(fun ?MODULE:on_mria_stop/1), {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), State = #{node => Node, retry_interval => RetryMs, is_leaving => false}, %% Now continue with the normal catch-up process %% That is: apply the missing transactions after the config %% was copied until now. - {ok, State, {continue, {?CATCH_UP, TnxId}}}. + {ok, State, {continue, {?CATCH_UP, init}}}. %% @private -handle_continue({?CATCH_UP, TnxId}, State = #{node := Node}) -> +handle_continue({?CATCH_UP, init}, State) -> %% emqx app must be started before %% trying to catch up the rpc commit logs ok = wait_for_emqx_ready(), ok = wait_for_cluster_rpc(), - %% The init transaction ID is set in emqx_conf_app after - %% it has fetched the latest config from one of the core nodes - ok = maybe_init_tnx_id(Node, TnxId), {noreply, State, catch_up(State)}; -%% @private handle_continue(?CATCH_UP, State) -> {noreply, State, catch_up(State)}. diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index 08fe73e69..3c9af9393 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -31,17 +31,16 @@ -define(DEFAULT_INIT_TXN_ID, -1). start(_StartType, _StartArgs) -> - {ok, TnxId} = - try - {ok, _} = init_conf() - catch - C:E:St -> - %% logger is not quite ready. - io:format(standard_error, "Failed to load config~n~p~n~p~n~p~n", [C, E, St]), - init:stop(1) - end, + try + ok = init_conf() + catch + C:E:St -> + %% logger is not quite ready. + io:format(standard_error, "Failed to load config~n~p~n~p~n~p~n", [C, E, St]), + init:stop(1) + end, ok = emqx_config_logger:refresh_config(), - emqx_conf_sup:start_link(TnxId). + emqx_conf_sup:start_link(). stop(_State) -> ok. @@ -94,10 +93,12 @@ sync_data_from_node() -> %% Internal functions %% ------------------------------------------------------------------------------ -init_load() -> +init_load(TnxId) -> case emqx_app:get_config_loader() of Module when Module == emqx; Module == emqx_conf -> ok = emqx_config:init_load(emqx_conf:schema_module()), + %% Set load config done after update(init) tnx_id. + ok = emqx_cluster_rpc:maybe_init_tnx_id(node(), TnxId), ok = emqx_app:set_config_loader(emqx_conf), ok; Module -> @@ -115,8 +116,8 @@ init_load_done() -> init_conf() -> emqx_cluster_rpc:wait_for_cluster_rpc(), {ok, TnxId} = sync_cluster_conf(), - ok = init_load(), - {ok, TnxId}. + ok = init_load(TnxId), + ok. cluster_nodes() -> mria:cluster_nodes(cores) -- [node()]. diff --git a/apps/emqx_conf/src/emqx_conf_sup.erl b/apps/emqx_conf/src/emqx_conf_sup.erl index d224db28e..6a3d795ae 100644 --- a/apps/emqx_conf/src/emqx_conf_sup.erl +++ b/apps/emqx_conf/src/emqx_conf_sup.erl @@ -18,16 +18,16 @@ -behaviour(supervisor). --export([start_link/1]). +-export([start_link/0]). -export([init/1]). -define(SERVER, ?MODULE). -start_link(TnxId) -> - supervisor:start_link({local, ?SERVER}, ?MODULE, [TnxId]). +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). -init([TnxId]) -> +init([]) -> SupFlags = #{ strategy => one_for_all, intensity => 10, @@ -35,7 +35,7 @@ init([TnxId]) -> }, ChildSpecs = [ - child_spec(emqx_cluster_rpc, [TnxId]), + child_spec(emqx_cluster_rpc, []), child_spec(emqx_cluster_rpc_cleaner, []) ], {ok, {SupFlags, ChildSpecs}}.