diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_channel.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_channel.erl new file mode 100644 index 000000000..be385e720 --- /dev/null +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_channel.erl @@ -0,0 +1,97 @@ +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc The Gateway channel behavior +%% +%% This module does not export any functions at the moment. +%% It is only used to standardize the implement of emqx_foo_channel.erl +%% module if it integrated with emqx_gateway_conn module +-module(emqx_gateway_channel). + +-type channel() :: any(). + +%%-------------------------------------------------------------------- +%% Info & Stats + +%% @doc Get the channel detailed infomation. +-callback info(channel()) -> emqx_types:infos(). + +-callback info(Key :: atom() | [atom()], channel()) -> any(). + +%% @doc Get the channel statistic items +-callback stats(channel()) -> emqx_types:stats(). + +%%-------------------------------------------------------------------- +%% Init + +%% @doc Initialize the channel state +-callback init(emqx_types:conniinfo(), map()) -> channel(). + +%%-------------------------------------------------------------------- +%% Handles + +-type conn_state() :: idle | connecting | connected | disconnected | atom(). + +-type reply() :: {outgoing, emqx_gateway_frame:packet()} + | {outgoing, [emqx_gateway_frame:packet()]} + | {event, conn_state() | updated} + | {close, Reason :: atom()}. + +-type replies() :: emqx_gateway_frame:packet() | reply() | [reply()]. + +%% @doc Handle the incoming frame +-callback handle_in(emqx_gateway_frame:frame() | {frame_error, any()}, + channel()) + -> {ok, channel()} + | {ok, replies(), channel()} + | {shutdown, Reason :: any(), channel()} + | {shutdown, Reason :: any(), replies(), channel()}. + +%% @doc Handle the outgoing messages dispatched from PUB/SUB system +-callback handle_deliver(list(emqx_types:deliver()), channel()) + -> {ok, channel()} + | {ok, replies(), channel()}. + +%% @doc Handle the timeout event +-callback handle_timeout(reference(), Msg :: any(), channel()) + -> {ok, channel()} + | {ok, replies(), channel()} + | {shutdown, Reason :: any(), channel()}. + +%% @doc Handle the custom gen_server:call/2 for its connection process +-callback handle_call(Req :: any(), channel()) + -> {reply, Reply :: any(), channel()} + | {shutdown, Reason :: any(), Reply :: any(), channel()} + | {shutdown, Reason :: any(), Reply :: any(), + emqx_gateway_frame:frame(), channel()}. + +%% @doc Handle the custom gen_server:cast/2 for its connection process +-callback handle_cast(Req :: any(), channel()) + -> ok + | {ok, channel()} + | {shutdown, Reason :: any(), channel()}. + +%% @doc Handle the custom process messages for its connection process +-callback handle_info(Info :: any(), channel()) + -> ok + | {ok, channel()} + | {shutdown, Reason :: any(), channel()}. + +%%-------------------------------------------------------------------- +%% Terminate + +%% @doc The callback for process terminated +-callback terminate(any(), channel()) -> ok. + diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 7392f0b20..5a2fc0dc8 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -15,8 +15,827 @@ %%-------------------------------------------------------------------- %% @doc The behavior abstrat for TCP based gateway conn -%% -module(emqx_gateway_conn). -%% TODO: Gateway v0.2 +-include_lib("emqx/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-logger_header("[GW-Conn]"). + +%% API +-export([ start_link/3 + , stop/1 + ]). + +-export([ info/1 + , stats/1 + ]). + +-export([ call/2 + , call/3 + , cast/2 + ]). + +%% Callback +-export([init/6]). + +%% Sys callbacks +-export([ system_continue/3 + , system_terminate/4 + , system_code_change/4 + , system_get_state/1 + ]). + +%% Internal callback +-export([wakeup_from_hib/2, recvloop/2]). + + +-record(state, { + %% TCP/SSL/UDP/DTLS Wrapped Socket + socket :: {esockd_transport, esockd:socket()} | {udp, _, _}, + %% Peername of the connection + peername :: emqx_types:peername(), + %% Sockname of the connection + sockname :: emqx_types:peername(), + %% Sock State + sockstate :: emqx_types:sockstate(), + %% The {active, N} option + active_n :: pos_integer(), + %% Limiter + limiter :: maybe(emqx_limiter:limiter()), + %% Limit Timer + limit_timer :: maybe(reference()), + %% Parse State + parse_state :: emqx_gateway_frame:parse_state(), + %% Serialize options + serialize :: emqx_gateway_frame:serialize_opts(), + %% Channel State + channel :: emqx_gateway_channel:channel(), + %% GC State + gc_state :: maybe(emqx_gc:gc_state()), + %% Stats Timer + stats_timer :: disabled | maybe(reference()), + %% Idle Timeout + idle_timeout :: integer(), + %% Idle Timer + idle_timer :: maybe(reference()), + %% OOM Policy + oom_policy :: maybe(emqx_types:oom_policy()), + %% Frame Module + frame_mod :: atom(), + %% Channel Module + chann_mod :: atom() + }). + +-type(state() :: #state{}). + +-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]). +-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). +-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). + +-define(ENABLED(X), (X =/= undefined)). + +-dialyzer({nowarn_function, + [ system_terminate/4 + , handle_call/3 + , handle_msg/2 + , shutdown/3 + , stop/3 + , parse_incoming/3 + ]}). + +%% udp +start_link(Socket = {udp, _SockPid, _Sock}, Peername, Options) -> + Args = [self(), Socket, Peername, Options] ++ callback_modules(Options), + {ok, proc_lib:spawn_link(?MODULE, init, Args)}; + +%% tcp/ssl/dtls +start_link(esockd_transport, Sock, Options) -> + Socket = {esockd_transport, Sock}, + Args = [self(), Socket, undefined, Options] ++ callback_modules(Options), + {ok, proc_lib:spawn_link(?MODULE, init, Args)}. + +callback_modules(Options) -> + [maps:get(frame_mod, Options), + maps:get(chann_mod, Options)]. + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +%% @doc Get infos of the connection/channel. +-spec(info(pid()|state()) -> emqx_types:infos()). +info(CPid) when is_pid(CPid) -> + call(CPid, info); +info(State = #state{chann_mod = ChannMod, channel = Channel}) -> + ChanInfo = ChannMod:info(Channel), + SockInfo = maps:from_list( + info(?INFO_KEYS, State)), + ChanInfo#{sockinfo => SockInfo}. + +info(Keys, State) when is_list(Keys) -> + [{Key, info(Key, State)} || Key <- Keys]; +info(socktype, #state{socket = Socket}) -> + esockd_type(Socket); +info(peername, #state{peername = Peername}) -> + Peername; +info(sockname, #state{sockname = Sockname}) -> + Sockname; +info(sockstate, #state{sockstate = SockSt}) -> + SockSt; +info(active_n, #state{active_n = ActiveN}) -> + ActiveN. + +-spec(stats(pid()|state()) -> emqx_types:stats()). +stats(CPid) when is_pid(CPid) -> + call(CPid, stats); +stats(#state{socket = Socket, + chann_mod = ChannMod, + channel = Channel}) -> + SockStats = case esockd_getstat(Socket, ?SOCK_STATS) of + {ok, Ss} -> Ss; + {error, _} -> [] + end, + ConnStats = emqx_pd:get_counters(?CONN_STATS), + ChanStats = ChannMod:stats(Channel), + ProcStats = emqx_misc:proc_stats(), + lists:append([SockStats, ConnStats, ChanStats, ProcStats]). + +call(Pid, Req) -> + call(Pid, Req, infinity). + +call(Pid, Req, Timeout) -> + gen_server:call(Pid, Req, Timeout). + +cast(Pid, Req) -> + gen_server:cast(Pid, Req). + +stop(Pid) -> + gen_server:stop(Pid). + +%%-------------------------------------------------------------------- +%% Wrapped funcs +%%-------------------------------------------------------------------- + +esockd_peername({udp, _SockPid, _Sock}, Peername) -> + Peername; +esockd_peername({esockd_transport, Sock}, _Peername) -> + {ok, Peername} = esockd_transport:ensure_ok_or_exit(peername, [Sock]), + Peername. + +esockd_wait(Socket = {udp, _SockPid, _Sock}) -> + {ok, Socket}; +esockd_wait({esockd_transport, Sock}) -> + case esockd_transport:wait(Sock) of + {ok, NSock} -> {ok, {esockd_transport, NSock}}; + R = {error, _} -> R + end. + +esockd_close({udp, _SockPid, _Sock}) -> + %% nothing to do for udp socket + %%gen_udp:close(Sock); + ok; +esockd_close({esockd_transport, Sock}) -> + esockd_transport:fast_close(Sock). + +esockd_ensure_ok_or_exit(peercert, {udp, _SockPid, _Sock}) -> + nossl; +esockd_ensure_ok_or_exit(Fun, {udp, _SockPid, Sock}) -> + esockd_transport:ensure_ok_or_exit(Fun, [Sock]); +esockd_ensure_ok_or_exit(Fun, {esockd_transport, Socket}) -> + esockd_transport:ensure_ok_or_exit(Fun, [Socket]). + +esockd_type({udp, _, _}) -> + udp; +esockd_type({esockd_transport, Socket}) -> + esockd_transport:type(Socket). + +esockd_setopts({udp, _, _}, _) -> + ok; +esockd_setopts({esockd_transport, Socket}, Opts) -> + %% FIXME: DTLS works?? + esockd_transport:setopts(Socket, Opts). + +esockd_getstat({udp, _SockPid, Sock}, Stats) -> + inet:getstat(Sock, Stats); +esockd_getstat({esockd_transport, Sock}, Stats) -> + esockd_transport:getstat(Sock, Stats). + +esockd_send(Data, #state{socket = {udp, _SockPid, Sock}, + peername = {Ip, Port}}) -> + gen_udp:send(Sock, Ip, Port, Data); +esockd_send(Data, #state{socket = {esockd_transport, Sock}}) -> + esockd_transport:async_send(Sock, Data). + +%%-------------------------------------------------------------------- +%% callbacks +%%-------------------------------------------------------------------- + +init(Parent, WrappedSock, Peername0, Options, FrameMod, ChannMod) -> + case esockd_wait(WrappedSock) of + {ok, NWrappedSock} -> + Peername = esockd_peername(NWrappedSock, Peername0), + run_loop(Parent, init_state(NWrappedSock, Peername, + Options, FrameMod, ChannMod)); + {error, Reason} -> + ok = esockd_close(WrappedSock), + exit_on_sock_error(Reason) + end. + +init_state(WrappedSock, Peername, Options, FrameMod, ChannMod) -> + {ok, Sockname} = esockd_ensure_ok_or_exit(sockname, WrappedSock), + Peercert = esockd_ensure_ok_or_exit(peercert, WrappedSock), + ConnInfo = #{socktype => esockd_type(WrappedSock), + peername => Peername, + sockname => Sockname, + peercert => Peercert, + conn_mod => ?MODULE + }, + ActiveN = emqx_gateway_utils:active_n(Options), + %% FIXME: + %%Limiter = emqx_limiter:init(Options), + Limiter = undefined, + FrameOpts = emqx_gateway_utils:frame_options(Options), + ParseState = FrameMod:initial_parse_state(FrameOpts), + Serialize = FrameMod:serialize_opts(), + Channel = ChannMod:init(ConnInfo, Options), + GcState = emqx_gateway_utils:init_gc_state(Options), + StatsTimer = emqx_gateway_utils:stats_timer(Options), + IdleTimeout = emqx_gateway_utils:idle_timeout(Options), + OomPolicy = emqx_gateway_utils:oom_policy(Options), + IdleTimer = emqx_misc:start_timer(IdleTimeout, idle_timeout), + #state{socket = WrappedSock, + peername = Peername, + sockname = Sockname, + sockstate = idle, + active_n = ActiveN, + limiter = Limiter, + parse_state = ParseState, + serialize = Serialize, + channel = Channel, + gc_state = GcState, + stats_timer = StatsTimer, + idle_timeout = IdleTimeout, + idle_timer = IdleTimer, + oom_policy = OomPolicy, + frame_mod = FrameMod, + chann_mod = ChannMod + }. + +run_loop(Parent, State = #state{socket = Socket, + peername = Peername, + oom_policy = OomPolicy + }) -> + emqx_logger:set_metadata_peername(esockd:format(Peername)), + _ = emqx_misc:tune_heap_size(OomPolicy), + case activate_socket(State) of + {ok, NState} -> + hibernate(Parent, NState); + {error, Reason} -> + ok = esockd_close(Socket), + exit_on_sock_error(Reason) + end. + +-spec exit_on_sock_error(atom()) -> no_return(). +exit_on_sock_error(Reason) when Reason =:= einval; + Reason =:= enotconn; + Reason =:= closed -> + erlang:exit(normal); +exit_on_sock_error(timeout) -> + erlang:exit({shutdown, ssl_upgrade_timeout}); +exit_on_sock_error(Reason) -> + erlang:exit({shutdown, Reason}). + +%%-------------------------------------------------------------------- +%% Recv Loop + +recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) -> + receive + Msg -> + handle_recv(Msg, Parent, State) + after + IdleTimeout + 100 -> + hibernate(Parent, cancel_stats_timer(State)) + end. + +handle_recv({system, From, Request}, Parent, State) -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State); +handle_recv({'EXIT', Parent, Reason}, Parent, State) -> + %% FIXME: it's not trapping exit, should never receive an EXIT + terminate(Reason, State); +handle_recv(Msg, Parent, State = #state{idle_timeout = IdleTimeout}) -> + case process_msg([Msg], ensure_stats_timer(IdleTimeout, State)) of + {ok, NewState} -> + ?MODULE:recvloop(Parent, NewState); + {stop, Reason, NewSate} -> + terminate(Reason, NewSate) + end. + +hibernate(Parent, State) -> + proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State]). + +%% Maybe do something here later. +wakeup_from_hib(Parent, State) -> + ?MODULE:recvloop(Parent, State). + +%%-------------------------------------------------------------------- +%% Ensure/cancel stats timer + +ensure_stats_timer(Timeout, State = #state{stats_timer = undefined}) -> + State#state{stats_timer = emqx_misc:start_timer(Timeout, emit_stats)}; +ensure_stats_timer(_Timeout, State) -> State. + +cancel_stats_timer(State = #state{stats_timer = TRef}) + when is_reference(TRef) -> + ok = emqx_misc:cancel_timer(TRef), + State#state{stats_timer = undefined}; +cancel_stats_timer(State) -> State. + +%%-------------------------------------------------------------------- +%% Process next Msg + +process_msg([], State) -> + {ok, State}; +process_msg([Msg|More], State) -> + try + case handle_msg(Msg, State) of + ok -> + process_msg(More, State); + {ok, NState} -> + process_msg(More, NState); + {ok, Msgs, NState} -> + process_msg(append_msg(More, Msgs), NState); + {stop, Reason, NState} -> + {stop, Reason, NState} + end + catch + exit : normal -> + {stop, normal, State}; + exit : shutdown -> + {stop, shutdown, State}; + exit : {shutdown, _} = Shutdown -> + {stop, Shutdown, State}; + Exception : Context : Stack -> + {stop, #{exception => Exception, + context => Context, + stacktrace => Stack}, State} + end. + +append_msg([], Msgs) when is_list(Msgs) -> + Msgs; +append_msg([], Msg) -> [Msg]; +append_msg(Q, Msgs) when is_list(Msgs) -> + lists:append(Q, Msgs); +append_msg(Q, Msg) -> + lists:append(Q, [Msg]). + +%%-------------------------------------------------------------------- +%% Handle a Msg + +handle_msg({'$gen_call', From, Req}, State) -> + case handle_call(From, Req, State) of + {reply, Reply, NState} -> + gen_server:reply(From, Reply), + {ok, NState}; + {reply, Reply, Msgs, NState} -> + gen_server:reply(From, Reply), + {ok, next_msgs(Msgs), NState}; + {stop, Reason, Reply, NState} -> + gen_server:reply(From, Reply), + stop(Reason, NState) + end; + +handle_msg({'$gen_cast', Req}, State) -> + with_channel(handle_cast, [Req], State); + +handle_msg({datagram, _SockPid, Data}, State) -> + parse_incoming(Data, State); + +handle_msg({Inet, _Sock, Data}, State) + when Inet == tcp; + Inet == ssl -> + parse_incoming(Data, State); + +handle_msg({incoming, Packet}, + State = #state{idle_timer = IdleTimer}) -> + IdleTimer /= undefined andalso + emqx_misc:cancel_timer(IdleTimer), + NState = State#state{idle_timer = undefined}, + handle_incoming(Packet, NState); + +handle_msg({outgoing, Data}, State) -> + handle_outgoing(Data, State); + +handle_msg({Error, _Sock, Reason}, State) + when Error == tcp_error; Error == ssl_error -> + handle_info({sock_error, Reason}, State); + +handle_msg({Closed, _Sock}, State) + when Closed == tcp_closed; Closed == ssl_closed -> + handle_info({sock_closed, Closed}, close_socket(State)); + +%% TODO: udp_passive??? +handle_msg({Passive, _Sock}, State) + when Passive == tcp_passive; Passive == ssl_passive -> + %% In Stats + Bytes = emqx_pd:reset_counter(incoming_bytes), + Pubs = emqx_pd:reset_counter(incoming_pkt), + InStats = #{cnt => Pubs, oct => Bytes}, + %% Ensure Rate Limit + NState = ensure_rate_limit(InStats, State), + %% Run GC and Check OOM + NState1 = check_oom(run_gc(InStats, NState)), + handle_info(activate_socket, NState1); + +handle_msg(Deliver = {deliver, _Topic, _Msg}, + State = #state{active_n = ActiveN}) -> + Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], + with_channel(handle_deliver, [Delivers], State); + +%% Something sent +%% TODO: Who will deliver this message? +handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) -> + case emqx_pd:get_counter(outgoing_pkt) > ActiveN of + true -> + Pubs = emqx_pd:reset_counter(outgoing_pkt), + Bytes = emqx_pd:reset_counter(outgoing_bytes), + OutStats = #{cnt => Pubs, oct => Bytes}, + {ok, check_oom(run_gc(OutStats, State))}; + false -> ok + end; + +handle_msg({inet_reply, _Sock, {error, Reason}}, State) -> + handle_info({sock_error, Reason}, State); + +handle_msg({close, Reason}, State) -> + ?LOG(debug, "Force to close the socket due to ~p", [Reason]), + handle_info({sock_closed, Reason}, close_socket(State)); + +handle_msg({event, connected}, State = #state{ + chann_mod = ChannMod, + channel = Channel}) -> + Ctx = ChannMod:info(ctx, Channel), + ClientId = ChannMod:info(clientid, Channel), + emqx_gateway_ctx:insert_channel_info( + Ctx, + ClientId, + info(State), + stats(State) + ); + +handle_msg({event, disconnected}, State = #state{ + chann_mod = ChannMod, + channel = Channel}) -> + Ctx = ChannMod:info(ctx, Channel), + ClientId = ChannMod:info(clientid, Channel), + emqx_gateway_ctx:set_chan_info(Ctx, ClientId, info(State)), + emqx_gateway_ctx:connection_closed(Ctx, ClientId), + {ok, State}; + +handle_msg({event, _Other}, State = #state{ + chann_mod = ChannMod, + channel = Channel}) -> + Ctx = ChannMod:info(ctx, Channel), + ClientId = ChannMod:info(clientid, Channel), + emqx_gateway_ctx:set_chan_info(Ctx, ClientId, info(State)), + emqx_gateway_ctx:set_chan_stats(Ctx, ClientId, stats(State)), + {ok, State}; + +handle_msg({timeout, TRef, TMsg}, State) -> + handle_timeout(TRef, TMsg, State); + +handle_msg(Shutdown = {shutdown, _Reason}, State) -> + stop(Shutdown, State); + +handle_msg(Msg, State) -> + handle_info(Msg, State). + +%%-------------------------------------------------------------------- +%% Terminate + +-spec terminate(atom(), state()) -> no_return(). +terminate(Reason, State = #state{ + chann_mod = ChannMod, + channel = Channel}) -> + ?LOG(debug, "Terminated due to ~p", [Reason]), + _ = ChannMod:terminate(Reason, Channel), + _ = close_socket(State), + exit(Reason). + +%%-------------------------------------------------------------------- +%% Sys callbacks + +system_continue(Parent, _Debug, State) -> + ?MODULE:recvloop(Parent, State). + +system_terminate(Reason, _Parent, _Debug, State) -> + terminate(Reason, State). + +system_code_change(State, _Mod, _OldVsn, _Extra) -> + {ok, State}. + +system_get_state(State) -> {ok, State}. + +%%-------------------------------------------------------------------- +%% Handle call + +handle_call(_From, info, State) -> + {reply, info(State), State}; + +handle_call(_From, stats, State) -> + {reply, stats(State), State}; + +handle_call(_From, Req, State = #state{ + chann_mod = ChannMod, + channel = Channel}) -> + case ChannMod:handle_call(Req, Channel) of + {reply, Reply, NChannel} -> + {reply, Reply, State#state{channel = NChannel}}; + {reply, Reply, Replies, NChannel} -> + {reply, Reply, Replies, State#state{channel = NChannel}}; + {shutdown, Reason, Reply, NChannel} -> + shutdown(Reason, Reply, State#state{channel = NChannel}) + end. + +%%-------------------------------------------------------------------- +%% Handle timeout + +handle_timeout(_TRef, idle_timeout, State) -> + shutdown(idle_timeout, State); + +handle_timeout(_TRef, limit_timeout, State) -> + NState = State#state{sockstate = idle, + limit_timer = undefined + }, + handle_info(activate_socket, NState); +handle_timeout(TRef, keepalive, State = #state{ + chann_mod = ChannMod, + socket = Socket, + channel = Channel})-> + case ChannMod:info(conn_state, Channel) of + disconnected -> {ok, State}; + _ -> + case esockd_getstat(Socket, [recv_oct, send_oct]) of + {ok, [{recv_oct, RecvOct}, {send_oct, SendOct}]} -> + handle_timeout(TRef, {keepalive, {RecvOct, SendOct}}, State); + {error, Reason} -> + handle_info({sock_error, Reason}, State) + end + end; +handle_timeout(_TRef, emit_stats, State = + #state{chann_mod = ChannMod, channel = Channel}) -> + Ctx = ChannMod:info(ctx, Channel), + ClientId = ChannMod:info(clientid, Channel), + emqx_gateway_ctx:set_chan_stats(Ctx, ClientId, stats(State)), + {ok, State#state{stats_timer = undefined}}; + +handle_timeout(TRef, Msg, State) -> + with_channel(handle_timeout, [TRef, Msg], State). + +%%-------------------------------------------------------------------- +%% Parse incoming data + +parse_incoming(Data, State = #state{ + chann_mod = ChannMod, + channel = Channel}) -> + ?LOG(debug, "RECV ~0p", [Data]), + Oct = iolist_size(Data), + inc_counter(incoming_bytes, Oct), + Ctx = ChannMod:info(ctx, Channel), + ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.received', Oct), + {Packets, NState} = parse_incoming(Data, [], State), + {ok, next_incoming_msgs(Packets), NState}. + +parse_incoming(<<>>, Packets, State) -> + {Packets, State}; + +parse_incoming(Data, Packets, + State = #state{ + frame_mod = FrameMod, + parse_state = ParseState}) -> + try FrameMod:parse(Data, ParseState) of + {more, NParseState} -> + {Packets, State#state{parse_state = NParseState}}; + {ok, Packet, Rest, NParseState} -> + NState = State#state{parse_state = NParseState}, + parse_incoming(Rest, [Packet|Packets], NState) + catch + error:Reason:Stk -> + ?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data:~0p", + [Reason, Stk, Data]), + {[{frame_error, Reason}|Packets], State} + end. + +next_incoming_msgs([Packet]) -> + {incoming, Packet}; +next_incoming_msgs(Packets) -> + [{incoming, Packet} || Packet <- lists:reverse(Packets)]. + +%%-------------------------------------------------------------------- +%% Handle incoming packet + +handle_incoming(Packet, State = #state{frame_mod = FrameMod}) -> + ok = inc_incoming_stats(Packet), + ?LOG(debug, "RECV ~s", [FrameMod:format(Packet)]), + with_channel(handle_in, [Packet], State). + +%%-------------------------------------------------------------------- +%% With Channel + +with_channel(Fun, Args, State = #state{ + chann_mod = ChannMod, + channel = Channel}) -> + case erlang:apply(ChannMod, Fun, Args ++ [Channel]) of + ok -> {ok, State}; + {ok, NChannel} -> + {ok, State#state{channel = NChannel}}; + {ok, Replies, NChannel} -> + {ok, next_msgs(Replies), State#state{channel = NChannel}}; + {shutdown, Reason, NChannel} -> + shutdown(Reason, State#state{channel = NChannel}); + {shutdown, Reason, Packet, NChannel} -> + NState = State#state{channel = NChannel}, + ok = handle_outgoing(Packet, NState), + shutdown(Reason, NState) + end. + +%%-------------------------------------------------------------------- +%% Handle outgoing packets + +handle_outgoing(Packets, State) when is_list(Packets) -> + send(lists:map(serialize_and_inc_stats_fun(State), Packets), State); + +handle_outgoing(Packet, State) -> + send((serialize_and_inc_stats_fun(State))(Packet), State). + +serialize_and_inc_stats_fun(#state{ + frame_mod = FrameMod, + chann_mod = ChannMod, + serialize = Serialize, + channel = Channel}) -> + Ctx = ChannMod:info(ctx, Channel), + fun(Packet) -> + case FrameMod:serialize_pkt(Packet, Serialize) of + <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!", + [FrameMod:format(Packet)]), + ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped.too_large'), + ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'), + <<>>; + Data -> ?LOG(debug, "SEND ~s", [FrameMod:format(Packet)]), + ok = inc_outgoing_stats(Packet), + Data + end + end. + +%%-------------------------------------------------------------------- +%% Send data + +-spec(send(iodata(), state()) -> ok). +send(IoData, State = #state{socket = Socket, + chann_mod = ChannMod, + channel = Channel}) -> + Ctx = ChannMod:info(ctx, Channel), + Oct = iolist_size(IoData), + ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.sent', Oct), + inc_counter(outgoing_bytes, Oct), + case esockd_send(IoData, State) of + ok -> ok; + Error = {error, _Reason} -> + %% Send an inet_reply to postpone handling the error + self() ! {inet_reply, Socket, Error}, + ok + end. + +%%-------------------------------------------------------------------- +%% Handle Info + +handle_info(activate_socket, State = #state{sockstate = OldSst}) -> + case activate_socket(State) of + {ok, NState = #state{sockstate = NewSst}} -> + if OldSst =/= NewSst -> + {ok, {event, NewSst}, NState}; + true -> {ok, NState} + end; + {error, Reason} -> + handle_info({sock_error, Reason}, State) + end; + +handle_info({sock_error, Reason}, State) -> + ?LOG(debug, "Socket error: ~p", [Reason]), + handle_info({sock_closed, Reason}, close_socket(State)); + +handle_info(Info, State) -> + with_channel(handle_info, [Info], State). + +%%-------------------------------------------------------------------- +%% Ensure rate limit + +ensure_rate_limit(Stats, State = #state{limiter = Limiter}) -> + case ?ENABLED(Limiter) andalso emqx_limiter:check(Stats, Limiter) of + false -> State; + {ok, Limiter1} -> + State#state{limiter = Limiter1}; + {pause, Time, Limiter1} -> + ?LOG(warning, "Pause ~pms due to rate limit", [Time]), + TRef = emqx_misc:start_timer(Time, limit_timeout), + State#state{sockstate = blocked, + limiter = Limiter1, + limit_timer = TRef + } + end. + +%%-------------------------------------------------------------------- +%% Run GC and Check OOM + +run_gc(Stats, State = #state{gc_state = GcSt}) -> + case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of + false -> State; + {_IsGC, GcSt1} -> + State#state{gc_state = GcSt1} + end. + +check_oom(State = #state{oom_policy = OomPolicy}) -> + case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of + {shutdown, Reason} -> + %% triggers terminate/2 callback immediately + erlang:exit({shutdown, Reason}); + _Other -> ok + end, + State. + +%%-------------------------------------------------------------------- +%% Activate Socket + +activate_socket(State = #state{sockstate = closed}) -> + {ok, State}; +activate_socket(State = #state{sockstate = blocked}) -> + {ok, State}; +activate_socket(State = #state{socket = Socket, + active_n = N}) -> + %% FIXME: Works on dtls/udp ??? + %% How to hanlde buffer? + case esockd_setopts(Socket, [{active, N}]) of + ok -> {ok, State#state{sockstate = running}}; + Error -> Error + end. + +%%-------------------------------------------------------------------- +%% Close Socket + +close_socket(State = #state{sockstate = closed}) -> State; +close_socket(State = #state{socket = Socket}) -> + ok = esockd_close(Socket), + State#state{sockstate = closed}. + +%%-------------------------------------------------------------------- +%% Inc incoming/outgoing stats + +%% XXX: How to stats? +inc_incoming_stats(_Packet) -> + inc_counter(recv_pkt, 1), + ok. + %case Type =:= ?CMD_SEND of + % true -> + % inc_counter(recv_msg, 1), + % inc_counter(incoming_pubs, 1); + % false -> + % ok + %end, + %emqx_metrics:inc_recv(Packet). + +inc_outgoing_stats(_Packet) -> + inc_counter(send_pkt, 1), + ok. + %case Type =:= ?CMD_MESSAGE of + % true -> + % inc_counter(send_msg, 1), + % inc_counter(outgoing_pubs, 1); + % false -> + % ok + %end, + %emqx_metrics:inc_sent(Packet). + +%%-------------------------------------------------------------------- +%% Helper functions + +-compile({inline, [next_msgs/1]}). +next_msgs(Event) when is_tuple(Event) -> + Event; +next_msgs(More) when is_list(More) -> + More. + +shutdown(Reason, State) -> + stop({shutdown, Reason}, State). + +shutdown(Reason, Reply, State) -> + stop({shutdown, Reason}, Reply, State). + +stop(Reason, State) -> + {stop, Reason, State}. + +stop(Reason, Reply, State) -> + {stop, Reason, Reply, State}. + +inc_counter(Name, Value) -> + _ = emqx_pd:inc_counter(Name, Value), + ok. diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_frame.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_frame.erl new file mode 100644 index 000000000..4cce837aa --- /dev/null +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_frame.erl @@ -0,0 +1,48 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc The Gateway frame behavior +%% +%% This module does not export any functions at the moment. +%% It is only used to standardize the implement of emqx_foo_frame.erl +%% module if it integrated with emqx_gateway_conn module +%% +-module(emqx_gateway_frame). + + +-type parse_state() :: map(). + +-type frame() :: any(). + +-type parse_result() :: {ok, frame(), + Rest :: binary(), NewState :: parse_state()} + | {more, NewState :: parse_state()}. + +-type serialize_options() :: map(). + +%% Callbacks + +%% @doc Initial the frame parser states +-callback initial_parse_state(map()) -> parse_state(). + +-callback serialize_opts() -> serialize_options(). + +-callback serialize_pkt(Frame :: any(), serialize_options()) -> iodata(). + +-callback parse(binary(), parse_state()) -> parse_result(). + +-callback format(Frame :: any()) -> string(). + diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index b3292c895..04eca9e5d 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -43,6 +43,7 @@ -define(ACTIVE_N, 100). -define(DEFAULT_IDLE_TIMEOUT, 30000). +-define(DEFAULT_GC_OPTS, #{count => 1000, bytes => 1024*1024}). -define(DEFAULT_OOM_POLICY, #{max_heap_size => 4194304, message_queue_len => 32000}). @@ -158,7 +159,7 @@ init_gc_state(Options) -> -spec force_gc_policy(map()) -> emqx_gc:opts() | undefined. force_gc_policy(Options) -> - maps:get(force_gc_policy, Options, undefined). + maps:get(force_gc_policy, Options, ?DEFAULT_GC_OPTS). -spec oom_policy(map()) -> emqx_types:oom_policy(). oom_policy(Options) -> diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index e4875bb36..c83a65e64 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -16,6 +16,8 @@ -module(emqx_sn_channel). +-behavior(emqx_gateway_channel). + -include("src/mqttsn/include/emqx_sn.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). @@ -1270,7 +1272,7 @@ handle_timeout(_TRef, {keepalive, _StatVal}, when ConnState =:= disconnected; ConnState =:= asleep -> {ok, Channel}; -handle_timeout(_TRef, {keepalive, StatVal}, +handle_timeout(_TRef, {keepalive, {StatVal, _}}, Channel = #channel{keepalive = Keepalive}) -> case emqx_keepalive:check(StatVal, Keepalive) of {ok, NKeepalive} -> @@ -1422,5 +1424,3 @@ returncode_name(?SN_RC_NOT_AUTHORIZE) -> rejected_not_authorize; returncode_name(?SN_RC_FAILED_SESSION) -> rejected_failed_open_session; returncode_name(?SN_EXCEED_LIMITATION) -> rejected_exceed_limitation; returncode_name(_) -> accepted. - - diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_conn.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_conn.erl deleted file mode 100644 index 9fac0159d..000000000 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_conn.erl +++ /dev/null @@ -1,807 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% @doc MQTT-SN Connection process --module(emqx_sn_conn). - --include_lib("emqx/include/types.hrl"). --include_lib("emqx/include/logger.hrl"). - --logger_header("[SN-Conn]"). - -%% API --export([ start_link/3 - , stop/1 - ]). - --export([ info/1 - , stats/1 - ]). - --export([ call/2 - , call/3 - , cast/2 - ]). - -%% Callback --export([init/4]). - -%% Sys callbacks --export([ system_continue/3 - , system_terminate/4 - , system_code_change/4 - , system_get_state/1 - ]). - -%% Internal callback --export([wakeup_from_hib/2, recvloop/2]). - --import(emqx_misc, [start_timer/2]). - --record(state, { - %% TCP/SSL/UDP/DTLS Wrapped Socket - socket :: {esockd_transport, esockd:socket()} | {udp, _, _}, - %% Peername of the connection - peername :: emqx_types:peername(), - %% Sockname of the connection - sockname :: emqx_types:peername(), - %% Sock State - sockstate :: emqx_types:sockstate(), - %% The {active, N} option - active_n :: pos_integer(), - %% Limiter - limiter :: maybe(emqx_limiter:limiter()), - %% Limit Timer - limit_timer :: maybe(reference()), - %% Parse State - parse_state :: emqx_sn_frame:parse_state(), - %% Serialize options - serialize :: emqx_sn_frame:serialize_opts(), - %% Channel State - channel :: emqx_sn_channel:channel(), - %% GC State - gc_state :: maybe(emqx_gc:gc_state()), - %% Stats Timer - stats_timer :: disabled | maybe(reference()), - %% Idle Timeout - idle_timeout :: integer(), - %% Idle Timer - idle_timer :: maybe(reference()), - %% OOM Policy - oom_policy :: maybe(emqx_types:oom_policy()) - }). - --type(state() :: #state{}). - --define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]). --define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). --define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). - --define(ENABLED(X), (X =/= undefined)). - --define(DEFAULT_GC_OPTS, #{count => 1000, bytes => 1024*1024}). --define(DEFAULT_IDLE_TIMEOUT, 30000). - --dialyzer({nowarn_function, - [ system_terminate/4 - , handle_call/3 - , handle_msg/2 - , shutdown/3 - , stop/3 - , parse_incoming/3 - ]}). - -%% udp -start_link(Socket = {udp, _SockPid, _Sock}, Peername, Options) -> - Args = [self(), Socket, Peername, Options], - {ok, proc_lib:spawn_link(?MODULE, init, Args)}; - -%% tcp/ssl/dtls -start_link(esockd_transport, Sock, Options) -> - Socket = {esockd_transport, Sock}, - case esockd_transport:peername(Sock) of - {ok, Peername} -> - Args = [self(), Socket, Peername, Options], - {ok, proc_lib:spawn_link(?MODULE, init, Args)}; - R = {error, _} -> R - end. - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- - -%% @doc Get infos of the connection/channel. --spec(info(pid()|state()) -> emqx_types:infos()). -info(CPid) when is_pid(CPid) -> - call(CPid, info); -info(State = #state{channel = Channel}) -> - ChanInfo = emqx_sn_channel:info(Channel), - SockInfo = maps:from_list( - info(?INFO_KEYS, State)), - ChanInfo#{sockinfo => SockInfo}. - -info(Keys, State) when is_list(Keys) -> - [{Key, info(Key, State)} || Key <- Keys]; -info(socktype, #state{socket = Socket}) -> - esockd_type(Socket); -info(peername, #state{peername = Peername}) -> - Peername; -info(sockname, #state{sockname = Sockname}) -> - Sockname; -info(sockstate, #state{sockstate = SockSt}) -> - SockSt; -info(active_n, #state{active_n = ActiveN}) -> - ActiveN. - --spec(stats(pid()|state()) -> emqx_types:stats()). -stats(CPid) when is_pid(CPid) -> - call(CPid, stats); -stats(#state{socket = Socket, - channel = Channel}) -> - SockStats = case esockd_getstat(Socket, ?SOCK_STATS) of - {ok, Ss} -> Ss; - {error, _} -> [] - end, - ConnStats = emqx_pd:get_counters(?CONN_STATS), - ChanStats = emqx_sn_channel:stats(Channel), - ProcStats = emqx_misc:proc_stats(), - lists:append([SockStats, ConnStats, ChanStats, ProcStats]). - -call(Pid, Req) -> - call(Pid, Req, infinity). - -call(Pid, Req, Timeout) -> - gen_server:call(Pid, Req, Timeout). - -cast(Pid, Req) -> - gen_server:cast(Pid, Req). - -stop(Pid) -> - gen_server:stop(Pid). - -%%-------------------------------------------------------------------- -%% Wrapped funcs -%%-------------------------------------------------------------------- - -esockd_wait(Socket = {udp, _SockPid, _Sock}) -> - {ok, Socket}; -esockd_wait({esockd_transport, Sock}) -> - case esockd_transport:wait(Sock) of - {ok, NSock} -> {ok, {esockd_transport, NSock}}; - R = {error, _} -> R - end. - -esockd_close({udp, _SockPid, _Sock}) -> - %% nothing to do for udp socket - %%gen_udp:close(Sock); - ok; -esockd_close({esockd_transport, Sock}) -> - esockd_transport:fast_close(Sock). - -esockd_ensure_ok_or_exit(peercert, {udp, _SockPid, _Sock}) -> - nossl; -esockd_ensure_ok_or_exit(Fun, {udp, _SockPid, Sock}) -> - esockd_transport:ensure_ok_or_exit(Fun, [Sock]); -esockd_ensure_ok_or_exit(Fun, {esockd_transport, Socket}) -> - esockd_transport:ensure_ok_or_exit(Fun, [Socket]). - -esockd_type({udp, _, _}) -> - udp; -esockd_type({esockd_transport, Socket}) -> - esockd_transport:type(Socket). - -esockd_setopts({udp, _, _}, _) -> - ok; -esockd_setopts({esockd_transport, Socket}, Opts) -> - %% FIXME: DTLS works?? - esockd_transport:setopts(Socket, Opts). - -esockd_getstat({udp, _SockPid, Sock}, Stats) -> - inet:getstat(Sock, Stats); -esockd_getstat({esockd_transport, Sock}, Stats) -> - esockd_transport:getstat(Sock, Stats). - -esockd_send(Data, #state{socket = {udp, _SockPid, Sock}, - peername = {Ip, Port}}) -> - gen_udp:send(Sock, Ip, Port, Data); -esockd_send(Data, #state{socket = {esockd_transport, Sock}}) -> - esockd_transport:async_send(Sock, Data). - -%%-------------------------------------------------------------------- -%% callbacks -%%-------------------------------------------------------------------- - -init(Parent, WrappedSock, Peername, Options) -> - case esockd_wait(WrappedSock) of - {ok, NWrappedSock} -> - run_loop(Parent, init_state(NWrappedSock, Peername, Options)); - {error, Reason} -> - ok = esockd_close(WrappedSock), - exit_on_sock_error(Reason) - end. - -init_state(WrappedSock, Peername, Options) -> - {ok, Sockname} = esockd_ensure_ok_or_exit(sockname, WrappedSock), - Peercert = esockd_ensure_ok_or_exit(peercert, WrappedSock), - ConnInfo = #{socktype => esockd_type(WrappedSock), - peername => Peername, - sockname => Sockname, - peercert => Peercert, - conn_mod => ?MODULE - }, - ActiveN = emqx_gateway_utils:active_n(Options), - %% FIXME: - %%Limiter = emqx_limiter:init(Options), - Limiter = undefined, - FrameOpts = emqx_gateway_utils:frame_options(Options), - ParseState = emqx_sn_frame:initial_parse_state(FrameOpts), - Serialize = emqx_sn_frame:serialize_opts(), - Channel = emqx_sn_channel:init(ConnInfo, Options), - GcState = emqx_gateway_utils:init_gc_state(Options), - StatsTimer = emqx_gateway_utils:stats_timer(Options), - IdleTimeout = emqx_gateway_utils:idle_timeout(Options), - OomPolicy = emqx_gateway_utils:oom_policy(Options), - IdleTimer = start_timer(IdleTimeout, idle_timeout), - #state{socket = WrappedSock, - peername = Peername, - sockname = Sockname, - sockstate = idle, - active_n = ActiveN, - limiter = Limiter, - parse_state = ParseState, - serialize = Serialize, - channel = Channel, - gc_state = GcState, - stats_timer = StatsTimer, - idle_timeout = IdleTimeout, - idle_timer = IdleTimer, - oom_policy = OomPolicy - }. - -run_loop(Parent, State = #state{socket = Socket, - peername = Peername, - oom_policy = OomPolicy - }) -> - emqx_logger:set_metadata_peername(esockd:format(Peername)), - _ = emqx_misc:tune_heap_size(OomPolicy), - case activate_socket(State) of - {ok, NState} -> - hibernate(Parent, NState); - {error, Reason} -> - ok = esockd_close(Socket), - exit_on_sock_error(Reason) - end. - --spec exit_on_sock_error(atom()) -> no_return(). -exit_on_sock_error(Reason) when Reason =:= einval; - Reason =:= enotconn; - Reason =:= closed -> - erlang:exit(normal); -exit_on_sock_error(timeout) -> - erlang:exit({shutdown, ssl_upgrade_timeout}); -exit_on_sock_error(Reason) -> - erlang:exit({shutdown, Reason}). - -%%-------------------------------------------------------------------- -%% Recv Loop - -recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) -> - receive - Msg -> - handle_recv(Msg, Parent, State) - after - IdleTimeout + 100 -> - hibernate(Parent, cancel_stats_timer(State)) - end. - -handle_recv({system, From, Request}, Parent, State) -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State); -handle_recv({'EXIT', Parent, Reason}, Parent, State) -> - %% FIXME: it's not trapping exit, should never receive an EXIT - terminate(Reason, State); -handle_recv(Msg, Parent, State = #state{idle_timeout = IdleTimeout}) -> - case process_msg([Msg], ensure_stats_timer(IdleTimeout, State)) of - {ok, NewState} -> - ?MODULE:recvloop(Parent, NewState); - {stop, Reason, NewSate} -> - terminate(Reason, NewSate) - end. - -hibernate(Parent, State) -> - proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State]). - -%% Maybe do something here later. -wakeup_from_hib(Parent, State) -> - ?MODULE:recvloop(Parent, State). - -%%-------------------------------------------------------------------- -%% Ensure/cancel stats timer - -ensure_stats_timer(Timeout, State = #state{stats_timer = undefined}) -> - State#state{stats_timer = start_timer(Timeout, emit_stats)}; -ensure_stats_timer(_Timeout, State) -> State. - -cancel_stats_timer(State = #state{stats_timer = TRef}) - when is_reference(TRef) -> - ok = emqx_misc:cancel_timer(TRef), - State#state{stats_timer = undefined}; -cancel_stats_timer(State) -> State. - -%%-------------------------------------------------------------------- -%% Process next Msg - -process_msg([], State) -> - {ok, State}; -process_msg([Msg|More], State) -> - try - case handle_msg(Msg, State) of - ok -> - process_msg(More, State); - {ok, NState} -> - process_msg(More, NState); - {ok, Msgs, NState} -> - process_msg(append_msg(More, Msgs), NState); - {stop, Reason, NState} -> - {stop, Reason, NState} - end - catch - exit : normal -> - {stop, normal, State}; - exit : shutdown -> - {stop, shutdown, State}; - exit : {shutdown, _} = Shutdown -> - {stop, Shutdown, State}; - Exception : Context : Stack -> - {stop, #{exception => Exception, - context => Context, - stacktrace => Stack}, State} - end. - -append_msg([], Msgs) when is_list(Msgs) -> - Msgs; -append_msg([], Msg) -> [Msg]; -append_msg(Q, Msgs) when is_list(Msgs) -> - lists:append(Q, Msgs); -append_msg(Q, Msg) -> - lists:append(Q, [Msg]). - -%%-------------------------------------------------------------------- -%% Handle a Msg - -handle_msg({'$gen_call', From, Req}, State) -> - case handle_call(From, Req, State) of - {reply, Reply, NState} -> - gen_server:reply(From, Reply), - {ok, NState}; - {reply, Reply, Msgs, NState} -> - gen_server:reply(From, Reply), - {ok, next_msgs(Msgs), NState}; - {stop, Reason, Reply, NState} -> - gen_server:reply(From, Reply), - stop(Reason, NState) - end; - -handle_msg({'$gen_cast', Req}, State) -> - with_channel(handle_cast, [Req], State); - -handle_msg({datagram, _SockPid, Data}, State) -> - parse_incoming(Data, State); - -handle_msg({Inet, _Sock, Data}, State) - when Inet == tcp; - Inet == ssl -> - parse_incoming(Data, State); - -handle_msg({incoming, Packet}, - State = #state{idle_timer = IdleTimer}) -> - IdleTimer /= undefined andalso - emqx_misc:cancel_timer(IdleTimer), - NState = State#state{idle_timer = undefined}, - handle_incoming(Packet, NState); - -handle_msg({outgoing, Data}, State) -> - handle_outgoing(Data, State); - -handle_msg({Error, _Sock, Reason}, State) - when Error == tcp_error; Error == ssl_error -> - handle_info({sock_error, Reason}, State); - -handle_msg({Closed, _Sock}, State) - when Closed == tcp_closed; Closed == ssl_closed -> - handle_info({sock_closed, Closed}, close_socket(State)); - -%% TODO: udp_passive??? -handle_msg({Passive, _Sock}, State) - when Passive == tcp_passive; Passive == ssl_passive -> - %% In Stats - Bytes = emqx_pd:reset_counter(incoming_bytes), - Pubs = emqx_pd:reset_counter(incoming_pkt), - InStats = #{cnt => Pubs, oct => Bytes}, - %% Ensure Rate Limit - NState = ensure_rate_limit(InStats, State), - %% Run GC and Check OOM - NState1 = check_oom(run_gc(InStats, NState)), - handle_info(activate_socket, NState1); - -handle_msg(Deliver = {deliver, _Topic, _Msg}, - State = #state{active_n = ActiveN}) -> - Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], - with_channel(handle_deliver, [Delivers], State); - -%% Something sent -%% TODO: Who will deliver this message? -handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) -> - case emqx_pd:get_counter(outgoing_pkt) > ActiveN of - true -> - Pubs = emqx_pd:reset_counter(outgoing_pkt), - Bytes = emqx_pd:reset_counter(outgoing_bytes), - OutStats = #{cnt => Pubs, oct => Bytes}, - {ok, check_oom(run_gc(OutStats, State))}; - false -> ok - end; - -handle_msg({inet_reply, _Sock, {error, Reason}}, State) -> - handle_info({sock_error, Reason}, State); - -handle_msg({close, Reason}, State) -> - ?LOG(debug, "Force to close the socket due to ~p", [Reason]), - handle_info({sock_closed, Reason}, close_socket(State)); - -handle_msg({event, connected}, State = #state{channel = Channel}) -> - Ctx = emqx_sn_channel:info(ctx, Channel), - ClientId = emqx_sn_channel:info(clientid, Channel), - emqx_gateway_ctx:insert_channel_info( - Ctx, - ClientId, - info(State), - stats(State) - ); - -handle_msg({event, disconnected}, State = #state{channel = Channel}) -> - Ctx = emqx_sn_channel:info(ctx, Channel), - ClientId = emqx_sn_channel:info(clientid, Channel), - emqx_gateway_ctx:set_chan_info(Ctx, ClientId, info(State)), - emqx_gateway_ctx:connection_closed(Ctx, ClientId), - {ok, State}; - -handle_msg({event, _Other}, State = #state{channel = Channel}) -> - Ctx = emqx_sn_channel:info(ctx, Channel), - ClientId = emqx_sn_channel:info(clientid, Channel), - emqx_gateway_ctx:set_chan_info(Ctx, ClientId, info(State)), - emqx_gateway_ctx:set_chan_stats(Ctx, ClientId, stats(State)), - {ok, State}; - -handle_msg({timeout, TRef, TMsg}, State) -> - handle_timeout(TRef, TMsg, State); - -handle_msg(Shutdown = {shutdown, _Reason}, State) -> - stop(Shutdown, State); - -handle_msg(Msg, State) -> - handle_info(Msg, State). - -%%-------------------------------------------------------------------- -%% Terminate - --spec terminate(atom(), state()) -> no_return(). -terminate(Reason, State = #state{channel = Channel}) -> - ?LOG(debug, "Terminated due to ~p", [Reason]), - _ = emqx_sn_channel:terminate(Reason, Channel), - _ = close_socket(State), - exit(Reason). - -%%-------------------------------------------------------------------- -%% Sys callbacks - -system_continue(Parent, _Debug, State) -> - recvloop(Parent, State). - -system_terminate(Reason, _Parent, _Debug, State) -> - terminate(Reason, State). - -system_code_change(State, _Mod, _OldVsn, _Extra) -> - {ok, State}. - -system_get_state(State) -> {ok, State}. - -%%-------------------------------------------------------------------- -%% Handle call - -handle_call(_From, info, State) -> - {reply, info(State), State}; - -handle_call(_From, stats, State) -> - {reply, stats(State), State}; - -handle_call(_From, Req, State = #state{channel = Channel}) -> - case emqx_sn_channel:handle_call(Req, Channel) of - {reply, Reply, NChannel} -> - {reply, Reply, State#state{channel = NChannel}}; - {reply, Reply, Replies, NChannel} -> - {reply, Reply, Replies, State#state{channel = NChannel}}; - {shutdown, Reason, Reply, NChannel} -> - shutdown(Reason, Reply, State#state{channel = NChannel}) - end. - -%%-------------------------------------------------------------------- -%% Handle timeout - -handle_timeout(_TRef, idle_timeout, State) -> - shutdown(idle_timeout, State); - -handle_timeout(_TRef, limit_timeout, State) -> - NState = State#state{sockstate = idle, - limit_timer = undefined - }, - handle_info(activate_socket, NState); -handle_timeout(TRef, keepalive, State = #state{socket = Socket, - channel = Channel})-> - case emqx_sn_channel:info(conn_state, Channel) of - disconnected -> {ok, State}; - _ -> - case esockd_getstat(Socket, [recv_oct]) of - {ok, [{recv_oct, RecvOct}]} -> - handle_timeout(TRef, {keepalive, RecvOct}, State); - {error, Reason} -> - handle_info({sock_error, Reason}, State) - end - end; -handle_timeout(_TRef, emit_stats, State = - #state{channel = Channel}) -> - Ctx = emqx_sn_channel:info(ctx, Channel), - ClientId = emqx_sn_channel:info(clientid, Channel), - emqx_gateway_ctx:set_chan_stats(Ctx, ClientId, stats(State)), - {ok, State#state{stats_timer = undefined}}; - -handle_timeout(TRef, Msg, State) -> - with_channel(handle_timeout, [TRef, Msg], State). - -%%-------------------------------------------------------------------- -%% Parse incoming data - -parse_incoming(Data, State = #state{channel = Channel}) -> - ?LOG(debug, "RECV ~0p", [Data]), - Oct = iolist_size(Data), - inc_counter(incoming_bytes, Oct), - Ctx = emqx_sn_channel:info(ctx, Channel), - ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.received', Oct), - {Packets, NState} = parse_incoming(Data, [], State), - {ok, next_incoming_msgs(Packets), NState}. - -parse_incoming(<<>>, Packets, State) -> - {Packets, State}; - -parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> - try emqx_sn_frame:parse(Data, ParseState) of - {more, NParseState} -> - {Packets, State#state{parse_state = NParseState}}; - {ok, Packet, Rest, NParseState} -> - NState = State#state{parse_state = NParseState}, - parse_incoming(Rest, [Packet|Packets], NState) - catch - error:Reason:Stk -> - ?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data:~0p", - [Reason, Stk, Data]), - {[{frame_error, Reason}|Packets], State} - end. - -next_incoming_msgs([Packet]) -> - {incoming, Packet}; -next_incoming_msgs(Packets) -> - [{incoming, Packet} || Packet <- lists:reverse(Packets)]. - -%%-------------------------------------------------------------------- -%% Handle incoming packet - -handle_incoming(Packet, State) -> - ok = inc_incoming_stats(Packet), - ?LOG(debug, "RECV ~s", [emqx_sn_frame:format(Packet)]), - with_channel(handle_in, [Packet], State). - -%%-------------------------------------------------------------------- -%% With Channel - -with_channel(Fun, Args, State = #state{channel = Channel}) -> - case erlang:apply(emqx_sn_channel, Fun, Args ++ [Channel]) of - ok -> {ok, State}; - {ok, NChannel} -> - {ok, State#state{channel = NChannel}}; - {ok, Replies, NChannel} -> - {ok, next_msgs(Replies), State#state{channel = NChannel}}; - {shutdown, Reason, NChannel} -> - shutdown(Reason, State#state{channel = NChannel}); - {shutdown, Reason, Packet, NChannel} -> - NState = State#state{channel = NChannel}, - ok = handle_outgoing(Packet, NState), - shutdown(Reason, NState) - end. - -%%-------------------------------------------------------------------- -%% Handle outgoing packets - -handle_outgoing(Packets, State) when is_list(Packets) -> - send(lists:map(serialize_and_inc_stats_fun(State), Packets), State); - -handle_outgoing(Packet, State) -> - send((serialize_and_inc_stats_fun(State))(Packet), State). - -serialize_and_inc_stats_fun(#state{serialize = Serialize, channel = Channel}) -> - Ctx = emqx_sn_channel:info(ctx, Channel), - fun(Packet) -> - case emqx_sn_frame:serialize_pkt(Packet, Serialize) of - <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!", - [emqx_sn_frame:format(Packet)]), - ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped.too_large'), - ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'), - <<>>; - Data -> ?LOG(debug, "SEND ~s", [emqx_sn_frame:format(Packet)]), - ok = inc_outgoing_stats(Packet), - Data - end - end. - -%%-------------------------------------------------------------------- -%% Send data - --spec(send(iodata(), state()) -> ok). -send(IoData, State = #state{socket = Socket, channel = Channel}) -> - Ctx = emqx_sn_channel:info(ctx, Channel), - Oct = iolist_size(IoData), - ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.sent', Oct), - inc_counter(outgoing_bytes, Oct), - case esockd_send(IoData, State) of - ok -> ok; - Error = {error, _Reason} -> - %% Send an inet_reply to postpone handling the error - self() ! {inet_reply, Socket, Error}, - ok - end. - -%%-------------------------------------------------------------------- -%% Handle Info - -handle_info(activate_socket, State = #state{sockstate = OldSst}) -> - case activate_socket(State) of - {ok, NState = #state{sockstate = NewSst}} -> - if OldSst =/= NewSst -> - {ok, {event, NewSst}, NState}; - true -> {ok, NState} - end; - {error, Reason} -> - handle_info({sock_error, Reason}, State) - end; - -handle_info({sock_error, Reason}, State) -> - ?LOG(debug, "Socket error: ~p", [Reason]), - handle_info({sock_closed, Reason}, close_socket(State)); - -handle_info(Info, State) -> - with_channel(handle_info, [Info], State). - -%%-------------------------------------------------------------------- -%% Ensure rate limit - -ensure_rate_limit(Stats, State = #state{limiter = Limiter}) -> - case ?ENABLED(Limiter) andalso emqx_limiter:check(Stats, Limiter) of - false -> State; - {ok, Limiter1} -> - State#state{limiter = Limiter1}; - {pause, Time, Limiter1} -> - ?LOG(warning, "Pause ~pms due to rate limit", [Time]), - TRef = start_timer(Time, limit_timeout), - State#state{sockstate = blocked, - limiter = Limiter1, - limit_timer = TRef - } - end. - -%%-------------------------------------------------------------------- -%% Run GC and Check OOM - -run_gc(Stats, State = #state{gc_state = GcSt}) -> - case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of - false -> State; - {_IsGC, GcSt1} -> - State#state{gc_state = GcSt1} - end. - -check_oom(State = #state{oom_policy = OomPolicy}) -> - case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of - {shutdown, Reason} -> - %% triggers terminate/2 callback immediately - erlang:exit({shutdown, Reason}); - _Other -> ok - end, - State. - -%%-------------------------------------------------------------------- -%% Activate Socket - -activate_socket(State = #state{sockstate = closed}) -> - {ok, State}; -activate_socket(State = #state{sockstate = blocked}) -> - {ok, State}; -activate_socket(State = #state{socket = Socket, - active_n = N}) -> - %% FIXME: Works on dtls/udp ??? - %% How to hanlde buffer? - case esockd_setopts(Socket, [{active, N}]) of - ok -> {ok, State#state{sockstate = running}}; - Error -> Error - end. - -%%-------------------------------------------------------------------- -%% Close Socket - -close_socket(State = #state{sockstate = closed}) -> State; -close_socket(State = #state{socket = Socket}) -> - ok = esockd_close(Socket), - State#state{sockstate = closed}. - -%%-------------------------------------------------------------------- -%% Inc incoming/outgoing stats - -%% XXX: How to stats? -inc_incoming_stats(_Packet) -> - inc_counter(recv_pkt, 1), - ok. - %case Type =:= ?CMD_SEND of - % true -> - % inc_counter(recv_msg, 1), - % inc_counter(incoming_pubs, 1); - % false -> - % ok - %end, - %emqx_metrics:inc_recv(Packet). - -inc_outgoing_stats(_Packet) -> - inc_counter(send_pkt, 1), - ok. - %case Type =:= ?CMD_MESSAGE of - % true -> - % inc_counter(send_msg, 1), - % inc_counter(outgoing_pubs, 1); - % false -> - % ok - %end, - %emqx_metrics:inc_sent(Packet). - -%%-------------------------------------------------------------------- -%% Helper functions - --compile({inline, [next_msgs/1]}). -next_msgs(Event) when is_tuple(Event) -> - Event; -next_msgs(More) when is_list(More) -> - More. - --compile({inline, [shutdown/2, shutdown/3]}). -shutdown(Reason, State) -> - stop({shutdown, Reason}, State). - -shutdown(Reason, Reply, State) -> - stop({shutdown, Reason}, Reply, State). - --compile({inline, [stop/2, stop/3]}). -stop(Reason, State) -> - {stop, Reason, State}. - -stop(Reason, Reply, State) -> - {stop, Reason, Reply, State}. - -inc_counter(Name, Value) -> - _ = emqx_pd:inc_counter(Name, Value), - ok. diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl index c9b9b137d..343232854 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl @@ -15,8 +15,11 @@ %% limitations under the License. %%-------------------------------------------------------------------- +%% @doc The frame parser for MQTT-SN protocol -module(emqx_sn_frame). +-behavior(emqx_gateway_frame). + -include("src/mqttsn/include/emqx_sn.hrl"). -export([ initial_parse_state/1 diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl index 7fe257d5c..411ec7bb4 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl @@ -128,8 +128,13 @@ start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) -> Name = name(InstaId, Type), + NCfg = Cfg#{ + ctx => Ctx, + frame_mod => emqx_sn_frame, + chann_mod => emqx_sn_channel + }, esockd:open_udp(Name, ListenOn, merge_default(SocketOpts), - {emqx_sn_conn, start_link, [Cfg#{ctx => Ctx}]}). + {emqx_gateway_conn, start_link, [NCfg]}). name(InstaId, Type) -> list_to_atom(lists:concat([InstaId, ":", Type])). diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index d4b4c7fe1..e071df433 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -16,6 +16,8 @@ -module(emqx_stomp_channel). +-behavior(emqx_gateway_channel). + -include("src/stomp/include/emqx_stomp.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -596,6 +598,16 @@ handle_call(Req, Channel) -> ?LOG(error, "Unexpected call: ~p", [Req]), reply(ignored, Channel). + +%%-------------------------------------------------------------------- +%% Handle cast +%%-------------------------------------------------------------------- + +-spec handle_cast(Req :: term(), channel()) + -> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}. +handle_cast(_Req, Channel) -> + {ok, Channel}. + %%-------------------------------------------------------------------- %% Handle Info %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_connection.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_connection.erl deleted file mode 100644 index 82653af4e..000000000 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_connection.erl +++ /dev/null @@ -1,905 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_stomp_connection). - --include("src/stomp/include/emqx_stomp.hrl"). --include_lib("emqx/include/logger.hrl"). --include_lib("snabbkaffe/include/snabbkaffe.hrl"). - --logger_header("[Stomp-Conn]"). - -%% API --export([ start_link/3 - , stop/1 - ]). - --export([ info/1 - , stats/1 - ]). - --export([ async_set_keepalive/3 - , async_set_keepalive/4 - , async_set_socket_options/2 - ]). - --export([ call/2 - , call/3 - , cast/2 - ]). - -%% Callback --export([init/4]). - -%% Sys callbacks --export([ system_continue/3 - , system_terminate/4 - , system_code_change/4 - , system_get_state/1 - ]). - -%% Internal callback --export([wakeup_from_hib/2, recvloop/2, get_state/1]). - -%% Export for CT --export([set_field/3]). - --import(emqx_misc, - [ maybe_apply/2 - ]). - --record(state, { - %% TCP/TLS Transport - transport :: esockd:transport(), - %% TCP/TLS Socket - socket :: esockd:socket(), - %% Peername of the connection - peername :: emqx_types:peername(), - %% Sockname of the connection - sockname :: emqx_types:peername(), - %% Sock State - sockstate :: emqx_types:sockstate(), - %% The {active, N} option - active_n :: pos_integer(), - %% Limiter - limiter :: emqx_limiter:limiter() | undefined, - %% Limit Timer - limit_timer :: reference() | undefined, - %% Parse State - parse_state :: emqx_stomp_frame:parse_state(), - %% Serialize options - serialize :: emqx_stomp_frame:serialize_opts(), - %% Channel State - channel :: emqx_stomp_channel:channel(), - %% GC State - gc_state :: emqx_gc:gc_state() | undefined, - %% Stats Timer - stats_timer :: disabled | reference() | undefined, - %% Idle Timeout - idle_timeout :: integer(), - %% Idle Timer - idle_timer :: reference() | undefined, - %% OOM Policy - oom_policy :: emqx_types:oom_policy() | undefined - }). - --type(state() :: #state{}). - --define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]). --define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). --define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). - --define(ENABLED(X), (X =/= undefined)). - -%-define(ALARM_TCP_CONGEST(Channel), -% list_to_binary(io_lib:format("mqtt_conn/congested/~s/~s", -% [emqx_stomp_channel:info(clientid, Channel), -% emqx_stomp_channel:info(username, Channel)]))). -%-define(ALARM_CONN_INFO_KEYS, [ -% socktype, sockname, peername, -% clientid, username, proto_name, proto_ver, connected_at -%]). -%-define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]). -%-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]). - --dialyzer({no_match, [info/2]}). --dialyzer({nowarn_function, [ init/4 - , init_state/3 - , run_loop/2 - , system_terminate/4 - , system_code_change/4 - ]}). - --dialyzer({no_match, [ handle_call/3 - , serialize_and_inc_stats_fun/1 - ]}). - --spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist()) - -> {ok, pid()}). -start_link(Transport, Socket, Options) -> - Args = [self(), Transport, Socket, Options], - CPid = proc_lib:spawn_link(?MODULE, init, Args), - {ok, CPid}. - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- - -%% @doc Get infos of the connection/channel. --spec(info(pid()|state()) -> emqx_types:infos()). -info(CPid) when is_pid(CPid) -> - call(CPid, info); -info(State = #state{channel = Channel}) -> - ChanInfo = emqx_stomp_channel:info(Channel), - SockInfo = maps:from_list( - info(?INFO_KEYS, State)), - ChanInfo#{sockinfo => SockInfo}. - -info(Keys, State) when is_list(Keys) -> - [{Key, info(Key, State)} || Key <- Keys]; -info(socktype, #state{transport = Transport, socket = Socket}) -> - Transport:type(Socket); -info(peername, #state{peername = Peername}) -> - Peername; -info(sockname, #state{sockname = Sockname}) -> - Sockname; -info(sockstate, #state{sockstate = SockSt}) -> - SockSt; -info(active_n, #state{active_n = ActiveN}) -> - ActiveN; -info(stats_timer, #state{stats_timer = StatsTimer}) -> - StatsTimer; -info(limit_timer, #state{limit_timer = LimitTimer}) -> - LimitTimer; -info(limiter, #state{limiter = Limiter}) -> - maybe_apply(fun emqx_limiter:info/1, Limiter). - -%% @doc Get stats of the connection/channel. --spec(stats(pid()|state()) -> emqx_types:stats()). -stats(CPid) when is_pid(CPid) -> - call(CPid, stats); -stats(#state{transport = Transport, - socket = Socket, - channel = Channel}) -> - SockStats = case Transport:getstat(Socket, ?SOCK_STATS) of - {ok, Ss} -> Ss; - {error, _} -> [] - end, - ConnStats = emqx_pd:get_counters(?CONN_STATS), - ChanStats = emqx_stomp_channel:stats(Channel), - ProcStats = emqx_misc:proc_stats(), - lists:append([SockStats, ConnStats, ChanStats, ProcStats]). - -%% @doc Set TCP keepalive socket options to override system defaults. -%% Idle: The number of seconds a connection needs to be idle before -%% TCP begins sending out keep-alive probes (Linux default 7200). -%% Interval: The number of seconds between TCP keep-alive probes -%% (Linux default 75). -%% Probes: The maximum number of TCP keep-alive probes to send before -%% giving up and killing the connection if no response is -%% obtained from the other end (Linux default 9). -%% -%% NOTE: This API sets TCP socket options, which has nothing to do with -%% the MQTT layer's keepalive (PINGREQ and PINGRESP). -async_set_keepalive(Idle, Interval, Probes) -> - async_set_keepalive(self(), Idle, Interval, Probes). - -async_set_keepalive(Pid, Idle, Interval, Probes) -> - Options = [ {keepalive, true} - , {raw, 6, 4, <>} - , {raw, 6, 5, <>} - , {raw, 6, 6, <>} - ], - async_set_socket_options(Pid, Options). - -%% @doc Set custom socket options. -%% This API is made async because the call might be originated from -%% a hookpoint callback (otherwise deadlock). -%% If failed to set, the error message is logged. -async_set_socket_options(Pid, Options) -> - cast(Pid, {async_set_socket_options, Options}). - -cast(Pid, Req) -> - gen_server:cast(Pid, Req). - -call(Pid, Req) -> - call(Pid, Req, infinity). -call(Pid, Req, Timeout) -> - gen_server:call(Pid, Req, Timeout). - -stop(Pid) -> - gen_server:stop(Pid). - -%%-------------------------------------------------------------------- -%% callbacks -%%-------------------------------------------------------------------- - -init(Parent, Transport, RawSocket, Options) -> - case Transport:wait(RawSocket) of - {ok, Socket} -> - run_loop(Parent, init_state(Transport, Socket, Options)); - {error, Reason} -> - ok = Transport:fast_close(RawSocket), - exit_on_sock_error(Reason) - end. - -init_state(Transport, Socket, Options) -> - {ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]), - {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]), - Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]), - ConnInfo = #{socktype => Transport:type(Socket), - peername => Peername, - sockname => Sockname, - peercert => Peercert, - conn_mod => ?MODULE - }, - ActiveN = emqx_gateway_utils:active_n(Options), - %% TODO: RateLimit ? How ? - Limiter = undefined, - %RateLimit = emqx_gateway_utils:ratelimit(Options), - %%Limiter = emqx_limiter:init(Zone, RateLimit), - FrameOpts = emqx_gateway_utils:frame_options(Options), - ParseState = emqx_stomp_frame:initial_parse_state(FrameOpts), - Serialize = emqx_stomp_frame:serialize_opts(), - Channel = emqx_stomp_channel:init(ConnInfo, Options), - GcState = emqx_gateway_utils:init_gc_state(Options), - StatsTimer = emqx_gateway_utils:stats_timer(Options), - IdleTimeout = emqx_gateway_utils:idle_timeout(Options), - OomPolicy = emqx_gateway_utils:oom_policy(Options), - IdleTimer = emqx_misc:start_timer(IdleTimeout, idle_timeout), - #state{transport = Transport, - socket = Socket, - peername = Peername, - sockname = Sockname, - sockstate = idle, - active_n = ActiveN, - limiter = Limiter, - parse_state = ParseState, - serialize = Serialize, - channel = Channel, - gc_state = GcState, - stats_timer = StatsTimer, - idle_timeout = IdleTimeout, - idle_timer = IdleTimer, - oom_policy = OomPolicy - }. - -run_loop(Parent, State = #state{transport = Transport, - socket = Socket, - peername = Peername, - oom_policy = OomPolicy}) -> - emqx_logger:set_metadata_peername(esockd:format(Peername)), - _ = emqx_misc:tune_heap_size(OomPolicy), - case activate_socket(State) of - {ok, NState} -> hibernate(Parent, NState); - {error, Reason} -> - ok = Transport:fast_close(Socket), - exit_on_sock_error(Reason) - end. - --spec exit_on_sock_error(any()) -> no_return(). -exit_on_sock_error(Reason) when Reason =:= einval; - Reason =:= enotconn; - Reason =:= closed -> - erlang:exit(normal); -exit_on_sock_error(timeout) -> - erlang:exit({shutdown, ssl_upgrade_timeout}); -exit_on_sock_error(Reason) -> - erlang:exit({shutdown, Reason}). - -%%-------------------------------------------------------------------- -%% Recv Loop - -recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) -> - receive - Msg -> - handle_recv(Msg, Parent, State) - after - IdleTimeout + 100 -> - hibernate(Parent, cancel_stats_timer(State)) - end. - -handle_recv({system, From, Request}, Parent, State) -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State); -handle_recv({'EXIT', Parent, Reason}, Parent, State) -> - %% FIXME: it's not trapping exit, should never receive an EXIT - terminate(Reason, State); -handle_recv(Msg, Parent, State = #state{idle_timeout = IdleTimeout}) -> - case process_msg([Msg], ensure_stats_timer(IdleTimeout, State)) of - {ok, NewState} -> - ?MODULE:recvloop(Parent, NewState); - {stop, Reason, NewSate} -> - terminate(Reason, NewSate) - end. - -hibernate(Parent, State) -> - proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State]). - -%% Maybe do something here later. -wakeup_from_hib(Parent, State) -> - ?MODULE:recvloop(Parent, State). - -%%-------------------------------------------------------------------- -%% Ensure/cancel stats timer - -ensure_stats_timer(Timeout, State = #state{stats_timer = undefined}) -> - State#state{stats_timer = emqx_misc:start_timer(Timeout, emit_stats)}; -ensure_stats_timer(_Timeout, State) -> State. - -cancel_stats_timer(State = #state{stats_timer = TRef}) - when is_reference(TRef) -> - ?tp(debug, cancel_stats_timer, #{}), - ok = emqx_misc:cancel_timer(TRef), - State#state{stats_timer = undefined}; -cancel_stats_timer(State) -> State. - -%%-------------------------------------------------------------------- -%% Process next Msg - -process_msg([], State) -> - {ok, State}; -process_msg([Msg|More], State) -> - try - case handle_msg(Msg, State) of - ok -> - process_msg(More, State); - {ok, NState} -> - process_msg(More, NState); - {ok, Msgs, NState} -> - process_msg(append_msg(More, Msgs), NState); - {stop, Reason, NState} -> - {stop, Reason, NState} - end - catch - exit : normal -> - {stop, normal, State}; - exit : shutdown -> - {stop, shutdown, State}; - exit : {shutdown, _} = Shutdown -> - {stop, Shutdown, State}; - Exception : Context : Stack -> - {stop, #{exception => Exception, - context => Context, - stacktrace => Stack}, State} - end. - -append_msg([], Msgs) when is_list(Msgs) -> - Msgs; -append_msg([], Msg) -> [Msg]; -append_msg(Q, Msgs) when is_list(Msgs) -> - lists:append(Q, Msgs); -append_msg(Q, Msg) -> - lists:append(Q, [Msg]). - -%%-------------------------------------------------------------------- -%% Handle a Msg - -handle_msg({'$gen_call', From, Req}, State) -> - case handle_call(From, Req, State) of - {reply, Reply, NState} -> - gen_server:reply(From, Reply), - {ok, NState}; - {stop, Reason, Reply, NState} -> - gen_server:reply(From, Reply), - stop(Reason, NState) - end; -handle_msg({'$gen_cast', Req}, State) -> - NewState = handle_cast(Req, State), - {ok, NewState}; - -handle_msg({Inet, _Sock, Data}, State = #state{channel = Channel}) - when Inet == tcp; - Inet == ssl -> - ?LOG(debug, "RECV ~0p", [Data]), - Oct = iolist_size(Data), - inc_counter(incoming_bytes, Oct), - Ctx = emqx_stomp_channel:info(ctx, Channel), - ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.received', Oct), - parse_incoming(Data, State); - -handle_msg({incoming, Packet}, State = #state{idle_timer = undefined}) -> - handle_incoming(Packet, State); - -handle_msg({incoming, Packet}, - State = #state{idle_timer = IdleTimer}) -> - ok = emqx_misc:cancel_timer(IdleTimer), - %% XXX: Serialize with input packets - %%Serialize = emqx_stomp_frame:serialize_opts(), - NState = State#state{idle_timer = undefined}, - handle_incoming(Packet, NState); - -handle_msg({outgoing, Packets}, State) -> - handle_outgoing(Packets, State); - -handle_msg({Error, _Sock, Reason}, State) - when Error == tcp_error; Error == ssl_error -> - handle_info({sock_error, Reason}, State); - -handle_msg({Closed, _Sock}, State) - when Closed == tcp_closed; Closed == ssl_closed -> - handle_info({sock_closed, Closed}, close_socket(State)); - -handle_msg({Passive, _Sock}, State) - when Passive == tcp_passive; Passive == ssl_passive -> - %% In Stats - Pubs = emqx_pd:reset_counter(incoming_pubs), - Bytes = emqx_pd:reset_counter(incoming_bytes), - InStats = #{cnt => Pubs, oct => Bytes}, - %% Ensure Rate Limit - NState = ensure_rate_limit(InStats, State), - %% Run GC and Check OOM - NState1 = check_oom(run_gc(InStats, NState)), - handle_info(activate_socket, NState1); - -handle_msg(Deliver = {deliver, _Topic, _Msg}, - #state{active_n = ActiveN} = State) -> - Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], - with_channel(handle_deliver, [Delivers], State); - -%% Something sent -handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) -> - case emqx_pd:get_counter(outgoing_pubs) > ActiveN of - true -> - Pubs = emqx_pd:reset_counter(outgoing_pubs), - Bytes = emqx_pd:reset_counter(outgoing_bytes), - OutStats = #{cnt => Pubs, oct => Bytes}, - {ok, run_gc(OutStats, State)}; - %% FIXME: check oom ??? - %%{ok, check_oom(run_gc(OutStats, State))}; - false -> ok - end; - -handle_msg({inet_reply, _Sock, {error, Reason}}, State) -> - handle_info({sock_error, Reason}, State); - -handle_msg({connack, ConnAck}, State) -> - handle_outgoing(ConnAck, State); - -handle_msg({close, Reason}, State) -> - ?LOG(debug, "Force to close the socket due to ~p", [Reason]), - handle_info({sock_closed, Reason}, close_socket(State)); - -handle_msg({event, connected}, State = #state{channel = Channel}) -> - Ctx = emqx_stomp_channel:info(ctx, Channel), - ClientId = emqx_stomp_channel:info(clientid, Channel), - emqx_gateway_ctx:insert_channel_info( - Ctx, - ClientId, - info(State), - stats(State) - ); - -handle_msg({event, disconnected}, State = #state{channel = Channel}) -> - Ctx = emqx_stomp_channel:info(ctx, Channel), - ClientId = emqx_stomp_channel:info(clientid, Channel), - emqx_gateway_ctx:set_chan_info(Ctx, ClientId, info(State)), - emqx_gateway_ctx:connection_closed(Ctx, ClientId), - {ok, State}; - -handle_msg({event, _Other}, State = #state{channel = Channel}) -> - Ctx = emqx_stomp_channel:info(ctx, Channel), - ClientId = emqx_stomp_channel:info(clientid, Channel), - emqx_gateway_ctx:set_chan_info(Ctx, ClientId, info(State)), - emqx_gateway_ctx:set_chan_stats(Ctx, ClientId, stats(State)), - {ok, State}; - -handle_msg({timeout, TRef, TMsg}, State) -> - handle_timeout(TRef, TMsg, State); - -handle_msg(Shutdown = {shutdown, _Reason}, State) -> - stop(Shutdown, State); - -handle_msg(Msg, State) -> - handle_info(Msg, State). - -%%-------------------------------------------------------------------- -%% Terminate - --spec terminate(any(), state()) -> no_return(). -terminate(Reason, State = #state{channel = Channel, transport = _Transport, - socket = _Socket}) -> - try - Channel1 = emqx_stomp_channel:set_conn_state(disconnected, Channel), - %emqx_congestion:cancel_alarms(Socket, Transport, Channel1), - emqx_stomp_channel:terminate(Reason, Channel1), - close_socket_ok(State) - catch - E : C : S -> - ?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S}) - end, - ?tp(info, terminate, #{reason => Reason}), - maybe_raise_excption(Reason). - -%% close socket, discard new state, always return ok. -close_socket_ok(State) -> - _ = close_socket(State), - ok. - -%% tell truth about the original exception -maybe_raise_excption(#{exception := Exception, - context := Context, - stacktrace := Stacktrace - }) -> - erlang:raise(Exception, Context, Stacktrace); -maybe_raise_excption(Reason) -> - exit(Reason). - -%%-------------------------------------------------------------------- -%% Sys callbacks - -system_continue(Parent, _Debug, State) -> - ?MODULE:recvloop(Parent, State). - -system_terminate(Reason, _Parent, _Debug, State) -> - terminate(Reason, State). - -system_code_change(State, _Mod, _OldVsn, _Extra) -> - {ok, State}. - -system_get_state(State) -> {ok, State}. - -%%-------------------------------------------------------------------- -%% Handle call - -handle_call(_From, info, State) -> - {reply, info(State), State}; - -handle_call(_From, stats, State) -> - {reply, stats(State), State}; - -%% TODO: How to set ratelimit ??? -%%handle_call(_From, {ratelimit, Policy}, State = #state{channel = Channel}) -> -%% Zone = emqx_stomp_channel:info(zone, Channel), -%% Limiter = emqx_limiter:init(Zone, Policy), -%% {reply, ok, State#state{limiter = Limiter}}; - -handle_call(_From, Req, State = #state{channel = Channel}) -> - case emqx_stomp_channel:handle_call(Req, Channel) of - {reply, Reply, NChannel} -> - {reply, Reply, State#state{channel = NChannel}}; - {shutdown, Reason, Reply, NChannel} -> - shutdown(Reason, Reply, State#state{channel = NChannel}); - {shutdown, Reason, Reply, OutPacket, NChannel} -> - NState = State#state{channel = NChannel}, - ok = handle_outgoing(OutPacket, NState), - shutdown(Reason, Reply, NState) - end. - -%%-------------------------------------------------------------------- -%% Handle timeout - -handle_timeout(_TRef, idle_timeout, State) -> - shutdown(idle_timeout, State); - -handle_timeout(_TRef, limit_timeout, State) -> - NState = State#state{sockstate = idle, - limit_timer = undefined - }, - handle_info(activate_socket, NState); - -handle_timeout(_TRef, emit_stats, State = #state{channel = Channel, - transport = _Transport, - socket = _Socket}) -> - %emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel), - Ctx = emqx_stomp_channel:info(ctx, Channel), - ClientId = emqx_stomp_channel:info(clientid, Channel), - emqx_gateway_ctx:set_chan_stats(Ctx, ClientId, stats(State)), - {ok, State#state{stats_timer = undefined}}; - -%% Abstraction ??? -%handle_timeout(TRef, keepalive, State = #state{transport = Transport, -% socket = Socket, -% channel = Channel})-> -% case emqx_stomp_channel:info(conn_state, Channel) of -% disconnected -> {ok, State}; -% _ -> -% case Transport:getstat(Socket, [recv_oct]) of -% {ok, [{recv_oct, RecvOct}]} -> -% handle_timeout(TRef, {keepalive, RecvOct}, State); -% {error, Reason} -> -% handle_info({sock_error, Reason}, State) -% end -% end; - -handle_timeout(TRef, TMsg, State = #state{transport = Transport, - socket = Socket, - channel = Channel - }) - when TMsg =:= incoming; - TMsg =:= outgoing -> - Stat = case TMsg of incoming -> recv_oct; _ -> send_oct end, - case emqx_stomp_channel:info(conn_state, Channel) of - disconnected -> {ok, State}; - _ -> - case Transport:getstat(Socket, [Stat]) of - {ok, [{recv_oct, RecvOct}]} -> - handle_timeout(TRef, {incoming, RecvOct}, State); - {ok, [{send_oct, SendOct}]} -> - handle_timeout(TRef, {outgoing, SendOct}, State); - {error, Reason} -> - handle_info({sock_error, Reason}, State) - end - end; - -handle_timeout(TRef, Msg, State) -> - with_channel(handle_timeout, [TRef, Msg], State). - -%%-------------------------------------------------------------------- -%% Parse incoming data - -parse_incoming(Data, State) -> - {Packets, NState} = parse_incoming(Data, [], State), - {ok, next_incoming_msgs(Packets), NState}. - -parse_incoming(<<>>, Packets, State) -> - {Packets, State}; - -parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> - try emqx_stomp_frame:parse(Data, ParseState) of - {more, NParseState} -> - {Packets, State#state{parse_state = NParseState}}; - {ok, Packet, Rest, NParseState} -> - NState = State#state{parse_state = NParseState}, - parse_incoming(Rest, [Packet|Packets], NState) - catch - error:Reason:Stk -> - ?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data:~0p", - [Reason, Stk, Data]), - {[{frame_error, Reason}|Packets], State} - end. - -next_incoming_msgs([Packet]) -> - {incoming, Packet}; -next_incoming_msgs(Packets) -> - [{incoming, Packet} || Packet <- lists:reverse(Packets)]. - -%%-------------------------------------------------------------------- -%% Handle incoming packet - -handle_incoming(Packet, State) when is_record(Packet, stomp_frame) -> - ok = inc_incoming_stats(Packet), - ?LOG(debug, "RECV ~s", [emqx_stomp_frame:format(Packet)]), - with_channel(handle_in, [Packet], State); - -handle_incoming(FrameError, State) -> - with_channel(handle_in, [FrameError], State). - -%%-------------------------------------------------------------------- -%% With Channel - -with_channel(Fun, Args, State = #state{channel = Channel}) -> - case erlang:apply(emqx_stomp_channel, Fun, Args ++ [Channel]) of - ok -> {ok, State}; - {ok, NChannel} -> - {ok, State#state{channel = NChannel}}; - {ok, Replies, NChannel} -> - {ok, next_msgs(Replies), State#state{channel = NChannel}}; - {shutdown, Reason, NChannel} -> - shutdown(Reason, State#state{channel = NChannel}); - {shutdown, Reason, Packet, NChannel} -> - NState = State#state{channel = NChannel}, - ok = handle_outgoing(Packet, NState), - shutdown(Reason, NState) - end. - -%%-------------------------------------------------------------------- -%% Handle outgoing packets - -handle_outgoing(Packets, State) when is_list(Packets) -> - send(lists:map(serialize_and_inc_stats_fun(State), Packets), State); - -handle_outgoing(Packet, State) -> - send((serialize_and_inc_stats_fun(State))(Packet), State). - -serialize_and_inc_stats_fun(#state{serialize = Serialize, channel = Channel}) -> - Ctx = emqx_stomp_channel:info(ctx, Channel), - fun(Packet) -> - case emqx_stomp_frame:serialize_pkt(Packet, Serialize) of - <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!", - [emqx_stomp_frame:format(Packet)]), - ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped.too_large'), - ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'), - <<>>; - Data -> ?LOG(debug, "SEND ~s", [emqx_stomp_frame:format(Packet)]), - ok = inc_outgoing_stats(Packet), - Data - end - end. - -%%-------------------------------------------------------------------- -%% Send data - --spec(send(iodata(), state()) -> ok). -send(IoData, #state{transport = Transport, socket = Socket, channel = Channel}) -> - Ctx = emqx_stomp_channel:info(ctx, Channel), - Oct = iolist_size(IoData), - ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.sent', Oct), - inc_counter(outgoing_bytes, Oct), - %emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel), - case Transport:async_send(Socket, IoData, [nosuspend]) of - ok -> ok; - Error = {error, _Reason} -> - %% Send an inet_reply to postpone handling the error - self() ! {inet_reply, Socket, Error}, - ok - end. - -%%-------------------------------------------------------------------- -%% Handle Info - -handle_info(activate_socket, State = #state{sockstate = OldSst}) -> - case activate_socket(State) of - {ok, NState = #state{sockstate = NewSst}} -> - case OldSst =/= NewSst of - true -> {ok, {event, NewSst}, NState}; - false -> {ok, NState} - end; - {error, Reason} -> - handle_info({sock_error, Reason}, State) - end; - -handle_info({sock_error, Reason}, State) -> - case Reason =/= closed andalso Reason =/= einval of - true -> ?LOG(warning, "socket_error: ~p", [Reason]); - false -> ok - end, - handle_info({sock_closed, Reason}, close_socket(State)); - -handle_info(Info, State) -> - with_channel(handle_info, [Info], State). - -%%-------------------------------------------------------------------- -%% Handle Info - -handle_cast({async_set_socket_options, Opts}, - State = #state{transport = Transport, - socket = Socket - }) -> - case Transport:setopts(Socket, Opts) of - ok -> ?tp(info, "custom_socket_options_successfully", #{opts => Opts}); - Err -> ?tp(error, "failed_to_set_custom_socket_optionn", #{reason => Err}) - end, - State; -handle_cast(Req, State) -> - ?tp(error, "received_unknown_cast", #{cast => Req}), - State. - -%%-------------------------------------------------------------------- -%% Ensure rate limit - -ensure_rate_limit(Stats, State = #state{limiter = Limiter}) -> - case ?ENABLED(Limiter) andalso emqx_limiter:check(Stats, Limiter) of - false -> State; - {ok, Limiter1} -> - State#state{limiter = Limiter1}; - {pause, Time, Limiter1} -> - ?LOG(warning, "Pause ~pms due to rate limit", [Time]), - TRef = emqx_misc:start_timer(Time, limit_timeout), - State#state{sockstate = blocked, - limiter = Limiter1, - limit_timer = TRef - } - end. - -%%-------------------------------------------------------------------- -%% Run GC and Check OOM - -run_gc(Stats, State = #state{gc_state = GcSt}) -> - case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of - false -> State; - {_IsGC, GcSt1} -> - State#state{gc_state = GcSt1} - end. - -check_oom(State = #state{oom_policy = OomPolicy}) -> - case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of - {shutdown, Reason} -> - %% triggers terminate/2 callback immediately - erlang:exit({shutdown, Reason}); - _Other -> ok - end, - State. - -%%-------------------------------------------------------------------- -%% Activate Socket - --compile({inline, [activate_socket/1]}). -activate_socket(State = #state{sockstate = closed}) -> - {ok, State}; -activate_socket(State = #state{sockstate = blocked}) -> - {ok, State}; -activate_socket(State = #state{transport = Transport, - socket = Socket, - active_n = N}) -> - case Transport:setopts(Socket, [{active, N}]) of - ok -> {ok, State#state{sockstate = running}}; - Error -> Error - end. - -%%-------------------------------------------------------------------- -%% Close Socket - -close_socket(State = #state{sockstate = closed}) -> State; -close_socket(State = #state{transport = Transport, socket = Socket}) -> - ok = Transport:fast_close(Socket), - State#state{sockstate = closed}. - -%%-------------------------------------------------------------------- -%% Inc incoming/outgoing stats - -%% XXX: Other packet type? -inc_incoming_stats(_Packet) -> - inc_counter(recv_pkt, 1), - ok. - %case Type =:= ?CMD_SEND of - % true -> - % inc_counter(recv_msg, 1), - % inc_counter(incoming_pubs, 1); - % false -> - % ok - %end, - %emqx_metrics:inc_recv(Packet). - -inc_outgoing_stats(_Packet) -> - inc_counter(send_pkt, 1), - ok. - %case Type =:= ?CMD_MESSAGE of - % true -> - % inc_counter(send_msg, 1), - % inc_counter(outgoing_pubs, 1); - % false -> - % ok - %end, - %emqx_metrics:inc_sent(Packet). - -%%-------------------------------------------------------------------- -%% Helper functions - -next_msgs(Packet) when is_record(Packet, stomp_frame) -> - {outgoing, Packet}; -next_msgs(Event) when is_tuple(Event) -> - Event; -next_msgs(More) when is_list(More) -> - More. - -shutdown(Reason, State) -> - stop({shutdown, Reason}, State). - -shutdown(Reason, Reply, State) -> - stop({shutdown, Reason}, Reply, State). - -stop(Reason, State) -> - {stop, Reason, State}. - -stop(Reason, Reply, State) -> - {stop, Reason, Reply, State}. - -inc_counter(Key, Inc) -> - _ = emqx_pd:inc_counter(Key, Inc), - ok. - -%%-------------------------------------------------------------------- -%% For CT tests -%%-------------------------------------------------------------------- - -set_field(Name, Value, State) -> - Pos = emqx_misc:index_of(Name, record_info(fields, state)), - setelement(Pos+1, State, Value). - -get_state(Pid) -> - State = sys:get_state(Pid), - maps:from_list(lists:zip(record_info(fields, state), - tl(tuple_to_list(State)))). diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl index b77439ba7..c003da078 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl @@ -68,6 +68,8 @@ -module(emqx_stomp_frame). +-behavior(emqx_gateway_frame). + -include("src/stomp/include/emqx_stomp.hrl"). -export([ initial_parse_state/1 diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl index 5592a7839..487b15077 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl @@ -113,8 +113,13 @@ start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) -> Name = name(InstaId, Type), + NCfg = Cfg#{ + ctx => Ctx, + frame_mod => emqx_stomp_frame, + chann_mod => emqx_stomp_channel + }, esockd:open(Name, ListenOn, merge_default(SocketOpts), - {emqx_stomp_connection, start_link, [Cfg#{ctx => Ctx}]}). + {emqx_gateway_conn, start_link, [NCfg]}). name(InstaId, Type) -> list_to_atom(lists:concat([InstaId, ":", Type])).