Rewrite the 'emqx_connection' module using a raw erlang process

This commit is contained in:
Feng Lee 2019-10-08 17:59:11 +08:00
parent e61173e9bc
commit e718fa8249
2 changed files with 374 additions and 375 deletions

View File

@ -31,7 +31,7 @@
, caps/1
]).
%% Exports for unit tests:(
%% Test Exports
-export([set_field/3]).
-export([ init/2
@ -97,6 +97,13 @@
disconnected_at := pos_integer()
}).
-type(action() :: {enter, connected | disconnected}
| {close, Reason :: atom()}
| {outgoing, emqx_types:packet()}
| {outgoing, [emqx_types:packet()]}).
-type(output() :: emqx_types:packet() | action() | [action()]).
-define(TIMER_TABLE, #{
stats_timer => emit_stats,
alive_timer => keepalive,
@ -223,12 +230,9 @@ init_gc_state(Zone) ->
-spec(handle_in(emqx_types:packet(), channel())
-> {ok, channel()}
| {ok, emqx_types:packet(), channel()}
| {ok, list(emqx_types:packet()), channel()}
| {close, channel()}
| {close, emqx_types:packet(), channel()}
| {stop, Error :: term(), channel()}
| {stop, Error :: term(), emqx_types:packet(), channel()}).
| {ok, output(), channel()}
| {stop, Reason :: term(), channel()}
| {stop, Reason :: term(), output(), channel()}).
handle_in(?CONNECT_PACKET(_), Channel = #channel{state = #{state_name := connected}}) ->
handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel);
@ -243,35 +247,36 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
{ok, NConnPkt, NChannel} ->
process_connect(NConnPkt, NChannel);
{error, ReasonCode, NChannel} ->
handle_out({connack, emqx_reason_codes:formalized(connack, ReasonCode), ConnPkt}, NChannel)
ReasonName = emqx_reason_codes:formalized(connack, ReasonCode),
handle_out({connack, ReasonName, ConnPkt}, NChannel)
end;
handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
Channel1 = inc_pub_stats(publish_in, Channel),
NChannel = inc_pub_stats(publish_in, Channel),
case emqx_packet:check(Packet) of
ok -> handle_publish(Packet, Channel1);
ok -> handle_publish(Packet, NChannel);
{error, ReasonCode} ->
handle_out({disconnect, ReasonCode}, Channel1)
handle_out({disconnect, ReasonCode}, NChannel)
end;
handle_in(?PUBACK_PACKET(PacketId, _ReasonCode),
Channel = #channel{clientinfo = ClientInfo, session = Session}) ->
Channel1 = inc_pub_stats(puback_in, Channel),
NChannel = inc_pub_stats(puback_in, Channel),
case emqx_session:puback(PacketId, Session) of
{ok, Msg, Publishes, NSession} ->
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
handle_out({publish, Publishes}, Channel1#channel{session = NSession});
handle_out({publish, Publishes}, NChannel#channel{session = NSession});
{ok, Msg, NSession} ->
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
{ok, Channel1#channel{session = NSession}};
{ok, NChannel#channel{session = NSession}};
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]),
ok = emqx_metrics:inc('packets.puback.inuse'),
{ok, Channel1};
{ok, NChannel};
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]),
ok = emqx_metrics:inc('packets.puback.missed'),
{ok, Channel1}
{ok, NChannel}
end;
handle_in(?PUBREC_PACKET(PacketId, _ReasonCode),
@ -342,7 +347,7 @@ handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
end;
handle_in(?PACKET(?PINGREQ), Channel) ->
{ok, ?PACKET(?PINGRESP), Channel};
{ok, Channel, {outgoing, ?PACKET(?PINGRESP)}};
handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninfo = ConnInfo}) ->
#{proto_ver := ProtoVer, expiry_interval := OldInterval} = ConnInfo,
@ -360,7 +365,7 @@ handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninf
{stop, ReasonName, Channel1};
true ->
Channel2 = Channel1#channel{conninfo = ConnInfo#{expiry_interval => Interval}},
{close, ReasonName, Channel2}
{ok, {close, ReasonName}, Channel2}
end;
handle_in(?AUTH_PACKET(), Channel) ->
@ -371,7 +376,8 @@ handle_in({frame_error, Reason}, Channel = #channel{state = FsmState}) ->
#{state_name := initialized} ->
{stop, {shutdown, Reason}, Channel};
#{state_name := connecting} ->
{stop, {shutdown, Reason}, ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel};
Packet = ?CONNACK_PACKET(?RC_MALFORMED_PACKET),
{stop, {shutdown, Reason}, Packet, Channel};
#{state_name := connected} ->
handle_out({disconnect, ?RC_MALFORMED_PACKET}, Channel);
#{state_name := disconnected} ->
@ -553,7 +559,8 @@ handle_out({connack, ?RC_SUCCESS, SP, ConnPkt},
resuming = false,
pendings = []},
{ok, Packets, _} = handle_out({publish, Publishes}, Channel3),
{ok, [AckPacket|Packets], Channel3}
Output = [{outgoing, [AckPacket|Packets]}, {enter, connected}],
{ok, Output, Channel3}
end;
handle_out({connack, ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo,
@ -594,7 +601,7 @@ handle_out({publish, Publishes}, Channel) when is_list(Publishes) ->
end
end, [], Publishes),
NChannel = inc_pub_stats(publish_out, length(Packets), Channel),
{ok, lists:reverse(Packets), NChannel};
{ok, {outgoing, lists:reverse(Packets)}, NChannel};
%% Ignore loop deliver
handle_out({publish, _PacketId, #message{from = ClientId,
@ -640,7 +647,6 @@ handle_out({disconnect, ReasonCode}, Channel = #channel{conninfo = #{proto_ver :
ReasonName = emqx_reason_codes:name(ReasonCode, ProtoVer),
handle_out({disconnect, ReasonCode, ReasonName}, Channel);
%%TODO: Improve later...
handle_out({disconnect, ReasonCode, ReasonName},
Channel = #channel{conninfo = #{proto_ver := ProtoVer,
expiry_interval := ExpiryInterval}}) ->
@ -650,14 +656,19 @@ handle_out({disconnect, ReasonCode, ReasonName},
{0, _Ver} ->
{stop, ReasonName, Channel};
{?UINT_MAX, ?MQTT_PROTO_V5} ->
{close, ReasonName, ?DISCONNECT_PACKET(ReasonCode), Channel};
Output = [{outgoing, ?DISCONNECT_PACKET(ReasonCode)},
{close, ReasonName}],
{ok, Output, Channel};
{?UINT_MAX, _Ver} ->
{close, ReasonName, Channel};
{ok, {close, ReasonName}, Channel};
{Interval, ?MQTT_PROTO_V5} ->
NChannel = ensure_timer(expire_timer, Interval, Channel),
{close, ReasonName, ?DISCONNECT_PACKET(ReasonCode), NChannel};
Output = [{outgoing, ?DISCONNECT_PACKET(ReasonCode)},
{close, ReasonName}],
{ok, Output, NChannel};
{Interval, _Ver} ->
{close, ReasonName, ensure_timer(expire_timer, Interval, Channel)}
NChannel = ensure_timer(expire_timer, Interval, Channel),
{ok, {close, ReasonName}, NChannel}
end;
handle_out({Type, Data}, Channel) ->

View File

@ -14,41 +14,40 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% MQTT TCP/SSL Connection
-module(emqx_connection).
-behaviour(gen_statem).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include("logger.hrl").
-include("types.hrl").
-logger_header("[Connection]").
-logger_header("[MQTT]").
-export([start_link/3]).
%% API
-export([ start_link/3
, call/2
, stop/1
]).
%% APIs
-export([ info/1
, stats/1
]).
-export([call/2]).
-export([init/4]).
%% state callbacks
-export([ idle/3
, connected/3
, disconnected/3
%% Sys callbacks
-export([ system_continue/3
, system_terminate/4
, system_code_change/4
, system_get_state/1
]).
%% gen_statem callbacks
-export([ init/1
, callback_mode/0
, code_change/4
, terminate/3
]).
%% Internal callbacks
-export([wakeup_from_hib/2]).
-record(state, {
%% Parent
parent :: pid(),
%% TCP/TLS Transport
transport :: esockd:transport(),
%% TCP/TLS Socket
@ -60,7 +59,7 @@
%% The {active, N} option
active_n :: pos_integer(),
%% The active state
active_state :: running | blocked,
active_st :: idle | running | blocked | closed,
%% Publish Limit
pub_limit :: maybe(esockd_rate_limit:bucket()),
%% Rate Limit
@ -72,39 +71,40 @@
%% Serialize function
serialize :: emqx_frame:serialize_fun(),
%% Channel State
chan_state :: emqx_channel:channel()
chan_state :: emqx_channel:channel(),
%% Idle timer
idle_timer :: reference()
}).
-type(state() :: #state{}).
-define(ACTIVE_N, 100).
-define(HANDLE(T, C, D), handle((T), (C), (D))).
-define(INFO_KEYS, [socktype, peername, sockname, active_n, active_state,
pub_limit, rate_limit]).
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
%% @doc Start the connection.
-spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist())
-> {ok, pid()}).
start_link(Transport, Socket, Options) ->
{ok, proc_lib:spawn_link(?MODULE, init, [{Transport, Socket, Options}])}.
CPid = proc_lib:spawn_link(?MODULE, init, [self(), Transport, Socket, Options]),
{ok, CPid}.
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
%% @doc Get infos of the connection.
%% @doc Get infos of the connection/channel.
-spec(info(pid()|state()) -> emqx_types:infos()).
info(CPid) when is_pid(CPid) ->
call(CPid, info);
info(Conn = #state{chan_state = ChanState}) ->
info(State = #state{chan_state = ChanState}) ->
ChanInfo = emqx_channel:info(ChanState),
SockInfo = maps:from_list(info(?INFO_KEYS, Conn)),
SockInfo = maps:from_list(info(?INFO_KEYS, State)),
maps:merge(ChanInfo, #{sockinfo => SockInfo}).
info(Keys, Conn) when is_list(Keys) ->
[{Key, info(Key, Conn)} || Key <- Keys];
info(Keys, State) when is_list(Keys) ->
[{Key, info(Key, State)} || Key <- Keys];
info(socktype, #state{transport = Transport, socket = Socket}) ->
Transport:type(Socket);
info(peername, #state{peername = Peername}) ->
@ -113,7 +113,7 @@ info(sockname, #state{sockname = Sockname}) ->
Sockname;
info(active_n, #state{active_n = ActiveN}) ->
ActiveN;
info(active_state, #state{active_state = ActiveSt}) ->
info(active_st, #state{active_st= ActiveSt}) ->
ActiveSt;
info(pub_limit, #state{pub_limit = PubLimit}) ->
limit_info(PubLimit);
@ -125,7 +125,7 @@ info(chan_state, #state{chan_state = ChanState}) ->
limit_info(Limit) ->
emqx_misc:maybe_apply(fun esockd_rate_limit:info/1, Limit).
%% @doc Get stats of the channel.
%% @doc Get stats of the connection/channel.
-spec(stats(pid()|state()) -> emqx_types:stats()).
stats(CPid) when is_pid(CPid) ->
call(CPid, stats);
@ -141,18 +141,21 @@ stats(#state{transport = Transport,
ProcStats = emqx_misc:proc_stats(),
lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
%% kick|discard|takeover
-spec(call(pid(), Req :: term()) -> Reply :: term()).
call(CPid, Req) -> gen_statem:call(CPid, Req).
call(Pid, Req) ->
gen_server:call(Pid, Req, infinity).
stop(Pid) ->
gen_server:stop(Pid).
%%--------------------------------------------------------------------
%% gen_statem callbacks
%% callbacks
%%--------------------------------------------------------------------
init({Transport, RawSocket, Options}) ->
init(Parent, Transport, RawSocket, Options) ->
{ok, Socket} = Transport:wait(RawSocket),
{ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]),
{ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]),
ConnInfo = #{socktype => Transport:type(Socket),
peername => Peername,
@ -160,7 +163,6 @@ init({Transport, RawSocket, Options}) ->
peercert => Peercert,
conn_mod => ?MODULE
},
emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
Zone = proplists:get_value(zone, Options),
ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N),
PubLimit = init_limiter(emqx_zone:get_env(Zone, publish_limit)),
@ -169,327 +171,328 @@ init({Transport, RawSocket, Options}) ->
ParseState = emqx_frame:initial_parse_state(FrameOpts),
Serialize = emqx_frame:serialize_fun(),
ChanState = emqx_channel:init(ConnInfo, Options),
State = #state{transport = Transport,
IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
IdleTimer = emqx_misc:start_timer(IdleTimout, idle_timeout),
HibAfterTimeout = emqx_zone:get_env(Zone, hibernate_after, IdleTimout*2),
State = #state{parent = Parent,
transport = Transport,
socket = Socket,
peername = Peername,
sockname = Sockname,
active_n = ActiveN,
active_state = running,
active_st = idle,
pub_limit = PubLimit,
rate_limit = RateLimit,
parse_state = ParseState,
serialize = Serialize,
chan_state = ChanState
chan_state = ChanState,
idle_timer = IdleTimer
},
IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
gen_statem:enter_loop(?MODULE, [{hibernate_after, 2 * IdleTimout}],
idle, State, self(), [IdleTimout]).
case activate_socket(State) of
{ok, NState} ->
recvloop(NState, #{hibernate_after => HibAfterTimeout});
{error, Reason} ->
Transport:fast_close(Socket),
erlang:exit({shutdown, Reason})
end.
-compile({inline, [init_limiter/1]}).
init_limiter(undefined) -> undefined;
init_limiter({Rate, Burst}) ->
esockd_rate_limit:new(Rate, Burst).
-compile({inline, [callback_mode/0]}).
callback_mode() ->
[state_functions, state_enter].
%%--------------------------------------------------------------------
%% Recv Loop
recvloop(State = #state{parent = Parent},
Options = #{hibernate_after := HibAfterTimeout}) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Options});
{'EXIT', Parent, Reason} ->
terminate(Reason, State);
Msg ->
process_msg([Msg], State, Options)
after
HibAfterTimeout ->
hibernate(State, Options)
end.
hibernate(State, Options) ->
proc_lib:hibernate(?MODULE, wakeup_from_hib, [State, Options]).
wakeup_from_hib(State, Options) ->
%% Maybe do something later here.
recvloop(State, Options).
%%--------------------------------------------------------------------
%% Idle State
%% Process next Msg
idle(enter, _, State) ->
case activate_socket(State) of
ok -> keep_state_and_data;
{error, Reason} ->
shutdown(Reason, State)
process_msg([], State, Options) ->
recvloop(State, Options);
process_msg([Msg|More], State, Options) ->
case catch handle_msg(Msg, State) of
ok ->
process_msg(More, State, Options);
{ok, NState} ->
process_msg(More, NState, Options);
{ok, NextMsgs, NState} ->
process_msg(append_msg(NextMsgs, More), NState, Options);
{stop, Reason} ->
terminate(Reason, State);
{stop, Reason, NState} ->
terminate(Reason, NState);
{'EXIT', Reason} ->
terminate(Reason, State)
end.
-compile({inline, [append_msg/2]}).
append_msg(NextMsgs, L) when is_list(NextMsgs) ->
lists:append(NextMsgs, L);
append_msg(NextMsg, L) -> [NextMsg|L].
%%--------------------------------------------------------------------
%% Handle a Msg
handle_msg({'$gen_call', From, Req}, State) ->
case handle_call(From, Req, State) of
{reply, Reply, NState} ->
gen_server:reply(From, Reply),
{ok, NState};
{stop, Reason, Reply, NState} ->
gen_server:reply(From, Reply),
{stop, Reason, NState}
end;
idle(timeout, _Timeout, State) ->
shutdown(idle_timeout, State);
idle(cast, {incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) ->
SuccFun = fun(NewSt) -> {next_state, connected, NewSt} end,
Serialize = emqx_frame:serialize_fun(ConnPkt),
NState = State#state{serialize = Serialize},
handle_incoming(Packet, SuccFun, NState);
idle(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) ->
SuccFun = fun(NewSt) -> {next_state, connected, NewSt} end,
handle_incoming(Packet, SuccFun, State);
idle(cast, {incoming, FrameError = {frame_error, _Reason}}, State) ->
handle_incoming(FrameError, State);
idle(EventType, Content, State) ->
?HANDLE(EventType, Content, State).
%%--------------------------------------------------------------------
%% Connected State
connected(enter, _PrevSt, State) ->
ok = register_self(State),
keep_state_and_data;
connected(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) ->
handle_incoming(Packet, fun keep_state/1, State);
connected(cast, {incoming, FrameError = {frame_error, _Reason}}, State) ->
handle_incoming(FrameError, State);
connected(info, Deliver = {deliver, _Topic, _Msg}, State) ->
handle_deliver(emqx_misc:drain_deliver([Deliver]), State);
connected(EventType, Content, State) ->
?HANDLE(EventType, Content, State).
%%--------------------------------------------------------------------
%% Disconnected State
disconnected(enter, _, State = #state{chan_state = ChanState}) ->
case emqx_channel:handle_info(disconnected, ChanState) of
{ok, NChanState} ->
ok = register_self(State#state{chan_state = NChanState}),
keep_state(State#state{chan_state = NChanState});
{stop, Reason, NChanState} ->
stop(Reason, State#state{chan_state = NChanState})
end;
disconnected(info, Deliver = {deliver, _Topic, _Msg}, State) ->
handle_deliver([Deliver], State);
disconnected(EventType, Content, State) ->
?HANDLE(EventType, Content, State).
%%--------------------------------------------------------------------
%% Handle call
handle({call, From}, info, State) ->
reply(From, info(State), State);
handle({call, From}, stats, State) ->
reply(From, stats(State), State);
handle({call, From}, state, State) ->
reply(From, State, State);
handle({call, From}, Req, State = #state{chan_state = ChanState}) ->
case emqx_channel:handle_call(Req, ChanState) of
{ok, Reply, NChanState} ->
reply(From, Reply, State#state{chan_state = NChanState});
{stop, Reason, Reply, NChanState} ->
ok = gen_statem:reply(From, Reply),
stop(Reason, State#state{chan_state = NChanState});
{stop, Reason, Packet, Reply, NChanState} ->
handle_outgoing(Packet, State#state{chan_state = NChanState}),
ok = gen_statem:reply(From, Reply),
stop(Reason, State#state{chan_state = NChanState})
end;
%%--------------------------------------------------------------------
%% Handle cast
handle(cast, Msg, State = #state{chan_state = ChanState}) ->
case emqx_channel:handle_info(Msg, ChanState) of
ok -> {ok, State};
{ok, NChanState} ->
keep_state(State#state{chan_state = NChanState});
{stop, Reason, NChanState} ->
stop(Reason, State#state{chan_state = NChanState})
end;
%%--------------------------------------------------------------------
%% Handle info
%% Handle incoming data
handle(info, {Inet, _Sock, Data}, State = #state{chan_state = ChanState})
handle_msg({Inet, _Sock, Data}, State = #state{chan_state = ChanState})
when Inet == tcp; Inet == ssl ->
?LOG(debug, "RECV ~p", [Data]),
Oct = iolist_size(Data),
emqx_pd:update_counter(incoming_bytes, Oct),
ok = emqx_metrics:inc('bytes.received', Oct),
NChanState = emqx_channel:received(Oct, ChanState),
NState = State#state{chan_state = NChanState},
process_incoming(Data, NState);
State1 = State#state{chan_state = NChanState},
{Packets, State2} = parse_incoming(Data, State1),
{ok, next_incoming_msgs(Packets), State2};
handle(info, {Error, _Sock, Reason}, State)
when Error == tcp_error; Error == ssl_error ->
shutdown(Reason, State);
handle(info, {Closed, _Sock}, State)
when Closed == tcp_closed; Closed == ssl_closed ->
{next_state, disconnected, State};
handle(info, {Passive, _Sock}, State)
when Passive == tcp_passive; Passive == ssl_passive ->
%% Rate limit here:)
NState = ensure_rate_limit(State),
case activate_socket(NState) of
ok -> keep_state(NState);
{error, Reason} ->
shutdown(Reason, NState)
end;
handle(info, activate_socket, State) ->
%% Rate limit timer expired.
NState = State#state{active_state = running,
limit_timer = undefined
%% Handle incoming packets
handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
State = #state{idle_timer = IdleTimer}) ->
ok = emqx_misc:cancel_timer(IdleTimer),
NState = State#state{serialize = emqx_frame:serialize_fun(ConnPkt),
idle_timer = undefined
},
case activate_socket(NState) of
ok -> keep_state(NState);
{error, Reason} ->
shutdown(Reason, NState)
end;
handle_incoming(Packet, NState);
handle(info, {inet_reply, _Sock, ok}, _State) ->
%% something sent
keep_state_and_data;
handle_msg({incoming, Packet}, State) when is_record(Packet, mqtt_packet) ->
handle_incoming(Packet, State);
handle(info, {inet_reply, _Sock, {error, Reason}}, State) ->
shutdown(Reason, State);
handle(info, {timeout, TRef, keepalive},
State = #state{transport = Transport, socket = Socket}) ->
case Transport:getstat(Socket, [recv_oct]) of
{ok, [{recv_oct, RecvOct}]} ->
handle_timeout(TRef, {keepalive, RecvOct}, State);
{error, Reason} ->
shutdown(Reason, State)
end;
handle(info, {timeout, TRef, emit_stats}, State) ->
handle_timeout(TRef, {emit_stats, stats(State)}, State);
handle(info, {timeout, TRef, Msg}, State) ->
handle_timeout(TRef, Msg, State);
handle(info, {shutdown, Reason}, State) ->
shutdown(Reason, State);
handle(info, Info, State = #state{chan_state = ChanState}) ->
case emqx_channel:handle_info(Info, ChanState) of
{ok, NChanState} ->
keep_state(State#state{chan_state = NChanState});
{stop, Reason, NChanState} ->
stop(Reason, State#state{chan_state = NChanState})
end.
code_change(_Vsn, State, Data, _Extra) ->
{ok, State, Data}.
terminate(Reason, _StateName, #state{transport = Transport,
socket = Socket,
chan_state = ChanState
}) ->
?LOG(debug, "Terminated for ~p", [Reason]),
ok = Transport:fast_close(Socket),
emqx_channel:terminate(Reason, ChanState).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
register_self(State = #state{active_n = ActiveN,
active_state = ActiveSt,
handle_msg({enter, connected}, State = #state{active_n = ActiveN,
active_st = ActiveSt,
chan_state = ChanState
}) ->
ChanAttrs = emqx_channel:attrs(ChanState),
SockAttrs = #{active_n => ActiveN,
active_state => ActiveSt
active_st => ActiveSt
},
Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
emqx_channel:handle_info({register, Attrs, stats(State)}, ChanState).
emqx_channel:handle_info({register, Attrs, stats(State)}, ChanState);
handle_msg({Error, _Sock, Reason}, State)
when Error == tcp_error; Error == ssl_error ->
handle_sockerr(Reason, State);
handle_msg({Closed, _Sock}, State)
when Closed == tcp_closed; Closed == ssl_closed ->
socket_closed(Closed, State);
handle_msg({Passive, _Sock}, State)
when Passive == tcp_passive; Passive == ssl_passive ->
%% Rate limit and activate socket here.
NState = ensure_rate_limit(State),
case activate_socket(NState) of
{ok, NState} -> {ok, NState};
{error, Reason} ->
handle_sockerr(Reason, State)
end;
%% Rate limit timer expired.
handle_msg(activate_socket, State) ->
NState = State#state{active_st = idle,
limit_timer = undefined
},
case activate_socket(NState) of
{ok, NState} -> {ok, NState};
{error, Reason} ->
handle_sockerr(Reason, State)
end;
handle_msg(Deliver = {deliver, _Topic, _Msg},
State = #state{chan_state = ChanState}) ->
Delivers = emqx_misc:drain_deliver([Deliver]),
Result = emqx_channel:handle_out({deliver, Delivers}, ChanState),
handle_chan_return(Result, State);
handle_msg({outgoing, Packets}, State) ->
handle_outgoing(Packets, State);
%% something sent
handle_msg({inet_reply, _Sock, ok}, _State) ->
ok;
handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
handle_sockerr(Reason, State);
handle_msg({timeout, TRef, TMsg}, State) when is_reference(TRef) ->
handle_timeout(TRef, TMsg, State);
handle_msg(Shutdown = {shutdown, _Reason}, State) ->
{stop, Shutdown, State};
handle_msg(Msg, State = #state{chan_state = ChanState}) ->
case emqx_channel:handle_info(Msg, ChanState) of
{ok, NChanState} ->
{ok, State#state{chan_state = NChanState}};
{stop, Reason, NChanState} ->
{stop, Reason, State#state{chan_state = NChanState}}
end.
%%--------------------------------------------------------------------
%% Process incoming data
%% Terminate
-compile({inline, [process_incoming/2]}).
process_incoming(Data, State) ->
process_incoming(Data, [], State).
terminate(Reason, #state{transport = Transport,
socket = Socket,
active_st = ActiveSt,
chan_state = ChanState}) ->
?LOG(debug, "Terminated for ~p", [Reason]),
ActiveSt =:= closed orelse Transport:fast_close(Socket),
emqx_channel:terminate(Reason, ChanState),
exit(Reason).
process_incoming(<<>>, Packets, State) ->
keep_state(State, next_incoming_events(Packets));
%%--------------------------------------------------------------------
%% Sys callbacks
process_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
system_continue(_Parent, _Deb, {State, Options}) ->
recvloop(State, Options).
system_terminate(Reason, _Parent, _Deb, {State, _}) ->
terminate(Reason, State).
system_code_change(Misc, _, _, _) ->
{ok, Misc}.
system_get_state({State, _Options}) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Handle call
handle_call(_From, info, State) ->
{reply, info(State), State};
handle_call(_From, stats, State) ->
{reply, stats(State), State};
%% TODO: the handle_outgoing is not right ...
handle_call(_From, Req, State = #state{chan_state = ChanState}) ->
case emqx_channel:handle_call(Req, ChanState) of
{ok, Reply, NChanState} ->
{reply, Reply, State#state{chan_state = NChanState}};
{stop, Reason, Reply, NChanState} ->
{stop, Reason, Reply, State#state{chan_state = NChanState}};
{stop, Reason, Packet, Reply, NChanState} ->
State1 = State#state{chan_state = NChanState},
{ok, State2} = handle_outgoing(Packet, State1),
{stop, Reason, Reply, State2}
end.
%%--------------------------------------------------------------------
%% Handle timeout
handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) ->
{stop, idle_timeout, State};
handle_timeout(TRef, emit_stats, State) ->
handle_timeout(TRef, {emit_stats, stats(State)}, State);
handle_timeout(TRef, keepalive, State = #state{transport = Transport,
socket = Socket}) ->
case Transport:getstat(Socket, [recv_oct]) of
{ok, [{recv_oct, RecvOct}]} ->
handle_timeout(TRef, {keepalive, RecvOct}, State);
{error, Reason} ->
handle_sockerr(Reason, State)
end;
handle_timeout(TRef, Msg, State = #state{chan_state = ChanState}) ->
Result = emqx_channel:handle_timeout(TRef, Msg, ChanState),
handle_chan_return(Result, State).
%%--------------------------------------------------------------------
%% Parse incoming data.
parse_incoming(Data, State) ->
parse_incoming(Data, [], State).
parse_incoming(<<>>, Packets, State) ->
{Packets, State};
parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
try emqx_frame:parse(Data, ParseState) of
{more, NParseState} ->
NState = State#state{parse_state = NParseState},
keep_state(NState, next_incoming_events(Packets));
{Packets, State#state{parse_state = NParseState}};
{ok, Packet, Rest, NParseState} ->
NState = State#state{parse_state = NParseState},
process_incoming(Rest, [Packet|Packets], NState)
parse_incoming(Rest, [Packet|Packets], NState)
catch
error:Reason:Stk ->
?LOG(error, "~nParse failed for ~p~nStacktrace: ~p~nFrame data:~p",
[Reason, Stk, Data]),
keep_state(State, next_incoming_events(Packets++[{frame_error, Reason}]))
{[{frame_error, Reason}|Packets], State}
end.
-compile({inline, [next_incoming_events/1]}).
next_incoming_events([]) -> [];
next_incoming_events(Packets) ->
[next_event(cast, {incoming, Packet}) || Packet <- Packets].
next_incoming_msgs([Packet]) ->
{incoming, Packet};
next_incoming_msgs(Packets) ->
[{incoming, Packet} || Packet <- lists:reverse(Packets)].
%%--------------------------------------------------------------------
%% Handle incoming packet
handle_incoming(Packet = ?PACKET(Type), SuccFun, State = #state{chan_state = ChanState}) ->
handle_incoming(Packet = ?PACKET(Type), State = #state{chan_state = ChanState}) ->
_ = inc_incoming_stats(Type),
_ = emqx_metrics:inc_recv(Packet),
ok = emqx_metrics:inc_recv(Packet),
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
case emqx_channel:handle_in(Packet, ChanState) of
{ok, NChanState} ->
SuccFun(State#state{chan_state= NChanState});
{ok, OutPackets, NChanState} ->
NState = State#state{chan_state = NChanState},
handle_outgoing(OutPackets, SuccFun, NState);
{close, Reason, NChanState} ->
close(Reason, State#state{chan_state = NChanState});
{close, Reason, OutPackets, NChanState} ->
NState = State#state{chan_state= NChanState},
close(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState));
{stop, Reason, NChanState} ->
stop(Reason, State#state{chan_state = NChanState});
{stop, Reason, OutPackets, NChanState} ->
NState = State#state{chan_state= NChanState},
stop(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState))
end.
Result = emqx_channel:handle_in(Packet, ChanState),
handle_chan_return(Result, State);
handle_incoming(FrameError = {frame_error, _Reason}, State = #state{chan_state = ChanState}) ->
case emqx_channel:handle_in(FrameError, ChanState) of
{close, Reason, NChanState} ->
close(Reason, State#state{chan_state = NChanState});
{close, Reason, OutPackets, NChanState} ->
NState = State#state{chan_state= NChanState},
close(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState));
{stop, Reason, NChanState} ->
stop(Reason, State#state{chan_state = NChanState});
{stop, Reason, OutPackets, NChanState} ->
NState = State#state{chan_state= NChanState},
stop(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState))
end.
Result = emqx_channel:handle_in(FrameError, ChanState),
handle_chan_return(Result, State).
%%-------------------------------------------------------------------
%% Handle deliver
handle_deliver(Delivers, State = #state{chan_state = ChanState}) ->
case emqx_channel:handle_out({deliver, Delivers}, ChanState) of
{ok, NChanState} ->
keep_state(State#state{chan_state = NChanState});
{ok, Packets, NChanState} ->
handle_outgoing(Packets, fun keep_state/1, State#state{chan_state = NChanState})
end.
handle_chan_return({ok, NChanState}, State) ->
{ok, State#state{chan_state = NChanState}};
handle_chan_return({ok, OutPacket, NChanState}, State)
when is_record(OutPacket, mqtt_packet) ->
{ok, {outgoing, OutPacket}, State#state{chan_state = NChanState}};
handle_chan_return({ok, Actions, NChanState}, State) ->
{ok, Actions, State#state{chan_state = NChanState}};
handle_chan_return({stop, Reason, NChanState}, State) ->
{stop, Reason, State#state{chan_state = NChanState}};
handle_chan_return({stop, Reason, OutPackets, NChanState}, State) ->
NState = State#state{chan_state = NChanState},
{ok, NState1} = handle_outgoing(OutPackets, NState),
{stop, Reason, NState1}.
%%--------------------------------------------------------------------
%% Handle outgoing packets
handle_outgoing(Packets, State) when is_list(Packets) ->
send(lists:map(serialize_and_inc_stats_fun(State), Packets), State);
handle_outgoing(Packet, State) ->
handle_outgoing(Packet, fun (_) -> ok end, State).
handle_outgoing(Packets, SuccFun, State) when is_list(Packets) ->
send(lists:map(serialize_and_inc_stats_fun(State), Packets), SuccFun, State);
handle_outgoing(Packet, SuccFun, State) ->
send((serialize_and_inc_stats_fun(State))(Packet), SuccFun, State).
send((serialize_and_inc_stats_fun(State))(Packet), State).
serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
fun(Packet = ?PACKET(Type)) ->
@ -507,37 +510,68 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
%%--------------------------------------------------------------------
%% Send data
send(IoData, SuccFun, State = #state{transport = Transport,
send(IoData, State = #state{transport = Transport,
socket = Socket,
chan_state = ChanState}) ->
Oct = iolist_size(IoData),
ok = emqx_metrics:inc('bytes.sent', Oct),
case Transport:async_send(Socket, IoData) of
ok -> NChanState = emqx_channel:sent(Oct, ChanState),
SuccFun(State#state{chan_state = NChanState});
{error, Reason} ->
shutdown(Reason, State)
ok ->
NChanState = emqx_channel:sent(Oct, ChanState),
{ok, State#state{chan_state = NChanState}};
Error = {error, _Reason} ->
%% Simulate an inet_reply to postpone handling the error
self() ! {inet_reply, Socket, Error},
{ok, State}
end.
%%--------------------------------------------------------------------
%% Handle timeout
%% Handle sockerr
handle_timeout(TRef, Msg, State = #state{chan_state = ChanState}) ->
case emqx_channel:handle_timeout(TRef, Msg, ChanState) of
handle_sockerr(_Reason, State = #state{active_st = closed}) ->
{ok, State};
handle_sockerr(Reason, State = #state{transport = Transport,
socket = Socket,
chan_state = ChanState}) ->
?LOG(debug, "Socket error: ~p", [Reason]),
ok = Transport:fast_close(Socket),
NState = State#state{active_st = closed},
case emqx_channel:handle_info({sockerr, Reason}, ChanState) of
{ok, NChanState} ->
keep_state(State#state{chan_state = NChanState});
{ok, Packets, NChanState} ->
handle_outgoing(Packets, fun keep_state/1, State#state{chan_state = NChanState});
{close, Reason, NChanState} ->
close(Reason, State#state{chan_state = NChanState});
{close, Reason, OutPackets, NChanState} ->
NState = State#state{chan_state= NChanState},
close(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState));
{stop, Reason, NChanState} ->
stop(Reason, State#state{chan_state = NChanState});
{stop, Reason, OutPackets, NChanState} ->
NState = State#state{chan_state= NChanState},
stop(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState))
{ok, NState#state{chan_state = NChanState}};
{stop, NChanState} ->
{stop, {shutdown, Reason}, NState#state{chan_state = NChanState}}
end.
socket_closed(Closed, State = #state{transport = Transport,
socket = Socket,
chan_state = ChanState}) ->
?LOG(debug, "Socket closed: ~p", [Closed]),
ok = Transport:fast_close(Socket),
NState = State#state{active_st = closed},
case emqx_channel:handle_info({sock_closed, Closed}, ChanState) of
{ok, NChanState} ->
{ok, NState#state{chan_state = NChanState}};
{stop, NChanState} ->
NState = NState#state{chan_state = NChanState},
{stop, {shutdown, Closed}, NState}
end.
%%--------------------------------------------------------------------
%% Activate Socket
-compile({inline, [activate_socket/1]}).
activate_socket(State = #state{active_st = closed}) ->
{ok, State};
activate_socket(State = #state{active_st = blocked}) ->
{ok, State};
activate_socket(State = #state{transport = Transport,
socket = Socket,
active_n = N}) ->
case Transport:setopts(Socket, [{active, N}]) of
ok -> {ok, State#state{active_st = running}};
Error -> Error
end.
%%--------------------------------------------------------------------
@ -561,22 +595,10 @@ ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) ->
{Pause, Rl1} ->
?LOG(debug, "Pause ~pms due to rate limit", [Pause]),
TRef = erlang:send_after(Pause, self(), activate_socket),
NState = State#state{active_state = blocked,
limit_timer = TRef
},
NState = State#state{active_st = blocked, limit_timer = TRef},
setelement(Pos, NState, Rl1)
end.
%%--------------------------------------------------------------------
%% Activate Socket
-compile({inline, [activate_socket/1]}).
activate_socket(#state{active_state = blocked}) -> ok;
activate_socket(#state{transport = Transport,
socket = Socket,
active_n = N}) ->
Transport:setopts(Socket, [{active, N}]).
%%--------------------------------------------------------------------
%% Inc incoming/outgoing stats
@ -590,44 +612,10 @@ inc_incoming_stats(Type) when is_integer(Type) ->
true -> ok
end.
-compile({inline, [inc_outgoing_stats/1]}).
inc_outgoing_stats(Type) ->
emqx_pd:update_counter(send_pkt, 1),
(Type == ?PUBLISH)
andalso emqx_pd:update_counter(send_msg, 1).
%%--------------------------------------------------------------------
%% Helper functions
-compile({inline,
[ reply/3
, keep_state/1
, keep_state/2
, next_event/2
, shutdown/2
, stop/2
]}).
reply(From, Reply, State) ->
{keep_state, State, [{reply, From, Reply}]}.
keep_state(State) ->
{keep_state, State}.
keep_state(State, Events) ->
{keep_state, State, Events}.
next_event(Type, Content) ->
{next_event, Type, Content}.
close(Reason, State = #state{transport = Transport, socket = Socket}) ->
?LOG(warning, "Closed for ~p", [Reason]),
ok = Transport:fast_close(Socket),
{next_state, disconnected, State}.
shutdown(Reason, State) ->
stop({shutdown, Reason}, State).
stop(Reason, State) ->
{stop, Reason, State}.