diff --git a/.gitignore b/.gitignore index ab0cbe156..16369a576 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,4 @@ rebar.lock xrefr erlang.mk *.coverdata +etc/emqx.conf.rendered diff --git a/.travis.yml b/.travis.yml index f2c483cc6..abe6f7a6a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,18 +1,17 @@ language: erlang otp_release: - - 21.2 + - 21.3 before_install: - git clone https://github.com/erlang/rebar3.git; cd rebar3; ./bootstrap; sudo mv rebar3 /usr/local/bin/; cd .. script: - - make dep-vsn-check - - make rebar-compile - - make rebar-xref - - make rebar-eunit - - make rebar-ct - - make rebar-cover + - make compile + - make xref + - make eunit + - make ct + - make cover after_success: - make coveralls diff --git a/Makefile b/Makefile index f83b07330..0aa6a03c8 100644 --- a/Makefile +++ b/Makefile @@ -1,34 +1,9 @@ -.PHONY: plugins tests +## shallow clone for speed -PROJECT = emqx -PROJECT_DESCRIPTION = EMQ X Broker +REBAR_GIT_CLONE_OPTIONS += --depth 1 +export REBAR_GIT_CLONE_OPTIONS -DEPS = jsx gproc gen_rpc ekka esockd cowboy replayq - -dep_jsx = git-emqx https://github.com/talentdeficit/jsx 2.9.0 -dep_gproc = git-emqx https://github.com/uwiger/gproc 0.8.0 -dep_gen_rpc = git-emqx https://github.com/emqx/gen_rpc 2.3.1 -dep_esockd = git-emqx https://github.com/emqx/esockd v5.4.4 -dep_ekka = git-emqx https://github.com/emqx/ekka v0.5.4 -dep_cowboy = git-emqx https://github.com/ninenines/cowboy 2.6.1 -dep_replayq = git-emqx https://github.com/emqx/replayq v0.1.1 - -NO_AUTOPATCH = cuttlefish - -ERLC_OPTS += +debug_info -DAPPLICATION=emqx - -BUILD_DEPS = cuttlefish -dep_cuttlefish = git-emqx https://github.com/emqx/cuttlefish v2.2.1 - -TEST_DEPS = meck -dep_meck = hex-emqx 0.8.13 - -TEST_ERLC_OPTS += +debug_info -DAPPLICATION=emqx - -EUNIT_OPTS = verbose - -# CT_SUITES = emqx_bridge -## emqx_trie emqx_router emqx_frame emqx_mqtt_compat +# CT_SUITES = emqx_trie emqx_router emqx_frame emqx_mqtt_compat CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \ @@ -38,34 +13,71 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \ emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message emqx_os_mon \ - emqx_vm_mon emqx_alarm_handler emqx_rpc + emqx_vm_mon emqx_alarm_handler emqx_rpc emqx_flapping CT_NODE_NAME = emqxct@127.0.0.1 -CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) -COVER = true +compile: + @rebar3 compile -PLT_APPS = sasl asn1 ssl syntax_tools runtime_tools crypto xmerl os_mon inets public_key ssl compiler mnesia -DIALYZER_DIRS := ebin/ -DIALYZER_OPTS := --verbose --statistics -Werror_handling -Wrace_conditions #-Wunmatched_returns +clean: distclean -$(shell [ -f erlang.mk ] || curl -s -o erlang.mk https://raw.githubusercontent.com/emqx/erlmk/master/erlang.mk) -include erlang.mk +## Cuttlefish escript is built by default when cuttlefish app (as dependency) was built +CUTTLEFISH_SCRIPT := _build/default/lib/cuttlefish/cuttlefish -clean:: gen-clean +.PHONY: cover +cover: + @rebar3 cover -.PHONY: gen-clean -gen-clean: - @rm -rf bbmustache - @rm -f etc/gen.emqx.conf +.PHONY: coveralls +coveralls: + @rebar3 coveralls send + +.PHONY: xref +xref: + @rebar3 xref + +.PHONY: deps +deps: + @rebar3 get-deps + +.PHONY: eunit +eunit: + @rebar3 eunit -v + +.PHONY: ct-setup +ct-setup: + rebar3 as test compile + @mkdir -p data + @if [ ! -f data/loaded_plugins ]; then touch data/loaded_plugins; fi + @ln -s -f '../../../../etc' _build/test/lib/emqx/ + @ln -s -f '../../../../data' _build/test/lib/emqx/ + +.PHONY: ct +ct: ct-setup + @rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(shell echo $(foreach var,$(CT_SUITES),test/$(var)_SUITE) | tr ' ' ',') + +## Run one single CT with rebar3 +## e.g. make ct-one-suite suite=emqx_bridge +.PHONY: ct-one-suite +ct-one-suite: ct-setup + @rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(suite)_SUITE + +.PHONY: app.config +app.config: $(CUTTLEFISH_SCRIPT) etc/gen.emqx.conf + $(CUTTLEFISH_SCRIPT) -l info -e etc/ -c etc/gen.emqx.conf -i priv/emqx.schema -d data/ + +$(CUTTLEFISH_SCRIPT): + @rebar3 get-deps + @if [ ! -f cuttlefish ]; then make -C _build/default/lib/cuttlefish; fi bbmustache: - $(verbose) git clone https://github.com/soranoba/bbmustache.git && cd bbmustache && ./rebar3 compile && cd .. + @git clone https://github.com/soranoba/bbmustache.git && cd bbmustache && ./rebar3 compile && cd .. # This hack is to generate a conf file for testing # relx overlay is used for release etc/gen.emqx.conf: bbmustache etc/emqx.conf - $(verbose) erl -noshell -pa bbmustache/_build/default/lib/bbmustache/ebin -eval \ + @erl -noshell -pa bbmustache/_build/default/lib/bbmustache/ebin -eval \ "{ok, Temp} = file:read_file('etc/emqx.conf'), \ {ok, Vars0} = file:consult('vars'), \ Vars = [{atom_to_list(N), list_to_binary(V)} || {N, V} <- Vars0], \ @@ -73,51 +85,12 @@ etc/gen.emqx.conf: bbmustache etc/emqx.conf ok = file:write_file('etc/gen.emqx.conf', Targ), \ halt(0)." -CUTTLEFISH_SCRIPT = _build/default/lib/cuttlefish/cuttlefish +.PHONY: gen-clean +gen-clean: + @rm -rf bbmustache + @rm -f etc/gen.emqx.conf etc/emqx.conf.rendered -app.config: $(CUTTLEFISH_SCRIPT) etc/gen.emqx.conf - $(verbose) $(CUTTLEFISH_SCRIPT) -l info -e etc/ -c etc/gen.emqx.conf -i priv/emqx.schema -d data/ - -ct: app.config - -rebar-cover: - @rebar3 cover - -coveralls: - @rebar3 coveralls send - - -$(CUTTLEFISH_SCRIPT): rebar-deps - @if [ ! -f cuttlefish ]; then make -C _build/default/lib/cuttlefish; fi - -rebar-xref: - @rebar3 xref - -rebar-deps: - @rebar3 get-deps - -rebar-eunit: $(CUTTLEFISH_SCRIPT) - @rebar3 eunit -v - -rebar-compile: - @rebar3 compile - -rebar-ct-setup: app.config - @rebar3 as test compile - @ln -s -f '../../../../etc' _build/test/lib/emqx/ - @ln -s -f '../../../../data' _build/test/lib/emqx/ - -rebar-ct: rebar-ct-setup - @rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(shell echo $(foreach var,$(CT_SUITES),test/$(var)_SUITE) | tr ' ' ',') - -## Run one single CT with rebar3 -## e.g. make ct-one-suite suite=emqx_bridge -ct-one-suite: rebar-ct-setup - @rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(suite)_SUITE - -rebar-clean: - @rebar3 clean - -distclean:: +.PHONY: distclean +distclean: gen-clean @rm -rf _build cover deps logs log data - @rm -f rebar.lock compile_commands.json cuttlefish + @rm -f rebar.lock compile_commands.json cuttlefish erl_crash.dump diff --git a/etc/emqx.conf b/etc/emqx.conf index 15ad24dd1..aa4acec3b 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -368,6 +368,12 @@ log.dir = {{ platform_log_dir }} ## Default: emqx.log log.file = emqx.log +## Limits the total number of characters printed for each log event. +## +## Value: Integer +## Default: 8192 +log.chars_limit = 8192 + ## Maximum size of each log file. ## ## Value: Number @@ -438,6 +444,17 @@ acl_cache_ttl = 1m ## Default: ignore acl_deny_action = ignore +## The cleanning interval for flapping +## +## Value: Duration +## -d: day +## -h: hour +## -m: minute +## -s: second +## +## Default: 1h, 1 hour +## flapping_clean_interval = 1h + ##-------------------------------------------------------------------- ## MQTT Protocol ##-------------------------------------------------------------------- @@ -533,6 +550,18 @@ zone.external.acl_deny_action = ignore ## Numbers delimited by `|'. Zero or negative is to disable. zone.external.force_gc_policy = 1000|1MB +## Max message queue length and total heap size to force shutdown +## connection/session process. +## Message queue here is the Erlang process mailbox, but not the number +## of queued MQTT messages of QoS 1 and 2. +## +## Numbers delimited by `|'. Zero or negative is to disable. +## +## Default: +## - 8000|800MB on ARCH_64 system +## - 1000|100MB on ARCH_32 sytem +## zone.external.force_shutdown_policy = 8000|800MB + ## Maximum MQTT packet size allowed. ## ## Value: Bytes @@ -650,11 +679,35 @@ zone.external.mqueue_priorities = none ## Value: highest | lowest zone.external.mqueue_default_priority = highest -## Whether to enqueue Qos0 messages. +## Whether to enqueue QoS0 messages. ## ## Value: false | true zone.external.mqueue_store_qos0 = true +## Whether to turn on flapping detect +## +## Value: on | off +zone.external.enable_flapping_detect = off + +## The times of state change per min, specifying the threshold which is used to +## detect if the connection starts flapping +## +## Value: number +zone.external.flapping_threshold = 10, 1m + +## Flapping expiry interval for connections. +## This config entry is used to determine when the connection +## will be unbanned. +## +## Value: Duration +## -d: day +## -h: hour +## -m: minute +## -s: second +## +## Default: 1h, 1 hour +zone.external.flapping_banned_expiry_interval = 1h + ## All the topics will be prefixed with the mountpoint path if this option is enabled. ## ## Variables in mountpoint path: @@ -726,6 +779,30 @@ zone.internal.max_mqueue_len = 1000 ## Value: false | true zone.internal.mqueue_store_qos0 = true +## Whether to turn on flapping detect +## +## Value: on | off +zone.internal.enable_flapping_detect = off + +## The times of state change per second, specifying the threshold which is used to +## detect if the connection starts flapping +## +## Value: number +zone.internal.flapping_threshold = 10, 1m + +## Flapping expiry interval for connections. +## This config entry is used to determine when the connection +## will be unbanned. +## +## Value: Duration +## -d: day +## -h: hour +## -m: minute +## -s: second +## +## Default: 1h, 1 hour +zone.internal.flapping_banned_expiry_interval = 1h + ## All the topics will be prefixed with the mountpoint path if this option is enabled. ## ## Variables in mountpoint path: @@ -786,8 +863,11 @@ listener.tcp.external.zone = external ## Rate limit for the external MQTT/TCP connections. Format is 'rate,burst'. ## ## Value: rate,burst +## - rate: The average limit value for per second +## - burst: The maximum allowed for each check, To avoid frequent restriction +## this value is recommended to be set to `(max_packet_size * active_n)/2` ## Unit: Bps -## listener.tcp.external.rate_limit = 1024,4096 +## listener.tcp.external.rate_limit = 1024,52428800 ## The access control rules for the MQTT/TCP listener. ## @@ -917,8 +997,11 @@ listener.tcp.internal.zone = internal ## See: listener.tcp.$name.rate_limit ## ## Value: rate,burst +## - rate: The average limit value for per second +## - burst: The maximum allowed for each check, To avoid frequent restriction +## this value is recommended to be set to `(max_packet_size * active_n)/2` ## Unit: Bps -## listener.tcp.internal.rate_limit = 1000000,2000000 +## listener.tcp.internal.rate_limit = 1000000,524288000 ## The TCP backlog of internal MQTT/TCP Listener. ## @@ -1027,8 +1110,11 @@ listener.ssl.external.access.1 = allow all ## Rate limit for the external MQTT/SSL connections. ## ## Value: rate,burst +## - rate: The average limit value for per second +## - burst: The maximum allowed for each check, To avoid frequent restriction +## this value is recommended to be set to `(max_packet_size * active_n)/2` ## Unit: Bps -## listener.ssl.external.rate_limit = 1024,4096 +## listener.ssl.external.rate_limit = 1024,52428800 ## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind ## HAProxy or Nginx. @@ -1261,8 +1347,11 @@ listener.ws.external.max_conn_rate = 1000 ## Rate limit for the MQTT/WebSocket connections. ## ## Value: rate,burst +## - rate: The average limit value for per second +## - burst: The maximum allowed for each check, To avoid frequent restriction +## this value is recommended to be set to `(max_packet_size * 1)/2` ## Unit: Bps -## listener.ws.external.rate_limit = 1024,4096 +## listener.ws.external.rate_limit = 1024,524288 ## Zone of the external MQTT/WebSocket listener belonged to. ## @@ -1469,8 +1558,11 @@ listener.wss.external.max_conn_rate = 1000 ## Rate limit for the MQTT/WebSocket/SSL connections. ## ## Value: rate,burst +## - rate: The average limit value for per second +## - burst: The maximum allowed for each check, To avoid frequent restriction +## this value is recommended to be set to `(max_packet_size * 1)/2` ## Unit: Bps -## listener.wss.external.rate_limit = 1024,4096 +## listener.wss.external.rate_limit = 1024,524288 ## Zone of the external MQTT/WebSocket/SSL listener belonged to. ## @@ -1784,13 +1876,13 @@ listener.wss.external.send_timeout_close = on ## SSL Ciphers used by the bridge. ## ## Value: String -#bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 +## bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 ## Ciphers for TLS PSK. ## Note that 'listener.ssl.external.ciphers' and 'listener.ssl.external.psk_ciphers' cannot ## be configured at the same time. ## See 'https://tools.ietf.org/html/rfc4279#section-2'. -#bridge.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA +## bridge.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA ## Ping interval of a down bridge. ## @@ -2122,7 +2214,7 @@ broker.session_locking_strategy = quorum ## ## Value: Enum ## - random -## - round_robbin +## - round_robin ## - sticky ## - hash broker.shared_subscription_strategy = random @@ -2145,19 +2237,43 @@ broker.route_batch_clean = off ## System Monitor ##-------------------------------------------------------------------- -## Enable Long GC monitoring. +## Enable Long GC monitoring. Disable if the value is 0. ## Notice: don't enable the monitor in production for: ## https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 ## -## Value: true | false -sysmon.long_gc = false +## Value: Duration +## - h: hour +## - m: minute +## - s: second +## - ms: milliseconds +## +## Examples: +## - 2h: 2 hours +## - 30m: 30 minutes +## - 0.1s: 0.1 seconds +## - 100ms : 100 milliseconds +## +## Default: 0ms +sysmon.long_gc = 0 ## Enable Long Schedule(ms) monitoring. ## ## See: http://erlang.org/doc/man/erlang.html#system_monitor-2 ## -## Value: Number -sysmon.long_schedule = 240 +## Value: Duration +## - h: hour +## - m: minute +## - s: second +## - ms: milliseconds +## +## Examples: +## - 2h: 2 hours +## - 30m: 30 minutes +## - 0.1s: 0.1 seconds +## - 100ms: 100 milliseconds +## +## Default: 0ms +sysmon.long_schedule = 240ms ## Enable Large Heap monitoring. ## diff --git a/include/types.hrl b/include/types.hrl index 8032bfe7e..85a9aadf0 100644 --- a/include/types.hrl +++ b/include/types.hrl @@ -19,4 +19,3 @@ -type(ok_or_error(Reason) :: ok | {error, Reason}). -type(ok_or_error(Value, Reason) :: {ok, Value} | {error, Reason}). - diff --git a/priv/emqx.schema b/priv/emqx.schema index e6944ba79..804ab041d 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -270,8 +270,7 @@ end}. X when is_integer(X) -> cuttlefish_util:ceiling(X / 1024); %% Bytes to Kilobytes; _ -> undefined end - end -}. + end}. {validator, "zdbbl_range", "must be between 1KB and 2097151KB", fun(ZDBBL) -> @@ -401,7 +400,7 @@ end}. {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}} ]}. -{mapping, "log.primary_level", "emqx.primary_log_level", [ +{mapping, "log.primary_level", "kernel.primary_log_level", [ {default, error}, {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}} ]}. @@ -421,6 +420,11 @@ end}. {datatype, file} ]}. +{mapping, "log.chars_limit", "kernel.logger", [ + {default, 8192}, + {datatype, integer} +]}. + {mapping, "log.rotation.size", "kernel.logger", [ {default, "10MB"}, {datatype, bytesize} @@ -458,13 +462,13 @@ end}. hidden ]}. -{translation, "emqx.primary_log_level", fun(Conf) -> - cuttlefish:conf_get("log.level", Conf) -end}. - {translation, "kernel.logger", fun(Conf) -> LogTo = cuttlefish:conf_get("log.to", Conf), LogLevel = cuttlefish:conf_get("log.level", Conf), + CharsLimit = case cuttlefish:conf_get("log.chars_limit", Conf) of + -1 -> unlimited; + V -> V + end, Formatter = {emqx_logger_formatter, #{template => [time," [",level,"] ", @@ -475,7 +479,8 @@ end}. [{peername, [peername," "], []}]}, - msg,"\n"]}}, + msg,"\n"], + chars_limit => CharsLimit}}, FileConf = fun(Filename) -> #{type => wrap, file => filename:join(cuttlefish:conf_get("log.dir", Conf), Filename), @@ -574,6 +579,11 @@ end}. {datatype, {enum, [ignore, disconnect]}} ]}. +%% @doc time interval to clean flapping records +{mapping, "flapping_clean_interval", "emqx.flapping_clean_interval", [ + {datatype, {duration, ms}} +]}. + {validator, "range:gt_0", "must greater than 0", fun(X) -> X > 0 end }. @@ -814,6 +824,18 @@ end}. {datatype, {enum, [true, false]}} ]}. +{mapping, "zone.$name.enable_flapping_detect", "emqx.zones", [ + {datatype, flag} +]}. + +{mapping, "zone.$name.flapping_threshold", "emqx.zones", [ + {datatype, string} +]}. + +{mapping, "zone.$name.flapping_banned_expiry_interval", "emqx.zones", [ + {datatype, {duration, s}} +]}. + %% @doc Force connection/session process GC after this number of %% messages | bytes passed through. %% Numbers delimited by `|'. Zero or negative is to disable. @@ -828,7 +850,7 @@ end}. %% of queued MQTT messages of QoS 1 and 2. %% Zero or negative is to disable. {mapping, "zone.$name.force_shutdown_policy", "emqx.zones", [ - {default, "0 | 0MB"}, + {default, "default"}, {datatype, string} ]}. @@ -845,6 +867,15 @@ end}. {translation, "emqx.zones", fun(Conf) -> Mapping = fun("retain_available", Val) -> {mqtt_retain_available, Val}; + ("flapping_threshold", Val) -> + [Limit, Duration] = string:tokens(Val, ", "), + FlappingThreshold = case cuttlefish_duration:parse(Duration, s) of + Min when is_integer(Min) -> + {list_to_integer(Limit), Min}; + {error, Reason} -> + error(Reason) + end, + {flapping_threshold, FlappingThreshold}; ("wildcard_subscription", Val) -> {mqtt_wildcard_subscription, Val}; ("shared_subscription", Val) -> @@ -868,15 +899,34 @@ end}. count => list_to_integer(Count)} end, {force_gc_policy, GcPolicy}; + ("force_shutdown_policy", "default") -> + {DefaultLen, DefaultSize} = + case erlang:system_info(wordsize) of + 8 -> % arch_64 + {8000, cuttlefish_bytesize:parse("800MB")}; + 4 -> % arch_32 + {1000, cuttlefish_bytesize:parse("100MB")} + end, + {force_shutdown_policy, #{message_queue_len => DefaultLen, + max_heap_size => DefaultSize}}; ("force_shutdown_policy", Val) -> [Len, Siz] = string:tokens(Val, "| "), - ShutdownPolicy = case cuttlefish_bytesize:parse(Siz) of - {error, Reason} -> - error(Reason); - Siz1 -> - #{message_queue_len => list_to_integer(Len), - max_heap_size => Siz1} - end, + MaxSiz = case erlang:system_info(wordsize) of + 8 -> % arch_64 + (1 bsl 59) - 1; + 4 -> % arch_32 + (1 bsl 27) - 1 + end, + ShutdownPolicy = + case cuttlefish_bytesize:parse(Siz) of + {error, Reason} -> + error(Reason); + Siz1 when Siz1 > MaxSiz -> + cuttlefish:invalid(io_lib:format("force_shutdown_policy: heap-size ~s is too large", [Siz])); + Siz1 -> + #{message_queue_len => list_to_integer(Len), + max_heap_size => Siz1} + end, {force_shutdown_policy, ShutdownPolicy}; ("mqueue_priorities", Val) -> case Val of @@ -1996,11 +2046,11 @@ end}. %% @doc Shared Subscription Dispatch Strategy. {mapping, "broker.shared_subscription_strategy", "emqx.shared_subscription_strategy", [ - {default, round_robbin}, + {default, round_robin}, {datatype, {enum, [random, %% randomly pick a subscriber - round_robbin, %% round robin alive subscribers one message after another + round_robin, %% round robin alive subscribers one message after another sticky, %% pick a random subscriber and stick to it hash %% hash client ID to a group member ]}} @@ -2024,14 +2074,14 @@ end}. %% @doc Long GC, don't monitor in production mode for: %% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 {mapping, "sysmon.long_gc", "emqx.sysmon", [ - {default, false}, - {datatype, {enum, [true, false]}} + {default, 0}, + {datatype, [integer, {duration, ms}]} ]}. %% @doc Long Schedule(ms) {mapping, "sysmon.long_schedule", "emqx.sysmon", [ - {default, 1000}, - {datatype, integer} + {default, 240}, + {datatype, [integer, {duration, ms}]} ]}. %% @doc Large Heap @@ -2053,11 +2103,8 @@ end}. ]}. {translation, "emqx.sysmon", fun(Conf) -> - [{long_gc, cuttlefish:conf_get("sysmon.long_gc", Conf)}, - {long_schedule, cuttlefish:conf_get("sysmon.long_schedule", Conf)}, - {large_heap, cuttlefish:conf_get("sysmon.large_heap", Conf)}, - {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)}, - {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}] + Configs = cuttlefish_variable:filter_by_prefix("sysmon", Conf), + [{list_to_atom(Name), Value} || {[_, Name], Value} <- Configs] end}. %%-------------------------------------------------------------------- @@ -2095,12 +2142,8 @@ end}. ]}. {translation, "emqx.os_mon", fun(Conf) -> - [{cpu_check_interval, cuttlefish:conf_get("os_mon.cpu_check_interval", Conf)}, - {cpu_high_watermark, cuttlefish:conf_get("os_mon.cpu_high_watermark", Conf)}, - {cpu_low_watermark, cuttlefish:conf_get("os_mon.cpu_low_watermark", Conf)}, - {mem_check_interval, cuttlefish:conf_get("os_mon.mem_check_interval", Conf)}, - {sysmem_high_watermark, cuttlefish:conf_get("os_mon.sysmem_high_watermark", Conf)}, - {procmem_high_watermark, cuttlefish:conf_get("os_mon.procmem_high_watermark", Conf)}] + Configs = cuttlefish_variable:filter_by_prefix("os_mon", Conf), + [{list_to_atom(Name), Value} || {[_, Name], Value} <- Configs] end}. %%-------------------------------------------------------------------- @@ -2122,7 +2165,6 @@ end}. ]}. {translation, "emqx.vm_mon", fun(Conf) -> - [{check_interval, cuttlefish:conf_get("vm_mon.check_interval", Conf)}, - {process_high_watermark, cuttlefish:conf_get("vm_mon.process_high_watermark", Conf)}, - {process_low_watermark, cuttlefish:conf_get("vm_mon.process_low_watermark", Conf)}] -end}. \ No newline at end of file + Configs = cuttlefish_variable:filter_by_prefix("vm_mon", Conf), + [{list_to_atom(Name), Value} || {[_, Name], Value} <- Configs] +end}. diff --git a/rebar.config b/rebar.config index d879b493c..3bbe75002 100644 --- a/rebar.config +++ b/rebar.config @@ -1,16 +1,12 @@ -{deps, [{jsx, "2.9.0"}, - {gproc, "0.8.0"}, - {cowboy, "2.6.1"}, - {meck, "0.8.13"} %% temp workaround for version check - ]}. - -%% appended to deps in rebar.config.script -{github_emqx_deps, - [{gen_rpc, "2.3.1"}, - {ekka, "v0.5.4"}, - {replayq, "v0.1.1"}, - {esockd, "v5.4.4"}, - {cuttlefish, "v2.2.1"} +{deps, + [ {jsx, "2.9.0"} % hex + , {cowboy, "2.6.1"} % hex + , {gproc, "0.8.0"} % hex + , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.4"}}} + , {replayq, {git, "https://github.com/emqx/replayq", {tag, "v0.1.1"}}} + , {esockd, {git, "https://github.com/emqx/esockd", {tag, "v5.4.4"}}} + , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} ]}. {edoc_opts, [{preprocess, true}]}. @@ -28,3 +24,13 @@ {cover_export_enabled, true}. {plugins, [coveralls]}. + +{profiles, + [{test, + [{deps, + [ {meck, "0.8.13"} % hex + , {bbmustache, "1.7.0"} % hex + , {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "v1.1.1"}}} + ]} + ]} + ]}. diff --git a/rebar.config.script b/rebar.config.script index 2c04540dc..558910385 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -1,11 +1,3 @@ -CONFIG0 = case os:getenv("REBAR_GIT_CLONE_OPTIONS") of - "--depth 1" -> - CONFIG; - _ -> - os:putenv("REBAR_GIT_CLONE_OPTIONS", "--depth 1"), - CONFIG - end, - CONFIG1 = case os:getenv("TRAVIS") of "true" -> JobId = os:getenv("TRAVIS_JOB_ID"), @@ -16,10 +8,4 @@ CONFIG1 = case os:getenv("TRAVIS") of CONFIG end, -{_, Deps} = lists:keyfind(deps, 1, CONFIG1), -{_, OurDeps} = lists:keyfind(github_emqx_deps, 1, CONFIG1), -UrlPrefix = "https://github.com/emqx/", -NewDeps = Deps ++ [{Name, {git, UrlPrefix ++ atom_to_list(Name), {branch, Branch}}} || {Name, Branch} <- OurDeps], -CONFIG2 = lists:keystore(deps, 1, CONFIG1, {deps, NewDeps}), - -CONFIG2. +CONFIG1. diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 501d5f0db..d6aab26d4 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -15,7 +15,6 @@ -module(emqx_access_control). -include("emqx.hrl"). --include("logger.hrl"). -export([authenticate/1]). @@ -29,14 +28,12 @@ -spec(authenticate(emqx_types:credentials()) -> {ok, emqx_types:credentials()} | {error, term()}). authenticate(Credentials) -> - detect_anonymous_permission(Credentials, fun() -> - case emqx_hooks:run_fold('client.authenticate', [], init_auth_result(Credentials)) of - #{auth_result := success} = NewCredentials -> - {ok, NewCredentials}; - NewCredentials -> - {error, maps:get(auth_result, NewCredentials, unknown_error)} - end - end). + case emqx_hooks:run_fold('client.authenticate', [], init_auth_result(Credentials)) of + #{auth_result := success} = NewCredentials -> + {ok, NewCredentials}; + NewCredentials -> + {error, maps:get(auth_result, NewCredentials, unknown_error)} + end. %% @doc Check ACL -spec(check_acl(emqx_types:credentials(), emqx_types:pubsub(), emqx_types:topic()) -> allow | deny). @@ -69,22 +66,8 @@ reload_acl() -> emqx_mod_acl_internal:reload_acl(). init_auth_result(Credentials) -> - case anonymous_permission(Credentials) of - true -> Credentials#{auth_result => success}; - false -> Credentials#{auth_result => not_authorized} + case emqx_zone:get_env(maps:get(zone, Credentials, undefined), allow_anonymous, false) of + true -> Credentials#{auth_result => success, anonymous => true}; + false -> Credentials#{auth_result => not_authorized, anonymous => false} end. -detect_anonymous_permission(#{username := undefined, - password := undefined} = Credentials, Fun) -> - case anonymous_permission(Credentials) of - true -> {ok, Credentials}; - false -> Fun() - end; - -detect_anonymous_permission(_Credentials, Fun) -> - Fun(). - -anonymous_permission(Credentials) -> - emqx_zone:get_env(maps:get(zone, Credentials, undefined), - allow_anonymous, false). - diff --git a/src/emqx_app.erl b/src/emqx_app.erl index 2ef660521..bc9d58408 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -31,7 +31,7 @@ start(_Type, _Args) -> %% kernel config `logger_level` before starting the erlang vm. %% This is because the latter approach an annoying debug msg will be printed out: %% "[debug] got_unexpected_message {'EXIT',<0.1198.0>,normal}" - logger:set_primary_config(level, application:get_env(emqx, primary_log_level, error)), + logger:set_primary_config(level, application:get_env(kernel, primary_log_level, error)), print_banner(), ekka:start(), diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index ab21bce50..126562401 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -70,13 +70,13 @@ check(#{client_id := ClientId, username := Username, peername := {IPAddr, _}}) - orelse ets:member(?TAB, {username, Username}) orelse ets:member(?TAB, {ipaddr, IPAddr}). --spec(add(#banned{}) -> ok). +-spec(add(emqx_types:banned()) -> ok). add(Banned) when is_record(Banned, banned) -> mnesia:dirty_write(?TAB, Banned). -spec(delete({client_id, emqx_types:client_id()} - | {username, emqx_types:username()} - | {peername, emqx_types:peername()}) -> ok). + | {username, emqx_types:username()} + | {peername, emqx_types:peername()}) -> ok). delete(Key) -> mnesia:dirty_delete(?TAB, Key). @@ -127,4 +127,3 @@ expire_banned_items(Now) -> mnesia:delete_object(?TAB, B, sticky_write); (_, _Acc) -> ok end, ok, ?TAB). - diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index 6af56d11b..a01837659 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -120,6 +120,7 @@ -define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)). -define(DEFAULT_SEG_BYTES, (1 bsl 20)). -define(NO_BRIDGE_HANDLER, undefined). +-define(NO_FROM, undefined). -define(maybe_send, {next_event, internal, maybe_send}). %% @doc Start a bridge worker. Supported configs: @@ -185,8 +186,10 @@ ensure_stopped(Id, Timeout) -> stop(Pid) -> gen_statem:stop(Pid). -status(Pid) -> - gen_statem:call(Pid, status). +status(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, status); +status(Id) -> + status(name(Id)). %% @doc This function is to be evaluated on message/batch receiver side. -spec import_batch(batch(), fun(() -> ok)) -> ok. @@ -297,8 +300,7 @@ standing_by(enter, _, #{start_type := auto}) -> standing_by(enter, _, #{start_type := manual}) -> keep_state_and_data; standing_by({call, From}, ensure_started, State) -> - {next_state, connecting, State, - [{reply, From, ok}]}; + do_connect({call, From}, standing_by, State); standing_by(state_timeout, do_connect, State) -> {next_state, connecting, State}; standing_by(info, Info, State) -> @@ -313,24 +315,8 @@ standing_by(Type, Content, State) -> connecting(enter, connected, #{reconnect_delay_ms := Timeout}) -> Action = {state_timeout, Timeout, reconnect}, {keep_state_and_data, Action}; -connecting(enter, _, #{reconnect_delay_ms := Timeout, - connect_fun := ConnectFun, - subscriptions := Subs, - forwards := Forwards - } = State) -> - ok = subscribe_local_topics(Forwards), - case ConnectFun(Subs) of - {ok, ConnRef, Conn} -> - ?LOG(info, "[Bridge] Bridge ~p connected", [name()]), - Action = {state_timeout, 0, connected}, - {keep_state, - eval_bridge_handler(State#{ conn_ref => ConnRef - , connection => Conn}, connected), - Action}; - error -> - Action = {state_timeout, Timeout, reconnect}, - {keep_state_and_data, Action} - end; +connecting(enter, _, State) -> + do_connect(enter, connecting, State); connecting(state_timeout, connected, State) -> {next_state, connected, State}; connecting(state_timeout, reconnect, _State) -> @@ -424,7 +410,7 @@ common(StateName, Type, Content, State) -> eval_bridge_handler(State = #{bridge_handler := ?NO_BRIDGE_HANDLER}, _Msg) -> State; eval_bridge_handler(State = #{bridge_handler := Handler}, Msg) -> - _ = Handler(Msg), + Handler(Msg), State. ensure_present(Key, Topic, State) -> @@ -456,6 +442,35 @@ is_topic_present({Topic, _QoS}, Topics) -> is_topic_present(Topic, Topics) -> lists:member(Topic, Topics) orelse false =/= lists:keyfind(Topic, 1, Topics). +do_connect(Type, StateName, #{ forwards := Forwards + , subscriptions := Subs + , connect_fun := ConnectFun + , reconnect_delay_ms := Timeout + } = State) -> + ok = subscribe_local_topics(Forwards), + From = case StateName of + standing_by -> {call, Pid} = Type, Pid; + connecting -> ?NO_FROM + end, + DoEvent = fun (standing_by, StandingbyAction, _ConnectingAction) -> + StandingbyAction; + (connecting, _StandingbyAction, ConnectingAction) -> + ConnectingAction + end, + case ConnectFun(Subs) of + {ok, ConnRef, Conn} -> + ?LOG(info, "[Bridge] Bridge ~p connected", [name()]), + State0 = State#{conn_ref => ConnRef, connection => Conn}, + State1 = eval_bridge_handler(State0, connected), + StandingbyAction = {next_state, connected, State1, [{reply, From, ok}]}, + ConnectingAction = {keep_state, State1, {state_timeout, 0, connected}}, + DoEvent(StateName, StandingbyAction, ConnectingAction); + {error, Reason} -> + StandingbyAction = {keep_state_and_data, [{reply, From, {error, Reason}}]}, + ConnectingAction = {keep_state_and_data, {state_timeout, Timeout, reconnect}}, + DoEvent(StateName, StandingbyAction, ConnectingAction) + end. + do_ensure_present(forwards, Topic, _) -> ok = subscribe_local_topic(Topic); do_ensure_present(subscriptions, _Topic, #{connect_module := _ConnectModule, @@ -564,9 +579,8 @@ disconnect(#{connection := Conn, connect_module := Module } = State) when Conn =/= undefined -> ok = Module:stop(ConnRef, Conn), - eval_bridge_handler(State#{conn_ref => undefined, - connection => undefined}, - disconnected); + State0 = State#{conn_ref => undefined, connection => undefined}, + eval_bridge_handler(State0, disconnected); disconnect(State) -> eval_bridge_handler(State, disconnected). diff --git a/src/emqx_bridge_connect.erl b/src/emqx_bridge_connect.erl index 37231ca88..8685451ae 100644 --- a/src/emqx_bridge_connect.erl +++ b/src/emqx_bridge_connect.erl @@ -56,7 +56,7 @@ start(Module, Config) -> Config1 = obfuscate(Config), ?LOG(error, "[Bridge connect] Failed to connect with module=~p\n" "config=~p\nreason:~p", [Module, Config1, Reason]), - error + {error, Reason} end. obfuscate(Map) -> @@ -69,4 +69,3 @@ obfuscate(Map) -> is_sensitive(password) -> true; is_sensitive(_) -> false. - diff --git a/src/emqx_bridge_mqtt.erl b/src/emqx_bridge_mqtt.erl index 870efe51e..8a66f77a0 100644 --- a/src/emqx_bridge_mqtt.erl +++ b/src/emqx_bridge_mqtt.erl @@ -56,7 +56,9 @@ start(Config = #{address := Address}) -> ClientConfig = Config#{msg_handler => Handlers, owner => AckCollector, host => Host, - port => Port}, + port => Port, + bridge_mode => true + }, case emqx_client:start_link(ClientConfig) of {ok, Pid} -> case emqx_client:connect(Pid) of diff --git a/src/emqx_bridge_sup.erl b/src/emqx_bridge_sup.erl index b00bb9012..a40e7b2e3 100644 --- a/src/emqx_bridge_sup.erl +++ b/src/emqx_bridge_sup.erl @@ -74,6 +74,6 @@ drop_bridge(Id) -> ok -> supervisor:delete_child(?SUP, Id); Error -> - ?LOG(error, "[Bridge] Delete bridge failed", [Error]), + ?LOG(error, "[Bridge] Delete bridge failed, error : ~p", [Error]), Error end. diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 371b020f1..cd83e61ad 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -88,7 +88,7 @@ ]). %% Default timeout --define(DEFAULT_KEEPALIVE, 60000). +-define(DEFAULT_KEEPALIVE, 60). -define(DEFAULT_ACK_TIMEOUT, 30000). -define(DEFAULT_CONNECT_TIMEOUT, 60000). @@ -503,7 +503,7 @@ init([{username, Username} | Opts], State) -> init([{password, Password} | Opts], State) -> init(Opts, State#state{password = iolist_to_binary(Password)}); init([{keepalive, Secs} | Opts], State) -> - init(Opts, State#state{keepalive = timer:seconds(Secs)}); + init(Opts, State#state{keepalive = Secs}); init([{proto_ver, v3} | Opts], State) -> init(Opts, State#state{proto_ver = ?MQTT_PROTO_V3, proto_name = <<"MQIsdp">>}); @@ -928,6 +928,11 @@ handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) -> ?LOG(error, "[Client] Got tcp error: ~p", [Reason]), {stop, {shutdown, Reason}, State}; +handle_event(info, EventContent = {'EXIT', _Pid, normal}, StateName, _State) -> + ?LOG(info, "[Client] State: ~s, Unexpected Event: (info, ~p)", + [StateName, EventContent]), + keep_state_and_data; + handle_event(EventType, EventContent, StateName, _StateData) -> ?LOG(error, "[Client] State: ~s, Unexpected Event: (~p, ~p)", [StateName, EventType, EventContent]), @@ -1021,11 +1026,11 @@ publish_process(?QOS_2, Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), end. ensure_keepalive_timer(State = ?PROPERTY('Server-Keep-Alive', Secs)) -> - ensure_keepalive_timer(timer:seconds(Secs), State); + ensure_keepalive_timer(timer:seconds(Secs), State#state{keepalive = Secs}); ensure_keepalive_timer(State = #state{keepalive = 0}) -> State; ensure_keepalive_timer(State = #state{keepalive = I}) -> - ensure_keepalive_timer(I, State). + ensure_keepalive_timer(timer:seconds(I), State). ensure_keepalive_timer(I, State) when is_integer(I) -> State#state{keepalive_timer = erlang:start_timer(I, self(), keepalive)}. diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index a915fdddb..2a81e0bcb 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -207,4 +207,3 @@ stats_fun() -> undefined -> ok; Size -> emqx_stats:setstat('connections/count', 'connections/max', Size) end. - diff --git a/src/emqx_cm_sup.erl b/src/emqx_cm_sup.erl index 6b0a8fb15..19940da05 100644 --- a/src/emqx_cm_sup.erl +++ b/src/emqx_cm_sup.erl @@ -30,11 +30,20 @@ init([]) -> shutdown => 1000, type => worker, modules => [emqx_banned]}, + FlappingOption = emqx_config:get_env(flapping_clean_interval, 3600000), + Flapping = #{id => flapping, + start => {emqx_flapping, start_link, [FlappingOption]}, + restart => permanent, + shutdown => 1000, + type => worker, + modules => [emqx_flapping]}, Manager = #{id => manager, start => {emqx_cm, start_link, []}, restart => permanent, shutdown => 2000, type => worker, modules => [emqx_cm]}, - {ok, {{one_for_one, 10, 100}, [Banned, Manager]}}. - + SupFlags = #{strategy => one_for_one, + intensity => 100, + period => 10}, + {ok, {SupFlags, [Banned, Manager, Flapping]}}. diff --git a/src/emqx_config.erl b/src/emqx_config.erl index 3d37fc001..fd80de0c2 100644 --- a/src/emqx_config.erl +++ b/src/emqx_config.erl @@ -19,7 +19,6 @@ %% 1. Store in mnesia database? %% 2. Store in dets? %% 3. Store in data/app.config? -%% -module(emqx_config). @@ -138,4 +137,3 @@ read_(_App) -> error(no_impl). % end, [], Configs), % RequiredCfg ++ OptionalCfg % end. - diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index e9cfd6ae4..8ede37e57 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -87,15 +87,15 @@ info(#state{transport = Transport, rate_limit = RateLimit, pub_limit = PubLimit, proto_state = ProtoState}) -> - ConnInfo = [{socktype, Transport:type(Socket)}, - {peername, Peername}, - {sockname, Sockname}, - {conn_state, ConnState}, - {active_n, ActiveN}, - {rate_limit, rate_limit_info(RateLimit)}, - {pub_limit, rate_limit_info(PubLimit)}], + ConnInfo = #{socktype => Transport:type(Socket), + peername => Peername, + sockname => Sockname, + conn_state => ConnState, + active_n => ActiveN, + rate_limit => rate_limit_info(RateLimit), + pub_limit => rate_limit_info(PubLimit)}, ProtoInfo = emqx_protocol:info(ProtoState), - lists:usort(lists:append(ConnInfo, ProtoInfo)). + maps:merge(ConnInfo, ProtoInfo). rate_limit_info(undefined) -> #{}; @@ -109,10 +109,10 @@ attrs(CPid) when is_pid(CPid) -> attrs(#state{peername = Peername, sockname = Sockname, proto_state = ProtoState}) -> - SockAttrs = [{peername, Peername}, - {sockname, Sockname}], + SockAttrs = #{peername => Peername, + sockname => Sockname}, ProtoAttrs = emqx_protocol:attrs(ProtoState), - lists:usort(lists:append(SockAttrs, ProtoAttrs)). + maps:merge(SockAttrs, ProtoAttrs). %% Conn stats stats(CPid) when is_pid(CPid) -> @@ -242,10 +242,10 @@ connected(info, {deliver, PubOrAck}, State = #state{proto_state = ProtoState}) - connected(info, {keepalive, start, Interval}, State = #state{transport = Transport, socket = Socket}) -> StatFun = fun() -> - case Transport:getstat(Socket, [recv_oct]) of - {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; - Error -> Error - end + case Transport:getstat(Socket, [recv_oct]) of + {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; + Error -> Error + end end, case emqx_keepalive:start(StatFun, Interval, {keepalive, check}) of {ok, KeepAlive} -> @@ -317,9 +317,15 @@ handle(info, {tcp_passive, _Sock}, State) -> ok = activate_socket(NState), {keep_state, NState}; +handle(info, {ssl_passive, _Sock}, State) -> + %% Rate limit here:) + NState = ensure_rate_limit(State), + ok = activate_socket(NState), + {keep_state, NState}; + handle(info, activate_socket, State) -> %% Rate limit timer expired. - ok = activate_socket(State), + ok = activate_socket(State#state{conn_state = running}), {keep_state, State#state{conn_state = running, limit_timer = undefined}}; handle(info, {inet_reply, _Sock, ok}, State) -> @@ -442,6 +448,7 @@ ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) -> {0, Rl1} -> ensure_rate_limit(Limiters, setelement(Pos, State, Rl1)); {Pause, Rl1} -> + ?LOG(debug, "[Connection] Rate limit pause connection ~pms", [Pause]), TRef = erlang:send_after(Pause, self(), activate_socket), setelement(Pos, State#state{conn_state = blocked, limit_timer = TRef}, Rl1) end. @@ -453,11 +460,7 @@ activate_socket(#state{conn_state = blocked}) -> ok; activate_socket(#state{transport = Transport, socket = Socket, active_n = N}) -> - TrueOrN = case Transport:is_ssl(Socket) of - true -> true; %% Cannot set '{active, N}' for SSL:( - false -> N - end, - case Transport:setopts(Socket, [{active, TrueOrN}]) of + case Transport:setopts(Socket, [{active, N}]) of ok -> ok; {error, Reason} -> self() ! {shutdown, Reason}, diff --git a/src/emqx_flapping.erl b/src/emqx_flapping.erl index ed1d3e0c8..099bf3910 100644 --- a/src/emqx_flapping.erl +++ b/src/emqx_flapping.erl @@ -12,70 +12,125 @@ %% See the License for the specific language governing permissions and %% limitations under the License. -%% @doc TODO: -%% 1. Flapping Detection -%% 2. Conflict Detection? -module(emqx_flapping). -%% Use ets:update_counter??? +-include("emqx.hrl"). +-include("types.hrl"). --behaviour(gen_server). +-behaviour(gen_statem). --export([start_link/0]). +-export([start_link/1]). --export([ is_banned/1 - , banned/1 +%% This module is used to garbage clean the flapping records + +%% gen_statem callbacks +-export([ terminate/3 + , code_change/4 + , init/1 + , initialized/3 + , callback_mode/0 ]). -%% gen_server callbacks --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). +-define(FLAPPING_TAB, ?MODULE). --define(SERVER, ?MODULE). +-export([check/3]). --record(state, {}). +-record(flapping, + { client_id :: binary() + , check_count :: integer() + , timestamp :: integer() + }). --spec(start_link() -> {ok, pid()} | ignore | {error, any()}). -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-type(flapping_record() :: #flapping{}). +-type(flapping_state() :: flapping | ok). -is_banned(ClientId) -> - ets:member(banned, ClientId). -banned(ClientId) -> - ets:insert(banned, {ClientId, os:timestamp()}). +%% @doc This function is used to initialize flapping records +%% the expiry time unit is minutes. +-spec(init_flapping(ClientId :: binary(), Interval :: integer()) -> flapping_record()). +init_flapping(ClientId, Interval) -> + #flapping{client_id = ClientId, + check_count = 1, + timestamp = emqx_time:now_secs() + Interval}. + +%% @doc This function is used to initialize flapping records +%% the expiry time unit is minutes. +-spec(check(Action :: atom(), ClientId :: binary(), + Threshold :: {integer(), integer()}) -> flapping_state()). +check(Action, ClientId, Threshold = {_TimesThreshold, TimeInterval}) -> + check(Action, ClientId, Threshold, init_flapping(ClientId, TimeInterval)). + +-spec(check(Action :: atom(), ClientId :: binary(), + Threshold :: {integer(), integer()}, + InitFlapping :: flapping_record()) -> flapping_state()). +check(Action, ClientId, Threshold, InitFlapping) -> + case ets:update_counter(?FLAPPING_TAB, ClientId, {#flapping.check_count, 1}, InitFlapping) of + 1 -> ok; + CheckCount -> + case ets:lookup(?FLAPPING_TAB, ClientId) of + [Flapping] -> + check_flapping(Action, CheckCount, Threshold, Flapping); + _Flapping -> + ok + end + end. + +check_flapping(Action, CheckCount, _Threshold = {TimesThreshold, TimeInterval}, + Flapping = #flapping{ client_id = ClientId + , timestamp = Timestamp }) -> + case emqx_time:now_secs() of + NowTimestamp when NowTimestamp =< Timestamp, + CheckCount > TimesThreshold -> + ets:delete(?FLAPPING_TAB, ClientId), + flapping; + NowTimestamp when NowTimestamp > Timestamp, + Action =:= disconnect -> + ets:delete(?FLAPPING_TAB, ClientId), + ok; + NowTimestamp -> + NewFlapping = Flapping#flapping{timestamp = NowTimestamp + TimeInterval}, + ets:insert(?FLAPPING_TAB, NewFlapping), + ok + end. %%-------------------------------------------------------------------- -%% gen_server callbacks +%% gen_statem callbacks %%-------------------------------------------------------------------- +-spec(start_link(TimerInterval :: [integer()]) -> startlink_ret()). +start_link(TimerInterval) -> + gen_statem:start_link({local, ?MODULE}, ?MODULE, [TimerInterval], []). -init([]) -> - %% ets:new(banned, [public, ordered_set, named_table]), - {ok, #state{}}. +init([TimerInterval]) -> + TabOpts = [ public + , set + , {keypos, 2} + , {write_concurrency, true} + , {read_concurrency, true}], + ok = emqx_tables:new(?FLAPPING_TAB, TabOpts), + {ok, initialized, #{timer_interval => TimerInterval}}. -handle_call(_Request, _From, State) -> - Reply = ok, - {reply, Reply, State}. +callback_mode() -> [state_functions, state_enter]. -handle_cast(_Msg, State) -> - {noreply, State}. +initialized(enter, _OldState, #{timer_interval := Time}) -> + Action = {state_timeout, Time, clean_expired_records}, + {keep_state_and_data, Action}; +initialized(state_timeout, clean_expired_records, #{}) -> + clean_expired_records(), + repeat_state_and_data. -handle_info(_Info, State) -> - {noreply, State}. +code_change(_Vsn, State, Data, _Extra) -> + {ok, State, Data}. -terminate(_Reason, _State) -> +terminate(_Reason, _StateName, _State) -> + emqx_tables:delete(?FLAPPING_TAB), ok. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- - +%% @doc clean expired records in ets +clean_expired_records() -> + NowTime = emqx_time:now_secs(), + MatchSpec = [{{'$1', '$2', '$3'},[{'<', '$3', NowTime}], [true]}], + ets:select_delete(?FLAPPING_TAB, MatchSpec). diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index bdc440215..79c8ed3c8 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -141,16 +141,16 @@ parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) -> {Properties, Rest3} = parse_properties(Rest2, ProtoVer), {ClientId, Rest4} = parse_utf8_string(Rest3), - ConnPacket = #mqtt_packet_connect{proto_name = ProtoName, - proto_ver = ProtoVer, - is_bridge = (BridgeTag =:= 8), - clean_start = bool(CleanStart), - will_flag = bool(WillFlag), - will_qos = WillQoS, - will_retain = bool(WillRetain), - keepalive = KeepAlive, - properties = Properties, - client_id = ClientId}, + ConnPacket = #mqtt_packet_connect{proto_name = ProtoName, + proto_ver = ProtoVer, + is_bridge = (BridgeTag =:= 8), + clean_start = bool(CleanStart), + will_flag = bool(WillFlag), + will_qos = WillQoS, + will_retain = bool(WillRetain), + keepalive = KeepAlive, + properties = Properties, + client_id = ClientId}, {ConnPacket1, Rest5} = parse_will_message(ConnPacket, Rest4), {Username, Rest6} = parse_utf8_string(Rest5, bool(UsernameFlag)), {Passsword, <<>>} = parse_utf8_string(Rest6, bool(PasswordFlag)), diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index d2ce98abc..9789474d7 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -42,12 +42,15 @@ load(Env) -> on_client_connected(#{client_id := ClientId, username := Username, peername := {IpAddr, _}}, ConnAck, ConnAttrs, Env) -> - Attrs = lists:filter(fun({K, _}) -> lists:member(K, ?ATTR_KEYS) end, ConnAttrs), - case emqx_json:safe_encode([{clientid, ClientId}, - {username, Username}, - {ipaddress, iolist_to_binary(esockd_net:ntoa(IpAddr))}, - {connack, ConnAck}, - {ts, os:system_time(second)} | Attrs]) of + Attrs = maps:filter(fun(K, _) -> + lists:member(K, ?ATTR_KEYS) + end, ConnAttrs), + case emqx_json:safe_encode(Attrs#{clientid => ClientId, + username => Username, + ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)), + connack => ConnAck, + ts => os:system_time(second) + }) of {ok, Payload} -> emqx:publish(message(qos(Env), topic(connected, ClientId), Payload)); {error, Reason} -> @@ -84,4 +87,3 @@ qos(Env) -> proplists:get_value(qos, Env, 0). reason(Reason) when is_atom(Reason) -> Reason; reason({Error, _}) when is_atom(Error) -> Error; reason(_) -> internal_error. - diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index af7a32f9a..1c1185bf0 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -108,13 +108,20 @@ ensure_file(File) -> with_loaded_file(File, SuccFun) -> case read_loaded(File) of - {ok, Names} -> + {ok, Names0} -> + Names = filter_plugins(Names0), SuccFun(Names); {error, Error} -> ?LOG(alert, "[Plugins] Failed to read: ~p, error: ~p", [File, Error]), {error, Error} end. +filter_plugins(Names) -> + lists:filtermap(fun(Name1) when is_atom(Name1) -> {true, Name1}; + ({Name1, true}) -> {true, Name1}; + ({_Name1, false}) -> false + end, Names). + load_plugins(Names, Persistent) -> Plugins = list(), NotFound = Names -- names(Plugins), case NotFound of @@ -264,7 +271,7 @@ plugin_loaded(Name, true) -> case lists:member(Name, Names) of false -> %% write file if plugin is loaded - write_loaded(lists:append(Names, [Name])); + write_loaded(lists:append(Names, [{Name, true}])); true -> ignore end; @@ -277,10 +284,7 @@ plugin_unloaded(_Name, false) -> plugin_unloaded(Name, true) -> case read_loaded() of {ok, Names0} -> - Names = lists:filtermap(fun(Name1) when is_atom(Name1) -> {true, Name1}; - ({Name1, true}) -> {true, Name1}; - ({_Name1, false}) -> false - end, Names0), + Names = filter_plugins(Names0), case lists:member(Name, Names) of true -> write_loaded(lists:delete(Name, Names)); @@ -304,7 +308,7 @@ write_loaded(AppNames) -> case file:open(File, [binary, write]) of {ok, Fd} -> lists:foreach(fun(Name) -> - file:write(Fd, iolist_to_binary(io_lib:format("~s.~n", [Name]))) + file:write(Fd, iolist_to_binary(io_lib:format("~p.~n", [Name]))) end, AppNames); {error, Error} -> ?LOG(error, "[Plugins] Open File ~p Error: ~p", [File, Error]), diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index a8bc0b288..4ac683938 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -40,6 +40,7 @@ -record(pstate, { zone, sendfun, + sockname, peername, peercert, proto_ver, @@ -60,6 +61,7 @@ is_bridge, enable_ban, enable_acl, + enable_flapping_detect, acl_deny_action, recv_stats, send_stats, @@ -68,7 +70,8 @@ ignore_loop, topic_alias_maximum, conn_mod, - credentials + credentials, + ws_cookie }). -opaque(state() :: #pstate{}). @@ -85,32 +88,38 @@ %%------------------------------------------------------------------------------ -spec(init(map(), list()) -> state()). -init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) -> +init(SocketOpts = #{ sockname := Sockname + , peername := Peername + , peercert := Peercert + , sendfun := SendFun}, Options) -> Zone = proplists:get_value(zone, Options), - #pstate{zone = Zone, - sendfun = SendFun, - peername = Peername, - peercert = Peercert, - proto_ver = ?MQTT_PROTO_V4, - proto_name = <<"MQTT">>, - client_id = <<>>, - is_assigned = false, - conn_pid = self(), - username = init_username(Peercert, Options), - clean_start = false, - topic_aliases = #{}, - packet_size = emqx_zone:get_env(Zone, max_packet_size), - is_bridge = false, - enable_ban = emqx_zone:get_env(Zone, enable_ban, false), - enable_acl = emqx_zone:get_env(Zone, enable_acl), - acl_deny_action = emqx_zone:get_env(Zone, acl_deny_action, ignore), - recv_stats = #{msg => 0, pkt => 0}, - send_stats = #{msg => 0, pkt => 0}, - connected = false, - ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false), - topic_alias_maximum = #{to_client => 0, from_client => 0}, - conn_mod = maps:get(conn_mod, SocketOpts, undefined), - credentials = #{}}. + #pstate{zone = Zone, + sendfun = SendFun, + sockname = Sockname, + peername = Peername, + peercert = Peercert, + proto_ver = ?MQTT_PROTO_V4, + proto_name = <<"MQTT">>, + client_id = <<>>, + is_assigned = false, + conn_pid = self(), + username = init_username(Peercert, Options), + clean_start = false, + topic_aliases = #{}, + packet_size = emqx_zone:get_env(Zone, max_packet_size), + is_bridge = false, + enable_ban = emqx_zone:get_env(Zone, enable_ban, false), + enable_acl = emqx_zone:get_env(Zone, enable_acl), + enable_flapping_detect = emqx_zone:get_env(Zone, enable_flapping_detect, false), + acl_deny_action = emqx_zone:get_env(Zone, acl_deny_action, ignore), + recv_stats = #{msg => 0, pkt => 0}, + send_stats = #{msg => 0, pkt => 0}, + connected = false, + ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false), + topic_alias_maximum = #{to_client => 0, from_client => 0}, + conn_mod = maps:get(conn_mod, SocketOpts, undefined), + credentials = #{}, + ws_cookie = maps:get(ws_cookie, SocketOpts, undefined)}. init_username(Peercert, Options) -> case proplists:get_value(peer_cert_as_username, Options) of @@ -135,12 +144,13 @@ info(PState = #pstate{conn_props = ConnProps, topic_aliases = Aliases, will_msg = WillMsg, enable_acl = EnableAcl}) -> - attrs(PState) ++ [{conn_props, ConnProps}, - {ack_props, AckProps}, - {session, Session}, - {topic_aliases, Aliases}, - {will_msg, WillMsg}, - {enable_acl, EnableAcl}]. + maps:merge(attrs(PState), #{conn_props => ConnProps, + ack_props => AckProps, + session => Session, + topic_aliases => Aliases, + will_msg => WillMsg, + enable_acl => EnableAcl + }). attrs(#pstate{zone = Zone, client_id = ClientId, @@ -155,20 +165,20 @@ attrs(#pstate{zone = Zone, connected_at = ConnectedAt, conn_mod = ConnMod, credentials = Credentials}) -> - [{zone, Zone}, - {client_id, ClientId}, - {username, Username}, - {peername, Peername}, - {peercert, Peercert}, - {proto_ver, ProtoVer}, - {proto_name, ProtoName}, - {clean_start, CleanStart}, - {keepalive, Keepalive}, - {is_bridge, IsBridge}, - {connected_at, ConnectedAt}, - {conn_mod, ConnMod}, - {credentials, Credentials} - ]. + #{ zone => Zone + , client_id => ClientId + , username => Username + , peername => Peername + , peercert => Peercert + , proto_ver => ProtoVer + , proto_name => ProtoName + , clean_start => CleanStart + , keepalive => Keepalive + , is_bridge => IsBridge + , connected_at => ConnectedAt + , conn_mod => ConnMod + , credentials => Credentials + }. attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) -> get_property('Receive-Maximum', ConnProps, 65535); @@ -201,12 +211,16 @@ client_id(#pstate{client_id = ClientId}) -> credentials(#pstate{zone = Zone, client_id = ClientId, username = Username, + sockname = Sockname, peername = Peername, - peercert = Peercert}) -> + peercert = Peercert, + ws_cookie = WsCookie}) -> with_cert(#{zone => Zone, client_id => ClientId, + sockname => Sockname, username => Username, peername => Peername, + ws_cookie => WsCookie, mountpoint => emqx_zone:get_env(Zone, mountpoint)}, Peercert). with_cert(Credentials, undefined) -> Credentials; @@ -367,21 +381,22 @@ process(?CONNECT_PACKET( username = Username, password = Password} = ConnPkt), PState) -> - NewClientId = maybe_use_username_as_clientid(ClientId, Username, PState), + %% TODO: Mountpoint... + %% Msg -> emqx_mountpoint:mount(MountPoint, Msg) + PState0 = maybe_use_username_as_clientid(ClientId, + set_username(Username, + PState#pstate{proto_ver = ProtoVer, + proto_name = ProtoName, + clean_start = CleanStart, + keepalive = Keepalive, + conn_props = ConnProps, + is_bridge = IsBridge, + connected_at = os:timestamp()})), + + NewClientId = PState0#pstate.client_id, emqx_logger:set_metadata_client_id(NewClientId), - %% TODO: Mountpoint... - %% Msg -> emqx_mountpoint:mount(MountPoint, Msg) - PState0 = set_username(Username, - PState#pstate{client_id = NewClientId, - proto_ver = ProtoVer, - proto_name = ProtoName, - clean_start = CleanStart, - keepalive = Keepalive, - conn_props = ConnProps, - is_bridge = IsBridge, - connected_at = os:timestamp()}), Credentials = credentials(PState0), PState1 = PState0#pstate{credentials = Credentials}, connack( @@ -692,12 +707,12 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = Send}) %%------------------------------------------------------------------------------ %% Maybe use username replace client id -maybe_use_username_as_clientid(ClientId, undefined, _PState) -> - ClientId; -maybe_use_username_as_clientid(ClientId, Username, #pstate{zone = Zone}) -> +maybe_use_username_as_clientid(ClientId, PState = #pstate{username = undefined}) -> + PState#pstate{client_id = ClientId}; +maybe_use_username_as_clientid(ClientId, PState = #pstate{username = Username, zone = Zone}) -> case emqx_zone:get_env(Zone, use_username_as_clientid, false) of - true -> Username; - false -> ClientId + true -> PState#pstate{client_id = Username}; + false -> PState#pstate{client_id = ClientId} end. %%------------------------------------------------------------------------------ @@ -759,6 +774,7 @@ make_will_msg(#mqtt_packet_connect{proto_ver = ProtoVer, check_connect(Packet, PState) -> run_check_steps([fun check_proto_ver/2, fun check_client_id/2, + fun check_flapping/2, fun check_banned/2, fun check_will_topic/2], Packet, PState). @@ -791,6 +807,9 @@ check_client_id(#mqtt_packet_connect{client_id = ClientId}, #pstate{zone = Zone} false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} end. +check_flapping(#mqtt_packet_connect{}, PState) -> + do_flapping_detect(connect, PState). + check_banned(_ConnPkt, #pstate{enable_ban = false}) -> ok; check_banned(#mqtt_packet_connect{client_id = ClientId, username = Username}, @@ -889,14 +908,16 @@ inc_stats(Type, Stats = #{pkt := PktCnt, msg := MsgCnt}) -> terminate(_Reason, #pstate{client_id = undefined}) -> ok; -terminate(_Reason, #pstate{connected = false}) -> +terminate(_Reason, PState = #pstate{connected = false}) -> + do_flapping_detect(disconnect, PState), ok; -terminate(conflict, _PState) -> - ok; -terminate(discard, _PState) -> +terminate(Reason, PState) when Reason =:= conflict; + Reason =:= discard -> + do_flapping_detect(disconnect, PState), ok; -terminate(Reason, #pstate{credentials = Credentials}) -> +terminate(Reason, PState = #pstate{credentials = Credentials}) -> + do_flapping_detect(disconnect, PState), ?LOG(info, "[Protocol] Shutdown for ~p", [Reason]), ok = emqx_hooks:run('client.disconnected', [Credentials, Reason]). @@ -925,6 +946,25 @@ flag(true) -> 1. %%------------------------------------------------------------------------------ %% Execute actions in case acl deny +do_flapping_detect(Action, #pstate{zone = Zone, + client_id = ClientId, + enable_flapping_detect = true}) -> + BanExpiryInterval = emqx_zone:get_env(Zone, flapping_ban_expiry_interval, 3600000), + Threshold = emqx_zone:get_env(Zone, flapping_threshold, 20), + Until = erlang:system_time(second) + BanExpiryInterval, + case emqx_flapping:check(Action, ClientId, Threshold) of + flapping -> + emqx_banned:add(#banned{who = {client_id, ClientId}, + reason = <<"flapping">>, + by = <<"flapping_checker">>, + until = Until}), + ok; + _Other -> + ok + end; +do_flapping_detect(_Action, _PState) -> + ok. + do_acl_deny_action(?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload), ?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer, acl_deny_action = disconnect}) -> diff --git a/src/emqx_rate_limiter.erl b/src/emqx_rate_limiter.erl deleted file mode 100644 index 5298651a6..000000000 --- a/src/emqx_rate_limiter.erl +++ /dev/null @@ -1,70 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_rate_limiter). - --behaviour(gen_server). - -%% API --export([start_link/0]). - -%% gen_server callbacks --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). - --define(SERVER, ?MODULE). - --record(state, {}). - -%%------------------------------------------------------------------------------ -%%% API -%%------------------------------------------------------------------------------ - -%% @doc Starts the server --spec(start_link() -> {ok, pid()} | ignore | {error, any()}). -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - -%%------------------------------------------------------------------------------ -%%% gen_server callbacks -%%------------------------------------------------------------------------------ - -init([]) -> - {ok, #state{}}. - -handle_call(_Request, _From, State) -> - Reply = ok, - {reply, Reply, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%------------------------------------------------------------------------------ -%%% Internal functions -%%------------------------------------------------------------------------------ - diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 2ef64c9aa..f1aa6d3bd 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -676,6 +676,7 @@ terminate(Reason, #state{will_msg = WillMsg, username = Username, conn_pid = ConnPid, old_conn_pid = OldConnPid}) -> + emqx_metrics:commit(), send_willmsg(WillMsg), [maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]], ok = emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]). diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index bec56f142..771069704 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -73,6 +73,8 @@ -define(PUBSUB_STATS, [ 'topics/count', 'topics/max', + 'suboptions/count', + 'suboptions/max', 'subscribers/count', 'subscribers/max', 'subscriptions/count', @@ -242,9 +244,12 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ safe_update_element(Key, Val) -> - try ets:update_element(?TAB, Key, {2, Val}) + try ets:update_element(?TAB, Key, {2, Val}) of + false -> + ets:insert_new(?TAB, {Key, Val}); + true -> + true catch error:badarg -> - ets:insert_new(?TAB, {Key, Val}) + ?LOG(warning, "[Stats] Update ~p to ~p failed", [Key, Val]) end. - diff --git a/src/emqx_sys_mon.erl b/src/emqx_sys_mon.erl index 03cc95819..b75503b53 100644 --- a/src/emqx_sys_mon.erl +++ b/src/emqx_sys_mon.erl @@ -33,9 +33,9 @@ , code_change/3 ]). --type(option() :: {long_gc, false | pos_integer()} - | {long_schedule, false | pos_integer()} - | {large_heap, pos_integer()} +-type(option() :: {long_gc, non_neg_integer()} + | {long_schedule, non_neg_integer()} + | {large_heap, non_neg_integer()} | {busy_port, boolean()} | {busy_dist_port, boolean()}). @@ -66,11 +66,11 @@ parse_opt(Opts) -> parse_opt(Opts, []). parse_opt([], Acc) -> Acc; -parse_opt([{long_gc, false}|Opts], Acc) -> +parse_opt([{long_gc, 0}|Opts], Acc) -> parse_opt(Opts, Acc); parse_opt([{long_gc, Ms}|Opts], Acc) when is_integer(Ms) -> parse_opt(Opts, [{long_gc, Ms}|Acc]); -parse_opt([{long_schedule, false}|Opts], Acc) -> +parse_opt([{long_schedule, 0}|Opts], Acc) -> parse_opt(Opts, Acc); parse_opt([{long_schedule, Ms}|Opts], Acc) when is_integer(Ms) -> parse_opt(Opts, [{long_schedule, Ms}|Acc]); @@ -177,4 +177,3 @@ safe_publish(Event, WarnMsg) -> sysmon_msg(Topic, Payload) -> Msg = emqx_message:make(?SYSMON, Topic, Payload), emqx_message:set_flag(sys, Msg). - diff --git a/src/emqx_tables.erl b/src/emqx_tables.erl index 2c11b9d88..16812036a 100644 --- a/src/emqx_tables.erl +++ b/src/emqx_tables.erl @@ -14,7 +14,7 @@ -module(emqx_tables). --export([new/2]). +-export([new/2, delete/1]). -export([ lookup_value/2 , lookup_value/3 @@ -30,6 +30,16 @@ new(Tab, Opts) -> Tab -> ok end. +-spec(delete(atom()) -> ok). +delete(Tab) -> + case ets:info(Tab, name) of + undefined -> + ok; + Tab -> + ets:delete(Tab), + ok + end. + %% KV lookup -spec(lookup_value(atom(), term()) -> any()). lookup_value(Tab, Key) -> @@ -42,4 +52,3 @@ lookup_value(Tab, Key, Def) -> catch error:badarg -> Def end. - diff --git a/src/emqx_types.erl b/src/emqx_types.erl index c8c274b70..50e7f78c0 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -53,6 +53,7 @@ -export_type([ alarm/0 , plugin/0 + , banned/0 , command/0 ]). @@ -79,11 +80,16 @@ | banned | bad_authentication_method). -type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | none | atom()). --type(credentials() :: #{client_id := client_id(), - username := username(), - peername := peername(), - auth_result := auth_result(), - zone => zone(), +-type(credentials() :: #{zone := zone(), + client_id := client_id(), + username := username(), + sockname := peername(), + peername := peername(), + ws_cookie := undefined | list(), + mountpoint := binary(), + password => binary(), + auth_result => auth_result(), + anonymous => boolean(), atom() => term() }). -type(subscription() :: #subscription{}). @@ -91,6 +97,7 @@ -type(topic_table() :: [{topic(), subopts()}]). -type(payload() :: binary() | iodata()). -type(message() :: #message{}). +-type(banned() :: #banned{}). -type(delivery() :: #delivery{}). -type(deliver_results() :: [{route, node(), topic()} | {dispatch, topic(), pos_integer()}]). @@ -98,4 +105,3 @@ -type(alarm() :: #alarm{}). -type(plugin() :: #plugin{}). -type(command() :: #command{}). - diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 92ae6a7dd..e828dd0a2 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -61,11 +61,11 @@ info(#state{peername = Peername, sockname = Sockname, proto_state = ProtoState}) -> ProtoInfo = emqx_protocol:info(ProtoState), - ConnInfo = [{socktype, websocket}, - {conn_state, running}, - {peername, Peername}, - {sockname, Sockname}], - lists:append([ConnInfo, ProtoInfo]). + ConnInfo = #{socktype => websocket, + conn_state => running, + peername => Peername, + sockname => Sockname}, + maps:merge(ProtoInfo, ConnInfo). %% for dashboard attrs(WSPid) when is_pid(WSPid) -> @@ -74,10 +74,10 @@ attrs(WSPid) when is_pid(WSPid) -> attrs(#state{peername = Peername, sockname = Sockname, proto_state = ProtoState}) -> - SockAttrs = [{peername, Peername}, - {sockname, Sockname}], + SockAttrs = #{peername => Peername, + sockname => Sockname}, ProtoAttrs = emqx_protocol:attrs(ProtoState), - lists:usort(lists:append(SockAttrs, ProtoAttrs)). + maps:merge(SockAttrs, ProtoAttrs). stats(WSPid) when is_pid(WSPid) -> call(WSPid, stats); @@ -138,16 +138,29 @@ websocket_init(#state{request = Req, options = Options}) -> Peername = cowboy_req:peer(Req), Sockname = cowboy_req:sock(Req), Peercert = cowboy_req:cert(Req), + WsCookie = try cowboy_req:parse_cookies(Req) + catch + error:badarg -> + ?LOG(error, "[WS Connection] Illegal cookie"), + undefined; + Error:Reason -> + ?LOG(error, + "[WS Connection] Cookie is parsed failed, Error: ~p, Reason ~p", + [Error, Reason]), + undefined + end, ProtoState = emqx_protocol:init(#{peername => Peername, sockname => Sockname, peercert => Peercert, sendfun => send_fun(self()), + ws_cookie => WsCookie, conn_mod => ?MODULE}, Options), ParserState = emqx_protocol:parser(ProtoState), Zone = proplists:get_value(zone, Options), EnableStats = emqx_zone:get_env(Zone, enable_stats, true), IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), emqx_logger:set_metadata_peername(esockd_net:format(Peername)), + ok = emqx_misc:init_proc_mng_policy(Zone), {ok, #state{peername = Peername, sockname = Sockname, parse_state = ParserState, diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index 1c591b404..bcc0a6b6b 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -70,25 +70,21 @@ all() -> groups() -> [{connect, [non_parallel_tests], - [ - mqtt_connect, - mqtt_connect_with_tcp, - mqtt_connect_with_will_props, - mqtt_connect_with_ssl_oneway, - mqtt_connect_with_ssl_twoway, - mqtt_connect_with_ws - ]}, - {publish, [non_parallel_tests], - [ - packet_size - ]}]. + [mqtt_connect, + mqtt_connect_with_tcp, + mqtt_connect_with_will_props, + mqtt_connect_with_ssl_oneway, + mqtt_connect_with_ssl_twoway, + mqtt_connect_with_ws]}, + {publish, [non_parallel_tests], + [packet_size]}]. init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps(), + emqx_ct_helpers:start_apps([]), Config. end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). + emqx_ct_helpers:stop_apps([]). %%-------------------------------------------------------------------- %% Protocol Test @@ -127,9 +123,9 @@ mqtt_connect_with_will_props(_) -> mqtt_connect_with_ssl_oneway(_) -> emqx:shutdown(), - emqx_ct_broker_helpers:change_opts(ssl_oneway), + emqx_ct_helpers:change_emqx_opts(ssl_oneway), emqx:start(), - ClientSsl = emqx_ct_broker_helpers:client_ssl(), + ClientSsl = emqx_ct_helpers:client_ssl(), {ok, #ssl_socket{tcp = _Sock1, ssl = SslSock} = Sock} = emqx_client_sock:connect("127.0.0.1", 8883, [{ssl_opts, ClientSsl}], 3000), Packet = raw_send_serialize(?CLIENT), @@ -145,11 +141,11 @@ mqtt_connect_with_ssl_oneway(_) -> mqtt_connect_with_ssl_twoway(_Config) -> emqx:shutdown(), - emqx_ct_broker_helpers:change_opts(ssl_twoway), + emqx_ct_helpers:change_emqx_opts(ssl_twoway), emqx:start(), - ClientSsl = emqx_ct_broker_helpers:client_ssl_twoway(), + ClientSsl = emqx_ct_helpers:client_ssl_twoway(), {ok, #ssl_socket{tcp = _Sock1, ssl = SslSock} = Sock} - = emqx_client_sock:connect("127.0.0.1", 8883, [{ssl_opts, ClientSsl}], 3000), + = emqx_client_sock:connect("127.0.0.1", 8883, [{ssl_opts, ClientSsl}], 3000), Packet = raw_send_serialize(?CLIENT), emqx_client_sock:setopts(Sock, [{active, once}]), emqx_client_sock:send(Sock, Packet), diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl index 60f709ee5..d57094925 100644 --- a/test/emqx_access_SUITE.erl +++ b/test/emqx_access_SUITE.erl @@ -31,39 +31,41 @@ all() -> [{group, access_control}, {group, acl_cache}, {group, access_control_cache_mode}, - {group, access_rule} - ]. + {group, access_rule}]. groups() -> [{access_control, [sequence], - [reload_acl, - register_mod, - unregister_mod, - check_acl_1, - check_acl_2 - ]}, + [reload_acl, + register_mod, + unregister_mod, + check_acl_1, + check_acl_2]}, {access_control_cache_mode, [], - [ - acl_cache_basic, - acl_cache_expiry, - acl_cache_cleanup, - acl_cache_full - ]}, - {acl_cache, [], [ - put_get_del_cache, + [acl_cache_basic, + acl_cache_expiry, + acl_cache_cleanup, + acl_cache_full]}, + {acl_cache, [], + [put_get_del_cache, cache_update, cache_expiry, cache_replacement, cache_cleanup, cache_auto_emtpy, - cache_auto_cleanup - ]}, + cache_auto_cleanup]}, {access_rule, [], - [compile_rule, - match_rule]}]. + [compile_rule, + match_rule]}]. -init_per_group(Group, Config) when Group =:= access_control; - Group =:= access_control_cache_mode -> +init_per_suite(Config) -> + emqx_ct_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + +init_per_group(Group, Config) when Group =:= access_control; + Group =:= access_control_cache_mode -> prepare_config(Group), application:load(emqx), Config; @@ -90,7 +92,6 @@ set_acl_config_file(_Group) -> write_config("access_SUITE_acl.conf", Rules), application:set_env(emqx, acl_file, "access_SUITE_acl.conf"). - write_config(Filename, Terms) -> file:write_file(Filename, [io_lib:format("~tp.~n", [Term]) || Term <- Terms]). @@ -98,7 +99,6 @@ end_per_group(_Group, Config) -> Config. init_per_testcase(_TestCase, Config) -> - %% {ok, _Pid} = ?AC:start_link(), Config. end_per_testcase(_TestCase, _Config) -> @@ -109,7 +109,6 @@ per_testcase_config(acl_cache_full, Config) -> per_testcase_config(_TestCase, Config) -> Config. - %%-------------------------------------------------------------------- %% emqx_access_control %%-------------------------------------------------------------------- diff --git a/test/emqx_alarm_handler_SUITE.erl b/test/emqx_alarm_handler_SUITE.erl index 919418a62..d50c7fb5f 100644 --- a/test/emqx_alarm_handler_SUITE.erl +++ b/test/emqx_alarm_handler_SUITE.erl @@ -24,51 +24,18 @@ -include("emqx_mqtt.hrl"). -include("emqx.hrl"). -all() -> [t_alarm_handler, t_logger_handler]. +all() -> [t_alarm_handler, + t_logger_handler]. init_per_suite(Config) -> - [start_apps(App, {SchemaFile, ConfigFile}) || - {App, SchemaFile, ConfigFile} - <- [{emqx, local_path("priv/emqx.schema"), - local_path("etc/gen.emqx.conf")}]], + emqx_ct_helpers:start_apps([], fun set_special_configs/1), Config. end_per_suite(_Config) -> - application:stop(emqx). - -local_path(RelativePath) -> - filename:join([get_base_dir(), RelativePath]). - -deps_path(App, RelativePath) -> - %% Note: not lib_dir because etc dir is not sym-link-ed to _build dir - %% but priv dir is - Path0 = code:priv_dir(App), - Path = case file:read_link(Path0) of - {ok, Resolved} -> Resolved; - {error, _} -> Path0 - end, - filename:join([Path, "..", RelativePath]). - -get_base_dir() -> - {file, Here} = code:is_loaded(?MODULE), - filename:dirname(filename:dirname(Here)). - -start_apps(App, {SchemaFile, ConfigFile}) -> - read_schema_configs(App, {SchemaFile, ConfigFile}), - set_special_configs(App), - application:ensure_all_started(App). - -read_schema_configs(App, {SchemaFile, ConfigFile}) -> - ct:pal("Read configs - SchemaFile: ~p, ConfigFile: ~p", [SchemaFile, ConfigFile]), - Schema = cuttlefish_schema:files([SchemaFile]), - Conf = conf_parse:file(ConfigFile), - NewConfig = cuttlefish_generator:map(Schema, Conf), - Vals = proplists:get_value(App, NewConfig, []), - [application:set_env(App, Par, Value) || {Par, Value} <- Vals]. + emqx_ct_helpers:stop_apps([]). set_special_configs(emqx) -> - application:set_env(emqx, acl_file, deps_path(emqx, "test/emqx_access_SUITE_data/acl_deny_action.conf")); - + application:set_env(emqx, acl_file, emqx_ct_helpers:deps_path(emqx, "test/emqx_access_SUITE_data/acl_deny_action.conf")); set_special_configs(_App) -> ok. diff --git a/test/emqx_banned_SUITE.erl b/test/emqx_banned_SUITE.erl index bf7c9908a..02b057ebf 100644 --- a/test/emqx_banned_SUITE.erl +++ b/test/emqx_banned_SUITE.erl @@ -24,7 +24,7 @@ all() -> [t_banned_all]. t_banned_all(_) -> - emqx_ct_broker_helpers:run_setup_steps(), + emqx_ct_helpers:start_apps([]), emqx_banned:start_link(), TimeNow = erlang:system_time(second), Banned = #banned{who = {client_id, <<"TestClient">>}, @@ -49,5 +49,4 @@ t_banned_all(_) -> ?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})), - emqx_ct_broker_helpers:run_teardown_steps(). - + emqx_ct_helpers:stop_apps([]). diff --git a/test/emqx_bridge_SUITE.erl b/test/emqx_bridge_SUITE.erl index b33a64210..7d94091e1 100644 --- a/test/emqx_bridge_SUITE.erl +++ b/test/emqx_bridge_SUITE.erl @@ -14,11 +14,12 @@ -module(emqx_bridge_SUITE). --export([all/0, init_per_suite/1, end_per_suite/1]). --export([t_rpc/1, - t_mqtt/1, - t_mngr/1 - ]). +-export([ all/0 + , init_per_suite/1 + , end_per_suite/1]). +-export([ t_rpc/1 + , t_mqtt/1 + , t_mngr/1]). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). @@ -27,21 +28,21 @@ -define(wait(For, Timeout), emqx_ct_helpers:wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)). -all() -> [t_rpc, - t_mqtt, - t_mngr]. +all() -> [ t_rpc + , t_mqtt + , t_mngr]. init_per_suite(Config) -> case node() of - nonode@nohost -> - net_kernel:start(['emqx@127.0.0.1', longnames]); - _ -> - ok + nonode@nohost -> net_kernel:start(['emqx@127.0.0.1', longnames]); + _ -> ok end, - emqx_ct_broker_helpers:run_setup_steps([{log_level, error} | Config]). + emqx_ct_helpers:start_apps([]), + emqx_logger:set_log_level(error), + [{log_level, error} | Config]. end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). + emqx_ct_helpers:stop_apps([]). t_mngr(Config) when is_list(Config) -> Subs = [{<<"a">>, 1}, {<<"b">>, 2}], @@ -50,8 +51,7 @@ t_mngr(Config) when is_list(Config) -> connect_module => emqx_bridge_rpc, mountpoint => <<"forwarded">>, subscriptions => Subs, - start_type => auto - }, + start_type => auto}, Name = ?FUNCTION_NAME, {ok, Pid} = emqx_bridge:start_link(Name, Cfg), try @@ -77,8 +77,7 @@ t_rpc(Config) when is_list(Config) -> forwards => [<<"t_rpc/#">>], connect_module => emqx_bridge_rpc, mountpoint => <<"forwarded">>, - start_type => auto - }, + start_type => auto}, {ok, Pid} = emqx_bridge:start_link(?FUNCTION_NAME, Cfg), ClientId = <<"ClientId">>, try @@ -132,8 +131,7 @@ t_mqtt(Config) when is_list(Config) -> %% Consume back to forwarded message for verification %% NOTE: this is a indefenite loopback without mocking emqx_bridge:import_batch/2 subscriptions => [{ForwardedTopic, _QoS = 1}], - start_type => auto - }, + start_type => auto}, Tester = self(), Ref = make_ref(), meck:new(emqx_bridge, [passthrough, no_history]), @@ -156,14 +154,14 @@ t_mqtt(Config) when is_list(Config) -> Max = 100, Msgs = lists:seq(1, Max), lists:foreach(fun(I) -> - Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic, integer_to_binary(I)), - emqx_session:publish(SPid, I, Msg) + Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic, integer_to_binary(I)), + emqx_session:publish(SPid, I, Msg) end, Msgs), ok = receive_and_match_messages(Ref, Msgs), Msgs2 = lists:seq(Max + 1, Max * 2), lists:foreach(fun(I) -> - Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic2, integer_to_binary(I)), - emqx_session:publish(SPid, I, Msg) + Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic2, integer_to_binary(I)), + emqx_session:publish(SPid, I, Msg) end, Msgs2), ok = receive_and_match_messages(Ref, Msgs2), emqx_mock_client:close_session(ConnPid) diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 1aeb3314f..e2cfe638d 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -32,23 +32,21 @@ all() -> {group, stats}]. groups() -> - [ - {pubsub, [sequence], [subscribe_unsubscribe, + [{pubsub, [sequence], [subscribe_unsubscribe, publish, pubsub, t_shared_subscribe, dispatch_with_no_sub, 'pubsub#', 'pubsub+']}, {session, [sequence], [start_session]}, {metrics, [sequence], [inc_dec_metric]}, - {stats, [sequence], [set_get_stat]} - ]. + {stats, [sequence], [set_get_stat]}]. init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps(), + emqx_ct_helpers:start_apps([]), Config. end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). + emqx_ct_helpers:stop_apps([]). %%-------------------------------------------------------------------- %% PubSub Test diff --git a/test/emqx_client_SUITE.erl b/test/emqx_client_SUITE.erl index 0de33d28b..b9f0daf10 100644 --- a/test/emqx_client_SUITE.erl +++ b/test/emqx_client_SUITE.erl @@ -46,11 +46,11 @@ groups() -> dollar_topics_test]}]. init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps(), + emqx_ct_helpers:start_apps([]), Config. end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). + emqx_ct_helpers:stop_apps([]). receive_messages(Count) -> receive_messages(Count, []). diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index a64c8139a..05e8f3514 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -31,11 +31,11 @@ groups() -> t_lookup_conn_pid]}]. init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps(), + emqx_ct_helpers:start_apps([]), Config. end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). + emqx_ct_helpers:stop_apps([]). init_per_testcase(_TestCase, Config) -> register_connection(), diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index 9c9c3ab55..2c124fd44 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -27,11 +27,11 @@ all() -> [t_connect_api]. init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps(), + emqx_ct_helpers:start_apps([]), Config. end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). + emqx_ct_helpers:stop_apps([]). t_connect_api(_Config) -> {ok, T1} = emqx_client:start_link([{host, "localhost"}, @@ -51,16 +51,16 @@ t_connect_api(_Config) -> emqx_client:disconnect(T1). t_info(ConnInfo) -> - ?assertEqual(tcp, proplists:get_value(socktype, ConnInfo)), - ?assertEqual(running, proplists:get_value(conn_state, ConnInfo)), - ?assertEqual(<<"client1">>, proplists:get_value(client_id, ConnInfo)), - ?assertEqual(<<"testuser1">>, proplists:get_value(username, ConnInfo)), - ?assertEqual(<<"MQTT">>, proplists:get_value(proto_name, ConnInfo)). + ?assertEqual(tcp, maps:get(socktype, ConnInfo)), + ?assertEqual(running, maps:get(conn_state, ConnInfo)), + ?assertEqual(<<"client1">>, maps:get(client_id, ConnInfo)), + ?assertEqual(<<"testuser1">>, maps:get(username, ConnInfo)), + ?assertEqual(<<"MQTT">>, maps:get(proto_name, ConnInfo)). t_attrs(AttrsData) -> - ?assertEqual(<<"client1">>, proplists:get_value(client_id, AttrsData)), - ?assertEqual(emqx_connection, proplists:get_value(conn_mod, AttrsData)), - ?assertEqual(<<"testuser1">>, proplists:get_value(username, AttrsData)). + ?assertEqual(<<"client1">>, maps:get(client_id, AttrsData)), + ?assertEqual(emqx_connection, maps:get(conn_mod, AttrsData)), + ?assertEqual(<<"testuser1">>, maps:get(username, AttrsData)). t_stats(StatsData) -> ?assertEqual(true, proplists:get_value(recv_oct, StatsData) >= 0), diff --git a/test/emqx_ct_broker_helpers.erl b/test/emqx_ct_broker_helpers.erl deleted file mode 100644 index 88240be85..000000000 --- a/test/emqx_ct_broker_helpers.erl +++ /dev/null @@ -1,198 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_ct_broker_helpers). - --compile(export_all). --compile(nowarn_export_all). - --define(APP, emqx). - --define(MQTT_SSL_TWOWAY, [{cacertfile, "certs/cacert.pem"}, - {verify, verify_peer}, - {fail_if_no_peer_cert, true}]). - --define(MQTT_SSL_CLIENT, [{keyfile, "certs/client-key.pem"}, - {cacertfile, "certs/cacert.pem"}, - {certfile, "certs/client-cert.pem"}]). - --define(CIPHERS, [{ciphers, - ["ECDHE-ECDSA-AES256-GCM-SHA384", - "ECDHE-RSA-AES256-GCM-SHA384", - "ECDHE-ECDSA-AES256-SHA384", - "ECDHE-RSA-AES256-SHA384","ECDHE-ECDSA-DES-CBC3-SHA", - "ECDH-ECDSA-AES256-GCM-SHA384", - "ECDH-RSA-AES256-GCM-SHA384", - "ECDH-ECDSA-AES256-SHA384","ECDH-RSA-AES256-SHA384", - "DHE-DSS-AES256-GCM-SHA384","DHE-DSS-AES256-SHA256", - "AES256-GCM-SHA384","AES256-SHA256", - "ECDHE-ECDSA-AES128-GCM-SHA256", - "ECDHE-RSA-AES128-GCM-SHA256", - "ECDHE-ECDSA-AES128-SHA256", - "ECDHE-RSA-AES128-SHA256", - "ECDH-ECDSA-AES128-GCM-SHA256", - "ECDH-RSA-AES128-GCM-SHA256", - "ECDH-ECDSA-AES128-SHA256","ECDH-RSA-AES128-SHA256", - "DHE-DSS-AES128-GCM-SHA256","DHE-DSS-AES128-SHA256", - "AES128-GCM-SHA256","AES128-SHA256", - "ECDHE-ECDSA-AES256-SHA","ECDHE-RSA-AES256-SHA", - "DHE-DSS-AES256-SHA","ECDH-ECDSA-AES256-SHA", - "ECDH-RSA-AES256-SHA","AES256-SHA", - "ECDHE-ECDSA-AES128-SHA","ECDHE-RSA-AES128-SHA", - "DHE-DSS-AES128-SHA","ECDH-ECDSA-AES128-SHA", - "ECDH-RSA-AES128-SHA","AES128-SHA"]}]). - -run_setup_steps() -> - _ = run_setup_steps([]), - %% return ok to be backward compatible - ok. - -run_setup_steps(Config) -> - NewConfig = generate_config(), - lists:foreach(fun set_app_env/1, NewConfig), - set_bridge_env(), - {ok, _} = application:ensure_all_started(?APP), - set_log_level(Config), - Config. - -run_teardown_steps() -> - ?APP:shutdown(). - -generate_config() -> - Schema = cuttlefish_schema:files([local_path(["priv", "emqx.schema"])]), - Conf = conf_parse:file([local_path(["etc", "gen.emqx.conf"])]), - cuttlefish_generator:map(Schema, Conf). - -set_log_level(Config) -> - case proplists:get_value(log_level, Config) of - undefined -> ok; - Level -> emqx_logger:set_log_level(Level) - end. - -get_base_dir(Module) -> - {file, Here} = code:is_loaded(Module), - filename:dirname(filename:dirname(Here)). - -get_base_dir() -> - get_base_dir(?MODULE). - -local_path(Components, Module) -> - filename:join([get_base_dir(Module) | Components]). - -local_path(Components) -> - local_path(Components, ?MODULE). - -set_app_env({App, Lists}) -> - lists:foreach(fun({acl_file, _Var}) -> - application:set_env(App, acl_file, local_path(["etc", "acl.conf"])); - ({plugins_loaded_file, _Var}) -> - application:set_env(App, plugins_loaded_file, local_path(["test", "emqx_SUITE_data","loaded_plugins"])); - ({Par, Var}) -> - application:set_env(App, Par, Var) - end, Lists). - -set_bridge_env() -> - BridgeEnvs = bridge_conf(), - application:set_env(?APP, bridges, BridgeEnvs). - -change_opts(SslType) -> - {ok, Listeners} = application:get_env(?APP, listeners), - NewListeners = - lists:foldl(fun({Protocol, Port, Opts} = Listener, Acc) -> - case Protocol of - ssl -> - SslOpts = proplists:get_value(ssl_options, Opts), - Keyfile = local_path(["etc/certs", "key.pem"]), - Certfile = local_path(["etc/certs", "cert.pem"]), - TupleList1 = lists:keyreplace(keyfile, 1, SslOpts, {keyfile, Keyfile}), - TupleList2 = lists:keyreplace(certfile, 1, TupleList1, {certfile, Certfile}), - TupleList3 = - case SslType of - ssl_twoway-> - CAfile = local_path(["etc", proplists:get_value(cacertfile, ?MQTT_SSL_TWOWAY)]), - MutSslList = lists:keyreplace(cacertfile, 1, ?MQTT_SSL_TWOWAY, {cacertfile, CAfile}), - lists:merge(TupleList2, MutSslList); - _ -> - lists:filter(fun ({cacertfile, _}) -> false; - ({verify, _}) -> false; - ({fail_if_no_peer_cert, _}) -> false; - (_) -> true - end, TupleList2) - end, - [{Protocol, Port, lists:keyreplace(ssl_options, 1, Opts, {ssl_options, TupleList3})} | Acc]; - _ -> - [Listener | Acc] - end - end, [], Listeners), - application:set_env(?APP, listeners, NewListeners). - -client_ssl_twoway() -> - [{Key, local_path(["etc", File])} || {Key, File} <- ?MQTT_SSL_CLIENT] ++ ?CIPHERS. - -client_ssl() -> - ?CIPHERS ++ [{reuse_sessions, true}]. - -wait_mqtt_payload(Payload) -> - receive - {publish, #{payload := Payload}} -> - ct:pal("OK - received msg: ~p~n", [Payload]) - after 1000 -> - ct:fail({timeout, Payload, {msg_box, flush()}}) - end. - -not_wait_mqtt_payload(Payload) -> - receive - {publish, #{payload := Payload}} -> - ct:fail({received, Payload}) - after 1000 -> - ct:pal("OK - msg ~p is not received", [Payload]) - end. - -flush() -> - flush([]). -flush(Msgs) -> - receive - M -> flush([M|Msgs]) - after - 0 -> lists:reverse(Msgs) - end. - -bridge_conf() -> - [ {local_rpc, - [{connect_module, emqx_bridge_rpc}, - {address, node()}, - {forwards, ["bridge-1/#", "bridge-2/#"]} - ]} - ]. - % [{aws, - % [{connect_module, emqx_bridge_mqtt}, - % {username,"user"}, - % {address,"127.0.0.1:1883"}, - % {clean_start,true}, - % {client_id,"bridge_aws"}, - % {forwards,["topic1/#","topic2/#"]}, - % {keepalive,60000}, - % {max_inflight,32}, - % {mountpoint,"bridge/aws/${node}/"}, - % {password,"passwd"}, - % {proto_ver,mqttv4}, - % {queue, - % #{batch_coun t_limit => 1000, - % replayq_dir => "data/emqx_aws_bridge/", - % replayq_seg_bytes => 10485760}}, - % {reconnect_delay_ms,30000}, - % {ssl,false}, - % {ssl_opts,[{versions,[tlsv1,'tlsv1.1','tlsv1.2']}]}, - % {start_type,manual}, - % {subscriptions,[{"cmd/topic1",1},{"cmd/topic2",1}]}]}]. diff --git a/test/emqx_ct_helpers.erl b/test/emqx_ct_helpers.erl deleted file mode 100644 index 2f35f42cc..000000000 --- a/test/emqx_ct_helpers.erl +++ /dev/null @@ -1,68 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_ct_helpers). - --export([ensure_mnesia_stopped/0, wait_for/4]). - -ensure_mnesia_stopped() -> - ekka_mnesia:ensure_stopped(), - ekka_mnesia:delete_schema(). - -%% Help function to wait for Fun to yield 'true'. -wait_for(Fn, Ln, F, Timeout) -> - {Pid, Mref} = erlang:spawn_monitor(fun() -> wait_loop(F, catch_call(F)) end), - wait_for_down(Fn, Ln, Timeout, Pid, Mref, false). - -wait_for_down(Fn, Ln, Timeout, Pid, Mref, Kill) -> - receive - {'DOWN', Mref, process, Pid, normal} -> - ok; - {'DOWN', Mref, process, Pid, {unexpected, Result}} -> - erlang:error({unexpected, Fn, Ln, Result}); - {'DOWN', Mref, process, Pid, {crashed, {C, E, S}}} -> - erlang:raise(C, {Fn, Ln, E}, S) - after - Timeout -> - case Kill of - true -> - erlang:demonitor(Mref, [flush]), - erlang:exit(Pid, kill), - erlang:error({Fn, Ln, timeout}); - false -> - Pid ! stop, - wait_for_down(Fn, Ln, Timeout, Pid, Mref, true) - end - end. - -wait_loop(_F, ok) -> exit(normal); -wait_loop(F, LastRes) -> - receive - stop -> erlang:exit(LastRes) - after - 100 -> - Res = catch_call(F), - wait_loop(F, Res) - end. - -catch_call(F) -> - try - case F() of - true -> ok; - Other -> {unexpected, Other} - end - catch - C : E : S -> - {crashed, {C, E, S}} - end. diff --git a/test/emqx_flapping_SUITE.erl b/test/emqx_flapping_SUITE.erl new file mode 100644 index 000000000..407bf6f6a --- /dev/null +++ b/test/emqx_flapping_SUITE.erl @@ -0,0 +1,60 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_flapping_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include("emqx.hrl"). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +all() -> + [t_flapping]. + +init_per_suite(Config) -> + emqx_ct_helpers:start_apps([]), + prepare_for_test(), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + +t_flapping(_Config) -> + process_flag(trap_exit, true), + flapping_connect(5), + {ok, C} = emqx_client:start_link([{client_id, <<"Client">>}]), + {error, _} = emqx_client:connect(C), + receive + {'EXIT', Client, _Reason} -> + ct:log("receive exit signal, Client: ~p", [Client]) + after 1000 -> + ct:log("timeout") + end. + + +flapping_connect(Times) -> + [flapping_connect() || _ <- lists:seq(1, Times)]. + +flapping_connect() -> + {ok, C} = emqx_client:start_link([{client_id, <<"Client">>}]), + {ok, _} = emqx_client:connect(C), + ok = emqx_client:disconnect(C). + +prepare_for_test() -> + emqx_zone:set_env(external, enable_flapping_detect, true), + emqx_zone:set_env(external, flapping_threshold, {10, 60}), + emqx_zone:set_env(external, flapping_expiry_interval, 3600). diff --git a/test/emqx_listeners_SUITE.erl b/test/emqx_listeners_SUITE.erl index d969699bc..cab8707d5 100644 --- a/test/emqx_listeners_SUITE.erl +++ b/test/emqx_listeners_SUITE.erl @@ -49,9 +49,27 @@ restart_listeners(_) -> ok = emqx_listeners:restart(), ok = emqx_listeners:stop(). +render_config_file() -> + Path = local_path(["etc", "emqx.conf"]), + {ok, Temp} = file:read_file(Path), + Vars0 = mustache_vars(), + Vars = [{atom_to_list(N), iolist_to_binary(V)} || {N, V} <- Vars0], + Targ = bbmustache:render(Temp, Vars), + NewName = Path ++ ".rendered", + ok = file:write_file(NewName, Targ), + NewName. + +mustache_vars() -> + [{platform_data_dir, local_path(["data"])}, + {platform_etc_dir, local_path(["etc"])}, + {platform_log_dir, local_path(["log"])}, + {platform_plugins_dir, local_path(["plugins"])} + ]. + generate_config() -> Schema = cuttlefish_schema:files([local_path(["priv", "emqx.schema"])]), - Conf = conf_parse:file([local_path(["etc", "gen.emqx.conf"])]), + ConfFile = render_config_file(), + Conf = conf_parse:file(ConfFile), cuttlefish_generator:map(Schema, Conf). set_app_env({App, Lists}) -> diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index 8df878147..cd6b0d2ce 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -27,24 +27,21 @@ <<"/TopicA">>]). -define(CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{ - client_id = <<"mqtt_client">>, - username = <<"emqx">>, - password = <<"public">>})). + client_id = <<"mqtt_client">>, + username = <<"emqx">>, + password = <<"public">>})). all() -> - [ - {group, mqtt_common}, + [{group, mqtt_common}, {group, mqttv4}, {group, mqttv5}, {group, acl}, - {group, frame_partial} - ]. + {group, frame_partial}]. groups() -> [{mqtt_common, [sequence], [will_topic_check, - will_acl_check - ]}, + will_acl_check]}, {mqttv4, [sequence], [connect_v4, subscribe_v4]}, @@ -54,18 +51,15 @@ groups() -> {acl, [sequence], [acl_deny_action_ct]}, {frame_partial, [sequence], - [handle_followed_packet]}]. + [handle_followed_packet]}]. init_per_suite(Config) -> - [start_apps(App, SchemaFile, ConfigFile) || - {App, SchemaFile, ConfigFile} - <- [{emqx, deps_path(emqx, "priv/emqx.schema"), - deps_path(emqx, "etc/gen.emqx.conf")}]], + emqx_ct_helpers:start_apps([], fun set_special_configs/1), emqx_zone:set_env(external, max_topic_alias, 20), Config. end_per_suite(_Config) -> - application:stop(emqx). + emqx_ct_helpers:stop_apps([]). batch_connect(NumberOfConnections) -> batch_connect([], NumberOfConnections). @@ -567,7 +561,7 @@ will_topic_check(_) -> emqx_client:stop(Client), ct:sleep(100), false = is_process_alive(Client), - emqx_ct_broker_helpers:wait_mqtt_payload(<<"I have died">>), + emqx_ct_helpers:wait_mqtt_payload(<<"I have died">>), emqx_client:stop(T). will_acl_check(_) -> @@ -613,37 +607,12 @@ acl_deny_do_disconnect(subscribe, QoS, Topic) -> after 1000 -> ct:fail({timeout, wait_tcp_closed}) end. -start_apps(App, SchemaFile, ConfigFile) -> - read_schema_configs(App, SchemaFile, ConfigFile), - set_special_configs(App), - application:ensure_all_started(App). - -read_schema_configs(App, SchemaFile, ConfigFile) -> - Schema = cuttlefish_schema:files([SchemaFile]), - Conf = conf_parse:file(ConfigFile), - NewConfig = cuttlefish_generator:map(Schema, Conf), - Vals = proplists:get_value(App, NewConfig, []), - [application:set_env(App, Par, Value) || {Par, Value} <- Vals]. - set_special_configs(emqx) -> application:set_env(emqx, enable_acl_cache, false), application:set_env(emqx, plugins_loaded_file, - deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins")), + emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins")), application:set_env(emqx, acl_deny_action, disconnect), application:set_env(emqx, acl_file, - deps_path(emqx, "test/emqx_access_SUITE_data/acl_deny_action.conf")); + emqx_ct_helpers:deps_path(emqx, "test/emqx_access_SUITE_data/acl_deny_action.conf")); set_special_configs(_App) -> ok. - -deps_path(App, RelativePath) -> - %% Note: not lib_dir because etc dir is not sym-link-ed to _build dir - %% but priv dir is - Path0 = code:priv_dir(App), - Path = case file:read_link(Path0) of - {ok, Resolved} -> Resolved; - {error, _} -> Path0 - end, - filename:join([Path, "..", RelativePath]). - -local_path(RelativePath) -> - deps_path(emqx_auth_username, RelativePath). diff --git a/test/emqx_router_SUITE.erl b/test/emqx_router_SUITE.erl index 3a1b77e27..4fed02835 100644 --- a/test/emqx_router_SUITE.erl +++ b/test/emqx_router_SUITE.erl @@ -36,11 +36,11 @@ groups() -> t_unexpected]}]. init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps(), + emqx_ct_helpers:start_apps([]), Config. end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). + emqx_ct_helpers:stop_apps([]). init_per_testcase(_TestCase, Config) -> clear_tables(), @@ -106,4 +106,3 @@ t_unexpected(_) -> clear_tables() -> lists:foreach(fun mnesia:clear_table/1, [emqx_route, emqx_trie, emqx_trie_node]). - diff --git a/test/emqx_rpc_SUITE.erl b/test/emqx_rpc_SUITE.erl index c3c163257..41c71b1bc 100644 --- a/test/emqx_rpc_SUITE.erl +++ b/test/emqx_rpc_SUITE.erl @@ -24,13 +24,13 @@ all() -> [t_rpc]. init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps(), + emqx_ct_helpers:start_apps([]), Config. end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). + emqx_ct_helpers:stop_apps([]). t_rpc(_) -> 60000 = emqx_rpc:call(?MASTER, timer, seconds, [60]), {badrpc, _} = emqx_rpc:call(?MASTER, os, test, []), - {_, []} = emqx_rpc:multicall([?MASTER, ?MASTER], os, timestamp, []). \ No newline at end of file + {_, []} = emqx_rpc:multicall([?MASTER, ?MASTER], os, timestamp, []). diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index 37ce34be9..6a7e8cabd 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -24,11 +24,11 @@ all() -> [ignore_loop, t_session_all]. init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps(), + emqx_ct_helpers:start_apps([]), Config. end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). + emqx_ct_helpers:stop_apps([]). ignore_loop(_Config) -> application:set_env(emqx, mqtt_ignore_loop_deliver, true), diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 6cd6c98d0..d1699a653 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -37,15 +37,14 @@ all() -> [t_random_basic, t_sticky, t_hash, t_not_so_sticky, - t_no_connection_nack - ]. + t_no_connection_nack]. init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps(), + emqx_ct_helpers:start_apps([]), Config. end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). + emqx_ct_helpers:stop_apps([]). t_random_basic(_) -> ok = ensure_config(random), @@ -258,4 +257,3 @@ ensure_config(Strategy, AckEnabled) -> subscribed(Group, Topic, Pid) -> lists:member(Pid, emqx_shared_sub:subscribers(Group, Topic)). - diff --git a/test/emqx_sm_SUITE.erl b/test/emqx_sm_SUITE.erl index 2985bc823..38a5b5ff0 100644 --- a/test/emqx_sm_SUITE.erl +++ b/test/emqx_sm_SUITE.erl @@ -43,11 +43,11 @@ groups() -> t_lookup_session_pids]}]. init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps(), + emqx_ct_helpers:start_apps([]), Config. end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). + emqx_ct_helpers:stop_apps([]). init_per_testcase(_All, Config) -> {ok, SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => self()}), diff --git a/test/emqx_sys_mon_SUITE.erl b/test/emqx_sys_mon_SUITE.erl index 53b6cebfd..86e7bd6a8 100644 --- a/test/emqx_sys_mon_SUITE.erl +++ b/test/emqx_sys_mon_SUITE.erl @@ -34,11 +34,11 @@ all() -> [t_sys_mon]. init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps(), + emqx_ct_helpers:start_apps([]), Config. end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). + emqx_ct_helpers:stop_apps([]). t_sys_mon(_Config) -> lists:foreach(fun({PidOrPort, SysMonName,ValidateInfo, InfoOrPort}) -> @@ -64,4 +64,3 @@ validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort) -> concat_str(ValidateInfo, InfoOrPort, Info) -> WarnInfo = io_lib:format(ValidateInfo, [InfoOrPort, Info]), lists:flatten(WarnInfo). - diff --git a/test/emqx_tables_SUITE.erl b/test/emqx_tables_SUITE.erl index c282e93af..c028d3681 100644 --- a/test/emqx_tables_SUITE.erl +++ b/test/emqx_tables_SUITE.erl @@ -23,4 +23,6 @@ t_new(_) -> ok = emqx_tables:new(test_table, [{read_concurrency, true}]), ets:insert(test_table, {key, 100}), ok = emqx_tables:new(test_table, [{read_concurrency, true}]), - 100 = ets:lookup_element(test_table, key, 2). + 100 = ets:lookup_element(test_table, key, 2), + ok = emqx_tables:delete(test_table), + ok = emqx_tables:delete(test_table). diff --git a/test/emqx_tracer_SUITE.erl b/test/emqx_tracer_SUITE.erl index 327930fc3..0e04de963 100644 --- a/test/emqx_tracer_SUITE.erl +++ b/test/emqx_tracer_SUITE.erl @@ -24,11 +24,11 @@ all() -> [start_traces]. init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps(), + emqx_ct_helpers:start_apps([]), Config. end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). + emqx_ct_helpers:stop_apps([]). start_traces(_Config) -> {ok, T} = emqx_client:start_link([{host, "localhost"}, diff --git a/test/emqx_vm_mon_SUITE.erl b/test/emqx_vm_mon_SUITE.erl index 41a717293..3718e3626 100644 --- a/test/emqx_vm_mon_SUITE.erl +++ b/test/emqx_vm_mon_SUITE.erl @@ -23,6 +23,15 @@ -include_lib("common_test/include/ct.hrl"). +-define(WAIT(PATTERN, TIMEOUT), + receive + PATTERN -> + ok + after + TIMEOUT -> + error(timeout) + end). + all() -> [t_api]. init_per_suite(Config) -> @@ -33,18 +42,36 @@ end_per_suite(_Config) -> application:stop(sasl). t_api(_) -> - gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}), - {ok, _} = emqx_vm_mon:start_link([{check_interval, 1}, - {process_high_watermark, 0}, - {process_low_watermark, 0.6}]), - timer:sleep(2000), - ?assertEqual(true, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())), - emqx_vm_mon:set_process_high_watermark(0.8), - emqx_vm_mon:set_process_low_watermark(0.75), - ?assertEqual(0.8, emqx_vm_mon:get_process_high_watermark()), - ?assertEqual(0.75, emqx_vm_mon:get_process_low_watermark()), - timer:sleep(3000), - ?assertEqual(false, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())), - emqx_vm_mon:set_check_interval(20), - ?assertEqual(20, emqx_vm_mon:get_check_interval()), - ok. + meck:new(alarm_handler, [passthrough, no_history]), + Tester = self(), + Ref = make_ref(), + try + meck:expect(alarm_handler, set_alarm, + fun(What) -> + Res = meck:passthrough([What]), + Tester ! {Ref, set_alarm, What}, + Res + end), + meck:expect(alarm_handler, clear_alarm, + fun(What) -> + Res = meck:passthrough([What]), + Tester ! {Ref, clear_alarm, What}, + Res + end), + gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}), + {ok, _} = emqx_vm_mon:start_link([{check_interval, 1}, + {process_high_watermark, 0}, + {process_low_watermark, 0.6}]), + ?WAIT({Ref, set_alarm, {too_many_processes, _Count}}, 2000), + ?assertEqual(true, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())), + emqx_vm_mon:set_process_high_watermark(0.8), + emqx_vm_mon:set_process_low_watermark(0.75), + ?assertEqual(0.8, emqx_vm_mon:get_process_high_watermark()), + ?assertEqual(0.75, emqx_vm_mon:get_process_low_watermark()), + ?WAIT({Ref, clear_alarm, too_many_processes}, 3000), + ?assertEqual(false, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())), + emqx_vm_mon:set_check_interval(20), + ?assertEqual(20, emqx_vm_mon:get_check_interval()) + after + meck:unload(alarm_handler) + end. diff --git a/test/emqx_ws_connection_SUITE.erl b/test/emqx_ws_connection_SUITE.erl index c45344bae..c086ef6b7 100644 --- a/test/emqx_ws_connection_SUITE.erl +++ b/test/emqx_ws_connection_SUITE.erl @@ -39,11 +39,11 @@ all() -> [t_ws_connect_api]. init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps(), + emqx_ct_helpers:start_apps([]), Config. end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). + emqx_ct_helpers:stop_apps([]). t_ws_connect_api(_Config) -> WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()), @@ -73,16 +73,16 @@ raw_recv_pase(P) -> version => ?MQTT_PROTO_V4} }). t_info(InfoData) -> - ?assertEqual(websocket, proplists:get_value(socktype, InfoData)), - ?assertEqual(running, proplists:get_value(conn_state, InfoData)), - ?assertEqual(<<"mqtt_client">>, proplists:get_value(client_id, InfoData)), - ?assertEqual(<<"admin">>, proplists:get_value(username, InfoData)), - ?assertEqual(<<"MQTT">>, proplists:get_value(proto_name, InfoData)). + ?assertEqual(websocket, maps:get(socktype, InfoData)), + ?assertEqual(running, maps:get(conn_state, InfoData)), + ?assertEqual(<<"mqtt_client">>, maps:get(client_id, InfoData)), + ?assertEqual(<<"admin">>, maps:get(username, InfoData)), + ?assertEqual(<<"MQTT">>, maps:get(proto_name, InfoData)). t_attrs(AttrsData) -> - ?assertEqual(<<"mqtt_client">>, proplists:get_value(client_id, AttrsData)), - ?assertEqual(emqx_ws_connection, proplists:get_value(conn_mod, AttrsData)), - ?assertEqual(<<"admin">>, proplists:get_value(username, AttrsData)). + ?assertEqual(<<"mqtt_client">>, maps:get(client_id, AttrsData)), + ?assertEqual(emqx_ws_connection, maps:get(conn_mod, AttrsData)), + ?assertEqual(<<"admin">>, maps:get(username, AttrsData)). t_stats(StatsData) -> ?assertEqual(true, proplists:get_value(recv_oct, StatsData) >= 0), @@ -91,4 +91,4 @@ t_stats(StatsData) -> ?assertEqual(true, proplists:get_value(reductions, StatsData) >=0), ?assertEqual(true, proplists:get_value(recv_pkt, StatsData) =:=1), ?assertEqual(true, proplists:get_value(recv_msg, StatsData) >=0), - ?assertEqual(true, proplists:get_value(send_pkt, StatsData) =:=1). \ No newline at end of file + ?assertEqual(true, proplists:get_value(send_pkt, StatsData) =:=1). diff --git a/test/emqx_zone_SUITE.erl b/test/emqx_zone_SUITE.erl index 23ef3c67d..7f17d5258 100644 --- a/test/emqx_zone_SUITE.erl +++ b/test/emqx_zone_SUITE.erl @@ -35,4 +35,3 @@ t_set_get_env(_) -> emqx_zone:force_reload(), ?assertEqual(val, emqx_zone:get_env(zone1, key)), emqx_zone:stop(). - diff --git a/test/rfc6455_client.erl b/test/rfc6455_client.erl index f5d8f1ef4..987b72407 100644 --- a/test/rfc6455_client.erl +++ b/test/rfc6455_client.erl @@ -36,7 +36,7 @@ new(WsUrl, PPid) -> addr = Addr, path = "/" ++ Path, ppid = PPid}, - spawn(fun () -> + spawn(fun() -> start_conn(State) end). diff --git a/vars b/vars index fedd69a45..359c27c22 100644 --- a/vars +++ b/vars @@ -5,5 +5,4 @@ {platform_etc_dir, "etc"}. {platform_lib_dir, "lib"}. {platform_log_dir, "log"}. -{platform_plugins_dir, "plugins"}. - +{platform_plugins_dir, "plugins"}.