diff --git a/src/emqttd_mod_rewrite.erl b/src/emqttd_mod_rewrite.erl index edd6ac41a..653fb49c7 100644 --- a/src/emqttd_mod_rewrite.erl +++ b/src/emqttd_mod_rewrite.erl @@ -44,9 +44,9 @@ rewrite_subscribe({_ClientId, _Username}, {Topic, Opts}, Sections) -> lager:info("Rewrite subscribe: ~p", [{Topic, Opts}]), {ok, {match_topic(Topic, Sections), Opts}}. -rewrite_unsubscribe({_ClientId, _Username}, {Topic, Opts}, Sections) -> - lager:info("Rewrite unsubscribe: ~p", [{Topic, Opts}]), - {ok, {match_topic(Topic, Sections), Opts}}. +rewrite_unsubscribe({_ClientId, _Username}, Topic, Sections) -> + lager:info("Rewrite unsubscribe: ~p", [Topic]), + {ok, match_topic(Topic, Sections)}. rewrite_publish(Message=#mqtt_message{topic = Topic}, Sections) -> %%TODO: this will not work if the client is always online. diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 1af6274bf..487181a0e 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -26,7 +26,7 @@ -export([start_link/3, subscribe/2, unsubscribe/2, publish/2, async_subscribe/2, async_unsubscribe/2]). --export([subscribers/1]). +-export([subscribers/1, dispatch/2]). %% gen_server. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 6c4ab5c97..85701e249 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -84,7 +84,7 @@ packet_id = 1, %% Client’s subscriptions. - subscriptions :: dict:dict(), + subscriptions :: map(), %% Inflight qos1, qos2 messages sent to the client but unacked, %% QoS 1 and QoS 2 messages which have been sent to the Client, @@ -323,7 +323,7 @@ handle_cast({unsubscribe, TopicTable}, Session = #session{client_id = Client {ok, _Qos} -> emqttd:unsubscribe(Topic, ClientId), emqttd:run_hooks('session.unsubscribed', [ClientId, Username], {Topic, Opts}), - dict:erase(Topic, SubMap); + maps:remove(Topic, SubMap); error -> SubMap end @@ -561,8 +561,8 @@ dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}) end. -tune_qos(Topic, Msg = #mqtt_message{qos = PubQos}, Subscriptions) -> - case dict:find(Topic, Subscriptions) of +tune_qos(Topic, Msg = #mqtt_message{qos = PubQos}, SubMap) -> + case maps:find(Topic, SubMap) of {ok, SubQos} when PubQos > SubQos -> Msg#mqtt_message{qos = SubQos}; {ok, _SubQos} ->