From 7b95273218d23795ceef9fb08990fd1cafebd82b Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 10 May 2024 15:21:58 +0200 Subject: [PATCH] feat(routesync): make syncer a bit more generic and reusable --- apps/emqx/src/emqx_broker_sup.erl | 2 +- apps/emqx/src/emqx_router_syncer.erl | 124 +++++++++++++++++++++------ 2 files changed, 98 insertions(+), 28 deletions(-) diff --git a/apps/emqx/src/emqx_broker_sup.erl b/apps/emqx/src/emqx_broker_sup.erl index 2abc43ceb..cda7e2167 100644 --- a/apps/emqx/src/emqx_broker_sup.erl +++ b/apps/emqx/src/emqx_broker_sup.erl @@ -47,7 +47,7 @@ init([]) -> router_syncer_pool, hash, PoolSize, - {emqx_router_syncer, start_link, []} + {emqx_router_syncer, start_link_pooled, []} ]), %% Shared subscription diff --git a/apps/emqx/src/emqx_router_syncer.erl b/apps/emqx/src/emqx_router_syncer.erl index d050d9d18..4756d0a37 100644 --- a/apps/emqx/src/emqx_router_syncer.erl +++ b/apps/emqx/src/emqx_router_syncer.erl @@ -21,11 +21,17 @@ -behaviour(gen_server). +-export([start_link/1]). -export([start_link/2]). +-export([start_link_pooled/2]). -export([push/4]). +-export([push/5]). -export([wait/1]). +-export([close/1]). +-export([open/1]). + -export([stats/0]). -export([ @@ -38,6 +44,15 @@ -type action() :: add | delete. +-type options() :: #{ + max_batch_size => pos_integer(), + min_sync_interval => non_neg_integer(), + error_delay => non_neg_integer(), + error_retry_interval => non_neg_integer(), + initial_state => open | closed, + batch_handler => {module(), _Function :: atom(), _Args :: list()} +}. + -define(POOL, router_syncer_pool). -define(MAX_BATCH_SIZE, 1000). @@ -77,13 +92,23 @@ %% --spec start_link(atom(), pos_integer()) -> +-spec start_link(options()) -> + {ok, pid()} | {error, _Reason}. +start_link(Options) -> + gen_server:start_link(?MODULE, mk_state(Options), []). + +-spec start_link(_Name, options()) -> + {ok, pid()} | {error, _Reason}. +start_link(Name, Options) -> + gen_server:start_link(Name, ?MODULE, mk_state(Options), []). + +-spec start_link_pooled(atom(), pos_integer()) -> {ok, pid()}. -start_link(Pool, Id) -> +start_link_pooled(Pool, Id) -> gen_server:start_link( {local, emqx_utils:proc_name(?MODULE, Id)}, ?MODULE, - [Pool, Id], + {Pool, Id, mk_state(#{})}, [] ). @@ -93,9 +118,16 @@ when Opts :: #{reply => pid()}. push(Action, Topic, Dest, Opts) -> Worker = gproc_pool:pick_worker(?POOL, Topic), + push(Worker, Action, Topic, Dest, Opts). + +-spec push(_Ref, action(), emqx_types:topic(), emqx_router:dest(), Opts) -> + ok | _WaitRef :: reference() +when + Opts :: #{reply => pid()}. +push(Ref, Action, Topic, Dest, Opts) -> Prio = designate_prio(Action, Opts), Context = mk_push_context(Opts), - _ = erlang:send(Worker, ?PUSH(Prio, {Action, Topic, Dest, Context})), + _ = gproc:send(Ref, ?PUSH(Prio, {Action, Topic, Dest, Context})), case Context of [{MRef, _}] -> MRef; @@ -134,6 +166,14 @@ mk_push_context(_) -> %% +close(Ref) -> + gen_server:call(Ref, close, infinity). + +open(Ref) -> + gen_server:call(Ref, open, infinity). + +%% + -type stats() :: #{ size := non_neg_integer(), n_add := non_neg_integer(), @@ -149,10 +189,34 @@ stats() -> %% -init([Pool, Id]) -> - true = gproc_pool:connect_worker(Pool, {Pool, Id}), - {ok, #{stash => stash_new()}}. +mk_state(Options) -> + #{ + state => maps:get(initial_state, Options, open), + stash => stash_new(), + retry_timer => undefined, + max_batch_size => maps:get(max_batch_size, Options, ?MAX_BATCH_SIZE), + min_sync_interval => maps:get(min_sync_interval, Options, ?MIN_SYNC_INTERVAL), + error_delay => maps:get(error_delay, Options, ?ERROR_DELAY), + error_retry_interval => maps:get(error_retry_interval, Options, ?ERROR_RETRY_INTERVAL), + batch_handler => maps:get(batch_handler, Options, default) + }. +%% + +init({Pool, Id, State}) -> + true = gproc_pool:connect_worker(Pool, {Pool, Id}), + {ok, State}; +init(State) -> + {ok, State}. + +handle_call(close, _From, State) -> + NState = State#{state := closed}, + {reply, ok, NState}; +handle_call(open, _From, State = #{state := closed}) -> + NState = run_batch_loop([], State#{state := open}), + {reply, ok, NState}; +handle_call(open, _From, State) -> + {reply, ok, State}; handle_call(stats, _From, State = #{stash := Stash}) -> {reply, stash_stats(Stash), State}; handle_call(_Call, _From, State) -> @@ -162,11 +226,11 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info({timeout, _TRef, retry}, State) -> - NState = run_batch_loop([], maps:remove(retry_timer, State)), + NState = run_batch_loop([], State#{retry_timer := undefined}), {noreply, NState}; -handle_info(Push = ?PUSH(_, _), State) -> +handle_info(Push = ?PUSH(_, _), State = #{min_sync_interval := MSI}) -> %% NOTE: Wait a bit to collect potentially overlapping operations. - ok = timer:sleep(?MIN_SYNC_INTERVAL), + ok = timer:sleep(MSI), NState = run_batch_loop([Push], State), {noreply, NState}. @@ -175,12 +239,16 @@ terminate(_Reason, _State) -> %% -run_batch_loop(Incoming, State = #{stash := Stash0}) -> +run_batch_loop(Incoming, State = #{stash := Stash0, state := closed}) -> Stash1 = stash_add(Incoming, Stash0), Stash2 = stash_drain(Stash1), - {Batch, Stash3} = mk_batch(Stash2), + State#{stash := Stash2}; +run_batch_loop(Incoming, State = #{stash := Stash0, max_batch_size := MBS}) -> + Stash1 = stash_add(Incoming, Stash0), + Stash2 = stash_drain(Stash1), + {Batch, Stash3} = mk_batch(Stash2, MBS), ?tp_ignore_side_effects_in_prod(router_syncer_new_batch, batch_stats(Batch, Stash3)), - case run_batch(Batch) of + case run_batch(Batch, State) of Status = #{} -> ok = send_replies(Status, Batch), NState = cancel_retry_timer(State#{stash := Stash3}), @@ -203,37 +271,37 @@ run_batch_loop(Incoming, State = #{stash := Stash0}) -> batch => batch_stats(Batch, Stash3) }), NState = State#{stash := Stash2}, - ok = timer:sleep(?ERROR_DELAY), + ok = error_cooldown(NState), ensure_retry_timer(NState) end. +error_cooldown(#{error_delay := ED}) -> + timer:sleep(ED). + +ensure_retry_timer(State = #{retry_timer := undefined, error_retry_interval := ERI}) -> + TRef = emqx_utils:start_timer(ERI, retry), + State#{retry_timer := TRef}; ensure_retry_timer(State = #{retry_timer := _TRef}) -> - State; -ensure_retry_timer(State) -> - TRef = emqx_utils:start_timer(?ERROR_RETRY_INTERVAL, retry), - State#{retry_timer => TRef}. + State. cancel_retry_timer(State = #{retry_timer := TRef}) -> ok = emqx_utils:cancel_timer(TRef), - maps:remove(retry_timer, State); + State#{retry_timer := undefined}; cancel_retry_timer(State) -> State. %% -mk_batch(Stash) when map_size(Stash) =< ?MAX_BATCH_SIZE -> +mk_batch(Stash, BatchSize) when map_size(Stash) =< BatchSize -> %% This is perfect situation, we just use stash as batch w/o extra reallocations. {Stash, stash_new()}; -mk_batch(Stash) -> +mk_batch(Stash, BatchSize) -> %% Take a subset of stashed operations to form a batch. %% Note that stash is an unordered map, it's not a queue. The order of operations is %% not preserved strictly, only loosely, because of how we start from high priority %% operations and go down to low priority ones. This might cause some operations to %% stay in stash for unfairly long time, when there are many high priority operations. %% However, it's unclear how likely this is to happen in practice. - mk_batch(Stash, ?MAX_BATCH_SIZE). - -mk_batch(Stash, BatchSize) -> mk_batch(?PRIO_HI, #{}, BatchSize, Stash). mk_batch(Prio, Batch, SizeLeft, Stash) -> @@ -278,10 +346,12 @@ replyctx_send(Result, RefsPids) -> %% -run_batch(Batch) when map_size(Batch) > 0 -> +run_batch(Empty, _State) when Empty =:= #{} -> + #{}; +run_batch(Batch, #{batch_handler := default}) -> catch emqx_router:do_batch(Batch); -run_batch(_Empty) -> - #{}. +run_batch(Batch, #{batch_handler := {Module, Function, Args}}) -> + erlang:apply(Module, Function, [Batch | Args]). %%