fix issue #163 - Protocol Compliant - Session Present Flag

This commit is contained in:
Feng 2015-11-05 12:29:29 +08:00
parent 5aa30f0b1c
commit 22889f552c
3 changed files with 26 additions and 14 deletions

View File

@ -209,6 +209,11 @@
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
variable = #mqtt_packet_connack{return_code = ReturnCode}}). variable = #mqtt_packet_connack{return_code = ReturnCode}}).
-define(CONNACK_PACKET(ReturnCode, SessPresent),
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
variable = #mqtt_packet_connack{ack_flags = SessPresent,
return_code = ReturnCode}}).
-define(PUBLISH_PACKET(Qos, PacketId), -define(PUBLISH_PACKET(Qos, PacketId),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
qos = Qos}, qos = Qos},

View File

@ -149,7 +149,7 @@ process(Packet = ?CONNECT_PACKET(Var), State0) ->
trace(recv, Packet, State1), trace(recv, Packet, State1),
{ReturnCode1, State3} = {ReturnCode1, SessPresent, State3} =
case validate_connect(Var, State1) of case validate_connect(Var, State1) of
?CONNACK_ACCEPT -> ?CONNACK_ACCEPT ->
case emqttd_access_control:auth(client(State1), Password) of case emqttd_access_control:auth(client(State1), Password) of
@ -159,27 +159,27 @@ process(Packet = ?CONNECT_PACKET(Var), State0) ->
%% Start session %% Start session
case emqttd_sm:start_session(CleanSess, clientid(State2)) of case emqttd_sm:start_session(CleanSess, clientid(State2)) of
{ok, Session} -> {ok, Session, SP} ->
%% Register the client %% Register the client
emqttd_cm:register(client(State2)), emqttd_cm:register(client(State2)),
%% Start keepalive %% Start keepalive
start_keepalive(KeepAlive), start_keepalive(KeepAlive),
%% ACCEPT %% ACCEPT
{?CONNACK_ACCEPT, State2#proto_state{session = Session}}; {?CONNACK_ACCEPT, SP, State2#proto_state{session = Session}};
{error, Error} -> {error, Error} ->
exit({shutdown, Error}) exit({shutdown, Error})
end; end;
{error, Reason}-> {error, Reason}->
?LOG(error, "Username '~s' login failed for ~s", [Username, Reason], State1), ?LOG(error, "Username '~s' login failed for ~s", [Username, Reason], State1),
{?CONNACK_CREDENTIALS, State1} {?CONNACK_CREDENTIALS, false, State1}
end; end;
ReturnCode -> ReturnCode ->
{ReturnCode, State1} {ReturnCode, false, State1}
end, end,
%% Run hooks %% Run hooks
emqttd_broker:foreach_hooks('client.connected', [ReturnCode1, client(State3)]), emqttd_broker:foreach_hooks('client.connected', [ReturnCode1, client(State3)]),
%% Send connack %% Send connack
send(?CONNACK_PACKET(ReturnCode1), State3); send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3);
process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) -> process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) ->
case check_acl(publish, Topic, client(State)) of case check_acl(publish, Topic, client(State)) of
@ -405,3 +405,6 @@ check_acl(publish, Topic, Client) ->
check_acl(subscribe, Topic, Client) -> check_acl(subscribe, Topic, Client) ->
emqttd_access_control:check_acl(Client, subscribe, Topic). emqttd_access_control:check_acl(Client, subscribe, Topic).
sp(true) -> 1;
sp(false) -> 0.

View File

@ -24,7 +24,6 @@
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
-module(emqttd_sm). -module(emqttd_sm).
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
@ -57,7 +56,7 @@
-define(SM_POOL, ?MODULE). -define(SM_POOL, ?MODULE).
-define(CALL_TIMEOUT, 60000). -define(TIMEOUT, 60000).
-define(LOG(Level, Format, Args, Session), -define(LOG(Level, Format, Args, Session),
lager:Level("SM(~s): " ++ Format, [Session#mqtt_session.client_id | Args])). lager:Level("SM(~s): " ++ Format, [Session#mqtt_session.client_id | Args])).
@ -103,7 +102,7 @@ pool() -> ?SM_POOL.
%% @doc Start a session %% @doc Start a session
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec start_session(CleanSess :: boolean(), binary()) -> {ok, pid()} | {error, any()}. -spec start_session(CleanSess :: boolean(), binary()) -> {ok, pid(), boolean()} | {error, any()}.
start_session(CleanSess, ClientId) -> start_session(CleanSess, ClientId) ->
SM = gproc_pool:pick_worker(?SM_POOL, ClientId), SM = gproc_pool:pick_worker(?SM_POOL, ClientId),
call(SM, {start_session, {CleanSess, ClientId, self()}}). call(SM, {start_session, {CleanSess, ClientId, self()}}).
@ -144,7 +143,7 @@ sesstab(true) -> mqtt_transient_session;
sesstab(false) -> mqtt_persistent_session. sesstab(false) -> mqtt_persistent_session.
call(SM, Req) -> call(SM, Req) ->
gen_server2:call(SM, Req, ?CALL_TIMEOUT). %%infinity). gen_server2:call(SM, Req, ?TIMEOUT). %%infinity).
%%%============================================================================= %%%=============================================================================
%%% gen_server callbacks %%% gen_server callbacks
@ -168,20 +167,20 @@ handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) ->
case lookup_session(ClientId) of case lookup_session(ClientId) of
undefined -> undefined ->
%% create session locally %% create session locally
{reply, create_session(false, ClientId, ClientPid), State}; reply(create_session(false, ClientId, ClientPid), false, State);
Session -> Session ->
{reply, resume_session(Session, ClientPid), State} reply(resume_session(Session, ClientPid), true, State)
end; end;
%% transient session %% transient session
handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) -> handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) ->
case lookup_session(ClientId) of case lookup_session(ClientId) of
undefined -> undefined ->
{reply, create_session(true, ClientId, ClientPid), State}; reply(create_session(true, ClientId, ClientPid), false, State);
Session -> Session ->
case destroy_session(Session) of case destroy_session(Session) of
ok -> ok ->
{reply, create_session(true, ClientId, ClientPid), State}; reply(create_session(true, ClientId, ClientPid), false, State);
{error, Error} -> {error, Error} ->
{reply, {error, Error}, State} {reply, {error, Error}, State}
end end
@ -302,3 +301,8 @@ remove_session(Session) ->
{aborted, Error} -> {error, Error} {aborted, Error} -> {error, Error}
end. end.
reply({ok, SessPid}, SP, State) ->
{reply, {ok, SessPid, SP}, State};
reply({error, Error}, _SP, State) ->
{reply, {error, Error}, State}.