lookup/2, tune_qos

This commit is contained in:
Feng 2015-12-16 10:18:28 +08:00
parent 62e4c749d5
commit b172a78fcd
2 changed files with 32 additions and 6 deletions

View File

@ -42,8 +42,10 @@
%% API Exports
-export([start_link/4]).
-export([create/2, subscribe/1, subscribe/2,
unsubscribe/1, unsubscribe/2, publish/1]).
-export([create/2, lookup/2, subscribe/1, subscribe/2,
unsubscribe/1, unsubscribe/2, publish/1, delete/2]).
%% Subscriptions API
%% Local node
-export([match/1]).
@ -154,6 +156,30 @@ create(subscription, {SubId, Topic, Qos}) ->
{aborted, Error} -> {error, Error}
end.
%%------------------------------------------------------------------------------
%% @doc Lookup Topic or Subscription.
%% @end
%%------------------------------------------------------------------------------
-spec lookup(topic | subscription, binary()) -> list().
lookup(topic, Topic) ->
mnesia:dirty_read(topic, Topic);
lookup(subscription, ClientId) ->
mnesia:dirty_read(subscription, ClientId).
%%------------------------------------------------------------------------------
%% @doc Delete Topic or Subscription.
%% @end
%%------------------------------------------------------------------------------
delete(topic, _Topic) ->
{error, unsupported};
delete(subscription, ClientId) when is_binary(ClientId) ->
mnesia:dirty_deleate({subscription, ClientId});
delete(subscription, {ClientId, Topic}) when is_binary(ClientId) ->
mnesia:async_dirty(fun remove_subscriptions/2, [ClientId, [Topic]]).
%%------------------------------------------------------------------------------
%% @doc Subscribe Topics
%% @end
@ -363,7 +389,7 @@ remove_subscriptions(SubId, Topics) ->
lists:foreach(fun(Topic) ->
Pattern = #mqtt_subscription{subid = SubId, topic = Topic, qos = '_'},
Records = mnesia:match_object(subscription, Pattern, write),
[delete_subscription(Record) || Record <- Records]
lists:foreach(fun delete_subscription/1, Records)
end, Topics).
delete_subscription(Record) ->

View File

@ -483,7 +483,7 @@ handle_cast(Msg, State) ->
%% Dispatch Message
handle_info({dispatch, Topic, Msg}, Session = #session{subscriptions = Subscriptions})
when is_record(Msg, mqtt_message) ->
dispatch(fixqos(Topic, Msg, Subscriptions), Session);
dispatch(tune_qos(Topic, Msg, Subscriptions), Session);
handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined,
awaiting_ack = AwaitingAck}) ->
@ -603,7 +603,7 @@ dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ
hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})
end.
fixqos(Topic, Msg = #mqtt_message{qos = PubQos}, Subscriptions) ->
tune_qos(Topic, Msg = #mqtt_message{qos = PubQos}, Subscriptions) ->
case dict:find(Topic, Subscriptions) of
{ok, SubQos} when PubQos > SubQos ->
Msg#mqtt_message{qos = SubQos};