From 57da71eb1d1fe230928f6fdb591e56903491f3bc Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 29 Jun 2023 19:04:03 +0200 Subject: [PATCH 1/2] perf(broker): avoid usorting aggregated routes in a loop --- apps/emqx/src/emqx_broker.erl | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 1bbe4dc82..fef93768b 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -285,16 +285,16 @@ aggre([#route{topic = To, dest = Node}]) when is_atom(Node) -> aggre([#route{topic = To, dest = {Group, _Node}}]) -> [{To, Group}]; aggre(Routes) -> - lists:foldl( - fun - (#route{topic = To, dest = Node}, Acc) when is_atom(Node) -> - [{To, Node} | Acc]; - (#route{topic = To, dest = {Group, _Node}}, Acc) -> - lists:usort([{To, Group} | Acc]) - end, - [], - Routes - ). + aggre(Routes, false, []). + +aggre([#route{topic = To, dest = Node} | Rest], Dedup, Acc) when is_atom(Node) -> + aggre(Rest, Dedup, [{To, Node} | Acc]); +aggre([#route{topic = To, dest = {Group, _Node}} | Rest], _Dedup, Acc) -> + aggre(Rest, true, [{To, Group} | Acc]); +aggre([], false, Acc) -> + Acc; +aggre([], true, Acc) -> + lists:usort(Acc). %% @doc Forward message to another node. -spec forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode :: sync | async) -> From 0f25df3aa8cecb00683b22653a1bb04cb3e233f5 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 29 Jun 2023 19:06:39 +0200 Subject: [PATCH 2/2] perf(router): employ `maps:foreach/2` instead --- apps/emqx/src/emqx_session_router.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_session_router.erl b/apps/emqx/src/emqx_session_router.erl index 47e4bdf74..1567f9e62 100644 --- a/apps/emqx/src/emqx_session_router.erl +++ b/apps/emqx/src/emqx_session_router.erl @@ -243,8 +243,8 @@ handle_call(Req, _From, State) -> handle_cast({delete_routes, SessionID, Subscriptions}, State) -> %% TODO: Make a batch for deleting all routes. - Fun = fun({Topic, _}) -> do_delete_route(Topic, SessionID) end, - ok = lists:foreach(Fun, maps:to_list(Subscriptions)), + Fun = fun(Topic, _) -> do_delete_route(Topic, SessionID) end, + ok = maps:foreach(Fun, Subscriptions), {noreply, State}; handle_cast({resume_end, SessionID, Pid}, State) -> case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of