fix(gw-exproto): fix diaylzer warnings

This commit is contained in:
JianBo He 2021-07-22 21:27:19 +08:00
parent 5f47ceb118
commit fd828ad216
3 changed files with 24 additions and 724 deletions

View File

@ -100,11 +100,11 @@
%%--------------------------------------------------------------------
%% @doc Get infos of the channel.
-spec(info(channel()) -> emqx_types:infos()).
-spec info(channel()) -> emqx_types:infos().
info(Channel) ->
maps:from_list(info(?INFO_KEYS, Channel)).
-spec(info(list(atom())|atom(), channel()) -> term()).
-spec info(list(atom())|atom(), channel()) -> term().
info(Keys, Channel) when is_list(Keys) ->
[{Key, info(Key, Channel)} || Key <- Keys];
info(conninfo, #channel{conninfo = ConnInfo}) ->
@ -127,7 +127,7 @@ info(will_msg, _) ->
info(ctx, #channel{ctx = Ctx}) ->
Ctx.
-spec(stats(channel()) -> emqx_types:stats()).
-spec stats(channel()) -> emqx_types:stats().
stats(#channel{subscriptions = Subs}) ->
[{subscriptions_cnt, maps:size(Subs)},
{subscriptions_max, 0},
@ -144,7 +144,7 @@ stats(#channel{subscriptions = Subs}) ->
%% Init the channel
%%--------------------------------------------------------------------
-spec(init(emqx_exproto_types:conninfo(), proplists:proplist()) -> channel()).
-spec init(emqx_exproto_types:conninfo(), map()) -> channel().
init(ConnInfo = #{socktype := Socktype,
peername := Peername,
sockname := Sockname,
@ -201,16 +201,16 @@ address({Host, Port}) ->
%% Handle incoming packet
%%--------------------------------------------------------------------
-spec(handle_in(binary(), channel())
-spec handle_in(binary(), channel())
-> {ok, channel()}
| {shutdown, Reason :: term(), channel()}).
| {shutdown, Reason :: term(), channel()}.
handle_in(Data, Channel) ->
Req = #{bytes => Data},
{ok, try_dispatch(on_received_bytes, wrap(Req), Channel)}.
-spec(handle_deliver(list(emqx_types:deliver()), channel())
-spec handle_deliver(list(emqx_types:deliver()), channel())
-> {ok, channel()}
| {shutdown, Reason :: term(), channel()}).
| {shutdown, Reason :: term(), channel()}.
handle_deliver(Delivers, Channel = #channel{ctx = Ctx,
clientinfo = ClientInfo}) ->
%% XXX: ?? Nack delivers from shared subscriptions
@ -233,9 +233,9 @@ handle_deliver(Delivers, Channel = #channel{ctx = Ctx,
Req = #{messages => Msgs},
{ok, try_dispatch(on_received_messages, wrap(Req), Channel)}.
-spec(handle_timeout(reference(), Msg :: term(), channel())
-spec handle_timeout(reference(), Msg :: term(), channel())
-> {ok, channel()}
| {shutdown, Reason :: term(), channel()}).
| {shutdown, Reason :: term(), channel()}.
handle_timeout(_TRef, {keepalive, _StatVal},
Channel = #channel{keepalive = undefined}) ->
{ok, Channel};
@ -257,10 +257,10 @@ handle_timeout(_TRef, Msg, Channel) ->
?WARN("Unexpected timeout: ~p", [Msg]),
{ok, Channel}.
-spec(handle_call(any(), channel())
-> {reply, Reply :: term(), channel()}
| {reply, Reply :: term(), replies(), channel()}
| {shutdown, Reason :: term(), Reply :: term(), channel()}).
-spec handle_call(any(), channel())
-> {reply, Reply :: term(), channel()}
| {reply, Reply :: term(), replies(), channel()}
| {shutdown, Reason :: term(), Reply :: term(), channel()}.
handle_call({send, Data}, Channel) ->
{reply, ok, [{outgoing, Data}], Channel};
@ -355,17 +355,17 @@ handle_call(Req, Channel) ->
?LOG(warning, "Unexpected call: ~p", [Req]),
{reply, {error, unexpected_call}, Channel}.
-spec(handle_cast(any(), channel())
-> {ok, channel()}
| {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()}).
-spec handle_cast(any(), channel())
-> {ok, channel()}
| {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()}.
handle_cast(Req, Channel) ->
?WARN("Unexpected call: ~p", [Req]),
{ok, Channel}.
-spec(handle_info(any(), channel())
-> {ok, channel()}
| {shutdown, Reason :: term(), channel()}).
-spec handle_info(any(), channel())
-> {ok, channel()}
| {shutdown, Reason :: term(), channel()}.
handle_info({subscribe, TopicFilters}, Channel) ->
do_subscribe(TopicFilters, Channel);
@ -401,7 +401,7 @@ handle_info(Info, Channel) ->
?LOG(warning, "Unexpected info: ~p", [Info]),
{ok, Channel}.
-spec(terminate(any(), channel()) -> channel()).
-spec terminate(any(), channel()) -> channel().
terminate(Reason, Channel) ->
Req = #{reason => stringfy(Reason)},
try_dispatch(on_socket_closed, wrap(Req), Channel).

View File

@ -1,700 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% TCP/TLS/UDP/DTLS Connection
-module(emqx_exproto_conn).
-include_lib("emqx/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-logger_header("[ExProto Conn]").
%% API
-export([ start_link/3
, stop/1
]).
-export([ info/1
, stats/1
]).
-export([ call/2
, call/3
, cast/2
]).
%% Callback
-export([init/4]).
%% Sys callbacks
-export([ system_continue/3
, system_terminate/4
, system_code_change/4
, system_get_state/1
]).
%% Internal callback
-export([wakeup_from_hib/2]).
-import(emqx_misc, [start_timer/2]).
-record(state, {
%% TCP/SSL/UDP/DTLS Wrapped Socket
socket :: {esockd_transport, esockd:socket()} | {udp, _, _},
%% Peername of the connection
peername :: emqx_types:peername(),
%% Sockname of the connection
sockname :: emqx_types:peername(),
%% Sock State
sockstate :: emqx_types:sockstate(),
%% The {active, N} option
active_n :: pos_integer(),
%% BACKW: e4.2.0-e4.2.1
%% We should remove it
sendfun :: function() | undefined,
%% Limiter
limiter :: maybe(emqx_limiter:limiter()),
%% Limit Timer
limit_timer :: maybe(reference()),
%% Channel State
channel :: emqx_exproto_channel:channel(),
%% GC State
gc_state :: maybe(emqx_gc:gc_state()),
%% Stats Timer
stats_timer :: disabled | maybe(reference()),
%% Idle Timeout
idle_timeout :: integer(),
%% Idle Timer
idle_timer :: maybe(reference())
}).
-type(state() :: #state{}).
-define(ACTIVE_N, 100).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]).
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
-define(ENABLED(X), (X =/= undefined)).
-dialyzer({nowarn_function,
[ system_terminate/4
, handle_call/3
, handle_msg/2
, shutdown/3
, stop/3
]}).
%% udp
start_link(Socket = {udp, _SockPid, _Sock}, Peername, Options) ->
Args = [self(), Socket, Peername, Options],
{ok, proc_lib:spawn_link(?MODULE, init, Args)};
%% tcp/ssl/dtls
start_link(esockd_transport, Sock, Options) ->
Socket = {esockd_transport, Sock},
Args = [self(), Socket, undefined, Options],
{ok, proc_lib:spawn_link(?MODULE, init, Args)}.
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
%% @doc Get infos of the connection/channel.
-spec(info(pid()|state()) -> emqx_types:infos()).
info(CPid) when is_pid(CPid) ->
call(CPid, info);
info(State = #state{channel = Channel}) ->
ChanInfo = emqx_exproto_channel:info(Channel),
SockInfo = maps:from_list(
info(?INFO_KEYS, State)),
ChanInfo#{sockinfo => SockInfo}.
info(Keys, State) when is_list(Keys) ->
[{Key, info(Key, State)} || Key <- Keys];
info(socktype, #state{socket = Socket}) ->
esockd_type(Socket);
info(peername, #state{peername = Peername}) ->
Peername;
info(sockname, #state{sockname = Sockname}) ->
Sockname;
info(sockstate, #state{sockstate = SockSt}) ->
SockSt;
info(active_n, #state{active_n = ActiveN}) ->
ActiveN.
-spec(stats(pid()|state()) -> emqx_types:stats()).
stats(CPid) when is_pid(CPid) ->
call(CPid, stats);
stats(#state{socket = Socket,
channel = Channel}) ->
SockStats = case esockd_getstat(Socket, ?SOCK_STATS) of
{ok, Ss} -> Ss;
{error, _} -> []
end,
ConnStats = emqx_pd:get_counters(?CONN_STATS),
ChanStats = emqx_exproto_channel:stats(Channel),
ProcStats = emqx_misc:proc_stats(),
lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
call(Pid, Req) ->
call(Pid, Req, infinity).
call(Pid, Req, Timeout) ->
gen_server:call(Pid, Req, Timeout).
cast(Pid, Req) ->
gen_server:cast(Pid, Req).
stop(Pid) ->
gen_server:stop(Pid).
%%--------------------------------------------------------------------
%% Wrapped funcs
%%--------------------------------------------------------------------
esockd_peername({udp, _SockPid, _Sock}, Peername) ->
Peername;
esockd_peername({esockd_transport, Sock}, _Peername) ->
{ok, Peername} = esockd_transport:ensure_ok_or_exit(peername, [Sock]),
Peername.
esockd_wait(Socket = {udp, _SockPid, _Sock}) ->
{ok, Socket};
esockd_wait({esockd_transport, Sock}) ->
case esockd_transport:wait(Sock) of
{ok, NSock} -> {ok, {esockd_transport, NSock}};
R = {error, _} -> R
end.
esockd_close({udp, _SockPid, _Sock}) ->
%% nothing to do for udp socket
%%gen_udp:close(Sock);
ok;
esockd_close({esockd_transport, Sock}) ->
esockd_transport:fast_close(Sock).
esockd_ensure_ok_or_exit(peercert, {udp, _SockPid, _Sock}) ->
nossl;
esockd_ensure_ok_or_exit(Fun, {udp, _SockPid, Sock}) ->
esockd_transport:ensure_ok_or_exit(Fun, [Sock]);
esockd_ensure_ok_or_exit(Fun, {esockd_transport, Socket}) ->
esockd_transport:ensure_ok_or_exit(Fun, [Socket]).
esockd_type({udp, _, _}) ->
udp;
esockd_type({esockd_transport, Socket}) ->
esockd_transport:type(Socket).
esockd_setopts({udp, _, _}, _) ->
ok;
esockd_setopts({esockd_transport, Socket}, Opts) ->
%% FIXME: DTLS works??
esockd_transport:setopts(Socket, Opts).
esockd_getstat({udp, _SockPid, Sock}, Stats) ->
inet:getstat(Sock, Stats);
esockd_getstat({esockd_transport, Sock}, Stats) ->
esockd_transport:getstat(Sock, Stats).
send(Data, #state{socket = {udp, _SockPid, Sock}, peername = {Ip, Port}}) ->
gen_udp:send(Sock, Ip, Port, Data);
send(Data, #state{socket = {esockd_transport, Sock}}) ->
esockd_transport:async_send(Sock, Data).
%%--------------------------------------------------------------------
%% callbacks
%%--------------------------------------------------------------------
-define(DEFAULT_GC_OPTS, #{count => 1000, bytes => 1024*1024}).
-define(DEFAULT_IDLE_TIMEOUT, 30000).
-define(DEFAULT_OOM_POLICY, #{enable => true, max_heap_size => 4194304,
max_message_queue_len => 32000}).
init(Parent, WrappedSock, Peername0, Options) ->
case esockd_wait(WrappedSock) of
{ok, NWrappedSock} ->
Peername = esockd_peername(NWrappedSock, Peername0),
run_loop(Parent, init_state(NWrappedSock, Peername, Options));
{error, Reason} ->
ok = esockd_close(WrappedSock),
exit_on_sock_error(Reason)
end.
init_state(WrappedSock, Peername, Options) ->
{ok, Sockname} = esockd_ensure_ok_or_exit(sockname, WrappedSock),
Peercert = esockd_ensure_ok_or_exit(peercert, WrappedSock),
ConnInfo = #{socktype => esockd_type(WrappedSock),
peername => Peername,
sockname => Sockname,
peercert => Peercert,
conn_mod => ?MODULE
},
ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N),
%% FIXME:
%%Limiter = emqx_limiter:init(Options),
Channel = emqx_exproto_channel:init(ConnInfo, Options),
GcState = emqx_gc:init(?DEFAULT_GC_OPTS),
IdleTimeout = proplists:get_value(idle_timeout, Options, ?DEFAULT_IDLE_TIMEOUT),
IdleTimer = start_timer(IdleTimeout, idle_timeout),
#state{socket = WrappedSock,
peername = Peername,
sockname = Sockname,
sockstate = idle,
active_n = ActiveN,
sendfun = undefined,
limiter = undefined,
channel = Channel,
gc_state = GcState,
stats_timer = undefined,
idle_timeout = IdleTimeout,
idle_timer = IdleTimer
}.
run_loop(Parent, State = #state{socket = Socket,
peername = Peername}) ->
emqx_logger:set_metadata_peername(esockd:format(Peername)),
_ = emqx_misc:tune_heap_size(?DEFAULT_OOM_POLICY),
case activate_socket(State) of
{ok, NState} ->
hibernate(Parent, NState);
{error, Reason} ->
ok = esockd_close(Socket),
exit_on_sock_error(Reason)
end.
-spec exit_on_sock_error(atom()) -> no_return().
exit_on_sock_error(Reason) when Reason =:= einval;
Reason =:= enotconn;
Reason =:= closed ->
erlang:exit(normal);
exit_on_sock_error(timeout) ->
erlang:exit({shutdown, ssl_upgrade_timeout});
exit_on_sock_error(Reason) ->
erlang:exit({shutdown, Reason}).
%%--------------------------------------------------------------------
%% Recv Loop
recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
{'EXIT', Parent, Reason} ->
terminate(Reason, State);
Msg ->
process_msg([Msg], Parent, ensure_stats_timer(IdleTimeout, State))
after
IdleTimeout ->
hibernate(Parent, cancel_stats_timer(State))
end.
hibernate(Parent, State) ->
proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State]).
%% Maybe do something here later.
wakeup_from_hib(Parent, State) -> recvloop(Parent, State).
%%--------------------------------------------------------------------
%% Ensure/cancel stats timer
-compile({inline, [ensure_stats_timer/2]}).
ensure_stats_timer(Timeout, State = #state{stats_timer = undefined}) ->
State#state{stats_timer = start_timer(Timeout, emit_stats)};
ensure_stats_timer(_Timeout, State) -> State.
-compile({inline, [cancel_stats_timer/1]}).
cancel_stats_timer(State = #state{stats_timer = TRef}) when is_reference(TRef) ->
ok = emqx_misc:cancel_timer(TRef),
State#state{stats_timer = undefined};
cancel_stats_timer(State) -> State.
%%--------------------------------------------------------------------
%% Process next Msg
process_msg([], Parent, State) -> recvloop(Parent, State);
process_msg([Msg|More], Parent, State) ->
case catch handle_msg(Msg, State) of
ok ->
process_msg(More, Parent, State);
{ok, NState} ->
process_msg(More, Parent, NState);
{ok, Msgs, NState} ->
process_msg(append_msg(More, Msgs), Parent, NState);
{stop, Reason} ->
terminate(Reason, State);
{stop, Reason, NState} ->
terminate(Reason, NState);
{'EXIT', Reason} ->
terminate(Reason, State)
end.
-compile({inline, [append_msg/2]}).
append_msg([], Msgs) when is_list(Msgs) ->
Msgs;
append_msg([], Msg) -> [Msg];
append_msg(Q, Msgs) when is_list(Msgs) ->
lists:append(Q, Msgs);
append_msg(Q, Msg) ->
lists:append(Q, [Msg]).
%%--------------------------------------------------------------------
%% Handle a Msg
handle_msg({'$gen_call', From, Req}, State) ->
case handle_call(From, Req, State) of
{reply, Reply, NState} ->
gen_server:reply(From, Reply),
{ok, NState};
{reply, Reply, Msgs, NState} ->
gen_server:reply(From, Reply),
{ok, next_msgs(Msgs), NState};
{stop, Reason, Reply, NState} ->
gen_server:reply(From, Reply),
stop(Reason, NState)
end;
handle_msg({'$gen_cast', Req}, State) ->
with_channel(handle_cast, [Req], State);
handle_msg({datagram, _SockPid, Data}, State) ->
process_incoming(Data, State);
handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
process_incoming(Data, State);
handle_msg({outgoing, Data}, State) ->
handle_outgoing(Data, State);
handle_msg({Error, _Sock, Reason}, State)
when Error == tcp_error; Error == ssl_error ->
handle_info({sock_error, Reason}, State);
handle_msg({Closed, _Sock}, State)
when Closed == tcp_closed; Closed == ssl_closed ->
handle_info({sock_closed, Closed}, close_socket(State));
%% TODO: udp_passive???
handle_msg({Passive, _Sock}, State)
when Passive == tcp_passive; Passive == ssl_passive ->
%% In Stats
Bytes = emqx_pd:reset_counter(incoming_bytes),
Pubs = emqx_pd:reset_counter(incoming_pkt),
InStats = #{cnt => Pubs, oct => Bytes},
%% Ensure Rate Limit
NState = ensure_rate_limit(InStats, State),
%% Run GC and Check OOM
NState1 = check_oom(run_gc(InStats, NState)),
handle_info(activate_socket, NState1);
handle_msg(Deliver = {deliver, _Topic, _Msg},
State = #state{active_n = ActiveN}) ->
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State);
%% Something sent
%% TODO: Who will deliver this message?
handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) ->
case emqx_pd:get_counter(outgoing_pkt) > ActiveN of
true ->
Pubs = emqx_pd:reset_counter(outgoing_pkt),
Bytes = emqx_pd:reset_counter(outgoing_bytes),
OutStats = #{cnt => Pubs, oct => Bytes},
{ok, check_oom(run_gc(OutStats, State))};
false -> ok
end;
handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
handle_info({sock_error, Reason}, State);
handle_msg({close, Reason}, State) ->
?LOG(debug, "Force to close the socket due to ~p", [Reason]),
handle_info({sock_closed, Reason}, close_socket(State));
handle_msg({event, connected}, State = #state{channel = Channel}) ->
ClientId = emqx_exproto_channel:info(clientid, Channel),
emqx_cm:insert_channel_info(ClientId, info(State), stats(State));
handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
ClientId = emqx_exproto_channel:info(clientid, Channel),
emqx_cm:set_chan_info(ClientId, info(State)),
emqx_cm:connection_closed(ClientId),
{ok, State};
%handle_msg({event, _Other}, State = #state{channel = Channel}) ->
% ClientId = emqx_exproto_channel:info(clientid, Channel),
% emqx_cm:set_chan_info(ClientId, info(State)),
% emqx_cm:set_chan_stats(ClientId, stats(State)),
% {ok, State};
handle_msg({timeout, TRef, TMsg}, State) ->
handle_timeout(TRef, TMsg, State);
handle_msg(Shutdown = {shutdown, _Reason}, State) ->
stop(Shutdown, State);
handle_msg(Msg, State) ->
handle_info(Msg, State).
%%--------------------------------------------------------------------
%% Terminate
-spec terminate(atom(), state()) -> no_return().
terminate(Reason, State = #state{channel = Channel}) ->
?LOG(debug, "Terminated due to ~p", [Reason]),
_ = emqx_exproto_channel:terminate(Reason, Channel),
_ = close_socket(State),
exit(Reason).
%%--------------------------------------------------------------------
%% Sys callbacks
system_continue(Parent, _Debug, State) ->
recvloop(Parent, State).
system_terminate(Reason, _Parent, _Debug, State) ->
terminate(Reason, State).
system_code_change(State, _Mod, _OldVsn, _Extra) ->
{ok, State}.
system_get_state(State) -> {ok, State}.
%%--------------------------------------------------------------------
%% Handle call
handle_call(_From, info, State) ->
{reply, info(State), State};
handle_call(_From, stats, State) ->
{reply, stats(State), State};
handle_call(_From, Req, State = #state{channel = Channel}) ->
case emqx_exproto_channel:handle_call(Req, Channel) of
{reply, Reply, NChannel} ->
{reply, Reply, State#state{channel = NChannel}};
{reply, Reply, Replies, NChannel} ->
{reply, Reply, Replies, State#state{channel = NChannel}};
{shutdown, Reason, Reply, NChannel} ->
shutdown(Reason, Reply, State#state{channel = NChannel})
end.
%%--------------------------------------------------------------------
%% Handle timeout
handle_timeout(_TRef, idle_timeout, State) ->
shutdown(idle_timeout, State);
handle_timeout(_TRef, limit_timeout, State) ->
NState = State#state{sockstate = idle,
limit_timer = undefined
},
handle_info(activate_socket, NState);
handle_timeout(TRef, keepalive, State = #state{socket = Socket,
channel = Channel})->
case emqx_exproto_channel:info(conn_state, Channel) of
disconnected -> {ok, State};
_ ->
case esockd_getstat(Socket, [recv_oct]) of
{ok, [{recv_oct, RecvOct}]} ->
handle_timeout(TRef, {keepalive, RecvOct}, State);
{error, Reason} ->
handle_info({sock_error, Reason}, State)
end
end;
handle_timeout(_TRef, emit_stats, State =
#state{channel = Channel}) ->
case emqx_exproto_channel:info(clientid, Channel) of
undefined ->
ignore;
ClientId ->
emqx_cm:set_chan_stats(ClientId, stats(State))
end,
{ok, State#state{stats_timer = undefined}};
handle_timeout(TRef, Msg, State) ->
with_channel(handle_timeout, [TRef, Msg], State).
%%--------------------------------------------------------------------
%% Parse incoming data
-compile({inline, [process_incoming/2]}).
process_incoming(Data, State = #state{idle_timer = IdleTimer}) ->
?LOG(debug, "RECV ~0p", [Data]),
Oct = iolist_size(Data),
inc_counter(incoming_bytes, Oct),
inc_counter(incoming_pkt, 1),
inc_counter(recv_pkt, 1),
inc_counter(recv_msg, 1),
% TODO:
%ok = emqx_metrics:inc('bytes.received', Oct),
ok = emqx_misc:cancel_timer(IdleTimer),
NState = State#state{idle_timer = undefined},
with_channel(handle_in, [Data], NState).
%%--------------------------------------------------------------------
%% With Channel
with_channel(Fun, Args, State = #state{channel = Channel}) ->
case erlang:apply(emqx_exproto_channel, Fun, Args ++ [Channel]) of
ok -> {ok, State};
{ok, NChannel} ->
{ok, State#state{channel = NChannel}};
{ok, Replies, NChannel} ->
{ok, next_msgs(Replies), State#state{channel = NChannel}};
{shutdown, Reason, NChannel} ->
shutdown(Reason, State#state{channel = NChannel})
end.
%%--------------------------------------------------------------------
%% Handle outgoing packets
handle_outgoing(IoData, State = #state{socket = Socket}) ->
?LOG(debug, "SEND ~0p", [IoData]),
Oct = iolist_size(IoData),
inc_counter(send_pkt, 1),
inc_counter(send_msg, 1),
inc_counter(outgoing_pkt, 1),
inc_counter(outgoing_bytes, Oct),
%% FIXME:
%%ok = emqx_metrics:inc('bytes.sent', Oct),
case send(IoData, State) of
ok -> ok;
Error = {error, _Reason} ->
%% Send an inet_reply to postpone handling the error
self() ! {inet_reply, Socket, Error},
ok
end.
%%--------------------------------------------------------------------
%% Handle Info
handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
case activate_socket(State) of
{ok, NState = #state{sockstate = NewSst}} ->
if OldSst =/= NewSst ->
{ok, {event, NewSst}, NState};
true -> {ok, NState}
end;
{error, Reason} ->
handle_info({sock_error, Reason}, State)
end;
handle_info({sock_error, Reason}, State) ->
?LOG(debug, "Socket error: ~p", [Reason]),
handle_info({sock_closed, Reason}, close_socket(State));
handle_info(Info, State) ->
with_channel(handle_info, [Info], State).
%%--------------------------------------------------------------------
%% Ensure rate limit
ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
case ?ENABLED(Limiter) andalso emqx_limiter:check(Stats, Limiter) of
false -> State;
{ok, Limiter1} ->
State#state{limiter = Limiter1};
{pause, Time, Limiter1} ->
?LOG(warning, "Pause ~pms due to rate limit", [Time]),
TRef = start_timer(Time, limit_timeout),
State#state{sockstate = blocked,
limiter = Limiter1,
limit_timer = TRef
}
end.
%%--------------------------------------------------------------------
%% Run GC and Check OOM
run_gc(Stats, State = #state{gc_state = GcSt}) ->
case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
false -> State;
{_IsGC, GcSt1} ->
State#state{gc_state = GcSt1}
end.
check_oom(State) ->
OomPolicy = ?DEFAULT_OOM_POLICY,
case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of
Shutdown = {shutdown, _Reason} ->
erlang:send(self(), Shutdown);
_Other -> ok
end,
State.
%%--------------------------------------------------------------------
%% Activate Socket
-compile({inline, [activate_socket/1]}).
activate_socket(State = #state{sockstate = closed}) ->
{ok, State};
activate_socket(State = #state{sockstate = blocked}) ->
{ok, State};
activate_socket(State = #state{socket = Socket,
active_n = N}) ->
%% FIXME: Works on dtls/udp ???
%% How to hanlde buffer?
case esockd_setopts(Socket, [{active, N}]) of
ok -> {ok, State#state{sockstate = running}};
Error -> Error
end.
%%--------------------------------------------------------------------
%% Close Socket
close_socket(State = #state{sockstate = closed}) -> State;
close_socket(State = #state{socket = Socket}) ->
ok = esockd_close(Socket),
State#state{sockstate = closed}.
%%--------------------------------------------------------------------
%% Helper functions
-compile({inline, [next_msgs/1]}).
next_msgs(Event) when is_tuple(Event) ->
Event;
next_msgs(More) when is_list(More) ->
More.
-compile({inline, [shutdown/2, shutdown/3]}).
shutdown(Reason, State) ->
stop({shutdown, Reason}, State).
shutdown(Reason, Reply, State) ->
stop({shutdown, Reason}, Reply, State).
-compile({inline, [stop/2, stop/3]}).
stop(Reason, State) ->
{stop, Reason, State}.
stop(Reason, Reply, State) ->
{stop, Reason, Reply, State}.
inc_counter(Name, Value) ->
_ = emqx_pd:inc_counter(Name, Value),
ok.

View File

@ -68,7 +68,7 @@ start_grpc_server(InstaId, Options = #{bind := ListenOn}) ->
SslOpts ->
[{ssl_options, SslOpts}]
end,
grpc:start_server(InstaId, ListenOn, Services, SvrOptions),
_ = grpc:start_server(InstaId, ListenOn, Services, SvrOptions),
io:format("Start ~s gRPC server on ~p successfully.~n",
[InstaId, ListenOn]).
@ -82,7 +82,7 @@ start_grpc_client_channel(InstaId, Options = #{address := UriStr}) ->
"~s://~s:~w", [Scheme, Host, Port])
),
ClientOpts = case Scheme of
https ->
"https" ->
SslOpts = maps:to_list(maps:get(ssl, Options, #{})),
#{gun_opts =>
#{transport => ssl,