Compare commits
19 Commits
master
...
dev/v4.2.1
Author | SHA1 | Date |
---|---|---|
![]() |
eeffc209df | |
![]() |
84e032f2e1 | |
![]() |
989ab64c21 | |
![]() |
b45a60fd3c | |
![]() |
2f405f9fa6 | |
![]() |
74bc7c6a07 | |
![]() |
a4d8ef4f93 | |
![]() |
e9c14df7a3 | |
![]() |
154bf0d446 | |
![]() |
ebc1b8521b | |
![]() |
cee378c345 | |
![]() |
23164f0a2d | |
![]() |
ad73e252dc | |
![]() |
6429c948c1 | |
![]() |
28810b62c8 | |
![]() |
054f9907c6 | |
![]() |
b0ad39376c | |
![]() |
198da2c688 | |
![]() |
9e5a868bf1 |
|
@ -1,6 +1,7 @@
|
||||||
name: Elvis Linter
|
name: Elvis Linter
|
||||||
|
|
||||||
on: [pull_request]
|
on: [push]
|
||||||
|
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
build:
|
build:
|
||||||
|
|
|
@ -724,6 +724,7 @@ zone.external.acl_deny_action = ignore
|
||||||
## Numbers delimited by `|'. Zero or negative is to disable.
|
## Numbers delimited by `|'. Zero or negative is to disable.
|
||||||
zone.external.force_gc_policy = 16000|16MB
|
zone.external.force_gc_policy = 16000|16MB
|
||||||
|
|
||||||
|
|
||||||
## Max message queue length and total heap size to force shutdown
|
## Max message queue length and total heap size to force shutdown
|
||||||
## connection/session process.
|
## connection/session process.
|
||||||
## Message queue here is the Erlang process mailbox, but not the number
|
## Message queue here is the Erlang process mailbox, but not the number
|
||||||
|
@ -736,6 +737,16 @@ zone.external.force_gc_policy = 16000|16MB
|
||||||
## - 1000|32MB on ARCH_32 sytem
|
## - 1000|32MB on ARCH_32 sytem
|
||||||
#zone.external.force_shutdown_policy = 10000|64MB
|
#zone.external.force_shutdown_policy = 10000|64MB
|
||||||
|
|
||||||
|
## Performance toggle for subscribe/unsubscribe wildcard topic.
|
||||||
|
## Change this toggle only when there are many wildcard topics.
|
||||||
|
## Value: Enum
|
||||||
|
## - key: mnesia translational updates with per-key locks. recommended for single node setup.
|
||||||
|
## - tab: mnesia translational updates with table lock. recommended for multi-nodes setup.
|
||||||
|
## - global: global lock protected updates. recommended for larger cluster.
|
||||||
|
## NOTE: when changing from/to 'global' lock, it requires all nodes in the cluster
|
||||||
|
## to be stopped before the change.
|
||||||
|
broker.perf.route_lock_type = key
|
||||||
|
|
||||||
## Maximum MQTT packet size allowed.
|
## Maximum MQTT packet size allowed.
|
||||||
##
|
##
|
||||||
## Value: Bytes
|
## Value: Bytes
|
||||||
|
@ -1088,10 +1099,18 @@ listener.tcp.external.access.1 = allow all
|
||||||
|
|
||||||
## Enable the option for X.509 certificate based authentication.
|
## Enable the option for X.509 certificate based authentication.
|
||||||
## EMQX will use the common name of certificate as MQTT username.
|
## EMQX will use the common name of certificate as MQTT username.
|
||||||
|
## Only support Proxy Protocol V2, the CN is available in Proxy Protocol V2 additional info
|
||||||
##
|
##
|
||||||
## Value: cn | dn | crt
|
## Value: cn | dn | crt
|
||||||
## listener.tcp.external.peer_cert_as_username = cn
|
## listener.tcp.external.peer_cert_as_username = cn
|
||||||
|
|
||||||
|
## Enable the option for X.509 certificate based authentication.
|
||||||
|
## EMQX will use the common name of certificate as MQTT clientid.
|
||||||
|
## Only support Proxy Protocol V2, the CN is available in Proxy Protocol V2 additional info
|
||||||
|
##
|
||||||
|
## Value: cn
|
||||||
|
## listener.tcp.external.peer_cert_as_clientid = cn
|
||||||
|
|
||||||
## The TCP backlog defines the maximum length that the queue of pending
|
## The TCP backlog defines the maximum length that the queue of pending
|
||||||
## connections can grow to.
|
## connections can grow to.
|
||||||
##
|
##
|
||||||
|
@ -1567,6 +1586,20 @@ listener.ws.external.verify_protocol_header = on
|
||||||
## Value: Duration
|
## Value: Duration
|
||||||
## listener.ws.external.proxy_protocol_timeout = 3s
|
## listener.ws.external.proxy_protocol_timeout = 3s
|
||||||
|
|
||||||
|
## Enable the option for X.509 certificate based authentication.
|
||||||
|
## EMQX will use the common name of certificate as MQTT username.
|
||||||
|
## Only support Proxy Protocol V2, the CN is available in Proxy Protocol V2 additional info
|
||||||
|
##
|
||||||
|
## Value: cn
|
||||||
|
## listener.ws.external.peer_cert_as_username = cn
|
||||||
|
|
||||||
|
## Enable the option for X.509 certificate based authentication.
|
||||||
|
## EMQX will use the common name of certificate as MQTT clientid.
|
||||||
|
## Only support Proxy Protocol V2, the CN is available in Proxy Protocol V2 additional info
|
||||||
|
##
|
||||||
|
## Value: cn
|
||||||
|
## listener.ws.external.peer_cert_as_clientid = cn
|
||||||
|
|
||||||
## The TCP backlog of external MQTT/WebSocket Listener.
|
## The TCP backlog of external MQTT/WebSocket Listener.
|
||||||
##
|
##
|
||||||
## See: listener.ws.$name.backlog
|
## See: listener.ws.$name.backlog
|
||||||
|
|
|
@ -1572,6 +1572,14 @@ end}.
|
||||||
hidden
|
hidden
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.ws.$name.peer_cert_as_username", "emqx.listeners", [
|
||||||
|
{datatype, {enum, [cn]}}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.ws.$name.peer_cert_as_clientid", "emqx.listeners", [
|
||||||
|
{datatype, {enum, [cn]}}
|
||||||
|
]}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% MQTT/WebSocket/SSL Listeners
|
%% MQTT/WebSocket/SSL Listeners
|
||||||
|
|
||||||
|
@ -2097,6 +2105,18 @@ end}.
|
||||||
{datatype, flag}
|
{datatype, flag}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
%% @doc Performance toggle for subscribe/unsubscribe wildcard topic.
|
||||||
|
%% Change this toggle only when there are many wildcard topics.
|
||||||
|
%% key: mnesia translational updates with per-key locks. recommended for single node setup.
|
||||||
|
%% tab: mnesia translational updates with table lock. recommended for multi-nodes setup.
|
||||||
|
%% global: global lock protected updates. recommended for larger cluster.
|
||||||
|
%% NOTE: when changing from/to 'global' lock, it requires a full cluster restart.
|
||||||
|
%%
|
||||||
|
{mapping, "broker.perf.route_lock_type", "emqx.route_lock_type", [
|
||||||
|
{default, key},
|
||||||
|
{datatype, {enum, [key, tab, global]}}
|
||||||
|
]}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% System Monitor
|
%% System Monitor
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
|
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
|
||||||
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
|
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
|
||||||
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.4"}}},
|
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.4"}}},
|
||||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.6"}}},
|
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.7"}}},
|
||||||
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}},
|
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}},
|
||||||
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
|
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
|
||||||
]}.
|
]}.
|
||||||
|
|
|
@ -8,20 +8,67 @@
|
||||||
end,
|
end,
|
||||||
{VSN,
|
{VSN,
|
||||||
[
|
[
|
||||||
|
{"4.2.11", [
|
||||||
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}
|
||||||
|
]},
|
||||||
|
{"4.2.10", [
|
||||||
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
||||||
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
|
{"4.2.9", [
|
||||||
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
||||||
|
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
|
{<<"4.2.[34567]">>, [
|
||||||
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
||||||
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
{"4.2.2", [
|
{"4.2.2", [
|
||||||
{load_module, emqx_metrics, brutal_purge, soft_purge, []}
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
||||||
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{"4.2.1", [
|
{"4.2.1", [
|
||||||
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
|
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_json, brutal_purge, soft_purge, []}
|
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
||||||
]},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{"4.2.0", [
|
|
||||||
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
|
{load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_json, brutal_purge, soft_purge, []},
|
{load_module, emqx_json, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
|
{"4.2.0", [
|
||||||
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
||||||
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_json, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
||||||
{apply, {application, set_env,
|
{apply, {application, set_env,
|
||||||
[emqx, force_shutdown_policy,
|
[emqx, force_shutdown_policy,
|
||||||
#{message_queue_len => DefaultLen,
|
#{message_queue_len => DefaultLen,
|
||||||
|
@ -30,20 +77,67 @@
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
],
|
],
|
||||||
[
|
[
|
||||||
|
{"4.2.11", [
|
||||||
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
|
{"4.2.10", [
|
||||||
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
|
{"4.2.9", [
|
||||||
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
|
{<<"4.2.[34567]">>, [
|
||||||
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
{"4.2.2", [
|
{"4.2.2", [
|
||||||
{load_module, emqx_metrics, brutal_purge, soft_purge, []}
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{"4.2.1", [
|
{"4.2.1", [
|
||||||
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
|
{load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_json, brutal_purge, soft_purge, []}
|
{load_module, emqx_json, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{"4.2.0", [
|
{"4.2.0", [
|
||||||
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
||||||
|
{load_module, emqx_router, soft_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
|
{load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_json, brutal_purge, soft_purge, []}
|
{load_module, emqx_json, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
]
|
]
|
||||||
|
|
|
@ -419,7 +419,7 @@ safe_update_stats(Tab, Stat, MaxStat) ->
|
||||||
-compile({inline, [call/2, cast/2, pick/1]}).
|
-compile({inline, [call/2, cast/2, pick/1]}).
|
||||||
|
|
||||||
call(Broker, Req) ->
|
call(Broker, Req) ->
|
||||||
gen_server:call(Broker, Req).
|
gen_server:call(Broker, Req, infinity).
|
||||||
|
|
||||||
cast(Broker, Msg) ->
|
cast(Broker, Msg) ->
|
||||||
gen_server:cast(Broker, Msg).
|
gen_server:cast(Broker, Msg).
|
||||||
|
|
|
@ -343,9 +343,6 @@ handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
|
||||||
},
|
},
|
||||||
handle_incoming(Packet, NState);
|
handle_incoming(Packet, NState);
|
||||||
|
|
||||||
handle_msg({incoming, ?PACKET(?PINGREQ)}, State) ->
|
|
||||||
handle_outgoing(?PACKET(?PINGRESP), State);
|
|
||||||
|
|
||||||
handle_msg({incoming, Packet}, State) ->
|
handle_msg({incoming, Packet}, State) ->
|
||||||
handle_incoming(Packet, State);
|
handle_incoming(Packet, State);
|
||||||
|
|
||||||
|
|
|
@ -125,6 +125,9 @@ parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) ->
|
||||||
%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK...
|
%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK...
|
||||||
parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
|
parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
|
||||||
parse_frame(Rest, Header, 2, Options);
|
parse_frame(Rest, Header, 2, Options);
|
||||||
|
parse_remaining_len(<<1:1, _Len:7, _Rest/binary>>, _Header, Multiplier, _Value, _Options)
|
||||||
|
when Multiplier > 2097152 ->
|
||||||
|
error(malformed_variable_byte_integer);
|
||||||
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) ->
|
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) ->
|
||||||
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options);
|
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options);
|
||||||
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value,
|
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value,
|
||||||
|
|
|
@ -118,7 +118,8 @@ do_add_route(Topic, Dest) when is_binary(Topic) ->
|
||||||
false ->
|
false ->
|
||||||
ok = emqx_router_helper:monitor(Dest),
|
ok = emqx_router_helper:monitor(Dest),
|
||||||
case emqx_topic:wildcard(Topic) of
|
case emqx_topic:wildcard(Topic) of
|
||||||
true -> trans(fun insert_trie_route/1, [Route]);
|
true ->
|
||||||
|
maybe_trans(fun insert_trie_route/1, [Route]);
|
||||||
false -> insert_direct_route(Route)
|
false -> insert_direct_route(Route)
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
@ -164,7 +165,8 @@ do_delete_route(Topic) when is_binary(Topic) ->
|
||||||
do_delete_route(Topic, Dest) ->
|
do_delete_route(Topic, Dest) ->
|
||||||
Route = #route{topic = Topic, dest = Dest},
|
Route = #route{topic = Topic, dest = Dest},
|
||||||
case emqx_topic:wildcard(Topic) of
|
case emqx_topic:wildcard(Topic) of
|
||||||
true -> trans(fun delete_trie_route/1, [Route]);
|
true ->
|
||||||
|
maybe_trans(fun delete_trie_route/1, [Route]);
|
||||||
false -> delete_direct_route(Route)
|
false -> delete_direct_route(Route)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -247,10 +249,55 @@ delete_trie_route(Route = #route{topic = Topic}) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
-spec(trans(function(), list(any())) -> ok | {error, term()}).
|
-spec(maybe_trans(function(), list(any())) -> ok | {error, term()}).
|
||||||
trans(Fun, Args) ->
|
maybe_trans(Fun, Args) ->
|
||||||
case mnesia:transaction(Fun, Args) of
|
case persistent_term:get(emqx_route_lock_type, key) of
|
||||||
{atomic, Ok} -> Ok;
|
key ->
|
||||||
{aborted, Reason} -> {error, Reason}
|
trans(Fun, Args);
|
||||||
|
global ->
|
||||||
|
lock_router(),
|
||||||
|
try mnesia:sync_dirty(Fun, Args)
|
||||||
|
after
|
||||||
|
unlock_router()
|
||||||
|
end;
|
||||||
|
tab ->
|
||||||
|
trans(fun() ->
|
||||||
|
emqx_trie:lock_tables(),
|
||||||
|
apply(Fun, Args)
|
||||||
|
end, [])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec(trans(function(), list(any())) -> ok | {error, term()}).
|
||||||
|
trans(Fun, Args) ->
|
||||||
|
%% trigger selective receive optimization of compiler,
|
||||||
|
%% ideal for handling bursty traffic.
|
||||||
|
{_, RefMon} = spawn_monitor(
|
||||||
|
fun() ->
|
||||||
|
Res = case mnesia:transaction(Fun, Args) of
|
||||||
|
{atomic, Ok} -> Ok;
|
||||||
|
{aborted, Reason} -> {error, Reason}
|
||||||
|
end,
|
||||||
|
exit({shutdown, Res})
|
||||||
|
end),
|
||||||
|
receive
|
||||||
|
{'DOWN', RefMon, process, _, Info} ->
|
||||||
|
case Info of
|
||||||
|
{shutdown, Res} -> Res;
|
||||||
|
_ -> {error, {trans_crash, Info}}
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
lock_router() ->
|
||||||
|
%% if Retry is not 0, global:set_lock could sleep a random time up to 8s.
|
||||||
|
%% Considering we have a limited number of brokers, it is safe to use sleep 1 ms.
|
||||||
|
case global:set_lock({?MODULE, self()}, [node() | nodes()], 0) of
|
||||||
|
false ->
|
||||||
|
%% Force to sleep 1ms instead.
|
||||||
|
timer:sleep(1),
|
||||||
|
lock_router();
|
||||||
|
true ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
unlock_router() ->
|
||||||
|
global:del_lock({?MODULE, self()}).
|
||||||
|
|
|
@ -34,6 +34,10 @@ init([]) ->
|
||||||
type => worker,
|
type => worker,
|
||||||
modules => [emqx_router_helper]},
|
modules => [emqx_router_helper]},
|
||||||
|
|
||||||
|
ok = persistent_term:put(emqx_route_lock_type,
|
||||||
|
application:get_env(emqx, route_lock_type, key)
|
||||||
|
),
|
||||||
|
|
||||||
%% Router pool
|
%% Router pool
|
||||||
RouterPool = emqx_pool_sup:spec([router_pool, hash,
|
RouterPool = emqx_pool_sup:spec([router_pool, hash,
|
||||||
{emqx_router, start_link, []}]),
|
{emqx_router, start_link, []}]),
|
||||||
|
|
|
@ -31,7 +31,9 @@
|
||||||
, delete/1
|
, delete/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([empty/0]).
|
-export([ empty/0
|
||||||
|
, lock_tables/0
|
||||||
|
]).
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
@ -120,6 +122,11 @@ delete(Topic) when is_binary(Topic) ->
|
||||||
empty() ->
|
empty() ->
|
||||||
ets:info(?TRIE_TAB, size) == 0.
|
ets:info(?TRIE_TAB, size) == 0.
|
||||||
|
|
||||||
|
-spec lock_tables() -> ok.
|
||||||
|
lock_tables() ->
|
||||||
|
mnesia:write_lock_table(?TRIE_TAB),
|
||||||
|
mnesia:write_lock_table(?TRIE_NODE_TAB).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -196,15 +196,21 @@ init(Req, Opts) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
websocket_init([Req, Opts]) ->
|
websocket_init([Req, Opts]) ->
|
||||||
Peername = case proplists:get_bool(proxy_protocol, Opts)
|
{Peername, Peercert} =
|
||||||
andalso maps:get(proxy_header, Req) of
|
case proplists:get_bool(proxy_protocol, Opts)
|
||||||
#{src_address := SrcAddr, src_port := SrcPort} ->
|
andalso maps:get(proxy_header, Req) of
|
||||||
{SrcAddr, SrcPort};
|
#{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} ->
|
||||||
_ ->
|
ProxyName = {SrcAddr, SrcPort},
|
||||||
cowboy_req:peer(Req)
|
%% Notice: Only CN is available in Proxy Protocol V2 additional info
|
||||||
end,
|
ProxySSL = case maps:get(cn, SSL, undefined) of
|
||||||
|
undeined -> nossl;
|
||||||
|
CN -> [{pp2_ssl_cn, CN}]
|
||||||
|
end,
|
||||||
|
{ProxyName, ProxySSL};
|
||||||
|
_ ->
|
||||||
|
{cowboy_req:peer(Req), cowboy_req:cert(Req)}
|
||||||
|
end,
|
||||||
Sockname = cowboy_req:sock(Req),
|
Sockname = cowboy_req:sock(Req),
|
||||||
Peercert = cowboy_req:cert(Req),
|
|
||||||
WsCookie = try cowboy_req:parse_cookies(Req)
|
WsCookie = try cowboy_req:parse_cookies(Req)
|
||||||
catch
|
catch
|
||||||
error:badarg ->
|
error:badarg ->
|
||||||
|
@ -296,9 +302,6 @@ websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) ->
|
||||||
NState = State#state{serialize = Serialize},
|
NState = State#state{serialize = Serialize},
|
||||||
handle_incoming(Packet, cancel_idle_timer(NState));
|
handle_incoming(Packet, cancel_idle_timer(NState));
|
||||||
|
|
||||||
websocket_info({incoming, ?PACKET(?PINGREQ)}, State) ->
|
|
||||||
return(enqueue(?PACKET(?PINGRESP), State));
|
|
||||||
|
|
||||||
websocket_info({incoming, Packet}, State) ->
|
websocket_info({incoming, Packet}, State) ->
|
||||||
handle_incoming(Packet, State);
|
handle_incoming(Packet, State);
|
||||||
|
|
||||||
|
|
|
@ -165,7 +165,6 @@ t_handle_msg(_) ->
|
||||||
|
|
||||||
t_handle_msg_incoming(_) ->
|
t_handle_msg_incoming(_) ->
|
||||||
?assertMatch({ok, _Out, _St}, emqx_connection:handle_msg({incoming, ?CONNECT_PACKET(#mqtt_packet_connect{})}, st())),
|
?assertMatch({ok, _Out, _St}, emqx_connection:handle_msg({incoming, ?CONNECT_PACKET(#mqtt_packet_connect{})}, st())),
|
||||||
?assertEqual(ok, emqx_connection:handle_msg({incoming, ?PACKET(?PINGREQ)}, st())),
|
|
||||||
ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
|
ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
|
||||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)}, st())),
|
?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)}, st())),
|
||||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, <<?SUBSCRIBE:4,2:4,11,0,2,0,6,84,111,112,105,99,65,2>>}, st())),
|
?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, <<?SUBSCRIBE:4,2:4,11,0,2,0,6,84,111,112,105,99,65,2>>}, st())),
|
||||||
|
|
|
@ -42,7 +42,8 @@ all() ->
|
||||||
groups() ->
|
groups() ->
|
||||||
[{parse, [parallel],
|
[{parse, [parallel],
|
||||||
[t_parse_cont,
|
[t_parse_cont,
|
||||||
t_parse_frame_too_large
|
t_parse_frame_too_large,
|
||||||
|
t_parse_frame_malformed_variable_byte_integer
|
||||||
]},
|
]},
|
||||||
{connect, [parallel],
|
{connect, [parallel],
|
||||||
[t_serialize_parse_v3_connect,
|
[t_serialize_parse_v3_connect,
|
||||||
|
@ -129,6 +130,12 @@ t_parse_frame_too_large(_) ->
|
||||||
?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 512})),
|
?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 512})),
|
||||||
?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})).
|
?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})).
|
||||||
|
|
||||||
|
t_parse_frame_malformed_variable_byte_integer(_) ->
|
||||||
|
MalformedPayload = << <<16#80>> || _ <- lists:seq(1, 4) >>,
|
||||||
|
ParseState = emqx_frame:initial_parse_state(#{}),
|
||||||
|
?catch_error(malformed_variable_byte_integer,
|
||||||
|
emqx_frame:parse(MalformedPayload, ParseState)).
|
||||||
|
|
||||||
t_serialize_parse_v3_connect(_) ->
|
t_serialize_parse_v3_connect(_) ->
|
||||||
Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,
|
Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,
|
||||||
113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108,
|
113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108,
|
||||||
|
|
Loading…
Reference in New Issue