diff --git a/apps/emqttd/src/emqttd.erl b/apps/emqttd/src/emqttd.erl index 435408f47..f9d70fc04 100644 --- a/apps/emqttd/src/emqttd.erl +++ b/apps/emqttd/src/emqttd.erl @@ -56,19 +56,22 @@ open(Listeners) when is_list(Listeners) -> %% open mqtt port open({mqtt, Port, Options}) -> - MFArgs = {emqttd_client, start_link, []}, - esockd:open(mqtt, Port, emqttd_opts:merge(?MQTT_SOCKOPTS, Options) , MFArgs); + open(mqtt, Port, Options); %% open mqtt(SSL) port open({mqtts, Port, Options}) -> - MFArgs = {emqttd_client, start_link, []}, - esockd:open(mqtts, Port, emqttd_opts:merge(?MQTT_SOCKOPTS, Options) , MFArgs); + open(mqtts, Port, Options); %% open http port open({http, Port, Options}) -> MFArgs = {emqttd_http, handle, []}, mochiweb:start_http(Port, Options, MFArgs). +open(Protocol, Port, Options) -> + {ok, PktOpts} = application:get_env(emqttd, packet), + MFArgs = {emqttd_client, start_link, [PktOpts]}, + esockd:open(Protocol, Port, emqttd_opts:merge(?MQTT_SOCKOPTS, Options) , MFArgs). + is_running(Node) -> case rpc:call(Node, erlang, whereis, [emqttd]) of {badrpc, _} -> false; diff --git a/apps/emqttd/src/emqttd_client.erl b/apps/emqttd/src/emqttd_client.erl index cda22a495..449ae697f 100644 --- a/apps/emqttd/src/emqttd_client.erl +++ b/apps/emqttd/src/emqttd_client.erl @@ -30,7 +30,7 @@ -behaviour(gen_server). --export([start_link/1, info/1]). +-export([start_link/2, info/1]). -export([init/1, handle_call/3, @@ -53,30 +53,34 @@ conserve, parse_state, proto_state, + packet_opts, keepalive}). -start_link(SockArgs) -> - {ok, proc_lib:spawn_link(?MODULE, init, [SockArgs])}. +start_link(SockArgs, PktOpts) -> + {ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, PktOpts]])}. %%TODO: rename? info(Pid) -> gen_server:call(Pid, info). -init(SockArgs = {Transport, Sock, _SockFun}) -> +init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) -> %transform if ssl. {ok, NewSock} = esockd_connection:accept(SockArgs), {ok, Peername} = emqttd_net:peer_string(Sock), {ok, ConnStr} = emqttd_net:connection_string(Sock, inbound), lager:info("Connect from ~s", [ConnStr]), + ParserState = emqttd_parser:init(PacketOpts), + ProtoState = emqttd_protocol:init({Transport, NewSock, Peername}, PacketOpts), State = control_throttle(#state{transport = Transport, - socket = NewSock, + socket = NewSock, peer_name = Peername, - conn_name = ConnStr, - await_recv = false, - conn_state = running, - conserve = false, - parse_state = emqttd_parser:init(), - proto_state = emqttd_protocol:init(Transport, NewSock, Peername)}), + conn_name = ConnStr, + await_recv = false, + conn_state = running, + conserve = false, + packet_opts = PacketOpts, + parse_state = ParserState, + proto_state = ProtoState}), gen_server:enter_loop(?MODULE, [], State, 10000). %%TODO: Not enough... @@ -168,7 +172,8 @@ code_change(_OldVsn, State, _Extra) -> process_received_bytes(<<>>, State) -> {noreply, State, hibernate}; -process_received_bytes(Bytes, State = #state{parse_state = ParseState, +process_received_bytes(Bytes, State = #state{packet_opts = PacketOpts, + parse_state = ParseState, proto_state = ProtoState, conn_name = ConnStr}) -> case emqttd_parser:parse(Bytes, ParseState) of @@ -180,7 +185,7 @@ process_received_bytes(Bytes, State = #state{parse_state = ParseState, received_stats(Packet), case emqttd_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> - process_received_bytes(Rest, State#state{parse_state = emqttd_parser:init(), + process_received_bytes(Rest, State#state{parse_state = emqttd_parser:init(PacketOpts), proto_state = ProtoState1}); {error, Error} -> lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]), diff --git a/apps/emqttd/src/emqttd_parser.erl b/apps/emqttd/src/emqttd_parser.erl index ef1d970bf..1390843e4 100644 --- a/apps/emqttd/src/emqttd_parser.erl +++ b/apps/emqttd/src/emqttd_parser.erl @@ -31,7 +31,11 @@ -include("emqttd_packet.hrl"). %% API --export([init/0, parse/2]). +-export([init/1, parse/2]). + +-record(mqtt_packet_limit, {max_packet_size}). + +-type option() :: {atom(), any()}. %%%----------------------------------------------------------------------------- %% @doc @@ -39,8 +43,11 @@ %% %% @end %%%----------------------------------------------------------------------------- --spec init() -> none. -init() -> none. +-spec init(Opts :: [option()]) -> {none, #mqtt_packet_limit{}}. +init(Opts) -> {none, limit(Opts)}. + +limit(Opts) -> + #mqtt_packet_limit{max_packet_size = proplists:get_value(max_packet_size, Opts, ?MAX_LEN)}. %%%----------------------------------------------------------------------------- %% @doc @@ -48,33 +55,36 @@ init() -> none. %% %% @end %%%----------------------------------------------------------------------------- --spec parse(binary(), none | fun()) -> {ok, mqtt_packet()} | {error, any()} | {more, fun()}. -parse(<<>>, none) -> - {more, fun(Bin) -> parse(Bin, none) end}; -parse(<>, none) -> +-spec parse(binary(), {none, [option()]} | fun()) -> {ok, mqtt_packet()} | {error, any()} | {more, fun()}. +parse(<<>>, {none, Limit}) -> + {more, fun(Bin) -> parse(Bin, {none, Limit}) end}; +parse(<>, {none, Limit}) -> parse_remaining_len(Rest, #mqtt_packet_header{type = PacketType, dup = bool(Dup), qos = QoS, - retain = bool(Retain)}); + retain = bool(Retain)}, Limit); parse(Bin, Cont) -> Cont(Bin). -parse_remaining_len(<<>>, Header) -> - {more, fun(Bin) -> parse_remaining_len(Bin, Header) end}; -parse_remaining_len(Rest, Header) -> - parse_remaining_len(Rest, Header, 1, 0). +parse_remaining_len(<<>>, Header, Limit) -> + {more, fun(Bin) -> parse_remaining_len(Bin, Header, Limit) end}; +parse_remaining_len(Rest, Header, Limit) -> + parse_remaining_len(Rest, Header, 1, 0, Limit). -parse_remaining_len(_Bin, _Header, _Multiplier, Length) - when Length > ?MAX_LEN -> +parse_remaining_len(_Bin, _Header, _Multiplier, Length, #mqtt_packet_limit{max_packet_size = MaxLen}) + when Length > MaxLen -> {error, invalid_mqtt_frame_len}; -parse_remaining_len(<<>>, Header, Multiplier, Length) -> - {more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length) end}; -parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value) -> - parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier); -parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value) -> - parse_frame(Rest, Header, Value + Len * Multiplier). +parse_remaining_len(<<>>, Header, Multiplier, Length, Limit) -> + {more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Limit) end}; +parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Limit) -> + parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Limit); +parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, #mqtt_packet_limit{max_packet_size = MaxLen}) -> + FrameLen = Value + Len * Multiplier, + if + FrameLen > MaxLen -> {error, invalid_mqtt_frame_len}; + true -> parse_frame(Rest, Header, FrameLen) + end. -parse_frame(Bin, #mqtt_packet_header{type = Type, - qos = Qos} = Header, Length) -> +parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) -> case {Type, Bin} of {?CONNECT, <>} -> {ProtoName, Rest1} = parse_utf(FrameBin), diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index 71684de8f..e78637ab3 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -31,7 +31,7 @@ -include("emqttd_packet.hrl"). %% API --export([init/3, client_id/1]). +-export([init/2, client_id/1]). -export([received/2, send/2, redeliver/2, shutdown/2]). @@ -43,32 +43,34 @@ socket, peer_name, connected = false, %received CONNECT action? - proto_vsn, + proto_ver, proto_name, %packet_id, client_id, clean_sess, session, %% session state or session pid - will_msg + will_msg, + max_clientid_len = ?MAX_CLIENTID_LEN }). -type proto_state() :: #proto_state{}. -init(Transport, Socket, Peername) -> +init({Transport, Socket, Peername}, Opts) -> #proto_state{ - transport = Transport, - socket = Socket, - peer_name = Peername}. + transport = Transport, + socket = Socket, + peer_name = Peername, + max_clientid_len = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN)}. client_id(#proto_state{client_id = ClientId}) -> ClientId. %%SHOULD be registered in emqttd_cm -info(#proto_state{proto_vsn = ProtoVsn, +info(#proto_state{proto_ver = ProtoVer, proto_name = ProtoName, client_id = ClientId, clean_sess = CleanSess, will_msg = WillMsg}) -> - [{proto_vsn, ProtoVsn}, + [{proto_ver, ProtoVer}, {proto_name, ProtoName}, {client_id, ClientId}, {clean_sess, CleanSess}, @@ -100,7 +102,8 @@ received(Packet = ?PACKET(_Type), State = #proto_state{peer_name = PeerName, handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peer_name = PeerName}) -> - #mqtt_packet_connect{username = Username, + #mqtt_packet_connect{proto_ver = ProtoVer, + username = Username, password = Password, clean_sess = CleanSess, keep_alive = KeepAlive, @@ -109,22 +112,24 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peer_name = PeerName} lager:info("RECV from ~s@~s: ~s", [ClientId, PeerName, emqttd_packet:dump(Packet)]), {ReturnCode1, State1} = - case validate_connect(Var) of + case validate_connect(Var, State) of ?CONNACK_ACCEPT -> case emqttd_auth:check(Username, Password) of true -> ClientId1 = clientid(ClientId, State), start_keepalive(KeepAlive), emqttd_cm:register(ClientId1, self()), - {?CONNACK_ACCEPT, State#proto_state{will_msg = willmsg(Var), + {?CONNACK_ACCEPT, State#proto_state{proto_ver = ProtoVer, + client_id = ClientId1, clean_sess = CleanSess, - client_id = ClientId1}}; + will_msg = willmsg(Var)}}; false -> lager:error("~s@~s: username '~s' login failed - no credentials", [ClientId, PeerName, Username]), {?CONNACK_CREDENTIALS, State#proto_state{client_id = ClientId}} end; ReturnCode -> - {ReturnCode, State#proto_state{client_id = ClientId}} + {ReturnCode, State#proto_state{client_id = ClientId, + clean_sess = CleanSess}} end, notify(connected, ReturnCode1, State1), send(?CONNACK_PACKET(ReturnCode1), State1), @@ -234,10 +239,10 @@ start_keepalive(Sec) when Sec > 0 -> %%---------------------------------------------------------------------------- %% Validate Packets %%---------------------------------------------------------------------------- -validate_connect(Connect = #mqtt_packet_connect{}) -> +validate_connect(Connect = #mqtt_packet_connect{}, ProtoState) -> case validate_protocol(Connect) of true -> - case validate_clientid(Connect) of + case validate_clientid(Connect, ProtoState) of true -> ?CONNACK_ACCEPT; false -> @@ -250,16 +255,16 @@ validate_connect(Connect = #mqtt_packet_connect{}) -> validate_protocol(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}) -> lists:member({Ver, Name}, ?PROTOCOL_NAMES). -validate_clientid(#mqtt_packet_connect{client_id = ClientId}) - when ( size(ClientId) >= 1 ) andalso ( size(ClientId) =< ?MAX_CLIENTID_LEN ) -> +validate_clientid(#mqtt_packet_connect{client_id = ClientId}, #proto_state{max_clientid_len = MaxLen}) + when ( size(ClientId) >= 1 ) andalso ( size(ClientId) =< MaxLen ) -> true; %% MQTT3.1.1 allow null clientId. -validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311, client_id = ClientId}) +validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311, client_id = ClientId}, _ProtoState) when size(ClientId) =:= 0 -> true; -validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess, client_id = ClientId}) -> +validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess, client_id = ClientId}, _ProtoState) -> lager:warning("Invalid ClientId: ~s, ProtoVer: ~p, CleanSess: ~s", [ClientId, Ver, CleanSess]), false. @@ -320,7 +325,7 @@ inc(_) -> ingore. notify(connected, ReturnCode, #proto_state{peer_name = PeerName, - proto_vsn = ProtoVsn, + proto_ver = ProtoVer, client_id = ClientId, clean_sess = CleanSess}) -> Sess = case CleanSess of @@ -328,7 +333,7 @@ notify(connected, ReturnCode, #proto_state{peer_name = PeerName, false -> true end, Params = [{from, PeerName}, - {protocol, ProtoVsn}, + {protocol, ProtoVer}, {session, Sess}, {connack, ReturnCode}], emqttd_event:notify({connected, ClientId, Params}). diff --git a/rel/files/app.config b/rel/files/app.config index 44c6ff882..47b9097a5 100644 --- a/rel/files/app.config +++ b/rel/files/app.config @@ -43,7 +43,7 @@ {access, []}, {packet, [ {max_clientid_len, 1024}, - {max_packet_size, 64k}, + {max_packet_size, 16#ffff} ]}, {session, [ {expires, 1},