Merge pull request #7767 from zmstone/style-erlfmt-app-auto-sub-and-conf

Style: erlfmt app auto sub and conf
This commit is contained in:
Zaiming (Stone) Shi 2022-04-25 21:50:59 +01:00 committed by GitHub
commit 739e72c520
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1566 additions and 1038 deletions

View File

@ -120,21 +120,45 @@ t_listener_authenticator_import_users(_) ->
test_authenticator_import_users(["listeners", ?TCP_DEFAULT]). test_authenticator_import_users(["listeners", ?TCP_DEFAULT]).
t_aggregate_metrics(_) -> t_aggregate_metrics(_) ->
Metrics = #{ 'emqx@node1.emqx.io' => #{metrics => Metrics = #{
#{failed => 0,matched => 1,rate => 0.0, 'emqx@node1.emqx.io' => #{
rate_last5m => 0.0,rate_max => 0.1, metrics =>
success => 1} #{
}, failed => 0,
'emqx@node2.emqx.io' => #{metrics => matched => 1,
#{failed => 0,matched => 1,rate => 0.0, rate => 0.0,
rate_last5m => 0.0,rate_max => 0.1, rate_last5m => 0.0,
success => 1} rate_max => 0.1,
} success => 1
}, }
},
'emqx@node2.emqx.io' => #{
metrics =>
#{
failed => 0,
matched => 1,
rate => 0.0,
rate_last5m => 0.0,
rate_max => 0.1,
success => 1
}
}
},
Res = emqx_authn_api:aggregate_metrics(maps:values(Metrics)), Res = emqx_authn_api:aggregate_metrics(maps:values(Metrics)),
?assertEqual(#{metrics => ?assertEqual(
#{failed => 0,matched => 2,rate => 0.0,rate_last5m => 0.0, #{
rate_max => 0.2,success => 2}}, Res). metrics =>
#{
failed => 0,
matched => 2,
rate => 0.0,
rate_last5m => 0.0,
rate_max => 0.2,
success => 2
}
},
Res
).
test_authenticators(PathPrefix) -> test_authenticators(PathPrefix) ->
ValidConfig = emqx_authn_test_lib:http_example(), ValidConfig = emqx_authn_test_lib:http_example(),

View File

@ -449,21 +449,45 @@ t_move_source(_) ->
ok. ok.
t_aggregate_metrics(_) -> t_aggregate_metrics(_) ->
Metrics = #{ 'emqx@node1.emqx.io' => #{metrics => Metrics = #{
#{failed => 0,matched => 1,rate => 0.0, 'emqx@node1.emqx.io' => #{
rate_last5m => 0.0,rate_max => 0.1, metrics =>
success => 1} #{
}, failed => 0,
'emqx@node2.emqx.io' => #{metrics => matched => 1,
#{failed => 0,matched => 1,rate => 0.0, rate => 0.0,
rate_last5m => 0.0,rate_max => 0.1, rate_last5m => 0.0,
success => 1} rate_max => 0.1,
} success => 1
}, }
},
'emqx@node2.emqx.io' => #{
metrics =>
#{
failed => 0,
matched => 1,
rate => 0.0,
rate_last5m => 0.0,
rate_max => 0.1,
success => 1
}
}
},
Res = emqx_authn_api:aggregate_metrics(maps:values(Metrics)), Res = emqx_authn_api:aggregate_metrics(maps:values(Metrics)),
?assertEqual(#{metrics => ?assertEqual(
#{failed => 0,matched => 2,rate => 0.0,rate_last5m => 0.0, #{
rate_max => 0.2,success => 2}}, Res). metrics =>
#{
failed => 0,
matched => 2,
rate => 0.0,
rate_last5m => 0.0,
rate_max => 0.2,
success => 2
}
},
Res
).
get_sources(Result) -> get_sources(Result) ->
maps:get(<<"sources">>, jsx:decode(Result), []). maps:get(<<"sources">>, jsx:decode(Result), []).

View File

@ -1,9 +1,10 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {emqx, {path, "../emqx"}} {deps, [{emqx, {path, "../emqx"}}]}.
]}.
{shell, [ {shell, [
{apps, [emqx_auto_subscribe]} {apps, [emqx_auto_subscribe]}
]}. ]}.
{project_plugins, [erlfmt]}.

View File

@ -18,10 +18,11 @@
-behaviour(minirest_api). -behaviour(minirest_api).
-export([ api_spec/0 -export([
, paths/0 api_spec/0,
, schema/1 paths/0,
]). schema/1
]).
-export([auto_subscribe/2]). -export([auto_subscribe/2]).
@ -45,8 +46,8 @@ schema("/mqtt/auto_subscribe") ->
description => ?DESC(list_auto_subscribe_api), description => ?DESC(list_auto_subscribe_api),
responses => #{ responses => #{
200 => hoconsc:ref(emqx_auto_subscribe_schema, "auto_subscribe") 200 => hoconsc:ref(emqx_auto_subscribe_schema, "auto_subscribe")
} }
}, },
put => #{ put => #{
description => ?DESC(update_auto_subscribe_api), description => ?DESC(update_auto_subscribe_api),
'requestBody' => hoconsc:ref(emqx_auto_subscribe_schema, "auto_subscribe"), 'requestBody' => hoconsc:ref(emqx_auto_subscribe_schema, "auto_subscribe"),
@ -54,23 +55,27 @@ schema("/mqtt/auto_subscribe") ->
200 => hoconsc:ref(emqx_auto_subscribe_schema, "auto_subscribe"), 200 => hoconsc:ref(emqx_auto_subscribe_schema, "auto_subscribe"),
409 => emqx_dashboard_swagger:error_codes( 409 => emqx_dashboard_swagger:error_codes(
[?EXCEED_LIMIT], [?EXCEED_LIMIT],
?DESC(update_auto_subscribe_api_response409)) ?DESC(update_auto_subscribe_api_response409)
} )
} }
}
}. }.
%%%============================================================================================== %%%==============================================================================================
%% api apply %% api apply
auto_subscribe(get, _) -> auto_subscribe(get, _) ->
{200, emqx_auto_subscribe:list()}; {200, emqx_auto_subscribe:list()};
auto_subscribe(put, #{body := #{}}) -> auto_subscribe(put, #{body := #{}}) ->
{400, #{code => ?BAD_REQUEST, message => <<"Request body required">>}}; {400, #{code => ?BAD_REQUEST, message => <<"Request body required">>}};
auto_subscribe(put, #{body := Params}) -> auto_subscribe(put, #{body := Params}) ->
case emqx_auto_subscribe:update(Params) of case emqx_auto_subscribe:update(Params) of
{error, quota_exceeded} -> {error, quota_exceeded} ->
Message = list_to_binary(io_lib:format("Max auto subscribe topic count is ~p", Message = list_to_binary(
[emqx_auto_subscribe:max_limit()])), io_lib:format(
"Max auto subscribe topic count is ~p",
[emqx_auto_subscribe:max_limit()]
)
),
{409, #{code => ?EXCEED_LIMIT, message => Message}}; {409, #{code => ?EXCEED_LIMIT, message => Message}};
{error, Reason} -> {error, Reason} ->
Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])), Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),

View File

@ -22,37 +22,45 @@
-export([to_topic_table/3]). -export([to_topic_table/3]).
-spec(generate(list() | map()) -> list() | map()). -spec generate(list() | map()) -> list() | map().
generate(Topics) when is_list(Topics) -> generate(Topics) when is_list(Topics) ->
[generate(Topic) || Topic <- Topics]; [generate(Topic) || Topic <- Topics];
generate(T = #{topic := Topic}) -> generate(T = #{topic := Topic}) ->
T#{placeholder => generate(Topic, [])}. T#{placeholder => generate(Topic, [])}.
-spec(to_topic_table(list(), map(), map()) -> list()). -spec to_topic_table(list(), map(), map()) -> list().
to_topic_table(PHs, ClientInfo, ConnInfo) -> to_topic_table(PHs, ClientInfo, ConnInfo) ->
Fold = fun(#{qos := Qos, rh := RH, rap := RAP, nl := NL, Fold = fun(
placeholder := PlaceHolder, topic := RawTopic #{
}, qos := Qos,
Acc) -> rh := RH,
case to_topic(PlaceHolder, ClientInfo, ConnInfo, []) of rap := RAP,
{error, Reason} -> nl := NL,
?SLOG(warning, #{msg => "auto_subscribe_ignored", placeholder := PlaceHolder,
topic => RawTopic, topic := RawTopic
reason => Reason },
}), Acc
Acc; ) ->
<<>> -> case to_topic(PlaceHolder, ClientInfo, ConnInfo, []) of
?SLOG(warning, #{msg => "auto_subscribe_ignored", {error, Reason} ->
topic => RawTopic, ?SLOG(warning, #{
reason => empty_topic msg => "auto_subscribe_ignored",
}), topic => RawTopic,
Acc; reason => Reason
Topic0 -> }),
{Topic, Opts} = emqx_topic:parse(Topic0), Acc;
[{Topic, Opts#{qos => Qos, rh => RH, rap => RAP, nl => NL}} | Acc] <<>> ->
end ?SLOG(warning, #{
end, msg => "auto_subscribe_ignored",
topic => RawTopic,
reason => empty_topic
}),
Acc;
Topic0 ->
{Topic, Opts} = emqx_topic:parse(Topic0),
[{Topic, Opts#{qos => Qos, rh => RH, rap => RAP, nl => NL}} | Acc]
end
end,
lists:foldl(Fold, [], PHs). lists:foldl(Fold, [], PHs).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -21,10 +21,12 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/emqx_placeholder.hrl"). -include_lib("emqx/include/emqx_placeholder.hrl").
-export([ namespace/0 -export([
, roots/0 namespace/0,
, fields/1 roots/0,
, desc/1]). fields/1,
desc/1
]).
namespace() -> "auto_subscribe". namespace() -> "auto_subscribe".
@ -32,27 +34,41 @@ roots() ->
["auto_subscribe"]. ["auto_subscribe"].
fields("auto_subscribe") -> fields("auto_subscribe") ->
[ {topics, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "topic")), [
#{desc => ?DESC(auto_subscribe)})} {topics,
hoconsc:mk(
hoconsc:array(hoconsc:ref(?MODULE, "topic")),
#{desc => ?DESC(auto_subscribe)}
)}
]; ];
fields("topic") -> fields("topic") ->
[ {topic, sc(binary(), #{ [
required => true, {topic,
example => topic_example(), sc(binary(), #{
desc => ?DESC("topic")})} required => true,
, {qos, sc(emqx_schema:qos(), #{ example => topic_example(),
default => 0, desc => ?DESC("topic")
desc => ?DESC("qos")})} })},
, {rh, sc(range(0,2), #{ {qos,
default => 0, sc(emqx_schema:qos(), #{
desc => ?DESC("rh")})} default => 0,
, {rap, sc(range(0, 1), #{ desc => ?DESC("qos")
default => 0, })},
desc => ?DESC("rap")})} {rh,
, {nl, sc(range(0, 1), #{ sc(range(0, 2), #{
default => 0, default => 0,
desc => ?DESC(nl)})} desc => ?DESC("rh")
})},
{rap,
sc(range(0, 1), #{
default => 0,
desc => ?DESC("rap")
})},
{nl,
sc(range(0, 1), #{
default => 0,
desc => ?DESC(nl)
})}
]. ].
desc("auto_subscribe") -> ?DESC("auto_subscribe"); desc("auto_subscribe") -> ?DESC("auto_subscribe");
@ -60,10 +76,8 @@ desc("topic") -> ?DESC("topic");
desc(_) -> undefined. desc(_) -> undefined.
topic_example() -> topic_example() ->
<<"/clientid/", ?PH_S_CLIENTID, <<"/clientid/", ?PH_S_CLIENTID, "/username/", ?PH_S_USERNAME, "/host/", ?PH_S_HOST, "/port/",
"/username/", ?PH_S_USERNAME, ?PH_S_PORT>>.
"/host/", ?PH_S_HOST,
"/port/", ?PH_S_PORT>>.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions

View File

@ -28,9 +28,11 @@ start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []). supervisor:start_link({local, ?SERVER}, ?MODULE, []).
init([]) -> init([]) ->
SupFlags = #{strategy => one_for_all, SupFlags = #{
intensity => 0, strategy => one_for_all,
period => 1}, intensity => 0,
period => 1
},
ChildSpecs = [], ChildSpecs = [],
{ok, {SupFlags, ChildSpecs}}. {ok, {SupFlags, ChildSpecs}}.

View File

@ -17,13 +17,12 @@
-export([init/1]). -export([init/1]).
-spec(init(hocons:config()) -> {Module :: atom(), Config :: term()}). -spec init(hocons:config()) -> {Module :: atom(), Config :: term()}.
init(Config) -> init(Config) ->
do_init(Config). do_init(Config).
do_init(Config = #{topics := _Topics}) -> do_init(Config = #{topics := _Topics}) ->
Options = emqx_auto_subscribe_internal:init(Config), Options = emqx_auto_subscribe_internal:init(Config),
{emqx_auto_subscribe_internal, Options}; {emqx_auto_subscribe_internal, Options};
do_init(_Config) -> do_init(_Config) ->
erlang:error(not_supported). erlang:error(not_supported).

View File

@ -19,11 +19,11 @@
-export([handle/3]). -export([handle/3]).
-spec(init(Config :: map()) -> HandlerOptions :: term()). -spec init(Config :: map()) -> HandlerOptions :: term().
init(#{topics := Topics}) -> init(#{topics := Topics}) ->
emqx_auto_subscribe_placeholder:generate(Topics). emqx_auto_subscribe_placeholder:generate(Topics).
-spec(handle(ClientInfo :: map(), ConnInfo :: map(), HandlerOptions :: term()) -> -spec handle(ClientInfo :: map(), ConnInfo :: map(), HandlerOptions :: term()) ->
TopicTables :: list()). TopicTables :: list().
handle(ClientInfo, ConnInfo, PlaceHolders) -> handle(ClientInfo, ConnInfo, PlaceHolders) ->
emqx_auto_subscribe_placeholder:to_topic_table(PlaceHolders, ClientInfo, ConnInfo). emqx_auto_subscribe_placeholder:to_topic_table(PlaceHolders, ClientInfo, ConnInfo).

View File

@ -64,32 +64,34 @@ init_per_suite(Config) ->
application:load(?APP), application:load(?APP),
ok = emqx_common_test_helpers:load_config( ok = emqx_common_test_helpers:load_config(
emqx_auto_subscribe_schema, emqx_auto_subscribe_schema,
<<"auto_subscribe {\n" <<
" topics = [\n" "auto_subscribe {\n"
" {\n" " topics = [\n"
" topic = \"/c/${clientid}\"\n" " {\n"
" },\n" " topic = \"/c/${clientid}\"\n"
" {\n" " },\n"
" topic = \"/u/${username}\"\n" " {\n"
" },\n" " topic = \"/u/${username}\"\n"
" {\n" " },\n"
" topic = \"/h/${host}\"\n" " {\n"
" },\n" " topic = \"/h/${host}\"\n"
" {\n" " },\n"
" topic = \"/p/${port}\"\n" " {\n"
" },\n" " topic = \"/p/${port}\"\n"
" {\n" " },\n"
" topic = \"/client/${clientid}/username/${username}/host/${host}/port/${port}\"\n" " {\n"
" },\n" " topic = \"/client/${clientid}/username/${username}/host/${host}/port/${port}\"\n"
" {\n" " },\n"
" topic = \"/topic/simple\"\n" " {\n"
" qos = 1\n" " topic = \"/topic/simple\"\n"
" rh = 0\n" " qos = 1\n"
" rap = 0\n" " rh = 0\n"
" nl = 0\n" " rap = 0\n"
" }\n" " nl = 0\n"
" ]\n" " }\n"
" }">> " ]\n"
" }"
>>
), ),
emqx_common_test_helpers:start_apps( emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_dashboard, ?APP], [emqx_conf, emqx_dashboard, ?APP],

View File

@ -1,4 +1,3 @@
-ifndef(EMQX_CONF_HRL). -ifndef(EMQX_CONF_HRL).
-define(EMQX_CONF_HRL, true). -define(EMQX_CONF_HRL, true).

View File

@ -1,10 +1,11 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {emqx, {path, "../emqx"}} {deps, [{emqx, {path, "../emqx"}}]}.
]}.
{shell, [ {shell, [
% {config, "config/sys.config"}, % {config, "config/sys.config"},
{apps, [emqx_conf]} {apps, [emqx_conf]}
]}. ]}.
{project_plugins, [erlfmt]}.

View File

@ -22,12 +22,25 @@
%% Note: multicall functions are statically checked by %% Note: multicall functions are statically checked by
%% `emqx_bapi_trans' and `emqx_bpapi_static_checks' modules. Don't %% `emqx_bapi_trans' and `emqx_bpapi_static_checks' modules. Don't
%% forget to update it when adding or removing them here: %% forget to update it when adding or removing them here:
-export([multicall/3, multicall/5, query/1, reset/0, status/0, -export([
skip_failed_commit/1, fast_forward_to_commit/2]). multicall/3, multicall/5,
query/1,
reset/0,
status/0,
skip_failed_commit/1,
fast_forward_to_commit/2
]).
-export([get_node_tnx_id/1, latest_tnx_id/0]). -export([get_node_tnx_id/1, latest_tnx_id/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([
handle_continue/2, code_change/3]). init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
handle_continue/2,
code_change/3
]).
-export_type([txn_id/0, succeed_num/0, multicall_return/1, multicall_return/0]). -export_type([txn_id/0, succeed_num/0, multicall_return/1, multicall_return/0]).
@ -48,9 +61,10 @@
-type succeed_num() :: pos_integer() | all. -type succeed_num() :: pos_integer() | all.
-type multicall_return(Result) :: {ok, txn_id(), Result} -type multicall_return(Result) ::
| {error, term()} {ok, txn_id(), Result}
| {retry, txn_id(), Result, node()}. | {error, term()}
| {retry, txn_id(), Result, node()}.
-type multicall_return() :: multicall_return(_). -type multicall_return() :: multicall_return(_).
@ -63,13 +77,15 @@ mnesia(boot) ->
{rlog_shard, ?CLUSTER_RPC_SHARD}, {rlog_shard, ?CLUSTER_RPC_SHARD},
{storage, disc_copies}, {storage, disc_copies},
{record_name, cluster_rpc_mfa}, {record_name, cluster_rpc_mfa},
{attributes, record_info(fields, cluster_rpc_mfa)}]), {attributes, record_info(fields, cluster_rpc_mfa)}
]),
ok = mria:create_table(?CLUSTER_COMMIT, [ ok = mria:create_table(?CLUSTER_COMMIT, [
{type, set}, {type, set},
{rlog_shard, ?CLUSTER_RPC_SHARD}, {rlog_shard, ?CLUSTER_RPC_SHARD},
{storage, disc_copies}, {storage, disc_copies},
{record_name, cluster_rpc_commit}, {record_name, cluster_rpc_commit},
{attributes, record_info(fields, cluster_rpc_commit)}]). {attributes, record_info(fields, cluster_rpc_commit)}
]).
start_link() -> start_link() ->
start_link(node(), ?MODULE, get_retry_ms()). start_link(node(), ?MODULE, get_retry_ms()).
@ -97,7 +113,8 @@ multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNu
Begin = erlang:monotonic_time(), Begin = erlang:monotonic_time(),
InitRes = InitRes =
case mria_rlog:role() of case mria_rlog:role() of
core -> gen_server:call(?MODULE, MFA, Timeout); core ->
gen_server:call(?MODULE, MFA, Timeout);
replicant -> replicant ->
%% the initiate transaction must happened on core node %% the initiate transaction must happened on core node
%% make sure MFA(in the transaction) and the transaction on the same node %% make sure MFA(in the transaction) and the transaction on the same node
@ -119,11 +136,14 @@ multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNu
wait_for_all_nodes_commit(TnxId, MinDelay, RetryTimeout); wait_for_all_nodes_commit(TnxId, MinDelay, RetryTimeout);
{ok, TnxId, _} when is_integer(RequireNum) -> {ok, TnxId, _} when is_integer(RequireNum) ->
wait_for_nodes_commit(RequireNum, TnxId, MinDelay, RetryTimeout); wait_for_nodes_commit(RequireNum, TnxId, MinDelay, RetryTimeout);
Error -> Error Error ->
Error
end, end,
case OkOrFailed of case OkOrFailed of
ok -> InitRes; ok ->
{error, Error0} -> {error, Error0}; InitRes;
{error, Error0} ->
{error, Error0};
{retry, Node0} -> {retry, Node0} ->
{ok, TnxId0, MFARes} = InitRes, {ok, TnxId0, MFARes} = InitRes,
{retry, TnxId0, MFARes, Node0} {retry, TnxId0, MFARes, Node0}
@ -187,7 +207,6 @@ handle_call(reset, _From, State) ->
_ = mria:clear_table(?CLUSTER_COMMIT), _ = mria:clear_table(?CLUSTER_COMMIT),
_ = mria:clear_table(?CLUSTER_MFA), _ = mria:clear_table(?CLUSTER_MFA),
{reply, ok, State, {continue, ?CATCH_UP}}; {reply, ok, State, {continue, ?CATCH_UP}};
handle_call({initiate, MFA}, _From, State = #{node := Node}) -> handle_call({initiate, MFA}, _From, State = #{node := Node}) ->
case transaction(fun init_mfa/2, [Node, MFA]) of case transaction(fun init_mfa/2, [Node, MFA]) of
{atomic, {ok, TnxId, Result}} -> {atomic, {ok, TnxId, Result}} ->
@ -226,21 +245,25 @@ catch_up(State) -> catch_up(State, false).
catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) -> catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) ->
case transaction(fun read_next_mfa/1, [Node]) of case transaction(fun read_next_mfa/1, [Node]) of
{atomic, caught_up} -> ?TIMEOUT; {atomic, caught_up} ->
?TIMEOUT;
{atomic, {still_lagging, NextId, MFA}} -> {atomic, {still_lagging, NextId, MFA}} ->
{Succeed, _} = apply_mfa(NextId, MFA), {Succeed, _} = apply_mfa(NextId, MFA),
case Succeed orelse SkipResult of case Succeed orelse SkipResult of
true -> true ->
case transaction(fun commit/2, [Node, NextId]) of case transaction(fun commit/2, [Node, NextId]) of
{atomic, ok} -> catch_up(State, false); {atomic, ok} ->
catch_up(State, false);
Error -> Error ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "failed_to_commit_applied_call", msg => "failed_to_commit_applied_call",
applied_id => NextId, applied_id => NextId,
error => Error}), error => Error
}),
RetryMs RetryMs
end; end;
false -> RetryMs false ->
RetryMs
end; end;
{aborted, Reason} -> {aborted, Reason} ->
?SLOG(error, #{msg => "read_next_mfa_transaction_failed", error => Reason}), ?SLOG(error, #{msg => "read_next_mfa_transaction_failed", error => Reason}),
@ -256,9 +279,12 @@ read_next_mfa(Node) ->
commit(Node, TnxId), commit(Node, TnxId),
?SLOG(notice, #{ ?SLOG(notice, #{
msg => "new_node_first_catch_up_and_start_commit.", msg => "new_node_first_catch_up_and_start_commit.",
node => Node, tnx_id => TnxId}), node => Node,
tnx_id => TnxId
}),
TnxId; TnxId;
[#cluster_rpc_commit{tnx_id = LastAppliedID}] -> LastAppliedID + 1 [#cluster_rpc_commit{tnx_id = LastAppliedID}] ->
LastAppliedID + 1
end, end,
case mnesia:read(?CLUSTER_MFA, NextId) of case mnesia:read(?CLUSTER_MFA, NextId) of
[] -> caught_up; [] -> caught_up;
@ -281,8 +307,11 @@ do_catch_up(ToTnxId, Node) ->
end; end;
[#cluster_rpc_commit{tnx_id = LastAppliedId}] -> [#cluster_rpc_commit{tnx_id = LastAppliedId}] ->
Reason = lists:flatten( Reason = lists:flatten(
io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)", io_lib:format(
[Node, LastAppliedId, ToTnxId])), "~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)",
[Node, LastAppliedId, ToTnxId]
)
),
?SLOG(error, #{ ?SLOG(error, #{
msg => "catch_up_failed!", msg => "catch_up_failed!",
last_applied_id => LastAppliedId, last_applied_id => LastAppliedId,
@ -297,11 +326,13 @@ commit(Node, TnxId) ->
do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) -> do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) ->
{atomic, NodeId} = transaction(fun get_node_tnx_id/1, [Node]), {atomic, NodeId} = transaction(fun get_node_tnx_id/1, [Node]),
case NodeId >= ToTnxId of case NodeId >= ToTnxId of
true -> NodeId; true ->
NodeId;
false -> false ->
{atomic, LatestId} = transaction(fun get_latest_id/0, []), {atomic, LatestId} = transaction(fun get_latest_id/0, []),
case LatestId =< NodeId of case LatestId =< NodeId of
true -> NodeId; true ->
NodeId;
false -> false ->
catch_up(State, true), catch_up(State, true),
do_fast_forward_to_commit(ToTnxId, State) do_fast_forward_to_commit(ToTnxId, State)
@ -319,8 +350,12 @@ init_mfa(Node, MFA) ->
LatestId = get_latest_id(), LatestId = get_latest_id(),
ok = do_catch_up_in_one_trans(LatestId, Node), ok = do_catch_up_in_one_trans(LatestId, Node),
TnxId = LatestId + 1, TnxId = LatestId + 1,
MFARec = #cluster_rpc_mfa{tnx_id = TnxId, mfa = MFA, MFARec = #cluster_rpc_mfa{
initiator = Node, created_at = erlang:localtime()}, tnx_id = TnxId,
mfa = MFA,
initiator = Node,
created_at = erlang:localtime()
},
ok = mnesia:write(?CLUSTER_MFA, MFARec, write), ok = mnesia:write(?CLUSTER_MFA, MFARec, write),
ok = commit(Node, TnxId), ok = commit(Node, TnxId),
case apply_mfa(TnxId, MFA) of case apply_mfa(TnxId, MFA) of
@ -338,24 +373,35 @@ transaction(Func, Args) ->
mria:transaction(?CLUSTER_RPC_SHARD, Func, Args). mria:transaction(?CLUSTER_RPC_SHARD, Func, Args).
trans_status() -> trans_status() ->
mnesia:foldl(fun(Rec, Acc) -> mnesia:foldl(
#cluster_rpc_commit{node = Node, tnx_id = TnxId} = Rec, fun(Rec, Acc) ->
case mnesia:read(?CLUSTER_MFA, TnxId) of #cluster_rpc_commit{node = Node, tnx_id = TnxId} = Rec,
[MFARec] -> case mnesia:read(?CLUSTER_MFA, TnxId) of
#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt} = MFARec, [MFARec] ->
[#{ #cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt} =
node => Node, MFARec,
tnx_id => TnxId, [
initiator => InitNode, #{
mfa => MFA, node => Node,
created_at => CreatedAt tnx_id => TnxId,
} | Acc]; initiator => InitNode,
[] -> Acc mfa => MFA,
end end, [], ?CLUSTER_COMMIT). created_at => CreatedAt
}
| Acc
];
[] ->
Acc
end
end,
[],
?CLUSTER_COMMIT
).
trans_query(TnxId) -> trans_query(TnxId) ->
case mnesia:read(?CLUSTER_MFA, TnxId) of case mnesia:read(?CLUSTER_MFA, TnxId) of
[] -> mnesia:abort(not_found); [] ->
mnesia:abort(not_found);
[#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt}] -> [#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt}] ->
#{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt} #{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt}
end. end.
@ -364,11 +410,12 @@ trans_query(TnxId) ->
apply_mfa(TnxId, {M, F, A}) -> apply_mfa(TnxId, {M, F, A}) ->
Res = Res =
try erlang:apply(M, F, A) try
erlang:apply(M, F, A)
catch catch
throw : Reason -> throw:Reason ->
{error, #{reason => Reason}}; {error, #{reason => Reason}};
Class : Reason : Stacktrace -> Class:Reason:Stacktrace ->
{error, #{exception => Class, reason => Reason, stacktrace => Stacktrace}} {error, #{exception => Class, reason => Reason, stacktrace => Stacktrace}}
end, end,
%% Do not log args as it might be sensitive information %% Do not log args as it might be sensitive information
@ -400,19 +447,23 @@ wait_for_all_nodes_commit(TnxId, Delay, Remain) ->
[_ | _] when Remain > 0 -> [_ | _] when Remain > 0 ->
ok = timer:sleep(Delay), ok = timer:sleep(Delay),
wait_for_all_nodes_commit(TnxId, Delay, Remain - Delay); wait_for_all_nodes_commit(TnxId, Delay, Remain - Delay);
[] -> ok; [] ->
Nodes -> {retry, Nodes} ok;
Nodes ->
{retry, Nodes}
end. end.
wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain) -> wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain) ->
ok = timer:sleep(Delay), ok = timer:sleep(Delay),
case length(synced_nodes(TnxId)) >= RequiredNum of case length(synced_nodes(TnxId)) >= RequiredNum of
true -> ok; true ->
ok;
false when Remain > 0 -> false when Remain > 0 ->
wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain - Delay); wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain - Delay);
false -> false ->
case lagging_node(TnxId) of case lagging_node(TnxId) of
[] -> ok; %% All commit but The succeedNum > length(nodes()). %% All commit but The succeedNum > length(nodes()).
[] -> ok;
Nodes -> {retry, Nodes} Nodes -> {retry, Nodes}
end end
end. end.
@ -434,7 +485,7 @@ commit_status_trans(Operator, TnxId) ->
get_retry_ms() -> get_retry_ms() ->
emqx_conf:get(["node", "cluster_call", "retry_interval"], 1000). emqx_conf:get(["node", "cluster_call", "retry_interval"], 1000).
maybe_init_tnx_id(_Node, TnxId)when TnxId < 0 -> ok; maybe_init_tnx_id(_Node, TnxId) when TnxId < 0 -> ok;
maybe_init_tnx_id(Node, TnxId) -> maybe_init_tnx_id(Node, TnxId) ->
{atomic, _} = transaction(fun init_node_tnx_id/2, [Node, TnxId]), {atomic, _} = transaction(fun init_node_tnx_id/2, [Node, TnxId]),
ok. ok.

View File

@ -21,12 +21,18 @@
-include("emqx_conf.hrl"). -include("emqx_conf.hrl").
-export([start_link/0, start_link/2]). -export([start_link/0, start_link/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([
code_change/3]). init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
start_link() -> start_link() ->
MaxHistory = emqx_conf:get(["node", "cluster_call", "max_history"], 100), MaxHistory = emqx_conf:get(["node", "cluster_call", "max_history"], 100),
CleanupMs = emqx_conf:get(["node", "cluster_call", "cleanup_interval"], 5*60*1000), CleanupMs = emqx_conf:get(["node", "cluster_call", "cleanup_interval"], 5 * 60 * 1000),
start_link(MaxHistory, CleanupMs). start_link(MaxHistory, CleanupMs).
start_link(MaxHistory, CleanupMs) -> start_link(MaxHistory, CleanupMs) ->
@ -55,7 +61,6 @@ handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history
Error -> ?SLOG(error, #{msg => "del_stale_cluster_rpc_mfa_error", error => Error}) Error -> ?SLOG(error, #{msg => "del_stale_cluster_rpc_mfa_error", error => Error})
end, end,
{noreply, ensure_timer(State), hibernate}; {noreply, ensure_timer(State), hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}), ?SLOG(error, #{msg => "unexpected_info", info => Info}),
{noreply, State}. {noreply, State}.
@ -75,11 +80,15 @@ ensure_timer(State = #{cleanup_ms := Ms}) ->
%% @doc Keep the latest completed 100 records for querying and troubleshooting. %% @doc Keep the latest completed 100 records for querying and troubleshooting.
del_stale_mfa(MaxHistory) -> del_stale_mfa(MaxHistory) ->
DoneId = DoneId =
mnesia:foldl(fun(Rec, Min) -> min(Rec#cluster_rpc_commit.tnx_id, Min) end, mnesia:foldl(
infinity, ?CLUSTER_COMMIT), fun(Rec, Min) -> min(Rec#cluster_rpc_commit.tnx_id, Min) end,
infinity,
?CLUSTER_COMMIT
),
delete_stale_mfa(mnesia:last(?CLUSTER_MFA), DoneId, MaxHistory). delete_stale_mfa(mnesia:last(?CLUSTER_MFA), DoneId, MaxHistory).
delete_stale_mfa('$end_of_table', _DoneId, _Count) -> ok; delete_stale_mfa('$end_of_table', _DoneId, _Count) ->
ok;
delete_stale_mfa(CurrId, DoneId, Count) when CurrId > DoneId -> delete_stale_mfa(CurrId, DoneId, Count) when CurrId > DoneId ->
delete_stale_mfa(mnesia:prev(?CLUSTER_MFA, CurrId), DoneId, Count); delete_stale_mfa(mnesia:prev(?CLUSTER_MFA, CurrId), DoneId, Count);
delete_stale_mfa(CurrId, DoneId, Count) when Count > 0 -> delete_stale_mfa(CurrId, DoneId, Count) when Count > 0 ->

View File

@ -1,9 +1,9 @@
{application, emqx_conf, {application, emqx_conf, [
[{description, "EMQX configuration management"}, {description, "EMQX configuration management"},
{vsn, "0.1.0"}, {vsn, "0.1.0"},
{registered, []}, {registered, []},
{mod, {emqx_conf_app, []}}, {mod, {emqx_conf_app, []}},
{applications, [kernel, stdlib]}, {applications, [kernel, stdlib]},
{env, []}, {env, []},
{modules, []} {modules, []}
]}. ]}.

View File

@ -40,7 +40,8 @@ init_conf() ->
copy_override_conf_from_core_node() -> copy_override_conf_from_core_node() ->
case mria_mnesia:running_nodes() -- [node()] of case mria_mnesia:running_nodes() -- [node()] of
[] -> %% The first core nodes is self. %% The first core nodes is self.
[] ->
?SLOG(debug, #{msg => "skip_copy_overide_conf_from_core_node"}), ?SLOG(debug, #{msg => "skip_copy_overide_conf_from_core_node"}),
{ok, -1}; {ok, -1};
Nodes -> Nodes ->
@ -49,26 +50,41 @@ copy_override_conf_from_core_node() ->
NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0), NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0),
case (Failed =/= [] orelse NotReady =/= []) andalso Ready =/= [] of case (Failed =/= [] orelse NotReady =/= []) andalso Ready =/= [] of
true -> true ->
Warning = #{nodes => Nodes, failed => Failed, not_ready => NotReady, Warning = #{
msg => "ignored_bad_nodes_when_copy_init_config"}, nodes => Nodes,
failed => Failed,
not_ready => NotReady,
msg => "ignored_bad_nodes_when_copy_init_config"
},
?SLOG(warning, Warning); ?SLOG(warning, Warning);
false -> ok false ->
ok
end, end,
case Ready of case Ready of
[] -> [] ->
%% Other core nodes running but no one replicated it successfully. %% Other core nodes running but no one replicated it successfully.
?SLOG(error, #{msg => "copy_overide_conf_from_core_node_failed", ?SLOG(error, #{
nodes => Nodes, failed => Failed, not_ready => NotReady}), msg => "copy_overide_conf_from_core_node_failed",
nodes => Nodes,
failed => Failed,
not_ready => NotReady
}),
{error, "core node not ready"}; {error, "core node not ready"};
_ -> _ ->
SortFun = fun({ok, #{wall_clock := W1}}, SortFun = fun(
{ok, #{wall_clock := W2}}) -> W1 > W2 end, {ok, #{wall_clock := W1}},
{ok, #{wall_clock := W2}}
) ->
W1 > W2
end,
[{ok, Info} | _] = lists:sort(SortFun, Ready), [{ok, Info} | _] = lists:sort(SortFun, Ready),
#{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info, #{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info,
Msg = #{msg => "copy_overide_conf_from_core_node_success", node => Node}, Msg = #{msg => "copy_overide_conf_from_core_node_success", node => Node},
?SLOG(debug, Msg), ?SLOG(debug, Msg),
ok = emqx_config:save_to_override_conf(RawOverrideConf, ok = emqx_config:save_to_override_conf(
#{override_to => cluster}), RawOverrideConf,
#{override_to => cluster}
),
{ok, TnxId} {ok, TnxId}
end end
end. end.
@ -77,17 +93,19 @@ get_override_config_file() ->
Node = node(), Node = node(),
Role = mria_rlog:role(), Role = mria_rlog:role(),
case emqx_app:get_init_config_load_done() of case emqx_app:get_init_config_load_done() of
false -> {error, #{node => Node, msg => "init_conf_load_not_done"}}; false ->
{error, #{node => Node, msg => "init_conf_load_not_done"}};
true when Role =:= core -> true when Role =:= core ->
case erlang:whereis(emqx_config_handler) of case erlang:whereis(emqx_config_handler) of
undefined -> {error, #{node => Node, msg => "emqx_config_handler_not_ready"}}; undefined ->
{error, #{node => Node, msg => "emqx_config_handler_not_ready"}};
_ -> _ ->
Fun = fun() -> Fun = fun() ->
TnxId = emqx_cluster_rpc:get_node_tnx_id(Node), TnxId = emqx_cluster_rpc:get_node_tnx_id(Node),
WallClock = erlang:statistics(wall_clock), WallClock = erlang:statistics(wall_clock),
Conf = emqx_config_handler:get_raw_cluster_override_conf(), Conf = emqx_config_handler:get_raw_cluster_override_conf(),
#{wall_clock => WallClock, conf => Conf, tnx_id => TnxId, node => Node} #{wall_clock => WallClock, conf => Conf, tnx_id => TnxId, node => Node}
end, end,
case mria:ro_transaction(?CLUSTER_RPC_SHARD, Fun) of case mria:ro_transaction(?CLUSTER_RPC_SHARD, Fun) of
{atomic, Res} -> {ok, Res}; {atomic, Res} -> {ok, Res};
{aborted, Reason} -> {error, #{node => Node, msg => Reason}} {aborted, Reason} -> {error, #{node => Node, msg => Reason}}

View File

@ -15,10 +15,11 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_conf_cli). -module(emqx_conf_cli).
-export([ load/0 -export([
, admins/1 load/0,
, unload/0 admins/1,
]). unload/0
]).
-define(CMD, cluster_call). -define(CMD, cluster_call).
@ -28,65 +29,65 @@ load() ->
unload() -> unload() ->
emqx_ctl:unregister_command(?CMD). emqx_ctl:unregister_command(?CMD).
admins(["status"]) -> status(); admins(["status"]) ->
status();
admins(["skip"]) -> admins(["skip"]) ->
status(), status(),
Nodes = mria_mnesia:running_nodes(), Nodes = mria_mnesia:running_nodes(),
lists:foreach(fun emqx_cluster_rpc:skip_failed_commit/1, Nodes), lists:foreach(fun emqx_cluster_rpc:skip_failed_commit/1, Nodes),
status(); status();
admins(["skip", Node0]) -> admins(["skip", Node0]) ->
status(), status(),
Node = list_to_existing_atom(Node0), Node = list_to_existing_atom(Node0),
emqx_cluster_rpc:skip_failed_commit(Node), emqx_cluster_rpc:skip_failed_commit(Node),
status(); status();
admins(["tnxid", TnxId0]) -> admins(["tnxid", TnxId0]) ->
TnxId = list_to_integer(TnxId0), TnxId = list_to_integer(TnxId0),
emqx_ctl:print("~p~n", [emqx_cluster_rpc:query(TnxId)]); emqx_ctl:print("~p~n", [emqx_cluster_rpc:query(TnxId)]);
admins(["fast_forward"]) -> admins(["fast_forward"]) ->
status(), status(),
Nodes = mria_mnesia:running_nodes(), Nodes = mria_mnesia:running_nodes(),
TnxId = emqx_cluster_rpc:latest_tnx_id(), TnxId = emqx_cluster_rpc:latest_tnx_id(),
lists:foreach(fun(N) -> emqx_cluster_rpc:fast_forward_to_commit(N, TnxId) end, Nodes), lists:foreach(fun(N) -> emqx_cluster_rpc:fast_forward_to_commit(N, TnxId) end, Nodes),
status(); status();
admins(["fast_forward", ToTnxId]) -> admins(["fast_forward", ToTnxId]) ->
status(), status(),
Nodes = mria_mnesia:running_nodes(), Nodes = mria_mnesia:running_nodes(),
TnxId = list_to_integer(ToTnxId), TnxId = list_to_integer(ToTnxId),
lists:foreach(fun(N) -> emqx_cluster_rpc:fast_forward_to_commit(N, TnxId) end, Nodes), lists:foreach(fun(N) -> emqx_cluster_rpc:fast_forward_to_commit(N, TnxId) end, Nodes),
status(); status();
admins(["fast_forward", Node0, ToTnxId]) -> admins(["fast_forward", Node0, ToTnxId]) ->
status(), status(),
TnxId = list_to_integer(ToTnxId), TnxId = list_to_integer(ToTnxId),
Node = list_to_existing_atom(Node0), Node = list_to_existing_atom(Node0),
emqx_cluster_rpc:fast_forward_to_commit(Node, TnxId), emqx_cluster_rpc:fast_forward_to_commit(Node, TnxId),
status(); status();
admins(_) -> admins(_) ->
emqx_ctl:usage( emqx_ctl:usage(
[ [
{"cluster_call status", "status"}, {"cluster_call status", "status"},
{"cluster_call skip [node]", "increase one commit on specific node"}, {"cluster_call skip [node]", "increase one commit on specific node"},
{"cluster_call tnxid <TnxId>", "get detailed about TnxId"}, {"cluster_call tnxid <TnxId>", "get detailed about TnxId"},
{"cluster_call fast_forward [node] [tnx_id]", "fast forwards to tnx_id" } {"cluster_call fast_forward [node] [tnx_id]", "fast forwards to tnx_id"}
]). ]
).
status() -> status() ->
emqx_ctl:print("-----------------------------------------------\n"), emqx_ctl:print("-----------------------------------------------\n"),
{atomic, Status} = emqx_cluster_rpc:status(), {atomic, Status} = emqx_cluster_rpc:status(),
lists:foreach(fun(S) -> lists:foreach(
#{ fun(S) ->
node := Node, #{
tnx_id := TnxId, node := Node,
mfa := {M, F, A}, tnx_id := TnxId,
created_at := CreatedAt mfa := {M, F, A},
} = S, created_at := CreatedAt
emqx_ctl:print("~p:[~w] CreatedAt:~p ~p:~p/~w\n", } = S,
[Node, TnxId, CreatedAt, M, F, length(A)]) emqx_ctl:print(
end, Status), "~p:[~w] CreatedAt:~p ~p:~p/~w\n",
[Node, TnxId, CreatedAt, M, F, length(A)]
)
end,
Status
),
emqx_ctl:print("-----------------------------------------------\n"). emqx_ctl:print("-----------------------------------------------\n").

File diff suppressed because it is too large Load Diff

View File

@ -28,12 +28,15 @@ start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []). supervisor:start_link({local, ?SERVER}, ?MODULE, []).
init([]) -> init([]) ->
SupFlags = #{strategy => one_for_all, SupFlags = #{
strategy => one_for_all,
intensity => 10, intensity => 10,
period => 100}, period => 100
},
ChildSpecs = ChildSpecs =
[ child_spec(emqx_cluster_rpc, []) [
, child_spec(emqx_cluster_rpc_handler, []) child_spec(emqx_cluster_rpc, []),
child_spec(emqx_cluster_rpc_handler, [])
], ],
{ok, {SupFlags, ChildSpecs}}. {ok, {SupFlags, ChildSpecs}}.

View File

@ -18,22 +18,23 @@
-behaviour(emqx_bpapi). -behaviour(emqx_bpapi).
-export([ introduced_in/0 -export([
introduced_in/0,
, get_config/2 get_config/2,
, get_config/3 get_config/3,
, get_all/1 get_all/1,
, update/3 update/3,
, update/4 update/4,
, remove_config/2 remove_config/2,
, remove_config/3 remove_config/3,
, reset/2 reset/2,
, reset/3 reset/3,
, get_override_config_file/1 get_override_config_file/1
]). ]).
-include_lib("emqx/include/bpapi.hrl"). -include_lib("emqx/include/bpapi.hrl").
@ -43,12 +44,12 @@ introduced_in() ->
"5.0.0". "5.0.0".
-spec get_config(node(), emqx_map_lib:config_key_path()) -> -spec get_config(node(), emqx_map_lib:config_key_path()) ->
term() | emqx_rpc:badrpc(). term() | emqx_rpc:badrpc().
get_config(Node, KeyPath) -> get_config(Node, KeyPath) ->
rpc:call(Node, emqx, get_config, [KeyPath]). rpc:call(Node, emqx, get_config, [KeyPath]).
-spec get_config(node(), emqx_map_lib:config_key_path(), _Default) -> -spec get_config(node(), emqx_map_lib:config_key_path(), _Default) ->
term() | emqx_rpc:badrpc(). term() | emqx_rpc:badrpc().
get_config(Node, KeyPath, Default) -> get_config(Node, KeyPath, Default) ->
rpc:call(Node, emqx, get_config, [KeyPath, Default]). rpc:call(Node, emqx, get_config, [KeyPath, Default]).
@ -56,40 +57,47 @@ get_config(Node, KeyPath, Default) ->
get_all(KeyPath) -> get_all(KeyPath) ->
rpc:multicall(emqx_conf, get_node_and_config, [KeyPath], 5000). rpc:multicall(emqx_conf, get_node_and_config, [KeyPath], 5000).
-spec update(update_config_key_path(), emqx_config:update_request(), -spec update(
emqx_config:update_opts()) -> emqx_cluster_rpc:multicall_return(). update_config_key_path(),
emqx_config:update_request(),
emqx_config:update_opts()
) -> emqx_cluster_rpc:multicall_return().
update(KeyPath, UpdateReq, Opts) -> update(KeyPath, UpdateReq, Opts) ->
emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]). emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]).
-spec update(node(), update_config_key_path(), emqx_config:update_request(), -spec update(
emqx_config:update_opts()) -> node(),
{ok, emqx_config:update_result()} update_config_key_path(),
| {error, emqx_config:update_error()} emqx_config:update_request(),
| emqx_rpc:badrpc(). emqx_config:update_opts()
) ->
{ok, emqx_config:update_result()}
| {error, emqx_config:update_error()}
| emqx_rpc:badrpc().
update(Node, KeyPath, UpdateReq, Opts) -> update(Node, KeyPath, UpdateReq, Opts) ->
rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000). rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000).
-spec remove_config(update_config_key_path(), emqx_config:update_opts()) -> -spec remove_config(update_config_key_path(), emqx_config:update_opts()) ->
emqx_cluster_rpc:multicall_result(). emqx_cluster_rpc:multicall_result().
remove_config(KeyPath, Opts) -> remove_config(KeyPath, Opts) ->
emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]). emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]).
-spec remove_config(node(), update_config_key_path(), emqx_config:update_opts()) -> -spec remove_config(node(), update_config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} {ok, emqx_config:update_result()}
| {error, emqx_config:update_error()} | {error, emqx_config:update_error()}
| emqx_rpc:badrpc(). | emqx_rpc:badrpc().
remove_config(Node, KeyPath, Opts) -> remove_config(Node, KeyPath, Opts) ->
rpc:call(Node, emqx, remove_config, [KeyPath, Opts], 5000). rpc:call(Node, emqx, remove_config, [KeyPath, Opts], 5000).
-spec reset(update_config_key_path(), emqx_config:update_opts()) -> -spec reset(update_config_key_path(), emqx_config:update_opts()) ->
emqx_cluster_rpc:multicall_return(). emqx_cluster_rpc:multicall_return().
reset(KeyPath, Opts) -> reset(KeyPath, Opts) ->
emqx_cluster_rpc:multicall(emqx, reset_config, [KeyPath, Opts]). emqx_cluster_rpc:multicall(emqx, reset_config, [KeyPath, Opts]).
-spec reset(node(), update_config_key_path(), emqx_config:update_opts()) -> -spec reset(node(), update_config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} {ok, emqx_config:update_result()}
| {error, emqx_config:update_error()} | {error, emqx_config:update_error()}
| emqx_rpc:badrpc(). | emqx_rpc:badrpc().
reset(Node, KeyPath, Opts) -> reset(Node, KeyPath, Opts) ->
rpc:call(Node, emqx, reset_config, [KeyPath, Opts]). rpc:call(Node, emqx, reset_config, [KeyPath, Opts]).

View File

@ -26,16 +26,17 @@
-define(NODE2, emqx_cluster_rpc2). -define(NODE2, emqx_cluster_rpc2).
-define(NODE3, emqx_cluster_rpc3). -define(NODE3, emqx_cluster_rpc3).
all() -> [ all() ->
t_base_test, [
t_commit_fail_test, t_base_test,
t_commit_crash_test, t_commit_fail_test,
t_commit_ok_but_apply_fail_on_other_node, t_commit_crash_test,
t_commit_ok_apply_fail_on_other_node_then_recover, t_commit_ok_but_apply_fail_on_other_node,
t_del_stale_mfa, t_commit_ok_apply_fail_on_other_node_then_recover,
t_skip_failed_commit, t_del_stale_mfa,
t_fast_forward_commit t_skip_failed_commit,
]. t_fast_forward_commit
].
suite() -> [{timetrap, {minutes, 3}}]. suite() -> [{timetrap, {minutes, 3}}].
groups() -> []. groups() -> [].
@ -77,7 +78,8 @@ t_base_test(_Config) ->
?assertEqual({ok, 2, ok}, emqx_cluster_rpc:multicall(M, F, A)), ?assertEqual({ok, 2, ok}, emqx_cluster_rpc:multicall(M, F, A)),
{atomic, Status} = emqx_cluster_rpc:status(), {atomic, Status} = emqx_cluster_rpc:status(),
case length(Status) =:= 3 of case length(Status) =:= 3 of
true -> ?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 2 end, Status)); true ->
?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 2 end, Status));
false -> false ->
%% wait for mnesia to write in. %% wait for mnesia to write in.
ct:sleep(42), ct:sleep(42),
@ -117,7 +119,7 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) ->
?assertEqual(MFA, maps:get(mfa, Status)), ?assertEqual(MFA, maps:get(mfa, Status)),
?assertEqual(node(), maps:get(node, Status)), ?assertEqual(node(), maps:get(node, Status)),
erlang:send(?NODE2, test), erlang:send(?NODE2, test),
Res = gen_server:call(?NODE2, {initiate, {M, F, A}}), Res = gen_server:call(?NODE2, {initiate, {M, F, A}}),
?assertEqual({error, "MFA return not ok"}, Res), ?assertEqual({error, "MFA return not ok"}, Res),
ok. ok.
@ -126,7 +128,7 @@ t_catch_up_status_handle_next_commit(_Config) ->
{atomic, []} = emqx_cluster_rpc:status(), {atomic, []} = emqx_cluster_rpc:status(),
{M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]}, {M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]},
{ok, 1, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), {ok, 1, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, 2} = gen_server:call(?NODE2, {initiate, {M, F, A}}), {ok, 2} = gen_server:call(?NODE2, {initiate, {M, F, A}}),
ok. ok.
t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
@ -139,7 +141,7 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
{ok, 1, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), {ok, 1, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
ct:pal("222:~p~n", [ets:tab2list(cluster_rpc_commit)]), ct:pal("222:~p~n", [ets:tab2list(cluster_rpc_commit)]),
ct:pal("333:~p~n", [emqx_cluster_rpc:status()]), ct:pal("333:~p~n", [emqx_cluster_rpc:status()]),
{atomic, [_Status|L]} = emqx_cluster_rpc:status(), {atomic, [_Status | L]} = emqx_cluster_rpc:status(),
?assertEqual([], L), ?assertEqual([], L),
ets:insert(test, {other_mfa_result, ok}), ets:insert(test, {other_mfa_result, ok}),
{ok, 2, ok} = emqx_cluster_rpc:multicall(io, format, ["test"], 1, 1000), {ok, 2, ok} = emqx_cluster_rpc:multicall(io, format, ["test"], 1, 1000),
@ -163,25 +165,39 @@ t_del_stale_mfa(_Config) ->
Keys = lists:seq(1, 50), Keys = lists:seq(1, 50),
Keys2 = lists:seq(51, 150), Keys2 = lists:seq(51, 150),
Ids = Ids =
[begin [
{ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A), begin
TnxId end || _ <- Keys], {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A),
TnxId
end
|| _ <- Keys
],
?assertEqual(Keys, Ids), ?assertEqual(Keys, Ids),
Ids2 = Ids2 =
[begin [
{ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A), begin
TnxId end || _ <- Keys2], {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A),
TnxId
end
|| _ <- Keys2
],
?assertEqual(Keys2, Ids2), ?assertEqual(Keys2, Ids2),
ct:sleep(1200), ct:sleep(1200),
[begin [
?assertEqual({aborted, not_found}, emqx_cluster_rpc:query(I)) begin
end || I <- lists:seq(1, 50)], ?assertEqual({aborted, not_found}, emqx_cluster_rpc:query(I))
[begin end
{atomic, Map} = emqx_cluster_rpc:query(I), || I <- lists:seq(1, 50)
?assertEqual(MFA, maps:get(mfa, Map)), ],
?assertEqual(node(), maps:get(initiator, Map)), [
?assert(maps:is_key(created_at, Map)) begin
end || I <- lists:seq(51, 150)], {atomic, Map} = emqx_cluster_rpc:query(I),
?assertEqual(MFA, maps:get(mfa, Map)),
?assertEqual(node(), maps:get(initiator, Map)),
?assert(maps:is_key(created_at, Map))
end
|| I <- lists:seq(51, 150)
],
ok. ok.
t_skip_failed_commit(_Config) -> t_skip_failed_commit(_Config) ->
@ -191,14 +207,18 @@ t_skip_failed_commit(_Config) ->
ct:sleep(180), ct:sleep(180),
{atomic, List1} = emqx_cluster_rpc:status(), {atomic, List1} = emqx_cluster_rpc:status(),
Node = node(), Node = node(),
?assertEqual([{Node, 1}, {{Node, ?NODE2}, 1}, {{Node, ?NODE3}, 1}], ?assertEqual(
tnx_ids(List1)), [{Node, 1}, {{Node, ?NODE2}, 1}, {{Node, ?NODE3}, 1}],
tnx_ids(List1)
),
{M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
{ok, 2, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), {ok, 2, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
2 = gen_server:call(?NODE2, skip_failed_commit, 5000), 2 = gen_server:call(?NODE2, skip_failed_commit, 5000),
{atomic, List2} = emqx_cluster_rpc:status(), {atomic, List2} = emqx_cluster_rpc:status(),
?assertEqual([{Node, 2}, {{Node, ?NODE2}, 2}, {{Node, ?NODE3}, 1}], ?assertEqual(
tnx_ids(List2)), [{Node, 2}, {{Node, ?NODE2}, 2}, {{Node, ?NODE3}, 1}],
tnx_ids(List2)
),
ok. ok.
t_fast_forward_commit(_Config) -> t_fast_forward_commit(_Config) ->
@ -208,8 +228,10 @@ t_fast_forward_commit(_Config) ->
ct:sleep(180), ct:sleep(180),
{atomic, List1} = emqx_cluster_rpc:status(), {atomic, List1} = emqx_cluster_rpc:status(),
Node = node(), Node = node(),
?assertEqual([{Node, 1}, {{Node, ?NODE2}, 1}, {{Node, ?NODE3}, 1}], ?assertEqual(
tnx_ids(List1)), [{Node, 1}, {{Node, ?NODE2}, 1}, {{Node, ?NODE3}, 1}],
tnx_ids(List1)
),
{M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
{ok, 2, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), {ok, 2, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, 3, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), {ok, 3, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
@ -221,8 +243,10 @@ t_fast_forward_commit(_Config) ->
6 = gen_server:call(?NODE2, {fast_forward_to_commit, 7}, 5000), 6 = gen_server:call(?NODE2, {fast_forward_to_commit, 7}, 5000),
2 = gen_server:call(?NODE3, {fast_forward_to_commit, 2}, 5000), 2 = gen_server:call(?NODE3, {fast_forward_to_commit, 2}, 5000),
{atomic, List2} = emqx_cluster_rpc:status(), {atomic, List2} = emqx_cluster_rpc:status(),
?assertEqual([{Node, 6}, {{Node, ?NODE2}, 6}, {{Node, ?NODE3}, 2}], ?assertEqual(
tnx_ids(List2)), [{Node, 6}, {{Node, ?NODE2}, 6}, {{Node, ?NODE3}, 2}],
tnx_ids(List2)
),
ok. ok.
t_handler_unexpected_msg(_Config) -> t_handler_unexpected_msg(_Config) ->
@ -236,8 +260,14 @@ t_handler_unexpected_msg(_Config) ->
ok. ok.
tnx_ids(Status) -> tnx_ids(Status) ->
lists:sort(lists:map(fun(#{tnx_id := TnxId, node := Node}) -> lists:sort(
{Node, TnxId} end, Status)). lists:map(
fun(#{tnx_id := TnxId, node := Node}) ->
{Node, TnxId}
end,
Status
)
).
start() -> start() ->
{ok, Pid1} = emqx_cluster_rpc:start_link(), {ok, Pid1} = emqx_cluster_rpc:start_link(),
@ -248,19 +278,26 @@ start() ->
{ok, [Pid1, Pid2, Pid3, Pid4]}. {ok, [Pid1, Pid2, Pid3, Pid4]}.
stop() -> stop() ->
[begin [
case erlang:whereis(N) of begin
undefined -> ok; case erlang:whereis(N) of
P -> undefined ->
erlang:unlink(P), ok;
erlang:exit(P, kill) P ->
end end || N <- [?NODE1, ?NODE2, ?NODE3]], erlang:unlink(P),
erlang:exit(P, kill)
end
end
|| N <- [?NODE1, ?NODE2, ?NODE3]
],
gen_server:stop(emqx_cluster_rpc_handler, normal, 5000). gen_server:stop(emqx_cluster_rpc_handler, normal, 5000).
receive_msg(0, _Msg) -> ok; receive_msg(0, _Msg) ->
ok;
receive_msg(Count, Msg) when Count > 0 -> receive_msg(Count, Msg) when Count > 0 ->
receive Msg -> receive
receive_msg(Count - 1, Msg) Msg ->
receive_msg(Count - 1, Msg)
after 800 -> after 800 ->
timeout timeout
end. end.
@ -277,7 +314,8 @@ failed_on_node(Pid) ->
failed_on_node_by_odd(Pid) -> failed_on_node_by_odd(Pid) ->
case Pid =:= self() of case Pid =:= self() of
true -> ok; true ->
ok;
false -> false ->
catch ets:new(test, [named_table, set, public]), catch ets:new(test, [named_table, set, public]),
Num = ets:update_counter(test, self(), {2, 1}, {self(), 1}), Num = ets:update_counter(test, self(), {2, 1}, {self(), 1}),
@ -289,7 +327,8 @@ failed_on_node_by_odd(Pid) ->
failed_on_other_recover_after_retry(Pid) -> failed_on_other_recover_after_retry(Pid) ->
case Pid =:= self() of case Pid =:= self() of
true -> ok; true ->
ok;
false -> false ->
[{_, Res}] = ets:lookup(test, other_mfa_result), [{_, Res}] = ets:lookup(test, other_mfa_result),
Res Res

View File

@ -30,4 +30,3 @@ t_run_gc(_) ->
{ok, MilliSecs} = emqx_global_gc:run(), {ok, MilliSecs} = emqx_global_gc:run(),
ct:print("Global GC: ~w(ms)~n", [MilliSecs]), ct:print("Global GC: ~w(ms)~n", [MilliSecs]),
emqx_global_gc:stop(). emqx_global_gc:stop().

View File

@ -29,5 +29,7 @@ bf54f571fb8b27e76ada4ca75137d96ce4211d60
83511f8a4c1570a2c89d9c6c5b6f462520199ed8 83511f8a4c1570a2c89d9c6c5b6f462520199ed8
# reformat apps/emqx_psk # reformat apps/emqx_psk
b168102615e574df15ec6a91304747b4637a9171 b168102615e574df15ec6a91304747b4637a9171
# revormat apps/emqx_machine|emqx_plugin_libs|emqx_statsd # reformat apps/emqx_machine|emqx_plugin_libs|emqx_statsd
b4451823350ec46126c49ca915b4b169dd4cf49e b4451823350ec46126c49ca915b4b169dd4cf49e
# reformat apps/emqx_auto_subscribe and apps/emqx_conf
a4feb3e6e95c18cb531416112e57520c5ba00d40

View File

@ -16,6 +16,7 @@ APPS+=( 'apps/emqx_retainer' 'apps/emqx_slow_subs')
APPS+=( 'apps/emqx_management') APPS+=( 'apps/emqx_management')
APPS+=( 'apps/emqx_psk') APPS+=( 'apps/emqx_psk')
APPS+=( 'apps/emqx_plugin_libs' 'apps/emqx_machine' 'apps/emqx_statsd' ) APPS+=( 'apps/emqx_plugin_libs' 'apps/emqx_machine' 'apps/emqx_statsd' )
APPS+=( 'apps/emqx_auto_subscribe' 'apps/emqx_conf')
for app in "${APPS[@]}"; do for app in "${APPS[@]}"; do
echo "$app ..." echo "$app ..."