style(emqx): reformat emqx application

This commit is contained in:
JianBo He 2022-03-31 17:17:02 +08:00
parent 3f6d78dda0
commit 244b123742
31 changed files with 1809 additions and 1184 deletions

View File

@ -16,8 +16,12 @@
-module(emqx_bpapi).
%% API:
-export([start/0, announce/1, supported_version/1, supported_version/2,
versions_file/1]).
-export([
start/0,
announce/1,
supported_version/1, supported_version/2,
versions_file/1
]).
-export_type([api/0, api_version/0, var_name/0, call/0, rpc/0, bpapi_meta/0]).
@ -31,11 +35,12 @@
-type rpc() :: {_From :: call(), _To :: call()}.
-type bpapi_meta() ::
#{ api := api()
, version := api_version()
, calls := [rpc()]
, casts := [rpc()]
}.
#{
api := api(),
version := api_version(),
calls := [rpc()],
casts := [rpc()]
}.
-include("emqx_bpapi.hrl").
@ -49,11 +54,12 @@
-spec start() -> ok.
start() ->
ok = mria:create_table(?TAB, [ {type, set}
, {storage, ram_copies}
, {attributes, record_info(fields, ?TAB)}
, {rlog_shard, ?COMMON_SHARD}
]),
ok = mria:create_table(?TAB, [
{type, set},
{storage, ram_copies},
{attributes, record_info(fields, ?TAB)},
{rlog_shard, ?COMMON_SHARD}
]),
ok = mria:wait_for_tables([?TAB]),
announce(emqx).
@ -86,25 +92,34 @@ versions_file(App) ->
announce_fun(Data) ->
%% Delete old records, if present:
MS = ets:fun2ms(fun(#?TAB{key = {node(), API}}) ->
{node(), API}
end),
{node(), API}
end),
OldKeys = mnesia:select(?TAB, MS, write),
_ = [mnesia:delete({?TAB, Key})
|| Key <- OldKeys],
_ = [
mnesia:delete({?TAB, Key})
|| Key <- OldKeys
],
%% Insert new records:
_ = [mnesia:write(#?TAB{key = {node(), API}, version = Version})
|| {API, Version} <- Data],
_ = [
mnesia:write(#?TAB{key = {node(), API}, version = Version})
|| {API, Version} <- Data
],
%% Update maximum supported version:
[update_minimum(API) || {API, _} <- Data],
ok.
-spec update_minimum(api()) -> ok.
update_minimum(API) ->
MS = ets:fun2ms(fun(#?TAB{ key = {N, A}
, version = Value
}) when N =/= ?multicall,
A =:= API ->
Value
end),
MS = ets:fun2ms(fun(
#?TAB{
key = {N, A},
version = Value
}
) when
N =/= ?multicall,
A =:= API
->
Value
end),
MinVersion = lists:min(mnesia:select(?TAB, MS)),
mnesia:write(#?TAB{key = {?multicall, API}, version = MinVersion}).

View File

@ -20,9 +20,9 @@
-define(multicall, multicall).
-record(?TAB,
{ key :: {node() | ?multicall, emqx_bpapi:api()}
, version :: emqx_bpapi:api_version()
}).
-record(?TAB, {
key :: {node() | ?multicall, emqx_bpapi:api()},
version :: emqx_bpapi:api_version()
}).
-endif.

View File

@ -26,22 +26,24 @@
-type semantics() :: call | cast.
-record(s,
{ api :: emqx_bpapi:api()
, module :: module()
, version :: emqx_bpapi:api_version() | undefined
, targets = [] :: [{semantics(), emqx_bpapi:call(), emqx_bpapi:call()}]
, errors = [] :: list()
, file
}).
-record(s, {
api :: emqx_bpapi:api(),
module :: module(),
version :: emqx_bpapi:api_version() | undefined,
targets = [] :: [{semantics(), emqx_bpapi:call(), emqx_bpapi:call()}],
errors = [] :: list(),
file
}).
format_error(invalid_name) ->
"BPAPI module name should follow <API>_proto_v<number> pattern";
format_error({invalid_fun, Name, Arity}) ->
io_lib:format("malformed function ~p/~p. "
"BPAPI functions should have exactly one clause "
"and call (emqx_|e)rpc at the top level",
[Name, Arity]).
io_lib:format(
"malformed function ~p/~p. "
"BPAPI functions should have exactly one clause "
"and call (emqx_|e)rpc at the top level",
[Name, Arity]
).
parse_transform(Forms, _Options) ->
log("Original:~n~p", [Forms]),
@ -60,11 +62,11 @@ go({attribute, _, file, {File, _}}, S) ->
go({attribute, Line, module, Mod}, S) ->
case api_and_version(Mod) of
{ok, API, Vsn} -> S#s{api = API, version = Vsn, module = Mod};
error -> push_err(Line, invalid_name, S)
error -> push_err(Line, invalid_name, S)
end;
go({function, _Line, introduced_in, 0, _}, S) ->
go({function, _Line, introduced_in, 0, _}, S) ->
S;
go({function, _Line, deprecated_since, 0, _}, S) ->
go({function, _Line, deprecated_since, 0, _}, S) ->
S;
go({function, Line, Name, Arity, Clauses}, S) ->
analyze_fun(Line, Name, Arity, Clauses, S);
@ -79,26 +81,25 @@ finalize(Forms, S) ->
{Attrs, Funcs} = lists:splitwith(fun is_attribute/1, Forms),
AST = mk_meta_fun(S),
log("Meta fun:~n~p", [AST]),
Attrs ++ [mk_export()] ++ [AST|Funcs].
Attrs ++ [mk_export()] ++ [AST | Funcs].
mk_meta_fun(#s{api = API, version = Vsn, targets = Targets}) ->
Line = 0,
Calls = [{From, To} || {call, From, To} <- Targets],
Casts = [{From, To} || {cast, From, To} <- Targets],
Ret = typerefl_quote:const(Line, #{ api => API
, version => Vsn
, calls => Calls
, casts => Casts
}),
{function, Line, ?META_FUN, _Arity = 0,
[{clause, Line, _Args = [], _Guards = [],
[Ret]}]}.
Ret = typerefl_quote:const(Line, #{
api => API,
version => Vsn,
calls => Calls,
casts => Casts
}),
{function, Line, ?META_FUN, _Arity = 0, [{clause, Line, _Args = [], _Guards = [], [Ret]}]}.
mk_export() ->
{attribute, 0, export, [{?META_FUN, 0}]}.
is_attribute({attribute, _Line, _Attr, _Val}) -> true;
is_attribute(_) -> false.
is_attribute(_) -> false.
%% Extract the target function of the RPC call
analyze_fun(Line, Name, Arity, [{clause, Line, Head, _Guards, Exprs}], S) ->
@ -122,14 +123,17 @@ analyze_exprs(Line, Name, Arity, Head, Exprs, S) ->
-spec extract_outer_args([erl_parse:abstract_form()]) -> [atom()].
extract_outer_args(Abs) ->
lists:map(fun({var, _, Var}) ->
Var;
({match, _, {var, _, Var}, _}) ->
Var;
({match, _, _, {var, _, Var}}) ->
Var
end,
Abs).
lists:map(
fun
({var, _, Var}) ->
Var;
({match, _, {var, _, Var}, _}) ->
Var;
({match, _, _, {var, _, Var}}) ->
Var
end,
Abs
).
-spec extract_target_call(_AST, [_AST]) -> {semantics(), emqx_bpapi:call()}.
extract_target_call(RPCBackend, OuterArgs) ->
@ -166,13 +170,13 @@ extract_mfa(?BACKEND(emqx_cluster_rpc, multicall), [M, F, A, _RequiredNum, _Time
extract_mfa(_, _) ->
error("unrecognized RPC call").
call_or_cast(cast) -> cast;
call_or_cast(cast) -> cast;
call_or_cast(multicast) -> cast;
call_or_cast(multicall) -> call;
call_or_cast(call) -> call.
call_or_cast(call) -> call.
list_to_args({cons, _, {var, _, A}, T}) ->
[A|list_to_args(T)];
[A | list_to_args(T)];
list_to_args({nil, _}) ->
[].
@ -180,10 +184,10 @@ invalid_fun(Line, Name, Arity, S) ->
push_err(Line, {invalid_fun, Name, Arity}, S).
push_err(Line, Err, S = #s{errors = Errs}) ->
S#s{errors = [{Line, Err}|Errs]}.
S#s{errors = [{Line, Err} | Errs]}.
push_target(Target, S = #s{targets = Targets}) ->
S#s{targets = [Target|Targets]}.
S#s{targets = [Target | Targets]}.
-spec api_and_version(module()) -> {ok, emqx_bpapi:api(), emqx_bpapi:version()} | error.
api_and_version(Module) ->

View File

@ -44,10 +44,16 @@ post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) ->
update_log_handlers(NewHandlers) ->
OldHandlers = application:get_env(kernel, logger, []),
lists:foreach(fun({handler, HandlerId, _Mod, _Conf}) ->
logger:remove_handler(HandlerId)
end, OldHandlers -- NewHandlers),
lists:foreach(fun({handler, HandlerId, Mod, Conf}) ->
logger:add_handler(HandlerId, Mod, Conf)
end, NewHandlers -- OldHandlers),
lists:foreach(
fun({handler, HandlerId, _Mod, _Conf}) ->
logger:remove_handler(HandlerId)
end,
OldHandlers -- NewHandlers
),
lists:foreach(
fun({handler, HandlerId, Mod, Conf}) ->
logger:add_handler(HandlerId, Mod, Conf)
end,
NewHandlers -- OldHandlers
),
application:set_env(kernel, logger, NewHandlers).

View File

@ -21,17 +21,20 @@
%% API
-export([new_create_options/2, create/1, delete/1, consume/2]).
-type create_options() :: #{ module := ?MODULE
, type := emqx_limiter_schema:limiter_type()
, bucket := emqx_limiter_schema:bucket_name()
}.
-type create_options() :: #{
module := ?MODULE,
type := emqx_limiter_schema:limiter_type(),
bucket := emqx_limiter_schema:bucket_name()
}.
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec new_create_options(emqx_limiter_schema:limiter_type(),
emqx_limiter_schema:bucket_name()) -> create_options().
-spec new_create_options(
emqx_limiter_schema:limiter_type(),
emqx_limiter_schema:bucket_name()
) -> create_options().
new_create_options(Type, BucketName) ->
#{module => ?MODULE, type => Type, bucket => BucketName}.

View File

@ -21,52 +21,71 @@
%% @end
%% API
-export([ make_token_bucket_limiter/2, make_ref_limiter/2, check/2
, consume/2, set_retry/2, retry/1, make_infinity_limiter/0
, make_future/1, available/1
]).
-export([
make_token_bucket_limiter/2,
make_ref_limiter/2,
check/2,
consume/2,
set_retry/2,
retry/1,
make_infinity_limiter/0,
make_future/1,
available/1
]).
-export_type([token_bucket_limiter/0]).
%% a token bucket limiter with a limiter server's bucket reference
-type token_bucket_limiter() :: #{ %% the number of tokens currently available
tokens := non_neg_integer()
, rate := decimal()
, capacity := decimal()
, lasttime := millisecond()
%% @see emqx_limiter_schema
, max_retry_time := non_neg_integer()
%% @see emqx_limiter_schema
, failure_strategy := failure_strategy()
, divisible := boolean() %% @see emqx_limiter_schema
, low_water_mark := non_neg_integer() %% @see emqx_limiter_schema
, bucket := bucket() %% the limiter server's bucket
%% retry contenxt
%% undefined meaning no retry context or no need to retry
, retry_ctx => undefined
| retry_context(token_bucket_limiter()) %% the retry context
, atom => any() %% allow to add other keys
}.
%% the number of tokens currently available
-type token_bucket_limiter() :: #{
tokens := non_neg_integer(),
rate := decimal(),
capacity := decimal(),
lasttime := millisecond(),
%% @see emqx_limiter_schema
max_retry_time := non_neg_integer(),
%% @see emqx_limiter_schema
failure_strategy := failure_strategy(),
%% @see emqx_limiter_schema
divisible := boolean(),
%% @see emqx_limiter_schema
low_water_mark := non_neg_integer(),
%% the limiter server's bucket
bucket := bucket(),
%% retry contenxt
%% undefined meaning no retry context or no need to retry
retry_ctx =>
undefined
%% the retry context
| retry_context(token_bucket_limiter()),
%% allow to add other keys
atom => any()
}.
%% a limiter server's bucket reference
-type ref_limiter() :: #{ max_retry_time := non_neg_integer()
, failure_strategy := failure_strategy()
, divisible := boolean()
, low_water_mark := non_neg_integer()
, bucket := bucket()
-type ref_limiter() :: #{
max_retry_time := non_neg_integer(),
failure_strategy := failure_strategy(),
divisible := boolean(),
low_water_mark := non_neg_integer(),
bucket := bucket(),
, retry_ctx => undefined | retry_context(ref_limiter())
, atom => any() %% allow to add other keys
}.
retry_ctx => undefined | retry_context(ref_limiter()),
%% allow to add other keys
atom => any()
}.
-type retry_fun(Limiter) :: fun((pos_integer(), Limiter) -> inner_check_result(Limiter)).
-type acquire_type(Limiter) :: integer() | retry_context(Limiter).
-type retry_context(Limiter) :: #{ continuation := undefined | retry_fun(Limiter)
, diff := non_neg_integer() %% how many tokens are left to obtain
-type retry_context(Limiter) :: #{
continuation := undefined | retry_fun(Limiter),
%% how many tokens are left to obtain
diff := non_neg_integer(),
, need => pos_integer()
, start => millisecond()
}.
need => pos_integer(),
start => millisecond()
}.
-type bucket() :: emqx_limiter_bucket_ref:bucket_ref().
-type limiter() :: token_bucket_limiter() | ref_limiter() | infinity.
@ -77,27 +96,31 @@
-type check_result_pause(Limiter) :: {pause_type(), millisecond(), retry_context(Limiter), Limiter}.
-type result_drop(Limiter) :: {drop, Limiter}.
-type check_result(Limiter) :: check_result_ok(Limiter)
| check_result_pause(Limiter)
| result_drop(Limiter).
-type check_result(Limiter) ::
check_result_ok(Limiter)
| check_result_pause(Limiter)
| result_drop(Limiter).
-type inner_check_result(Limiter) :: check_result_ok(Limiter)
| check_result_pause(Limiter).
-type inner_check_result(Limiter) ::
check_result_ok(Limiter)
| check_result_pause(Limiter).
-type consume_result(Limiter) :: check_result_ok(Limiter)
| result_drop(Limiter).
-type consume_result(Limiter) ::
check_result_ok(Limiter)
| result_drop(Limiter).
-type decimal() :: emqx_limiter_decimal:decimal().
-type failure_strategy() :: emqx_limiter_schema:failure_strategy().
-type limiter_bucket_cfg() :: #{ rate := decimal()
, initial := non_neg_integer()
, low_water_mark := non_neg_integer()
, capacity := decimal()
, divisible := boolean()
, max_retry_time := non_neg_integer()
, failure_strategy := failure_strategy()
}.
-type limiter_bucket_cfg() :: #{
rate := decimal(),
initial := non_neg_integer(),
low_water_mark := non_neg_integer(),
capacity := decimal(),
divisible := boolean(),
max_retry_time := non_neg_integer(),
failure_strategy := failure_strategy()
}.
-type future() :: pos_integer().
@ -113,10 +136,11 @@
%%@doc create a limiter
-spec make_token_bucket_limiter(limiter_bucket_cfg(), bucket()) -> _.
make_token_bucket_limiter(Cfg, Bucket) ->
Cfg#{ tokens => emqx_limiter_server:get_initial_val(Cfg)
, lasttime => ?NOW
, bucket => Bucket
}.
Cfg#{
tokens => emqx_limiter_server:get_initial_val(Cfg),
lasttime => ?NOW,
bucket => Bucket
}.
%%@doc create a limiter server's reference
-spec make_ref_limiter(limiter_bucket_cfg(), bucket()) -> ref_limiter().
@ -130,38 +154,39 @@ make_infinity_limiter() ->
%% @doc request some tokens
%% it will automatically retry when failed until the maximum retry time is reached
%% @end
-spec consume(integer(), Limiter) -> consume_result(Limiter)
when Limiter :: limiter().
-spec consume(integer(), Limiter) -> consume_result(Limiter) when
Limiter :: limiter().
consume(Need, #{max_retry_time := RetryTime} = Limiter) when Need > 0 ->
try_consume(RetryTime, Need, Limiter);
consume(_, Limiter) ->
{ok, Limiter}.
%% @doc try to request the token and return the result without automatically retrying
-spec check(acquire_type(Limiter), Limiter) -> check_result(Limiter)
when Limiter :: limiter().
-spec check(acquire_type(Limiter), Limiter) -> check_result(Limiter) when
Limiter :: limiter().
check(_, infinity) ->
{ok, infinity};
check(Need, Limiter) when is_integer(Need), Need > 0 ->
case do_check(Need, Limiter) of
{ok, _} = Done ->
Done;
{PauseType, Pause, Ctx, Limiter2} ->
{PauseType,
Pause,
Ctx#{start => ?NOW, need => Need}, Limiter2}
{PauseType, Pause, Ctx#{start => ?NOW, need => Need}, Limiter2}
end;
%% check with retry context.
%% when continuation = undefined, the diff will be 0
%% so there is no need to check continuation here
check(#{continuation := Cont,
check(
#{
continuation := Cont,
diff := Diff,
start := Start} = Retry,
#{failure_strategy := Failure,
max_retry_time := RetryTime} = Limiter) when Diff > 0 ->
start := Start
} = Retry,
#{
failure_strategy := Failure,
max_retry_time := RetryTime
} = Limiter
) when Diff > 0 ->
case Cont(Diff, Limiter) of
{ok, _} = Done ->
Done;
@ -175,13 +200,12 @@ check(#{continuation := Cont,
on_failure(Failure, try_restore(Retry2, Limiter2))
end
end;
check(_, Limiter) ->
{ok, Limiter}.
%% @doc pack the retry context into the limiter data
-spec set_retry(retry_context(Limiter), Limiter) -> Limiter
when Limiter :: limiter().
-spec set_retry(retry_context(Limiter), Limiter) -> Limiter when
Limiter :: limiter().
set_retry(Retry, Limiter) ->
Limiter#{retry_ctx => Retry}.
@ -189,7 +213,6 @@ set_retry(Retry, Limiter) ->
-spec retry(Limiter) -> check_result(Limiter) when Limiter :: limiter().
retry(#{retry_ctx := Retry} = Limiter) when is_map(Retry) ->
check(Retry, Limiter#{retry_ctx := undefined});
retry(Limiter) ->
{ok, Limiter}.
@ -202,30 +225,32 @@ make_future(Need) ->
%% @doc get the number of tokens currently available
-spec available(limiter()) -> decimal().
available(#{tokens := Tokens,
rate := Rate,
lasttime := LastTime,
capacity := Capacity,
bucket := Bucket}) ->
available(#{
tokens := Tokens,
rate := Rate,
lasttime := LastTime,
capacity := Capacity,
bucket := Bucket
}) ->
Tokens2 = apply_elapsed_time(Rate, ?NOW - LastTime, Tokens, Capacity),
erlang:min(Tokens2, emqx_limiter_bucket_ref:available(Bucket));
available(#{bucket := Bucket}) ->
emqx_limiter_bucket_ref:available(Bucket);
available(infinity) ->
infinity.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
-spec try_consume(millisecond(),
acquire_type(Limiter),
Limiter) -> consume_result(Limiter) when Limiter :: limiter().
try_consume(LeftTime, Retry, #{failure_strategy := Failure} = Limiter)
when LeftTime =< 0, is_map(Retry) ->
-spec try_consume(
millisecond(),
acquire_type(Limiter),
Limiter
) -> consume_result(Limiter) when Limiter :: limiter().
try_consume(LeftTime, Retry, #{failure_strategy := Failure} = Limiter) when
LeftTime =< 0, is_map(Retry)
->
on_failure(Failure, try_restore(Retry, Limiter));
try_consume(LeftTime, Need, Limiter) when is_integer(Need) ->
case do_check(Need, Limiter) of
{ok, _} = Done ->
@ -234,10 +259,14 @@ try_consume(LeftTime, Need, Limiter) when is_integer(Need) ->
timer:sleep(erlang:min(LeftTime, Pause)),
try_consume(LeftTime - Pause, Ctx#{need => Need}, Limiter2)
end;
try_consume(LeftTime,
#{continuation := Cont,
diff := Diff} = Retry, Limiter) when Diff > 0 ->
try_consume(
LeftTime,
#{
continuation := Cont,
diff := Diff
} = Retry,
Limiter
) when Diff > 0 ->
case Cont(Diff, Limiter) of
{ok, _} = Done ->
Done;
@ -245,64 +274,78 @@ try_consume(LeftTime,
timer:sleep(erlang:min(LeftTime, Pause)),
try_consume(LeftTime - Pause, maps:merge(Retry, Ctx), Limiter2)
end;
try_consume(_, _, Limiter) ->
{ok, Limiter}.
-spec do_check(acquire_type(Limiter), Limiter) -> inner_check_result(Limiter)
when Limiter :: limiter().
-spec do_check(acquire_type(Limiter), Limiter) -> inner_check_result(Limiter) when
Limiter :: limiter().
do_check(Need, #{tokens := Tokens} = Limiter) when Need =< Tokens ->
do_check_with_parent_limiter(Need, Limiter);
do_check(Need, #{tokens := _} = Limiter) ->
do_reset(Need, Limiter);
do_check(Need, #{divisible := Divisible,
bucket := Bucket} = Ref) ->
do_check(
Need,
#{
divisible := Divisible,
bucket := Bucket
} = Ref
) ->
case emqx_limiter_bucket_ref:check(Need, Bucket, Divisible) of
{ok, Tokens} ->
may_return_or_pause(Tokens, Ref);
{PauseType, Rate, Obtained} ->
return_pause(Rate,
PauseType,
fun ?FUNCTION_NAME/2, Need - Obtained, Ref)
return_pause(
Rate,
PauseType,
fun ?FUNCTION_NAME/2,
Need - Obtained,
Ref
)
end.
on_failure(force, Limiter) ->
{ok, Limiter};
on_failure(drop, Limiter) ->
{drop, Limiter};
on_failure(throw, Limiter) ->
Message = io_lib:format("limiter consume failed, limiter:~p~n", [Limiter]),
erlang:throw({rate_check_fail, Message}).
-spec do_check_with_parent_limiter(pos_integer(), token_bucket_limiter()) ->
inner_check_result(token_bucket_limiter()).
do_check_with_parent_limiter(Need,
#{tokens := Tokens,
divisible := Divisible,
bucket := Bucket} = Limiter) ->
inner_check_result(token_bucket_limiter()).
do_check_with_parent_limiter(
Need,
#{
tokens := Tokens,
divisible := Divisible,
bucket := Bucket
} = Limiter
) ->
case emqx_limiter_bucket_ref:check(Need, Bucket, Divisible) of
{ok, RefLeft} ->
Left = sub(Tokens, Need),
may_return_or_pause(erlang:min(RefLeft, Left), Limiter#{tokens := Left});
{PauseType, Rate, Obtained} ->
return_pause(Rate,
PauseType,
fun ?FUNCTION_NAME/2,
Need - Obtained,
Limiter#{tokens := sub(Tokens, Obtained)})
return_pause(
Rate,
PauseType,
fun ?FUNCTION_NAME/2,
Need - Obtained,
Limiter#{tokens := sub(Tokens, Obtained)}
)
end.
-spec do_reset(pos_integer(), token_bucket_limiter()) -> inner_check_result(token_bucket_limiter()).
do_reset(Need,
#{tokens := Tokens,
rate := Rate,
lasttime := LastTime,
divisible := Divisible,
capacity := Capacity} = Limiter) ->
do_reset(
Need,
#{
tokens := Tokens,
rate := Rate,
lasttime := LastTime,
divisible := Divisible,
capacity := Capacity
} = Limiter
) ->
Now = ?NOW,
Tokens2 = apply_elapsed_time(Rate, Now - LastTime, Tokens, Capacity),
@ -312,49 +355,54 @@ do_reset(Need,
do_check_with_parent_limiter(Need, Limiter2);
Available when Divisible andalso Available > 0 ->
%% must be allocated here, because may be Need > Capacity
return_pause(Rate,
partial,
fun do_reset/2,
Need - Available,
Limiter#{tokens := 0, lasttime := Now});
return_pause(
Rate,
partial,
fun do_reset/2,
Need - Available,
Limiter#{tokens := 0, lasttime := Now}
);
_ ->
return_pause(Rate, pause, fun do_reset/2, Need, Limiter)
end.
-spec return_pause(decimal(), pause_type(), retry_fun(Limiter), pos_integer(), Limiter)
-> check_result_pause(Limiter) when Limiter :: limiter().
-spec return_pause(decimal(), pause_type(), retry_fun(Limiter), pos_integer(), Limiter) ->
check_result_pause(Limiter)
when
Limiter :: limiter().
return_pause(infinity, PauseType, Fun, Diff, Limiter) ->
%% workaround when emqx_limiter_server's rate is infinity
{PauseType, ?MINIMUM_PAUSE, make_retry_context(Fun, Diff), Limiter};
return_pause(Rate, PauseType, Fun, Diff, Limiter) ->
Val = erlang:round(Diff * emqx_limiter_schema:minimum_period() / Rate),
Pause = emqx_misc:clamp(Val, ?MINIMUM_PAUSE, ?MAXIMUM_PAUSE),
{PauseType, Pause, make_retry_context(Fun, Diff), Limiter}.
-spec make_retry_context(undefined | retry_fun(Limiter), non_neg_integer()) ->
retry_context(Limiter) when Limiter :: limiter().
retry_context(Limiter)
when
Limiter :: limiter().
make_retry_context(Fun, Diff) ->
#{continuation => Fun, diff => Diff}.
-spec try_restore(retry_context(Limiter), Limiter) -> Limiter
when Limiter :: limiter().
try_restore(#{need := Need, diff := Diff},
#{tokens := Tokens, capacity := Capacity, bucket := Bucket} = Limiter) ->
-spec try_restore(retry_context(Limiter), Limiter) -> Limiter when
Limiter :: limiter().
try_restore(
#{need := Need, diff := Diff},
#{tokens := Tokens, capacity := Capacity, bucket := Bucket} = Limiter
) ->
Back = Need - Diff,
Tokens2 = erlang:min(Capacity, Back + Tokens),
emqx_limiter_bucket_ref:try_restore(Back, Bucket),
Limiter#{tokens := Tokens2};
try_restore(#{need := Need, diff := Diff}, #{bucket := Bucket} = Limiter) ->
emqx_limiter_bucket_ref:try_restore(Need - Diff, Bucket),
Limiter.
-spec may_return_or_pause(non_neg_integer(), Limiter) -> check_result(Limiter)
when Limiter :: limiter().
-spec may_return_or_pause(non_neg_integer(), Limiter) -> check_result(Limiter) when
Limiter :: limiter().
may_return_or_pause(Left, #{low_water_mark := Mark} = Limiter) when Left >= Mark ->
{ok, Limiter};
may_return_or_pause(_, Limiter) ->
{pause, ?MINIMUM_PAUSE, make_retry_context(undefined, 0), Limiter}.

View File

@ -1,13 +1,14 @@
%% -*- mode: erlang -*-
{application, emqx_limiter,
[{description, "EMQX Hierarchical Limiter"},
{vsn, "1.0.0"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_limiter_sup]},
{applications, [kernel,stdlib,emqx]},
{mod, {emqx_limiter_app,[]}},
{env, []},
{licenses, ["Apache-2.0"]},
{maintainers, ["EMQX Team <contact@emqx.io>"]},
{links, []}
]}.
{application, emqx_limiter, [
{description, "EMQX Hierarchical Limiter"},
% strict semver, bump manually!
{vsn, "1.0.0"},
{modules, []},
{registered, [emqx_limiter_sup]},
{applications, [kernel, stdlib, emqx]},
{mod, {emqx_limiter_app, []}},
{env, []},
{licenses, ["Apache-2.0"]},
{maintainers, ["EMQX Team <contact@emqx.io>"]},
{links, []}
]}.

View File

@ -31,13 +31,16 @@
%% top supervisor of the tree.
%% @end
%%--------------------------------------------------------------------
-spec start(StartType :: normal |
{takeover, Node :: node()} |
{failover, Node :: node()},
StartArgs :: term()) ->
{ok, Pid :: pid()} |
{ok, Pid :: pid(), State :: term()} |
{error, Reason :: term()}.
-spec start(
StartType ::
normal
| {takeover, Node :: node()}
| {failover, Node :: node()},
StartArgs :: term()
) ->
{ok, Pid :: pid()}
| {ok, Pid :: pid(), State :: term()}
| {error, Reason :: term()}.
start(_StartType, _StartArgs) ->
{ok, _} = emqx_limiter_sup:start_link().

View File

@ -21,18 +21,25 @@
%% @end
%% API
-export([ new/3, check/3, try_restore/2
, available/1]).
-export([
new/3,
check/3,
try_restore/2,
available/1
]).
-export_type([bucket_ref/0]).
-type infinity_bucket_ref() :: infinity.
-type finite_bucket_ref() :: #{ counter := counters:counters_ref()
, index := index()
, rate := rate()}.
-type finite_bucket_ref() :: #{
counter := counters:counters_ref(),
index := index(),
rate := rate()
}.
-type bucket_ref() :: infinity_bucket_ref()
| finite_bucket_ref().
-type bucket_ref() ::
infinity_bucket_ref()
| finite_bucket_ref().
-type index() :: emqx_limiter_server:index().
-type rate() :: emqx_limiter_decimal:decimal().
@ -41,37 +48,45 @@
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec new(undefined | counters:countres_ref(),
undefined | index(),
rate()) -> bucket_ref().
-spec new(
undefined | counters:countres_ref(),
undefined | index(),
rate()
) -> bucket_ref().
new(undefined, _, _) ->
infinity;
new(Counter, Index, Rate) ->
#{counter => Counter,
index => Index,
rate => Rate}.
#{
counter => Counter,
index => Index,
rate => Rate
}.
%% @doc check tokens
-spec check(pos_integer(), bucket_ref(), Disivisble :: boolean()) ->
HasToken :: {ok, emqx_limiter_decimal:decimal()}
| {check_failure_type(), rate(), pos_integer()}.
HasToken ::
{ok, emqx_limiter_decimal:decimal()}
| {check_failure_type(), rate(), pos_integer()}.
check(_, infinity, _) ->
{ok, infinity};
check(Need,
#{counter := Counter,
check(
Need,
#{
counter := Counter,
index := Index,
rate := Rate},
Divisible)->
rate := Rate
},
Divisible
) ->
RefToken = counters:get(Counter, Index),
if RefToken >= Need ->
if
RefToken >= Need ->
counters:sub(Counter, Index, Need),
{ok, RefToken - Need};
Divisible andalso RefToken > 0 ->
Divisible andalso RefToken > 0 ->
counters:sub(Counter, Index, RefToken),
{partial, Rate, RefToken};
true ->
true ->
{pause, Rate, 0}
end.
@ -93,7 +108,6 @@ try_restore(Inc, #{counter := Counter, index := Index}) ->
-spec available(bucket_ref()) -> emqx_limiter_decimal:decimal().
available(#{counter := Counter, index := Index}) ->
counters:get(Counter, Index);
available(infinity) ->
infinity.

View File

@ -21,21 +21,31 @@
%% @end
%% API
-export([ new/0, new/1, new/2, get_limiter_by_names/2
, add_new/3, update_by_name/3, set_retry_context/2
, check/3, retry/2, get_retry_context/1
, check_list/2, retry_list/2
]).
-export([
new/0, new/1, new/2,
get_limiter_by_names/2,
add_new/3,
update_by_name/3,
set_retry_context/2,
check/3,
retry/2,
get_retry_context/1,
check_list/2,
retry_list/2
]).
-export_type([container/0, check_result/0]).
-type container() :: #{ limiter_type() => undefined | limiter()
%% the retry context of the limiter
, retry_key() => undefined
| retry_context()
| future()
, retry_ctx := undefined | any() %% the retry context of the container
}.
-type container() :: #{
limiter_type() => undefined | limiter(),
%% the retry context of the limiter
retry_key() =>
undefined
| retry_context()
| future(),
%% the retry context of the container
retry_ctx := undefined | any()
}.
-type future() :: pos_integer().
-type limiter_type() :: emqx_limiter_schema:limiter_type().
@ -43,9 +53,10 @@
-type retry_context() :: emqx_htb_limiter:retry_context().
-type bucket_name() :: emqx_limiter_schema:bucket_name().
-type millisecond() :: non_neg_integer().
-type check_result() :: {ok, container()}
| {drop, container()}
| {pause, millisecond(), container()}.
-type check_result() ::
{ok, container()}
| {drop, container()}
| {pause, millisecond(), container()}.
-define(RETRY_KEY(Type), {retry, Type}).
-type retry_key() :: ?RETRY_KEY(limiter_type()).
@ -62,36 +73,43 @@ new() ->
new(Types) ->
new(Types, #{}).
-spec new(list(limiter_type()),
#{limiter_type() => emqx_limiter_schema:bucket_name()}) -> container().
-spec new(
list(limiter_type()),
#{limiter_type() => emqx_limiter_schema:bucket_name()}
) -> container().
new(Types, Names) ->
get_limiter_by_names(Types, Names).
%% @doc generate a container
%% according to the type of limiter and the bucket name configuration of the limiter
%% @end
-spec get_limiter_by_names(list(limiter_type()),
#{limiter_type() => emqx_limiter_schema:bucket_name()}) -> container().
-spec get_limiter_by_names(
list(limiter_type()),
#{limiter_type() => emqx_limiter_schema:bucket_name()}
) -> container().
get_limiter_by_names(Types, BucketNames) ->
Init = fun(Type, Acc) ->
Limiter = emqx_limiter_server:connect(Type, BucketNames),
add_new(Type, Limiter, Acc)
end,
Limiter = emqx_limiter_server:connect(Type, BucketNames),
add_new(Type, Limiter, Acc)
end,
lists:foldl(Init, #{retry_ctx => undefined}, Types).
%% @doc add the specified type of limiter to the container
-spec update_by_name(limiter_type(),
bucket_name() | #{limiter_type() => bucket_name()},
container()) -> container().
-spec update_by_name(
limiter_type(),
bucket_name() | #{limiter_type() => bucket_name()},
container()
) -> container().
update_by_name(Type, Buckets, Container) ->
Limiter = emqx_limiter_server:connect(Type, Buckets),
add_new(Type, Limiter, Container).
-spec add_new(limiter_type(), limiter(), container()) -> container().
add_new(Type, Limiter, Container) ->
Container#{ Type => Limiter
, ?RETRY_KEY(Type) => undefined
}.
Container#{
Type => Limiter,
?RETRY_KEY(Type) => undefined
}.
%% @doc check the specified limiter
-spec check(pos_integer(), limiter_type(), container()) -> check_result().
@ -107,18 +125,21 @@ check_list([{Need, Type} | T], Container) ->
check_list(T, Container#{Type := Limiter2});
{_, PauseMs, Ctx, Limiter2} ->
Fun = fun({FN, FT}, Acc) ->
Future = emqx_htb_limiter:make_future(FN),
Acc#{?RETRY_KEY(FT) := Future}
end,
C2 = lists:foldl(Fun,
Container#{Type := Limiter2,
?RETRY_KEY(Type) := Ctx},
T),
Future = emqx_htb_limiter:make_future(FN),
Acc#{?RETRY_KEY(FT) := Future}
end,
C2 = lists:foldl(
Fun,
Container#{
Type := Limiter2,
?RETRY_KEY(Type) := Ctx
},
T
),
{pause, PauseMs, C2};
{drop, Limiter2} ->
{drop, Container#{Type := Limiter2}}
end;
check_list([], Container) ->
{ok, Container}.
@ -132,24 +153,23 @@ retry(Type, Container) ->
retry_list([Type | T], Container) ->
Key = ?RETRY_KEY(Type),
case Container of
#{Type := Limiter,
Key := Retry} when Retry =/= undefined ->
#{
Type := Limiter,
Key := Retry
} when Retry =/= undefined ->
case emqx_htb_limiter:check(Retry, Limiter) of
{ok, Limiter2} ->
%% undefined meaning there is no retry context or there is no need to retry
%% when a limiter has a undefined retry context, the check will always success
retry_list(T, Container#{Type := Limiter2, Key := undefined});
{_, PauseMs, Ctx, Limiter2} ->
{pause,
PauseMs,
Container#{Type := Limiter2, Key := Ctx}};
{pause, PauseMs, Container#{Type := Limiter2, Key := Ctx}};
{drop, Limiter2} ->
{drop, Container#{Type := Limiter2}}
end;
_ ->
retry_list(T, Container)
end;
retry_list([], Container) ->
{ok, Container}.

View File

@ -17,11 +17,12 @@
-module(emqx_limiter_correction).
%% API
-export([ add/2 ]).
-export([add/2]).
-type correction_value() :: #{ correction := emqx_limiter_decimal:zero_or_float()
, any() => any()
}.
-type correction_value() :: #{
correction := emqx_limiter_decimal:zero_or_float(),
any() => any()
}.
-export_type([correction_value/0]).

View File

@ -19,8 +19,13 @@
-module(emqx_limiter_decimal).
%% API
-export([ add/2, sub/2, mul/2
, put_to_counter/3, floor_div/2]).
-export([
add/2,
sub/2,
mul/2,
put_to_counter/3,
floor_div/2
]).
-export_type([decimal/0, zero_or_float/0]).
-type decimal() :: infinity | number().
@ -30,33 +35,35 @@
%%% API
%%--------------------------------------------------------------------
-spec add(decimal(), decimal()) -> decimal().
add(A, B) when A =:= infinity
orelse B =:= infinity ->
add(A, B) when
A =:= infinity orelse
B =:= infinity
->
infinity;
add(A, B) ->
A + B.
-spec sub(decimal(), decimal()) -> decimal().
sub(A, B) when A =:= infinity
orelse B =:= infinity ->
sub(A, B) when
A =:= infinity orelse
B =:= infinity
->
infinity;
sub(A, B) ->
A - B.
-spec mul(decimal(), decimal()) -> decimal().
mul(A, B) when A =:= infinity
orelse B =:= infinity ->
mul(A, B) when
A =:= infinity orelse
B =:= infinity
->
infinity;
mul(A, B) ->
A * B.
-spec floor_div(decimal(), number()) -> decimal().
floor_div(infinity, _) ->
infinity;
floor_div(A, B) ->
erlang:floor(A / B).

View File

@ -22,14 +22,26 @@
-include_lib("stdlib/include/ms_transform.hrl").
%% API
-export([ start_link/0, start_server/1, find_bucket/1
, find_bucket/2, insert_bucket/2, insert_bucket/3
, make_path/2, restart_server/1
]).
-export([
start_link/0,
start_server/1,
find_bucket/1,
find_bucket/2,
insert_bucket/2, insert_bucket/3,
make_path/2,
restart_server/1
]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, format_status/2]).
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3,
format_status/2
]).
-export_type([path/0]).
@ -38,9 +50,10 @@
-type bucket_name() :: emqx_limiter_schema:bucket_name().
%% counter record in ets table
-record(bucket, { path :: path()
, bucket :: bucket_ref()
}).
-record(bucket, {
path :: path(),
bucket :: bucket_ref()
}).
-type bucket_ref() :: emqx_limiter_bucket_ref:bucket_ref().
@ -58,7 +71,7 @@ restart_server(Type) ->
emqx_limiter_server_sup:restart(Type).
-spec find_bucket(limiter_type(), bucket_name()) ->
{ok, bucket_ref()} | undefined.
{ok, bucket_ref()} | undefined.
find_bucket(Type, BucketName) ->
find_bucket(make_path(Type, BucketName)).
@ -71,13 +84,14 @@ find_bucket(Path) ->
undefined
end.
-spec insert_bucket(limiter_type(),
bucket_name(),
bucket_ref()) -> boolean().
-spec insert_bucket(
limiter_type(),
bucket_name(),
bucket_ref()
) -> boolean().
insert_bucket(Type, BucketName, Bucket) ->
inner_insert_bucket(make_path(Type, BucketName), Bucket).
-spec insert_bucket(path(), bucket_ref()) -> true.
insert_bucket(Path, Bucket) ->
inner_insert_bucket(Path, Bucket).
@ -91,10 +105,11 @@ make_path(Type, BucketName) ->
%% Starts the server
%% @end
%%--------------------------------------------------------------------
-spec start_link() -> {ok, Pid :: pid()} |
{error, Error :: {already_started, pid()}} |
{error, Error :: term()} |
ignore.
-spec start_link() ->
{ok, Pid :: pid()}
| {error, Error :: {already_started, pid()}}
| {error, Error :: term()}
| ignore.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@ -108,16 +123,22 @@ start_link() ->
%% Initializes the server
%% @end
%%--------------------------------------------------------------------
-spec init(Args :: term()) -> {ok, State :: term()} |
{ok, State :: term(), Timeout :: timeout()} |
{ok, State :: term(), hibernate} |
{stop, Reason :: term()} |
ignore.
-spec init(Args :: term()) ->
{ok, State :: term()}
| {ok, State :: term(), Timeout :: timeout()}
| {ok, State :: term(), hibernate}
| {stop, Reason :: term()}
| ignore.
init([]) ->
_ = ets:new(?TAB, [ set, public, named_table, {keypos, #bucket.path}
, {write_concurrency, true}, {read_concurrency, true}
, {heir, erlang:whereis(emqx_limiter_sup), none}
]),
_ = ets:new(?TAB, [
set,
public,
named_table,
{keypos, #bucket.path},
{write_concurrency, true},
{read_concurrency, true},
{heir, erlang:whereis(emqx_limiter_sup), none}
]),
{ok, #{}}.
%%--------------------------------------------------------------------
@ -127,14 +148,14 @@ init([]) ->
%% @end
%%--------------------------------------------------------------------
-spec handle_call(Request :: term(), From :: {pid(), term()}, State :: term()) ->
{reply, Reply :: term(), NewState :: term()} |
{reply, Reply :: term(), NewState :: term(), Timeout :: timeout()} |
{reply, Reply :: term(), NewState :: term(), hibernate} |
{noreply, NewState :: term()} |
{noreply, NewState :: term(), Timeout :: timeout()} |
{noreply, NewState :: term(), hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: term()} |
{stop, Reason :: term(), NewState :: term()}.
{reply, Reply :: term(), NewState :: term()}
| {reply, Reply :: term(), NewState :: term(), Timeout :: timeout()}
| {reply, Reply :: term(), NewState :: term(), hibernate}
| {noreply, NewState :: term()}
| {noreply, NewState :: term(), Timeout :: timeout()}
| {noreply, NewState :: term(), hibernate}
| {stop, Reason :: term(), Reply :: term(), NewState :: term()}
| {stop, Reason :: term(), NewState :: term()}.
handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignore, State}.
@ -146,10 +167,10 @@ handle_call(Req, _From, State) ->
%% @end
%%--------------------------------------------------------------------
-spec handle_cast(Request :: term(), State :: term()) ->
{noreply, NewState :: term()} |
{noreply, NewState :: term(), Timeout :: timeout()} |
{noreply, NewState :: term(), hibernate} |
{stop, Reason :: term(), NewState :: term()}.
{noreply, NewState :: term()}
| {noreply, NewState :: term(), Timeout :: timeout()}
| {noreply, NewState :: term(), hibernate}
| {stop, Reason :: term(), NewState :: term()}.
handle_cast(Req, State) ->
?SLOG(error, #{msg => "unexpected_cast", cast => Req}),
{noreply, State}.
@ -161,10 +182,10 @@ handle_cast(Req, State) ->
%% @end
%%--------------------------------------------------------------------
-spec handle_info(Info :: timeout() | term(), State :: term()) ->
{noreply, NewState :: term()} |
{noreply, NewState :: term(), Timeout :: timeout()} |
{noreply, NewState :: term(), hibernate} |
{stop, Reason :: normal | term(), NewState :: term()}.
{noreply, NewState :: term()}
| {noreply, NewState :: term(), Timeout :: timeout()}
| {noreply, NewState :: term(), hibernate}
| {stop, Reason :: normal | term(), NewState :: term()}.
handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}),
{noreply, State}.
@ -178,8 +199,10 @@ handle_info(Info, State) ->
%% with Reason. The return value is ignored.
%% @end
%%--------------------------------------------------------------------
-spec terminate(Reason :: normal | shutdown | {shutdown, term()} | term(),
State :: term()) -> any().
-spec terminate(
Reason :: normal | shutdown | {shutdown, term()} | term(),
State :: term()
) -> any().
terminate(_Reason, _State) ->
ok.
@ -189,10 +212,13 @@ terminate(_Reason, _State) ->
%% Convert process state when code is changed
%% @end
%%--------------------------------------------------------------------
-spec code_change(OldVsn :: term() | {down, term()},
State :: term(),
Extra :: term()) -> {ok, NewState :: term()} |
{error, Reason :: term()}.
-spec code_change(
OldVsn :: term() | {down, term()},
State :: term(),
Extra :: term()
) ->
{ok, NewState :: term()}
| {error, Reason :: term()}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -204,8 +230,10 @@ code_change(_OldVsn, State, _Extra) ->
%% or when it appears in termination error logs.
%% @end
%%--------------------------------------------------------------------
-spec format_status(Opt :: normal | terminate,
Status :: list()) -> Status :: term().
-spec format_status(
Opt :: normal | terminate,
Status :: list()
) -> Status :: term().
format_status(_Opt, Status) ->
Status.
@ -214,5 +242,7 @@ format_status(_Opt, Status) ->
%%--------------------------------------------------------------------
-spec inner_insert_bucket(path(), bucket_ref()) -> true.
inner_insert_bucket(Path, Bucket) ->
ets:insert(?TAB,
#bucket{path = Path, bucket = Bucket}).
ets:insert(
?TAB,
#bucket{path = Path, bucket = Bucket}
).

View File

@ -18,43 +18,60 @@
-include_lib("typerefl/include/types.hrl").
-export([ roots/0, fields/1, to_rate/1, to_capacity/1
, minimum_period/0, to_burst_rate/1, to_initial/1
, namespace/0, get_bucket_cfg_path/2, desc/1
]).
-export([
roots/0,
fields/1,
to_rate/1,
to_capacity/1,
minimum_period/0,
to_burst_rate/1,
to_initial/1,
namespace/0,
get_bucket_cfg_path/2,
desc/1
]).
-define(KILOBYTE, 1024).
-type limiter_type() :: bytes_in
| message_in
| connection
| message_routing
| batch.
-type limiter_type() ::
bytes_in
| message_in
| connection
| message_routing
| batch.
-type bucket_name() :: atom().
-type rate() :: infinity | float().
-type burst_rate() :: 0 | float().
-type capacity() :: infinity | number(). %% the capacity of the token bucket
-type initial() :: non_neg_integer(). %% initial capacity of the token bucket
%% the capacity of the token bucket
-type capacity() :: infinity | number().
%% initial capacity of the token bucket
-type initial() :: non_neg_integer().
-type bucket_path() :: list(atom()).
%% the processing strategy after the failure of the token request
-type failure_strategy() :: force %% Forced to pass
| drop %% discard the current request
| throw. %% throw an exception
%% Forced to pass
-type failure_strategy() ::
force
%% discard the current request
| drop
%% throw an exception
| throw.
-typerefl_from_string({rate/0, ?MODULE, to_rate}).
-typerefl_from_string({burst_rate/0, ?MODULE, to_burst_rate}).
-typerefl_from_string({capacity/0, ?MODULE, to_capacity}).
-typerefl_from_string({initial/0, ?MODULE, to_initial}).
-reflect_type([ rate/0
, burst_rate/0
, capacity/0
, initial/0
, failure_strategy/0
, bucket_name/0
]).
-reflect_type([
rate/0,
burst_rate/0,
capacity/0,
initial/0,
failure_strategy/0,
bucket_name/0
]).
-export_type([limiter_type/0, bucket_path/0]).
@ -66,87 +83,154 @@ namespace() -> limiter.
roots() -> [limiter].
fields(limiter) ->
[ {bytes_in, sc(ref(limiter_opts),
#{description =>
<<"The bytes_in limiter.<br>"
[
{bytes_in,
sc(
ref(limiter_opts),
#{
description =>
<<
"The bytes_in limiter.<br>"
"It is used to limit the inbound bytes rate for this EMQX node."
"If the this limiter limit is reached,"
"the restricted client will be slow down even be hung for a while.">>
})}
, {message_in, sc(ref(limiter_opts),
#{description =>
<<"The message_in limiter.<br>"
"This is used to limit the inbound message numbers for this EMQX node"
"If the this limiter limit is reached,"
"the restricted client will be slow down even be hung for a while.">>
})}
, {connection, sc(ref(limiter_opts),
#{description =>
<<"The connection limiter.<br>"
"This is used to limit the connection rate for this EMQX node"
"If the this limiter limit is reached,"
"New connections will be refused"
>>})}
, {message_routing, sc(ref(limiter_opts),
#{description =>
<<"The message_routing limiter.<br>"
"This is used to limite the deliver rate for this EMQX node"
"If the this limiter limit is reached,"
"New publish will be refused"
>>
})}
, {batch, sc(ref(limiter_opts),
#{description => <<"The batch limiter.<br>"
"This is used for EMQX internal batch operation"
"e.g. limite the retainer's deliver rate"
>>
})}
"the restricted client will be slow down even be hung for a while."
>>
}
)},
{message_in,
sc(
ref(limiter_opts),
#{
description =>
<<
"The message_in limiter.<br>"
"This is used to limit the inbound message numbers for this EMQX node"
"If the this limiter limit is reached,"
"the restricted client will be slow down even be hung for a while."
>>
}
)},
{connection,
sc(
ref(limiter_opts),
#{
description =>
<<
"The connection limiter.<br>"
"This is used to limit the connection rate for this EMQX node"
"If the this limiter limit is reached,"
"New connections will be refused"
>>
}
)},
{message_routing,
sc(
ref(limiter_opts),
#{
description =>
<<
"The message_routing limiter.<br>"
"This is used to limite the deliver rate for this EMQX node"
"If the this limiter limit is reached,"
"New publish will be refused"
>>
}
)},
{batch,
sc(
ref(limiter_opts),
#{
description => <<
"The batch limiter.<br>"
"This is used for EMQX internal batch operation"
"e.g. limite the retainer's deliver rate"
>>
}
)}
];
fields(limiter_opts) ->
[ {rate, sc(rate(), #{default => "infinity", desc => "The rate"})}
, {burst, sc(burst_rate(),
#{default => "0/0s",
desc => "The burst, This value is based on rate.<br/>
This value + rate = the maximum limit that can be achieved when limiter burst."
})}
, {bucket, sc(map("bucket name", ref(bucket_opts)), #{desc => "Buckets config"})}
[
{rate, sc(rate(), #{default => "infinity", desc => "The rate"})},
{burst,
sc(
burst_rate(),
#{
default => "0/0s",
desc =>
"The burst, This value is based on rate.<br/>\n"
" This value + rate = the maximum limit that can be achieved when limiter burst."
}
)},
{bucket, sc(map("bucket name", ref(bucket_opts)), #{desc => "Buckets config"})}
];
fields(bucket_opts) ->
[ {rate, sc(rate(), #{desc => "Rate for this bucket."})}
, {capacity, sc(capacity(), #{desc => "The maximum number of tokens for this bucket."})}
, {initial, sc(initial(), #{default => "0",
desc => "The initial number of tokens for this bucket."})}
, {per_client, sc(ref(client_bucket),
#{default => #{},
desc => "The rate limit for each user of the bucket,"
[
{rate, sc(rate(), #{desc => "Rate for this bucket."})},
{capacity, sc(capacity(), #{desc => "The maximum number of tokens for this bucket."})},
{initial,
sc(initial(), #{
default => "0",
desc => "The initial number of tokens for this bucket."
})},
{per_client,
sc(
ref(client_bucket),
#{
default => #{},
desc =>
"The rate limit for each user of the bucket,"
" this field is not required"
})}
}
)}
];
fields(client_bucket) ->
[ {rate, sc(rate(), #{default => "infinity", desc => "Rate for this bucket."})}
, {initial, sc(initial(), #{default => "0", desc => "The initial number of tokens for this bucket."})}
%% low_water_mark add for emqx_channel and emqx_session
%% both modules consume first and then check
%% so we need to use this value to prevent excessive consumption
%% (e.g, consumption from an empty bucket)
, {low_water_mark, sc(initial(),
#{desc => "If the remaining tokens are lower than this value,
the check/consume will succeed, but it will be forced to wait for a short period of time.",
default => "0"})}
, {capacity, sc(capacity(), #{desc => "The capacity of the token bucket.",
default => "infinity"})}
, {divisible, sc(boolean(),
#{desc => "Is it possible to split the number of requested tokens?",
default => false})}
, {max_retry_time, sc(emqx_schema:duration(),
#{ desc => "The maximum retry time when acquire failed."
, default => "10s"})}
, {failure_strategy, sc(failure_strategy(),
#{ desc => "The strategy when all the retries failed."
, default => force})}
[
{rate, sc(rate(), #{default => "infinity", desc => "Rate for this bucket."})},
{initial,
sc(initial(), #{default => "0", desc => "The initial number of tokens for this bucket."})},
%% low_water_mark add for emqx_channel and emqx_session
%% both modules consume first and then check
%% so we need to use this value to prevent excessive consumption
%% (e.g, consumption from an empty bucket)
{low_water_mark,
sc(
initial(),
#{
desc =>
"If the remaining tokens are lower than this value,\n"
"the check/consume will succeed, but it will be forced to wait for a short period of time.",
default => "0"
}
)},
{capacity,
sc(capacity(), #{
desc => "The capacity of the token bucket.",
default => "infinity"
})},
{divisible,
sc(
boolean(),
#{
desc => "Is it possible to split the number of requested tokens?",
default => false
}
)},
{max_retry_time,
sc(
emqx_schema:duration(),
#{
desc => "The maximum retry time when acquire failed.",
default => "10s"
}
)},
{failure_strategy,
sc(
failure_strategy(),
#{
desc => "The strategy when all the retries failed.",
default => force
}
)}
].
desc(limiter) ->
@ -184,33 +268,40 @@ to_rate(Str, CanInfinity, CanZero) ->
case Tokens of
["infinity"] when CanInfinity ->
{ok, infinity};
[QuotaStr] -> %% if time unit is 1s, it can be omitted
%% if time unit is 1s, it can be omitted
[QuotaStr] ->
{ok, Val} = to_capacity(QuotaStr),
check_capacity(Str, Val, CanZero,
fun(Quota) ->
{ok, Quota * minimum_period() / ?UNIT_TIME_IN_MS}
end);
check_capacity(
Str,
Val,
CanZero,
fun(Quota) ->
{ok, Quota * minimum_period() / ?UNIT_TIME_IN_MS}
end
);
[QuotaStr, Interval] ->
{ok, Val} = to_capacity(QuotaStr),
check_capacity(Str, Val, CanZero,
fun(Quota) ->
case emqx_schema:to_duration_ms(Interval) of
{ok, Ms} when Ms > 0 ->
{ok, Quota * minimum_period() / Ms};
_ ->
{error, Str}
end
end);
check_capacity(
Str,
Val,
CanZero,
fun(Quota) ->
case emqx_schema:to_duration_ms(Interval) of
{ok, Ms} when Ms > 0 ->
{ok, Quota * minimum_period() / Ms};
_ ->
{error, Str}
end
end
);
_ ->
{error, Str}
end.
check_capacity(_Str, 0, true, _Cont) ->
{ok, 0};
check_capacity(Str, 0, false, _Cont) ->
{error, Str};
check_capacity(_Str, Quota, _CanZero, Cont) ->
Cont(Quota).

View File

@ -30,34 +30,53 @@
-include_lib("emqx/include/logger.hrl").
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, format_status/2]).
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3,
format_status/2
]).
-export([ start_link/1, connect/2, info/1
, name/1, get_initial_val/1, update_config/1
]).
-export([
start_link/1,
connect/2,
info/1,
name/1,
get_initial_val/1,
update_config/1
]).
-type root() :: #{ rate := rate() %% number of tokens generated per period
, burst := rate()
, period := pos_integer() %% token generation interval(second)
, consumed := non_neg_integer()
}.
%% number of tokens generated per period
-type root() :: #{
rate := rate(),
burst := rate(),
%% token generation interval(second)
period := pos_integer(),
consumed := non_neg_integer()
}.
-type bucket() :: #{ name := bucket_name()
, rate := rate()
, obtained := non_neg_integer()
, correction := emqx_limiter_decimal:zero_or_float() %% token correction value
, capacity := capacity()
, counter := undefined | counters:counters_ref()
, index := undefined | index()
}.
-type bucket() :: #{
name := bucket_name(),
rate := rate(),
obtained := non_neg_integer(),
%% token correction value
correction := emqx_limiter_decimal:zero_or_float(),
capacity := capacity(),
counter := undefined | counters:counters_ref(),
index := undefined | index()
}.
-type state() :: #{ type := limiter_type()
, root := undefined | root()
, buckets := buckets()
, counter := undefined | counters:counters_ref() %% current counter to alloc
, index := index()
}.
-type state() :: #{
type := limiter_type(),
root := undefined | root(),
buckets := buckets(),
%% current counter to alloc
counter := undefined | counters:counters_ref(),
index := index()
}.
-type buckets() :: #{bucket_name() => bucket()}.
-type limiter_type() :: emqx_limiter_schema:limiter_type().
@ -69,7 +88,8 @@
-type index() :: pos_integer().
-define(CALL(Type), gen_server:call(name(Type), ?FUNCTION_NAME)).
-define(OVERLOAD_MIN_ALLOC, 0.3). %% minimum coefficient for overloaded limiter
%% minimum coefficient for overloaded limiter
-define(OVERLOAD_MIN_ALLOC, 0.3).
-define(CURRYING(X, F2), fun(Y) -> F2(X, Y) end).
-export_type([index/0]).
@ -80,29 +100,33 @@
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec connect(limiter_type(),
bucket_name() | #{limiter_type() => bucket_name() | undefined}) ->
emqx_htb_limiter:limiter().
-spec connect(
limiter_type(),
bucket_name() | #{limiter_type() => bucket_name() | undefined}
) ->
emqx_htb_limiter:limiter().
%% If no bucket path is set in config, there will be no limit
connect(_Type, undefined) ->
emqx_htb_limiter:make_infinity_limiter();
connect(Type, BucketName) when is_atom(BucketName) ->
CfgPath = emqx_limiter_schema:get_bucket_cfg_path(Type, BucketName),
case emqx:get_config(CfgPath, undefined) of
undefined ->
?SLOG(error, #{msg => "bucket_config_not_found", path => CfgPath}),
throw("bucket's config not found");
#{rate := AggrRate,
capacity := AggrSize,
per_client := #{rate := CliRate, capacity := CliSize} = Cfg} ->
#{
rate := AggrRate,
capacity := AggrSize,
per_client := #{rate := CliRate, capacity := CliSize} = Cfg
} ->
case emqx_limiter_manager:find_bucket(Type, BucketName) of
{ok, Bucket} ->
if CliRate < AggrRate orelse CliSize < AggrSize ->
if
CliRate < AggrRate orelse CliSize < AggrSize ->
emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket);
Bucket =:= infinity ->
Bucket =:= infinity ->
emqx_htb_limiter:make_infinity_limiter();
true ->
true ->
emqx_htb_limiter:make_ref_limiter(Cfg, Bucket)
end;
undefined ->
@ -110,7 +134,6 @@ connect(Type, BucketName) when is_atom(BucketName) ->
throw("invalid bucket")
end
end;
connect(Type, Paths) ->
connect(Type, maps:get(Type, Paths, undefined)).
@ -135,7 +158,6 @@ update_config(Type) ->
start_link(Type) ->
gen_server:start_link({local, name(Type)}, ?MODULE, [Type], []).
%%--------------------------------------------------------------------
%%% gen_server callbacks
%%--------------------------------------------------------------------
@ -146,11 +168,12 @@ start_link(Type) ->
%% Initializes the server
%% @end
%%--------------------------------------------------------------------
-spec init(Args :: term()) -> {ok, State :: term()} |
{ok, State :: term(), Timeout :: timeout()} |
{ok, State :: term(), hibernate} |
{stop, Reason :: term()} |
ignore.
-spec init(Args :: term()) ->
{ok, State :: term()}
| {ok, State :: term(), Timeout :: timeout()}
| {ok, State :: term(), hibernate}
| {stop, Reason :: term()}
| ignore.
init([Type]) ->
State = init_tree(Type),
#{root := #{period := Perido}} = State,
@ -164,21 +187,19 @@ init([Type]) ->
%% @end
%%--------------------------------------------------------------------
-spec handle_call(Request :: term(), From :: {pid(), term()}, State :: term()) ->
{reply, Reply :: term(), NewState :: term()} |
{reply, Reply :: term(), NewState :: term(), Timeout :: timeout()} |
{reply, Reply :: term(), NewState :: term(), hibernate} |
{noreply, NewState :: term()} |
{noreply, NewState :: term(), Timeout :: timeout()} |
{noreply, NewState :: term(), hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: term()} |
{stop, Reason :: term(), NewState :: term()}.
{reply, Reply :: term(), NewState :: term()}
| {reply, Reply :: term(), NewState :: term(), Timeout :: timeout()}
| {reply, Reply :: term(), NewState :: term(), hibernate}
| {noreply, NewState :: term()}
| {noreply, NewState :: term(), Timeout :: timeout()}
| {noreply, NewState :: term(), hibernate}
| {stop, Reason :: term(), Reply :: term(), NewState :: term()}
| {stop, Reason :: term(), NewState :: term()}.
handle_call(info, _From, State) ->
{reply, State, State};
handle_call(update_config, _From, #{type := Type}) ->
NewState = init_tree(Type),
{reply, ok, NewState};
handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}.
@ -190,10 +211,10 @@ handle_call(Req, _From, State) ->
%% @end
%%--------------------------------------------------------------------
-spec handle_cast(Request :: term(), State :: term()) ->
{noreply, NewState :: term()} |
{noreply, NewState :: term(), Timeout :: timeout()} |
{noreply, NewState :: term(), hibernate} |
{stop, Reason :: term(), NewState :: term()}.
{noreply, NewState :: term()}
| {noreply, NewState :: term(), Timeout :: timeout()}
| {noreply, NewState :: term(), hibernate}
| {stop, Reason :: term(), NewState :: term()}.
handle_cast(Req, State) ->
?SLOG(error, #{msg => "unexpected_cast", cast => Req}),
{noreply, State}.
@ -205,13 +226,12 @@ handle_cast(Req, State) ->
%% @end
%%--------------------------------------------------------------------
-spec handle_info(Info :: timeout() | term(), State :: term()) ->
{noreply, NewState :: term()} |
{noreply, NewState :: term(), Timeout :: timeout()} |
{noreply, NewState :: term(), hibernate} |
{stop, Reason :: normal | term(), NewState :: term()}.
{noreply, NewState :: term()}
| {noreply, NewState :: term(), Timeout :: timeout()}
| {noreply, NewState :: term(), hibernate}
| {stop, Reason :: normal | term(), NewState :: term()}.
handle_info(oscillate, State) ->
{noreply, oscillation(State)};
handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}),
{noreply, State}.
@ -225,8 +245,10 @@ handle_info(Info, State) ->
%% with Reason. The return value is ignored.
%% @end
%%--------------------------------------------------------------------
-spec terminate(Reason :: normal | shutdown | {shutdown, term()} | term(),
State :: term()) -> any().
-spec terminate(
Reason :: normal | shutdown | {shutdown, term()} | term(),
State :: term()
) -> any().
terminate(_Reason, _State) ->
ok.
@ -236,10 +258,13 @@ terminate(_Reason, _State) ->
%% Convert process state when code is changed
%% @end
%%--------------------------------------------------------------------
-spec code_change(OldVsn :: term() | {down, term()},
State :: term(),
Extra :: term()) -> {ok, NewState :: term()} |
{error, Reason :: term()}.
-spec code_change(
OldVsn :: term() | {down, term()},
State :: term(),
Extra :: term()
) ->
{ok, NewState :: term()}
| {error, Reason :: term()}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -251,8 +276,10 @@ code_change(_OldVsn, State, _Extra) ->
%% or when it appears in termination error logs.
%% @end
%%--------------------------------------------------------------------
-spec format_status(Opt :: normal | terminate,
Status :: list()) -> Status :: term().
-spec format_status(
Opt :: normal | terminate,
Status :: list()
) -> Status :: term().
format_status(_Opt, Status) ->
Status.
@ -264,40 +291,54 @@ oscillate(Interval) ->
%% @doc generate tokens, and then spread to leaf nodes
-spec oscillation(state()) -> state().
oscillation(#{root := #{rate := Flow,
period := Interval,
consumed := Consumed} = Root,
buckets := Buckets} = State) ->
oscillation(
#{
root := #{
rate := Flow,
period := Interval,
consumed := Consumed
} = Root,
buckets := Buckets
} = State
) ->
oscillate(Interval),
Ordereds = get_ordered_buckets(Buckets),
{Alloced, Buckets2} = transverse(Ordereds, Flow, 0, Buckets),
maybe_burst(State#{buckets := Buckets2,
root := Root#{consumed := Consumed + Alloced}}).
maybe_burst(State#{
buckets := Buckets2,
root := Root#{consumed := Consumed + Alloced}
}).
%% @doc horizontal spread
-spec transverse(list(bucket()),
flow(),
non_neg_integer(),
buckets()) -> {non_neg_integer(), buckets()}.
-spec transverse(
list(bucket()),
flow(),
non_neg_integer(),
buckets()
) -> {non_neg_integer(), buckets()}.
transverse([H | T], InFlow, Alloced, Buckets) when InFlow > 0 ->
{BucketAlloced, Buckets2} = longitudinal(H, InFlow, Buckets),
InFlow2 = sub(InFlow, BucketAlloced),
Alloced2 = Alloced + BucketAlloced,
transverse(T, InFlow2, Alloced2, Buckets2);
transverse(_, _, Alloced, Buckets) ->
{Alloced, Buckets}.
%% @doc vertical spread
-spec longitudinal(bucket(), flow(), buckets()) ->
{non_neg_integer(), buckets()}.
longitudinal(#{name := Name,
rate := Rate,
capacity := Capacity,
counter := Counter,
index := Index,
obtained := Obtained} = Bucket,
InFlow, Buckets) when Counter =/= undefined ->
{non_neg_integer(), buckets()}.
longitudinal(
#{
name := Name,
rate := Rate,
capacity := Capacity,
counter := Counter,
index := Index,
obtained := Obtained
} = Bucket,
InFlow,
Buckets
) when Counter =/= undefined ->
Flow = erlang:min(InFlow, Rate),
ShouldAlloc =
@ -305,8 +346,10 @@ longitudinal(#{name := Name,
Tokens when Tokens < 0 ->
%% toknes's value mayb be a negative value(stolen from the future)
%% because x. add(Capacity, x) < 0, so here we must compare with minimum value
erlang:max(add(Capacity, Tokens),
mul(Capacity, ?OVERLOAD_MIN_ALLOC));
erlang:max(
add(Capacity, Tokens),
mul(Capacity, ?OVERLOAD_MIN_ALLOC)
);
Tokens ->
%% is it possible that Tokens > Capacity ???
erlang:max(sub(Capacity, Tokens), 0)
@ -320,12 +363,10 @@ longitudinal(#{name := Name,
{Inc, Bucket2} = emqx_limiter_correction:add(Available, Bucket),
counters:add(Counter, Index, Inc),
{Inc,
Buckets#{Name := Bucket2#{obtained := Obtained + Inc}}};
{Inc, Buckets#{Name := Bucket2#{obtained := Obtained + Inc}}};
_ ->
{0, Buckets}
end;
longitudinal(_, _, Buckets) ->
{0, Buckets}.
@ -333,95 +374,111 @@ longitudinal(_, _, Buckets) ->
get_ordered_buckets(Buckets) when is_map(Buckets) ->
BucketList = maps:values(Buckets),
get_ordered_buckets(BucketList);
get_ordered_buckets(Buckets) ->
%% sort by obtained, avoid node goes hungry
lists:sort(fun(#{obtained := A}, #{obtained := B}) ->
A < B
end,
Buckets).
lists:sort(
fun(#{obtained := A}, #{obtained := B}) ->
A < B
end,
Buckets
).
-spec maybe_burst(state()) -> state().
maybe_burst(#{buckets := Buckets,
root := #{burst := Burst}} = State) when Burst > 0 ->
Fold = fun(_Name, #{counter := Cnt, index := Idx} = Bucket, Acc) when Cnt =/= undefined ->
case counters:get(Cnt, Idx) > 0 of
true ->
Acc;
false ->
[Bucket | Acc]
end;
(_Name, _Bucket, Acc) ->
Acc
end,
maybe_burst(
#{
buckets := Buckets,
root := #{burst := Burst}
} = State
) when Burst > 0 ->
Fold = fun
(_Name, #{counter := Cnt, index := Idx} = Bucket, Acc) when Cnt =/= undefined ->
case counters:get(Cnt, Idx) > 0 of
true ->
Acc;
false ->
[Bucket | Acc]
end;
(_Name, _Bucket, Acc) ->
Acc
end,
Empties = maps:fold(Fold, [], Buckets),
dispatch_burst(Empties, Burst, State);
maybe_burst(State) ->
State.
-spec dispatch_burst(list(bucket()), non_neg_integer(), state()) -> state().
dispatch_burst([], _, State) ->
State;
dispatch_burst(Empties, InFlow,
#{root := #{consumed := Consumed} = Root, buckets := Buckets} = State) ->
dispatch_burst(
Empties,
InFlow,
#{root := #{consumed := Consumed} = Root, buckets := Buckets} = State
) ->
EachFlow = InFlow / erlang:length(Empties),
{Alloced, Buckets2} = dispatch_burst_to_buckets(Empties, EachFlow, 0, Buckets),
State#{root := Root#{consumed := Consumed + Alloced}, buckets := Buckets2}.
-spec dispatch_burst_to_buckets(list(bucket()),
float(),
non_neg_integer(), buckets()) -> {non_neg_integer(), buckets()}.
-spec dispatch_burst_to_buckets(
list(bucket()),
float(),
non_neg_integer(),
buckets()
) -> {non_neg_integer(), buckets()}.
dispatch_burst_to_buckets([Bucket | T], InFlow, Alloced, Buckets) ->
#{name := Name,
counter := Counter,
index := Index,
obtained := Obtained} = Bucket,
#{
name := Name,
counter := Counter,
index := Index,
obtained := Obtained
} = Bucket,
{Inc, Bucket2} = emqx_limiter_correction:add(InFlow, Bucket),
counters:add(Counter, Index, Inc),
Buckets2 = Buckets#{Name := Bucket2#{obtained := Obtained + Inc}},
dispatch_burst_to_buckets(T, InFlow, Alloced + Inc, Buckets2);
dispatch_burst_to_buckets([], _, Alloced, Buckets) ->
{Alloced, Buckets}.
-spec init_tree(emqx_limiter_schema:limiter_type()) -> state().
init_tree(Type) ->
State = #{ type => Type
, root => undefined
, counter => undefined
, index => 1
, buckets => #{}
},
State = #{
type => Type,
root => undefined,
counter => undefined,
index => 1,
buckets => #{}
},
#{bucket := Buckets} = Cfg = emqx:get_config([limiter, Type]),
{Factor, Root} = make_root(Cfg),
{CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, Factor, 1, []),
State2 = State#{root := Root,
counter := counters:new(CounterNum, [write_concurrency])
},
State2 = State#{
root := Root,
counter := counters:new(CounterNum, [write_concurrency])
},
lists:foldl(fun(F, Acc) -> F(Acc) end, State2, DelayBuckets).
-spec make_root(hocons:confg()) -> {number(), root()}.
make_root(#{rate := Rate, burst := Burst}) when Rate >= 1 ->
{1, #{rate => Rate,
burst => Burst,
period => emqx_limiter_schema:minimum_period(),
consumed => 0}};
{1, #{
rate => Rate,
burst => Burst,
period => emqx_limiter_schema:minimum_period(),
consumed => 0
}};
make_root(#{rate := Rate, burst := Burst}) ->
MiniPeriod = emqx_limiter_schema:minimum_period(),
Factor = 1 / Rate,
{Factor, #{rate => 1,
burst => Burst * Factor,
period => erlang:floor(Factor * MiniPeriod),
consumed => 0}}.
{Factor, #{
rate => 1,
burst => Burst * Factor,
period => erlang:floor(Factor * MiniPeriod),
consumed => 0
}}.
make_bucket([{Name, Conf} | T], Type, GlobalCfg, Factor, CounterNum, DelayBuckets) ->
Path = emqx_limiter_manager:make_path(Type, Name),
@ -433,48 +490,66 @@ make_bucket([{Name, Conf} | T], Type, GlobalCfg, Factor, CounterNum, DelayBucket
emqx_limiter_manager:insert_bucket(Path, Ref),
CounterNum2 = CounterNum,
InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) ->
State#{buckets := Buckets#{BucketName => Bucket}}
end;
State#{buckets := Buckets#{BucketName => Bucket}}
end;
RawRate ->
#{capacity := Capacity} = Conf,
Initial = get_initial_val(Conf),
Rate = mul(RawRate, Factor),
CounterNum2 = CounterNum + 1,
InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) ->
{Counter, Idx, State2} = alloc_counter(Path, RawRate, Initial, State),
Bucket2 = Bucket#{counter := Counter, index := Idx},
State2#{buckets := Buckets#{BucketName => Bucket2}}
end
{Counter, Idx, State2} = alloc_counter(Path, RawRate, Initial, State),
Bucket2 = Bucket#{counter := Counter, index := Idx},
State2#{buckets := Buckets#{BucketName => Bucket2}}
end
end,
Bucket = #{ name => Name
, rate => Rate
, obtained => 0
, correction => 0
, capacity => Capacity
, counter => undefined
, index => undefined},
Bucket = #{
name => Name,
rate => Rate,
obtained => 0,
correction => 0,
capacity => Capacity,
counter => undefined,
index => undefined
},
DelayInit = ?CURRYING(Bucket, InitFun),
make_bucket(T,
Type, GlobalCfg, Factor, CounterNum2, [DelayInit | DelayBuckets]);
make_bucket(
T,
Type,
GlobalCfg,
Factor,
CounterNum2,
[DelayInit | DelayBuckets]
);
make_bucket([], _Type, _Global, _Factor, CounterNum, DelayBuckets) ->
{CounterNum, DelayBuckets}.
-spec alloc_counter(emqx_limiter_manager:path(), rate(), capacity(), state()) ->
{counters:counters_ref(), pos_integer(), state()}.
alloc_counter(Path, Rate, Initial,
#{counter := Counter, index := Index} = State) ->
{counters:counters_ref(), pos_integer(), state()}.
alloc_counter(
Path,
Rate,
Initial,
#{counter := Counter, index := Index} = State
) ->
case emqx_limiter_manager:find_bucket(Path) of
{ok, #{counter := ECounter,
index := EIndex}} when ECounter =/= undefined ->
{ok, #{
counter := ECounter,
index := EIndex
}} when ECounter =/= undefined ->
init_counter(Path, ECounter, EIndex, Rate, Initial, State);
_ ->
init_counter(Path, Counter, Index,
Rate, Initial, State#{index := Index + 1})
init_counter(
Path,
Counter,
Index,
Rate,
Initial,
State#{index := Index + 1}
)
end.
init_counter(Path, Counter, Index, Rate, Initial, State) ->
@ -483,26 +558,29 @@ init_counter(Path, Counter, Index, Rate, Initial, State) ->
emqx_limiter_manager:insert_bucket(Path, Ref),
{Counter, Index, State}.
%% @doc find first limited node
get_counter_rate(#{rate := Rate, capacity := Capacity}, _GlobalCfg)
when Rate =/= infinity orelse Capacity =/= infinity -> %% TODO maybe no need to check capacity
get_counter_rate(#{rate := Rate, capacity := Capacity}, _GlobalCfg) when
%% TODO maybe no need to check capacity
Rate =/= infinity orelse Capacity =/= infinity
->
Rate;
get_counter_rate(_Cfg, #{rate := Rate}) ->
get_counter_rate(_Cfg, #{rate := Rate}) ->
Rate.
-spec get_initial_val(hocons:config()) -> decimal().
get_initial_val(#{initial := Initial,
rate := Rate,
capacity := Capacity}) ->
get_initial_val(#{
initial := Initial,
rate := Rate,
capacity := Capacity
}) ->
%% initial will nevner be infinity(see the emqx_limiter_schema)
if Initial > 0 ->
if
Initial > 0 ->
Initial;
Rate =/= infinity ->
Rate =/= infinity ->
erlang:min(Rate, Capacity);
Capacity =/= infinity ->
Capacity =/= infinity ->
Capacity;
true ->
true ->
0
end.

View File

@ -33,11 +33,12 @@
%% Starts the supervisor
%% @end
%%--------------------------------------------------------------------
-spec start_link() -> {ok, Pid :: pid()} |
{error, {already_started, Pid :: pid()}} |
{error, {shutdown, term()}} |
{error, term()} |
ignore.
-spec start_link() ->
{ok, Pid :: pid()}
| {error, {already_started, Pid :: pid()}}
| {error, {shutdown, term()}}
| {error, term()}
| ignore.
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
@ -67,13 +68,14 @@ restart(Type) ->
%% @end
%%--------------------------------------------------------------------
-spec init(Args :: term()) ->
{ok, {SupFlags :: supervisor:sup_flags(),
[ChildSpec :: supervisor:child_spec()]}} |
ignore.
{ok, {SupFlags :: supervisor:sup_flags(), [ChildSpec :: supervisor:child_spec()]}}
| ignore.
init([]) ->
SupFlags = #{strategy => one_for_one,
intensity => 10,
period => 3600},
SupFlags = #{
strategy => one_for_one,
intensity => 10,
period => 3600
},
{ok, {SupFlags, childs()}}.
@ -82,12 +84,14 @@ init([]) ->
%%--==================================================================
make_child(Type) ->
Id = emqx_limiter_server:name(Type),
#{id => Id,
start => {emqx_limiter_server, start_link, [Type]},
restart => transient,
shutdown => 5000,
type => worker,
modules => [emqx_limiter_server]}.
#{
id => Id,
start => {emqx_limiter_server, start_link, [Type]},
restart => transient,
shutdown => 5000,
type => worker,
modules => [emqx_limiter_server]
}.
childs() ->
Conf = emqx:get_config([limiter]),

View File

@ -33,11 +33,12 @@
%% Starts the supervisor
%% @end
%%--------------------------------------------------------------------
-spec start_link() -> {ok, Pid :: pid()} |
{error, {already_started, Pid :: pid()}} |
{error, {shutdown, term()}} |
{error, term()} |
ignore.
-spec start_link() ->
{ok, Pid :: pid()}
| {error, {already_started, Pid :: pid()}}
| {error, {shutdown, term()}}
| {error, term()}
| ignore.
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
@ -55,22 +56,27 @@ start_link() ->
%% @end
%%--------------------------------------------------------------------
-spec init(Args :: term()) ->
{ok, {SupFlags :: supervisor:sup_flags(),
[ChildSpec :: supervisor:child_spec()]}} |
ignore.
{ok, {SupFlags :: supervisor:sup_flags(), [ChildSpec :: supervisor:child_spec()]}}
| ignore.
init([]) ->
SupFlags = #{strategy => one_for_one,
intensity => 10,
period => 3600},
SupFlags = #{
strategy => one_for_one,
intensity => 10,
period => 3600
},
Childs = [ make_child(emqx_limiter_manager, worker)
, make_child(emqx_limiter_server_sup, supervisor)],
Childs = [
make_child(emqx_limiter_manager, worker),
make_child(emqx_limiter_server_sup, supervisor)
],
{ok, {SupFlags, Childs}}.
make_child(Mod, Type) ->
#{id => Mod,
start => {Mod, start_link, []},
restart => transient,
type => Type,
modules => [Mod]}.
#{
id => Mod,
start => {Mod, start_link, []},
restart => transient,
type => Type,
modules => [Mod]
}.

View File

@ -24,30 +24,33 @@
-boot_mnesia({mnesia, [boot]}).
-export([mnesia/1]).
-export([ publish/1
, subscribe/3
, unsubscribe/2
, log/3
]).
-export([
publish/1,
subscribe/3,
unsubscribe/2,
log/3
]).
-export([ start_link/0
, list/0
, list/1
, get_trace_filename/1
, create/1
, delete/1
, clear/0
, update/2
, check/0
]).
-export([
start_link/0,
list/0,
list/1,
get_trace_filename/1,
create/1,
delete/1,
clear/0,
update/2,
check/0
]).
-export([ format/1
, zip_dir/0
, filename/2
, trace_dir/0
, trace_file/1
, delete_files_after_send/2
]).
-export([
format/1,
zip_dir/0,
filename/2,
trace_dir/0,
trace_file/1,
delete_files_after_send/2
]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -56,22 +59,23 @@
-define(OWN_KEYS, [level, filters, filter_default, handlers]).
-ifdef(TEST).
-export([ log_file/2
, find_closest_time/2
]).
-export([
log_file/2,
find_closest_time/2
]).
-endif.
-export_type([ip_address/0]).
-type ip_address() :: string().
-record(?TRACE, {
name :: binary() | undefined | '_'
, type :: clientid | topic | ip_address | undefined | '_'
, filter :: emqx_types:topic() | emqx_types:clientid() | ip_address() | undefined | '_'
, enable = true :: boolean() | '_'
, start_at :: integer() | undefined | '_'
, end_at :: integer() | undefined | '_'
}).
name :: binary() | undefined | '_',
type :: clientid | topic | ip_address | undefined | '_',
filter :: emqx_types:topic() | emqx_types:clientid() | ip_address() | undefined | '_',
enable = true :: boolean() | '_',
start_at :: integer() | undefined | '_',
end_at :: integer() | undefined | '_'
}).
mnesia(boot) ->
ok = mria:create_table(?TRACE, [
@ -79,18 +83,23 @@ mnesia(boot) ->
{rlog_shard, ?COMMON_SHARD},
{storage, disc_copies},
{record_name, ?TRACE},
{attributes, record_info(fields, ?TRACE)}]).
{attributes, record_info(fields, ?TRACE)}
]).
publish(#message{topic = <<"$SYS/", _/binary>>}) -> ignore;
publish(#message{topic = <<"$SYS/", _/binary>>}) ->
ignore;
publish(#message{from = From, topic = Topic, payload = Payload}) when
is_binary(From); is_atom(From) ->
is_binary(From); is_atom(From)
->
?TRACE("PUBLISH", "publish_to", #{topic => Topic, payload => Payload}).
subscribe(<<"$SYS/", _/binary>>, _SubId, _SubOpts) -> ignore;
subscribe(<<"$SYS/", _/binary>>, _SubId, _SubOpts) ->
ignore;
subscribe(Topic, SubId, SubOpts) ->
?TRACE("SUBSCRIBE", "subscribe", #{topic => Topic, sub_opts => SubOpts, sub_id => SubId}).
unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> ignore;
unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) ->
ignore;
unsubscribe(Topic, SubOpts) ->
?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}).
@ -103,36 +112,47 @@ log(List, Msg, Meta0) ->
Log = #{level => debug, meta => Meta, msg => Msg},
log_filter(List, Log).
log_filter([], _Log) -> ok;
log_filter([], _Log) ->
ok;
log_filter([{Id, FilterFun, Filter, Name} | Rest], Log0) ->
case FilterFun(Log0, {Filter, Name}) of
stop -> stop;
ignore -> ignore;
stop ->
stop;
ignore ->
ignore;
Log ->
case logger_config:get(ets:whereis(logger), Id) of
{ok, #{module := Module} = HandlerConfig0} ->
HandlerConfig = maps:without(?OWN_KEYS, HandlerConfig0),
try Module:log(Log, HandlerConfig)
catch C:R:S ->
case logger:remove_handler(Id) of
ok ->
logger:internal_log(error, {removed_failing_handler, Id, C, R, S});
{error,{not_found,_}} ->
%% Probably already removed by other client
%% Don't report again
ok;
{error,Reason} ->
logger:internal_log(error,
{removed_handler_failed, Id, Reason, C, R, S})
end
try
Module:log(Log, HandlerConfig)
catch
C:R:S ->
case logger:remove_handler(Id) of
ok ->
logger:internal_log(
error, {removed_failing_handler, Id, C, R, S}
);
{error, {not_found, _}} ->
%% Probably already removed by other client
%% Don't report again
ok;
{error, Reason} ->
logger:internal_log(
error,
{removed_handler_failed, Id, Reason, C, R, S}
)
end
end;
{error, {not_found, Id}} -> ok;
{error, Reason} -> logger:internal_log(error, {find_handle_id_failed, Id, Reason})
{error, {not_found, Id}} ->
ok;
{error, Reason} ->
logger:internal_log(error, {find_handle_id_failed, Id, Reason})
end
end,
log_filter(Rest, Log0).
-spec(start_link() -> emqx_types:startlink_ret()).
-spec start_link() -> emqx_types:startlink_ret().
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@ -145,7 +165,8 @@ list(Enable) ->
ets:match_object(?TRACE, #?TRACE{enable = Enable, _ = '_'}).
-spec create([{Key :: binary(), Value :: binary()}] | #{atom() => binary()}) ->
{ok, #?TRACE{}} | {error, {duplicate_condition, iodata()} | {already_existed, iodata()} | iodata()}.
{ok, #?TRACE{}}
| {error, {duplicate_condition, iodata()} | {already_existed, iodata()} | iodata()}.
create(Trace) ->
case mnesia:table_info(?TRACE, size) < ?MAX_SIZE of
true ->
@ -154,8 +175,9 @@ create(Trace) ->
{error, Reason} -> {error, Reason}
end;
false ->
{error, "The number of traces created has reache the maximum"
" please delete the useless ones first"}
{error,
"The number of traces created has reache the maximum"
" please delete the useless ones first"}
end.
-spec delete(Name :: binary()) -> ok | {error, not_found}.
@ -165,7 +187,7 @@ delete(Name) ->
[_] -> mnesia:delete(?TRACE, Name, write);
[] -> mnesia:abort(not_found)
end
end,
end,
transaction(Tran).
-spec clear() -> ok | {error, Reason :: term()}.
@ -180,15 +202,17 @@ clear() ->
update(Name, Enable) ->
Tran = fun() ->
case mnesia:read(?TRACE, Name) of
[] -> mnesia:abort(not_found);
[#?TRACE{enable = Enable}] -> ok;
[] ->
mnesia:abort(not_found);
[#?TRACE{enable = Enable}] ->
ok;
[Rec] ->
case erlang:system_time(second) >= Rec#?TRACE.end_at of
false -> mnesia:write(?TRACE, Rec#?TRACE{enable = Enable}, write);
true -> mnesia:abort(finished)
end
end
end,
end,
transaction(Tran).
check() ->
@ -201,12 +225,13 @@ get_trace_filename(Name) ->
case mnesia:read(?TRACE, Name, read) of
[] -> mnesia:abort(not_found);
[#?TRACE{start_at = Start}] -> {ok, filename(Name, Start)}
end end,
end
end,
transaction(Tran).
-spec trace_file(File :: file:filename_all()) ->
{ok, Node :: list(), Binary :: binary()} |
{error, Node :: list(), Reason :: term()}.
{ok, Node :: list(), Binary :: binary()}
| {error, Node :: list(), Reason :: term()}.
trace_file(File) ->
FileName = filename:join(trace_dir(), File),
Node = atom_to_list(node()),
@ -221,10 +246,13 @@ delete_files_after_send(TraceLog, Zips) ->
-spec format(list(#?TRACE{})) -> list(map()).
format(Traces) ->
Fields = record_info(fields, ?TRACE),
lists:map(fun(Trace0 = #?TRACE{}) ->
[_ | Values] = tuple_to_list(Trace0),
maps:from_list(lists:zip(Fields, Values))
end, Traces).
lists:map(
fun(Trace0 = #?TRACE{}) ->
[_ | Values] = tuple_to_list(Trace0),
maps:from_list(lists:zip(Fields, Values))
end,
Traces
).
init([]) ->
ok = mria:wait_for_tables([?TRACE]),
@ -253,7 +281,8 @@ handle_cast(Msg, State) ->
handle_info({'DOWN', _Ref, process, Pid, _Reason}, State = #{monitors := Monitors}) ->
case maps:take(Pid, Monitors) of
error -> {noreply, State};
error ->
{noreply, State};
{Files, NewMonitors} ->
lists:foreach(fun file:delete/1, Files),
{noreply, State#{monitors => NewMonitors}}
@ -264,11 +293,9 @@ handle_info({timeout, TRef, update_trace}, #{timer := TRef} = State) ->
update_trace_handler(),
?tp(update_trace_done, #{}),
{noreply, State#{timer => NextTRef}};
handle_info({mnesia_table_event, _Events}, State = #{timer := TRef}) ->
emqx_misc:cancel_timer(TRef),
handle_info({timeout, TRef, update_trace}, State);
handle_info(Info, State) ->
?SLOG(error, #{unexpected_info => Info}),
{noreply, State}.
@ -294,11 +321,13 @@ insert_new_trace(Trace) ->
[] ->
ok = mnesia:write(?TRACE, Trace, write),
{ok, Trace};
[#?TRACE{name = Name}] -> mnesia:abort({duplicate_condition, Name})
[#?TRACE{name = Name}] ->
mnesia:abort({duplicate_condition, Name})
end;
[#?TRACE{name = Name}] -> mnesia:abort({already_existed, Name})
[#?TRACE{name = Name}] ->
mnesia:abort({already_existed, Name})
end
end,
end,
transaction(Tran).
update_trace(Traces) ->
@ -314,67 +343,91 @@ update_trace(Traces) ->
emqx_misc:start_timer(NextTime, update_trace).
stop_all_trace_handler() ->
lists:foreach(fun(#{id := Id}) -> emqx_trace_handler:uninstall(Id) end,
emqx_trace_handler:running()).
lists:foreach(
fun(#{id := Id}) -> emqx_trace_handler:uninstall(Id) end,
emqx_trace_handler:running()
).
get_enable_trace() ->
transaction(fun() ->
mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read)
end).
end).
find_closest_time(Traces, Now) ->
Sec =
lists:foldl(
fun(#?TRACE{start_at = Start, end_at = End, enable = true}, Closest) ->
min(closest(End, Now, Closest), closest(Start, Now, Closest));
(_, Closest) -> Closest
end, 60 * 15, Traces),
fun
(#?TRACE{start_at = Start, end_at = End, enable = true}, Closest) ->
min(closest(End, Now, Closest), closest(Start, Now, Closest));
(_, Closest) ->
Closest
end,
60 * 15,
Traces
),
timer:seconds(Sec).
closest(Time, Now, Closest) when Now >= Time -> Closest;
closest(Time, Now, Closest) -> min(Time - Now, Closest).
disable_finished([]) -> ok;
disable_finished([]) ->
ok;
disable_finished(Traces) ->
transaction(fun() ->
lists:map(fun(#?TRACE{name = Name}) ->
case mnesia:read(?TRACE, Name, write) of
[] -> ok;
[Trace] -> mnesia:write(?TRACE, Trace#?TRACE{enable = false}, write)
end end, Traces)
end).
lists:map(
fun(#?TRACE{name = Name}) ->
case mnesia:read(?TRACE, Name, write) of
[] -> ok;
[Trace] -> mnesia:write(?TRACE, Trace#?TRACE{enable = false}, write)
end
end,
Traces
)
end).
start_trace(Traces, Started0) ->
Started = lists:map(fun(#{name := Name}) -> Name end, Started0),
lists:foldl(fun(#?TRACE{name = Name} = Trace,
{Running, StartedAcc}) ->
case lists:member(Name, StartedAcc) of
true -> {[Name | Running], StartedAcc};
false ->
case start_trace(Trace) of
ok -> {[Name | Running], [Name | StartedAcc]};
{error, _Reason} -> {[Name | Running], StartedAcc}
end
end
end, {[], Started}, Traces).
lists:foldl(
fun(
#?TRACE{name = Name} = Trace,
{Running, StartedAcc}
) ->
case lists:member(Name, StartedAcc) of
true ->
{[Name | Running], StartedAcc};
false ->
case start_trace(Trace) of
ok -> {[Name | Running], [Name | StartedAcc]};
{error, _Reason} -> {[Name | Running], StartedAcc}
end
end
end,
{[], Started},
Traces
).
start_trace(Trace) ->
#?TRACE{name = Name
, type = Type
, filter = Filter
, start_at = Start
#?TRACE{
name = Name,
type = Type,
filter = Filter,
start_at = Start
} = Trace,
Who = #{name => Name, type => Type, filter => Filter},
emqx_trace_handler:install(Who, debug, log_file(Name, Start)).
stop_trace(Finished, Started) ->
lists:foreach(fun(#{name := Name, type := Type, filter := Filter}) ->
case lists:member(Name, Finished) of
true ->
?TRACE("API", "trace_stopping", #{Type => Filter}),
emqx_trace_handler:uninstall(Type, Name);
false -> ok
end
end, Started).
lists:foreach(
fun(#{name := Name, type := Type, filter := Filter}) ->
case lists:member(Name, Finished) of
true ->
?TRACE("API", "trace_stopping", #{Type => Filter}),
emqx_trace_handler:uninstall(Type, Name);
false ->
ok
end
end,
Started
).
clean_stale_trace_files() ->
TraceDir = trace_dir(),
@ -383,30 +436,44 @@ clean_stale_trace_files() ->
FileFun = fun(#?TRACE{name = Name, start_at = StartAt}) -> filename(Name, StartAt) end,
KeepFiles = lists:map(FileFun, list()),
case AllFiles -- ["zip" | KeepFiles] of
[] -> ok;
[] ->
ok;
DeleteFiles ->
DelFun = fun(F) -> file:delete(filename:join(TraceDir, F)) end,
lists:foreach(DelFun, DeleteFiles)
end;
_ -> ok
_ ->
ok
end.
classify_by_time(Traces, Now) ->
classify_by_time(Traces, Now, [], [], []).
classify_by_time([], _Now, Wait, Run, Finish) -> {Wait, Run, Finish};
classify_by_time([Trace = #?TRACE{start_at = Start} | Traces],
Now, Wait, Run, Finish) when Start > Now ->
classify_by_time([], _Now, Wait, Run, Finish) ->
{Wait, Run, Finish};
classify_by_time(
[Trace = #?TRACE{start_at = Start} | Traces],
Now,
Wait,
Run,
Finish
) when Start > Now ->
classify_by_time(Traces, Now, [Trace | Wait], Run, Finish);
classify_by_time([Trace = #?TRACE{end_at = End} | Traces],
Now, Wait, Run, Finish) when End =< Now ->
classify_by_time(
[Trace = #?TRACE{end_at = End} | Traces],
Now,
Wait,
Run,
Finish
) when End =< Now ->
classify_by_time(Traces, Now, Wait, Run, [Trace | Finish]);
classify_by_time([Trace | Traces], Now, Wait, Run, Finish) ->
classify_by_time(Traces, Now, Wait, [Trace | Run], Finish).
to_trace(TraceParam) ->
case to_trace(ensure_map(TraceParam), #?TRACE{}) of
{error, Reason} -> {error, Reason};
{error, Reason} ->
{error, Reason};
{ok, #?TRACE{name = undefined}} ->
{error, "name required"};
{ok, #?TRACE{type = undefined}} ->
@ -421,21 +488,31 @@ to_trace(TraceParam) ->
end.
ensure_map(#{} = Trace) ->
maps:fold(fun(K, V, Acc) when is_binary(K) -> Acc#{binary_to_existing_atom(K) => V};
(K, V, Acc) when is_atom(K) -> Acc#{K => V}
end, #{}, Trace);
maps:fold(
fun
(K, V, Acc) when is_binary(K) -> Acc#{binary_to_existing_atom(K) => V};
(K, V, Acc) when is_atom(K) -> Acc#{K => V}
end,
#{},
Trace
);
ensure_map(Trace) when is_list(Trace) ->
lists:foldl(
fun({K, V}, Acc) when is_binary(K) -> Acc#{binary_to_existing_atom(K) => V};
fun
({K, V}, Acc) when is_binary(K) -> Acc#{binary_to_existing_atom(K) => V};
({K, V}, Acc) when is_atom(K) -> Acc#{K => V};
(_, Acc) -> Acc
end, #{}, Trace).
end,
#{},
Trace
).
fill_default(Trace = #?TRACE{start_at = undefined}) ->
fill_default(Trace#?TRACE{start_at = erlang:system_time(second)});
fill_default(Trace = #?TRACE{end_at = undefined, start_at = StartAt}) ->
fill_default(Trace#?TRACE{end_at = StartAt + 10 * 60});
fill_default(Trace) -> Trace.
fill_default(Trace) ->
Trace.
-define(NAME_RE, "^[A-Za-z]+[A-Za-z0-9-_]*$").
@ -452,16 +529,19 @@ to_trace(#{type := topic, topic := Filter} = Trace, Rec) ->
ok ->
Trace0 = maps:without([type, topic], Trace),
to_trace(Trace0, Rec#?TRACE{type = topic, filter = Filter});
Error -> Error
Error ->
Error
end;
to_trace(#{type := ip_address, ip_address := Filter} = Trace, Rec) ->
case validate_ip_address(Filter) of
ok ->
Trace0 = maps:without([type, ip_address], Trace),
to_trace(Trace0, Rec#?TRACE{type = ip_address, filter = binary_to_list(Filter)});
Error -> Error
Error ->
Error
end;
to_trace(#{type := Type}, _Rec) -> {error, io_lib:format("required ~s field", [Type])};
to_trace(#{type := Type}, _Rec) ->
{error, io_lib:format("required ~s field", [Type])};
to_trace(#{start_at := StartAt} = Trace, Rec) ->
{ok, Sec} = to_system_second(StartAt),
to_trace(maps:remove(start_at, Trace), Rec#?TRACE{start_at = Sec});
@ -473,7 +553,8 @@ to_trace(#{end_at := EndAt} = Trace, Rec) ->
{ok, _Sec} ->
{error, "end_at time has already passed"}
end;
to_trace(_, Rec) -> {ok, Rec}.
to_trace(_, Rec) ->
{ok, Rec}.
validate_topic(TopicName) ->
try emqx_topic:validate(filter, TopicName) of
@ -513,11 +594,22 @@ transaction(Tran) ->
update_trace_handler() ->
case emqx_trace_handler:running() of
[] -> persistent_term:erase(?TRACE_FILTER);
[] ->
persistent_term:erase(?TRACE_FILTER);
Running ->
List = lists:map(fun(#{id := Id, filter_fun := FilterFun,
filter := Filter, name := Name}) ->
{Id, FilterFun, Filter, Name} end, Running),
List = lists:map(
fun(
#{
id := Id,
filter_fun := FilterFun,
filter := Filter,
name := Name
}
) ->
{Id, FilterFun, Filter, Name}
end,
Running
),
case List =/= persistent_term:get(?TRACE_FILTER, undefined) of
true -> persistent_term:put(?TRACE_FILTER, List);
false -> ok
@ -525,6 +617,9 @@ update_trace_handler() ->
end.
filter_cli_handler(Names) ->
lists:filter(fun(Name) ->
nomatch =:= re:run(Name, "^CLI-+.", [])
end, Names).
lists:filter(
fun(Name) ->
nomatch =:= re:run(Name, "^CLI-+.", [])
end,
Names
).

View File

@ -23,14 +23,15 @@
-spec format(LogEvent, Config) -> unicode:chardata() when
LogEvent :: logger:log_event(),
Config :: logger:config().
format(#{level := debug, meta := Meta = #{trace_tag := Tag}, msg := Msg},
#{payload_encode := PEncode}) ->
format(
#{level := debug, meta := Meta = #{trace_tag := Tag}, msg := Msg},
#{payload_encode := PEncode}
) ->
Time = calendar:system_time_to_rfc3339(erlang:system_time(second)),
ClientId = to_iolist(maps:get(clientid, Meta, "")),
Peername = maps:get(peername, Meta, ""),
MetaBin = format_meta(Meta, PEncode),
[Time, " [", Tag, "] ", ClientId, "@", Peername, " msg: ", Msg, MetaBin, "\n"];
format(Event, Config) ->
emqx_logger_textfmt:format(Event, Config).
@ -72,6 +73,10 @@ to_iolist(SubMap) when is_map(SubMap) -> ["[", map_to_iolist(SubMap), "]"];
to_iolist(Char) -> emqx_logger_textfmt:try_format_unicode(Char).
map_to_iolist(Map) ->
lists:join(",",
lists:map(fun({K, V}) -> [to_iolist(K), ": ", to_iolist(V)] end,
maps:to_list(Map))).
lists:join(
",",
lists:map(
fun({K, V}) -> [to_iolist(K), ": ", to_iolist(V)] end,
maps:to_list(Map)
)
).

View File

@ -22,28 +22,30 @@
-logger_header("[Tracer]").
%% APIs
-export([ running/0
, install/3
, install/4
, install/5
, uninstall/1
, uninstall/2
]).
-export([
running/0,
install/3,
install/4,
install/5,
uninstall/1,
uninstall/2
]).
%% For logger handler filters callbacks
-export([ filter_clientid/2
, filter_topic/2
, filter_ip_address/2
]).
-export([
filter_clientid/2,
filter_topic/2,
filter_ip_address/2
]).
-export([handler_id/2]).
-export([payload_encode/0]).
-type tracer() :: #{
name := binary(),
type := clientid | topic | ip_address,
filter := emqx_types:clientid() | emqx_types:topic() | emqx_trace:ip_address()
}.
name := binary(),
type := clientid | topic | ip_address,
filter := emqx_types:clientid() | emqx_types:topic() | emqx_trace:ip_address()
}.
-define(CONFIG(_LogFile_), #{
type => halt,
@ -60,19 +62,23 @@
%% APIs
%%------------------------------------------------------------------------------
-spec install(Name :: binary() | list(),
-spec install(
Name :: binary() | list(),
Type :: clientid | topic | ip_address,
Filter :: emqx_types:clientid() | emqx_types:topic() | string(),
Level :: logger:level() | all,
LogFilePath :: string()) -> ok | {error, term()}.
LogFilePath :: string()
) -> ok | {error, term()}.
install(Name, Type, Filter, Level, LogFile) ->
Who = #{type => Type, filter => ensure_bin(Filter), name => ensure_bin(Name)},
install(Who, Level, LogFile).
-spec install(Type :: clientid | topic | ip_address,
-spec install(
Type :: clientid | topic | ip_address,
Filter :: emqx_types:clientid() | emqx_types:topic() | string(),
Level :: logger:level() | all,
LogFilePath :: string()) -> ok | {error, term()}.
LogFilePath :: string()
) -> ok | {error, term()}.
install(Type, Filter, Level, LogFile) ->
install(Filter, Type, Filter, Level, LogFile).
@ -92,8 +98,10 @@ install(Who = #{name := Name, type := Type}, Level, LogFile) ->
show_prompts(Res, Who, "start_trace"),
Res.
-spec uninstall(Type :: clientid | topic | ip_address,
Name :: binary() | list()) -> ok | {error, term()}.
-spec uninstall(
Type :: clientid | topic | ip_address,
Name :: binary() | list()
) -> ok | {error, term()}.
uninstall(Type, Name) ->
HandlerId = handler_id(ensure_bin(Name), Type),
uninstall(HandlerId).
@ -108,12 +116,12 @@ uninstall(HandlerId) ->
-spec running() ->
[
#{
name => binary(),
type => topic | clientid | ip_address,
id => atom(),
filter => emqx_types:topic() | emqx_types:clienetid() | emqx_trace:ip_address(),
level => logger:level(),
dst => file:filename() | console | unknown
name => binary(),
type => topic | clientid | ip_address,
id => atom(),
filter => emqx_types:topic() | emqx_types:clienetid() | emqx_trace:ip_address(),
level => logger:level(),
dst => file:filename() | console | unknown
}
].
running() ->
@ -122,17 +130,20 @@ running() ->
-spec filter_clientid(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop.
filter_clientid(#{meta := Meta = #{clientid := ClientId}} = Log, {MatchId, _Name}) ->
filter_ret(ClientId =:= MatchId andalso is_trace(Meta), Log);
filter_clientid(_Log, _ExpectId) -> stop.
filter_clientid(_Log, _ExpectId) ->
stop.
-spec filter_topic(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop.
filter_topic(#{meta := Meta = #{topic := Topic}} = Log, {TopicFilter, _Name}) ->
filter_ret(is_trace(Meta) andalso emqx_topic:match(Topic, TopicFilter), Log);
filter_topic(_Log, _ExpectId) -> stop.
filter_topic(_Log, _ExpectId) ->
stop.
-spec filter_ip_address(logger:log_event(), {string(), atom()}) -> logger:log_event() | stop.
filter_ip_address(#{meta := Meta = #{peername := Peername}} = Log, {IP, _Name}) ->
filter_ret(is_trace(Meta) andalso lists:prefix(IP, Peername), Log);
filter_ip_address(_Log, _ExpectId) -> stop.
filter_ip_address(_Log, _ExpectId) ->
stop.
-compile({inline, [is_trace/1, filter_ret/2]}).
%% TRUE when is_trace is missing.
@ -150,16 +161,14 @@ filters(#{type := ip_address, filter := Filter, name := Name}) ->
[{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}].
formatter(#{type := _Type}) ->
{emqx_trace_formatter,
#{
%% template is for ?SLOG message not ?TRACE.
template => [time," [",level,"] ", msg,"\n"],
single_line => true,
max_size => unlimited,
depth => unlimited,
payload_encode => payload_encode()
}
}.
{emqx_trace_formatter, #{
%% template is for ?SLOG message not ?TRACE.
template => [time, " [", level, "] ", msg, "\n"],
single_line => true,
max_size => unlimited,
depth => unlimited,
payload_encode => payload_encode()
}}.
filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc) ->
Init = #{id => Id, level => Level, dst => Dst},
@ -167,7 +176,8 @@ filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc)
[{Type, {FilterFun, {Filter, Name}}}] when
Type =:= topic orelse
Type =:= clientid orelse
Type =:= ip_address ->
Type =:= ip_address
->
[Init#{type => Type, filter => Filter, name => Name, filter_fun => FilterFun} | Acc];
_ ->
Acc
@ -179,7 +189,7 @@ handler_id(Name, Type) ->
try
do_handler_id(Name, Type)
catch
_ : _ ->
_:_ ->
Hash = emqx_misc:bin2hexstr_a_f(crypto:hash(md5, Name)),
do_handler_id(Hash, Type)
end.

View File

@ -18,17 +18,18 @@
-behaviour(emqx_bpapi).
-export([ introduced_in/0
-export([
introduced_in/0,
, forward/3
, forward_async/3
, list_client_subscriptions/2
, list_subscriptions_via_topic/2
forward/3,
forward_async/3,
list_client_subscriptions/2,
list_subscriptions_via_topic/2,
, start_listener/2
, stop_listener/2
, restart_listener/2
]).
start_listener/2,
stop_listener/2,
restart_listener/2
]).
-include("bpapi.hrl").
-include("emqx.hrl").
@ -36,8 +37,9 @@
introduced_in() ->
"5.0.0".
-spec forward(node(), emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()
| emqx_rpc:badrpc().
-spec forward(node(), emqx_types:topic(), emqx_types:delivery()) ->
emqx_types:deliver_result()
| emqx_rpc:badrpc().
forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
emqx_rpc:call(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]).
@ -46,8 +48,8 @@ forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]).
-spec list_client_subscriptions(node(), emqx_types:clientid()) ->
[{emqx_types:topic(), emqx_types:subopts()}]
| emqx_rpc:badrpc().
[{emqx_types:topic(), emqx_types:subopts()}]
| emqx_rpc:badrpc().
list_client_subscriptions(Node, ClientId) ->
rpc:call(Node, emqx_broker, subscriptions, [ClientId]).

View File

@ -18,18 +18,19 @@
-behaviour(emqx_bpapi).
-export([ introduced_in/0
-export([
introduced_in/0,
, lookup_client/2
, kickout_client/2
lookup_client/2,
kickout_client/2,
, get_chan_stats/2
, get_chan_info/2
, get_chann_conn_mod/2
get_chan_stats/2,
get_chan_info/2,
get_chann_conn_mod/2,
, takeover_session/2
, kick_session/3
]).
takeover_session/2,
kick_session/3
]).
-include("bpapi.hrl").
-include("src/emqx_cm.hrl").
@ -42,7 +43,7 @@ kickout_client(Node, ClientId) ->
rpc:call(Node, emqx_cm, kick_session, [ClientId]).
-spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) ->
[emqx_cm:channel_info()] | {badrpc, _}.
[emqx_cm:channel_info()] | {badrpc, _}.
lookup_client(Node, Key) ->
rpc:call(Node, emqx_cm, lookup_client, [Key]).
@ -54,15 +55,16 @@ get_chan_stats(ClientId, ChanPid) ->
get_chan_info(ClientId, ChanPid) ->
rpc:call(node(ChanPid), emqx_cm, do_get_chan_info, [ClientId, ChanPid], ?T_GET_INFO * 2).
-spec get_chann_conn_mod(emqx_types:clientid(), emqx_cm:chan_pid()) -> module() | undefined | {badrpc, _}.
-spec get_chann_conn_mod(emqx_types:clientid(), emqx_cm:chan_pid()) ->
module() | undefined | {badrpc, _}.
get_chann_conn_mod(ClientId, ChanPid) ->
rpc:call(node(ChanPid), emqx_cm, do_get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO * 2).
-spec takeover_session(emqx_types:clientid(), emqx_cm:chan_pid()) ->
none
| {expired | persistent, emqx_session:session()}
| {living, _ConnMod :: atom(), emqx_cm:chan_pid(), emqx_session:session()}
| {badrpc, _}.
none
| {expired | persistent, emqx_session:session()}
| {living, _ConnMod :: atom(), emqx_cm:chan_pid(), emqx_session:session()}
| {badrpc, _}.
takeover_session(ClientId, ChanPid) ->
rpc:call(node(ChanPid), emqx_cm, takeover_session, [ClientId, ChanPid], ?T_TAKEOVER * 2).

View File

@ -18,10 +18,11 @@
-behaviour(emqx_bpapi).
-export([ introduced_in/0
, resume_begin/3
, resume_end/3
]).
-export([
introduced_in/0,
resume_begin/3,
resume_end/3
]).
-include("bpapi.hrl").
-include("emqx.hrl").
@ -30,11 +31,11 @@ introduced_in() ->
"5.0.0".
-spec resume_begin([node()], pid(), binary()) ->
emqx_rpc:erpc_multicall([{node(), emqx_guid:guid()}]).
emqx_rpc:erpc_multicall([{node(), emqx_guid:guid()}]).
resume_begin(Nodes, Pid, SessionID) when is_pid(Pid), is_binary(SessionID) ->
erpc:multicall(Nodes, emqx_session_router, resume_begin, [Pid, SessionID]).
-spec resume_end([node()], pid(), binary()) ->
emqx_rpc:erpc_multicall({'ok', [emqx_types:message()]} | {'error', term()}).
emqx_rpc:erpc_multicall({'ok', [emqx_types:message()]} | {'error', term()}).
resume_end(Nodes, Pid, SessionID) when is_pid(Pid), is_binary(SessionID) ->
erpc:multicall(Nodes, emqx_session_router, resume_end, [Pid, SessionID]).

View File

@ -20,20 +20,21 @@
-include("bpapi.hrl").
-export([ introduced_in/0
-export([
introduced_in/0,
, is_running/1
is_running/1,
, get_alarms/2
, get_stats/1
, get_metrics/1
get_alarms/2,
get_stats/1,
get_metrics/1,
, deactivate_alarm/2
, delete_all_deactivated_alarms/1
deactivate_alarm/2,
delete_all_deactivated_alarms/1,
, clean_authz_cache/1
, clean_authz_cache/2
]).
clean_authz_cache/1,
clean_authz_cache/2
]).
introduced_in() ->
"5.0.0".
@ -55,9 +56,9 @@ get_metrics(Node) ->
rpc:call(Node, emqx_metrics, all, []).
-spec clean_authz_cache(node(), emqx_types:clientid()) ->
ok
| {error, not_found}
| {badrpc, _}.
ok
| {error, not_found}
| {badrpc, _}.
clean_authz_cache(Node, ClientId) ->
rpc:call(Node, emqx_authz_cache, drain_cache, [ClientId]).
@ -66,7 +67,7 @@ clean_authz_cache(Node) ->
rpc:call(Node, emqx_authz_cache, drain_cache, []).
-spec deactivate_alarm(node(), binary() | atom()) ->
ok | {error, not_found} | {badrpc, _}.
ok | {error, not_found} | {badrpc, _}.
deactivate_alarm(Node, Name) ->
rpc:call(Node, emqx_alarm, deactivate, [Name]).

View File

@ -23,18 +23,24 @@
%%--------------------------------------------------------------------
prop_symmetric() ->
?FORALL(Data, raw_data(),
?FORALL(
Data,
raw_data(),
begin
Encoded = emqx_base62:encode(Data),
to_binary(Data) =:= emqx_base62:decode(Encoded)
end).
end
).
prop_size() ->
?FORALL(Data, binary(),
begin
Encoded = emqx_base62:encode(Data),
base62_size(Data, Encoded)
end).
?FORALL(
Data,
binary(),
begin
Encoded = emqx_base62:encode(Data),
base62_size(Data, Encoded)
end
).
%%--------------------------------------------------------------------
%% Helpers
@ -59,7 +65,7 @@ base62_size(Data, Encoded) ->
RangeEnd = DataSize div 3 * 8,
EncodedSize >= RangeStart andalso EncodedSize =< RangeEnd;
_Rem ->
RangeStart = DataSize * 8 div 6 + 1,
RangeStart = DataSize * 8 div 6 + 1,
RangeEnd = DataSize * 8 div 6 * 2 + 1,
EncodedSize >= RangeStart andalso EncodedSize =< RangeEnd
end.

View File

@ -24,24 +24,27 @@
%%--------------------------------------------------------------------
prop_serialize_parse_connect() ->
?FORALL(Opts = #{version := ProtoVer}, parse_opts(),
begin
ProtoName = proplists:get_value(ProtoVer, ?PROTOCOL_NAMES),
Packet = ?CONNECT_PACKET(#mqtt_packet_connect{
proto_name = ProtoName,
proto_ver = ProtoVer,
clientid = <<"clientId">>,
will_qos = ?QOS_1,
will_flag = true,
will_retain = true,
will_topic = <<"will">>,
will_props = #{},
will_payload = <<"bye">>,
clean_start = true,
properties = #{}
}),
Packet =:= parse_serialize(Packet, Opts)
end).
?FORALL(
Opts = #{version := ProtoVer},
parse_opts(),
begin
ProtoName = proplists:get_value(ProtoVer, ?PROTOCOL_NAMES),
Packet = ?CONNECT_PACKET(#mqtt_packet_connect{
proto_name = ProtoName,
proto_ver = ProtoVer,
clientid = <<"clientId">>,
will_qos = ?QOS_1,
will_flag = true,
will_retain = true,
will_topic = <<"will">>,
will_props = #{},
will_payload = <<"bye">>,
clean_start = true,
properties = #{}
}),
Packet =:= parse_serialize(Packet, Opts)
end
).
%%--------------------------------------------------------------------
%% Helpers
@ -59,4 +62,4 @@ parse_serialize(Packet, Opts) when is_map(Opts) ->
%%--------------------------------------------------------------------
parse_opts() ->
?LET(PropList, [{strict_mode, boolean()}, {version, range(4,5)}], maps:from_list(PropList)).
?LET(PropList, [{strict_mode, boolean()}, {version, range(4, 5)}], maps:from_list(PropList)).

View File

@ -16,14 +16,17 @@
-module(prop_emqx_json).
-import(emqx_json,
[ decode/1
, decode/2
, encode/1
, safe_decode/1
, safe_decode/2
, safe_encode/1
]).
-import(
emqx_json,
[
decode/1,
decode/2,
encode/1,
safe_decode/1,
safe_decode/2,
safe_encode/1
]
).
-include_lib("proper/include/proper.hrl").
@ -32,99 +35,133 @@
%%--------------------------------------------------------------------
prop_json_basic() ->
?FORALL(T, json_basic(),
begin
{ok, J} = safe_encode(T),
{ok, T} = safe_decode(J),
T = decode(encode(T)),
true
end).
?FORALL(
T,
json_basic(),
begin
{ok, J} = safe_encode(T),
{ok, T} = safe_decode(J),
T = decode(encode(T)),
true
end
).
prop_json_basic_atom() ->
?FORALL(T0, latin_atom(),
begin
T = atom_to_binary(T0, utf8),
{ok, J} = safe_encode(T0),
{ok, T} = safe_decode(J),
T = decode(encode(T0)),
true
end).
?FORALL(
T0,
latin_atom(),
begin
T = atom_to_binary(T0, utf8),
{ok, J} = safe_encode(T0),
{ok, T} = safe_decode(J),
T = decode(encode(T0)),
true
end
).
prop_object_proplist_to_proplist() ->
?FORALL(T, json_object(),
begin
{ok, J} = safe_encode(T),
{ok, T} = safe_decode(J),
T = decode(encode(T)),
true
end).
?FORALL(
T,
json_object(),
begin
{ok, J} = safe_encode(T),
{ok, T} = safe_decode(J),
T = decode(encode(T)),
true
end
).
prop_object_map_to_map() ->
?FORALL(T, json_object_map(),
begin
{ok, J} = safe_encode(T),
{ok, T} = safe_decode(J, [return_maps]),
T = decode(encode(T), [return_maps]),
true
end).
?FORALL(
T,
json_object_map(),
begin
{ok, J} = safe_encode(T),
{ok, T} = safe_decode(J, [return_maps]),
T = decode(encode(T), [return_maps]),
true
end
).
%% The duplicated key will be overridden
prop_object_proplist_to_map() ->
?FORALL(T0, json_object(),
begin
T = to_map(T0),
{ok, J} = safe_encode(T0),
{ok, T} = safe_decode(J, [return_maps]),
T = decode(encode(T0), [return_maps]),
true
end).
?FORALL(
T0,
json_object(),
begin
T = to_map(T0),
{ok, J} = safe_encode(T0),
{ok, T} = safe_decode(J, [return_maps]),
T = decode(encode(T0), [return_maps]),
true
end
).
prop_object_map_to_proplist() ->
?FORALL(T0, json_object_map(),
begin
%% jiffy encode a map with descending order, that is,
%% it is opposite with maps traversal sequence
%% see: the `to_list` implementation
T = to_list(T0),
{ok, J} = safe_encode(T0),
{ok, T} = safe_decode(J),
T = decode(encode(T0)),
true
end).
?FORALL(
T0,
json_object_map(),
begin
%% jiffy encode a map with descending order, that is,
%% it is opposite with maps traversal sequence
%% see: the `to_list` implementation
T = to_list(T0),
{ok, J} = safe_encode(T0),
{ok, T} = safe_decode(J),
T = decode(encode(T0)),
true
end
).
prop_safe_encode() ->
?FORALL(T, invalid_json_term(),
begin
{error, _} = safe_encode(T), true
end).
?FORALL(
T,
invalid_json_term(),
begin
{error, _} = safe_encode(T),
true
end
).
prop_safe_decode() ->
?FORALL(T, invalid_json_str(),
begin
{error, _} = safe_decode(T), true
end).
?FORALL(
T,
invalid_json_str(),
begin
{error, _} = safe_decode(T),
true
end
).
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
to_map([{_, _}|_] = L) ->
to_map([{_, _} | _] = L) ->
lists:foldl(
fun({Name, Value}, Acc) ->
Acc#{Name => to_map(Value)}
end, #{}, L);
fun({Name, Value}, Acc) ->
Acc#{Name => to_map(Value)}
end,
#{},
L
);
to_map(L) when is_list(L) ->
[to_map(E) || E <- L];
to_map(T) -> T.
to_map(T) ->
T.
to_list(L) when is_list(L) ->
[to_list(E) || E <- L];
to_list(M) when is_map(M) ->
maps:fold(
fun(K, V, Acc) ->
[{K, to_list(V)}|Acc]
end, [], M);
to_list(T) -> T.
fun(K, V, Acc) ->
[{K, to_list(V)} | Acc]
end,
[],
M
);
to_list(T) ->
T.
%%--------------------------------------------------------------------
%% Generators (https://tools.ietf.org/html/rfc8259)
@ -140,8 +177,14 @@ latin_atom() ->
json_string() -> utf8().
json_object() ->
oneof([json_array_1(), json_object_1(), json_array_object_1(),
json_array_2(), json_object_2(), json_array_object_2()]).
oneof([
json_array_1(),
json_object_1(),
json_array_object_1(),
json_array_2(),
json_object_2(),
json_array_object_2()
]).
json_object_map() ->
?LET(L, json_object(), to_map(L)).
@ -156,9 +199,14 @@ json_object_1() ->
list({json_key(), json_basic()}).
json_object_2() ->
list({json_key(), oneof([json_basic(),
json_array_1(),
json_object_1()])}).
list({
json_key(),
oneof([
json_basic(),
json_array_1(),
json_object_1()
])
}).
json_array_object_1() ->
list(json_object_1()).
@ -186,5 +234,4 @@ chaos(S, 0, _) ->
chaos(S, N, T) ->
I = rand:uniform(length(S)),
{L1, L2} = lists:split(I, S),
chaos(lists:flatten([L1, lists:nth(rand:uniform(length(T)), T), L2]), N-1, T).
chaos(lists:flatten([L1, lists:nth(rand:uniform(length(T)), T), L2]), N - 1, T).

View File

@ -14,31 +14,36 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(prop_emqx_psk).
-include_lib("proper/include/proper.hrl").
-define(ALL(Vars, Types, Exprs),
?SETUP(fun() ->
?SETUP(
fun() ->
State = do_setup(),
fun() -> do_teardown(State) end
end, ?FORALL(Vars, Types, Exprs))).
end,
?FORALL(Vars, Types, Exprs)
)
).
%%--------------------------------------------------------------------
%% Properties
%%--------------------------------------------------------------------
prop_lookup() ->
?ALL({ClientPSKID, UserState},
{client_pskid(), user_state()},
begin
case emqx_tls_psk:lookup(psk, ClientPSKID, UserState) of
{ok, _Result} -> true;
error -> true;
_Other -> false
end
end).
?ALL(
{ClientPSKID, UserState},
{client_pskid(), user_state()},
begin
case emqx_tls_psk:lookup(psk, ClientPSKID, UserState) of
{ok, _Result} -> true;
error -> true;
_Other -> false
end
end
).
%%--------------------------------------------------------------------
%% Helper
@ -47,10 +52,13 @@ prop_lookup() ->
do_setup() ->
ok = emqx_logger:set_log_level(emergency),
ok = meck:new(emqx_hooks, [passthrough, no_history]),
ok = meck:expect(emqx_hooks, run_fold,
fun('tls_handshake.psk_lookup', [ClientPSKID], not_found) ->
unicode:characters_to_binary(ClientPSKID)
end).
ok = meck:expect(
emqx_hooks,
run_fold,
fun('tls_handshake.psk_lookup', [ClientPSKID], not_found) ->
unicode:characters_to_binary(ClientPSKID)
end
).
do_teardown(_) ->
ok = emqx_logger:set_log_level(error),
@ -63,4 +71,3 @@ do_teardown(_) ->
client_pskid() -> oneof([string(), integer(), [1, [-1]]]).
user_state() -> term().

View File

@ -24,20 +24,29 @@
%%--------------------------------------------------------------------
prop_name_text() ->
?FORALL(UnionArgs, union_args(),
is_atom(apply_fun(name, UnionArgs)) andalso
is_binary(apply_fun(text, UnionArgs))).
?FORALL(
UnionArgs,
union_args(),
is_atom(apply_fun(name, UnionArgs)) andalso
is_binary(apply_fun(text, UnionArgs))
).
prop_compat() ->
?FORALL(CompatArgs, compat_args(),
begin
Result = apply_fun(compat, CompatArgs),
is_number(Result) orelse Result =:= undefined
end).
?FORALL(
CompatArgs,
compat_args(),
begin
Result = apply_fun(compat, CompatArgs),
is_number(Result) orelse Result =:= undefined
end
).
prop_connack_error() ->
?FORALL(CONNACK_ERROR_ARGS, connack_error_args(),
is_integer(apply_fun(connack_error, CONNACK_ERROR_ARGS))).
?FORALL(
CONNACK_ERROR_ARGS,
connack_error_args(),
is_integer(apply_fun(connack_error, CONNACK_ERROR_ARGS))
).
%%--------------------------------------------------------------------
%% Helper
@ -51,45 +60,60 @@ apply_fun(Fun, Args) ->
%%--------------------------------------------------------------------
union_args() ->
frequency([{6, [real_mqttv3_rc(), mqttv3_version()]},
{43, [real_mqttv5_rc(), mqttv5_version()]}]).
frequency([
{6, [real_mqttv3_rc(), mqttv3_version()]},
{43, [real_mqttv5_rc(), mqttv5_version()]}
]).
compat_args() ->
frequency([{18, [connack, compat_rc()]},
{2, [suback, compat_rc()]},
{1, [unsuback, compat_rc()]}]).
frequency([
{18, [connack, compat_rc()]},
{2, [suback, compat_rc()]},
{1, [unsuback, compat_rc()]}
]).
connack_error_args() ->
[frequency([{10, connack_error()},
{1, unexpected_connack_error()}])].
[
frequency([
{10, connack_error()},
{1, unexpected_connack_error()}
])
].
connack_error() ->
oneof([client_identifier_not_valid,
bad_username_or_password,
bad_clientid_or_password,
username_or_password_undefined,
password_error,
not_authorized,
server_unavailable,
server_busy,
banned,
bad_authentication_method]).
oneof([
client_identifier_not_valid,
bad_username_or_password,
bad_clientid_or_password,
username_or_password_undefined,
password_error,
not_authorized,
server_unavailable,
server_busy,
banned,
bad_authentication_method
]).
unexpected_connack_error() ->
oneof([who_knows]).
real_mqttv3_rc() ->
frequency([{6, mqttv3_rc()},
{1, unexpected_rc()}]).
frequency([
{6, mqttv3_rc()},
{1, unexpected_rc()}
]).
real_mqttv5_rc() ->
frequency([{43, mqttv5_rc()},
{2, unexpected_rc()}]).
frequency([
{43, mqttv5_rc()},
{2, unexpected_rc()}
]).
compat_rc() ->
frequency([{95, ?SUCHTHAT(RC , mqttv5_rc(), RC >= 16#80 orelse RC =< 2)},
{5, unexpected_rc()}]).
frequency([
{95, ?SUCHTHAT(RC, mqttv5_rc(), RC >= 16#80 orelse RC =< 2)},
{5, unexpected_rc()}
]).
mqttv3_rc() ->
oneof(mqttv3_rcs()).
@ -104,12 +128,51 @@ mqttv3_rcs() ->
[0, 1, 2, 3, 4, 5].
mqttv5_rcs() ->
[16#00, 16#01, 16#02, 16#04, 16#10, 16#11, 16#18, 16#19,
16#80, 16#81, 16#82, 16#83, 16#84, 16#85, 16#86, 16#87,
16#88, 16#89, 16#8A, 16#8B, 16#8C, 16#8D, 16#8E, 16#8F,
16#90, 16#91, 16#92, 16#93, 16#94, 16#95, 16#96, 16#97,
16#98, 16#99, 16#9A, 16#9B, 16#9C, 16#9D, 16#9E, 16#9F,
16#A0, 16#A1, 16#A2].
[
16#00,
16#01,
16#02,
16#04,
16#10,
16#11,
16#18,
16#19,
16#80,
16#81,
16#82,
16#83,
16#84,
16#85,
16#86,
16#87,
16#88,
16#89,
16#8A,
16#8B,
16#8C,
16#8D,
16#8E,
16#8F,
16#90,
16#91,
16#92,
16#93,
16#94,
16#95,
16#96,
16#97,
16#98,
16#99,
16#9A,
16#9B,
16#9C,
16#9D,
16#9E,
16#9F,
16#A0,
16#A1,
16#A2
].
unexpected_rcs() ->
ReasonCodes = mqttv3_rcs() ++ mqttv5_rcs(),

View File

@ -22,64 +22,84 @@
-define(NODENAME, 'test@127.0.0.1').
-define(ALL(Vars, Types, Exprs),
?SETUP(fun() ->
?SETUP(
fun() ->
State = do_setup(),
fun() -> do_teardown(State) end
end, ?FORALL(Vars, Types, Exprs))).
end,
?FORALL(Vars, Types, Exprs)
)
).
%%--------------------------------------------------------------------
%% Properties
%%--------------------------------------------------------------------
prop_node() ->
?ALL(Node0, nodename(),
begin
Node = punch(Node0),
?assert(emqx_rpc:cast(Node, erlang, system_time, [])),
case emqx_rpc:call(Node, erlang, system_time, []) of
{badrpc, _Reason} -> true;
Delivery when is_integer(Delivery) -> true;
_Other -> false
end
end).
?ALL(
Node0,
nodename(),
begin
Node = punch(Node0),
?assert(emqx_rpc:cast(Node, erlang, system_time, [])),
case emqx_rpc:call(Node, erlang, system_time, []) of
{badrpc, _Reason} -> true;
Delivery when is_integer(Delivery) -> true;
_Other -> false
end
end
).
prop_node_with_key() ->
?ALL({Node0, Key}, nodename_with_key(),
begin
Node = punch(Node0),
?assert(emqx_rpc:cast(Key, Node, erlang, system_time, [])),
case emqx_rpc:call(Key, Node, erlang, system_time, []) of
{badrpc, _Reason} -> true;
Delivery when is_integer(Delivery) -> true;
_Other -> false
end
end).
?ALL(
{Node0, Key},
nodename_with_key(),
begin
Node = punch(Node0),
?assert(emqx_rpc:cast(Key, Node, erlang, system_time, [])),
case emqx_rpc:call(Key, Node, erlang, system_time, []) of
{badrpc, _Reason} -> true;
Delivery when is_integer(Delivery) -> true;
_Other -> false
end
end
).
prop_nodes() ->
?ALL(Nodes0, nodesname(),
begin
Nodes = punch(Nodes0),
case emqx_rpc:multicall(Nodes, erlang, system_time, []) of
{RealResults, RealBadNodes}
when is_list(RealResults);
is_list(RealBadNodes) ->
true;
_Other -> false
end
end).
?ALL(
Nodes0,
nodesname(),
begin
Nodes = punch(Nodes0),
case emqx_rpc:multicall(Nodes, erlang, system_time, []) of
{RealResults, RealBadNodes} when
is_list(RealResults);
is_list(RealBadNodes)
->
true;
_Other ->
false
end
end
).
prop_nodes_with_key() ->
?ALL({Nodes0, Key}, nodesname_with_key(),
begin
Nodes = punch(Nodes0),
case emqx_rpc:multicall(Key, Nodes, erlang, system_time, []) of
{RealResults, RealBadNodes}
when is_list(RealResults);
is_list(RealBadNodes) ->
true;
_Other -> false
end
end).
?ALL(
{Nodes0, Key},
nodesname_with_key(),
begin
Nodes = punch(Nodes0),
case emqx_rpc:multicall(Key, Nodes, erlang, system_time, []) of
{RealResults, RealBadNodes} when
is_list(RealResults);
is_list(RealBadNodes)
->
true;
_Other ->
false
end
end
).
%%--------------------------------------------------------------------
%% Helper
@ -91,10 +111,13 @@ do_setup() ->
{ok, _Apps} = application:ensure_all_started(gen_rpc),
ok = application:set_env(gen_rpc, call_receive_timeout, 100),
ok = meck:new(gen_rpc, [passthrough, no_history]),
ok = meck:expect(gen_rpc, multicall,
fun(Nodes, Mod, Fun, Args) ->
gen_rpc:multicall(Nodes, Mod, Fun, Args, 100)
end).
ok = meck:expect(
gen_rpc,
multicall,
fun(Nodes, Mod, Fun, Args) ->
gen_rpc:multicall(Nodes, Mod, Fun, Args, 100)
end
).
do_teardown(_) ->
ok = net_kernel:stop(),
@ -121,22 +144,25 @@ ensure_distributed_nodename() ->
%% Generator
%%--------------------------------------------------------------------
nodename() ->
?LET({NodePrefix, HostName},
{node_prefix(), hostname()},
begin
Node = NodePrefix ++ "@" ++ HostName,
list_to_atom(Node)
end).
?LET(
{NodePrefix, HostName},
{node_prefix(), hostname()},
begin
Node = NodePrefix ++ "@" ++ HostName,
list_to_atom(Node)
end
).
nodename_with_key() ->
?LET({NodePrefix, HostName, Key},
{node_prefix(), hostname(), choose(0, 10)},
begin
Node = NodePrefix ++ "@" ++ HostName,
{list_to_atom(Node), Key}
end).
?LET(
{NodePrefix, HostName, Key},
{node_prefix(), hostname(), choose(0, 10)},
begin
Node = NodePrefix ++ "@" ++ HostName,
{list_to_atom(Node), Key}
end
).
nodesname() ->
oneof([list(nodename()), [node()]]).
@ -163,6 +189,7 @@ hostname() ->
punch(Nodes) when is_list(Nodes) ->
lists:map(fun punch/1, Nodes);
punch('nonode@nohost') ->
node(); %% Equal to ?NODENAME
%% Equal to ?NODENAME
node();
punch(GoodBoy) ->
GoodBoy.

View File

@ -18,41 +18,53 @@
-include_lib("proper/include/proper.hrl").
-export([ initial_state/0
, command/1
, precondition/2
, postcondition/3
, next_state/3
]).
-export([
initial_state/0,
command/1,
precondition/2,
postcondition/3,
next_state/3
]).
-define(mock_modules,
[ emqx_metrics
, emqx_stats
, emqx_broker
, mria_mnesia
, emqx_hooks
]).
-define(mock_modules, [
emqx_metrics,
emqx_stats,
emqx_broker,
mria_mnesia,
emqx_hooks
]).
-define(ALL(Vars, Types, Exprs),
?SETUP(fun() ->
?SETUP(
fun() ->
State = do_setup(),
fun() -> do_teardown(State) end
end, ?FORALL(Vars, Types, Exprs))).
end,
?FORALL(Vars, Types, Exprs)
)
).
%%--------------------------------------------------------------------
%% Properties
%%--------------------------------------------------------------------
prop_sys() ->
?ALL(Cmds, commands(?MODULE),
?ALL(
Cmds,
commands(?MODULE),
begin
{ok, _Pid} = emqx_sys:start_link(),
{History, State, Result} = run_commands(?MODULE, Cmds),
ok = emqx_sys:stop(),
?WHENFAIL(io:format("History: ~p\nState: ~p\nResult: ~p\n",
[History,State,Result]),
aggregate(command_names(Cmds), Result =:= ok))
end).
?WHENFAIL(
io:format(
"History: ~p\nState: ~p\nResult: ~p\n",
[History, State, Result]
),
aggregate(command_names(Cmds), Result =:= ok)
)
end
).
%%--------------------------------------------------------------------
%% Helpers
@ -62,9 +74,15 @@ do_setup() ->
ok = emqx_logger:set_log_level(emergency),
emqx_config:put([sys_topics, sys_msg_interval], 60000),
emqx_config:put([sys_topics, sys_heartbeat_interval], 30000),
emqx_config:put([sys_topics, sys_event_messages],
#{client_connected => true, client_disconnected => true,
client_subscribed => true, client_unsubscribed => true}),
emqx_config:put(
[sys_topics, sys_event_messages],
#{
client_connected => true,
client_disconnected => true,
client_subscribed => true,
client_unsubscribed => true
}
),
[mock(Mod) || Mod <- ?mock_modules],
ok.
@ -78,10 +96,16 @@ mock(Module) ->
do_mock(Module).
do_mock(emqx_broker) ->
meck:expect(emqx_broker, publish,
fun(Msg) -> {node(), <<"test">>, Msg} end),
meck:expect(emqx_broker, safe_publish,
fun(Msg) -> {node(), <<"test">>, Msg} end);
meck:expect(
emqx_broker,
publish,
fun(Msg) -> {node(), <<"test">>, Msg} end
),
meck:expect(
emqx_broker,
safe_publish,
fun(Msg) -> {node(), <<"test">>, Msg} end
);
do_mock(emqx_stats) ->
meck:expect(emqx_stats, getstats, fun() -> [0] end);
do_mock(mria_mnesia) ->
@ -102,16 +126,17 @@ initial_state() ->
%% @doc List of possible commands to run against the system
command(_State) ->
oneof([{call, emqx_sys, info, []},
{call, emqx_sys, version, []},
{call, emqx_sys, uptime, []},
{call, emqx_sys, datetime, []},
{call, emqx_sys, sysdescr, []},
%------------ unexpected message ----------------------%
{call, emqx_sys, handle_call, [emqx_sys, other, state]},
{call, emqx_sys, handle_cast, [emqx_sys, other]},
{call, emqx_sys, handle_info, [info, state]}
]).
oneof([
{call, emqx_sys, info, []},
{call, emqx_sys, version, []},
{call, emqx_sys, uptime, []},
{call, emqx_sys, datetime, []},
{call, emqx_sys, sysdescr, []},
%------------ unexpected message ----------------------%
{call, emqx_sys, handle_call, [emqx_sys, other, state]},
{call, emqx_sys, handle_cast, [emqx_sys, other]},
{call, emqx_sys, handle_info, [info, state]}
]).
precondition(_State, {call, _Mod, _Fun, _Args}) ->
true.