fix(exproto): ensure the on_socket_closed event delivering correctly

This commit is contained in:
JianBo He 2023-05-17 17:03:00 +08:00
parent 9dff026f50
commit 2e4ec4888d
2 changed files with 40 additions and 10 deletions

View File

@ -75,6 +75,7 @@
-define(TIMER_TABLE, #{ -define(TIMER_TABLE, #{
alive_timer => keepalive, alive_timer => keepalive,
force_timer => force_close,
idle_timer => force_close_idle idle_timer => force_close_idle
}). }).
@ -300,7 +301,7 @@ handle_timeout(
{ok, Replies, NChannel1} {ok, Replies, NChannel1}
end; end;
handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) -> handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
{shutdown, {error, {force_close, Reason}}, Channel}; {shutdown, Reason, Channel};
handle_timeout(_TRef, force_close_idle, Channel) -> handle_timeout(_TRef, force_close_idle, Channel) ->
{shutdown, idle_timeout, Channel}; {shutdown, idle_timeout, Channel};
handle_timeout(_TRef, Msg, Channel) -> handle_timeout(_TRef, Msg, Channel) ->
@ -327,10 +328,20 @@ handle_call(
Channel = #channel{conn_state = connected} Channel = #channel{conn_state = connected}
) -> ) ->
?SLOG(warning, #{ ?SLOG(warning, #{
msg => "ingore_duplicated_authorized_command", msg => "ingore_duplicated_authenticate_command",
request_clientinfo => ClientInfo request_clientinfo => ClientInfo
}), }),
{reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel}; {reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel};
handle_call(
{auth, ClientInfo, _Password},
_From,
Channel = #channel{conn_state = disconnected}
) ->
?SLOG(warning, #{
msg => "authenticate_command_after_socket_disconnected",
request_clientinfo => ClientInfo
}),
{reply, {error, ?RESP_PERMISSION_DENY, <<"Client socket disconnected">>}, Channel};
handle_call( handle_call(
{auth, ClientInfo0, Password}, {auth, ClientInfo0, Password},
_From, _From,
@ -486,13 +497,21 @@ handle_cast(Req, Channel) ->
| {shutdown, Reason :: term(), channel()}. | {shutdown, Reason :: term(), channel()}.
handle_info( handle_info(
{sock_closed, Reason}, {sock_closed, Reason},
Channel Channel = #channel{gcli = GClient}
) -> ) ->
Channel1 = ensure_disconnected({sock_closed, Reason}, Channel), case emqx_exproto_gcli:is_empty(GClient) of
{shutdown, Reason, Channel1}; true ->
Channel1 = ensure_disconnected(Reason, Channel),
{shutdown, Reason, Channel1};
_ ->
%% delayed close process for flushing all callback funcs to gRPC server
Channel1 = Channel#channel{closed_reason = Reason},
Channel2 = ensure_timer(force_timer, Channel1),
{ok, ensure_disconnected(Reason, Channel2)}
end;
handle_info( handle_info(
{hreply, FunName, Result}, {hreply, FunName, Result},
Channel = #channel{gcli = GClient} Channel0 = #channel{gcli = GClient0, timers = Timers}
) when ) when
FunName =:= on_socket_created; FunName =:= on_socket_created;
FunName =:= on_socket_closed; FunName =:= on_socket_closed;
@ -500,12 +519,19 @@ handle_info(
FunName =:= on_received_messages; FunName =:= on_received_messages;
FunName =:= on_timer_timeout FunName =:= on_timer_timeout
-> ->
GClient = emqx_exproto_gcli:ack(FunName, GClient0),
Channel = Channel0#channel{gcli = GClient},
ShutdownNow =
emqx_exproto_gcli:is_empty(GClient) andalso
maps:get(force_timer, Timers, undefined) =/= undefined,
case Result of case Result of
ok -> ok when not ShutdownNow ->
GClient1 = emqx_exproto_gcli:maybe_shoot( GClient1 = emqx_exproto_gcli:maybe_shoot(GClient),
emqx_exproto_gcli:ack(FunName, GClient)
),
{ok, Channel#channel{gcli = GClient1}}; {ok, Channel#channel{gcli = GClient1}};
ok when ShutdownNow ->
Channel1 = cancel_timer(force_timer, Channel),
{shutdown, Channel1#channel.closed_reason, Channel1};
{error, Reason} -> {error, Reason} ->
{shutdown, {error, {FunName, Reason}}, Channel} {shutdown, {error, {FunName, Reason}}, Channel}
end; end;
@ -690,6 +716,8 @@ remove_timer_ref(Name, Channel = #channel{timers = Timers}) ->
interval(idle_timer, #channel{conninfo = #{idle_timeout := IdleTimeout}}) -> interval(idle_timer, #channel{conninfo = #{idle_timeout := IdleTimeout}}) ->
IdleTimeout; IdleTimeout;
interval(force_timer, _) ->
15000;
interval(alive_timer, #channel{keepalive = Keepalive}) -> interval(alive_timer, #channel{keepalive = Keepalive}) ->
emqx_keepalive:info(interval, Keepalive). emqx_keepalive:info(interval, Keepalive).

View File

@ -176,6 +176,8 @@ call(ConnStr, Req) ->
{error, ?RESP_PARAMS_TYPE_ERROR, <<"The conn type error">>}; {error, ?RESP_PARAMS_TYPE_ERROR, <<"The conn type error">>};
exit:noproc -> exit:noproc ->
{error, ?RESP_CONN_PROCESS_NOT_ALIVE, <<"Connection process is not alive">>}; {error, ?RESP_CONN_PROCESS_NOT_ALIVE, <<"Connection process is not alive">>};
exit:{noproc, _} ->
{error, ?RESP_CONN_PROCESS_NOT_ALIVE, <<"Connection process is not alive">>};
exit:timeout -> exit:timeout ->
{error, ?RESP_UNKNOWN, <<"Connection is not answered">>}; {error, ?RESP_UNKNOWN, <<"Connection is not answered">>};
Class:Reason:Stk -> Class:Reason:Stk ->