Merge pull request #249 from emqtt/dev

gen_server2 to resolve issue #212
This commit is contained in:
Feng Lee 2015-08-15 20:54:13 +08:00
commit 73313cf1fd
5 changed files with 1626 additions and 17 deletions

View File

@ -20,3 +20,6 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
The source files 'gen_server2.erl' and 'priority_queue.erl' are from RabbitMQ
v3.5.4 and licensed under MPL license.

View File

@ -89,7 +89,7 @@
{session, [ {session, [
%% Max number of QoS 1 and 2 messages that can be “in flight” at one time. %% Max number of QoS 1 and 2 messages that can be “in flight” at one time.
%% 0 means no limit %% 0 means no limit
{max_inflight, 100}, {max_inflight, 20},
%% Max retries for unack Qos1/2 messages %% Max retries for unack Qos1/2 messages
{unack_retries, 3}, {unack_retries, 3},
@ -114,7 +114,7 @@
{queue, [ {queue, [
%% Max queue length. enqueued messages when persistent client disconnected, %% Max queue length. enqueued messages when persistent client disconnected,
%% or inflight window is full. %% or inflight window is full.
{max_length, 1000}, {max_length, 100},
%% Low-water mark of queued messsages %% Low-water mark of queued messsages
{low_watermark, 0.2}, {low_watermark, 0.2},

View File

@ -61,12 +61,15 @@
puback/2, pubrec/2, pubrel/2, pubcomp/2, puback/2, pubrec/2, pubrel/2, pubcomp/2,
subscribe/2, unsubscribe/2]). subscribe/2, unsubscribe/2]).
-behaviour(gen_server). -behaviour(gen_server2).
%% gen_server Function Exports %% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
%% gen_server2 Message Priorities
-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]).
-record(session, { -record(session, {
%% Clean Session Flag %% Clean Session Flag
@ -139,7 +142,7 @@
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec start_link(boolean(), mqtt_client_id(), pid()) -> {ok, pid()} | {error, any()}. -spec start_link(boolean(), mqtt_client_id(), pid()) -> {ok, pid()} | {error, any()}.
start_link(CleanSess, ClientId, ClientPid) -> start_link(CleanSess, ClientId, ClientPid) ->
gen_server:start_link(?MODULE, [CleanSess, ClientId, ClientPid], []). gen_server2:start_link(?MODULE, [CleanSess, ClientId, ClientPid], []).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Resume a session. %% @doc Resume a session.
@ -147,7 +150,7 @@ start_link(CleanSess, ClientId, ClientPid) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec resume(pid(), mqtt_client_id(), pid()) -> ok. -spec resume(pid(), mqtt_client_id(), pid()) -> ok.
resume(Session, ClientId, ClientPid) -> resume(Session, ClientId, ClientPid) ->
gen_server:cast(Session, {resume, ClientId, ClientPid}). gen_server2:cast(Session, {resume, ClientId, ClientPid}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Destroy a session. %% @doc Destroy a session.
@ -155,7 +158,7 @@ resume(Session, ClientId, ClientPid) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec destroy(pid(), mqtt_client_id()) -> ok. -spec destroy(pid(), mqtt_client_id()) -> ok.
destroy(Session, ClientId) -> destroy(Session, ClientId) ->
gen_server:call(Session, {destroy, ClientId}). gen_server2:call(Session, {destroy, ClientId}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Subscribe Topics %% @doc Subscribe Topics
@ -163,7 +166,7 @@ destroy(Session, ClientId) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> {ok, [mqtt_qos()]}. -spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> {ok, [mqtt_qos()]}.
subscribe(Session, TopicTable) -> subscribe(Session, TopicTable) ->
gen_server:call(Session, {subscribe, TopicTable}, infinity). gen_server2:call(Session, {subscribe, TopicTable}, infinity).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Publish message %% @doc Publish message
@ -180,7 +183,7 @@ publish(_Session, Msg = #mqtt_message{qos = ?QOS_1}) ->
publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) -> publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) ->
%% publish qos2 by session %% publish qos2 by session
gen_server:call(Session, {publish, Msg}). gen_server2:call(Session, {publish, Msg}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc PubAck message %% @doc PubAck message
@ -188,19 +191,19 @@ publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec puback(pid(), mqtt_packet_id()) -> ok. -spec puback(pid(), mqtt_packet_id()) -> ok.
puback(Session, PktId) -> puback(Session, PktId) ->
gen_server:cast(Session, {puback, PktId}). gen_server2:cast(Session, {puback, PktId}).
-spec pubrec(pid(), mqtt_packet_id()) -> ok. -spec pubrec(pid(), mqtt_packet_id()) -> ok.
pubrec(Session, PktId) -> pubrec(Session, PktId) ->
gen_server:cast(Session, {pubrec, PktId}). gen_server2:cast(Session, {pubrec, PktId}).
-spec pubrel(pid(), mqtt_packet_id()) -> ok. -spec pubrel(pid(), mqtt_packet_id()) -> ok.
pubrel(Session, PktId) -> pubrel(Session, PktId) ->
gen_server:cast(Session, {pubrel, PktId}). gen_server2:cast(Session, {pubrel, PktId}).
-spec pubcomp(pid(), mqtt_packet_id()) -> ok. -spec pubcomp(pid(), mqtt_packet_id()) -> ok.
pubcomp(Session, PktId) -> pubcomp(Session, PktId) ->
gen_server:cast(Session, {pubcomp, PktId}). gen_server2:cast(Session, {pubcomp, PktId}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Unsubscribe Topics %% @doc Unsubscribe Topics
@ -208,7 +211,7 @@ pubcomp(Session, PktId) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec unsubscribe(pid(), [binary()]) -> ok. -spec unsubscribe(pid(), [binary()]) -> ok.
unsubscribe(Session, Topics) -> unsubscribe(Session, Topics) ->
gen_server:call(Session, {unsubscribe, Topics}, infinity). gen_server2:call(Session, {unsubscribe, Topics}, infinity).
%%%============================================================================= %%%=============================================================================
%%% gen_server callbacks %%% gen_server callbacks
@ -241,6 +244,33 @@ init([CleanSess, ClientId, ClientPid]) ->
%% start statistics %% start statistics
{ok, start_collector(Session), hibernate}. {ok, start_collector(Session), hibernate}.
prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
{destroy, _} -> 9;
{unsubscribe, _} -> 2;
{subscribe, _} -> 1;
_ -> 0
end.
prioritise_cast(Msg, _Len, _State) ->
case Msg of
{resume, _, _} -> 9;
{pubrel, _PktId} -> 8;
{pubcomp, _PktId} -> 8;
{pubrec, _PktId} -> 8;
{puback, _PktId} -> 7;
_ -> 0
end.
prioritise_info(Msg, _Len, _State) ->
case Msg of
{'EXIT', _, _} -> 10;
session_expired -> 10;
{timeout, _, _} -> 5;
collect_info -> 2;
_ -> 0
end.
handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = ClientId, handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = ClientId,
subscriptions = Subscriptions}) -> subscriptions = Subscriptions}) ->
@ -368,7 +398,7 @@ handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_a
cancel_timer(TRef), cancel_timer(TRef),
noreply(dequeue(acked(PktId, Session))); noreply(dequeue(acked(PktId, Session)));
error -> error ->
lager:error("Session ~s cannot find PUBACK '~p'!", [ClientId, PktId]), lager:critical("Session(~s): cannot find PUBACK '~p'!", [ClientId, PktId]),
noreply(Session) noreply(Session)
end; end;
@ -429,8 +459,7 @@ handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}},
ClientPid ! {deliver, Msg}, ClientPid ! {deliver, Msg},
noreply(Session); noreply(Session);
handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, Session = #session{message_queue = MsgQ})
Session = #session{client_id = ClientId, message_queue = MsgQ})
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
case check_inflight(Session) of case check_inflight(Session) of
@ -461,7 +490,7 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id =
{noreply, Session#session{awaiting_ack = AwaitingAck1}}; {noreply, Session#session{awaiting_ack = AwaitingAck1}};
error -> error ->
lager:error([{client, ClientId}], "Session ~s " lager:error([{client, ClientId}], "Session ~s "
"cannot find Awaiting Ack:~p", [ClientId, PktId]), "Cannot find Awaiting Ack:~p", [ClientId, PktId]),
{noreply, Session} {noreply, Session}
end; end;

1350
src/gen_server2.erl Normal file

File diff suppressed because it is too large Load Diff

227
src/priority_queue.erl Normal file
View File

@ -0,0 +1,227 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
%%
%% Priority queues have essentially the same interface as ordinary
%% queues, except that a) there is an in/3 that takes a priority, and
%% b) we have only implemented the core API we need.
%%
%% Priorities should be integers - the higher the value the higher the
%% priority - but we don't actually check that.
%%
%% in/2 inserts items with priority 0.
%%
%% We optimise the case where a priority queue is being used just like
%% an ordinary queue. When that is the case we represent the priority
%% queue as an ordinary queue. We could just call into the 'queue'
%% module for that, but for efficiency we implement the relevant
%% functions directly in here, thus saving on inter-module calls and
%% eliminating a level of boxing.
%%
%% When the queue contains items with non-zero priorities, it is
%% represented as a sorted kv list with the inverted Priority as the
%% key and an ordinary queue as the value. Here again we use our own
%% ordinary queue implemention for efficiency, often making recursive
%% calls into the same function knowing that ordinary queues represent
%% a base case.
-module(priority_queue).
-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, from_list/1,
in/2, in/3, out/1, out_p/1, join/2, filter/2, fold/3, highest/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-export_type([q/0]).
-type(q() :: pqueue()).
-type(priority() :: integer() | 'infinity').
-type(squeue() :: {queue, [any()], [any()], non_neg_integer()}).
-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}).
-spec(new/0 :: () -> pqueue()).
-spec(is_queue/1 :: (any()) -> boolean()).
-spec(is_empty/1 :: (pqueue()) -> boolean()).
-spec(len/1 :: (pqueue()) -> non_neg_integer()).
-spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]).
-spec(from_list/1 :: ([{priority(), any()}]) -> pqueue()).
-spec(in/2 :: (any(), pqueue()) -> pqueue()).
-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
-spec(out_p/1 :: (pqueue()) -> {empty | {value, any(), priority()}, pqueue()}).
-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()).
-spec(filter/2 :: (fun ((any()) -> boolean()), pqueue()) -> pqueue()).
-spec(fold/3 ::
(fun ((any(), priority(), A) -> A), A, pqueue()) -> A).
-spec(highest/1 :: (pqueue()) -> priority() | 'empty').
-endif.
%%----------------------------------------------------------------------------
new() ->
{queue, [], [], 0}.
is_queue({queue, R, F, L}) when is_list(R), is_list(F), is_integer(L) ->
true;
is_queue({pqueue, Queues}) when is_list(Queues) ->
lists:all(fun ({infinity, Q}) -> is_queue(Q);
({P, Q}) -> is_integer(P) andalso is_queue(Q)
end, Queues);
is_queue(_) ->
false.
is_empty({queue, [], [], 0}) ->
true;
is_empty(_) ->
false.
len({queue, _R, _F, L}) ->
L;
len({pqueue, Queues}) ->
lists:sum([len(Q) || {_, Q} <- Queues]).
to_list({queue, In, Out, _Len}) when is_list(In), is_list(Out) ->
[{0, V} || V <- Out ++ lists:reverse(In, [])];
to_list({pqueue, Queues}) ->
[{maybe_negate_priority(P), V} || {P, Q} <- Queues,
{0, V} <- to_list(Q)].
from_list(L) ->
lists:foldl(fun ({P, E}, Q) -> in(E, P, Q) end, new(), L).
in(Item, Q) ->
in(Item, 0, Q).
in(X, 0, {queue, [_] = In, [], 1}) ->
{queue, [X], In, 2};
in(X, 0, {queue, In, Out, Len}) when is_list(In), is_list(Out) ->
{queue, [X|In], Out, Len + 1};
in(X, Priority, _Q = {queue, [], [], 0}) ->
in(X, Priority, {pqueue, []});
in(X, Priority, Q = {queue, _, _, _}) ->
in(X, Priority, {pqueue, [{0, Q}]});
in(X, Priority, {pqueue, Queues}) ->
P = maybe_negate_priority(Priority),
{pqueue, case lists:keysearch(P, 1, Queues) of
{value, {_, Q}} ->
lists:keyreplace(P, 1, Queues, {P, in(X, Q)});
false when P == infinity ->
[{P, {queue, [X], [], 1}} | Queues];
false ->
case Queues of
[{infinity, InfQueue} | Queues1] ->
[{infinity, InfQueue} |
lists:keysort(1, [{P, {queue, [X], [], 1}} | Queues1])];
_ ->
lists:keysort(1, [{P, {queue, [X], [], 1}} | Queues])
end
end}.
out({queue, [], [], 0} = Q) ->
{empty, Q};
out({queue, [V], [], 1}) ->
{{value, V}, {queue, [], [], 0}};
out({queue, [Y|In], [], Len}) ->
[V|Out] = lists:reverse(In, []),
{{value, V}, {queue, [Y], Out, Len - 1}};
out({queue, In, [V], Len}) when is_list(In) ->
{{value,V}, r2f(In, Len - 1)};
out({queue, In,[V|Out], Len}) when is_list(In) ->
{{value, V}, {queue, In, Out, Len - 1}};
out({pqueue, [{P, Q} | Queues]}) ->
{R, Q1} = out(Q),
NewQ = case is_empty(Q1) of
true -> case Queues of
[] -> {queue, [], [], 0};
[{0, OnlyQ}] -> OnlyQ;
[_|_] -> {pqueue, Queues}
end;
false -> {pqueue, [{P, Q1} | Queues]}
end,
{R, NewQ}.
out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0);
out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)).
add_p(R, P) -> case R of
{empty, Q} -> {empty, Q};
{{value, V}, Q} -> {{value, V, P}, Q}
end.
join(A, {queue, [], [], 0}) ->
A;
join({queue, [], [], 0}, B) ->
B;
join({queue, AIn, AOut, ALen}, {queue, BIn, BOut, BLen}) ->
{queue, BIn, AOut ++ lists:reverse(AIn, BOut), ALen + BLen};
join(A = {queue, _, _, _}, {pqueue, BPQ}) ->
{Pre, Post} =
lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, BPQ),
Post1 = case Post of
[] -> [ {0, A} ];
[ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ];
_ -> [ {0, A} | Post ]
end,
{pqueue, Pre ++ Post1};
join({pqueue, APQ}, B = {queue, _, _, _}) ->
{Pre, Post} =
lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, APQ),
Post1 = case Post of
[] -> [ {0, B} ];
[ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ];
_ -> [ {0, B} | Post ]
end,
{pqueue, Pre ++ Post1};
join({pqueue, APQ}, {pqueue, BPQ}) ->
{pqueue, merge(APQ, BPQ, [])}.
merge([], BPQ, Acc) ->
lists:reverse(Acc, BPQ);
merge(APQ, [], Acc) ->
lists:reverse(Acc, APQ);
merge([{P, A}|As], [{P, B}|Bs], Acc) ->
merge(As, Bs, [ {P, join(A, B)} | Acc ]);
merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB orelse PA == infinity ->
merge(As, Bs, [ {PA, A} | Acc ]);
merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) ->
merge(As, Bs, [ {PB, B} | Acc ]).
filter(Pred, Q) -> fold(fun(V, P, Acc) ->
case Pred(V) of
true -> in(V, P, Acc);
false -> Acc
end
end, new(), Q).
fold(Fun, Init, Q) -> case out_p(Q) of
{empty, _Q} -> Init;
{{value, V, P}, Q1} -> fold(Fun, Fun(V, P, Init), Q1)
end.
highest({queue, [], [], 0}) -> empty;
highest({queue, _, _, _}) -> 0;
highest({pqueue, [{P, _} | _]}) -> maybe_negate_priority(P).
r2f([], 0) -> {queue, [], [], 0};
r2f([_] = R, 1) -> {queue, [], R, 1};
r2f([X,Y], 2) -> {queue, [X], [Y], 2};
r2f([X,Y|R], L) -> {queue, [X,Y], lists:reverse(R, []), L}.
maybe_negate_priority(infinity) -> infinity;
maybe_negate_priority(P) -> -P.