Replace gen_server2 with gen_server for we cannot trace the size of drain queue

This commit is contained in:
Feng Lee 2018-02-26 13:24:29 +08:00
parent 057ef7b668
commit d5893ba2be
10 changed files with 84 additions and 1776 deletions

View File

@ -16,9 +16,7 @@
-module(emqx_bridge).
-author("Feng Lee <feng@emqtt.io>").
-behaviour(gen_server2).
-behaviour(gen_server).
-include("emqx.hrl").
@ -61,7 +59,7 @@
-spec(start_link(any(), pos_integer(), atom(), binary(), [option()]) ->
{ok, pid()} | ignore | {error, term()}).
start_link(Pool, Id, Node, Topic, Options) ->
gen_server2:start_link(?MODULE, [Pool, Id, Node, Topic, Options], []).
gen_server:start_link(?MODULE, [Pool, Id, Node, Topic, Options], []).
%%--------------------------------------------------------------------
%% gen_server callbacks

View File

@ -18,9 +18,7 @@
-module(emqx_client).
-behaviour(gen_server2).
-author("Feng Lee <feng@emqtt.io>").
-behaviour(gen_server).
-include("emqx.hrl").
@ -48,8 +46,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
code_change/3, terminate/2]).
%% gen_server2 Callbacks
-export([prioritise_call/4, prioritise_info/3, handle_pre_hibernate/1]).
%% TODO: How to emit stats?
-export([handle_pre_hibernate/1]).
%% Client State
%% Unused fields: connname, peerhost, peerport
@ -69,19 +67,19 @@ start_link(Conn, Env) ->
{ok, proc_lib:spawn_link(?MODULE, init, [[Conn, Env]])}.
info(CPid) ->
gen_server2:call(CPid, info).
gen_server:call(CPid, info).
stats(CPid) ->
gen_server2:call(CPid, stats).
gen_server:call(CPid, stats).
kick(CPid) ->
gen_server2:call(CPid, kick).
gen_server:call(CPid, kick).
set_rate_limit(Cpid, Rl) ->
gen_server2:call(Cpid, {set_rate_limit, Rl}).
gen_server:call(Cpid, {set_rate_limit, Rl}).
get_rate_limit(Cpid) ->
gen_server2:call(Cpid, get_rate_limit).
gen_server:call(Cpid, get_rate_limit).
subscribe(CPid, TopicTable) ->
CPid ! {subscribe, TopicTable}.
@ -90,10 +88,10 @@ unsubscribe(CPid, Topics) ->
CPid ! {unsubscribe, Topics}.
session(CPid) ->
gen_server2:call(CPid, session, infinity).
gen_server:call(CPid, session, infinity).
clean_acl_cache(CPid, Topic) ->
gen_server2:call(CPid, {clean_acl_cache, Topic}).
gen_server:call(CPid, {clean_acl_cache, Topic}).
%%--------------------------------------------------------------------
%% gen_server Callbacks
@ -130,8 +128,8 @@ do_init(Conn, Env, Peername) ->
enable_stats = EnableStats,
idle_timeout = IdleTimout,
force_gc_count = ForceGcCount}),
gen_server2:enter_loop(?MODULE, [], State, self(), IdleTimout,
{backoff, 2000, 2000, 20000}).
gen_server:enter_loop(?MODULE, [{hibernate_after, 10000}],
State, self(), IdleTimout).
send_fun(Conn, Peername) ->
Self = self(),
@ -148,12 +146,6 @@ send_fun(Conn, Peername) ->
end
end.
prioritise_call(Msg, _From, _Len, _State) ->
case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end.
prioritise_info(Msg, _Len, _State) ->
case Msg of {redeliver, _} -> 5; _ -> 0 end.
handle_pre_hibernate(State) ->
{hibernate, emqx_gc:reset_conn_gc_count(#client_state.force_gc_count, emit_stats(State))}.
@ -241,7 +233,7 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
shutdown(conflict, State);
handle_info(activate_sock, State) ->
{noreply, run_socket(State#client_state{conn_state = running}), hibernate};
{noreply, run_socket(State#client_state{conn_state = running})};
handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
Size = iolist_size(Data),
@ -253,7 +245,7 @@ handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
shutdown(Reason, State);
handle_info({inet_reply, _Sock, ok}, State) ->
{noreply, gc(State), hibernate}; %% Tune GC
{noreply, gc(State)}; %% Tune GC
handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
shutdown(Reason, State);
@ -268,7 +260,7 @@ handle_info({keepalive, start, Interval}, State = #client_state{connection = Con
end,
case emqx_keepalive:start(StatFun, Interval, {keepalive, check}) of
{ok, KeepAlive} ->
{noreply, State#client_state{keepalive = KeepAlive}, hibernate};
{noreply, State#client_state{keepalive = KeepAlive}};
{error, Error} ->
?LOG(warning, "Keepalive error - ~p", [Error], State),
shutdown(Error, State)
@ -277,7 +269,7 @@ handle_info({keepalive, start, Interval}, State = #client_state{connection = Con
handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) ->
case emqx_keepalive:check(KeepAlive) of
{ok, KeepAlive1} ->
{noreply, State#client_state{keepalive = KeepAlive1}, hibernate};
{noreply, State#client_state{keepalive = KeepAlive1}};
{error, timeout} ->
?LOG(debug, "Keepalive timeout", [], State),
shutdown(keepalive_timeout, State);
@ -314,7 +306,7 @@ code_change(_OldVsn, State, _Extra) ->
%% Receive and Parse TCP Data
received(<<>>, State) ->
{noreply, gc(State), hibernate};
{noreply, gc(State)};
received(Bytes, State = #client_state{parser = Parser,
packet_size = PacketSize,
@ -368,7 +360,7 @@ run_socket(State = #client_state{connection = Conn}) ->
with_proto(Fun, State = #client_state{proto_state = ProtoState}) ->
{ok, ProtoState1} = Fun(ProtoState),
{noreply, State#client_state{proto_state = ProtoState1}, hibernate}.
{noreply, State#client_state{proto_state = ProtoState1}}.
emit_stats(State = #client_state{proto_state = ProtoState}) ->
emit_stats(emqx_protocol:clientid(ProtoState), State).

View File

@ -18,7 +18,7 @@
-module(emqx_cm).
-behaviour(gen_server2).
-behaviour(gen_server).
-author("Feng Lee <feng@emqtt.io>").
@ -35,9 +35,6 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% gen_server2 priorities
-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]).
-record(state, {pool, id, statsfun, monitors}).
-define(POOL, ?MODULE).
@ -49,7 +46,7 @@
%% @doc Start Client Manager
-spec(start_link(atom(), pos_integer(), fun()) -> {ok, pid()} | ignore | {error, term()}).
start_link(Pool, Id, StatsFun) ->
gen_server2:start_link(?MODULE, [Pool, Id, StatsFun], []).
gen_server:start_link(?MODULE, [Pool, Id, StatsFun], []).
%% @doc Lookup Client by ClientId
-spec(lookup(binary()) -> mqtt_client() | undefined).
@ -67,12 +64,12 @@ lookup_proc(ClientId) when is_binary(ClientId) ->
%% @doc Register ClientId with Pid.
-spec(reg(mqtt_client()) -> ok).
reg(Client = #mqtt_client{client_id = ClientId}) ->
gen_server2:call(pick(ClientId), {reg, Client}, 120000).
gen_server:call(pick(ClientId), {reg, Client}, 120000).
%% @doc Unregister clientId with pid.
-spec(unreg(binary()) -> ok).
unreg(ClientId) when is_binary(ClientId) ->
gen_server2:cast(pick(ClientId), {unreg, ClientId, self()}).
gen_server:cast(pick(ClientId), {unreg, ClientId, self()}).
pick(ClientId) -> gproc_pool:pick_worker(?POOL, ClientId).
@ -84,15 +81,6 @@ init([Pool, Id, StatsFun]) ->
?GPROC_POOL(join, Pool, Id),
{ok, #state{pool = Pool, id = Id, statsfun = StatsFun, monitors = dict:new()}}.
prioritise_call(Req, _From, _Len, _State) ->
case Req of {reg, _Client} -> 2; _ -> 1 end.
prioritise_cast(Msg, _Len, _State) ->
case Msg of {unreg, _ClientId, _Pid} -> 9; _ -> 1 end.
prioritise_info(_Msg, _Len, _State) ->
3.
handle_call({reg, Client = #mqtt_client{client_id = ClientId,
client_pid = Pid}}, _From, State) ->
case lookup_proc(ClientId) of

View File

@ -16,9 +16,7 @@
-module(emqx_pubsub).
-behaviour(gen_server2).
-author("Feng Lee <feng@emqtt.io>").
-behaviour(gen_server).
-include("emqx.hrl").
@ -48,7 +46,8 @@
-spec(start_link(atom(), pos_integer(), list()) -> {ok, pid()} | ignore | {error, term()}).
start_link(Pool, Id, Env) ->
gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []).
gen_server:start_link({local, ?PROC_NAME(?MODULE, Id)},
?MODULE, [Pool, Id, Env], [{hibernate_after, 10000}]).
%%--------------------------------------------------------------------
%% PubSub API
@ -152,10 +151,10 @@ async_unsubscribe(Topic, Subscriber, Options) ->
cast(pick(Topic), {unsubscribe, Topic, Subscriber, Options}).
call(PubSub, Req) when is_pid(PubSub) ->
gen_server2:call(PubSub, Req, infinity).
gen_server:call(PubSub, Req, infinity).
cast(PubSub, Msg) when is_pid(PubSub) ->
gen_server2:cast(PubSub, Msg).
gen_server:cast(PubSub, Msg).
pick(Topic) ->
gproc_pool:pick_worker(pubsub, Topic).
@ -166,8 +165,7 @@ pick(Topic) ->
init([Pool, Id, Env]) ->
?GPROC_POOL(join, Pool, Id),
{ok, #state{pool = Pool, id = Id, env = Env},
hibernate, {backoff, 2000, 2000, 20000}}.
{ok, #state{pool = Pool, id = Id, env = Env}, hibernate}.
handle_call({subscribe, Topic, Subscriber, Options}, _From, State) ->
add_subscriber(Topic, Subscriber, Options),
@ -247,8 +245,8 @@ setstats(State) ->
State.
reply(Reply, State) ->
{reply, Reply, State, hibernate}.
{reply, Reply, State}.
noreply(State) ->
{noreply, State, hibernate}.
{noreply, State}.

View File

@ -16,9 +16,7 @@
-module(emqx_server).
-behaviour(gen_server2).
-author("Feng Lee <feng@emqtt.io>").
-behaviour(gen_server).
-include("emqx.hrl").
@ -51,7 +49,8 @@
%% @doc Start the server
-spec(start_link(atom(), pos_integer(), list()) -> {ok, pid()} | ignore | {error, term()}).
start_link(Pool, Id, Env) ->
gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []).
gen_server:start_link({local, ?PROC_NAME(?MODULE, Id)},
?MODULE, [Pool, Id, Env], [{hibernate_after, 10000}]).
%%--------------------------------------------------------------------
%% PubSub API
@ -167,10 +166,10 @@ subscribed(Topic, {SubId, SubPid}) when is_binary(Topic), is_binary(SubId), is_p
ets:member(mqtt_subproperty, {Topic, {SubId, SubPid}}).
call(Server, Req) ->
gen_server2:call(Server, Req, infinity).
gen_server:call(Server, Req, infinity).
cast(Server, Msg) when is_pid(Server) ->
gen_server2:cast(Server, Msg).
gen_server:cast(Server, Msg).
pick(SubPid) when is_pid(SubPid) ->
gproc_pool:pick_worker(server, SubPid);
@ -190,7 +189,7 @@ init([Pool, Id, Env]) ->
?GPROC_POOL(join, Pool, Id),
State = #state{pool = Pool, id = Id, env = Env,
subids = #{}, submon = emqx_pmon:new()},
{ok, State, hibernate, {backoff, 2000, 2000, 20000}}.
{ok, State, hibernate}.
handle_call({subscribe, Topic, Subscriber, Options}, _From, State) ->
case do_subscribe(Topic, Subscriber, Options, State) of
@ -321,8 +320,8 @@ setstats(State) ->
ets:info(mqtt_subscription, size)), State.
reply(Reply, State) ->
{reply, Reply, State, hibernate}.
{reply, Reply, State}.
noreply(State) ->
{noreply, State, hibernate}.
{noreply, State}.

View File

@ -45,7 +45,7 @@
-module(emqx_session).
-behaviour(gen_server2).
-behaviour(gen_server).
-author("Feng Lee <feng@emqtt.io>").
@ -73,9 +73,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% gen_server2 Message Priorities
-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3,
handle_pre_hibernate/1]).
%% TODO: gen_server Message Priorities
-export([handle_pre_hibernate/1]).
-define(MQueue, emqx_mqueue).
@ -175,7 +174,8 @@
%% @doc Start a Session
-spec(start_link(boolean(), {mqtt_client_id(), mqtt_username()}, pid()) -> {ok, pid()} | {error, term()}).
start_link(CleanSess, {ClientId, Username}, ClientPid) ->
gen_server2:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid], []).
gen_server:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid],
[{hibernate_after, 10000}]).
%%--------------------------------------------------------------------
%% PubSub API
@ -184,13 +184,13 @@ start_link(CleanSess, {ClientId, Username}, ClientPid) ->
%% @doc Subscribe topics
-spec(subscribe(pid(), [{binary(), [emqx_topic:option()]}]) -> ok).
subscribe(Session, TopicTable) -> %%TODO: the ack function??...
gen_server2:cast(Session, {subscribe, self(), TopicTable, fun(_) -> ok end}).
gen_server:cast(Session, {subscribe, self(), TopicTable, fun(_) -> ok end}).
-spec(subscribe(pid(), mqtt_packet_id(), [{binary(), [emqx_topic:option()]}]) -> ok).
subscribe(Session, PacketId, TopicTable) -> %%TODO: the ack function??...
From = self(),
AckFun = fun(GrantedQos) -> From ! {suback, PacketId, GrantedQos} end,
gen_server2:cast(Session, {subscribe, From, TopicTable, AckFun}).
gen_server:cast(Session, {subscribe, From, TopicTable, AckFun}).
%% @doc Publish Message
-spec(publish(pid(), mqtt_message()) -> ok | {error, term()}).
@ -204,50 +204,50 @@ publish(_Session, Msg = #mqtt_message{qos = ?QOS_1}) ->
publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) ->
%% Publish QoS2 to Session
gen_server2:call(Session, {publish, Msg}, ?TIMEOUT).
gen_server:call(Session, {publish, Msg}, ?TIMEOUT).
%% @doc PubAck Message
-spec(puback(pid(), mqtt_packet_id()) -> ok).
puback(Session, PacketId) ->
gen_server2:cast(Session, {puback, PacketId}).
gen_server:cast(Session, {puback, PacketId}).
-spec(pubrec(pid(), mqtt_packet_id()) -> ok).
pubrec(Session, PacketId) ->
gen_server2:cast(Session, {pubrec, PacketId}).
gen_server:cast(Session, {pubrec, PacketId}).
-spec(pubrel(pid(), mqtt_packet_id()) -> ok).
pubrel(Session, PacketId) ->
gen_server2:cast(Session, {pubrel, PacketId}).
gen_server:cast(Session, {pubrel, PacketId}).
-spec(pubcomp(pid(), mqtt_packet_id()) -> ok).
pubcomp(Session, PacketId) ->
gen_server2:cast(Session, {pubcomp, PacketId}).
gen_server:cast(Session, {pubcomp, PacketId}).
%% @doc Unsubscribe the topics
-spec(unsubscribe(pid(), [{binary(), [emqx_topic:option()]}]) -> ok).
unsubscribe(Session, TopicTable) ->
gen_server2:cast(Session, {unsubscribe, self(), TopicTable}).
gen_server:cast(Session, {unsubscribe, self(), TopicTable}).
%% @doc Resume the session
-spec(resume(pid(), mqtt_client_id(), pid()) -> ok).
resume(Session, ClientId, ClientPid) ->
gen_server2:cast(Session, {resume, ClientId, ClientPid}).
gen_server:cast(Session, {resume, ClientId, ClientPid}).
%% @doc Get session state
state(Session) when is_pid(Session) ->
gen_server2:call(Session, state).
gen_server:call(Session, state).
%% @doc Get session info
-spec(info(pid() | #state{}) -> list(tuple())).
info(Session) when is_pid(Session) ->
gen_server2:call(Session, info);
gen_server:call(Session, info);
info(State) when is_record(State, state) ->
?record_to_proplist(state, State, ?INFO_KEYS).
-spec(stats(pid() | #state{}) -> list({atom(), non_neg_integer()})).
stats(Session) when is_pid(Session) ->
gen_server2:call(Session, stats);
gen_server:call(Session, stats);
stats(#state{max_subscriptions = MaxSubscriptions,
subscriptions = Subscriptions,
@ -272,7 +272,7 @@ stats(#state{max_subscriptions = MaxSubscriptions,
%% @doc Destroy the session
-spec(destroy(pid(), mqtt_client_id()) -> ok).
destroy(Session, ClientId) ->
gen_server2:cast(Session, {destroy, ClientId}).
gen_server:cast(Session, {destroy, ClientId}).
%%--------------------------------------------------------------------
%% gen_server Callbacks
@ -311,7 +311,7 @@ init([CleanSess, {ClientId, Username}, ClientPid]) ->
created_at = os:timestamp()},
emqx_sm:register_session(ClientId, CleanSess, info(State)),
emqx_hooks:run('session.created', [ClientId, Username]),
{ok, emit_stats(State), hibernate, {backoff, 1000, 1000, 10000}}.
{ok, emit_stats(State), hibernate}.
init_stats(Keys) ->
lists:foreach(fun(K) -> put(K, 0) end, Keys).
@ -319,30 +319,6 @@ init_stats(Keys) ->
binding(ClientPid) ->
case node(ClientPid) =:= node() of true -> local; false -> remote end.
prioritise_call(Msg, _From, _Len, _State) ->
case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end.
prioritise_cast(Msg, _Len, _State) ->
case Msg of
{destroy, _} -> 10;
{resume, _, _} -> 9;
{pubrel, _} -> 8;
{pubcomp, _} -> 8;
{pubrec, _} -> 8;
{puback, _} -> 7;
{unsubscribe, _, _} -> 6;
{subscribe, _, _} -> 5;
_ -> 0
end.
prioritise_info(Msg, _Len, _State) ->
case Msg of
{'EXIT', _, _} -> 10;
{timeout, _, _} -> 5;
{dispatch, _, _} -> 1;
_ -> 0
end.
handle_pre_hibernate(State) ->
{hibernate, emqx_gc:reset_conn_gc_count(#state.force_gc_count, emit_stats(State))}.
@ -406,7 +382,7 @@ handle_cast({subscribe, From, TopicTable, AckFun},
{[NewQos|QosAcc], SubMap1}
end, {[], Subscriptions}, TopicTable),
AckFun(lists:reverse(GrantedQos)),
hibernate(emit_stats(State#state{subscriptions = Subscriptions1}));
{noreply, emit_stats(State#state{subscriptions = Subscriptions1}), hibernate};
handle_cast({unsubscribe, From, TopicTable},
State = #state{client_id = ClientId,
@ -428,7 +404,7 @@ handle_cast({unsubscribe, From, TopicTable},
SubMap
end
end, Subscriptions, TopicTable),
hibernate(emit_stats(State#state{subscriptions = Subscriptions1}));
{noreply, emit_stats(State#state{subscriptions = Subscriptions1}), hibernate};
%% PUBACK:
handle_cast({puback, PacketId}, State = #state{inflight = Inflight}) ->
@ -525,7 +501,7 @@ handle_cast({resume, ClientId, ClientPid},
end,
%% Replay delivery and Dequeue pending messages
hibernate(emit_stats(dequeue(retry_delivery(true, State1))));
{noreply, emit_stats(dequeue(retry_delivery(true, State1)))};
handle_cast({destroy, ClientId},
State = #state{client_id = ClientId, client_pid = undefined}) ->
@ -543,21 +519,21 @@ handle_cast(Msg, State) ->
%% Ignore Messages delivered by self
handle_info({dispatch, _Topic, #mqtt_message{from = {ClientId, _}}},
State = #state{client_id = ClientId, ignore_loop_deliver = true}) ->
hibernate(State);
{noreply, State};
%% Dispatch Message
handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) ->
hibernate(gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State)));
{noreply, gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State))};
%% Do nothing if the client has been disconnected.
handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) ->
hibernate(emit_stats(State#state{retry_timer = undefined}));
{noreply, emit_stats(State#state{retry_timer = undefined})};
handle_info({timeout, _Timer, retry_delivery}, State) ->
hibernate(emit_stats(retry_delivery(false, State#state{retry_timer = undefined})));
{noreply, emit_stats(retry_delivery(false, State#state{retry_timer = undefined}))};
handle_info({timeout, _Timer, check_awaiting_rel}, State) ->
hibernate(expire_awaiting_rel(emit_stats(State#state{await_rel_timer = undefined})));
{noreply, expire_awaiting_rel(emit_stats(State#state{await_rel_timer = undefined}))};
handle_info({timeout, _Timer, expired}, State) ->
?LOG(info, "Expired, shutdown now.", [], State),
@ -574,17 +550,17 @@ handle_info({'EXIT', ClientPid, Reason},
?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], State),
ExpireTimer = start_timer(Interval, expired),
State1 = State#state{client_pid = undefined, expiry_timer = ExpireTimer},
hibernate(emit_stats(State1));
{noreply, emit_stats(State1), hibernate};
handle_info({'EXIT', Pid, _Reason}, State = #state{old_client_pid = Pid}) ->
%%ignore
hibernate(State);
{noreply, State, hibernate};
handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = ClientPid}) ->
?LOG(error, "Unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p",
[ClientPid, Pid, Reason], State),
hibernate(State);
{noreply, State, hibernate};
handle_info(Info, Session) ->
?UNEXPECTED_INFO(Info, Session).
@ -857,9 +833,6 @@ inc_stats(Key) -> put(Key, get(Key) + 1).
reply(Reply, State) ->
{reply, Reply, State, hibernate}.
hibernate(State) ->
{noreply, State, hibernate}.
shutdown(Reason, State) ->
{stop, {shutdown, Reason}, State}.

View File

@ -16,9 +16,7 @@
-module(emqx_sm).
-author("Feng Lee <feng@emqtt.io>").
-behaviour(gen_server2).
-behaviour(gen_server).
-include("emqx.hrl").
@ -44,9 +42,6 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% gen_server2 priorities
-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]).
-record(state, {pool, id, monitors}).
-define(POOL, ?MODULE).
@ -78,7 +73,7 @@ mnesia(copy) ->
%% @doc Start a session manager
-spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}).
start_link(Pool, Id) ->
gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []).
gen_server:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []).
%% @doc Start a session
-spec(start_session(boolean(), {binary(), binary() | undefined}) -> {ok, pid(), boolean()} | {error, term()}).
@ -123,7 +118,7 @@ dispatch(ClientId, Topic, Msg) ->
end.
call(SM, Req) ->
gen_server2:call(SM, Req, ?TIMEOUT). %%infinity).
gen_server:call(SM, Req, ?TIMEOUT). %%infinity).
%% @doc for debug.
local_sessions() ->
@ -137,15 +132,6 @@ init([Pool, Id]) ->
?GPROC_POOL(join, Pool, Id),
{ok, #state{pool = Pool, id = Id, monitors = dict:new()}}.
prioritise_call(_Msg, _From, _Len, _State) ->
1.
prioritise_cast(_Msg, _Len, _State) ->
0.
prioritise_info(_Msg, _Len, _State) ->
2.
%% Persistent Session
handle_call({start_session, false, {ClientId, Username}, ClientPid}, _From, State) ->
case lookup_session(ClientId) of

View File

@ -18,7 +18,7 @@
-module(emqx_ws_client).
-behaviour(gen_server2).
-behaviour(gen_server).
-author("Feng Lee <feng@emqtt.io>").
@ -46,8 +46,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% gen_server2 Callbacks
-export([prioritise_call/4, prioritise_info/3, handle_pre_hibernate/1]).
%% TODO: remove ...
-export([handle_pre_hibernate/1]).
%% WebSocket Client State
-record(wsclient_state, {ws_pid, peername, connection, proto_state, keepalive,
@ -61,17 +61,17 @@
%% @doc Start WebSocket Client.
start_link(Env, WsPid, Req, ReplyChannel) ->
gen_server2:start_link(?MODULE, [Env, WsPid, Req, ReplyChannel],
[{spawn_opt, ?FULLSWEEP_OPTS}]). %% Tune GC.
gen_server:start_link(?MODULE, [Env, WsPid, Req, ReplyChannel],
[[{hibernate_after, 10000}]]).
info(CPid) ->
gen_server2:call(CPid, info).
gen_server:call(CPid, info).
stats(CPid) ->
gen_server2:call(CPid, stats).
gen_server:call(CPid, stats).
kick(CPid) ->
gen_server2:call(CPid, kick).
gen_server:call(CPid, kick).
subscribe(CPid, TopicTable) ->
CPid ! {subscribe, TopicTable}.
@ -80,10 +80,10 @@ unsubscribe(CPid, Topics) ->
CPid ! {unsubscribe, Topics}.
session(CPid) ->
gen_server2:call(CPid, session).
gen_server:call(CPid, session).
clean_acl_cache(CPid, Topic) ->
gen_server2:call(CPid, {clean_acl_cache, Topic}).
gen_server:call(CPid, {clean_acl_cache, Topic}).
%%--------------------------------------------------------------------
%% gen_server Callbacks
@ -116,12 +116,6 @@ init([Env, WsPid, Req, ReplyChannel]) ->
exit({shutdown, Reason})
end.
prioritise_call(Msg, _From, _Len, _State) ->
case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end.
prioritise_info(Msg, _Len, _State) ->
case Msg of {redeliver, _} -> 5; _ -> 0 end.
handle_pre_hibernate(State = #wsclient_state{ws_pid = WsPid}) ->
erlang:garbage_collect(WsPid),
{hibernate, emqx_gc:reset_conn_gc_count(#wsclient_state.force_gc_count, emit_stats(State))}.

File diff suppressed because it is too large Load Diff

View File

@ -1,259 +0,0 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
%%
%% Priority queues have essentially the same interface as ordinary
%% queues, except that a) there is an in/3 that takes a priority, and
%% b) we have only implemented the core API we need.
%%
%% Priorities should be integers - the higher the value the higher the
%% priority - but we don't actually check that.
%%
%% in/2 inserts items with priority 0.
%%
%% We optimise the case where a priority queue is being used just like
%% an ordinary queue. When that is the case we represent the priority
%% queue as an ordinary queue. We could just call into the 'queue'
%% module for that, but for efficiency we implement the relevant
%% functions directly in here, thus saving on inter-module calls and
%% eliminating a level of boxing.
%%
%% When the queue contains items with non-zero priorities, it is
%% represented as a sorted kv list with the inverted Priority as the
%% key and an ordinary queue as the value. Here again we use our own
%% ordinary queue implemention for efficiency, often making recursive
%% calls into the same function knowing that ordinary queues represent
%% a base case.
-module(priority_queue).
-export([new/0, is_queue/1, is_empty/1, len/1, plen/2, to_list/1, from_list/1,
in/2, in/3, out/1, out/2, out_p/1, join/2, filter/2, fold/3, highest/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-type(q() :: pqueue()).
-type(priority() :: integer() | 'infinity').
-type(squeue() :: {queue, [any()], [any()], non_neg_integer()}).
-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}).
-export_type([q/0]).
-spec(new/0 :: () -> pqueue()).
-spec(is_queue/1 :: (any()) -> boolean()).
-spec(is_empty/1 :: (pqueue()) -> boolean()).
-spec(len/1 :: (pqueue()) -> non_neg_integer()).
-spec(plen/2 :: (priority(), pqueue()) -> non_neg_integer()).
-spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]).
-spec(from_list/1 :: ([{priority(), any()}]) -> pqueue()).
-spec(in/2 :: (any(), pqueue()) -> pqueue()).
-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
-spec(out_p/1 :: (pqueue()) -> {empty | {value, any(), priority()}, pqueue()}).
-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()).
-spec(filter/2 :: (fun ((any()) -> boolean()), pqueue()) -> pqueue()).
-spec(fold/3 ::
(fun ((any(), priority(), A) -> A), A, pqueue()) -> A).
-spec(highest/1 :: (pqueue()) -> priority() | 'empty').
-endif.
%%----------------------------------------------------------------------------
new() ->
{queue, [], [], 0}.
is_queue({queue, R, F, L}) when is_list(R), is_list(F), is_integer(L) ->
true;
is_queue({pqueue, Queues}) when is_list(Queues) ->
lists:all(fun ({infinity, Q}) -> is_queue(Q);
({P, Q}) -> is_integer(P) andalso is_queue(Q)
end, Queues);
is_queue(_) ->
false.
is_empty({queue, [], [], 0}) ->
true;
is_empty(_) ->
false.
len({queue, _R, _F, L}) ->
L;
len({pqueue, Queues}) ->
lists:sum([len(Q) || {_, Q} <- Queues]).
plen(0, {queue, _R, _F, L}) ->
L;
plen(_, {queue, _R, _F, _}) ->
0;
plen(P, {pqueue, Queues}) ->
case lists:keysearch(maybe_negate_priority(P), 1, Queues) of
{value, {_, Q}} -> len(Q);
false -> 0
end.
to_list({queue, In, Out, _Len}) when is_list(In), is_list(Out) ->
[{0, V} || V <- Out ++ lists:reverse(In, [])];
to_list({pqueue, Queues}) ->
[{maybe_negate_priority(P), V} || {P, Q} <- Queues,
{0, V} <- to_list(Q)].
from_list(L) ->
lists:foldl(fun ({P, E}, Q) -> in(E, P, Q) end, new(), L).
in(Item, Q) ->
in(Item, 0, Q).
in(X, 0, {queue, [_] = In, [], 1}) ->
{queue, [X], In, 2};
in(X, 0, {queue, In, Out, Len}) when is_list(In), is_list(Out) ->
{queue, [X|In], Out, Len + 1};
in(X, Priority, _Q = {queue, [], [], 0}) ->
in(X, Priority, {pqueue, []});
in(X, Priority, Q = {queue, _, _, _}) ->
in(X, Priority, {pqueue, [{0, Q}]});
in(X, Priority, {pqueue, Queues}) ->
P = maybe_negate_priority(Priority),
{pqueue, case lists:keysearch(P, 1, Queues) of
{value, {_, Q}} ->
lists:keyreplace(P, 1, Queues, {P, in(X, Q)});
false when P == infinity ->
[{P, {queue, [X], [], 1}} | Queues];
false ->
case Queues of
[{infinity, InfQueue} | Queues1] ->
[{infinity, InfQueue} |
lists:keysort(1, [{P, {queue, [X], [], 1}} | Queues1])];
_ ->
lists:keysort(1, [{P, {queue, [X], [], 1}} | Queues])
end
end}.
out({queue, [], [], 0} = Q) ->
{empty, Q};
out({queue, [V], [], 1}) ->
{{value, V}, {queue, [], [], 0}};
out({queue, [Y|In], [], Len}) ->
[V|Out] = lists:reverse(In, []),
{{value, V}, {queue, [Y], Out, Len - 1}};
out({queue, In, [V], Len}) when is_list(In) ->
{{value,V}, r2f(In, Len - 1)};
out({queue, In,[V|Out], Len}) when is_list(In) ->
{{value, V}, {queue, In, Out, Len - 1}};
out({pqueue, [{P, Q} | Queues]}) ->
{R, Q1} = out(Q),
NewQ = case is_empty(Q1) of
true -> case Queues of
[] -> {queue, [], [], 0};
[{0, OnlyQ}] -> OnlyQ;
[_|_] -> {pqueue, Queues}
end;
false -> {pqueue, [{P, Q1} | Queues]}
end,
{R, NewQ}.
out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0);
out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)).
out(0, {queue, _, _, _} = Q) ->
out(Q);
out(Priority, {queue, _, _, _}) ->
erlang:error(badarg, [Priority]);
out(Priority, {pqueue, Queues}) ->
P = maybe_negate_priority(Priority),
case lists:keysearch(P, 1, Queues) of
{value, {_, Q}} ->
{R, Q1} = out(Q),
Queues1 = case is_empty(Q1) of
true -> lists:keydelete(P, 1, Queues);
false -> lists:keyreplace(P, 1, Queues, {P, Q1})
end,
{R, case Queues1 of
[] -> {queue, [], [], 0};
[{0, OnlyQ}] -> OnlyQ;
[_|_] -> {pqueue, Queues1}
end};
false ->
{empty, {pqueue, Queues}}
end.
add_p(R, P) -> case R of
{empty, Q} -> {empty, Q};
{{value, V}, Q} -> {{value, V, P}, Q}
end.
join(A, {queue, [], [], 0}) ->
A;
join({queue, [], [], 0}, B) ->
B;
join({queue, AIn, AOut, ALen}, {queue, BIn, BOut, BLen}) ->
{queue, BIn, AOut ++ lists:reverse(AIn, BOut), ALen + BLen};
join(A = {queue, _, _, _}, {pqueue, BPQ}) ->
{Pre, Post} =
lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, BPQ),
Post1 = case Post of
[] -> [ {0, A} ];
[ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ];
_ -> [ {0, A} | Post ]
end,
{pqueue, Pre ++ Post1};
join({pqueue, APQ}, B = {queue, _, _, _}) ->
{Pre, Post} =
lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, APQ),
Post1 = case Post of
[] -> [ {0, B} ];
[ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ];
_ -> [ {0, B} | Post ]
end,
{pqueue, Pre ++ Post1};
join({pqueue, APQ}, {pqueue, BPQ}) ->
{pqueue, merge(APQ, BPQ, [])}.
merge([], BPQ, Acc) ->
lists:reverse(Acc, BPQ);
merge(APQ, [], Acc) ->
lists:reverse(Acc, APQ);
merge([{P, A}|As], [{P, B}|Bs], Acc) ->
merge(As, Bs, [ {P, join(A, B)} | Acc ]);
merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB orelse PA == infinity ->
merge(As, Bs, [ {PA, A} | Acc ]);
merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) ->
merge(As, Bs, [ {PB, B} | Acc ]).
filter(Pred, Q) -> fold(fun(V, P, Acc) ->
case Pred(V) of
true -> in(V, P, Acc);
false -> Acc
end
end, new(), Q).
fold(Fun, Init, Q) -> case out_p(Q) of
{empty, _Q} -> Init;
{{value, V, P}, Q1} -> fold(Fun, Fun(V, P, Init), Q1)
end.
highest({queue, [], [], 0}) -> empty;
highest({queue, _, _, _}) -> 0;
highest({pqueue, [{P, _} | _]}) -> maybe_negate_priority(P).
r2f([], 0) -> {queue, [], [], 0};
r2f([_] = R, 1) -> {queue, [], R, 1};
r2f([X,Y], 2) -> {queue, [X], [Y], 2};
r2f([X,Y|R], L) -> {queue, [X,Y], lists:reverse(R, []), L}.
maybe_negate_priority(infinity) -> infinity;
maybe_negate_priority(P) -> -P.