diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 58042e34e..40957bd3c 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -27,7 +27,9 @@ -export([get_node_tnx_id/1, latest_tnx_id/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - handle_continue/2, code_change/3]). + handle_continue/2, code_change/3]). + +-export_type([txn_id/0, succeed_num/0, multicall_return/0]). -ifdef(TEST). -compile(export_all). @@ -42,6 +44,14 @@ -define(CATCH_UP, catch_up). -define(TIMEOUT, timer:minutes(1)). +-type txn_id() :: pos_integer(). + +-type succeed_num() :: pos_integer() | all. + +-type multicall_return() :: {ok, txn_id(), _Result} + | {error, term()} + | {retry, txn_id(), _Result, node()}. + %%%=================================================================== %%% API %%%=================================================================== @@ -68,27 +78,11 @@ start_link(Node, Name, RetryMs) -> %% @doc return {ok, TnxId, MFARes} the first MFA result when all MFA run ok. %% return {error, MFARes} when the first MFA result is no ok or {ok, term()}. %% return {retry, TnxId, MFARes, Nodes} when some Nodes failed and some Node ok. --spec multicall(Module, Function, Args) -> - {ok, TnxId, term()} | {error, Reason} | {retry, TnxId, MFARes, node()} when - Module :: module(), - Function :: atom(), - Args :: [term()], - MFARes :: term(), - TnxId :: pos_integer(), - Reason :: string(). +-spec multicall(module(), atom(), list()) -> multicall_return(). multicall(M, F, A) -> multicall(M, F, A, all, timer:minutes(2)). --spec multicall(Module, Function, Args, SucceedNum, Timeout) -> - {ok, TnxId, MFARes} | {error, Reason} | {retry, TnxId, MFARes, node()} when - Module :: module(), - Function :: atom(), - Args :: [term()], - SucceedNum :: pos_integer() | all, - TnxId :: pos_integer(), - MFARes :: term(), - Timeout :: timeout(), - Reason :: string(). +-spec multicall(module(), atom(), list(), succeed_num(), timeout()) -> multicall_return(). multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNum >= 1 -> MFA = {initiate, {M, F, A}}, Begin = erlang:monotonic_time(), diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index 26da6ac14..3100833f0 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -81,9 +81,8 @@ get_node_and_config(KeyPath) -> -spec update(emqx_map_lib:config_key_path(), emqx_config:update_request(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. -update(KeyPath, UpdateReq, Opts0) -> - Args = [KeyPath, UpdateReq, Opts0], - multicall(emqx, update_config, Args). +update(KeyPath, UpdateReq, Opts) -> + check_cluster_rpc_result(emqx_conf_proto_v1:update(KeyPath, UpdateReq, Opts)). %% @doc Update the specified node's key path in local-override.conf. -spec update(node(), emqx_map_lib:config_key_path(), emqx_config:update_request(), @@ -97,9 +96,8 @@ update(Node, KeyPath, UpdateReq, Opts) -> %% @doc remove all value of key path in cluster-override.conf or local-override.conf. -spec remove(emqx_map_lib:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. -remove(KeyPath, Opts0) -> - Args = [KeyPath, Opts0], - multicall(emqx, remove_config, Args). +remove(KeyPath, Opts) -> + check_cluster_rpc_result(emqx_conf_proto_v1:remove_config(KeyPath, Opts)). %% @doc remove the specified node's key path in local-override.conf. -spec remove(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) -> @@ -112,9 +110,8 @@ remove(Node, KeyPath, Opts) -> %% @doc reset all value of key path in cluster-override.conf or local-override.conf. -spec reset(emqx_map_lib:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. -reset(KeyPath, Opts0) -> - Args = [KeyPath, Opts0], - multicall(emqx, reset_config, Args). +reset(KeyPath, Opts) -> + check_cluster_rpc_result(emqx_conf_proto_v1:reset(KeyPath, Opts)). %% @doc reset the specified node's key path in local-override.conf. -spec reset(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) -> @@ -122,7 +119,7 @@ reset(KeyPath, Opts0) -> reset(Node, KeyPath, Opts) when Node =:= node() -> emqx:reset_config(KeyPath, Opts#{override_to => local}); reset(Node, KeyPath, Opts) -> - rpc:call(Node, ?MODULE, reset, [KeyPath, Opts]). + emqx_conf_proto_v1:reset(Node, KeyPath, Opts). -spec gen_doc(file:name_all()) -> ok. gen_doc(File) -> @@ -138,14 +135,14 @@ gen_doc(File) -> %% Internal functions %%-------------------------------------------------------------------- -multicall(M, F, Args) -> - case emqx_cluster_rpc:multicall(M, F, Args) of +check_cluster_rpc_result(Result) -> + case Result of {ok, _TnxId, Res} -> Res; {retry, TnxId, Res, Nodes} -> %% The init MFA return ok, but other nodes failed. %% We return ok and alert an alarm. ?SLOG(error, #{msg => "failed_to_update_config_in_cluster", nodes => Nodes, - tnx_id => TnxId, mfa => {M, F, Args}}), + tnx_id => TnxId}), Res; {error, Error} -> %% all MFA return not ok or {ok, term()}. Error diff --git a/apps/emqx_conf/src/proto/emqx_conf_proto_v1.erl b/apps/emqx_conf/src/proto/emqx_conf_proto_v1.erl index 9e92c8c76..1aa6973c3 100644 --- a/apps/emqx_conf/src/proto/emqx_conf_proto_v1.erl +++ b/apps/emqx_conf/src/proto/emqx_conf_proto_v1.erl @@ -24,8 +24,13 @@ , get_config/3 , get_all/1 + , update/3 , update/4 + , remove_config/2 , remove_config/3 + + , reset/2 + , reset/3 ]). -include_lib("emqx/include/bpapi.hrl"). @@ -49,6 +54,11 @@ get_config(Node, KeyPath, Default) -> get_all(KeyPath) -> rpc:multicall(emqx_conf, get_node_and_config, [KeyPath], 5000). +-spec update(update_config_key_path(), emqx_config:update_request(), + emqx_config:update_opts()) -> emqx_cluster_rpc:multicall_return(). +update(KeyPath, UpdateReq, Opts) -> + emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]). + -spec update(node(), update_config_key_path(), emqx_config:update_request(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} @@ -57,9 +67,25 @@ get_all(KeyPath) -> update(Node, KeyPath, UpdateReq, Opts) -> rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000). +-spec remove_config(update_config_key_path(), emqx_config:update_opts()) -> _. +remove_config(KeyPath, Opts) -> + emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]). + -spec remove_config(node(), update_config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()} | emqx_rpc:badrpc(). remove_config(Node, KeyPath, Opts) -> rpc:call(Node, emqx, remove_config, [KeyPath, Opts], 5000). + +-spec reset(update_config_key_path(), emqx_config:update_opts()) -> + emqx_cluster_rpc:multicall_return(). +reset(KeyPath, Opts) -> + emqx_cluster_rpc:multicall(emqx, reset_config, [KeyPath, Opts]). + +-spec reset(node(), update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} + | {error, emqx_config:update_error()} + | emqx_rpc:badrpc(). +reset(Node, KeyPath, Opts) -> + rpc:call(Node, emqx, reset_config, [KeyPath, Opts]).