From 244b1237421318e86e25f0157b825f20a6cd859f Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 31 Mar 2022 17:17:02 +0800 Subject: [PATCH] style(emqx): reformat emqx application --- apps/emqx/src/bpapi/emqx_bpapi.erl | 63 ++- apps/emqx/src/bpapi/emqx_bpapi.hrl | 8 +- apps/emqx/src/bpapi/emqx_bpapi_trans.erl | 80 ++-- apps/emqx/src/config/emqx_config_logger.erl | 18 +- .../src/emqx_esockd_htb_limiter.erl | 15 +- .../src/emqx_limiter/src/emqx_htb_limiter.erl | 316 +++++++------ .../src/emqx_limiter/src/emqx_limiter.app.src | 25 +- .../src/emqx_limiter/src/emqx_limiter_app.erl | 17 +- .../src/emqx_limiter_bucket_ref.erl | 64 ++- .../src/emqx_limiter_container.erl | 104 +++-- .../src/emqx_limiter_correction.erl | 9 +- .../emqx_limiter/src/emqx_limiter_decimal.erl | 31 +- .../emqx_limiter/src/emqx_limiter_manager.erl | 136 +++--- .../emqx_limiter/src/emqx_limiter_schema.erl | 309 ++++++++----- .../emqx_limiter/src/emqx_limiter_server.erl | 430 +++++++++++------- .../src/emqx_limiter_server_sup.erl | 38 +- .../src/emqx_limiter/src/emqx_limiter_sup.erl | 42 +- apps/emqx/src/emqx_trace/emqx_trace.erl | 371 +++++++++------ .../src/emqx_trace/emqx_trace_formatter.erl | 17 +- .../src/emqx_trace/emqx_trace_handler.erl | 94 ++-- apps/emqx/src/proto/emqx_broker_proto_v1.erl | 28 +- apps/emqx/src/proto/emqx_cm_proto_v1.erl | 32 +- .../emqx_persistent_session_proto_v1.erl | 13 +- apps/emqx/src/proto/emqx_proto_v1.erl | 29 +- apps/emqx/test/props/prop_emqx_base62.erl | 22 +- apps/emqx/test/props/prop_emqx_frame.erl | 41 +- apps/emqx/test/props/prop_emqx_json.erl | 207 +++++---- apps/emqx/test/props/prop_emqx_psk.erl | 41 +- .../test/props/prop_emqx_reason_codes.erl | 143 ++++-- apps/emqx/test/props/prop_emqx_rpc.erl | 151 +++--- apps/emqx/test/props/prop_emqx_sys.erl | 99 ++-- 31 files changed, 1809 insertions(+), 1184 deletions(-) diff --git a/apps/emqx/src/bpapi/emqx_bpapi.erl b/apps/emqx/src/bpapi/emqx_bpapi.erl index 8d68ed543..616462d94 100644 --- a/apps/emqx/src/bpapi/emqx_bpapi.erl +++ b/apps/emqx/src/bpapi/emqx_bpapi.erl @@ -16,8 +16,12 @@ -module(emqx_bpapi). %% API: --export([start/0, announce/1, supported_version/1, supported_version/2, - versions_file/1]). +-export([ + start/0, + announce/1, + supported_version/1, supported_version/2, + versions_file/1 +]). -export_type([api/0, api_version/0, var_name/0, call/0, rpc/0, bpapi_meta/0]). @@ -31,11 +35,12 @@ -type rpc() :: {_From :: call(), _To :: call()}. -type bpapi_meta() :: - #{ api := api() - , version := api_version() - , calls := [rpc()] - , casts := [rpc()] - }. + #{ + api := api(), + version := api_version(), + calls := [rpc()], + casts := [rpc()] + }. -include("emqx_bpapi.hrl"). @@ -49,11 +54,12 @@ -spec start() -> ok. start() -> - ok = mria:create_table(?TAB, [ {type, set} - , {storage, ram_copies} - , {attributes, record_info(fields, ?TAB)} - , {rlog_shard, ?COMMON_SHARD} - ]), + ok = mria:create_table(?TAB, [ + {type, set}, + {storage, ram_copies}, + {attributes, record_info(fields, ?TAB)}, + {rlog_shard, ?COMMON_SHARD} + ]), ok = mria:wait_for_tables([?TAB]), announce(emqx). @@ -86,25 +92,34 @@ versions_file(App) -> announce_fun(Data) -> %% Delete old records, if present: MS = ets:fun2ms(fun(#?TAB{key = {node(), API}}) -> - {node(), API} - end), + {node(), API} + end), OldKeys = mnesia:select(?TAB, MS, write), - _ = [mnesia:delete({?TAB, Key}) - || Key <- OldKeys], + _ = [ + mnesia:delete({?TAB, Key}) + || Key <- OldKeys + ], %% Insert new records: - _ = [mnesia:write(#?TAB{key = {node(), API}, version = Version}) - || {API, Version} <- Data], + _ = [ + mnesia:write(#?TAB{key = {node(), API}, version = Version}) + || {API, Version} <- Data + ], %% Update maximum supported version: [update_minimum(API) || {API, _} <- Data], ok. -spec update_minimum(api()) -> ok. update_minimum(API) -> - MS = ets:fun2ms(fun(#?TAB{ key = {N, A} - , version = Value - }) when N =/= ?multicall, - A =:= API -> - Value - end), + MS = ets:fun2ms(fun( + #?TAB{ + key = {N, A}, + version = Value + } + ) when + N =/= ?multicall, + A =:= API + -> + Value + end), MinVersion = lists:min(mnesia:select(?TAB, MS)), mnesia:write(#?TAB{key = {?multicall, API}, version = MinVersion}). diff --git a/apps/emqx/src/bpapi/emqx_bpapi.hrl b/apps/emqx/src/bpapi/emqx_bpapi.hrl index 08dd2a0b0..b2d2d218f 100644 --- a/apps/emqx/src/bpapi/emqx_bpapi.hrl +++ b/apps/emqx/src/bpapi/emqx_bpapi.hrl @@ -20,9 +20,9 @@ -define(multicall, multicall). --record(?TAB, - { key :: {node() | ?multicall, emqx_bpapi:api()} - , version :: emqx_bpapi:api_version() - }). +-record(?TAB, { + key :: {node() | ?multicall, emqx_bpapi:api()}, + version :: emqx_bpapi:api_version() +}). -endif. diff --git a/apps/emqx/src/bpapi/emqx_bpapi_trans.erl b/apps/emqx/src/bpapi/emqx_bpapi_trans.erl index aec4aa320..48a32a904 100644 --- a/apps/emqx/src/bpapi/emqx_bpapi_trans.erl +++ b/apps/emqx/src/bpapi/emqx_bpapi_trans.erl @@ -26,22 +26,24 @@ -type semantics() :: call | cast. --record(s, - { api :: emqx_bpapi:api() - , module :: module() - , version :: emqx_bpapi:api_version() | undefined - , targets = [] :: [{semantics(), emqx_bpapi:call(), emqx_bpapi:call()}] - , errors = [] :: list() - , file - }). +-record(s, { + api :: emqx_bpapi:api(), + module :: module(), + version :: emqx_bpapi:api_version() | undefined, + targets = [] :: [{semantics(), emqx_bpapi:call(), emqx_bpapi:call()}], + errors = [] :: list(), + file +}). format_error(invalid_name) -> "BPAPI module name should follow _proto_v pattern"; format_error({invalid_fun, Name, Arity}) -> - io_lib:format("malformed function ~p/~p. " - "BPAPI functions should have exactly one clause " - "and call (emqx_|e)rpc at the top level", - [Name, Arity]). + io_lib:format( + "malformed function ~p/~p. " + "BPAPI functions should have exactly one clause " + "and call (emqx_|e)rpc at the top level", + [Name, Arity] + ). parse_transform(Forms, _Options) -> log("Original:~n~p", [Forms]), @@ -60,11 +62,11 @@ go({attribute, _, file, {File, _}}, S) -> go({attribute, Line, module, Mod}, S) -> case api_and_version(Mod) of {ok, API, Vsn} -> S#s{api = API, version = Vsn, module = Mod}; - error -> push_err(Line, invalid_name, S) + error -> push_err(Line, invalid_name, S) end; -go({function, _Line, introduced_in, 0, _}, S) -> +go({function, _Line, introduced_in, 0, _}, S) -> S; -go({function, _Line, deprecated_since, 0, _}, S) -> +go({function, _Line, deprecated_since, 0, _}, S) -> S; go({function, Line, Name, Arity, Clauses}, S) -> analyze_fun(Line, Name, Arity, Clauses, S); @@ -79,26 +81,25 @@ finalize(Forms, S) -> {Attrs, Funcs} = lists:splitwith(fun is_attribute/1, Forms), AST = mk_meta_fun(S), log("Meta fun:~n~p", [AST]), - Attrs ++ [mk_export()] ++ [AST|Funcs]. + Attrs ++ [mk_export()] ++ [AST | Funcs]. mk_meta_fun(#s{api = API, version = Vsn, targets = Targets}) -> Line = 0, Calls = [{From, To} || {call, From, To} <- Targets], Casts = [{From, To} || {cast, From, To} <- Targets], - Ret = typerefl_quote:const(Line, #{ api => API - , version => Vsn - , calls => Calls - , casts => Casts - }), - {function, Line, ?META_FUN, _Arity = 0, - [{clause, Line, _Args = [], _Guards = [], - [Ret]}]}. + Ret = typerefl_quote:const(Line, #{ + api => API, + version => Vsn, + calls => Calls, + casts => Casts + }), + {function, Line, ?META_FUN, _Arity = 0, [{clause, Line, _Args = [], _Guards = [], [Ret]}]}. mk_export() -> {attribute, 0, export, [{?META_FUN, 0}]}. is_attribute({attribute, _Line, _Attr, _Val}) -> true; -is_attribute(_) -> false. +is_attribute(_) -> false. %% Extract the target function of the RPC call analyze_fun(Line, Name, Arity, [{clause, Line, Head, _Guards, Exprs}], S) -> @@ -122,14 +123,17 @@ analyze_exprs(Line, Name, Arity, Head, Exprs, S) -> -spec extract_outer_args([erl_parse:abstract_form()]) -> [atom()]. extract_outer_args(Abs) -> - lists:map(fun({var, _, Var}) -> - Var; - ({match, _, {var, _, Var}, _}) -> - Var; - ({match, _, _, {var, _, Var}}) -> - Var - end, - Abs). + lists:map( + fun + ({var, _, Var}) -> + Var; + ({match, _, {var, _, Var}, _}) -> + Var; + ({match, _, _, {var, _, Var}}) -> + Var + end, + Abs + ). -spec extract_target_call(_AST, [_AST]) -> {semantics(), emqx_bpapi:call()}. extract_target_call(RPCBackend, OuterArgs) -> @@ -166,13 +170,13 @@ extract_mfa(?BACKEND(emqx_cluster_rpc, multicall), [M, F, A, _RequiredNum, _Time extract_mfa(_, _) -> error("unrecognized RPC call"). -call_or_cast(cast) -> cast; +call_or_cast(cast) -> cast; call_or_cast(multicast) -> cast; call_or_cast(multicall) -> call; -call_or_cast(call) -> call. +call_or_cast(call) -> call. list_to_args({cons, _, {var, _, A}, T}) -> - [A|list_to_args(T)]; + [A | list_to_args(T)]; list_to_args({nil, _}) -> []. @@ -180,10 +184,10 @@ invalid_fun(Line, Name, Arity, S) -> push_err(Line, {invalid_fun, Name, Arity}, S). push_err(Line, Err, S = #s{errors = Errs}) -> - S#s{errors = [{Line, Err}|Errs]}. + S#s{errors = [{Line, Err} | Errs]}. push_target(Target, S = #s{targets = Targets}) -> - S#s{targets = [Target|Targets]}. + S#s{targets = [Target | Targets]}. -spec api_and_version(module()) -> {ok, emqx_bpapi:api(), emqx_bpapi:version()} | error. api_and_version(Module) -> diff --git a/apps/emqx/src/config/emqx_config_logger.erl b/apps/emqx/src/config/emqx_config_logger.erl index fca6ebc43..70b8ef473 100644 --- a/apps/emqx/src/config/emqx_config_logger.erl +++ b/apps/emqx/src/config/emqx_config_logger.erl @@ -44,10 +44,16 @@ post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) -> update_log_handlers(NewHandlers) -> OldHandlers = application:get_env(kernel, logger, []), - lists:foreach(fun({handler, HandlerId, _Mod, _Conf}) -> - logger:remove_handler(HandlerId) - end, OldHandlers -- NewHandlers), - lists:foreach(fun({handler, HandlerId, Mod, Conf}) -> - logger:add_handler(HandlerId, Mod, Conf) - end, NewHandlers -- OldHandlers), + lists:foreach( + fun({handler, HandlerId, _Mod, _Conf}) -> + logger:remove_handler(HandlerId) + end, + OldHandlers -- NewHandlers + ), + lists:foreach( + fun({handler, HandlerId, Mod, Conf}) -> + logger:add_handler(HandlerId, Mod, Conf) + end, + NewHandlers -- OldHandlers + ), application:set_env(kernel, logger, NewHandlers). diff --git a/apps/emqx/src/emqx_limiter/src/emqx_esockd_htb_limiter.erl b/apps/emqx/src/emqx_limiter/src/emqx_esockd_htb_limiter.erl index 302be9224..b465fc287 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_esockd_htb_limiter.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_esockd_htb_limiter.erl @@ -21,17 +21,20 @@ %% API -export([new_create_options/2, create/1, delete/1, consume/2]). --type create_options() :: #{ module := ?MODULE - , type := emqx_limiter_schema:limiter_type() - , bucket := emqx_limiter_schema:bucket_name() - }. +-type create_options() :: #{ + module := ?MODULE, + type := emqx_limiter_schema:limiter_type(), + bucket := emqx_limiter_schema:bucket_name() +}. %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- --spec new_create_options(emqx_limiter_schema:limiter_type(), - emqx_limiter_schema:bucket_name()) -> create_options(). +-spec new_create_options( + emqx_limiter_schema:limiter_type(), + emqx_limiter_schema:bucket_name() +) -> create_options(). new_create_options(Type, BucketName) -> #{module => ?MODULE, type => Type, bucket => BucketName}. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl index 2a4e13731..9ae27fba5 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl @@ -21,52 +21,71 @@ %% @end %% API --export([ make_token_bucket_limiter/2, make_ref_limiter/2, check/2 - , consume/2, set_retry/2, retry/1, make_infinity_limiter/0 - , make_future/1, available/1 - ]). +-export([ + make_token_bucket_limiter/2, + make_ref_limiter/2, + check/2, + consume/2, + set_retry/2, + retry/1, + make_infinity_limiter/0, + make_future/1, + available/1 +]). -export_type([token_bucket_limiter/0]). %% a token bucket limiter with a limiter server's bucket reference --type token_bucket_limiter() :: #{ %% the number of tokens currently available - tokens := non_neg_integer() - , rate := decimal() - , capacity := decimal() - , lasttime := millisecond() - %% @see emqx_limiter_schema - , max_retry_time := non_neg_integer() - %% @see emqx_limiter_schema - , failure_strategy := failure_strategy() - , divisible := boolean() %% @see emqx_limiter_schema - , low_water_mark := non_neg_integer() %% @see emqx_limiter_schema - , bucket := bucket() %% the limiter server's bucket - %% retry contenxt - %% undefined meaning no retry context or no need to retry - , retry_ctx => undefined - | retry_context(token_bucket_limiter()) %% the retry context - , atom => any() %% allow to add other keys - }. +%% the number of tokens currently available +-type token_bucket_limiter() :: #{ + tokens := non_neg_integer(), + rate := decimal(), + capacity := decimal(), + lasttime := millisecond(), + %% @see emqx_limiter_schema + max_retry_time := non_neg_integer(), + %% @see emqx_limiter_schema + failure_strategy := failure_strategy(), + %% @see emqx_limiter_schema + divisible := boolean(), + %% @see emqx_limiter_schema + low_water_mark := non_neg_integer(), + %% the limiter server's bucket + bucket := bucket(), + + %% retry contenxt + %% undefined meaning no retry context or no need to retry + retry_ctx => + undefined + %% the retry context + | retry_context(token_bucket_limiter()), + %% allow to add other keys + atom => any() +}. %% a limiter server's bucket reference --type ref_limiter() :: #{ max_retry_time := non_neg_integer() - , failure_strategy := failure_strategy() - , divisible := boolean() - , low_water_mark := non_neg_integer() - , bucket := bucket() +-type ref_limiter() :: #{ + max_retry_time := non_neg_integer(), + failure_strategy := failure_strategy(), + divisible := boolean(), + low_water_mark := non_neg_integer(), + bucket := bucket(), - , retry_ctx => undefined | retry_context(ref_limiter()) - , atom => any() %% allow to add other keys - }. + retry_ctx => undefined | retry_context(ref_limiter()), + %% allow to add other keys + atom => any() +}. -type retry_fun(Limiter) :: fun((pos_integer(), Limiter) -> inner_check_result(Limiter)). -type acquire_type(Limiter) :: integer() | retry_context(Limiter). --type retry_context(Limiter) :: #{ continuation := undefined | retry_fun(Limiter) - , diff := non_neg_integer() %% how many tokens are left to obtain +-type retry_context(Limiter) :: #{ + continuation := undefined | retry_fun(Limiter), + %% how many tokens are left to obtain + diff := non_neg_integer(), - , need => pos_integer() - , start => millisecond() - }. + need => pos_integer(), + start => millisecond() +}. -type bucket() :: emqx_limiter_bucket_ref:bucket_ref(). -type limiter() :: token_bucket_limiter() | ref_limiter() | infinity. @@ -77,27 +96,31 @@ -type check_result_pause(Limiter) :: {pause_type(), millisecond(), retry_context(Limiter), Limiter}. -type result_drop(Limiter) :: {drop, Limiter}. --type check_result(Limiter) :: check_result_ok(Limiter) - | check_result_pause(Limiter) - | result_drop(Limiter). +-type check_result(Limiter) :: + check_result_ok(Limiter) + | check_result_pause(Limiter) + | result_drop(Limiter). --type inner_check_result(Limiter) :: check_result_ok(Limiter) - | check_result_pause(Limiter). +-type inner_check_result(Limiter) :: + check_result_ok(Limiter) + | check_result_pause(Limiter). --type consume_result(Limiter) :: check_result_ok(Limiter) - | result_drop(Limiter). +-type consume_result(Limiter) :: + check_result_ok(Limiter) + | result_drop(Limiter). -type decimal() :: emqx_limiter_decimal:decimal(). -type failure_strategy() :: emqx_limiter_schema:failure_strategy(). --type limiter_bucket_cfg() :: #{ rate := decimal() - , initial := non_neg_integer() - , low_water_mark := non_neg_integer() - , capacity := decimal() - , divisible := boolean() - , max_retry_time := non_neg_integer() - , failure_strategy := failure_strategy() - }. +-type limiter_bucket_cfg() :: #{ + rate := decimal(), + initial := non_neg_integer(), + low_water_mark := non_neg_integer(), + capacity := decimal(), + divisible := boolean(), + max_retry_time := non_neg_integer(), + failure_strategy := failure_strategy() +}. -type future() :: pos_integer(). @@ -113,10 +136,11 @@ %%@doc create a limiter -spec make_token_bucket_limiter(limiter_bucket_cfg(), bucket()) -> _. make_token_bucket_limiter(Cfg, Bucket) -> - Cfg#{ tokens => emqx_limiter_server:get_initial_val(Cfg) - , lasttime => ?NOW - , bucket => Bucket - }. + Cfg#{ + tokens => emqx_limiter_server:get_initial_val(Cfg), + lasttime => ?NOW, + bucket => Bucket + }. %%@doc create a limiter server's reference -spec make_ref_limiter(limiter_bucket_cfg(), bucket()) -> ref_limiter(). @@ -130,38 +154,39 @@ make_infinity_limiter() -> %% @doc request some tokens %% it will automatically retry when failed until the maximum retry time is reached %% @end --spec consume(integer(), Limiter) -> consume_result(Limiter) - when Limiter :: limiter(). +-spec consume(integer(), Limiter) -> consume_result(Limiter) when + Limiter :: limiter(). consume(Need, #{max_retry_time := RetryTime} = Limiter) when Need > 0 -> try_consume(RetryTime, Need, Limiter); - consume(_, Limiter) -> {ok, Limiter}. %% @doc try to request the token and return the result without automatically retrying --spec check(acquire_type(Limiter), Limiter) -> check_result(Limiter) - when Limiter :: limiter(). +-spec check(acquire_type(Limiter), Limiter) -> check_result(Limiter) when + Limiter :: limiter(). check(_, infinity) -> {ok, infinity}; - check(Need, Limiter) when is_integer(Need), Need > 0 -> case do_check(Need, Limiter) of {ok, _} = Done -> Done; {PauseType, Pause, Ctx, Limiter2} -> - {PauseType, - Pause, - Ctx#{start => ?NOW, need => Need}, Limiter2} + {PauseType, Pause, Ctx#{start => ?NOW, need => Need}, Limiter2} end; - %% check with retry context. %% when continuation = undefined, the diff will be 0 %% so there is no need to check continuation here -check(#{continuation := Cont, +check( + #{ + continuation := Cont, diff := Diff, - start := Start} = Retry, - #{failure_strategy := Failure, - max_retry_time := RetryTime} = Limiter) when Diff > 0 -> + start := Start + } = Retry, + #{ + failure_strategy := Failure, + max_retry_time := RetryTime + } = Limiter +) when Diff > 0 -> case Cont(Diff, Limiter) of {ok, _} = Done -> Done; @@ -175,13 +200,12 @@ check(#{continuation := Cont, on_failure(Failure, try_restore(Retry2, Limiter2)) end end; - check(_, Limiter) -> {ok, Limiter}. %% @doc pack the retry context into the limiter data --spec set_retry(retry_context(Limiter), Limiter) -> Limiter - when Limiter :: limiter(). +-spec set_retry(retry_context(Limiter), Limiter) -> Limiter when + Limiter :: limiter(). set_retry(Retry, Limiter) -> Limiter#{retry_ctx => Retry}. @@ -189,7 +213,6 @@ set_retry(Retry, Limiter) -> -spec retry(Limiter) -> check_result(Limiter) when Limiter :: limiter(). retry(#{retry_ctx := Retry} = Limiter) when is_map(Retry) -> check(Retry, Limiter#{retry_ctx := undefined}); - retry(Limiter) -> {ok, Limiter}. @@ -202,30 +225,32 @@ make_future(Need) -> %% @doc get the number of tokens currently available -spec available(limiter()) -> decimal(). -available(#{tokens := Tokens, - rate := Rate, - lasttime := LastTime, - capacity := Capacity, - bucket := Bucket}) -> +available(#{ + tokens := Tokens, + rate := Rate, + lasttime := LastTime, + capacity := Capacity, + bucket := Bucket +}) -> Tokens2 = apply_elapsed_time(Rate, ?NOW - LastTime, Tokens, Capacity), erlang:min(Tokens2, emqx_limiter_bucket_ref:available(Bucket)); - available(#{bucket := Bucket}) -> emqx_limiter_bucket_ref:available(Bucket); - available(infinity) -> infinity. %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- --spec try_consume(millisecond(), - acquire_type(Limiter), - Limiter) -> consume_result(Limiter) when Limiter :: limiter(). -try_consume(LeftTime, Retry, #{failure_strategy := Failure} = Limiter) - when LeftTime =< 0, is_map(Retry) -> +-spec try_consume( + millisecond(), + acquire_type(Limiter), + Limiter +) -> consume_result(Limiter) when Limiter :: limiter(). +try_consume(LeftTime, Retry, #{failure_strategy := Failure} = Limiter) when + LeftTime =< 0, is_map(Retry) +-> on_failure(Failure, try_restore(Retry, Limiter)); - try_consume(LeftTime, Need, Limiter) when is_integer(Need) -> case do_check(Need, Limiter) of {ok, _} = Done -> @@ -234,10 +259,14 @@ try_consume(LeftTime, Need, Limiter) when is_integer(Need) -> timer:sleep(erlang:min(LeftTime, Pause)), try_consume(LeftTime - Pause, Ctx#{need => Need}, Limiter2) end; - -try_consume(LeftTime, - #{continuation := Cont, - diff := Diff} = Retry, Limiter) when Diff > 0 -> +try_consume( + LeftTime, + #{ + continuation := Cont, + diff := Diff + } = Retry, + Limiter +) when Diff > 0 -> case Cont(Diff, Limiter) of {ok, _} = Done -> Done; @@ -245,64 +274,78 @@ try_consume(LeftTime, timer:sleep(erlang:min(LeftTime, Pause)), try_consume(LeftTime - Pause, maps:merge(Retry, Ctx), Limiter2) end; - try_consume(_, _, Limiter) -> {ok, Limiter}. --spec do_check(acquire_type(Limiter), Limiter) -> inner_check_result(Limiter) - when Limiter :: limiter(). +-spec do_check(acquire_type(Limiter), Limiter) -> inner_check_result(Limiter) when + Limiter :: limiter(). do_check(Need, #{tokens := Tokens} = Limiter) when Need =< Tokens -> do_check_with_parent_limiter(Need, Limiter); - do_check(Need, #{tokens := _} = Limiter) -> do_reset(Need, Limiter); - -do_check(Need, #{divisible := Divisible, - bucket := Bucket} = Ref) -> +do_check( + Need, + #{ + divisible := Divisible, + bucket := Bucket + } = Ref +) -> case emqx_limiter_bucket_ref:check(Need, Bucket, Divisible) of {ok, Tokens} -> may_return_or_pause(Tokens, Ref); {PauseType, Rate, Obtained} -> - return_pause(Rate, - PauseType, - fun ?FUNCTION_NAME/2, Need - Obtained, Ref) + return_pause( + Rate, + PauseType, + fun ?FUNCTION_NAME/2, + Need - Obtained, + Ref + ) end. on_failure(force, Limiter) -> {ok, Limiter}; - on_failure(drop, Limiter) -> {drop, Limiter}; - on_failure(throw, Limiter) -> Message = io_lib:format("limiter consume failed, limiter:~p~n", [Limiter]), erlang:throw({rate_check_fail, Message}). -spec do_check_with_parent_limiter(pos_integer(), token_bucket_limiter()) -> - inner_check_result(token_bucket_limiter()). -do_check_with_parent_limiter(Need, - #{tokens := Tokens, - divisible := Divisible, - bucket := Bucket} = Limiter) -> + inner_check_result(token_bucket_limiter()). +do_check_with_parent_limiter( + Need, + #{ + tokens := Tokens, + divisible := Divisible, + bucket := Bucket + } = Limiter +) -> case emqx_limiter_bucket_ref:check(Need, Bucket, Divisible) of {ok, RefLeft} -> Left = sub(Tokens, Need), may_return_or_pause(erlang:min(RefLeft, Left), Limiter#{tokens := Left}); {PauseType, Rate, Obtained} -> - return_pause(Rate, - PauseType, - fun ?FUNCTION_NAME/2, - Need - Obtained, - Limiter#{tokens := sub(Tokens, Obtained)}) + return_pause( + Rate, + PauseType, + fun ?FUNCTION_NAME/2, + Need - Obtained, + Limiter#{tokens := sub(Tokens, Obtained)} + ) end. -spec do_reset(pos_integer(), token_bucket_limiter()) -> inner_check_result(token_bucket_limiter()). -do_reset(Need, - #{tokens := Tokens, - rate := Rate, - lasttime := LastTime, - divisible := Divisible, - capacity := Capacity} = Limiter) -> +do_reset( + Need, + #{ + tokens := Tokens, + rate := Rate, + lasttime := LastTime, + divisible := Divisible, + capacity := Capacity + } = Limiter +) -> Now = ?NOW, Tokens2 = apply_elapsed_time(Rate, Now - LastTime, Tokens, Capacity), @@ -312,49 +355,54 @@ do_reset(Need, do_check_with_parent_limiter(Need, Limiter2); Available when Divisible andalso Available > 0 -> %% must be allocated here, because may be Need > Capacity - return_pause(Rate, - partial, - fun do_reset/2, - Need - Available, - Limiter#{tokens := 0, lasttime := Now}); + return_pause( + Rate, + partial, + fun do_reset/2, + Need - Available, + Limiter#{tokens := 0, lasttime := Now} + ); _ -> return_pause(Rate, pause, fun do_reset/2, Need, Limiter) end. --spec return_pause(decimal(), pause_type(), retry_fun(Limiter), pos_integer(), Limiter) - -> check_result_pause(Limiter) when Limiter :: limiter(). +-spec return_pause(decimal(), pause_type(), retry_fun(Limiter), pos_integer(), Limiter) -> + check_result_pause(Limiter) +when + Limiter :: limiter(). return_pause(infinity, PauseType, Fun, Diff, Limiter) -> %% workaround when emqx_limiter_server's rate is infinity {PauseType, ?MINIMUM_PAUSE, make_retry_context(Fun, Diff), Limiter}; - return_pause(Rate, PauseType, Fun, Diff, Limiter) -> Val = erlang:round(Diff * emqx_limiter_schema:minimum_period() / Rate), Pause = emqx_misc:clamp(Val, ?MINIMUM_PAUSE, ?MAXIMUM_PAUSE), {PauseType, Pause, make_retry_context(Fun, Diff), Limiter}. -spec make_retry_context(undefined | retry_fun(Limiter), non_neg_integer()) -> - retry_context(Limiter) when Limiter :: limiter(). + retry_context(Limiter) +when + Limiter :: limiter(). make_retry_context(Fun, Diff) -> #{continuation => Fun, diff => Diff}. --spec try_restore(retry_context(Limiter), Limiter) -> Limiter - when Limiter :: limiter(). -try_restore(#{need := Need, diff := Diff}, - #{tokens := Tokens, capacity := Capacity, bucket := Bucket} = Limiter) -> +-spec try_restore(retry_context(Limiter), Limiter) -> Limiter when + Limiter :: limiter(). +try_restore( + #{need := Need, diff := Diff}, + #{tokens := Tokens, capacity := Capacity, bucket := Bucket} = Limiter +) -> Back = Need - Diff, Tokens2 = erlang:min(Capacity, Back + Tokens), emqx_limiter_bucket_ref:try_restore(Back, Bucket), Limiter#{tokens := Tokens2}; - try_restore(#{need := Need, diff := Diff}, #{bucket := Bucket} = Limiter) -> emqx_limiter_bucket_ref:try_restore(Need - Diff, Bucket), Limiter. --spec may_return_or_pause(non_neg_integer(), Limiter) -> check_result(Limiter) - when Limiter :: limiter(). +-spec may_return_or_pause(non_neg_integer(), Limiter) -> check_result(Limiter) when + Limiter :: limiter(). may_return_or_pause(Left, #{low_water_mark := Mark} = Limiter) when Left >= Mark -> {ok, Limiter}; - may_return_or_pause(_, Limiter) -> {pause, ?MINIMUM_PAUSE, make_retry_context(undefined, 0), Limiter}. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter.app.src b/apps/emqx/src/emqx_limiter/src/emqx_limiter.app.src index cd4c1467b..69c1c6fb0 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter.app.src +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter.app.src @@ -1,13 +1,14 @@ %% -*- mode: erlang -*- -{application, emqx_limiter, - [{description, "EMQX Hierarchical Limiter"}, - {vsn, "1.0.0"}, % strict semver, bump manually! - {modules, []}, - {registered, [emqx_limiter_sup]}, - {applications, [kernel,stdlib,emqx]}, - {mod, {emqx_limiter_app,[]}}, - {env, []}, - {licenses, ["Apache-2.0"]}, - {maintainers, ["EMQX Team "]}, - {links, []} - ]}. +{application, emqx_limiter, [ + {description, "EMQX Hierarchical Limiter"}, + % strict semver, bump manually! + {vsn, "1.0.0"}, + {modules, []}, + {registered, [emqx_limiter_sup]}, + {applications, [kernel, stdlib, emqx]}, + {mod, {emqx_limiter_app, []}}, + {env, []}, + {licenses, ["Apache-2.0"]}, + {maintainers, ["EMQX Team "]}, + {links, []} +]}. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_app.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_app.erl index 6e66645f3..71a1ff315 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_app.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_app.erl @@ -31,13 +31,16 @@ %% top supervisor of the tree. %% @end %%-------------------------------------------------------------------- --spec start(StartType :: normal | - {takeover, Node :: node()} | - {failover, Node :: node()}, - StartArgs :: term()) -> - {ok, Pid :: pid()} | - {ok, Pid :: pid(), State :: term()} | - {error, Reason :: term()}. +-spec start( + StartType :: + normal + | {takeover, Node :: node()} + | {failover, Node :: node()}, + StartArgs :: term() +) -> + {ok, Pid :: pid()} + | {ok, Pid :: pid(), State :: term()} + | {error, Reason :: term()}. start(_StartType, _StartArgs) -> {ok, _} = emqx_limiter_sup:start_link(). diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl index 4265e64ae..00baf5281 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl @@ -21,18 +21,25 @@ %% @end %% API --export([ new/3, check/3, try_restore/2 - , available/1]). +-export([ + new/3, + check/3, + try_restore/2, + available/1 +]). -export_type([bucket_ref/0]). -type infinity_bucket_ref() :: infinity. --type finite_bucket_ref() :: #{ counter := counters:counters_ref() - , index := index() - , rate := rate()}. +-type finite_bucket_ref() :: #{ + counter := counters:counters_ref(), + index := index(), + rate := rate() +}. --type bucket_ref() :: infinity_bucket_ref() - | finite_bucket_ref(). +-type bucket_ref() :: + infinity_bucket_ref() + | finite_bucket_ref(). -type index() :: emqx_limiter_server:index(). -type rate() :: emqx_limiter_decimal:decimal(). @@ -41,37 +48,45 @@ %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- --spec new(undefined | counters:countres_ref(), - undefined | index(), - rate()) -> bucket_ref(). +-spec new( + undefined | counters:countres_ref(), + undefined | index(), + rate() +) -> bucket_ref(). new(undefined, _, _) -> infinity; - new(Counter, Index, Rate) -> - #{counter => Counter, - index => Index, - rate => Rate}. + #{ + counter => Counter, + index => Index, + rate => Rate + }. %% @doc check tokens -spec check(pos_integer(), bucket_ref(), Disivisble :: boolean()) -> - HasToken :: {ok, emqx_limiter_decimal:decimal()} - | {check_failure_type(), rate(), pos_integer()}. + HasToken :: + {ok, emqx_limiter_decimal:decimal()} + | {check_failure_type(), rate(), pos_integer()}. check(_, infinity, _) -> {ok, infinity}; - -check(Need, - #{counter := Counter, +check( + Need, + #{ + counter := Counter, index := Index, - rate := Rate}, - Divisible)-> + rate := Rate + }, + Divisible +) -> RefToken = counters:get(Counter, Index), - if RefToken >= Need -> + if + RefToken >= Need -> counters:sub(Counter, Index, Need), {ok, RefToken - Need}; - Divisible andalso RefToken > 0 -> + Divisible andalso RefToken > 0 -> counters:sub(Counter, Index, RefToken), {partial, Rate, RefToken}; - true -> + true -> {pause, Rate, 0} end. @@ -93,7 +108,6 @@ try_restore(Inc, #{counter := Counter, index := Index}) -> -spec available(bucket_ref()) -> emqx_limiter_decimal:decimal(). available(#{counter := Counter, index := Index}) -> counters:get(Counter, Index); - available(infinity) -> infinity. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl index 65b213485..84f32c2cf 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl @@ -21,21 +21,31 @@ %% @end %% API --export([ new/0, new/1, new/2, get_limiter_by_names/2 - , add_new/3, update_by_name/3, set_retry_context/2 - , check/3, retry/2, get_retry_context/1 - , check_list/2, retry_list/2 - ]). +-export([ + new/0, new/1, new/2, + get_limiter_by_names/2, + add_new/3, + update_by_name/3, + set_retry_context/2, + check/3, + retry/2, + get_retry_context/1, + check_list/2, + retry_list/2 +]). -export_type([container/0, check_result/0]). --type container() :: #{ limiter_type() => undefined | limiter() - %% the retry context of the limiter - , retry_key() => undefined - | retry_context() - | future() - , retry_ctx := undefined | any() %% the retry context of the container - }. +-type container() :: #{ + limiter_type() => undefined | limiter(), + %% the retry context of the limiter + retry_key() => + undefined + | retry_context() + | future(), + %% the retry context of the container + retry_ctx := undefined | any() +}. -type future() :: pos_integer(). -type limiter_type() :: emqx_limiter_schema:limiter_type(). @@ -43,9 +53,10 @@ -type retry_context() :: emqx_htb_limiter:retry_context(). -type bucket_name() :: emqx_limiter_schema:bucket_name(). -type millisecond() :: non_neg_integer(). --type check_result() :: {ok, container()} - | {drop, container()} - | {pause, millisecond(), container()}. +-type check_result() :: + {ok, container()} + | {drop, container()} + | {pause, millisecond(), container()}. -define(RETRY_KEY(Type), {retry, Type}). -type retry_key() :: ?RETRY_KEY(limiter_type()). @@ -62,36 +73,43 @@ new() -> new(Types) -> new(Types, #{}). --spec new(list(limiter_type()), - #{limiter_type() => emqx_limiter_schema:bucket_name()}) -> container(). +-spec new( + list(limiter_type()), + #{limiter_type() => emqx_limiter_schema:bucket_name()} +) -> container(). new(Types, Names) -> get_limiter_by_names(Types, Names). %% @doc generate a container %% according to the type of limiter and the bucket name configuration of the limiter %% @end --spec get_limiter_by_names(list(limiter_type()), - #{limiter_type() => emqx_limiter_schema:bucket_name()}) -> container(). +-spec get_limiter_by_names( + list(limiter_type()), + #{limiter_type() => emqx_limiter_schema:bucket_name()} +) -> container(). get_limiter_by_names(Types, BucketNames) -> Init = fun(Type, Acc) -> - Limiter = emqx_limiter_server:connect(Type, BucketNames), - add_new(Type, Limiter, Acc) - end, + Limiter = emqx_limiter_server:connect(Type, BucketNames), + add_new(Type, Limiter, Acc) + end, lists:foldl(Init, #{retry_ctx => undefined}, Types). %% @doc add the specified type of limiter to the container --spec update_by_name(limiter_type(), - bucket_name() | #{limiter_type() => bucket_name()}, - container()) -> container(). +-spec update_by_name( + limiter_type(), + bucket_name() | #{limiter_type() => bucket_name()}, + container() +) -> container(). update_by_name(Type, Buckets, Container) -> Limiter = emqx_limiter_server:connect(Type, Buckets), add_new(Type, Limiter, Container). -spec add_new(limiter_type(), limiter(), container()) -> container(). add_new(Type, Limiter, Container) -> - Container#{ Type => Limiter - , ?RETRY_KEY(Type) => undefined - }. + Container#{ + Type => Limiter, + ?RETRY_KEY(Type) => undefined + }. %% @doc check the specified limiter -spec check(pos_integer(), limiter_type(), container()) -> check_result(). @@ -107,18 +125,21 @@ check_list([{Need, Type} | T], Container) -> check_list(T, Container#{Type := Limiter2}); {_, PauseMs, Ctx, Limiter2} -> Fun = fun({FN, FT}, Acc) -> - Future = emqx_htb_limiter:make_future(FN), - Acc#{?RETRY_KEY(FT) := Future} - end, - C2 = lists:foldl(Fun, - Container#{Type := Limiter2, - ?RETRY_KEY(Type) := Ctx}, - T), + Future = emqx_htb_limiter:make_future(FN), + Acc#{?RETRY_KEY(FT) := Future} + end, + C2 = lists:foldl( + Fun, + Container#{ + Type := Limiter2, + ?RETRY_KEY(Type) := Ctx + }, + T + ), {pause, PauseMs, C2}; {drop, Limiter2} -> {drop, Container#{Type := Limiter2}} end; - check_list([], Container) -> {ok, Container}. @@ -132,24 +153,23 @@ retry(Type, Container) -> retry_list([Type | T], Container) -> Key = ?RETRY_KEY(Type), case Container of - #{Type := Limiter, - Key := Retry} when Retry =/= undefined -> + #{ + Type := Limiter, + Key := Retry + } when Retry =/= undefined -> case emqx_htb_limiter:check(Retry, Limiter) of {ok, Limiter2} -> %% undefined meaning there is no retry context or there is no need to retry %% when a limiter has a undefined retry context, the check will always success retry_list(T, Container#{Type := Limiter2, Key := undefined}); {_, PauseMs, Ctx, Limiter2} -> - {pause, - PauseMs, - Container#{Type := Limiter2, Key := Ctx}}; + {pause, PauseMs, Container#{Type := Limiter2, Key := Ctx}}; {drop, Limiter2} -> {drop, Container#{Type := Limiter2}} end; _ -> retry_list(T, Container) end; - retry_list([], Container) -> {ok, Container}. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_correction.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_correction.erl index d4cba2505..5dda06ba3 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_correction.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_correction.erl @@ -17,11 +17,12 @@ -module(emqx_limiter_correction). %% API --export([ add/2 ]). +-export([add/2]). --type correction_value() :: #{ correction := emqx_limiter_decimal:zero_or_float() - , any() => any() - }. +-type correction_value() :: #{ + correction := emqx_limiter_decimal:zero_or_float(), + any() => any() +}. -export_type([correction_value/0]). diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_decimal.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_decimal.erl index 6c6016c06..b4c0839fe 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_decimal.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_decimal.erl @@ -19,8 +19,13 @@ -module(emqx_limiter_decimal). %% API --export([ add/2, sub/2, mul/2 - , put_to_counter/3, floor_div/2]). +-export([ + add/2, + sub/2, + mul/2, + put_to_counter/3, + floor_div/2 +]). -export_type([decimal/0, zero_or_float/0]). -type decimal() :: infinity | number(). @@ -30,33 +35,35 @@ %%% API %%-------------------------------------------------------------------- -spec add(decimal(), decimal()) -> decimal(). -add(A, B) when A =:= infinity - orelse B =:= infinity -> +add(A, B) when + A =:= infinity orelse + B =:= infinity +-> infinity; - add(A, B) -> A + B. -spec sub(decimal(), decimal()) -> decimal(). -sub(A, B) when A =:= infinity - orelse B =:= infinity -> +sub(A, B) when + A =:= infinity orelse + B =:= infinity +-> infinity; - sub(A, B) -> A - B. -spec mul(decimal(), decimal()) -> decimal(). -mul(A, B) when A =:= infinity - orelse B =:= infinity -> +mul(A, B) when + A =:= infinity orelse + B =:= infinity +-> infinity; - mul(A, B) -> A * B. -spec floor_div(decimal(), number()) -> decimal(). floor_div(infinity, _) -> infinity; - floor_div(A, B) -> erlang:floor(A / B). diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl index 9cf4bc1d0..d95856332 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl @@ -22,14 +22,26 @@ -include_lib("stdlib/include/ms_transform.hrl"). %% API --export([ start_link/0, start_server/1, find_bucket/1 - , find_bucket/2, insert_bucket/2, insert_bucket/3 - , make_path/2, restart_server/1 - ]). +-export([ + start_link/0, + start_server/1, + find_bucket/1, + find_bucket/2, + insert_bucket/2, insert_bucket/3, + make_path/2, + restart_server/1 +]). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, format_status/2]). +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3, + format_status/2 +]). -export_type([path/0]). @@ -38,9 +50,10 @@ -type bucket_name() :: emqx_limiter_schema:bucket_name(). %% counter record in ets table --record(bucket, { path :: path() - , bucket :: bucket_ref() - }). +-record(bucket, { + path :: path(), + bucket :: bucket_ref() +}). -type bucket_ref() :: emqx_limiter_bucket_ref:bucket_ref(). @@ -58,7 +71,7 @@ restart_server(Type) -> emqx_limiter_server_sup:restart(Type). -spec find_bucket(limiter_type(), bucket_name()) -> - {ok, bucket_ref()} | undefined. + {ok, bucket_ref()} | undefined. find_bucket(Type, BucketName) -> find_bucket(make_path(Type, BucketName)). @@ -71,13 +84,14 @@ find_bucket(Path) -> undefined end. --spec insert_bucket(limiter_type(), - bucket_name(), - bucket_ref()) -> boolean(). +-spec insert_bucket( + limiter_type(), + bucket_name(), + bucket_ref() +) -> boolean(). insert_bucket(Type, BucketName, Bucket) -> inner_insert_bucket(make_path(Type, BucketName), Bucket). - -spec insert_bucket(path(), bucket_ref()) -> true. insert_bucket(Path, Bucket) -> inner_insert_bucket(Path, Bucket). @@ -91,10 +105,11 @@ make_path(Type, BucketName) -> %% Starts the server %% @end %%-------------------------------------------------------------------- --spec start_link() -> {ok, Pid :: pid()} | - {error, Error :: {already_started, pid()}} | - {error, Error :: term()} | - ignore. +-spec start_link() -> + {ok, Pid :: pid()} + | {error, Error :: {already_started, pid()}} + | {error, Error :: term()} + | ignore. start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -108,16 +123,22 @@ start_link() -> %% Initializes the server %% @end %%-------------------------------------------------------------------- --spec init(Args :: term()) -> {ok, State :: term()} | - {ok, State :: term(), Timeout :: timeout()} | - {ok, State :: term(), hibernate} | - {stop, Reason :: term()} | - ignore. +-spec init(Args :: term()) -> + {ok, State :: term()} + | {ok, State :: term(), Timeout :: timeout()} + | {ok, State :: term(), hibernate} + | {stop, Reason :: term()} + | ignore. init([]) -> - _ = ets:new(?TAB, [ set, public, named_table, {keypos, #bucket.path} - , {write_concurrency, true}, {read_concurrency, true} - , {heir, erlang:whereis(emqx_limiter_sup), none} - ]), + _ = ets:new(?TAB, [ + set, + public, + named_table, + {keypos, #bucket.path}, + {write_concurrency, true}, + {read_concurrency, true}, + {heir, erlang:whereis(emqx_limiter_sup), none} + ]), {ok, #{}}. %%-------------------------------------------------------------------- @@ -127,14 +148,14 @@ init([]) -> %% @end %%-------------------------------------------------------------------- -spec handle_call(Request :: term(), From :: {pid(), term()}, State :: term()) -> - {reply, Reply :: term(), NewState :: term()} | - {reply, Reply :: term(), NewState :: term(), Timeout :: timeout()} | - {reply, Reply :: term(), NewState :: term(), hibernate} | - {noreply, NewState :: term()} | - {noreply, NewState :: term(), Timeout :: timeout()} | - {noreply, NewState :: term(), hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: term()} | - {stop, Reason :: term(), NewState :: term()}. + {reply, Reply :: term(), NewState :: term()} + | {reply, Reply :: term(), NewState :: term(), Timeout :: timeout()} + | {reply, Reply :: term(), NewState :: term(), hibernate} + | {noreply, NewState :: term()} + | {noreply, NewState :: term(), Timeout :: timeout()} + | {noreply, NewState :: term(), hibernate} + | {stop, Reason :: term(), Reply :: term(), NewState :: term()} + | {stop, Reason :: term(), NewState :: term()}. handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignore, State}. @@ -146,10 +167,10 @@ handle_call(Req, _From, State) -> %% @end %%-------------------------------------------------------------------- -spec handle_cast(Request :: term(), State :: term()) -> - {noreply, NewState :: term()} | - {noreply, NewState :: term(), Timeout :: timeout()} | - {noreply, NewState :: term(), hibernate} | - {stop, Reason :: term(), NewState :: term()}. + {noreply, NewState :: term()} + | {noreply, NewState :: term(), Timeout :: timeout()} + | {noreply, NewState :: term(), hibernate} + | {stop, Reason :: term(), NewState :: term()}. handle_cast(Req, State) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Req}), {noreply, State}. @@ -161,10 +182,10 @@ handle_cast(Req, State) -> %% @end %%-------------------------------------------------------------------- -spec handle_info(Info :: timeout() | term(), State :: term()) -> - {noreply, NewState :: term()} | - {noreply, NewState :: term(), Timeout :: timeout()} | - {noreply, NewState :: term(), hibernate} | - {stop, Reason :: normal | term(), NewState :: term()}. + {noreply, NewState :: term()} + | {noreply, NewState :: term(), Timeout :: timeout()} + | {noreply, NewState :: term(), hibernate} + | {stop, Reason :: normal | term(), NewState :: term()}. handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. @@ -178,8 +199,10 @@ handle_info(Info, State) -> %% with Reason. The return value is ignored. %% @end %%-------------------------------------------------------------------- --spec terminate(Reason :: normal | shutdown | {shutdown, term()} | term(), - State :: term()) -> any(). +-spec terminate( + Reason :: normal | shutdown | {shutdown, term()} | term(), + State :: term() +) -> any(). terminate(_Reason, _State) -> ok. @@ -189,10 +212,13 @@ terminate(_Reason, _State) -> %% Convert process state when code is changed %% @end %%-------------------------------------------------------------------- --spec code_change(OldVsn :: term() | {down, term()}, - State :: term(), - Extra :: term()) -> {ok, NewState :: term()} | - {error, Reason :: term()}. +-spec code_change( + OldVsn :: term() | {down, term()}, + State :: term(), + Extra :: term() +) -> + {ok, NewState :: term()} + | {error, Reason :: term()}. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -204,8 +230,10 @@ code_change(_OldVsn, State, _Extra) -> %% or when it appears in termination error logs. %% @end %%-------------------------------------------------------------------- --spec format_status(Opt :: normal | terminate, - Status :: list()) -> Status :: term(). +-spec format_status( + Opt :: normal | terminate, + Status :: list() +) -> Status :: term(). format_status(_Opt, Status) -> Status. @@ -214,5 +242,7 @@ format_status(_Opt, Status) -> %%-------------------------------------------------------------------- -spec inner_insert_bucket(path(), bucket_ref()) -> true. inner_insert_bucket(Path, Bucket) -> - ets:insert(?TAB, - #bucket{path = Path, bucket = Bucket}). + ets:insert( + ?TAB, + #bucket{path = Path, bucket = Bucket} + ). diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index 7ae4a01cc..9740fac4b 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -18,43 +18,60 @@ -include_lib("typerefl/include/types.hrl"). --export([ roots/0, fields/1, to_rate/1, to_capacity/1 - , minimum_period/0, to_burst_rate/1, to_initial/1 - , namespace/0, get_bucket_cfg_path/2, desc/1 - ]). +-export([ + roots/0, + fields/1, + to_rate/1, + to_capacity/1, + minimum_period/0, + to_burst_rate/1, + to_initial/1, + namespace/0, + get_bucket_cfg_path/2, + desc/1 +]). -define(KILOBYTE, 1024). --type limiter_type() :: bytes_in - | message_in - | connection - | message_routing - | batch. +-type limiter_type() :: + bytes_in + | message_in + | connection + | message_routing + | batch. -type bucket_name() :: atom(). -type rate() :: infinity | float(). -type burst_rate() :: 0 | float(). --type capacity() :: infinity | number(). %% the capacity of the token bucket --type initial() :: non_neg_integer(). %% initial capacity of the token bucket +%% the capacity of the token bucket +-type capacity() :: infinity | number(). +%% initial capacity of the token bucket +-type initial() :: non_neg_integer(). -type bucket_path() :: list(atom()). %% the processing strategy after the failure of the token request --type failure_strategy() :: force %% Forced to pass - | drop %% discard the current request - | throw. %% throw an exception + +%% Forced to pass +-type failure_strategy() :: + force + %% discard the current request + | drop + %% throw an exception + | throw. -typerefl_from_string({rate/0, ?MODULE, to_rate}). -typerefl_from_string({burst_rate/0, ?MODULE, to_burst_rate}). -typerefl_from_string({capacity/0, ?MODULE, to_capacity}). -typerefl_from_string({initial/0, ?MODULE, to_initial}). --reflect_type([ rate/0 - , burst_rate/0 - , capacity/0 - , initial/0 - , failure_strategy/0 - , bucket_name/0 - ]). +-reflect_type([ + rate/0, + burst_rate/0, + capacity/0, + initial/0, + failure_strategy/0, + bucket_name/0 +]). -export_type([limiter_type/0, bucket_path/0]). @@ -66,87 +83,154 @@ namespace() -> limiter. roots() -> [limiter]. fields(limiter) -> - [ {bytes_in, sc(ref(limiter_opts), - #{description => - <<"The bytes_in limiter.
" + [ + {bytes_in, + sc( + ref(limiter_opts), + #{ + description => + << + "The bytes_in limiter.
" "It is used to limit the inbound bytes rate for this EMQX node." "If the this limiter limit is reached," - "the restricted client will be slow down even be hung for a while.">> - })} - , {message_in, sc(ref(limiter_opts), - #{description => - <<"The message_in limiter.
" - "This is used to limit the inbound message numbers for this EMQX node" - "If the this limiter limit is reached," - "the restricted client will be slow down even be hung for a while.">> - })} - , {connection, sc(ref(limiter_opts), - #{description => - <<"The connection limiter.
" - "This is used to limit the connection rate for this EMQX node" - "If the this limiter limit is reached," - "New connections will be refused" - >>})} - , {message_routing, sc(ref(limiter_opts), - #{description => - <<"The message_routing limiter.
" - "This is used to limite the deliver rate for this EMQX node" - "If the this limiter limit is reached," - "New publish will be refused" - >> - })} - , {batch, sc(ref(limiter_opts), - #{description => <<"The batch limiter.
" - "This is used for EMQX internal batch operation" - "e.g. limite the retainer's deliver rate" - >> - })} + "the restricted client will be slow down even be hung for a while." + >> + } + )}, + {message_in, + sc( + ref(limiter_opts), + #{ + description => + << + "The message_in limiter.
" + "This is used to limit the inbound message numbers for this EMQX node" + "If the this limiter limit is reached," + "the restricted client will be slow down even be hung for a while." + >> + } + )}, + {connection, + sc( + ref(limiter_opts), + #{ + description => + << + "The connection limiter.
" + "This is used to limit the connection rate for this EMQX node" + "If the this limiter limit is reached," + "New connections will be refused" + >> + } + )}, + {message_routing, + sc( + ref(limiter_opts), + #{ + description => + << + "The message_routing limiter.
" + "This is used to limite the deliver rate for this EMQX node" + "If the this limiter limit is reached," + "New publish will be refused" + >> + } + )}, + {batch, + sc( + ref(limiter_opts), + #{ + description => << + "The batch limiter.
" + "This is used for EMQX internal batch operation" + "e.g. limite the retainer's deliver rate" + >> + } + )} ]; - fields(limiter_opts) -> - [ {rate, sc(rate(), #{default => "infinity", desc => "The rate"})} - , {burst, sc(burst_rate(), - #{default => "0/0s", - desc => "The burst, This value is based on rate.
- This value + rate = the maximum limit that can be achieved when limiter burst." - })} - , {bucket, sc(map("bucket name", ref(bucket_opts)), #{desc => "Buckets config"})} + [ + {rate, sc(rate(), #{default => "infinity", desc => "The rate"})}, + {burst, + sc( + burst_rate(), + #{ + default => "0/0s", + desc => + "The burst, This value is based on rate.
\n" + " This value + rate = the maximum limit that can be achieved when limiter burst." + } + )}, + {bucket, sc(map("bucket name", ref(bucket_opts)), #{desc => "Buckets config"})} ]; - fields(bucket_opts) -> - [ {rate, sc(rate(), #{desc => "Rate for this bucket."})} - , {capacity, sc(capacity(), #{desc => "The maximum number of tokens for this bucket."})} - , {initial, sc(initial(), #{default => "0", - desc => "The initial number of tokens for this bucket."})} - , {per_client, sc(ref(client_bucket), - #{default => #{}, - desc => "The rate limit for each user of the bucket," + [ + {rate, sc(rate(), #{desc => "Rate for this bucket."})}, + {capacity, sc(capacity(), #{desc => "The maximum number of tokens for this bucket."})}, + {initial, + sc(initial(), #{ + default => "0", + desc => "The initial number of tokens for this bucket." + })}, + {per_client, + sc( + ref(client_bucket), + #{ + default => #{}, + desc => + "The rate limit for each user of the bucket," " this field is not required" - })} + } + )} ]; - fields(client_bucket) -> - [ {rate, sc(rate(), #{default => "infinity", desc => "Rate for this bucket."})} - , {initial, sc(initial(), #{default => "0", desc => "The initial number of tokens for this bucket."})} - %% low_water_mark add for emqx_channel and emqx_session - %% both modules consume first and then check - %% so we need to use this value to prevent excessive consumption - %% (e.g, consumption from an empty bucket) - , {low_water_mark, sc(initial(), - #{desc => "If the remaining tokens are lower than this value, -the check/consume will succeed, but it will be forced to wait for a short period of time.", - default => "0"})} - , {capacity, sc(capacity(), #{desc => "The capacity of the token bucket.", - default => "infinity"})} - , {divisible, sc(boolean(), - #{desc => "Is it possible to split the number of requested tokens?", - default => false})} - , {max_retry_time, sc(emqx_schema:duration(), - #{ desc => "The maximum retry time when acquire failed." - , default => "10s"})} - , {failure_strategy, sc(failure_strategy(), - #{ desc => "The strategy when all the retries failed." - , default => force})} + [ + {rate, sc(rate(), #{default => "infinity", desc => "Rate for this bucket."})}, + {initial, + sc(initial(), #{default => "0", desc => "The initial number of tokens for this bucket."})}, + %% low_water_mark add for emqx_channel and emqx_session + %% both modules consume first and then check + %% so we need to use this value to prevent excessive consumption + %% (e.g, consumption from an empty bucket) + {low_water_mark, + sc( + initial(), + #{ + desc => + "If the remaining tokens are lower than this value,\n" + "the check/consume will succeed, but it will be forced to wait for a short period of time.", + default => "0" + } + )}, + {capacity, + sc(capacity(), #{ + desc => "The capacity of the token bucket.", + default => "infinity" + })}, + {divisible, + sc( + boolean(), + #{ + desc => "Is it possible to split the number of requested tokens?", + default => false + } + )}, + {max_retry_time, + sc( + emqx_schema:duration(), + #{ + desc => "The maximum retry time when acquire failed.", + default => "10s" + } + )}, + {failure_strategy, + sc( + failure_strategy(), + #{ + desc => "The strategy when all the retries failed.", + default => force + } + )} ]. desc(limiter) -> @@ -184,33 +268,40 @@ to_rate(Str, CanInfinity, CanZero) -> case Tokens of ["infinity"] when CanInfinity -> {ok, infinity}; - [QuotaStr] -> %% if time unit is 1s, it can be omitted + %% if time unit is 1s, it can be omitted + [QuotaStr] -> {ok, Val} = to_capacity(QuotaStr), - check_capacity(Str, Val, CanZero, - fun(Quota) -> - {ok, Quota * minimum_period() / ?UNIT_TIME_IN_MS} - end); + check_capacity( + Str, + Val, + CanZero, + fun(Quota) -> + {ok, Quota * minimum_period() / ?UNIT_TIME_IN_MS} + end + ); [QuotaStr, Interval] -> {ok, Val} = to_capacity(QuotaStr), - check_capacity(Str, Val, CanZero, - fun(Quota) -> - case emqx_schema:to_duration_ms(Interval) of - {ok, Ms} when Ms > 0 -> - {ok, Quota * minimum_period() / Ms}; - _ -> - {error, Str} - end - end); + check_capacity( + Str, + Val, + CanZero, + fun(Quota) -> + case emqx_schema:to_duration_ms(Interval) of + {ok, Ms} when Ms > 0 -> + {ok, Quota * minimum_period() / Ms}; + _ -> + {error, Str} + end + end + ); _ -> {error, Str} end. check_capacity(_Str, 0, true, _Cont) -> {ok, 0}; - check_capacity(Str, 0, false, _Cont) -> {error, Str}; - check_capacity(_Str, Quota, _CanZero, Cont) -> Cont(Quota). diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index 84688ba38..eb450bcfb 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -30,34 +30,53 @@ -include_lib("emqx/include/logger.hrl"). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, format_status/2]). +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3, + format_status/2 +]). --export([ start_link/1, connect/2, info/1 - , name/1, get_initial_val/1, update_config/1 - ]). +-export([ + start_link/1, + connect/2, + info/1, + name/1, + get_initial_val/1, + update_config/1 +]). --type root() :: #{ rate := rate() %% number of tokens generated per period - , burst := rate() - , period := pos_integer() %% token generation interval(second) - , consumed := non_neg_integer() - }. +%% number of tokens generated per period +-type root() :: #{ + rate := rate(), + burst := rate(), + %% token generation interval(second) + period := pos_integer(), + consumed := non_neg_integer() +}. --type bucket() :: #{ name := bucket_name() - , rate := rate() - , obtained := non_neg_integer() - , correction := emqx_limiter_decimal:zero_or_float() %% token correction value - , capacity := capacity() - , counter := undefined | counters:counters_ref() - , index := undefined | index() - }. +-type bucket() :: #{ + name := bucket_name(), + rate := rate(), + obtained := non_neg_integer(), + %% token correction value + correction := emqx_limiter_decimal:zero_or_float(), + capacity := capacity(), + counter := undefined | counters:counters_ref(), + index := undefined | index() +}. --type state() :: #{ type := limiter_type() - , root := undefined | root() - , buckets := buckets() - , counter := undefined | counters:counters_ref() %% current counter to alloc - , index := index() - }. +-type state() :: #{ + type := limiter_type(), + root := undefined | root(), + buckets := buckets(), + %% current counter to alloc + counter := undefined | counters:counters_ref(), + index := index() +}. -type buckets() :: #{bucket_name() => bucket()}. -type limiter_type() :: emqx_limiter_schema:limiter_type(). @@ -69,7 +88,8 @@ -type index() :: pos_integer(). -define(CALL(Type), gen_server:call(name(Type), ?FUNCTION_NAME)). --define(OVERLOAD_MIN_ALLOC, 0.3). %% minimum coefficient for overloaded limiter +%% minimum coefficient for overloaded limiter +-define(OVERLOAD_MIN_ALLOC, 0.3). -define(CURRYING(X, F2), fun(Y) -> F2(X, Y) end). -export_type([index/0]). @@ -80,29 +100,33 @@ %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- --spec connect(limiter_type(), - bucket_name() | #{limiter_type() => bucket_name() | undefined}) -> - emqx_htb_limiter:limiter(). +-spec connect( + limiter_type(), + bucket_name() | #{limiter_type() => bucket_name() | undefined} +) -> + emqx_htb_limiter:limiter(). %% If no bucket path is set in config, there will be no limit connect(_Type, undefined) -> emqx_htb_limiter:make_infinity_limiter(); - connect(Type, BucketName) when is_atom(BucketName) -> CfgPath = emqx_limiter_schema:get_bucket_cfg_path(Type, BucketName), case emqx:get_config(CfgPath, undefined) of undefined -> ?SLOG(error, #{msg => "bucket_config_not_found", path => CfgPath}), throw("bucket's config not found"); - #{rate := AggrRate, - capacity := AggrSize, - per_client := #{rate := CliRate, capacity := CliSize} = Cfg} -> + #{ + rate := AggrRate, + capacity := AggrSize, + per_client := #{rate := CliRate, capacity := CliSize} = Cfg + } -> case emqx_limiter_manager:find_bucket(Type, BucketName) of {ok, Bucket} -> - if CliRate < AggrRate orelse CliSize < AggrSize -> + if + CliRate < AggrRate orelse CliSize < AggrSize -> emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket); - Bucket =:= infinity -> + Bucket =:= infinity -> emqx_htb_limiter:make_infinity_limiter(); - true -> + true -> emqx_htb_limiter:make_ref_limiter(Cfg, Bucket) end; undefined -> @@ -110,7 +134,6 @@ connect(Type, BucketName) when is_atom(BucketName) -> throw("invalid bucket") end end; - connect(Type, Paths) -> connect(Type, maps:get(Type, Paths, undefined)). @@ -135,7 +158,6 @@ update_config(Type) -> start_link(Type) -> gen_server:start_link({local, name(Type)}, ?MODULE, [Type], []). - %%-------------------------------------------------------------------- %%% gen_server callbacks %%-------------------------------------------------------------------- @@ -146,11 +168,12 @@ start_link(Type) -> %% Initializes the server %% @end %%-------------------------------------------------------------------- --spec init(Args :: term()) -> {ok, State :: term()} | - {ok, State :: term(), Timeout :: timeout()} | - {ok, State :: term(), hibernate} | - {stop, Reason :: term()} | - ignore. +-spec init(Args :: term()) -> + {ok, State :: term()} + | {ok, State :: term(), Timeout :: timeout()} + | {ok, State :: term(), hibernate} + | {stop, Reason :: term()} + | ignore. init([Type]) -> State = init_tree(Type), #{root := #{period := Perido}} = State, @@ -164,21 +187,19 @@ init([Type]) -> %% @end %%-------------------------------------------------------------------- -spec handle_call(Request :: term(), From :: {pid(), term()}, State :: term()) -> - {reply, Reply :: term(), NewState :: term()} | - {reply, Reply :: term(), NewState :: term(), Timeout :: timeout()} | - {reply, Reply :: term(), NewState :: term(), hibernate} | - {noreply, NewState :: term()} | - {noreply, NewState :: term(), Timeout :: timeout()} | - {noreply, NewState :: term(), hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: term()} | - {stop, Reason :: term(), NewState :: term()}. + {reply, Reply :: term(), NewState :: term()} + | {reply, Reply :: term(), NewState :: term(), Timeout :: timeout()} + | {reply, Reply :: term(), NewState :: term(), hibernate} + | {noreply, NewState :: term()} + | {noreply, NewState :: term(), Timeout :: timeout()} + | {noreply, NewState :: term(), hibernate} + | {stop, Reason :: term(), Reply :: term(), NewState :: term()} + | {stop, Reason :: term(), NewState :: term()}. handle_call(info, _From, State) -> {reply, State, State}; - handle_call(update_config, _From, #{type := Type}) -> NewState = init_tree(Type), {reply, ok, NewState}; - handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. @@ -190,10 +211,10 @@ handle_call(Req, _From, State) -> %% @end %%-------------------------------------------------------------------- -spec handle_cast(Request :: term(), State :: term()) -> - {noreply, NewState :: term()} | - {noreply, NewState :: term(), Timeout :: timeout()} | - {noreply, NewState :: term(), hibernate} | - {stop, Reason :: term(), NewState :: term()}. + {noreply, NewState :: term()} + | {noreply, NewState :: term(), Timeout :: timeout()} + | {noreply, NewState :: term(), hibernate} + | {stop, Reason :: term(), NewState :: term()}. handle_cast(Req, State) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Req}), {noreply, State}. @@ -205,13 +226,12 @@ handle_cast(Req, State) -> %% @end %%-------------------------------------------------------------------- -spec handle_info(Info :: timeout() | term(), State :: term()) -> - {noreply, NewState :: term()} | - {noreply, NewState :: term(), Timeout :: timeout()} | - {noreply, NewState :: term(), hibernate} | - {stop, Reason :: normal | term(), NewState :: term()}. + {noreply, NewState :: term()} + | {noreply, NewState :: term(), Timeout :: timeout()} + | {noreply, NewState :: term(), hibernate} + | {stop, Reason :: normal | term(), NewState :: term()}. handle_info(oscillate, State) -> {noreply, oscillation(State)}; - handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. @@ -225,8 +245,10 @@ handle_info(Info, State) -> %% with Reason. The return value is ignored. %% @end %%-------------------------------------------------------------------- --spec terminate(Reason :: normal | shutdown | {shutdown, term()} | term(), - State :: term()) -> any(). +-spec terminate( + Reason :: normal | shutdown | {shutdown, term()} | term(), + State :: term() +) -> any(). terminate(_Reason, _State) -> ok. @@ -236,10 +258,13 @@ terminate(_Reason, _State) -> %% Convert process state when code is changed %% @end %%-------------------------------------------------------------------- --spec code_change(OldVsn :: term() | {down, term()}, - State :: term(), - Extra :: term()) -> {ok, NewState :: term()} | - {error, Reason :: term()}. +-spec code_change( + OldVsn :: term() | {down, term()}, + State :: term(), + Extra :: term() +) -> + {ok, NewState :: term()} + | {error, Reason :: term()}. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -251,8 +276,10 @@ code_change(_OldVsn, State, _Extra) -> %% or when it appears in termination error logs. %% @end %%-------------------------------------------------------------------- --spec format_status(Opt :: normal | terminate, - Status :: list()) -> Status :: term(). +-spec format_status( + Opt :: normal | terminate, + Status :: list() +) -> Status :: term(). format_status(_Opt, Status) -> Status. @@ -264,40 +291,54 @@ oscillate(Interval) -> %% @doc generate tokens, and then spread to leaf nodes -spec oscillation(state()) -> state(). -oscillation(#{root := #{rate := Flow, - period := Interval, - consumed := Consumed} = Root, - buckets := Buckets} = State) -> +oscillation( + #{ + root := #{ + rate := Flow, + period := Interval, + consumed := Consumed + } = Root, + buckets := Buckets + } = State +) -> oscillate(Interval), Ordereds = get_ordered_buckets(Buckets), {Alloced, Buckets2} = transverse(Ordereds, Flow, 0, Buckets), - maybe_burst(State#{buckets := Buckets2, - root := Root#{consumed := Consumed + Alloced}}). + maybe_burst(State#{ + buckets := Buckets2, + root := Root#{consumed := Consumed + Alloced} + }). %% @doc horizontal spread --spec transverse(list(bucket()), - flow(), - non_neg_integer(), - buckets()) -> {non_neg_integer(), buckets()}. +-spec transverse( + list(bucket()), + flow(), + non_neg_integer(), + buckets() +) -> {non_neg_integer(), buckets()}. transverse([H | T], InFlow, Alloced, Buckets) when InFlow > 0 -> {BucketAlloced, Buckets2} = longitudinal(H, InFlow, Buckets), InFlow2 = sub(InFlow, BucketAlloced), Alloced2 = Alloced + BucketAlloced, transverse(T, InFlow2, Alloced2, Buckets2); - transverse(_, _, Alloced, Buckets) -> {Alloced, Buckets}. %% @doc vertical spread -spec longitudinal(bucket(), flow(), buckets()) -> - {non_neg_integer(), buckets()}. -longitudinal(#{name := Name, - rate := Rate, - capacity := Capacity, - counter := Counter, - index := Index, - obtained := Obtained} = Bucket, - InFlow, Buckets) when Counter =/= undefined -> + {non_neg_integer(), buckets()}. +longitudinal( + #{ + name := Name, + rate := Rate, + capacity := Capacity, + counter := Counter, + index := Index, + obtained := Obtained + } = Bucket, + InFlow, + Buckets +) when Counter =/= undefined -> Flow = erlang:min(InFlow, Rate), ShouldAlloc = @@ -305,8 +346,10 @@ longitudinal(#{name := Name, Tokens when Tokens < 0 -> %% toknes's value mayb be a negative value(stolen from the future) %% because ∃ x. add(Capacity, x) < 0, so here we must compare with minimum value - erlang:max(add(Capacity, Tokens), - mul(Capacity, ?OVERLOAD_MIN_ALLOC)); + erlang:max( + add(Capacity, Tokens), + mul(Capacity, ?OVERLOAD_MIN_ALLOC) + ); Tokens -> %% is it possible that Tokens > Capacity ??? erlang:max(sub(Capacity, Tokens), 0) @@ -320,12 +363,10 @@ longitudinal(#{name := Name, {Inc, Bucket2} = emqx_limiter_correction:add(Available, Bucket), counters:add(Counter, Index, Inc), - {Inc, - Buckets#{Name := Bucket2#{obtained := Obtained + Inc}}}; + {Inc, Buckets#{Name := Bucket2#{obtained := Obtained + Inc}}}; _ -> {0, Buckets} end; - longitudinal(_, _, Buckets) -> {0, Buckets}. @@ -333,95 +374,111 @@ longitudinal(_, _, Buckets) -> get_ordered_buckets(Buckets) when is_map(Buckets) -> BucketList = maps:values(Buckets), get_ordered_buckets(BucketList); - get_ordered_buckets(Buckets) -> %% sort by obtained, avoid node goes hungry - lists:sort(fun(#{obtained := A}, #{obtained := B}) -> - A < B - end, - Buckets). + lists:sort( + fun(#{obtained := A}, #{obtained := B}) -> + A < B + end, + Buckets + ). -spec maybe_burst(state()) -> state(). -maybe_burst(#{buckets := Buckets, - root := #{burst := Burst}} = State) when Burst > 0 -> - Fold = fun(_Name, #{counter := Cnt, index := Idx} = Bucket, Acc) when Cnt =/= undefined -> - case counters:get(Cnt, Idx) > 0 of - true -> - Acc; - false -> - [Bucket | Acc] - end; - (_Name, _Bucket, Acc) -> - Acc - end, +maybe_burst( + #{ + buckets := Buckets, + root := #{burst := Burst} + } = State +) when Burst > 0 -> + Fold = fun + (_Name, #{counter := Cnt, index := Idx} = Bucket, Acc) when Cnt =/= undefined -> + case counters:get(Cnt, Idx) > 0 of + true -> + Acc; + false -> + [Bucket | Acc] + end; + (_Name, _Bucket, Acc) -> + Acc + end, Empties = maps:fold(Fold, [], Buckets), dispatch_burst(Empties, Burst, State); - maybe_burst(State) -> State. -spec dispatch_burst(list(bucket()), non_neg_integer(), state()) -> state(). dispatch_burst([], _, State) -> State; - -dispatch_burst(Empties, InFlow, - #{root := #{consumed := Consumed} = Root, buckets := Buckets} = State) -> +dispatch_burst( + Empties, + InFlow, + #{root := #{consumed := Consumed} = Root, buckets := Buckets} = State +) -> EachFlow = InFlow / erlang:length(Empties), {Alloced, Buckets2} = dispatch_burst_to_buckets(Empties, EachFlow, 0, Buckets), State#{root := Root#{consumed := Consumed + Alloced}, buckets := Buckets2}. --spec dispatch_burst_to_buckets(list(bucket()), - float(), - non_neg_integer(), buckets()) -> {non_neg_integer(), buckets()}. +-spec dispatch_burst_to_buckets( + list(bucket()), + float(), + non_neg_integer(), + buckets() +) -> {non_neg_integer(), buckets()}. dispatch_burst_to_buckets([Bucket | T], InFlow, Alloced, Buckets) -> - #{name := Name, - counter := Counter, - index := Index, - obtained := Obtained} = Bucket, + #{ + name := Name, + counter := Counter, + index := Index, + obtained := Obtained + } = Bucket, {Inc, Bucket2} = emqx_limiter_correction:add(InFlow, Bucket), counters:add(Counter, Index, Inc), Buckets2 = Buckets#{Name := Bucket2#{obtained := Obtained + Inc}}, dispatch_burst_to_buckets(T, InFlow, Alloced + Inc, Buckets2); - dispatch_burst_to_buckets([], _, Alloced, Buckets) -> {Alloced, Buckets}. -spec init_tree(emqx_limiter_schema:limiter_type()) -> state(). init_tree(Type) -> - State = #{ type => Type - , root => undefined - , counter => undefined - , index => 1 - , buckets => #{} - }, + State = #{ + type => Type, + root => undefined, + counter => undefined, + index => 1, + buckets => #{} + }, #{bucket := Buckets} = Cfg = emqx:get_config([limiter, Type]), {Factor, Root} = make_root(Cfg), {CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, Factor, 1, []), - State2 = State#{root := Root, - counter := counters:new(CounterNum, [write_concurrency]) - }, + State2 = State#{ + root := Root, + counter := counters:new(CounterNum, [write_concurrency]) + }, lists:foldl(fun(F, Acc) -> F(Acc) end, State2, DelayBuckets). -spec make_root(hocons:confg()) -> {number(), root()}. make_root(#{rate := Rate, burst := Burst}) when Rate >= 1 -> - {1, #{rate => Rate, - burst => Burst, - period => emqx_limiter_schema:minimum_period(), - consumed => 0}}; - + {1, #{ + rate => Rate, + burst => Burst, + period => emqx_limiter_schema:minimum_period(), + consumed => 0 + }}; make_root(#{rate := Rate, burst := Burst}) -> MiniPeriod = emqx_limiter_schema:minimum_period(), Factor = 1 / Rate, - {Factor, #{rate => 1, - burst => Burst * Factor, - period => erlang:floor(Factor * MiniPeriod), - consumed => 0}}. + {Factor, #{ + rate => 1, + burst => Burst * Factor, + period => erlang:floor(Factor * MiniPeriod), + consumed => 0 + }}. make_bucket([{Name, Conf} | T], Type, GlobalCfg, Factor, CounterNum, DelayBuckets) -> Path = emqx_limiter_manager:make_path(Type, Name), @@ -433,48 +490,66 @@ make_bucket([{Name, Conf} | T], Type, GlobalCfg, Factor, CounterNum, DelayBucket emqx_limiter_manager:insert_bucket(Path, Ref), CounterNum2 = CounterNum, InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) -> - State#{buckets := Buckets#{BucketName => Bucket}} - end; + State#{buckets := Buckets#{BucketName => Bucket}} + end; RawRate -> #{capacity := Capacity} = Conf, Initial = get_initial_val(Conf), Rate = mul(RawRate, Factor), CounterNum2 = CounterNum + 1, InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) -> - {Counter, Idx, State2} = alloc_counter(Path, RawRate, Initial, State), - Bucket2 = Bucket#{counter := Counter, index := Idx}, - State2#{buckets := Buckets#{BucketName => Bucket2}} - end + {Counter, Idx, State2} = alloc_counter(Path, RawRate, Initial, State), + Bucket2 = Bucket#{counter := Counter, index := Idx}, + State2#{buckets := Buckets#{BucketName => Bucket2}} + end end, - Bucket = #{ name => Name - , rate => Rate - , obtained => 0 - , correction => 0 - , capacity => Capacity - , counter => undefined - , index => undefined}, + Bucket = #{ + name => Name, + rate => Rate, + obtained => 0, + correction => 0, + capacity => Capacity, + counter => undefined, + index => undefined + }, DelayInit = ?CURRYING(Bucket, InitFun), - make_bucket(T, - Type, GlobalCfg, Factor, CounterNum2, [DelayInit | DelayBuckets]); - + make_bucket( + T, + Type, + GlobalCfg, + Factor, + CounterNum2, + [DelayInit | DelayBuckets] + ); make_bucket([], _Type, _Global, _Factor, CounterNum, DelayBuckets) -> {CounterNum, DelayBuckets}. -spec alloc_counter(emqx_limiter_manager:path(), rate(), capacity(), state()) -> - {counters:counters_ref(), pos_integer(), state()}. -alloc_counter(Path, Rate, Initial, - #{counter := Counter, index := Index} = State) -> - + {counters:counters_ref(), pos_integer(), state()}. +alloc_counter( + Path, + Rate, + Initial, + #{counter := Counter, index := Index} = State +) -> case emqx_limiter_manager:find_bucket(Path) of - {ok, #{counter := ECounter, - index := EIndex}} when ECounter =/= undefined -> + {ok, #{ + counter := ECounter, + index := EIndex + }} when ECounter =/= undefined -> init_counter(Path, ECounter, EIndex, Rate, Initial, State); _ -> - init_counter(Path, Counter, Index, - Rate, Initial, State#{index := Index + 1}) + init_counter( + Path, + Counter, + Index, + Rate, + Initial, + State#{index := Index + 1} + ) end. init_counter(Path, Counter, Index, Rate, Initial, State) -> @@ -483,26 +558,29 @@ init_counter(Path, Counter, Index, Rate, Initial, State) -> emqx_limiter_manager:insert_bucket(Path, Ref), {Counter, Index, State}. - %% @doc find first limited node -get_counter_rate(#{rate := Rate, capacity := Capacity}, _GlobalCfg) - when Rate =/= infinity orelse Capacity =/= infinity -> %% TODO maybe no need to check capacity +get_counter_rate(#{rate := Rate, capacity := Capacity}, _GlobalCfg) when + %% TODO maybe no need to check capacity + Rate =/= infinity orelse Capacity =/= infinity +-> Rate; - -get_counter_rate(_Cfg, #{rate := Rate}) -> +get_counter_rate(_Cfg, #{rate := Rate}) -> Rate. -spec get_initial_val(hocons:config()) -> decimal(). -get_initial_val(#{initial := Initial, - rate := Rate, - capacity := Capacity}) -> +get_initial_val(#{ + initial := Initial, + rate := Rate, + capacity := Capacity +}) -> %% initial will nevner be infinity(see the emqx_limiter_schema) - if Initial > 0 -> + if + Initial > 0 -> Initial; - Rate =/= infinity -> + Rate =/= infinity -> erlang:min(Rate, Capacity); - Capacity =/= infinity -> + Capacity =/= infinity -> Capacity; - true -> + true -> 0 end. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl index 70332481a..0727d3235 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl @@ -33,11 +33,12 @@ %% Starts the supervisor %% @end %%-------------------------------------------------------------------- --spec start_link() -> {ok, Pid :: pid()} | - {error, {already_started, Pid :: pid()}} | - {error, {shutdown, term()}} | - {error, term()} | - ignore. +-spec start_link() -> + {ok, Pid :: pid()} + | {error, {already_started, Pid :: pid()}} + | {error, {shutdown, term()}} + | {error, term()} + | ignore. start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). @@ -67,13 +68,14 @@ restart(Type) -> %% @end %%-------------------------------------------------------------------- -spec init(Args :: term()) -> - {ok, {SupFlags :: supervisor:sup_flags(), - [ChildSpec :: supervisor:child_spec()]}} | - ignore. + {ok, {SupFlags :: supervisor:sup_flags(), [ChildSpec :: supervisor:child_spec()]}} + | ignore. init([]) -> - SupFlags = #{strategy => one_for_one, - intensity => 10, - period => 3600}, + SupFlags = #{ + strategy => one_for_one, + intensity => 10, + period => 3600 + }, {ok, {SupFlags, childs()}}. @@ -82,12 +84,14 @@ init([]) -> %%--================================================================== make_child(Type) -> Id = emqx_limiter_server:name(Type), - #{id => Id, - start => {emqx_limiter_server, start_link, [Type]}, - restart => transient, - shutdown => 5000, - type => worker, - modules => [emqx_limiter_server]}. + #{ + id => Id, + start => {emqx_limiter_server, start_link, [Type]}, + restart => transient, + shutdown => 5000, + type => worker, + modules => [emqx_limiter_server] + }. childs() -> Conf = emqx:get_config([limiter]), diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_sup.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_sup.erl index 210eb9507..a8808151e 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_sup.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_sup.erl @@ -33,11 +33,12 @@ %% Starts the supervisor %% @end %%-------------------------------------------------------------------- --spec start_link() -> {ok, Pid :: pid()} | - {error, {already_started, Pid :: pid()}} | - {error, {shutdown, term()}} | - {error, term()} | - ignore. +-spec start_link() -> + {ok, Pid :: pid()} + | {error, {already_started, Pid :: pid()}} + | {error, {shutdown, term()}} + | {error, term()} + | ignore. start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). @@ -55,22 +56,27 @@ start_link() -> %% @end %%-------------------------------------------------------------------- -spec init(Args :: term()) -> - {ok, {SupFlags :: supervisor:sup_flags(), - [ChildSpec :: supervisor:child_spec()]}} | - ignore. + {ok, {SupFlags :: supervisor:sup_flags(), [ChildSpec :: supervisor:child_spec()]}} + | ignore. init([]) -> - SupFlags = #{strategy => one_for_one, - intensity => 10, - period => 3600}, + SupFlags = #{ + strategy => one_for_one, + intensity => 10, + period => 3600 + }, - Childs = [ make_child(emqx_limiter_manager, worker) - , make_child(emqx_limiter_server_sup, supervisor)], + Childs = [ + make_child(emqx_limiter_manager, worker), + make_child(emqx_limiter_server_sup, supervisor) + ], {ok, {SupFlags, Childs}}. make_child(Mod, Type) -> - #{id => Mod, - start => {Mod, start_link, []}, - restart => transient, - type => Type, - modules => [Mod]}. + #{ + id => Mod, + start => {Mod, start_link, []}, + restart => transient, + type => Type, + modules => [Mod] + }. diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index 33e20f845..b02e33f01 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -24,30 +24,33 @@ -boot_mnesia({mnesia, [boot]}). -export([mnesia/1]). --export([ publish/1 - , subscribe/3 - , unsubscribe/2 - , log/3 - ]). +-export([ + publish/1, + subscribe/3, + unsubscribe/2, + log/3 +]). --export([ start_link/0 - , list/0 - , list/1 - , get_trace_filename/1 - , create/1 - , delete/1 - , clear/0 - , update/2 - , check/0 - ]). +-export([ + start_link/0, + list/0, + list/1, + get_trace_filename/1, + create/1, + delete/1, + clear/0, + update/2, + check/0 +]). --export([ format/1 - , zip_dir/0 - , filename/2 - , trace_dir/0 - , trace_file/1 - , delete_files_after_send/2 - ]). +-export([ + format/1, + zip_dir/0, + filename/2, + trace_dir/0, + trace_file/1, + delete_files_after_send/2 +]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -56,22 +59,23 @@ -define(OWN_KEYS, [level, filters, filter_default, handlers]). -ifdef(TEST). --export([ log_file/2 - , find_closest_time/2 - ]). +-export([ + log_file/2, + find_closest_time/2 +]). -endif. -export_type([ip_address/0]). -type ip_address() :: string(). -record(?TRACE, { - name :: binary() | undefined | '_' - , type :: clientid | topic | ip_address | undefined | '_' - , filter :: emqx_types:topic() | emqx_types:clientid() | ip_address() | undefined | '_' - , enable = true :: boolean() | '_' - , start_at :: integer() | undefined | '_' - , end_at :: integer() | undefined | '_' - }). + name :: binary() | undefined | '_', + type :: clientid | topic | ip_address | undefined | '_', + filter :: emqx_types:topic() | emqx_types:clientid() | ip_address() | undefined | '_', + enable = true :: boolean() | '_', + start_at :: integer() | undefined | '_', + end_at :: integer() | undefined | '_' +}). mnesia(boot) -> ok = mria:create_table(?TRACE, [ @@ -79,18 +83,23 @@ mnesia(boot) -> {rlog_shard, ?COMMON_SHARD}, {storage, disc_copies}, {record_name, ?TRACE}, - {attributes, record_info(fields, ?TRACE)}]). + {attributes, record_info(fields, ?TRACE)} + ]). -publish(#message{topic = <<"$SYS/", _/binary>>}) -> ignore; +publish(#message{topic = <<"$SYS/", _/binary>>}) -> + ignore; publish(#message{from = From, topic = Topic, payload = Payload}) when - is_binary(From); is_atom(From) -> + is_binary(From); is_atom(From) +-> ?TRACE("PUBLISH", "publish_to", #{topic => Topic, payload => Payload}). -subscribe(<<"$SYS/", _/binary>>, _SubId, _SubOpts) -> ignore; +subscribe(<<"$SYS/", _/binary>>, _SubId, _SubOpts) -> + ignore; subscribe(Topic, SubId, SubOpts) -> ?TRACE("SUBSCRIBE", "subscribe", #{topic => Topic, sub_opts => SubOpts, sub_id => SubId}). -unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> ignore; +unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> + ignore; unsubscribe(Topic, SubOpts) -> ?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}). @@ -103,36 +112,47 @@ log(List, Msg, Meta0) -> Log = #{level => debug, meta => Meta, msg => Msg}, log_filter(List, Log). -log_filter([], _Log) -> ok; +log_filter([], _Log) -> + ok; log_filter([{Id, FilterFun, Filter, Name} | Rest], Log0) -> case FilterFun(Log0, {Filter, Name}) of - stop -> stop; - ignore -> ignore; + stop -> + stop; + ignore -> + ignore; Log -> case logger_config:get(ets:whereis(logger), Id) of {ok, #{module := Module} = HandlerConfig0} -> HandlerConfig = maps:without(?OWN_KEYS, HandlerConfig0), - try Module:log(Log, HandlerConfig) - catch C:R:S -> - case logger:remove_handler(Id) of - ok -> - logger:internal_log(error, {removed_failing_handler, Id, C, R, S}); - {error,{not_found,_}} -> - %% Probably already removed by other client - %% Don't report again - ok; - {error,Reason} -> - logger:internal_log(error, - {removed_handler_failed, Id, Reason, C, R, S}) - end + try + Module:log(Log, HandlerConfig) + catch + C:R:S -> + case logger:remove_handler(Id) of + ok -> + logger:internal_log( + error, {removed_failing_handler, Id, C, R, S} + ); + {error, {not_found, _}} -> + %% Probably already removed by other client + %% Don't report again + ok; + {error, Reason} -> + logger:internal_log( + error, + {removed_handler_failed, Id, Reason, C, R, S} + ) + end end; - {error, {not_found, Id}} -> ok; - {error, Reason} -> logger:internal_log(error, {find_handle_id_failed, Id, Reason}) + {error, {not_found, Id}} -> + ok; + {error, Reason} -> + logger:internal_log(error, {find_handle_id_failed, Id, Reason}) end end, log_filter(Rest, Log0). --spec(start_link() -> emqx_types:startlink_ret()). +-spec start_link() -> emqx_types:startlink_ret(). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -145,7 +165,8 @@ list(Enable) -> ets:match_object(?TRACE, #?TRACE{enable = Enable, _ = '_'}). -spec create([{Key :: binary(), Value :: binary()}] | #{atom() => binary()}) -> -{ok, #?TRACE{}} | {error, {duplicate_condition, iodata()} | {already_existed, iodata()} | iodata()}. + {ok, #?TRACE{}} + | {error, {duplicate_condition, iodata()} | {already_existed, iodata()} | iodata()}. create(Trace) -> case mnesia:table_info(?TRACE, size) < ?MAX_SIZE of true -> @@ -154,8 +175,9 @@ create(Trace) -> {error, Reason} -> {error, Reason} end; false -> - {error, "The number of traces created has reache the maximum" - " please delete the useless ones first"} + {error, + "The number of traces created has reache the maximum" + " please delete the useless ones first"} end. -spec delete(Name :: binary()) -> ok | {error, not_found}. @@ -165,7 +187,7 @@ delete(Name) -> [_] -> mnesia:delete(?TRACE, Name, write); [] -> mnesia:abort(not_found) end - end, + end, transaction(Tran). -spec clear() -> ok | {error, Reason :: term()}. @@ -180,15 +202,17 @@ clear() -> update(Name, Enable) -> Tran = fun() -> case mnesia:read(?TRACE, Name) of - [] -> mnesia:abort(not_found); - [#?TRACE{enable = Enable}] -> ok; + [] -> + mnesia:abort(not_found); + [#?TRACE{enable = Enable}] -> + ok; [Rec] -> case erlang:system_time(second) >= Rec#?TRACE.end_at of false -> mnesia:write(?TRACE, Rec#?TRACE{enable = Enable}, write); true -> mnesia:abort(finished) end end - end, + end, transaction(Tran). check() -> @@ -201,12 +225,13 @@ get_trace_filename(Name) -> case mnesia:read(?TRACE, Name, read) of [] -> mnesia:abort(not_found); [#?TRACE{start_at = Start}] -> {ok, filename(Name, Start)} - end end, + end + end, transaction(Tran). -spec trace_file(File :: file:filename_all()) -> - {ok, Node :: list(), Binary :: binary()} | - {error, Node :: list(), Reason :: term()}. + {ok, Node :: list(), Binary :: binary()} + | {error, Node :: list(), Reason :: term()}. trace_file(File) -> FileName = filename:join(trace_dir(), File), Node = atom_to_list(node()), @@ -221,10 +246,13 @@ delete_files_after_send(TraceLog, Zips) -> -spec format(list(#?TRACE{})) -> list(map()). format(Traces) -> Fields = record_info(fields, ?TRACE), - lists:map(fun(Trace0 = #?TRACE{}) -> - [_ | Values] = tuple_to_list(Trace0), - maps:from_list(lists:zip(Fields, Values)) - end, Traces). + lists:map( + fun(Trace0 = #?TRACE{}) -> + [_ | Values] = tuple_to_list(Trace0), + maps:from_list(lists:zip(Fields, Values)) + end, + Traces + ). init([]) -> ok = mria:wait_for_tables([?TRACE]), @@ -253,7 +281,8 @@ handle_cast(Msg, State) -> handle_info({'DOWN', _Ref, process, Pid, _Reason}, State = #{monitors := Monitors}) -> case maps:take(Pid, Monitors) of - error -> {noreply, State}; + error -> + {noreply, State}; {Files, NewMonitors} -> lists:foreach(fun file:delete/1, Files), {noreply, State#{monitors => NewMonitors}} @@ -264,11 +293,9 @@ handle_info({timeout, TRef, update_trace}, #{timer := TRef} = State) -> update_trace_handler(), ?tp(update_trace_done, #{}), {noreply, State#{timer => NextTRef}}; - handle_info({mnesia_table_event, _Events}, State = #{timer := TRef}) -> emqx_misc:cancel_timer(TRef), handle_info({timeout, TRef, update_trace}, State); - handle_info(Info, State) -> ?SLOG(error, #{unexpected_info => Info}), {noreply, State}. @@ -294,11 +321,13 @@ insert_new_trace(Trace) -> [] -> ok = mnesia:write(?TRACE, Trace, write), {ok, Trace}; - [#?TRACE{name = Name}] -> mnesia:abort({duplicate_condition, Name}) + [#?TRACE{name = Name}] -> + mnesia:abort({duplicate_condition, Name}) end; - [#?TRACE{name = Name}] -> mnesia:abort({already_existed, Name}) + [#?TRACE{name = Name}] -> + mnesia:abort({already_existed, Name}) end - end, + end, transaction(Tran). update_trace(Traces) -> @@ -314,67 +343,91 @@ update_trace(Traces) -> emqx_misc:start_timer(NextTime, update_trace). stop_all_trace_handler() -> - lists:foreach(fun(#{id := Id}) -> emqx_trace_handler:uninstall(Id) end, - emqx_trace_handler:running()). + lists:foreach( + fun(#{id := Id}) -> emqx_trace_handler:uninstall(Id) end, + emqx_trace_handler:running() + ). get_enable_trace() -> transaction(fun() -> mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read) - end). + end). find_closest_time(Traces, Now) -> Sec = lists:foldl( - fun(#?TRACE{start_at = Start, end_at = End, enable = true}, Closest) -> - min(closest(End, Now, Closest), closest(Start, Now, Closest)); - (_, Closest) -> Closest - end, 60 * 15, Traces), + fun + (#?TRACE{start_at = Start, end_at = End, enable = true}, Closest) -> + min(closest(End, Now, Closest), closest(Start, Now, Closest)); + (_, Closest) -> + Closest + end, + 60 * 15, + Traces + ), timer:seconds(Sec). closest(Time, Now, Closest) when Now >= Time -> Closest; closest(Time, Now, Closest) -> min(Time - Now, Closest). -disable_finished([]) -> ok; +disable_finished([]) -> + ok; disable_finished(Traces) -> transaction(fun() -> - lists:map(fun(#?TRACE{name = Name}) -> - case mnesia:read(?TRACE, Name, write) of - [] -> ok; - [Trace] -> mnesia:write(?TRACE, Trace#?TRACE{enable = false}, write) - end end, Traces) - end). + lists:map( + fun(#?TRACE{name = Name}) -> + case mnesia:read(?TRACE, Name, write) of + [] -> ok; + [Trace] -> mnesia:write(?TRACE, Trace#?TRACE{enable = false}, write) + end + end, + Traces + ) + end). start_trace(Traces, Started0) -> Started = lists:map(fun(#{name := Name}) -> Name end, Started0), - lists:foldl(fun(#?TRACE{name = Name} = Trace, - {Running, StartedAcc}) -> - case lists:member(Name, StartedAcc) of - true -> {[Name | Running], StartedAcc}; - false -> - case start_trace(Trace) of - ok -> {[Name | Running], [Name | StartedAcc]}; - {error, _Reason} -> {[Name | Running], StartedAcc} - end - end - end, {[], Started}, Traces). + lists:foldl( + fun( + #?TRACE{name = Name} = Trace, + {Running, StartedAcc} + ) -> + case lists:member(Name, StartedAcc) of + true -> + {[Name | Running], StartedAcc}; + false -> + case start_trace(Trace) of + ok -> {[Name | Running], [Name | StartedAcc]}; + {error, _Reason} -> {[Name | Running], StartedAcc} + end + end + end, + {[], Started}, + Traces + ). start_trace(Trace) -> - #?TRACE{name = Name - , type = Type - , filter = Filter - , start_at = Start + #?TRACE{ + name = Name, + type = Type, + filter = Filter, + start_at = Start } = Trace, Who = #{name => Name, type => Type, filter => Filter}, emqx_trace_handler:install(Who, debug, log_file(Name, Start)). stop_trace(Finished, Started) -> - lists:foreach(fun(#{name := Name, type := Type, filter := Filter}) -> - case lists:member(Name, Finished) of - true -> - ?TRACE("API", "trace_stopping", #{Type => Filter}), - emqx_trace_handler:uninstall(Type, Name); - false -> ok - end - end, Started). + lists:foreach( + fun(#{name := Name, type := Type, filter := Filter}) -> + case lists:member(Name, Finished) of + true -> + ?TRACE("API", "trace_stopping", #{Type => Filter}), + emqx_trace_handler:uninstall(Type, Name); + false -> + ok + end + end, + Started + ). clean_stale_trace_files() -> TraceDir = trace_dir(), @@ -383,30 +436,44 @@ clean_stale_trace_files() -> FileFun = fun(#?TRACE{name = Name, start_at = StartAt}) -> filename(Name, StartAt) end, KeepFiles = lists:map(FileFun, list()), case AllFiles -- ["zip" | KeepFiles] of - [] -> ok; + [] -> + ok; DeleteFiles -> DelFun = fun(F) -> file:delete(filename:join(TraceDir, F)) end, lists:foreach(DelFun, DeleteFiles) end; - _ -> ok + _ -> + ok end. classify_by_time(Traces, Now) -> classify_by_time(Traces, Now, [], [], []). -classify_by_time([], _Now, Wait, Run, Finish) -> {Wait, Run, Finish}; -classify_by_time([Trace = #?TRACE{start_at = Start} | Traces], - Now, Wait, Run, Finish) when Start > Now -> +classify_by_time([], _Now, Wait, Run, Finish) -> + {Wait, Run, Finish}; +classify_by_time( + [Trace = #?TRACE{start_at = Start} | Traces], + Now, + Wait, + Run, + Finish +) when Start > Now -> classify_by_time(Traces, Now, [Trace | Wait], Run, Finish); -classify_by_time([Trace = #?TRACE{end_at = End} | Traces], - Now, Wait, Run, Finish) when End =< Now -> +classify_by_time( + [Trace = #?TRACE{end_at = End} | Traces], + Now, + Wait, + Run, + Finish +) when End =< Now -> classify_by_time(Traces, Now, Wait, Run, [Trace | Finish]); classify_by_time([Trace | Traces], Now, Wait, Run, Finish) -> classify_by_time(Traces, Now, Wait, [Trace | Run], Finish). to_trace(TraceParam) -> case to_trace(ensure_map(TraceParam), #?TRACE{}) of - {error, Reason} -> {error, Reason}; + {error, Reason} -> + {error, Reason}; {ok, #?TRACE{name = undefined}} -> {error, "name required"}; {ok, #?TRACE{type = undefined}} -> @@ -421,21 +488,31 @@ to_trace(TraceParam) -> end. ensure_map(#{} = Trace) -> - maps:fold(fun(K, V, Acc) when is_binary(K) -> Acc#{binary_to_existing_atom(K) => V}; - (K, V, Acc) when is_atom(K) -> Acc#{K => V} - end, #{}, Trace); + maps:fold( + fun + (K, V, Acc) when is_binary(K) -> Acc#{binary_to_existing_atom(K) => V}; + (K, V, Acc) when is_atom(K) -> Acc#{K => V} + end, + #{}, + Trace + ); ensure_map(Trace) when is_list(Trace) -> lists:foldl( - fun({K, V}, Acc) when is_binary(K) -> Acc#{binary_to_existing_atom(K) => V}; + fun + ({K, V}, Acc) when is_binary(K) -> Acc#{binary_to_existing_atom(K) => V}; ({K, V}, Acc) when is_atom(K) -> Acc#{K => V}; (_, Acc) -> Acc - end, #{}, Trace). + end, + #{}, + Trace + ). fill_default(Trace = #?TRACE{start_at = undefined}) -> fill_default(Trace#?TRACE{start_at = erlang:system_time(second)}); fill_default(Trace = #?TRACE{end_at = undefined, start_at = StartAt}) -> fill_default(Trace#?TRACE{end_at = StartAt + 10 * 60}); -fill_default(Trace) -> Trace. +fill_default(Trace) -> + Trace. -define(NAME_RE, "^[A-Za-z]+[A-Za-z0-9-_]*$"). @@ -452,16 +529,19 @@ to_trace(#{type := topic, topic := Filter} = Trace, Rec) -> ok -> Trace0 = maps:without([type, topic], Trace), to_trace(Trace0, Rec#?TRACE{type = topic, filter = Filter}); - Error -> Error + Error -> + Error end; to_trace(#{type := ip_address, ip_address := Filter} = Trace, Rec) -> case validate_ip_address(Filter) of ok -> Trace0 = maps:without([type, ip_address], Trace), to_trace(Trace0, Rec#?TRACE{type = ip_address, filter = binary_to_list(Filter)}); - Error -> Error + Error -> + Error end; -to_trace(#{type := Type}, _Rec) -> {error, io_lib:format("required ~s field", [Type])}; +to_trace(#{type := Type}, _Rec) -> + {error, io_lib:format("required ~s field", [Type])}; to_trace(#{start_at := StartAt} = Trace, Rec) -> {ok, Sec} = to_system_second(StartAt), to_trace(maps:remove(start_at, Trace), Rec#?TRACE{start_at = Sec}); @@ -473,7 +553,8 @@ to_trace(#{end_at := EndAt} = Trace, Rec) -> {ok, _Sec} -> {error, "end_at time has already passed"} end; -to_trace(_, Rec) -> {ok, Rec}. +to_trace(_, Rec) -> + {ok, Rec}. validate_topic(TopicName) -> try emqx_topic:validate(filter, TopicName) of @@ -513,11 +594,22 @@ transaction(Tran) -> update_trace_handler() -> case emqx_trace_handler:running() of - [] -> persistent_term:erase(?TRACE_FILTER); + [] -> + persistent_term:erase(?TRACE_FILTER); Running -> - List = lists:map(fun(#{id := Id, filter_fun := FilterFun, - filter := Filter, name := Name}) -> - {Id, FilterFun, Filter, Name} end, Running), + List = lists:map( + fun( + #{ + id := Id, + filter_fun := FilterFun, + filter := Filter, + name := Name + } + ) -> + {Id, FilterFun, Filter, Name} + end, + Running + ), case List =/= persistent_term:get(?TRACE_FILTER, undefined) of true -> persistent_term:put(?TRACE_FILTER, List); false -> ok @@ -525,6 +617,9 @@ update_trace_handler() -> end. filter_cli_handler(Names) -> - lists:filter(fun(Name) -> - nomatch =:= re:run(Name, "^CLI-+.", []) - end, Names). + lists:filter( + fun(Name) -> + nomatch =:= re:run(Name, "^CLI-+.", []) + end, + Names + ). diff --git a/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl index b1640e621..33c718b72 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl @@ -23,14 +23,15 @@ -spec format(LogEvent, Config) -> unicode:chardata() when LogEvent :: logger:log_event(), Config :: logger:config(). -format(#{level := debug, meta := Meta = #{trace_tag := Tag}, msg := Msg}, - #{payload_encode := PEncode}) -> +format( + #{level := debug, meta := Meta = #{trace_tag := Tag}, msg := Msg}, + #{payload_encode := PEncode} +) -> Time = calendar:system_time_to_rfc3339(erlang:system_time(second)), ClientId = to_iolist(maps:get(clientid, Meta, "")), Peername = maps:get(peername, Meta, ""), MetaBin = format_meta(Meta, PEncode), [Time, " [", Tag, "] ", ClientId, "@", Peername, " msg: ", Msg, MetaBin, "\n"]; - format(Event, Config) -> emqx_logger_textfmt:format(Event, Config). @@ -72,6 +73,10 @@ to_iolist(SubMap) when is_map(SubMap) -> ["[", map_to_iolist(SubMap), "]"]; to_iolist(Char) -> emqx_logger_textfmt:try_format_unicode(Char). map_to_iolist(Map) -> - lists:join(",", - lists:map(fun({K, V}) -> [to_iolist(K), ": ", to_iolist(V)] end, - maps:to_list(Map))). + lists:join( + ",", + lists:map( + fun({K, V}) -> [to_iolist(K), ": ", to_iolist(V)] end, + maps:to_list(Map) + ) + ). diff --git a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl index df7d1059e..424fb04ed 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl @@ -22,28 +22,30 @@ -logger_header("[Tracer]"). %% APIs --export([ running/0 - , install/3 - , install/4 - , install/5 - , uninstall/1 - , uninstall/2 - ]). +-export([ + running/0, + install/3, + install/4, + install/5, + uninstall/1, + uninstall/2 +]). %% For logger handler filters callbacks --export([ filter_clientid/2 - , filter_topic/2 - , filter_ip_address/2 - ]). +-export([ + filter_clientid/2, + filter_topic/2, + filter_ip_address/2 +]). -export([handler_id/2]). -export([payload_encode/0]). -type tracer() :: #{ - name := binary(), - type := clientid | topic | ip_address, - filter := emqx_types:clientid() | emqx_types:topic() | emqx_trace:ip_address() - }. + name := binary(), + type := clientid | topic | ip_address, + filter := emqx_types:clientid() | emqx_types:topic() | emqx_trace:ip_address() +}. -define(CONFIG(_LogFile_), #{ type => halt, @@ -60,19 +62,23 @@ %% APIs %%------------------------------------------------------------------------------ --spec install(Name :: binary() | list(), +-spec install( + Name :: binary() | list(), Type :: clientid | topic | ip_address, Filter :: emqx_types:clientid() | emqx_types:topic() | string(), Level :: logger:level() | all, - LogFilePath :: string()) -> ok | {error, term()}. + LogFilePath :: string() +) -> ok | {error, term()}. install(Name, Type, Filter, Level, LogFile) -> Who = #{type => Type, filter => ensure_bin(Filter), name => ensure_bin(Name)}, install(Who, Level, LogFile). --spec install(Type :: clientid | topic | ip_address, +-spec install( + Type :: clientid | topic | ip_address, Filter :: emqx_types:clientid() | emqx_types:topic() | string(), Level :: logger:level() | all, - LogFilePath :: string()) -> ok | {error, term()}. + LogFilePath :: string() +) -> ok | {error, term()}. install(Type, Filter, Level, LogFile) -> install(Filter, Type, Filter, Level, LogFile). @@ -92,8 +98,10 @@ install(Who = #{name := Name, type := Type}, Level, LogFile) -> show_prompts(Res, Who, "start_trace"), Res. --spec uninstall(Type :: clientid | topic | ip_address, - Name :: binary() | list()) -> ok | {error, term()}. +-spec uninstall( + Type :: clientid | topic | ip_address, + Name :: binary() | list() +) -> ok | {error, term()}. uninstall(Type, Name) -> HandlerId = handler_id(ensure_bin(Name), Type), uninstall(HandlerId). @@ -108,12 +116,12 @@ uninstall(HandlerId) -> -spec running() -> [ #{ - name => binary(), - type => topic | clientid | ip_address, - id => atom(), - filter => emqx_types:topic() | emqx_types:clienetid() | emqx_trace:ip_address(), - level => logger:level(), - dst => file:filename() | console | unknown + name => binary(), + type => topic | clientid | ip_address, + id => atom(), + filter => emqx_types:topic() | emqx_types:clienetid() | emqx_trace:ip_address(), + level => logger:level(), + dst => file:filename() | console | unknown } ]. running() -> @@ -122,17 +130,20 @@ running() -> -spec filter_clientid(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop. filter_clientid(#{meta := Meta = #{clientid := ClientId}} = Log, {MatchId, _Name}) -> filter_ret(ClientId =:= MatchId andalso is_trace(Meta), Log); -filter_clientid(_Log, _ExpectId) -> stop. +filter_clientid(_Log, _ExpectId) -> + stop. -spec filter_topic(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop. filter_topic(#{meta := Meta = #{topic := Topic}} = Log, {TopicFilter, _Name}) -> filter_ret(is_trace(Meta) andalso emqx_topic:match(Topic, TopicFilter), Log); -filter_topic(_Log, _ExpectId) -> stop. +filter_topic(_Log, _ExpectId) -> + stop. -spec filter_ip_address(logger:log_event(), {string(), atom()}) -> logger:log_event() | stop. filter_ip_address(#{meta := Meta = #{peername := Peername}} = Log, {IP, _Name}) -> filter_ret(is_trace(Meta) andalso lists:prefix(IP, Peername), Log); -filter_ip_address(_Log, _ExpectId) -> stop. +filter_ip_address(_Log, _ExpectId) -> + stop. -compile({inline, [is_trace/1, filter_ret/2]}). %% TRUE when is_trace is missing. @@ -150,16 +161,14 @@ filters(#{type := ip_address, filter := Filter, name := Name}) -> [{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}]. formatter(#{type := _Type}) -> - {emqx_trace_formatter, - #{ - %% template is for ?SLOG message not ?TRACE. - template => [time," [",level,"] ", msg,"\n"], - single_line => true, - max_size => unlimited, - depth => unlimited, - payload_encode => payload_encode() - } - }. + {emqx_trace_formatter, #{ + %% template is for ?SLOG message not ?TRACE. + template => [time, " [", level, "] ", msg, "\n"], + single_line => true, + max_size => unlimited, + depth => unlimited, + payload_encode => payload_encode() + }}. filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc) -> Init = #{id => Id, level => Level, dst => Dst}, @@ -167,7 +176,8 @@ filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc) [{Type, {FilterFun, {Filter, Name}}}] when Type =:= topic orelse Type =:= clientid orelse - Type =:= ip_address -> + Type =:= ip_address + -> [Init#{type => Type, filter => Filter, name => Name, filter_fun => FilterFun} | Acc]; _ -> Acc @@ -179,7 +189,7 @@ handler_id(Name, Type) -> try do_handler_id(Name, Type) catch - _ : _ -> + _:_ -> Hash = emqx_misc:bin2hexstr_a_f(crypto:hash(md5, Name)), do_handler_id(Hash, Type) end. diff --git a/apps/emqx/src/proto/emqx_broker_proto_v1.erl b/apps/emqx/src/proto/emqx_broker_proto_v1.erl index 8bf777935..54ab4f2b8 100644 --- a/apps/emqx/src/proto/emqx_broker_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_broker_proto_v1.erl @@ -18,17 +18,18 @@ -behaviour(emqx_bpapi). --export([ introduced_in/0 +-export([ + introduced_in/0, - , forward/3 - , forward_async/3 - , list_client_subscriptions/2 - , list_subscriptions_via_topic/2 + forward/3, + forward_async/3, + list_client_subscriptions/2, + list_subscriptions_via_topic/2, - , start_listener/2 - , stop_listener/2 - , restart_listener/2 - ]). + start_listener/2, + stop_listener/2, + restart_listener/2 +]). -include("bpapi.hrl"). -include("emqx.hrl"). @@ -36,8 +37,9 @@ introduced_in() -> "5.0.0". --spec forward(node(), emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result() - | emqx_rpc:badrpc(). +-spec forward(node(), emqx_types:topic(), emqx_types:delivery()) -> + emqx_types:deliver_result() + | emqx_rpc:badrpc(). forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> emqx_rpc:call(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]). @@ -46,8 +48,8 @@ forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]). -spec list_client_subscriptions(node(), emqx_types:clientid()) -> - [{emqx_types:topic(), emqx_types:subopts()}] - | emqx_rpc:badrpc(). + [{emqx_types:topic(), emqx_types:subopts()}] + | emqx_rpc:badrpc(). list_client_subscriptions(Node, ClientId) -> rpc:call(Node, emqx_broker, subscriptions, [ClientId]). diff --git a/apps/emqx/src/proto/emqx_cm_proto_v1.erl b/apps/emqx/src/proto/emqx_cm_proto_v1.erl index 3c673bce0..99898bdb2 100644 --- a/apps/emqx/src/proto/emqx_cm_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_cm_proto_v1.erl @@ -18,18 +18,19 @@ -behaviour(emqx_bpapi). --export([ introduced_in/0 +-export([ + introduced_in/0, - , lookup_client/2 - , kickout_client/2 + lookup_client/2, + kickout_client/2, - , get_chan_stats/2 - , get_chan_info/2 - , get_chann_conn_mod/2 + get_chan_stats/2, + get_chan_info/2, + get_chann_conn_mod/2, - , takeover_session/2 - , kick_session/3 - ]). + takeover_session/2, + kick_session/3 +]). -include("bpapi.hrl"). -include("src/emqx_cm.hrl"). @@ -42,7 +43,7 @@ kickout_client(Node, ClientId) -> rpc:call(Node, emqx_cm, kick_session, [ClientId]). -spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) -> - [emqx_cm:channel_info()] | {badrpc, _}. + [emqx_cm:channel_info()] | {badrpc, _}. lookup_client(Node, Key) -> rpc:call(Node, emqx_cm, lookup_client, [Key]). @@ -54,15 +55,16 @@ get_chan_stats(ClientId, ChanPid) -> get_chan_info(ClientId, ChanPid) -> rpc:call(node(ChanPid), emqx_cm, do_get_chan_info, [ClientId, ChanPid], ?T_GET_INFO * 2). --spec get_chann_conn_mod(emqx_types:clientid(), emqx_cm:chan_pid()) -> module() | undefined | {badrpc, _}. +-spec get_chann_conn_mod(emqx_types:clientid(), emqx_cm:chan_pid()) -> + module() | undefined | {badrpc, _}. get_chann_conn_mod(ClientId, ChanPid) -> rpc:call(node(ChanPid), emqx_cm, do_get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO * 2). -spec takeover_session(emqx_types:clientid(), emqx_cm:chan_pid()) -> - none - | {expired | persistent, emqx_session:session()} - | {living, _ConnMod :: atom(), emqx_cm:chan_pid(), emqx_session:session()} - | {badrpc, _}. + none + | {expired | persistent, emqx_session:session()} + | {living, _ConnMod :: atom(), emqx_cm:chan_pid(), emqx_session:session()} + | {badrpc, _}. takeover_session(ClientId, ChanPid) -> rpc:call(node(ChanPid), emqx_cm, takeover_session, [ClientId, ChanPid], ?T_TAKEOVER * 2). diff --git a/apps/emqx/src/proto/emqx_persistent_session_proto_v1.erl b/apps/emqx/src/proto/emqx_persistent_session_proto_v1.erl index b1baf4334..de497f582 100644 --- a/apps/emqx/src/proto/emqx_persistent_session_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_persistent_session_proto_v1.erl @@ -18,10 +18,11 @@ -behaviour(emqx_bpapi). --export([ introduced_in/0 - , resume_begin/3 - , resume_end/3 - ]). +-export([ + introduced_in/0, + resume_begin/3, + resume_end/3 +]). -include("bpapi.hrl"). -include("emqx.hrl"). @@ -30,11 +31,11 @@ introduced_in() -> "5.0.0". -spec resume_begin([node()], pid(), binary()) -> - emqx_rpc:erpc_multicall([{node(), emqx_guid:guid()}]). + emqx_rpc:erpc_multicall([{node(), emqx_guid:guid()}]). resume_begin(Nodes, Pid, SessionID) when is_pid(Pid), is_binary(SessionID) -> erpc:multicall(Nodes, emqx_session_router, resume_begin, [Pid, SessionID]). -spec resume_end([node()], pid(), binary()) -> - emqx_rpc:erpc_multicall({'ok', [emqx_types:message()]} | {'error', term()}). + emqx_rpc:erpc_multicall({'ok', [emqx_types:message()]} | {'error', term()}). resume_end(Nodes, Pid, SessionID) when is_pid(Pid), is_binary(SessionID) -> erpc:multicall(Nodes, emqx_session_router, resume_end, [Pid, SessionID]). diff --git a/apps/emqx/src/proto/emqx_proto_v1.erl b/apps/emqx/src/proto/emqx_proto_v1.erl index a92561e8c..7d06d3f55 100644 --- a/apps/emqx/src/proto/emqx_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_proto_v1.erl @@ -20,20 +20,21 @@ -include("bpapi.hrl"). --export([ introduced_in/0 +-export([ + introduced_in/0, - , is_running/1 + is_running/1, - , get_alarms/2 - , get_stats/1 - , get_metrics/1 + get_alarms/2, + get_stats/1, + get_metrics/1, - , deactivate_alarm/2 - , delete_all_deactivated_alarms/1 + deactivate_alarm/2, + delete_all_deactivated_alarms/1, - , clean_authz_cache/1 - , clean_authz_cache/2 - ]). + clean_authz_cache/1, + clean_authz_cache/2 +]). introduced_in() -> "5.0.0". @@ -55,9 +56,9 @@ get_metrics(Node) -> rpc:call(Node, emqx_metrics, all, []). -spec clean_authz_cache(node(), emqx_types:clientid()) -> - ok - | {error, not_found} - | {badrpc, _}. + ok + | {error, not_found} + | {badrpc, _}. clean_authz_cache(Node, ClientId) -> rpc:call(Node, emqx_authz_cache, drain_cache, [ClientId]). @@ -66,7 +67,7 @@ clean_authz_cache(Node) -> rpc:call(Node, emqx_authz_cache, drain_cache, []). -spec deactivate_alarm(node(), binary() | atom()) -> - ok | {error, not_found} | {badrpc, _}. + ok | {error, not_found} | {badrpc, _}. deactivate_alarm(Node, Name) -> rpc:call(Node, emqx_alarm, deactivate, [Name]). diff --git a/apps/emqx/test/props/prop_emqx_base62.erl b/apps/emqx/test/props/prop_emqx_base62.erl index e82d2b620..822fcb8e4 100644 --- a/apps/emqx/test/props/prop_emqx_base62.erl +++ b/apps/emqx/test/props/prop_emqx_base62.erl @@ -23,18 +23,24 @@ %%-------------------------------------------------------------------- prop_symmetric() -> - ?FORALL(Data, raw_data(), + ?FORALL( + Data, + raw_data(), begin Encoded = emqx_base62:encode(Data), to_binary(Data) =:= emqx_base62:decode(Encoded) - end). + end + ). prop_size() -> - ?FORALL(Data, binary(), - begin - Encoded = emqx_base62:encode(Data), - base62_size(Data, Encoded) - end). + ?FORALL( + Data, + binary(), + begin + Encoded = emqx_base62:encode(Data), + base62_size(Data, Encoded) + end + ). %%-------------------------------------------------------------------- %% Helpers @@ -59,7 +65,7 @@ base62_size(Data, Encoded) -> RangeEnd = DataSize div 3 * 8, EncodedSize >= RangeStart andalso EncodedSize =< RangeEnd; _Rem -> - RangeStart = DataSize * 8 div 6 + 1, + RangeStart = DataSize * 8 div 6 + 1, RangeEnd = DataSize * 8 div 6 * 2 + 1, EncodedSize >= RangeStart andalso EncodedSize =< RangeEnd end. diff --git a/apps/emqx/test/props/prop_emqx_frame.erl b/apps/emqx/test/props/prop_emqx_frame.erl index 69d470c0e..df50408a8 100644 --- a/apps/emqx/test/props/prop_emqx_frame.erl +++ b/apps/emqx/test/props/prop_emqx_frame.erl @@ -24,24 +24,27 @@ %%-------------------------------------------------------------------- prop_serialize_parse_connect() -> - ?FORALL(Opts = #{version := ProtoVer}, parse_opts(), - begin - ProtoName = proplists:get_value(ProtoVer, ?PROTOCOL_NAMES), - Packet = ?CONNECT_PACKET(#mqtt_packet_connect{ - proto_name = ProtoName, - proto_ver = ProtoVer, - clientid = <<"clientId">>, - will_qos = ?QOS_1, - will_flag = true, - will_retain = true, - will_topic = <<"will">>, - will_props = #{}, - will_payload = <<"bye">>, - clean_start = true, - properties = #{} - }), - Packet =:= parse_serialize(Packet, Opts) - end). + ?FORALL( + Opts = #{version := ProtoVer}, + parse_opts(), + begin + ProtoName = proplists:get_value(ProtoVer, ?PROTOCOL_NAMES), + Packet = ?CONNECT_PACKET(#mqtt_packet_connect{ + proto_name = ProtoName, + proto_ver = ProtoVer, + clientid = <<"clientId">>, + will_qos = ?QOS_1, + will_flag = true, + will_retain = true, + will_topic = <<"will">>, + will_props = #{}, + will_payload = <<"bye">>, + clean_start = true, + properties = #{} + }), + Packet =:= parse_serialize(Packet, Opts) + end + ). %%-------------------------------------------------------------------- %% Helpers @@ -59,4 +62,4 @@ parse_serialize(Packet, Opts) when is_map(Opts) -> %%-------------------------------------------------------------------- parse_opts() -> - ?LET(PropList, [{strict_mode, boolean()}, {version, range(4,5)}], maps:from_list(PropList)). + ?LET(PropList, [{strict_mode, boolean()}, {version, range(4, 5)}], maps:from_list(PropList)). diff --git a/apps/emqx/test/props/prop_emqx_json.erl b/apps/emqx/test/props/prop_emqx_json.erl index 20293ab6b..2320176f5 100644 --- a/apps/emqx/test/props/prop_emqx_json.erl +++ b/apps/emqx/test/props/prop_emqx_json.erl @@ -16,14 +16,17 @@ -module(prop_emqx_json). --import(emqx_json, - [ decode/1 - , decode/2 - , encode/1 - , safe_decode/1 - , safe_decode/2 - , safe_encode/1 - ]). +-import( + emqx_json, + [ + decode/1, + decode/2, + encode/1, + safe_decode/1, + safe_decode/2, + safe_encode/1 + ] +). -include_lib("proper/include/proper.hrl"). @@ -32,99 +35,133 @@ %%-------------------------------------------------------------------- prop_json_basic() -> - ?FORALL(T, json_basic(), - begin - {ok, J} = safe_encode(T), - {ok, T} = safe_decode(J), - T = decode(encode(T)), - true - end). + ?FORALL( + T, + json_basic(), + begin + {ok, J} = safe_encode(T), + {ok, T} = safe_decode(J), + T = decode(encode(T)), + true + end + ). prop_json_basic_atom() -> - ?FORALL(T0, latin_atom(), - begin - T = atom_to_binary(T0, utf8), - {ok, J} = safe_encode(T0), - {ok, T} = safe_decode(J), - T = decode(encode(T0)), - true - end). + ?FORALL( + T0, + latin_atom(), + begin + T = atom_to_binary(T0, utf8), + {ok, J} = safe_encode(T0), + {ok, T} = safe_decode(J), + T = decode(encode(T0)), + true + end + ). prop_object_proplist_to_proplist() -> - ?FORALL(T, json_object(), - begin - {ok, J} = safe_encode(T), - {ok, T} = safe_decode(J), - T = decode(encode(T)), - true - end). + ?FORALL( + T, + json_object(), + begin + {ok, J} = safe_encode(T), + {ok, T} = safe_decode(J), + T = decode(encode(T)), + true + end + ). prop_object_map_to_map() -> - ?FORALL(T, json_object_map(), - begin - {ok, J} = safe_encode(T), - {ok, T} = safe_decode(J, [return_maps]), - T = decode(encode(T), [return_maps]), - true - end). + ?FORALL( + T, + json_object_map(), + begin + {ok, J} = safe_encode(T), + {ok, T} = safe_decode(J, [return_maps]), + T = decode(encode(T), [return_maps]), + true + end + ). %% The duplicated key will be overridden prop_object_proplist_to_map() -> - ?FORALL(T0, json_object(), - begin - T = to_map(T0), - {ok, J} = safe_encode(T0), - {ok, T} = safe_decode(J, [return_maps]), - T = decode(encode(T0), [return_maps]), - true - end). + ?FORALL( + T0, + json_object(), + begin + T = to_map(T0), + {ok, J} = safe_encode(T0), + {ok, T} = safe_decode(J, [return_maps]), + T = decode(encode(T0), [return_maps]), + true + end + ). prop_object_map_to_proplist() -> - ?FORALL(T0, json_object_map(), - begin - %% jiffy encode a map with descending order, that is, - %% it is opposite with maps traversal sequence - %% see: the `to_list` implementation - T = to_list(T0), - {ok, J} = safe_encode(T0), - {ok, T} = safe_decode(J), - T = decode(encode(T0)), - true - end). + ?FORALL( + T0, + json_object_map(), + begin + %% jiffy encode a map with descending order, that is, + %% it is opposite with maps traversal sequence + %% see: the `to_list` implementation + T = to_list(T0), + {ok, J} = safe_encode(T0), + {ok, T} = safe_decode(J), + T = decode(encode(T0)), + true + end + ). prop_safe_encode() -> - ?FORALL(T, invalid_json_term(), - begin - {error, _} = safe_encode(T), true - end). + ?FORALL( + T, + invalid_json_term(), + begin + {error, _} = safe_encode(T), + true + end + ). prop_safe_decode() -> - ?FORALL(T, invalid_json_str(), - begin - {error, _} = safe_decode(T), true - end). + ?FORALL( + T, + invalid_json_str(), + begin + {error, _} = safe_decode(T), + true + end + ). %%-------------------------------------------------------------------- %% Helpers %%-------------------------------------------------------------------- -to_map([{_, _}|_] = L) -> +to_map([{_, _} | _] = L) -> lists:foldl( - fun({Name, Value}, Acc) -> - Acc#{Name => to_map(Value)} - end, #{}, L); + fun({Name, Value}, Acc) -> + Acc#{Name => to_map(Value)} + end, + #{}, + L + ); to_map(L) when is_list(L) -> [to_map(E) || E <- L]; -to_map(T) -> T. +to_map(T) -> + T. to_list(L) when is_list(L) -> [to_list(E) || E <- L]; to_list(M) when is_map(M) -> maps:fold( - fun(K, V, Acc) -> - [{K, to_list(V)}|Acc] - end, [], M); -to_list(T) -> T. + fun(K, V, Acc) -> + [{K, to_list(V)} | Acc] + end, + [], + M + ); +to_list(T) -> + T. %%-------------------------------------------------------------------- %% Generators (https://tools.ietf.org/html/rfc8259) @@ -140,8 +177,14 @@ latin_atom() -> json_string() -> utf8(). json_object() -> - oneof([json_array_1(), json_object_1(), json_array_object_1(), - json_array_2(), json_object_2(), json_array_object_2()]). + oneof([ + json_array_1(), + json_object_1(), + json_array_object_1(), + json_array_2(), + json_object_2(), + json_array_object_2() + ]). json_object_map() -> ?LET(L, json_object(), to_map(L)). @@ -156,9 +199,14 @@ json_object_1() -> list({json_key(), json_basic()}). json_object_2() -> - list({json_key(), oneof([json_basic(), - json_array_1(), - json_object_1()])}). + list({ + json_key(), + oneof([ + json_basic(), + json_array_1(), + json_object_1() + ]) + }). json_array_object_1() -> list(json_object_1()). @@ -186,5 +234,4 @@ chaos(S, 0, _) -> chaos(S, N, T) -> I = rand:uniform(length(S)), {L1, L2} = lists:split(I, S), - chaos(lists:flatten([L1, lists:nth(rand:uniform(length(T)), T), L2]), N-1, T). - + chaos(lists:flatten([L1, lists:nth(rand:uniform(length(T)), T), L2]), N - 1, T). diff --git a/apps/emqx/test/props/prop_emqx_psk.erl b/apps/emqx/test/props/prop_emqx_psk.erl index 2aabe5d77..3900b43cd 100644 --- a/apps/emqx/test/props/prop_emqx_psk.erl +++ b/apps/emqx/test/props/prop_emqx_psk.erl @@ -14,31 +14,36 @@ %% limitations under the License. %%-------------------------------------------------------------------- - -module(prop_emqx_psk). -include_lib("proper/include/proper.hrl"). -define(ALL(Vars, Types, Exprs), - ?SETUP(fun() -> + ?SETUP( + fun() -> State = do_setup(), fun() -> do_teardown(State) end - end, ?FORALL(Vars, Types, Exprs))). + end, + ?FORALL(Vars, Types, Exprs) + ) +). %%-------------------------------------------------------------------- %% Properties %%-------------------------------------------------------------------- prop_lookup() -> - ?ALL({ClientPSKID, UserState}, - {client_pskid(), user_state()}, - begin - case emqx_tls_psk:lookup(psk, ClientPSKID, UserState) of - {ok, _Result} -> true; - error -> true; - _Other -> false - end - end). + ?ALL( + {ClientPSKID, UserState}, + {client_pskid(), user_state()}, + begin + case emqx_tls_psk:lookup(psk, ClientPSKID, UserState) of + {ok, _Result} -> true; + error -> true; + _Other -> false + end + end + ). %%-------------------------------------------------------------------- %% Helper @@ -47,10 +52,13 @@ prop_lookup() -> do_setup() -> ok = emqx_logger:set_log_level(emergency), ok = meck:new(emqx_hooks, [passthrough, no_history]), - ok = meck:expect(emqx_hooks, run_fold, - fun('tls_handshake.psk_lookup', [ClientPSKID], not_found) -> - unicode:characters_to_binary(ClientPSKID) - end). + ok = meck:expect( + emqx_hooks, + run_fold, + fun('tls_handshake.psk_lookup', [ClientPSKID], not_found) -> + unicode:characters_to_binary(ClientPSKID) + end + ). do_teardown(_) -> ok = emqx_logger:set_log_level(error), @@ -63,4 +71,3 @@ do_teardown(_) -> client_pskid() -> oneof([string(), integer(), [1, [-1]]]). user_state() -> term(). - diff --git a/apps/emqx/test/props/prop_emqx_reason_codes.erl b/apps/emqx/test/props/prop_emqx_reason_codes.erl index 8f860fb3b..213a05144 100644 --- a/apps/emqx/test/props/prop_emqx_reason_codes.erl +++ b/apps/emqx/test/props/prop_emqx_reason_codes.erl @@ -24,20 +24,29 @@ %%-------------------------------------------------------------------- prop_name_text() -> - ?FORALL(UnionArgs, union_args(), - is_atom(apply_fun(name, UnionArgs)) andalso - is_binary(apply_fun(text, UnionArgs))). + ?FORALL( + UnionArgs, + union_args(), + is_atom(apply_fun(name, UnionArgs)) andalso + is_binary(apply_fun(text, UnionArgs)) + ). prop_compat() -> - ?FORALL(CompatArgs, compat_args(), - begin - Result = apply_fun(compat, CompatArgs), - is_number(Result) orelse Result =:= undefined - end). + ?FORALL( + CompatArgs, + compat_args(), + begin + Result = apply_fun(compat, CompatArgs), + is_number(Result) orelse Result =:= undefined + end + ). prop_connack_error() -> - ?FORALL(CONNACK_ERROR_ARGS, connack_error_args(), - is_integer(apply_fun(connack_error, CONNACK_ERROR_ARGS))). + ?FORALL( + CONNACK_ERROR_ARGS, + connack_error_args(), + is_integer(apply_fun(connack_error, CONNACK_ERROR_ARGS)) + ). %%-------------------------------------------------------------------- %% Helper @@ -51,45 +60,60 @@ apply_fun(Fun, Args) -> %%-------------------------------------------------------------------- union_args() -> - frequency([{6, [real_mqttv3_rc(), mqttv3_version()]}, - {43, [real_mqttv5_rc(), mqttv5_version()]}]). + frequency([ + {6, [real_mqttv3_rc(), mqttv3_version()]}, + {43, [real_mqttv5_rc(), mqttv5_version()]} + ]). compat_args() -> - frequency([{18, [connack, compat_rc()]}, - {2, [suback, compat_rc()]}, - {1, [unsuback, compat_rc()]}]). + frequency([ + {18, [connack, compat_rc()]}, + {2, [suback, compat_rc()]}, + {1, [unsuback, compat_rc()]} + ]). connack_error_args() -> - [frequency([{10, connack_error()}, - {1, unexpected_connack_error()}])]. + [ + frequency([ + {10, connack_error()}, + {1, unexpected_connack_error()} + ]) + ]. connack_error() -> - oneof([client_identifier_not_valid, - bad_username_or_password, - bad_clientid_or_password, - username_or_password_undefined, - password_error, - not_authorized, - server_unavailable, - server_busy, - banned, - bad_authentication_method]). + oneof([ + client_identifier_not_valid, + bad_username_or_password, + bad_clientid_or_password, + username_or_password_undefined, + password_error, + not_authorized, + server_unavailable, + server_busy, + banned, + bad_authentication_method + ]). unexpected_connack_error() -> oneof([who_knows]). - real_mqttv3_rc() -> - frequency([{6, mqttv3_rc()}, - {1, unexpected_rc()}]). + frequency([ + {6, mqttv3_rc()}, + {1, unexpected_rc()} + ]). real_mqttv5_rc() -> - frequency([{43, mqttv5_rc()}, - {2, unexpected_rc()}]). + frequency([ + {43, mqttv5_rc()}, + {2, unexpected_rc()} + ]). compat_rc() -> - frequency([{95, ?SUCHTHAT(RC , mqttv5_rc(), RC >= 16#80 orelse RC =< 2)}, - {5, unexpected_rc()}]). + frequency([ + {95, ?SUCHTHAT(RC, mqttv5_rc(), RC >= 16#80 orelse RC =< 2)}, + {5, unexpected_rc()} + ]). mqttv3_rc() -> oneof(mqttv3_rcs()). @@ -104,12 +128,51 @@ mqttv3_rcs() -> [0, 1, 2, 3, 4, 5]. mqttv5_rcs() -> - [16#00, 16#01, 16#02, 16#04, 16#10, 16#11, 16#18, 16#19, - 16#80, 16#81, 16#82, 16#83, 16#84, 16#85, 16#86, 16#87, - 16#88, 16#89, 16#8A, 16#8B, 16#8C, 16#8D, 16#8E, 16#8F, - 16#90, 16#91, 16#92, 16#93, 16#94, 16#95, 16#96, 16#97, - 16#98, 16#99, 16#9A, 16#9B, 16#9C, 16#9D, 16#9E, 16#9F, - 16#A0, 16#A1, 16#A2]. + [ + 16#00, + 16#01, + 16#02, + 16#04, + 16#10, + 16#11, + 16#18, + 16#19, + 16#80, + 16#81, + 16#82, + 16#83, + 16#84, + 16#85, + 16#86, + 16#87, + 16#88, + 16#89, + 16#8A, + 16#8B, + 16#8C, + 16#8D, + 16#8E, + 16#8F, + 16#90, + 16#91, + 16#92, + 16#93, + 16#94, + 16#95, + 16#96, + 16#97, + 16#98, + 16#99, + 16#9A, + 16#9B, + 16#9C, + 16#9D, + 16#9E, + 16#9F, + 16#A0, + 16#A1, + 16#A2 + ]. unexpected_rcs() -> ReasonCodes = mqttv3_rcs() ++ mqttv5_rcs(), diff --git a/apps/emqx/test/props/prop_emqx_rpc.erl b/apps/emqx/test/props/prop_emqx_rpc.erl index fb8447fe1..04a904806 100644 --- a/apps/emqx/test/props/prop_emqx_rpc.erl +++ b/apps/emqx/test/props/prop_emqx_rpc.erl @@ -22,64 +22,84 @@ -define(NODENAME, 'test@127.0.0.1'). -define(ALL(Vars, Types, Exprs), - ?SETUP(fun() -> + ?SETUP( + fun() -> State = do_setup(), fun() -> do_teardown(State) end - end, ?FORALL(Vars, Types, Exprs))). + end, + ?FORALL(Vars, Types, Exprs) + ) +). %%-------------------------------------------------------------------- %% Properties %%-------------------------------------------------------------------- prop_node() -> - ?ALL(Node0, nodename(), - begin - Node = punch(Node0), - ?assert(emqx_rpc:cast(Node, erlang, system_time, [])), - case emqx_rpc:call(Node, erlang, system_time, []) of - {badrpc, _Reason} -> true; - Delivery when is_integer(Delivery) -> true; - _Other -> false - end - end). + ?ALL( + Node0, + nodename(), + begin + Node = punch(Node0), + ?assert(emqx_rpc:cast(Node, erlang, system_time, [])), + case emqx_rpc:call(Node, erlang, system_time, []) of + {badrpc, _Reason} -> true; + Delivery when is_integer(Delivery) -> true; + _Other -> false + end + end + ). prop_node_with_key() -> - ?ALL({Node0, Key}, nodename_with_key(), - begin - Node = punch(Node0), - ?assert(emqx_rpc:cast(Key, Node, erlang, system_time, [])), - case emqx_rpc:call(Key, Node, erlang, system_time, []) of - {badrpc, _Reason} -> true; - Delivery when is_integer(Delivery) -> true; - _Other -> false - end - end). + ?ALL( + {Node0, Key}, + nodename_with_key(), + begin + Node = punch(Node0), + ?assert(emqx_rpc:cast(Key, Node, erlang, system_time, [])), + case emqx_rpc:call(Key, Node, erlang, system_time, []) of + {badrpc, _Reason} -> true; + Delivery when is_integer(Delivery) -> true; + _Other -> false + end + end + ). prop_nodes() -> - ?ALL(Nodes0, nodesname(), - begin - Nodes = punch(Nodes0), - case emqx_rpc:multicall(Nodes, erlang, system_time, []) of - {RealResults, RealBadNodes} - when is_list(RealResults); - is_list(RealBadNodes) -> - true; - _Other -> false - end - end). + ?ALL( + Nodes0, + nodesname(), + begin + Nodes = punch(Nodes0), + case emqx_rpc:multicall(Nodes, erlang, system_time, []) of + {RealResults, RealBadNodes} when + is_list(RealResults); + is_list(RealBadNodes) + -> + true; + _Other -> + false + end + end + ). prop_nodes_with_key() -> - ?ALL({Nodes0, Key}, nodesname_with_key(), - begin - Nodes = punch(Nodes0), - case emqx_rpc:multicall(Key, Nodes, erlang, system_time, []) of - {RealResults, RealBadNodes} - when is_list(RealResults); - is_list(RealBadNodes) -> - true; - _Other -> false - end - end). + ?ALL( + {Nodes0, Key}, + nodesname_with_key(), + begin + Nodes = punch(Nodes0), + case emqx_rpc:multicall(Key, Nodes, erlang, system_time, []) of + {RealResults, RealBadNodes} when + is_list(RealResults); + is_list(RealBadNodes) + -> + true; + _Other -> + false + end + end + ). %%-------------------------------------------------------------------- %% Helper @@ -91,10 +111,13 @@ do_setup() -> {ok, _Apps} = application:ensure_all_started(gen_rpc), ok = application:set_env(gen_rpc, call_receive_timeout, 100), ok = meck:new(gen_rpc, [passthrough, no_history]), - ok = meck:expect(gen_rpc, multicall, - fun(Nodes, Mod, Fun, Args) -> - gen_rpc:multicall(Nodes, Mod, Fun, Args, 100) - end). + ok = meck:expect( + gen_rpc, + multicall, + fun(Nodes, Mod, Fun, Args) -> + gen_rpc:multicall(Nodes, Mod, Fun, Args, 100) + end + ). do_teardown(_) -> ok = net_kernel:stop(), @@ -121,22 +144,25 @@ ensure_distributed_nodename() -> %% Generator %%-------------------------------------------------------------------- - nodename() -> - ?LET({NodePrefix, HostName}, - {node_prefix(), hostname()}, - begin - Node = NodePrefix ++ "@" ++ HostName, - list_to_atom(Node) - end). + ?LET( + {NodePrefix, HostName}, + {node_prefix(), hostname()}, + begin + Node = NodePrefix ++ "@" ++ HostName, + list_to_atom(Node) + end + ). nodename_with_key() -> - ?LET({NodePrefix, HostName, Key}, - {node_prefix(), hostname(), choose(0, 10)}, - begin - Node = NodePrefix ++ "@" ++ HostName, - {list_to_atom(Node), Key} - end). + ?LET( + {NodePrefix, HostName, Key}, + {node_prefix(), hostname(), choose(0, 10)}, + begin + Node = NodePrefix ++ "@" ++ HostName, + {list_to_atom(Node), Key} + end + ). nodesname() -> oneof([list(nodename()), [node()]]). @@ -163,6 +189,7 @@ hostname() -> punch(Nodes) when is_list(Nodes) -> lists:map(fun punch/1, Nodes); punch('nonode@nohost') -> - node(); %% Equal to ?NODENAME + %% Equal to ?NODENAME + node(); punch(GoodBoy) -> GoodBoy. diff --git a/apps/emqx/test/props/prop_emqx_sys.erl b/apps/emqx/test/props/prop_emqx_sys.erl index c65383d81..d67bb18b3 100644 --- a/apps/emqx/test/props/prop_emqx_sys.erl +++ b/apps/emqx/test/props/prop_emqx_sys.erl @@ -18,41 +18,53 @@ -include_lib("proper/include/proper.hrl"). --export([ initial_state/0 - , command/1 - , precondition/2 - , postcondition/3 - , next_state/3 - ]). +-export([ + initial_state/0, + command/1, + precondition/2, + postcondition/3, + next_state/3 +]). --define(mock_modules, - [ emqx_metrics - , emqx_stats - , emqx_broker - , mria_mnesia - , emqx_hooks - ]). +-define(mock_modules, [ + emqx_metrics, + emqx_stats, + emqx_broker, + mria_mnesia, + emqx_hooks +]). -define(ALL(Vars, Types, Exprs), - ?SETUP(fun() -> + ?SETUP( + fun() -> State = do_setup(), fun() -> do_teardown(State) end - end, ?FORALL(Vars, Types, Exprs))). + end, + ?FORALL(Vars, Types, Exprs) + ) +). %%-------------------------------------------------------------------- %% Properties %%-------------------------------------------------------------------- prop_sys() -> - ?ALL(Cmds, commands(?MODULE), + ?ALL( + Cmds, + commands(?MODULE), begin {ok, _Pid} = emqx_sys:start_link(), {History, State, Result} = run_commands(?MODULE, Cmds), ok = emqx_sys:stop(), - ?WHENFAIL(io:format("History: ~p\nState: ~p\nResult: ~p\n", - [History,State,Result]), - aggregate(command_names(Cmds), Result =:= ok)) - end). + ?WHENFAIL( + io:format( + "History: ~p\nState: ~p\nResult: ~p\n", + [History, State, Result] + ), + aggregate(command_names(Cmds), Result =:= ok) + ) + end + ). %%-------------------------------------------------------------------- %% Helpers @@ -62,9 +74,15 @@ do_setup() -> ok = emqx_logger:set_log_level(emergency), emqx_config:put([sys_topics, sys_msg_interval], 60000), emqx_config:put([sys_topics, sys_heartbeat_interval], 30000), - emqx_config:put([sys_topics, sys_event_messages], - #{client_connected => true, client_disconnected => true, - client_subscribed => true, client_unsubscribed => true}), + emqx_config:put( + [sys_topics, sys_event_messages], + #{ + client_connected => true, + client_disconnected => true, + client_subscribed => true, + client_unsubscribed => true + } + ), [mock(Mod) || Mod <- ?mock_modules], ok. @@ -78,10 +96,16 @@ mock(Module) -> do_mock(Module). do_mock(emqx_broker) -> - meck:expect(emqx_broker, publish, - fun(Msg) -> {node(), <<"test">>, Msg} end), - meck:expect(emqx_broker, safe_publish, - fun(Msg) -> {node(), <<"test">>, Msg} end); + meck:expect( + emqx_broker, + publish, + fun(Msg) -> {node(), <<"test">>, Msg} end + ), + meck:expect( + emqx_broker, + safe_publish, + fun(Msg) -> {node(), <<"test">>, Msg} end + ); do_mock(emqx_stats) -> meck:expect(emqx_stats, getstats, fun() -> [0] end); do_mock(mria_mnesia) -> @@ -102,16 +126,17 @@ initial_state() -> %% @doc List of possible commands to run against the system command(_State) -> - oneof([{call, emqx_sys, info, []}, - {call, emqx_sys, version, []}, - {call, emqx_sys, uptime, []}, - {call, emqx_sys, datetime, []}, - {call, emqx_sys, sysdescr, []}, - %------------ unexpected message ----------------------% - {call, emqx_sys, handle_call, [emqx_sys, other, state]}, - {call, emqx_sys, handle_cast, [emqx_sys, other]}, - {call, emqx_sys, handle_info, [info, state]} - ]). + oneof([ + {call, emqx_sys, info, []}, + {call, emqx_sys, version, []}, + {call, emqx_sys, uptime, []}, + {call, emqx_sys, datetime, []}, + {call, emqx_sys, sysdescr, []}, + %------------ unexpected message ----------------------% + {call, emqx_sys, handle_call, [emqx_sys, other, state]}, + {call, emqx_sys, handle_cast, [emqx_sys, other]}, + {call, emqx_sys, handle_info, [info, state]} + ]). precondition(_State, {call, _Mod, _Fun, _Args}) -> true.