diff --git a/apps/emqtt/include/emqtt.hrl b/apps/emqtt/include/emqtt.hrl index c73fc8f71..ff67fafde 100644 --- a/apps/emqtt/include/emqtt.hrl +++ b/apps/emqtt/include/emqtt.hrl @@ -45,8 +45,8 @@ %% MQTT Client %%------------------------------------------------------------------------------ -record(mqtt_client, { - client_id, - username + client_id, + username }). -type mqtt_client() :: #mqtt_client{}. @@ -68,12 +68,12 @@ %% MQTT Message %%------------------------------------------------------------------------------ -record(mqtt_message, { - msgid :: integer() | undefined, - qos = ?QOS_0 :: mqtt_qos(), - retain = false :: boolean(), - dup = false :: boolean(), - topic :: binary(), - payload :: binary() + msgid :: integer() | undefined, + qos = ?QOS_0 :: mqtt_qos(), + retain = false :: boolean(), + dup = false :: boolean(), + topic :: binary(), + payload :: binary() }). -type mqtt_message() :: #mqtt_message{}. @@ -82,8 +82,8 @@ %% MQTT User Management %%------------------------------------------------------------------------------ -record(mqtt_user, { - username :: binary(), - passwdhash :: binary() + username :: binary(), + passwdhash :: binary() }). %%------------------------------------------------------------------------------ diff --git a/apps/emqtt/src/emqtt_retained.erl b/apps/emqtt/src/emqtt_retained.erl index 96c605537..f7edd79f1 100644 --- a/apps/emqtt/src/emqtt_retained.erl +++ b/apps/emqtt/src/emqtt_retained.erl @@ -47,6 +47,7 @@ -include("emqtt.hrl"). -export([start_link/0, + retain/1, lookup/1, insert/2, delete/1, @@ -66,6 +67,11 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +retain(Msg = #mqtt_message{retain = true}) -> + Msg; + +retain(Msg) -> Msg. + lookup(Topic) -> ets:lookup(retained_msg, Topic). diff --git a/apps/emqtt/src/emqtt_router.erl b/apps/emqtt/src/emqtt_router.erl index be4fbfc06..09eeaf0ce 100644 --- a/apps/emqtt/src/emqtt_router.erl +++ b/apps/emqtt/src/emqtt_router.erl @@ -65,7 +65,10 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). route(Message) -> - emqtt_pubsub:publish(retained(reset_dup(Message))). + %%TODO: hooks later. + emqtt_pubsub:publish( + emqtt_message:unset_flag( + emqtt_retained:retain(Message))). %% ------------------------------------------------------------------ %% gen_server Function Definitions @@ -92,11 +95,4 @@ code_change(_OldVsn, State, _Extra) -> %% ------------------------------------------------------------------ %% Internal Function Definitions %% ------------------------------------------------------------------ -retained(Msg = #mqtt_message{retain = true, topic = Topic}) -> - emqtt_retained:insert(Topic, Msg), Msg; - -retained(Msg) -> Msg. - -reset_dup(Msg = #mqtt_message{dup = true}) -> Msg#mqtt_message{dup = false}; -reset_dup(Msg) -> Msg. diff --git a/apps/emqtt/src/emqtt_session.erl b/apps/emqtt/src/emqtt_session.erl index e4d1e04c7..430f82ae9 100644 --- a/apps/emqtt/src/emqtt_session.erl +++ b/apps/emqtt/src/emqtt_session.erl @@ -235,17 +235,24 @@ handle_cast({resume, ClientId, ClientPid}, State = #session_state { awaiting_comp = AwaitingComp, expire_timer = ETimer}) -> lager:info("Session ~s resumed by ~p", [ClientId, ClientPid]), + %cancel timeout timer erlang:cancel_timer(ETimer), %% redelivery PUBREL - [ ClientPid ! {redeliver, {?PUBREL, PacketId}} || PacketId <- maps:keys(AwaitingComp) ], + lists:foreach(fun(PacketId) -> + ClientPid ! {redeliver, {?PUBREL, PacketId}} + end, maps:keys(AwaitingComp)), %% redelivery messages that awaiting PUBACK or PUBREC Dup = fun(Msg) -> Msg#mqtt_message{ dup = true } end, - [ ClientPid ! {dispatch, {self(), Dup(Message)}} || Message <- maps:values(AwaitingAck) ], + lists:foreach(fun(Msg) -> + ClientPid ! {dispatch, {self(), Dup(Msg)}} + end, maps:values(AwaitingAck)), %% send offline messages - [ClientPid ! {dispatch, {self(), Message}} || Message <- emqtt_queue:all(Queue)], + lists:foreach(fun(Msg) -> + ClientPid ! {dispatch, {self(), Msg}} + end, emqtt_queue:all(Queue)), NewState = State#session_state{ client_pid = ClientPid, msg_queue = emqtt_queue:clear(Queue),