Using client id rather then session pid
This commit is contained in:
parent
7eda2609f3
commit
29beb42aa2
|
@ -792,22 +792,20 @@ shutdown(Reason, #pstate{client_id = ClientId}) when Reason =:= conflict;
|
||||||
emqx_cm:unregister_connection(ClientId);
|
emqx_cm:unregister_connection(ClientId);
|
||||||
shutdown(Reason, PState = #pstate{connected = true,
|
shutdown(Reason, PState = #pstate{connected = true,
|
||||||
client_id = ClientId,
|
client_id = ClientId,
|
||||||
will_msg = WillMsg,
|
will_msg = WillMsg}) ->
|
||||||
session = Session}) ->
|
|
||||||
?LOG(info, "Shutdown for ~p", [Reason], PState),
|
?LOG(info, "Shutdown for ~p", [Reason], PState),
|
||||||
_ = send_willmsg(WillMsg, Session),
|
_ = send_willmsg(WillMsg, ClientId),
|
||||||
emqx_hooks:run('client.disconnected', [credentials(PState), Reason]),
|
emqx_hooks:run('client.disconnected', [credentials(PState), Reason]),
|
||||||
emqx_cm:unregister_connection(ClientId).
|
emqx_cm:unregister_connection(ClientId).
|
||||||
|
|
||||||
send_willmsg(undefined, _Session) ->
|
send_willmsg(undefined, _ClientId) ->
|
||||||
ignore;
|
ignore;
|
||||||
send_willmsg(WillMsg = #message{topic = Topic,
|
send_willmsg(WillMsg = #message{topic = Topic,
|
||||||
headers = #{'Will-Delay-Interval' := Interval}}, Session)
|
headers = #{'Will-Delay-Interval' := Interval} = Headers}, ClientId)
|
||||||
when is_integer(Interval), Interval > 0 ->
|
when is_integer(Interval), Interval > 0 ->
|
||||||
SendAfter = integer_to_binary(Interval),
|
SendAfter = integer_to_binary(Interval),
|
||||||
Session1 = list_to_binary(pid_to_list(Session)),
|
emqx_broker:publish(WillMsg#message{topic = emqx_topic:join([<<"$will">>, SendAfter, Topic]), headers = Headers#{client_id => ClientId}});
|
||||||
emqx_broker:publish(WillMsg#message{topic = <<"$will/", Session1/binary, "/", SendAfter/binary, "/", Topic/binary>>});
|
send_willmsg(WillMsg, _ClientId) ->
|
||||||
send_willmsg(WillMsg, _Session) ->
|
|
||||||
emqx_broker:publish(WillMsg).
|
emqx_broker:publish(WillMsg).
|
||||||
|
|
||||||
start_keepalive(0, _PState) ->
|
start_keepalive(0, _PState) ->
|
||||||
|
|
|
@ -542,7 +542,7 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId,
|
||||||
%% Clean Session: true -> false???
|
%% Clean Session: true -> false???
|
||||||
CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)),
|
CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)),
|
||||||
|
|
||||||
emqx_hooks:run('session.resumed', [#{client_id => ClientId, session => self()}, attrs(State)]),
|
emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]),
|
||||||
|
|
||||||
%% Replay delivery and Dequeue pending messages
|
%% Replay delivery and Dequeue pending messages
|
||||||
noreply(dequeue(retry_delivery(true, State1)));
|
noreply(dequeue(retry_delivery(true, State1)));
|
||||||
|
|
Loading…
Reference in New Issue