refactor(gw): unify the connection process module
This commit is contained in:
parent
c6b3447598
commit
ef6a38bfd2
|
@ -0,0 +1,97 @@
|
||||||
|
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% @doc The Gateway channel behavior
|
||||||
|
%%
|
||||||
|
%% This module does not export any functions at the moment.
|
||||||
|
%% It is only used to standardize the implement of emqx_foo_channel.erl
|
||||||
|
%% module if it integrated with emqx_gateway_conn module
|
||||||
|
-module(emqx_gateway_channel).
|
||||||
|
|
||||||
|
-type channel() :: any().
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Info & Stats
|
||||||
|
|
||||||
|
%% @doc Get the channel detailed infomation.
|
||||||
|
-callback info(channel()) -> emqx_types:infos().
|
||||||
|
|
||||||
|
-callback info(Key :: atom() | [atom()], channel()) -> any().
|
||||||
|
|
||||||
|
%% @doc Get the channel statistic items
|
||||||
|
-callback stats(channel()) -> emqx_types:stats().
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Init
|
||||||
|
|
||||||
|
%% @doc Initialize the channel state
|
||||||
|
-callback init(emqx_types:conniinfo(), map()) -> channel().
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Handles
|
||||||
|
|
||||||
|
-type conn_state() :: idle | connecting | connected | disconnected | atom().
|
||||||
|
|
||||||
|
-type reply() :: {outgoing, emqx_gateway_frame:packet()}
|
||||||
|
| {outgoing, [emqx_gateway_frame:packet()]}
|
||||||
|
| {event, conn_state() | updated}
|
||||||
|
| {close, Reason :: atom()}.
|
||||||
|
|
||||||
|
-type replies() :: emqx_gateway_frame:packet() | reply() | [reply()].
|
||||||
|
|
||||||
|
%% @doc Handle the incoming frame
|
||||||
|
-callback handle_in(emqx_gateway_frame:frame() | {frame_error, any()},
|
||||||
|
channel())
|
||||||
|
-> {ok, channel()}
|
||||||
|
| {ok, replies(), channel()}
|
||||||
|
| {shutdown, Reason :: any(), channel()}
|
||||||
|
| {shutdown, Reason :: any(), replies(), channel()}.
|
||||||
|
|
||||||
|
%% @doc Handle the outgoing messages dispatched from PUB/SUB system
|
||||||
|
-callback handle_deliver(list(emqx_types:deliver()), channel())
|
||||||
|
-> {ok, channel()}
|
||||||
|
| {ok, replies(), channel()}.
|
||||||
|
|
||||||
|
%% @doc Handle the timeout event
|
||||||
|
-callback handle_timeout(reference(), Msg :: any(), channel())
|
||||||
|
-> {ok, channel()}
|
||||||
|
| {ok, replies(), channel()}
|
||||||
|
| {shutdown, Reason :: any(), channel()}.
|
||||||
|
|
||||||
|
%% @doc Handle the custom gen_server:call/2 for its connection process
|
||||||
|
-callback handle_call(Req :: any(), channel())
|
||||||
|
-> {reply, Reply :: any(), channel()}
|
||||||
|
| {shutdown, Reason :: any(), Reply :: any(), channel()}
|
||||||
|
| {shutdown, Reason :: any(), Reply :: any(),
|
||||||
|
emqx_gateway_frame:frame(), channel()}.
|
||||||
|
|
||||||
|
%% @doc Handle the custom gen_server:cast/2 for its connection process
|
||||||
|
-callback handle_cast(Req :: any(), channel())
|
||||||
|
-> ok
|
||||||
|
| {ok, channel()}
|
||||||
|
| {shutdown, Reason :: any(), channel()}.
|
||||||
|
|
||||||
|
%% @doc Handle the custom process messages for its connection process
|
||||||
|
-callback handle_info(Info :: any(), channel())
|
||||||
|
-> ok
|
||||||
|
| {ok, channel()}
|
||||||
|
| {shutdown, Reason :: any(), channel()}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Terminate
|
||||||
|
|
||||||
|
%% @doc The callback for process terminated
|
||||||
|
-callback terminate(any(), channel()) -> ok.
|
||||||
|
|
|
@ -15,8 +15,827 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% @doc The behavior abstrat for TCP based gateway conn
|
%% @doc The behavior abstrat for TCP based gateway conn
|
||||||
%%
|
|
||||||
-module(emqx_gateway_conn).
|
-module(emqx_gateway_conn).
|
||||||
|
|
||||||
%% TODO: Gateway v0.2
|
-include_lib("emqx/include/types.hrl").
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
|
-logger_header("[GW-Conn]").
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-export([ start_link/3
|
||||||
|
, stop/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([ info/1
|
||||||
|
, stats/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([ call/2
|
||||||
|
, call/3
|
||||||
|
, cast/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% Callback
|
||||||
|
-export([init/6]).
|
||||||
|
|
||||||
|
%% Sys callbacks
|
||||||
|
-export([ system_continue/3
|
||||||
|
, system_terminate/4
|
||||||
|
, system_code_change/4
|
||||||
|
, system_get_state/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% Internal callback
|
||||||
|
-export([wakeup_from_hib/2, recvloop/2]).
|
||||||
|
|
||||||
|
|
||||||
|
-record(state, {
|
||||||
|
%% TCP/SSL/UDP/DTLS Wrapped Socket
|
||||||
|
socket :: {esockd_transport, esockd:socket()} | {udp, _, _},
|
||||||
|
%% Peername of the connection
|
||||||
|
peername :: emqx_types:peername(),
|
||||||
|
%% Sockname of the connection
|
||||||
|
sockname :: emqx_types:peername(),
|
||||||
|
%% Sock State
|
||||||
|
sockstate :: emqx_types:sockstate(),
|
||||||
|
%% The {active, N} option
|
||||||
|
active_n :: pos_integer(),
|
||||||
|
%% Limiter
|
||||||
|
limiter :: maybe(emqx_limiter:limiter()),
|
||||||
|
%% Limit Timer
|
||||||
|
limit_timer :: maybe(reference()),
|
||||||
|
%% Parse State
|
||||||
|
parse_state :: emqx_gateway_frame:parse_state(),
|
||||||
|
%% Serialize options
|
||||||
|
serialize :: emqx_gateway_frame:serialize_opts(),
|
||||||
|
%% Channel State
|
||||||
|
channel :: emqx_gateway_channel:channel(),
|
||||||
|
%% GC State
|
||||||
|
gc_state :: maybe(emqx_gc:gc_state()),
|
||||||
|
%% Stats Timer
|
||||||
|
stats_timer :: disabled | maybe(reference()),
|
||||||
|
%% Idle Timeout
|
||||||
|
idle_timeout :: integer(),
|
||||||
|
%% Idle Timer
|
||||||
|
idle_timer :: maybe(reference()),
|
||||||
|
%% OOM Policy
|
||||||
|
oom_policy :: maybe(emqx_types:oom_policy()),
|
||||||
|
%% Frame Module
|
||||||
|
frame_mod :: atom(),
|
||||||
|
%% Channel Module
|
||||||
|
chann_mod :: atom()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-type(state() :: #state{}).
|
||||||
|
|
||||||
|
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]).
|
||||||
|
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
|
||||||
|
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
|
||||||
|
|
||||||
|
-define(ENABLED(X), (X =/= undefined)).
|
||||||
|
|
||||||
|
-dialyzer({nowarn_function,
|
||||||
|
[ system_terminate/4
|
||||||
|
, handle_call/3
|
||||||
|
, handle_msg/2
|
||||||
|
, shutdown/3
|
||||||
|
, stop/3
|
||||||
|
, parse_incoming/3
|
||||||
|
]}).
|
||||||
|
|
||||||
|
%% udp
|
||||||
|
start_link(Socket = {udp, _SockPid, _Sock}, Peername, Options) ->
|
||||||
|
Args = [self(), Socket, Peername, Options] ++ callback_modules(Options),
|
||||||
|
{ok, proc_lib:spawn_link(?MODULE, init, Args)};
|
||||||
|
|
||||||
|
%% tcp/ssl/dtls
|
||||||
|
start_link(esockd_transport, Sock, Options) ->
|
||||||
|
Socket = {esockd_transport, Sock},
|
||||||
|
Args = [self(), Socket, undefined, Options] ++ callback_modules(Options),
|
||||||
|
{ok, proc_lib:spawn_link(?MODULE, init, Args)}.
|
||||||
|
|
||||||
|
callback_modules(Options) ->
|
||||||
|
[maps:get(frame_mod, Options),
|
||||||
|
maps:get(chann_mod, Options)].
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% API
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% @doc Get infos of the connection/channel.
|
||||||
|
-spec(info(pid()|state()) -> emqx_types:infos()).
|
||||||
|
info(CPid) when is_pid(CPid) ->
|
||||||
|
call(CPid, info);
|
||||||
|
info(State = #state{chann_mod = ChannMod, channel = Channel}) ->
|
||||||
|
ChanInfo = ChannMod:info(Channel),
|
||||||
|
SockInfo = maps:from_list(
|
||||||
|
info(?INFO_KEYS, State)),
|
||||||
|
ChanInfo#{sockinfo => SockInfo}.
|
||||||
|
|
||||||
|
info(Keys, State) when is_list(Keys) ->
|
||||||
|
[{Key, info(Key, State)} || Key <- Keys];
|
||||||
|
info(socktype, #state{socket = Socket}) ->
|
||||||
|
esockd_type(Socket);
|
||||||
|
info(peername, #state{peername = Peername}) ->
|
||||||
|
Peername;
|
||||||
|
info(sockname, #state{sockname = Sockname}) ->
|
||||||
|
Sockname;
|
||||||
|
info(sockstate, #state{sockstate = SockSt}) ->
|
||||||
|
SockSt;
|
||||||
|
info(active_n, #state{active_n = ActiveN}) ->
|
||||||
|
ActiveN.
|
||||||
|
|
||||||
|
-spec(stats(pid()|state()) -> emqx_types:stats()).
|
||||||
|
stats(CPid) when is_pid(CPid) ->
|
||||||
|
call(CPid, stats);
|
||||||
|
stats(#state{socket = Socket,
|
||||||
|
chann_mod = ChannMod,
|
||||||
|
channel = Channel}) ->
|
||||||
|
SockStats = case esockd_getstat(Socket, ?SOCK_STATS) of
|
||||||
|
{ok, Ss} -> Ss;
|
||||||
|
{error, _} -> []
|
||||||
|
end,
|
||||||
|
ConnStats = emqx_pd:get_counters(?CONN_STATS),
|
||||||
|
ChanStats = ChannMod:stats(Channel),
|
||||||
|
ProcStats = emqx_misc:proc_stats(),
|
||||||
|
lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
|
||||||
|
|
||||||
|
call(Pid, Req) ->
|
||||||
|
call(Pid, Req, infinity).
|
||||||
|
|
||||||
|
call(Pid, Req, Timeout) ->
|
||||||
|
gen_server:call(Pid, Req, Timeout).
|
||||||
|
|
||||||
|
cast(Pid, Req) ->
|
||||||
|
gen_server:cast(Pid, Req).
|
||||||
|
|
||||||
|
stop(Pid) ->
|
||||||
|
gen_server:stop(Pid).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Wrapped funcs
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
esockd_peername({udp, _SockPid, _Sock}, Peername) ->
|
||||||
|
Peername;
|
||||||
|
esockd_peername({esockd_transport, Sock}, _Peername) ->
|
||||||
|
{ok, Peername} = esockd_transport:ensure_ok_or_exit(peername, [Sock]),
|
||||||
|
Peername.
|
||||||
|
|
||||||
|
esockd_wait(Socket = {udp, _SockPid, _Sock}) ->
|
||||||
|
{ok, Socket};
|
||||||
|
esockd_wait({esockd_transport, Sock}) ->
|
||||||
|
case esockd_transport:wait(Sock) of
|
||||||
|
{ok, NSock} -> {ok, {esockd_transport, NSock}};
|
||||||
|
R = {error, _} -> R
|
||||||
|
end.
|
||||||
|
|
||||||
|
esockd_close({udp, _SockPid, _Sock}) ->
|
||||||
|
%% nothing to do for udp socket
|
||||||
|
%%gen_udp:close(Sock);
|
||||||
|
ok;
|
||||||
|
esockd_close({esockd_transport, Sock}) ->
|
||||||
|
esockd_transport:fast_close(Sock).
|
||||||
|
|
||||||
|
esockd_ensure_ok_or_exit(peercert, {udp, _SockPid, _Sock}) ->
|
||||||
|
nossl;
|
||||||
|
esockd_ensure_ok_or_exit(Fun, {udp, _SockPid, Sock}) ->
|
||||||
|
esockd_transport:ensure_ok_or_exit(Fun, [Sock]);
|
||||||
|
esockd_ensure_ok_or_exit(Fun, {esockd_transport, Socket}) ->
|
||||||
|
esockd_transport:ensure_ok_or_exit(Fun, [Socket]).
|
||||||
|
|
||||||
|
esockd_type({udp, _, _}) ->
|
||||||
|
udp;
|
||||||
|
esockd_type({esockd_transport, Socket}) ->
|
||||||
|
esockd_transport:type(Socket).
|
||||||
|
|
||||||
|
esockd_setopts({udp, _, _}, _) ->
|
||||||
|
ok;
|
||||||
|
esockd_setopts({esockd_transport, Socket}, Opts) ->
|
||||||
|
%% FIXME: DTLS works??
|
||||||
|
esockd_transport:setopts(Socket, Opts).
|
||||||
|
|
||||||
|
esockd_getstat({udp, _SockPid, Sock}, Stats) ->
|
||||||
|
inet:getstat(Sock, Stats);
|
||||||
|
esockd_getstat({esockd_transport, Sock}, Stats) ->
|
||||||
|
esockd_transport:getstat(Sock, Stats).
|
||||||
|
|
||||||
|
esockd_send(Data, #state{socket = {udp, _SockPid, Sock},
|
||||||
|
peername = {Ip, Port}}) ->
|
||||||
|
gen_udp:send(Sock, Ip, Port, Data);
|
||||||
|
esockd_send(Data, #state{socket = {esockd_transport, Sock}}) ->
|
||||||
|
esockd_transport:async_send(Sock, Data).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% callbacks
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
init(Parent, WrappedSock, Peername0, Options, FrameMod, ChannMod) ->
|
||||||
|
case esockd_wait(WrappedSock) of
|
||||||
|
{ok, NWrappedSock} ->
|
||||||
|
Peername = esockd_peername(NWrappedSock, Peername0),
|
||||||
|
run_loop(Parent, init_state(NWrappedSock, Peername,
|
||||||
|
Options, FrameMod, ChannMod));
|
||||||
|
{error, Reason} ->
|
||||||
|
ok = esockd_close(WrappedSock),
|
||||||
|
exit_on_sock_error(Reason)
|
||||||
|
end.
|
||||||
|
|
||||||
|
init_state(WrappedSock, Peername, Options, FrameMod, ChannMod) ->
|
||||||
|
{ok, Sockname} = esockd_ensure_ok_or_exit(sockname, WrappedSock),
|
||||||
|
Peercert = esockd_ensure_ok_or_exit(peercert, WrappedSock),
|
||||||
|
ConnInfo = #{socktype => esockd_type(WrappedSock),
|
||||||
|
peername => Peername,
|
||||||
|
sockname => Sockname,
|
||||||
|
peercert => Peercert,
|
||||||
|
conn_mod => ?MODULE
|
||||||
|
},
|
||||||
|
ActiveN = emqx_gateway_utils:active_n(Options),
|
||||||
|
%% FIXME:
|
||||||
|
%%Limiter = emqx_limiter:init(Options),
|
||||||
|
Limiter = undefined,
|
||||||
|
FrameOpts = emqx_gateway_utils:frame_options(Options),
|
||||||
|
ParseState = FrameMod:initial_parse_state(FrameOpts),
|
||||||
|
Serialize = FrameMod:serialize_opts(),
|
||||||
|
Channel = ChannMod:init(ConnInfo, Options),
|
||||||
|
GcState = emqx_gateway_utils:init_gc_state(Options),
|
||||||
|
StatsTimer = emqx_gateway_utils:stats_timer(Options),
|
||||||
|
IdleTimeout = emqx_gateway_utils:idle_timeout(Options),
|
||||||
|
OomPolicy = emqx_gateway_utils:oom_policy(Options),
|
||||||
|
IdleTimer = emqx_misc:start_timer(IdleTimeout, idle_timeout),
|
||||||
|
#state{socket = WrappedSock,
|
||||||
|
peername = Peername,
|
||||||
|
sockname = Sockname,
|
||||||
|
sockstate = idle,
|
||||||
|
active_n = ActiveN,
|
||||||
|
limiter = Limiter,
|
||||||
|
parse_state = ParseState,
|
||||||
|
serialize = Serialize,
|
||||||
|
channel = Channel,
|
||||||
|
gc_state = GcState,
|
||||||
|
stats_timer = StatsTimer,
|
||||||
|
idle_timeout = IdleTimeout,
|
||||||
|
idle_timer = IdleTimer,
|
||||||
|
oom_policy = OomPolicy,
|
||||||
|
frame_mod = FrameMod,
|
||||||
|
chann_mod = ChannMod
|
||||||
|
}.
|
||||||
|
|
||||||
|
run_loop(Parent, State = #state{socket = Socket,
|
||||||
|
peername = Peername,
|
||||||
|
oom_policy = OomPolicy
|
||||||
|
}) ->
|
||||||
|
emqx_logger:set_metadata_peername(esockd:format(Peername)),
|
||||||
|
_ = emqx_misc:tune_heap_size(OomPolicy),
|
||||||
|
case activate_socket(State) of
|
||||||
|
{ok, NState} ->
|
||||||
|
hibernate(Parent, NState);
|
||||||
|
{error, Reason} ->
|
||||||
|
ok = esockd_close(Socket),
|
||||||
|
exit_on_sock_error(Reason)
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec exit_on_sock_error(atom()) -> no_return().
|
||||||
|
exit_on_sock_error(Reason) when Reason =:= einval;
|
||||||
|
Reason =:= enotconn;
|
||||||
|
Reason =:= closed ->
|
||||||
|
erlang:exit(normal);
|
||||||
|
exit_on_sock_error(timeout) ->
|
||||||
|
erlang:exit({shutdown, ssl_upgrade_timeout});
|
||||||
|
exit_on_sock_error(Reason) ->
|
||||||
|
erlang:exit({shutdown, Reason}).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Recv Loop
|
||||||
|
|
||||||
|
recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) ->
|
||||||
|
receive
|
||||||
|
Msg ->
|
||||||
|
handle_recv(Msg, Parent, State)
|
||||||
|
after
|
||||||
|
IdleTimeout + 100 ->
|
||||||
|
hibernate(Parent, cancel_stats_timer(State))
|
||||||
|
end.
|
||||||
|
|
||||||
|
handle_recv({system, From, Request}, Parent, State) ->
|
||||||
|
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
|
||||||
|
handle_recv({'EXIT', Parent, Reason}, Parent, State) ->
|
||||||
|
%% FIXME: it's not trapping exit, should never receive an EXIT
|
||||||
|
terminate(Reason, State);
|
||||||
|
handle_recv(Msg, Parent, State = #state{idle_timeout = IdleTimeout}) ->
|
||||||
|
case process_msg([Msg], ensure_stats_timer(IdleTimeout, State)) of
|
||||||
|
{ok, NewState} ->
|
||||||
|
?MODULE:recvloop(Parent, NewState);
|
||||||
|
{stop, Reason, NewSate} ->
|
||||||
|
terminate(Reason, NewSate)
|
||||||
|
end.
|
||||||
|
|
||||||
|
hibernate(Parent, State) ->
|
||||||
|
proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State]).
|
||||||
|
|
||||||
|
%% Maybe do something here later.
|
||||||
|
wakeup_from_hib(Parent, State) ->
|
||||||
|
?MODULE:recvloop(Parent, State).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Ensure/cancel stats timer
|
||||||
|
|
||||||
|
ensure_stats_timer(Timeout, State = #state{stats_timer = undefined}) ->
|
||||||
|
State#state{stats_timer = emqx_misc:start_timer(Timeout, emit_stats)};
|
||||||
|
ensure_stats_timer(_Timeout, State) -> State.
|
||||||
|
|
||||||
|
cancel_stats_timer(State = #state{stats_timer = TRef})
|
||||||
|
when is_reference(TRef) ->
|
||||||
|
ok = emqx_misc:cancel_timer(TRef),
|
||||||
|
State#state{stats_timer = undefined};
|
||||||
|
cancel_stats_timer(State) -> State.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Process next Msg
|
||||||
|
|
||||||
|
process_msg([], State) ->
|
||||||
|
{ok, State};
|
||||||
|
process_msg([Msg|More], State) ->
|
||||||
|
try
|
||||||
|
case handle_msg(Msg, State) of
|
||||||
|
ok ->
|
||||||
|
process_msg(More, State);
|
||||||
|
{ok, NState} ->
|
||||||
|
process_msg(More, NState);
|
||||||
|
{ok, Msgs, NState} ->
|
||||||
|
process_msg(append_msg(More, Msgs), NState);
|
||||||
|
{stop, Reason, NState} ->
|
||||||
|
{stop, Reason, NState}
|
||||||
|
end
|
||||||
|
catch
|
||||||
|
exit : normal ->
|
||||||
|
{stop, normal, State};
|
||||||
|
exit : shutdown ->
|
||||||
|
{stop, shutdown, State};
|
||||||
|
exit : {shutdown, _} = Shutdown ->
|
||||||
|
{stop, Shutdown, State};
|
||||||
|
Exception : Context : Stack ->
|
||||||
|
{stop, #{exception => Exception,
|
||||||
|
context => Context,
|
||||||
|
stacktrace => Stack}, State}
|
||||||
|
end.
|
||||||
|
|
||||||
|
append_msg([], Msgs) when is_list(Msgs) ->
|
||||||
|
Msgs;
|
||||||
|
append_msg([], Msg) -> [Msg];
|
||||||
|
append_msg(Q, Msgs) when is_list(Msgs) ->
|
||||||
|
lists:append(Q, Msgs);
|
||||||
|
append_msg(Q, Msg) ->
|
||||||
|
lists:append(Q, [Msg]).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Handle a Msg
|
||||||
|
|
||||||
|
handle_msg({'$gen_call', From, Req}, State) ->
|
||||||
|
case handle_call(From, Req, State) of
|
||||||
|
{reply, Reply, NState} ->
|
||||||
|
gen_server:reply(From, Reply),
|
||||||
|
{ok, NState};
|
||||||
|
{reply, Reply, Msgs, NState} ->
|
||||||
|
gen_server:reply(From, Reply),
|
||||||
|
{ok, next_msgs(Msgs), NState};
|
||||||
|
{stop, Reason, Reply, NState} ->
|
||||||
|
gen_server:reply(From, Reply),
|
||||||
|
stop(Reason, NState)
|
||||||
|
end;
|
||||||
|
|
||||||
|
handle_msg({'$gen_cast', Req}, State) ->
|
||||||
|
with_channel(handle_cast, [Req], State);
|
||||||
|
|
||||||
|
handle_msg({datagram, _SockPid, Data}, State) ->
|
||||||
|
parse_incoming(Data, State);
|
||||||
|
|
||||||
|
handle_msg({Inet, _Sock, Data}, State)
|
||||||
|
when Inet == tcp;
|
||||||
|
Inet == ssl ->
|
||||||
|
parse_incoming(Data, State);
|
||||||
|
|
||||||
|
handle_msg({incoming, Packet},
|
||||||
|
State = #state{idle_timer = IdleTimer}) ->
|
||||||
|
IdleTimer /= undefined andalso
|
||||||
|
emqx_misc:cancel_timer(IdleTimer),
|
||||||
|
NState = State#state{idle_timer = undefined},
|
||||||
|
handle_incoming(Packet, NState);
|
||||||
|
|
||||||
|
handle_msg({outgoing, Data}, State) ->
|
||||||
|
handle_outgoing(Data, State);
|
||||||
|
|
||||||
|
handle_msg({Error, _Sock, Reason}, State)
|
||||||
|
when Error == tcp_error; Error == ssl_error ->
|
||||||
|
handle_info({sock_error, Reason}, State);
|
||||||
|
|
||||||
|
handle_msg({Closed, _Sock}, State)
|
||||||
|
when Closed == tcp_closed; Closed == ssl_closed ->
|
||||||
|
handle_info({sock_closed, Closed}, close_socket(State));
|
||||||
|
|
||||||
|
%% TODO: udp_passive???
|
||||||
|
handle_msg({Passive, _Sock}, State)
|
||||||
|
when Passive == tcp_passive; Passive == ssl_passive ->
|
||||||
|
%% In Stats
|
||||||
|
Bytes = emqx_pd:reset_counter(incoming_bytes),
|
||||||
|
Pubs = emqx_pd:reset_counter(incoming_pkt),
|
||||||
|
InStats = #{cnt => Pubs, oct => Bytes},
|
||||||
|
%% Ensure Rate Limit
|
||||||
|
NState = ensure_rate_limit(InStats, State),
|
||||||
|
%% Run GC and Check OOM
|
||||||
|
NState1 = check_oom(run_gc(InStats, NState)),
|
||||||
|
handle_info(activate_socket, NState1);
|
||||||
|
|
||||||
|
handle_msg(Deliver = {deliver, _Topic, _Msg},
|
||||||
|
State = #state{active_n = ActiveN}) ->
|
||||||
|
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
|
||||||
|
with_channel(handle_deliver, [Delivers], State);
|
||||||
|
|
||||||
|
%% Something sent
|
||||||
|
%% TODO: Who will deliver this message?
|
||||||
|
handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) ->
|
||||||
|
case emqx_pd:get_counter(outgoing_pkt) > ActiveN of
|
||||||
|
true ->
|
||||||
|
Pubs = emqx_pd:reset_counter(outgoing_pkt),
|
||||||
|
Bytes = emqx_pd:reset_counter(outgoing_bytes),
|
||||||
|
OutStats = #{cnt => Pubs, oct => Bytes},
|
||||||
|
{ok, check_oom(run_gc(OutStats, State))};
|
||||||
|
false -> ok
|
||||||
|
end;
|
||||||
|
|
||||||
|
handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
|
||||||
|
handle_info({sock_error, Reason}, State);
|
||||||
|
|
||||||
|
handle_msg({close, Reason}, State) ->
|
||||||
|
?LOG(debug, "Force to close the socket due to ~p", [Reason]),
|
||||||
|
handle_info({sock_closed, Reason}, close_socket(State));
|
||||||
|
|
||||||
|
handle_msg({event, connected}, State = #state{
|
||||||
|
chann_mod = ChannMod,
|
||||||
|
channel = Channel}) ->
|
||||||
|
Ctx = ChannMod:info(ctx, Channel),
|
||||||
|
ClientId = ChannMod:info(clientid, Channel),
|
||||||
|
emqx_gateway_ctx:insert_channel_info(
|
||||||
|
Ctx,
|
||||||
|
ClientId,
|
||||||
|
info(State),
|
||||||
|
stats(State)
|
||||||
|
);
|
||||||
|
|
||||||
|
handle_msg({event, disconnected}, State = #state{
|
||||||
|
chann_mod = ChannMod,
|
||||||
|
channel = Channel}) ->
|
||||||
|
Ctx = ChannMod:info(ctx, Channel),
|
||||||
|
ClientId = ChannMod:info(clientid, Channel),
|
||||||
|
emqx_gateway_ctx:set_chan_info(Ctx, ClientId, info(State)),
|
||||||
|
emqx_gateway_ctx:connection_closed(Ctx, ClientId),
|
||||||
|
{ok, State};
|
||||||
|
|
||||||
|
handle_msg({event, _Other}, State = #state{
|
||||||
|
chann_mod = ChannMod,
|
||||||
|
channel = Channel}) ->
|
||||||
|
Ctx = ChannMod:info(ctx, Channel),
|
||||||
|
ClientId = ChannMod:info(clientid, Channel),
|
||||||
|
emqx_gateway_ctx:set_chan_info(Ctx, ClientId, info(State)),
|
||||||
|
emqx_gateway_ctx:set_chan_stats(Ctx, ClientId, stats(State)),
|
||||||
|
{ok, State};
|
||||||
|
|
||||||
|
handle_msg({timeout, TRef, TMsg}, State) ->
|
||||||
|
handle_timeout(TRef, TMsg, State);
|
||||||
|
|
||||||
|
handle_msg(Shutdown = {shutdown, _Reason}, State) ->
|
||||||
|
stop(Shutdown, State);
|
||||||
|
|
||||||
|
handle_msg(Msg, State) ->
|
||||||
|
handle_info(Msg, State).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Terminate
|
||||||
|
|
||||||
|
-spec terminate(atom(), state()) -> no_return().
|
||||||
|
terminate(Reason, State = #state{
|
||||||
|
chann_mod = ChannMod,
|
||||||
|
channel = Channel}) ->
|
||||||
|
?LOG(debug, "Terminated due to ~p", [Reason]),
|
||||||
|
_ = ChannMod:terminate(Reason, Channel),
|
||||||
|
_ = close_socket(State),
|
||||||
|
exit(Reason).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Sys callbacks
|
||||||
|
|
||||||
|
system_continue(Parent, _Debug, State) ->
|
||||||
|
?MODULE:recvloop(Parent, State).
|
||||||
|
|
||||||
|
system_terminate(Reason, _Parent, _Debug, State) ->
|
||||||
|
terminate(Reason, State).
|
||||||
|
|
||||||
|
system_code_change(State, _Mod, _OldVsn, _Extra) ->
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
|
system_get_state(State) -> {ok, State}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Handle call
|
||||||
|
|
||||||
|
handle_call(_From, info, State) ->
|
||||||
|
{reply, info(State), State};
|
||||||
|
|
||||||
|
handle_call(_From, stats, State) ->
|
||||||
|
{reply, stats(State), State};
|
||||||
|
|
||||||
|
handle_call(_From, Req, State = #state{
|
||||||
|
chann_mod = ChannMod,
|
||||||
|
channel = Channel}) ->
|
||||||
|
case ChannMod:handle_call(Req, Channel) of
|
||||||
|
{reply, Reply, NChannel} ->
|
||||||
|
{reply, Reply, State#state{channel = NChannel}};
|
||||||
|
{reply, Reply, Replies, NChannel} ->
|
||||||
|
{reply, Reply, Replies, State#state{channel = NChannel}};
|
||||||
|
{shutdown, Reason, Reply, NChannel} ->
|
||||||
|
shutdown(Reason, Reply, State#state{channel = NChannel})
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Handle timeout
|
||||||
|
|
||||||
|
handle_timeout(_TRef, idle_timeout, State) ->
|
||||||
|
shutdown(idle_timeout, State);
|
||||||
|
|
||||||
|
handle_timeout(_TRef, limit_timeout, State) ->
|
||||||
|
NState = State#state{sockstate = idle,
|
||||||
|
limit_timer = undefined
|
||||||
|
},
|
||||||
|
handle_info(activate_socket, NState);
|
||||||
|
handle_timeout(TRef, keepalive, State = #state{
|
||||||
|
chann_mod = ChannMod,
|
||||||
|
socket = Socket,
|
||||||
|
channel = Channel})->
|
||||||
|
case ChannMod:info(conn_state, Channel) of
|
||||||
|
disconnected -> {ok, State};
|
||||||
|
_ ->
|
||||||
|
case esockd_getstat(Socket, [recv_oct, send_oct]) of
|
||||||
|
{ok, [{recv_oct, RecvOct}, {send_oct, SendOct}]} ->
|
||||||
|
handle_timeout(TRef, {keepalive, {RecvOct, SendOct}}, State);
|
||||||
|
{error, Reason} ->
|
||||||
|
handle_info({sock_error, Reason}, State)
|
||||||
|
end
|
||||||
|
end;
|
||||||
|
handle_timeout(_TRef, emit_stats, State =
|
||||||
|
#state{chann_mod = ChannMod, channel = Channel}) ->
|
||||||
|
Ctx = ChannMod:info(ctx, Channel),
|
||||||
|
ClientId = ChannMod:info(clientid, Channel),
|
||||||
|
emqx_gateway_ctx:set_chan_stats(Ctx, ClientId, stats(State)),
|
||||||
|
{ok, State#state{stats_timer = undefined}};
|
||||||
|
|
||||||
|
handle_timeout(TRef, Msg, State) ->
|
||||||
|
with_channel(handle_timeout, [TRef, Msg], State).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Parse incoming data
|
||||||
|
|
||||||
|
parse_incoming(Data, State = #state{
|
||||||
|
chann_mod = ChannMod,
|
||||||
|
channel = Channel}) ->
|
||||||
|
?LOG(debug, "RECV ~0p", [Data]),
|
||||||
|
Oct = iolist_size(Data),
|
||||||
|
inc_counter(incoming_bytes, Oct),
|
||||||
|
Ctx = ChannMod:info(ctx, Channel),
|
||||||
|
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.received', Oct),
|
||||||
|
{Packets, NState} = parse_incoming(Data, [], State),
|
||||||
|
{ok, next_incoming_msgs(Packets), NState}.
|
||||||
|
|
||||||
|
parse_incoming(<<>>, Packets, State) ->
|
||||||
|
{Packets, State};
|
||||||
|
|
||||||
|
parse_incoming(Data, Packets,
|
||||||
|
State = #state{
|
||||||
|
frame_mod = FrameMod,
|
||||||
|
parse_state = ParseState}) ->
|
||||||
|
try FrameMod:parse(Data, ParseState) of
|
||||||
|
{more, NParseState} ->
|
||||||
|
{Packets, State#state{parse_state = NParseState}};
|
||||||
|
{ok, Packet, Rest, NParseState} ->
|
||||||
|
NState = State#state{parse_state = NParseState},
|
||||||
|
parse_incoming(Rest, [Packet|Packets], NState)
|
||||||
|
catch
|
||||||
|
error:Reason:Stk ->
|
||||||
|
?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data:~0p",
|
||||||
|
[Reason, Stk, Data]),
|
||||||
|
{[{frame_error, Reason}|Packets], State}
|
||||||
|
end.
|
||||||
|
|
||||||
|
next_incoming_msgs([Packet]) ->
|
||||||
|
{incoming, Packet};
|
||||||
|
next_incoming_msgs(Packets) ->
|
||||||
|
[{incoming, Packet} || Packet <- lists:reverse(Packets)].
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Handle incoming packet
|
||||||
|
|
||||||
|
handle_incoming(Packet, State = #state{frame_mod = FrameMod}) ->
|
||||||
|
ok = inc_incoming_stats(Packet),
|
||||||
|
?LOG(debug, "RECV ~s", [FrameMod:format(Packet)]),
|
||||||
|
with_channel(handle_in, [Packet], State).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% With Channel
|
||||||
|
|
||||||
|
with_channel(Fun, Args, State = #state{
|
||||||
|
chann_mod = ChannMod,
|
||||||
|
channel = Channel}) ->
|
||||||
|
case erlang:apply(ChannMod, Fun, Args ++ [Channel]) of
|
||||||
|
ok -> {ok, State};
|
||||||
|
{ok, NChannel} ->
|
||||||
|
{ok, State#state{channel = NChannel}};
|
||||||
|
{ok, Replies, NChannel} ->
|
||||||
|
{ok, next_msgs(Replies), State#state{channel = NChannel}};
|
||||||
|
{shutdown, Reason, NChannel} ->
|
||||||
|
shutdown(Reason, State#state{channel = NChannel});
|
||||||
|
{shutdown, Reason, Packet, NChannel} ->
|
||||||
|
NState = State#state{channel = NChannel},
|
||||||
|
ok = handle_outgoing(Packet, NState),
|
||||||
|
shutdown(Reason, NState)
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Handle outgoing packets
|
||||||
|
|
||||||
|
handle_outgoing(Packets, State) when is_list(Packets) ->
|
||||||
|
send(lists:map(serialize_and_inc_stats_fun(State), Packets), State);
|
||||||
|
|
||||||
|
handle_outgoing(Packet, State) ->
|
||||||
|
send((serialize_and_inc_stats_fun(State))(Packet), State).
|
||||||
|
|
||||||
|
serialize_and_inc_stats_fun(#state{
|
||||||
|
frame_mod = FrameMod,
|
||||||
|
chann_mod = ChannMod,
|
||||||
|
serialize = Serialize,
|
||||||
|
channel = Channel}) ->
|
||||||
|
Ctx = ChannMod:info(ctx, Channel),
|
||||||
|
fun(Packet) ->
|
||||||
|
case FrameMod:serialize_pkt(Packet, Serialize) of
|
||||||
|
<<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!",
|
||||||
|
[FrameMod:format(Packet)]),
|
||||||
|
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped.too_large'),
|
||||||
|
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'),
|
||||||
|
<<>>;
|
||||||
|
Data -> ?LOG(debug, "SEND ~s", [FrameMod:format(Packet)]),
|
||||||
|
ok = inc_outgoing_stats(Packet),
|
||||||
|
Data
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Send data
|
||||||
|
|
||||||
|
-spec(send(iodata(), state()) -> ok).
|
||||||
|
send(IoData, State = #state{socket = Socket,
|
||||||
|
chann_mod = ChannMod,
|
||||||
|
channel = Channel}) ->
|
||||||
|
Ctx = ChannMod:info(ctx, Channel),
|
||||||
|
Oct = iolist_size(IoData),
|
||||||
|
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.sent', Oct),
|
||||||
|
inc_counter(outgoing_bytes, Oct),
|
||||||
|
case esockd_send(IoData, State) of
|
||||||
|
ok -> ok;
|
||||||
|
Error = {error, _Reason} ->
|
||||||
|
%% Send an inet_reply to postpone handling the error
|
||||||
|
self() ! {inet_reply, Socket, Error},
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Handle Info
|
||||||
|
|
||||||
|
handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
|
||||||
|
case activate_socket(State) of
|
||||||
|
{ok, NState = #state{sockstate = NewSst}} ->
|
||||||
|
if OldSst =/= NewSst ->
|
||||||
|
{ok, {event, NewSst}, NState};
|
||||||
|
true -> {ok, NState}
|
||||||
|
end;
|
||||||
|
{error, Reason} ->
|
||||||
|
handle_info({sock_error, Reason}, State)
|
||||||
|
end;
|
||||||
|
|
||||||
|
handle_info({sock_error, Reason}, State) ->
|
||||||
|
?LOG(debug, "Socket error: ~p", [Reason]),
|
||||||
|
handle_info({sock_closed, Reason}, close_socket(State));
|
||||||
|
|
||||||
|
handle_info(Info, State) ->
|
||||||
|
with_channel(handle_info, [Info], State).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Ensure rate limit
|
||||||
|
|
||||||
|
ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
|
||||||
|
case ?ENABLED(Limiter) andalso emqx_limiter:check(Stats, Limiter) of
|
||||||
|
false -> State;
|
||||||
|
{ok, Limiter1} ->
|
||||||
|
State#state{limiter = Limiter1};
|
||||||
|
{pause, Time, Limiter1} ->
|
||||||
|
?LOG(warning, "Pause ~pms due to rate limit", [Time]),
|
||||||
|
TRef = emqx_misc:start_timer(Time, limit_timeout),
|
||||||
|
State#state{sockstate = blocked,
|
||||||
|
limiter = Limiter1,
|
||||||
|
limit_timer = TRef
|
||||||
|
}
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Run GC and Check OOM
|
||||||
|
|
||||||
|
run_gc(Stats, State = #state{gc_state = GcSt}) ->
|
||||||
|
case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
|
||||||
|
false -> State;
|
||||||
|
{_IsGC, GcSt1} ->
|
||||||
|
State#state{gc_state = GcSt1}
|
||||||
|
end.
|
||||||
|
|
||||||
|
check_oom(State = #state{oom_policy = OomPolicy}) ->
|
||||||
|
case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of
|
||||||
|
{shutdown, Reason} ->
|
||||||
|
%% triggers terminate/2 callback immediately
|
||||||
|
erlang:exit({shutdown, Reason});
|
||||||
|
_Other -> ok
|
||||||
|
end,
|
||||||
|
State.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Activate Socket
|
||||||
|
|
||||||
|
activate_socket(State = #state{sockstate = closed}) ->
|
||||||
|
{ok, State};
|
||||||
|
activate_socket(State = #state{sockstate = blocked}) ->
|
||||||
|
{ok, State};
|
||||||
|
activate_socket(State = #state{socket = Socket,
|
||||||
|
active_n = N}) ->
|
||||||
|
%% FIXME: Works on dtls/udp ???
|
||||||
|
%% How to hanlde buffer?
|
||||||
|
case esockd_setopts(Socket, [{active, N}]) of
|
||||||
|
ok -> {ok, State#state{sockstate = running}};
|
||||||
|
Error -> Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Close Socket
|
||||||
|
|
||||||
|
close_socket(State = #state{sockstate = closed}) -> State;
|
||||||
|
close_socket(State = #state{socket = Socket}) ->
|
||||||
|
ok = esockd_close(Socket),
|
||||||
|
State#state{sockstate = closed}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Inc incoming/outgoing stats
|
||||||
|
|
||||||
|
%% XXX: How to stats?
|
||||||
|
inc_incoming_stats(_Packet) ->
|
||||||
|
inc_counter(recv_pkt, 1),
|
||||||
|
ok.
|
||||||
|
%case Type =:= ?CMD_SEND of
|
||||||
|
% true ->
|
||||||
|
% inc_counter(recv_msg, 1),
|
||||||
|
% inc_counter(incoming_pubs, 1);
|
||||||
|
% false ->
|
||||||
|
% ok
|
||||||
|
%end,
|
||||||
|
%emqx_metrics:inc_recv(Packet).
|
||||||
|
|
||||||
|
inc_outgoing_stats(_Packet) ->
|
||||||
|
inc_counter(send_pkt, 1),
|
||||||
|
ok.
|
||||||
|
%case Type =:= ?CMD_MESSAGE of
|
||||||
|
% true ->
|
||||||
|
% inc_counter(send_msg, 1),
|
||||||
|
% inc_counter(outgoing_pubs, 1);
|
||||||
|
% false ->
|
||||||
|
% ok
|
||||||
|
%end,
|
||||||
|
%emqx_metrics:inc_sent(Packet).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Helper functions
|
||||||
|
|
||||||
|
-compile({inline, [next_msgs/1]}).
|
||||||
|
next_msgs(Event) when is_tuple(Event) ->
|
||||||
|
Event;
|
||||||
|
next_msgs(More) when is_list(More) ->
|
||||||
|
More.
|
||||||
|
|
||||||
|
shutdown(Reason, State) ->
|
||||||
|
stop({shutdown, Reason}, State).
|
||||||
|
|
||||||
|
shutdown(Reason, Reply, State) ->
|
||||||
|
stop({shutdown, Reason}, Reply, State).
|
||||||
|
|
||||||
|
stop(Reason, State) ->
|
||||||
|
{stop, Reason, State}.
|
||||||
|
|
||||||
|
stop(Reason, Reply, State) ->
|
||||||
|
{stop, Reason, Reply, State}.
|
||||||
|
|
||||||
|
inc_counter(Name, Value) ->
|
||||||
|
_ = emqx_pd:inc_counter(Name, Value),
|
||||||
|
ok.
|
||||||
|
|
|
@ -0,0 +1,48 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% @doc The Gateway frame behavior
|
||||||
|
%%
|
||||||
|
%% This module does not export any functions at the moment.
|
||||||
|
%% It is only used to standardize the implement of emqx_foo_frame.erl
|
||||||
|
%% module if it integrated with emqx_gateway_conn module
|
||||||
|
%%
|
||||||
|
-module(emqx_gateway_frame).
|
||||||
|
|
||||||
|
|
||||||
|
-type parse_state() :: map().
|
||||||
|
|
||||||
|
-type frame() :: any().
|
||||||
|
|
||||||
|
-type parse_result() :: {ok, frame(),
|
||||||
|
Rest :: binary(), NewState :: parse_state()}
|
||||||
|
| {more, NewState :: parse_state()}.
|
||||||
|
|
||||||
|
-type serialize_options() :: map().
|
||||||
|
|
||||||
|
%% Callbacks
|
||||||
|
|
||||||
|
%% @doc Initial the frame parser states
|
||||||
|
-callback initial_parse_state(map()) -> parse_state().
|
||||||
|
|
||||||
|
-callback serialize_opts() -> serialize_options().
|
||||||
|
|
||||||
|
-callback serialize_pkt(Frame :: any(), serialize_options()) -> iodata().
|
||||||
|
|
||||||
|
-callback parse(binary(), parse_state()) -> parse_result().
|
||||||
|
|
||||||
|
-callback format(Frame :: any()) -> string().
|
||||||
|
|
|
@ -43,6 +43,7 @@
|
||||||
|
|
||||||
-define(ACTIVE_N, 100).
|
-define(ACTIVE_N, 100).
|
||||||
-define(DEFAULT_IDLE_TIMEOUT, 30000).
|
-define(DEFAULT_IDLE_TIMEOUT, 30000).
|
||||||
|
-define(DEFAULT_GC_OPTS, #{count => 1000, bytes => 1024*1024}).
|
||||||
-define(DEFAULT_OOM_POLICY, #{max_heap_size => 4194304,
|
-define(DEFAULT_OOM_POLICY, #{max_heap_size => 4194304,
|
||||||
message_queue_len => 32000}).
|
message_queue_len => 32000}).
|
||||||
|
|
||||||
|
@ -158,7 +159,7 @@ init_gc_state(Options) ->
|
||||||
|
|
||||||
-spec force_gc_policy(map()) -> emqx_gc:opts() | undefined.
|
-spec force_gc_policy(map()) -> emqx_gc:opts() | undefined.
|
||||||
force_gc_policy(Options) ->
|
force_gc_policy(Options) ->
|
||||||
maps:get(force_gc_policy, Options, undefined).
|
maps:get(force_gc_policy, Options, ?DEFAULT_GC_OPTS).
|
||||||
|
|
||||||
-spec oom_policy(map()) -> emqx_types:oom_policy().
|
-spec oom_policy(map()) -> emqx_types:oom_policy().
|
||||||
oom_policy(Options) ->
|
oom_policy(Options) ->
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
|
|
||||||
-module(emqx_sn_channel).
|
-module(emqx_sn_channel).
|
||||||
|
|
||||||
|
-behavior(emqx_gateway_channel).
|
||||||
|
|
||||||
-include("src/mqttsn/include/emqx_sn.hrl").
|
-include("src/mqttsn/include/emqx_sn.hrl").
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||||
|
@ -1270,7 +1272,7 @@ handle_timeout(_TRef, {keepalive, _StatVal},
|
||||||
when ConnState =:= disconnected;
|
when ConnState =:= disconnected;
|
||||||
ConnState =:= asleep ->
|
ConnState =:= asleep ->
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
handle_timeout(_TRef, {keepalive, StatVal},
|
handle_timeout(_TRef, {keepalive, {StatVal, _}},
|
||||||
Channel = #channel{keepalive = Keepalive}) ->
|
Channel = #channel{keepalive = Keepalive}) ->
|
||||||
case emqx_keepalive:check(StatVal, Keepalive) of
|
case emqx_keepalive:check(StatVal, Keepalive) of
|
||||||
{ok, NKeepalive} ->
|
{ok, NKeepalive} ->
|
||||||
|
@ -1422,5 +1424,3 @@ returncode_name(?SN_RC_NOT_AUTHORIZE) -> rejected_not_authorize;
|
||||||
returncode_name(?SN_RC_FAILED_SESSION) -> rejected_failed_open_session;
|
returncode_name(?SN_RC_FAILED_SESSION) -> rejected_failed_open_session;
|
||||||
returncode_name(?SN_EXCEED_LIMITATION) -> rejected_exceed_limitation;
|
returncode_name(?SN_EXCEED_LIMITATION) -> rejected_exceed_limitation;
|
||||||
returncode_name(_) -> accepted.
|
returncode_name(_) -> accepted.
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,807 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
%% @doc MQTT-SN Connection process
|
|
||||||
-module(emqx_sn_conn).
|
|
||||||
|
|
||||||
-include_lib("emqx/include/types.hrl").
|
|
||||||
-include_lib("emqx/include/logger.hrl").
|
|
||||||
|
|
||||||
-logger_header("[SN-Conn]").
|
|
||||||
|
|
||||||
%% API
|
|
||||||
-export([ start_link/3
|
|
||||||
, stop/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
-export([ info/1
|
|
||||||
, stats/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
-export([ call/2
|
|
||||||
, call/3
|
|
||||||
, cast/2
|
|
||||||
]).
|
|
||||||
|
|
||||||
%% Callback
|
|
||||||
-export([init/4]).
|
|
||||||
|
|
||||||
%% Sys callbacks
|
|
||||||
-export([ system_continue/3
|
|
||||||
, system_terminate/4
|
|
||||||
, system_code_change/4
|
|
||||||
, system_get_state/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
%% Internal callback
|
|
||||||
-export([wakeup_from_hib/2, recvloop/2]).
|
|
||||||
|
|
||||||
-import(emqx_misc, [start_timer/2]).
|
|
||||||
|
|
||||||
-record(state, {
|
|
||||||
%% TCP/SSL/UDP/DTLS Wrapped Socket
|
|
||||||
socket :: {esockd_transport, esockd:socket()} | {udp, _, _},
|
|
||||||
%% Peername of the connection
|
|
||||||
peername :: emqx_types:peername(),
|
|
||||||
%% Sockname of the connection
|
|
||||||
sockname :: emqx_types:peername(),
|
|
||||||
%% Sock State
|
|
||||||
sockstate :: emqx_types:sockstate(),
|
|
||||||
%% The {active, N} option
|
|
||||||
active_n :: pos_integer(),
|
|
||||||
%% Limiter
|
|
||||||
limiter :: maybe(emqx_limiter:limiter()),
|
|
||||||
%% Limit Timer
|
|
||||||
limit_timer :: maybe(reference()),
|
|
||||||
%% Parse State
|
|
||||||
parse_state :: emqx_sn_frame:parse_state(),
|
|
||||||
%% Serialize options
|
|
||||||
serialize :: emqx_sn_frame:serialize_opts(),
|
|
||||||
%% Channel State
|
|
||||||
channel :: emqx_sn_channel:channel(),
|
|
||||||
%% GC State
|
|
||||||
gc_state :: maybe(emqx_gc:gc_state()),
|
|
||||||
%% Stats Timer
|
|
||||||
stats_timer :: disabled | maybe(reference()),
|
|
||||||
%% Idle Timeout
|
|
||||||
idle_timeout :: integer(),
|
|
||||||
%% Idle Timer
|
|
||||||
idle_timer :: maybe(reference()),
|
|
||||||
%% OOM Policy
|
|
||||||
oom_policy :: maybe(emqx_types:oom_policy())
|
|
||||||
}).
|
|
||||||
|
|
||||||
-type(state() :: #state{}).
|
|
||||||
|
|
||||||
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]).
|
|
||||||
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
|
|
||||||
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
|
|
||||||
|
|
||||||
-define(ENABLED(X), (X =/= undefined)).
|
|
||||||
|
|
||||||
-define(DEFAULT_GC_OPTS, #{count => 1000, bytes => 1024*1024}).
|
|
||||||
-define(DEFAULT_IDLE_TIMEOUT, 30000).
|
|
||||||
|
|
||||||
-dialyzer({nowarn_function,
|
|
||||||
[ system_terminate/4
|
|
||||||
, handle_call/3
|
|
||||||
, handle_msg/2
|
|
||||||
, shutdown/3
|
|
||||||
, stop/3
|
|
||||||
, parse_incoming/3
|
|
||||||
]}).
|
|
||||||
|
|
||||||
%% udp
|
|
||||||
start_link(Socket = {udp, _SockPid, _Sock}, Peername, Options) ->
|
|
||||||
Args = [self(), Socket, Peername, Options],
|
|
||||||
{ok, proc_lib:spawn_link(?MODULE, init, Args)};
|
|
||||||
|
|
||||||
%% tcp/ssl/dtls
|
|
||||||
start_link(esockd_transport, Sock, Options) ->
|
|
||||||
Socket = {esockd_transport, Sock},
|
|
||||||
case esockd_transport:peername(Sock) of
|
|
||||||
{ok, Peername} ->
|
|
||||||
Args = [self(), Socket, Peername, Options],
|
|
||||||
{ok, proc_lib:spawn_link(?MODULE, init, Args)};
|
|
||||||
R = {error, _} -> R
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% API
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
%% @doc Get infos of the connection/channel.
|
|
||||||
-spec(info(pid()|state()) -> emqx_types:infos()).
|
|
||||||
info(CPid) when is_pid(CPid) ->
|
|
||||||
call(CPid, info);
|
|
||||||
info(State = #state{channel = Channel}) ->
|
|
||||||
ChanInfo = emqx_sn_channel:info(Channel),
|
|
||||||
SockInfo = maps:from_list(
|
|
||||||
info(?INFO_KEYS, State)),
|
|
||||||
ChanInfo#{sockinfo => SockInfo}.
|
|
||||||
|
|
||||||
info(Keys, State) when is_list(Keys) ->
|
|
||||||
[{Key, info(Key, State)} || Key <- Keys];
|
|
||||||
info(socktype, #state{socket = Socket}) ->
|
|
||||||
esockd_type(Socket);
|
|
||||||
info(peername, #state{peername = Peername}) ->
|
|
||||||
Peername;
|
|
||||||
info(sockname, #state{sockname = Sockname}) ->
|
|
||||||
Sockname;
|
|
||||||
info(sockstate, #state{sockstate = SockSt}) ->
|
|
||||||
SockSt;
|
|
||||||
info(active_n, #state{active_n = ActiveN}) ->
|
|
||||||
ActiveN.
|
|
||||||
|
|
||||||
-spec(stats(pid()|state()) -> emqx_types:stats()).
|
|
||||||
stats(CPid) when is_pid(CPid) ->
|
|
||||||
call(CPid, stats);
|
|
||||||
stats(#state{socket = Socket,
|
|
||||||
channel = Channel}) ->
|
|
||||||
SockStats = case esockd_getstat(Socket, ?SOCK_STATS) of
|
|
||||||
{ok, Ss} -> Ss;
|
|
||||||
{error, _} -> []
|
|
||||||
end,
|
|
||||||
ConnStats = emqx_pd:get_counters(?CONN_STATS),
|
|
||||||
ChanStats = emqx_sn_channel:stats(Channel),
|
|
||||||
ProcStats = emqx_misc:proc_stats(),
|
|
||||||
lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
|
|
||||||
|
|
||||||
call(Pid, Req) ->
|
|
||||||
call(Pid, Req, infinity).
|
|
||||||
|
|
||||||
call(Pid, Req, Timeout) ->
|
|
||||||
gen_server:call(Pid, Req, Timeout).
|
|
||||||
|
|
||||||
cast(Pid, Req) ->
|
|
||||||
gen_server:cast(Pid, Req).
|
|
||||||
|
|
||||||
stop(Pid) ->
|
|
||||||
gen_server:stop(Pid).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Wrapped funcs
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
esockd_wait(Socket = {udp, _SockPid, _Sock}) ->
|
|
||||||
{ok, Socket};
|
|
||||||
esockd_wait({esockd_transport, Sock}) ->
|
|
||||||
case esockd_transport:wait(Sock) of
|
|
||||||
{ok, NSock} -> {ok, {esockd_transport, NSock}};
|
|
||||||
R = {error, _} -> R
|
|
||||||
end.
|
|
||||||
|
|
||||||
esockd_close({udp, _SockPid, _Sock}) ->
|
|
||||||
%% nothing to do for udp socket
|
|
||||||
%%gen_udp:close(Sock);
|
|
||||||
ok;
|
|
||||||
esockd_close({esockd_transport, Sock}) ->
|
|
||||||
esockd_transport:fast_close(Sock).
|
|
||||||
|
|
||||||
esockd_ensure_ok_or_exit(peercert, {udp, _SockPid, _Sock}) ->
|
|
||||||
nossl;
|
|
||||||
esockd_ensure_ok_or_exit(Fun, {udp, _SockPid, Sock}) ->
|
|
||||||
esockd_transport:ensure_ok_or_exit(Fun, [Sock]);
|
|
||||||
esockd_ensure_ok_or_exit(Fun, {esockd_transport, Socket}) ->
|
|
||||||
esockd_transport:ensure_ok_or_exit(Fun, [Socket]).
|
|
||||||
|
|
||||||
esockd_type({udp, _, _}) ->
|
|
||||||
udp;
|
|
||||||
esockd_type({esockd_transport, Socket}) ->
|
|
||||||
esockd_transport:type(Socket).
|
|
||||||
|
|
||||||
esockd_setopts({udp, _, _}, _) ->
|
|
||||||
ok;
|
|
||||||
esockd_setopts({esockd_transport, Socket}, Opts) ->
|
|
||||||
%% FIXME: DTLS works??
|
|
||||||
esockd_transport:setopts(Socket, Opts).
|
|
||||||
|
|
||||||
esockd_getstat({udp, _SockPid, Sock}, Stats) ->
|
|
||||||
inet:getstat(Sock, Stats);
|
|
||||||
esockd_getstat({esockd_transport, Sock}, Stats) ->
|
|
||||||
esockd_transport:getstat(Sock, Stats).
|
|
||||||
|
|
||||||
esockd_send(Data, #state{socket = {udp, _SockPid, Sock},
|
|
||||||
peername = {Ip, Port}}) ->
|
|
||||||
gen_udp:send(Sock, Ip, Port, Data);
|
|
||||||
esockd_send(Data, #state{socket = {esockd_transport, Sock}}) ->
|
|
||||||
esockd_transport:async_send(Sock, Data).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% callbacks
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
init(Parent, WrappedSock, Peername, Options) ->
|
|
||||||
case esockd_wait(WrappedSock) of
|
|
||||||
{ok, NWrappedSock} ->
|
|
||||||
run_loop(Parent, init_state(NWrappedSock, Peername, Options));
|
|
||||||
{error, Reason} ->
|
|
||||||
ok = esockd_close(WrappedSock),
|
|
||||||
exit_on_sock_error(Reason)
|
|
||||||
end.
|
|
||||||
|
|
||||||
init_state(WrappedSock, Peername, Options) ->
|
|
||||||
{ok, Sockname} = esockd_ensure_ok_or_exit(sockname, WrappedSock),
|
|
||||||
Peercert = esockd_ensure_ok_or_exit(peercert, WrappedSock),
|
|
||||||
ConnInfo = #{socktype => esockd_type(WrappedSock),
|
|
||||||
peername => Peername,
|
|
||||||
sockname => Sockname,
|
|
||||||
peercert => Peercert,
|
|
||||||
conn_mod => ?MODULE
|
|
||||||
},
|
|
||||||
ActiveN = emqx_gateway_utils:active_n(Options),
|
|
||||||
%% FIXME:
|
|
||||||
%%Limiter = emqx_limiter:init(Options),
|
|
||||||
Limiter = undefined,
|
|
||||||
FrameOpts = emqx_gateway_utils:frame_options(Options),
|
|
||||||
ParseState = emqx_sn_frame:initial_parse_state(FrameOpts),
|
|
||||||
Serialize = emqx_sn_frame:serialize_opts(),
|
|
||||||
Channel = emqx_sn_channel:init(ConnInfo, Options),
|
|
||||||
GcState = emqx_gateway_utils:init_gc_state(Options),
|
|
||||||
StatsTimer = emqx_gateway_utils:stats_timer(Options),
|
|
||||||
IdleTimeout = emqx_gateway_utils:idle_timeout(Options),
|
|
||||||
OomPolicy = emqx_gateway_utils:oom_policy(Options),
|
|
||||||
IdleTimer = start_timer(IdleTimeout, idle_timeout),
|
|
||||||
#state{socket = WrappedSock,
|
|
||||||
peername = Peername,
|
|
||||||
sockname = Sockname,
|
|
||||||
sockstate = idle,
|
|
||||||
active_n = ActiveN,
|
|
||||||
limiter = Limiter,
|
|
||||||
parse_state = ParseState,
|
|
||||||
serialize = Serialize,
|
|
||||||
channel = Channel,
|
|
||||||
gc_state = GcState,
|
|
||||||
stats_timer = StatsTimer,
|
|
||||||
idle_timeout = IdleTimeout,
|
|
||||||
idle_timer = IdleTimer,
|
|
||||||
oom_policy = OomPolicy
|
|
||||||
}.
|
|
||||||
|
|
||||||
run_loop(Parent, State = #state{socket = Socket,
|
|
||||||
peername = Peername,
|
|
||||||
oom_policy = OomPolicy
|
|
||||||
}) ->
|
|
||||||
emqx_logger:set_metadata_peername(esockd:format(Peername)),
|
|
||||||
_ = emqx_misc:tune_heap_size(OomPolicy),
|
|
||||||
case activate_socket(State) of
|
|
||||||
{ok, NState} ->
|
|
||||||
hibernate(Parent, NState);
|
|
||||||
{error, Reason} ->
|
|
||||||
ok = esockd_close(Socket),
|
|
||||||
exit_on_sock_error(Reason)
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec exit_on_sock_error(atom()) -> no_return().
|
|
||||||
exit_on_sock_error(Reason) when Reason =:= einval;
|
|
||||||
Reason =:= enotconn;
|
|
||||||
Reason =:= closed ->
|
|
||||||
erlang:exit(normal);
|
|
||||||
exit_on_sock_error(timeout) ->
|
|
||||||
erlang:exit({shutdown, ssl_upgrade_timeout});
|
|
||||||
exit_on_sock_error(Reason) ->
|
|
||||||
erlang:exit({shutdown, Reason}).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Recv Loop
|
|
||||||
|
|
||||||
recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) ->
|
|
||||||
receive
|
|
||||||
Msg ->
|
|
||||||
handle_recv(Msg, Parent, State)
|
|
||||||
after
|
|
||||||
IdleTimeout + 100 ->
|
|
||||||
hibernate(Parent, cancel_stats_timer(State))
|
|
||||||
end.
|
|
||||||
|
|
||||||
handle_recv({system, From, Request}, Parent, State) ->
|
|
||||||
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
|
|
||||||
handle_recv({'EXIT', Parent, Reason}, Parent, State) ->
|
|
||||||
%% FIXME: it's not trapping exit, should never receive an EXIT
|
|
||||||
terminate(Reason, State);
|
|
||||||
handle_recv(Msg, Parent, State = #state{idle_timeout = IdleTimeout}) ->
|
|
||||||
case process_msg([Msg], ensure_stats_timer(IdleTimeout, State)) of
|
|
||||||
{ok, NewState} ->
|
|
||||||
?MODULE:recvloop(Parent, NewState);
|
|
||||||
{stop, Reason, NewSate} ->
|
|
||||||
terminate(Reason, NewSate)
|
|
||||||
end.
|
|
||||||
|
|
||||||
hibernate(Parent, State) ->
|
|
||||||
proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State]).
|
|
||||||
|
|
||||||
%% Maybe do something here later.
|
|
||||||
wakeup_from_hib(Parent, State) ->
|
|
||||||
?MODULE:recvloop(Parent, State).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Ensure/cancel stats timer
|
|
||||||
|
|
||||||
ensure_stats_timer(Timeout, State = #state{stats_timer = undefined}) ->
|
|
||||||
State#state{stats_timer = start_timer(Timeout, emit_stats)};
|
|
||||||
ensure_stats_timer(_Timeout, State) -> State.
|
|
||||||
|
|
||||||
cancel_stats_timer(State = #state{stats_timer = TRef})
|
|
||||||
when is_reference(TRef) ->
|
|
||||||
ok = emqx_misc:cancel_timer(TRef),
|
|
||||||
State#state{stats_timer = undefined};
|
|
||||||
cancel_stats_timer(State) -> State.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Process next Msg
|
|
||||||
|
|
||||||
process_msg([], State) ->
|
|
||||||
{ok, State};
|
|
||||||
process_msg([Msg|More], State) ->
|
|
||||||
try
|
|
||||||
case handle_msg(Msg, State) of
|
|
||||||
ok ->
|
|
||||||
process_msg(More, State);
|
|
||||||
{ok, NState} ->
|
|
||||||
process_msg(More, NState);
|
|
||||||
{ok, Msgs, NState} ->
|
|
||||||
process_msg(append_msg(More, Msgs), NState);
|
|
||||||
{stop, Reason, NState} ->
|
|
||||||
{stop, Reason, NState}
|
|
||||||
end
|
|
||||||
catch
|
|
||||||
exit : normal ->
|
|
||||||
{stop, normal, State};
|
|
||||||
exit : shutdown ->
|
|
||||||
{stop, shutdown, State};
|
|
||||||
exit : {shutdown, _} = Shutdown ->
|
|
||||||
{stop, Shutdown, State};
|
|
||||||
Exception : Context : Stack ->
|
|
||||||
{stop, #{exception => Exception,
|
|
||||||
context => Context,
|
|
||||||
stacktrace => Stack}, State}
|
|
||||||
end.
|
|
||||||
|
|
||||||
append_msg([], Msgs) when is_list(Msgs) ->
|
|
||||||
Msgs;
|
|
||||||
append_msg([], Msg) -> [Msg];
|
|
||||||
append_msg(Q, Msgs) when is_list(Msgs) ->
|
|
||||||
lists:append(Q, Msgs);
|
|
||||||
append_msg(Q, Msg) ->
|
|
||||||
lists:append(Q, [Msg]).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Handle a Msg
|
|
||||||
|
|
||||||
handle_msg({'$gen_call', From, Req}, State) ->
|
|
||||||
case handle_call(From, Req, State) of
|
|
||||||
{reply, Reply, NState} ->
|
|
||||||
gen_server:reply(From, Reply),
|
|
||||||
{ok, NState};
|
|
||||||
{reply, Reply, Msgs, NState} ->
|
|
||||||
gen_server:reply(From, Reply),
|
|
||||||
{ok, next_msgs(Msgs), NState};
|
|
||||||
{stop, Reason, Reply, NState} ->
|
|
||||||
gen_server:reply(From, Reply),
|
|
||||||
stop(Reason, NState)
|
|
||||||
end;
|
|
||||||
|
|
||||||
handle_msg({'$gen_cast', Req}, State) ->
|
|
||||||
with_channel(handle_cast, [Req], State);
|
|
||||||
|
|
||||||
handle_msg({datagram, _SockPid, Data}, State) ->
|
|
||||||
parse_incoming(Data, State);
|
|
||||||
|
|
||||||
handle_msg({Inet, _Sock, Data}, State)
|
|
||||||
when Inet == tcp;
|
|
||||||
Inet == ssl ->
|
|
||||||
parse_incoming(Data, State);
|
|
||||||
|
|
||||||
handle_msg({incoming, Packet},
|
|
||||||
State = #state{idle_timer = IdleTimer}) ->
|
|
||||||
IdleTimer /= undefined andalso
|
|
||||||
emqx_misc:cancel_timer(IdleTimer),
|
|
||||||
NState = State#state{idle_timer = undefined},
|
|
||||||
handle_incoming(Packet, NState);
|
|
||||||
|
|
||||||
handle_msg({outgoing, Data}, State) ->
|
|
||||||
handle_outgoing(Data, State);
|
|
||||||
|
|
||||||
handle_msg({Error, _Sock, Reason}, State)
|
|
||||||
when Error == tcp_error; Error == ssl_error ->
|
|
||||||
handle_info({sock_error, Reason}, State);
|
|
||||||
|
|
||||||
handle_msg({Closed, _Sock}, State)
|
|
||||||
when Closed == tcp_closed; Closed == ssl_closed ->
|
|
||||||
handle_info({sock_closed, Closed}, close_socket(State));
|
|
||||||
|
|
||||||
%% TODO: udp_passive???
|
|
||||||
handle_msg({Passive, _Sock}, State)
|
|
||||||
when Passive == tcp_passive; Passive == ssl_passive ->
|
|
||||||
%% In Stats
|
|
||||||
Bytes = emqx_pd:reset_counter(incoming_bytes),
|
|
||||||
Pubs = emqx_pd:reset_counter(incoming_pkt),
|
|
||||||
InStats = #{cnt => Pubs, oct => Bytes},
|
|
||||||
%% Ensure Rate Limit
|
|
||||||
NState = ensure_rate_limit(InStats, State),
|
|
||||||
%% Run GC and Check OOM
|
|
||||||
NState1 = check_oom(run_gc(InStats, NState)),
|
|
||||||
handle_info(activate_socket, NState1);
|
|
||||||
|
|
||||||
handle_msg(Deliver = {deliver, _Topic, _Msg},
|
|
||||||
State = #state{active_n = ActiveN}) ->
|
|
||||||
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
|
|
||||||
with_channel(handle_deliver, [Delivers], State);
|
|
||||||
|
|
||||||
%% Something sent
|
|
||||||
%% TODO: Who will deliver this message?
|
|
||||||
handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) ->
|
|
||||||
case emqx_pd:get_counter(outgoing_pkt) > ActiveN of
|
|
||||||
true ->
|
|
||||||
Pubs = emqx_pd:reset_counter(outgoing_pkt),
|
|
||||||
Bytes = emqx_pd:reset_counter(outgoing_bytes),
|
|
||||||
OutStats = #{cnt => Pubs, oct => Bytes},
|
|
||||||
{ok, check_oom(run_gc(OutStats, State))};
|
|
||||||
false -> ok
|
|
||||||
end;
|
|
||||||
|
|
||||||
handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
|
|
||||||
handle_info({sock_error, Reason}, State);
|
|
||||||
|
|
||||||
handle_msg({close, Reason}, State) ->
|
|
||||||
?LOG(debug, "Force to close the socket due to ~p", [Reason]),
|
|
||||||
handle_info({sock_closed, Reason}, close_socket(State));
|
|
||||||
|
|
||||||
handle_msg({event, connected}, State = #state{channel = Channel}) ->
|
|
||||||
Ctx = emqx_sn_channel:info(ctx, Channel),
|
|
||||||
ClientId = emqx_sn_channel:info(clientid, Channel),
|
|
||||||
emqx_gateway_ctx:insert_channel_info(
|
|
||||||
Ctx,
|
|
||||||
ClientId,
|
|
||||||
info(State),
|
|
||||||
stats(State)
|
|
||||||
);
|
|
||||||
|
|
||||||
handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
|
|
||||||
Ctx = emqx_sn_channel:info(ctx, Channel),
|
|
||||||
ClientId = emqx_sn_channel:info(clientid, Channel),
|
|
||||||
emqx_gateway_ctx:set_chan_info(Ctx, ClientId, info(State)),
|
|
||||||
emqx_gateway_ctx:connection_closed(Ctx, ClientId),
|
|
||||||
{ok, State};
|
|
||||||
|
|
||||||
handle_msg({event, _Other}, State = #state{channel = Channel}) ->
|
|
||||||
Ctx = emqx_sn_channel:info(ctx, Channel),
|
|
||||||
ClientId = emqx_sn_channel:info(clientid, Channel),
|
|
||||||
emqx_gateway_ctx:set_chan_info(Ctx, ClientId, info(State)),
|
|
||||||
emqx_gateway_ctx:set_chan_stats(Ctx, ClientId, stats(State)),
|
|
||||||
{ok, State};
|
|
||||||
|
|
||||||
handle_msg({timeout, TRef, TMsg}, State) ->
|
|
||||||
handle_timeout(TRef, TMsg, State);
|
|
||||||
|
|
||||||
handle_msg(Shutdown = {shutdown, _Reason}, State) ->
|
|
||||||
stop(Shutdown, State);
|
|
||||||
|
|
||||||
handle_msg(Msg, State) ->
|
|
||||||
handle_info(Msg, State).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Terminate
|
|
||||||
|
|
||||||
-spec terminate(atom(), state()) -> no_return().
|
|
||||||
terminate(Reason, State = #state{channel = Channel}) ->
|
|
||||||
?LOG(debug, "Terminated due to ~p", [Reason]),
|
|
||||||
_ = emqx_sn_channel:terminate(Reason, Channel),
|
|
||||||
_ = close_socket(State),
|
|
||||||
exit(Reason).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Sys callbacks
|
|
||||||
|
|
||||||
system_continue(Parent, _Debug, State) ->
|
|
||||||
recvloop(Parent, State).
|
|
||||||
|
|
||||||
system_terminate(Reason, _Parent, _Debug, State) ->
|
|
||||||
terminate(Reason, State).
|
|
||||||
|
|
||||||
system_code_change(State, _Mod, _OldVsn, _Extra) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
system_get_state(State) -> {ok, State}.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Handle call
|
|
||||||
|
|
||||||
handle_call(_From, info, State) ->
|
|
||||||
{reply, info(State), State};
|
|
||||||
|
|
||||||
handle_call(_From, stats, State) ->
|
|
||||||
{reply, stats(State), State};
|
|
||||||
|
|
||||||
handle_call(_From, Req, State = #state{channel = Channel}) ->
|
|
||||||
case emqx_sn_channel:handle_call(Req, Channel) of
|
|
||||||
{reply, Reply, NChannel} ->
|
|
||||||
{reply, Reply, State#state{channel = NChannel}};
|
|
||||||
{reply, Reply, Replies, NChannel} ->
|
|
||||||
{reply, Reply, Replies, State#state{channel = NChannel}};
|
|
||||||
{shutdown, Reason, Reply, NChannel} ->
|
|
||||||
shutdown(Reason, Reply, State#state{channel = NChannel})
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Handle timeout
|
|
||||||
|
|
||||||
handle_timeout(_TRef, idle_timeout, State) ->
|
|
||||||
shutdown(idle_timeout, State);
|
|
||||||
|
|
||||||
handle_timeout(_TRef, limit_timeout, State) ->
|
|
||||||
NState = State#state{sockstate = idle,
|
|
||||||
limit_timer = undefined
|
|
||||||
},
|
|
||||||
handle_info(activate_socket, NState);
|
|
||||||
handle_timeout(TRef, keepalive, State = #state{socket = Socket,
|
|
||||||
channel = Channel})->
|
|
||||||
case emqx_sn_channel:info(conn_state, Channel) of
|
|
||||||
disconnected -> {ok, State};
|
|
||||||
_ ->
|
|
||||||
case esockd_getstat(Socket, [recv_oct]) of
|
|
||||||
{ok, [{recv_oct, RecvOct}]} ->
|
|
||||||
handle_timeout(TRef, {keepalive, RecvOct}, State);
|
|
||||||
{error, Reason} ->
|
|
||||||
handle_info({sock_error, Reason}, State)
|
|
||||||
end
|
|
||||||
end;
|
|
||||||
handle_timeout(_TRef, emit_stats, State =
|
|
||||||
#state{channel = Channel}) ->
|
|
||||||
Ctx = emqx_sn_channel:info(ctx, Channel),
|
|
||||||
ClientId = emqx_sn_channel:info(clientid, Channel),
|
|
||||||
emqx_gateway_ctx:set_chan_stats(Ctx, ClientId, stats(State)),
|
|
||||||
{ok, State#state{stats_timer = undefined}};
|
|
||||||
|
|
||||||
handle_timeout(TRef, Msg, State) ->
|
|
||||||
with_channel(handle_timeout, [TRef, Msg], State).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Parse incoming data
|
|
||||||
|
|
||||||
parse_incoming(Data, State = #state{channel = Channel}) ->
|
|
||||||
?LOG(debug, "RECV ~0p", [Data]),
|
|
||||||
Oct = iolist_size(Data),
|
|
||||||
inc_counter(incoming_bytes, Oct),
|
|
||||||
Ctx = emqx_sn_channel:info(ctx, Channel),
|
|
||||||
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.received', Oct),
|
|
||||||
{Packets, NState} = parse_incoming(Data, [], State),
|
|
||||||
{ok, next_incoming_msgs(Packets), NState}.
|
|
||||||
|
|
||||||
parse_incoming(<<>>, Packets, State) ->
|
|
||||||
{Packets, State};
|
|
||||||
|
|
||||||
parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
|
|
||||||
try emqx_sn_frame:parse(Data, ParseState) of
|
|
||||||
{more, NParseState} ->
|
|
||||||
{Packets, State#state{parse_state = NParseState}};
|
|
||||||
{ok, Packet, Rest, NParseState} ->
|
|
||||||
NState = State#state{parse_state = NParseState},
|
|
||||||
parse_incoming(Rest, [Packet|Packets], NState)
|
|
||||||
catch
|
|
||||||
error:Reason:Stk ->
|
|
||||||
?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data:~0p",
|
|
||||||
[Reason, Stk, Data]),
|
|
||||||
{[{frame_error, Reason}|Packets], State}
|
|
||||||
end.
|
|
||||||
|
|
||||||
next_incoming_msgs([Packet]) ->
|
|
||||||
{incoming, Packet};
|
|
||||||
next_incoming_msgs(Packets) ->
|
|
||||||
[{incoming, Packet} || Packet <- lists:reverse(Packets)].
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Handle incoming packet
|
|
||||||
|
|
||||||
handle_incoming(Packet, State) ->
|
|
||||||
ok = inc_incoming_stats(Packet),
|
|
||||||
?LOG(debug, "RECV ~s", [emqx_sn_frame:format(Packet)]),
|
|
||||||
with_channel(handle_in, [Packet], State).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% With Channel
|
|
||||||
|
|
||||||
with_channel(Fun, Args, State = #state{channel = Channel}) ->
|
|
||||||
case erlang:apply(emqx_sn_channel, Fun, Args ++ [Channel]) of
|
|
||||||
ok -> {ok, State};
|
|
||||||
{ok, NChannel} ->
|
|
||||||
{ok, State#state{channel = NChannel}};
|
|
||||||
{ok, Replies, NChannel} ->
|
|
||||||
{ok, next_msgs(Replies), State#state{channel = NChannel}};
|
|
||||||
{shutdown, Reason, NChannel} ->
|
|
||||||
shutdown(Reason, State#state{channel = NChannel});
|
|
||||||
{shutdown, Reason, Packet, NChannel} ->
|
|
||||||
NState = State#state{channel = NChannel},
|
|
||||||
ok = handle_outgoing(Packet, NState),
|
|
||||||
shutdown(Reason, NState)
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Handle outgoing packets
|
|
||||||
|
|
||||||
handle_outgoing(Packets, State) when is_list(Packets) ->
|
|
||||||
send(lists:map(serialize_and_inc_stats_fun(State), Packets), State);
|
|
||||||
|
|
||||||
handle_outgoing(Packet, State) ->
|
|
||||||
send((serialize_and_inc_stats_fun(State))(Packet), State).
|
|
||||||
|
|
||||||
serialize_and_inc_stats_fun(#state{serialize = Serialize, channel = Channel}) ->
|
|
||||||
Ctx = emqx_sn_channel:info(ctx, Channel),
|
|
||||||
fun(Packet) ->
|
|
||||||
case emqx_sn_frame:serialize_pkt(Packet, Serialize) of
|
|
||||||
<<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!",
|
|
||||||
[emqx_sn_frame:format(Packet)]),
|
|
||||||
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped.too_large'),
|
|
||||||
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'),
|
|
||||||
<<>>;
|
|
||||||
Data -> ?LOG(debug, "SEND ~s", [emqx_sn_frame:format(Packet)]),
|
|
||||||
ok = inc_outgoing_stats(Packet),
|
|
||||||
Data
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Send data
|
|
||||||
|
|
||||||
-spec(send(iodata(), state()) -> ok).
|
|
||||||
send(IoData, State = #state{socket = Socket, channel = Channel}) ->
|
|
||||||
Ctx = emqx_sn_channel:info(ctx, Channel),
|
|
||||||
Oct = iolist_size(IoData),
|
|
||||||
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.sent', Oct),
|
|
||||||
inc_counter(outgoing_bytes, Oct),
|
|
||||||
case esockd_send(IoData, State) of
|
|
||||||
ok -> ok;
|
|
||||||
Error = {error, _Reason} ->
|
|
||||||
%% Send an inet_reply to postpone handling the error
|
|
||||||
self() ! {inet_reply, Socket, Error},
|
|
||||||
ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Handle Info
|
|
||||||
|
|
||||||
handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
|
|
||||||
case activate_socket(State) of
|
|
||||||
{ok, NState = #state{sockstate = NewSst}} ->
|
|
||||||
if OldSst =/= NewSst ->
|
|
||||||
{ok, {event, NewSst}, NState};
|
|
||||||
true -> {ok, NState}
|
|
||||||
end;
|
|
||||||
{error, Reason} ->
|
|
||||||
handle_info({sock_error, Reason}, State)
|
|
||||||
end;
|
|
||||||
|
|
||||||
handle_info({sock_error, Reason}, State) ->
|
|
||||||
?LOG(debug, "Socket error: ~p", [Reason]),
|
|
||||||
handle_info({sock_closed, Reason}, close_socket(State));
|
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
|
||||||
with_channel(handle_info, [Info], State).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Ensure rate limit
|
|
||||||
|
|
||||||
ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
|
|
||||||
case ?ENABLED(Limiter) andalso emqx_limiter:check(Stats, Limiter) of
|
|
||||||
false -> State;
|
|
||||||
{ok, Limiter1} ->
|
|
||||||
State#state{limiter = Limiter1};
|
|
||||||
{pause, Time, Limiter1} ->
|
|
||||||
?LOG(warning, "Pause ~pms due to rate limit", [Time]),
|
|
||||||
TRef = start_timer(Time, limit_timeout),
|
|
||||||
State#state{sockstate = blocked,
|
|
||||||
limiter = Limiter1,
|
|
||||||
limit_timer = TRef
|
|
||||||
}
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Run GC and Check OOM
|
|
||||||
|
|
||||||
run_gc(Stats, State = #state{gc_state = GcSt}) ->
|
|
||||||
case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
|
|
||||||
false -> State;
|
|
||||||
{_IsGC, GcSt1} ->
|
|
||||||
State#state{gc_state = GcSt1}
|
|
||||||
end.
|
|
||||||
|
|
||||||
check_oom(State = #state{oom_policy = OomPolicy}) ->
|
|
||||||
case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of
|
|
||||||
{shutdown, Reason} ->
|
|
||||||
%% triggers terminate/2 callback immediately
|
|
||||||
erlang:exit({shutdown, Reason});
|
|
||||||
_Other -> ok
|
|
||||||
end,
|
|
||||||
State.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Activate Socket
|
|
||||||
|
|
||||||
activate_socket(State = #state{sockstate = closed}) ->
|
|
||||||
{ok, State};
|
|
||||||
activate_socket(State = #state{sockstate = blocked}) ->
|
|
||||||
{ok, State};
|
|
||||||
activate_socket(State = #state{socket = Socket,
|
|
||||||
active_n = N}) ->
|
|
||||||
%% FIXME: Works on dtls/udp ???
|
|
||||||
%% How to hanlde buffer?
|
|
||||||
case esockd_setopts(Socket, [{active, N}]) of
|
|
||||||
ok -> {ok, State#state{sockstate = running}};
|
|
||||||
Error -> Error
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Close Socket
|
|
||||||
|
|
||||||
close_socket(State = #state{sockstate = closed}) -> State;
|
|
||||||
close_socket(State = #state{socket = Socket}) ->
|
|
||||||
ok = esockd_close(Socket),
|
|
||||||
State#state{sockstate = closed}.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Inc incoming/outgoing stats
|
|
||||||
|
|
||||||
%% XXX: How to stats?
|
|
||||||
inc_incoming_stats(_Packet) ->
|
|
||||||
inc_counter(recv_pkt, 1),
|
|
||||||
ok.
|
|
||||||
%case Type =:= ?CMD_SEND of
|
|
||||||
% true ->
|
|
||||||
% inc_counter(recv_msg, 1),
|
|
||||||
% inc_counter(incoming_pubs, 1);
|
|
||||||
% false ->
|
|
||||||
% ok
|
|
||||||
%end,
|
|
||||||
%emqx_metrics:inc_recv(Packet).
|
|
||||||
|
|
||||||
inc_outgoing_stats(_Packet) ->
|
|
||||||
inc_counter(send_pkt, 1),
|
|
||||||
ok.
|
|
||||||
%case Type =:= ?CMD_MESSAGE of
|
|
||||||
% true ->
|
|
||||||
% inc_counter(send_msg, 1),
|
|
||||||
% inc_counter(outgoing_pubs, 1);
|
|
||||||
% false ->
|
|
||||||
% ok
|
|
||||||
%end,
|
|
||||||
%emqx_metrics:inc_sent(Packet).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Helper functions
|
|
||||||
|
|
||||||
-compile({inline, [next_msgs/1]}).
|
|
||||||
next_msgs(Event) when is_tuple(Event) ->
|
|
||||||
Event;
|
|
||||||
next_msgs(More) when is_list(More) ->
|
|
||||||
More.
|
|
||||||
|
|
||||||
-compile({inline, [shutdown/2, shutdown/3]}).
|
|
||||||
shutdown(Reason, State) ->
|
|
||||||
stop({shutdown, Reason}, State).
|
|
||||||
|
|
||||||
shutdown(Reason, Reply, State) ->
|
|
||||||
stop({shutdown, Reason}, Reply, State).
|
|
||||||
|
|
||||||
-compile({inline, [stop/2, stop/3]}).
|
|
||||||
stop(Reason, State) ->
|
|
||||||
{stop, Reason, State}.
|
|
||||||
|
|
||||||
stop(Reason, Reply, State) ->
|
|
||||||
{stop, Reason, Reply, State}.
|
|
||||||
|
|
||||||
inc_counter(Name, Value) ->
|
|
||||||
_ = emqx_pd:inc_counter(Name, Value),
|
|
||||||
ok.
|
|
|
@ -15,8 +15,11 @@
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% @doc The frame parser for MQTT-SN protocol
|
||||||
-module(emqx_sn_frame).
|
-module(emqx_sn_frame).
|
||||||
|
|
||||||
|
-behavior(emqx_gateway_frame).
|
||||||
|
|
||||||
-include("src/mqttsn/include/emqx_sn.hrl").
|
-include("src/mqttsn/include/emqx_sn.hrl").
|
||||||
|
|
||||||
-export([ initial_parse_state/1
|
-export([ initial_parse_state/1
|
||||||
|
|
|
@ -128,8 +128,13 @@ start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
|
||||||
|
|
||||||
start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
|
start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
|
||||||
Name = name(InstaId, Type),
|
Name = name(InstaId, Type),
|
||||||
|
NCfg = Cfg#{
|
||||||
|
ctx => Ctx,
|
||||||
|
frame_mod => emqx_sn_frame,
|
||||||
|
chann_mod => emqx_sn_channel
|
||||||
|
},
|
||||||
esockd:open_udp(Name, ListenOn, merge_default(SocketOpts),
|
esockd:open_udp(Name, ListenOn, merge_default(SocketOpts),
|
||||||
{emqx_sn_conn, start_link, [Cfg#{ctx => Ctx}]}).
|
{emqx_gateway_conn, start_link, [NCfg]}).
|
||||||
|
|
||||||
name(InstaId, Type) ->
|
name(InstaId, Type) ->
|
||||||
list_to_atom(lists:concat([InstaId, ":", Type])).
|
list_to_atom(lists:concat([InstaId, ":", Type])).
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
|
|
||||||
-module(emqx_stomp_channel).
|
-module(emqx_stomp_channel).
|
||||||
|
|
||||||
|
-behavior(emqx_gateway_channel).
|
||||||
|
|
||||||
-include("src/stomp/include/emqx_stomp.hrl").
|
-include("src/stomp/include/emqx_stomp.hrl").
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
@ -596,6 +598,16 @@ handle_call(Req, Channel) ->
|
||||||
?LOG(error, "Unexpected call: ~p", [Req]),
|
?LOG(error, "Unexpected call: ~p", [Req]),
|
||||||
reply(ignored, Channel).
|
reply(ignored, Channel).
|
||||||
|
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Handle cast
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec handle_cast(Req :: term(), channel())
|
||||||
|
-> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}.
|
||||||
|
handle_cast(_Req, Channel) ->
|
||||||
|
{ok, Channel}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Handle Info
|
%% Handle Info
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -1,905 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_stomp_connection).
|
|
||||||
|
|
||||||
-include("src/stomp/include/emqx_stomp.hrl").
|
|
||||||
-include_lib("emqx/include/logger.hrl").
|
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
||||||
|
|
||||||
-logger_header("[Stomp-Conn]").
|
|
||||||
|
|
||||||
%% API
|
|
||||||
-export([ start_link/3
|
|
||||||
, stop/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
-export([ info/1
|
|
||||||
, stats/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
-export([ async_set_keepalive/3
|
|
||||||
, async_set_keepalive/4
|
|
||||||
, async_set_socket_options/2
|
|
||||||
]).
|
|
||||||
|
|
||||||
-export([ call/2
|
|
||||||
, call/3
|
|
||||||
, cast/2
|
|
||||||
]).
|
|
||||||
|
|
||||||
%% Callback
|
|
||||||
-export([init/4]).
|
|
||||||
|
|
||||||
%% Sys callbacks
|
|
||||||
-export([ system_continue/3
|
|
||||||
, system_terminate/4
|
|
||||||
, system_code_change/4
|
|
||||||
, system_get_state/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
%% Internal callback
|
|
||||||
-export([wakeup_from_hib/2, recvloop/2, get_state/1]).
|
|
||||||
|
|
||||||
%% Export for CT
|
|
||||||
-export([set_field/3]).
|
|
||||||
|
|
||||||
-import(emqx_misc,
|
|
||||||
[ maybe_apply/2
|
|
||||||
]).
|
|
||||||
|
|
||||||
-record(state, {
|
|
||||||
%% TCP/TLS Transport
|
|
||||||
transport :: esockd:transport(),
|
|
||||||
%% TCP/TLS Socket
|
|
||||||
socket :: esockd:socket(),
|
|
||||||
%% Peername of the connection
|
|
||||||
peername :: emqx_types:peername(),
|
|
||||||
%% Sockname of the connection
|
|
||||||
sockname :: emqx_types:peername(),
|
|
||||||
%% Sock State
|
|
||||||
sockstate :: emqx_types:sockstate(),
|
|
||||||
%% The {active, N} option
|
|
||||||
active_n :: pos_integer(),
|
|
||||||
%% Limiter
|
|
||||||
limiter :: emqx_limiter:limiter() | undefined,
|
|
||||||
%% Limit Timer
|
|
||||||
limit_timer :: reference() | undefined,
|
|
||||||
%% Parse State
|
|
||||||
parse_state :: emqx_stomp_frame:parse_state(),
|
|
||||||
%% Serialize options
|
|
||||||
serialize :: emqx_stomp_frame:serialize_opts(),
|
|
||||||
%% Channel State
|
|
||||||
channel :: emqx_stomp_channel:channel(),
|
|
||||||
%% GC State
|
|
||||||
gc_state :: emqx_gc:gc_state() | undefined,
|
|
||||||
%% Stats Timer
|
|
||||||
stats_timer :: disabled | reference() | undefined,
|
|
||||||
%% Idle Timeout
|
|
||||||
idle_timeout :: integer(),
|
|
||||||
%% Idle Timer
|
|
||||||
idle_timer :: reference() | undefined,
|
|
||||||
%% OOM Policy
|
|
||||||
oom_policy :: emqx_types:oom_policy() | undefined
|
|
||||||
}).
|
|
||||||
|
|
||||||
-type(state() :: #state{}).
|
|
||||||
|
|
||||||
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]).
|
|
||||||
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
|
|
||||||
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
|
|
||||||
|
|
||||||
-define(ENABLED(X), (X =/= undefined)).
|
|
||||||
|
|
||||||
%-define(ALARM_TCP_CONGEST(Channel),
|
|
||||||
% list_to_binary(io_lib:format("mqtt_conn/congested/~s/~s",
|
|
||||||
% [emqx_stomp_channel:info(clientid, Channel),
|
|
||||||
% emqx_stomp_channel:info(username, Channel)]))).
|
|
||||||
%-define(ALARM_CONN_INFO_KEYS, [
|
|
||||||
% socktype, sockname, peername,
|
|
||||||
% clientid, username, proto_name, proto_ver, connected_at
|
|
||||||
%]).
|
|
||||||
%-define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]).
|
|
||||||
%-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
|
|
||||||
|
|
||||||
-dialyzer({no_match, [info/2]}).
|
|
||||||
-dialyzer({nowarn_function, [ init/4
|
|
||||||
, init_state/3
|
|
||||||
, run_loop/2
|
|
||||||
, system_terminate/4
|
|
||||||
, system_code_change/4
|
|
||||||
]}).
|
|
||||||
|
|
||||||
-dialyzer({no_match, [ handle_call/3
|
|
||||||
, serialize_and_inc_stats_fun/1
|
|
||||||
]}).
|
|
||||||
|
|
||||||
-spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist())
|
|
||||||
-> {ok, pid()}).
|
|
||||||
start_link(Transport, Socket, Options) ->
|
|
||||||
Args = [self(), Transport, Socket, Options],
|
|
||||||
CPid = proc_lib:spawn_link(?MODULE, init, Args),
|
|
||||||
{ok, CPid}.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% API
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
%% @doc Get infos of the connection/channel.
|
|
||||||
-spec(info(pid()|state()) -> emqx_types:infos()).
|
|
||||||
info(CPid) when is_pid(CPid) ->
|
|
||||||
call(CPid, info);
|
|
||||||
info(State = #state{channel = Channel}) ->
|
|
||||||
ChanInfo = emqx_stomp_channel:info(Channel),
|
|
||||||
SockInfo = maps:from_list(
|
|
||||||
info(?INFO_KEYS, State)),
|
|
||||||
ChanInfo#{sockinfo => SockInfo}.
|
|
||||||
|
|
||||||
info(Keys, State) when is_list(Keys) ->
|
|
||||||
[{Key, info(Key, State)} || Key <- Keys];
|
|
||||||
info(socktype, #state{transport = Transport, socket = Socket}) ->
|
|
||||||
Transport:type(Socket);
|
|
||||||
info(peername, #state{peername = Peername}) ->
|
|
||||||
Peername;
|
|
||||||
info(sockname, #state{sockname = Sockname}) ->
|
|
||||||
Sockname;
|
|
||||||
info(sockstate, #state{sockstate = SockSt}) ->
|
|
||||||
SockSt;
|
|
||||||
info(active_n, #state{active_n = ActiveN}) ->
|
|
||||||
ActiveN;
|
|
||||||
info(stats_timer, #state{stats_timer = StatsTimer}) ->
|
|
||||||
StatsTimer;
|
|
||||||
info(limit_timer, #state{limit_timer = LimitTimer}) ->
|
|
||||||
LimitTimer;
|
|
||||||
info(limiter, #state{limiter = Limiter}) ->
|
|
||||||
maybe_apply(fun emqx_limiter:info/1, Limiter).
|
|
||||||
|
|
||||||
%% @doc Get stats of the connection/channel.
|
|
||||||
-spec(stats(pid()|state()) -> emqx_types:stats()).
|
|
||||||
stats(CPid) when is_pid(CPid) ->
|
|
||||||
call(CPid, stats);
|
|
||||||
stats(#state{transport = Transport,
|
|
||||||
socket = Socket,
|
|
||||||
channel = Channel}) ->
|
|
||||||
SockStats = case Transport:getstat(Socket, ?SOCK_STATS) of
|
|
||||||
{ok, Ss} -> Ss;
|
|
||||||
{error, _} -> []
|
|
||||||
end,
|
|
||||||
ConnStats = emqx_pd:get_counters(?CONN_STATS),
|
|
||||||
ChanStats = emqx_stomp_channel:stats(Channel),
|
|
||||||
ProcStats = emqx_misc:proc_stats(),
|
|
||||||
lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
|
|
||||||
|
|
||||||
%% @doc Set TCP keepalive socket options to override system defaults.
|
|
||||||
%% Idle: The number of seconds a connection needs to be idle before
|
|
||||||
%% TCP begins sending out keep-alive probes (Linux default 7200).
|
|
||||||
%% Interval: The number of seconds between TCP keep-alive probes
|
|
||||||
%% (Linux default 75).
|
|
||||||
%% Probes: The maximum number of TCP keep-alive probes to send before
|
|
||||||
%% giving up and killing the connection if no response is
|
|
||||||
%% obtained from the other end (Linux default 9).
|
|
||||||
%%
|
|
||||||
%% NOTE: This API sets TCP socket options, which has nothing to do with
|
|
||||||
%% the MQTT layer's keepalive (PINGREQ and PINGRESP).
|
|
||||||
async_set_keepalive(Idle, Interval, Probes) ->
|
|
||||||
async_set_keepalive(self(), Idle, Interval, Probes).
|
|
||||||
|
|
||||||
async_set_keepalive(Pid, Idle, Interval, Probes) ->
|
|
||||||
Options = [ {keepalive, true}
|
|
||||||
, {raw, 6, 4, <<Idle:32/native>>}
|
|
||||||
, {raw, 6, 5, <<Interval:32/native>>}
|
|
||||||
, {raw, 6, 6, <<Probes:32/native>>}
|
|
||||||
],
|
|
||||||
async_set_socket_options(Pid, Options).
|
|
||||||
|
|
||||||
%% @doc Set custom socket options.
|
|
||||||
%% This API is made async because the call might be originated from
|
|
||||||
%% a hookpoint callback (otherwise deadlock).
|
|
||||||
%% If failed to set, the error message is logged.
|
|
||||||
async_set_socket_options(Pid, Options) ->
|
|
||||||
cast(Pid, {async_set_socket_options, Options}).
|
|
||||||
|
|
||||||
cast(Pid, Req) ->
|
|
||||||
gen_server:cast(Pid, Req).
|
|
||||||
|
|
||||||
call(Pid, Req) ->
|
|
||||||
call(Pid, Req, infinity).
|
|
||||||
call(Pid, Req, Timeout) ->
|
|
||||||
gen_server:call(Pid, Req, Timeout).
|
|
||||||
|
|
||||||
stop(Pid) ->
|
|
||||||
gen_server:stop(Pid).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% callbacks
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
init(Parent, Transport, RawSocket, Options) ->
|
|
||||||
case Transport:wait(RawSocket) of
|
|
||||||
{ok, Socket} ->
|
|
||||||
run_loop(Parent, init_state(Transport, Socket, Options));
|
|
||||||
{error, Reason} ->
|
|
||||||
ok = Transport:fast_close(RawSocket),
|
|
||||||
exit_on_sock_error(Reason)
|
|
||||||
end.
|
|
||||||
|
|
||||||
init_state(Transport, Socket, Options) ->
|
|
||||||
{ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]),
|
|
||||||
{ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
|
|
||||||
Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]),
|
|
||||||
ConnInfo = #{socktype => Transport:type(Socket),
|
|
||||||
peername => Peername,
|
|
||||||
sockname => Sockname,
|
|
||||||
peercert => Peercert,
|
|
||||||
conn_mod => ?MODULE
|
|
||||||
},
|
|
||||||
ActiveN = emqx_gateway_utils:active_n(Options),
|
|
||||||
%% TODO: RateLimit ? How ?
|
|
||||||
Limiter = undefined,
|
|
||||||
%RateLimit = emqx_gateway_utils:ratelimit(Options),
|
|
||||||
%%Limiter = emqx_limiter:init(Zone, RateLimit),
|
|
||||||
FrameOpts = emqx_gateway_utils:frame_options(Options),
|
|
||||||
ParseState = emqx_stomp_frame:initial_parse_state(FrameOpts),
|
|
||||||
Serialize = emqx_stomp_frame:serialize_opts(),
|
|
||||||
Channel = emqx_stomp_channel:init(ConnInfo, Options),
|
|
||||||
GcState = emqx_gateway_utils:init_gc_state(Options),
|
|
||||||
StatsTimer = emqx_gateway_utils:stats_timer(Options),
|
|
||||||
IdleTimeout = emqx_gateway_utils:idle_timeout(Options),
|
|
||||||
OomPolicy = emqx_gateway_utils:oom_policy(Options),
|
|
||||||
IdleTimer = emqx_misc:start_timer(IdleTimeout, idle_timeout),
|
|
||||||
#state{transport = Transport,
|
|
||||||
socket = Socket,
|
|
||||||
peername = Peername,
|
|
||||||
sockname = Sockname,
|
|
||||||
sockstate = idle,
|
|
||||||
active_n = ActiveN,
|
|
||||||
limiter = Limiter,
|
|
||||||
parse_state = ParseState,
|
|
||||||
serialize = Serialize,
|
|
||||||
channel = Channel,
|
|
||||||
gc_state = GcState,
|
|
||||||
stats_timer = StatsTimer,
|
|
||||||
idle_timeout = IdleTimeout,
|
|
||||||
idle_timer = IdleTimer,
|
|
||||||
oom_policy = OomPolicy
|
|
||||||
}.
|
|
||||||
|
|
||||||
run_loop(Parent, State = #state{transport = Transport,
|
|
||||||
socket = Socket,
|
|
||||||
peername = Peername,
|
|
||||||
oom_policy = OomPolicy}) ->
|
|
||||||
emqx_logger:set_metadata_peername(esockd:format(Peername)),
|
|
||||||
_ = emqx_misc:tune_heap_size(OomPolicy),
|
|
||||||
case activate_socket(State) of
|
|
||||||
{ok, NState} -> hibernate(Parent, NState);
|
|
||||||
{error, Reason} ->
|
|
||||||
ok = Transport:fast_close(Socket),
|
|
||||||
exit_on_sock_error(Reason)
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec exit_on_sock_error(any()) -> no_return().
|
|
||||||
exit_on_sock_error(Reason) when Reason =:= einval;
|
|
||||||
Reason =:= enotconn;
|
|
||||||
Reason =:= closed ->
|
|
||||||
erlang:exit(normal);
|
|
||||||
exit_on_sock_error(timeout) ->
|
|
||||||
erlang:exit({shutdown, ssl_upgrade_timeout});
|
|
||||||
exit_on_sock_error(Reason) ->
|
|
||||||
erlang:exit({shutdown, Reason}).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Recv Loop
|
|
||||||
|
|
||||||
recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) ->
|
|
||||||
receive
|
|
||||||
Msg ->
|
|
||||||
handle_recv(Msg, Parent, State)
|
|
||||||
after
|
|
||||||
IdleTimeout + 100 ->
|
|
||||||
hibernate(Parent, cancel_stats_timer(State))
|
|
||||||
end.
|
|
||||||
|
|
||||||
handle_recv({system, From, Request}, Parent, State) ->
|
|
||||||
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
|
|
||||||
handle_recv({'EXIT', Parent, Reason}, Parent, State) ->
|
|
||||||
%% FIXME: it's not trapping exit, should never receive an EXIT
|
|
||||||
terminate(Reason, State);
|
|
||||||
handle_recv(Msg, Parent, State = #state{idle_timeout = IdleTimeout}) ->
|
|
||||||
case process_msg([Msg], ensure_stats_timer(IdleTimeout, State)) of
|
|
||||||
{ok, NewState} ->
|
|
||||||
?MODULE:recvloop(Parent, NewState);
|
|
||||||
{stop, Reason, NewSate} ->
|
|
||||||
terminate(Reason, NewSate)
|
|
||||||
end.
|
|
||||||
|
|
||||||
hibernate(Parent, State) ->
|
|
||||||
proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State]).
|
|
||||||
|
|
||||||
%% Maybe do something here later.
|
|
||||||
wakeup_from_hib(Parent, State) ->
|
|
||||||
?MODULE:recvloop(Parent, State).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Ensure/cancel stats timer
|
|
||||||
|
|
||||||
ensure_stats_timer(Timeout, State = #state{stats_timer = undefined}) ->
|
|
||||||
State#state{stats_timer = emqx_misc:start_timer(Timeout, emit_stats)};
|
|
||||||
ensure_stats_timer(_Timeout, State) -> State.
|
|
||||||
|
|
||||||
cancel_stats_timer(State = #state{stats_timer = TRef})
|
|
||||||
when is_reference(TRef) ->
|
|
||||||
?tp(debug, cancel_stats_timer, #{}),
|
|
||||||
ok = emqx_misc:cancel_timer(TRef),
|
|
||||||
State#state{stats_timer = undefined};
|
|
||||||
cancel_stats_timer(State) -> State.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Process next Msg
|
|
||||||
|
|
||||||
process_msg([], State) ->
|
|
||||||
{ok, State};
|
|
||||||
process_msg([Msg|More], State) ->
|
|
||||||
try
|
|
||||||
case handle_msg(Msg, State) of
|
|
||||||
ok ->
|
|
||||||
process_msg(More, State);
|
|
||||||
{ok, NState} ->
|
|
||||||
process_msg(More, NState);
|
|
||||||
{ok, Msgs, NState} ->
|
|
||||||
process_msg(append_msg(More, Msgs), NState);
|
|
||||||
{stop, Reason, NState} ->
|
|
||||||
{stop, Reason, NState}
|
|
||||||
end
|
|
||||||
catch
|
|
||||||
exit : normal ->
|
|
||||||
{stop, normal, State};
|
|
||||||
exit : shutdown ->
|
|
||||||
{stop, shutdown, State};
|
|
||||||
exit : {shutdown, _} = Shutdown ->
|
|
||||||
{stop, Shutdown, State};
|
|
||||||
Exception : Context : Stack ->
|
|
||||||
{stop, #{exception => Exception,
|
|
||||||
context => Context,
|
|
||||||
stacktrace => Stack}, State}
|
|
||||||
end.
|
|
||||||
|
|
||||||
append_msg([], Msgs) when is_list(Msgs) ->
|
|
||||||
Msgs;
|
|
||||||
append_msg([], Msg) -> [Msg];
|
|
||||||
append_msg(Q, Msgs) when is_list(Msgs) ->
|
|
||||||
lists:append(Q, Msgs);
|
|
||||||
append_msg(Q, Msg) ->
|
|
||||||
lists:append(Q, [Msg]).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Handle a Msg
|
|
||||||
|
|
||||||
handle_msg({'$gen_call', From, Req}, State) ->
|
|
||||||
case handle_call(From, Req, State) of
|
|
||||||
{reply, Reply, NState} ->
|
|
||||||
gen_server:reply(From, Reply),
|
|
||||||
{ok, NState};
|
|
||||||
{stop, Reason, Reply, NState} ->
|
|
||||||
gen_server:reply(From, Reply),
|
|
||||||
stop(Reason, NState)
|
|
||||||
end;
|
|
||||||
handle_msg({'$gen_cast', Req}, State) ->
|
|
||||||
NewState = handle_cast(Req, State),
|
|
||||||
{ok, NewState};
|
|
||||||
|
|
||||||
handle_msg({Inet, _Sock, Data}, State = #state{channel = Channel})
|
|
||||||
when Inet == tcp;
|
|
||||||
Inet == ssl ->
|
|
||||||
?LOG(debug, "RECV ~0p", [Data]),
|
|
||||||
Oct = iolist_size(Data),
|
|
||||||
inc_counter(incoming_bytes, Oct),
|
|
||||||
Ctx = emqx_stomp_channel:info(ctx, Channel),
|
|
||||||
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.received', Oct),
|
|
||||||
parse_incoming(Data, State);
|
|
||||||
|
|
||||||
handle_msg({incoming, Packet}, State = #state{idle_timer = undefined}) ->
|
|
||||||
handle_incoming(Packet, State);
|
|
||||||
|
|
||||||
handle_msg({incoming, Packet},
|
|
||||||
State = #state{idle_timer = IdleTimer}) ->
|
|
||||||
ok = emqx_misc:cancel_timer(IdleTimer),
|
|
||||||
%% XXX: Serialize with input packets
|
|
||||||
%%Serialize = emqx_stomp_frame:serialize_opts(),
|
|
||||||
NState = State#state{idle_timer = undefined},
|
|
||||||
handle_incoming(Packet, NState);
|
|
||||||
|
|
||||||
handle_msg({outgoing, Packets}, State) ->
|
|
||||||
handle_outgoing(Packets, State);
|
|
||||||
|
|
||||||
handle_msg({Error, _Sock, Reason}, State)
|
|
||||||
when Error == tcp_error; Error == ssl_error ->
|
|
||||||
handle_info({sock_error, Reason}, State);
|
|
||||||
|
|
||||||
handle_msg({Closed, _Sock}, State)
|
|
||||||
when Closed == tcp_closed; Closed == ssl_closed ->
|
|
||||||
handle_info({sock_closed, Closed}, close_socket(State));
|
|
||||||
|
|
||||||
handle_msg({Passive, _Sock}, State)
|
|
||||||
when Passive == tcp_passive; Passive == ssl_passive ->
|
|
||||||
%% In Stats
|
|
||||||
Pubs = emqx_pd:reset_counter(incoming_pubs),
|
|
||||||
Bytes = emqx_pd:reset_counter(incoming_bytes),
|
|
||||||
InStats = #{cnt => Pubs, oct => Bytes},
|
|
||||||
%% Ensure Rate Limit
|
|
||||||
NState = ensure_rate_limit(InStats, State),
|
|
||||||
%% Run GC and Check OOM
|
|
||||||
NState1 = check_oom(run_gc(InStats, NState)),
|
|
||||||
handle_info(activate_socket, NState1);
|
|
||||||
|
|
||||||
handle_msg(Deliver = {deliver, _Topic, _Msg},
|
|
||||||
#state{active_n = ActiveN} = State) ->
|
|
||||||
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
|
|
||||||
with_channel(handle_deliver, [Delivers], State);
|
|
||||||
|
|
||||||
%% Something sent
|
|
||||||
handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) ->
|
|
||||||
case emqx_pd:get_counter(outgoing_pubs) > ActiveN of
|
|
||||||
true ->
|
|
||||||
Pubs = emqx_pd:reset_counter(outgoing_pubs),
|
|
||||||
Bytes = emqx_pd:reset_counter(outgoing_bytes),
|
|
||||||
OutStats = #{cnt => Pubs, oct => Bytes},
|
|
||||||
{ok, run_gc(OutStats, State)};
|
|
||||||
%% FIXME: check oom ???
|
|
||||||
%%{ok, check_oom(run_gc(OutStats, State))};
|
|
||||||
false -> ok
|
|
||||||
end;
|
|
||||||
|
|
||||||
handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
|
|
||||||
handle_info({sock_error, Reason}, State);
|
|
||||||
|
|
||||||
handle_msg({connack, ConnAck}, State) ->
|
|
||||||
handle_outgoing(ConnAck, State);
|
|
||||||
|
|
||||||
handle_msg({close, Reason}, State) ->
|
|
||||||
?LOG(debug, "Force to close the socket due to ~p", [Reason]),
|
|
||||||
handle_info({sock_closed, Reason}, close_socket(State));
|
|
||||||
|
|
||||||
handle_msg({event, connected}, State = #state{channel = Channel}) ->
|
|
||||||
Ctx = emqx_stomp_channel:info(ctx, Channel),
|
|
||||||
ClientId = emqx_stomp_channel:info(clientid, Channel),
|
|
||||||
emqx_gateway_ctx:insert_channel_info(
|
|
||||||
Ctx,
|
|
||||||
ClientId,
|
|
||||||
info(State),
|
|
||||||
stats(State)
|
|
||||||
);
|
|
||||||
|
|
||||||
handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
|
|
||||||
Ctx = emqx_stomp_channel:info(ctx, Channel),
|
|
||||||
ClientId = emqx_stomp_channel:info(clientid, Channel),
|
|
||||||
emqx_gateway_ctx:set_chan_info(Ctx, ClientId, info(State)),
|
|
||||||
emqx_gateway_ctx:connection_closed(Ctx, ClientId),
|
|
||||||
{ok, State};
|
|
||||||
|
|
||||||
handle_msg({event, _Other}, State = #state{channel = Channel}) ->
|
|
||||||
Ctx = emqx_stomp_channel:info(ctx, Channel),
|
|
||||||
ClientId = emqx_stomp_channel:info(clientid, Channel),
|
|
||||||
emqx_gateway_ctx:set_chan_info(Ctx, ClientId, info(State)),
|
|
||||||
emqx_gateway_ctx:set_chan_stats(Ctx, ClientId, stats(State)),
|
|
||||||
{ok, State};
|
|
||||||
|
|
||||||
handle_msg({timeout, TRef, TMsg}, State) ->
|
|
||||||
handle_timeout(TRef, TMsg, State);
|
|
||||||
|
|
||||||
handle_msg(Shutdown = {shutdown, _Reason}, State) ->
|
|
||||||
stop(Shutdown, State);
|
|
||||||
|
|
||||||
handle_msg(Msg, State) ->
|
|
||||||
handle_info(Msg, State).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Terminate
|
|
||||||
|
|
||||||
-spec terminate(any(), state()) -> no_return().
|
|
||||||
terminate(Reason, State = #state{channel = Channel, transport = _Transport,
|
|
||||||
socket = _Socket}) ->
|
|
||||||
try
|
|
||||||
Channel1 = emqx_stomp_channel:set_conn_state(disconnected, Channel),
|
|
||||||
%emqx_congestion:cancel_alarms(Socket, Transport, Channel1),
|
|
||||||
emqx_stomp_channel:terminate(Reason, Channel1),
|
|
||||||
close_socket_ok(State)
|
|
||||||
catch
|
|
||||||
E : C : S ->
|
|
||||||
?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S})
|
|
||||||
end,
|
|
||||||
?tp(info, terminate, #{reason => Reason}),
|
|
||||||
maybe_raise_excption(Reason).
|
|
||||||
|
|
||||||
%% close socket, discard new state, always return ok.
|
|
||||||
close_socket_ok(State) ->
|
|
||||||
_ = close_socket(State),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%% tell truth about the original exception
|
|
||||||
maybe_raise_excption(#{exception := Exception,
|
|
||||||
context := Context,
|
|
||||||
stacktrace := Stacktrace
|
|
||||||
}) ->
|
|
||||||
erlang:raise(Exception, Context, Stacktrace);
|
|
||||||
maybe_raise_excption(Reason) ->
|
|
||||||
exit(Reason).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Sys callbacks
|
|
||||||
|
|
||||||
system_continue(Parent, _Debug, State) ->
|
|
||||||
?MODULE:recvloop(Parent, State).
|
|
||||||
|
|
||||||
system_terminate(Reason, _Parent, _Debug, State) ->
|
|
||||||
terminate(Reason, State).
|
|
||||||
|
|
||||||
system_code_change(State, _Mod, _OldVsn, _Extra) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
system_get_state(State) -> {ok, State}.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Handle call
|
|
||||||
|
|
||||||
handle_call(_From, info, State) ->
|
|
||||||
{reply, info(State), State};
|
|
||||||
|
|
||||||
handle_call(_From, stats, State) ->
|
|
||||||
{reply, stats(State), State};
|
|
||||||
|
|
||||||
%% TODO: How to set ratelimit ???
|
|
||||||
%%handle_call(_From, {ratelimit, Policy}, State = #state{channel = Channel}) ->
|
|
||||||
%% Zone = emqx_stomp_channel:info(zone, Channel),
|
|
||||||
%% Limiter = emqx_limiter:init(Zone, Policy),
|
|
||||||
%% {reply, ok, State#state{limiter = Limiter}};
|
|
||||||
|
|
||||||
handle_call(_From, Req, State = #state{channel = Channel}) ->
|
|
||||||
case emqx_stomp_channel:handle_call(Req, Channel) of
|
|
||||||
{reply, Reply, NChannel} ->
|
|
||||||
{reply, Reply, State#state{channel = NChannel}};
|
|
||||||
{shutdown, Reason, Reply, NChannel} ->
|
|
||||||
shutdown(Reason, Reply, State#state{channel = NChannel});
|
|
||||||
{shutdown, Reason, Reply, OutPacket, NChannel} ->
|
|
||||||
NState = State#state{channel = NChannel},
|
|
||||||
ok = handle_outgoing(OutPacket, NState),
|
|
||||||
shutdown(Reason, Reply, NState)
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Handle timeout
|
|
||||||
|
|
||||||
handle_timeout(_TRef, idle_timeout, State) ->
|
|
||||||
shutdown(idle_timeout, State);
|
|
||||||
|
|
||||||
handle_timeout(_TRef, limit_timeout, State) ->
|
|
||||||
NState = State#state{sockstate = idle,
|
|
||||||
limit_timer = undefined
|
|
||||||
},
|
|
||||||
handle_info(activate_socket, NState);
|
|
||||||
|
|
||||||
handle_timeout(_TRef, emit_stats, State = #state{channel = Channel,
|
|
||||||
transport = _Transport,
|
|
||||||
socket = _Socket}) ->
|
|
||||||
%emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
|
|
||||||
Ctx = emqx_stomp_channel:info(ctx, Channel),
|
|
||||||
ClientId = emqx_stomp_channel:info(clientid, Channel),
|
|
||||||
emqx_gateway_ctx:set_chan_stats(Ctx, ClientId, stats(State)),
|
|
||||||
{ok, State#state{stats_timer = undefined}};
|
|
||||||
|
|
||||||
%% Abstraction ???
|
|
||||||
%handle_timeout(TRef, keepalive, State = #state{transport = Transport,
|
|
||||||
% socket = Socket,
|
|
||||||
% channel = Channel})->
|
|
||||||
% case emqx_stomp_channel:info(conn_state, Channel) of
|
|
||||||
% disconnected -> {ok, State};
|
|
||||||
% _ ->
|
|
||||||
% case Transport:getstat(Socket, [recv_oct]) of
|
|
||||||
% {ok, [{recv_oct, RecvOct}]} ->
|
|
||||||
% handle_timeout(TRef, {keepalive, RecvOct}, State);
|
|
||||||
% {error, Reason} ->
|
|
||||||
% handle_info({sock_error, Reason}, State)
|
|
||||||
% end
|
|
||||||
% end;
|
|
||||||
|
|
||||||
handle_timeout(TRef, TMsg, State = #state{transport = Transport,
|
|
||||||
socket = Socket,
|
|
||||||
channel = Channel
|
|
||||||
})
|
|
||||||
when TMsg =:= incoming;
|
|
||||||
TMsg =:= outgoing ->
|
|
||||||
Stat = case TMsg of incoming -> recv_oct; _ -> send_oct end,
|
|
||||||
case emqx_stomp_channel:info(conn_state, Channel) of
|
|
||||||
disconnected -> {ok, State};
|
|
||||||
_ ->
|
|
||||||
case Transport:getstat(Socket, [Stat]) of
|
|
||||||
{ok, [{recv_oct, RecvOct}]} ->
|
|
||||||
handle_timeout(TRef, {incoming, RecvOct}, State);
|
|
||||||
{ok, [{send_oct, SendOct}]} ->
|
|
||||||
handle_timeout(TRef, {outgoing, SendOct}, State);
|
|
||||||
{error, Reason} ->
|
|
||||||
handle_info({sock_error, Reason}, State)
|
|
||||||
end
|
|
||||||
end;
|
|
||||||
|
|
||||||
handle_timeout(TRef, Msg, State) ->
|
|
||||||
with_channel(handle_timeout, [TRef, Msg], State).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Parse incoming data
|
|
||||||
|
|
||||||
parse_incoming(Data, State) ->
|
|
||||||
{Packets, NState} = parse_incoming(Data, [], State),
|
|
||||||
{ok, next_incoming_msgs(Packets), NState}.
|
|
||||||
|
|
||||||
parse_incoming(<<>>, Packets, State) ->
|
|
||||||
{Packets, State};
|
|
||||||
|
|
||||||
parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
|
|
||||||
try emqx_stomp_frame:parse(Data, ParseState) of
|
|
||||||
{more, NParseState} ->
|
|
||||||
{Packets, State#state{parse_state = NParseState}};
|
|
||||||
{ok, Packet, Rest, NParseState} ->
|
|
||||||
NState = State#state{parse_state = NParseState},
|
|
||||||
parse_incoming(Rest, [Packet|Packets], NState)
|
|
||||||
catch
|
|
||||||
error:Reason:Stk ->
|
|
||||||
?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data:~0p",
|
|
||||||
[Reason, Stk, Data]),
|
|
||||||
{[{frame_error, Reason}|Packets], State}
|
|
||||||
end.
|
|
||||||
|
|
||||||
next_incoming_msgs([Packet]) ->
|
|
||||||
{incoming, Packet};
|
|
||||||
next_incoming_msgs(Packets) ->
|
|
||||||
[{incoming, Packet} || Packet <- lists:reverse(Packets)].
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Handle incoming packet
|
|
||||||
|
|
||||||
handle_incoming(Packet, State) when is_record(Packet, stomp_frame) ->
|
|
||||||
ok = inc_incoming_stats(Packet),
|
|
||||||
?LOG(debug, "RECV ~s", [emqx_stomp_frame:format(Packet)]),
|
|
||||||
with_channel(handle_in, [Packet], State);
|
|
||||||
|
|
||||||
handle_incoming(FrameError, State) ->
|
|
||||||
with_channel(handle_in, [FrameError], State).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% With Channel
|
|
||||||
|
|
||||||
with_channel(Fun, Args, State = #state{channel = Channel}) ->
|
|
||||||
case erlang:apply(emqx_stomp_channel, Fun, Args ++ [Channel]) of
|
|
||||||
ok -> {ok, State};
|
|
||||||
{ok, NChannel} ->
|
|
||||||
{ok, State#state{channel = NChannel}};
|
|
||||||
{ok, Replies, NChannel} ->
|
|
||||||
{ok, next_msgs(Replies), State#state{channel = NChannel}};
|
|
||||||
{shutdown, Reason, NChannel} ->
|
|
||||||
shutdown(Reason, State#state{channel = NChannel});
|
|
||||||
{shutdown, Reason, Packet, NChannel} ->
|
|
||||||
NState = State#state{channel = NChannel},
|
|
||||||
ok = handle_outgoing(Packet, NState),
|
|
||||||
shutdown(Reason, NState)
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Handle outgoing packets
|
|
||||||
|
|
||||||
handle_outgoing(Packets, State) when is_list(Packets) ->
|
|
||||||
send(lists:map(serialize_and_inc_stats_fun(State), Packets), State);
|
|
||||||
|
|
||||||
handle_outgoing(Packet, State) ->
|
|
||||||
send((serialize_and_inc_stats_fun(State))(Packet), State).
|
|
||||||
|
|
||||||
serialize_and_inc_stats_fun(#state{serialize = Serialize, channel = Channel}) ->
|
|
||||||
Ctx = emqx_stomp_channel:info(ctx, Channel),
|
|
||||||
fun(Packet) ->
|
|
||||||
case emqx_stomp_frame:serialize_pkt(Packet, Serialize) of
|
|
||||||
<<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!",
|
|
||||||
[emqx_stomp_frame:format(Packet)]),
|
|
||||||
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped.too_large'),
|
|
||||||
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'),
|
|
||||||
<<>>;
|
|
||||||
Data -> ?LOG(debug, "SEND ~s", [emqx_stomp_frame:format(Packet)]),
|
|
||||||
ok = inc_outgoing_stats(Packet),
|
|
||||||
Data
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Send data
|
|
||||||
|
|
||||||
-spec(send(iodata(), state()) -> ok).
|
|
||||||
send(IoData, #state{transport = Transport, socket = Socket, channel = Channel}) ->
|
|
||||||
Ctx = emqx_stomp_channel:info(ctx, Channel),
|
|
||||||
Oct = iolist_size(IoData),
|
|
||||||
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.sent', Oct),
|
|
||||||
inc_counter(outgoing_bytes, Oct),
|
|
||||||
%emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
|
|
||||||
case Transport:async_send(Socket, IoData, [nosuspend]) of
|
|
||||||
ok -> ok;
|
|
||||||
Error = {error, _Reason} ->
|
|
||||||
%% Send an inet_reply to postpone handling the error
|
|
||||||
self() ! {inet_reply, Socket, Error},
|
|
||||||
ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Handle Info
|
|
||||||
|
|
||||||
handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
|
|
||||||
case activate_socket(State) of
|
|
||||||
{ok, NState = #state{sockstate = NewSst}} ->
|
|
||||||
case OldSst =/= NewSst of
|
|
||||||
true -> {ok, {event, NewSst}, NState};
|
|
||||||
false -> {ok, NState}
|
|
||||||
end;
|
|
||||||
{error, Reason} ->
|
|
||||||
handle_info({sock_error, Reason}, State)
|
|
||||||
end;
|
|
||||||
|
|
||||||
handle_info({sock_error, Reason}, State) ->
|
|
||||||
case Reason =/= closed andalso Reason =/= einval of
|
|
||||||
true -> ?LOG(warning, "socket_error: ~p", [Reason]);
|
|
||||||
false -> ok
|
|
||||||
end,
|
|
||||||
handle_info({sock_closed, Reason}, close_socket(State));
|
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
|
||||||
with_channel(handle_info, [Info], State).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Handle Info
|
|
||||||
|
|
||||||
handle_cast({async_set_socket_options, Opts},
|
|
||||||
State = #state{transport = Transport,
|
|
||||||
socket = Socket
|
|
||||||
}) ->
|
|
||||||
case Transport:setopts(Socket, Opts) of
|
|
||||||
ok -> ?tp(info, "custom_socket_options_successfully", #{opts => Opts});
|
|
||||||
Err -> ?tp(error, "failed_to_set_custom_socket_optionn", #{reason => Err})
|
|
||||||
end,
|
|
||||||
State;
|
|
||||||
handle_cast(Req, State) ->
|
|
||||||
?tp(error, "received_unknown_cast", #{cast => Req}),
|
|
||||||
State.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Ensure rate limit
|
|
||||||
|
|
||||||
ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
|
|
||||||
case ?ENABLED(Limiter) andalso emqx_limiter:check(Stats, Limiter) of
|
|
||||||
false -> State;
|
|
||||||
{ok, Limiter1} ->
|
|
||||||
State#state{limiter = Limiter1};
|
|
||||||
{pause, Time, Limiter1} ->
|
|
||||||
?LOG(warning, "Pause ~pms due to rate limit", [Time]),
|
|
||||||
TRef = emqx_misc:start_timer(Time, limit_timeout),
|
|
||||||
State#state{sockstate = blocked,
|
|
||||||
limiter = Limiter1,
|
|
||||||
limit_timer = TRef
|
|
||||||
}
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Run GC and Check OOM
|
|
||||||
|
|
||||||
run_gc(Stats, State = #state{gc_state = GcSt}) ->
|
|
||||||
case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
|
|
||||||
false -> State;
|
|
||||||
{_IsGC, GcSt1} ->
|
|
||||||
State#state{gc_state = GcSt1}
|
|
||||||
end.
|
|
||||||
|
|
||||||
check_oom(State = #state{oom_policy = OomPolicy}) ->
|
|
||||||
case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of
|
|
||||||
{shutdown, Reason} ->
|
|
||||||
%% triggers terminate/2 callback immediately
|
|
||||||
erlang:exit({shutdown, Reason});
|
|
||||||
_Other -> ok
|
|
||||||
end,
|
|
||||||
State.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Activate Socket
|
|
||||||
|
|
||||||
-compile({inline, [activate_socket/1]}).
|
|
||||||
activate_socket(State = #state{sockstate = closed}) ->
|
|
||||||
{ok, State};
|
|
||||||
activate_socket(State = #state{sockstate = blocked}) ->
|
|
||||||
{ok, State};
|
|
||||||
activate_socket(State = #state{transport = Transport,
|
|
||||||
socket = Socket,
|
|
||||||
active_n = N}) ->
|
|
||||||
case Transport:setopts(Socket, [{active, N}]) of
|
|
||||||
ok -> {ok, State#state{sockstate = running}};
|
|
||||||
Error -> Error
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Close Socket
|
|
||||||
|
|
||||||
close_socket(State = #state{sockstate = closed}) -> State;
|
|
||||||
close_socket(State = #state{transport = Transport, socket = Socket}) ->
|
|
||||||
ok = Transport:fast_close(Socket),
|
|
||||||
State#state{sockstate = closed}.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Inc incoming/outgoing stats
|
|
||||||
|
|
||||||
%% XXX: Other packet type?
|
|
||||||
inc_incoming_stats(_Packet) ->
|
|
||||||
inc_counter(recv_pkt, 1),
|
|
||||||
ok.
|
|
||||||
%case Type =:= ?CMD_SEND of
|
|
||||||
% true ->
|
|
||||||
% inc_counter(recv_msg, 1),
|
|
||||||
% inc_counter(incoming_pubs, 1);
|
|
||||||
% false ->
|
|
||||||
% ok
|
|
||||||
%end,
|
|
||||||
%emqx_metrics:inc_recv(Packet).
|
|
||||||
|
|
||||||
inc_outgoing_stats(_Packet) ->
|
|
||||||
inc_counter(send_pkt, 1),
|
|
||||||
ok.
|
|
||||||
%case Type =:= ?CMD_MESSAGE of
|
|
||||||
% true ->
|
|
||||||
% inc_counter(send_msg, 1),
|
|
||||||
% inc_counter(outgoing_pubs, 1);
|
|
||||||
% false ->
|
|
||||||
% ok
|
|
||||||
%end,
|
|
||||||
%emqx_metrics:inc_sent(Packet).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Helper functions
|
|
||||||
|
|
||||||
next_msgs(Packet) when is_record(Packet, stomp_frame) ->
|
|
||||||
{outgoing, Packet};
|
|
||||||
next_msgs(Event) when is_tuple(Event) ->
|
|
||||||
Event;
|
|
||||||
next_msgs(More) when is_list(More) ->
|
|
||||||
More.
|
|
||||||
|
|
||||||
shutdown(Reason, State) ->
|
|
||||||
stop({shutdown, Reason}, State).
|
|
||||||
|
|
||||||
shutdown(Reason, Reply, State) ->
|
|
||||||
stop({shutdown, Reason}, Reply, State).
|
|
||||||
|
|
||||||
stop(Reason, State) ->
|
|
||||||
{stop, Reason, State}.
|
|
||||||
|
|
||||||
stop(Reason, Reply, State) ->
|
|
||||||
{stop, Reason, Reply, State}.
|
|
||||||
|
|
||||||
inc_counter(Key, Inc) ->
|
|
||||||
_ = emqx_pd:inc_counter(Key, Inc),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% For CT tests
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
set_field(Name, Value, State) ->
|
|
||||||
Pos = emqx_misc:index_of(Name, record_info(fields, state)),
|
|
||||||
setelement(Pos+1, State, Value).
|
|
||||||
|
|
||||||
get_state(Pid) ->
|
|
||||||
State = sys:get_state(Pid),
|
|
||||||
maps:from_list(lists:zip(record_info(fields, state),
|
|
||||||
tl(tuple_to_list(State)))).
|
|
|
@ -68,6 +68,8 @@
|
||||||
|
|
||||||
-module(emqx_stomp_frame).
|
-module(emqx_stomp_frame).
|
||||||
|
|
||||||
|
-behavior(emqx_gateway_frame).
|
||||||
|
|
||||||
-include("src/stomp/include/emqx_stomp.hrl").
|
-include("src/stomp/include/emqx_stomp.hrl").
|
||||||
|
|
||||||
-export([ initial_parse_state/1
|
-export([ initial_parse_state/1
|
||||||
|
|
|
@ -113,8 +113,13 @@ start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
|
||||||
|
|
||||||
start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
|
start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
|
||||||
Name = name(InstaId, Type),
|
Name = name(InstaId, Type),
|
||||||
|
NCfg = Cfg#{
|
||||||
|
ctx => Ctx,
|
||||||
|
frame_mod => emqx_stomp_frame,
|
||||||
|
chann_mod => emqx_stomp_channel
|
||||||
|
},
|
||||||
esockd:open(Name, ListenOn, merge_default(SocketOpts),
|
esockd:open(Name, ListenOn, merge_default(SocketOpts),
|
||||||
{emqx_stomp_connection, start_link, [Cfg#{ctx => Ctx}]}).
|
{emqx_gateway_conn, start_link, [NCfg]}).
|
||||||
|
|
||||||
name(InstaId, Type) ->
|
name(InstaId, Type) ->
|
||||||
list_to_atom(lists:concat([InstaId, ":", Type])).
|
list_to_atom(lists:concat([InstaId, ":", Type])).
|
||||||
|
|
Loading…
Reference in New Issue