Dispatcher

This commit is contained in:
Feng Lee 2016-08-10 12:18:55 +08:00
parent 078c39232b
commit bba8066639
1 changed files with 12 additions and 7 deletions

View File

@ -51,6 +51,8 @@
-define(PUBSUB, ?MODULE). -define(PUBSUB, ?MODULE).
-define(Dispatcher, emqttd_dispatcher).
%% @doc Start a pubsub server %% @doc Start a pubsub server
-spec(start_link(atom(), pos_integer(), [tuple()]) -> {ok, pid()} | ignore | {error, any()}). -spec(start_link(atom(), pos_integer(), [tuple()]) -> {ok, pid()} | ignore | {error, any()}).
start_link(Pool, Id, Env) -> start_link(Pool, Id, Env) ->
@ -95,7 +97,7 @@ publish(Topic, Msg) when is_binary(Topic) ->
%% Dispatch on the local node %% Dispatch on the local node
route([#mqtt_route{topic = To, node = Node}], route([#mqtt_route{topic = To, node = Node}],
Delivery = #mqtt_delivery{flows = Flows}) when Node =:= node() -> Delivery = #mqtt_delivery{flows = Flows}) when Node =:= node() ->
emqttd_dispatch:dispatch(To, Delivery#mqtt_delivery{flows = [{route, Node, To} | Flows]}); ?Dispatcher:dispatch(To, Delivery#mqtt_delivery{flows = [{route, Node, To} | Flows]});
%% Forward to other nodes %% Forward to other nodes
route([#mqtt_route{topic = To, node = Node}], Delivery = #mqtt_delivery{flows = Flows}) -> route([#mqtt_route{topic = To, node = Node}], Delivery = #mqtt_delivery{flows = Flows}) ->
@ -110,7 +112,7 @@ delivery(Msg) -> #mqtt_delivery{message = Msg, flows = []}.
%% @doc Forward message to another node... %% @doc Forward message to another node...
forward(Node, To, Delivery) -> forward(Node, To, Delivery) ->
rpc:cast(Node, emqttd_dispatch, dispatch, [To, Delivery]), {ok, Delivery}. rpc:cast(Node, ?Dispatcher, dispatch, [To, Delivery]), {ok, Delivery}.
subscriptions(Subscriber) -> subscriptions(Subscriber) ->
lists:map(fun({_, Topic}) -> lists:map(fun({_, Topic}) ->
@ -231,8 +233,8 @@ code_change(_OldVsn, State, _Extra) ->
do_subscribe(Topic, Subscriber, Options, State) -> do_subscribe(Topic, Subscriber, Options, State) ->
case ets:lookup(subproperty, {Topic, Subscriber}) of case ets:lookup(subproperty, {Topic, Subscriber}) of
[] -> [] ->
?Dispatcher:async_subscribe(Topic, Subscriber),
add_subscription(Subscriber, Topic), add_subscription(Subscriber, Topic),
emqttd_dispatch:async_subscribe(Topic, Subscriber),
ets:insert(subproperty, {{Topic, Subscriber}, Options}), ets:insert(subproperty, {{Topic, Subscriber}, Options}),
{ok, monitor_subpid(Subscriber, State)}; {ok, monitor_subpid(Subscriber, State)};
[_] -> [_] ->
@ -245,7 +247,7 @@ add_subscription(Subscriber, Topic) ->
do_unsubscribe(Topic, Subscriber, State) -> do_unsubscribe(Topic, Subscriber, State) ->
case ets:lookup(subproperty, {Topic, Subscriber}) of case ets:lookup(subproperty, {Topic, Subscriber}) of
[_] -> [_] ->
emqttd_dispatch:async_subscribe(Topic, Subscriber), ?Dispatcher:async_unsubscribe(Topic, Subscriber),
del_subscription(Subscriber, Topic), del_subscription(Subscriber, Topic),
ets:delete(subproperty, {Topic, Subscriber}), ets:delete(subproperty, {Topic, Subscriber}),
{ok, case ets:member(subscription, Subscriber) of {ok, case ets:member(subscription, Subscriber) of
@ -261,8 +263,11 @@ del_subscription(Subscriber, Topic) ->
subscriber_down(DownPid, Topic) -> subscriber_down(DownPid, Topic) ->
case ets:lookup(subproperty, {Topic, DownPid}) of case ets:lookup(subproperty, {Topic, DownPid}) of
[] -> emqttd_dispatch:async_subscribe(Topic, DownPid); %% warning??? [] ->
[_] -> emqttd_dispatch:async_subscribe(Topic, DownPid), %% here?
?Dispatcher:async_unsubscribe(Topic, DownPid);
[_] ->
?Dispatcher:async_unsubscribe(Topic, DownPid),
ets:delete(subproperty, {Topic, DownPid}) ets:delete(subproperty, {Topic, DownPid})
end. end.