Compare commits

...

37 Commits

Author SHA1 Message Date
JianBo He 9b976f6b36 chore(appup): update appup.src 2021-10-31 09:59:01 +08:00
JianBo He ab6d2e7e27 chore(cm): remove needless logs 2021-10-31 09:56:18 +08:00
Zaiming (Stone) Shi 26b426b7ad
Merge pull request #5816 from emqx/dev/e4.2.8
Auto-pull-request-on-2021-09-27
2021-09-27 11:50:28 +02:00
JianBo He 094c8fc48b
Merge branch 'stable/e4.2.7' into dev/e4.2.8 2021-09-27 17:09:34 +08:00
Turtle e86e1e0430 fix(ekka): kill the process if it doesn't release the lock 2021-09-27 09:57:23 +08:00
JianBo He 34375c6cc6 chore(appup): update appup.src 2021-09-26 20:10:23 +08:00
JianBo He b9527cfe9d test: increase waiting time to avoid test failure 2021-09-26 20:10:23 +08:00
JianBo He 5f2adb42ed chore: export keepalive funcs for mqtt-sn 2021-09-26 20:10:23 +08:00
JianBo He 7b177a7929 fix: safer session call 2021-09-26 20:10:23 +08:00
JianBo He aa5d274464 feat: acl.conf support ipaddrs 2021-09-26 20:10:23 +08:00
JianBo He fb65d1c581 chore: add mqtt-sn protocol define 2021-09-26 20:10:23 +08:00
Turtle c0ca7f8bea fix(force_shutdown): cannot suicide if the process hangs up 2021-09-26 18:37:11 +08:00
Zaiming (Stone) Shi 4c30c50490
Merge pull request #5799 from emqx/merge-e4.3.4
feat(mqueue): Interleave messages with different priorities
2021-09-24 11:52:00 +02:00
Turtle 903a9e57a8 feat(mqueue): Interleave messages with different priorities 2021-09-24 12:51:28 +08:00
Turtle 830150b309 chore: update ekka tag 2021-06-17 11:30:43 +08:00
Turtle e10e241fe9 chore: export function 2021-06-08 10:54:19 +08:00
Zaiming Shi eeb480c051 chore(deps): pin ekka 0.7.7 2021-06-08 10:54:19 +08:00
William Yang ca9b90be9a perf(broker): speedup trans when broker has a big mqueue 2021-04-29 17:10:10 +08:00
William Yang e02704e05a perf(broker): Optimization for handling bursty traffic
Introduce a new lock type, 'spawn', for broker.perf.route_lock_type.

mnesia's get-lock calls are not optimized for selective receive,

so taking locks becomes very expensive when there are tons of
messages in the broker's message queue.

This optimization runs the transaction in a separate process to utilize
the compiler's selective-receive optimization (a sketch of this pattern
follows the commit list below).
2021-04-29 17:10:10 +08:00
William Yang 5cc21dc3f0 fix(upgrade): duplicated modules loading. 2021-04-28 17:01:54 +02:00
William Yang 806cd33d1f chore(ct): fix a testcase timing. 2021-04-28 17:01:54 +02:00
William Yang f8ca066d74 upgrade: safe upgrade of emqx_connection 2021-04-28 21:29:20 +08:00
William Yang 59b7e8e3ba perf(lock): upgrade code for new lock types. 2021-04-28 21:29:20 +08:00
William Yang 571219f073 perf: new perf toggle broker.perf.route_lock_type 2021-04-28 21:29:20 +08:00
William Yang 9084554f4f fix: broker call should not time out before the client times out
So change the broker call timeout to infinity.
2021-04-28 21:29:20 +08:00
William Yang 1b179c8e2a perf(router): add route runs in async dirty context 2021-04-28 21:29:20 +08:00
William Yang fb8f5b79ad perf(trie): use global lock
Use a global lock to reduce remote lock overhead,

so that EMQX route transactions can run in a dirty *sync* context.

At least a 10X subscribe/unsubscribe improvement.
2021-04-28 21:29:20 +08:00
JianBo He e427b0ff75 chore(appup): supply appup instructions for ws-connection 2021-04-28 21:29:20 +08:00
Zaiming Shi b4e3c32e24 chore(conf): log only to file by default 2021-04-28 21:29:20 +08:00
Zaiming Shi c511e324bb fix(ekka): allow remote_console 2021-04-28 21:29:20 +08:00
zhanghongtong 1eb0f7b3b2 fix(ws connection): fix peer_cert_as_username error when ws connect 2021-04-28 21:29:20 +08:00
Shawn 7d003e0bfc Porting code for congestion alarms from 4.3.0 to e4.2.6 (#4523)
* fix(congestion): port some code from 4.3.0
* chore(emqx): update the appup file
2021-04-28 21:29:20 +08:00
Shawn 927264d793 fix(emqx): export do_deliver/2 for emqx_sn 2021-04-28 21:29:20 +08:00
Shawn 70da59c3bb chore(emqx): update appup 2021-04-28 21:29:20 +08:00
Shawn c69e1c6222 fix(mqtt-sn): sleep mode not working #4434 2021-04-28 21:29:20 +08:00
Shawn 6666210211 fix(congestion): alarm and clear congestion too frequently 2021-04-28 21:29:20 +08:00
Shawn 238eaa8e40 fix(emqx): validate mqtt malformed variable byte integer 2021-03-10 19:42:44 +08:00
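The "Optimization for handling bursty traffic" commit above hands each route update to a short-lived worker process and waits on a freshly created reference, so the compiler's selective-receive optimization lets the waiting broker skip the (possibly huge) backlog already in its mailbox. A minimal sketch of that pattern, mirroring the trans/2 helper added in the emqx_router diff further down (the function name here is illustrative):

trans_in_worker(Fun, Args) ->
    Ref = erlang:make_ref(),
    Owner = self(),
    %% Run the mnesia transaction in a separate, short-lived process.
    {WPid, MRef} = spawn_monitor(
        fun() ->
            Res = case mnesia:transaction(Fun, Args) of
                      {atomic, Ok}      -> Ok;
                      {aborted, Reason} -> {error, Reason}
                  end,
            Owner ! {Ref, Res}
        end),
    %% Wait only for {Ref, _} or the worker's 'DOWN' message; matching on a
    %% reference created just before the receive enables the selective-receive
    %% optimization, so the broker's long mailbox is not scanned.
    receive
        {Ref, TransRes} ->
            erlang:demonitor(MRef, [flush]),
            TransRes;
        {'DOWN', MRef, process, WPid, Info} ->
            {error, {trans_crash, Info}}
    end.

The companion "perf(trie): use global lock" commit takes the other path shown in that same diff: wrap mnesia:sync_dirty/2 in a cluster-wide global:set_lock/3 with zero retries, sleeping 1 ms between attempts, so route updates avoid remote mnesia lock overhead entirely.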
28 changed files with 611 additions and 139 deletions

View File

@ -296,6 +296,16 @@ broker.shared_dispatch_ack_enabled = false
## Value: Flag ## Value: Flag
broker.route_batch_clean = off broker.route_batch_clean = off
## Performance toggle for wildcard topic subscribe/unsubscribe.
## Change this toggle only when there are many wildcard topics.
## Value: Enum
## - key: mnesia transactional updates with per-key locks. Recommended for a single-node setup.
## - tab: mnesia transactional updates with a table lock. Recommended for a multi-node setup.
## - global: global-lock-protected updates. Recommended for larger clusters.
## NOTE: when changing from/to the 'global' lock type, all nodes in the cluster
## must be stopped before the change.
broker.perf.route_lock_type = key
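As a usage sketch (same emqx.conf syntax as above; the choices below are purely illustrative, not recommendations for any particular deployment):
## a multi-node cluster with many wildcard topics:
## broker.perf.route_lock_type = tab
## a large cluster, after stopping every node first (see the NOTE above):
## broker.perf.route_lock_type = global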
##------------------------------------------------------------------- ##-------------------------------------------------------------------
## Plugins ## Plugins
##------------------------------------------------------------------- ##-------------------------------------------------------------------

View File

@ -544,6 +544,16 @@ listener.ws.external.verify_protocol_header = on
## Value: Duration ## Value: Duration
## listener.ws.external.proxy_protocol_timeout = 3s ## listener.ws.external.proxy_protocol_timeout = 3s
## See: listener.ssl.$name.peer_cert_as_username
##
## Value: cn
## listener.ws.external.peer_cert_as_username = cn
## See: listener.ssl.$name.peer_cert_as_clientid
##
## Value: cn
## listener.ws.external.peer_cert_as_clientid = cn
## The TCP backlog of external MQTT/WebSocket Listener. ## The TCP backlog of external MQTT/WebSocket Listener.
## ##
## See: listener.ws.$name.backlog ## See: listener.ws.$name.backlog

View File

@ -10,7 +10,7 @@
## - file: write logs only to file ## - file: write logs only to file
## - console: write logs only to standard I/O ## - console: write logs only to standard I/O
## - both: write logs both to file and standard I/O ## - both: write logs both to file and standard I/O
log.to = both log.to = file
## The log severity level. ## The log severity level.
## ##

View File

@ -184,6 +184,30 @@ zone.external.enable_flapping_detect = off
## Example: 100KB incoming per 10 seconds. ## Example: 100KB incoming per 10 seconds.
#zone.external.rate_limit.conn_bytes_in = 100KB,10s #zone.external.rate_limit.conn_bytes_in = 100KB,10s
## Whether to raise an alarm for congested connections.
##
## Sometimes an MQTT connection (usually an MQTT subscriber) may become "congested" because
## there are too many packets to be sent. The socket tries to buffer the packets until the buffer
## is full. If more packets arrive after that, the packets are queued as "pending"
## and we consider the connection "congested".
##
## Enable this to send an alarm whenever there are bytes pending in the queue. You could set
## `listener.tcp.<ZoneName>.sndbuf` to a larger value if the alarm is triggered too often.
##
## The alarm name has the format "conn_congestion/<ClientID>/<Username>",
## where <ClientID> is the client ID of the congested MQTT connection,
## and <Username> is the username, or "unknown_user" if the client did not provide one.
## Default: off
#zone.external.conn_congestion.alarm = off
## Minimum time before a congestion alarm may be cleared.
## The alarm is cleared only when there are no bytes pending in the queue and at least
## `min_alarm_sustain_duration` has passed since we last considered the connection "congested".
##
## This avoids clearing and raising the alarm again too frequently.
## Default: 1m
#zone.external.conn_congestion.min_alarm_sustain_duration = 1m
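For example, a zone whose congested subscribers should raise an alarm, with clears suppressed for five minutes, could use the same zone syntax with illustrative values:
zone.external.conn_congestion.alarm = on
zone.external.conn_congestion.min_alarm_sustain_duration = 5m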
## Messages quota for the each of external MQTT connection. ## Messages quota for the each of external MQTT connection.
## This value consumed by the number of recipient on a message. ## This value consumed by the number of recipient on a message.
## ##

View File

@ -30,11 +30,13 @@
%% MQTT Protocol Version and Names %% MQTT Protocol Version and Names
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-define(MQTT_SN_PROTO_V1, 1).
-define(MQTT_PROTO_V3, 3). -define(MQTT_PROTO_V3, 3).
-define(MQTT_PROTO_V4, 4). -define(MQTT_PROTO_V4, 4).
-define(MQTT_PROTO_V5, 5). -define(MQTT_PROTO_V5, 5).
-define(PROTOCOL_NAMES, [ -define(PROTOCOL_NAMES, [
{?MQTT_SN_PROTO_V1, <<"MQTT-SN">>}, %% XXX:Compatible with emqx-sn plug-in
{?MQTT_PROTO_V3, <<"MQIsdp">>}, {?MQTT_PROTO_V3, <<"MQIsdp">>},
{?MQTT_PROTO_V4, <<"MQTT">>}, {?MQTT_PROTO_V4, <<"MQTT">>},
{?MQTT_PROTO_V5, <<"MQTT">>}]). {?MQTT_PROTO_V5, <<"MQTT">>}]).

View File

@ -999,6 +999,16 @@ end}.
{datatype, string} {datatype, string}
]}. ]}.
{mapping, "zone.$name.conn_congestion.alarm", "emqx.zones", [
{datatype, flag},
{default, off}
]}.
{mapping, "zone.$name.conn_congestion.min_alarm_sustain_duration", "emqx.zones", [
{default, "1m"},
{datatype, {duration, ms}}
]}.
{mapping, "zone.$name.quota.conn_messages_routing", "emqx.zones", [ {mapping, "zone.$name.quota.conn_messages_routing", "emqx.zones", [
{datatype, string} {datatype, string}
]}. ]}.
@ -1128,6 +1138,10 @@ end}.
{ratelimit, {conn_messages_in, Ratelimit(Val)}}; {ratelimit, {conn_messages_in, Ratelimit(Val)}};
(["rate_limit", "conn_bytes_in"], Val) -> (["rate_limit", "conn_bytes_in"], Val) ->
{ratelimit, {conn_bytes_in, Ratelimit(Val)}}; {ratelimit, {conn_bytes_in, Ratelimit(Val)}};
(["conn_congestion", "alarm"], Val) ->
{conn_congestion_alarm_enabled, Val};
(["conn_congestion", "min_alarm_sustain_duration"], Val) ->
{conn_congestion_min_alarm_sustain_duration, Val};
(["quota", "conn_messages_routing"], Val) -> (["quota", "conn_messages_routing"], Val) ->
{quota, {conn_messages_routing, Ratelimit(Val)}}; {quota, {conn_messages_routing, Ratelimit(Val)}};
(["quota", "overall_messages_routing"], Val) -> (["quota", "overall_messages_routing"], Val) ->
@ -1573,7 +1587,11 @@ end}.
]}. ]}.
{mapping, "listener.ws.$name.peer_cert_as_username", "emqx.listeners", [ {mapping, "listener.ws.$name.peer_cert_as_username", "emqx.listeners", [
{datatype, {enum, [cn, dn, crt]}} {datatype, {enum, [cn]}}
]}.
{mapping, "listener.ws.$name.peer_cert_as_clientid", "emqx.listeners", [
{datatype, {enum, [cn]}}
]}. ]}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -2008,6 +2026,18 @@ end}.
{datatype, flag} {datatype, flag}
]}. ]}.
%% @doc Performance toggle for wildcard topic subscribe/unsubscribe.
%% Change this toggle only when there are many wildcard topics.
%% key: mnesia transactional updates with per-key locks. Recommended for a single-node setup.
%% tab: mnesia transactional updates with a table lock. Recommended for a multi-node setup.
%% global: global-lock-protected updates. Recommended for larger clusters.
%% NOTE: when changing from/to the 'global' lock type, all nodes in the cluster
%% must be stopped before the change.
{mapping, "broker.perf.route_lock_type", "emqx.route_lock_type", [
{default, key},
{datatype, {enum, [key, tab, global]}}
]}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% System Monitor %% System Monitor
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -7,7 +7,7 @@
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.5"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.5"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.5"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.9"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}},
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
]}. ]}.

View File

@ -6,6 +6,7 @@
{add_module, emqx_congestion}, {add_module, emqx_congestion},
{load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_metrics, brutal_purge, soft_purge, []}, {load_module, emqx_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []},
{suspend, [esockd_acceptor,emqx_connection, emqx_ws_connection]}, {suspend, [esockd_acceptor,emqx_connection, emqx_ws_connection]},
@ -14,12 +15,23 @@
{update, emqx_ws_connection, {advanced, []}}, {update, emqx_ws_connection, {advanced, []}},
{load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []},
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, {load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
{resume, [esockd_acceptor,emqx_connection, emqx_ws_connection]} {resume, [esockd_acceptor,emqx_connection, emqx_ws_connection]},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]}, ]},
{"4.2.1", [ {"4.2.1", [
{add_module, emqx_congestion}, {add_module, emqx_congestion},
{load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []},
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]}, {suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []},
@ -27,15 +39,90 @@
{update, emqx_ws_connection, {advanced, []}}, {update, emqx_ws_connection, {advanced, []}},
{load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []},
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, {load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]} {resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]}, ]},
{<<"4.2.[23]">>, [ {<<"4.2.[23]">>, [
{add_module, emqx_congestion}, {add_module, emqx_congestion},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_session, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []},
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []} {load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]},
{<<"4.2.4">>, [
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]},
{<<"4.2.5">>, [
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]},
{<<"4.2.[6-7]">>, [
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]},
{<<"4.2.8">>, [
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []}
]}, ]},
{<<".*">>, []} {<<".*">>, []}
], ],
@ -51,8 +138,19 @@
{update, emqx_connection, {advanced, []}}, {update, emqx_connection, {advanced, []}},
{update, emqx_ws_connection, {advanced, []}}, {update, emqx_ws_connection, {advanced, []}},
{load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]}, {resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{delete_module, emqx_congestion} {delete_module, emqx_congestion},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]}, ]},
{"4.2.1", [ {"4.2.1", [
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, {load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
@ -60,20 +158,96 @@
{load_module, emqx_limiter, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []},
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]}, {suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}}, {update, emqx_connection, {advanced, []}},
{update, emqx_ws_connection, {advanced, []}}, {update, emqx_ws_connection, {advanced, []}},
{load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []},
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]}, {resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{delete_module, emqx_congestion} {delete_module, emqx_congestion},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]}, ]},
{<<"4.2.[23]">>, [ {<<"4.2.[23]">>, [
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, {load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
{load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_os_mon, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []}, {update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []},
{delete_module, emqx_congestion} {load_module, emqx_frame, brutal_purge, soft_purge, []},
{delete_module, emqx_congestion},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]},
{<<"4.2.4">>, [
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]},
{<<"4.2.5">>, [
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]},
{<<"4.2.[6-7]">>, [
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_cm, brutal_purge, soft_purge, []},
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
]},
{<<"4.2.8">>, [
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []}
]}, ]},
{<<".*">>, []} {<<".*">>, []}
] ]

View File

@ -28,7 +28,8 @@
-type(who() :: all | binary() | -type(who() :: all | binary() |
{client, binary()} | {client, binary()} |
{user, binary()} | {user, binary()} |
{ipaddr, esockd_cidr:cidr_string()}). {ipaddr, esockd_cidr:cidr_string()} |
{ipaddrs, list(esockd_cidr:cidr_string())}).
-type(access() :: subscribe | publish | pubsub). -type(access() :: subscribe | publish | pubsub).
@ -52,6 +53,8 @@ compile(who, all) ->
all; all;
compile(who, {ipaddr, CIDR}) -> compile(who, {ipaddr, CIDR}) ->
{ipaddr, esockd_cidr:parse(CIDR, true)}; {ipaddr, esockd_cidr:parse(CIDR, true)};
compile(who, {ipaddrs, CIDRs}) ->
{ipaddrs, lists:map(fun(CIDR) -> esockd_cidr:parse(CIDR, true) end, CIDRs)};
compile(who, {client, all}) -> compile(who, {client, all}) ->
{client, all}; {client, all};
compile(who, {client, ClientId}) -> compile(who, {client, ClientId}) ->
@ -108,8 +111,14 @@ match_who(#{username := Username}, {user, Username}) ->
true; true;
match_who(#{peerhost := undefined}, {ipaddr, _Tup}) -> match_who(#{peerhost := undefined}, {ipaddr, _Tup}) ->
false; false;
match_who(#{peerhost := undefined}, {ipaddrs, _}) ->
false;
match_who(#{peerhost := IP}, {ipaddr, CIDR}) -> match_who(#{peerhost := IP}, {ipaddr, CIDR}) ->
esockd_cidr:match(IP, CIDR); esockd_cidr:match(IP, CIDR);
match_who(#{peerhost := IP}, {ipaddrs, CIDRs}) ->
lists:any(fun(CIDR) ->
esockd_cidr:match(IP, CIDR)
end, CIDRs);
match_who(ClientInfo, {'and', Conds}) when is_list(Conds) -> match_who(ClientInfo, {'and', Conds}) when is_list(Conds) ->
lists:foldl(fun(Who, Allow) -> lists:foldl(fun(Who, Allow) ->
match_who(ClientInfo, Who) andalso Allow match_who(ClientInfo, Who) andalso Allow
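On the acl.conf side of this change (commit "feat: acl.conf support ipaddrs" above), a rule can now allow or deny a whole set of CIDR ranges at once. A hypothetical entry, assuming the usual acl.conf rule-tuple format:

{allow, {ipaddrs, ["10.0.0.0/8", "192.168.1.0/24"]}, subscribe, ["sensors/#"]}.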

View File

@ -361,7 +361,7 @@ normalize_message(partition, #{occurred := Node}) ->
list_to_binary(io_lib:format("Partition occurs at node ~s", [Node])); list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) -> normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID])); list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID]));
normalize_message(<<"mqtt_conn/congested/", Info/binary>>, _) -> normalize_message(<<"conn_congestion/", Info/binary>>, _) ->
list_to_binary(io_lib:format("MQTT connection congested: ~s", [Info])); list_to_binary(io_lib:format("connection congested: ~s", [Info]));
normalize_message(_Name, _UnknownDetails) -> normalize_message(_Name, _UnknownDetails) ->
<<"Unknown alarm">>. <<"Unknown alarm">>.

View File

@ -61,7 +61,7 @@ handle_event({set_alarm, {system_memory_high_watermark, []}}, State) ->
{ok, State}; {ok, State};
handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) -> handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) ->
emqx_alarm:activate(high_process_memory_usage, #{pid => Pid, emqx_alarm:activate(high_process_memory_usage, #{pid => list_to_binary(pid_to_list(Pid)),
high_watermark => emqx_os_mon:get_procmem_high_watermark()}), high_watermark => emqx_os_mon:get_procmem_high_watermark()}),
{ok, State}; {ok, State};

View File

@ -419,7 +419,7 @@ safe_update_stats(Tab, Stat, MaxStat) ->
-compile({inline, [call/2, cast/2, pick/1]}). -compile({inline, [call/2, cast/2, pick/1]}).
call(Broker, Req) -> call(Broker, Req) ->
gen_server:call(Broker, Req). gen_server:call(Broker, Req, infinity).
cast(Broker, Msg) -> cast(Broker, Msg) ->
gen_server:cast(Broker, Msg). gen_server:cast(Broker, Msg).

View File

@ -32,6 +32,8 @@
-export([ info/1 -export([ info/1
, info/2 , info/2
, set_conn_state/2 , set_conn_state/2
, get_session/1
, set_session/2
, stats/1 , stats/1
, caps/1 , caps/1
]). ]).
@ -46,6 +48,12 @@
, terminate/2 , terminate/2
]). ]).
%% Export for emqx_sn
-export([ do_deliver/2
, ensure_keepalive/2
, clear_keepalive/1
]).
%% Exports for CT %% Exports for CT
-export([set_field/3]). -export([set_field/3]).
@ -167,6 +175,12 @@ info(timers, #channel{timers = Timers}) -> Timers.
set_conn_state(ConnState, Channel) -> set_conn_state(ConnState, Channel) ->
Channel#channel{conn_state = ConnState}. Channel#channel{conn_state = ConnState}.
get_session(#channel{session = Session}) ->
Session.
set_session(Session, Channel) ->
Channel#channel{session = Session}.
%% TODO: Add more stats. %% TODO: Add more stats.
-spec(stats(channel()) -> emqx_types:stats()). -spec(stats(channel()) -> emqx_types:stats()).
stats(#channel{session = Session})-> stats(#channel{session = Session})->
@ -932,7 +946,10 @@ handle_info({sock_closed, Reason}, Channel =
end; end;
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) -> handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) ->
?LOG(error, "Unexpected sock_closed: ~p", [Reason]), %% Since sock_closed messages can be generated multiple times,
%% we can simply ignore errors of this type in the disconnected state.
%% e.g. when the socket send function returns an error, there is already
%% a tcp_closed delivered to the process mailbox
{ok, Channel}; {ok, Channel};
handle_info(clean_acl_cache, Channel) -> handle_info(clean_acl_cache, Channel) ->
@ -1501,6 +1518,15 @@ ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}
Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)), Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)),
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
clear_keepalive(Channel = #channel{timers = Timers}) ->
case maps:get(alive_timer, Timers, undefined) of
undefined ->
Channel;
TRef ->
emqx_misc:cancel_timer(TRef),
Channel#channel{timers = maps:without([alive_timer], Timers)}
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Maybe Resume Session %% Maybe Resume Session

View File

@ -88,6 +88,8 @@
%% Batch drain %% Batch drain
-define(BATCH_SIZE, 100000). -define(BATCH_SIZE, 100000).
-define(T_TAKEOVER, 15000).
%% Server name %% Server name
-define(CM, ?MODULE). -define(CM, ?MODULE).
@ -222,7 +224,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
case takeover_session(ClientId) of case takeover_session(ClientId) of
{ok, ConnMod, ChanPid, Session} -> {ok, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session), ok = emqx_session:resume(ClientInfo, Session),
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}), Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER),
register_channel_(ClientId, Self, ConnInfo), register_channel_(ClientId, Self, ConnInfo),
{ok, #{session => Session, {ok, #{session => Session,
present => true, present => true,
@ -264,7 +266,7 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
undefined -> undefined ->
{error, not_found}; {error, not_found};
ConnMod when is_atom(ConnMod) -> ConnMod when is_atom(ConnMod) ->
Session = ConnMod:call(ChanPid, {takeover, 'begin'}), Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
{ok, ConnMod, ChanPid, Session} {ok, ConnMod, ChanPid, Session}
end; end;
@ -277,24 +279,35 @@ discard_session(ClientId) when is_binary(ClientId) ->
case lookup_channels(ClientId) of case lookup_channels(ClientId) of
[] -> ok; [] -> ok;
ChanPids -> ChanPids ->
lists:foreach( lists:foreach(fun(Pid) -> do_discard_session(ClientId, Pid) end, ChanPids)
fun(ChanPid) -> end.
do_discard_session(ClientId, Pid) ->
try try
discard_session(ClientId, ChanPid) discard_session(ClientId, Pid)
catch catch
_:{noproc,_}:_Stk -> ok; _ : noproc -> % emqx_ws_connection: call
_:{{shutdown,_},_}:_Stk -> ok; ?LOG(debug, "session_already_gone: ~p", [Pid]),
_:Error:_Stk -> ok;
?LOG(error, "Failed to discard ~0p: ~0p", [ChanPid, Error]) _ : {noproc, _} -> % emqx_connection: gen_server:call
end ?LOG(debug, "session_already_gone: ~p", [Pid]),
end, ChanPids) ok;
_ : {'EXIT', {noproc, _}} -> % rpc_call/3
?LOG(debug, "session_already_gone: ~p", [Pid]),
ok;
_ : {{shutdown, _}, _} ->
?LOG(debug, "session_already_shutdown: ~p", [Pid]),
ok;
_ : Error : St ->
?LOG(debug, "failed_to_discard_session: ~p, "
"error: ~p, stacktrace: ~0p", [Pid, Error, St])
end. end.
discard_session(ClientId, ChanPid) when node(ChanPid) == node() -> discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chann_conn_mod(ClientId, ChanPid) of case get_chann_conn_mod(ClientId, ChanPid) of
undefined -> ok; undefined -> ok;
ConnMod when is_atom(ConnMod) -> ConnMod when is_atom(ConnMod) ->
ConnMod:call(ChanPid, discard) ConnMod:call(ChanPid, discard, ?T_TAKEOVER)
end; end;
discard_session(ClientId, ChanPid) -> discard_session(ClientId, ChanPid) ->
@ -317,7 +330,7 @@ kick_session(ClientId) ->
kick_session(ClientId, ChanPid) when node(ChanPid) == node() -> kick_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chan_info(ClientId, ChanPid) of case get_chan_info(ClientId, ChanPid) of
#{conninfo := #{conn_mod := ConnMod}} -> #{conninfo := #{conn_mod := ConnMod}} ->
ConnMod:call(ChanPid, kick); ConnMod:call(ChanPid, kick, ?T_TAKEOVER);
undefined -> undefined ->
{error, not_found} {error, not_found}
end; end;
@ -361,7 +374,7 @@ lookup_channels(local, ClientId) ->
%% @private %% @private
rpc_call(Node, Fun, Args) -> rpc_call(Node, Fun, Args) ->
case rpc:call(Node, ?MODULE, Fun, Args) of case rpc:call(Node, ?MODULE, Fun, Args, 2 * ?T_TAKEOVER) of
{badrpc, Reason} -> error(Reason); {badrpc, Reason} -> error(Reason);
Res -> Res Res -> Res
end. end.

View File

@ -16,17 +16,16 @@
-module(emqx_congestion). -module(emqx_congestion).
-export([ maybe_alarm_port_busy/3 -export([ maybe_alarm_conn_congestion/3
, maybe_alarm_port_busy/4
, maybe_alarm_too_many_publish/5
, maybe_alarm_too_many_publish/6
, cancel_alarms/3 , cancel_alarms/3
]). ]).
-define(ALARM_CONN_CONGEST(Channel, Reason), -define(ALARM_CONN_CONGEST(Channel, Reason),
list_to_binary(io_lib:format("mqtt_conn/congested/~s/~s/~s", [emqx_channel:info(clientid, Channel), list_to_binary(
maps:get(username, emqx_channel:info(clientinfo, Channel), <<"undefined">>), io_lib:format("~s/~s/~s",
Reason]))). [Reason, emqx_channel:info(clientid, Channel),
maps:get(username, emqx_channel:info(clientinfo, Channel),
<<"unknown_user">>)]))).
-define(ALARM_CONN_INFO_KEYS, [ -define(ALARM_CONN_INFO_KEYS, [
socktype, sockname, peername, clientid, username, proto_name, proto_ver, socktype, sockname, peername, clientid, username, proto_name, proto_ver,
@ -36,44 +35,28 @@
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]). -define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
-define(PROC_INFO_KEYS, [message_queue_len, memory, reductions]). -define(PROC_INFO_KEYS, [message_queue_len, memory, reductions]).
-define(ALARM_SENT(REASON), {alarm_sent, REASON}). -define(ALARM_SENT(REASON), {alarm_sent, REASON}).
-define(ALL_ALARM_REASONS, [port_busy, too_many_publish]). -define(ALL_ALARM_REASONS, [conn_congestion]).
-define(CONFIRM_CLEAR(REASON), {alarm_confirm_clear, REASON}). -define(WONT_CLEAR_IN, 60000).
-define(CONFIRM_CLEAR_INTERVAL, 10000).
maybe_alarm_port_busy(Socket, Transport, Channel) -> maybe_alarm_conn_congestion(Socket, Transport, Channel) ->
maybe_alarm_port_busy(Socket, Transport, Channel, false). case is_alarm_enabled(Channel) of
false -> ok;
maybe_alarm_port_busy(Socket, Transport, Channel, ForceClear) -> true ->
case is_tcp_congested(Socket, Transport) of case is_tcp_congested(Socket, Transport) of
true -> alarm_congestion(Socket, Transport, Channel, port_busy); true -> alarm_congestion(Socket, Transport, Channel, conn_congestion);
false -> cancel_alarm_congestion(Socket, Transport, Channel, port_busy, false -> cancel_alarm_congestion(Socket, Transport, Channel, conn_congestion)
ForceClear) end
end. end.
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
MaxBatchSize) ->
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
MaxBatchSize, false).
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
PubMsgCount = _MaxBatchSize, _ForceClear) ->
%% we only alarm it when the process is "too busy"
alarm_congestion(Socket, Transport, Channel, too_many_publish);
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
_MaxBatchSize, ForceClear) when PubMsgCount == 0 ->
%% but we clear the alarm until it is really "idle", to avoid sending
%% alarms and clears too frequently
cancel_alarm_congestion(Socket, Transport, Channel, too_many_publish,
ForceClear);
maybe_alarm_too_many_publish(_Socket, _Transport, _Channel, _PubMsgCount,
_MaxBatchSize, _ForceClear) ->
ok.
cancel_alarms(Socket, Transport, Channel) -> cancel_alarms(Socket, Transport, Channel) ->
lists:foreach(fun(Reason) -> lists:foreach(fun(Reason) ->
do_cancel_alarm_congestion(Socket, Transport, Channel, Reason) do_cancel_alarm_congestion(Socket, Transport, Channel, Reason)
end, ?ALL_ALARM_REASONS). end, ?ALL_ALARM_REASONS).
is_alarm_enabled(Channel) ->
emqx_zone:get_env(emqx_channel:info(zone, Channel),
conn_congestion_alarm_enabled, false).
alarm_congestion(Socket, Transport, Channel, Reason) -> alarm_congestion(Socket, Transport, Channel, Reason) ->
case has_alarm_sent(Reason) of case has_alarm_sent(Reason) of
false -> do_alarm_congestion(Socket, Transport, Channel, Reason); false -> do_alarm_congestion(Socket, Transport, Channel, Reason);
@ -82,8 +65,11 @@ alarm_congestion(Socket, Transport, Channel, Reason) ->
update_alarm_sent_at(Reason) update_alarm_sent_at(Reason)
end. end.
cancel_alarm_congestion(Socket, Transport, Channel, Reason, ForceClear) -> cancel_alarm_congestion(Socket, Transport, Channel, Reason) ->
case is_alarm_allowed_clear(Reason, ForceClear) of Zone = emqx_channel:info(zone, Channel),
WontClearIn = emqx_zone:get_env(Zone, conn_congestion_min_alarm_sustain_duration,
?WONT_CLEAR_IN),
case has_alarm_sent(Reason) andalso long_time_since_last_alarm(Reason, WontClearIn) of
true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason); true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason);
false -> ok false -> ok
end. end.
@ -123,13 +109,11 @@ get_alarm_sent_at(Reason) ->
LastSentAt -> LastSentAt LastSentAt -> LastSentAt
end. end.
is_alarm_allowed_clear(Reason, _ForceClear = true) -> long_time_since_last_alarm(Reason, WontClearIn) ->
has_alarm_sent(Reason);
is_alarm_allowed_clear(Reason, _ForceClear = false) ->
%% only sent clears when the alarm was not triggered in the last %% only sent clears when the alarm was not triggered in the last
%% ?CONFIRM_CLEAR_INTERVAL time %% WontClearIn time
case timenow() - get_alarm_sent_at(Reason) of case timenow() - get_alarm_sent_at(Reason) of
Elapse when Elapse >= ?CONFIRM_CLEAR_INTERVAL -> true; Elapse when Elapse >= WontClearIn -> true;
_ -> false _ -> false
end. end.
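With the rewritten ?ALARM_CONN_CONGEST macro above, a congested subscriber with client ID "sub1" and no username now raises an alarm named "conn_congestion/sub1/unknown_user" (the client ID is an illustrative placeholder), and the updated normalize_message/2 clause in the emqx_alarm diff above renders it as "connection congested: sub1/unknown_user".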

View File

@ -38,7 +38,7 @@
, stats/1 , stats/1
]). ]).
-export([call/2]). -export([call/2, call/3]).
%% Callback %% Callback
-export([init/4]). -export([init/4]).
@ -168,7 +168,9 @@ stats(#state{transport = Transport,
lists:append([SockStats, ConnStats, ChanStats, ProcStats]). lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
call(Pid, Req) -> call(Pid, Req) ->
gen_server:call(Pid, Req, infinity). call(Pid, Req, infinity).
call(Pid, Req, Timeout) ->
gen_server:call(Pid, Req, Timeout).
stop(Pid) -> stop(Pid) ->
gen_server:stop(Pid). gen_server:stop(Pid).
@ -374,12 +376,8 @@ handle_msg({Passive, _Sock}, State)
handle_info(activate_socket, NState1); handle_info(activate_socket, NState1);
handle_msg(Deliver = {deliver, _Topic, _Msg}, handle_msg(Deliver = {deliver, _Topic, _Msg},
#state{active_n = MaxBatchSize, transport = Transport, #state{active_n = ActiveN} = State) ->
socket = Socket, channel = Channel} = State) -> Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
Delivers0 = emqx_misc:drain_deliver(MaxBatchSize),
emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel,
length(Delivers0), MaxBatchSize),
Delivers = [Deliver|Delivers0],
with_channel(handle_deliver, [Delivers], State); with_channel(handle_deliver, [Delivers], State);
%% Something sent %% Something sent
@ -548,12 +546,9 @@ handle_timeout(_TRef, limit_timeout, State) ->
}, },
handle_info(activate_socket, NState); handle_info(activate_socket, NState);
handle_timeout(_TRef, emit_stats, State = #state{active_n = MaxBatchSize, handle_timeout(_TRef, emit_stats, State = #state{channel = Channel, transport = Transport,
channel = Channel, transport = Transport, socket = Socket}) -> socket = Socket}) ->
{_, MsgQLen} = erlang:process_info(self(), message_queue_len), emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel, true),
emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel,
MsgQLen, MaxBatchSize, true),
ClientId = emqx_channel:info(clientid, Channel), ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:set_chan_stats(ClientId, stats(State)), emqx_cm:set_chan_stats(ClientId, stats(State)),
{ok, State#state{stats_timer = undefined}}; {ok, State#state{stats_timer = undefined}};
@ -666,7 +661,7 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel})
Oct = iolist_size(IoData), Oct = iolist_size(IoData),
ok = emqx_metrics:inc('bytes.sent', Oct), ok = emqx_metrics:inc('bytes.sent', Oct),
emqx_pd:inc_counter(outgoing_bytes, Oct), emqx_pd:inc_counter(outgoing_bytes, Oct),
emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel), emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
case Transport:async_send(Socket, IoData, [nosuspend]) of case Transport:async_send(Socket, IoData, [nosuspend]) of
ok -> ok; ok -> ok;
Error = {error, _Reason} -> Error = {error, _Reason} ->
@ -690,7 +685,10 @@ handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
end; end;
handle_info({sock_error, Reason}, State) -> handle_info({sock_error, Reason}, State) ->
Reason =/= closed andalso ?LOG(error, "Socket error: ~p", [Reason]), case Reason =/= closed andalso Reason =/= einval of
true -> ?LOG(warning, "socket_error: ~p", [Reason]);
false -> ok
end,
handle_info({sock_closed, Reason}, close_socket(State)); handle_info({sock_closed, Reason}, close_socket(State));
handle_info(Info, State) -> handle_info(Info, State) ->

View File

@ -141,6 +141,9 @@ parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) ->
%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK... %% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK...
parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) -> parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
parse_frame(Rest, Header, 2, Options); parse_frame(Rest, Header, 2, Options);
parse_remaining_len(<<1:1, _Len:7, _Rest/binary>>, _Header, Multiplier, _Value, _Options)
when Multiplier > 2097152 ->
error(malformed_variable_byte_integer);
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) -> parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) ->
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options); parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options);
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value,
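The new guard above encodes the MQTT rule that a Variable Byte Integer occupies at most four bytes: the multiplier grows by a factor of ?HIGHBIT (128) per continuation byte, i.e. 1, 128, 16384, 2097152, so once it has grown past 128^3 = 2097152 a further continuation byte can only come from a malformed length field, and the parser raises malformed_variable_byte_integer.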

View File

@ -45,6 +45,8 @@
, index_of/2 , index_of/2
]). ]).
-define(OOM_FACTOR, 1.25).
%% @doc Merge options %% @doc Merge options
-spec(merge_opts(Opts, Opts) -> Opts when Opts :: proplists:proplist()). -spec(merge_opts(Opts, Opts) -> Opts when Opts :: proplists:proplist()).
merge_opts(Defaults, Options) -> merge_opts(Defaults, Options) ->
@ -185,8 +187,8 @@ do_check_oom([{Val, Max, Reason}|Rest]) ->
tune_heap_size(#{max_heap_size := MaxHeapSize}) -> tune_heap_size(#{max_heap_size := MaxHeapSize}) ->
%% If set to zero, the limit is disabled. %% If set to zero, the limit is disabled.
erlang:process_flag(max_heap_size, #{size => MaxHeapSize, erlang:process_flag(max_heap_size, #{size => must_kill_heap_size(MaxHeapSize),
kill => false, kill => true,
error_logger => true error_logger => true
}); });
tune_heap_size(undefined) -> ok. tune_heap_size(undefined) -> ok.
@ -233,3 +235,19 @@ index_of(E, I, [E|_]) ->
index_of(E, I, [_|L]) -> index_of(E, I, [_|L]) ->
index_of(E, I+1, L). index_of(E, I+1, L).
must_kill_heap_size(Size) ->
%% We set the max allowed heap size by `erlang:process_flag(max_heap_size, #{size => Size})`,
%% where the `Size` cannot be set to an integer larger than `(1 bsl 59) - 1` on a 64-bit system,
%% or `(1 bsl 27) - 1` on a 32-bit system.
MaxAllowedSize = case erlang:system_info(wordsize) of
8 -> % arch_64
(1 bsl 59) - 1;
4 -> % arch_32
(1 bsl 27) - 1
end,
%% We multiply the size by the ?OOM_FACTOR to give the
%% process a chance to shut itself down via `check_oom/1` first.
case ceil(Size * ?OOM_FACTOR) of
Size0 when Size0 >= MaxAllowedSize -> MaxAllowedSize;
Size0 -> Size0
end.
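For a sense of scale (illustrative numbers): with ?OOM_FACTOR = 1.25, a configured max_heap_size of 6291456 words becomes a kill threshold of ceil(6291456 * 1.25) = 7864320 words, far below the (1 bsl 59) - 1 cap on a 64-bit system, so the cap only matters for configurations that are effectively unlimited.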

View File

@ -67,6 +67,9 @@
, dropped/1 , dropped/1
]). ]).
-export([ live_upgrade/1
]).
-export_type([mqueue/0, options/0]). -export_type([mqueue/0, options/0]).
-type(topic() :: emqx_topic:topic()). -type(topic() :: emqx_topic:topic()).
@ -91,6 +94,11 @@
-define(MAX_LEN_INFINITY, 0). -define(MAX_LEN_INFINITY, 0).
-define(INFO_KEYS, [store_qos0, max_len, len, dropped]). -define(INFO_KEYS, [store_qos0, max_len, len, dropped]).
-record(shift_opts, {
multiplier :: non_neg_integer(),
base :: integer()
}).
-record(mqueue, { -record(mqueue, {
store_qos0 = false :: boolean(), store_qos0 = false :: boolean(),
max_len = ?MAX_LEN_INFINITY :: count(), max_len = ?MAX_LEN_INFINITY :: count(),
@ -98,11 +106,16 @@
dropped = 0 :: count(), dropped = 0 :: count(),
p_table = ?NO_PRIORITY_TABLE :: p_table(), p_table = ?NO_PRIORITY_TABLE :: p_table(),
default_p = ?LOWEST_PRIORITY :: priority(), default_p = ?LOWEST_PRIORITY :: priority(),
q = ?PQUEUE:new() :: pq() q = ?PQUEUE:new() :: pq(),
shift_opts :: #shift_opts{},
last_p :: non_neg_integer() | undefined,
counter :: non_neg_integer() | undefined
}). }).
-type(mqueue() :: #mqueue{}). -type(mqueue() :: #mqueue{}).
-define(OLD(Q), Q = {mqueue, _, _, _, _, _, _, _}).
-spec(init(options()) -> mqueue()). -spec(init(options()) -> mqueue()).
init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) -> init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of
@ -112,7 +125,8 @@ init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
#mqueue{max_len = MaxLen, #mqueue{max_len = MaxLen,
store_qos0 = QoS_0, store_qos0 = QoS_0,
p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE), p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE),
default_p = get_priority_opt(Opts) default_p = get_priority_opt(Opts),
shift_opts = get_shift_opt(Opts)
}. }.
-spec(info(mqueue()) -> emqx_types:infos()). -spec(info(mqueue()) -> emqx_types:infos()).
@ -127,22 +141,30 @@ info(max_len, #mqueue{max_len = MaxLen}) ->
info(len, #mqueue{len = Len}) -> info(len, #mqueue{len = Len}) ->
Len; Len;
info(dropped, #mqueue{dropped = Dropped}) -> info(dropped, #mqueue{dropped = Dropped}) ->
Dropped. Dropped;
info(Info, ?OLD(MQ)) ->
info(Info, live_upgrade(MQ)).
is_empty(#mqueue{len = Len}) -> Len =:= 0. is_empty(#mqueue{len = Len}) -> Len =:= 0;
is_empty(?OLD(MQ)) -> is_empty(live_upgrade(MQ)).
len(#mqueue{len = Len}) -> Len. len(#mqueue{len = Len}) -> Len;
len(?OLD(MQ)) -> len(live_upgrade(MQ)).
max_len(#mqueue{max_len = MaxLen}) -> MaxLen. max_len(#mqueue{max_len = MaxLen}) -> MaxLen;
max_len(?OLD(MQ)) -> max_len(live_upgrade(MQ)).
%% @doc Return number of dropped messages. %% @doc Return number of dropped messages.
-spec(dropped(mqueue()) -> count()). -spec(dropped(mqueue()) -> count()).
dropped(#mqueue{dropped = Dropped}) -> Dropped. dropped(#mqueue{dropped = Dropped}) -> Dropped;
dropped(?OLD(MQ)) -> dropped(live_upgrade(MQ)).
%% @doc Stats of the mqueue %% @doc Stats of the mqueue
-spec(stats(mqueue()) -> [stat()]). -spec(stats(mqueue()) -> [stat()]).
stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) -> stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) ->
[{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}]. [{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}];
stats(?OLD(MQ)) ->
stats(live_upgrade(MQ)).
%% @doc Enqueue a message. %% @doc Enqueue a message.
-spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}). -spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}).
@ -165,15 +187,34 @@ in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp,
{DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}}; {DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}};
false -> false ->
{_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}} {_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}}
end. end;
in(Msg, ?OLD(MQ)) ->
in(Msg, live_upgrade(MQ)).
-spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}). -spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}).
out(MQ = #mqueue{len = 0, q = Q}) -> out(MQ = #mqueue{len = 0, q = Q}) ->
0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap 0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap
{empty, MQ}; {empty, MQ};
out(MQ = #mqueue{q = Q, len = Len}) -> out(MQ = #mqueue{q = Q, len = Len, last_p = undefined, shift_opts = ShiftOpts}) ->
{{value, Val, Prio}, Q1} = ?PQUEUE:out_p(Q), %% Shouldn't fail, since we've checked the length
MQ1 = MQ#mqueue{
q = Q1,
len = Len - 1,
last_p = Prio,
counter = init_counter(Prio, ShiftOpts)
},
{{value, Val}, MQ1};
out(MQ = #mqueue{q = Q, counter = 0}) ->
MQ1 = MQ#mqueue{
q = ?PQUEUE:shift(Q),
last_p = undefined
},
out(MQ1);
out(MQ = #mqueue{q = Q, len = Len, counter = Cnt}) ->
{R, Q1} = ?PQUEUE:out(Q), {R, Q1} = ?PQUEUE:out(Q),
{R, MQ#mqueue{q = Q1, len = Len - 1}}. {R, MQ#mqueue{q = Q1, len = Len - 1, counter = Cnt - 1}};
out(?OLD(MQ)) ->
out(live_upgrade(MQ)).
get_opt(Key, Opts, Default) -> get_opt(Key, Opts, Default) ->
case maps:get(Key, Opts, Default) of case maps:get(Key, Opts, Default) of
@ -194,3 +235,46 @@ get_priority_opt(Opts) ->
%% while the highest 'infinity' is a [{infinity, queue:queue()}] %% while the highest 'infinity' is a [{infinity, queue:queue()}]
get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY; get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY;
get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp). get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp).
init_counter(?HIGHEST_PRIORITY, Opts) ->
Infinity = 1000000,
init_counter(Infinity, Opts);
init_counter(Prio, #shift_opts{multiplier = Mult, base = Base}) ->
(Prio + Base) * Mult.
get_shift_opt(Opts) ->
Mult = maps:get(shift_multiplier, Opts, 10),
Min = case Opts of
#{p_table := PTab} ->
case maps:size(PTab) of
0 -> 0;
_ -> lists:min(maps:values(PTab))
end;
_ ->
?LOWEST_PRIORITY
end,
Base = case Min < 0 of
true -> -Min;
false -> 0
end,
#shift_opts{
multiplier = Mult,
base = Base
}.
live_upgrade({mqueue, StoreQos0, MaxLen, Len, Dropped, PTable, DefaultP, Q}) ->
ShiftOpts = case is_map(PTable) of
true -> get_shift_opt(#{p_table => PTable});
false -> get_shift_opt(#{})
end,
#mqueue{ store_qos0 = StoreQos0
, max_len = MaxLen
, dropped = Dropped
, p_table = PTable
, default_p = DefaultP
, len = Len
, q = Q
, shift_opts = ShiftOpts
, last_p = undefined
, counter = undefined
}.
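As a worked example of the interleaving above (illustrative values): with the default shift_multiplier of 10 and base 0, after out/1 starts draining a priority-2 bucket it allows roughly (2 + 0) * 10 = 20 further messages from that bucket before shifting the pqueue so that lower priorities get a turn; the ?HIGHEST_PRIORITY ('infinity') bucket is treated as priority 1000000 when sizing its batch.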

View File

@ -55,6 +55,7 @@
, filter/2 , filter/2
, fold/3 , fold/3
, highest/1 , highest/1
, shift/1
]). ]).
-export_type([q/0]). -export_type([q/0]).
@ -170,6 +171,14 @@ out({pqueue, [{P, Q} | Queues]}) ->
end, end,
{R, NewQ}. {R, NewQ}.
-spec(shift(pqueue()) -> pqueue()).
shift(Q = {queue, _, _, _}) ->
Q;
shift({pqueue, []}) ->
{pqueue, []}; %% Shouldn't happen?
shift({pqueue, [Hd|Rest]}) ->
{pqueue, Rest ++ [Hd]}. %% Let's hope there are not many priorities.
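For instance, using the negated internal priorities seen elsewhere in this module, shift({pqueue, [{-2, QHi}, {-1, QLo}]}) returns {pqueue, [{-1, QLo}, {-2, QHi}]}: the head bucket is rotated to the back so the next out/1 serves the other priority, while a plain {queue, ...} is returned unchanged.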
-spec(out_p(pqueue()) -> {empty | {value, any(), priority()}, pqueue()}). -spec(out_p(pqueue()) -> {empty | {value, any(), priority()}, pqueue()}).
out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0); out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0);
out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)). out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)).
@ -266,4 +275,3 @@ r2f([X,Y|R], L) -> {queue, [X,Y], lists:reverse(R, []), L}.
maybe_negate_priority(infinity) -> infinity; maybe_negate_priority(infinity) -> infinity;
maybe_negate_priority(P) -> -P. maybe_negate_priority(P) -> -P.

View File

@ -118,7 +118,8 @@ do_add_route(Topic, Dest) when is_binary(Topic) ->
false -> false ->
ok = emqx_router_helper:monitor(Dest), ok = emqx_router_helper:monitor(Dest),
case emqx_topic:wildcard(Topic) of case emqx_topic:wildcard(Topic) of
true -> trans(fun insert_trie_route/1, [Route]); true ->
maybe_trans(fun insert_trie_route/1, [Route]);
false -> insert_direct_route(Route) false -> insert_direct_route(Route)
end end
end. end.
@@ -164,7 +165,8 @@ do_delete_route(Topic) when is_binary(Topic) ->
do_delete_route(Topic, Dest) ->
Route = #route{topic = Topic, dest = Dest},
case emqx_topic:wildcard(Topic) of
true ->
maybe_trans(fun delete_trie_route/1, [Route]);
false -> delete_direct_route(Route)
end.
@@ -247,10 +249,59 @@ delete_trie_route(Route = #route{topic = Topic}) ->
end.
%% @private
-spec(maybe_trans(function(), list(any())) -> ok | {error, term()}).
maybe_trans(Fun, Args) ->
case persistent_term:get(emqx_route_lock_type, key) of
key ->
trans(Fun, Args);
global ->
lock_router(),
try mnesia:sync_dirty(Fun, Args)
after
unlock_router()
end;
tab ->
trans(fun() ->
emqx_trie:lock_tables(),
apply(Fun, Args)
end, [])
end.
-spec(trans(function(), list(any())) -> ok | {error, term()}).
trans(Fun, Args) ->
%% trigger selective receive optimization of compiler,
%% ideal for handling bursty traffic.
Ref = erlang:make_ref(),
Owner = self(),
{WPid, RefMon} = spawn_monitor(
fun() ->
Res = case mnesia:transaction(Fun, Args) of
{atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason}
end,
Owner ! {Ref, Res}
end),
receive
{Ref, TransRes} ->
receive
{'DOWN', RefMon, process, WPid, normal} -> ok
end,
TransRes;
{'DOWN', RefMon, process, WPid, Info} ->
{error, {trans_crash, Info}}
end.
lock_router() ->
%% if Retry is not 0, global:set_lock could sleep a random time up to 8s.
%% Considering we have a limited number of brokers, it is safe to use sleep 1 ms.
case global:set_lock({?MODULE, self()}, [node() | nodes()], 0) of
false ->
%% Force to sleep 1ms instead.
timer:sleep(1),
lock_router();
true ->
ok
end.
unlock_router() ->
global:del_lock({?MODULE, self()}).
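Two things make this trans/2 cheap for a busy broker: the mnesia transaction runs in a worker whose mailbox is empty, and the parent only receives on references created immediately before the receive, which lets the compiler's selective-receive optimization skip every message already sitting in the broker's (possibly huge) queue. The same pattern, stripped of the mnesia specifics, looks roughly like this (a generic sketch, not part of the change):

    %% Minimal sketch of the "fresh ref + worker process" pattern used by
    %% trans/2 above: the caller only matches on Ref/'DOWN', so the runtime
    %% can skip every message queued before the references were created.
    run_isolated(Fun) ->
        Parent = self(),
        Ref = make_ref(),
        {Pid, MRef} = spawn_monitor(fun() -> Parent ! {Ref, Fun()} end),
        receive
            {Ref, Result} ->
                erlang:demonitor(MRef, [flush]),
                Result;
            {'DOWN', MRef, process, Pid, Reason} ->
                {error, Reason}
        end.

Any blocking call issued from a process with a large mailbox benefits from this shape; the code above additionally waits for the worker's normal 'DOWN' so no stray monitor message is left behind, whereas the sketch simply demonitors with flush.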


@@ -34,6 +34,10 @@ init([]) ->
type => worker,
modules => [emqx_router_helper]},
ok = persistent_term:put(emqx_route_lock_type,
application:get_env(emqx, route_lock_type, key)
),
%% Router pool
RouterPool = emqx_pool_sup:spec([router_pool, hash,
{emqx_router, start_link, []}]),
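Reading the setting once in the supervisor and caching it in a persistent_term keeps the per-route-operation cost at a single persistent_term:get/2. To try one of the other strategies, something like the following should work (a sketch; the commit messages expose the knob as broker.perf.route_lock_type, which is assumed to map onto the route_lock_type application env read above):

    %% Before the router supervisor starts (advanced config or remote console):
    ok = application:set_env(emqx, route_lock_type, global).  %% key | tab | global
    %% Once emqx_router_sup:init/1 has run, emqx_router consults the cached copy:
    global = persistent_term:get(emqx_route_lock_type, key).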


@@ -75,6 +75,7 @@
-export([ deliver/2
, enqueue/2
, dequeue/1
, retry/1
, terminate/3
]).


@@ -31,7 +31,9 @@
, delete/1
]).
-export([ empty/0
, lock_tables/0
]).
-ifdef(TEST).
-compile(export_all).
@@ -120,6 +122,11 @@ delete(Topic) when is_binary(Topic) ->
empty() ->
ets:info(?TRIE_TAB, size) == 0.
-spec lock_tables() -> ok.
lock_tables() ->
mnesia:write_lock_table(?TRIE_TAB),
mnesia:write_lock_table(?TRIE_NODE_TAB).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------


@@ -34,7 +34,7 @@
, stats/1
]).
-export([call/2, call/3]).
%% WebSocket callbacks
-export([ init/2
@@ -151,7 +151,10 @@ stats(#state{channel = Channel}) ->
%% kick|discard|takeover
-spec(call(pid(), Req :: term()) -> Reply :: term()).
call(WsPid, Req) ->
call(WsPid, Req, 5000).
call(WsPid, Req, Timeout) when is_pid(WsPid) ->
Mref = erlang:monitor(process, WsPid),
WsPid ! {call, {self(), Mref}, Req},
receive
@@ -160,7 +163,7 @@ call(WsPid, Req) when is_pid(WsPid) ->
Reply;
{'DOWN', Mref, _, _, Reason} ->
exit(Reason)
after Timeout ->
erlang:demonitor(Mref, [flush]),
exit(timeout)
end.
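call/2 keeps its old behaviour by delegating to call/3 with the previous hard-coded 5000 ms, so existing callers are unaffected, while callers coordinating kick/discard/takeover can now pick their own deadline. A usage sketch (the kick request atom is taken from the comment above; the 10 s value is arbitrary):

    %% Wait up to 10 seconds for the websocket channel to acknowledge a kick.
    Reply = emqx_ws_connection:call(WsPid, kick, 10000),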
@@ -196,15 +199,21 @@ init(Req, Opts) ->
end.
websocket_init([Req, Opts]) ->
{Peername, Peercert} =
case proplists:get_bool(proxy_protocol, Opts)
andalso maps:get(proxy_header, Req) of
#{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} ->
ProxyName = {SrcAddr, SrcPort},
%% Notice: Only CN is available in Proxy Protocol V2 additional info
ProxySSL = case maps:get(cn, SSL, undefined) of
undefined -> nossl;
CN -> [{pp2_ssl_cn, CN}]
end,
{ProxyName, ProxySSL};
_ ->
{cowboy_req:peer(Req), cowboy_req:cert(Req)}
end,
Sockname = cowboy_req:sock(Req),
WsCookie = try cowboy_req:parse_cookies(Req)
catch
error:badarg ->
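Behind a PROXY-protocol v2 balancer the broker never sees the client's TLS certificate; only the additional SSL fields forwarded by the balancer are available, so Peercert here is either nossl or a small proplist carrying pp2_ssl_cn (the comment above notes that only the CN survives). Downstream code that used to expect a DER certificate therefore has to cope with both shapes; a hypothetical helper (not part of the diff) might look like:

    %% Hypothetical helper: pull a username out of whichever peercert
    %% shape websocket_init/1 produced.
    peer_cert_cn(nossl) -> undefined;
    peer_cert_cn(Peercert) when is_list(Peercert) ->
        proplists:get_value(pp2_ssl_cn, Peercert);
    peer_cert_cn(Peercert) when is_binary(Peercert) ->
        undefined.  %% a real implementation would decode the DER certificate here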


@@ -77,7 +77,7 @@ t_get_set_chan_stats(_) ->
t_open_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end),
ClientInfo = #{zone => external,
clientid => <<"clientid">>,
@@ -153,14 +153,14 @@ t_open_session_race_condition(_) ->
t_discard_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = emqx_cm:unregister_channel(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> error(testing) end),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = emqx_cm:unregister_channel(<<"clientid">>),
ok = meck:unload(emqx_connection).
@@ -180,7 +180,7 @@ t_takeover_session(_) ->
t_kick_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> test end),
{error, not_found} = emqx_cm:kick_session(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
test = emqx_cm:kick_session(<<"clientid">>),


@@ -42,7 +42,8 @@ all() ->
groups() ->
[{parse, [parallel],
[t_parse_cont,
t_parse_frame_too_large,
t_parse_frame_malformed_variable_byte_integer
]},
{connect, [parallel],
[t_serialize_parse_v3_connect,
@@ -129,6 +130,12 @@ t_parse_frame_too_large(_) ->
?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 512})),
?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})).
t_parse_frame_malformed_variable_byte_integer(_) ->
MalformedPayload = << <<16#80>> || _ <- lists:seq(1, 4) >>,
ParseState = emqx_frame:initial_parse_state(#{}),
?catch_error(malformed_variable_byte_integer,
emqx_frame:parse(MalformedPayload, ParseState)).
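The malformed payload is four bytes of 16#80: in MQTT's variable byte integer encoding every byte with the high (continuation) bit set promises another byte, and the encoding is capped at four bytes, so this sequence can never terminate and the parser must raise malformed_variable_byte_integer. A stand-alone decoder sketch of that rule (not emqx_frame's actual implementation):

    %% Minimal variable-byte-integer decoder (MQTT allows at most 4 bytes,
    %% i.e. values up to 268435455).
    parse_varint(Bytes) -> parse_varint(Bytes, 0, 1).

    parse_varint(_, _Acc, Mult) when Mult > 128 * 128 * 128 ->
        error(malformed_variable_byte_integer);
    parse_varint(<<1:1, Digit:7, Rest/binary>>, Acc, Mult) ->
        parse_varint(Rest, Acc + Digit * Mult, Mult * 128);
    parse_varint(<<0:1, Digit:7, Rest/binary>>, Acc, Mult) ->
        {Acc + Digit * Mult, Rest};
    parse_varint(<<>>, _Acc, _Mult) ->
        more_bytes_needed.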
t_serialize_parse_v3_connect(_) ->
Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,
113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108,


@@ -270,7 +270,7 @@ t_connect_limit_timeout(_) ->
meck:unload(proplists).
t_connect_emit_stats_timeout(_) ->
IdleTimeout = 2000 + 200,
emqx_zone:set_env(external, idle_timeout, IdleTimeout),
{ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60}]),
@@ -278,7 +278,7 @@ t_connect_emit_stats_timeout(_) ->
[ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)),
?assert(is_reference(emqx_connection:info(stats_timer, sys:get_state(ClientPid)))),
timer:sleep(IdleTimeout+500),
?assertEqual(undefined, emqx_connection:info(stats_timer, sys:get_state(ClientPid))),
ok = emqtt:disconnect(Client).