From 698d096caeb396ed1a89de5328c569125bd44c99 Mon Sep 17 00:00:00 2001 From: Feng Date: Fri, 23 Oct 2015 17:54:31 +0800 Subject: [PATCH 01/11] +zdbbl --- rel/files/vm.args | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rel/files/vm.args b/rel/files/vm.args index ef3307fed..8e37830dc 100644 --- a/rel/files/vm.args +++ b/rel/files/vm.args @@ -28,6 +28,10 @@ ## max atom number ## +t +## Set the distribution buffer busy limit (dist_buf_busy_limit) in kilobytes. +## Valid range is 1-2097151. Default is 1024. +## +zdbbl 8192 + ##------------------------------------------------------------------------- ## Env ##------------------------------------------------------------------------- From 676aa3cb7128d54bacf21f5b4f49bff9d0602df1 Mon Sep 17 00:00:00 2001 From: Feng Date: Fri, 23 Oct 2015 18:05:08 +0800 Subject: [PATCH 02/11] order of sockopts --- rel/files/emqttd.config.development | 4 ++-- rel/files/emqttd.config.production | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rel/files/emqttd.config.development b/rel/files/emqttd.config.development index 4989110f9..fff25aed9 100644 --- a/rel/files/emqttd.config.development +++ b/rel/files/emqttd.config.development @@ -187,11 +187,11 @@ {access, [{allow, all}]}, %% Socket Options {sockopts, [ - {backlog, 512} %Set buffer if hight thoughtput %{recbuf, 4096}, - %{sndbuf, 4096} + %{sndbuf, 4096}, %{buffer, 4096}, + {backlog, 512} ]} ]}, {mqtts, 8883, [ diff --git a/rel/files/emqttd.config.production b/rel/files/emqttd.config.production index 53b1ea3a0..2f70b61f3 100644 --- a/rel/files/emqttd.config.production +++ b/rel/files/emqttd.config.production @@ -179,11 +179,11 @@ {access, [{allow, all}]}, %% Socket Options {sockopts, [ - {backlog, 512} %Set buffer if hight thoughtput %{recbuf, 4096}, - %{sndbuf, 4096} + %{sndbuf, 4096}, %{buffer, 4096}, + {backlog, 512} ]} ]}, {mqtts, 8883, [ From 59ca283eb020a05d10e2422cde4bf798663bfc84 Mon Sep 17 00:00:00 2001 From: Feng Date: Fri, 23 Oct 2015 18:06:35 +0800 Subject: [PATCH 03/11] gen_server --- src/emqttd_ws_client.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index 3d06e8432..4d72ce7bb 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -115,7 +115,7 @@ reset_parser(State = #wsocket_state{packet_opts = PktOpts}) -> State#wsocket_state{parser = emqttd_parser:new(PktOpts)}. %%%============================================================================= -%%% gen_fsm callbacks +%%% gen_server callbacks %%%============================================================================= init([WsPid, Req, ReplyChannel, PktOpts]) -> From 09047287c4230efe7e85db647e188f339c141cb6 Mon Sep 17 00:00:00 2001 From: Feng Date: Sun, 25 Oct 2015 14:56:26 +0800 Subject: [PATCH 04/11] retained test --- test/emqttd_retained_tests.erl | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 test/emqttd_retained_tests.erl diff --git a/test/emqttd_retained_tests.erl b/test/emqttd_retained_tests.erl new file mode 100644 index 000000000..b541a0731 --- /dev/null +++ b/test/emqttd_retained_tests.erl @@ -0,0 +1,14 @@ +-module(emqttd_retained_tests). + +-include("emqttd.hrl"). + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +retain_test() -> + mnesia:start(), + emqttd_retained:mnesia(boot), + mnesia:stop(). + +-endif. From 9a590b2f39f2bd33c97b9a669fe645d20c789d93 Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 26 Oct 2015 09:19:32 +0800 Subject: [PATCH 05/11] rate limit --- rel/files/emqttd.config.development | 2 ++ rel/files/emqttd.config.production | 2 ++ 2 files changed, 4 insertions(+) diff --git a/rel/files/emqttd.config.development b/rel/files/emqttd.config.development index fff25aed9..6ec4754c6 100644 --- a/rel/files/emqttd.config.development +++ b/rel/files/emqttd.config.development @@ -185,6 +185,8 @@ {max_clients, 512}, %% Socket Access Control {access, [{allow, all}]}, + %% Rate Limit. Format is 'burst, rate', Unit is KB/Sec + %% {rate_limit, "100,10"}, %% 100K burst, 10K rate %% Socket Options {sockopts, [ %Set buffer if hight thoughtput diff --git a/rel/files/emqttd.config.production b/rel/files/emqttd.config.production index 2f70b61f3..e3685518f 100644 --- a/rel/files/emqttd.config.production +++ b/rel/files/emqttd.config.production @@ -175,6 +175,8 @@ {acceptors, 16}, %% Maximum number of concurrent clients {max_clients, 8192}, + %% Rate Limit. Format is 'burst, rate', Unit is KB/Sec. + %% {rate_limit, "10,1"}, %% 10K burst, 1K rate %% Socket Access Control {access, [{allow, all}]}, %% Socket Options From fb9f1bf8e4fcbe8d0954008f352f797011d60eec Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 26 Oct 2015 09:20:13 +0800 Subject: [PATCH 06/11] rate_limit --- src/emqttd.erl | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/emqttd.erl b/src/emqttd.erl index eeb5bab4d..09661d6c8 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -91,7 +91,8 @@ open_listener({https, Port, Options}) -> mochiweb:start_http(Port, Options, MFArgs). open_listener(Protocol, Port, Options) -> - MFArgs = {emqttd_client, start_link, [env(mqtt)]}, + Rl = rate_limiter(emqttd_opts:g(rate_limit, Options)), + MFArgs = {emqttd_client, start_link, [[{rate_limiter, Rl} | env(mqtt)]]}, esockd:open(Protocol, Port, merge_sockopts(Options) , MFArgs). merge_sockopts(Options) -> @@ -99,6 +100,14 @@ merge_sockopts(Options) -> proplists:get_value(sockopts, Options, [])), emqttd_opts:merge(Options, [{sockopts, SockOpts}]). +%% TODO: will refactor in 0.14.0 release. +rate_limiter(undefined) -> + undefined; +rate_limiter(Config) -> + Bps = fun(S) -> list_to_integer(string:strip(S)) * 1024 end, + [Burst, Rate] = [Bps(S) || S <- string:tokens(Config, ",")], + esockd_rate_limiter:new(Burst, Rate). + %%------------------------------------------------------------------------------ %% @doc Close Listeners %% @end From 2af91ea1401574f1de099ffe0d7c35f4c5da8b77 Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 26 Oct 2015 09:21:03 +0800 Subject: [PATCH 07/11] link with client --- src/emqttd_session.erl | 44 ++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index d2163d3f4..f33e72521 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -81,9 +81,6 @@ %% Client Pid bind with session client_pid :: pid(), - %% Client Monitor - client_mon :: reference(), - %% Last packet id of the session packet_id = 1, @@ -224,7 +221,8 @@ unsubscribe(SessPid, Topics) -> %%%============================================================================= init([CleanSess, ClientId, ClientPid]) -> - %% process_flag(trap_exit, true), + process_flag(trap_exit, true), + true = link(ClientPid), QEnv = emqttd:env(mqtt, queue), SessEnv = emqttd:env(mqtt, session), Session = #session{ @@ -245,10 +243,8 @@ init([CleanSess, ClientId, ClientPid]) -> collect_interval = emqttd_opts:g(collect_interval, SessEnv, 0), timestamp = os:timestamp()}, emqttd_sm:register_session(CleanSess, ClientId, info(Session)), - %% monitor client - MRef = erlang:monitor(process, ClientPid), %% start statistics - {ok, start_collector(Session#session{client_mon = MRef}), hibernate}. + {ok, start_collector(Session), hibernate}. prioritise_call(Msg, _From, _Len, _State) -> case Msg of _ -> 0 end. @@ -268,7 +264,6 @@ prioritise_cast(Msg, _Len, _State) -> prioritise_info(Msg, _Len, _State) -> case Msg of - {'DOWN', _, _, _, _} -> 10; {'EXIT', _, _} -> 10; session_expired -> 10; {timeout, _, _} -> 5; @@ -368,7 +363,6 @@ handle_cast({resume, ClientId, ClientPid}, Session) -> #session{client_id = ClientId, client_pid = OldClientPid, - client_mon = MRef, inflight_queue = InflightQ, awaiting_ack = AwaitingAck, awaiting_comp = AwaitingComp, @@ -388,10 +382,12 @@ handle_cast({resume, ClientId, ClientPid}, Session) -> true -> lager:error([{client, ClientId}], "Session(~s): ~p kickout ~p", [ClientId, ClientPid, OldClientPid]), - OldClientPid ! {stop, duplicate_id, ClientPid}, - erlang:demonitor(MRef, [flush]) + unlink(OldClientPid), + OldClientPid ! {stop, duplicate_id, ClientPid} end, + true = link(ClientPid), + %% Redeliver PUBREL [ClientPid ! {redeliver, {?PUBREL, PktId}} || PktId <- maps:keys(AwaitingComp)], @@ -402,7 +398,6 @@ handle_cast({resume, ClientId, ClientPid}, Session) -> [cancel_timer(TRef) || TRef <- maps:values(AwaitingComp)], Session1 = Session#session{client_pid = ClientPid, - client_mon = erlang:monitor(process, ClientPid), awaiting_ack = #{}, awaiting_comp = #{}, expired_timer = undefined}, @@ -548,21 +543,24 @@ handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id = emqttd_sm:register_session(CleanSess, ClientId, info(Session)), {noreply, start_collector(Session), hibernate}; -handle_info({'DOWN', _MRef, process, ClientPid, _}, Session = #session{clean_sess = true, - client_pid = ClientPid}) -> +handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true, + client_pid = ClientPid}) -> {stop, normal, Session}; -handle_info({'DOWN', _MRef, process, ClientPid, _}, Session = #session{clean_sess = false, - client_pid = ClientPid, - expired_after = Expires}) -> +handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false, + client_id = ClientId, + client_pid = ClientPid, + expired_after = Expires}) -> + lager:info("Session(~s): unlink with client ~p: reason=~p", + [ClientId, ClientPid, Reason]), TRef = timer(Expires, session_expired), - noreply(Session#session{client_pid = undefined, client_mon = undefined, expired_timer = TRef}); + noreply(Session#session{client_pid = undefined, expired_timer = TRef}); -handle_info({'DOWN', _MRef, process, Pid, Reason}, Session = #session{client_id = ClientId, - client_pid = ClientPid}) -> - lager:error([{client, ClientId}], "Session(~s): unexpected DOWN: " - "client_pid=~p, down_pid=~p, reason=~p", - [ClientId, ClientPid, Pid, Reason]), +handle_info({'EXIT', Pid, Reason}, Session = #session{client_id = ClientId, + client_pid = ClientPid}) -> + + lager:error("Session(~s): Unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p", + [ClientId, ClientPid, Pid, Reason]), noreply(Session); handle_info(session_expired, Session = #session{client_id = ClientId}) -> From f58f42196abdc61edc67e38c7a223873fba51d9f Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 26 Oct 2015 09:21:24 +0800 Subject: [PATCH 08/11] 0.12.4 --- src/emqttd.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd.app.src b/src/emqttd.app.src index c35f7465d..7591c61d5 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -1,7 +1,7 @@ {application, emqttd, [ {id, "emqttd"}, - {vsn, "0.12.3"}, + {vsn, "0.12.4"}, {description, "Erlang MQTT Broker"}, {modules, []}, {registered, []}, From 976c7653f3ed30ff50b0151f6e78a0130c53ae86 Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 26 Oct 2015 09:25:40 +0800 Subject: [PATCH 09/11] use esockd_rate_limiter --- src/emqttd_throttle.erl | 32 -------------------------------- 1 file changed, 32 deletions(-) delete mode 100644 src/emqttd_throttle.erl diff --git a/src/emqttd_throttle.erl b/src/emqttd_throttle.erl deleted file mode 100644 index 256ae27d4..000000000 --- a/src/emqttd_throttle.erl +++ /dev/null @@ -1,32 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd client throttle. -%%% -%%% @end -%%%----------------------------------------------------------------------------- --module(emqttd_throttle). - --author("Feng Lee "). - -%% TODO:... 0.11.0... - From 47710c36aa406b95171d593f98b11fbc83adfa38 Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 26 Oct 2015 09:25:57 +0800 Subject: [PATCH 10/11] port_command --- src/emqttd_client.erl | 126 +++++++++++++++++++++++++----------------- 1 file changed, 75 insertions(+), 51 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index de7c10766..b8909ff32 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% MQTT Client +%%% MQTT Client Connection. %%% %%% @end %%%----------------------------------------------------------------------------- @@ -52,7 +52,7 @@ conn_name, await_recv, conn_state, - conserve, + rate_limiter, parser, proto_state, packet_opts, @@ -85,22 +85,26 @@ unsubscribe(CPid, Topics) -> init([SockArgs = {Transport, Sock, _SockFun}, MqttEnv]) -> % Transform if ssl. - {ok, NewSock} = esockd_connection:accept(SockArgs), + {ok, NewSock} = esockd_connection:accept(SockArgs), + %%TODO:... + {ok, BufSizes} = inet:getopts(Sock, [sndbuf, recbuf, buffer]), + io:format("~p~n", [BufSizes]), {ok, Peername} = emqttd_net:peername(Sock), - {ok, ConnStr} = emqttd_net:connection_string(Sock, inbound), - SendFun = fun(Data) -> Transport:send(NewSock, Data) end, - PktOpts = proplists:get_value(packet, MqttEnv), + {ok, ConnStr} = emqttd_net:connection_string(Sock, inbound), + SendFun = send_fun(Transport, NewSock), + PktOpts = proplists:get_value(packet, MqttEnv), ProtoState = emqttd_protocol:init(Peername, SendFun, PktOpts), - State = control_throttle(#state{transport = Transport, - socket = NewSock, - peername = Peername, - conn_name = ConnStr, - await_recv = false, - conn_state = running, - conserve = false, - packet_opts = PktOpts, - parser = emqttd_parser:new(PktOpts), - proto_state = ProtoState}), + Limiter = proplists:get_value(rate_limiter, MqttEnv), + State = run_socket(#state{transport = Transport, + socket = NewSock, + peername = Peername, + conn_name = ConnStr, + await_recv = false, + conn_state = running, + rate_limiter = Limiter, + packet_opts = PktOpts, + parser = emqttd_parser:new(PktOpts), + proto_state = ProtoState}), ClientOpts = proplists:get_value(client, MqttEnv), IdleTimout = proplists:get_value(idle_timeout, ClientOpts, 10), gen_server:enter_loop(?MODULE, [], State, timer:seconds(IdleTimout)). @@ -146,20 +150,26 @@ handle_info({redeliver, {?PUBREL, PacketId}}, State = #state{proto_state = Prot {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), noreply(State#state{proto_state = ProtoState1}); -handle_info({inet_reply, _Ref, ok}, State) -> - noreply(State); +handle_info(activate_sock, State) -> + noreply(run_socket(State#state{conn_state = running})); handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peername = Peername, socket = Sock}) -> + Size = size(Data), lager:debug("RECV from ~s: ~p", [emqttd_net:format(Peername), Data]), - emqttd_metrics:inc('bytes/received', size(Data)), - received(Data, control_throttle(State #state{await_recv = false})); + emqttd_metrics:inc('bytes/received', Size), + received(Data, rate_limit(Size, State#state{await_recv = false})); handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> + %%TODO: ... network_error(Reason, State); +handle_info({inet_reply, _Ref, ok}, State) -> + %%TODO: ok... + io:format("inet_reply ok~n"), + noreply(State); + handle_info({inet_reply, _Sock, {error, Reason}}, State) -> - ?ERROR("Unexpected inet_reply - ~p", [Reason], State), - {noreply, State}; + network_error(Reason, State); handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket}) -> ?DEBUG("Start KeepAlive with ~p seconds", [TimeoutSec], State), @@ -174,14 +184,14 @@ handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> case emqttd_keepalive:check(KeepAlive) of - {ok, KeepAlive1} -> - noreply(State#state{keepalive = KeepAlive1}); - {error, timeout} -> - ?DEBUG("Keepalive Timeout!", [], State), - stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined}); - {error, Error} -> - ?DEBUG("Keepalive Error - ~p", [Error], State), - stop({shutdown, keepalive_error}, State#state{keepalive = undefined}) + {ok, KeepAlive1} -> + noreply(State#state{keepalive = KeepAlive1}); + {error, timeout} -> + ?DEBUG("Keepalive Timeout!", [], State), + stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined}); + {error, Error} -> + ?DEBUG("Keepalive Error - ~p", [Error], State), + stop({shutdown, keepalive_error}, State#state{keepalive = undefined}) end; handle_info(Info, State) -> @@ -223,27 +233,27 @@ with_session(Fun, State = #state{proto_state = ProtoState}) -> %% receive and parse tcp data received(<<>>, State) -> - {noreply, State, hibernate}; + noreply(State); -received(Bytes, State = #state{packet_opts = PacketOpts, - parser = Parser, +received(Bytes, State = #state{parser = Parser, + packet_opts = PacketOpts, proto_state = ProtoState}) -> case catch Parser(Bytes) of {more, NewParser} -> - noreply(control_throttle(State#state{parser = NewParser})); + noreply(run_socket(State#state{parser = NewParser})); {ok, Packet, Rest} -> emqttd_metrics:received(Packet), case emqttd_protocol:received(Packet, ProtoState) of - {ok, ProtoState1} -> - received(Rest, State#state{parser = emqttd_parser:new(PacketOpts), - proto_state = ProtoState1}); - {error, Error} -> - ?ERROR("Protocol error - ~p", [Error], State), - stop({shutdown, Error}, State); - {error, Error, ProtoState1} -> - stop({shutdown, Error}, State#state{proto_state = ProtoState1}); - {stop, Reason, ProtoState1} -> - stop(Reason, State#state{proto_state = ProtoState1}) + {ok, ProtoState1} -> + received(Rest, State#state{parser = emqttd_parser:new(PacketOpts), + proto_state = ProtoState1}); + {error, Error} -> + ?ERROR("Protocol error - ~p", [Error], State), + stop({shutdown, Error}, State); + {error, Error, ProtoState1} -> + stop({shutdown, Error}, State#state{proto_state = ProtoState1}); + {stop, Reason, ProtoState1} -> + stop(Reason, State#state{proto_state = ProtoState1}) end; {error, Error} -> ?ERROR("Framing error - ~p", [Error], State), @@ -258,6 +268,20 @@ network_error(Reason, State = #state{peername = Peername}) -> [emqttd_net:format(Peername), Reason]), stop({shutdown, conn_closed}, State). +rate_limit(_Size, State = #state{rate_limiter = undefined}) -> + run_socket(State); +rate_limit(Size, State = #state{socket = Sock, rate_limiter = Limiter}) -> + {ok, BufSizes} = inet:getopts(Sock, [sndbuf, recbuf, buffer]), + io:format("~p~n", [BufSizes]), + case esockd_rate_limiter:check(Limiter, Size) of + {0, Limiter1} -> + run_socket(State#state{conn_state = running, rate_limiter = Limiter1}); + {Pause, Limiter1} -> + ?ERROR("~p Received, Rate Limiter Pause for ~w", [Size, Pause], State), + erlang:send_after(Pause, self(), activate_sock), + State#state{conn_state = blocked, rate_limiter = Limiter1} + end. + run_socket(State = #state{conn_state = blocked}) -> State; run_socket(State = #state{await_recv = true}) -> @@ -266,11 +290,11 @@ run_socket(State = #state{transport = Transport, socket = Sock}) -> Transport:async_recv(Sock, 0, infinity), State#state{await_recv = true}. -control_throttle(State = #state{conn_state = Flow, - conserve = Conserve}) -> - case {Flow, Conserve} of - {running, true} -> State #state{conn_state = blocked}; - {blocked, false} -> run_socket(State #state{conn_state = running}); - {_, _} -> run_socket(State) +send_fun(Transport, Sock) -> + fun(Data) -> + try Transport:port_command(Sock, Data) of + true -> ok + catch + error:Error -> exit({socket_error, Error}) + end end. - From 565c8abb3a9f85c235adf96ab9907e4c3358ec34 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 26 Oct 2015 16:30:39 +0800 Subject: [PATCH 11/11] improve suback --- src/emqttd_client.erl | 17 +++++++++++------ src/emqttd_protocol.erl | 5 +---- src/emqttd_session.erl | 12 ++++++++---- src/emqttd_ws_client.erl | 5 +++++ 4 files changed, 25 insertions(+), 14 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index b8909ff32..247fe7500 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -135,12 +135,11 @@ handle_cast(Msg, State) -> handle_info(timeout, State) -> stop({shutdown, timeout}, State); - -handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState, - conn_name = ConnName}) -> - lager:warning("Shutdown for duplicate clientid: ~s, conn:~s", - [emqttd_protocol:clientid(ProtoState), ConnName]), - stop({shutdown, duplicate_id}, State); + +%% Asynchronous SUBACK +handle_info({suback, PacketId, GrantedQos}, State = #state{proto_state = ProtoState}) -> + {ok, ProtoState1} = emqttd_protocol:send(?SUBACK_PACKET(PacketId, GrantedQos), ProtoState), + noreply(State#state{proto_state = ProtoState1}); handle_info({deliver, Message}, State = #state{proto_state = ProtoState}) -> {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState), @@ -150,6 +149,12 @@ handle_info({redeliver, {?PUBREL, PacketId}}, State = #state{proto_state = Prot {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), noreply(State#state{proto_state = ProtoState1}); +handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState, + conn_name = ConnName}) -> + lager:warning("Shutdown for duplicate clientid: ~s, conn:~s", + [emqttd_protocol:clientid(ProtoState), ConnName]), + stop({shutdown, duplicate_id}, State); + handle_info(activate_sock, State) -> noreply(run_socket(State#state{conn_state = running})); diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index ce85ddab3..b4371f525 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -240,10 +240,7 @@ process(?SUBSCRIBE_PACKET(PacketId, TopicTable), lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]), send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State); false -> - AckFun = fun(GrantedQos) -> - send(?SUBACK_PACKET(PacketId, GrantedQos), State) - end, - emqttd_session:subscribe(Session, TopicTable, AckFun), {ok, State} + emqttd_session:subscribe(Session, PacketId, TopicTable), {ok, State} end; %% protect from empty topic list diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index f33e72521..12583fb5d 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -167,8 +167,12 @@ destroy(SessPid, ClientId) -> subscribe(SessPid, TopicTable) -> subscribe(SessPid, TopicTable, fun(_) -> ok end). --spec subscribe(pid(), [{binary(), mqtt_qos()}], AckFun :: fun()) -> ok. -subscribe(SessPid, TopicTable, AckFun) -> +-spec subscribe(pid(), mqtt_packet_id(), [{binary(), mqtt_qos()}]) -> ok. +subscribe(SessPid, PacketId, TopicTable) -> + From = self(), + AckFun = fun(GrantedQos) -> + From ! {suback, PacketId, GrantedQos} + end, gen_server2:cast(SessPid, {subscribe, TopicTable, AckFun}). %%------------------------------------------------------------------------------ @@ -298,13 +302,13 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{ case TopicTable -- Subscriptions of [] -> - catch AckFun([Qos || {_, Qos} <- TopicTable]), + AckFun([Qos || {_, Qos} <- TopicTable]), noreply(Session); _ -> %% subscribe first and don't care if the subscriptions have been existed {ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable), - catch AckFun(GrantedQos), + AckFun(GrantedQos), emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]), diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index 4d72ce7bb..b4f82c043 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -164,6 +164,11 @@ handle_cast({received, Packet}, State = #client_state{proto_state = ProtoState}) handle_cast(_Msg, State) -> {noreply, State}. +%% Asynchronous SUBACK +handle_info({suback, PacketId, GrantedQos}, State = #client_state{proto_state = ProtoState}) -> + {ok, ProtoState1} = emqttd_protocol:send(?SUBACK_PACKET(PacketId, GrantedQos), ProtoState), + noreply(State#client_state{proto_state = ProtoState1}); + handle_info({deliver, Message}, State = #client_state{proto_state = ProtoState}) -> {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState), noreply(State#client_state{proto_state = ProtoState1});