fix(conf): emqx_conf return nest error

This commit is contained in:
zhongwencool 2021-12-01 16:08:49 +08:00
parent 25c4f4aa4e
commit c1a7d7bede
4 changed files with 83 additions and 16 deletions

View File

@ -60,21 +60,28 @@ start_link() ->
start_link(Node, Name, RetryMs) -> start_link(Node, Name, RetryMs) ->
gen_server:start_link({local, Name}, ?MODULE, [Node, RetryMs], []). gen_server:start_link({local, Name}, ?MODULE, [Node, RetryMs], []).
-spec multicall(Module, Function, Args) -> {ok, TnxId, term()} | {error, Reason} when %% @doc return {ok, TnxId, MFARes} the first MFA result when all MFA run ok.
%% return {error, MFARes} when the first MFA result is no ok or {ok, term()}.
%% return {retry, TnxId, MFARes, Nodes} when some Nodes failed and some Node ok.
-spec multicall(Module, Function, Args) ->
{ok, TnxId, term()} | {error, Reason} | {retry, TnxId, MFARes, node()} when
Module :: module(), Module :: module(),
Function :: atom(), Function :: atom(),
Args :: [term()], Args :: [term()],
MFARes :: term(),
TnxId :: pos_integer(), TnxId :: pos_integer(),
Reason :: string(). Reason :: string().
multicall(M, F, A) -> multicall(M, F, A) ->
multicall(M, F, A, all, timer:minutes(2)). multicall(M, F, A, all, timer:minutes(2)).
-spec multicall(Module, Function, Args, SucceedNum, Timeout) -> {ok, TnxId, term()} |{error, Reason} when -spec multicall(Module, Function, Args, SucceedNum, Timeout) ->
{ok, TnxId, MFARes} | {error, Reason} | {retry, TnxId, MFARes, node()} when
Module :: module(), Module :: module(),
Function :: atom(), Function :: atom(),
Args :: [term()], Args :: [term()],
SucceedNum :: pos_integer() | all, SucceedNum :: pos_integer() | all,
TnxId :: pos_integer(), TnxId :: pos_integer(),
MFARes :: term(),
Timeout :: timeout(), Timeout :: timeout(),
Reason :: string(). Reason :: string().
multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNum >= 1 -> multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNum >= 1 ->
@ -108,7 +115,10 @@ multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNu
end, end,
case OkOrFailed of case OkOrFailed of
ok -> InitRes; ok -> InitRes;
_ -> OkOrFailed {error, Error0} -> {error, Error0};
{retry, Node0} ->
{ok, TnxId0, MFARes} = InitRes,
{retry, TnxId0, MFARes, Node0}
end. end.
-spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}.
@ -136,6 +146,13 @@ get_node_tnx_id(Node) ->
skip_failed_commit(Node) -> skip_failed_commit(Node) ->
gen_server:call({?MODULE, Node}, skip_failed_commit). gen_server:call({?MODULE, Node}, skip_failed_commit).
%% Regardless of what MFA is returned, consider it a success),
%% then skip the specified TnxId.
%% If CurrTnxId >= TnxId, nothing happened.
%% If CurrTnxId < TnxId, the CurrTnxId will skip to TnxId.
-spec fast_forward_to_commit(node(), pos_integer()) -> pos_integer().
fast_forward_to_commit(Node, ToTnxId) ->
gen_server:call({?MODULE, Node}, {fast_forward_to_commit, ToTnxId}).
%%%=================================================================== %%%===================================================================
%%% gen_server callbacks %%% gen_server callbacks
%%%=================================================================== %%%===================================================================
@ -165,8 +182,13 @@ handle_call({initiate, MFA}, _From, State = #{node := Node}) ->
{aborted, Reason} -> {aborted, Reason} ->
{reply, {error, Reason}, State, {continue, ?CATCH_UP}} {reply, {error, Reason}, State, {continue, ?CATCH_UP}}
end; end;
handle_call(skip_failed_commit, _From, State) -> handle_call(skip_failed_commit, _From, State = #{node := Node}) ->
{reply, ok, State, catch_up(State, true)}; Timeout = catch_up(State, true),
{atomic, LatestId} = transaction(fun get_node_tnx_id/1, [Node]),
{reply, LatestId, State, Timeout};
handle_call({fast_forward_to_commit, ToTnxId}, _From, State) ->
NodeId = do_fast_forward_to_commit(ToTnxId, State),
{reply, NodeId, State, catch_up(State)};
handle_call(_, _From, State) -> handle_call(_, _From, State) ->
{reply, ok, State, catch_up(State)}. {reply, ok, State, catch_up(State)}.
@ -258,6 +280,20 @@ do_catch_up(ToTnxId, Node) ->
commit(Node, TnxId) -> commit(Node, TnxId) ->
ok = mnesia:write(?CLUSTER_COMMIT, #cluster_rpc_commit{node = Node, tnx_id = TnxId}, write). ok = mnesia:write(?CLUSTER_COMMIT, #cluster_rpc_commit{node = Node, tnx_id = TnxId}, write).
do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) ->
{atomic, NodeId} = transaction(fun get_node_tnx_id/1, [Node]),
case NodeId >= ToTnxId of
true -> NodeId;
false ->
{atomic, LatestId} = transaction(fun get_latest_id/0, []),
case LatestId =< NodeId of
true -> NodeId;
false ->
catch_up(State, true),
do_fast_forward_to_commit(ToTnxId, State)
end
end.
get_latest_id() -> get_latest_id() ->
case mnesia:last(?CLUSTER_MFA) of case mnesia:last(?CLUSTER_MFA) of
'$end_of_table' -> 0; '$end_of_table' -> 0;
@ -269,7 +305,8 @@ init_mfa(Node, MFA) ->
LatestId = get_latest_id(), LatestId = get_latest_id(),
ok = do_catch_up_in_one_trans(LatestId, Node), ok = do_catch_up_in_one_trans(LatestId, Node),
TnxId = LatestId + 1, TnxId = LatestId + 1,
MFARec = #cluster_rpc_mfa{tnx_id = TnxId, mfa = MFA, initiator = Node, created_at = erlang:localtime()}, MFARec = #cluster_rpc_mfa{tnx_id = TnxId, mfa = MFA,
initiator = Node, created_at = erlang:localtime()},
ok = mnesia:write(?CLUSTER_MFA, MFARec, write), ok = mnesia:write(?CLUSTER_MFA, MFARec, write),
ok = commit(Node, TnxId), ok = commit(Node, TnxId),
case apply_mfa(TnxId, MFA) of case apply_mfa(TnxId, MFA) of
@ -344,7 +381,7 @@ wait_for_all_nodes_commit(TnxId, Delay, Remain) ->
ok = timer:sleep(Delay), ok = timer:sleep(Delay),
wait_for_all_nodes_commit(TnxId, Delay, Remain - Delay); wait_for_all_nodes_commit(TnxId, Delay, Remain - Delay);
[] -> ok; [] -> ok;
Nodes -> {error, Nodes} Nodes -> {retry, Nodes}
end. end.
wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain) -> wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain) ->
@ -356,7 +393,7 @@ wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain) ->
false -> false ->
case lagging_node(TnxId) of case lagging_node(TnxId) of
[] -> ok; %% All commit but The succeedNum > length(nodes()). [] -> ok; %% All commit but The succeedNum > length(nodes()).
Nodes -> {error, Nodes} Nodes -> {retry, Nodes}
end end
end. end.

View File

@ -16,6 +16,7 @@
-module(emqx_conf). -module(emqx_conf).
-compile({no_auto_import, [get/1, get/2]}). -compile({no_auto_import, [get/1, get/2]}).
-include_lib("emqx/include/logger.hrl").
-export([add_handler/2, remove_handler/1]). -export([add_handler/2, remove_handler/1]).
-export([get/1, get/2, get_raw/2, get_all/1]). -export([get/1, get/2, get_raw/2, get_all/1]).
@ -123,13 +124,18 @@ reset(Node, KeyPath, Opts) ->
rpc:call(Node, ?MODULE, reset, [KeyPath, Opts]). rpc:call(Node, ?MODULE, reset, [KeyPath, Opts]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal funcs %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
multicall(M, F, Args) -> multicall(M, F, Args) ->
case emqx_cluster_rpc:multicall(M, F, Args) of case emqx_cluster_rpc:multicall(M, F, Args) of
{ok, _TnxId, Res} -> {ok, _TnxId, Res} -> Res;
{retry, TnxId, Res, Nodes} ->
%% The init MFA return ok, but other nodes failed.
%% We return ok and alert an alarm.
?SLOG(error, #{msg => "failed to update config in cluster", nodes => Nodes,
tnx_id => TnxId, mfa => {M, F, Args}}),
Res; Res;
{error, Reason} -> {error, Error} -> %% all MFA return not ok or {ok, term()}.
{error, Reason} Error
end. end.

View File

@ -33,7 +33,8 @@ all() -> [
t_commit_ok_but_apply_fail_on_other_node, t_commit_ok_but_apply_fail_on_other_node,
t_commit_ok_apply_fail_on_other_node_then_recover, t_commit_ok_apply_fail_on_other_node_then_recover,
t_del_stale_mfa, t_del_stale_mfa,
t_skip_failed_commit t_skip_failed_commit,
t_fast_forward_commit
]. ].
suite() -> [{timetrap, {minutes, 3}}]. suite() -> [{timetrap, {minutes, 3}}].
groups() -> []. groups() -> [].
@ -183,13 +184,36 @@ t_skip_failed_commit(_Config) ->
?assertEqual([{Node, 1}, {{Node, ?NODE2}, 1}, {{Node, ?NODE3}, 1}], ?assertEqual([{Node, 1}, {{Node, ?NODE2}, 1}, {{Node, ?NODE3}, 1}],
tnx_ids(List1)), tnx_ids(List1)),
{M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
{ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), {ok, 2, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
ok = gen_server:call(?NODE2, skip_failed_commit, 5000), 2 = gen_server:call(?NODE2, skip_failed_commit, 5000),
{atomic, List2} = emqx_cluster_rpc:status(), {atomic, List2} = emqx_cluster_rpc:status(),
?assertEqual([{Node, 2}, {{Node, ?NODE2}, 2}, {{Node, ?NODE3}, 1}], ?assertEqual([{Node, 2}, {{Node, ?NODE2}, 2}, {{Node, ?NODE3}, 1}],
tnx_ids(List2)), tnx_ids(List2)),
ok. ok.
t_fast_forward_commit(_Config) ->
emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(),
{ok, 1, ok} = emqx_cluster_rpc:multicall(io, format, ["test~n"], all, 1000),
ct:sleep(180),
{atomic, List1} = emqx_cluster_rpc:status(),
Node = node(),
?assertEqual([{Node, 1}, {{Node, ?NODE2}, 1}, {{Node, ?NODE3}, 1}],
tnx_ids(List1)),
{M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
{ok, 2, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, 3, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, 4, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, 5, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
3 = gen_server:call(?NODE2, {fast_forward_to_commit, 3}, 5000),
4 = gen_server:call(?NODE2, {fast_forward_to_commit, 4}, 5000),
5 = gen_server:call(?NODE2, {fast_forward_to_commit, 6}, 5000),
2 = gen_server:call(?NODE3, {fast_forward_to_commit, 2}, 5000),
{atomic, List2} = emqx_cluster_rpc:status(),
?assertEqual([{Node, 5}, {{Node, ?NODE2}, 5}, {{Node, ?NODE3}, 2}],
tnx_ids(List2)),
ok.
tnx_ids(Status) -> tnx_ids(Status) ->
lists:sort(lists:map(fun(#{tnx_id := TnxId, node := Node}) -> lists:sort(lists:map(fun(#{tnx_id := TnxId, node := Node}) ->
{Node, TnxId} end, Status)). {Node, TnxId} end, Status)).

View File

@ -232,7 +232,7 @@ update(Req) ->
res(emqx_conf:update([gateway], Req, #{override_to => cluster})). res(emqx_conf:update([gateway], Req, #{override_to => cluster})).
res({ok, _Result}) -> ok; res({ok, _Result}) -> ok;
res({error, {error, {pre_config_update,emqx_gateway_conf,Reason}}}) -> {error, Reason}; res({error, {pre_config_update, emqx_gateway_conf, Reason}}) -> {error, Reason};
res({error, Reason}) -> {error, Reason}. res({error, Reason}) -> {error, Reason}.
bin({LType, LName}) -> bin({LType, LName}) ->