emqx/apps/emqx_gateway_exproto/src/emqx_exproto_gcli.erl

211 lines
6.7 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% the gRPC client agent for ConnectionHandler service
-module(emqx_exproto_gcli).
-include_lib("emqx/include/logger.hrl").
-logger_header("[ExProtoGCli]").
-export([
init/2,
maybe_shoot/1,
maybe_shoot/3,
ack/2,
is_empty/1
]).
-define(CONN_HANDLER_MOD, emqx_exproto_v_1_connection_handler_client).
-define(CONN_UNARY_HANDLER_MOD, emqx_exproto_v_1_connection_unary_handler_client).
-type service_name() :: 'ConnectionUnaryHandler' | 'ConnectionHandler'.
-type grpc_client_state() ::
#{
owner := pid(),
service_name := service_name(),
client_opts := options(),
queue := queue:queue(),
inflight := atom() | undefined,
streams => map(),
middleman => pid() | undefined
}.
-type options() ::
#{channel := term()}.
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
-spec init(service_name(), options()) -> grpc_client_state().
init(ServiceName, Options) ->
#{
owner => self(),
service_name => ServiceName,
client_opts => Options,
queue => queue:new(),
inflight => undefined
}.
-spec maybe_shoot(atom(), map(), grpc_client_state()) -> grpc_client_state().
maybe_shoot(FunName, Req, GState = #{inflight := undefined}) ->
shoot(FunName, Req, GState);
maybe_shoot(FunName, Req, GState) ->
enqueue(FunName, Req, GState).
-spec maybe_shoot(grpc_client_state()) -> grpc_client_state().
maybe_shoot(GState = #{inflight := undefined, queue := Q}) ->
case queue:is_empty(Q) of
true ->
GState;
false ->
{{value, {FunName, Req}}, Q1} = queue:out(Q),
shoot(FunName, Req, GState#{queue => Q1})
end.
-spec ack(atom(), grpc_client_state()) -> grpc_client_state().
ack(FunName, GState = #{inflight := FunName}) ->
GState#{inflight => undefined, middleman => undefined};
ack(_, _) ->
error(badarg).
-spec is_empty(grpc_client_state()) -> boolean().
is_empty(#{queue := Q, inflight := Inflight}) ->
Inflight == undefined andalso queue:is_empty(Q).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
enqueue(FunName, Req, GState = #{queue := Q}) ->
GState#{queue => queue:in({FunName, Req}, Q)}.
shoot(FunName, Req, GState) ->
ServiceName = maps:get(service_name, GState),
shoot(ServiceName, FunName, Req, GState).
shoot(
'ConnectionUnaryHandler',
FunName,
Req,
GState = #{owner := Owner, client_opts := Options}
) ->
Pid =
spawn(
fun() ->
try
Result = request(FunName, Req, Options),
hreply(Owner, Result, FunName)
catch
T:R:Stk ->
hreply(Owner, {error, {{T, R}, Stk}}, FunName)
end
end
),
GState#{inflight => FunName, middleman => Pid};
shoot(
'ConnectionHandler',
FunName,
Req,
GState
) ->
GState1 = streaming(FunName, Req, GState),
GState1#{inflight => FunName}.
%%--------------------------------------------------------------------
%% streaming
streaming(
FunName,
Req,
GState = #{owner := Owner, client_opts := Options}
) ->
Streams = maps:get(streams, GState, #{}),
case ensure_stream_opened(FunName, Options, Streams) of
{error, Reason} ->
?SLOG(error, #{
msg => "request_grpc_server_failed",
function => {FunName, Options},
reason => Reason
}),
hreply(Owner, {error, Reason}, FunName),
{ok, GState};
{ok, Stream} ->
case catch grpc_client:send(Stream, Req) of
ok ->
?SLOG(debug, #{
msg => "send_grpc_request_succeed",
function => FunName,
request => Req
}),
hreply(Owner, ok, FunName),
GState#{streams => Streams#{FunName => Stream}};
{'EXIT', {not_found, _Stk}} ->
%% Not found the stream, reopen it
?SLOG(info, #{
msg => "cannt_find_old_stream_ref",
function => FunName
}),
streaming(FunName, Req, GState#{streams => maps:remove(FunName, Streams)});
{'EXIT', {timeout, _Stk}} ->
?SLOG(error, #{
msg => "send_grpc_request_timeout",
function => FunName,
request => Req
}),
hreply(Owner, {error, timeout}, FunName),
GState;
{'EXIT', {Reason1, Stk}} ->
?SLOG(error, #{
msg => "send_grpc_request_failed",
function => FunName,
request => Req,
error => Reason1,
stacktrace => Stk
}),
hreply(Owner, {error, Reason1}, FunName),
GState
end
end.
ensure_stream_opened(FunName, Options, Streams) ->
case maps:get(FunName, Streams, undefined) of
undefined ->
case apply(?CONN_HANDLER_MOD, FunName, [Options]) of
{ok, Stream} -> {ok, Stream};
{error, Reason} -> {error, Reason}
end;
Stream ->
{ok, Stream}
end.
%%--------------------------------------------------------------------
%% unary
request(FunName, Req, Options) ->
case apply(?CONN_UNARY_HANDLER_MOD, FunName, [Req, Options]) of
{ok, _EmptySucc, _Md} ->
ok;
{error, Reason} ->
{error, Reason}
end.
hreply(Owner, Result, FunName) ->
Owner ! {hreply, FunName, Result},
ok.