Merge branch 'dev'

This commit is contained in:
Ery Lee 2015-03-02 02:02:42 +08:00
commit e64f2c02fe
13 changed files with 131 additions and 45 deletions

View File

@ -2,6 +2,16 @@
eMQTT ChangeLog eMQTT ChangeLog
================== ==================
v0.3.1-beta (2015-03-01)
------------------------
Feature: SSL Socket Support
Change: upgrade esockd to v0.2.0
Change: upgrade mochiweb to v3.0.0
v0.3.0-beta (2015-01-19) v0.3.0-beta (2015-01-19)
------------------------ ------------------------

View File

@ -1,14 +1,30 @@
.PHONY: test
REBAR=./rebar
all: get-deps compile all: get-deps compile
compile: get-deps compile: get-deps
./rebar compile @$(REBAR) compile
get-deps: get-deps:
./rebar get-deps @$(REBAR) get-deps
update-deps:
@$(REBAR) update-deps
xref:
@$(REBAR) xref skip_deps=true
clean: clean:
./rebar clean @$(REBAR) clean
rm -rf rel/emqtt rm -rf rel/emqtt
test:
@$(REBAR) skip_deps=true eunit
edoc:
@$(REBAR) doc
dist: dist:
cd rel && ../rebar generate -f cd rel && ../rebar generate -f

3
TODO
View File

@ -1,3 +1,6 @@
0.3.2
=====
merge emqttc code...
0.2.2 0.2.2
===== =====

View File

@ -37,7 +37,11 @@ listen(Listeners) when is_list(Listeners) ->
listen({mqtt, Port, Options}) -> listen({mqtt, Port, Options}) ->
MFArgs = {emqtt_client, start_link, []}, MFArgs = {emqtt_client, start_link, []},
esockd:listen(mqtt, Port, Options ++ ?MQTT_SOCKOPTS, MFArgs); esockd:open(mqtt, Port, Options ++ ?MQTT_SOCKOPTS, MFArgs);
listen({mqtts, Port, Options}) ->
MFArgs = {emqtt_client, start_link, []},
esockd:open(mqtts, Port, Options ++ ?MQTT_SOCKOPTS, MFArgs);
listen({http, Port, Options}) -> listen({http, Port, Options}) ->
MFArgs = {emqtt_http, handle, []}, MFArgs = {emqtt_http, handle, []},

View File

@ -26,7 +26,7 @@
-behaviour(gen_server). -behaviour(gen_server).
-export([start_link/1, info/1, go/2]). -export([start_link/1, info/1]).
-export([init/1, -export([init/1,
handle_call/3, handle_call/3,
@ -41,6 +41,7 @@
%%Client State... %%Client State...
-record(state, { -record(state, {
transport,
socket, socket,
peer_name, peer_name,
conn_name, conn_name,
@ -52,32 +53,29 @@
keepalive keepalive
}). }).
start_link(Sock) -> start_link(SockArgs) ->
gen_server:start_link(?MODULE, [Sock], []). io:format("start_link: ~p~n", [SockArgs]),
{ok, proc_lib:spawn_link(?MODULE, init, [SockArgs])}.
info(Pid) -> info(Pid) ->
gen_server:call(Pid, info). gen_server:call(Pid, info).
go(Pid, Sock) -> init(SockArgs = {Transport, Sock, _SockFun}) ->
gen_server:call(Pid, {go, Sock}). %%TODO: replace emqtt_net??
{ok, NewSock} = esockd_connection:accept(SockArgs),
init([Sock]) ->
{ok, #state{socket = Sock}, 1000}.
handle_call({go, Sock}, _From, #state{socket = Sock}) ->
{ok, Peername} = emqtt_net:peer_string(Sock), {ok, Peername} = emqtt_net:peer_string(Sock),
{ok, ConnStr} = emqtt_net:connection_string(Sock, inbound), {ok, ConnStr} = emqtt_net:connection_string(Sock, inbound),
lager:info("Connect from ~s", [ConnStr]), lager:info("Connect from ~s", [ConnStr]),
{reply, ok, State = control_throttle(#state{transport = Transport,
control_throttle( socket = NewSock,
#state{ socket = Sock,
peer_name = Peername, peer_name = Peername,
conn_name = ConnStr, conn_name = ConnStr,
await_recv = false, await_recv = false,
conn_state = running, conn_state = running,
conserve = false, conserve = false,
parse_state = emqtt_packet:initial_state(), parse_state = emqtt_packet:initial_state(),
proto_state = emqtt_protocol:initial_state(Sock, Peername)}), 10000}; proto_state = emqtt_protocol:initial_state(Transport, NewSock, Peername)}),
gen_server:enter_loop(?MODULE, [], State, 10000).
handle_call(info, _From, State = #state{ handle_call(info, _From, State = #state{
conn_name=ConnName, proto_state = ProtoState}) -> conn_name=ConnName, proto_state = ProtoState}) ->
@ -157,12 +155,6 @@ terminate(Reason, #state{ peer_name = PeerName, keepalive = KeepAlive, proto_sta
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
async_recv(Sock, Length, infinity) when is_port(Sock) ->
prim_inet:async_recv(Sock, Length, -1);
async_recv(Sock, Length, Timeout) when is_port(Sock) ->
prim_inet:async_recv(Sock, Length, Timeout).
%------------------------------------------------------- %-------------------------------------------------------
% receive and parse tcp data % receive and parse tcp data
%------------------------------------------------------- %-------------------------------------------------------
@ -203,12 +195,12 @@ network_error(Reason, State = #state{ peer_name = PeerName }) ->
lager:error("Client ~s: MQTT detected network error '~p'", [PeerName, Reason]), lager:error("Client ~s: MQTT detected network error '~p'", [PeerName, Reason]),
stop({shutdown, conn_closed}, State). stop({shutdown, conn_closed}, State).
run_socket(State = #state{ conn_state = blocked }) -> run_socket(State = #state{conn_state = blocked}) ->
State; State;
run_socket(State = #state{ await_recv = true }) -> run_socket(State = #state{await_recv = true}) ->
State; State;
run_socket(State = #state{ socket = Sock }) -> run_socket(State = #state{transport = Transport, socket = Sock}) ->
async_recv(Sock, 0, infinity), Transport:async_recv(Sock, 0, infinity),
State#state{ await_recv = true }. State#state{ await_recv = true }.
control_throttle(State = #state{ conn_state = Flow, control_throttle(State = #state{ conn_state = Flow,
@ -223,4 +215,3 @@ control_throttle(State = #state{ conn_state = Flow,
stop(Reason, State ) -> stop(Reason, State ) ->
{stop, Reason, State}. {stop, Reason, State}.

View File

@ -30,7 +30,7 @@
%% API Function Exports %% API Function Exports
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
-export([initial_state/2, client_id/1]). -export([initial_state/3, client_id/1]).
-export([handle_packet/2, send_message/2, send_packet/2, redeliver/2, shutdown/2]). -export([handle_packet/2, send_message/2, send_packet/2, redeliver/2, shutdown/2]).
@ -41,6 +41,7 @@
%% Protocol State %% Protocol State
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
-record(proto_state, { -record(proto_state, {
transport,
socket, socket,
peer_name, peer_name,
connected = false, %received CONNECT action? connected = false, %received CONNECT action?
@ -72,8 +73,9 @@
-define(PUBACK_PACKET(PacketId), #mqtt_packet_puback { packet_id = PacketId }). -define(PUBACK_PACKET(PacketId), #mqtt_packet_puback { packet_id = PacketId }).
initial_state(Socket, Peername) -> initial_state(Transport, Socket, Peername) ->
#proto_state{ #proto_state{
transport = Transport,
socket = Socket, socket = Socket,
peer_name = Peername peer_name = Peername
}. }.
@ -245,12 +247,11 @@ send_message({_From, Message = #mqtt_message{ qos = Qos }}, State = #proto_state
{Message1, NewSession} = emqtt_session:store(Session, Message), {Message1, NewSession} = emqtt_session:store(Session, Message),
send_packet(emqtt_message:to_packet(Message1), State#proto_state{session = NewSession}). send_packet(emqtt_message:to_packet(Message1), State#proto_state{session = NewSession}).
send_packet(Packet, State = #proto_state{socket = Sock, peer_name = PeerName, client_id = ClientId}) -> send_packet(Packet, State = #proto_state{transport = Transport, socket = Sock, peer_name = PeerName, client_id = ClientId}) ->
lager:info("SENT to ~s@~s: ~s", [ClientId, PeerName, emqtt_packet:dump(Packet)]), lager:info("SENT to ~s@~s: ~s", [ClientId, PeerName, emqtt_packet:dump(Packet)]),
Data = emqtt_packet:serialise(Packet), Data = emqtt_packet:serialise(Packet),
lager:debug("SENT to ~s: ~p", [PeerName, Data]), lager:debug("SENT to ~s: ~p", [PeerName, Data]),
%%FIXME Later... Transport:send(Sock, Data),
erlang:port_command(Sock, Data),
{ok, State}. {ok, State}.
%% %%

17
doc/topic.md Normal file
View File

@ -0,0 +1,17 @@
Topic Types:
static:
/brokers/alerts/
/brokers/clients/connected
/brokers/clients/disconnected
dynamic:
created when subscribe...
bridge:
cretated when bridge...

BIN
rebar vendored

Binary file not shown.

View File

@ -11,6 +11,6 @@
{deps, [ {deps, [
{lager, ".*", {git, "git://github.com/basho/lager.git", {branch, "master"}}}, {lager, ".*", {git, "git://github.com/basho/lager.git", {branch, "master"}}},
{esockd, ".*", {git, "git://github.com/slimpp/esockd.git", {branch, "master"}}}, {esockd, ".*", {git, "git://github.com/emqtt/esockd.git", {branch, "master"}}},
{mochiweb, ".*", {git, "git://github.com/slimpp/mochiweb.git", {branch, "master"}}} {mochiweb, ".*", {git, "git://github.com/slimpp/mochiweb.git", {branch, "master"}}}
]}. ]}.

View File

@ -10,6 +10,9 @@
{mnesia, [ {mnesia, [
{dir, "data"} {dir, "data"}
]}, ]},
{ssl, [
%{versions, ['tlsv1.2', 'tlsv1.1']}
]},
{lager, [ {lager, [
{error_logger_redirect, false}, {error_logger_redirect, false},
{crash_log, "log/emqtt_crash.log"}, {crash_log, "log/emqtt_crash.log"},
@ -45,11 +48,18 @@
]}, ]},
{listen, [ {listen, [
{mqtt, 1883, [ {mqtt, 1883, [
{max_conns, 1024}, {max_clients, 1024},
{acceptor_pool, 4} {acceptor_pool, 4}
]}, ]},
{'mqtts', 8883, [
{max_clients, 1024},
{acceptor_pool, 4},
%{cacertfile, "etc/ssl/cacert.pem"},
{ssl, [{certfile, "etc/ssl.crt"},
{keyfile, "etc/ssl.key"}]}
]},
{http, 8083, [ {http, 8083, [
{max_conns, 512}, {max_clients, 512},
{acceptor_pool, 1} {acceptor_pool, 1}
]} ]}
]} ]}

17
rel/files/ssl/ssl.crt Normal file
View File

@ -0,0 +1,17 @@
-----BEGIN CERTIFICATE-----
MIICuTCCAiICCQC8+3PPaqATfDANBgkqhkiG9w0BAQUFADCBoDELMAkGA1UEBhMC
Q0gxETAPBgNVBAgTCFpoZUppYW5nMREwDwYDVQQHEwhIYW5nWmhvdTEUMBIGA1UE
ChMLWGlhb0xpIFRlY2gxHzAdBgNVBAsTFkluZm9ybWF0aW9uIFRlY2hub2xvZ3kx
EzARBgNVBAMTCnQuZW1xdHQuaW8xHzAdBgkqhkiG9w0BCQEWEGZlbmcgYXQgZW1x
dHQuaW8wHhcNMTUwMjI1MTc0NjQwWhcNMTYwMjI1MTc0NjQwWjCBoDELMAkGA1UE
BhMCQ0gxETAPBgNVBAgTCFpoZUppYW5nMREwDwYDVQQHEwhIYW5nWmhvdTEUMBIG
A1UEChMLWGlhb0xpIFRlY2gxHzAdBgNVBAsTFkluZm9ybWF0aW9uIFRlY2hub2xv
Z3kxEzARBgNVBAMTCnQuZW1xdHQuaW8xHzAdBgkqhkiG9w0BCQEWEGZlbmcgYXQg
ZW1xdHQuaW8wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBALAtN2OHsvltOk+9
AtlwMtKuaWW2WpV/S0lRRG9x9k8pyd5PJeeYAr2jVsoWnZInb1CoEOHFcwxZLjv3
gEvz+X+//W02YyI9hnvCJUpT/+6P0gJEbmTmqL078M6vbtwtiF1YC7mdo0nGAZuK
qedpIoEZbVJavf4S0vXWTsb3s5unAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAgUR3
z4uDUsAl+xUorPMBIOS/ncHHVk1XucVv9Wi4chzzZ+4/Y77/fFqP6oxhQ59C9Q8i
iT5wjaE4R1eCge18lPSw3yb1tsTe5B3WkRTzziPq/Q/AsC+DifkkE1YW67leuJV/
vz74sEi0dudmOVoe6peYxjEH8xXoIUqhnwXt/4Q=
-----END CERTIFICATE-----

15
rel/files/ssl/ssl.key Normal file
View File

@ -0,0 +1,15 @@
-----BEGIN RSA PRIVATE KEY-----
MIICXAIBAAKBgQCwLTdjh7L5bTpPvQLZcDLSrmlltlqVf0tJUURvcfZPKcneTyXn
mAK9o1bKFp2SJ29QqBDhxXMMWS4794BL8/l/v/1tNmMiPYZ7wiVKU//uj9ICRG5k
5qi9O/DOr27cLYhdWAu5naNJxgGbiqnnaSKBGW1SWr3+EtL11k7G97ObpwIDAQAB
AoGBAKU1cbiLG0GdtU3rME3ZUj+RQNMZ4u5IVcBmTie4FcN8q4ombKQ2P3O4RX3z
IUZaZp+bS2F8uHt+8cVYPl57Zp5fwbIlv6jWgGpvXLsX8JBQl2OTw38B+hVwJvAM
h0mBzprUOs3KGZyF5cyA4osrZ4QvCZhwId9fAjwLGBF9i1yBAkEA4jWAF1sWQiwF
vY476m+0ihpRwGKjldKHWFZmvoB/AnNV/rXO+HRl3MB5wmO+Dqg3gJZrjGBgDeaV
g9hoQjK6ZwJBAMdg57iKLd8uUb7c4pR8fDdDbeeI5X7WDf2k9emT3BMPJPQ3EiSf
CStn1hRfp31U9CXEnw94rKHhrdMFrYjdzMECQCcWD3f5qTLt4GAMf5XWj199hLq1
UIbGxdQhuccY9Nk7jJRiXczYb/Fg4KkSCvkFX/G8DAFJdc9xFEyfzAQEN+kCQH3a
nMrvZn9gBLffRKOIZPyZctHZp0xGIHTA4X39GMlrIN+Lt8coIKimlgssSlSiAK+q
iuFAQnC5PXlcNyuTHsECQAMNMY6jXikgSUZfVXitAFX3g9+IbjT9eJ92f60QneW8
mxWQoqP3fqCSbTEysb7NojEEwppSZtaNgnBb5R4E+mU=
-----END RSA PRIVATE KEY-----

View File

@ -65,6 +65,8 @@
{template, "files/emqtt.cmd", "bin/emqtt.cmd"}, {template, "files/emqtt.cmd", "bin/emqtt.cmd"},
{copy, "files/start_erl.cmd", "bin/start_erl.cmd"}, {copy, "files/start_erl.cmd", "bin/start_erl.cmd"},
{copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"}, {copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"},
{copy, "files/ssl/ssl.crt", "etc/ssl.crt"},
{copy, "files/ssl/ssl.key", "etc/ssl.key"},
{template, "files/app.config", "etc/app.config"}, {template, "files/app.config", "etc/app.config"},
{template, "files/vm.args", "etc/vm.args"} {template, "files/vm.args", "etc/vm.args"}
]}. ]}.