From d3e39ae9a35d2807fd9dde5c664b6c068e833c51 Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 13 Oct 2015 21:01:53 +0800 Subject: [PATCH] src/emqttd_mod_rewrite.erl --- include/emqttd.hrl | 2 + src/emqttd.erl | 9 ++- src/emqttd_access_rule.erl | 1 + src/emqttd_net.erl | 6 +- src/emqttd_opts.erl | 1 + src/emqttd_plugins.erl | 6 +- src/emqttd_pooler.erl | 15 +++-- src/emqttd_protocol.erl | 135 ++++++++++++++++++------------------- src/emqttd_sm_helper.erl | 2 +- src/emqttd_sysmon.erl | 10 +-- src/emqttd_topic.erl | 2 +- src/emqttd_ws_client.erl | 59 ++++++++-------- 12 files changed, 125 insertions(+), 123 deletions(-) diff --git a/include/emqttd.hrl b/include/emqttd.hrl index 5c2f61019..51bce2f6d 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -47,6 +47,8 @@ %%------------------------------------------------------------------------------ -type pubsub() :: publish | subscribe. +-define(IS_PUBSUB(PS), (PS =:= publish orelse PS =:= subscribe)). + %%------------------------------------------------------------------------------ %% MQTT Topic %%------------------------------------------------------------------------------ diff --git a/src/emqttd.erl b/src/emqttd.erl index b09cd2d32..eeb5bab4d 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd). -author("Feng Lee "). @@ -41,7 +42,7 @@ {nodelay, true} ]). --type listener() :: {atom(), inet:port_number(), [esockd:option()]}. +-type listener() :: {atom(), inet:port_number(), [esockd:option()]}. %%------------------------------------------------------------------------------ %% @doc Start emqttd application. @@ -109,14 +110,12 @@ close_listeners(Listeners) when is_list(Listeners) -> close_listener({Protocol, Port, _Options}) -> esockd:close({Protocol, Port}). - load_all_mods() -> - Mods = application:get_env(emqttd, modules, []), - lists:foreach(fun({Name, Opts}) -> + lists:foreach(fun({Name, Opts}) -> Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)), Mod:load(Opts), lager:info("load module ~s successfully", [Name]) - end, Mods). + end, env(modules)). is_mod_enabled(Name) -> env(modules, Name) =/= undefined. diff --git a/src/emqttd_access_rule.erl b/src/emqttd_access_rule.erl index 40158285d..fab9461b9 100644 --- a/src/emqttd_access_rule.erl +++ b/src/emqttd_access_rule.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_access_rule). -author("Feng Lee "). diff --git a/src/emqttd_net.erl b/src/emqttd_net.erl index 8488fe76e..627856cd8 100644 --- a/src/emqttd_net.erl +++ b/src/emqttd_net.erl @@ -30,9 +30,11 @@ -include_lib("kernel/include/inet.hrl"). --export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, getaddr/2, port_to_listeners/1]). +-export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, + getaddr/2, port_to_listeners/1]). --export([peername/1, sockname/1, format/2, format/1, connection_string/2, ntoa/1]). +-export([peername/1, sockname/1, format/2, format/1, + connection_string/2, ntoa/1]). -define(FIRST_TEST_BIND_PORT, 10000). diff --git a/src/emqttd_opts.erl b/src/emqttd_opts.erl index dfd5b74b6..938d002cc 100644 --- a/src/emqttd_opts.erl +++ b/src/emqttd_opts.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_opts). -author("Feng Lee "). diff --git a/src/emqttd_plugins.erl b/src/emqttd_plugins.erl index 8e23f41e1..b02b1d427 100644 --- a/src/emqttd_plugins.erl +++ b/src/emqttd_plugins.erl @@ -41,7 +41,6 @@ %% @doc Load all plugins when the broker started. %% @end %%------------------------------------------------------------------------------ - -spec load() -> list() | {error, any()}. load() -> case env(loaded_file) of @@ -64,7 +63,7 @@ with_loaded_file(File, SuccFun) -> load_plugins(Names, Persistent) -> Plugins = list(), NotFound = Names -- names(Plugins), case NotFound of - [] -> ok; + [] -> ok; NotFound -> lager:error("Cannot find plugins: ~p", [NotFound]) end, NeedToLoad = Names -- NotFound -- names(started_app), @@ -78,7 +77,7 @@ load_plugins(Names, Persistent) -> unload() -> case env(loaded_file) of {ok, File} -> - with_loaded_file(File, fun(Names) -> stop_plugins(Names) end); + with_loaded_file(File, fun stop_plugins/1); undefined -> ignore end. @@ -128,7 +127,6 @@ plugin(PluginsDir, AppFile0) -> %% @doc Load One Plugin %% @end %%------------------------------------------------------------------------------ - -spec load(atom()) -> ok | {error, any()}. load(PluginName) when is_atom(PluginName) -> case lists:member(PluginName, names(started_app)) of diff --git a/src/emqttd_pooler.erl b/src/emqttd_pooler.erl index 147c19ee5..bec9f287a 100644 --- a/src/emqttd_pooler.erl +++ b/src/emqttd_pooler.erl @@ -42,9 +42,12 @@ %%%============================================================================= %%% API %%%============================================================================= --spec start_link(I :: pos_integer()) -> {ok, pid()} | ignore | {error, any()}. -start_link(I) -> - gen_server:start_link(?MODULE, [I], []). +-spec start_link(Id :: pos_integer()) -> {ok, pid()} | ignore | {error, any()}. +start_link(Id) -> + gen_server:start_link({local, name(Id)}, ?MODULE, [Id], []). + +name(Id) -> + list_to_atom(lists:concat([?MODULE, "_", integer_to_list(Id)])). %%------------------------------------------------------------------------------ %% @doc Submit work to pooler @@ -64,9 +67,9 @@ async_submit(Fun) -> %%% gen_server callbacks %%%============================================================================= -init([I]) -> - gproc_pool:connect_worker(pooler, {pooler, I}), - {ok, #state{id = I}}. +init([Id]) -> + gproc_pool:connect_worker(pooler, {pooler, Id}), + {ok, #state{id = Id}}. handle_call({submit, Fun}, _From, State) -> {reply, run(Fun), State}; diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 840339819..9eab5fe14 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -38,7 +38,7 @@ -export([received/2, send/2, redeliver/2, shutdown/2]). --export([handle/2]). +-export([process/2]). %% Protocol State -record(proto_state, {peername, @@ -65,8 +65,8 @@ %%------------------------------------------------------------------------------ init(Peername, SendFun, Opts) -> - MaxLen = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN), - WsInitialHeaders = proplists:get_value(ws_initial_headers, Opts), + MaxLen = emqttd_opts:g(max_clientid_len, Opts, ?MAX_CLIENTID_LEN), + WsInitialHeaders = emqttd_opts:g(ws_initial_headers, Opts), #proto_state{peername = Peername, sendfun = SendFun, max_clientid_len = MaxLen, @@ -130,7 +130,7 @@ session(#proto_state{session = Session}) -> %%A Client can only send the CONNECT Packet once over a Network Connection. -spec received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}. received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false}) -> - handle(Packet, State#proto_state{connected = true}); + process(Packet, State#proto_state{connected = true}); received(?PACKET(?CONNECT), State = #proto_state{connected = true}) -> {error, protocol_bad_connect, State}; @@ -143,19 +143,19 @@ received(Packet = ?PACKET(_Type), State) -> trace(recv, Packet, State), case validate_packet(Packet) of ok -> - handle(Packet, State); + process(Packet, State); {error, Reason} -> {error, Reason, State} end. -handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}) -> +process(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}) -> #mqtt_packet_connect{proto_ver = ProtoVer, proto_name = ProtoName, username = Username, password = Password, clean_sess = CleanSess, - keep_alive = KeepAlive, + keep_alive = KeepAlive, client_id = ClientId} = Var, State1 = State0#proto_state{proto_ver = ProtoVer, @@ -190,7 +190,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername} exit({shutdown, Error}) end; {error, Reason}-> - lager:error("~s@~s: username '~s', login failed - ~s", + lager:error("~s@~s: username '~s' login failed for ~s", [ClientId, emqttd_net:format(Peername), Username, Reason]), {?CONNACK_CREDENTIALS, State1} @@ -203,8 +203,8 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername} %% Send connack send(?CONNACK_PACKET(ReturnCode1), State3); -handle(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), - State = #proto_state{client_id = ClientId}) -> +process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), + State = #proto_state{client_id = ClientId}) -> case check_acl(publish, Topic, State) of allow -> @@ -214,70 +214,76 @@ handle(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), end, {ok, State}; -handle(?PUBACK_PACKET(?PUBACK, PacketId), State = #proto_state{session = Session}) -> - emqttd_session:puback(Session, PacketId), - {ok, State}; +process(?PUBACK_PACKET(?PUBACK, PacketId), State = #proto_state{session = Session}) -> + emqttd_session:puback(Session, PacketId), {ok, State}; -handle(?PUBACK_PACKET(?PUBREC, PacketId), State = #proto_state{session = Session}) -> +process(?PUBACK_PACKET(?PUBREC, PacketId), State = #proto_state{session = Session}) -> emqttd_session:pubrec(Session, PacketId), send(?PUBREL_PACKET(PacketId), State); -handle(?PUBACK_PACKET(?PUBREL, PacketId), State = #proto_state{session = Session}) -> +process(?PUBACK_PACKET(?PUBREL, PacketId), State = #proto_state{session = Session}) -> emqttd_session:pubrel(Session, PacketId), send(?PUBACK_PACKET(?PUBCOMP, PacketId), State); -handle(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Session}) -> - emqttd_session:pubcomp(Session, PacketId), - {ok, State}; +process(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Session})-> + emqttd_session:pubcomp(Session, PacketId), {ok, State}; %% protect from empty topic list -handle(?SUBSCRIBE_PACKET(PacketId, []), State) -> +process(?SUBSCRIBE_PACKET(PacketId, []), State) -> send(?SUBACK_PACKET(PacketId, []), State); -handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{client_id = ClientId, session = Session}) -> +process(?SUBSCRIBE_PACKET(PacketId, TopicTable), + State = #proto_state{client_id = ClientId, session = Session}) -> AllowDenies = [check_acl(subscribe, Topic, State) || {Topic, _Qos} <- TopicTable], case lists:member(deny, AllowDenies) of true -> - %%TODO: return 128 QoS when deny... no need to SUBACK? - lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]); + lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]), + send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State); false -> - Callback = fun(GrantedQos) -> send(?SUBACK_PACKET(PacketId, GrantedQos), State) end, - emqttd_session:subscribe(Session, TopicTable, Callback) - end, - {ok, State}; + AckFun = fun(GrantedQos) -> + send(?SUBACK_PACKET(PacketId, GrantedQos), State) + end, + emqttd_session:subscribe(Session, TopicTable, AckFun), {ok, State} + end; %% protect from empty topic list -handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> +process(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> send(?UNSUBACK_PACKET(PacketId), State); -handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) -> +process(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) -> emqttd_session:unsubscribe(Session, Topics), send(?UNSUBACK_PACKET(PacketId), State); -handle(?PACKET(?PINGREQ), State) -> +process(?PACKET(?PINGREQ), State) -> send(?PACKET(?PINGRESP), State); -handle(?PACKET(?DISCONNECT), State) -> +process(?PACKET(?DISCONNECT), State) -> % clean willmsg {stop, normal, State#proto_state{will_msg = undefined}}. -publish(Packet = ?PUBLISH(?QOS_0, _PacketId), #proto_state{client_id = ClientId, session = Session}) -> - emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)); +publish(Packet = ?PUBLISH(?QOS_0, _PacketId), + #proto_state{client_id = ClientId, session = Session}) -> + Msg = emqttd_message:from_packet(ClientId, Packet), + emqttd_session:publish(Session, Msg); -publish(Packet = ?PUBLISH(?QOS_1, PacketId), State = #proto_state{client_id = ClientId, session = Session}) -> - case emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)) of +publish(Packet = ?PUBLISH(?QOS_1, PacketId), + State = #proto_state{client_id = ClientId, session = Session}) -> + Msg = emqttd_message:from_packet(ClientId, Packet), + case emqttd_session:publish(Session, Msg) of ok -> send(?PUBACK_PACKET(?PUBACK, PacketId), State); {error, Error} -> - lager:error("Client ~s: publish qos1 error - ~p", [ClientId, Error]) + lager:error("Client(~s): publish qos1 error - ~p", [ClientId, Error]) end; -publish(Packet = ?PUBLISH(?QOS_2, PacketId), State = #proto_state{client_id = ClientId, session = Session}) -> - case emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)) of +publish(Packet = ?PUBLISH(?QOS_2, PacketId), + State = #proto_state{client_id = ClientId, session = Session}) -> + Msg = emqttd_message:from_packet(ClientId, Packet), + case emqttd_session:publish(Session, Msg) of ok -> send(?PUBACK_PACKET(?PUBREC, PacketId), State); {error, Error} -> - lager:error("Client ~s: publish qos2 error - ~p", [ClientId, Error]) + lager:error("Client(~s): publish qos2 error - ~p", [ClientId, Error]) end. -spec send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}. @@ -285,9 +291,9 @@ send(Msg, State) when is_record(Msg, mqtt_message) -> send(emqttd_message:to_packet(Msg), State); send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername}) - when is_record(Packet, mqtt_packet) -> + when is_record(Packet, mqtt_packet) -> trace(send, Packet, State), - sent_stats(Packet), + emqttd_metrics:sent(Packet), Data = emqttd_serialiser:serialise(Packet), lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Data]), emqttd_metrics:inc('bytes/sent', size(Data)), @@ -370,28 +376,31 @@ validate_clientid(#mqtt_packet_connect{client_id = ClientId}, #proto_state{max_c true; %% MQTT3.1.1 allow null clientId. -validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311, client_id = ClientId}, _ProtoState) +validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311, + client_id = ClientId}, _ProtoState) when size(ClientId) =:= 0 -> true; -validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess, client_id = ClientId}, _ProtoState) -> +validate_clientid(#mqtt_packet_connect{proto_ver = Ver, + clean_sess = CleanSess, + client_id = ClientId}, _ProtoState) -> lager:warning("Invalid ClientId: ~s, ProtoVer: ~p, CleanSess: ~s", [ClientId, Ver, CleanSess]), false. -validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH}, +validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH}, variable = #mqtt_packet_publish{topic_name = Topic}}) -> case emqttd_topic:validate({name, Topic}) of - true -> ok; - false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic} + true -> ok; + false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic} end; -validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE}, +validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE}, variable = #mqtt_packet_subscribe{topic_table = Topics}}) -> validate_topics(filter, Topics); -validate_packet(#mqtt_packet{ header = #mqtt_packet_header{type = ?UNSUBSCRIBE}, - variable = #mqtt_packet_subscribe{topic_table = Topics}}) -> +validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE}, + variable = #mqtt_packet_subscribe{topic_table = Topics}}) -> validate_topics(filter, Topics); @@ -406,13 +415,16 @@ validate_topics(Type, Topics) when Type =:= name orelse Type =:= filter -> ErrTopics = [Topic || {Topic, Qos} <- Topics, not (emqttd_topic:validate({Type, Topic}) and validate_qos(Qos))], case ErrTopics of - [] -> ok; - _ -> lager:error("Error Topics: ~p", [ErrTopics]), {error, badtopic} + [] -> ok; + _ -> lager:error("Error Topics: ~p", [ErrTopics]), {error, badtopic} end. -validate_qos(undefined) -> true; -validate_qos(Qos) when Qos =< ?QOS_2 -> true; -validate_qos(_) -> false. +validate_qos(undefined) -> + true; +validate_qos(Qos) when ?IS_QOS(Qos) -> + true; +validate_qos(_) -> + false. %% publish ACL is cached in process dictionary. check_acl(publish, Topic, State) -> @@ -428,20 +440,3 @@ check_acl(publish, Topic, State) -> check_acl(subscribe, Topic, State) -> emqttd_access_control:check_acl(client(State), subscribe, Topic). -sent_stats(?PACKET(Type)) -> - emqttd_metrics:inc('packets/sent'), - inc(Type). -inc(?CONNACK) -> - emqttd_metrics:inc('packets/connack'); -inc(?PUBLISH) -> - emqttd_metrics:inc('messages/sent'), - emqttd_metrics:inc('packets/publish/sent'); -inc(?SUBACK) -> - emqttd_metrics:inc('packets/suback'); -inc(?UNSUBACK) -> - emqttd_metrics:inc('packets/unsuback'); -inc(?PINGRESP) -> - emqttd_metrics:inc('packets/pingresp'); -inc(_) -> - ingore. - diff --git a/src/emqttd_sm_helper.erl b/src/emqttd_sm_helper.erl index a3387b6a3..509419335 100644 --- a/src/emqttd_sm_helper.erl +++ b/src/emqttd_sm_helper.erl @@ -54,7 +54,7 @@ start_link() -> init([]) -> mnesia:subscribe(system), - {ok, TRef} = timer:send_interval(1000, tick), + {ok, TRef} = timer:send_interval(timer:seconds(1), tick), StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), {ok, #state{stats_fun = StatsFun, tick_tref = TRef}}. diff --git a/src/emqttd_sysmon.erl b/src/emqttd_sysmon.erl index a86cad01e..1031ee478 100644 --- a/src/emqttd_sysmon.erl +++ b/src/emqttd_sysmon.erl @@ -36,7 +36,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {tref, events = []}). +-record(state, {tick_tref, events = []}). %%------------------------------------------------------------------------------ %% @doc Start system monitor @@ -53,8 +53,8 @@ start_link(Opts) -> init([Opts]) -> erlang:system_monitor(self(), parse_opt(Opts)), - {ok, TRef} = timer:send_interval(1000, reset), - {ok, #state{tref = TRef}}. + {ok, TRef} = timer:send_interval(timer:seconds(1), reset), + {ok, #state{tick_tref = TRef}}. parse_opt(Opts) -> parse_opt(Opts, []). @@ -134,8 +134,8 @@ handle_info(Info, State) -> lager:error("Unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{tref = TRef}) -> - timer:cancel(TRef), ok. +terminate(_Reason, #state{tick_tref = TRef}) -> + timer:cancel(TRef). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/emqttd_topic.erl b/src/emqttd_topic.erl index 7ea5c3ac7..8e2b22973 100644 --- a/src/emqttd_topic.erl +++ b/src/emqttd_topic.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% MQTT Topic +%%% MQTT Topic Functions %%% %%% @end %%%----------------------------------------------------------------------------- diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index fb83a4539..3d06e8432 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -101,18 +101,18 @@ ws_loop(Data, State = #wsocket_state{request = Req, Peer = Req:get(peer), lager:debug("RECV from ~s(WebSocket): ~p", [Peer, Data]), case Parser(iolist_to_binary(Data)) of - {more, NewParser} -> - State#wsocket_state{parser = NewParser}; - {ok, Packet, Rest} -> - gen_server:cast(ClientPid, {received, Packet}), - ws_loop(Rest, reset_parser(State), ReplyChannel); - {error, Error} -> - lager:error("MQTT(WebSocket) detected framing error ~p for connection ~s", [Error, Peer]), - exit({shutdown, Error}) + {more, NewParser} -> + State#wsocket_state{parser = NewParser}; + {ok, Packet, Rest} -> + gen_server:cast(ClientPid, {received, Packet}), + ws_loop(Rest, reset_parser(State), ReplyChannel); + {error, Error} -> + lager:error("MQTT(WebSocket) frame error ~p for connection ~s", [Error, Peer]), + exit({shutdown, Error}) end. reset_parser(State = #wsocket_state{packet_opts = PktOpts}) -> - State#wsocket_state{parser = emqttd_parser:new (PktOpts)}. + State#wsocket_state{parser = emqttd_parser:new(PktOpts)}. %%%============================================================================= %%% gen_fsm callbacks @@ -124,7 +124,8 @@ init([WsPid, Req, ReplyChannel, PktOpts]) -> SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end, Headers = mochiweb_request:get(headers, Req), HeadersList = mochiweb_headers:to_list(Headers), - ProtoState = emqttd_protocol:init(Peername, SendFun, [{ws_initial_headers, HeadersList}|PktOpts]), + ProtoState = emqttd_protocol:init(Peername, SendFun, + [{ws_initial_headers, HeadersList}|PktOpts]), {ok, #client_state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}. handle_call(session, _From, State = #client_state{proto_state = ProtoState}) -> @@ -149,15 +150,15 @@ handle_cast({unsubscribe, Topics}, State) -> handle_cast({received, Packet}, State = #client_state{proto_state = ProtoState}) -> case emqttd_protocol:received(Packet, ProtoState) of - {ok, ProtoState1} -> - noreply(State#client_state{proto_state = ProtoState1}); - {error, Error} -> - lager:error("MQTT protocol error ~p", [Error]), - stop({shutdown, Error}, State); - {error, Error, ProtoState1} -> - stop({shutdown, Error}, State#client_state{proto_state = ProtoState1}); - {stop, Reason, ProtoState1} -> - stop(Reason, State#client_state{proto_state = ProtoState1}) + {ok, ProtoState1} -> + noreply(State#client_state{proto_state = ProtoState1}); + {error, Error} -> + lager:error("MQTT protocol error ~p", [Error]), + stop({shutdown, Error}, State); + {error, Error, ProtoState1} -> + stop({shutdown, Error}, State#client_state{proto_state = ProtoState1}); + {stop, Reason, ProtoState1} -> + stop(Reason, State#client_state{proto_state = ProtoState1}) end; handle_cast(_Msg, State) -> @@ -189,18 +190,18 @@ handle_info({keepalive, start, TimeoutSec}, State = #client_state{request = Req} handle_info({keepalive, check}, State = #client_state{request = Req, keepalive = KeepAlive}) -> case emqttd_keepalive:check(KeepAlive) of - {ok, KeepAlive1} -> - lager:debug("Client(WebSocket) ~s: Keepalive Resumed", [Req:get(peer)]), - noreply(State#client_state{keepalive = KeepAlive1}); - {error, timeout} -> - lager:debug("Client(WebSocket) ~s: Keepalive Timeout!", [Req:get(peer)]), - stop({shutdown, keepalive_timeout}, State#client_state{keepalive = undefined}); - {error, Error} -> - lager:debug("Client(WebSocket) ~s: Keepalive Error: ~p", [Req:get(peer), Error]), - stop({shutdown, keepalive_error}, State#client_state{keepalive = undefined}) + {ok, KeepAlive1} -> + noreply(State#client_state{keepalive = KeepAlive1}); + {error, timeout} -> + lager:debug("Client(WebSocket) ~s: Keepalive Timeout!", [Req:get(peer)]), + stop({shutdown, keepalive_timeout}, State#client_state{keepalive = undefined}); + {error, Error} -> + lager:debug("Client(WebSocket) ~s: Keepalive Error: ~p", [Req:get(peer), Error]), + stop({shutdown, keepalive_error}, State#client_state{keepalive = undefined}) end; -handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid, proto_state = ProtoState}) -> +handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid, + proto_state = ProtoState}) -> ClientId = emqttd_protocol:clientid(ProtoState), lager:warning("Websocket client ~s exit: reason=~p", [ClientId, Reason]), stop({shutdown, websocket_closed}, State);