Compare commits

...

63 Commits

Author SHA1 Message Date
JimMoen d398c5b943
Merge pull request #6861 from JimMoen/fix-vm-mem-calc
fix(vm): memory calc add `literal_alloc`
2022-01-26 14:13:38 +08:00
JimMoen 67a1c463e7 chore(appup): update appup.src for `emqx_vm` 2022-01-26 09:33:52 +08:00
JimMoen e80c15f2e3 fix(vm): memory calc add `literal_alloc` 2022-01-26 00:00:05 +08:00
tigercl 8cdfb531a7
Merge pull request #6698 from emqx/dev/e4.2.10
Auto-pull-request-on-2022-01-11
2022-01-13 10:37:18 +08:00
lafirest cb6bb9ede5
Merge pull request #6715 from lafirest/fix/appup
fix(emqx_appup): add emqx_limiter to all upgradable version
2022-01-12 17:00:50 +08:00
lafirest a3095fa91e fix(emqx_appup): add emqx_limiter to all upgradable version 2022-01-12 16:56:27 +08:00
lafirest 348e6b8f10
Merge pull request #6714 from lafirest/fix/quota_limiter
feat(emqx_limiter): add support for update overall limiter
2022-01-12 16:24:07 +08:00
lafirest 9c84bb5e87 chore: update the emqx appup 2022-01-12 15:54:17 +08:00
lafirest 3e69124ca0 feat(emqx_limiter): add support for update overall limiter 2022-01-12 15:48:58 +08:00
JimMoen 02b8b0ec08
Merge pull request #6674 from emqx/proxy-protocol-frame-log
proxy protocol frame log
revert `zone.external.max_topic_levels`
2022-01-07 15:23:42 +08:00
JimMoen cf3354d30d revert(zones_conf): zone.external.max_topic_levels 2022-01-07 15:08:20 +08:00
JimMoen 80608e9c99 chore(appup): update appup.src 2022-01-07 15:08:20 +08:00
JimMoen 9969fd0d18 feat(frame): better log for proxy_protocol config disabled 2022-01-07 15:08:20 +08:00
JimMoen 93ec2ef995
Merge pull request #6673 from emqx/fix-appup
chore(appup): fix syntax error
2022-01-07 11:33:20 +08:00
JimMoen dc6bc76512 chore(appup): fix syntax error 2022-01-07 11:27:56 +08:00
lafirest f09bf74c99
Merge pull request #6471 from lafirest/fix/quota_limiter
fix(emqx_limiter): update the overall limiter when config updating
2021-12-23 17:19:49 +08:00
lafirest 0f1b14f865 fix(emqx_limiter): update the overall limiter when config updating 2021-12-17 14:37:54 +08:00
JimMoen e57c30a0b9 fix(conf): change `max_topic_levels` default configuration 2021-11-25 20:39:12 +08:00
tigercl 47b2642423
Merge pull request #6145 from emqx/dev/e4.2.9
Auto-pull-request-on-2021-11-12
2021-11-12 14:29:17 +08:00
tigercl a0a44eecb5
Merge pull request #6144 from JimMoen/fix-variable-byte-int-e42
fix(frame): variable byte num not limited in 4 bytes
2021-11-12 13:46:37 +08:00
Shawn cf9d82073c fix(ekka): update ekka to 0.7.10 2021-11-12 13:39:19 +08:00
JimMoen a11208b307 fix(frame): variable byte num not limited in 4 bytes 2021-11-12 11:36:11 +08:00
JianBo He fb7c84a5b8
Merge pull request #6103 from HJianBo/fix-force-kill-after-kick-or-discard-timeout-e42
Fix force kill after kick or discard timeout
2021-11-11 10:55:52 +08:00
JianBo He edb2c5f3c1 chore(appup): update appup.src 2021-11-10 11:34:32 +08:00
JianBo He 3d7f4335a0 fix(emqx_cm): replace ?tp with ?LOG 2021-11-10 11:34:30 +08:00
Zaiming Shi 915c827fdc fix(session): force kill session for 'kick' and 'discard'
Prior to this fix, 'kick' and 'discard' calls could time out (or
fail for other reasons); such failures only produced a log entry,
and the new session was still allowed to register.

As a result, when a client got stuck there was no way to force it
to step down, leaving multiple connections (sessions) for the same
client ID in the dashboard.

After this fix, stale pids are asked to shut down via a
gen_server:call, and are forcibly terminated with exit(Pid, kill) if
that call raises any exception.
2021-11-09 18:12:10 +08:00
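The shape of this fix, as a minimal sketch (the 5-second timeout and the
'kick' request atom follow the diff further below; error handling is
simplified):

    kick(Pid) ->
        try gen_server:call(Pid, kick, 5000)
        catch
            exit:{noproc, _} -> ok;                      %% already gone
            exit:{timeout, {gen_server, call, _}} ->
                exit(Pid, kill), ok                      %% force it down
        end.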
JianBo He 649bf2f4cb chore(appup): update appup.src 2021-11-01 09:46:57 +08:00
JianBo He 437837d687 chore(cm): remove needless logs 2021-11-01 09:46:57 +08:00
Zaiming (Stone) Shi 26b426b7ad
Merge pull request #5816 from emqx/dev/e4.2.8
Auto-pull-request-on-2021-09-27
2021-09-27 11:50:28 +02:00
JianBo He 094c8fc48b
Merge branch 'stable/e4.2.7' into dev/e4.2.8 2021-09-27 17:09:34 +08:00
Turtle e86e1e0430 fix(ekka): kill the process if don't release lock 2021-09-27 09:57:23 +08:00
JianBo He 34375c6cc6 chore(appup): update appup.src 2021-09-26 20:10:23 +08:00
JianBo He b9527cfe9d test: increase waiting time to avoid test failure 2021-09-26 20:10:23 +08:00
JianBo He 5f2adb42ed chore: export keepalive funcs for mqtt-sn 2021-09-26 20:10:23 +08:00
JianBo He 7b177a7929 fix: more safe session call 2021-09-26 20:10:23 +08:00
JianBo He aa5d274464 feat: acl.conf support ipaddrs 2021-09-26 20:10:23 +08:00
JianBo He fb65d1c581 chore: add mqtt-sn protocol define 2021-09-26 20:10:23 +08:00
Turtle c0ca7f8bea fix(force_shutdown): cannot suicide if the process hangs up 2021-09-26 18:37:11 +08:00
Zaiming (Stone) Shi 4c30c50490
Merge pull request #5799 from emqx/merge-e4.3.4
feat(mqueue): Interleave messages with different priorities
2021-09-24 11:52:00 +02:00
Turtle 903a9e57a8 feat(mqueue): Interleave messages with different priorities 2021-09-24 12:51:28 +08:00
Turtle 830150b309 chore: update ekka tag 2021-06-17 11:30:43 +08:00
Turtle e10e241fe9 chore: export function 2021-06-08 10:54:19 +08:00
Zaiming Shi eeb480c051 chore(deps): pin ekka 0.7.7 2021-06-08 10:54:19 +08:00
William Yang ca9b90be9a perf(broker): speedup trans when broker has a big mqueue 2021-04-29 17:10:10 +08:00
William Yang e02704e05a perf(broker): Optimization for handling bursty traffic
Introduces a new lock type 'spawn' for broker.perf.route_lock_type.

mnesia's lock-acquisition calls are not optimized for selective receive,
so taking locks becomes very expensive when there are tons of
messages in the broker's message queue.

This optimization runs the transaction in a separate process to take
advantage of the compiler's selective receive optimization.
2021-04-29 17:10:10 +08:00
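A minimal sketch of the technique (module-level wiring omitted; these
names are illustrative, not the actual emqx_broker code):

    trans_in_spawn(Fun, Args) ->
        %% Run the mnesia transaction in a fresh process: the lock
        %% protocol's selective receive then scans an empty mailbox
        %% instead of the broker's long message queue, and the caller's
        %% receive on the newly created monitor reference is optimized
        %% by the compiler to skip older messages.
        {Pid, Ref} = erlang:spawn_monitor(
                       fun() -> exit(mnesia:transaction(Fun, Args)) end),
        receive
            {'DOWN', Ref, process, Pid, Result} -> Result
        end.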
William Yang 5cc21dc3f0 fix(upgrade): duplicated modules loading. 2021-04-28 17:01:54 +02:00
William Yang 806cd33d1f chore(ct): fix a testcase timing. 2021-04-28 17:01:54 +02:00
William Yang f8ca066d74 upgrade: safe upgrade of emqx_connection 2021-04-28 21:29:20 +08:00
William Yang 59b7e8e3ba perf(lock): upgrade code for new lock types. 2021-04-28 21:29:20 +08:00
William Yang 571219f073 perf: new perf toggle broker.perf.route_lock_type 2021-04-28 21:29:20 +08:00
William Yang 9084554f4f fix: broker call should not timeout before client timeout
So change broker call timeout to infinity.
2021-04-28 21:29:20 +08:00
William Yang 1b179c8e2a perf(router): add route runs in async dirty context 2021-04-28 21:29:20 +08:00
William Yang fb8f5b79ad perf(trie): use global lock
Use a global lock to reduce remote locking overhead,

so that emqx route transactions can run in a dirty *sync* context.

At least a 10X subscribe/unsubscribe improvement.
2021-04-28 21:29:20 +08:00
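A hedged sketch of what the 'global' variant amounts to (the lock id and
node scope here are assumptions; the real toggle is wired through
broker.perf.route_lock_type, see the config diff below):

    trans_global(Fun, Args) ->
        LockId = {emqx_route_lock, self()},
        true = global:set_lock(LockId, [node() | nodes()]),
        try
            mnesia:sync_dirty(Fun, Args)
        after
            global:del_lock(LockId, [node() | nodes()])
        end.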
JianBo He e427b0ff75 chore(appup): supply appup instructions for ws-connection 2021-04-28 21:29:20 +08:00
Zaiming Shi b4e3c32e24 chore(conf): log only to file by default 2021-04-28 21:29:20 +08:00
Zaiming Shi c511e324bb fix(ekka): allow remote_console 2021-04-28 21:29:20 +08:00
zhanghongtong 1eb0f7b3b2 fix(ws connection): fix peer_cert_as_username error when ws connect 2021-04-28 21:29:20 +08:00
Shawn 7d003e0bfc Porting code for congestion alarms from 4.3.0 to e4.2.6 (#4523)
* fix(congestion): port some code from 4.3.0
* chore(emqx): update the appup file
2021-04-28 21:29:20 +08:00
Shawn 927264d793 fix(emqx): export do_deliver/2 for emqx_sn 2021-04-28 21:29:20 +08:00
Shawn 70da59c3bb chore(emqx): update appup 2021-04-28 21:29:20 +08:00
Shawn c69e1c6222 fix(mqtt-sn): sleep mode not working #4434 2021-04-28 21:29:20 +08:00
Shawn 6666210211 fix(congestion): alarm and clear congestion too frequently 2021-04-28 21:29:20 +08:00
Shawn 238eaa8e40 fix(emqx): validate mqtt malformed variable byte integer 2021-03-10 19:42:44 +08:00
31 changed files with 895 additions and 218 deletions

View File

@ -202,9 +202,11 @@ mqtt.max_packet_size = 1MB
mqtt.max_clientid_len = 65535
## Maximum topic levels allowed. 0 means no limit.
## Too large a depth may lead to subscription performance issues.
##
## Value: Number
mqtt.max_topic_levels = 0
## Value: Number [0-65535]
## Default: 128
mqtt.max_topic_levels = 128
## Maximum QoS allowed.
##
@ -242,7 +244,7 @@ mqtt.ignore_loop_deliver = false
mqtt.strict_mode = false
## Specify the response information returned to the client
##
##
## Value: String
## mqtt.response_information = example
@ -296,6 +298,16 @@ broker.shared_dispatch_ack_enabled = false
## Value: Flag
broker.route_batch_clean = off
## Performance toggle for subscribe/unsubscribe wildcard topic.
## Change this toggle only when there are many wildcard topics.
## Value: Enum
## - key: mnesia transactional updates with per-key locks. Recommended for a single-node setup.
## - tab: mnesia transactional updates with a table lock. Recommended for a multi-node setup.
## - global: global-lock protected updates. Recommended for larger clusters.
## NOTE: when changing from/to the 'global' lock type, all nodes in the cluster
## must be stopped before the change.
broker.perf.route_lock_type = key
##-------------------------------------------------------------------
## Plugins
##-------------------------------------------------------------------

View File

@ -544,6 +544,16 @@ listener.ws.external.verify_protocol_header = on
## Value: Duration
## listener.ws.external.proxy_protocol_timeout = 3s
## See: listener.ssl.$name.peer_cert_as_username
##
## Value: cn
## listener.ws.external.peer_cert_as_username = cn
## See: listener.ssl.$name.peer_cert_as_clientid
##
## Value: cn
## listener.ws.external.peer_cert_as_clientid = cn
## The TCP backlog of external MQTT/WebSocket Listener.
##
## See: listener.ws.$name.backlog

View File

@ -10,7 +10,7 @@
## - file: write logs only to file
## - console: write logs only to standard I/O
## - both: write logs both to file and standard I/O
log.to = both
log.to = file
## The log severity level.
##
@ -167,4 +167,4 @@ log.rotation.count = 5
## Value: MaxBurstCount,TimeWindow
## Default: disabled
##
#log.burst_limit = 20000, 1s
#log.burst_limit = 20000, 1s

View File

@ -57,8 +57,9 @@ zone.external.force_shutdown_policy = 10000|32MB
## zone.external.max_clientid_len = 1024
## Maximum topic levels allowed. 0 means no limit.
## Too large a depth may lead to subscription performance issues.
##
## Value: Number
## Value: Number [0-65535]
## zone.external.max_topic_levels = 7
## Maximum QoS allowed.
@ -184,6 +185,30 @@ zone.external.enable_flapping_detect = off
## Example: 100KB incoming per 10 seconds.
#zone.external.rate_limit.conn_bytes_in = 100KB,10s
## Whether to raise an alarm on congested connections.
##
## Sometimes an MQTT connection (usually an MQTT subscriber) may become "congested" because
## there are too many packets to send. The socket tries to buffer the packets until the buffer
## is full. If more packets arrive after that, they are kept "pending" in a queue and the
## connection is considered "congested".
##
## Enable this to send an alarm whenever there are bytes pending in the queue. You can set
## `listener.tcp.<ZoneName>.sndbuf` to a larger value if the alarm is triggered too often.
##
## The alarm name has the format "conn_congestion/<ClientID>/<Username>", where <ClientID>
## is the client ID of the congested MQTT connection, and <Username> is the username, or
## "unknown_user" if the client did not provide one.
## Default: off
#zone.external.conn_congestion.alarm = off
## How long a congestion alarm is kept before it may be cleared.
## The alarm is cleared only when there are no pending bytes in the queue, and at least
## `min_alarm_sustain_duration` has passed since the connection was last considered "congested".
##
## This avoids clearing and re-raising the alarm too frequently.
## Default: 1m
#zone.external.conn_congestion.min_alarm_sustain_duration = 1m
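## An illustrative combination (values are examples, not defaults): enable the
## alarm for the external zone and require five minutes of calm before clearing:
## zone.external.conn_congestion.alarm = on
## zone.external.conn_congestion.min_alarm_sustain_duration = 5m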
## Message quota for each external MQTT connection.
## This quota is consumed according to the number of recipients of a message.
##
@ -226,7 +251,7 @@ zone.external.ignore_loop_deliver = false
zone.external.strict_mode = false
## Specify the response information returned to the client
##
##
## Value: String
## zone.external.response_information = example
@ -317,7 +342,7 @@ zone.internal.ignore_loop_deliver = false
zone.internal.strict_mode = false
## Specify the response information returned to the client
##
##
## Value: String
## zone.internal.response_information = example

View File

@ -30,11 +30,13 @@
%% MQTT Protocol Version and Names
%%--------------------------------------------------------------------
-define(MQTT_SN_PROTO_V1, 1).
-define(MQTT_PROTO_V3, 3).
-define(MQTT_PROTO_V4, 4).
-define(MQTT_PROTO_V5, 5).
-define(PROTOCOL_NAMES, [
{?MQTT_SN_PROTO_V1, <<"MQTT-SN">>}, %% XXX:Compatible with emqx-sn plug-in
{?MQTT_PROTO_V3, <<"MQIsdp">>},
{?MQTT_PROTO_V4, <<"MQTT">>},
{?MQTT_PROTO_V5, <<"MQTT">>}]).

View File

@ -757,7 +757,7 @@ end}.
%% @doc Set the Maximum topic levels.
{mapping, "mqtt.max_topic_levels", "emqx.max_topic_levels", [
{default, 0},
{default, 128},
{datatype, integer}
]}.
@ -999,6 +999,16 @@ end}.
{datatype, string}
]}.
{mapping, "zone.$name.conn_congestion.alarm", "emqx.zones", [
{datatype, flag},
{default, off}
]}.
{mapping, "zone.$name.conn_congestion.min_alarm_sustain_duration", "emqx.zones", [
{default, "1m"},
{datatype, {duration, ms}}
]}.
{mapping, "zone.$name.quota.conn_messages_routing", "emqx.zones", [
{datatype, string}
]}.
@ -1128,6 +1138,10 @@ end}.
{ratelimit, {conn_messages_in, Ratelimit(Val)}};
(["rate_limit", "conn_bytes_in"], Val) ->
{ratelimit, {conn_bytes_in, Ratelimit(Val)}};
(["conn_congestion", "alarm"], Val) ->
{conn_congestion_alarm_enabled, Val};
(["conn_congestion", "min_alarm_sustain_duration"], Val) ->
{conn_congestion_min_alarm_sustain_duration, Val};
(["quota", "conn_messages_routing"], Val) ->
{quota, {conn_messages_routing, Ratelimit(Val)}};
(["quota", "overall_messages_routing"], Val) ->
@ -1573,7 +1587,11 @@ end}.
]}.
{mapping, "listener.ws.$name.peer_cert_as_username", "emqx.listeners", [
{datatype, {enum, [cn, dn, crt]}}
{datatype, {enum, [cn]}}
]}.
{mapping, "listener.ws.$name.peer_cert_as_clientid", "emqx.listeners", [
{datatype, {enum, [cn]}}
]}.
%%--------------------------------------------------------------------
@ -2008,6 +2026,18 @@ end}.
{datatype, flag}
]}.
%% @doc Performance toggle for subscribe/unsubscribe wildcard topic.
%% Change this toggle only when there are many wildcard topics.
%% key: mnesia transactional updates with per-key locks. Recommended for a single-node setup.
%% tab: mnesia transactional updates with a table lock. Recommended for a multi-node setup.
%% global: global-lock protected updates. Recommended for larger clusters.
%% NOTE: when changing from/to the 'global' lock type, all nodes in the cluster
%% must be stopped before the change.
{mapping, "broker.perf.route_lock_type", "emqx.route_lock_type", [
{default, key},
{datatype, {enum, [key, tab, global]}}
]}.
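%% For reference, a minimal sketch (an assumption, not part of this schema) of
%% how runtime code can read the mapped value, given that the mapping above
%% stores it under the 'emqx' application as 'route_lock_type':
%%
%%   lock_type() ->
%%       application:get_env(emqx, route_lock_type, key).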
%%--------------------------------------------------------------------
%% System Monitor
%%--------------------------------------------------------------------

View File

@ -6,8 +6,8 @@
[{gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}},
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.5"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.5"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.6"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.10"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}},
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
]}.

View File

@ -1,11 +1,13 @@
%% -*-: erlang -*-
%% -*- mode: erlang -*-
{VSN,
[
{"4.2.0", [
{add_module, emqx_congestion},
{load_module, emqx_vm, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
{suspend, [esockd_acceptor,emqx_connection, emqx_ws_connection]},
@ -14,12 +16,24 @@
{update, emqx_ws_connection, {advanced, []}},
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
{resume, [esockd_acceptor,emqx_connection, emqx_ws_connection]}
{resume, [esockd_acceptor,emqx_connection, emqx_ws_connection]},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]},
{"4.2.1", [
{add_module, emqx_congestion},
{load_module, emqx_vm, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
@ -27,20 +41,118 @@
{update, emqx_ws_connection, {advanced, []}},
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]}
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]},
{<<"4.2.[23]">>, [
{add_module, emqx_congestion},
{load_module, emqx_vm, brutal_purge, soft_purge, []},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []}
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<"4.2.4">>, [
{load_module, emqx_vm, brutal_purge, soft_purge, []},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<"4.2.5">>, [
{load_module, emqx_vm, brutal_purge, soft_purge, []},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<"4.2.[6-7]">>, [
{load_module, emqx_vm, brutal_purge, soft_purge, []},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<"4.2.8">>, [
{load_module, emqx_vm, brutal_purge, soft_purge, []},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<"4.2.9">>, [
{load_module, emqx_vm, brutal_purge, soft_purge, []},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
],
[
{"4.2.0", [
{load_module, emqx_vm, brutal_purge, soft_purge, []},
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
@ -51,29 +163,139 @@
{update, emqx_connection, {advanced, []}},
{update, emqx_ws_connection, {advanced, []}},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{delete_module, emqx_congestion}
{delete_module, emqx_congestion},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]},
{"4.2.1", [
{load_module, emqx_vm, brutal_purge, soft_purge, []},
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{update, emqx_ws_connection, {advanced, []}},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{delete_module, emqx_congestion}
{delete_module, emqx_congestion},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]},
{<<"4.2.[23]">>, [
{load_module, emqx_vm, brutal_purge, soft_purge, []},
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{delete_module, emqx_congestion}
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{delete_module, emqx_congestion},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<"4.2.4">>, [
{load_module, emqx_vm, brutal_purge, soft_purge, []},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<"4.2.5">>, [
{load_module, emqx_vm, brutal_purge, soft_purge, []},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<"4.2.[6-7]">>, [
{load_module, emqx_vm, brutal_purge, soft_purge, []},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<"4.2.8">>, [
{load_module, emqx_vm, brutal_purge, soft_purge, []},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<"4.2.9">>, [
{load_module, emqx_vm, brutal_purge, soft_purge, []},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
]

View File

@ -28,7 +28,8 @@
-type(who() :: all | binary() |
{client, binary()} |
{user, binary()} |
{ipaddr, esockd_cidr:cidr_string()}).
{ipaddr, esockd_cidr:cidr_string()} |
{ipaddrs, list(esockd_cidr:cidr_string())}).
-type(access() :: subscribe | publish | pubsub).
@ -52,6 +53,8 @@ compile(who, all) ->
all;
compile(who, {ipaddr, CIDR}) ->
{ipaddr, esockd_cidr:parse(CIDR, true)};
compile(who, {ipaddrs, CIDRs}) ->
{ipaddrs, lists:map(fun(CIDR) -> esockd_cidr:parse(CIDR, true) end, CIDRs)};
compile(who, {client, all}) ->
{client, all};
compile(who, {client, ClientId}) ->
@ -108,8 +111,14 @@ match_who(#{username := Username}, {user, Username}) ->
true;
match_who(#{peerhost := undefined}, {ipaddr, _Tup}) ->
false;
match_who(#{peerhost := undefined}, {ipaddrs, _}) ->
false;
match_who(#{peerhost := IP}, {ipaddr, CIDR}) ->
esockd_cidr:match(IP, CIDR);
match_who(#{peerhost := IP}, {ipaddrs, CIDRs}) ->
lists:any(fun(CIDR) ->
esockd_cidr:match(IP, CIDR)
end, CIDRs);
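%% Hedged example of an acl.conf rule using the new 'ipaddrs' clause
%% (topic filter and CIDR ranges are illustrative):
%%   {allow, {ipaddrs, ["192.168.0.0/24", "10.0.0.0/8"]}, pubsub, ["sensors/#"]}.
%% Each CIDR string is compiled with esockd_cidr:parse/2 in compile/2 above
%% and matched against the peer address here via esockd_cidr:match/2.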
match_who(ClientInfo, {'and', Conds}) when is_list(Conds) ->
lists:foldl(fun(Who, Allow) ->
match_who(ClientInfo, Who) andalso Allow

View File

@ -361,7 +361,7 @@ normalize_message(partition, #{occurred := Node}) ->
list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID]));
normalize_message(<<"mqtt_conn/congested/", Info/binary>>, _) ->
list_to_binary(io_lib:format("MQTT connection congested: ~s", [Info]));
normalize_message(<<"conn_congestion/", Info/binary>>, _) ->
list_to_binary(io_lib:format("connection congested: ~s", [Info]));
normalize_message(_Name, _UnknownDetails) ->
<<"Unknown alarm">>.

View File

@ -56,20 +56,20 @@ init({_Args, {alarm_handler, _ExistingAlarms}}) ->
init(_) ->
{ok, []}.
handle_event({set_alarm, {system_memory_high_watermark, []}}, State) ->
handle_event({set_alarm, {system_memory_high_watermark, []}}, State) ->
emqx_alarm:activate(high_system_memory_usage, #{high_watermark => emqx_os_mon:get_sysmem_high_watermark()}),
{ok, State};
handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) ->
emqx_alarm:activate(high_process_memory_usage, #{pid => Pid,
handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) ->
emqx_alarm:activate(high_process_memory_usage, #{pid => list_to_binary(pid_to_list(Pid)),
high_watermark => emqx_os_mon:get_procmem_high_watermark()}),
{ok, State};
handle_event({clear_alarm, system_memory_high_watermark}, State) ->
handle_event({clear_alarm, system_memory_high_watermark}, State) ->
emqx_alarm:deactivate(high_system_memory_usage),
{ok, State};
handle_event({clear_alarm, process_memory_high_watermark}, State) ->
handle_event({clear_alarm, process_memory_high_watermark}, State) ->
emqx_alarm:deactivate(high_process_memory_usage),
{ok, State};
@ -85,4 +85,4 @@ handle_call(_Query, State) ->
terminate(swap, _State) ->
{emqx_alarm_handler, []};
terminate(_, _) ->
ok.
ok.

View File

@ -419,7 +419,7 @@ safe_update_stats(Tab, Stat, MaxStat) ->
-compile({inline, [call/2, cast/2, pick/1]}).
call(Broker, Req) ->
gen_server:call(Broker, Req).
gen_server:call(Broker, Req, infinity).
cast(Broker, Msg) ->
gen_server:cast(Broker, Msg).

View File

@ -32,6 +32,8 @@
-export([ info/1
, info/2
, set_conn_state/2
, get_session/1
, set_session/2
, stats/1
, caps/1
]).
@ -46,6 +48,12 @@
, terminate/2
]).
%% Export for emqx_sn
-export([ do_deliver/2
, ensure_keepalive/2
, clear_keepalive/1
]).
%% Exports for CT
-export([set_field/3]).
@ -167,6 +175,12 @@ info(timers, #channel{timers = Timers}) -> Timers.
set_conn_state(ConnState, Channel) ->
Channel#channel{conn_state = ConnState}.
get_session(#channel{session = Session}) ->
Session.
set_session(Session, Channel) ->
Channel#channel{session = Session}.
%% TODO: Add more stats.
-spec(stats(channel()) -> emqx_types:stats()).
stats(#channel{session = Session})->
@ -931,8 +945,11 @@ handle_info({sock_closed, Reason}, Channel =
Shutdown -> Shutdown
end;
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) ->
?LOG(error, "Unexpected sock_closed: ~p", [Reason]),
handle_info({sock_closed, _Reason}, Channel = #channel{conn_state = disconnected}) ->
%% Since sock_closed messages can be generated multiple times,
%% we can simply ignore errors of this type in the disconnected state.
%% e.g. when the socket send function returns an error, there is already
%% a tcp_closed delivered to the process mailbox
{ok, Channel};
handle_info(clean_acl_cache, Channel) ->
@ -1501,6 +1518,15 @@ ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}
Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)),
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
clear_keepalive(Channel = #channel{timers = Timers}) ->
case maps:get(alive_timer, Timers, undefined) of
undefined ->
Channel;
TRef ->
emqx_misc:cancel_timer(TRef),
Channel#channel{timers = maps:without([alive_timer], Timers)}
end.
%%--------------------------------------------------------------------
%% Maybe Resume Session

View File

@ -70,7 +70,10 @@
]).
%% Internal export
-export([stats_fun/0]).
-export([stats_fun/0, clean_down/1]).
%% Test export
-export([register_channel_/3]).
-type(chan_pid() :: pid()).
@ -91,6 +94,10 @@
%% Server name
-define(CM, ?MODULE).
-define(T_KICK, 5000).
-define(T_GET_INFO, 5000).
-define(T_TAKEOVER, 15000).
%% @doc Start the channel manager.
-spec(start_link() -> startlink_ret()).
start_link() ->
@ -159,7 +166,7 @@ get_chan_info(ClientId, ChanPid) when node(ChanPid) == node() ->
error:badarg -> undefined
end;
get_chan_info(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid]).
rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid], ?T_GET_INFO).
%% @doc Update infos of the channel.
-spec(set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean()).
@ -184,7 +191,7 @@ get_chan_stats(ClientId, ChanPid) when node(ChanPid) == node() ->
error:badarg -> undefined
end;
get_chan_stats(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid]).
rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO).
%% @doc Set channel's stats.
-spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean()).
@ -222,7 +229,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
case takeover_session(ClientId) of
{ok, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session),
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}),
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER),
register_channel_(ClientId, Self, ConnInfo),
{ok, #{session => Session,
present => true,
@ -252,7 +259,7 @@ takeover_session(ClientId) ->
takeover_session(ClientId, ChanPid);
ChanPids ->
[ChanPid|StalePids] = lists:reverse(ChanPids),
?LOG(error, "More than one channel found: ~p", [ChanPids]),
?LOG(error, "more_than_one_channel_found: ~p", [ChanPids]),
lists:foreach(fun(StalePid) ->
catch discard_session(ClientId, StalePid)
end, StalePids),
@ -264,67 +271,111 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
undefined ->
{error, not_found};
ConnMod when is_atom(ConnMod) ->
Session = ConnMod:call(ChanPid, {takeover, 'begin'}),
%% TODO: if takeover times out, maybe kill the old?
Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
{ok, ConnMod, ChanPid, Session}
end;
takeover_session(ClientId, ChanPid) ->
rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid]).
rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER).
%% @doc Discard all the sessions identified by the ClientId.
-spec(discard_session(emqx_types:clientid()) -> ok).
discard_session(ClientId) when is_binary(ClientId) ->
case lookup_channels(ClientId) of
[] -> ok;
ChanPids ->
lists:foreach(
fun(ChanPid) ->
try
discard_session(ClientId, ChanPid)
catch
_:{noproc,_}:_Stk -> ok;
_:{{shutdown,_},_}:_Stk -> ok;
_:Error:_Stk ->
?LOG(error, "Failed to discard ~0p: ~0p", [ChanPid, Error])
end
end, ChanPids)
ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids)
end.
discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chann_conn_mod(ClientId, ChanPid) of
undefined -> ok;
ConnMod when is_atom(ConnMod) ->
ConnMod:call(ChanPid, discard)
end;
%% @private Kick a local stale session to force it to step down.
%% If the kick fails (e.g. times out), force a kill.
%% Keeping the stale pid around, returning an error, or raising an
%% exception benefits nobody.
-spec kick_or_kill(kick | discard, module(), pid()) -> ok.
kick_or_kill(Action, ConnMod, Pid) ->
try
%% this is essentially a gen_server:call implemented in emqx_connection
%% and emqx_ws_connection.
%% the handle_call is implemented in emqx_channel
ok = apply(ConnMod, call, [Pid, Action, ?T_KICK])
catch
_ : noproc -> % emqx_ws_connection: call
?LOG(debug, "session_already_gone: ~p, action: ~p", [Pid, Action]),
ok;
_ : {noproc, _} -> % emqx_connection: gen_server:call
?LOG(debug, "session_already_gone: ~p, action: ~p", [Pid, Action]),
ok;
_ : {shutdown, _} ->
?LOG(debug, "session_already_shutdown: ~p, action: ~p", [Pid, Action]),
ok;
_ : {{shutdown, _}, _} ->
?LOG(debug, "session_already_shutdown: ~p, action: ~p", [Pid, Action]),
ok;
_ : {timeout, {gen_server, call, _}} ->
?LOG(warning, "session_kick_timeout: ~p, action: ~p, "
"stale_channel: ~p",
[Pid, Action, stale_channel_info(Pid)]),
ok = force_kill(Pid);
_ : Error ->
?LOG(error, "session_kick_exception: ~p, action: ~p, "
"reason: ~p, stacktrace: ~p, stale_channel: ~p",
[Pid, Action, Error, erlang:get_stacktrace(), stale_channel_info(Pid)]),
ok = force_kill(Pid)
end.
force_kill(Pid) ->
exit(Pid, kill),
ok.
stale_channel_info(Pid) ->
process_info(Pid, [status, message_queue_len, current_stacktrace]).
discard_session(ClientId, ChanPid) ->
rpc_call(node(ChanPid), discard_session, [ClientId, ChanPid]).
kick_session(discard, ClientId, ChanPid).
kick_session(ClientId, ChanPid) ->
kick_session(kick, ClientId, ChanPid).
%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action).
kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chann_conn_mod(ClientId, ChanPid) of
undefined ->
%% already deregistered
ok;
ConnMod when is_atom(ConnMod) ->
ok = kick_or_kill(Action, ConnMod, ChanPid)
end;
kick_session(Action, ClientId, ChanPid) ->
%% call remote node on the old APIs because we do not know if they have upgraded
%% to have kick_session/3
Function = case Action of
discard -> discard_session;
kick -> kick_session
end,
try
rpc_call(node(ChanPid), Function, [ClientId, ChanPid], ?T_KICK)
catch
Error : Reason ->
%% This should mostly be RPC failures.
%% However, if the node is still running the old version
%% code (prior to emqx app 4.3.10) some of the RPC handler
%% exceptions may get propagated to a new version node
?LOG(error, "failed_to_kick_session_on_remote_node ~p: ~p ~p ~p",
[node(ChanPid), Action, Error, Reason])
end.
kick_session(ClientId) ->
case lookup_channels(ClientId) of
[] -> {error, not_found};
[ChanPid] ->
kick_session(ClientId, ChanPid);
[] ->
?LOG(warning, "kiecked_an_unknown_session ~ts", [ClientId]),
ok;
ChanPids ->
[ChanPid|StalePids] = lists:reverse(ChanPids),
?LOG(error, "More than one channel found: ~p", [ChanPids]),
lists:foreach(fun(StalePid) ->
catch discard_session(ClientId, StalePid)
end, StalePids),
kick_session(ClientId, ChanPid)
case length(ChanPids) > 1 of
true -> ?LOG(info, "more_than_one_channel_found: ~p", [ChanPids]);
false -> ok
end,
lists:foreach(fun(Pid) -> kick_session(ClientId, Pid) end, ChanPids)
end.
kick_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chan_info(ClientId, ChanPid) of
#{conninfo := #{conn_mod := ConnMod}} ->
ConnMod:call(ChanPid, kick);
undefined ->
{error, not_found}
end;
kick_session(ClientId, ChanPid) ->
rpc_call(node(ChanPid), kick_session, [ClientId, ChanPid]).
%% @doc Is clean start?
% is_clean_start(#{clean_start := false}) -> false;
% is_clean_start(_Attrs) -> true.
@ -360,10 +411,16 @@ lookup_channels(local, ClientId) ->
[ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)].
%% @private
rpc_call(Node, Fun, Args) ->
case rpc:call(Node, ?MODULE, Fun, Args) of
{badrpc, Reason} -> error(Reason);
Res -> Res
rpc_call(Node, Fun, Args, Timeout) ->
case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of
{badrpc, Reason} ->
%% since emqx app 4.3.10, the 'kick' and 'discard' call handlers
%% should catch all exceptions and always return 'ok'.
%% This leaves 'badrpc' possible only when there is a problem
%% calling the remote node.
error({badrpc, Reason});
Res ->
Res
end.
%% @private
@ -396,7 +453,7 @@ handle_cast(Msg, State) ->
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]),
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]),
{noreply, State#{chan_pmon := PMon1}};
handle_info(Info, State) ->
@ -432,5 +489,5 @@ get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() ->
error:badarg -> undefined
end;
get_chann_conn_mod(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid]).
rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO).

View File

@ -16,17 +16,16 @@
-module(emqx_congestion).
-export([ maybe_alarm_port_busy/3
, maybe_alarm_port_busy/4
, maybe_alarm_too_many_publish/5
, maybe_alarm_too_many_publish/6
-export([ maybe_alarm_conn_congestion/3
, cancel_alarms/3
]).
-define(ALARM_CONN_CONGEST(Channel, Reason),
list_to_binary(io_lib:format("mqtt_conn/congested/~s/~s/~s", [emqx_channel:info(clientid, Channel),
maps:get(username, emqx_channel:info(clientinfo, Channel), <<"undefined">>),
Reason]))).
list_to_binary(
io_lib:format("~s/~s/~s",
[Reason, emqx_channel:info(clientid, Channel),
maps:get(username, emqx_channel:info(clientinfo, Channel),
<<"unknown_user">>)]))).
-define(ALARM_CONN_INFO_KEYS, [
socktype, sockname, peername, clientid, username, proto_name, proto_ver,
@ -36,44 +35,28 @@
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
-define(PROC_INFO_KEYS, [message_queue_len, memory, reductions]).
-define(ALARM_SENT(REASON), {alarm_sent, REASON}).
-define(ALL_ALARM_REASONS, [port_busy, too_many_publish]).
-define(CONFIRM_CLEAR(REASON), {alarm_confirm_clear, REASON}).
-define(CONFIRM_CLEAR_INTERVAL, 10000).
-define(ALL_ALARM_REASONS, [conn_congestion]).
-define(WONT_CLEAR_IN, 60000).
maybe_alarm_port_busy(Socket, Transport, Channel) ->
maybe_alarm_port_busy(Socket, Transport, Channel, false).
maybe_alarm_port_busy(Socket, Transport, Channel, ForceClear) ->
case is_tcp_congested(Socket, Transport) of
true -> alarm_congestion(Socket, Transport, Channel, port_busy);
false -> cancel_alarm_congestion(Socket, Transport, Channel, port_busy,
ForceClear)
maybe_alarm_conn_congestion(Socket, Transport, Channel) ->
case is_alarm_enabled(Channel) of
false -> ok;
true ->
case is_tcp_congested(Socket, Transport) of
true -> alarm_congestion(Socket, Transport, Channel, conn_congestion);
false -> cancel_alarm_congestion(Socket, Transport, Channel, conn_congestion)
end
end.
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
MaxBatchSize) ->
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
MaxBatchSize, false).
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
PubMsgCount = _MaxBatchSize, _ForceClear) ->
%% we only alarm it when the process is "too busy"
alarm_congestion(Socket, Transport, Channel, too_many_publish);
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
_MaxBatchSize, ForceClear) when PubMsgCount == 0 ->
%% but we clear the alarm until it is really "idle", to avoid sending
%% alarms and clears too frequently
cancel_alarm_congestion(Socket, Transport, Channel, too_many_publish,
ForceClear);
maybe_alarm_too_many_publish(_Socket, _Transport, _Channel, _PubMsgCount,
_MaxBatchSize, _ForceClear) ->
ok.
cancel_alarms(Socket, Transport, Channel) ->
lists:foreach(fun(Reason) ->
do_cancel_alarm_congestion(Socket, Transport, Channel, Reason)
end, ?ALL_ALARM_REASONS).
is_alarm_enabled(Channel) ->
emqx_zone:get_env(emqx_channel:info(zone, Channel),
conn_congestion_alarm_enabled, false).
alarm_congestion(Socket, Transport, Channel, Reason) ->
case has_alarm_sent(Reason) of
false -> do_alarm_congestion(Socket, Transport, Channel, Reason);
@ -82,8 +65,11 @@ alarm_congestion(Socket, Transport, Channel, Reason) ->
update_alarm_sent_at(Reason)
end.
cancel_alarm_congestion(Socket, Transport, Channel, Reason, ForceClear) ->
case is_alarm_allowed_clear(Reason, ForceClear) of
cancel_alarm_congestion(Socket, Transport, Channel, Reason) ->
Zone = emqx_channel:info(zone, Channel),
WontClearIn = emqx_zone:get_env(Zone, conn_congestion_min_alarm_sustain_duration,
?WONT_CLEAR_IN),
case has_alarm_sent(Reason) andalso long_time_since_last_alarm(Reason, WontClearIn) of
true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason);
false -> ok
end.
@ -123,13 +109,11 @@ get_alarm_sent_at(Reason) ->
LastSentAt -> LastSentAt
end.
is_alarm_allowed_clear(Reason, _ForceClear = true) ->
has_alarm_sent(Reason);
is_alarm_allowed_clear(Reason, _ForceClear = false) ->
long_time_since_last_alarm(Reason, WontClearIn) ->
%% only send clears when the alarm was not triggered in the last
%% ?CONFIRM_CLEAR_INTERVAL time
%% WontClearIn time
case timenow() - get_alarm_sent_at(Reason) of
Elapse when Elapse >= ?CONFIRM_CLEAR_INTERVAL -> true;
Elapse when Elapse >= WontClearIn -> true;
_ -> false
end.

View File

@ -38,7 +38,7 @@
, stats/1
]).
-export([call/2]).
-export([call/2, call/3]).
%% Callback
-export([init/4]).
@ -168,7 +168,9 @@ stats(#state{transport = Transport,
lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
call(Pid, Req) ->
gen_server:call(Pid, Req, infinity).
call(Pid, Req, infinity).
call(Pid, Req, Timeout) ->
gen_server:call(Pid, Req, Timeout).
stop(Pid) ->
gen_server:stop(Pid).
@ -374,12 +376,8 @@ handle_msg({Passive, _Sock}, State)
handle_info(activate_socket, NState1);
handle_msg(Deliver = {deliver, _Topic, _Msg},
#state{active_n = MaxBatchSize, transport = Transport,
socket = Socket, channel = Channel} = State) ->
Delivers0 = emqx_misc:drain_deliver(MaxBatchSize),
emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel,
length(Delivers0), MaxBatchSize),
Delivers = [Deliver|Delivers0],
#state{active_n = ActiveN} = State) ->
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State);
%% Something sent
@ -548,12 +546,9 @@ handle_timeout(_TRef, limit_timeout, State) ->
},
handle_info(activate_socket, NState);
handle_timeout(_TRef, emit_stats, State = #state{active_n = MaxBatchSize,
channel = Channel, transport = Transport, socket = Socket}) ->
{_, MsgQLen} = erlang:process_info(self(), message_queue_len),
emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel, true),
emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel,
MsgQLen, MaxBatchSize, true),
handle_timeout(_TRef, emit_stats, State = #state{channel = Channel, transport = Transport,
socket = Socket}) ->
emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:set_chan_stats(ClientId, stats(State)),
{ok, State#state{stats_timer = undefined}};
@ -594,6 +589,11 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
NState = State#state{parse_state = NParseState},
parse_incoming(Rest, [Packet|Packets], NState)
catch
error:proxy_protocol_config_disabled ->
?LOG(error,
"~nMalformed packet, "
"please check proxy_protocol config for specific listeners and zones~n"),
{[{frame_error, proxy_protocol_config_disabled} | Packets], State};
error:Reason:Stk ->
?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data:~0p",
[Reason, Stk, Data]),
@ -666,7 +666,7 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel})
Oct = iolist_size(IoData),
ok = emqx_metrics:inc('bytes.sent', Oct),
emqx_pd:inc_counter(outgoing_bytes, Oct),
emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel),
emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
case Transport:async_send(Socket, IoData, [nosuspend]) of
ok -> ok;
Error = {error, _Reason} ->
@ -690,7 +690,10 @@ handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
end;
handle_info({sock_error, Reason}, State) ->
Reason =/= closed andalso ?LOG(error, "Socket error: ~p", [Reason]),
case Reason =/= closed andalso Reason =/= einval of
true -> ?LOG(warning, "socket_error: ~p", [Reason]);
false -> ok
end,
handle_info({sock_closed, Reason}, close_socket(State));
handle_info(Info, State) ->
@ -814,4 +817,3 @@ stop(Reason, Reply, State) ->
set_field(Name, Value, State) ->
Pos = emqx_misc:index_of(Name, record_info(fields, state)),
setelement(Pos+1, State, Value).

View File

@ -67,6 +67,15 @@
version => ?MQTT_PROTO_V4
}).
-define(MULTIPLIER_MAX, 16#200000).
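%% Note: 16#200000 = 2^21 = 128^3, the multiplier for the fourth byte of an
%% MQTT variable byte integer. The encoding allows at most four bytes, so the
%% new guards below reject a continuation byte once the multiplier has grown
%% past 128^3.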
%% proxy_protocol v1 header human readable
-define(PPV1_PROXY, "PROXY ").
-define(PPV1_PROXY_UNKNOWN, "PROXY UNKNOWN").
%% proxy_protocol v2 header signature:
%% 16#0D,16#0A, 16#0D,16#0A,16#00,16#0D,16#0A,16#51,16#55,16#49,16#54,16#0A
-define(PPV2_HEADER_SIG, "\r\n\r\n\0\r\nQUIT\n").
-dialyzer({no_match, [serialize_utf8_string/2]}).
%%--------------------------------------------------------------------
@ -96,6 +105,13 @@ parse(Bin) ->
-spec(parse(binary(), parse_state()) -> parse_result()).
parse(<<>>, {none, Options}) ->
{more, {none, Options}};
parse(<<?PPV1_PROXY, IPVer:5/binary, _Rest/binary>>, {none, _Options})
when IPVer =:= <<"TCP4 ">> orelse IPVer =:= <<"TCP6 ">> ->
error(proxy_protocol_config_disabled);
parse(<<?PPV1_PROXY_UNKNOWN, _Rest/binary>>, {none, _Options}) ->
error(proxy_protocol_config_disabled);
parse(<<?PPV2_HEADER_SIG, _Rest/binary>>, {none, _Options}) ->
error(proxy_protocol_config_disabled);
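%% For operators: the three clauses above turn an unexpected PROXY protocol
%% header into a clear error instead of a confusing frame parse failure. A
%% hedged configuration hint (listener name is an example from emqx 4.x):
%%   listener.tcp.external.proxy_protocol = on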
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>,
{none, Options = #{strict_mode := StrictMode}}) ->
%% Validate header if strict mode.
@ -141,6 +157,9 @@ parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) ->
%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK...
parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
parse_frame(Rest, Header, 2, Options);
parse_remaining_len(<<1:1, _Len:7, _Rest/binary>>, _Header, Multiplier, _Value, _Options)
when Multiplier > ?MULTIPLIER_MAX ->
error(malformed_variable_byte_integer);
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) ->
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options);
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value,
@ -408,6 +427,9 @@ parse_property(<<16#2A, Val, Bin/binary>>, Props) ->
parse_variable_byte_integer(Bin) ->
parse_variable_byte_integer(Bin, 1, 0).
parse_variable_byte_integer(<<1:1, _Len:7, _Rest/binary>>, Multiplier, _Value)
when Multiplier > ?MULTIPLIER_MAX ->
error(malformed_variable_byte_integer);
parse_variable_byte_integer(<<1:1, Len:7, Rest/binary>>, Multiplier, Value) ->
parse_variable_byte_integer(Rest, Multiplier * ?HIGHBIT, Value + Len * Multiplier);
parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) ->

View File

@ -23,6 +23,7 @@
, init/4 %% XXX: Compatible with before 4.2 version
, info/1
, check/2
, update_overall_limiter/4
]).
-record(limiter, {
@ -152,3 +153,15 @@ is_message_limiter(conn_messages_in) -> true;
is_message_limiter(conn_messages_routing) -> true;
is_message_limiter(overall_messages_routing) -> true;
is_message_limiter(_) -> false.
update_overall_limiter(Zone, Name, Capacity, Interval) ->
case is_overall_limiter(Name) of
false -> false;
_ ->
try
esockd_limiter:update({Zone, Name}, Capacity, Interval),
true
catch _:_:_ ->
false
end
end.
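%% Hedged usage sketch (zone and limiter names plus the numbers are
%% illustrative): called when the config changes, so the shared
%% esockd_limiter bucket picks up the new capacity and interval:
%%
%%   emqx_limiter:update_overall_limiter(external, overall_messages_routing,
%%                                       200000, 1).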

View File

@ -45,6 +45,8 @@
, index_of/2
]).
-define(OOM_FACTOR, 1.25).
%% @doc Merge options
-spec(merge_opts(Opts, Opts) -> Opts when Opts :: proplists:proplist()).
merge_opts(Defaults, Options) ->
@ -185,8 +187,8 @@ do_check_oom([{Val, Max, Reason}|Rest]) ->
tune_heap_size(#{max_heap_size := MaxHeapSize}) ->
%% If set to zero, the limit is disabled.
erlang:process_flag(max_heap_size, #{size => MaxHeapSize,
kill => false,
erlang:process_flag(max_heap_size, #{size => must_kill_heap_size(MaxHeapSize),
kill => true,
error_logger => true
});
tune_heap_size(undefined) -> ok.
@ -233,3 +235,19 @@ index_of(E, I, [E|_]) ->
index_of(E, I, [_|L]) ->
index_of(E, I+1, L).
must_kill_heap_size(Size) ->
%% We set the max allowed heap size by `erlang:process_flag(max_heap_size, #{size => Size})`,
%% where the `Size` cannot be set to an integer larger than `(1 bsl 59) - 1` on a 64-bit system,
%% or `(1 bsl 27) - 1` on a 32-bit system.
MaxAllowedSize = case erlang:system_info(wordsize) of
8 -> % arch_64
(1 bsl 59) - 1;
4 -> % arch_32
(1 bsl 27) - 1
end,
%% We multiply the size by the factor ?OOM_FACTOR, to give the
%% process a chance to terminate itself via `check_oom/1`
case ceil(Size * ?OOM_FACTOR) of
Size0 when Size0 >= MaxAllowedSize -> MaxAllowedSize;
Size0 -> Size0
end.
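%% Worked example (assuming a 64-bit VM): with MaxHeapSize = 1000000 words,
%% ceil(1000000 * 1.25) = 1250000 < (1 bsl 59) - 1, so the VM kills the
%% process only after it grows 25% past the configured limit, leaving
%% check_oom/1 a window to shut the process down gracefully first.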

View File

@ -67,6 +67,9 @@
, dropped/1
]).
-export([ live_upgrade/1
]).
-export_type([mqueue/0, options/0]).
-type(topic() :: emqx_topic:topic()).
@ -91,6 +94,11 @@
-define(MAX_LEN_INFINITY, 0).
-define(INFO_KEYS, [store_qos0, max_len, len, dropped]).
-record(shift_opts, {
multiplier :: non_neg_integer(),
base :: integer()
}).
-record(mqueue, {
store_qos0 = false :: boolean(),
max_len = ?MAX_LEN_INFINITY :: count(),
@ -98,11 +106,16 @@
dropped = 0 :: count(),
p_table = ?NO_PRIORITY_TABLE :: p_table(),
default_p = ?LOWEST_PRIORITY :: priority(),
- q = ?PQUEUE:new() :: pq()
+ q = ?PQUEUE:new() :: pq(),
+ shift_opts :: #shift_opts{},
+ last_p :: non_neg_integer() | undefined,
+ counter :: non_neg_integer() | undefined
}).
-type(mqueue() :: #mqueue{}).
-define(OLD(Q), Q = {mqueue, _, _, _, _, _, _, _}).
-spec(init(options()) -> mqueue()).
init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of
@ -112,7 +125,8 @@ init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
#mqueue{max_len = MaxLen,
store_qos0 = QoS_0,
p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE),
- default_p = get_priority_opt(Opts)
+ default_p = get_priority_opt(Opts),
+ shift_opts = get_shift_opt(Opts)
}.
-spec(info(mqueue()) -> emqx_types:infos()).
@ -127,22 +141,30 @@ info(max_len, #mqueue{max_len = MaxLen}) ->
info(len, #mqueue{len = Len}) ->
Len;
info(dropped, #mqueue{dropped = Dropped}) ->
- Dropped.
+ Dropped;
+ info(Info, ?OLD(MQ)) ->
+ info(Info, live_upgrade(MQ)).
- is_empty(#mqueue{len = Len}) -> Len =:= 0.
+ is_empty(#mqueue{len = Len}) -> Len =:= 0;
+ is_empty(?OLD(MQ)) -> is_empty(live_upgrade(MQ)).
- len(#mqueue{len = Len}) -> Len.
+ len(#mqueue{len = Len}) -> Len;
+ len(?OLD(MQ)) -> len(live_upgrade(MQ)).
- max_len(#mqueue{max_len = MaxLen}) -> MaxLen.
+ max_len(#mqueue{max_len = MaxLen}) -> MaxLen;
+ max_len(?OLD(MQ)) -> max_len(live_upgrade(MQ)).
%% @doc Return number of dropped messages.
-spec(dropped(mqueue()) -> count()).
- dropped(#mqueue{dropped = Dropped}) -> Dropped.
+ dropped(#mqueue{dropped = Dropped}) -> Dropped;
+ dropped(?OLD(MQ)) -> dropped(live_upgrade(MQ)).
%% @doc Stats of the mqueue
-spec(stats(mqueue()) -> [stat()]).
stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) ->
- [{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}].
+ [{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}];
+ stats(?OLD(MQ)) ->
+ stats(live_upgrade(MQ)).
%% @doc Enqueue a message.
-spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}).
@ -165,15 +187,34 @@ in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp,
{DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}};
false ->
{_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}}
- end.
+ end;
+ in(Msg, ?OLD(MQ)) ->
+ in(Msg, live_upgrade(MQ)).
-spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}).
out(MQ = #mqueue{len = 0, q = Q}) ->
0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap
{empty, MQ};
- out(MQ = #mqueue{q = Q, len = Len}) ->
+ out(MQ = #mqueue{q = Q, len = Len, last_p = undefined, shift_opts = ShiftOpts}) ->
+ {{value, Val, Prio}, Q1} = ?PQUEUE:out_p(Q), %% Shouldn't fail, since we've checked the length
+ MQ1 = MQ#mqueue{
+ q = Q1,
+ len = Len - 1,
+ last_p = Prio,
+ counter = init_counter(Prio, ShiftOpts)
+ },
+ {{value, Val}, MQ1};
+ out(MQ = #mqueue{q = Q, counter = 0}) ->
+ MQ1 = MQ#mqueue{
+ q = ?PQUEUE:shift(Q),
+ last_p = undefined
+ },
+ out(MQ1);
+ out(MQ = #mqueue{q = Q, len = Len, counter = Cnt}) ->
{R, Q1} = ?PQUEUE:out(Q),
- {R, MQ#mqueue{q = Q1, len = Len - 1}}.
+ {R, MQ#mqueue{q = Q1, len = Len - 1, counter = Cnt - 1}};
+ out(?OLD(MQ)) ->
+ out(live_upgrade(MQ)).
get_opt(Key, Opts, Default) ->
case maps:get(Key, Opts, Default) of
@ -194,3 +235,46 @@ get_priority_opt(Opts) ->
%% while the highest 'infinity' is a [{infinity, queue:queue()}]
get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY;
get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp).
init_counter(?HIGHEST_PRIORITY, Opts) ->
Infinity = 1000000,
init_counter(Infinity, Opts);
init_counter(Prio, #shift_opts{multiplier = Mult, base = Base}) ->
(Prio + Base) * Mult.
get_shift_opt(Opts) ->
Mult = maps:get(shift_multiplier, Opts, 10),
Min = case Opts of
#{p_table := PTab} ->
case maps:size(PTab) of
0 -> 0;
_ -> lists:min(maps:values(PTab))
end;
_ ->
?LOWEST_PRIORITY
end,
Base = case Min < 0 of
true -> -Min;
false -> 0
end,
#shift_opts{
multiplier = Mult,
base = Base
}.
live_upgrade({mqueue, StoreQos0, MaxLen, Len, Dropped, PTable, DefaultP, Q}) ->
ShiftOpts = case is_map(PTable) of
true -> get_shift_opt(#{p_table => PTable});
false -> get_shift_opt(#{})
end,
#mqueue{ store_qos0 = StoreQos0
, max_len = MaxLen
, dropped = Dropped
, p_table = PTable
, default_p = DefaultP
, len = Len
, q = Q
, shift_opts = ShiftOpts
, last_p = undefined
, counter = undefined
}.
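How the fairness budget above plays out (derived from init_counter/2 and get_shift_opt/1, with the default shift_multiplier of 10): once out/1 selects a priority, that priority is drained for its budget of messages, then shift/1 rotates to the next band.

    %% budget(Prio) = (Prio + Base) * Multiplier, with Base lifting negative
    %% priorities so no budget is ever negative:
    %%   priority 2,  base 0 -> (2 + 0) * 10 = 20 messages, then shift
    %%   priority -1, base 1 -> (-1 + 1) * 10 = 0, i.e. yields immediately
    %%   ?HIGHEST_PRIORITY (infinity) is approximated as 1000000, so it
    %%   practically never yields to lower-priority messages.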

View File

@ -55,6 +55,7 @@
, filter/2
, fold/3
, highest/1
, shift/1
]).
-export_type([q/0]).
@ -170,6 +171,14 @@ out({pqueue, [{P, Q} | Queues]}) ->
end,
{R, NewQ}.
-spec(shift(pqueue()) -> pqueue()).
shift(Q = {queue, _, _, _}) ->
Q;
shift({pqueue, []}) ->
{pqueue, []}; %% Shouldn't happen?
shift({pqueue, [Hd|Rest]}) ->
{pqueue, Rest ++ [Hd]}. %% Let's hope there are not many priorities.
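%% E.g. (priorities are stored negated internally, highest first):
%%   shift({pqueue, [{-3, Q3}, {-1, Q1}, {0, Q0}]})
%%     -> {pqueue, [{-1, Q1}, {0, Q0}, {-3, Q3}]}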
-spec(out_p(pqueue()) -> {empty | {value, any(), priority()}, pqueue()}).
out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0);
out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)).
@ -266,4 +275,3 @@ r2f([X,Y|R], L) -> {queue, [X,Y], lists:reverse(R, []), L}.
maybe_negate_priority(infinity) -> infinity;
maybe_negate_priority(P) -> -P.

View File

@ -118,7 +118,8 @@ do_add_route(Topic, Dest) when is_binary(Topic) ->
false ->
ok = emqx_router_helper:monitor(Dest),
case emqx_topic:wildcard(Topic) of
- true -> trans(fun insert_trie_route/1, [Route]);
+ true ->
+ maybe_trans(fun insert_trie_route/1, [Route]);
false -> insert_direct_route(Route)
end
end.
@ -164,7 +165,8 @@ do_delete_route(Topic) when is_binary(Topic) ->
do_delete_route(Topic, Dest) ->
Route = #route{topic = Topic, dest = Dest},
case emqx_topic:wildcard(Topic) of
- true -> trans(fun delete_trie_route/1, [Route]);
+ true ->
+ maybe_trans(fun delete_trie_route/1, [Route]);
false -> delete_direct_route(Route)
end.
@ -247,10 +249,59 @@ delete_trie_route(Route = #route{topic = Topic}) ->
end.
%% @private
- -spec(trans(function(), list(any())) -> ok | {error, term()}).
- trans(Fun, Args) ->
- case mnesia:transaction(Fun, Args) of
- {atomic, Ok} -> Ok;
- {aborted, Reason} -> {error, Reason}
+ -spec(maybe_trans(function(), list(any())) -> ok | {error, term()}).
+ maybe_trans(Fun, Args) ->
+ case persistent_term:get(emqx_route_lock_type, key) of
+ key ->
+ trans(Fun, Args);
+ global ->
+ lock_router(),
+ try mnesia:sync_dirty(Fun, Args)
+ after
+ unlock_router()
+ end;
+ tab ->
+ trans(fun() ->
+ emqx_trie:lock_tables(),
+ apply(Fun, Args)
+ end, [])
end.
+ -spec(trans(function(), list(any())) -> ok | {error, term()}).
+ trans(Fun, Args) ->
+ %% Run the transaction in a short-lived worker process so the receive
+ %% below matches only on a fresh reference; this triggers the compiler's
+ %% selective-receive optimization and keeps bursty traffic from scanning
+ %% a long caller mailbox.
+ Ref = erlang:make_ref(),
+ Owner = self(),
+ {WPid, RefMon} = spawn_monitor(
+ fun() ->
+ Res = case mnesia:transaction(Fun, Args) of
+ {atomic, Ok} -> Ok;
+ {aborted, Reason} -> {error, Reason}
+ end,
+ Owner ! {Ref, Res}
+ end),
+ receive
+ {Ref, TransRes} ->
+ receive
+ {'DOWN', RefMon, process, WPid, normal} -> ok
+ end,
+ TransRes;
+ {'DOWN', RefMon, process, WPid, Info} ->
+ {error, {trans_crash, Info}}
+ end.
lock_router() ->
%% With a non-zero Retries argument, global:set_lock/3 may sleep a random
%% time of up to 8s between attempts. Since a cluster has a limited number
%% of brokers, it is safe to retry ourselves with a fixed 1 ms sleep.
case global:set_lock({?MODULE, self()}, [node() | nodes()], 0) of
false ->
%% Force to sleep 1ms instead.
timer:sleep(1),
lock_router();
true ->
ok
end.
unlock_router() ->
global:del_lock({?MODULE, self()}).

View File

@ -34,6 +34,10 @@ init([]) ->
type => worker,
modules => [emqx_router_helper]},
ok = persistent_term:put(emqx_route_lock_type,
application:get_env(emqx, route_lock_type, key)
),
%% Router pool
RouterPool = emqx_pool_sup:spec([router_pool, hash,
{emqx_router, start_link, []}]),
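The lock type read back in maybe_trans/2 is fixed at boot from the emqx application environment; a sys.config fragment selecting the global-lock strategy could look like this (a sketch; `key` remains the default):

    [{emqx, [{route_lock_type, global}]}].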

View File

@ -75,6 +75,7 @@
-export([ deliver/2
, enqueue/2
, dequeue/1
, retry/1
, terminate/3
]).

View File

@ -31,7 +31,9 @@
, delete/1
]).
- -export([empty/0]).
+ -export([ empty/0
+ , lock_tables/0
+ ]).
-ifdef(TEST).
-compile(export_all).
@ -120,6 +122,11 @@ delete(Topic) when is_binary(Topic) ->
empty() ->
ets:info(?TRIE_TAB, size) == 0.
-spec lock_tables() -> ok.
lock_tables() ->
mnesia:write_lock_table(?TRIE_TAB),
mnesia:write_lock_table(?TRIE_NODE_TAB).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -64,6 +64,7 @@
sl_alloc,
ll_alloc,
fix_alloc,
literal_alloc,
std_alloc
]).
@ -484,4 +485,3 @@ compat_windows(Fun) ->
_Error -> 0
end
end.

View File

@ -34,7 +34,7 @@
, stats/1
]).
- -export([call/2]).
+ -export([call/2, call/3]).
%% WebSocket callbacks
-export([ init/2
@ -63,7 +63,7 @@
%% Simulate the active_n opt
active_n :: pos_integer(),
%% MQTT Piggyback
mqtt_piggyback :: single | multiple,
%% Limiter
limiter :: maybe(emqx_limiter:limiter()),
%% Limit Timer
@ -151,7 +151,10 @@ stats(#state{channel = Channel}) ->
%% kick|discard|takeover
-spec(call(pid(), Req :: term()) -> Reply :: term()).
- call(WsPid, Req) when is_pid(WsPid) ->
+ call(WsPid, Req) ->
+ call(WsPid, Req, 5000).
+ call(WsPid, Req, Timeout) when is_pid(WsPid) ->
Mref = erlang:monitor(process, WsPid),
WsPid ! {call, {self(), Mref}, Req},
receive
@ -160,7 +163,7 @@ call(WsPid, Req) when is_pid(WsPid) ->
Reply;
{'DOWN', Mref, _, _, Reason} ->
exit(Reason)
- after 5000 ->
+ after Timeout ->
erlang:demonitor(Mref, [flush]),
exit(timeout)
end.
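%% Hypothetical call site: bound a kick to 10 s instead of the 5000 ms
%% default that the call/2 form still provides:
%%   Reply = emqx_ws_connection:call(WsPid, kick, 10000)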
@ -196,15 +199,21 @@ init(Req, Opts) ->
end.
websocket_init([Req, Opts]) ->
- Peername = case proplists:get_bool(proxy_protocol, Opts)
- andalso maps:get(proxy_header, Req) of
- #{src_address := SrcAddr, src_port := SrcPort} ->
- {SrcAddr, SrcPort};
- _ ->
- cowboy_req:peer(Req)
- end,
+ {Peername, Peercert} =
+ case proplists:get_bool(proxy_protocol, Opts)
+ andalso maps:get(proxy_header, Req) of
+ #{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} ->
+ ProxyName = {SrcAddr, SrcPort},
+ %% Notice: Only CN is available in Proxy Protocol V2 additional info
+ ProxySSL = case maps:get(cn, SSL, undefined) of
+ undefined -> nossl;
+ CN -> [{pp2_ssl_cn, CN}]
+ end,
+ {ProxyName, ProxySSL};
+ _ ->
+ {cowboy_req:peer(Req), cowboy_req:cert(Req)}
+ end,
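%% For reference, a PPv2 header with TLS info as delivered by cowboy
%% roughly looks like this (shape per ranch_proxy_header; values
%% illustrative, unrelated keys omitted):
%%   #{src_address => {10,0,0,1}, src_port => 56324,
%%     ssl => #{cn => <<"client-cn">>, verified => true}}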
Sockname = cowboy_req:sock(Req),
- Peercert = cowboy_req:cert(Req),
WsCookie = try cowboy_req:parse_cookies(Req)
catch
error:badarg ->
@ -477,6 +486,12 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) ->
NState = State#state{parse_state = NParseState},
parse_incoming(Rest, postpone({incoming, Packet}, NState))
catch
error:proxy_protocol_config_disabled ->
?LOG(error,
"~nMalformed packet, "
"please check proxy_protocol config for specific listeners and zones~n"),
FrameError = {frame_error, proxy_protocol_config_disabled},
postpone({incoming, FrameError} ,State);
error:Reason:Stk ->
?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data: ~0p",
[Reason, Stk, Data]),
@ -535,7 +550,7 @@ handle_outgoing(Packets, State = #state{active_n = ActiveN, mqtt_piggyback = MQT
postpone({check_gc, Stats}, State);
false -> State
end,
{case MQTTPiggyback of
single -> [{binary, IoData}];
multiple -> lists:map(fun(Bin) -> {binary, Bin} end, IoData)
@ -680,4 +695,3 @@ trigger(Event) -> erlang:send(self(), Event).
set_field(Name, Value, State) ->
Pos = emqx_misc:index_of(Name, record_info(fields, state)),
setelement(Pos+1, State, Value).

View File

@ -82,7 +82,7 @@ t_chan_caps(_) ->
#{max_clientid_len := 65535,
max_qos_allowed := 2,
max_topic_alias := 65535,
- max_topic_levels := 0,
+ max_topic_levels := 128,
retain_available := true,
shared_subscription := true,
subscription_identifiers := true,
@ -768,4 +768,3 @@ session(InitFields) when is_map(InitFields) ->
quota() ->
emqx_limiter:init(zone, [{conn_messages_routing, {5, 1}},
{overall_messages_routing, {10, 1}}]).

View File

@ -31,6 +31,12 @@
conn_mod => emqx_connection,
receive_maximum => 100}}).
-define(WAIT(PATTERN, TIMEOUT, RET),
fun() ->
receive PATTERN -> RET
after TIMEOUT -> error({timeout, ?LINE}) end
end()).
%%--------------------------------------------------------------------
%% CT callbacks
%%--------------------------------------------------------------------
@ -77,7 +83,7 @@ t_get_set_chan_stats(_) ->
t_open_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]),
- ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end),
+ ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end),
ClientInfo = #{zone => external,
clientid => <<"clientid">>,
@ -151,19 +157,98 @@ t_open_session_race_condition(_) ->
exit(Pid, kill), timer:sleep(100),
?assertEqual([], emqx_cm:lookup_channels(<<"clientid">>)).
t_discard_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = emqx_cm:unregister_channel(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = meck:expect(emqx_connection, call, fun(_, _) -> error(testing) end),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = emqx_cm:unregister_channel(<<"clientid">>),
ok = meck:unload(emqx_connection).
t_kick_session_discard_normal(_) ->
test_kick_session(discard, normal).
t_kick_session_discard_shutdown(_) ->
test_kick_session(discard, shutdown).
t_kick_session_discard_shutdown_with_reason(_) ->
test_kick_session(discard, {shutdown, discard}).
t_kick_session_discard_timeout(_) ->
test_kick_session(discard, timeout).
t_kick_session_discard_noproc(_) ->
test_kick_session(discard, noproc).
t_kick_session_kick_normal(_) ->
test_kick_session(kick, normal).
t_kick_session_kick_shutdown(_) ->
test_kick_session(kick, shutdown).
t_kick_session_kick_shutdown_with_reason(_) ->
test_kick_session(kick, {shutdown, kick}).
t_kick_session_kick_timeout(_) ->
test_kick_session(kick, timeout).
t_kick_session_kick_noproc(_) ->
test_kick_session(kick, noproc).
test_kick_session(Action, Reason) ->
ClientId = rand_client_id(),
#{conninfo := ConnInfo} = ?ChanInfo,
FakeSessionFun =
fun Loop() ->
receive
{'$gen_call', From, A} when A =:= kick orelse
A =:= discard ->
case Reason of
normal ->
gen_server:reply(From, ok);
timeout ->
%% no response to the call
Loop();
_ ->
exit(Reason)
end;
Msg ->
ct:pal("(~p) fake_session_discarded ~p", [Action, Msg]),
Loop()
end
end,
{Pid1, _} = spawn_monitor(FakeSessionFun),
{Pid2, _} = spawn_monitor(FakeSessionFun),
ok = emqx_cm:register_channel_(ClientId, Pid1, ConnInfo),
ok = emqx_cm:register_channel_(ClientId, Pid1, ConnInfo),
ok = emqx_cm:register_channel_(ClientId, Pid2, ConnInfo),
?assertEqual([Pid1, Pid2], lists:sort(emqx_cm:lookup_channels(ClientId))),
case Reason of
noproc -> exit(Pid1, kill), exit(Pid2, kill);
_ -> ok
end,
ok = case Action of
kick -> emqx_cm:kick_session(ClientId);
discard -> emqx_cm:discard_session(ClientId)
end,
case Reason =:= timeout orelse Reason =:= noproc of
true ->
?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid1, R}, 2000, R)),
?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid2, R}, 2000, R));
false ->
?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid1, R}, 2000, R)),
?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid2, R}, 2000, R))
end,
ok = flush_emqx_pool(),
?assertEqual([], emqx_cm:lookup_channels(ClientId)).
rand_client_id() ->
list_to_binary("client-id-" ++ integer_to_list(erlang:system_time())).
%% Channel deregistration is delegated to emqx_pool as async tasks.
%% emqx_pool is a pool of workers, and there is no way to know
%% which worker picked up the last deregistration task.
%% This helper floods the pool with enough async tasks to synchronize
%% with all of its workers: the task count must be large enough that
%% every worker handles at least one of the tasks.
flush_emqx_pool() ->
Self = self(),
L = lists:seq(1, 1000),
lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, I} end, []) end, L),
lists:foreach(fun(I) -> receive {done, I} -> ok end end, L).
t_takeover_session(_) ->
{error, not_found} = emqx_cm:takeover_session(<<"clientid">>),
@ -178,21 +263,6 @@ t_takeover_session(_) ->
{ok, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>),
emqx_cm:unregister_channel(<<"clientid">>).
- t_kick_session(_) ->
- ok = meck:new(emqx_connection, [passthrough, no_history]),
- ok = meck:expect(emqx_connection, call, fun(_, _) -> test end),
- {error, not_found} = emqx_cm:kick_session(<<"clientid">>),
- ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
- test = emqx_cm:kick_session(<<"clientid">>),
- erlang:spawn(fun() ->
- ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
- timer:sleep(1000)
- end),
- ct:sleep(100),
- test = emqx_cm:kick_session(<<"clientid">>),
- ok = emqx_cm:unregister_channel(<<"clientid">>),
- ok = meck:unload(emqx_connection).
t_all_channels(_) ->
?assertEqual(true, is_list(emqx_cm:all_channels())).

View File

@ -42,7 +42,8 @@ all() ->
groups() ->
[{parse, [parallel],
[t_parse_cont,
- t_parse_frame_too_large
+ t_parse_frame_too_large,
+ t_parse_frame_malformed_variable_byte_integer
]},
{connect, [parallel],
[t_serialize_parse_v3_connect,
@ -129,6 +130,12 @@ t_parse_frame_too_large(_) ->
?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 512})),
?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})).
t_parse_frame_malformed_variable_byte_integer(_) ->
MalformedPayload = << <<16#80>> || _ <- lists:seq(1, 4) >>,
ParseState = emqx_frame:initial_parse_state(#{}),
?catch_error(malformed_variable_byte_integer,
emqx_frame:parse(MalformedPayload, ParseState)).
t_serialize_parse_v3_connect(_) ->
Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,
113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108,

View File

@ -270,7 +270,7 @@ t_connect_limit_timeout(_) ->
meck:unload(proplists).
t_connect_emit_stats_timeout(_) ->
- IdleTimeout = 2000,
+ IdleTimeout = 2000 + 200,
emqx_zone:set_env(external, idle_timeout, IdleTimeout),
{ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60}]),
@ -278,7 +278,7 @@ t_connect_emit_stats_timeout(_) ->
[ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)),
?assert(is_reference(emqx_connection:info(stats_timer, sys:get_state(ClientPid)))),
- timer:sleep(IdleTimeout),
+ timer:sleep(IdleTimeout+500),
?assertEqual(undefined, emqx_connection:info(stats_timer, sys:get_state(ClientPid))),
ok = emqtt:disconnect(Client).