From 53883ae188442245dd3a834014df6b915739e512 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Fri, 20 Mar 2015 01:34:52 +0800 Subject: [PATCH 01/10] performance issue --- TODO | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/TODO b/TODO index c456a1161..3b2531023 100644 --- a/TODO +++ b/TODO @@ -1,4 +1,9 @@ +v0.9.0-alpha (2015-03-20) +------------------------- + +emqtt_sm, emqtt_cm, emqtt_pubsub performance issue... + v0.8.0-alpha (2015-03-20) ------------------------- From 7a9c30c2d101e57a29ee7c38f7bedd9cdc503ef1 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Fri, 20 Mar 2015 14:44:07 +0800 Subject: [PATCH 02/10] infinity --- apps/emqttd/src/emqttd_cm.erl | 3 ++- apps/emqttd/src/emqttd_pubsub.erl | 6 +++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/apps/emqttd/src/emqttd_cm.erl b/apps/emqttd/src/emqttd_cm.erl index 90091ee14..b0ec49ed5 100644 --- a/apps/emqttd/src/emqttd_cm.erl +++ b/apps/emqttd/src/emqttd_cm.erl @@ -86,7 +86,8 @@ lookup(ClientId) when is_binary(ClientId) -> %%------------------------------------------------------------------------------ -spec register(ClientId :: binary(), Pid :: pid()) -> ok. register(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) -> - gen_server:call(?SERVER, {register, ClientId, Pid}). + %%TODO: infinify to block requests when too many clients, this will be redesinged in 0.9.x... + gen_server:call(?SERVER, {register, ClientId, Pid}, infinity). %%------------------------------------------------------------------------------ %% @doc diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index 67b841dd8..db990d871 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -119,8 +119,12 @@ create(Topic) -> subscribe({Topic, Qos}, SubPid) when is_binary(Topic) and is_pid(SubPid) -> subscribe([{Topic, Qos}], SubPid); +%% TODO: +%% call will not work when there are 2000K+ clients, 100+ sub requests/sec... +%% will optimize in 0.9.x... +%% subscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) -> - gen_server:call(?SERVER, {subscribe, Topics, SubPid}). + gen_server:call(?SERVER, {subscribe, Topics, SubPid}, infinity). %%------------------------------------------------------------------------------ %% @doc From 3b84e5c982b7d1ed655eb075eed62e7c51e7a170 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Sun, 22 Mar 2015 18:06:40 +0800 Subject: [PATCH 03/10] todo --- apps/emqttd/src/emqttd_pubsub.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index db990d871..ebdd8fe4c 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -22,6 +22,8 @@ %%% @doc %%% emqttd core pubsub. %%% +%%% TODO: should not use gen_server:call to create, subscribe topics... +%%% %%% @end %%%----------------------------------------------------------------------------- -module(emqttd_pubsub). From a72fccf28de938527a2affef6bf2adb63c26d462 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 22 Mar 2015 21:13:40 +0800 Subject: [PATCH 04/10] rewrite --- apps/emqttd/src/emqttd_pubsub.erl | 128 +++++++++++++----------------- 1 file changed, 55 insertions(+), 73 deletions(-) diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index ebdd8fe4c..6521b85eb 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -48,8 +48,8 @@ -export([topics/0, create/1, - subscribe/2, - unsubscribe/2, + subscribe/1, + unsubscribe/1, publish/1, publish/2, %local node @@ -108,8 +108,8 @@ topics() -> %% @end %%------------------------------------------------------------------------------ -spec create(binary()) -> {atomic, Reason :: any()} | {aborted, Reason :: any()}. -create(Topic) -> - gen_server:call(?SERVER, {create, Topic}). +create(Topic) when is_binary(Topic) -> + mnesia:transaction(fun trie_add/1, [Topic]). %%------------------------------------------------------------------------------ %% @doc @@ -117,16 +117,33 @@ create(Topic) -> %% %% @end %%------------------------------------------------------------------------------ --spec subscribe({binary(), mqtt_qos()} | list(), pid()) -> {ok, list(mqtt_qos())}. -subscribe({Topic, Qos}, SubPid) when is_binary(Topic) and is_pid(SubPid) -> - subscribe([{Topic, Qos}], SubPid); +-spec subscribe({binary(), mqtt_qos()} | list()) -> {ok, list(mqtt_qos())}. +subscribe({Topic, Qos}) when is_binary(Topic) -> + case subscribe([{Topic, Qos}]) of + {ok, [GrantedQos]} -> {ok, GrantedQos}; + {error, Error} -> {error, Error} + end; +subscribe(Topics = [{_Topic, _Qos}|_]) -> + subscribe(Topics, self(), []). -%% TODO: -%% call will not work when there are 2000K+ clients, 100+ sub requests/sec... -%% will optimize in 0.9.x... -%% -subscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) -> - gen_server:call(?SERVER, {subscribe, Topics, SubPid}, infinity). +subscribe([], _SubPid, Acc) -> + {ok, lists:reverse(Acc)}; +subscribe([{Topic, Qos}|Topics], SubPid, Acc) -> + Subscriber = #topic_subscriber{topic=Topic, qos = Qos, subpid=SubPid}, + F = fun() -> + case trie_add(Topic) of + ok -> + mnesia:write(Subscriber); + {atomic, already_exist} -> + mnesia:write(Subscriber); + {aborted, Reason} -> + {aborted, Reason} + end + end, + case mnesia:transaction(F) of + ok -> subscribe(Topics, SubPid, [Qos|Acc]); + {aborted, Reason} -> {error, {aborted, Reason}} + end. %%------------------------------------------------------------------------------ %% @doc @@ -134,12 +151,16 @@ subscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) -> %% %% @end %%------------------------------------------------------------------------------ --spec unsubscribe(binary() | list(binary()), pid()) -> ok. -unsubscribe(Topic, SubPid) when is_binary(Topic) and is_pid(SubPid) -> - unsubscribe([Topic], SubPid); +-spec unsubscribe(binary() | list(binary())) -> ok. +unsubscribe(Topic) when is_binary(Topic) -> + %% call mnesia directly + unsubscribe([Topic]); + +unsubscribe(Topics = [Topics|_]) when is_list(Topics) and is_binary(Topic) -> + unsubscribe(Topics, self()). + +unsubscribe(Topics, SubPid) -> -unsubscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) -> - gen_server:cast(?SERVER, {unsubscribe, Topics, SubPid}). %%------------------------------------------------------------------------------ %% @doc @@ -199,42 +220,38 @@ match(Topic) when is_binary(Topic) -> %% ------------------------------------------------------------------ init([]) -> - mnesia:create_table(topic_trie, [ - {ram_copies, [node()]}, - {attributes, record_info(fields, topic_trie)}]), + %% trie and topic tables, will be copied by all nodes. mnesia:create_table(topic_trie_node, [ {ram_copies, [node()]}, {attributes, record_info(fields, topic_trie_node)}]), + mnesia:add_table_copy(topic_trie_node, node(), ram_copies), + mnesia:create_table(topic_trie, [ + {ram_copies, [node()]}, + {attributes, record_info(fields, topic_trie)}]), + mnesia:add_table_copy(topic_trie, node(), ram_copies), mnesia:create_table(topic, [ {type, bag}, {record_name, topic}, {ram_copies, [node()]}, {attributes, record_info(fields, topic)}]), - mnesia:add_table_copy(topic_trie, node(), ram_copies), - mnesia:add_table_copy(topic_trie_node, node(), ram_copies), mnesia:add_table_copy(topic, node(), ram_copies), - ets:new(topic_subscriber, [bag, named_table, {keypos, 2}]), + + %% local table, not shared with other table + mnesia:create_table(topic_subscriber, [ + {type, bag}, + {record_name, topic_subscriber}, + {ram_copies, [node()]}, + {attributes, record_info(fields, topic_subscriber)}, + {index, [subpid]}, + {local_content, true}]), {ok, #state{}}. handle_call(getstats, _From, State = #state{max_subs = Max}) -> Stats = [{'topics/count', mnesia:table_info(topic, size)}, - {'subscribers/count', ets:info(topic_subscriber, size)}, + {'subscribers/count', mnesia:info(topic_subscriber, size)}, {'subscribers/max', Max}], {reply, Stats, State}; -handle_call({create, Topic}, _From, State) -> - Result = mnesia:transaction(fun trie_add/1, [Topic]), - {reply, Result, setstats(State)}; - -handle_call({subscribe, Topics, SubPid}, _From, State) -> - Result = [subscribe_topic({Topic, Qos}, SubPid) || {Topic, Qos} <- Topics], - Reply = - case [Err || Err = {error, _} <- Result] of - [] -> {ok, [Qos || {ok, Qos} <- Result]}; - Errors -> hd(Errors) - end, - {reply, Reply, setstats(State)}; - handle_call(Req, _From, State) -> {stop, {badreq, Req}, State}. @@ -273,41 +290,6 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================= %%% Internal functions %%%============================================================================= -subscribe_topic({Topic, Qos}, SubPid) -> - case mnesia:transaction(fun trie_add/1, [Topic]) of - {atomic, _} -> - case get({subscriber, SubPid}) of - undefined -> - %%TODO: refactor later... - MonRef = erlang:monitor(process, SubPid), - put({subcriber, SubPid}, MonRef), - put({submon, MonRef}, SubPid); - _ -> - already_monitored - end, - %% remove duplicated subscribers - try_remove_subscriber({Topic, Qos}, SubPid), - ets:insert(topic_subscriber, #topic_subscriber{topic=Topic, qos = Qos, subpid=SubPid}), - %TODO: GrantedQos?? - {ok, Qos}; - {aborted, Reason} -> - {error, Reason} - end. - -try_remove_subscriber({Topic, Qos}, SubPid) -> - case ets:lookup(topic_subscriber, Topic) of - [] -> - not_found; - Subs -> - DupSubs = [Sub || Sub = #topic_subscriber{qos = OldQos, subpid = OldPid} - <- Subs, Qos =/= OldQos, OldPid =:= SubPid], - case DupSubs of - [] -> ok; - [DupSub] -> - lager:warning("PubSub: remove duplicated subscriber ~p", [DupSub]), - ets:delete(topic_subscriber, DupSub) - end - end. try_remove_topic(Name) when is_binary(Name) -> case ets:member(topic_subscriber, Name) of From 21d456fd1fff6cc66170dac1af287d1c106560ac Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 22 Mar 2015 22:35:50 +0800 Subject: [PATCH 05/10] rewrite pubsub --- apps/emqttd/src/emqttd_bridge.erl | 2 +- apps/emqttd/src/emqttd_broker.erl | 4 +- apps/emqttd/src/emqttd_metrics.erl | 2 +- apps/emqttd/src/emqttd_pubsub.erl | 130 ++++++++++++++++------------- apps/emqttd/src/emqttd_session.erl | 4 +- 5 files changed, 78 insertions(+), 64 deletions(-) diff --git a/apps/emqttd/src/emqttd_bridge.erl b/apps/emqttd/src/emqttd_bridge.erl index d80979516..765f988e1 100644 --- a/apps/emqttd/src/emqttd_bridge.erl +++ b/apps/emqttd/src/emqttd_bridge.erl @@ -81,7 +81,7 @@ init([Node, SubTopic, Options]) -> true -> true = erlang:monitor_node(Node, true), State = parse_opts(Options, #state{node = Node, subtopic = SubTopic}), - emqttd_pubsub:subscribe({SubTopic, ?QOS_0}, self()), + emqttd_pubsub:subscribe({SubTopic, ?QOS_0}), {ok, State}; false -> {stop, {cannot_connect, Node}} diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index ea73efa3e..b5f23d156 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -150,8 +150,8 @@ init([Options]) -> Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB, [ets:insert(?BROKER_TAB, {Topic, 0}) || Topic <- Topics], % Create $SYS Topics - [{atomic, _} = create(systop(Topic)) || Topic <- ?SYSTOP_BROKERS], - [{atomic, _} = create(systop(Topic)) || Topic <- Topics], + [ok = create(systop(Topic)) || Topic <- ?SYSTOP_BROKERS], + [ok = create(systop(Topic)) || Topic <- Topics], SysInterval = proplists:get_value(sys_interval, Options, 60), State = #state{started_at = os:timestamp(), sys_interval = SysInterval}, Delay = if diff --git a/apps/emqttd/src/emqttd_metrics.erl b/apps/emqttd/src/emqttd_metrics.erl index ad1b4b85d..3858ec441 100644 --- a/apps/emqttd/src/emqttd_metrics.erl +++ b/apps/emqttd/src/emqttd_metrics.erl @@ -182,7 +182,7 @@ init([Options]) -> % Init metrics [new_metric(Metric) || Metric <- Metrics], % $SYS Topics for metrics - [{atomic, _} = emqttd_pubsub:create(systop(Topic)) || {_, Topic} <- Metrics], + [ok = emqttd_pubsub:create(systop(Topic)) || {_, Topic} <- Metrics], PubInterval = proplists:get_value(pub_interval, Options, 60), Delay = if PubInterval == 0 -> 0; diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index 6521b85eb..e77c181e0 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -109,7 +109,7 @@ topics() -> %%------------------------------------------------------------------------------ -spec create(binary()) -> {atomic, Reason :: any()} | {aborted, Reason :: any()}. create(Topic) when is_binary(Topic) -> - mnesia:transaction(fun trie_add/1, [Topic]). + {atomic, ok} = mnesia:transaction(fun trie_add/1, [Topic]), ok. %%------------------------------------------------------------------------------ %% @doc @@ -128,21 +128,13 @@ subscribe(Topics = [{_Topic, _Qos}|_]) -> subscribe([], _SubPid, Acc) -> {ok, lists:reverse(Acc)}; +%%TODO: check this function later. subscribe([{Topic, Qos}|Topics], SubPid, Acc) -> Subscriber = #topic_subscriber{topic=Topic, qos = Qos, subpid=SubPid}, - F = fun() -> - case trie_add(Topic) of - ok -> - mnesia:write(Subscriber); - {atomic, already_exist} -> - mnesia:write(Subscriber); - {aborted, Reason} -> - {aborted, Reason} - end - end, + F = fun() -> trie_add(Topic), mnesia:write(Subscriber) end, case mnesia:transaction(F) of - ok -> subscribe(Topics, SubPid, [Qos|Acc]); - {aborted, Reason} -> {error, {aborted, Reason}} + {atomic, ok} -> subscribe(Topics, SubPid, [Qos|Acc]); + Error -> {error, Error} end. %%------------------------------------------------------------------------------ @@ -153,14 +145,25 @@ subscribe([{Topic, Qos}|Topics], SubPid, Acc) -> %%------------------------------------------------------------------------------ -spec unsubscribe(binary() | list(binary())) -> ok. unsubscribe(Topic) when is_binary(Topic) -> - %% call mnesia directly unsubscribe([Topic]); -unsubscribe(Topics = [Topics|_]) when is_list(Topics) and is_binary(Topic) -> +unsubscribe(Topics = [Topic|_]) when is_list(Topics) and is_binary(Topic) -> unsubscribe(Topics, self()). +%%TODO: check this function later. unsubscribe(Topics, SubPid) -> - + F = fun() -> + Subscribers = mnesia:index_read(topic_subscriber, SubPid, #topic_subscriber.subpid), + lists:foreach(fun(Sub = #topic_subscriber{topic = Topic}) -> + case lists:member(Topic, Topics) of + true -> mneisa:delete_object(Sub); + false -> ok + end + end, Subscribers) + %TODO: try to remove topic??? if topic is dynamic... + %%try_remove_topic(Topic) + end, + {atomic, _} = mneisa:transaction(F), ok. %%------------------------------------------------------------------------------ %% @doc @@ -191,7 +194,7 @@ publish(Topic, Msg) when is_binary(Topic) -> %%------------------------------------------------------------------------------ -spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer(). dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) -> - Subscribers = ets:lookup(topic_subscriber, Topic), + Subscribers = mnesia:dirty_read(topic_subscriber, Topic), lists:foreach( fun(#topic_subscriber{qos = SubQos, subpid=SubPid}) -> Msg1 = if @@ -235,15 +238,16 @@ init([]) -> {ram_copies, [node()]}, {attributes, record_info(fields, topic)}]), mnesia:add_table_copy(topic, node(), ram_copies), - + mnesia:subscribe({table, topic, simple}), %% local table, not shared with other table mnesia:create_table(topic_subscriber, [ {type, bag}, {record_name, topic_subscriber}, - {ram_copies, [node()]}, + {ram_copies, [node()]}, {attributes, record_info(fields, topic_subscriber)}, - {index, [subpid]}, + {index, [subpid]}, {local_content, true}]), + mnesia:subscribe({table, topic_subscriber, simple}), {ok, #state{}}. handle_call(getstats, _From, State = #state{max_subs = Max}) -> @@ -253,35 +257,45 @@ handle_call(getstats, _From, State = #state{max_subs = Max}) -> {reply, Stats, State}; handle_call(Req, _From, State) -> - {stop, {badreq, Req}, State}. - -handle_cast({unsubscribe, Topics, SubPid}, State) -> - lists:foreach(fun(Topic) -> - ets:match_delete(topic_subscriber, #topic_subscriber{topic=Topic, qos ='_', subpid=SubPid}), - try_remove_topic(Topic) - end, Topics), - {noreply, setstats(State)}; + lager:error("Bad Req: ~p", [Req]), + {reply, error, State}. handle_cast(Msg, State) -> - {stop, {badmsg, Msg}, State}. + lager:error("Bad Msg: ~p", [Msg]), + {noreply, State}. -handle_info({'DOWN', Mon, _Type, _Object, _Info}, State) -> - case get({submon, Mon}) of - undefined -> - lager:error("unexpected 'DOWN': ~p", [Mon]); - SubPid -> - erase({submon, Mon}), - erase({subscriber, SubPid}), - Subs = ets:match_object(topic_subscriber, #topic_subscriber{subpid=SubPid, _='_'}), - [ets:delete_object(topic_subscriber, Sub) || Sub <- Subs], - [try_remove_topic(Topic) || #topic_subscriber{topic=Topic} <- Subs] - end, +%% a new record has been written. +handle_info({mnesia_table_event, {write, #topic_subscriber{subpid = Pid}, _ActivityId}}, State) -> + erlang:monitor(process, Pid), + {noreply, setstats(State)}; + +%% {write, #topic{}, _ActivityId} +%% {delete_object, _OldRecord, _ActivityId} +%% {delete, {Tab, Key}, ActivityId} +handle_info({mnesia_table_event, _Event}, State) -> + {noreply, setstats(State)}; + +handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) -> + F = fun() -> + %%TODO: how to read with write lock? + [mnesia:delete_object(Sub) || Sub <- mnesia:index_read(topic_subscriber, DownPid, #topic_subscriber.subpid)] + %%TODO: try to remove dynamic topics without subscribers + %% [try_remove_topic(Topic) || #topic_subscriber{topic=Topic} <- Subs] + end, + case catch mnesia:transaction(F) of + {atomic, _} -> ok; + {aborted, Reason} -> lager:error("Failed to delete 'DOWN' subscriber ~p: ~p", [DownPid, Reason]) + end, {noreply, setstats(State)}; handle_info(Info, State) -> - {stop, {badinfo, Info}, State}. + lager:error("Bad Info: ~p", [Info]), + {noreply, State}. terminate(_Reason, _State) -> + mnesia:unsubscribe({table, topic, simple}), + mnesia:unsubscribe({table, topic_subscriber, simple}), + %%TODO: clear topics belongs to this node??? ok. code_change(_OldVsn, State, _Extra) -> @@ -291,21 +305,21 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= -try_remove_topic(Name) when is_binary(Name) -> - case ets:member(topic_subscriber, Name) of - false -> - Topic = emqttd_topic:new(Name), - Fun = fun() -> - mnesia:delete_object(Topic), - case mnesia:read(topic, Name) of - [] -> trie_delete(Name); - _ -> ignore - end - end, - mnesia:transaction(Fun); - true -> - ok - end. +%%try_remove_topic(Name) when is_binary(Name) -> +%% case ets:member(topic_subscriber, Name) of +%% false -> +%% Topic = emqttd_topic:new(Name), +%% Fun = fun() -> +%% mnesia:delete_object(Topic), +%% case mnesia:read(topic, Name) of +%% [] -> trie_delete(Name); +%% _ -> ignore +%% end +%% end, +%% mnesia:transaction(Fun); +%% true -> +%% ok +%% end. trie_add(Topic) when is_binary(Topic) -> mnesia:write(emqttd_topic:new(Topic)), @@ -313,7 +327,7 @@ trie_add(Topic) when is_binary(Topic) -> [TrieNode=#topic_trie_node{topic=undefined}] -> mnesia:write(TrieNode#topic_trie_node{topic=Topic}); [#topic_trie_node{topic=Topic}] -> - {atomic, already_exist}; + ok; [] -> %add trie path [trie_add_path(Triple) || Triple <- emqttd_topic:triples(Topic)], @@ -389,7 +403,7 @@ trie_delete_path([{NodeId, Word, _} | RestPath]) -> setstats(State = #state{max_subs = Max}) -> emqttd_broker:setstat('topics/count', mnesia:table_info(topic, size)), - SubCount = ets:info(topic_subscriber, size), + SubCount = mnesia:table_info(topic_subscriber, size), emqttd_broker:setstat('subscribers/count', SubCount), if SubCount > Max -> diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 9754f8bd7..bf9831135 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -185,7 +185,7 @@ subscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Top _ -> lager:warning("~s resubscribe ~p", [ClientId, Resubs]) end, SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics), - {ok, GrantedQos} = emqttd_pubsub:subscribe(Topics, self()), + {ok, GrantedQos} = emqttd_pubsub:subscribe(Topics), %%TODO: should be gen_event and notification... emqttd_server:subscribe([ Name || {Name, _} <- Topics ], self()), {ok, SessState#session_state{submap = SubMap1}, GrantedQos}; @@ -208,7 +208,7 @@ unsubscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, T BadUnsubs -> lager:warning("~s should not unsubscribe ~p", [ClientId, BadUnsubs]) end, %%unsubscribe from topic tree - ok = emqttd_pubsub:unsubscribe(Topics, self()), + ok = emqttd_pubsub:unsubscribe(Topics), SubMap1 = lists:foldl(fun(Topic, Acc) -> maps:remove(Topic, Acc) end, SubMap, Topics), {ok, SessState#session_state{submap = SubMap1}}; From 18a5da66911bf324a47684e840d6432acd59c082 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 22 Mar 2015 23:06:31 +0800 Subject: [PATCH 06/10] 0.5.3 --- apps/emqttd/src/emqttd.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqttd/src/emqttd.app.src b/apps/emqttd/src/emqttd.app.src index a61daf48e..2b78e3bdb 100644 --- a/apps/emqttd/src/emqttd.app.src +++ b/apps/emqttd/src/emqttd.app.src @@ -1,7 +1,7 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, - {vsn, "0.5.0"}, + {vsn, "0.5.3"}, {modules, []}, {registered, []}, {applications, [kernel, From 3a7103a728f1decf3adc7185fd091a33317bddb1 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 23 Mar 2015 00:06:31 +0800 Subject: [PATCH 07/10] control commands --- apps/emqttd/src/emqttd_ctl.erl | 10 +++++--- rel/files/emqttd_ctl | 45 +++++++++++++++++++++++++++++++--- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/apps/emqttd/src/emqttd_ctl.erl b/apps/emqttd/src/emqttd_ctl.erl index 4fa2433a2..da8d2cf60 100644 --- a/apps/emqttd/src/emqttd_ctl.erl +++ b/apps/emqttd/src/emqttd_ctl.erl @@ -38,6 +38,8 @@ -export([status/1, broker/1, + stats/1, + metrics/1, cluster/1, listeners/1, bridges/1, @@ -84,12 +86,12 @@ userdel([Username]) -> broker([]) -> Funs = [sysdescr, version, uptime, datetime], - [?PRINT("~s: ~s~n", [Fun, emqttd_broker:Fun()]) || Fun <- Funs]; + [?PRINT("~s: ~s~n", [Fun, emqttd_broker:Fun()]) || Fun <- Funs]. -broker(["stats"]) -> - [?PRINT("~s: ~p~n", [Stat, Val]) || {Stat, Val} <- emqttd_broker:getstats()]; +stats([]) -> + [?PRINT("~s: ~p~n", [Stat, Val]) || {Stat, Val} <- emqttd_broker:getstats()]. -broker(["metrics"]) -> +metrics([]) -> [?PRINT("~s: ~p~n", [Metric, Val]) || {Metric, Val} <- emqttd_metrics:all()]. listeners([]) -> diff --git a/rel/files/emqttd_ctl b/rel/files/emqttd_ctl index 35c550ca0..2e751b9d4 100755 --- a/rel/files/emqttd_ctl +++ b/rel/files/emqttd_ctl @@ -149,8 +149,8 @@ case "$1" in ;; broker) - if [ $# -gt 2 ]; then - echo "Usage: $SCRIPT broker [status | stats | metrics]" + if [ $# -ne 1 ]; then + echo "Usage: $SCRIPT broker" exit 1 fi @@ -165,6 +165,41 @@ case "$1" in $NODETOOL rpc emqttd_ctl broker $@ ;; + stats) + if [ $# -ne 1 ]; then + echo "Usage: $SCRIPT stats" + exit 1 + fi + + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqttd is not running!" + exit 1 + fi + shift + + $NODETOOL rpc emqttd_ctl stats $@ + ;; + + metrics) + if [ $# -ne 1 ]; then + echo "Usage: $SCRIPT metrics" + exit 1 + fi + + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqttd is not running!" + exit 1 + fi + shift + + $NODETOOL rpc emqttd_ctl metrics $@ + ;; + + bridges) # Make sure the local node IS running RES=`$NODETOOL ping` @@ -223,8 +258,10 @@ case "$1" in *) echo "Usage: $SCRIPT" - echo " status #query status" - echo " broker [stats | metrics] #query broker stats or metrics" + echo " status #query broker status" + echo " broker #query broker version, uptime and description" + echo " stats #query broker statistics of clients, topics, subscribers" + echo " metrics #query broker metrics" echo " cluster [] #query or cluster nodes" echo " plugins list #query loaded plugins" echo " plugins load #load plugin" From 40b07c5e4c0cc10f2ee12770296ad37677885122 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 23 Mar 2015 01:42:53 +0800 Subject: [PATCH 08/10] 0.5.4 --- CHANGELOG.md | 24 ++++++++++++++++++++++++ apps/emqttd/src/emqttd.app.src | 2 +- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b6faf9368..384dea81b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,30 @@ eMQTTD ChangeLog ================== +0.5.4-alpha (2015-03-22) +------------------------- + +Benchmark this release on a ubuntu/14.04 server with 8 cores, 32G memory from QingCloud.com: + +``` +200K Connections, +30K Messages/Sec, +20Mbps In/Out Traffic, +200K Topics, +200K Subscribers, + +Consumed 7G memory, 40% CPU/core +``` + +Benchmark code: https://github.com/emqtt/emqttd_benchmark + +Change: rewrite emqttd_pubsub to handle more concurrent subscribe requests. + +Change: ./bin/emqttd_ctl add 'stats', 'metrics' commands. + +Bugfix: issue #71, #72 + + 0.5.3-alpha (2015-03-19) ------------------------- diff --git a/apps/emqttd/src/emqttd.app.src b/apps/emqttd/src/emqttd.app.src index 2b78e3bdb..3c0bbd24c 100644 --- a/apps/emqttd/src/emqttd.app.src +++ b/apps/emqttd/src/emqttd.app.src @@ -1,7 +1,7 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, - {vsn, "0.5.3"}, + {vsn, "0.5.4"}, {modules, []}, {registered, []}, {applications, [kernel, From 7e3ce51a0a81c0ff6f901a9585cc27f7c68ebc09 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 23 Mar 2015 02:19:20 +0800 Subject: [PATCH 09/10] benchmark --- README.md | 45 +++++++++++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 24b43fe21..c36750269 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,24 @@ -# eMQTT [![Build Status](https://travis-ci.org/emqtt/emqttd.svg?branch=master)](https://travis-ci.org/emqtt/emqttd) +# eMQTTD [![Build Status](https://travis-ci.org/emqtt/emqttd.svg?branch=master)](https://travis-ci.org/emqtt/emqttd) -eMQTT is a clusterable, massively scalable, fault-tolerant and extensible MQTT V3.1/V3.1.1 broker written in Erlang/OTP. +eMQTTD is a clusterable, massively scalable, fault-tolerant and extensible MQTT V3.1/V3.1.1 broker written in Erlang/OTP. -eMQTT support MQTT V3.1/V3.1.1 Protocol Specification. +eMQTTD support MQTT V3.1/V3.1.1 Protocol Specification. + +eMQTTD requires Erlang R17+. + + +## Benchmark + +Benchmark 0.5.4-alpha on a ubuntu/14.04 server with 8 cores, 32G memory from QingCloud: + +200K Connections, 200K Topics, 20K Messages/sec, 20Mbps In/Out with 7G Memory, 40%CPU/core -eMQTT requires Erlang R17+. ## NOTICE eMQTTD still cannot not handle massive retained messages. + ## Featues Full MQTT V3.1.1 Support @@ -49,7 +58,7 @@ $ make && make dist $ cd rel/emqtt -$ ./bin/emqtt console +$ ./bin/emqttd console ``` ## Deploy and Start @@ -57,18 +66,18 @@ $ ./bin/emqtt console ### start ``` -cp -R rel/emqtt $INSTALL_DIR +cp -R rel/emqttd $INSTALL_DIR -cd $INSTALL_DIR/emqtt +cd $INSTALL_DIR/emqttd -./bin/emqtt start +./bin/emqttd start ``` ### stop ``` -./bin/emqtt stop +./bin/emqttd stop ``` @@ -77,7 +86,7 @@ cd $INSTALL_DIR/emqtt ### etc/app.config ``` - {emqtt, [ + {emqttd, [ {auth, {anonymous, []}}, %internal, anonymous {listen, [ {mqtt, 1883, [ @@ -104,7 +113,7 @@ cd $INSTALL_DIR/emqtt ``` --name emqtt@127.0.0.1 +-name emqttd@127.0.0.1 -setcookie emqtt @@ -113,7 +122,7 @@ cd $INSTALL_DIR/emqtt When nodes clustered, vm.args should be configured as below: ``` --name emqtt@host1 +-name emqttd@host1 ``` ## Cluster @@ -123,22 +132,22 @@ Suppose we cluster two nodes on 'host1', 'host2', Steps: on 'host1': ``` -./bin/emqtt start +./bin/emqttd start ``` on 'host2': ``` -./bin/emqtt start +./bin/emqttd start -./bin/emqtt_ctl cluster emqtt@host1 +./bin/emqttd_ctl cluster emqttd@host1 ``` -Run './bin/emqtt_ctl cluster' on 'host1' or 'host2' to check cluster nodes. +Run './bin/emqttd_ctl cluster' on 'host1' or 'host2' to check cluster nodes. ## HTTP API -eMQTT support http to publish message. +eMQTTD support http to publish message. Example: @@ -163,7 +172,7 @@ message | Message ## Design -[Design Wiki](https://github.com/emqtt/emqtt/wiki) +[Design Wiki](https://github.com/emqtt/emqttd/wiki) ## License From 30d31b3135600a44f13eb4eded0c784ddafe5592 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 23 Mar 2015 02:21:38 +0800 Subject: [PATCH 10/10] emqttd --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index c36750269..bad673d56 100644 --- a/README.md +++ b/README.md @@ -50,13 +50,13 @@ Bridge brokers locally or remotelly ## Startup in Five Minutes ``` -$ git clone git://github.com/emqtt/emqtt.git +$ git clone git://github.com/emqtt/emqttd.git -$ cd emqtt +$ cd emqttd $ make && make dist -$ cd rel/emqtt +$ cd rel/emqttd $ ./bin/emqttd console ```