diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl index be216cfda..49fe6c652 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl @@ -271,7 +271,7 @@ handle_call({auth, ClientInfo0, Password}, _From, conninfo = ConnInfo, clientinfo = ClientInfo}) -> ClientInfo1 = enrich_clientinfo(ClientInfo0, ClientInfo), - ConnInfo1 = enrich_conninfo(ClientInfo1, ConnInfo), + ConnInfo1 = enrich_conninfo(ClientInfo0, ConnInfo), Channel1 = Channel#channel{conninfo = ConnInfo1, clientinfo = ClientInfo1}, @@ -383,7 +383,7 @@ handle_info({sock_closed, Reason}, andalso Inflight =:= undefined of true -> Channel1 = ensure_disconnected({sock_closed, Reason}, Channel), - {shutdown, {sock_closed, Reason}, Channel1}; + {shutdown, Reason, Channel1}; _ -> %% delayed close process for flushing all callback funcs to gRPC server Channel1 = Channel#channel{closed_reason = {sock_closed, Reason}}, diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl index f30e2b6f7..82adc8fb3 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl @@ -89,15 +89,25 @@ handle_cast({rpc, Fun, Req, Options, From}, State = #state{streams = Streams}) - {ok, Stream} -> case catch grpc_client:send(Stream, Req) of ok -> - ?LOG(debug, "Send to ~p method successfully, request: ~0p", [Fun, Req]), + ?LOG(debug, "Send to ~p method successfully, request: ~0p", + [Fun, Req]), reply(From, Fun, ok), {noreply, State#state{streams = Streams#{Fun => Stream}}}; + {'EXIT', {not_found, _Stk}} -> + %% Not found the stream, reopen it + ?LOG(info, "Can not find the old stream ref for ~s; " + "re-try with a new stream!", [Fun]), + handle_cast( + {rpc, Fun, Req, Options, From}, + State#state{streams = maps:remove(Fun, Streams)}); {'EXIT', {timeout, _Stk}} -> - ?LOG(error, "Send to ~p method timeout, request: ~0p", [Fun, Req]), + ?LOG(error, "Send to ~p method timeout, request: ~0p", + [Fun, Req]), reply(From, Fun, {error, timeout}), {noreply, State#state{streams = Streams#{Fun => Stream}}}; {'EXIT', {Reason1, _Stk}} -> - ?LOG(error, "Send to ~p method failure, request: ~0p, stacktrace: ~0p", [Fun, Req, _Stk]), + ?LOG(error, "Send to ~p method failure, request: ~0p, " + "stacktrace: ~0p", [Fun, Req, _Stk]), reply(From, Fun, {error, Reason1}), {noreply, State#state{streams = Streams#{Fun => undefined}}} end