From e718fa8249ad9d734898edb11f370d5f13a48eb7 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 8 Oct 2019 17:59:11 +0800 Subject: [PATCH] Rewrite the 'emqx_connection' module using a raw erlang process --- src/emqx_channel.erl | 67 ++-- src/emqx_connection.erl | 682 ++++++++++++++++++++-------------------- 2 files changed, 374 insertions(+), 375 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 01e32410d..670b8f702 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -31,7 +31,7 @@ , caps/1 ]). -%% Exports for unit tests:( +%% Test Exports -export([set_field/3]). -export([ init/2 @@ -97,6 +97,13 @@ disconnected_at := pos_integer() }). +-type(action() :: {enter, connected | disconnected} + | {close, Reason :: atom()} + | {outgoing, emqx_types:packet()} + | {outgoing, [emqx_types:packet()]}). + +-type(output() :: emqx_types:packet() | action() | [action()]). + -define(TIMER_TABLE, #{ stats_timer => emit_stats, alive_timer => keepalive, @@ -223,12 +230,9 @@ init_gc_state(Zone) -> -spec(handle_in(emqx_types:packet(), channel()) -> {ok, channel()} - | {ok, emqx_types:packet(), channel()} - | {ok, list(emqx_types:packet()), channel()} - | {close, channel()} - | {close, emqx_types:packet(), channel()} - | {stop, Error :: term(), channel()} - | {stop, Error :: term(), emqx_types:packet(), channel()}). + | {ok, output(), channel()} + | {stop, Reason :: term(), channel()} + | {stop, Reason :: term(), output(), channel()}). handle_in(?CONNECT_PACKET(_), Channel = #channel{state = #{state_name := connected}}) -> handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel); @@ -243,35 +247,36 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> {ok, NConnPkt, NChannel} -> process_connect(NConnPkt, NChannel); {error, ReasonCode, NChannel} -> - handle_out({connack, emqx_reason_codes:formalized(connack, ReasonCode), ConnPkt}, NChannel) + ReasonName = emqx_reason_codes:formalized(connack, ReasonCode), + handle_out({connack, ReasonName, ConnPkt}, NChannel) end; handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> - Channel1 = inc_pub_stats(publish_in, Channel), + NChannel = inc_pub_stats(publish_in, Channel), case emqx_packet:check(Packet) of - ok -> handle_publish(Packet, Channel1); + ok -> handle_publish(Packet, NChannel); {error, ReasonCode} -> - handle_out({disconnect, ReasonCode}, Channel1) + handle_out({disconnect, ReasonCode}, NChannel) end; handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), Channel = #channel{clientinfo = ClientInfo, session = Session}) -> - Channel1 = inc_pub_stats(puback_in, Channel), + NChannel = inc_pub_stats(puback_in, Channel), case emqx_session:puback(PacketId, Session) of {ok, Msg, Publishes, NSession} -> ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), - handle_out({publish, Publishes}, Channel1#channel{session = NSession}); + handle_out({publish, Publishes}, NChannel#channel{session = NSession}); {ok, Msg, NSession} -> ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), - {ok, Channel1#channel{session = NSession}}; + {ok, NChannel#channel{session = NSession}}; {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> ?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]), ok = emqx_metrics:inc('packets.puback.inuse'), - {ok, Channel1}; + {ok, NChannel}; {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]), ok = emqx_metrics:inc('packets.puback.missed'), - {ok, Channel1} + {ok, NChannel} end; handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), @@ -342,7 +347,7 @@ handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), end; handle_in(?PACKET(?PINGREQ), Channel) -> - {ok, ?PACKET(?PINGRESP), Channel}; + {ok, Channel, {outgoing, ?PACKET(?PINGRESP)}}; handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninfo = ConnInfo}) -> #{proto_ver := ProtoVer, expiry_interval := OldInterval} = ConnInfo, @@ -360,7 +365,7 @@ handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninf {stop, ReasonName, Channel1}; true -> Channel2 = Channel1#channel{conninfo = ConnInfo#{expiry_interval => Interval}}, - {close, ReasonName, Channel2} + {ok, {close, ReasonName}, Channel2} end; handle_in(?AUTH_PACKET(), Channel) -> @@ -371,7 +376,8 @@ handle_in({frame_error, Reason}, Channel = #channel{state = FsmState}) -> #{state_name := initialized} -> {stop, {shutdown, Reason}, Channel}; #{state_name := connecting} -> - {stop, {shutdown, Reason}, ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel}; + Packet = ?CONNACK_PACKET(?RC_MALFORMED_PACKET), + {stop, {shutdown, Reason}, Packet, Channel}; #{state_name := connected} -> handle_out({disconnect, ?RC_MALFORMED_PACKET}, Channel); #{state_name := disconnected} -> @@ -528,7 +534,7 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel = %% Handle outgoing packet %%-------------------------------------------------------------------- -%%TODO: RunFold or Pipeline +%% TODO: RunFold or Pipeline handle_out({connack, ?RC_SUCCESS, SP, ConnPkt}, Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo, @@ -553,7 +559,8 @@ handle_out({connack, ?RC_SUCCESS, SP, ConnPkt}, resuming = false, pendings = []}, {ok, Packets, _} = handle_out({publish, Publishes}, Channel3), - {ok, [AckPacket|Packets], Channel3} + Output = [{outgoing, [AckPacket|Packets]}, {enter, connected}], + {ok, Output, Channel3} end; handle_out({connack, ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo, @@ -594,12 +601,12 @@ handle_out({publish, Publishes}, Channel) when is_list(Publishes) -> end end, [], Publishes), NChannel = inc_pub_stats(publish_out, length(Packets), Channel), - {ok, lists:reverse(Packets), NChannel}; + {ok, {outgoing, lists:reverse(Packets)}, NChannel}; %% Ignore loop deliver handle_out({publish, _PacketId, #message{from = ClientId, flags = #{nl := true}}}, - Channel = #channel{clientinfo = #{clientid := ClientId}}) -> + Channel = #channel{clientinfo = #{clientid := ClientId}}) -> {ok, Channel}; handle_out({publish, PacketId, Msg}, Channel = @@ -640,7 +647,6 @@ handle_out({disconnect, ReasonCode}, Channel = #channel{conninfo = #{proto_ver : ReasonName = emqx_reason_codes:name(ReasonCode, ProtoVer), handle_out({disconnect, ReasonCode, ReasonName}, Channel); -%%TODO: Improve later... handle_out({disconnect, ReasonCode, ReasonName}, Channel = #channel{conninfo = #{proto_ver := ProtoVer, expiry_interval := ExpiryInterval}}) -> @@ -650,14 +656,19 @@ handle_out({disconnect, ReasonCode, ReasonName}, {0, _Ver} -> {stop, ReasonName, Channel}; {?UINT_MAX, ?MQTT_PROTO_V5} -> - {close, ReasonName, ?DISCONNECT_PACKET(ReasonCode), Channel}; + Output = [{outgoing, ?DISCONNECT_PACKET(ReasonCode)}, + {close, ReasonName}], + {ok, Output, Channel}; {?UINT_MAX, _Ver} -> - {close, ReasonName, Channel}; + {ok, {close, ReasonName}, Channel}; {Interval, ?MQTT_PROTO_V5} -> NChannel = ensure_timer(expire_timer, Interval, Channel), - {close, ReasonName, ?DISCONNECT_PACKET(ReasonCode), NChannel}; + Output = [{outgoing, ?DISCONNECT_PACKET(ReasonCode)}, + {close, ReasonName}], + {ok, Output, NChannel}; {Interval, _Ver} -> - {close, ReasonName, ensure_timer(expire_timer, Interval, Channel)} + NChannel = ensure_timer(expire_timer, Interval, Channel), + {ok, {close, ReasonName}, NChannel} end; handle_out({Type, Data}, Channel) -> diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index e577fd2cc..571d08ba1 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -14,41 +14,40 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% MQTT TCP/SSL Connection -module(emqx_connection). --behaviour(gen_statem). - -include("emqx.hrl"). -include("emqx_mqtt.hrl"). -include("logger.hrl"). -include("types.hrl"). --logger_header("[Connection]"). +-logger_header("[MQTT]"). --export([start_link/3]). +%% API +-export([ start_link/3 + , call/2 + , stop/1 + ]). -%% APIs -export([ info/1 , stats/1 ]). --export([call/2]). +-export([init/4]). -%% state callbacks --export([ idle/3 - , connected/3 - , disconnected/3 +%% Sys callbacks +-export([ system_continue/3 + , system_terminate/4 + , system_code_change/4 + , system_get_state/1 ]). -%% gen_statem callbacks --export([ init/1 - , callback_mode/0 - , code_change/4 - , terminate/3 - ]). +%% Internal callbacks +-export([wakeup_from_hib/2]). -record(state, { + %% Parent + parent :: pid(), %% TCP/TLS Transport transport :: esockd:transport(), %% TCP/TLS Socket @@ -60,7 +59,7 @@ %% The {active, N} option active_n :: pos_integer(), %% The active state - active_state :: running | blocked, + active_st :: idle | running | blocked | closed, %% Publish Limit pub_limit :: maybe(esockd_rate_limit:bucket()), %% Rate Limit @@ -72,39 +71,40 @@ %% Serialize function serialize :: emqx_frame:serialize_fun(), %% Channel State - chan_state :: emqx_channel:channel() + chan_state :: emqx_channel:channel(), + %% Idle timer + idle_timer :: reference() }). -type(state() :: #state{}). -define(ACTIVE_N, 100). --define(HANDLE(T, C, D), handle((T), (C), (D))). -define(INFO_KEYS, [socktype, peername, sockname, active_n, active_state, pub_limit, rate_limit]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). -%% @doc Start the connection. -spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist()) -> {ok, pid()}). start_link(Transport, Socket, Options) -> - {ok, proc_lib:spawn_link(?MODULE, init, [{Transport, Socket, Options}])}. + CPid = proc_lib:spawn_link(?MODULE, init, [self(), Transport, Socket, Options]), + {ok, CPid}. %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- -%% @doc Get infos of the connection. +%% @doc Get infos of the connection/channel. -spec(info(pid()|state()) -> emqx_types:infos()). info(CPid) when is_pid(CPid) -> call(CPid, info); -info(Conn = #state{chan_state = ChanState}) -> +info(State = #state{chan_state = ChanState}) -> ChanInfo = emqx_channel:info(ChanState), - SockInfo = maps:from_list(info(?INFO_KEYS, Conn)), + SockInfo = maps:from_list(info(?INFO_KEYS, State)), maps:merge(ChanInfo, #{sockinfo => SockInfo}). -info(Keys, Conn) when is_list(Keys) -> - [{Key, info(Key, Conn)} || Key <- Keys]; +info(Keys, State) when is_list(Keys) -> + [{Key, info(Key, State)} || Key <- Keys]; info(socktype, #state{transport = Transport, socket = Socket}) -> Transport:type(Socket); info(peername, #state{peername = Peername}) -> @@ -113,7 +113,7 @@ info(sockname, #state{sockname = Sockname}) -> Sockname; info(active_n, #state{active_n = ActiveN}) -> ActiveN; -info(active_state, #state{active_state = ActiveSt}) -> +info(active_st, #state{active_st= ActiveSt}) -> ActiveSt; info(pub_limit, #state{pub_limit = PubLimit}) -> limit_info(PubLimit); @@ -125,7 +125,7 @@ info(chan_state, #state{chan_state = ChanState}) -> limit_info(Limit) -> emqx_misc:maybe_apply(fun esockd_rate_limit:info/1, Limit). -%% @doc Get stats of the channel. +%% @doc Get stats of the connection/channel. -spec(stats(pid()|state()) -> emqx_types:stats()). stats(CPid) when is_pid(CPid) -> call(CPid, stats); @@ -141,18 +141,21 @@ stats(#state{transport = Transport, ProcStats = emqx_misc:proc_stats(), lists:append([SockStats, ConnStats, ChanStats, ProcStats]). -%% kick|discard|takeover --spec(call(pid(), Req :: term()) -> Reply :: term()). -call(CPid, Req) -> gen_statem:call(CPid, Req). +call(Pid, Req) -> + gen_server:call(Pid, Req, infinity). + +stop(Pid) -> + gen_server:stop(Pid). %%-------------------------------------------------------------------- -%% gen_statem callbacks +%% callbacks %%-------------------------------------------------------------------- -init({Transport, RawSocket, Options}) -> +init(Parent, Transport, RawSocket, Options) -> {ok, Socket} = Transport:wait(RawSocket), {ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]), {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]), + emqx_logger:set_metadata_peername(esockd_net:format(Peername)), Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]), ConnInfo = #{socktype => Transport:type(Socket), peername => Peername, @@ -160,7 +163,6 @@ init({Transport, RawSocket, Options}) -> peercert => Peercert, conn_mod => ?MODULE }, - emqx_logger:set_metadata_peername(esockd_net:format(Peername)), Zone = proplists:get_value(zone, Options), ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N), PubLimit = init_limiter(emqx_zone:get_env(Zone, publish_limit)), @@ -169,327 +171,328 @@ init({Transport, RawSocket, Options}) -> ParseState = emqx_frame:initial_parse_state(FrameOpts), Serialize = emqx_frame:serialize_fun(), ChanState = emqx_channel:init(ConnInfo, Options), - State = #state{transport = Transport, + IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), + IdleTimer = emqx_misc:start_timer(IdleTimout, idle_timeout), + HibAfterTimeout = emqx_zone:get_env(Zone, hibernate_after, IdleTimout*2), + State = #state{parent = Parent, + transport = Transport, socket = Socket, peername = Peername, sockname = Sockname, active_n = ActiveN, - active_state = running, + active_st = idle, pub_limit = PubLimit, rate_limit = RateLimit, parse_state = ParseState, serialize = Serialize, - chan_state = ChanState + chan_state = ChanState, + idle_timer = IdleTimer }, - IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), - gen_statem:enter_loop(?MODULE, [{hibernate_after, 2 * IdleTimout}], - idle, State, self(), [IdleTimout]). + case activate_socket(State) of + {ok, NState} -> + recvloop(NState, #{hibernate_after => HibAfterTimeout}); + {error, Reason} -> + Transport:fast_close(Socket), + erlang:exit({shutdown, Reason}) + end. -compile({inline, [init_limiter/1]}). init_limiter(undefined) -> undefined; init_limiter({Rate, Burst}) -> esockd_rate_limit:new(Rate, Burst). --compile({inline, [callback_mode/0]}). -callback_mode() -> - [state_functions, state_enter]. +%%-------------------------------------------------------------------- +%% Recv Loop + +recvloop(State = #state{parent = Parent}, + Options = #{hibernate_after := HibAfterTimeout}) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Options}); + {'EXIT', Parent, Reason} -> + terminate(Reason, State); + Msg -> + process_msg([Msg], State, Options) + after + HibAfterTimeout -> + hibernate(State, Options) + end. + +hibernate(State, Options) -> + proc_lib:hibernate(?MODULE, wakeup_from_hib, [State, Options]). + +wakeup_from_hib(State, Options) -> + %% Maybe do something later here. + recvloop(State, Options). %%-------------------------------------------------------------------- -%% Idle State +%% Process next Msg -idle(enter, _, State) -> - case activate_socket(State) of - ok -> keep_state_and_data; - {error, Reason} -> - shutdown(Reason, State) +process_msg([], State, Options) -> + recvloop(State, Options); +process_msg([Msg|More], State, Options) -> + case catch handle_msg(Msg, State) of + ok -> + process_msg(More, State, Options); + {ok, NState} -> + process_msg(More, NState, Options); + {ok, NextMsgs, NState} -> + process_msg(append_msg(NextMsgs, More), NState, Options); + {stop, Reason} -> + terminate(Reason, State); + {stop, Reason, NState} -> + terminate(Reason, NState); + {'EXIT', Reason} -> + terminate(Reason, State) + end. + +-compile({inline, [append_msg/2]}). +append_msg(NextMsgs, L) when is_list(NextMsgs) -> + lists:append(NextMsgs, L); +append_msg(NextMsg, L) -> [NextMsg|L]. + +%%-------------------------------------------------------------------- +%% Handle a Msg + +handle_msg({'$gen_call', From, Req}, State) -> + case handle_call(From, Req, State) of + {reply, Reply, NState} -> + gen_server:reply(From, Reply), + {ok, NState}; + {stop, Reason, Reply, NState} -> + gen_server:reply(From, Reply), + {stop, Reason, NState} end; -idle(timeout, _Timeout, State) -> - shutdown(idle_timeout, State); - -idle(cast, {incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) -> - SuccFun = fun(NewSt) -> {next_state, connected, NewSt} end, - Serialize = emqx_frame:serialize_fun(ConnPkt), - NState = State#state{serialize = Serialize}, - handle_incoming(Packet, SuccFun, NState); - -idle(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) -> - SuccFun = fun(NewSt) -> {next_state, connected, NewSt} end, - handle_incoming(Packet, SuccFun, State); - -idle(cast, {incoming, FrameError = {frame_error, _Reason}}, State) -> - handle_incoming(FrameError, State); - -idle(EventType, Content, State) -> - ?HANDLE(EventType, Content, State). - -%%-------------------------------------------------------------------- -%% Connected State - -connected(enter, _PrevSt, State) -> - ok = register_self(State), - keep_state_and_data; - -connected(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) -> - handle_incoming(Packet, fun keep_state/1, State); - -connected(cast, {incoming, FrameError = {frame_error, _Reason}}, State) -> - handle_incoming(FrameError, State); - -connected(info, Deliver = {deliver, _Topic, _Msg}, State) -> - handle_deliver(emqx_misc:drain_deliver([Deliver]), State); - -connected(EventType, Content, State) -> - ?HANDLE(EventType, Content, State). - -%%-------------------------------------------------------------------- -%% Disconnected State - -disconnected(enter, _, State = #state{chan_state = ChanState}) -> - case emqx_channel:handle_info(disconnected, ChanState) of - {ok, NChanState} -> - ok = register_self(State#state{chan_state = NChanState}), - keep_state(State#state{chan_state = NChanState}); - {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}) - end; - -disconnected(info, Deliver = {deliver, _Topic, _Msg}, State) -> - handle_deliver([Deliver], State); - -disconnected(EventType, Content, State) -> - ?HANDLE(EventType, Content, State). - -%%-------------------------------------------------------------------- -%% Handle call - -handle({call, From}, info, State) -> - reply(From, info(State), State); - -handle({call, From}, stats, State) -> - reply(From, stats(State), State); - -handle({call, From}, state, State) -> - reply(From, State, State); - -handle({call, From}, Req, State = #state{chan_state = ChanState}) -> - case emqx_channel:handle_call(Req, ChanState) of - {ok, Reply, NChanState} -> - reply(From, Reply, State#state{chan_state = NChanState}); - {stop, Reason, Reply, NChanState} -> - ok = gen_statem:reply(From, Reply), - stop(Reason, State#state{chan_state = NChanState}); - {stop, Reason, Packet, Reply, NChanState} -> - handle_outgoing(Packet, State#state{chan_state = NChanState}), - ok = gen_statem:reply(From, Reply), - stop(Reason, State#state{chan_state = NChanState}) - end; - -%%-------------------------------------------------------------------- -%% Handle cast - -handle(cast, Msg, State = #state{chan_state = ChanState}) -> - case emqx_channel:handle_info(Msg, ChanState) of - ok -> {ok, State}; - {ok, NChanState} -> - keep_state(State#state{chan_state = NChanState}); - {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}) - end; - -%%-------------------------------------------------------------------- -%% Handle info - %% Handle incoming data -handle(info, {Inet, _Sock, Data}, State = #state{chan_state = ChanState}) +handle_msg({Inet, _Sock, Data}, State = #state{chan_state = ChanState}) when Inet == tcp; Inet == ssl -> ?LOG(debug, "RECV ~p", [Data]), Oct = iolist_size(Data), emqx_pd:update_counter(incoming_bytes, Oct), ok = emqx_metrics:inc('bytes.received', Oct), NChanState = emqx_channel:received(Oct, ChanState), - NState = State#state{chan_state = NChanState}, - process_incoming(Data, NState); + State1 = State#state{chan_state = NChanState}, + {Packets, State2} = parse_incoming(Data, State1), + {ok, next_incoming_msgs(Packets), State2}; -handle(info, {Error, _Sock, Reason}, State) +%% Handle incoming packets +handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, + State = #state{idle_timer = IdleTimer}) -> + ok = emqx_misc:cancel_timer(IdleTimer), + NState = State#state{serialize = emqx_frame:serialize_fun(ConnPkt), + idle_timer = undefined + }, + handle_incoming(Packet, NState); + +handle_msg({incoming, Packet}, State) when is_record(Packet, mqtt_packet) -> + handle_incoming(Packet, State); + +handle_msg({enter, connected}, State = #state{active_n = ActiveN, + active_st = ActiveSt, + chan_state = ChanState + }) -> + ChanAttrs = emqx_channel:attrs(ChanState), + SockAttrs = #{active_n => ActiveN, + active_st => ActiveSt + }, + Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}), + emqx_channel:handle_info({register, Attrs, stats(State)}, ChanState); + +handle_msg({Error, _Sock, Reason}, State) when Error == tcp_error; Error == ssl_error -> - shutdown(Reason, State); + handle_sockerr(Reason, State); -handle(info, {Closed, _Sock}, State) +handle_msg({Closed, _Sock}, State) when Closed == tcp_closed; Closed == ssl_closed -> - {next_state, disconnected, State}; + socket_closed(Closed, State); -handle(info, {Passive, _Sock}, State) +handle_msg({Passive, _Sock}, State) when Passive == tcp_passive; Passive == ssl_passive -> - %% Rate limit here:) + %% Rate limit and activate socket here. NState = ensure_rate_limit(State), case activate_socket(NState) of - ok -> keep_state(NState); + {ok, NState} -> {ok, NState}; {error, Reason} -> - shutdown(Reason, NState) + handle_sockerr(Reason, State) end; -handle(info, activate_socket, State) -> - %% Rate limit timer expired. - NState = State#state{active_state = running, - limit_timer = undefined +%% Rate limit timer expired. +handle_msg(activate_socket, State) -> + NState = State#state{active_st = idle, + limit_timer = undefined }, case activate_socket(NState) of - ok -> keep_state(NState); + {ok, NState} -> {ok, NState}; {error, Reason} -> - shutdown(Reason, NState) + handle_sockerr(Reason, State) end; -handle(info, {inet_reply, _Sock, ok}, _State) -> - %% something sent - keep_state_and_data; +handle_msg(Deliver = {deliver, _Topic, _Msg}, + State = #state{chan_state = ChanState}) -> + Delivers = emqx_misc:drain_deliver([Deliver]), + Result = emqx_channel:handle_out({deliver, Delivers}, ChanState), + handle_chan_return(Result, State); -handle(info, {inet_reply, _Sock, {error, Reason}}, State) -> - shutdown(Reason, State); +handle_msg({outgoing, Packets}, State) -> + handle_outgoing(Packets, State); -handle(info, {timeout, TRef, keepalive}, - State = #state{transport = Transport, socket = Socket}) -> +%% something sent +handle_msg({inet_reply, _Sock, ok}, _State) -> + ok; + +handle_msg({inet_reply, _Sock, {error, Reason}}, State) -> + handle_sockerr(Reason, State); + +handle_msg({timeout, TRef, TMsg}, State) when is_reference(TRef) -> + handle_timeout(TRef, TMsg, State); + +handle_msg(Shutdown = {shutdown, _Reason}, State) -> + {stop, Shutdown, State}; + +handle_msg(Msg, State = #state{chan_state = ChanState}) -> + case emqx_channel:handle_info(Msg, ChanState) of + {ok, NChanState} -> + {ok, State#state{chan_state = NChanState}}; + {stop, Reason, NChanState} -> + {stop, Reason, State#state{chan_state = NChanState}} + end. + +%%-------------------------------------------------------------------- +%% Terminate + +terminate(Reason, #state{transport = Transport, + socket = Socket, + active_st = ActiveSt, + chan_state = ChanState}) -> + ?LOG(debug, "Terminated for ~p", [Reason]), + ActiveSt =:= closed orelse Transport:fast_close(Socket), + emqx_channel:terminate(Reason, ChanState), + exit(Reason). + +%%-------------------------------------------------------------------- +%% Sys callbacks + +system_continue(_Parent, _Deb, {State, Options}) -> + recvloop(State, Options). + +system_terminate(Reason, _Parent, _Deb, {State, _}) -> + terminate(Reason, State). + +system_code_change(Misc, _, _, _) -> + {ok, Misc}. + +system_get_state({State, _Options}) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Handle call + +handle_call(_From, info, State) -> + {reply, info(State), State}; + +handle_call(_From, stats, State) -> + {reply, stats(State), State}; + +%% TODO: the handle_outgoing is not right ... +handle_call(_From, Req, State = #state{chan_state = ChanState}) -> + case emqx_channel:handle_call(Req, ChanState) of + {ok, Reply, NChanState} -> + {reply, Reply, State#state{chan_state = NChanState}}; + {stop, Reason, Reply, NChanState} -> + {stop, Reason, Reply, State#state{chan_state = NChanState}}; + {stop, Reason, Packet, Reply, NChanState} -> + State1 = State#state{chan_state = NChanState}, + {ok, State2} = handle_outgoing(Packet, State1), + {stop, Reason, Reply, State2} + end. + +%%-------------------------------------------------------------------- +%% Handle timeout + +handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) -> + {stop, idle_timeout, State}; + +handle_timeout(TRef, emit_stats, State) -> + handle_timeout(TRef, {emit_stats, stats(State)}, State); + +handle_timeout(TRef, keepalive, State = #state{transport = Transport, + socket = Socket}) -> case Transport:getstat(Socket, [recv_oct]) of {ok, [{recv_oct, RecvOct}]} -> handle_timeout(TRef, {keepalive, RecvOct}, State); {error, Reason} -> - shutdown(Reason, State) + handle_sockerr(Reason, State) end; -handle(info, {timeout, TRef, emit_stats}, State) -> - handle_timeout(TRef, {emit_stats, stats(State)}, State); - -handle(info, {timeout, TRef, Msg}, State) -> - handle_timeout(TRef, Msg, State); - -handle(info, {shutdown, Reason}, State) -> - shutdown(Reason, State); - -handle(info, Info, State = #state{chan_state = ChanState}) -> - case emqx_channel:handle_info(Info, ChanState) of - {ok, NChanState} -> - keep_state(State#state{chan_state = NChanState}); - {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}) - end. - -code_change(_Vsn, State, Data, _Extra) -> - {ok, State, Data}. - -terminate(Reason, _StateName, #state{transport = Transport, - socket = Socket, - chan_state = ChanState - }) -> - ?LOG(debug, "Terminated for ~p", [Reason]), - ok = Transport:fast_close(Socket), - emqx_channel:terminate(Reason, ChanState). +handle_timeout(TRef, Msg, State = #state{chan_state = ChanState}) -> + Result = emqx_channel:handle_timeout(TRef, Msg, ChanState), + handle_chan_return(Result, State). %%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- +%% Parse incoming data. -register_self(State = #state{active_n = ActiveN, - active_state = ActiveSt, - chan_state = ChanState - }) -> - ChanAttrs = emqx_channel:attrs(ChanState), - SockAttrs = #{active_n => ActiveN, - active_state => ActiveSt - }, - Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}), - emqx_channel:handle_info({register, Attrs, stats(State)}, ChanState). +parse_incoming(Data, State) -> + parse_incoming(Data, [], State). -%%-------------------------------------------------------------------- -%% Process incoming data +parse_incoming(<<>>, Packets, State) -> + {Packets, State}; --compile({inline, [process_incoming/2]}). -process_incoming(Data, State) -> - process_incoming(Data, [], State). - -process_incoming(<<>>, Packets, State) -> - keep_state(State, next_incoming_events(Packets)); - -process_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> +parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> try emqx_frame:parse(Data, ParseState) of {more, NParseState} -> - NState = State#state{parse_state = NParseState}, - keep_state(NState, next_incoming_events(Packets)); + {Packets, State#state{parse_state = NParseState}}; {ok, Packet, Rest, NParseState} -> NState = State#state{parse_state = NParseState}, - process_incoming(Rest, [Packet|Packets], NState) + parse_incoming(Rest, [Packet|Packets], NState) catch error:Reason:Stk -> ?LOG(error, "~nParse failed for ~p~nStacktrace: ~p~nFrame data:~p", [Reason, Stk, Data]), - keep_state(State, next_incoming_events(Packets++[{frame_error, Reason}])) + {[{frame_error, Reason}|Packets], State} end. --compile({inline, [next_incoming_events/1]}). -next_incoming_events([]) -> []; -next_incoming_events(Packets) -> - [next_event(cast, {incoming, Packet}) || Packet <- Packets]. +next_incoming_msgs([Packet]) -> + {incoming, Packet}; +next_incoming_msgs(Packets) -> + [{incoming, Packet} || Packet <- lists:reverse(Packets)]. %%-------------------------------------------------------------------- %% Handle incoming packet -handle_incoming(Packet = ?PACKET(Type), SuccFun, State = #state{chan_state = ChanState}) -> +handle_incoming(Packet = ?PACKET(Type), State = #state{chan_state = ChanState}) -> _ = inc_incoming_stats(Type), - _ = emqx_metrics:inc_recv(Packet), + ok = emqx_metrics:inc_recv(Packet), ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), - case emqx_channel:handle_in(Packet, ChanState) of - {ok, NChanState} -> - SuccFun(State#state{chan_state= NChanState}); - {ok, OutPackets, NChanState} -> - NState = State#state{chan_state = NChanState}, - handle_outgoing(OutPackets, SuccFun, NState); - {close, Reason, NChanState} -> - close(Reason, State#state{chan_state = NChanState}); - {close, Reason, OutPackets, NChanState} -> - NState = State#state{chan_state= NChanState}, - close(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState)); - {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}); - {stop, Reason, OutPackets, NChanState} -> - NState = State#state{chan_state= NChanState}, - stop(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState)) - end. + Result = emqx_channel:handle_in(Packet, ChanState), + handle_chan_return(Result, State); handle_incoming(FrameError = {frame_error, _Reason}, State = #state{chan_state = ChanState}) -> - case emqx_channel:handle_in(FrameError, ChanState) of - {close, Reason, NChanState} -> - close(Reason, State#state{chan_state = NChanState}); - {close, Reason, OutPackets, NChanState} -> - NState = State#state{chan_state= NChanState}, - close(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState)); - {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}); - {stop, Reason, OutPackets, NChanState} -> - NState = State#state{chan_state= NChanState}, - stop(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState)) - end. + Result = emqx_channel:handle_in(FrameError, ChanState), + handle_chan_return(Result, State). -%%------------------------------------------------------------------- -%% Handle deliver - -handle_deliver(Delivers, State = #state{chan_state = ChanState}) -> - case emqx_channel:handle_out({deliver, Delivers}, ChanState) of - {ok, NChanState} -> - keep_state(State#state{chan_state = NChanState}); - {ok, Packets, NChanState} -> - handle_outgoing(Packets, fun keep_state/1, State#state{chan_state = NChanState}) - end. +handle_chan_return({ok, NChanState}, State) -> + {ok, State#state{chan_state = NChanState}}; +handle_chan_return({ok, OutPacket, NChanState}, State) + when is_record(OutPacket, mqtt_packet) -> + {ok, {outgoing, OutPacket}, State#state{chan_state = NChanState}}; +handle_chan_return({ok, Actions, NChanState}, State) -> + {ok, Actions, State#state{chan_state = NChanState}}; +handle_chan_return({stop, Reason, NChanState}, State) -> + {stop, Reason, State#state{chan_state = NChanState}}; +handle_chan_return({stop, Reason, OutPackets, NChanState}, State) -> + NState = State#state{chan_state = NChanState}, + {ok, NState1} = handle_outgoing(OutPackets, NState), + {stop, Reason, NState1}. %%-------------------------------------------------------------------- %% Handle outgoing packets +handle_outgoing(Packets, State) when is_list(Packets) -> + send(lists:map(serialize_and_inc_stats_fun(State), Packets), State); + handle_outgoing(Packet, State) -> - handle_outgoing(Packet, fun (_) -> ok end, State). - -handle_outgoing(Packets, SuccFun, State) when is_list(Packets) -> - send(lists:map(serialize_and_inc_stats_fun(State), Packets), SuccFun, State); - -handle_outgoing(Packet, SuccFun, State) -> - send((serialize_and_inc_stats_fun(State))(Packet), SuccFun, State). + send((serialize_and_inc_stats_fun(State))(Packet), State). serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> fun(Packet = ?PACKET(Type)) -> @@ -507,37 +510,68 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> %%-------------------------------------------------------------------- %% Send data -send(IoData, SuccFun, State = #state{transport = Transport, - socket = Socket, - chan_state = ChanState}) -> +send(IoData, State = #state{transport = Transport, + socket = Socket, + chan_state = ChanState}) -> Oct = iolist_size(IoData), ok = emqx_metrics:inc('bytes.sent', Oct), case Transport:async_send(Socket, IoData) of - ok -> NChanState = emqx_channel:sent(Oct, ChanState), - SuccFun(State#state{chan_state = NChanState}); - {error, Reason} -> - shutdown(Reason, State) + ok -> + NChanState = emqx_channel:sent(Oct, ChanState), + {ok, State#state{chan_state = NChanState}}; + Error = {error, _Reason} -> + %% Simulate an inet_reply to postpone handling the error + self() ! {inet_reply, Socket, Error}, + {ok, State} end. %%-------------------------------------------------------------------- -%% Handle timeout +%% Handle sockerr -handle_timeout(TRef, Msg, State = #state{chan_state = ChanState}) -> - case emqx_channel:handle_timeout(TRef, Msg, ChanState) of +handle_sockerr(_Reason, State = #state{active_st = closed}) -> + {ok, State}; + +handle_sockerr(Reason, State = #state{transport = Transport, + socket = Socket, + chan_state = ChanState}) -> + ?LOG(debug, "Socket error: ~p", [Reason]), + ok = Transport:fast_close(Socket), + NState = State#state{active_st = closed}, + case emqx_channel:handle_info({sockerr, Reason}, ChanState) of {ok, NChanState} -> - keep_state(State#state{chan_state = NChanState}); - {ok, Packets, NChanState} -> - handle_outgoing(Packets, fun keep_state/1, State#state{chan_state = NChanState}); - {close, Reason, NChanState} -> - close(Reason, State#state{chan_state = NChanState}); - {close, Reason, OutPackets, NChanState} -> - NState = State#state{chan_state= NChanState}, - close(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState)); - {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}); - {stop, Reason, OutPackets, NChanState} -> - NState = State#state{chan_state= NChanState}, - stop(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState)) + {ok, NState#state{chan_state = NChanState}}; + {stop, NChanState} -> + {stop, {shutdown, Reason}, NState#state{chan_state = NChanState}} + end. + +socket_closed(Closed, State = #state{transport = Transport, + socket = Socket, + chan_state = ChanState}) -> + ?LOG(debug, "Socket closed: ~p", [Closed]), + ok = Transport:fast_close(Socket), + NState = State#state{active_st = closed}, + case emqx_channel:handle_info({sock_closed, Closed}, ChanState) of + {ok, NChanState} -> + {ok, NState#state{chan_state = NChanState}}; + {stop, NChanState} -> + NState = NState#state{chan_state = NChanState}, + {stop, {shutdown, Closed}, NState} + end. + +%%-------------------------------------------------------------------- +%% Activate Socket + +-compile({inline, [activate_socket/1]}). +activate_socket(State = #state{active_st = closed}) -> + {ok, State}; +activate_socket(State = #state{active_st = blocked}) -> + {ok, State}; +activate_socket(State = #state{transport = Transport, + socket = Socket, + active_n = N}) -> + case Transport:setopts(Socket, [{active, N}]) of + ok -> {ok, State#state{active_st = running}}; + Error -> Error end. %%-------------------------------------------------------------------- @@ -561,22 +595,10 @@ ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) -> {Pause, Rl1} -> ?LOG(debug, "Pause ~pms due to rate limit", [Pause]), TRef = erlang:send_after(Pause, self(), activate_socket), - NState = State#state{active_state = blocked, - limit_timer = TRef - }, + NState = State#state{active_st = blocked, limit_timer = TRef}, setelement(Pos, NState, Rl1) end. -%%-------------------------------------------------------------------- -%% Activate Socket - --compile({inline, [activate_socket/1]}). -activate_socket(#state{active_state = blocked}) -> ok; -activate_socket(#state{transport = Transport, - socket = Socket, - active_n = N}) -> - Transport:setopts(Socket, [{active, N}]). - %%-------------------------------------------------------------------- %% Inc incoming/outgoing stats @@ -590,44 +612,10 @@ inc_incoming_stats(Type) when is_integer(Type) -> true -> ok end. + -compile({inline, [inc_outgoing_stats/1]}). inc_outgoing_stats(Type) -> emqx_pd:update_counter(send_pkt, 1), (Type == ?PUBLISH) andalso emqx_pd:update_counter(send_msg, 1). -%%-------------------------------------------------------------------- -%% Helper functions - --compile({inline, - [ reply/3 - , keep_state/1 - , keep_state/2 - , next_event/2 - , shutdown/2 - , stop/2 - ]}). - -reply(From, Reply, State) -> - {keep_state, State, [{reply, From, Reply}]}. - -keep_state(State) -> - {keep_state, State}. - -keep_state(State, Events) -> - {keep_state, State, Events}. - -next_event(Type, Content) -> - {next_event, Type, Content}. - -close(Reason, State = #state{transport = Transport, socket = Socket}) -> - ?LOG(warning, "Closed for ~p", [Reason]), - ok = Transport:fast_close(Socket), - {next_state, disconnected, State}. - -shutdown(Reason, State) -> - stop({shutdown, Reason}, State). - -stop(Reason, State) -> - {stop, Reason, State}. -