Merge pull request #251 from emqtt/dev

use gen_server2 to improve emqttd_session
This commit is contained in:
Feng Lee 2015-08-16 22:11:24 +08:00
commit 23336d98a4
16 changed files with 84 additions and 38 deletions

View File

@ -13,6 +13,8 @@ MySQL Authentication and ACL Plugin
Session Statistics
Session Improve
0.9.3-alpha (2015-07-25)
-------------------------

View File

@ -8,7 +8,7 @@ emqttd requires Erlang R17+ to build.
## Goals
emqttd is aimed to provide a solid, enterprise grade, extensible open-source MQTT broker for IoT(M2M) applications that need to support ten millions of concurrent MQTT clients.
emqttd is aimed to provide a solid, enterprise grade, extensible open-source MQTT broker for IoT, M2M and Mobile applications that need to support ten millions of concurrent MQTT clients.
* Easy to install
* Massively scalable
@ -124,9 +124,11 @@ The MIT License (MIT)
* [@Hades32](https://github.com/Hades32)
* [@huangdan](https://github.com/huangdan)
* [@phanimahesh](https://github.com/phanimahesh)
* [@dvliman](https://github.com/dvliman)
## Author
Feng Lee <feng@emqtt.io>

@ -1 +1 @@
Subproject commit ca3182f5ddf9b2776f826471b8ccc698218697f8
Subproject commit 281d39be66f867b2144adfa37431ac10ba0fbbc7

@ -1 +1 @@
Subproject commit 3d5d2ccabdde2d0381bcd17c803be5e42b3fec90
Subproject commit d10ee3dcdaf4e7d17f50ed0745c39628a96678e2

View File

@ -89,7 +89,7 @@
{session, [
%% Max number of QoS 1 and 2 messages that can be “in flight” at one time.
%% 0 means no limit
{max_inflight, 20},
{max_inflight, 100},
%% Max retries for unack Qos1/2 messages
{unack_retries, 3},

View File

@ -110,11 +110,11 @@ handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState
handle_info({deliver, Message}, State = #state{proto_state = ProtoState}) ->
{ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState),
{noreply, State#state{proto_state = ProtoState1}};
{noreply, State#state{proto_state = ProtoState1}, hibernate};
handle_info({redeliver, {?PUBREL, PacketId}}, State = #state{proto_state = ProtoState}) ->
{ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
{noreply, State#state{proto_state = ProtoState1}};
{noreply, State#state{proto_state = ProtoState1}, hibernate};
handle_info({subscribe, TopicTable}, State = #state{proto_state = ProtoState}) ->
{ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState),

View File

@ -30,7 +30,7 @@
-include("emqttd.hrl").
-behaviour(gen_server).
-behaviour(gen_server2).
-define(SERVER, ?MODULE).
@ -59,7 +59,7 @@
Id :: pos_integer(),
StatsFun :: fun().
start_link(Id, StatsFun) ->
gen_server:start_link(?MODULE, [Id, StatsFun], []).
gen_server2:start_link(?MODULE, [Id, StatsFun], []).
pool() -> ?CM_POOL.
@ -81,7 +81,7 @@ lookup(ClientId) when is_binary(ClientId) ->
-spec register(Client :: mqtt_client()) -> ok.
register(Client = #mqtt_client{client_id = ClientId}) ->
CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId),
gen_server:cast(CmPid, {register, Client}).
gen_server2:cast(CmPid, {register, Client}).
%%------------------------------------------------------------------------------
%% @doc Unregister clientId with pid.
@ -90,7 +90,7 @@ register(Client = #mqtt_client{client_id = ClientId}) ->
-spec unregister(ClientId :: binary()) -> ok.
unregister(ClientId) when is_binary(ClientId) ->
CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId),
gen_server:cast(CmPid, {unregister, ClientId, self()}).
gen_server2:cast(CmPid, {unregister, ClientId, self()}).
%%%=============================================================================
%%% gen_server callbacks
@ -105,7 +105,6 @@ handle_call(Req, _From, State) ->
{reply, {error, badreq}, State}.
handle_cast({register, Client = #mqtt_client{client_id = ClientId, client_pid = Pid}}, State) ->
lager:info("CM register ~s with ~p", [ClientId, Pid]),
case ets:lookup(mqtt_client, ClientId) of
[#mqtt_client{client_pid = Pid}] ->
lager:error("ClientId '~s' has been registered with ~p", [ClientId, Pid]),
@ -119,21 +118,22 @@ handle_cast({register, Client = #mqtt_client{client_id = ClientId, client_pid =
{noreply, setstats(State)};
handle_cast({unregister, ClientId, Pid}, State) ->
lager:info("CM unregister ~s with ~p", [ClientId, Pid]),
case ets:lookup(mqtt_client, ClientId) of
[#mqtt_client{client_pid = Pid}] ->
ets:delete(mqtt_client, ClientId);
[_] ->
ignore;
[] ->
lager:error("cannot find clientId '~s' with ~p", [ClientId, Pid])
lager:error("Cannot find clientId '~s' with ~p", [ClientId, Pid])
end,
{noreply, setstats(State)};
handle_cast(_Msg, State) ->
handle_cast(Msg, State) ->
lager:critical("Unexpected Msg: ~p", [Msg]),
{noreply, State}.
handle_info(_Info, State) ->
handle_info(Info, State) ->
lager:critical("Unexpected Msg: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{id = Id}) ->

View File

@ -38,7 +38,9 @@
-module(emqttd_guid).
-export([gen/0, new/0]).
-author("Feng Lee <feng@emqtt.io>").
-export([gen/0, new/0, timestamp/1]).
-define(MAX_SEQ, 16#FFFF).
@ -54,17 +56,21 @@ gen() ->
undefined -> new();
{_Ts, NPid, Seq} -> next(NPid, Seq)
end,
put(guid, Guid), enc(Guid).
put(guid, Guid), bin(Guid).
new() ->
{ts(), npid(), 0}.
-spec timestamp(guid()) -> integer().
timestamp(<<Ts:64, _/binary>>) ->
Ts.
next(NPid, Seq) when Seq >= ?MAX_SEQ ->
{ts(), NPid, 0};
next(NPid, Seq) ->
{ts(), NPid, Seq + 1}.
enc({Ts, NPid, Seq}) ->
bin({Ts, NPid, Seq}) ->
<<Ts:64, NPid:48, Seq:16>>.
ts() ->

View File

@ -26,6 +26,7 @@
%%%-----------------------------------------------------------------------------
%% TODO: issue #103
%% 0.12.0 ???
-module(emqttd_log).

View File

@ -51,12 +51,15 @@
-export([dispatch/2,
match/1]).
-behaviour(gen_server).
-behaviour(gen_server2).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% gen_server2 priorities
-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]).
-define(POOL, pubsub).
-record(state, {id, submap :: map()}).
@ -104,7 +107,7 @@ mnesia(copy) ->
Id :: pos_integer(),
Opts :: list().
start_link(Id, Opts) ->
gen_server:start_link(?MODULE, [Id, Opts], []).
gen_server2:start_link(?MODULE, [Id, Opts], []).
%%------------------------------------------------------------------------------
%% @doc Create topic. Notice That this transaction is not protected by pubsub pool
@ -157,11 +160,11 @@ unsubscribe(Topics = [Topic|_]) when is_binary(Topic) ->
call(Req) ->
Pid = gproc_pool:pick_worker(?POOL, self()),
gen_server:call(Pid, Req, infinity).
gen_server2:call(Pid, Req, infinity).
cast(Msg) ->
Pid = gproc_pool:pick_worker(?POOL, self()),
gen_server:cast(Pid, Msg).
gen_server2:cast(Pid, Msg).
%%------------------------------------------------------------------------------
%% @doc Publish to cluster nodes
@ -232,6 +235,15 @@ init([Id, _Opts]) ->
gproc_pool:connect_worker(pubsub, {?MODULE, Id}),
{ok, #state{id = Id, submap = maps:new()}}.
prioritise_call(_Msg, _From, _Len, _State) ->
1.
prioritise_cast(_Msg, _Len, _State) ->
0.
prioritise_info(_Msg, _Len, _State) ->
1.
handle_call({subscribe, SubPid, Topics}, _From, State) ->
TopicSubs = lists:map(fun({<<"$Q/", _/binary>> = Queue, Qos}) ->
#mqtt_queue{name = Queue, qpid = SubPid, qos = Qos};
@ -363,7 +375,7 @@ handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{submap = SubMa
end;
handle_info(Info, State) ->
lager:error("Unexpected Info: ~p", [Info]),
lager:critical("Unexpected Info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -122,5 +122,5 @@ dispatch(Topic, CPid) when is_binary(Topic) ->
end,
mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained])
end,
[CPid ! {dispatch, Msg} || Msg <- Msgs].
lists:foreach(fun(Msg) -> CPid ! {dispatch, Msg} end, lists:reverse(Msgs)).

View File

@ -268,6 +268,7 @@ prioritise_info(Msg, _Len, _State) ->
session_expired -> 10;
{timeout, _, _} -> 5;
collect_info -> 2;
{dispatch, _} -> 1;
_ -> 0
end.
@ -476,8 +477,7 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = unde
%% just remove awaiting
noreply(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)});
handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = ClientId,
inflight_queue = InflightQ,
handle_info({timeout, awaiting_ack, PktId}, Session = #session{inflight_queue = InflightQ,
awaiting_ack = AwaitingAck}) ->
case maps:find(PktId, AwaitingAck) of
{ok, {{0, _Timeout}, _TRef}} ->
@ -489,8 +489,9 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id =
AwaitingAck1 = maps:put(PktId, {{Retries-1, Timeout*2}, TRef}, AwaitingAck),
{noreply, Session#session{awaiting_ack = AwaitingAck1}};
error ->
lager:error([{client, ClientId}], "Session ~s "
"Cannot find Awaiting Ack:~p", [ClientId, PktId]),
% TODO: too many logs when overloaded...
% lager:error([{client, ClientId}], "Session ~s "
% "Cannot find Awaiting Ack:~p", [ClientId, PktId]),
{noreply, Session}
end;

View File

@ -44,12 +44,15 @@
-export([register_session/3, unregister_session/2]).
-behaviour(gen_server).
-behaviour(gen_server2).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% gen_server2 priorities
-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]).
-record(state, {id, statsfun}).
-define(SM_POOL, sm_pool).
@ -82,7 +85,7 @@ mnesia(copy) ->
Id :: pos_integer(),
StatsFun :: fun().
start_link(Id, StatsFun) ->
gen_server:start_link(?MODULE, [Id, StatsFun], []).
gen_server2:start_link(?MODULE, [Id, StatsFun], []).
%%------------------------------------------------------------------------------
%% @doc Pool name.
@ -123,7 +126,7 @@ register_session(true, ClientId, Info) ->
register_session(false, ClientId, Info) ->
SM = gproc_pool:pick_worker(?SM_POOL, ClientId),
gen_server:cast(SM, {register, ClientId, Info}).
gen_server2:cast(SM, {register, ClientId, Info}).
%%------------------------------------------------------------------------------
%% @doc Unregister a session.
@ -136,9 +139,9 @@ unregister_session(true, ClientId) ->
ets:delete(mqtt_transient_session, ClientId);
unregister_session(false, ClientId) ->
SM = gproc_pool:pick_worker(?SM_POOL, ClientId),
gen_server:cast(SM, {unregister, ClientId}).
gen_server2:cast(SM, {unregister, ClientId}).
call(SM, Req) -> gen_server:call(SM, Req, infinity).
call(SM, Req) -> gen_server2:call(SM, Req, infinity).
%%%=============================================================================
%%% gen_server callbacks
@ -148,6 +151,15 @@ init([Id, StatsFun]) ->
gproc_pool:connect_worker(?SM_POOL, {?MODULE, Id}),
{ok, #state{id = Id, statsfun = StatsFun}}.
prioritise_call(_Msg, _From, _Len, _State) ->
1.
prioritise_cast(_Msg, _Len, _State) ->
0.
prioritise_info(_Msg, _Len, _State) ->
1.
%% persistent session
handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) ->
case lookup_session(ClientId) of
@ -194,7 +206,8 @@ handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State) ->
end),
{noreply, setstats(State)};
handle_info(_Info, State) ->
handle_info(Info, State) ->
lager:critical("Unexpected Info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{id = Id}) ->

View File

@ -28,5 +28,5 @@
-author("Feng Lee <feng@emqtt.io>").
%% TODO:... 0.10.0...
%% TODO:... 0.11.0...

View File

@ -633,8 +633,13 @@ extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) ->
%%% The MAIN loop.
%%% ---------------------------------------------------
loop(GS2State = #gs2_state { time = hibernate,
timeout_state = undefined }) ->
pre_hibernate(GS2State);
timeout_state = undefined,
queue = Queue }) ->
case priority_queue:is_empty(Queue) of
true -> pre_hibernate(GS2State);
false -> process_next_msg(GS2State)
end;
loop(GS2State) ->
process_next_msg(drain(GS2State)).

View File

@ -52,7 +52,11 @@ match_test() ->
?assert( match(<<"sport">>, <<"sport/#">>) ),
?assert( match(<<"sport">>, <<"#">>) ),
?assert( match(<<"/sport/football/score/1">>, <<"#">>) ).
?assert( match(<<"/sport/football/score/1">>, <<"#">>) ),
%% paho test
?assert( match(<<"Topic/C">>, <<"+/+">>) ),
?assert( match(<<"TopicA/B">>, <<"+/+">>) ),
?assert( match(<<"TopicA/C">>, <<"+/+">>) ).
sigle_level_match_test() ->
?assert( match(<<"sport/tennis/player1">>, <<"sport/tennis/+">>) ),