Merge pull request #3877 from emqx/e422_to_v430
This commit is contained in:
commit
c6ec7a3724
|
@ -35,4 +35,4 @@ Mnesia.*/
|
||||||
_checkouts
|
_checkouts
|
||||||
rebar.config.rendered
|
rebar.config.rendered
|
||||||
/rebar3
|
/rebar3
|
||||||
rebar.lock
|
rebar.lock
|
|
@ -1139,6 +1139,13 @@ listener.tcp.external.send_timeout_close = on
|
||||||
## Value: on | off
|
## Value: on | off
|
||||||
## listener.tcp.external.tune_buffer = off
|
## listener.tcp.external.tune_buffer = off
|
||||||
|
|
||||||
|
## The socket is set to a busy state when the amount of data queued internally
|
||||||
|
## by the ERTS socket implementation reaches this limit.
|
||||||
|
##
|
||||||
|
## Value: on | off
|
||||||
|
## Defaults to 1MB
|
||||||
|
## listener.tcp.external.high_watermark = 1MB
|
||||||
|
|
||||||
## The TCP_NODELAY flag for MQTT connections. Small amounts of data are
|
## The TCP_NODELAY flag for MQTT connections. Small amounts of data are
|
||||||
## sent immediately if the option is enabled.
|
## sent immediately if the option is enabled.
|
||||||
##
|
##
|
||||||
|
@ -1317,6 +1324,11 @@ listener.ssl.external.access.1 = allow all
|
||||||
## Value: Duration
|
## Value: Duration
|
||||||
listener.ssl.external.handshake_timeout = 15s
|
listener.ssl.external.handshake_timeout = 15s
|
||||||
|
|
||||||
|
## Maximum number of non-self-issued intermediate certificates that can follow the peer certificate in a valid certification path.
|
||||||
|
##
|
||||||
|
## Value: Number
|
||||||
|
#listener.ssl.external.depth = 10
|
||||||
|
|
||||||
## Path to the file containing the user's private PEM-encoded key.
|
## Path to the file containing the user's private PEM-encoded key.
|
||||||
##
|
##
|
||||||
## See: http://erlang.org/doc/man/ssl.html
|
## See: http://erlang.org/doc/man/ssl.html
|
||||||
|
|
|
@ -1244,6 +1244,11 @@ end}.
|
||||||
hidden
|
hidden
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.tcp.$name.high_watermark", "emqx.listeners", [
|
||||||
|
{datatype, bytesize},
|
||||||
|
{default, "1MB"}
|
||||||
|
]}.
|
||||||
|
|
||||||
{mapping, "listener.tcp.$name.tune_buffer", "emqx.listeners", [
|
{mapping, "listener.tcp.$name.tune_buffer", "emqx.listeners", [
|
||||||
{datatype, flag},
|
{datatype, flag},
|
||||||
hidden
|
hidden
|
||||||
|
@ -1336,6 +1341,11 @@ end}.
|
||||||
hidden
|
hidden
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.ssl.$name.high_watermark", "emqx.listeners", [
|
||||||
|
{datatype, bytesize},
|
||||||
|
{default, "1MB"}
|
||||||
|
]}.
|
||||||
|
|
||||||
{mapping, "listener.ssl.$name.tune_buffer", "emqx.listeners", [
|
{mapping, "listener.ssl.$name.tune_buffer", "emqx.listeners", [
|
||||||
{datatype, flag},
|
{datatype, flag},
|
||||||
hidden
|
hidden
|
||||||
|
@ -1368,6 +1378,11 @@ end}.
|
||||||
{datatype, {duration, ms}}
|
{datatype, {duration, ms}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.ssl.$name.depth", "emqx.listeners", [
|
||||||
|
{default, 10},
|
||||||
|
{datatype, integer}
|
||||||
|
]}.
|
||||||
|
|
||||||
{mapping, "listener.ssl.$name.dhfile", "emqx.listeners", [
|
{mapping, "listener.ssl.$name.dhfile", "emqx.listeners", [
|
||||||
{datatype, string}
|
{datatype, string}
|
||||||
]}.
|
]}.
|
||||||
|
@ -1839,6 +1854,7 @@ end}.
|
||||||
{recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)},
|
{recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)},
|
||||||
{sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)},
|
{sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)},
|
||||||
{buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
|
{buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
|
||||||
|
{high_watermark, cuttlefish:conf_get(Prefix ++ ".high_watermark", Conf, undefined)},
|
||||||
{nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)},
|
{nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)},
|
||||||
{reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}])
|
{reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}])
|
||||||
end,
|
end,
|
||||||
|
@ -1878,6 +1894,7 @@ end}.
|
||||||
{ciphers, Ciphers},
|
{ciphers, Ciphers},
|
||||||
{user_lookup_fun, UserLookupFun},
|
{user_lookup_fun, UserLookupFun},
|
||||||
{handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf, undefined)},
|
{handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf, undefined)},
|
||||||
|
{depth, cuttlefish:conf_get(Prefix ++ ".depth", Conf, undefined)},
|
||||||
{dhfile, cuttlefish:conf_get(Prefix ++ ".dhfile", Conf, undefined)},
|
{dhfile, cuttlefish:conf_get(Prefix ++ ".dhfile", Conf, undefined)},
|
||||||
{keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)},
|
{keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)},
|
||||||
{certfile, cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)},
|
{certfile, cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)},
|
||||||
|
|
|
@ -1,48 +1,9 @@
|
||||||
%% -*-: erlang -*-
|
%% -*-: erlang -*-
|
||||||
{DefaultLen, DefaultSize} =
|
{VSN,
|
||||||
case WordSize = erlang:system_info(wordsize) of
|
[
|
||||||
8 -> % arch_64
|
{<<".*">>, []}
|
||||||
{10000, cuttlefish_bytesize:parse("64MB")};
|
],
|
||||||
4 -> % arch_32
|
[
|
||||||
{1000, cuttlefish_bytesize:parse("32MB")}
|
{<<".*">>, []}
|
||||||
end,
|
]
|
||||||
{"4.2.3",
|
|
||||||
[
|
|
||||||
{"4.2.2", [
|
|
||||||
{load_module, emqx_metrics, brutal_purge, soft_purge, []}
|
|
||||||
]},
|
|
||||||
{"4.2.1", [
|
|
||||||
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
|
||||||
{load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
|
|
||||||
{load_module, emqx_json, brutal_purge, soft_purge, []}
|
|
||||||
]},
|
|
||||||
{"4.2.0", [
|
|
||||||
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
|
||||||
{load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
|
|
||||||
{load_module, emqx_json, brutal_purge, soft_purge, []},
|
|
||||||
{apply, {application, set_env,
|
|
||||||
[emqx, force_shutdown_policy,
|
|
||||||
#{message_queue_len => DefaultLen,
|
|
||||||
max_heap_size => DefaultSize div WordSize}]}}
|
|
||||||
]}
|
|
||||||
],
|
|
||||||
[
|
|
||||||
{"4.2.2", [
|
|
||||||
{load_module, emqx_metrics, brutal_purge, soft_purge, []}
|
|
||||||
]},
|
|
||||||
{"4.2.1", [
|
|
||||||
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
|
||||||
{load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
|
|
||||||
{load_module, emqx_json, brutal_purge, soft_purge, []}
|
|
||||||
]},
|
|
||||||
{"4.2.0", [
|
|
||||||
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
|
||||||
{load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
|
|
||||||
{load_module, emqx_json, brutal_purge, soft_purge, []}
|
|
||||||
]}
|
|
||||||
]
|
|
||||||
}.
|
}.
|
||||||
|
|
|
@ -344,6 +344,8 @@ normalize_message(partition, #{occurred := Node}) ->
|
||||||
list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
|
list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
|
||||||
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
|
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
|
||||||
list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID]));
|
list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID]));
|
||||||
|
normalize_message(<<"mqtt_conn/congested/", ClientId/binary>>, _) ->
|
||||||
|
list_to_binary(io_lib:format("MQTT connection for clientid '~s' is congested", [ClientId]));
|
||||||
normalize_message(_Name, _UnknownDetails) ->
|
normalize_message(_Name, _UnknownDetails) ->
|
||||||
<<"Unknown alarm">>.
|
<<"Unknown alarm">>.
|
||||||
|
|
||||||
|
|
|
@ -131,6 +131,20 @@ info(zone, #channel{clientinfo = #{zone := Zone}}) ->
|
||||||
Zone;
|
Zone;
|
||||||
info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
|
info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
|
||||||
ClientId;
|
ClientId;
|
||||||
|
info(username, #channel{clientinfo = #{username := Username}}) ->
|
||||||
|
Username;
|
||||||
|
info(socktype, #channel{conninfo = #{socktype := SockType}}) ->
|
||||||
|
SockType;
|
||||||
|
info(peername, #channel{conninfo = #{peername := Peername}}) ->
|
||||||
|
Peername;
|
||||||
|
info(sockname, #channel{conninfo = #{sockname := Sockname}}) ->
|
||||||
|
Sockname;
|
||||||
|
info(proto_name, #channel{conninfo = #{proto_name := ProtoName}}) ->
|
||||||
|
ProtoName;
|
||||||
|
info(proto_ver, #channel{conninfo = #{proto_ver := ProtoVer}}) ->
|
||||||
|
ProtoVer;
|
||||||
|
info(connected_at, #channel{conninfo = #{connected_at := ConnectedAt}}) ->
|
||||||
|
ConnectedAt;
|
||||||
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
|
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
|
||||||
ClientInfo;
|
ClientInfo;
|
||||||
info(session, #channel{session = Session}) ->
|
info(session, #channel{session = Session}) ->
|
||||||
|
|
|
@ -80,8 +80,8 @@
|
||||||
limit_timer :: maybe(reference()),
|
limit_timer :: maybe(reference()),
|
||||||
%% Parse State
|
%% Parse State
|
||||||
parse_state :: emqx_frame:parse_state(),
|
parse_state :: emqx_frame:parse_state(),
|
||||||
%% Serialize function
|
%% Serialize options
|
||||||
serialize :: emqx_frame:serialize_fun(),
|
serialize :: emqx_frame:serialize_opts(),
|
||||||
%% Channel State
|
%% Channel State
|
||||||
channel :: emqx_channel:channel(),
|
channel :: emqx_channel:channel(),
|
||||||
%% GC State
|
%% GC State
|
||||||
|
@ -103,11 +103,24 @@
|
||||||
|
|
||||||
-define(ENABLED(X), (X =/= undefined)).
|
-define(ENABLED(X), (X =/= undefined)).
|
||||||
|
|
||||||
|
-define(ALARM_TCP_CONGEST(Channel),
|
||||||
|
list_to_binary(io_lib:format("mqtt_conn/congested/~s/~s",
|
||||||
|
[emqx_channel:info(clientid, Channel),
|
||||||
|
emqx_channel:info(username, Channel)]))).
|
||||||
|
|
||||||
|
-define(ALARM_CONN_INFO_KEYS, [
|
||||||
|
socktype, sockname, peername,
|
||||||
|
clientid, username, proto_name, proto_ver, connected_at
|
||||||
|
]).
|
||||||
|
-define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]).
|
||||||
|
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
|
||||||
|
|
||||||
-dialyzer({no_match, [info/2]}).
|
-dialyzer({no_match, [info/2]}).
|
||||||
-dialyzer({nowarn_function, [ init/4
|
-dialyzer({nowarn_function, [ init/4
|
||||||
, init_state/3
|
, init_state/3
|
||||||
, run_loop/2
|
, run_loop/2
|
||||||
, system_terminate/4
|
, system_terminate/4
|
||||||
|
, system_code_change/4
|
||||||
]}).
|
]}).
|
||||||
|
|
||||||
-spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist())
|
-spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist())
|
||||||
|
@ -203,7 +216,7 @@ init_state(Transport, Socket, Options) ->
|
||||||
Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit),
|
Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit),
|
||||||
FrameOpts = emqx_zone:mqtt_frame_options(Zone),
|
FrameOpts = emqx_zone:mqtt_frame_options(Zone),
|
||||||
ParseState = emqx_frame:initial_parse_state(FrameOpts),
|
ParseState = emqx_frame:initial_parse_state(FrameOpts),
|
||||||
Serialize = emqx_frame:serialize_fun(),
|
Serialize = emqx_frame:serialize_opts(),
|
||||||
Channel = emqx_channel:init(ConnInfo, Options),
|
Channel = emqx_channel:init(ConnInfo, Options),
|
||||||
GcState = emqx_zone:init_gc_state(Zone),
|
GcState = emqx_zone:init_gc_state(Zone),
|
||||||
StatsTimer = emqx_zone:stats_timer(Zone),
|
StatsTimer = emqx_zone:stats_timer(Zone),
|
||||||
|
@ -337,7 +350,7 @@ handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
|
||||||
handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
|
handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
|
||||||
State = #state{idle_timer = IdleTimer}) ->
|
State = #state{idle_timer = IdleTimer}) ->
|
||||||
ok = emqx_misc:cancel_timer(IdleTimer),
|
ok = emqx_misc:cancel_timer(IdleTimer),
|
||||||
Serialize = emqx_frame:serialize_fun(ConnPkt),
|
Serialize = emqx_frame:serialize_opts(ConnPkt),
|
||||||
NState = State#state{serialize = Serialize,
|
NState = State#state{serialize = Serialize,
|
||||||
idle_timer = undefined
|
idle_timer = undefined
|
||||||
},
|
},
|
||||||
|
@ -428,6 +441,7 @@ handle_msg(Msg, State) ->
|
||||||
|
|
||||||
terminate(Reason, State = #state{channel = Channel}) ->
|
terminate(Reason, State = #state{channel = Channel}) ->
|
||||||
?LOG(debug, "Terminated due to ~p", [Reason]),
|
?LOG(debug, "Terminated due to ~p", [Reason]),
|
||||||
|
emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel)),
|
||||||
emqx_channel:terminate(Reason, Channel),
|
emqx_channel:terminate(Reason, Channel),
|
||||||
close_socket(State),
|
close_socket(State),
|
||||||
exit(Reason).
|
exit(Reason).
|
||||||
|
@ -578,7 +592,7 @@ handle_outgoing(Packet, State) ->
|
||||||
|
|
||||||
serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
||||||
fun(Packet) ->
|
fun(Packet) ->
|
||||||
case Serialize(Packet) of
|
case emqx_frame:serialize_pkt(Packet, Serialize) of
|
||||||
<<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!",
|
<<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!",
|
||||||
[emqx_packet:format(Packet)]),
|
[emqx_packet:format(Packet)]),
|
||||||
ok = emqx_metrics:inc('delivery.dropped.too_large'),
|
ok = emqx_metrics:inc('delivery.dropped.too_large'),
|
||||||
|
@ -594,11 +608,12 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
||||||
%% Send data
|
%% Send data
|
||||||
|
|
||||||
-spec(send(iodata(), state()) -> ok).
|
-spec(send(iodata(), state()) -> ok).
|
||||||
send(IoData, #state{transport = Transport, socket = Socket}) ->
|
send(IoData, #state{transport = Transport, socket = Socket, channel = Channel}) ->
|
||||||
Oct = iolist_size(IoData),
|
Oct = iolist_size(IoData),
|
||||||
ok = emqx_metrics:inc('bytes.sent', Oct),
|
ok = emqx_metrics:inc('bytes.sent', Oct),
|
||||||
emqx_pd:inc_counter(outgoing_bytes, Oct),
|
emqx_pd:inc_counter(outgoing_bytes, Oct),
|
||||||
case Transport:async_send(Socket, IoData) of
|
maybe_warn_congestion(Socket, Transport, Channel),
|
||||||
|
case Transport:async_send(Socket, IoData, [nosuspend]) of
|
||||||
ok -> ok;
|
ok -> ok;
|
||||||
Error = {error, _Reason} ->
|
Error = {error, _Reason} ->
|
||||||
%% Send an inet_reply to postpone handling the error
|
%% Send an inet_reply to postpone handling the error
|
||||||
|
@ -606,6 +621,48 @@ send(IoData, #state{transport = Transport, socket = Socket}) ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
maybe_warn_congestion(Socket, Transport, Channel) ->
|
||||||
|
IsCongestAlarmSet = is_congestion_alarm_set(),
|
||||||
|
case is_congested(Socket, Transport) of
|
||||||
|
true when not IsCongestAlarmSet ->
|
||||||
|
ok = set_congestion_alarm(),
|
||||||
|
emqx_alarm:activate(?ALARM_TCP_CONGEST(Channel),
|
||||||
|
tcp_congestion_alarm_details(Socket, Transport, Channel));
|
||||||
|
false when IsCongestAlarmSet ->
|
||||||
|
ok = clear_congestion_alarm(),
|
||||||
|
emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel));
|
||||||
|
_ -> ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
is_congested(Socket, Transport) ->
|
||||||
|
case Transport:getstat(Socket, [send_pend]) of
|
||||||
|
{ok, [{send_pend, N}]} when N > 0 -> true;
|
||||||
|
_ -> false
|
||||||
|
end.
|
||||||
|
|
||||||
|
is_congestion_alarm_set() ->
|
||||||
|
case erlang:get(conn_congested) of
|
||||||
|
true -> true;
|
||||||
|
_ -> false
|
||||||
|
end.
|
||||||
|
set_congestion_alarm() ->
|
||||||
|
erlang:put(conn_congested, true), ok.
|
||||||
|
clear_congestion_alarm() ->
|
||||||
|
erlang:put(conn_congested, false), ok.
|
||||||
|
|
||||||
|
tcp_congestion_alarm_details(Socket, Transport, Channel) ->
|
||||||
|
{ok, Stat} = Transport:getstat(Socket, ?ALARM_SOCK_STATS_KEYS),
|
||||||
|
{ok, Opts} = Transport:getopts(Socket, ?ALARM_SOCK_OPTS_KEYS),
|
||||||
|
SockInfo = maps:from_list(Stat ++ Opts),
|
||||||
|
ConnInfo = maps:from_list([conn_info(Key, Channel) || Key <- ?ALARM_CONN_INFO_KEYS]),
|
||||||
|
maps:merge(ConnInfo, SockInfo).
|
||||||
|
|
||||||
|
conn_info(Key, Channel) when Key =:= sockname; Key =:= peername ->
|
||||||
|
{IPStr, Port} = emqx_channel:info(Key, Channel),
|
||||||
|
{Key, iolist_to_binary([inet:ntoa(IPStr),":",integer_to_list(Port)])};
|
||||||
|
conn_info(Key, Channel) ->
|
||||||
|
{Key, emqx_channel:info(Key, Channel)}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Handle Info
|
%% Handle Info
|
||||||
|
|
||||||
|
@ -621,7 +678,7 @@ handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info({sock_error, Reason}, State) ->
|
handle_info({sock_error, Reason}, State) ->
|
||||||
?LOG(debug, "Socket error: ~p", [Reason]),
|
Reason =/= closed andalso ?LOG(error, "Socket error: ~p", [Reason]),
|
||||||
handle_info({sock_closed, Reason}, close_socket(State));
|
handle_info({sock_closed, Reason}, close_socket(State));
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
|
|
|
@ -27,6 +27,9 @@
|
||||||
, parse/2
|
, parse/2
|
||||||
, serialize_fun/0
|
, serialize_fun/0
|
||||||
, serialize_fun/1
|
, serialize_fun/1
|
||||||
|
, serialize_opts/0
|
||||||
|
, serialize_opts/1
|
||||||
|
, serialize_pkt/2
|
||||||
, serialize/1
|
, serialize/1
|
||||||
, serialize/2
|
, serialize/2
|
||||||
]).
|
]).
|
||||||
|
@ -34,7 +37,7 @@
|
||||||
-export_type([ options/0
|
-export_type([ options/0
|
||||||
, parse_state/0
|
, parse_state/0
|
||||||
, parse_result/0
|
, parse_result/0
|
||||||
, serialize_fun/0
|
, serialize_opts/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-type(options() :: #{strict_mode => boolean(),
|
-type(options() :: #{strict_mode => boolean(),
|
||||||
|
@ -42,14 +45,19 @@
|
||||||
version => emqx_types:version()
|
version => emqx_types:version()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type(parse_state() :: {none, options()} | cont_fun()).
|
-type(parse_state() :: {none, options()} | {cont_state(), options()}).
|
||||||
|
|
||||||
-type(parse_result() :: {more, cont_fun()}
|
-type(parse_result() :: {more, parse_state()}
|
||||||
| {ok, emqx_types:packet(), binary(), parse_state()}).
|
| {ok, emqx_types:packet(), binary(), parse_state()}).
|
||||||
|
|
||||||
-type(cont_fun() :: fun((binary()) -> parse_result())).
|
-type(cont_state() :: {Stage :: len | body,
|
||||||
|
State :: #{hdr := #mqtt_packet_header{},
|
||||||
|
len := {pos_integer(), non_neg_integer()} | non_neg_integer(),
|
||||||
|
rest => binary()
|
||||||
|
}
|
||||||
|
}).
|
||||||
|
|
||||||
-type(serialize_fun() :: fun((emqx_types:packet()) -> iodata())).
|
-type(serialize_opts() :: options()).
|
||||||
|
|
||||||
-define(none(Options), {none, Options}).
|
-define(none(Options), {none, Options}).
|
||||||
|
|
||||||
|
@ -87,7 +95,7 @@ parse(Bin) ->
|
||||||
|
|
||||||
-spec(parse(binary(), parse_state()) -> parse_result()).
|
-spec(parse(binary(), parse_state()) -> parse_result()).
|
||||||
parse(<<>>, {none, Options}) ->
|
parse(<<>>, {none, Options}) ->
|
||||||
{more, fun(Bin) -> parse(Bin, {none, Options}) end};
|
{more, {none, Options}};
|
||||||
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>,
|
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>,
|
||||||
{none, Options = #{strict_mode := StrictMode}}) ->
|
{none, Options = #{strict_mode := StrictMode}}) ->
|
||||||
%% Validate header if strict mode.
|
%% Validate header if strict mode.
|
||||||
|
@ -102,11 +110,19 @@ parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>,
|
||||||
FixedQoS -> Header#mqtt_packet_header{qos = FixedQoS}
|
FixedQoS -> Header#mqtt_packet_header{qos = FixedQoS}
|
||||||
end,
|
end,
|
||||||
parse_remaining_len(Rest, Header1, Options);
|
parse_remaining_len(Rest, Header1, Options);
|
||||||
parse(Bin, Cont) when is_binary(Bin), is_function(Cont) ->
|
|
||||||
Cont(Bin).
|
parse(Bin, {{len, #{hdr := Header,
|
||||||
|
len := {Multiplier, Length}}
|
||||||
|
}, Options}) when is_binary(Bin) ->
|
||||||
|
parse_remaining_len(Bin, Header, Multiplier, Length, Options);
|
||||||
|
parse(Bin, {{body, #{hdr := Header,
|
||||||
|
len := Length,
|
||||||
|
rest := Rest}
|
||||||
|
}, Options}) when is_binary(Bin) ->
|
||||||
|
parse_frame(<<Rest/binary, Bin/binary>>, Header, Length, Options).
|
||||||
|
|
||||||
parse_remaining_len(<<>>, Header, Options) ->
|
parse_remaining_len(<<>>, Header, Options) ->
|
||||||
{more, fun(Bin) -> parse_remaining_len(Bin, Header, Options) end};
|
{more, {{len, #{hdr => Header, len => {1, 0}}}, Options}};
|
||||||
parse_remaining_len(Rest, Header, Options) ->
|
parse_remaining_len(Rest, Header, Options) ->
|
||||||
parse_remaining_len(Rest, Header, 1, 0, Options).
|
parse_remaining_len(Rest, Header, 1, 0, Options).
|
||||||
|
|
||||||
|
@ -114,7 +130,7 @@ parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize})
|
||||||
when Length > MaxSize ->
|
when Length > MaxSize ->
|
||||||
error(frame_too_large);
|
error(frame_too_large);
|
||||||
parse_remaining_len(<<>>, Header, Multiplier, Length, Options) ->
|
parse_remaining_len(<<>>, Header, Multiplier, Length, Options) ->
|
||||||
{more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end};
|
{more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}};
|
||||||
%% Match DISCONNECT without payload
|
%% Match DISCONNECT without payload
|
||||||
parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) ->
|
parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) ->
|
||||||
Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}),
|
Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}),
|
||||||
|
@ -150,9 +166,7 @@ parse_frame(Bin, Header, Length, Options) ->
|
||||||
{ok, packet(Header, Variable), Rest, ?none(Options)}
|
{ok, packet(Header, Variable), Rest, ?none(Options)}
|
||||||
end;
|
end;
|
||||||
TooShortBin ->
|
TooShortBin ->
|
||||||
{more, fun(BinMore) ->
|
{more, {{body, #{hdr => Header, len => Length, rest => TooShortBin}}, Options}}
|
||||||
parse_frame(<<TooShortBin/binary, BinMore/binary>>, Header, Length, Options)
|
|
||||||
end}
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-compile({inline, [packet/1, packet/2, packet/3]}).
|
-compile({inline, [packet/1, packet/2, packet/3]}).
|
||||||
|
@ -443,6 +457,20 @@ serialize_fun(#{version := Ver, max_size := MaxSize}) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
serialize_opts() ->
|
||||||
|
?DEFAULT_OPTIONS.
|
||||||
|
|
||||||
|
serialize_opts(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) ->
|
||||||
|
MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE),
|
||||||
|
#{version => ProtoVer, max_size => MaxSize}.
|
||||||
|
|
||||||
|
serialize_pkt(Packet, #{version := Ver, max_size := MaxSize}) ->
|
||||||
|
IoData = serialize(Packet, Ver),
|
||||||
|
case is_too_large(IoData, MaxSize) of
|
||||||
|
true -> <<>>;
|
||||||
|
false -> IoData
|
||||||
|
end.
|
||||||
|
|
||||||
-spec(serialize(emqx_types:packet()) -> iodata()).
|
-spec(serialize(emqx_types:packet()) -> iodata()).
|
||||||
serialize(Packet) -> serialize(Packet, ?MQTT_PROTO_V4).
|
serialize(Packet) -> serialize(Packet, ?MQTT_PROTO_V4).
|
||||||
|
|
||||||
|
@ -746,4 +774,3 @@ fixqos(?PUBREL, 0) -> 1;
|
||||||
fixqos(?SUBSCRIBE, 0) -> 1;
|
fixqos(?SUBSCRIBE, 0) -> 1;
|
||||||
fixqos(?UNSUBSCRIBE, 0) -> 1;
|
fixqos(?UNSUBSCRIBE, 0) -> 1;
|
||||||
fixqos(_Type, QoS) -> QoS.
|
fixqos(_Type, QoS) -> QoS.
|
||||||
|
|
||||||
|
|
|
@ -35,7 +35,7 @@
|
||||||
-type(checker() :: #{ name := name()
|
-type(checker() :: #{ name := name()
|
||||||
, capacity := non_neg_integer()
|
, capacity := non_neg_integer()
|
||||||
, interval := non_neg_integer()
|
, interval := non_neg_integer()
|
||||||
, consumer := function() | esockd_rate_limit:bucket()
|
, consumer := esockd_rate_limit:bucket() | emqx_zone:zone()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type(name() :: conn_bytes_in
|
-type(name() :: conn_bytes_in
|
||||||
|
@ -53,6 +53,8 @@
|
||||||
|
|
||||||
-type(limiter() :: #limiter{}).
|
-type(limiter() :: #limiter{}).
|
||||||
|
|
||||||
|
-dialyzer({nowarn_function, [consume/3]}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% APIs
|
%% APIs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -84,7 +86,7 @@ do_init_checker(Zone, {Name, {Capacity, Interval}}) ->
|
||||||
_ ->
|
_ ->
|
||||||
esockd_limiter:create({Zone, Name}, Capacity, Interval)
|
esockd_limiter:create({Zone, Name}, Capacity, Interval)
|
||||||
end,
|
end,
|
||||||
Ck#{consumer => fun(I) -> esockd_limiter:consume({Zone, Name}, I) end};
|
Ck#{consumer => Zone};
|
||||||
_ ->
|
_ ->
|
||||||
Ck#{consumer => esockd_rate_limit:new(Capacity / Interval, Capacity)}
|
Ck#{consumer => esockd_rate_limit:new(Capacity / Interval, Capacity)}
|
||||||
end.
|
end.
|
||||||
|
@ -126,7 +128,7 @@ consume(Pubs, Bytes, #{name := Name, consumer := Cons}) ->
|
||||||
_ ->
|
_ ->
|
||||||
case is_overall_limiter(Name) of
|
case is_overall_limiter(Name) of
|
||||||
true ->
|
true ->
|
||||||
{_, Intv} = Cons(Tokens),
|
{_, Intv} = esockd_limiter:consume({Cons, Name}, Tokens),
|
||||||
{Intv, Cons};
|
{Intv, Cons};
|
||||||
_ ->
|
_ ->
|
||||||
esockd_rate_limit:check(Tokens, Cons)
|
esockd_rate_limit:check(Tokens, Cons)
|
||||||
|
|
|
@ -70,8 +70,8 @@
|
||||||
limit_timer :: maybe(reference()),
|
limit_timer :: maybe(reference()),
|
||||||
%% Parse State
|
%% Parse State
|
||||||
parse_state :: emqx_frame:parse_state(),
|
parse_state :: emqx_frame:parse_state(),
|
||||||
%% Serialize Fun
|
%% Serialize options
|
||||||
serialize :: emqx_frame:serialize_fun(),
|
serialize :: emqx_frame:serialize_opts(),
|
||||||
%% Channel
|
%% Channel
|
||||||
channel :: emqx_channel:channel(),
|
channel :: emqx_channel:channel(),
|
||||||
%% GC State
|
%% GC State
|
||||||
|
@ -231,7 +231,7 @@ websocket_init([Req, Opts]) ->
|
||||||
MQTTPiggyback = proplists:get_value(mqtt_piggyback, Opts, multiple),
|
MQTTPiggyback = proplists:get_value(mqtt_piggyback, Opts, multiple),
|
||||||
FrameOpts = emqx_zone:mqtt_frame_options(Zone),
|
FrameOpts = emqx_zone:mqtt_frame_options(Zone),
|
||||||
ParseState = emqx_frame:initial_parse_state(FrameOpts),
|
ParseState = emqx_frame:initial_parse_state(FrameOpts),
|
||||||
Serialize = emqx_frame:serialize_fun(),
|
Serialize = emqx_frame:serialize_opts(),
|
||||||
Channel = emqx_channel:init(ConnInfo, Opts),
|
Channel = emqx_channel:init(ConnInfo, Opts),
|
||||||
GcState = emqx_zone:init_gc_state(Zone),
|
GcState = emqx_zone:init_gc_state(Zone),
|
||||||
StatsTimer = emqx_zone:stats_timer(Zone),
|
StatsTimer = emqx_zone:stats_timer(Zone),
|
||||||
|
@ -292,7 +292,7 @@ websocket_info({cast, Msg}, State) ->
|
||||||
handle_info(Msg, State);
|
handle_info(Msg, State);
|
||||||
|
|
||||||
websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) ->
|
websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) ->
|
||||||
Serialize = emqx_frame:serialize_fun(ConnPkt),
|
Serialize = emqx_frame:serialize_opts(ConnPkt),
|
||||||
NState = State#state{serialize = Serialize},
|
NState = State#state{serialize = Serialize},
|
||||||
handle_incoming(Packet, cancel_idle_timer(NState));
|
handle_incoming(Packet, cancel_idle_timer(NState));
|
||||||
|
|
||||||
|
@ -544,7 +544,7 @@ handle_outgoing(Packets, State = #state{active_n = ActiveN, mqtt_piggyback = MQT
|
||||||
|
|
||||||
serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
||||||
fun(Packet) ->
|
fun(Packet) ->
|
||||||
case Serialize(Packet) of
|
case emqx_frame:serialize_pkt(Packet, Serialize) of
|
||||||
<<>> -> ?LOG(warning, "~s is discarded due to the frame is too large.",
|
<<>> -> ?LOG(warning, "~s is discarded due to the frame is too large.",
|
||||||
[emqx_packet:format(Packet)]),
|
[emqx_packet:format(Packet)]),
|
||||||
ok = emqx_metrics:inc('delivery.dropped.too_large'),
|
ok = emqx_metrics:inc('delivery.dropped.too_large'),
|
||||||
|
|
|
@ -52,6 +52,9 @@ init_per_suite(Config) ->
|
||||||
|
|
||||||
ok = meck:expect(emqx_channel, ensure_disconnected, fun(_, Channel) -> Channel end),
|
ok = meck:expect(emqx_channel, ensure_disconnected, fun(_, Channel) -> Channel end),
|
||||||
|
|
||||||
|
ok = meck:expect(emqx_alarm, activate, fun(_, _) -> ok end),
|
||||||
|
ok = meck:expect(emqx_alarm, deactivate, fun(_) -> ok end),
|
||||||
|
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
|
@ -62,6 +65,7 @@ end_per_suite(_Config) ->
|
||||||
ok = meck:unload(emqx_pd),
|
ok = meck:unload(emqx_pd),
|
||||||
ok = meck:unload(emqx_metrics),
|
ok = meck:unload(emqx_metrics),
|
||||||
ok = meck:unload(emqx_hooks),
|
ok = meck:unload(emqx_hooks),
|
||||||
|
ok = meck:unload(emqx_alarm),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
init_per_testcase(_TestCase, Config) ->
|
init_per_testcase(_TestCase, Config) ->
|
||||||
|
@ -77,6 +81,7 @@ init_per_testcase(_TestCase, Config) ->
|
||||||
{ok, [{K, 0} || K <- Options]}
|
{ok, [{K, 0} || K <- Options]}
|
||||||
end),
|
end),
|
||||||
ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data) -> ok end),
|
ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data) -> ok end),
|
||||||
|
ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data, _Opts) -> ok end),
|
||||||
ok = meck:expect(emqx_transport, fast_close, fun(_Sock) -> ok end),
|
ok = meck:expect(emqx_transport, fast_close, fun(_Sock) -> ok end),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
|
|
|
@ -80,7 +80,7 @@ t_get_port_info(_Config) ->
|
||||||
{ok, Sock} = gen_tcp:connect("localhost", 5678, [binary, {packet, 0}]),
|
{ok, Sock} = gen_tcp:connect("localhost", 5678, [binary, {packet, 0}]),
|
||||||
emqx_vm:get_port_info(),
|
emqx_vm:get_port_info(),
|
||||||
ok = gen_tcp:close(Sock),
|
ok = gen_tcp:close(Sock),
|
||||||
[Port | _] = erlang:ports().
|
[_Port | _] = erlang:ports().
|
||||||
|
|
||||||
t_transform_port(_Config) ->
|
t_transform_port(_Config) ->
|
||||||
[Port | _] = erlang:ports(),
|
[Port | _] = erlang:ports(),
|
||||||
|
|
|
@ -348,16 +348,19 @@ t_connect_will_delay_interval(_) ->
|
||||||
{will_topic, Topic},
|
{will_topic, Topic},
|
||||||
{will_payload, Payload},
|
{will_payload, Payload},
|
||||||
{will_props, #{'Will-Delay-Interval' => 3}},
|
{will_props, #{'Will-Delay-Interval' => 3}},
|
||||||
{properties, #{'Session-Expiry-Interval' => 7200}},
|
{properties, #{'Session-Expiry-Interval' => 7200}}
|
||||||
{keepalive, 2}
|
|
||||||
]),
|
]),
|
||||||
{ok, _} = emqtt:connect(Client2),
|
{ok, _} = emqtt:connect(Client2),
|
||||||
|
%% terminate the client without sending the DISCONNECT
|
||||||
timer:sleep(5000),
|
emqtt:stop(Client2),
|
||||||
|
%% should not get the will msg in 2.5s
|
||||||
|
timer:sleep(1500),
|
||||||
?assertEqual(0, length(receive_messages(1))),
|
?assertEqual(0, length(receive_messages(1))),
|
||||||
timer:sleep(7000),
|
%% should get the will msg in 4.5s
|
||||||
|
timer:sleep(1000),
|
||||||
?assertEqual(1, length(receive_messages(1))),
|
?assertEqual(1, length(receive_messages(1))),
|
||||||
|
|
||||||
|
%% try again, but let the session expire quickly
|
||||||
{ok, Client3} = emqtt:start_link([
|
{ok, Client3} = emqtt:start_link([
|
||||||
{clientid, <<"t_connect_will_delay_interval">>},
|
{clientid, <<"t_connect_will_delay_interval">>},
|
||||||
{proto_ver, v5},
|
{proto_ver, v5},
|
||||||
|
@ -367,14 +370,16 @@ t_connect_will_delay_interval(_) ->
|
||||||
{will_topic, Topic},
|
{will_topic, Topic},
|
||||||
{will_payload, Payload},
|
{will_payload, Payload},
|
||||||
{will_props, #{'Will-Delay-Interval' => 7200}},
|
{will_props, #{'Will-Delay-Interval' => 7200}},
|
||||||
{properties, #{'Session-Expiry-Interval' => 3}},
|
{properties, #{'Session-Expiry-Interval' => 3}}
|
||||||
{keepalive, 2}
|
|
||||||
]),
|
]),
|
||||||
{ok, _} = emqtt:connect(Client3),
|
{ok, _} = emqtt:connect(Client3),
|
||||||
|
%% terminate the client without sending the DISCONNECT
|
||||||
timer:sleep(5000),
|
emqtt:stop(Client3),
|
||||||
|
%% should not get the will msg in 2.5s
|
||||||
|
timer:sleep(1500),
|
||||||
?assertEqual(0, length(receive_messages(1))),
|
?assertEqual(0, length(receive_messages(1))),
|
||||||
timer:sleep(7000),
|
%% should get the will msg in 4.5s
|
||||||
|
timer:sleep(1000),
|
||||||
?assertEqual(1, length(receive_messages(1))),
|
?assertEqual(1, length(receive_messages(1))),
|
||||||
|
|
||||||
ok = emqtt:disconnect(Client1),
|
ok = emqtt:disconnect(Client1),
|
||||||
|
|
Loading…
Reference in New Issue