style: reformat emqx_auto_subscribe and emqx_conf

Zaiming (Stone) Shi 2022-04-25 18:05:10 +02:00
parent c3c8e773e7
commit a4feb3e6e9
23 changed files with 1563 additions and 1037 deletions

@@ -120,21 +120,45 @@ t_listener_authenticator_import_users(_) ->
test_authenticator_import_users(["listeners", ?TCP_DEFAULT]).
t_aggregate_metrics(_) ->
Metrics = #{ 'emqx@node1.emqx.io' => #{metrics =>
#{failed => 0,matched => 1,rate => 0.0,
rate_last5m => 0.0,rate_max => 0.1,
success => 1}
Metrics = #{
'emqx@node1.emqx.io' => #{
metrics =>
#{
failed => 0,
matched => 1,
rate => 0.0,
rate_last5m => 0.0,
rate_max => 0.1,
success => 1
}
},
'emqx@node2.emqx.io' => #{metrics =>
#{failed => 0,matched => 1,rate => 0.0,
rate_last5m => 0.0,rate_max => 0.1,
success => 1}
'emqx@node2.emqx.io' => #{
metrics =>
#{
failed => 0,
matched => 1,
rate => 0.0,
rate_last5m => 0.0,
rate_max => 0.1,
success => 1
}
}
},
Res = emqx_authn_api:aggregate_metrics(maps:values(Metrics)),
?assertEqual(#{metrics =>
#{failed => 0,matched => 2,rate => 0.0,rate_last5m => 0.0,
rate_max => 0.2,success => 2}}, Res).
?assertEqual(
#{
metrics =>
#{
failed => 0,
matched => 2,
rate => 0.0,
rate_last5m => 0.0,
rate_max => 0.2,
success => 2
}
},
Res
).
test_authenticators(PathPrefix) ->
ValidConfig = emqx_authn_test_lib:http_example(),

@@ -449,21 +449,45 @@ t_move_source(_) ->
ok.
t_aggregate_metrics(_) ->
Metrics = #{ 'emqx@node1.emqx.io' => #{metrics =>
#{failed => 0,matched => 1,rate => 0.0,
rate_last5m => 0.0,rate_max => 0.1,
success => 1}
Metrics = #{
'emqx@node1.emqx.io' => #{
metrics =>
#{
failed => 0,
matched => 1,
rate => 0.0,
rate_last5m => 0.0,
rate_max => 0.1,
success => 1
}
},
'emqx@node2.emqx.io' => #{metrics =>
#{failed => 0,matched => 1,rate => 0.0,
rate_last5m => 0.0,rate_max => 0.1,
success => 1}
'emqx@node2.emqx.io' => #{
metrics =>
#{
failed => 0,
matched => 1,
rate => 0.0,
rate_last5m => 0.0,
rate_max => 0.1,
success => 1
}
}
},
Res = emqx_authn_api:aggregate_metrics(maps:values(Metrics)),
?assertEqual(#{metrics =>
#{failed => 0,matched => 2,rate => 0.0,rate_last5m => 0.0,
rate_max => 0.2,success => 2}}, Res).
?assertEqual(
#{
metrics =>
#{
failed => 0,
matched => 2,
rate => 0.0,
rate_last5m => 0.0,
rate_max => 0.2,
success => 2
}
},
Res
).
get_sources(Result) ->
maps:get(<<"sources">>, jsx:decode(Result), []).

@@ -1,9 +1,10 @@
%% -*- mode: erlang -*-
{erl_opts, [debug_info]}.
{deps, [ {emqx, {path, "../emqx"}}
]}.
{deps, [{emqx, {path, "../emqx"}}]}.
{shell, [
{apps, [emqx_auto_subscribe]}
]}.
{project_plugins, [erlfmt]}.
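
The {project_plugins, [erlfmt]} line above pulls the erlfmt formatter into rebar3, which is what the whitespace-only changes in this commit appear to come from. As a rough sketch only (the print width and file globs below are assumptions, not values taken from this commit), an erlfmt setup in rebar.config typically also carries an options tuple:

%% Illustrative erlfmt options; adjust to the project's real settings.
{erlfmt, [
    write,
    {print_width, 100},
    {files, ["{src,include,test}/*.{erl,hrl}", "rebar.config"]}
]}.
%% The formatter is then run with "rebar3 fmt" (or "rebar3 fmt --check" in CI).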

@@ -18,10 +18,11 @@
-behaviour(minirest_api).
-export([ api_spec/0
, paths/0
, schema/1
]).
-export([
api_spec/0,
paths/0,
schema/1
]).
-export([auto_subscribe/2]).
@@ -54,7 +55,8 @@ schema("/mqtt/auto_subscribe") ->
200 => hoconsc:ref(emqx_auto_subscribe_schema, "auto_subscribe"),
409 => emqx_dashboard_swagger:error_codes(
[?EXCEED_LIMIT],
?DESC(update_auto_subscribe_api_response409))
?DESC(update_auto_subscribe_api_response409)
)
}
}
}.
@@ -63,14 +65,17 @@ schema("/mqtt/auto_subscribe") ->
%% api apply
auto_subscribe(get, _) ->
{200, emqx_auto_subscribe:list()};
auto_subscribe(put, #{body := #{}}) ->
{400, #{code => ?BAD_REQUEST, message => <<"Request body required">>}};
auto_subscribe(put, #{body := Params}) ->
case emqx_auto_subscribe:update(Params) of
{error, quota_exceeded} ->
Message = list_to_binary(io_lib:format("Max auto subscribe topic count is ~p",
[emqx_auto_subscribe:max_limit()])),
Message = list_to_binary(
io_lib:format(
"Max auto subscribe topic count is ~p",
[emqx_auto_subscribe:max_limit()]
)
),
{409, #{code => ?EXCEED_LIMIT, message => Message}};
{error, Reason} ->
Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),

@@ -22,28 +22,36 @@
-export([to_topic_table/3]).
-spec(generate(list() | map()) -> list() | map()).
-spec generate(list() | map()) -> list() | map().
generate(Topics) when is_list(Topics) ->
[generate(Topic) || Topic <- Topics];
generate(T = #{topic := Topic}) ->
T#{placeholder => generate(Topic, [])}.
-spec(to_topic_table(list(), map(), map()) -> list()).
-spec to_topic_table(list(), map(), map()) -> list().
to_topic_table(PHs, ClientInfo, ConnInfo) ->
Fold = fun(#{qos := Qos, rh := RH, rap := RAP, nl := NL,
placeholder := PlaceHolder, topic := RawTopic
Fold = fun(
#{
qos := Qos,
rh := RH,
rap := RAP,
nl := NL,
placeholder := PlaceHolder,
topic := RawTopic
},
Acc) ->
Acc
) ->
case to_topic(PlaceHolder, ClientInfo, ConnInfo, []) of
{error, Reason} ->
?SLOG(warning, #{msg => "auto_subscribe_ignored",
?SLOG(warning, #{
msg => "auto_subscribe_ignored",
topic => RawTopic,
reason => Reason
}),
Acc;
<<>> ->
?SLOG(warning, #{msg => "auto_subscribe_ignored",
?SLOG(warning, #{
msg => "auto_subscribe_ignored",
topic => RawTopic,
reason => empty_topic
}),

@@ -21,10 +21,12 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/emqx_placeholder.hrl").
-export([ namespace/0
, roots/0
, fields/1
, desc/1]).
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
namespace() -> "auto_subscribe".
@@ -32,27 +34,41 @@ roots() ->
["auto_subscribe"].
fields("auto_subscribe") ->
[ {topics, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "topic")),
#{desc => ?DESC(auto_subscribe)})}
[
{topics,
hoconsc:mk(
hoconsc:array(hoconsc:ref(?MODULE, "topic")),
#{desc => ?DESC(auto_subscribe)}
)}
];
fields("topic") ->
[ {topic, sc(binary(), #{
[
{topic,
sc(binary(), #{
required => true,
example => topic_example(),
desc => ?DESC("topic")})}
, {qos, sc(emqx_schema:qos(), #{
desc => ?DESC("topic")
})},
{qos,
sc(emqx_schema:qos(), #{
default => 0,
desc => ?DESC("qos")})}
, {rh, sc(range(0,2), #{
desc => ?DESC("qos")
})},
{rh,
sc(range(0, 2), #{
default => 0,
desc => ?DESC("rh")})}
, {rap, sc(range(0, 1), #{
desc => ?DESC("rh")
})},
{rap,
sc(range(0, 1), #{
default => 0,
desc => ?DESC("rap")})}
, {nl, sc(range(0, 1), #{
desc => ?DESC("rap")
})},
{nl,
sc(range(0, 1), #{
default => 0,
desc => ?DESC(nl)})}
desc => ?DESC(nl)
})}
].
desc("auto_subscribe") -> ?DESC("auto_subscribe");
@@ -60,10 +76,8 @@ desc("topic") -> ?DESC("topic");
desc(_) -> undefined.
topic_example() ->
<<"/clientid/", ?PH_S_CLIENTID,
"/username/", ?PH_S_USERNAME,
"/host/", ?PH_S_HOST,
"/port/", ?PH_S_PORT>>.
<<"/clientid/", ?PH_S_CLIENTID, "/username/", ?PH_S_USERNAME, "/host/", ?PH_S_HOST, "/port/",
?PH_S_PORT>>.
%%--------------------------------------------------------------------
%% Internal functions

@@ -28,9 +28,11 @@ start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
init([]) ->
SupFlags = #{strategy => one_for_all,
SupFlags = #{
strategy => one_for_all,
intensity => 0,
period => 1},
period => 1
},
ChildSpecs = [],
{ok, {SupFlags, ChildSpecs}}.

@@ -17,13 +17,12 @@
-export([init/1]).
-spec(init(hocons:config()) -> {Module :: atom(), Config :: term()}).
-spec init(hocons:config()) -> {Module :: atom(), Config :: term()}.
init(Config) ->
do_init(Config).
do_init(Config = #{topics := _Topics}) ->
Options = emqx_auto_subscribe_internal:init(Config),
{emqx_auto_subscribe_internal, Options};
do_init(_Config) ->
erlang:error(not_supported).

@@ -19,11 +19,11 @@
-export([handle/3]).
-spec(init(Config :: map()) -> HandlerOptions :: term()).
-spec init(Config :: map()) -> HandlerOptions :: term().
init(#{topics := Topics}) ->
emqx_auto_subscribe_placeholder:generate(Topics).
-spec(handle(ClientInfo :: map(), ConnInfo :: map(), HandlerOptions :: term()) ->
TopicTables :: list()).
-spec handle(ClientInfo :: map(), ConnInfo :: map(), HandlerOptions :: term()) ->
TopicTables :: list().
handle(ClientInfo, ConnInfo, PlaceHolders) ->
emqx_auto_subscribe_placeholder:to_topic_table(PlaceHolders, ClientInfo, ConnInfo).

@@ -64,7 +64,8 @@ init_per_suite(Config) ->
application:load(?APP),
ok = emqx_common_test_helpers:load_config(
emqx_auto_subscribe_schema,
<<"auto_subscribe {\n"
<<
"auto_subscribe {\n"
" topics = [\n"
" {\n"
" topic = \"/c/${clientid}\"\n"
@@ -89,7 +90,8 @@ init_per_suite(Config) ->
" nl = 0\n"
" }\n"
" ]\n"
" }">>
" }"
>>
),
emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_dashboard, ?APP],

@@ -1,4 +1,3 @@
-ifndef(EMQX_CONF_HRL).
-define(EMQX_CONF_HRL, true).

@@ -1,10 +1,11 @@
%% -*- mode: erlang -*-
{erl_opts, [debug_info]}.
{deps, [ {emqx, {path, "../emqx"}}
]}.
{deps, [{emqx, {path, "../emqx"}}]}.
{shell, [
% {config, "config/sys.config"},
{apps, [emqx_conf]}
]}.
{project_plugins, [erlfmt]}.

@@ -22,12 +22,25 @@
%% Note: multicall functions are statically checked by
%% `emqx_bapi_trans' and `emqx_bpapi_static_checks' modules. Don't
%% forget to update it when adding or removing them here:
-export([multicall/3, multicall/5, query/1, reset/0, status/0,
skip_failed_commit/1, fast_forward_to_commit/2]).
-export([
multicall/3, multicall/5,
query/1,
reset/0,
status/0,
skip_failed_commit/1,
fast_forward_to_commit/2
]).
-export([get_node_tnx_id/1, latest_tnx_id/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
handle_continue/2, code_change/3]).
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
handle_continue/2,
code_change/3
]).
-export_type([txn_id/0, succeed_num/0, multicall_return/1, multicall_return/0]).
@@ -48,7 +61,8 @@
-type succeed_num() :: pos_integer() | all.
-type multicall_return(Result) :: {ok, txn_id(), Result}
-type multicall_return(Result) ::
{ok, txn_id(), Result}
| {error, term()}
| {retry, txn_id(), Result, node()}.
@@ -63,13 +77,15 @@ mnesia(boot) ->
{rlog_shard, ?CLUSTER_RPC_SHARD},
{storage, disc_copies},
{record_name, cluster_rpc_mfa},
{attributes, record_info(fields, cluster_rpc_mfa)}]),
{attributes, record_info(fields, cluster_rpc_mfa)}
]),
ok = mria:create_table(?CLUSTER_COMMIT, [
{type, set},
{rlog_shard, ?CLUSTER_RPC_SHARD},
{storage, disc_copies},
{record_name, cluster_rpc_commit},
{attributes, record_info(fields, cluster_rpc_commit)}]).
{attributes, record_info(fields, cluster_rpc_commit)}
]).
start_link() ->
start_link(node(), ?MODULE, get_retry_ms()).
@@ -97,7 +113,8 @@ multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNu
Begin = erlang:monotonic_time(),
InitRes =
case mria_rlog:role() of
core -> gen_server:call(?MODULE, MFA, Timeout);
core ->
gen_server:call(?MODULE, MFA, Timeout);
replicant ->
%% the initiate transaction must happened on core node
%% make sure MFA(in the transaction) and the transaction on the same node
@@ -119,11 +136,14 @@ multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNu
wait_for_all_nodes_commit(TnxId, MinDelay, RetryTimeout);
{ok, TnxId, _} when is_integer(RequireNum) ->
wait_for_nodes_commit(RequireNum, TnxId, MinDelay, RetryTimeout);
Error -> Error
Error ->
Error
end,
case OkOrFailed of
ok -> InitRes;
{error, Error0} -> {error, Error0};
ok ->
InitRes;
{error, Error0} ->
{error, Error0};
{retry, Node0} ->
{ok, TnxId0, MFARes} = InitRes,
{retry, TnxId0, MFARes, Node0}
@@ -187,7 +207,6 @@ handle_call(reset, _From, State) ->
_ = mria:clear_table(?CLUSTER_COMMIT),
_ = mria:clear_table(?CLUSTER_MFA),
{reply, ok, State, {continue, ?CATCH_UP}};
handle_call({initiate, MFA}, _From, State = #{node := Node}) ->
case transaction(fun init_mfa/2, [Node, MFA]) of
{atomic, {ok, TnxId, Result}} ->
@@ -226,21 +245,25 @@ catch_up(State) -> catch_up(State, false).
catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) ->
case transaction(fun read_next_mfa/1, [Node]) of
{atomic, caught_up} -> ?TIMEOUT;
{atomic, caught_up} ->
?TIMEOUT;
{atomic, {still_lagging, NextId, MFA}} ->
{Succeed, _} = apply_mfa(NextId, MFA),
case Succeed orelse SkipResult of
true ->
case transaction(fun commit/2, [Node, NextId]) of
{atomic, ok} -> catch_up(State, false);
{atomic, ok} ->
catch_up(State, false);
Error ->
?SLOG(error, #{
msg => "failed_to_commit_applied_call",
applied_id => NextId,
error => Error}),
error => Error
}),
RetryMs
end;
false -> RetryMs
false ->
RetryMs
end;
{aborted, Reason} ->
?SLOG(error, #{msg => "read_next_mfa_transaction_failed", error => Reason}),
@@ -256,9 +279,12 @@ read_next_mfa(Node) ->
commit(Node, TnxId),
?SLOG(notice, #{
msg => "new_node_first_catch_up_and_start_commit.",
node => Node, tnx_id => TnxId}),
node => Node,
tnx_id => TnxId
}),
TnxId;
[#cluster_rpc_commit{tnx_id = LastAppliedID}] -> LastAppliedID + 1
[#cluster_rpc_commit{tnx_id = LastAppliedID}] ->
LastAppliedID + 1
end,
case mnesia:read(?CLUSTER_MFA, NextId) of
[] -> caught_up;
@@ -281,8 +307,11 @@ do_catch_up(ToTnxId, Node) ->
end;
[#cluster_rpc_commit{tnx_id = LastAppliedId}] ->
Reason = lists:flatten(
io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)",
[Node, LastAppliedId, ToTnxId])),
io_lib:format(
"~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)",
[Node, LastAppliedId, ToTnxId]
)
),
?SLOG(error, #{
msg => "catch_up_failed!",
last_applied_id => LastAppliedId,
@@ -297,11 +326,13 @@ commit(Node, TnxId) ->
do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) ->
{atomic, NodeId} = transaction(fun get_node_tnx_id/1, [Node]),
case NodeId >= ToTnxId of
true -> NodeId;
true ->
NodeId;
false ->
{atomic, LatestId} = transaction(fun get_latest_id/0, []),
case LatestId =< NodeId of
true -> NodeId;
true ->
NodeId;
false ->
catch_up(State, true),
do_fast_forward_to_commit(ToTnxId, State)
@@ -319,8 +350,12 @@ init_mfa(Node, MFA) ->
LatestId = get_latest_id(),
ok = do_catch_up_in_one_trans(LatestId, Node),
TnxId = LatestId + 1,
MFARec = #cluster_rpc_mfa{tnx_id = TnxId, mfa = MFA,
initiator = Node, created_at = erlang:localtime()},
MFARec = #cluster_rpc_mfa{
tnx_id = TnxId,
mfa = MFA,
initiator = Node,
created_at = erlang:localtime()
},
ok = mnesia:write(?CLUSTER_MFA, MFARec, write),
ok = commit(Node, TnxId),
case apply_mfa(TnxId, MFA) of
@@ -338,24 +373,35 @@ transaction(Func, Args) ->
mria:transaction(?CLUSTER_RPC_SHARD, Func, Args).
trans_status() ->
mnesia:foldl(fun(Rec, Acc) ->
mnesia:foldl(
fun(Rec, Acc) ->
#cluster_rpc_commit{node = Node, tnx_id = TnxId} = Rec,
case mnesia:read(?CLUSTER_MFA, TnxId) of
[MFARec] ->
#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt} = MFARec,
[#{
#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt} =
MFARec,
[
#{
node => Node,
tnx_id => TnxId,
initiator => InitNode,
mfa => MFA,
created_at => CreatedAt
} | Acc];
[] -> Acc
end end, [], ?CLUSTER_COMMIT).
}
| Acc
];
[] ->
Acc
end
end,
[],
?CLUSTER_COMMIT
).
trans_query(TnxId) ->
case mnesia:read(?CLUSTER_MFA, TnxId) of
[] -> mnesia:abort(not_found);
[] ->
mnesia:abort(not_found);
[#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt}] ->
#{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt}
end.
@@ -364,11 +410,12 @@ trans_query(TnxId) ->
apply_mfa(TnxId, {M, F, A}) ->
Res =
try erlang:apply(M, F, A)
try
erlang:apply(M, F, A)
catch
throw : Reason ->
throw:Reason ->
{error, #{reason => Reason}};
Class : Reason : Stacktrace ->
Class:Reason:Stacktrace ->
{error, #{exception => Class, reason => Reason, stacktrace => Stacktrace}}
end,
%% Do not log args as it might be sensitive information
@@ -400,19 +447,23 @@ wait_for_all_nodes_commit(TnxId, Delay, Remain) ->
[_ | _] when Remain > 0 ->
ok = timer:sleep(Delay),
wait_for_all_nodes_commit(TnxId, Delay, Remain - Delay);
[] -> ok;
Nodes -> {retry, Nodes}
[] ->
ok;
Nodes ->
{retry, Nodes}
end.
wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain) ->
ok = timer:sleep(Delay),
case length(synced_nodes(TnxId)) >= RequiredNum of
true -> ok;
true ->
ok;
false when Remain > 0 ->
wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain - Delay);
false ->
case lagging_node(TnxId) of
[] -> ok; %% All commit but The succeedNum > length(nodes()).
%% All commit but The succeedNum > length(nodes()).
[] -> ok;
Nodes -> {retry, Nodes}
end
end.
@@ -434,7 +485,7 @@ commit_status_trans(Operator, TnxId) ->
get_retry_ms() ->
emqx_conf:get(["node", "cluster_call", "retry_interval"], 1000).
maybe_init_tnx_id(_Node, TnxId)when TnxId < 0 -> ok;
maybe_init_tnx_id(_Node, TnxId) when TnxId < 0 -> ok;
maybe_init_tnx_id(Node, TnxId) ->
{atomic, _} = transaction(fun init_node_tnx_id/2, [Node, TnxId]),
ok.

@@ -21,12 +21,18 @@
-include("emqx_conf.hrl").
-export([start_link/0, start_link/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
start_link() ->
MaxHistory = emqx_conf:get(["node", "cluster_call", "max_history"], 100),
CleanupMs = emqx_conf:get(["node", "cluster_call", "cleanup_interval"], 5*60*1000),
CleanupMs = emqx_conf:get(["node", "cluster_call", "cleanup_interval"], 5 * 60 * 1000),
start_link(MaxHistory, CleanupMs).
start_link(MaxHistory, CleanupMs) ->
@@ -55,7 +61,6 @@ handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history
Error -> ?SLOG(error, #{msg => "del_stale_cluster_rpc_mfa_error", error => Error})
end,
{noreply, ensure_timer(State), hibernate};
handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}),
{noreply, State}.
@@ -75,11 +80,15 @@ ensure_timer(State = #{cleanup_ms := Ms}) ->
%% @doc Keep the latest completed 100 records for querying and troubleshooting.
del_stale_mfa(MaxHistory) ->
DoneId =
mnesia:foldl(fun(Rec, Min) -> min(Rec#cluster_rpc_commit.tnx_id, Min) end,
infinity, ?CLUSTER_COMMIT),
mnesia:foldl(
fun(Rec, Min) -> min(Rec#cluster_rpc_commit.tnx_id, Min) end,
infinity,
?CLUSTER_COMMIT
),
delete_stale_mfa(mnesia:last(?CLUSTER_MFA), DoneId, MaxHistory).
delete_stale_mfa('$end_of_table', _DoneId, _Count) -> ok;
delete_stale_mfa('$end_of_table', _DoneId, _Count) ->
ok;
delete_stale_mfa(CurrId, DoneId, Count) when CurrId > DoneId ->
delete_stale_mfa(mnesia:prev(?CLUSTER_MFA, CurrId), DoneId, Count);
delete_stale_mfa(CurrId, DoneId, Count) when Count > 0 ->

@@ -1,9 +1,9 @@
{application, emqx_conf,
[{description, "EMQX configuration management"},
{application, emqx_conf, [
{description, "EMQX configuration management"},
{vsn, "0.1.0"},
{registered, []},
{mod, {emqx_conf_app, []}},
{applications, [kernel, stdlib]},
{env, []},
{modules, []}
]}.
]}.

@@ -40,7 +40,8 @@ init_conf() ->
copy_override_conf_from_core_node() ->
case mria_mnesia:running_nodes() -- [node()] of
[] -> %% The first core nodes is self.
%% The first core nodes is self.
[] ->
?SLOG(debug, #{msg => "skip_copy_overide_conf_from_core_node"}),
{ok, -1};
Nodes ->
@@ -49,26 +50,41 @@ copy_override_conf_from_core_node() ->
NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0),
case (Failed =/= [] orelse NotReady =/= []) andalso Ready =/= [] of
true ->
Warning = #{nodes => Nodes, failed => Failed, not_ready => NotReady,
msg => "ignored_bad_nodes_when_copy_init_config"},
Warning = #{
nodes => Nodes,
failed => Failed,
not_ready => NotReady,
msg => "ignored_bad_nodes_when_copy_init_config"
},
?SLOG(warning, Warning);
false -> ok
false ->
ok
end,
case Ready of
[] ->
%% Other core nodes running but no one replicated it successfully.
?SLOG(error, #{msg => "copy_overide_conf_from_core_node_failed",
nodes => Nodes, failed => Failed, not_ready => NotReady}),
?SLOG(error, #{
msg => "copy_overide_conf_from_core_node_failed",
nodes => Nodes,
failed => Failed,
not_ready => NotReady
}),
{error, "core node not ready"};
_ ->
SortFun = fun({ok, #{wall_clock := W1}},
{ok, #{wall_clock := W2}}) -> W1 > W2 end,
SortFun = fun(
{ok, #{wall_clock := W1}},
{ok, #{wall_clock := W2}}
) ->
W1 > W2
end,
[{ok, Info} | _] = lists:sort(SortFun, Ready),
#{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info,
Msg = #{msg => "copy_overide_conf_from_core_node_success", node => Node},
?SLOG(debug, Msg),
ok = emqx_config:save_to_override_conf(RawOverrideConf,
#{override_to => cluster}),
ok = emqx_config:save_to_override_conf(
RawOverrideConf,
#{override_to => cluster}
),
{ok, TnxId}
end
end.
@@ -77,10 +93,12 @@ get_override_config_file() ->
Node = node(),
Role = mria_rlog:role(),
case emqx_app:get_init_config_load_done() of
false -> {error, #{node => Node, msg => "init_conf_load_not_done"}};
false ->
{error, #{node => Node, msg => "init_conf_load_not_done"}};
true when Role =:= core ->
case erlang:whereis(emqx_config_handler) of
undefined -> {error, #{node => Node, msg => "emqx_config_handler_not_ready"}};
undefined ->
{error, #{node => Node, msg => "emqx_config_handler_not_ready"}};
_ ->
Fun = fun() ->
TnxId = emqx_cluster_rpc:get_node_tnx_id(Node),

@@ -15,10 +15,11 @@
%%--------------------------------------------------------------------
-module(emqx_conf_cli).
-export([ load/0
, admins/1
, unload/0
]).
-export([
load/0,
admins/1,
unload/0
]).
-define(CMD, cluster_call).
@@ -28,65 +29,65 @@ load() ->
unload() ->
emqx_ctl:unregister_command(?CMD).
admins(["status"]) -> status();
admins(["status"]) ->
status();
admins(["skip"]) ->
status(),
Nodes = mria_mnesia:running_nodes(),
lists:foreach(fun emqx_cluster_rpc:skip_failed_commit/1, Nodes),
status();
admins(["skip", Node0]) ->
status(),
Node = list_to_existing_atom(Node0),
emqx_cluster_rpc:skip_failed_commit(Node),
status();
admins(["tnxid", TnxId0]) ->
TnxId = list_to_integer(TnxId0),
emqx_ctl:print("~p~n", [emqx_cluster_rpc:query(TnxId)]);
admins(["fast_forward"]) ->
status(),
Nodes = mria_mnesia:running_nodes(),
TnxId = emqx_cluster_rpc:latest_tnx_id(),
lists:foreach(fun(N) -> emqx_cluster_rpc:fast_forward_to_commit(N, TnxId) end, Nodes),
status();
admins(["fast_forward", ToTnxId]) ->
status(),
Nodes = mria_mnesia:running_nodes(),
TnxId = list_to_integer(ToTnxId),
lists:foreach(fun(N) -> emqx_cluster_rpc:fast_forward_to_commit(N, TnxId) end, Nodes),
status();
admins(["fast_forward", Node0, ToTnxId]) ->
status(),
TnxId = list_to_integer(ToTnxId),
Node = list_to_existing_atom(Node0),
emqx_cluster_rpc:fast_forward_to_commit(Node, TnxId),
status();
admins(_) ->
emqx_ctl:usage(
[
{"cluster_call status", "status"},
{"cluster_call skip [node]", "increase one commit on specific node"},
{"cluster_call tnxid <TnxId>", "get detailed about TnxId"},
{"cluster_call fast_forward [node] [tnx_id]", "fast forwards to tnx_id" }
]).
{"cluster_call fast_forward [node] [tnx_id]", "fast forwards to tnx_id"}
]
).
status() ->
emqx_ctl:print("-----------------------------------------------\n"),
{atomic, Status} = emqx_cluster_rpc:status(),
lists:foreach(fun(S) ->
lists:foreach(
fun(S) ->
#{
node := Node,
tnx_id := TnxId,
mfa := {M, F, A},
created_at := CreatedAt
} = S,
emqx_ctl:print("~p:[~w] CreatedAt:~p ~p:~p/~w\n",
[Node, TnxId, CreatedAt, M, F, length(A)])
end, Status),
emqx_ctl:print(
"~p:[~w] CreatedAt:~p ~p:~p/~w\n",
[Node, TnxId, CreatedAt, M, F, length(A)]
)
end,
Status
),
emqx_ctl:print("-----------------------------------------------\n").
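
As a usage note, the cluster_call commands handled above all funnel into admins/1 with the CLI arguments passed as a list of strings. A minimal sketch of exercising the same clauses from an Erlang shell (assuming emqx_conf and emqx_ctl are already running; the transaction ids are made up) might look like:

%% Illustrative only: mirrors the admins/1 clauses shown in this diff.
emqx_conf_cli:load(),                         %% register the cluster_call command
emqx_conf_cli:admins(["status"]),             %% print per-node commit status
emqx_conf_cli:admins(["tnxid", "2"]),         %% show details of transaction id 2
emqx_conf_cli:admins(["fast_forward", "5"]).  %% fast-forward all nodes to tnx id 5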

File diff suppressed because it is too large

@@ -28,12 +28,15 @@ start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
init([]) ->
SupFlags = #{strategy => one_for_all,
SupFlags = #{
strategy => one_for_all,
intensity => 10,
period => 100},
period => 100
},
ChildSpecs =
[ child_spec(emqx_cluster_rpc, [])
, child_spec(emqx_cluster_rpc_handler, [])
[
child_spec(emqx_cluster_rpc, []),
child_spec(emqx_cluster_rpc_handler, [])
],
{ok, {SupFlags, ChildSpecs}}.

@@ -18,22 +18,23 @@
-behaviour(emqx_bpapi).
-export([ introduced_in/0
-export([
introduced_in/0,
, get_config/2
, get_config/3
, get_all/1
get_config/2,
get_config/3,
get_all/1,
, update/3
, update/4
, remove_config/2
, remove_config/3
update/3,
update/4,
remove_config/2,
remove_config/3,
, reset/2
, reset/3
reset/2,
reset/3,
, get_override_config_file/1
]).
get_override_config_file/1
]).
-include_lib("emqx/include/bpapi.hrl").
@@ -56,13 +57,20 @@ get_config(Node, KeyPath, Default) ->
get_all(KeyPath) ->
rpc:multicall(emqx_conf, get_node_and_config, [KeyPath], 5000).
-spec update(update_config_key_path(), emqx_config:update_request(),
emqx_config:update_opts()) -> emqx_cluster_rpc:multicall_return().
-spec update(
update_config_key_path(),
emqx_config:update_request(),
emqx_config:update_opts()
) -> emqx_cluster_rpc:multicall_return().
update(KeyPath, UpdateReq, Opts) ->
emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]).
-spec update(node(), update_config_key_path(), emqx_config:update_request(),
emqx_config:update_opts()) ->
-spec update(
node(),
update_config_key_path(),
emqx_config:update_request(),
emqx_config:update_opts()
) ->
{ok, emqx_config:update_result()}
| {error, emqx_config:update_error()}
| emqx_rpc:badrpc().

@@ -26,7 +26,8 @@
-define(NODE2, emqx_cluster_rpc2).
-define(NODE3, emqx_cluster_rpc3).
all() -> [
all() ->
[
t_base_test,
t_commit_fail_test,
t_commit_crash_test,
@@ -35,7 +36,7 @@ all() -> [
t_del_stale_mfa,
t_skip_failed_commit,
t_fast_forward_commit
].
].
suite() -> [{timetrap, {minutes, 3}}].
groups() -> [].
@@ -77,7 +78,8 @@ t_base_test(_Config) ->
?assertEqual({ok, 2, ok}, emqx_cluster_rpc:multicall(M, F, A)),
{atomic, Status} = emqx_cluster_rpc:status(),
case length(Status) =:= 3 of
true -> ?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 2 end, Status));
true ->
?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 2 end, Status));
false ->
%% wait for mnesia to write in.
ct:sleep(42),
@@ -139,7 +141,7 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
{ok, 1, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
ct:pal("222:~p~n", [ets:tab2list(cluster_rpc_commit)]),
ct:pal("333:~p~n", [emqx_cluster_rpc:status()]),
{atomic, [_Status|L]} = emqx_cluster_rpc:status(),
{atomic, [_Status | L]} = emqx_cluster_rpc:status(),
?assertEqual([], L),
ets:insert(test, {other_mfa_result, ok}),
{ok, 2, ok} = emqx_cluster_rpc:multicall(io, format, ["test"], 1, 1000),
@@ -163,25 +165,39 @@ t_del_stale_mfa(_Config) ->
Keys = lists:seq(1, 50),
Keys2 = lists:seq(51, 150),
Ids =
[begin
[
begin
{ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A),
TnxId end || _ <- Keys],
TnxId
end
|| _ <- Keys
],
?assertEqual(Keys, Ids),
Ids2 =
[begin
[
begin
{ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A),
TnxId end || _ <- Keys2],
TnxId
end
|| _ <- Keys2
],
?assertEqual(Keys2, Ids2),
ct:sleep(1200),
[begin
[
begin
?assertEqual({aborted, not_found}, emqx_cluster_rpc:query(I))
end || I <- lists:seq(1, 50)],
[begin
end
|| I <- lists:seq(1, 50)
],
[
begin
{atomic, Map} = emqx_cluster_rpc:query(I),
?assertEqual(MFA, maps:get(mfa, Map)),
?assertEqual(node(), maps:get(initiator, Map)),
?assert(maps:is_key(created_at, Map))
end || I <- lists:seq(51, 150)],
end
|| I <- lists:seq(51, 150)
],
ok.
t_skip_failed_commit(_Config) ->
@@ -191,14 +207,18 @@ t_skip_failed_commit(_Config) ->
ct:sleep(180),
{atomic, List1} = emqx_cluster_rpc:status(),
Node = node(),
?assertEqual([{Node, 1}, {{Node, ?NODE2}, 1}, {{Node, ?NODE3}, 1}],
tnx_ids(List1)),
?assertEqual(
[{Node, 1}, {{Node, ?NODE2}, 1}, {{Node, ?NODE3}, 1}],
tnx_ids(List1)
),
{M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
{ok, 2, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
2 = gen_server:call(?NODE2, skip_failed_commit, 5000),
{atomic, List2} = emqx_cluster_rpc:status(),
?assertEqual([{Node, 2}, {{Node, ?NODE2}, 2}, {{Node, ?NODE3}, 1}],
tnx_ids(List2)),
?assertEqual(
[{Node, 2}, {{Node, ?NODE2}, 2}, {{Node, ?NODE3}, 1}],
tnx_ids(List2)
),
ok.
t_fast_forward_commit(_Config) ->
@@ -208,8 +228,10 @@ t_fast_forward_commit(_Config) ->
ct:sleep(180),
{atomic, List1} = emqx_cluster_rpc:status(),
Node = node(),
?assertEqual([{Node, 1}, {{Node, ?NODE2}, 1}, {{Node, ?NODE3}, 1}],
tnx_ids(List1)),
?assertEqual(
[{Node, 1}, {{Node, ?NODE2}, 1}, {{Node, ?NODE3}, 1}],
tnx_ids(List1)
),
{M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
{ok, 2, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, 3, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
@@ -221,8 +243,10 @@ t_fast_forward_commit(_Config) ->
6 = gen_server:call(?NODE2, {fast_forward_to_commit, 7}, 5000),
2 = gen_server:call(?NODE3, {fast_forward_to_commit, 2}, 5000),
{atomic, List2} = emqx_cluster_rpc:status(),
?assertEqual([{Node, 6}, {{Node, ?NODE2}, 6}, {{Node, ?NODE3}, 2}],
tnx_ids(List2)),
?assertEqual(
[{Node, 6}, {{Node, ?NODE2}, 6}, {{Node, ?NODE3}, 2}],
tnx_ids(List2)
),
ok.
t_handler_unexpected_msg(_Config) ->
@@ -236,8 +260,14 @@ t_handler_unexpected_msg(_Config) ->
ok.
tnx_ids(Status) ->
lists:sort(lists:map(fun(#{tnx_id := TnxId, node := Node}) ->
{Node, TnxId} end, Status)).
lists:sort(
lists:map(
fun(#{tnx_id := TnxId, node := Node}) ->
{Node, TnxId}
end,
Status
)
).
start() ->
{ok, Pid1} = emqx_cluster_rpc:start_link(),
@@ -248,18 +278,25 @@ start() ->
{ok, [Pid1, Pid2, Pid3, Pid4]}.
stop() ->
[begin
[
begin
case erlang:whereis(N) of
undefined -> ok;
undefined ->
ok;
P ->
erlang:unlink(P),
erlang:exit(P, kill)
end end || N <- [?NODE1, ?NODE2, ?NODE3]],
end
end
|| N <- [?NODE1, ?NODE2, ?NODE3]
],
gen_server:stop(emqx_cluster_rpc_handler, normal, 5000).
receive_msg(0, _Msg) -> ok;
receive_msg(0, _Msg) ->
ok;
receive_msg(Count, Msg) when Count > 0 ->
receive Msg ->
receive
Msg ->
receive_msg(Count - 1, Msg)
after 800 ->
timeout
@@ -277,7 +314,8 @@ failed_on_node(Pid) ->
failed_on_node_by_odd(Pid) ->
case Pid =:= self() of
true -> ok;
true ->
ok;
false ->
catch ets:new(test, [named_table, set, public]),
Num = ets:update_counter(test, self(), {2, 1}, {self(), 1}),
@@ -289,7 +327,8 @@ failed_on_node_by_odd(Pid) ->
failed_on_other_recover_after_retry(Pid) ->
case Pid =:= self() of
true -> ok;
true ->
ok;
false ->
[{_, Res}] = ets:lookup(test, other_mfa_result),
Res

@@ -30,4 +30,3 @@ t_run_gc(_) ->
{ok, MilliSecs} = emqx_global_gc:run(),
ct:print("Global GC: ~w(ms)~n", [MilliSecs]),
emqx_global_gc:stop().

@@ -16,6 +16,7 @@ APPS+=( 'apps/emqx_retainer' 'apps/emqx_slow_subs')
APPS+=( 'apps/emqx_management')
APPS+=( 'apps/emqx_psk')
APPS+=( 'apps/emqx_plugin_libs' 'apps/emqx_machine' 'apps/emqx_statsd' )
APPS+=( 'apps/emqx_auto_subscribe' 'apps/emqx_conf')
for app in "${APPS[@]}"; do
echo "$app ..."