diff --git a/etc/emqx.conf b/etc/emqx.conf index bcf92d751..e24f3fab5 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -563,13 +563,13 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ##-------------------------------------------------------------------- ## HTTP Management API Listener -listener.api.mgmt = 127.0.0.1:8080 +## listener.api.mgmt = 127.0.0.1:8080 -listener.api.mgmt.acceptors = 4 +## listener.api.mgmt.acceptors = 4 -listener.api.mgmt.max_clients = 64 +## listener.api.mgmt.max_clients = 64 -listener.api.mgmt.access.1 = allow all +## listener.api.mgmt.access.1 = allow all ##------------------------------------------------------------------- ## System Monitor diff --git a/src/emqx.app.src b/src/emqx.app.src new file mode 100644 index 000000000..eece8df33 --- /dev/null +++ b/src/emqx.app.src @@ -0,0 +1,11 @@ +{application,emqx, + [{description,"EMQ X Broker"}, + {vsn,"2.4"}, + {modules,[]}, + {registered,[emqx_sup]}, + {applications,[kernel,stdlib,gproc,gen_rpc,lager,esockd,mochiweb, lager_syslog,pbkdf2,bcrypt,clique,jsx]}, + {env,[]}, + {mod,{emqx_app,[]}}, + {maintainers,["Feng Lee "]}, + {licenses,["Apache-2.0"]}, + {links,[{"Github","https://github.com/emqtt/emqttd"}]}]}. diff --git a/src/emqx_mgmt.erl b/src/emqx_mgmt.erl.bk similarity index 100% rename from src/emqx_mgmt.erl rename to src/emqx_mgmt.erl.bk diff --git a/src/emqx_rest_api.erl b/src/emqx_rest_api.erl.bk similarity index 100% rename from src/emqx_rest_api.erl rename to src/emqx_rest_api.erl.bk diff --git a/src/priority_queue.erl b/src/priority_queue.erl new file mode 100644 index 000000000..7147b0bb4 --- /dev/null +++ b/src/priority_queue.erl @@ -0,0 +1,259 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. +%% + +%% Priority queues have essentially the same interface as ordinary +%% queues, except that a) there is an in/3 that takes a priority, and +%% b) we have only implemented the core API we need. +%% +%% Priorities should be integers - the higher the value the higher the +%% priority - but we don't actually check that. +%% +%% in/2 inserts items with priority 0. +%% +%% We optimise the case where a priority queue is being used just like +%% an ordinary queue. When that is the case we represent the priority +%% queue as an ordinary queue. We could just call into the 'queue' +%% module for that, but for efficiency we implement the relevant +%% functions directly in here, thus saving on inter-module calls and +%% eliminating a level of boxing. +%% +%% When the queue contains items with non-zero priorities, it is +%% represented as a sorted kv list with the inverted Priority as the +%% key and an ordinary queue as the value. Here again we use our own +%% ordinary queue implemention for efficiency, often making recursive +%% calls into the same function knowing that ordinary queues represent +%% a base case. + +-module(priority_queue). + +-export([new/0, is_queue/1, is_empty/1, len/1, plen/2, to_list/1, from_list/1, + in/2, in/3, out/1, out/2, out_p/1, join/2, filter/2, fold/3, highest/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(q() :: pqueue()). +-type(priority() :: integer() | 'infinity'). +-type(squeue() :: {queue, [any()], [any()], non_neg_integer()}). +-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}). + +-export_type([q/0]). + +-spec(new/0 :: () -> pqueue()). +-spec(is_queue/1 :: (any()) -> boolean()). +-spec(is_empty/1 :: (pqueue()) -> boolean()). +-spec(len/1 :: (pqueue()) -> non_neg_integer()). +-spec(plen/2 :: (priority(), pqueue()) -> non_neg_integer()). +-spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]). +-spec(from_list/1 :: ([{priority(), any()}]) -> pqueue()). +-spec(in/2 :: (any(), pqueue()) -> pqueue()). +-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()). +-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}). +-spec(out_p/1 :: (pqueue()) -> {empty | {value, any(), priority()}, pqueue()}). +-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()). +-spec(filter/2 :: (fun ((any()) -> boolean()), pqueue()) -> pqueue()). +-spec(fold/3 :: + (fun ((any(), priority(), A) -> A), A, pqueue()) -> A). +-spec(highest/1 :: (pqueue()) -> priority() | 'empty'). + +-endif. + +%%---------------------------------------------------------------------------- + +new() -> + {queue, [], [], 0}. + +is_queue({queue, R, F, L}) when is_list(R), is_list(F), is_integer(L) -> + true; +is_queue({pqueue, Queues}) when is_list(Queues) -> + lists:all(fun ({infinity, Q}) -> is_queue(Q); + ({P, Q}) -> is_integer(P) andalso is_queue(Q) + end, Queues); +is_queue(_) -> + false. + +is_empty({queue, [], [], 0}) -> + true; +is_empty(_) -> + false. + +len({queue, _R, _F, L}) -> + L; +len({pqueue, Queues}) -> + lists:sum([len(Q) || {_, Q} <- Queues]). + +plen(0, {queue, _R, _F, L}) -> + L; +plen(_, {queue, _R, _F, _}) -> + 0; +plen(P, {pqueue, Queues}) -> + case lists:keysearch(maybe_negate_priority(P), 1, Queues) of + {value, {_, Q}} -> len(Q); + false -> 0 + end. + +to_list({queue, In, Out, _Len}) when is_list(In), is_list(Out) -> + [{0, V} || V <- Out ++ lists:reverse(In, [])]; +to_list({pqueue, Queues}) -> + [{maybe_negate_priority(P), V} || {P, Q} <- Queues, + {0, V} <- to_list(Q)]. + +from_list(L) -> + lists:foldl(fun ({P, E}, Q) -> in(E, P, Q) end, new(), L). + +in(Item, Q) -> + in(Item, 0, Q). + +in(X, 0, {queue, [_] = In, [], 1}) -> + {queue, [X], In, 2}; +in(X, 0, {queue, In, Out, Len}) when is_list(In), is_list(Out) -> + {queue, [X|In], Out, Len + 1}; +in(X, Priority, _Q = {queue, [], [], 0}) -> + in(X, Priority, {pqueue, []}); +in(X, Priority, Q = {queue, _, _, _}) -> + in(X, Priority, {pqueue, [{0, Q}]}); +in(X, Priority, {pqueue, Queues}) -> + P = maybe_negate_priority(Priority), + {pqueue, case lists:keysearch(P, 1, Queues) of + {value, {_, Q}} -> + lists:keyreplace(P, 1, Queues, {P, in(X, Q)}); + false when P == infinity -> + [{P, {queue, [X], [], 1}} | Queues]; + false -> + case Queues of + [{infinity, InfQueue} | Queues1] -> + [{infinity, InfQueue} | + lists:keysort(1, [{P, {queue, [X], [], 1}} | Queues1])]; + _ -> + lists:keysort(1, [{P, {queue, [X], [], 1}} | Queues]) + end + end}. + +out({queue, [], [], 0} = Q) -> + {empty, Q}; +out({queue, [V], [], 1}) -> + {{value, V}, {queue, [], [], 0}}; +out({queue, [Y|In], [], Len}) -> + [V|Out] = lists:reverse(In, []), + {{value, V}, {queue, [Y], Out, Len - 1}}; +out({queue, In, [V], Len}) when is_list(In) -> + {{value,V}, r2f(In, Len - 1)}; +out({queue, In,[V|Out], Len}) when is_list(In) -> + {{value, V}, {queue, In, Out, Len - 1}}; +out({pqueue, [{P, Q} | Queues]}) -> + {R, Q1} = out(Q), + NewQ = case is_empty(Q1) of + true -> case Queues of + [] -> {queue, [], [], 0}; + [{0, OnlyQ}] -> OnlyQ; + [_|_] -> {pqueue, Queues} + end; + false -> {pqueue, [{P, Q1} | Queues]} + end, + {R, NewQ}. + +out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0); +out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)). + +out(0, {queue, _, _, _} = Q) -> + out(Q); +out(Priority, {queue, _, _, _}) -> + erlang:error(badarg, [Priority]); +out(Priority, {pqueue, Queues}) -> + P = maybe_negate_priority(Priority), + case lists:keysearch(P, 1, Queues) of + {value, {_, Q}} -> + {R, Q1} = out(Q), + Queues1 = case is_empty(Q1) of + true -> lists:keydelete(P, 1, Queues); + false -> lists:keyreplace(P, 1, Queues, {P, Q1}) + end, + {R, case Queues1 of + [] -> {queue, [], [], 0}; + [{0, OnlyQ}] -> OnlyQ; + [_|_] -> {pqueue, Queues1} + end}; + false -> + {empty, {pqueue, Queues}} + end. + +add_p(R, P) -> case R of + {empty, Q} -> {empty, Q}; + {{value, V}, Q} -> {{value, V, P}, Q} + end. + +join(A, {queue, [], [], 0}) -> + A; +join({queue, [], [], 0}, B) -> + B; +join({queue, AIn, AOut, ALen}, {queue, BIn, BOut, BLen}) -> + {queue, BIn, AOut ++ lists:reverse(AIn, BOut), ALen + BLen}; +join(A = {queue, _, _, _}, {pqueue, BPQ}) -> + {Pre, Post} = + lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, BPQ), + Post1 = case Post of + [] -> [ {0, A} ]; + [ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ]; + _ -> [ {0, A} | Post ] + end, + {pqueue, Pre ++ Post1}; +join({pqueue, APQ}, B = {queue, _, _, _}) -> + {Pre, Post} = + lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, APQ), + Post1 = case Post of + [] -> [ {0, B} ]; + [ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ]; + _ -> [ {0, B} | Post ] + end, + {pqueue, Pre ++ Post1}; +join({pqueue, APQ}, {pqueue, BPQ}) -> + {pqueue, merge(APQ, BPQ, [])}. + +merge([], BPQ, Acc) -> + lists:reverse(Acc, BPQ); +merge(APQ, [], Acc) -> + lists:reverse(Acc, APQ); +merge([{P, A}|As], [{P, B}|Bs], Acc) -> + merge(As, Bs, [ {P, join(A, B)} | Acc ]); +merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB orelse PA == infinity -> + merge(As, Bs, [ {PA, A} | Acc ]); +merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) -> + merge(As, Bs, [ {PB, B} | Acc ]). + +filter(Pred, Q) -> fold(fun(V, P, Acc) -> + case Pred(V) of + true -> in(V, P, Acc); + false -> Acc + end + end, new(), Q). + +fold(Fun, Init, Q) -> case out_p(Q) of + {empty, _Q} -> Init; + {{value, V, P}, Q1} -> fold(Fun, Fun(V, P, Init), Q1) + end. + +highest({queue, [], [], 0}) -> empty; +highest({queue, _, _, _}) -> 0; +highest({pqueue, [{P, _} | _]}) -> maybe_negate_priority(P). + +r2f([], 0) -> {queue, [], [], 0}; +r2f([_] = R, 1) -> {queue, [], R, 1}; +r2f([X,Y], 2) -> {queue, [X], [Y], 2}; +r2f([X,Y|R], L) -> {queue, [X,Y], lists:reverse(R, []), L}. + +maybe_negate_priority(infinity) -> infinity; +maybe_negate_priority(P) -> -P.