From 5bf1d77993320c4418330b06ba3d566df5a11316 Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 24 Feb 2022 18:06:40 +0800 Subject: [PATCH] refactor(emqx_retainer): use hierarchical limiter for the flow control --- .../src/emqx_limiter/etc/emqx_limiter.conf | 21 +-- .../emqx_limiter/src/emqx_limiter_schema.erl | 16 +- .../emqx_limiter/src/emqx_limiter_server.erl | 25 ++- apps/emqx/test/emqx_ratelimiter_SUITE.erl | 11 +- apps/emqx_retainer/etc/emqx_retainer.conf | 16 +- apps/emqx_retainer/src/emqx_retainer.erl | 168 +++--------------- ..._pool.erl => emqx_retainer_dispatcher.erl} | 119 +++++++++++-- .../src/emqx_retainer_mnesia.erl | 10 +- .../src/emqx_retainer_schema.erl | 10 +- apps/emqx_retainer/src/emqx_retainer_sup.erl | 4 +- .../test/emqx_retainer_SUITE.erl | 52 ++++-- .../test/emqx_retainer_mqtt_v5_SUITE.erl | 20 +-- 12 files changed, 229 insertions(+), 243 deletions(-) rename apps/emqx_retainer/src/{emqx_retainer_pool.erl => emqx_retainer_dispatcher.erl} (62%) diff --git a/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf b/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf index 0d28fb106..2f57d6bc6 100644 --- a/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf +++ b/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf @@ -4,10 +4,7 @@ limiter { bytes_in { - global.rate = infinity # token generation rate - zone.default.rate = infinity bucket.default { - zone = default aggregated.rate = infinity aggregated.capacity = infinity per_client.rate = infinity @@ -16,10 +13,7 @@ limiter { } message_in { - global.rate = infinity - zone.default.rate = infinity bucket.default { - zone = default aggregated.rate = infinity aggregated.capacity = infinity per_client.rate = infinity @@ -28,10 +22,7 @@ limiter { } connection { - global.rate = infinity - zone.default.rate = infinity bucket.default { - zone = default aggregated.rate = infinity aggregated.capacity = infinity per_client.rate = infinity @@ -40,10 +31,16 @@ limiter { } message_routing { - global.rate = infinity - zone.default.rate = infinity bucket.default { - zone = default + aggregated.rate = infinity + aggregated.capacity = infinity + per_client.rate = infinity + per_client.capacity = infinity + } + } + + shared { + bucket.retainer { aggregated.rate = infinity aggregated.capacity = infinity per_client.rate = infinity diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index 666864ca0..0bf0fb13b 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -27,7 +27,8 @@ -type limiter_type() :: bytes_in | message_in | connection - | message_routing. + | message_routing + | shared. -type bucket_name() :: atom(). -type zone_name() :: atom(). @@ -66,22 +67,29 @@ fields(limiter) -> , {message_in, sc(ref(limiter_opts), #{})} , {connection, sc(ref(limiter_opts), #{})} , {message_routing, sc(ref(limiter_opts), #{})} + , {shared, sc(ref(shared_limiter_opts), + #{description => <<"some functions that do not need to use global and zone scope, them can shared use this type">>})} ]; fields(limiter_opts) -> - [ {global, sc(ref(rate_burst), #{})} - , {zone, sc(map("zone name", ref(rate_burst)), #{})} + [ {global, sc(ref(rate_burst), #{nuallabe => true})} + , {zone, sc(map("zone name", ref(rate_burst)), #{nullable => true})} , {bucket, sc(map("bucket_id", ref(bucket)), #{desc => "Token bucket"})} ]; +fields(shared_limiter_opts) -> + [{bucket, sc(map("bucket_id", ref(bucket)), + #{desc => "Token bucket"})} + ]; + fields(rate_burst) -> [ {rate, sc(rate(), #{})} , {burst, sc(burst_rate(), #{default => "0/0s"})} ]; fields(bucket) -> - [ {zone, sc(atom(), #{desc => "The bucket's zone"})} + [ {zone, sc(atom(), #{desc => "The bucket's zone", default => default})} , {aggregated, sc(ref(bucket_aggregated), #{})} , {per_client, sc(ref(client_bucket), #{})} ]; diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index 4f19e0a24..c9984cd1a 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -378,11 +378,11 @@ maybe_burst(#{buckets := Buckets, index := Index, zone := Zone} = maps:get(Id, Nodes), case counters:get(Counter, Index) of - Any when Any =< 0 -> - Group = maps:get(Zone, Groups, []), - maps:put(Zone, [Id | Group], Groups); - _ -> - Groups + Any when Any =< 0 -> + Group = maps:get(Zone, Groups, []), + maps:put(Zone, [Id | Group], Groups); + _ -> + Groups end end, @@ -451,9 +451,15 @@ dispatch_burst_to_buckets([], _, Alloced, Nodes) -> -spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state(). init_tree(Type, State) -> - #{global := Global, - zone := Zone, - bucket := Bucket} = emqx:get_config([limiter, Type]), + case emqx:get_config([limiter, Type]) of + #{global := Global, + zone := Zone, + bucket := Bucket} -> ok; + #{bucket := Bucket} -> + Global = default_rate_burst_cfg(), + Zone = #{default => default_rate_burst_cfg()}, + ok + end, {Factor, Root} = make_root(Global, Zone), State2 = State#{root := Root}, {NodeId, State3} = make_zone(maps:to_list(Zone), Factor, 1, State2), @@ -592,3 +598,6 @@ get_initial_val(#{initial := Initial, true -> 0 end. + +default_rate_burst_cfg() -> + #{rate => infinity, burst => 0}. diff --git a/apps/emqx/test/emqx_ratelimiter_SUITE.erl b/apps/emqx/test/emqx_ratelimiter_SUITE.erl index 60616c5d5..ea2e31700 100644 --- a/apps/emqx/test/emqx_ratelimiter_SUITE.erl +++ b/apps/emqx/test/emqx_ratelimiter_SUITE.erl @@ -59,7 +59,7 @@ limiter { aggregated.capacity = infinity per_client.rate = infinity per_client.capacity = infinity - } + } } message_routing { @@ -73,6 +73,15 @@ limiter { per_client.capacity = infinity } } + + shared { + bucket.retainer { + aggregated.rate = infinity + aggregated.capacity = infinity + per_client.rate = infinity + per_client.capacity = infinity + } + } } """>>). diff --git a/apps/emqx_retainer/etc/emqx_retainer.conf b/apps/emqx_retainer/etc/emqx_retainer.conf index 18643d7cf..2bec041c2 100644 --- a/apps/emqx_retainer/etc/emqx_retainer.conf +++ b/apps/emqx_retainer/etc/emqx_retainer.conf @@ -45,20 +45,20 @@ retainer { ## repeat this, until all retianed messages are delivered ## flow_control { - ## The max messages number per read from storage. 0 means no limit + ## The messages number per read from storage. 0 means no limit ## ## Default: 0 - max_read_number = 0 + batch_read_number = 0 - ## The max number of retained message can be delivered in emqx per quota_release_interval.0 means no limit + ## The number of retained message can be delivered per batch ## ## Default: 0 - msg_deliver_quota = 0 + batch_deliver_number = 0 - ## deliver quota reset interval + ## deliver limiter bucket ## ## Default: 0s - quota_release_interval = 0s + limiter_bucket_name = retainer } ## Maximum retained message size. @@ -66,11 +66,11 @@ retainer { ## Value: Bytes max_payload_size = 1MB - ## Storage connect parameters + ## Storage backend parameters ## ## Value: built_in_database ## - config { + backend { type = built_in_database diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 97333f022..f55675f88 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -27,10 +27,10 @@ , on_message_publish/2 ]). --export([ dispatch/4 - , delete_message/2 +-export([ delete_message/2 , store_retained/2 - , deliver/5]). + , get_backend_module/0 + ]). -export([ get_expiry_time/1 , update_config/1 @@ -54,8 +54,6 @@ , context_id := non_neg_integer() , context := undefined | context() , clear_timer := undefined | reference() - , release_quota_timer := undefined | reference() - , wait_quotas := list() }. -define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)). @@ -116,45 +114,6 @@ on_message_publish(Msg, _) -> start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). --spec dispatch(context(), pid(), topic(), cursor()) -> ok. -dispatch(Context, Pid, Topic, Cursor) -> - Mod = get_backend_module(), - case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of - false -> - {ok, Result} = Mod:read_message(Context, Topic), - deliver(Result, Context, Pid, Topic, undefined); - true -> - {ok, Result, NewCursor} = Mod:match_messages(Context, Topic, Cursor), - deliver(Result, Context, Pid, Topic, NewCursor) - end. - -deliver([], Context, Pid, Topic, Cursor) -> - case Cursor of - undefined -> - ok; - _ -> - dispatch(Context, Pid, Topic, Cursor) - end; -deliver(Result, #{context_id := Id} = Context, Pid, Topic, Cursor) -> - case erlang:is_process_alive(Pid) of - false -> - ok; - _ -> - #{msg_deliver_quota := MaxDeliverNum} = emqx:get_config([retainer, flow_control]), - case MaxDeliverNum of - 0 -> - _ = [Pid ! {deliver, Topic, Msg} || Msg <- Result], - ok; - _ -> - case do_deliver(Result, Id, Pid, Topic) of - ok -> - deliver([], Context, Pid, Topic, Cursor); - abort -> - ok - end - end - end. - get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}) -> 0; get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}}, @@ -198,7 +157,6 @@ stats_fun() -> init([]) -> emqx_conf:add_handler([retainer], ?MODULE), - init_shared_context(), State = new_state(), #{enable := Enable} = Cfg = emqx:get_config([retainer]), {ok, @@ -213,9 +171,6 @@ handle_call({update_config, NewConf, OldConf}, _, State) -> State2 = update_config(State, NewConf, OldConf), {reply, ok, State2}; -handle_call({wait_semaphore, Id}, From, #{wait_quotas := Waits} = State) -> - {noreply, State#{wait_quotas := [{Id, From} | Waits]}}; - handle_call(clean, _, #{context := Context} = State) -> clean(Context), {reply, ok, State}; @@ -249,30 +204,12 @@ handle_info(clear_expired, #{context := Context} = State) -> Interval = emqx_conf:get([retainer, msg_clear_interval], ?DEF_EXPIRY_INTERVAL), {noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate}; -handle_info(release_deliver_quota, #{context := Context, wait_quotas := Waits} = State) -> - insert_shared_context(?DELIVER_SEMAPHORE, get_msg_deliver_quota()), - case Waits of - [] -> - ok; - _ -> - #{context_id := NowId} = Context, - Waits2 = lists:reverse(Waits), - lists:foreach(fun({Id, From}) -> - gen_server:reply(From, Id =:= NowId) - end, - Waits2) - end, - Interval = emqx:get_config([retainer, flow_control, quota_release_interval]), - {noreply, State#{release_quota_timer := add_timer(Interval, release_deliver_quota), - wait_quotas := []}}; - handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. -terminate(_Reason, #{clear_timer := TRef1, release_quota_timer := TRef2}) -> - _ = stop_timer(TRef1), - _ = stop_timer(TRef2), +terminate(_Reason, #{clear_timer := ClearTimer}) -> + _ = stop_timer(ClearTimer), ok. code_change(_OldVsn, State, _Extra) -> @@ -286,22 +223,19 @@ new_state() -> #{enable => false, context_id => 0, context => undefined, - clear_timer => undefined, - release_quota_timer => undefined, - wait_quotas => []}. + clear_timer => undefined + }. -spec new_context(pos_integer()) -> context(). new_context(Id) -> #{context_id => Id}. - payload_size_limit() -> emqx_conf:get(?MAX_PAYLOAD_SIZE_CONFIG_PATH, ?DEF_MAX_PAYLOAD_SIZE). %% @private dispatch(Context, Topic) -> - emqx_retainer_pool:async_submit(fun ?MODULE:dispatch/4, - [Context, self(), Topic, undefined]). + emqx_retainer_dispatcher:dispatch(Context, Topic). -spec delete_message(context(), topic()) -> ok. delete_message(Context, Topic) -> @@ -328,53 +262,6 @@ clean(Context) -> Mod = get_backend_module(), Mod:clean(Context). --spec do_deliver(list(term()), pos_integer(), pid(), topic()) -> ok | abort. -do_deliver([Msg | T], Id, Pid, Topic) -> - case require_semaphore(?DELIVER_SEMAPHORE, Id) of - true -> - Pid ! {deliver, Topic, Msg}, - do_deliver(T, Id, Pid, Topic); - _ -> - abort - end; -do_deliver([], _, _, _) -> - ok. - --spec require_semaphore(semaphore(), pos_integer()) -> boolean(). -require_semaphore(Semaphore, Id) -> - Remained = ets:update_counter(?SHARED_CONTEXT_TAB, - Semaphore, - {#shared_context.value, -1, -1, -1}), - wait_semaphore(Remained, Id). - --spec wait_semaphore(non_neg_integer(), pos_integer()) -> boolean(). -wait_semaphore(X, Id) when X < 0 -> - call({?FUNCTION_NAME, Id}); -wait_semaphore(_, _) -> - true. - --spec init_shared_context() -> ok. -init_shared_context() -> - ?SHARED_CONTEXT_TAB = ets:new(?SHARED_CONTEXT_TAB, - [ set, named_table, public - , {keypos, #shared_context.key} - , {write_concurrency, true} - , {read_concurrency, true}]), - lists:foreach(fun({K, V}) -> - insert_shared_context(K, V) - end, - [{?DELIVER_SEMAPHORE, get_msg_deliver_quota()}]). - - --spec insert_shared_context(shared_context_key(), term()) -> ok. -insert_shared_context(Key, Term) -> - ets:insert(?SHARED_CONTEXT_TAB, #shared_context{key = Key, value = Term}), - ok. - --spec get_msg_deliver_quota() -> non_neg_integer(). -get_msg_deliver_quota() -> - emqx:get_config([retainer, flow_control, msg_deliver_quota]). - -spec update_config(state(), hocons:config(), hocons:config()) -> state(). update_config(State, Conf, OldConf) -> update_config(maps:get(enable, Conf), @@ -391,24 +278,19 @@ update_config(true, false, State, NewConf, _) -> enable_retainer(State, NewConf); update_config(true, true, - #{clear_timer := ClearTimer, - release_quota_timer := QuotaTimer} = State, NewConf, OldConf) -> - #{config := Cfg, - flow_control := #{quota_release_interval := QuotaInterval}, + #{clear_timer := ClearTimer} = State, NewConf, OldConf) -> + #{backend := BackendCfg, msg_clear_interval := ClearInterval} = NewConf, - #{config := OldCfg} = OldConf, + #{backend := OldBackendCfg} = OldConf, - StorageType = maps:get(type, Cfg), - OldStrorageType = maps:get(type, OldCfg), + StorageType = maps:get(type, BackendCfg), + OldStrorageType = maps:get(type, OldBackendCfg), case OldStrorageType of StorageType -> State#{clear_timer := check_timer(ClearTimer, ClearInterval, - clear_expired), - release_quota_timer := check_timer(QuotaTimer, - QuotaInterval, - release_deliver_quota)}; + clear_expired)}; _ -> State2 = disable_retainer(State), enable_retainer(State2, NewConf) @@ -417,29 +299,23 @@ update_config(true, true, -spec enable_retainer(state(), hocon:config()) -> state(). enable_retainer(#{context_id := ContextId} = State, #{msg_clear_interval := ClearInterval, - flow_control := #{quota_release_interval := ReleaseInterval}, - config := Config}) -> + backend := BackendCfg}) -> NewContextId = ContextId + 1, - Context = create_resource(new_context(NewContextId), Config), + Context = create_resource(new_context(NewContextId), BackendCfg), load(Context), State#{enable := true, context_id := NewContextId, context := Context, - clear_timer := add_timer(ClearInterval, clear_expired), - release_quota_timer := add_timer(ReleaseInterval, release_deliver_quota)}. + clear_timer := add_timer(ClearInterval, clear_expired)}. -spec disable_retainer(state()) -> state(). -disable_retainer(#{clear_timer := TRef1, - release_quota_timer := TRef2, - context := Context, - wait_quotas := Waits} = State) -> +disable_retainer(#{clear_timer := ClearTimer, + context := Context} = State) -> unload(), - ok = lists:foreach(fun(E) -> gen_server:reply(E, false) end, Waits), ok = close_resource(Context), State#{enable := false, - clear_timer := stop_timer(TRef1), - release_quota_timer := stop_timer(TRef2), - wait_quotas := []}. + clear_timer := stop_timer(ClearTimer) + }. -spec stop_timer(undefined | reference()) -> undefined. stop_timer(undefined) -> @@ -466,7 +342,7 @@ check_timer(Timer, _, _) -> -spec get_backend_module() -> backend(). get_backend_module() -> - ModName = case emqx:get_config([retainer, config]) of + ModName = case emqx:get_config([retainer, backend]) of #{type := built_in_database} -> mnesia; #{type := Backend} -> Backend end, diff --git a/apps/emqx_retainer/src/emqx_retainer_pool.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl similarity index 62% rename from apps/emqx_retainer/src/emqx_retainer_pool.erl rename to apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index cd83858c6..8bcc80f60 100644 --- a/apps/emqx_retainer/src/emqx_retainer_pool.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -14,27 +14,41 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_retainer_pool). +-module(emqx_retainer_dispatcher). -behaviour(gen_server). +-include("emqx_retainer.hrl"). -include_lib("emqx/include/logger.hrl"). %% API --export([start_link/2, - async_submit/2]). +-export([start_link/2 + , dispatch/2 + , refresh_limiter/0 + ]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, format_status/2]). +-type limiter() :: emqx_htb_limiter:limiter(). + -define(POOL, ?MODULE). %%%=================================================================== %%% API %%%=================================================================== -async_submit(Fun, Args) -> - cast({async_submit, {Fun, Args}}). +dispatch(Context, Topic) -> + cast({?FUNCTION_NAME, Context, self(), Topic}). + +%% sometimes it is necessary to reset the client's limiter after updated the limiter's config +%% an limiter update handler maybe added later, now this is a workaround +refresh_limiter() -> + Workers = gproc_pool:active_workers(?POOL), + lists:foreach(fun({_, Pid}) -> + gen_server:cast(Pid, ?FUNCTION_NAME) + end, + Workers). %%-------------------------------------------------------------------- %% @doc @@ -66,7 +80,9 @@ start_link(Pool, Id) -> ignore. init([Pool, Id]) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), - {ok, #{pool => Pool, id => Id}}. + Bucket = emqx:get_config([retainer, flow_control, limiter_bucket_name]), + Limiter = emqx_limiter_server:connect(shared, Bucket), + {ok, #{pool => Pool, id => Id, limiter => Limiter}}. %%-------------------------------------------------------------------- %% @private @@ -98,12 +114,14 @@ handle_call(Req, _From, State) -> {noreply, NewState :: term(), Timeout :: timeout()} | {noreply, NewState :: term(), hibernate} | {stop, Reason :: term(), NewState :: term()}. -handle_cast({async_submit, Task}, State) -> - try run(Task) - catch _:Error:Stacktrace -> - ?SLOG(error, #{msg => "crashed_handling_async_task", exception => Error, stacktrace => Stacktrace}) - end, - {noreply, State}; +handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) -> + {ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter), + {noreply, State#{limiter := Limiter2}}; + +handle_cast(refresh_limiter, State) -> + Bucket = emqx:get_config([retainer, flow_control, limiter_bucket_name]), + Limiter = emqx_limiter_server:connect(shared, Bucket), + {noreply, State#{limiter := Limiter}}; handle_cast(Msg, State) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), @@ -174,9 +192,74 @@ cast(Msg) -> worker() -> gproc_pool:pick_worker(?POOL, self()). -run({M, F, A}) -> - erlang:apply(M, F, A); -run({F, A}) when is_function(F), is_list(A) -> - erlang:apply(F, A); -run(Fun) when is_function(Fun) -> - Fun(). +-spec dispatch(context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}. +dispatch(Context, Pid, Topic, Cursor, Limiter) -> + Mod = emqx_retainer:get_backend_module(), + case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of + false -> + {ok, Result} = Mod:read_message(Context, Topic), + deliver(Result, Context, Pid, Topic, undefined, Limiter); + true -> + {ok, Result, NewCursor} = Mod:match_messages(Context, Topic, Cursor), + deliver(Result, Context, Pid, Topic, NewCursor, Limiter) + end. + +-spec deliver(list(emqx_types:message()), context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}. +deliver([], _Context, _Pid, _Topic, undefined, Limiter) -> + {ok, Limiter}; + +deliver([], Context, Pid, Topic, Cursor, Limiter) -> + dispatch(Context, Pid, Topic, Cursor, Limiter); + +deliver(Result, Context, Pid, Topic, Cursor, Limiter) -> + case erlang:is_process_alive(Pid) of + false -> + {ok, Limiter}; + _ -> + DeliverNum = emqx:get_config([retainer, flow_control, batch_deliver_number]), + case DeliverNum of + 0 -> + do_deliver(Result, Pid, Topic), + {ok, Limiter}; + _ -> + case do_deliver(Result, DeliverNum, Pid, Topic, Limiter) of + {ok, Limiter2} -> + deliver([], Context, Pid, Topic, Cursor, Limiter2); + {drop, Limiter2} -> + {ok, Limiter2} + end + end + end. + +do_deliver([], _DeliverNum, _Pid, _Topic, Limiter) -> + {ok, Limiter}; + +do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) -> + {Num, ToDelivers, Msgs2} = safe_split(DeliverNum, Msgs), + case emqx_htb_limiter:consume(Num, Limiter) of + {ok, Limiter2} -> + do_deliver(ToDelivers, Pid, Topic), + do_deliver(Msgs2, DeliverNum, Pid, Topic, Limiter2); + {drop, _} = Drop -> + ?SLOG(error, #{msg => "the retainer deliver failed because the required quota could not be obtained"}), + Drop + end. + +do_deliver([Msg | T], Pid, Topic) -> + Pid ! {deliver, Topic, Msg}, + do_deliver(T, Pid, Topic); + +do_deliver([], _, _) -> + ok. + +safe_split(N, List) -> + safe_split(N, List, 0, []). + +safe_split(0, List, Count, Acc) -> + {Count, lists:reverse(Acc), List}; + +safe_split(_N, [], Count, Acc) -> + {Count, lists:reverse(Acc), []}; + +safe_split(N, [H | T], Count, Acc) -> + safe_split(N - 1, T, Count + 1, [H | Acc]). diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 28be3aa55..93f5251a6 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -144,17 +144,17 @@ page_read(_, Topic, Page, Limit) -> {ok, Rows}. match_messages(_, Topic, Cursor) -> - MaxReadNum = emqx:get_config([retainer, flow_control, max_read_number]), + BatchReadNum = emqx:get_config([retainer, flow_control, batch_read_number]), case Cursor of undefined -> - case MaxReadNum of + case BatchReadNum of 0 -> {ok, sort_retained(match_messages(Topic)), undefined}; _ -> - start_batch_read(Topic, MaxReadNum) + start_batch_read(Topic, BatchReadNum) end; _ -> - batch_read_messages(Cursor, MaxReadNum) + batch_read_messages(Cursor, BatchReadNum) end. clean(_) -> @@ -253,7 +253,7 @@ make_cursor(Topic) -> -spec is_table_full() -> boolean(). is_table_full() -> - #{max_retained_messages := Limit} = emqx:get_config([retainer, config]), + Limit = emqx:get_config([retainer, backend, max_retained_messages]), Limit > 0 andalso (table_size() >= Limit). -spec table_size() -> non_neg_integer(). diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index f4c1e54e5..25a2dda28 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -17,7 +17,7 @@ fields("retainer") -> , {flow_control, ?TYPE(hoconsc:ref(?MODULE, flow_control))} , {max_payload_size, sc(emqx_schema:bytesize(), "1MB")} , {stop_publish_clear_msg, sc(boolean(), false)} - , {config, config()} + , {backend, backend_config()} ]; fields(mnesia_config) -> @@ -27,9 +27,9 @@ fields(mnesia_config) -> ]; fields(flow_control) -> - [ {max_read_number, sc(integer(), 0, fun is_pos_integer/1)} - , {msg_deliver_quota, sc(integer(), 0, fun is_pos_integer/1)} - , {quota_release_interval, sc(emqx_schema:duration_ms(), "0ms")} + [ {batch_read_number, sc(integer(), 0, fun is_pos_integer/1)} + , {batch_deliver_number, sc(range(0, 50), 0)} + , {limiter_bucket_name, sc(atom(), retainer)} ]. %%-------------------------------------------------------------------- @@ -45,5 +45,5 @@ sc(Type, Default, Validator) -> is_pos_integer(V) -> V >= 0. -config() -> +backend_config() -> #{type => hoconsc:union([hoconsc:ref(?MODULE, mnesia_config)])}. diff --git a/apps/emqx_retainer/src/emqx_retainer_sup.erl b/apps/emqx_retainer/src/emqx_retainer_sup.erl index 23037c154..5073bb987 100644 --- a/apps/emqx_retainer/src/emqx_retainer_sup.erl +++ b/apps/emqx_retainer/src/emqx_retainer_sup.erl @@ -26,8 +26,8 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - PoolSpec = emqx_pool_sup:spec([emqx_retainer_pool, hash, emqx_vm:schedulers(), - {emqx_retainer_pool, start_link, []}]), + PoolSpec = emqx_pool_sup:spec([emqx_retainer_dispatcher, hash, emqx_vm:schedulers(), + {emqx_retainer_dispatcher, start_link, []}]), {ok, {{one_for_one, 10, 3600}, [#{id => retainer, start => {emqx_retainer, start_link, []}, diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index c25dbae2e..0185e25ea 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -34,16 +34,16 @@ retainer { msg_expiry_interval = 0s max_payload_size = 1MB flow_control { - max_read_number = 0 - msg_deliver_quota = 0 - quota_release_interval = 0s - } - config { + batch_read_number = 0 + batch_deliver_number = 0 + limiter_bucket_name = retainer + } + backend { type = built_in_database storage_type = ram max_retained_messages = 0 - } - }""">>). + +}""">>). %%-------------------------------------------------------------------- %% Setups @@ -57,7 +57,8 @@ init_per_suite(Config) -> meck:expect(emqx_alarm, activate, 3, ok), meck:expect(emqx_alarm, deactivate, 3, ok), - ok = emqx_common_test_helpers:load_config(emqx_retainer_schema, ?BASE_CONF), + base_conf(), + emqx_ratelimiter_SUITE:base_conf(), emqx_common_test_helpers:start_apps([emqx_retainer]), Config. @@ -83,6 +84,9 @@ end_per_testcase(_, Config) -> end, Config. +base_conf() -> + ok = emqx_common_test_helpers:load_config(emqx_retainer_schema, ?BASE_CONF). + %%-------------------------------------------------------------------- %% Test Cases %%-------------------------------------------------------------------- @@ -282,10 +286,20 @@ t_stop_publish_clear_msg(_) -> ok = emqtt:disconnect(C1). t_flow_control(_) -> + #{per_client := PerClient} = RetainerCfg = emqx_config:get([limiter, shared, bucket, retainer]), + RetainerCfg2 = RetainerCfg#{per_client := PerClient#{rate := emqx_ratelimiter_SUITE:to_rate("1/1s"), + capacity := 1}}, + emqx_config:put([limiter, shared, bucket, retainer], RetainerCfg2), + emqx_limiter_manager:restart_server(shared), + timer:sleep(500), + + emqx_retainer_dispatcher:refresh_limiter(), + timer:sleep(500), + emqx_retainer:update_config(#{<<"flow_control">> => - #{<<"max_read_number">> => 1, - <<"msg_deliver_quota">> => 1, - <<"quota_release_interval">> => <<"1s">>}}), + #{<<"batch_read_number">> => 1, + <<"batch_deliver_number">> => 1, + <<"limiter_bucket_name">> => retainer}}), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), emqtt:publish( @@ -309,11 +323,19 @@ t_flow_control(_) -> End = erlang:system_time(millisecond), Diff = End - Begin, - %% msg_deliver_quota = 1 and quota_release_interval = 1, and there has three message - %% so total wait time is between in 1 ~ 2s(may be timer will delay, so plus 0.5s to maximum) - ?assert(Diff > timer:seconds(1) andalso Diff < timer:seconds(2.5)), + ?assert(Diff > timer:seconds(2.5) andalso Diff < timer:seconds(3.9), + lists:flatten(io_lib:format("Diff is :~p~n", [Diff]))), - ok = emqtt:disconnect(C1). + ok = emqtt:disconnect(C1), + + %% recover the limiter + emqx_config:put([limiter, shared, bucket, retainer], RetainerCfg), + emqx_limiter_manager:restart_server(shared), + timer:sleep(500), + + emqx_retainer_dispatcher:refresh_limiter(), + timer:sleep(500), + ok. %%-------------------------------------------------------------------- %% Helper functions diff --git a/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl index 3cfcb949f..852631279 100644 --- a/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl @@ -21,28 +21,10 @@ -include_lib("eunit/include/eunit.hrl"). --define(BASE_CONF, <<""" -retainer { - enable = true - msg_clear_interval = 0s - msg_expiry_interval = 0s - max_payload_size = 1MB - flow_control { - max_read_number = 0 - msg_deliver_quota = 0 - quota_release_interval = 0s - } - config { - type = built_in_database - storage_type = ram - max_retained_messages = 0 - } - }""">>). - all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - ok = emqx_common_test_helpers:load_config(emqx_retainer_schema, ?BASE_CONF), + emqx_retainer_SUITE:base_conf(), %% Meck emqtt ok = meck:new(emqtt, [non_strict, passthrough, no_history, no_link]), %% Start Apps