diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 75e2e58d5..71a9197d2 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -51,6 +51,8 @@ -define(PUBSUB, ?MODULE). +-define(Dispatcher, emqttd_dispatcher). + %% @doc Start a pubsub server -spec(start_link(atom(), pos_integer(), [tuple()]) -> {ok, pid()} | ignore | {error, any()}). start_link(Pool, Id, Env) -> @@ -95,7 +97,7 @@ publish(Topic, Msg) when is_binary(Topic) -> %% Dispatch on the local node route([#mqtt_route{topic = To, node = Node}], Delivery = #mqtt_delivery{flows = Flows}) when Node =:= node() -> - emqttd_dispatch:dispatch(To, Delivery#mqtt_delivery{flows = [{route, Node, To} | Flows]}); + ?Dispatcher:dispatch(To, Delivery#mqtt_delivery{flows = [{route, Node, To} | Flows]}); %% Forward to other nodes route([#mqtt_route{topic = To, node = Node}], Delivery = #mqtt_delivery{flows = Flows}) -> @@ -110,7 +112,7 @@ delivery(Msg) -> #mqtt_delivery{message = Msg, flows = []}. %% @doc Forward message to another node... forward(Node, To, Delivery) -> - rpc:cast(Node, emqttd_dispatch, dispatch, [To, Delivery]), {ok, Delivery}. + rpc:cast(Node, ?Dispatcher, dispatch, [To, Delivery]), {ok, Delivery}. subscriptions(Subscriber) -> lists:map(fun({_, Topic}) -> @@ -231,8 +233,8 @@ code_change(_OldVsn, State, _Extra) -> do_subscribe(Topic, Subscriber, Options, State) -> case ets:lookup(subproperty, {Topic, Subscriber}) of [] -> + ?Dispatcher:async_subscribe(Topic, Subscriber), add_subscription(Subscriber, Topic), - emqttd_dispatch:async_subscribe(Topic, Subscriber), ets:insert(subproperty, {{Topic, Subscriber}, Options}), {ok, monitor_subpid(Subscriber, State)}; [_] -> @@ -245,7 +247,7 @@ add_subscription(Subscriber, Topic) -> do_unsubscribe(Topic, Subscriber, State) -> case ets:lookup(subproperty, {Topic, Subscriber}) of [_] -> - emqttd_dispatch:async_subscribe(Topic, Subscriber), + ?Dispatcher:async_unsubscribe(Topic, Subscriber), del_subscription(Subscriber, Topic), ets:delete(subproperty, {Topic, Subscriber}), {ok, case ets:member(subscription, Subscriber) of @@ -261,9 +263,12 @@ del_subscription(Subscriber, Topic) -> subscriber_down(DownPid, Topic) -> case ets:lookup(subproperty, {Topic, DownPid}) of - [] -> emqttd_dispatch:async_subscribe(Topic, DownPid); %% warning??? - [_] -> emqttd_dispatch:async_subscribe(Topic, DownPid), - ets:delete(subproperty, {Topic, DownPid}) + [] -> + %% here? + ?Dispatcher:async_unsubscribe(Topic, DownPid); + [_] -> + ?Dispatcher:async_unsubscribe(Topic, DownPid), + ets:delete(subproperty, {Topic, DownPid}) end. monitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) ->