Compare commits

...

27 Commits

Author SHA1 Message Date
JianBo He 933077682f test: increase waiting time to avoid test failure 2021-09-26 18:40:29 +08:00
JianBo He e3af52142b chore: export keepalive funcs for mqtt-sn 2021-09-26 18:25:51 +08:00
JianBo He 18bb341a39 fix: more safe session call 2021-09-26 18:25:48 +08:00
JianBo He 9dbbc0abc8 feat: acl.conf support ipaddrs 2021-09-24 22:32:17 +08:00
JianBo He b1dc2a742d chore: add mqtt-sn protocol define 2021-09-24 22:29:35 +08:00
Zaiming (Stone) Shi 4c30c50490
Merge pull request #5799 from emqx/merge-e4.3.4
feat(mqueue): Interleave messages with different priorities
2021-09-24 11:52:00 +02:00
Turtle 903a9e57a8 feat(mqueue): Interleave messages with different priorities 2021-09-24 12:51:28 +08:00
William Yang ca9b90be9a perf(broker): speedup trans when broker has a big mqueue 2021-04-29 17:10:10 +08:00
William Yang e02704e05a perf(broker): Optimization for handling bursty traffic
intro. new lock type: 'spawn' of broker.perf.route_lock_type

mnesia get lock calls are not optimized for selective receive.

hence taking locks would be very expensive while there are tones of
messages in the brokers message queue.

This optimization run the transaction in a separate process to utilize
the selective receive optimization of the compiler.
2021-04-29 17:10:10 +08:00
William Yang 5cc21dc3f0 fix(upgrade): duplicated modules loading. 2021-04-28 17:01:54 +02:00
William Yang 806cd33d1f chore(ct): fix a testcase timing. 2021-04-28 17:01:54 +02:00
William Yang f8ca066d74 upgrade: safe upgrade of emqx_connection 2021-04-28 21:29:20 +08:00
William Yang 59b7e8e3ba perf(lock): upgrade code for new lock types. 2021-04-28 21:29:20 +08:00
William Yang 571219f073 perf: new perf toggle broker.perf.route_lock_type 2021-04-28 21:29:20 +08:00
William Yang 9084554f4f fix: broker call should not timeout before client timeout
So change broker call timeout to infinity.
2021-04-28 21:29:20 +08:00
William Yang 1b179c8e2a perf(router): add route runs in async dirty context 2021-04-28 21:29:20 +08:00
William Yang fb8f5b79ad perf(trie): use global lock
Use global lock to reduce remote lock overhead.

So that emqx route trans can run in dirty *sync* context.

At least 10X subscribe/unsubscribe improvments.
2021-04-28 21:29:20 +08:00
JianBo He e427b0ff75 chore(appup): supply appup instructions for ws-connetion 2021-04-28 21:29:20 +08:00
Zaiming Shi b4e3c32e24 chore(conf): log only to file by default 2021-04-28 21:29:20 +08:00
Zaiming Shi c511e324bb fix(ekka): allow remote_console 2021-04-28 21:29:20 +08:00
zhanghongtong 1eb0f7b3b2 fix(ws connection): fix peer_cert_as_username error when ws connect 2021-04-28 21:29:20 +08:00
Shawn 7d003e0bfc Porting code for congestion alarms from 4.3.0 to e4.2.6 (#4523)
* fix(congestion): port some code from 4.3.0
* chore(emqx): update the appup file
2021-04-28 21:29:20 +08:00
Shawn 927264d793 fix(emqx): export do_deliver/2 for emqx_sn 2021-04-28 21:29:20 +08:00
Shawn 70da59c3bb chore(emqx): update appup 2021-04-28 21:29:20 +08:00
Shawn c69e1c6222 fix(mqtt-sn): sleep mode not working #4434 2021-04-28 21:29:20 +08:00
Shawn 6666210211 fix(congestion): alarm and clear congestion too frequetly 2021-04-28 21:29:20 +08:00
Shawn 238eaa8e40 fix(emqx): validate mqtt malformed variable byte integer 2021-03-10 19:42:44 +08:00
26 changed files with 515 additions and 129 deletions

View File

@ -296,6 +296,16 @@ broker.shared_dispatch_ack_enabled = false
## Value: Flag
broker.route_batch_clean = off
## Performance toggle for subscribe/unsubscribe wildcard topic.
## Change this toggle only when there are many wildcard topics.
## Value: Enum
## - key: mnesia translational updates with per-key locks. recommended for single node setup.
## - tab: mnesia translational updates with table lock. recommended for multi-nodes setup.
## - global: global lock protected updates. recommended for larger cluster.
## NOTE: when changing from/to 'global' lock, it requires all nodes in the cluster
## to be stopped before the change.
broker.perf.route_lock_type = key
##-------------------------------------------------------------------
## Plugins
##-------------------------------------------------------------------

View File

@ -544,6 +544,16 @@ listener.ws.external.verify_protocol_header = on
## Value: Duration
## listener.ws.external.proxy_protocol_timeout = 3s
## See: listener.ssl.$name.peer_cert_as_username
##
## Value: cn
## listener.ws.external.peer_cert_as_username = cn
## See: listener.ssl.$name.peer_cert_as_clientid
##
## Value: cn
## listener.ws.external.peer_cert_as_clientid = cn
## The TCP backlog of external MQTT/WebSocket Listener.
##
## See: listener.ws.$name.backlog

View File

@ -10,7 +10,7 @@
## - file: write logs only to file
## - console: write logs only to standard I/O
## - both: write logs both to file and standard I/O
log.to = both
log.to = file
## The log severity level.
##
@ -167,4 +167,4 @@ log.rotation.count = 5
## Value: MaxBurstCount,TimeWindow
## Default: disabled
##
#log.burst_limit = 20000, 1s
#log.burst_limit = 20000, 1s

View File

@ -184,6 +184,30 @@ zone.external.enable_flapping_detect = off
## Example: 100KB incoming per 10 seconds.
#zone.external.rate_limit.conn_bytes_in = 100KB,10s
## Whether to alarm the congested connections.
##
## Sometimes the mqtt connection (usually an MQTT subscriber) may get "congested" because
## there're too many packets to sent. The socket trys to buffer the packets until the buffer is
## full. If more packets comes after that, the packets will be "pending" in a queue
## and we consider the connection is "congested".
##
## Enable this to send an alarm when there's any bytes pending in the queue. You could set
## the `listener.tcp.<ZoneName>.sndbuf` to a larger value if the alarm is triggered too often.
##
## The name of the alarm is of format "conn_congestion/<ClientID>/<Username>".
## Where the <ClientID> is the client-id of the congested MQTT connection.
## And the <Username> is the username or "unknown_user" of not provided by the client.
## Default: off
#zone.external.conn_congestion.alarm = off
## Won't clear the congested alarm in how long time.
## The alarm is cleared only when there're no pending bytes in the queue, and also it has been
## `min_alarm_sustain_duration` time since the last time we considered the connection is "congested".
##
## This is to avoid clearing and sending the alarm again too often.
## Default: 1m
#zone.external.conn_congestion.min_alarm_sustain_duration = 1m
## Messages quota for the each of external MQTT connection.
## This value consumed by the number of recipient on a message.
##

View File

@ -30,11 +30,13 @@
%% MQTT Protocol Version and Names
%%--------------------------------------------------------------------
-define(MQTT_SN_PROTO_V1, 1).
-define(MQTT_PROTO_V3, 3).
-define(MQTT_PROTO_V4, 4).
-define(MQTT_PROTO_V5, 5).
-define(PROTOCOL_NAMES, [
{?MQTT_SN_PROTO_V1, <<"MQTT-SN">>}, %% XXX:Compatible with emqx-sn plug-in
{?MQTT_PROTO_V3, <<"MQIsdp">>},
{?MQTT_PROTO_V4, <<"MQTT">>},
{?MQTT_PROTO_V5, <<"MQTT">>}]).

View File

@ -999,6 +999,16 @@ end}.
{datatype, string}
]}.
{mapping, "zone.$name.conn_congestion.alarm", "emqx.zones", [
{datatype, flag},
{default, off}
]}.
{mapping, "zone.$name.conn_congestion.min_alarm_sustain_duration", "emqx.zones", [
{default, "1m"},
{datatype, {duration, ms}}
]}.
{mapping, "zone.$name.quota.conn_messages_routing", "emqx.zones", [
{datatype, string}
]}.
@ -1128,6 +1138,10 @@ end}.
{ratelimit, {conn_messages_in, Ratelimit(Val)}};
(["rate_limit", "conn_bytes_in"], Val) ->
{ratelimit, {conn_bytes_in, Ratelimit(Val)}};
(["conn_congestion", "alarm"], Val) ->
{conn_congestion_alarm_enabled, Val};
(["conn_congestion", "min_alarm_sustain_duration"], Val) ->
{conn_congestion_min_alarm_sustain_duration, Val};
(["quota", "conn_messages_routing"], Val) ->
{quota, {conn_messages_routing, Ratelimit(Val)}};
(["quota", "overall_messages_routing"], Val) ->
@ -1573,7 +1587,11 @@ end}.
]}.
{mapping, "listener.ws.$name.peer_cert_as_username", "emqx.listeners", [
{datatype, {enum, [cn, dn, crt]}}
{datatype, {enum, [cn]}}
]}.
{mapping, "listener.ws.$name.peer_cert_as_clientid", "emqx.listeners", [
{datatype, {enum, [cn]}}
]}.
%%--------------------------------------------------------------------
@ -2008,6 +2026,18 @@ end}.
{datatype, flag}
]}.
%% @doc Performance toggle for subscribe/unsubscribe wildcard topic.
%% Change this toggle only when there are many wildcard topics.
%% key: mnesia translational updates with per-key locks. recommended for single node setup.
%% tab: mnesia translational updates with table lock. recommended for multi-nodes setup.
%% global: global lock protected updates. recommended for larger cluster.
%% NOTE: when changing from/to 'global' lock, it requires all nodes in the cluster
%%
{mapping, "broker.perf.route_lock_type", "emqx.route_lock_type", [
{default, key},
{datatype, {enum, [key, tab, global]}}
]}.
%%--------------------------------------------------------------------
%% System Monitor
%%--------------------------------------------------------------------

View File

@ -7,7 +7,7 @@
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.5"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.5"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.6"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}},
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
]}.

View File

@ -6,6 +6,7 @@
{add_module, emqx_congestion},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
{suspend, [esockd_acceptor,emqx_connection, emqx_ws_connection]},
@ -14,12 +15,19 @@
{update, emqx_ws_connection, {advanced, []}},
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
{resume, [esockd_acceptor,emqx_connection, emqx_ws_connection]}
{resume, [esockd_acceptor,emqx_connection, emqx_ws_connection]},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
]},
{"4.2.1", [
{add_module, emqx_congestion},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
@ -27,15 +35,63 @@
{update, emqx_ws_connection, {advanced, []}},
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]}
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
]},
{<<"4.2.[23]">>, [
{add_module, emqx_congestion},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []}
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
]},
{<<"4.2.4">>, [
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
]},
{<<"4.2.5">>, [
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
]},
{<<"4.2.[6-7]">>, [
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
]},
{<<".*">>, []}
],
@ -51,8 +107,15 @@
{update, emqx_connection, {advanced, []}},
{update, emqx_ws_connection, {advanced, []}},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{delete_module, emqx_congestion}
{delete_module, emqx_congestion},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
]},
{"4.2.1", [
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
@ -60,20 +123,69 @@
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{update, emqx_ws_connection, {advanced, []}},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{delete_module, emqx_congestion}
{delete_module, emqx_congestion},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
]},
{<<"4.2.[23]">>, [
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{delete_module, emqx_congestion}
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{delete_module, emqx_congestion},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
]},
{<<"4.2.4">>, [
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
]},
{<<"4.2.5">>, [
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
]},
{<<"4.2.[6-7]">>, [
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
]},
{<<".*">>, []}
]

View File

@ -28,7 +28,8 @@
-type(who() :: all | binary() |
{client, binary()} |
{user, binary()} |
{ipaddr, esockd_cidr:cidr_string()}).
{ipaddr, esockd_cidr:cidr_string()} |
{ipaddrs, list(esockd_cidr:cidr_string())}).
-type(access() :: subscribe | publish | pubsub).
@ -52,6 +53,8 @@ compile(who, all) ->
all;
compile(who, {ipaddr, CIDR}) ->
{ipaddr, esockd_cidr:parse(CIDR, true)};
compile(who, {ipaddrs, CIDRs}) ->
{ipaddrs, lists:map(fun(CIDR) -> esockd_cidr:parse(CIDR, true) end, CIDRs)};
compile(who, {client, all}) ->
{client, all};
compile(who, {client, ClientId}) ->
@ -108,8 +111,14 @@ match_who(#{username := Username}, {user, Username}) ->
true;
match_who(#{peerhost := undefined}, {ipaddr, _Tup}) ->
false;
match_who(#{peerhost := undefined}, {ipaddrs, _}) ->
false;
match_who(#{peerhost := IP}, {ipaddr, CIDR}) ->
esockd_cidr:match(IP, CIDR);
match_who(#{peerhost := IP}, {ipaddrs, CIDRs}) ->
lists:any(fun(CIDR) ->
esockd_cidr:match(IP, CIDR)
end, CIDRs);
match_who(ClientInfo, {'and', Conds}) when is_list(Conds) ->
lists:foldl(fun(Who, Allow) ->
match_who(ClientInfo, Who) andalso Allow

View File

@ -361,7 +361,7 @@ normalize_message(partition, #{occurred := Node}) ->
list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID]));
normalize_message(<<"mqtt_conn/congested/", Info/binary>>, _) ->
list_to_binary(io_lib:format("MQTT connection congested: ~s", [Info]));
normalize_message(<<"conn_congestion/", Info/binary>>, _) ->
list_to_binary(io_lib:format("connection congested: ~s", [Info]));
normalize_message(_Name, _UnknownDetails) ->
<<"Unknown alarm">>.

View File

@ -419,7 +419,7 @@ safe_update_stats(Tab, Stat, MaxStat) ->
-compile({inline, [call/2, cast/2, pick/1]}).
call(Broker, Req) ->
gen_server:call(Broker, Req).
gen_server:call(Broker, Req, infinity).
cast(Broker, Msg) ->
gen_server:cast(Broker, Msg).

View File

@ -32,6 +32,8 @@
-export([ info/1
, info/2
, set_conn_state/2
, get_session/1
, set_session/2
, stats/1
, caps/1
]).
@ -46,6 +48,12 @@
, terminate/2
]).
%% Export for emqx_sn
-export([ do_deliver/2
, ensure_keepalive/2
, clear_keepalive/1
]).
%% Exports for CT
-export([set_field/3]).
@ -167,6 +175,12 @@ info(timers, #channel{timers = Timers}) -> Timers.
set_conn_state(ConnState, Channel) ->
Channel#channel{conn_state = ConnState}.
get_session(#channel{session = Session}) ->
Session.
set_session(Session, Channel) ->
Channel#channel{session = Session}.
%% TODO: Add more stats.
-spec(stats(channel()) -> emqx_types:stats()).
stats(#channel{session = Session})->
@ -1501,6 +1515,15 @@ ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}
Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)),
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
clear_keepalive(Channel = #channel{timers = Timers}) ->
case maps:get(alive_timer, Timers, undefined) of
undefined ->
Channel;
TRef ->
emqx_misc:cancel_timer(TRef),
Channel#channel{timers = maps:without([alive_timer], Timers)}
end.
%%--------------------------------------------------------------------
%% Maybe Resume Session

View File

@ -88,6 +88,8 @@
%% Batch drain
-define(BATCH_SIZE, 100000).
-define(T_TAKEOVER, 15000).
%% Server name
-define(CM, ?MODULE).
@ -222,7 +224,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
case takeover_session(ClientId) of
{ok, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session),
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}),
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER),
register_channel_(ClientId, Self, ConnInfo),
{ok, #{session => Session,
present => true,
@ -264,7 +266,7 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
undefined ->
{error, not_found};
ConnMod when is_atom(ConnMod) ->
Session = ConnMod:call(ChanPid, {takeover, 'begin'}),
Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
{ok, ConnMod, ChanPid, Session}
end;
@ -277,24 +279,35 @@ discard_session(ClientId) when is_binary(ClientId) ->
case lookup_channels(ClientId) of
[] -> ok;
ChanPids ->
lists:foreach(
fun(ChanPid) ->
try
discard_session(ClientId, ChanPid)
catch
_:{noproc,_}:_Stk -> ok;
_:{{shutdown,_},_}:_Stk -> ok;
_:Error:_Stk ->
?LOG(error, "Failed to discard ~0p: ~0p", [ChanPid, Error])
end
end, ChanPids)
lists:foreach(fun(Pid) -> do_discard_session(ClientId, Pid) end, ChanPids)
end.
do_discard_session(ClientId, Pid) ->
try
discard_session(ClientId, Pid)
catch
_ : noproc -> % emqx_ws_connection: call
?LOG(debug, "session_already_gone: ~p", [Pid]),
ok;
_ : {noproc, _} -> % emqx_connection: gen_server:call
?LOG(debug, "session_already_gone: ~p", [Pid]),
ok;
_ : {'EXIT', {noproc, _}} -> % rpc_call/3
?LOG(debug, "session_already_gone: ~p", [Pid]),
ok;
_ : {{shutdown, _}, _} ->
?LOG(debug, "session_already_shutdown: ~p", [Pid]),
ok;
_ : Error : St ->
?LOG(debug, "failed_to_discard_session: ~p, "
"error: ~p, stacktrace: ~0p", [Pid, Error, St])
end.
discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chann_conn_mod(ClientId, ChanPid) of
undefined -> ok;
ConnMod when is_atom(ConnMod) ->
ConnMod:call(ChanPid, discard)
ConnMod:call(ChanPid, discard, ?T_TAKEOVER)
end;
discard_session(ClientId, ChanPid) ->
@ -317,7 +330,7 @@ kick_session(ClientId) ->
kick_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chan_info(ClientId, ChanPid) of
#{conninfo := #{conn_mod := ConnMod}} ->
ConnMod:call(ChanPid, kick);
ConnMod:call(ChanPid, kick, ?T_TAKEOVER);
undefined ->
{error, not_found}
end;
@ -361,7 +374,7 @@ lookup_channels(local, ClientId) ->
%% @private
rpc_call(Node, Fun, Args) ->
case rpc:call(Node, ?MODULE, Fun, Args) of
case rpc:call(Node, ?MODULE, Fun, Args, 2 * ?T_TAKEOVER) of
{badrpc, Reason} -> error(Reason);
Res -> Res
end.

View File

@ -16,17 +16,16 @@
-module(emqx_congestion).
-export([ maybe_alarm_port_busy/3
, maybe_alarm_port_busy/4
, maybe_alarm_too_many_publish/5
, maybe_alarm_too_many_publish/6
-export([ maybe_alarm_conn_congestion/3
, cancel_alarms/3
]).
-define(ALARM_CONN_CONGEST(Channel, Reason),
list_to_binary(io_lib:format("mqtt_conn/congested/~s/~s/~s", [emqx_channel:info(clientid, Channel),
maps:get(username, emqx_channel:info(clientinfo, Channel), <<"undefined">>),
Reason]))).
list_to_binary(
io_lib:format("~s/~s/~s",
[Reason, emqx_channel:info(clientid, Channel),
maps:get(username, emqx_channel:info(clientinfo, Channel),
<<"unknown_user">>)]))).
-define(ALARM_CONN_INFO_KEYS, [
socktype, sockname, peername, clientid, username, proto_name, proto_ver,
@ -36,44 +35,28 @@
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
-define(PROC_INFO_KEYS, [message_queue_len, memory, reductions]).
-define(ALARM_SENT(REASON), {alarm_sent, REASON}).
-define(ALL_ALARM_REASONS, [port_busy, too_many_publish]).
-define(CONFIRM_CLEAR(REASON), {alarm_confirm_clear, REASON}).
-define(CONFIRM_CLEAR_INTERVAL, 10000).
-define(ALL_ALARM_REASONS, [conn_congestion]).
-define(WONT_CLEAR_IN, 60000).
maybe_alarm_port_busy(Socket, Transport, Channel) ->
maybe_alarm_port_busy(Socket, Transport, Channel, false).
maybe_alarm_port_busy(Socket, Transport, Channel, ForceClear) ->
case is_tcp_congested(Socket, Transport) of
true -> alarm_congestion(Socket, Transport, Channel, port_busy);
false -> cancel_alarm_congestion(Socket, Transport, Channel, port_busy,
ForceClear)
maybe_alarm_conn_congestion(Socket, Transport, Channel) ->
case is_alarm_enabled(Channel) of
false -> ok;
true ->
case is_tcp_congested(Socket, Transport) of
true -> alarm_congestion(Socket, Transport, Channel, conn_congestion);
false -> cancel_alarm_congestion(Socket, Transport, Channel, conn_congestion)
end
end.
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
MaxBatchSize) ->
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
MaxBatchSize, false).
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
PubMsgCount = _MaxBatchSize, _ForceClear) ->
%% we only alarm it when the process is "too busy"
alarm_congestion(Socket, Transport, Channel, too_many_publish);
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
_MaxBatchSize, ForceClear) when PubMsgCount == 0 ->
%% but we clear the alarm until it is really "idle", to avoid sending
%% alarms and clears too frequently
cancel_alarm_congestion(Socket, Transport, Channel, too_many_publish,
ForceClear);
maybe_alarm_too_many_publish(_Socket, _Transport, _Channel, _PubMsgCount,
_MaxBatchSize, _ForceClear) ->
ok.
cancel_alarms(Socket, Transport, Channel) ->
lists:foreach(fun(Reason) ->
do_cancel_alarm_congestion(Socket, Transport, Channel, Reason)
end, ?ALL_ALARM_REASONS).
is_alarm_enabled(Channel) ->
emqx_zone:get_env(emqx_channel:info(zone, Channel),
conn_congestion_alarm_enabled, false).
alarm_congestion(Socket, Transport, Channel, Reason) ->
case has_alarm_sent(Reason) of
false -> do_alarm_congestion(Socket, Transport, Channel, Reason);
@ -82,8 +65,11 @@ alarm_congestion(Socket, Transport, Channel, Reason) ->
update_alarm_sent_at(Reason)
end.
cancel_alarm_congestion(Socket, Transport, Channel, Reason, ForceClear) ->
case is_alarm_allowed_clear(Reason, ForceClear) of
cancel_alarm_congestion(Socket, Transport, Channel, Reason) ->
Zone = emqx_channel:info(zone, Channel),
WontClearIn = emqx_zone:get_env(Zone, conn_congestion_min_alarm_sustain_duration,
?WONT_CLEAR_IN),
case has_alarm_sent(Reason) andalso long_time_since_last_alarm(Reason, WontClearIn) of
true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason);
false -> ok
end.
@ -123,13 +109,11 @@ get_alarm_sent_at(Reason) ->
LastSentAt -> LastSentAt
end.
is_alarm_allowed_clear(Reason, _ForceClear = true) ->
has_alarm_sent(Reason);
is_alarm_allowed_clear(Reason, _ForceClear = false) ->
long_time_since_last_alarm(Reason, WontClearIn) ->
%% only sent clears when the alarm was not triggered in the last
%% ?CONFIRM_CLEAR_INTERVAL time
%% WontClearIn time
case timenow() - get_alarm_sent_at(Reason) of
Elapse when Elapse >= ?CONFIRM_CLEAR_INTERVAL -> true;
Elapse when Elapse >= WontClearIn -> true;
_ -> false
end.

View File

@ -38,7 +38,7 @@
, stats/1
]).
-export([call/2]).
-export([call/2, call/3]).
%% Callback
-export([init/4]).
@ -168,7 +168,9 @@ stats(#state{transport = Transport,
lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
call(Pid, Req) ->
gen_server:call(Pid, Req, infinity).
call(Pid, Req, infinity).
call(Pid, Req, Timeout) ->
gen_server:call(Pid, Req, Timeout).
stop(Pid) ->
gen_server:stop(Pid).
@ -374,12 +376,8 @@ handle_msg({Passive, _Sock}, State)
handle_info(activate_socket, NState1);
handle_msg(Deliver = {deliver, _Topic, _Msg},
#state{active_n = MaxBatchSize, transport = Transport,
socket = Socket, channel = Channel} = State) ->
Delivers0 = emqx_misc:drain_deliver(MaxBatchSize),
emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel,
length(Delivers0), MaxBatchSize),
Delivers = [Deliver|Delivers0],
#state{active_n = ActiveN} = State) ->
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State);
%% Something sent
@ -548,12 +546,9 @@ handle_timeout(_TRef, limit_timeout, State) ->
},
handle_info(activate_socket, NState);
handle_timeout(_TRef, emit_stats, State = #state{active_n = MaxBatchSize,
channel = Channel, transport = Transport, socket = Socket}) ->
{_, MsgQLen} = erlang:process_info(self(), message_queue_len),
emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel, true),
emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel,
MsgQLen, MaxBatchSize, true),
handle_timeout(_TRef, emit_stats, State = #state{channel = Channel, transport = Transport,
socket = Socket}) ->
emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:set_chan_stats(ClientId, stats(State)),
{ok, State#state{stats_timer = undefined}};
@ -666,7 +661,7 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel})
Oct = iolist_size(IoData),
ok = emqx_metrics:inc('bytes.sent', Oct),
emqx_pd:inc_counter(outgoing_bytes, Oct),
emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel),
emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
case Transport:async_send(Socket, IoData, [nosuspend]) of
ok -> ok;
Error = {error, _Reason} ->

View File

@ -141,6 +141,9 @@ parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) ->
%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK...
parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
parse_frame(Rest, Header, 2, Options);
parse_remaining_len(<<1:1, _Len:7, _Rest/binary>>, _Header, Multiplier, _Value, _Options)
when Multiplier > 2097152 ->
error(malformed_variable_byte_integer);
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) ->
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options);
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value,

View File

@ -67,6 +67,9 @@
, dropped/1
]).
-export([ live_upgrade/1
]).
-export_type([mqueue/0, options/0]).
-type(topic() :: emqx_topic:topic()).
@ -91,6 +94,11 @@
-define(MAX_LEN_INFINITY, 0).
-define(INFO_KEYS, [store_qos0, max_len, len, dropped]).
-record(shift_opts, {
multiplier :: non_neg_integer(),
base :: integer()
}).
-record(mqueue, {
store_qos0 = false :: boolean(),
max_len = ?MAX_LEN_INFINITY :: count(),
@ -98,11 +106,16 @@
dropped = 0 :: count(),
p_table = ?NO_PRIORITY_TABLE :: p_table(),
default_p = ?LOWEST_PRIORITY :: priority(),
q = ?PQUEUE:new() :: pq()
q = ?PQUEUE:new() :: pq(),
shift_opts :: #shift_opts{},
last_p :: non_neg_integer() | undefined,
counter :: non_neg_integer() | undefined
}).
-type(mqueue() :: #mqueue{}).
-define(OLD(Q), Q = {mqueue, _, _, _, _, _, _, _}).
-spec(init(options()) -> mqueue()).
init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of
@ -112,7 +125,8 @@ init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
#mqueue{max_len = MaxLen,
store_qos0 = QoS_0,
p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE),
default_p = get_priority_opt(Opts)
default_p = get_priority_opt(Opts),
shift_opts = get_shift_opt(Opts)
}.
-spec(info(mqueue()) -> emqx_types:infos()).
@ -127,22 +141,30 @@ info(max_len, #mqueue{max_len = MaxLen}) ->
info(len, #mqueue{len = Len}) ->
Len;
info(dropped, #mqueue{dropped = Dropped}) ->
Dropped.
Dropped;
info(Info, ?OLD(MQ)) ->
info(Info, live_upgrade(MQ)).
is_empty(#mqueue{len = Len}) -> Len =:= 0.
is_empty(#mqueue{len = Len}) -> Len =:= 0;
is_empty(?OLD(MQ)) -> is_empty(live_upgrade(MQ)).
len(#mqueue{len = Len}) -> Len.
len(#mqueue{len = Len}) -> Len;
len(?OLD(MQ)) -> len(live_upgrade(MQ)).
max_len(#mqueue{max_len = MaxLen}) -> MaxLen.
max_len(#mqueue{max_len = MaxLen}) -> MaxLen;
max_len(?OLD(MQ)) -> max_len(live_upgrade(MQ)).
%% @doc Return number of dropped messages.
-spec(dropped(mqueue()) -> count()).
dropped(#mqueue{dropped = Dropped}) -> Dropped.
dropped(#mqueue{dropped = Dropped}) -> Dropped;
dropped(?OLD(MQ)) -> dropped(live_upgrade(MQ)).
%% @doc Stats of the mqueue
-spec(stats(mqueue()) -> [stat()]).
stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) ->
[{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}].
[{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}];
stats(?OLD(MQ)) ->
stats(live_upgrade(MQ)).
%% @doc Enqueue a message.
-spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}).
@ -165,15 +187,34 @@ in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp,
{DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}};
false ->
{_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}}
end.
end;
in(Msg, ?OLD(MQ)) ->
in(Msg, live_upgrade(MQ)).
-spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}).
out(MQ = #mqueue{len = 0, q = Q}) ->
0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap
{empty, MQ};
out(MQ = #mqueue{q = Q, len = Len}) ->
out(MQ = #mqueue{q = Q, len = Len, last_p = undefined, shift_opts = ShiftOpts}) ->
{{value, Val, Prio}, Q1} = ?PQUEUE:out_p(Q), %% Shouldn't fail, since we've checked the length
MQ1 = MQ#mqueue{
q = Q1,
len = Len - 1,
last_p = Prio,
counter = init_counter(Prio, ShiftOpts)
},
{{value, Val}, MQ1};
out(MQ = #mqueue{q = Q, counter = 0}) ->
MQ1 = MQ#mqueue{
q = ?PQUEUE:shift(Q),
last_p = undefined
},
out(MQ1);
out(MQ = #mqueue{q = Q, len = Len, counter = Cnt}) ->
{R, Q1} = ?PQUEUE:out(Q),
{R, MQ#mqueue{q = Q1, len = Len - 1}}.
{R, MQ#mqueue{q = Q1, len = Len - 1, counter = Cnt - 1}};
out(?OLD(MQ)) ->
out(live_upgrade(MQ)).
get_opt(Key, Opts, Default) ->
case maps:get(Key, Opts, Default) of
@ -194,3 +235,46 @@ get_priority_opt(Opts) ->
%% while the highest 'infinity' is a [{infinity, queue:queue()}]
get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY;
get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp).
init_counter(?HIGHEST_PRIORITY, Opts) ->
Infinity = 1000000,
init_counter(Infinity, Opts);
init_counter(Prio, #shift_opts{multiplier = Mult, base = Base}) ->
(Prio + Base) * Mult.
get_shift_opt(Opts) ->
Mult = maps:get(shift_multiplier, Opts, 10),
Min = case Opts of
#{p_table := PTab} ->
case maps:size(PTab) of
0 -> 0;
_ -> lists:min(maps:values(PTab))
end;
_ ->
?LOWEST_PRIORITY
end,
Base = case Min < 0 of
true -> -Min;
false -> 0
end,
#shift_opts{
multiplier = Mult,
base = Base
}.
live_upgrade({mqueue, StoreQos0, MaxLen, Len, Dropped, PTable, DefaultP, Q}) ->
ShiftOpts = case is_map(PTable) of
true -> get_shift_opt(#{p_table => PTable});
false -> get_shift_opt(#{})
end,
#mqueue{ store_qos0 = StoreQos0
, max_len = MaxLen
, dropped = Dropped
, p_table = PTable
, default_p = DefaultP
, len = Len
, q = Q
, shift_opts = ShiftOpts
, last_p = undefined
, counter = undefined
}.

View File

@ -55,6 +55,7 @@
, filter/2
, fold/3
, highest/1
, shift/1
]).
-export_type([q/0]).
@ -170,6 +171,14 @@ out({pqueue, [{P, Q} | Queues]}) ->
end,
{R, NewQ}.
-spec(shift(pqueue()) -> pqueue()).
shift(Q = {queue, _, _, _}) ->
Q;
shift({pqueue, []}) ->
{pqueue, []}; %% Shouldn't happen?
shift({pqueue, [Hd|Rest]}) ->
{pqueue, Rest ++ [Hd]}. %% Let's hope there are not many priorities.
-spec(out_p(pqueue()) -> {empty | {value, any(), priority()}, pqueue()}).
out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0);
out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)).
@ -266,4 +275,3 @@ r2f([X,Y|R], L) -> {queue, [X,Y], lists:reverse(R, []), L}.
maybe_negate_priority(infinity) -> infinity;
maybe_negate_priority(P) -> -P.

View File

@ -118,7 +118,8 @@ do_add_route(Topic, Dest) when is_binary(Topic) ->
false ->
ok = emqx_router_helper:monitor(Dest),
case emqx_topic:wildcard(Topic) of
true -> trans(fun insert_trie_route/1, [Route]);
true ->
maybe_trans(fun insert_trie_route/1, [Route]);
false -> insert_direct_route(Route)
end
end.
@ -164,7 +165,8 @@ do_delete_route(Topic) when is_binary(Topic) ->
do_delete_route(Topic, Dest) ->
Route = #route{topic = Topic, dest = Dest},
case emqx_topic:wildcard(Topic) of
true -> trans(fun delete_trie_route/1, [Route]);
true ->
maybe_trans(fun delete_trie_route/1, [Route]);
false -> delete_direct_route(Route)
end.
@ -247,10 +249,59 @@ delete_trie_route(Route = #route{topic = Topic}) ->
end.
%% @private
-spec(trans(function(), list(any())) -> ok | {error, term()}).
trans(Fun, Args) ->
case mnesia:transaction(Fun, Args) of
{atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason}
-spec(maybe_trans(function(), list(any())) -> ok | {error, term()}).
maybe_trans(Fun, Args) ->
case persistent_term:get(emqx_route_lock_type, key) of
key ->
trans(Fun, Args);
global ->
lock_router(),
try mnesia:sync_dirty(Fun, Args)
after
unlock_router()
end;
tab ->
trans(fun() ->
emqx_trie:lock_tables(),
apply(Fun, Args)
end, [])
end.
-spec(trans(function(), list(any())) -> ok | {error, term()}).
trans(Fun, Args) ->
%% trigger selective receive optimization of compiler,
%% ideal for handling bursty traffic.
Ref = erlang:make_ref(),
Owner = self(),
{WPid, RefMon} = spawn_monitor(
fun() ->
Res = case mnesia:transaction(Fun, Args) of
{atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason}
end,
Owner ! {Ref, Res}
end),
receive
{Ref, TransRes} ->
receive
{'DOWN', RefMon, process, WPid, normal} -> ok
end,
TransRes;
{'DOWN', RefMon, process, WPid, Info} ->
{error, {trans_crash, Info}}
end.
lock_router() ->
%% if Retry is not 0, global:set_lock could sleep a random time up to 8s.
%% Considering we have a limited number of brokers, it is safe to use sleep 1 ms.
case global:set_lock({?MODULE, self()}, [node() | nodes()], 0) of
false ->
%% Force to sleep 1ms instead.
timer:sleep(1),
lock_router();
true ->
ok
end.
unlock_router() ->
global:del_lock({?MODULE, self()}).

View File

@ -34,6 +34,10 @@ init([]) ->
type => worker,
modules => [emqx_router_helper]},
ok = persistent_term:put(emqx_route_lock_type,
application:get_env(emqx, route_lock_type, key)
),
%% Router pool
RouterPool = emqx_pool_sup:spec([router_pool, hash,
{emqx_router, start_link, []}]),

View File

@ -75,6 +75,7 @@
-export([ deliver/2
, enqueue/2
, dequeue/1
, retry/1
, terminate/3
]).

View File

@ -31,7 +31,9 @@
, delete/1
]).
-export([empty/0]).
-export([ empty/0
, lock_tables/0
]).
-ifdef(TEST).
-compile(export_all).
@ -120,6 +122,11 @@ delete(Topic) when is_binary(Topic) ->
empty() ->
ets:info(?TRIE_TAB, size) == 0.
-spec lock_tables() -> ok.
lock_tables() ->
mnesia:write_lock_table(?TRIE_TAB),
mnesia:write_lock_table(?TRIE_NODE_TAB).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -34,7 +34,7 @@
, stats/1
]).
-export([call/2]).
-export([call/2, call/3]).
%% WebSocket callbacks
-export([ init/2
@ -151,7 +151,10 @@ stats(#state{channel = Channel}) ->
%% kick|discard|takeover
-spec(call(pid(), Req :: term()) -> Reply :: term()).
call(WsPid, Req) when is_pid(WsPid) ->
call(WsPid, Req) ->
call(WsPid, Req, 5000).
call(WsPid, Req, Timeout) when is_pid(WsPid) ->
Mref = erlang:monitor(process, WsPid),
WsPid ! {call, {self(), Mref}, Req},
receive
@ -160,7 +163,7 @@ call(WsPid, Req) when is_pid(WsPid) ->
Reply;
{'DOWN', Mref, _, _, Reason} ->
exit(Reason)
after 5000 ->
after Timeout ->
erlang:demonitor(Mref, [flush]),
exit(timeout)
end.
@ -196,15 +199,21 @@ init(Req, Opts) ->
end.
websocket_init([Req, Opts]) ->
Peername = case proplists:get_bool(proxy_protocol, Opts)
andalso maps:get(proxy_header, Req) of
#{src_address := SrcAddr, src_port := SrcPort} ->
{SrcAddr, SrcPort};
_ ->
cowboy_req:peer(Req)
end,
{Peername, Peercert} =
case proplists:get_bool(proxy_protocol, Opts)
andalso maps:get(proxy_header, Req) of
#{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} ->
ProxyName = {SrcAddr, SrcPort},
%% Notice: Only CN is available in Proxy Protocol V2 additional info
ProxySSL = case maps:get(cn, SSL, undefined) of
undeined -> nossl;
CN -> [{pp2_ssl_cn, CN}]
end,
{ProxyName, ProxySSL};
_ ->
{cowboy_req:peer(Req), cowboy_req:cert(Req)}
end,
Sockname = cowboy_req:sock(Req),
Peercert = cowboy_req:cert(Req),
WsCookie = try cowboy_req:parse_cookies(Req)
catch
error:badarg ->

View File

@ -77,7 +77,7 @@ t_get_set_chan_stats(_) ->
t_open_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end),
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end),
ClientInfo = #{zone => external,
clientid => <<"clientid">>,
@ -153,14 +153,14 @@ t_open_session_race_condition(_) ->
t_discard_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end),
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = emqx_cm:unregister_channel(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = meck:expect(emqx_connection, call, fun(_, _) -> error(testing) end),
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> error(testing) end),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = emqx_cm:unregister_channel(<<"clientid">>),
ok = meck:unload(emqx_connection).
@ -180,7 +180,7 @@ t_takeover_session(_) ->
t_kick_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> test end),
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> test end),
{error, not_found} = emqx_cm:kick_session(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
test = emqx_cm:kick_session(<<"clientid">>),

View File

@ -42,7 +42,8 @@ all() ->
groups() ->
[{parse, [parallel],
[t_parse_cont,
t_parse_frame_too_large
t_parse_frame_too_large,
t_parse_frame_malformed_variable_byte_integer
]},
{connect, [parallel],
[t_serialize_parse_v3_connect,
@ -129,6 +130,12 @@ t_parse_frame_too_large(_) ->
?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 512})),
?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})).
t_parse_frame_malformed_variable_byte_integer(_) ->
MalformedPayload = << <<16#80>> || _ <- lists:seq(1, 4) >>,
ParseState = emqx_frame:initial_parse_state(#{}),
?catch_error(malformed_variable_byte_integer,
emqx_frame:parse(MalformedPayload, ParseState)).
t_serialize_parse_v3_connect(_) ->
Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,
113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108,

View File

@ -270,7 +270,7 @@ t_connect_limit_timeout(_) ->
meck:unload(proplists).
t_connect_emit_stats_timeout(_) ->
IdleTimeout = 2000,
IdleTimeout = 2000 + 200,
emqx_zone:set_env(external, idle_timeout, IdleTimeout),
{ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60}]),
@ -278,7 +278,7 @@ t_connect_emit_stats_timeout(_) ->
[ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)),
?assert(is_reference(emqx_connection:info(stats_timer, sys:get_state(ClientPid)))),
timer:sleep(IdleTimeout),
timer:sleep(IdleTimeout+500),
?assertEqual(undefined, emqx_connection:info(stats_timer, sys:get_state(ClientPid))),
ok = emqtt:disconnect(Client).