diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 17223234e..7ebe7645b 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -18,8 +18,9 @@ %% API -export([start_link/0, mnesia/1]). --export([multicall/3, multicall/5, query/1, reset/0, status/0, skip_failed_commit/1]). --export([get_node_tnx_id/1]). +-export([multicall/3, multicall/5, query/1, reset/0, status/0, + skip_failed_commit/1, fast_forward_to_commit/2]). +-export([get_node_tnx_id/1, latest_tnx_id/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, handle_continue/2, code_change/3]). @@ -132,6 +133,11 @@ reset() -> gen_server:call(?MODULE, reset). status() -> transaction(fun trans_status/0, []). +-spec latest_tnx_id() -> pos_integer(). +latest_tnx_id() -> + {atomic, TnxId} = transaction(fun get_latest_id/0, []), + TnxId. + -spec get_node_tnx_id(node()) -> integer(). get_node_tnx_id(Node) -> case mnesia:wread({?CLUSTER_COMMIT, Node}) of @@ -267,7 +273,8 @@ do_catch_up(ToTnxId, Node) -> {false, Error} -> mnesia:abort(Error) end; [#cluster_rpc_commit{tnx_id = LastAppliedId}] -> - Reason = lists:flatten(io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)", + Reason = lists:flatten( + io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)", [Node, LastAppliedId, ToTnxId])), ?SLOG(error, #{ msg => "catch up failed!", diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl new file mode 100644 index 000000000..7fb421e75 --- /dev/null +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -0,0 +1,92 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_conf_cli). +-export([ load/0 + , admins/1 + , unload/0 + ]). + +-define(CMD, cluster_call). + +load() -> + emqx_ctl:register_command(?CMD, {?MODULE, admins}, []). + +unload() -> + emqx_ctl:unregister_command(?CMD). + +admins(["status"]) -> status(); + +admins(["skip"]) -> + status(), + Nodes = mria_mnesia:running_nodes(), + lists:foreach(fun emqx_cluster_rpc:skip_failed_commit/1, Nodes), + status(); + +admins(["skip", Node0]) -> + status(), + Node = list_to_existing_atom(Node0), + emqx_cluster_rpc:skip_failed_commit(Node), + status(); + +admins(["tnxid", TnxId0]) -> + TnxId = list_to_integer(TnxId0), + emqx_ctl:print("~p~n", [emqx_cluster_rpc:query(TnxId)]); + +admins(["fast_forward"]) -> + status(), + Nodes = mria_mnesia:running_nodes(), + TnxId = emqx_cluster_rpc:latest_tnx_id(), + lists:foreach(fun(N) -> emqx_cluster_rpc:fast_forward_to_commit(N, TnxId) end, Nodes), + status(); + +admins(["fast_forward", ToTnxId]) -> + status(), + Nodes = mria_mnesia:running_nodes(), + TnxId = list_to_integer(ToTnxId), + lists:foreach(fun(N) -> emqx_cluster_rpc:fast_forward_to_commit(N, TnxId) end, Nodes), + status(); + +admins(["fast_forward", Node0, ToTnxId]) -> + status(), + TnxId = list_to_integer(ToTnxId), + Node = list_to_existing_atom(Node0), + emqx_cluster_rpc:fast_forward_to_commit(Node, TnxId), + status(); + +admins(_) -> + emqx_ctl:usage( + [ + {"cluster_call status", "status"}, + {"cluster_call skip [node]", "increase one commit on specific node"}, + {"cluster_call tnxid ", "get detailed about TnxId"}, + {"cluster_call fast_forward [node] [tnx_id]", "fast forwards to tnx_id" } + ]). + +status() -> + emqx_ctl:print("-----------------------------------------------\n"), + {atomic, Status} = emqx_cluster_rpc:status(), + lists:foreach(fun(S) -> + #{ + node := Node, + tnx_id := TnxId, + mfa := {M, F, A}, + created_at := CreatedAt + } = S, + emqx_ctl:print("~p:[~w] CreatedAt:~p ~p:~p/~w\n", + [Node, TnxId, CreatedAt, M, F, length(A)]) + end, Status), + emqx_ctl:print("-----------------------------------------------\n"). diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index 1710b90bd..ad74faf99 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -205,12 +205,13 @@ t_fast_forward_commit(_Config) -> {ok, 3, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), {ok, 4, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), {ok, 5, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), + {retry, 6, ok, _} = emqx_cluster_rpc:multicall(M, F, A, 2, 1000), 3 = gen_server:call(?NODE2, {fast_forward_to_commit, 3}, 5000), 4 = gen_server:call(?NODE2, {fast_forward_to_commit, 4}, 5000), - 5 = gen_server:call(?NODE2, {fast_forward_to_commit, 6}, 5000), + 6 = gen_server:call(?NODE2, {fast_forward_to_commit, 7}, 5000), 2 = gen_server:call(?NODE3, {fast_forward_to_commit, 2}, 5000), {atomic, List2} = emqx_cluster_rpc:status(), - ?assertEqual([{Node, 5}, {{Node, ?NODE2}, 5}, {{Node, ?NODE3}, 2}], + ?assertEqual([{Node, 6}, {{Node, ?NODE2}, 6}, {{Node, ?NODE3}, 2}], tnx_ids(List2)), ok. diff --git a/apps/emqx_modules/src/emqx_modules_app.erl b/apps/emqx_modules/src/emqx_modules_app.erl index 1605c3382..55c882f94 100644 --- a/apps/emqx_modules/src/emqx_modules_app.erl +++ b/apps/emqx_modules/src/emqx_modules_app.erl @@ -36,6 +36,7 @@ maybe_enable_modules() -> emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:enable(), emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:enable(), emqx_event_message:enable(), + emqx_conf_cli:load(), ok = emqx_rewrite:enable(), emqx_topic_metrics:enable(). @@ -45,4 +46,5 @@ maybe_disable_modules() -> emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:disable(), emqx_event_message:disable(), emqx_rewrite:disable(), + emqx_conf_cli:unload(), emqx_topic_metrics:disable().