This commit is contained in:
Feng 2015-06-14 07:13:08 +08:00
parent 9c666cef70
commit a1d778b081
2 changed files with 59 additions and 62 deletions

View File

@ -58,6 +58,10 @@
-record(mqtt_message, { -record(mqtt_message, {
%% topic is first for message may be retained %% topic is first for message may be retained
topic :: binary(), topic :: binary(),
%% clientid from
from :: binary() | atom(),
%% sender pid ??
sender :: pid(),
qos = ?QOS_0 :: mqtt_qos(), qos = ?QOS_0 :: mqtt_qos(),
retain = false :: boolean(), retain = false :: boolean(),
dup = false :: boolean(), dup = false :: boolean(),

View File

@ -225,7 +225,7 @@ puback(Session = #session{clientid = ClientId, awaiting_ack = Awaiting}, {?PUBAC
Session#session{awaiting_ack = maps:remove(PacketId, Awaiting)}; Session#session{awaiting_ack = maps:remove(PacketId, Awaiting)};
puback(SessPid, {?PUBACK, PacketId}) when is_pid(SessPid) -> puback(SessPid, {?PUBACK, PacketId}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {puback, PacketId}); gen_server:cast(SessPid, {puback, {?PUBACK, PacketId});
%% PUBREC %% PUBREC
puback(Session = #session{clientid = ClientId, puback(Session = #session{clientid = ClientId,
@ -239,7 +239,7 @@ puback(Session = #session{clientid = ClientId,
awaiting_comp = maps:put(PacketId, true, AwaitingComp)}; awaiting_comp = maps:put(PacketId, true, AwaitingComp)};
puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) -> puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {pubrec, PacketId}), SessPid; gen_server:cast(SessPid, {puback, {?PUBREC, PacketId});
%% PUBREL %% PUBREL
puback(Session = #session{clientid = ClientId, awaiting_rel = Awaiting}, {?PUBREL, PacketId}) -> puback(Session = #session{clientid = ClientId, awaiting_rel = Awaiting}, {?PUBREL, PacketId}) ->
@ -253,7 +253,7 @@ puback(Session = #session{clientid = ClientId, awaiting_rel = Awaiting}, {?PUBRE
Session#session{awaiting_rel = maps:remove(PacketId, Awaiting)}; Session#session{awaiting_rel = maps:remove(PacketId, Awaiting)};
puback(SessPid, {?PUBREL, PacketId}) when is_pid(SessPid) -> puback(SessPid, {?PUBREL, PacketId}) when is_pid(SessPid) ->
cast(SessPid, {pubrel, PacketId}); gen_server:cast(SessPid, {puback, {?PUBREL, PacketId});
%% PUBCOMP %% PUBCOMP
puback(Session = #session{clientid = ClientId, puback(Session = #session{clientid = ClientId,
@ -265,7 +265,9 @@ puback(Session = #session{clientid = ClientId,
Session#session{awaiting_comp = maps:remove(PacketId, AwaitingComp)}; Session#session{awaiting_comp = maps:remove(PacketId, AwaitingComp)};
puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) -> puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) ->
cast(SessPid, {pubcomp, PacketId}). gen_server:cast(SessPid, {puback, {?PUBCOMP, PacketId});
wait_ack
timeout(awaiting_rel, MsgId, Session = #session{clientid = ClientId, awaiting_rel = Awaiting}) -> timeout(awaiting_rel, MsgId, Session = #session{clientid = ClientId, awaiting_rel = Awaiting}) ->
case maps:find(MsgId, Awaiting) of case maps:find(MsgId, Awaiting) of
@ -443,45 +445,31 @@ handle_cast({resume, ClientId, ClientPid}, State = #session{
msg_queue = emqttd_queue:clear(Queue), msg_queue = emqttd_queue:clear(Queue),
expire_timer = undefined}, hibernate}; expire_timer = undefined}, hibernate};
handle_cast({publish, ClientId, {?QOS_2, Message}}, State) -> handle_cast({publish, ClientId, {?QOS_2, Message}}, Session) ->
NewState = publish(State, ClientId, {?QOS_2, Message}), {noreply, publish(Session, ClientId, {?QOS_2, Message})};
{noreply, NewState};
handle_cast({puback, PacketId}, State) -> handle_cast({puback, {PubAck, PacketId}, Session) ->
NewState = puback(State, {?PUBACK, PacketId}), {noreply, puback(Session, {PubAck, PacketId})};
{noreply, NewState};
handle_cast({pubrec, PacketId}, State) -> handle_cast({destroy, ClientId}, Session = #session{clientid = ClientId}) ->
NewState = puback(State, {?PUBREC, PacketId}),
{noreply, NewState};
handle_cast({pubrel, PacketId}, State) ->
NewState = puback(State, {?PUBREL, PacketId}),
{noreply, NewState};
handle_cast({pubcomp, PacketId}, State) ->
NewState = puback(State, {?PUBCOMP, PacketId}),
{noreply, NewState};
handle_cast({destroy, ClientId}, State = #session{clientid = ClientId}) ->
lager:warning("Session ~s destroyed", [ClientId]), lager:warning("Session ~s destroyed", [ClientId]),
{stop, normal, State}; {stop, normal, Session};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]), lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]),
{noreply, State}. {noreply, State}.
handle_info({dispatch, {_From, Messages}}, State) when is_list(Messages) -> handle_info({dispatch, {_From, Messages}}, Session) when is_list(Messages) ->
F = fun(Message, S) -> dispatch(Message, S) end, F = fun(Message, S) -> dispatch(Message, S) end,
{noreply, lists:foldl(F, State, Messages)}; {noreply, lists:foldl(F, Session, Messages)};
handle_info({dispatch, {_From, Message}}, State) -> handle_info({dispatch, {_From, Message}}, State) ->
{noreply, dispatch(Message, State)}; {noreply, dispatch(Message, State)};
handle_info({'EXIT', ClientPid, Reason}, State = #session{clientid = ClientId, handle_info({'EXIT', ClientPid, Reason}, Session = #session{clientid = ClientId,
client_pid = ClientPid}) -> client_pid = ClientPid}) ->
lager:info("Session: client ~s@~p exited for ~p", [ClientId, ClientPid, Reason]), lager:info("Session: client ~s@~p exited for ~p", [ClientId, ClientPid, Reason]),
{noreply, start_expire_timer(State#session{client_pid = undefined})}; {noreply, start_expire_timer(Session#session{client_pid = undefined})};
handle_info({'EXIT', ClientPid0, _Reason}, State = #session{client_pid = ClientPid}) -> handle_info({'EXIT', ClientPid0, _Reason}, State = #session{client_pid = ClientPid}) ->
lager:error("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]), lager:error("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]),
@ -491,49 +479,53 @@ handle_info(session_expired, State = #session{clientid = ClientId}) ->
lager:warning("Session ~s expired!", [ClientId]), lager:warning("Session ~s expired!", [ClientId]),
{stop, {shutdown, expired}, State}; {stop, {shutdown, expired}, State};
handle_info({timeout, awaiting_rel, MsgId}, SessState) -> handle_info({timeout, awaiting_rel, MsgId}, Session) ->
NewState = timeout(awaiting_rel, MsgId, SessState), {noreply, timeout(awaiting_rel, MsgId, Session)};
{noreply, NewState};
handle_info(Info, State) -> handle_info(Info, Session) ->
lager:critical("Unexpected Info: ~p, State: ~p", [Info, State]), lager:critical("Unexpected Info: ~p, Session: ~p", [Info, Session]),
{noreply, State}. {noreply, Session}.
terminate(_Reason, _State) -> terminate(_Reason, _Session) ->
ok. ok.
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, Session, _Extra) ->
{ok, State}. {ok, Session}.
%%%============================================================================= %%%=============================================================================
%%% Internal functions %%% Dispatch message from broker -> client.
%%%============================================================================= %%%=============================================================================
%% client is offline %% queued the message if client is offline
dispatch(Msg, Session = #session{client_pid = undefined}) -> dispatch(Msg, Session = #session{client_pid = undefined}) ->
queue(Msg, Session); queue(Msg, Session);
%% dispatch qos0 directly %% dispatch qos0 directly to client process
dispatch(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) -> dispatch(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) ->
ClientPid ! {dispatch, {self(), Msg}}, Session; ClientPid ! {dispatch, {self(), Msg}}, Session;
%% queue if inflight_queue is full %% dispatch qos1/2 messages and wait for puback
dispatch(Msg = #mqtt_message{qos = Qos}, Session = #session{inflight_window = InflightWin, dispatch(Msg = #mqtt_message{qos = Qos}, Session = #session{clientid = ClientId,
inflight_queue = InflightQ}) message_id = MsgId,
when (Qos > ?QOS_0) andalso (length(InflightQ) >= InflightWin) -> pending_queue = Q,
%%TODO: set alarms inflight_window = Win})
lager:error([{clientid, ClientId}], "Session ~s inflight_queue is full!", [ClientId]),
queue(Msg, Session);
%% dispatch and await ack
dispatch(Msg = #mqtt_message{qos = Qos}, Session = #session{client_pid = ClientPid})
when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) -> when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) ->
%% assign msgid and await
{NewMsg, NewState} = await_ack(Msg, Session), case emqttd_mqwin:is_full(InflightWin) of
ClientPid ! {dispatch, {self(), NewMsg}}, true ->
lager:error("Session ~s inflight window is full!", [ClientId]),
Session#session{pending_queue = emqttd_mqueue:in(Msg, Q)};
false ->
Msg1 = Msg#mqtt_message{msgid = MsgId},
Msg2 =
if
Qos =:= ?QOS_2 -> Msg1#mqtt_message{dup = false};
true -> Msg1
end,
ClientPid ! {dispatch, {self(), Msg2}},
NewWin = emqttd_mqwin:in(Msg2, Win),
await_ack(Msg2, next_msgid(Session#session{inflight_window = NewWin}))
end.
queue(Msg, Session = #session{pending_queue= Queue}) -> queue(Msg, Session = #session{pending_queue= Queue}) ->
Session#session{pending_queue = emqttd_mqueue:in(Msg, Queue)}. Session#session{pending_queue = emqttd_mqueue:in(Msg, Queue)}.
@ -544,8 +536,9 @@ next_msgid(State = #session{message_id = 16#ffff}) ->
next_msgid(State = #session{message_id = MsgId}) -> next_msgid(State = #session{message_id = MsgId}) ->
State#session{message_id = MsgId + 1}. State#session{message_id = MsgId + 1}.
start_expire_timer(State = #session{expires = Expires, expire_timer = OldTimer}) -> start_expire_timer(Session = #session{expired_after = Expires,
expired_timer = OldTimer}) ->
emqttd_util:cancel_timer(OldTimer), emqttd_util:cancel_timer(OldTimer),
Timer = erlang:send_after(Expires * 1000, self(), session_expired), Timer = erlang:send_after(Expires * 1000, self(), session_expired),
State#session{expire_timer = Timer}. Session#session{expired_timer = Timer}.