diff --git a/apps/emqx_authn/test/emqx_authn_api_SUITE.erl b/apps/emqx_authn/test/emqx_authn_api_SUITE.erl index d7342185a..21b53bb1f 100644 --- a/apps/emqx_authn/test/emqx_authn_api_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_api_SUITE.erl @@ -120,21 +120,45 @@ t_listener_authenticator_import_users(_) -> test_authenticator_import_users(["listeners", ?TCP_DEFAULT]). t_aggregate_metrics(_) -> - Metrics = #{ 'emqx@node1.emqx.io' => #{metrics => - #{failed => 0,matched => 1,rate => 0.0, - rate_last5m => 0.0,rate_max => 0.1, - success => 1} - }, - 'emqx@node2.emqx.io' => #{metrics => - #{failed => 0,matched => 1,rate => 0.0, - rate_last5m => 0.0,rate_max => 0.1, - success => 1} - } - }, + Metrics = #{ + 'emqx@node1.emqx.io' => #{ + metrics => + #{ + failed => 0, + matched => 1, + rate => 0.0, + rate_last5m => 0.0, + rate_max => 0.1, + success => 1 + } + }, + 'emqx@node2.emqx.io' => #{ + metrics => + #{ + failed => 0, + matched => 1, + rate => 0.0, + rate_last5m => 0.0, + rate_max => 0.1, + success => 1 + } + } + }, Res = emqx_authn_api:aggregate_metrics(maps:values(Metrics)), - ?assertEqual(#{metrics => - #{failed => 0,matched => 2,rate => 0.0,rate_last5m => 0.0, - rate_max => 0.2,success => 2}}, Res). + ?assertEqual( + #{ + metrics => + #{ + failed => 0, + matched => 2, + rate => 0.0, + rate_last5m => 0.0, + rate_max => 0.2, + success => 2 + } + }, + Res + ). test_authenticators(PathPrefix) -> ValidConfig = emqx_authn_test_lib:http_example(), diff --git a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl index a41f98d27..308e67d53 100644 --- a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl @@ -449,21 +449,45 @@ t_move_source(_) -> ok. t_aggregate_metrics(_) -> - Metrics = #{ 'emqx@node1.emqx.io' => #{metrics => - #{failed => 0,matched => 1,rate => 0.0, - rate_last5m => 0.0,rate_max => 0.1, - success => 1} - }, - 'emqx@node2.emqx.io' => #{metrics => - #{failed => 0,matched => 1,rate => 0.0, - rate_last5m => 0.0,rate_max => 0.1, - success => 1} - } - }, + Metrics = #{ + 'emqx@node1.emqx.io' => #{ + metrics => + #{ + failed => 0, + matched => 1, + rate => 0.0, + rate_last5m => 0.0, + rate_max => 0.1, + success => 1 + } + }, + 'emqx@node2.emqx.io' => #{ + metrics => + #{ + failed => 0, + matched => 1, + rate => 0.0, + rate_last5m => 0.0, + rate_max => 0.1, + success => 1 + } + } + }, Res = emqx_authn_api:aggregate_metrics(maps:values(Metrics)), - ?assertEqual(#{metrics => - #{failed => 0,matched => 2,rate => 0.0,rate_last5m => 0.0, - rate_max => 0.2,success => 2}}, Res). + ?assertEqual( + #{ + metrics => + #{ + failed => 0, + matched => 2, + rate => 0.0, + rate_last5m => 0.0, + rate_max => 0.2, + success => 2 + } + }, + Res + ). get_sources(Result) -> maps:get(<<"sources">>, jsx:decode(Result), []). diff --git a/apps/emqx_auto_subscribe/rebar.config b/apps/emqx_auto_subscribe/rebar.config index 84fadddbc..33e077f50 100644 --- a/apps/emqx_auto_subscribe/rebar.config +++ b/apps/emqx_auto_subscribe/rebar.config @@ -1,9 +1,10 @@ %% -*- mode: erlang -*- {erl_opts, [debug_info]}. -{deps, [ {emqx, {path, "../emqx"}} - ]}. +{deps, [{emqx, {path, "../emqx"}}]}. {shell, [ {apps, [emqx_auto_subscribe]} ]}. + +{project_plugins, [erlfmt]}. diff --git a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_api.erl b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_api.erl index 48ec92fb4..d33861b09 100644 --- a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_api.erl +++ b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_api.erl @@ -18,10 +18,11 @@ -behaviour(minirest_api). --export([ api_spec/0 - , paths/0 - , schema/1 - ]). +-export([ + api_spec/0, + paths/0, + schema/1 +]). -export([auto_subscribe/2]). @@ -45,8 +46,8 @@ schema("/mqtt/auto_subscribe") -> description => ?DESC(list_auto_subscribe_api), responses => #{ 200 => hoconsc:ref(emqx_auto_subscribe_schema, "auto_subscribe") - } - }, + } + }, put => #{ description => ?DESC(update_auto_subscribe_api), 'requestBody' => hoconsc:ref(emqx_auto_subscribe_schema, "auto_subscribe"), @@ -54,23 +55,27 @@ schema("/mqtt/auto_subscribe") -> 200 => hoconsc:ref(emqx_auto_subscribe_schema, "auto_subscribe"), 409 => emqx_dashboard_swagger:error_codes( [?EXCEED_LIMIT], - ?DESC(update_auto_subscribe_api_response409)) - } + ?DESC(update_auto_subscribe_api_response409) + ) } + } }. %%%============================================================================================== %% api apply auto_subscribe(get, _) -> {200, emqx_auto_subscribe:list()}; - auto_subscribe(put, #{body := #{}}) -> {400, #{code => ?BAD_REQUEST, message => <<"Request body required">>}}; auto_subscribe(put, #{body := Params}) -> case emqx_auto_subscribe:update(Params) of {error, quota_exceeded} -> - Message = list_to_binary(io_lib:format("Max auto subscribe topic count is ~p", - [emqx_auto_subscribe:max_limit()])), + Message = list_to_binary( + io_lib:format( + "Max auto subscribe topic count is ~p", + [emqx_auto_subscribe:max_limit()] + ) + ), {409, #{code => ?EXCEED_LIMIT, message => Message}}; {error, Reason} -> Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])), diff --git a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_placeholder.erl b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_placeholder.erl index 582d4ef5e..f86139059 100644 --- a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_placeholder.erl +++ b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_placeholder.erl @@ -22,37 +22,45 @@ -export([to_topic_table/3]). --spec(generate(list() | map()) -> list() | map()). +-spec generate(list() | map()) -> list() | map(). generate(Topics) when is_list(Topics) -> [generate(Topic) || Topic <- Topics]; - generate(T = #{topic := Topic}) -> T#{placeholder => generate(Topic, [])}. --spec(to_topic_table(list(), map(), map()) -> list()). +-spec to_topic_table(list(), map(), map()) -> list(). to_topic_table(PHs, ClientInfo, ConnInfo) -> - Fold = fun(#{qos := Qos, rh := RH, rap := RAP, nl := NL, - placeholder := PlaceHolder, topic := RawTopic - }, - Acc) -> - case to_topic(PlaceHolder, ClientInfo, ConnInfo, []) of - {error, Reason} -> - ?SLOG(warning, #{msg => "auto_subscribe_ignored", - topic => RawTopic, - reason => Reason - }), - Acc; - <<>> -> - ?SLOG(warning, #{msg => "auto_subscribe_ignored", - topic => RawTopic, - reason => empty_topic - }), - Acc; - Topic0 -> - {Topic, Opts} = emqx_topic:parse(Topic0), - [{Topic, Opts#{qos => Qos, rh => RH, rap => RAP, nl => NL}} | Acc] - end - end, + Fold = fun( + #{ + qos := Qos, + rh := RH, + rap := RAP, + nl := NL, + placeholder := PlaceHolder, + topic := RawTopic + }, + Acc + ) -> + case to_topic(PlaceHolder, ClientInfo, ConnInfo, []) of + {error, Reason} -> + ?SLOG(warning, #{ + msg => "auto_subscribe_ignored", + topic => RawTopic, + reason => Reason + }), + Acc; + <<>> -> + ?SLOG(warning, #{ + msg => "auto_subscribe_ignored", + topic => RawTopic, + reason => empty_topic + }), + Acc; + Topic0 -> + {Topic, Opts} = emqx_topic:parse(Topic0), + [{Topic, Opts#{qos => Qos, rh => RH, rap => RAP, nl => NL}} | Acc] + end + end, lists:foldl(Fold, [], PHs). %%-------------------------------------------------------------------- diff --git a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_schema.erl b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_schema.erl index 6770c6774..68897b802 100644 --- a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_schema.erl +++ b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_schema.erl @@ -21,10 +21,12 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/emqx_placeholder.hrl"). --export([ namespace/0 - , roots/0 - , fields/1 - , desc/1]). +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). namespace() -> "auto_subscribe". @@ -32,27 +34,41 @@ roots() -> ["auto_subscribe"]. fields("auto_subscribe") -> - [ {topics, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "topic")), - #{desc => ?DESC(auto_subscribe)})} + [ + {topics, + hoconsc:mk( + hoconsc:array(hoconsc:ref(?MODULE, "topic")), + #{desc => ?DESC(auto_subscribe)} + )} ]; - fields("topic") -> - [ {topic, sc(binary(), #{ - required => true, - example => topic_example(), - desc => ?DESC("topic")})} - , {qos, sc(emqx_schema:qos(), #{ - default => 0, - desc => ?DESC("qos")})} - , {rh, sc(range(0,2), #{ - default => 0, - desc => ?DESC("rh")})} - , {rap, sc(range(0, 1), #{ - default => 0, - desc => ?DESC("rap")})} - , {nl, sc(range(0, 1), #{ - default => 0, - desc => ?DESC(nl)})} + [ + {topic, + sc(binary(), #{ + required => true, + example => topic_example(), + desc => ?DESC("topic") + })}, + {qos, + sc(emqx_schema:qos(), #{ + default => 0, + desc => ?DESC("qos") + })}, + {rh, + sc(range(0, 2), #{ + default => 0, + desc => ?DESC("rh") + })}, + {rap, + sc(range(0, 1), #{ + default => 0, + desc => ?DESC("rap") + })}, + {nl, + sc(range(0, 1), #{ + default => 0, + desc => ?DESC(nl) + })} ]. desc("auto_subscribe") -> ?DESC("auto_subscribe"); @@ -60,10 +76,8 @@ desc("topic") -> ?DESC("topic"); desc(_) -> undefined. topic_example() -> - <<"/clientid/", ?PH_S_CLIENTID, - "/username/", ?PH_S_USERNAME, - "/host/", ?PH_S_HOST, - "/port/", ?PH_S_PORT>>. + <<"/clientid/", ?PH_S_CLIENTID, "/username/", ?PH_S_USERNAME, "/host/", ?PH_S_HOST, "/port/", + ?PH_S_PORT>>. %%-------------------------------------------------------------------- %% Internal functions diff --git a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_sup.erl b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_sup.erl index 9733c1c59..3257afa83 100644 --- a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_sup.erl +++ b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_sup.erl @@ -28,9 +28,11 @@ start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). init([]) -> - SupFlags = #{strategy => one_for_all, - intensity => 0, - period => 1}, + SupFlags = #{ + strategy => one_for_all, + intensity => 0, + period => 1 + }, ChildSpecs = [], {ok, {SupFlags, ChildSpecs}}. diff --git a/apps/emqx_auto_subscribe/src/topics_handler/emqx_auto_subscribe_handler.erl b/apps/emqx_auto_subscribe/src/topics_handler/emqx_auto_subscribe_handler.erl index 86c72bf49..e50bf0ef4 100644 --- a/apps/emqx_auto_subscribe/src/topics_handler/emqx_auto_subscribe_handler.erl +++ b/apps/emqx_auto_subscribe/src/topics_handler/emqx_auto_subscribe_handler.erl @@ -17,13 +17,12 @@ -export([init/1]). --spec(init(hocons:config()) -> {Module :: atom(), Config :: term()}). +-spec init(hocons:config()) -> {Module :: atom(), Config :: term()}. init(Config) -> do_init(Config). do_init(Config = #{topics := _Topics}) -> Options = emqx_auto_subscribe_internal:init(Config), {emqx_auto_subscribe_internal, Options}; - do_init(_Config) -> erlang:error(not_supported). diff --git a/apps/emqx_auto_subscribe/src/topics_handler/emqx_auto_subscribe_internal.erl b/apps/emqx_auto_subscribe/src/topics_handler/emqx_auto_subscribe_internal.erl index 1b5fd4fcc..5d413eb41 100644 --- a/apps/emqx_auto_subscribe/src/topics_handler/emqx_auto_subscribe_internal.erl +++ b/apps/emqx_auto_subscribe/src/topics_handler/emqx_auto_subscribe_internal.erl @@ -19,11 +19,11 @@ -export([handle/3]). --spec(init(Config :: map()) -> HandlerOptions :: term()). +-spec init(Config :: map()) -> HandlerOptions :: term(). init(#{topics := Topics}) -> emqx_auto_subscribe_placeholder:generate(Topics). --spec(handle(ClientInfo :: map(), ConnInfo :: map(), HandlerOptions :: term()) -> - TopicTables :: list()). +-spec handle(ClientInfo :: map(), ConnInfo :: map(), HandlerOptions :: term()) -> + TopicTables :: list(). handle(ClientInfo, ConnInfo, PlaceHolders) -> emqx_auto_subscribe_placeholder:to_topic_table(PlaceHolders, ClientInfo, ConnInfo). diff --git a/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl b/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl index 5d7ebf57e..5681e7867 100644 --- a/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl +++ b/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl @@ -64,32 +64,34 @@ init_per_suite(Config) -> application:load(?APP), ok = emqx_common_test_helpers:load_config( emqx_auto_subscribe_schema, - <<"auto_subscribe {\n" - " topics = [\n" - " {\n" - " topic = \"/c/${clientid}\"\n" - " },\n" - " {\n" - " topic = \"/u/${username}\"\n" - " },\n" - " {\n" - " topic = \"/h/${host}\"\n" - " },\n" - " {\n" - " topic = \"/p/${port}\"\n" - " },\n" - " {\n" - " topic = \"/client/${clientid}/username/${username}/host/${host}/port/${port}\"\n" - " },\n" - " {\n" - " topic = \"/topic/simple\"\n" - " qos = 1\n" - " rh = 0\n" - " rap = 0\n" - " nl = 0\n" - " }\n" - " ]\n" - " }">> + << + "auto_subscribe {\n" + " topics = [\n" + " {\n" + " topic = \"/c/${clientid}\"\n" + " },\n" + " {\n" + " topic = \"/u/${username}\"\n" + " },\n" + " {\n" + " topic = \"/h/${host}\"\n" + " },\n" + " {\n" + " topic = \"/p/${port}\"\n" + " },\n" + " {\n" + " topic = \"/client/${clientid}/username/${username}/host/${host}/port/${port}\"\n" + " },\n" + " {\n" + " topic = \"/topic/simple\"\n" + " qos = 1\n" + " rh = 0\n" + " rap = 0\n" + " nl = 0\n" + " }\n" + " ]\n" + " }" + >> ), emqx_common_test_helpers:start_apps( [emqx_conf, emqx_dashboard, ?APP], diff --git a/apps/emqx_conf/include/emqx_conf.hrl b/apps/emqx_conf/include/emqx_conf.hrl index 87c457f40..a87471989 100644 --- a/apps/emqx_conf/include/emqx_conf.hrl +++ b/apps/emqx_conf/include/emqx_conf.hrl @@ -1,4 +1,3 @@ - -ifndef(EMQX_CONF_HRL). -define(EMQX_CONF_HRL, true). diff --git a/apps/emqx_conf/rebar.config b/apps/emqx_conf/rebar.config index 86cbfc40b..c947932a0 100644 --- a/apps/emqx_conf/rebar.config +++ b/apps/emqx_conf/rebar.config @@ -1,10 +1,11 @@ %% -*- mode: erlang -*- {erl_opts, [debug_info]}. -{deps, [ {emqx, {path, "../emqx"}} - ]}. +{deps, [{emqx, {path, "../emqx"}}]}. {shell, [ - % {config, "config/sys.config"}, + % {config, "config/sys.config"}, {apps, [emqx_conf]} ]}. + +{project_plugins, [erlfmt]}. diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 1947d400d..e3b824d75 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -22,12 +22,25 @@ %% Note: multicall functions are statically checked by %% `emqx_bapi_trans' and `emqx_bpapi_static_checks' modules. Don't %% forget to update it when adding or removing them here: --export([multicall/3, multicall/5, query/1, reset/0, status/0, - skip_failed_commit/1, fast_forward_to_commit/2]). +-export([ + multicall/3, multicall/5, + query/1, + reset/0, + status/0, + skip_failed_commit/1, + fast_forward_to_commit/2 +]). -export([get_node_tnx_id/1, latest_tnx_id/0]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - handle_continue/2, code_change/3]). +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + handle_continue/2, + code_change/3 +]). -export_type([txn_id/0, succeed_num/0, multicall_return/1, multicall_return/0]). @@ -48,9 +61,10 @@ -type succeed_num() :: pos_integer() | all. --type multicall_return(Result) :: {ok, txn_id(), Result} - | {error, term()} - | {retry, txn_id(), Result, node()}. +-type multicall_return(Result) :: + {ok, txn_id(), Result} + | {error, term()} + | {retry, txn_id(), Result, node()}. -type multicall_return() :: multicall_return(_). @@ -63,13 +77,15 @@ mnesia(boot) -> {rlog_shard, ?CLUSTER_RPC_SHARD}, {storage, disc_copies}, {record_name, cluster_rpc_mfa}, - {attributes, record_info(fields, cluster_rpc_mfa)}]), + {attributes, record_info(fields, cluster_rpc_mfa)} + ]), ok = mria:create_table(?CLUSTER_COMMIT, [ {type, set}, {rlog_shard, ?CLUSTER_RPC_SHARD}, {storage, disc_copies}, {record_name, cluster_rpc_commit}, - {attributes, record_info(fields, cluster_rpc_commit)}]). + {attributes, record_info(fields, cluster_rpc_commit)} + ]). start_link() -> start_link(node(), ?MODULE, get_retry_ms()). @@ -97,7 +113,8 @@ multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNu Begin = erlang:monotonic_time(), InitRes = case mria_rlog:role() of - core -> gen_server:call(?MODULE, MFA, Timeout); + core -> + gen_server:call(?MODULE, MFA, Timeout); replicant -> %% the initiate transaction must happened on core node %% make sure MFA(in the transaction) and the transaction on the same node @@ -119,11 +136,14 @@ multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNu wait_for_all_nodes_commit(TnxId, MinDelay, RetryTimeout); {ok, TnxId, _} when is_integer(RequireNum) -> wait_for_nodes_commit(RequireNum, TnxId, MinDelay, RetryTimeout); - Error -> Error + Error -> + Error end, case OkOrFailed of - ok -> InitRes; - {error, Error0} -> {error, Error0}; + ok -> + InitRes; + {error, Error0} -> + {error, Error0}; {retry, Node0} -> {ok, TnxId0, MFARes} = InitRes, {retry, TnxId0, MFARes, Node0} @@ -187,7 +207,6 @@ handle_call(reset, _From, State) -> _ = mria:clear_table(?CLUSTER_COMMIT), _ = mria:clear_table(?CLUSTER_MFA), {reply, ok, State, {continue, ?CATCH_UP}}; - handle_call({initiate, MFA}, _From, State = #{node := Node}) -> case transaction(fun init_mfa/2, [Node, MFA]) of {atomic, {ok, TnxId, Result}} -> @@ -226,21 +245,25 @@ catch_up(State) -> catch_up(State, false). catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) -> case transaction(fun read_next_mfa/1, [Node]) of - {atomic, caught_up} -> ?TIMEOUT; + {atomic, caught_up} -> + ?TIMEOUT; {atomic, {still_lagging, NextId, MFA}} -> {Succeed, _} = apply_mfa(NextId, MFA), case Succeed orelse SkipResult of true -> case transaction(fun commit/2, [Node, NextId]) of - {atomic, ok} -> catch_up(State, false); + {atomic, ok} -> + catch_up(State, false); Error -> ?SLOG(error, #{ msg => "failed_to_commit_applied_call", applied_id => NextId, - error => Error}), + error => Error + }), RetryMs end; - false -> RetryMs + false -> + RetryMs end; {aborted, Reason} -> ?SLOG(error, #{msg => "read_next_mfa_transaction_failed", error => Reason}), @@ -256,9 +279,12 @@ read_next_mfa(Node) -> commit(Node, TnxId), ?SLOG(notice, #{ msg => "new_node_first_catch_up_and_start_commit.", - node => Node, tnx_id => TnxId}), + node => Node, + tnx_id => TnxId + }), TnxId; - [#cluster_rpc_commit{tnx_id = LastAppliedID}] -> LastAppliedID + 1 + [#cluster_rpc_commit{tnx_id = LastAppliedID}] -> + LastAppliedID + 1 end, case mnesia:read(?CLUSTER_MFA, NextId) of [] -> caught_up; @@ -281,8 +307,11 @@ do_catch_up(ToTnxId, Node) -> end; [#cluster_rpc_commit{tnx_id = LastAppliedId}] -> Reason = lists:flatten( - io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)", - [Node, LastAppliedId, ToTnxId])), + io_lib:format( + "~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)", + [Node, LastAppliedId, ToTnxId] + ) + ), ?SLOG(error, #{ msg => "catch_up_failed!", last_applied_id => LastAppliedId, @@ -297,11 +326,13 @@ commit(Node, TnxId) -> do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) -> {atomic, NodeId} = transaction(fun get_node_tnx_id/1, [Node]), case NodeId >= ToTnxId of - true -> NodeId; + true -> + NodeId; false -> {atomic, LatestId} = transaction(fun get_latest_id/0, []), case LatestId =< NodeId of - true -> NodeId; + true -> + NodeId; false -> catch_up(State, true), do_fast_forward_to_commit(ToTnxId, State) @@ -319,8 +350,12 @@ init_mfa(Node, MFA) -> LatestId = get_latest_id(), ok = do_catch_up_in_one_trans(LatestId, Node), TnxId = LatestId + 1, - MFARec = #cluster_rpc_mfa{tnx_id = TnxId, mfa = MFA, - initiator = Node, created_at = erlang:localtime()}, + MFARec = #cluster_rpc_mfa{ + tnx_id = TnxId, + mfa = MFA, + initiator = Node, + created_at = erlang:localtime() + }, ok = mnesia:write(?CLUSTER_MFA, MFARec, write), ok = commit(Node, TnxId), case apply_mfa(TnxId, MFA) of @@ -338,24 +373,35 @@ transaction(Func, Args) -> mria:transaction(?CLUSTER_RPC_SHARD, Func, Args). trans_status() -> - mnesia:foldl(fun(Rec, Acc) -> - #cluster_rpc_commit{node = Node, tnx_id = TnxId} = Rec, - case mnesia:read(?CLUSTER_MFA, TnxId) of - [MFARec] -> - #cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt} = MFARec, - [#{ - node => Node, - tnx_id => TnxId, - initiator => InitNode, - mfa => MFA, - created_at => CreatedAt - } | Acc]; - [] -> Acc - end end, [], ?CLUSTER_COMMIT). + mnesia:foldl( + fun(Rec, Acc) -> + #cluster_rpc_commit{node = Node, tnx_id = TnxId} = Rec, + case mnesia:read(?CLUSTER_MFA, TnxId) of + [MFARec] -> + #cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt} = + MFARec, + [ + #{ + node => Node, + tnx_id => TnxId, + initiator => InitNode, + mfa => MFA, + created_at => CreatedAt + } + | Acc + ]; + [] -> + Acc + end + end, + [], + ?CLUSTER_COMMIT + ). trans_query(TnxId) -> case mnesia:read(?CLUSTER_MFA, TnxId) of - [] -> mnesia:abort(not_found); + [] -> + mnesia:abort(not_found); [#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt}] -> #{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt} end. @@ -364,11 +410,12 @@ trans_query(TnxId) -> apply_mfa(TnxId, {M, F, A}) -> Res = - try erlang:apply(M, F, A) + try + erlang:apply(M, F, A) catch - throw : Reason -> + throw:Reason -> {error, #{reason => Reason}}; - Class : Reason : Stacktrace -> + Class:Reason:Stacktrace -> {error, #{exception => Class, reason => Reason, stacktrace => Stacktrace}} end, %% Do not log args as it might be sensitive information @@ -400,19 +447,23 @@ wait_for_all_nodes_commit(TnxId, Delay, Remain) -> [_ | _] when Remain > 0 -> ok = timer:sleep(Delay), wait_for_all_nodes_commit(TnxId, Delay, Remain - Delay); - [] -> ok; - Nodes -> {retry, Nodes} + [] -> + ok; + Nodes -> + {retry, Nodes} end. wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain) -> ok = timer:sleep(Delay), case length(synced_nodes(TnxId)) >= RequiredNum of - true -> ok; + true -> + ok; false when Remain > 0 -> wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain - Delay); false -> case lagging_node(TnxId) of - [] -> ok; %% All commit but The succeedNum > length(nodes()). + %% All commit but The succeedNum > length(nodes()). + [] -> ok; Nodes -> {retry, Nodes} end end. @@ -434,7 +485,7 @@ commit_status_trans(Operator, TnxId) -> get_retry_ms() -> emqx_conf:get(["node", "cluster_call", "retry_interval"], 1000). -maybe_init_tnx_id(_Node, TnxId)when TnxId < 0 -> ok; +maybe_init_tnx_id(_Node, TnxId) when TnxId < 0 -> ok; maybe_init_tnx_id(Node, TnxId) -> {atomic, _} = transaction(fun init_node_tnx_id/2, [Node, TnxId]), ok. diff --git a/apps/emqx_conf/src/emqx_cluster_rpc_handler.erl b/apps/emqx_conf/src/emqx_cluster_rpc_handler.erl index a11ccdace..40df5a02c 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc_handler.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc_handler.erl @@ -21,12 +21,18 @@ -include("emqx_conf.hrl"). -export([start_link/0, start_link/2]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). start_link() -> MaxHistory = emqx_conf:get(["node", "cluster_call", "max_history"], 100), - CleanupMs = emqx_conf:get(["node", "cluster_call", "cleanup_interval"], 5*60*1000), + CleanupMs = emqx_conf:get(["node", "cluster_call", "cleanup_interval"], 5 * 60 * 1000), start_link(MaxHistory, CleanupMs). start_link(MaxHistory, CleanupMs) -> @@ -55,7 +61,6 @@ handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history Error -> ?SLOG(error, #{msg => "del_stale_cluster_rpc_mfa_error", error => Error}) end, {noreply, ensure_timer(State), hibernate}; - handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. @@ -75,11 +80,15 @@ ensure_timer(State = #{cleanup_ms := Ms}) -> %% @doc Keep the latest completed 100 records for querying and troubleshooting. del_stale_mfa(MaxHistory) -> DoneId = - mnesia:foldl(fun(Rec, Min) -> min(Rec#cluster_rpc_commit.tnx_id, Min) end, - infinity, ?CLUSTER_COMMIT), + mnesia:foldl( + fun(Rec, Min) -> min(Rec#cluster_rpc_commit.tnx_id, Min) end, + infinity, + ?CLUSTER_COMMIT + ), delete_stale_mfa(mnesia:last(?CLUSTER_MFA), DoneId, MaxHistory). -delete_stale_mfa('$end_of_table', _DoneId, _Count) -> ok; +delete_stale_mfa('$end_of_table', _DoneId, _Count) -> + ok; delete_stale_mfa(CurrId, DoneId, Count) when CurrId > DoneId -> delete_stale_mfa(mnesia:prev(?CLUSTER_MFA, CurrId), DoneId, Count); delete_stale_mfa(CurrId, DoneId, Count) when Count > 0 -> diff --git a/apps/emqx_conf/src/emqx_conf.app.src b/apps/emqx_conf/src/emqx_conf.app.src index e9fbbb4b9..0e355c6d2 100644 --- a/apps/emqx_conf/src/emqx_conf.app.src +++ b/apps/emqx_conf/src/emqx_conf.app.src @@ -1,9 +1,9 @@ -{application, emqx_conf, - [{description, "EMQX configuration management"}, - {vsn, "0.1.0"}, - {registered, []}, - {mod, {emqx_conf_app, []}}, - {applications, [kernel, stdlib]}, - {env, []}, - {modules, []} - ]}. +{application, emqx_conf, [ + {description, "EMQX configuration management"}, + {vsn, "0.1.0"}, + {registered, []}, + {mod, {emqx_conf_app, []}}, + {applications, [kernel, stdlib]}, + {env, []}, + {modules, []} +]}. diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index 7d596a737..81d0481df 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -40,7 +40,8 @@ init_conf() -> copy_override_conf_from_core_node() -> case mria_mnesia:running_nodes() -- [node()] of - [] -> %% The first core nodes is self. + %% The first core nodes is self. + [] -> ?SLOG(debug, #{msg => "skip_copy_overide_conf_from_core_node"}), {ok, -1}; Nodes -> @@ -49,26 +50,41 @@ copy_override_conf_from_core_node() -> NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0), case (Failed =/= [] orelse NotReady =/= []) andalso Ready =/= [] of true -> - Warning = #{nodes => Nodes, failed => Failed, not_ready => NotReady, - msg => "ignored_bad_nodes_when_copy_init_config"}, + Warning = #{ + nodes => Nodes, + failed => Failed, + not_ready => NotReady, + msg => "ignored_bad_nodes_when_copy_init_config" + }, ?SLOG(warning, Warning); - false -> ok + false -> + ok end, case Ready of [] -> %% Other core nodes running but no one replicated it successfully. - ?SLOG(error, #{msg => "copy_overide_conf_from_core_node_failed", - nodes => Nodes, failed => Failed, not_ready => NotReady}), + ?SLOG(error, #{ + msg => "copy_overide_conf_from_core_node_failed", + nodes => Nodes, + failed => Failed, + not_ready => NotReady + }), {error, "core node not ready"}; _ -> - SortFun = fun({ok, #{wall_clock := W1}}, - {ok, #{wall_clock := W2}}) -> W1 > W2 end, + SortFun = fun( + {ok, #{wall_clock := W1}}, + {ok, #{wall_clock := W2}} + ) -> + W1 > W2 + end, [{ok, Info} | _] = lists:sort(SortFun, Ready), #{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info, Msg = #{msg => "copy_overide_conf_from_core_node_success", node => Node}, ?SLOG(debug, Msg), - ok = emqx_config:save_to_override_conf(RawOverrideConf, - #{override_to => cluster}), + ok = emqx_config:save_to_override_conf( + RawOverrideConf, + #{override_to => cluster} + ), {ok, TnxId} end end. @@ -77,17 +93,19 @@ get_override_config_file() -> Node = node(), Role = mria_rlog:role(), case emqx_app:get_init_config_load_done() of - false -> {error, #{node => Node, msg => "init_conf_load_not_done"}}; + false -> + {error, #{node => Node, msg => "init_conf_load_not_done"}}; true when Role =:= core -> case erlang:whereis(emqx_config_handler) of - undefined -> {error, #{node => Node, msg => "emqx_config_handler_not_ready"}}; + undefined -> + {error, #{node => Node, msg => "emqx_config_handler_not_ready"}}; _ -> Fun = fun() -> TnxId = emqx_cluster_rpc:get_node_tnx_id(Node), WallClock = erlang:statistics(wall_clock), Conf = emqx_config_handler:get_raw_cluster_override_conf(), #{wall_clock => WallClock, conf => Conf, tnx_id => TnxId, node => Node} - end, + end, case mria:ro_transaction(?CLUSTER_RPC_SHARD, Fun) of {atomic, Res} -> {ok, Res}; {aborted, Reason} -> {error, #{node => Node, msg => Reason}} diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index 6813e0c73..71a96ccf3 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -15,10 +15,11 @@ %%-------------------------------------------------------------------- -module(emqx_conf_cli). --export([ load/0 - , admins/1 - , unload/0 - ]). +-export([ + load/0, + admins/1, + unload/0 +]). -define(CMD, cluster_call). @@ -28,65 +29,65 @@ load() -> unload() -> emqx_ctl:unregister_command(?CMD). -admins(["status"]) -> status(); - +admins(["status"]) -> + status(); admins(["skip"]) -> status(), Nodes = mria_mnesia:running_nodes(), lists:foreach(fun emqx_cluster_rpc:skip_failed_commit/1, Nodes), status(); - admins(["skip", Node0]) -> status(), Node = list_to_existing_atom(Node0), emqx_cluster_rpc:skip_failed_commit(Node), status(); - admins(["tnxid", TnxId0]) -> TnxId = list_to_integer(TnxId0), emqx_ctl:print("~p~n", [emqx_cluster_rpc:query(TnxId)]); - admins(["fast_forward"]) -> status(), Nodes = mria_mnesia:running_nodes(), TnxId = emqx_cluster_rpc:latest_tnx_id(), lists:foreach(fun(N) -> emqx_cluster_rpc:fast_forward_to_commit(N, TnxId) end, Nodes), status(); - admins(["fast_forward", ToTnxId]) -> status(), Nodes = mria_mnesia:running_nodes(), TnxId = list_to_integer(ToTnxId), lists:foreach(fun(N) -> emqx_cluster_rpc:fast_forward_to_commit(N, TnxId) end, Nodes), status(); - admins(["fast_forward", Node0, ToTnxId]) -> status(), TnxId = list_to_integer(ToTnxId), Node = list_to_existing_atom(Node0), emqx_cluster_rpc:fast_forward_to_commit(Node, TnxId), status(); - admins(_) -> emqx_ctl:usage( - [ - {"cluster_call status", "status"}, - {"cluster_call skip [node]", "increase one commit on specific node"}, - {"cluster_call tnxid ", "get detailed about TnxId"}, - {"cluster_call fast_forward [node] [tnx_id]", "fast forwards to tnx_id" } - ]). + [ + {"cluster_call status", "status"}, + {"cluster_call skip [node]", "increase one commit on specific node"}, + {"cluster_call tnxid ", "get detailed about TnxId"}, + {"cluster_call fast_forward [node] [tnx_id]", "fast forwards to tnx_id"} + ] + ). status() -> emqx_ctl:print("-----------------------------------------------\n"), {atomic, Status} = emqx_cluster_rpc:status(), - lists:foreach(fun(S) -> - #{ - node := Node, - tnx_id := TnxId, - mfa := {M, F, A}, - created_at := CreatedAt - } = S, - emqx_ctl:print("~p:[~w] CreatedAt:~p ~p:~p/~w\n", - [Node, TnxId, CreatedAt, M, F, length(A)]) - end, Status), + lists:foreach( + fun(S) -> + #{ + node := Node, + tnx_id := TnxId, + mfa := {M, F, A}, + created_at := CreatedAt + } = S, + emqx_ctl:print( + "~p:[~w] CreatedAt:~p ~p:~p/~w\n", + [Node, TnxId, CreatedAt, M, F, length(A)] + ) + end, + Status + ), emqx_ctl:print("-----------------------------------------------\n"). diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 4c8916835..2c102f03a 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -32,9 +32,11 @@ -behaviour(hocon_schema). --reflect_type([ log_level/0, - file/0, - cipher/0]). +-reflect_type([ + log_level/0, + file/0, + cipher/0 +]). -export([namespace/0, roots/0, fields/1, translations/0, translation/1, validations/0, desc/1]). -export([conf_get/2, conf_get/3, keys/2, filter/1]). @@ -42,25 +44,25 @@ %% Static apps which merge their configs into the merged emqx.conf %% The list can not be made a dynamic read at run-time as it is used %% by nodetool to generate app.