refactor(emqx_conf): Decorate cluster RPCs

This commit is contained in:
k32 2022-01-12 16:21:07 +01:00
parent 784ca5bf24
commit 48366a80c8
3 changed files with 49 additions and 32 deletions

View File

@ -27,7 +27,9 @@
-export([get_node_tnx_id/1, latest_tnx_id/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
handle_continue/2, code_change/3]).
handle_continue/2, code_change/3]).
-export_type([txn_id/0, succeed_num/0, multicall_return/0]).
-ifdef(TEST).
-compile(export_all).
@ -42,6 +44,14 @@
-define(CATCH_UP, catch_up).
-define(TIMEOUT, timer:minutes(1)).
-type txn_id() :: pos_integer().
-type succeed_num() :: pos_integer() | all.
-type multicall_return() :: {ok, txn_id(), _Result}
| {error, term()}
| {retry, txn_id(), _Result, node()}.
%%%===================================================================
%%% API
%%%===================================================================
@ -68,27 +78,11 @@ start_link(Node, Name, RetryMs) ->
%% @doc return {ok, TnxId, MFARes} the first MFA result when all MFA run ok.
%% return {error, MFARes} when the first MFA result is no ok or {ok, term()}.
%% return {retry, TnxId, MFARes, Nodes} when some Nodes failed and some Node ok.
-spec multicall(Module, Function, Args) ->
{ok, TnxId, term()} | {error, Reason} | {retry, TnxId, MFARes, node()} when
Module :: module(),
Function :: atom(),
Args :: [term()],
MFARes :: term(),
TnxId :: pos_integer(),
Reason :: string().
-spec multicall(module(), atom(), list()) -> multicall_return().
multicall(M, F, A) ->
multicall(M, F, A, all, timer:minutes(2)).
-spec multicall(Module, Function, Args, SucceedNum, Timeout) ->
{ok, TnxId, MFARes} | {error, Reason} | {retry, TnxId, MFARes, node()} when
Module :: module(),
Function :: atom(),
Args :: [term()],
SucceedNum :: pos_integer() | all,
TnxId :: pos_integer(),
MFARes :: term(),
Timeout :: timeout(),
Reason :: string().
-spec multicall(module(), atom(), list(), succeed_num(), timeout()) -> multicall_return().
multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNum >= 1 ->
MFA = {initiate, {M, F, A}},
Begin = erlang:monotonic_time(),

View File

@ -81,9 +81,8 @@ get_node_and_config(KeyPath) ->
-spec update(emqx_map_lib:config_key_path(), emqx_config:update_request(),
emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update(KeyPath, UpdateReq, Opts0) ->
Args = [KeyPath, UpdateReq, Opts0],
multicall(emqx, update_config, Args).
update(KeyPath, UpdateReq, Opts) ->
check_cluster_rpc_result(emqx_conf_proto_v1:update(KeyPath, UpdateReq, Opts)).
%% @doc Update the specified node's key path in local-override.conf.
-spec update(node(), emqx_map_lib:config_key_path(), emqx_config:update_request(),
@ -97,9 +96,8 @@ update(Node, KeyPath, UpdateReq, Opts) ->
%% @doc remove all value of key path in cluster-override.conf or local-override.conf.
-spec remove(emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
remove(KeyPath, Opts0) ->
Args = [KeyPath, Opts0],
multicall(emqx, remove_config, Args).
remove(KeyPath, Opts) ->
check_cluster_rpc_result(emqx_conf_proto_v1:remove_config(KeyPath, Opts)).
%% @doc remove the specified node's key path in local-override.conf.
-spec remove(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
@ -112,9 +110,8 @@ remove(Node, KeyPath, Opts) ->
%% @doc reset all value of key path in cluster-override.conf or local-override.conf.
-spec reset(emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
reset(KeyPath, Opts0) ->
Args = [KeyPath, Opts0],
multicall(emqx, reset_config, Args).
reset(KeyPath, Opts) ->
check_cluster_rpc_result(emqx_conf_proto_v1:reset(KeyPath, Opts)).
%% @doc reset the specified node's key path in local-override.conf.
-spec reset(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
@ -122,7 +119,7 @@ reset(KeyPath, Opts0) ->
reset(Node, KeyPath, Opts) when Node =:= node() ->
emqx:reset_config(KeyPath, Opts#{override_to => local});
reset(Node, KeyPath, Opts) ->
rpc:call(Node, ?MODULE, reset, [KeyPath, Opts]).
emqx_conf_proto_v1:reset(Node, KeyPath, Opts).
-spec gen_doc(file:name_all()) -> ok.
gen_doc(File) ->
@ -138,14 +135,14 @@ gen_doc(File) ->
%% Internal functions
%%--------------------------------------------------------------------
multicall(M, F, Args) ->
case emqx_cluster_rpc:multicall(M, F, Args) of
check_cluster_rpc_result(Result) ->
case Result of
{ok, _TnxId, Res} -> Res;
{retry, TnxId, Res, Nodes} ->
%% The init MFA return ok, but other nodes failed.
%% We return ok and alert an alarm.
?SLOG(error, #{msg => "failed_to_update_config_in_cluster", nodes => Nodes,
tnx_id => TnxId, mfa => {M, F, Args}}),
tnx_id => TnxId}),
Res;
{error, Error} -> %% all MFA return not ok or {ok, term()}.
Error

View File

@ -24,8 +24,13 @@
, get_config/3
, get_all/1
, update/3
, update/4
, remove_config/2
, remove_config/3
, reset/2
, reset/3
]).
-include_lib("emqx/include/bpapi.hrl").
@ -49,6 +54,11 @@ get_config(Node, KeyPath, Default) ->
get_all(KeyPath) ->
rpc:multicall(emqx_conf, get_node_and_config, [KeyPath], 5000).
-spec update(update_config_key_path(), emqx_config:update_request(),
emqx_config:update_opts()) -> emqx_cluster_rpc:multicall_return().
update(KeyPath, UpdateReq, Opts) ->
emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]).
-spec update(node(), update_config_key_path(), emqx_config:update_request(),
emqx_config:update_opts()) ->
{ok, emqx_config:update_result()}
@ -57,9 +67,25 @@ get_all(KeyPath) ->
update(Node, KeyPath, UpdateReq, Opts) ->
rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000).
-spec remove_config(update_config_key_path(), emqx_config:update_opts()) -> _.
remove_config(KeyPath, Opts) ->
emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]).
-spec remove_config(node(), update_config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()}
| {error, emqx_config:update_error()}
| emqx_rpc:badrpc().
remove_config(Node, KeyPath, Opts) ->
rpc:call(Node, emqx, remove_config, [KeyPath, Opts], 5000).
-spec reset(update_config_key_path(), emqx_config:update_opts()) ->
emqx_cluster_rpc:multicall_return().
reset(KeyPath, Opts) ->
emqx_cluster_rpc:multicall(emqx, reset_config, [KeyPath, Opts]).
-spec reset(node(), update_config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()}
| {error, emqx_config:update_error()}
| emqx_rpc:badrpc().
reset(Node, KeyPath, Opts) ->
rpc:call(Node, emqx, reset_config, [KeyPath, Opts]).