From 9d5223dd1ace63ea2b07dfddc7db7d99a5e846d5 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 8 Jun 2015 23:31:20 +0800 Subject: [PATCH 01/14] contributors --- README.md | 12 +++++------ apps/emqttd/src/emqttd_session.erl | 32 +++++++++++++++++++++--------- 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index edfba9761..6ab9acaba 100644 --- a/README.md +++ b/README.md @@ -102,12 +102,12 @@ The MIT License (MIT) ## Contributors -[@hejin1026](https://github.com/hejin1026) -[@desoulter](https://github.com/desoulter) -[@turtleDeng](https://github.com/turtleDeng) -[@Hades32](https://github.com/Hades32) -[@huangdan](https://github.com/huangdan) -[@callbay](https://github.com/callbay) +* [@hejin1026](https://github.com/hejin1026) +* [@desoulter](https://github.com/desoulter) +* [@turtleDeng](https://github.com/turtleDeng) +* [@Hades32](https://github.com/Hades32) +* [@huangdan](https://github.com/huangdan) +* [@callbay](https://github.com/callbay) ## Author diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 15a465d4d..a629e3b08 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -173,17 +173,31 @@ puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) -> %%------------------------------------------------------------------------------ -spec subscribe(session(), [{binary(), mqtt_qos()}]) -> {ok, session(), [mqtt_qos()]}. subscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, Topics) -> - Resubs = [Topic || {Name, _Qos} = Topic <- Topics, maps:is_key(Name, SubMap)], - case Resubs of - [] -> ok; - _ -> lager:warning("~s resubscribe ~p", [ClientId, Resubs]) - end, - SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics), + + %% subscribe first and don't care if the subscriptions have been existed {ok, GrantedQos} = emqttd_pubsub:subscribe(Topics), + lager:info([{client, ClientId}], "Client ~s subscribe ~p. Granted QoS: ~p", - [ClientId, Topics, GrantedQos]), - %%TODO: should be gen_event and notification... - [emqttd_msg_store:redeliver(Name, self()) || {Name, _} <- Topics], + [ClientId, Topics, GrantedQos]), + + + %% : 3.8.4 + %% Where the Topic Filter is not identical to any existing Subscription’s filter, + %% a new Subscription is created and all matching retained messages are sent. + lists:foreach(fun({Name, _Qos}) -> + case maps:is_key(Name, SubMap) of + true -> + lager:warning("~s resubscribe ~p", [ClientId, Name]); + false -> + %%TODO: this is not right, rewrite later... + emqttd_msg_store:redeliver(Name, self()) + end + end, Topics), + + SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> + maps:put(Name, Qos, Acc) + end, SubMap, Topics), + {ok, SessState#session_state{submap = SubMap1}, GrantedQos}; subscribe(SessPid, Topics) when is_pid(SessPid) -> From bfc83b8c4554c7d71ab62cd9d13e2236f697b091 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 8 Jun 2015 23:31:33 +0800 Subject: [PATCH 02/14] 0.8.4 --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index be3055b2d..cb598e803 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ emqttd ChangeLog ================== +0.8.4-beta (2015-06-08) +------------------------- + +Bugfix: issue #165 - duplicated message when publish 'retained' message to persistent client + + 0.8.3-beta (2015-06-05) ------------------------- From f0583a1c29a91edf949dc75c58b46263d9b23079 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 8 Jun 2015 23:34:09 +0800 Subject: [PATCH 03/14] 0.8.4 --- CONTRIBUTORS | 10 ++++++++++ apps/emqtt/src/emqtt.app.src | 2 +- apps/emqttd/src/emqttd.app.src | 2 +- rel/reltool.config | 2 +- 4 files changed, 13 insertions(+), 3 deletions(-) create mode 100644 CONTRIBUTORS diff --git a/CONTRIBUTORS b/CONTRIBUTORS new file mode 100644 index 000000000..8f1f4c344 --- /dev/null +++ b/CONTRIBUTORS @@ -0,0 +1,10 @@ + +# CONTRIBUTORS + +* [@callbay](https://github.com/callbay) +* [@hejin1026](https://github.com/hejin1026) +* [@desoulter](https://github.com/desoulter) +* [@turtleDeng](https://github.com/turtleDeng) +* [@Hades32](https://github.com/Hades32) +* [@huangdan](https://github.com/huangdan) + diff --git a/apps/emqtt/src/emqtt.app.src b/apps/emqtt/src/emqtt.app.src index c444276fa..06751dfac 100644 --- a/apps/emqtt/src/emqtt.app.src +++ b/apps/emqtt/src/emqtt.app.src @@ -1,7 +1,7 @@ {application, emqtt, [ {description, "Erlang MQTT Common Library"}, - {vsn, "0.8.3"}, + {vsn, "0.8.4"}, {modules, []}, {registered, []}, {applications, [ diff --git a/apps/emqttd/src/emqttd.app.src b/apps/emqttd/src/emqttd.app.src index 5c3347f27..a4e0d72e7 100644 --- a/apps/emqttd/src/emqttd.app.src +++ b/apps/emqttd/src/emqttd.app.src @@ -1,7 +1,7 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, - {vsn, "0.8.3"}, + {vsn, "0.8.4"}, {modules, []}, {registered, []}, {applications, [kernel, diff --git a/rel/reltool.config b/rel/reltool.config index f26654f74..6e5d1dde7 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -4,7 +4,7 @@ {lib_dirs, ["../apps", "../deps", "../plugins"]}, {erts, [{mod_cond, derived}, {app_file, strip}]}, {app_file, strip}, - {rel, "emqttd", "0.8.3", + {rel, "emqttd", "0.8.4", [ kernel, stdlib, From 29540946194fe7bf32edf74b82a656ddac1e5d45 Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 9 Jun 2015 11:50:53 +0800 Subject: [PATCH 04/14] fix issue #53 - client will receive duplicate messages when overlapping subscription --- apps/emqttd/src/emqttd_pubsub.erl | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index b52037d29..b89355c76 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_pubsub). -author("Feng Lee "). @@ -217,6 +218,7 @@ match(Topic) when is_binary(Topic) -> init([Id, _Opts]) -> process_flag(min_heap_size, 1024*1024), gproc_pool:connect_worker(pubsub, {?MODULE, Id}), + %%TODO: gb_trees to replace maps? {ok, #state{id = Id, submap = maps:new()}}. handle_call({subscribe, SubPid, Topics}, _From, State) -> @@ -384,9 +386,24 @@ add_topic(TopicR = #mqtt_topic{topic = Topic}) -> end end. -add_subscriber({TopicR, Subscriber}) when is_record(TopicR, mqtt_topic) -> +%% Fix issue #53 - Remove Overlapping Subscriptions +add_subscriber({TopicR, Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid}}) + when is_record(TopicR, mqtt_topic) -> case add_topic(TopicR) of ok -> + OverlapSubs = [Sub || Sub = #mqtt_subscriber{topic = SubTopic, qos = SubQos} + <- mnesia:index_read(subscriber, SubPid, #mqtt_subscriber.pid), + SubTopic =:= Topic, SubQos =/= Qos], + + %% remove overlapping subscribers + if + length(OverlapSubs) =:= 0 -> ok; + true -> + lager:warning("Remove overlapping subscribers: ~p", [OverlapSubs]), + [mnesia:delete_object(subscriber, OverlapSub, write) || OverlapSub <- OverlapSubs] + end, + + %% insert subscriber mnesia:write(subscriber, Subscriber, write); Error -> Error From 553fb394f37ce68437a2d8c977bdff73c30659f4 Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 9 Jun 2015 11:57:44 +0800 Subject: [PATCH 05/14] vsn 'git' --- apps/emqtt/src/emqtt.app.src | 2 +- apps/emqttd/src/emqttd.app.src | 2 +- rel/reltool.config | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqtt/src/emqtt.app.src b/apps/emqtt/src/emqtt.app.src index 06751dfac..c84d855f6 100644 --- a/apps/emqtt/src/emqtt.app.src +++ b/apps/emqtt/src/emqtt.app.src @@ -1,7 +1,7 @@ {application, emqtt, [ {description, "Erlang MQTT Common Library"}, - {vsn, "0.8.4"}, + {vsn, git}, {modules, []}, {registered, []}, {applications, [ diff --git a/apps/emqttd/src/emqttd.app.src b/apps/emqttd/src/emqttd.app.src index a4e0d72e7..024fcdac9 100644 --- a/apps/emqttd/src/emqttd.app.src +++ b/apps/emqttd/src/emqttd.app.src @@ -1,7 +1,7 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, - {vsn, "0.8.4"}, + {vsn, git}, {modules, []}, {registered, []}, {applications, [kernel, diff --git a/rel/reltool.config b/rel/reltool.config index 6e5d1dde7..2dbb76ea5 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -4,7 +4,7 @@ {lib_dirs, ["../apps", "../deps", "../plugins"]}, {erts, [{mod_cond, derived}, {app_file, strip}]}, {app_file, strip}, - {rel, "emqttd", "0.8.4", + {rel, "emqttd", git, [ kernel, stdlib, From d5e35b2423bd0d60f9de3141b93bbd05379a9376 Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 9 Jun 2015 11:58:37 +0800 Subject: [PATCH 06/14] 0.8.5 --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cb598e803..6943606bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ emqttd ChangeLog ================== +0.8.5-beta (2015-06-12) +------------------------- + +Bugfix: issue #53 - client will receive duplicate messages when overlapping subscription + + 0.8.4-beta (2015-06-08) ------------------------- From 9bc84d2b53f3b2267fd80ff520f2e6a723506fdb Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 9 Jun 2015 12:01:14 +0800 Subject: [PATCH 07/14] rm .swp --- doc/.retain.md.swp | Bin 12288 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 doc/.retain.md.swp diff --git a/doc/.retain.md.swp b/doc/.retain.md.swp deleted file mode 100644 index 648c9731dc496c528162ce136a590152afb2b1ac..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 12288 zcmeI&KQ9D97{~EB5|M}y-oU!8?H*UqDzCWYB6s?+``~2HGCK+ll{cYLD|KE8Dis~? z!Lw&IiY=~N`A+i7lga$a?5Eqz9Ci+R{a`2R8f~k_oZLPQ7w#YDAC`;}ca@2h1NF_0 zoz9E0Jh55b%G Date: Wed, 10 Jun 2015 16:24:31 +0800 Subject: [PATCH 08/14] 0.8.5 --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6943606bf..a8b61e4a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ emqttd ChangeLog ================== -0.8.5-beta (2015-06-12) +0.8.5-beta (2015-06-10) ------------------------- Bugfix: issue #53 - client will receive duplicate messages when overlapping subscription From 5559cd7f58170c25700db662b99948dd34cf8a80 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 11 Jun 2015 00:01:25 +0800 Subject: [PATCH 09/14] add alarm --- apps/emqttd/src/emqttd_alarm.erl | 98 +++++++++++++++++++ .../{emqttd_queue.erl => emqttd_mqueue.erl} | 0 2 files changed, 98 insertions(+) create mode 100644 apps/emqttd/src/emqttd_alarm.erl rename apps/emqttd/src/{emqttd_queue.erl => emqttd_mqueue.erl} (100%) diff --git a/apps/emqttd/src/emqttd_alarm.erl b/apps/emqttd/src/emqttd_alarm.erl new file mode 100644 index 000000000..73c432534 --- /dev/null +++ b/apps/emqttd/src/emqttd_alarm.erl @@ -0,0 +1,98 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% copy alarm_handler. +%%% +%%% @end +%%%----------------------------------------------------------------------------- + +-module(emqttd_alarm). + +-export([start_link/0, set_alarm/1, clear_alarm/1, get_alarms/0, + add_alarm_handler/1, add_alarm_handler/2, + delete_alarm_handler/1]). + +-export([init/1, handle_event/2, handle_call/2, handle_info/2, + terminate/2]). + +-define(SERVER, ?MODULE). + +-type alarm() :: {AlarmId :: any(), AlarmDescription :: string() | binary()}. + +start_link() -> + case gen_event:start_link({local, ?SERVER}) of + {ok, Pid} -> + gen_event:add_handler(?SERVER, ?MODULE, []), + {ok, Pid}; + Error -> + Error + end. + +-spec set_alarm(alarm()) -> ok. +set_alarm(Alarm) -> + gen_event:notify(?SERVER, {set_alarm, Alarm}). + +-spec clear_alarm(any()) -> ok. +clear_alarm(AlarmId) -> + gen_event:notify(?SERVER, {clear_alarm, AlarmId}). + +get_alarms() -> + gen_event:call(?SERVER, ?MODULE, get_alarms). + +add_alarm_handler(Module) when is_atom(Module) -> + gen_event:add_handler(?SERVER, Module, []). + +add_alarm_handler(Module, Args) when is_atom(Module) -> + gen_event:add_handler(?SERVER, Module, Args). + +delete_alarm_handler(Module) when is_atom(Module) -> + gen_event:delete_handler(?SERVER, Module, []). + +%%----------------------------------------------------------------- +%% Default Alarm handler +%%----------------------------------------------------------------- + +init(_) -> {ok, []}. + +handle_event({set_alarm, Alarm}, Alarms)-> + %%TODO: publish to $SYS + {ok, [Alarm | Alarms]}; + +handle_event({clear_alarm, AlarmId}, Alarms)-> + %TODO: publish to $SYS + {ok, lists:keydelete(AlarmId, 1, Alarms)}; + +handle_event(_, Alarms)-> + {ok, Alarms}. + +handle_info(_, Alarms) -> {ok, Alarms}. + +handle_call(get_alarms, Alarms) -> {ok, Alarms, Alarms}; + +handle_call(_Query, Alarms) -> {ok, {error, bad_query}, Alarms}. + +terminate(swap, Alarms) -> + {?MODULE, Alarms}; + +terminate(_, _) -> + ok. + diff --git a/apps/emqttd/src/emqttd_queue.erl b/apps/emqttd/src/emqttd_mqueue.erl similarity index 100% rename from apps/emqttd/src/emqttd_queue.erl rename to apps/emqttd/src/emqttd_mqueue.erl From 50d65897ce1894078c521e56e2037d099c9a8e1f Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 11 Jun 2015 00:02:45 +0800 Subject: [PATCH 10/14] session, queue config --- rel/files/emqttd.config | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index 442c801d7..4f08c5e9a 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -90,21 +90,21 @@ %% Expired after 2 days {expired_after, 48}, %% Max number of QoS 1 and 2 messages that can be “in flight” at one time. - {max_inflight_messages, 20}, + {max_inflight_message, 20}, %% Max retries for unacknolege Qos1/2 messages {max_unack_retries, 3}, %% Retry after 10 seconds {unack_retry_after, 10} ]}, {queue, [ - %% Max messages queued when client is disconnected, or inflight messsages is overload - {max_queued_messages, 100}, + %% Max messages queued when client is disconnected, or inflight messsage window is overload + {max_queued_messages, 200}, %% High watermark of queued messsages {high_queue_watermark, 0.6}, %% Low watermark of queued messsages {low_queue_watermark, 0.2}, %% Queue Qos0 offline messages? - {queue_qos0_messages, false} + {queue_qos0_messages, true} ]} ]}, %% Broker Options From db2cc7ba0b776fe3818266bdc097cef967e46073 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 11 Jun 2015 00:03:03 +0800 Subject: [PATCH 11/14] git --- apps/emqttd/src/emqttd.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqttd/src/emqttd.app.src b/apps/emqttd/src/emqttd.app.src index 5c3347f27..024fcdac9 100644 --- a/apps/emqttd/src/emqttd.app.src +++ b/apps/emqttd/src/emqttd.app.src @@ -1,7 +1,7 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, - {vsn, "0.8.3"}, + {vsn, git}, {modules, []}, {registered, []}, {applications, [kernel, From c4027dfc16b3940ab3f78e619ec0daa5338888a6 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 11 Jun 2015 00:05:20 +0800 Subject: [PATCH 12/14] new queue --- apps/emqttd/src/emqttd_mqueue.erl | 142 +++++++++++++++++++----------- 1 file changed, 93 insertions(+), 49 deletions(-) diff --git a/apps/emqttd/src/emqttd_mqueue.erl b/apps/emqttd/src/emqttd_mqueue.erl index d9897a3f8..a76c5ef97 100644 --- a/apps/emqttd/src/emqttd_mqueue.erl +++ b/apps/emqttd/src/emqttd_mqueue.erl @@ -20,39 +20,76 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd simple queue. +%%% simple message queue. +%%% +%%% Notice that MQTT is not an enterprise messaging queue. MQTT assume that client +%%% should be online in most of the time. +%%% +%%% This module wraps an erlang queue to store offline messages temporarily for MQTT +%%% persistent session. +%%% +%%% If the broker restarted or crashed, all the messages stored will be gone. %%% %%% @end %%%----------------------------------------------------------------------------- -%% TODO: this module should be rewrited... - --module(emqttd_queue). +-module(emqttd_mqueue). -author("Feng Lee "). -include_lib("emqtt/include/emqtt.hrl"). --export([new/1, new/2, in/3, all/1, clear/1]). +-export([new/2, name/1, + is_empty/1, len/1, + in/2, out/1, + peek/1, + to_list/1]). --define(DEFAULT_MAX_LEN, 1000). +%% in_r/2, out_r/1, --record(mqtt_queue_wrapper, {queue = queue:new(), - max_len = ?DEFAULT_MAX_LEN, - store_qos0 = false}). +-define(MAX_LEN, 600). --type mqtt_queue() :: #mqtt_queue_wrapper{}. +-define(HIGH_WM, 0.6). + +-define(LOW_WM, 0.2). + +-record(mqueue, {name, + len = 0, + max_len = ?MAX_LEN, + queue = queue:new(), + store_qos0 = false, + high_watermark = ?HIGH_WM, + low_watermark = ?LOW_WM, + alert = false}). + +-type mqueue() :: #mqueue{}. + +-type queue_option() :: {max_queued_messages, pos_integer()} %% Max messages queued + | {high_queue_watermark, float()} %% High watermark + | {low_queue_watermark, float()} %% Low watermark + | {queue_qos0_messages, boolean()}. %% Queue Qos0 messages? %%------------------------------------------------------------------------------ -%% @doc -%% New Queue. -%% +%% @doc New Queue. %% @end %%------------------------------------------------------------------------------ --spec new(non_neg_integer()) -> mqtt_queue(). -new(MaxLen) -> #mqtt_queue_wrapper{max_len = MaxLen}. +-spec new(binary() | string(), list(queue_option())) -> mqueue(). +new(Name, Opts) -> + MaxLen = emqttd_opts:g(max_queued_messages, Opts, ?MAX_LEN), + HighWM = round(MaxLen * emqttd_opts:g(high_queue_watermark, Opts, ?HIGH_WM)), + LowWM = round(MaxLen * emqttd_opts:g(low_queue_watermark, Opts, ?LOW_WM)), + #mqueue{name = Name, max_len = MaxLen, + store_qos0 = emqttd_opts:g(queue_qos0_messages, Opts, false), + high_watermark = HighWM, low_watermark = LowWM}. -new(MaxLen, StoreQos0) -> #mqtt_queue_wrapper{max_len = MaxLen, store_qos0 = StoreQos0}. +name(#mqueue{name = Name}) -> + Name. + +len(#mqueue{len = Len}) -> + Len. + +is_empty(#mqueue{len = 0}) -> true; +is_empty(_Q) -> false. %%------------------------------------------------------------------------------ %% @doc @@ -60,39 +97,46 @@ new(MaxLen, StoreQos0) -> #mqtt_queue_wrapper{max_len = MaxLen, store_qos0 = Sto %% %% @end %%------------------------------------------------------------------------------ --spec in(binary(), mqtt_message(), mqtt_queue()) -> mqtt_queue(). -in(ClientId, Message = #mqtt_message{qos = Qos}, - Wrapper = #mqtt_queue_wrapper{queue = Queue, max_len = MaxLen}) -> - case queue:len(Queue) < MaxLen of - true -> - Wrapper#mqtt_queue_wrapper{queue = queue:in(Message, Queue)}; - false -> % full - if - Qos =:= ?QOS_0 -> - lager:error("Queue ~s drop qos0 message: ~p", [ClientId, Message]), - Wrapper; - true -> - {{value, Msg}, Queue1} = queue:drop(Queue), - lager:error("Queue ~s drop message: ~p", [ClientId, Msg]), - Wrapper#mqtt_queue_wrapper{queue = queue:in(Message, Queue1)} - end - end. +-spec in(mqtt_message(), mqueue()) -> mqueue(). +in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) -> + MQ; +%% queue is full, drop the oldest +in(Msg, MQ = #mqueue{name = Name, len = Len, max_len = MaxLen, queue = Q}) when Len =:= MaxLen -> + Q2 = case queue:out(Q) of + {{value, OldMsg}, Q1} -> + %%TODO: publish the dropped message to $SYS? + lager:error("Queue(~s) drop message: ~p", [Name, OldMsg]), + Q1; + {empty, Q1} -> %% maybe max_len is 1 + Q1 + end, + MQ#mqueue{queue = queue:in(Msg, Q2)}; +in(Msg, MQ = #mqueue{len = Len, queue = Q}) -> + maybe_set_alarm(MQ#mqueue{len = Len+1, queue = queue:in(Msg, Q)}). -%%------------------------------------------------------------------------------ -%% @doc -%% Get all messages in queue. -%% -%% @end -%%------------------------------------------------------------------------------ --spec all(mqtt_queue()) -> list(). -all(#mqtt_queue_wrapper { queue = Queue }) -> queue:to_list(Queue). +out(MQ = #mqueue{len = 0, queue = _Q}) -> + {empty, MQ}; +out(MQ = #mqueue{len = Len, queue = Q}) -> + {Result, Q1} = queue:out(Q), + {Result, maybe_clear_alarm(MQ#mqueue{len = Len - 1, queue = Q1})}. -%%------------------------------------------------------------------------------ -%% @doc -%% Clear queue. -%% -%% @end -%%------------------------------------------------------------------------------ --spec clear(mqtt_queue()) -> mqtt_queue(). -clear(Queue) -> Queue#mqtt_queue_wrapper{queue = queue:new()}. +peek(#mqueue{queue = Q}) -> + queue:peek(Q). + +to_list(#mqueue{queue = Q}) -> + queue:to_list(Q). + +maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_watermark = HighWM, alert = false}) + when Len >= HighWM -> + AlarmDescr = io_lib:format("len ~p > high_watermark ~p", [Len, HighWM]), + emqttd_alarm:set_alarm({{queue_high_watermark, Name}, AlarmDescr}), + MQ#mqueue{alert = true}; +maybe_set_alarm(MQ) -> + MQ. + +maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_watermark = LowWM, alert = true}) + when Len =< LowWM -> + emqttd_alarm:clear_alarm({queue_high_watermark, Name}), MQ#mqueue{alert = false}; +maybe_clear_alarm(MQ) -> + MQ. From 051b8604e86a9a93fc1b82e97b7618533899a20c Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 11 Jun 2015 00:05:44 +0800 Subject: [PATCH 13/14] g/2, g/3 --- apps/emqttd/src/emqttd_opts.erl | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/apps/emqttd/src/emqttd_opts.erl b/apps/emqttd/src/emqttd_opts.erl index a83466a73..dfd5b74b6 100644 --- a/apps/emqttd/src/emqttd_opts.erl +++ b/apps/emqttd/src/emqttd_opts.erl @@ -28,7 +28,7 @@ -author("Feng Lee "). --export([merge/2]). +-export([merge/2, g/2, g/3]). %%------------------------------------------------------------------------------ %% @doc Merge Options @@ -50,3 +50,13 @@ merge(Defaults, Options) -> end end, Defaults, Options). +%%------------------------------------------------------------------------------ +%% @doc Get option +%% @end +%%------------------------------------------------------------------------------ +g(Key, Options) -> + proplists:get_value(Key, Options). + +g(Key, Options, Default) -> + proplists:get_value(Key, Options, Default). + From 53099f253637db9f09d9e8779e6b14822ab5999c Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 11 Jun 2015 10:23:13 +0800 Subject: [PATCH 14/14] 0.9.0 --- CHANGELOG.md | 12 ++++++++++++ apps/emqttd/src/emqttd_app.erl | 2 +- apps/emqttd/src/emqttd_mqueue.erl | 13 +++++++------ 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a8b61e4a7..88821daf8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,18 @@ emqttd ChangeLog ================== +0.9.0-alpha (2015-06-14) +------------------------- + +Session + +Queue + +Alarm + +Protocol Compliant + + 0.8.5-beta (2015-06-10) ------------------------- diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index 138808b08..f9434b385 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -76,8 +76,8 @@ start_servers(Sup) -> {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}}, {"emqttd stats", emqttd_stats}, {"emqttd metrics", emqttd_metrics}, - %{"emqttd router", emqttd_router}, {"emqttd broker", emqttd_broker}, + {"emqttd alarm", emqttd_alarm}, {"emqttd mode supervisor", emqttd_mod_sup}, {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}, {"emqttd access control", emqttd_access_control}, diff --git a/apps/emqttd/src/emqttd_mqueue.erl b/apps/emqttd/src/emqttd_mqueue.erl index a76c5ef97..874aa82d0 100644 --- a/apps/emqttd/src/emqttd_mqueue.erl +++ b/apps/emqttd/src/emqttd_mqueue.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% simple message queue. +%%% Simple message queue. %%% %%% Notice that MQTT is not an enterprise messaging queue. MQTT assume that client %%% should be online in most of the time. @@ -45,8 +45,6 @@ peek/1, to_list/1]). -%% in_r/2, out_r/1, - -define(MAX_LEN, 600). -define(HIGH_WM, 0.6). @@ -78,9 +76,12 @@ new(Name, Opts) -> MaxLen = emqttd_opts:g(max_queued_messages, Opts, ?MAX_LEN), HighWM = round(MaxLen * emqttd_opts:g(high_queue_watermark, Opts, ?HIGH_WM)), LowWM = round(MaxLen * emqttd_opts:g(low_queue_watermark, Opts, ?LOW_WM)), - #mqueue{name = Name, max_len = MaxLen, - store_qos0 = emqttd_opts:g(queue_qos0_messages, Opts, false), - high_watermark = HighWM, low_watermark = LowWM}. + StoreQos0 = emqttd_opts:g(queue_qos0_messages, Opts, false), + #mqueue{name = Name, + max_len = MaxLen, + store_qos0 = StoreQos0, + high_watermark = HighWM, + low_watermark = LowWM}. name(#mqueue{name = Name}) -> Name.