diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index d6a5e6044..2478a2540 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -32,6 +32,7 @@ ]). -export([ get_node_tnx_id/1, + get_cluster_tnx_id/0, latest_tnx_id/0, make_initiate_call_req/3 ]). @@ -60,9 +61,11 @@ -include_lib("emqx/include/logger.hrl"). -include("emqx_conf.hrl"). --define(INITIATE(MFA, LatestIdLastSeen), {initiate, MFA, LatestIdLastSeen}). +-define(INITIATE(MFA), {initiate, MFA}). -define(CATCH_UP, catch_up). -define(TIMEOUT, timer:minutes(1)). +-define(APPLY_KIND_REPLICATE, replicate). +-define(APPLY_KIND_INITIATE, initiate). -type tnx_id() :: pos_integer(). @@ -74,7 +77,7 @@ | {peers_lagging, tnx_id(), Result, [node()]}. -type multicall_return() :: multicall_return(_). --type init_call_req() :: ?INITIATE({module(), atom(), list()}, tnx_id()). +-type init_call_req() :: ?INITIATE({module(), atom(), list()}). %%%=================================================================== %%% API @@ -146,25 +149,11 @@ multicall(M, F, A, RequiredSyncs, Timeout) when RequiredSyncs =:= all orelse Req %% return {init_failure, Error} when the initial MFA result is no ok or {ok, term()}. %% return {peers_lagging, TnxId, MFARes, Nodes} when some Nodes failed and some Node ok. -spec do_multicall(module(), atom(), list(), succeed_num(), timeout()) -> multicall_return(). -do_multicall(M, F, A, RequiredSyncs, Timeout) when - RequiredSyncs =:= all orelse RequiredSyncs >= 1 --> - %% Idealy 'LatestId' should be provided by the multicall originator, - %% which is the viewer of the state e.g. - %% * Sysadmin who issues CLI-commands or REST-API calls to make config changes - %% * Dashboard viewer who is making decision based on what they can see from the UI - %% To reach the ideal state, it would require adding transaction ID to each and - %% every view/GET requests and also provide the ID as a part of the view/GET responses. - %% - %% To keep things simple, we try to get the 'old' view when a multicall request - %% is received as early as possible. - %% - %% Reason to do this: - %% The 'initiate' call handler tries to take a table lock (cluster-wide) before - %% bumping the transaction ID. While waiting for the lock, the ID might have been - %% bumpped by another node in the cluster. - InitReq = make_initiate_call_req(M, F, A), +do_multicall(M, F, A, RequiredSyncs, Timeout) -> + %% assert + true = (RequiredSyncs =:= all orelse RequiredSyncs >= 1), Begin = erlang:monotonic_time(), + InitReq = make_initiate_call_req(M, F, A), InitRes = case mria_rlog:role() of core -> @@ -216,13 +205,12 @@ status() -> -spec latest_tnx_id() -> pos_integer(). latest_tnx_id() -> - {atomic, TnxId} = transaction(fun get_latest_id/0, []), + {atomic, TnxId} = transaction(fun get_cluster_tnx_id/0, []), TnxId. -spec make_initiate_call_req(module(), atom(), list()) -> init_call_req(). make_initiate_call_req(M, F, A) -> - TnxId = get_latest_id(dirty), - ?INITIATE({M, F, A}, TnxId). + ?INITIATE({M, F, A}). -spec get_node_tnx_id(node()) -> integer(). get_node_tnx_id(Node) -> @@ -289,8 +277,8 @@ handle_call(reset, _From, State) -> _ = mria:clear_table(?CLUSTER_COMMIT), _ = mria:clear_table(?CLUSTER_MFA), {reply, ok, State, {continue, ?CATCH_UP}}; -handle_call(?INITIATE(MFA, LatestId), _From, State = #{node := Node}) -> - case transaction(fun init_mfa/3, [Node, MFA, LatestId]) of +handle_call(?INITIATE(MFA), _From, State = #{node := Node}) -> + case transaction(fun init_mfa/2, [Node, MFA]) of {atomic, {ok, TnxId, Result}} -> {reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}}; {aborted, Error} -> @@ -330,7 +318,7 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) -> {atomic, caught_up} -> ?TIMEOUT; {atomic, {still_lagging, NextId, MFA}} -> - {Succeed, _} = apply_mfa(NextId, MFA, catch_up), + {Succeed, _} = apply_mfa(NextId, MFA, ?APPLY_KIND_REPLICATE), case Succeed orelse SkipResult of true -> case transaction(fun commit/2, [Node, NextId]) of @@ -356,7 +344,7 @@ read_next_mfa(Node) -> NextId = case mnesia:wread({?CLUSTER_COMMIT, Node}) of [] -> - LatestId = get_latest_id(), + LatestId = get_cluster_tnx_id(), TnxId = max(LatestId - 1, 0), commit(Node, TnxId), ?SLOG(notice, #{ @@ -382,7 +370,7 @@ do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) -> true -> NodeId; false -> - {atomic, LatestId} = transaction(fun get_latest_id/0, []), + {atomic, LatestId} = transaction(fun get_cluster_tnx_id/0, []), case LatestId =< NodeId of true -> NodeId; @@ -392,24 +380,17 @@ do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) -> end end. -get_latest_id() -> - get_latest_id(tnx). - -get_latest_id(IsolationLevel) -> - F = - case IsolationLevel of - tnx -> fun mnesia:last/1; - dirty -> fun mnesia:dirty_last/1 - end, - case F(?CLUSTER_MFA) of +get_cluster_tnx_id() -> + case mnesia:last(?CLUSTER_MFA) of '$end_of_table' -> 0; Id -> Id end. -init_mfa(Node, MFA, LatestIdLastSeen) -> +init_mfa(Node, MFA) -> mnesia:write_lock_table(?CLUSTER_MFA), - LatestId = get_latest_id(), - case LatestIdLastSeen =:= LatestId of + LatestId = get_cluster_tnx_id(), + MyTnxId = get_node_tnx_id(node()), + case MyTnxId =:= LatestId of true -> TnxId = LatestId + 1, MFARec = #cluster_rpc_mfa{ @@ -420,17 +401,21 @@ init_mfa(Node, MFA, LatestIdLastSeen) -> }, ok = mnesia:write(?CLUSTER_MFA, MFARec, write), ok = commit(Node, TnxId), - case apply_mfa(TnxId, MFA, init) of + case apply_mfa(TnxId, MFA, ?APPLY_KIND_INITIATE) of {true, Result} -> {ok, TnxId, Result}; {false, Error} -> mnesia:abort(Error) end; false -> - ?SLOG(error, #{ + %% refuse to initiate cluster call from this node + %% because it's likely that the caller is based on + %% a stale view. + Reason = #{ msg => stale_view_of_cluster_state, - tnx_id => LatestId, - last_seen_tnx_id => LatestIdLastSeen - }), - mnesia:abort({error, stale_view_of_cluster_state}) + cluster_tnx_id => LatestId, + node_tnx_id => MyTnxId + }, + ?SLOG(warning, Reason), + mnesia:abort({error, Reason}) end. transaction(Func, Args) -> @@ -495,6 +480,15 @@ is_success(ok) -> true; is_success({ok, _}) -> true; is_success(_) -> false. +log_and_alarm(IsSuccess, Res, #{kind := ?APPLY_KIND_INITIATE} = Meta) -> + %% no alarm or error log in case of failure at originating a new cluster-call + %% because nothing is committed + case IsSuccess of + true -> + ?SLOG(debug, Meta#{msg => "cluster_rpc_apply_result", result => Res}); + false -> + ?SLOG(warning, Meta#{msg => "cluster_rpc_apply_result", result => Res}) + end; log_and_alarm(true, Res, Meta) -> ?SLOG(debug, Meta#{msg => "cluster_rpc_apply_ok", result => Res}), do_alarm(deactivate, Res, Meta);