1023 lines
30 KiB
Erlang
1023 lines
30 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc The behavior abstract for TCP based gateway conn
|
|
-module(emqx_gateway_conn).
|
|
|
|
-include_lib("emqx/include/types.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
%% API
|
|
-export([
|
|
start_link/3,
|
|
stop/1
|
|
]).
|
|
|
|
-export([
|
|
info/1,
|
|
stats/1
|
|
]).
|
|
|
|
-export([
|
|
call/2,
|
|
call/3,
|
|
cast/2
|
|
]).
|
|
|
|
%% Callback
|
|
-export([init/6]).
|
|
|
|
%% Sys callbacks
|
|
-export([
|
|
system_continue/3,
|
|
system_terminate/4,
|
|
system_code_change/4,
|
|
system_get_state/1
|
|
]).
|
|
|
|
%% Internal callback
|
|
-export([wakeup_from_hib/2, recvloop/2]).
|
|
|
|
%% for channel module
|
|
-export([keepalive_stats/1]).
|
|
|
|
-record(state, {
|
|
%% TCP/SSL/UDP/DTLS Wrapped Socket
|
|
socket :: {esockd_transport, esockd:socket()} | {udp, _, _},
|
|
%% Peername of the connection
|
|
peername :: emqx_types:peername(),
|
|
%% Sockname of the connection
|
|
sockname :: emqx_types:peername(),
|
|
%% Sock State
|
|
sockstate :: emqx_types:sockstate(),
|
|
%% The {active, N} option
|
|
active_n :: pos_integer(),
|
|
%% Limiter
|
|
limiter :: maybe(emqx_htb_limiter:limiter()),
|
|
%% Limit Timer
|
|
limit_timer :: maybe(reference()),
|
|
%% Parse State
|
|
parse_state :: emqx_gateway_frame:parse_state(),
|
|
%% Serialize options
|
|
serialize :: emqx_gateway_frame:serialize_opts(),
|
|
%% Channel State
|
|
channel :: emqx_gateway_channel:channel(),
|
|
%% GC State
|
|
gc_state :: maybe(emqx_gc:gc_state()),
|
|
%% Stats Timer
|
|
stats_timer :: disabled | maybe(reference()),
|
|
%% Idle Timeout
|
|
idle_timeout :: integer(),
|
|
%% Idle Timer
|
|
idle_timer :: maybe(reference()),
|
|
%% OOM Policy
|
|
oom_policy :: maybe(emqx_types:oom_policy()),
|
|
%% Frame Module
|
|
frame_mod :: atom(),
|
|
%% Channel Module
|
|
chann_mod :: atom(),
|
|
%% Listener Tag
|
|
listener :: listener() | undefined
|
|
}).
|
|
|
|
-type listener() :: {GwName :: atom(), LisType :: atom(), LisName :: atom()}.
|
|
-type state() :: #state{}.
|
|
|
|
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]).
|
|
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
|
|
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
|
|
|
|
-define(ENABLED(X), (X =/= undefined)).
|
|
|
|
-dialyzer(
|
|
{nowarn_function, [
|
|
system_terminate/4,
|
|
handle_call/3,
|
|
handle_msg/2,
|
|
shutdown/3,
|
|
stop/3,
|
|
parse_incoming/3
|
|
]}
|
|
).
|
|
|
|
%% udp
|
|
start_link(Socket = {udp, _SockPid, _Sock}, Peername, Options) ->
|
|
Args = [self(), Socket, Peername, Options] ++ callback_modules(Options),
|
|
{ok, proc_lib:spawn_link(?MODULE, init, Args)};
|
|
%% tcp/ssl/dtls
|
|
start_link(esockd_transport, Sock, Options) ->
|
|
Socket = {esockd_transport, Sock},
|
|
Args = [self(), Socket, undefined, Options] ++ callback_modules(Options),
|
|
{ok, proc_lib:spawn_link(?MODULE, init, Args)}.
|
|
|
|
callback_modules(Options) ->
|
|
[
|
|
maps:get(frame_mod, Options),
|
|
maps:get(chann_mod, Options)
|
|
].
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% API
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Get infos of the connection/channel.
|
|
-spec info(pid() | state()) -> emqx_types:infos().
|
|
info(CPid) when is_pid(CPid) ->
|
|
call(CPid, info);
|
|
info(State = #state{chann_mod = ChannMod, channel = Channel}) ->
|
|
ChanInfo = ChannMod:info(Channel),
|
|
SockInfo = maps:from_list(
|
|
info(?INFO_KEYS, State)
|
|
),
|
|
ChanInfo#{sockinfo => SockInfo}.
|
|
|
|
info(Keys, State) when is_list(Keys) ->
|
|
[{Key, info(Key, State)} || Key <- Keys];
|
|
info(socktype, #state{socket = Socket}) ->
|
|
esockd_type(Socket);
|
|
info(peername, #state{peername = Peername}) ->
|
|
Peername;
|
|
info(sockname, #state{sockname = Sockname}) ->
|
|
Sockname;
|
|
info(sockstate, #state{sockstate = SockSt}) ->
|
|
SockSt;
|
|
info(active_n, #state{active_n = ActiveN}) ->
|
|
ActiveN.
|
|
|
|
-spec stats(pid() | state()) -> emqx_types:stats().
|
|
stats(CPid) when is_pid(CPid) ->
|
|
call(CPid, stats);
|
|
stats(#state{
|
|
socket = Socket,
|
|
chann_mod = ChannMod,
|
|
channel = Channel
|
|
}) ->
|
|
SockStats =
|
|
case esockd_getstat(Socket, ?SOCK_STATS) of
|
|
{ok, Ss} -> Ss;
|
|
{error, _} -> []
|
|
end,
|
|
ConnStats = emqx_pd:get_counters(?CONN_STATS),
|
|
ChanStats = ChannMod:stats(Channel),
|
|
ProcStats = emqx_utils:proc_stats(),
|
|
lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
|
|
|
|
call(Pid, Req) ->
|
|
call(Pid, Req, infinity).
|
|
|
|
call(Pid, Req, Timeout) ->
|
|
gen_server:call(Pid, Req, Timeout).
|
|
|
|
cast(Pid, Req) ->
|
|
gen_server:cast(Pid, Req).
|
|
|
|
stop(Pid) ->
|
|
gen_server:stop(Pid).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Wrapped funcs
|
|
%%--------------------------------------------------------------------
|
|
|
|
esockd_peername({udp, _SockPid, _Sock}, Peername) ->
|
|
Peername;
|
|
esockd_peername({esockd_transport, Sock}, _Peername) ->
|
|
{ok, Peername} = esockd_transport:ensure_ok_or_exit(peername, [Sock]),
|
|
Peername.
|
|
|
|
esockd_wait(Socket = {udp, _SockPid, _Sock}) ->
|
|
{ok, Socket};
|
|
esockd_wait({esockd_transport, Sock}) ->
|
|
case esockd_transport:wait(Sock) of
|
|
{ok, NSock} -> {ok, {esockd_transport, NSock}};
|
|
R = {error, _} -> R
|
|
end.
|
|
|
|
esockd_close({udp, _SockPid, _Sock}) ->
|
|
%% nothing to do for udp socket
|
|
%%gen_udp:close(Sock);
|
|
ok;
|
|
esockd_close({esockd_transport, Sock}) ->
|
|
esockd_transport:fast_close(Sock).
|
|
|
|
esockd_ensure_ok_or_exit(peercert, {udp, _SockPid, _Sock}) ->
|
|
nossl;
|
|
esockd_ensure_ok_or_exit(Fun, {udp, _SockPid, Sock}) ->
|
|
esockd_transport:ensure_ok_or_exit(Fun, [Sock]);
|
|
esockd_ensure_ok_or_exit(Fun, {esockd_transport, Socket}) ->
|
|
esockd_transport:ensure_ok_or_exit(Fun, [Socket]).
|
|
|
|
esockd_type({udp, _, _}) ->
|
|
udp;
|
|
esockd_type({esockd_transport, Socket}) ->
|
|
esockd_transport:type(Socket).
|
|
|
|
esockd_setopts({udp, _, _}, _) ->
|
|
ok;
|
|
esockd_setopts({esockd_transport, Socket}, Opts) ->
|
|
%% FIXME: DTLS works??
|
|
esockd_transport:setopts(Socket, Opts).
|
|
|
|
esockd_getstat({udp, _SockPid, Sock}, Stats) ->
|
|
inet:getstat(Sock, Stats);
|
|
esockd_getstat({esockd_transport, Sock}, Stats) ->
|
|
esockd_transport:getstat(Sock, Stats).
|
|
|
|
esockd_send(Data, #state{
|
|
socket = {udp, _SockPid, Sock},
|
|
peername = {Ip, Port}
|
|
}) ->
|
|
gen_udp:send(Sock, Ip, Port, Data);
|
|
esockd_send(Data, #state{socket = {esockd_transport, Sock}}) ->
|
|
esockd_transport:async_send(Sock, Data).
|
|
|
|
keepalive_stats(recv) ->
|
|
emqx_pd:get_counter(recv_pkt);
|
|
keepalive_stats(send) ->
|
|
emqx_pd:get_counter(send_pkt).
|
|
|
|
is_datadram_socket({esockd_transport, _}) -> false;
|
|
is_datadram_socket({udp, _, _}) -> true.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% callbacks
|
|
%%--------------------------------------------------------------------
|
|
|
|
init(Parent, WrappedSock, Peername0, Options, FrameMod, ChannMod) ->
|
|
case esockd_wait(WrappedSock) of
|
|
{ok, NWrappedSock} ->
|
|
Peername = esockd_peername(NWrappedSock, Peername0),
|
|
run_loop(
|
|
Parent,
|
|
init_state(
|
|
NWrappedSock,
|
|
Peername,
|
|
Options,
|
|
FrameMod,
|
|
ChannMod
|
|
)
|
|
);
|
|
{error, Reason} ->
|
|
ok = esockd_close(WrappedSock),
|
|
exit_on_sock_error(Reason)
|
|
end.
|
|
|
|
init_state(WrappedSock, Peername, Options, FrameMod, ChannMod) ->
|
|
{ok, Sockname} = esockd_ensure_ok_or_exit(sockname, WrappedSock),
|
|
Peercert = esockd_ensure_ok_or_exit(peercert, WrappedSock),
|
|
ConnInfo = #{
|
|
socktype => esockd_type(WrappedSock),
|
|
peername => Peername,
|
|
sockname => Sockname,
|
|
peercert => Peercert,
|
|
conn_mod => ?MODULE
|
|
},
|
|
ActiveN = emqx_gateway_utils:active_n(Options),
|
|
%% FIXME: TODO
|
|
%%Limiter = emqx_limiter:init(Options),
|
|
Limiter = undefined,
|
|
FrameOpts = emqx_gateway_utils:frame_options(Options),
|
|
ParseState = FrameMod:initial_parse_state(FrameOpts),
|
|
Serialize = FrameMod:serialize_opts(),
|
|
Channel = ChannMod:init(ConnInfo, Options),
|
|
GcState = emqx_gateway_utils:init_gc_state(Options),
|
|
StatsTimer = emqx_gateway_utils:stats_timer(Options),
|
|
IdleTimeout = emqx_gateway_utils:idle_timeout(Options),
|
|
OomPolicy = emqx_gateway_utils:oom_policy(Options),
|
|
IdleTimer = emqx_utils:start_timer(IdleTimeout, idle_timeout),
|
|
#state{
|
|
socket = WrappedSock,
|
|
peername = Peername,
|
|
sockname = Sockname,
|
|
sockstate = idle,
|
|
active_n = ActiveN,
|
|
limiter = Limiter,
|
|
parse_state = ParseState,
|
|
serialize = Serialize,
|
|
channel = Channel,
|
|
gc_state = GcState,
|
|
stats_timer = StatsTimer,
|
|
idle_timeout = IdleTimeout,
|
|
idle_timer = IdleTimer,
|
|
oom_policy = OomPolicy,
|
|
frame_mod = FrameMod,
|
|
chann_mod = ChannMod,
|
|
listener = maps:get(listener, Options, undefined)
|
|
}.
|
|
|
|
run_loop(
|
|
Parent,
|
|
State = #state{
|
|
socket = Socket,
|
|
peername = Peername,
|
|
oom_policy = OomPolicy
|
|
}
|
|
) ->
|
|
emqx_logger:set_metadata_peername(esockd:format(Peername)),
|
|
_ = emqx_utils:tune_heap_size(OomPolicy),
|
|
case activate_socket(State) of
|
|
{ok, NState} ->
|
|
hibernate(Parent, NState);
|
|
{error, Reason} ->
|
|
ok = esockd_close(Socket),
|
|
exit_on_sock_error(Reason)
|
|
end.
|
|
|
|
-spec exit_on_sock_error(atom()) -> no_return().
|
|
exit_on_sock_error(Reason) when
|
|
Reason =:= einval;
|
|
Reason =:= enotconn;
|
|
Reason =:= closed
|
|
->
|
|
erlang:exit(normal);
|
|
exit_on_sock_error(timeout) ->
|
|
erlang:exit({shutdown, ssl_upgrade_timeout});
|
|
exit_on_sock_error(Reason) ->
|
|
erlang:exit({shutdown, Reason}).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Recv Loop
|
|
|
|
recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) ->
|
|
receive
|
|
Msg ->
|
|
handle_recv(Msg, Parent, State)
|
|
after IdleTimeout + 100 ->
|
|
hibernate(Parent, cancel_stats_timer(State))
|
|
end.
|
|
|
|
handle_recv({system, From, Request}, Parent, State) ->
|
|
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
|
|
handle_recv({'EXIT', Parent, Reason}, Parent, State) ->
|
|
%% FIXME: it's not trapping exit, should never receive an EXIT
|
|
terminate(Reason, State);
|
|
handle_recv(Msg, Parent, State = #state{idle_timeout = IdleTimeout}) ->
|
|
case process_msg([Msg], ensure_stats_timer(IdleTimeout, State)) of
|
|
{ok, NewState} ->
|
|
?MODULE:recvloop(Parent, NewState);
|
|
{stop, Reason, NewSate} ->
|
|
terminate(Reason, NewSate)
|
|
end.
|
|
|
|
hibernate(Parent, State) ->
|
|
proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State]).
|
|
|
|
%% Maybe do something here later.
|
|
wakeup_from_hib(Parent, State) ->
|
|
?MODULE:recvloop(Parent, State).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Ensure/cancel stats timer
|
|
|
|
ensure_stats_timer(Timeout, State = #state{stats_timer = undefined}) ->
|
|
State#state{stats_timer = emqx_utils:start_timer(Timeout, emit_stats)};
|
|
ensure_stats_timer(_Timeout, State) ->
|
|
State.
|
|
|
|
cancel_stats_timer(State = #state{stats_timer = TRef}) when
|
|
is_reference(TRef)
|
|
->
|
|
ok = emqx_utils:cancel_timer(TRef),
|
|
State#state{stats_timer = undefined};
|
|
cancel_stats_timer(State) ->
|
|
State.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Process next Msg
|
|
|
|
process_msg([], State) ->
|
|
{ok, State};
|
|
process_msg([Msg | More], State) ->
|
|
try
|
|
case handle_msg(Msg, State) of
|
|
ok ->
|
|
process_msg(More, State);
|
|
{ok, NState} ->
|
|
process_msg(More, NState);
|
|
{ok, Msgs, NState} ->
|
|
process_msg(append_msg(More, Msgs), NState);
|
|
{stop, Reason, NState} ->
|
|
{stop, Reason, NState}
|
|
end
|
|
catch
|
|
exit:normal ->
|
|
{stop, normal, State};
|
|
exit:shutdown ->
|
|
{stop, shutdown, State};
|
|
exit:{shutdown, _} = Shutdown ->
|
|
{stop, Shutdown, State};
|
|
Exception:Context:Stack ->
|
|
{stop,
|
|
#{
|
|
exception => Exception,
|
|
context => Context,
|
|
stacktrace => Stack
|
|
},
|
|
State}
|
|
end.
|
|
|
|
append_msg([], Msgs) when is_list(Msgs) ->
|
|
Msgs;
|
|
append_msg([], Msg) ->
|
|
[Msg];
|
|
append_msg(Q, Msgs) when is_list(Msgs) ->
|
|
lists:append(Q, Msgs);
|
|
append_msg(Q, Msg) ->
|
|
lists:append(Q, [Msg]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Handle a Msg
|
|
|
|
handle_msg({'$gen_call', From, Req}, State) ->
|
|
case handle_call(From, Req, State) of
|
|
{noreply, NState} ->
|
|
{ok, NState};
|
|
{noreply, Msgs, NState} ->
|
|
{ok, next_msgs(Msgs), NState};
|
|
{reply, Reply, NState} ->
|
|
gen_server:reply(From, Reply),
|
|
{ok, NState};
|
|
{reply, Reply, Msgs, NState} ->
|
|
gen_server:reply(From, Reply),
|
|
{ok, next_msgs(Msgs), NState};
|
|
{stop, Reason, Reply, NState} ->
|
|
gen_server:reply(From, Reply),
|
|
stop(Reason, NState)
|
|
end;
|
|
handle_msg({'$gen_cast', Req}, State) ->
|
|
with_channel(handle_cast, [Req], State);
|
|
handle_msg({datagram, _SockPid, Data}, State) ->
|
|
parse_incoming(Data, State);
|
|
handle_msg({Inet, _Sock, Data}, State) when
|
|
Inet == tcp;
|
|
Inet == ssl
|
|
->
|
|
parse_incoming(Data, State);
|
|
handle_msg(
|
|
{incoming, Packet},
|
|
State = #state{idle_timer = IdleTimer}
|
|
) ->
|
|
IdleTimer /= undefined andalso
|
|
emqx_utils:cancel_timer(IdleTimer),
|
|
NState = State#state{idle_timer = undefined},
|
|
handle_incoming(Packet, NState);
|
|
handle_msg({outgoing, Data}, State) ->
|
|
handle_outgoing(Data, State);
|
|
handle_msg({Error, _Sock, Reason}, State) when
|
|
Error == tcp_error; Error == ssl_error
|
|
->
|
|
handle_info({sock_error, Reason}, State);
|
|
handle_msg({Closed, _Sock}, State) when
|
|
Closed == tcp_closed; Closed == ssl_closed
|
|
->
|
|
handle_info({sock_closed, Closed}, close_socket(State));
|
|
%% TODO: udp_passive???
|
|
handle_msg({Passive, _Sock}, State) when
|
|
Passive == tcp_passive; Passive == ssl_passive
|
|
->
|
|
%% In Stats
|
|
Bytes = emqx_pd:reset_counter(incoming_bytes),
|
|
Pubs = emqx_pd:reset_counter(incoming_pkt),
|
|
InStats = #{cnt => Pubs, oct => Bytes},
|
|
%% Ensure Rate Limit
|
|
NState = ensure_rate_limit(InStats, State),
|
|
%% Run GC and Check OOM
|
|
NState1 = check_oom(run_gc(InStats, NState)),
|
|
handle_info(activate_socket, NState1);
|
|
handle_msg(
|
|
Deliver = {deliver, _Topic, _Msg},
|
|
State = #state{active_n = ActiveN}
|
|
) ->
|
|
Delivers = [Deliver | emqx_utils:drain_deliver(ActiveN)],
|
|
with_channel(handle_deliver, [Delivers], State);
|
|
%% Something sent
|
|
%% TODO: Who will deliver this message?
|
|
handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) ->
|
|
case emqx_pd:get_counter(outgoing_pkt) > ActiveN of
|
|
true ->
|
|
Pubs = emqx_pd:reset_counter(outgoing_pkt),
|
|
Bytes = emqx_pd:reset_counter(outgoing_bytes),
|
|
OutStats = #{cnt => Pubs, oct => Bytes},
|
|
{ok, check_oom(run_gc(OutStats, State))};
|
|
false ->
|
|
ok
|
|
end;
|
|
handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
|
|
handle_info({sock_error, Reason}, State);
|
|
handle_msg({close, Reason}, State) ->
|
|
?SLOG(debug, #{msg => "force_socket_close", reason => Reason}),
|
|
handle_info({sock_closed, Reason}, close_socket(State));
|
|
handle_msg(
|
|
{event, connected},
|
|
State = #state{
|
|
chann_mod = ChannMod,
|
|
channel = Channel
|
|
}
|
|
) ->
|
|
Ctx = ChannMod:info(ctx, Channel),
|
|
ClientId = ChannMod:info(clientid, Channel),
|
|
emqx_gateway_ctx:insert_channel_info(
|
|
Ctx,
|
|
ClientId,
|
|
info(State),
|
|
stats(State)
|
|
);
|
|
handle_msg(
|
|
{event, disconnected},
|
|
State = #state{
|
|
chann_mod = ChannMod,
|
|
channel = Channel
|
|
}
|
|
) ->
|
|
Ctx = ChannMod:info(ctx, Channel),
|
|
ClientId = ChannMod:info(clientid, Channel),
|
|
emqx_gateway_ctx:set_chan_info(Ctx, ClientId, info(State)),
|
|
emqx_gateway_ctx:connection_closed(Ctx, ClientId),
|
|
{ok, State};
|
|
handle_msg(
|
|
{event, _Other},
|
|
State = #state{
|
|
chann_mod = ChannMod,
|
|
channel = Channel
|
|
}
|
|
) ->
|
|
Ctx = ChannMod:info(ctx, Channel),
|
|
ClientId = ChannMod:info(clientid, Channel),
|
|
emqx_gateway_ctx:set_chan_info(Ctx, ClientId, info(State)),
|
|
emqx_gateway_ctx:set_chan_stats(Ctx, ClientId, stats(State)),
|
|
{ok, State};
|
|
handle_msg({timeout, TRef, TMsg}, State) ->
|
|
handle_timeout(TRef, TMsg, State);
|
|
handle_msg(Shutdown = {shutdown, _Reason}, State) ->
|
|
stop(Shutdown, State);
|
|
handle_msg(Msg, State) ->
|
|
handle_info(Msg, State).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Terminate
|
|
|
|
-spec terminate(atom(), state()) -> no_return().
|
|
terminate(
|
|
Reason,
|
|
State = #state{
|
|
chann_mod = ChannMod,
|
|
channel = Channel
|
|
}
|
|
) ->
|
|
_ = ChannMod:terminate(Reason, Channel),
|
|
_ = close_socket(State),
|
|
ClientId =
|
|
try ChannMod:info(clientid, Channel) of
|
|
Id -> Id
|
|
catch
|
|
_:_ -> undefined
|
|
end,
|
|
?tp(debug, conn_process_terminated, #{reason => Reason, clientid => ClientId}),
|
|
exit(Reason).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Sys callbacks
|
|
|
|
system_continue(Parent, _Debug, State) ->
|
|
?MODULE:recvloop(Parent, State).
|
|
|
|
system_terminate(Reason, _Parent, _Debug, State) ->
|
|
terminate(Reason, State).
|
|
|
|
system_code_change(State, _Mod, _OldVsn, _Extra) ->
|
|
{ok, State}.
|
|
|
|
system_get_state(State) -> {ok, State}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Handle call
|
|
|
|
handle_call(_From, info, State) ->
|
|
{reply, info(State), State};
|
|
handle_call(_From, stats, State) ->
|
|
{reply, stats(State), State};
|
|
handle_call(
|
|
From,
|
|
Req,
|
|
State = #state{
|
|
chann_mod = ChannMod,
|
|
channel = Channel
|
|
}
|
|
) ->
|
|
case ChannMod:handle_call(Req, From, Channel) of
|
|
{noreply, NChannel} ->
|
|
{noreply, State#state{channel = NChannel}};
|
|
{noreply, Msgs, NChannel} ->
|
|
{noreply, Msgs, State#state{channel = NChannel}};
|
|
{reply, Reply, NChannel} ->
|
|
{reply, Reply, State#state{channel = NChannel}};
|
|
{reply, Reply, Msgs, NChannel} ->
|
|
{reply, Reply, Msgs, State#state{channel = NChannel}};
|
|
{shutdown, Reason, Reply, NChannel} ->
|
|
shutdown(Reason, Reply, State#state{channel = NChannel});
|
|
{shutdown, Reason, Reply, Packet, NChannel} ->
|
|
NState = State#state{channel = NChannel},
|
|
ok = handle_outgoing(Packet, NState),
|
|
shutdown(Reason, Reply, NState)
|
|
end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Handle timeout
|
|
|
|
handle_timeout(_TRef, idle_timeout, State) ->
|
|
shutdown(idle_timeout, State);
|
|
handle_timeout(_TRef, limit_timeout, State) ->
|
|
NState = State#state{
|
|
sockstate = idle,
|
|
limit_timer = undefined
|
|
},
|
|
handle_info(activate_socket, NState);
|
|
handle_timeout(
|
|
TRef,
|
|
Keepalive,
|
|
State = #state{
|
|
chann_mod = ChannMod,
|
|
channel = Channel
|
|
}
|
|
) when
|
|
Keepalive == keepalive;
|
|
Keepalive == keepalive_send
|
|
->
|
|
StatVal =
|
|
case Keepalive of
|
|
keepalive -> keepalive_stats(recv);
|
|
keepalive_send -> keepalive_stats(send)
|
|
end,
|
|
case ChannMod:info(conn_state, Channel) of
|
|
disconnected ->
|
|
{ok, State};
|
|
_ ->
|
|
handle_timeout(TRef, {Keepalive, StatVal}, State)
|
|
end;
|
|
handle_timeout(
|
|
_TRef,
|
|
emit_stats,
|
|
State =
|
|
#state{chann_mod = ChannMod, channel = Channel}
|
|
) ->
|
|
Ctx = ChannMod:info(ctx, Channel),
|
|
ClientId = ChannMod:info(clientid, Channel),
|
|
emqx_gateway_ctx:set_chan_stats(Ctx, ClientId, stats(State)),
|
|
{ok, State#state{stats_timer = undefined}};
|
|
handle_timeout(TRef, Msg, State) ->
|
|
with_channel(handle_timeout, [TRef, Msg], State).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Parse incoming data
|
|
|
|
parse_incoming(
|
|
Data,
|
|
State = #state{
|
|
chann_mod = ChannMod,
|
|
channel = Channel
|
|
}
|
|
) ->
|
|
?SLOG(debug, #{msg => "RECV_data", data => Data}),
|
|
Oct = iolist_size(Data),
|
|
inc_counter(incoming_bytes, Oct),
|
|
Ctx = ChannMod:info(ctx, Channel),
|
|
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.received', Oct),
|
|
{Packets, NState} = parse_incoming(Data, [], State),
|
|
{ok, next_incoming_msgs(Packets), NState}.
|
|
|
|
parse_incoming(<<>>, Packets, State) ->
|
|
{Packets, State};
|
|
parse_incoming(
|
|
Data,
|
|
Packets,
|
|
State = #state{
|
|
frame_mod = FrameMod,
|
|
parse_state = ParseState
|
|
}
|
|
) ->
|
|
try FrameMod:parse(Data, ParseState) of
|
|
{more, NParseState} ->
|
|
{Packets, State#state{parse_state = NParseState}};
|
|
{ok, Packet, Rest, NParseState} ->
|
|
NState = State#state{parse_state = NParseState},
|
|
parse_incoming(Rest, [Packet | Packets], NState)
|
|
catch
|
|
error:Reason:Stk ->
|
|
?SLOG(error, #{
|
|
msg => "parse_frame_failed",
|
|
at_state => ParseState,
|
|
input_bytes => Data,
|
|
reason => Reason,
|
|
stacktrace => Stk
|
|
}),
|
|
{[{frame_error, Reason} | Packets], State}
|
|
end.
|
|
|
|
next_incoming_msgs([Packet]) ->
|
|
{incoming, Packet};
|
|
next_incoming_msgs(Packets) ->
|
|
[{incoming, Packet} || Packet <- lists:reverse(Packets)].
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Handle incoming packet
|
|
|
|
handle_incoming(
|
|
Packet,
|
|
State = #state{
|
|
channel = Channel,
|
|
frame_mod = FrameMod,
|
|
chann_mod = ChannMod
|
|
}
|
|
) ->
|
|
Ctx = ChannMod:info(ctx, Channel),
|
|
ok = inc_incoming_stats(Ctx, FrameMod, Packet),
|
|
?SLOG(debug, #{
|
|
msg => "RECV_packet",
|
|
packet => FrameMod:format(Packet)
|
|
}),
|
|
with_channel(handle_in, [Packet], State).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% With Channel
|
|
|
|
with_channel(
|
|
Fun,
|
|
Args,
|
|
State = #state{
|
|
chann_mod = ChannMod,
|
|
channel = Channel
|
|
}
|
|
) ->
|
|
case erlang:apply(ChannMod, Fun, Args ++ [Channel]) of
|
|
ok ->
|
|
{ok, State};
|
|
{ok, NChannel} ->
|
|
{ok, State#state{channel = NChannel}};
|
|
{ok, Replies, NChannel} ->
|
|
{ok, next_msgs(Replies), State#state{channel = NChannel}};
|
|
{shutdown, Reason, NChannel} ->
|
|
shutdown(Reason, State#state{channel = NChannel});
|
|
{shutdown, Reason, Packet, NChannel} ->
|
|
NState = State#state{channel = NChannel},
|
|
ok = handle_outgoing(Packet, NState),
|
|
shutdown(Reason, NState)
|
|
end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Handle outgoing packets
|
|
|
|
handle_outgoing(_Packets = [], _State) ->
|
|
ok;
|
|
handle_outgoing(
|
|
Packets,
|
|
State = #state{socket = Socket}
|
|
) when is_list(Packets) ->
|
|
case is_datadram_socket(Socket) of
|
|
false ->
|
|
send(
|
|
lists:map(serialize_and_inc_stats_fun(State), Packets),
|
|
State
|
|
);
|
|
_ ->
|
|
lists:foreach(
|
|
fun(Packet) ->
|
|
handle_outgoing(Packet, State)
|
|
end,
|
|
Packets
|
|
)
|
|
end;
|
|
handle_outgoing(Packet, State) ->
|
|
send((serialize_and_inc_stats_fun(State))(Packet), State).
|
|
|
|
serialize_and_inc_stats_fun(#state{
|
|
frame_mod = FrameMod,
|
|
chann_mod = ChannMod,
|
|
serialize = Serialize,
|
|
channel = Channel
|
|
}) ->
|
|
Ctx = ChannMod:info(ctx, Channel),
|
|
fun(Packet) ->
|
|
try
|
|
Data = FrameMod:serialize_pkt(Packet, Serialize),
|
|
?SLOG(debug, #{
|
|
msg => "SEND_packet",
|
|
%% XXX: optimize it, less cpu comsuption?
|
|
packet => FrameMod:format(Packet)
|
|
}),
|
|
ok = inc_outgoing_stats(Ctx, FrameMod, Packet),
|
|
Data
|
|
catch
|
|
_:too_large ->
|
|
?SLOG(warning, #{
|
|
msg => "packet_too_large_discarded",
|
|
packet => FrameMod:format(Packet)
|
|
}),
|
|
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped.too_large'),
|
|
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'),
|
|
<<>>;
|
|
_:Reason ->
|
|
?SLOG(warning, #{
|
|
msg => "packet_serialize_failure",
|
|
reason => Reason,
|
|
packet => FrameMod:format(Packet)
|
|
}),
|
|
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'),
|
|
<<>>
|
|
end
|
|
end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Send data
|
|
|
|
-spec send(iodata(), state()) -> ok.
|
|
send(
|
|
IoData,
|
|
State = #state{
|
|
socket = Socket,
|
|
chann_mod = ChannMod,
|
|
channel = Channel
|
|
}
|
|
) ->
|
|
?SLOG(debug, #{msg => "SEND_data", data => IoData}),
|
|
Ctx = ChannMod:info(ctx, Channel),
|
|
Oct = iolist_size(IoData),
|
|
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.sent', Oct),
|
|
inc_counter(outgoing_bytes, Oct),
|
|
case esockd_send(IoData, State) of
|
|
ok ->
|
|
ok;
|
|
Error = {error, _Reason} ->
|
|
%% Send an inet_reply to postpone handling the error
|
|
self() ! {inet_reply, Socket, Error},
|
|
ok
|
|
end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Handle Info
|
|
|
|
handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
|
|
case activate_socket(State) of
|
|
{ok, NState = #state{sockstate = NewSst}} ->
|
|
if
|
|
OldSst =/= NewSst ->
|
|
{ok, {event, NewSst}, NState};
|
|
true ->
|
|
{ok, NState}
|
|
end;
|
|
{error, Reason} ->
|
|
handle_info({sock_error, Reason}, State)
|
|
end;
|
|
handle_info({sock_error, Reason}, State) ->
|
|
?SLOG(debug, #{
|
|
msg => "sock_error",
|
|
reason => Reason
|
|
}),
|
|
handle_info({sock_closed, Reason}, close_socket(State));
|
|
handle_info(Info, State) ->
|
|
with_channel(handle_info, [Info], State).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Ensure rate limit
|
|
|
|
%% ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
|
|
%% case ?ENABLED(Limiter) andalso emqx_limiter:check(Stats, Limiter) of
|
|
%% false ->
|
|
%% State;
|
|
%% {ok, Limiter1} ->
|
|
%% State#state{limiter = Limiter1};
|
|
%% {pause, Time, Limiter1} ->
|
|
%% %% XXX: which limiter reached?
|
|
%% ?SLOG(warning, #{
|
|
%% msg => "reach_rate_limit",
|
|
%% pause => Time
|
|
%% }),
|
|
%% TRef = emqx_utils:start_timer(Time, limit_timeout),
|
|
%% State#state{
|
|
%% sockstate = blocked,
|
|
%% limiter = Limiter1,
|
|
%% limit_timer = TRef
|
|
%% }
|
|
%% end.
|
|
|
|
%% TODO
|
|
%% Why do we need this?
|
|
%% Why not use the esockd connection limiter (based on emqx_htb_limiter) directly?
|
|
ensure_rate_limit(_Stats, State) ->
|
|
State.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Run GC and Check OOM
|
|
|
|
run_gc(Stats, State = #state{gc_state = GcSt}) ->
|
|
case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
|
|
false -> State;
|
|
{_IsGC, GcSt1} -> State#state{gc_state = GcSt1}
|
|
end.
|
|
|
|
check_oom(State = #state{oom_policy = OomPolicy}) ->
|
|
case ?ENABLED(OomPolicy) andalso emqx_utils:check_oom(OomPolicy) of
|
|
{shutdown, Reason} ->
|
|
%% triggers terminate/2 callback immediately
|
|
erlang:exit({shutdown, Reason});
|
|
_Other ->
|
|
ok
|
|
end,
|
|
State.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Activate Socket
|
|
|
|
activate_socket(State = #state{sockstate = closed}) ->
|
|
{ok, State};
|
|
activate_socket(State = #state{sockstate = blocked}) ->
|
|
{ok, State};
|
|
activate_socket(
|
|
State = #state{
|
|
socket = Socket,
|
|
active_n = N
|
|
}
|
|
) ->
|
|
%% FIXME: Works on dtls/udp ???
|
|
%% How to handle buffer?
|
|
case esockd_setopts(Socket, [{active, N}]) of
|
|
ok -> {ok, State#state{sockstate = running}};
|
|
Error -> Error
|
|
end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Close Socket
|
|
|
|
close_socket(State = #state{sockstate = closed}) ->
|
|
State;
|
|
close_socket(State = #state{socket = Socket}) ->
|
|
ok = esockd_close(Socket),
|
|
State#state{sockstate = closed}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Inc incoming/outgoing stats
|
|
|
|
inc_incoming_stats(Ctx, FrameMod, Packet) ->
|
|
inc_counter(recv_pkt, 1),
|
|
case FrameMod:is_message(Packet) of
|
|
true ->
|
|
inc_counter(recv_msg, 1),
|
|
inc_counter(incoming_pubs, 1);
|
|
false ->
|
|
ok
|
|
end,
|
|
Name = list_to_atom(
|
|
lists:concat(["packets.", FrameMod:type(Packet), ".received"])
|
|
),
|
|
emqx_gateway_ctx:metrics_inc(Ctx, Name).
|
|
|
|
inc_outgoing_stats(Ctx, FrameMod, Packet) ->
|
|
inc_counter(send_pkt, 1),
|
|
case FrameMod:is_message(Packet) of
|
|
true ->
|
|
inc_counter(send_msg, 1),
|
|
inc_counter(outgoing_pubs, 1);
|
|
false ->
|
|
ok
|
|
end,
|
|
Name = list_to_atom(
|
|
lists:concat(["packets.", FrameMod:type(Packet), ".sent"])
|
|
),
|
|
emqx_gateway_ctx:metrics_inc(Ctx, Name).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Helper functions
|
|
|
|
next_msgs(Event) when is_tuple(Event) ->
|
|
Event;
|
|
next_msgs(More) when is_list(More) ->
|
|
More.
|
|
|
|
shutdown(Reason, State) ->
|
|
stop({shutdown, Reason}, State).
|
|
|
|
shutdown(Reason, Reply, State) ->
|
|
stop({shutdown, Reason}, Reply, State).
|
|
|
|
stop(Reason, State) ->
|
|
{stop, Reason, State}.
|
|
|
|
stop(Reason, Reply, State) ->
|
|
{stop, Reason, Reply, State}.
|
|
|
|
inc_counter(Name, Value) ->
|
|
_ = emqx_pd:inc_counter(Name, Value),
|
|
ok.
|