diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 1bbe4dc82..fef93768b 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -285,16 +285,16 @@ aggre([#route{topic = To, dest = Node}]) when is_atom(Node) -> aggre([#route{topic = To, dest = {Group, _Node}}]) -> [{To, Group}]; aggre(Routes) -> - lists:foldl( - fun - (#route{topic = To, dest = Node}, Acc) when is_atom(Node) -> - [{To, Node} | Acc]; - (#route{topic = To, dest = {Group, _Node}}, Acc) -> - lists:usort([{To, Group} | Acc]) - end, - [], - Routes - ). + aggre(Routes, false, []). + +aggre([#route{topic = To, dest = Node} | Rest], Dedup, Acc) when is_atom(Node) -> + aggre(Rest, Dedup, [{To, Node} | Acc]); +aggre([#route{topic = To, dest = {Group, _Node}} | Rest], _Dedup, Acc) -> + aggre(Rest, true, [{To, Group} | Acc]); +aggre([], false, Acc) -> + Acc; +aggre([], true, Acc) -> + lists:usort(Acc). %% @doc Forward message to another node. -spec forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode :: sync | async) -> diff --git a/apps/emqx/src/emqx_session_router.erl b/apps/emqx/src/emqx_session_router.erl index 47e4bdf74..1567f9e62 100644 --- a/apps/emqx/src/emqx_session_router.erl +++ b/apps/emqx/src/emqx_session_router.erl @@ -243,8 +243,8 @@ handle_call(Req, _From, State) -> handle_cast({delete_routes, SessionID, Subscriptions}, State) -> %% TODO: Make a batch for deleting all routes. - Fun = fun({Topic, _}) -> do_delete_route(Topic, SessionID) end, - ok = lists:foreach(Fun, maps:to_list(Subscriptions)), + Fun = fun(Topic, _) -> do_delete_route(Topic, SessionID) end, + ok = maps:foreach(Fun, Subscriptions), {noreply, State}; handle_cast({resume_end, SessionID, Pid}, State) -> case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of