From 5007650bd2f6ade5f2e6621abad2a08163bdcafd Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Fri, 26 Jan 2024 18:26:34 +0200 Subject: [PATCH] perf(emqx_broker): pick broker pool worker by topic/shard pair to distribute the load more evenly. Fixes: EMQX-11812 --- apps/emqx/src/emqx_broker.erl | 114 +++++++++++++++++---------- apps/emqx/src/emqx_router_syncer.erl | 26 +++--- 2 files changed, 88 insertions(+), 52 deletions(-) diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index ac9116cbd..23679700e 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -85,6 +85,16 @@ %% Guards -define(IS_SUBID(Id), (is_binary(Id) orelse is_atom(Id))). +-define(cast_or_eval(Pid, Msg, Expr), + case Pid =:= self() of + true -> + _ = Expr, + ok; + false -> + cast(Pid, Msg) + end +). + -spec start_link(atom(), pos_integer()) -> startlink_ret(). start_link(Pool, Id) -> ok = create_tabs(), @@ -159,15 +169,7 @@ do_subscribe(Topic, SubPid, SubOpts) when is_binary(Topic) -> %% https://emqx.atlassian.net/browse/EMQX-10214 I = emqx_broker_helper:get_sub_shard(SubPid, Topic), true = ets:insert(?SUBOPTION, {{Topic, SubPid}, with_shard_idx(I, SubOpts)}), - %% NOTE - %% We are relying on the local state to minimize global routing state changes, - %% thus it's important that some operations on ETS tables on the same topic - %% should not be interleaved: `ets:member/2` + `ets:insert/2` that are part of - %% broker's `subscribe` codepath, and `ets:delete_object/2` that's part of - %% `unsubscribe` codepath. So we have to pick a worker according to the topic, - %% but not shard. If there are topics with high number of shards, then the - %% load across the pool will be unbalanced. - Sync = call(pick(Topic), {subscribe, Topic, SubPid, I}), + Sync = call(pick({Topic, I}), {subscribe, Topic, SubPid, I}), case Sync of ok -> ok; @@ -218,15 +220,7 @@ do_unsubscribe2(Topic, SubPid, SubOpts) when 0 -> emqx_exclusive_subscription:unsubscribe(Topic, SubOpts); _ -> ok end, - %% NOTE - %% We are relying on the local state to minimize global routing state changes, - %% thus it's important that some operations on ETS tables on the same topic - %% should not be interleaved: `ets:member/2` + `ets:insert/2` that are part of - %% broker's `subscribe` codepath, and `ets:delete_object/2` that's part of - %% `unsubscribe` codepath. So we have to pick a worker according to the topic, - %% but not shard. If there are topics with high number of shards, then the - %% load across the pool will be unbalanced. - cast(pick(Topic), {unsubscribed, Topic, SubPid, I}); + cast(pick({Topic, I}), {unsubscribed, Topic, SubPid, I}); do_unsubscribe2(#share{group = Group, topic = Topic}, SubPid, _SubOpts) when is_binary(Group), is_binary(Topic), is_pid(SubPid) -> @@ -501,8 +495,8 @@ cast(Broker, Req) -> gen_server:cast(Broker, Req). %% Pick a broker -pick(Topic) -> - gproc_pool:pick_worker(broker_pool, Topic). +pick(TopicShard) -> + gproc_pool:pick_worker(broker_pool, TopicShard). %%-------------------------------------------------------------------- %% gen_server callbacks @@ -514,36 +508,72 @@ init([Pool, Id]) -> handle_call({subscribe, Topic, SubPid, 0}, {From, _Tag}, State) -> Existed = ets:member(?SUBSCRIBER, Topic), + Result = maybe_add_route(Existed, Topic, From), + assert_ok_result(Result), true = ets:insert(?SUBSCRIBER, {Topic, SubPid}), - Result = maybe_add_route(Existed, Topic, From), - {reply, Result, State}; -handle_call({subscribe, Topic, SubPid, I}, {From, _Tag}, State) -> - Existed = ets:member(?SUBSCRIBER, Topic), - true = ets:insert(?SUBSCRIBER, [ - {Topic, {shard, I}}, - {{shard, Topic, I}, SubPid} - ]), - Result = maybe_add_route(Existed, Topic, From), {reply, Result, State}; +handle_call({subscribe, Topic, SubPid, I}, _From, State) -> + Existed = ets:member(?SUBSCRIBER, {shard, Topic, I}), + Recs = [{{shard, Topic, I}, SubPid}], + Recs1 = + case Existed of + false -> + %% This will attempt to add a route per each new shard. + %% The overhead must be negligible, but the consistency in general + %% and race conditions safety is expected to be stronger. + %% The main purpose is to solve the race when + %% `{shard, Topic, N}` (where N > 0) + %% is the first ever processed subscribe request per `Topic`. + %% It inserts `{Topic, {shard, I}}` to `?SUBSCRIBER` tab. + %% After that, another broker worker starts processing + %% `{shard, Topic, 0}` sub and already observers `{shard, Topic, N}`, + %% i.e. `ets:member(?SUBSCRIBER, Topic)` returns false, + %% so it doesn't add the route. + %% Even if this happens, this cast is expected to be processed eventually + %% and the route should be added (unless the worker restarts...) + ?cast_or_eval( + pick({Topic, 0}), + {subscribed, Topic, shard, I}, + sync_route(add, Topic, #{}) + ), + [{Topic, {shard, I}} | Recs]; + true -> + Recs + end, + true = ets:insert(?SUBSCRIBER, Recs1), + {reply, ok, State}; handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. +handle_cast({subscribed, Topic, shard, _I}, State) -> + %% Do not need to 'maybe add' (i.e. to check if the route exists). + %% It was already checked that this shard is newely added. + _ = sync_route(add, Topic, #{}), + {noreply, State}; +handle_cast({unsubscribed, Topic, shard, _I}, State) -> + _ = maybe_delete_route(Topic), + {noreply, State}; handle_cast({unsubscribed, Topic, SubPid, 0}, State) -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), - Exists = ets:member(?SUBSCRIBER, Topic), - _Result = maybe_delete_route(Exists, Topic), + _ = maybe_delete_route(Topic), {noreply, State}; handle_cast({unsubscribed, Topic, SubPid, I}, State) -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), case ets:member(?SUBSCRIBER, {shard, Topic, I}) of false -> - ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}); + ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}), + %% Do not attempt to delete any routes here, + %% let it be handled only by the same pool worker per topic (0 shard), + %% so that all route deletes are serialized. + ?cast_or_eval( + pick({Topic, 0}), + {unsubscribed, Topic, shard, I}, + maybe_delete_route(Topic) + ); true -> - true + ok end, - Exists = ets:member(?SUBSCRIBER, Topic), - _Result = maybe_delete_route(Exists, Topic), {noreply, State}; handle_cast(Msg, State) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), @@ -582,7 +612,7 @@ do_dispatch(Topic, #delivery{message = Msg}) -> {ok, DispN} end. -%% Donot dispatch to share subscriber here. +%% Don't dispatch to share subscriber here. %% we do it in `emqx_shared_sub.erl` with configured strategy do_dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> case erlang:is_process_alive(SubPid) of @@ -603,15 +633,19 @@ do_dispatch({shard, I}, Topic, Msg) -> %% +assert_ok_result(ok) -> ok; +assert_ok_result(Ref) when is_reference(Ref) -> ok. + maybe_add_route(_Existed = false, Topic, ReplyTo) -> sync_route(add, Topic, #{reply => ReplyTo}); maybe_add_route(_Existed = true, _Topic, _ReplyTo) -> ok. -maybe_delete_route(_Exists = false, Topic) -> - sync_route(delete, Topic, #{}); -maybe_delete_route(_Exists = true, _Topic) -> - ok. +maybe_delete_route(Topic) -> + case ets:member(?SUBSCRIBER, Topic) of + true -> ok; + false -> sync_route(delete, Topic, #{}) + end. sync_route(Action, Topic, ReplyTo) -> EnabledOn = emqx_config:get([broker, routing, batch_sync, enable_on]), diff --git a/apps/emqx/src/emqx_router_syncer.erl b/apps/emqx/src/emqx_router_syncer.erl index dccc681c8..fef5a1fc7 100644 --- a/apps/emqx/src/emqx_router_syncer.erl +++ b/apps/emqx/src/emqx_router_syncer.erl @@ -97,7 +97,7 @@ push(Action, Topic, Dest, Opts) -> Context = mk_push_context(Opts), _ = erlang:send(Worker, ?PUSH(Prio, {Action, Topic, Dest, Context})), case Context of - {MRef, _} -> + [{MRef, _}] -> MRef; [] -> ok @@ -128,7 +128,7 @@ designate_prio(delete, #{}) -> mk_push_context(#{reply := To}) -> MRef = erlang:make_ref(), - {MRef, To}; + [{MRef, To}]; mk_push_context(_) -> []. @@ -272,8 +272,8 @@ send_replies(Errors, Batch) -> replyctx_send(_Result, []) -> noreply; -replyctx_send(Result, {MRef, Pid}) -> - _ = erlang:send(Pid, {MRef, Result}), +replyctx_send(Result, RefsPids) -> + _ = lists:foreach(fun({MRef, Pid}) -> erlang:send(Pid, {MRef, Result}) end, RefsPids), ok. %% @@ -316,10 +316,11 @@ stash_add(Prio, ?OP(Action, Topic, Dest, Ctx), Stash) -> Stash#{Route := RouteOpMerged} end. -merge_route_op(?ROUTEOP(Action, _Prio1, Ctx1), DestOp = ?ROUTEOP(Action)) -> - %% NOTE: This should not happen anyway. - _ = replyctx_send(ignored, Ctx1), - DestOp; +merge_route_op(?ROUTEOP(Action, _Prio1, Ctx1), ?ROUTEOP(Action, Prio2, Ctx2)) -> + %% NOTE: This can happen as topic shard can be processed concurrently + %% by different broker worker, see emqx_broker for more details. + MergedCtx = Ctx1 ++ Ctx2, + ?ROUTEOP(Action, Prio2, MergedCtx); merge_route_op(?ROUTEOP(_Action1, _Prio1, Ctx1), DestOp = ?ROUTEOP(_Action2, _Prio2, _Ctx2)) -> %% NOTE: Latter cancel the former. %% Strictly speaking, in ideal conditions we could just cancel both, because they @@ -352,7 +353,7 @@ stash_stats(Stash) -> batch_test() -> Dest = node(), - Ctx = fun(N) -> {N, self()} end, + Ctx = fun(N) -> [{N, self()}] end, Stash = stash_add( [ ?PUSH(?PRIO_BG, ?OP(delete, <<"t/2">>, Dest, Ctx(1))), @@ -375,6 +376,7 @@ batch_test() -> stash_new() ), {Batch, StashLeft} = mk_batch(Stash, 5), + ?assertMatch( #{ {<<"t/1">>, Dest} := {add, ?PRIO_LO, _}, @@ -392,16 +394,16 @@ batch_test() -> }, StashLeft ), + + %% Replies are only sent to superseded ops: ?assertEqual( [ - {2, ignored}, {1, ok}, {5, ok}, - {7, ignored}, {4, ok}, {9, ok}, + {7, ok}, {8, ok}, - {13, ignored}, {11, ok} ], emqx_utils_stream:consume(emqx_utils_stream:mqueue(0))