diff --git a/CHANGELOG.md b/CHANGELOG.md index aca2274ca..3a0a60206 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,22 @@ eMQTT ChangeLog ================== +v0.3.1-beta (2015-03-02) +------------------------ + +Feature: SSL Socket Support + +Feature: issue#44 HTTP API should add Qos parameter + +Bugfix: issue#52 emqtt_session crash + +Bugfix: issue#53 sslsocket keepalive error + +Upgrade: esockd to v0.2.0 + +Upgrade: mochiweb to v3.0.0 + + v0.3.0-beta (2015-01-19) ------------------------ diff --git a/Makefile b/Makefile index f70e77bab..383a1d53e 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,4 @@ +.PHONY: test REBAR=./rebar @@ -19,5 +20,11 @@ clean: @$(REBAR) clean rm -rf rel/emqtt +test: + @$(REBAR) skip_deps=true eunit + +edoc: + @$(REBAR) doc + dist: cd rel && ../rebar generate -f diff --git a/README.md b/README.md index 562e6ec90..44bd15db0 100644 --- a/README.md +++ b/README.md @@ -49,11 +49,18 @@ cd $INSTALL_DIR/emqtt {auth, {anonymous, []}}, %internal, anonymous {listen, [ {mqtt, 1883, [ - {max_conns, 1024}, + {max_clients, 1024}, {acceptor_pool, 4} ]}, + {mqtts, 8883, [ + {max_clients, 1024}, + {acceptor_pool, 4}, + %{cacertfile, "etc/ssl/cacert.pem"}, + {ssl, [{certfile, "etc/ssl.crt"}, + {keyfile, "etc/ssl.key"}]} + ]}, {http, 8083, [ - {max_conns, 512}, + {max_clients, 512}, {acceptor_pool, 1} ]} ]} diff --git a/TODO b/TODO index fa535fa3e..65559fcfa 100644 --- a/TODO +++ b/TODO @@ -1,3 +1,6 @@ +0.3.2 +===== +merge emqttc code... 0.2.2 ===== diff --git a/apps/emqtt/src/emqtt.erl b/apps/emqtt/src/emqtt.erl index 06d4b7d96..0ed219bfc 100644 --- a/apps/emqtt/src/emqtt.erl +++ b/apps/emqtt/src/emqtt.erl @@ -37,7 +37,11 @@ listen(Listeners) when is_list(Listeners) -> listen({mqtt, Port, Options}) -> MFArgs = {emqtt_client, start_link, []}, - esockd:listen(mqtt, Port, Options ++ ?MQTT_SOCKOPTS, MFArgs); + esockd:open(mqtt, Port, Options ++ ?MQTT_SOCKOPTS, MFArgs); + +listen({mqtts, Port, Options}) -> + MFArgs = {emqtt_client, start_link, []}, + esockd:open(mqtts, Port, Options ++ ?MQTT_SOCKOPTS, MFArgs); listen({http, Port, Options}) -> MFArgs = {emqtt_http, handle, []}, diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index 751f29d17..734203fa8 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -26,7 +26,7 @@ -behaviour(gen_server). --export([start_link/1, info/1, go/2]). +-export([start_link/1, info/1]). -export([init/1, handle_call/3, @@ -40,8 +40,9 @@ -include("emqtt_packet.hrl"). %%Client State... --record(state, { - socket, +-record(state, { + transport, + socket, peer_name, conn_name, await_recv, @@ -52,32 +53,28 @@ keepalive }). -start_link(Sock) -> - gen_server:start_link(?MODULE, [Sock], []). +start_link(SockArgs) -> + {ok, proc_lib:spawn_link(?MODULE, init, [SockArgs])}. info(Pid) -> gen_server:call(Pid, info). -go(Pid, Sock) -> - gen_server:call(Pid, {go, Sock}). - -init([Sock]) -> - {ok, #state{socket = Sock}, 1000}. - -handle_call({go, Sock}, _From, #state{socket = Sock}) -> +init(SockArgs = {Transport, Sock, _SockFun}) -> + %%TODO: replace emqtt_net?? + {ok, NewSock} = esockd_connection:accept(SockArgs), {ok, Peername} = emqtt_net:peer_string(Sock), {ok, ConnStr} = emqtt_net:connection_string(Sock, inbound), lager:info("Connect from ~s", [ConnStr]), - {reply, ok, - control_throttle( - #state{ socket = Sock, - peer_name = Peername, - conn_name = ConnStr, - await_recv = false, - conn_state = running, - conserve = false, - parse_state = emqtt_packet:initial_state(), - proto_state = emqtt_protocol:initial_state(Sock, Peername)}), 10000}; + State = control_throttle(#state{transport = Transport, + socket = NewSock, + peer_name = Peername, + conn_name = ConnStr, + await_recv = false, + conn_state = running, + conserve = false, + parse_state = emqtt_packet:initial_state(), + proto_state = emqtt_protocol:initial_state(Transport, NewSock, Peername)}), + gen_server:enter_loop(?MODULE, [], State, 10000). handle_call(info, _From, State = #state{ conn_name=ConnName, proto_state = ProtoState}) -> @@ -124,9 +121,9 @@ handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peer_name = Pee lager:critical("Client ~s: unexpected inet_reply '~p'", [PeerName, Reason]), {noreply, State}; -handle_info({keepalive, start, TimeoutSec}, State = #state{socket = Socket}) -> +handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket}) -> lager:info("Client ~s: Start KeepAlive with ~p seconds", [State#state.peer_name, TimeoutSec]), - KeepAlive = emqtt_keepalive:new(Socket, TimeoutSec, {keepalive, timeout}), + KeepAlive = emqtt_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}), {noreply, State#state{ keepalive = KeepAlive }}; handle_info({keepalive, timeout}, State = #state { keepalive = KeepAlive }) -> @@ -157,12 +154,6 @@ terminate(Reason, #state{ peer_name = PeerName, keepalive = KeepAlive, proto_sta code_change(_OldVsn, State, _Extra) -> {ok, State}. -async_recv(Sock, Length, infinity) when is_port(Sock) -> - prim_inet:async_recv(Sock, Length, -1); - -async_recv(Sock, Length, Timeout) when is_port(Sock) -> - prim_inet:async_recv(Sock, Length, Timeout). - %------------------------------------------------------- % receive and parse tcp data %------------------------------------------------------- @@ -203,12 +194,12 @@ network_error(Reason, State = #state{ peer_name = PeerName }) -> lager:error("Client ~s: MQTT detected network error '~p'", [PeerName, Reason]), stop({shutdown, conn_closed}, State). -run_socket(State = #state{ conn_state = blocked }) -> +run_socket(State = #state{conn_state = blocked}) -> State; -run_socket(State = #state{ await_recv = true }) -> +run_socket(State = #state{await_recv = true}) -> State; -run_socket(State = #state{ socket = Sock }) -> - async_recv(Sock, 0, infinity), +run_socket(State = #state{transport = Transport, socket = Sock}) -> + Transport:async_recv(Sock, 0, infinity), State#state{ await_recv = true }. control_throttle(State = #state{ conn_state = Flow, @@ -223,4 +214,3 @@ control_throttle(State = #state{ conn_state = Flow, stop(Reason, State ) -> {stop, Reason, State}. - diff --git a/apps/emqtt/src/emqtt_keepalive.erl b/apps/emqtt/src/emqtt_keepalive.erl index 7dd7be0f8..fa72722c2 100644 --- a/apps/emqtt/src/emqtt_keepalive.erl +++ b/apps/emqtt/src/emqtt_keepalive.erl @@ -26,29 +26,31 @@ -export([new/3, resume/1, cancel/1]). --record(keepalive, {socket, recv_oct, timeout_sec, timeout_msg, timer_ref}). +-record(keepalive, {transport, socket, recv_oct, timeout_sec, timeout_msg, timer_ref}). %% %% @doc create a keepalive. %% -new(Socket, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 -> - {ok, [{recv_oct, RecvOct}]} = inet:getstat(Socket, [recv_oct]), +new({Transport, Socket}, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 -> + {ok, [{recv_oct, RecvOct}]} = Transport:getstat(Socket, [recv_oct]), Ref = erlang:send_after(TimeoutSec*1000, self(), TimeoutMsg), - #keepalive { socket = Socket, - recv_oct = RecvOct, - timeout_sec = TimeoutSec, - timeout_msg = TimeoutMsg, - timer_ref = Ref }. + #keepalive {transport = Transport, + socket = Socket, + recv_oct = RecvOct, + timeout_sec = TimeoutSec, + timeout_msg = TimeoutMsg, + timer_ref = Ref }. %% %% @doc try to resume keepalive, called when timeout. %% -resume(KeepAlive = #keepalive { socket = Socket, - recv_oct = RecvOct, - timeout_sec = TimeoutSec, - timeout_msg = TimeoutMsg, - timer_ref = Ref }) -> - {ok, [{recv_oct, NewRecvOct}]} = inet:getstat(Socket, [recv_oct]), +resume(KeepAlive = #keepalive {transport = Transport, + socket = Socket, + recv_oct = RecvOct, + timeout_sec = TimeoutSec, + timeout_msg = TimeoutMsg, + timer_ref = Ref }) -> + {ok, [{recv_oct, NewRecvOct}]} = Transport:getstat(Socket, [recv_oct]), if NewRecvOct =:= RecvOct -> timeout; diff --git a/apps/emqtt/src/emqtt_protocol.erl b/apps/emqtt/src/emqtt_protocol.erl index 5da781f33..230e75745 100644 --- a/apps/emqtt/src/emqtt_protocol.erl +++ b/apps/emqtt/src/emqtt_protocol.erl @@ -30,7 +30,7 @@ %% API Function Exports %% ------------------------------------------------------------------ --export([initial_state/2, client_id/1]). +-export([initial_state/3, client_id/1]). -export([handle_packet/2, send_message/2, send_packet/2, redeliver/2, shutdown/2]). @@ -41,6 +41,7 @@ %% Protocol State %% ------------------------------------------------------------------ -record(proto_state, { + transport, socket, peer_name, connected = false, %received CONNECT action? @@ -72,8 +73,9 @@ -define(PUBACK_PACKET(PacketId), #mqtt_packet_puback { packet_id = PacketId }). -initial_state(Socket, Peername) -> +initial_state(Transport, Socket, Peername) -> #proto_state{ + transport = Transport, socket = Socket, peer_name = Peername }. @@ -245,12 +247,11 @@ send_message({_From, Message = #mqtt_message{ qos = Qos }}, State = #proto_state {Message1, NewSession} = emqtt_session:store(Session, Message), send_packet(emqtt_message:to_packet(Message1), State#proto_state{session = NewSession}). -send_packet(Packet, State = #proto_state{socket = Sock, peer_name = PeerName, client_id = ClientId}) -> +send_packet(Packet, State = #proto_state{transport = Transport, socket = Sock, peer_name = PeerName, client_id = ClientId}) -> lager:info("SENT to ~s@~s: ~s", [ClientId, PeerName, emqtt_packet:dump(Packet)]), Data = emqtt_packet:serialise(Packet), lager:debug("SENT to ~s: ~p", [PeerName, Data]), - %%FIXME Later... - erlang:port_command(Sock, Data), + Transport:send(Sock, Data), {ok, State}. %% diff --git a/doc/topic.md b/doc/topic.md new file mode 100644 index 000000000..c087cdcfa --- /dev/null +++ b/doc/topic.md @@ -0,0 +1,17 @@ +Topic Types: + +static: + + /brokers/alerts/ + /brokers/clients/connected + /brokers/clients/disconnected + +dynamic: + + created when subscribe... + +bridge: + + cretated when bridge... + + diff --git a/rebar b/rebar index 13f76298b..c2b7e2022 100755 Binary files a/rebar and b/rebar differ diff --git a/rebar.config b/rebar.config index 50069b6c4..4c6146618 100644 --- a/rebar.config +++ b/rebar.config @@ -1,16 +1,26 @@ +%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*- +%% ex: ts=4 sw=4 ft=erlang et + {require_min_otp_vsn, "R17"}. {erl_opts, [debug_info, {parse_transform, lager_transform}]}. -{erl_opts, [{i, "include"}, +{erl_opts, [warn_export_all, + warn_unused_import, + {i, "include"}, {src_dirs, ["src"]}]}. +{xref_checks, [undefined_function_calls]}. +{cover_enabled, false}. + +{validate_app_modules, true}. + {sub_dirs, [ "rel", "apps/emqtt"]}. {deps, [ {lager, ".*", {git, "git://github.com/basho/lager.git", {branch, "master"}}}, - {esockd, ".*", {git, "git://github.com/slimpp/esockd.git", {branch, "master"}}}, + {esockd, ".*", {git, "git://github.com/emqtt/esockd.git", {branch, "master"}}}, {mochiweb, ".*", {git, "git://github.com/slimpp/mochiweb.git", {branch, "master"}}} ]}. diff --git a/rel/files/app.config b/rel/files/app.config index f1af6ad77..15fc6dc02 100644 --- a/rel/files/app.config +++ b/rel/files/app.config @@ -1,7 +1,7 @@ %% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*- %% ex: ft=erlang ts=4 sw=4 et [{kernel, - [{start_timer, true}, + [{start_timer, true}, {start_pg2, true} ]}, {sasl, [ @@ -10,6 +10,9 @@ {mnesia, [ {dir, "data"} ]}, + {ssl, [ + %{versions, ['tlsv1.2', 'tlsv1.1']} + ]}, {lager, [ {error_logger_redirect, false}, {crash_log, "log/emqtt_crash.log"}, @@ -31,6 +34,9 @@ ]} ]} ]}, + {esockd, [ + {logger, {lager, info}} + ]}, {emqtt, [ %Authetication. Internal, Anonymous Default. {auth, {anonymous, []}}, @@ -45,11 +51,18 @@ ]}, {listen, [ {mqtt, 1883, [ - {max_conns, 1024}, + {max_clients, 1024}, {acceptor_pool, 4} ]}, + {mqtts, 8883, [ + {max_clients, 1024}, + {acceptor_pool, 4}, + %{cacertfile, "etc/ssl/cacert.pem"}, + {ssl, [{certfile, "etc/ssl.crt"}, + {keyfile, "etc/ssl.key"}]} + ]}, {http, 8083, [ - {max_conns, 512}, + {max_clients, 512}, {acceptor_pool, 1} ]} ]} diff --git a/rel/files/ssl/ssl.crt b/rel/files/ssl/ssl.crt new file mode 100644 index 000000000..001844674 --- /dev/null +++ b/rel/files/ssl/ssl.crt @@ -0,0 +1,17 @@ +-----BEGIN CERTIFICATE----- +MIICuTCCAiICCQC8+3PPaqATfDANBgkqhkiG9w0BAQUFADCBoDELMAkGA1UEBhMC +Q0gxETAPBgNVBAgTCFpoZUppYW5nMREwDwYDVQQHEwhIYW5nWmhvdTEUMBIGA1UE +ChMLWGlhb0xpIFRlY2gxHzAdBgNVBAsTFkluZm9ybWF0aW9uIFRlY2hub2xvZ3kx +EzARBgNVBAMTCnQuZW1xdHQuaW8xHzAdBgkqhkiG9w0BCQEWEGZlbmcgYXQgZW1x +dHQuaW8wHhcNMTUwMjI1MTc0NjQwWhcNMTYwMjI1MTc0NjQwWjCBoDELMAkGA1UE +BhMCQ0gxETAPBgNVBAgTCFpoZUppYW5nMREwDwYDVQQHEwhIYW5nWmhvdTEUMBIG +A1UEChMLWGlhb0xpIFRlY2gxHzAdBgNVBAsTFkluZm9ybWF0aW9uIFRlY2hub2xv +Z3kxEzARBgNVBAMTCnQuZW1xdHQuaW8xHzAdBgkqhkiG9w0BCQEWEGZlbmcgYXQg +ZW1xdHQuaW8wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBALAtN2OHsvltOk+9 +AtlwMtKuaWW2WpV/S0lRRG9x9k8pyd5PJeeYAr2jVsoWnZInb1CoEOHFcwxZLjv3 +gEvz+X+//W02YyI9hnvCJUpT/+6P0gJEbmTmqL078M6vbtwtiF1YC7mdo0nGAZuK +qedpIoEZbVJavf4S0vXWTsb3s5unAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAgUR3 +z4uDUsAl+xUorPMBIOS/ncHHVk1XucVv9Wi4chzzZ+4/Y77/fFqP6oxhQ59C9Q8i +iT5wjaE4R1eCge18lPSw3yb1tsTe5B3WkRTzziPq/Q/AsC+DifkkE1YW67leuJV/ +vz74sEi0dudmOVoe6peYxjEH8xXoIUqhnwXt/4Q= +-----END CERTIFICATE----- diff --git a/rel/files/ssl/ssl.key b/rel/files/ssl/ssl.key new file mode 100644 index 000000000..5d5786fac --- /dev/null +++ b/rel/files/ssl/ssl.key @@ -0,0 +1,15 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICXAIBAAKBgQCwLTdjh7L5bTpPvQLZcDLSrmlltlqVf0tJUURvcfZPKcneTyXn +mAK9o1bKFp2SJ29QqBDhxXMMWS4794BL8/l/v/1tNmMiPYZ7wiVKU//uj9ICRG5k +5qi9O/DOr27cLYhdWAu5naNJxgGbiqnnaSKBGW1SWr3+EtL11k7G97ObpwIDAQAB +AoGBAKU1cbiLG0GdtU3rME3ZUj+RQNMZ4u5IVcBmTie4FcN8q4ombKQ2P3O4RX3z +IUZaZp+bS2F8uHt+8cVYPl57Zp5fwbIlv6jWgGpvXLsX8JBQl2OTw38B+hVwJvAM +h0mBzprUOs3KGZyF5cyA4osrZ4QvCZhwId9fAjwLGBF9i1yBAkEA4jWAF1sWQiwF +vY476m+0ihpRwGKjldKHWFZmvoB/AnNV/rXO+HRl3MB5wmO+Dqg3gJZrjGBgDeaV +g9hoQjK6ZwJBAMdg57iKLd8uUb7c4pR8fDdDbeeI5X7WDf2k9emT3BMPJPQ3EiSf +CStn1hRfp31U9CXEnw94rKHhrdMFrYjdzMECQCcWD3f5qTLt4GAMf5XWj199hLq1 +UIbGxdQhuccY9Nk7jJRiXczYb/Fg4KkSCvkFX/G8DAFJdc9xFEyfzAQEN+kCQH3a +nMrvZn9gBLffRKOIZPyZctHZp0xGIHTA4X39GMlrIN+Lt8coIKimlgssSlSiAK+q +iuFAQnC5PXlcNyuTHsECQAMNMY6jXikgSUZfVXitAFX3g9+IbjT9eJ92f60QneW8 +mxWQoqP3fqCSbTEysb7NojEEwppSZtaNgnBb5R4E+mU= +-----END RSA PRIVATE KEY----- diff --git a/rel/reltool.config b/rel/reltool.config index 2103d021c..5e74c70e0 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -65,6 +65,8 @@ {template, "files/emqtt.cmd", "bin/emqtt.cmd"}, {copy, "files/start_erl.cmd", "bin/start_erl.cmd"}, {copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"}, + {copy, "files/ssl/ssl.crt", "etc/ssl.crt"}, + {copy, "files/ssl/ssl.key", "etc/ssl.key"}, {template, "files/app.config", "etc/app.config"}, {template, "files/vm.args", "etc/vm.args"} ]}.