Merge pull request #3202 from emqx/master

Auto-pull-request-by-2020-01-17
This commit is contained in:
turtleDeng 2020-01-17 19:49:48 +08:00 committed by GitHub
commit 7a1d22e79f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 350 additions and 130 deletions

View File

@ -2,7 +2,7 @@
[![GitHub Release](https://img.shields.io/github/release/emqx/emqx?color=brightgreen)](https://github.com/emqx/emqx/releases) [![GitHub Release](https://img.shields.io/github/release/emqx/emqx?color=brightgreen)](https://github.com/emqx/emqx/releases)
[![Build Status](https://travis-ci.org/emqx/emqx.svg)](https://travis-ci.org/emqx/emqx) [![Build Status](https://travis-ci.org/emqx/emqx.svg)](https://travis-ci.org/emqx/emqx)
[![Coverage Status](https://coveralls.io/repos/github/emqx/emqx/badge.svg)](https://coveralls.io/github/emqx/emqx) [![Coverage Status](https://coveralls.io/repos/github/emqx/emqx/badge.svg?branch=master)](https://coveralls.io/github/emqx/emqx?branch=master)
[![Docker Pulls](https://img.shields.io/docker/pulls/emqx/emqx)](https://hub.docker.com/r/emqx/emqx) [![Docker Pulls](https://img.shields.io/docker/pulls/emqx/emqx)](https://hub.docker.com/r/emqx/emqx)
[![Slack Invite](<https://slack-invite.emqx.io/badge.svg>)](https://slack-invite.emqx.io) [![Slack Invite](<https://slack-invite.emqx.io/badge.svg>)](https://slack-invite.emqx.io)
[![Twitter](https://img.shields.io/badge/Twitter-EMQ%20X-1DA1F2?logo=twitter)](https://twitter.com/emqtt) [![Twitter](https://img.shields.io/badge/Twitter-EMQ%20X-1DA1F2?logo=twitter)](https://twitter.com/emqtt)
@ -40,7 +40,7 @@ docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p
The *EMQ X* broker requires Erlang/OTP R21+ to build since 3.0 release. The *EMQ X* broker requires Erlang/OTP R21+ to build since 3.0 release.
``` ```
git clone https://github.com/emqx/emqx-rel.git git clone -b v4.0.0 https://github.com/emqx/emqx-rel.git
cd emqx-rel && make cd emqx-rel && make
@ -74,7 +74,7 @@ The [EMQ X Roadmap uses Github milestones](https://github.com/emqx/emqx/mileston
## Community, discussion, contribution, and support ## Community, discussion, contribution, and support
You can reach the EMQ community and developers via the following channels: You can reach the EMQ community and developers via the following channels:
- [EMQX Slack](http://emqx.slack.com) - [EMQX Slack](https://slack-invite.emqx.io/)
- [Twitter](https://twitter.com/emqtt) - [Twitter](https://twitter.com/emqtt)
- [Forum](https://groups.google.com/d/forum/emqtt) - [Forum](https://groups.google.com/d/forum/emqtt)
- [Blog](https://medium.com/@emqtt) - [Blog](https://medium.com/@emqtt)

View File

@ -211,7 +211,7 @@ node.data_dir = {{ platform_data_dir }}
## Value: 0-1024 ## Value: 0-1024
## ##
## vm.args: +A Number ## vm.args: +A Number
node.async_threads = 32 ## node.async_threads = 4
## Sets the maximum number of simultaneously existing processes for this ## Sets the maximum number of simultaneously existing processes for this
## system if a Number is passed as value. ## system if a Number is passed as value.
@ -221,7 +221,7 @@ node.async_threads = 32
## Value: Number [1024-134217727] ## Value: Number [1024-134217727]
## ##
## vm.args: +P Number ## vm.args: +P Number
node.process_limit = 2048000 ## node.process_limit = 2097152
## Sets the maximum number of simultaneously existing ports for this system. ## Sets the maximum number of simultaneously existing ports for this system.
## ##
@ -230,16 +230,16 @@ node.process_limit = 2048000
## Value: Number [1024-134217727] ## Value: Number [1024-134217727]
## ##
## vm.args: +Q Number ## vm.args: +Q Number
node.max_ports = 1024000 ## node.max_ports = 1048576
## Set the distribution buffer busy limit (dist_buf_busy_limit). ## Sets the distribution buffer busy limit (dist_buf_busy_limit).
## ##
## See: http://erlang.org/doc/man/erl.html ## See: http://erlang.org/doc/man/erl.html
## ##
## Value: Number [1KB-2GB] ## Value: Number [1KB-2GB]
## ##
## vm.args: +zdbbl size ## vm.args: +zdbbl size
node.dist_buffer_size = 8MB ## node.dist_buffer_size = 8MB
## Sets the maximum number of ETS tables. Note that mnesia and SSL will ## Sets the maximum number of ETS tables. Note that mnesia and SSL will
## create temporary ETS tables. ## create temporary ETS tables.
@ -247,14 +247,26 @@ node.dist_buffer_size = 8MB
## Value: Number ## Value: Number
## ##
## vm.args: +e Number ## vm.args: +e Number
node.max_ets_tables = 256000 ## node.max_ets_tables = 262144
## Global GC Interval.
##
## Value: Duration
##
## Examples:
## - 2h: 2 hours
## - 30m: 30 minutes
## - 20s: 20 seconds
##
## Defaut: 15 minutes
node.global_gc_interval = 15m
## Tweak GC to run more often. ## Tweak GC to run more often.
## ##
## Value: Number [0-65535] ## Value: Number [0-65535]
## ##
## vm.args: -env ERL_FULLSWEEP_AFTER Number ## vm.args: -env ERL_FULLSWEEP_AFTER Number
node.fullsweep_after = 1000 ## node.fullsweep_after = 1000
## Crash dump log file. ## Crash dump log file.
## ##
@ -277,7 +289,7 @@ node.crash_dump = {{ platform_log_dir }}/crash.dump
## Value: Number ## Value: Number
## ##
## vm.args: -kernel net_ticktime Number ## vm.args: -kernel net_ticktime Number
node.dist_net_ticktime = 60 ## node.dist_net_ticktime = 120
## Sets the port range for the listener socket of a distributed Erlang node. ## Sets the port range for the listener socket of a distributed Erlang node.
## Note that if there are firewalls between clustered nodes, this port segment ## Note that if there are firewalls between clustered nodes, this port segment
@ -315,10 +327,11 @@ rpc.tcp_server_port = 5369
## Value: Port [1024-65535] ## Value: Port [1024-65535]
rpc.tcp_client_port = 5369 rpc.tcp_client_port = 5369
## Number of utgoing RPC connections. ## Number of outgoing RPC connections.
## ##
## Value: Interger [1-256] ## Value: Interger [1-256]
rpc.tcp_client_num = 32 ## Defaults to NumberOfCPUSchedulers / 2
#rpc.tcp_client_num = 1
## RCP Client connect timeout. ## RCP Client connect timeout.
## ##
@ -589,11 +602,11 @@ zone.external.enable_stats = on
## Default: ignore ## Default: ignore
zone.external.acl_deny_action = ignore zone.external.acl_deny_action = ignore
## Force MQTT connection/session process GC after this number of ## Force the MQTT connection process GC after this number of
## messages | bytes passed through. ## messages | bytes passed through.
## ##
## Numbers delimited by `|'. Zero or negative is to disable. ## Numbers delimited by `|'. Zero or negative is to disable.
zone.external.force_gc_policy = 1000|1MB zone.external.force_gc_policy = 16000|16MB
## Max message queue length and total heap size to force shutdown ## Max message queue length and total heap size to force shutdown
## connection/session process. ## connection/session process.
@ -601,11 +614,7 @@ zone.external.force_gc_policy = 1000|1MB
## of queued MQTT messages of QoS 1 and 2. ## of queued MQTT messages of QoS 1 and 2.
## ##
## Numbers delimited by `|'. Zero or negative is to disable. ## Numbers delimited by `|'. Zero or negative is to disable.
## ## zone.external.force_shutdown_policy = 32000|32MB
## Default:
## - 10000|32MB on ARCH_64 system
## - 10000|16MB on ARCH_32 sytem
## zone.external.force_shutdown_policy = 10000|32MB
## Maximum MQTT packet size allowed. ## Maximum MQTT packet size allowed.
## ##
@ -780,6 +789,9 @@ zone.internal.enable_acl = off
## Default: ignore ## Default: ignore
zone.internal.acl_deny_action = ignore zone.internal.acl_deny_action = ignore
## See zone.$name.force_gc_policy
## zone.internal.force_gc_policy = 128000|128MB
## See zone.$name.wildcard_subscription. ## See zone.$name.wildcard_subscription.
## ##
## Value: boolean ## Value: boolean
@ -821,11 +833,7 @@ zone.internal.mqueue_store_qos0 = true
zone.internal.enable_flapping_detect = off zone.internal.enable_flapping_detect = off
## See zone.$name.force_shutdown_policy ## See zone.$name.force_shutdown_policy
## ## zone.internal.force_shutdown_policy = 128000|128MB
## Default:
## - 10000|32MB on ARCH_64 system
## - 10000|16MB on ARCH_32 sytem
zone.internal.force_shutdown_policy = 100000|64MB
## All the topics will be prefixed with the mountpoint path if this option is enabled. ## All the topics will be prefixed with the mountpoint path if this option is enabled.
## ##

View File

@ -1,6 +1,6 @@
############################## ######################################################################
# Erlang VM Args ## Erlang VM Args for EMQ X Broker
############################## ######################################################################
## NOTE: ## NOTE:
## ##
@ -10,13 +10,13 @@
## such as `node.name` for `-name` and `node.cooke` for `-setcookie`. ## such as `node.name` for `-name` and `node.cooke` for `-setcookie`.
## Sets the maximum number of simultaneously existing processes for this system. ## Sets the maximum number of simultaneously existing processes for this system.
#+P 2048000 +P 2097152
## Sets the maximum number of simultaneously existing ports for this system. ## Sets the maximum number of simultaneously existing ports for this system.
#+Q 1024000 +Q 1048576
## Sets the maximum number of ETS tables ## Sets the maximum number of ETS tables
#+e 256000 +e 262144
## Sets the maximum number of atoms the virtual machine can handle. ## Sets the maximum number of atoms the virtual machine can handle.
#+t 1048576 #+t 1048576
@ -26,7 +26,7 @@
## Set how many times generational garbages collections can be done without ## Set how many times generational garbages collections can be done without
## forcing a fullsweep collection. ## forcing a fullsweep collection.
#-env ERL_FULLSWEEP_AFTER 1000 -env ERL_FULLSWEEP_AFTER 1000
## Heartbeat management; auto-restarts VM if it dies or becomes unresponsive ## Heartbeat management; auto-restarts VM if it dies or becomes unresponsive
## (Disabled by default..use with caution!) ## (Disabled by default..use with caution!)
@ -43,7 +43,7 @@
## Specifies the net_kernel tick time in seconds. ## Specifies the net_kernel tick time in seconds.
## This is the approximate time a connected node may be unresponsive until ## This is the approximate time a connected node may be unresponsive until
## it is considered down and thereby disconnected. ## it is considered down and thereby disconnected.
#-kernel net_ticktime 60 -kernel net_ticktime 120
## Sets the distribution buffer busy limit (dist_buf_busy_limit). ## Sets the distribution buffer busy limit (dist_buf_busy_limit).
#+zdbbl 8192 #+zdbbl 8192
@ -52,7 +52,8 @@
+spp true +spp true
## Sets the number of threads in async thread pool. Valid range is 0-1024. ## Sets the number of threads in async thread pool. Valid range is 0-1024.
#+A 8 ## Increase the parameter if there are many simultaneous file I/O operations.
+A 4
## Sets the default heap size of processes to the size Size. ## Sets the default heap size of processes to the size Size.
#+hms 233 #+hms 233
@ -60,11 +61,20 @@
## Sets the default binary virtual heap size of processes to the size Size. ## Sets the default binary virtual heap size of processes to the size Size.
#+hmbs 46422 #+hmbs 46422
## Sets the default maximum heap size of processes to the size Size.
## Defaults to 0, which means that no maximum heap size is used.
##For more information, see process_flag(max_heap_size, MaxHeapSize).
#+hmax 0
## Sets the default value for process flag message_queue_data. Defaults to on_heap.
#+hmqd on_heap | off_heap
## Sets the number of IO pollsets to use when polling for I/O. ## Sets the number of IO pollsets to use when polling for I/O.
#+IOp 1 #+IOp 1
## Sets the number of IO poll threads to use when polling for I/O. ## Sets the number of IO poll threads to use when polling for I/O.
#+IOt 1 ## Increase this for the busy systems with many concurrent connection.
+IOt 4
## Sets the number of scheduler threads to create and scheduler threads to set online. ## Sets the number of scheduler threads to create and scheduler threads to set online.
#+S 8:8 #+S 8:8
@ -73,7 +83,7 @@
#+SDcpu 8:8 #+SDcpu 8:8
## Sets the number of dirty I/O scheduler threads to create. ## Sets the number of dirty I/O scheduler threads to create.
#+SDio 10 +SDio 8
## Suggested stack size, in kilowords, for scheduler threads. ## Suggested stack size, in kilowords, for scheduler threads.
#+sss 32 #+sss 32
@ -92,4 +102,14 @@
#+sct L0-3c0-3p0N0:L4-7c0-3p1N1 #+sct L0-3c0-3p0N0:L4-7c0-3p1N1
## Sets the mapping of warning messages for error_logger ## Sets the mapping of warning messages for error_logger
#+W w #+W w
## Sets time warp mode: no_time_warp | single_time_warp | multi_time_warp
#+C no_time_warp
## Prevents loading information about source filenames and line numbers.
#+L
## Specifies how long time (in milliseconds) to spend shutting down the system.
## See: http://erlang.org/doc/man/erl.html
-shutdown_time 30000

View File

@ -1,6 +1,6 @@
############################## ######################################################################
# Erlang VM Args ## Erlang VM Args for EMQ X Edge
############################## ######################################################################
## NOTE: ## NOTE:
## ##
@ -10,8 +10,7 @@
## such as `node.name` for `-name` and `node.cooke` for `-setcookie`. ## such as `node.name` for `-name` and `node.cooke` for `-setcookie`.
## Sets the maximum number of simultaneously existing processes for this system. ## Sets the maximum number of simultaneously existing processes for this system.
+P 20480 +P 16384
## Sets the maximum number of simultaneously existing ports for this system. ## Sets the maximum number of simultaneously existing ports for this system.
+Q 4096 +Q 4096
@ -19,7 +18,7 @@
+e 512 +e 512
## Sets the maximum number of atoms the virtual machine can handle. ## Sets the maximum number of atoms the virtual machine can handle.
+t 65536 +t 262144
## Set the location of crash dumps ## Set the location of crash dumps
-env ERL_CRASH_DUMP {{ platform_log_dir }}/crash.dump -env ERL_CRASH_DUMP {{ platform_log_dir }}/crash.dump
@ -30,7 +29,7 @@
## Heartbeat management; auto-restarts VM if it dies or becomes unresponsive ## Heartbeat management; auto-restarts VM if it dies or becomes unresponsive
## (Disabled by default..use with caution!) ## (Disabled by default..use with caution!)
#-heart -heart
## Specify the erlang distributed protocol. ## Specify the erlang distributed protocol.
## Can be one of: inet_tcp, inet6_tcp, inet_tls ## Can be one of: inet_tcp, inet6_tcp, inet_tls
@ -52,6 +51,7 @@
+spp false +spp false
## Sets the number of threads in async thread pool. Valid range is 0-1024. ## Sets the number of threads in async thread pool. Valid range is 0-1024.
## Increase the parameter if there are many simultaneous file I/O operations.
+A 1 +A 1
## Sets the default heap size of processes to the size Size. ## Sets the default heap size of processes to the size Size.
@ -60,6 +60,14 @@
## Sets the default binary virtual heap size of processes to the size Size. ## Sets the default binary virtual heap size of processes to the size Size.
#+hmbs 46422 #+hmbs 46422
## Sets the default maximum heap size of processes to the size Size.
## Defaults to 0, which means that no maximum heap size is used.
##For more information, see process_flag(max_heap_size, MaxHeapSize).
#+hmax 0
## Sets the default value for process flag message_queue_data. Defaults to on_heap.
#+hmqd on_heap | off_heap
## Sets the number of IO pollsets to use when polling for I/O. ## Sets the number of IO pollsets to use when polling for I/O.
+IOp 1 +IOp 1
@ -94,5 +102,13 @@
## Sets the mapping of warning messages for error_logger ## Sets the mapping of warning messages for error_logger
#+W w #+W w
#Prevents loading information about source filenames and line numbers. ## Sets time warp mode: no_time_warp | single_time_warp | multi_time_warp
#+C no_time_warp
## Prevents loading information about source filenames and line numbers.
+L +L
## Specifies how long time (in milliseconds) to spend shutting down the system.
## See: http://erlang.org/doc/man/erl.html
-shutdown_time 10000

View File

@ -205,8 +205,6 @@ end}.
{default, "emqx@127.0.0.1"} {default, "emqx@127.0.0.1"}
]}. ]}.
%% @doc Specify SSL Options in the file if using SSL for erlang distribution %% @doc Specify SSL Options in the file if using SSL for erlang distribution
{mapping, "node.ssl_dist_optfile", "vm_args.-ssl_dist_optfile", [ {mapping, "node.ssl_dist_optfile", "vm_args.-ssl_dist_optfile", [
{datatype, string}, {datatype, string},
@ -237,7 +235,6 @@ end}.
%% @doc More information at: http://erlang.org/doc/man/erl.html %% @doc More information at: http://erlang.org/doc/man/erl.html
{mapping, "node.async_threads", "vm_args.+A", [ {mapping, "node.async_threads", "vm_args.+A", [
{default, 64},
{datatype, integer}, {datatype, integer},
{validators, ["range:0-1024"]} {validators, ["range:0-1024"]}
]}. ]}.
@ -245,16 +242,12 @@ end}.
%% @doc Erlang Process Limit %% @doc Erlang Process Limit
{mapping, "node.process_limit", "vm_args.+P", [ {mapping, "node.process_limit", "vm_args.+P", [
{datatype, integer}, {datatype, integer},
{default, 256000},
hidden hidden
]}. ]}.
%% Note: OTP R15 and earlier uses -env ERL_MAX_PORTS, R16+ uses +Q %% @doc The maximum number of concurrent ports/sockets.
%% @doc The number of concurrent ports/sockets
%% Valid range is 1024-134217727 %% Valid range is 1024-134217727
{mapping, "node.max_ports", {mapping, "node.max_ports", "vm_args.+Q", [
cuttlefish:otp("R16", "vm_args.+Q", "vm_args.-env ERL_MAX_PORTS"), [
{default, 262144},
{datatype, integer}, {datatype, integer},
{validators, ["range4ports"]} {validators, ["range4ports"]}
]}. ]}.
@ -287,6 +280,11 @@ end}.
end end
}. }.
%% @doc Global GC Interval
{mapping, "node.global_gc_interval", "emqx.global_gc_interval", [
{datatype, {duration, s}}
]}.
%% @doc http://www.erlang.org/doc/man/erlang.html#system_flag-2 %% @doc http://www.erlang.org/doc/man/erlang.html#system_flag-2
{mapping, "node.fullsweep_after", "vm_args.-env ERL_FULLSWEEP_AFTER", [ {mapping, "node.fullsweep_after", "vm_args.-env ERL_FULLSWEEP_AFTER", [
{default, 1000}, {default, 1000},
@ -317,7 +315,6 @@ end}.
%% @doc http://www.erlang.org/doc/man/kernel_app.html#net_ticktime %% @doc http://www.erlang.org/doc/man/kernel_app.html#net_ticktime
{mapping, "node.dist_net_ticktime", "vm_args.-kernel net_ticktime", [ {mapping, "node.dist_net_ticktime", "vm_args.-kernel net_ticktime", [
{commented, 60},
{datatype, integer}, {datatype, integer},
hidden hidden
]}. ]}.
@ -365,11 +362,18 @@ end}.
%% Default TCP port for outgoing connections %% Default TCP port for outgoing connections
{mapping, "rpc.tcp_client_num", "gen_rpc.tcp_client_num", [ {mapping, "rpc.tcp_client_num", "gen_rpc.tcp_client_num", [
{default, 32}, {default, 0},
{datatype, integer}, {datatype, integer},
{validators, ["range:gt_0_lt_256"]} {validators, ["range:gt_0_lt_256"]}
]}. ]}.
{translation, "gen_rpc.tcp_client_num", fun(Conf) ->
case cuttlefish:conf_get("rpc.tcp_client_num", Conf) of
0 -> max(1, erlang:system_info(schedulers) div 2);
V -> V
end
end}.
%% Client connect timeout %% Client connect timeout
{mapping, "rpc.connect_timeout", "gen_rpc.connect_timeout", [ {mapping, "rpc.connect_timeout", "gen_rpc.connect_timeout", [
{default, "5s"}, {default, "5s"},
@ -431,7 +435,7 @@ end}.
]}. ]}.
{validator, "range:gt_0_lt_256", "must greater than 0 and less than 256", {validator, "range:gt_0_lt_256", "must greater than 0 and less than 256",
fun(X) -> X > 0 andalso X < 256 end fun(X) -> X >= 0 andalso X < 256 end
}. }.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -640,15 +644,15 @@ end}.
{translation, "emqx.flapping_detect_policy", fun(Conf) -> {translation, "emqx.flapping_detect_policy", fun(Conf) ->
Policy = cuttlefish:conf_get("flapping_detect_policy", Conf), Policy = cuttlefish:conf_get("flapping_detect_policy", Conf),
[Threshold, Duration, Interval] = string:tokens(Policy, ", "), [Threshold, Duration, Interval] = string:tokens(Policy, ", "),
ParseDuration = fun(S) -> ParseDuration = fun(S, Dur) ->
case cuttlefish_duration:parse(S, ms) of case cuttlefish_duration:parse(S, Dur) of
I when is_integer(I) -> I; I when is_integer(I) -> I;
{error, Reason} -> error(Reason) {error, Reason} -> error(Reason)
end end
end, end,
#{threshold => list_to_integer(Threshold), #{threshold => list_to_integer(Threshold),
duration => ParseDuration(Duration), duration => ParseDuration(Duration, ms),
banned_interval => ParseDuration(Interval) banned_interval => ParseDuration(Interval, s)
} }
end}. end}.
@ -913,7 +917,6 @@ end}.
%% messages | bytes passed through. %% messages | bytes passed through.
%% Numbers delimited by `|'. Zero or negative is to disable. %% Numbers delimited by `|'. Zero or negative is to disable.
{mapping, "zone.$name.force_gc_policy", "emqx.zones", [ {mapping, "zone.$name.force_gc_policy", "emqx.zones", [
{default, "0 | 0MB"},
{datatype, string} {datatype, string}
]}. ]}.
@ -923,7 +926,6 @@ end}.
%% of queued MQTT messages of QoS 1 and 2. %% of queued MQTT messages of QoS 1 and 2.
%% Zero or negative is to disable. %% Zero or negative is to disable.
{mapping, "zone.$name.force_shutdown_policy", "emqx.zones", [ {mapping, "zone.$name.force_shutdown_policy", "emqx.zones", [
{default, "default"},
{datatype, string} {datatype, string}
]}. ]}.
@ -963,17 +965,6 @@ end}.
count => list_to_integer(Count)} count => list_to_integer(Count)}
end, end,
{force_gc_policy, GcPolicy}; {force_gc_policy, GcPolicy};
("force_shutdown_policy", "default") ->
{DefaultLen, DefaultSize} =
case WordSize = erlang:system_info(wordsize) of
8 -> % arch_64
{10000, cuttlefish_bytesize:parse("32MB")};
4 -> % arch_32
{10000, cuttlefish_bytesize:parse("16MB")}
end,
{force_shutdown_policy, #{message_queue_len => DefaultLen,
max_heap_size => DefaultSize div WordSize
}};
("force_shutdown_policy", Val) -> ("force_shutdown_policy", Val) ->
[Len, Siz] = string:tokens(Val, "| "), [Len, Siz] = string:tokens(Val, "| "),
MaxSiz = case WordSize = erlang:system_info(wordsize) of MaxSiz = case WordSize = erlang:system_info(wordsize) of

View File

@ -700,14 +700,10 @@ return_unsuback(Packet, Channel) ->
| {shutdown, Reason :: term(), Reply :: term(), channel()}). | {shutdown, Reason :: term(), Reply :: term(), channel()}).
handle_call(kick, Channel) -> handle_call(kick, Channel) ->
Channel1 = ensure_disconnected(kicked, Channel), Channel1 = ensure_disconnected(kicked, Channel),
shutdown(kicked, ok, Channel1); disconnect_and_shutdown(kicked, ok, Channel1);
handle_call(discard, Channel = #channel{conn_state = connected}) -> handle_call(discard, Channel) ->
Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), disconnect_and_shutdown(discarded, ok, Channel);
{shutdown, discarded, ok, Packet, Channel};
handle_call(discard, Channel = #channel{conn_state = disconnected}) ->
shutdown(discarded, ok, Channel);
%% Session Takeover %% Session Takeover
handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) -> handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
@ -719,7 +715,7 @@ handle_call({takeover, 'end'}, Channel = #channel{session = Session,
%% TODO: Should not drain deliver here (side effect) %% TODO: Should not drain deliver here (side effect)
Delivers = emqx_misc:drain_deliver(), Delivers = emqx_misc:drain_deliver(),
AllPendings = lists:append(Delivers, Pendings), AllPendings = lists:append(Delivers, Pendings),
shutdown(takeovered, AllPendings, Channel); disconnect_and_shutdown(takeovered, AllPendings, Channel);
handle_call(list_acl_cache, Channel) -> handle_call(list_acl_cache, Channel) ->
{reply, emqx_acl_cache:list_acl_cache(), Channel}; {reply, emqx_acl_cache:list_acl_cache(), Channel};
@ -1293,6 +1289,10 @@ publish_will_msg(Msg) -> emqx_broker:publish(Msg).
disconnect_reason(?RC_SUCCESS) -> normal; disconnect_reason(?RC_SUCCESS) -> normal;
disconnect_reason(ReasonCode) -> emqx_reason_codes:name(ReasonCode). disconnect_reason(ReasonCode) -> emqx_reason_codes:name(ReasonCode).
reason_code(takeovered) -> ?RC_SESSION_TAKEN_OVER;
reason_code(discarded) -> ?RC_SESSION_TAKEN_OVER;
reason_code(_) -> ?RC_NORMAL_DISCONNECTION.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -1330,6 +1330,18 @@ shutdown(success, Reply, Channel) ->
shutdown(Reason, Reply, Channel) -> shutdown(Reason, Reply, Channel) ->
{shutdown, Reason, Reply, Channel}. {shutdown, Reason, Reply, Channel}.
shutdown(success, Reply, Packet, Channel) ->
shutdown(normal, Reply, Packet, Channel);
shutdown(Reason, Reply, Packet, Channel) ->
{shutdown, Reason, Reply, Packet, Channel}.
disconnect_and_shutdown(Reason, Reply, Channel = #channel{conn_state = connected,
conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) ->
shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), Channel);
disconnect_and_shutdown(Reason, Reply, Channel) ->
shutdown(Reason, Reply, Channel).
sp(true) -> 1; sp(true) -> 1;
sp(false) -> 0. sp(false) -> 0.

View File

@ -124,7 +124,7 @@ handle_cast({detected, #flapping{clientid = ClientId,
true -> %% Flapping happened:( true -> %% Flapping happened:(
?LOG(error, "Flapping detected: ~s(~s) disconnected ~w times in ~wms", ?LOG(error, "Flapping detected: ~s(~s) disconnected ~w times in ~wms",
[ClientId, inet:ntoa(PeerHost), DetectCnt, Duration]), [ClientId, inet:ntoa(PeerHost), DetectCnt, Duration]),
Now = erlang:system_time(millisecond), Now = erlang:system_time(second),
Banned = #banned{who = {clientid, ClientId}, Banned = #banned{who = {clientid, ClientId},
by = <<"flapping detector">>, by = <<"flapping detector">>,
reason = <<"flapping is detected">>, reason = <<"flapping is detected">>,

102
src/emqx_global_gc.erl Normal file
View File

@ -0,0 +1,102 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_global_gc).
-behaviour(gen_server).
-include("types.hrl").
-export([start_link/0, stop/0]).
-export([run/0]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
%% 5 minutes
%% -define(DEFAULT_INTERVAL, 300000).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
-spec(start_link() -> startlink_ret()).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec(run() -> {ok, timer:time()}).
run() -> gen_server:call(?MODULE, run, infinity).
-spec(stop() -> ok).
stop() -> gen_server:stop(?MODULE).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([]) ->
{ok, ensure_timer(#{timer => undefined})}.
handle_call(run, _From, State) ->
{Time, ok} = timer:tc(fun run_gc/0),
{reply, {ok, Time div 1000}, State, hibernate};
handle_call(_Req, _From, State) ->
{reply, ignored, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({timeout, TRef, run}, State = #{timer := TRef}) ->
ok = run_gc(),
{noreply, ensure_timer(State), hibernate};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internel function
%%--------------------------------------------------------------------
ensure_timer(State) ->
case emqx:get_env(global_gc_interval) of
undefined -> State;
Interval -> TRef = emqx_misc:start_timer(timer:seconds(Interval), run),
State#{timer := TRef}
end.
run_gc() -> lists:foreach(fun do_gc/1, processes()).
do_gc(Pid) ->
is_waiting(Pid) andalso garbage_collect(Pid, [{type, 'minor'}]).
-compile({inline, [is_waiting/1]}).
is_waiting(Pid) ->
{status, waiting} == process_info(Pid, status).

View File

@ -27,7 +27,8 @@ start_link() ->
init([]) -> init([]) ->
{ok, {{one_for_one, 10, 100}, {ok, {{one_for_one, 10, 100},
[child_spec(emqx_pool_sup, supervisor), [child_spec(emqx_global_gc, worker),
child_spec(emqx_pool_sup, supervisor),
child_spec(emqx_hooks, worker), child_spec(emqx_hooks, worker),
child_spec(emqx_stats, worker), child_spec(emqx_stats, worker),
child_spec(emqx_metrics, worker), child_spec(emqx_metrics, worker),
@ -40,7 +41,8 @@ child_spec(M, worker) ->
restart => permanent, restart => permanent,
shutdown => 5000, shutdown => 5000,
type => worker, type => worker,
modules => [M]}; modules => [M]
};
child_spec(M, supervisor) -> child_spec(M, supervisor) ->
#{id => M, #{id => M,
@ -48,6 +50,6 @@ child_spec(M, supervisor) ->
restart => permanent, restart => permanent,
shutdown => infinity, shutdown => infinity,
type => supervisor, type => supervisor,
modules => [M]}. modules => [M]
}.

View File

@ -82,7 +82,7 @@ start_mqtt_listener(Name, ListenOn, Options) ->
{emqx_connection, start_link, [Options -- SockOpts]}). {emqx_connection, start_link, [Options -- SockOpts]}).
start_http_listener(Start, Name, ListenOn, RanchOpts, ProtoOpts) -> start_http_listener(Start, Name, ListenOn, RanchOpts, ProtoOpts) ->
Start(Name, with_port(ListenOn, RanchOpts), ProtoOpts). Start(ws_name(Name, ListenOn), with_port(ListenOn, RanchOpts), ProtoOpts).
mqtt_path(Options) -> mqtt_path(Options) ->
proplists:get_value(mqtt_path, Options, "/mqtt"). proplists:get_value(mqtt_path, Options, "/mqtt").
@ -125,10 +125,10 @@ restart_listener(tcp, ListenOn, _Options) ->
restart_listener(Proto, ListenOn, _Options) when Proto == ssl; Proto == tls -> restart_listener(Proto, ListenOn, _Options) when Proto == ssl; Proto == tls ->
esockd:reopen('mqtt:ssl', ListenOn); esockd:reopen('mqtt:ssl', ListenOn);
restart_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws -> restart_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws ->
cowboy:stop_listener('mqtt:ws'), cowboy:stop_listener(ws_name('mqtt:ws', ListenOn)),
start_listener(Proto, ListenOn, Options); start_listener(Proto, ListenOn, Options);
restart_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss -> restart_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss ->
cowboy:stop_listener('mqtt:wss'), cowboy:stop_listener(ws_name('mqtt:wss', ListenOn)),
start_listener(Proto, ListenOn, Options); start_listener(Proto, ListenOn, Options);
restart_listener(Proto, ListenOn, _Opts) -> restart_listener(Proto, ListenOn, _Opts) ->
esockd:reopen(Proto, ListenOn). esockd:reopen(Proto, ListenOn).
@ -156,10 +156,10 @@ stop_listener(tcp, ListenOn, _Opts) ->
esockd:close('mqtt:tcp', ListenOn); esockd:close('mqtt:tcp', ListenOn);
stop_listener(Proto, ListenOn, _Opts) when Proto == ssl; Proto == tls -> stop_listener(Proto, ListenOn, _Opts) when Proto == ssl; Proto == tls ->
esockd:close('mqtt:ssl', ListenOn); esockd:close('mqtt:ssl', ListenOn);
stop_listener(Proto, _ListenOn, _Opts) when Proto == http; Proto == ws -> stop_listener(Proto, ListenOn, _Opts) when Proto == http; Proto == ws ->
cowboy:stop_listener('mqtt:ws'); cowboy:stop_listener(ws_name('mqtt:ws', ListenOn));
stop_listener(Proto, _ListenOn, _Opts) when Proto == https; Proto == wss -> stop_listener(Proto, ListenOn, _Opts) when Proto == https; Proto == wss ->
cowboy:stop_listener('mqtt:wss'); cowboy:stop_listener(ws_name('mqtt:wss', ListenOn));
stop_listener(Proto, ListenOn, _Opts) -> stop_listener(Proto, ListenOn, _Opts) ->
esockd:close(Proto, ListenOn). esockd:close(Proto, ListenOn).
@ -178,3 +178,7 @@ format({Addr, Port}) when is_list(Addr) ->
format({Addr, Port}) when is_tuple(Addr) -> format({Addr, Port}) when is_tuple(Addr) ->
io_lib:format("~s:~w", [inet:ntoa(Addr), Port]). io_lib:format("~s:~w", [inet:ntoa(Addr), Port]).
ws_name(Name, {_Addr, Port}) ->
ws_name(Name, Port);
ws_name(Name, Port) ->
list_to_atom(lists:concat([Name, ":", Port])).

View File

@ -23,6 +23,8 @@
-export([ merge_opts/2 -export([ merge_opts/2
, maybe_apply/2 , maybe_apply/2
, compose/1
, compose/2
, run_fold/3 , run_fold/3
, pipeline/3 , pipeline/3
, start_timer/2 , start_timer/2
@ -56,11 +58,19 @@ merge_opts(Defaults, Options) ->
%% @doc Apply a function to a maybe argument. %% @doc Apply a function to a maybe argument.
-spec(maybe_apply(fun((maybe(A)) -> maybe(A)), maybe(A)) -spec(maybe_apply(fun((maybe(A)) -> maybe(A)), maybe(A))
-> maybe(A) when A :: any()). -> maybe(A) when A :: any()).
maybe_apply(_Fun, undefined) -> maybe_apply(_Fun, undefined) -> undefined;
undefined;
maybe_apply(Fun, Arg) when is_function(Fun) -> maybe_apply(Fun, Arg) when is_function(Fun) ->
erlang:apply(Fun, [Arg]). erlang:apply(Fun, [Arg]).
-spec(compose(list(F)) -> G when F :: fun((any()) -> any()),
G :: fun((any()) -> any())).
compose([F|More]) -> compose(F, More).
-spec(compose(fun((X) -> Y), fun((Y) -> Z)) -> fun((X) -> Z)).
compose(F, G) when is_function(G) -> fun(X) -> G(F(X)) end;
compose(F, [G]) -> compose(F, G);
compose(F, [G|More]) -> compose(compose(F, G), More).
%% @doc RunFold %% @doc RunFold
run_fold([], Acc, _State) -> run_fold([], Acc, _State) ->
Acc; Acc;

View File

@ -29,6 +29,8 @@
, get_caps/3 , get_caps/3
]). ]).
-export([default_caps/0]).
-export([default/0]). -export([default/0]).
-export_type([caps/0]). -export_type([caps/0]).
@ -116,23 +118,42 @@ do_check_sub(#{is_shared := true}, #{shared_subscription := false}) ->
{error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}; {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED};
do_check_sub(_Flags, _Caps) -> ok. do_check_sub(_Flags, _Caps) -> ok.
-spec(get_caps(emqx_zone:zone()) -> caps()). default_caps() ->
get_caps(Zone) -> ?DEFAULT_CAPS.
maps:map(fun(Cap, Def) -> emqx_zone:get_env(Zone, Cap, Def) end, ?DEFAULT_CAPS).
-spec(get_caps(emqx_zone:zone(), publish|subscribe) -> caps()).
get_caps(Zone, publish) ->
filter_caps(?PUBCAP_KEYS, get_caps(Zone));
get_caps(Zone, subscribe) ->
filter_caps(?SUBCAP_KEYS, get_caps(Zone)).
-spec(get_caps(emqx_zone:zone(), atom(), term()) -> term()).
get_caps(Zone, Cap, Def) -> get_caps(Zone, Cap, Def) ->
emqx_zone:get_env(Zone, Cap, Def). emqx_zone:get_env(Zone, Cap, Def).
get_caps(Zone, publish) ->
with_env(Zone, '$mqtt_pub_caps',
fun() ->
filter_caps(?PUBCAP_KEYS, get_caps(Zone))
end);
get_caps(Zone, subscribe) ->
with_env(Zone, '$mqtt_sub_caps',
fun() ->
filter_caps(?SUBCAP_KEYS, get_caps(Zone))
end).
get_caps(Zone) ->
with_env(Zone, '$mqtt_caps',
fun() ->
maps:map(fun(Cap, Def) ->
emqx_zone:get_env(Zone, Cap, Def)
end, ?DEFAULT_CAPS)
end).
filter_caps(Keys, Caps) -> filter_caps(Keys, Caps) ->
maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps). maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps).
-spec(default() -> caps()). -spec(default() -> caps()).
default() -> ?DEFAULT_CAPS. default() -> ?DEFAULT_CAPS.
with_env(Zone, Key, InitFun) ->
case emqx_zone:get_env(Zone, Key) of
undefined -> Caps = InitFun(),
ok = emqx_zone:set_env(Zone, Key, Caps),
Caps;
ZoneCaps -> ZoneCaps
end.

View File

@ -429,9 +429,9 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
run_gc(Stats, State = #state{gc_state = GcSt}) -> run_gc(Stats, State = #state{gc_state = GcSt}) ->
case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
false -> State;
{_IsGC, GcSt1} -> {_IsGC, GcSt1} ->
State#state{gc_state = GcSt1}; State#state{gc_state = GcSt1}
false -> State
end. end.
check_oom(State = #state{channel = Channel}) -> check_oom(State = #state{channel = Channel}) ->

View File

@ -45,6 +45,8 @@
, session_expiry_interval/1 , session_expiry_interval/1
, force_gc_policy/1 , force_gc_policy/1
, force_shutdown_policy/1 , force_shutdown_policy/1
, get_env/2
, get_env/3
]}). ]}).
%% APIs %% APIs
@ -114,7 +116,7 @@ start_link() ->
stop() -> stop() ->
gen_server:stop(?SERVER). gen_server:stop(?SERVER).
-spec(init_gc_state(zone()) -> emqx_gc:gc_state()). -spec(init_gc_state(zone()) -> maybe(emqx_gc:gc_state())).
init_gc_state(Zone) -> init_gc_state(Zone) ->
maybe_apply(fun emqx_gc:init/1, force_gc_policy(Zone)). maybe_apply(fun emqx_gc:init/1, force_gc_policy(Zone)).

View File

@ -407,7 +407,7 @@ t_handle_call_takeover_begin(_) ->
t_handle_call_takeover_end(_) -> t_handle_call_takeover_end(_) ->
ok = meck:expect(emqx_session, takeover, fun(_) -> ok end), ok = meck:expect(emqx_session, takeover, fun(_) -> ok end),
{shutdown, takeovered, [], _Chan} = {shutdown, takeovered, [], _, _Chan} =
emqx_channel:handle_call({takeover, 'end'}, channel()). emqx_channel:handle_call({takeover, 'end'}, channel()).
t_handle_call_unexpected(_) -> t_handle_call_unexpected(_) ->

View File

@ -31,7 +31,7 @@ set_special_configs(emqx) ->
application:set_env(emqx, flapping_detect_policy, application:set_env(emqx, flapping_detect_policy,
#{threshold => 3, #{threshold => 3,
duration => 100, duration => 100,
banned_interval => 200 banned_interval => 2
}); });
set_special_configs(_App) -> ok. set_special_configs(_App) -> ok.
@ -52,7 +52,7 @@ t_detect_check(_) ->
true = emqx_flapping:detect(ClientInfo), true = emqx_flapping:detect(ClientInfo),
timer:sleep(100), timer:sleep(100),
true = emqx_banned:check(ClientInfo), true = emqx_banned:check(ClientInfo),
timer:sleep(200), timer:sleep(3000),
false = emqx_banned:check(ClientInfo), false = emqx_banned:check(ClientInfo),
Childrens = supervisor:which_children(emqx_cm_sup), Childrens = supervisor:which_children(emqx_cm_sup),
{flapping, Pid, _, _} = lists:keyfind(flapping, 1, Childrens), {flapping, Pid, _, _} = lists:keyfind(flapping, 1, Childrens),

View File

@ -0,0 +1,33 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_global_gc_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).
t_run_gc(_) ->
ok = application:set_env(emqx, global_gc_interval, 1),
{ok, _} = emqx_global_gc:start_link(),
ok = timer:sleep(1500),
{ok, MilliSecs} = emqx_global_gc:run(),
ct:print("Global GC: ~w(ms)~n", [MilliSecs]),
emqx_global_gc:stop().

View File

@ -28,9 +28,8 @@ t_check_pub(_) ->
PubCaps = #{max_qos_allowed => ?QOS_1, PubCaps = #{max_qos_allowed => ?QOS_1,
retain_available => false retain_available => false
}, },
lists:foreach(fun({Key, Val}) -> emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps),
ok = emqx_zone:set_env(zone, Key, Val) timer:sleep(50),
end, maps:to_list(PubCaps)),
ok = emqx_mqtt_caps:check_pub(zone, #{qos => ?QOS_1, ok = emqx_mqtt_caps:check_pub(zone, #{qos => ?QOS_1,
retain => false}), retain => false}),
PubFlags1 = #{qos => ?QOS_2, retain => false}, PubFlags1 = #{qos => ?QOS_2, retain => false},
@ -39,9 +38,7 @@ t_check_pub(_) ->
PubFlags2 = #{qos => ?QOS_1, retain => true}, PubFlags2 = #{qos => ?QOS_1, retain => true},
?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED}, ?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED},
emqx_mqtt_caps:check_pub(zone, PubFlags2)), emqx_mqtt_caps:check_pub(zone, PubFlags2)),
lists:foreach(fun({Key, _Val}) -> emqx_zone:unset_env(zone, '$mqtt_pub_caps').
true = emqx_zone:unset_env(zone, Key)
end, maps:to_list(PubCaps)).
t_check_sub(_) -> t_check_sub(_) ->
SubOpts = #{rh => 0, SubOpts = #{rh => 0,
@ -54,9 +51,8 @@ t_check_sub(_) ->
shared_subscription => false, shared_subscription => false,
wildcard_subscription => false wildcard_subscription => false
}, },
lists:foreach(fun({Key, Val}) -> emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps),
ok = emqx_zone:set_env(zone, Key, Val) timer:sleep(50),
end, maps:to_list(SubCaps)),
ok = emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts), ok = emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts),
?assertEqual({error, ?RC_TOPIC_FILTER_INVALID}, ?assertEqual({error, ?RC_TOPIC_FILTER_INVALID},
emqx_mqtt_caps:check_sub(zone, <<"a/b/c/d">>, SubOpts)), emqx_mqtt_caps:check_sub(zone, <<"a/b/c/d">>, SubOpts)),
@ -64,6 +60,4 @@ t_check_sub(_) ->
emqx_mqtt_caps:check_sub(zone, <<"+/#">>, SubOpts)), emqx_mqtt_caps:check_sub(zone, <<"+/#">>, SubOpts)),
?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}, ?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED},
emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts#{share => true})), emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts#{share => true})),
lists:foreach(fun({Key, _Val}) -> emqx_zone:unset_env(zone, '$mqtt_pub_caps').
true = emqx_zone:unset_env(zone, Key)
end, maps:to_list(SubCaps)).

View File

@ -147,8 +147,8 @@ t_connect_keepalive_timeout(_) ->
Msg -> Msg ->
ReasonCode = 141, ReasonCode = 141,
?assertMatch({disconnected, ReasonCode, _Channel}, Msg) ?assertMatch({disconnected, ReasonCode, _Channel}, Msg)
after after round(timer:seconds(Keepalive) * 2 * 1.5 ) ->
round(timer:seconds(Keepalive) * 2 * 1.5 ) -> error("keepalive timeout") error("keepalive timeout")
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -160,7 +160,7 @@ t_shared_subscriptions_client_terminates_when_qos_eq_2(_) ->
application:set_env(emqx, shared_dispatch_ack_enabled, true), application:set_env(emqx, shared_dispatch_ack_enabled, true),
Topic = nth(1, ?TOPICS), Topic = nth(1, ?TOPICS),
Shared_topic = list_to_binary("$share/sharename/" ++ binary_to_list(<<"TopicA">>)), SharedTopic = list_to_binary("$share/sharename/" ++ binary_to_list(<<"TopicA">>)),
CRef = counters:new(1, [atomics]), CRef = counters:new(1, [atomics]),
meck:expect(emqtt, connected, meck:expect(emqtt, connected,
@ -174,18 +174,23 @@ t_shared_subscriptions_client_terminates_when_qos_eq_2(_) ->
{clientid, <<"sub_client_1">>}, {clientid, <<"sub_client_1">>},
{keepalive, 5}]), {keepalive, 5}]),
{ok, _} = emqtt:connect(Sub1), {ok, _} = emqtt:connect(Sub1),
{ok, _, [2]} = emqtt:subscribe(Sub1, Shared_topic, qos2), {ok, _, [2]} = emqtt:subscribe(Sub1, SharedTopic, qos2),
{ok, Sub2} = emqtt:start_link([{proto_ver, v5}, {ok, Sub2} = emqtt:start_link([{proto_ver, v5},
{clientid, <<"sub_client_2">>}, {clientid, <<"sub_client_2">>},
{keepalive, 5}]), {keepalive, 5}]),
{ok, _} = emqtt:connect(Sub2), {ok, _} = emqtt:connect(Sub2),
{ok, _, [2]} = emqtt:subscribe(Sub2, Shared_topic, qos2), {ok, _, [2]} = emqtt:subscribe(Sub2, SharedTopic, qos2),
{ok, Pub} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"pub_client">>}]), {ok, Pub} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"pub_client">>}]),
{ok, _} = emqtt:connect(Pub), {ok, _} = emqtt:connect(Pub),
{ok, _} = emqtt:publish(Pub, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 2), {ok, _} = emqtt:publish(Pub, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 2),
receive receive
{disconnected,shutdown,for_testiong} -> ok {'EXIT', _,{shutdown, for_testiong}} ->
ok
after 1000 ->
error("disconnected timeout")
end, end,
?assertEqual(1, counters:get(CRef, 1)). ?assertEqual(1, counters:get(CRef, 1)).