Merge pull request #11148 from lafirest/fix/mcall_error_report

fix(emqx_conf): don't sync the MFA to a node that has left the cluster
This commit is contained in:
zhongwencool 2023-07-04 19:56:52 +08:00 committed by GitHub
commit f2c5f4932c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 53 additions and 9 deletions

View File

@ -145,6 +145,8 @@ jobs:
pgsql_authn_authz: pgsql_authn_authz:
runs-on: ubuntu-22.04 runs-on: ubuntu-22.04
env:
_EMQX_DOCKER_IMAGE_TAG: emqx/emqx:${{ needs.build_emqx_for_jmeter_tests.outputs.version }}
strategy: strategy:
fail-fast: false fail-fast: false
@ -175,7 +177,6 @@ jobs:
- name: docker compose up - name: docker compose up
timeout-minutes: 5 timeout-minutes: 5
env: env:
_EMQX_DOCKER_IMAGE_TAG: emqx/emqx:${{ needs.build_emqx_for_jmeter_tests.outputs.version }}
PGSQL_TAG: ${{ matrix.pgsql_tag }} PGSQL_TAG: ${{ matrix.pgsql_tag }}
run: | run: |
docker-compose \ docker-compose \
@ -246,6 +247,10 @@ jobs:
echo "check logs failed" echo "check logs failed"
exit 1 exit 1
fi fi
- name: dump docker compose logs
if: failure()
run: |
docker-compose -f .ci/docker-compose-file/docker-compose-emqx-cluster.yaml logs --no-color > ./jmeter_logs/emqx.log
- uses: actions/upload-artifact@v3 - uses: actions/upload-artifact@v3
if: always() if: always()
with: with:

View File

@ -27,7 +27,7 @@
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.4"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.5"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.10"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.10"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},

View File

@ -28,7 +28,8 @@
reset/0, reset/0,
status/0, status/0,
skip_failed_commit/1, skip_failed_commit/1,
fast_forward_to_commit/2 fast_forward_to_commit/2,
on_mria_stop/1
]). ]).
-export([ -export([
commit/2, commit/2,
@ -40,7 +41,8 @@
make_initiate_call_req/3, make_initiate_call_req/3,
read_next_mfa/1, read_next_mfa/1,
trans_query/1, trans_query/1,
trans_status/0 trans_status/0,
on_leave_clean/0
]). ]).
-export([ -export([
@ -211,6 +213,9 @@ reset() -> gen_server:call(?MODULE, reset).
status() -> status() ->
transaction(fun ?MODULE:trans_status/0, []). transaction(fun ?MODULE:trans_status/0, []).
%% Remove the local node's entry from the ?CLUSTER_COMMIT table.
%% Executed through transaction/2 by the `on_leave' gen_server call.
on_leave_clean() ->
    CommitKey = {?CLUSTER_COMMIT, node()},
    mnesia:delete(CommitKey).
-spec latest_tnx_id() -> pos_integer(). -spec latest_tnx_id() -> pos_integer().
latest_tnx_id() -> latest_tnx_id() ->
{atomic, TnxId} = transaction(fun ?MODULE:get_cluster_tnx_id/0, []), {atomic, TnxId} = transaction(fun ?MODULE:get_cluster_tnx_id/0, []),
@ -264,14 +269,22 @@ skip_failed_commit(Node) ->
-spec fast_forward_to_commit(node(), pos_integer()) -> pos_integer(). -spec fast_forward_to_commit(node(), pos_integer()) -> pos_integer().
fast_forward_to_commit(Node, ToTnxId) -> fast_forward_to_commit(Node, ToTnxId) ->
gen_server:call({?MODULE, Node}, {fast_forward_to_commit, ToTnxId}). gen_server:call({?MODULE, Node}, {fast_forward_to_commit, ToTnxId}).
%% Callback registered for the mria `stop' event (see init/1).
%% When the stop reason is `leave', this node's commit record must be
%% removed from the cluster, so the request is handed to the local
%% ?MODULE server as an `on_leave' call. Any other stop reason needs no
%% cleanup and simply yields `ok'.
on_mria_stop(leave) ->
    gen_server:call(?MODULE, on_leave);
on_mria_stop(_Reason) ->
    ok.
%%%=================================================================== %%%===================================================================
%%% gen_server callbacks %%% gen_server callbacks
%%%=================================================================== %%%===================================================================
%% @private %% @private
init([Node, RetryMs]) -> init([Node, RetryMs]) ->
register_mria_stop_cb(fun ?MODULE:on_mria_stop/1),
{ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}),
State = #{node => Node, retry_interval => RetryMs}, State = #{node => Node, retry_interval => RetryMs, is_leaving => false},
%% The init transaction ID is set in emqx_conf_app after %% The init transaction ID is set in emqx_conf_app after
%% it has fetched the latest config from one of the core nodes %% it has fetched the latest config from one of the core nodes
TnxId = emqx_app:get_init_tnx_id(), TnxId = emqx_app:get_init_tnx_id(),
@ -306,6 +319,9 @@ handle_call(skip_failed_commit, _From, State = #{node := Node}) ->
handle_call({fast_forward_to_commit, ToTnxId}, _From, State) -> handle_call({fast_forward_to_commit, ToTnxId}, _From, State) ->
NodeId = do_fast_forward_to_commit(ToTnxId, State), NodeId = do_fast_forward_to_commit(ToTnxId, State),
{reply, NodeId, State, catch_up(State)}; {reply, NodeId, State, catch_up(State)};
handle_call(on_leave, _From, State) ->
{atomic, ok} = transaction(fun ?MODULE:on_leave_clean/0, []),
{reply, ok, State#{is_leaving := true}};
handle_call(_, _From, State) -> handle_call(_, _From, State) ->
{reply, ok, State, catch_up(State)}. {reply, ok, State, catch_up(State)}.
@ -328,7 +344,7 @@ code_change(_OldVsn, State, _Extra) ->
%%%=================================================================== %%%===================================================================
catch_up(State) -> catch_up(State, false). catch_up(State) -> catch_up(State, false).
catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) -> catch_up(#{node := Node, retry_interval := RetryMs, is_leaving := false} = State, SkipResult) ->
case transaction(fun ?MODULE:read_next_mfa/1, [Node]) of case transaction(fun ?MODULE:read_next_mfa/1, [Node]) of
{atomic, caught_up} -> {atomic, caught_up} ->
?TIMEOUT; ?TIMEOUT;
@ -353,7 +369,10 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) ->
{aborted, Reason} -> {aborted, Reason} ->
?SLOG(error, #{msg => "read_next_mfa_transaction_failed", error => Reason}), ?SLOG(error, #{msg => "read_next_mfa_transaction_failed", error => Reason}),
RetryMs RetryMs
end. end;
catch_up(#{is_leaving := true}, _SkipResult) ->
?SLOG(info, #{msg => "ignore_mfa_transactions", reason => "Node is in leaving"}),
?TIMEOUT.
read_next_mfa(Node) -> read_next_mfa(Node) ->
NextId = NextId =
@ -624,3 +643,22 @@ do_wait_for_emqx_ready2(N) ->
timer:sleep(100), timer:sleep(100),
do_wait_for_emqx_ready2(N - 1) do_wait_for_emqx_ready2(N - 1)
end. end.
%% Install Callback as the mria `stop' callback without discarding a
%% callback that is already registered.
%%
%% If no `stop' callback exists, Callback is registered as-is. Otherwise a
%% wrapper is registered that first invokes Callback with the stop argument
%% and then invokes the existing callback, passing the argument only when
%% the existing callback has arity 1 (arity 0 callbacks are called bare).
%%
%% NOTE(review): each invocation wraps whatever is currently registered, so
%% calling this more than once (e.g. if init/1 runs again) presumably chains
%% Callback repeatedly -- confirm whether that can happen.
register_mria_stop_cb(Callback) ->
    StopCallback =
        case mria_config:callback(stop) of
            undefined ->
                Callback;
            {ok, Existing} ->
                fun(StopArg) ->
                    Callback(StopArg),
                    case erlang:fun_info(Existing, arity) of
                        {arity, 0} ->
                            Existing();
                        {arity, 1} ->
                            Existing(StopArg)
                    end
                end
        end,
    mria:register_callback(stop, StopCallback).

View File

@ -0,0 +1 @@
Fixed an issue where, after a node had left the cluster, other nodes would still try to synchronize configuration update operations to it.

View File

@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do
{:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true},
{:esockd, github: "emqx/esockd", tag: "5.9.6", override: true}, {:esockd, github: "emqx/esockd", tag: "5.9.6", override: true},
{:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.7.2-emqx-11", override: true}, {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.7.2-emqx-11", override: true},
{:ekka, github: "emqx/ekka", tag: "0.15.4", override: true}, {:ekka, github: "emqx/ekka", tag: "0.15.5", override: true},
{:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
{:grpc, github: "emqx/grpc-erl", tag: "0.6.8", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.8", override: true},
{:minirest, github: "emqx/minirest", tag: "1.3.11", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.11", override: true},

View File

@ -62,7 +62,7 @@
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}
, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.7.2-emqx-11"}}} , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.7.2-emqx-11"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.4"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.5"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.8"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.8"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.11"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.11"}}}