fix conflict
This commit is contained in:
commit
bb2e8c9653
16
CHANGELOG.md
16
CHANGELOG.md
|
@ -2,6 +2,22 @@
|
|||
eMQTT ChangeLog
|
||||
==================
|
||||
|
||||
v0.3.1-beta (2015-03-02)
|
||||
------------------------
|
||||
|
||||
Feature: SSL Socket Support
|
||||
|
||||
Feature: issue#44 HTTP API should add Qos parameter
|
||||
|
||||
Bugfix: issue#52 emqtt_session crash
|
||||
|
||||
Bugfix: issue#53 sslsocket keepalive error
|
||||
|
||||
Upgrade: esockd to v0.2.0
|
||||
|
||||
Upgrade: mochiweb to v3.0.0
|
||||
|
||||
|
||||
v0.3.0-beta (2015-01-19)
|
||||
------------------------
|
||||
|
||||
|
|
7
Makefile
7
Makefile
|
@ -1,3 +1,4 @@
|
|||
.PHONY: test
|
||||
|
||||
REBAR=./rebar
|
||||
|
||||
|
@ -19,5 +20,11 @@ clean:
|
|||
@$(REBAR) clean
|
||||
rm -rf rel/emqtt
|
||||
|
||||
test:
|
||||
@$(REBAR) skip_deps=true eunit
|
||||
|
||||
edoc:
|
||||
@$(REBAR) doc
|
||||
|
||||
dist:
|
||||
cd rel && ../rebar generate -f
|
||||
|
|
11
README.md
11
README.md
|
@ -49,11 +49,18 @@ cd $INSTALL_DIR/emqtt
|
|||
{auth, {anonymous, []}}, %internal, anonymous
|
||||
{listen, [
|
||||
{mqtt, 1883, [
|
||||
{max_conns, 1024},
|
||||
{max_clients, 1024},
|
||||
{acceptor_pool, 4}
|
||||
]},
|
||||
{mqtts, 8883, [
|
||||
{max_clients, 1024},
|
||||
{acceptor_pool, 4},
|
||||
%{cacertfile, "etc/ssl/cacert.pem"},
|
||||
{ssl, [{certfile, "etc/ssl.crt"},
|
||||
{keyfile, "etc/ssl.key"}]}
|
||||
]},
|
||||
{http, 8083, [
|
||||
{max_conns, 512},
|
||||
{max_clients, 512},
|
||||
{acceptor_pool, 1}
|
||||
]}
|
||||
]}
|
||||
|
|
|
@ -37,7 +37,11 @@ listen(Listeners) when is_list(Listeners) ->
|
|||
|
||||
listen({mqtt, Port, Options}) ->
|
||||
MFArgs = {emqtt_client, start_link, []},
|
||||
esockd:listen(mqtt, Port, Options ++ ?MQTT_SOCKOPTS, MFArgs);
|
||||
esockd:open(mqtt, Port, Options ++ ?MQTT_SOCKOPTS, MFArgs);
|
||||
|
||||
listen({mqtts, Port, Options}) ->
|
||||
MFArgs = {emqtt_client, start_link, []},
|
||||
esockd:open(mqtts, Port, Options ++ ?MQTT_SOCKOPTS, MFArgs);
|
||||
|
||||
listen({http, Port, Options}) ->
|
||||
MFArgs = {emqtt_http, handle, []},
|
||||
|
|
|
@ -26,7 +26,7 @@
|
|||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-export([start_link/1, info/1, go/2]).
|
||||
-export([start_link/1, info/1]).
|
||||
|
||||
-export([init/1,
|
||||
handle_call/3,
|
||||
|
@ -40,8 +40,9 @@
|
|||
-include("emqtt_packet.hrl").
|
||||
|
||||
%%Client State...
|
||||
-record(state, {
|
||||
socket,
|
||||
-record(state, {
|
||||
transport,
|
||||
socket,
|
||||
peer_name,
|
||||
conn_name,
|
||||
await_recv,
|
||||
|
@ -52,32 +53,28 @@
|
|||
keepalive
|
||||
}).
|
||||
|
||||
start_link(Sock) ->
|
||||
gen_server:start_link(?MODULE, [Sock], []).
|
||||
start_link(SockArgs) ->
|
||||
{ok, proc_lib:spawn_link(?MODULE, init, [SockArgs])}.
|
||||
|
||||
info(Pid) ->
|
||||
gen_server:call(Pid, info).
|
||||
|
||||
go(Pid, Sock) ->
|
||||
gen_server:call(Pid, {go, Sock}).
|
||||
|
||||
init([Sock]) ->
|
||||
{ok, #state{socket = Sock}, 1000}.
|
||||
|
||||
handle_call({go, Sock}, _From, #state{socket = Sock}) ->
|
||||
init(SockArgs = {Transport, Sock, _SockFun}) ->
|
||||
%%TODO: replace emqtt_net??
|
||||
{ok, NewSock} = esockd_connection:accept(SockArgs),
|
||||
{ok, Peername} = emqtt_net:peer_string(Sock),
|
||||
{ok, ConnStr} = emqtt_net:connection_string(Sock, inbound),
|
||||
lager:info("Connect from ~s", [ConnStr]),
|
||||
{reply, ok,
|
||||
control_throttle(
|
||||
#state{ socket = Sock,
|
||||
peer_name = Peername,
|
||||
conn_name = ConnStr,
|
||||
await_recv = false,
|
||||
conn_state = running,
|
||||
conserve = false,
|
||||
parse_state = emqtt_packet:initial_state(),
|
||||
proto_state = emqtt_protocol:initial_state(Sock, Peername)}), 10000};
|
||||
State = control_throttle(#state{transport = Transport,
|
||||
socket = NewSock,
|
||||
peer_name = Peername,
|
||||
conn_name = ConnStr,
|
||||
await_recv = false,
|
||||
conn_state = running,
|
||||
conserve = false,
|
||||
parse_state = emqtt_packet:initial_state(),
|
||||
proto_state = emqtt_protocol:initial_state(Transport, NewSock, Peername)}),
|
||||
gen_server:enter_loop(?MODULE, [], State, 10000).
|
||||
|
||||
handle_call(info, _From, State = #state{
|
||||
conn_name=ConnName, proto_state = ProtoState}) ->
|
||||
|
@ -124,9 +121,9 @@ handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peer_name = Pee
|
|||
lager:critical("Client ~s: unexpected inet_reply '~p'", [PeerName, Reason]),
|
||||
{noreply, State};
|
||||
|
||||
handle_info({keepalive, start, TimeoutSec}, State = #state{socket = Socket}) ->
|
||||
handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket}) ->
|
||||
lager:info("Client ~s: Start KeepAlive with ~p seconds", [State#state.peer_name, TimeoutSec]),
|
||||
KeepAlive = emqtt_keepalive:new(Socket, TimeoutSec, {keepalive, timeout}),
|
||||
KeepAlive = emqtt_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}),
|
||||
{noreply, State#state{ keepalive = KeepAlive }};
|
||||
|
||||
handle_info({keepalive, timeout}, State = #state { keepalive = KeepAlive }) ->
|
||||
|
@ -157,12 +154,6 @@ terminate(Reason, #state{ peer_name = PeerName, keepalive = KeepAlive, proto_sta
|
|||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
async_recv(Sock, Length, infinity) when is_port(Sock) ->
|
||||
prim_inet:async_recv(Sock, Length, -1);
|
||||
|
||||
async_recv(Sock, Length, Timeout) when is_port(Sock) ->
|
||||
prim_inet:async_recv(Sock, Length, Timeout).
|
||||
|
||||
%-------------------------------------------------------
|
||||
% receive and parse tcp data
|
||||
%-------------------------------------------------------
|
||||
|
@ -203,12 +194,12 @@ network_error(Reason, State = #state{ peer_name = PeerName }) ->
|
|||
lager:error("Client ~s: MQTT detected network error '~p'", [PeerName, Reason]),
|
||||
stop({shutdown, conn_closed}, State).
|
||||
|
||||
run_socket(State = #state{ conn_state = blocked }) ->
|
||||
run_socket(State = #state{conn_state = blocked}) ->
|
||||
State;
|
||||
run_socket(State = #state{ await_recv = true }) ->
|
||||
run_socket(State = #state{await_recv = true}) ->
|
||||
State;
|
||||
run_socket(State = #state{ socket = Sock }) ->
|
||||
async_recv(Sock, 0, infinity),
|
||||
run_socket(State = #state{transport = Transport, socket = Sock}) ->
|
||||
Transport:async_recv(Sock, 0, infinity),
|
||||
State#state{ await_recv = true }.
|
||||
|
||||
control_throttle(State = #state{ conn_state = Flow,
|
||||
|
@ -223,4 +214,3 @@ control_throttle(State = #state{ conn_state = Flow,
|
|||
stop(Reason, State ) ->
|
||||
{stop, Reason, State}.
|
||||
|
||||
|
||||
|
|
|
@ -26,29 +26,31 @@
|
|||
|
||||
-export([new/3, resume/1, cancel/1]).
|
||||
|
||||
-record(keepalive, {socket, recv_oct, timeout_sec, timeout_msg, timer_ref}).
|
||||
-record(keepalive, {transport, socket, recv_oct, timeout_sec, timeout_msg, timer_ref}).
|
||||
|
||||
%%
|
||||
%% @doc create a keepalive.
|
||||
%%
|
||||
new(Socket, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 ->
|
||||
{ok, [{recv_oct, RecvOct}]} = inet:getstat(Socket, [recv_oct]),
|
||||
new({Transport, Socket}, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 ->
|
||||
{ok, [{recv_oct, RecvOct}]} = Transport:getstat(Socket, [recv_oct]),
|
||||
Ref = erlang:send_after(TimeoutSec*1000, self(), TimeoutMsg),
|
||||
#keepalive { socket = Socket,
|
||||
recv_oct = RecvOct,
|
||||
timeout_sec = TimeoutSec,
|
||||
timeout_msg = TimeoutMsg,
|
||||
timer_ref = Ref }.
|
||||
#keepalive {transport = Transport,
|
||||
socket = Socket,
|
||||
recv_oct = RecvOct,
|
||||
timeout_sec = TimeoutSec,
|
||||
timeout_msg = TimeoutMsg,
|
||||
timer_ref = Ref }.
|
||||
|
||||
%%
|
||||
%% @doc try to resume keepalive, called when timeout.
|
||||
%%
|
||||
resume(KeepAlive = #keepalive { socket = Socket,
|
||||
recv_oct = RecvOct,
|
||||
timeout_sec = TimeoutSec,
|
||||
timeout_msg = TimeoutMsg,
|
||||
timer_ref = Ref }) ->
|
||||
{ok, [{recv_oct, NewRecvOct}]} = inet:getstat(Socket, [recv_oct]),
|
||||
resume(KeepAlive = #keepalive {transport = Transport,
|
||||
socket = Socket,
|
||||
recv_oct = RecvOct,
|
||||
timeout_sec = TimeoutSec,
|
||||
timeout_msg = TimeoutMsg,
|
||||
timer_ref = Ref }) ->
|
||||
{ok, [{recv_oct, NewRecvOct}]} = Transport:getstat(Socket, [recv_oct]),
|
||||
if
|
||||
NewRecvOct =:= RecvOct ->
|
||||
timeout;
|
||||
|
|
|
@ -30,7 +30,7 @@
|
|||
%% API Function Exports
|
||||
%% ------------------------------------------------------------------
|
||||
|
||||
-export([initial_state/2, client_id/1]).
|
||||
-export([initial_state/3, client_id/1]).
|
||||
|
||||
-export([handle_packet/2, send_message/2, send_packet/2, redeliver/2, shutdown/2]).
|
||||
|
||||
|
@ -41,6 +41,7 @@
|
|||
%% Protocol State
|
||||
%% ------------------------------------------------------------------
|
||||
-record(proto_state, {
|
||||
transport,
|
||||
socket,
|
||||
peer_name,
|
||||
connected = false, %received CONNECT action?
|
||||
|
@ -72,8 +73,9 @@
|
|||
|
||||
-define(PUBACK_PACKET(PacketId), #mqtt_packet_puback { packet_id = PacketId }).
|
||||
|
||||
initial_state(Socket, Peername) ->
|
||||
initial_state(Transport, Socket, Peername) ->
|
||||
#proto_state{
|
||||
transport = Transport,
|
||||
socket = Socket,
|
||||
peer_name = Peername
|
||||
}.
|
||||
|
@ -245,12 +247,11 @@ send_message({_From, Message = #mqtt_message{ qos = Qos }}, State = #proto_state
|
|||
{Message1, NewSession} = emqtt_session:store(Session, Message),
|
||||
send_packet(emqtt_message:to_packet(Message1), State#proto_state{session = NewSession}).
|
||||
|
||||
send_packet(Packet, State = #proto_state{socket = Sock, peer_name = PeerName, client_id = ClientId}) ->
|
||||
send_packet(Packet, State = #proto_state{transport = Transport, socket = Sock, peer_name = PeerName, client_id = ClientId}) ->
|
||||
lager:info("SENT to ~s@~s: ~s", [ClientId, PeerName, emqtt_packet:dump(Packet)]),
|
||||
Data = emqtt_packet:serialise(Packet),
|
||||
lager:debug("SENT to ~s: ~p", [PeerName, Data]),
|
||||
%%FIXME Later...
|
||||
erlang:port_command(Sock, Data),
|
||||
Transport:send(Sock, Data),
|
||||
{ok, State}.
|
||||
|
||||
%%
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
Topic Types:
|
||||
|
||||
static:
|
||||
|
||||
/brokers/alerts/
|
||||
/brokers/clients/connected
|
||||
/brokers/clients/disconnected
|
||||
|
||||
dynamic:
|
||||
|
||||
created when subscribe...
|
||||
|
||||
bridge:
|
||||
|
||||
cretated when bridge...
|
||||
|
||||
|
14
rebar.config
14
rebar.config
|
@ -1,16 +1,26 @@
|
|||
%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
|
||||
%% ex: ts=4 sw=4 ft=erlang et
|
||||
|
||||
{require_min_otp_vsn, "R17"}.
|
||||
|
||||
{erl_opts, [debug_info, {parse_transform, lager_transform}]}.
|
||||
|
||||
{erl_opts, [{i, "include"},
|
||||
{erl_opts, [warn_export_all,
|
||||
warn_unused_import,
|
||||
{i, "include"},
|
||||
{src_dirs, ["src"]}]}.
|
||||
|
||||
{xref_checks, [undefined_function_calls]}.
|
||||
{cover_enabled, false}.
|
||||
|
||||
{validate_app_modules, true}.
|
||||
|
||||
{sub_dirs, [
|
||||
"rel",
|
||||
"apps/emqtt"]}.
|
||||
|
||||
{deps, [
|
||||
{lager, ".*", {git, "git://github.com/basho/lager.git", {branch, "master"}}},
|
||||
{esockd, ".*", {git, "git://github.com/slimpp/esockd.git", {branch, "master"}}},
|
||||
{esockd, ".*", {git, "git://github.com/emqtt/esockd.git", {branch, "master"}}},
|
||||
{mochiweb, ".*", {git, "git://github.com/slimpp/mochiweb.git", {branch, "master"}}}
|
||||
]}.
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
|
||||
%% ex: ft=erlang ts=4 sw=4 et
|
||||
[{kernel,
|
||||
[{start_timer, true},
|
||||
[{start_timer, true},
|
||||
{start_pg2, true}
|
||||
]},
|
||||
{sasl, [
|
||||
|
@ -10,6 +10,9 @@
|
|||
{mnesia, [
|
||||
{dir, "data"}
|
||||
]},
|
||||
{ssl, [
|
||||
%{versions, ['tlsv1.2', 'tlsv1.1']}
|
||||
]},
|
||||
{lager, [
|
||||
{error_logger_redirect, false},
|
||||
{crash_log, "log/emqtt_crash.log"},
|
||||
|
@ -31,6 +34,9 @@
|
|||
]}
|
||||
]}
|
||||
]},
|
||||
{esockd, [
|
||||
{logger, {lager, info}}
|
||||
]},
|
||||
{emqtt, [
|
||||
%Authetication. Internal, Anonymous Default.
|
||||
{auth, {anonymous, []}},
|
||||
|
@ -45,11 +51,18 @@
|
|||
]},
|
||||
{listen, [
|
||||
{mqtt, 1883, [
|
||||
{max_conns, 1024},
|
||||
{max_clients, 1024},
|
||||
{acceptor_pool, 4}
|
||||
]},
|
||||
{mqtts, 8883, [
|
||||
{max_clients, 1024},
|
||||
{acceptor_pool, 4},
|
||||
%{cacertfile, "etc/ssl/cacert.pem"},
|
||||
{ssl, [{certfile, "etc/ssl.crt"},
|
||||
{keyfile, "etc/ssl.key"}]}
|
||||
]},
|
||||
{http, 8083, [
|
||||
{max_conns, 512},
|
||||
{max_clients, 512},
|
||||
{acceptor_pool, 1}
|
||||
]}
|
||||
]}
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
-----BEGIN CERTIFICATE-----
|
||||
MIICuTCCAiICCQC8+3PPaqATfDANBgkqhkiG9w0BAQUFADCBoDELMAkGA1UEBhMC
|
||||
Q0gxETAPBgNVBAgTCFpoZUppYW5nMREwDwYDVQQHEwhIYW5nWmhvdTEUMBIGA1UE
|
||||
ChMLWGlhb0xpIFRlY2gxHzAdBgNVBAsTFkluZm9ybWF0aW9uIFRlY2hub2xvZ3kx
|
||||
EzARBgNVBAMTCnQuZW1xdHQuaW8xHzAdBgkqhkiG9w0BCQEWEGZlbmcgYXQgZW1x
|
||||
dHQuaW8wHhcNMTUwMjI1MTc0NjQwWhcNMTYwMjI1MTc0NjQwWjCBoDELMAkGA1UE
|
||||
BhMCQ0gxETAPBgNVBAgTCFpoZUppYW5nMREwDwYDVQQHEwhIYW5nWmhvdTEUMBIG
|
||||
A1UEChMLWGlhb0xpIFRlY2gxHzAdBgNVBAsTFkluZm9ybWF0aW9uIFRlY2hub2xv
|
||||
Z3kxEzARBgNVBAMTCnQuZW1xdHQuaW8xHzAdBgkqhkiG9w0BCQEWEGZlbmcgYXQg
|
||||
ZW1xdHQuaW8wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBALAtN2OHsvltOk+9
|
||||
AtlwMtKuaWW2WpV/S0lRRG9x9k8pyd5PJeeYAr2jVsoWnZInb1CoEOHFcwxZLjv3
|
||||
gEvz+X+//W02YyI9hnvCJUpT/+6P0gJEbmTmqL078M6vbtwtiF1YC7mdo0nGAZuK
|
||||
qedpIoEZbVJavf4S0vXWTsb3s5unAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAgUR3
|
||||
z4uDUsAl+xUorPMBIOS/ncHHVk1XucVv9Wi4chzzZ+4/Y77/fFqP6oxhQ59C9Q8i
|
||||
iT5wjaE4R1eCge18lPSw3yb1tsTe5B3WkRTzziPq/Q/AsC+DifkkE1YW67leuJV/
|
||||
vz74sEi0dudmOVoe6peYxjEH8xXoIUqhnwXt/4Q=
|
||||
-----END CERTIFICATE-----
|
|
@ -0,0 +1,15 @@
|
|||
-----BEGIN RSA PRIVATE KEY-----
|
||||
MIICXAIBAAKBgQCwLTdjh7L5bTpPvQLZcDLSrmlltlqVf0tJUURvcfZPKcneTyXn
|
||||
mAK9o1bKFp2SJ29QqBDhxXMMWS4794BL8/l/v/1tNmMiPYZ7wiVKU//uj9ICRG5k
|
||||
5qi9O/DOr27cLYhdWAu5naNJxgGbiqnnaSKBGW1SWr3+EtL11k7G97ObpwIDAQAB
|
||||
AoGBAKU1cbiLG0GdtU3rME3ZUj+RQNMZ4u5IVcBmTie4FcN8q4ombKQ2P3O4RX3z
|
||||
IUZaZp+bS2F8uHt+8cVYPl57Zp5fwbIlv6jWgGpvXLsX8JBQl2OTw38B+hVwJvAM
|
||||
h0mBzprUOs3KGZyF5cyA4osrZ4QvCZhwId9fAjwLGBF9i1yBAkEA4jWAF1sWQiwF
|
||||
vY476m+0ihpRwGKjldKHWFZmvoB/AnNV/rXO+HRl3MB5wmO+Dqg3gJZrjGBgDeaV
|
||||
g9hoQjK6ZwJBAMdg57iKLd8uUb7c4pR8fDdDbeeI5X7WDf2k9emT3BMPJPQ3EiSf
|
||||
CStn1hRfp31U9CXEnw94rKHhrdMFrYjdzMECQCcWD3f5qTLt4GAMf5XWj199hLq1
|
||||
UIbGxdQhuccY9Nk7jJRiXczYb/Fg4KkSCvkFX/G8DAFJdc9xFEyfzAQEN+kCQH3a
|
||||
nMrvZn9gBLffRKOIZPyZctHZp0xGIHTA4X39GMlrIN+Lt8coIKimlgssSlSiAK+q
|
||||
iuFAQnC5PXlcNyuTHsECQAMNMY6jXikgSUZfVXitAFX3g9+IbjT9eJ92f60QneW8
|
||||
mxWQoqP3fqCSbTEysb7NojEEwppSZtaNgnBb5R4E+mU=
|
||||
-----END RSA PRIVATE KEY-----
|
|
@ -65,6 +65,8 @@
|
|||
{template, "files/emqtt.cmd", "bin/emqtt.cmd"},
|
||||
{copy, "files/start_erl.cmd", "bin/start_erl.cmd"},
|
||||
{copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"},
|
||||
{copy, "files/ssl/ssl.crt", "etc/ssl.crt"},
|
||||
{copy, "files/ssl/ssl.key", "etc/ssl.key"},
|
||||
{template, "files/app.config", "etc/app.config"},
|
||||
{template, "files/vm.args", "etc/vm.args"}
|
||||
]}.
|
||||
|
|
Loading…
Reference in New Issue