diff --git a/src/emqttd.erl b/src/emqttd.erl index 63470420d..ee51546bc 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -23,6 +23,7 @@ %%% %%% @author Feng Lee %%%----------------------------------------------------------------------------- + -module(emqttd). -export([start/0, env/1, env/2, diff --git a/src/emqttd_mqueue.erl b/src/emqttd_mqueue.erl index fd11746f2..385a6be39 100644 --- a/src/emqttd_mqueue.erl +++ b/src/emqttd_mqueue.erl @@ -30,20 +30,21 @@ %%% %%% If the broker restarted or crashed, all the messages queued will be gone. %%% -%%% Desgin of The Queue: +%%% Concept of Message Queue and Inflight Window: +%%% %%% |<----------------- Max Len ----------------->| %%% ----------------------------------------------- -%%% IN -> | Pending Messages | Inflight Window | -> Out +%%% IN -> | Messages Queue | Inflight Window | -> Out %%% ----------------------------------------------- -%%% |<--- Win Size --->| +%%% |<--- Win Size --->| %%% %%% -%%% 1. Inflight Window to store the messages awaiting for ack. +%%% 1. Inflight Window to store the messages delivered and awaiting for puback. %%% -%%% 2. Suspend IN messages when the queue is deactive, or inflight windows is full. +%%% 2. Enqueue messages when the inflight window is full. %%% %%% 3. If the queue is full, dropped qos0 messages if store_qos0 is true, -%%% otherwise dropped the oldest pending one. +%%% otherwise dropped the oldest one. %%% %%% @end %%% @@ -55,96 +56,158 @@ -include("emqttd_protocol.hrl"). --export([new/3, name/1, - is_empty/1, is_full/1, - len/1, max_len/1, - in/2, out/1, - stats/1]). +-export([new/3, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1, stats/1]). -define(LOW_WM, 0.2). -define(HIGH_WM, 0.6). --record(mqueue, {name, - q = queue:new(), %% pending queue - len = 0, %% current queue len - low_wm = ?LOW_WM, - high_wm = ?HIGH_WM, - max_len = ?MAX_LEN, - qos0 = false, - dropped = 0, +-type priority() :: {iolist(), pos_integer()}. + +-type option() :: {type, simple | priority} + | {max_length, pos_integer() | infinity} + | {priority, list(priority())} + | {low_watermark, float()} %% Low watermark + | {high_watermark, float()} %% High watermark + | {queue_qos0, boolean()}. %% Queue Qos0? + +-type mqueue_option() :: {max_length, pos_integer()} %% Max queue length + | {low_watermark, float()} %% Low watermark + | {high_watermark, float()} %% High watermark + | {queue_qos0, boolean()}. %% Queue Qos0 + +-type stat() :: {max_len, infinity | pos_integer()} + | {len, non_neg_integer()} + | {dropped, non_neg_integer()}. + +-record(mqueue, {type :: simple | priority, + name, q :: queue:queue() | priority_queue:q(), + %% priority table + pseq = 0, priorities = [], + %% len of simple queue + len = 0, max_len = ?MAX_LEN, + low_wm = ?LOW_WM, high_wm = ?HIGH_WM, + qos0 = false, dropped = 0, alarm_fun}). -type mqueue() :: #mqueue{}. --type mqueue_option() :: {max_length, pos_integer()} %% Max queue length - | {low_watermark, float()} %% Low watermark - | {high_watermark, float()} %% High watermark - | {queue_qos0, boolean()}. %% Queue Qos0 +-export_type([mqueue/0, priority/0, option/0]). --export_type([mqueue/0]). - -%%------------------------------------------------------------------------------ %% @doc New Queue. -%% @end -%%------------------------------------------------------------------------------ --spec new(binary(), list(mqueue_option()), fun()) -> mqueue(). +-spec new(iolist(), list(mqueue_option()), fun()) -> mqueue(). new(Name, Opts, AlarmFun) -> - MaxLen = emqttd_opts:g(max_length, Opts, 1000), - #mqueue{name = Name, - max_len = MaxLen, - low_wm = round(MaxLen * emqttd_opts:g(low_watermark, Opts, ?LOW_WM)), - high_wm = round(MaxLen * emqttd_opts:g(high_watermark, Opts, ?HIGH_WM)), - qos0 = emqttd_opts:g(queue_qos0, Opts, true), - alarm_fun = AlarmFun}. + Type = emqttd_opts:g(type, Opts, simple), + MaxLen = emqttd_opts:g(max_length, Opts, infinity), + init_q(#mqueue{type = Type, name = Name, max_len = MaxLen, + low_wm = low_wm(MaxLen, Opts), + high_wm = high_wm(MaxLen, Opts), + qos0 = emqttd_opts:g(queue_qos0, Opts, false), + alarm_fun = AlarmFun}, Opts). +init_q(MQ = #mqueue{type = simple}, _Opts) -> + MQ#mqueue{q = queue:new()}; +init_q(MQ = #mqueue{type = priority}, Opts) -> + Priorities = emqttd_opts:g(priority, Opts, []), + init_p(Priorities, MQ#mqueue{q = priority_queue:new()}). + +init_p([], MQ) -> + MQ; +init_p([{Topic, P} | L], MQ) -> + {_, MQ1} = insert_p(iolist_to_binary(Topic), P, MQ), + init_p(L, MQ1). + +insert_p(Topic, P, MQ = #mqueue{priorities = Tab, pseq = Seq}) -> + <> = <>, + {PInt, MQ#mqueue{priorities = [{Topic, PInt} | Tab], pseq = Seq + 1}}. + +low_wm(infinity, _Opts) -> + infinity; +low_wm(MaxLen, Opts) -> + round(MaxLen * emqttd_opts:g(low_watermark, Opts, ?LOW_WM)). + +high_wm(infinity, _Opts) -> + infinity; +high_wm(MaxLen, Opts) -> + round(MaxLen * emqttd_opts:g(high_watermark, Opts, ?HIGH_WM)). + +-spec name(mqueue()) -> iolist(). name(#mqueue{name = Name}) -> Name. -is_empty(#mqueue{len = 0}) -> true; -is_empty(_MQ) -> false. +-spec type(mqueue()) -> atom(). +type(#mqueue{type = Type}) -> + Type. -is_full(#mqueue{len = Len, max_len = MaxLen}) - when Len =:= MaxLen -> true; -is_full(_MQ) -> false. +is_empty(#mqueue{type = simple, len = Len}) -> Len =:= 0; +is_empty(#mqueue{type = priority, q = Q}) -> priority_queue:is_empty(Q). -len(#mqueue{len = Len}) -> Len. +len(#mqueue{type = simple, len = Len}) -> Len; +len(#mqueue{type = priority, q = Q}) -> priority_queue:len(Q). max_len(#mqueue{max_len= MaxLen}) -> MaxLen. -stats(#mqueue{max_len = MaxLen, len = Len, dropped = Dropped}) -> - [{max_len, MaxLen}, {len, Len}, {dropped, Dropped}]. - -%%------------------------------------------------------------------------------ -%% @doc Queue one message. -%% @end -%%------------------------------------------------------------------------------ +stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped}) -> + [{len, case Type of + simple -> Len; + priority -> priority_queue:len(Q) + end} | [{max_len, MaxLen}, {dropped, Dropped}]]. +%% @doc Enqueue a message. -spec in(mqtt_message(), mqueue()) -> mqueue(). -%% drop qos0 in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) -> MQ; - -%% simply drop the oldest one if queue is full, improve later -in(Msg, MQ = #mqueue{q = Q, len = Len, max_len = MaxLen, dropped = Dropped}) - when Len =:= MaxLen -> - {{value, _OldMsg}, Q2} = queue:out(Q), - %lager:error("MQueue(~s) drop ~s", [Name, emqttd_message:format(OldMsg)]), +in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) -> + MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}; +in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = MaxLen, dropped = Dropped}) + when Len >= MaxLen -> + {{value, _Old}, Q2} = queue:out(Q), MQ#mqueue{q = queue:in(Msg, Q2), dropped = Dropped +1}; +in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len}) -> + maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}); -in(Msg, MQ = #mqueue{q = Q, len = Len}) -> - maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}). +in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q, + priorities = Priorities, + max_len = infinity}) -> + case lists:keysearch(Topic, 1, Priorities) of + {value, {_, Pri}} -> + MQ#mqueue{q = priority_queue:in(Msg, Pri, Q)}; + false -> + {Pri, MQ1} = insert_p(Topic, 0, MQ), + MQ1#mqueue{q = priority_queue:in(Msg, Pri, Q)} + end; +in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q, + priorities = Priorities, + max_len = MaxLen}) -> + case lists:keysearch(Topic, 1, Priorities) of + {value, {_, Pri}} -> + case priority_queue:plen(Pri, Q) >= MaxLen of + true -> + {_, Q1} = priority_queue:out(Pri, Q), + MQ#mqueue{q = priority_queue:in(Msg, Pri, Q1)}; + false -> + MQ#mqueue{q = priority_queue:in(Msg, Pri, Q)} + end; + false -> + {Pri, MQ1} = insert_p(Topic, 0, MQ), + MQ1#mqueue{q = priority_queue:in(Msg, Pri, Q)} + end. -out(MQ = #mqueue{len = 0}) -> +out(MQ = #mqueue{type = simple, len = 0}) -> {empty, MQ}; - -out(MQ = #mqueue{q = Q, len = Len}) -> - {Result, Q2} = queue:out(Q), - {Result, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})}. +out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) -> + {R, Q2} = queue:out(Q), + {R, MQ#mqueue{q = Q2, len = Len - 1}}; +out(MQ = #mqueue{type = simple, q = Q, len = Len}) -> + {R, Q2} = queue:out(Q), + {R, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})}; +out(MQ = #mqueue{type = priority, q = Q}) -> + {R, Q2} = priority_queue:out(Q), + {R, MQ#mqueue{q = Q2}}. maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun = AlarmFun}) when Len > HighWM -> - Alarm = #mqtt_alarm{id = list_to_binary(["queue_high_watermark.", Name]), + Alarm = #mqtt_alarm{id = iolist_to_binary(["queue_high_watermark.", Name]), severity = warning, title = io_lib:format("Queue ~s high-water mark", [Name]), summary = io_lib:format("queue len ~p > high_watermark ~p", [Len, HighWM])}, diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index 7b9999a12..96e4c9260 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -19,7 +19,7 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc Message Router on Local Node. +%%% @doc Message Router on local node. %%% %%% @author Feng Lee %%%----------------------------------------------------------------------------- @@ -52,8 +52,6 @@ -record(state, {pool, id, statsfun, aging :: #aging{}}). --type topic() :: binary(). - %% @doc Start a local router. -spec start_link(atom(), pos_integer(), fun((atom()) -> ok), list()) -> {ok, pid()} | {error, any()}. start_link(Pool, Id, StatsFun, Env) -> @@ -61,7 +59,7 @@ start_link(Pool, Id, StatsFun, Env) -> ?MODULE, [Pool, Id, StatsFun, Env], []). %% @doc Route Message on the local node. --spec route(topic(), mqtt_message()) -> any(). +-spec route(emqttd_topic:topic(), mqtt_message()) -> any(). route(Queue = <<"$Q/", _Q>>, Msg) -> case lookup_routes(Queue) of [] -> @@ -87,12 +85,12 @@ route(Topic, Msg) -> end. %% @doc Has Route? --spec has_route(topic()) -> boolean(). +-spec has_route(emqttd_topic:topic()) -> boolean(). has_route(Topic) -> ets:member(route, Topic). %% @doc Lookup Routes --spec lookup_routes(topic()) -> list(pid()). +-spec lookup_routes(emqttd_topic:topic()) -> list(pid()). lookup_routes(Topic) when is_binary(Topic) -> case ets:member(route, Topic) of true -> @@ -102,12 +100,12 @@ lookup_routes(Topic) when is_binary(Topic) -> end. %% @doc Add Route. --spec add_route(topic(), pid()) -> ok. +-spec add_route(emqttd_topic:topic(), pid()) -> ok. add_route(Topic, Pid) when is_pid(Pid) -> call(pick(Topic), {add_route, Topic, Pid}). %% @doc Add Routes. --spec add_routes(list(topic()), pid()) -> ok. +-spec add_routes(list(emqttd_topic:topic()), pid()) -> ok. add_routes([], _Pid) -> ok; add_routes([Topic], Pid) -> @@ -119,12 +117,12 @@ add_routes(Topics, Pid) -> end, slice(Topics)). %% @doc Delete Route. --spec delete_route(topic(), pid()) -> ok. +-spec delete_route(emqttd_topic:topic(), pid()) -> ok. delete_route(Topic, Pid) -> cast(pick(Topic), {delete_route, Topic, Pid}). %% @doc Delete Routes. --spec delete_routes(list(topic()), pid()) -> ok. +-spec delete_routes(list(emqttd_topic:topic()), pid()) -> ok. delete_routes([Topic], Pid) -> delete_route(Topic, Pid); @@ -136,13 +134,7 @@ delete_routes(Topics, Pid) -> %% @private Slice topics. slice(Topics) -> dict:to_list(lists:foldl(fun(Topic, Dict) -> - Router = pick(Topic), - case dict:find(Router, Dict) of - {ok, L} -> - dict:store(Router, [Topic | L], Dict); - error -> - dict:store(Router, [Topic], Dict) - end + dict:append(pick(Topic), Topic, Dict) end, dict:new(), Topics)). %% @private Pick a router. @@ -162,7 +154,7 @@ init([Pool, Id, StatsFun, Opts]) -> ?GPROC_POOL(join, Pool, Id), - random:seed(os:timestamp()), + random:seed(erlang:now()), AgingSecs = proplists:get_value(route_aging, Opts, 5), @@ -261,7 +253,7 @@ try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) -> %% Lock topic first case mnesia:wread({topic, Topic}) of [] -> - mnesia:abort(not_found); + ok; %% mnesia:abort(not_found); [TopicR] -> %% Remove topic and trie delete_topic(TopicR), diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 88c69513d..454e9ab18 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -37,11 +37,10 @@ %% calls into the same function knowing that ordinary queues represent %% a base case. - -module(priority_queue). --export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, from_list/1, - in/2, in/3, out/1, out_p/1, join/2, filter/2, fold/3, highest/1]). +-export([new/0, is_queue/1, is_empty/1, len/1, plen/2, to_list/1, from_list/1, + in/2, in/3, out/1, out/2, out_p/1, join/2, filter/2, fold/3, highest/1]). %%---------------------------------------------------------------------------- @@ -58,6 +57,7 @@ -spec(is_queue/1 :: (any()) -> boolean()). -spec(is_empty/1 :: (pqueue()) -> boolean()). -spec(len/1 :: (pqueue()) -> non_neg_integer()). +-spec(len_p/2 :: (priority(), pqueue()) -> non_neg_integer()). -spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]). -spec(from_list/1 :: ([{priority(), any()}]) -> pqueue()). -spec(in/2 :: (any(), pqueue()) -> pqueue()). @@ -96,6 +96,16 @@ len({queue, _R, _F, L}) -> len({pqueue, Queues}) -> lists:sum([len(Q) || {_, Q} <- Queues]). +plen(0, {queue, _R, _F, L}) -> + L; +plen(P, {queue, _R, _F, _}) -> + erlang:error(badarg, [P]); +plen(P, {pqueue, Queues}) -> + case lists:keysearch(maybe_negate_priority(P), 1, Queues) of + {value, {_, Q}} -> len(Q); + false -> 0 + end. + to_list({queue, In, Out, _Len}) when is_list(In), is_list(Out) -> [{0, V} || V <- Out ++ lists:reverse(In, [])]; to_list({pqueue, Queues}) -> @@ -159,6 +169,28 @@ out({pqueue, [{P, Q} | Queues]}) -> out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0); out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)). +out(0, {queue, _, _, _} = Q) -> + out(Q); +out(Priority, {queue, _, _, _}) -> + erlang:error(badarg, [Priority]); +out(Priority, {pqueue, Queues}) -> + P = maybe_negate_priority(Priority), + case lists:keysearch(P, 1, Queues) of + {value, {_, Q}} -> + {R, Q1} = out(Q), + Queues1 = case is_empty(Q1) of + true -> lists:keydelete(P, 1, Queues); + false -> lists:keyreplace(P, 1, Queues, {P, Q1}) + end, + {R, case Queues1 of + [] -> {queue, [], [], 0}; + [{0, OnlyQ}] -> OnlyQ; + [_|_] -> {pqueue, Queues1} + end}; + false -> + {empty, {pqueue, Queues}} + end. + add_p(R, P) -> case R of {empty, Q} -> {empty, Q}; {{value, V}, Q} -> {{value, V, P}, Q}