fix(exproto): retry the grpc request if the stream closed
This commit is contained in:
parent
bf0036bf81
commit
5854bfab57
|
@ -243,7 +243,7 @@ handle_timeout(_TRef, {keepalive, StatVal},
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
|
handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
|
||||||
{shutdown, {error, {force_close, Reason}}, Channel};
|
{shutdown, Reason, Channel};
|
||||||
|
|
||||||
handle_timeout(_TRef, Msg, Channel) ->
|
handle_timeout(_TRef, Msg, Channel) ->
|
||||||
?WARN("Unexpected timeout: ~p", [Msg]),
|
?WARN("Unexpected timeout: ~p", [Msg]),
|
||||||
|
@ -269,7 +269,7 @@ handle_call({auth, ClientInfo0, Password},
|
||||||
Channel = #channel{conninfo = ConnInfo,
|
Channel = #channel{conninfo = ConnInfo,
|
||||||
clientinfo = ClientInfo}) ->
|
clientinfo = ClientInfo}) ->
|
||||||
ClientInfo1 = enrich_clientinfo(ClientInfo0, ClientInfo),
|
ClientInfo1 = enrich_clientinfo(ClientInfo0, ClientInfo),
|
||||||
NConnInfo = enrich_conninfo(ClientInfo1, ConnInfo),
|
NConnInfo = enrich_conninfo(ClientInfo0, ConnInfo),
|
||||||
|
|
||||||
Channel1 = Channel#channel{conninfo = NConnInfo,
|
Channel1 = Channel#channel{conninfo = NConnInfo,
|
||||||
clientinfo = ClientInfo1},
|
clientinfo = ClientInfo1},
|
||||||
|
@ -373,13 +373,13 @@ handle_info({sock_closed, Reason},
|
||||||
case queue:len(Queue) =:= 0
|
case queue:len(Queue) =:= 0
|
||||||
andalso Inflight =:= undefined of
|
andalso Inflight =:= undefined of
|
||||||
true ->
|
true ->
|
||||||
Channel1 = ensure_disconnected({sock_closed, Reason}, Channel),
|
Channel1 = ensure_disconnected(Reason, Channel),
|
||||||
{shutdown, {sock_closed, Reason}, Channel1};
|
{shutdown, Reason, Channel1};
|
||||||
_ ->
|
_ ->
|
||||||
%% delayed close process for flushing all callback funcs to gRPC server
|
%% delayed close process for flushing all callback funcs to gRPC server
|
||||||
Channel1 = Channel#channel{closed_reason = {sock_closed, Reason}},
|
Channel1 = Channel#channel{closed_reason = Reason},
|
||||||
Channel2 = ensure_timer(force_timer, Channel1),
|
Channel2 = ensure_timer(force_timer, Channel1),
|
||||||
{ok, ensure_disconnected({sock_closed, Reason}, Channel2)}
|
{ok, ensure_disconnected(Reason, Channel2)}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info({hreply, on_socket_created, ok}, Channel) ->
|
handle_info({hreply, on_socket_created, ok}, Channel) ->
|
||||||
|
|
|
@ -89,15 +89,22 @@ handle_cast({rpc, Fun, Req, Options, From}, State = #state{streams = Streams}) -
|
||||||
{ok, Stream} ->
|
{ok, Stream} ->
|
||||||
case catch grpc_client:send(Stream, Req) of
|
case catch grpc_client:send(Stream, Req) of
|
||||||
ok ->
|
ok ->
|
||||||
?LOG(debug, "Send to ~p method successfully, request: ~0p", [Fun, Req]),
|
?LOG(debug, "Send to ~s method successfully, request: ~0p", [Fun, Req]),
|
||||||
reply(From, Fun, ok),
|
reply(From, Fun, ok),
|
||||||
{noreply, State#state{streams = Streams#{Fun => Stream}}};
|
{noreply, State#state{streams = Streams#{Fun => Stream}}};
|
||||||
|
{'EXIT', {not_found, _Stk}} ->
|
||||||
|
%% Not found the stream, reopen it
|
||||||
|
?LOG(info, "Can not find the old stream ref for ~s; "
|
||||||
|
"re-try with a new stream!", [Fun]),
|
||||||
|
handle_cast({rpc, Fun, Req, Options, From},
|
||||||
|
State#state{streams = maps:remove(Fun, Streams)});
|
||||||
{'EXIT', {timeout, _Stk}} ->
|
{'EXIT', {timeout, _Stk}} ->
|
||||||
?LOG(error, "Send to ~p method timeout, request: ~0p", [Fun, Req]),
|
?LOG(error, "Send to ~s method timeout, request: ~0p", [Fun, Req]),
|
||||||
reply(From, Fun, {error, timeout}),
|
reply(From, Fun, {error, timeout}),
|
||||||
{noreply, State#state{streams = Streams#{Fun => Stream}}};
|
{noreply, State#state{streams = Streams#{Fun => Stream}}};
|
||||||
{'EXIT', {Reason1, _Stk}} ->
|
{'EXIT', {Reason1, _Stk}} ->
|
||||||
?LOG(error, "Send to ~p method failure, request: ~0p, stacktrace: ~0p", [Fun, Req, _Stk]),
|
?LOG(error, "Send to ~s method failure, request: ~0p, reason: ~p, "
|
||||||
|
"stacktrace: ~0p", [Fun, Req, Reason1, _Stk]),
|
||||||
reply(From, Fun, {error, Reason1}),
|
reply(From, Fun, {error, Reason1}),
|
||||||
{noreply, State#state{streams = Streams#{Fun => undefined}}}
|
{noreply, State#state{streams = Streams#{Fun => undefined}}}
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in New Issue