diff --git a/CHANGELOG.md b/CHANGELOG.md index 49e990681..9b33d3c4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,16 @@ WebSocket Presence Management.... +0.6.2-alpha (2015-04-24) +------------------------- + +Bugfix: critical issue #54, #104, #106 - error when resume session + +Improve: add emqttd_cm_sup module, and use 'hash' gproc_pool to register/unregister client ids + +Improve: kick old client out when session is duplicated. + +Improve: move mnesia dir config from etc/app.config to etc/vm.args 0.6.1-alpha (2015-04-20) diff --git a/apps/emqtt/src/emqtt.app.src b/apps/emqtt/src/emqtt.app.src index 61e5bbaef..ac2d2417a 100644 --- a/apps/emqtt/src/emqtt.app.src +++ b/apps/emqtt/src/emqtt.app.src @@ -1,7 +1,7 @@ {application, emqtt, [ - {description, "Erlang Common MQTT Library"}, - {vsn, "0.6.1"}, + {description, "Erlang MQTT Common Library"}, + {vsn, "0.7.0"}, {modules, []}, {registered, []}, {applications, [ diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index 28b430f93..63fc9227b 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -77,7 +77,7 @@ start_servers(Sup) -> {"emqttd event", emqttd_event}, {"emqttd trace", emqttd_trace}, {"emqttd pooler", {supervisor, emqttd_pooler_sup}}, - {"emqttd client manager", emqttd_cm}, + {"emqttd client manager", {supervisor, emqttd_cm_sup}}, {"emqttd session manager", emqttd_sm}, {"emqttd session supervisor", {supervisor, emqttd_session_sup}, SessOpts}, {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}, PubSubOpts}, diff --git a/apps/emqttd/src/emqttd_cm.erl b/apps/emqttd/src/emqttd_cm.erl index 5269138c8..809e88a12 100644 --- a/apps/emqttd/src/emqttd_cm.erl +++ b/apps/emqttd/src/emqttd_cm.erl @@ -33,7 +33,7 @@ -define(SERVER, ?MODULE). %% API Exports --export([start_link/0]). +-export([start_link/2]). -export([lookup/1, register/1, unregister/1]). @@ -43,7 +43,7 @@ -record(state, {tab, statsfun}). --define(CLIENT_TAB, mqtt_client). +-define(POOL, cm). %%%============================================================================= %%% API @@ -53,9 +53,11 @@ %% @doc Start client manager %% @end %%------------------------------------------------------------------------------ --spec start_link() -> {ok, pid()} | ignore | {error, any()}. -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-spec start_link(Id, TabId) -> {ok, pid()} | ignore | {error, any()} when + Id :: pos_integer(), + TabId :: ets:tid(). +start_link(Id, TabId) -> + gen_server:start_link(?MODULE, [Id, TabId], []). %%------------------------------------------------------------------------------ %% @doc Lookup client pid with clientId @@ -63,7 +65,7 @@ start_link() -> %%------------------------------------------------------------------------------ -spec lookup(ClientId :: binary()) -> pid() | undefined. lookup(ClientId) when is_binary(ClientId) -> - case ets:lookup(?CLIENT_TAB, ClientId) of + case ets:lookup(emqttd_cm_sup:table(), ClientId) of [{_, Pid, _}] -> Pid; [] -> undefined end. @@ -74,12 +76,8 @@ lookup(ClientId) when is_binary(ClientId) -> %%------------------------------------------------------------------------------ -spec register(ClientId :: binary()) -> ok. register(ClientId) when is_binary(ClientId) -> - Pid = self(), - %% this is atomic - case ets:insert_new(?CLIENT_TAB, {ClientId, Pid, undefined}) of - true -> gen_server:cast(?SERVER, {monitor, ClientId, Pid}); - false -> gen_server:cast(?SERVER, {register, ClientId, Pid}) - end. + CmPid = gproc_pool:pick_worker(?POOL, ClientId), + gen_server:call(CmPid, {register, ClientId, self()}, infinity). %%------------------------------------------------------------------------------ %% @doc Unregister clientId with pid. @@ -87,45 +85,42 @@ register(ClientId) when is_binary(ClientId) -> %%------------------------------------------------------------------------------ -spec unregister(ClientId :: binary()) -> ok. unregister(ClientId) when is_binary(ClientId) -> - gen_server:cast(?SERVER, {unregister, ClientId, self()}). + CmPid = gproc_pool:pick_worker(?POOL, ClientId), + gen_server:cast(CmPid, {unregister, ClientId, self()}). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([]) -> - TabId = ets:new(?CLIENT_TAB, [set, - named_table, - public, - {write_concurrency, true}]), +init([Id, TabId]) -> + gproc_pool:connect_worker(?POOL, {?MODULE, Id}), StatsFun = emqttd_broker:statsfun('clients/count', 'clients/max'), {ok, #state{tab = TabId, statsfun = StatsFun}}. +handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) -> + case ets:lookup(Tab, ClientId) of + [{_, Pid, _}] -> + lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]), + ignore; + [{_, OldPid, MRef}] -> + lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]), + OldPid ! {stop, duplicate_id, Pid}, + erlang:demonitor(MRef), + ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)}); + [] -> + ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)}) + end, + {reply, ok, State}; + handle_call(Req, _From, State) -> lager:error("unexpected request: ~p", [Req]), {reply, {error, badreq}, State}. -handle_cast({register, ClientId, Pid}, State=#state{tab = Tab}) -> - case registerd(Tab, {ClientId, Pid}) of - true -> - ignore; - false -> - ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)}) - end, - {noreply, setstats(State)}; - -handle_cast({monitor, ClientId, Pid}, State = #state{tab = Tab}) -> - case ets:update_element(Tab, ClientId, {3, erlang:monitor(process, Pid)}) of - true -> ok; - false -> lager:error("failed to monitor clientId '~s' with pid ~p", [ClientId, Pid]) - end, - {noreply, setstats(State)}; - -handle_cast({unregister, ClientId, Pid}, State) -> - case ets:lookup(?CLIENT_TAB, ClientId) of +handle_cast({unregister, ClientId, Pid}, State = #state{tab = TabId}) -> + case ets:lookup(TabId, ClientId) of [{_, Pid, MRef}] -> erlang:demonitor(MRef, [flush]), - ets:delete(?CLIENT_TAB, ClientId); + ets:delete(TabId, ClientId); [_] -> ignore; [] -> @@ -136,8 +131,8 @@ handle_cast({unregister, ClientId, Pid}, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> - ets:match_delete(?CLIENT_TAB, {'_', DownPid, MRef}), +handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tab = TabId}) -> + ets:match_delete(TabId, {'_', DownPid, MRef}), {noreply, setstats(State)}; handle_info(_Info, State) -> @@ -152,21 +147,8 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================= %%% Internal functions %%%============================================================================= -registerd(Tab, {ClientId, Pid}) -> - case ets:lookup(Tab, ClientId) of - [{_, Pid, _}] -> - lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]), - true; - [{_, OldPid, MRef}] -> - lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]), - OldPid ! {stop, duplicate_id, Pid}, - erlang:demonitor(MRef), - false; - [] -> - false - end. -setstats(State = #state{statsfun = StatsFun}) -> - StatsFun(ets:info(?CLIENT_TAB, size)), State. +setstats(State = #state{tab = TabId, statsfun = StatsFun}) -> + StatsFun(ets:info(TabId, size)), State. diff --git a/apps/emqttd/src/emqttd_cm_sup.erl b/apps/emqttd/src/emqttd_cm_sup.erl new file mode 100644 index 000000000..853349b55 --- /dev/null +++ b/apps/emqttd/src/emqttd_cm_sup.erl @@ -0,0 +1,62 @@ +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd client manager supervisor. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_cm_sup). + +-author('feng@emqtt.io'). + +-include("emqttd.hrl"). + +-behaviour(supervisor). + +%% API +-export([start_link/0, table/0]). + +%% Supervisor callbacks +-export([init/1]). + +-define(CLIENT_TAB, mqtt_client). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +table() -> ?CLIENT_TAB. + +init([]) -> + TabId = ets:new(?CLIENT_TAB, [set, named_table, public, + {write_concurrency, true}]), + Schedulers = erlang:system_info(schedulers), + gproc_pool:new(cm, hash, [{size, Schedulers}]), + Children = lists:map( + fun(I) -> + Name = {emqttd_cm, I}, + gproc_pool:add_worker(cm, Name, I), + {Name, {emqttd_cm, start_link, [I, TabId]}, + permanent, 10000, worker, [emqttd_cm]} + end, lists:seq(1, Schedulers)), + {ok, {{one_for_all, 10, 100}, Children}}. + + diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index affe1ae33..79969daa1 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -243,6 +243,10 @@ initial_state(ClientId, ClientPid) -> State = initial_state(ClientId), State#session_state{client_pid = ClientPid}. +%%------------------------------------------------------------------------------ +%% @doc Start a session process. +%% @end +%%------------------------------------------------------------------------------ start_link(SessOpts, ClientId, ClientPid) -> gen_server:start_link(?MODULE, [SessOpts, ClientId, ClientPid], []). @@ -270,18 +274,33 @@ handle_call({unsubscribe, Topics}, _From, State) -> {reply, ok, NewState}; handle_call(Req, _From, State) -> - {stop, {badreq, Req}, State}. + lager:error("Unexpected request: ~p", [Req]), + {reply, error, State}. handle_cast({resume, ClientId, ClientPid}, State = #session_state{ clientid = ClientId, - client_pid = undefined, + client_pid = OldClientPid, msg_queue = Queue, awaiting_ack = AwaitingAck, awaiting_comp = AwaitingComp, expire_timer = ETimer}) -> + lager:info([{client, ClientId}], "Session ~s resumed by ~p",[ClientId, ClientPid]), - %cancel timeout timer - erlang:cancel_timer(ETimer), + + %% kick old client... + if + OldClientPid =:= undefined -> + ok; + OldClientPid =:= ClientPid -> + ok; + true -> + lager:error("Session '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, ClientPid, OldClientPid]), + unlink(OldClientPid), + OldClientPid ! {stop, duplicate_id, ClientPid} + end, + + %% cancel timeout timer + emqttd_utils:cancel_timer(ETimer), %% redelivery PUBREL lists:foreach(fun(PacketId) -> @@ -328,7 +347,8 @@ handle_cast({destroy, ClientId}, State = #session_state{clientid = ClientId}) -> {stop, normal, State}; handle_cast(Msg, State) -> - {stop, {badmsg, Msg}, State}. + lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]), + {noreply, State}. handle_info({dispatch, {_From, Messages}}, State) when is_list(Messages) -> F = fun(Message, S) -> dispatch(Message, S) end, @@ -338,18 +358,21 @@ handle_info({dispatch, {_From, Message}}, State) -> {noreply, dispatch(Message, State)}; handle_info({'EXIT', ClientPid, Reason}, State = #session_state{clientid = ClientId, - client_pid = ClientPid, - expires = Expires}) -> - lager:warning("Session: client ~s@~p exited, caused by ~p", [ClientId, ClientPid, Reason]), - Timer = erlang:send_after(Expires * 1000, self(), session_expired), - {noreply, State#session_state{client_pid = undefined, expire_timer = Timer}}; + client_pid = ClientPid}) -> + lager:error("Session: client ~s@~p exited, caused by ~p", [ClientId, ClientPid, Reason]), + {noreply, start_expire_timer(State#session_state{client_pid = undefined})}; + +handle_info({'EXIT', ClientPid0, _Reason}, State = #session_state{client_pid = ClientPid}) -> + lager:error("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]), + {noreply, State}; handle_info(session_expired, State = #session_state{clientid = ClientId}) -> lager:warning("Session ~s expired!", [ClientId]), {stop, {shutdown, expired}, State}; handle_info(Info, State) -> - {stop, {badinfo, Info}, State}. + lager:critical("Unexpected Info: ~p, State: ~p", [Info, State]), + {noreply, State}. terminate(_Reason, _State) -> ok. @@ -384,4 +407,9 @@ next_msg_id(State = #session_state{message_id = 16#ffff}) -> next_msg_id(State = #session_state{message_id = MsgId}) -> State#session_state{message_id = MsgId + 1}. +start_expire_timer(State = #session_state{expires = Expires, + expire_timer = OldTimer}) -> + emqttd_utils:cancel_timer(OldTimer), + Timer = erlang:send_after(Expires * 1000, self(), session_expired), + State#session_state{expire_timer = Timer}. diff --git a/apps/emqttd/src/emqttd_session_sup.erl b/apps/emqttd/src/emqttd_session_sup.erl index b1c89c879..e0f82f083 100644 --- a/apps/emqttd/src/emqttd_session_sup.erl +++ b/apps/emqttd/src/emqttd_session_sup.erl @@ -49,7 +49,7 @@ start_session(ClientId, ClientPid) -> %%%============================================================================= init([SessOpts]) -> - {ok, {{simple_one_for_one, 0, 1}, + {ok, {{simple_one_for_one, 10, 10}, [{session, {emqttd_session, start_link, [SessOpts]}, transient, 10000, worker, [emqttd_session]}]}}. diff --git a/apps/emqttd/src/emqttd_utils.erl b/apps/emqttd/src/emqttd_utils.erl index 112b6171a..d9a76380a 100644 --- a/apps/emqttd/src/emqttd_utils.erl +++ b/apps/emqttd/src/emqttd_utils.erl @@ -29,7 +29,8 @@ -author("Feng Lee "). -export([apply_module_attributes/1, - all_module_attributes/1]). + all_module_attributes/1, + cancel_timer/1]). %% only {F, Args}... apply_module_attributes(Name) -> @@ -78,3 +79,8 @@ ignore_lib_apps(Apps) -> hipe, esockd, mochiweb], [App || App = {Name, _, _} <- Apps, not lists:member(Name, LibApps)]. + +cancel_timer(undefined) -> + undefined; +cancel_timer(Ref) -> + catch erlang:cancel_timer(Ref). diff --git a/doc/rfc6455.pdf b/doc/rfc6455.pdf new file mode 100644 index 000000000..74625160e Binary files /dev/null and b/doc/rfc6455.pdf differ diff --git a/rebar.config b/rebar.config index 91420f44e..acbdaf378 100644 --- a/rebar.config +++ b/rebar.config @@ -24,7 +24,7 @@ "plugins/emqttd_plugin_demo"]}. {deps, [ - {gproc, "0.3.*", {git, "git://github.com/uwiger/gproc.git", {branch, "master"}}}, + {gproc, "0.4.*", {git, "git://github.com/uwiger/gproc.git", {branch, "master"}}}, {lager, ".*", {git, "git://github.com/basho/lager.git", {branch, "master"}}}, {esockd, "2.*", {git, "git://github.com/emqtt/esockd.git", {branch, "master"}}}, {mochiweb, ".*", {git, "git://github.com/slimpp/mochiweb.git", {branch, "master"}}} diff --git a/rel/files/app.config b/rel/files/app.config index 33c2b5868..269dc5078 100644 --- a/rel/files/app.config +++ b/rel/files/app.config @@ -7,9 +7,6 @@ {sasl, [ {sasl_error_logger, {file, "log/emqttd_sasl.log"}} ]}, - {mnesia, [ - {dir, "data/mnesia"} - ]}, {ssl, [ %{versions, ['tlsv1.2', 'tlsv1.1']} ]}, @@ -21,7 +18,7 @@ {handlers, [ {lager_console_backend, info}, {lager_file_backend, [ - {formatter_config, [time, " [",severity,"] ", message, "\n"]}, + {formatter_config, [time, " ", pid, " [",severity,"] ", message, "\n"]}, {file, "log/emqttd_info.log"}, {level, info}, {size, 104857600}, @@ -29,7 +26,7 @@ {count, 30} ]}, {lager_file_backend, [ - {formatter_config, [time, " [",severity,"] ", message, "\n"]}, + {formatter_config, [time, " ", pid, " [",severity,"] ", message, "\n"]}, {file, "log/emqttd_error.log"}, {level, error}, {size, 104857600}, diff --git a/rel/files/vm.args b/rel/files/vm.args index 4ff3744f2..3e9784935 100644 --- a/rel/files/vm.args +++ b/rel/files/vm.args @@ -1,9 +1,20 @@ +##------------------------------------------------------------------------- ## Name of the node +##------------------------------------------------------------------------- -name emqttd@127.0.0.1 ## Cookie for distributed erlang -setcookie emqttdsecretcookie +##------------------------------------------------------------------------- +## Mnesia dir. NOTICE: quote the dir with '" "' +##------------------------------------------------------------------------- +-mnesia dir '"data/mnesia"' + +##------------------------------------------------------------------------- +## Flags +##------------------------------------------------------------------------- + ## Heartbeat management; auto-restarts VM if it dies or becomes unresponsive ## (Disabled by default..use with caution!) ##-heart @@ -16,6 +27,10 @@ ## max process numbers +P 1000000 +##------------------------------------------------------------------------- +## Env +##------------------------------------------------------------------------- + ## Increase number of concurrent ports/sockets -env ERL_MAX_PORTS 4096 diff --git a/scripts/mosquitto_test b/scripts/mosquitto_test deleted file mode 100755 index d5a504134..000000000 --- a/scripts/mosquitto_test +++ /dev/null @@ -1,19 +0,0 @@ -#!/bin/sh -# -*- tab-width:4;indent-tabs-mode:nil -*- -# ex: ts=4 sw=4 et - -# slimple publish -mosquitto_pub -t xxx/yyy -m hello -if [ "$?" == 0 ]; then - echo "[Success]: slimple publish" -else - echo "[Failure]: slimple publish" -fi - -# publish will willmsg -mosquitto_pub -q 1 -t a/b/c -m hahah -u test -P public --will-topic /will --will-payload willmsg --will-qos 1 -if [ "$?" == 0 ]; then - echo "[Success]: publish with willmsg" -else - echo "[Failure]: publish with willmsg" -fi