emqx/apps/emqx_exproto/src/emqx_exproto_gcli.erl

139 lines
4.8 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% the gRPC client worker for ConnectionHandler service
-module(emqx_exproto_gcli).
-behaviour(gen_server).
-include_lib("emqx/include/logger.hrl").
-logger_header("[ExProto gClient]").
%% APIs
-export([async_call/3]).
-export([start_link/2]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-record(state, {
pool,
id,
streams
}).
-define(CONN_ADAPTER_MOD, emqx_exproto_v_1_connection_handler_client).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
start_link(Pool, Id) ->
gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)},
?MODULE, [Pool, Id], []).
async_call(FunName, Req = #{conn := Conn}, Options) ->
cast(pick(Conn), {rpc, FunName, Req, Options, self()}).
%%--------------------------------------------------------------------
%% cast, pick
%%--------------------------------------------------------------------
-compile({inline, [cast/2, pick/1]}).
cast(Deliver, Msg) ->
gen_server:cast(Deliver, Msg).
pick(Conn) ->
gproc_pool:pick_worker(exproto_gcli_pool, Conn).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([Pool, Id]) ->
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
{ok, #state{pool = Pool, id = Id, streams = #{}}}.
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast({rpc, Fun, Req, Options, From}, State = #state{streams = Streams}) ->
case ensure_stream_opened(Fun, Options, Streams) of
{error, Reason} ->
?LOG(error, "CALL ~0p:~0p(~0p) failed, reason: ~0p",
[?CONN_ADAPTER_MOD, Fun, Options, Reason]),
reply(From, Fun, {error, Reason}),
{noreply, State#state{streams = Streams#{Fun => undefined}}};
{ok, Stream} ->
case catch grpc_client:send(Stream, Req) of
ok ->
?LOG(debug, "Send to ~s method successfully, request: ~0p", [Fun, Req]),
reply(From, Fun, ok),
{noreply, State#state{streams = Streams#{Fun => Stream}}};
{'EXIT', {not_found, _Stk}} ->
%% Not found the stream, reopen it
?LOG(info, "Can not find the old stream ref for ~s; "
"re-try with a new stream!", [Fun]),
handle_cast({rpc, Fun, Req, Options, From},
State#state{streams = maps:remove(Fun, Streams)});
{'EXIT', {timeout, _Stk}} ->
?LOG(error, "Send to ~s method timeout, request: ~0p", [Fun, Req]),
reply(From, Fun, {error, timeout}),
{noreply, State#state{streams = Streams#{Fun => Stream}}};
{'EXIT', {Reason1, _Stk}} ->
?LOG(error, "Send to ~s method failure, request: ~0p, reason: ~p, "
"stacktrace: ~0p", [Fun, Req, Reason1, _Stk]),
reply(From, Fun, {error, Reason1}),
{noreply, State#state{streams = Streams#{Fun => undefined}}}
end
end.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
reply(Pid, Fun, Result) ->
Pid ! {hreply, Fun, Result},
ok.
ensure_stream_opened(Fun, Options, Streams) ->
case maps:get(Fun, Streams, undefined) of
undefined ->
case apply(?CONN_ADAPTER_MOD, Fun, [Options]) of
{ok, Stream} -> {ok, Stream};
{error, Reason} -> {error, Reason}
end;
Stream -> {ok, Stream}
end.