Compare commits
23 Commits
master
...
dev/e4.2.7
Author | SHA1 | Date |
---|---|---|
![]() |
362c157418 | |
![]() |
743f44af17 | |
![]() |
ec7b48ee3e | |
![]() |
ca9b90be9a | |
![]() |
e02704e05a | |
![]() |
5cc21dc3f0 | |
![]() |
806cd33d1f | |
![]() |
f8ca066d74 | |
![]() |
59b7e8e3ba | |
![]() |
571219f073 | |
![]() |
9084554f4f | |
![]() |
1b179c8e2a | |
![]() |
fb8f5b79ad | |
![]() |
e427b0ff75 | |
![]() |
b4e3c32e24 | |
![]() |
c511e324bb | |
![]() |
1eb0f7b3b2 | |
![]() |
7d003e0bfc | |
![]() |
927264d793 | |
![]() |
70da59c3bb | |
![]() |
c69e1c6222 | |
![]() |
6666210211 | |
![]() |
238eaa8e40 |
|
@ -296,6 +296,16 @@ broker.shared_dispatch_ack_enabled = false
|
||||||
## Value: Flag
|
## Value: Flag
|
||||||
broker.route_batch_clean = off
|
broker.route_batch_clean = off
|
||||||
|
|
||||||
|
## Performance toggle for subscribe/unsubscribe wildcard topic.
|
||||||
|
## Change this toggle only when there are many wildcard topics.
|
||||||
|
## Value: Enum
|
||||||
|
## - key: mnesia translational updates with per-key locks. recommended for single node setup.
|
||||||
|
## - tab: mnesia translational updates with table lock. recommended for multi-nodes setup.
|
||||||
|
## - global: global lock protected updates. recommended for larger cluster.
|
||||||
|
## NOTE: when changing from/to 'global' lock, it requires all nodes in the cluster
|
||||||
|
## to be stopped before the change.
|
||||||
|
broker.perf.route_lock_type = key
|
||||||
|
|
||||||
##-------------------------------------------------------------------
|
##-------------------------------------------------------------------
|
||||||
## Plugins
|
## Plugins
|
||||||
##-------------------------------------------------------------------
|
##-------------------------------------------------------------------
|
||||||
|
|
|
@ -544,6 +544,16 @@ listener.ws.external.verify_protocol_header = on
|
||||||
## Value: Duration
|
## Value: Duration
|
||||||
## listener.ws.external.proxy_protocol_timeout = 3s
|
## listener.ws.external.proxy_protocol_timeout = 3s
|
||||||
|
|
||||||
|
## See: listener.ssl.$name.peer_cert_as_username
|
||||||
|
##
|
||||||
|
## Value: cn
|
||||||
|
## listener.ws.external.peer_cert_as_username = cn
|
||||||
|
|
||||||
|
## See: listener.ssl.$name.peer_cert_as_clientid
|
||||||
|
##
|
||||||
|
## Value: cn
|
||||||
|
## listener.ws.external.peer_cert_as_clientid = cn
|
||||||
|
|
||||||
## The TCP backlog of external MQTT/WebSocket Listener.
|
## The TCP backlog of external MQTT/WebSocket Listener.
|
||||||
##
|
##
|
||||||
## See: listener.ws.$name.backlog
|
## See: listener.ws.$name.backlog
|
||||||
|
|
|
@ -10,7 +10,7 @@
|
||||||
## - file: write logs only to file
|
## - file: write logs only to file
|
||||||
## - console: write logs only to standard I/O
|
## - console: write logs only to standard I/O
|
||||||
## - both: write logs both to file and standard I/O
|
## - both: write logs both to file and standard I/O
|
||||||
log.to = both
|
log.to = file
|
||||||
|
|
||||||
## The log severity level.
|
## The log severity level.
|
||||||
##
|
##
|
||||||
|
@ -167,4 +167,4 @@ log.rotation.count = 5
|
||||||
## Value: MaxBurstCount,TimeWindow
|
## Value: MaxBurstCount,TimeWindow
|
||||||
## Default: disabled
|
## Default: disabled
|
||||||
##
|
##
|
||||||
#log.burst_limit = 20000, 1s
|
#log.burst_limit = 20000, 1s
|
||||||
|
|
|
@ -184,6 +184,30 @@ zone.external.enable_flapping_detect = off
|
||||||
## Example: 100KB incoming per 10 seconds.
|
## Example: 100KB incoming per 10 seconds.
|
||||||
#zone.external.rate_limit.conn_bytes_in = 100KB,10s
|
#zone.external.rate_limit.conn_bytes_in = 100KB,10s
|
||||||
|
|
||||||
|
## Whether to alarm the congested connections.
|
||||||
|
##
|
||||||
|
## Sometimes the mqtt connection (usually an MQTT subscriber) may get "congested" because
|
||||||
|
## there're too many packets to sent. The socket trys to buffer the packets until the buffer is
|
||||||
|
## full. If more packets comes after that, the packets will be "pending" in a queue
|
||||||
|
## and we consider the connection is "congested".
|
||||||
|
##
|
||||||
|
## Enable this to send an alarm when there's any bytes pending in the queue. You could set
|
||||||
|
## the `listener.tcp.<ZoneName>.sndbuf` to a larger value if the alarm is triggered too often.
|
||||||
|
##
|
||||||
|
## The name of the alarm is of format "conn_congestion/<ClientID>/<Username>".
|
||||||
|
## Where the <ClientID> is the client-id of the congested MQTT connection.
|
||||||
|
## And the <Username> is the username or "unknown_user" of not provided by the client.
|
||||||
|
## Default: off
|
||||||
|
#zone.external.conn_congestion.alarm = off
|
||||||
|
|
||||||
|
## Won't clear the congested alarm in how long time.
|
||||||
|
## The alarm is cleared only when there're no pending bytes in the queue, and also it has been
|
||||||
|
## `min_alarm_sustain_duration` time since the last time we considered the connection is "congested".
|
||||||
|
##
|
||||||
|
## This is to avoid clearing and sending the alarm again too often.
|
||||||
|
## Default: 1m
|
||||||
|
#zone.external.conn_congestion.min_alarm_sustain_duration = 1m
|
||||||
|
|
||||||
## Messages quota for the each of external MQTT connection.
|
## Messages quota for the each of external MQTT connection.
|
||||||
## This value consumed by the number of recipient on a message.
|
## This value consumed by the number of recipient on a message.
|
||||||
##
|
##
|
||||||
|
|
|
@ -999,6 +999,16 @@ end}.
|
||||||
{datatype, string}
|
{datatype, string}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
{mapping, "zone.$name.conn_congestion.alarm", "emqx.zones", [
|
||||||
|
{datatype, flag},
|
||||||
|
{default, off}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "zone.$name.conn_congestion.min_alarm_sustain_duration", "emqx.zones", [
|
||||||
|
{default, "1m"},
|
||||||
|
{datatype, {duration, ms}}
|
||||||
|
]}.
|
||||||
|
|
||||||
{mapping, "zone.$name.quota.conn_messages_routing", "emqx.zones", [
|
{mapping, "zone.$name.quota.conn_messages_routing", "emqx.zones", [
|
||||||
{datatype, string}
|
{datatype, string}
|
||||||
]}.
|
]}.
|
||||||
|
@ -1128,6 +1138,10 @@ end}.
|
||||||
{ratelimit, {conn_messages_in, Ratelimit(Val)}};
|
{ratelimit, {conn_messages_in, Ratelimit(Val)}};
|
||||||
(["rate_limit", "conn_bytes_in"], Val) ->
|
(["rate_limit", "conn_bytes_in"], Val) ->
|
||||||
{ratelimit, {conn_bytes_in, Ratelimit(Val)}};
|
{ratelimit, {conn_bytes_in, Ratelimit(Val)}};
|
||||||
|
(["conn_congestion", "alarm"], Val) ->
|
||||||
|
{conn_congestion_alarm_enabled, Val};
|
||||||
|
(["conn_congestion", "min_alarm_sustain_duration"], Val) ->
|
||||||
|
{conn_congestion_min_alarm_sustain_duration, Val};
|
||||||
(["quota", "conn_messages_routing"], Val) ->
|
(["quota", "conn_messages_routing"], Val) ->
|
||||||
{quota, {conn_messages_routing, Ratelimit(Val)}};
|
{quota, {conn_messages_routing, Ratelimit(Val)}};
|
||||||
(["quota", "overall_messages_routing"], Val) ->
|
(["quota", "overall_messages_routing"], Val) ->
|
||||||
|
@ -1573,7 +1587,11 @@ end}.
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{mapping, "listener.ws.$name.peer_cert_as_username", "emqx.listeners", [
|
{mapping, "listener.ws.$name.peer_cert_as_username", "emqx.listeners", [
|
||||||
{datatype, {enum, [cn, dn, crt]}}
|
{datatype, {enum, [cn]}}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.ws.$name.peer_cert_as_clientid", "emqx.listeners", [
|
||||||
|
{datatype, {enum, [cn]}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -2008,6 +2026,18 @@ end}.
|
||||||
{datatype, flag}
|
{datatype, flag}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
%% @doc Performance toggle for subscribe/unsubscribe wildcard topic.
|
||||||
|
%% Change this toggle only when there are many wildcard topics.
|
||||||
|
%% key: mnesia translational updates with per-key locks. recommended for single node setup.
|
||||||
|
%% tab: mnesia translational updates with table lock. recommended for multi-nodes setup.
|
||||||
|
%% global: global lock protected updates. recommended for larger cluster.
|
||||||
|
%% NOTE: when changing from/to 'global' lock, it requires all nodes in the cluster
|
||||||
|
%%
|
||||||
|
{mapping, "broker.perf.route_lock_type", "emqx.route_lock_type", [
|
||||||
|
{default, key},
|
||||||
|
{datatype, {enum, [key, tab, global]}}
|
||||||
|
]}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% System Monitor
|
%% System Monitor
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
|
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
|
||||||
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
|
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
|
||||||
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.5"}}},
|
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.5"}}},
|
||||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.5"}}},
|
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.7"}}},
|
||||||
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}},
|
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}},
|
||||||
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
|
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
|
||||||
]}.
|
]}.
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
{add_module, emqx_congestion},
|
{add_module, emqx_congestion},
|
||||||
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
{suspend, [esockd_acceptor,emqx_connection, emqx_ws_connection]},
|
{suspend, [esockd_acceptor,emqx_connection, emqx_ws_connection]},
|
||||||
|
@ -14,12 +15,17 @@
|
||||||
{update, emqx_ws_connection, {advanced, []}},
|
{update, emqx_ws_connection, {advanced, []}},
|
||||||
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
||||||
{resume, [esockd_acceptor,emqx_connection, emqx_ws_connection]}
|
{resume, [esockd_acceptor,emqx_connection, emqx_ws_connection]},
|
||||||
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}
|
||||||
]},
|
]},
|
||||||
{"4.2.1", [
|
{"4.2.1", [
|
||||||
{add_module, emqx_congestion},
|
{add_module, emqx_congestion},
|
||||||
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
|
@ -27,15 +33,54 @@
|
||||||
{update, emqx_ws_connection, {advanced, []}},
|
{update, emqx_ws_connection, {advanced, []}},
|
||||||
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
||||||
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]}
|
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
||||||
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.[23]">>, [
|
{<<"4.2.[23]">>, [
|
||||||
{add_module, emqx_congestion},
|
{add_module, emqx_congestion},
|
||||||
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
||||||
|
{update, emqx_connection, {advanced, []}},
|
||||||
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []}
|
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}
|
||||||
|
]},
|
||||||
|
{<<"4.2.4">>, [
|
||||||
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
|
||||||
|
{update, emqx_connection, {advanced, []}},
|
||||||
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}
|
||||||
|
]},
|
||||||
|
{<<"4.2.5">>, [
|
||||||
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
|
||||||
|
{update, emqx_connection, {advanced, []}},
|
||||||
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}
|
||||||
|
]},
|
||||||
|
{<<"4.2.6">>, [
|
||||||
|
{load_module, emqx_channel, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
],
|
],
|
||||||
|
@ -51,8 +96,13 @@
|
||||||
{update, emqx_connection, {advanced, []}},
|
{update, emqx_connection, {advanced, []}},
|
||||||
{update, emqx_ws_connection, {advanced, []}},
|
{update, emqx_ws_connection, {advanced, []}},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
||||||
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
||||||
{delete_module, emqx_congestion}
|
{delete_module, emqx_congestion},
|
||||||
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{"4.2.1", [
|
{"4.2.1", [
|
||||||
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
||||||
|
@ -60,20 +110,60 @@
|
||||||
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{update, emqx_connection, {advanced, []}},
|
{update, emqx_connection, {advanced, []}},
|
||||||
{update, emqx_ws_connection, {advanced, []}},
|
{update, emqx_ws_connection, {advanced, []}},
|
||||||
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
||||||
{delete_module, emqx_congestion}
|
{delete_module, emqx_congestion},
|
||||||
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.[23]">>, [
|
{<<"4.2.[23]">>, [
|
||||||
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
{update, emqx_connection, {advanced, []}},
|
||||||
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
{delete_module, emqx_congestion}
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
|
{delete_module, emqx_congestion},
|
||||||
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
|
{<<"4.2.4">>, [
|
||||||
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
|
||||||
|
{update, emqx_connection, {advanced, []}},
|
||||||
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
|
{<<"4.2.5">>, [
|
||||||
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
|
||||||
|
{update, emqx_connection, {advanced, []}},
|
||||||
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
|
{<<"4.2.6">>, [
|
||||||
|
{load_module, emqx_channel, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
]
|
]
|
||||||
|
|
|
@ -361,7 +361,7 @@ normalize_message(partition, #{occurred := Node}) ->
|
||||||
list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
|
list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
|
||||||
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
|
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
|
||||||
list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID]));
|
list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID]));
|
||||||
normalize_message(<<"mqtt_conn/congested/", Info/binary>>, _) ->
|
normalize_message(<<"conn_congestion/", Info/binary>>, _) ->
|
||||||
list_to_binary(io_lib:format("MQTT connection congested: ~s", [Info]));
|
list_to_binary(io_lib:format("connection congested: ~s", [Info]));
|
||||||
normalize_message(_Name, _UnknownDetails) ->
|
normalize_message(_Name, _UnknownDetails) ->
|
||||||
<<"Unknown alarm">>.
|
<<"Unknown alarm">>.
|
||||||
|
|
|
@ -419,7 +419,7 @@ safe_update_stats(Tab, Stat, MaxStat) ->
|
||||||
-compile({inline, [call/2, cast/2, pick/1]}).
|
-compile({inline, [call/2, cast/2, pick/1]}).
|
||||||
|
|
||||||
call(Broker, Req) ->
|
call(Broker, Req) ->
|
||||||
gen_server:call(Broker, Req).
|
gen_server:call(Broker, Req, infinity).
|
||||||
|
|
||||||
cast(Broker, Msg) ->
|
cast(Broker, Msg) ->
|
||||||
gen_server:cast(Broker, Msg).
|
gen_server:cast(Broker, Msg).
|
||||||
|
|
|
@ -32,6 +32,8 @@
|
||||||
-export([ info/1
|
-export([ info/1
|
||||||
, info/2
|
, info/2
|
||||||
, set_conn_state/2
|
, set_conn_state/2
|
||||||
|
, get_session/1
|
||||||
|
, set_session/2
|
||||||
, stats/1
|
, stats/1
|
||||||
, caps/1
|
, caps/1
|
||||||
]).
|
]).
|
||||||
|
@ -46,6 +48,12 @@
|
||||||
, terminate/2
|
, terminate/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
%% Export for emqx_sn
|
||||||
|
-export([ do_deliver/2
|
||||||
|
, ensure_keepalive/2
|
||||||
|
, clear_keepalive/1
|
||||||
|
]).
|
||||||
|
|
||||||
%% Exports for CT
|
%% Exports for CT
|
||||||
-export([set_field/3]).
|
-export([set_field/3]).
|
||||||
|
|
||||||
|
@ -167,6 +175,12 @@ info(timers, #channel{timers = Timers}) -> Timers.
|
||||||
set_conn_state(ConnState, Channel) ->
|
set_conn_state(ConnState, Channel) ->
|
||||||
Channel#channel{conn_state = ConnState}.
|
Channel#channel{conn_state = ConnState}.
|
||||||
|
|
||||||
|
get_session(#channel{session = Session}) ->
|
||||||
|
Session.
|
||||||
|
|
||||||
|
set_session(Session, Channel) ->
|
||||||
|
Channel#channel{session = Session}.
|
||||||
|
|
||||||
%% TODO: Add more stats.
|
%% TODO: Add more stats.
|
||||||
-spec(stats(channel()) -> emqx_types:stats()).
|
-spec(stats(channel()) -> emqx_types:stats()).
|
||||||
stats(#channel{session = Session})->
|
stats(#channel{session = Session})->
|
||||||
|
@ -1501,6 +1515,14 @@ ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}
|
||||||
Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)),
|
Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)),
|
||||||
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
|
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
|
||||||
|
|
||||||
|
clear_keepalive(Channel = #channel{timers = Timers}) ->
|
||||||
|
case maps:get(alive_timer, Timers, undefined) of
|
||||||
|
undefined ->
|
||||||
|
Channel;
|
||||||
|
TRef ->
|
||||||
|
emqx_misc:cancel_timer(TRef),
|
||||||
|
Channel#channel{timers = maps:without([alive_timer], Timers)}
|
||||||
|
end.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Maybe Resume Session
|
%% Maybe Resume Session
|
||||||
|
|
||||||
|
|
|
@ -16,17 +16,16 @@
|
||||||
|
|
||||||
-module(emqx_congestion).
|
-module(emqx_congestion).
|
||||||
|
|
||||||
-export([ maybe_alarm_port_busy/3
|
-export([ maybe_alarm_conn_congestion/3
|
||||||
, maybe_alarm_port_busy/4
|
|
||||||
, maybe_alarm_too_many_publish/5
|
|
||||||
, maybe_alarm_too_many_publish/6
|
|
||||||
, cancel_alarms/3
|
, cancel_alarms/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(ALARM_CONN_CONGEST(Channel, Reason),
|
-define(ALARM_CONN_CONGEST(Channel, Reason),
|
||||||
list_to_binary(io_lib:format("mqtt_conn/congested/~s/~s/~s", [emqx_channel:info(clientid, Channel),
|
list_to_binary(
|
||||||
maps:get(username, emqx_channel:info(clientinfo, Channel), <<"undefined">>),
|
io_lib:format("~s/~s/~s",
|
||||||
Reason]))).
|
[Reason, emqx_channel:info(clientid, Channel),
|
||||||
|
maps:get(username, emqx_channel:info(clientinfo, Channel),
|
||||||
|
<<"unknown_user">>)]))).
|
||||||
|
|
||||||
-define(ALARM_CONN_INFO_KEYS, [
|
-define(ALARM_CONN_INFO_KEYS, [
|
||||||
socktype, sockname, peername, clientid, username, proto_name, proto_ver,
|
socktype, sockname, peername, clientid, username, proto_name, proto_ver,
|
||||||
|
@ -36,44 +35,28 @@
|
||||||
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
|
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
|
||||||
-define(PROC_INFO_KEYS, [message_queue_len, memory, reductions]).
|
-define(PROC_INFO_KEYS, [message_queue_len, memory, reductions]).
|
||||||
-define(ALARM_SENT(REASON), {alarm_sent, REASON}).
|
-define(ALARM_SENT(REASON), {alarm_sent, REASON}).
|
||||||
-define(ALL_ALARM_REASONS, [port_busy, too_many_publish]).
|
-define(ALL_ALARM_REASONS, [conn_congestion]).
|
||||||
-define(CONFIRM_CLEAR(REASON), {alarm_confirm_clear, REASON}).
|
-define(WONT_CLEAR_IN, 60000).
|
||||||
-define(CONFIRM_CLEAR_INTERVAL, 10000).
|
|
||||||
|
|
||||||
maybe_alarm_port_busy(Socket, Transport, Channel) ->
|
maybe_alarm_conn_congestion(Socket, Transport, Channel) ->
|
||||||
maybe_alarm_port_busy(Socket, Transport, Channel, false).
|
case is_alarm_enabled(Channel) of
|
||||||
|
false -> ok;
|
||||||
maybe_alarm_port_busy(Socket, Transport, Channel, ForceClear) ->
|
true ->
|
||||||
case is_tcp_congested(Socket, Transport) of
|
case is_tcp_congested(Socket, Transport) of
|
||||||
true -> alarm_congestion(Socket, Transport, Channel, port_busy);
|
true -> alarm_congestion(Socket, Transport, Channel, conn_congestion);
|
||||||
false -> cancel_alarm_congestion(Socket, Transport, Channel, port_busy,
|
false -> cancel_alarm_congestion(Socket, Transport, Channel, conn_congestion)
|
||||||
ForceClear)
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
|
|
||||||
MaxBatchSize) ->
|
|
||||||
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
|
|
||||||
MaxBatchSize, false).
|
|
||||||
|
|
||||||
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
|
|
||||||
PubMsgCount = _MaxBatchSize, _ForceClear) ->
|
|
||||||
%% we only alarm it when the process is "too busy"
|
|
||||||
alarm_congestion(Socket, Transport, Channel, too_many_publish);
|
|
||||||
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
|
|
||||||
_MaxBatchSize, ForceClear) when PubMsgCount == 0 ->
|
|
||||||
%% but we clear the alarm until it is really "idle", to avoid sending
|
|
||||||
%% alarms and clears too frequently
|
|
||||||
cancel_alarm_congestion(Socket, Transport, Channel, too_many_publish,
|
|
||||||
ForceClear);
|
|
||||||
maybe_alarm_too_many_publish(_Socket, _Transport, _Channel, _PubMsgCount,
|
|
||||||
_MaxBatchSize, _ForceClear) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
cancel_alarms(Socket, Transport, Channel) ->
|
cancel_alarms(Socket, Transport, Channel) ->
|
||||||
lists:foreach(fun(Reason) ->
|
lists:foreach(fun(Reason) ->
|
||||||
do_cancel_alarm_congestion(Socket, Transport, Channel, Reason)
|
do_cancel_alarm_congestion(Socket, Transport, Channel, Reason)
|
||||||
end, ?ALL_ALARM_REASONS).
|
end, ?ALL_ALARM_REASONS).
|
||||||
|
|
||||||
|
is_alarm_enabled(Channel) ->
|
||||||
|
emqx_zone:get_env(emqx_channel:info(zone, Channel),
|
||||||
|
conn_congestion_alarm_enabled, false).
|
||||||
|
|
||||||
alarm_congestion(Socket, Transport, Channel, Reason) ->
|
alarm_congestion(Socket, Transport, Channel, Reason) ->
|
||||||
case has_alarm_sent(Reason) of
|
case has_alarm_sent(Reason) of
|
||||||
false -> do_alarm_congestion(Socket, Transport, Channel, Reason);
|
false -> do_alarm_congestion(Socket, Transport, Channel, Reason);
|
||||||
|
@ -82,8 +65,11 @@ alarm_congestion(Socket, Transport, Channel, Reason) ->
|
||||||
update_alarm_sent_at(Reason)
|
update_alarm_sent_at(Reason)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
cancel_alarm_congestion(Socket, Transport, Channel, Reason, ForceClear) ->
|
cancel_alarm_congestion(Socket, Transport, Channel, Reason) ->
|
||||||
case is_alarm_allowed_clear(Reason, ForceClear) of
|
Zone = emqx_channel:info(zone, Channel),
|
||||||
|
WontClearIn = emqx_zone:get_env(Zone, conn_congestion_min_alarm_sustain_duration,
|
||||||
|
?WONT_CLEAR_IN),
|
||||||
|
case has_alarm_sent(Reason) andalso long_time_since_last_alarm(Reason, WontClearIn) of
|
||||||
true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason);
|
true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason);
|
||||||
false -> ok
|
false -> ok
|
||||||
end.
|
end.
|
||||||
|
@ -123,13 +109,11 @@ get_alarm_sent_at(Reason) ->
|
||||||
LastSentAt -> LastSentAt
|
LastSentAt -> LastSentAt
|
||||||
end.
|
end.
|
||||||
|
|
||||||
is_alarm_allowed_clear(Reason, _ForceClear = true) ->
|
long_time_since_last_alarm(Reason, WontClearIn) ->
|
||||||
has_alarm_sent(Reason);
|
|
||||||
is_alarm_allowed_clear(Reason, _ForceClear = false) ->
|
|
||||||
%% only sent clears when the alarm was not triggered in the last
|
%% only sent clears when the alarm was not triggered in the last
|
||||||
%% ?CONFIRM_CLEAR_INTERVAL time
|
%% WontClearIn time
|
||||||
case timenow() - get_alarm_sent_at(Reason) of
|
case timenow() - get_alarm_sent_at(Reason) of
|
||||||
Elapse when Elapse >= ?CONFIRM_CLEAR_INTERVAL -> true;
|
Elapse when Elapse >= WontClearIn -> true;
|
||||||
_ -> false
|
_ -> false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -374,12 +374,8 @@ handle_msg({Passive, _Sock}, State)
|
||||||
handle_info(activate_socket, NState1);
|
handle_info(activate_socket, NState1);
|
||||||
|
|
||||||
handle_msg(Deliver = {deliver, _Topic, _Msg},
|
handle_msg(Deliver = {deliver, _Topic, _Msg},
|
||||||
#state{active_n = MaxBatchSize, transport = Transport,
|
#state{active_n = ActiveN} = State) ->
|
||||||
socket = Socket, channel = Channel} = State) ->
|
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
|
||||||
Delivers0 = emqx_misc:drain_deliver(MaxBatchSize),
|
|
||||||
emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel,
|
|
||||||
length(Delivers0), MaxBatchSize),
|
|
||||||
Delivers = [Deliver|Delivers0],
|
|
||||||
with_channel(handle_deliver, [Delivers], State);
|
with_channel(handle_deliver, [Delivers], State);
|
||||||
|
|
||||||
%% Something sent
|
%% Something sent
|
||||||
|
@ -548,12 +544,9 @@ handle_timeout(_TRef, limit_timeout, State) ->
|
||||||
},
|
},
|
||||||
handle_info(activate_socket, NState);
|
handle_info(activate_socket, NState);
|
||||||
|
|
||||||
handle_timeout(_TRef, emit_stats, State = #state{active_n = MaxBatchSize,
|
handle_timeout(_TRef, emit_stats, State = #state{channel = Channel, transport = Transport,
|
||||||
channel = Channel, transport = Transport, socket = Socket}) ->
|
socket = Socket}) ->
|
||||||
{_, MsgQLen} = erlang:process_info(self(), message_queue_len),
|
emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
|
||||||
emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel, true),
|
|
||||||
emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel,
|
|
||||||
MsgQLen, MaxBatchSize, true),
|
|
||||||
ClientId = emqx_channel:info(clientid, Channel),
|
ClientId = emqx_channel:info(clientid, Channel),
|
||||||
emqx_cm:set_chan_stats(ClientId, stats(State)),
|
emqx_cm:set_chan_stats(ClientId, stats(State)),
|
||||||
{ok, State#state{stats_timer = undefined}};
|
{ok, State#state{stats_timer = undefined}};
|
||||||
|
@ -666,7 +659,7 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel})
|
||||||
Oct = iolist_size(IoData),
|
Oct = iolist_size(IoData),
|
||||||
ok = emqx_metrics:inc('bytes.sent', Oct),
|
ok = emqx_metrics:inc('bytes.sent', Oct),
|
||||||
emqx_pd:inc_counter(outgoing_bytes, Oct),
|
emqx_pd:inc_counter(outgoing_bytes, Oct),
|
||||||
emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel),
|
emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
|
||||||
case Transport:async_send(Socket, IoData, [nosuspend]) of
|
case Transport:async_send(Socket, IoData, [nosuspend]) of
|
||||||
ok -> ok;
|
ok -> ok;
|
||||||
Error = {error, _Reason} ->
|
Error = {error, _Reason} ->
|
||||||
|
|
|
@ -141,6 +141,9 @@ parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) ->
|
||||||
%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK...
|
%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK...
|
||||||
parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
|
parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
|
||||||
parse_frame(Rest, Header, 2, Options);
|
parse_frame(Rest, Header, 2, Options);
|
||||||
|
parse_remaining_len(<<1:1, _Len:7, _Rest/binary>>, _Header, Multiplier, _Value, _Options)
|
||||||
|
when Multiplier > 2097152 ->
|
||||||
|
error(malformed_variable_byte_integer);
|
||||||
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) ->
|
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) ->
|
||||||
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options);
|
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options);
|
||||||
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value,
|
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value,
|
||||||
|
|
|
@ -118,7 +118,8 @@ do_add_route(Topic, Dest) when is_binary(Topic) ->
|
||||||
false ->
|
false ->
|
||||||
ok = emqx_router_helper:monitor(Dest),
|
ok = emqx_router_helper:monitor(Dest),
|
||||||
case emqx_topic:wildcard(Topic) of
|
case emqx_topic:wildcard(Topic) of
|
||||||
true -> trans(fun insert_trie_route/1, [Route]);
|
true ->
|
||||||
|
maybe_trans(fun insert_trie_route/1, [Route]);
|
||||||
false -> insert_direct_route(Route)
|
false -> insert_direct_route(Route)
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
@ -164,7 +165,8 @@ do_delete_route(Topic) when is_binary(Topic) ->
|
||||||
do_delete_route(Topic, Dest) ->
|
do_delete_route(Topic, Dest) ->
|
||||||
Route = #route{topic = Topic, dest = Dest},
|
Route = #route{topic = Topic, dest = Dest},
|
||||||
case emqx_topic:wildcard(Topic) of
|
case emqx_topic:wildcard(Topic) of
|
||||||
true -> trans(fun delete_trie_route/1, [Route]);
|
true ->
|
||||||
|
maybe_trans(fun delete_trie_route/1, [Route]);
|
||||||
false -> delete_direct_route(Route)
|
false -> delete_direct_route(Route)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -247,10 +249,59 @@ delete_trie_route(Route = #route{topic = Topic}) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
-spec(trans(function(), list(any())) -> ok | {error, term()}).
|
-spec(maybe_trans(function(), list(any())) -> ok | {error, term()}).
|
||||||
trans(Fun, Args) ->
|
maybe_trans(Fun, Args) ->
|
||||||
case mnesia:transaction(Fun, Args) of
|
case persistent_term:get(emqx_route_lock_type, key) of
|
||||||
{atomic, Ok} -> Ok;
|
key ->
|
||||||
{aborted, Reason} -> {error, Reason}
|
trans(Fun, Args);
|
||||||
|
global ->
|
||||||
|
lock_router(),
|
||||||
|
try mnesia:sync_dirty(Fun, Args)
|
||||||
|
after
|
||||||
|
unlock_router()
|
||||||
|
end;
|
||||||
|
tab ->
|
||||||
|
trans(fun() ->
|
||||||
|
emqx_trie:lock_tables(),
|
||||||
|
apply(Fun, Args)
|
||||||
|
end, [])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec(trans(function(), list(any())) -> ok | {error, term()}).
|
||||||
|
trans(Fun, Args) ->
|
||||||
|
%% trigger selective receive optimization of compiler,
|
||||||
|
%% ideal for handling bursty traffic.
|
||||||
|
Ref = erlang:make_ref(),
|
||||||
|
Owner = self(),
|
||||||
|
{WPid, RefMon} = spawn_monitor(
|
||||||
|
fun() ->
|
||||||
|
Res = case mnesia:transaction(Fun, Args) of
|
||||||
|
{atomic, Ok} -> Ok;
|
||||||
|
{aborted, Reason} -> {error, Reason}
|
||||||
|
end,
|
||||||
|
Owner ! {Ref, Res}
|
||||||
|
end),
|
||||||
|
receive
|
||||||
|
{Ref, TransRes} ->
|
||||||
|
receive
|
||||||
|
{'DOWN', RefMon, process, WPid, normal} -> ok
|
||||||
|
end,
|
||||||
|
TransRes;
|
||||||
|
{'DOWN', RefMon, process, WPid, Info} ->
|
||||||
|
{error, {trans_crash, Info}}
|
||||||
|
end.
|
||||||
|
|
||||||
|
lock_router() ->
|
||||||
|
%% if Retry is not 0, global:set_lock could sleep a random time up to 8s.
|
||||||
|
%% Considering we have a limited number of brokers, it is safe to use sleep 1 ms.
|
||||||
|
case global:set_lock({?MODULE, self()}, [node() | nodes()], 0) of
|
||||||
|
false ->
|
||||||
|
%% Force to sleep 1ms instead.
|
||||||
|
timer:sleep(1),
|
||||||
|
lock_router();
|
||||||
|
true ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
unlock_router() ->
|
||||||
|
global:del_lock({?MODULE, self()}).
|
||||||
|
|
|
@ -34,6 +34,10 @@ init([]) ->
|
||||||
type => worker,
|
type => worker,
|
||||||
modules => [emqx_router_helper]},
|
modules => [emqx_router_helper]},
|
||||||
|
|
||||||
|
ok = persistent_term:put(emqx_route_lock_type,
|
||||||
|
application:get_env(emqx, route_lock_type, key)
|
||||||
|
),
|
||||||
|
|
||||||
%% Router pool
|
%% Router pool
|
||||||
RouterPool = emqx_pool_sup:spec([router_pool, hash,
|
RouterPool = emqx_pool_sup:spec([router_pool, hash,
|
||||||
{emqx_router, start_link, []}]),
|
{emqx_router, start_link, []}]),
|
||||||
|
|
|
@ -75,6 +75,7 @@
|
||||||
|
|
||||||
-export([ deliver/2
|
-export([ deliver/2
|
||||||
, enqueue/2
|
, enqueue/2
|
||||||
|
, dequeue/1
|
||||||
, retry/1
|
, retry/1
|
||||||
, terminate/3
|
, terminate/3
|
||||||
]).
|
]).
|
||||||
|
|
|
@ -31,7 +31,9 @@
|
||||||
, delete/1
|
, delete/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([empty/0]).
|
-export([ empty/0
|
||||||
|
, lock_tables/0
|
||||||
|
]).
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
@ -120,6 +122,11 @@ delete(Topic) when is_binary(Topic) ->
|
||||||
empty() ->
|
empty() ->
|
||||||
ets:info(?TRIE_TAB, size) == 0.
|
ets:info(?TRIE_TAB, size) == 0.
|
||||||
|
|
||||||
|
-spec lock_tables() -> ok.
|
||||||
|
lock_tables() ->
|
||||||
|
mnesia:write_lock_table(?TRIE_TAB),
|
||||||
|
mnesia:write_lock_table(?TRIE_NODE_TAB).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -196,15 +196,21 @@ init(Req, Opts) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
websocket_init([Req, Opts]) ->
|
websocket_init([Req, Opts]) ->
|
||||||
Peername = case proplists:get_bool(proxy_protocol, Opts)
|
{Peername, Peercert} =
|
||||||
andalso maps:get(proxy_header, Req) of
|
case proplists:get_bool(proxy_protocol, Opts)
|
||||||
#{src_address := SrcAddr, src_port := SrcPort} ->
|
andalso maps:get(proxy_header, Req) of
|
||||||
{SrcAddr, SrcPort};
|
#{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} ->
|
||||||
_ ->
|
ProxyName = {SrcAddr, SrcPort},
|
||||||
cowboy_req:peer(Req)
|
%% Notice: Only CN is available in Proxy Protocol V2 additional info
|
||||||
end,
|
ProxySSL = case maps:get(cn, SSL, undefined) of
|
||||||
|
undeined -> nossl;
|
||||||
|
CN -> [{pp2_ssl_cn, CN}]
|
||||||
|
end,
|
||||||
|
{ProxyName, ProxySSL};
|
||||||
|
_ ->
|
||||||
|
{cowboy_req:peer(Req), cowboy_req:cert(Req)}
|
||||||
|
end,
|
||||||
Sockname = cowboy_req:sock(Req),
|
Sockname = cowboy_req:sock(Req),
|
||||||
Peercert = cowboy_req:cert(Req),
|
|
||||||
WsCookie = try cowboy_req:parse_cookies(Req)
|
WsCookie = try cowboy_req:parse_cookies(Req)
|
||||||
catch
|
catch
|
||||||
error:badarg ->
|
error:badarg ->
|
||||||
|
|
|
@ -42,7 +42,8 @@ all() ->
|
||||||
groups() ->
|
groups() ->
|
||||||
[{parse, [parallel],
|
[{parse, [parallel],
|
||||||
[t_parse_cont,
|
[t_parse_cont,
|
||||||
t_parse_frame_too_large
|
t_parse_frame_too_large,
|
||||||
|
t_parse_frame_malformed_variable_byte_integer
|
||||||
]},
|
]},
|
||||||
{connect, [parallel],
|
{connect, [parallel],
|
||||||
[t_serialize_parse_v3_connect,
|
[t_serialize_parse_v3_connect,
|
||||||
|
@ -129,6 +130,12 @@ t_parse_frame_too_large(_) ->
|
||||||
?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 512})),
|
?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 512})),
|
||||||
?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})).
|
?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})).
|
||||||
|
|
||||||
|
t_parse_frame_malformed_variable_byte_integer(_) ->
|
||||||
|
MalformedPayload = << <<16#80>> || _ <- lists:seq(1, 4) >>,
|
||||||
|
ParseState = emqx_frame:initial_parse_state(#{}),
|
||||||
|
?catch_error(malformed_variable_byte_integer,
|
||||||
|
emqx_frame:parse(MalformedPayload, ParseState)).
|
||||||
|
|
||||||
t_serialize_parse_v3_connect(_) ->
|
t_serialize_parse_v3_connect(_) ->
|
||||||
Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,
|
Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,
|
||||||
113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108,
|
113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108,
|
||||||
|
|
|
@ -270,7 +270,7 @@ t_connect_limit_timeout(_) ->
|
||||||
meck:unload(proplists).
|
meck:unload(proplists).
|
||||||
|
|
||||||
t_connect_emit_stats_timeout(_) ->
|
t_connect_emit_stats_timeout(_) ->
|
||||||
IdleTimeout = 2000,
|
IdleTimeout = 2000 + 200,
|
||||||
emqx_zone:set_env(external, idle_timeout, IdleTimeout),
|
emqx_zone:set_env(external, idle_timeout, IdleTimeout),
|
||||||
|
|
||||||
{ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60}]),
|
{ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60}]),
|
||||||
|
|
Loading…
Reference in New Issue