From 2e4ec4888da9cc4a353ce98ecd57f016c8581de4 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 17 May 2023 17:03:00 +0800 Subject: [PATCH] fix(exproto): ensure the on_socket_closed event delivering correctly --- .../src/emqx_exproto_channel.erl | 48 +++++++++++++++---- .../src/emqx_exproto_gsvr.erl | 2 + 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl b/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl index 9e87bd58f..0caf4d7b2 100644 --- a/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl @@ -75,6 +75,7 @@ -define(TIMER_TABLE, #{ alive_timer => keepalive, + force_timer => force_close, idle_timer => force_close_idle }). @@ -300,7 +301,7 @@ handle_timeout( {ok, Replies, NChannel1} end; handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) -> - {shutdown, {error, {force_close, Reason}}, Channel}; + {shutdown, Reason, Channel}; handle_timeout(_TRef, force_close_idle, Channel) -> {shutdown, idle_timeout, Channel}; handle_timeout(_TRef, Msg, Channel) -> @@ -327,10 +328,20 @@ handle_call( Channel = #channel{conn_state = connected} ) -> ?SLOG(warning, #{ - msg => "ingore_duplicated_authorized_command", + msg => "ingore_duplicated_authenticate_command", request_clientinfo => ClientInfo }), {reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel}; +handle_call( + {auth, ClientInfo, _Password}, + _From, + Channel = #channel{conn_state = disconnected} +) -> + ?SLOG(warning, #{ + msg => "authenticate_command_after_socket_disconnected", + request_clientinfo => ClientInfo + }), + {reply, {error, ?RESP_PERMISSION_DENY, <<"Client socket disconnected">>}, Channel}; handle_call( {auth, ClientInfo0, Password}, _From, @@ -486,13 +497,21 @@ handle_cast(Req, Channel) -> | {shutdown, Reason :: term(), channel()}. handle_info( {sock_closed, Reason}, - Channel + Channel = #channel{gcli = GClient} ) -> - Channel1 = ensure_disconnected({sock_closed, Reason}, Channel), - {shutdown, Reason, Channel1}; + case emqx_exproto_gcli:is_empty(GClient) of + true -> + Channel1 = ensure_disconnected(Reason, Channel), + {shutdown, Reason, Channel1}; + _ -> + %% delayed close process for flushing all callback funcs to gRPC server + Channel1 = Channel#channel{closed_reason = Reason}, + Channel2 = ensure_timer(force_timer, Channel1), + {ok, ensure_disconnected(Reason, Channel2)} + end; handle_info( {hreply, FunName, Result}, - Channel = #channel{gcli = GClient} + Channel0 = #channel{gcli = GClient0, timers = Timers} ) when FunName =:= on_socket_created; FunName =:= on_socket_closed; @@ -500,12 +519,19 @@ handle_info( FunName =:= on_received_messages; FunName =:= on_timer_timeout -> + GClient = emqx_exproto_gcli:ack(FunName, GClient0), + Channel = Channel0#channel{gcli = GClient}, + + ShutdownNow = + emqx_exproto_gcli:is_empty(GClient) andalso + maps:get(force_timer, Timers, undefined) =/= undefined, case Result of - ok -> - GClient1 = emqx_exproto_gcli:maybe_shoot( - emqx_exproto_gcli:ack(FunName, GClient) - ), + ok when not ShutdownNow -> + GClient1 = emqx_exproto_gcli:maybe_shoot(GClient), {ok, Channel#channel{gcli = GClient1}}; + ok when ShutdownNow -> + Channel1 = cancel_timer(force_timer, Channel), + {shutdown, Channel1#channel.closed_reason, Channel1}; {error, Reason} -> {shutdown, {error, {FunName, Reason}}, Channel} end; @@ -690,6 +716,8 @@ remove_timer_ref(Name, Channel = #channel{timers = Timers}) -> interval(idle_timer, #channel{conninfo = #{idle_timeout := IdleTimeout}}) -> IdleTimeout; +interval(force_timer, _) -> + 15000; interval(alive_timer, #channel{keepalive = Keepalive}) -> emqx_keepalive:info(interval, Keepalive). diff --git a/apps/emqx_gateway_exproto/src/emqx_exproto_gsvr.erl b/apps/emqx_gateway_exproto/src/emqx_exproto_gsvr.erl index 5bbe7bf37..e048fd73e 100644 --- a/apps/emqx_gateway_exproto/src/emqx_exproto_gsvr.erl +++ b/apps/emqx_gateway_exproto/src/emqx_exproto_gsvr.erl @@ -176,6 +176,8 @@ call(ConnStr, Req) -> {error, ?RESP_PARAMS_TYPE_ERROR, <<"The conn type error">>}; exit:noproc -> {error, ?RESP_CONN_PROCESS_NOT_ALIVE, <<"Connection process is not alive">>}; + exit:{noproc, _} -> + {error, ?RESP_CONN_PROCESS_NOT_ALIVE, <<"Connection process is not alive">>}; exit:timeout -> {error, ?RESP_UNKNOWN, <<"Connection is not answered">>}; Class:Reason:Stk ->