diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index c347d73f7..93efa938b 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc MQTT/TCP Connection +%% @doc MQTT/TCP Connection. -module(emqttd_client). @@ -57,7 +57,7 @@ rate_limit, packet_size, parser, proto_state, keepalive, enable_stats}). --define(INFO_KEYS, [peername, conn_state, await_recv]). +-define(INFO_KEYS, [peername, conn_state, await_recv, enable_stats]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). @@ -125,7 +125,7 @@ do_init(Conn, Env, Peername) -> enable_stats = EnableStats}), IdleTimout = get_value(client_idle_timeout, Env, 30000), gen_server2:enter_loop(?MODULE, [], State, self(), IdleTimout, - {backoff, 1000, 1000, 5000}). + {backoff, 1000, 1000, 10000}). send_fun(Conn, Peername) -> Self = self(), diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 7ee9a7a6a..4b3c3ca76 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -35,15 +35,15 @@ -export([process/2]). --record(proto_stats, {recv_pkt = 0, recv_msg = 0, send_pkt = 0, send_msg = 0}). +-record(proto_stats, {enable_stats = false, recv_pkt = 0, recv_msg = 0, + send_pkt = 0, send_msg = 0}). %% Protocol State --record(proto_state, {peername, sendfun, connected = false, - client_id, client_pid, clean_sess, - proto_ver, proto_name, username, is_superuser = false, - will_msg, keepalive, max_clientid_len = ?MAX_CLIENTID_LEN, - session, stats, ws_initial_headers, %% Headers from first HTTP request for websocket client - connected_at}). +%% ws_initial_headers: Headers from first HTTP request for WebSocket Client. +-record(proto_state, {peername, sendfun, connected = false, client_id, client_pid, + clean_sess, proto_ver, proto_name, username, is_superuser, + will_msg, keepalive, max_clientid_len, session, stats_data, + ws_initial_headers, connected_at}). -type(proto_state() :: #proto_state{}). @@ -58,20 +58,22 @@ %% @doc Init protocol init(Peername, SendFun, Opts) -> + EnableStats = get_value(client_enable_stats, Opts, false), MaxLen = get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN), WsInitialHeaders = get_value(ws_initial_headers, Opts), #proto_state{peername = Peername, sendfun = SendFun, - max_clientid_len = MaxLen, client_pid = self(), - stats = #proto_stats{}, - ws_initial_headers = WsInitialHeaders}. + max_clientid_len = MaxLen, + is_superuser = false, + ws_initial_headers = WsInitialHeaders, + stats_data = #proto_stats{enable_stats = EnableStats}}. info(ProtoState) -> ?record_to_proplist(proto_state, ProtoState, ?INFO_KEYS). -stats(#proto_state{stats = Stats}) -> - ?record_to_proplist(proto_stats, Stats). +stats(#proto_state{stats_data = Stats}) -> + tl(?record_to_proplist(proto_stats, Stats)). clientid(#proto_state{client_id = ClientId}) -> ClientId. @@ -109,9 +111,9 @@ session(#proto_state{session = Session}) -> %% A Client can only send the CONNECT Packet once over a Network Connection. -spec(received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}). received(Packet = ?PACKET(?CONNECT), - State = #proto_state{connected = false, stats = Stats}) -> + State = #proto_state{connected = false, stats_data = Stats}) -> trace(recv, Packet, State), Stats1 = inc_stats(recv, ?CONNECT, Stats), - process(Packet, State#proto_state{connected = true, stats = Stats1}); + process(Packet, State#proto_state{connected = true, stats_data = Stats1}); received(?PACKET(?CONNECT), State = #proto_state{connected = true}) -> {error, protocol_bad_connect, State}; @@ -120,11 +122,11 @@ received(?PACKET(?CONNECT), State = #proto_state{connected = true}) -> received(_Packet, State = #proto_state{connected = false}) -> {error, protocol_not_connected, State}; -received(Packet = ?PACKET(Type), State = #proto_state{stats = Stats}) -> +received(Packet = ?PACKET(Type), State = #proto_state{stats_data = Stats}) -> trace(recv, Packet, State), Stats1 = inc_stats(recv, Type, Stats), case validate_packet(Packet) of ok -> - process(Packet, State#proto_state{stats = Stats1}); + process(Packet, State#proto_state{stats_data = Stats1}); {error, Reason} -> {error, Reason, State} end. @@ -315,12 +317,12 @@ send(Msg, State = #proto_state{client_id = ClientId, username = Username}) send(emqttd_message:to_packet(Msg), State); send(Packet = ?PACKET(Type), - State = #proto_state{sendfun = SendFun, stats = Stats}) -> + State = #proto_state{sendfun = SendFun, stats_data = Stats}) -> trace(send, Packet, State), emqttd_metrics:sent(Packet), SendFun(Packet), Stats1 = inc_stats(send, Type, Stats), - {ok, State#proto_state{stats = Stats1}}. + {ok, State#proto_state{stats_data = Stats1}}. trace(recv, Packet, ProtoState) -> ?LOG(info, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState); @@ -328,6 +330,9 @@ trace(recv, Packet, ProtoState) -> trace(send, Packet, ProtoState) -> ?LOG(info, "SEND ~s", [emqttd_packet:format(Packet)], ProtoState). +inc_stats(_Direct, _Type, Stats = #proto_stats{enable_stats = false}) -> + Stats; + inc_stats(recv, Type, Stats) -> #proto_stats{recv_pkt = Pkt, recv_msg = Msg} = Stats, inc_stats(Type, #proto_stats.recv_pkt, Pkt, #proto_stats.recv_msg, Msg, Stats); diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index 71cb4b344..c6ea5c1ad 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -14,6 +14,8 @@ %% limitations under the License. %%-------------------------------------------------------------------- +%% @doc MQTT WebSocket Connection. + -module(emqttd_ws_client). -behaviour(gen_server2). @@ -24,6 +26,8 @@ -include("emqttd_protocol.hrl"). +-import(proplists, [get_value/3]). + %% API Exports -export([start_link/4]). @@ -44,13 +48,14 @@ -export([prioritise_call/4, prioritise_info/3, handle_pre_hibernate/1]). %% WebSocket Client State --record(wsclient_state, {ws_pid, peer, connection, proto_state, keepalive, +-record(wsclient_state, {ws_pid, peername, connection, proto_state, keepalive, enable_stats}). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). --define(WSLOG(Level, Peer, Format, Args), - lager:Level("WsClient(~s): " ++ Format, [Peer | Args])). +-define(WSLOG(Level, Format, Args, State), + lager:Level("WsClient(~s): " ++ Format, + [esockd_net:format(State#wsclient_state.peername) | Args])). %% @doc Start WebSocket Client. start_link(Env, WsPid, Req, ReplyChannel) -> @@ -84,22 +89,16 @@ init([Env, WsPid, Req, ReplyChannel]) -> {ok, Peername} = Req:get(peername), Headers = mochiweb_headers:to_list( mochiweb_request:get(headers, Req)), - %% SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end, - SendFun = fun(Packet) -> - Data = emqttd_serializer:serialize(Packet), - emqttd_metrics:inc('bytes/sent', iolist_size(Data)), - ReplyChannel({binary, Data}) - end, - EnableStats = proplists:get_value(client_enable_stats, Env, false), - ProtoState = emqttd_protocol:init(Peername, SendFun, + ProtoState = emqttd_protocol:init(Peername, send_fun(ReplyChannel), [{ws_initial_headers, Headers} | Env]), - IdleTimeout = proplists:get_value(client_idle_timeout, Env, 30000), + IdleTimeout = get_value(client_idle_timeout, Env, 30000), + EnableStats = get_value(client_enable_stats, Env, false), {ok, #wsclient_state{ws_pid = WsPid, - peer = Req:get(peer), + peername = Peername, connection = Req:get(connection), proto_state = ProtoState, enable_stats = EnableStats}, - IdleTimeout, {backoff, 1000, 1000, 5000}, ?MODULE}. + IdleTimeout, {backoff, 1000, 1000, 10000}, ?MODULE}. prioritise_call(Msg, _From, _Len, _State) -> case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end. @@ -107,12 +106,12 @@ prioritise_call(Msg, _From, _Len, _State) -> prioritise_info(Msg, _Len, _State) -> case Msg of {redeliver, _} -> 5; _ -> 0 end. -handle_pre_hibernate(State = #wsclient_state{peer = Peer}) -> - io:format("WsClient(~s) will hibernate!~n", [Peer]), +handle_pre_hibernate(State) -> {hibernate, emit_stats(State)}. -handle_call(info, From, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) -> - Info = [{websocket, true}, {peer, Peer} | emqttd_protocol:info(ProtoState)], +handle_call(info, From, State = #wsclient_state{peername = Peername, + proto_state = ProtoState}) -> + Info = [{websocket, true}, {peername, Peername} | emqttd_protocol:info(ProtoState)], {reply, Stats, _, _} = handle_call(stats, From, State), reply(lists:append(Info, Stats), State); @@ -127,17 +126,17 @@ handle_call(kick, _From, State) -> handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) -> reply(emqttd_protocol:session(ProtoState), State); -handle_call(Req, _From, State = #wsclient_state{peer = Peer}) -> - ?WSLOG(error, Peer, "Unexpected request: ~p", [Req]), - reply({error, unsupported_request}, State). +handle_call(Req, _From, State) -> + ?WSLOG(error, "Unexpected request: ~p", [Req], State), + reply({error, unexpected_request}, State). -handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) -> +handle_cast({received, Packet}, State = #wsclient_state{proto_state = ProtoState}) -> emqttd_metrics:received(Packet), case emqttd_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> {noreply, State#wsclient_state{proto_state = ProtoState1}, hibernate}; {error, Error} -> - ?WSLOG(error, Peer, "Protocol error - ~p", [Error]), + ?WSLOG(error, "Protocol error - ~p", [Error], State), shutdown(Error, State); {error, Error, ProtoState1} -> shutdown(Error, State#wsclient_state{proto_state = ProtoState1}); @@ -145,8 +144,8 @@ handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state stop(Reason, State#wsclient_state{proto_state = ProtoState1}) end; -handle_cast(Msg, State = #wsclient_state{peer = Peer}) -> - ?WSLOG(error, Peer, "Unexpected msg: ~p", [Msg]), +handle_cast(Msg, State) -> + ?WSLOG(error, "Unexpected Msg: ~p", [Msg], State), {noreply, State, hibernate}. handle_info({subscribe, TopicTable}, State) -> @@ -186,43 +185,36 @@ handle_info(emit_stats, State) -> handle_info(timeout, State) -> shutdown(idle_timeout, State); -handle_info({shutdown, conflict, {ClientId, NewPid}}, State = #wsclient_state{peer = Peer}) -> - ?WSLOG(warning, Peer, "clientid '~s' conflict with ~p", [ClientId, NewPid]), +handle_info({shutdown, conflict, {ClientId, NewPid}}, State) -> + ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State), shutdown(conflict, State); -handle_info({keepalive, start, Interval}, State = #wsclient_state{peer = Peer, connection = Conn}) -> - ?WSLOG(debug, Peer, "Keepalive at the interval of ~p", [Interval]), - StatFun = fun() -> - case Conn:getstat([recv_oct]) of - {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; - {error, Error} -> {error, Error} - end - end, - KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}), +handle_info({keepalive, start, Interval}, State = #wsclient_state{connection = Conn}) -> + ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State), + KeepAlive = emqttd_keepalive:start(stat_fun(Conn), Interval, {keepalive, check}), {noreply, State#wsclient_state{keepalive = KeepAlive}, hibernate}; -handle_info({keepalive, check}, State = #wsclient_state{peer = Peer, - keepalive = KeepAlive}) -> +handle_info({keepalive, check}, State = #wsclient_state{keepalive = KeepAlive}) -> case emqttd_keepalive:check(KeepAlive) of {ok, KeepAlive1} -> {noreply, emit_stats(State#wsclient_state{keepalive = KeepAlive1}), hibernate}; {error, timeout} -> - ?WSLOG(debug, Peer, "Keepalive Timeout!", []), + ?WSLOG(debug, "Keepalive Timeout!", [], State), shutdown(keepalive_timeout, State); {error, Error} -> - ?WSLOG(warning, Peer, "Keepalive error - ~p", [Error]), + ?WSLOG(warning, "Keepalive error - ~p", [Error], State), shutdown(keepalive_error, State) end; handle_info({'EXIT', WsPid, normal}, State = #wsclient_state{ws_pid = WsPid}) -> stop(normal, State); -handle_info({'EXIT', WsPid, Reason}, State = #wsclient_state{peer = Peer, ws_pid = WsPid}) -> - ?WSLOG(error, Peer, "shutdown: ~p",[Reason]), +handle_info({'EXIT', WsPid, Reason}, State = #wsclient_state{ws_pid = WsPid}) -> + ?WSLOG(error, "shutdown: ~p",[Reason], State), shutdown(Reason, State); -handle_info(Info, State = #wsclient_state{peer = Peer}) -> - ?WSLOG(error, Peer, "Unexpected Info: ~p", [Info]), +handle_info(Info, State) -> + ?WSLOG(error, "Unexpected Info: ~p", [Info], State), {noreply, State, hibernate}. terminate(Reason, #wsclient_state{proto_state = ProtoState, keepalive = KeepAlive}) -> @@ -241,6 +233,21 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- +send_fun(ReplyChannel) -> + fun(Packet) -> + Data = emqttd_serializer:serialize(Packet), + emqttd_metrics:inc('bytes/sent', iolist_size(Data)), + ReplyChannel({binary, Data}) + end. + +stat_fun(Conn) -> + fun() -> + case Conn:getstat([recv_oct]) of + {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; + {error, Error} -> {error, Error} + end + end. + emit_stats(State = #wsclient_state{proto_state = ProtoState}) -> emit_stats(emqttd_protocol:clientid(ProtoState), State).