From 786ffa2ab8d94160da3b96ef73c8f083310e65aa Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 15 Aug 2015 22:31:18 +0800 Subject: [PATCH 01/12] inflight 100 --- rel/files/emqttd.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index ff30db38d..3a8155feb 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -89,7 +89,7 @@ {session, [ %% Max number of QoS 1 and 2 messages that can be “in flight” at one time. %% 0 means no limit - {max_inflight, 20}, + {max_inflight, 100}, %% Max retries for unack Qos1/2 messages {unack_retries, 3}, From df4321423307346f0aa893b04480137f66cc74f8 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 15 Aug 2015 22:31:29 +0800 Subject: [PATCH 02/12] comment log --- src/emqttd_session.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index edf8dfc66..726a83b51 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -476,8 +476,7 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = unde %% just remove awaiting noreply(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)}); -handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = ClientId, - inflight_queue = InflightQ, +handle_info({timeout, awaiting_ack, PktId}, Session = #session{inflight_queue = InflightQ, awaiting_ack = AwaitingAck}) -> case maps:find(PktId, AwaitingAck) of {ok, {{0, _Timeout}, _TRef}} -> @@ -489,8 +488,9 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = AwaitingAck1 = maps:put(PktId, {{Retries-1, Timeout*2}, TRef}, AwaitingAck), {noreply, Session#session{awaiting_ack = AwaitingAck1}}; error -> - lager:error([{client, ClientId}], "Session ~s " - "Cannot find Awaiting Ack:~p", [ClientId, PktId]), + % TODO: too many logs when overloaded... + % lager:error([{client, ClientId}], "Session ~s " + % "Cannot find Awaiting Ack:~p", [ClientId, PktId]), {noreply, Session} end; From 768bc2ed1d781f25e342f8573a521c71b2fdfdde Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 15 Aug 2015 23:33:51 +0800 Subject: [PATCH 03/12] hibernate --- src/emqttd_client.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index c2c08607f..c013c3055 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -110,11 +110,11 @@ handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState handle_info({deliver, Message}, State = #state{proto_state = ProtoState}) -> {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState), - {noreply, State#state{proto_state = ProtoState1}}; + {noreply, State#state{proto_state = ProtoState1}, hibernate}; handle_info({redeliver, {?PUBREL, PacketId}}, State = #state{proto_state = ProtoState}) -> {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), - {noreply, State#state{proto_state = ProtoState1}}; + {noreply, State#state{proto_state = ProtoState1}, hibernate}; handle_info({subscribe, TopicTable}, State = #state{proto_state = ProtoState}) -> {ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState), From 34034b3969b67d2f3927340711c664be35ed8cb1 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 16 Aug 2015 00:11:53 +0800 Subject: [PATCH 04/12] 0.10.0 --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a63afae5..8ccdcd304 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ MySQL Authentication and ACL Plugin Session Statistics +Session Improve + 0.9.3-alpha (2015-07-25) ------------------------- From d68e749b34fbe64cf8dbe4fe8048913507cf6e08 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 16 Aug 2015 00:12:29 +0800 Subject: [PATCH 05/12] Mobile --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index feea2727d..997fef82f 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ emqttd requires Erlang R17+ to build. ## Goals -emqttd is aimed to provide a solid, enterprise grade, extensible open-source MQTT broker for IoT(M2M) applications that need to support ten millions of concurrent MQTT clients. +emqttd is aimed to provide a solid, enterprise grade, extensible open-source MQTT broker for IoT, M2M and Mobile applications that need to support ten millions of concurrent MQTT clients. * Easy to install * Massively scalable @@ -130,3 +130,4 @@ The MIT License (MIT) Feng Lee + From 27336f98594bd0e39b38d5e24d3a80da3a753687 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 16 Aug 2015 00:12:36 +0800 Subject: [PATCH 06/12] timestamp --- src/emqttd_guid.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_guid.erl b/src/emqttd_guid.erl index d9efd3620..65fc172cc 100644 --- a/src/emqttd_guid.erl +++ b/src/emqttd_guid.erl @@ -38,7 +38,7 @@ -module(emqttd_guid). --export([gen/0, new/0]). +-export([gen/0, new/0, timestamp/1]). -define(MAX_SEQ, 16#FFFF). From 2f1c03a4693cfcfcb3edbe245d09f0229240fb04 Mon Sep 17 00:00:00 2001 From: Feng Date: Sun, 16 Aug 2015 00:36:56 +0800 Subject: [PATCH 07/12] misc fix --- src/emqttd_guid.erl | 10 ++++++++-- src/emqttd_log.erl | 1 + src/emqttd_throttle.erl | 2 +- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/emqttd_guid.erl b/src/emqttd_guid.erl index 65fc172cc..bf668a384 100644 --- a/src/emqttd_guid.erl +++ b/src/emqttd_guid.erl @@ -38,6 +38,8 @@ -module(emqttd_guid). +-author("Feng Lee "). + -export([gen/0, new/0, timestamp/1]). -define(MAX_SEQ, 16#FFFF). @@ -54,17 +56,21 @@ gen() -> undefined -> new(); {_Ts, NPid, Seq} -> next(NPid, Seq) end, - put(guid, Guid), enc(Guid). + put(guid, Guid), bin(Guid). new() -> {ts(), npid(), 0}. +-spec timestamp(guid()) -> integer(). +timestamp(<>) -> + Ts. + next(NPid, Seq) when Seq >= ?MAX_SEQ -> {ts(), NPid, 0}; next(NPid, Seq) -> {ts(), NPid, Seq + 1}. -enc({Ts, NPid, Seq}) -> +bin({Ts, NPid, Seq}) -> <>. ts() -> diff --git a/src/emqttd_log.erl b/src/emqttd_log.erl index 9aafdb6d3..96522ae9d 100644 --- a/src/emqttd_log.erl +++ b/src/emqttd_log.erl @@ -26,6 +26,7 @@ %%%----------------------------------------------------------------------------- %% TODO: issue #103 +%% 0.12.0 ??? -module(emqttd_log). diff --git a/src/emqttd_throttle.erl b/src/emqttd_throttle.erl index 0eb095505..256ae27d4 100644 --- a/src/emqttd_throttle.erl +++ b/src/emqttd_throttle.erl @@ -28,5 +28,5 @@ -author("Feng Lee "). -%% TODO:... 0.10.0... +%% TODO:... 0.11.0... From 24512a1c1db98bc5ed941717392c558c80e5124f Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 16 Aug 2015 11:06:47 +0800 Subject: [PATCH 08/12] gen_server2 --- src/emqttd_cm.erl | 18 +++++++++--------- src/emqttd_pubsub.erl | 22 +++++++++++++++++----- src/emqttd_sm.erl | 25 +++++++++++++++++++------ 3 files changed, 45 insertions(+), 20 deletions(-) diff --git a/src/emqttd_cm.erl b/src/emqttd_cm.erl index 55359ec6b..9c28654eb 100644 --- a/src/emqttd_cm.erl +++ b/src/emqttd_cm.erl @@ -30,7 +30,7 @@ -include("emqttd.hrl"). --behaviour(gen_server). +-behaviour(gen_server2). -define(SERVER, ?MODULE). @@ -59,7 +59,7 @@ Id :: pos_integer(), StatsFun :: fun(). start_link(Id, StatsFun) -> - gen_server:start_link(?MODULE, [Id, StatsFun], []). + gen_server2:start_link(?MODULE, [Id, StatsFun], []). pool() -> ?CM_POOL. @@ -81,7 +81,7 @@ lookup(ClientId) when is_binary(ClientId) -> -spec register(Client :: mqtt_client()) -> ok. register(Client = #mqtt_client{client_id = ClientId}) -> CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), - gen_server:cast(CmPid, {register, Client}). + gen_server2:cast(CmPid, {register, Client}). %%------------------------------------------------------------------------------ %% @doc Unregister clientId with pid. @@ -90,7 +90,7 @@ register(Client = #mqtt_client{client_id = ClientId}) -> -spec unregister(ClientId :: binary()) -> ok. unregister(ClientId) when is_binary(ClientId) -> CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), - gen_server:cast(CmPid, {unregister, ClientId, self()}). + gen_server2:cast(CmPid, {unregister, ClientId, self()}). %%%============================================================================= %%% gen_server callbacks @@ -105,7 +105,6 @@ handle_call(Req, _From, State) -> {reply, {error, badreq}, State}. handle_cast({register, Client = #mqtt_client{client_id = ClientId, client_pid = Pid}}, State) -> - lager:info("CM register ~s with ~p", [ClientId, Pid]), case ets:lookup(mqtt_client, ClientId) of [#mqtt_client{client_pid = Pid}] -> lager:error("ClientId '~s' has been registered with ~p", [ClientId, Pid]), @@ -119,21 +118,22 @@ handle_cast({register, Client = #mqtt_client{client_id = ClientId, client_pid = {noreply, setstats(State)}; handle_cast({unregister, ClientId, Pid}, State) -> - lager:info("CM unregister ~s with ~p", [ClientId, Pid]), case ets:lookup(mqtt_client, ClientId) of [#mqtt_client{client_pid = Pid}] -> ets:delete(mqtt_client, ClientId); [_] -> ignore; [] -> - lager:error("cannot find clientId '~s' with ~p", [ClientId, Pid]) + lager:error("Cannot find clientId '~s' with ~p", [ClientId, Pid]) end, {noreply, setstats(State)}; -handle_cast(_Msg, State) -> +handle_cast(Msg, State) -> + lager:critical("Unexpected Msg: ~p", [Msg]), {noreply, State}. -handle_info(_Info, State) -> +handle_info(Info, State) -> + lager:critical("Unexpected Msg: ~p", [Info]), {noreply, State}. terminate(_Reason, #state{id = Id}) -> diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 1420fb2df..4c89cd63a 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -51,12 +51,15 @@ -export([dispatch/2, match/1]). --behaviour(gen_server). +-behaviour(gen_server2). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%% gen_server2 priorities +-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]). + -define(POOL, pubsub). -record(state, {id, submap :: map()}). @@ -104,7 +107,7 @@ mnesia(copy) -> Id :: pos_integer(), Opts :: list(). start_link(Id, Opts) -> - gen_server:start_link(?MODULE, [Id, Opts], []). + gen_server2:start_link(?MODULE, [Id, Opts], []). %%------------------------------------------------------------------------------ %% @doc Create topic. Notice That this transaction is not protected by pubsub pool @@ -157,11 +160,11 @@ unsubscribe(Topics = [Topic|_]) when is_binary(Topic) -> call(Req) -> Pid = gproc_pool:pick_worker(?POOL, self()), - gen_server:call(Pid, Req, infinity). + gen_server2:call(Pid, Req, infinity). cast(Msg) -> Pid = gproc_pool:pick_worker(?POOL, self()), - gen_server:cast(Pid, Msg). + gen_server2:cast(Pid, Msg). %%------------------------------------------------------------------------------ %% @doc Publish to cluster nodes @@ -232,6 +235,15 @@ init([Id, _Opts]) -> gproc_pool:connect_worker(pubsub, {?MODULE, Id}), {ok, #state{id = Id, submap = maps:new()}}. +prioritise_call(_Msg, _From, _Len, _State) -> + 1. + +prioritise_cast(_Msg, _Len, _State) -> + 0. + +prioritise_info(_Msg, _Len, _State) -> + 1. + handle_call({subscribe, SubPid, Topics}, _From, State) -> TopicSubs = lists:map(fun({<<"$Q/", _/binary>> = Queue, Qos}) -> #mqtt_queue{name = Queue, qpid = SubPid, qos = Qos}; @@ -363,7 +375,7 @@ handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{submap = SubMa end; handle_info(Info, State) -> - lager:error("Unexpected Info: ~p", [Info]), + lager:critical("Unexpected Info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index f1e3ae567..6d52c40ab 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -44,12 +44,15 @@ -export([register_session/3, unregister_session/2]). --behaviour(gen_server). +-behaviour(gen_server2). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%% gen_server2 priorities +-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]). + -record(state, {id, statsfun}). -define(SM_POOL, sm_pool). @@ -82,7 +85,7 @@ mnesia(copy) -> Id :: pos_integer(), StatsFun :: fun(). start_link(Id, StatsFun) -> - gen_server:start_link(?MODULE, [Id, StatsFun], []). + gen_server2:start_link(?MODULE, [Id, StatsFun], []). %%------------------------------------------------------------------------------ %% @doc Pool name. @@ -123,7 +126,7 @@ register_session(true, ClientId, Info) -> register_session(false, ClientId, Info) -> SM = gproc_pool:pick_worker(?SM_POOL, ClientId), - gen_server:cast(SM, {register, ClientId, Info}). + gen_server2:cast(SM, {register, ClientId, Info}). %%------------------------------------------------------------------------------ %% @doc Unregister a session. @@ -136,9 +139,9 @@ unregister_session(true, ClientId) -> ets:delete(mqtt_transient_session, ClientId); unregister_session(false, ClientId) -> SM = gproc_pool:pick_worker(?SM_POOL, ClientId), - gen_server:cast(SM, {unregister, ClientId}). + gen_server2:cast(SM, {unregister, ClientId}). -call(SM, Req) -> gen_server:call(SM, Req, infinity). +call(SM, Req) -> gen_server2:call(SM, Req, infinity). %%%============================================================================= %%% gen_server callbacks @@ -148,6 +151,15 @@ init([Id, StatsFun]) -> gproc_pool:connect_worker(?SM_POOL, {?MODULE, Id}), {ok, #state{id = Id, statsfun = StatsFun}}. +prioritise_call(_Msg, _From, _Len, _State) -> + 1. + +prioritise_cast(_Msg, _Len, _State) -> + 0. + +prioritise_info(_Msg, _Len, _State) -> + 1. + %% persistent session handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) -> case lookup_session(ClientId) of @@ -194,7 +206,8 @@ handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State) -> end), {noreply, setstats(State)}; -handle_info(_Info, State) -> +handle_info(Info, State) -> + lager:critical("Unexpected Info: ~p", [Info]), {noreply, State}. terminate(_Reason, #state{id = Id}) -> From 11298a9581fe66faf0331e6a84da34c48e04533d Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 16 Aug 2015 11:31:39 +0800 Subject: [PATCH 09/12] contributors --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 997fef82f..b13943559 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ emqttd is aimed to provide a solid, enterprise grade, extensible open-source MQT * TCP/SSL Connection Support * MQTT Over Websocket(SSL) Support * HTTP Publish API Support -* [$SYS/borkers/#](https://github.com/emqtt/emqtt/wiki/$SYS-Topics-of-Broker) Support +* [$SYS/brokers/#](https://github.com/emqtt/emqtt/wiki/$SYS-Topics-of-Broker) Support * Client Authentication with clientId, ipaddress * Client Authentication with username, password. * Client ACL control with ipaddress, clientid, username. @@ -124,6 +124,7 @@ The MIT License (MIT) * [@Hades32](https://github.com/Hades32) * [@huangdan](https://github.com/huangdan) * [@phanimahesh](https://github.com/phanimahesh) +* [@dvliman](https://github.com/dvliman) ## Author From f9eeab89a2c72b601161bccac36e970341462001 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 16 Aug 2015 22:01:04 +0800 Subject: [PATCH 10/12] fix gen_server2 issue, https://github.com/rabbitmq/rabbitmq-server/issues/268 --- src/emqttd_retained.erl | 2 +- src/emqttd_session.erl | 1 + src/gen_server2.erl | 9 +++++++-- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/emqttd_retained.erl b/src/emqttd_retained.erl index adbe82e64..633618557 100644 --- a/src/emqttd_retained.erl +++ b/src/emqttd_retained.erl @@ -122,5 +122,5 @@ dispatch(Topic, CPid) when is_binary(Topic) -> end, mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained]) end, - [CPid ! {dispatch, Msg} || Msg <- Msgs]. + lists:foreach(fun(Msg) -> CPid ! {dispatch, Msg} end, lists:reverse(Msgs)). diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 726a83b51..db68b368d 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -268,6 +268,7 @@ prioritise_info(Msg, _Len, _State) -> session_expired -> 10; {timeout, _, _} -> 5; collect_info -> 2; + {dispatch, _} -> 1; _ -> 0 end. diff --git a/src/gen_server2.erl b/src/gen_server2.erl index fd0e6553b..5c04f3f9e 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -633,8 +633,13 @@ extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) -> %%% The MAIN loop. %%% --------------------------------------------------- loop(GS2State = #gs2_state { time = hibernate, - timeout_state = undefined }) -> - pre_hibernate(GS2State); + timeout_state = undefined, + queue = Queue }) -> + case priority_queue:is_empty(Queue) of + true -> pre_hibernate(GS2State); + false -> process_next_msg(GS2State) + end; + loop(GS2State) -> process_next_msg(drain(GS2State)). From 991d6584380f78e25b2a0276dc93baf1c90ecf8b Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 16 Aug 2015 22:01:41 +0800 Subject: [PATCH 11/12] add more match tests --- test/emqttd_topic_tests.erl | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/test/emqttd_topic_tests.erl b/test/emqttd_topic_tests.erl index a21b6c520..77a09b6e8 100644 --- a/test/emqttd_topic_tests.erl +++ b/test/emqttd_topic_tests.erl @@ -52,7 +52,11 @@ match_test() -> ?assert( match(<<"sport">>, <<"sport/#">>) ), ?assert( match(<<"sport">>, <<"#">>) ), - ?assert( match(<<"/sport/football/score/1">>, <<"#">>) ). + ?assert( match(<<"/sport/football/score/1">>, <<"#">>) ), + %% paho test + ?assert( match(<<"Topic/C">>, <<"+/+">>) ), + ?assert( match(<<"TopicA/B">>, <<"+/+">>) ), + ?assert( match(<<"TopicA/C">>, <<"+/+">>) ). sigle_level_match_test() -> ?assert( match(<<"sport/tennis/player1">>, <<"sport/tennis/+">>) ), From 6f1e80ae3bc14eb1fe7b8b15334e199dbb366777 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 16 Aug 2015 22:04:31 +0800 Subject: [PATCH 12/12] update --- plugins/emqttd_dashboard | 2 +- plugins/emqttd_plugin_template | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/emqttd_dashboard b/plugins/emqttd_dashboard index ca3182f5d..281d39be6 160000 --- a/plugins/emqttd_dashboard +++ b/plugins/emqttd_dashboard @@ -1 +1 @@ -Subproject commit ca3182f5ddf9b2776f826471b8ccc698218697f8 +Subproject commit 281d39be66f867b2144adfa37431ac10ba0fbbc7 diff --git a/plugins/emqttd_plugin_template b/plugins/emqttd_plugin_template index 3d5d2ccab..d10ee3dcd 160000 --- a/plugins/emqttd_plugin_template +++ b/plugins/emqttd_plugin_template @@ -1 +1 @@ -Subproject commit 3d5d2ccabdde2d0381bcd17c803be5e42b3fec90 +Subproject commit d10ee3dcdaf4e7d17f50ed0745c39628a96678e2