From d5893ba2be0581501614c62547d3c826a6ef25e5 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 26 Feb 2018 13:24:29 +0800 Subject: [PATCH] Replace gen_server2 with gen_server for we cannot trace the size of drain queue --- src/emqx_bridge.erl | 6 +- src/emqx_client.erl | 44 +- src/emqx_cm.erl | 20 +- src/emqx_pubsub.erl | 18 +- src/emqx_server.erl | 17 +- src/emqx_session.erl | 89 +-- src/emqx_sm.erl | 20 +- src/emqx_ws_client.erl | 26 +- src/gen_server2.erl | 1361 ---------------------------------------- src/priority_queue.erl | 259 -------- 10 files changed, 84 insertions(+), 1776 deletions(-) delete mode 100644 src/gen_server2.erl delete mode 100644 src/priority_queue.erl diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index 8ad6bf4b2..fc6f33535 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -16,9 +16,7 @@ -module(emqx_bridge). --author("Feng Lee "). - --behaviour(gen_server2). +-behaviour(gen_server). -include("emqx.hrl"). @@ -61,7 +59,7 @@ -spec(start_link(any(), pos_integer(), atom(), binary(), [option()]) -> {ok, pid()} | ignore | {error, term()}). start_link(Pool, Id, Node, Topic, Options) -> - gen_server2:start_link(?MODULE, [Pool, Id, Node, Topic, Options], []). + gen_server:start_link(?MODULE, [Pool, Id, Node, Topic, Options], []). %%-------------------------------------------------------------------- %% gen_server callbacks diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 520cd8878..0a3d7d8ce 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -18,9 +18,7 @@ -module(emqx_client). --behaviour(gen_server2). - --author("Feng Lee "). +-behaviour(gen_server). -include("emqx.hrl"). @@ -48,8 +46,8 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). -%% gen_server2 Callbacks --export([prioritise_call/4, prioritise_info/3, handle_pre_hibernate/1]). +%% TODO: How to emit stats? +-export([handle_pre_hibernate/1]). %% Client State %% Unused fields: connname, peerhost, peerport @@ -69,19 +67,19 @@ start_link(Conn, Env) -> {ok, proc_lib:spawn_link(?MODULE, init, [[Conn, Env]])}. info(CPid) -> - gen_server2:call(CPid, info). + gen_server:call(CPid, info). stats(CPid) -> - gen_server2:call(CPid, stats). + gen_server:call(CPid, stats). kick(CPid) -> - gen_server2:call(CPid, kick). + gen_server:call(CPid, kick). set_rate_limit(Cpid, Rl) -> - gen_server2:call(Cpid, {set_rate_limit, Rl}). + gen_server:call(Cpid, {set_rate_limit, Rl}). get_rate_limit(Cpid) -> - gen_server2:call(Cpid, get_rate_limit). + gen_server:call(Cpid, get_rate_limit). subscribe(CPid, TopicTable) -> CPid ! {subscribe, TopicTable}. @@ -90,10 +88,10 @@ unsubscribe(CPid, Topics) -> CPid ! {unsubscribe, Topics}. session(CPid) -> - gen_server2:call(CPid, session, infinity). + gen_server:call(CPid, session, infinity). clean_acl_cache(CPid, Topic) -> - gen_server2:call(CPid, {clean_acl_cache, Topic}). + gen_server:call(CPid, {clean_acl_cache, Topic}). %%-------------------------------------------------------------------- %% gen_server Callbacks @@ -130,8 +128,8 @@ do_init(Conn, Env, Peername) -> enable_stats = EnableStats, idle_timeout = IdleTimout, force_gc_count = ForceGcCount}), - gen_server2:enter_loop(?MODULE, [], State, self(), IdleTimout, - {backoff, 2000, 2000, 20000}). + gen_server:enter_loop(?MODULE, [{hibernate_after, 10000}], + State, self(), IdleTimout). send_fun(Conn, Peername) -> Self = self(), @@ -148,12 +146,6 @@ send_fun(Conn, Peername) -> end end. -prioritise_call(Msg, _From, _Len, _State) -> - case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end. - -prioritise_info(Msg, _Len, _State) -> - case Msg of {redeliver, _} -> 5; _ -> 0 end. - handle_pre_hibernate(State) -> {hibernate, emqx_gc:reset_conn_gc_count(#client_state.force_gc_count, emit_stats(State))}. @@ -241,7 +233,7 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) -> shutdown(conflict, State); handle_info(activate_sock, State) -> - {noreply, run_socket(State#client_state{conn_state = running}), hibernate}; + {noreply, run_socket(State#client_state{conn_state = running})}; handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) -> Size = iolist_size(Data), @@ -253,7 +245,7 @@ handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> shutdown(Reason, State); handle_info({inet_reply, _Sock, ok}, State) -> - {noreply, gc(State), hibernate}; %% Tune GC + {noreply, gc(State)}; %% Tune GC handle_info({inet_reply, _Sock, {error, Reason}}, State) -> shutdown(Reason, State); @@ -268,7 +260,7 @@ handle_info({keepalive, start, Interval}, State = #client_state{connection = Con end, case emqx_keepalive:start(StatFun, Interval, {keepalive, check}) of {ok, KeepAlive} -> - {noreply, State#client_state{keepalive = KeepAlive}, hibernate}; + {noreply, State#client_state{keepalive = KeepAlive}}; {error, Error} -> ?LOG(warning, "Keepalive error - ~p", [Error], State), shutdown(Error, State) @@ -277,7 +269,7 @@ handle_info({keepalive, start, Interval}, State = #client_state{connection = Con handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) -> case emqx_keepalive:check(KeepAlive) of {ok, KeepAlive1} -> - {noreply, State#client_state{keepalive = KeepAlive1}, hibernate}; + {noreply, State#client_state{keepalive = KeepAlive1}}; {error, timeout} -> ?LOG(debug, "Keepalive timeout", [], State), shutdown(keepalive_timeout, State); @@ -314,7 +306,7 @@ code_change(_OldVsn, State, _Extra) -> %% Receive and Parse TCP Data received(<<>>, State) -> - {noreply, gc(State), hibernate}; + {noreply, gc(State)}; received(Bytes, State = #client_state{parser = Parser, packet_size = PacketSize, @@ -368,7 +360,7 @@ run_socket(State = #client_state{connection = Conn}) -> with_proto(Fun, State = #client_state{proto_state = ProtoState}) -> {ok, ProtoState1} = Fun(ProtoState), - {noreply, State#client_state{proto_state = ProtoState1}, hibernate}. + {noreply, State#client_state{proto_state = ProtoState1}}. emit_stats(State = #client_state{proto_state = ProtoState}) -> emit_stats(emqx_protocol:clientid(ProtoState), State). diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 2cdbbefe0..f0b502170 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -18,7 +18,7 @@ -module(emqx_cm). --behaviour(gen_server2). +-behaviour(gen_server). -author("Feng Lee "). @@ -35,9 +35,6 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -%% gen_server2 priorities --export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]). - -record(state, {pool, id, statsfun, monitors}). -define(POOL, ?MODULE). @@ -49,7 +46,7 @@ %% @doc Start Client Manager -spec(start_link(atom(), pos_integer(), fun()) -> {ok, pid()} | ignore | {error, term()}). start_link(Pool, Id, StatsFun) -> - gen_server2:start_link(?MODULE, [Pool, Id, StatsFun], []). + gen_server:start_link(?MODULE, [Pool, Id, StatsFun], []). %% @doc Lookup Client by ClientId -spec(lookup(binary()) -> mqtt_client() | undefined). @@ -67,12 +64,12 @@ lookup_proc(ClientId) when is_binary(ClientId) -> %% @doc Register ClientId with Pid. -spec(reg(mqtt_client()) -> ok). reg(Client = #mqtt_client{client_id = ClientId}) -> - gen_server2:call(pick(ClientId), {reg, Client}, 120000). + gen_server:call(pick(ClientId), {reg, Client}, 120000). %% @doc Unregister clientId with pid. -spec(unreg(binary()) -> ok). unreg(ClientId) when is_binary(ClientId) -> - gen_server2:cast(pick(ClientId), {unreg, ClientId, self()}). + gen_server:cast(pick(ClientId), {unreg, ClientId, self()}). pick(ClientId) -> gproc_pool:pick_worker(?POOL, ClientId). @@ -84,15 +81,6 @@ init([Pool, Id, StatsFun]) -> ?GPROC_POOL(join, Pool, Id), {ok, #state{pool = Pool, id = Id, statsfun = StatsFun, monitors = dict:new()}}. -prioritise_call(Req, _From, _Len, _State) -> - case Req of {reg, _Client} -> 2; _ -> 1 end. - -prioritise_cast(Msg, _Len, _State) -> - case Msg of {unreg, _ClientId, _Pid} -> 9; _ -> 1 end. - -prioritise_info(_Msg, _Len, _State) -> - 3. - handle_call({reg, Client = #mqtt_client{client_id = ClientId, client_pid = Pid}}, _From, State) -> case lookup_proc(ClientId) of diff --git a/src/emqx_pubsub.erl b/src/emqx_pubsub.erl index 873cd1c2b..03647fa3c 100644 --- a/src/emqx_pubsub.erl +++ b/src/emqx_pubsub.erl @@ -16,9 +16,7 @@ -module(emqx_pubsub). --behaviour(gen_server2). - --author("Feng Lee "). +-behaviour(gen_server). -include("emqx.hrl"). @@ -48,7 +46,8 @@ -spec(start_link(atom(), pos_integer(), list()) -> {ok, pid()} | ignore | {error, term()}). start_link(Pool, Id, Env) -> - gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []). + gen_server:start_link({local, ?PROC_NAME(?MODULE, Id)}, + ?MODULE, [Pool, Id, Env], [{hibernate_after, 10000}]). %%-------------------------------------------------------------------- %% PubSub API @@ -152,10 +151,10 @@ async_unsubscribe(Topic, Subscriber, Options) -> cast(pick(Topic), {unsubscribe, Topic, Subscriber, Options}). call(PubSub, Req) when is_pid(PubSub) -> - gen_server2:call(PubSub, Req, infinity). + gen_server:call(PubSub, Req, infinity). cast(PubSub, Msg) when is_pid(PubSub) -> - gen_server2:cast(PubSub, Msg). + gen_server:cast(PubSub, Msg). pick(Topic) -> gproc_pool:pick_worker(pubsub, Topic). @@ -166,8 +165,7 @@ pick(Topic) -> init([Pool, Id, Env]) -> ?GPROC_POOL(join, Pool, Id), - {ok, #state{pool = Pool, id = Id, env = Env}, - hibernate, {backoff, 2000, 2000, 20000}}. + {ok, #state{pool = Pool, id = Id, env = Env}, hibernate}. handle_call({subscribe, Topic, Subscriber, Options}, _From, State) -> add_subscriber(Topic, Subscriber, Options), @@ -247,8 +245,8 @@ setstats(State) -> State. reply(Reply, State) -> - {reply, Reply, State, hibernate}. + {reply, Reply, State}. noreply(State) -> - {noreply, State, hibernate}. + {noreply, State}. diff --git a/src/emqx_server.erl b/src/emqx_server.erl index 1abb0d24b..47aafad49 100644 --- a/src/emqx_server.erl +++ b/src/emqx_server.erl @@ -16,9 +16,7 @@ -module(emqx_server). --behaviour(gen_server2). - --author("Feng Lee "). +-behaviour(gen_server). -include("emqx.hrl"). @@ -51,7 +49,8 @@ %% @doc Start the server -spec(start_link(atom(), pos_integer(), list()) -> {ok, pid()} | ignore | {error, term()}). start_link(Pool, Id, Env) -> - gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []). + gen_server:start_link({local, ?PROC_NAME(?MODULE, Id)}, + ?MODULE, [Pool, Id, Env], [{hibernate_after, 10000}]). %%-------------------------------------------------------------------- %% PubSub API @@ -167,10 +166,10 @@ subscribed(Topic, {SubId, SubPid}) when is_binary(Topic), is_binary(SubId), is_p ets:member(mqtt_subproperty, {Topic, {SubId, SubPid}}). call(Server, Req) -> - gen_server2:call(Server, Req, infinity). + gen_server:call(Server, Req, infinity). cast(Server, Msg) when is_pid(Server) -> - gen_server2:cast(Server, Msg). + gen_server:cast(Server, Msg). pick(SubPid) when is_pid(SubPid) -> gproc_pool:pick_worker(server, SubPid); @@ -190,7 +189,7 @@ init([Pool, Id, Env]) -> ?GPROC_POOL(join, Pool, Id), State = #state{pool = Pool, id = Id, env = Env, subids = #{}, submon = emqx_pmon:new()}, - {ok, State, hibernate, {backoff, 2000, 2000, 20000}}. + {ok, State, hibernate}. handle_call({subscribe, Topic, Subscriber, Options}, _From, State) -> case do_subscribe(Topic, Subscriber, Options, State) of @@ -321,8 +320,8 @@ setstats(State) -> ets:info(mqtt_subscription, size)), State. reply(Reply, State) -> - {reply, Reply, State, hibernate}. + {reply, Reply, State}. noreply(State) -> - {noreply, State, hibernate}. + {noreply, State}. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 8c72cd499..44cd44e00 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -45,7 +45,7 @@ -module(emqx_session). --behaviour(gen_server2). +-behaviour(gen_server). -author("Feng Lee "). @@ -73,9 +73,8 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -%% gen_server2 Message Priorities --export([prioritise_call/4, prioritise_cast/3, prioritise_info/3, - handle_pre_hibernate/1]). +%% TODO: gen_server Message Priorities +-export([handle_pre_hibernate/1]). -define(MQueue, emqx_mqueue). @@ -175,7 +174,8 @@ %% @doc Start a Session -spec(start_link(boolean(), {mqtt_client_id(), mqtt_username()}, pid()) -> {ok, pid()} | {error, term()}). start_link(CleanSess, {ClientId, Username}, ClientPid) -> - gen_server2:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid], []). + gen_server:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid], + [{hibernate_after, 10000}]). %%-------------------------------------------------------------------- %% PubSub API @@ -183,14 +183,14 @@ start_link(CleanSess, {ClientId, Username}, ClientPid) -> %% @doc Subscribe topics -spec(subscribe(pid(), [{binary(), [emqx_topic:option()]}]) -> ok). -subscribe(Session, TopicTable) ->%%TODO: the ack function??... - gen_server2:cast(Session, {subscribe, self(), TopicTable, fun(_) -> ok end}). +subscribe(Session, TopicTable) -> %%TODO: the ack function??... + gen_server:cast(Session, {subscribe, self(), TopicTable, fun(_) -> ok end}). -spec(subscribe(pid(), mqtt_packet_id(), [{binary(), [emqx_topic:option()]}]) -> ok). subscribe(Session, PacketId, TopicTable) -> %%TODO: the ack function??... From = self(), AckFun = fun(GrantedQos) -> From ! {suback, PacketId, GrantedQos} end, - gen_server2:cast(Session, {subscribe, From, TopicTable, AckFun}). + gen_server:cast(Session, {subscribe, From, TopicTable, AckFun}). %% @doc Publish Message -spec(publish(pid(), mqtt_message()) -> ok | {error, term()}). @@ -204,50 +204,50 @@ publish(_Session, Msg = #mqtt_message{qos = ?QOS_1}) -> publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) -> %% Publish QoS2 to Session - gen_server2:call(Session, {publish, Msg}, ?TIMEOUT). + gen_server:call(Session, {publish, Msg}, ?TIMEOUT). %% @doc PubAck Message -spec(puback(pid(), mqtt_packet_id()) -> ok). puback(Session, PacketId) -> - gen_server2:cast(Session, {puback, PacketId}). + gen_server:cast(Session, {puback, PacketId}). -spec(pubrec(pid(), mqtt_packet_id()) -> ok). pubrec(Session, PacketId) -> - gen_server2:cast(Session, {pubrec, PacketId}). + gen_server:cast(Session, {pubrec, PacketId}). -spec(pubrel(pid(), mqtt_packet_id()) -> ok). pubrel(Session, PacketId) -> - gen_server2:cast(Session, {pubrel, PacketId}). + gen_server:cast(Session, {pubrel, PacketId}). -spec(pubcomp(pid(), mqtt_packet_id()) -> ok). pubcomp(Session, PacketId) -> - gen_server2:cast(Session, {pubcomp, PacketId}). + gen_server:cast(Session, {pubcomp, PacketId}). %% @doc Unsubscribe the topics -spec(unsubscribe(pid(), [{binary(), [emqx_topic:option()]}]) -> ok). unsubscribe(Session, TopicTable) -> - gen_server2:cast(Session, {unsubscribe, self(), TopicTable}). + gen_server:cast(Session, {unsubscribe, self(), TopicTable}). %% @doc Resume the session -spec(resume(pid(), mqtt_client_id(), pid()) -> ok). resume(Session, ClientId, ClientPid) -> - gen_server2:cast(Session, {resume, ClientId, ClientPid}). + gen_server:cast(Session, {resume, ClientId, ClientPid}). %% @doc Get session state state(Session) when is_pid(Session) -> - gen_server2:call(Session, state). + gen_server:call(Session, state). %% @doc Get session info -spec(info(pid() | #state{}) -> list(tuple())). info(Session) when is_pid(Session) -> - gen_server2:call(Session, info); + gen_server:call(Session, info); info(State) when is_record(State, state) -> ?record_to_proplist(state, State, ?INFO_KEYS). -spec(stats(pid() | #state{}) -> list({atom(), non_neg_integer()})). stats(Session) when is_pid(Session) -> - gen_server2:call(Session, stats); + gen_server:call(Session, stats); stats(#state{max_subscriptions = MaxSubscriptions, subscriptions = Subscriptions, @@ -272,7 +272,7 @@ stats(#state{max_subscriptions = MaxSubscriptions, %% @doc Destroy the session -spec(destroy(pid(), mqtt_client_id()) -> ok). destroy(Session, ClientId) -> - gen_server2:cast(Session, {destroy, ClientId}). + gen_server:cast(Session, {destroy, ClientId}). %%-------------------------------------------------------------------- %% gen_server Callbacks @@ -311,7 +311,7 @@ init([CleanSess, {ClientId, Username}, ClientPid]) -> created_at = os:timestamp()}, emqx_sm:register_session(ClientId, CleanSess, info(State)), emqx_hooks:run('session.created', [ClientId, Username]), - {ok, emit_stats(State), hibernate, {backoff, 1000, 1000, 10000}}. + {ok, emit_stats(State), hibernate}. init_stats(Keys) -> lists:foreach(fun(K) -> put(K, 0) end, Keys). @@ -319,30 +319,6 @@ init_stats(Keys) -> binding(ClientPid) -> case node(ClientPid) =:= node() of true -> local; false -> remote end. -prioritise_call(Msg, _From, _Len, _State) -> - case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end. - -prioritise_cast(Msg, _Len, _State) -> - case Msg of - {destroy, _} -> 10; - {resume, _, _} -> 9; - {pubrel, _} -> 8; - {pubcomp, _} -> 8; - {pubrec, _} -> 8; - {puback, _} -> 7; - {unsubscribe, _, _} -> 6; - {subscribe, _, _} -> 5; - _ -> 0 - end. - -prioritise_info(Msg, _Len, _State) -> - case Msg of - {'EXIT', _, _} -> 10; - {timeout, _, _} -> 5; - {dispatch, _, _} -> 1; - _ -> 0 - end. - handle_pre_hibernate(State) -> {hibernate, emqx_gc:reset_conn_gc_count(#state.force_gc_count, emit_stats(State))}. @@ -406,7 +382,7 @@ handle_cast({subscribe, From, TopicTable, AckFun}, {[NewQos|QosAcc], SubMap1} end, {[], Subscriptions}, TopicTable), AckFun(lists:reverse(GrantedQos)), - hibernate(emit_stats(State#state{subscriptions = Subscriptions1})); + {noreply, emit_stats(State#state{subscriptions = Subscriptions1}), hibernate}; handle_cast({unsubscribe, From, TopicTable}, State = #state{client_id = ClientId, @@ -428,7 +404,7 @@ handle_cast({unsubscribe, From, TopicTable}, SubMap end end, Subscriptions, TopicTable), - hibernate(emit_stats(State#state{subscriptions = Subscriptions1})); + {noreply, emit_stats(State#state{subscriptions = Subscriptions1}), hibernate}; %% PUBACK: handle_cast({puback, PacketId}, State = #state{inflight = Inflight}) -> @@ -525,7 +501,7 @@ handle_cast({resume, ClientId, ClientPid}, end, %% Replay delivery and Dequeue pending messages - hibernate(emit_stats(dequeue(retry_delivery(true, State1)))); + {noreply, emit_stats(dequeue(retry_delivery(true, State1)))}; handle_cast({destroy, ClientId}, State = #state{client_id = ClientId, client_pid = undefined}) -> @@ -543,21 +519,21 @@ handle_cast(Msg, State) -> %% Ignore Messages delivered by self handle_info({dispatch, _Topic, #mqtt_message{from = {ClientId, _}}}, State = #state{client_id = ClientId, ignore_loop_deliver = true}) -> - hibernate(State); + {noreply, State}; %% Dispatch Message handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) -> - hibernate(gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State))); + {noreply, gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State))}; %% Do nothing if the client has been disconnected. handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) -> - hibernate(emit_stats(State#state{retry_timer = undefined})); + {noreply, emit_stats(State#state{retry_timer = undefined})}; handle_info({timeout, _Timer, retry_delivery}, State) -> - hibernate(emit_stats(retry_delivery(false, State#state{retry_timer = undefined}))); + {noreply, emit_stats(retry_delivery(false, State#state{retry_timer = undefined}))}; handle_info({timeout, _Timer, check_awaiting_rel}, State) -> - hibernate(expire_awaiting_rel(emit_stats(State#state{await_rel_timer = undefined}))); + {noreply, expire_awaiting_rel(emit_stats(State#state{await_rel_timer = undefined}))}; handle_info({timeout, _Timer, expired}, State) -> ?LOG(info, "Expired, shutdown now.", [], State), @@ -574,17 +550,17 @@ handle_info({'EXIT', ClientPid, Reason}, ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], State), ExpireTimer = start_timer(Interval, expired), State1 = State#state{client_pid = undefined, expiry_timer = ExpireTimer}, - hibernate(emit_stats(State1)); + {noreply, emit_stats(State1), hibernate}; handle_info({'EXIT', Pid, _Reason}, State = #state{old_client_pid = Pid}) -> %%ignore - hibernate(State); + {noreply, State, hibernate}; handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = ClientPid}) -> ?LOG(error, "Unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p", [ClientPid, Pid, Reason], State), - hibernate(State); + {noreply, State, hibernate}; handle_info(Info, Session) -> ?UNEXPECTED_INFO(Info, Session). @@ -857,9 +833,6 @@ inc_stats(Key) -> put(Key, get(Key) + 1). reply(Reply, State) -> {reply, Reply, State, hibernate}. -hibernate(State) -> - {noreply, State, hibernate}. - shutdown(Reason, State) -> {stop, {shutdown, Reason}, State}. diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index aca1f77f1..d75f68984 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -16,9 +16,7 @@ -module(emqx_sm). --author("Feng Lee "). - --behaviour(gen_server2). +-behaviour(gen_server). -include("emqx.hrl"). @@ -44,9 +42,6 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -%% gen_server2 priorities --export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]). - -record(state, {pool, id, monitors}). -define(POOL, ?MODULE). @@ -78,7 +73,7 @@ mnesia(copy) -> %% @doc Start a session manager -spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}). start_link(Pool, Id) -> - gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []). + gen_server:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []). %% @doc Start a session -spec(start_session(boolean(), {binary(), binary() | undefined}) -> {ok, pid(), boolean()} | {error, term()}). @@ -123,7 +118,7 @@ dispatch(ClientId, Topic, Msg) -> end. call(SM, Req) -> - gen_server2:call(SM, Req, ?TIMEOUT). %%infinity). + gen_server:call(SM, Req, ?TIMEOUT). %%infinity). %% @doc for debug. local_sessions() -> @@ -137,15 +132,6 @@ init([Pool, Id]) -> ?GPROC_POOL(join, Pool, Id), {ok, #state{pool = Pool, id = Id, monitors = dict:new()}}. -prioritise_call(_Msg, _From, _Len, _State) -> - 1. - -prioritise_cast(_Msg, _Len, _State) -> - 0. - -prioritise_info(_Msg, _Len, _State) -> - 2. - %% Persistent Session handle_call({start_session, false, {ClientId, Username}, ClientPid}, _From, State) -> case lookup_session(ClientId) of diff --git a/src/emqx_ws_client.erl b/src/emqx_ws_client.erl index bac0f48d3..b5f8d7b51 100644 --- a/src/emqx_ws_client.erl +++ b/src/emqx_ws_client.erl @@ -18,7 +18,7 @@ -module(emqx_ws_client). --behaviour(gen_server2). +-behaviour(gen_server). -author("Feng Lee "). @@ -46,8 +46,8 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -%% gen_server2 Callbacks --export([prioritise_call/4, prioritise_info/3, handle_pre_hibernate/1]). +%% TODO: remove ... +-export([handle_pre_hibernate/1]). %% WebSocket Client State -record(wsclient_state, {ws_pid, peername, connection, proto_state, keepalive, @@ -61,17 +61,17 @@ %% @doc Start WebSocket Client. start_link(Env, WsPid, Req, ReplyChannel) -> - gen_server2:start_link(?MODULE, [Env, WsPid, Req, ReplyChannel], - [{spawn_opt, ?FULLSWEEP_OPTS}]). %% Tune GC. + gen_server:start_link(?MODULE, [Env, WsPid, Req, ReplyChannel], + [[{hibernate_after, 10000}]]). info(CPid) -> - gen_server2:call(CPid, info). + gen_server:call(CPid, info). stats(CPid) -> - gen_server2:call(CPid, stats). + gen_server:call(CPid, stats). kick(CPid) -> - gen_server2:call(CPid, kick). + gen_server:call(CPid, kick). subscribe(CPid, TopicTable) -> CPid ! {subscribe, TopicTable}. @@ -80,10 +80,10 @@ unsubscribe(CPid, Topics) -> CPid ! {unsubscribe, Topics}. session(CPid) -> - gen_server2:call(CPid, session). + gen_server:call(CPid, session). clean_acl_cache(CPid, Topic) -> - gen_server2:call(CPid, {clean_acl_cache, Topic}). + gen_server:call(CPid, {clean_acl_cache, Topic}). %%-------------------------------------------------------------------- %% gen_server Callbacks @@ -116,12 +116,6 @@ init([Env, WsPid, Req, ReplyChannel]) -> exit({shutdown, Reason}) end. -prioritise_call(Msg, _From, _Len, _State) -> - case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end. - -prioritise_info(Msg, _Len, _State) -> - case Msg of {redeliver, _} -> 5; _ -> 0 end. - handle_pre_hibernate(State = #wsclient_state{ws_pid = WsPid}) -> erlang:garbage_collect(WsPid), {hibernate, emqx_gc:reset_conn_gc_count(#wsclient_state.force_gc_count, emit_stats(State))}. diff --git a/src/gen_server2.erl b/src/gen_server2.erl deleted file mode 100644 index d5127f8d6..000000000 --- a/src/gen_server2.erl +++ /dev/null @@ -1,1361 +0,0 @@ -%% This file is a copy of gen_server.erl from the R13B-1 Erlang/OTP -%% distribution, with the following modifications: -%% -%% 1) the module name is gen_server2 -%% -%% 2) more efficient handling of selective receives in callbacks -%% gen_server2 processes drain their message queue into an internal -%% buffer before invoking any callback module functions. Messages are -%% dequeued from the buffer for processing. Thus the effective message -%% queue of a gen_server2 process is the concatenation of the internal -%% buffer and the real message queue. -%% As a result of the draining, any selective receive invoked inside a -%% callback is less likely to have to scan a large message queue. -%% -%% 3) gen_server2:cast is guaranteed to be order-preserving -%% The original code could reorder messages when communicating with a -%% process on a remote node that was not currently connected. -%% -%% 4) The callback module can optionally implement prioritise_call/4, -%% prioritise_cast/3 and prioritise_info/3. These functions take -%% Message, From, Length and State or just Message, Length and State -%% (where Length is the current number of messages waiting to be -%% processed) and return a single integer representing the priority -%% attached to the message, or 'drop' to ignore it (for -%% prioritise_cast/3 and prioritise_info/3 only). Messages with -%% higher priorities are processed before requests with lower -%% priorities. The default priority is 0. -%% -%% 5) The callback module can optionally implement -%% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be -%% called immediately prior to and post hibernation, respectively. If -%% handle_pre_hibernate returns {hibernate, NewState} then the process -%% will hibernate. If the module does not implement -%% handle_pre_hibernate/1 then the default action is to hibernate. -%% -%% 6) init can return a 4th arg, {backoff, InitialTimeout, -%% MinimumTimeout, DesiredHibernatePeriod} (all in milliseconds, -%% 'infinity' does not make sense here). Then, on all callbacks which -%% can return a timeout (including init), timeout can be -%% 'hibernate'. When this is the case, the current timeout value will -%% be used (initially, the InitialTimeout supplied from init). After -%% this timeout has occurred, hibernation will occur as normal. Upon -%% awaking, a new current timeout value will be calculated. -%% -%% The purpose is that the gen_server2 takes care of adjusting the -%% current timeout value such that the process will increase the -%% timeout value repeatedly if it is unable to sleep for the -%% DesiredHibernatePeriod. If it is able to sleep for the -%% DesiredHibernatePeriod it will decrease the current timeout down to -%% the MinimumTimeout, so that the process is put to sleep sooner (and -%% hopefully stays asleep for longer). In short, should a process -%% using this receive a burst of messages, it should not hibernate -%% between those messages, but as the messages become less frequent, -%% the process will not only hibernate, it will do so sooner after -%% each message. -%% -%% When using this backoff mechanism, normal timeout values (i.e. not -%% 'hibernate') can still be used, and if they are used then the -%% handle_info(timeout, State) will be called as normal. In this case, -%% returning 'hibernate' from handle_info(timeout, State) will not -%% hibernate the process immediately, as it would if backoff wasn't -%% being used. Instead it'll wait for the current timeout as described -%% above. -%% -%% 7) The callback module can return from any of the handle_* -%% functions, a {become, Module, State} triple, or a {become, Module, -%% State, Timeout} quadruple. This allows the gen_server to -%% dynamically change the callback module. The State is the new state -%% which will be passed into any of the callback functions in the new -%% module. Note there is no form also encompassing a reply, thus if -%% you wish to reply in handle_call/3 and change the callback module, -%% you need to use gen_server2:reply/2 to issue the reply -%% manually. The init function can similarly return a 5th argument, -%% Module, in order to dynamically decide the callback module on init. -%% -%% 8) The callback module can optionally implement -%% format_message_queue/2 which is the equivalent of format_status/2 -%% but where the second argument is specifically the priority_queue -%% which contains the prioritised message_queue. -%% -%% 9) The function with_state/2 can be used to debug a process with -%% heavyweight state (without needing to copy the entire state out of -%% process as sys:get_status/1 would). Pass through a function which -%% can be invoked on the state, get back the result. The state is not -%% modified. -%% -%% 10) an mcall/1 function has been added for performing multiple -%% call/3 in parallel. Unlike multi_call, which sends the same request -%% to same-named processes residing on a supplied list of nodes, it -%% operates on name/request pairs, where name is anything accepted by -%% call/3, i.e. a pid, global name, local name, or local name on a -%% particular node. -%% - -%% All modifications are (C) 2009-2013 GoPivotal, Inc. - -%% ``The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved via the world wide web at http://www.erlang.org/. -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. -%% -%% The Initial Developer of the Original Code is Ericsson Utvecklings AB. -%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings -%% AB. All Rights Reserved.'' -%% -%% $Id$ -%% --module(gen_server2). - -%%% --------------------------------------------------- -%%% -%%% The idea behind THIS server is that the user module -%%% provides (different) functions to handle different -%%% kind of inputs. -%%% If the Parent process terminates the Module:terminate/2 -%%% function is called. -%%% -%%% The user module should export: -%%% -%%% init(Args) -%%% ==> {ok, State} -%%% {ok, State, Timeout} -%%% {ok, State, Timeout, Backoff} -%%% {ok, State, Timeout, Backoff, Module} -%%% ignore -%%% {stop, Reason} -%%% -%%% handle_call(Msg, {From, Tag}, State) -%%% -%%% ==> {reply, Reply, State} -%%% {reply, Reply, State, Timeout} -%%% {noreply, State} -%%% {noreply, State, Timeout} -%%% {stop, Reason, Reply, State} -%%% Reason = normal | shutdown | Term terminate(State) is called -%%% -%%% handle_cast(Msg, State) -%%% -%%% ==> {noreply, State} -%%% {noreply, State, Timeout} -%%% {stop, Reason, State} -%%% Reason = normal | shutdown | Term terminate(State) is called -%%% -%%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ... -%%% -%%% ==> {noreply, State} -%%% {noreply, State, Timeout} -%%% {stop, Reason, State} -%%% Reason = normal | shutdown | Term, terminate(State) is called -%%% -%%% terminate(Reason, State) Let the user module clean up -%%% Reason = normal | shutdown | {shutdown, Term} | Term -%%% always called when server terminates -%%% -%%% ==> ok | Term -%%% -%%% handle_pre_hibernate(State) -%%% -%%% ==> {hibernate, State} -%%% {stop, Reason, State} -%%% Reason = normal | shutdown | Term, terminate(State) is called -%%% -%%% handle_post_hibernate(State) -%%% -%%% ==> {noreply, State} -%%% {stop, Reason, State} -%%% Reason = normal | shutdown | Term, terminate(State) is called -%%% -%%% The work flow (of the server) can be described as follows: -%%% -%%% User module Generic -%%% ----------- ------- -%%% start -----> start -%%% init <----- . -%%% -%%% loop -%%% handle_call <----- . -%%% -----> reply -%%% -%%% handle_cast <----- . -%%% -%%% handle_info <----- . -%%% -%%% terminate <----- . -%%% -%%% -----> reply -%%% -%%% -%%% --------------------------------------------------- - -%% API --export([start/3, start/4, - start_link/3, start_link/4, - call/2, call/3, - cast/2, reply/2, - abcast/2, abcast/3, - multi_call/2, multi_call/3, multi_call/4, - mcall/1, - with_state/2, - enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]). - -%% System exports --export([system_continue/3, - system_terminate/4, - system_code_change/4, - format_status/2]). - -%% Internal exports --export([init_it/6]). - --import(error_logger, [format/2]). - -%% State record --record(gs2_state, {parent, name, state, mod, time, - timeout_state, queue, debug, prioritisers}). - --ifdef(use_specs). - -%%%========================================================================= -%%% Specs. These exist only to shut up dialyzer's warnings -%%%========================================================================= - --type(gs2_state() :: #gs2_state{}). - --spec(handle_common_termination/3 :: - (any(), atom(), gs2_state()) -> no_return()). --spec(hibernate/1 :: (gs2_state()) -> no_return()). --spec(pre_hibernate/1 :: (gs2_state()) -> no_return()). --spec(system_terminate/4 :: (_, _, _, gs2_state()) -> no_return()). - --type(millis() :: non_neg_integer()). - -%%%========================================================================= -%%% API -%%%========================================================================= - --callback init(Args :: term()) -> - {ok, State :: term()} | - {ok, State :: term(), timeout() | hibernate} | - {ok, State :: term(), timeout() | hibernate, - {backoff, millis(), millis(), millis()}} | - {ok, State :: term(), timeout() | hibernate, - {backoff, millis(), millis(), millis()}, atom()} | - ignore | - {stop, Reason :: term()}. --callback handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: term()) -> - {reply, Reply :: term(), NewState :: term()} | - {reply, Reply :: term(), NewState :: term(), timeout() | hibernate} | - {noreply, NewState :: term()} | - {noreply, NewState :: term(), timeout() | hibernate} | - {stop, Reason :: term(), - Reply :: term(), NewState :: term()}. --callback handle_cast(Request :: term(), State :: term()) -> - {noreply, NewState :: term()} | - {noreply, NewState :: term(), timeout() | hibernate} | - {stop, Reason :: term(), NewState :: term()}. --callback handle_info(Info :: term(), State :: term()) -> - {noreply, NewState :: term()} | - {noreply, NewState :: term(), timeout() | hibernate} | - {stop, Reason :: term(), NewState :: term()}. --callback terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: term()) -> - ok | term(). --callback code_change(OldVsn :: (term() | {down, term()}), State :: term(), - Extra :: term()) -> - {ok, NewState :: term()} | {error, Reason :: term()}. - -%% It's not possible to define "optional" -callbacks, so putting specs -%% for handle_pre_hibernate/1 and handle_post_hibernate/1 will result -%% in warnings (the same applied for the behaviour_info before). - --else. - --export([behaviour_info/1]). - -behaviour_info(callbacks) -> - [{init,1},{handle_call,3},{handle_cast,2},{handle_info,2}, - {terminate,2},{code_change,3}]; -behaviour_info(_Other) -> - undefined. - --endif. - -%%% ----------------------------------------------------------------- -%%% Starts a generic server. -%%% start(Mod, Args, Options) -%%% start(Name, Mod, Args, Options) -%%% start_link(Mod, Args, Options) -%%% start_link(Name, Mod, Args, Options) where: -%%% Name ::= {local, atom()} | {global, atom()} -%%% Mod ::= atom(), callback module implementing the 'real' server -%%% Args ::= term(), init arguments (to Mod:init/1) -%%% Options ::= [{timeout, Timeout} | {debug, [Flag]}] -%%% Flag ::= trace | log | {logfile, File} | statistics | debug -%%% (debug == log && statistics) -%%% Returns: {ok, Pid} | -%%% {error, {already_started, Pid}} | -%%% {error, Reason} -%%% ----------------------------------------------------------------- -start(Mod, Args, Options) -> - gen:start(?MODULE, nolink, Mod, Args, Options). - -start(Name, Mod, Args, Options) -> - gen:start(?MODULE, nolink, Name, Mod, Args, Options). - -start_link(Mod, Args, Options) -> - gen:start(?MODULE, link, Mod, Args, Options). - -start_link(Name, Mod, Args, Options) -> - gen:start(?MODULE, link, Name, Mod, Args, Options). - - -%% ----------------------------------------------------------------- -%% Make a call to a generic server. -%% If the server is located at another node, that node will -%% be monitored. -%% If the client is trapping exits and is linked server termination -%% is handled here (? Shall we do that here (or rely on timeouts) ?). -%% ----------------------------------------------------------------- -call(Name, Request) -> - case catch gen:call(Name, '$gen_call', Request) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, call, [Name, Request]}}) - end. - -call(Name, Request, Timeout) -> - case catch gen:call(Name, '$gen_call', Request, Timeout) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, call, [Name, Request, Timeout]}}) - end. - -%% ----------------------------------------------------------------- -%% Make a cast to a generic server. -%% ----------------------------------------------------------------- -cast({global,Name}, Request) -> - catch global:send(Name, cast_msg(Request)), - ok; -cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) -> - do_cast(Dest, Request); -cast(Dest, Request) when is_atom(Dest) -> - do_cast(Dest, Request); -cast(Dest, Request) when is_pid(Dest) -> - do_cast(Dest, Request). - -do_cast(Dest, Request) -> - do_send(Dest, cast_msg(Request)), - ok. - -cast_msg(Request) -> {'$gen_cast',Request}. - -%% ----------------------------------------------------------------- -%% Send a reply to the client. -%% ----------------------------------------------------------------- -reply({To, Tag}, Reply) -> - catch To ! {Tag, Reply}. - -%% ----------------------------------------------------------------- -%% Asyncronous broadcast, returns nothing, it's just send'n pray -%% ----------------------------------------------------------------- -abcast(Name, Request) when is_atom(Name) -> - do_abcast([node() | nodes()], Name, cast_msg(Request)). - -abcast(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) -> - do_abcast(Nodes, Name, cast_msg(Request)). - -do_abcast([Node|Nodes], Name, Msg) when is_atom(Node) -> - do_send({Name,Node},Msg), - do_abcast(Nodes, Name, Msg); -do_abcast([], _,_) -> abcast. - -%%% ----------------------------------------------------------------- -%%% Make a call to servers at several nodes. -%%% Returns: {[Replies],[BadNodes]} -%%% A Timeout can be given -%%% -%%% A middleman process is used in case late answers arrives after -%%% the timeout. If they would be allowed to glog the callers message -%%% queue, it would probably become confused. Late answers will -%%% now arrive to the terminated middleman and so be discarded. -%%% ----------------------------------------------------------------- -multi_call(Name, Req) - when is_atom(Name) -> - do_multi_call([node() | nodes()], Name, Req, infinity). - -multi_call(Nodes, Name, Req) - when is_list(Nodes), is_atom(Name) -> - do_multi_call(Nodes, Name, Req, infinity). - -multi_call(Nodes, Name, Req, infinity) -> - do_multi_call(Nodes, Name, Req, infinity); -multi_call(Nodes, Name, Req, Timeout) - when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 -> - do_multi_call(Nodes, Name, Req, Timeout). - -%%% ----------------------------------------------------------------- -%%% Make multiple calls to multiple servers, given pairs of servers -%%% and messages. -%%% Returns: {[{Dest, Reply}], [{Dest, Error}]} -%%% -%%% Dest can be pid() | RegName :: atom() | -%%% {Name :: atom(), Node :: atom()} | {global, Name :: atom()} -%%% -%%% A middleman process is used to avoid clogging up the callers -%%% message queue. -%%% ----------------------------------------------------------------- -mcall(CallSpecs) -> - Tag = make_ref(), - {_, MRef} = spawn_monitor( - fun() -> - Refs = lists:foldl( - fun ({Dest, _Request}=S, Dict) -> - dict:store(do_mcall(S), Dest, Dict) - end, dict:new(), CallSpecs), - collect_replies(Tag, Refs, [], []) - end), - receive - {'DOWN', MRef, _, _, {Tag, Result}} -> Result; - {'DOWN', MRef, _, _, Reason} -> exit(Reason) - end. - -do_mcall({{global,Name}=Dest, Request}) -> - %% whereis_name is simply an ets lookup, and is precisely what - %% global:send/2 does, yet we need a Ref to put in the call to the - %% server, so invoking whereis_name makes a lot more sense here. - case global:whereis_name(Name) of - Pid when is_pid(Pid) -> - MRef = erlang:monitor(process, Pid), - catch msend(Pid, MRef, Request), - MRef; - undefined -> - Ref = make_ref(), - self() ! {'DOWN', Ref, process, Dest, noproc}, - Ref - end; -do_mcall({{Name,Node}=Dest, Request}) when is_atom(Name), is_atom(Node) -> - {_Node, MRef} = start_monitor(Node, Name), %% NB: we don't handle R6 - catch msend(Dest, MRef, Request), - MRef; -do_mcall({Dest, Request}) when is_atom(Dest); is_pid(Dest) -> - MRef = erlang:monitor(process, Dest), - catch msend(Dest, MRef, Request), - MRef. - -msend(Dest, MRef, Request) -> - erlang:send(Dest, {'$gen_call', {self(), MRef}, Request}, [noconnect]). - -collect_replies(Tag, Refs, Replies, Errors) -> - case dict:size(Refs) of - 0 -> exit({Tag, {Replies, Errors}}); - _ -> receive - {MRef, Reply} -> - {Refs1, Replies1} = handle_call_result(MRef, Reply, - Refs, Replies), - collect_replies(Tag, Refs1, Replies1, Errors); - {'DOWN', MRef, _, _, Reason} -> - Reason1 = case Reason of - noconnection -> nodedown; - _ -> Reason - end, - {Refs1, Errors1} = handle_call_result(MRef, Reason1, - Refs, Errors), - collect_replies(Tag, Refs1, Replies, Errors1) - end - end. - -handle_call_result(MRef, Result, Refs, AccList) -> - %% we avoid the mailbox scanning cost of a call to erlang:demonitor/{1,2} - %% here, so we must cope with MRefs that we've already seen and erased - case dict:find(MRef, Refs) of - {ok, Pid} -> {dict:erase(MRef, Refs), [{Pid, Result}|AccList]}; - _ -> {Refs, AccList} - end. - -%% ----------------------------------------------------------------- -%% Apply a function to a generic server's state. -%% ----------------------------------------------------------------- -with_state(Name, Fun) -> - case catch gen:call(Name, '$with_state', Fun, infinity) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, with_state, [Name, Fun]}}) - end. - -%%----------------------------------------------------------------- -%% enter_loop(Mod, Options, State, , , ) ->_ -%% -%% Description: Makes an existing process into a gen_server. -%% The calling process will enter the gen_server receive -%% loop and become a gen_server process. -%% The process *must* have been started using one of the -%% start functions in proc_lib, see proc_lib(3). -%% The user is responsible for any initialization of the -%% process, including registering a name for it. -%%----------------------------------------------------------------- -enter_loop(Mod, Options, State) -> - enter_loop(Mod, Options, State, self(), infinity, undefined). - -enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) -> - enter_loop(Mod, Options, State, self(), infinity, Backoff); - -enter_loop(Mod, Options, State, ServerName = {_, _}) -> - enter_loop(Mod, Options, State, ServerName, infinity, undefined); - -enter_loop(Mod, Options, State, Timeout) -> - enter_loop(Mod, Options, State, self(), Timeout, undefined). - -enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) -> - enter_loop(Mod, Options, State, ServerName, infinity, Backoff); - -enter_loop(Mod, Options, State, ServerName, Timeout) -> - enter_loop(Mod, Options, State, ServerName, Timeout, undefined). - -enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) -> - Name = get_proc_name(ServerName), - Parent = get_parent(), - Debug = debug_options(Name, Options), - Queue = priority_queue:new(), - Backoff1 = extend_backoff(Backoff), - loop(find_prioritisers( - #gs2_state { parent = Parent, name = Name, state = State, - mod = Mod, time = Timeout, timeout_state = Backoff1, - queue = Queue, debug = Debug })). - -%%%======================================================================== -%%% Gen-callback functions -%%%======================================================================== - -%%% --------------------------------------------------- -%%% Initiate the new process. -%%% Register the name using the Rfunc function -%%% Calls the Mod:init/Args function. -%%% Finally an acknowledge is sent to Parent and the main -%%% loop is entered. -%%% --------------------------------------------------- -init_it(Starter, self, Name, Mod, Args, Options) -> - init_it(Starter, self(), Name, Mod, Args, Options); -init_it(Starter, Parent, Name0, Mod, Args, Options) -> - Name = name(Name0), - Debug = debug_options(Name, Options), - Queue = priority_queue:new(), - GS2State = find_prioritisers( - #gs2_state { parent = Parent, - name = Name, - mod = Mod, - queue = Queue, - debug = Debug }), - case catch Mod:init(Args) of - {ok, State} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(GS2State #gs2_state { state = State, - time = infinity, - timeout_state = undefined }); - {ok, State, Timeout} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(GS2State #gs2_state { state = State, - time = Timeout, - timeout_state = undefined }); - {ok, State, Timeout, Backoff = {backoff, _, _, _}} -> - Backoff1 = extend_backoff(Backoff), - proc_lib:init_ack(Starter, {ok, self()}), - loop(GS2State #gs2_state { state = State, - time = Timeout, - timeout_state = Backoff1 }); - {ok, State, Timeout, Backoff = {backoff, _, _, _}, Mod1} -> - Backoff1 = extend_backoff(Backoff), - proc_lib:init_ack(Starter, {ok, self()}), - loop(find_prioritisers( - GS2State #gs2_state { mod = Mod1, - state = State, - time = Timeout, - timeout_state = Backoff1 })); - {stop, Reason} -> - %% For consistency, we must make sure that the - %% registered name (if any) is unregistered before - %% the parent process is notified about the failure. - %% (Otherwise, the parent process could get - %% an 'already_started' error if it immediately - %% tried starting the process again.) - unregister_name(Name0), - proc_lib:init_ack(Starter, {error, Reason}), - exit(Reason); - ignore -> - unregister_name(Name0), - proc_lib:init_ack(Starter, ignore), - exit(normal); - {'EXIT', Reason} -> - unregister_name(Name0), - proc_lib:init_ack(Starter, {error, Reason}), - exit(Reason); - Else -> - Error = {bad_return_value, Else}, - proc_lib:init_ack(Starter, {error, Error}), - exit(Error) - end. - -name({local,Name}) -> Name; -name({global,Name}) -> Name; -%% name(Pid) when is_pid(Pid) -> Pid; -%% when R12 goes away, drop the line beneath and uncomment the line above -name(Name) -> Name. - -unregister_name({local,Name}) -> - _ = (catch unregister(Name)); -unregister_name({global,Name}) -> - _ = global:unregister_name(Name); -unregister_name(Pid) when is_pid(Pid) -> - Pid; -%% Under R12 let's just ignore it, as we have a single term as Name. -%% On R13 it will never get here, as we get tuple with 'local/global' atom. -unregister_name(_Name) -> ok. - -extend_backoff(undefined) -> - undefined; -extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) -> - {backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod, rand:seed(exsplus)}. - -%%%======================================================================== -%%% Internal functions -%%%======================================================================== -%%% --------------------------------------------------- -%%% The MAIN loop. -%%% --------------------------------------------------- -loop(GS2State = #gs2_state { time = hibernate, - timeout_state = undefined, - queue = Queue }) -> - case priority_queue:is_empty(Queue) of - true -> - pre_hibernate(GS2State); - false -> - process_next_msg(GS2State) - end; - -loop(GS2State) -> - process_next_msg(drain(GS2State)). - -drain(GS2State) -> - receive - Input -> drain(in(Input, GS2State)) - after 0 -> GS2State - end. - -process_next_msg(GS2State = #gs2_state { time = Time, - timeout_state = TimeoutState, - queue = Queue }) -> - case priority_queue:out(Queue) of - {{value, Msg}, Queue1} -> - process_msg(Msg, GS2State #gs2_state { queue = Queue1 }); - {empty, Queue1} -> - {Time1, HibOnTimeout} - = case {Time, TimeoutState} of - {hibernate, {backoff, Current, _Min, _Desired, _RSt}} -> - {Current, true}; - {hibernate, _} -> - %% wake_hib/7 will set Time to hibernate. If - %% we were woken and didn't receive a msg - %% then we will get here and need a sensible - %% value for Time1, otherwise we crash. - %% R13B1 always waits infinitely when waking - %% from hibernation, so that's what we do - %% here too. - {infinity, false}; - _ -> {Time, false} - end, - receive - Input -> - %% Time could be 'hibernate' here, so *don't* call loop - process_next_msg( - drain(in(Input, GS2State #gs2_state { queue = Queue1 }))) - after Time1 -> - case HibOnTimeout of - true -> - pre_hibernate( - GS2State #gs2_state { queue = Queue1 }); - false -> - process_msg(timeout, - GS2State #gs2_state { queue = Queue1 }) - end - end - end. - -wake_hib(GS2State = #gs2_state { timeout_state = TS }) -> - TimeoutState1 = case TS of - undefined -> - undefined; - {SleptAt, TimeoutState} -> - adjust_timeout_state(SleptAt, - erlang:monotonic_time(), - TimeoutState) - end, - post_hibernate( - drain(GS2State #gs2_state { timeout_state = TimeoutState1 })). - -hibernate(GS2State = #gs2_state { timeout_state = TimeoutState }) -> - TS = case TimeoutState of - undefined -> undefined; - {backoff, _, _, _, _} -> {erlang:monotonic_time(), - TimeoutState} - end, - proc_lib:hibernate(?MODULE, wake_hib, - [GS2State #gs2_state { timeout_state = TS }]). - -pre_hibernate(GS2State = #gs2_state { state = State, - mod = Mod }) -> - case erlang:function_exported(Mod, handle_pre_hibernate, 1) of - true -> - case catch Mod:handle_pre_hibernate(State) of - {hibernate, NState} -> - hibernate(GS2State #gs2_state { state = NState } ); - Reply -> - handle_common_termination(Reply, pre_hibernate, GS2State) - end; - false -> - hibernate(GS2State) - end. - -post_hibernate(GS2State = #gs2_state { state = State, - mod = Mod }) -> - case erlang:function_exported(Mod, handle_post_hibernate, 1) of - true -> - case catch Mod:handle_post_hibernate(State) of - {noreply, NState} -> - process_next_msg(GS2State #gs2_state { state = NState, - time = infinity }); - {noreply, NState, Time} -> - process_next_msg(GS2State #gs2_state { state = NState, - time = Time }); - Reply -> - handle_common_termination(Reply, post_hibernate, GS2State) - end; - false -> - %% use hibernate here, not infinity. This matches - %% R13B. The key is that we should be able to get through - %% to process_msg calling sys:handle_system_msg with Time - %% still set to hibernate, iff that msg is the very msg - %% that woke us up (or the first msg we receive after - %% waking up). - process_next_msg(GS2State #gs2_state { time = hibernate }) - end. - -adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO, - DesiredHibPeriod, RandomState}) -> - NapLengthMicros = erlang:convert_time_unit(AwokeAt - SleptAt, - native, micro_seconds), - CurrentMicros = CurrentTO * 1000, - MinimumMicros = MinimumTO * 1000, - DesiredHibMicros = DesiredHibPeriod * 1000, - GapBetweenMessagesMicros = NapLengthMicros + CurrentMicros, - Base = - %% If enough time has passed between the last two messages then we - %% should consider sleeping sooner. Otherwise stay awake longer. - case GapBetweenMessagesMicros > (MinimumMicros + DesiredHibMicros) of - true -> lists:max([MinimumTO, CurrentTO div 2]); - false -> CurrentTO - end, - {Extra, RandomState1} = rand:uniform_s(Base, RandomState), - CurrentTO1 = Base + Extra, - {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}. - -in({'$gen_cast', Msg} = Input, - GS2State = #gs2_state { prioritisers = {_, F, _} }) -> - in(Input, F(Msg, GS2State), GS2State); -in({'$gen_call', From, Msg} = Input, - GS2State = #gs2_state { prioritisers = {F, _, _} }) -> - in(Input, F(Msg, From, GS2State), GS2State); -in({'$with_state', _From, _Fun} = Input, GS2State) -> - in(Input, 0, GS2State); -in({'EXIT', Parent, _R} = Input, GS2State = #gs2_state { parent = Parent }) -> - in(Input, infinity, GS2State); -in({system, _From, _Req} = Input, GS2State) -> - in(Input, infinity, GS2State); -in(Input, GS2State = #gs2_state { prioritisers = {_, _, F} }) -> - in(Input, F(Input, GS2State), GS2State). - -in(_Input, drop, GS2State) -> - GS2State; - -in(Input, Priority, GS2State = #gs2_state { queue = Queue }) -> - GS2State # gs2_state { queue = priority_queue:in(Input, Priority, Queue) }. - -process_msg({system, From, Req}, - GS2State = #gs2_state { parent = Parent, debug = Debug }) -> - %% gen_server puts Hib on the end as the 7th arg, but that version - %% of the fun seems not to be documented so leaving out for now. - sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, GS2State); -process_msg({'$with_state', From, Fun}, - GS2State = #gs2_state{state = State}) -> - reply(From, catch Fun(State)), - loop(GS2State); -process_msg({'EXIT', Parent, Reason} = Msg, - GS2State = #gs2_state { parent = Parent }) -> - terminate(Reason, Msg, GS2State); -process_msg(Msg, GS2State = #gs2_state { debug = [] }) -> - handle_msg(Msg, GS2State); -process_msg(Msg, GS2State = #gs2_state { name = Name, debug = Debug }) -> - Debug1 = sys:handle_debug(Debug, fun print_event/3, Name, {in, Msg}), - handle_msg(Msg, GS2State #gs2_state { debug = Debug1 }). - -%%% --------------------------------------------------- -%%% Send/recive functions -%%% --------------------------------------------------- -do_send(Dest, Msg) -> - catch erlang:send(Dest, Msg). - -do_multi_call(Nodes, Name, Req, infinity) -> - Tag = make_ref(), - Monitors = send_nodes(Nodes, Name, Tag, Req), - rec_nodes(Tag, Monitors, Name, undefined); -do_multi_call(Nodes, Name, Req, Timeout) -> - Tag = make_ref(), - Caller = self(), - Receiver = - spawn( - fun () -> - %% Middleman process. Should be unsensitive to regular - %% exit signals. The sychronization is needed in case - %% the receiver would exit before the caller started - %% the monitor. - process_flag(trap_exit, true), - Mref = erlang:monitor(process, Caller), - receive - {Caller,Tag} -> - Monitors = send_nodes(Nodes, Name, Tag, Req), - TimerId = erlang:start_timer(Timeout, self(), ok), - Result = rec_nodes(Tag, Monitors, Name, TimerId), - exit({self(),Tag,Result}); - {'DOWN',Mref,_,_,_} -> - %% Caller died before sending us the go-ahead. - %% Give up silently. - exit(normal) - end - end), - Mref = erlang:monitor(process, Receiver), - Receiver ! {self(),Tag}, - receive - {'DOWN',Mref,_,_,{Receiver,Tag,Result}} -> - Result; - {'DOWN',Mref,_,_,Reason} -> - %% The middleman code failed. Or someone did - %% exit(_, kill) on the middleman process => Reason==killed - exit(Reason) - end. - -send_nodes(Nodes, Name, Tag, Req) -> - send_nodes(Nodes, Name, Tag, Req, []). - -send_nodes([Node|Tail], Name, Tag, Req, Monitors) - when is_atom(Node) -> - Monitor = start_monitor(Node, Name), - %% Handle non-existing names in rec_nodes. - catch {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Req}, - send_nodes(Tail, Name, Tag, Req, [Monitor | Monitors]); -send_nodes([_Node|Tail], Name, Tag, Req, Monitors) -> - %% Skip non-atom Node - send_nodes(Tail, Name, Tag, Req, Monitors); -send_nodes([], _Name, _Tag, _Req, Monitors) -> - Monitors. - -%% Against old nodes: -%% If no reply has been delivered within 2 secs. (per node) check that -%% the server really exists and wait for ever for the answer. -%% -%% Against contemporary nodes: -%% Wait for reply, server 'DOWN', or timeout from TimerId. - -rec_nodes(Tag, Nodes, Name, TimerId) -> - rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId). - -rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) -> - receive - {'DOWN', R, _, _, _} -> - rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId); - {{Tag, N}, Reply} -> %% Tag is bound !!! - unmonitor(R), - rec_nodes(Tag, Tail, Name, Badnodes, - [{N,Reply}|Replies], Time, TimerId); - {timeout, TimerId, _} -> - unmonitor(R), - %% Collect all replies that already have arrived - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) - end; -rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) -> - %% R6 node - receive - {nodedown, N} -> - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId); - {{Tag, N}, Reply} -> %% Tag is bound !!! - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, Badnodes, - [{N,Reply}|Replies], 2000, TimerId); - {timeout, TimerId, _} -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - %% Collect all replies that already have arrived - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) - after Time -> - case rpc:call(N, erlang, whereis, [Name]) of - Pid when is_pid(Pid) -> % It exists try again. - rec_nodes(Tag, [N|Tail], Name, Badnodes, - Replies, infinity, TimerId); - _ -> % badnode - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, [N|Badnodes], - Replies, 2000, TimerId) - end - end; -rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) -> - case catch erlang:cancel_timer(TimerId) of - false -> % It has already sent it's message - receive - {timeout, TimerId, _} -> ok - after 0 -> - ok - end; - _ -> % Timer was cancelled, or TimerId was 'undefined' - ok - end, - {Replies, Badnodes}. - -%% Collect all replies that already have arrived -rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) -> - receive - {'DOWN', R, _, _, _} -> - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); - {{Tag, N}, Reply} -> %% Tag is bound !!! - unmonitor(R), - rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) - after 0 -> - unmonitor(R), - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) - end; -rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) -> - %% R6 node - receive - {nodedown, N} -> - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); - {{Tag, N}, Reply} -> %% Tag is bound !!! - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) - after 0 -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) - end; -rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) -> - {Replies, Badnodes}. - - -%%% --------------------------------------------------- -%%% Monitor functions -%%% --------------------------------------------------- - -start_monitor(Node, Name) when is_atom(Node), is_atom(Name) -> - if node() =:= nonode@nohost, Node =/= nonode@nohost -> - Ref = make_ref(), - self() ! {'DOWN', Ref, process, {Name, Node}, noconnection}, - {Node, Ref}; - true -> - case catch erlang:monitor(process, {Name, Node}) of - {'EXIT', _} -> - %% Remote node is R6 - monitor_node(Node, true), - Node; - Ref when is_reference(Ref) -> - {Node, Ref} - end - end. - -%% Cancels a monitor started with Ref=erlang:monitor(_, _). -unmonitor(Ref) when is_reference(Ref) -> - erlang:demonitor(Ref), - receive - {'DOWN', Ref, _, _, _} -> - true - after 0 -> - true - end. - -%%% --------------------------------------------------- -%%% Message handling functions -%%% --------------------------------------------------- - -dispatch({'$gen_cast', Msg}, Mod, State) -> - Mod:handle_cast(Msg, State); -dispatch(Info, Mod, State) -> - Mod:handle_info(Info, State). - -common_reply(_Name, From, Reply, _NState, [] = _Debug) -> - reply(From, Reply), - []; -common_reply(Name, {To, _Tag} = From, Reply, NState, Debug) -> - reply(From, Reply), - sys:handle_debug(Debug, fun print_event/3, Name, {out, Reply, To, NState}). - -common_noreply(_Name, _NState, [] = _Debug) -> - []; -common_noreply(Name, NState, Debug) -> - sys:handle_debug(Debug, fun print_event/3, Name, {noreply, NState}). - -common_become(_Name, _Mod, _NState, [] = _Debug) -> - []; -common_become(Name, Mod, NState, Debug) -> - sys:handle_debug(Debug, fun print_event/3, Name, {become, Mod, NState}). - -handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod, - state = State, - name = Name, - debug = Debug }) -> - case catch Mod:handle_call(Msg, From, State) of - {reply, Reply, NState} -> - Debug1 = common_reply(Name, From, Reply, NState, Debug), - loop(GS2State #gs2_state { state = NState, - time = infinity, - debug = Debug1 }); - {reply, Reply, NState, Time1} -> - Debug1 = common_reply(Name, From, Reply, NState, Debug), - loop(GS2State #gs2_state { state = NState, - time = Time1, - debug = Debug1}); - {stop, Reason, Reply, NState} -> - {'EXIT', R} = - (catch terminate(Reason, Msg, - GS2State #gs2_state { state = NState })), - common_reply(Name, From, Reply, NState, Debug), - exit(R); - Other -> - handle_common_reply(Other, Msg, GS2State) - end; -handle_msg(Msg, GS2State = #gs2_state { mod = Mod, state = State }) -> - Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, Msg, GS2State). - -handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name, - debug = Debug}) -> - case Reply of - {noreply, NState} -> - Debug1 = common_noreply(Name, NState, Debug), - loop(GS2State #gs2_state {state = NState, - time = infinity, - debug = Debug1}); - {noreply, NState, Time1} -> - Debug1 = common_noreply(Name, NState, Debug), - loop(GS2State #gs2_state {state = NState, - time = Time1, - debug = Debug1}); - {become, Mod, NState} -> - Debug1 = common_become(Name, Mod, NState, Debug), - loop(find_prioritisers( - GS2State #gs2_state { mod = Mod, - state = NState, - time = infinity, - debug = Debug1 })); - {become, Mod, NState, Time1} -> - Debug1 = common_become(Name, Mod, NState, Debug), - loop(find_prioritisers( - GS2State #gs2_state { mod = Mod, - state = NState, - time = Time1, - debug = Debug1 })); - _ -> - handle_common_termination(Reply, Msg, GS2State) - end. - -handle_common_termination(Reply, Msg, GS2State) -> - case Reply of - {stop, Reason, NState} -> - terminate(Reason, Msg, GS2State #gs2_state { state = NState }); - {'EXIT', What} -> - terminate(What, Msg, GS2State); - _ -> - terminate({bad_return_value, Reply}, Msg, GS2State) - end. - -%%----------------------------------------------------------------- -%% Callback functions for system messages handling. -%%----------------------------------------------------------------- -system_continue(Parent, Debug, GS2State) -> - loop(GS2State #gs2_state { parent = Parent, debug = Debug }). - -system_terminate(Reason, _Parent, Debug, GS2State) -> - terminate(Reason, [], GS2State #gs2_state { debug = Debug }). - -system_code_change(GS2State = #gs2_state { mod = Mod, - state = State }, - _Module, OldVsn, Extra) -> - case catch Mod:code_change(OldVsn, State, Extra) of - {ok, NewState} -> - NewGS2State = find_prioritisers( - GS2State #gs2_state { state = NewState }), - {ok, [NewGS2State]}; - Else -> - Else - end. - -%%----------------------------------------------------------------- -%% Format debug messages. Print them as the call-back module sees -%% them, not as the real erlang messages. Use trace for that. -%%----------------------------------------------------------------- -print_event(Dev, {in, Msg}, Name) -> - case Msg of - {'$gen_call', {From, _Tag}, Call} -> - io:format(Dev, "*DBG* ~p got call ~p from ~w~n", - [Name, Call, From]); - {'$gen_cast', Cast} -> - io:format(Dev, "*DBG* ~p got cast ~p~n", - [Name, Cast]); - _ -> - io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg]) - end; -print_event(Dev, {out, Msg, To, State}, Name) -> - io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n", - [Name, Msg, To, State]); -print_event(Dev, {noreply, State}, Name) -> - io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]); -print_event(Dev, Event, Name) -> - io:format(Dev, "*DBG* ~p dbg ~p~n", [Name, Event]). - - -%%% --------------------------------------------------- -%%% Terminate the server. -%%% --------------------------------------------------- - -terminate(Reason, Msg, #gs2_state { name = Name, - mod = Mod, - state = State, - debug = Debug }) -> - case catch Mod:terminate(Reason, State) of - {'EXIT', R} -> - error_info(R, Reason, Name, Msg, State, Debug), - exit(R); - _ -> - case Reason of - normal -> - exit(normal); - shutdown -> - exit(shutdown); - {shutdown,_}=Shutdown -> - exit(Shutdown); - _ -> - error_info(Reason, undefined, Name, Msg, State, Debug), - exit(Reason) - end - end. - -error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) -> - %% OTP-5811 Don't send an error report if it's the system process - %% application_controller which is terminating - let init take care - %% of it instead - ok; -error_info(Reason, RootCause, Name, Msg, State, Debug) -> - Reason1 = error_reason(Reason), - Fmt = - "** Generic server ~p terminating~n" - "** Last message in was ~p~n" - "** When Server state == ~p~n" - "** Reason for termination == ~n** ~p~n", - case RootCause of - undefined -> format(Fmt, [Name, Msg, State, Reason1]); - _ -> format(Fmt ++ "** In 'terminate' callback " - "with reason ==~n** ~p~n", - [Name, Msg, State, Reason1, - error_reason(RootCause)]) - end, - sys:print_log(Debug), - ok. - -error_reason({undef,[{M,F,A}|MFAs]} = Reason) -> - case code:is_loaded(M) of - false -> {'module could not be loaded',[{M,F,A}|MFAs]}; - _ -> case erlang:function_exported(M, F, length(A)) of - true -> Reason; - false -> {'function not exported',[{M,F,A}|MFAs]} - end - end; -error_reason(Reason) -> - Reason. - -%%% --------------------------------------------------- -%%% Misc. functions. -%%% --------------------------------------------------- - -opt(Op, [{Op, Value}|_]) -> - {ok, Value}; -opt(Op, [_|Options]) -> - opt(Op, Options); -opt(_, []) -> - false. - -debug_options(Name, Opts) -> - case opt(debug, Opts) of - {ok, Options} -> dbg_options(Name, Options); - _ -> dbg_options(Name, []) - end. - -dbg_options(Name, []) -> - Opts = - case init:get_argument(generic_debug) of - error -> - []; - _ -> - [log, statistics] - end, - dbg_opts(Name, Opts); -dbg_options(Name, Opts) -> - dbg_opts(Name, Opts). - -dbg_opts(Name, Opts) -> - case catch sys:debug_options(Opts) of - {'EXIT',_} -> - format("~p: ignoring erroneous debug options - ~p~n", - [Name, Opts]), - []; - Dbg -> - Dbg - end. - -get_proc_name(Pid) when is_pid(Pid) -> - Pid; -get_proc_name({local, Name}) -> - case process_info(self(), registered_name) of - {registered_name, Name} -> - Name; - {registered_name, _Name} -> - exit(process_not_registered); - [] -> - exit(process_not_registered) - end; -get_proc_name({global, Name}) -> - case whereis_name(Name) of - undefined -> - exit(process_not_registered_globally); - Pid when Pid =:= self() -> - Name; - _Pid -> - exit(process_not_registered_globally) - end. - -get_parent() -> - case get('$ancestors') of - [Parent | _] when is_pid(Parent)-> - Parent; - [Parent | _] when is_atom(Parent)-> - name_to_pid(Parent); - _ -> - exit(process_was_not_started_by_proc_lib) - end. - -name_to_pid(Name) -> - case whereis(Name) of - undefined -> - case whereis_name(Name) of - undefined -> - exit(could_not_find_registerd_name); - Pid -> - Pid - end; - Pid -> - Pid - end. - -whereis_name(Name) -> - case ets:lookup(global_names, Name) of - [{_Name, Pid, _Method, _RPid, _Ref}] -> - if node(Pid) == node() -> - case is_process_alive(Pid) of - true -> Pid; - false -> undefined - end; - true -> - Pid - end; - [] -> undefined - end. - -find_prioritisers(GS2State = #gs2_state { mod = Mod }) -> - PCall = function_exported_or_default(Mod, 'prioritise_call', 4, - fun (_Msg, _From, _State) -> 0 end), - PCast = function_exported_or_default(Mod, 'prioritise_cast', 3, - fun (_Msg, _State) -> 0 end), - PInfo = function_exported_or_default(Mod, 'prioritise_info', 3, - fun (_Msg, _State) -> 0 end), - GS2State #gs2_state { prioritisers = {PCall, PCast, PInfo} }. - -function_exported_or_default(Mod, Fun, Arity, Default) -> - case erlang:function_exported(Mod, Fun, Arity) of - true -> case Arity of - 3 -> fun (Msg, GS2State = #gs2_state { queue = Queue, - state = State }) -> - Length = priority_queue:len(Queue), - case catch Mod:Fun(Msg, Length, State) of - drop -> - drop; - Res when is_integer(Res) -> - Res; - Err -> - handle_common_termination(Err, Msg, GS2State) - end - end; - 4 -> fun (Msg, From, GS2State = #gs2_state { queue = Queue, - state = State }) -> - Length = priority_queue:len(Queue), - case catch Mod:Fun(Msg, From, Length, State) of - Res when is_integer(Res) -> - Res; - Err -> - handle_common_termination(Err, Msg, GS2State) - end - end - end; - false -> Default - end. - -%%----------------------------------------------------------------- -%% Status information -%%----------------------------------------------------------------- -format_status(Opt, StatusData) -> - [PDict, SysState, Parent, Debug, - #gs2_state{name = Name, state = State, mod = Mod, queue = Queue}] = - StatusData, - NameTag = if is_pid(Name) -> - pid_to_list(Name); - is_atom(Name) -> - Name - end, - Header = lists:concat(["Status for generic server ", NameTag]), - Log = sys:get_debug(log, Debug, []), - Specfic = callback(Mod, format_status, [Opt, [PDict, State]], - fun () -> [{data, [{"State", State}]}] end), - Messages = callback(Mod, format_message_queue, [Opt, Queue], - fun () -> priority_queue:to_list(Queue) end), - [{header, Header}, - {data, [{"Status", SysState}, - {"Parent", Parent}, - {"Logged events", Log}, - {"Queued messages", Messages}]} | - Specfic]. - -callback(Mod, FunName, Args, DefaultThunk) -> - case erlang:function_exported(Mod, FunName, length(Args)) of - true -> case catch apply(Mod, FunName, Args) of - {'EXIT', _} -> DefaultThunk(); - Success -> Success - end; - false -> DefaultThunk() - end. diff --git a/src/priority_queue.erl b/src/priority_queue.erl deleted file mode 100644 index 7147b0bb4..000000000 --- a/src/priority_queue.erl +++ /dev/null @@ -1,259 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. -%% - -%% Priority queues have essentially the same interface as ordinary -%% queues, except that a) there is an in/3 that takes a priority, and -%% b) we have only implemented the core API we need. -%% -%% Priorities should be integers - the higher the value the higher the -%% priority - but we don't actually check that. -%% -%% in/2 inserts items with priority 0. -%% -%% We optimise the case where a priority queue is being used just like -%% an ordinary queue. When that is the case we represent the priority -%% queue as an ordinary queue. We could just call into the 'queue' -%% module for that, but for efficiency we implement the relevant -%% functions directly in here, thus saving on inter-module calls and -%% eliminating a level of boxing. -%% -%% When the queue contains items with non-zero priorities, it is -%% represented as a sorted kv list with the inverted Priority as the -%% key and an ordinary queue as the value. Here again we use our own -%% ordinary queue implemention for efficiency, often making recursive -%% calls into the same function knowing that ordinary queues represent -%% a base case. - --module(priority_queue). - --export([new/0, is_queue/1, is_empty/1, len/1, plen/2, to_list/1, from_list/1, - in/2, in/3, out/1, out/2, out_p/1, join/2, filter/2, fold/3, highest/1]). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(q() :: pqueue()). --type(priority() :: integer() | 'infinity'). --type(squeue() :: {queue, [any()], [any()], non_neg_integer()}). --type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}). - --export_type([q/0]). - --spec(new/0 :: () -> pqueue()). --spec(is_queue/1 :: (any()) -> boolean()). --spec(is_empty/1 :: (pqueue()) -> boolean()). --spec(len/1 :: (pqueue()) -> non_neg_integer()). --spec(plen/2 :: (priority(), pqueue()) -> non_neg_integer()). --spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]). --spec(from_list/1 :: ([{priority(), any()}]) -> pqueue()). --spec(in/2 :: (any(), pqueue()) -> pqueue()). --spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()). --spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}). --spec(out_p/1 :: (pqueue()) -> {empty | {value, any(), priority()}, pqueue()}). --spec(join/2 :: (pqueue(), pqueue()) -> pqueue()). --spec(filter/2 :: (fun ((any()) -> boolean()), pqueue()) -> pqueue()). --spec(fold/3 :: - (fun ((any(), priority(), A) -> A), A, pqueue()) -> A). --spec(highest/1 :: (pqueue()) -> priority() | 'empty'). - --endif. - -%%---------------------------------------------------------------------------- - -new() -> - {queue, [], [], 0}. - -is_queue({queue, R, F, L}) when is_list(R), is_list(F), is_integer(L) -> - true; -is_queue({pqueue, Queues}) when is_list(Queues) -> - lists:all(fun ({infinity, Q}) -> is_queue(Q); - ({P, Q}) -> is_integer(P) andalso is_queue(Q) - end, Queues); -is_queue(_) -> - false. - -is_empty({queue, [], [], 0}) -> - true; -is_empty(_) -> - false. - -len({queue, _R, _F, L}) -> - L; -len({pqueue, Queues}) -> - lists:sum([len(Q) || {_, Q} <- Queues]). - -plen(0, {queue, _R, _F, L}) -> - L; -plen(_, {queue, _R, _F, _}) -> - 0; -plen(P, {pqueue, Queues}) -> - case lists:keysearch(maybe_negate_priority(P), 1, Queues) of - {value, {_, Q}} -> len(Q); - false -> 0 - end. - -to_list({queue, In, Out, _Len}) when is_list(In), is_list(Out) -> - [{0, V} || V <- Out ++ lists:reverse(In, [])]; -to_list({pqueue, Queues}) -> - [{maybe_negate_priority(P), V} || {P, Q} <- Queues, - {0, V} <- to_list(Q)]. - -from_list(L) -> - lists:foldl(fun ({P, E}, Q) -> in(E, P, Q) end, new(), L). - -in(Item, Q) -> - in(Item, 0, Q). - -in(X, 0, {queue, [_] = In, [], 1}) -> - {queue, [X], In, 2}; -in(X, 0, {queue, In, Out, Len}) when is_list(In), is_list(Out) -> - {queue, [X|In], Out, Len + 1}; -in(X, Priority, _Q = {queue, [], [], 0}) -> - in(X, Priority, {pqueue, []}); -in(X, Priority, Q = {queue, _, _, _}) -> - in(X, Priority, {pqueue, [{0, Q}]}); -in(X, Priority, {pqueue, Queues}) -> - P = maybe_negate_priority(Priority), - {pqueue, case lists:keysearch(P, 1, Queues) of - {value, {_, Q}} -> - lists:keyreplace(P, 1, Queues, {P, in(X, Q)}); - false when P == infinity -> - [{P, {queue, [X], [], 1}} | Queues]; - false -> - case Queues of - [{infinity, InfQueue} | Queues1] -> - [{infinity, InfQueue} | - lists:keysort(1, [{P, {queue, [X], [], 1}} | Queues1])]; - _ -> - lists:keysort(1, [{P, {queue, [X], [], 1}} | Queues]) - end - end}. - -out({queue, [], [], 0} = Q) -> - {empty, Q}; -out({queue, [V], [], 1}) -> - {{value, V}, {queue, [], [], 0}}; -out({queue, [Y|In], [], Len}) -> - [V|Out] = lists:reverse(In, []), - {{value, V}, {queue, [Y], Out, Len - 1}}; -out({queue, In, [V], Len}) when is_list(In) -> - {{value,V}, r2f(In, Len - 1)}; -out({queue, In,[V|Out], Len}) when is_list(In) -> - {{value, V}, {queue, In, Out, Len - 1}}; -out({pqueue, [{P, Q} | Queues]}) -> - {R, Q1} = out(Q), - NewQ = case is_empty(Q1) of - true -> case Queues of - [] -> {queue, [], [], 0}; - [{0, OnlyQ}] -> OnlyQ; - [_|_] -> {pqueue, Queues} - end; - false -> {pqueue, [{P, Q1} | Queues]} - end, - {R, NewQ}. - -out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0); -out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)). - -out(0, {queue, _, _, _} = Q) -> - out(Q); -out(Priority, {queue, _, _, _}) -> - erlang:error(badarg, [Priority]); -out(Priority, {pqueue, Queues}) -> - P = maybe_negate_priority(Priority), - case lists:keysearch(P, 1, Queues) of - {value, {_, Q}} -> - {R, Q1} = out(Q), - Queues1 = case is_empty(Q1) of - true -> lists:keydelete(P, 1, Queues); - false -> lists:keyreplace(P, 1, Queues, {P, Q1}) - end, - {R, case Queues1 of - [] -> {queue, [], [], 0}; - [{0, OnlyQ}] -> OnlyQ; - [_|_] -> {pqueue, Queues1} - end}; - false -> - {empty, {pqueue, Queues}} - end. - -add_p(R, P) -> case R of - {empty, Q} -> {empty, Q}; - {{value, V}, Q} -> {{value, V, P}, Q} - end. - -join(A, {queue, [], [], 0}) -> - A; -join({queue, [], [], 0}, B) -> - B; -join({queue, AIn, AOut, ALen}, {queue, BIn, BOut, BLen}) -> - {queue, BIn, AOut ++ lists:reverse(AIn, BOut), ALen + BLen}; -join(A = {queue, _, _, _}, {pqueue, BPQ}) -> - {Pre, Post} = - lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, BPQ), - Post1 = case Post of - [] -> [ {0, A} ]; - [ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ]; - _ -> [ {0, A} | Post ] - end, - {pqueue, Pre ++ Post1}; -join({pqueue, APQ}, B = {queue, _, _, _}) -> - {Pre, Post} = - lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, APQ), - Post1 = case Post of - [] -> [ {0, B} ]; - [ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ]; - _ -> [ {0, B} | Post ] - end, - {pqueue, Pre ++ Post1}; -join({pqueue, APQ}, {pqueue, BPQ}) -> - {pqueue, merge(APQ, BPQ, [])}. - -merge([], BPQ, Acc) -> - lists:reverse(Acc, BPQ); -merge(APQ, [], Acc) -> - lists:reverse(Acc, APQ); -merge([{P, A}|As], [{P, B}|Bs], Acc) -> - merge(As, Bs, [ {P, join(A, B)} | Acc ]); -merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB orelse PA == infinity -> - merge(As, Bs, [ {PA, A} | Acc ]); -merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) -> - merge(As, Bs, [ {PB, B} | Acc ]). - -filter(Pred, Q) -> fold(fun(V, P, Acc) -> - case Pred(V) of - true -> in(V, P, Acc); - false -> Acc - end - end, new(), Q). - -fold(Fun, Init, Q) -> case out_p(Q) of - {empty, _Q} -> Init; - {{value, V, P}, Q1} -> fold(Fun, Fun(V, P, Init), Q1) - end. - -highest({queue, [], [], 0}) -> empty; -highest({queue, _, _, _}) -> 0; -highest({pqueue, [{P, _} | _]}) -> maybe_negate_priority(P). - -r2f([], 0) -> {queue, [], [], 0}; -r2f([_] = R, 1) -> {queue, [], R, 1}; -r2f([X,Y], 2) -> {queue, [X], [Y], 2}; -r2f([X,Y|R], L) -> {queue, [X,Y], lists:reverse(R, []), L}. - -maybe_negate_priority(infinity) -> infinity; -maybe_negate_priority(P) -> -P.