diff --git a/CHANGELOG.md b/CHANGELOG.md index be3055b2d..88821daf8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,30 @@ emqttd ChangeLog ================== +0.9.0-alpha (2015-06-14) +------------------------- + +Session + +Queue + +Alarm + +Protocol Compliant + + +0.8.5-beta (2015-06-10) +------------------------- + +Bugfix: issue #53 - client will receive duplicate messages when overlapping subscription + + +0.8.4-beta (2015-06-08) +------------------------- + +Bugfix: issue #165 - duplicated message when publish 'retained' message to persistent client + + 0.8.3-beta (2015-06-05) ------------------------- diff --git a/CONTRIBUTORS b/CONTRIBUTORS new file mode 100644 index 000000000..8f1f4c344 --- /dev/null +++ b/CONTRIBUTORS @@ -0,0 +1,10 @@ + +# CONTRIBUTORS + +* [@callbay](https://github.com/callbay) +* [@hejin1026](https://github.com/hejin1026) +* [@desoulter](https://github.com/desoulter) +* [@turtleDeng](https://github.com/turtleDeng) +* [@Hades32](https://github.com/Hades32) +* [@huangdan](https://github.com/huangdan) + diff --git a/README.md b/README.md index edfba9761..6ab9acaba 100644 --- a/README.md +++ b/README.md @@ -102,12 +102,12 @@ The MIT License (MIT) ## Contributors -[@hejin1026](https://github.com/hejin1026) -[@desoulter](https://github.com/desoulter) -[@turtleDeng](https://github.com/turtleDeng) -[@Hades32](https://github.com/Hades32) -[@huangdan](https://github.com/huangdan) -[@callbay](https://github.com/callbay) +* [@hejin1026](https://github.com/hejin1026) +* [@desoulter](https://github.com/desoulter) +* [@turtleDeng](https://github.com/turtleDeng) +* [@Hades32](https://github.com/Hades32) +* [@huangdan](https://github.com/huangdan) +* [@callbay](https://github.com/callbay) ## Author diff --git a/apps/emqttd/src/emqttd_alarm.erl b/apps/emqttd/src/emqttd_alarm.erl new file mode 100644 index 000000000..73c432534 --- /dev/null +++ b/apps/emqttd/src/emqttd_alarm.erl @@ -0,0 +1,98 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% copy alarm_handler. +%%% +%%% @end +%%%----------------------------------------------------------------------------- + +-module(emqttd_alarm). + +-export([start_link/0, set_alarm/1, clear_alarm/1, get_alarms/0, + add_alarm_handler/1, add_alarm_handler/2, + delete_alarm_handler/1]). + +-export([init/1, handle_event/2, handle_call/2, handle_info/2, + terminate/2]). + +-define(SERVER, ?MODULE). + +-type alarm() :: {AlarmId :: any(), AlarmDescription :: string() | binary()}. + +start_link() -> + case gen_event:start_link({local, ?SERVER}) of + {ok, Pid} -> + gen_event:add_handler(?SERVER, ?MODULE, []), + {ok, Pid}; + Error -> + Error + end. + +-spec set_alarm(alarm()) -> ok. +set_alarm(Alarm) -> + gen_event:notify(?SERVER, {set_alarm, Alarm}). + +-spec clear_alarm(any()) -> ok. +clear_alarm(AlarmId) -> + gen_event:notify(?SERVER, {clear_alarm, AlarmId}). + +get_alarms() -> + gen_event:call(?SERVER, ?MODULE, get_alarms). + +add_alarm_handler(Module) when is_atom(Module) -> + gen_event:add_handler(?SERVER, Module, []). + +add_alarm_handler(Module, Args) when is_atom(Module) -> + gen_event:add_handler(?SERVER, Module, Args). + +delete_alarm_handler(Module) when is_atom(Module) -> + gen_event:delete_handler(?SERVER, Module, []). + +%%----------------------------------------------------------------- +%% Default Alarm handler +%%----------------------------------------------------------------- + +init(_) -> {ok, []}. + +handle_event({set_alarm, Alarm}, Alarms)-> + %%TODO: publish to $SYS + {ok, [Alarm | Alarms]}; + +handle_event({clear_alarm, AlarmId}, Alarms)-> + %TODO: publish to $SYS + {ok, lists:keydelete(AlarmId, 1, Alarms)}; + +handle_event(_, Alarms)-> + {ok, Alarms}. + +handle_info(_, Alarms) -> {ok, Alarms}. + +handle_call(get_alarms, Alarms) -> {ok, Alarms, Alarms}; + +handle_call(_Query, Alarms) -> {ok, {error, bad_query}, Alarms}. + +terminate(swap, Alarms) -> + {?MODULE, Alarms}; + +terminate(_, _) -> + ok. + diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index 138808b08..f9434b385 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -76,8 +76,8 @@ start_servers(Sup) -> {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}}, {"emqttd stats", emqttd_stats}, {"emqttd metrics", emqttd_metrics}, - %{"emqttd router", emqttd_router}, {"emqttd broker", emqttd_broker}, + {"emqttd alarm", emqttd_alarm}, {"emqttd mode supervisor", emqttd_mod_sup}, {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}, {"emqttd access control", emqttd_access_control}, diff --git a/apps/emqttd/src/emqttd_mqueue.erl b/apps/emqttd/src/emqttd_mqueue.erl index 58077355f..874aa82d0 100644 --- a/apps/emqttd/src/emqttd_mqueue.erl +++ b/apps/emqttd/src/emqttd_mqueue.erl @@ -1,15 +1,143 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% Simple message queue. +%%% +%%% Notice that MQTT is not an enterprise messaging queue. MQTT assume that client +%%% should be online in most of the time. +%%% +%%% This module wraps an erlang queue to store offline messages temporarily for MQTT +%%% persistent session. +%%% +%%% If the broker restarted or crashed, all the messages stored will be gone. +%%% +%%% @end +%%%----------------------------------------------------------------------------- -module(emqttd_mqueue). --export([init/1, in/1]). +-author("Feng Lee "). --record(queue_state, { - max_queued_messages = 1000 -}). +-include_lib("emqtt/include/emqtt.hrl"). -init(Opts) -> - {ok, #queue_state{}}. +-export([new/2, name/1, + is_empty/1, len/1, + in/2, out/1, + peek/1, + to_list/1]). -in(Msg, Q = #queue_state{}) -> - Q. +-define(MAX_LEN, 600). + +-define(HIGH_WM, 0.6). + +-define(LOW_WM, 0.2). + +-record(mqueue, {name, + len = 0, + max_len = ?MAX_LEN, + queue = queue:new(), + store_qos0 = false, + high_watermark = ?HIGH_WM, + low_watermark = ?LOW_WM, + alert = false}). + +-type mqueue() :: #mqueue{}. + +-type queue_option() :: {max_queued_messages, pos_integer()} %% Max messages queued + | {high_queue_watermark, float()} %% High watermark + | {low_queue_watermark, float()} %% Low watermark + | {queue_qos0_messages, boolean()}. %% Queue Qos0 messages? + +%%------------------------------------------------------------------------------ +%% @doc New Queue. +%% @end +%%------------------------------------------------------------------------------ +-spec new(binary() | string(), list(queue_option())) -> mqueue(). +new(Name, Opts) -> + MaxLen = emqttd_opts:g(max_queued_messages, Opts, ?MAX_LEN), + HighWM = round(MaxLen * emqttd_opts:g(high_queue_watermark, Opts, ?HIGH_WM)), + LowWM = round(MaxLen * emqttd_opts:g(low_queue_watermark, Opts, ?LOW_WM)), + StoreQos0 = emqttd_opts:g(queue_qos0_messages, Opts, false), + #mqueue{name = Name, + max_len = MaxLen, + store_qos0 = StoreQos0, + high_watermark = HighWM, + low_watermark = LowWM}. + +name(#mqueue{name = Name}) -> + Name. + +len(#mqueue{len = Len}) -> + Len. + +is_empty(#mqueue{len = 0}) -> true; +is_empty(_Q) -> false. + +%%------------------------------------------------------------------------------ +%% @doc +%% Queue one message. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec in(mqtt_message(), mqueue()) -> mqueue(). +in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) -> + MQ; +%% queue is full, drop the oldest +in(Msg, MQ = #mqueue{name = Name, len = Len, max_len = MaxLen, queue = Q}) when Len =:= MaxLen -> + Q2 = case queue:out(Q) of + {{value, OldMsg}, Q1} -> + %%TODO: publish the dropped message to $SYS? + lager:error("Queue(~s) drop message: ~p", [Name, OldMsg]), + Q1; + {empty, Q1} -> %% maybe max_len is 1 + Q1 + end, + MQ#mqueue{queue = queue:in(Msg, Q2)}; +in(Msg, MQ = #mqueue{len = Len, queue = Q}) -> + maybe_set_alarm(MQ#mqueue{len = Len+1, queue = queue:in(Msg, Q)}). + +out(MQ = #mqueue{len = 0, queue = _Q}) -> + {empty, MQ}; +out(MQ = #mqueue{len = Len, queue = Q}) -> + {Result, Q1} = queue:out(Q), + {Result, maybe_clear_alarm(MQ#mqueue{len = Len - 1, queue = Q1})}. + +peek(#mqueue{queue = Q}) -> + queue:peek(Q). + +to_list(#mqueue{queue = Q}) -> + queue:to_list(Q). + +maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_watermark = HighWM, alert = false}) + when Len >= HighWM -> + AlarmDescr = io_lib:format("len ~p > high_watermark ~p", [Len, HighWM]), + emqttd_alarm:set_alarm({{queue_high_watermark, Name}, AlarmDescr}), + MQ#mqueue{alert = true}; +maybe_set_alarm(MQ) -> + MQ. + +maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_watermark = LowWM, alert = true}) + when Len =< LowWM -> + emqttd_alarm:clear_alarm({queue_high_watermark, Name}), MQ#mqueue{alert = false}; +maybe_clear_alarm(MQ) -> + MQ. diff --git a/apps/emqttd/src/emqttd_opts.erl b/apps/emqttd/src/emqttd_opts.erl index a83466a73..dfd5b74b6 100644 --- a/apps/emqttd/src/emqttd_opts.erl +++ b/apps/emqttd/src/emqttd_opts.erl @@ -28,7 +28,7 @@ -author("Feng Lee "). --export([merge/2]). +-export([merge/2, g/2, g/3]). %%------------------------------------------------------------------------------ %% @doc Merge Options @@ -50,3 +50,13 @@ merge(Defaults, Options) -> end end, Defaults, Options). +%%------------------------------------------------------------------------------ +%% @doc Get option +%% @end +%%------------------------------------------------------------------------------ +g(Key, Options) -> + proplists:get_value(Key, Options). + +g(Key, Options, Default) -> + proplists:get_value(Key, Options, Default). + diff --git a/apps/emqttd/src/emqttd_queue.erl b/apps/emqttd/src/emqttd_queue.erl deleted file mode 100644 index d9897a3f8..000000000 --- a/apps/emqttd/src/emqttd_queue.erl +++ /dev/null @@ -1,98 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd simple queue. -%%% -%%% @end -%%%----------------------------------------------------------------------------- - -%% TODO: this module should be rewrited... - --module(emqttd_queue). - --author("Feng Lee "). - --include_lib("emqtt/include/emqtt.hrl"). - --export([new/1, new/2, in/3, all/1, clear/1]). - --define(DEFAULT_MAX_LEN, 1000). - --record(mqtt_queue_wrapper, {queue = queue:new(), - max_len = ?DEFAULT_MAX_LEN, - store_qos0 = false}). - --type mqtt_queue() :: #mqtt_queue_wrapper{}. - -%%------------------------------------------------------------------------------ -%% @doc -%% New Queue. -%% -%% @end -%%------------------------------------------------------------------------------ --spec new(non_neg_integer()) -> mqtt_queue(). -new(MaxLen) -> #mqtt_queue_wrapper{max_len = MaxLen}. - -new(MaxLen, StoreQos0) -> #mqtt_queue_wrapper{max_len = MaxLen, store_qos0 = StoreQos0}. - -%%------------------------------------------------------------------------------ -%% @doc -%% Queue one message. -%% -%% @end -%%------------------------------------------------------------------------------ --spec in(binary(), mqtt_message(), mqtt_queue()) -> mqtt_queue(). -in(ClientId, Message = #mqtt_message{qos = Qos}, - Wrapper = #mqtt_queue_wrapper{queue = Queue, max_len = MaxLen}) -> - case queue:len(Queue) < MaxLen of - true -> - Wrapper#mqtt_queue_wrapper{queue = queue:in(Message, Queue)}; - false -> % full - if - Qos =:= ?QOS_0 -> - lager:error("Queue ~s drop qos0 message: ~p", [ClientId, Message]), - Wrapper; - true -> - {{value, Msg}, Queue1} = queue:drop(Queue), - lager:error("Queue ~s drop message: ~p", [ClientId, Msg]), - Wrapper#mqtt_queue_wrapper{queue = queue:in(Message, Queue1)} - end - end. - -%%------------------------------------------------------------------------------ -%% @doc -%% Get all messages in queue. -%% -%% @end -%%------------------------------------------------------------------------------ --spec all(mqtt_queue()) -> list(). -all(#mqtt_queue_wrapper { queue = Queue }) -> queue:to_list(Queue). - -%%------------------------------------------------------------------------------ -%% @doc -%% Clear queue. -%% -%% @end -%%------------------------------------------------------------------------------ --spec clear(mqtt_queue()) -> mqtt_queue(). -clear(Queue) -> Queue#mqtt_queue_wrapper{queue = queue:new()}. - diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index ed10c37a7..313b8eb3d 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -210,17 +210,31 @@ puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) -> %%------------------------------------------------------------------------------ -spec subscribe(session(), [{binary(), mqtt_qos()}]) -> {ok, session(), [mqtt_qos()]}. subscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, Topics) -> - Resubs = [Topic || {Name, _Qos} = Topic <- Topics, maps:is_key(Name, SubMap)], - case Resubs of - [] -> ok; - _ -> lager:warning("~s resubscribe ~p", [ClientId, Resubs]) - end, - SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics), + + %% subscribe first and don't care if the subscriptions have been existed {ok, GrantedQos} = emqttd_pubsub:subscribe(Topics), + lager:info([{client, ClientId}], "Client ~s subscribe ~p. Granted QoS: ~p", - [ClientId, Topics, GrantedQos]), - %%TODO: should be gen_event and notification... - [emqttd_msg_store:redeliver(Name, self()) || {Name, _} <- Topics], + [ClientId, Topics, GrantedQos]), + + + %% : 3.8.4 + %% Where the Topic Filter is not identical to any existing Subscription’s filter, + %% a new Subscription is created and all matching retained messages are sent. + lists:foreach(fun({Name, _Qos}) -> + case maps:is_key(Name, SubMap) of + true -> + lager:warning("~s resubscribe ~p", [ClientId, Name]); + false -> + %%TODO: this is not right, rewrite later... + emqttd_msg_store:redeliver(Name, self()) + end + end, Topics), + + SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> + maps:put(Name, Qos, Acc) + end, SubMap, Topics), + {ok, SessState#session_state{submap = SubMap1}, GrantedQos}; subscribe(SessPid, Topics) when is_pid(SessPid) -> diff --git a/doc/.retain.md.swp b/doc/.retain.md.swp deleted file mode 100644 index 648c9731d..000000000 Binary files a/doc/.retain.md.swp and /dev/null differ diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index ca11109e4..5de150b4b 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -97,14 +97,14 @@ {unack_retry_after, 4} ]}, {queue, [ - %% Max messages queued when client is disconnected, or inflight messsages is overload + %% Max messages queued when client is disconnected, or inflight messsage window is overload {max_queued_messages, 200}, %% High watermark of queued messsages - {high_queue_watermark, 0.6}, + {high_queue_watermark, 0.8}, %% Low watermark of queued messsages {low_queue_watermark, 0.2}, %% Queue Qos0 offline messages? - {queue_qos0_messages, false} + {queue_qos0_messages, true} ]} ]}, %% Broker Options