Compare commits

...

19 Commits

Author SHA1 Message Date
Zaiming Shi eeffc209df perf(emqx_router): optimise trans result receive 2021-05-06 06:36:46 +02:00
William Yang 84e032f2e1 chore(ci): disable elvis checks on pull requests
It is meaningless to do elvis checks on maintenance branches.
2021-05-05 11:49:08 +02:00
Zaiming (Stone) Shi 989ab64c21 chore(schema): update schema comment for trans lock type 2021-05-05 11:49:08 +02:00
William Yang b45a60fd3c fix(upgrade): default val for trans lock type. 2021-05-05 11:49:08 +02:00
William Yang 2f405f9fa6 feat(upgrade): upgrade code for new trans lock types 2021-05-05 11:49:08 +02:00
William Yang 74bc7c6a07 perf(broker): speedup trans when broker has a big mqueue 2021-05-05 11:49:08 +02:00
William Yang a4d8ef4f93 perf(broker): Optimization for handling bursty traffic
intro. new lock type: 'spawn' of broker.perf.route_lock_type

mnesia get lock calls are not optimized for selective receive.

hence taking locks would be very expensive while there are tons of
messages in the broker's message queue.

This optimization runs the transaction in a separate process to utilize
the selective receive optimization of the compiler.
2021-05-05 11:49:08 +02:00
William Yang e9c14df7a3 perf(broker): new perf toggle broker.perf.route_lock_type 2021-05-05 11:49:08 +02:00
William Yang 154bf0d446 fix(broker): broker call should not timeout before client timeout
So change broker call timeout to infinity.
2021-05-05 11:49:08 +02:00
William Yang ebc1b8521b perf(router): add route runs in async dirty context 2021-05-05 11:49:08 +02:00
William Yang cee378c345 perf(trie): use global lock
Use global lock to reduce remote lock overhead.

So that emqx route trans can run in dirty *sync* context.

At least 10X subscribe/unsubscribe improvements.
2021-05-05 11:49:08 +02:00
Zaiming (Stone) Shi 23164f0a2d
Merge pull request #4725 from zmstone/upgrade-ekka-0.7.7
chore(deps): upgrade to ekka 0.7.7
2021-04-30 19:07:53 +02:00
Zaiming Shi ad73e252dc chore(deps): upgrade to ekka 0.7.7 2021-04-30 14:42:38 +02:00
zhanghongtong 6429c948c1 chore: fix syntax error 2021-04-16 20:16:16 +08:00
zhanghongtong 28810b62c8 fix(ws connection): fix peer_cert_as_username error when ws connect 2021-04-16 14:52:34 +08:00
zhouzb 054f9907c6 chore(upgrade): delete rebar3 script 2021-03-26 09:42:11 +08:00
zhouzb b0ad39376c chore(upgrade): add upgrade script 2021-03-26 09:42:11 +08:00
zhouzb 198da2c688 fix(ping): deny pingreq when mqtt not connected 2021-03-24 17:12:32 +08:00
Shawn 9e5a868bf1 fix(emqx): validate mqtt malformed variable byte integer 2021-03-10 18:29:59 +08:00
14 changed files with 252 additions and 37 deletions

View File

@ -1,6 +1,7 @@
name: Elvis Linter
on: [pull_request]
on: [push]
jobs:
build:

View File

@ -724,6 +724,7 @@ zone.external.acl_deny_action = ignore
## Numbers delimited by `|'. Zero or negative is to disable.
zone.external.force_gc_policy = 16000|16MB
## Max message queue length and total heap size to force shutdown
## connection/session process.
## Message queue here is the Erlang process mailbox, but not the number
@ -736,6 +737,16 @@ zone.external.force_gc_policy = 16000|16MB
## - 1000|32MB on ARCH_32 system
#zone.external.force_shutdown_policy = 10000|64MB
## Performance toggle for subscribe/unsubscribe wildcard topic.
## Change this toggle only when there are many wildcard topics.
## Value: Enum
## - key: mnesia transactional updates with per-key locks. recommended for single node setup.
## - tab: mnesia transactional updates with table lock. recommended for multi-nodes setup.
## - global: global lock protected updates. recommended for larger cluster.
## NOTE: when changing from/to 'global' lock, it requires all nodes in the cluster
## to be stopped before the change.
broker.perf.route_lock_type = key
## Maximum MQTT packet size allowed.
##
## Value: Bytes
@ -1088,10 +1099,18 @@ listener.tcp.external.access.1 = allow all
## Enable the option for X.509 certificate based authentication.
## EMQX will use the common name of certificate as MQTT username.
## Only support Proxy Protocol V2, the CN is available in Proxy Protocol V2 additional info
##
## Value: cn | dn | crt
## listener.tcp.external.peer_cert_as_username = cn
## Enable the option for X.509 certificate based authentication.
## EMQX will use the common name of certificate as MQTT clientid.
## Only support Proxy Protocol V2, the CN is available in Proxy Protocol V2 additional info
##
## Value: cn
## listener.tcp.external.peer_cert_as_clientid = cn
## The TCP backlog defines the maximum length that the queue of pending
## connections can grow to.
##
@ -1567,6 +1586,20 @@ listener.ws.external.verify_protocol_header = on
## Value: Duration
## listener.ws.external.proxy_protocol_timeout = 3s
## Enable the option for X.509 certificate based authentication.
## EMQX will use the common name of certificate as MQTT username.
## Only support Proxy Protocol V2, the CN is available in Proxy Protocol V2 additional info
##
## Value: cn
## listener.ws.external.peer_cert_as_username = cn
## Enable the option for X.509 certificate based authentication.
## EMQX will use the common name of certificate as MQTT clientid.
## Only support Proxy Protocol V2, the CN is available in Proxy Protocol V2 additional info
##
## Value: cn
## listener.ws.external.peer_cert_as_clientid = cn
## The TCP backlog of external MQTT/WebSocket Listener.
##
## See: listener.ws.$name.backlog

View File

@ -1572,6 +1572,14 @@ end}.
hidden
]}.
%% @doc Per-listener option for MQTT/WebSocket listeners, stored under
%% `emqx.listeners'. The only accepted value is `cn'.
{mapping, "listener.ws.$name.peer_cert_as_username", "emqx.listeners", [
{datatype, {enum, [cn]}}
]}.
%% @doc Per-listener option for MQTT/WebSocket listeners, stored under
%% `emqx.listeners'. The only accepted value is `cn'.
{mapping, "listener.ws.$name.peer_cert_as_clientid", "emqx.listeners", [
{datatype, {enum, [cn]}}
]}.
%%--------------------------------------------------------------------
%% MQTT/WebSocket/SSL Listeners
@ -2097,6 +2105,18 @@ end}.
{datatype, flag}
]}.
%% @doc Performance toggle for subscribe/unsubscribe wildcard topic.
%% Change this toggle only when there are many wildcard topics.
%% key: mnesia transactional updates with per-key locks. recommended for single node setup.
%% tab: mnesia transactional updates with table lock. recommended for multi-nodes setup.
%% global: global lock protected updates. recommended for larger cluster.
%% NOTE: when changing from/to 'global' lock, it requires a full cluster restart.
%%
{mapping, "broker.perf.route_lock_type", "emqx.route_lock_type", [
{default, key},
{datatype, {enum, [key, tab, global]}}
]}.
%%--------------------------------------------------------------------
%% System Monitor
%%--------------------------------------------------------------------

View File

@ -7,7 +7,7 @@
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.4"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.6"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.7"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}},
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
]}.

View File

@ -8,20 +8,67 @@
end,
{VSN,
[
{"4.2.11", [
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}
]},
{"4.2.10", [
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []}
]},
{"4.2.9", [
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []}
]},
{<<"4.2.[34567]">>, [
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []}
]},
{"4.2.2", [
{load_module, emqx_metrics, brutal_purge, soft_purge, []}
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []}
]},
{"4.2.1", [
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_json, brutal_purge, soft_purge, []}
]},
{"4.2.0", [
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_json, brutal_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []}
]},
{"4.2.0", [
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, []},
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_json, brutal_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
{apply, {application, set_env,
[emqx, force_shutdown_policy,
#{message_queue_len => DefaultLen,
@ -30,20 +77,67 @@
{<<".*">>, []}
],
[
{"4.2.11", [
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []}
]},
{"4.2.10", [
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []}
]},
{"4.2.9", [
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []}
]},
{<<"4.2.[34567]">>, [
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []}
]},
{"4.2.2", [
{load_module, emqx_metrics, brutal_purge, soft_purge, []}
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []}
]},
{"4.2.1", [
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_json, brutal_purge, soft_purge, []}
{load_module, emqx_json, brutal_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []}
]},
{"4.2.0", [
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
{load_module, emqx_broker, soft_purge, soft_purge, []},
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
{load_module, emqx_router, soft_purge, soft_purge, []},
{load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_json, brutal_purge, soft_purge, []}
{load_module, emqx_json, brutal_purge, soft_purge, []},
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
]

View File

@ -419,7 +419,7 @@ safe_update_stats(Tab, Stat, MaxStat) ->
-compile({inline, [call/2, cast/2, pick/1]}).
call(Broker, Req) ->
gen_server:call(Broker, Req).
gen_server:call(Broker, Req, infinity).
cast(Broker, Msg) ->
gen_server:cast(Broker, Msg).

View File

@ -343,9 +343,6 @@ handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
},
handle_incoming(Packet, NState);
handle_msg({incoming, ?PACKET(?PINGREQ)}, State) ->
handle_outgoing(?PACKET(?PINGRESP), State);
handle_msg({incoming, Packet}, State) ->
handle_incoming(Packet, State);

View File

@ -125,6 +125,9 @@ parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) ->
%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK...
parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
parse_frame(Rest, Header, 2, Options);
parse_remaining_len(<<1:1, _Len:7, _Rest/binary>>, _Header, Multiplier, _Value, _Options)
when Multiplier > 2097152 ->
error(malformed_variable_byte_integer);
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) ->
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options);
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value,

View File

@ -118,7 +118,8 @@ do_add_route(Topic, Dest) when is_binary(Topic) ->
false ->
ok = emqx_router_helper:monitor(Dest),
case emqx_topic:wildcard(Topic) of
true -> trans(fun insert_trie_route/1, [Route]);
true ->
maybe_trans(fun insert_trie_route/1, [Route]);
false -> insert_direct_route(Route)
end
end.
@ -164,7 +165,8 @@ do_delete_route(Topic) when is_binary(Topic) ->
do_delete_route(Topic, Dest) ->
Route = #route{topic = Topic, dest = Dest},
case emqx_topic:wildcard(Topic) of
true -> trans(fun delete_trie_route/1, [Route]);
true ->
maybe_trans(fun delete_trie_route/1, [Route]);
false -> delete_direct_route(Route)
end.
@ -247,10 +249,55 @@ delete_trie_route(Route = #route{topic = Topic}) ->
end.
%% @private
-spec(trans(function(), list(any())) -> ok | {error, term()}).
trans(Fun, Args) ->
case mnesia:transaction(Fun, Args) of
{atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason}
%% Run `Fun' with `Args' under the locking strategy stored in the
%% `emqx_route_lock_type' persistent term (`key' when the term is unset):
%%   key    - plain mnesia transaction via trans/2.
%%   tab    - mnesia transaction via trans/2 that first takes the trie
%%            table locks through emqx_trie:lock_tables/0.
%%   global - take the router lock, run `Fun' in a sync_dirty context and
%%            always release the lock afterwards.
%% Any other stored value raises a case_clause error.
-spec(maybe_trans(function(), list(any())) -> ok | {error, term()}).
maybe_trans(Fun, Args) ->
    LockType = persistent_term:get(emqx_route_lock_type, key),
    case LockType of
        tab ->
            WithTableLocks = fun() ->
                                     emqx_trie:lock_tables(),
                                     apply(Fun, Args)
                             end,
            trans(WithTableLocks, []);
        global ->
            lock_router(),
            try
                mnesia:sync_dirty(Fun, Args)
            after
                %% Release the router lock even when Fun fails.
                unlock_router()
            end;
        key ->
            trans(Fun, Args)
    end.
%% Run `Fun' with `Args' as a mnesia transaction inside a freshly spawned,
%% monitored process and return its outcome to the caller:
%%   - the transaction's own result when it commits ({atomic, Ok} -> Ok),
%%   - {error, Reason} when it aborts,
%%   - {error, {trans_crash, Info}} when the worker exits for any reason
%%     other than the {shutdown, Res} it uses to hand back the result.
%% The receive has no `after' clause, so the caller blocks until the worker
%% process is down.
-spec(trans(function(), list(any())) -> ok | {error, term()}).
trans(Fun, Args) ->
%% trigger selective receive optimization of compiler,
%% ideal for handling bursty traffic.
{_, RefMon} = spawn_monitor(
fun() ->
Res = case mnesia:transaction(Fun, Args) of
{atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason}
end,
%% The result travels back to the caller as the exit reason, i.e. inside
%% the 'DOWN' message of the monitor.
exit({shutdown, Res})
end),
receive
%% Only the 'DOWN' message carrying this monitor's reference is matched.
{'DOWN', RefMon, process, _, Info} ->
case Info of
{shutdown, Res} -> Res;
_ -> {error, {trans_crash, Info}}
end
end.
%% Acquire the router lock, identified by {?MODULE, self()}, on this node
%% and every node returned by nodes(). Loops until the lock is obtained
%% and then returns ok.
lock_router() ->
    LockId = {?MODULE, self()},
    Nodes = [node() | nodes()],
    %% Retries is 0 on purpose: with a non-zero retry count global:set_lock
    %% could sleep a random time up to 8s between attempts. Considering we
    %% have a limited number of brokers, retrying ourselves after a 1 ms
    %% sleep is safe.
    case global:set_lock(LockId, Nodes, 0) of
        true ->
            ok;
        false ->
            timer:sleep(1),
            lock_router()
    end.
%% Release the router lock identified by {?MODULE, self()}.
unlock_router() ->
    LockId = {?MODULE, self()},
    global:del_lock(LockId).

View File

@ -34,6 +34,10 @@ init([]) ->
type => worker,
modules => [emqx_router_helper]},
ok = persistent_term:put(emqx_route_lock_type,
application:get_env(emqx, route_lock_type, key)
),
%% Router pool
RouterPool = emqx_pool_sup:spec([router_pool, hash,
{emqx_router, start_link, []}]),

View File

@ -31,7 +31,9 @@
, delete/1
]).
-export([empty/0]).
-export([ empty/0
, lock_tables/0
]).
-ifdef(TEST).
-compile(export_all).
@ -120,6 +122,11 @@ delete(Topic) when is_binary(Topic) ->
%% true when the ets size reported for ?TRIE_TAB is 0, false otherwise.
empty() ->
    case ets:info(?TRIE_TAB, size) of
        0 -> true;
        _ -> false
    end.
%% Take mnesia write locks on ?TRIE_TAB and then ?TRIE_NODE_TAB, in that
%% order.
-spec lock_tables() -> ok.
lock_tables() ->
    lists:foreach(fun(Tab) -> mnesia:write_lock_table(Tab) end,
                  [?TRIE_TAB, ?TRIE_NODE_TAB]).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -196,15 +196,21 @@ init(Req, Opts) ->
end.
websocket_init([Req, Opts]) ->
Peername = case proplists:get_bool(proxy_protocol, Opts)
andalso maps:get(proxy_header, Req) of
#{src_address := SrcAddr, src_port := SrcPort} ->
{SrcAddr, SrcPort};
_ ->
cowboy_req:peer(Req)
end,
%% Resolve the peer name and peer certificate info for this connection.
%% When the listener has proxy_protocol enabled and the request carries a
%% proxy header with source address, source port and ssl info, both values
%% come from that header; otherwise they are read from the cowboy request.
%% NOTE(review): a proxy header without an `ssl' key falls through to the
%% catch-all clause and therefore uses cowboy_req:peer/1 - confirm this is
%% intended.
{Peername, Peercert} =
    case proplists:get_bool(proxy_protocol, Opts)
         andalso maps:get(proxy_header, Req) of
        #{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} ->
            ProxyName = {SrcAddr, SrcPort},
            %% Notice: Only CN is available in Proxy Protocol V2 additional info
            ProxySSL = case maps:get(cn, SSL, undefined) of
                           %% No CN in the header: report "no ssl" rather than
                           %% a pp2_ssl_cn entry holding `undefined'.
                           undefined -> nossl;
                           CN -> [{pp2_ssl_cn, CN}]
                       end,
            {ProxyName, ProxySSL};
        _ ->
            {cowboy_req:peer(Req), cowboy_req:cert(Req)}
    end,
Sockname = cowboy_req:sock(Req),
Peercert = cowboy_req:cert(Req),
WsCookie = try cowboy_req:parse_cookies(Req)
catch
error:badarg ->
@ -296,9 +302,6 @@ websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) ->
NState = State#state{serialize = Serialize},
handle_incoming(Packet, cancel_idle_timer(NState));
websocket_info({incoming, ?PACKET(?PINGREQ)}, State) ->
return(enqueue(?PACKET(?PINGRESP), State));
websocket_info({incoming, Packet}, State) ->
handle_incoming(Packet, State);

View File

@ -165,7 +165,6 @@ t_handle_msg(_) ->
t_handle_msg_incoming(_) ->
?assertMatch({ok, _Out, _St}, emqx_connection:handle_msg({incoming, ?CONNECT_PACKET(#mqtt_packet_connect{})}, st())),
?assertEqual(ok, emqx_connection:handle_msg({incoming, ?PACKET(?PINGREQ)}, st())),
ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)}, st())),
?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, <<?SUBSCRIBE:4,2:4,11,0,2,0,6,84,111,112,105,99,65,2>>}, st())),

View File

@ -42,7 +42,8 @@ all() ->
groups() ->
[{parse, [parallel],
[t_parse_cont,
t_parse_frame_too_large
t_parse_frame_too_large,
t_parse_frame_malformed_variable_byte_integer
]},
{connect, [parallel],
[t_serialize_parse_v3_connect,
@ -129,6 +130,12 @@ t_parse_frame_too_large(_) ->
?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 512})),
?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})).
%% The parser must reject a remaining-length field whose continuation bit
%% never ends.
t_parse_frame_malformed_variable_byte_integer(_) ->
    %% Six 16#80 bytes: presumably the first one is consumed as the fixed
    %% header (TODO confirm against emqx_frame:parse/2), leaving five length
    %% bytes that all carry the continuation bit. The parser only raises
    %% once Multiplier exceeds 2097152, which first happens on the fifth
    %% length byte, so a shorter payload never reaches the error clause.
    MalformedPayload = << <<16#80>> || _ <- lists:seq(1, 6) >>,
    ParseState = emqx_frame:initial_parse_state(#{}),
    ?catch_error(malformed_variable_byte_integer,
                 emqx_frame:parse(MalformedPayload, ParseState)).
t_serialize_parse_v3_connect(_) ->
Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,
113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108,