refactor(emqx_conf): Extract transactions

This commit is contained in:
ieQu1 2022-08-22 13:41:09 +02:00
parent 6f4d0e2ed5
commit f323e3cb79
3 changed files with 27 additions and 16 deletions

View File

@ -31,10 +31,16 @@
fast_forward_to_commit/2 fast_forward_to_commit/2
]). ]).
-export([ -export([
get_node_tnx_id/1, commit/2,
commit_status_trans/2,
get_cluster_tnx_id/0, get_cluster_tnx_id/0,
get_node_tnx_id/1,
init_mfa/2,
latest_tnx_id/0, latest_tnx_id/0,
make_initiate_call_req/3 make_initiate_call_req/3,
read_next_mfa/1,
trans_query/1,
trans_status/0
]). ]).
-export([ -export([
@ -194,18 +200,18 @@ do_multicall(M, F, A, RequiredSyncs, Timeout) ->
-spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}.
query(TnxId) -> query(TnxId) ->
transaction(fun trans_query/1, [TnxId]). transaction(fun ?MODULE:trans_query/1, [TnxId]).
-spec reset() -> reset. -spec reset() -> reset.
reset() -> gen_server:call(?MODULE, reset). reset() -> gen_server:call(?MODULE, reset).
-spec status() -> {'atomic', [map()]} | {'aborted', Reason :: term()}. -spec status() -> {'atomic', [map()]} | {'aborted', Reason :: term()}.
status() -> status() ->
transaction(fun trans_status/0, []). transaction(fun ?MODULE:trans_status/0, []).
-spec latest_tnx_id() -> pos_integer(). -spec latest_tnx_id() -> pos_integer().
latest_tnx_id() -> latest_tnx_id() ->
{atomic, TnxId} = transaction(fun get_cluster_tnx_id/0, []), {atomic, TnxId} = transaction(fun ?MODULE:get_cluster_tnx_id/0, []),
TnxId. TnxId.
-spec make_initiate_call_req(module(), atom(), list()) -> init_call_req(). -spec make_initiate_call_req(module(), atom(), list()) -> init_call_req().
@ -280,7 +286,7 @@ handle_call(reset, _From, State) ->
_ = mria:clear_table(?CLUSTER_MFA), _ = mria:clear_table(?CLUSTER_MFA),
{reply, ok, State, {continue, ?CATCH_UP}}; {reply, ok, State, {continue, ?CATCH_UP}};
handle_call(?INITIATE(MFA), _From, State = #{node := Node}) -> handle_call(?INITIATE(MFA), _From, State = #{node := Node}) ->
case transaction(fun init_mfa/2, [Node, MFA]) of case transaction(fun ?MODULE:init_mfa/2, [Node, MFA]) of
{atomic, {ok, TnxId, Result}} -> {atomic, {ok, TnxId, Result}} ->
{reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}}; {reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}};
{aborted, Error} -> {aborted, Error} ->
@ -288,7 +294,7 @@ handle_call(?INITIATE(MFA), _From, State = #{node := Node}) ->
end; end;
handle_call(skip_failed_commit, _From, State = #{node := Node}) -> handle_call(skip_failed_commit, _From, State = #{node := Node}) ->
Timeout = catch_up(State, true), Timeout = catch_up(State, true),
{atomic, LatestId} = transaction(fun get_node_tnx_id/1, [Node]), {atomic, LatestId} = transaction(fun ?MODULE:get_node_tnx_id/1, [Node]),
{reply, LatestId, State, Timeout}; {reply, LatestId, State, Timeout};
handle_call({fast_forward_to_commit, ToTnxId}, _From, State) -> handle_call({fast_forward_to_commit, ToTnxId}, _From, State) ->
NodeId = do_fast_forward_to_commit(ToTnxId, State), NodeId = do_fast_forward_to_commit(ToTnxId, State),
@ -316,14 +322,14 @@ code_change(_OldVsn, State, _Extra) ->
catch_up(State) -> catch_up(State, false). catch_up(State) -> catch_up(State, false).
catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) -> catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) ->
case transaction(fun read_next_mfa/1, [Node]) of case transaction(fun ?MODULE:read_next_mfa/1, [Node]) of
{atomic, caught_up} -> {atomic, caught_up} ->
?TIMEOUT; ?TIMEOUT;
{atomic, {still_lagging, NextId, MFA}} -> {atomic, {still_lagging, NextId, MFA}} ->
{Succeed, _} = apply_mfa(NextId, MFA, ?APPLY_KIND_REPLICATE), {Succeed, _} = apply_mfa(NextId, MFA, ?APPLY_KIND_REPLICATE),
case Succeed orelse SkipResult of case Succeed orelse SkipResult of
true -> true ->
case transaction(fun commit/2, [Node, NextId]) of case transaction(fun ?MODULE:commit/2, [Node, NextId]) of
{atomic, ok} -> {atomic, ok} ->
catch_up(State, false); catch_up(State, false);
Error -> Error ->
@ -367,12 +373,12 @@ commit(Node, TnxId) ->
ok = mnesia:write(?CLUSTER_COMMIT, #cluster_rpc_commit{node = Node, tnx_id = TnxId}, write). ok = mnesia:write(?CLUSTER_COMMIT, #cluster_rpc_commit{node = Node, tnx_id = TnxId}, write).
do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) -> do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) ->
{atomic, NodeId} = transaction(fun get_node_tnx_id/1, [Node]), {atomic, NodeId} = transaction(fun ?MODULE:get_node_tnx_id/1, [Node]),
case NodeId >= ToTnxId of case NodeId >= ToTnxId of
true -> true ->
NodeId; NodeId;
false -> false ->
{atomic, LatestId} = transaction(fun get_cluster_tnx_id/0, []), {atomic, LatestId} = transaction(fun ?MODULE:get_cluster_tnx_id/0, []),
case LatestId =< NodeId of case LatestId =< NodeId of
true -> true ->
NodeId; NodeId;
@ -529,11 +535,11 @@ wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) ->
end. end.
lagging_node(TnxId) -> lagging_node(TnxId) ->
{atomic, Nodes} = transaction(fun commit_status_trans/2, ['<', TnxId]), {atomic, Nodes} = transaction(fun ?MODULE:commit_status_trans/2, ['<', TnxId]),
Nodes. Nodes.
synced_nodes(TnxId) -> synced_nodes(TnxId) ->
{atomic, Nodes} = transaction(fun commit_status_trans/2, ['>=', TnxId]), {atomic, Nodes} = transaction(fun ?MODULE:commit_status_trans/2, ['>=', TnxId]),
Nodes. Nodes.
commit_status_trans(Operator, TnxId) -> commit_status_trans(Operator, TnxId) ->
@ -547,5 +553,5 @@ get_retry_ms() ->
maybe_init_tnx_id(_Node, TnxId) when TnxId < 0 -> ok; maybe_init_tnx_id(_Node, TnxId) when TnxId < 0 -> ok;
maybe_init_tnx_id(Node, TnxId) -> maybe_init_tnx_id(Node, TnxId) ->
{atomic, _} = transaction(fun commit/2, [Node, TnxId]), {atomic, _} = transaction(fun ?MODULE:commit/2, [Node, TnxId]),
ok. ok.

View File

@ -30,6 +30,11 @@
code_change/3 code_change/3
]). ]).
%% Internal exports (RPC)
-export([
del_stale_mfa/1
]).
start_link() -> start_link() ->
MaxHistory = emqx_conf:get(["node", "cluster_call", "max_history"], 100), MaxHistory = emqx_conf:get(["node", "cluster_call", "max_history"], 100),
CleanupMs = emqx_conf:get(["node", "cluster_call", "cleanup_interval"], 5 * 60 * 1000), CleanupMs = emqx_conf:get(["node", "cluster_call", "cleanup_interval"], 5 * 60 * 1000),
@ -56,7 +61,7 @@ handle_cast(Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history := MaxHistory}) -> handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history := MaxHistory}) ->
case mria:transaction(?CLUSTER_RPC_SHARD, fun del_stale_mfa/1, [MaxHistory]) of case mria:transaction(?CLUSTER_RPC_SHARD, fun ?MODULE:del_stale_mfa/1, [MaxHistory]) of
{atomic, ok} -> ok; {atomic, ok} -> ok;
Error -> ?SLOG(error, #{msg => "del_stale_cluster_rpc_mfa_error", error => Error}) Error -> ?SLOG(error, #{msg => "del_stale_cluster_rpc_mfa_error", error => Error})
end, end,

View File

@ -1,6 +1,6 @@
{application, emqx_conf, [ {application, emqx_conf, [
{description, "EMQX configuration management"}, {description, "EMQX configuration management"},
{vsn, "0.1.2"}, {vsn, "0.1.3"},
{registered, []}, {registered, []},
{mod, {emqx_conf_app, []}}, {mod, {emqx_conf_app, []}},
{applications, [kernel, stdlib]}, {applications, [kernel, stdlib]},