refactor(emqx_retainer): use hierarchical limiter for the flow control
This commit is contained in:
parent
c5d0c8da94
commit
5bf1d77993
|
@ -4,10 +4,7 @@
|
|||
|
||||
limiter {
|
||||
bytes_in {
|
||||
global.rate = infinity # token generation rate
|
||||
zone.default.rate = infinity
|
||||
bucket.default {
|
||||
zone = default
|
||||
aggregated.rate = infinity
|
||||
aggregated.capacity = infinity
|
||||
per_client.rate = infinity
|
||||
|
@ -16,10 +13,7 @@ limiter {
|
|||
}
|
||||
|
||||
message_in {
|
||||
global.rate = infinity
|
||||
zone.default.rate = infinity
|
||||
bucket.default {
|
||||
zone = default
|
||||
aggregated.rate = infinity
|
||||
aggregated.capacity = infinity
|
||||
per_client.rate = infinity
|
||||
|
@ -28,10 +22,7 @@ limiter {
|
|||
}
|
||||
|
||||
connection {
|
||||
global.rate = infinity
|
||||
zone.default.rate = infinity
|
||||
bucket.default {
|
||||
zone = default
|
||||
aggregated.rate = infinity
|
||||
aggregated.capacity = infinity
|
||||
per_client.rate = infinity
|
||||
|
@ -40,10 +31,16 @@ limiter {
|
|||
}
|
||||
|
||||
message_routing {
|
||||
global.rate = infinity
|
||||
zone.default.rate = infinity
|
||||
bucket.default {
|
||||
zone = default
|
||||
aggregated.rate = infinity
|
||||
aggregated.capacity = infinity
|
||||
per_client.rate = infinity
|
||||
per_client.capacity = infinity
|
||||
}
|
||||
}
|
||||
|
||||
shared {
|
||||
bucket.retainer {
|
||||
aggregated.rate = infinity
|
||||
aggregated.capacity = infinity
|
||||
per_client.rate = infinity
|
||||
|
|
|
@ -27,7 +27,8 @@
|
|||
-type limiter_type() :: bytes_in
|
||||
| message_in
|
||||
| connection
|
||||
| message_routing.
|
||||
| message_routing
|
||||
| shared.
|
||||
|
||||
-type bucket_name() :: atom().
|
||||
-type zone_name() :: atom().
|
||||
|
@ -66,22 +67,29 @@ fields(limiter) ->
|
|||
, {message_in, sc(ref(limiter_opts), #{})}
|
||||
, {connection, sc(ref(limiter_opts), #{})}
|
||||
, {message_routing, sc(ref(limiter_opts), #{})}
|
||||
, {shared, sc(ref(shared_limiter_opts),
|
||||
#{description => <<"some functions that do not need to use global and zone scope, them can shared use this type">>})}
|
||||
];
|
||||
|
||||
fields(limiter_opts) ->
|
||||
[ {global, sc(ref(rate_burst), #{})}
|
||||
, {zone, sc(map("zone name", ref(rate_burst)), #{})}
|
||||
[ {global, sc(ref(rate_burst), #{nuallabe => true})}
|
||||
, {zone, sc(map("zone name", ref(rate_burst)), #{nullable => true})}
|
||||
, {bucket, sc(map("bucket_id", ref(bucket)),
|
||||
#{desc => "Token bucket"})}
|
||||
];
|
||||
|
||||
fields(shared_limiter_opts) ->
|
||||
[{bucket, sc(map("bucket_id", ref(bucket)),
|
||||
#{desc => "Token bucket"})}
|
||||
];
|
||||
|
||||
fields(rate_burst) ->
|
||||
[ {rate, sc(rate(), #{})}
|
||||
, {burst, sc(burst_rate(), #{default => "0/0s"})}
|
||||
];
|
||||
|
||||
fields(bucket) ->
|
||||
[ {zone, sc(atom(), #{desc => "The bucket's zone"})}
|
||||
[ {zone, sc(atom(), #{desc => "The bucket's zone", default => default})}
|
||||
, {aggregated, sc(ref(bucket_aggregated), #{})}
|
||||
, {per_client, sc(ref(client_bucket), #{})}
|
||||
];
|
||||
|
|
|
@ -378,11 +378,11 @@ maybe_burst(#{buckets := Buckets,
|
|||
index := Index,
|
||||
zone := Zone} = maps:get(Id, Nodes),
|
||||
case counters:get(Counter, Index) of
|
||||
Any when Any =< 0 ->
|
||||
Group = maps:get(Zone, Groups, []),
|
||||
maps:put(Zone, [Id | Group], Groups);
|
||||
_ ->
|
||||
Groups
|
||||
Any when Any =< 0 ->
|
||||
Group = maps:get(Zone, Groups, []),
|
||||
maps:put(Zone, [Id | Group], Groups);
|
||||
_ ->
|
||||
Groups
|
||||
end
|
||||
end,
|
||||
|
||||
|
@ -451,9 +451,15 @@ dispatch_burst_to_buckets([], _, Alloced, Nodes) ->
|
|||
|
||||
-spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state().
|
||||
init_tree(Type, State) ->
|
||||
#{global := Global,
|
||||
zone := Zone,
|
||||
bucket := Bucket} = emqx:get_config([limiter, Type]),
|
||||
case emqx:get_config([limiter, Type]) of
|
||||
#{global := Global,
|
||||
zone := Zone,
|
||||
bucket := Bucket} -> ok;
|
||||
#{bucket := Bucket} ->
|
||||
Global = default_rate_burst_cfg(),
|
||||
Zone = #{default => default_rate_burst_cfg()},
|
||||
ok
|
||||
end,
|
||||
{Factor, Root} = make_root(Global, Zone),
|
||||
State2 = State#{root := Root},
|
||||
{NodeId, State3} = make_zone(maps:to_list(Zone), Factor, 1, State2),
|
||||
|
@ -592,3 +598,6 @@ get_initial_val(#{initial := Initial,
|
|||
true ->
|
||||
0
|
||||
end.
|
||||
|
||||
default_rate_burst_cfg() ->
|
||||
#{rate => infinity, burst => 0}.
|
||||
|
|
|
@ -59,7 +59,7 @@ limiter {
|
|||
aggregated.capacity = infinity
|
||||
per_client.rate = infinity
|
||||
per_client.capacity = infinity
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
message_routing {
|
||||
|
@ -73,6 +73,15 @@ limiter {
|
|||
per_client.capacity = infinity
|
||||
}
|
||||
}
|
||||
|
||||
shared {
|
||||
bucket.retainer {
|
||||
aggregated.rate = infinity
|
||||
aggregated.capacity = infinity
|
||||
per_client.rate = infinity
|
||||
per_client.capacity = infinity
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
""">>).
|
||||
|
|
|
@ -45,20 +45,20 @@ retainer {
|
|||
## repeat this, until all retianed messages are delivered
|
||||
##
|
||||
flow_control {
|
||||
## The max messages number per read from storage. 0 means no limit
|
||||
## The messages number per read from storage. 0 means no limit
|
||||
##
|
||||
## Default: 0
|
||||
max_read_number = 0
|
||||
batch_read_number = 0
|
||||
|
||||
## The max number of retained message can be delivered in emqx per quota_release_interval.0 means no limit
|
||||
## The number of retained message can be delivered per batch
|
||||
##
|
||||
## Default: 0
|
||||
msg_deliver_quota = 0
|
||||
batch_deliver_number = 0
|
||||
|
||||
## deliver quota reset interval
|
||||
## deliver limiter bucket
|
||||
##
|
||||
## Default: 0s
|
||||
quota_release_interval = 0s
|
||||
limiter_bucket_name = retainer
|
||||
}
|
||||
|
||||
## Maximum retained message size.
|
||||
|
@ -66,11 +66,11 @@ retainer {
|
|||
## Value: Bytes
|
||||
max_payload_size = 1MB
|
||||
|
||||
## Storage connect parameters
|
||||
## Storage backend parameters
|
||||
##
|
||||
## Value: built_in_database
|
||||
##
|
||||
config {
|
||||
backend {
|
||||
|
||||
type = built_in_database
|
||||
|
||||
|
|
|
@ -27,10 +27,10 @@
|
|||
, on_message_publish/2
|
||||
]).
|
||||
|
||||
-export([ dispatch/4
|
||||
, delete_message/2
|
||||
-export([ delete_message/2
|
||||
, store_retained/2
|
||||
, deliver/5]).
|
||||
, get_backend_module/0
|
||||
]).
|
||||
|
||||
-export([ get_expiry_time/1
|
||||
, update_config/1
|
||||
|
@ -54,8 +54,6 @@
|
|||
, context_id := non_neg_integer()
|
||||
, context := undefined | context()
|
||||
, clear_timer := undefined | reference()
|
||||
, release_quota_timer := undefined | reference()
|
||||
, wait_quotas := list()
|
||||
}.
|
||||
|
||||
-define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)).
|
||||
|
@ -116,45 +114,6 @@ on_message_publish(Msg, _) ->
|
|||
start_link() ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
-spec dispatch(context(), pid(), topic(), cursor()) -> ok.
|
||||
dispatch(Context, Pid, Topic, Cursor) ->
|
||||
Mod = get_backend_module(),
|
||||
case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of
|
||||
false ->
|
||||
{ok, Result} = Mod:read_message(Context, Topic),
|
||||
deliver(Result, Context, Pid, Topic, undefined);
|
||||
true ->
|
||||
{ok, Result, NewCursor} = Mod:match_messages(Context, Topic, Cursor),
|
||||
deliver(Result, Context, Pid, Topic, NewCursor)
|
||||
end.
|
||||
|
||||
deliver([], Context, Pid, Topic, Cursor) ->
|
||||
case Cursor of
|
||||
undefined ->
|
||||
ok;
|
||||
_ ->
|
||||
dispatch(Context, Pid, Topic, Cursor)
|
||||
end;
|
||||
deliver(Result, #{context_id := Id} = Context, Pid, Topic, Cursor) ->
|
||||
case erlang:is_process_alive(Pid) of
|
||||
false ->
|
||||
ok;
|
||||
_ ->
|
||||
#{msg_deliver_quota := MaxDeliverNum} = emqx:get_config([retainer, flow_control]),
|
||||
case MaxDeliverNum of
|
||||
0 ->
|
||||
_ = [Pid ! {deliver, Topic, Msg} || Msg <- Result],
|
||||
ok;
|
||||
_ ->
|
||||
case do_deliver(Result, Id, Pid, Topic) of
|
||||
ok ->
|
||||
deliver([], Context, Pid, Topic, Cursor);
|
||||
abort ->
|
||||
ok
|
||||
end
|
||||
end
|
||||
end.
|
||||
|
||||
get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}) ->
|
||||
0;
|
||||
get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}},
|
||||
|
@ -198,7 +157,6 @@ stats_fun() ->
|
|||
|
||||
init([]) ->
|
||||
emqx_conf:add_handler([retainer], ?MODULE),
|
||||
init_shared_context(),
|
||||
State = new_state(),
|
||||
#{enable := Enable} = Cfg = emqx:get_config([retainer]),
|
||||
{ok,
|
||||
|
@ -213,9 +171,6 @@ handle_call({update_config, NewConf, OldConf}, _, State) ->
|
|||
State2 = update_config(State, NewConf, OldConf),
|
||||
{reply, ok, State2};
|
||||
|
||||
handle_call({wait_semaphore, Id}, From, #{wait_quotas := Waits} = State) ->
|
||||
{noreply, State#{wait_quotas := [{Id, From} | Waits]}};
|
||||
|
||||
handle_call(clean, _, #{context := Context} = State) ->
|
||||
clean(Context),
|
||||
{reply, ok, State};
|
||||
|
@ -249,30 +204,12 @@ handle_info(clear_expired, #{context := Context} = State) ->
|
|||
Interval = emqx_conf:get([retainer, msg_clear_interval], ?DEF_EXPIRY_INTERVAL),
|
||||
{noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate};
|
||||
|
||||
handle_info(release_deliver_quota, #{context := Context, wait_quotas := Waits} = State) ->
|
||||
insert_shared_context(?DELIVER_SEMAPHORE, get_msg_deliver_quota()),
|
||||
case Waits of
|
||||
[] ->
|
||||
ok;
|
||||
_ ->
|
||||
#{context_id := NowId} = Context,
|
||||
Waits2 = lists:reverse(Waits),
|
||||
lists:foreach(fun({Id, From}) ->
|
||||
gen_server:reply(From, Id =:= NowId)
|
||||
end,
|
||||
Waits2)
|
||||
end,
|
||||
Interval = emqx:get_config([retainer, flow_control, quota_release_interval]),
|
||||
{noreply, State#{release_quota_timer := add_timer(Interval, release_deliver_quota),
|
||||
wait_quotas := []}};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, #{clear_timer := TRef1, release_quota_timer := TRef2}) ->
|
||||
_ = stop_timer(TRef1),
|
||||
_ = stop_timer(TRef2),
|
||||
terminate(_Reason, #{clear_timer := ClearTimer}) ->
|
||||
_ = stop_timer(ClearTimer),
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
|
@ -286,22 +223,19 @@ new_state() ->
|
|||
#{enable => false,
|
||||
context_id => 0,
|
||||
context => undefined,
|
||||
clear_timer => undefined,
|
||||
release_quota_timer => undefined,
|
||||
wait_quotas => []}.
|
||||
clear_timer => undefined
|
||||
}.
|
||||
|
||||
-spec new_context(pos_integer()) -> context().
|
||||
new_context(Id) ->
|
||||
#{context_id => Id}.
|
||||
|
||||
|
||||
payload_size_limit() ->
|
||||
emqx_conf:get(?MAX_PAYLOAD_SIZE_CONFIG_PATH, ?DEF_MAX_PAYLOAD_SIZE).
|
||||
|
||||
%% @private
|
||||
dispatch(Context, Topic) ->
|
||||
emqx_retainer_pool:async_submit(fun ?MODULE:dispatch/4,
|
||||
[Context, self(), Topic, undefined]).
|
||||
emqx_retainer_dispatcher:dispatch(Context, Topic).
|
||||
|
||||
-spec delete_message(context(), topic()) -> ok.
|
||||
delete_message(Context, Topic) ->
|
||||
|
@ -328,53 +262,6 @@ clean(Context) ->
|
|||
Mod = get_backend_module(),
|
||||
Mod:clean(Context).
|
||||
|
||||
-spec do_deliver(list(term()), pos_integer(), pid(), topic()) -> ok | abort.
|
||||
do_deliver([Msg | T], Id, Pid, Topic) ->
|
||||
case require_semaphore(?DELIVER_SEMAPHORE, Id) of
|
||||
true ->
|
||||
Pid ! {deliver, Topic, Msg},
|
||||
do_deliver(T, Id, Pid, Topic);
|
||||
_ ->
|
||||
abort
|
||||
end;
|
||||
do_deliver([], _, _, _) ->
|
||||
ok.
|
||||
|
||||
-spec require_semaphore(semaphore(), pos_integer()) -> boolean().
|
||||
require_semaphore(Semaphore, Id) ->
|
||||
Remained = ets:update_counter(?SHARED_CONTEXT_TAB,
|
||||
Semaphore,
|
||||
{#shared_context.value, -1, -1, -1}),
|
||||
wait_semaphore(Remained, Id).
|
||||
|
||||
-spec wait_semaphore(non_neg_integer(), pos_integer()) -> boolean().
|
||||
wait_semaphore(X, Id) when X < 0 ->
|
||||
call({?FUNCTION_NAME, Id});
|
||||
wait_semaphore(_, _) ->
|
||||
true.
|
||||
|
||||
-spec init_shared_context() -> ok.
|
||||
init_shared_context() ->
|
||||
?SHARED_CONTEXT_TAB = ets:new(?SHARED_CONTEXT_TAB,
|
||||
[ set, named_table, public
|
||||
, {keypos, #shared_context.key}
|
||||
, {write_concurrency, true}
|
||||
, {read_concurrency, true}]),
|
||||
lists:foreach(fun({K, V}) ->
|
||||
insert_shared_context(K, V)
|
||||
end,
|
||||
[{?DELIVER_SEMAPHORE, get_msg_deliver_quota()}]).
|
||||
|
||||
|
||||
-spec insert_shared_context(shared_context_key(), term()) -> ok.
|
||||
insert_shared_context(Key, Term) ->
|
||||
ets:insert(?SHARED_CONTEXT_TAB, #shared_context{key = Key, value = Term}),
|
||||
ok.
|
||||
|
||||
-spec get_msg_deliver_quota() -> non_neg_integer().
|
||||
get_msg_deliver_quota() ->
|
||||
emqx:get_config([retainer, flow_control, msg_deliver_quota]).
|
||||
|
||||
-spec update_config(state(), hocons:config(), hocons:config()) -> state().
|
||||
update_config(State, Conf, OldConf) ->
|
||||
update_config(maps:get(enable, Conf),
|
||||
|
@ -391,24 +278,19 @@ update_config(true, false, State, NewConf, _) ->
|
|||
enable_retainer(State, NewConf);
|
||||
|
||||
update_config(true, true,
|
||||
#{clear_timer := ClearTimer,
|
||||
release_quota_timer := QuotaTimer} = State, NewConf, OldConf) ->
|
||||
#{config := Cfg,
|
||||
flow_control := #{quota_release_interval := QuotaInterval},
|
||||
#{clear_timer := ClearTimer} = State, NewConf, OldConf) ->
|
||||
#{backend := BackendCfg,
|
||||
msg_clear_interval := ClearInterval} = NewConf,
|
||||
|
||||
#{config := OldCfg} = OldConf,
|
||||
#{backend := OldBackendCfg} = OldConf,
|
||||
|
||||
StorageType = maps:get(type, Cfg),
|
||||
OldStrorageType = maps:get(type, OldCfg),
|
||||
StorageType = maps:get(type, BackendCfg),
|
||||
OldStrorageType = maps:get(type, OldBackendCfg),
|
||||
case OldStrorageType of
|
||||
StorageType ->
|
||||
State#{clear_timer := check_timer(ClearTimer,
|
||||
ClearInterval,
|
||||
clear_expired),
|
||||
release_quota_timer := check_timer(QuotaTimer,
|
||||
QuotaInterval,
|
||||
release_deliver_quota)};
|
||||
clear_expired)};
|
||||
_ ->
|
||||
State2 = disable_retainer(State),
|
||||
enable_retainer(State2, NewConf)
|
||||
|
@ -417,29 +299,23 @@ update_config(true, true,
|
|||
-spec enable_retainer(state(), hocon:config()) -> state().
|
||||
enable_retainer(#{context_id := ContextId} = State,
|
||||
#{msg_clear_interval := ClearInterval,
|
||||
flow_control := #{quota_release_interval := ReleaseInterval},
|
||||
config := Config}) ->
|
||||
backend := BackendCfg}) ->
|
||||
NewContextId = ContextId + 1,
|
||||
Context = create_resource(new_context(NewContextId), Config),
|
||||
Context = create_resource(new_context(NewContextId), BackendCfg),
|
||||
load(Context),
|
||||
State#{enable := true,
|
||||
context_id := NewContextId,
|
||||
context := Context,
|
||||
clear_timer := add_timer(ClearInterval, clear_expired),
|
||||
release_quota_timer := add_timer(ReleaseInterval, release_deliver_quota)}.
|
||||
clear_timer := add_timer(ClearInterval, clear_expired)}.
|
||||
|
||||
-spec disable_retainer(state()) -> state().
|
||||
disable_retainer(#{clear_timer := TRef1,
|
||||
release_quota_timer := TRef2,
|
||||
context := Context,
|
||||
wait_quotas := Waits} = State) ->
|
||||
disable_retainer(#{clear_timer := ClearTimer,
|
||||
context := Context} = State) ->
|
||||
unload(),
|
||||
ok = lists:foreach(fun(E) -> gen_server:reply(E, false) end, Waits),
|
||||
ok = close_resource(Context),
|
||||
State#{enable := false,
|
||||
clear_timer := stop_timer(TRef1),
|
||||
release_quota_timer := stop_timer(TRef2),
|
||||
wait_quotas := []}.
|
||||
clear_timer := stop_timer(ClearTimer)
|
||||
}.
|
||||
|
||||
-spec stop_timer(undefined | reference()) -> undefined.
|
||||
stop_timer(undefined) ->
|
||||
|
@ -466,7 +342,7 @@ check_timer(Timer, _, _) ->
|
|||
|
||||
-spec get_backend_module() -> backend().
|
||||
get_backend_module() ->
|
||||
ModName = case emqx:get_config([retainer, config]) of
|
||||
ModName = case emqx:get_config([retainer, backend]) of
|
||||
#{type := built_in_database} -> mnesia;
|
||||
#{type := Backend} -> Backend
|
||||
end,
|
||||
|
|
|
@ -14,27 +14,41 @@
|
|||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_retainer_pool).
|
||||
-module(emqx_retainer_dispatcher).
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-include("emqx_retainer.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
%% API
|
||||
-export([start_link/2,
|
||||
async_submit/2]).
|
||||
-export([start_link/2
|
||||
, dispatch/2
|
||||
, refresh_limiter/0
|
||||
]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
terminate/2, code_change/3, format_status/2]).
|
||||
|
||||
-type limiter() :: emqx_htb_limiter:limiter().
|
||||
|
||||
-define(POOL, ?MODULE).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
async_submit(Fun, Args) ->
|
||||
cast({async_submit, {Fun, Args}}).
|
||||
dispatch(Context, Topic) ->
|
||||
cast({?FUNCTION_NAME, Context, self(), Topic}).
|
||||
|
||||
%% sometimes it is necessary to reset the client's limiter after updated the limiter's config
|
||||
%% an limiter update handler maybe added later, now this is a workaround
|
||||
refresh_limiter() ->
|
||||
Workers = gproc_pool:active_workers(?POOL),
|
||||
lists:foreach(fun({_, Pid}) ->
|
||||
gen_server:cast(Pid, ?FUNCTION_NAME)
|
||||
end,
|
||||
Workers).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
|
@ -66,7 +80,9 @@ start_link(Pool, Id) ->
|
|||
ignore.
|
||||
init([Pool, Id]) ->
|
||||
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
|
||||
{ok, #{pool => Pool, id => Id}}.
|
||||
Bucket = emqx:get_config([retainer, flow_control, limiter_bucket_name]),
|
||||
Limiter = emqx_limiter_server:connect(shared, Bucket),
|
||||
{ok, #{pool => Pool, id => Id, limiter => Limiter}}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @private
|
||||
|
@ -98,12 +114,14 @@ handle_call(Req, _From, State) ->
|
|||
{noreply, NewState :: term(), Timeout :: timeout()} |
|
||||
{noreply, NewState :: term(), hibernate} |
|
||||
{stop, Reason :: term(), NewState :: term()}.
|
||||
handle_cast({async_submit, Task}, State) ->
|
||||
try run(Task)
|
||||
catch _:Error:Stacktrace ->
|
||||
?SLOG(error, #{msg => "crashed_handling_async_task", exception => Error, stacktrace => Stacktrace})
|
||||
end,
|
||||
{noreply, State};
|
||||
handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) ->
|
||||
{ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter),
|
||||
{noreply, State#{limiter := Limiter2}};
|
||||
|
||||
handle_cast(refresh_limiter, State) ->
|
||||
Bucket = emqx:get_config([retainer, flow_control, limiter_bucket_name]),
|
||||
Limiter = emqx_limiter_server:connect(shared, Bucket),
|
||||
{noreply, State#{limiter := Limiter}};
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
||||
|
@ -174,9 +192,74 @@ cast(Msg) ->
|
|||
worker() ->
|
||||
gproc_pool:pick_worker(?POOL, self()).
|
||||
|
||||
run({M, F, A}) ->
|
||||
erlang:apply(M, F, A);
|
||||
run({F, A}) when is_function(F), is_list(A) ->
|
||||
erlang:apply(F, A);
|
||||
run(Fun) when is_function(Fun) ->
|
||||
Fun().
|
||||
-spec dispatch(context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}.
|
||||
dispatch(Context, Pid, Topic, Cursor, Limiter) ->
|
||||
Mod = emqx_retainer:get_backend_module(),
|
||||
case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of
|
||||
false ->
|
||||
{ok, Result} = Mod:read_message(Context, Topic),
|
||||
deliver(Result, Context, Pid, Topic, undefined, Limiter);
|
||||
true ->
|
||||
{ok, Result, NewCursor} = Mod:match_messages(Context, Topic, Cursor),
|
||||
deliver(Result, Context, Pid, Topic, NewCursor, Limiter)
|
||||
end.
|
||||
|
||||
-spec deliver(list(emqx_types:message()), context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}.
|
||||
deliver([], _Context, _Pid, _Topic, undefined, Limiter) ->
|
||||
{ok, Limiter};
|
||||
|
||||
deliver([], Context, Pid, Topic, Cursor, Limiter) ->
|
||||
dispatch(Context, Pid, Topic, Cursor, Limiter);
|
||||
|
||||
deliver(Result, Context, Pid, Topic, Cursor, Limiter) ->
|
||||
case erlang:is_process_alive(Pid) of
|
||||
false ->
|
||||
{ok, Limiter};
|
||||
_ ->
|
||||
DeliverNum = emqx:get_config([retainer, flow_control, batch_deliver_number]),
|
||||
case DeliverNum of
|
||||
0 ->
|
||||
do_deliver(Result, Pid, Topic),
|
||||
{ok, Limiter};
|
||||
_ ->
|
||||
case do_deliver(Result, DeliverNum, Pid, Topic, Limiter) of
|
||||
{ok, Limiter2} ->
|
||||
deliver([], Context, Pid, Topic, Cursor, Limiter2);
|
||||
{drop, Limiter2} ->
|
||||
{ok, Limiter2}
|
||||
end
|
||||
end
|
||||
end.
|
||||
|
||||
do_deliver([], _DeliverNum, _Pid, _Topic, Limiter) ->
|
||||
{ok, Limiter};
|
||||
|
||||
do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) ->
|
||||
{Num, ToDelivers, Msgs2} = safe_split(DeliverNum, Msgs),
|
||||
case emqx_htb_limiter:consume(Num, Limiter) of
|
||||
{ok, Limiter2} ->
|
||||
do_deliver(ToDelivers, Pid, Topic),
|
||||
do_deliver(Msgs2, DeliverNum, Pid, Topic, Limiter2);
|
||||
{drop, _} = Drop ->
|
||||
?SLOG(error, #{msg => "the retainer deliver failed because the required quota could not be obtained"}),
|
||||
Drop
|
||||
end.
|
||||
|
||||
do_deliver([Msg | T], Pid, Topic) ->
|
||||
Pid ! {deliver, Topic, Msg},
|
||||
do_deliver(T, Pid, Topic);
|
||||
|
||||
do_deliver([], _, _) ->
|
||||
ok.
|
||||
|
||||
safe_split(N, List) ->
|
||||
safe_split(N, List, 0, []).
|
||||
|
||||
safe_split(0, List, Count, Acc) ->
|
||||
{Count, lists:reverse(Acc), List};
|
||||
|
||||
safe_split(_N, [], Count, Acc) ->
|
||||
{Count, lists:reverse(Acc), []};
|
||||
|
||||
safe_split(N, [H | T], Count, Acc) ->
|
||||
safe_split(N - 1, T, Count + 1, [H | Acc]).
|
|
@ -144,17 +144,17 @@ page_read(_, Topic, Page, Limit) ->
|
|||
{ok, Rows}.
|
||||
|
||||
match_messages(_, Topic, Cursor) ->
|
||||
MaxReadNum = emqx:get_config([retainer, flow_control, max_read_number]),
|
||||
BatchReadNum = emqx:get_config([retainer, flow_control, batch_read_number]),
|
||||
case Cursor of
|
||||
undefined ->
|
||||
case MaxReadNum of
|
||||
case BatchReadNum of
|
||||
0 ->
|
||||
{ok, sort_retained(match_messages(Topic)), undefined};
|
||||
_ ->
|
||||
start_batch_read(Topic, MaxReadNum)
|
||||
start_batch_read(Topic, BatchReadNum)
|
||||
end;
|
||||
_ ->
|
||||
batch_read_messages(Cursor, MaxReadNum)
|
||||
batch_read_messages(Cursor, BatchReadNum)
|
||||
end.
|
||||
|
||||
clean(_) ->
|
||||
|
@ -253,7 +253,7 @@ make_cursor(Topic) ->
|
|||
|
||||
-spec is_table_full() -> boolean().
|
||||
is_table_full() ->
|
||||
#{max_retained_messages := Limit} = emqx:get_config([retainer, config]),
|
||||
Limit = emqx:get_config([retainer, backend, max_retained_messages]),
|
||||
Limit > 0 andalso (table_size() >= Limit).
|
||||
|
||||
-spec table_size() -> non_neg_integer().
|
||||
|
|
|
@ -17,7 +17,7 @@ fields("retainer") ->
|
|||
, {flow_control, ?TYPE(hoconsc:ref(?MODULE, flow_control))}
|
||||
, {max_payload_size, sc(emqx_schema:bytesize(), "1MB")}
|
||||
, {stop_publish_clear_msg, sc(boolean(), false)}
|
||||
, {config, config()}
|
||||
, {backend, backend_config()}
|
||||
];
|
||||
|
||||
fields(mnesia_config) ->
|
||||
|
@ -27,9 +27,9 @@ fields(mnesia_config) ->
|
|||
];
|
||||
|
||||
fields(flow_control) ->
|
||||
[ {max_read_number, sc(integer(), 0, fun is_pos_integer/1)}
|
||||
, {msg_deliver_quota, sc(integer(), 0, fun is_pos_integer/1)}
|
||||
, {quota_release_interval, sc(emqx_schema:duration_ms(), "0ms")}
|
||||
[ {batch_read_number, sc(integer(), 0, fun is_pos_integer/1)}
|
||||
, {batch_deliver_number, sc(range(0, 50), 0)}
|
||||
, {limiter_bucket_name, sc(atom(), retainer)}
|
||||
].
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -45,5 +45,5 @@ sc(Type, Default, Validator) ->
|
|||
is_pos_integer(V) ->
|
||||
V >= 0.
|
||||
|
||||
config() ->
|
||||
backend_config() ->
|
||||
#{type => hoconsc:union([hoconsc:ref(?MODULE, mnesia_config)])}.
|
||||
|
|
|
@ -26,8 +26,8 @@ start_link() ->
|
|||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
init([]) ->
|
||||
PoolSpec = emqx_pool_sup:spec([emqx_retainer_pool, hash, emqx_vm:schedulers(),
|
||||
{emqx_retainer_pool, start_link, []}]),
|
||||
PoolSpec = emqx_pool_sup:spec([emqx_retainer_dispatcher, hash, emqx_vm:schedulers(),
|
||||
{emqx_retainer_dispatcher, start_link, []}]),
|
||||
{ok, {{one_for_one, 10, 3600},
|
||||
[#{id => retainer,
|
||||
start => {emqx_retainer, start_link, []},
|
||||
|
|
|
@ -34,16 +34,16 @@ retainer {
|
|||
msg_expiry_interval = 0s
|
||||
max_payload_size = 1MB
|
||||
flow_control {
|
||||
max_read_number = 0
|
||||
msg_deliver_quota = 0
|
||||
quota_release_interval = 0s
|
||||
}
|
||||
config {
|
||||
batch_read_number = 0
|
||||
batch_deliver_number = 0
|
||||
limiter_bucket_name = retainer
|
||||
}
|
||||
backend {
|
||||
type = built_in_database
|
||||
storage_type = ram
|
||||
max_retained_messages = 0
|
||||
}
|
||||
}""">>).
|
||||
|
||||
}""">>).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Setups
|
||||
|
@ -57,7 +57,8 @@ init_per_suite(Config) ->
|
|||
meck:expect(emqx_alarm, activate, 3, ok),
|
||||
meck:expect(emqx_alarm, deactivate, 3, ok),
|
||||
|
||||
ok = emqx_common_test_helpers:load_config(emqx_retainer_schema, ?BASE_CONF),
|
||||
base_conf(),
|
||||
emqx_ratelimiter_SUITE:base_conf(),
|
||||
emqx_common_test_helpers:start_apps([emqx_retainer]),
|
||||
Config.
|
||||
|
||||
|
@ -83,6 +84,9 @@ end_per_testcase(_, Config) ->
|
|||
end,
|
||||
Config.
|
||||
|
||||
base_conf() ->
|
||||
ok = emqx_common_test_helpers:load_config(emqx_retainer_schema, ?BASE_CONF).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Test Cases
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -282,10 +286,20 @@ t_stop_publish_clear_msg(_) ->
|
|||
ok = emqtt:disconnect(C1).
|
||||
|
||||
t_flow_control(_) ->
|
||||
#{per_client := PerClient} = RetainerCfg = emqx_config:get([limiter, shared, bucket, retainer]),
|
||||
RetainerCfg2 = RetainerCfg#{per_client := PerClient#{rate := emqx_ratelimiter_SUITE:to_rate("1/1s"),
|
||||
capacity := 1}},
|
||||
emqx_config:put([limiter, shared, bucket, retainer], RetainerCfg2),
|
||||
emqx_limiter_manager:restart_server(shared),
|
||||
timer:sleep(500),
|
||||
|
||||
emqx_retainer_dispatcher:refresh_limiter(),
|
||||
timer:sleep(500),
|
||||
|
||||
emqx_retainer:update_config(#{<<"flow_control">> =>
|
||||
#{<<"max_read_number">> => 1,
|
||||
<<"msg_deliver_quota">> => 1,
|
||||
<<"quota_release_interval">> => <<"1s">>}}),
|
||||
#{<<"batch_read_number">> => 1,
|
||||
<<"batch_deliver_number">> => 1,
|
||||
<<"limiter_bucket_name">> => retainer}}),
|
||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
emqtt:publish(
|
||||
|
@ -309,11 +323,19 @@ t_flow_control(_) ->
|
|||
End = erlang:system_time(millisecond),
|
||||
Diff = End - Begin,
|
||||
|
||||
%% msg_deliver_quota = 1 and quota_release_interval = 1, and there has three message
|
||||
%% so total wait time is between in 1 ~ 2s(may be timer will delay, so plus 0.5s to maximum)
|
||||
?assert(Diff > timer:seconds(1) andalso Diff < timer:seconds(2.5)),
|
||||
?assert(Diff > timer:seconds(2.5) andalso Diff < timer:seconds(3.9),
|
||||
lists:flatten(io_lib:format("Diff is :~p~n", [Diff]))),
|
||||
|
||||
ok = emqtt:disconnect(C1).
|
||||
ok = emqtt:disconnect(C1),
|
||||
|
||||
%% recover the limiter
|
||||
emqx_config:put([limiter, shared, bucket, retainer], RetainerCfg),
|
||||
emqx_limiter_manager:restart_server(shared),
|
||||
timer:sleep(500),
|
||||
|
||||
emqx_retainer_dispatcher:refresh_limiter(),
|
||||
timer:sleep(500),
|
||||
ok.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helper functions
|
||||
|
|
|
@ -21,28 +21,10 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(BASE_CONF, <<"""
|
||||
retainer {
|
||||
enable = true
|
||||
msg_clear_interval = 0s
|
||||
msg_expiry_interval = 0s
|
||||
max_payload_size = 1MB
|
||||
flow_control {
|
||||
max_read_number = 0
|
||||
msg_deliver_quota = 0
|
||||
quota_release_interval = 0s
|
||||
}
|
||||
config {
|
||||
type = built_in_database
|
||||
storage_type = ram
|
||||
max_retained_messages = 0
|
||||
}
|
||||
}""">>).
|
||||
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
ok = emqx_common_test_helpers:load_config(emqx_retainer_schema, ?BASE_CONF),
|
||||
emqx_retainer_SUITE:base_conf(),
|
||||
%% Meck emqtt
|
||||
ok = meck:new(emqtt, [non_strict, passthrough, no_history, no_link]),
|
||||
%% Start Apps
|
||||
|
|
Loading…
Reference in New Issue