if_subsciption, async_subscribe

This commit is contained in:
Feng 2016-03-14 13:21:35 +08:00
parent f6fa6a9f71
commit 17f40f458f
4 changed files with 43 additions and 18 deletions

View File

@ -152,6 +152,9 @@
%% Default should be scheduler numbers %% Default should be scheduler numbers
{pool_size, 8}, {pool_size, 8},
%% Store Subscription: true | false
{subscription, true},
%% Route aging time(seconds) %% Route aging time(seconds)
{route_aging, 5} {route_aging, 5}
]}, ]},

View File

@ -144,6 +144,9 @@
%% Default should be scheduler numbers %% Default should be scheduler numbers
{pool_size, 8}, {pool_size, 8},
%% Subscription: ram | false
{subscription, ram},
%% Route aging time(seconds) %% Route aging time(seconds)
{route_aging, 5} {route_aging, 5}
]}, ]},

View File

@ -135,30 +135,34 @@ init([Pool, Id, Env]) ->
{ok, #state{pool = Pool, id = Id, env = Env, monitors = dict:new()}}. {ok, #state{pool = Pool, id = Id, env = Env, monitors = dict:new()}}.
handle_call({subscribe, SubPid, ClientId, Topic, Qos}, _From, State) -> handle_call({subscribe, SubPid, ClientId, Topic, Qos}, _From, State) ->
add_subscription_(ClientId, Topic, Qos), pubsub_subscribe_(SubPid, Topic),
set_subscription_stats(), if_subsciption(State, fun() ->
do_subscribe_(SubPid, Topic), add_subscription_(ClientId, Topic, Qos),
set_subscription_stats()
end),
ok(monitor_subscriber_(ClientId, SubPid, State)); ok(monitor_subscriber_(ClientId, SubPid, State));
handle_call({subscribe, SubPid, Topic}, _From, State) -> handle_call({subscribe, SubPid, Topic}, _From, State) ->
do_subscribe_(SubPid, Topic), pubsub_subscribe_(SubPid, Topic),
ok(monitor_subscriber_(undefined, SubPid, State)); ok(monitor_subscriber_(undefined, SubPid, State));
handle_call({update_subscription, ClientId, Topic, OldQos, NewQos}, _From, State) -> handle_call({update_subscription, ClientId, Topic, OldQos, NewQos}, _From, State) ->
OldSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = OldQos}, if_subsciption(State, fun() ->
NewSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = NewQos}, OldSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = OldQos},
mnesia:transaction(fun update_subscription_/2, [OldSub, NewSub]), NewSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = NewQos},
set_subscription_stats(), ok(State); mnesia:transaction(fun update_subscription_/2, [OldSub, NewSub]),
set_subscription_stats()
end), ok(State);
handle_call({unsubscribe, SubPid, ClientId, Topic, Qos}, From, State) -> handle_call({unsubscribe, SubPid, ClientId, Topic, Qos}, _From, State) ->
del_subscription_(ClientId, Topic, Qos), pubsub_unsubscribe_(SubPid, Topic),
set_subscription_stats(), if_subsciption(State, fun() ->
handle_call({unsubscribe, SubPid, Topic}, From, State); del_subscription_(ClientId, Topic, Qos),
set_subscription_stats()
end), ok(State);
handle_call({unsubscribe, SubPid, Topic}, _From, State) -> handle_call({unsubscribe, SubPid, Topic}, _From, State) ->
emqttd_pubsub:unsubscribe(Topic, SubPid), pubsub_unsubscribe_(SubPid, Topic), ok(State);
ets:delete_object(subscribed, {SubPid, Topic}),
ok(State);
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?UNEXPECTED_REQ(Req, State). ?UNEXPECTED_REQ(Req, State).
@ -179,7 +183,7 @@ handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{monitors
{ok, {ClientId, _}} -> mnesia:dirty_delete(subscription, ClientId); {ok, {ClientId, _}} -> mnesia:dirty_delete(subscription, ClientId);
error -> ok error -> ok
end, end,
{noreply, State#state{monitors = dict:erase(DownPid, Monitors)}}; {noreply, State#state{monitors = dict:erase(DownPid, Monitors)}, hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->
?UNEXPECTED_INFO(Info, State). ?UNEXPECTED_INFO(Info, State).
@ -194,6 +198,12 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal Functions %% Internal Functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
if_subsciption(#state{env = Env}, Fun) ->
case proplists:get_value(subscription, Env, true) of
false -> ok;
_true -> Fun()
end.
%% @private %% @private
%% @doc Add a subscription. %% @doc Add a subscription.
-spec(add_subscription_(binary(), binary(), mqtt_qos()) -> ok). -spec(add_subscription_(binary(), binary(), mqtt_qos()) -> ok).
@ -219,15 +229,20 @@ del_subscription_(Subscription) when is_record(Subscription, mqtt_subscription)
%% @private %% @private
%% @doc Call pubsub to subscribe %% @doc Call pubsub to subscribe
do_subscribe_(SubPid, Topic) -> pubsub_subscribe_(SubPid, Topic) ->
case ets:match(subscribed, {SubPid, Topic}) of case ets:match(subscribed, {SubPid, Topic}) of
[] -> [] ->
emqttd_pubsub:subscribe(Topic, SubPid), emqttd_pubsub:async_subscribe(Topic, SubPid),
ets:insert(subscribed, {SubPid, Topic}); ets:insert(subscribed, {SubPid, Topic});
[_] -> [_] ->
false false
end. end.
%% @private
pubsub_unsubscribe_(SubPid, Topic) ->
emqttd_pubsub:async_unsubscribe(Topic, SubPid),
ets:delete_object(subscribed, {SubPid, Topic}).
monitor_subscriber_(ClientId, SubPid, State = #state{monitors = Monitors}) -> monitor_subscriber_(ClientId, SubPid, State = #state{monitors = Monitors}) ->
case dict:find(SubPid, Monitors) of case dict:find(SubPid, Monitors) of
{ok, _} -> {ok, _} ->

View File

@ -112,6 +112,7 @@ subscribe_unsubscribe(_) ->
publish(_) -> publish(_) ->
Msg = emqttd_message:make(ct, <<"test/pubsub">>, <<"hello">>), Msg = emqttd_message:make(ct, <<"test/pubsub">>, <<"hello">>),
ok = emqttd:subscribe(<<"test/+">>), ok = emqttd:subscribe(<<"test/+">>),
timer:sleep(10),
emqttd:publish(Msg), emqttd:publish(Msg),
true = receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end. true = receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end.
@ -119,6 +120,7 @@ pubsub(_) ->
Self = self(), Self = self(),
emqttd:subscribe({<<"clientId">>, <<"a/b/c">>, 1}), emqttd:subscribe({<<"clientId">>, <<"a/b/c">>, 1}),
emqttd:subscribe({<<"clientId">>, <<"a/b/c">>, 2}), emqttd:subscribe({<<"clientId">>, <<"a/b/c">>, 2}),
timer:sleep(10),
[{Self, <<"a/b/c">>}] = ets:lookup(subscribed, Self), [{Self, <<"a/b/c">>}] = ets:lookup(subscribed, Self),
[{<<"a/b/c">>, Self}] = ets:lookup(subscriber, <<"a/b/c">>), [{<<"a/b/c">>, Self}] = ets:lookup(subscriber, <<"a/b/c">>),
emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)), emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)),
@ -134,12 +136,14 @@ pubsub(_) ->
'pubsub#'(_) -> 'pubsub#'(_) ->
emqttd:subscribe(<<"a/#">>), emqttd:subscribe(<<"a/#">>),
timer:sleep(10),
emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)), emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)),
true = receive {dispatch, <<"a/#">>, _} -> true after 2 -> false end, true = receive {dispatch, <<"a/#">>, _} -> true after 2 -> false end,
emqttd:unsubscribe(<<"a/#">>). emqttd:unsubscribe(<<"a/#">>).
'pubsub+'(_) -> 'pubsub+'(_) ->
emqttd:subscribe(<<"a/+/+">>), emqttd:subscribe(<<"a/+/+">>),
timer:sleep(10),
emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)), emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)),
true = receive {dispatch, <<"a/+/+">>, _} -> true after 1 -> false end, true = receive {dispatch, <<"a/+/+">>, _} -> true after 1 -> false end,
emqttd:unsubscribe(<<"a/+/+">>). emqttd:unsubscribe(<<"a/+/+">>).