Use gen_server2 to reduce the CPU/memory usage

This commit is contained in:
Feng Lee 2017-02-21 18:45:40 +08:00
parent b8fb73576e
commit 3cdf2377c8
5 changed files with 174 additions and 163 deletions

View File

@ -18,7 +18,7 @@
-module(emqttd_client). -module(emqttd_client).
-behaviour(gen_server). -behaviour(gen_server2).
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
@ -48,11 +48,13 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
code_change/3, terminate/2]). code_change/3, terminate/2]).
%% gen_server2 Callbacks
-export([prioritise_call/4, prioritise_info/3, handle_pre_hibernate/1]).
%% Client State %% Client State
-record(client_state, {connection, connname, peername, peerhost, peerport, -record(client_state, {connection, connname, peername, peerhost, peerport,
await_recv, conn_state, rate_limit, parser_fun, await_recv, conn_state, rate_limit, parser_fun,
proto_state, packet_opts, keepalive, enable_stats, proto_state, packet_opts, keepalive, enable_stats}).
stats_timer}).
-define(INFO_KEYS, [connname, peername, peerhost, peerport, await_recv, conn_state]). -define(INFO_KEYS, [connname, peername, peerhost, peerport, await_recv, conn_state]).
@ -65,19 +67,19 @@ start_link(Conn, Env) ->
{ok, proc_lib:spawn_link(?MODULE, init, [[Conn, Env]])}. {ok, proc_lib:spawn_link(?MODULE, init, [[Conn, Env]])}.
info(CPid) -> info(CPid) ->
gen_server:call(CPid, info). gen_server2:call(CPid, info).
stats(CPid) -> stats(CPid) ->
gen_server:call(CPid, stats). gen_server2:call(CPid, stats).
kick(CPid) -> kick(CPid) ->
gen_server:call(CPid, kick). gen_server2:call(CPid, kick).
set_rate_limit(Cpid, Rl) -> set_rate_limit(Cpid, Rl) ->
gen_server:call(Cpid, {set_rate_limit, Rl}). gen_server2:call(Cpid, {set_rate_limit, Rl}).
get_rate_limit(Cpid) -> get_rate_limit(Cpid) ->
gen_server:call(Cpid, get_rate_limit). gen_server2:call(Cpid, get_rate_limit).
subscribe(CPid, TopicTable) -> subscribe(CPid, TopicTable) ->
CPid ! {subscribe, TopicTable}. CPid ! {subscribe, TopicTable}.
@ -135,30 +137,41 @@ init([Conn0, Env]) ->
packet_opts = Env, packet_opts = Env,
enable_stats = EnableStats}), enable_stats = EnableStats}),
IdleTimout = get_value(client_idle_timeout, Env, 30000), IdleTimout = get_value(client_idle_timeout, Env, 30000),
gen_server:enter_loop(?MODULE, [], maybe_enable_stats(State), IdleTimout). gen_server2:enter_loop(?MODULE, [], State, self(), IdleTimout,
{backoff, 1000, 1000, 5000}).
prioritise_call(Msg, _From, _Len, _State) ->
case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end.
prioritise_info(Msg, _Len, _State) ->
case Msg of {redeliver, _} -> 5; _ -> 0 end.
handle_pre_hibernate(State = #client_state{connname = Connname}) ->
io:format("Client(~s) will hibernate!~n", [Connname]),
{hibernate, emit_stats(State)}.
handle_call(info, From, State = #client_state{proto_state = ProtoState}) -> handle_call(info, From, State = #client_state{proto_state = ProtoState}) ->
ProtoInfo = emqttd_protocol:info(ProtoState), ProtoInfo = emqttd_protocol:info(ProtoState),
ClientInfo = ?record_to_proplist(client_state, State, ?INFO_KEYS), ClientInfo = ?record_to_proplist(client_state, State, ?INFO_KEYS),
{reply, Stats, _} = handle_call(stats, From, State), {reply, Stats, _, _} = handle_call(stats, From, State),
{reply, lists:append([ClientInfo, ProtoInfo, Stats]), State}; reply(lists:append([ClientInfo, ProtoInfo, Stats]), State);
handle_call(stats, _From, State = #client_state{proto_state = ProtoState}) -> handle_call(stats, _From, State = #client_state{proto_state = ProtoState}) ->
{reply, lists:append([emqttd_misc:proc_stats(), reply(lists:append([emqttd_misc:proc_stats(),
emqttd_protocol:stats(ProtoState), emqttd_protocol:stats(ProtoState),
sock_stats(State)]), State}; sock_stats(State)]), State);
handle_call(kick, _From, State) -> handle_call(kick, _From, State) ->
{stop, {shutdown, kick}, ok, State}; {stop, {shutdown, kick}, ok, State};
handle_call({set_rate_limit, Rl}, _From, State) -> handle_call({set_rate_limit, Rl}, _From, State) ->
{reply, ok, State#client_state{rate_limit = Rl}}; reply(ok, State#client_state{rate_limit = Rl});
handle_call(get_rate_limit, _From, State = #client_state{rate_limit = Rl}) -> handle_call(get_rate_limit, _From, State = #client_state{rate_limit = Rl}) ->
{reply, Rl, State}; reply(Rl, State);
handle_call(session, _From, State = #client_state{proto_state = ProtoState}) -> handle_call(session, _From, State = #client_state{proto_state = ProtoState}) ->
{reply, emqttd_protocol:session(ProtoState), State}; reply(emqttd_protocol:session(ProtoState), State);
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?UNEXPECTED_REQ(Req, State). ?UNEXPECTED_REQ(Req, State).
@ -198,12 +211,12 @@ handle_info({redeliver, {?PUBREL, PacketId}}, State) ->
emqttd_protocol:pubrel(PacketId, ProtoState) emqttd_protocol:pubrel(PacketId, ProtoState)
end, State); end, State);
handle_info(emit_stats, State) ->
{noreply, emit_stats(State), hibernate};
handle_info(timeout, State) -> handle_info(timeout, State) ->
shutdown(idle_timeout, State); shutdown(idle_timeout, State);
handle_info({timeout, _Timer, emit_stats}, State) ->
hibernate(maybe_enable_stats(emit_stats(State)));
%% Fix issue #535 %% Fix issue #535
handle_info({shutdown, Error}, State) -> handle_info({shutdown, Error}, State) ->
shutdown(Error, State); shutdown(Error, State);
@ -213,7 +226,7 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
shutdown(conflict, State); shutdown(conflict, State);
handle_info(activate_sock, State) -> handle_info(activate_sock, State) ->
hibernate(run_socket(State#client_state{conn_state = running})); {noreply, run_socket(State#client_state{conn_state = running}), hibernate};
handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) -> handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
Size = iolist_size(Data), Size = iolist_size(Data),
@ -239,12 +252,12 @@ handle_info({keepalive, start, Interval}, State = #client_state{connection = Con
end end
end, end,
KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}), KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}),
{noreply, stats_by_keepalive(State#client_state{keepalive = KeepAlive})}; {noreply, State#client_state{keepalive = KeepAlive}, hibernate};
handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) -> handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) ->
case emqttd_keepalive:check(KeepAlive) of case emqttd_keepalive:check(KeepAlive) of
{ok, KeepAlive1} -> {ok, KeepAlive1} ->
hibernate(emit_stats(State#client_state{keepalive = KeepAlive1})); {noreply, State#client_state{keepalive = KeepAlive1}, hibernate};
{error, timeout} -> {error, timeout} ->
?LOG(debug, "Keepalive timeout", [], State), ?LOG(debug, "Keepalive timeout", [], State),
shutdown(keepalive_timeout, State); shutdown(keepalive_timeout, State);
@ -279,7 +292,7 @@ code_change(_OldVsn, State, _Extra) ->
%% Receive and parse tcp data %% Receive and parse tcp data
received(<<>>, State) -> received(<<>>, State) ->
hibernate(State); {noreply, State, hibernate};
received(Bytes, State = #client_state{parser_fun = ParserFun, received(Bytes, State = #client_state{parser_fun = ParserFun,
packet_opts = PacketOpts, packet_opts = PacketOpts,
@ -332,33 +345,25 @@ run_socket(State = #client_state{connection = Conn}) ->
with_proto(Fun, State = #client_state{proto_state = ProtoState}) -> with_proto(Fun, State = #client_state{proto_state = ProtoState}) ->
{ok, ProtoState1} = Fun(ProtoState), {ok, ProtoState1} = Fun(ProtoState),
{noreply, State#client_state{proto_state = ProtoState1}}. {noreply, State#client_state{proto_state = ProtoState1}, hibernate}.
maybe_enable_stats(State = #client_state{enable_stats = false}) ->
State;
maybe_enable_stats(State = #client_state{enable_stats = keepalive}) ->
State;
maybe_enable_stats(State = #client_state{enable_stats = Interval}) ->
State#client_state{stats_timer = emqttd_misc:start_timer(Interval, self(), emit_stats)}.
stats_by_keepalive(State) ->
State#client_state{enable_stats = keepalive}.
emit_stats(State = #client_state{enable_stats = false}) ->
State;
emit_stats(State = #client_state{proto_state = ProtoState}) -> emit_stats(State = #client_state{proto_state = ProtoState}) ->
{reply, Stats, _} = handle_call(stats, undefined, State), emit_stats(emqttd_protocol:clientid(ProtoState), State).
emqttd_stats:set_client_stats(emqttd_protocol:clientid(ProtoState), Stats),
emit_stats(_ClientId, State = #client_state{enable_stats = false}) ->
State;
emit_stats(undefined, State) ->
State;
emit_stats(ClientId, State) ->
{reply, Stats, _, _} = handle_call(stats, undefined, State),
emqttd_stats:set_client_stats(ClientId, Stats),
State. State.
sock_stats(#client_state{connection = Conn}) -> sock_stats(#client_state{connection = Conn}) ->
case Conn:getstat(?SOCK_STATS) of case Conn:getstat(?SOCK_STATS) of {ok, Ss} -> Ss; {error, _} -> [] end.
{ok, Ss} -> Ss;
{error, _} -> []
end.
hibernate(State) -> reply(Reply, State) ->
{noreply, State, hibernate}. {reply, Reply, State, hibernate}.
shutdown(Reason, State) -> shutdown(Reason, State) ->
stop({shutdown, Reason}, State). stop({shutdown, Reason}, State).

View File

@ -19,7 +19,7 @@
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
-export([merge_opts/2, start_timer/2, start_timer/3, cancel_timer/1, -export([merge_opts/2, start_timer/2, start_timer/3, cancel_timer/1,
proc_stats/0, proc_stats/1]). proc_stats/0, proc_stats/1, inc_stats/1]).
%% @doc Merge Options %% @doc Merge Options
merge_opts(Defaults, Options) -> merge_opts(Defaults, Options) ->
@ -61,3 +61,5 @@ proc_stats(Pid) ->
{value, {_, V}, Stats1} = lists:keytake(message_queue_len, 1, Stats), {value, {_, V}, Stats1} = lists:keytake(message_queue_len, 1, Stats),
[{mailbox_len, V} | Stats1]. [{mailbox_len, V} | Stats1].
inc_stats(Key) -> put(Key, get(Key) + 1).

View File

@ -187,6 +187,8 @@ process(Packet = ?CONNECT_PACKET(Var), State0) ->
emqttd_cm:reg(client(State2)), emqttd_cm:reg(client(State2)),
%% Start keepalive %% Start keepalive
start_keepalive(KeepAlive), start_keepalive(KeepAlive),
%% Emit Stats
self() ! emit_stats,
%% ACCEPT %% ACCEPT
{?CONNACK_ACCEPT, SP, State2#proto_state{session = Session, is_superuser = IsSuperuser}}; {?CONNACK_ACCEPT, SP, State2#proto_state{session = Session, is_superuser = IsSuperuser}};
{error, Error} -> {error, Error} ->

View File

@ -74,7 +74,8 @@
terminate/2, code_change/3]). terminate/2, code_change/3]).
%% gen_server2 Message Priorities %% gen_server2 Message Priorities
-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]). -export([prioritise_call/4, prioritise_cast/3, prioritise_info/3,
handle_pre_hibernate/1]).
-record(state, -record(state,
{ {
@ -114,7 +115,7 @@
max_inflight = 32 :: non_neg_integer(), max_inflight = 32 :: non_neg_integer(),
%% Retry interval for redelivering QoS1/2 messages %% Retry interval for redelivering QoS1/2 messages
retry_interval = 20000 :: pos_integer(), retry_interval = 20000 :: timeout(),
%% Retry Timer %% Retry Timer
retry_timer :: reference(), retry_timer :: reference(),
@ -129,7 +130,7 @@
awaiting_rel :: map(), awaiting_rel :: map(),
%% Awaiting PUBREL timeout %% Awaiting PUBREL timeout
await_rel_timeout = 20000 :: pos_integer(), await_rel_timeout = 20000 :: timeout(),
%% Max Packets that Awaiting PUBREL %% Max Packets that Awaiting PUBREL
max_awaiting_rel = 100 :: non_neg_integer(), max_awaiting_rel = 100 :: non_neg_integer(),
@ -138,16 +139,13 @@
await_rel_timer :: reference(), await_rel_timer :: reference(),
%% Session Expiry Interval %% Session Expiry Interval
expiry_interval = 7200000 :: pos_integer(), expiry_interval = 7200000 :: timeout(),
%% Expired Timer %% Expired Timer
expiry_timer :: reference(), expiry_timer :: reference(),
%% Enable Stats %% Enable Stats
enable_stats :: false | pos_integer(), enable_stats :: boolean(),
%% Stats Timer
stats_timer :: reference(),
created_at :: erlang:timestamp() created_at :: erlang:timestamp()
}). }).
@ -301,10 +299,9 @@ init([CleanSess, {ClientId, Username}, ClientPid]) ->
expiry_interval = get_value(expiry_interval, Env), expiry_interval = get_value(expiry_interval, Env),
enable_stats = EnableStats, enable_stats = EnableStats,
created_at = os:timestamp()}, created_at = os:timestamp()},
emqttd_stats:set_session_stats(ClientId, stats(State)),
emqttd_sm:register_session(ClientId, CleanSess, info(State)), emqttd_sm:register_session(ClientId, CleanSess, info(State)),
emqttd_hooks:run('session.created', [ClientId, Username]), emqttd_hooks:run('session.created', [ClientId, Username]),
{ok, State, hibernate, {backoff, 1000, 1000, 5000}, ?MODULE}. {ok, emit_stats(State), hibernate, {backoff, 1000, 1000, 10000}, ?MODULE}.
init_stats(Keys) -> init_stats(Keys) ->
lists:foreach(fun(K) -> put(K, 0) end, Keys). lists:foreach(fun(K) -> put(K, 0) end, Keys).
@ -336,10 +333,13 @@ prioritise_info(Msg, _Len, _State) ->
_ -> 0 _ -> 0
end. end.
handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PacketId}}, handle_pre_hibernate(State) ->
_From, State = #state{awaiting_rel = AwaitingRel, {hibernate, emit_stats(State)}.
await_rel_timer = Timer,
await_rel_timeout = Timeout}) -> handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PacketId}}, _From,
State = #state{awaiting_rel = AwaitingRel,
await_rel_timer = Timer,
await_rel_timeout = Timeout}) ->
case is_awaiting_full(State) of case is_awaiting_full(State) of
false -> false ->
State1 = case Timer == undefined of State1 = case Timer == undefined of
@ -413,51 +413,55 @@ handle_cast({unsubscribe, _From, TopicTable},
%% PUBACK: %% PUBACK:
handle_cast({puback, PacketId}, State = #state{inflight = Inflight}) -> handle_cast({puback, PacketId}, State = #state{inflight = Inflight}) ->
case Inflight:contain(PacketId) of {noreply,
true -> case Inflight:contain(PacketId) of
noreply(dequeue(acked(puback, PacketId, State))); true ->
false -> dequeue(acked(puback, PacketId, State));
?LOG(warning, "The PUBACK ~p is not inflight: ~p", false ->
[PacketId, Inflight:window()], State), ?LOG(warning, "PUBACK ~p missed inflight: ~p",
emqttd_metrics:inc('packets/puback/missed'), [PacketId, Inflight:window()], State),
noreply(State) emqttd_metrics:inc('packets/puback/missed'),
end; State
end, hibernate};
%% PUBREC: %% PUBREC:
handle_cast({pubrec, PacketId}, State = #state{inflight = Inflight}) -> handle_cast({pubrec, PacketId}, State = #state{inflight = Inflight}) ->
case Inflight:contain(PacketId) of {noreply,
true -> case Inflight:contain(PacketId) of
noreply(acked(pubrec, PacketId, State)); true ->
false -> acked(pubrec, PacketId, State);
?LOG(warning, "The PUBREC ~p is not inflight: ~p", false ->
[PacketId, Inflight:window()], State), ?LOG(warning, "PUBREC ~p missed inflight: ~p",
emqttd_metrics:inc('packets/pubrec/missed'), [PacketId, Inflight:window()], State),
noreply(State) emqttd_metrics:inc('packets/pubrec/missed'),
end; State
end, hibernate};
%% PUBREL: %% PUBREL:
handle_cast({pubrel, PacketId}, State = #state{awaiting_rel = AwaitingRel}) -> handle_cast({pubrel, PacketId}, State = #state{awaiting_rel = AwaitingRel}) ->
case maps:take(PacketId, AwaitingRel) of {noreply,
{Msg, AwaitingRel1} -> case maps:take(PacketId, AwaitingRel) of
spawn(emqttd_server, publish, [Msg]),%%:) {Msg, AwaitingRel1} ->
noreply(State#state{awaiting_rel = AwaitingRel1}); spawn(emqttd_server, publish, [Msg]), %%:)
error -> State#state{awaiting_rel = AwaitingRel1};
?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State), error ->
emqttd_metrics:inc('packets/pubrel/missed'), ?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State),
noreply(State) emqttd_metrics:inc('packets/pubrel/missed'),
end; State
end, hibernate};
%% PUBCOMP: %% PUBCOMP:
handle_cast({pubcomp, PacketId}, State = #state{inflight = Inflight}) -> handle_cast({pubcomp, PacketId}, State = #state{inflight = Inflight}) ->
case Inflight:contain(PacketId) of {noreply,
true -> case Inflight:contain(PacketId) of
noreply(dequeue(acked(pubcomp, PacketId, State))); true ->
false -> dequeue(acked(pubcomp, PacketId, State));
?LOG(warning, "The PUBCOMP ~p is not inflight: ~p", false ->
[PacketId, Inflight:window()], State), ?LOG(warning, "The PUBCOMP ~p is not inflight: ~p",
emqttd_metrics:inc('packets/pubcomp/missed'), [PacketId, Inflight:window()], State),
noreply(State) emqttd_metrics:inc('packets/pubcomp/missed'),
end; State
end, hibernate};
%% RESUME: %% RESUME:
handle_cast({resume, ClientId, ClientPid}, handle_cast({resume, ClientId, ClientPid},
@ -466,14 +470,13 @@ handle_cast({resume, ClientId, ClientPid},
clean_sess = CleanSess, clean_sess = CleanSess,
retry_timer = RetryTimer, retry_timer = RetryTimer,
await_rel_timer = AwaitTimer, await_rel_timer = AwaitTimer,
stats_timer = StatsTimer,
expiry_timer = ExpireTimer}) -> expiry_timer = ExpireTimer}) ->
?LOG(info, "Resumed by ~p", [ClientPid], State), ?LOG(info, "Resumed by ~p", [ClientPid], State),
%% Cancel Timers %% Cancel Timers
lists:foreach(fun emqttd_misc:cancel_timer/1, lists:foreach(fun emqttd_misc:cancel_timer/1,
[RetryTimer, AwaitTimer, StatsTimer, ExpireTimer]), [RetryTimer, AwaitTimer, ExpireTimer]),
case kick(ClientId, OldClientPid, ClientPid) of case kick(ClientId, OldClientPid, ClientPid) of
ok -> ?LOG(warning, "~p kickout ~p", [ClientPid, OldClientPid], State); ok -> ?LOG(warning, "~p kickout ~p", [ClientPid, OldClientPid], State);
@ -503,13 +506,13 @@ handle_cast({resume, ClientId, ClientPid},
%% Replay delivery and Dequeue pending messages %% Replay delivery and Dequeue pending messages
hibernate(emit_stats(dequeue(retry_delivery(true, State1)))); hibernate(emit_stats(dequeue(retry_delivery(true, State1))));
handle_cast({destroy, ClientId}, State = #state{client_id = ClientId, handle_cast({destroy, ClientId},
client_pid = undefined}) -> State = #state{client_id = ClientId, client_pid = undefined}) ->
?LOG(warning, "Destroyed", [], State), ?LOG(warning, "Destroyed", [], State),
shutdown(destroy, State); shutdown(destroy, State);
handle_cast({destroy, ClientId}, State = #state{client_id = ClientId, handle_cast({destroy, ClientId},
client_pid = OldClientPid}) -> State = #state{client_id = ClientId, client_pid = OldClientPid}) ->
?LOG(warning, "kickout ~p", [OldClientPid], State), ?LOG(warning, "kickout ~p", [OldClientPid], State),
shutdown(conflict, State); shutdown(conflict, State);
@ -518,27 +521,25 @@ handle_cast(Msg, State) ->
%% Dispatch Message %% Dispatch Message
handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) -> handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) ->
noreply(dispatch(tune_qos(Topic, Msg, State), State)); {noreply, dispatch(tune_qos(Topic, Msg, State), State), hibernate};
%% Do nothing if the client has been disconnected. %% Do nothing if the client has been disconnected.
handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) -> handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) ->
hibernate(emit_stats(State#state{retry_timer = undefined})); hibernate(emit_stats(State#state{retry_timer = undefined}));
handle_info({timeout, _Timer, retry_delivery}, State) -> handle_info({timeout, _Timer, retry_delivery}, State) ->
noreply(emit_stats(retry_delivery(false, State#state{retry_timer = undefined}))); hibernate(emit_stats(retry_delivery(false, State#state{retry_timer = undefined})));
handle_info({timeout, _Timer, check_awaiting_rel}, State) -> handle_info({timeout, _Timer, check_awaiting_rel}, State) ->
noreply(expire_awaiting_rel(emit_stats(State#state{await_rel_timer = undefined}))); hibernate(expire_awaiting_rel(emit_stats(State#state{await_rel_timer = undefined})));
handle_info({timeout, _Timer, emit_stats}, State) ->
hibernate(maybe_enable_stats(emit_stats(State)));
handle_info({timeout, _Timer, expired}, State) -> handle_info({timeout, _Timer, expired}, State) ->
?LOG(info, "Expired, shutdown now.", [], State), ?LOG(info, "Expired, shutdown now.", [], State),
shutdown(expired, State); shutdown(expired, State);
handle_info({'EXIT', ClientPid, _Reason}, handle_info({'EXIT', ClientPid, _Reason},
State = #state{clean_sess = true, client_pid = ClientPid}) -> State = #state{clean_sess = true,
client_pid = ClientPid}) ->
{stop, normal, State}; {stop, normal, State};
handle_info({'EXIT', ClientPid, Reason}, handle_info({'EXIT', ClientPid, Reason},
@ -548,7 +549,7 @@ handle_info({'EXIT', ClientPid, Reason},
?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], State), ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], State),
ExpireTimer = start_timer(Interval, expired), ExpireTimer = start_timer(Interval, expired),
State1 = State#state{client_pid = undefined, expiry_timer = ExpireTimer}, State1 = State#state{client_pid = undefined, expiry_timer = ExpireTimer},
hibernate(maybe_enable_stats(emit_stats(State1))); hibernate(emit_stats(State1));
handle_info({'EXIT', Pid, _Reason}, State = #state{old_client_pid = Pid}) -> handle_info({'EXIT', Pid, _Reason}, State = #state{old_client_pid = Pid}) ->
%%ignore %%ignore
@ -690,7 +691,8 @@ dispatch(Msg = #mqtt_message{qos = QoS},
end. end.
enqueue_msg(Msg, State = #state{mqueue = Q}) -> enqueue_msg(Msg, State = #state{mqueue = Q}) ->
inc(enqueue_msg), State#state{mqueue = emqttd_mqueue:in(Msg, Q)}. emqttd_misc:inc_stats(enqueue_msg),
State#state{mqueue = emqttd_mqueue:in(Msg, Q)}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Deliver %% Deliver
@ -700,7 +702,8 @@ redeliver(Msg = #mqtt_message{qos = QoS}, State) ->
deliver(Msg#mqtt_message{dup = if QoS =:= ?QOS2 -> false; true -> true end}, State). deliver(Msg#mqtt_message{dup = if QoS =:= ?QOS2 -> false; true -> true end}, State).
deliver(Msg, #state{client_pid = Pid}) -> deliver(Msg, #state{client_pid = Pid}) ->
inc(deliver_msg), Pid ! {deliver, Msg}. emqttd_misc:inc_stats(deliver_msg),
Pid ! {deliver, Msg}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Awaiting ACK for QoS1/QoS2 Messages %% Awaiting ACK for QoS1/QoS2 Messages
@ -785,14 +788,6 @@ next_msg_id(State = #state{next_msg_id = Id}) ->
%% Emit session stats %% Emit session stats
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
maybe_enable_stats(State = #state{enable_stats = false}) ->
State;
maybe_enable_stats(State = #state{client_pid = Pid}) when is_pid(Pid) ->
State;
maybe_enable_stats(State = #state{enable_stats = Interval}) ->
StatsTimer = start_timer(Interval, emit_stats),
State#state{stats_timer = StatsTimer}.
emit_stats(State = #state{enable_stats = false}) -> emit_stats(State = #state{enable_stats = false}) ->
State; State;
emit_stats(State = #state{client_id = ClientId}) -> emit_stats(State = #state{client_id = ClientId}) ->
@ -803,13 +798,8 @@ emit_stats(State = #state{client_id = ClientId}) ->
%% Helper functions %% Helper functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
inc(Key) -> put(Key, get(Key) + 1).
reply(Reply, State) -> reply(Reply, State) ->
{reply, Reply, State}. {reply, Reply, State, hibernate}.
noreply(State) ->
{noreply, State}.
hibernate(State) -> hibernate(State) ->
{noreply, State, hibernate}. {noreply, State, hibernate}.

View File

@ -16,7 +16,7 @@
-module(emqttd_ws_client). -module(emqttd_ws_client).
-behaviour(gen_server). -behaviour(gen_server2).
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
@ -40,9 +40,12 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
%% gen_server2 Callbacks
-export([prioritise_call/4, prioritise_info/3, handle_pre_hibernate/1]).
%% WebSocket Client State %% WebSocket Client State
-record(wsclient_state, {ws_pid, peer, connection, proto_state, keepalive, -record(wsclient_state, {ws_pid, peer, connection, proto_state, keepalive,
enable_stats, stats_timer}). enable_stats}).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
@ -54,13 +57,13 @@ start_link(Env, WsPid, Req, ReplyChannel) ->
gen_server:start_link(?MODULE, [Env, WsPid, Req, ReplyChannel], []). gen_server:start_link(?MODULE, [Env, WsPid, Req, ReplyChannel], []).
info(CPid) -> info(CPid) ->
gen_server:call(CPid, info). gen_server2:call(CPid, info).
stats(CPid) -> stats(CPid) ->
gen_server:call(CPid, stats). gen_server2:call(CPid, stats).
kick(CPid) -> kick(CPid) ->
gen_server:call(CPid, kick). gen_server2:call(CPid, kick).
subscribe(CPid, TopicTable) -> subscribe(CPid, TopicTable) ->
CPid ! {subscribe, TopicTable}. CPid ! {subscribe, TopicTable}.
@ -69,7 +72,7 @@ unsubscribe(CPid, Topics) ->
CPid ! {unsubscribe, Topics}. CPid ! {unsubscribe, Topics}.
session(CPid) -> session(CPid) ->
gen_server:call(CPid, session). gen_server2:call(CPid, session).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server Callbacks %% gen_server Callbacks
@ -90,28 +93,39 @@ init([Env, WsPid, Req, ReplyChannel]) ->
EnableStats = proplists:get_value(client_enable_stats, Env, false), EnableStats = proplists:get_value(client_enable_stats, Env, false),
ProtoState = emqttd_protocol:init(Peername, SendFun, ProtoState = emqttd_protocol:init(Peername, SendFun,
[{ws_initial_headers, Headers} | Env]), [{ws_initial_headers, Headers} | Env]),
{ok, maybe_enable_stats(#wsclient_state{ws_pid = WsPid, IdleTimeout = proplists:get_value(client_idle_timeout, Env, 30000),
peer = Req:get(peer), {ok, #wsclient_state{ws_pid = WsPid,
connection = Req:get(connection), peer = Req:get(peer),
proto_state = ProtoState, connection = Req:get(connection),
enable_stats = EnableStats}), proto_state = ProtoState,
proplists:get_value(client_idle_timeout, Env, 30000)}. enable_stats = EnableStats},
IdleTimeout, {backoff, 1000, 1000, 5000}, ?MODULE}.
prioritise_call(Msg, _From, _Len, _State) ->
case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end.
prioritise_info(Msg, _Len, _State) ->
case Msg of {redeliver, _} -> 5; _ -> 0 end.
handle_pre_hibernate(State = #wsclient_state{peer = Peer}) ->
io:format("WsClient(~s) will hibernate!~n", [Peer]),
{hibernate, emit_stats(State)}.
handle_call(info, From, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) -> handle_call(info, From, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) ->
Info = [{websocket, true}, {peer, Peer} | emqttd_protocol:info(ProtoState)], Info = [{websocket, true}, {peer, Peer} | emqttd_protocol:info(ProtoState)],
{reply, Stats, _} = handle_call(stats, From, State), {reply, Stats, _, _} = handle_call(stats, From, State),
{reply, lists:append(Info, Stats), State}; reply(lists:append(Info, Stats), State);
handle_call(stats, _From, State = #wsclient_state{proto_state = ProtoState}) -> handle_call(stats, _From, State = #wsclient_state{proto_state = ProtoState}) ->
{reply, lists:append([emqttd_misc:proc_stats(), reply(lists:append([emqttd_misc:proc_stats(),
wsock_stats(State), wsock_stats(State),
emqttd_protocol:stats(ProtoState)]), State}; emqttd_protocol:stats(ProtoState)]), State);
handle_call(kick, _From, State) -> handle_call(kick, _From, State) ->
{stop, {shutdown, kick}, ok, State}; {stop, {shutdown, kick}, ok, State};
handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) -> handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) ->
{reply, emqttd_protocol:session(ProtoState), State}; reply(emqttd_protocol:session(ProtoState), State);
handle_call(Req, _From, State = #wsclient_state{peer = Peer}) -> handle_call(Req, _From, State = #wsclient_state{peer = Peer}) ->
?WSLOG(error, Peer, "Unexpected request: ~p", [Req]), ?WSLOG(error, Peer, "Unexpected request: ~p", [Req]),
@ -166,8 +180,8 @@ handle_info({redeliver, {?PUBREL, PacketId}}, State) ->
emqttd_protocol:pubrel(PacketId, ProtoState) emqttd_protocol:pubrel(PacketId, ProtoState)
end, State); end, State);
handle_info({timeout, _Timer, emit_stats}, State) -> handle_info(emit_stats, State) ->
{noreply, maybe_enable_stats(emit_stats(State)), hibernate}; {noreply, emit_stats(State), hibernate};
handle_info(timeout, State) -> handle_info(timeout, State) ->
shutdown(idle_timeout, State); shutdown(idle_timeout, State);
@ -185,7 +199,7 @@ handle_info({keepalive, start, Interval}, State = #wsclient_state{peer = Peer, c
end end
end, end,
KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}), KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}),
{noreply, stats_by_keepalive(State#wsclient_state{keepalive = KeepAlive})}; {noreply, State#wsclient_state{keepalive = KeepAlive}, hibernate};
handle_info({keepalive, check}, State = #wsclient_state{peer = Peer, handle_info({keepalive, check}, State = #wsclient_state{peer = Peer,
keepalive = KeepAlive}) -> keepalive = KeepAlive}) ->
@ -209,7 +223,7 @@ handle_info({'EXIT', WsPid, Reason}, State = #wsclient_state{peer = Peer, ws_pid
handle_info(Info, State = #wsclient_state{peer = Peer}) -> handle_info(Info, State = #wsclient_state{peer = Peer}) ->
?WSLOG(error, Peer, "Unexpected Info: ~p", [Info]), ?WSLOG(error, Peer, "Unexpected Info: ~p", [Info]),
{noreply, State}. {noreply, State, hibernate}.
terminate(Reason, #wsclient_state{proto_state = ProtoState, keepalive = KeepAlive}) -> terminate(Reason, #wsclient_state{proto_state = ProtoState, keepalive = KeepAlive}) ->
emqttd_keepalive:cancel(KeepAlive), emqttd_keepalive:cancel(KeepAlive),
@ -227,21 +241,16 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
maybe_enable_stats(State = #wsclient_state{enable_stats = false}) ->
State;
maybe_enable_stats(State = #wsclient_state{enable_stats = keepalive}) ->
State;
maybe_enable_stats(State = #wsclient_state{enable_stats = Interval}) ->
State#wsclient_state{stats_timer = emqttd_misc:start_timer(Interval, self(), emit_stats)}.
stats_by_keepalive(State) ->
State#wsclient_state{enable_stats = keepalive}.
emit_stats(State = #wsclient_state{enable_stats = false}) ->
State;
emit_stats(State = #wsclient_state{proto_state = ProtoState}) -> emit_stats(State = #wsclient_state{proto_state = ProtoState}) ->
{reply, Stats, _} = handle_call(stats, undefined, State), emit_stats(emqttd_protocol:clientid(ProtoState), State).
emqttd_stats:set_client_stats(emqttd_protocol:clientid(ProtoState), Stats),
emit_stats(_ClientId, State = #wsclient_state{enable_stats = false}) ->
State;
emit_stats(undefined, State) ->
State;
emit_stats(ClientId, State) ->
{reply, Stats, _, _} = handle_call(stats, undefined, State),
emqttd_stats:set_client_stats(ClientId, Stats),
State. State.
wsock_stats(#wsclient_state{connection = Conn}) -> wsock_stats(#wsclient_state{connection = Conn}) ->
@ -252,7 +261,10 @@ wsock_stats(#wsclient_state{connection = Conn}) ->
with_proto(Fun, State = #wsclient_state{proto_state = ProtoState}) -> with_proto(Fun, State = #wsclient_state{proto_state = ProtoState}) ->
{ok, ProtoState1} = Fun(ProtoState), {ok, ProtoState1} = Fun(ProtoState),
{noreply, State#wsclient_state{proto_state = ProtoState1}}. {noreply, State#wsclient_state{proto_state = ProtoState1}, hibernate}.
reply(Reply, State) ->
{reply, Reply, State, hibernate}.
shutdown(Reason, State) -> shutdown(Reason, State) ->
stop({shutdown, Reason}, State). stop({shutdown, Reason}, State).