Merge pull request #8626 from lafirest/fix/connect_event_5

fix(channel): Adjust the timing of the `client.connected` event
This commit is contained in:
lafirest 2022-08-03 16:01:17 +08:00 committed by GitHub
commit 11165f4a9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 4 additions and 4 deletions

View File

@ -357,7 +357,7 @@ handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) ->
}, },
case authenticate(?CONNECT_PACKET(NConnPkt), NChannel1) of case authenticate(?CONNECT_PACKET(NConnPkt), NChannel1) of
{ok, Properties, NChannel2} -> {ok, Properties, NChannel2} ->
process_connect(Properties, ensure_connected(NChannel2)); process_connect(Properties, NChannel2);
{continue, Properties, NChannel2} -> {continue, Properties, NChannel2} ->
handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, Properties}, NChannel2); handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, Properties}, NChannel2);
{error, ReasonCode} -> {error, ReasonCode} ->
@ -381,7 +381,7 @@ handle_in(
{ok, NProperties, NChannel} -> {ok, NProperties, NChannel} ->
case ConnState of case ConnState of
connecting -> connecting ->
process_connect(NProperties, ensure_connected(NChannel)); process_connect(NProperties, NChannel);
_ -> _ ->
handle_out( handle_out(
auth, auth,
@ -611,7 +611,7 @@ process_connect(
case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of
{ok, #{session := Session, present := false}} -> {ok, #{session := Session, present := false}} ->
NChannel = Channel#channel{session = Session}, NChannel = Channel#channel{session = Session},
handle_out(connack, {?RC_SUCCESS, sp(false), AckProps}, NChannel); handle_out(connack, {?RC_SUCCESS, sp(false), AckProps}, ensure_connected(NChannel));
{ok, #{session := Session, present := true, pendings := Pendings}} -> {ok, #{session := Session, present := true, pendings := Pendings}} ->
Pendings1 = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())), Pendings1 = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())),
NChannel = Channel#channel{ NChannel = Channel#channel{
@ -619,7 +619,7 @@ process_connect(
resuming = true, resuming = true,
pendings = Pendings1 pendings = Pendings1
}, },
handle_out(connack, {?RC_SUCCESS, sp(true), AckProps}, NChannel); handle_out(connack, {?RC_SUCCESS, sp(true), AckProps}, ensure_connected(NChannel));
{error, client_id_unavailable} -> {error, client_id_unavailable} ->
handle_out(connack, ?RC_CLIENT_IDENTIFIER_NOT_VALID, Channel); handle_out(connack, ?RC_CLIENT_IDENTIFIER_NOT_VALID, Channel);
{error, Reason} -> {error, Reason} ->