mqtt_pubsub, mqtt_subscription, mqtt_subscriber

This commit is contained in:
Feng Lee 2016-08-10 15:41:10 +08:00
parent 6686139bc6
commit 162b7ec229
3 changed files with 46 additions and 48 deletions

View File

@ -122,21 +122,19 @@ forward(Node, To, Delivery) ->
subscriptions(Subscriber) ->
lists:map(fun({_, Topic}) ->
subscription(Topic, Subscriber)
end, ets:lookup(subscription, Subscriber)).
end, ets:lookup(mqtt_subscription, Subscriber)).
subscription(Topic, Subscriber) ->
{Topic, ets:lookup_element(subproperty, {Topic, Subscriber}, 2)}.
{Topic, ets:lookup_element(mqtt_pubsub, {Topic, Subscriber}, 2)}.
is_subscribed(Topic, Subscriber) when is_binary(Topic) ->
ets:member(subproperty, {Topic, Subscriber}).
ets:member(mqtt_pubsub, {Topic, Subscriber}).
setqos(Topic, Subscriber, Qos) when is_binary(Topic) ->
call(pick(Subscriber), {setqos, Topic, Subscriber, Qos}).
dump() ->
[{subscriber, ets:tab2list(subscriber)},
{subscription, ets:tab2list(subscription)},
{subproperty, ets:tab2list(subproperty)}].
[{Tab, ets:tab2list(Tab)} || Tab <- [mqtt_pubsub, mqtt_subscription, mqtt_subscriber]].
%% @doc Unsubscribe
-spec(unsubscribe(binary()) -> ok | emqttd:pubsub_error()).
@ -188,10 +186,10 @@ handle_call({unsubscribe, Topic, Subscriber}, _From, State) ->
handle_call({setqos, Topic, Subscriber, Qos}, _From, State) ->
Key = {Topic, Subscriber},
case ets:lookup(subproperty, Key) of
case ets:lookup(mqtt_pubsub, Key) of
[{_, Opts}] ->
Opts1 = lists:ukeymerge(1, [{qos, Qos}], Opts),
ets:insert(subproperty, {Key, Opts1}),
ets:insert(mqtt_pubsub, {Key, Opts1}),
{reply, ok, State};
[] ->
{reply, {error, {subscription_not_found, Topic}}, State}
@ -237,26 +235,26 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
do_subscribe(Topic, Subscriber, Options, State) ->
case ets:lookup(subproperty, {Topic, Subscriber}) of
case ets:lookup(mqtt_pubsub, {Topic, Subscriber}) of
[] ->
?Dispatcher:async_subscribe(Topic, Subscriber),
add_subscription(Subscriber, Topic),
ets:insert(subproperty, {{Topic, Subscriber}, Options}),
ets:insert(mqtt_pubsub, {{Topic, Subscriber}, Options}),
{ok, monitor_subpid(Subscriber, State)};
[_] ->
{error, {already_subscribed, Topic}}
end.
add_subscription(Subscriber, Topic) ->
ets:insert(subscription, {Subscriber, Topic}).
ets:insert(mqtt_subscription, {Subscriber, Topic}).
do_unsubscribe(Topic, Subscriber, State) ->
case ets:lookup(subproperty, {Topic, Subscriber}) of
case ets:lookup(mqtt_pubsub, {Topic, Subscriber}) of
[_] ->
?Dispatcher:async_unsubscribe(Topic, Subscriber),
del_subscription(Subscriber, Topic),
ets:delete(subproperty, {Topic, Subscriber}),
{ok, case ets:member(subscription, Subscriber) of
ets:delete(mqtt_pubsub, {Topic, Subscriber}),
{ok, case ets:member(mqtt_subscription, Subscriber) of
true -> State;
false -> demonitor_subpid(Subscriber, State)
end};
@ -265,22 +263,22 @@ do_unsubscribe(Topic, Subscriber, State) ->
end.
del_subscription(Subscriber, Topic) ->
ets:delete_object(subscription, {Subscriber, Topic}).
ets:delete_object(mqtt_subscription, {Subscriber, Topic}).
subscriber_down_(Subscriber) ->
lists:foreach(fun({_, Topic}) ->
subscriber_down_(Subscriber, Topic)
end, ets:lookup(subscription, Subscriber)),
ets:delete(subscription, Subscriber).
end, ets:lookup(mqtt_subscription, Subscriber)),
ets:delete(mqtt_subscription, Subscriber).
subscriber_down_(DownPid, Topic) ->
case ets:lookup(subproperty, {Topic, DownPid}) of
case ets:lookup(mqtt_pubsub, {Topic, DownPid}) of
[] ->
%% here?
?Dispatcher:async_unsubscribe(Topic, DownPid);
[_] ->
?Dispatcher:async_unsubscribe(Topic, DownPid),
ets:delete(subproperty, {Topic, DownPid})
ets:delete(mqtt_pubsub, {Topic, DownPid})
end.
monitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) ->
@ -294,7 +292,7 @@ demonitor_subpid(_SubPid, State) ->
State.
setstats(State) when is_record(State, state) ->
emqttd_stats:setstats('subscribers/count', 'subscribers/max', ets:info(subscriber, size)),
emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', ets:info(subscription, size)),
emqttd_stats:setstats('subscribers/count', 'subscribers/max', ets:info(mqtt_subscriber, size)),
emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', ets:info(mqtt_subscription, size)),
State.

View File

@ -43,7 +43,7 @@ pubsub_pool() ->
init([Env]) ->
%% Create ETS Tables
[create_tab(Tab) || Tab <- [subscriber, subscription, subproperty]],
[create_tab(Tab) || Tab <- [mqtt_pubsub, mqtt_subscriber, mqtt_subscription]],
%% Dispatcher Pool
DispatcherMFA = {emqttd_dispatcher, start_link, [Env]},
@ -68,19 +68,19 @@ pool_sup(Name, Env, MFA) ->
%% Create PubSub Tables
%%--------------------------------------------------------------------
create_tab(subscriber) ->
create_tab(mqtt_pubsub) ->
%% Subproperty: {Topic, Sub} -> [{qos, 1}]
ensure_tab(mqtt_pubsub, [public, named_table, set | ?CONCURRENCY_OPTS]);
create_tab(mqtt_subscriber) ->
%% Subscriber: Topic -> Sub1, Sub2, Sub3, ..., SubN
%% duplicate_bag: o(1) insert
ensure_tab(subscriber, [public, named_table, duplicate_bag | ?CONCURRENCY_OPTS]);
ensure_tab(mqtt_subscriber, [public, named_table, duplicate_bag | ?CONCURRENCY_OPTS]);
create_tab(subscription) ->
create_tab(mqtt_subscription) ->
%% Subscription: Sub -> Topic1, Topic2, Topic3, ..., TopicN
%% bag: o(n) insert
ensure_tab(subscription, [public, named_table, bag | ?CONCURRENCY_OPTS]);
create_tab(subproperty) ->
%% Subproperty: {Topic, Sub} -> [{qos, 1}]
ensure_tab(subproperty, [public, named_table, ordered_set | ?CONCURRENCY_OPTS]).
ensure_tab(mqtt_subscription, [public, named_table, bag | ?CONCURRENCY_OPTS]).
ensure_tab(Tab, Opts) ->
case ets:info(Tab, name) of undefined -> ets:new(Tab, Opts); _ -> ok end.

View File

@ -53,14 +53,14 @@
%%--------------------------------------------------------------------
mnesia(boot) ->
ok = emqttd_mnesia:create_table(route, [
ok = emqttd_mnesia:create_table(mqtt_route, [
{type, bag},
{ram_copies, [node()]},
{record_name, mqtt_route},
{attributes, record_info(fields, mqtt_route)}]);
mnesia(copy) ->
ok = emqttd_mnesia:copy_table(route, ram_copies).
ok = emqttd_mnesia:copy_table(mqtt_route, ram_copies).
%%--------------------------------------------------------------------
%% Start the Router
@ -74,17 +74,17 @@ start_link() ->
%%--------------------------------------------------------------------
topics() ->
mnesia:dirty_all_keys(route).
mnesia:dirty_all_keys(mqtt_route).
local_topics() ->
ets:select(local_route, [{{'$1', '_'}, [], ['$1']}]).
ets:select(mqtt_local_route, [{{'$1', '_'}, [], ['$1']}]).
%% @doc Match Routes.
-spec(match(Topic:: binary()) -> [mqtt_route()]).
match(Topic) when is_binary(Topic) ->
Matched = mnesia:async_dirty(fun emqttd_trie:match/1, [Topic]),
%% Optimize: route table will be replicated to all nodes.
lists:append([ets:lookup(route, To) || To <- [Topic | Matched]]).
lists:append([ets:lookup(mqtt_route, To) || To <- [Topic | Matched]]).
%% @doc Print Routes.
-spec(print(Topic :: binary()) -> [ok]).
@ -114,17 +114,17 @@ add_routes(Routes) ->
%% @private
add_route_(Route = #mqtt_route{topic = Topic}) ->
case mnesia:wread({route, Topic}) of
case mnesia:wread({mqtt_route, Topic}) of
[] ->
case emqttd_topic:wildcard(Topic) of
true -> emqttd_trie:insert(Topic);
false -> ok
end,
mnesia:write(route, Route, write);
mnesia:write(Route);
Records ->
case lists:member(Route, Records) of
true -> ok;
false -> mnesia:write(route, Route, write)
false -> mnesia:write(Route)
end
end.
@ -149,27 +149,27 @@ del_routes(Routes) ->
end.
del_route_(Route = #mqtt_route{topic = Topic}) ->
case mnesia:wread({route, Topic}) of
case mnesia:wread({mqtt_route, Topic}) of
[] ->
ok;
[Route] ->
%% Remove route and trie
mnesia:delete_object(route, Route, write),
mnesia:delete_object(Route),
case emqttd_topic:wildcard(Topic) of
true -> emqttd_trie:delete(Topic);
false -> ok
end;
_More ->
%% Remove route only
mnesia:delete_object(route, Route, write)
mnesia:delete_object(Route)
end.
%% @doc Has Route?
-spec(has_route(binary()) -> boolean()).
has_route(Topic) ->
Routes = case mnesia:is_transaction() of
true -> mnesia:read(route, Topic);
false -> mnesia:dirty_read(route, Topic)
true -> mnesia:read(mqtt_route, Topic);
false -> mnesia:dirty_read(mqtt_route, Topic)
end,
length(Routes) > 0.
@ -196,11 +196,11 @@ del_local_route(Topic) ->
-spec(match_local(binary()) -> [mqtt_route()]).
match_local(Name) ->
[#mqtt_route{topic = {local, Filter}, node = Node}
|| {Filter, Node} <- ets:tab2list(local_route),
|| {Filter, Node} <- ets:tab2list(mqtt_local_route),
emqttd_topic:match(Name, Filter)].
dump() ->
[{route, ets:tab2list(route)}, {local_route, ets:tab2list(local_route)}].
[{route, ets:tab2list(mqtt_route)}, {local_route, ets:tab2list(mqtt_local_route)}].
stop() -> gen_server:call(?ROUTER, stop).
@ -209,8 +209,8 @@ stop() -> gen_server:call(?ROUTER, stop).
%%--------------------------------------------------------------------
init([]) ->
ets:new(local_route, [set, named_table, protected]),
mnesia:subscribe(system),
ets:new(mqtt_local_route, [set, named_table, protected]),
{ok, TRef} = timer:send_interval(timer:seconds(1), stats),
{ok, #state{stats_timer = TRef}}.
@ -222,11 +222,11 @@ handle_call(_Req, _From, State) ->
handle_cast({add_local_route, Topic}, State) ->
%% why node()...?
ets:insert(local_route, {Topic, node()}),
ets:insert(mqtt_local_route, {Topic, node()}),
{noreply, State};
handle_cast({del_local_route, Topic}, State) ->
ets:delete(local_route, Topic),
ets:delete(mqtt_local_route, Topic),
{noreply, State};
handle_cast(_Msg, State) ->