map()
This commit is contained in:
commit
bd61ddb03c
|
@ -44,9 +44,9 @@ rewrite_subscribe({_ClientId, _Username}, {Topic, Opts}, Sections) ->
|
||||||
lager:info("Rewrite subscribe: ~p", [{Topic, Opts}]),
|
lager:info("Rewrite subscribe: ~p", [{Topic, Opts}]),
|
||||||
{ok, {match_topic(Topic, Sections), Opts}}.
|
{ok, {match_topic(Topic, Sections), Opts}}.
|
||||||
|
|
||||||
rewrite_unsubscribe({_ClientId, _Username}, {Topic, Opts}, Sections) ->
|
rewrite_unsubscribe({_ClientId, _Username}, Topic, Sections) ->
|
||||||
lager:info("Rewrite unsubscribe: ~p", [{Topic, Opts}]),
|
lager:info("Rewrite unsubscribe: ~p", [Topic]),
|
||||||
{ok, {match_topic(Topic, Sections), Opts}}.
|
{ok, match_topic(Topic, Sections)}.
|
||||||
|
|
||||||
rewrite_publish(Message=#mqtt_message{topic = Topic}, Sections) ->
|
rewrite_publish(Message=#mqtt_message{topic = Topic}, Sections) ->
|
||||||
%%TODO: this will not work if the client is always online.
|
%%TODO: this will not work if the client is always online.
|
||||||
|
|
|
@ -26,7 +26,7 @@
|
||||||
-export([start_link/3, subscribe/2, unsubscribe/2, publish/2,
|
-export([start_link/3, subscribe/2, unsubscribe/2, publish/2,
|
||||||
async_subscribe/2, async_unsubscribe/2]).
|
async_subscribe/2, async_unsubscribe/2]).
|
||||||
|
|
||||||
-export([subscribers/1]).
|
-export([subscribers/1, dispatch/2]).
|
||||||
|
|
||||||
%% gen_server.
|
%% gen_server.
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
|
|
|
@ -84,7 +84,7 @@
|
||||||
packet_id = 1,
|
packet_id = 1,
|
||||||
|
|
||||||
%% Client’s subscriptions.
|
%% Client’s subscriptions.
|
||||||
subscriptions :: dict:dict(),
|
subscriptions :: map(),
|
||||||
|
|
||||||
%% Inflight qos1, qos2 messages sent to the client but unacked,
|
%% Inflight qos1, qos2 messages sent to the client but unacked,
|
||||||
%% QoS 1 and QoS 2 messages which have been sent to the Client,
|
%% QoS 1 and QoS 2 messages which have been sent to the Client,
|
||||||
|
@ -323,7 +323,7 @@ handle_cast({unsubscribe, TopicTable}, Session = #session{client_id = Client
|
||||||
{ok, _Qos} ->
|
{ok, _Qos} ->
|
||||||
emqttd:unsubscribe(Topic, ClientId),
|
emqttd:unsubscribe(Topic, ClientId),
|
||||||
emqttd:run_hooks('session.unsubscribed', [ClientId, Username], {Topic, Opts}),
|
emqttd:run_hooks('session.unsubscribed', [ClientId, Username], {Topic, Opts}),
|
||||||
dict:erase(Topic, SubMap);
|
maps:remove(Topic, SubMap);
|
||||||
error ->
|
error ->
|
||||||
SubMap
|
SubMap
|
||||||
end
|
end
|
||||||
|
@ -561,8 +561,8 @@ dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ
|
||||||
hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})
|
hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
tune_qos(Topic, Msg = #mqtt_message{qos = PubQos}, Subscriptions) ->
|
tune_qos(Topic, Msg = #mqtt_message{qos = PubQos}, SubMap) ->
|
||||||
case dict:find(Topic, Subscriptions) of
|
case maps:find(Topic, SubMap) of
|
||||||
{ok, SubQos} when PubQos > SubQos ->
|
{ok, SubQos} when PubQos > SubQos ->
|
||||||
Msg#mqtt_message{qos = SubQos};
|
Msg#mqtt_message{qos = SubQos};
|
||||||
{ok, _SubQos} ->
|
{ok, _SubQos} ->
|
||||||
|
|
Loading…
Reference in New Issue