Set some attributes when session resumed

This commit is contained in:
周子博 2018-10-27 13:59:17 +08:00
parent 92251d4a8a
commit 7c14ba11d6
2 changed files with 17 additions and 14 deletions

View File

@ -46,7 +46,7 @@
-export([start_link/1]). -export([start_link/1]).
-export([info/1, attrs/1]). -export([info/1, attrs/1]).
-export([stats/1]). -export([stats/1]).
-export([resume/3, discard/2]). -export([resume/2, discard/2]).
-export([update_expiry_interval/2, update_misc/2]). -export([update_expiry_interval/2, update_misc/2]).
-export([subscribe/2, subscribe/4]). -export([subscribe/2, subscribe/4]).
-export([publish/3]). -export([publish/3]).
@ -311,9 +311,9 @@ unsubscribe(SPid, PacketId, Properties, TopicFilters) ->
UnsubReq = {PacketId, Properties, TopicFilters}, UnsubReq = {PacketId, Properties, TopicFilters},
gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}). gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}).
-spec(resume(spid(), pid(), emqx:message() | undefined) -> ok). -spec(resume(spid(), map()) -> ok).
resume(SPid, ConnPid, WillMsg) -> resume(SPid, SessAttrs) ->
gen_server:cast(SPid, {resume, ConnPid, WillMsg}). gen_server:cast(SPid, {resume, SessAttrs}).
%% @doc Discard the session %% @doc Discard the session
-spec(discard(spid(), ByPid :: pid()) -> ok). -spec(discard(spid(), ByPid :: pid()) -> ok).
@ -517,7 +517,9 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight
end; end;
%% RESUME: %% RESUME:
handle_cast({resume, ConnPid, WillMsg}, State = #state{client_id = ClientId, handle_cast({resume, #{conn_pid := ConnPid,
will_msg := WillMsg,
expiry_interval := SessionExpiryInterval}}, State = #state{client_id = ClientId,
conn_pid = OldConnPid, conn_pid = OldConnPid,
clean_start = CleanStart, clean_start = CleanStart,
retry_timer = RetryTimer, retry_timer = RetryTimer,
@ -545,6 +547,7 @@ handle_cast({resume, ConnPid, WillMsg}, State = #state{client_id = Client
awaiting_rel = #{}, awaiting_rel = #{},
await_rel_timer = undefined, await_rel_timer = undefined,
expiry_timer = undefined, expiry_timer = undefined,
expiry_interval = SessionExpiryInterval,
will_delay_timer = undefined, will_delay_timer = undefined,
will_msg = WillMsg}, will_msg = WillMsg},

View File

@ -93,11 +93,11 @@ discard_session(ClientId, ConnPid) when is_binary(ClientId) ->
resume_session(ClientId) -> resume_session(ClientId) ->
resume_session(ClientId, #{conn_pid => self(), will_msg => undefined}). resume_session(ClientId, #{conn_pid => self(), will_msg => undefined}).
resume_session(ClientId, #{conn_pid := ConnPid, will_msg := WillMsg}) -> resume_session(ClientId, SessAttrs = #{conn_pid := ConnPid}) ->
case lookup_session(ClientId) of case lookup_session(ClientId) of
[] -> {error, not_found}; [] -> {error, not_found};
[{_ClientId, SPid}] -> [{_ClientId, SPid}] ->
ok = emqx_session:resume(SPid, ConnPid, WillMsg), ok = emqx_session:resume(SPid, SessAttrs),
{ok, SPid}; {ok, SPid};
Sessions -> Sessions ->
[{_, SPid}|StaleSessions] = lists:reverse(Sessions), [{_, SPid}|StaleSessions] = lists:reverse(Sessions),
@ -105,7 +105,7 @@ resume_session(ClientId, #{conn_pid := ConnPid, will_msg := WillMsg}) ->
lists:foreach(fun({_, StalePid}) -> lists:foreach(fun({_, StalePid}) ->
catch emqx_session:discard(StalePid, ConnPid) catch emqx_session:discard(StalePid, ConnPid)
end, StaleSessions), end, StaleSessions),
ok = emqx_session:resume(SPid, ConnPid, WillMsg), ok = emqx_session:resume(SPid, SessAttrs),
{ok, SPid} {ok, SPid}
end. end.