Merge pull request #2549 from emqx/develop

This commit is contained in:
turtleDeng 2019-05-18 12:11:11 +08:00 committed by GitHub
commit 3110778286
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
59 changed files with 945 additions and 987 deletions

1
.gitignore vendored
View File

@ -39,3 +39,4 @@ rebar.lock
xrefr
erlang.mk
*.coverdata
etc/emqx.conf.rendered

View File

@ -1,18 +1,17 @@
language: erlang
otp_release:
- 21.2
- 21.3
before_install:
- git clone https://github.com/erlang/rebar3.git; cd rebar3; ./bootstrap; sudo mv rebar3 /usr/local/bin/; cd ..
script:
- make dep-vsn-check
- make rebar-compile
- make rebar-xref
- make rebar-eunit
- make rebar-ct
- make rebar-cover
- make compile
- make xref
- make eunit
- make ct
- make cover
after_success:
- make coveralls

153
Makefile
View File

@ -1,34 +1,9 @@
.PHONY: plugins tests
## shallow clone for speed
PROJECT = emqx
PROJECT_DESCRIPTION = EMQ X Broker
REBAR_GIT_CLONE_OPTIONS += --depth 1
export REBAR_GIT_CLONE_OPTIONS
DEPS = jsx gproc gen_rpc ekka esockd cowboy replayq
dep_jsx = git-emqx https://github.com/talentdeficit/jsx 2.9.0
dep_gproc = git-emqx https://github.com/uwiger/gproc 0.8.0
dep_gen_rpc = git-emqx https://github.com/emqx/gen_rpc 2.3.1
dep_esockd = git-emqx https://github.com/emqx/esockd v5.4.4
dep_ekka = git-emqx https://github.com/emqx/ekka v0.5.4
dep_cowboy = git-emqx https://github.com/ninenines/cowboy 2.6.1
dep_replayq = git-emqx https://github.com/emqx/replayq v0.1.1
NO_AUTOPATCH = cuttlefish
ERLC_OPTS += +debug_info -DAPPLICATION=emqx
BUILD_DEPS = cuttlefish
dep_cuttlefish = git-emqx https://github.com/emqx/cuttlefish v2.2.1
TEST_DEPS = meck
dep_meck = hex-emqx 0.8.13
TEST_ERLC_OPTS += +debug_info -DAPPLICATION=emqx
EUNIT_OPTS = verbose
# CT_SUITES = emqx_bridge
## emqx_trie emqx_router emqx_frame emqx_mqtt_compat
# CT_SUITES = emqx_trie emqx_router emqx_frame emqx_mqtt_compat
CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \
@ -38,34 +13,71 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \
emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \
emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message emqx_os_mon \
emqx_vm_mon emqx_alarm_handler emqx_rpc
emqx_vm_mon emqx_alarm_handler emqx_rpc emqx_flapping
CT_NODE_NAME = emqxct@127.0.0.1
CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)
COVER = true
compile:
@rebar3 compile
PLT_APPS = sasl asn1 ssl syntax_tools runtime_tools crypto xmerl os_mon inets public_key ssl compiler mnesia
DIALYZER_DIRS := ebin/
DIALYZER_OPTS := --verbose --statistics -Werror_handling -Wrace_conditions #-Wunmatched_returns
clean: distclean
$(shell [ -f erlang.mk ] || curl -s -o erlang.mk https://raw.githubusercontent.com/emqx/erlmk/master/erlang.mk)
include erlang.mk
## Cuttlefish escript is built by default when cuttlefish app (as dependency) was built
CUTTLEFISH_SCRIPT := _build/default/lib/cuttlefish/cuttlefish
clean:: gen-clean
.PHONY: cover
cover:
@rebar3 cover
.PHONY: gen-clean
gen-clean:
@rm -rf bbmustache
@rm -f etc/gen.emqx.conf
.PHONY: coveralls
coveralls:
@rebar3 coveralls send
.PHONY: xref
xref:
@rebar3 xref
.PHONY: deps
deps:
@rebar3 get-deps
.PHONY: eunit
eunit:
@rebar3 eunit -v
.PHONY: ct-setup
ct-setup:
rebar3 as test compile
@mkdir -p data
@if [ ! -f data/loaded_plugins ]; then touch data/loaded_plugins; fi
@ln -s -f '../../../../etc' _build/test/lib/emqx/
@ln -s -f '../../../../data' _build/test/lib/emqx/
.PHONY: ct
ct: ct-setup
@rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(shell echo $(foreach var,$(CT_SUITES),test/$(var)_SUITE) | tr ' ' ',')
## Run one single CT with rebar3
## e.g. make ct-one-suite suite=emqx_bridge
.PHONY: ct-one-suite
ct-one-suite: ct-setup
@rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(suite)_SUITE
.PHONY: app.config
app.config: $(CUTTLEFISH_SCRIPT) etc/gen.emqx.conf
$(CUTTLEFISH_SCRIPT) -l info -e etc/ -c etc/gen.emqx.conf -i priv/emqx.schema -d data/
$(CUTTLEFISH_SCRIPT):
@rebar3 get-deps
@if [ ! -f cuttlefish ]; then make -C _build/default/lib/cuttlefish; fi
bbmustache:
$(verbose) git clone https://github.com/soranoba/bbmustache.git && cd bbmustache && ./rebar3 compile && cd ..
@git clone https://github.com/soranoba/bbmustache.git && cd bbmustache && ./rebar3 compile && cd ..
# This hack is to generate a conf file for testing
# relx overlay is used for release
etc/gen.emqx.conf: bbmustache etc/emqx.conf
$(verbose) erl -noshell -pa bbmustache/_build/default/lib/bbmustache/ebin -eval \
@erl -noshell -pa bbmustache/_build/default/lib/bbmustache/ebin -eval \
"{ok, Temp} = file:read_file('etc/emqx.conf'), \
{ok, Vars0} = file:consult('vars'), \
Vars = [{atom_to_list(N), list_to_binary(V)} || {N, V} <- Vars0], \
@ -73,51 +85,12 @@ etc/gen.emqx.conf: bbmustache etc/emqx.conf
ok = file:write_file('etc/gen.emqx.conf', Targ), \
halt(0)."
CUTTLEFISH_SCRIPT = _build/default/lib/cuttlefish/cuttlefish
.PHONY: gen-clean
gen-clean:
@rm -rf bbmustache
@rm -f etc/gen.emqx.conf etc/emqx.conf.rendered
app.config: $(CUTTLEFISH_SCRIPT) etc/gen.emqx.conf
$(verbose) $(CUTTLEFISH_SCRIPT) -l info -e etc/ -c etc/gen.emqx.conf -i priv/emqx.schema -d data/
ct: app.config
rebar-cover:
@rebar3 cover
coveralls:
@rebar3 coveralls send
$(CUTTLEFISH_SCRIPT): rebar-deps
@if [ ! -f cuttlefish ]; then make -C _build/default/lib/cuttlefish; fi
rebar-xref:
@rebar3 xref
rebar-deps:
@rebar3 get-deps
rebar-eunit: $(CUTTLEFISH_SCRIPT)
@rebar3 eunit -v
rebar-compile:
@rebar3 compile
rebar-ct-setup: app.config
@rebar3 as test compile
@ln -s -f '../../../../etc' _build/test/lib/emqx/
@ln -s -f '../../../../data' _build/test/lib/emqx/
rebar-ct: rebar-ct-setup
@rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(shell echo $(foreach var,$(CT_SUITES),test/$(var)_SUITE) | tr ' ' ',')
## Run one single CT with rebar3
## e.g. make ct-one-suite suite=emqx_bridge
ct-one-suite: rebar-ct-setup
@rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(suite)_SUITE
rebar-clean:
@rebar3 clean
distclean::
.PHONY: distclean
distclean: gen-clean
@rm -rf _build cover deps logs log data
@rm -f rebar.lock compile_commands.json cuttlefish
@rm -f rebar.lock compile_commands.json cuttlefish erl_crash.dump

View File

@ -368,6 +368,12 @@ log.dir = {{ platform_log_dir }}
## Default: emqx.log
log.file = emqx.log
## Limits the total number of characters printed for each log event.
##
## Value: Integer
## Default: 8192
log.chars_limit = 8192
## Maximum size of each log file.
##
## Value: Number
@ -438,6 +444,17 @@ acl_cache_ttl = 1m
## Default: ignore
acl_deny_action = ignore
## The cleanning interval for flapping
##
## Value: Duration
## -d: day
## -h: hour
## -m: minute
## -s: second
##
## Default: 1h, 1 hour
## flapping_clean_interval = 1h
##--------------------------------------------------------------------
## MQTT Protocol
##--------------------------------------------------------------------
@ -533,6 +550,18 @@ zone.external.acl_deny_action = ignore
## Numbers delimited by `|'. Zero or negative is to disable.
zone.external.force_gc_policy = 1000|1MB
## Max message queue length and total heap size to force shutdown
## connection/session process.
## Message queue here is the Erlang process mailbox, but not the number
## of queued MQTT messages of QoS 1 and 2.
##
## Numbers delimited by `|'. Zero or negative is to disable.
##
## Default:
## - 8000|800MB on ARCH_64 system
## - 1000|100MB on ARCH_32 sytem
## zone.external.force_shutdown_policy = 8000|800MB
## Maximum MQTT packet size allowed.
##
## Value: Bytes
@ -650,11 +679,35 @@ zone.external.mqueue_priorities = none
## Value: highest | lowest
zone.external.mqueue_default_priority = highest
## Whether to enqueue Qos0 messages.
## Whether to enqueue QoS0 messages.
##
## Value: false | true
zone.external.mqueue_store_qos0 = true
## Whether to turn on flapping detect
##
## Value: on | off
zone.external.enable_flapping_detect = off
## The times of state change per min, specifying the threshold which is used to
## detect if the connection starts flapping
##
## Value: number
zone.external.flapping_threshold = 10, 1m
## Flapping expiry interval for connections.
## This config entry is used to determine when the connection
## will be unbanned.
##
## Value: Duration
## -d: day
## -h: hour
## -m: minute
## -s: second
##
## Default: 1h, 1 hour
zone.external.flapping_banned_expiry_interval = 1h
## All the topics will be prefixed with the mountpoint path if this option is enabled.
##
## Variables in mountpoint path:
@ -726,6 +779,30 @@ zone.internal.max_mqueue_len = 1000
## Value: false | true
zone.internal.mqueue_store_qos0 = true
## Whether to turn on flapping detect
##
## Value: on | off
zone.internal.enable_flapping_detect = off
## The times of state change per second, specifying the threshold which is used to
## detect if the connection starts flapping
##
## Value: number
zone.internal.flapping_threshold = 10, 1m
## Flapping expiry interval for connections.
## This config entry is used to determine when the connection
## will be unbanned.
##
## Value: Duration
## -d: day
## -h: hour
## -m: minute
## -s: second
##
## Default: 1h, 1 hour
zone.internal.flapping_banned_expiry_interval = 1h
## All the topics will be prefixed with the mountpoint path if this option is enabled.
##
## Variables in mountpoint path:
@ -786,8 +863,11 @@ listener.tcp.external.zone = external
## Rate limit for the external MQTT/TCP connections. Format is 'rate,burst'.
##
## Value: rate,burst
## - rate: The average limit value for per second
## - burst: The maximum allowed for each check, To avoid frequent restriction
## this value is recommended to be set to `(max_packet_size * active_n)/2`
## Unit: Bps
## listener.tcp.external.rate_limit = 1024,4096
## listener.tcp.external.rate_limit = 1024,52428800
## The access control rules for the MQTT/TCP listener.
##
@ -917,8 +997,11 @@ listener.tcp.internal.zone = internal
## See: listener.tcp.$name.rate_limit
##
## Value: rate,burst
## - rate: The average limit value for per second
## - burst: The maximum allowed for each check, To avoid frequent restriction
## this value is recommended to be set to `(max_packet_size * active_n)/2`
## Unit: Bps
## listener.tcp.internal.rate_limit = 1000000,2000000
## listener.tcp.internal.rate_limit = 1000000,524288000
## The TCP backlog of internal MQTT/TCP Listener.
##
@ -1027,8 +1110,11 @@ listener.ssl.external.access.1 = allow all
## Rate limit for the external MQTT/SSL connections.
##
## Value: rate,burst
## - rate: The average limit value for per second
## - burst: The maximum allowed for each check, To avoid frequent restriction
## this value is recommended to be set to `(max_packet_size * active_n)/2`
## Unit: Bps
## listener.ssl.external.rate_limit = 1024,4096
## listener.ssl.external.rate_limit = 1024,52428800
## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind
## HAProxy or Nginx.
@ -1261,8 +1347,11 @@ listener.ws.external.max_conn_rate = 1000
## Rate limit for the MQTT/WebSocket connections.
##
## Value: rate,burst
## - rate: The average limit value for per second
## - burst: The maximum allowed for each check, To avoid frequent restriction
## this value is recommended to be set to `(max_packet_size * 1)/2`
## Unit: Bps
## listener.ws.external.rate_limit = 1024,4096
## listener.ws.external.rate_limit = 1024,524288
## Zone of the external MQTT/WebSocket listener belonged to.
##
@ -1469,8 +1558,11 @@ listener.wss.external.max_conn_rate = 1000
## Rate limit for the MQTT/WebSocket/SSL connections.
##
## Value: rate,burst
## - rate: The average limit value for per second
## - burst: The maximum allowed for each check, To avoid frequent restriction
## this value is recommended to be set to `(max_packet_size * 1)/2`
## Unit: Bps
## listener.wss.external.rate_limit = 1024,4096
## listener.wss.external.rate_limit = 1024,524288
## Zone of the external MQTT/WebSocket/SSL listener belonged to.
##
@ -1784,13 +1876,13 @@ listener.wss.external.send_timeout_close = on
## SSL Ciphers used by the bridge.
##
## Value: String
#bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
## bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
## Ciphers for TLS PSK.
## Note that 'listener.ssl.external.ciphers' and 'listener.ssl.external.psk_ciphers' cannot
## be configured at the same time.
## See 'https://tools.ietf.org/html/rfc4279#section-2'.
#bridge.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
## bridge.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
## Ping interval of a down bridge.
##
@ -2122,7 +2214,7 @@ broker.session_locking_strategy = quorum
##
## Value: Enum
## - random
## - round_robbin
## - round_robin
## - sticky
## - hash
broker.shared_subscription_strategy = random
@ -2145,19 +2237,43 @@ broker.route_batch_clean = off
## System Monitor
##--------------------------------------------------------------------
## Enable Long GC monitoring.
## Enable Long GC monitoring. Disable if the value is 0.
## Notice: don't enable the monitor in production for:
## https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421
##
## Value: true | false
sysmon.long_gc = false
## Value: Duration
## - h: hour
## - m: minute
## - s: second
## - ms: milliseconds
##
## Examples:
## - 2h: 2 hours
## - 30m: 30 minutes
## - 0.1s: 0.1 seconds
## - 100ms : 100 milliseconds
##
## Default: 0ms
sysmon.long_gc = 0
## Enable Long Schedule(ms) monitoring.
##
## See: http://erlang.org/doc/man/erlang.html#system_monitor-2
##
## Value: Number
sysmon.long_schedule = 240
## Value: Duration
## - h: hour
## - m: minute
## - s: second
## - ms: milliseconds
##
## Examples:
## - 2h: 2 hours
## - 30m: 30 minutes
## - 0.1s: 0.1 seconds
## - 100ms: 100 milliseconds
##
## Default: 0ms
sysmon.long_schedule = 240ms
## Enable Large Heap monitoring.
##

View File

@ -19,4 +19,3 @@
-type(ok_or_error(Reason) :: ok | {error, Reason}).
-type(ok_or_error(Value, Reason) :: {ok, Value} | {error, Reason}).

View File

@ -270,8 +270,7 @@ end}.
X when is_integer(X) -> cuttlefish_util:ceiling(X / 1024); %% Bytes to Kilobytes;
_ -> undefined
end
end
}.
end}.
{validator, "zdbbl_range", "must be between 1KB and 2097151KB",
fun(ZDBBL) ->
@ -401,7 +400,7 @@ end}.
{datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}}
]}.
{mapping, "log.primary_level", "emqx.primary_log_level", [
{mapping, "log.primary_level", "kernel.primary_log_level", [
{default, error},
{datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}}
]}.
@ -421,6 +420,11 @@ end}.
{datatype, file}
]}.
{mapping, "log.chars_limit", "kernel.logger", [
{default, 8192},
{datatype, integer}
]}.
{mapping, "log.rotation.size", "kernel.logger", [
{default, "10MB"},
{datatype, bytesize}
@ -458,13 +462,13 @@ end}.
hidden
]}.
{translation, "emqx.primary_log_level", fun(Conf) ->
cuttlefish:conf_get("log.level", Conf)
end}.
{translation, "kernel.logger", fun(Conf) ->
LogTo = cuttlefish:conf_get("log.to", Conf),
LogLevel = cuttlefish:conf_get("log.level", Conf),
CharsLimit = case cuttlefish:conf_get("log.chars_limit", Conf) of
-1 -> unlimited;
V -> V
end,
Formatter = {emqx_logger_formatter,
#{template =>
[time," [",level,"] ",
@ -475,7 +479,8 @@ end}.
[{peername,
[peername," "],
[]}]},
msg,"\n"]}},
msg,"\n"],
chars_limit => CharsLimit}},
FileConf = fun(Filename) ->
#{type => wrap,
file => filename:join(cuttlefish:conf_get("log.dir", Conf), Filename),
@ -574,6 +579,11 @@ end}.
{datatype, {enum, [ignore, disconnect]}}
]}.
%% @doc time interval to clean flapping records
{mapping, "flapping_clean_interval", "emqx.flapping_clean_interval", [
{datatype, {duration, ms}}
]}.
{validator, "range:gt_0", "must greater than 0",
fun(X) -> X > 0 end
}.
@ -814,6 +824,18 @@ end}.
{datatype, {enum, [true, false]}}
]}.
{mapping, "zone.$name.enable_flapping_detect", "emqx.zones", [
{datatype, flag}
]}.
{mapping, "zone.$name.flapping_threshold", "emqx.zones", [
{datatype, string}
]}.
{mapping, "zone.$name.flapping_banned_expiry_interval", "emqx.zones", [
{datatype, {duration, s}}
]}.
%% @doc Force connection/session process GC after this number of
%% messages | bytes passed through.
%% Numbers delimited by `|'. Zero or negative is to disable.
@ -828,7 +850,7 @@ end}.
%% of queued MQTT messages of QoS 1 and 2.
%% Zero or negative is to disable.
{mapping, "zone.$name.force_shutdown_policy", "emqx.zones", [
{default, "0 | 0MB"},
{default, "default"},
{datatype, string}
]}.
@ -845,6 +867,15 @@ end}.
{translation, "emqx.zones", fun(Conf) ->
Mapping = fun("retain_available", Val) ->
{mqtt_retain_available, Val};
("flapping_threshold", Val) ->
[Limit, Duration] = string:tokens(Val, ", "),
FlappingThreshold = case cuttlefish_duration:parse(Duration, s) of
Min when is_integer(Min) ->
{list_to_integer(Limit), Min};
{error, Reason} ->
error(Reason)
end,
{flapping_threshold, FlappingThreshold};
("wildcard_subscription", Val) ->
{mqtt_wildcard_subscription, Val};
("shared_subscription", Val) ->
@ -868,15 +899,34 @@ end}.
count => list_to_integer(Count)}
end,
{force_gc_policy, GcPolicy};
("force_shutdown_policy", "default") ->
{DefaultLen, DefaultSize} =
case erlang:system_info(wordsize) of
8 -> % arch_64
{8000, cuttlefish_bytesize:parse("800MB")};
4 -> % arch_32
{1000, cuttlefish_bytesize:parse("100MB")}
end,
{force_shutdown_policy, #{message_queue_len => DefaultLen,
max_heap_size => DefaultSize}};
("force_shutdown_policy", Val) ->
[Len, Siz] = string:tokens(Val, "| "),
ShutdownPolicy = case cuttlefish_bytesize:parse(Siz) of
{error, Reason} ->
error(Reason);
Siz1 ->
#{message_queue_len => list_to_integer(Len),
max_heap_size => Siz1}
end,
MaxSiz = case erlang:system_info(wordsize) of
8 -> % arch_64
(1 bsl 59) - 1;
4 -> % arch_32
(1 bsl 27) - 1
end,
ShutdownPolicy =
case cuttlefish_bytesize:parse(Siz) of
{error, Reason} ->
error(Reason);
Siz1 when Siz1 > MaxSiz ->
cuttlefish:invalid(io_lib:format("force_shutdown_policy: heap-size ~s is too large", [Siz]));
Siz1 ->
#{message_queue_len => list_to_integer(Len),
max_heap_size => Siz1}
end,
{force_shutdown_policy, ShutdownPolicy};
("mqueue_priorities", Val) ->
case Val of
@ -1996,11 +2046,11 @@ end}.
%% @doc Shared Subscription Dispatch Strategy.
{mapping, "broker.shared_subscription_strategy", "emqx.shared_subscription_strategy", [
{default, round_robbin},
{default, round_robin},
{datatype,
{enum,
[random, %% randomly pick a subscriber
round_robbin, %% round robin alive subscribers one message after another
round_robin, %% round robin alive subscribers one message after another
sticky, %% pick a random subscriber and stick to it
hash %% hash client ID to a group member
]}}
@ -2024,14 +2074,14 @@ end}.
%% @doc Long GC, don't monitor in production mode for:
%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421
{mapping, "sysmon.long_gc", "emqx.sysmon", [
{default, false},
{datatype, {enum, [true, false]}}
{default, 0},
{datatype, [integer, {duration, ms}]}
]}.
%% @doc Long Schedule(ms)
{mapping, "sysmon.long_schedule", "emqx.sysmon", [
{default, 1000},
{datatype, integer}
{default, 240},
{datatype, [integer, {duration, ms}]}
]}.
%% @doc Large Heap
@ -2053,11 +2103,8 @@ end}.
]}.
{translation, "emqx.sysmon", fun(Conf) ->
[{long_gc, cuttlefish:conf_get("sysmon.long_gc", Conf)},
{long_schedule, cuttlefish:conf_get("sysmon.long_schedule", Conf)},
{large_heap, cuttlefish:conf_get("sysmon.large_heap", Conf)},
{busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)},
{busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}]
Configs = cuttlefish_variable:filter_by_prefix("sysmon", Conf),
[{list_to_atom(Name), Value} || {[_, Name], Value} <- Configs]
end}.
%%--------------------------------------------------------------------
@ -2095,12 +2142,8 @@ end}.
]}.
{translation, "emqx.os_mon", fun(Conf) ->
[{cpu_check_interval, cuttlefish:conf_get("os_mon.cpu_check_interval", Conf)},
{cpu_high_watermark, cuttlefish:conf_get("os_mon.cpu_high_watermark", Conf)},
{cpu_low_watermark, cuttlefish:conf_get("os_mon.cpu_low_watermark", Conf)},
{mem_check_interval, cuttlefish:conf_get("os_mon.mem_check_interval", Conf)},
{sysmem_high_watermark, cuttlefish:conf_get("os_mon.sysmem_high_watermark", Conf)},
{procmem_high_watermark, cuttlefish:conf_get("os_mon.procmem_high_watermark", Conf)}]
Configs = cuttlefish_variable:filter_by_prefix("os_mon", Conf),
[{list_to_atom(Name), Value} || {[_, Name], Value} <- Configs]
end}.
%%--------------------------------------------------------------------
@ -2122,7 +2165,6 @@ end}.
]}.
{translation, "emqx.vm_mon", fun(Conf) ->
[{check_interval, cuttlefish:conf_get("vm_mon.check_interval", Conf)},
{process_high_watermark, cuttlefish:conf_get("vm_mon.process_high_watermark", Conf)},
{process_low_watermark, cuttlefish:conf_get("vm_mon.process_low_watermark", Conf)}]
Configs = cuttlefish_variable:filter_by_prefix("vm_mon", Conf),
[{list_to_atom(Name), Value} || {[_, Name], Value} <- Configs]
end}.

View File

@ -1,16 +1,12 @@
{deps, [{jsx, "2.9.0"},
{gproc, "0.8.0"},
{cowboy, "2.6.1"},
{meck, "0.8.13"} %% temp workaround for version check
]}.
%% appended to deps in rebar.config.script
{github_emqx_deps,
[{gen_rpc, "2.3.1"},
{ekka, "v0.5.4"},
{replayq, "v0.1.1"},
{esockd, "v5.4.4"},
{cuttlefish, "v2.2.1"}
{deps,
[ {jsx, "2.9.0"} % hex
, {cowboy, "2.6.1"} % hex
, {gproc, "0.8.0"} % hex
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.4"}}}
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "v0.1.1"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "v5.4.4"}}}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
]}.
{edoc_opts, [{preprocess, true}]}.
@ -28,3 +24,13 @@
{cover_export_enabled, true}.
{plugins, [coveralls]}.
{profiles,
[{test,
[{deps,
[ {meck, "0.8.13"} % hex
, {bbmustache, "1.7.0"} % hex
, {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "v1.1.1"}}}
]}
]}
]}.

View File

@ -1,11 +1,3 @@
CONFIG0 = case os:getenv("REBAR_GIT_CLONE_OPTIONS") of
"--depth 1" ->
CONFIG;
_ ->
os:putenv("REBAR_GIT_CLONE_OPTIONS", "--depth 1"),
CONFIG
end,
CONFIG1 = case os:getenv("TRAVIS") of
"true" ->
JobId = os:getenv("TRAVIS_JOB_ID"),
@ -16,10 +8,4 @@ CONFIG1 = case os:getenv("TRAVIS") of
CONFIG
end,
{_, Deps} = lists:keyfind(deps, 1, CONFIG1),
{_, OurDeps} = lists:keyfind(github_emqx_deps, 1, CONFIG1),
UrlPrefix = "https://github.com/emqx/",
NewDeps = Deps ++ [{Name, {git, UrlPrefix ++ atom_to_list(Name), {branch, Branch}}} || {Name, Branch} <- OurDeps],
CONFIG2 = lists:keystore(deps, 1, CONFIG1, {deps, NewDeps}),
CONFIG2.
CONFIG1.

View File

@ -15,7 +15,6 @@
-module(emqx_access_control).
-include("emqx.hrl").
-include("logger.hrl").
-export([authenticate/1]).
@ -29,14 +28,12 @@
-spec(authenticate(emqx_types:credentials())
-> {ok, emqx_types:credentials()} | {error, term()}).
authenticate(Credentials) ->
detect_anonymous_permission(Credentials, fun() ->
case emqx_hooks:run_fold('client.authenticate', [], init_auth_result(Credentials)) of
#{auth_result := success} = NewCredentials ->
{ok, NewCredentials};
NewCredentials ->
{error, maps:get(auth_result, NewCredentials, unknown_error)}
end
end).
case emqx_hooks:run_fold('client.authenticate', [], init_auth_result(Credentials)) of
#{auth_result := success} = NewCredentials ->
{ok, NewCredentials};
NewCredentials ->
{error, maps:get(auth_result, NewCredentials, unknown_error)}
end.
%% @doc Check ACL
-spec(check_acl(emqx_types:credentials(), emqx_types:pubsub(), emqx_types:topic()) -> allow | deny).
@ -69,22 +66,8 @@ reload_acl() ->
emqx_mod_acl_internal:reload_acl().
init_auth_result(Credentials) ->
case anonymous_permission(Credentials) of
true -> Credentials#{auth_result => success};
false -> Credentials#{auth_result => not_authorized}
case emqx_zone:get_env(maps:get(zone, Credentials, undefined), allow_anonymous, false) of
true -> Credentials#{auth_result => success, anonymous => true};
false -> Credentials#{auth_result => not_authorized, anonymous => false}
end.
detect_anonymous_permission(#{username := undefined,
password := undefined} = Credentials, Fun) ->
case anonymous_permission(Credentials) of
true -> {ok, Credentials};
false -> Fun()
end;
detect_anonymous_permission(_Credentials, Fun) ->
Fun().
anonymous_permission(Credentials) ->
emqx_zone:get_env(maps:get(zone, Credentials, undefined),
allow_anonymous, false).

View File

@ -31,7 +31,7 @@ start(_Type, _Args) ->
%% kernel config `logger_level` before starting the erlang vm.
%% This is because the latter approach an annoying debug msg will be printed out:
%% "[debug] got_unexpected_message {'EXIT',<0.1198.0>,normal}"
logger:set_primary_config(level, application:get_env(emqx, primary_log_level, error)),
logger:set_primary_config(level, application:get_env(kernel, primary_log_level, error)),
print_banner(),
ekka:start(),

View File

@ -70,13 +70,13 @@ check(#{client_id := ClientId, username := Username, peername := {IPAddr, _}}) -
orelse ets:member(?TAB, {username, Username})
orelse ets:member(?TAB, {ipaddr, IPAddr}).
-spec(add(#banned{}) -> ok).
-spec(add(emqx_types:banned()) -> ok).
add(Banned) when is_record(Banned, banned) ->
mnesia:dirty_write(?TAB, Banned).
-spec(delete({client_id, emqx_types:client_id()}
| {username, emqx_types:username()}
| {peername, emqx_types:peername()}) -> ok).
| {username, emqx_types:username()}
| {peername, emqx_types:peername()}) -> ok).
delete(Key) ->
mnesia:dirty_delete(?TAB, Key).
@ -127,4 +127,3 @@ expire_banned_items(Now) ->
mnesia:delete_object(?TAB, B, sticky_write);
(_, _Acc) -> ok
end, ok, ?TAB).

View File

@ -120,6 +120,7 @@
-define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)).
-define(DEFAULT_SEG_BYTES, (1 bsl 20)).
-define(NO_BRIDGE_HANDLER, undefined).
-define(NO_FROM, undefined).
-define(maybe_send, {next_event, internal, maybe_send}).
%% @doc Start a bridge worker. Supported configs:
@ -185,8 +186,10 @@ ensure_stopped(Id, Timeout) ->
stop(Pid) -> gen_statem:stop(Pid).
status(Pid) ->
gen_statem:call(Pid, status).
status(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, status);
status(Id) ->
status(name(Id)).
%% @doc This function is to be evaluated on message/batch receiver side.
-spec import_batch(batch(), fun(() -> ok)) -> ok.
@ -297,8 +300,7 @@ standing_by(enter, _, #{start_type := auto}) ->
standing_by(enter, _, #{start_type := manual}) ->
keep_state_and_data;
standing_by({call, From}, ensure_started, State) ->
{next_state, connecting, State,
[{reply, From, ok}]};
do_connect({call, From}, standing_by, State);
standing_by(state_timeout, do_connect, State) ->
{next_state, connecting, State};
standing_by(info, Info, State) ->
@ -313,24 +315,8 @@ standing_by(Type, Content, State) ->
connecting(enter, connected, #{reconnect_delay_ms := Timeout}) ->
Action = {state_timeout, Timeout, reconnect},
{keep_state_and_data, Action};
connecting(enter, _, #{reconnect_delay_ms := Timeout,
connect_fun := ConnectFun,
subscriptions := Subs,
forwards := Forwards
} = State) ->
ok = subscribe_local_topics(Forwards),
case ConnectFun(Subs) of
{ok, ConnRef, Conn} ->
?LOG(info, "[Bridge] Bridge ~p connected", [name()]),
Action = {state_timeout, 0, connected},
{keep_state,
eval_bridge_handler(State#{ conn_ref => ConnRef
, connection => Conn}, connected),
Action};
error ->
Action = {state_timeout, Timeout, reconnect},
{keep_state_and_data, Action}
end;
connecting(enter, _, State) ->
do_connect(enter, connecting, State);
connecting(state_timeout, connected, State) ->
{next_state, connected, State};
connecting(state_timeout, reconnect, _State) ->
@ -424,7 +410,7 @@ common(StateName, Type, Content, State) ->
eval_bridge_handler(State = #{bridge_handler := ?NO_BRIDGE_HANDLER}, _Msg) ->
State;
eval_bridge_handler(State = #{bridge_handler := Handler}, Msg) ->
_ = Handler(Msg),
Handler(Msg),
State.
ensure_present(Key, Topic, State) ->
@ -456,6 +442,35 @@ is_topic_present({Topic, _QoS}, Topics) ->
is_topic_present(Topic, Topics) ->
lists:member(Topic, Topics) orelse false =/= lists:keyfind(Topic, 1, Topics).
do_connect(Type, StateName, #{ forwards := Forwards
, subscriptions := Subs
, connect_fun := ConnectFun
, reconnect_delay_ms := Timeout
} = State) ->
ok = subscribe_local_topics(Forwards),
From = case StateName of
standing_by -> {call, Pid} = Type, Pid;
connecting -> ?NO_FROM
end,
DoEvent = fun (standing_by, StandingbyAction, _ConnectingAction) ->
StandingbyAction;
(connecting, _StandingbyAction, ConnectingAction) ->
ConnectingAction
end,
case ConnectFun(Subs) of
{ok, ConnRef, Conn} ->
?LOG(info, "[Bridge] Bridge ~p connected", [name()]),
State0 = State#{conn_ref => ConnRef, connection => Conn},
State1 = eval_bridge_handler(State0, connected),
StandingbyAction = {next_state, connected, State1, [{reply, From, ok}]},
ConnectingAction = {keep_state, State1, {state_timeout, 0, connected}},
DoEvent(StateName, StandingbyAction, ConnectingAction);
{error, Reason} ->
StandingbyAction = {keep_state_and_data, [{reply, From, {error, Reason}}]},
ConnectingAction = {keep_state_and_data, {state_timeout, Timeout, reconnect}},
DoEvent(StateName, StandingbyAction, ConnectingAction)
end.
do_ensure_present(forwards, Topic, _) ->
ok = subscribe_local_topic(Topic);
do_ensure_present(subscriptions, _Topic, #{connect_module := _ConnectModule,
@ -564,9 +579,8 @@ disconnect(#{connection := Conn,
connect_module := Module
} = State) when Conn =/= undefined ->
ok = Module:stop(ConnRef, Conn),
eval_bridge_handler(State#{conn_ref => undefined,
connection => undefined},
disconnected);
State0 = State#{conn_ref => undefined, connection => undefined},
eval_bridge_handler(State0, disconnected);
disconnect(State) ->
eval_bridge_handler(State, disconnected).

View File

@ -56,7 +56,7 @@ start(Module, Config) ->
Config1 = obfuscate(Config),
?LOG(error, "[Bridge connect] Failed to connect with module=~p\n"
"config=~p\nreason:~p", [Module, Config1, Reason]),
error
{error, Reason}
end.
obfuscate(Map) ->
@ -69,4 +69,3 @@ obfuscate(Map) ->
is_sensitive(password) -> true;
is_sensitive(_) -> false.

View File

@ -56,7 +56,9 @@ start(Config = #{address := Address}) ->
ClientConfig = Config#{msg_handler => Handlers,
owner => AckCollector,
host => Host,
port => Port},
port => Port,
bridge_mode => true
},
case emqx_client:start_link(ClientConfig) of
{ok, Pid} ->
case emqx_client:connect(Pid) of

View File

@ -74,6 +74,6 @@ drop_bridge(Id) ->
ok ->
supervisor:delete_child(?SUP, Id);
Error ->
?LOG(error, "[Bridge] Delete bridge failed", [Error]),
?LOG(error, "[Bridge] Delete bridge failed, error : ~p", [Error]),
Error
end.

View File

@ -88,7 +88,7 @@
]).
%% Default timeout
-define(DEFAULT_KEEPALIVE, 60000).
-define(DEFAULT_KEEPALIVE, 60).
-define(DEFAULT_ACK_TIMEOUT, 30000).
-define(DEFAULT_CONNECT_TIMEOUT, 60000).
@ -503,7 +503,7 @@ init([{username, Username} | Opts], State) ->
init([{password, Password} | Opts], State) ->
init(Opts, State#state{password = iolist_to_binary(Password)});
init([{keepalive, Secs} | Opts], State) ->
init(Opts, State#state{keepalive = timer:seconds(Secs)});
init(Opts, State#state{keepalive = Secs});
init([{proto_ver, v3} | Opts], State) ->
init(Opts, State#state{proto_ver = ?MQTT_PROTO_V3,
proto_name = <<"MQIsdp">>});
@ -928,6 +928,11 @@ handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) ->
?LOG(error, "[Client] Got tcp error: ~p", [Reason]),
{stop, {shutdown, Reason}, State};
handle_event(info, EventContent = {'EXIT', _Pid, normal}, StateName, _State) ->
?LOG(info, "[Client] State: ~s, Unexpected Event: (info, ~p)",
[StateName, EventContent]),
keep_state_and_data;
handle_event(EventType, EventContent, StateName, _StateData) ->
?LOG(error, "[Client] State: ~s, Unexpected Event: (~p, ~p)",
[StateName, EventType, EventContent]),
@ -1021,11 +1026,11 @@ publish_process(?QOS_2, Packet = ?PUBLISH_PACKET(?QOS_2, PacketId),
end.
ensure_keepalive_timer(State = ?PROPERTY('Server-Keep-Alive', Secs)) ->
ensure_keepalive_timer(timer:seconds(Secs), State);
ensure_keepalive_timer(timer:seconds(Secs), State#state{keepalive = Secs});
ensure_keepalive_timer(State = #state{keepalive = 0}) ->
State;
ensure_keepalive_timer(State = #state{keepalive = I}) ->
ensure_keepalive_timer(I, State).
ensure_keepalive_timer(timer:seconds(I), State).
ensure_keepalive_timer(I, State) when is_integer(I) ->
State#state{keepalive_timer = erlang:start_timer(I, self(), keepalive)}.

View File

@ -207,4 +207,3 @@ stats_fun() ->
undefined -> ok;
Size -> emqx_stats:setstat('connections/count', 'connections/max', Size)
end.

View File

@ -30,11 +30,20 @@ init([]) ->
shutdown => 1000,
type => worker,
modules => [emqx_banned]},
FlappingOption = emqx_config:get_env(flapping_clean_interval, 3600000),
Flapping = #{id => flapping,
start => {emqx_flapping, start_link, [FlappingOption]},
restart => permanent,
shutdown => 1000,
type => worker,
modules => [emqx_flapping]},
Manager = #{id => manager,
start => {emqx_cm, start_link, []},
restart => permanent,
shutdown => 2000,
type => worker,
modules => [emqx_cm]},
{ok, {{one_for_one, 10, 100}, [Banned, Manager]}}.
SupFlags = #{strategy => one_for_one,
intensity => 100,
period => 10},
{ok, {SupFlags, [Banned, Manager, Flapping]}}.

View File

@ -19,7 +19,6 @@
%% 1. Store in mnesia database?
%% 2. Store in dets?
%% 3. Store in data/app.config?
%%
-module(emqx_config).
@ -138,4 +137,3 @@ read_(_App) -> error(no_impl).
% end, [], Configs),
% RequiredCfg ++ OptionalCfg
% end.

View File

@ -87,15 +87,15 @@ info(#state{transport = Transport,
rate_limit = RateLimit,
pub_limit = PubLimit,
proto_state = ProtoState}) ->
ConnInfo = [{socktype, Transport:type(Socket)},
{peername, Peername},
{sockname, Sockname},
{conn_state, ConnState},
{active_n, ActiveN},
{rate_limit, rate_limit_info(RateLimit)},
{pub_limit, rate_limit_info(PubLimit)}],
ConnInfo = #{socktype => Transport:type(Socket),
peername => Peername,
sockname => Sockname,
conn_state => ConnState,
active_n => ActiveN,
rate_limit => rate_limit_info(RateLimit),
pub_limit => rate_limit_info(PubLimit)},
ProtoInfo = emqx_protocol:info(ProtoState),
lists:usort(lists:append(ConnInfo, ProtoInfo)).
maps:merge(ConnInfo, ProtoInfo).
rate_limit_info(undefined) ->
#{};
@ -109,10 +109,10 @@ attrs(CPid) when is_pid(CPid) ->
attrs(#state{peername = Peername,
sockname = Sockname,
proto_state = ProtoState}) ->
SockAttrs = [{peername, Peername},
{sockname, Sockname}],
SockAttrs = #{peername => Peername,
sockname => Sockname},
ProtoAttrs = emqx_protocol:attrs(ProtoState),
lists:usort(lists:append(SockAttrs, ProtoAttrs)).
maps:merge(SockAttrs, ProtoAttrs).
%% Conn stats
stats(CPid) when is_pid(CPid) ->
@ -242,10 +242,10 @@ connected(info, {deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -
connected(info, {keepalive, start, Interval},
State = #state{transport = Transport, socket = Socket}) ->
StatFun = fun() ->
case Transport:getstat(Socket, [recv_oct]) of
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
Error -> Error
end
case Transport:getstat(Socket, [recv_oct]) of
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
Error -> Error
end
end,
case emqx_keepalive:start(StatFun, Interval, {keepalive, check}) of
{ok, KeepAlive} ->
@ -317,9 +317,15 @@ handle(info, {tcp_passive, _Sock}, State) ->
ok = activate_socket(NState),
{keep_state, NState};
handle(info, {ssl_passive, _Sock}, State) ->
%% Rate limit here:)
NState = ensure_rate_limit(State),
ok = activate_socket(NState),
{keep_state, NState};
handle(info, activate_socket, State) ->
%% Rate limit timer expired.
ok = activate_socket(State),
ok = activate_socket(State#state{conn_state = running}),
{keep_state, State#state{conn_state = running, limit_timer = undefined}};
handle(info, {inet_reply, _Sock, ok}, State) ->
@ -442,6 +448,7 @@ ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) ->
{0, Rl1} ->
ensure_rate_limit(Limiters, setelement(Pos, State, Rl1));
{Pause, Rl1} ->
?LOG(debug, "[Connection] Rate limit pause connection ~pms", [Pause]),
TRef = erlang:send_after(Pause, self(), activate_socket),
setelement(Pos, State#state{conn_state = blocked, limit_timer = TRef}, Rl1)
end.
@ -453,11 +460,7 @@ activate_socket(#state{conn_state = blocked}) ->
ok;
activate_socket(#state{transport = Transport, socket = Socket, active_n = N}) ->
TrueOrN = case Transport:is_ssl(Socket) of
true -> true; %% Cannot set '{active, N}' for SSL:(
false -> N
end,
case Transport:setopts(Socket, [{active, TrueOrN}]) of
case Transport:setopts(Socket, [{active, N}]) of
ok -> ok;
{error, Reason} ->
self() ! {shutdown, Reason},

View File

@ -12,70 +12,125 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
%% @doc TODO:
%% 1. Flapping Detection
%% 2. Conflict Detection?
-module(emqx_flapping).
%% Use ets:update_counter???
-include("emqx.hrl").
-include("types.hrl").
-behaviour(gen_server).
-behaviour(gen_statem).
-export([start_link/0]).
-export([start_link/1]).
-export([ is_banned/1
, banned/1
%% This module is used to garbage clean the flapping records
%% gen_statem callbacks
-export([ terminate/3
, code_change/4
, init/1
, initialized/3
, callback_mode/0
]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-define(FLAPPING_TAB, ?MODULE).
-define(SERVER, ?MODULE).
-export([check/3]).
-record(state, {}).
-record(flapping,
{ client_id :: binary()
, check_count :: integer()
, timestamp :: integer()
}).
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-type(flapping_record() :: #flapping{}).
-type(flapping_state() :: flapping | ok).
is_banned(ClientId) ->
ets:member(banned, ClientId).
banned(ClientId) ->
ets:insert(banned, {ClientId, os:timestamp()}).
%% @doc This function is used to initialize flapping records
%% the expiry time unit is minutes.
-spec(init_flapping(ClientId :: binary(), Interval :: integer()) -> flapping_record()).
init_flapping(ClientId, Interval) ->
#flapping{client_id = ClientId,
check_count = 1,
timestamp = emqx_time:now_secs() + Interval}.
%% @doc This function is used to initialize flapping records
%% the expiry time unit is minutes.
-spec(check(Action :: atom(), ClientId :: binary(),
Threshold :: {integer(), integer()}) -> flapping_state()).
check(Action, ClientId, Threshold = {_TimesThreshold, TimeInterval}) ->
check(Action, ClientId, Threshold, init_flapping(ClientId, TimeInterval)).
-spec(check(Action :: atom(), ClientId :: binary(),
Threshold :: {integer(), integer()},
InitFlapping :: flapping_record()) -> flapping_state()).
check(Action, ClientId, Threshold, InitFlapping) ->
case ets:update_counter(?FLAPPING_TAB, ClientId, {#flapping.check_count, 1}, InitFlapping) of
1 -> ok;
CheckCount ->
case ets:lookup(?FLAPPING_TAB, ClientId) of
[Flapping] ->
check_flapping(Action, CheckCount, Threshold, Flapping);
_Flapping ->
ok
end
end.
check_flapping(Action, CheckCount, _Threshold = {TimesThreshold, TimeInterval},
Flapping = #flapping{ client_id = ClientId
, timestamp = Timestamp }) ->
case emqx_time:now_secs() of
NowTimestamp when NowTimestamp =< Timestamp,
CheckCount > TimesThreshold ->
ets:delete(?FLAPPING_TAB, ClientId),
flapping;
NowTimestamp when NowTimestamp > Timestamp,
Action =:= disconnect ->
ets:delete(?FLAPPING_TAB, ClientId),
ok;
NowTimestamp ->
NewFlapping = Flapping#flapping{timestamp = NowTimestamp + TimeInterval},
ets:insert(?FLAPPING_TAB, NewFlapping),
ok
end.
%%--------------------------------------------------------------------
%% gen_server callbacks
%% gen_statem callbacks
%%--------------------------------------------------------------------
-spec(start_link(TimerInterval :: [integer()]) -> startlink_ret()).
start_link(TimerInterval) ->
gen_statem:start_link({local, ?MODULE}, ?MODULE, [TimerInterval], []).
init([]) ->
%% ets:new(banned, [public, ordered_set, named_table]),
{ok, #state{}}.
init([TimerInterval]) ->
TabOpts = [ public
, set
, {keypos, 2}
, {write_concurrency, true}
, {read_concurrency, true}],
ok = emqx_tables:new(?FLAPPING_TAB, TabOpts),
{ok, initialized, #{timer_interval => TimerInterval}}.
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
callback_mode() -> [state_functions, state_enter].
handle_cast(_Msg, State) ->
{noreply, State}.
initialized(enter, _OldState, #{timer_interval := Time}) ->
Action = {state_timeout, Time, clean_expired_records},
{keep_state_and_data, Action};
initialized(state_timeout, clean_expired_records, #{}) ->
clean_expired_records(),
repeat_state_and_data.
handle_info(_Info, State) ->
{noreply, State}.
code_change(_Vsn, State, Data, _Extra) ->
{ok, State, Data}.
terminate(_Reason, _State) ->
terminate(_Reason, _StateName, _State) ->
emqx_tables:delete(?FLAPPING_TAB),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
%% @doc clean expired records in ets
clean_expired_records() ->
NowTime = emqx_time:now_secs(),
MatchSpec = [{{'$1', '$2', '$3'},[{'<', '$3', NowTime}], [true]}],
ets:select_delete(?FLAPPING_TAB, MatchSpec).

View File

@ -141,16 +141,16 @@ parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) ->
{Properties, Rest3} = parse_properties(Rest2, ProtoVer),
{ClientId, Rest4} = parse_utf8_string(Rest3),
ConnPacket = #mqtt_packet_connect{proto_name = ProtoName,
proto_ver = ProtoVer,
is_bridge = (BridgeTag =:= 8),
clean_start = bool(CleanStart),
will_flag = bool(WillFlag),
will_qos = WillQoS,
will_retain = bool(WillRetain),
keepalive = KeepAlive,
properties = Properties,
client_id = ClientId},
ConnPacket = #mqtt_packet_connect{proto_name = ProtoName,
proto_ver = ProtoVer,
is_bridge = (BridgeTag =:= 8),
clean_start = bool(CleanStart),
will_flag = bool(WillFlag),
will_qos = WillQoS,
will_retain = bool(WillRetain),
keepalive = KeepAlive,
properties = Properties,
client_id = ClientId},
{ConnPacket1, Rest5} = parse_will_message(ConnPacket, Rest4),
{Username, Rest6} = parse_utf8_string(Rest5, bool(UsernameFlag)),
{Passsword, <<>>} = parse_utf8_string(Rest6, bool(PasswordFlag)),

View File

@ -42,12 +42,15 @@ load(Env) ->
on_client_connected(#{client_id := ClientId,
username := Username,
peername := {IpAddr, _}}, ConnAck, ConnAttrs, Env) ->
Attrs = lists:filter(fun({K, _}) -> lists:member(K, ?ATTR_KEYS) end, ConnAttrs),
case emqx_json:safe_encode([{clientid, ClientId},
{username, Username},
{ipaddress, iolist_to_binary(esockd_net:ntoa(IpAddr))},
{connack, ConnAck},
{ts, os:system_time(second)} | Attrs]) of
Attrs = maps:filter(fun(K, _) ->
lists:member(K, ?ATTR_KEYS)
end, ConnAttrs),
case emqx_json:safe_encode(Attrs#{clientid => ClientId,
username => Username,
ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)),
connack => ConnAck,
ts => os:system_time(second)
}) of
{ok, Payload} ->
emqx:publish(message(qos(Env), topic(connected, ClientId), Payload));
{error, Reason} ->
@ -84,4 +87,3 @@ qos(Env) -> proplists:get_value(qos, Env, 0).
reason(Reason) when is_atom(Reason) -> Reason;
reason({Error, _}) when is_atom(Error) -> Error;
reason(_) -> internal_error.

View File

@ -108,13 +108,20 @@ ensure_file(File) ->
with_loaded_file(File, SuccFun) ->
case read_loaded(File) of
{ok, Names} ->
{ok, Names0} ->
Names = filter_plugins(Names0),
SuccFun(Names);
{error, Error} ->
?LOG(alert, "[Plugins] Failed to read: ~p, error: ~p", [File, Error]),
{error, Error}
end.
filter_plugins(Names) ->
lists:filtermap(fun(Name1) when is_atom(Name1) -> {true, Name1};
({Name1, true}) -> {true, Name1};
({_Name1, false}) -> false
end, Names).
load_plugins(Names, Persistent) ->
Plugins = list(), NotFound = Names -- names(Plugins),
case NotFound of
@ -264,7 +271,7 @@ plugin_loaded(Name, true) ->
case lists:member(Name, Names) of
false ->
%% write file if plugin is loaded
write_loaded(lists:append(Names, [Name]));
write_loaded(lists:append(Names, [{Name, true}]));
true ->
ignore
end;
@ -277,10 +284,7 @@ plugin_unloaded(_Name, false) ->
plugin_unloaded(Name, true) ->
case read_loaded() of
{ok, Names0} ->
Names = lists:filtermap(fun(Name1) when is_atom(Name1) -> {true, Name1};
({Name1, true}) -> {true, Name1};
({_Name1, false}) -> false
end, Names0),
Names = filter_plugins(Names0),
case lists:member(Name, Names) of
true ->
write_loaded(lists:delete(Name, Names));
@ -304,7 +308,7 @@ write_loaded(AppNames) ->
case file:open(File, [binary, write]) of
{ok, Fd} ->
lists:foreach(fun(Name) ->
file:write(Fd, iolist_to_binary(io_lib:format("~s.~n", [Name])))
file:write(Fd, iolist_to_binary(io_lib:format("~p.~n", [Name])))
end, AppNames);
{error, Error} ->
?LOG(error, "[Plugins] Open File ~p Error: ~p", [File, Error]),

View File

@ -40,6 +40,7 @@
-record(pstate, {
zone,
sendfun,
sockname,
peername,
peercert,
proto_ver,
@ -60,6 +61,7 @@
is_bridge,
enable_ban,
enable_acl,
enable_flapping_detect,
acl_deny_action,
recv_stats,
send_stats,
@ -68,7 +70,8 @@
ignore_loop,
topic_alias_maximum,
conn_mod,
credentials
credentials,
ws_cookie
}).
-opaque(state() :: #pstate{}).
@ -85,32 +88,38 @@
%%------------------------------------------------------------------------------
-spec(init(map(), list()) -> state()).
init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) ->
init(SocketOpts = #{ sockname := Sockname
, peername := Peername
, peercert := Peercert
, sendfun := SendFun}, Options) ->
Zone = proplists:get_value(zone, Options),
#pstate{zone = Zone,
sendfun = SendFun,
peername = Peername,
peercert = Peercert,
proto_ver = ?MQTT_PROTO_V4,
proto_name = <<"MQTT">>,
client_id = <<>>,
is_assigned = false,
conn_pid = self(),
username = init_username(Peercert, Options),
clean_start = false,
topic_aliases = #{},
packet_size = emqx_zone:get_env(Zone, max_packet_size),
is_bridge = false,
enable_ban = emqx_zone:get_env(Zone, enable_ban, false),
enable_acl = emqx_zone:get_env(Zone, enable_acl),
acl_deny_action = emqx_zone:get_env(Zone, acl_deny_action, ignore),
recv_stats = #{msg => 0, pkt => 0},
send_stats = #{msg => 0, pkt => 0},
connected = false,
ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false),
topic_alias_maximum = #{to_client => 0, from_client => 0},
conn_mod = maps:get(conn_mod, SocketOpts, undefined),
credentials = #{}}.
#pstate{zone = Zone,
sendfun = SendFun,
sockname = Sockname,
peername = Peername,
peercert = Peercert,
proto_ver = ?MQTT_PROTO_V4,
proto_name = <<"MQTT">>,
client_id = <<>>,
is_assigned = false,
conn_pid = self(),
username = init_username(Peercert, Options),
clean_start = false,
topic_aliases = #{},
packet_size = emqx_zone:get_env(Zone, max_packet_size),
is_bridge = false,
enable_ban = emqx_zone:get_env(Zone, enable_ban, false),
enable_acl = emqx_zone:get_env(Zone, enable_acl),
enable_flapping_detect = emqx_zone:get_env(Zone, enable_flapping_detect, false),
acl_deny_action = emqx_zone:get_env(Zone, acl_deny_action, ignore),
recv_stats = #{msg => 0, pkt => 0},
send_stats = #{msg => 0, pkt => 0},
connected = false,
ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false),
topic_alias_maximum = #{to_client => 0, from_client => 0},
conn_mod = maps:get(conn_mod, SocketOpts, undefined),
credentials = #{},
ws_cookie = maps:get(ws_cookie, SocketOpts, undefined)}.
init_username(Peercert, Options) ->
case proplists:get_value(peer_cert_as_username, Options) of
@ -135,12 +144,13 @@ info(PState = #pstate{conn_props = ConnProps,
topic_aliases = Aliases,
will_msg = WillMsg,
enable_acl = EnableAcl}) ->
attrs(PState) ++ [{conn_props, ConnProps},
{ack_props, AckProps},
{session, Session},
{topic_aliases, Aliases},
{will_msg, WillMsg},
{enable_acl, EnableAcl}].
maps:merge(attrs(PState), #{conn_props => ConnProps,
ack_props => AckProps,
session => Session,
topic_aliases => Aliases,
will_msg => WillMsg,
enable_acl => EnableAcl
}).
attrs(#pstate{zone = Zone,
client_id = ClientId,
@ -155,20 +165,20 @@ attrs(#pstate{zone = Zone,
connected_at = ConnectedAt,
conn_mod = ConnMod,
credentials = Credentials}) ->
[{zone, Zone},
{client_id, ClientId},
{username, Username},
{peername, Peername},
{peercert, Peercert},
{proto_ver, ProtoVer},
{proto_name, ProtoName},
{clean_start, CleanStart},
{keepalive, Keepalive},
{is_bridge, IsBridge},
{connected_at, ConnectedAt},
{conn_mod, ConnMod},
{credentials, Credentials}
].
#{ zone => Zone
, client_id => ClientId
, username => Username
, peername => Peername
, peercert => Peercert
, proto_ver => ProtoVer
, proto_name => ProtoName
, clean_start => CleanStart
, keepalive => Keepalive
, is_bridge => IsBridge
, connected_at => ConnectedAt
, conn_mod => ConnMod
, credentials => Credentials
}.
attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
get_property('Receive-Maximum', ConnProps, 65535);
@ -201,12 +211,16 @@ client_id(#pstate{client_id = ClientId}) ->
credentials(#pstate{zone = Zone,
client_id = ClientId,
username = Username,
sockname = Sockname,
peername = Peername,
peercert = Peercert}) ->
peercert = Peercert,
ws_cookie = WsCookie}) ->
with_cert(#{zone => Zone,
client_id => ClientId,
sockname => Sockname,
username => Username,
peername => Peername,
ws_cookie => WsCookie,
mountpoint => emqx_zone:get_env(Zone, mountpoint)}, Peercert).
with_cert(Credentials, undefined) -> Credentials;
@ -367,21 +381,22 @@ process(?CONNECT_PACKET(
username = Username,
password = Password} = ConnPkt), PState) ->
NewClientId = maybe_use_username_as_clientid(ClientId, Username, PState),
%% TODO: Mountpoint...
%% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
PState0 = maybe_use_username_as_clientid(ClientId,
set_username(Username,
PState#pstate{proto_ver = ProtoVer,
proto_name = ProtoName,
clean_start = CleanStart,
keepalive = Keepalive,
conn_props = ConnProps,
is_bridge = IsBridge,
connected_at = os:timestamp()})),
NewClientId = PState0#pstate.client_id,
emqx_logger:set_metadata_client_id(NewClientId),
%% TODO: Mountpoint...
%% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
PState0 = set_username(Username,
PState#pstate{client_id = NewClientId,
proto_ver = ProtoVer,
proto_name = ProtoName,
clean_start = CleanStart,
keepalive = Keepalive,
conn_props = ConnProps,
is_bridge = IsBridge,
connected_at = os:timestamp()}),
Credentials = credentials(PState0),
PState1 = PState0#pstate{credentials = Credentials},
connack(
@ -692,12 +707,12 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = Send})
%%------------------------------------------------------------------------------
%% Maybe use username replace client id
maybe_use_username_as_clientid(ClientId, undefined, _PState) ->
ClientId;
maybe_use_username_as_clientid(ClientId, Username, #pstate{zone = Zone}) ->
maybe_use_username_as_clientid(ClientId, PState = #pstate{username = undefined}) ->
PState#pstate{client_id = ClientId};
maybe_use_username_as_clientid(ClientId, PState = #pstate{username = Username, zone = Zone}) ->
case emqx_zone:get_env(Zone, use_username_as_clientid, false) of
true -> Username;
false -> ClientId
true -> PState#pstate{client_id = Username};
false -> PState#pstate{client_id = ClientId}
end.
%%------------------------------------------------------------------------------
@ -759,6 +774,7 @@ make_will_msg(#mqtt_packet_connect{proto_ver = ProtoVer,
check_connect(Packet, PState) ->
run_check_steps([fun check_proto_ver/2,
fun check_client_id/2,
fun check_flapping/2,
fun check_banned/2,
fun check_will_topic/2], Packet, PState).
@ -791,6 +807,9 @@ check_client_id(#mqtt_packet_connect{client_id = ClientId}, #pstate{zone = Zone}
false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}
end.
check_flapping(#mqtt_packet_connect{}, PState) ->
do_flapping_detect(connect, PState).
check_banned(_ConnPkt, #pstate{enable_ban = false}) ->
ok;
check_banned(#mqtt_packet_connect{client_id = ClientId, username = Username},
@ -889,14 +908,16 @@ inc_stats(Type, Stats = #{pkt := PktCnt, msg := MsgCnt}) ->
terminate(_Reason, #pstate{client_id = undefined}) ->
ok;
terminate(_Reason, #pstate{connected = false}) ->
terminate(_Reason, PState = #pstate{connected = false}) ->
do_flapping_detect(disconnect, PState),
ok;
terminate(conflict, _PState) ->
ok;
terminate(discard, _PState) ->
terminate(Reason, PState) when Reason =:= conflict;
Reason =:= discard ->
do_flapping_detect(disconnect, PState),
ok;
terminate(Reason, #pstate{credentials = Credentials}) ->
terminate(Reason, PState = #pstate{credentials = Credentials}) ->
do_flapping_detect(disconnect, PState),
?LOG(info, "[Protocol] Shutdown for ~p", [Reason]),
ok = emqx_hooks:run('client.disconnected', [Credentials, Reason]).
@ -925,6 +946,25 @@ flag(true) -> 1.
%%------------------------------------------------------------------------------
%% Execute actions in case acl deny
do_flapping_detect(Action, #pstate{zone = Zone,
client_id = ClientId,
enable_flapping_detect = true}) ->
BanExpiryInterval = emqx_zone:get_env(Zone, flapping_ban_expiry_interval, 3600000),
Threshold = emqx_zone:get_env(Zone, flapping_threshold, 20),
Until = erlang:system_time(second) + BanExpiryInterval,
case emqx_flapping:check(Action, ClientId, Threshold) of
flapping ->
emqx_banned:add(#banned{who = {client_id, ClientId},
reason = <<"flapping">>,
by = <<"flapping_checker">>,
until = Until}),
ok;
_Other ->
ok
end;
do_flapping_detect(_Action, _PState) ->
ok.
do_acl_deny_action(?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload),
?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer,
acl_deny_action = disconnect}) ->

View File

@ -1,70 +0,0 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_rate_limiter).
-behaviour(gen_server).
%% API
-export([start_link/0]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-define(SERVER, ?MODULE).
-record(state, {}).
%%------------------------------------------------------------------------------
%%% API
%%------------------------------------------------------------------------------
%% @doc Starts the server
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%------------------------------------------------------------------------------
%%% gen_server callbacks
%%------------------------------------------------------------------------------
init([]) ->
{ok, #state{}}.
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%------------------------------------------------------------------------------
%%% Internal functions
%%------------------------------------------------------------------------------

View File

@ -676,6 +676,7 @@ terminate(Reason, #state{will_msg = WillMsg,
username = Username,
conn_pid = ConnPid,
old_conn_pid = OldConnPid}) ->
emqx_metrics:commit(),
send_willmsg(WillMsg),
[maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]],
ok = emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]).

View File

@ -73,6 +73,8 @@
-define(PUBSUB_STATS, [
'topics/count',
'topics/max',
'suboptions/count',
'suboptions/max',
'subscribers/count',
'subscribers/max',
'subscriptions/count',
@ -242,9 +244,12 @@ code_change(_OldVsn, State, _Extra) ->
%%------------------------------------------------------------------------------
safe_update_element(Key, Val) ->
try ets:update_element(?TAB, Key, {2, Val})
try ets:update_element(?TAB, Key, {2, Val}) of
false ->
ets:insert_new(?TAB, {Key, Val});
true ->
true
catch
error:badarg ->
ets:insert_new(?TAB, {Key, Val})
?LOG(warning, "[Stats] Update ~p to ~p failed", [Key, Val])
end.

View File

@ -33,9 +33,9 @@
, code_change/3
]).
-type(option() :: {long_gc, false | pos_integer()}
| {long_schedule, false | pos_integer()}
| {large_heap, pos_integer()}
-type(option() :: {long_gc, non_neg_integer()}
| {long_schedule, non_neg_integer()}
| {large_heap, non_neg_integer()}
| {busy_port, boolean()}
| {busy_dist_port, boolean()}).
@ -66,11 +66,11 @@ parse_opt(Opts) ->
parse_opt(Opts, []).
parse_opt([], Acc) ->
Acc;
parse_opt([{long_gc, false}|Opts], Acc) ->
parse_opt([{long_gc, 0}|Opts], Acc) ->
parse_opt(Opts, Acc);
parse_opt([{long_gc, Ms}|Opts], Acc) when is_integer(Ms) ->
parse_opt(Opts, [{long_gc, Ms}|Acc]);
parse_opt([{long_schedule, false}|Opts], Acc) ->
parse_opt([{long_schedule, 0}|Opts], Acc) ->
parse_opt(Opts, Acc);
parse_opt([{long_schedule, Ms}|Opts], Acc) when is_integer(Ms) ->
parse_opt(Opts, [{long_schedule, Ms}|Acc]);
@ -177,4 +177,3 @@ safe_publish(Event, WarnMsg) ->
sysmon_msg(Topic, Payload) ->
Msg = emqx_message:make(?SYSMON, Topic, Payload),
emqx_message:set_flag(sys, Msg).

View File

@ -14,7 +14,7 @@
-module(emqx_tables).
-export([new/2]).
-export([new/2, delete/1]).
-export([ lookup_value/2
, lookup_value/3
@ -30,6 +30,16 @@ new(Tab, Opts) ->
Tab -> ok
end.
-spec(delete(atom()) -> ok).
delete(Tab) ->
case ets:info(Tab, name) of
undefined ->
ok;
Tab ->
ets:delete(Tab),
ok
end.
%% KV lookup
-spec(lookup_value(atom(), term()) -> any()).
lookup_value(Tab, Key) ->
@ -42,4 +52,3 @@ lookup_value(Tab, Key, Def) ->
catch
error:badarg -> Def
end.

View File

@ -53,6 +53,7 @@
-export_type([ alarm/0
, plugin/0
, banned/0
, command/0
]).
@ -79,11 +80,16 @@
| banned
| bad_authentication_method).
-type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | none | atom()).
-type(credentials() :: #{client_id := client_id(),
username := username(),
peername := peername(),
auth_result := auth_result(),
zone => zone(),
-type(credentials() :: #{zone := zone(),
client_id := client_id(),
username := username(),
sockname := peername(),
peername := peername(),
ws_cookie := undefined | list(),
mountpoint := binary(),
password => binary(),
auth_result => auth_result(),
anonymous => boolean(),
atom() => term()
}).
-type(subscription() :: #subscription{}).
@ -91,6 +97,7 @@
-type(topic_table() :: [{topic(), subopts()}]).
-type(payload() :: binary() | iodata()).
-type(message() :: #message{}).
-type(banned() :: #banned{}).
-type(delivery() :: #delivery{}).
-type(deliver_results() :: [{route, node(), topic()} |
{dispatch, topic(), pos_integer()}]).
@ -98,4 +105,3 @@
-type(alarm() :: #alarm{}).
-type(plugin() :: #plugin{}).
-type(command() :: #command{}).

View File

@ -61,11 +61,11 @@ info(#state{peername = Peername,
sockname = Sockname,
proto_state = ProtoState}) ->
ProtoInfo = emqx_protocol:info(ProtoState),
ConnInfo = [{socktype, websocket},
{conn_state, running},
{peername, Peername},
{sockname, Sockname}],
lists:append([ConnInfo, ProtoInfo]).
ConnInfo = #{socktype => websocket,
conn_state => running,
peername => Peername,
sockname => Sockname},
maps:merge(ProtoInfo, ConnInfo).
%% for dashboard
attrs(WSPid) when is_pid(WSPid) ->
@ -74,10 +74,10 @@ attrs(WSPid) when is_pid(WSPid) ->
attrs(#state{peername = Peername,
sockname = Sockname,
proto_state = ProtoState}) ->
SockAttrs = [{peername, Peername},
{sockname, Sockname}],
SockAttrs = #{peername => Peername,
sockname => Sockname},
ProtoAttrs = emqx_protocol:attrs(ProtoState),
lists:usort(lists:append(SockAttrs, ProtoAttrs)).
maps:merge(SockAttrs, ProtoAttrs).
stats(WSPid) when is_pid(WSPid) ->
call(WSPid, stats);
@ -138,16 +138,29 @@ websocket_init(#state{request = Req, options = Options}) ->
Peername = cowboy_req:peer(Req),
Sockname = cowboy_req:sock(Req),
Peercert = cowboy_req:cert(Req),
WsCookie = try cowboy_req:parse_cookies(Req)
catch
error:badarg ->
?LOG(error, "[WS Connection] Illegal cookie"),
undefined;
Error:Reason ->
?LOG(error,
"[WS Connection] Cookie is parsed failed, Error: ~p, Reason ~p",
[Error, Reason]),
undefined
end,
ProtoState = emqx_protocol:init(#{peername => Peername,
sockname => Sockname,
peercert => Peercert,
sendfun => send_fun(self()),
ws_cookie => WsCookie,
conn_mod => ?MODULE}, Options),
ParserState = emqx_protocol:parser(ProtoState),
Zone = proplists:get_value(zone, Options),
EnableStats = emqx_zone:get_env(Zone, enable_stats, true),
IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
ok = emqx_misc:init_proc_mng_policy(Zone),
{ok, #state{peername = Peername,
sockname = Sockname,
parse_state = ParserState,

View File

@ -70,25 +70,21 @@ all() ->
groups() ->
[{connect, [non_parallel_tests],
[
mqtt_connect,
mqtt_connect_with_tcp,
mqtt_connect_with_will_props,
mqtt_connect_with_ssl_oneway,
mqtt_connect_with_ssl_twoway,
mqtt_connect_with_ws
]},
{publish, [non_parallel_tests],
[
packet_size
]}].
[mqtt_connect,
mqtt_connect_with_tcp,
mqtt_connect_with_will_props,
mqtt_connect_with_ssl_oneway,
mqtt_connect_with_ssl_twoway,
mqtt_connect_with_ws]},
{publish, [non_parallel_tests],
[packet_size]}].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
emqx_ct_helpers:stop_apps([]).
%%--------------------------------------------------------------------
%% Protocol Test
@ -127,9 +123,9 @@ mqtt_connect_with_will_props(_) ->
mqtt_connect_with_ssl_oneway(_) ->
emqx:shutdown(),
emqx_ct_broker_helpers:change_opts(ssl_oneway),
emqx_ct_helpers:change_emqx_opts(ssl_oneway),
emqx:start(),
ClientSsl = emqx_ct_broker_helpers:client_ssl(),
ClientSsl = emqx_ct_helpers:client_ssl(),
{ok, #ssl_socket{tcp = _Sock1, ssl = SslSock} = Sock}
= emqx_client_sock:connect("127.0.0.1", 8883, [{ssl_opts, ClientSsl}], 3000),
Packet = raw_send_serialize(?CLIENT),
@ -145,11 +141,11 @@ mqtt_connect_with_ssl_oneway(_) ->
mqtt_connect_with_ssl_twoway(_Config) ->
emqx:shutdown(),
emqx_ct_broker_helpers:change_opts(ssl_twoway),
emqx_ct_helpers:change_emqx_opts(ssl_twoway),
emqx:start(),
ClientSsl = emqx_ct_broker_helpers:client_ssl_twoway(),
ClientSsl = emqx_ct_helpers:client_ssl_twoway(),
{ok, #ssl_socket{tcp = _Sock1, ssl = SslSock} = Sock}
= emqx_client_sock:connect("127.0.0.1", 8883, [{ssl_opts, ClientSsl}], 3000),
= emqx_client_sock:connect("127.0.0.1", 8883, [{ssl_opts, ClientSsl}], 3000),
Packet = raw_send_serialize(?CLIENT),
emqx_client_sock:setopts(Sock, [{active, once}]),
emqx_client_sock:send(Sock, Packet),

View File

@ -31,39 +31,41 @@ all() ->
[{group, access_control},
{group, acl_cache},
{group, access_control_cache_mode},
{group, access_rule}
].
{group, access_rule}].
groups() ->
[{access_control, [sequence],
[reload_acl,
register_mod,
unregister_mod,
check_acl_1,
check_acl_2
]},
[reload_acl,
register_mod,
unregister_mod,
check_acl_1,
check_acl_2]},
{access_control_cache_mode, [],
[
acl_cache_basic,
acl_cache_expiry,
acl_cache_cleanup,
acl_cache_full
]},
{acl_cache, [], [
put_get_del_cache,
[acl_cache_basic,
acl_cache_expiry,
acl_cache_cleanup,
acl_cache_full]},
{acl_cache, [],
[put_get_del_cache,
cache_update,
cache_expiry,
cache_replacement,
cache_cleanup,
cache_auto_emtpy,
cache_auto_cleanup
]},
cache_auto_cleanup]},
{access_rule, [],
[compile_rule,
match_rule]}].
[compile_rule,
match_rule]}].
init_per_group(Group, Config) when Group =:= access_control;
Group =:= access_control_cache_mode ->
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
init_per_group(Group, Config) when Group =:= access_control;
Group =:= access_control_cache_mode ->
prepare_config(Group),
application:load(emqx),
Config;
@ -90,7 +92,6 @@ set_acl_config_file(_Group) ->
write_config("access_SUITE_acl.conf", Rules),
application:set_env(emqx, acl_file, "access_SUITE_acl.conf").
write_config(Filename, Terms) ->
file:write_file(Filename, [io_lib:format("~tp.~n", [Term]) || Term <- Terms]).
@ -98,7 +99,6 @@ end_per_group(_Group, Config) ->
Config.
init_per_testcase(_TestCase, Config) ->
%% {ok, _Pid} =
?AC:start_link(),
Config.
end_per_testcase(_TestCase, _Config) ->
@ -109,7 +109,6 @@ per_testcase_config(acl_cache_full, Config) ->
per_testcase_config(_TestCase, Config) ->
Config.
%%--------------------------------------------------------------------
%% emqx_access_control
%%--------------------------------------------------------------------

View File

@ -24,51 +24,18 @@
-include("emqx_mqtt.hrl").
-include("emqx.hrl").
all() -> [t_alarm_handler, t_logger_handler].
all() -> [t_alarm_handler,
t_logger_handler].
init_per_suite(Config) ->
[start_apps(App, {SchemaFile, ConfigFile}) ||
{App, SchemaFile, ConfigFile}
<- [{emqx, local_path("priv/emqx.schema"),
local_path("etc/gen.emqx.conf")}]],
emqx_ct_helpers:start_apps([], fun set_special_configs/1),
Config.
end_per_suite(_Config) ->
application:stop(emqx).
local_path(RelativePath) ->
filename:join([get_base_dir(), RelativePath]).
deps_path(App, RelativePath) ->
%% Note: not lib_dir because etc dir is not sym-link-ed to _build dir
%% but priv dir is
Path0 = code:priv_dir(App),
Path = case file:read_link(Path0) of
{ok, Resolved} -> Resolved;
{error, _} -> Path0
end,
filename:join([Path, "..", RelativePath]).
get_base_dir() ->
{file, Here} = code:is_loaded(?MODULE),
filename:dirname(filename:dirname(Here)).
start_apps(App, {SchemaFile, ConfigFile}) ->
read_schema_configs(App, {SchemaFile, ConfigFile}),
set_special_configs(App),
application:ensure_all_started(App).
read_schema_configs(App, {SchemaFile, ConfigFile}) ->
ct:pal("Read configs - SchemaFile: ~p, ConfigFile: ~p", [SchemaFile, ConfigFile]),
Schema = cuttlefish_schema:files([SchemaFile]),
Conf = conf_parse:file(ConfigFile),
NewConfig = cuttlefish_generator:map(Schema, Conf),
Vals = proplists:get_value(App, NewConfig, []),
[application:set_env(App, Par, Value) || {Par, Value} <- Vals].
emqx_ct_helpers:stop_apps([]).
set_special_configs(emqx) ->
application:set_env(emqx, acl_file, deps_path(emqx, "test/emqx_access_SUITE_data/acl_deny_action.conf"));
application:set_env(emqx, acl_file, emqx_ct_helpers:deps_path(emqx, "test/emqx_access_SUITE_data/acl_deny_action.conf"));
set_special_configs(_App) ->
ok.

View File

@ -24,7 +24,7 @@
all() -> [t_banned_all].
t_banned_all(_) ->
emqx_ct_broker_helpers:run_setup_steps(),
emqx_ct_helpers:start_apps([]),
emqx_banned:start_link(),
TimeNow = erlang:system_time(second),
Banned = #banned{who = {client_id, <<"TestClient">>},
@ -49,5 +49,4 @@ t_banned_all(_) ->
?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>,
username => undefined,
peername => {undefined, undefined}})),
emqx_ct_broker_helpers:run_teardown_steps().
emqx_ct_helpers:stop_apps([]).

View File

@ -14,11 +14,12 @@
-module(emqx_bridge_SUITE).
-export([all/0, init_per_suite/1, end_per_suite/1]).
-export([t_rpc/1,
t_mqtt/1,
t_mngr/1
]).
-export([ all/0
, init_per_suite/1
, end_per_suite/1]).
-export([ t_rpc/1
, t_mqtt/1
, t_mngr/1]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
@ -27,21 +28,21 @@
-define(wait(For, Timeout), emqx_ct_helpers:wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
all() -> [t_rpc,
t_mqtt,
t_mngr].
all() -> [ t_rpc
, t_mqtt
, t_mngr].
init_per_suite(Config) ->
case node() of
nonode@nohost ->
net_kernel:start(['emqx@127.0.0.1', longnames]);
_ ->
ok
nonode@nohost -> net_kernel:start(['emqx@127.0.0.1', longnames]);
_ -> ok
end,
emqx_ct_broker_helpers:run_setup_steps([{log_level, error} | Config]).
emqx_ct_helpers:start_apps([]),
emqx_logger:set_log_level(error),
[{log_level, error} | Config].
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
emqx_ct_helpers:stop_apps([]).
t_mngr(Config) when is_list(Config) ->
Subs = [{<<"a">>, 1}, {<<"b">>, 2}],
@ -50,8 +51,7 @@ t_mngr(Config) when is_list(Config) ->
connect_module => emqx_bridge_rpc,
mountpoint => <<"forwarded">>,
subscriptions => Subs,
start_type => auto
},
start_type => auto},
Name = ?FUNCTION_NAME,
{ok, Pid} = emqx_bridge:start_link(Name, Cfg),
try
@ -77,8 +77,7 @@ t_rpc(Config) when is_list(Config) ->
forwards => [<<"t_rpc/#">>],
connect_module => emqx_bridge_rpc,
mountpoint => <<"forwarded">>,
start_type => auto
},
start_type => auto},
{ok, Pid} = emqx_bridge:start_link(?FUNCTION_NAME, Cfg),
ClientId = <<"ClientId">>,
try
@ -132,8 +131,7 @@ t_mqtt(Config) when is_list(Config) ->
%% Consume back to forwarded message for verification
%% NOTE: this is a indefenite loopback without mocking emqx_bridge:import_batch/2
subscriptions => [{ForwardedTopic, _QoS = 1}],
start_type => auto
},
start_type => auto},
Tester = self(),
Ref = make_ref(),
meck:new(emqx_bridge, [passthrough, no_history]),
@ -156,14 +154,14 @@ t_mqtt(Config) when is_list(Config) ->
Max = 100,
Msgs = lists:seq(1, Max),
lists:foreach(fun(I) ->
Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic, integer_to_binary(I)),
emqx_session:publish(SPid, I, Msg)
Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic, integer_to_binary(I)),
emqx_session:publish(SPid, I, Msg)
end, Msgs),
ok = receive_and_match_messages(Ref, Msgs),
Msgs2 = lists:seq(Max + 1, Max * 2),
lists:foreach(fun(I) ->
Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic2, integer_to_binary(I)),
emqx_session:publish(SPid, I, Msg)
Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic2, integer_to_binary(I)),
emqx_session:publish(SPid, I, Msg)
end, Msgs2),
ok = receive_and_match_messages(Ref, Msgs2),
emqx_mock_client:close_session(ConnPid)

View File

@ -32,23 +32,21 @@ all() ->
{group, stats}].
groups() ->
[
{pubsub, [sequence], [subscribe_unsubscribe,
[{pubsub, [sequence], [subscribe_unsubscribe,
publish, pubsub,
t_shared_subscribe,
dispatch_with_no_sub,
'pubsub#', 'pubsub+']},
{session, [sequence], [start_session]},
{metrics, [sequence], [inc_dec_metric]},
{stats, [sequence], [set_get_stat]}
].
{stats, [sequence], [set_get_stat]}].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
emqx_ct_helpers:stop_apps([]).
%%--------------------------------------------------------------------
%% PubSub Test

View File

@ -46,11 +46,11 @@ groups() ->
dollar_topics_test]}].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
emqx_ct_helpers:stop_apps([]).
receive_messages(Count) ->
receive_messages(Count, []).

View File

@ -31,11 +31,11 @@ groups() ->
t_lookup_conn_pid]}].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
emqx_ct_helpers:stop_apps([]).
init_per_testcase(_TestCase, Config) ->
register_connection(),

View File

@ -27,11 +27,11 @@ all() ->
[t_connect_api].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
emqx_ct_helpers:stop_apps([]).
t_connect_api(_Config) ->
{ok, T1} = emqx_client:start_link([{host, "localhost"},
@ -51,16 +51,16 @@ t_connect_api(_Config) ->
emqx_client:disconnect(T1).
t_info(ConnInfo) ->
?assertEqual(tcp, proplists:get_value(socktype, ConnInfo)),
?assertEqual(running, proplists:get_value(conn_state, ConnInfo)),
?assertEqual(<<"client1">>, proplists:get_value(client_id, ConnInfo)),
?assertEqual(<<"testuser1">>, proplists:get_value(username, ConnInfo)),
?assertEqual(<<"MQTT">>, proplists:get_value(proto_name, ConnInfo)).
?assertEqual(tcp, maps:get(socktype, ConnInfo)),
?assertEqual(running, maps:get(conn_state, ConnInfo)),
?assertEqual(<<"client1">>, maps:get(client_id, ConnInfo)),
?assertEqual(<<"testuser1">>, maps:get(username, ConnInfo)),
?assertEqual(<<"MQTT">>, maps:get(proto_name, ConnInfo)).
t_attrs(AttrsData) ->
?assertEqual(<<"client1">>, proplists:get_value(client_id, AttrsData)),
?assertEqual(emqx_connection, proplists:get_value(conn_mod, AttrsData)),
?assertEqual(<<"testuser1">>, proplists:get_value(username, AttrsData)).
?assertEqual(<<"client1">>, maps:get(client_id, AttrsData)),
?assertEqual(emqx_connection, maps:get(conn_mod, AttrsData)),
?assertEqual(<<"testuser1">>, maps:get(username, AttrsData)).
t_stats(StatsData) ->
?assertEqual(true, proplists:get_value(recv_oct, StatsData) >= 0),

View File

@ -1,198 +0,0 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_ct_broker_helpers).
-compile(export_all).
-compile(nowarn_export_all).
-define(APP, emqx).
-define(MQTT_SSL_TWOWAY, [{cacertfile, "certs/cacert.pem"},
{verify, verify_peer},
{fail_if_no_peer_cert, true}]).
-define(MQTT_SSL_CLIENT, [{keyfile, "certs/client-key.pem"},
{cacertfile, "certs/cacert.pem"},
{certfile, "certs/client-cert.pem"}]).
-define(CIPHERS, [{ciphers,
["ECDHE-ECDSA-AES256-GCM-SHA384",
"ECDHE-RSA-AES256-GCM-SHA384",
"ECDHE-ECDSA-AES256-SHA384",
"ECDHE-RSA-AES256-SHA384","ECDHE-ECDSA-DES-CBC3-SHA",
"ECDH-ECDSA-AES256-GCM-SHA384",
"ECDH-RSA-AES256-GCM-SHA384",
"ECDH-ECDSA-AES256-SHA384","ECDH-RSA-AES256-SHA384",
"DHE-DSS-AES256-GCM-SHA384","DHE-DSS-AES256-SHA256",
"AES256-GCM-SHA384","AES256-SHA256",
"ECDHE-ECDSA-AES128-GCM-SHA256",
"ECDHE-RSA-AES128-GCM-SHA256",
"ECDHE-ECDSA-AES128-SHA256",
"ECDHE-RSA-AES128-SHA256",
"ECDH-ECDSA-AES128-GCM-SHA256",
"ECDH-RSA-AES128-GCM-SHA256",
"ECDH-ECDSA-AES128-SHA256","ECDH-RSA-AES128-SHA256",
"DHE-DSS-AES128-GCM-SHA256","DHE-DSS-AES128-SHA256",
"AES128-GCM-SHA256","AES128-SHA256",
"ECDHE-ECDSA-AES256-SHA","ECDHE-RSA-AES256-SHA",
"DHE-DSS-AES256-SHA","ECDH-ECDSA-AES256-SHA",
"ECDH-RSA-AES256-SHA","AES256-SHA",
"ECDHE-ECDSA-AES128-SHA","ECDHE-RSA-AES128-SHA",
"DHE-DSS-AES128-SHA","ECDH-ECDSA-AES128-SHA",
"ECDH-RSA-AES128-SHA","AES128-SHA"]}]).
run_setup_steps() ->
_ = run_setup_steps([]),
%% return ok to be backward compatible
ok.
run_setup_steps(Config) ->
NewConfig = generate_config(),
lists:foreach(fun set_app_env/1, NewConfig),
set_bridge_env(),
{ok, _} = application:ensure_all_started(?APP),
set_log_level(Config),
Config.
run_teardown_steps() ->
?APP:shutdown().
generate_config() ->
Schema = cuttlefish_schema:files([local_path(["priv", "emqx.schema"])]),
Conf = conf_parse:file([local_path(["etc", "gen.emqx.conf"])]),
cuttlefish_generator:map(Schema, Conf).
set_log_level(Config) ->
case proplists:get_value(log_level, Config) of
undefined -> ok;
Level -> emqx_logger:set_log_level(Level)
end.
get_base_dir(Module) ->
{file, Here} = code:is_loaded(Module),
filename:dirname(filename:dirname(Here)).
get_base_dir() ->
get_base_dir(?MODULE).
local_path(Components, Module) ->
filename:join([get_base_dir(Module) | Components]).
local_path(Components) ->
local_path(Components, ?MODULE).
set_app_env({App, Lists}) ->
lists:foreach(fun({acl_file, _Var}) ->
application:set_env(App, acl_file, local_path(["etc", "acl.conf"]));
({plugins_loaded_file, _Var}) ->
application:set_env(App, plugins_loaded_file, local_path(["test", "emqx_SUITE_data","loaded_plugins"]));
({Par, Var}) ->
application:set_env(App, Par, Var)
end, Lists).
set_bridge_env() ->
BridgeEnvs = bridge_conf(),
application:set_env(?APP, bridges, BridgeEnvs).
change_opts(SslType) ->
{ok, Listeners} = application:get_env(?APP, listeners),
NewListeners =
lists:foldl(fun({Protocol, Port, Opts} = Listener, Acc) ->
case Protocol of
ssl ->
SslOpts = proplists:get_value(ssl_options, Opts),
Keyfile = local_path(["etc/certs", "key.pem"]),
Certfile = local_path(["etc/certs", "cert.pem"]),
TupleList1 = lists:keyreplace(keyfile, 1, SslOpts, {keyfile, Keyfile}),
TupleList2 = lists:keyreplace(certfile, 1, TupleList1, {certfile, Certfile}),
TupleList3 =
case SslType of
ssl_twoway->
CAfile = local_path(["etc", proplists:get_value(cacertfile, ?MQTT_SSL_TWOWAY)]),
MutSslList = lists:keyreplace(cacertfile, 1, ?MQTT_SSL_TWOWAY, {cacertfile, CAfile}),
lists:merge(TupleList2, MutSslList);
_ ->
lists:filter(fun ({cacertfile, _}) -> false;
({verify, _}) -> false;
({fail_if_no_peer_cert, _}) -> false;
(_) -> true
end, TupleList2)
end,
[{Protocol, Port, lists:keyreplace(ssl_options, 1, Opts, {ssl_options, TupleList3})} | Acc];
_ ->
[Listener | Acc]
end
end, [], Listeners),
application:set_env(?APP, listeners, NewListeners).
client_ssl_twoway() ->
[{Key, local_path(["etc", File])} || {Key, File} <- ?MQTT_SSL_CLIENT] ++ ?CIPHERS.
client_ssl() ->
?CIPHERS ++ [{reuse_sessions, true}].
wait_mqtt_payload(Payload) ->
receive
{publish, #{payload := Payload}} ->
ct:pal("OK - received msg: ~p~n", [Payload])
after 1000 ->
ct:fail({timeout, Payload, {msg_box, flush()}})
end.
not_wait_mqtt_payload(Payload) ->
receive
{publish, #{payload := Payload}} ->
ct:fail({received, Payload})
after 1000 ->
ct:pal("OK - msg ~p is not received", [Payload])
end.
flush() ->
flush([]).
flush(Msgs) ->
receive
M -> flush([M|Msgs])
after
0 -> lists:reverse(Msgs)
end.
bridge_conf() ->
[ {local_rpc,
[{connect_module, emqx_bridge_rpc},
{address, node()},
{forwards, ["bridge-1/#", "bridge-2/#"]}
]}
].
% [{aws,
% [{connect_module, emqx_bridge_mqtt},
% {username,"user"},
% {address,"127.0.0.1:1883"},
% {clean_start,true},
% {client_id,"bridge_aws"},
% {forwards,["topic1/#","topic2/#"]},
% {keepalive,60000},
% {max_inflight,32},
% {mountpoint,"bridge/aws/${node}/"},
% {password,"passwd"},
% {proto_ver,mqttv4},
% {queue,
% #{batch_coun t_limit => 1000,
% replayq_dir => "data/emqx_aws_bridge/",
% replayq_seg_bytes => 10485760}},
% {reconnect_delay_ms,30000},
% {ssl,false},
% {ssl_opts,[{versions,[tlsv1,'tlsv1.1','tlsv1.2']}]},
% {start_type,manual},
% {subscriptions,[{"cmd/topic1",1},{"cmd/topic2",1}]}]}].

View File

@ -1,68 +0,0 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_ct_helpers).
-export([ensure_mnesia_stopped/0, wait_for/4]).
ensure_mnesia_stopped() ->
ekka_mnesia:ensure_stopped(),
ekka_mnesia:delete_schema().
%% Help function to wait for Fun to yield 'true'.
wait_for(Fn, Ln, F, Timeout) ->
{Pid, Mref} = erlang:spawn_monitor(fun() -> wait_loop(F, catch_call(F)) end),
wait_for_down(Fn, Ln, Timeout, Pid, Mref, false).
wait_for_down(Fn, Ln, Timeout, Pid, Mref, Kill) ->
receive
{'DOWN', Mref, process, Pid, normal} ->
ok;
{'DOWN', Mref, process, Pid, {unexpected, Result}} ->
erlang:error({unexpected, Fn, Ln, Result});
{'DOWN', Mref, process, Pid, {crashed, {C, E, S}}} ->
erlang:raise(C, {Fn, Ln, E}, S)
after
Timeout ->
case Kill of
true ->
erlang:demonitor(Mref, [flush]),
erlang:exit(Pid, kill),
erlang:error({Fn, Ln, timeout});
false ->
Pid ! stop,
wait_for_down(Fn, Ln, Timeout, Pid, Mref, true)
end
end.
wait_loop(_F, ok) -> exit(normal);
wait_loop(F, LastRes) ->
receive
stop -> erlang:exit(LastRes)
after
100 ->
Res = catch_call(F),
wait_loop(F, Res)
end.
catch_call(F) ->
try
case F() of
true -> ok;
Other -> {unexpected, Other}
end
catch
C : E : S ->
{crashed, {C, E, S}}
end.

View File

@ -0,0 +1,60 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_flapping_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
all() ->
[t_flapping].
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([]),
prepare_for_test(),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
t_flapping(_Config) ->
process_flag(trap_exit, true),
flapping_connect(5),
{ok, C} = emqx_client:start_link([{client_id, <<"Client">>}]),
{error, _} = emqx_client:connect(C),
receive
{'EXIT', Client, _Reason} ->
ct:log("receive exit signal, Client: ~p", [Client])
after 1000 ->
ct:log("timeout")
end.
flapping_connect(Times) ->
[flapping_connect() || _ <- lists:seq(1, Times)].
flapping_connect() ->
{ok, C} = emqx_client:start_link([{client_id, <<"Client">>}]),
{ok, _} = emqx_client:connect(C),
ok = emqx_client:disconnect(C).
prepare_for_test() ->
emqx_zone:set_env(external, enable_flapping_detect, true),
emqx_zone:set_env(external, flapping_threshold, {10, 60}),
emqx_zone:set_env(external, flapping_expiry_interval, 3600).

View File

@ -49,9 +49,27 @@ restart_listeners(_) ->
ok = emqx_listeners:restart(),
ok = emqx_listeners:stop().
render_config_file() ->
Path = local_path(["etc", "emqx.conf"]),
{ok, Temp} = file:read_file(Path),
Vars0 = mustache_vars(),
Vars = [{atom_to_list(N), iolist_to_binary(V)} || {N, V} <- Vars0],
Targ = bbmustache:render(Temp, Vars),
NewName = Path ++ ".rendered",
ok = file:write_file(NewName, Targ),
NewName.
mustache_vars() ->
[{platform_data_dir, local_path(["data"])},
{platform_etc_dir, local_path(["etc"])},
{platform_log_dir, local_path(["log"])},
{platform_plugins_dir, local_path(["plugins"])}
].
generate_config() ->
Schema = cuttlefish_schema:files([local_path(["priv", "emqx.schema"])]),
Conf = conf_parse:file([local_path(["etc", "gen.emqx.conf"])]),
ConfFile = render_config_file(),
Conf = conf_parse:file(ConfFile),
cuttlefish_generator:map(Schema, Conf).
set_app_env({App, Lists}) ->

View File

@ -27,24 +27,21 @@
<<"/TopicA">>]).
-define(CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{
client_id = <<"mqtt_client">>,
username = <<"emqx">>,
password = <<"public">>})).
client_id = <<"mqtt_client">>,
username = <<"emqx">>,
password = <<"public">>})).
all() ->
[
{group, mqtt_common},
[{group, mqtt_common},
{group, mqttv4},
{group, mqttv5},
{group, acl},
{group, frame_partial}
].
{group, frame_partial}].
groups() ->
[{mqtt_common, [sequence],
[will_topic_check,
will_acl_check
]},
will_acl_check]},
{mqttv4, [sequence],
[connect_v4,
subscribe_v4]},
@ -54,18 +51,15 @@ groups() ->
{acl, [sequence],
[acl_deny_action_ct]},
{frame_partial, [sequence],
[handle_followed_packet]}].
[handle_followed_packet]}].
init_per_suite(Config) ->
[start_apps(App, SchemaFile, ConfigFile) ||
{App, SchemaFile, ConfigFile}
<- [{emqx, deps_path(emqx, "priv/emqx.schema"),
deps_path(emqx, "etc/gen.emqx.conf")}]],
emqx_ct_helpers:start_apps([], fun set_special_configs/1),
emqx_zone:set_env(external, max_topic_alias, 20),
Config.
end_per_suite(_Config) ->
application:stop(emqx).
emqx_ct_helpers:stop_apps([]).
batch_connect(NumberOfConnections) ->
batch_connect([], NumberOfConnections).
@ -567,7 +561,7 @@ will_topic_check(_) ->
emqx_client:stop(Client),
ct:sleep(100),
false = is_process_alive(Client),
emqx_ct_broker_helpers:wait_mqtt_payload(<<"I have died">>),
emqx_ct_helpers:wait_mqtt_payload(<<"I have died">>),
emqx_client:stop(T).
will_acl_check(_) ->
@ -613,37 +607,12 @@ acl_deny_do_disconnect(subscribe, QoS, Topic) ->
after 1000 -> ct:fail({timeout, wait_tcp_closed})
end.
start_apps(App, SchemaFile, ConfigFile) ->
read_schema_configs(App, SchemaFile, ConfigFile),
set_special_configs(App),
application:ensure_all_started(App).
read_schema_configs(App, SchemaFile, ConfigFile) ->
Schema = cuttlefish_schema:files([SchemaFile]),
Conf = conf_parse:file(ConfigFile),
NewConfig = cuttlefish_generator:map(Schema, Conf),
Vals = proplists:get_value(App, NewConfig, []),
[application:set_env(App, Par, Value) || {Par, Value} <- Vals].
set_special_configs(emqx) ->
application:set_env(emqx, enable_acl_cache, false),
application:set_env(emqx, plugins_loaded_file,
deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins")),
emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins")),
application:set_env(emqx, acl_deny_action, disconnect),
application:set_env(emqx, acl_file,
deps_path(emqx, "test/emqx_access_SUITE_data/acl_deny_action.conf"));
emqx_ct_helpers:deps_path(emqx, "test/emqx_access_SUITE_data/acl_deny_action.conf"));
set_special_configs(_App) ->
ok.
deps_path(App, RelativePath) ->
%% Note: not lib_dir because etc dir is not sym-link-ed to _build dir
%% but priv dir is
Path0 = code:priv_dir(App),
Path = case file:read_link(Path0) of
{ok, Resolved} -> Resolved;
{error, _} -> Path0
end,
filename:join([Path, "..", RelativePath]).
local_path(RelativePath) ->
deps_path(emqx_auth_username, RelativePath).

View File

@ -36,11 +36,11 @@ groups() ->
t_unexpected]}].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
emqx_ct_helpers:stop_apps([]).
init_per_testcase(_TestCase, Config) ->
clear_tables(),
@ -106,4 +106,3 @@ t_unexpected(_) ->
clear_tables() ->
lists:foreach(fun mnesia:clear_table/1, [emqx_route, emqx_trie, emqx_trie_node]).

View File

@ -24,11 +24,11 @@
all() -> [t_rpc].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
emqx_ct_helpers:stop_apps([]).
t_rpc(_) ->
60000 = emqx_rpc:call(?MASTER, timer, seconds, [60]),

View File

@ -24,11 +24,11 @@
all() -> [ignore_loop, t_session_all].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
emqx_ct_helpers:stop_apps([]).
ignore_loop(_Config) ->
application:set_env(emqx, mqtt_ignore_loop_deliver, true),

View File

@ -37,15 +37,14 @@ all() -> [t_random_basic,
t_sticky,
t_hash,
t_not_so_sticky,
t_no_connection_nack
].
t_no_connection_nack].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
emqx_ct_helpers:stop_apps([]).
t_random_basic(_) ->
ok = ensure_config(random),
@ -258,4 +257,3 @@ ensure_config(Strategy, AckEnabled) ->
subscribed(Group, Topic, Pid) ->
lists:member(Pid, emqx_shared_sub:subscribers(Group, Topic)).

View File

@ -43,11 +43,11 @@ groups() ->
t_lookup_session_pids]}].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
emqx_ct_helpers:stop_apps([]).
init_per_testcase(_All, Config) ->
{ok, SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => self()}),

View File

@ -34,11 +34,11 @@
all() -> [t_sys_mon].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
emqx_ct_helpers:stop_apps([]).
t_sys_mon(_Config) ->
lists:foreach(fun({PidOrPort, SysMonName,ValidateInfo, InfoOrPort}) ->
@ -64,4 +64,3 @@ validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort) ->
concat_str(ValidateInfo, InfoOrPort, Info) ->
WarnInfo = io_lib:format(ValidateInfo, [InfoOrPort, Info]),
lists:flatten(WarnInfo).

View File

@ -23,4 +23,6 @@ t_new(_) ->
ok = emqx_tables:new(test_table, [{read_concurrency, true}]),
ets:insert(test_table, {key, 100}),
ok = emqx_tables:new(test_table, [{read_concurrency, true}]),
100 = ets:lookup_element(test_table, key, 2).
100 = ets:lookup_element(test_table, key, 2),
ok = emqx_tables:delete(test_table),
ok = emqx_tables:delete(test_table).

View File

@ -24,11 +24,11 @@
all() -> [start_traces].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
emqx_ct_helpers:stop_apps([]).
start_traces(_Config) ->
{ok, T} = emqx_client:start_link([{host, "localhost"},

View File

@ -23,6 +23,15 @@
-include_lib("common_test/include/ct.hrl").
-define(WAIT(PATTERN, TIMEOUT),
receive
PATTERN ->
ok
after
TIMEOUT ->
error(timeout)
end).
all() -> [t_api].
init_per_suite(Config) ->
@ -33,18 +42,36 @@ end_per_suite(_Config) ->
application:stop(sasl).
t_api(_) ->
gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}),
{ok, _} = emqx_vm_mon:start_link([{check_interval, 1},
{process_high_watermark, 0},
{process_low_watermark, 0.6}]),
timer:sleep(2000),
?assertEqual(true, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())),
emqx_vm_mon:set_process_high_watermark(0.8),
emqx_vm_mon:set_process_low_watermark(0.75),
?assertEqual(0.8, emqx_vm_mon:get_process_high_watermark()),
?assertEqual(0.75, emqx_vm_mon:get_process_low_watermark()),
timer:sleep(3000),
?assertEqual(false, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())),
emqx_vm_mon:set_check_interval(20),
?assertEqual(20, emqx_vm_mon:get_check_interval()),
ok.
meck:new(alarm_handler, [passthrough, no_history]),
Tester = self(),
Ref = make_ref(),
try
meck:expect(alarm_handler, set_alarm,
fun(What) ->
Res = meck:passthrough([What]),
Tester ! {Ref, set_alarm, What},
Res
end),
meck:expect(alarm_handler, clear_alarm,
fun(What) ->
Res = meck:passthrough([What]),
Tester ! {Ref, clear_alarm, What},
Res
end),
gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}),
{ok, _} = emqx_vm_mon:start_link([{check_interval, 1},
{process_high_watermark, 0},
{process_low_watermark, 0.6}]),
?WAIT({Ref, set_alarm, {too_many_processes, _Count}}, 2000),
?assertEqual(true, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())),
emqx_vm_mon:set_process_high_watermark(0.8),
emqx_vm_mon:set_process_low_watermark(0.75),
?assertEqual(0.8, emqx_vm_mon:get_process_high_watermark()),
?assertEqual(0.75, emqx_vm_mon:get_process_low_watermark()),
?WAIT({Ref, clear_alarm, too_many_processes}, 3000),
?assertEqual(false, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())),
emqx_vm_mon:set_check_interval(20),
?assertEqual(20, emqx_vm_mon:get_check_interval())
after
meck:unload(alarm_handler)
end.

View File

@ -39,11 +39,11 @@ all() ->
[t_ws_connect_api].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
emqx_ct_helpers:stop_apps([]).
t_ws_connect_api(_Config) ->
WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()),
@ -73,16 +73,16 @@ raw_recv_pase(P) ->
version => ?MQTT_PROTO_V4} }).
t_info(InfoData) ->
?assertEqual(websocket, proplists:get_value(socktype, InfoData)),
?assertEqual(running, proplists:get_value(conn_state, InfoData)),
?assertEqual(<<"mqtt_client">>, proplists:get_value(client_id, InfoData)),
?assertEqual(<<"admin">>, proplists:get_value(username, InfoData)),
?assertEqual(<<"MQTT">>, proplists:get_value(proto_name, InfoData)).
?assertEqual(websocket, maps:get(socktype, InfoData)),
?assertEqual(running, maps:get(conn_state, InfoData)),
?assertEqual(<<"mqtt_client">>, maps:get(client_id, InfoData)),
?assertEqual(<<"admin">>, maps:get(username, InfoData)),
?assertEqual(<<"MQTT">>, maps:get(proto_name, InfoData)).
t_attrs(AttrsData) ->
?assertEqual(<<"mqtt_client">>, proplists:get_value(client_id, AttrsData)),
?assertEqual(emqx_ws_connection, proplists:get_value(conn_mod, AttrsData)),
?assertEqual(<<"admin">>, proplists:get_value(username, AttrsData)).
?assertEqual(<<"mqtt_client">>, maps:get(client_id, AttrsData)),
?assertEqual(emqx_ws_connection, maps:get(conn_mod, AttrsData)),
?assertEqual(<<"admin">>, maps:get(username, AttrsData)).
t_stats(StatsData) ->
?assertEqual(true, proplists:get_value(recv_oct, StatsData) >= 0),

View File

@ -35,4 +35,3 @@ t_set_get_env(_) ->
emqx_zone:force_reload(),
?assertEqual(val, emqx_zone:get_env(zone1, key)),
emqx_zone:stop().

View File

@ -36,7 +36,7 @@ new(WsUrl, PPid) ->
addr = Addr,
path = "/" ++ Path,
ppid = PPid},
spawn(fun () ->
spawn(fun() ->
start_conn(State)
end).

3
vars
View File

@ -5,5 +5,4 @@
{platform_etc_dir, "etc"}.
{platform_lib_dir, "lib"}.
{platform_log_dir, "log"}.
{platform_plugins_dir, "plugins"}.
{platform_plugins_dir, "plugins"}.