From 8e41aeeeb8242d630fa699906b0208ed84938cec Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 5 Dec 2017 19:47:17 +0800 Subject: [PATCH 1/8] Add send_timeout, send_timeout_close options --- etc/emq.conf | 36 +++++++++++++++++++++++++++++++ priv/emq.schema | 56 +++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 90 insertions(+), 2 deletions(-) diff --git a/etc/emq.conf b/etc/emq.conf index 9b37860b9..d6dacc8fb 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -343,6 +343,10 @@ listener.tcp.external.access.2 = allow all ## TCP Socket Options listener.tcp.external.backlog = 1024 +listener.tcp.external.send_timeout = 15s + +listener.tcp.external.send_timeout_close = on + #listener.tcp.external.recbuf = 4KB #listener.tcp.external.sndbuf = 4KB @@ -371,6 +375,10 @@ listener.tcp.internal.max_clients = 102400 ## TCP Socket Options listener.tcp.internal.backlog = 512 +listener.tcp.internal.send_timeout = 15s + +listener.tcp.external.send_timeout_close = on + listener.tcp.internal.tune_buffer = on listener.tcp.internal.buffer = 1MB @@ -477,6 +485,10 @@ listener.ssl.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ## SSL Socket Options ## listener.ssl.external.backlog = 1024 +## listener.ssl.external.send_timeout = 15s + +## listener.ssl.external.send_timeout_close = on + ## listener.ssl.external.recbuf = 4KB ## listener.ssl.external.sndbuf = 4KB @@ -499,6 +511,10 @@ listener.ws.external.access.1 = allow all ## TCP Options listener.ws.external.backlog = 1024 +listener.ws.external.send_timeout = 15s + +listener.ws.external.send_timeout_close = on + listener.ws.external.recbuf = 4KB listener.ws.external.sndbuf = 4KB @@ -531,6 +547,20 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ## listener.wss.external.fail_if_no_peer_cert = true +listener.wss.external.backlog = 1024 + +listener.wss.external.send_timeout = 15s + +listener.wss.external.send_timeout_close = on + +## listener.wss.external.recbuf = 4KB + +## listener.wss.external.sndbuf = 4KB + +## listener.wss.external.buffer = 4KB + +## listener.wss.external.nodelay = true + ##-------------------------------------------------------------------- ## HTTP Management API Listener @@ -542,6 +572,12 @@ listener.api.mgmt.max_clients = 64 listener.api.mgmt.access.1 = allow all +listener.api.mgmt.backlog = 512 + +listener.api.mgmt.send_timeout = 15s + +listener.api.mgmt.send_timeout_close = on + ##------------------------------------------------------------------- ## System Monitor ##------------------------------------------------------------------- diff --git a/priv/emq.schema b/priv/emq.schema index d05cc79cf..e8746582b 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -805,8 +805,18 @@ end}. ]}. {mapping, "listener.tcp.$name.backlog", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} + {datatype, integer}, + {default, 1024} +]}. + +{mapping, "listener.tcp.$name.send_timeout", "emqttd.listeners", [ + {datatype, {duration, ms}}, + {default, "15s"} +]}. + +{mapping, "listener.tcp.$name.send_timeout_close", "emqttd.listeners", [ + {datatype, flag}, + {default, on} ]}. {mapping, "listener.tcp.$name.recbuf", "emqttd.listeners", [ @@ -883,6 +893,16 @@ end}. {datatype, integer} ]}. +{mapping, "listener.ssl.$name.send_timeout", "emqttd.listeners", [ + {datatype, {duration, ms}}, + {default, "15s"} +]}. + +{mapping, "listener.ssl.$name.send_timeout_close", "emqttd.listeners", [ + {datatype, flag}, + {default, on} +]}. + {mapping, "listener.ssl.$name.recbuf", "emqttd.listeners", [ {datatype, bytesize}, hidden @@ -996,6 +1016,16 @@ end}. {datatype, integer} ]}. +{mapping, "listener.ws.$name.send_timeout", "emqttd.listeners", [ + {datatype, {duration, ms}}, + {default, "15s"} +]}. + +{mapping, "listener.ws.$name.send_timeout_close", "emqttd.listeners", [ + {datatype, flag}, + {default, on} +]}. + {mapping, "listener.ws.$name.recbuf", "emqttd.listeners", [ {datatype, bytesize}, hidden @@ -1059,6 +1089,16 @@ end}. {datatype, integer} ]}. +{mapping, "listener.wss.$name.send_timeout", "emqttd.listeners", [ + {datatype, {duration, ms}}, + {default, "15s"} +]}. + +{mapping, "listener.wss.$name.send_timeout_close", "emqttd.listeners", [ + {datatype, flag}, + {default, on} +]}. + {mapping, "listener.wss.$name.recbuf", "emqttd.listeners", [ {datatype, bytesize}, hidden @@ -1145,6 +1185,8 @@ end}. end, TcpOpts = fun(Prefix) -> Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, + {send_timeout, cuttlefish:conf_get(Prefix ++ ".send_timeout", Conf, undefined)}, + {send_timeout_close, cuttlefish:conf_get(Prefix ++ ".send_timeout_close", Conf, undefined)}, {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)}, {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)}, @@ -1252,6 +1294,16 @@ end}. {datatype, integer} ]}. +{mapping, "listener.api.$name.send_timeout", "emqttd.listeners", [ + {datatype, {duration, ms}}, + {default, "15s"} +]}. + +{mapping, "listener.api.$name.send_timeout_close", "emqttd.listeners", [ + {datatype, flag}, + {default, on} +]}. + {mapping, "listener.api.$name.recbuf", "emqttd.listeners", [ {datatype, bytesize}, hidden From 51533dbe9eedf79e4a492a74fadc3e3e394fb8a4 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 5 Dec 2017 23:41:40 +0800 Subject: [PATCH 2/8] Shutdown the connection if an error occurred when sending data --- src/emqttd_client.erl | 3 ++- src/emqttd_protocol.erl | 6 ++---- src/emqttd_ws.erl | 1 + src/emqttd_ws_client.erl | 6 +++++- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index 6631b4566..63bac7c8d 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -140,7 +140,8 @@ send_fun(Conn, Peername) -> ?LOG(debug, "SEND ~p", [Data], #client_state{peername = Peername}), emqttd_metrics:inc('bytes/sent', iolist_size(Data)), try Conn:async_send(Data) of - true -> ok + ok -> ok; + {error, Reason} -> Self ! {shutdown, Reason} catch error:Error -> Self ! {shutdown, Error} end diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 384f93225..c35e8ae50 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -341,13 +341,11 @@ send(Msg, State = #proto_state{client_id = ClientId, emqttd_hooks:run('message.delivered', [ClientId, Username], Msg), send(emqttd_message:to_packet(unmount(MountPoint, clean_retain(IsBridge, Msg))), State); -send(Packet = ?PACKET(Type), - State = #proto_state{sendfun = SendFun, stats_data = Stats}) -> +send(Packet = ?PACKET(Type), State = #proto_state{sendfun = SendFun, stats_data = Stats}) -> trace(send, Packet, State), emqttd_metrics:sent(Packet), SendFun(Packet), - Stats1 = inc_stats(send, Type, Stats), - {ok, State#proto_state{stats_data = Stats1}}. + {ok, State#proto_state{stats_data = inc_stats(send, Type, Stats)}}. trace(recv, Packet, ProtoState) -> ?LOG(debug, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState); diff --git a/src/emqttd_ws.erl b/src/emqttd_ws.erl index c7d0b2119..35a7f9852 100644 --- a/src/emqttd_ws.erl +++ b/src/emqttd_ws.erl @@ -38,6 +38,7 @@ handle_request(Req) -> %%-------------------------------------------------------------------- %% MQTT Over WebSocket %%-------------------------------------------------------------------- + handle_request('GET', "/mqtt", Req) -> lager:debug("WebSocket Connection from: ~s", [Req:get(peer)]), Upgrade = Req:get_header_value("Upgrade"), diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index b9d25ad3e..206f461bb 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -272,10 +272,14 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- send_fun(ReplyChannel) -> + Self = self(), fun(Packet) -> Data = emqttd_serializer:serialize(Packet), emqttd_metrics:inc('bytes/sent', iolist_size(Data)), - ReplyChannel({binary, Data}) + case ReplyChannel({binary, Data}) of + ok -> ok; + {error, Reason} -> Self ! {shutdown, Reason} + end end. stat_fun(Conn) -> From c3c55894520d121388c08e8f5546eccc6c471a8b Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 6 Dec 2017 10:01:42 +0800 Subject: [PATCH 3/8] Version 2.3.2 --- Makefile | 2 +- src/emqttd.app.src | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index a1d77b1a3..c42a4a345 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ PROJECT = emqttd PROJECT_DESCRIPTION = Erlang MQTT Broker -PROJECT_VERSION = 2.3.1 +PROJECT_VERSION = 2.3.2 DEPS = goldrush gproc lager esockd ekka mochiweb pbkdf2 lager_syslog bcrypt clique jsx diff --git a/src/emqttd.app.src b/src/emqttd.app.src index 67af8854e..e321b73f1 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -1,6 +1,6 @@ {application,emqttd, [{description,"Erlang MQTT Broker"}, - {vsn,"2.3.1"}, + {vsn,"2.3.2"}, {modules,[]}, {registered,[emqttd_sup]}, {applications,[kernel,stdlib,gproc,lager,esockd,mochiweb, From 9f1c3a589999794bc5f43f336e1a62a2647b88a4 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 6 Dec 2017 14:42:26 +0800 Subject: [PATCH 4/8] Compatible with esockd 4.x --- src/emqttd_client.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index 63bac7c8d..5ca450bf5 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -141,6 +141,7 @@ send_fun(Conn, Peername) -> emqttd_metrics:inc('bytes/sent', iolist_size(Data)), try Conn:async_send(Data) of ok -> ok; + true -> ok; %% Compatible with esockd 4.x {error, Reason} -> Self ! {shutdown, Reason} catch error:Error -> Self ! {shutdown, Error} From 4c52d997062920dc1edddc61af65c0ac3faa27aa Mon Sep 17 00:00:00 2001 From: HuangDan Date: Wed, 6 Dec 2017 15:47:46 +0800 Subject: [PATCH 5/8] Fixed test cases for emqttd_router_SUITE --- Makefile | 2 +- test/emqttd_SUITE.erl | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index c42a4a345..d529cdbc5 100644 --- a/Makefile +++ b/Makefile @@ -27,7 +27,7 @@ dep_cuttlefish = git https://github.com/emqtt/cuttlefish TEST_DEPS = emqttc emq_dashboard dep_emqttc = git https://github.com/emqtt/emqttc -dep_emq_dashboard = git https://github.com/emqtt/emq_dashboard +dep_emq_dashboard = git https://github.com/emqtt/emq_dashboard develop TEST_ERLC_OPTS += +debug_info TEST_ERLC_OPTS += +'{parse_transform, lager_transform}' diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index fe48fb402..76a8e7614 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -328,7 +328,10 @@ router_print(_) -> #mqtt_route{topic = <<"#">>, node = node()}, #mqtt_route{topic = <<"+/#">>, node = node()}], lists:foreach(fun(R) -> emqttd_router:add_route(R) end, Routes), - emqttd_router:print(<<"a/b/c">>). + emqttd_router:print(<<"a/b/c">>), + emqttd_router:del_route(<<"+/#">>), + emqttd_router:del_route(<<"a/b/c">>), + emqttd_router:del_route(<<"#">>). router_unused(_) -> gen_server:call(emqttd_router, bad_call), @@ -598,6 +601,7 @@ conflict_listeners(_) -> L = proplists:get_value("mqtt:tcp:0.0.0.0:1883", Listeners), ?assertEqual(1, proplists:get_value(current_clients, L)), ?assertEqual(1, proplists:get_value(conflict, proplists:get_value(shutdown_count, L))), + timer:sleep(100), emqttc:disconnect(C2). cli_vm(_) -> From 0ba8d3e11db5dccdaca47a32ab6b8ba5cc0dcee9 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 6 Dec 2017 20:13:27 +0800 Subject: [PATCH 6/8] Depends on develop branch of esockd and mochiweb --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index d529cdbc5..1c919bb1e 100644 --- a/Makefile +++ b/Makefile @@ -8,9 +8,9 @@ dep_goldrush = git https://github.com/basho/goldrush 0.1.9 dep_gproc = git https://github.com/uwiger/gproc dep_getopt = git https://github.com/jcomellas/getopt v0.8.2 dep_lager = git https://github.com/basho/lager master -dep_esockd = git https://github.com/emqtt/esockd master +dep_esockd = git https://github.com/emqtt/esockd develop dep_ekka = git https://github.com/emqtt/ekka master -dep_mochiweb = git https://github.com/emqtt/mochiweb master +dep_mochiweb = git https://github.com/emqtt/mochiweb develop dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1 dep_lager_syslog = git https://github.com/basho/lager_syslog dep_bcrypt = git https://github.com/smarkets/erlang-bcrypt master From 896088b7e8d41524e8580354262156b6a0863444 Mon Sep 17 00:00:00 2001 From: HuangDan Date: Wed, 6 Dec 2017 21:09:03 +0800 Subject: [PATCH 7/8] Router test cases migration from emqttd_SUITE to emqttd_route_SUITE --- test/emqttd_SUITE.erl | 52 ---------------------------------- test/emqttd_router_SUITE.erl | 54 ++++++++++++++++++++++++++++++++---- 2 files changed, 48 insertions(+), 58 deletions(-) diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index 76a8e7614..c5794e5b0 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -58,7 +58,6 @@ all() -> [{group, protocol}, {group, pubsub}, - {group, router}, {group, session}, {group, broker}, {group, metrics}, @@ -81,10 +80,6 @@ groups() -> t_local_subscribe, t_shared_subscribe, 'pubsub#', 'pubsub+']}, - {router, [sequence], - [router_add_del, - router_print, - router_unused]}, {session, [sequence], [start_session]}, {broker, [sequence], @@ -291,53 +286,6 @@ loop_recv(Topic, Timeout, Acc) -> Timeout -> {ok, Acc} end. -%%-------------------------------------------------------------------- -%% Router Test -%%-------------------------------------------------------------------- - -router_add_del(_) -> - %% Add - emqttd_router:add_route(<<"#">>), - emqttd_router:add_route(<<"a/b/c">>), - emqttd_router:add_route(<<"+/#">>), - Routes = [R1, R2 | _] = [ - #mqtt_route{topic = <<"#">>, node = node()}, - #mqtt_route{topic = <<"+/#">>, node = node()}, - #mqtt_route{topic = <<"a/b/c">>, node = node()}], - Routes = lists:sort(emqttd_router:match(<<"a/b/c">>)), - - %% Batch Add - lists:foreach(fun(R) -> emqttd_router:add_route(R) end, Routes), - Routes = lists:sort(emqttd_router:match(<<"a/b/c">>)), - - %% Del - emqttd_router:del_route(<<"a/b/c">>), - [R1, R2] = lists:sort(emqttd_router:match(<<"a/b/c">>)), - {atomic, []} = mnesia:transaction(fun emqttd_trie:lookup/1, [<<"a/b/c">>]), - - %% Batch Del - R3 = #mqtt_route{topic = <<"#">>, node = 'a@127.0.0.1'}, - emqttd_router:add_route(R3), - emqttd_router:del_route(R1), - emqttd_router:del_route(R2), - emqttd_router:del_route(R3), - [] = lists:sort(emqttd_router:match(<<"a/b/c">>)). - -router_print(_) -> - Routes = [#mqtt_route{topic = <<"a/b/c">>, node = node()}, - #mqtt_route{topic = <<"#">>, node = node()}, - #mqtt_route{topic = <<"+/#">>, node = node()}], - lists:foreach(fun(R) -> emqttd_router:add_route(R) end, Routes), - emqttd_router:print(<<"a/b/c">>), - emqttd_router:del_route(<<"+/#">>), - emqttd_router:del_route(<<"a/b/c">>), - emqttd_router:del_route(<<"#">>). - -router_unused(_) -> - gen_server:call(emqttd_router, bad_call), - gen_server:cast(emqttd_router, bad_msg), - emqttd_router ! bad_info. - recv_loop(Msgs) -> receive {dispatch, _Topic, Msg} -> diff --git a/test/emqttd_router_SUITE.erl b/test/emqttd_router_SUITE.erl index addd36288..415550ec3 100644 --- a/test/emqttd_router_SUITE.erl +++ b/test/emqttd_router_SUITE.erl @@ -34,7 +34,8 @@ groups() -> t_add_del_route, t_match_route, t_print, - t_has_route]}, + t_has_route, + router_unused]}, {local_route, [sequence], [t_get_local_topics, t_add_del_local_route, @@ -86,11 +87,6 @@ t_match_route(_) -> #mqtt_route{topic = <<"a/b/c">>, node = Node}], lists:sort(?R:match(<<"a/b/c">>))). -t_print(_) -> - ?R:add_route(<<"topic">>), - ?R:add_route(<<"topic/#">>), - ?R:print(<<"topic">>). - t_has_route(_) -> ?R:add_route(<<"devices/+/messages">>), ?assert(?R:has_route(<<"devices/+/messages">>)). @@ -130,3 +126,49 @@ clear_tables() -> ?R:clean_local_routes(), lists:foreach(fun mnesia:clear_table/1, [mqtt_route, mqtt_trie, mqtt_trie_node]). +%%-------------------------------------------------------------------- +%% Router Test +%%-------------------------------------------------------------------- + +router_add_del(_) -> + %% Add + ?R:add_route(<<"#">>), + ?R:add_route(<<"a/b/c">>), + ?R:add_route(<<"+/#">>), + Routes = [R1, R2 | _] = [ + #mqtt_route{topic = <<"#">>, node = node()}, + #mqtt_route{topic = <<"+/#">>, node = node()}, + #mqtt_route{topic = <<"a/b/c">>, node = node()}], + Routes = lists:sort(?R:match(<<"a/b/c">>)), + + %% Batch Add + lists:foreach(fun(R) -> ?R:add_route(R) end, Routes), + Routes = lists:sort(?R:match(<<"a/b/c">>)), + + %% Del + ?R:del_route(<<"a/b/c">>), + [R1, R2] = lists:sort(?R:match(<<"a/b/c">>)), + {atomic, []} = mnesia:transaction(fun emqttd_trie:lookup/1, [<<"a/b/c">>]), + + %% Batch Del + R3 = #mqtt_route{topic = <<"#">>, node = 'a@127.0.0.1'}, + ?R:add_route(R3), + ?R:del_route(R1), + ?R:del_route(R2), + ?R:del_route(R3), + [] = lists:sort(?R:match(<<"a/b/c">>)). + +t_print(_) -> + Routes = [#mqtt_route{topic = <<"a/b/c">>, node = node()}, + #mqtt_route{topic = <<"#">>, node = node()}, + #mqtt_route{topic = <<"+/#">>, node = node()}], + lists:foreach(fun(R) -> ?R:add_route(R) end, Routes), + ?R:print(<<"a/b/c">>), + ?R:del_route(<<"+/#">>), + ?R:del_route(<<"a/b/c">>), + ?R:del_route(<<"#">>). + +router_unused(_) -> + gen_server:call(emqttd_router, bad_call), + gen_server:cast(emqttd_router, bad_msg), + emqttd_router ! bad_info. From ceafaad6b7c0a4269359e3f8fd046784656f6da7 Mon Sep 17 00:00:00 2001 From: HuangDan Date: Wed, 6 Dec 2017 21:22:36 +0800 Subject: [PATCH 8/8] Update deps branch --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index f3407dd9e..55dfd711e 100644 --- a/Makefile +++ b/Makefile @@ -8,9 +8,9 @@ dep_goldrush = git https://github.com/basho/goldrush 0.1.9 dep_gproc = git https://github.com/uwiger/gproc dep_getopt = git https://github.com/jcomellas/getopt v0.8.2 dep_lager = git https://github.com/basho/lager master -dep_esockd = git https://github.com/emqtt/esockd develop +dep_esockd = git https://github.com/emqtt/esockd v5.1 dep_ekka = git https://github.com/emqtt/ekka master -dep_mochiweb = git https://github.com/emqtt/mochiweb develop +dep_mochiweb = git https://github.com/emqtt/mochiweb v4.2.0 dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1 dep_lager_syslog = git https://github.com/basho/lager_syslog dep_bcrypt = git https://github.com/smarkets/erlang-bcrypt master