From e61173e9bcf0256e9026c6069d4344176bb5454c Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 6 Oct 2019 18:07:42 +0800 Subject: [PATCH 01/14] Add TODO --- TODO | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 TODO diff --git a/TODO b/TODO new file mode 100644 index 000000000..ca4c31fc3 --- /dev/null +++ b/TODO @@ -0,0 +1,8 @@ + +- emqx_connection + Biz message + Timeout message + Sys message + EXIT message + Batch Process + Postponed Queue From e718fa8249ad9d734898edb11f370d5f13a48eb7 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 8 Oct 2019 17:59:11 +0800 Subject: [PATCH 02/14] Rewrite the 'emqx_connection' module using a raw erlang process --- src/emqx_channel.erl | 67 ++-- src/emqx_connection.erl | 682 ++++++++++++++++++++-------------------- 2 files changed, 374 insertions(+), 375 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 01e32410d..670b8f702 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -31,7 +31,7 @@ , caps/1 ]). -%% Exports for unit tests:( +%% Test Exports -export([set_field/3]). -export([ init/2 @@ -97,6 +97,13 @@ disconnected_at := pos_integer() }). +-type(action() :: {enter, connected | disconnected} + | {close, Reason :: atom()} + | {outgoing, emqx_types:packet()} + | {outgoing, [emqx_types:packet()]}). + +-type(output() :: emqx_types:packet() | action() | [action()]). + -define(TIMER_TABLE, #{ stats_timer => emit_stats, alive_timer => keepalive, @@ -223,12 +230,9 @@ init_gc_state(Zone) -> -spec(handle_in(emqx_types:packet(), channel()) -> {ok, channel()} - | {ok, emqx_types:packet(), channel()} - | {ok, list(emqx_types:packet()), channel()} - | {close, channel()} - | {close, emqx_types:packet(), channel()} - | {stop, Error :: term(), channel()} - | {stop, Error :: term(), emqx_types:packet(), channel()}). + | {ok, output(), channel()} + | {stop, Reason :: term(), channel()} + | {stop, Reason :: term(), output(), channel()}). handle_in(?CONNECT_PACKET(_), Channel = #channel{state = #{state_name := connected}}) -> handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel); @@ -243,35 +247,36 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> {ok, NConnPkt, NChannel} -> process_connect(NConnPkt, NChannel); {error, ReasonCode, NChannel} -> - handle_out({connack, emqx_reason_codes:formalized(connack, ReasonCode), ConnPkt}, NChannel) + ReasonName = emqx_reason_codes:formalized(connack, ReasonCode), + handle_out({connack, ReasonName, ConnPkt}, NChannel) end; handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> - Channel1 = inc_pub_stats(publish_in, Channel), + NChannel = inc_pub_stats(publish_in, Channel), case emqx_packet:check(Packet) of - ok -> handle_publish(Packet, Channel1); + ok -> handle_publish(Packet, NChannel); {error, ReasonCode} -> - handle_out({disconnect, ReasonCode}, Channel1) + handle_out({disconnect, ReasonCode}, NChannel) end; handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), Channel = #channel{clientinfo = ClientInfo, session = Session}) -> - Channel1 = inc_pub_stats(puback_in, Channel), + NChannel = inc_pub_stats(puback_in, Channel), case emqx_session:puback(PacketId, Session) of {ok, Msg, Publishes, NSession} -> ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), - handle_out({publish, Publishes}, Channel1#channel{session = NSession}); + handle_out({publish, Publishes}, NChannel#channel{session = NSession}); {ok, Msg, NSession} -> ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), - {ok, Channel1#channel{session = NSession}}; + {ok, NChannel#channel{session = NSession}}; {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> ?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]), ok = emqx_metrics:inc('packets.puback.inuse'), - {ok, Channel1}; + {ok, NChannel}; {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]), ok = emqx_metrics:inc('packets.puback.missed'), - {ok, Channel1} + {ok, NChannel} end; handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), @@ -342,7 +347,7 @@ handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), end; handle_in(?PACKET(?PINGREQ), Channel) -> - {ok, ?PACKET(?PINGRESP), Channel}; + {ok, Channel, {outgoing, ?PACKET(?PINGRESP)}}; handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninfo = ConnInfo}) -> #{proto_ver := ProtoVer, expiry_interval := OldInterval} = ConnInfo, @@ -360,7 +365,7 @@ handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninf {stop, ReasonName, Channel1}; true -> Channel2 = Channel1#channel{conninfo = ConnInfo#{expiry_interval => Interval}}, - {close, ReasonName, Channel2} + {ok, {close, ReasonName}, Channel2} end; handle_in(?AUTH_PACKET(), Channel) -> @@ -371,7 +376,8 @@ handle_in({frame_error, Reason}, Channel = #channel{state = FsmState}) -> #{state_name := initialized} -> {stop, {shutdown, Reason}, Channel}; #{state_name := connecting} -> - {stop, {shutdown, Reason}, ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel}; + Packet = ?CONNACK_PACKET(?RC_MALFORMED_PACKET), + {stop, {shutdown, Reason}, Packet, Channel}; #{state_name := connected} -> handle_out({disconnect, ?RC_MALFORMED_PACKET}, Channel); #{state_name := disconnected} -> @@ -528,7 +534,7 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel = %% Handle outgoing packet %%-------------------------------------------------------------------- -%%TODO: RunFold or Pipeline +%% TODO: RunFold or Pipeline handle_out({connack, ?RC_SUCCESS, SP, ConnPkt}, Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo, @@ -553,7 +559,8 @@ handle_out({connack, ?RC_SUCCESS, SP, ConnPkt}, resuming = false, pendings = []}, {ok, Packets, _} = handle_out({publish, Publishes}, Channel3), - {ok, [AckPacket|Packets], Channel3} + Output = [{outgoing, [AckPacket|Packets]}, {enter, connected}], + {ok, Output, Channel3} end; handle_out({connack, ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo, @@ -594,12 +601,12 @@ handle_out({publish, Publishes}, Channel) when is_list(Publishes) -> end end, [], Publishes), NChannel = inc_pub_stats(publish_out, length(Packets), Channel), - {ok, lists:reverse(Packets), NChannel}; + {ok, {outgoing, lists:reverse(Packets)}, NChannel}; %% Ignore loop deliver handle_out({publish, _PacketId, #message{from = ClientId, flags = #{nl := true}}}, - Channel = #channel{clientinfo = #{clientid := ClientId}}) -> + Channel = #channel{clientinfo = #{clientid := ClientId}}) -> {ok, Channel}; handle_out({publish, PacketId, Msg}, Channel = @@ -640,7 +647,6 @@ handle_out({disconnect, ReasonCode}, Channel = #channel{conninfo = #{proto_ver : ReasonName = emqx_reason_codes:name(ReasonCode, ProtoVer), handle_out({disconnect, ReasonCode, ReasonName}, Channel); -%%TODO: Improve later... handle_out({disconnect, ReasonCode, ReasonName}, Channel = #channel{conninfo = #{proto_ver := ProtoVer, expiry_interval := ExpiryInterval}}) -> @@ -650,14 +656,19 @@ handle_out({disconnect, ReasonCode, ReasonName}, {0, _Ver} -> {stop, ReasonName, Channel}; {?UINT_MAX, ?MQTT_PROTO_V5} -> - {close, ReasonName, ?DISCONNECT_PACKET(ReasonCode), Channel}; + Output = [{outgoing, ?DISCONNECT_PACKET(ReasonCode)}, + {close, ReasonName}], + {ok, Output, Channel}; {?UINT_MAX, _Ver} -> - {close, ReasonName, Channel}; + {ok, {close, ReasonName}, Channel}; {Interval, ?MQTT_PROTO_V5} -> NChannel = ensure_timer(expire_timer, Interval, Channel), - {close, ReasonName, ?DISCONNECT_PACKET(ReasonCode), NChannel}; + Output = [{outgoing, ?DISCONNECT_PACKET(ReasonCode)}, + {close, ReasonName}], + {ok, Output, NChannel}; {Interval, _Ver} -> - {close, ReasonName, ensure_timer(expire_timer, Interval, Channel)} + NChannel = ensure_timer(expire_timer, Interval, Channel), + {ok, {close, ReasonName}, NChannel} end; handle_out({Type, Data}, Channel) -> diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index e577fd2cc..571d08ba1 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -14,41 +14,40 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% MQTT TCP/SSL Connection -module(emqx_connection). --behaviour(gen_statem). - -include("emqx.hrl"). -include("emqx_mqtt.hrl"). -include("logger.hrl"). -include("types.hrl"). --logger_header("[Connection]"). +-logger_header("[MQTT]"). --export([start_link/3]). +%% API +-export([ start_link/3 + , call/2 + , stop/1 + ]). -%% APIs -export([ info/1 , stats/1 ]). --export([call/2]). +-export([init/4]). -%% state callbacks --export([ idle/3 - , connected/3 - , disconnected/3 +%% Sys callbacks +-export([ system_continue/3 + , system_terminate/4 + , system_code_change/4 + , system_get_state/1 ]). -%% gen_statem callbacks --export([ init/1 - , callback_mode/0 - , code_change/4 - , terminate/3 - ]). +%% Internal callbacks +-export([wakeup_from_hib/2]). -record(state, { + %% Parent + parent :: pid(), %% TCP/TLS Transport transport :: esockd:transport(), %% TCP/TLS Socket @@ -60,7 +59,7 @@ %% The {active, N} option active_n :: pos_integer(), %% The active state - active_state :: running | blocked, + active_st :: idle | running | blocked | closed, %% Publish Limit pub_limit :: maybe(esockd_rate_limit:bucket()), %% Rate Limit @@ -72,39 +71,40 @@ %% Serialize function serialize :: emqx_frame:serialize_fun(), %% Channel State - chan_state :: emqx_channel:channel() + chan_state :: emqx_channel:channel(), + %% Idle timer + idle_timer :: reference() }). -type(state() :: #state{}). -define(ACTIVE_N, 100). --define(HANDLE(T, C, D), handle((T), (C), (D))). -define(INFO_KEYS, [socktype, peername, sockname, active_n, active_state, pub_limit, rate_limit]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). -%% @doc Start the connection. -spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist()) -> {ok, pid()}). start_link(Transport, Socket, Options) -> - {ok, proc_lib:spawn_link(?MODULE, init, [{Transport, Socket, Options}])}. + CPid = proc_lib:spawn_link(?MODULE, init, [self(), Transport, Socket, Options]), + {ok, CPid}. %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- -%% @doc Get infos of the connection. +%% @doc Get infos of the connection/channel. -spec(info(pid()|state()) -> emqx_types:infos()). info(CPid) when is_pid(CPid) -> call(CPid, info); -info(Conn = #state{chan_state = ChanState}) -> +info(State = #state{chan_state = ChanState}) -> ChanInfo = emqx_channel:info(ChanState), - SockInfo = maps:from_list(info(?INFO_KEYS, Conn)), + SockInfo = maps:from_list(info(?INFO_KEYS, State)), maps:merge(ChanInfo, #{sockinfo => SockInfo}). -info(Keys, Conn) when is_list(Keys) -> - [{Key, info(Key, Conn)} || Key <- Keys]; +info(Keys, State) when is_list(Keys) -> + [{Key, info(Key, State)} || Key <- Keys]; info(socktype, #state{transport = Transport, socket = Socket}) -> Transport:type(Socket); info(peername, #state{peername = Peername}) -> @@ -113,7 +113,7 @@ info(sockname, #state{sockname = Sockname}) -> Sockname; info(active_n, #state{active_n = ActiveN}) -> ActiveN; -info(active_state, #state{active_state = ActiveSt}) -> +info(active_st, #state{active_st= ActiveSt}) -> ActiveSt; info(pub_limit, #state{pub_limit = PubLimit}) -> limit_info(PubLimit); @@ -125,7 +125,7 @@ info(chan_state, #state{chan_state = ChanState}) -> limit_info(Limit) -> emqx_misc:maybe_apply(fun esockd_rate_limit:info/1, Limit). -%% @doc Get stats of the channel. +%% @doc Get stats of the connection/channel. -spec(stats(pid()|state()) -> emqx_types:stats()). stats(CPid) when is_pid(CPid) -> call(CPid, stats); @@ -141,18 +141,21 @@ stats(#state{transport = Transport, ProcStats = emqx_misc:proc_stats(), lists:append([SockStats, ConnStats, ChanStats, ProcStats]). -%% kick|discard|takeover --spec(call(pid(), Req :: term()) -> Reply :: term()). -call(CPid, Req) -> gen_statem:call(CPid, Req). +call(Pid, Req) -> + gen_server:call(Pid, Req, infinity). + +stop(Pid) -> + gen_server:stop(Pid). %%-------------------------------------------------------------------- -%% gen_statem callbacks +%% callbacks %%-------------------------------------------------------------------- -init({Transport, RawSocket, Options}) -> +init(Parent, Transport, RawSocket, Options) -> {ok, Socket} = Transport:wait(RawSocket), {ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]), {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]), + emqx_logger:set_metadata_peername(esockd_net:format(Peername)), Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]), ConnInfo = #{socktype => Transport:type(Socket), peername => Peername, @@ -160,7 +163,6 @@ init({Transport, RawSocket, Options}) -> peercert => Peercert, conn_mod => ?MODULE }, - emqx_logger:set_metadata_peername(esockd_net:format(Peername)), Zone = proplists:get_value(zone, Options), ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N), PubLimit = init_limiter(emqx_zone:get_env(Zone, publish_limit)), @@ -169,327 +171,328 @@ init({Transport, RawSocket, Options}) -> ParseState = emqx_frame:initial_parse_state(FrameOpts), Serialize = emqx_frame:serialize_fun(), ChanState = emqx_channel:init(ConnInfo, Options), - State = #state{transport = Transport, + IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), + IdleTimer = emqx_misc:start_timer(IdleTimout, idle_timeout), + HibAfterTimeout = emqx_zone:get_env(Zone, hibernate_after, IdleTimout*2), + State = #state{parent = Parent, + transport = Transport, socket = Socket, peername = Peername, sockname = Sockname, active_n = ActiveN, - active_state = running, + active_st = idle, pub_limit = PubLimit, rate_limit = RateLimit, parse_state = ParseState, serialize = Serialize, - chan_state = ChanState + chan_state = ChanState, + idle_timer = IdleTimer }, - IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), - gen_statem:enter_loop(?MODULE, [{hibernate_after, 2 * IdleTimout}], - idle, State, self(), [IdleTimout]). + case activate_socket(State) of + {ok, NState} -> + recvloop(NState, #{hibernate_after => HibAfterTimeout}); + {error, Reason} -> + Transport:fast_close(Socket), + erlang:exit({shutdown, Reason}) + end. -compile({inline, [init_limiter/1]}). init_limiter(undefined) -> undefined; init_limiter({Rate, Burst}) -> esockd_rate_limit:new(Rate, Burst). --compile({inline, [callback_mode/0]}). -callback_mode() -> - [state_functions, state_enter]. +%%-------------------------------------------------------------------- +%% Recv Loop + +recvloop(State = #state{parent = Parent}, + Options = #{hibernate_after := HibAfterTimeout}) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Options}); + {'EXIT', Parent, Reason} -> + terminate(Reason, State); + Msg -> + process_msg([Msg], State, Options) + after + HibAfterTimeout -> + hibernate(State, Options) + end. + +hibernate(State, Options) -> + proc_lib:hibernate(?MODULE, wakeup_from_hib, [State, Options]). + +wakeup_from_hib(State, Options) -> + %% Maybe do something later here. + recvloop(State, Options). %%-------------------------------------------------------------------- -%% Idle State +%% Process next Msg -idle(enter, _, State) -> - case activate_socket(State) of - ok -> keep_state_and_data; - {error, Reason} -> - shutdown(Reason, State) +process_msg([], State, Options) -> + recvloop(State, Options); +process_msg([Msg|More], State, Options) -> + case catch handle_msg(Msg, State) of + ok -> + process_msg(More, State, Options); + {ok, NState} -> + process_msg(More, NState, Options); + {ok, NextMsgs, NState} -> + process_msg(append_msg(NextMsgs, More), NState, Options); + {stop, Reason} -> + terminate(Reason, State); + {stop, Reason, NState} -> + terminate(Reason, NState); + {'EXIT', Reason} -> + terminate(Reason, State) + end. + +-compile({inline, [append_msg/2]}). +append_msg(NextMsgs, L) when is_list(NextMsgs) -> + lists:append(NextMsgs, L); +append_msg(NextMsg, L) -> [NextMsg|L]. + +%%-------------------------------------------------------------------- +%% Handle a Msg + +handle_msg({'$gen_call', From, Req}, State) -> + case handle_call(From, Req, State) of + {reply, Reply, NState} -> + gen_server:reply(From, Reply), + {ok, NState}; + {stop, Reason, Reply, NState} -> + gen_server:reply(From, Reply), + {stop, Reason, NState} end; -idle(timeout, _Timeout, State) -> - shutdown(idle_timeout, State); - -idle(cast, {incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) -> - SuccFun = fun(NewSt) -> {next_state, connected, NewSt} end, - Serialize = emqx_frame:serialize_fun(ConnPkt), - NState = State#state{serialize = Serialize}, - handle_incoming(Packet, SuccFun, NState); - -idle(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) -> - SuccFun = fun(NewSt) -> {next_state, connected, NewSt} end, - handle_incoming(Packet, SuccFun, State); - -idle(cast, {incoming, FrameError = {frame_error, _Reason}}, State) -> - handle_incoming(FrameError, State); - -idle(EventType, Content, State) -> - ?HANDLE(EventType, Content, State). - -%%-------------------------------------------------------------------- -%% Connected State - -connected(enter, _PrevSt, State) -> - ok = register_self(State), - keep_state_and_data; - -connected(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) -> - handle_incoming(Packet, fun keep_state/1, State); - -connected(cast, {incoming, FrameError = {frame_error, _Reason}}, State) -> - handle_incoming(FrameError, State); - -connected(info, Deliver = {deliver, _Topic, _Msg}, State) -> - handle_deliver(emqx_misc:drain_deliver([Deliver]), State); - -connected(EventType, Content, State) -> - ?HANDLE(EventType, Content, State). - -%%-------------------------------------------------------------------- -%% Disconnected State - -disconnected(enter, _, State = #state{chan_state = ChanState}) -> - case emqx_channel:handle_info(disconnected, ChanState) of - {ok, NChanState} -> - ok = register_self(State#state{chan_state = NChanState}), - keep_state(State#state{chan_state = NChanState}); - {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}) - end; - -disconnected(info, Deliver = {deliver, _Topic, _Msg}, State) -> - handle_deliver([Deliver], State); - -disconnected(EventType, Content, State) -> - ?HANDLE(EventType, Content, State). - -%%-------------------------------------------------------------------- -%% Handle call - -handle({call, From}, info, State) -> - reply(From, info(State), State); - -handle({call, From}, stats, State) -> - reply(From, stats(State), State); - -handle({call, From}, state, State) -> - reply(From, State, State); - -handle({call, From}, Req, State = #state{chan_state = ChanState}) -> - case emqx_channel:handle_call(Req, ChanState) of - {ok, Reply, NChanState} -> - reply(From, Reply, State#state{chan_state = NChanState}); - {stop, Reason, Reply, NChanState} -> - ok = gen_statem:reply(From, Reply), - stop(Reason, State#state{chan_state = NChanState}); - {stop, Reason, Packet, Reply, NChanState} -> - handle_outgoing(Packet, State#state{chan_state = NChanState}), - ok = gen_statem:reply(From, Reply), - stop(Reason, State#state{chan_state = NChanState}) - end; - -%%-------------------------------------------------------------------- -%% Handle cast - -handle(cast, Msg, State = #state{chan_state = ChanState}) -> - case emqx_channel:handle_info(Msg, ChanState) of - ok -> {ok, State}; - {ok, NChanState} -> - keep_state(State#state{chan_state = NChanState}); - {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}) - end; - -%%-------------------------------------------------------------------- -%% Handle info - %% Handle incoming data -handle(info, {Inet, _Sock, Data}, State = #state{chan_state = ChanState}) +handle_msg({Inet, _Sock, Data}, State = #state{chan_state = ChanState}) when Inet == tcp; Inet == ssl -> ?LOG(debug, "RECV ~p", [Data]), Oct = iolist_size(Data), emqx_pd:update_counter(incoming_bytes, Oct), ok = emqx_metrics:inc('bytes.received', Oct), NChanState = emqx_channel:received(Oct, ChanState), - NState = State#state{chan_state = NChanState}, - process_incoming(Data, NState); + State1 = State#state{chan_state = NChanState}, + {Packets, State2} = parse_incoming(Data, State1), + {ok, next_incoming_msgs(Packets), State2}; -handle(info, {Error, _Sock, Reason}, State) +%% Handle incoming packets +handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, + State = #state{idle_timer = IdleTimer}) -> + ok = emqx_misc:cancel_timer(IdleTimer), + NState = State#state{serialize = emqx_frame:serialize_fun(ConnPkt), + idle_timer = undefined + }, + handle_incoming(Packet, NState); + +handle_msg({incoming, Packet}, State) when is_record(Packet, mqtt_packet) -> + handle_incoming(Packet, State); + +handle_msg({enter, connected}, State = #state{active_n = ActiveN, + active_st = ActiveSt, + chan_state = ChanState + }) -> + ChanAttrs = emqx_channel:attrs(ChanState), + SockAttrs = #{active_n => ActiveN, + active_st => ActiveSt + }, + Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}), + emqx_channel:handle_info({register, Attrs, stats(State)}, ChanState); + +handle_msg({Error, _Sock, Reason}, State) when Error == tcp_error; Error == ssl_error -> - shutdown(Reason, State); + handle_sockerr(Reason, State); -handle(info, {Closed, _Sock}, State) +handle_msg({Closed, _Sock}, State) when Closed == tcp_closed; Closed == ssl_closed -> - {next_state, disconnected, State}; + socket_closed(Closed, State); -handle(info, {Passive, _Sock}, State) +handle_msg({Passive, _Sock}, State) when Passive == tcp_passive; Passive == ssl_passive -> - %% Rate limit here:) + %% Rate limit and activate socket here. NState = ensure_rate_limit(State), case activate_socket(NState) of - ok -> keep_state(NState); + {ok, NState} -> {ok, NState}; {error, Reason} -> - shutdown(Reason, NState) + handle_sockerr(Reason, State) end; -handle(info, activate_socket, State) -> - %% Rate limit timer expired. - NState = State#state{active_state = running, - limit_timer = undefined +%% Rate limit timer expired. +handle_msg(activate_socket, State) -> + NState = State#state{active_st = idle, + limit_timer = undefined }, case activate_socket(NState) of - ok -> keep_state(NState); + {ok, NState} -> {ok, NState}; {error, Reason} -> - shutdown(Reason, NState) + handle_sockerr(Reason, State) end; -handle(info, {inet_reply, _Sock, ok}, _State) -> - %% something sent - keep_state_and_data; +handle_msg(Deliver = {deliver, _Topic, _Msg}, + State = #state{chan_state = ChanState}) -> + Delivers = emqx_misc:drain_deliver([Deliver]), + Result = emqx_channel:handle_out({deliver, Delivers}, ChanState), + handle_chan_return(Result, State); -handle(info, {inet_reply, _Sock, {error, Reason}}, State) -> - shutdown(Reason, State); +handle_msg({outgoing, Packets}, State) -> + handle_outgoing(Packets, State); -handle(info, {timeout, TRef, keepalive}, - State = #state{transport = Transport, socket = Socket}) -> +%% something sent +handle_msg({inet_reply, _Sock, ok}, _State) -> + ok; + +handle_msg({inet_reply, _Sock, {error, Reason}}, State) -> + handle_sockerr(Reason, State); + +handle_msg({timeout, TRef, TMsg}, State) when is_reference(TRef) -> + handle_timeout(TRef, TMsg, State); + +handle_msg(Shutdown = {shutdown, _Reason}, State) -> + {stop, Shutdown, State}; + +handle_msg(Msg, State = #state{chan_state = ChanState}) -> + case emqx_channel:handle_info(Msg, ChanState) of + {ok, NChanState} -> + {ok, State#state{chan_state = NChanState}}; + {stop, Reason, NChanState} -> + {stop, Reason, State#state{chan_state = NChanState}} + end. + +%%-------------------------------------------------------------------- +%% Terminate + +terminate(Reason, #state{transport = Transport, + socket = Socket, + active_st = ActiveSt, + chan_state = ChanState}) -> + ?LOG(debug, "Terminated for ~p", [Reason]), + ActiveSt =:= closed orelse Transport:fast_close(Socket), + emqx_channel:terminate(Reason, ChanState), + exit(Reason). + +%%-------------------------------------------------------------------- +%% Sys callbacks + +system_continue(_Parent, _Deb, {State, Options}) -> + recvloop(State, Options). + +system_terminate(Reason, _Parent, _Deb, {State, _}) -> + terminate(Reason, State). + +system_code_change(Misc, _, _, _) -> + {ok, Misc}. + +system_get_state({State, _Options}) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Handle call + +handle_call(_From, info, State) -> + {reply, info(State), State}; + +handle_call(_From, stats, State) -> + {reply, stats(State), State}; + +%% TODO: the handle_outgoing is not right ... +handle_call(_From, Req, State = #state{chan_state = ChanState}) -> + case emqx_channel:handle_call(Req, ChanState) of + {ok, Reply, NChanState} -> + {reply, Reply, State#state{chan_state = NChanState}}; + {stop, Reason, Reply, NChanState} -> + {stop, Reason, Reply, State#state{chan_state = NChanState}}; + {stop, Reason, Packet, Reply, NChanState} -> + State1 = State#state{chan_state = NChanState}, + {ok, State2} = handle_outgoing(Packet, State1), + {stop, Reason, Reply, State2} + end. + +%%-------------------------------------------------------------------- +%% Handle timeout + +handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) -> + {stop, idle_timeout, State}; + +handle_timeout(TRef, emit_stats, State) -> + handle_timeout(TRef, {emit_stats, stats(State)}, State); + +handle_timeout(TRef, keepalive, State = #state{transport = Transport, + socket = Socket}) -> case Transport:getstat(Socket, [recv_oct]) of {ok, [{recv_oct, RecvOct}]} -> handle_timeout(TRef, {keepalive, RecvOct}, State); {error, Reason} -> - shutdown(Reason, State) + handle_sockerr(Reason, State) end; -handle(info, {timeout, TRef, emit_stats}, State) -> - handle_timeout(TRef, {emit_stats, stats(State)}, State); - -handle(info, {timeout, TRef, Msg}, State) -> - handle_timeout(TRef, Msg, State); - -handle(info, {shutdown, Reason}, State) -> - shutdown(Reason, State); - -handle(info, Info, State = #state{chan_state = ChanState}) -> - case emqx_channel:handle_info(Info, ChanState) of - {ok, NChanState} -> - keep_state(State#state{chan_state = NChanState}); - {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}) - end. - -code_change(_Vsn, State, Data, _Extra) -> - {ok, State, Data}. - -terminate(Reason, _StateName, #state{transport = Transport, - socket = Socket, - chan_state = ChanState - }) -> - ?LOG(debug, "Terminated for ~p", [Reason]), - ok = Transport:fast_close(Socket), - emqx_channel:terminate(Reason, ChanState). +handle_timeout(TRef, Msg, State = #state{chan_state = ChanState}) -> + Result = emqx_channel:handle_timeout(TRef, Msg, ChanState), + handle_chan_return(Result, State). %%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- +%% Parse incoming data. -register_self(State = #state{active_n = ActiveN, - active_state = ActiveSt, - chan_state = ChanState - }) -> - ChanAttrs = emqx_channel:attrs(ChanState), - SockAttrs = #{active_n => ActiveN, - active_state => ActiveSt - }, - Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}), - emqx_channel:handle_info({register, Attrs, stats(State)}, ChanState). +parse_incoming(Data, State) -> + parse_incoming(Data, [], State). -%%-------------------------------------------------------------------- -%% Process incoming data +parse_incoming(<<>>, Packets, State) -> + {Packets, State}; --compile({inline, [process_incoming/2]}). -process_incoming(Data, State) -> - process_incoming(Data, [], State). - -process_incoming(<<>>, Packets, State) -> - keep_state(State, next_incoming_events(Packets)); - -process_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> +parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> try emqx_frame:parse(Data, ParseState) of {more, NParseState} -> - NState = State#state{parse_state = NParseState}, - keep_state(NState, next_incoming_events(Packets)); + {Packets, State#state{parse_state = NParseState}}; {ok, Packet, Rest, NParseState} -> NState = State#state{parse_state = NParseState}, - process_incoming(Rest, [Packet|Packets], NState) + parse_incoming(Rest, [Packet|Packets], NState) catch error:Reason:Stk -> ?LOG(error, "~nParse failed for ~p~nStacktrace: ~p~nFrame data:~p", [Reason, Stk, Data]), - keep_state(State, next_incoming_events(Packets++[{frame_error, Reason}])) + {[{frame_error, Reason}|Packets], State} end. --compile({inline, [next_incoming_events/1]}). -next_incoming_events([]) -> []; -next_incoming_events(Packets) -> - [next_event(cast, {incoming, Packet}) || Packet <- Packets]. +next_incoming_msgs([Packet]) -> + {incoming, Packet}; +next_incoming_msgs(Packets) -> + [{incoming, Packet} || Packet <- lists:reverse(Packets)]. %%-------------------------------------------------------------------- %% Handle incoming packet -handle_incoming(Packet = ?PACKET(Type), SuccFun, State = #state{chan_state = ChanState}) -> +handle_incoming(Packet = ?PACKET(Type), State = #state{chan_state = ChanState}) -> _ = inc_incoming_stats(Type), - _ = emqx_metrics:inc_recv(Packet), + ok = emqx_metrics:inc_recv(Packet), ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), - case emqx_channel:handle_in(Packet, ChanState) of - {ok, NChanState} -> - SuccFun(State#state{chan_state= NChanState}); - {ok, OutPackets, NChanState} -> - NState = State#state{chan_state = NChanState}, - handle_outgoing(OutPackets, SuccFun, NState); - {close, Reason, NChanState} -> - close(Reason, State#state{chan_state = NChanState}); - {close, Reason, OutPackets, NChanState} -> - NState = State#state{chan_state= NChanState}, - close(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState)); - {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}); - {stop, Reason, OutPackets, NChanState} -> - NState = State#state{chan_state= NChanState}, - stop(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState)) - end. + Result = emqx_channel:handle_in(Packet, ChanState), + handle_chan_return(Result, State); handle_incoming(FrameError = {frame_error, _Reason}, State = #state{chan_state = ChanState}) -> - case emqx_channel:handle_in(FrameError, ChanState) of - {close, Reason, NChanState} -> - close(Reason, State#state{chan_state = NChanState}); - {close, Reason, OutPackets, NChanState} -> - NState = State#state{chan_state= NChanState}, - close(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState)); - {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}); - {stop, Reason, OutPackets, NChanState} -> - NState = State#state{chan_state= NChanState}, - stop(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState)) - end. + Result = emqx_channel:handle_in(FrameError, ChanState), + handle_chan_return(Result, State). -%%------------------------------------------------------------------- -%% Handle deliver - -handle_deliver(Delivers, State = #state{chan_state = ChanState}) -> - case emqx_channel:handle_out({deliver, Delivers}, ChanState) of - {ok, NChanState} -> - keep_state(State#state{chan_state = NChanState}); - {ok, Packets, NChanState} -> - handle_outgoing(Packets, fun keep_state/1, State#state{chan_state = NChanState}) - end. +handle_chan_return({ok, NChanState}, State) -> + {ok, State#state{chan_state = NChanState}}; +handle_chan_return({ok, OutPacket, NChanState}, State) + when is_record(OutPacket, mqtt_packet) -> + {ok, {outgoing, OutPacket}, State#state{chan_state = NChanState}}; +handle_chan_return({ok, Actions, NChanState}, State) -> + {ok, Actions, State#state{chan_state = NChanState}}; +handle_chan_return({stop, Reason, NChanState}, State) -> + {stop, Reason, State#state{chan_state = NChanState}}; +handle_chan_return({stop, Reason, OutPackets, NChanState}, State) -> + NState = State#state{chan_state = NChanState}, + {ok, NState1} = handle_outgoing(OutPackets, NState), + {stop, Reason, NState1}. %%-------------------------------------------------------------------- %% Handle outgoing packets +handle_outgoing(Packets, State) when is_list(Packets) -> + send(lists:map(serialize_and_inc_stats_fun(State), Packets), State); + handle_outgoing(Packet, State) -> - handle_outgoing(Packet, fun (_) -> ok end, State). - -handle_outgoing(Packets, SuccFun, State) when is_list(Packets) -> - send(lists:map(serialize_and_inc_stats_fun(State), Packets), SuccFun, State); - -handle_outgoing(Packet, SuccFun, State) -> - send((serialize_and_inc_stats_fun(State))(Packet), SuccFun, State). + send((serialize_and_inc_stats_fun(State))(Packet), State). serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> fun(Packet = ?PACKET(Type)) -> @@ -507,37 +510,68 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> %%-------------------------------------------------------------------- %% Send data -send(IoData, SuccFun, State = #state{transport = Transport, - socket = Socket, - chan_state = ChanState}) -> +send(IoData, State = #state{transport = Transport, + socket = Socket, + chan_state = ChanState}) -> Oct = iolist_size(IoData), ok = emqx_metrics:inc('bytes.sent', Oct), case Transport:async_send(Socket, IoData) of - ok -> NChanState = emqx_channel:sent(Oct, ChanState), - SuccFun(State#state{chan_state = NChanState}); - {error, Reason} -> - shutdown(Reason, State) + ok -> + NChanState = emqx_channel:sent(Oct, ChanState), + {ok, State#state{chan_state = NChanState}}; + Error = {error, _Reason} -> + %% Simulate an inet_reply to postpone handling the error + self() ! {inet_reply, Socket, Error}, + {ok, State} end. %%-------------------------------------------------------------------- -%% Handle timeout +%% Handle sockerr -handle_timeout(TRef, Msg, State = #state{chan_state = ChanState}) -> - case emqx_channel:handle_timeout(TRef, Msg, ChanState) of +handle_sockerr(_Reason, State = #state{active_st = closed}) -> + {ok, State}; + +handle_sockerr(Reason, State = #state{transport = Transport, + socket = Socket, + chan_state = ChanState}) -> + ?LOG(debug, "Socket error: ~p", [Reason]), + ok = Transport:fast_close(Socket), + NState = State#state{active_st = closed}, + case emqx_channel:handle_info({sockerr, Reason}, ChanState) of {ok, NChanState} -> - keep_state(State#state{chan_state = NChanState}); - {ok, Packets, NChanState} -> - handle_outgoing(Packets, fun keep_state/1, State#state{chan_state = NChanState}); - {close, Reason, NChanState} -> - close(Reason, State#state{chan_state = NChanState}); - {close, Reason, OutPackets, NChanState} -> - NState = State#state{chan_state= NChanState}, - close(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState)); - {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}); - {stop, Reason, OutPackets, NChanState} -> - NState = State#state{chan_state= NChanState}, - stop(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState)) + {ok, NState#state{chan_state = NChanState}}; + {stop, NChanState} -> + {stop, {shutdown, Reason}, NState#state{chan_state = NChanState}} + end. + +socket_closed(Closed, State = #state{transport = Transport, + socket = Socket, + chan_state = ChanState}) -> + ?LOG(debug, "Socket closed: ~p", [Closed]), + ok = Transport:fast_close(Socket), + NState = State#state{active_st = closed}, + case emqx_channel:handle_info({sock_closed, Closed}, ChanState) of + {ok, NChanState} -> + {ok, NState#state{chan_state = NChanState}}; + {stop, NChanState} -> + NState = NState#state{chan_state = NChanState}, + {stop, {shutdown, Closed}, NState} + end. + +%%-------------------------------------------------------------------- +%% Activate Socket + +-compile({inline, [activate_socket/1]}). +activate_socket(State = #state{active_st = closed}) -> + {ok, State}; +activate_socket(State = #state{active_st = blocked}) -> + {ok, State}; +activate_socket(State = #state{transport = Transport, + socket = Socket, + active_n = N}) -> + case Transport:setopts(Socket, [{active, N}]) of + ok -> {ok, State#state{active_st = running}}; + Error -> Error end. %%-------------------------------------------------------------------- @@ -561,22 +595,10 @@ ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) -> {Pause, Rl1} -> ?LOG(debug, "Pause ~pms due to rate limit", [Pause]), TRef = erlang:send_after(Pause, self(), activate_socket), - NState = State#state{active_state = blocked, - limit_timer = TRef - }, + NState = State#state{active_st = blocked, limit_timer = TRef}, setelement(Pos, NState, Rl1) end. -%%-------------------------------------------------------------------- -%% Activate Socket - --compile({inline, [activate_socket/1]}). -activate_socket(#state{active_state = blocked}) -> ok; -activate_socket(#state{transport = Transport, - socket = Socket, - active_n = N}) -> - Transport:setopts(Socket, [{active, N}]). - %%-------------------------------------------------------------------- %% Inc incoming/outgoing stats @@ -590,44 +612,10 @@ inc_incoming_stats(Type) when is_integer(Type) -> true -> ok end. + -compile({inline, [inc_outgoing_stats/1]}). inc_outgoing_stats(Type) -> emqx_pd:update_counter(send_pkt, 1), (Type == ?PUBLISH) andalso emqx_pd:update_counter(send_msg, 1). -%%-------------------------------------------------------------------- -%% Helper functions - --compile({inline, - [ reply/3 - , keep_state/1 - , keep_state/2 - , next_event/2 - , shutdown/2 - , stop/2 - ]}). - -reply(From, Reply, State) -> - {keep_state, State, [{reply, From, Reply}]}. - -keep_state(State) -> - {keep_state, State}. - -keep_state(State, Events) -> - {keep_state, State, Events}. - -next_event(Type, Content) -> - {next_event, Type, Content}. - -close(Reason, State = #state{transport = Transport, socket = Socket}) -> - ?LOG(warning, "Closed for ~p", [Reason]), - ok = Transport:fast_close(Socket), - {next_state, disconnected, State}. - -shutdown(Reason, State) -> - stop({shutdown, Reason}, State). - -stop(Reason, State) -> - {stop, Reason, State}. - From d50c0088d2376f4ecf84a1d215a634873fb8999d Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 10 Oct 2019 13:57:20 +0800 Subject: [PATCH 03/14] Add a new type 'sockstate/0' --- TODO | 8 -------- 1 file changed, 8 deletions(-) delete mode 100644 TODO diff --git a/TODO b/TODO deleted file mode 100644 index ca4c31fc3..000000000 --- a/TODO +++ /dev/null @@ -1,8 +0,0 @@ - -- emqx_connection - Biz message - Timeout message - Sys message - EXIT message - Batch Process - Postponed Queue From 0c37c65a594f02d657087d0586e8fabd0287cb9f Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 10 Oct 2019 13:59:34 +0800 Subject: [PATCH 04/14] Add a new type 'sockstate/0' --- src/emqx_types.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/emqx_types.erl b/src/emqx_types.erl index c2744b3c6..c895393b9 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -31,7 +31,9 @@ , subid/0 ]). --export_type([ conninfo/0 +-export_type([ socktype/0 + , sockstate/0 + , conninfo/0 , clientinfo/0 , clientid/0 , username/0 @@ -97,6 +99,7 @@ -type(subid() :: binary() | atom()). -type(socktype() :: tcp | udp | ssl | proxy | atom()). +-type(sockstate() :: idle | running | blocked | closed). -type(conninfo() :: #{socktype := socktype(), sockname := peername(), peername := peername(), From 7512d6cb038b7349dec209bf8cb4501b23d15ca9 Mon Sep 17 00:00:00 2001 From: zhouzb Date: Fri, 11 Oct 2019 11:13:52 +0800 Subject: [PATCH 05/14] Check topic level for publish packet and optimize the handling of rap --- src/emqx_channel.erl | 6 +++--- src/emqx_mqtt_caps.erl | 12 ++++++++++-- src/emqx_session.erl | 8 ++++---- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 01e32410d..36a1a7a40 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -1064,11 +1064,11 @@ check_pub_alias(_Packet, _Channel) -> ok. %% Check Pub Caps check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, - retain = Retain - } + retain = Retain}, + variable = #mqtt_packet_publish{topic_name = Topic} }, #channel{clientinfo = #{zone := Zone}}) -> - emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}). + emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain, topic => Topic}). %% Check Sub check_subscribe(TopicFilter, SubOpts, Channel) -> diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index 25d2ee5e4..a1bdefcb0 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -46,7 +46,7 @@ -define(UNLIMITED, 0). --define(PUBCAP_KEYS, [max_topic_alias, +-define(PUBCAP_KEYS, [max_topic_levels, max_qos_allowed, retain_available ]). @@ -73,8 +73,16 @@ retain => boolean()}) -> ok_or_error(emqx_types:reason_code())). check_pub(Zone, Flags) when is_map(Flags) -> - do_check_pub(Flags, get_caps(Zone, publish)). + do_check_pub(case maps:take(topic, Flags) of + {Topic, Flags1} -> + Flags1#{topic_levels => emqx_topic:levels(Topic)}; + error -> + Flags + end, get_caps(Zone, publish)). +do_check_pub(#{topic_levels := Levels}, #{max_topic_levels := Limit}) + when Limit > 0, Levels > Limit -> + {error, ?RC_TOPIC_NAME_INVALID}; do_check_pub(#{qos := QoS}, #{max_qos_allowed := MaxQoS}) when QoS > MaxQoS -> {error, ?RC_QOS_NOT_SUPPORTED}; diff --git a/src/emqx_session.erl b/src/emqx_session.erl index a945e23f7..a66118752 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -539,12 +539,12 @@ enrich_subopt([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, enrich_subopt([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, Session = #session{upgrade_qos= false}) -> enrich_subopt(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session); -enrich_subopt([{rap, 1}|Opts], Msg, Session) -> - enrich_subopt(Opts, Msg, Session); -enrich_subopt([{rap, 0}|Opts], Msg = #message{headers = #{retained := true}}, Session) -> - enrich_subopt(Opts, Msg, Session); +enrich_subopt([{rap, _}|Opts], Msg = #message{headers = #{retained := true}}, Session) -> + enrich_subopt(Opts, emqx_message:set_flag(retain, true, Msg), Session); enrich_subopt([{rap, 0}|Opts], Msg, Session) -> enrich_subopt(Opts, emqx_message:set_flag(retain, false, Msg), Session); +enrich_subopt([{rap, 1}|Opts], Msg, Session) -> + enrich_subopt(Opts, Msg, Session); enrich_subopt([{subid, SubId}|Opts], Msg, Session) -> Msg1 = emqx_message:set_header('Subscription-Identifier', SubId, Msg), enrich_subopt(Opts, Msg1, Session). From ebea3cc3925d8589d50e13212319ab337cd3c1e9 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Thu, 10 Oct 2019 19:27:35 +0800 Subject: [PATCH 06/14] Defend the ssl upgrade failure --- src/emqx_connection.erl | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index e577fd2cc..186fbd2fe 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -150,7 +150,14 @@ call(CPid, Req) -> gen_statem:call(CPid, Req). %%-------------------------------------------------------------------- init({Transport, RawSocket, Options}) -> - {ok, Socket} = Transport:wait(RawSocket), + case Transport:wait(RawSocket) of + {ok, Socket} -> + do_init(Transport, Socket, Options); + {error, Reason} -> + ?LOG(warning, "connection failed to establish: ~p", [Reason]) + end. + +do_init(Transport, Socket, Options) -> {ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]), {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]), Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]), From 47a192ee34621030d8d2c8bc3b907a6c22303d40 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Sat, 12 Oct 2019 14:48:39 +0800 Subject: [PATCH 07/14] Avoid process crash report for arbitrary exit reason --- src/emqx_channel.erl | 14 ++++++++------ test/emqx_channel_SUITE.erl | 4 ++-- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 36a1a7a40..00ef653b4 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -357,7 +357,7 @@ handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninf OldInterval == 0 andalso Interval > OldInterval -> handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel1); Interval == 0 -> - {stop, ReasonName, Channel1}; + shutdown(ReasonName, Channel1); true -> Channel2 = Channel1#channel{conninfo = ConnInfo#{expiry_interval => Interval}}, {close, ReasonName, Channel2} @@ -369,9 +369,9 @@ handle_in(?AUTH_PACKET(), Channel) -> handle_in({frame_error, Reason}, Channel = #channel{state = FsmState}) -> case FsmState of #{state_name := initialized} -> - {stop, {shutdown, Reason}, Channel}; + shutdown(Reason, Channel); #{state_name := connecting} -> - {stop, {shutdown, Reason}, ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel}; + shutdown(Reason, ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel); #{state_name := connected} -> handle_out({disconnect, ?RC_MALFORMED_PACKET}, Channel); #{state_name := disconnected} -> @@ -564,7 +564,7 @@ handle_out({connack, ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnIn _Ver -> emqx_reason_codes:compat(connack, ReasonCode) end, Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer), - {stop, {shutdown, Reason}, ?CONNACK_PACKET(ReasonCode1), Channel}; + shutdown(Reason, ?CONNACK_PACKET(ReasonCode1), Channel); handle_out({deliver, Delivers}, Channel = #channel{state = #{state_name := disconnected}, session = Session}) -> @@ -646,9 +646,9 @@ handle_out({disconnect, ReasonCode, ReasonName}, expiry_interval := ExpiryInterval}}) -> case {ExpiryInterval, ProtoVer} of {0, ?MQTT_PROTO_V5} -> - {stop, ReasonName, ?DISCONNECT_PACKET(ReasonCode), Channel}; + shutdown(ReasonName, ?DISCONNECT_PACKET(ReasonCode), Channel); {0, _Ver} -> - {stop, ReasonName, Channel}; + shutdown(ReasonName, Channel); {?UINT_MAX, ?MQTT_PROTO_V5} -> {close, ReasonName, ?DISCONNECT_PACKET(ReasonCode), Channel}; {?UINT_MAX, _Ver} -> @@ -1206,3 +1206,5 @@ flag(false) -> 0. shutdown(Reason, Channel) -> {stop, {shutdown, Reason}, Channel}. +shutdown(Reason, Packets, Channel) -> + {stop, {shutdown, Reason}, Packets, Channel}. diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 6623db872..24d930385 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -164,7 +164,7 @@ t_handle_pingreq(_) -> t_handle_disconnect(_) -> with_channel( fun(Channel) -> - {stop, normal, Channel1} = handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), Channel), + {stop, {shutdown, normal}, Channel1} = handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), Channel), ?assertEqual(undefined, emqx_channel:info(will_msg, Channel1)) end). @@ -172,7 +172,7 @@ t_handle_in_auth(_) -> with_channel( fun(Channel) -> Packet = ?DISCONNECT_PACKET(?RC_IMPLEMENTATION_SPECIFIC_ERROR), - {stop, implementation_specific_error, Packet, Channel} = handle_in(?AUTH_PACKET(), Channel) + {stop, {shutdown, implementation_specific_error}, Packet, Channel} = handle_in(?AUTH_PACKET(), Channel) end). %%-------------------------------------------------------------------- From 54e11d3bb5b0b5f4312d1d469772b83a3a5fcddb Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Sat, 12 Oct 2019 15:10:49 +0800 Subject: [PATCH 08/14] Fix client_id options in testcases --- test/emqx_SUITE.erl | 2 +- test/emqx_client_SUITE.erl | 14 +++++++------- test/emqx_connection_SUITE.erl | 2 +- test/emqx_modules_SUITE.erl | 8 ++++---- test/emqx_msg_expiry_interval_SUITE.erl | 8 ++++---- test/emqx_request_responser_SUITE.erl | 4 ++-- test/emqx_shared_sub_SUITE.erl | 12 ++++++------ test/emqx_tracer_SUITE.erl | 2 +- 8 files changed, 26 insertions(+), 26 deletions(-) diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index 0a6ef4d0a..563683bf1 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -43,7 +43,7 @@ t_get_env(_) -> t_emqx_pubsub_api(_) -> emqx:start(), true = emqx:is_running(node()), - {ok, C} = emqtt:start_link([{host, "localhost"}, {clientid, "myclient"}]), + {ok, C} = emqtt:start_link([{host, "localhost"}, {client_id, "myclient"}]), {ok, _} = emqtt:connect(C), ClientId = <<"myclient">>, Topic = <<"mytopic">>, diff --git a/test/emqx_client_SUITE.erl b/test/emqx_client_SUITE.erl index 0867448f3..1e74982c8 100644 --- a/test/emqx_client_SUITE.erl +++ b/test/emqx_client_SUITE.erl @@ -95,9 +95,9 @@ t_cm(_) -> IdleTimeout = emqx_zone:get_env(external, idle_timeout, 30000), emqx_zone:set_env(external, idle_timeout, 1000), ClientId = <<"myclient">>, - {ok, C} = emqtt:start_link([{clientid, ClientId}]), + {ok, C} = emqtt:start_link([{client_id, ClientId}]), {ok, _} = emqtt:connect(C), - ct:sleep(50), + ct:sleep(500), #{clientinfo := #{clientid := ClientId}} = emqx_cm:get_chan_attrs(ClientId), emqtt:subscribe(C, <<"mytopic">>, 0), ct:sleep(1200), @@ -135,13 +135,13 @@ t_will_message(_Config) -> t_offline_message_queueing(_) -> {ok, C1} = emqtt:start_link([{clean_start, false}, - {clientid, <<"c1">>}]), + {client_id, <<"c1">>}]), {ok, _} = emqtt:connect(C1), {ok, _, [2]} = emqtt:subscribe(C1, nth(6, ?WILD_TOPICS), 2), ok = emqtt:disconnect(C1), {ok, C2} = emqtt:start_link([{clean_start, true}, - {clientid, <<"c2">>}]), + {client_id, <<"c2">>}]), {ok, _} = emqtt:connect(C2), ok = emqtt:publish(C2, nth(2, ?TOPICS), <<"qos 0">>, 0), @@ -149,7 +149,7 @@ t_offline_message_queueing(_) -> {ok, _} = emqtt:publish(C2, nth(4, ?TOPICS), <<"qos 2">>, 2), timer:sleep(10), emqtt:disconnect(C2), - {ok, C3} = emqtt:start_link([{clean_start, false}, {clientid, <<"c1">>}]), + {ok, C3} = emqtt:start_link([{clean_start, false}, {client_id, <<"c1">>}]), {ok, _} = emqtt:connect(C3), timer:sleep(10), @@ -197,7 +197,7 @@ t_overlapping_subscriptions(_) -> t_redelivery_on_reconnect(_) -> ct:pal("Redelivery on reconnect test starting"), - {ok, C1} = emqtt:start_link([{clean_start, false}, {clientid, <<"c">>}]), + {ok, C1} = emqtt:start_link([{clean_start, false}, {client_id, <<"c">>}]), {ok, _} = emqtt:connect(C1), {ok, _, [2]} = emqtt:subscribe(C1, nth(7, ?WILD_TOPICS), 2), @@ -210,7 +210,7 @@ t_redelivery_on_reconnect(_) -> timer:sleep(10), ok = emqtt:disconnect(C1), ?assertEqual(0, length(recv_msgs(2))), - {ok, C2} = emqtt:start_link([{clean_start, false}, {clientid, <<"c">>}]), + {ok, C2} = emqtt:start_link([{clean_start, false}, {client_id, <<"c">>}]), {ok, _} = emqtt:connect(C2), timer:sleep(10), diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index 39cc67dee..d321d2c85 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -33,7 +33,7 @@ end_per_suite(_Config) -> t_basic(_) -> Topic = <<"TopicA">>, - {ok, C} = emqtt:start_link([{port, 1883}, {clientid, <<"hello">>}]), + {ok, C} = emqtt:start_link([{port, 1883}, {client_id, <<"hello">>}]), {ok, _} = emqtt:connect(C), {ok, _, [1]} = emqtt:subscribe(C, Topic, qos1), {ok, _, [2]} = emqtt:subscribe(C, Topic, qos2), diff --git a/test/emqx_modules_SUITE.erl b/test/emqx_modules_SUITE.erl index 437a21aec..12f304f54 100644 --- a/test/emqx_modules_SUITE.erl +++ b/test/emqx_modules_SUITE.erl @@ -55,11 +55,11 @@ end_per_suite(_Config) -> %% Test case for emqx_mod_presence t_mod_presence(_) -> ok = emqx_mod_presence:load([{qos, ?QOS_1}]), - {ok, C1} = emqtt:start_link([{clientid, <<"monsys">>}]), + {ok, C1} = emqtt:start_link([{client_id, <<"monsys">>}]), {ok, _} = emqtt:connect(C1), {ok, _Props, [?QOS_1]} = emqtt:subscribe(C1, <<"$SYS/brokers/+/clients/#">>, qos1), %% Connected Presence - {ok, C2} = emqtt:start_link([{clientid, <<"clientid">>}, + {ok, C2} = emqtt:start_link([{client_id, <<"clientid">>}, {username, <<"username">>}]), {ok, _} = emqtt:connect(C2), ok = recv_and_check_presence(<<"clientid">>, <<"connected">>), @@ -98,7 +98,7 @@ recv_and_check_presence(ClientId, Presence) -> t_mod_subscription(_) -> emqx_mod_subscription:load([{<<"connected/%c/%u">>, ?QOS_0}]), {ok, C} = emqtt:start_link([{host, "localhost"}, - {clientid, "myclient"}, + {client_id, "myclient"}, {username, "admin"}]), {ok, _} = emqtt:connect(C), emqtt:publish(C, <<"connected/myclient/admin">>, <<"Hello world">>, ?QOS_0), @@ -111,7 +111,7 @@ t_mod_subscription(_) -> %% Test case for emqx_mod_write t_mod_rewrite(_Config) -> ok = emqx_mod_rewrite:load(?RULES), - {ok, C} = emqtt:start_link([{clientid, <<"rewrite_client">>}]), + {ok, C} = emqtt:start_link([{client_id, <<"rewrite_client">>}]), {ok, _} = emqtt:connect(C), OrigTopics = [<<"x/y/2">>, <<"x/1/2">>, <<"y/a/z/b">>, <<"y/def">>], DestTopics = [<<"z/y/2">>, <<"x/1/2">>, <<"y/z/b">>, <<"y/def">>], diff --git a/test/emqx_msg_expiry_interval_SUITE.erl b/test/emqx_msg_expiry_interval_SUITE.erl index 063cf19dc..6fe18ff5a 100644 --- a/test/emqx_msg_expiry_interval_SUITE.erl +++ b/test/emqx_msg_expiry_interval_SUITE.erl @@ -42,8 +42,8 @@ t_message_expiry_interval_2(_) -> emqtt:stop(ClientA). message_expiry_interval_init() -> - {ok, ClientA} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, ClientB} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, ClientA} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, ClientB} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), {ok, _} = emqtt:connect(ClientA), {ok, _} = emqtt:connect(ClientB), %% subscribe and disconnect client-b @@ -58,7 +58,7 @@ message_expiry_interval_exipred(ClientA, QoS) -> ct:sleep(1500), %% resume the session for client-b - {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), {ok, _} = emqtt:connect(ClientB1), %% verify client-b could not receive the publish message @@ -78,7 +78,7 @@ message_expiry_interval_not_exipred(ClientA, QoS) -> %% wait for 1s and then resume the session for client-b, the message should not expires %% as Message-Expiry-Interval = 20s ct:sleep(1000), - {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), {ok, _} = emqtt:connect(ClientB1), %% verify client-b could receive the publish message and the Message-Expiry-Interval is set diff --git a/test/emqx_request_responser_SUITE.erl b/test/emqx_request_responser_SUITE.erl index 7d66c7760..d603f2b96 100644 --- a/test/emqx_request_responser_SUITE.erl +++ b/test/emqx_request_responser_SUITE.erl @@ -42,7 +42,7 @@ request_response_per_qos(QoS) -> RspTopic = <<"response_topic">>, {ok, Requester} = emqx_request_sender:start_link(RspTopic, QoS, [{proto_ver, v5}, - {clientid, <<"requester">>}, + {client_id, <<"requester">>}, {properties, #{ 'Request-Response-Information' => 1}}]), %% This is a square service Square = fun(_CorrData, ReqBin) -> @@ -51,7 +51,7 @@ request_response_per_qos(QoS) -> end, {ok, Responser} = emqx_request_handler:start_link(ReqTopic, QoS, Square, [{proto_ver, v5}, - {clientid, <<"responser">>} + {client_id, <<"responser">>} ]), ok = emqx_request_sender:send(Requester, ReqTopic, RspTopic, <<"corr-1">>, <<"2">>, QoS), receive diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 9fec11171..14baf8a76 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -80,9 +80,9 @@ t_no_connection_nack(_) -> ShareTopic = <<"$share/", Group/binary, $/, Topic/binary>>, ExpProp = [{properties, #{'Session-Expiry-Interval' => timer:seconds(30)}}], - {ok, SubConnPid1} = emqtt:start_link([{clientid, Subscriber1}] ++ ExpProp), + {ok, SubConnPid1} = emqtt:start_link([{client_id, Subscriber1}] ++ ExpProp), {ok, _Props} = emqtt:connect(SubConnPid1), - {ok, SubConnPid2} = emqtt:start_link([{clientid, Subscriber2}] ++ ExpProp), + {ok, SubConnPid2} = emqtt:start_link([{client_id, Subscriber2}] ++ ExpProp), {ok, _Props} = emqtt:connect(SubConnPid2), emqtt:subscribe(SubConnPid1, ShareTopic, QoS), emqtt:subscribe(SubConnPid1, ShareTopic, QoS), @@ -151,9 +151,9 @@ t_not_so_sticky(_) -> ok = ensure_config(sticky), ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, - {ok, C1} = emqtt:start_link([{clientid, ClientId1}]), + {ok, C1} = emqtt:start_link([{client_id, ClientId1}]), {ok, _} = emqtt:connect(C1), - {ok, C2} = emqtt:start_link([{clientid, ClientId2}]), + {ok, C2} = emqtt:start_link([{client_id, ClientId2}]), {ok, _} = emqtt:connect(C2), emqtt:subscribe(C1, {<<"$share/group1/foo/bar">>, 0}), @@ -179,9 +179,9 @@ test_two_messages(Strategy, WithAck) -> Topic = <<"foo/bar">>, ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, - {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]), + {ok, ConnPid1} = emqtt:start_link([{client_id, ClientId1}]), {ok, _} = emqtt:connect(ConnPid1), - {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]), + {ok, ConnPid2} = emqtt:start_link([{client_id, ClientId2}]), {ok, _} = emqtt:connect(ConnPid2), Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>), diff --git a/test/emqx_tracer_SUITE.erl b/test/emqx_tracer_SUITE.erl index ece0f5799..2eee6a1e1 100644 --- a/test/emqx_tracer_SUITE.erl +++ b/test/emqx_tracer_SUITE.erl @@ -35,7 +35,7 @@ end_per_suite(_Config) -> t_start_traces(_Config) -> {ok, T} = emqtt:start_link([{host, "localhost"}, - {clientid, <<"client">>}, + {client_id, <<"client">>}, {username, <<"testuser">>}, {password, <<"pass">>} ]), From 8c1e452b6e37bf120102b8290c44b4be0ba8aa9d Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 12 Oct 2019 16:56:18 +0800 Subject: [PATCH 09/14] Add 'hibernate_after' option for zone --- etc/emqx.conf | 5 +++++ priv/emqx.schema | 6 ++++++ 2 files changed, 11 insertions(+) diff --git a/etc/emqx.conf b/etc/emqx.conf index a211d91d9..9415834ef 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -554,6 +554,11 @@ mqtt.ignore_loop_deliver = false ## Value: duration zone.external.idle_timeout = 15s +## Hibernate after a duration of idle state. +## +## Value: duration +zone.external.hibernate_after = 60s + ## Publish limit for the external MQTT connections. ## ## Value: Number,Duration diff --git a/priv/emqx.schema b/priv/emqx.schema index 5e408ef38..abe646a10 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -725,6 +725,12 @@ end}. {datatype, {duration, ms}} ]}. +%% @doc Hibernate after a duration of idle state. +{mapping, "zone.$name.hibernate_after", "emqx.zones", [ + {default, "60s"}, + {datatype, {duration, ms}} +]}. + {mapping, "zone.$name.allow_anonymous", "emqx.zones", [ {datatype, {enum, [true, false]}} ]}. From cce0dbd3cf5320c2e2e6d9d3ab6d64cf45a0efe1 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 12 Oct 2019 17:05:37 +0800 Subject: [PATCH 10/14] Improve the connection, ws_connection and channel modules --- src/emqx_channel.erl | 186 +++++++++---------- src/emqx_connection.erl | 350 +++++++++++++++++++----------------- src/emqx_ws_connection.erl | 267 +++++++++++++-------------- test/emqx_channel_SUITE.erl | 28 +-- test/emqx_pool_SUITE.erl | 1 + 5 files changed, 422 insertions(+), 410 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 670b8f702..36c1ac247 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -40,14 +40,9 @@ , handle_call/2 , handle_info/2 , handle_timeout/3 - , disconnect/2 , terminate/2 ]). --export([ received/2 - , sent/2 - ]). - -import(emqx_misc, [ run_fold/3 , pipeline/3 @@ -75,8 +70,8 @@ pub_stats :: emqx_types:stats(), %% Timers timers :: #{atom() => disabled | maybe(reference())}, - %% Fsm State - state :: fsm_state(), + %% Conn State + conn_state :: conn_state(), %% GC State gc_state :: maybe(emqx_gc:gc_state()), %% Takeover @@ -89,13 +84,7 @@ -opaque(channel() :: #channel{}). --type(fsm_state() :: #{state_name := initialized - | connecting - | connected - | disconnected, - connected_at := pos_integer(), - disconnected_at := pos_integer() - }). +-type(conn_state() :: idle | connecting | connected | disconnected). -type(action() :: {enter, connected | disconnected} | {close, Reason :: atom()} @@ -113,7 +102,7 @@ will_timer => will_message }). --define(ATTR_KEYS, [conninfo, clientinfo, state, session]). +-define(ATTR_KEYS, [conninfo, clientinfo, session, conn_state]). -define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, will_msg, topic_aliases, alias_maximum, gc_state]). @@ -136,8 +125,8 @@ info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; info(session, #channel{session = Session}) -> maybe_apply(fun emqx_session:info/1, Session); -info(state, #channel{state = State}) -> - State; +info(conn_state, #channel{conn_state = ConnState}) -> + ConnState; info(keepalive, #channel{keepalive = Keepalive}) -> maybe_apply(fun emqx_keepalive:info/1, Keepalive); info(topic_aliases, #channel{topic_aliases = Aliases}) -> @@ -211,7 +200,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) -> clientinfo = ClientInfo, pub_stats = #{}, timers = #{stats_timer => StatsTimer}, - state = #{state_name => initialized}, + conn_state = idle, gc_state = init_gc_state(Zone), takeover = false, resuming = false, @@ -228,12 +217,16 @@ init_gc_state(Zone) -> %% Handle incoming packet %%-------------------------------------------------------------------- --spec(handle_in(emqx_types:packet(), channel()) +-spec(handle_in(Bytes :: pos_integer() | emqx_types:packet(), channel()) -> {ok, channel()} | {ok, output(), channel()} | {stop, Reason :: term(), channel()} | {stop, Reason :: term(), output(), channel()}). -handle_in(?CONNECT_PACKET(_), Channel = #channel{state = #{state_name := connected}}) -> +handle_in(Bytes, Channel) when is_integer(Bytes) -> + NChannel = maybe_gc_and_check_oom(Bytes, Channel), + {ok, ensure_timer(stats_timer, NChannel)}; + +handle_in(?CONNECT_PACKET(_), Channel = #channel{conn_state = connected}) -> handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel); handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> @@ -347,7 +340,7 @@ handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), end; handle_in(?PACKET(?PINGREQ), Channel) -> - {ok, Channel, {outgoing, ?PACKET(?PINGRESP)}}; + {ok, ?PACKET(?PINGRESP), Channel}; handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninfo = ConnInfo}) -> #{proto_ver := ProtoVer, expiry_interval := OldInterval} = ConnInfo, @@ -371,19 +364,19 @@ handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninf handle_in(?AUTH_PACKET(), Channel) -> handle_out({disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR}, Channel); -handle_in({frame_error, Reason}, Channel = #channel{state = FsmState}) -> - case FsmState of - #{state_name := initialized} -> - {stop, {shutdown, Reason}, Channel}; - #{state_name := connecting} -> - Packet = ?CONNACK_PACKET(?RC_MALFORMED_PACKET), - {stop, {shutdown, Reason}, Packet, Channel}; - #{state_name := connected} -> - handle_out({disconnect, ?RC_MALFORMED_PACKET}, Channel); - #{state_name := disconnected} -> - ?LOG(error, "Unexpected frame error: ~p", [Reason]), - {ok, Channel} - end; +handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) -> + {stop, {shutdown, Reason}, Channel}; + +handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) -> + Packet = ?CONNACK_PACKET(?RC_MALFORMED_PACKET), + {stop, {shutdown, Reason}, Packet, Channel}; + +handle_in({frame_error, _Reason}, Channel = #channel{conn_state = connected}) -> + handle_out({disconnect, ?RC_MALFORMED_PACKET}, Channel); + +handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) -> + ?LOG(error, "Unexpected frame error: ~p", [Reason]), + {ok, Channel}; handle_in(Packet, Channel) -> ?LOG(error, "Unexpected incoming: ~p", [Packet]), @@ -534,31 +527,59 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel = %% Handle outgoing packet %%-------------------------------------------------------------------- -%% TODO: RunFold or Pipeline +-spec(handle_out(integer()|term(), channel()) + -> {ok, channel()} + | {ok, output(), channel()} + | {stop, Reason :: term(), channel()} + | {stop, Reason :: term(), output(), channel()}). +handle_out(Bytes, Channel) when is_integer(Bytes) -> + NChannel = maybe_gc_and_check_oom(Bytes, Channel), + {ok, ensure_timer(stats_timer, NChannel)}; + +handle_out(Delivers, Channel = #channel{conn_state = disconnected, + session = Session}) + when is_list(Delivers) -> + NSession = emqx_session:enqueue(Delivers, Session), + {ok, Channel#channel{session = NSession}}; + +handle_out(Delivers, Channel = #channel{takeover = true, + pendings = Pendings}) + when is_list(Delivers) -> + {ok, Channel#channel{pendings = lists:append(Pendings, Delivers)}}; + +handle_out(Delivers, Channel = #channel{session = Session}) when is_list(Delivers) -> + case emqx_session:deliver(Delivers, Session) of + {ok, Publishes, NSession} -> + NChannel = Channel#channel{session = NSession}, + handle_out({publish, Publishes}, ensure_timer(retry_timer, NChannel)); + {ok, NSession} -> + {ok, Channel#channel{session = NSession}} + end; + handle_out({connack, ?RC_SUCCESS, SP, ConnPkt}, Channel = #channel{conninfo = ConnInfo, - clientinfo = ClientInfo, - state = FsmState}) -> + clientinfo = ClientInfo}) -> AckProps = run_fold([fun enrich_caps/2, fun enrich_server_keepalive/2, fun enrich_assigned_clientid/2], #{}, Channel), - FsmState1 = FsmState#{state_name => connected, - connected_at => erlang:system_time(second) - }, - Channel1 = Channel#channel{state = FsmState1, + ConnInfo1 = ConnInfo#{connected_at => erlang:system_time(second)}, + Channel1 = Channel#channel{conninfo = ConnInfo1, will_msg = emqx_packet:will_msg(ConnPkt), + conn_state = connected, alias_maximum = init_alias_maximum(ConnPkt, ClientInfo) }, Channel2 = ensure_keepalive(AckProps, Channel1), ok = emqx_hooks:run('client.connected', [ClientInfo, ?RC_SUCCESS, ConnInfo]), AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps), case maybe_resume_session(Channel2) of - ignore -> {ok, AckPacket, Channel2}; + ignore -> + Output = [{outgoing, AckPacket}, {enter, connected}], + {ok, Output, Channel2}; {ok, Publishes, NSession} -> Channel3 = Channel2#channel{session = NSession, resuming = false, pendings = []}, - {ok, Packets, _} = handle_out({publish, Publishes}, Channel3), + {ok, {outgoing, Packets}, _} = handle_out({publish, Publishes}, Channel3), Output = [{outgoing, [AckPacket|Packets]}, {enter, connected}], {ok, Output, Channel3} end; @@ -573,24 +594,6 @@ handle_out({connack, ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnIn Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer), {stop, {shutdown, Reason}, ?CONNACK_PACKET(ReasonCode1), Channel}; -handle_out({deliver, Delivers}, Channel = #channel{state = #{state_name := disconnected}, - session = Session}) -> - NSession = emqx_session:enqueue(Delivers, Session), - {ok, Channel#channel{session = NSession}}; - -handle_out({deliver, Delivers}, Channel = #channel{takeover = true, - pendings = Pendings}) -> - {ok, Channel#channel{pendings = lists:append(Pendings, Delivers)}}; - -handle_out({deliver, Delivers}, Channel = #channel{session = Session}) -> - case emqx_session:deliver(Delivers, Session) of - {ok, Publishes, NSession} -> - NChannel = Channel#channel{session = NSession}, - handle_out({publish, Publishes}, ensure_timer(retry_timer, NChannel)); - {ok, NSession} -> - {ok, Channel#channel{session = NSession}} - end; - handle_out({publish, Publishes}, Channel) when is_list(Publishes) -> Packets = lists:foldl( fun(Publish, Acc) -> @@ -679,28 +682,33 @@ handle_out({Type, Data}, Channel) -> %% Handle call %%-------------------------------------------------------------------- +-spec(handle_call(Req :: term(), channel()) + -> {reply, Reply :: term(), channel()} + | {stop, Reason :: term(), Reply :: term(), channel()}). handle_call(kick, Channel) -> {stop, {shutdown, kicked}, ok, Channel}; -handle_call(discard, Channel = #channel{state = #{state_name := connected}}) -> +handle_call(discard, Channel = #channel{conn_state = connected}) -> Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), - {stop, {shutdown, discarded}, Packet, ok, Channel}; -handle_call(discard, Channel = #channel{state = #{state_name := disconnected}}) -> + {stop, {shutdown, discarded}, ok, Packet, Channel}; + +handle_call(discard, Channel = #channel{conn_state = disconnected}) -> {stop, {shutdown, discarded}, ok, Channel}; %% Session Takeover handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) -> - {ok, Session, Channel#channel{takeover = true}}; + {reply, Session, Channel#channel{takeover = true}}; handle_call({takeover, 'end'}, Channel = #channel{session = Session, pendings = Pendings}) -> ok = emqx_session:takeover(Session), - AllPendings = lists:append(emqx_misc:drain_deliver(), Pendings), + Delivers = emqx_misc:drain_deliver(), + AllPendings = lists:append(Delivers, Pendings), {stop, {shutdown, takeovered}, AllPendings, Channel}; handle_call(Req, Channel) -> ?LOG(error, "Unexpected call: ~p", [Req]), - {ok, ignored, Channel}. + {reply, ignored, Channel}. %%-------------------------------------------------------------------- %% Handle Info @@ -727,26 +735,21 @@ handle_info({register, Attrs, Stats}, #channel{clientinfo = #{clientid := Client emqx_cm:set_chan_attrs(ClientId, Attrs), emqx_cm:set_chan_stats(ClientId, Stats); -%%TODO: Fixme later -%%handle_info(disconnected, Channel = #channel{connected = undefined}) -> -%% shutdown(closed, Channel); - -handle_info(disconnected, Channel = #channel{state = #{state_name := disconnected}}) -> +handle_info({sock_closed, _Reason}, Channel = #channel{conn_state = disconnected}) -> {ok, Channel}; -handle_info(disconnected, Channel = #channel{conninfo = #{expiry_interval := ExpiryInterval}, - clientinfo = ClientInfo = #{zone := Zone}, - will_msg = WillMsg}) -> +handle_info({sock_closed, _Reason}, Channel = #channel{conninfo = ConnInfo, + clientinfo = ClientInfo = #{zone := Zone}, + will_msg = WillMsg}) -> emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo), - Channel1 = ensure_disconnected(Channel), + ConnInfo1 = ConnInfo#{disconnected_at => erlang:system_time(second)}, + Channel1 = Channel#channel{conninfo = ConnInfo1, conn_state = disconnected}, Channel2 = case timer:seconds(will_delay_interval(WillMsg)) of - 0 -> - publish_will_msg(WillMsg), - Channel1#channel{will_msg = undefined}; - _ -> - ensure_timer(will_timer, Channel1) + 0 -> publish_will_msg(WillMsg), + Channel1#channel{will_msg = undefined}; + _ -> ensure_timer(will_timer, Channel1) end, - case ExpiryInterval of + case maps:get(expiry_interval, ConnInfo) of ?UINT_MAX -> {ok, Channel2}; Int when Int > 0 -> @@ -757,6 +760,7 @@ handle_info(disconnected, Channel = #channel{conninfo = #{expiry_interval := Exp handle_info(Info, Channel) -> ?LOG(error, "Unexpected info: ~p~n", [Info]), + error(unexpected_info), {ok, Channel}. %%-------------------------------------------------------------------- @@ -870,14 +874,11 @@ will_delay_interval(undefined) -> 0; will_delay_interval(WillMsg) -> emqx_message:get_header('Will-Delay-Interval', WillMsg, 0). -%% TODO: Implement later. -disconnect(_Reason, Channel) -> {ok, Channel}. - %%-------------------------------------------------------------------- %% Terminate %%-------------------------------------------------------------------- -terminate(_, #channel{state = #{state_name := initialized}}) -> +terminate(_, #channel{conn_state = idle}) -> ok; terminate(normal, #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> ok = emqx_hooks:run('client.disconnected', [ClientInfo, normal, ConnInfo]); @@ -888,14 +889,6 @@ terminate(Reason, #channel{conninfo = ConnInfo, clientinfo = ClientInfo, will_ms publish_will_msg(WillMsg), ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]). --spec(received(pos_integer(), channel()) -> channel()). -received(Oct, Channel) -> - ensure_timer(stats_timer, maybe_gc_and_check_oom(Oct, Channel)). - --spec(sent(pos_integer(), channel()) -> channel()). -sent(Oct, Channel) -> - ensure_timer(stats_timer, maybe_gc_and_check_oom(Oct, Channel)). - %% TODO: Improve will msg:) publish_will_msg(undefined) -> ok; @@ -1153,11 +1146,6 @@ init_alias_maximum(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5, inbound => emqx_mqtt_caps:get_caps(Zone, max_topic_alias, 0)}; init_alias_maximum(_ConnPkt, _ClientInfo) -> undefined. -ensure_disconnected(Channel = #channel{state = FsmState}) -> - Channel#channel{state = FsmState#{state_name := disconnected, - disconnected_at => erlang:system_time(second) - }}. - ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel) -> ensure_keepalive_timer(Interval, Channel); ensure_keepalive(_AckProps, Channel = #channel{conninfo = ConnInfo}) -> diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 571d08ba1..3fa0db145 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -14,6 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- +%% MQTT/TCP Connection -module(emqx_connection). -include("emqx.hrl"). @@ -25,7 +26,6 @@ %% API -export([ start_link/3 - , call/2 , stop/1 ]). @@ -33,6 +33,9 @@ , stats/1 ]). +-export([call/2]). + +%% callback -export([init/4]). %% Sys callbacks @@ -56,10 +59,10 @@ peername :: emqx_types:peername(), %% Sockname of the connection sockname :: emqx_types:peername(), + %% Sock state + sockstate :: emqx_types:sockstate(), %% The {active, N} option active_n :: pos_integer(), - %% The active state - active_st :: idle | running | blocked | closed, %% Publish Limit pub_limit :: maybe(esockd_rate_limit:bucket()), %% Rate Limit @@ -71,7 +74,7 @@ %% Serialize function serialize :: emqx_frame:serialize_fun(), %% Channel State - chan_state :: emqx_channel:channel(), + channel :: emqx_channel:channel(), %% Idle timer idle_timer :: reference() }). @@ -79,7 +82,7 @@ -type(state() :: #state{}). -define(ACTIVE_N, 100). --define(INFO_KEYS, [socktype, peername, sockname, active_n, active_state, +-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n, pub_limit, rate_limit]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). @@ -98,8 +101,8 @@ start_link(Transport, Socket, Options) -> -spec(info(pid()|state()) -> emqx_types:infos()). info(CPid) when is_pid(CPid) -> call(CPid, info); -info(State = #state{chan_state = ChanState}) -> - ChanInfo = emqx_channel:info(ChanState), +info(State = #state{channel = Channel}) -> + ChanInfo = emqx_channel:info(Channel), SockInfo = maps:from_list(info(?INFO_KEYS, State)), maps:merge(ChanInfo, #{sockinfo => SockInfo}). @@ -111,16 +114,16 @@ info(peername, #state{peername = Peername}) -> Peername; info(sockname, #state{sockname = Sockname}) -> Sockname; +info(sockstate, #state{sockstate = SockSt}) -> + SockSt; info(active_n, #state{active_n = ActiveN}) -> ActiveN; -info(active_st, #state{active_st= ActiveSt}) -> - ActiveSt; info(pub_limit, #state{pub_limit = PubLimit}) -> limit_info(PubLimit); info(rate_limit, #state{rate_limit = RateLimit}) -> limit_info(RateLimit); -info(chan_state, #state{chan_state = ChanState}) -> - emqx_channel:info(ChanState). +info(channel, #state{channel = Channel}) -> + emqx_channel:info(Channel). limit_info(Limit) -> emqx_misc:maybe_apply(fun esockd_rate_limit:info/1, Limit). @@ -129,15 +132,15 @@ limit_info(Limit) -> -spec(stats(pid()|state()) -> emqx_types:stats()). stats(CPid) when is_pid(CPid) -> call(CPid, stats); -stats(#state{transport = Transport, - socket = Socket, - chan_state = ChanState}) -> +stats(#state{transport = Transport, + socket = Socket, + channel = Channel}) -> SockStats = case Transport:getstat(Socket, ?SOCK_STATS) of {ok, Ss} -> Ss; {error, _} -> [] end, ConnStats = emqx_pd:get_counters(?CONN_STATS), - ChanStats = emqx_channel:stats(ChanState), + ChanStats = emqx_channel:stats(Channel), ProcStats = emqx_misc:proc_stats(), lists:append([SockStats, ConnStats, ChanStats, ProcStats]). @@ -152,7 +155,23 @@ stop(Pid) -> %%-------------------------------------------------------------------- init(Parent, Transport, RawSocket, Options) -> - {ok, Socket} = Transport:wait(RawSocket), + case Transport:wait(RawSocket) of + {ok, Socket} -> + do_init(Parent, Transport, Socket, Options); + {error, Reason} when Reason =:= enotconn; + Reason =:= einval; + Reason =:= closed -> + Transport:fast_close(RawSocket), + exit(normal); + {error, timeout} -> + Transport:fast_close(RawSocket), + exit({shutdown, ssl_upgrade_timeout}); + {error, Reason} -> + Transport:fast_close(RawSocket), + exit(Reason) + end. + +do_init(Parent, Transport, Socket, Options) -> {ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]), {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]), emqx_logger:set_metadata_peername(esockd_net:format(Peername)), @@ -170,27 +189,32 @@ init(Parent, Transport, RawSocket, Options) -> FrameOpts = emqx_zone:frame_options(Zone), ParseState = emqx_frame:initial_parse_state(FrameOpts), Serialize = emqx_frame:serialize_fun(), - ChanState = emqx_channel:init(ConnInfo, Options), + Channel = emqx_channel:init(ConnInfo, Options), IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), IdleTimer = emqx_misc:start_timer(IdleTimout, idle_timeout), HibAfterTimeout = emqx_zone:get_env(Zone, hibernate_after, IdleTimout*2), - State = #state{parent = Parent, - transport = Transport, - socket = Socket, - peername = Peername, - sockname = Sockname, - active_n = ActiveN, - active_st = idle, - pub_limit = PubLimit, - rate_limit = RateLimit, - parse_state = ParseState, - serialize = Serialize, - chan_state = ChanState, - idle_timer = IdleTimer + State = #state{parent = Parent, + transport = Transport, + socket = Socket, + peername = Peername, + sockname = Sockname, + sockstate = idle, + active_n = ActiveN, + pub_limit = PubLimit, + rate_limit = RateLimit, + parse_state = ParseState, + serialize = Serialize, + channel = Channel, + idle_timer = IdleTimer }, case activate_socket(State) of {ok, NState} -> recvloop(NState, #{hibernate_after => HibAfterTimeout}); + {error, Reason} when Reason =:= einval; + Reason =:= enotconn; + Reason =:= closed -> + Transport:fast_close(Socket), + exit(normal); {error, Reason} -> Transport:fast_close(Socket), erlang:exit({shutdown, Reason}) @@ -208,7 +232,8 @@ recvloop(State = #state{parent = Parent}, Options = #{hibernate_after := HibAfterTimeout}) -> receive {system, From, Request} -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Options}); + sys:handle_system_msg(Request, From, Parent, + ?MODULE, [], {State, Options}); {'EXIT', Parent, Reason} -> terminate(Reason, State); Msg -> @@ -230,6 +255,7 @@ wakeup_from_hib(State, Options) -> process_msg([], State, Options) -> recvloop(State, Options); + process_msg([Msg|More], State, Options) -> case catch handle_msg(Msg, State) of ok -> @@ -246,11 +272,6 @@ process_msg([Msg|More], State, Options) -> terminate(Reason, State) end. --compile({inline, [append_msg/2]}). -append_msg(NextMsgs, L) when is_list(NextMsgs) -> - lists:append(NextMsgs, L); -append_msg(NextMsg, L) -> [NextMsg|L]. - %%-------------------------------------------------------------------- %% Handle a Msg @@ -261,51 +282,37 @@ handle_msg({'$gen_call', From, Req}, State) -> {ok, NState}; {stop, Reason, Reply, NState} -> gen_server:reply(From, Reply), - {stop, Reason, NState} + stop(Reason, NState) end; -%% Handle incoming data -handle_msg({Inet, _Sock, Data}, State = #state{chan_state = ChanState}) +handle_msg({Inet, _Sock, Data}, State = #state{channel = Channel}) when Inet == tcp; Inet == ssl -> ?LOG(debug, "RECV ~p", [Data]), Oct = iolist_size(Data), emqx_pd:update_counter(incoming_bytes, Oct), ok = emqx_metrics:inc('bytes.received', Oct), - NChanState = emqx_channel:received(Oct, ChanState), - State1 = State#state{chan_state = NChanState}, - {Packets, State2} = parse_incoming(Data, State1), - {ok, next_incoming_msgs(Packets), State2}; + {ok, NChannel} = emqx_channel:handle_in(Oct, Channel), + process_incoming(Data, State#state{channel = NChannel}); -%% Handle incoming packets handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State = #state{idle_timer = IdleTimer}) -> ok = emqx_misc:cancel_timer(IdleTimer), - NState = State#state{serialize = emqx_frame:serialize_fun(ConnPkt), + Serialize = emqx_frame:serialize_fun(ConnPkt), + NState = State#state{serialize = Serialize, idle_timer = undefined }, handle_incoming(Packet, NState); -handle_msg({incoming, Packet}, State) when is_record(Packet, mqtt_packet) -> +handle_msg({incoming, Packet}, State) -> handle_incoming(Packet, State); -handle_msg({enter, connected}, State = #state{active_n = ActiveN, - active_st = ActiveSt, - chan_state = ChanState - }) -> - ChanAttrs = emqx_channel:attrs(ChanState), - SockAttrs = #{active_n => ActiveN, - active_st => ActiveSt - }, - Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}), - emqx_channel:handle_info({register, Attrs, stats(State)}, ChanState); - handle_msg({Error, _Sock, Reason}, State) when Error == tcp_error; Error == ssl_error -> - handle_sockerr(Reason, State); + handle_info({sock_error, Reason}, State); handle_msg({Closed, _Sock}, State) when Closed == tcp_closed; Closed == ssl_closed -> - socket_closed(Closed, State); + handle_info(sock_closed, State); handle_msg({Passive, _Sock}, State) when Passive == tcp_passive; Passive == ssl_passive -> @@ -314,73 +321,67 @@ handle_msg({Passive, _Sock}, State) case activate_socket(NState) of {ok, NState} -> {ok, NState}; {error, Reason} -> - handle_sockerr(Reason, State) + {ok, {sock_error, Reason}, NState} end; %% Rate limit timer expired. handle_msg(activate_socket, State) -> - NState = State#state{active_st = idle, + NState = State#state{sockstate = idle, limit_timer = undefined }, case activate_socket(NState) of {ok, NState} -> {ok, NState}; {error, Reason} -> - handle_sockerr(Reason, State) + {ok, {sock_error, Reason}, State} end; handle_msg(Deliver = {deliver, _Topic, _Msg}, - State = #state{chan_state = ChanState}) -> + State = #state{channel = Channel}) -> Delivers = emqx_misc:drain_deliver([Deliver]), - Result = emqx_channel:handle_out({deliver, Delivers}, ChanState), - handle_chan_return(Result, State); + Result = emqx_channel:handle_out(Delivers, Channel), + handle_return(Result, State); handle_msg({outgoing, Packets}, State) -> - handle_outgoing(Packets, State); + {ok, handle_outgoing(Packets, State)}; %% something sent handle_msg({inet_reply, _Sock, ok}, _State) -> ok; handle_msg({inet_reply, _Sock, {error, Reason}}, State) -> - handle_sockerr(Reason, State); + handle_info({sock_error, Reason}, State); -handle_msg({timeout, TRef, TMsg}, State) when is_reference(TRef) -> +handle_msg({timeout, TRef, TMsg}, State) -> handle_timeout(TRef, TMsg, State); handle_msg(Shutdown = {shutdown, _Reason}, State) -> - {stop, Shutdown, State}; + stop(Shutdown, State); -handle_msg(Msg, State = #state{chan_state = ChanState}) -> - case emqx_channel:handle_info(Msg, ChanState) of - {ok, NChanState} -> - {ok, State#state{chan_state = NChanState}}; - {stop, Reason, NChanState} -> - {stop, Reason, State#state{chan_state = NChanState}} - end. +handle_msg(Msg, State) -> handle_info(Msg, State). %%-------------------------------------------------------------------- %% Terminate -terminate(Reason, #state{transport = Transport, - socket = Socket, - active_st = ActiveSt, - chan_state = ChanState}) -> +terminate(Reason, #state{transport = Transport, + socket = Socket, + sockstate = SockSt, + channel = Channel}) -> ?LOG(debug, "Terminated for ~p", [Reason]), - ActiveSt =:= closed orelse Transport:fast_close(Socket), - emqx_channel:terminate(Reason, ChanState), + SockSt =:= closed orelse Transport:fast_close(Socket), + emqx_channel:terminate(Reason, Channel), exit(Reason). %%-------------------------------------------------------------------- %% Sys callbacks system_continue(_Parent, _Deb, {State, Options}) -> - recvloop(State, Options). + recvloop(State, Options). system_terminate(Reason, _Parent, _Deb, {State, _}) -> - terminate(Reason, State). + terminate(Reason, State). system_code_change(Misc, _, _, _) -> - {ok, Misc}. + {ok, Misc}. system_get_state({State, _Options}) -> {ok, State}. @@ -394,24 +395,23 @@ handle_call(_From, info, State) -> handle_call(_From, stats, State) -> {reply, stats(State), State}; -%% TODO: the handle_outgoing is not right ... -handle_call(_From, Req, State = #state{chan_state = ChanState}) -> - case emqx_channel:handle_call(Req, ChanState) of - {ok, Reply, NChanState} -> - {reply, Reply, State#state{chan_state = NChanState}}; - {stop, Reason, Reply, NChanState} -> - {stop, Reason, Reply, State#state{chan_state = NChanState}}; - {stop, Reason, Packet, Reply, NChanState} -> - State1 = State#state{chan_state = NChanState}, - {ok, State2} = handle_outgoing(Packet, State1), - {stop, Reason, Reply, State2} +handle_call(_From, Req, State = #state{channel = Channel}) -> + case emqx_channel:handle_call(Req, Channel) of + {reply, Reply, NChannel} -> + {reply, Reply, State#state{channel = NChannel}}; + {stop, Reason, Reply, NChannel} -> + {stop, Reason, Reply, State#state{channel = NChannel}}; + {stop, Reason, Reply, OutPacket, NChannel} -> + NState = State#state{channel = NChannel}, + NState1 = handle_outgoing(OutPacket, NState), + {stop, Reason, Reply, NState1} end. %%-------------------------------------------------------------------- %% Handle timeout handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) -> - {stop, idle_timeout, State}; + stop(idle_timeout, State); handle_timeout(TRef, emit_stats, State) -> handle_timeout(TRef, {emit_stats, stats(State)}, State); @@ -422,16 +422,21 @@ handle_timeout(TRef, keepalive, State = #state{transport = Transport, {ok, [{recv_oct, RecvOct}]} -> handle_timeout(TRef, {keepalive, RecvOct}, State); {error, Reason} -> - handle_sockerr(Reason, State) + handle_info({sockerr, Reason}, State) end; -handle_timeout(TRef, Msg, State = #state{chan_state = ChanState}) -> - Result = emqx_channel:handle_timeout(TRef, Msg, ChanState), - handle_chan_return(Result, State). +handle_timeout(TRef, Msg, State = #state{channel = Channel}) -> + handle_return(emqx_channel:handle_timeout(TRef, Msg, Channel), State). %%-------------------------------------------------------------------- -%% Parse incoming data. +%% Process/Parse incoming data. +-compile({inline, [process_incoming/2]}). +process_incoming(Data, State) -> + {Packets, NState} = parse_incoming(Data, State), + {ok, next_incoming_msgs(Packets), NState}. + +-compile({inline, [parse_incoming/2]}). parse_incoming(Data, State) -> parse_incoming(Data, [], State). @@ -460,30 +465,30 @@ next_incoming_msgs(Packets) -> %%-------------------------------------------------------------------- %% Handle incoming packet -handle_incoming(Packet = ?PACKET(Type), State = #state{chan_state = ChanState}) -> +handle_incoming(Packet = ?PACKET(Type), State = #state{channel = Channel}) -> _ = inc_incoming_stats(Type), ok = emqx_metrics:inc_recv(Packet), ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), - Result = emqx_channel:handle_in(Packet, ChanState), - handle_chan_return(Result, State); + handle_return(emqx_channel:handle_in(Packet, Channel), State); -handle_incoming(FrameError = {frame_error, _Reason}, State = #state{chan_state = ChanState}) -> - Result = emqx_channel:handle_in(FrameError, ChanState), - handle_chan_return(Result, State). +handle_incoming(FrameError, State = #state{channel = Channel}) -> + handle_return(emqx_channel:handle_in(FrameError, Channel), State). -handle_chan_return({ok, NChanState}, State) -> - {ok, State#state{chan_state = NChanState}}; -handle_chan_return({ok, OutPacket, NChanState}, State) - when is_record(OutPacket, mqtt_packet) -> - {ok, {outgoing, OutPacket}, State#state{chan_state = NChanState}}; -handle_chan_return({ok, Actions, NChanState}, State) -> - {ok, Actions, State#state{chan_state = NChanState}}; -handle_chan_return({stop, Reason, NChanState}, State) -> - {stop, Reason, State#state{chan_state = NChanState}}; -handle_chan_return({stop, Reason, OutPackets, NChanState}, State) -> - NState = State#state{chan_state = NChanState}, - {ok, NState1} = handle_outgoing(OutPackets, NState), - {stop, Reason, NState1}. +%%-------------------------------------------------------------------- +%% Handle channel return + +handle_return(ok, State) -> + {ok, State}; +handle_return({ok, NChannel}, State) -> + {ok, State#state{channel = NChannel}}; +handle_return({ok, Replies, NChannel}, State) -> + {ok, next_msgs(Replies), State#state{channel = NChannel}}; +handle_return({stop, Reason, NChannel}, State) -> + stop(Reason, State#state{channel = NChannel}); +handle_return({stop, Reason, OutPacket, NChannel}, State) -> + NState = State#state{channel = NChannel}, + NState1 = handle_outgoing(OutPacket, NState), + stop(Reason, NState1). %%-------------------------------------------------------------------- %% Handle outgoing packets @@ -510,70 +515,73 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> %%-------------------------------------------------------------------- %% Send data -send(IoData, State = #state{transport = Transport, - socket = Socket, - chan_state = ChanState}) -> +send(IoData, State = #state{transport = Transport, + socket = Socket, + channel = Channel}) -> Oct = iolist_size(IoData), ok = emqx_metrics:inc('bytes.sent', Oct), case Transport:async_send(Socket, IoData) of ok -> - NChanState = emqx_channel:sent(Oct, ChanState), - {ok, State#state{chan_state = NChanState}}; + {ok, NChannel} = emqx_channel:handle_out(Oct, Channel), + State#state{channel = NChannel}; Error = {error, _Reason} -> %% Simulate an inet_reply to postpone handling the error - self() ! {inet_reply, Socket, Error}, - {ok, State} + self() ! {inet_reply, Socket, Error}, State end. %%-------------------------------------------------------------------- -%% Handle sockerr +%% Handle Info -handle_sockerr(_Reason, State = #state{active_st = closed}) -> - {ok, State}; +handle_info({enter, _}, State = #state{active_n = ActiveN, + sockstate = SockSt, + channel = Channel}) -> + ChanAttrs = emqx_channel:attrs(Channel), + SockAttrs = #{active_n => ActiveN, + sockstate => SockSt + }, + Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}), + handle_info({register, Attrs, stats(State)}, State); -handle_sockerr(Reason, State = #state{transport = Transport, - socket = Socket, - chan_state = ChanState}) -> +handle_info({sockerr, _Reason}, #state{sockstate = closed}) -> ok; +handle_info({sockerr, Reason}, State) -> ?LOG(debug, "Socket error: ~p", [Reason]), - ok = Transport:fast_close(Socket), - NState = State#state{active_st = closed}, - case emqx_channel:handle_info({sockerr, Reason}, ChanState) of - {ok, NChanState} -> - {ok, NState#state{chan_state = NChanState}}; - {stop, NChanState} -> - {stop, {shutdown, Reason}, NState#state{chan_state = NChanState}} - end. + handle_info({sock_closed, Reason}, close_socket(State)); -socket_closed(Closed, State = #state{transport = Transport, - socket = Socket, - chan_state = ChanState}) -> - ?LOG(debug, "Socket closed: ~p", [Closed]), - ok = Transport:fast_close(Socket), - NState = State#state{active_st = closed}, - case emqx_channel:handle_info({sock_closed, Closed}, ChanState) of - {ok, NChanState} -> - {ok, NState#state{chan_state = NChanState}}; - {stop, NChanState} -> - NState = NState#state{chan_state = NChanState}, - {stop, {shutdown, Closed}, NState} - end. +handle_info(sock_closed, #state{sockstate = closed}) -> ok; +handle_info(sock_closed, State) -> + ?LOG(debug, "Socket closed"), + handle_info({sock_closed, closed}, close_socket(State)); + +handle_info({close, Reason}, State) -> + ?LOG(debug, "Force close due to : ~p", [Reason]), + {ok, close_socket(State)}; + +handle_info(Info, State = #state{channel = Channel}) -> + handle_return(emqx_channel:handle_info(Info, Channel), State). %%-------------------------------------------------------------------- %% Activate Socket -compile({inline, [activate_socket/1]}). -activate_socket(State = #state{active_st = closed}) -> +activate_socket(State = #state{sockstate = closed}) -> {ok, State}; -activate_socket(State = #state{active_st = blocked}) -> +activate_socket(State = #state{sockstate = blocked}) -> {ok, State}; activate_socket(State = #state{transport = Transport, socket = Socket, active_n = N}) -> case Transport:setopts(Socket, [{active, N}]) of - ok -> {ok, State#state{active_st = running}}; + ok -> {ok, State#state{sockstate = running}}; Error -> Error end. +%%-------------------------------------------------------------------- +%% Close Socket + +close_socket(State = #state{transport = Transport, socket = Socket}) -> + ok = Transport:fast_close(Socket), + State#state{sockstate = closed}. + %%-------------------------------------------------------------------- %% Ensure rate limit @@ -595,7 +603,7 @@ ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) -> {Pause, Rl1} -> ?LOG(debug, "Pause ~pms due to rate limit", [Pause]), TRef = erlang:send_after(Pause, self(), activate_socket), - NState = State#state{active_st = blocked, limit_timer = TRef}, + NState = State#state{sockstate = blocked, limit_timer = TRef}, setelement(Pos, NState, Rl1) end. @@ -612,10 +620,28 @@ inc_incoming_stats(Type) when is_integer(Type) -> true -> ok end. - -compile({inline, [inc_outgoing_stats/1]}). inc_outgoing_stats(Type) -> emqx_pd:update_counter(send_pkt, 1), - (Type == ?PUBLISH) - andalso emqx_pd:update_counter(send_msg, 1). + (Type == ?PUBLISH) andalso emqx_pd:update_counter(send_msg, 1). + +%%-------------------------------------------------------------------- +%% Helper functions + +-compile({inline, [append_msg/2]}). +append_msg(Msgs, Q) when is_list(Msgs) -> + lists:append(Msgs, Q); +append_msg(Msg, Q) -> [Msg|Q]. + +-compile({inline, [next_msgs/1]}). +next_msgs(Packet) when is_record(Packet, mqtt_packet) -> + {outgoing, Packet}; +next_msgs(Action) when is_tuple(Action) -> + Action; +next_msgs(Actions) when is_list(Actions) -> + Actions. + +-compile({inline, [stop/2]}). +stop(Reason, State) -> + {stop, Reason, State}. diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 7434fe932..10198b5a8 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% MQTT WebSocket Connection +%% MQTT/WS Connection -module(emqx_ws_connection). -include("emqx.hrl"). @@ -22,8 +22,9 @@ -include("logger.hrl"). -include("types.hrl"). --logger_header("[WsConnection]"). +-logger_header("[MQTT/WS]"). +%% API -export([ info/1 , stats/1 ]). @@ -35,6 +36,7 @@ , websocket_init/1 , websocket_handle/2 , websocket_info/2 + , websocket_close/2 , terminate/3 ]). @@ -43,14 +45,14 @@ peername :: emqx_types:peername(), %% Sockname of the ws connection sockname :: emqx_types:peername(), - %% Conn state - conn_state :: idle | connected | disconnected, + %% Sock state + sockstate :: emqx_types:sockstate(), %% Parser State parse_state :: emqx_frame:parse_state(), %% Serialize function serialize :: emqx_frame:serialize_fun(), - %% Channel State - chan_state :: emqx_channel:channel(), + %% Channel + channel :: emqx_channel:channel(), %% Out Pending Packets pendings :: list(emqx_types:packet()), %% The stop reason @@ -59,7 +61,7 @@ -type(state() :: #state{}). --define(INFO_KEYS, [socktype, peername, sockname, active_state]). +-define(INFO_KEYS, [socktype, peername, sockname, sockstate]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). @@ -70,8 +72,8 @@ -spec(info(pid()|state()) -> emqx_types:infos()). info(WsPid) when is_pid(WsPid) -> call(WsPid, info); -info(WsConn = #state{chan_state = ChanState}) -> - ChanInfo = emqx_channel:info(ChanState), +info(WsConn = #state{channel = Channel}) -> + ChanInfo = emqx_channel:info(Channel), SockInfo = maps:from_list(info(?INFO_KEYS, WsConn)), maps:merge(ChanInfo, #{sockinfo => SockInfo}). @@ -83,18 +85,18 @@ info(peername, #state{peername = Peername}) -> Peername; info(sockname, #state{sockname = Sockname}) -> Sockname; -info(active_state, _State) -> - running; -info(chan_state, #state{chan_state = ChanState}) -> - emqx_channel:info(ChanState). +info(sockstate, #state{sockstate = SockSt}) -> + SockSt; +info(channel, #state{channel = Channel}) -> + emqx_channel:info(Channel). -spec(stats(pid()|state()) -> emqx_types:stats()). stats(WsPid) when is_pid(WsPid) -> call(WsPid, stats); -stats(#state{chan_state = ChanState}) -> +stats(#state{channel = Channel}) -> SockStats = emqx_pd:get_counters(?SOCK_STATS), ConnStats = emqx_pd:get_counters(?CONN_STATS), - ChanStats = emqx_channel:stats(ChanState), + ChanStats = emqx_channel:stats(Channel), ProcStats = emqx_misc:proc_stats(), lists:append([SockStats, ConnStats, ChanStats, ProcStats]). @@ -168,27 +170,26 @@ websocket_init([Req, Opts]) -> FrameOpts = emqx_zone:frame_options(Zone), ParseState = emqx_frame:initial_parse_state(FrameOpts), Serialize = emqx_frame:serialize_fun(), - ChanState = emqx_channel:init(ConnInfo, Opts), + Channel = emqx_channel:init(ConnInfo, Opts), emqx_logger:set_metadata_peername(esockd_net:format(Peername)), {ok, #state{peername = Peername, sockname = Sockname, - conn_state = idle, + sockstate = idle, parse_state = ParseState, serialize = Serialize, - chan_state = ChanState, + channel = Channel, pendings = [] }}. websocket_handle({binary, Data}, State) when is_list(Data) -> websocket_handle({binary, iolist_to_binary(Data)}, State); -websocket_handle({binary, Data}, State = #state{chan_state = ChanState}) -> +websocket_handle({binary, Data}, State = #state{channel = Channel}) -> ?LOG(debug, "RECV ~p", [Data]), Oct = iolist_size(Data), ok = inc_recv_stats(1, Oct), - NChanState = emqx_channel:received(Oct, ChanState), - NState = State#state{chan_state = NChanState}, - process_incoming(Data, NState); + {ok, NChannel} = emqx_channel:handle_in(Oct, Channel), + process_incoming(Data, State#state{channel = NChannel}); %% Pings should be replied with pongs, cowboy does it automatically %% Pongs can be safely ignored. Clause here simply prevents crash. @@ -203,56 +204,27 @@ websocket_handle({FrameType, _}, State) -> ?LOG(error, "Unexpected frame - ~p", [FrameType]), stop({shutdown, unexpected_ws_frame}, State). -websocket_info({call, From, info}, State) -> - gen_server:reply(From, info(State)), - {ok, State}; +websocket_info({call, From, Req}, State) -> + handle_call(From, Req, State); -websocket_info({call, From, stats}, State) -> - gen_server:reply(From, stats(State)), - {ok, State}; - -websocket_info({call, From, state}, State) -> - gen_server:reply(From, State), - {ok, State}; - -websocket_info({call, From, Req}, State = #state{chan_state = ChanState}) -> - case emqx_channel:handle_call(Req, ChanState) of - {ok, Reply, NChanState} -> - _ = gen_server:reply(From, Reply), - {ok, State#state{chan_state = NChanState}}; - {stop, Reason, Reply, NChanState} -> - _ = gen_server:reply(From, Reply), - stop(Reason, State#state{chan_state = NChanState}) - end; - -websocket_info({cast, Msg}, State = #state{chan_state = ChanState}) -> - case emqx_channel:handle_info(Msg, ChanState) of - ok -> {ok, State}; - {ok, NChanState} -> - {ok, State#state{chan_state = NChanState}}; - {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}) - end; +websocket_info({cast, Msg}, State = #state{channel = Channel}) -> + handle_return(emqx_channel:handle_info(Msg, Channel), State); websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) -> - NState = State#state{serialize = emqx_frame:serialize_fun(ConnPkt)}, - handle_incoming(Packet, fun connected/1, NState); + Serialize = emqx_frame:serialize_fun(ConnPkt), + NState = State#state{sockstate = running, + serialize = Serialize + }, + handle_incoming(Packet, NState); -websocket_info({incoming, Packet}, State) when is_record(Packet, mqtt_packet) -> - handle_incoming(Packet, fun reply/1, State); - -websocket_info({incoming, FrameError = {frame_error, _Reason}}, State) -> - handle_incoming(FrameError, State); +websocket_info({incoming, Packet}, State) -> + handle_incoming(Packet, State); websocket_info(Deliver = {deliver, _Topic, _Msg}, - State = #state{chan_state = ChanState}) -> + State = #state{channel = Channel}) -> Delivers = emqx_misc:drain_deliver([Deliver]), - case emqx_channel:handle_out({deliver, Delivers}, ChanState) of - {ok, NChanState} -> - reply(State#state{chan_state = NChanState}); - {ok, Packets, NChanState} -> - reply(enqueue(Packets, State#state{chan_state = NChanState})) - end; + Result = emqx_channel:handle_out(Delivers, Channel), + handle_return(Result, State); websocket_info({timeout, TRef, keepalive}, State) when is_reference(TRef) -> RecvOct = emqx_pd:get_counter(recv_oct), @@ -264,60 +236,70 @@ websocket_info({timeout, TRef, emit_stats}, State) when is_reference(TRef) -> websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) -> handle_timeout(TRef, Msg, State); +websocket_info({close, Reason}, State) -> + stop({shutdown, Reason}, State); + websocket_info({shutdown, Reason}, State) -> stop({shutdown, Reason}, State); websocket_info({stop, Reason}, State) -> stop(Reason, State); -websocket_info(Info, State = #state{chan_state = ChanState}) -> - case emqx_channel:handle_info(Info, ChanState) of - {ok, NChanState} -> - {ok, State#state{chan_state = NChanState}}; - {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}) - end. +websocket_info(Info, State) -> + handle_info(Info, State). -terminate(SockError, _Req, #state{chan_state = ChanState, +websocket_close(Reason, State) -> + ?LOG(debug, "WebSocket closed due to ~p~n", [Reason]), + handle_info({sock_closed, Reason}, State). + +terminate(SockError, _Req, #state{channel = Channel, stop_reason = Reason}) -> ?LOG(debug, "Terminated for ~p, sockerror: ~p", [Reason, SockError]), - emqx_channel:terminate(Reason, ChanState). + emqx_channel:terminate(Reason, Channel). %%-------------------------------------------------------------------- -%% Connected callback +%% Handle call -connected(State = #state{chan_state = ChanState}) -> - ChanAttrs = emqx_channel:attrs(ChanState), - SockAttrs = #{active_state => running}, - Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}), - ok = emqx_channel:handle_info({register, Attrs, stats(State)}, ChanState), - reply(State#state{conn_state = connected}). +handle_call(From, info, State) -> + gen_server:reply(From, info(State)), + {ok, State}; -%%-------------------------------------------------------------------- -%% Close +handle_call(From, stats, State) -> + gen_server:reply(From, stats(State)), + {ok, State}; -close(Reason, State) -> - ?LOG(warning, "Closed for ~p", [Reason]), - reply(State#state{conn_state = disconnected}). +handle_call(From, Req, State = #state{channel = Channel}) -> + case emqx_channel:handle_call(Req, Channel) of + {reply, Reply, NChannel} -> + _ = gen_server:reply(From, Reply), + {ok, State#state{channel = NChannel}}; + {stop, Reason, Reply, NChannel} -> + _ = gen_server:reply(From, Reply), + stop(Reason, State#state{channel = NChannel}); + {stop, Reason, Reply, OutPacket, NChannel} -> + gen_server:reply(From, Reply), + NState = State#state{channel = NChannel}, + stop(Reason, enqueue(OutPacket, NState)) + end. %%-------------------------------------------------------------------- %% Handle timeout -handle_timeout(TRef, Msg, State = #state{chan_state = ChanState}) -> - case emqx_channel:handle_timeout(TRef, Msg, ChanState) of - {ok, NChanState} -> - {ok, State#state{chan_state = NChanState}}; - {ok, Packets, NChanState} -> - NState = State#state{chan_state = NChanState}, - reply(enqueue(Packets, NState)); - {close, Reason, NChanState} -> - close(Reason, State#state{chan_state = NChanState}); - {close, Reason, OutPackets, NChanState} -> - NState = State#state{chan_state= NChanState}, - close(Reason, enqueue(OutPackets, NState)); - {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}) - end. +handle_timeout(TRef, Msg, State = #state{channel = Channel}) -> + handle_return(emqx_channel:handle_timeout(TRef, Msg, Channel), State). + +%%-------------------------------------------------------------------- +%% Handle Info + +handle_info({enter, _}, State = #state{channel = Channel}) -> + ChanAttrs = emqx_channel:attrs(Channel), + SockAttrs = maps:from_list(info(?INFO_KEYS, State)), + Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}), + ok = emqx_channel:handle_info({register, Attrs, stats(State)}, Channel), + reply(State); + +handle_info(Info, State = #state{channel = Channel}) -> + handle_return(emqx_channel:handle_info(Info, Channel), State). %%-------------------------------------------------------------------- %% Process incoming data @@ -343,48 +325,39 @@ process_incoming(Data, State = #state{parse_state = ParseState}) -> %%-------------------------------------------------------------------- %% Handle incoming packets -handle_incoming(Packet = ?PACKET(Type), SuccFun, - State = #state{chan_state = ChanState}) -> +handle_incoming(Packet = ?PACKET(Type), State = #state{channel = Channel}) -> _ = inc_incoming_stats(Type), _ = emqx_metrics:inc_recv(Packet), ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), - case emqx_channel:handle_in(Packet, ChanState) of - {ok, NChanState} -> - SuccFun(State#state{chan_state= NChanState}); - {ok, OutPackets, NChanState} -> - NState = State#state{chan_state= NChanState}, - SuccFun(enqueue(OutPackets, NState)); - {close, Reason, NChanState} -> - close(Reason, State#state{chan_state = NChanState}); - {close, Reason, OutPackets, NChanState} -> - NState = State#state{chan_state= NChanState}, - close(Reason, enqueue(OutPackets, NState)); - {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}); - {stop, Reason, OutPackets, NChanState} -> - NState = State#state{chan_state= NChanState}, - stop(Reason, enqueue(OutPackets, NState)) - end. + handle_return(emqx_channel:handle_in(Packet, Channel), State); -handle_incoming(FrameError = {frame_error, _Reason}, - State = #state{chan_state = ChanState}) -> - case emqx_channel:handle_in(FrameError, ChanState) of - {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}); - {stop, Reason, OutPackets, NChanState} -> - NState = State#state{chan_state = NChanState}, - stop(Reason, enqueue(OutPackets, NState)) - end. +handle_incoming(FrameError, State = #state{channel = Channel}) -> + handle_return(emqx_channel:handle_in(FrameError, Channel), State). + +%%-------------------------------------------------------------------- +%% Handle channel return + +handle_return(ok, State) -> + reply(State); +handle_return({ok, NChannel}, State) -> + reply(State#state{channel= NChannel}); +handle_return({ok, Replies, NChannel}, State) -> + reply(Replies, State#state{channel= NChannel}); +handle_return({stop, Reason, NChannel}, State) -> + stop(Reason, State#state{channel = NChannel}); +handle_return({stop, Reason, OutPacket, NChannel}, State) -> + NState = State#state{channel = NChannel}, + stop(Reason, enqueue(OutPacket, NState)). %%-------------------------------------------------------------------- %% Handle outgoing packets -handle_outgoing(Packets, State = #state{chan_state = ChanState}) -> +handle_outgoing(Packets, State = #state{channel = Channel}) -> IoData = lists:map(serialize_and_inc_stats_fun(State), Packets), Oct = iolist_size(IoData), ok = inc_sent_stats(length(Packets), Oct), - NChanState = emqx_channel:sent(Oct, ChanState), - {{binary, IoData}, State#state{chan_state = NChanState}}. + {ok, NChannel} = emqx_channel:handle_out(Oct, Channel), + {{binary, IoData}, State#state{channel = NChannel}}. %% TODO: Duplicated with emqx_channel:serialize_and_inc_stats_fun/1 serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> @@ -433,7 +406,25 @@ inc_sent_stats(Cnt, Oct) -> %%-------------------------------------------------------------------- %% Reply or Stop --compile({inline, [reply/1]}). +reply(Packet, State) when is_record(Packet, mqtt_packet) -> + reply(enqueue(Packet, State)); +reply({outgoing, Packets}, State) -> + reply(enqueue(Packets, State)); +reply(Close = {close, _Reason}, State) -> + self() ! Close, + reply(State); + +reply([], State) -> + reply(State); +reply([Packet|More], State) when is_record(Packet, mqtt_packet) -> + reply(More, enqueue(Packet, State)); +reply([{outgoing, Packets}|More], State) -> + reply(More, enqueue(Packets, State)); +reply([Other|More], State) -> + self() ! Other, + reply(More, State). + +-compile({inline, [reply/1, enqueue/2]}). reply(State = #state{pendings = []}) -> {ok, State}; @@ -441,6 +432,11 @@ reply(State = #state{pendings = Pendings}) -> {Reply, NState} = handle_outgoing(Pendings, State), {reply, Reply, NState#state{pendings = []}}. +enqueue(Packet, State) when is_record(Packet, mqtt_packet) -> + enqueue([Packet], State); +enqueue(Packets, State = #state{pendings = Pendings}) -> + State#state{pendings = lists:append(Pendings, Packets)}. + stop(Reason, State = #state{pendings = []}) -> {stop, State#state{stop_reason = Reason}}; stop(Reason, State = #state{pendings = Pendings}) -> @@ -448,8 +444,3 @@ stop(Reason, State = #state{pendings = Pendings}) -> State2 = State1#state{pendings = [], stop_reason = Reason}, {reply, [Reply, close], State2}. -enqueue(Packet, State) when is_record(Packet, mqtt_packet) -> - enqueue([Packet], State); -enqueue(Packets, State = #state{pendings = Pendings}) -> - State#state{pendings = lists:append(Pendings, Packets)}. - diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 6623db872..a77d69c10 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -64,17 +64,18 @@ t_handle_connect(_) -> is_bridge = false, clean_start = true, keepalive = 30, - properties = #{}, + properties = undefined, clientid = <<"clientid">>, username = <<"username">>, password = <<"passwd">> }, with_channel( fun(Channel) -> - {ok, ?CONNACK_PACKET(?RC_SUCCESS), Channel1} - = handle_in(?CONNECT_PACKET(ConnPkt), Channel), - #{clientid := ClientId, username := Username} - = emqx_channel:info(clientinfo, Channel1), + ConnAck = ?CONNACK_PACKET(?RC_SUCCESS, 0, #{}), + ExpectedOutput = [{outgoing, ConnAck},{enter, connected}], + {ok, Output, Channel1} = handle_in(?CONNECT_PACKET(ConnPkt), Channel), + ?assertEqual(ExpectedOutput, Output), + #{clientid := ClientId, username := Username} = emqx_channel:info(clientinfo, Channel1), ?assertEqual(<<"clientid">>, ClientId), ?assertEqual(<<"username">>, Username) end). @@ -180,7 +181,7 @@ t_handle_in_auth(_) -> %%-------------------------------------------------------------------- t_handle_deliver(_) -> - with_channel( + with_connected_channel( fun(Channel) -> TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS#{qos => ?QOS_2}}], {ok, ?SUBACK_PACKET(1, [?QOS_2]), Channel1} @@ -188,7 +189,7 @@ t_handle_deliver(_) -> Msg0 = emqx_message:make(<<"clientx">>, ?QOS_0, <<"t0">>, <<"qos0">>), Msg1 = emqx_message:make(<<"clientx">>, ?QOS_1, <<"t1">>, <<"qos1">>), Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}], - {ok, Packets, _Ch} = emqx_channel:handle_out({deliver, Delivers}, Channel1), + {ok, {outgoing, Packets}, _Ch} = emqx_channel:handle_out(Delivers, Channel1), ?assertEqual([?QOS_0, ?QOS_1], [emqx_packet:qos(Pkt)|| Pkt <- Packets]) end). @@ -206,10 +207,9 @@ t_handle_out_connack(_) -> }, with_channel( fun(Channel) -> - {ok, ?CONNACK_PACKET(?RC_SUCCESS, SP, _), _} + {ok, [{outgoing, ?CONNACK_PACKET(?RC_SUCCESS, SP, _)}, {enter, connected}], _Chan} = handle_out({connack, ?RC_SUCCESS, 0, ConnPkt}, Channel), - {stop, {shutdown, not_authorized}, - ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _} + {stop, {shutdown, not_authorized}, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _} = handle_out({connack, ?RC_NOT_AUTHORIZED, ConnPkt}, Channel) end). @@ -220,7 +220,7 @@ t_handle_out_publish(_) -> Pub1 = {publish, 1, emqx_message:make(<<"c">>, ?QOS_1, <<"t">>, <<"qos1">>)}, {ok, ?PUBLISH_PACKET(?QOS_0), Channel} = handle_out(Pub0, Channel), {ok, ?PUBLISH_PACKET(?QOS_1), Channel} = handle_out(Pub1, Channel), - {ok, Packets, Channel1} = handle_out({publish, [Pub0, Pub1]}, Channel), + {ok, {outgoing, Packets}, Channel1} = handle_out({publish, [Pub0, Pub1]}, Channel), ?assertEqual(2, length(Packets)), ?assertEqual(#{publish_out => 2}, emqx_channel:info(pub_stats, Channel1)) end). @@ -304,6 +304,12 @@ t_terminate(_) -> %% Helper functions %%-------------------------------------------------------------------- +with_connected_channel(TestFun) -> + with_channel( + fun(Channel) -> + TestFun(emqx_channel:set_field(conn_state, connected, Channel)) + end). + with_channel(TestFun) -> with_channel(#{}, TestFun). diff --git a/test/emqx_pool_SUITE.erl b/test/emqx_pool_SUITE.erl index 8ec9d6941..5aca5ca3f 100644 --- a/test/emqx_pool_SUITE.erl +++ b/test/emqx_pool_SUITE.erl @@ -39,6 +39,7 @@ groups() -> ]. init_per_suite(Config) -> + ok = emqx_logger:set_log_level(emergency), application:ensure_all_started(gproc), Config. From 7f730ffec67e3835a547c034474cb5430644b10e Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Sat, 12 Oct 2019 18:05:34 +0800 Subject: [PATCH 11/14] Revert client_id options in the testcases --- test/emqx_SUITE.erl | 2 +- test/emqx_client_SUITE.erl | 12 ++++++------ test/emqx_connection_SUITE.erl | 2 +- test/emqx_modules_SUITE.erl | 8 ++++---- test/emqx_msg_expiry_interval_SUITE.erl | 8 ++++---- test/emqx_request_responser_SUITE.erl | 4 ++-- test/emqx_shared_sub_SUITE.erl | 12 ++++++------ test/emqx_tracer_SUITE.erl | 2 +- 8 files changed, 25 insertions(+), 25 deletions(-) diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index 563683bf1..0a6ef4d0a 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -43,7 +43,7 @@ t_get_env(_) -> t_emqx_pubsub_api(_) -> emqx:start(), true = emqx:is_running(node()), - {ok, C} = emqtt:start_link([{host, "localhost"}, {client_id, "myclient"}]), + {ok, C} = emqtt:start_link([{host, "localhost"}, {clientid, "myclient"}]), {ok, _} = emqtt:connect(C), ClientId = <<"myclient">>, Topic = <<"mytopic">>, diff --git a/test/emqx_client_SUITE.erl b/test/emqx_client_SUITE.erl index 1e74982c8..8c3d55cec 100644 --- a/test/emqx_client_SUITE.erl +++ b/test/emqx_client_SUITE.erl @@ -95,7 +95,7 @@ t_cm(_) -> IdleTimeout = emqx_zone:get_env(external, idle_timeout, 30000), emqx_zone:set_env(external, idle_timeout, 1000), ClientId = <<"myclient">>, - {ok, C} = emqtt:start_link([{client_id, ClientId}]), + {ok, C} = emqtt:start_link([{clientid, ClientId}]), {ok, _} = emqtt:connect(C), ct:sleep(500), #{clientinfo := #{clientid := ClientId}} = emqx_cm:get_chan_attrs(ClientId), @@ -135,13 +135,13 @@ t_will_message(_Config) -> t_offline_message_queueing(_) -> {ok, C1} = emqtt:start_link([{clean_start, false}, - {client_id, <<"c1">>}]), + {clientid, <<"c1">>}]), {ok, _} = emqtt:connect(C1), {ok, _, [2]} = emqtt:subscribe(C1, nth(6, ?WILD_TOPICS), 2), ok = emqtt:disconnect(C1), {ok, C2} = emqtt:start_link([{clean_start, true}, - {client_id, <<"c2">>}]), + {clientid, <<"c2">>}]), {ok, _} = emqtt:connect(C2), ok = emqtt:publish(C2, nth(2, ?TOPICS), <<"qos 0">>, 0), @@ -149,7 +149,7 @@ t_offline_message_queueing(_) -> {ok, _} = emqtt:publish(C2, nth(4, ?TOPICS), <<"qos 2">>, 2), timer:sleep(10), emqtt:disconnect(C2), - {ok, C3} = emqtt:start_link([{clean_start, false}, {client_id, <<"c1">>}]), + {ok, C3} = emqtt:start_link([{clean_start, false}, {clientid, <<"c1">>}]), {ok, _} = emqtt:connect(C3), timer:sleep(10), @@ -197,7 +197,7 @@ t_overlapping_subscriptions(_) -> t_redelivery_on_reconnect(_) -> ct:pal("Redelivery on reconnect test starting"), - {ok, C1} = emqtt:start_link([{clean_start, false}, {client_id, <<"c">>}]), + {ok, C1} = emqtt:start_link([{clean_start, false}, {clientid, <<"c">>}]), {ok, _} = emqtt:connect(C1), {ok, _, [2]} = emqtt:subscribe(C1, nth(7, ?WILD_TOPICS), 2), @@ -210,7 +210,7 @@ t_redelivery_on_reconnect(_) -> timer:sleep(10), ok = emqtt:disconnect(C1), ?assertEqual(0, length(recv_msgs(2))), - {ok, C2} = emqtt:start_link([{clean_start, false}, {client_id, <<"c">>}]), + {ok, C2} = emqtt:start_link([{clean_start, false}, {clientid, <<"c">>}]), {ok, _} = emqtt:connect(C2), timer:sleep(10), diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index d321d2c85..39cc67dee 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -33,7 +33,7 @@ end_per_suite(_Config) -> t_basic(_) -> Topic = <<"TopicA">>, - {ok, C} = emqtt:start_link([{port, 1883}, {client_id, <<"hello">>}]), + {ok, C} = emqtt:start_link([{port, 1883}, {clientid, <<"hello">>}]), {ok, _} = emqtt:connect(C), {ok, _, [1]} = emqtt:subscribe(C, Topic, qos1), {ok, _, [2]} = emqtt:subscribe(C, Topic, qos2), diff --git a/test/emqx_modules_SUITE.erl b/test/emqx_modules_SUITE.erl index 12f304f54..437a21aec 100644 --- a/test/emqx_modules_SUITE.erl +++ b/test/emqx_modules_SUITE.erl @@ -55,11 +55,11 @@ end_per_suite(_Config) -> %% Test case for emqx_mod_presence t_mod_presence(_) -> ok = emqx_mod_presence:load([{qos, ?QOS_1}]), - {ok, C1} = emqtt:start_link([{client_id, <<"monsys">>}]), + {ok, C1} = emqtt:start_link([{clientid, <<"monsys">>}]), {ok, _} = emqtt:connect(C1), {ok, _Props, [?QOS_1]} = emqtt:subscribe(C1, <<"$SYS/brokers/+/clients/#">>, qos1), %% Connected Presence - {ok, C2} = emqtt:start_link([{client_id, <<"clientid">>}, + {ok, C2} = emqtt:start_link([{clientid, <<"clientid">>}, {username, <<"username">>}]), {ok, _} = emqtt:connect(C2), ok = recv_and_check_presence(<<"clientid">>, <<"connected">>), @@ -98,7 +98,7 @@ recv_and_check_presence(ClientId, Presence) -> t_mod_subscription(_) -> emqx_mod_subscription:load([{<<"connected/%c/%u">>, ?QOS_0}]), {ok, C} = emqtt:start_link([{host, "localhost"}, - {client_id, "myclient"}, + {clientid, "myclient"}, {username, "admin"}]), {ok, _} = emqtt:connect(C), emqtt:publish(C, <<"connected/myclient/admin">>, <<"Hello world">>, ?QOS_0), @@ -111,7 +111,7 @@ t_mod_subscription(_) -> %% Test case for emqx_mod_write t_mod_rewrite(_Config) -> ok = emqx_mod_rewrite:load(?RULES), - {ok, C} = emqtt:start_link([{client_id, <<"rewrite_client">>}]), + {ok, C} = emqtt:start_link([{clientid, <<"rewrite_client">>}]), {ok, _} = emqtt:connect(C), OrigTopics = [<<"x/y/2">>, <<"x/1/2">>, <<"y/a/z/b">>, <<"y/def">>], DestTopics = [<<"z/y/2">>, <<"x/1/2">>, <<"y/z/b">>, <<"y/def">>], diff --git a/test/emqx_msg_expiry_interval_SUITE.erl b/test/emqx_msg_expiry_interval_SUITE.erl index 6fe18ff5a..063cf19dc 100644 --- a/test/emqx_msg_expiry_interval_SUITE.erl +++ b/test/emqx_msg_expiry_interval_SUITE.erl @@ -42,8 +42,8 @@ t_message_expiry_interval_2(_) -> emqtt:stop(ClientA). message_expiry_interval_init() -> - {ok, ClientA} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, ClientB} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, ClientA} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, ClientB} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), {ok, _} = emqtt:connect(ClientA), {ok, _} = emqtt:connect(ClientB), %% subscribe and disconnect client-b @@ -58,7 +58,7 @@ message_expiry_interval_exipred(ClientA, QoS) -> ct:sleep(1500), %% resume the session for client-b - {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), {ok, _} = emqtt:connect(ClientB1), %% verify client-b could not receive the publish message @@ -78,7 +78,7 @@ message_expiry_interval_not_exipred(ClientA, QoS) -> %% wait for 1s and then resume the session for client-b, the message should not expires %% as Message-Expiry-Interval = 20s ct:sleep(1000), - {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), {ok, _} = emqtt:connect(ClientB1), %% verify client-b could receive the publish message and the Message-Expiry-Interval is set diff --git a/test/emqx_request_responser_SUITE.erl b/test/emqx_request_responser_SUITE.erl index d603f2b96..7d66c7760 100644 --- a/test/emqx_request_responser_SUITE.erl +++ b/test/emqx_request_responser_SUITE.erl @@ -42,7 +42,7 @@ request_response_per_qos(QoS) -> RspTopic = <<"response_topic">>, {ok, Requester} = emqx_request_sender:start_link(RspTopic, QoS, [{proto_ver, v5}, - {client_id, <<"requester">>}, + {clientid, <<"requester">>}, {properties, #{ 'Request-Response-Information' => 1}}]), %% This is a square service Square = fun(_CorrData, ReqBin) -> @@ -51,7 +51,7 @@ request_response_per_qos(QoS) -> end, {ok, Responser} = emqx_request_handler:start_link(ReqTopic, QoS, Square, [{proto_ver, v5}, - {client_id, <<"responser">>} + {clientid, <<"responser">>} ]), ok = emqx_request_sender:send(Requester, ReqTopic, RspTopic, <<"corr-1">>, <<"2">>, QoS), receive diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 14baf8a76..9fec11171 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -80,9 +80,9 @@ t_no_connection_nack(_) -> ShareTopic = <<"$share/", Group/binary, $/, Topic/binary>>, ExpProp = [{properties, #{'Session-Expiry-Interval' => timer:seconds(30)}}], - {ok, SubConnPid1} = emqtt:start_link([{client_id, Subscriber1}] ++ ExpProp), + {ok, SubConnPid1} = emqtt:start_link([{clientid, Subscriber1}] ++ ExpProp), {ok, _Props} = emqtt:connect(SubConnPid1), - {ok, SubConnPid2} = emqtt:start_link([{client_id, Subscriber2}] ++ ExpProp), + {ok, SubConnPid2} = emqtt:start_link([{clientid, Subscriber2}] ++ ExpProp), {ok, _Props} = emqtt:connect(SubConnPid2), emqtt:subscribe(SubConnPid1, ShareTopic, QoS), emqtt:subscribe(SubConnPid1, ShareTopic, QoS), @@ -151,9 +151,9 @@ t_not_so_sticky(_) -> ok = ensure_config(sticky), ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, - {ok, C1} = emqtt:start_link([{client_id, ClientId1}]), + {ok, C1} = emqtt:start_link([{clientid, ClientId1}]), {ok, _} = emqtt:connect(C1), - {ok, C2} = emqtt:start_link([{client_id, ClientId2}]), + {ok, C2} = emqtt:start_link([{clientid, ClientId2}]), {ok, _} = emqtt:connect(C2), emqtt:subscribe(C1, {<<"$share/group1/foo/bar">>, 0}), @@ -179,9 +179,9 @@ test_two_messages(Strategy, WithAck) -> Topic = <<"foo/bar">>, ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, - {ok, ConnPid1} = emqtt:start_link([{client_id, ClientId1}]), + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]), {ok, _} = emqtt:connect(ConnPid1), - {ok, ConnPid2} = emqtt:start_link([{client_id, ClientId2}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]), {ok, _} = emqtt:connect(ConnPid2), Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>), diff --git a/test/emqx_tracer_SUITE.erl b/test/emqx_tracer_SUITE.erl index 2eee6a1e1..ece0f5799 100644 --- a/test/emqx_tracer_SUITE.erl +++ b/test/emqx_tracer_SUITE.erl @@ -35,7 +35,7 @@ end_per_suite(_Config) -> t_start_traces(_Config) -> {ok, T} = emqtt:start_link([{host, "localhost"}, - {client_id, <<"client">>}, + {clientid, <<"client">>}, {username, <<"testuser">>}, {password, <<"pass">>} ]), From 6233aa7d46fb75dec8be055eaddc9f994e2a4643 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 12 Oct 2019 19:14:33 +0800 Subject: [PATCH 12/14] Ensure the 'conn_state' be 'disconnected' after socket closed --- src/emqx_channel.erl | 13 +++++++------ src/emqx_connection.erl | 2 +- test/emqx_ctl_SUITE.erl | 1 + 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index e66817e39..2a92ba755 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -737,9 +737,9 @@ handle_info({register, Attrs, Stats}, #channel{clientinfo = #{clientid := Client handle_info({sock_closed, _Reason}, Channel = #channel{conn_state = disconnected}) -> {ok, Channel}; -handle_info({sock_closed, _Reason}, Channel = #channel{conninfo = ConnInfo, - clientinfo = ClientInfo = #{zone := Zone}, - will_msg = WillMsg}) -> +handle_info({sock_closed, Reason}, Channel = #channel{conninfo = ConnInfo, + clientinfo = ClientInfo = #{zone := Zone}, + will_msg = WillMsg}) -> emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo), ConnInfo1 = ConnInfo#{disconnected_at => erlang:system_time(second)}, Channel1 = Channel#channel{conninfo = ConnInfo1, conn_state = disconnected}, @@ -750,11 +750,11 @@ handle_info({sock_closed, _Reason}, Channel = #channel{conninfo = ConnInfo, end, case maps:get(expiry_interval, ConnInfo) of ?UINT_MAX -> - {ok, Channel2}; + {ok, {enter, disconnected}, Channel2}; Int when Int > 0 -> - {ok, ensure_timer(expire_timer, Channel2)}; + {ok, {enter, disconnected}, ensure_timer(expire_timer, Channel2)}; _Other -> - shutdown(closed, Channel2) + shutdown(Reason, Channel2) end; handle_info(Info, Channel) -> @@ -1206,3 +1206,4 @@ shutdown(Reason, Channel) -> shutdown(Reason, Packets, Channel) -> {stop, {shutdown, Reason}, Packets, Channel}. + diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 3fa0db145..02d521814 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -554,7 +554,7 @@ handle_info(sock_closed, State) -> handle_info({close, Reason}, State) -> ?LOG(debug, "Force close due to : ~p", [Reason]), - {ok, close_socket(State)}; + handle_info({sock_closed, Reason}, close_socket(State)); handle_info(Info, State = #state{channel = Channel}) -> handle_return(emqx_channel:handle_info(Info, Channel), State). diff --git a/test/emqx_ctl_SUITE.erl b/test/emqx_ctl_SUITE.erl index eaec942a5..bdc31a321 100644 --- a/test/emqx_ctl_SUITE.erl +++ b/test/emqx_ctl_SUITE.erl @@ -25,6 +25,7 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> + ok = emqx_logger:set_log_level(emergency), Config. end_per_suite(_Config) -> From a1877f3f42be8020b56bf67bcac7fd3c697c67d8 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 14 Oct 2019 12:38:57 +0800 Subject: [PATCH 13/14] Return the '{enter, connected}' event first --- src/emqx_channel.erl | 6 ++---- test/emqx_channel_SUITE.erl | 4 ++-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 2a92ba755..1160c460d 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -572,15 +572,13 @@ handle_out({connack, ?RC_SUCCESS, SP, ConnPkt}, AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps), case maybe_resume_session(Channel2) of ignore -> - Output = [{outgoing, AckPacket}, {enter, connected}], - {ok, Output, Channel2}; + {ok, [{enter, connected}, {outgoing, AckPacket}], Channel2}; {ok, Publishes, NSession} -> Channel3 = Channel2#channel{session = NSession, resuming = false, pendings = []}, {ok, {outgoing, Packets}, _} = handle_out({publish, Publishes}, Channel3), - Output = [{outgoing, [AckPacket|Packets]}, {enter, connected}], - {ok, Output, Channel3} + {ok, [{enter, connected}, {outgoing, [AckPacket|Packets]}], Channel3} end; handle_out({connack, ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo, diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 8e169d597..6e73d0703 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -72,7 +72,7 @@ t_handle_connect(_) -> with_channel( fun(Channel) -> ConnAck = ?CONNACK_PACKET(?RC_SUCCESS, 0, #{}), - ExpectedOutput = [{outgoing, ConnAck},{enter, connected}], + ExpectedOutput = [{enter, connected},{outgoing, ConnAck}], {ok, Output, Channel1} = handle_in(?CONNECT_PACKET(ConnPkt), Channel), ?assertEqual(ExpectedOutput, Output), #{clientid := ClientId, username := Username} = emqx_channel:info(clientinfo, Channel1), @@ -207,7 +207,7 @@ t_handle_out_connack(_) -> }, with_channel( fun(Channel) -> - {ok, [{outgoing, ?CONNACK_PACKET(?RC_SUCCESS, SP, _)}, {enter, connected}], _Chan} + {ok, [{enter, connected},{outgoing, ?CONNACK_PACKET(?RC_SUCCESS, SP, _)}], _Chan} = handle_out({connack, ?RC_SUCCESS, 0, ConnPkt}, Channel), {stop, {shutdown, not_authorized}, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _} = handle_out({connack, ?RC_NOT_AUTHORIZED, ConnPkt}, Channel) From 53dda488338412b9a9f068ea7915ebef9d4146ae Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 14 Oct 2019 17:01:41 +0800 Subject: [PATCH 14/14] Fix the 'function_clause' error when session is undefined --- src/emqx_channel.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 1160c460d..5f4821897 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -153,6 +153,8 @@ attrs(session, #channel{session = Session}) -> attrs(Key, Channel) -> info(Key, Channel). -spec(stats(channel()) -> emqx_types:stats()). +stats(#channel{pub_stats = PubStats, session = undefined}) -> + maps:to_list(PubStats); stats(#channel{pub_stats = PubStats, session = Session}) -> maps:to_list(PubStats) ++ emqx_session:stats(Session).