diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index 3d1489a5c..3eed025b4 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -23,11 +23,11 @@ %%% %%% Route Table: %%% -%%% Topic -> {Pid1, Qos}, {Pid2, Qos}, ... +%%% Topic -> Pid1, Pid2, ... %%% %%% Reverse Route Table: %%% -%%% Pid -> {Topic1, Qos}, {Topic2, Qos}, ... +%%% Pid -> Topic1, Topic2, ... %%% %%% @end %%% @@ -72,17 +72,15 @@ ensure_tab(Tab, Opts) -> %% @doc Add Routes. %% @end %%------------------------------------------------------------------------------ --spec add_routes(list({binary(), mqtt_qos()}), pid()) -> ok. -add_routes(TopicTable, Pid) when is_pid(Pid) -> +-spec add_routes(list(binary()), pid()) -> ok. +add_routes(Topics, Pid) when is_pid(Pid) -> with_stats(fun() -> case lookup_routes(Pid) of [] -> erlang:monitor(process, Pid), - insert_routes(TopicTable, Pid); - TopicInEts -> - {NewTopics, UpdatedTopics} = diff(TopicTable, TopicInEts), - update_routes(UpdatedTopics, Pid), - insert_routes(NewTopics, Pid) + insert_routes(Topics, Pid); + InEts -> + insert_routes(Topics -- InEts, Pid) end end). @@ -90,9 +88,9 @@ add_routes(TopicTable, Pid) when is_pid(Pid) -> %% @doc Lookup Routes %% @end %%------------------------------------------------------------------------------ --spec lookup_routes(pid()) -> list({binary(), mqtt_qos()}). +-spec lookup_routes(pid()) -> list(binary()). lookup_routes(Pid) when is_pid(Pid) -> - [{Topic, Qos} || {_, Topic, Qos} <- ets:lookup(reverse_route, Pid)]. + [Topic || {_, Topic} <- ets:lookup(reverse_route, Pid)]. %%------------------------------------------------------------------------------ %% @doc Has Route? @@ -116,7 +114,7 @@ delete_routes(Topics, Pid) -> -spec delete_routes(pid()) -> ok. delete_routes(Pid) when is_pid(Pid) -> with_stats(fun() -> - Routes = [{Topic, Pid} || {Topic, _Qos} <- lookup_routes(Pid)], + Routes = [{Topic, Pid} || Topic <- lookup_routes(Pid)], ets:delete(reverse_route, Pid), lists:foreach(fun delete_route_only/1, Routes) end). @@ -132,8 +130,8 @@ route(Queue = <<"$Q/", _Q>>, Msg) -> emqttd_metrics:inc('messages/dropped'); Routes -> Idx = crypto:rand_uniform(1, length(Routes) + 1), - {_, SubPid, SubQos} = lists:nth(Idx, Routes), - SubPid ! {dispatch, tune_qos(SubQos, Msg)} + {_, SubPid} = lists:nth(Idx, Routes), + dispatch(SubPid, Queue, Msg) end; route(Topic, Msg) -> @@ -141,69 +139,33 @@ route(Topic, Msg) -> [] -> emqttd_metrics:inc('messages/dropped'); Routes -> - lists:foreach( - fun({_Topic, SubPid, SubQos}) -> - SubPid ! {dispatch, tune_qos(SubQos, Msg)} - end, Routes) + lists:foreach(fun({_Topic, SubPid}) -> + dispatch(SubPid, Topic, Msg) + end, Routes) end. -tune_qos(SubQos, Msg = #mqtt_message{qos = PubQos}) when PubQos > SubQos -> - Msg#mqtt_message{qos = SubQos}; -tune_qos(_SubQos, Msg) -> - Msg. +dispatch(SubPid, Topic, Msg) -> SubPid ! {dispatch, Topic, Msg}. %%%============================================================================= %%% Internal Functions %%%============================================================================= -diff(TopicTable, TopicInEts) -> - diff(TopicTable, TopicInEts, [], []). - -diff([], _TopicInEts, NewAcc, UpAcc) -> - {NewAcc, UpAcc}; - -diff([{Topic, Qos}|TopicTable], TopicInEts, NewAcc, UpAcc) -> - case lists:keyfind(Topic, 1, TopicInEts) of - {Topic, Qos} -> - diff(TopicTable, TopicInEts, NewAcc, UpAcc); - {Topic, _Qos} -> - diff(TopicTable, TopicInEts, NewAcc, [{Topic, Qos}|UpAcc]); - false -> - diff(TopicTable, TopicInEts, [{Topic, Qos}|NewAcc], UpAcc) - end. - insert_routes([], _Pid) -> ok; -insert_routes(TopicTable, Pid) -> - {Routes, ReverseRoutes} = routes(TopicTable, Pid), +insert_routes(Topics, Pid) -> + {Routes, ReverseRoutes} = routes(Topics, Pid), ets:insert(route, Routes), ets:insert(reverse_route, ReverseRoutes). -update_routes([], _Pid) -> - ok; -update_routes(TopicTable, Pid) -> - {Routes, ReverseRoutes} = routes(TopicTable, Pid), - lists:foreach(fun update_route/1, Routes), - lists:foreach(fun update_reverse_route/1, ReverseRoutes). - -update_route(Route = {Topic, Pid, _Qos}) -> - ets:match_delete(route, {Topic, Pid, '_'}), - ets:insert(route, Route). - -update_reverse_route(RevRoute = {Pid, Topic, _Qos}) -> - ets:match_delete(reverse_route, {Pid, Topic, '_'}), - ets:insert(reverse_route, RevRoute). - -routes(TopicTable, Pid) -> - F = fun(Topic, Qos) -> {{Topic, Pid, Qos}, {Pid, Topic, Qos}} end, - lists:unzip([F(Topic, Qos) || {Topic, Qos} <- TopicTable]). +routes(Topics, Pid) -> + lists:unzip([{{Topic, Pid}, {Pid, Topic}} || Topic <- Topics]). delete_route({Topic, Pid}) -> - ets:match_delete(reverse_route, {Pid, Topic, '_'}), - ets:match_delete(route, {Topic, Pid, '_'}). + ets:delete_object(reverse_route, {Pid, Topic}), + ets:delete_object(route, {Topic, Pid}). delete_route_only({Topic, Pid}) -> - ets:match_delete(route, {Topic, Pid, '_'}). + ets:delete_object(route, {Topic, Pid}). with_stats(Fun) -> Ok = Fun(), setstats(), Ok.