diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index e58f55738..c347d73f7 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -52,16 +52,18 @@ -export([prioritise_call/4, prioritise_info/3, handle_pre_hibernate/1]). %% Client State --record(client_state, {connection, connname, peername, peerhost, peerport, await_recv, - conn_state, rate_limit, packet_limit, parse_state, proto_state, +%% Unused fields: connname, peerhost, peerport +-record(client_state, {connection, peername, conn_state, await_recv, + rate_limit, packet_size, parser, proto_state, keepalive, enable_stats}). --define(INFO_KEYS, [connname, peername, peerhost, peerport, await_recv, conn_state]). +-define(INFO_KEYS, [peername, conn_state, await_recv]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). -define(LOG(Level, Format, Args, State), - lager:Level("Client(~s): " ++ Format, [State#client_state.connname | Args])). + lager:Level("Client(~s): " ++ Format, + [esockd_net:format(State#client_state.peername) | Args])). start_link(Conn, Env) -> {ok, proc_lib:spawn_link(?MODULE, init, [[Conn, Env]])}. @@ -96,50 +98,47 @@ session(CPid) -> init([Conn0, Env]) -> {ok, Conn} = Conn0:wait(), - {PeerHost, PeerPort, PeerName} = case Conn:peername() of - {ok, Peer = {Host, Port}} -> - {Host, Port, Peer}; - {error, enotconn} -> - Conn:fast_close(), - exit(normal); - {error, Reason} -> - Conn:fast_close(), - exit({shutdown, Reason}) - end, - ConnName = esockd_net:format(PeerName), + {ok, Peername} -> do_init(Conn, Env, Peername); + {error, enotconn} -> Conn:fast_close(), + exit(normal); + {error, Reason} -> Conn:fast_close(), + exit({shutdown, Reason}) + end. + +do_init(Conn, Env, Peername) -> + %% Send Fun + SendFun = send_fun(Conn, Peername), + RateLimit = get_value(rate_limit, Conn:opts()), + PacketSize = get_value(max_packet_size, Env, ?MAX_PACKET_SIZE), + Parser = emqttd_parser:initial_state(PacketSize), + ProtoState = emqttd_protocol:init(Peername, SendFun, Env), + EnableStats = get_value(client_enable_stats, Env, false), + State = run_socket(#client_state{connection = Conn, + peername = Peername, + await_recv = false, + conn_state = running, + rate_limit = RateLimit, + packet_size = PacketSize, + parser = Parser, + proto_state = ProtoState, + enable_stats = EnableStats}), + IdleTimout = get_value(client_idle_timeout, Env, 30000), + gen_server2:enter_loop(?MODULE, [], State, self(), IdleTimout, + {backoff, 1000, 1000, 5000}). + +send_fun(Conn, Peername) -> Self = self(), - %% Send Packet... - SendFun = fun(Packet) -> + fun(Packet) -> Data = emqttd_serializer:serialize(Packet), - ?LOG(debug, "SEND ~p", [Data], #client_state{connname = ConnName}), + ?LOG(debug, "SEND ~p", [Data], #client_state{peername = Peername}), emqttd_metrics:inc('bytes/sent', iolist_size(Data)), try Conn:async_send(Data) of true -> ok catch error:Error -> Self ! {shutdown, Error} end - end, - RateLimit = get_value(rate_limit, Conn:opts()), - PacketLimit = proplists:get_value(max_packet_size, Env, ?MAX_PACKET_LEN), - ParseState = emqttd_parser:initial_state(PacketLimit), - ProtoState = emqttd_protocol:init(PeerName, SendFun, Env), - EnableStats = get_value(client_enable_stats, Env, false), - State = run_socket(#client_state{connection = Conn, - connname = ConnName, - peername = PeerName, - peerhost = PeerHost, - peerport = PeerPort, - await_recv = false, - conn_state = running, - rate_limit = RateLimit, - packet_limit = PacketLimit, - parse_state = ParseState, - proto_state = ProtoState, - enable_stats = EnableStats}), - IdleTimout = get_value(client_idle_timeout, Env, 30000), - gen_server2:enter_loop(?MODULE, [], State, self(), IdleTimout, - {backoff, 1000, 1000, 5000}). + end. prioritise_call(Msg, _From, _Len, _State) -> case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end. @@ -147,8 +146,8 @@ prioritise_call(Msg, _From, _Len, _State) -> prioritise_info(Msg, _Len, _State) -> case Msg of {redeliver, _} -> 5; _ -> 0 end. -handle_pre_hibernate(State = #client_state{connname = Connname}) -> - io:format("Client(~s) will hibernate!~n", [Connname]), +handle_pre_hibernate(State = #client_state{peername = Peername}) -> + io:format("Client(~s) will hibernate!~n", [esockd_net:format(Peername)]), {hibernate, emit_stats(State)}. handle_call(info, From, State = #client_state{proto_state = ProtoState}) -> @@ -295,17 +294,17 @@ code_change(_OldVsn, State, _Extra) -> received(<<>>, State) -> {noreply, State, hibernate}; -received(Bytes, State = #client_state{parse_state = ParseState, - packet_limit = PacketLimit, +received(Bytes, State = #client_state{parser = Parser, + packet_size = PacketSize, proto_state = ProtoState}) -> - case catch emqttd_parser:parse(Bytes, ParseState) of - {more, NewParseState} -> - {noreply, run_socket(State#client_state{parse_state = NewParseState}), hibernate}; + case catch emqttd_parser:parse(Bytes, Parser) of + {more, NewParser} -> + {noreply, run_socket(State#client_state{parser = NewParser}), hibernate}; {ok, Packet, Rest} -> emqttd_metrics:received(Packet), case emqttd_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> - received(Rest, State#client_state{parse_state = emqttd_parser:initial_state(PacketLimit), + received(Rest, State#client_state{parser = emqttd_parser:initial_state(PacketSize), proto_state = ProtoState1}); {error, Error} -> ?LOG(error, "Protocol error - ~p", [Error], State),