Merge pull request #3007 from emqx/improve-connection
Improve the 'connection', 'channel' and 'zone' modules
This commit is contained in:
commit
1a901942f7
|
@ -1349,9 +1349,14 @@ listener.ws.external.max_connections = 102400
|
|||
## Value: Number
|
||||
listener.ws.external.max_conn_rate = 1000
|
||||
|
||||
## Simulate the {active, N} option for the MQTT/WebSocket connections.
|
||||
##
|
||||
## Value: Number
|
||||
listener.ws.external.active_n = 100
|
||||
|
||||
## Rate limit for the MQTT/WebSocket connections.
|
||||
##
|
||||
## Value: limit,duration
|
||||
## Value: Limit,Duration
|
||||
## Default: 100KB incoming per 10 seconds.
|
||||
## listener.ws.external.rate_limit = 100KB,10s
|
||||
|
||||
|
@ -1557,9 +1562,14 @@ listener.wss.external.max_connections = 16
|
|||
## Value: Number
|
||||
listener.wss.external.max_conn_rate = 1000
|
||||
|
||||
## Simulate the {active, N} option for the MQTT/WebSocket/SSL connections.
|
||||
##
|
||||
## Value: Number
|
||||
listener.wss.external.active_n = 100
|
||||
|
||||
## Rate limit for the MQTT/WebSocket/SSL connections.
|
||||
##
|
||||
## Value: limit,duration
|
||||
## Value: Limit,Duration
|
||||
## Default: 100KB incoming per 10 seconds.
|
||||
## listener.wss.external.rate_limit = 100KB,10s
|
||||
|
||||
|
|
|
@ -959,17 +959,18 @@ end}.
|
|||
{force_gc_policy, GcPolicy};
|
||||
("force_shutdown_policy", "default") ->
|
||||
{DefaultLen, DefaultSize} =
|
||||
case erlang:system_info(wordsize) of
|
||||
case WordSize = erlang:system_info(wordsize) of
|
||||
8 -> % arch_64
|
||||
{8000, cuttlefish_bytesize:parse("800MB")};
|
||||
4 -> % arch_32
|
||||
{1000, cuttlefish_bytesize:parse("100MB")}
|
||||
end,
|
||||
{force_shutdown_policy, #{message_queue_len => DefaultLen,
|
||||
max_heap_size => DefaultSize}};
|
||||
max_heap_size => DefaultSize div WordSize
|
||||
}};
|
||||
("force_shutdown_policy", Val) ->
|
||||
[Len, Siz] = string:tokens(Val, "| "),
|
||||
MaxSiz = case erlang:system_info(wordsize) of
|
||||
MaxSiz = case WordSize = erlang:system_info(wordsize) of
|
||||
8 -> % arch_64
|
||||
(1 bsl 59) - 1;
|
||||
4 -> % arch_32
|
||||
|
@ -983,7 +984,7 @@ end}.
|
|||
cuttlefish:invalid(io_lib:format("force_shutdown_policy: heap-size ~s is too large", [Siz]));
|
||||
Siz1 ->
|
||||
#{message_queue_len => list_to_integer(Len),
|
||||
max_heap_size => Siz1}
|
||||
max_heap_size => Siz1 div WordSize}
|
||||
end,
|
||||
{force_shutdown_policy, ShutdownPolicy};
|
||||
("mqueue_priorities", Val) ->
|
||||
|
@ -1289,6 +1290,11 @@ end}.
|
|||
{datatype, integer}
|
||||
]}.
|
||||
|
||||
{mapping, "listener.ws.$name.active_n", "emqx.listeners", [
|
||||
{default, 100},
|
||||
{datatype, integer}
|
||||
]}.
|
||||
|
||||
{mapping, "listener.ws.$name.zone", "emqx.listeners", [
|
||||
{datatype, string}
|
||||
]}.
|
||||
|
@ -1442,6 +1448,11 @@ end}.
|
|||
{datatype, integer}
|
||||
]}.
|
||||
|
||||
{mapping, "listener.wss.$name.active_n", "emqx.listeners", [
|
||||
{default, 100},
|
||||
{datatype, integer}
|
||||
]}.
|
||||
|
||||
{mapping, "listener.wss.$name.zone", "emqx.listeners", [
|
||||
{datatype, string}
|
||||
]}.
|
||||
|
|
|
@ -158,7 +158,7 @@ encode_alarm({AlarmId, #alarm{severity = Severity,
|
|||
{desc, [{severity, Severity},
|
||||
{title, iolist_to_binary(Title)},
|
||||
{summary, iolist_to_binary(Summary)},
|
||||
{timestamp, emqx_time:now_ms(Ts)}]}]);
|
||||
{timestamp, emqx_misc:now_to_secs(Ts)}]}]);
|
||||
encode_alarm({AlarmId, undefined}) ->
|
||||
emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}]);
|
||||
encode_alarm({AlarmId, AlarmDesc}) ->
|
||||
|
|
|
@ -40,15 +40,11 @@
|
|||
, handle_in/2
|
||||
, handle_out/2
|
||||
, handle_call/2
|
||||
, handle_info/2
|
||||
, handle_timeout/3
|
||||
, handle_info/2
|
||||
, terminate/2
|
||||
]).
|
||||
|
||||
-export([ recvd/2
|
||||
, sent/2
|
||||
]).
|
||||
|
||||
%% export for ct
|
||||
-export([set_field/3]).
|
||||
|
||||
|
@ -75,14 +71,10 @@
|
|||
topic_aliases :: maybe(map()),
|
||||
%% MQTT Topic Alias Maximum
|
||||
alias_maximum :: maybe(map()),
|
||||
%% Publish Stats
|
||||
pub_stats :: emqx_types:stats(),
|
||||
%% Timers
|
||||
timers :: #{atom() => disabled | maybe(reference())},
|
||||
%% Conn State
|
||||
conn_state :: conn_state(),
|
||||
%% GC State
|
||||
gc_state :: maybe(emqx_gc:gc_state()),
|
||||
%% Takeover
|
||||
takeover :: boolean(),
|
||||
%% Resume
|
||||
|
@ -103,7 +95,6 @@
|
|||
-type(output() :: emqx_types:packet() | action() | [action()]).
|
||||
|
||||
-define(TIMER_TABLE, #{
|
||||
stats_timer => emit_stats,
|
||||
alive_timer => keepalive,
|
||||
retry_timer => retry_delivery,
|
||||
await_timer => expire_awaiting_rel,
|
||||
|
@ -113,8 +104,7 @@
|
|||
|
||||
-define(ATTR_KEYS, [conninfo, clientinfo, session, conn_state]).
|
||||
|
||||
-define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, will_msg, topic_aliases,
|
||||
alias_maximum, gc_state]).
|
||||
-define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, will_msg, topic_aliases, alias_maximum]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Info, Attrs and Caps
|
||||
|
@ -146,12 +136,8 @@ info(will_msg, #channel{will_msg = undefined}) ->
|
|||
undefined;
|
||||
info(will_msg, #channel{will_msg = WillMsg}) ->
|
||||
emqx_message:to_map(WillMsg);
|
||||
info(pub_stats, #channel{pub_stats = PubStats}) ->
|
||||
PubStats;
|
||||
info(timers, #channel{timers = Timers}) ->
|
||||
Timers;
|
||||
info(gc_state, #channel{gc_state = GcState}) ->
|
||||
maybe_apply(fun emqx_gc:info/1, GcState).
|
||||
Timers.
|
||||
|
||||
%% @doc Get attrs of the channel.
|
||||
-spec(attrs(channel()) -> emqx_types:attrs()).
|
||||
|
@ -164,10 +150,8 @@ attrs(session, #channel{session = Session}) ->
|
|||
attrs(Key, Channel) -> info(Key, Channel).
|
||||
|
||||
-spec(stats(channel()) -> emqx_types:stats()).
|
||||
stats(#channel{pub_stats = PubStats, session = undefined}) ->
|
||||
maps:to_list(PubStats);
|
||||
stats(#channel{pub_stats = PubStats, session = Session}) ->
|
||||
maps:to_list(PubStats) ++ emqx_session:stats(Session).
|
||||
stats(#channel{session = Session})->
|
||||
emqx_session:stats(Session).
|
||||
|
||||
-spec(caps(channel()) -> emqx_types:caps()).
|
||||
caps(#channel{clientinfo = #{zone := Zone}}) ->
|
||||
|
@ -194,7 +178,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) ->
|
|||
_ -> undefined
|
||||
end,
|
||||
Protocol = maps:get(protocol, ConnInfo, mqtt),
|
||||
MountPoint = emqx_zone:get_env(Zone, mountpoint),
|
||||
MountPoint = emqx_zone:mountpoint(Zone),
|
||||
ClientInfo = #{zone => Zone,
|
||||
protocol => Protocol,
|
||||
peerhost => PeerHost,
|
||||
|
@ -205,16 +189,10 @@ init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) ->
|
|||
is_bridge => false,
|
||||
is_superuser => false
|
||||
},
|
||||
StatsTimer = case emqx_zone:enable_stats(Zone) of
|
||||
true -> undefined;
|
||||
false -> disabled
|
||||
end,
|
||||
#channel{conninfo = ConnInfo,
|
||||
clientinfo = ClientInfo,
|
||||
pub_stats = #{},
|
||||
timers = #{stats_timer => StatsTimer},
|
||||
timers = #{},
|
||||
conn_state = idle,
|
||||
gc_state = init_gc_state(Zone),
|
||||
takeover = false,
|
||||
resuming = false,
|
||||
pendings = []
|
||||
|
@ -223,17 +201,10 @@ init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) ->
|
|||
peer_cert_as_username(Options) ->
|
||||
proplists:get_value(peer_cert_as_username, Options).
|
||||
|
||||
init_gc_state(Zone) ->
|
||||
maybe_apply(fun emqx_gc:init/1, emqx_zone:force_gc_policy(Zone)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle incoming packet
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec(recvd(pos_integer(), channel()) -> channel()).
|
||||
recvd(Bytes, Channel) ->
|
||||
ensure_timer(stats_timer, maybe_gc_and_check_oom(Bytes, Channel)).
|
||||
|
||||
-spec(handle_in(emqx_types:packet(), channel())
|
||||
-> {ok, channel()}
|
||||
| {ok, output(), channel()}
|
||||
|
@ -258,74 +229,69 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
|
|||
end;
|
||||
|
||||
handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
|
||||
NChannel = inc_pub_stats(publish_in, Channel),
|
||||
case emqx_packet:check(Packet) of
|
||||
ok -> handle_publish(Packet, NChannel);
|
||||
ok -> handle_publish(Packet, Channel);
|
||||
{error, ReasonCode} ->
|
||||
handle_out(disconnect, ReasonCode, NChannel)
|
||||
handle_out(disconnect, ReasonCode, Channel)
|
||||
end;
|
||||
|
||||
handle_in(?PUBACK_PACKET(PacketId, _ReasonCode),
|
||||
Channel = #channel{clientinfo = ClientInfo, session = Session}) ->
|
||||
NChannel = inc_pub_stats(puback_in, Channel),
|
||||
case emqx_session:puback(PacketId, Session) of
|
||||
{ok, Msg, Publishes, NSession} ->
|
||||
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
|
||||
handle_out({publish, Publishes}, NChannel#channel{session = NSession});
|
||||
handle_out({publish, Publishes}, Channel#channel{session = NSession});
|
||||
{ok, Msg, NSession} ->
|
||||
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
|
||||
{ok, NChannel#channel{session = NSession}};
|
||||
{ok, Channel#channel{session = NSession}};
|
||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
||||
?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]),
|
||||
ok = emqx_metrics:inc('packets.puback.inuse'),
|
||||
{ok, NChannel};
|
||||
{ok, Channel};
|
||||
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
||||
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]),
|
||||
ok = emqx_metrics:inc('packets.puback.missed'),
|
||||
{ok, NChannel}
|
||||
{ok, Channel}
|
||||
end;
|
||||
|
||||
handle_in(?PUBREC_PACKET(PacketId, _ReasonCode),
|
||||
Channel = #channel{clientinfo = ClientInfo, session = Session}) ->
|
||||
Channel1 = inc_pub_stats(pubrec_in, Channel),
|
||||
case emqx_session:pubrec(PacketId, Session) of
|
||||
{ok, Msg, NSession} ->
|
||||
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
|
||||
NChannel = Channel1#channel{session = NSession},
|
||||
NChannel = Channel#channel{session = NSession},
|
||||
handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel);
|
||||
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
||||
?LOG(warning, "The PUBREC PacketId ~w is inuse.", [PacketId]),
|
||||
ok = emqx_metrics:inc('packets.pubrec.inuse'),
|
||||
handle_out(pubrel, {PacketId, RC}, Channel1);
|
||||
handle_out(pubrel, {PacketId, RC}, Channel);
|
||||
{error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
||||
?LOG(warning, "The PUBREC ~w is not found.", [PacketId]),
|
||||
ok = emqx_metrics:inc('packets.pubrec.missed'),
|
||||
handle_out(pubrel, {PacketId, RC}, Channel1)
|
||||
handle_out(pubrel, {PacketId, RC}, Channel)
|
||||
end;
|
||||
|
||||
handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
|
||||
Channel1 = inc_pub_stats(pubrel_in, Channel),
|
||||
case emqx_session:pubrel(PacketId, Session) of
|
||||
{ok, NSession} ->
|
||||
Channel2 = Channel1#channel{session = NSession},
|
||||
handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, Channel2);
|
||||
NChannel = Channel#channel{session = NSession},
|
||||
handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel);
|
||||
{error, NotFound} ->
|
||||
ok = emqx_metrics:inc('packets.pubrel.missed'),
|
||||
?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]),
|
||||
handle_out(pubcomp, {PacketId, NotFound}, Channel1)
|
||||
handle_out(pubcomp, {PacketId, NotFound}, Channel)
|
||||
end;
|
||||
|
||||
handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
|
||||
Channel1 = inc_pub_stats(pubcomp_in, Channel),
|
||||
case emqx_session:pubcomp(PacketId, Session) of
|
||||
{ok, Publishes, NSession} ->
|
||||
handle_out({publish, Publishes}, Channel1#channel{session = NSession});
|
||||
handle_out({publish, Publishes}, Channel#channel{session = NSession});
|
||||
{ok, NSession} ->
|
||||
{ok, Channel1#channel{session = NSession}};
|
||||
{ok, Channel#channel{session = NSession}};
|
||||
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
||||
?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId]),
|
||||
ok = emqx_metrics:inc('packets.pubcomp.missed'),
|
||||
{ok, Channel1}
|
||||
{ok, Channel}
|
||||
end;
|
||||
|
||||
handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
||||
|
@ -422,11 +388,6 @@ process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart},
|
|||
%% Process Publish
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
inc_pub_stats(Key, Channel) -> inc_pub_stats(Key, 1, Channel).
|
||||
inc_pub_stats(Key, I, Channel = #channel{pub_stats = PubStats}) ->
|
||||
NPubStats = maps:update_with(Key, fun(V) -> V+I end, I, PubStats),
|
||||
Channel#channel{pub_stats = NPubStats}.
|
||||
|
||||
handle_publish(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId),
|
||||
Channel = #channel{conninfo = #{proto_ver := ProtoVer}}) ->
|
||||
case pipeline([fun process_alias/2,
|
||||
|
@ -540,10 +501,6 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel =
|
|||
%% Handle outgoing packet
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec(sent(pos_integer(), channel()) -> channel()).
|
||||
sent(Bytes, Channel) ->
|
||||
ensure_timer(stats_timer, maybe_gc_and_check_oom(Bytes, Channel)).
|
||||
|
||||
-spec(handle_out(term(), channel())
|
||||
-> {ok, channel()}
|
||||
| {ok, output(), channel()}
|
||||
|
@ -578,8 +535,7 @@ handle_out({publish, Publishes}, Channel) when is_list(Publishes) ->
|
|||
{ok, _Ch} -> Acc
|
||||
end
|
||||
end, [], Publishes),
|
||||
NChannel = inc_pub_stats(publish_out, length(Packets), Channel),
|
||||
{ok, {outgoing, lists:reverse(Packets)}, NChannel};
|
||||
{ok, {outgoing, lists:reverse(Packets)}, Channel};
|
||||
|
||||
%% Ignore loop deliver
|
||||
handle_out({publish, _PacketId, #message{from = ClientId,
|
||||
|
@ -635,16 +591,16 @@ handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnIn
|
|||
shutdown(Reason, ?CONNACK_PACKET(ReasonCode1), Channel);
|
||||
|
||||
handle_out(puback, {PacketId, ReasonCode}, Channel) ->
|
||||
{ok, ?PUBACK_PACKET(PacketId, ReasonCode), inc_pub_stats(puback_out, Channel)};
|
||||
{ok, ?PUBACK_PACKET(PacketId, ReasonCode), Channel};
|
||||
|
||||
handle_out(pubrec, {PacketId, ReasonCode}, Channel) ->
|
||||
{ok, ?PUBREC_PACKET(PacketId, ReasonCode), inc_pub_stats(pubrec_out, Channel)};
|
||||
{ok, ?PUBREC_PACKET(PacketId, ReasonCode), Channel};
|
||||
|
||||
handle_out(pubrel, {PacketId, ReasonCode}, Channel) ->
|
||||
{ok, ?PUBREL_PACKET(PacketId, ReasonCode), inc_pub_stats(pubrel_out, Channel)};
|
||||
{ok, ?PUBREL_PACKET(PacketId, ReasonCode), Channel};
|
||||
|
||||
handle_out(pubcomp, {PacketId, ReasonCode}, Channel) ->
|
||||
{ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), inc_pub_stats(pubcomp_out, Channel)};
|
||||
{ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), Channel};
|
||||
|
||||
handle_out(suback, {PacketId, ReasonCodes},
|
||||
Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) ->
|
||||
|
@ -750,11 +706,6 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientI
|
|||
{_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
|
||||
{ok, NChannel};
|
||||
|
||||
handle_info({register, Attrs, Stats}, #channel{clientinfo = #{clientid := ClientId}}) ->
|
||||
ok = emqx_cm:register_channel(ClientId),
|
||||
emqx_cm:set_chan_attrs(ClientId, Attrs),
|
||||
emqx_cm:set_chan_stats(ClientId, Stats);
|
||||
|
||||
handle_info({sock_closed, _Reason}, Channel = #channel{conn_state = disconnected}) ->
|
||||
{ok, Channel};
|
||||
|
||||
|
@ -796,12 +747,6 @@ handle_info(Info, Channel) ->
|
|||
-> {ok, channel()}
|
||||
| {ok, Result :: term(), channel()}
|
||||
| {shutdown, Reason :: term(), channel()}).
|
||||
handle_timeout(TRef, {emit_stats, Stats},
|
||||
Channel = #channel{clientinfo = #{clientid := ClientId},
|
||||
timers = #{stats_timer := TRef}}) ->
|
||||
ok = emqx_cm:set_chan_stats(ClientId, Stats),
|
||||
{ok, clean_timer(stats_timer, Channel)};
|
||||
|
||||
handle_timeout(TRef, {keepalive, StatVal},
|
||||
Channel = #channel{keepalive = Keepalive,
|
||||
timers = #{alive_timer := TRef}}) ->
|
||||
|
@ -881,8 +826,6 @@ reset_timer(Name, Time, Channel) ->
|
|||
clean_timer(Name, Channel = #channel{timers = Timers}) ->
|
||||
Channel#channel{timers = maps:remove(Name, Timers)}.
|
||||
|
||||
interval(stats_timer, #channel{clientinfo = #{zone := Zone}}) ->
|
||||
emqx_zone:get_env(Zone, idle_timeout, 30000);
|
||||
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
|
||||
emqx_keepalive:info(interval, KeepAlive);
|
||||
interval(retry_timer, #channel{session = Session}) ->
|
||||
|
@ -920,6 +863,7 @@ publish_will_msg(undefined) ->
|
|||
publish_will_msg(Msg) ->
|
||||
emqx_broker:publish(Msg).
|
||||
|
||||
|
||||
%% @doc Enrich MQTT Connect Info.
|
||||
enrich_conninfo(#mqtt_packet_connect{
|
||||
proto_name = ProtoName,
|
||||
|
@ -1179,7 +1123,7 @@ ensure_keepalive(_AckProps, Channel = #channel{conninfo = ConnInfo}) ->
|
|||
|
||||
ensure_keepalive_timer(0, Channel) -> Channel;
|
||||
ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) ->
|
||||
Backoff = emqx_zone:get_env(Zone, keepalive_backoff, 0.75),
|
||||
Backoff = emqx_zone:keepalive_backoff(Zone),
|
||||
Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)),
|
||||
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
|
||||
|
||||
|
@ -1205,19 +1149,6 @@ is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) ->
|
|||
parse_topic_filters(TopicFilters) ->
|
||||
lists:map(fun emqx_topic:parse/1, TopicFilters).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Maybe GC and Check OOM
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
maybe_gc_and_check_oom(_Oct, Channel = #channel{gc_state = undefined}) ->
|
||||
Channel;
|
||||
maybe_gc_and_check_oom(Oct, Channel = #channel{clientinfo = #{zone := Zone},
|
||||
gc_state = GCSt}) ->
|
||||
{IsGC, GCSt1} = emqx_gc:run(1, Oct, GCSt),
|
||||
IsGC andalso emqx_metrics:inc('channel.gc.cnt'),
|
||||
IsGC andalso emqx_zone:check_oom(Zone, fun(Shutdown) -> self() ! Shutdown end),
|
||||
Channel#channel{gc_state = GCSt1}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helper functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%%-------------------------------------------------------------------
|
||||
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
|
@ -93,7 +93,6 @@ start_link() ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Register a channel.
|
||||
%% Channel will be unregistered automatically when the channel process dies
|
||||
-spec(register_channel(emqx_types:clientid()) -> ok).
|
||||
register_channel(ClientId) when is_binary(ClientId) ->
|
||||
register_channel(ClientId, self()).
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% MQTT/TCP Connection
|
||||
%% MQTT/TCP|TLS Connection
|
||||
-module(emqx_connection).
|
||||
|
||||
-include("emqx.hrl").
|
||||
|
@ -40,7 +40,7 @@
|
|||
|
||||
-export([call/2]).
|
||||
|
||||
%% callback
|
||||
%% Callback
|
||||
-export([init/4]).
|
||||
|
||||
%% Sys callbacks
|
||||
|
@ -50,12 +50,15 @@
|
|||
, system_get_state/1
|
||||
]).
|
||||
|
||||
%% Internal callbacks
|
||||
-export([wakeup_from_hib/2]).
|
||||
%% Internal callback
|
||||
-export([wakeup_from_hib/3]).
|
||||
|
||||
-import(emqx_misc,
|
||||
[ maybe_apply/2
|
||||
, start_timer/2
|
||||
]).
|
||||
|
||||
-record(state, {
|
||||
%% Parent
|
||||
parent :: pid(),
|
||||
%% TCP/TLS Transport
|
||||
transport :: esockd:transport(),
|
||||
%% TCP/TLS Socket
|
||||
|
@ -64,34 +67,37 @@
|
|||
peername :: emqx_types:peername(),
|
||||
%% Sockname of the connection
|
||||
sockname :: emqx_types:peername(),
|
||||
%% Sock state
|
||||
%% Sock State
|
||||
sockstate :: emqx_types:sockstate(),
|
||||
%% The {active, N} option
|
||||
active_n :: pos_integer(),
|
||||
%% Publish Limit
|
||||
pub_limit :: maybe(esockd_rate_limit:bucket()),
|
||||
%% Rate Limit
|
||||
rate_limit :: maybe(esockd_rate_limit:bucket()),
|
||||
%% Limiter
|
||||
limiter :: maybe(emqx_limiter:limiter()),
|
||||
%% Limit Timer
|
||||
limit_timer :: maybe(reference()),
|
||||
%% Parser State
|
||||
%% Parse State
|
||||
parse_state :: emqx_frame:parse_state(),
|
||||
%% Serialize function
|
||||
serialize :: emqx_frame:serialize_fun(),
|
||||
%% Channel State
|
||||
channel :: emqx_channel:channel(),
|
||||
%% Idle timer
|
||||
idle_timer :: reference()
|
||||
%% GC State
|
||||
gc_state :: maybe(emqx_gc:gc_state()),
|
||||
%% Stats Timer
|
||||
stats_timer :: disabled | maybe(reference()),
|
||||
%% Idle Timer
|
||||
idle_timer :: maybe(reference())
|
||||
}).
|
||||
|
||||
-type(state() :: #state{}).
|
||||
|
||||
-define(ACTIVE_N, 100).
|
||||
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n,
|
||||
pub_limit, rate_limit]).
|
||||
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n, limiter]).
|
||||
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
|
||||
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
|
||||
|
||||
-define(ENABLED(X), (X =/= undefined)).
|
||||
|
||||
-spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist())
|
||||
-> {ok, pid()}).
|
||||
start_link(Transport, Socket, Options) ->
|
||||
|
@ -123,13 +129,8 @@ info(sockstate, #state{sockstate = SockSt}) ->
|
|||
SockSt;
|
||||
info(active_n, #state{active_n = ActiveN}) ->
|
||||
ActiveN;
|
||||
info(pub_limit, #state{pub_limit = PubLimit}) ->
|
||||
limit_info(PubLimit);
|
||||
info(rate_limit, #state{rate_limit = RateLimit}) ->
|
||||
limit_info(RateLimit).
|
||||
|
||||
limit_info(Limit) ->
|
||||
emqx_misc:maybe_apply(fun esockd_rate_limit:info/1, Limit).
|
||||
info(limiter, #state{limiter = Limiter}) ->
|
||||
maybe_apply(fun emqx_limiter:info/1, Limiter).
|
||||
|
||||
%% @doc Get stats of the connection/channel.
|
||||
-spec(stats(pid()|state()) -> emqx_types:stats()).
|
||||
|
@ -147,6 +148,13 @@ stats(#state{transport = Transport,
|
|||
ProcStats = emqx_misc:proc_stats(),
|
||||
lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
|
||||
|
||||
attrs(#state{active_n = ActiveN, sockstate = SockSt, channel = Channel}) ->
|
||||
SockAttrs = #{active_n => ActiveN,
|
||||
sockstate => SockSt
|
||||
},
|
||||
ChanAttrs = emqx_channel:attrs(Channel),
|
||||
maps:merge(ChanAttrs, #{sockinfo => SockAttrs}).
|
||||
|
||||
call(Pid, Req) ->
|
||||
gen_server:call(Pid, Req, infinity).
|
||||
|
||||
|
@ -169,7 +177,6 @@ init(Parent, Transport, RawSocket, Options) ->
|
|||
do_init(Parent, Transport, Socket, Options) ->
|
||||
{ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]),
|
||||
{ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
|
||||
emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
|
||||
Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]),
|
||||
ConnInfo = #{socktype => Transport:type(Socket),
|
||||
peername => Peername,
|
||||
|
@ -179,42 +186,39 @@ do_init(Parent, Transport, Socket, Options) ->
|
|||
},
|
||||
Zone = proplists:get_value(zone, Options),
|
||||
ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N),
|
||||
PubLimit = init_limiter(emqx_zone:get_env(Zone, publish_limit)),
|
||||
RateLimit = init_limiter(proplists:get_value(rate_limit, Options)),
|
||||
FrameOpts = emqx_zone:frame_options(Zone),
|
||||
Limiter = emqx_limiter:init(Options),
|
||||
FrameOpts = emqx_zone:mqtt_frame_options(Zone),
|
||||
ParseState = emqx_frame:initial_parse_state(FrameOpts),
|
||||
Serialize = emqx_frame:serialize_fun(),
|
||||
Channel = emqx_channel:init(ConnInfo, Options),
|
||||
IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
|
||||
IdleTimer = emqx_misc:start_timer(IdleTimout, idle_timeout),
|
||||
HibAfterTimeout = emqx_zone:get_env(Zone, hibernate_after, IdleTimout*2),
|
||||
State = #state{parent = Parent,
|
||||
transport = Transport,
|
||||
GcState = emqx_zone:init_gc_state(Zone),
|
||||
StatsTimer = emqx_zone:stats_timer(Zone),
|
||||
IdleTimeout = emqx_zone:idle_timeout(Zone),
|
||||
IdleTimer = start_timer(IdleTimeout, idle_timeout),
|
||||
emqx_misc:tune_heap_size(emqx_zone:oom_policy(Zone)),
|
||||
emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
|
||||
State = #state{transport = Transport,
|
||||
socket = Socket,
|
||||
peername = Peername,
|
||||
sockname = Sockname,
|
||||
sockstate = idle,
|
||||
active_n = ActiveN,
|
||||
pub_limit = PubLimit,
|
||||
rate_limit = RateLimit,
|
||||
limiter = Limiter,
|
||||
parse_state = ParseState,
|
||||
serialize = Serialize,
|
||||
channel = Channel,
|
||||
gc_state = GcState,
|
||||
stats_timer = StatsTimer,
|
||||
idle_timer = IdleTimer
|
||||
},
|
||||
case activate_socket(State) of
|
||||
{ok, NState} ->
|
||||
hibernate(NState, #{hibernate_after => HibAfterTimeout});
|
||||
hibernate(Parent, NState, #{idle_timeout => IdleTimeout});
|
||||
{error, Reason} ->
|
||||
ok = Transport:fast_close(Socket),
|
||||
exit_on_sock_error(Reason)
|
||||
end.
|
||||
|
||||
-compile({inline, [init_limiter/1]}).
|
||||
init_limiter(undefined) -> undefined;
|
||||
init_limiter({Rate, Burst}) ->
|
||||
esockd_rate_limit:new(Rate, Burst).
|
||||
|
||||
exit_on_sock_error(Reason) when Reason =:= einval;
|
||||
Reason =:= enotconn;
|
||||
Reason =:= closed ->
|
||||
|
@ -227,8 +231,7 @@ exit_on_sock_error(Reason) ->
|
|||
%%--------------------------------------------------------------------
|
||||
%% Recv Loop
|
||||
|
||||
recvloop(State = #state{parent = Parent},
|
||||
Options = #{hibernate_after := HibAfterTimeout}) ->
|
||||
recvloop(Parent, State, Options = #{idle_timeout := IdleTimeout}) ->
|
||||
receive
|
||||
{system, From, Request} ->
|
||||
sys:handle_system_msg(Request, From, Parent,
|
||||
|
@ -236,33 +239,49 @@ recvloop(State = #state{parent = Parent},
|
|||
{'EXIT', Parent, Reason} ->
|
||||
terminate(Reason, State);
|
||||
Msg ->
|
||||
process_msg([Msg], State, Options)
|
||||
NState = ensure_stats_timer(IdleTimeout, State),
|
||||
process_msg([Msg], Parent, NState, Options)
|
||||
after
|
||||
HibAfterTimeout ->
|
||||
hibernate(State, Options)
|
||||
IdleTimeout ->
|
||||
NState = cancel_stats_timer(State),
|
||||
hibernate(Parent, NState, Options)
|
||||
end.
|
||||
|
||||
hibernate(State, Options) ->
|
||||
proc_lib:hibernate(?MODULE, wakeup_from_hib, [State, Options]).
|
||||
hibernate(Parent, State, Options) ->
|
||||
proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State, Options]).
|
||||
|
||||
wakeup_from_hib(State, Options) ->
|
||||
wakeup_from_hib(Parent, State, Options) ->
|
||||
%% Maybe do something later here.
|
||||
recvloop(State, Options).
|
||||
recvloop(Parent, State, Options).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Ensure/cancel stats timer
|
||||
|
||||
-compile({inline, [ensure_stats_timer/2]}).
|
||||
ensure_stats_timer(Timeout, State = #state{stats_timer = undefined}) ->
|
||||
State#state{stats_timer = start_timer(Timeout, emit_stats)};
|
||||
ensure_stats_timer(_Timeout, State) -> State.
|
||||
|
||||
-compile({inline, [cancel_stats_timer/1]}).
|
||||
cancel_stats_timer(State = #state{stats_timer = TRef}) when is_reference(TRef) ->
|
||||
ok = emqx_misc:cancel_timer(TRef),
|
||||
State#state{stats_timer = undefined};
|
||||
cancel_stats_timer(State) -> State.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Process next Msg
|
||||
|
||||
process_msg([], State, Options) ->
|
||||
recvloop(State, Options);
|
||||
process_msg([], Parent, State, Options) ->
|
||||
recvloop(Parent, State, Options);
|
||||
|
||||
process_msg([Msg|More], State, Options) ->
|
||||
process_msg([Msg|More], Parent, State, Options) ->
|
||||
case catch handle_msg(Msg, State) of
|
||||
ok ->
|
||||
process_msg(More, State, Options);
|
||||
process_msg(More, Parent, State, Options);
|
||||
{ok, NState} ->
|
||||
process_msg(More, NState, Options);
|
||||
{ok, NextMsgs, NState} ->
|
||||
process_msg(append_msg(NextMsgs, More), NState, Options);
|
||||
process_msg(More, Parent, NState, Options);
|
||||
{ok, Msgs, NState} ->
|
||||
process_msg(append_msg(Msgs, More), Parent, NState, Options);
|
||||
{stop, Reason} ->
|
||||
terminate(Reason, State);
|
||||
{stop, Reason, NState} ->
|
||||
|
@ -284,14 +303,12 @@ handle_msg({'$gen_call', From, Req}, State) ->
|
|||
stop(Reason, NState)
|
||||
end;
|
||||
|
||||
handle_msg({Inet, _Sock, Data}, State = #state{channel = Channel})
|
||||
when Inet == tcp; Inet == ssl ->
|
||||
handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
|
||||
?LOG(debug, "RECV ~p", [Data]),
|
||||
Oct = iolist_size(Data),
|
||||
emqx_pd:update_counter(incoming_bytes, Oct),
|
||||
emqx_pd:inc_counter(incoming_bytes, Oct),
|
||||
ok = emqx_metrics:inc('bytes.received', Oct),
|
||||
NChannel = emqx_channel:recvd(Oct, Channel),
|
||||
parse_incoming(Data, State#state{channel = NChannel});
|
||||
parse_incoming(Data, State);
|
||||
|
||||
handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
|
||||
State = #state{idle_timer = IdleTimer}) ->
|
||||
|
@ -302,6 +319,9 @@ handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
|
|||
},
|
||||
handle_incoming(Packet, NState);
|
||||
|
||||
handle_msg({incoming, ?PACKET(?PINGREQ)}, State) ->
|
||||
handle_outgoing(?PACKET(?PINGRESP), State);
|
||||
|
||||
handle_msg({incoming, Packet}, State) ->
|
||||
handle_incoming(Packet, State);
|
||||
|
||||
|
@ -315,30 +335,34 @@ handle_msg({Closed, _Sock}, State)
|
|||
|
||||
handle_msg({Passive, _Sock}, State)
|
||||
when Passive == tcp_passive; Passive == ssl_passive ->
|
||||
%% Rate limit here:)
|
||||
NState = ensure_rate_limit(State),
|
||||
handle_info(activate_socket, NState);
|
||||
|
||||
%% Rate limit timer expired.
|
||||
handle_msg(activate_socket, State) ->
|
||||
NState = State#state{sockstate = idle,
|
||||
limit_timer = undefined
|
||||
},
|
||||
handle_info(activate_socket, NState);
|
||||
InStats = #{cnt => emqx_pd:reset_counter(incoming_pubs),
|
||||
oct => emqx_pd:reset_counter(incoming_bytes)
|
||||
},
|
||||
%% Ensure Rate Limit
|
||||
NState = ensure_rate_limit(InStats, State),
|
||||
%% Run GC and Check OOM
|
||||
NState1 = check_oom(run_gc(InStats, NState)),
|
||||
handle_info(activate_socket, NState1);
|
||||
|
||||
handle_msg(Deliver = {deliver, _Topic, _Msg},
|
||||
State = #state{channel = Channel}) ->
|
||||
Delivers = emqx_misc:drain_deliver([Deliver]),
|
||||
Result = emqx_channel:handle_out(Delivers, Channel),
|
||||
handle_return(Result, State);
|
||||
Delivers = [Deliver|emqx_misc:drain_deliver()],
|
||||
Ret = emqx_channel:handle_out(Delivers, Channel),
|
||||
handle_chan_return(Ret, State);
|
||||
|
||||
handle_msg({outgoing, Packets}, State) ->
|
||||
NState = handle_outgoing(Packets, State),
|
||||
{ok, NState};
|
||||
handle_outgoing(Packets, State);
|
||||
|
||||
%% something sent
|
||||
handle_msg({inet_reply, _Sock, ok}, _State) ->
|
||||
ok;
|
||||
%% Something sent
|
||||
handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) ->
|
||||
case emqx_pd:get_counter(outgoing_pubs) > ActiveN of
|
||||
true ->
|
||||
OutStats = #{cnt => emqx_pd:reset_counter(outgoing_pubs),
|
||||
oct => emqx_pd:reset_counter(outgoing_bytes)
|
||||
},
|
||||
{ok, check_oom(run_gc(OutStats, State))};
|
||||
false -> ok
|
||||
end;
|
||||
|
||||
handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
|
||||
handle_info({sock_error, Reason}, State);
|
||||
|
@ -349,7 +373,8 @@ handle_msg({timeout, TRef, TMsg}, State) ->
|
|||
handle_msg(Shutdown = {shutdown, _Reason}, State) ->
|
||||
stop(Shutdown, State);
|
||||
|
||||
handle_msg(Msg, State) -> handle_info(Msg, State).
|
||||
handle_msg(Msg, State) ->
|
||||
handle_info(Msg, State).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Terminate
|
||||
|
@ -363,8 +388,8 @@ terminate(Reason, State = #state{channel = Channel}) ->
|
|||
%%--------------------------------------------------------------------
|
||||
%% Sys callbacks
|
||||
|
||||
system_continue(_Parent, _Deb, {State, Options}) ->
|
||||
recvloop(State, Options).
|
||||
system_continue(Parent, _Deb, {State, Options}) ->
|
||||
recvloop(Parent, State, Options).
|
||||
|
||||
system_terminate(Reason, _Parent, _Deb, {State, _}) ->
|
||||
terminate(Reason, State).
|
||||
|
@ -392,8 +417,8 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
|
|||
shutdown(Reason, Reply, State#state{channel = NChannel});
|
||||
{shutdown, Reason, Reply, OutPacket, NChannel} ->
|
||||
NState = State#state{channel = NChannel},
|
||||
NState1 = handle_outgoing(OutPacket, NState),
|
||||
shutdown(Reason, Reply, NState1)
|
||||
ok = handle_outgoing(OutPacket, NState),
|
||||
shutdown(Reason, Reply, NState)
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -402,8 +427,18 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
|
|||
handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) ->
|
||||
shutdown(idle_timeout, State);
|
||||
|
||||
handle_timeout(TRef, emit_stats, State) ->
|
||||
handle_timeout(TRef, {emit_stats, stats(State)}, State);
|
||||
handle_timeout(TRef, limit_timeout, State = #state{limit_timer = TRef}) ->
|
||||
NState = State#state{sockstate = idle,
|
||||
limit_timer = undefined
|
||||
},
|
||||
handle_info(activate_socket, NState);
|
||||
|
||||
handle_timeout(TRef, emit_stats, State = #state{stats_timer = TRef,
|
||||
channel = Channel}) ->
|
||||
#{clientid := ClientId} = emqx_channel:info(clientinfo, Channel),
|
||||
(ClientId =/= undefined) andalso
|
||||
emqx_cm:set_chan_stats(ClientId, stats(State)),
|
||||
{ok, State#state{stats_timer = undefined}};
|
||||
|
||||
handle_timeout(TRef, keepalive, State = #state{transport = Transport,
|
||||
socket = Socket}) ->
|
||||
|
@ -415,7 +450,8 @@ handle_timeout(TRef, keepalive, State = #state{transport = Transport,
|
|||
end;
|
||||
|
||||
handle_timeout(TRef, Msg, State = #state{channel = Channel}) ->
|
||||
handle_return(emqx_channel:handle_timeout(TRef, Msg, Channel), State).
|
||||
Ret = emqx_channel:handle_timeout(TRef, Msg, Channel),
|
||||
handle_chan_return(Ret, State).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Parse incoming data
|
||||
|
@ -450,30 +486,30 @@ next_incoming_msgs(Packets) ->
|
|||
%%--------------------------------------------------------------------
|
||||
%% Handle incoming packet
|
||||
|
||||
handle_incoming(Packet = ?PACKET(Type), State = #state{channel = Channel}) ->
|
||||
_ = inc_incoming_stats(Type),
|
||||
ok = emqx_metrics:inc_recv(Packet),
|
||||
handle_incoming(Packet, State = #state{channel = Channel})
|
||||
when is_record(Packet, mqtt_packet) ->
|
||||
ok = inc_incoming_stats(Packet),
|
||||
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
|
||||
handle_return(emqx_channel:handle_in(Packet, Channel), State);
|
||||
handle_chan_return(emqx_channel:handle_in(Packet, Channel), State);
|
||||
|
||||
handle_incoming(FrameError, State = #state{channel = Channel}) ->
|
||||
handle_return(emqx_channel:handle_in(FrameError, Channel), State).
|
||||
handle_chan_return(emqx_channel:handle_in(FrameError, Channel), State).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle channel return
|
||||
|
||||
handle_return(ok, State) ->
|
||||
handle_chan_return(ok, State) ->
|
||||
{ok, State};
|
||||
handle_return({ok, NChannel}, State) ->
|
||||
handle_chan_return({ok, NChannel}, State) ->
|
||||
{ok, State#state{channel = NChannel}};
|
||||
handle_return({ok, Replies, NChannel}, State) ->
|
||||
handle_chan_return({ok, Replies, NChannel}, State) ->
|
||||
{ok, next_msgs(Replies), State#state{channel = NChannel}};
|
||||
handle_return({shutdown, Reason, NChannel}, State) ->
|
||||
handle_chan_return({shutdown, Reason, NChannel}, State) ->
|
||||
shutdown(Reason, State#state{channel = NChannel});
|
||||
handle_return({shutdown, Reason, OutPacket, NChannel}, State) ->
|
||||
handle_chan_return({shutdown, Reason, OutPacket, NChannel}, State) ->
|
||||
NState = State#state{channel = NChannel},
|
||||
NState1 = handle_outgoing(OutPacket, NState),
|
||||
shutdown(Reason, NState1).
|
||||
ok = handle_outgoing(OutPacket, NState),
|
||||
shutdown(Reason, NState).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle outgoing packets
|
||||
|
@ -485,14 +521,13 @@ handle_outgoing(Packet, State) ->
|
|||
send((serialize_and_inc_stats_fun(State))(Packet), State).
|
||||
|
||||
serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
||||
fun(Packet = ?PACKET(Type)) ->
|
||||
fun(Packet) ->
|
||||
case Serialize(Packet) of
|
||||
<<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!",
|
||||
[emqx_packet:format(Packet)]),
|
||||
<<>>;
|
||||
Data -> _ = inc_outgoing_stats(Type),
|
||||
_ = emqx_metrics:inc_sent(Packet),
|
||||
?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
|
||||
Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
|
||||
ok = inc_outgoing_stats(Packet),
|
||||
Data
|
||||
end
|
||||
end.
|
||||
|
@ -500,52 +535,52 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
|||
%%--------------------------------------------------------------------
|
||||
%% Send data
|
||||
|
||||
send(IoData, State = #state{transport = Transport,
|
||||
socket = Socket,
|
||||
channel = Channel}) ->
|
||||
-spec(send(iodata(), state()) -> ok).
|
||||
send(IoData, #state{transport = Transport, socket = Socket}) ->
|
||||
Oct = iolist_size(IoData),
|
||||
ok = emqx_metrics:inc('bytes.sent', Oct),
|
||||
emqx_pd:inc_counter(outgoing_bytes, Oct),
|
||||
case Transport:async_send(Socket, IoData) of
|
||||
ok ->
|
||||
State#state{channel = emqx_channel:sent(Oct, Channel)};
|
||||
ok -> ok;
|
||||
Error = {error, _Reason} ->
|
||||
%% Simulate an inet_reply to postpone handling the error
|
||||
self() ! {inet_reply, Socket, Error}, State
|
||||
%% Send an inet_reply to postpone handling the error
|
||||
self() ! {inet_reply, Socket, Error},
|
||||
ok
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle Info
|
||||
|
||||
handle_info({connack, ConnAck}, State = #state{active_n = ActiveN,
|
||||
sockstate = SockSt,
|
||||
channel = Channel}) ->
|
||||
NState = handle_outgoing(ConnAck, State),
|
||||
ChanAttrs = emqx_channel:attrs(Channel),
|
||||
SockAttrs = #{active_n => ActiveN,
|
||||
sockstate => SockSt
|
||||
},
|
||||
Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
|
||||
handle_info({register, Attrs, stats(State)}, NState);
|
||||
handle_info({connack, ConnAck}, State = #state{channel = Channel}) ->
|
||||
#{clientid := ClientId} = emqx_channel:info(clientinfo, Channel),
|
||||
ok = emqx_cm:register_channel(ClientId),
|
||||
ok = emqx_cm:set_chan_attrs(ClientId, attrs(State)),
|
||||
ok = emqx_cm:set_chan_stats(ClientId, stats(State)),
|
||||
ok = handle_outgoing(ConnAck, State);
|
||||
|
||||
handle_info({enter, disconnected}, State = #state{active_n = ActiveN,
|
||||
sockstate = SockSt,
|
||||
channel = Channel}) ->
|
||||
ChanAttrs = emqx_channel:attrs(Channel),
|
||||
SockAttrs = #{active_n => ActiveN,
|
||||
sockstate => SockSt
|
||||
},
|
||||
Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
|
||||
handle_info({register, Attrs, stats(State)}, State);
|
||||
handle_info({enter, disconnected}, State = #state{channel = Channel}) ->
|
||||
#{clientid := ClientId} = emqx_channel:info(clientinfo, Channel),
|
||||
emqx_cm:set_chan_attrs(ClientId, attrs(State)),
|
||||
emqx_cm:set_chan_stats(ClientId, stats(State));
|
||||
|
||||
handle_info(activate_socket, State) ->
|
||||
handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
|
||||
case activate_socket(State) of
|
||||
{ok, NState} -> {ok, NState};
|
||||
{ok, NState = #state{sockstate = NewSst}} ->
|
||||
if OldSst =/= NewSst ->
|
||||
{ok, {event, sockstate_changed}, NState};
|
||||
true -> {ok, NState}
|
||||
end;
|
||||
{error, Reason} ->
|
||||
handle_info({sock_error, Reason}, State)
|
||||
end;
|
||||
|
||||
handle_info({event, sockstate_changed}, State = #state{channel = Channel}) ->
|
||||
#{clientid := ClientId} = emqx_channel:info(clientinfo, Channel),
|
||||
ClientId =/= undefined andalso emqx_cm:set_chan_attrs(ClientId, attrs(State));
|
||||
|
||||
%%TODO: this is not right
|
||||
handle_info({sock_error, _Reason}, #state{sockstate = closed}) -> ok;
|
||||
handle_info({sock_error, _Reason}, #state{sockstate = closed}) ->
|
||||
ok;
|
||||
handle_info({sock_error, Reason}, State) ->
|
||||
?LOG(debug, "Socket error: ~p", [Reason]),
|
||||
handle_info({sock_closed, Reason}, close_socket(State));
|
||||
|
@ -560,7 +595,45 @@ handle_info({close, Reason}, State) ->
|
|||
handle_info({sock_closed, Reason}, close_socket(State));
|
||||
|
||||
handle_info(Info, State = #state{channel = Channel}) ->
|
||||
handle_return(emqx_channel:handle_info(Info, Channel), State).
|
||||
handle_chan_return(emqx_channel:handle_info(Info, Channel), State).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Ensure rate limit
|
||||
|
||||
ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
|
||||
case ?ENABLED(limiter) andalso emqx_limiter:check(Stats, Limiter) of
|
||||
false -> State;
|
||||
{ok, Limiter1} ->
|
||||
State#state{limiter = Limiter1};
|
||||
{pause, Time, Limiter1} ->
|
||||
?LOG(debug, "Pause ~pms due to rate limit", [Time]),
|
||||
TRef = start_timer(Time, limit_timeout),
|
||||
State#state{sockstate = blocked,
|
||||
limiter = Limiter1,
|
||||
limit_timer = TRef
|
||||
}
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Run GC and Check OOM
|
||||
|
||||
run_gc(Stats, State = #state{gc_state = GcSt}) ->
|
||||
case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
|
||||
false -> State;
|
||||
{IsGC, GcSt1} ->
|
||||
IsGC andalso emqx_metrics:inc('channel.gc.cnt'),
|
||||
State#state{gc_state = GcSt1}
|
||||
end.
|
||||
|
||||
check_oom(State = #state{channel = Channel}) ->
|
||||
#{zone := Zone} = emqx_channel:info(clientinfo, Channel),
|
||||
OomPolicy = emqx_zone:oom_policy(Zone),
|
||||
case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of
|
||||
Shutdown = {shutdown, _Reason} ->
|
||||
erlang:send(self(), Shutdown);
|
||||
_Other -> ok
|
||||
end,
|
||||
State.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Activate Socket
|
||||
|
@ -587,48 +660,30 @@ close_socket(State = #state{transport = Transport, socket = Socket}) ->
|
|||
ok = Transport:fast_close(Socket),
|
||||
State#state{sockstate = closed}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Ensure rate limit
|
||||
|
||||
-define(ENABLED(Rl), (Rl =/= undefined)).
|
||||
|
||||
ensure_rate_limit(State = #state{rate_limit = Rl, pub_limit = Pl}) ->
|
||||
Pubs = emqx_pd:reset_counter(incoming_pubs),
|
||||
Bytes = emqx_pd:reset_counter(incoming_bytes),
|
||||
Limiters = [{Pl, #state.pub_limit, Pubs} || ?ENABLED(Pl)] ++
|
||||
[{Rl, #state.rate_limit, Bytes} || ?ENABLED(Rl)],
|
||||
ensure_rate_limit(Limiters, State).
|
||||
|
||||
ensure_rate_limit([], State) ->
|
||||
State;
|
||||
ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) ->
|
||||
case esockd_rate_limit:check(Cnt, Rl) of
|
||||
{0, Rl1} ->
|
||||
ensure_rate_limit(Limiters, setelement(Pos, State, Rl1));
|
||||
{Pause, Rl1} ->
|
||||
?LOG(debug, "Pause ~pms due to rate limit", [Pause]),
|
||||
TRef = erlang:send_after(Pause, self(), activate_socket),
|
||||
NState = State#state{sockstate = blocked, limit_timer = TRef},
|
||||
setelement(Pos, NState, Rl1)
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Inc incoming/outgoing stats
|
||||
|
||||
-compile({inline, [inc_incoming_stats/1]}).
|
||||
inc_incoming_stats(Type) when is_integer(Type) ->
|
||||
emqx_pd:update_counter(recv_pkt, 1),
|
||||
inc_incoming_stats(Packet = ?PACKET(Type)) ->
|
||||
emqx_pd:inc_counter(recv_pkt, 1),
|
||||
if
|
||||
Type == ?PUBLISH ->
|
||||
emqx_pd:update_counter(recv_msg, 1),
|
||||
emqx_pd:update_counter(incoming_pubs, 1);
|
||||
emqx_pd:inc_counter(recv_msg, 1),
|
||||
emqx_pd:inc_counter(incoming_pubs, 1);
|
||||
true -> ok
|
||||
end.
|
||||
end,
|
||||
emqx_metrics:inc_recv(Packet).
|
||||
|
||||
-compile({inline, [inc_outgoing_stats/1]}).
|
||||
inc_outgoing_stats(Type) ->
|
||||
emqx_pd:update_counter(send_pkt, 1),
|
||||
(Type == ?PUBLISH) andalso emqx_pd:update_counter(send_msg, 1).
|
||||
inc_outgoing_stats(Packet = ?PACKET(Type)) ->
|
||||
emqx_pd:inc_counter(send_pkt, 1),
|
||||
if
|
||||
Type == ?PUBLISH ->
|
||||
emqx_pd:inc_counter(send_msg, 1),
|
||||
emqx_pd:inc_counter(outgoing_pubs, 1);
|
||||
true -> ok
|
||||
end,
|
||||
emqx_metrics:inc_sent(Packet).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helper functions
|
||||
|
@ -646,13 +701,14 @@ next_msgs(Action) when is_tuple(Action) ->
|
|||
next_msgs(Actions) when is_list(Actions) ->
|
||||
Actions.
|
||||
|
||||
-compile({inline, [shutdown/2, shutdown/3]}).
|
||||
shutdown(Reason, State) ->
|
||||
stop({shutdown, Reason}, State).
|
||||
|
||||
shutdown(Reason, Reply, State) ->
|
||||
stop({shutdown, Reason}, Reply, State).
|
||||
|
||||
-compile({inline, [stop/2]}).
|
||||
-compile({inline, [stop/2, stop/3]}).
|
||||
stop(Reason, State) ->
|
||||
{stop, Reason, State}.
|
||||
|
||||
|
|
|
@ -99,7 +99,7 @@ detect(#{clientid := ClientId, peerhost := PeerHost},
|
|||
%% Create a flapping record.
|
||||
Flapping = #flapping{clientid = ClientId,
|
||||
peerhost = PeerHost,
|
||||
started_at = emqx_time:now_ms(),
|
||||
started_at = erlang:system_time(millisecond),
|
||||
detect_cnt = 1
|
||||
},
|
||||
true = ets:insert(?FLAPPING_TAB, Flapping),
|
||||
|
@ -111,7 +111,7 @@ detect(#{clientid := ClientId, peerhost := PeerHost},
|
|||
get_policy() ->
|
||||
emqx:get_env(flapping_detect_policy, ?DEFAULT_DETECT_POLICY).
|
||||
|
||||
now_diff(TS) -> emqx_time:now_ms() - TS.
|
||||
now_diff(TS) -> erlang:system_time(millisecond) - TS.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
|
@ -143,7 +143,7 @@ handle_cast({detected, Flapping = #flapping{clientid = ClientId,
|
|||
[ClientId, esockd_net:ntoa(PeerHost), DetectCnt, Duration]),
|
||||
%% Banned.
|
||||
BannedFlapping = Flapping#flapping{clientid = {banned, ClientId},
|
||||
banned_at = emqx_time:now_ms()
|
||||
banned_at = erlang:system_time(millisecond)
|
||||
},
|
||||
alarm_handler:set_alarm({{flapping_detected, ClientId}, BannedFlapping}),
|
||||
ets:insert(?FLAPPING_TAB, BannedFlapping);
|
||||
|
@ -160,7 +160,8 @@ handle_cast(Msg, State) ->
|
|||
|
||||
handle_info({timeout, TRef, expire_flapping}, State = #{tref := TRef}) ->
|
||||
with_flapping_tab(fun expire_flapping/2,
|
||||
[emqx_time:now_ms(), get_policy()]),
|
||||
[erlang:system_time(millisecond),
|
||||
get_policy()]),
|
||||
{noreply, ensure_timer(State#{tref => undefined}), hibernate};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
|
|
|
@ -29,6 +29,7 @@
|
|||
-include("types.hrl").
|
||||
|
||||
-export([ init/1
|
||||
, run/2
|
||||
, run/3
|
||||
, info/1
|
||||
, reset/1
|
||||
|
@ -57,21 +58,26 @@ init(#{count := Count, bytes := Bytes}) ->
|
|||
?GCS(maps:from_list(Cnt ++ Oct)).
|
||||
|
||||
%% @doc Try to run GC based on reduntions of count or bytes.
|
||||
-spec(run(#{cnt := pos_integer(), oct := pos_integer()}, gc_state())
|
||||
-> {boolean(), gc_state()}).
|
||||
run(#{cnt := Cnt, oct := Oct}, GcSt) ->
|
||||
run(Cnt, Oct, GcSt).
|
||||
|
||||
-spec(run(pos_integer(), pos_integer(), gc_state())
|
||||
-> {boolean(), gc_state()}).
|
||||
run(Cnt, Oct, ?GCS(St)) ->
|
||||
{Res, St1} = run([{cnt, Cnt}, {oct, Oct}], St),
|
||||
{Res, St1} = do_run([{cnt, Cnt}, {oct, Oct}], St),
|
||||
{Res, ?GCS(St1)}.
|
||||
|
||||
run([], St) ->
|
||||
do_run([], St) ->
|
||||
{false, St};
|
||||
run([{K, N}|T], St) ->
|
||||
do_run([{K, N}|T], St) ->
|
||||
case dec(K, N, St) of
|
||||
{true, St1} ->
|
||||
true = erlang:garbage_collect(),
|
||||
erlang:garbage_collect(),
|
||||
{true, do_reset(St1)};
|
||||
{false, St1} ->
|
||||
run(T, St1)
|
||||
do_run(T, St1)
|
||||
end.
|
||||
|
||||
%% @doc Info of GC state.
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_limiter).
|
||||
|
||||
-include("types.hrl").
|
||||
|
||||
-export([init/1, info/1, check/2]).
|
||||
|
||||
-import(emqx_misc, [maybe_apply/2]).
|
||||
|
||||
-record(limiter, {
|
||||
%% Publish Limit
|
||||
pub_limit :: maybe(esockd_rate_limit:bucket()),
|
||||
%% Rate Limit
|
||||
rate_limit :: maybe(esockd_rate_limit:bucket())
|
||||
}).
|
||||
|
||||
-type(limiter() :: #limiter{}).
|
||||
|
||||
-export_type([limiter/0]).
|
||||
|
||||
-define(ENABLED(Rl), (Rl =/= undefined)).
|
||||
|
||||
-spec(init(proplists:proplist()) -> maybe(limiter())).
|
||||
init(Options) ->
|
||||
Zone = proplists:get_value(zone, Options),
|
||||
Pl = emqx_zone:publish_limit(Zone),
|
||||
Rl = proplists:get_value(rate_limit, Options),
|
||||
case ?ENABLED(Pl) or ?ENABLED(Rl) of
|
||||
true -> #limiter{pub_limit = init_limit(Pl),
|
||||
rate_limit = init_limit(Rl)
|
||||
};
|
||||
false -> undefined
|
||||
end.
|
||||
|
||||
init_limit(Rl) ->
|
||||
maybe_apply(fun esockd_rate_limit:new/1, Rl).
|
||||
|
||||
info(#limiter{pub_limit = Pl, rate_limit = Rl}) ->
|
||||
#{pub_limit => info(Pl), rate_limit => info(Rl)};
|
||||
|
||||
info(Rl) ->
|
||||
maybe_apply(fun esockd_rate_limit:info/1, Rl).
|
||||
|
||||
check(#{cnt := Cnt, oct := Oct}, Limiter = #limiter{pub_limit = Pl,
|
||||
rate_limit = Rl}) ->
|
||||
do_check([{#limiter.pub_limit, Cnt, Pl} || ?ENABLED(Pl)] ++
|
||||
[{#limiter.rate_limit, Oct, Rl} || ?ENABLED(Rl)], Limiter).
|
||||
|
||||
do_check([], Limiter) ->
|
||||
{ok, Limiter};
|
||||
do_check([{Pos, Cnt, Rl}|More], Limiter) ->
|
||||
case esockd_rate_limit:check(Cnt, Rl) of
|
||||
{0, Rl1} ->
|
||||
do_check(More, setelement(Pos, Limiter, Rl1));
|
||||
{Pause, Rl1} ->
|
||||
{pause, Pause, setelement(Pos, Limiter, Rl1)}
|
||||
end.
|
||||
|
|
@ -281,7 +281,7 @@ do_inc_recv(?PUBLISH_PACKET(QoS, _PktId)) ->
|
|||
?QOS_0 -> inc('messages.qos0.received');
|
||||
?QOS_1 -> inc('messages.qos1.received');
|
||||
?QOS_2 -> inc('messages.qos2.received');
|
||||
_ -> ignore
|
||||
_ -> ok
|
||||
end,
|
||||
inc('packets.publish.received');
|
||||
do_inc_recv(?PACKET(?PUBACK)) ->
|
||||
|
@ -302,13 +302,12 @@ do_inc_recv(?PACKET(?DISCONNECT)) ->
|
|||
inc('packets.disconnect.received');
|
||||
do_inc_recv(?PACKET(?AUTH)) ->
|
||||
inc('packets.auth.received');
|
||||
do_inc_recv(_Packet) ->
|
||||
ignore.
|
||||
do_inc_recv(_Packet) -> ok.
|
||||
|
||||
%% @doc Inc packets sent. Will not count $SYS PUBLISH.
|
||||
-spec(inc_sent(emqx_types:packet()) -> ok | ignore).
|
||||
-spec(inc_sent(emqx_types:packet()) -> ok).
|
||||
inc_sent(?PUBLISH_PACKET(_QoS, <<"$SYS/", _/binary>>, _, _)) ->
|
||||
ignore;
|
||||
ok;
|
||||
inc_sent(Packet) ->
|
||||
inc('packets.sent'),
|
||||
do_inc_sent(Packet).
|
||||
|
@ -349,8 +348,7 @@ do_inc_sent(?PACKET(?DISCONNECT)) ->
|
|||
inc('packets.disconnect.sent');
|
||||
do_inc_sent(?PACKET(?AUTH)) ->
|
||||
inc('packets.auth.sent');
|
||||
do_inc_sent(_Packet) ->
|
||||
ignore.
|
||||
do_inc_sent(_Packet) -> ok.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
-module(emqx_misc).
|
||||
|
||||
-compile(inline).
|
||||
|
||||
-include("types.hrl").
|
||||
-include("logger.hrl").
|
||||
|
||||
|
@ -26,22 +28,20 @@
|
|||
, start_timer/2
|
||||
, start_timer/3
|
||||
, cancel_timer/1
|
||||
, drain_deliver/0
|
||||
, drain_down/1
|
||||
, check_oom/1
|
||||
, check_oom/2
|
||||
, tune_heap_size/1
|
||||
, proc_name/2
|
||||
, proc_stats/0
|
||||
, proc_stats/1
|
||||
, rand_seed/0
|
||||
, now_to_secs/1
|
||||
, now_to_ms/1
|
||||
, index_of/2
|
||||
]).
|
||||
|
||||
-export([ drain_deliver/0
|
||||
, drain_deliver/1
|
||||
, drain_down/1
|
||||
]).
|
||||
|
||||
-compile({inline,
|
||||
[ start_timer/2
|
||||
, start_timer/3
|
||||
]}).
|
||||
|
||||
%% @doc Merge options
|
||||
-spec(merge_opts(Opts, Opts) -> Opts when Opts :: proplists:proplist()).
|
||||
merge_opts(Defaults, Options) ->
|
||||
|
@ -112,6 +112,68 @@ cancel_timer(Timer) when is_reference(Timer) ->
|
|||
end;
|
||||
cancel_timer(_) -> ok.
|
||||
|
||||
%% @doc Drain delivers from the channel proc's mailbox.
|
||||
drain_deliver() ->
|
||||
drain_deliver([]).
|
||||
|
||||
drain_deliver(Acc) ->
|
||||
receive
|
||||
Deliver = {deliver, _Topic, _Msg} ->
|
||||
drain_deliver([Deliver|Acc])
|
||||
after 0 ->
|
||||
lists:reverse(Acc)
|
||||
end.
|
||||
|
||||
%% @doc Drain process 'DOWN' events.
|
||||
-spec(drain_down(pos_integer()) -> list(pid())).
|
||||
drain_down(Cnt) when Cnt > 0 ->
|
||||
drain_down(Cnt, []).
|
||||
|
||||
drain_down(0, Acc) ->
|
||||
lists:reverse(Acc);
|
||||
drain_down(Cnt, Acc) ->
|
||||
receive
|
||||
{'DOWN', _MRef, process, Pid, _Reason} ->
|
||||
drain_down(Cnt - 1, [Pid|Acc])
|
||||
after 0 ->
|
||||
lists:reverse(Acc)
|
||||
end.
|
||||
|
||||
%% @doc Check process's mailbox and heapsize against OOM policy,
|
||||
%% return `ok | {shutdown, Reason}' accordingly.
|
||||
%% `ok': There is nothing out of the ordinary.
|
||||
%% `shutdown': Some numbers (message queue length hit the limit),
|
||||
%% hence shutdown for greater good (system stability).
|
||||
-spec(check_oom(emqx_types:oom_policy()) -> ok | {shutdown, term()}).
|
||||
check_oom(Policy) ->
|
||||
check_oom(self(), Policy).
|
||||
|
||||
-spec(check_oom(pid(), emqx_types:oom_policy()) -> ok | {shutdown, term()}).
|
||||
check_oom(Pid, #{message_queue_len := MaxQLen,
|
||||
max_heap_size := MaxHeapSize}) ->
|
||||
case process_info(Pid, [message_queue_len, total_heap_size]) of
|
||||
undefined -> ok;
|
||||
[{message_queue_len, QLen}, {total_heap_size, HeapSize}] ->
|
||||
do_check_oom([{QLen, MaxQLen, message_queue_too_long},
|
||||
{HeapSize, MaxHeapSize, proc_heap_too_large}
|
||||
])
|
||||
end.
|
||||
|
||||
do_check_oom([]) -> ok;
|
||||
do_check_oom([{Val, Max, Reason}|Rest]) ->
|
||||
case is_integer(Max) andalso (0 < Max) andalso (Max < Val) of
|
||||
true -> {shutdown, Reason};
|
||||
false -> do_check_oom(Rest)
|
||||
end.
|
||||
|
||||
tune_heap_size(#{max_heap_size := MaxHeapSize}) ->
|
||||
%% If set to zero, the limit is disabled.
|
||||
erlang:process_flag(max_heap_size, #{size => MaxHeapSize,
|
||||
kill => false,
|
||||
error_logger => true
|
||||
});
|
||||
tune_heap_size(undefined) -> ok.
|
||||
|
||||
-spec(proc_name(atom(), pos_integer()) -> atom()).
|
||||
proc_name(Mod, Id) ->
|
||||
list_to_atom(lists:concat([Mod, "_", Id])).
|
||||
|
@ -132,32 +194,16 @@ proc_stats(Pid) ->
|
|||
[{mailbox_len, Len}|ProcStats]
|
||||
end.
|
||||
|
||||
%% @doc Drain delivers from the channel's mailbox.
|
||||
drain_deliver() ->
|
||||
drain_deliver([]).
|
||||
rand_seed() ->
|
||||
rand:seed(exsplus, erlang:timestamp()).
|
||||
|
||||
drain_deliver(Acc) ->
|
||||
receive
|
||||
Deliver = {deliver, _Topic, _Msg} ->
|
||||
drain_deliver([Deliver|Acc])
|
||||
after 0 ->
|
||||
lists:reverse(Acc)
|
||||
end.
|
||||
-spec(now_to_secs(erlang:timestamp()) -> pos_integer()).
|
||||
now_to_secs({MegaSecs, Secs, _MicroSecs}) ->
|
||||
MegaSecs * 1000000 + Secs.
|
||||
|
||||
%% @doc Drain process down events.
|
||||
-spec(drain_down(pos_integer()) -> list(pid())).
|
||||
drain_down(Cnt) when Cnt > 0 ->
|
||||
drain_down(Cnt, []).
|
||||
|
||||
drain_down(0, Acc) ->
|
||||
lists:reverse(Acc);
|
||||
drain_down(Cnt, Acc) ->
|
||||
receive
|
||||
{'DOWN', _MRef, process, Pid, _Reason} ->
|
||||
drain_down(Cnt - 1, [Pid|Acc])
|
||||
after 0 ->
|
||||
drain_down(0, Acc)
|
||||
end.
|
||||
-spec(now_to_ms(erlang:timestamp()) -> pos_integer()).
|
||||
now_to_ms({MegaSecs, Secs, MicroSecs}) ->
|
||||
(MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000).
|
||||
|
||||
%% lists:index_of/2
|
||||
index_of(E, L) ->
|
||||
|
|
|
@ -62,7 +62,7 @@ on_client_connected(ClientInfo, ConnAck, ConnInfo, Env) ->
|
|||
connack => ConnAck,
|
||||
clean_start => CleanStart,
|
||||
expiry_interval => ExpiryInterval,
|
||||
ts => emqx_time:now_ms()
|
||||
ts => erlang:system_time(millisecond)
|
||||
},
|
||||
case emqx_json:safe_encode(Presence) of
|
||||
{ok, Payload} ->
|
||||
|
@ -78,7 +78,7 @@ on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) ->
|
|||
Presence = #{clientid => ClientId,
|
||||
username => Username,
|
||||
reason => reason(Reason),
|
||||
ts => emqx_time:now_ms()
|
||||
ts => erlang:system_time(millisecond)
|
||||
},
|
||||
case emqx_json:safe_encode(Presence) of
|
||||
{ok, Payload} ->
|
||||
|
|
|
@ -1,97 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc OOM (Out Of Memory) monitor for the channel process.
|
||||
%% @end
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_oom).
|
||||
|
||||
-include("types.hrl").
|
||||
|
||||
-export([ init/1
|
||||
, check/1
|
||||
, info/1
|
||||
]).
|
||||
|
||||
-export_type([opts/0, oom_policy/0]).
|
||||
|
||||
-type(opts() :: #{message_queue_len => non_neg_integer(),
|
||||
max_heap_size => non_neg_integer()
|
||||
}).
|
||||
|
||||
-opaque(oom_policy() :: {oom_policy, opts()}).
|
||||
|
||||
-type(reason() :: message_queue_too_long|proc_heap_too_large).
|
||||
|
||||
-define(DISABLED, 0).
|
||||
|
||||
%% @doc Init the OOM policy.
|
||||
-spec(init(opts()) -> oom_policy()).
|
||||
init(#{message_queue_len := MaxQLen,
|
||||
max_heap_size := MaxHeapSizeInBytes}) ->
|
||||
MaxHeapSize = MaxHeapSizeInBytes div erlang:system_info(wordsize),
|
||||
%% If set to zero, the limit is disabled.
|
||||
_ = erlang:process_flag(max_heap_size, #{size => MaxHeapSize,
|
||||
kill => false,
|
||||
error_logger => true
|
||||
}),
|
||||
{oom_policy, #{message_queue_len => MaxQLen,
|
||||
max_heap_size => MaxHeapSize
|
||||
}}.
|
||||
|
||||
%% @doc Check self() process status against channel process management policy,
|
||||
%% return `ok | {shutdown, Reason}' accordingly.
|
||||
%% `ok': There is nothing out of the ordinary.
|
||||
%% `shutdown': Some numbers (message queue length hit the limit),
|
||||
%% hence shutdown for greater good (system stability).
|
||||
-spec(check(oom_policy()) -> ok | {shutdown, reason()}).
|
||||
check({oom_policy, #{message_queue_len := MaxQLen,
|
||||
max_heap_size := MaxHeapSize}}) ->
|
||||
Qlength = proc_info(message_queue_len),
|
||||
HeapSize = proc_info(total_heap_size),
|
||||
do_check([{fun() -> is_exceeded(Qlength, MaxQLen) end,
|
||||
{shutdown, message_queue_too_long}},
|
||||
{fun() -> is_exceeded(HeapSize, MaxHeapSize) end,
|
||||
{shutdown, proc_heap_too_large}}]).
|
||||
|
||||
do_check([]) -> ok;
|
||||
do_check([{Pred, Result} | Rest]) ->
|
||||
case Pred() of
|
||||
true -> Result;
|
||||
false -> do_check(Rest)
|
||||
end.
|
||||
|
||||
-spec(info(oom_policy()) -> opts()).
|
||||
info({oom_policy, Opts}) -> Opts.
|
||||
|
||||
-compile({inline,
|
||||
[ is_exceeded/2
|
||||
, is_enabled/1
|
||||
, proc_info/1
|
||||
]}).
|
||||
|
||||
is_exceeded(Val, Max) ->
|
||||
is_enabled(Max) andalso Val > Max.
|
||||
|
||||
is_enabled(Max) ->
|
||||
is_integer(Max) andalso Max > ?DISABLED.
|
||||
|
||||
proc_info(Key) ->
|
||||
{Key, Value} = erlang:process_info(self(), Key),
|
||||
Value.
|
||||
|
|
@ -21,14 +21,14 @@
|
|||
|
||||
-export([ get_counters/1
|
||||
, get_counter/1
|
||||
, update_counter/2
|
||||
, inc_counter/2
|
||||
, reset_counter/1
|
||||
]).
|
||||
|
||||
-compile({inline,
|
||||
[ get_counters/1
|
||||
, get_counter/1
|
||||
, update_counter/2
|
||||
, inc_counter/2
|
||||
, reset_counter/1
|
||||
]}).
|
||||
|
||||
|
@ -42,8 +42,8 @@ get_counters(Keys) when is_list(Keys) ->
|
|||
get_counter(Key) ->
|
||||
case get(Key) of undefined -> 0; Cnt -> Cnt end.
|
||||
|
||||
-spec(update_counter(key(), number()) -> maybe(number())).
|
||||
update_counter(Key, Inc) ->
|
||||
-spec(inc_counter(key(), number()) -> maybe(number())).
|
||||
inc_counter(Key, Inc) ->
|
||||
put(Key, get_counter(Key) + Inc).
|
||||
|
||||
-spec(reset_counter(key()) -> number()).
|
||||
|
|
|
@ -1,52 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_time).
|
||||
|
||||
-export([ seed/0
|
||||
, now_secs/0
|
||||
, now_secs/1
|
||||
, now_ms/0
|
||||
, now_ms/1
|
||||
]).
|
||||
|
||||
-compile({inline,
|
||||
[ seed/0
|
||||
, now_secs/0
|
||||
, now_secs/1
|
||||
, now_ms/0
|
||||
, now_ms/1
|
||||
]}).
|
||||
|
||||
seed() ->
|
||||
rand:seed(exsplus, erlang:timestamp()).
|
||||
|
||||
-spec(now_secs() -> pos_integer()).
|
||||
now_secs() ->
|
||||
erlang:system_time(second).
|
||||
|
||||
-spec(now_secs(erlang:timestamp()) -> pos_integer()).
|
||||
now_secs({MegaSecs, Secs, _MicroSecs}) ->
|
||||
MegaSecs * 1000000 + Secs.
|
||||
|
||||
-spec(now_ms() -> pos_integer()).
|
||||
now_ms() ->
|
||||
erlang:system_time(millisecond).
|
||||
|
||||
-spec(now_ms(erlang:timestamp()) -> pos_integer()).
|
||||
now_ms({MegaSecs, Secs, MicroSecs}) ->
|
||||
(MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000).
|
||||
|
|
@ -85,6 +85,8 @@
|
|||
, stats/0
|
||||
]).
|
||||
|
||||
-export_type([oom_policy/0]).
|
||||
|
||||
-type(ver() :: ?MQTT_PROTO_V3
|
||||
| ?MQTT_PROTO_V4
|
||||
| ?MQTT_PROTO_V5).
|
||||
|
@ -186,3 +188,7 @@
|
|||
-type(infos() :: #{atom() => term()}).
|
||||
-type(stats() :: #{atom() => non_neg_integer()|stats()}).
|
||||
|
||||
-type(oom_policy() :: #{message_queue_len => non_neg_integer(),
|
||||
max_heap_size => non_neg_integer()
|
||||
}).
|
||||
|
||||
|
|
|
@ -45,6 +45,11 @@
|
|||
, terminate/3
|
||||
]).
|
||||
|
||||
-import(emqx_misc,
|
||||
[ maybe_apply/2
|
||||
, start_timer/2
|
||||
]).
|
||||
|
||||
-record(state, {
|
||||
%% Peername of the ws connection.
|
||||
peername :: emqx_types:peername(),
|
||||
|
@ -52,24 +57,41 @@
|
|||
sockname :: emqx_types:peername(),
|
||||
%% Sock state
|
||||
sockstate :: emqx_types:sockstate(),
|
||||
%% Parser State
|
||||
%% Simulate the active_n opt
|
||||
active_n :: pos_integer(),
|
||||
%% Limiter
|
||||
limiter :: emqx_limiter:limiter(),
|
||||
%% Limit Timer
|
||||
limit_timer :: maybe(reference()),
|
||||
%% Parse State
|
||||
parse_state :: emqx_frame:parse_state(),
|
||||
%% Serialize function
|
||||
serialize :: emqx_frame:serialize_fun(),
|
||||
%% Channel
|
||||
channel :: emqx_channel:channel(),
|
||||
%% GC State
|
||||
gc_state :: maybe(emqx_gc:gc_state()),
|
||||
%% Out Pending Packets
|
||||
pendings :: list(emqx_types:packet()),
|
||||
%% Stats Timer
|
||||
stats_timer :: disabled | maybe(reference()),
|
||||
%% Idle Timeout
|
||||
idle_timeout :: timeout(),
|
||||
%% Idle Timer
|
||||
idle_timer :: reference(),
|
||||
%% The stop reason
|
||||
stop_reason :: term()
|
||||
}).
|
||||
|
||||
-type(state() :: #state{}).
|
||||
|
||||
-define(INFO_KEYS, [socktype, peername, sockname, sockstate]).
|
||||
-define(ACTIVE_N, 100).
|
||||
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n, limiter]).
|
||||
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
|
||||
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
|
||||
|
||||
-define(ENABLED(X), (X =/= undefined)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% API
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -92,11 +114,20 @@ info(sockname, #state{sockname = Sockname}) ->
|
|||
Sockname;
|
||||
info(sockstate, #state{sockstate = SockSt}) ->
|
||||
SockSt;
|
||||
info(active_n, #state{active_n = ActiveN}) ->
|
||||
ActiveN;
|
||||
info(limiter, #state{limiter = Limiter}) ->
|
||||
maybe_apply(fun emqx_limiter:info/1, Limiter);
|
||||
info(channel, #state{channel = Channel}) ->
|
||||
emqx_channel:info(Channel);
|
||||
info(stop_reason, #state{stop_reason = Reason}) ->
|
||||
Reason.
|
||||
|
||||
attrs(State = #state{channel = Channel}) ->
|
||||
ChanAttrs = emqx_channel:attrs(Channel),
|
||||
SockAttrs = maps:from_list(info(?INFO_KEYS, State)),
|
||||
maps:merge(ChanAttrs, #{sockinfo => SockAttrs}).
|
||||
|
||||
-spec(stats(pid()|state()) -> emqx_types:stats()).
|
||||
stats(WsPid) when is_pid(WsPid) ->
|
||||
call(WsPid, stats);
|
||||
|
@ -128,6 +159,7 @@ call(WsPid, Req) when is_pid(WsPid) ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
init(Req, Opts) ->
|
||||
%% WS Transport Idle Timeout
|
||||
IdleTimeout = proplists:get_value(idle_timeout, Opts, 7200000),
|
||||
DeflateOptions = maps:from_list(proplists:get_value(deflate_options, Opts, [])),
|
||||
MaxFrameSize = case proplists:get_value(max_frame_size, Opts, 0) of
|
||||
|
@ -174,29 +206,41 @@ websocket_init([Req, Opts]) ->
|
|||
conn_mod => ?MODULE
|
||||
},
|
||||
Zone = proplists:get_value(zone, Opts),
|
||||
FrameOpts = emqx_zone:frame_options(Zone),
|
||||
ActiveN = proplists:get_value(active_n, Opts, ?ACTIVE_N),
|
||||
Limiter = emqx_limiter:init(Opts),
|
||||
FrameOpts = emqx_zone:mqtt_frame_options(Zone),
|
||||
ParseState = emqx_frame:initial_parse_state(FrameOpts),
|
||||
Serialize = emqx_frame:serialize_fun(),
|
||||
Channel = emqx_channel:init(ConnInfo, Opts),
|
||||
GcState = emqx_zone:init_gc_state(Zone),
|
||||
StatsTimer = emqx_zone:stats_timer(Zone),
|
||||
%% MQTT Idle Timeout
|
||||
IdleTimeout = emqx_zone:idle_timeout(Zone),
|
||||
IdleTimer = start_timer(IdleTimeout, idle_timeout),
|
||||
emqx_misc:tune_heap_size(emqx_zone:oom_policy(Zone)),
|
||||
emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
|
||||
{ok, #state{peername = Peername,
|
||||
sockname = Sockname,
|
||||
sockstate = idle,
|
||||
parse_state = ParseState,
|
||||
serialize = Serialize,
|
||||
channel = Channel,
|
||||
pendings = []
|
||||
}}.
|
||||
{ok, #state{peername = Peername,
|
||||
sockname = Sockname,
|
||||
sockstate = running,
|
||||
active_n = ActiveN,
|
||||
limiter = Limiter,
|
||||
parse_state = ParseState,
|
||||
serialize = Serialize,
|
||||
channel = Channel,
|
||||
gc_state = GcState,
|
||||
pendings = [],
|
||||
stats_timer = StatsTimer,
|
||||
idle_timeout = IdleTimeout,
|
||||
idle_timer = IdleTimer
|
||||
}, hibernate}.
|
||||
|
||||
websocket_handle({binary, Data}, State) when is_list(Data) ->
|
||||
websocket_handle({binary, iolist_to_binary(Data)}, State);
|
||||
|
||||
websocket_handle({binary, Data}, State = #state{channel = Channel}) ->
|
||||
websocket_handle({binary, Data}, State) ->
|
||||
?LOG(debug, "RECV ~p", [Data]),
|
||||
Oct = iolist_size(Data),
|
||||
ok = inc_recv_stats(1, Oct),
|
||||
NChannel = emqx_channel:recvd(Oct, Channel),
|
||||
parse_incoming(Data, State#state{channel = NChannel});
|
||||
ok = inc_recv_stats(1, iolist_size(Data)),
|
||||
parse_incoming(Data, ensure_stats_timer(State));
|
||||
|
||||
%% Pings should be replied with pongs, cowboy does it automatically
|
||||
%% Pongs can be safely ignored. Clause here simply prevents crash.
|
||||
|
@ -215,30 +259,43 @@ websocket_info({call, From, Req}, State) ->
|
|||
handle_call(From, Req, State);
|
||||
|
||||
websocket_info({cast, Msg}, State = #state{channel = Channel}) ->
|
||||
handle_return(emqx_channel:handle_info(Msg, Channel), State);
|
||||
handle_chan_return(emqx_channel:handle_info(Msg, Channel), State);
|
||||
|
||||
websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) ->
|
||||
websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
|
||||
State = #state{idle_timer = IdleTimer}) ->
|
||||
ok = emqx_misc:cancel_timer(IdleTimer),
|
||||
Serialize = emqx_frame:serialize_fun(ConnPkt),
|
||||
NState = State#state{sockstate = running,
|
||||
serialize = Serialize
|
||||
NState = State#state{serialize = Serialize,
|
||||
idle_timer = undefined
|
||||
},
|
||||
handle_incoming(Packet, NState);
|
||||
|
||||
websocket_info({incoming, ?PACKET(?PINGREQ)}, State) ->
|
||||
reply(?PACKET(?PINGRESP), State);
|
||||
|
||||
websocket_info({incoming, Packet}, State) ->
|
||||
handle_incoming(Packet, State);
|
||||
|
||||
websocket_info(Deliver = {deliver, _Topic, _Msg},
|
||||
State = #state{channel = Channel}) ->
|
||||
Delivers = emqx_misc:drain_deliver([Deliver]),
|
||||
Result = emqx_channel:handle_out(Delivers, Channel),
|
||||
handle_return(Result, State);
|
||||
websocket_info(rate_limit, State) ->
|
||||
InStats = #{cnt => emqx_pd:reset_counter(incoming_pubs),
|
||||
oct => emqx_pd:reset_counter(incoming_bytes)
|
||||
},
|
||||
erlang:send(self(), {check_gc, InStats}),
|
||||
ensure_rate_limit(InStats, State);
|
||||
|
||||
websocket_info({timeout, TRef, keepalive}, State) when is_reference(TRef) ->
|
||||
RecvOct = emqx_pd:get_counter(recv_oct),
|
||||
handle_timeout(TRef, {keepalive, RecvOct}, State);
|
||||
websocket_info({check_gc, Stats}, State) ->
|
||||
{ok, check_oom(run_gc(Stats, State))};
|
||||
|
||||
websocket_info({timeout, TRef, emit_stats}, State) when is_reference(TRef) ->
|
||||
handle_timeout(TRef, {emit_stats, stats(State)}, State);
|
||||
websocket_info({deliver, _Topic, _Msg} = Deliver, State = #state{channel = Channel}) ->
|
||||
Delivers = [Deliver|emqx_misc:drain_deliver()],
|
||||
Ret = emqx_channel:handle_out(Delivers, Channel),
|
||||
handle_chan_return(Ret, State);
|
||||
|
||||
websocket_info({timeout, TRef, limit_timeout}, State = #state{limit_timer = TRef}) ->
|
||||
NState = State#state{sockstate = running,
|
||||
limit_timer = undefined
|
||||
},
|
||||
{reply, [{active, true}], NState};
|
||||
|
||||
websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) ->
|
||||
handle_timeout(TRef, Msg, State);
|
||||
|
@ -293,27 +350,89 @@ handle_call(From, Req, State = #state{channel = Channel}) ->
|
|||
%% Handle Info
|
||||
|
||||
handle_info({connack, ConnAck}, State = #state{channel = Channel}) ->
|
||||
ChanAttrs = emqx_channel:attrs(Channel),
|
||||
SockAttrs = maps:from_list(info(?INFO_KEYS, State)),
|
||||
Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
|
||||
ok = emqx_channel:handle_info({register, Attrs, stats(State)}, Channel),
|
||||
ClientId = emqx_channel:info(clientid, Channel),
|
||||
ok = emqx_cm:register_channel(ClientId),
|
||||
ok = emqx_cm:set_chan_attrs(ClientId, attrs(State)),
|
||||
ok = emqx_cm:set_chan_stats(ClientId, stats(State)),
|
||||
reply(enqueue(ConnAck, State));
|
||||
|
||||
handle_info({enter, disconnected}, State = #state{channel = Channel}) ->
|
||||
ChanAttrs = emqx_channel:attrs(Channel),
|
||||
SockAttrs = maps:from_list(info(?INFO_KEYS, State)),
|
||||
Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
|
||||
ok = emqx_channel:handle_info({register, Attrs, stats(State)}, Channel),
|
||||
ClientId = emqx_channel:info(clientid, Channel),
|
||||
emqx_cm:set_chan_attrs(ClientId, attrs(State)),
|
||||
emqx_cm:set_chan_stats(ClientId, stats(State)),
|
||||
reply(State);
|
||||
|
||||
handle_info(Info, State = #state{channel = Channel}) ->
|
||||
handle_return(emqx_channel:handle_info(Info, Channel), State).
|
||||
Ret = emqx_channel:handle_info(Info, Channel),
|
||||
handle_chan_return(Ret, State).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle timeout
|
||||
|
||||
handle_timeout(TRef, Msg, State = #state{channel = Channel}) ->
|
||||
handle_return(emqx_channel:handle_timeout(TRef, Msg, Channel), State).
|
||||
handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) ->
|
||||
shutdown(idle_timeout, State);
|
||||
|
||||
handle_timeout(TRef, keepalive, State) when is_reference(TRef) ->
|
||||
RecvOct = emqx_pd:get_counter(recv_oct),
|
||||
handle_timeout(TRef, {keepalive, RecvOct}, State);
|
||||
|
||||
handle_timeout(TRef, emit_stats, State = #state{channel = Channel,
|
||||
stats_timer = TRef}) ->
|
||||
ClientId = emqx_channel:info(clientid, Channel),
|
||||
(ClientId =/= undefined) andalso emqx_cm:set_chan_stats(ClientId, stats(State)),
|
||||
reply(State#state{stats_timer = undefined});
|
||||
|
||||
handle_timeout(TRef, TMsg, State = #state{channel = Channel}) ->
|
||||
Ret = emqx_channel:handle_timeout(TRef, TMsg, Channel),
|
||||
handle_chan_return(Ret, State).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Ensure stats timer
|
||||
|
||||
-compile({inline, [ensure_stats_timer/1]}).
|
||||
ensure_stats_timer(State = #state{idle_timeout = Timeout,
|
||||
stats_timer = undefined}) ->
|
||||
State#state{stats_timer = start_timer(Timeout, emit_stats)};
|
||||
ensure_stats_timer(State) -> State.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Ensure rate limit
|
||||
|
||||
ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
|
||||
case ?ENABLED(Limiter) andalso emqx_limiter:check(Stats, Limiter) of
|
||||
false -> {ok, State};
|
||||
{ok, Limiter1} ->
|
||||
{ok, State#state{limiter = Limiter1}};
|
||||
{pause, Time, Limiter1} ->
|
||||
?LOG(debug, "Pause ~pms due to rate limit", [Time]),
|
||||
TRef = start_timer(Time, limit_timeout),
|
||||
NState = State#state{sockstate = blocked,
|
||||
limiter = Limiter1,
|
||||
limit_timer = TRef
|
||||
},
|
||||
{reply, [{active, false}], NState}
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Run GC and Check OOM
|
||||
|
||||
run_gc(Stats, State = #state{gc_state = GcSt}) ->
|
||||
case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
|
||||
false -> State;
|
||||
{IsGC, GcSt1} ->
|
||||
IsGC andalso emqx_metrics:inc('channel.gc.cnt'),
|
||||
State#state{gc_state = GcSt1}
|
||||
end.
|
||||
|
||||
check_oom(State = #state{channel = Channel}) ->
|
||||
#{zone := Zone} = emqx_channel:info(clientinfo, Channel),
|
||||
OomPolicy = emqx_zone:oom_policy(Zone),
|
||||
case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of
|
||||
Shutdown = {shutdown, _Reason} ->
|
||||
erlang:send(self(), Shutdown);
|
||||
_Other -> ok
|
||||
end,
|
||||
State.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Parse incoming data
|
||||
|
@ -326,7 +445,7 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) ->
|
|||
{more, NParseState} ->
|
||||
{ok, State#state{parse_state = NParseState}};
|
||||
{ok, Packet, Rest, NParseState} ->
|
||||
self() ! {incoming, Packet},
|
||||
erlang:send(self(), {incoming, Packet}),
|
||||
parse_incoming(Rest, State#state{parse_state = NParseState})
|
||||
catch
|
||||
error:Reason:Stk ->
|
||||
|
@ -337,52 +456,60 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) ->
|
|||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle incoming packets
|
||||
%% Handle incoming packet
|
||||
|
||||
handle_incoming(Packet = ?PACKET(Type), State = #state{channel = Channel}) ->
|
||||
_ = inc_incoming_stats(Type),
|
||||
_ = emqx_metrics:inc_recv(Packet),
|
||||
handle_incoming(Packet, State = #state{active_n = ActiveN, channel = Channel})
|
||||
when is_record(Packet, mqtt_packet) ->
|
||||
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
|
||||
handle_return(emqx_channel:handle_in(Packet, Channel), State);
|
||||
ok = inc_incoming_stats(Packet),
|
||||
(emqx_pd:get_counter(incoming_pubs) > ActiveN)
|
||||
andalso erlang:send(self(), rate_limit),
|
||||
Ret = emqx_channel:handle_in(Packet, Channel),
|
||||
handle_chan_return(Ret, State);
|
||||
|
||||
handle_incoming(FrameError, State = #state{channel = Channel}) ->
|
||||
handle_return(emqx_channel:handle_in(FrameError, Channel), State).
|
||||
handle_chan_return(emqx_channel:handle_in(FrameError, Channel), State).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle channel return
|
||||
|
||||
handle_return(ok, State) ->
|
||||
handle_chan_return(ok, State) ->
|
||||
reply(State);
|
||||
handle_return({ok, NChannel}, State) ->
|
||||
handle_chan_return({ok, NChannel}, State) ->
|
||||
reply(State#state{channel= NChannel});
|
||||
handle_return({ok, Replies, NChannel}, State) ->
|
||||
handle_chan_return({ok, Replies, NChannel}, State) ->
|
||||
reply(Replies, State#state{channel= NChannel});
|
||||
handle_return({shutdown, Reason, NChannel}, State) ->
|
||||
handle_chan_return({shutdown, Reason, NChannel}, State) ->
|
||||
stop(Reason, State#state{channel = NChannel});
|
||||
handle_return({shutdown, Reason, OutPacket, NChannel}, State) ->
|
||||
handle_chan_return({shutdown, Reason, OutPacket, NChannel}, State) ->
|
||||
NState = State#state{channel = NChannel},
|
||||
stop(Reason, enqueue(OutPacket, NState)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle outgoing packets
|
||||
|
||||
handle_outgoing(Packets, State = #state{channel = Channel}) ->
|
||||
handle_outgoing(Packets, State = #state{active_n = ActiveN}) ->
|
||||
IoData = lists:map(serialize_and_inc_stats_fun(State), Packets),
|
||||
Oct = iolist_size(IoData),
|
||||
ok = inc_sent_stats(length(Packets), Oct),
|
||||
NChannel = emqx_channel:sent(Oct, Channel),
|
||||
{{binary, IoData}, State#state{channel = NChannel}}.
|
||||
case emqx_pd:get_counter(outgoing_pubs) > ActiveN of
|
||||
true ->
|
||||
OutStats = #{cnt => emqx_pd:reset_counter(outgoing_pubs),
|
||||
oct => emqx_pd:reset_counter(outgoing_bytes)
|
||||
},
|
||||
erlang:send(self(), {check_gc, OutStats});
|
||||
false -> ok
|
||||
end,
|
||||
{{binary, IoData}, ensure_stats_timer(State)}.
|
||||
|
||||
%% TODO: Duplicated with emqx_channel:serialize_and_inc_stats_fun/1
|
||||
serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
||||
fun(Packet = ?PACKET(Type)) ->
|
||||
fun(Packet) ->
|
||||
case Serialize(Packet) of
|
||||
<<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!",
|
||||
[emqx_packet:format(Packet)]),
|
||||
<<>>;
|
||||
Data -> _ = inc_outgoing_stats(Type),
|
||||
_ = emqx_metrics:inc_sent(Packet),
|
||||
?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
|
||||
Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
|
||||
ok = inc_outgoing_stats(Packet),
|
||||
Data
|
||||
end
|
||||
end.
|
||||
|
@ -398,23 +525,33 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
|||
]}).
|
||||
|
||||
inc_recv_stats(Cnt, Oct) ->
|
||||
emqx_pd:update_counter(recv_cnt, Cnt),
|
||||
emqx_pd:update_counter(recv_oct, Oct),
|
||||
emqx_pd:inc_counter(incoming_bytes, Oct),
|
||||
emqx_pd:inc_counter(recv_cnt, Cnt),
|
||||
emqx_pd:inc_counter(recv_oct, Oct),
|
||||
emqx_metrics:inc('bytes.received', Oct).
|
||||
|
||||
inc_incoming_stats(Type) ->
|
||||
emqx_pd:update_counter(recv_pkt, 1),
|
||||
(Type == ?PUBLISH)
|
||||
andalso emqx_pd:update_counter(recv_msg, 1).
|
||||
inc_incoming_stats(Packet = ?PACKET(Type)) ->
|
||||
emqx_pd:inc_counter(recv_pkt, 1),
|
||||
if Type == ?PUBLISH ->
|
||||
emqx_pd:inc_counter(recv_msg, 1),
|
||||
emqx_pd:inc_counter(incoming_pubs, 1);
|
||||
true -> ok
|
||||
end,
|
||||
emqx_metrics:inc_recv(Packet).
|
||||
|
||||
inc_outgoing_stats(Type) ->
|
||||
emqx_pd:update_counter(send_pkt, 1),
|
||||
(Type == ?PUBLISH)
|
||||
andalso emqx_pd:update_counter(send_msg, 1).
|
||||
inc_outgoing_stats(Packet = ?PACKET(Type)) ->
|
||||
emqx_pd:inc_counter(send_pkt, 1),
|
||||
if Type == ?PUBLISH ->
|
||||
emqx_pd:inc_counter(send_msg, 1),
|
||||
emqx_pd:inc_counter(outgoing_pubs, 1);
|
||||
true -> ok
|
||||
end,
|
||||
emqx_metrics:inc_sent(Packet).
|
||||
|
||||
inc_sent_stats(Cnt, Oct) ->
|
||||
emqx_pd:update_counter(send_cnt, Cnt),
|
||||
emqx_pd:update_counter(send_oct, Oct),
|
||||
emqx_pd:inc_counter(outgoing_bytes, Oct),
|
||||
emqx_pd:inc_counter(send_cnt, Cnt),
|
||||
emqx_pd:inc_counter(send_oct, Oct),
|
||||
emqx_metrics:inc('bytes.sent', Oct).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -451,6 +588,9 @@ enqueue(Packet, State) when is_record(Packet, mqtt_packet) ->
|
|||
enqueue(Packets, State = #state{pendings = Pendings}) ->
|
||||
State#state{pendings = lists:append(Pendings, Packets)}.
|
||||
|
||||
shutdown(Reason, State) ->
|
||||
stop({shutdown, Reason}, State).
|
||||
|
||||
stop(Reason, State = #state{pendings = []}) ->
|
||||
{stop, State#state{stop_reason = Reason}};
|
||||
stop(Reason, State = #state{pendings = Pendings}) ->
|
||||
|
|
|
@ -25,27 +25,58 @@
|
|||
|
||||
-logger_header("[Zone]").
|
||||
|
||||
-compile({inline,
|
||||
[ idle_timeout/1
|
||||
, publish_limit/1
|
||||
, mqtt_frame_options/1
|
||||
, mqtt_strict_mode/1
|
||||
, max_packet_size/1
|
||||
, mountpoint/1
|
||||
, use_username_as_clientid/1
|
||||
, stats_timer/1
|
||||
, enable_stats/1
|
||||
, enable_acl/1
|
||||
, enable_ban/1
|
||||
, enable_flapping_detect/1
|
||||
, ignore_loop_deliver/1
|
||||
, server_keepalive/1
|
||||
, keepalive_backoff/1
|
||||
, max_inflight/1
|
||||
, session_expiry_interval/1
|
||||
, force_gc_policy/1
|
||||
, force_shutdown_policy/1
|
||||
]}).
|
||||
|
||||
%% APIs
|
||||
-export([start_link/0, stop/0]).
|
||||
|
||||
-export([ frame_options/1
|
||||
%% Zone Option API
|
||||
-export([ idle_timeout/1
|
||||
, publish_limit/1
|
||||
, mqtt_frame_options/1
|
||||
, mqtt_strict_mode/1
|
||||
, max_packet_size/1
|
||||
, mountpoint/1
|
||||
, use_username_as_clientid/1
|
||||
, stats_timer/1
|
||||
, enable_stats/1
|
||||
, enable_acl/1
|
||||
, enable_ban/1
|
||||
, enable_flapping_detect/1
|
||||
, ignore_loop_deliver/1
|
||||
, server_keepalive/1
|
||||
, keepalive_backoff/1
|
||||
, max_inflight/1
|
||||
, session_expiry_interval/1
|
||||
, force_gc_policy/1
|
||||
, force_shutdown_policy/1
|
||||
]).
|
||||
|
||||
-export([check_oom/2]).
|
||||
-export([ init_gc_state/1
|
||||
, oom_policy/1
|
||||
]).
|
||||
|
||||
%% Zone API
|
||||
-export([ get_env/2
|
||||
, get_env/3
|
||||
, set_env/3
|
||||
|
@ -64,27 +95,46 @@
|
|||
, code_change/3
|
||||
]).
|
||||
|
||||
-export_type([zone/0]).
|
||||
-import(emqx_misc, [maybe_apply/2]).
|
||||
|
||||
%% dummy state
|
||||
-record(state, {}).
|
||||
-export_type([zone/0]).
|
||||
|
||||
-type(zone() :: atom()).
|
||||
|
||||
-define(TAB, ?MODULE).
|
||||
-define(SERVER, ?MODULE).
|
||||
-define(DEFAULT_IDLE_TIMEOUT, 30000).
|
||||
-define(KEY(Zone, Key), {?MODULE, Zone, Key}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec(start_link() -> startlink_ret()).
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||
|
||||
-spec(frame_options(zone()) -> emqx_frame:options()).
|
||||
frame_options(Zone) ->
|
||||
-spec(stop() -> ok).
|
||||
stop() ->
|
||||
gen_server:stop(?SERVER).
|
||||
|
||||
-spec(init_gc_state(zone()) -> emqx_gc:gc_state()).
|
||||
init_gc_state(Zone) ->
|
||||
maybe_apply(fun emqx_gc:init/1, force_gc_policy(Zone)).
|
||||
|
||||
-spec(oom_policy(zone()) -> emqx_types:oom_policy()).
|
||||
oom_policy(Zone) -> force_shutdown_policy(Zone).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Zone Options API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec(idle_timeout(zone()) -> pos_integer()).
|
||||
idle_timeout(Zone) ->
|
||||
get_env(Zone, idle_timeout, ?DEFAULT_IDLE_TIMEOUT).
|
||||
|
||||
-spec(publish_limit(zone()) -> maybe(esockd_rate_limit:config())).
|
||||
publish_limit(Zone) ->
|
||||
get_env(Zone, publish_limit).
|
||||
|
||||
-spec(mqtt_frame_options(zone()) -> emqx_frame:options()).
|
||||
mqtt_frame_options(Zone) ->
|
||||
#{strict_mode => mqtt_strict_mode(Zone),
|
||||
max_size => max_packet_size(Zone)
|
||||
}.
|
||||
|
@ -97,10 +147,17 @@ mqtt_strict_mode(Zone) ->
|
|||
max_packet_size(Zone) ->
|
||||
get_env(Zone, max_packet_size, ?MAX_PACKET_SIZE).
|
||||
|
||||
-spec(mountpoint(zone()) -> maybe(emqx_mountpoint:mountpoint())).
|
||||
mountpoint(Zone) -> get_env(Zone, mountpoint).
|
||||
|
||||
-spec(use_username_as_clientid(zone()) -> boolean()).
|
||||
use_username_as_clientid(Zone) ->
|
||||
get_env(Zone, use_username_as_clientid, false).
|
||||
|
||||
-spec(stats_timer(zone()) -> undefined | disabled).
|
||||
stats_timer(Zone) ->
|
||||
case enable_stats(Zone) of true -> undefined; false -> disabled end.
|
||||
|
||||
-spec(enable_stats(zone()) -> boolean()).
|
||||
enable_stats(Zone) ->
|
||||
get_env(Zone, enable_stats, true).
|
||||
|
@ -125,6 +182,10 @@ ignore_loop_deliver(Zone) ->
|
|||
server_keepalive(Zone) ->
|
||||
get_env(Zone, server_keepalive).
|
||||
|
||||
-spec(keepalive_backoff(zone()) -> float()).
|
||||
keepalive_backoff(Zone) ->
|
||||
get_env(Zone, keepalive_backoff, 0.75).
|
||||
|
||||
-spec(max_inflight(zone()) -> 0..65535).
|
||||
max_inflight(Zone) ->
|
||||
get_env(Zone, max_inflight, 65535).
|
||||
|
@ -141,18 +202,9 @@ force_gc_policy(Zone) ->
|
|||
force_shutdown_policy(Zone) ->
|
||||
get_env(Zone, force_shutdown_policy).
|
||||
|
||||
-spec(check_oom(zone(), fun()) -> ok | term()).
|
||||
check_oom(Zone, Action) ->
|
||||
case emqx_zone:force_shutdown_policy(Zone) of
|
||||
undefined -> ok;
|
||||
Policy -> do_check_oom(emqx_oom:init(Policy), Action)
|
||||
end.
|
||||
|
||||
do_check_oom(OomPolicy, Action) ->
|
||||
case emqx_oom:check(OomPolicy) of
|
||||
ok -> ok;
|
||||
Shutdown -> Action(Shutdown)
|
||||
end.
|
||||
%%--------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec(get_env(maybe(zone()), atom()) -> maybe(term())).
|
||||
get_env(undefined, Key) -> emqx:get_env(Key);
|
||||
|
@ -185,17 +237,13 @@ unset_all_env() ->
|
|||
force_reload() ->
|
||||
gen_server:call(?SERVER, force_reload).
|
||||
|
||||
-spec(stop() -> ok).
|
||||
stop() ->
|
||||
gen_server:stop(?SERVER, normal, infinity).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
init([]) ->
|
||||
_ = do_reload(),
|
||||
{ok, #state{}}.
|
||||
{ok, #{}}.
|
||||
|
||||
handle_call(force_reload, _From, State) ->
|
||||
_ = do_reload(),
|
||||
|
|
|
@ -1,34 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_time_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
|
||||
t_seed(_) ->
|
||||
?assert(is_tuple(emqx_time:seed())).
|
||||
|
||||
t_now_secs(_) ->
|
||||
?assert(emqx_time:now_secs() =< emqx_time:now_secs(os:timestamp())).
|
||||
|
||||
t_now_ms(_) ->
|
||||
?assert(emqx_time:now_ms() =< emqx_time:now_ms(os:timestamp())).
|
||||
|
Loading…
Reference in New Issue