Compare commits

...

35 Commits

Author SHA1 Message Date
Turtle 7cc8cb4f88 fix(frame): variable byte integer could be larger than 4 bytes 2021-09-27 23:01:37 +08:00
JianBo He 094c8fc48b
Merge branch 'stable/e4.2.7' into dev/e4.2.8 2021-09-27 17:09:34 +08:00
Turtle e86e1e0430 fix(ekka): kill the process if don't release lock 2021-09-27 09:57:23 +08:00
JianBo He 34375c6cc6 chore(appup): update appup.src 2021-09-26 20:10:23 +08:00
JianBo He b9527cfe9d test: increase waiting time to avoid test failure 2021-09-26 20:10:23 +08:00
JianBo He 5f2adb42ed chore: export keepalive funcs for mqtt-sn 2021-09-26 20:10:23 +08:00
JianBo He 7b177a7929 fix: more safe session call 2021-09-26 20:10:23 +08:00
JianBo He aa5d274464 feat: acl.conf support ipaddrs 2021-09-26 20:10:23 +08:00
JianBo He fb65d1c581 chore: add mqtt-sn protocol define 2021-09-26 20:10:23 +08:00
Turtle c0ca7f8bea fix(force_shutdown): cannot suicide if the process hangs up 2021-09-26 18:37:11 +08:00
Zaiming (Stone) Shi 4c30c50490
Merge pull request #5799 from emqx/merge-e4.3.4
feat(mqueue): Interleave messages with different priorities
2021-09-24 11:52:00 +02:00
Turtle 903a9e57a8 feat(mqueue): Interleave messages with different priorities 2021-09-24 12:51:28 +08:00
Turtle 830150b309 chore: update ekka tag 2021-06-17 11:30:43 +08:00
Turtle e10e241fe9 chore: export function 2021-06-08 10:54:19 +08:00
Zaiming Shi eeb480c051 core(deps): pin ekka 0.7.7 2021-06-08 10:54:19 +08:00
William Yang ca9b90be9a perf(broker): speedup trans when broker has a big mqueue 2021-04-29 17:10:10 +08:00
William Yang e02704e05a perf(broker): Optimization for handling bursty traffic
intro. new lock type: 'spawn' of broker.perf.route_lock_type

mnesia get lock calls are not optimized for selective receive.

hence taking locks would be very expensive while there are tones of
messages in the brokers message queue.

This optimization run the transaction in a separate process to utilize
the selective receive optimization of the compiler.
2021-04-29 17:10:10 +08:00
William Yang 5cc21dc3f0 fix(upgrade): duplicated modules loading. 2021-04-28 17:01:54 +02:00
William Yang 806cd33d1f chore(ct): fix a testcase timing. 2021-04-28 17:01:54 +02:00
William Yang f8ca066d74 upgrade: safe upgrade of emqx_connection 2021-04-28 21:29:20 +08:00
William Yang 59b7e8e3ba perf(lock): upgrade code for new lock types. 2021-04-28 21:29:20 +08:00
William Yang 571219f073 perf: new perf toggle broker.perf.route_lock_type 2021-04-28 21:29:20 +08:00
William Yang 9084554f4f fix: broker call should not timeout before client timeout
So change broker call timeout to infinity.
2021-04-28 21:29:20 +08:00
William Yang 1b179c8e2a perf(router): add route runs in async dirty context 2021-04-28 21:29:20 +08:00
William Yang fb8f5b79ad perf(trie): use global lock
Use global lock to reduce remote lock overhead.

So that emqx route trans can run in dirty *sync* context.

At least 10X subscribe/unsubscribe improvments.
2021-04-28 21:29:20 +08:00
JianBo He e427b0ff75 chore(appup): supply appup instructions for ws-connetion 2021-04-28 21:29:20 +08:00
Zaiming Shi b4e3c32e24 chore(conf): log only to file by default 2021-04-28 21:29:20 +08:00
Zaiming Shi c511e324bb fix(ekka): allow remote_console 2021-04-28 21:29:20 +08:00
zhanghongtong 1eb0f7b3b2 fix(ws connection): fix peer_cert_as_username error when ws connect 2021-04-28 21:29:20 +08:00
Shawn 7d003e0bfc Porting code for congestion alarms from 4.3.0 to e4.2.6 (#4523)
* fix(congestion): port some code from 4.3.0
* chore(emqx): update the appup file
2021-04-28 21:29:20 +08:00
Shawn 927264d793 fix(emqx): export do_deliver/2 for emqx_sn 2021-04-28 21:29:20 +08:00
Shawn 70da59c3bb chore(emqx): update appup 2021-04-28 21:29:20 +08:00
Shawn c69e1c6222 fix(mqtt-sn): sleep mode not working #4434 2021-04-28 21:29:20 +08:00
Shawn 6666210211 fix(congestion): alarm and clear congestion too frequetly 2021-04-28 21:29:20 +08:00
Shawn 238eaa8e40 fix(emqx): validate mqtt malformed variable byte integer 2021-03-10 19:42:44 +08:00
28 changed files with 607 additions and 137 deletions

View File

@ -296,6 +296,16 @@ broker.shared_dispatch_ack_enabled = false
## Value: Flag
broker.route_batch_clean = off
## Performance toggle for subscribe/unsubscribe wildcard topic.
## Change this toggle only when there are many wildcard topics.
## Value: Enum
## - key: mnesia translational updates with per-key locks. recommended for single node setup.
## - tab: mnesia translational updates with table lock. recommended for multi-nodes setup.
## - global: global lock protected updates. recommended for larger cluster.
## NOTE: when changing from/to 'global' lock, it requires all nodes in the cluster
## to be stopped before the change.
broker.perf.route_lock_type = key
##-------------------------------------------------------------------
## Plugins
##-------------------------------------------------------------------

View File

@ -544,6 +544,16 @@ listener.ws.external.verify_protocol_header = on
## Value: Duration
## listener.ws.external.proxy_protocol_timeout = 3s
## See: listener.ssl.$name.peer_cert_as_username
##
## Value: cn
## listener.ws.external.peer_cert_as_username = cn
## See: listener.ssl.$name.peer_cert_as_clientid
##
## Value: cn
## listener.ws.external.peer_cert_as_clientid = cn
## The TCP backlog of external MQTT/WebSocket Listener.
##
## See: listener.ws.$name.backlog

View File

@ -10,7 +10,7 @@
## - file: write logs only to file
## - console: write logs only to standard I/O
## - both: write logs both to file and standard I/O
log.to = both
log.to = file
## The log severity level.
##
@ -167,4 +167,4 @@ log.rotation.count = 5
## Value: MaxBurstCount,TimeWindow
## Default: disabled
##
#log.burst_limit = 20000, 1s
#log.burst_limit = 20000, 1s

View File

@ -184,6 +184,30 @@ zone.external.enable_flapping_detect = off
## Example: 100KB incoming per 10 seconds.
#zone.external.rate_limit.conn_bytes_in = 100KB,10s
## Whether to alarm the congested connections.
##
## Sometimes the mqtt connection (usually an MQTT subscriber) may get "congested" because
## there're too many packets to sent. The socket trys to buffer the packets until the buffer is
## full. If more packets comes after that, the packets will be "pending" in a queue
## and we consider the connection is "congested".
##
## Enable this to send an alarm when there's any bytes pending in the queue. You could set
## the `listener.tcp.<ZoneName>.sndbuf` to a larger value if the alarm is triggered too often.
##
## The name of the alarm is of format "conn_congestion/<ClientID>/<Username>".
## Where the <ClientID> is the client-id of the congested MQTT connection.
## And the <Username> is the username or "unknown_user" of not provided by the client.
## Default: off
#zone.external.conn_congestion.alarm = off
## Won't clear the congested alarm in how long time.
## The alarm is cleared only when there're no pending bytes in the queue, and also it has been
## `min_alarm_sustain_duration` time since the last time we considered the connection is "congested".
##
## This is to avoid clearing and sending the alarm again too often.
## Default: 1m
#zone.external.conn_congestion.min_alarm_sustain_duration = 1m
## Messages quota for the each of external MQTT connection.
## This value consumed by the number of recipient on a message.
##

View File

@ -30,11 +30,13 @@
%% MQTT Protocol Version and Names
%%--------------------------------------------------------------------
-define(MQTT_SN_PROTO_V1, 1).
-define(MQTT_PROTO_V3, 3).
-define(MQTT_PROTO_V4, 4).
-define(MQTT_PROTO_V5, 5).
-define(PROTOCOL_NAMES, [
{?MQTT_SN_PROTO_V1, <<"MQTT-SN">>}, %% XXX:Compatible with emqx-sn plug-in
{?MQTT_PROTO_V3, <<"MQIsdp">>},
{?MQTT_PROTO_V4, <<"MQTT">>},
{?MQTT_PROTO_V5, <<"MQTT">>}]).

View File

@ -999,6 +999,16 @@ end}.
{datatype, string}
]}.
{mapping, "zone.$name.conn_congestion.alarm", "emqx.zones", [
{datatype, flag},
{default, off}
]}.
{mapping, "zone.$name.conn_congestion.min_alarm_sustain_duration", "emqx.zones", [
{default, "1m"},
{datatype, {duration, ms}}
]}.
{mapping, "zone.$name.quota.conn_messages_routing", "emqx.zones", [
{datatype, string}
]}.
@ -1128,6 +1138,10 @@ end}.
{ratelimit, {conn_messages_in, Ratelimit(Val)}};
(["rate_limit", "conn_bytes_in"], Val) ->
{ratelimit, {conn_bytes_in, Ratelimit(Val)}};
(["conn_congestion", "alarm"], Val) ->
{conn_congestion_alarm_enabled, Val};
(["conn_congestion", "min_alarm_sustain_duration"], Val) ->
{conn_congestion_min_alarm_sustain_duration, Val};
(["quota", "conn_messages_routing"], Val) ->
{quota, {conn_messages_routing, Ratelimit(Val)}};
(["quota", "overall_messages_routing"], Val) ->
@ -1573,7 +1587,11 @@ end}.
]}.
{mapping, "listener.ws.$name.peer_cert_as_username", "emqx.listeners", [
{datatype, {enum, [cn, dn, crt]}}
{datatype, {enum, [cn]}}
]}.
{mapping, "listener.ws.$name.peer_cert_as_clientid", "emqx.listeners", [
{datatype, {enum, [cn]}}
]}.
%%--------------------------------------------------------------------
@ -2008,6 +2026,18 @@ end}.
{datatype, flag}
]}.
%% @doc Performance toggle for subscribe/unsubscribe wildcard topic.
%% Change this toggle only when there are many wildcard topics.
%% key: mnesia translational updates with per-key locks. recommended for single node setup.
%% tab: mnesia translational updates with table lock. recommended for multi-nodes setup.
%% global: global lock protected updates. recommended for larger cluster.
%% NOTE: when changing from/to 'global' lock, it requires all nodes in the cluster
%%
{mapping, "broker.perf.route_lock_type", "emqx.route_lock_type", [
{default, key},
{datatype, {enum, [key, tab, global]}}
]}.
%%--------------------------------------------------------------------
%% System Monitor
%%--------------------------------------------------------------------

View File

@ -7,7 +7,7 @@
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.5"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.5"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.9"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}},
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
]}.

View File

@ -6,6 +6,7 @@
{add_module, emqx_congestion},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
{suspend, [esockd_acceptor,emqx_connection, emqx_ws_connection]},
@ -14,12 +15,23 @@
{update, emqx_ws_connection, {advanced, []}},
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
{resume, [esockd_acceptor,emqx_connection, emqx_ws_connection]}
{resume, [esockd_acceptor,emqx_connection, emqx_ws_connection]},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]},
{"4.2.1", [
{add_module, emqx_congestion},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
@ -27,15 +39,91 @@
{update, emqx_ws_connection, {advanced, []}},
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]}
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]},
{<<"4.2.[23]">>, [
{add_module, emqx_congestion},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []}
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]},
{<<"4.2.4">>, [
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]},
{<<"4.2.5">>, [
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module, emqx_frame, brutal_purge, soft_purge, []}
]},
{<<"4.2.[6-7]">>, [
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module, emqx_frame, brutal_purge, soft_purge, []}
]},
{<<"4.2.6">>, [
{load_module, emqx_channel, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
],
@ -51,8 +139,19 @@
{update, emqx_connection, {advanced, []}},
{update, emqx_ws_connection, {advanced, []}},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{delete_module, emqx_congestion}
{delete_module, emqx_congestion},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]},
{"4.2.1", [
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
@ -60,20 +159,94 @@
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{update, emqx_ws_connection, {advanced, []}},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{delete_module, emqx_congestion}
{delete_module, emqx_congestion},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]},
{<<"4.2.[23]">>, [
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{delete_module, emqx_congestion}
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{delete_module, emqx_congestion},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]},
{<<"4.2.4">>, [
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]},
{<<"4.2.5">>, [
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module, emqx_frame, brutal_purge, soft_purge, []}
]},
{<<"4.2.[6-7]">>, [
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module, emqx_frame, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
]

View File

@ -28,7 +28,8 @@
-type(who() :: all | binary() |
{client, binary()} |
{user, binary()} |
{ipaddr, esockd_cidr:cidr_string()}).
{ipaddr, esockd_cidr:cidr_string()} |
{ipaddrs, list(esockd_cidr:cidr_string())}).
-type(access() :: subscribe | publish | pubsub).
@ -52,6 +53,8 @@ compile(who, all) ->
all;
compile(who, {ipaddr, CIDR}) ->
{ipaddr, esockd_cidr:parse(CIDR, true)};
compile(who, {ipaddrs, CIDRs}) ->
{ipaddrs, lists:map(fun(CIDR) -> esockd_cidr:parse(CIDR, true) end, CIDRs)};
compile(who, {client, all}) ->
{client, all};
compile(who, {client, ClientId}) ->
@ -108,8 +111,14 @@ match_who(#{username := Username}, {user, Username}) ->
true;
match_who(#{peerhost := undefined}, {ipaddr, _Tup}) ->
false;
match_who(#{peerhost := undefined}, {ipaddrs, _}) ->
false;
match_who(#{peerhost := IP}, {ipaddr, CIDR}) ->
esockd_cidr:match(IP, CIDR);
match_who(#{peerhost := IP}, {ipaddrs, CIDRs}) ->
lists:any(fun(CIDR) ->
esockd_cidr:match(IP, CIDR)
end, CIDRs);
match_who(ClientInfo, {'and', Conds}) when is_list(Conds) ->
lists:foldl(fun(Who, Allow) ->
match_who(ClientInfo, Who) andalso Allow

View File

@ -361,7 +361,7 @@ normalize_message(partition, #{occurred := Node}) ->
list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID]));
normalize_message(<<"mqtt_conn/congested/", Info/binary>>, _) ->
list_to_binary(io_lib:format("MQTT connection congested: ~s", [Info]));
normalize_message(<<"conn_congestion/", Info/binary>>, _) ->
list_to_binary(io_lib:format("connection congested: ~s", [Info]));
normalize_message(_Name, _UnknownDetails) ->
<<"Unknown alarm">>.

View File

@ -56,20 +56,20 @@ init({_Args, {alarm_handler, _ExistingAlarms}}) ->
init(_) ->
{ok, []}.
handle_event({set_alarm, {system_memory_high_watermark, []}}, State) ->
handle_event({set_alarm, {system_memory_high_watermark, []}}, State) ->
emqx_alarm:activate(high_system_memory_usage, #{high_watermark => emqx_os_mon:get_sysmem_high_watermark()}),
{ok, State};
handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) ->
emqx_alarm:activate(high_process_memory_usage, #{pid => Pid,
handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) ->
emqx_alarm:activate(high_process_memory_usage, #{pid => list_to_binary(pid_to_list(Pid)),
high_watermark => emqx_os_mon:get_procmem_high_watermark()}),
{ok, State};
handle_event({clear_alarm, system_memory_high_watermark}, State) ->
handle_event({clear_alarm, system_memory_high_watermark}, State) ->
emqx_alarm:deactivate(high_system_memory_usage),
{ok, State};
handle_event({clear_alarm, process_memory_high_watermark}, State) ->
handle_event({clear_alarm, process_memory_high_watermark}, State) ->
emqx_alarm:deactivate(high_process_memory_usage),
{ok, State};
@ -85,4 +85,4 @@ handle_call(_Query, State) ->
terminate(swap, _State) ->
{emqx_alarm_handler, []};
terminate(_, _) ->
ok.
ok.

View File

@ -419,7 +419,7 @@ safe_update_stats(Tab, Stat, MaxStat) ->
-compile({inline, [call/2, cast/2, pick/1]}).
call(Broker, Req) ->
gen_server:call(Broker, Req).
gen_server:call(Broker, Req, infinity).
cast(Broker, Msg) ->
gen_server:cast(Broker, Msg).

View File

@ -32,6 +32,8 @@
-export([ info/1
, info/2
, set_conn_state/2
, get_session/1
, set_session/2
, stats/1
, caps/1
]).
@ -46,6 +48,12 @@
, terminate/2
]).
%% Export for emqx_sn
-export([ do_deliver/2
, ensure_keepalive/2
, clear_keepalive/1
]).
%% Exports for CT
-export([set_field/3]).
@ -167,6 +175,12 @@ info(timers, #channel{timers = Timers}) -> Timers.
set_conn_state(ConnState, Channel) ->
Channel#channel{conn_state = ConnState}.
get_session(#channel{session = Session}) ->
Session.
set_session(Session, Channel) ->
Channel#channel{session = Session}.
%% TODO: Add more stats.
-spec(stats(channel()) -> emqx_types:stats()).
stats(#channel{session = Session})->
@ -1501,6 +1515,15 @@ ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}
Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)),
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
clear_keepalive(Channel = #channel{timers = Timers}) ->
case maps:get(alive_timer, Timers, undefined) of
undefined ->
Channel;
TRef ->
emqx_misc:cancel_timer(TRef),
Channel#channel{timers = maps:without([alive_timer], Timers)}
end.
%%--------------------------------------------------------------------
%% Maybe Resume Session

View File

@ -88,6 +88,8 @@
%% Batch drain
-define(BATCH_SIZE, 100000).
-define(T_TAKEOVER, 15000).
%% Server name
-define(CM, ?MODULE).
@ -222,7 +224,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
case takeover_session(ClientId) of
{ok, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session),
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}),
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER),
register_channel_(ClientId, Self, ConnInfo),
{ok, #{session => Session,
present => true,
@ -264,7 +266,7 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
undefined ->
{error, not_found};
ConnMod when is_atom(ConnMod) ->
Session = ConnMod:call(ChanPid, {takeover, 'begin'}),
Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
{ok, ConnMod, ChanPid, Session}
end;
@ -277,24 +279,35 @@ discard_session(ClientId) when is_binary(ClientId) ->
case lookup_channels(ClientId) of
[] -> ok;
ChanPids ->
lists:foreach(
fun(ChanPid) ->
try
discard_session(ClientId, ChanPid)
catch
_:{noproc,_}:_Stk -> ok;
_:{{shutdown,_},_}:_Stk -> ok;
_:Error:_Stk ->
?LOG(error, "Failed to discard ~0p: ~0p", [ChanPid, Error])
end
end, ChanPids)
lists:foreach(fun(Pid) -> do_discard_session(ClientId, Pid) end, ChanPids)
end.
do_discard_session(ClientId, Pid) ->
try
discard_session(ClientId, Pid)
catch
_ : noproc -> % emqx_ws_connection: call
?LOG(debug, "session_already_gone: ~p", [Pid]),
ok;
_ : {noproc, _} -> % emqx_connection: gen_server:call
?LOG(debug, "session_already_gone: ~p", [Pid]),
ok;
_ : {'EXIT', {noproc, _}} -> % rpc_call/3
?LOG(debug, "session_already_gone: ~p", [Pid]),
ok;
_ : {{shutdown, _}, _} ->
?LOG(debug, "session_already_shutdown: ~p", [Pid]),
ok;
_ : Error : St ->
?LOG(debug, "failed_to_discard_session: ~p, "
"error: ~p, stacktrace: ~0p", [Pid, Error, St])
end.
discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chann_conn_mod(ClientId, ChanPid) of
undefined -> ok;
ConnMod when is_atom(ConnMod) ->
ConnMod:call(ChanPid, discard)
ConnMod:call(ChanPid, discard, ?T_TAKEOVER)
end;
discard_session(ClientId, ChanPid) ->
@ -317,7 +330,7 @@ kick_session(ClientId) ->
kick_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chan_info(ClientId, ChanPid) of
#{conninfo := #{conn_mod := ConnMod}} ->
ConnMod:call(ChanPid, kick);
ConnMod:call(ChanPid, kick, ?T_TAKEOVER);
undefined ->
{error, not_found}
end;
@ -361,7 +374,7 @@ lookup_channels(local, ClientId) ->
%% @private
rpc_call(Node, Fun, Args) ->
case rpc:call(Node, ?MODULE, Fun, Args) of
case rpc:call(Node, ?MODULE, Fun, Args, 2 * ?T_TAKEOVER) of
{badrpc, Reason} -> error(Reason);
Res -> Res
end.

View File

@ -16,17 +16,16 @@
-module(emqx_congestion).
-export([ maybe_alarm_port_busy/3
, maybe_alarm_port_busy/4
, maybe_alarm_too_many_publish/5
, maybe_alarm_too_many_publish/6
-export([ maybe_alarm_conn_congestion/3
, cancel_alarms/3
]).
-define(ALARM_CONN_CONGEST(Channel, Reason),
list_to_binary(io_lib:format("mqtt_conn/congested/~s/~s/~s", [emqx_channel:info(clientid, Channel),
maps:get(username, emqx_channel:info(clientinfo, Channel), <<"undefined">>),
Reason]))).
list_to_binary(
io_lib:format("~s/~s/~s",
[Reason, emqx_channel:info(clientid, Channel),
maps:get(username, emqx_channel:info(clientinfo, Channel),
<<"unknown_user">>)]))).
-define(ALARM_CONN_INFO_KEYS, [
socktype, sockname, peername, clientid, username, proto_name, proto_ver,
@ -36,44 +35,28 @@
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
-define(PROC_INFO_KEYS, [message_queue_len, memory, reductions]).
-define(ALARM_SENT(REASON), {alarm_sent, REASON}).
-define(ALL_ALARM_REASONS, [port_busy, too_many_publish]).
-define(CONFIRM_CLEAR(REASON), {alarm_confirm_clear, REASON}).
-define(CONFIRM_CLEAR_INTERVAL, 10000).
-define(ALL_ALARM_REASONS, [conn_congestion]).
-define(WONT_CLEAR_IN, 60000).
maybe_alarm_port_busy(Socket, Transport, Channel) ->
maybe_alarm_port_busy(Socket, Transport, Channel, false).
maybe_alarm_port_busy(Socket, Transport, Channel, ForceClear) ->
case is_tcp_congested(Socket, Transport) of
true -> alarm_congestion(Socket, Transport, Channel, port_busy);
false -> cancel_alarm_congestion(Socket, Transport, Channel, port_busy,
ForceClear)
maybe_alarm_conn_congestion(Socket, Transport, Channel) ->
case is_alarm_enabled(Channel) of
false -> ok;
true ->
case is_tcp_congested(Socket, Transport) of
true -> alarm_congestion(Socket, Transport, Channel, conn_congestion);
false -> cancel_alarm_congestion(Socket, Transport, Channel, conn_congestion)
end
end.
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
MaxBatchSize) ->
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
MaxBatchSize, false).
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
PubMsgCount = _MaxBatchSize, _ForceClear) ->
%% we only alarm it when the process is "too busy"
alarm_congestion(Socket, Transport, Channel, too_many_publish);
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
_MaxBatchSize, ForceClear) when PubMsgCount == 0 ->
%% but we clear the alarm until it is really "idle", to avoid sending
%% alarms and clears too frequently
cancel_alarm_congestion(Socket, Transport, Channel, too_many_publish,
ForceClear);
maybe_alarm_too_many_publish(_Socket, _Transport, _Channel, _PubMsgCount,
_MaxBatchSize, _ForceClear) ->
ok.
cancel_alarms(Socket, Transport, Channel) ->
lists:foreach(fun(Reason) ->
do_cancel_alarm_congestion(Socket, Transport, Channel, Reason)
end, ?ALL_ALARM_REASONS).
is_alarm_enabled(Channel) ->
emqx_zone:get_env(emqx_channel:info(zone, Channel),
conn_congestion_alarm_enabled, false).
alarm_congestion(Socket, Transport, Channel, Reason) ->
case has_alarm_sent(Reason) of
false -> do_alarm_congestion(Socket, Transport, Channel, Reason);
@ -82,8 +65,11 @@ alarm_congestion(Socket, Transport, Channel, Reason) ->
update_alarm_sent_at(Reason)
end.
cancel_alarm_congestion(Socket, Transport, Channel, Reason, ForceClear) ->
case is_alarm_allowed_clear(Reason, ForceClear) of
cancel_alarm_congestion(Socket, Transport, Channel, Reason) ->
Zone = emqx_channel:info(zone, Channel),
WontClearIn = emqx_zone:get_env(Zone, conn_congestion_min_alarm_sustain_duration,
?WONT_CLEAR_IN),
case has_alarm_sent(Reason) andalso long_time_since_last_alarm(Reason, WontClearIn) of
true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason);
false -> ok
end.
@ -123,13 +109,11 @@ get_alarm_sent_at(Reason) ->
LastSentAt -> LastSentAt
end.
is_alarm_allowed_clear(Reason, _ForceClear = true) ->
has_alarm_sent(Reason);
is_alarm_allowed_clear(Reason, _ForceClear = false) ->
long_time_since_last_alarm(Reason, WontClearIn) ->
%% only sent clears when the alarm was not triggered in the last
%% ?CONFIRM_CLEAR_INTERVAL time
%% WontClearIn time
case timenow() - get_alarm_sent_at(Reason) of
Elapse when Elapse >= ?CONFIRM_CLEAR_INTERVAL -> true;
Elapse when Elapse >= WontClearIn -> true;
_ -> false
end.

View File

@ -38,7 +38,7 @@
, stats/1
]).
-export([call/2]).
-export([call/2, call/3]).
%% Callback
-export([init/4]).
@ -168,7 +168,9 @@ stats(#state{transport = Transport,
lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
call(Pid, Req) ->
gen_server:call(Pid, Req, infinity).
call(Pid, Req, infinity).
call(Pid, Req, Timeout) ->
gen_server:call(Pid, Req, Timeout).
stop(Pid) ->
gen_server:stop(Pid).
@ -374,12 +376,8 @@ handle_msg({Passive, _Sock}, State)
handle_info(activate_socket, NState1);
handle_msg(Deliver = {deliver, _Topic, _Msg},
#state{active_n = MaxBatchSize, transport = Transport,
socket = Socket, channel = Channel} = State) ->
Delivers0 = emqx_misc:drain_deliver(MaxBatchSize),
emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel,
length(Delivers0), MaxBatchSize),
Delivers = [Deliver|Delivers0],
#state{active_n = ActiveN} = State) ->
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State);
%% Something sent
@ -548,12 +546,9 @@ handle_timeout(_TRef, limit_timeout, State) ->
},
handle_info(activate_socket, NState);
handle_timeout(_TRef, emit_stats, State = #state{active_n = MaxBatchSize,
channel = Channel, transport = Transport, socket = Socket}) ->
{_, MsgQLen} = erlang:process_info(self(), message_queue_len),
emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel, true),
emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel,
MsgQLen, MaxBatchSize, true),
handle_timeout(_TRef, emit_stats, State = #state{channel = Channel, transport = Transport,
socket = Socket}) ->
emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:set_chan_stats(ClientId, stats(State)),
{ok, State#state{stats_timer = undefined}};
@ -666,7 +661,7 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel})
Oct = iolist_size(IoData),
ok = emqx_metrics:inc('bytes.sent', Oct),
emqx_pd:inc_counter(outgoing_bytes, Oct),
emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel),
emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
case Transport:async_send(Socket, IoData, [nosuspend]) of
ok -> ok;
Error = {error, _Reason} ->

View File

@ -67,6 +67,8 @@
version => ?MQTT_PROTO_V4
}).
-define(MULTIPLIER_MAX, 16#200000).
-dialyzer({no_match, [serialize_utf8_string/2]}).
%%--------------------------------------------------------------------
@ -141,6 +143,9 @@ parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) ->
%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK...
parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
parse_frame(Rest, Header, 2, Options);
parse_remaining_len(<<1:1, _Len:7, _Rest/binary>>, _Header, Multiplier, _Value, _Options)
when Multiplier > ?MULTIPLIER_MAX ->
error(malformed_variable_byte_integer);
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) ->
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options);
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value,
@ -408,6 +413,9 @@ parse_property(<<16#2A, Val, Bin/binary>>, Props) ->
parse_variable_byte_integer(Bin) ->
parse_variable_byte_integer(Bin, 1, 0).
parse_variable_byte_integer(<<1:1, _Len:7, _Rest/binary>>, Multiplier, _Value)
when Multiplier > ?MULTIPLIER_MAX ->
error(malformed_variable_byte_integer);
parse_variable_byte_integer(<<1:1, Len:7, Rest/binary>>, Multiplier, Value) ->
parse_variable_byte_integer(Rest, Multiplier * ?HIGHBIT, Value + Len * Multiplier);
parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) ->

View File

@ -45,6 +45,8 @@
, index_of/2
]).
-define(OOM_FACTOR, 1.25).
%% @doc Merge options
-spec(merge_opts(Opts, Opts) -> Opts when Opts :: proplists:proplist()).
merge_opts(Defaults, Options) ->
@ -185,8 +187,8 @@ do_check_oom([{Val, Max, Reason}|Rest]) ->
tune_heap_size(#{max_heap_size := MaxHeapSize}) ->
%% If set to zero, the limit is disabled.
erlang:process_flag(max_heap_size, #{size => MaxHeapSize,
kill => false,
erlang:process_flag(max_heap_size, #{size => must_kill_heap_size(MaxHeapSize),
kill => true,
error_logger => true
});
tune_heap_size(undefined) -> ok.
@ -233,3 +235,19 @@ index_of(E, I, [E|_]) ->
index_of(E, I, [_|L]) ->
index_of(E, I+1, L).
must_kill_heap_size(Size) ->
%% We set the max allowed heap size by `erlang:process_flag(max_heap_size, #{size => Size})`,
%% where the `Size` cannot be set to an integer lager than `(1 bsl 59) - 1` on a 64-bit system,
%% or `(1 bsl 27) - 1` on a 32-bit system.
MaxAllowedSize = case erlang:system_info(wordsize) of
8 -> % arch_64
(1 bsl 59) - 1;
4 -> % arch_32
(1 bsl 27) - 1
end,
%% We multiply the size with factor ?OOM_FACTOR, to give the
%% process a chance to suicide by `check_oom/1`
case ceil(Size * ?OOM_FACTOR) of
Size0 when Size0 >= MaxAllowedSize -> MaxAllowedSize;
Size0 -> Size0
end.

View File

@ -67,6 +67,9 @@
, dropped/1
]).
-export([ live_upgrade/1
]).
-export_type([mqueue/0, options/0]).
-type(topic() :: emqx_topic:topic()).
@ -91,6 +94,11 @@
-define(MAX_LEN_INFINITY, 0).
-define(INFO_KEYS, [store_qos0, max_len, len, dropped]).
-record(shift_opts, {
multiplier :: non_neg_integer(),
base :: integer()
}).
-record(mqueue, {
store_qos0 = false :: boolean(),
max_len = ?MAX_LEN_INFINITY :: count(),
@ -98,11 +106,16 @@
dropped = 0 :: count(),
p_table = ?NO_PRIORITY_TABLE :: p_table(),
default_p = ?LOWEST_PRIORITY :: priority(),
q = ?PQUEUE:new() :: pq()
q = ?PQUEUE:new() :: pq(),
shift_opts :: #shift_opts{},
last_p :: non_neg_integer() | undefined,
counter :: non_neg_integer() | undefined
}).
-type(mqueue() :: #mqueue{}).
-define(OLD(Q), Q = {mqueue, _, _, _, _, _, _, _}).
-spec(init(options()) -> mqueue()).
init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of
@ -112,7 +125,8 @@ init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
#mqueue{max_len = MaxLen,
store_qos0 = QoS_0,
p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE),
default_p = get_priority_opt(Opts)
default_p = get_priority_opt(Opts),
shift_opts = get_shift_opt(Opts)
}.
-spec(info(mqueue()) -> emqx_types:infos()).
@ -127,22 +141,30 @@ info(max_len, #mqueue{max_len = MaxLen}) ->
info(len, #mqueue{len = Len}) ->
Len;
info(dropped, #mqueue{dropped = Dropped}) ->
Dropped.
Dropped;
info(Info, ?OLD(MQ)) ->
info(Info, live_upgrade(MQ)).
is_empty(#mqueue{len = Len}) -> Len =:= 0.
is_empty(#mqueue{len = Len}) -> Len =:= 0;
is_empty(?OLD(MQ)) -> is_empty(live_upgrade(MQ)).
len(#mqueue{len = Len}) -> Len.
len(#mqueue{len = Len}) -> Len;
len(?OLD(MQ)) -> len(live_upgrade(MQ)).
max_len(#mqueue{max_len = MaxLen}) -> MaxLen.
max_len(#mqueue{max_len = MaxLen}) -> MaxLen;
max_len(?OLD(MQ)) -> max_len(live_upgrade(MQ)).
%% @doc Return number of dropped messages.
-spec(dropped(mqueue()) -> count()).
dropped(#mqueue{dropped = Dropped}) -> Dropped.
dropped(#mqueue{dropped = Dropped}) -> Dropped;
dropped(?OLD(MQ)) -> dropped(live_upgrade(MQ)).
%% @doc Stats of the mqueue
-spec(stats(mqueue()) -> [stat()]).
stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) ->
[{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}].
[{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}];
stats(?OLD(MQ)) ->
stats(live_upgrade(MQ)).
%% @doc Enqueue a message.
-spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}).
@ -165,15 +187,34 @@ in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp,
{DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}};
false ->
{_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}}
end.
end;
in(Msg, ?OLD(MQ)) ->
in(Msg, live_upgrade(MQ)).
-spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}).
out(MQ = #mqueue{len = 0, q = Q}) ->
0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap
{empty, MQ};
out(MQ = #mqueue{q = Q, len = Len}) ->
out(MQ = #mqueue{q = Q, len = Len, last_p = undefined, shift_opts = ShiftOpts}) ->
{{value, Val, Prio}, Q1} = ?PQUEUE:out_p(Q), %% Shouldn't fail, since we've checked the length
MQ1 = MQ#mqueue{
q = Q1,
len = Len - 1,
last_p = Prio,
counter = init_counter(Prio, ShiftOpts)
},
{{value, Val}, MQ1};
out(MQ = #mqueue{q = Q, counter = 0}) ->
MQ1 = MQ#mqueue{
q = ?PQUEUE:shift(Q),
last_p = undefined
},
out(MQ1);
out(MQ = #mqueue{q = Q, len = Len, counter = Cnt}) ->
{R, Q1} = ?PQUEUE:out(Q),
{R, MQ#mqueue{q = Q1, len = Len - 1}}.
{R, MQ#mqueue{q = Q1, len = Len - 1, counter = Cnt - 1}};
out(?OLD(MQ)) ->
out(live_upgrade(MQ)).
get_opt(Key, Opts, Default) ->
case maps:get(Key, Opts, Default) of
@ -194,3 +235,46 @@ get_priority_opt(Opts) ->
%% while the highest 'infinity' is a [{infinity, queue:queue()}]
get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY;
get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp).
init_counter(?HIGHEST_PRIORITY, Opts) ->
Infinity = 1000000,
init_counter(Infinity, Opts);
init_counter(Prio, #shift_opts{multiplier = Mult, base = Base}) ->
(Prio + Base) * Mult.
get_shift_opt(Opts) ->
Mult = maps:get(shift_multiplier, Opts, 10),
Min = case Opts of
#{p_table := PTab} ->
case maps:size(PTab) of
0 -> 0;
_ -> lists:min(maps:values(PTab))
end;
_ ->
?LOWEST_PRIORITY
end,
Base = case Min < 0 of
true -> -Min;
false -> 0
end,
#shift_opts{
multiplier = Mult,
base = Base
}.
live_upgrade({mqueue, StoreQos0, MaxLen, Len, Dropped, PTable, DefaultP, Q}) ->
ShiftOpts = case is_map(PTable) of
true -> get_shift_opt(#{p_table => PTable});
false -> get_shift_opt(#{})
end,
#mqueue{ store_qos0 = StoreQos0
, max_len = MaxLen
, dropped = Dropped
, p_table = PTable
, default_p = DefaultP
, len = Len
, q = Q
, shift_opts = ShiftOpts
, last_p = undefined
, counter = undefined
}.

View File

@ -55,6 +55,7 @@
, filter/2
, fold/3
, highest/1
, shift/1
]).
-export_type([q/0]).
@ -170,6 +171,14 @@ out({pqueue, [{P, Q} | Queues]}) ->
end,
{R, NewQ}.
-spec(shift(pqueue()) -> pqueue()).
shift(Q = {queue, _, _, _}) ->
Q;
shift({pqueue, []}) ->
{pqueue, []}; %% Shouldn't happen?
shift({pqueue, [Hd|Rest]}) ->
{pqueue, Rest ++ [Hd]}. %% Let's hope there are not many priorities.
-spec(out_p(pqueue()) -> {empty | {value, any(), priority()}, pqueue()}).
out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0);
out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)).
@ -266,4 +275,3 @@ r2f([X,Y|R], L) -> {queue, [X,Y], lists:reverse(R, []), L}.
maybe_negate_priority(infinity) -> infinity;
maybe_negate_priority(P) -> -P.

View File

@ -118,7 +118,8 @@ do_add_route(Topic, Dest) when is_binary(Topic) ->
false ->
ok = emqx_router_helper:monitor(Dest),
case emqx_topic:wildcard(Topic) of
true -> trans(fun insert_trie_route/1, [Route]);
true ->
maybe_trans(fun insert_trie_route/1, [Route]);
false -> insert_direct_route(Route)
end
end.
@ -164,7 +165,8 @@ do_delete_route(Topic) when is_binary(Topic) ->
do_delete_route(Topic, Dest) ->
Route = #route{topic = Topic, dest = Dest},
case emqx_topic:wildcard(Topic) of
true -> trans(fun delete_trie_route/1, [Route]);
true ->
maybe_trans(fun delete_trie_route/1, [Route]);
false -> delete_direct_route(Route)
end.
@ -247,10 +249,59 @@ delete_trie_route(Route = #route{topic = Topic}) ->
end.
%% @private
-spec(trans(function(), list(any())) -> ok | {error, term()}).
trans(Fun, Args) ->
case mnesia:transaction(Fun, Args) of
{atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason}
-spec(maybe_trans(function(), list(any())) -> ok | {error, term()}).
maybe_trans(Fun, Args) ->
case persistent_term:get(emqx_route_lock_type, key) of
key ->
trans(Fun, Args);
global ->
lock_router(),
try mnesia:sync_dirty(Fun, Args)
after
unlock_router()
end;
tab ->
trans(fun() ->
emqx_trie:lock_tables(),
apply(Fun, Args)
end, [])
end.
-spec(trans(function(), list(any())) -> ok | {error, term()}).
trans(Fun, Args) ->
%% trigger selective receive optimization of compiler,
%% ideal for handling bursty traffic.
Ref = erlang:make_ref(),
Owner = self(),
{WPid, RefMon} = spawn_monitor(
fun() ->
Res = case mnesia:transaction(Fun, Args) of
{atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason}
end,
Owner ! {Ref, Res}
end),
receive
{Ref, TransRes} ->
receive
{'DOWN', RefMon, process, WPid, normal} -> ok
end,
TransRes;
{'DOWN', RefMon, process, WPid, Info} ->
{error, {trans_crash, Info}}
end.
lock_router() ->
%% if Retry is not 0, global:set_lock could sleep a random time up to 8s.
%% Considering we have a limited number of brokers, it is safe to use sleep 1 ms.
case global:set_lock({?MODULE, self()}, [node() | nodes()], 0) of
false ->
%% Force to sleep 1ms instead.
timer:sleep(1),
lock_router();
true ->
ok
end.
unlock_router() ->
global:del_lock({?MODULE, self()}).

View File

@ -34,6 +34,10 @@ init([]) ->
type => worker,
modules => [emqx_router_helper]},
ok = persistent_term:put(emqx_route_lock_type,
application:get_env(emqx, route_lock_type, key)
),
%% Router pool
RouterPool = emqx_pool_sup:spec([router_pool, hash,
{emqx_router, start_link, []}]),

View File

@ -75,6 +75,7 @@
-export([ deliver/2
, enqueue/2
, dequeue/1
, retry/1
, terminate/3
]).

View File

@ -31,7 +31,9 @@
, delete/1
]).
-export([empty/0]).
-export([ empty/0
, lock_tables/0
]).
-ifdef(TEST).
-compile(export_all).
@ -120,6 +122,11 @@ delete(Topic) when is_binary(Topic) ->
empty() ->
ets:info(?TRIE_TAB, size) == 0.
-spec lock_tables() -> ok.
lock_tables() ->
mnesia:write_lock_table(?TRIE_TAB),
mnesia:write_lock_table(?TRIE_NODE_TAB).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -34,7 +34,7 @@
, stats/1
]).
-export([call/2]).
-export([call/2, call/3]).
%% WebSocket callbacks
-export([ init/2
@ -151,7 +151,10 @@ stats(#state{channel = Channel}) ->
%% kick|discard|takeover
-spec(call(pid(), Req :: term()) -> Reply :: term()).
call(WsPid, Req) when is_pid(WsPid) ->
call(WsPid, Req) ->
call(WsPid, Req, 5000).
call(WsPid, Req, Timeout) when is_pid(WsPid) ->
Mref = erlang:monitor(process, WsPid),
WsPid ! {call, {self(), Mref}, Req},
receive
@ -160,7 +163,7 @@ call(WsPid, Req) when is_pid(WsPid) ->
Reply;
{'DOWN', Mref, _, _, Reason} ->
exit(Reason)
after 5000 ->
after Timeout ->
erlang:demonitor(Mref, [flush]),
exit(timeout)
end.
@ -196,15 +199,21 @@ init(Req, Opts) ->
end.
websocket_init([Req, Opts]) ->
Peername = case proplists:get_bool(proxy_protocol, Opts)
andalso maps:get(proxy_header, Req) of
#{src_address := SrcAddr, src_port := SrcPort} ->
{SrcAddr, SrcPort};
_ ->
cowboy_req:peer(Req)
end,
{Peername, Peercert} =
case proplists:get_bool(proxy_protocol, Opts)
andalso maps:get(proxy_header, Req) of
#{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} ->
ProxyName = {SrcAddr, SrcPort},
%% Notice: Only CN is available in Proxy Protocol V2 additional info
ProxySSL = case maps:get(cn, SSL, undefined) of
undeined -> nossl;
CN -> [{pp2_ssl_cn, CN}]
end,
{ProxyName, ProxySSL};
_ ->
{cowboy_req:peer(Req), cowboy_req:cert(Req)}
end,
Sockname = cowboy_req:sock(Req),
Peercert = cowboy_req:cert(Req),
WsCookie = try cowboy_req:parse_cookies(Req)
catch
error:badarg ->

View File

@ -77,7 +77,7 @@ t_get_set_chan_stats(_) ->
t_open_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end),
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end),
ClientInfo = #{zone => external,
clientid => <<"clientid">>,
@ -153,14 +153,14 @@ t_open_session_race_condition(_) ->
t_discard_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end),
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = emqx_cm:unregister_channel(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = meck:expect(emqx_connection, call, fun(_, _) -> error(testing) end),
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> error(testing) end),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = emqx_cm:unregister_channel(<<"clientid">>),
ok = meck:unload(emqx_connection).
@ -180,7 +180,7 @@ t_takeover_session(_) ->
t_kick_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> test end),
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> test end),
{error, not_found} = emqx_cm:kick_session(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
test = emqx_cm:kick_session(<<"clientid">>),

View File

@ -42,7 +42,8 @@ all() ->
groups() ->
[{parse, [parallel],
[t_parse_cont,
t_parse_frame_too_large
t_parse_frame_too_large,
t_parse_frame_malformed_variable_byte_integer
]},
{connect, [parallel],
[t_serialize_parse_v3_connect,
@ -129,6 +130,12 @@ t_parse_frame_too_large(_) ->
?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 512})),
?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})).
t_parse_frame_malformed_variable_byte_integer(_) ->
MalformedPayload = << <<16#80>> || _ <- lists:seq(1, 4) >>,
ParseState = emqx_frame:initial_parse_state(#{}),
?catch_error(malformed_variable_byte_integer,
emqx_frame:parse(MalformedPayload, ParseState)).
t_serialize_parse_v3_connect(_) ->
Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,
113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108,

View File

@ -270,7 +270,7 @@ t_connect_limit_timeout(_) ->
meck:unload(proplists).
t_connect_emit_stats_timeout(_) ->
IdleTimeout = 2000,
IdleTimeout = 2000 + 200,
emqx_zone:set_env(external, idle_timeout, IdleTimeout),
{ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60}]),
@ -278,7 +278,7 @@ t_connect_emit_stats_timeout(_) ->
[ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)),
?assert(is_reference(emqx_connection:info(stats_timer, sys:get_state(ClientPid)))),
timer:sleep(IdleTimeout),
timer:sleep(IdleTimeout+500),
?assertEqual(undefined, emqx_connection:info(stats_timer, sys:get_state(ClientPid))),
ok = emqtt:disconnect(Client).