diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a63afae5..8ccdcd304 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ MySQL Authentication and ACL Plugin Session Statistics +Session Improve + 0.9.3-alpha (2015-07-25) ------------------------- diff --git a/README.md b/README.md index dee226720..b13943559 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ emqttd requires Erlang R17+ to build. ## Goals -emqttd is aimed to provide a solid, enterprise grade, extensible open-source MQTT broker for IoT(M2M) applications that need to support ten millions of concurrent MQTT clients. +emqttd is aimed to provide a solid, enterprise grade, extensible open-source MQTT broker for IoT, M2M and Mobile applications that need to support ten millions of concurrent MQTT clients. * Easy to install * Massively scalable @@ -124,9 +124,11 @@ The MIT License (MIT) * [@Hades32](https://github.com/Hades32) * [@huangdan](https://github.com/huangdan) * [@phanimahesh](https://github.com/phanimahesh) +* [@dvliman](https://github.com/dvliman) ## Author Feng Lee + diff --git a/plugins/emqttd_dashboard b/plugins/emqttd_dashboard index ca3182f5d..281d39be6 160000 --- a/plugins/emqttd_dashboard +++ b/plugins/emqttd_dashboard @@ -1 +1 @@ -Subproject commit ca3182f5ddf9b2776f826471b8ccc698218697f8 +Subproject commit 281d39be66f867b2144adfa37431ac10ba0fbbc7 diff --git a/plugins/emqttd_plugin_template b/plugins/emqttd_plugin_template index 3d5d2ccab..d10ee3dcd 160000 --- a/plugins/emqttd_plugin_template +++ b/plugins/emqttd_plugin_template @@ -1 +1 @@ -Subproject commit 3d5d2ccabdde2d0381bcd17c803be5e42b3fec90 +Subproject commit d10ee3dcdaf4e7d17f50ed0745c39628a96678e2 diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index ff30db38d..3a8155feb 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -89,7 +89,7 @@ {session, [ %% Max number of QoS 1 and 2 messages that can be “in flight” at one time. %% 0 means no limit - {max_inflight, 20}, + {max_inflight, 100}, %% Max retries for unack Qos1/2 messages {unack_retries, 3}, diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index c2c08607f..c013c3055 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -110,11 +110,11 @@ handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState handle_info({deliver, Message}, State = #state{proto_state = ProtoState}) -> {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState), - {noreply, State#state{proto_state = ProtoState1}}; + {noreply, State#state{proto_state = ProtoState1}, hibernate}; handle_info({redeliver, {?PUBREL, PacketId}}, State = #state{proto_state = ProtoState}) -> {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), - {noreply, State#state{proto_state = ProtoState1}}; + {noreply, State#state{proto_state = ProtoState1}, hibernate}; handle_info({subscribe, TopicTable}, State = #state{proto_state = ProtoState}) -> {ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState), diff --git a/src/emqttd_cm.erl b/src/emqttd_cm.erl index 55359ec6b..9c28654eb 100644 --- a/src/emqttd_cm.erl +++ b/src/emqttd_cm.erl @@ -30,7 +30,7 @@ -include("emqttd.hrl"). --behaviour(gen_server). +-behaviour(gen_server2). -define(SERVER, ?MODULE). @@ -59,7 +59,7 @@ Id :: pos_integer(), StatsFun :: fun(). start_link(Id, StatsFun) -> - gen_server:start_link(?MODULE, [Id, StatsFun], []). + gen_server2:start_link(?MODULE, [Id, StatsFun], []). pool() -> ?CM_POOL. @@ -81,7 +81,7 @@ lookup(ClientId) when is_binary(ClientId) -> -spec register(Client :: mqtt_client()) -> ok. register(Client = #mqtt_client{client_id = ClientId}) -> CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), - gen_server:cast(CmPid, {register, Client}). + gen_server2:cast(CmPid, {register, Client}). %%------------------------------------------------------------------------------ %% @doc Unregister clientId with pid. @@ -90,7 +90,7 @@ register(Client = #mqtt_client{client_id = ClientId}) -> -spec unregister(ClientId :: binary()) -> ok. unregister(ClientId) when is_binary(ClientId) -> CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), - gen_server:cast(CmPid, {unregister, ClientId, self()}). + gen_server2:cast(CmPid, {unregister, ClientId, self()}). %%%============================================================================= %%% gen_server callbacks @@ -105,7 +105,6 @@ handle_call(Req, _From, State) -> {reply, {error, badreq}, State}. handle_cast({register, Client = #mqtt_client{client_id = ClientId, client_pid = Pid}}, State) -> - lager:info("CM register ~s with ~p", [ClientId, Pid]), case ets:lookup(mqtt_client, ClientId) of [#mqtt_client{client_pid = Pid}] -> lager:error("ClientId '~s' has been registered with ~p", [ClientId, Pid]), @@ -119,21 +118,22 @@ handle_cast({register, Client = #mqtt_client{client_id = ClientId, client_pid = {noreply, setstats(State)}; handle_cast({unregister, ClientId, Pid}, State) -> - lager:info("CM unregister ~s with ~p", [ClientId, Pid]), case ets:lookup(mqtt_client, ClientId) of [#mqtt_client{client_pid = Pid}] -> ets:delete(mqtt_client, ClientId); [_] -> ignore; [] -> - lager:error("cannot find clientId '~s' with ~p", [ClientId, Pid]) + lager:error("Cannot find clientId '~s' with ~p", [ClientId, Pid]) end, {noreply, setstats(State)}; -handle_cast(_Msg, State) -> +handle_cast(Msg, State) -> + lager:critical("Unexpected Msg: ~p", [Msg]), {noreply, State}. -handle_info(_Info, State) -> +handle_info(Info, State) -> + lager:critical("Unexpected Msg: ~p", [Info]), {noreply, State}. terminate(_Reason, #state{id = Id}) -> diff --git a/src/emqttd_guid.erl b/src/emqttd_guid.erl index d9efd3620..bf668a384 100644 --- a/src/emqttd_guid.erl +++ b/src/emqttd_guid.erl @@ -38,7 +38,9 @@ -module(emqttd_guid). --export([gen/0, new/0]). +-author("Feng Lee "). + +-export([gen/0, new/0, timestamp/1]). -define(MAX_SEQ, 16#FFFF). @@ -54,17 +56,21 @@ gen() -> undefined -> new(); {_Ts, NPid, Seq} -> next(NPid, Seq) end, - put(guid, Guid), enc(Guid). + put(guid, Guid), bin(Guid). new() -> {ts(), npid(), 0}. +-spec timestamp(guid()) -> integer(). +timestamp(<>) -> + Ts. + next(NPid, Seq) when Seq >= ?MAX_SEQ -> {ts(), NPid, 0}; next(NPid, Seq) -> {ts(), NPid, Seq + 1}. -enc({Ts, NPid, Seq}) -> +bin({Ts, NPid, Seq}) -> <>. ts() -> diff --git a/src/emqttd_log.erl b/src/emqttd_log.erl index 9aafdb6d3..96522ae9d 100644 --- a/src/emqttd_log.erl +++ b/src/emqttd_log.erl @@ -26,6 +26,7 @@ %%%----------------------------------------------------------------------------- %% TODO: issue #103 +%% 0.12.0 ??? -module(emqttd_log). diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 1420fb2df..4c89cd63a 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -51,12 +51,15 @@ -export([dispatch/2, match/1]). --behaviour(gen_server). +-behaviour(gen_server2). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%% gen_server2 priorities +-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]). + -define(POOL, pubsub). -record(state, {id, submap :: map()}). @@ -104,7 +107,7 @@ mnesia(copy) -> Id :: pos_integer(), Opts :: list(). start_link(Id, Opts) -> - gen_server:start_link(?MODULE, [Id, Opts], []). + gen_server2:start_link(?MODULE, [Id, Opts], []). %%------------------------------------------------------------------------------ %% @doc Create topic. Notice That this transaction is not protected by pubsub pool @@ -157,11 +160,11 @@ unsubscribe(Topics = [Topic|_]) when is_binary(Topic) -> call(Req) -> Pid = gproc_pool:pick_worker(?POOL, self()), - gen_server:call(Pid, Req, infinity). + gen_server2:call(Pid, Req, infinity). cast(Msg) -> Pid = gproc_pool:pick_worker(?POOL, self()), - gen_server:cast(Pid, Msg). + gen_server2:cast(Pid, Msg). %%------------------------------------------------------------------------------ %% @doc Publish to cluster nodes @@ -232,6 +235,15 @@ init([Id, _Opts]) -> gproc_pool:connect_worker(pubsub, {?MODULE, Id}), {ok, #state{id = Id, submap = maps:new()}}. +prioritise_call(_Msg, _From, _Len, _State) -> + 1. + +prioritise_cast(_Msg, _Len, _State) -> + 0. + +prioritise_info(_Msg, _Len, _State) -> + 1. + handle_call({subscribe, SubPid, Topics}, _From, State) -> TopicSubs = lists:map(fun({<<"$Q/", _/binary>> = Queue, Qos}) -> #mqtt_queue{name = Queue, qpid = SubPid, qos = Qos}; @@ -363,7 +375,7 @@ handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{submap = SubMa end; handle_info(Info, State) -> - lager:error("Unexpected Info: ~p", [Info]), + lager:critical("Unexpected Info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqttd_retained.erl b/src/emqttd_retained.erl index adbe82e64..633618557 100644 --- a/src/emqttd_retained.erl +++ b/src/emqttd_retained.erl @@ -122,5 +122,5 @@ dispatch(Topic, CPid) when is_binary(Topic) -> end, mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained]) end, - [CPid ! {dispatch, Msg} || Msg <- Msgs]. + lists:foreach(fun(Msg) -> CPid ! {dispatch, Msg} end, lists:reverse(Msgs)). diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index edf8dfc66..db68b368d 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -268,6 +268,7 @@ prioritise_info(Msg, _Len, _State) -> session_expired -> 10; {timeout, _, _} -> 5; collect_info -> 2; + {dispatch, _} -> 1; _ -> 0 end. @@ -476,8 +477,7 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = unde %% just remove awaiting noreply(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)}); -handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = ClientId, - inflight_queue = InflightQ, +handle_info({timeout, awaiting_ack, PktId}, Session = #session{inflight_queue = InflightQ, awaiting_ack = AwaitingAck}) -> case maps:find(PktId, AwaitingAck) of {ok, {{0, _Timeout}, _TRef}} -> @@ -489,8 +489,9 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = AwaitingAck1 = maps:put(PktId, {{Retries-1, Timeout*2}, TRef}, AwaitingAck), {noreply, Session#session{awaiting_ack = AwaitingAck1}}; error -> - lager:error([{client, ClientId}], "Session ~s " - "Cannot find Awaiting Ack:~p", [ClientId, PktId]), + % TODO: too many logs when overloaded... + % lager:error([{client, ClientId}], "Session ~s " + % "Cannot find Awaiting Ack:~p", [ClientId, PktId]), {noreply, Session} end; diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index f1e3ae567..6d52c40ab 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -44,12 +44,15 @@ -export([register_session/3, unregister_session/2]). --behaviour(gen_server). +-behaviour(gen_server2). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%% gen_server2 priorities +-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]). + -record(state, {id, statsfun}). -define(SM_POOL, sm_pool). @@ -82,7 +85,7 @@ mnesia(copy) -> Id :: pos_integer(), StatsFun :: fun(). start_link(Id, StatsFun) -> - gen_server:start_link(?MODULE, [Id, StatsFun], []). + gen_server2:start_link(?MODULE, [Id, StatsFun], []). %%------------------------------------------------------------------------------ %% @doc Pool name. @@ -123,7 +126,7 @@ register_session(true, ClientId, Info) -> register_session(false, ClientId, Info) -> SM = gproc_pool:pick_worker(?SM_POOL, ClientId), - gen_server:cast(SM, {register, ClientId, Info}). + gen_server2:cast(SM, {register, ClientId, Info}). %%------------------------------------------------------------------------------ %% @doc Unregister a session. @@ -136,9 +139,9 @@ unregister_session(true, ClientId) -> ets:delete(mqtt_transient_session, ClientId); unregister_session(false, ClientId) -> SM = gproc_pool:pick_worker(?SM_POOL, ClientId), - gen_server:cast(SM, {unregister, ClientId}). + gen_server2:cast(SM, {unregister, ClientId}). -call(SM, Req) -> gen_server:call(SM, Req, infinity). +call(SM, Req) -> gen_server2:call(SM, Req, infinity). %%%============================================================================= %%% gen_server callbacks @@ -148,6 +151,15 @@ init([Id, StatsFun]) -> gproc_pool:connect_worker(?SM_POOL, {?MODULE, Id}), {ok, #state{id = Id, statsfun = StatsFun}}. +prioritise_call(_Msg, _From, _Len, _State) -> + 1. + +prioritise_cast(_Msg, _Len, _State) -> + 0. + +prioritise_info(_Msg, _Len, _State) -> + 1. + %% persistent session handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) -> case lookup_session(ClientId) of @@ -194,7 +206,8 @@ handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State) -> end), {noreply, setstats(State)}; -handle_info(_Info, State) -> +handle_info(Info, State) -> + lager:critical("Unexpected Info: ~p", [Info]), {noreply, State}. terminate(_Reason, #state{id = Id}) -> diff --git a/src/emqttd_throttle.erl b/src/emqttd_throttle.erl index 0eb095505..256ae27d4 100644 --- a/src/emqttd_throttle.erl +++ b/src/emqttd_throttle.erl @@ -28,5 +28,5 @@ -author("Feng Lee "). -%% TODO:... 0.10.0... +%% TODO:... 0.11.0... diff --git a/src/gen_server2.erl b/src/gen_server2.erl index fd0e6553b..5c04f3f9e 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -633,8 +633,13 @@ extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) -> %%% The MAIN loop. %%% --------------------------------------------------- loop(GS2State = #gs2_state { time = hibernate, - timeout_state = undefined }) -> - pre_hibernate(GS2State); + timeout_state = undefined, + queue = Queue }) -> + case priority_queue:is_empty(Queue) of + true -> pre_hibernate(GS2State); + false -> process_next_msg(GS2State) + end; + loop(GS2State) -> process_next_msg(drain(GS2State)). diff --git a/test/emqttd_topic_tests.erl b/test/emqttd_topic_tests.erl index a21b6c520..77a09b6e8 100644 --- a/test/emqttd_topic_tests.erl +++ b/test/emqttd_topic_tests.erl @@ -52,7 +52,11 @@ match_test() -> ?assert( match(<<"sport">>, <<"sport/#">>) ), ?assert( match(<<"sport">>, <<"#">>) ), - ?assert( match(<<"/sport/football/score/1">>, <<"#">>) ). + ?assert( match(<<"/sport/football/score/1">>, <<"#">>) ), + %% paho test + ?assert( match(<<"Topic/C">>, <<"+/+">>) ), + ?assert( match(<<"TopicA/B">>, <<"+/+">>) ), + ?assert( match(<<"TopicA/C">>, <<"+/+">>) ). sigle_level_match_test() -> ?assert( match(<<"sport/tennis/player1">>, <<"sport/tennis/+">>) ),