Merge pull request #11183 from keynslug/perf/EMQX-9591/routing
perf(broker): avoid usorting aggregated routes in a loop
This commit is contained in:
commit
a127fd6571
|
@ -285,16 +285,16 @@ aggre([#route{topic = To, dest = Node}]) when is_atom(Node) ->
|
||||||
aggre([#route{topic = To, dest = {Group, _Node}}]) ->
|
aggre([#route{topic = To, dest = {Group, _Node}}]) ->
|
||||||
[{To, Group}];
|
[{To, Group}];
|
||||||
aggre(Routes) ->
|
aggre(Routes) ->
|
||||||
lists:foldl(
|
aggre(Routes, false, []).
|
||||||
fun
|
|
||||||
(#route{topic = To, dest = Node}, Acc) when is_atom(Node) ->
|
aggre([#route{topic = To, dest = Node} | Rest], Dedup, Acc) when is_atom(Node) ->
|
||||||
[{To, Node} | Acc];
|
aggre(Rest, Dedup, [{To, Node} | Acc]);
|
||||||
(#route{topic = To, dest = {Group, _Node}}, Acc) ->
|
aggre([#route{topic = To, dest = {Group, _Node}} | Rest], _Dedup, Acc) ->
|
||||||
lists:usort([{To, Group} | Acc])
|
aggre(Rest, true, [{To, Group} | Acc]);
|
||||||
end,
|
aggre([], false, Acc) ->
|
||||||
[],
|
Acc;
|
||||||
Routes
|
aggre([], true, Acc) ->
|
||||||
).
|
lists:usort(Acc).
|
||||||
|
|
||||||
%% @doc Forward message to another node.
|
%% @doc Forward message to another node.
|
||||||
-spec forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode :: sync | async) ->
|
-spec forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode :: sync | async) ->
|
||||||
|
|
|
@ -243,8 +243,8 @@ handle_call(Req, _From, State) ->
|
||||||
|
|
||||||
handle_cast({delete_routes, SessionID, Subscriptions}, State) ->
|
handle_cast({delete_routes, SessionID, Subscriptions}, State) ->
|
||||||
%% TODO: Make a batch for deleting all routes.
|
%% TODO: Make a batch for deleting all routes.
|
||||||
Fun = fun({Topic, _}) -> do_delete_route(Topic, SessionID) end,
|
Fun = fun(Topic, _) -> do_delete_route(Topic, SessionID) end,
|
||||||
ok = lists:foreach(Fun, maps:to_list(Subscriptions)),
|
ok = maps:foreach(Fun, Subscriptions),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
handle_cast({resume_end, SessionID, Pid}, State) ->
|
handle_cast({resume_end, SessionID, Pid}, State) ->
|
||||||
case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of
|
case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of
|
||||||
|
|
Loading…
Reference in New Issue