diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 6107704c3..9c9524ee6 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -122,21 +122,19 @@ forward(Node, To, Delivery) -> subscriptions(Subscriber) -> lists:map(fun({_, Topic}) -> subscription(Topic, Subscriber) - end, ets:lookup(subscription, Subscriber)). + end, ets:lookup(mqtt_subscription, Subscriber)). subscription(Topic, Subscriber) -> - {Topic, ets:lookup_element(subproperty, {Topic, Subscriber}, 2)}. + {Topic, ets:lookup_element(mqtt_pubsub, {Topic, Subscriber}, 2)}. is_subscribed(Topic, Subscriber) when is_binary(Topic) -> - ets:member(subproperty, {Topic, Subscriber}). + ets:member(mqtt_pubsub, {Topic, Subscriber}). setqos(Topic, Subscriber, Qos) when is_binary(Topic) -> call(pick(Subscriber), {setqos, Topic, Subscriber, Qos}). dump() -> - [{subscriber, ets:tab2list(subscriber)}, - {subscription, ets:tab2list(subscription)}, - {subproperty, ets:tab2list(subproperty)}]. + [{Tab, ets:tab2list(Tab)} || Tab <- [mqtt_pubsub, mqtt_subscription, mqtt_subscriber]]. %% @doc Unsubscribe -spec(unsubscribe(binary()) -> ok | emqttd:pubsub_error()). @@ -188,10 +186,10 @@ handle_call({unsubscribe, Topic, Subscriber}, _From, State) -> handle_call({setqos, Topic, Subscriber, Qos}, _From, State) -> Key = {Topic, Subscriber}, - case ets:lookup(subproperty, Key) of + case ets:lookup(mqtt_pubsub, Key) of [{_, Opts}] -> Opts1 = lists:ukeymerge(1, [{qos, Qos}], Opts), - ets:insert(subproperty, {Key, Opts1}), + ets:insert(mqtt_pubsub, {Key, Opts1}), {reply, ok, State}; [] -> {reply, {error, {subscription_not_found, Topic}}, State} @@ -237,26 +235,26 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- do_subscribe(Topic, Subscriber, Options, State) -> - case ets:lookup(subproperty, {Topic, Subscriber}) of + case ets:lookup(mqtt_pubsub, {Topic, Subscriber}) of [] -> ?Dispatcher:async_subscribe(Topic, Subscriber), add_subscription(Subscriber, Topic), - ets:insert(subproperty, {{Topic, Subscriber}, Options}), + ets:insert(mqtt_pubsub, {{Topic, Subscriber}, Options}), {ok, monitor_subpid(Subscriber, State)}; [_] -> {error, {already_subscribed, Topic}} end. add_subscription(Subscriber, Topic) -> - ets:insert(subscription, {Subscriber, Topic}). + ets:insert(mqtt_subscription, {Subscriber, Topic}). do_unsubscribe(Topic, Subscriber, State) -> - case ets:lookup(subproperty, {Topic, Subscriber}) of + case ets:lookup(mqtt_pubsub, {Topic, Subscriber}) of [_] -> ?Dispatcher:async_unsubscribe(Topic, Subscriber), del_subscription(Subscriber, Topic), - ets:delete(subproperty, {Topic, Subscriber}), - {ok, case ets:member(subscription, Subscriber) of + ets:delete(mqtt_pubsub, {Topic, Subscriber}), + {ok, case ets:member(mqtt_subscription, Subscriber) of true -> State; false -> demonitor_subpid(Subscriber, State) end}; @@ -265,22 +263,22 @@ do_unsubscribe(Topic, Subscriber, State) -> end. del_subscription(Subscriber, Topic) -> - ets:delete_object(subscription, {Subscriber, Topic}). + ets:delete_object(mqtt_subscription, {Subscriber, Topic}). subscriber_down_(Subscriber) -> lists:foreach(fun({_, Topic}) -> subscriber_down_(Subscriber, Topic) - end, ets:lookup(subscription, Subscriber)), - ets:delete(subscription, Subscriber). + end, ets:lookup(mqtt_subscription, Subscriber)), + ets:delete(mqtt_subscription, Subscriber). subscriber_down_(DownPid, Topic) -> - case ets:lookup(subproperty, {Topic, DownPid}) of + case ets:lookup(mqtt_pubsub, {Topic, DownPid}) of [] -> %% here? ?Dispatcher:async_unsubscribe(Topic, DownPid); [_] -> ?Dispatcher:async_unsubscribe(Topic, DownPid), - ets:delete(subproperty, {Topic, DownPid}) + ets:delete(mqtt_pubsub, {Topic, DownPid}) end. monitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) -> @@ -294,7 +292,7 @@ demonitor_subpid(_SubPid, State) -> State. setstats(State) when is_record(State, state) -> - emqttd_stats:setstats('subscribers/count', 'subscribers/max', ets:info(subscriber, size)), - emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', ets:info(subscription, size)), + emqttd_stats:setstats('subscribers/count', 'subscribers/max', ets:info(mqtt_subscriber, size)), + emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', ets:info(mqtt_subscription, size)), State. diff --git a/src/emqttd_pubsub_sup.erl b/src/emqttd_pubsub_sup.erl index 4663151ed..9b794dcfd 100644 --- a/src/emqttd_pubsub_sup.erl +++ b/src/emqttd_pubsub_sup.erl @@ -43,7 +43,7 @@ pubsub_pool() -> init([Env]) -> %% Create ETS Tables - [create_tab(Tab) || Tab <- [subscriber, subscription, subproperty]], + [create_tab(Tab) || Tab <- [mqtt_pubsub, mqtt_subscriber, mqtt_subscription]], %% Dispatcher Pool DispatcherMFA = {emqttd_dispatcher, start_link, [Env]}, @@ -68,19 +68,19 @@ pool_sup(Name, Env, MFA) -> %% Create PubSub Tables %%-------------------------------------------------------------------- -create_tab(subscriber) -> +create_tab(mqtt_pubsub) -> + %% Subproperty: {Topic, Sub} -> [{qos, 1}] + ensure_tab(mqtt_pubsub, [public, named_table, set | ?CONCURRENCY_OPTS]); + +create_tab(mqtt_subscriber) -> %% Subscriber: Topic -> Sub1, Sub2, Sub3, ..., SubN %% duplicate_bag: o(1) insert - ensure_tab(subscriber, [public, named_table, duplicate_bag | ?CONCURRENCY_OPTS]); + ensure_tab(mqtt_subscriber, [public, named_table, duplicate_bag | ?CONCURRENCY_OPTS]); -create_tab(subscription) -> +create_tab(mqtt_subscription) -> %% Subscription: Sub -> Topic1, Topic2, Topic3, ..., TopicN %% bag: o(n) insert - ensure_tab(subscription, [public, named_table, bag | ?CONCURRENCY_OPTS]); - -create_tab(subproperty) -> - %% Subproperty: {Topic, Sub} -> [{qos, 1}] - ensure_tab(subproperty, [public, named_table, ordered_set | ?CONCURRENCY_OPTS]). + ensure_tab(mqtt_subscription, [public, named_table, bag | ?CONCURRENCY_OPTS]). ensure_tab(Tab, Opts) -> case ets:info(Tab, name) of undefined -> ets:new(Tab, Opts); _ -> ok end. diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index d946e945e..d77a86488 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -53,14 +53,14 @@ %%-------------------------------------------------------------------- mnesia(boot) -> - ok = emqttd_mnesia:create_table(route, [ + ok = emqttd_mnesia:create_table(mqtt_route, [ {type, bag}, {ram_copies, [node()]}, {record_name, mqtt_route}, {attributes, record_info(fields, mqtt_route)}]); mnesia(copy) -> - ok = emqttd_mnesia:copy_table(route, ram_copies). + ok = emqttd_mnesia:copy_table(mqtt_route, ram_copies). %%-------------------------------------------------------------------- %% Start the Router @@ -74,17 +74,17 @@ start_link() -> %%-------------------------------------------------------------------- topics() -> - mnesia:dirty_all_keys(route). + mnesia:dirty_all_keys(mqtt_route). local_topics() -> - ets:select(local_route, [{{'$1', '_'}, [], ['$1']}]). + ets:select(mqtt_local_route, [{{'$1', '_'}, [], ['$1']}]). %% @doc Match Routes. -spec(match(Topic:: binary()) -> [mqtt_route()]). match(Topic) when is_binary(Topic) -> Matched = mnesia:async_dirty(fun emqttd_trie:match/1, [Topic]), %% Optimize: route table will be replicated to all nodes. - lists:append([ets:lookup(route, To) || To <- [Topic | Matched]]). + lists:append([ets:lookup(mqtt_route, To) || To <- [Topic | Matched]]). %% @doc Print Routes. -spec(print(Topic :: binary()) -> [ok]). @@ -114,17 +114,17 @@ add_routes(Routes) -> %% @private add_route_(Route = #mqtt_route{topic = Topic}) -> - case mnesia:wread({route, Topic}) of + case mnesia:wread({mqtt_route, Topic}) of [] -> case emqttd_topic:wildcard(Topic) of true -> emqttd_trie:insert(Topic); false -> ok end, - mnesia:write(route, Route, write); + mnesia:write(Route); Records -> case lists:member(Route, Records) of true -> ok; - false -> mnesia:write(route, Route, write) + false -> mnesia:write(Route) end end. @@ -149,27 +149,27 @@ del_routes(Routes) -> end. del_route_(Route = #mqtt_route{topic = Topic}) -> - case mnesia:wread({route, Topic}) of + case mnesia:wread({mqtt_route, Topic}) of [] -> ok; [Route] -> %% Remove route and trie - mnesia:delete_object(route, Route, write), + mnesia:delete_object(Route), case emqttd_topic:wildcard(Topic) of true -> emqttd_trie:delete(Topic); false -> ok end; _More -> %% Remove route only - mnesia:delete_object(route, Route, write) + mnesia:delete_object(Route) end. %% @doc Has Route? -spec(has_route(binary()) -> boolean()). has_route(Topic) -> Routes = case mnesia:is_transaction() of - true -> mnesia:read(route, Topic); - false -> mnesia:dirty_read(route, Topic) + true -> mnesia:read(mqtt_route, Topic); + false -> mnesia:dirty_read(mqtt_route, Topic) end, length(Routes) > 0. @@ -196,11 +196,11 @@ del_local_route(Topic) -> -spec(match_local(binary()) -> [mqtt_route()]). match_local(Name) -> [#mqtt_route{topic = {local, Filter}, node = Node} - || {Filter, Node} <- ets:tab2list(local_route), + || {Filter, Node} <- ets:tab2list(mqtt_local_route), emqttd_topic:match(Name, Filter)]. dump() -> - [{route, ets:tab2list(route)}, {local_route, ets:tab2list(local_route)}]. + [{route, ets:tab2list(mqtt_route)}, {local_route, ets:tab2list(mqtt_local_route)}]. stop() -> gen_server:call(?ROUTER, stop). @@ -209,8 +209,8 @@ stop() -> gen_server:call(?ROUTER, stop). %%-------------------------------------------------------------------- init([]) -> - ets:new(local_route, [set, named_table, protected]), mnesia:subscribe(system), + ets:new(mqtt_local_route, [set, named_table, protected]), {ok, TRef} = timer:send_interval(timer:seconds(1), stats), {ok, #state{stats_timer = TRef}}. @@ -222,11 +222,11 @@ handle_call(_Req, _From, State) -> handle_cast({add_local_route, Topic}, State) -> %% why node()...? - ets:insert(local_route, {Topic, node()}), + ets:insert(mqtt_local_route, {Topic, node()}), {noreply, State}; handle_cast({del_local_route, Topic}, State) -> - ets:delete(local_route, Topic), + ets:delete(mqtt_local_route, Topic), {noreply, State}; handle_cast(_Msg, State) ->