Merge pull request #1380 from emqtt/emq24

Version 2.3.2 -Upgrade the esockd library to v5.1
This commit is contained in:
huangdan 2017-12-06 15:55:19 +08:00 committed by GitHub
commit dd72083da5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 109 additions and 12 deletions

View File

@ -1,6 +1,6 @@
PROJECT = emqttd
PROJECT_DESCRIPTION = Erlang MQTT Broker
PROJECT_VERSION = 2.3.1
PROJECT_VERSION = 2.3.2
DEPS = goldrush gproc lager esockd ekka mochiweb pbkdf2 lager_syslog bcrypt clique jsx
@ -27,7 +27,7 @@ dep_cuttlefish = git https://github.com/emqtt/cuttlefish
TEST_DEPS = emqttc emq_dashboard
dep_emqttc = git https://github.com/emqtt/emqttc
dep_emq_dashboard = git https://github.com/emqtt/emq_dashboard
dep_emq_dashboard = git https://github.com/emqtt/emq_dashboard develop
TEST_ERLC_OPTS += +debug_info
TEST_ERLC_OPTS += +'{parse_transform, lager_transform}'

View File

@ -343,6 +343,10 @@ listener.tcp.external.access.2 = allow all
## TCP Socket Options
listener.tcp.external.backlog = 1024
listener.tcp.external.send_timeout = 15s
listener.tcp.external.send_timeout_close = on
#listener.tcp.external.recbuf = 4KB
#listener.tcp.external.sndbuf = 4KB
@ -371,6 +375,10 @@ listener.tcp.internal.max_clients = 102400
## TCP Socket Options
listener.tcp.internal.backlog = 512
listener.tcp.internal.send_timeout = 15s
listener.tcp.external.send_timeout_close = on
listener.tcp.internal.tune_buffer = on
listener.tcp.internal.buffer = 1MB
@ -477,6 +485,10 @@ listener.ssl.external.certfile = {{ platform_etc_dir }}/certs/cert.pem
## SSL Socket Options
## listener.ssl.external.backlog = 1024
## listener.ssl.external.send_timeout = 15s
## listener.ssl.external.send_timeout_close = on
## listener.ssl.external.recbuf = 4KB
## listener.ssl.external.sndbuf = 4KB
@ -499,6 +511,10 @@ listener.ws.external.access.1 = allow all
## TCP Options
listener.ws.external.backlog = 1024
listener.ws.external.send_timeout = 15s
listener.ws.external.send_timeout_close = on
listener.ws.external.recbuf = 4KB
listener.ws.external.sndbuf = 4KB
@ -531,6 +547,20 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem
## listener.wss.external.fail_if_no_peer_cert = true
listener.wss.external.backlog = 1024
listener.wss.external.send_timeout = 15s
listener.wss.external.send_timeout_close = on
## listener.wss.external.recbuf = 4KB
## listener.wss.external.sndbuf = 4KB
## listener.wss.external.buffer = 4KB
## listener.wss.external.nodelay = true
##--------------------------------------------------------------------
## HTTP Management API Listener
@ -542,6 +572,12 @@ listener.api.mgmt.max_clients = 64
listener.api.mgmt.access.1 = allow all
listener.api.mgmt.backlog = 512
listener.api.mgmt.send_timeout = 15s
listener.api.mgmt.send_timeout_close = on
##-------------------------------------------------------------------
## System Monitor
##-------------------------------------------------------------------

View File

@ -805,8 +805,18 @@ end}.
]}.
{mapping, "listener.tcp.$name.backlog", "emqttd.listeners", [
{default, 1024},
{datatype, integer}
{datatype, integer},
{default, 1024}
]}.
{mapping, "listener.tcp.$name.send_timeout", "emqttd.listeners", [
{datatype, {duration, ms}},
{default, "15s"}
]}.
{mapping, "listener.tcp.$name.send_timeout_close", "emqttd.listeners", [
{datatype, flag},
{default, on}
]}.
{mapping, "listener.tcp.$name.recbuf", "emqttd.listeners", [
@ -883,6 +893,16 @@ end}.
{datatype, integer}
]}.
{mapping, "listener.ssl.$name.send_timeout", "emqttd.listeners", [
{datatype, {duration, ms}},
{default, "15s"}
]}.
{mapping, "listener.ssl.$name.send_timeout_close", "emqttd.listeners", [
{datatype, flag},
{default, on}
]}.
{mapping, "listener.ssl.$name.recbuf", "emqttd.listeners", [
{datatype, bytesize},
hidden
@ -996,6 +1016,16 @@ end}.
{datatype, integer}
]}.
{mapping, "listener.ws.$name.send_timeout", "emqttd.listeners", [
{datatype, {duration, ms}},
{default, "15s"}
]}.
{mapping, "listener.ws.$name.send_timeout_close", "emqttd.listeners", [
{datatype, flag},
{default, on}
]}.
{mapping, "listener.ws.$name.recbuf", "emqttd.listeners", [
{datatype, bytesize},
hidden
@ -1059,6 +1089,16 @@ end}.
{datatype, integer}
]}.
{mapping, "listener.wss.$name.send_timeout", "emqttd.listeners", [
{datatype, {duration, ms}},
{default, "15s"}
]}.
{mapping, "listener.wss.$name.send_timeout_close", "emqttd.listeners", [
{datatype, flag},
{default, on}
]}.
{mapping, "listener.wss.$name.recbuf", "emqttd.listeners", [
{datatype, bytesize},
hidden
@ -1145,6 +1185,8 @@ end}.
end,
TcpOpts = fun(Prefix) ->
Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)},
{send_timeout, cuttlefish:conf_get(Prefix ++ ".send_timeout", Conf, undefined)},
{send_timeout_close, cuttlefish:conf_get(Prefix ++ ".send_timeout_close", Conf, undefined)},
{recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)},
{sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)},
{buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
@ -1252,6 +1294,16 @@ end}.
{datatype, integer}
]}.
{mapping, "listener.api.$name.send_timeout", "emqttd.listeners", [
{datatype, {duration, ms}},
{default, "15s"}
]}.
{mapping, "listener.api.$name.send_timeout_close", "emqttd.listeners", [
{datatype, flag},
{default, on}
]}.
{mapping, "listener.api.$name.recbuf", "emqttd.listeners", [
{datatype, bytesize},
hidden

View File

@ -1,6 +1,6 @@
{application,emqttd,
[{description,"Erlang MQTT Broker"},
{vsn,"2.3.1"},
{vsn,"2.3.2"},
{modules,[]},
{registered,[emqttd_sup]},
{applications,[kernel,stdlib,gproc,lager,esockd,mochiweb,

View File

@ -140,7 +140,9 @@ send_fun(Conn, Peername) ->
?LOG(debug, "SEND ~p", [Data], #client_state{peername = Peername}),
emqttd_metrics:inc('bytes/sent', iolist_size(Data)),
try Conn:async_send(Data) of
true -> ok
ok -> ok;
true -> ok; %% Compatible with esockd 4.x
{error, Reason} -> Self ! {shutdown, Reason}
catch
error:Error -> Self ! {shutdown, Error}
end

View File

@ -341,13 +341,11 @@ send(Msg, State = #proto_state{client_id = ClientId,
emqttd_hooks:run('message.delivered', [ClientId, Username], Msg),
send(emqttd_message:to_packet(unmount(MountPoint, clean_retain(IsBridge, Msg))), State);
send(Packet = ?PACKET(Type),
State = #proto_state{sendfun = SendFun, stats_data = Stats}) ->
send(Packet = ?PACKET(Type), State = #proto_state{sendfun = SendFun, stats_data = Stats}) ->
trace(send, Packet, State),
emqttd_metrics:sent(Packet),
SendFun(Packet),
Stats1 = inc_stats(send, Type, Stats),
{ok, State#proto_state{stats_data = Stats1}}.
{ok, State#proto_state{stats_data = inc_stats(send, Type, Stats)}}.
trace(recv, Packet, ProtoState) ->
?LOG(debug, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState);

View File

@ -38,6 +38,7 @@ handle_request(Req) ->
%%--------------------------------------------------------------------
%% MQTT Over WebSocket
%%--------------------------------------------------------------------
handle_request('GET', "/mqtt", Req) ->
lager:debug("WebSocket Connection from: ~s", [Req:get(peer)]),
Upgrade = Req:get_header_value("Upgrade"),

View File

@ -272,10 +272,14 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
send_fun(ReplyChannel) ->
Self = self(),
fun(Packet) ->
Data = emqttd_serializer:serialize(Packet),
emqttd_metrics:inc('bytes/sent', iolist_size(Data)),
ReplyChannel({binary, Data})
case ReplyChannel({binary, Data}) of
ok -> ok;
{error, Reason} -> Self ! {shutdown, Reason}
end
end.
stat_fun(Conn) ->

View File

@ -328,7 +328,10 @@ router_print(_) ->
#mqtt_route{topic = <<"#">>, node = node()},
#mqtt_route{topic = <<"+/#">>, node = node()}],
lists:foreach(fun(R) -> emqttd_router:add_route(R) end, Routes),
emqttd_router:print(<<"a/b/c">>).
emqttd_router:print(<<"a/b/c">>),
emqttd_router:del_route(<<"+/#">>),
emqttd_router:del_route(<<"a/b/c">>),
emqttd_router:del_route(<<"#">>).
router_unused(_) ->
gen_server:call(emqttd_router, bad_call),
@ -598,6 +601,7 @@ conflict_listeners(_) ->
L = proplists:get_value("mqtt:tcp:0.0.0.0:1883", Listeners),
?assertEqual(1, proplists:get_value(current_clients, L)),
?assertEqual(1, proplists:get_value(conflict, proplists:get_value(shutdown_count, L))),
timer:sleep(100),
emqttc:disconnect(C2).
cli_vm(_) ->