Improve the design of 'emqx_client' module

Use '{next_event, cast, Packet}' to replace 'gen_statem:cast/2'
This commit is contained in:
Feng Lee 2019-01-09 15:52:19 +08:00 committed by Feng Lee
parent 30f32de13a
commit 3e15ac0bbb
1 changed files with 19 additions and 14 deletions

View File

@ -39,10 +39,10 @@
-export([initialized/3, waiting_for_connack/3, connected/3]). -export([initialized/3, waiting_for_connack/3, connected/3]).
-export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]). -export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]).
-export_type([client/0, properties/0, payload/0, -export_type([client/0, properties/0, payload/0, pubopt/0, subopt/0,
pubopt/0, subopt/0, request_input/0, request_input/0, response_payload/0, request_handler/0,
response_payload/0, request_handler/0,
corr_data/0]). corr_data/0]).
-export_type([host/0, option/0]). -export_type([host/0, option/0]).
%% Default timeout %% Default timeout
@ -697,7 +697,8 @@ waiting_for_connack(cast, ?CONNACK_PACKET(?RC_SUCCESS,
waiting_for_connack(cast, ?CONNACK_PACKET(ReasonCode, waiting_for_connack(cast, ?CONNACK_PACKET(ReasonCode,
_SessPresent, _SessPresent,
Properties), State = #state{ proto_ver = ProtoVer}) -> Properties),
State = #state{proto_ver = ProtoVer}) ->
Reason = emqx_reason_codes:name(ReasonCode, ProtoVer), Reason = emqx_reason_codes:name(ReasonCode, ProtoVer),
case take_call(connect, State) of case take_call(connect, State) of
{value, #call{from = From}, _State} -> {value, #call{from = From}, _State} ->
@ -999,7 +1000,7 @@ should_ping(Sock) ->
handle_event(info, {TcpOrSsL, _Sock, Data}, _StateName, State) handle_event(info, {TcpOrSsL, _Sock, Data}, _StateName, State)
when TcpOrSsL =:= tcp; TcpOrSsL =:= ssl -> when TcpOrSsL =:= tcp; TcpOrSsL =:= ssl ->
emqx_logger:debug("RECV Data: ~p", [Data]), emqx_logger:debug("RECV Data: ~p", [Data]),
receive_loop(Data, run_sock(State)); process_incoming(Data, [], run_sock(State));
handle_event(info, {Error, _Sock, Reason}, _StateName, State) handle_event(info, {Error, _Sock, Reason}, _StateName, State)
when Error =:= tcp_error; Error =:= ssl_error -> when Error =:= tcp_error; Error =:= ssl_error ->
@ -1323,21 +1324,25 @@ run_sock(State = #state{socket = Sock}) ->
emqx_client_sock:setopts(Sock, [{active, once}]), State. emqx_client_sock:setopts(Sock, [{active, once}]), State.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Receive Loop %% Process incomming
receive_loop(<<>>, State) -> process_incoming(<<>>, [Packet], State) ->
{keep_state, State}; {keep_state, State, {next_event, cast, Packet}};
receive_loop(Bytes, State = #state{parse_state = ParseState}) -> process_incoming(<<>>, Packets, State) ->
case catch emqx_frame:parse(Bytes, ParseState) of NextEvts = [{next_event, cast, Packet} || Packet <- lists:reverse(Packets)],
{keep_state, State, NextEvts};
process_incoming(Bytes, Packets, State = #state{parse_state = ParseState}) ->
try emqx_frame:parse(Bytes, ParseState) of
{ok, Packet, Rest} -> {ok, Packet, Rest} ->
ok = gen_statem:cast(self(), Packet), process_incoming(Rest, [Packet|Packets], init_parse_state(State));
receive_loop(Rest, init_parse_state(State));
{more, NewParseState} -> {more, NewParseState} ->
{keep_state, State#state{parse_state = NewParseState}}; {keep_state, State#state{parse_state = NewParseState}};
{error, Reason} -> {error, Reason} ->
{stop, Reason}; {stop, Reason}
{'EXIT', Error} -> catch
error:Error ->
{stop, Error} {stop, Error}
end. end.