diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 6353a4efa..ddc4eccc5 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -31,10 +31,16 @@ fast_forward_to_commit/2 ]). -export([ - get_node_tnx_id/1, + commit/2, + commit_status_trans/2, get_cluster_tnx_id/0, + get_node_tnx_id/1, + init_mfa/2, latest_tnx_id/0, - make_initiate_call_req/3 + make_initiate_call_req/3, + read_next_mfa/1, + trans_query/1, + trans_status/0 ]). -export([ @@ -194,18 +200,18 @@ do_multicall(M, F, A, RequiredSyncs, Timeout) -> -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. query(TnxId) -> - transaction(fun trans_query/1, [TnxId]). + transaction(fun ?MODULE:trans_query/1, [TnxId]). -spec reset() -> reset. reset() -> gen_server:call(?MODULE, reset). -spec status() -> {'atomic', [map()]} | {'aborted', Reason :: term()}. status() -> - transaction(fun trans_status/0, []). + transaction(fun ?MODULE:trans_status/0, []). -spec latest_tnx_id() -> pos_integer(). latest_tnx_id() -> - {atomic, TnxId} = transaction(fun get_cluster_tnx_id/0, []), + {atomic, TnxId} = transaction(fun ?MODULE:get_cluster_tnx_id/0, []), TnxId. -spec make_initiate_call_req(module(), atom(), list()) -> init_call_req(). @@ -280,7 +286,7 @@ handle_call(reset, _From, State) -> _ = mria:clear_table(?CLUSTER_MFA), {reply, ok, State, {continue, ?CATCH_UP}}; handle_call(?INITIATE(MFA), _From, State = #{node := Node}) -> - case transaction(fun init_mfa/2, [Node, MFA]) of + case transaction(fun ?MODULE:init_mfa/2, [Node, MFA]) of {atomic, {ok, TnxId, Result}} -> {reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}}; {aborted, Error} -> @@ -288,7 +294,7 @@ handle_call(?INITIATE(MFA), _From, State = #{node := Node}) -> end; handle_call(skip_failed_commit, _From, State = #{node := Node}) -> Timeout = catch_up(State, true), - {atomic, LatestId} = transaction(fun get_node_tnx_id/1, [Node]), + {atomic, LatestId} = transaction(fun ?MODULE:get_node_tnx_id/1, [Node]), {reply, LatestId, State, Timeout}; handle_call({fast_forward_to_commit, ToTnxId}, _From, State) -> NodeId = do_fast_forward_to_commit(ToTnxId, State), @@ -316,14 +322,14 @@ code_change(_OldVsn, State, _Extra) -> catch_up(State) -> catch_up(State, false). catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) -> - case transaction(fun read_next_mfa/1, [Node]) of + case transaction(fun ?MODULE:read_next_mfa/1, [Node]) of {atomic, caught_up} -> ?TIMEOUT; {atomic, {still_lagging, NextId, MFA}} -> {Succeed, _} = apply_mfa(NextId, MFA, ?APPLY_KIND_REPLICATE), case Succeed orelse SkipResult of true -> - case transaction(fun commit/2, [Node, NextId]) of + case transaction(fun ?MODULE:commit/2, [Node, NextId]) of {atomic, ok} -> catch_up(State, false); Error -> @@ -367,12 +373,12 @@ commit(Node, TnxId) -> ok = mnesia:write(?CLUSTER_COMMIT, #cluster_rpc_commit{node = Node, tnx_id = TnxId}, write). do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) -> - {atomic, NodeId} = transaction(fun get_node_tnx_id/1, [Node]), + {atomic, NodeId} = transaction(fun ?MODULE:get_node_tnx_id/1, [Node]), case NodeId >= ToTnxId of true -> NodeId; false -> - {atomic, LatestId} = transaction(fun get_cluster_tnx_id/0, []), + {atomic, LatestId} = transaction(fun ?MODULE:get_cluster_tnx_id/0, []), case LatestId =< NodeId of true -> NodeId; @@ -529,11 +535,11 @@ wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) -> end. lagging_node(TnxId) -> - {atomic, Nodes} = transaction(fun commit_status_trans/2, ['<', TnxId]), + {atomic, Nodes} = transaction(fun ?MODULE:commit_status_trans/2, ['<', TnxId]), Nodes. synced_nodes(TnxId) -> - {atomic, Nodes} = transaction(fun commit_status_trans/2, ['>=', TnxId]), + {atomic, Nodes} = transaction(fun ?MODULE:commit_status_trans/2, ['>=', TnxId]), Nodes. commit_status_trans(Operator, TnxId) -> @@ -547,5 +553,5 @@ get_retry_ms() -> maybe_init_tnx_id(_Node, TnxId) when TnxId < 0 -> ok; maybe_init_tnx_id(Node, TnxId) -> - {atomic, _} = transaction(fun commit/2, [Node, TnxId]), + {atomic, _} = transaction(fun ?MODULE:commit/2, [Node, TnxId]), ok. diff --git a/apps/emqx_conf/src/emqx_cluster_rpc_handler.erl b/apps/emqx_conf/src/emqx_cluster_rpc_handler.erl index 40df5a02c..7f7c7f77f 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc_handler.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc_handler.erl @@ -30,6 +30,11 @@ code_change/3 ]). +%% Internal exports (RPC) +-export([ + del_stale_mfa/1 +]). + start_link() -> MaxHistory = emqx_conf:get(["node", "cluster_call", "max_history"], 100), CleanupMs = emqx_conf:get(["node", "cluster_call", "cleanup_interval"], 5 * 60 * 1000), @@ -56,7 +61,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history := MaxHistory}) -> - case mria:transaction(?CLUSTER_RPC_SHARD, fun del_stale_mfa/1, [MaxHistory]) of + case mria:transaction(?CLUSTER_RPC_SHARD, fun ?MODULE:del_stale_mfa/1, [MaxHistory]) of {atomic, ok} -> ok; Error -> ?SLOG(error, #{msg => "del_stale_cluster_rpc_mfa_error", error => Error}) end, diff --git a/apps/emqx_conf/src/emqx_conf.app.src b/apps/emqx_conf/src/emqx_conf.app.src index 1441a4180..854fdac07 100644 --- a/apps/emqx_conf/src/emqx_conf.app.src +++ b/apps/emqx_conf/src/emqx_conf.app.src @@ -1,6 +1,6 @@ {application, emqx_conf, [ {description, "EMQX configuration management"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {mod, {emqx_conf_app, []}}, {applications, [kernel, stdlib]},