new session

This commit is contained in:
Feng 2015-06-11 12:15:20 +08:00
commit f857f1ec19
11 changed files with 312 additions and 126 deletions

View File

@ -2,6 +2,30 @@
emqttd ChangeLog
==================
0.9.0-alpha (2015-06-14)
-------------------------
Session
Queue
Alarm
Protocol Compliant
0.8.5-beta (2015-06-10)
-------------------------
Bugfix: issue #53 - client will receive duplicate messages when overlapping subscription
0.8.4-beta (2015-06-08)
-------------------------
Bugfix: issue #165 - duplicated message when publish 'retained' message to persistent client
0.8.3-beta (2015-06-05)
-------------------------

10
CONTRIBUTORS Normal file
View File

@ -0,0 +1,10 @@
# CONTRIBUTORS
* [@callbay](https://github.com/callbay)
* [@hejin1026](https://github.com/hejin1026)
* [@desoulter](https://github.com/desoulter)
* [@turtleDeng](https://github.com/turtleDeng)
* [@Hades32](https://github.com/Hades32)
* [@huangdan](https://github.com/huangdan)

View File

@ -102,12 +102,12 @@ The MIT License (MIT)
## Contributors
[@hejin1026](https://github.com/hejin1026)
[@desoulter](https://github.com/desoulter)
[@turtleDeng](https://github.com/turtleDeng)
[@Hades32](https://github.com/Hades32)
[@huangdan](https://github.com/huangdan)
[@callbay](https://github.com/callbay)
* [@hejin1026](https://github.com/hejin1026)
* [@desoulter](https://github.com/desoulter)
* [@turtleDeng](https://github.com/turtleDeng)
* [@Hades32](https://github.com/Hades32)
* [@huangdan](https://github.com/huangdan)
* [@callbay](https://github.com/callbay)
## Author

View File

@ -0,0 +1,98 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% copy alarm_handler.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_alarm).
-export([start_link/0, set_alarm/1, clear_alarm/1, get_alarms/0,
add_alarm_handler/1, add_alarm_handler/2,
delete_alarm_handler/1]).
-export([init/1, handle_event/2, handle_call/2, handle_info/2,
terminate/2]).
-define(SERVER, ?MODULE).
-type alarm() :: {AlarmId :: any(), AlarmDescription :: string() | binary()}.
start_link() ->
case gen_event:start_link({local, ?SERVER}) of
{ok, Pid} ->
gen_event:add_handler(?SERVER, ?MODULE, []),
{ok, Pid};
Error ->
Error
end.
-spec set_alarm(alarm()) -> ok.
set_alarm(Alarm) ->
gen_event:notify(?SERVER, {set_alarm, Alarm}).
-spec clear_alarm(any()) -> ok.
clear_alarm(AlarmId) ->
gen_event:notify(?SERVER, {clear_alarm, AlarmId}).
get_alarms() ->
gen_event:call(?SERVER, ?MODULE, get_alarms).
add_alarm_handler(Module) when is_atom(Module) ->
gen_event:add_handler(?SERVER, Module, []).
add_alarm_handler(Module, Args) when is_atom(Module) ->
gen_event:add_handler(?SERVER, Module, Args).
delete_alarm_handler(Module) when is_atom(Module) ->
gen_event:delete_handler(?SERVER, Module, []).
%%-----------------------------------------------------------------
%% Default Alarm handler
%%-----------------------------------------------------------------
init(_) -> {ok, []}.
handle_event({set_alarm, Alarm}, Alarms)->
%%TODO: publish to $SYS
{ok, [Alarm | Alarms]};
handle_event({clear_alarm, AlarmId}, Alarms)->
%TODO: publish to $SYS
{ok, lists:keydelete(AlarmId, 1, Alarms)};
handle_event(_, Alarms)->
{ok, Alarms}.
handle_info(_, Alarms) -> {ok, Alarms}.
handle_call(get_alarms, Alarms) -> {ok, Alarms, Alarms};
handle_call(_Query, Alarms) -> {ok, {error, bad_query}, Alarms}.
terminate(swap, Alarms) ->
{?MODULE, Alarms};
terminate(_, _) ->
ok.

View File

@ -76,8 +76,8 @@ start_servers(Sup) ->
{"emqttd pubsub", {supervisor, emqttd_pubsub_sup}},
{"emqttd stats", emqttd_stats},
{"emqttd metrics", emqttd_metrics},
%{"emqttd router", emqttd_router},
{"emqttd broker", emqttd_broker},
{"emqttd alarm", emqttd_alarm},
{"emqttd mode supervisor", emqttd_mod_sup},
{"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}},
{"emqttd access control", emqttd_access_control},

View File

@ -1,15 +1,143 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% Simple message queue.
%%%
%%% Notice that MQTT is not an enterprise messaging queue. MQTT assume that client
%%% should be online in most of the time.
%%%
%%% This module wraps an erlang queue to store offline messages temporarily for MQTT
%%% persistent session.
%%%
%%% If the broker restarted or crashed, all the messages stored will be gone.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_mqueue).
-export([init/1, in/1]).
-author("Feng Lee <feng@emqtt.io>").
-record(queue_state, {
max_queued_messages = 1000
}).
-include_lib("emqtt/include/emqtt.hrl").
init(Opts) ->
{ok, #queue_state{}}.
-export([new/2, name/1,
is_empty/1, len/1,
in/2, out/1,
peek/1,
to_list/1]).
in(Msg, Q = #queue_state{}) ->
Q.
-define(MAX_LEN, 600).
-define(HIGH_WM, 0.6).
-define(LOW_WM, 0.2).
-record(mqueue, {name,
len = 0,
max_len = ?MAX_LEN,
queue = queue:new(),
store_qos0 = false,
high_watermark = ?HIGH_WM,
low_watermark = ?LOW_WM,
alert = false}).
-type mqueue() :: #mqueue{}.
-type queue_option() :: {max_queued_messages, pos_integer()} %% Max messages queued
| {high_queue_watermark, float()} %% High watermark
| {low_queue_watermark, float()} %% Low watermark
| {queue_qos0_messages, boolean()}. %% Queue Qos0 messages?
%%------------------------------------------------------------------------------
%% @doc New Queue.
%% @end
%%------------------------------------------------------------------------------
-spec new(binary() | string(), list(queue_option())) -> mqueue().
new(Name, Opts) ->
MaxLen = emqttd_opts:g(max_queued_messages, Opts, ?MAX_LEN),
HighWM = round(MaxLen * emqttd_opts:g(high_queue_watermark, Opts, ?HIGH_WM)),
LowWM = round(MaxLen * emqttd_opts:g(low_queue_watermark, Opts, ?LOW_WM)),
StoreQos0 = emqttd_opts:g(queue_qos0_messages, Opts, false),
#mqueue{name = Name,
max_len = MaxLen,
store_qos0 = StoreQos0,
high_watermark = HighWM,
low_watermark = LowWM}.
name(#mqueue{name = Name}) ->
Name.
len(#mqueue{len = Len}) ->
Len.
is_empty(#mqueue{len = 0}) -> true;
is_empty(_Q) -> false.
%%------------------------------------------------------------------------------
%% @doc
%% Queue one message.
%%
%% @end
%%------------------------------------------------------------------------------
-spec in(mqtt_message(), mqueue()) -> mqueue().
in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) ->
MQ;
%% queue is full, drop the oldest
in(Msg, MQ = #mqueue{name = Name, len = Len, max_len = MaxLen, queue = Q}) when Len =:= MaxLen ->
Q2 = case queue:out(Q) of
{{value, OldMsg}, Q1} ->
%%TODO: publish the dropped message to $SYS?
lager:error("Queue(~s) drop message: ~p", [Name, OldMsg]),
Q1;
{empty, Q1} -> %% maybe max_len is 1
Q1
end,
MQ#mqueue{queue = queue:in(Msg, Q2)};
in(Msg, MQ = #mqueue{len = Len, queue = Q}) ->
maybe_set_alarm(MQ#mqueue{len = Len+1, queue = queue:in(Msg, Q)}).
out(MQ = #mqueue{len = 0, queue = _Q}) ->
{empty, MQ};
out(MQ = #mqueue{len = Len, queue = Q}) ->
{Result, Q1} = queue:out(Q),
{Result, maybe_clear_alarm(MQ#mqueue{len = Len - 1, queue = Q1})}.
peek(#mqueue{queue = Q}) ->
queue:peek(Q).
to_list(#mqueue{queue = Q}) ->
queue:to_list(Q).
maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_watermark = HighWM, alert = false})
when Len >= HighWM ->
AlarmDescr = io_lib:format("len ~p > high_watermark ~p", [Len, HighWM]),
emqttd_alarm:set_alarm({{queue_high_watermark, Name}, AlarmDescr}),
MQ#mqueue{alert = true};
maybe_set_alarm(MQ) ->
MQ.
maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_watermark = LowWM, alert = true})
when Len =< LowWM ->
emqttd_alarm:clear_alarm({queue_high_watermark, Name}), MQ#mqueue{alert = false};
maybe_clear_alarm(MQ) ->
MQ.

View File

@ -28,7 +28,7 @@
-author("Feng Lee <feng@emqtt.io>").
-export([merge/2]).
-export([merge/2, g/2, g/3]).
%%------------------------------------------------------------------------------
%% @doc Merge Options
@ -50,3 +50,13 @@ merge(Defaults, Options) ->
end
end, Defaults, Options).
%%------------------------------------------------------------------------------
%% @doc Get option
%% @end
%%------------------------------------------------------------------------------
g(Key, Options) ->
proplists:get_value(Key, Options).
g(Key, Options, Default) ->
proplists:get_value(Key, Options, Default).

View File

@ -1,98 +0,0 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd simple queue.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
%% TODO: this module should be rewrited...
-module(emqttd_queue).
-author("Feng Lee <feng@emqtt.io>").
-include_lib("emqtt/include/emqtt.hrl").
-export([new/1, new/2, in/3, all/1, clear/1]).
-define(DEFAULT_MAX_LEN, 1000).
-record(mqtt_queue_wrapper, {queue = queue:new(),
max_len = ?DEFAULT_MAX_LEN,
store_qos0 = false}).
-type mqtt_queue() :: #mqtt_queue_wrapper{}.
%%------------------------------------------------------------------------------
%% @doc
%% New Queue.
%%
%% @end
%%------------------------------------------------------------------------------
-spec new(non_neg_integer()) -> mqtt_queue().
new(MaxLen) -> #mqtt_queue_wrapper{max_len = MaxLen}.
new(MaxLen, StoreQos0) -> #mqtt_queue_wrapper{max_len = MaxLen, store_qos0 = StoreQos0}.
%%------------------------------------------------------------------------------
%% @doc
%% Queue one message.
%%
%% @end
%%------------------------------------------------------------------------------
-spec in(binary(), mqtt_message(), mqtt_queue()) -> mqtt_queue().
in(ClientId, Message = #mqtt_message{qos = Qos},
Wrapper = #mqtt_queue_wrapper{queue = Queue, max_len = MaxLen}) ->
case queue:len(Queue) < MaxLen of
true ->
Wrapper#mqtt_queue_wrapper{queue = queue:in(Message, Queue)};
false -> % full
if
Qos =:= ?QOS_0 ->
lager:error("Queue ~s drop qos0 message: ~p", [ClientId, Message]),
Wrapper;
true ->
{{value, Msg}, Queue1} = queue:drop(Queue),
lager:error("Queue ~s drop message: ~p", [ClientId, Msg]),
Wrapper#mqtt_queue_wrapper{queue = queue:in(Message, Queue1)}
end
end.
%%------------------------------------------------------------------------------
%% @doc
%% Get all messages in queue.
%%
%% @end
%%------------------------------------------------------------------------------
-spec all(mqtt_queue()) -> list().
all(#mqtt_queue_wrapper { queue = Queue }) -> queue:to_list(Queue).
%%------------------------------------------------------------------------------
%% @doc
%% Clear queue.
%%
%% @end
%%------------------------------------------------------------------------------
-spec clear(mqtt_queue()) -> mqtt_queue().
clear(Queue) -> Queue#mqtt_queue_wrapper{queue = queue:new()}.

View File

@ -210,17 +210,31 @@ puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) ->
%%------------------------------------------------------------------------------
-spec subscribe(session(), [{binary(), mqtt_qos()}]) -> {ok, session(), [mqtt_qos()]}.
subscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, Topics) ->
Resubs = [Topic || {Name, _Qos} = Topic <- Topics, maps:is_key(Name, SubMap)],
case Resubs of
[] -> ok;
_ -> lager:warning("~s resubscribe ~p", [ClientId, Resubs])
end,
SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics),
%% subscribe first and don't care if the subscriptions have been existed
{ok, GrantedQos} = emqttd_pubsub:subscribe(Topics),
lager:info([{client, ClientId}], "Client ~s subscribe ~p. Granted QoS: ~p",
[ClientId, Topics, GrantedQos]),
%%TODO: should be gen_event and notification...
[emqttd_msg_store:redeliver(Name, self()) || {Name, _} <- Topics],
[ClientId, Topics, GrantedQos]),
%% <MQTT V3.1.1>: 3.8.4
%% Where the Topic Filter is not identical to any existing Subscriptions filter,
%% a new Subscription is created and all matching retained messages are sent.
lists:foreach(fun({Name, _Qos}) ->
case maps:is_key(Name, SubMap) of
true ->
lager:warning("~s resubscribe ~p", [ClientId, Name]);
false ->
%%TODO: this is not right, rewrite later...
emqttd_msg_store:redeliver(Name, self())
end
end, Topics),
SubMap1 = lists:foldl(fun({Name, Qos}, Acc) ->
maps:put(Name, Qos, Acc)
end, SubMap, Topics),
{ok, SessState#session_state{submap = SubMap1}, GrantedQos};
subscribe(SessPid, Topics) when is_pid(SessPid) ->

Binary file not shown.

View File

@ -97,14 +97,14 @@
{unack_retry_after, 4}
]},
{queue, [
%% Max messages queued when client is disconnected, or inflight messsages is overload
%% Max messages queued when client is disconnected, or inflight messsage window is overload
{max_queued_messages, 200},
%% High watermark of queued messsages
{high_queue_watermark, 0.6},
{high_queue_watermark, 0.8},
%% Low watermark of queued messsages
{low_queue_watermark, 0.2},
%% Queue Qos0 offline messages?
{queue_qos0_messages, false}
{queue_qos0_messages, true}
]}
]},
%% Broker Options