Merge pull request #10325 from zmstone/0404-delay-config-change-replay-until-handler-is-ready
0404 delay config change replay until handler is ready
This commit is contained in:
commit
9e17064e47
|
@ -270,9 +270,6 @@ fast_forward_to_commit(Node, ToTnxId) ->
|
|||
|
||||
%% @private
|
||||
init([Node, RetryMs]) ->
|
||||
%% Workaround for https://github.com/emqx/mria/issues/94:
|
||||
_ = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], 1000),
|
||||
_ = mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]),
|
||||
{ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}),
|
||||
State = #{node => Node, retry_interval => RetryMs},
|
||||
TnxId = emqx_app:get_init_tnx_id(),
|
||||
|
@ -281,6 +278,9 @@ init([Node, RetryMs]) ->
|
|||
|
||||
%% @private
|
||||
handle_continue(?CATCH_UP, State) ->
    %% The rpc commit logs can only be caught up on once the emqx app
    %% has started, so wait for it first (the match crashes this
    %% process if the wait does not return ok).
    ok = wait_for_emqx_ready(),
    NextAction = catch_up(State),
    {noreply, State, NextAction}.
|
||||
|
||||
handle_call(reset, _From, State) ->
|
||||
|
@ -566,3 +566,37 @@ maybe_init_tnx_id(_Node, TnxId) when TnxId < 0 -> ok;
|
|||
maybe_init_tnx_id(Node, TnxId) ->
|
||||
{atomic, _} = transaction(fun ?MODULE:commit/2, [Node, TnxId]),
|
||||
ok.
|
||||
|
||||
%% @private
%% Cannot proceed until emqx app is ready.
%% Otherwise the committed transaction catch up may fail.
%% Returns ok; crashes with badmatch when the wait ends in a timeout.
wait_for_emqx_ready() ->
    %% wait 10 seconds for emqx to start
    MaxSeconds = 10,
    ok = do_wait_for_emqx_ready(MaxSeconds).
|
||||
|
||||
%% Wait for emqx app to be ready,
%% write a log message every 1 second.
%% N is the number of rounds left. Returns ok as soon as one round
%% succeeds, or timeout once every round has been used up.
do_wait_for_emqx_ready(0) ->
    timeout;
do_wait_for_emqx_ready(N) ->
    %% check interval is 100ms
    %% makes the total wait time 1 second
    case do_wait_for_emqx_ready2(10) of
        ok ->
            ok;
        timeout ->
            %% one warning per unsuccessful round, then try again
            ?SLOG(warning, #{msg => "still_waiting_for_emqx_app_to_be_ready"}),
            do_wait_for_emqx_ready(N - 1)
    end.
|
||||
|
||||
%% Poll emqx:is_running/0 until it returns true.
%% ChecksLeft is the number of checks still allowed; a 100ms sleep
%% follows every negative check. Returns ok when emqx is running,
%% or timeout when the counter reaches 0.
do_wait_for_emqx_ready2(0) ->
    timeout;
do_wait_for_emqx_ready2(ChecksLeft) ->
    Running = emqx:is_running(),
    case Running of
        false ->
            ok = timer:sleep(100),
            do_wait_for_emqx_ready2(ChecksLeft - 1);
        true ->
            ok
    end.
|
||||
|
|
|
@ -13,7 +13,9 @@
|
|||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_cluster_rpc_handler).
|
||||
|
||||
%% @doc This module is responsible for cleaning up the cluster RPC MFA.
|
||||
-module(emqx_cluster_rpc_cleaner).
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
|
@ -84,13 +84,16 @@ init_load() ->
|
|||
-endif.
|
||||
|
||||
init_conf() ->
    %% Workaround for https://github.com/emqx/mria/issues/94:
    %% wait for the cluster rpc shard (1000ms limit) and its tables
    %% before touching them; the results of both waits are ignored.
    RpcTables = [?CLUSTER_MFA, ?CLUSTER_COMMIT],
    _ = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], 1000),
    _ = mria:wait_for_tables(RpcTables),
    %% Record the transaction id that came with the copied config,
    %% load the config, then flag the initial load as done.
    {ok, InitTnxId} = copy_override_conf_from_core_node(),
    emqx_app:set_init_tnx_id(InitTnxId),
    init_load(),
    emqx_app:set_init_config_load_done().
|
||||
|
||||
cluster_nodes() ->
|
||||
mria_mnesia:running_nodes() -- [node()].
|
||||
mria:cluster_nodes(cores) -- [node()].
|
||||
|
||||
copy_override_conf_from_core_node() ->
|
||||
case cluster_nodes() of
|
||||
|
|
|
@ -36,7 +36,7 @@ init([]) ->
|
|||
ChildSpecs =
|
||||
[
|
||||
child_spec(emqx_cluster_rpc, []),
|
||||
child_spec(emqx_cluster_rpc_handler, [])
|
||||
child_spec(emqx_cluster_rpc_cleaner, [])
|
||||
],
|
||||
{ok, {SupFlags, ChildSpecs}}.
|
||||
|
||||
|
|
|
@ -43,6 +43,7 @@ groups() -> [].
|
|||
init_per_suite(Config) ->
|
||||
application:load(emqx_conf),
|
||||
ok = ekka:start(),
|
||||
ok = emqx_common_test_helpers:start_apps([]),
|
||||
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
|
||||
ok = emqx_config:put([node, cluster_call, retry_interval], 1000),
|
||||
meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
|
||||
|
@ -53,6 +54,7 @@ init_per_suite(Config) ->
|
|||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
ok = emqx_common_test_helpers:stop_apps([]),
|
||||
ekka:stop(),
|
||||
mria:stop(),
|
||||
meck:unload(mria),
|
||||
|
@ -255,13 +257,13 @@ t_fast_forward_commit(_Config) ->
|
|||
),
|
||||
ok.
|
||||
|
||||
t_handler_unexpected_msg(_Config) ->
|
||||
Handler = emqx_cluster_rpc_handler,
|
||||
OldPid = erlang:whereis(Handler),
|
||||
ok = gen_server:cast(Handler, unexpected_cast_msg),
|
||||
ignore = gen_server:call(Handler, unexpected_cast_msg),
|
||||
erlang:send(Handler, unexpected_info_msg),
|
||||
NewPid = erlang:whereis(Handler),
|
||||
%% The cleaner must tolerate unexpected cast, call and info messages:
%% the pid registered under its name has to be the same before and
%% after they are sent.
t_cleaner_unexpected_msg(_Config) ->
    %% Same name the suite registers and stops the cleaner under.
    Cleaner = emqx_cluster_rpc_cleaner,
    OldPid = erlang:whereis(Cleaner),
    ok = gen_server:cast(Cleaner, unexpected_cast_msg),
    ignore = gen_server:call(Cleaner, unexpected_cast_msg),
    erlang:send(Cleaner, unexpected_info_msg),
    NewPid = erlang:whereis(Cleaner),
    ?assertEqual(OldPid, NewPid),
    ok.
|
||||
|
||||
|
@ -279,8 +281,8 @@ start() ->
|
|||
{ok, Pid1} = emqx_cluster_rpc:start_link(),
|
||||
{ok, Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500),
|
||||
{ok, Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500),
|
||||
{ok, Pid4} = emqx_cluster_rpc_handler:start_link(100, 500),
|
||||
true = erlang:register(emqx_cluster_rpc_handler, Pid4),
|
||||
{ok, Pid4} = emqx_cluster_rpc_cleaner:start_link(100, 500),
|
||||
true = erlang:register(emqx_cluster_rpc_cleaner, Pid4),
|
||||
{ok, [Pid1, Pid2, Pid3, Pid4]}.
|
||||
|
||||
stop() ->
|
||||
|
@ -296,7 +298,7 @@ stop() ->
|
|||
end
|
||||
|| N <- [?NODE1, ?NODE2, ?NODE3]
|
||||
],
|
||||
gen_server:stop(emqx_cluster_rpc_handler, normal, 5000).
|
||||
gen_server:stop(emqx_cluster_rpc_cleaner, normal, 5000).
|
||||
|
||||
receive_msg(0, _Msg) ->
|
||||
ok;
|
||||
|
|
|
@ -27,6 +27,13 @@ help() {
|
|||
echo " E.g. ghcr.io/emqx/emqx-builder/5.0-28:1.13.4-24.3.4.2-2-debian11"
|
||||
}
|
||||
|
||||
# Print the given message to stderr, show the usage text, and exit 1.
die() {
    msg="$1"
    echo "$msg" >&2
    help
    exit 1
}
|
||||
|
||||
while [ "$#" -gt 0 ]; do
|
||||
case $1 in
|
||||
-h|--help)
|
||||
|
@ -81,13 +88,23 @@ while [ "$#" -gt 0 ]; do
|
|||
esac
|
||||
done
|
||||
|
||||
if [ -z "${PROFILE:-}" ] ||
|
||||
[ -z "${PKGTYPE:-}" ] ||
|
||||
[ -z "${BUILDER:-}" ] ||
|
||||
[ -z "${ARCH:-}" ]; then
|
||||
help
|
||||
exit 1
|
||||
## we have a different naming for them
# Map the machine name reported by uname to the arch naming used here.
# NATIVE_ARCH stays unset for any machine name not listed.
case "$(uname -m)" in
    x86_64)
        NATIVE_ARCH='amd64'
        ;;
    aarch64 | arm64)
        NATIVE_ARCH='arm64'
        ;;
    armv7l)
        # CHECKME: really ?
        NATIVE_ARCH='arm64'
        ;;
esac
# An ARCH that is already set wins; otherwise fall back to the native one.
ARCH="${ARCH:-${NATIVE_ARCH:-}}"
|
||||
|
||||
# Abort with a message naming the option whose value is still empty.
[ -z "${PROFILE:-}" ] && die "missing --profile"
[ -z "${PKGTYPE:-}" ] && die "missing --pkgtype"
[ -z "${BUILDER:-}" ] && die "missing --builder"
[ -z "${ARCH:-}" ] && die "missing --arch"
|
||||
|
||||
# ensure dir
|
||||
cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.."
|
||||
|
@ -128,13 +145,7 @@ if [[ "$HOST_SYSTEM" = "$BUILDER_SYSTEM" ]]; then
|
|||
fi
|
||||
|
||||
IS_NATIVE_ARCH='no'
|
||||
if [[ $(uname -m) == "x86_64" && "$ARCH" == "amd64" ]]; then
|
||||
IS_NATIVE_ARCH='yes'
|
||||
elif [[ $(uname -m) == "aarch64" && "$ARCH" == "arm64" ]]; then
|
||||
IS_NATIVE_ARCH='yes'
|
||||
elif [[ $(uname -m) == "arm64" && "$ARCH" == "arm64" ]]; then
|
||||
IS_NATIVE_ARCH='yes'
|
||||
elif [[ $(uname -m) == "armv7l" && "$ARCH" == "arm64" ]]; then
|
||||
if [[ "$NATIVE_ARCH" == "$ARCH" ]]; then
|
||||
IS_NATIVE_ARCH='yes'
|
||||
fi
|
||||
|
||||
|
@ -151,7 +162,7 @@ elif docker info; then
|
|||
--platform="linux/$ARCH" \
|
||||
--env ACLOCAL_PATH="/usr/share/aclocal:/usr/local/share/aclocal" \
|
||||
"$BUILDER" \
|
||||
bash -euc "$CMD_RUN"
|
||||
bash -euc "git config --global --add safe.directory /emqx && $CMD_RUN"
|
||||
else
|
||||
echo "Error: Docker not available on unsupported platform"
|
||||
exit 1;
|
||||
|
|
Loading…
Reference in New Issue