Improve the 'enable_stats' design of client, session

This commit is contained in:
Feng 2017-02-22 12:10:52 +08:00
parent 700ec7aaef
commit f4c4e5635c
3 changed files with 77 additions and 65 deletions

View File

@ -14,7 +14,7 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc MQTT/TCP Connection %% @doc MQTT/TCP Connection.
-module(emqttd_client). -module(emqttd_client).
@ -57,7 +57,7 @@
rate_limit, packet_size, parser, proto_state, rate_limit, packet_size, parser, proto_state,
keepalive, enable_stats}). keepalive, enable_stats}).
-define(INFO_KEYS, [peername, conn_state, await_recv]). -define(INFO_KEYS, [peername, conn_state, await_recv, enable_stats]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
@ -125,7 +125,7 @@ do_init(Conn, Env, Peername) ->
enable_stats = EnableStats}), enable_stats = EnableStats}),
IdleTimout = get_value(client_idle_timeout, Env, 30000), IdleTimout = get_value(client_idle_timeout, Env, 30000),
gen_server2:enter_loop(?MODULE, [], State, self(), IdleTimout, gen_server2:enter_loop(?MODULE, [], State, self(), IdleTimout,
{backoff, 1000, 1000, 5000}). {backoff, 1000, 1000, 10000}).
send_fun(Conn, Peername) -> send_fun(Conn, Peername) ->
Self = self(), Self = self(),

View File

@ -35,15 +35,15 @@
-export([process/2]). -export([process/2]).
-record(proto_stats, {recv_pkt = 0, recv_msg = 0, send_pkt = 0, send_msg = 0}). -record(proto_stats, {enable_stats = false, recv_pkt = 0, recv_msg = 0,
send_pkt = 0, send_msg = 0}).
%% Protocol State %% Protocol State
-record(proto_state, {peername, sendfun, connected = false, %% ws_initial_headers: Headers from first HTTP request for WebSocket Client.
client_id, client_pid, clean_sess, -record(proto_state, {peername, sendfun, connected = false, client_id, client_pid,
proto_ver, proto_name, username, is_superuser = false, clean_sess, proto_ver, proto_name, username, is_superuser,
will_msg, keepalive, max_clientid_len = ?MAX_CLIENTID_LEN, will_msg, keepalive, max_clientid_len, session, stats_data,
session, stats, ws_initial_headers, %% Headers from first HTTP request for websocket client ws_initial_headers, connected_at}).
connected_at}).
-type(proto_state() :: #proto_state{}). -type(proto_state() :: #proto_state{}).
@ -58,20 +58,22 @@
%% @doc Init protocol %% @doc Init protocol
init(Peername, SendFun, Opts) -> init(Peername, SendFun, Opts) ->
EnableStats = get_value(client_enable_stats, Opts, false),
MaxLen = get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN), MaxLen = get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
WsInitialHeaders = get_value(ws_initial_headers, Opts), WsInitialHeaders = get_value(ws_initial_headers, Opts),
#proto_state{peername = Peername, #proto_state{peername = Peername,
sendfun = SendFun, sendfun = SendFun,
max_clientid_len = MaxLen,
client_pid = self(), client_pid = self(),
stats = #proto_stats{}, max_clientid_len = MaxLen,
ws_initial_headers = WsInitialHeaders}. is_superuser = false,
ws_initial_headers = WsInitialHeaders,
stats_data = #proto_stats{enable_stats = EnableStats}}.
info(ProtoState) -> info(ProtoState) ->
?record_to_proplist(proto_state, ProtoState, ?INFO_KEYS). ?record_to_proplist(proto_state, ProtoState, ?INFO_KEYS).
stats(#proto_state{stats = Stats}) -> stats(#proto_state{stats_data = Stats}) ->
?record_to_proplist(proto_stats, Stats). tl(?record_to_proplist(proto_stats, Stats)).
clientid(#proto_state{client_id = ClientId}) -> clientid(#proto_state{client_id = ClientId}) ->
ClientId. ClientId.
@ -109,9 +111,9 @@ session(#proto_state{session = Session}) ->
%% A Client can only send the CONNECT Packet once over a Network Connection. %% A Client can only send the CONNECT Packet once over a Network Connection.
-spec(received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}). -spec(received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}).
received(Packet = ?PACKET(?CONNECT), received(Packet = ?PACKET(?CONNECT),
State = #proto_state{connected = false, stats = Stats}) -> State = #proto_state{connected = false, stats_data = Stats}) ->
trace(recv, Packet, State), Stats1 = inc_stats(recv, ?CONNECT, Stats), trace(recv, Packet, State), Stats1 = inc_stats(recv, ?CONNECT, Stats),
process(Packet, State#proto_state{connected = true, stats = Stats1}); process(Packet, State#proto_state{connected = true, stats_data = Stats1});
received(?PACKET(?CONNECT), State = #proto_state{connected = true}) -> received(?PACKET(?CONNECT), State = #proto_state{connected = true}) ->
{error, protocol_bad_connect, State}; {error, protocol_bad_connect, State};
@ -120,11 +122,11 @@ received(?PACKET(?CONNECT), State = #proto_state{connected = true}) ->
received(_Packet, State = #proto_state{connected = false}) -> received(_Packet, State = #proto_state{connected = false}) ->
{error, protocol_not_connected, State}; {error, protocol_not_connected, State};
received(Packet = ?PACKET(Type), State = #proto_state{stats = Stats}) -> received(Packet = ?PACKET(Type), State = #proto_state{stats_data = Stats}) ->
trace(recv, Packet, State), Stats1 = inc_stats(recv, Type, Stats), trace(recv, Packet, State), Stats1 = inc_stats(recv, Type, Stats),
case validate_packet(Packet) of case validate_packet(Packet) of
ok -> ok ->
process(Packet, State#proto_state{stats = Stats1}); process(Packet, State#proto_state{stats_data = Stats1});
{error, Reason} -> {error, Reason} ->
{error, Reason, State} {error, Reason, State}
end. end.
@ -315,12 +317,12 @@ send(Msg, State = #proto_state{client_id = ClientId, username = Username})
send(emqttd_message:to_packet(Msg), State); send(emqttd_message:to_packet(Msg), State);
send(Packet = ?PACKET(Type), send(Packet = ?PACKET(Type),
State = #proto_state{sendfun = SendFun, stats = Stats}) -> State = #proto_state{sendfun = SendFun, stats_data = Stats}) ->
trace(send, Packet, State), trace(send, Packet, State),
emqttd_metrics:sent(Packet), emqttd_metrics:sent(Packet),
SendFun(Packet), SendFun(Packet),
Stats1 = inc_stats(send, Type, Stats), Stats1 = inc_stats(send, Type, Stats),
{ok, State#proto_state{stats = Stats1}}. {ok, State#proto_state{stats_data = Stats1}}.
trace(recv, Packet, ProtoState) -> trace(recv, Packet, ProtoState) ->
?LOG(info, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState); ?LOG(info, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState);
@ -328,6 +330,9 @@ trace(recv, Packet, ProtoState) ->
trace(send, Packet, ProtoState) -> trace(send, Packet, ProtoState) ->
?LOG(info, "SEND ~s", [emqttd_packet:format(Packet)], ProtoState). ?LOG(info, "SEND ~s", [emqttd_packet:format(Packet)], ProtoState).
inc_stats(_Direct, _Type, Stats = #proto_stats{enable_stats = false}) ->
Stats;
inc_stats(recv, Type, Stats) -> inc_stats(recv, Type, Stats) ->
#proto_stats{recv_pkt = Pkt, recv_msg = Msg} = Stats, #proto_stats{recv_pkt = Pkt, recv_msg = Msg} = Stats,
inc_stats(Type, #proto_stats.recv_pkt, Pkt, #proto_stats.recv_msg, Msg, Stats); inc_stats(Type, #proto_stats.recv_pkt, Pkt, #proto_stats.recv_msg, Msg, Stats);

View File

@ -14,6 +14,8 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc MQTT WebSocket Connection.
-module(emqttd_ws_client). -module(emqttd_ws_client).
-behaviour(gen_server2). -behaviour(gen_server2).
@ -24,6 +26,8 @@
-include("emqttd_protocol.hrl"). -include("emqttd_protocol.hrl").
-import(proplists, [get_value/3]).
%% API Exports %% API Exports
-export([start_link/4]). -export([start_link/4]).
@ -44,13 +48,14 @@
-export([prioritise_call/4, prioritise_info/3, handle_pre_hibernate/1]). -export([prioritise_call/4, prioritise_info/3, handle_pre_hibernate/1]).
%% WebSocket Client State %% WebSocket Client State
-record(wsclient_state, {ws_pid, peer, connection, proto_state, keepalive, -record(wsclient_state, {ws_pid, peername, connection, proto_state, keepalive,
enable_stats}). enable_stats}).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
-define(WSLOG(Level, Peer, Format, Args), -define(WSLOG(Level, Format, Args, State),
lager:Level("WsClient(~s): " ++ Format, [Peer | Args])). lager:Level("WsClient(~s): " ++ Format,
[esockd_net:format(State#wsclient_state.peername) | Args])).
%% @doc Start WebSocket Client. %% @doc Start WebSocket Client.
start_link(Env, WsPid, Req, ReplyChannel) -> start_link(Env, WsPid, Req, ReplyChannel) ->
@ -84,22 +89,16 @@ init([Env, WsPid, Req, ReplyChannel]) ->
{ok, Peername} = Req:get(peername), {ok, Peername} = Req:get(peername),
Headers = mochiweb_headers:to_list( Headers = mochiweb_headers:to_list(
mochiweb_request:get(headers, Req)), mochiweb_request:get(headers, Req)),
%% SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end, ProtoState = emqttd_protocol:init(Peername, send_fun(ReplyChannel),
SendFun = fun(Packet) ->
Data = emqttd_serializer:serialize(Packet),
emqttd_metrics:inc('bytes/sent', iolist_size(Data)),
ReplyChannel({binary, Data})
end,
EnableStats = proplists:get_value(client_enable_stats, Env, false),
ProtoState = emqttd_protocol:init(Peername, SendFun,
[{ws_initial_headers, Headers} | Env]), [{ws_initial_headers, Headers} | Env]),
IdleTimeout = proplists:get_value(client_idle_timeout, Env, 30000), IdleTimeout = get_value(client_idle_timeout, Env, 30000),
EnableStats = get_value(client_enable_stats, Env, false),
{ok, #wsclient_state{ws_pid = WsPid, {ok, #wsclient_state{ws_pid = WsPid,
peer = Req:get(peer), peername = Peername,
connection = Req:get(connection), connection = Req:get(connection),
proto_state = ProtoState, proto_state = ProtoState,
enable_stats = EnableStats}, enable_stats = EnableStats},
IdleTimeout, {backoff, 1000, 1000, 5000}, ?MODULE}. IdleTimeout, {backoff, 1000, 1000, 10000}, ?MODULE}.
prioritise_call(Msg, _From, _Len, _State) -> prioritise_call(Msg, _From, _Len, _State) ->
case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end. case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end.
@ -107,12 +106,12 @@ prioritise_call(Msg, _From, _Len, _State) ->
prioritise_info(Msg, _Len, _State) -> prioritise_info(Msg, _Len, _State) ->
case Msg of {redeliver, _} -> 5; _ -> 0 end. case Msg of {redeliver, _} -> 5; _ -> 0 end.
handle_pre_hibernate(State = #wsclient_state{peer = Peer}) -> handle_pre_hibernate(State) ->
io:format("WsClient(~s) will hibernate!~n", [Peer]),
{hibernate, emit_stats(State)}. {hibernate, emit_stats(State)}.
handle_call(info, From, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) -> handle_call(info, From, State = #wsclient_state{peername = Peername,
Info = [{websocket, true}, {peer, Peer} | emqttd_protocol:info(ProtoState)], proto_state = ProtoState}) ->
Info = [{websocket, true}, {peername, Peername} | emqttd_protocol:info(ProtoState)],
{reply, Stats, _, _} = handle_call(stats, From, State), {reply, Stats, _, _} = handle_call(stats, From, State),
reply(lists:append(Info, Stats), State); reply(lists:append(Info, Stats), State);
@ -127,17 +126,17 @@ handle_call(kick, _From, State) ->
handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) -> handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) ->
reply(emqttd_protocol:session(ProtoState), State); reply(emqttd_protocol:session(ProtoState), State);
handle_call(Req, _From, State = #wsclient_state{peer = Peer}) -> handle_call(Req, _From, State) ->
?WSLOG(error, Peer, "Unexpected request: ~p", [Req]), ?WSLOG(error, "Unexpected request: ~p", [Req], State),
reply({error, unsupported_request}, State). reply({error, unexpected_request}, State).
handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) -> handle_cast({received, Packet}, State = #wsclient_state{proto_state = ProtoState}) ->
emqttd_metrics:received(Packet), emqttd_metrics:received(Packet),
case emqttd_protocol:received(Packet, ProtoState) of case emqttd_protocol:received(Packet, ProtoState) of
{ok, ProtoState1} -> {ok, ProtoState1} ->
{noreply, State#wsclient_state{proto_state = ProtoState1}, hibernate}; {noreply, State#wsclient_state{proto_state = ProtoState1}, hibernate};
{error, Error} -> {error, Error} ->
?WSLOG(error, Peer, "Protocol error - ~p", [Error]), ?WSLOG(error, "Protocol error - ~p", [Error], State),
shutdown(Error, State); shutdown(Error, State);
{error, Error, ProtoState1} -> {error, Error, ProtoState1} ->
shutdown(Error, State#wsclient_state{proto_state = ProtoState1}); shutdown(Error, State#wsclient_state{proto_state = ProtoState1});
@ -145,8 +144,8 @@ handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state
stop(Reason, State#wsclient_state{proto_state = ProtoState1}) stop(Reason, State#wsclient_state{proto_state = ProtoState1})
end; end;
handle_cast(Msg, State = #wsclient_state{peer = Peer}) -> handle_cast(Msg, State) ->
?WSLOG(error, Peer, "Unexpected msg: ~p", [Msg]), ?WSLOG(error, "Unexpected Msg: ~p", [Msg], State),
{noreply, State, hibernate}. {noreply, State, hibernate}.
handle_info({subscribe, TopicTable}, State) -> handle_info({subscribe, TopicTable}, State) ->
@ -186,43 +185,36 @@ handle_info(emit_stats, State) ->
handle_info(timeout, State) -> handle_info(timeout, State) ->
shutdown(idle_timeout, State); shutdown(idle_timeout, State);
handle_info({shutdown, conflict, {ClientId, NewPid}}, State = #wsclient_state{peer = Peer}) -> handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
?WSLOG(warning, Peer, "clientid '~s' conflict with ~p", [ClientId, NewPid]), ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State),
shutdown(conflict, State); shutdown(conflict, State);
handle_info({keepalive, start, Interval}, State = #wsclient_state{peer = Peer, connection = Conn}) -> handle_info({keepalive, start, Interval}, State = #wsclient_state{connection = Conn}) ->
?WSLOG(debug, Peer, "Keepalive at the interval of ~p", [Interval]), ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State),
StatFun = fun() -> KeepAlive = emqttd_keepalive:start(stat_fun(Conn), Interval, {keepalive, check}),
case Conn:getstat([recv_oct]) of
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
{error, Error} -> {error, Error}
end
end,
KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}),
{noreply, State#wsclient_state{keepalive = KeepAlive}, hibernate}; {noreply, State#wsclient_state{keepalive = KeepAlive}, hibernate};
handle_info({keepalive, check}, State = #wsclient_state{peer = Peer, handle_info({keepalive, check}, State = #wsclient_state{keepalive = KeepAlive}) ->
keepalive = KeepAlive}) ->
case emqttd_keepalive:check(KeepAlive) of case emqttd_keepalive:check(KeepAlive) of
{ok, KeepAlive1} -> {ok, KeepAlive1} ->
{noreply, emit_stats(State#wsclient_state{keepalive = KeepAlive1}), hibernate}; {noreply, emit_stats(State#wsclient_state{keepalive = KeepAlive1}), hibernate};
{error, timeout} -> {error, timeout} ->
?WSLOG(debug, Peer, "Keepalive Timeout!", []), ?WSLOG(debug, "Keepalive Timeout!", [], State),
shutdown(keepalive_timeout, State); shutdown(keepalive_timeout, State);
{error, Error} -> {error, Error} ->
?WSLOG(warning, Peer, "Keepalive error - ~p", [Error]), ?WSLOG(warning, "Keepalive error - ~p", [Error], State),
shutdown(keepalive_error, State) shutdown(keepalive_error, State)
end; end;
handle_info({'EXIT', WsPid, normal}, State = #wsclient_state{ws_pid = WsPid}) -> handle_info({'EXIT', WsPid, normal}, State = #wsclient_state{ws_pid = WsPid}) ->
stop(normal, State); stop(normal, State);
handle_info({'EXIT', WsPid, Reason}, State = #wsclient_state{peer = Peer, ws_pid = WsPid}) -> handle_info({'EXIT', WsPid, Reason}, State = #wsclient_state{ws_pid = WsPid}) ->
?WSLOG(error, Peer, "shutdown: ~p",[Reason]), ?WSLOG(error, "shutdown: ~p",[Reason], State),
shutdown(Reason, State); shutdown(Reason, State);
handle_info(Info, State = #wsclient_state{peer = Peer}) -> handle_info(Info, State) ->
?WSLOG(error, Peer, "Unexpected Info: ~p", [Info]), ?WSLOG(error, "Unexpected Info: ~p", [Info], State),
{noreply, State, hibernate}. {noreply, State, hibernate}.
terminate(Reason, #wsclient_state{proto_state = ProtoState, keepalive = KeepAlive}) -> terminate(Reason, #wsclient_state{proto_state = ProtoState, keepalive = KeepAlive}) ->
@ -241,6 +233,21 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
send_fun(ReplyChannel) ->
fun(Packet) ->
Data = emqttd_serializer:serialize(Packet),
emqttd_metrics:inc('bytes/sent', iolist_size(Data)),
ReplyChannel({binary, Data})
end.
stat_fun(Conn) ->
fun() ->
case Conn:getstat([recv_oct]) of
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
{error, Error} -> {error, Error}
end
end.
emit_stats(State = #wsclient_state{proto_state = ProtoState}) -> emit_stats(State = #wsclient_state{proto_state = ProtoState}) ->
emit_stats(emqttd_protocol:clientid(ProtoState), State). emit_stats(emqttd_protocol:clientid(ProtoState), State).