fix(emqx_cluster_rpc): fail fast on stale state

Due to:

* Cluster RPC MFA is not idempotent!
* There is a lack of rollback for callback's side-effects

For instance, when two nodes try to add a cluster-singleton
concurrently, one of them will have to wait for the table lock
then try to catch-up, then try to apply MFA.
The catch-up will have the singleton created, but the initiated
initiated multicall apply will fail causing the commit to rollback,
but not to 'undo' the singleton creation.
Later, the retries will fail indefinitely.
This commit is contained in:
Zaiming (Stone) Shi 2022-06-11 20:47:46 +02:00
parent b92708726a
commit 2065be569e
18 changed files with 202 additions and 209 deletions

View File

@ -233,7 +233,8 @@ put(Config) ->
erase(RootName) ->
persistent_term:erase(?PERSIS_KEY(?CONF, bin(RootName))),
persistent_term:erase(?PERSIS_KEY(?RAW_CONF, bin(RootName))).
persistent_term:erase(?PERSIS_KEY(?RAW_CONF, bin(RootName))),
ok.
-spec put(emqx_map_lib:config_key_path(), term()) -> ok.
put(KeyPath, Config) ->

View File

@ -163,8 +163,7 @@ get_counters(Name, Id) ->
reset_counters(Name, Id) ->
Indexes = maps:values(get_indexes(Name, Id)),
Ref = get_ref(Name, Id),
[counters:put(Ref, Idx, 0) || Idx <- Indexes],
ok.
lists:foreach(fun(Idx) -> counters:put(Ref, Idx, 0) end, Indexes).
-spec get_metrics(handler_name(), metric_id()) -> metrics().
get_metrics(Name, Id) ->

View File

@ -53,8 +53,7 @@
-define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc").
%% List of known functions also known to do RPC:
-define(RPC_FUNCTIONS,
"emqx_cluster_rpc:multicall/3, emqx_cluster_rpc:multicall/5, "
"emqx_plugin_libs_rule:cluster_call/3"
"emqx_cluster_rpc:multicall/3, emqx_cluster_rpc:multicall/5"
).
%% List of functions in the RPC backend modules that we can ignore:
@ -63,11 +62,9 @@
%% List of business-layer functions that are exempt from the checks:
%% erlfmt-ignore
-define(EXEMPTIONS,
"emqx_mgmt_api:do_query/6," % Reason: legacy code. A fun and a QC query are
% passed in the args, it's futile to try to statically
% check it
"emqx_plugin_libs_rule:cluster_call/3" % Reason: some sort of external plugin API that we
% don't want to break?
"emqx_mgmt_api:do_query/6" % Reason: legacy code. A fun and a QC query are
% passed in the args, it's futile to try to statically
% check it
).
-define(XREF, myxref).

View File

@ -30,7 +30,11 @@
skip_failed_commit/1,
fast_forward_to_commit/2
]).
-export([get_node_tnx_id/1, latest_tnx_id/0]).
-export([
get_node_tnx_id/1,
latest_tnx_id/0,
make_initiate_call_req/3
]).
-export([
init/1,
@ -44,7 +48,7 @@
-export([get_tables_status/0]).
-export_type([txn_id/0, succeed_num/0, multicall_return/1, multicall_return/0]).
-export_type([tnx_id/0, succeed_num/0]).
-ifdef(TEST).
-compile(export_all).
@ -56,19 +60,21 @@
-include_lib("emqx/include/logger.hrl").
-include("emqx_conf.hrl").
-define(INITIATE(MFA, LatestIdLastSeen), {initiate, MFA, LatestIdLastSeen}).
-define(CATCH_UP, catch_up).
-define(TIMEOUT, timer:minutes(1)).
-type txn_id() :: pos_integer().
-type tnx_id() :: pos_integer().
-type succeed_num() :: pos_integer() | all.
-type multicall_return(Result) ::
{ok, txn_id(), Result}
| {error, term()}
| {retry, txn_id(), Result, node()}.
{ok, tnx_id(), Result}
| {init_failure, term()}
| {peers_lagging, tnx_id(), Result, [node()]}.
-type multicall_return() :: multicall_return(_).
-type init_call_req() :: ?INITIATE({module(), atom(), list()}, tnx_id()).
%%%===================================================================
%%% API
@ -102,27 +108,73 @@ start_link(Node, Name, RetryMs) ->
{error, Reason}
end.
%% @doc return {ok, TnxId, MFARes} the first MFA result when all MFA run ok.
%% return {error, MFARes} when the first MFA result is no ok or {ok, term()}.
%% return {retry, TnxId, MFARes, Nodes} when some Nodes failed and some Node ok.
-spec multicall(module(), atom(), list()) -> multicall_return().
%% @doc Initiate a local call (or core node),
%% then async-ly replicate the call to peer nodes in the cluster.
%% The evaluation result of the provided MFA is returned,
%% the result is expected to be `ok | {ok, _}' to indicate success,
%% and `{error, _}' to indicate failure.
%%
%% The excpetion of the MFA evaluation is captured and translated
%% into an `{error, _}' tuple.
%% This call tries to wait for all peer nodes to be in-sync before
%% returning the result.
%%
%% In case of partial success, an `error' level log is emitted
%% but the initial localy apply result is returned.
-spec multicall(module(), atom(), list()) -> term().
multicall(M, F, A) ->
multicall(M, F, A, all, timer:minutes(2)).
-spec multicall(module(), atom(), list(), succeed_num(), timeout()) -> multicall_return().
multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNum >= 1 ->
MFA = {initiate, {M, F, A}},
-spec multicall(module(), atom(), list(), succeed_num(), timeout()) -> term().
multicall(M, F, A, RequiredSyncs, Timeout) when RequiredSyncs =:= all orelse RequiredSyncs >= 1 ->
case do_multicall(M, F, A, RequiredSyncs, Timeout) of
{ok, _TxnId, Result} ->
Result;
{init_failure, Error} ->
Error;
{peers_lagging, TnxId, Res, Nodes} ->
%% The init MFA return ok, but some other nodes failed.
?SLOG(error, #{
msg => "cluster_rpc_peers_lagging",
lagging_nodes => Nodes,
tnx_id => TnxId
}),
Res
end.
%% Return {ok, TnxId, MFARes} the first MFA result when all MFA run ok.
%% return {init_failure, Error} when the initial MFA result is no ok or {ok, term()}.
%% return {peers_lagging, TnxId, MFARes, Nodes} when some Nodes failed and some Node ok.
-spec do_multicall(module(), atom(), list(), succeed_num(), timeout()) -> multicall_return().
do_multicall(M, F, A, RequiredSyncs, Timeout) when
RequiredSyncs =:= all orelse RequiredSyncs >= 1
->
%% Idealy 'LatestId' should be provided by the multicall originator,
%% which is the viewer of the state e.g.
%% * Sysadmin who issues CLI-commands or REST-API calls to make config changes
%% * Dashboard viewer who is making decision based on what they can see from the UI
%% To reach the ideal state, it would require adding transaction ID to each and
%% every view/GET requests and also provide the ID as a part of the view/GET responses.
%%
%% To keep things simple, we try to get the 'old' view when a multicall request
%% is received as early as possible.
%%
%% Reason to do this:
%% The 'initiate' call handler tries to take a table lock (cluster-wide) before
%% bumping the transaction ID. While waiting for the lock, the ID might have been
%% bumpped by another node in the cluster.
InitReq = make_initiate_call_req(M, F, A),
Begin = erlang:monotonic_time(),
InitRes =
case mria_rlog:role() of
core ->
gen_server:call(?MODULE, MFA, Timeout);
gen_server:call(?MODULE, InitReq, Timeout);
replicant ->
%% the initiate transaction must happened on core node
%% make sure MFA(in the transaction) and the transaction on the same node
%% don't need rpc again inside transaction.
case mria_status:upstream_node(?CLUSTER_RPC_SHARD) of
{ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout);
{ok, Node} -> gen_server:call({?MODULE, Node}, InitReq, Timeout);
disconnected -> {error, disconnected}
end
end,
@ -132,23 +184,23 @@ multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNu
RetryTimeout = ceil(3 * max(MinDelay, get_retry_ms())),
OkOrFailed =
case InitRes of
{ok, _TnxId, _} when RequireNum =:= 1 ->
{ok, _TnxId, _} when RequiredSyncs =:= 1 ->
ok;
{ok, TnxId, _} when RequireNum =:= all ->
{ok, TnxId, _} when RequiredSyncs =:= all ->
wait_for_all_nodes_commit(TnxId, MinDelay, RetryTimeout);
{ok, TnxId, _} when is_integer(RequireNum) ->
wait_for_nodes_commit(RequireNum, TnxId, MinDelay, RetryTimeout);
{ok, TnxId, _} when is_integer(RequiredSyncs) ->
wait_for_nodes_commit(RequiredSyncs, TnxId, MinDelay, RetryTimeout);
Error ->
Error
end,
case OkOrFailed of
ok ->
InitRes;
{error, Error0} ->
{error, Error0};
{retry, Node0} ->
{init_failure, Error0} ->
{init_failure, Error0};
{peers_lagging, Nodes} ->
{ok, TnxId0, MFARes} = InitRes,
{retry, TnxId0, MFARes, Node0}
{peers_lagging, TnxId0, MFARes, Nodes}
end.
-spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}.
@ -167,6 +219,11 @@ latest_tnx_id() ->
{atomic, TnxId} = transaction(fun get_latest_id/0, []),
TnxId.
-spec make_initiate_call_req(module(), atom(), list()) -> init_call_req().
make_initiate_call_req(M, F, A) ->
TnxId = get_latest_id(dirty),
?INITIATE({M, F, A}, TnxId).
-spec get_node_tnx_id(node()) -> integer().
get_node_tnx_id(Node) ->
case mnesia:wread({?CLUSTER_COMMIT, Node}) of
@ -232,12 +289,12 @@ handle_call(reset, _From, State) ->
_ = mria:clear_table(?CLUSTER_COMMIT),
_ = mria:clear_table(?CLUSTER_MFA),
{reply, ok, State, {continue, ?CATCH_UP}};
handle_call({initiate, MFA}, _From, State = #{node := Node}) ->
case transaction(fun init_mfa/2, [Node, MFA]) of
handle_call(?INITIATE(MFA, LatestId), _From, State = #{node := Node}) ->
case transaction(fun init_mfa/3, [Node, MFA, LatestId]) of
{atomic, {ok, TnxId, Result}} ->
{reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}};
{aborted, Reason} ->
{reply, {error, Reason}, State, {continue, ?CATCH_UP}}
{aborted, Error} ->
{reply, {init_failure, Error}, State, {continue, ?CATCH_UP}}
end;
handle_call(skip_failed_commit, _From, State = #{node := Node}) ->
Timeout = catch_up(State, true),
@ -273,7 +330,7 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) ->
{atomic, caught_up} ->
?TIMEOUT;
{atomic, {still_lagging, NextId, MFA}} ->
{Succeed, _} = apply_mfa(NextId, MFA),
{Succeed, _} = apply_mfa(NextId, MFA, catch_up),
case Succeed orelse SkipResult of
true ->
case transaction(fun commit/2, [Node, NextId]) of
@ -316,35 +373,6 @@ read_next_mfa(Node) ->
[#cluster_rpc_mfa{mfa = MFA}] -> {still_lagging, NextId, MFA}
end.
do_catch_up(ToTnxId, Node) ->
case mnesia:wread({?CLUSTER_COMMIT, Node}) of
[] ->
commit(Node, ToTnxId),
caught_up;
[#cluster_rpc_commit{tnx_id = LastAppliedId}] when ToTnxId =:= LastAppliedId ->
caught_up;
[#cluster_rpc_commit{tnx_id = LastAppliedId}] when ToTnxId > LastAppliedId ->
CurTnxId = LastAppliedId + 1,
[#cluster_rpc_mfa{mfa = MFA}] = mnesia:read(?CLUSTER_MFA, CurTnxId),
case apply_mfa(CurTnxId, MFA) of
{true, _Result} -> ok = commit(Node, CurTnxId);
{false, Error} -> mnesia:abort(Error)
end;
[#cluster_rpc_commit{tnx_id = LastAppliedId}] ->
Reason = lists:flatten(
io_lib:format(
"~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)",
[Node, LastAppliedId, ToTnxId]
)
),
?SLOG(error, #{
msg => "catch_up_failed!",
last_applied_id => LastAppliedId,
to_tnx_id => ToTnxId
}),
mnesia:abort(Reason)
end.
commit(Node, TnxId) ->
ok = mnesia:write(?CLUSTER_COMMIT, #cluster_rpc_commit{node = Node, tnx_id = TnxId}, write).
@ -365,33 +393,44 @@ do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) ->
end.
get_latest_id() ->
case mnesia:last(?CLUSTER_MFA) of
get_latest_id(tnx).
get_latest_id(IsolationLevel) ->
F =
case IsolationLevel of
tnx -> fun mnesia:last/1;
dirty -> fun mnesia:dirty_last/1
end,
case F(?CLUSTER_MFA) of
'$end_of_table' -> 0;
Id -> Id
end.
init_mfa(Node, MFA) ->
init_mfa(Node, MFA, LatestIdLastSeen) ->
mnesia:write_lock_table(?CLUSTER_MFA),
LatestId = get_latest_id(),
ok = do_catch_up_in_one_trans(LatestId, Node),
TnxId = LatestId + 1,
MFARec = #cluster_rpc_mfa{
tnx_id = TnxId,
mfa = MFA,
initiator = Node,
created_at = erlang:localtime()
},
ok = mnesia:write(?CLUSTER_MFA, MFARec, write),
ok = commit(Node, TnxId),
case apply_mfa(TnxId, MFA) of
{true, Result} -> {ok, TnxId, Result};
{false, Error} -> mnesia:abort(Error)
end.
do_catch_up_in_one_trans(LatestId, Node) ->
case do_catch_up(LatestId, Node) of
caught_up -> ok;
ok -> do_catch_up_in_one_trans(LatestId, Node)
case LatestIdLastSeen =:= LatestId of
true ->
TnxId = LatestId + 1,
MFARec = #cluster_rpc_mfa{
tnx_id = TnxId,
mfa = MFA,
initiator = Node,
created_at = erlang:localtime()
},
ok = mnesia:write(?CLUSTER_MFA, MFARec, write),
ok = commit(Node, TnxId),
case apply_mfa(TnxId, MFA, init) of
{true, Result} -> {ok, TnxId, Result};
{false, Error} -> mnesia:abort(Error)
end;
false ->
?SLOG(error, #{
msg => stale_view_of_cluster_state,
tnx_id => LatestId,
last_seen_tnx_id => LatestIdLastSeen
}),
mnesia:abort({error, stale_view_of_cluster_state})
end.
transaction(Func, Args) ->
@ -433,7 +472,7 @@ trans_query(TnxId) ->
-define(TO_BIN(_B_), iolist_to_binary(io_lib:format("~p", [_B_]))).
apply_mfa(TnxId, {M, F, A}) ->
apply_mfa(TnxId, {M, F, A}, Kind) ->
Res =
try
erlang:apply(M, F, A)
@ -444,7 +483,7 @@ apply_mfa(TnxId, {M, F, A}) ->
{error, #{exception => Class, reason => Reason, stacktrace => Stacktrace}}
end,
%% Do not log args as it might be sensitive information
Meta = #{tnx_id => TnxId, entrypoint => format_mfa(M, F, length(A))},
Meta = #{kind => Kind, tnx_id => TnxId, entrypoint => format_mfa(M, F, length(A))},
IsSuccess = is_success(Res),
log_and_alarm(IsSuccess, Res, Meta),
{IsSuccess, Res}.
@ -475,21 +514,21 @@ wait_for_all_nodes_commit(TnxId, Delay, Remain) ->
[] ->
ok;
Nodes ->
{retry, Nodes}
{peers_lagging, Nodes}
end.
wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain) ->
wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) ->
ok = timer:sleep(Delay),
case length(synced_nodes(TnxId)) >= RequiredNum of
case length(synced_nodes(TnxId)) >= RequiredSyncs of
true ->
ok;
false when Remain > 0 ->
wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain - Delay);
wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain - Delay);
false ->
case lagging_node(TnxId) of
%% All commit but The succeedNum > length(nodes()).
[] -> ok;
Nodes -> {retry, Nodes}
Nodes -> {peers_lagging, Nodes}
end
end.

View File

@ -92,7 +92,7 @@ get_node_and_config(KeyPath) ->
) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update(KeyPath, UpdateReq, Opts) ->
check_cluster_rpc_result(emqx_conf_proto_v1:update(KeyPath, UpdateReq, Opts)).
emqx_conf_proto_v1:update(KeyPath, UpdateReq, Opts).
%% @doc Update the specified node's key path in local-override.conf.
-spec update(
@ -111,7 +111,7 @@ update(Node, KeyPath, UpdateReq, Opts) ->
-spec remove(emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
remove(KeyPath, Opts) ->
check_cluster_rpc_result(emqx_conf_proto_v1:remove_config(KeyPath, Opts)).
emqx_conf_proto_v1:remove_config(KeyPath, Opts).
%% @doc remove the specified node's key path in local-override.conf.
-spec remove(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
@ -125,7 +125,7 @@ remove(Node, KeyPath, Opts) ->
-spec reset(emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
reset(KeyPath, Opts) ->
check_cluster_rpc_result(emqx_conf_proto_v1:reset(KeyPath, Opts)).
emqx_conf_proto_v1:reset(KeyPath, Opts).
%% @doc reset the specified node's key path in local-override.conf.
-spec reset(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
@ -208,27 +208,6 @@ gen_example(File, SchemaModule, I18nFile, Lang) ->
Example = hocon_schema_example:gen(SchemaModule, Opts),
file:write_file(File, Example).
check_cluster_rpc_result(Result) ->
case Result of
{ok, _TnxId, Res} ->
Res;
{retry, TnxId, Res, Nodes} ->
%% The init MFA return ok, but other nodes failed.
%% We return ok and alert an alarm.
?SLOG(error, #{
msg => "failed_to_update_config_in_cluster",
nodes => Nodes,
tnx_id => TnxId
}),
Res;
%% all MFA return not ok or {ok, term()}.
{error, Error} ->
%% a lot of the callers do not handle
%% this error return, some even ignore
%% throw here to ensure the code will not proceed
erlang:throw(Error)
end.
%% Only gen hot_conf schema, not all configuration fields.
gen_hot_conf_schema(File) ->
{ApiSpec0, Components0} = emqx_dashboard_swagger:spec(

View File

@ -61,7 +61,7 @@ get_all(KeyPath) ->
update_config_key_path(),
emqx_config:update_request(),
emqx_config:update_opts()
) -> emqx_cluster_rpc:multicall_return().
) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update(KeyPath, UpdateReq, Opts) ->
emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]).
@ -78,7 +78,7 @@ update(Node, KeyPath, UpdateReq, Opts) ->
rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000).
-spec remove_config(update_config_key_path(), emqx_config:update_opts()) ->
emqx_cluster_rpc:multicall_result().
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
remove_config(KeyPath, Opts) ->
emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]).
@ -90,7 +90,7 @@ remove_config(Node, KeyPath, Opts) ->
rpc:call(Node, emqx, remove_config, [KeyPath, Opts], 5000).
-spec reset(update_config_key_path(), emqx_config:update_opts()) ->
emqx_cluster_rpc:multicall_return().
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
reset(KeyPath, Opts) ->
emqx_cluster_rpc:multicall(emqx, reset_config, [KeyPath, Opts]).

View File

@ -69,13 +69,13 @@ t_base_test(_Config) ->
?assertEqual(emqx_cluster_rpc:status(), {atomic, []}),
Pid = self(),
MFA = {M, F, A} = {?MODULE, echo, [Pid, test]},
{ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A),
{ok, TnxId, ok} = multicall(M, F, A),
{atomic, Query} = emqx_cluster_rpc:query(TnxId),
?assertEqual(MFA, maps:get(mfa, Query)),
?assertEqual(node(), maps:get(initiator, Query)),
?assert(maps:is_key(created_at, Query)),
?assertEqual(ok, receive_msg(3, test)),
?assertEqual({ok, 2, ok}, emqx_cluster_rpc:multicall(M, F, A)),
?assertEqual({ok, 2, ok}, multicall(M, F, A)),
{atomic, Status} = emqx_cluster_rpc:status(),
case length(Status) =:= 3 of
true ->
@ -95,7 +95,7 @@ t_commit_fail_test(_Config) ->
emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(),
{M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE2)]},
{error, "MFA return not ok"} = emqx_cluster_rpc:multicall(M, F, A),
{init_failure, "MFA return not ok"} = multicall(M, F, A),
?assertEqual({atomic, []}, emqx_cluster_rpc:status()),
ok.
@ -103,7 +103,7 @@ t_commit_crash_test(_Config) ->
emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(),
{M, F, A} = {?MODULE, no_exist_function, []},
{error, {error, Meta}} = emqx_cluster_rpc:multicall(M, F, A),
{init_failure, {error, Meta}} = multicall(M, F, A),
?assertEqual(undef, maps:get(reason, Meta)),
?assertEqual(error, maps:get(exception, Meta)),
?assertEqual(true, maps:is_key(stacktrace, Meta)),
@ -114,21 +114,23 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) ->
emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(),
MFA = {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
{ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, _, ok} = multicall(M, F, A, 1, 1000),
{atomic, [Status]} = emqx_cluster_rpc:status(),
?assertEqual(MFA, maps:get(mfa, Status)),
?assertEqual(node(), maps:get(node, Status)),
erlang:send(?NODE2, test),
Res = gen_server:call(?NODE2, {initiate, {M, F, A}}),
?assertEqual({error, "MFA return not ok"}, Res),
Call = emqx_cluster_rpc:make_initiate_call_req(M, F, A),
Res = gen_server:call(?NODE2, Call),
?assertEqual({init_failure, "MFA return not ok"}, Res),
ok.
t_catch_up_status_handle_next_commit(_Config) ->
emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(),
{M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]},
{ok, 1, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, 2} = gen_server:call(?NODE2, {initiate, {M, F, A}}),
{ok, 1, ok} = multicall(M, F, A, 1, 1000),
Call = emqx_cluster_rpc:make_initiate_call_req(M, F, A),
{ok, 2} = gen_server:call(?NODE2, Call),
ok.
t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
@ -138,19 +140,19 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
ets:insert(test, {other_mfa_result, failed}),
ct:pal("111:~p~n", [ets:tab2list(cluster_rpc_commit)]),
{M, F, A} = {?MODULE, failed_on_other_recover_after_retry, [erlang:whereis(?NODE1)]},
{ok, 1, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, 1, ok} = multicall(M, F, A, 1, 1000),
ct:pal("222:~p~n", [ets:tab2list(cluster_rpc_commit)]),
ct:pal("333:~p~n", [emqx_cluster_rpc:status()]),
{atomic, [_Status | L]} = emqx_cluster_rpc:status(),
?assertEqual([], L),
ets:insert(test, {other_mfa_result, ok}),
{ok, 2, ok} = emqx_cluster_rpc:multicall(io, format, ["test"], 1, 1000),
{ok, 2, ok} = multicall(io, format, ["test"], 1, 1000),
ct:sleep(1000),
{atomic, NewStatus} = emqx_cluster_rpc:status(),
?assertEqual(3, length(NewStatus)),
Pid = self(),
MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, test]},
{ok, TnxId, ok} = emqx_cluster_rpc:multicall(M1, F1, A1),
{ok, TnxId, ok} = multicall(M1, F1, A1),
{atomic, Query} = emqx_cluster_rpc:query(TnxId),
?assertEqual(MFAEcho, maps:get(mfa, Query)),
?assertEqual(node(), maps:get(initiator, Query)),
@ -167,7 +169,7 @@ t_del_stale_mfa(_Config) ->
Ids =
[
begin
{ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A),
{ok, TnxId, ok} = multicall(M, F, A),
TnxId
end
|| _ <- Keys
@ -176,7 +178,7 @@ t_del_stale_mfa(_Config) ->
Ids2 =
[
begin
{ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A),
{ok, TnxId, ok} = multicall(M, F, A),
TnxId
end
|| _ <- Keys2
@ -203,7 +205,7 @@ t_del_stale_mfa(_Config) ->
t_skip_failed_commit(_Config) ->
emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(),
{ok, 1, ok} = emqx_cluster_rpc:multicall(io, format, ["test~n"], all, 1000),
{ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000),
ct:sleep(180),
{atomic, List1} = emqx_cluster_rpc:status(),
Node = node(),
@ -212,7 +214,7 @@ t_skip_failed_commit(_Config) ->
tnx_ids(List1)
),
{M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
{ok, 2, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, 2, ok} = multicall(M, F, A, 1, 1000),
2 = gen_server:call(?NODE2, skip_failed_commit, 5000),
{atomic, List2} = emqx_cluster_rpc:status(),
?assertEqual(
@ -224,7 +226,7 @@ t_skip_failed_commit(_Config) ->
t_fast_forward_commit(_Config) ->
emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(),
{ok, 1, ok} = emqx_cluster_rpc:multicall(io, format, ["test~n"], all, 1000),
{ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000),
ct:sleep(180),
{atomic, List1} = emqx_cluster_rpc:status(),
Node = node(),
@ -233,11 +235,11 @@ t_fast_forward_commit(_Config) ->
tnx_ids(List1)
),
{M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
{ok, 2, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, 3, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, 4, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, 5, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{retry, 6, ok, _} = emqx_cluster_rpc:multicall(M, F, A, 2, 1000),
{ok, 2, ok} = multicall(M, F, A, 1, 1000),
{ok, 3, ok} = multicall(M, F, A, 1, 1000),
{ok, 4, ok} = multicall(M, F, A, 1, 1000),
{ok, 5, ok} = multicall(M, F, A, 1, 1000),
{peers_lagging, 6, ok, _} = multicall(M, F, A, 2, 1000),
3 = gen_server:call(?NODE2, {fast_forward_to_commit, 3}, 5000),
4 = gen_server:call(?NODE2, {fast_forward_to_commit, 4}, 5000),
6 = gen_server:call(?NODE2, {fast_forward_to_commit, 7}, 5000),
@ -333,3 +335,9 @@ failed_on_other_recover_after_retry(Pid) ->
[{_, Res}] = ets:lookup(test, other_mfa_result),
Res
end.
multicall(M, F, A, N, T) ->
emqx_cluster_rpc:do_multicall(M, F, A, N, T).
multicall(M, F, A) ->
multicall(M, F, A, all, timer:minutes(2)).

View File

@ -70,6 +70,7 @@ end_per_testcase(_, _Config) ->
ok.
t_list_raw_empty(_) ->
ok = emqx_config:erase(hd(emqx_connector:config_key_path())),
Result = emqx_connector:list_raw(),
?assertEqual([], Result).

View File

@ -556,7 +556,7 @@ with_gateway(GwName0, Fun) ->
end,
case emqx_gateway:lookup(GwName) of
undefined ->
return_http_error(404, "Gateway not load");
return_http_error(404, "Gateway not loaded");
Gateway ->
Fun(GwName, Gateway)
end

View File

@ -317,7 +317,7 @@ get_plugins() ->
upload_install(post, #{body := #{<<"plugin">> := Plugin}}) when is_map(Plugin) ->
[{FileName, Bin}] = maps:to_list(maps:without([type], Plugin)),
%% File bin is too large, we use rpc:multicall instead of cluster_rpc:multicall
%% TODO what happened when a new node join in?
%% TODO what happens when a new node join in?
%% emqx_plugins_monitor should copy plugins from other core node when boot-up.
case emqx_plugins:describe(string:trim(FileName, trailing, ".tar.gz")) of
{error, #{error := "bad_info_file", return := {enoent, _}}} ->
@ -358,16 +358,11 @@ upload_install(post, #{}) ->
}}.
do_install_package(FileName, Bin) ->
{Res, _} = emqx_mgmt_api_plugins_proto_v1:install_package(FileName, Bin),
case lists:filter(fun(R) -> R =/= ok end, Res) of
[] ->
{200};
[{error, Reason} | _] ->
{400, #{
code => 'UNEXPECTED_ERROR',
message => iolist_to_binary(io_lib:format("~p", [Reason]))
}}
end.
%% TODO: handle bad nodes
{[_ | _] = Res, []} = emqx_mgmt_api_plugins_proto_v1:install_package(FileName, Bin),
%% TODO: handle non-OKs
[] = lists:filter(fun(R) -> R =/= ok end, Res),
{200}.
plugin(get, #{bindings := #{name := Name}}) ->
{Plugins, _} = emqx_mgmt_api_plugins_proto_v1:describe_package(Name),
@ -376,11 +371,11 @@ plugin(get, #{bindings := #{name := Name}}) ->
[] -> {404, #{code => 'NOT_FOUND', message => Name}}
end;
plugin(delete, #{bindings := #{name := Name}}) ->
{ok, _TnxId, Res} = emqx_mgmt_api_plugins_proto_v1:delete_package(Name),
Res = emqx_mgmt_api_plugins_proto_v1:delete_package(Name),
return(204, Res).
update_plugin(put, #{bindings := #{name := Name, action := Action}}) ->
{ok, _TnxId, Res} = emqx_mgmt_api_plugins_proto_v1:ensure_action(Name, Action),
Res = emqx_mgmt_api_plugins_proto_v1:ensure_action(Name, Action),
return(204, Res).
update_boot_order(post, #{bindings := #{name := Name}, body := Body}) ->
@ -422,7 +417,8 @@ delete_package(Name) ->
ok ->
_ = emqx_plugins:ensure_disabled(Name),
_ = emqx_plugins:ensure_uninstalled(Name),
_ = emqx_plugins:delete_package(Name);
_ = emqx_plugins:delete_package(Name),
ok;
Error ->
Error
end.
@ -430,20 +426,19 @@ delete_package(Name) ->
%% for RPC plugin update
ensure_action(Name, start) ->
_ = emqx_plugins:ensure_enabled(Name),
_ = emqx_plugins:ensure_started(Name);
_ = emqx_plugins:ensure_started(Name),
ok;
ensure_action(Name, stop) ->
_ = emqx_plugins:ensure_stopped(Name),
_ = emqx_plugins:ensure_disabled(Name);
_ = emqx_plugins:ensure_disabled(Name),
ok;
ensure_action(Name, restart) ->
_ = emqx_plugins:ensure_enabled(Name),
_ = emqx_plugins:restart(Name).
_ = emqx_plugins:restart(Name),
ok.
return(Code, ok) ->
{Code};
return(Code, {ok, Result}) ->
{Code, Result};
return(_, {error, #{error := "bad_info_file", return := {enoent, _}, path := Path}}) ->
{404, #{code => 'NOT_FOUND', message => Path}};
return(_, {error, Reason}) ->
{400, #{code => 'PARAM_ERROR', message => iolist_to_binary(io_lib:format("~p", [Reason]))}}.

View File

@ -43,11 +43,10 @@ install_package(Filename, Bin) ->
describe_package(Name) ->
rpc:multicall(emqx_mgmt_api_plugins, describe_package, [Name], 10000).
-spec delete_package(binary() | string()) -> emqx_cluster_rpc:multicall_return().
-spec delete_package(binary() | string()) -> ok | {error, any()}.
delete_package(Name) ->
emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, delete_package, [Name], all, 10000).
-spec ensure_action(binary() | string(), 'restart' | 'start' | 'stop') ->
emqx_cluster_rpc:multicall_return().
-spec ensure_action(binary() | string(), 'restart' | 'start' | 'stop') -> ok | {error, any()}.
ensure_action(Name, Action) ->
emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, ensure_action, [Name, Action], all, 10000).

View File

@ -61,8 +61,6 @@
can_topic_match_oneof/2
]).
-export([cluster_call/3]).
-compile({no_auto_import, [float/1]}).
-define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})").
@ -307,7 +305,3 @@ can_topic_match_oneof(Topic, Filters) ->
end,
Filters
).
cluster_call(Module, Func, Args) ->
{ok, _TnxId, Result} = emqx_cluster_rpc:multicall(Module, Func, Args),
Result.

View File

@ -337,7 +337,7 @@ do_ensure_started(NameVsn) ->
).
%% try the function, catch 'throw' exceptions as normal 'error' return
%% other exceptions with stacktrace returned.
%% other exceptions with stacktrace logged.
tryit(WhichOp, F) ->
try
F()

View File

@ -125,20 +125,10 @@ purge_test() ->
meck_emqx() ->
meck:new(emqx, [unstick, passthrough]),
meck:expect(
emqx,
update_config,
emqx_conf,
update,
fun(Path, Values, _Opts) ->
emqx_config:put(Path, Values)
end
),
%meck:expect(emqx, get_config,
% fun(KeyPath, Default) ->
% Map = emqx:get_raw_config(KeyPath, Default),
% Map1 = emqx_map_lib:safe_atom_key_map(Map),
% case Map1 of
% #{states := Plugins} ->
% Map1#{states => [emqx_map_lib:safe_atom_key_map(P) ||P <- Plugins]};
% _ -> Map1
% end
% end),
ok.

View File

@ -174,7 +174,7 @@ create(InstId, Group, ResourceType, Config) ->
-spec create(instance_id(), resource_group(), resource_type(), resource_config(), create_opts()) ->
{ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
create(InstId, Group, ResourceType, Config, Opts) ->
wrap_rpc(emqx_resource_proto_v1:create(InstId, Group, ResourceType, Config, Opts)).
emqx_resource_proto_v1:create(InstId, Group, ResourceType, Config, Opts).
% --------------------------------------------
-spec create_local(instance_id(), resource_group(), resource_type(), resource_config()) ->
@ -196,7 +196,7 @@ create_local(InstId, Group, ResourceType, Config, Opts) ->
-spec create_dry_run(resource_type(), resource_config()) ->
ok | {error, Reason :: term()}.
create_dry_run(ResourceType, Config) ->
wrap_rpc(emqx_resource_proto_v1:create_dry_run(ResourceType, Config)).
emqx_resource_proto_v1:create_dry_run(ResourceType, Config).
-spec create_dry_run_local(resource_type(), resource_config()) ->
ok | {error, Reason :: term()}.
@ -211,7 +211,7 @@ recreate(InstId, ResourceType, Config) ->
-spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) ->
{ok, resource_data()} | {error, Reason :: term()}.
recreate(InstId, ResourceType, Config, Opts) ->
wrap_rpc(emqx_resource_proto_v1:recreate(InstId, ResourceType, Config, Opts)).
emqx_resource_proto_v1:recreate(InstId, ResourceType, Config, Opts).
-spec recreate_local(instance_id(), resource_type(), resource_config()) ->
{ok, resource_data()} | {error, Reason :: term()}.
@ -225,7 +225,7 @@ recreate_local(InstId, ResourceType, Config, Opts) ->
-spec remove(instance_id()) -> ok | {error, Reason :: term()}.
remove(InstId) ->
wrap_rpc(emqx_resource_proto_v1:remove(InstId)).
emqx_resource_proto_v1:remove(InstId).
-spec remove_local(instance_id()) -> ok | {error, Reason :: term()}.
remove_local(InstId) ->
@ -237,7 +237,7 @@ reset_metrics_local(InstId) ->
-spec reset_metrics(instance_id()) -> ok | {error, Reason :: term()}.
reset_metrics(InstId) ->
wrap_rpc(emqx_resource_proto_v1:reset_metrics(InstId)).
emqx_resource_proto_v1:reset_metrics(InstId).
%% =================================================================================
-spec query(instance_id(), Request :: term()) -> Result :: term().
@ -430,11 +430,5 @@ inc_metrics_funcs(InstId) ->
safe_apply(Func, Args) ->
?SAFE_CALL(erlang:apply(Func, Args)).
wrap_rpc(Ret) ->
case Ret of
{ok, _TxnId, Result} -> Result;
Failed -> Failed
end.
query_error(Reason, Msg) ->
{error, {?MODULE, #{reason => Reason, msg => Msg}}}.

View File

@ -40,7 +40,7 @@ introduced_in() ->
resource_config(),
create_opts()
) ->
emqx_cluster_rpc:multicall_return(resource_data()).
{ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
create(InstId, Group, ResourceType, Config, Opts) ->
emqx_cluster_rpc:multicall(emqx_resource, create_local, [
InstId, Group, ResourceType, Config, Opts
@ -50,7 +50,7 @@ create(InstId, Group, ResourceType, Config, Opts) ->
resource_type(),
resource_config()
) ->
emqx_cluster_rpc:multicall_return(resource_data()).
ok | {error, Reason :: term()}.
create_dry_run(ResourceType, Config) ->
emqx_cluster_rpc:multicall(emqx_resource, create_dry_run_local, [ResourceType, Config]).
@ -60,16 +60,14 @@ create_dry_run(ResourceType, Config) ->
resource_config(),
create_opts()
) ->
emqx_cluster_rpc:multicall_return(resource_data()).
{ok, resource_data()} | {error, Reason :: term()}.
recreate(InstId, ResourceType, Config, Opts) ->
emqx_cluster_rpc:multicall(emqx_resource, recreate_local, [InstId, ResourceType, Config, Opts]).
-spec remove(instance_id()) ->
emqx_cluster_rpc:multicall_return(ok).
-spec remove(instance_id()) -> ok | {error, Reason :: term()}.
remove(InstId) ->
emqx_cluster_rpc:multicall(emqx_resource, remove_local, [InstId]).
-spec reset_metrics(instance_id()) ->
emqx_cluster_rpc:multicall_return(ok).
-spec reset_metrics(instance_id()) -> ok | {error, any()}.
reset_metrics(InstId) ->
emqx_cluster_rpc:multicall(emqx_resource, reset_metrics_local, [InstId]).

View File

@ -321,7 +321,7 @@ replace_sql_clrf(#{<<"sql">> := SQL} = Params) ->
end.
'/rules/:id/reset_metrics'(put, #{bindings := #{id := RuleId}}) ->
case emqx_rule_engine_proto_v1:reset_metrics(RuleId) of
{ok, _TxnId, _Result} ->
ok ->
{200, <<"Reset Success">>};
Failed ->
{400, #{

View File

@ -30,7 +30,6 @@
introduced_in() ->
"5.0.0".
-spec reset_metrics(rule_id()) ->
emqx_cluster_rpc:multicall_return(ok).
-spec reset_metrics(rule_id()) -> ok | {error, any()}.
reset_metrics(RuleId) ->
emqx_cluster_rpc:multicall(emqx_rule_engine, reset_metrics_for_rule, [RuleId]).