From c85617c0800fb8975a1ef6f7f90ed27b13d90d8a Mon Sep 17 00:00:00 2001 From: Feng Date: Wed, 8 Jul 2015 18:17:48 +0800 Subject: [PATCH] distributed session --- include/emqttd.hrl | 68 +++++----- src/emqttd.erl | 1 - src/emqttd_access_control.erl | 4 +- src/emqttd_access_rule.erl | 4 +- src/emqttd_app.erl | 1 + src/emqttd_auth_clientid.erl | 4 +- src/emqttd_bridge.erl | 29 ++++- src/emqttd_bridge_sup.erl | 1 - src/emqttd_broker.erl | 20 ++- src/emqttd_client.erl | 47 +++---- src/emqttd_cm.erl | 151 ++++++++++++++++++++++ src/emqttd_cm_sup.erl | 59 +++++++++ src/emqttd_guid.erl | 4 +- src/emqttd_http.erl | 4 +- src/emqttd_message.erl | 15 ++- src/emqttd_metrics.erl | 2 - src/emqttd_mnesia.erl | 1 + src/emqttd_mod_presence.erl | 8 +- src/emqttd_mod_rewrite.erl | 4 +- src/emqttd_mod_sup.erl | 1 + src/emqttd_packet.erl | 1 + src/emqttd_parser.erl | 12 +- src/emqttd_pooler_sup.erl | 1 + src/emqttd_protocol.erl | 145 ++++++++++++--------- src/emqttd_pubsub.erl | 47 +++---- src/emqttd_pubsub_sup.erl | 1 + src/emqttd_retained.erl | 7 +- src/emqttd_serialiser.erl | 1 + src/emqttd_sm.erl | 234 +++++++++++++++++++++++----------- src/emqttd_sm_sup.erl | 9 +- src/emqttd_throttle.erl | 2 +- src/emqttd_topic.erl | 1 + src/emqttd_trace.erl | 1 - src/emqttd_vm.erl | 5 + src/emqttd_ws_client.erl | 18 +-- 35 files changed, 637 insertions(+), 276 deletions(-) create mode 100644 src/emqttd_cm.erl create mode 100644 src/emqttd_cm_sup.erl diff --git a/include/emqttd.hrl b/include/emqttd.hrl index 2ea71811c..d142fdd7b 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -1,24 +1,24 @@ -%%------------------------------------------------------------------------------ -%% Copyright (c) 2012-2015, Feng Lee -%% -%% Permission is hereby granted, free of charge, to any person obtaining a copy -%% of this software and associated documentation files (the "Software"), to deal -%% in the Software without restriction, including without limitation the rights -%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%% copies of the Software, and to permit persons to whom the Software is -%% furnished to do so, subject to the following conditions: -%% -%% The above copyright notice and this permission notice shall be included in all -%% copies or substantial portions of the Software. -%% -%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%% SOFTWARE. -%%------------------------------------------------------------------------------ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- %%% @doc %%% MQTT Broker Header. %%% @@ -73,7 +73,7 @@ %%------------------------------------------------------------------------------ -record(mqtt_queue, { name :: binary(), - subpid :: pid(), + qpid :: pid(), qos = 0 :: 0 | 1 | 2 }). @@ -83,12 +83,15 @@ %% MQTT Client %%------------------------------------------------------------------------------ -record(mqtt_client, { - client_id :: binary() | undefined, - username :: binary() | undefined, - ipaddress :: inet:ip_address(), - clean_sess :: boolean(), - client_pid :: pid(), - proto_ver :: 3 | 4 + client_id :: binary() | undefined, + client_pid :: pid(), + username :: binary() | undefined, + peername :: {inet:ip_address(), integer()}, + clean_sess :: boolean(), + proto_ver :: 3 | 4, + keepalive = 0, + will_topic :: undefined | binary(), + connected_at :: erlang:timestamp() }). -type mqtt_client() :: #mqtt_client{}. @@ -98,8 +101,9 @@ %%------------------------------------------------------------------------------ -record(mqtt_session, { client_id, - session_pid, - subscriptions = [] + sess_pid, + persistent, + on_node }). -type mqtt_session() :: #mqtt_session{}. @@ -111,8 +115,8 @@ -type mqtt_pktid() :: 1..16#ffff | undefined. -record(mqtt_message, { - msgid :: mqtt_msgid(), %% Unique Message ID - pktid :: 1..16#ffff, %% PacketId + msgid :: mqtt_msgid(), %% Global unique message ID + pktid :: mqtt_pktid(), %% PacketId topic :: binary(), %% Topic that the message is published to from :: binary() | atom(), %% ClientId of publisher qos = 0 :: 0 | 1 | 2, %% Message QoS diff --git a/src/emqttd.erl b/src/emqttd.erl index ce5f1faa5..a4387cecc 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -230,4 +230,3 @@ is_running(Node) -> Pid when is_pid(Pid) -> true end. - diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index fa01a5ccb..d53d25f19 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -102,8 +102,8 @@ check_acl(#mqtt_client{client_id = ClientId}, PubSub, Topic, []) -> allow; check_acl(Client, PubSub, Topic, [{M, State}|AclMods]) -> case M:check_acl({Client, PubSub, Topic}, State) of - allow -> allow; - deny -> deny; + allow -> allow; + deny -> deny; ignore -> check_acl(Client, PubSub, Topic, AclMods) end. diff --git a/src/emqttd_access_rule.erl b/src/emqttd_access_rule.erl index 6d7e895f7..40158285d 100644 --- a/src/emqttd_access_rule.erl +++ b/src/emqttd_access_rule.erl @@ -114,9 +114,9 @@ match_who(#mqtt_client{client_id = ClientId}, {client, ClientId}) -> true; match_who(#mqtt_client{username = Username}, {user, Username}) -> true; -match_who(#mqtt_client{ipaddress = undefined}, {ipaddr, _Tup}) -> +match_who(#mqtt_client{peername = undefined}, {ipaddr, _Tup}) -> false; -match_who(#mqtt_client{ipaddress = IP}, {ipaddr, {_CDIR, Start, End}}) -> +match_who(#mqtt_client{peername = {IP, _}}, {ipaddr, {_CDIR, Start, End}}) -> I = esockd_access:atoi(IP), I >= Start andalso I =< End; match_who(_Client, _Who) -> diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index c1ffd10e6..f9434b385 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -70,6 +70,7 @@ print_vsn() -> start_servers(Sup) -> Servers = [{"emqttd trace", emqttd_trace}, {"emqttd pooler", {supervisor, emqttd_pooler_sup}}, + {"emqttd client manager", {supervisor, emqttd_cm_sup}}, {"emqttd session manager", {supervisor, emqttd_sm_sup}}, {"emqttd session supervisor", {supervisor, emqttd_session_sup}}, {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}}, diff --git a/src/emqttd_auth_clientid.erl b/src/emqttd_auth_clientid.erl index 2b0ef60c4..53cc5496f 100644 --- a/src/emqttd_auth_clientid.erl +++ b/src/emqttd_auth_clientid.erl @@ -101,9 +101,9 @@ init(Opts) -> check(#mqtt_client{client_id = undefined}, _Password, []) -> {error, "ClientId undefined"}; -check(#mqtt_client{client_id = ClientId, ipaddress = IpAddress}, _Password, []) -> +check(#mqtt_client{client_id = ClientId, peername = {IpAddress, _}}, _Password, []) -> check_clientid_only(ClientId, IpAddress); -check(#mqtt_client{client_id = ClientId, ipaddress = IpAddress}, _Password, [{password, no}|_]) -> +check(#mqtt_client{client_id = ClientId, peername = {IpAddress, _}}, _Password, [{password, no}|_]) -> check_clientid_only(ClientId, IpAddress); check(_Client, undefined, [{password, yes}|_]) -> {error, "Password undefined"}; diff --git a/src/emqttd_bridge.erl b/src/emqttd_bridge.erl index 38183a573..d59f187cd 100644 --- a/src/emqttd_bridge.erl +++ b/src/emqttd_bridge.erl @@ -46,7 +46,8 @@ -record(state, {node, subtopic, qos, topic_suffix = <<>>, - topic_prefix = <<>>, + topic_prefix = <<>>, + mqueue = emqttd_mqueue:mqueue(), max_queue_len = 0, ping_down_interval = ?PING_DOWN_INTERVAL, status = up}). @@ -81,8 +82,11 @@ init([Node, SubTopic, Options]) -> true -> true = erlang:monitor_node(Node, true), State = parse_opts(Options, #state{node = Node, subtopic = SubTopic}), + MQueue = emqttd_mqueue:new(qname(Node, SubTopic), + [{max_len, State#state.max_queue_len}], + emqttd_alarm:alarm_fun()), emqttd_pubsub:subscribe({SubTopic, State#state.qos}), - {ok, State}; + {ok, State#state{mqueue = MQueue}}; false -> {stop, {cannot_connect, Node}} end. @@ -100,15 +104,19 @@ parse_opts([{max_queue_len, Len} | Opts], State) -> parse_opts([{ping_down_interval, Interval} | Opts], State) -> parse_opts(Opts, State#state{ping_down_interval = Interval*1000}). +qname(Node, SubTopic) when is_atom(Node) -> + qname(atom_to_list(Node), SubTopic); +qname(Node, SubTopic) -> + list_to_binary(["Bridge:", Node, ":", SubTopic]). + handle_call(_Request, _From, State) -> {reply, error, State}. handle_cast(_Msg, State) -> {noreply, State}. -handle_info({dispatch, Msg}, State = #state{node = Node, status = down}) -> - lager:error("Bridge Dropped Msg for ~p Down: ~s", [Node, emqttd_message:format(Msg)]), - {noreply, State}; +handle_info({dispatch, Msg}, State = #state{mqueue = MQ, status = down}) -> + {noreply, State#state{mqueue = emqttd_mqueue:in(Msg, MQ)}}; handle_info({dispatch, Msg}, State = #state{node = Node, status = up}) -> rpc:cast(Node, emqttd_pubsub, publish, [transform(Msg, State)]), @@ -124,7 +132,7 @@ handle_info({nodeup, Node}, State = #state{node = Node}) -> case emqttd:is_running(Node) of true -> lager:warning("Bridge Node Up: ~p", [Node]), - {noreply, State#state{status = up}}; + {noreply, dequeue(State#state{status = up})}; false -> self() ! {nodedown, Node}, {noreply, State#state{status = down}} @@ -159,6 +167,15 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= +dequeue(State = #state{mqueue = MQ}) -> + case emqttd_mqueue:out(MQ) of + {empty, MQ1} -> + State#state{mqueue = MQ1}; + {{value, Msg}, MQ1} -> + handle_info({dispatch, Msg}, State), + dequeue(State#state{mqueue = MQ1}) + end. + transform(Msg = #mqtt_message{topic = Topic}, #state{topic_prefix = Prefix, topic_suffix = Suffix}) -> Msg#mqtt_message{topic = <>}. diff --git a/src/emqttd_bridge_sup.erl b/src/emqttd_bridge_sup.erl index 93787937d..d7ffc0f80 100644 --- a/src/emqttd_bridge_sup.erl +++ b/src/emqttd_bridge_sup.erl @@ -48,7 +48,6 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). -%%TODO: bridges... -spec bridges() -> [{tuple(), pid()}]. bridges() -> [{{Node, SubTopic}, Pid} || {{bridge, Node, SubTopic}, Pid, worker, _} diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index 730e70f16..9b002711e 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -58,7 +58,7 @@ -define(BROKER_TAB, mqtt_broker). --record(state, {started_at, sys_interval, tick_tref}). +-record(state, {started_at, sys_interval, heartbeat, tick_tref}). %% $SYS Topics of Broker -define(SYSTOP_BROKERS, [ @@ -224,7 +224,9 @@ init([]) -> emqttd_pubsub:create(<<"$SYS/brokers">>), [ok = create_topic(Topic) || Topic <- ?SYSTOP_BROKERS], % Tick - {ok, #state{started_at = os:timestamp(), tick_tref = start_tick(tick)}, hibernate}. + {ok, #state{started_at = os:timestamp(), + heartbeat = start_tick(1000, heartbeat), + tick_tref = start_tick(tick)}, hibernate}. handle_call(uptime, _From, State) -> {reply, uptime(State), State}; @@ -260,18 +262,22 @@ handle_call(_Request, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info(tick, State) -> - retain(brokers), - retain(version, list_to_binary(version())), - retain(sysdescr, list_to_binary(sysdescr())), +handle_info(heartbeat, State) -> publish(uptime, list_to_binary(uptime(State))), publish(datetime, list_to_binary(datetime())), {noreply, State, hibernate}; +handle_info(tick, State) -> + retain(brokers), + retain(version, list_to_binary(version())), + retain(sysdescr, list_to_binary(sysdescr())), + {noreply, State, hibernate}; + handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, #state{tick_tref = TRef}) -> +terminate(_Reason, #state{heartbeat = Hb, tick_tref = TRef}) -> + stop_tick(Hb), stop_tick(TRef). code_change(_OldVsn, State, _Extra) -> diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index ba5203a8f..b55326061 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -41,7 +41,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). -%%Client State... +%% Client State... -record(state, {transport, socket, peername, @@ -49,7 +49,7 @@ await_recv, conn_state, conserve, - parse_state, + parser, proto_state, packet_opts, keepalive}). @@ -57,18 +57,16 @@ start_link(SockArgs, PktOpts) -> {ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, PktOpts]])}. -%%TODO: rename? info(Pid) -> - gen_server:call(Pid, info). + gen_server:call(Pid, info, infinity). init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) -> - %transform if ssl. + % Transform if ssl. {ok, NewSock} = esockd_connection:accept(SockArgs), {ok, Peername} = emqttd_net:peername(Sock), {ok, ConnStr} = emqttd_net:connection_string(Sock, inbound), lager:info("Connect from ~s", [ConnStr]), SendFun = fun(Data) -> Transport:send(NewSock, Data) end, - ParserState = emqttd_parser:init(PacketOpts), ProtoState = emqttd_protocol:init(Peername, SendFun, PacketOpts), State = control_throttle(#state{transport = Transport, socket = NewSock, @@ -78,17 +76,16 @@ init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) -> conn_state = running, conserve = false, packet_opts = PacketOpts, - parse_state = ParserState, + parser = emqttd_parser:new(PacketOpts), proto_state = ProtoState}), gen_server:enter_loop(?MODULE, [], State, 10000). -%%TODO: Not enough... -handle_call(info, _From, State = #state{conn_name=ConnName, +handle_call(info, _From, State = #state{conn_name = ConnName, proto_state = ProtoState}) -> {reply, [{conn_name, ConnName} | emqttd_protocol:info(ProtoState)], State}; handle_call(Req, _From, State = #state{peername = Peername}) -> - lager:critical("Client ~s: unexpected request - ~p",[emqttd_net:format(Peername), Req]), + lager:critical("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]), {reply, {error, unsupported_request}, State}. handle_cast(Msg, State = #state{peername = Peername}) -> @@ -100,8 +97,6 @@ handle_info(timeout, State) -> handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState, conn_name=ConnName}) -> - %% need transfer data??? - %% emqttd_client:transfer(NewPid, Data), lager:error("Shutdown for duplicate clientid: ~s, conn:~s", [emqttd_protocol:clientid(ProtoState), ConnName]), stop({shutdown, duplicate_id}, State); @@ -124,8 +119,7 @@ handle_info({inet_reply, _Ref, ok}, State) -> handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peername = Peername, socket = Sock}) -> lager:debug("RECV from ~s: ~p", [emqttd_net:format(Peername), Data]), emqttd_metrics:inc('bytes/received', size(Data)), - process_received_bytes(Data, - control_throttle(State #state{await_recv = false})); + received(Data, control_throttle(State #state{await_recv = false})); handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> network_error(Reason, State); @@ -170,24 +164,22 @@ code_change(_OldVsn, State, _Extra) -> %------------------------------------------------------- % receive and parse tcp data %------------------------------------------------------- -process_received_bytes(<<>>, State) -> +received(<<>>, State) -> {noreply, State, hibernate}; -process_received_bytes(Bytes, State = #state{packet_opts = PacketOpts, - parse_state = ParseState, - proto_state = ProtoState, - conn_name = ConnStr}) -> - case emqttd_parser:parse(Bytes, ParseState) of - {more, ParseState1} -> - {noreply, - control_throttle(State #state{parse_state = ParseState1}), - hibernate}; +received(Bytes, State = #state{packet_opts = PacketOpts, + parser = Parser, + proto_state = ProtoState, + conn_name = ConnStr}) -> + case Parser(Bytes) of + {more, NewParser} -> + {noreply, control_throttle(State #state{parser = NewParser}), hibernate}; {ok, Packet, Rest} -> received_stats(Packet), case emqttd_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> - process_received_bytes(Rest, State#state{parse_state = emqttd_parser:init(PacketOpts), - proto_state = ProtoState1}); + received(Rest, State#state{parser = emqttd_parser:new(PacketOpts), + proto_state = ProtoState1}); {error, Error} -> lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]), stop({shutdown, Error}, State); @@ -201,7 +193,6 @@ process_received_bytes(Bytes, State = #state{packet_opts = PacketOpts, stop({shutdown, Error}, State) end. -%%---------------------------------------------------------------------------- network_error(Reason, State = #state{peername = Peername}) -> lager:warning("Client ~s: MQTT detected network error '~p'", [emqttd_net:format(Peername), Reason]), @@ -244,4 +235,4 @@ inc(?DISCONNECT) -> emqttd_metrics:inc('packets/disconnect'); inc(_) -> ignore. - + diff --git a/src/emqttd_cm.erl b/src/emqttd_cm.erl new file mode 100644 index 000000000..55359ec6b --- /dev/null +++ b/src/emqttd_cm.erl @@ -0,0 +1,151 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% MQTT Client Manager +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_cm). + +-author("Feng Lee "). + +-include("emqttd.hrl"). + +-behaviour(gen_server). + +-define(SERVER, ?MODULE). + +%% API Exports +-export([start_link/2, pool/0]). + +-export([lookup/1, register/1, unregister/1]). + +%% gen_server Function Exports +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {id, statsfun}). + +-define(CM_POOL, cm_pool). + +%%%============================================================================= +%%% API +%%%============================================================================= + +%%------------------------------------------------------------------------------ +%% @doc Start client manager +%% @end +%%------------------------------------------------------------------------------ +-spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when + Id :: pos_integer(), + StatsFun :: fun(). +start_link(Id, StatsFun) -> + gen_server:start_link(?MODULE, [Id, StatsFun], []). + +pool() -> ?CM_POOL. + +%%------------------------------------------------------------------------------ +%% @doc Lookup client pid with clientId +%% @end +%%------------------------------------------------------------------------------ +-spec lookup(ClientId :: binary()) -> mqtt_client() | undefined. +lookup(ClientId) when is_binary(ClientId) -> + case ets:lookup(mqtt_client, ClientId) of + [Client] -> Client; + [] -> undefined + end. + +%%------------------------------------------------------------------------------ +%% @doc Register clientId with pid. +%% @end +%%------------------------------------------------------------------------------ +-spec register(Client :: mqtt_client()) -> ok. +register(Client = #mqtt_client{client_id = ClientId}) -> + CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), + gen_server:cast(CmPid, {register, Client}). + +%%------------------------------------------------------------------------------ +%% @doc Unregister clientId with pid. +%% @end +%%------------------------------------------------------------------------------ +-spec unregister(ClientId :: binary()) -> ok. +unregister(ClientId) when is_binary(ClientId) -> + CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), + gen_server:cast(CmPid, {unregister, ClientId, self()}). + +%%%============================================================================= +%%% gen_server callbacks +%%%============================================================================= + +init([Id, StatsFun]) -> + gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}), + {ok, #state{id = Id, statsfun = StatsFun}}. + +handle_call(Req, _From, State) -> + lager:error("unexpected request: ~p", [Req]), + {reply, {error, badreq}, State}. + +handle_cast({register, Client = #mqtt_client{client_id = ClientId, client_pid = Pid}}, State) -> + lager:info("CM register ~s with ~p", [ClientId, Pid]), + case ets:lookup(mqtt_client, ClientId) of + [#mqtt_client{client_pid = Pid}] -> + lager:error("ClientId '~s' has been registered with ~p", [ClientId, Pid]), + ignore; + [#mqtt_client{client_pid = OldPid}] -> + lager:error("ClientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]); + [] -> + ok + end, + ets:insert(mqtt_client, Client), + {noreply, setstats(State)}; + +handle_cast({unregister, ClientId, Pid}, State) -> + lager:info("CM unregister ~s with ~p", [ClientId, Pid]), + case ets:lookup(mqtt_client, ClientId) of + [#mqtt_client{client_pid = Pid}] -> + ets:delete(mqtt_client, ClientId); + [_] -> + ignore; + [] -> + lager:error("cannot find clientId '~s' with ~p", [ClientId, Pid]) + end, + {noreply, setstats(State)}; + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, #state{id = Id}) -> + gproc_pool:disconnect_worker(?CM_POOL, {?MODULE, Id}), ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + +setstats(State = #state{statsfun = StatsFun}) -> + StatsFun(ets:info(mqtt_client, size)), State. + diff --git a/src/emqttd_cm_sup.erl b/src/emqttd_cm_sup.erl new file mode 100644 index 000000000..aa7af9d8a --- /dev/null +++ b/src/emqttd_cm_sup.erl @@ -0,0 +1,59 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd client manager supervisor. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_cm_sup). + +-author("Feng Lee "). + +-include("emqttd.hrl"). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + ets:new(mqtt_client, [ordered_set, named_table, public, + {keypos, 2}, {write_concurrency, true}]), + Schedulers = erlang:system_info(schedulers), + gproc_pool:new(emqttd_cm:pool(), hash, [{size, Schedulers}]), + StatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'), + Children = lists:map( + fun(I) -> + Name = {emqttd_cm, I}, + gproc_pool:add_worker(emqttd_cm:pool(), Name, I), + {Name, {emqttd_cm, start_link, [I, StatsFun]}, + permanent, 10000, worker, [emqttd_cm]} + end, lists:seq(1, Schedulers)), + {ok, {{one_for_all, 10, 100}, Children}}. + + diff --git a/src/emqttd_guid.erl b/src/emqttd_guid.erl index 07d3dbcd4..d9efd3620 100644 --- a/src/emqttd_guid.erl +++ b/src/emqttd_guid.erl @@ -31,14 +31,14 @@ %%% 1. Timestamp: erlang:system_time if Erlang >= R18, otherwise os:timestamp %%% 2. NodeId: encode node() to 2 bytes integer %%% 3. Pid: encode pid to 4 bytes integer -%%% 4. Sequence: 2 bytes sequence per pid +%%% 4. Sequence: 2 bytes sequence in one process %%% %%% @end %%%----------------------------------------------------------------------------- -module(emqttd_guid). --export([gen/0]). +-export([gen/0, new/0]). -define(MAX_SEQ, 16#FFFF). diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 3e66670e6..6258450da 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_http). -author("Feng Lee "). @@ -46,13 +47,14 @@ handle_request('POST', "/mqtt/publish", Req) -> lager:info("HTTP Publish: ~p", [Params]), case authorized(Req) of true -> + ClientId = get_value("client", Params, http), Qos = int(get_value("qos", Params, "0")), Retain = bool(get_value("retain", Params, "0")), Topic = list_to_binary(get_value("topic", Params)), Payload = list_to_binary(get_value("message", Params)), case {validate(qos, Qos), validate(topic, Topic)} of {true, true} -> - Msg = emqttd_message:make(http, Qos, Topic, Payload), + Msg = emqttd_message:make(ClientId, Qos, Topic, Payload), emqttd_pubsub:publish(Msg#mqtt_message{retain = Retain}), Req:ok({"text/plan", <<"ok\n">>}); {false, _} -> diff --git a/src/emqttd_message.erl b/src/emqttd_message.erl index bcd094928..00608ea69 100644 --- a/src/emqttd_message.erl +++ b/src/emqttd_message.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_message). -author("Feng Lee "). @@ -150,7 +151,6 @@ set_flag(retain, Msg = #mqtt_message{retain = false}) -> Msg#mqtt_message{retain = true}; set_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg. - %%------------------------------------------------------------------------------ %% @doc Unset dup, retain flag %% @end @@ -170,7 +170,14 @@ unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg. %% @doc Format MQTT Message %% @end %%------------------------------------------------------------------------------ -format(#mqtt_message{msgid=MsgId, pktid = PktId, from = From, qos=Qos, retain=Retain, dup=Dup, topic=Topic}) -> - io_lib:format("Message(MsgId=~p, PktId=~p, from=~s, Qos=~p, Retain=~s, Dup=~s, Topic=~s)", - [MsgId, PktId, From, Qos, Retain, Dup, Topic]). +format(#mqtt_message{msgid=MsgId, + pktid = PktId, + from = From, + qos=Qos, + retain=Retain, + dup=Dup, + topic=Topic}) -> + io_lib:format("Message(MsgId=~p, PktId=~p, from=~s, " + "Qos=~p, Retain=~s, Dup=~s, Topic=~s)", + [MsgId, PktId, From, Qos, Retain, Dup, Topic]). diff --git a/src/emqttd_metrics.erl b/src/emqttd_metrics.erl index bb96f5820..b6752df29 100644 --- a/src/emqttd_metrics.erl +++ b/src/emqttd_metrics.erl @@ -78,7 +78,6 @@ {counter, 'messages/received'}, % Messages received {counter, 'messages/sent'}, % Messages sent {gauge, 'messages/retained'}, % Messagea retained - {gauge, 'messages/stored/count'}, % Messages stored {counter, 'messages/dropped'} % Messages dropped ]). @@ -236,4 +235,3 @@ create_metric({counter, Name}) -> metric_topic(Metric) -> emqttd_topic:systop(list_to_binary(lists:concat(['metrics/', Metric]))). - diff --git a/src/emqttd_mnesia.erl b/src/emqttd_mnesia.erl index 3fb2d67a4..2c7f3b961 100644 --- a/src/emqttd_mnesia.erl +++ b/src/emqttd_mnesia.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_mnesia). -author('feng@emqtt.io'). diff --git a/src/emqttd_mod_presence.erl b/src/emqttd_mod_presence.erl index 63edf024b..b534695e0 100644 --- a/src/emqttd_mod_presence.erl +++ b/src/emqttd_mod_presence.erl @@ -24,12 +24,15 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_mod_presence). -author("Feng Lee "). -include("emqttd.hrl"). +-behaviour(emqttd_gen_mod). + -export([load/1, unload/1]). -export([client_connected/3, client_disconnected/3]). @@ -41,7 +44,7 @@ load(Opts) -> client_connected(ConnAck, #mqtt_client{client_id = ClientId, username = Username, - ipaddress = IpAddress, + peername = {IpAddress, _}, clean_sess = CleanSess, proto_ver = ProtoVer}, Opts) -> Sess = case CleanSess of @@ -80,7 +83,8 @@ topic(connected, ClientId) -> topic(disconnected, ClientId) -> emqttd_topic:systop(list_to_binary(["clients/", ClientId, "/disconnected"])). -reason(Reason) when is_atom(Reason) -> Reason; +reason(Reason) when is_atom(Reason) -> Reason; reason({Error, _}) when is_atom(Error) -> Error; reason(_) -> internal_error. + diff --git a/src/emqttd_mod_rewrite.erl b/src/emqttd_mod_rewrite.erl index 4f29b51d1..b15a0c700 100644 --- a/src/emqttd_mod_rewrite.erl +++ b/src/emqttd_mod_rewrite.erl @@ -49,7 +49,7 @@ load(Opts) -> {?MODULE, rewrite, [subscribe, Sections]}), emqttd_broker:hook('client.unsubscribe', {?MODULE, rewrite_unsubscribe}, {?MODULE, rewrite, [unsubscribe, Sections]}), - emqttd_broker:hook('client.publish', {?MODULE, rewrite_publish}, + emqttd_broker:hook('message.publish', {?MODULE, rewrite_publish}, {?MODULE, rewrite, [publish, Sections]}). rewrite(_ClientId, TopicTable, subscribe, Sections) -> @@ -85,7 +85,7 @@ reload(File) -> unload(_) -> emqttd_broker:unhook('client.subscribe', {?MODULE, rewrite_subscribe}), emqttd_broker:unhook('client.unsubscribe', {?MODULE, rewrite_unsubscribe}), - emqttd_broker:unhook('client.publish', {?MODULE, rewrite_publish}). + emqttd_broker:unhook('message.publish', {?MODULE, rewrite_publish}). %%%============================================================================= %%% Internal functions diff --git a/src/emqttd_mod_sup.erl b/src/emqttd_mod_sup.erl index 8fa3ced59..a2b1a717f 100644 --- a/src/emqttd_mod_sup.erl +++ b/src/emqttd_mod_sup.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_mod_sup). -author("Feng Lee "). diff --git a/src/emqttd_packet.erl b/src/emqttd_packet.erl index c2098c2ff..383f8df7d 100644 --- a/src/emqttd_packet.erl +++ b/src/emqttd_packet.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_packet). -author("Feng Lee "). diff --git a/src/emqttd_parser.erl b/src/emqttd_parser.erl index c0a398af8..01b91131f 100644 --- a/src/emqttd_parser.erl +++ b/src/emqttd_parser.erl @@ -33,21 +33,25 @@ -include("emqttd_protocol.hrl"). %% API --export([init/1, parse/2]). +-export([new/1, parse/2]). -record(mqtt_packet_limit, {max_packet_size}). -type option() :: {atom(), any()}. +-type parser() :: fun( (binary()) -> any() ). + %%------------------------------------------------------------------------------ %% @doc Initialize a parser %% @end %%------------------------------------------------------------------------------ --spec init(Opts :: [option()]) -> {none, #mqtt_packet_limit{}}. -init(Opts) -> {none, limit(Opts)}. +-spec new(Opts :: [option()]) -> parser(). +new(Opts) -> + fun(Bin) -> parse(Bin, {none, limit(Opts)}) end. limit(Opts) -> - #mqtt_packet_limit{max_packet_size = proplists:get_value(max_packet_size, Opts, ?MAX_LEN)}. + #mqtt_packet_limit{max_packet_size = + proplists:get_value(max_packet_size, Opts, ?MAX_LEN)}. %%------------------------------------------------------------------------------ %% @doc Parse MQTT Packet diff --git a/src/emqttd_pooler_sup.erl b/src/emqttd_pooler_sup.erl index d5f3a0ee5..e503f7c1a 100644 --- a/src/emqttd_pooler_sup.erl +++ b/src/emqttd_pooler_sup.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_pooler_sup). -author("Feng Lee "). diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 3aa4c6292..40fad61b0 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_protocol). -author("Feng Lee "). @@ -40,20 +41,20 @@ -export([handle/2]). %% Protocol State --record(proto_state, { - peername, - sendfun, - connected = false, %received CONNECT action? - proto_ver, - proto_name, - username, - client_id, - clean_sess, - session, - will_msg, - max_clientid_len = ?MAX_CLIENTID_LEN, - client_pid -}). +-record(proto_state, {peername, + sendfun, + connected = false, %received CONNECT action? + proto_ver, + proto_name, + username, + client_id, + clean_sess, + session, + will_msg, + keepalive, + max_clientid_len = ?MAX_CLIENTID_LEN, + client_pid, + connected_at}). -type proto_state() :: #proto_state{}. @@ -61,6 +62,7 @@ %% @doc Init protocol %% @end %%------------------------------------------------------------------------------ + init(Peername, SendFun, Opts) -> MaxLen = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN), #proto_state{peername = Peername, @@ -68,32 +70,50 @@ init(Peername, SendFun, Opts) -> max_clientid_len = MaxLen, client_pid = self()}. -info(#proto_state{proto_ver = ProtoVer, +info(#proto_state{client_id = ClientId, + username = Username, + peername = Peername, + proto_ver = ProtoVer, proto_name = ProtoName, - client_id = ClientId, + keepalive = KeepAlive, clean_sess = CleanSess, - will_msg = WillMsg}) -> - [{proto_ver, ProtoVer}, + will_msg = WillMsg, + connected_at = ConnectedAt}) -> + [{client_id, ClientId}, + {username, Username}, + {peername, Peername}, + {proto_ver, ProtoVer}, {proto_name, ProtoName}, - {client_id, ClientId}, - {clean_sess, CleanSess}, - {will_msg, WillMsg}]. + {keepalive, KeepAlive}, + {clean_sess, CleanSess}, + {will_msg, WillMsg}, + {connected_at, ConnectedAt}]. clientid(#proto_state{client_id = ClientId}) -> ClientId. -client(#proto_state{peername = {Addr, _Port}, - client_id = ClientId, - username = Username, +client(#proto_state{client_id = ClientId, + peername = Peername, + username = Username, clean_sess = CleanSess, proto_ver = ProtoVer, - client_pid = Pid}) -> - #mqtt_client{client_id = ClientId, + keepalive = Keepalive, + will_msg = WillMsg, + client_pid = Pid, + connected_at = Time}) -> + WillTopic = if + WillMsg =:= undefined -> undefined; + true -> WillMsg#mqtt_message.topic + end, + #mqtt_client{client_id = ClientId, + client_pid = Pid, username = Username, - ipaddress = Addr, + peername = Peername, clean_sess = CleanSess, proto_ver = ProtoVer, - client_pid = Pid}. + keepalive = Keepalive, + will_topic = WillTopic, + connected_at = Time}. %% CONNECT – Client requests a connection to a Server @@ -111,7 +131,7 @@ received(_Packet, State = #proto_state{connected = false}) -> received(Packet = ?PACKET(_Type), State) -> trace(recv, Packet, State), - case validate_packet(Packet) of + case validate_packet(Packet) of ok -> handle(Packet, State); {error, Reason} -> @@ -121,18 +141,21 @@ received(Packet = ?PACKET(_Type), State) -> handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}) -> #mqtt_packet_connect{proto_ver = ProtoVer, - proto_name = ProtoName, + proto_name = ProtoName, username = Username, password = Password, clean_sess = CleanSess, - keep_alive = KeepAlive, - client_id = ClientId} = Var, + keep_alive = KeepAlive, + client_id = ClientId} = Var, State1 = State0#proto_state{proto_ver = ProtoVer, - proto_name = ProtoName, + proto_name = ProtoName, username = Username, client_id = ClientId, - clean_sess = CleanSess}, + clean_sess = CleanSess, + keepalive = KeepAlive, + will_msg = willmsg(Var), + connected_at = os:timestamp()}, trace(recv, Packet, State1), @@ -142,16 +165,20 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername} case emqttd_access_control:auth(client(State1), Password) of ok -> %% Generate clientId if null - State2 = State1#proto_state{client_id = clientid(ClientId, State1)}, + State2 = maybe_set_clientid(State1), - %%Starting session - {ok, Session} = emqttd_sm:start_session(CleanSess, clientid(State2)), - - %% Start keepalive - start_keepalive(KeepAlive), - - %% ACCEPT - {?CONNACK_ACCEPT, State2#proto_state{session = Session, will_msg = willmsg(Var)}}; + %% Start session + case emqttd_sm:start_session(CleanSess, clientid(State2)) of + {ok, Session} -> + %% Register the client + emqttd_cm:register(client(State2)), + %% Start keepalive + start_keepalive(KeepAlive), + %% ACCEPT + {?CONNACK_ACCEPT, State2#proto_state{session = Session}}; + {error, Error} -> + exit({shutdown, Error}) + end; {error, Reason}-> lager:error("~s@~s: username '~s', login failed - ~s", [ClientId, emqttd_net:format(Peername), Username, Reason]), @@ -177,8 +204,6 @@ handle(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), end, {ok, State}; - - handle(?PUBACK_PACKET(?PUBACK, PacketId), State = #proto_state{session = Session}) -> emqttd_session:puback(Session, PacketId), {ok, State}; @@ -239,8 +264,7 @@ publish(Packet = ?PUBLISH(?QOS_1, PacketId), State = #proto_state{client_id = Cl ok -> send(?PUBACK_PACKET(?PUBACK, PacketId), State); {error, Error} -> - %%TODO: log format... - lager:error("Client ~s: publish qos1 error ~p", [ClientId, Error]) + lager:error("Client ~s: publish qos1 error - ~p", [ClientId, Error]) end; publish(Packet = ?PUBLISH(?QOS_2, PacketId), State = #proto_state{client_id = ClientId, session = Session}) -> @@ -248,15 +272,15 @@ publish(Packet = ?PUBLISH(?QOS_2, PacketId), State = #proto_state{client_id = Cl ok -> send(?PUBACK_PACKET(?PUBREC, PacketId), State); {error, Error} -> - %%TODO: log format... - lager:error("Client ~s: publish qos2 error ~p", [ClientId, Error]) + lager:error("Client ~s: publish qos2 error - ~p", [ClientId, Error]) end. -spec send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}. send(Msg, State) when is_record(Msg, mqtt_message) -> send(emqttd_message:to_packet(Msg), State); -send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername}) when is_record(Packet, mqtt_packet) -> +send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername}) + when is_record(Packet, mqtt_packet) -> trace(send, Packet, State), sent_stats(Packet), Data = emqttd_serialiser:serialise(Packet), @@ -280,26 +304,29 @@ redeliver({?PUBREL, PacketId}, State) -> shutdown(duplicate_id, _State) -> quiet; %% -shutdown(_, #proto_state{client_id = undefined}) -> +shutdown(Error, #proto_state{client_id = undefined}) -> + lager:info("Protocol shutdown ~p", [Error]), ignore; shutdown(Error, #proto_state{peername = Peername, client_id = ClientId, will_msg = WillMsg}) -> lager:info([{client, ClientId}], "Client ~s@~s: shutdown ~p", [ClientId, emqttd_net:format(Peername), Error]), send_willmsg(ClientId, WillMsg), + emqttd_cm:unregister(ClientId), emqttd_broker:foreach_hooks('client.disconnected', [Error, ClientId]). willmsg(Packet) when is_record(Packet, mqtt_packet_connect) -> emqttd_message:from_packet(Packet). -%% generate a clientId -clientid(undefined, State) -> - clientid(<<>>, State); -%%TODO: <<>> is not right. -clientid(<<>>, #proto_state{peername = Peername}) -> - {_, _, MicroSecs} = os:timestamp(), - iolist_to_binary(["emqttd_", base64:encode(emqttd_net:format(Peername)), integer_to_list(MicroSecs)]); -clientid(ClientId, _State) -> ClientId. +%% Generate a client if if nulll +maybe_set_clientid(State = #proto_state{client_id = NullId}) + when NullId =:= undefined orelse NullId =:= <<>> -> + {_, NPid, _} = emqttd_guid:new(), + ClientId = iolist_to_binary(["emqttd_", integer_to_list(NPid)]), + State#proto_state{client_id = ClientId}; + +maybe_set_clientid(State) -> + State. send_willmsg(_ClientId, undefined) -> ignore; diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index a81ce1af0..6deda1950 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -39,17 +39,19 @@ -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). --behaviour(gen_server). - %% API Exports -export([start_link/2]). -export([create/1, subscribe/1, unsubscribe/1, - publish/1, - %local node - dispatch/2, match/1]). + publish/1]). + +%% Local node +-export([dispatch/2, + match/1]). + +-behaviour(gen_server). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -62,6 +64,7 @@ %%%============================================================================= %%% Mnesia callbacks %%%============================================================================= + mnesia(boot) -> %% p2p queue table ok = emqttd_mnesia:create_table(queue, [ @@ -111,6 +114,7 @@ start_link(Id, Opts) -> create(<<"$Q/", _Queue/binary>>) -> %% protecte from queue {error, cannot_create_queue}; + create(Topic) when is_binary(Topic) -> TopicR = #mqtt_topic{topic = Topic, node = node()}, case mnesia:transaction(fun add_topic/1, [TopicR]) of @@ -124,7 +128,8 @@ create(Topic) when is_binary(Topic) -> %% @doc Subscribe topic %% @end %%------------------------------------------------------------------------------ --spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} | {error, any()} when +-spec subscribe({Topic, Qos} | list({Topic, Qos})) -> + {ok, Qos | list(Qos)} | {error, any()} when Topic :: binary(), Qos :: mqtt_qos(). subscribe({Topic, Qos}) when is_binary(Topic) andalso ?IS_QOS(Qos) -> @@ -158,15 +163,14 @@ cast(Msg) -> %%------------------------------------------------------------------------------ -spec publish(Msg :: mqtt_message()) -> ok. publish(#mqtt_message{from = From} = Msg) -> - trace(publish, From, Msg), - - Msg1 = #mqtt_message{topic = Topic} = emqttd_broker:foldl_hooks('client.publish', [], Msg), + Msg1 = #mqtt_message{topic = Topic} + = emqttd_broker:foldl_hooks('message.publish', [], Msg), %% Retain message first. Don't create retained topic. case emqttd_retained:retain(Msg1) of ok -> - %TODO: why unset 'retain' flag? + %% TODO: why unset 'retain' flag? publish(Topic, emqttd_message:unset_flag(Msg1)); ignore -> publish(Topic, Msg1) @@ -174,12 +178,12 @@ publish(#mqtt_message{from = From} = Msg) -> publish(<<"$Q/", _/binary>> = Queue, #mqtt_message{qos = Qos} = Msg) -> lists:foreach( - fun(#mqtt_queue{subpid = SubPid, qos = SubQos}) -> + fun(#mqtt_queue{qpid = QPid, qos = SubQos}) -> Msg1 = if Qos > SubQos -> Msg#mqtt_message{qos = SubQos}; true -> Msg end, - SubPid ! {dispatch, Msg1} + QPid ! {dispatch, Msg1} end, mnesia:dirty_read(queue, Queue)); publish(Topic, Msg) when is_binary(Topic) -> @@ -197,7 +201,7 @@ publish(Topic, Msg) when is_binary(Topic) -> -spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer(). dispatch(Topic, #mqtt_message{qos = Qos} = Msg ) when is_binary(Topic) -> Subscribers = mnesia:dirty_read(subscriber, Topic), - setstats(dropped, Subscribers =:= []), %%TODO:... + setstats(dropped, Subscribers =:= []), lists:foreach( fun(#mqtt_subscriber{subpid=SubPid, qos = SubQos}) -> Msg1 = if @@ -220,12 +224,11 @@ match(Topic) when is_binary(Topic) -> init([Id, _Opts]) -> process_flag(min_heap_size, 1024*1024), gproc_pool:connect_worker(pubsub, {?MODULE, Id}), - %%TODO: gb_trees to replace maps? {ok, #state{id = Id, submap = maps:new()}}. handle_call({subscribe, SubPid, Topics}, _From, State) -> TopicSubs = lists:map(fun({<<"$Q/", _/binary>> = Queue, Qos}) -> - #mqtt_queue{name = Queue, subpid = SubPid, qos = Qos}; + #mqtt_queue{name = Queue, qpid = SubPid, qos = Qos}; ({Topic, Qos}) -> {#mqtt_topic{topic = Topic, node = node()}, #mqtt_subscriber{topic = Topic, subpid = SubPid, qos = Qos}} @@ -252,7 +255,7 @@ handle_call({subscribe, SubPid, <<"$Q/", _/binary>> = Queue, Qos}, _From, State) [OldQueueR] -> lager:error("Queue is overwrited by ~p: ~p", [SubPid, OldQueueR]); [] -> ok end, - QueueR = #mqtt_queue{name = Queue, subpid = SubPid, qos = Qos}, + QueueR = #mqtt_queue{name = Queue, qpid = SubPid, qos = Qos}, case mnesia:transaction(fun add_queue/1, [QueueR]) of {atomic, ok} -> setstats(queues), @@ -279,7 +282,7 @@ handle_call(Req, _From, State) -> handle_cast({unsubscribe, SubPid, Topics}, State) when is_list(Topics) -> TopicSubs = lists:map(fun(<<"$Q/", _/binary>> = Queue) -> - #mqtt_queue{name = Queue, subpid = SubPid}; + #mqtt_queue{name = Queue, qpid = SubPid}; (Topic) -> {#mqtt_topic{topic = Topic, node = node()}, #mqtt_subscriber{topic = Topic, subpid = SubPid, _ = '_'}} @@ -300,7 +303,7 @@ handle_cast({unsubscribe, SubPid, Topics}, State) when is_list(Topics) -> {noreply, State}; handle_cast({unsubscribe, SubPid, <<"$Q/", _/binary>> = Queue}, State) -> - QueueR = #mqtt_queue{name = Queue, subpid = SubPid}, + QueueR = #mqtt_queue{name = Queue, qpid = SubPid}, case mnesia:transaction(fun remove_queue/1, [QueueR]) of {atomic, _} -> setstats(queues); @@ -329,7 +332,7 @@ handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{submap = SubMa Node = node(), F = fun() -> %% remove queue... - Queues = mnesia:match_object(queue, #mqtt_queue{subpid = DownPid, _ = '_'}, write), + Queues = mnesia:match_object(queue, #mqtt_queue{qpid = DownPid, _ = '_'}, write), lists:foreach(fun(QueueR) -> mnesia:delete_object(queue, QueueR, write) end, Queues), @@ -420,9 +423,9 @@ monitor_subscriber(SubPid, State = #state{submap = SubMap}) -> end, State#state{submap = NewSubMap}. -remove_queue(#mqtt_queue{name = Name, subpid = Pid}) -> +remove_queue(#mqtt_queue{name = Name, qpid = Pid}) -> case mnesia:wread({queue, Name}) of - [R = #mqtt_queue{subpid = Pid}] -> + [R = #mqtt_queue{qpid = Pid}] -> mnesia:delete(queue, R, write); _ -> ok @@ -463,13 +466,11 @@ setstats(subscribers) -> emqttd_stats:setstats('subscribers/count', 'subscribers/max', mnesia:table_info(subscriber, size)). -%%TODO: queue dropped? setstats(dropped, false) -> ignore; setstats(dropped, true) -> emqttd_metrics:inc('messages/dropped'). - %%%============================================================================= %%% Trace functions %%%============================================================================= diff --git a/src/emqttd_pubsub_sup.erl b/src/emqttd_pubsub_sup.erl index 51e89d56f..fb58ec74a 100644 --- a/src/emqttd_pubsub_sup.erl +++ b/src/emqttd_pubsub_sup.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_pubsub_sup). -author("Feng Lee "). diff --git a/src/emqttd_retained.erl b/src/emqttd_retained.erl index 5c1b03610..adbe82e64 100644 --- a/src/emqttd_retained.erl +++ b/src/emqttd_retained.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_retained). -author("Feng Lee "). @@ -73,7 +74,7 @@ retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) -> retain(Msg = #mqtt_message{topic = Topic, retain = true, payload = Payload}) -> - TabSize = mnesia:table_info(message, size), + TabSize = mnesia:table_info(retained, size), case {TabSize < limit(table), size(Payload) < limit(payload)} of {true, true} -> Retained = #mqtt_retained{topic = Topic, message = Msg}, @@ -83,7 +84,7 @@ retain(Msg = #mqtt_message{topic = Topic, {false, _}-> lager:error("Dropped retained message(topic=~s) for table is full!", [Topic]); {_, false}-> - lager:error("Dropped retained message(topic=~s, payload=~p) for payload is too big!", [Topic, size(Payload)]) + lager:error("Dropped retained message(topic=~s, payload_size=~p) for payload is too big!", [Topic, size(Payload)]) end, ok. limit(table) -> @@ -107,7 +108,7 @@ env() -> -spec dispatch(Topic, CPid) -> any() when Topic :: binary(), CPid :: pid(). -dispatch(Topic, CPid) when is_binary(Topic) andalso is_pid(CPid) -> +dispatch(Topic, CPid) when is_binary(Topic) -> Msgs = case emqttd_topic:wildcard(Topic) of false -> diff --git a/src/emqttd_serialiser.erl b/src/emqttd_serialiser.erl index 471904bce..85d3239b1 100644 --- a/src/emqttd_serialiser.erl +++ b/src/emqttd_serialiser.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_serialiser). -author("Feng Lee "). diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index b567937d7..5becbba1e 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -22,16 +22,6 @@ %%% @doc %%% emqttd session manager. %%% -%%% The Session state in the Server consists of: -%%% The existence of a Session, even if the rest of the Session state is empty. -%%% The Client’s subscriptions. -%%% QoS 1 and QoS 2 messages which have been sent to the Client, but have not -%%% been completely acknowledged. -%%% QoS 1 and QoS 2 messages pending transmission to the Client. -%%% QoS 2 messages which have been received from the Client, but have not been -%%% completely acknowledged. -%%% Optionally, QoS 0 messages pending transmission to the Client. -%%% %%% @end %%%----------------------------------------------------------------------------- @@ -41,14 +31,18 @@ -include("emqttd.hrl"). --behaviour(gen_server). +%% Mnesia Callbacks +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). %% API Function Exports --export([start_link/2, pool/0, table/0]). +-export([start_link/2, pool/0]). --export([lookup_session/1, - start_session/2, - destroy_session/1]). +-export([start_session/2, lookup_session/1]). + +-behaviour(gen_server). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -58,7 +52,20 @@ -define(SM_POOL, sm_pool). --define(SESSION_TAB, mqtt_session). +%%%============================================================================= +%%% Mnesia callbacks +%%%============================================================================= + +mnesia(boot) -> + ok = emqttd_mnesia:create_table(session, [ + {type, ordered_set}, + {ram_copies, [node()]}, + {record_name, mqtt_session}, + {attributes, record_info(fields, mqtt_session)}, + {index, [sess_pid]}]); + +mnesia(copy) -> + ok = emqttd_mnesia:copy_table(session). %%%============================================================================= %%% API @@ -69,8 +76,8 @@ %% @end %%------------------------------------------------------------------------------ -spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when - Id :: pos_integer(), - StatsFun :: {fun(), fun()}. + Id :: pos_integer(), + StatsFun :: fun(). start_link(Id, StatsFun) -> gen_server:start_link(?MODULE, [Id, StatsFun], []). @@ -80,43 +87,27 @@ start_link(Id, StatsFun) -> %%------------------------------------------------------------------------------ pool() -> ?SM_POOL. -%%------------------------------------------------------------------------------ -%% @doc Table name. -%% @end -%%------------------------------------------------------------------------------ -table() -> ?SESSION_TAB. - %%------------------------------------------------------------------------------ %% @doc Start a session %% @end %%------------------------------------------------------------------------------ - -spec start_session(CleanSess :: boolean(), binary()) -> {ok, pid()} | {error, any()}. start_session(CleanSess, ClientId) -> SM = gproc_pool:pick_worker(?SM_POOL, ClientId), call(SM, {start_session, {CleanSess, ClientId, self()}}). %%------------------------------------------------------------------------------ -%% @doc Lookup Session Pid +%% @doc Lookup a Session %% @end %%------------------------------------------------------------------------------ -spec lookup_session(binary()) -> pid() | undefined. lookup_session(ClientId) -> - case ets:lookup(?SESSION_TAB, ClientId) of - [{_Clean, _, SessPid, _}] -> SessPid; + case mnesia:dirty_read(session, ClientId) of + [Session] -> Session; [] -> undefined end. -%%------------------------------------------------------------------------------ -%% @doc Destroy a session -%% @end -%%------------------------------------------------------------------------------ --spec destroy_session(binary()) -> ok. -destroy_session(ClientId) -> - SM = gproc_pool:pick_worker(?SM_POOL, ClientId), - call(SM, {destroy_session, ClientId}). - -call(SM, Req) -> gen_server:call(SM, Req). +call(SM, Req) -> gen_server:call(SM, Req, infinity). %%%============================================================================= %%% gen_server callbacks @@ -126,37 +117,28 @@ init([Id, StatsFun]) -> gproc_pool:connect_worker(?SM_POOL, {?MODULE, Id}), {ok, #state{id = Id, statsfun = StatsFun}}. +%% persistent session handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) -> - Reply = - case ets:lookup(?SESSION_TAB, ClientId) of - [{_Clean, _, SessPid, _MRef}] -> - emqttd_session:resume(SessPid, ClientId, ClientPid), - {ok, SessPid}; - [] -> - new_session(false, ClientId, ClientPid) - end, - {reply, Reply, setstats(State)}; + case lookup_session(ClientId) of + undefined -> + %% create session locally + {reply, create_session(false, ClientId, ClientPid), State}; + Session -> + {reply, resume_session(Session, ClientPid), State} + end; handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) -> - case ets:lookup(?SESSION_TAB, ClientId) of - [{_Clean, _, SessPid, MRef}] -> - erlang:demonitor(MRef, [flush]), - emqttd_session:destroy(SessPid, ClientId); - [] -> - ok - end, - {reply, new_session(true, ClientId, ClientPid), setstats(State)}; - -handle_call({destroy_session, ClientId}, _From, State) -> - case ets:lookup(?SESSION_TAB, ClientId) of - [{_Clean, _, SessPid, MRef}] -> - emqttd_session:destroy(SessPid, ClientId), - erlang:demonitor(MRef, [flush]), - ets:delete(?SESSION_TAB, ClientId); - [] -> - ignore - end, - {reply, ok, setstats(State)}; + case lookup_session(ClientId) of + undefined -> + {reply, create_session(true, ClientId, ClientPid), State}; + Session -> + case destroy_session(Session) of + ok -> + {reply, create_session(true, ClientId, ClientPid), State}; + {error, Error} -> + {reply, {error, Error}, State} + end + end; handle_call(_Request, _From, State) -> {reply, ok, State}. @@ -164,8 +146,11 @@ handle_call(_Request, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> - ets:match_delete(?SESSION_TAB, {'_', '_', DownPid, MRef}), +handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State) -> + mnesia:transaction(fun() -> + [mnesia:delete_object(session, Sess, write) || Sess + <- mnesia:index_read(session, DownPid, #mqtt_session.sess_pid)] + end), {noreply, setstats(State)}; handle_info(_Info, State) -> @@ -181,17 +166,116 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= -new_session(CleanSess, ClientId, ClientPid) -> +create_session(CleanSess, ClientId, ClientPid) -> case emqttd_session_sup:start_session(CleanSess, ClientId, ClientPid) of {ok, SessPid} -> - MRef = erlang:monitor(process, SessPid), - ets:insert(?SESSION_TAB, {CleanSess, ClientId, SessPid, MRef}), - {ok, SessPid}; + Session = #mqtt_session{client_id = ClientId, + sess_pid = SessPid, + persistent = not CleanSess, + on_node = node()}, + case insert_session(Session) of + {aborted, {conflict, Node}} -> + %% conflict with othe node? + lager:critical("Session ~s conflict with node ~p!", [ClientId, Node]), + {error, conflict}; + {atomic, ok} -> + erlang:monitor(process, SessPid), + {ok, SessPid} + end; {error, Error} -> {error, Error} end. -setstats(State = #state{statsfun = {CFun, SFun}}) -> - CFun(ets:info(?SESSION_TAB, size)), - SFun(ets:select_count(?SESSION_TAB, [{{false, '_', '_', '_'}, [], [true]}])), +insert_session(Session = #mqtt_session{client_id = ClientId}) -> + mnesia:transaction(fun() -> + case mnesia:wread({session, ClientId}) of + [] -> + mnesia:write(session, Session, write); + [#mqtt_session{on_node = Node}] -> + mnesia:abort({conflict, Node}) + end + end). + +%% local node +resume_session(#mqtt_session{client_id = ClientId, + sess_pid = SessPid, + on_node = Node}, ClientPid) + when Node =:= node() -> + case is_process_alive(SessPid) of + true -> + emqttd_session:resume(SessPid, ClientId, ClientPid), + {ok, SessPid}; + false -> + lager:critical("Session ~s@~p died unexpectedly!", [ClientId, SessPid]), + {error, session_died} + end; + +%% remote node +resume_session(Session = #mqtt_session{client_id = ClientId, + sess_pid = SessPid, + on_node = Node}, ClientPid) -> + case emqttd:is_running(Node) of + true -> + case rpc:call(Node, emqttd_session, resume, [SessPid, ClientId, ClientPid]) of + ok -> + {ok, SessPid}; + {badrpc, Reason} -> + lager:critical("Resume session ~s on remote node ~p failed for ~p", + [ClientId, Node, Reason]), + {error, list_to_atom("session_" ++ atom_to_list(Reason))} + end; + false -> + lager:critical("Session ~s died for node ~p down!", [ClientId, Node]), + remove_session(Session), + {error, session_node_down} + end. + +%% local node +destroy_session(Session = #mqtt_session{client_id = ClientId, + sess_pid = SessPid, + on_node = Node}) when Node =:= node() -> + case is_process_alive(SessPid) of + true -> + emqttd_session:destroy(SessPid, ClientId); + false -> + lager:critical("Session ~s@~p died unexpectedly!", [ClientId, SessPid]) + end, + case remove_session(Session) of + {atomic, ok} -> ok; + {aborted, Error} -> {error, Error} + end; + +%% remote node +destroy_session(Session = #mqtt_session{client_id = ClientId, + sess_pid = SessPid, + on_node = Node}) -> + case emqttd:is_running(Node) of + true -> + case rpc:call(Node, emqttd_session, destroy, [SessPid, ClientId]) of + ok -> + case remove_session(Session) of + {atomic, ok} -> ok; + {aborted, Error} -> {error, Error} + end; + {badrpc, Reason} -> + lager:critical("Destroy session ~s on remote node ~p failed for ~p", + [ClientId, Node, Reason]), + {error, list_to_atom("session_" ++ atom_to_list(Reason))} + end; + false -> + lager:error("Session ~s died for node ~p down!", [ClientId, Node]), + case remove_session(Session) of + {atomic, ok} -> ok; + {aborted, Error} -> {error, Error} + end + end. + +remove_session(Session) -> + mnesia:transaction(fun() -> + mnesia:delete_object(session, Session, write) + end). + +setstats(State = #state{statsfun = _StatsFun}) -> State. + + diff --git a/src/emqttd_sm_sup.erl b/src/emqttd_sm_sup.erl index 8f5dbd0a4..a597d6dad 100644 --- a/src/emqttd_sm_sup.erl +++ b/src/emqttd_sm_sup.erl @@ -43,20 +43,15 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - ets:new(emqttd_sm:table(), [set, named_table, public, {keypos, 2}, - {write_concurrency, true}]), Schedulers = erlang:system_info(schedulers), gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]), + StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), Children = lists:map( fun(I) -> Name = {emqttd_sm, I}, gproc_pool:add_worker(emqttd_sm:pool(), Name, I), - {Name, {emqttd_sm, start_link, [I, statsfun()]}, + {Name, {emqttd_sm, start_link, [I, StatsFun]}, permanent, 10000, worker, [emqttd_sm]} end, lists:seq(1, Schedulers)), {ok, {{one_for_all, 10, 100}, Children}}. -statsfun() -> - {emqttd_stats:statsfun('clients/count', 'clients/max'), - emqttd_stats:statsfun('sessions/count', 'sessions/max')}. - diff --git a/src/emqttd_throttle.erl b/src/emqttd_throttle.erl index c14e69987..0eb095505 100644 --- a/src/emqttd_throttle.erl +++ b/src/emqttd_throttle.erl @@ -28,5 +28,5 @@ -author("Feng Lee "). -%% TODO:... 0.9.0... +%% TODO:... 0.10.0... diff --git a/src/emqttd_topic.erl b/src/emqttd_topic.erl index 13e9d55a3..7ea5c3ac7 100644 --- a/src/emqttd_topic.erl +++ b/src/emqttd_topic.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_topic). -author("Feng Lee "). diff --git a/src/emqttd_trace.erl b/src/emqttd_trace.erl index 4e149f452..c67677747 100644 --- a/src/emqttd_trace.erl +++ b/src/emqttd_trace.erl @@ -52,7 +52,6 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - %%------------------------------------------------------------------------------ %% @doc Start to trace client or topic. %% @end diff --git a/src/emqttd_vm.erl b/src/emqttd_vm.erl index bacf17ab2..0e5f20776 100644 --- a/src/emqttd_vm.erl +++ b/src/emqttd_vm.erl @@ -27,6 +27,8 @@ -module(emqttd_vm). +-export([schedulers/0]). + -export([microsecs/0]). -export([loads/0, scheduler_usage/1]). @@ -164,6 +166,9 @@ sndbuf, tos]). +schedulers() -> + erlang:system_info(schedulers). + microsecs() -> {Mega, Sec, Micro} = erlang:now(), (Mega * 1000000 + Sec) * 1000000 + Micro. diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index 2cde114e6..13e013aa6 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -43,7 +43,7 @@ terminate/2, code_change/3]). %% WebSocket Loop State --record(wsocket_state, {request, client_pid, packet_opts, parser_state}). +-record(wsocket_state, {request, client_pid, packet_opts, parser}). %% Client State -record(client_state, {ws_pid, request, proto_state, keepalive}). @@ -59,7 +59,7 @@ start_link(Req) -> ReentryWs(#wsocket_state{request = Req, client_pid = ClientPid, packet_opts = PktOpts, - parser_state = emqttd_parser:init(PktOpts)}). + parser = emqttd_parser:new(PktOpts)}). %%------------------------------------------------------------------------------ %% @private @@ -77,14 +77,14 @@ ws_loop(<<>>, State, _ReplyChannel) -> State; ws_loop([<<>>], State, _ReplyChannel) -> State; -ws_loop(Data, State = #wsocket_state{request = Req, +ws_loop(Data, State = #wsocket_state{request = Req, client_pid = ClientPid, - parser_state = ParserState}, ReplyChannel) -> + parser = Parser}, ReplyChannel) -> Peer = Req:get(peer), lager:debug("RECV from ~s(WebSocket): ~p", [Peer, Data]), - case emqttd_parser:parse(iolist_to_binary(Data), ParserState) of - {more, ParserState1} -> - State#wsocket_state{parser_state = ParserState1}; + case Parser(iolist_to_binary(Data)) of + {more, NewParser} -> + State#wsocket_state{parser = NewParser}; {ok, Packet, Rest} -> gen_server:cast(ClientPid, {received, Packet}), ws_loop(Rest, reset_parser(State), ReplyChannel); @@ -93,8 +93,8 @@ ws_loop(Data, State = #wsocket_state{request = Req, exit({shutdown, Error}) end. -reset_parser(State = #wsocket_state{packet_opts = PktOpts}) -> - State#wsocket_state{parser_state = emqttd_parser:init(PktOpts)}. +reset_parser(State = #wsocket_state{packet_opts = PktOpts}) -> + State#wsocket_state{parser = emqttd_parser:new (PktOpts)}. %%%============================================================================= %%% gen_fsm callbacks