From 17f40f458fba38da161d8f4d7cd72c53f30bb22b Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 14 Mar 2016 13:21:35 +0800 Subject: [PATCH] if_subsciption, async_subscribe --- rel/files/emqttd.config.development | 3 ++ rel/files/emqttd.config.production | 3 ++ src/emqttd_server.erl | 51 +++++++++++++++++++---------- test/emqttd_SUITE.erl | 4 +++ 4 files changed, 43 insertions(+), 18 deletions(-) diff --git a/rel/files/emqttd.config.development b/rel/files/emqttd.config.development index d0666ae46..c655b3c3c 100644 --- a/rel/files/emqttd.config.development +++ b/rel/files/emqttd.config.development @@ -152,6 +152,9 @@ %% Default should be scheduler numbers {pool_size, 8}, + %% Store Subscription: true | false + {subscription, true}, + %% Route aging time(seconds) {route_aging, 5} ]}, diff --git a/rel/files/emqttd.config.production b/rel/files/emqttd.config.production index b1ad6122b..58d932388 100644 --- a/rel/files/emqttd.config.production +++ b/rel/files/emqttd.config.production @@ -144,6 +144,9 @@ %% Default should be scheduler numbers {pool_size, 8}, + %% Subscription: ram | false + {subscription, ram}, + %% Route aging time(seconds) {route_aging, 5} ]}, diff --git a/src/emqttd_server.erl b/src/emqttd_server.erl index 0fafb14a4..1466d8a7a 100644 --- a/src/emqttd_server.erl +++ b/src/emqttd_server.erl @@ -135,30 +135,34 @@ init([Pool, Id, Env]) -> {ok, #state{pool = Pool, id = Id, env = Env, monitors = dict:new()}}. handle_call({subscribe, SubPid, ClientId, Topic, Qos}, _From, State) -> - add_subscription_(ClientId, Topic, Qos), - set_subscription_stats(), - do_subscribe_(SubPid, Topic), + pubsub_subscribe_(SubPid, Topic), + if_subsciption(State, fun() -> + add_subscription_(ClientId, Topic, Qos), + set_subscription_stats() + end), ok(monitor_subscriber_(ClientId, SubPid, State)); handle_call({subscribe, SubPid, Topic}, _From, State) -> - do_subscribe_(SubPid, Topic), + pubsub_subscribe_(SubPid, Topic), ok(monitor_subscriber_(undefined, SubPid, State)); handle_call({update_subscription, ClientId, Topic, OldQos, NewQos}, _From, State) -> - OldSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = OldQos}, - NewSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = NewQos}, - mnesia:transaction(fun update_subscription_/2, [OldSub, NewSub]), - set_subscription_stats(), ok(State); + if_subsciption(State, fun() -> + OldSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = OldQos}, + NewSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = NewQos}, + mnesia:transaction(fun update_subscription_/2, [OldSub, NewSub]), + set_subscription_stats() + end), ok(State); -handle_call({unsubscribe, SubPid, ClientId, Topic, Qos}, From, State) -> - del_subscription_(ClientId, Topic, Qos), - set_subscription_stats(), - handle_call({unsubscribe, SubPid, Topic}, From, State); +handle_call({unsubscribe, SubPid, ClientId, Topic, Qos}, _From, State) -> + pubsub_unsubscribe_(SubPid, Topic), + if_subsciption(State, fun() -> + del_subscription_(ClientId, Topic, Qos), + set_subscription_stats() + end), ok(State); handle_call({unsubscribe, SubPid, Topic}, _From, State) -> - emqttd_pubsub:unsubscribe(Topic, SubPid), - ets:delete_object(subscribed, {SubPid, Topic}), - ok(State); + pubsub_unsubscribe_(SubPid, Topic), ok(State); handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). @@ -179,7 +183,7 @@ handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{monitors {ok, {ClientId, _}} -> mnesia:dirty_delete(subscription, ClientId); error -> ok end, - {noreply, State#state{monitors = dict:erase(DownPid, Monitors)}}; + {noreply, State#state{monitors = dict:erase(DownPid, Monitors)}, hibernate}; handle_info(Info, State) -> ?UNEXPECTED_INFO(Info, State). @@ -194,6 +198,12 @@ code_change(_OldVsn, State, _Extra) -> %% Internal Functions %%-------------------------------------------------------------------- +if_subsciption(#state{env = Env}, Fun) -> + case proplists:get_value(subscription, Env, true) of + false -> ok; + _true -> Fun() + end. + %% @private %% @doc Add a subscription. -spec(add_subscription_(binary(), binary(), mqtt_qos()) -> ok). @@ -219,15 +229,20 @@ del_subscription_(Subscription) when is_record(Subscription, mqtt_subscription) %% @private %% @doc Call pubsub to subscribe -do_subscribe_(SubPid, Topic) -> +pubsub_subscribe_(SubPid, Topic) -> case ets:match(subscribed, {SubPid, Topic}) of [] -> - emqttd_pubsub:subscribe(Topic, SubPid), + emqttd_pubsub:async_subscribe(Topic, SubPid), ets:insert(subscribed, {SubPid, Topic}); [_] -> false end. +%% @private +pubsub_unsubscribe_(SubPid, Topic) -> + emqttd_pubsub:async_unsubscribe(Topic, SubPid), + ets:delete_object(subscribed, {SubPid, Topic}). + monitor_subscriber_(ClientId, SubPid, State = #state{monitors = Monitors}) -> case dict:find(SubPid, Monitors) of {ok, _} -> diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index 3650a24db..badc975eb 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -112,6 +112,7 @@ subscribe_unsubscribe(_) -> publish(_) -> Msg = emqttd_message:make(ct, <<"test/pubsub">>, <<"hello">>), ok = emqttd:subscribe(<<"test/+">>), + timer:sleep(10), emqttd:publish(Msg), true = receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end. @@ -119,6 +120,7 @@ pubsub(_) -> Self = self(), emqttd:subscribe({<<"clientId">>, <<"a/b/c">>, 1}), emqttd:subscribe({<<"clientId">>, <<"a/b/c">>, 2}), + timer:sleep(10), [{Self, <<"a/b/c">>}] = ets:lookup(subscribed, Self), [{<<"a/b/c">>, Self}] = ets:lookup(subscriber, <<"a/b/c">>), emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)), @@ -134,12 +136,14 @@ pubsub(_) -> 'pubsub#'(_) -> emqttd:subscribe(<<"a/#">>), + timer:sleep(10), emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)), true = receive {dispatch, <<"a/#">>, _} -> true after 2 -> false end, emqttd:unsubscribe(<<"a/#">>). 'pubsub+'(_) -> emqttd:subscribe(<<"a/+/+">>), + timer:sleep(10), emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)), true = receive {dispatch, <<"a/+/+">>, _} -> true after 1 -> false end, emqttd:unsubscribe(<<"a/+/+">>).