This commit is contained in:
Feng 2015-12-07 21:20:35 +08:00
parent fd6a350d9c
commit 9e0fe19c25
1 changed files with 23 additions and 61 deletions

View File

@ -23,11 +23,11 @@
%%% %%%
%%% Route Table: %%% Route Table:
%%% %%%
%%% Topic -> {Pid1, Qos}, {Pid2, Qos}, ... %%% Topic -> Pid1, Pid2, ...
%%% %%%
%%% Reverse Route Table: %%% Reverse Route Table:
%%% %%%
%%% Pid -> {Topic1, Qos}, {Topic2, Qos}, ... %%% Pid -> Topic1, Topic2, ...
%%% %%%
%%% @end %%% @end
%%% %%%
@ -72,17 +72,15 @@ ensure_tab(Tab, Opts) ->
%% @doc Add Routes. %% @doc Add Routes.
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec add_routes(list({binary(), mqtt_qos()}), pid()) -> ok. -spec add_routes(list(binary()), pid()) -> ok.
add_routes(TopicTable, Pid) when is_pid(Pid) -> add_routes(Topics, Pid) when is_pid(Pid) ->
with_stats(fun() -> with_stats(fun() ->
case lookup_routes(Pid) of case lookup_routes(Pid) of
[] -> [] ->
erlang:monitor(process, Pid), erlang:monitor(process, Pid),
insert_routes(TopicTable, Pid); insert_routes(Topics, Pid);
TopicInEts -> InEts ->
{NewTopics, UpdatedTopics} = diff(TopicTable, TopicInEts), insert_routes(Topics -- InEts, Pid)
update_routes(UpdatedTopics, Pid),
insert_routes(NewTopics, Pid)
end end
end). end).
@ -90,9 +88,9 @@ add_routes(TopicTable, Pid) when is_pid(Pid) ->
%% @doc Lookup Routes %% @doc Lookup Routes
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec lookup_routes(pid()) -> list({binary(), mqtt_qos()}). -spec lookup_routes(pid()) -> list(binary()).
lookup_routes(Pid) when is_pid(Pid) -> lookup_routes(Pid) when is_pid(Pid) ->
[{Topic, Qos} || {_, Topic, Qos} <- ets:lookup(reverse_route, Pid)]. [Topic || {_, Topic} <- ets:lookup(reverse_route, Pid)].
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Has Route? %% @doc Has Route?
@ -116,7 +114,7 @@ delete_routes(Topics, Pid) ->
-spec delete_routes(pid()) -> ok. -spec delete_routes(pid()) -> ok.
delete_routes(Pid) when is_pid(Pid) -> delete_routes(Pid) when is_pid(Pid) ->
with_stats(fun() -> with_stats(fun() ->
Routes = [{Topic, Pid} || {Topic, _Qos} <- lookup_routes(Pid)], Routes = [{Topic, Pid} || Topic <- lookup_routes(Pid)],
ets:delete(reverse_route, Pid), ets:delete(reverse_route, Pid),
lists:foreach(fun delete_route_only/1, Routes) lists:foreach(fun delete_route_only/1, Routes)
end). end).
@ -132,8 +130,8 @@ route(Queue = <<"$Q/", _Q>>, Msg) ->
emqttd_metrics:inc('messages/dropped'); emqttd_metrics:inc('messages/dropped');
Routes -> Routes ->
Idx = crypto:rand_uniform(1, length(Routes) + 1), Idx = crypto:rand_uniform(1, length(Routes) + 1),
{_, SubPid, SubQos} = lists:nth(Idx, Routes), {_, SubPid} = lists:nth(Idx, Routes),
SubPid ! {dispatch, tune_qos(SubQos, Msg)} dispatch(SubPid, Queue, Msg)
end; end;
route(Topic, Msg) -> route(Topic, Msg) ->
@ -141,69 +139,33 @@ route(Topic, Msg) ->
[] -> [] ->
emqttd_metrics:inc('messages/dropped'); emqttd_metrics:inc('messages/dropped');
Routes -> Routes ->
lists:foreach( lists:foreach(fun({_Topic, SubPid}) ->
fun({_Topic, SubPid, SubQos}) -> dispatch(SubPid, Topic, Msg)
SubPid ! {dispatch, tune_qos(SubQos, Msg)}
end, Routes) end, Routes)
end. end.
tune_qos(SubQos, Msg = #mqtt_message{qos = PubQos}) when PubQos > SubQos -> dispatch(SubPid, Topic, Msg) -> SubPid ! {dispatch, Topic, Msg}.
Msg#mqtt_message{qos = SubQos};
tune_qos(_SubQos, Msg) ->
Msg.
%%%============================================================================= %%%=============================================================================
%%% Internal Functions %%% Internal Functions
%%%============================================================================= %%%=============================================================================
diff(TopicTable, TopicInEts) ->
diff(TopicTable, TopicInEts, [], []).
diff([], _TopicInEts, NewAcc, UpAcc) ->
{NewAcc, UpAcc};
diff([{Topic, Qos}|TopicTable], TopicInEts, NewAcc, UpAcc) ->
case lists:keyfind(Topic, 1, TopicInEts) of
{Topic, Qos} ->
diff(TopicTable, TopicInEts, NewAcc, UpAcc);
{Topic, _Qos} ->
diff(TopicTable, TopicInEts, NewAcc, [{Topic, Qos}|UpAcc]);
false ->
diff(TopicTable, TopicInEts, [{Topic, Qos}|NewAcc], UpAcc)
end.
insert_routes([], _Pid) -> insert_routes([], _Pid) ->
ok; ok;
insert_routes(TopicTable, Pid) -> insert_routes(Topics, Pid) ->
{Routes, ReverseRoutes} = routes(TopicTable, Pid), {Routes, ReverseRoutes} = routes(Topics, Pid),
ets:insert(route, Routes), ets:insert(route, Routes),
ets:insert(reverse_route, ReverseRoutes). ets:insert(reverse_route, ReverseRoutes).
update_routes([], _Pid) -> routes(Topics, Pid) ->
ok; lists:unzip([{{Topic, Pid}, {Pid, Topic}} || Topic <- Topics]).
update_routes(TopicTable, Pid) ->
{Routes, ReverseRoutes} = routes(TopicTable, Pid),
lists:foreach(fun update_route/1, Routes),
lists:foreach(fun update_reverse_route/1, ReverseRoutes).
update_route(Route = {Topic, Pid, _Qos}) ->
ets:match_delete(route, {Topic, Pid, '_'}),
ets:insert(route, Route).
update_reverse_route(RevRoute = {Pid, Topic, _Qos}) ->
ets:match_delete(reverse_route, {Pid, Topic, '_'}),
ets:insert(reverse_route, RevRoute).
routes(TopicTable, Pid) ->
F = fun(Topic, Qos) -> {{Topic, Pid, Qos}, {Pid, Topic, Qos}} end,
lists:unzip([F(Topic, Qos) || {Topic, Qos} <- TopicTable]).
delete_route({Topic, Pid}) -> delete_route({Topic, Pid}) ->
ets:match_delete(reverse_route, {Pid, Topic, '_'}), ets:delete_object(reverse_route, {Pid, Topic}),
ets:match_delete(route, {Topic, Pid, '_'}). ets:delete_object(route, {Topic, Pid}).
delete_route_only({Topic, Pid}) -> delete_route_only({Topic, Pid}) ->
ets:match_delete(route, {Topic, Pid, '_'}). ets:delete_object(route, {Topic, Pid}).
with_stats(Fun) -> with_stats(Fun) ->
Ok = Fun(), setstats(), Ok. Ok = Fun(), setstats(), Ok.