From 5854bfab5717f52bc9270058d6b47df6111bc117 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Sat, 7 Aug 2021 17:03:54 +0800 Subject: [PATCH] fix(exproto): retry the grpc request if the stream closed --- apps/emqx_exproto/src/emqx_exproto_channel.erl | 12 ++++++------ apps/emqx_exproto/src/emqx_exproto_gcli.erl | 13 ++++++++++--- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl index cc0d6690b..ba6205c9a 100644 --- a/apps/emqx_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -243,7 +243,7 @@ handle_timeout(_TRef, {keepalive, StatVal}, end; handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) -> - {shutdown, {error, {force_close, Reason}}, Channel}; + {shutdown, Reason, Channel}; handle_timeout(_TRef, Msg, Channel) -> ?WARN("Unexpected timeout: ~p", [Msg]), @@ -269,7 +269,7 @@ handle_call({auth, ClientInfo0, Password}, Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> ClientInfo1 = enrich_clientinfo(ClientInfo0, ClientInfo), - NConnInfo = enrich_conninfo(ClientInfo1, ConnInfo), + NConnInfo = enrich_conninfo(ClientInfo0, ConnInfo), Channel1 = Channel#channel{conninfo = NConnInfo, clientinfo = ClientInfo1}, @@ -373,13 +373,13 @@ handle_info({sock_closed, Reason}, case queue:len(Queue) =:= 0 andalso Inflight =:= undefined of true -> - Channel1 = ensure_disconnected({sock_closed, Reason}, Channel), - {shutdown, {sock_closed, Reason}, Channel1}; + Channel1 = ensure_disconnected(Reason, Channel), + {shutdown, Reason, Channel1}; _ -> %% delayed close process for flushing all callback funcs to gRPC server - Channel1 = Channel#channel{closed_reason = {sock_closed, Reason}}, + Channel1 = Channel#channel{closed_reason = Reason}, Channel2 = ensure_timer(force_timer, Channel1), - {ok, ensure_disconnected({sock_closed, Reason}, Channel2)} + {ok, ensure_disconnected(Reason, Channel2)} end; handle_info({hreply, on_socket_created, ok}, Channel) -> diff --git a/apps/emqx_exproto/src/emqx_exproto_gcli.erl b/apps/emqx_exproto/src/emqx_exproto_gcli.erl index 650922c4b..26b0e0d35 100644 --- a/apps/emqx_exproto/src/emqx_exproto_gcli.erl +++ b/apps/emqx_exproto/src/emqx_exproto_gcli.erl @@ -89,15 +89,22 @@ handle_cast({rpc, Fun, Req, Options, From}, State = #state{streams = Streams}) - {ok, Stream} -> case catch grpc_client:send(Stream, Req) of ok -> - ?LOG(debug, "Send to ~p method successfully, request: ~0p", [Fun, Req]), + ?LOG(debug, "Send to ~s method successfully, request: ~0p", [Fun, Req]), reply(From, Fun, ok), {noreply, State#state{streams = Streams#{Fun => Stream}}}; + {'EXIT', {not_found, _Stk}} -> + %% Not found the stream, reopen it + ?LOG(info, "Can not find the old stream ref for ~s; " + "re-try with a new stream!", [Fun]), + handle_cast({rpc, Fun, Req, Options, From}, + State#state{streams = maps:remove(Fun, Streams)}); {'EXIT', {timeout, _Stk}} -> - ?LOG(error, "Send to ~p method timeout, request: ~0p", [Fun, Req]), + ?LOG(error, "Send to ~s method timeout, request: ~0p", [Fun, Req]), reply(From, Fun, {error, timeout}), {noreply, State#state{streams = Streams#{Fun => Stream}}}; {'EXIT', {Reason1, _Stk}} -> - ?LOG(error, "Send to ~p method failure, request: ~0p, stacktrace: ~0p", [Fun, Req, _Stk]), + ?LOG(error, "Send to ~s method failure, request: ~0p, reason: ~p, " + "stacktrace: ~0p", [Fun, Req, Reason1, _Stk]), reply(From, Fun, {error, Reason1}), {noreply, State#state{streams = Streams#{Fun => undefined}}} end