This commit is contained in:
Ery Lee 2015-01-16 12:36:27 +08:00
parent 75d7e65672
commit b68a325301
4 changed files with 30 additions and 21 deletions

View File

@ -45,8 +45,8 @@
%% MQTT Client %% MQTT Client
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-record(mqtt_client, { -record(mqtt_client, {
client_id, client_id,
username username
}). }).
-type mqtt_client() :: #mqtt_client{}. -type mqtt_client() :: #mqtt_client{}.
@ -68,12 +68,12 @@
%% MQTT Message %% MQTT Message
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-record(mqtt_message, { -record(mqtt_message, {
msgid :: integer() | undefined, msgid :: integer() | undefined,
qos = ?QOS_0 :: mqtt_qos(), qos = ?QOS_0 :: mqtt_qos(),
retain = false :: boolean(), retain = false :: boolean(),
dup = false :: boolean(), dup = false :: boolean(),
topic :: binary(), topic :: binary(),
payload :: binary() payload :: binary()
}). }).
-type mqtt_message() :: #mqtt_message{}. -type mqtt_message() :: #mqtt_message{}.
@ -82,8 +82,8 @@
%% MQTT User Management %% MQTT User Management
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-record(mqtt_user, { -record(mqtt_user, {
username :: binary(), username :: binary(),
passwdhash :: binary() passwdhash :: binary()
}). }).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -47,6 +47,7 @@
-include("emqtt.hrl"). -include("emqtt.hrl").
-export([start_link/0, -export([start_link/0,
retain/1,
lookup/1, lookup/1,
insert/2, insert/2,
delete/1, delete/1,
@ -66,6 +67,11 @@
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
retain(Msg = #mqtt_message{retain = true}) ->
Msg;
retain(Msg) -> Msg.
lookup(Topic) -> lookup(Topic) ->
ets:lookup(retained_msg, Topic). ets:lookup(retained_msg, Topic).

View File

@ -65,7 +65,10 @@ start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
route(Message) -> route(Message) ->
emqtt_pubsub:publish(retained(reset_dup(Message))). %%TODO: hooks later.
emqtt_pubsub:publish(
emqtt_message:unset_flag(
emqtt_retained:retain(Message))).
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
%% gen_server Function Definitions %% gen_server Function Definitions
@ -92,11 +95,4 @@ code_change(_OldVsn, State, _Extra) ->
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
%% Internal Function Definitions %% Internal Function Definitions
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
retained(Msg = #mqtt_message{retain = true, topic = Topic}) ->
emqtt_retained:insert(Topic, Msg), Msg;
retained(Msg) -> Msg.
reset_dup(Msg = #mqtt_message{dup = true}) -> Msg#mqtt_message{dup = false};
reset_dup(Msg) -> Msg.

View File

@ -235,17 +235,24 @@ handle_cast({resume, ClientId, ClientPid}, State = #session_state {
awaiting_comp = AwaitingComp, awaiting_comp = AwaitingComp,
expire_timer = ETimer}) -> expire_timer = ETimer}) ->
lager:info("Session ~s resumed by ~p", [ClientId, ClientPid]), lager:info("Session ~s resumed by ~p", [ClientId, ClientPid]),
%cancel timeout timer
erlang:cancel_timer(ETimer), erlang:cancel_timer(ETimer),
%% redelivery PUBREL %% redelivery PUBREL
[ ClientPid ! {redeliver, {?PUBREL, PacketId}} || PacketId <- maps:keys(AwaitingComp) ], lists:foreach(fun(PacketId) ->
ClientPid ! {redeliver, {?PUBREL, PacketId}}
end, maps:keys(AwaitingComp)),
%% redelivery messages that awaiting PUBACK or PUBREC %% redelivery messages that awaiting PUBACK or PUBREC
Dup = fun(Msg) -> Msg#mqtt_message{ dup = true } end, Dup = fun(Msg) -> Msg#mqtt_message{ dup = true } end,
[ ClientPid ! {dispatch, {self(), Dup(Message)}} || Message <- maps:values(AwaitingAck) ], lists:foreach(fun(Msg) ->
ClientPid ! {dispatch, {self(), Dup(Msg)}}
end, maps:values(AwaitingAck)),
%% send offline messages %% send offline messages
[ClientPid ! {dispatch, {self(), Message}} || Message <- emqtt_queue:all(Queue)], lists:foreach(fun(Msg) ->
ClientPid ! {dispatch, {self(), Msg}}
end, emqtt_queue:all(Queue)),
NewState = State#session_state{ client_pid = ClientPid, NewState = State#session_state{ client_pid = ClientPid,
msg_queue = emqtt_queue:clear(Queue), msg_queue = emqtt_queue:clear(Queue),