diff --git a/Makefile b/Makefile index c3718ad6a..e0a06a673 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ PROJECT = emqx PROJECT_DESCRIPTION = EMQ X Broker -PROJECT_VERSION = 2.3.0 +PROJECT_VERSION = 2.3.2 NO_AUTOPATCH = cuttlefish @@ -12,9 +12,9 @@ dep_getopt = git https://github.com/jcomellas/getopt v0.8.2 dep_lager = git https://github.com/basho/lager master dep_lager_syslog = git https://github.com/basho/lager_syslog dep_jsx = git https://github.com/talentdeficit/jsx -dep_esockd = git https://github.com/emqtt/esockd master +dep_esockd = git https://github.com/emqtt/esockd v5.1 dep_ekka = git https://github.com/emqtt/ekka master -dep_mochiweb = git https://github.com/emqtt/mochiweb master +dep_mochiweb = git https://github.com/emqtt/mochiweb v4.2.0 dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1 dep_bcrypt = git https://github.com/smarkets/erlang-bcrypt master dep_clique = git https://github.com/emqtt/clique @@ -27,7 +27,7 @@ dep_cuttlefish = git https://github.com/emqtt/cuttlefish TEST_DEPS = emqttc emq_dashboard dep_emqttc = git https://github.com/emqtt/emqttc -dep_emq_dashboard = git https://github.com/emqtt/emq_dashboard +dep_emq_dashboard = git https://github.com/emqtt/emq_dashboard develop TEST_ERLC_OPTS += +debug_info TEST_ERLC_OPTS += +'{parse_transform, lager_transform}' diff --git a/etc/emqx.conf b/etc/emqx.conf index b4c60ee48..062f5d033 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -346,6 +346,10 @@ listener.tcp.external.access.2 = allow all ## TCP Socket Options listener.tcp.external.backlog = 1024 +listener.tcp.external.send_timeout = 15s + +listener.tcp.external.send_timeout_close = on + #listener.tcp.external.recbuf = 4KB #listener.tcp.external.sndbuf = 4KB @@ -376,6 +380,10 @@ listener.tcp.internal.max_clients = 102400 ## TCP Socket Options listener.tcp.internal.backlog = 512 +listener.tcp.internal.send_timeout = 15s + +listener.tcp.external.send_timeout_close = on + listener.tcp.internal.tune_buffer = on listener.tcp.internal.buffer = 1MB @@ -485,6 +493,10 @@ listener.ssl.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ## SSL Socket Options ## listener.ssl.external.backlog = 1024 +## listener.ssl.external.send_timeout = 15s + +## listener.ssl.external.send_timeout_close = on + ## listener.ssl.external.recbuf = 4KB ## listener.ssl.external.sndbuf = 4KB @@ -509,6 +521,10 @@ listener.ws.external.access.1 = allow all ## TCP Options listener.ws.external.backlog = 1024 +listener.ws.external.send_timeout = 15s + +listener.ws.external.send_timeout_close = on + listener.ws.external.recbuf = 4KB listener.ws.external.sndbuf = 4KB @@ -528,7 +544,21 @@ listener.wss.external.max_clients = 64 ## listener.wss.external.zone = external -listener.wss.external.access.1 = allow all +## listener.wss.external.access.1 = allow all + +listener.wss.external.backlog = 1024 + +listener.wss.external.send_timeout = 15s + +listener.wss.external.send_timeout_close = on + +## listener.wss.external.recbuf = 4KB + +## listener.wss.external.sndbuf = 4KB + +## listener.wss.external.buffer = 4KB + +## listener.wss.external.nodelay = true ## SSL Options listener.wss.external.handshake_timeout = 15s diff --git a/priv/emqx.schema b/priv/emqx.schema index e37950b12..8fdd97329 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -804,8 +804,18 @@ end}. ]}. {mapping, "listener.tcp.$name.backlog", "emqx.listeners", [ - {default, 1024}, - {datatype, integer} + {datatype, integer}, + {default, 1024} +]}. + +{mapping, "listener.tcp.$name.send_timeout", "emqx.listeners", [ + {datatype, {duration, ms}}, + {default, "15s"} +]}. + +{mapping, "listener.tcp.$name.send_timeout_close", "emqx.listeners", [ + {datatype, flag}, + {default, on} ]}. {mapping, "listener.tcp.$name.recbuf", "emqx.listeners", [ @@ -882,6 +892,16 @@ end}. {datatype, integer} ]}. +{mapping, "listener.ssl.$name.send_timeout", "emqx.listeners", [ + {datatype, {duration, ms}}, + {default, "15s"} +]}. + +{mapping, "listener.ssl.$name.send_timeout_close", "emqx.listeners", [ + {datatype, flag}, + {default, on} +]}. + {mapping, "listener.ssl.$name.recbuf", "emqx.listeners", [ {datatype, bytesize}, hidden @@ -995,6 +1015,16 @@ end}. {datatype, integer} ]}. +{mapping, "listener.ws.$name.send_timeout", "emqx.listeners", [ + {datatype, {duration, ms}}, + {default, "15s"} +]}. + +{mapping, "listener.ws.$name.send_timeout_close", "emqx.listeners", [ + {datatype, flag}, + {default, on} +]}. + {mapping, "listener.ws.$name.recbuf", "emqx.listeners", [ {datatype, bytesize}, hidden @@ -1058,6 +1088,16 @@ end}. {datatype, integer} ]}. +{mapping, "listener.wss.$name.send_timeout", "emqx.listeners", [ + {datatype, {duration, ms}}, + {default, "15s"} +]}. + +{mapping, "listener.wss.$name.send_timeout_close", "emqx.listeners", [ + {datatype, flag}, + {default, on} +]}. + {mapping, "listener.wss.$name.recbuf", "emqx.listeners", [ {datatype, bytesize}, hidden @@ -1144,6 +1184,8 @@ end}. end, TcpOpts = fun(Prefix) -> Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, + {send_timeout, cuttlefish:conf_get(Prefix ++ ".send_timeout", Conf, undefined)}, + {send_timeout_close, cuttlefish:conf_get(Prefix ++ ".send_timeout_close", Conf, undefined)}, {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)}, {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)}, @@ -1251,6 +1293,16 @@ end}. {datatype, integer} ]}. +{mapping, "listener.api.$name.send_timeout", "emqx.listeners", [ + {datatype, {duration, ms}}, + {default, "15s"} +]}. + +{mapping, "listener.api.$name.send_timeout_close", "emqx.listeners", [ + {datatype, flag}, + {default, on} +]}. + {mapping, "listener.api.$name.recbuf", "emqx.listeners", [ {datatype, bytesize}, hidden diff --git a/src/emqx.app.src b/src/emqx.app.src index 43c0afb21..d40646086 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -1,6 +1,6 @@ {application,emqx, [{description,"EMQ X Broker"}, - {vsn,"2.3.0"}, + {vsn,"2.3.2"}, {modules,[]}, {registered,[emqx_sup]}, {applications,[kernel,stdlib,gproc,gen_rpc,lager,esockd,mochiweb,lager_syslog,pbkdf2,bcrypt,clique,jsx]}, diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 5a16d1897..0701072a8 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -140,7 +140,9 @@ send_fun(Conn, Peername) -> ?LOG(debug, "SEND ~p", [Data], #client_state{peername = Peername}), emqx_metrics:inc('bytes/sent', iolist_size(Data)), try Conn:async_send(Data) of - true -> ok + ok -> ok; + true -> ok; %% Compatible with esockd 4.x + {error, Reason} -> Self ! {shutdown, Reason} catch error:Error -> Self ! {shutdown, Error} end diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 011ab3aec..e50d7355a 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -44,7 +44,7 @@ clean_sess, proto_ver, proto_name, username, is_superuser, will_msg, keepalive, keepalive_backoff, max_clientid_len, session, stats_data, mountpoint, ws_initial_headers, - is_bridge, connected_at}). + peercert_username, is_bridge, connected_at}). -type(proto_state() :: #proto_state{}). @@ -362,13 +362,11 @@ send(Msg, State = #proto_state{client_id = ClientId, emqx_hooks:run('message.delivered', [ClientId, Username], Msg), send(emqx_message:to_packet(unmount(MountPoint, clean_retain(IsBridge, Msg))), State); -send(Packet = ?PACKET(Type), - State = #proto_state{sendfun = SendFun, stats_data = Stats}) -> +send(Packet = ?PACKET(Type), State = #proto_state{sendfun = SendFun, stats_data = Stats}) -> trace(send, Packet, State), emqx_metrics:sent(Packet), SendFun(Packet), - Stats1 = inc_stats(send, Type, Stats), - {ok, State#proto_state{stats_data = Stats1}}. + {ok, State#proto_state{stats_data = inc_stats(send, Type, Stats)}}. trace(recv, Packet, ProtoState) -> ?LOG(info, "RECV ~s", [emqx_packet:format(Packet)], ProtoState); diff --git a/src/emqx_ws.erl b/src/emqx_ws.erl index 4f5e5b4b3..a0f1ae6f7 100644 --- a/src/emqx_ws.erl +++ b/src/emqx_ws.erl @@ -38,6 +38,7 @@ handle_request(Req) -> %%-------------------------------------------------------------------- %% MQTT Over WebSocket %%-------------------------------------------------------------------- + handle_request('GET', "/mqtt", Req) -> lager:debug("WebSocket Connection from: ~s", [Req:get(peer)]), Upgrade = Req:get_header_value("Upgrade"), diff --git a/src/emqx_ws_client.erl b/src/emqx_ws_client.erl index 40e824238..42b69b221 100644 --- a/src/emqx_ws_client.erl +++ b/src/emqx_ws_client.erl @@ -275,10 +275,14 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- send_fun(ReplyChannel) -> + Self = self(), fun(Packet) -> Data = emqx_serializer:serialize(Packet), emqx_metrics:inc('bytes/sent', iolist_size(Data)), - ReplyChannel({binary, Data}) + case ReplyChannel({binary, Data}) of + ok -> ok; + {error, Reason} -> Self ! {shutdown, Reason} + end end. stat_fun(Conn) -> diff --git a/test/emqttd_router_SUITE.erl b/test/emqttd_router_SUITE.erl index addd36288..415550ec3 100644 --- a/test/emqttd_router_SUITE.erl +++ b/test/emqttd_router_SUITE.erl @@ -34,7 +34,8 @@ groups() -> t_add_del_route, t_match_route, t_print, - t_has_route]}, + t_has_route, + router_unused]}, {local_route, [sequence], [t_get_local_topics, t_add_del_local_route, @@ -86,11 +87,6 @@ t_match_route(_) -> #mqtt_route{topic = <<"a/b/c">>, node = Node}], lists:sort(?R:match(<<"a/b/c">>))). -t_print(_) -> - ?R:add_route(<<"topic">>), - ?R:add_route(<<"topic/#">>), - ?R:print(<<"topic">>). - t_has_route(_) -> ?R:add_route(<<"devices/+/messages">>), ?assert(?R:has_route(<<"devices/+/messages">>)). @@ -130,3 +126,49 @@ clear_tables() -> ?R:clean_local_routes(), lists:foreach(fun mnesia:clear_table/1, [mqtt_route, mqtt_trie, mqtt_trie_node]). +%%-------------------------------------------------------------------- +%% Router Test +%%-------------------------------------------------------------------- + +router_add_del(_) -> + %% Add + ?R:add_route(<<"#">>), + ?R:add_route(<<"a/b/c">>), + ?R:add_route(<<"+/#">>), + Routes = [R1, R2 | _] = [ + #mqtt_route{topic = <<"#">>, node = node()}, + #mqtt_route{topic = <<"+/#">>, node = node()}, + #mqtt_route{topic = <<"a/b/c">>, node = node()}], + Routes = lists:sort(?R:match(<<"a/b/c">>)), + + %% Batch Add + lists:foreach(fun(R) -> ?R:add_route(R) end, Routes), + Routes = lists:sort(?R:match(<<"a/b/c">>)), + + %% Del + ?R:del_route(<<"a/b/c">>), + [R1, R2] = lists:sort(?R:match(<<"a/b/c">>)), + {atomic, []} = mnesia:transaction(fun emqttd_trie:lookup/1, [<<"a/b/c">>]), + + %% Batch Del + R3 = #mqtt_route{topic = <<"#">>, node = 'a@127.0.0.1'}, + ?R:add_route(R3), + ?R:del_route(R1), + ?R:del_route(R2), + ?R:del_route(R3), + [] = lists:sort(?R:match(<<"a/b/c">>)). + +t_print(_) -> + Routes = [#mqtt_route{topic = <<"a/b/c">>, node = node()}, + #mqtt_route{topic = <<"#">>, node = node()}, + #mqtt_route{topic = <<"+/#">>, node = node()}], + lists:foreach(fun(R) -> ?R:add_route(R) end, Routes), + ?R:print(<<"a/b/c">>), + ?R:del_route(<<"+/#">>), + ?R:del_route(<<"a/b/c">>), + ?R:del_route(<<"#">>). + +router_unused(_) -> + gen_server:call(emqttd_router, bad_call), + gen_server:cast(emqttd_router, bad_msg), + emqttd_router ! bad_info. diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index 48753c1f5..2bd961f30 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -60,7 +60,6 @@ all() -> [{group, protocol}, {group, pubsub}, - {group, router}, {group, session}, {group, broker}, {group, metrics}, @@ -83,10 +82,6 @@ groups() -> t_local_subscribe, t_shared_subscribe, 'pubsub#', 'pubsub+']}, - {router, [sequence], - [router_add_del, - router_print, - router_unused]}, {session, [sequence], [start_session]}, {broker, [sequence], @@ -297,50 +292,6 @@ loop_recv(Topic, Timeout, Acc) -> Timeout -> {ok, Acc} end. -%%-------------------------------------------------------------------- -%% Router Test -%%-------------------------------------------------------------------- - -router_add_del(_) -> - %% Add - emqx_router:add_route(<<"#">>), - emqx_router:add_route(<<"a/b/c">>), - emqx_router:add_route(<<"+/#">>), - Routes = [R1, R2 | _] = [ - #mqtt_route{topic = <<"#">>, node = node()}, - #mqtt_route{topic = <<"+/#">>, node = node()}, - #mqtt_route{topic = <<"a/b/c">>, node = node()}], - Routes = lists:sort(emqx_router:match(<<"a/b/c">>)), - - %% Batch Add - lists:foreach(fun(R) -> emqx_router:add_route(R) end, Routes), - Routes = lists:sort(emqx_router:match(<<"a/b/c">>)), - - %% Del - emqx_router:del_route(<<"a/b/c">>), - [R1, R2] = lists:sort(emqx_router:match(<<"a/b/c">>)), - {atomic, []} = mnesia:transaction(fun emqx_trie:lookup/1, [<<"a/b/c">>]), - - %% Batch Del - R3 = #mqtt_route{topic = <<"#">>, node = 'a@127.0.0.1'}, - emqx_router:add_route(R3), - emqx_router:del_route(R1), - emqx_router:del_route(R2), - emqx_router:del_route(R3), - [] = lists:sort(emqx_router:match(<<"a/b/c">>)). - -router_print(_) -> - Routes = [#mqtt_route{topic = <<"a/b/c">>, node = node()}, - #mqtt_route{topic = <<"#">>, node = node()}, - #mqtt_route{topic = <<"+/#">>, node = node()}], - lists:foreach(fun(R) -> emqx_router:add_route(R) end, Routes), - emqx_router:print(<<"a/b/c">>). - -router_unused(_) -> - gen_server:call(emqx_router, bad_call), - gen_server:cast(emqx_router, bad_msg), - emqx_router ! bad_info. - recv_loop(Msgs) -> receive {dispatch, _Topic, Msg} -> @@ -604,6 +555,7 @@ conflict_listeners(_) -> L = proplists:get_value("mqtt:tcp:0.0.0.0:1883", Listeners), ?assertEqual(1, proplists:get_value(current_clients, L)), ?assertEqual(1, proplists:get_value(conflict, proplists:get_value(shutdown_count, L))), + timer:sleep(100), emqttc:disconnect(C2). cli_vm(_) ->