From 4c906b19aebd2288ca5c22194d7560c5b61fa69c Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 11 Jun 2015 12:07:44 +0800 Subject: [PATCH] new session --- apps/emqtt/src/emqtt.app.src | 2 +- apps/emqttd/src/emqttd.app.src | 2 +- apps/emqttd/src/emqttd_mqueue.erl | 15 +++++ apps/emqttd/src/emqttd_pubsub.erl | 19 +++++- apps/emqttd/src/emqttd_session.erl | 103 ++++++++++++++++++++++------- apps/zenmq/README | 14 ++++ apps/zenmq/src/zenmq.app.src | 12 ++++ apps/zenmq/src/zenmq.erl | 2 + apps/zenmq/src/zenmq_app.erl | 16 +++++ apps/zenmq/src/zenmq_sup.erl | 27 ++++++++ rel/files/emqttd.config | 4 +- rel/reltool.config | 2 +- 12 files changed, 189 insertions(+), 29 deletions(-) create mode 100644 apps/emqttd/src/emqttd_mqueue.erl create mode 100644 apps/zenmq/README create mode 100644 apps/zenmq/src/zenmq.app.src create mode 100644 apps/zenmq/src/zenmq.erl create mode 100644 apps/zenmq/src/zenmq_app.erl create mode 100644 apps/zenmq/src/zenmq_sup.erl diff --git a/apps/emqtt/src/emqtt.app.src b/apps/emqtt/src/emqtt.app.src index c444276fa..c84d855f6 100644 --- a/apps/emqtt/src/emqtt.app.src +++ b/apps/emqtt/src/emqtt.app.src @@ -1,7 +1,7 @@ {application, emqtt, [ {description, "Erlang MQTT Common Library"}, - {vsn, "0.8.3"}, + {vsn, git}, {modules, []}, {registered, []}, {applications, [ diff --git a/apps/emqttd/src/emqttd.app.src b/apps/emqttd/src/emqttd.app.src index 5c3347f27..024fcdac9 100644 --- a/apps/emqttd/src/emqttd.app.src +++ b/apps/emqttd/src/emqttd.app.src @@ -1,7 +1,7 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, - {vsn, "0.8.3"}, + {vsn, git}, {modules, []}, {registered, []}, {applications, [kernel, diff --git a/apps/emqttd/src/emqttd_mqueue.erl b/apps/emqttd/src/emqttd_mqueue.erl new file mode 100644 index 000000000..58077355f --- /dev/null +++ b/apps/emqttd/src/emqttd_mqueue.erl @@ -0,0 +1,15 @@ + +-module(emqttd_mqueue). + +-export([init/1, in/1]). + +-record(queue_state, { + max_queued_messages = 1000 +}). + +init(Opts) -> + {ok, #queue_state{}}. + +in(Msg, Q = #queue_state{}) -> + Q. + diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index b52037d29..b89355c76 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_pubsub). -author("Feng Lee "). @@ -217,6 +218,7 @@ match(Topic) when is_binary(Topic) -> init([Id, _Opts]) -> process_flag(min_heap_size, 1024*1024), gproc_pool:connect_worker(pubsub, {?MODULE, Id}), + %%TODO: gb_trees to replace maps? {ok, #state{id = Id, submap = maps:new()}}. handle_call({subscribe, SubPid, Topics}, _From, State) -> @@ -384,9 +386,24 @@ add_topic(TopicR = #mqtt_topic{topic = Topic}) -> end end. -add_subscriber({TopicR, Subscriber}) when is_record(TopicR, mqtt_topic) -> +%% Fix issue #53 - Remove Overlapping Subscriptions +add_subscriber({TopicR, Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid}}) + when is_record(TopicR, mqtt_topic) -> case add_topic(TopicR) of ok -> + OverlapSubs = [Sub || Sub = #mqtt_subscriber{topic = SubTopic, qos = SubQos} + <- mnesia:index_read(subscriber, SubPid, #mqtt_subscriber.pid), + SubTopic =:= Topic, SubQos =/= Qos], + + %% remove overlapping subscribers + if + length(OverlapSubs) =:= 0 -> ok; + true -> + lager:warning("Remove overlapping subscribers: ~p", [OverlapSubs]), + [mnesia:delete_object(subscriber, OverlapSub, write) || OverlapSub <- OverlapSubs] + end, + + %% insert subscriber mnesia:write(subscriber, Subscriber, write); Error -> Error diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 3052f059a..ed10c37a7 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -43,6 +43,7 @@ unsubscribe/2, destroy/2]). +%% This api looks strange... :( -export([store/2]). %% Start gen_server @@ -52,19 +53,53 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(session_state, { - clientid :: binary(), +-record(session_state, { + %% ClientId: Identifier of Session + clientid :: binary(), + + %% Client Pid linked with session client_pid :: pid(), - message_id = 1, - submap :: map(), - inflight_messages, - queued_messages, - msg_queue, %% do not receive rel - awaiting_ack :: map(), - awaiting_rel :: map(), + + %% Last message id of the session + message_id = 1, + + %% Client’s subscriptions. + subscriptions :: list(), + + %% Inflight window size + inflight_window = 40, + + %% Inflight qos1, qos2 messages sent to the client but unacked, QoS 1 and QoS 2 messages which have been sent to the Client, but have not been completely acknowledged. + %% Client <- Broker + inflight_queue :: list(), + + %% Inflight qos2 messages received from client and waiting for pubrel. QoS 2 messages which have been received from the Client, but have not been completely acknowledged. + %% Client -> Broker + awaiting_queue :: list(), + + %% All qos1, qos2 messages published to when client is disconnected. QoS 1 and QoS 2 messages pending transmission to the Client. + %% Optionally, QoS 0 messages pending transmission to the Client. + pending_queue :: emqttd_mqueue:mqueue(), + + %% Awaiting timers for ack, rel and comp. + awaiting_ack :: map(), + + awaiting_rel :: map(), + awaiting_comp :: map(), - expires, - expire_timer}). + + %% Retries to resend the unacked messages + max_unack_retries = 3, + + %% 4, 8, 16 seconds if 3 retries:) + unack_retry_after = 4, + + %% session expired + sess_expired_after = 48, + + sess_expired_timer, + + timestamp}). -type session() :: #session_state{} | pid(). @@ -235,11 +270,13 @@ store(SessState = #session_state{message_id = MsgId, awaiting_ack = Awaiting}, {Message1, next_msg_id(SessState#session_state{awaiting_ack = Awaiting1})}. initial_state(ClientId) -> - #session_state{clientid = ClientId, - submap = #{}, - awaiting_ack = #{}, - awaiting_rel = #{}, - awaiting_comp = #{}}. + #session_state{clientid = ClientId, + subscriptions = [], + inflight_queue = [], + awaiting_queue = [], + awaiting_ack = #{}, + awaiting_rel = #{}, + awaiting_comp = #{}}. initial_state(ClientId, ClientPid) -> State = initial_state(ClientId), @@ -258,16 +295,36 @@ start_link(ClientId, ClientPid) -> init([ClientId, ClientPid]) -> process_flag(trap_exit, true), - %%TODO: Is this OK? or should monitor... true = link(ClientPid), - SessOpts = emqttd:env(mqtt, session), State = initial_state(ClientId, ClientPid), - Expires = proplists:get_value(expires, SessOpts, 1) * 3600, - MsgQueue = emqttd_queue:new(proplists:get_value(max_queue, SessOpts, 1000), - proplists:get_value(store_qos0, SessOpts, false)), - {ok, State#session_state{expires = Expires, - msg_queue = MsgQueue}, hibernate}. + MQueue = emqttd_mqueue:new(ClientId, emqttd:env(mqtt, queue)), + State1 = State#session_state{pending_queue = MQueue, + timestamp = os:timestamp()}, + {ok, init(emqttd:env(mqtt, session), State1), hibernate}. +init([], State) -> + State; + +%% Session expired after hours +init([{expired_after, Hours} | Opts], State) -> + init(Opts, State#session_state{sess_expired_after = Hours * 3600 * 1000}); + +%% Max number of QoS 1 and 2 messages that can be “inflight” at one time. +init([{max_inflight_messages, MaxInflight} | Opts], State) -> + init(Opts, State#session_state{inflight_window = MaxInflight}); + +%% Max retries for unacknolege Qos1/2 messages +init([{max_unack_retries, Retries} | Opts], State) -> + init(Opts, State#session_state{max_unack_retries = Retries}); + +%% Retry after 4, 8, 16 seconds +init([{unack_retry_after, Secs} | Opts], State) -> + init(Opts, State#session_state{unack_retry_after = Secs * 1000}); + +init([Opt | Opts], State) -> + lager:error("Bad Session Option: ~p", [Opt]), + init(Opts, State). + handle_call({subscribe, Topics}, _From, State) -> {ok, NewState, GrantedQos} = subscribe(State, Topics), {reply, {ok, GrantedQos}, NewState}; diff --git a/apps/zenmq/README b/apps/zenmq/README new file mode 100644 index 000000000..5b60d8891 --- /dev/null +++ b/apps/zenmq/README @@ -0,0 +1,14 @@ +## Overview + +ZenMQ is a general architecture of a distributed messaging queue written in Erlang. + +## Responsibilties + +* Topic Trie Tree +* Message Route +* Queue Management +* Broker Cluster +* Distributed Broker + +**Notice that this is an experimental design** + diff --git a/apps/zenmq/src/zenmq.app.src b/apps/zenmq/src/zenmq.app.src new file mode 100644 index 000000000..2d191d048 --- /dev/null +++ b/apps/zenmq/src/zenmq.app.src @@ -0,0 +1,12 @@ +{application, zenmq, + [ + {description, ""}, + {vsn, "1"}, + {registered, []}, + {applications, [ + kernel, + stdlib + ]}, + {mod, { zenmq_app, []}}, + {env, []} + ]}. diff --git a/apps/zenmq/src/zenmq.erl b/apps/zenmq/src/zenmq.erl new file mode 100644 index 000000000..1e96b16f5 --- /dev/null +++ b/apps/zenmq/src/zenmq.erl @@ -0,0 +1,2 @@ +-module(zenmq). + diff --git a/apps/zenmq/src/zenmq_app.erl b/apps/zenmq/src/zenmq_app.erl new file mode 100644 index 000000000..15200771a --- /dev/null +++ b/apps/zenmq/src/zenmq_app.erl @@ -0,0 +1,16 @@ +-module(zenmq_app). + +-behaviour(application). + +%% Application callbacks +-export([start/2, stop/1]). + +%% =================================================================== +%% Application callbacks +%% =================================================================== + +start(_StartType, _StartArgs) -> + zenmq_sup:start_link(). + +stop(_State) -> + ok. diff --git a/apps/zenmq/src/zenmq_sup.erl b/apps/zenmq/src/zenmq_sup.erl new file mode 100644 index 000000000..f626a12b1 --- /dev/null +++ b/apps/zenmq/src/zenmq_sup.erl @@ -0,0 +1,27 @@ +-module(zenmq_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). + +%% =================================================================== +%% API functions +%% =================================================================== + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% =================================================================== +%% Supervisor callbacks +%% =================================================================== + +init([]) -> + {ok, { {one_for_one, 5, 10}, []} }. + diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index 442c801d7..fc15fb0a2 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -93,8 +93,8 @@ {max_inflight_messages, 20}, %% Max retries for unacknolege Qos1/2 messages {max_unack_retries, 3}, - %% Retry after 10 seconds - {unack_retry_after, 10} + %% Retry after 2, 4,8 seconds + {unack_retry_after, 2} ]}, {queue, [ %% Max messages queued when client is disconnected, or inflight messsages is overload diff --git a/rel/reltool.config b/rel/reltool.config index f26654f74..2dbb76ea5 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -4,7 +4,7 @@ {lib_dirs, ["../apps", "../deps", "../plugins"]}, {erts, [{mod_cond, derived}, {app_file, strip}]}, {app_file, strip}, - {rel, "emqttd", "0.8.3", + {rel, "emqttd", git, [ kernel, stdlib,