new session
This commit is contained in:
parent
789e482a30
commit
4c906b19ae
|
@ -1,7 +1,7 @@
|
|||
{application, emqtt,
|
||||
[
|
||||
{description, "Erlang MQTT Common Library"},
|
||||
{vsn, "0.8.3"},
|
||||
{vsn, git},
|
||||
{modules, []},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
{application, emqttd,
|
||||
[
|
||||
{description, "Erlang MQTT Broker"},
|
||||
{vsn, "0.8.3"},
|
||||
{vsn, git},
|
||||
{modules, []},
|
||||
{registered, []},
|
||||
{applications, [kernel,
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
|
||||
-module(emqttd_mqueue).
|
||||
|
||||
-export([init/1, in/1]).
|
||||
|
||||
-record(queue_state, {
|
||||
max_queued_messages = 1000
|
||||
}).
|
||||
|
||||
init(Opts) ->
|
||||
{ok, #queue_state{}}.
|
||||
|
||||
in(Msg, Q = #queue_state{}) ->
|
||||
Q.
|
||||
|
|
@ -24,6 +24,7 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_pubsub).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
@ -217,6 +218,7 @@ match(Topic) when is_binary(Topic) ->
|
|||
init([Id, _Opts]) ->
|
||||
process_flag(min_heap_size, 1024*1024),
|
||||
gproc_pool:connect_worker(pubsub, {?MODULE, Id}),
|
||||
%%TODO: gb_trees to replace maps?
|
||||
{ok, #state{id = Id, submap = maps:new()}}.
|
||||
|
||||
handle_call({subscribe, SubPid, Topics}, _From, State) ->
|
||||
|
@ -384,9 +386,24 @@ add_topic(TopicR = #mqtt_topic{topic = Topic}) ->
|
|||
end
|
||||
end.
|
||||
|
||||
add_subscriber({TopicR, Subscriber}) when is_record(TopicR, mqtt_topic) ->
|
||||
%% Fix issue #53 - Remove Overlapping Subscriptions
|
||||
add_subscriber({TopicR, Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid}})
|
||||
when is_record(TopicR, mqtt_topic) ->
|
||||
case add_topic(TopicR) of
|
||||
ok ->
|
||||
OverlapSubs = [Sub || Sub = #mqtt_subscriber{topic = SubTopic, qos = SubQos}
|
||||
<- mnesia:index_read(subscriber, SubPid, #mqtt_subscriber.pid),
|
||||
SubTopic =:= Topic, SubQos =/= Qos],
|
||||
|
||||
%% remove overlapping subscribers
|
||||
if
|
||||
length(OverlapSubs) =:= 0 -> ok;
|
||||
true ->
|
||||
lager:warning("Remove overlapping subscribers: ~p", [OverlapSubs]),
|
||||
[mnesia:delete_object(subscriber, OverlapSub, write) || OverlapSub <- OverlapSubs]
|
||||
end,
|
||||
|
||||
%% insert subscriber
|
||||
mnesia:write(subscriber, Subscriber, write);
|
||||
Error ->
|
||||
Error
|
||||
|
|
|
@ -43,6 +43,7 @@
|
|||
unsubscribe/2,
|
||||
destroy/2]).
|
||||
|
||||
%% This api looks strange... :(
|
||||
-export([store/2]).
|
||||
|
||||
%% Start gen_server
|
||||
|
@ -52,19 +53,53 @@
|
|||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
terminate/2, code_change/3]).
|
||||
|
||||
-record(session_state, {
|
||||
clientid :: binary(),
|
||||
-record(session_state, {
|
||||
%% ClientId: Identifier of Session
|
||||
clientid :: binary(),
|
||||
|
||||
%% Client Pid linked with session
|
||||
client_pid :: pid(),
|
||||
message_id = 1,
|
||||
submap :: map(),
|
||||
inflight_messages,
|
||||
queued_messages,
|
||||
msg_queue, %% do not receive rel
|
||||
awaiting_ack :: map(),
|
||||
awaiting_rel :: map(),
|
||||
|
||||
%% Last message id of the session
|
||||
message_id = 1,
|
||||
|
||||
%% Client’s subscriptions.
|
||||
subscriptions :: list(),
|
||||
|
||||
%% Inflight window size
|
||||
inflight_window = 40,
|
||||
|
||||
%% Inflight qos1, qos2 messages sent to the client but unacked, QoS 1 and QoS 2 messages which have been sent to the Client, but have not been completely acknowledged.
|
||||
%% Client <- Broker
|
||||
inflight_queue :: list(),
|
||||
|
||||
%% Inflight qos2 messages received from client and waiting for pubrel. QoS 2 messages which have been received from the Client, but have not been completely acknowledged.
|
||||
%% Client -> Broker
|
||||
awaiting_queue :: list(),
|
||||
|
||||
%% All qos1, qos2 messages published to when client is disconnected. QoS 1 and QoS 2 messages pending transmission to the Client.
|
||||
%% Optionally, QoS 0 messages pending transmission to the Client.
|
||||
pending_queue :: emqttd_mqueue:mqueue(),
|
||||
|
||||
%% Awaiting timers for ack, rel and comp.
|
||||
awaiting_ack :: map(),
|
||||
|
||||
awaiting_rel :: map(),
|
||||
|
||||
awaiting_comp :: map(),
|
||||
expires,
|
||||
expire_timer}).
|
||||
|
||||
%% Retries to resend the unacked messages
|
||||
max_unack_retries = 3,
|
||||
|
||||
%% 4, 8, 16 seconds if 3 retries:)
|
||||
unack_retry_after = 4,
|
||||
|
||||
%% session expired
|
||||
sess_expired_after = 48,
|
||||
|
||||
sess_expired_timer,
|
||||
|
||||
timestamp}).
|
||||
|
||||
-type session() :: #session_state{} | pid().
|
||||
|
||||
|
@ -235,11 +270,13 @@ store(SessState = #session_state{message_id = MsgId, awaiting_ack = Awaiting},
|
|||
{Message1, next_msg_id(SessState#session_state{awaiting_ack = Awaiting1})}.
|
||||
|
||||
initial_state(ClientId) ->
|
||||
#session_state{clientid = ClientId,
|
||||
submap = #{},
|
||||
awaiting_ack = #{},
|
||||
awaiting_rel = #{},
|
||||
awaiting_comp = #{}}.
|
||||
#session_state{clientid = ClientId,
|
||||
subscriptions = [],
|
||||
inflight_queue = [],
|
||||
awaiting_queue = [],
|
||||
awaiting_ack = #{},
|
||||
awaiting_rel = #{},
|
||||
awaiting_comp = #{}}.
|
||||
|
||||
initial_state(ClientId, ClientPid) ->
|
||||
State = initial_state(ClientId),
|
||||
|
@ -258,16 +295,36 @@ start_link(ClientId, ClientPid) ->
|
|||
|
||||
init([ClientId, ClientPid]) ->
|
||||
process_flag(trap_exit, true),
|
||||
%%TODO: Is this OK? or should monitor...
|
||||
true = link(ClientPid),
|
||||
SessOpts = emqttd:env(mqtt, session),
|
||||
State = initial_state(ClientId, ClientPid),
|
||||
Expires = proplists:get_value(expires, SessOpts, 1) * 3600,
|
||||
MsgQueue = emqttd_queue:new(proplists:get_value(max_queue, SessOpts, 1000),
|
||||
proplists:get_value(store_qos0, SessOpts, false)),
|
||||
{ok, State#session_state{expires = Expires,
|
||||
msg_queue = MsgQueue}, hibernate}.
|
||||
MQueue = emqttd_mqueue:new(ClientId, emqttd:env(mqtt, queue)),
|
||||
State1 = State#session_state{pending_queue = MQueue,
|
||||
timestamp = os:timestamp()},
|
||||
{ok, init(emqttd:env(mqtt, session), State1), hibernate}.
|
||||
|
||||
init([], State) ->
|
||||
State;
|
||||
|
||||
%% Session expired after hours
|
||||
init([{expired_after, Hours} | Opts], State) ->
|
||||
init(Opts, State#session_state{sess_expired_after = Hours * 3600 * 1000});
|
||||
|
||||
%% Max number of QoS 1 and 2 messages that can be “inflight” at one time.
|
||||
init([{max_inflight_messages, MaxInflight} | Opts], State) ->
|
||||
init(Opts, State#session_state{inflight_window = MaxInflight});
|
||||
|
||||
%% Max retries for unacknolege Qos1/2 messages
|
||||
init([{max_unack_retries, Retries} | Opts], State) ->
|
||||
init(Opts, State#session_state{max_unack_retries = Retries});
|
||||
|
||||
%% Retry after 4, 8, 16 seconds
|
||||
init([{unack_retry_after, Secs} | Opts], State) ->
|
||||
init(Opts, State#session_state{unack_retry_after = Secs * 1000});
|
||||
|
||||
init([Opt | Opts], State) ->
|
||||
lager:error("Bad Session Option: ~p", [Opt]),
|
||||
init(Opts, State).
|
||||
|
||||
handle_call({subscribe, Topics}, _From, State) ->
|
||||
{ok, NewState, GrantedQos} = subscribe(State, Topics),
|
||||
{reply, {ok, GrantedQos}, NewState};
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
## Overview
|
||||
|
||||
ZenMQ is a general architecture of a distributed messaging queue written in Erlang.
|
||||
|
||||
## Responsibilties
|
||||
|
||||
* Topic Trie Tree
|
||||
* Message Route
|
||||
* Queue Management
|
||||
* Broker Cluster
|
||||
* Distributed Broker
|
||||
|
||||
**Notice that this is an experimental design**
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
{application, zenmq,
|
||||
[
|
||||
{description, ""},
|
||||
{vsn, "1"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib
|
||||
]},
|
||||
{mod, { zenmq_app, []}},
|
||||
{env, []}
|
||||
]}.
|
|
@ -0,0 +1,2 @@
|
|||
-module(zenmq).
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
-module(zenmq_app).
|
||||
|
||||
-behaviour(application).
|
||||
|
||||
%% Application callbacks
|
||||
-export([start/2, stop/1]).
|
||||
|
||||
%% ===================================================================
|
||||
%% Application callbacks
|
||||
%% ===================================================================
|
||||
|
||||
start(_StartType, _StartArgs) ->
|
||||
zenmq_sup:start_link().
|
||||
|
||||
stop(_State) ->
|
||||
ok.
|
|
@ -0,0 +1,27 @@
|
|||
-module(zenmq_sup).
|
||||
|
||||
-behaviour(supervisor).
|
||||
|
||||
%% API
|
||||
-export([start_link/0]).
|
||||
|
||||
%% Supervisor callbacks
|
||||
-export([init/1]).
|
||||
|
||||
%% Helper macro for declaring children of supervisor
|
||||
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
|
||||
|
||||
%% ===================================================================
|
||||
%% API functions
|
||||
%% ===================================================================
|
||||
|
||||
start_link() ->
|
||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
%% ===================================================================
|
||||
%% Supervisor callbacks
|
||||
%% ===================================================================
|
||||
|
||||
init([]) ->
|
||||
{ok, { {one_for_one, 5, 10}, []} }.
|
||||
|
|
@ -93,8 +93,8 @@
|
|||
{max_inflight_messages, 20},
|
||||
%% Max retries for unacknolege Qos1/2 messages
|
||||
{max_unack_retries, 3},
|
||||
%% Retry after 10 seconds
|
||||
{unack_retry_after, 10}
|
||||
%% Retry after 2, 4,8 seconds
|
||||
{unack_retry_after, 2}
|
||||
]},
|
||||
{queue, [
|
||||
%% Max messages queued when client is disconnected, or inflight messsages is overload
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
{lib_dirs, ["../apps", "../deps", "../plugins"]},
|
||||
{erts, [{mod_cond, derived}, {app_file, strip}]},
|
||||
{app_file, strip},
|
||||
{rel, "emqttd", "0.8.3",
|
||||
{rel, "emqttd", git,
|
||||
[
|
||||
kernel,
|
||||
stdlib,
|
||||
|
|
Loading…
Reference in New Issue