From 3e15ac0bbbe5f8f6ab62a4facba85b5bf76834cd Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 9 Jan 2019 15:52:19 +0800 Subject: [PATCH] Improve the design of 'emqx_client' module Use '{next_event, cast, Packet}' to replace 'gen_statem:cast/2' --- src/emqx_client.erl | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 015bcc51b..4c19bbcc1 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -39,10 +39,10 @@ -export([initialized/3, waiting_for_connack/3, connected/3]). -export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]). --export_type([client/0, properties/0, payload/0, - pubopt/0, subopt/0, request_input/0, - response_payload/0, request_handler/0, +-export_type([client/0, properties/0, payload/0, pubopt/0, subopt/0, + request_input/0, response_payload/0, request_handler/0, corr_data/0]). + -export_type([host/0, option/0]). %% Default timeout @@ -697,7 +697,8 @@ waiting_for_connack(cast, ?CONNACK_PACKET(?RC_SUCCESS, waiting_for_connack(cast, ?CONNACK_PACKET(ReasonCode, _SessPresent, - Properties), State = #state{ proto_ver = ProtoVer}) -> + Properties), + State = #state{proto_ver = ProtoVer}) -> Reason = emqx_reason_codes:name(ReasonCode, ProtoVer), case take_call(connect, State) of {value, #call{from = From}, _State} -> @@ -999,7 +1000,7 @@ should_ping(Sock) -> handle_event(info, {TcpOrSsL, _Sock, Data}, _StateName, State) when TcpOrSsL =:= tcp; TcpOrSsL =:= ssl -> emqx_logger:debug("RECV Data: ~p", [Data]), - receive_loop(Data, run_sock(State)); + process_incoming(Data, [], run_sock(State)); handle_event(info, {Error, _Sock, Reason}, _StateName, State) when Error =:= tcp_error; Error =:= ssl_error -> @@ -1323,21 +1324,25 @@ run_sock(State = #state{socket = Sock}) -> emqx_client_sock:setopts(Sock, [{active, once}]), State. %%------------------------------------------------------------------------------ -%% Receive Loop +%% Process incomming -receive_loop(<<>>, State) -> - {keep_state, State}; +process_incoming(<<>>, [Packet], State) -> + {keep_state, State, {next_event, cast, Packet}}; -receive_loop(Bytes, State = #state{parse_state = ParseState}) -> - case catch emqx_frame:parse(Bytes, ParseState) of +process_incoming(<<>>, Packets, State) -> + NextEvts = [{next_event, cast, Packet} || Packet <- lists:reverse(Packets)], + {keep_state, State, NextEvts}; + +process_incoming(Bytes, Packets, State = #state{parse_state = ParseState}) -> + try emqx_frame:parse(Bytes, ParseState) of {ok, Packet, Rest} -> - ok = gen_statem:cast(self(), Packet), - receive_loop(Rest, init_parse_state(State)); + process_incoming(Rest, [Packet|Packets], init_parse_state(State)); {more, NewParseState} -> {keep_state, State#state{parse_state = NewParseState}}; {error, Reason} -> - {stop, Reason}; - {'EXIT', Error} -> + {stop, Reason} + catch + error:Error -> {stop, Error} end.