fix issue#39 remove old subscription with different Qos #39

This commit is contained in:
Ery Lee 2015-01-15 00:03:10 +08:00
parent 0ae43e316c
commit 45b63a6b13
1 changed files with 73 additions and 28 deletions

View File

@ -60,6 +60,20 @@
terminate/2,
code_change/3]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-spec topics() -> list(topic()).
-spec subscribe({binary(), mqtt_qos()} | list(), pid()) -> {ok, list(mqtt_qos())}.
-spec unsubscribe(binary() | list(binary()), pid()) -> ok.
-endif.
%%----------------------------------------------------------------------------
-record(state, {}).
%% ------------------------------------------------------------------
@ -75,25 +89,26 @@ start_link() ->
%%
%% @doc All topics
%%
-spec topics() -> list(topic()).
topics() ->
mnesia:dirty_all_keys(topic).
%%
%% @doc Subscribe Topic
%% @doc Subscribe Topic or Topics
%%
-spec subscribe({Topic :: binary(), Qos :: mqtt_qos()}, SubPid :: pid()) -> any().
subscribe({Topic, Qos}, SubPid) when is_binary(Topic) and is_pid(SubPid) ->
gen_server:call(?SERVER, {subscribe, {Topic, Qos}, SubPid}).
subscribe([{Topic, Qos}], SubPid);
subscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) ->
gen_server:call(?SERVER, {subscribe, Topics, SubPid}).
%%
%% @doc Unsubscribe Topic
%% @doc Unsubscribe Topic or Topics
%%
-spec unsubscribe(Topic :: binary(), SubPid :: pid()) -> any().
unsubscribe(Topic, SubPid) when is_binary(Topic) and is_pid(SubPid) ->
gen_server:cast(?SERVER, {unsubscribe, Topic, SubPid}).
unsubscribe([Topic], SubPid);
unsubscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) ->
gen_server:cast(?SERVER, {unsubscribe, Topics, SubPid}).
%%
%% @doc Publish to cluster node.
@ -143,29 +158,23 @@ init([]) ->
ets:new(topic_subscriber, [bag, named_table, {keypos, 2}]),
{ok, #state{}}.
handle_call({subscribe, {Topic, Qos}, SubPid}, _From, State) ->
case mnesia:transaction(fun trie_add/1, [Topic]) of
{atomic, _} ->
case get({subscriber, SubPid}) of
undefined ->
MonRef = erlang:monitor(process, SubPid),
put({subcriber, SubPid}, MonRef),
put({submon, MonRef}, SubPid);
_ ->
already_monitored
end,
ets:insert(topic_subscriber, #topic_subscriber{topic=Topic, qos = Qos, subpid=SubPid}),
{reply, ok, State};
{aborted, Reason} ->
{reply, {error, Reason}, State}
end;
handle_call({subscribe, Topics, SubPid}, _From, State) ->
Result = [subscribe_topic({Topic, Qos}, SubPid) || {Topic, Qos} <- Topics],
Reply =
case [Err || Err = {error, _} <- Result] of
[] -> {ok, [Qos || {ok, Qos} <- Result]};
Errors -> hd(Errors)
end,
{reply, Reply, State};
handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
handle_cast({unsubscribe, Topic, SubPid}, State) ->
ets:match_delete(topic_subscriber, #topic_subscriber{topic=Topic, qos ='_', subpid=SubPid}),
try_remove_topic(Topic),
handle_cast({unsubscribe, Topics, SubPid}, State) ->
lists:foreach(fun(Topic) ->
ets:match_delete(topic_subscriber, #topic_subscriber{topic=Topic, qos ='_', subpid=SubPid}),
try_remove_topic(Topic)
end, Topics),
{noreply, State};
handle_cast(Msg, State) ->
@ -196,6 +205,42 @@ code_change(_OldVsn, State, _Extra) ->
%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
subscribe_topic({Topic, Qos}, SubPid) ->
case mnesia:transaction(fun trie_add/1, [Topic]) of
{atomic, _} ->
case get({subscriber, SubPid}) of
undefined ->
%%TODO: refactor later...
MonRef = erlang:monitor(process, SubPid),
put({subcriber, SubPid}, MonRef),
put({submon, MonRef}, SubPid);
_ ->
already_monitored
end,
%% remove duplicated subscribers
try_remove_subscriber({Topic, Qos}, SubPid),
ets:insert(topic_subscriber, #topic_subscriber{topic=Topic, qos = Qos, subpid=SubPid}),
%TODO: GrantedQos??
{ok, Qos};
{aborted, Reason} ->
{error, Reason}
end.
try_remove_subscriber({Topic, Qos}, SubPid) ->
case ets:lookup(topic_subscriber, Topic) of
[] ->
not_found;
Subs ->
DupSubs = [Sub || Sub = #topic_subscriber{qos = OldQos, subpid = OldPid}
<- Subs, Qos =/= OldQos, OldPid =:= SubPid],
case DupSubs of
[] -> ok;
[DupSub] ->
lager:warning("PubSub: remove duplicated subscriber ~p", [DupSub]),
ets:delete(topic_subscriber, DupSub)
end
end.
try_remove_topic(Name) when is_binary(Name) ->
case ets:member(topic_subscriber, Name) of
false ->
@ -218,7 +263,7 @@ trie_add(Topic) when is_binary(Topic) ->
[TrieNode=#topic_trie_node{topic=undefined}] ->
mnesia:write(TrieNode#topic_trie_node{topic=Topic});
[#topic_trie_node{topic=Topic}] ->
ignore;
{atomic, already_exist};
[] ->
%add trie path
[trie_add_path(Triple) || Triple <- emqtt_topic:triples(Topic)],