Compare commits

...

25 Commits

Author SHA1 Message Date
JianBo He 802a7696ff chore: pretty print stacktrace 2022-04-11 09:22:27 +08:00
JianBo He 93bdee80ea hitofix: update emqx_cm module to e4.2.10 2022-04-11 01:00:37 +08:00
Turtle 830150b309 chore: update ekka tag 2021-06-17 11:30:43 +08:00
Turtle e10e241fe9 chore: export function 2021-06-08 10:54:19 +08:00
Zaiming Shi eeb480c051 core(deps): pin ekka 0.7.7 2021-06-08 10:54:19 +08:00
William Yang ca9b90be9a perf(broker): speedup trans when broker has a big mqueue 2021-04-29 17:10:10 +08:00
William Yang e02704e05a perf(broker): Optimization for handling bursty traffic
intro. new lock type: 'spawn' of broker.perf.route_lock_type

mnesia get lock calls are not optimized for selective receive.

hence taking locks would be very expensive while there are tones of
messages in the brokers message queue.

This optimization run the transaction in a separate process to utilize
the selective receive optimization of the compiler.
2021-04-29 17:10:10 +08:00
William Yang 5cc21dc3f0 fix(upgrade): duplicated modules loading. 2021-04-28 17:01:54 +02:00
William Yang 806cd33d1f chore(ct): fix a testcase timing. 2021-04-28 17:01:54 +02:00
William Yang f8ca066d74 upgrade: safe upgrade of emqx_connection 2021-04-28 21:29:20 +08:00
William Yang 59b7e8e3ba perf(lock): upgrade code for new lock types. 2021-04-28 21:29:20 +08:00
William Yang 571219f073 perf: new perf toggle broker.perf.route_lock_type 2021-04-28 21:29:20 +08:00
William Yang 9084554f4f fix: broker call should not timeout before client timeout
So change broker call timeout to infinity.
2021-04-28 21:29:20 +08:00
William Yang 1b179c8e2a perf(router): add route runs in async dirty context 2021-04-28 21:29:20 +08:00
William Yang fb8f5b79ad perf(trie): use global lock
Use global lock to reduce remote lock overhead.

So that emqx route trans can run in dirty *sync* context.

At least 10X subscribe/unsubscribe improvments.
2021-04-28 21:29:20 +08:00
JianBo He e427b0ff75 chore(appup): supply appup instructions for ws-connetion 2021-04-28 21:29:20 +08:00
Zaiming Shi b4e3c32e24 chore(conf): log only to file by default 2021-04-28 21:29:20 +08:00
Zaiming Shi c511e324bb fix(ekka): allow remote_console 2021-04-28 21:29:20 +08:00
zhanghongtong 1eb0f7b3b2 fix(ws connection): fix peer_cert_as_username error when ws connect 2021-04-28 21:29:20 +08:00
Shawn 7d003e0bfc Porting code for congestion alarms from 4.3.0 to e4.2.6 (#4523)
* fix(congestion): port some code from 4.3.0
* chore(emqx): update the appup file
2021-04-28 21:29:20 +08:00
Shawn 927264d793 fix(emqx): export do_deliver/2 for emqx_sn 2021-04-28 21:29:20 +08:00
Shawn 70da59c3bb chore(emqx): update appup 2021-04-28 21:29:20 +08:00
Shawn c69e1c6222 fix(mqtt-sn): sleep mode not working #4434 2021-04-28 21:29:20 +08:00
Shawn 6666210211 fix(congestion): alarm and clear congestion too frequetly 2021-04-28 21:29:20 +08:00
Shawn 238eaa8e40 fix(emqx): validate mqtt malformed variable byte integer 2021-03-10 19:42:44 +08:00
21 changed files with 446 additions and 144 deletions

View File

@ -296,6 +296,16 @@ broker.shared_dispatch_ack_enabled = false
## Value: Flag
broker.route_batch_clean = off
## Performance toggle for subscribe/unsubscribe wildcard topic.
## Change this toggle only when there are many wildcard topics.
## Value: Enum
## - key: mnesia translational updates with per-key locks. recommended for single node setup.
## - tab: mnesia translational updates with table lock. recommended for multi-nodes setup.
## - global: global lock protected updates. recommended for larger cluster.
## NOTE: when changing from/to 'global' lock, it requires all nodes in the cluster
## to be stopped before the change.
broker.perf.route_lock_type = key
##-------------------------------------------------------------------
## Plugins
##-------------------------------------------------------------------

View File

@ -544,6 +544,16 @@ listener.ws.external.verify_protocol_header = on
## Value: Duration
## listener.ws.external.proxy_protocol_timeout = 3s
## See: listener.ssl.$name.peer_cert_as_username
##
## Value: cn
## listener.ws.external.peer_cert_as_username = cn
## See: listener.ssl.$name.peer_cert_as_clientid
##
## Value: cn
## listener.ws.external.peer_cert_as_clientid = cn
## The TCP backlog of external MQTT/WebSocket Listener.
##
## See: listener.ws.$name.backlog

View File

@ -10,7 +10,7 @@
## - file: write logs only to file
## - console: write logs only to standard I/O
## - both: write logs both to file and standard I/O
log.to = both
log.to = file
## The log severity level.
##
@ -167,4 +167,4 @@ log.rotation.count = 5
## Value: MaxBurstCount,TimeWindow
## Default: disabled
##
#log.burst_limit = 20000, 1s
#log.burst_limit = 20000, 1s

View File

@ -184,6 +184,30 @@ zone.external.enable_flapping_detect = off
## Example: 100KB incoming per 10 seconds.
#zone.external.rate_limit.conn_bytes_in = 100KB,10s
## Whether to alarm the congested connections.
##
## Sometimes the mqtt connection (usually an MQTT subscriber) may get "congested" because
## there're too many packets to sent. The socket trys to buffer the packets until the buffer is
## full. If more packets comes after that, the packets will be "pending" in a queue
## and we consider the connection is "congested".
##
## Enable this to send an alarm when there's any bytes pending in the queue. You could set
## the `listener.tcp.<ZoneName>.sndbuf` to a larger value if the alarm is triggered too often.
##
## The name of the alarm is of format "conn_congestion/<ClientID>/<Username>".
## Where the <ClientID> is the client-id of the congested MQTT connection.
## And the <Username> is the username or "unknown_user" of not provided by the client.
## Default: off
#zone.external.conn_congestion.alarm = off
## Won't clear the congested alarm in how long time.
## The alarm is cleared only when there're no pending bytes in the queue, and also it has been
## `min_alarm_sustain_duration` time since the last time we considered the connection is "congested".
##
## This is to avoid clearing and sending the alarm again too often.
## Default: 1m
#zone.external.conn_congestion.min_alarm_sustain_duration = 1m
## Messages quota for the each of external MQTT connection.
## This value consumed by the number of recipient on a message.
##

View File

@ -999,6 +999,16 @@ end}.
{datatype, string}
]}.
{mapping, "zone.$name.conn_congestion.alarm", "emqx.zones", [
{datatype, flag},
{default, off}
]}.
{mapping, "zone.$name.conn_congestion.min_alarm_sustain_duration", "emqx.zones", [
{default, "1m"},
{datatype, {duration, ms}}
]}.
{mapping, "zone.$name.quota.conn_messages_routing", "emqx.zones", [
{datatype, string}
]}.
@ -1128,6 +1138,10 @@ end}.
{ratelimit, {conn_messages_in, Ratelimit(Val)}};
(["rate_limit", "conn_bytes_in"], Val) ->
{ratelimit, {conn_bytes_in, Ratelimit(Val)}};
(["conn_congestion", "alarm"], Val) ->
{conn_congestion_alarm_enabled, Val};
(["conn_congestion", "min_alarm_sustain_duration"], Val) ->
{conn_congestion_min_alarm_sustain_duration, Val};
(["quota", "conn_messages_routing"], Val) ->
{quota, {conn_messages_routing, Ratelimit(Val)}};
(["quota", "overall_messages_routing"], Val) ->
@ -1573,7 +1587,11 @@ end}.
]}.
{mapping, "listener.ws.$name.peer_cert_as_username", "emqx.listeners", [
{datatype, {enum, [cn, dn, crt]}}
{datatype, {enum, [cn]}}
]}.
{mapping, "listener.ws.$name.peer_cert_as_clientid", "emqx.listeners", [
{datatype, {enum, [cn]}}
]}.
%%--------------------------------------------------------------------
@ -2008,6 +2026,18 @@ end}.
{datatype, flag}
]}.
%% @doc Performance toggle for subscribe/unsubscribe wildcard topic.
%% Change this toggle only when there are many wildcard topics.
%% key: mnesia translational updates with per-key locks. recommended for single node setup.
%% tab: mnesia translational updates with table lock. recommended for multi-nodes setup.
%% global: global lock protected updates. recommended for larger cluster.
%% NOTE: when changing from/to 'global' lock, it requires all nodes in the cluster
%%
{mapping, "broker.perf.route_lock_type", "emqx.route_lock_type", [
{default, key},
{datatype, {enum, [key, tab, global]}}
]}.
%%--------------------------------------------------------------------
%% System Monitor
%%--------------------------------------------------------------------

View File

@ -7,7 +7,7 @@
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.5"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.5"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.8"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}},
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
]}.

View File

@ -6,6 +6,7 @@
{add_module, emqx_congestion},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
{suspend, [esockd_acceptor,emqx_connection, emqx_ws_connection]},
@ -14,12 +15,17 @@
{update, emqx_ws_connection, {advanced, []}},
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
{resume, [esockd_acceptor,emqx_connection, emqx_ws_connection]}
{resume, [esockd_acceptor,emqx_connection, emqx_ws_connection]},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}
]},
{"4.2.1", [
{add_module, emqx_congestion},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
@ -27,15 +33,54 @@
{update, emqx_ws_connection, {advanced, []}},
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]}
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}
]},
{<<"4.2.[23]">>, [
{add_module, emqx_congestion},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []}
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}
]},
{<<"4.2.4">>, [
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}
]},
{<<"4.2.5">>, [
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}
]},
{<<"4.2.6">>, [
{load_module, emqx_channel, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
],
@ -51,8 +96,13 @@
{update, emqx_connection, {advanced, []}},
{update, emqx_ws_connection, {advanced, []}},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{delete_module, emqx_congestion}
{delete_module, emqx_congestion},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []}
]},
{"4.2.1", [
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
@ -60,20 +110,60 @@
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{update, emqx_ws_connection, {advanced, []}},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
{delete_module, emqx_congestion}
{delete_module, emqx_congestion},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []}
]},
{<<"4.2.[23]">>, [
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{delete_module, emqx_congestion}
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{delete_module, emqx_congestion},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []}
]},
{<<"4.2.4">>, [
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []}
]},
{<<"4.2.5">>, [
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{update, emqx_connection, {advanced, []}},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []}
]},
{<<"4.2.6">>, [
{load_module, emqx_channel, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
]

View File

@ -361,7 +361,7 @@ normalize_message(partition, #{occurred := Node}) ->
list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID]));
normalize_message(<<"mqtt_conn/congested/", Info/binary>>, _) ->
list_to_binary(io_lib:format("MQTT connection congested: ~s", [Info]));
normalize_message(<<"conn_congestion/", Info/binary>>, _) ->
list_to_binary(io_lib:format("connection congested: ~s", [Info]));
normalize_message(_Name, _UnknownDetails) ->
<<"Unknown alarm">>.

View File

@ -419,7 +419,7 @@ safe_update_stats(Tab, Stat, MaxStat) ->
-compile({inline, [call/2, cast/2, pick/1]}).
call(Broker, Req) ->
gen_server:call(Broker, Req).
gen_server:call(Broker, Req, infinity).
cast(Broker, Msg) ->
gen_server:cast(Broker, Msg).

View File

@ -32,6 +32,8 @@
-export([ info/1
, info/2
, set_conn_state/2
, get_session/1
, set_session/2
, stats/1
, caps/1
]).
@ -46,6 +48,12 @@
, terminate/2
]).
%% Export for emqx_sn
-export([ do_deliver/2
, ensure_keepalive/2
, clear_keepalive/1
]).
%% Exports for CT
-export([set_field/3]).
@ -167,6 +175,12 @@ info(timers, #channel{timers = Timers}) -> Timers.
set_conn_state(ConnState, Channel) ->
Channel#channel{conn_state = ConnState}.
get_session(#channel{session = Session}) ->
Session.
set_session(Session, Channel) ->
Channel#channel{session = Session}.
%% TODO: Add more stats.
-spec(stats(channel()) -> emqx_types:stats()).
stats(#channel{session = Session})->
@ -1501,6 +1515,14 @@ ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}
Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)),
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
clear_keepalive(Channel = #channel{timers = Timers}) ->
case maps:get(alive_timer, Timers, undefined) of
undefined ->
Channel;
TRef ->
emqx_misc:cancel_timer(TRef),
Channel#channel{timers = maps:without([alive_timer], Timers)}
end.
%%--------------------------------------------------------------------
%% Maybe Resume Session

View File

@ -70,7 +70,10 @@
]).
%% Internal export
-export([stats_fun/0]).
-export([stats_fun/0, clean_down/1]).
%% Test export
-export([register_channel_/3]).
-type(chan_pid() :: pid()).
@ -91,6 +94,10 @@
%% Server name
-define(CM, ?MODULE).
-define(T_KICK, 5000).
-define(T_GET_INFO, 5000).
-define(T_TAKEOVER, 15000).
%% @doc Start the channel manager.
-spec(start_link() -> startlink_ret()).
start_link() ->
@ -159,7 +166,7 @@ get_chan_info(ClientId, ChanPid) when node(ChanPid) == node() ->
error:badarg -> undefined
end;
get_chan_info(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid]).
rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid], ?T_GET_INFO).
%% @doc Update infos of the channel.
-spec(set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean()).
@ -184,7 +191,7 @@ get_chan_stats(ClientId, ChanPid) when node(ChanPid) == node() ->
error:badarg -> undefined
end;
get_chan_stats(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid]).
rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO).
%% @doc Set channel's stats.
-spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean()).
@ -222,7 +229,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
case takeover_session(ClientId) of
{ok, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session),
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}),
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER),
register_channel_(ClientId, Self, ConnInfo),
{ok, #{session => Session,
present => true,
@ -252,7 +259,7 @@ takeover_session(ClientId) ->
takeover_session(ClientId, ChanPid);
ChanPids ->
[ChanPid|StalePids] = lists:reverse(ChanPids),
?LOG(error, "More than one channel found: ~p", [ChanPids]),
?LOG(error, "more_than_one_channel_found: ~p", [ChanPids]),
lists:foreach(fun(StalePid) ->
catch discard_session(ClientId, StalePid)
end, StalePids),
@ -264,67 +271,111 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
undefined ->
{error, not_found};
ConnMod when is_atom(ConnMod) ->
Session = ConnMod:call(ChanPid, {takeover, 'begin'}),
%% TODO: if takeover times out, maybe kill the old?
Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
{ok, ConnMod, ChanPid, Session}
end;
takeover_session(ClientId, ChanPid) ->
rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid]).
rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER).
%% @doc Discard all the sessions identified by the ClientId.
-spec(discard_session(emqx_types:clientid()) -> ok).
discard_session(ClientId) when is_binary(ClientId) ->
case lookup_channels(ClientId) of
[] -> ok;
ChanPids ->
lists:foreach(
fun(ChanPid) ->
try
discard_session(ClientId, ChanPid)
catch
_:{noproc,_}:_Stk -> ok;
_:{{shutdown,_},_}:_Stk -> ok;
_:Error:_Stk ->
?LOG(error, "Failed to discard ~0p: ~0p", [ChanPid, Error])
end
end, ChanPids)
ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids)
end.
discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chann_conn_mod(ClientId, ChanPid) of
undefined -> ok;
ConnMod when is_atom(ConnMod) ->
ConnMod:call(ChanPid, discard)
end;
%% @private Kick a local stale session to force it step down.
%% If failed to kick (e.g. timeout) force a kill.
%% Keeping the stale pid around, or returning error or raise an exception
%% benefits nobody.
-spec kick_or_kill(kick | discard, module(), pid()) -> ok.
kick_or_kill(Action, ConnMod, Pid) ->
try
%% this is essentailly a gen_server:call implemented in emqx_connection
%% and emqx_ws_connection.
%% the handle_call is implemented in emqx_channel
ok = apply(ConnMod, call, [Pid, Action, ?T_KICK])
catch
_ : noproc -> % emqx_ws_connection: call
?LOG(debug, "session_already_gone: ~p, action: ~p", [Pid, Action]),
ok;
_ : {noproc, _} -> % emqx_connection: gen_server:call
?LOG(debug, "session_already_gone: ~p, action: ~p", [Pid, Action]),
ok;
_ : {shutdown, _} ->
?LOG(debug, "session_already_shutdown: ~p, action: ~p", [Pid, Action]),
ok;
_ : {{shutdown, _}, _} ->
?LOG(debug, "session_already_shutdown: ~p, action: ~p", [Pid, Action]),
ok;
_ : {timeout, {gen_server, call, _}} ->
?LOG(warning, "session_kick_timeout: ~p, action: ~p, "
"stale_channel: ~0p",
[Pid, Action, stale_channel_info(Pid)]),
ok = force_kill(Pid);
_ : Error ->
?LOG(error, "session_kick_exception: ~p, action: ~p, "
"reason: ~p, stacktrace: ~0p, stale_channel: ~0p",
[Pid, Action, Error, erlang:get_stacktrace(), stale_channel_info(Pid)]),
ok = force_kill(Pid)
end.
force_kill(Pid) ->
exit(Pid, kill),
ok.
stale_channel_info(Pid) ->
process_info(Pid, [status, message_queue_len, current_stacktrace]).
discard_session(ClientId, ChanPid) ->
rpc_call(node(ChanPid), discard_session, [ClientId, ChanPid]).
kick_session(discard, ClientId, ChanPid).
kick_session(ClientId, ChanPid) ->
kick_session(kick, ClientId, ChanPid).
%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action).
kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chann_conn_mod(ClientId, ChanPid) of
undefined ->
%% already deregistered
ok;
ConnMod when is_atom(ConnMod) ->
ok = kick_or_kill(Action, ConnMod, ChanPid)
end;
kick_session(Action, ClientId, ChanPid) ->
%% call remote node on the old APIs because we do not know if they have upgraded
%% to have kick_session/3
Function = case Action of
discard -> discard_session;
kick -> kick_session
end,
try
rpc_call(node(ChanPid), Function, [ClientId, ChanPid], ?T_KICK)
catch
Error : Reason ->
%% This should mostly be RPC failures.
%% However, if the node is still running the old version
%% code (prior to emqx app 4.3.10) some of the RPC handler
%% exceptions may get propagated to a new version node
?LOG(error, "failed_to_kick_session_on_remote_node ~p: ~p ~p ~p",
[node(ChanPid), Action, Error, Reason])
end.
kick_session(ClientId) ->
case lookup_channels(ClientId) of
[] -> {error, not_found};
[ChanPid] ->
kick_session(ClientId, ChanPid);
[] ->
?LOG(warning, "kiecked_an_unknown_session ~ts", [ClientId]),
ok;
ChanPids ->
[ChanPid|StalePids] = lists:reverse(ChanPids),
?LOG(error, "More than one channel found: ~p", [ChanPids]),
lists:foreach(fun(StalePid) ->
catch discard_session(ClientId, StalePid)
end, StalePids),
kick_session(ClientId, ChanPid)
case length(ChanPids) > 1 of
true -> ?LOG(info, "more_than_one_channel_found: ~p", [ChanPids]);
false -> ok
end,
lists:foreach(fun(Pid) -> kick_session(ClientId, Pid) end, ChanPids)
end.
kick_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chan_info(ClientId, ChanPid) of
#{conninfo := #{conn_mod := ConnMod}} ->
ConnMod:call(ChanPid, kick);
undefined ->
{error, not_found}
end;
kick_session(ClientId, ChanPid) ->
rpc_call(node(ChanPid), kick_session, [ClientId, ChanPid]).
%% @doc Is clean start?
% is_clean_start(#{clean_start := false}) -> false;
% is_clean_start(_Attrs) -> true.
@ -360,10 +411,16 @@ lookup_channels(local, ClientId) ->
[ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)].
%% @private
rpc_call(Node, Fun, Args) ->
case rpc:call(Node, ?MODULE, Fun, Args) of
{badrpc, Reason} -> error(Reason);
Res -> Res
rpc_call(Node, Fun, Args, Timeout) ->
case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of
{badrpc, Reason} ->
%% since eqmx app 4.3.10, the 'kick' and 'discard' calls hanndler
%% should catch all exceptions and always return 'ok'.
%% This leaves 'badrpc' only possible when there is problem
%% calling the remote node.
error({badrpc, Reason});
Res ->
Res
end.
%% @private
@ -396,7 +453,7 @@ handle_cast(Msg, State) ->
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]),
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]),
{noreply, State#{chan_pmon := PMon1}};
handle_info(Info, State) ->
@ -432,5 +489,5 @@ get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() ->
error:badarg -> undefined
end;
get_chann_conn_mod(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid]).
rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO).

View File

@ -16,17 +16,16 @@
-module(emqx_congestion).
-export([ maybe_alarm_port_busy/3
, maybe_alarm_port_busy/4
, maybe_alarm_too_many_publish/5
, maybe_alarm_too_many_publish/6
-export([ maybe_alarm_conn_congestion/3
, cancel_alarms/3
]).
-define(ALARM_CONN_CONGEST(Channel, Reason),
list_to_binary(io_lib:format("mqtt_conn/congested/~s/~s/~s", [emqx_channel:info(clientid, Channel),
maps:get(username, emqx_channel:info(clientinfo, Channel), <<"undefined">>),
Reason]))).
list_to_binary(
io_lib:format("~s/~s/~s",
[Reason, emqx_channel:info(clientid, Channel),
maps:get(username, emqx_channel:info(clientinfo, Channel),
<<"unknown_user">>)]))).
-define(ALARM_CONN_INFO_KEYS, [
socktype, sockname, peername, clientid, username, proto_name, proto_ver,
@ -36,44 +35,28 @@
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
-define(PROC_INFO_KEYS, [message_queue_len, memory, reductions]).
-define(ALARM_SENT(REASON), {alarm_sent, REASON}).
-define(ALL_ALARM_REASONS, [port_busy, too_many_publish]).
-define(CONFIRM_CLEAR(REASON), {alarm_confirm_clear, REASON}).
-define(CONFIRM_CLEAR_INTERVAL, 10000).
-define(ALL_ALARM_REASONS, [conn_congestion]).
-define(WONT_CLEAR_IN, 60000).
maybe_alarm_port_busy(Socket, Transport, Channel) ->
maybe_alarm_port_busy(Socket, Transport, Channel, false).
maybe_alarm_port_busy(Socket, Transport, Channel, ForceClear) ->
case is_tcp_congested(Socket, Transport) of
true -> alarm_congestion(Socket, Transport, Channel, port_busy);
false -> cancel_alarm_congestion(Socket, Transport, Channel, port_busy,
ForceClear)
maybe_alarm_conn_congestion(Socket, Transport, Channel) ->
case is_alarm_enabled(Channel) of
false -> ok;
true ->
case is_tcp_congested(Socket, Transport) of
true -> alarm_congestion(Socket, Transport, Channel, conn_congestion);
false -> cancel_alarm_congestion(Socket, Transport, Channel, conn_congestion)
end
end.
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
MaxBatchSize) ->
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
MaxBatchSize, false).
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
PubMsgCount = _MaxBatchSize, _ForceClear) ->
%% we only alarm it when the process is "too busy"
alarm_congestion(Socket, Transport, Channel, too_many_publish);
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
_MaxBatchSize, ForceClear) when PubMsgCount == 0 ->
%% but we clear the alarm until it is really "idle", to avoid sending
%% alarms and clears too frequently
cancel_alarm_congestion(Socket, Transport, Channel, too_many_publish,
ForceClear);
maybe_alarm_too_many_publish(_Socket, _Transport, _Channel, _PubMsgCount,
_MaxBatchSize, _ForceClear) ->
ok.
cancel_alarms(Socket, Transport, Channel) ->
lists:foreach(fun(Reason) ->
do_cancel_alarm_congestion(Socket, Transport, Channel, Reason)
end, ?ALL_ALARM_REASONS).
is_alarm_enabled(Channel) ->
emqx_zone:get_env(emqx_channel:info(zone, Channel),
conn_congestion_alarm_enabled, false).
alarm_congestion(Socket, Transport, Channel, Reason) ->
case has_alarm_sent(Reason) of
false -> do_alarm_congestion(Socket, Transport, Channel, Reason);
@ -82,8 +65,11 @@ alarm_congestion(Socket, Transport, Channel, Reason) ->
update_alarm_sent_at(Reason)
end.
cancel_alarm_congestion(Socket, Transport, Channel, Reason, ForceClear) ->
case is_alarm_allowed_clear(Reason, ForceClear) of
cancel_alarm_congestion(Socket, Transport, Channel, Reason) ->
Zone = emqx_channel:info(zone, Channel),
WontClearIn = emqx_zone:get_env(Zone, conn_congestion_min_alarm_sustain_duration,
?WONT_CLEAR_IN),
case has_alarm_sent(Reason) andalso long_time_since_last_alarm(Reason, WontClearIn) of
true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason);
false -> ok
end.
@ -123,13 +109,11 @@ get_alarm_sent_at(Reason) ->
LastSentAt -> LastSentAt
end.
is_alarm_allowed_clear(Reason, _ForceClear = true) ->
has_alarm_sent(Reason);
is_alarm_allowed_clear(Reason, _ForceClear = false) ->
long_time_since_last_alarm(Reason, WontClearIn) ->
%% only sent clears when the alarm was not triggered in the last
%% ?CONFIRM_CLEAR_INTERVAL time
%% WontClearIn time
case timenow() - get_alarm_sent_at(Reason) of
Elapse when Elapse >= ?CONFIRM_CLEAR_INTERVAL -> true;
Elapse when Elapse >= WontClearIn -> true;
_ -> false
end.

View File

@ -38,7 +38,7 @@
, stats/1
]).
-export([call/2]).
-export([call/2, call/3]).
%% Callback
-export([init/4]).
@ -170,6 +170,9 @@ stats(#state{transport = Transport,
call(Pid, Req) ->
gen_server:call(Pid, Req, infinity).
call(Pid, Req, Timeout) ->
gen_server:call(Pid, Req, Timeout).
stop(Pid) ->
gen_server:stop(Pid).
@ -374,12 +377,8 @@ handle_msg({Passive, _Sock}, State)
handle_info(activate_socket, NState1);
handle_msg(Deliver = {deliver, _Topic, _Msg},
#state{active_n = MaxBatchSize, transport = Transport,
socket = Socket, channel = Channel} = State) ->
Delivers0 = emqx_misc:drain_deliver(MaxBatchSize),
emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel,
length(Delivers0), MaxBatchSize),
Delivers = [Deliver|Delivers0],
#state{active_n = ActiveN} = State) ->
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State);
%% Something sent
@ -548,12 +547,9 @@ handle_timeout(_TRef, limit_timeout, State) ->
},
handle_info(activate_socket, NState);
handle_timeout(_TRef, emit_stats, State = #state{active_n = MaxBatchSize,
channel = Channel, transport = Transport, socket = Socket}) ->
{_, MsgQLen} = erlang:process_info(self(), message_queue_len),
emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel, true),
emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel,
MsgQLen, MaxBatchSize, true),
handle_timeout(_TRef, emit_stats, State = #state{channel = Channel, transport = Transport,
socket = Socket}) ->
emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:set_chan_stats(ClientId, stats(State)),
{ok, State#state{stats_timer = undefined}};
@ -666,7 +662,7 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel})
Oct = iolist_size(IoData),
ok = emqx_metrics:inc('bytes.sent', Oct),
emqx_pd:inc_counter(outgoing_bytes, Oct),
emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel),
emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
case Transport:async_send(Socket, IoData, [nosuspend]) of
ok -> ok;
Error = {error, _Reason} ->

View File

@ -141,6 +141,9 @@ parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) ->
%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK...
parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
parse_frame(Rest, Header, 2, Options);
parse_remaining_len(<<1:1, _Len:7, _Rest/binary>>, _Header, Multiplier, _Value, _Options)
when Multiplier > 2097152 ->
error(malformed_variable_byte_integer);
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) ->
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options);
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value,

View File

@ -118,7 +118,8 @@ do_add_route(Topic, Dest) when is_binary(Topic) ->
false ->
ok = emqx_router_helper:monitor(Dest),
case emqx_topic:wildcard(Topic) of
true -> trans(fun insert_trie_route/1, [Route]);
true ->
maybe_trans(fun insert_trie_route/1, [Route]);
false -> insert_direct_route(Route)
end
end.
@ -164,7 +165,8 @@ do_delete_route(Topic) when is_binary(Topic) ->
do_delete_route(Topic, Dest) ->
Route = #route{topic = Topic, dest = Dest},
case emqx_topic:wildcard(Topic) of
true -> trans(fun delete_trie_route/1, [Route]);
true ->
maybe_trans(fun delete_trie_route/1, [Route]);
false -> delete_direct_route(Route)
end.
@ -247,10 +249,59 @@ delete_trie_route(Route = #route{topic = Topic}) ->
end.
%% @private
-spec(trans(function(), list(any())) -> ok | {error, term()}).
trans(Fun, Args) ->
case mnesia:transaction(Fun, Args) of
{atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason}
-spec(maybe_trans(function(), list(any())) -> ok | {error, term()}).
maybe_trans(Fun, Args) ->
case persistent_term:get(emqx_route_lock_type, key) of
key ->
trans(Fun, Args);
global ->
lock_router(),
try mnesia:sync_dirty(Fun, Args)
after
unlock_router()
end;
tab ->
trans(fun() ->
emqx_trie:lock_tables(),
apply(Fun, Args)
end, [])
end.
-spec(trans(function(), list(any())) -> ok | {error, term()}).
trans(Fun, Args) ->
%% trigger selective receive optimization of compiler,
%% ideal for handling bursty traffic.
Ref = erlang:make_ref(),
Owner = self(),
{WPid, RefMon} = spawn_monitor(
fun() ->
Res = case mnesia:transaction(Fun, Args) of
{atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason}
end,
Owner ! {Ref, Res}
end),
receive
{Ref, TransRes} ->
receive
{'DOWN', RefMon, process, WPid, normal} -> ok
end,
TransRes;
{'DOWN', RefMon, process, WPid, Info} ->
{error, {trans_crash, Info}}
end.
lock_router() ->
%% if Retry is not 0, global:set_lock could sleep a random time up to 8s.
%% Considering we have a limited number of brokers, it is safe to use sleep 1 ms.
case global:set_lock({?MODULE, self()}, [node() | nodes()], 0) of
false ->
%% Force to sleep 1ms instead.
timer:sleep(1),
lock_router();
true ->
ok
end.
unlock_router() ->
global:del_lock({?MODULE, self()}).

View File

@ -34,6 +34,10 @@ init([]) ->
type => worker,
modules => [emqx_router_helper]},
ok = persistent_term:put(emqx_route_lock_type,
application:get_env(emqx, route_lock_type, key)
),
%% Router pool
RouterPool = emqx_pool_sup:spec([router_pool, hash,
{emqx_router, start_link, []}]),

View File

@ -75,6 +75,7 @@
-export([ deliver/2
, enqueue/2
, dequeue/1
, retry/1
, terminate/3
]).

View File

@ -31,7 +31,9 @@
, delete/1
]).
-export([empty/0]).
-export([ empty/0
, lock_tables/0
]).
-ifdef(TEST).
-compile(export_all).
@ -120,6 +122,11 @@ delete(Topic) when is_binary(Topic) ->
empty() ->
ets:info(?TRIE_TAB, size) == 0.
-spec lock_tables() -> ok.
lock_tables() ->
mnesia:write_lock_table(?TRIE_TAB),
mnesia:write_lock_table(?TRIE_NODE_TAB).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -196,15 +196,21 @@ init(Req, Opts) ->
end.
websocket_init([Req, Opts]) ->
Peername = case proplists:get_bool(proxy_protocol, Opts)
andalso maps:get(proxy_header, Req) of
#{src_address := SrcAddr, src_port := SrcPort} ->
{SrcAddr, SrcPort};
_ ->
cowboy_req:peer(Req)
end,
{Peername, Peercert} =
case proplists:get_bool(proxy_protocol, Opts)
andalso maps:get(proxy_header, Req) of
#{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} ->
ProxyName = {SrcAddr, SrcPort},
%% Notice: Only CN is available in Proxy Protocol V2 additional info
ProxySSL = case maps:get(cn, SSL, undefined) of
undeined -> nossl;
CN -> [{pp2_ssl_cn, CN}]
end,
{ProxyName, ProxySSL};
_ ->
{cowboy_req:peer(Req), cowboy_req:cert(Req)}
end,
Sockname = cowboy_req:sock(Req),
Peercert = cowboy_req:cert(Req),
WsCookie = try cowboy_req:parse_cookies(Req)
catch
error:badarg ->

View File

@ -42,7 +42,8 @@ all() ->
groups() ->
[{parse, [parallel],
[t_parse_cont,
t_parse_frame_too_large
t_parse_frame_too_large,
t_parse_frame_malformed_variable_byte_integer
]},
{connect, [parallel],
[t_serialize_parse_v3_connect,
@ -129,6 +130,12 @@ t_parse_frame_too_large(_) ->
?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 512})),
?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})).
t_parse_frame_malformed_variable_byte_integer(_) ->
MalformedPayload = << <<16#80>> || _ <- lists:seq(1, 4) >>,
ParseState = emqx_frame:initial_parse_state(#{}),
?catch_error(malformed_variable_byte_integer,
emqx_frame:parse(MalformedPayload, ParseState)).
t_serialize_parse_v3_connect(_) ->
Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,
113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108,

View File

@ -270,7 +270,7 @@ t_connect_limit_timeout(_) ->
meck:unload(proplists).
t_connect_emit_stats_timeout(_) ->
IdleTimeout = 2000,
IdleTimeout = 2000 + 200,
emqx_zone:set_env(external, idle_timeout, IdleTimeout),
{ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60}]),