diff --git a/apps/emqx/test/emqx_trace_handler_SUITE.erl b/apps/emqx/test/emqx_trace_handler_SUITE.erl index e010b6c5e..950f0faf6 100644 --- a/apps/emqx/test/emqx_trace_handler_SUITE.erl +++ b/apps/emqx/test/emqx_trace_handler_SUITE.erl @@ -199,6 +199,7 @@ t_trace_ip_address(_Config) -> ?assertEqual([], emqx_trace_handler:running()). filesync(Name, Type) -> + ct:sleep(50), filesync(Name, Type, 3). %% sometime the handler process is not started yet. diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 153800414..7ebe7645b 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -18,8 +18,9 @@ %% API -export([start_link/0, mnesia/1]). --export([multicall/3, multicall/5, query/1, reset/0, status/0, skip_failed_commit/1]). --export([get_node_tnx_id/1]). +-export([multicall/3, multicall/5, query/1, reset/0, status/0, + skip_failed_commit/1, fast_forward_to_commit/2]). +-export([get_node_tnx_id/1, latest_tnx_id/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, handle_continue/2, code_change/3]). @@ -60,21 +61,28 @@ start_link() -> start_link(Node, Name, RetryMs) -> gen_server:start_link({local, Name}, ?MODULE, [Node, RetryMs], []). --spec multicall(Module, Function, Args) -> {ok, TnxId, term()} | {error, Reason} when +%% @doc return {ok, TnxId, MFARes} the first MFA result when all MFA run ok. +%% return {error, MFARes} when the first MFA result is no ok or {ok, term()}. +%% return {retry, TnxId, MFARes, Nodes} when some Nodes failed and some Node ok. +-spec multicall(Module, Function, Args) -> + {ok, TnxId, term()} | {error, Reason} | {retry, TnxId, MFARes, node()} when Module :: module(), Function :: atom(), Args :: [term()], + MFARes :: term(), TnxId :: pos_integer(), Reason :: string(). multicall(M, F, A) -> multicall(M, F, A, all, timer:minutes(2)). --spec multicall(Module, Function, Args, SucceedNum, Timeout) -> {ok, TnxId, term()} |{error, Reason} when +-spec multicall(Module, Function, Args, SucceedNum, Timeout) -> + {ok, TnxId, MFARes} | {error, Reason} | {retry, TnxId, MFARes, node()} when Module :: module(), Function :: atom(), Args :: [term()], SucceedNum :: pos_integer() | all, TnxId :: pos_integer(), + MFARes :: term(), Timeout :: timeout(), Reason :: string(). multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNum >= 1 -> @@ -108,7 +116,10 @@ multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNu end, case OkOrFailed of ok -> InitRes; - _ -> OkOrFailed + {error, Error0} -> {error, Error0}; + {retry, Node0} -> + {ok, TnxId0, MFARes} = InitRes, + {retry, TnxId0, MFARes, Node0} end. -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. @@ -122,6 +133,11 @@ reset() -> gen_server:call(?MODULE, reset). status() -> transaction(fun trans_status/0, []). +-spec latest_tnx_id() -> pos_integer(). +latest_tnx_id() -> + {atomic, TnxId} = transaction(fun get_latest_id/0, []), + TnxId. + -spec get_node_tnx_id(node()) -> integer(). get_node_tnx_id(Node) -> case mnesia:wread({?CLUSTER_COMMIT, Node}) of @@ -136,6 +152,13 @@ get_node_tnx_id(Node) -> skip_failed_commit(Node) -> gen_server:call({?MODULE, Node}, skip_failed_commit). +%% Regardless of what MFA is returned, consider it a success), +%% then skip the specified TnxId. +%% If CurrTnxId >= TnxId, nothing happened. +%% If CurrTnxId < TnxId, the CurrTnxId will skip to TnxId. +-spec fast_forward_to_commit(node(), pos_integer()) -> pos_integer(). +fast_forward_to_commit(Node, ToTnxId) -> + gen_server:call({?MODULE, Node}, {fast_forward_to_commit, ToTnxId}). %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -165,8 +188,13 @@ handle_call({initiate, MFA}, _From, State = #{node := Node}) -> {aborted, Reason} -> {reply, {error, Reason}, State, {continue, ?CATCH_UP}} end; -handle_call(skip_failed_commit, _From, State) -> - {reply, ok, State, catch_up(State, true)}; +handle_call(skip_failed_commit, _From, State = #{node := Node}) -> + Timeout = catch_up(State, true), + {atomic, LatestId} = transaction(fun get_node_tnx_id/1, [Node]), + {reply, LatestId, State, Timeout}; +handle_call({fast_forward_to_commit, ToTnxId}, _From, State) -> + NodeId = do_fast_forward_to_commit(ToTnxId, State), + {reply, NodeId, State, catch_up(State)}; handle_call(_, _From, State) -> {reply, ok, State, catch_up(State)}. @@ -245,7 +273,8 @@ do_catch_up(ToTnxId, Node) -> {false, Error} -> mnesia:abort(Error) end; [#cluster_rpc_commit{tnx_id = LastAppliedId}] -> - Reason = lists:flatten(io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)", + Reason = lists:flatten( + io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)", [Node, LastAppliedId, ToTnxId])), ?SLOG(error, #{ msg => "catch up failed!", @@ -258,6 +287,20 @@ do_catch_up(ToTnxId, Node) -> commit(Node, TnxId) -> ok = mnesia:write(?CLUSTER_COMMIT, #cluster_rpc_commit{node = Node, tnx_id = TnxId}, write). +do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) -> + {atomic, NodeId} = transaction(fun get_node_tnx_id/1, [Node]), + case NodeId >= ToTnxId of + true -> NodeId; + false -> + {atomic, LatestId} = transaction(fun get_latest_id/0, []), + case LatestId =< NodeId of + true -> NodeId; + false -> + catch_up(State, true), + do_fast_forward_to_commit(ToTnxId, State) + end + end. + get_latest_id() -> case mnesia:last(?CLUSTER_MFA) of '$end_of_table' -> 0; @@ -269,7 +312,8 @@ init_mfa(Node, MFA) -> LatestId = get_latest_id(), ok = do_catch_up_in_one_trans(LatestId, Node), TnxId = LatestId + 1, - MFARec = #cluster_rpc_mfa{tnx_id = TnxId, mfa = MFA, initiator = Node, created_at = erlang:localtime()}, + MFARec = #cluster_rpc_mfa{tnx_id = TnxId, mfa = MFA, + initiator = Node, created_at = erlang:localtime()}, ok = mnesia:write(?CLUSTER_MFA, MFARec, write), ok = commit(Node, TnxId), case apply_mfa(TnxId, MFA) of @@ -344,7 +388,7 @@ wait_for_all_nodes_commit(TnxId, Delay, Remain) -> ok = timer:sleep(Delay), wait_for_all_nodes_commit(TnxId, Delay, Remain - Delay); [] -> ok; - Nodes -> {error, Nodes} + Nodes -> {retry, Nodes} end. wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain) -> @@ -356,7 +400,7 @@ wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain) -> false -> case lagging_node(TnxId) of [] -> ok; %% All commit but The succeedNum > length(nodes()). - Nodes -> {error, Nodes} + Nodes -> {retry, Nodes} end end. diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index c3dfa8c49..c82623e72 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -16,6 +16,7 @@ -module(emqx_conf). -compile({no_auto_import, [get/1, get/2]}). +-include_lib("emqx/include/logger.hrl"). -export([add_handler/2, remove_handler/1]). -export([get/1, get/2, get_raw/2, get_all/1]). @@ -123,13 +124,18 @@ reset(Node, KeyPath, Opts) -> rpc:call(Node, ?MODULE, reset, [KeyPath, Opts]). %%-------------------------------------------------------------------- -%% Internal funcs +%% Internal functions %%-------------------------------------------------------------------- multicall(M, F, Args) -> case emqx_cluster_rpc:multicall(M, F, Args) of - {ok, _TnxId, Res} -> + {ok, _TnxId, Res} -> Res; + {retry, TnxId, Res, Nodes} -> + %% The init MFA return ok, but other nodes failed. + %% We return ok and alert an alarm. + ?SLOG(error, #{msg => "failed to update config in cluster", nodes => Nodes, + tnx_id => TnxId, mfa => {M, F, Args}}), Res; - {error, Reason} -> - {error, Reason} + {error, Error} -> %% all MFA return not ok or {ok, term()}. + Error end. diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl new file mode 100644 index 000000000..7fb421e75 --- /dev/null +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -0,0 +1,92 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_conf_cli). +-export([ load/0 + , admins/1 + , unload/0 + ]). + +-define(CMD, cluster_call). + +load() -> + emqx_ctl:register_command(?CMD, {?MODULE, admins}, []). + +unload() -> + emqx_ctl:unregister_command(?CMD). + +admins(["status"]) -> status(); + +admins(["skip"]) -> + status(), + Nodes = mria_mnesia:running_nodes(), + lists:foreach(fun emqx_cluster_rpc:skip_failed_commit/1, Nodes), + status(); + +admins(["skip", Node0]) -> + status(), + Node = list_to_existing_atom(Node0), + emqx_cluster_rpc:skip_failed_commit(Node), + status(); + +admins(["tnxid", TnxId0]) -> + TnxId = list_to_integer(TnxId0), + emqx_ctl:print("~p~n", [emqx_cluster_rpc:query(TnxId)]); + +admins(["fast_forward"]) -> + status(), + Nodes = mria_mnesia:running_nodes(), + TnxId = emqx_cluster_rpc:latest_tnx_id(), + lists:foreach(fun(N) -> emqx_cluster_rpc:fast_forward_to_commit(N, TnxId) end, Nodes), + status(); + +admins(["fast_forward", ToTnxId]) -> + status(), + Nodes = mria_mnesia:running_nodes(), + TnxId = list_to_integer(ToTnxId), + lists:foreach(fun(N) -> emqx_cluster_rpc:fast_forward_to_commit(N, TnxId) end, Nodes), + status(); + +admins(["fast_forward", Node0, ToTnxId]) -> + status(), + TnxId = list_to_integer(ToTnxId), + Node = list_to_existing_atom(Node0), + emqx_cluster_rpc:fast_forward_to_commit(Node, TnxId), + status(); + +admins(_) -> + emqx_ctl:usage( + [ + {"cluster_call status", "status"}, + {"cluster_call skip [node]", "increase one commit on specific node"}, + {"cluster_call tnxid ", "get detailed about TnxId"}, + {"cluster_call fast_forward [node] [tnx_id]", "fast forwards to tnx_id" } + ]). + +status() -> + emqx_ctl:print("-----------------------------------------------\n"), + {atomic, Status} = emqx_cluster_rpc:status(), + lists:foreach(fun(S) -> + #{ + node := Node, + tnx_id := TnxId, + mfa := {M, F, A}, + created_at := CreatedAt + } = S, + emqx_ctl:print("~p:[~w] CreatedAt:~p ~p:~p/~w\n", + [Node, TnxId, CreatedAt, M, F, length(A)]) + end, Status), + emqx_ctl:print("-----------------------------------------------\n"). diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index 993ab3dc5..ad74faf99 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -33,7 +33,8 @@ all() -> [ t_commit_ok_but_apply_fail_on_other_node, t_commit_ok_apply_fail_on_other_node_then_recover, t_del_stale_mfa, - t_skip_failed_commit + t_skip_failed_commit, + t_fast_forward_commit ]. suite() -> [{timetrap, {minutes, 3}}]. groups() -> []. @@ -183,13 +184,37 @@ t_skip_failed_commit(_Config) -> ?assertEqual([{Node, 1}, {{Node, ?NODE2}, 1}, {{Node, ?NODE3}, 1}], tnx_ids(List1)), {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, - {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), - ok = gen_server:call(?NODE2, skip_failed_commit, 5000), + {ok, 2, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), + 2 = gen_server:call(?NODE2, skip_failed_commit, 5000), {atomic, List2} = emqx_cluster_rpc:status(), ?assertEqual([{Node, 2}, {{Node, ?NODE2}, 2}, {{Node, ?NODE3}, 1}], tnx_ids(List2)), ok. +t_fast_forward_commit(_Config) -> + emqx_cluster_rpc:reset(), + {atomic, []} = emqx_cluster_rpc:status(), + {ok, 1, ok} = emqx_cluster_rpc:multicall(io, format, ["test~n"], all, 1000), + ct:sleep(180), + {atomic, List1} = emqx_cluster_rpc:status(), + Node = node(), + ?assertEqual([{Node, 1}, {{Node, ?NODE2}, 1}, {{Node, ?NODE3}, 1}], + tnx_ids(List1)), + {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, + {ok, 2, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), + {ok, 3, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), + {ok, 4, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), + {ok, 5, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), + {retry, 6, ok, _} = emqx_cluster_rpc:multicall(M, F, A, 2, 1000), + 3 = gen_server:call(?NODE2, {fast_forward_to_commit, 3}, 5000), + 4 = gen_server:call(?NODE2, {fast_forward_to_commit, 4}, 5000), + 6 = gen_server:call(?NODE2, {fast_forward_to_commit, 7}, 5000), + 2 = gen_server:call(?NODE3, {fast_forward_to_commit, 2}, 5000), + {atomic, List2} = emqx_cluster_rpc:status(), + ?assertEqual([{Node, 6}, {{Node, ?NODE2}, 6}, {{Node, ?NODE3}, 2}], + tnx_ids(List2)), + ok. + tnx_ids(Status) -> lists:sort(lists:map(fun(#{tnx_id := TnxId, node := Node}) -> {Node, TnxId} end, Status)). diff --git a/apps/emqx_gateway/src/emqx_gateway_conf.erl b/apps/emqx_gateway/src/emqx_gateway_conf.erl index ddbf99189..3612b428d 100644 --- a/apps/emqx_gateway/src/emqx_gateway_conf.erl +++ b/apps/emqx_gateway/src/emqx_gateway_conf.erl @@ -232,7 +232,7 @@ update(Req) -> res(emqx_conf:update([gateway], Req, #{override_to => cluster})). res({ok, _Result}) -> ok; -res({error, {error, {pre_config_update,emqx_gateway_conf,Reason}}}) -> {error, Reason}; +res({error, {pre_config_update, emqx_gateway_conf, Reason}}) -> {error, Reason}; res({error, Reason}) -> {error, Reason}. bin({LType, LName}) -> diff --git a/apps/emqx_modules/src/emqx_modules_app.erl b/apps/emqx_modules/src/emqx_modules_app.erl index 1605c3382..55c882f94 100644 --- a/apps/emqx_modules/src/emqx_modules_app.erl +++ b/apps/emqx_modules/src/emqx_modules_app.erl @@ -36,6 +36,7 @@ maybe_enable_modules() -> emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:enable(), emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:enable(), emqx_event_message:enable(), + emqx_conf_cli:load(), ok = emqx_rewrite:enable(), emqx_topic_metrics:enable(). @@ -45,4 +46,5 @@ maybe_disable_modules() -> emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:disable(), emqx_event_message:disable(), emqx_rewrite:disable(), + emqx_conf_cli:unload(), emqx_topic_metrics:disable(). diff --git a/apps/emqx_modules/test/emqx_rewrite_SUITE.erl b/apps/emqx_modules/test/emqx_rewrite_SUITE.erl index 4d51834c2..713468460 100644 --- a/apps/emqx_modules/test/emqx_rewrite_SUITE.erl +++ b/apps/emqx_modules/test/emqx_rewrite_SUITE.erl @@ -168,12 +168,11 @@ t_update_re_failed(_Config) -> }], Error = {badmatch, {error, - {error, - {emqx_modules_schema, - [{validation_error, - #{array_index => 1,path => "rewrite.re", - reason => {<<"*^test/*">>,{"nothing to repeat",0}}, - value => <<"*^test/*">>}}]}}}}, + {emqx_modules_schema, + [{validation_error, + #{array_index => 1,path => "rewrite.re", + reason => {<<"*^test/*">>,{"nothing to repeat",0}}, + value => <<"*^test/*">>}}]}}}, ?assertError(Error, emqx_rewrite:update(Rules)), ok.