Merge pull request #8182 from zmstone/0611-fix-config-update-race-condition

fix(cluster_rpc): config update race condition
This commit is contained in:
Zaiming (Stone) Shi 2022-06-13 11:46:08 +01:00 committed by GitHub
commit 8518e71db1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 283 additions and 222 deletions

View File

@ -0,0 +1,42 @@
## This is a fast-build Dockerfile only for testing
FROM ubuntu:20.04
ARG PROFILE=emqx
RUN apt-get update; \
apt-get install -y --no-install-recommends ca-certificates procps; \
rm -rf /var/lib/apt/lists/*
RUN mkdir /opt/emqx
RUN date > /opt/emqx/BUILD_TIME
COPY _build/${PROFILE}/rel/emqx /opt/emqx
RUN ln -s /opt/emqx/bin/* /usr/local/bin/
COPY deploy/docker/docker-entrypoint.sh /usr/bin/
WORKDIR /opt/emqx
RUN groupadd -r -g 1000 emqx; \
useradd -r -m -u 1000 -g emqx emqx; \
chgrp -Rf emqx /opt/emqx; \
chmod -Rf g+w /opt/emqx; \
chown -Rf emqx /opt/emqx
USER emqx
VOLUME ["/opt/emqx/log", "/opt/emqx/data"]
# emqx will occupy these port:
# - 1883 port for MQTT
# - 8081 for mgmt API
# - 8083 for WebSocket/HTTP
# - 8084 for WSS/HTTPS
# - 8883 port for MQTT(SSL)
# - 11883 port for internal MQTT/TCP
# - 18083 for dashboard
# - 4370 default Erlang distrbution port
# - 5369 for backplain gen_rpc
EXPOSE 1883 8081 8083 8084 8883 11883 18083 4370 5369
ENTRYPOINT ["/usr/bin/docker-entrypoint.sh"]
CMD ["/opt/emqx/bin/emqx", "foreground"]

View File

@ -0,0 +1,4 @@
*
!_build/emqx
!_build/emqx-enterprise
!deploy

View File

@ -233,7 +233,8 @@ put(Config) ->
erase(RootName) ->
persistent_term:erase(?PERSIS_KEY(?CONF, bin(RootName))),
persistent_term:erase(?PERSIS_KEY(?RAW_CONF, bin(RootName))).
persistent_term:erase(?PERSIS_KEY(?RAW_CONF, bin(RootName))),
ok.
-spec put(emqx_map_lib:config_key_path(), term()) -> ok.
put(KeyPath, Config) ->

View File

@ -163,8 +163,7 @@ get_counters(Name, Id) ->
reset_counters(Name, Id) ->
Indexes = maps:values(get_indexes(Name, Id)),
Ref = get_ref(Name, Id),
[counters:put(Ref, Idx, 0) || Idx <- Indexes],
ok.
lists:foreach(fun(Idx) -> counters:put(Ref, Idx, 0) end, Indexes).
-spec get_metrics(handler_name(), metric_id()) -> metrics().
get_metrics(Name, Id) ->

View File

@ -53,8 +53,7 @@
-define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc").
%% List of known functions also known to do RPC:
-define(RPC_FUNCTIONS,
"emqx_cluster_rpc:multicall/3, emqx_cluster_rpc:multicall/5, "
"emqx_plugin_libs_rule:cluster_call/3"
"emqx_cluster_rpc:multicall/3, emqx_cluster_rpc:multicall/5"
).
%% List of functions in the RPC backend modules that we can ignore:
@ -63,11 +62,9 @@
%% List of business-layer functions that are exempt from the checks:
%% erlfmt-ignore
-define(EXEMPTIONS,
"emqx_mgmt_api:do_query/6," % Reason: legacy code. A fun and a QC query are
"emqx_mgmt_api:do_query/6" % Reason: legacy code. A fun and a QC query are
% passed in the args, it's futile to try to statically
% check it
"emqx_plugin_libs_rule:cluster_call/3" % Reason: some sort of external plugin API that we
% don't want to break?
).
-define(XREF, myxref).

View File

@ -30,7 +30,11 @@
skip_failed_commit/1,
fast_forward_to_commit/2
]).
-export([get_node_tnx_id/1, latest_tnx_id/0]).
-export([
get_node_tnx_id/1,
latest_tnx_id/0,
make_initiate_call_req/3
]).
-export([
init/1,
@ -44,7 +48,7 @@
-export([get_tables_status/0]).
-export_type([txn_id/0, succeed_num/0, multicall_return/1, multicall_return/0]).
-export_type([tnx_id/0, succeed_num/0]).
-ifdef(TEST).
-compile(export_all).
@ -56,19 +60,21 @@
-include_lib("emqx/include/logger.hrl").
-include("emqx_conf.hrl").
-define(INITIATE(MFA, LatestIdLastSeen), {initiate, MFA, LatestIdLastSeen}).
-define(CATCH_UP, catch_up).
-define(TIMEOUT, timer:minutes(1)).
-type txn_id() :: pos_integer().
-type tnx_id() :: pos_integer().
-type succeed_num() :: pos_integer() | all.
-type multicall_return(Result) ::
{ok, txn_id(), Result}
| {error, term()}
| {retry, txn_id(), Result, node()}.
{ok, tnx_id(), Result}
| {init_failure, term()}
| {peers_lagging, tnx_id(), Result, [node()]}.
-type multicall_return() :: multicall_return(_).
-type init_call_req() :: ?INITIATE({module(), atom(), list()}, tnx_id()).
%%%===================================================================
%%% API
@ -102,27 +108,73 @@ start_link(Node, Name, RetryMs) ->
{error, Reason}
end.
%% @doc return {ok, TnxId, MFARes} the first MFA result when all MFA run ok.
%% return {error, MFARes} when the first MFA result is no ok or {ok, term()}.
%% return {retry, TnxId, MFARes, Nodes} when some Nodes failed and some Node ok.
-spec multicall(module(), atom(), list()) -> multicall_return().
%% @doc Initiate a local call (or core node),
%% then async-ly replicate the call to peer nodes in the cluster.
%% The evaluation result of the provided MFA is returned,
%% the result is expected to be `ok | {ok, _}' to indicate success,
%% and `{error, _}' to indicate failure.
%%
%% The excpetion of the MFA evaluation is captured and translated
%% into an `{error, _}' tuple.
%% This call tries to wait for all peer nodes to be in-sync before
%% returning the result.
%%
%% In case of partial success, an `error' level log is emitted
%% but the initial localy apply result is returned.
-spec multicall(module(), atom(), list()) -> term().
multicall(M, F, A) ->
multicall(M, F, A, all, timer:minutes(2)).
-spec multicall(module(), atom(), list(), succeed_num(), timeout()) -> multicall_return().
multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNum >= 1 ->
MFA = {initiate, {M, F, A}},
-spec multicall(module(), atom(), list(), succeed_num(), timeout()) -> term().
multicall(M, F, A, RequiredSyncs, Timeout) when RequiredSyncs =:= all orelse RequiredSyncs >= 1 ->
case do_multicall(M, F, A, RequiredSyncs, Timeout) of
{ok, _TxnId, Result} ->
Result;
{init_failure, Error} ->
Error;
{peers_lagging, TnxId, Res, Nodes} ->
%% The init MFA return ok, but some other nodes failed.
?SLOG(error, #{
msg => "cluster_rpc_peers_lagging",
lagging_nodes => Nodes,
tnx_id => TnxId
}),
Res
end.
%% Return {ok, TnxId, MFARes} the first MFA result when all MFA run ok.
%% return {init_failure, Error} when the initial MFA result is no ok or {ok, term()}.
%% return {peers_lagging, TnxId, MFARes, Nodes} when some Nodes failed and some Node ok.
-spec do_multicall(module(), atom(), list(), succeed_num(), timeout()) -> multicall_return().
do_multicall(M, F, A, RequiredSyncs, Timeout) when
RequiredSyncs =:= all orelse RequiredSyncs >= 1
->
%% Idealy 'LatestId' should be provided by the multicall originator,
%% which is the viewer of the state e.g.
%% * Sysadmin who issues CLI-commands or REST-API calls to make config changes
%% * Dashboard viewer who is making decision based on what they can see from the UI
%% To reach the ideal state, it would require adding transaction ID to each and
%% every view/GET requests and also provide the ID as a part of the view/GET responses.
%%
%% To keep things simple, we try to get the 'old' view when a multicall request
%% is received as early as possible.
%%
%% Reason to do this:
%% The 'initiate' call handler tries to take a table lock (cluster-wide) before
%% bumping the transaction ID. While waiting for the lock, the ID might have been
%% bumpped by another node in the cluster.
InitReq = make_initiate_call_req(M, F, A),
Begin = erlang:monotonic_time(),
InitRes =
case mria_rlog:role() of
core ->
gen_server:call(?MODULE, MFA, Timeout);
gen_server:call(?MODULE, InitReq, Timeout);
replicant ->
%% the initiate transaction must happened on core node
%% make sure MFA(in the transaction) and the transaction on the same node
%% don't need rpc again inside transaction.
case mria_status:upstream_node(?CLUSTER_RPC_SHARD) of
{ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout);
{ok, Node} -> gen_server:call({?MODULE, Node}, InitReq, Timeout);
disconnected -> {error, disconnected}
end
end,
@ -132,23 +184,23 @@ multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNu
RetryTimeout = ceil(3 * max(MinDelay, get_retry_ms())),
OkOrFailed =
case InitRes of
{ok, _TnxId, _} when RequireNum =:= 1 ->
{ok, _TnxId, _} when RequiredSyncs =:= 1 ->
ok;
{ok, TnxId, _} when RequireNum =:= all ->
{ok, TnxId, _} when RequiredSyncs =:= all ->
wait_for_all_nodes_commit(TnxId, MinDelay, RetryTimeout);
{ok, TnxId, _} when is_integer(RequireNum) ->
wait_for_nodes_commit(RequireNum, TnxId, MinDelay, RetryTimeout);
{ok, TnxId, _} when is_integer(RequiredSyncs) ->
wait_for_nodes_commit(RequiredSyncs, TnxId, MinDelay, RetryTimeout);
Error ->
Error
end,
case OkOrFailed of
ok ->
InitRes;
{error, Error0} ->
{error, Error0};
{retry, Node0} ->
{init_failure, Error0} ->
{init_failure, Error0};
{peers_lagging, Nodes} ->
{ok, TnxId0, MFARes} = InitRes,
{retry, TnxId0, MFARes, Node0}
{peers_lagging, TnxId0, MFARes, Nodes}
end.
-spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}.
@ -167,6 +219,11 @@ latest_tnx_id() ->
{atomic, TnxId} = transaction(fun get_latest_id/0, []),
TnxId.
-spec make_initiate_call_req(module(), atom(), list()) -> init_call_req().
make_initiate_call_req(M, F, A) ->
TnxId = get_latest_id(dirty),
?INITIATE({M, F, A}, TnxId).
-spec get_node_tnx_id(node()) -> integer().
get_node_tnx_id(Node) ->
case mnesia:wread({?CLUSTER_COMMIT, Node}) of
@ -232,12 +289,12 @@ handle_call(reset, _From, State) ->
_ = mria:clear_table(?CLUSTER_COMMIT),
_ = mria:clear_table(?CLUSTER_MFA),
{reply, ok, State, {continue, ?CATCH_UP}};
handle_call({initiate, MFA}, _From, State = #{node := Node}) ->
case transaction(fun init_mfa/2, [Node, MFA]) of
handle_call(?INITIATE(MFA, LatestId), _From, State = #{node := Node}) ->
case transaction(fun init_mfa/3, [Node, MFA, LatestId]) of
{atomic, {ok, TnxId, Result}} ->
{reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}};
{aborted, Reason} ->
{reply, {error, Reason}, State, {continue, ?CATCH_UP}}
{aborted, Error} ->
{reply, {init_failure, Error}, State, {continue, ?CATCH_UP}}
end;
handle_call(skip_failed_commit, _From, State = #{node := Node}) ->
Timeout = catch_up(State, true),
@ -273,7 +330,7 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) ->
{atomic, caught_up} ->
?TIMEOUT;
{atomic, {still_lagging, NextId, MFA}} ->
{Succeed, _} = apply_mfa(NextId, MFA),
{Succeed, _} = apply_mfa(NextId, MFA, catch_up),
case Succeed orelse SkipResult of
true ->
case transaction(fun commit/2, [Node, NextId]) of
@ -316,35 +373,6 @@ read_next_mfa(Node) ->
[#cluster_rpc_mfa{mfa = MFA}] -> {still_lagging, NextId, MFA}
end.
do_catch_up(ToTnxId, Node) ->
case mnesia:wread({?CLUSTER_COMMIT, Node}) of
[] ->
commit(Node, ToTnxId),
caught_up;
[#cluster_rpc_commit{tnx_id = LastAppliedId}] when ToTnxId =:= LastAppliedId ->
caught_up;
[#cluster_rpc_commit{tnx_id = LastAppliedId}] when ToTnxId > LastAppliedId ->
CurTnxId = LastAppliedId + 1,
[#cluster_rpc_mfa{mfa = MFA}] = mnesia:read(?CLUSTER_MFA, CurTnxId),
case apply_mfa(CurTnxId, MFA) of
{true, _Result} -> ok = commit(Node, CurTnxId);
{false, Error} -> mnesia:abort(Error)
end;
[#cluster_rpc_commit{tnx_id = LastAppliedId}] ->
Reason = lists:flatten(
io_lib:format(
"~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)",
[Node, LastAppliedId, ToTnxId]
)
),
?SLOG(error, #{
msg => "catch_up_failed!",
last_applied_id => LastAppliedId,
to_tnx_id => ToTnxId
}),
mnesia:abort(Reason)
end.
commit(Node, TnxId) ->
ok = mnesia:write(?CLUSTER_COMMIT, #cluster_rpc_commit{node = Node, tnx_id = TnxId}, write).
@ -365,15 +393,24 @@ do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) ->
end.
get_latest_id() ->
case mnesia:last(?CLUSTER_MFA) of
get_latest_id(tnx).
get_latest_id(IsolationLevel) ->
F =
case IsolationLevel of
tnx -> fun mnesia:last/1;
dirty -> fun mnesia:dirty_last/1
end,
case F(?CLUSTER_MFA) of
'$end_of_table' -> 0;
Id -> Id
end.
init_mfa(Node, MFA) ->
init_mfa(Node, MFA, LatestIdLastSeen) ->
mnesia:write_lock_table(?CLUSTER_MFA),
LatestId = get_latest_id(),
ok = do_catch_up_in_one_trans(LatestId, Node),
case LatestIdLastSeen =:= LatestId of
true ->
TnxId = LatestId + 1,
MFARec = #cluster_rpc_mfa{
tnx_id = TnxId,
@ -383,15 +420,17 @@ init_mfa(Node, MFA) ->
},
ok = mnesia:write(?CLUSTER_MFA, MFARec, write),
ok = commit(Node, TnxId),
case apply_mfa(TnxId, MFA) of
case apply_mfa(TnxId, MFA, init) of
{true, Result} -> {ok, TnxId, Result};
{false, Error} -> mnesia:abort(Error)
end.
do_catch_up_in_one_trans(LatestId, Node) ->
case do_catch_up(LatestId, Node) of
caught_up -> ok;
ok -> do_catch_up_in_one_trans(LatestId, Node)
end;
false ->
?SLOG(error, #{
msg => stale_view_of_cluster_state,
tnx_id => LatestId,
last_seen_tnx_id => LatestIdLastSeen
}),
mnesia:abort({error, stale_view_of_cluster_state})
end.
transaction(Func, Args) ->
@ -433,7 +472,7 @@ trans_query(TnxId) ->
-define(TO_BIN(_B_), iolist_to_binary(io_lib:format("~p", [_B_]))).
apply_mfa(TnxId, {M, F, A}) ->
apply_mfa(TnxId, {M, F, A}, Kind) ->
Res =
try
erlang:apply(M, F, A)
@ -444,7 +483,7 @@ apply_mfa(TnxId, {M, F, A}) ->
{error, #{exception => Class, reason => Reason, stacktrace => Stacktrace}}
end,
%% Do not log args as it might be sensitive information
Meta = #{tnx_id => TnxId, entrypoint => format_mfa(M, F, length(A))},
Meta = #{kind => Kind, tnx_id => TnxId, entrypoint => format_mfa(M, F, length(A))},
IsSuccess = is_success(Res),
log_and_alarm(IsSuccess, Res, Meta),
{IsSuccess, Res}.
@ -475,21 +514,21 @@ wait_for_all_nodes_commit(TnxId, Delay, Remain) ->
[] ->
ok;
Nodes ->
{retry, Nodes}
{peers_lagging, Nodes}
end.
wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain) ->
wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) ->
ok = timer:sleep(Delay),
case length(synced_nodes(TnxId)) >= RequiredNum of
case length(synced_nodes(TnxId)) >= RequiredSyncs of
true ->
ok;
false when Remain > 0 ->
wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain - Delay);
wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain - Delay);
false ->
case lagging_node(TnxId) of
%% All commit but The succeedNum > length(nodes()).
[] -> ok;
Nodes -> {retry, Nodes}
Nodes -> {peers_lagging, Nodes}
end
end.

View File

@ -92,7 +92,7 @@ get_node_and_config(KeyPath) ->
) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update(KeyPath, UpdateReq, Opts) ->
check_cluster_rpc_result(emqx_conf_proto_v1:update(KeyPath, UpdateReq, Opts)).
emqx_conf_proto_v1:update(KeyPath, UpdateReq, Opts).
%% @doc Update the specified node's key path in local-override.conf.
-spec update(
@ -111,7 +111,7 @@ update(Node, KeyPath, UpdateReq, Opts) ->
-spec remove(emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
remove(KeyPath, Opts) ->
check_cluster_rpc_result(emqx_conf_proto_v1:remove_config(KeyPath, Opts)).
emqx_conf_proto_v1:remove_config(KeyPath, Opts).
%% @doc remove the specified node's key path in local-override.conf.
-spec remove(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
@ -125,7 +125,7 @@ remove(Node, KeyPath, Opts) ->
-spec reset(emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
reset(KeyPath, Opts) ->
check_cluster_rpc_result(emqx_conf_proto_v1:reset(KeyPath, Opts)).
emqx_conf_proto_v1:reset(KeyPath, Opts).
%% @doc reset the specified node's key path in local-override.conf.
-spec reset(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
@ -208,24 +208,6 @@ gen_example(File, SchemaModule, I18nFile, Lang) ->
Example = hocon_schema_example:gen(SchemaModule, Opts),
file:write_file(File, Example).
check_cluster_rpc_result(Result) ->
case Result of
{ok, _TnxId, Res} ->
Res;
{retry, TnxId, Res, Nodes} ->
%% The init MFA return ok, but other nodes failed.
%% We return ok and alert an alarm.
?SLOG(error, #{
msg => "failed_to_update_config_in_cluster",
nodes => Nodes,
tnx_id => TnxId
}),
Res;
%% all MFA return not ok or {ok, term()}.
{error, Error} ->
Error
end.
%% Only gen hot_conf schema, not all configuration fields.
gen_hot_conf_schema(File) ->
{ApiSpec0, Components0} = emqx_dashboard_swagger:spec(

View File

@ -61,7 +61,7 @@ get_all(KeyPath) ->
update_config_key_path(),
emqx_config:update_request(),
emqx_config:update_opts()
) -> emqx_cluster_rpc:multicall_return().
) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update(KeyPath, UpdateReq, Opts) ->
emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]).
@ -78,7 +78,7 @@ update(Node, KeyPath, UpdateReq, Opts) ->
rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000).
-spec remove_config(update_config_key_path(), emqx_config:update_opts()) ->
emqx_cluster_rpc:multicall_result().
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
remove_config(KeyPath, Opts) ->
emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]).
@ -90,7 +90,7 @@ remove_config(Node, KeyPath, Opts) ->
rpc:call(Node, emqx, remove_config, [KeyPath, Opts], 5000).
-spec reset(update_config_key_path(), emqx_config:update_opts()) ->
emqx_cluster_rpc:multicall_return().
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
reset(KeyPath, Opts) ->
emqx_cluster_rpc:multicall(emqx, reset_config, [KeyPath, Opts]).

View File

@ -69,13 +69,13 @@ t_base_test(_Config) ->
?assertEqual(emqx_cluster_rpc:status(), {atomic, []}),
Pid = self(),
MFA = {M, F, A} = {?MODULE, echo, [Pid, test]},
{ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A),
{ok, TnxId, ok} = multicall(M, F, A),
{atomic, Query} = emqx_cluster_rpc:query(TnxId),
?assertEqual(MFA, maps:get(mfa, Query)),
?assertEqual(node(), maps:get(initiator, Query)),
?assert(maps:is_key(created_at, Query)),
?assertEqual(ok, receive_msg(3, test)),
?assertEqual({ok, 2, ok}, emqx_cluster_rpc:multicall(M, F, A)),
?assertEqual({ok, 2, ok}, multicall(M, F, A)),
{atomic, Status} = emqx_cluster_rpc:status(),
case length(Status) =:= 3 of
true ->
@ -95,7 +95,7 @@ t_commit_fail_test(_Config) ->
emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(),
{M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE2)]},
{error, "MFA return not ok"} = emqx_cluster_rpc:multicall(M, F, A),
{init_failure, "MFA return not ok"} = multicall(M, F, A),
?assertEqual({atomic, []}, emqx_cluster_rpc:status()),
ok.
@ -103,7 +103,7 @@ t_commit_crash_test(_Config) ->
emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(),
{M, F, A} = {?MODULE, no_exist_function, []},
{error, {error, Meta}} = emqx_cluster_rpc:multicall(M, F, A),
{init_failure, {error, Meta}} = multicall(M, F, A),
?assertEqual(undef, maps:get(reason, Meta)),
?assertEqual(error, maps:get(exception, Meta)),
?assertEqual(true, maps:is_key(stacktrace, Meta)),
@ -114,21 +114,23 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) ->
emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(),
MFA = {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
{ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, _, ok} = multicall(M, F, A, 1, 1000),
{atomic, [Status]} = emqx_cluster_rpc:status(),
?assertEqual(MFA, maps:get(mfa, Status)),
?assertEqual(node(), maps:get(node, Status)),
erlang:send(?NODE2, test),
Res = gen_server:call(?NODE2, {initiate, {M, F, A}}),
?assertEqual({error, "MFA return not ok"}, Res),
Call = emqx_cluster_rpc:make_initiate_call_req(M, F, A),
Res = gen_server:call(?NODE2, Call),
?assertEqual({init_failure, "MFA return not ok"}, Res),
ok.
t_catch_up_status_handle_next_commit(_Config) ->
emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(),
{M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]},
{ok, 1, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, 2} = gen_server:call(?NODE2, {initiate, {M, F, A}}),
{ok, 1, ok} = multicall(M, F, A, 1, 1000),
Call = emqx_cluster_rpc:make_initiate_call_req(M, F, A),
{ok, 2} = gen_server:call(?NODE2, Call),
ok.
t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
@ -138,19 +140,19 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
ets:insert(test, {other_mfa_result, failed}),
ct:pal("111:~p~n", [ets:tab2list(cluster_rpc_commit)]),
{M, F, A} = {?MODULE, failed_on_other_recover_after_retry, [erlang:whereis(?NODE1)]},
{ok, 1, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, 1, ok} = multicall(M, F, A, 1, 1000),
ct:pal("222:~p~n", [ets:tab2list(cluster_rpc_commit)]),
ct:pal("333:~p~n", [emqx_cluster_rpc:status()]),
{atomic, [_Status | L]} = emqx_cluster_rpc:status(),
?assertEqual([], L),
ets:insert(test, {other_mfa_result, ok}),
{ok, 2, ok} = emqx_cluster_rpc:multicall(io, format, ["test"], 1, 1000),
{ok, 2, ok} = multicall(io, format, ["test"], 1, 1000),
ct:sleep(1000),
{atomic, NewStatus} = emqx_cluster_rpc:status(),
?assertEqual(3, length(NewStatus)),
Pid = self(),
MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, test]},
{ok, TnxId, ok} = emqx_cluster_rpc:multicall(M1, F1, A1),
{ok, TnxId, ok} = multicall(M1, F1, A1),
{atomic, Query} = emqx_cluster_rpc:query(TnxId),
?assertEqual(MFAEcho, maps:get(mfa, Query)),
?assertEqual(node(), maps:get(initiator, Query)),
@ -167,7 +169,7 @@ t_del_stale_mfa(_Config) ->
Ids =
[
begin
{ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A),
{ok, TnxId, ok} = multicall(M, F, A),
TnxId
end
|| _ <- Keys
@ -176,7 +178,7 @@ t_del_stale_mfa(_Config) ->
Ids2 =
[
begin
{ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A),
{ok, TnxId, ok} = multicall(M, F, A),
TnxId
end
|| _ <- Keys2
@ -203,7 +205,7 @@ t_del_stale_mfa(_Config) ->
t_skip_failed_commit(_Config) ->
emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(),
{ok, 1, ok} = emqx_cluster_rpc:multicall(io, format, ["test~n"], all, 1000),
{ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000),
ct:sleep(180),
{atomic, List1} = emqx_cluster_rpc:status(),
Node = node(),
@ -212,7 +214,7 @@ t_skip_failed_commit(_Config) ->
tnx_ids(List1)
),
{M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
{ok, 2, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, 2, ok} = multicall(M, F, A, 1, 1000),
2 = gen_server:call(?NODE2, skip_failed_commit, 5000),
{atomic, List2} = emqx_cluster_rpc:status(),
?assertEqual(
@ -224,7 +226,7 @@ t_skip_failed_commit(_Config) ->
t_fast_forward_commit(_Config) ->
emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(),
{ok, 1, ok} = emqx_cluster_rpc:multicall(io, format, ["test~n"], all, 1000),
{ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000),
ct:sleep(180),
{atomic, List1} = emqx_cluster_rpc:status(),
Node = node(),
@ -233,11 +235,11 @@ t_fast_forward_commit(_Config) ->
tnx_ids(List1)
),
{M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
{ok, 2, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, 3, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, 4, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, 5, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{retry, 6, ok, _} = emqx_cluster_rpc:multicall(M, F, A, 2, 1000),
{ok, 2, ok} = multicall(M, F, A, 1, 1000),
{ok, 3, ok} = multicall(M, F, A, 1, 1000),
{ok, 4, ok} = multicall(M, F, A, 1, 1000),
{ok, 5, ok} = multicall(M, F, A, 1, 1000),
{peers_lagging, 6, ok, _} = multicall(M, F, A, 2, 1000),
3 = gen_server:call(?NODE2, {fast_forward_to_commit, 3}, 5000),
4 = gen_server:call(?NODE2, {fast_forward_to_commit, 4}, 5000),
6 = gen_server:call(?NODE2, {fast_forward_to_commit, 7}, 5000),
@ -333,3 +335,9 @@ failed_on_other_recover_after_retry(Pid) ->
[{_, Res}] = ets:lookup(test, other_mfa_result),
Res
end.
multicall(M, F, A, N, T) ->
emqx_cluster_rpc:do_multicall(M, F, A, N, T).
multicall(M, F, A) ->
multicall(M, F, A, all, timer:minutes(2)).

View File

@ -70,6 +70,7 @@ end_per_testcase(_, _Config) ->
ok.
t_list_raw_empty(_) ->
ok = emqx_config:erase(hd(emqx_connector:config_key_path())),
Result = emqx_connector:list_raw(),
?assertEqual([], Result).

View File

@ -556,7 +556,7 @@ with_gateway(GwName0, Fun) ->
end,
case emqx_gateway:lookup(GwName) of
undefined ->
return_http_error(404, "Gateway not load");
return_http_error(404, "Gateway not loaded");
Gateway ->
Fun(GwName, Gateway)
end

View File

@ -317,7 +317,7 @@ get_plugins() ->
upload_install(post, #{body := #{<<"plugin">> := Plugin}}) when is_map(Plugin) ->
[{FileName, Bin}] = maps:to_list(maps:without([type], Plugin)),
%% File bin is too large, we use rpc:multicall instead of cluster_rpc:multicall
%% TODO what happened when a new node join in?
%% TODO what happens when a new node join in?
%% emqx_plugins_monitor should copy plugins from other core node when boot-up.
case emqx_plugins:describe(string:trim(FileName, trailing, ".tar.gz")) of
{error, #{error := "bad_info_file", return := {enoent, _}}} ->
@ -358,16 +358,11 @@ upload_install(post, #{}) ->
}}.
do_install_package(FileName, Bin) ->
{Res, _} = emqx_mgmt_api_plugins_proto_v1:install_package(FileName, Bin),
case lists:filter(fun(R) -> R =/= ok end, Res) of
[] ->
{200};
[{error, Reason} | _] ->
{400, #{
code => 'UNEXPECTED_ERROR',
message => iolist_to_binary(io_lib:format("~p", [Reason]))
}}
end.
%% TODO: handle bad nodes
{[_ | _] = Res, []} = emqx_mgmt_api_plugins_proto_v1:install_package(FileName, Bin),
%% TODO: handle non-OKs
[] = lists:filter(fun(R) -> R =/= ok end, Res),
{200}.
plugin(get, #{bindings := #{name := Name}}) ->
{Plugins, _} = emqx_mgmt_api_plugins_proto_v1:describe_package(Name),
@ -376,11 +371,11 @@ plugin(get, #{bindings := #{name := Name}}) ->
[] -> {404, #{code => 'NOT_FOUND', message => Name}}
end;
plugin(delete, #{bindings := #{name := Name}}) ->
{ok, _TnxId, Res} = emqx_mgmt_api_plugins_proto_v1:delete_package(Name),
Res = emqx_mgmt_api_plugins_proto_v1:delete_package(Name),
return(204, Res).
update_plugin(put, #{bindings := #{name := Name, action := Action}}) ->
{ok, _TnxId, Res} = emqx_mgmt_api_plugins_proto_v1:ensure_action(Name, Action),
Res = emqx_mgmt_api_plugins_proto_v1:ensure_action(Name, Action),
return(204, Res).
update_boot_order(post, #{bindings := #{name := Name}, body := Body}) ->
@ -422,7 +417,8 @@ delete_package(Name) ->
ok ->
_ = emqx_plugins:ensure_disabled(Name),
_ = emqx_plugins:ensure_uninstalled(Name),
_ = emqx_plugins:delete_package(Name);
_ = emqx_plugins:delete_package(Name),
ok;
Error ->
Error
end.
@ -430,20 +426,19 @@ delete_package(Name) ->
%% for RPC plugin update
ensure_action(Name, start) ->
_ = emqx_plugins:ensure_enabled(Name),
_ = emqx_plugins:ensure_started(Name);
_ = emqx_plugins:ensure_started(Name),
ok;
ensure_action(Name, stop) ->
_ = emqx_plugins:ensure_stopped(Name),
_ = emqx_plugins:ensure_disabled(Name);
_ = emqx_plugins:ensure_disabled(Name),
ok;
ensure_action(Name, restart) ->
_ = emqx_plugins:ensure_enabled(Name),
_ = emqx_plugins:restart(Name).
_ = emqx_plugins:restart(Name),
ok.
return(Code, ok) ->
{Code};
return(Code, {ok, Result}) ->
{Code, Result};
return(_, {error, #{error := "bad_info_file", return := {enoent, _}, path := Path}}) ->
{404, #{code => 'NOT_FOUND', message => Path}};
return(_, {error, Reason}) ->
{400, #{code => 'PARAM_ERROR', message => iolist_to_binary(io_lib:format("~p", [Reason]))}}.

View File

@ -43,11 +43,10 @@ install_package(Filename, Bin) ->
describe_package(Name) ->
rpc:multicall(emqx_mgmt_api_plugins, describe_package, [Name], 10000).
-spec delete_package(binary() | string()) -> emqx_cluster_rpc:multicall_return().
-spec delete_package(binary() | string()) -> ok | {error, any()}.
delete_package(Name) ->
emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, delete_package, [Name], all, 10000).
-spec ensure_action(binary() | string(), 'restart' | 'start' | 'stop') ->
emqx_cluster_rpc:multicall_return().
-spec ensure_action(binary() | string(), 'restart' | 'start' | 'stop') -> ok | {error, any()}.
ensure_action(Name, Action) ->
emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, ensure_action, [Name, Action], all, 10000).

View File

@ -61,8 +61,6 @@
can_topic_match_oneof/2
]).
-export([cluster_call/3]).
-compile({no_auto_import, [float/1]}).
-define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})").
@ -307,7 +305,3 @@ can_topic_match_oneof(Topic, Filters) ->
end,
Filters
).
cluster_call(Module, Func, Args) ->
{ok, _TnxId, Result} = emqx_cluster_rpc:multicall(Module, Func, Args),
Result.

View File

@ -337,7 +337,7 @@ do_ensure_started(NameVsn) ->
).
%% try the function, catch 'throw' exceptions as normal 'error' return
%% other exceptions with stacktrace returned.
%% other exceptions with stacktrace logged.
tryit(WhichOp, F) ->
try
F()
@ -648,7 +648,7 @@ put_config(Key, Value) when is_atom(Key) ->
put_config([Key], Value);
put_config(Path, Values) when is_list(Path) ->
Opts = #{rawconf_with_defaults => true, override_to => cluster},
case emqx:update_config([?CONF_ROOT | Path], bin_key(Values), Opts) of
case emqx_conf:update([?CONF_ROOT | Path], bin_key(Values), Opts) of
{ok, _} -> ok;
Error -> Error
end.

View File

@ -125,20 +125,10 @@ purge_test() ->
meck_emqx() ->
meck:new(emqx, [unstick, passthrough]),
meck:expect(
emqx,
update_config,
emqx_conf,
update,
fun(Path, Values, _Opts) ->
emqx_config:put(Path, Values)
end
),
%meck:expect(emqx, get_config,
% fun(KeyPath, Default) ->
% Map = emqx:get_raw_config(KeyPath, Default),
% Map1 = emqx_map_lib:safe_atom_key_map(Map),
% case Map1 of
% #{states := Plugins} ->
% Map1#{states => [emqx_map_lib:safe_atom_key_map(P) ||P <- Plugins]};
% _ -> Map1
% end
% end),
ok.

View File

@ -174,7 +174,7 @@ create(InstId, Group, ResourceType, Config) ->
-spec create(instance_id(), resource_group(), resource_type(), resource_config(), create_opts()) ->
{ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
create(InstId, Group, ResourceType, Config, Opts) ->
wrap_rpc(emqx_resource_proto_v1:create(InstId, Group, ResourceType, Config, Opts)).
emqx_resource_proto_v1:create(InstId, Group, ResourceType, Config, Opts).
% --------------------------------------------
-spec create_local(instance_id(), resource_group(), resource_type(), resource_config()) ->
@ -196,7 +196,7 @@ create_local(InstId, Group, ResourceType, Config, Opts) ->
-spec create_dry_run(resource_type(), resource_config()) ->
ok | {error, Reason :: term()}.
create_dry_run(ResourceType, Config) ->
wrap_rpc(emqx_resource_proto_v1:create_dry_run(ResourceType, Config)).
emqx_resource_proto_v1:create_dry_run(ResourceType, Config).
-spec create_dry_run_local(resource_type(), resource_config()) ->
ok | {error, Reason :: term()}.
@ -211,7 +211,7 @@ recreate(InstId, ResourceType, Config) ->
-spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) ->
{ok, resource_data()} | {error, Reason :: term()}.
recreate(InstId, ResourceType, Config, Opts) ->
wrap_rpc(emqx_resource_proto_v1:recreate(InstId, ResourceType, Config, Opts)).
emqx_resource_proto_v1:recreate(InstId, ResourceType, Config, Opts).
-spec recreate_local(instance_id(), resource_type(), resource_config()) ->
{ok, resource_data()} | {error, Reason :: term()}.
@ -225,7 +225,7 @@ recreate_local(InstId, ResourceType, Config, Opts) ->
-spec remove(instance_id()) -> ok | {error, Reason :: term()}.
remove(InstId) ->
wrap_rpc(emqx_resource_proto_v1:remove(InstId)).
emqx_resource_proto_v1:remove(InstId).
-spec remove_local(instance_id()) -> ok | {error, Reason :: term()}.
remove_local(InstId) ->
@ -237,7 +237,7 @@ reset_metrics_local(InstId) ->
-spec reset_metrics(instance_id()) -> ok | {error, Reason :: term()}.
reset_metrics(InstId) ->
wrap_rpc(emqx_resource_proto_v1:reset_metrics(InstId)).
emqx_resource_proto_v1:reset_metrics(InstId).
%% =================================================================================
-spec query(instance_id(), Request :: term()) -> Result :: term().
@ -430,11 +430,5 @@ inc_metrics_funcs(InstId) ->
safe_apply(Func, Args) ->
?SAFE_CALL(erlang:apply(Func, Args)).
wrap_rpc(Ret) ->
case Ret of
{ok, _TxnId, Result} -> Result;
Failed -> Failed
end.
query_error(Reason, Msg) ->
{error, {?MODULE, #{reason => Reason, msg => Msg}}}.

View File

@ -40,7 +40,7 @@ introduced_in() ->
resource_config(),
create_opts()
) ->
emqx_cluster_rpc:multicall_return(resource_data()).
{ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
create(InstId, Group, ResourceType, Config, Opts) ->
emqx_cluster_rpc:multicall(emqx_resource, create_local, [
InstId, Group, ResourceType, Config, Opts
@ -50,7 +50,7 @@ create(InstId, Group, ResourceType, Config, Opts) ->
resource_type(),
resource_config()
) ->
emqx_cluster_rpc:multicall_return(resource_data()).
ok | {error, Reason :: term()}.
create_dry_run(ResourceType, Config) ->
emqx_cluster_rpc:multicall(emqx_resource, create_dry_run_local, [ResourceType, Config]).
@ -60,16 +60,14 @@ create_dry_run(ResourceType, Config) ->
resource_config(),
create_opts()
) ->
emqx_cluster_rpc:multicall_return(resource_data()).
{ok, resource_data()} | {error, Reason :: term()}.
recreate(InstId, ResourceType, Config, Opts) ->
emqx_cluster_rpc:multicall(emqx_resource, recreate_local, [InstId, ResourceType, Config, Opts]).
-spec remove(instance_id()) ->
emqx_cluster_rpc:multicall_return(ok).
-spec remove(instance_id()) -> ok | {error, Reason :: term()}.
remove(InstId) ->
emqx_cluster_rpc:multicall(emqx_resource, remove_local, [InstId]).
-spec reset_metrics(instance_id()) ->
emqx_cluster_rpc:multicall_return(ok).
-spec reset_metrics(instance_id()) -> ok | {error, any()}.
reset_metrics(InstId) ->
emqx_cluster_rpc:multicall(emqx_resource, reset_metrics_local, [InstId]).

View File

@ -321,7 +321,7 @@ replace_sql_clrf(#{<<"sql">> := SQL} = Params) ->
end.
'/rules/:id/reset_metrics'(put, #{bindings := #{id := RuleId}}) ->
case emqx_rule_engine_proto_v1:reset_metrics(RuleId) of
{ok, _TxnId, _Result} ->
ok ->
{200, <<"Reset Success">>};
Failed ->
{400, #{

View File

@ -30,7 +30,6 @@
introduced_in() ->
"5.0.0".
-spec reset_metrics(rule_id()) ->
emqx_cluster_rpc:multicall_return(ok).
-spec reset_metrics(rule_id()) -> ok | {error, any()}.
reset_metrics(RuleId) ->
emqx_cluster_rpc:multicall(emqx_rule_engine, reset_metrics_for_rule, [RuleId]).

View File

@ -0,0 +1,28 @@
#!/usr/bin/env bash
set -euo pipefail
set -x
PROFILE="$1"
COMPILE="${2:-no}"
DISTRO="$(./scripts/get-distro.sh)"
PKG_VSN="${PKG_VSN:-$(./pkg-vsn.sh "$PROFILE")}"
case "$DISTRO" in
ubuntu20*)
EMQX_DOCKERFILE="Dockerfile.ubuntu20.04.runner"
;;
*)
echo "sorry, no support for $DISTRO yet"
exit 1
esac
if [ "$COMPILE" = '--compile' ]; then
make "$PROFILE"
sync
fi
export DOCKER_BUILDKIT=1
docker build --build-arg PROFILE="${PROFILE}" \
-t "emqx/emqx:${PKG_VSN}-${DISTRO}" \
-f "$EMQX_DOCKERFILE" .

View File

@ -2,14 +2,6 @@
set -euo pipefail
## This script takes the first argument as docker image name,
## starts two containers running with the built code mount
## into docker containers.
##
## NOTE: containers are not instructed to rebuild emqx,
## Please use a docker image which is compatible with
## the docker host.
##
## EMQX can only start with longname (https://erlang.org/doc/reference_manual/distributed.html)
## The host name part of EMQX's node name has to be static, this means we should either
## pre-assign static IP for containers, or ensure containers can communiate with each other by name
@ -19,7 +11,6 @@ set -euo pipefail
cd -P -- "$(dirname -- "$0")/.."
IMAGE="${1}"
PROJ_DIR="$(pwd)"
NET='emqx.io'
NODE1="node1.$NET"
@ -35,23 +26,23 @@ docker network create "$NET"
docker run -d -t --restart=always --name "$NODE1" \
--net "$NET" \
-e EMQX_LOG__CONSOLE_HANDLER__LEVEL=debug \
-e EMQX_NODE_NAME="emqx@$NODE1" \
-e EMQX_NODE_COOKIE="$COOKIE" \
-p 18083:18083 \
-v "$PROJ_DIR"/_build/emqx/rel/emqx:/built \
"$IMAGE" sh -c 'cp -r /built /emqx && /emqx/bin/emqx console'
"$IMAGE"
docker run -d -t --restart=always --name "$NODE2" \
--net "$NET" \
-e EMQX_LOG__CONSOLE_HANDLER__LEVEL=debug \
-e EMQX_NODE_NAME="emqx@$NODE2" \
-e EMQX_NODE_COOKIE="$COOKIE" \
-p 18084:18083 \
-v "$PROJ_DIR"/_build/emqx/rel/emqx:/built \
"$IMAGE" sh -c 'cp -r /built /emqx && /emqx/bin/emqx console'
"$IMAGE"
wait (){
container="$1"
while ! docker exec "$container" /emqx/bin/emqx_ctl status >/dev/null 2>&1; do
while ! docker exec "$container" emqx_ctl status >/dev/null 2>&1; do
echo -n '.'
sleep 1
done
@ -61,4 +52,4 @@ wait $NODE1
wait $NODE2
echo
docker exec $NODE1 /emqx/bin/emqx_ctl cluster join "emqx@$NODE2"
docker exec $NODE1 emqx_ctl cluster join "emqx@$NODE2"