diff --git a/.gitignore b/.gitignore index ab0cbe156..16369a576 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,4 @@ rebar.lock xrefr erlang.mk *.coverdata +etc/emqx.conf.rendered diff --git a/.travis.yml b/.travis.yml index f2c483cc6..abe6f7a6a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,18 +1,17 @@ language: erlang otp_release: - - 21.2 + - 21.3 before_install: - git clone https://github.com/erlang/rebar3.git; cd rebar3; ./bootstrap; sudo mv rebar3 /usr/local/bin/; cd .. script: - - make dep-vsn-check - - make rebar-compile - - make rebar-xref - - make rebar-eunit - - make rebar-ct - - make rebar-cover + - make compile + - make xref + - make eunit + - make ct + - make cover after_success: - make coveralls diff --git a/Makefile b/Makefile index acb854fa4..0aa6a03c8 100644 --- a/Makefile +++ b/Makefile @@ -1,37 +1,9 @@ -.PHONY: plugins tests +## shallow clone for speed -PROJECT = emqx -PROJECT_DESCRIPTION = EMQ X Broker +REBAR_GIT_CLONE_OPTIONS += --depth 1 +export REBAR_GIT_CLONE_OPTIONS -DEPS = jsx gproc gen_rpc ekka esockd cowboy replayq - -dep_jsx = git-emqx https://github.com/talentdeficit/jsx 2.9.0 -dep_gproc = git-emqx https://github.com/uwiger/gproc 0.8.0 -dep_gen_rpc = git-emqx https://github.com/emqx/gen_rpc 2.3.1 -dep_esockd = git-emqx https://github.com/emqx/esockd v5.4.4 -dep_ekka = git-emqx https://github.com/emqx/ekka v0.5.4 -dep_cowboy = git-emqx https://github.com/ninenines/cowboy 2.6.1 -dep_replayq = git-emqx https://github.com/emqx/replayq v0.1.1 - -NO_AUTOPATCH = cuttlefish - -ERLC_OPTS += +debug_info -DAPPLICATION=emqx - -BUILD_DEPS = cuttlefish -dep_cuttlefish = git-emqx https://github.com/emqx/cuttlefish v2.2.1 - -CUR_BRANCH := $(shell git branch | grep -e "^*" | cut -d' ' -f 2) -BRANCH := $(if $(filter $(CUR_BRANCH), master develop), $(CUR_BRANCH), develop) - -TEST_DEPS = emqx_ct_helpers -dep_emqx_ct_helpers = git-emqx https://github.com/emqx/emqx-ct-helpers.git v1.0 - -TEST_ERLC_OPTS += +debug_info -DAPPLICATION=emqx - -EUNIT_OPTS = verbose - -# CT_SUITES = emqx_bridge -## emqx_trie emqx_router emqx_frame emqx_mqtt_compat +# CT_SUITES = emqx_trie emqx_router emqx_frame emqx_mqtt_compat CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \ @@ -44,31 +16,68 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_vm_mon emqx_alarm_handler emqx_rpc emqx_flapping CT_NODE_NAME = emqxct@127.0.0.1 -CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) -COVER = true +compile: + @rebar3 compile -PLT_APPS = sasl asn1 ssl syntax_tools runtime_tools crypto xmerl os_mon inets public_key ssl compiler mnesia -DIALYZER_DIRS := ebin/ -DIALYZER_OPTS := --verbose --statistics -Werror_handling -Wrace_conditions #-Wunmatched_returns +clean: distclean -$(shell [ -f erlang.mk ] || curl -s -o erlang.mk https://raw.githubusercontent.com/emqx/erlmk/master/erlang.mk) -include erlang.mk +## Cuttlefish escript is built by default when cuttlefish app (as dependency) was built +CUTTLEFISH_SCRIPT := _build/default/lib/cuttlefish/cuttlefish -clean:: gen-clean +.PHONY: cover +cover: + @rebar3 cover -.PHONY: gen-clean -gen-clean: - @rm -rf bbmustache - @rm -f etc/gen.emqx.conf +.PHONY: coveralls +coveralls: + @rebar3 coveralls send + +.PHONY: xref +xref: + @rebar3 xref + +.PHONY: deps +deps: + @rebar3 get-deps + +.PHONY: eunit +eunit: + @rebar3 eunit -v + +.PHONY: ct-setup +ct-setup: + rebar3 as test compile + @mkdir -p data + @if [ ! -f data/loaded_plugins ]; then touch data/loaded_plugins; fi + @ln -s -f '../../../../etc' _build/test/lib/emqx/ + @ln -s -f '../../../../data' _build/test/lib/emqx/ + +.PHONY: ct +ct: ct-setup + @rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(shell echo $(foreach var,$(CT_SUITES),test/$(var)_SUITE) | tr ' ' ',') + +## Run one single CT with rebar3 +## e.g. make ct-one-suite suite=emqx_bridge +.PHONY: ct-one-suite +ct-one-suite: ct-setup + @rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(suite)_SUITE + +.PHONY: app.config +app.config: $(CUTTLEFISH_SCRIPT) etc/gen.emqx.conf + $(CUTTLEFISH_SCRIPT) -l info -e etc/ -c etc/gen.emqx.conf -i priv/emqx.schema -d data/ + +$(CUTTLEFISH_SCRIPT): + @rebar3 get-deps + @if [ ! -f cuttlefish ]; then make -C _build/default/lib/cuttlefish; fi bbmustache: - $(verbose) git clone https://github.com/soranoba/bbmustache.git && cd bbmustache && ./rebar3 compile && cd .. + @git clone https://github.com/soranoba/bbmustache.git && cd bbmustache && ./rebar3 compile && cd .. # This hack is to generate a conf file for testing # relx overlay is used for release etc/gen.emqx.conf: bbmustache etc/emqx.conf - $(verbose) erl -noshell -pa bbmustache/_build/default/lib/bbmustache/ebin -eval \ + @erl -noshell -pa bbmustache/_build/default/lib/bbmustache/ebin -eval \ "{ok, Temp} = file:read_file('etc/emqx.conf'), \ {ok, Vars0} = file:consult('vars'), \ Vars = [{atom_to_list(N), list_to_binary(V)} || {N, V} <- Vars0], \ @@ -76,51 +85,12 @@ etc/gen.emqx.conf: bbmustache etc/emqx.conf ok = file:write_file('etc/gen.emqx.conf', Targ), \ halt(0)." -CUTTLEFISH_SCRIPT = _build/default/lib/cuttlefish/cuttlefish +.PHONY: gen-clean +gen-clean: + @rm -rf bbmustache + @rm -f etc/gen.emqx.conf etc/emqx.conf.rendered -app.config: $(CUTTLEFISH_SCRIPT) etc/gen.emqx.conf - $(verbose) $(CUTTLEFISH_SCRIPT) -l info -e etc/ -c etc/gen.emqx.conf -i priv/emqx.schema -d data/ - -ct: app.config - -rebar-cover: - @rebar3 cover - -coveralls: - @rebar3 coveralls send - - -$(CUTTLEFISH_SCRIPT): rebar-deps - @if [ ! -f cuttlefish ]; then make -C _build/default/lib/cuttlefish; fi - -rebar-xref: - @rebar3 xref - -rebar-deps: - @rebar3 get-deps - -rebar-eunit: $(CUTTLEFISH_SCRIPT) - @rebar3 eunit -v - -rebar-compile: - @rebar3 compile - -rebar-ct-setup: app.config - @rebar3 as test compile - @ln -s -f '../../../../etc' _build/test/lib/emqx/ - @ln -s -f '../../../../data' _build/test/lib/emqx/ - -rebar-ct: rebar-ct-setup - @rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(shell echo $(foreach var,$(CT_SUITES),test/$(var)_SUITE) | tr ' ' ',') - -## Run one single CT with rebar3 -## e.g. make ct-one-suite suite=emqx_bridge -ct-one-suite: rebar-ct-setup - @rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(suite)_SUITE - -rebar-clean: - @rebar3 clean - -distclean:: +.PHONY: distclean +distclean: gen-clean @rm -rf _build cover deps logs log data - @rm -f rebar.lock compile_commands.json cuttlefish + @rm -f rebar.lock compile_commands.json cuttlefish erl_crash.dump diff --git a/etc/emqx.conf b/etc/emqx.conf index df50884e9..dcae95e3b 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -723,6 +723,11 @@ zone.external.flapping_banned_expiry_interval = 1h ## Default: false zone.external.use_username_as_clientid = false +## Whether to ignore loop delivery of messages.(for mqtt v3.1.1) +## +## Value: true | false +zone.external.ignore_loop_deliver = false + ##-------------------------------------------------------------------- ## Internal Zone @@ -818,6 +823,11 @@ zone.internal.flapping_banned_expiry_interval = 1h ## Default: false zone.internal.use_username_as_clientid = false +## Whether to ignore loop delivery of messages.(for mqtt v3.1.1) +## +## Value: true | false +zone.internal.ignore_loop_deliver = false + ##-------------------------------------------------------------------- ## Listeners ##-------------------------------------------------------------------- @@ -863,8 +873,11 @@ listener.tcp.external.zone = external ## Rate limit for the external MQTT/TCP connections. Format is 'rate,burst'. ## ## Value: rate,burst +## - rate: The average limit value for per second +## - burst: The maximum allowed for each check, To avoid frequent restriction +## this value is recommended to be set to `(max_packet_size * active_n)/2` ## Unit: Bps -## listener.tcp.external.rate_limit = 1024,4096 +## listener.tcp.external.rate_limit = 1024,52428800 ## The access control rules for the MQTT/TCP listener. ## @@ -994,8 +1007,11 @@ listener.tcp.internal.zone = internal ## See: listener.tcp.$name.rate_limit ## ## Value: rate,burst +## - rate: The average limit value for per second +## - burst: The maximum allowed for each check, To avoid frequent restriction +## this value is recommended to be set to `(max_packet_size * active_n)/2` ## Unit: Bps -## listener.tcp.internal.rate_limit = 1000000,2000000 +## listener.tcp.internal.rate_limit = 1000000,524288000 ## The TCP backlog of internal MQTT/TCP Listener. ## @@ -1104,8 +1120,11 @@ listener.ssl.external.access.1 = allow all ## Rate limit for the external MQTT/SSL connections. ## ## Value: rate,burst +## - rate: The average limit value for per second +## - burst: The maximum allowed for each check, To avoid frequent restriction +## this value is recommended to be set to `(max_packet_size * active_n)/2` ## Unit: Bps -## listener.ssl.external.rate_limit = 1024,4096 +## listener.ssl.external.rate_limit = 1024,52428800 ## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind ## HAProxy or Nginx. @@ -1338,8 +1357,11 @@ listener.ws.external.max_conn_rate = 1000 ## Rate limit for the MQTT/WebSocket connections. ## ## Value: rate,burst +## - rate: The average limit value for per second +## - burst: The maximum allowed for each check, To avoid frequent restriction +## this value is recommended to be set to `(max_packet_size * 1)/2` ## Unit: Bps -## listener.ws.external.rate_limit = 1024,4096 +## listener.ws.external.rate_limit = 1024,524288 ## Zone of the external MQTT/WebSocket listener belonged to. ## @@ -1546,8 +1568,11 @@ listener.wss.external.max_conn_rate = 1000 ## Rate limit for the MQTT/WebSocket/SSL connections. ## ## Value: rate,burst +## - rate: The average limit value for per second +## - burst: The maximum allowed for each check, To avoid frequent restriction +## this value is recommended to be set to `(max_packet_size * 1)/2` ## Unit: Bps -## listener.wss.external.rate_limit = 1024,4096 +## listener.wss.external.rate_limit = 1024,524288 ## Zone of the external MQTT/WebSocket/SSL listener belonged to. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index 804ab041d..1d8e60a8f 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -400,7 +400,7 @@ end}. {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}} ]}. -{mapping, "log.primary_level", "kernel.primary_log_level", [ +{mapping, "log.primary_log_level", "kernel.logger_level", [ {default, error}, {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}} ]}. @@ -462,6 +462,10 @@ end}. hidden ]}. +{translation, "kernel.logger_level", fun(_, _, Conf) -> + cuttlefish:conf_get("log.level", Conf) +end}. + {translation, "kernel.logger", fun(Conf) -> LogTo = cuttlefish:conf_get("log.to", Conf), LogLevel = cuttlefish:conf_get("log.level", Conf), @@ -642,7 +646,7 @@ end}. ]}. %% @doc Whether to ignore loop delivery of messages.(for mqtt v3.1.1) -{mapping, "mqtt.ignore_loop_deliver", "emqx.mqtt_ignore_loop_deliver", [ +{mapping, "mqtt.ignore_loop_deliver", "emqx.ignore_loop_deliver", [ {default, true}, {datatype, {enum, [true, false]}} ]}. diff --git a/rebar.config b/rebar.config index 8db9caa3e..fdc7954d4 100644 --- a/rebar.config +++ b/rebar.config @@ -1,25 +1,20 @@ -{deps, [{jsx, "2.9.0"}, - {gproc, "0.8.0"}, - {cowboy, "2.6.1"}]}. - -%% appended to deps in rebar.config.script -{github_emqx_libs, - [{gen_rpc, "2.3.1"}, - {ekka, "v0.5.4"}, - {replayq, "v0.1.1"}, - {esockd, "v5.4.4"}, - {cuttlefish, "v2.2.1"}]}. - -{github_emqx_projects, - [{emqx_ct_helpers, "v1.0"}]}. +{deps, + [ {jsx, "2.9.0"} % hex + , {cowboy, "2.6.1"} % hex + , {gproc, "0.8.0"} % hex + , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.4"}}} + , {replayq, {git, "https://github.com/emqx/replayq", {tag, "v0.1.1"}}} + , {esockd, {git, "https://github.com/emqx/esockd", {tag, "v5.4.4"}}} + , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} + ]}. {edoc_opts, [{preprocess, true}]}. {erl_opts, [warn_unused_vars, warn_shadow_vars, warn_unused_import, warn_obsolete_guard, - debug_info, - {d, 'APPLICATION', emqx}]}. + debug_info]}. {xref_checks, [undefined_function_calls, undefined_functions, locals_not_used, deprecated_function_calls, warnings_as_errors, deprecated_functions]}. @@ -28,3 +23,13 @@ {cover_export_enabled, true}. {plugins, [coveralls]}. + +{profiles, + [{test, + [{deps, + [ {meck, "0.8.13"} % hex + , {bbmustache, "1.7.0"} % hex + , {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "v1.1.1"}}} + ]} + ]} + ]}. diff --git a/rebar.config.script b/rebar.config.script index 36028ee3d..558910385 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -1,11 +1,3 @@ -CONFIG0 = case os:getenv("REBAR_GIT_CLONE_OPTIONS") of - "--depth 1" -> - CONFIG; - _ -> - os:putenv("REBAR_GIT_CLONE_OPTIONS", "--depth 1"), - CONFIG - end, - CONFIG1 = case os:getenv("TRAVIS") of "true" -> JobId = os:getenv("TRAVIS_JOB_ID"), @@ -16,28 +8,4 @@ CONFIG1 = case os:getenv("TRAVIS") of CONFIG end, -FindDeps = fun(DepsType, Config) -> - case lists:keyfind(DepsType, 1, Config) of - {_, RawDeps} -> RawDeps; - _ -> [] - end - end, -Deps = FindDeps(deps, CONFIG1), -LibDeps = FindDeps(github_emqx_libs, CONFIG1), -ProjDeps = FindDeps(github_emqx_projects, CONFIG1), -UrlPrefix = "https://github.com/emqx/", -RealName = fun TransName([$_ | Tail], Result) -> - TransName(Tail, [$- | Result]); - TransName([Head | Tail], Result) -> - TransName(Tail, [Head | Result]); - TransName([], Result) -> - lists:reverse(Result) - end, - -NewLibDeps = [{LibName, {git, UrlPrefix ++ atom_to_list(LibName), {branch, Branch}}} - || {LibName, Branch} <- LibDeps], -NewProjDeps = [{ProjName, {git, UrlPrefix ++ RealName(atom_to_list(ProjName), []), {branch, Branch}}} || {ProjName, Branch} <- ProjDeps], - -NewDeps = Deps ++ NewLibDeps ++ NewProjDeps, -CONFIG2 = lists:keystore(deps, 1, CONFIG1, {deps, NewDeps}), -CONFIG2. +CONFIG1. diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 1c577bf27..d6aab26d4 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -28,14 +28,12 @@ -spec(authenticate(emqx_types:credentials()) -> {ok, emqx_types:credentials()} | {error, term()}). authenticate(Credentials) -> - detect_anonymous_permission(Credentials, fun() -> - case emqx_hooks:run_fold('client.authenticate', [], init_auth_result(Credentials)) of - #{auth_result := success} = NewCredentials -> - {ok, NewCredentials}; - NewCredentials -> - {error, maps:get(auth_result, NewCredentials, unknown_error)} - end - end). + case emqx_hooks:run_fold('client.authenticate', [], init_auth_result(Credentials)) of + #{auth_result := success} = NewCredentials -> + {ok, NewCredentials}; + NewCredentials -> + {error, maps:get(auth_result, NewCredentials, unknown_error)} + end. %% @doc Check ACL -spec(check_acl(emqx_types:credentials(), emqx_types:pubsub(), emqx_types:topic()) -> allow | deny). @@ -68,22 +66,8 @@ reload_acl() -> emqx_mod_acl_internal:reload_acl(). init_auth_result(Credentials) -> - case anonymous_permission(Credentials) of - true -> Credentials#{auth_result => success}; - false -> Credentials#{auth_result => not_authorized} + case emqx_zone:get_env(maps:get(zone, Credentials, undefined), allow_anonymous, false) of + true -> Credentials#{auth_result => success, anonymous => true}; + false -> Credentials#{auth_result => not_authorized, anonymous => false} end. -detect_anonymous_permission(#{username := undefined, - password := undefined} = Credentials, Fun) -> - case anonymous_permission(Credentials) of - true -> {ok, Credentials}; - false -> Fun() - end; - -detect_anonymous_permission(_Credentials, Fun) -> - Fun(). - -anonymous_permission(Credentials) -> - emqx_zone:get_env(maps:get(zone, Credentials, undefined), - allow_anonymous, false). - diff --git a/src/emqx_app.erl b/src/emqx_app.erl index bc9d58408..47075922b 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -27,12 +27,6 @@ %%-------------------------------------------------------------------- start(_Type, _Args) -> - %% We'd like to configure the primary logger level here, rather than set the - %% kernel config `logger_level` before starting the erlang vm. - %% This is because the latter approach an annoying debug msg will be printed out: - %% "[debug] got_unexpected_message {'EXIT',<0.1198.0>,normal}" - logger:set_primary_config(level, application:get_env(kernel, primary_log_level, error)), - print_banner(), ekka:start(), {ok, Sup} = emqx_sup:start_link(), diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 89b84dc6b..0bd55756e 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -45,6 +45,7 @@ ]). -record(state, { + zone, transport, socket, peername, @@ -55,12 +56,10 @@ parse_state, gc_state, keepalive, - enable_stats, stats_timer, rate_limit, pub_limit, - limit_timer, - idle_timeout + limit_timer }). -define(ACTIVE_N, 100). @@ -152,7 +151,6 @@ init({Transport, RawSocket, Options}) -> RateLimit = init_limiter(proplists:get_value(rate_limit, Options)), PubLimit = init_limiter(emqx_zone:get_env(Zone, publish_limit)), ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N), - EnableStats = emqx_zone:get_env(Zone, enable_stats, true), IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), SendFun = fun(Packet, SeriaOpts) -> Data = emqx_frame:serialize(Packet, SeriaOpts), @@ -171,7 +169,8 @@ init({Transport, RawSocket, Options}) -> ParseState = emqx_protocol:parser(ProtoState), GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), GcState = emqx_gc:init(GcPolicy), - State = #state{transport = Transport, + State = #state{zone = Zone, + transport = Transport, socket = Socket, peername = Peername, conn_state = running, @@ -180,9 +179,7 @@ init({Transport, RawSocket, Options}) -> pub_limit = PubLimit, proto_state = ProtoState, parse_state = ParseState, - gc_state = GcState, - enable_stats = EnableStats, - idle_timeout = IdleTimout}, + gc_state = GcState}, ok = emqx_misc:init_proc_mng_policy(Zone), gen_statem:enter_loop(?MODULE, [{hibernate_after, 2 * IdleTimout}], idle, State, self(), [IdleTimout]). @@ -317,9 +314,15 @@ handle(info, {tcp_passive, _Sock}, State) -> ok = activate_socket(NState), {keep_state, NState}; +handle(info, {ssl_passive, _Sock}, State) -> + %% Rate limit here:) + NState = ensure_rate_limit(State), + ok = activate_socket(NState), + {keep_state, NState}; + handle(info, activate_socket, State) -> %% Rate limit timer expired. - ok = activate_socket(State), + ok = activate_socket(State#state{conn_state = running}), {keep_state, State#state{conn_state = running, limit_timer = undefined}}; handle(info, {inet_reply, _Sock, ok}, State) -> @@ -442,6 +445,7 @@ ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) -> {0, Rl1} -> ensure_rate_limit(Limiters, setelement(Pos, State, Rl1)); {Pause, Rl1} -> + ?LOG(debug, "[Connection] Rate limit pause connection ~pms", [Pause]), TRef = erlang:send_after(Pause, self(), activate_socket), setelement(Pos, State#state{conn_state = blocked, limit_timer = TRef}, Rl1) end. @@ -453,11 +457,7 @@ activate_socket(#state{conn_state = blocked}) -> ok; activate_socket(#state{transport = Transport, socket = Socket, active_n = N}) -> - TrueOrN = case Transport:is_ssl(Socket) of - true -> true; %% Cannot set '{active, N}' for SSL:( - false -> N - end, - case Transport:setopts(Socket, [{active, TrueOrN}]) of + case Transport:setopts(Socket, [{active, N}]) of ok -> ok; {error, Reason} -> self() ! {shutdown, Reason}, @@ -467,11 +467,14 @@ activate_socket(#state{transport = Transport, socket = Socket, active_n = N}) -> %%------------------------------------------------------------------------------ %% Ensure stats timer -ensure_stats_timer(State = #state{enable_stats = true, - stats_timer = undefined, - idle_timeout = IdleTimeout}) -> - State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)}; - +ensure_stats_timer(State = #state{zone = Zone, stats_timer = undefined}) -> + case emqx_zone:get_env(Zone, enable_stats, true) of + true -> + IdleTimeout = emqx_zone:get_env(Zone, idle_timeout, 30000), + State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)}; + false -> + State + end; ensure_stats_timer(State) -> State. %%------------------------------------------------------------------------------ diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index af7a32f9a..1c1185bf0 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -108,13 +108,20 @@ ensure_file(File) -> with_loaded_file(File, SuccFun) -> case read_loaded(File) of - {ok, Names} -> + {ok, Names0} -> + Names = filter_plugins(Names0), SuccFun(Names); {error, Error} -> ?LOG(alert, "[Plugins] Failed to read: ~p, error: ~p", [File, Error]), {error, Error} end. +filter_plugins(Names) -> + lists:filtermap(fun(Name1) when is_atom(Name1) -> {true, Name1}; + ({Name1, true}) -> {true, Name1}; + ({_Name1, false}) -> false + end, Names). + load_plugins(Names, Persistent) -> Plugins = list(), NotFound = Names -- names(Plugins), case NotFound of @@ -264,7 +271,7 @@ plugin_loaded(Name, true) -> case lists:member(Name, Names) of false -> %% write file if plugin is loaded - write_loaded(lists:append(Names, [Name])); + write_loaded(lists:append(Names, [{Name, true}])); true -> ignore end; @@ -277,10 +284,7 @@ plugin_unloaded(_Name, false) -> plugin_unloaded(Name, true) -> case read_loaded() of {ok, Names0} -> - Names = lists:filtermap(fun(Name1) when is_atom(Name1) -> {true, Name1}; - ({Name1, true}) -> {true, Name1}; - ({_Name1, false}) -> false - end, Names0), + Names = filter_plugins(Names0), case lists:member(Name, Names) of true -> write_loaded(lists:delete(Name, Names)); @@ -304,7 +308,7 @@ write_loaded(AppNames) -> case file:open(File, [binary, write]) of {ok, Fd} -> lists:foreach(fun(Name) -> - file:write(Fd, iolist_to_binary(io_lib:format("~s.~n", [Name]))) + file:write(Fd, iolist_to_binary(io_lib:format("~p.~n", [Name]))) end, AppNames); {error, Error} -> ?LOG(error, "[Plugins] Open File ~p Error: ~p", [File, Error]), diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index e5ca486cc..eb86a1841 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -40,6 +40,7 @@ -record(pstate, { zone, sendfun, + sockname, peername, peercert, proto_ver, @@ -53,20 +54,14 @@ session, clean_start, topic_aliases, - packet_size, will_topic, will_msg, keepalive, is_bridge, - enable_ban, - enable_acl, - enable_flapping_detect, - acl_deny_action, recv_stats, send_stats, connected, connected_at, - ignore_loop, topic_alias_maximum, conn_mod, credentials, @@ -87,12 +82,14 @@ %%------------------------------------------------------------------------------ -spec(init(map(), list()) -> state()). -init(SocketOpts = #{ peername := Peername +init(SocketOpts = #{ sockname := Sockname + , peername := Peername , peercert := Peercert , sendfun := SendFun}, Options) -> Zone = proplists:get_value(zone, Options), #pstate{zone = Zone, sendfun = SendFun, + sockname = Sockname, peername = Peername, peercert = Peercert, proto_ver = ?MQTT_PROTO_V4, @@ -103,16 +100,10 @@ init(SocketOpts = #{ peername := Peername username = init_username(Peercert, Options), clean_start = false, topic_aliases = #{}, - packet_size = emqx_zone:get_env(Zone, max_packet_size), is_bridge = false, - enable_ban = emqx_zone:get_env(Zone, enable_ban, false), - enable_acl = emqx_zone:get_env(Zone, enable_acl), - enable_flapping_detect = emqx_zone:get_env(Zone, enable_flapping_detect, false), - acl_deny_action = emqx_zone:get_env(Zone, acl_deny_action, ignore), recv_stats = #{msg => 0, pkt => 0}, send_stats = #{msg => 0, pkt => 0}, connected = false, - ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false), topic_alias_maximum = #{to_client => 0, from_client => 0}, conn_mod = maps:get(conn_mod, SocketOpts, undefined), credentials = #{}, @@ -135,18 +126,18 @@ set_username(_Username, PState) -> %% API %%------------------------------------------------------------------------------ -info(PState = #pstate{conn_props = ConnProps, +info(PState = #pstate{zone = Zone, + conn_props = ConnProps, ack_props = AckProps, session = Session, topic_aliases = Aliases, - will_msg = WillMsg, - enable_acl = EnableAcl}) -> + will_msg = WillMsg}) -> maps:merge(attrs(PState), #{conn_props => ConnProps, ack_props => AckProps, session => Session, topic_aliases => Aliases, will_msg => WillMsg, - enable_acl => EnableAcl + enable_acl => emqx_zone:get_env(Zone, enable_acl, false) }). attrs(#pstate{zone = Zone, @@ -208,11 +199,13 @@ client_id(#pstate{client_id = ClientId}) -> credentials(#pstate{zone = Zone, client_id = ClientId, username = Username, + sockname = Sockname, peername = Peername, peercert = Peercert, ws_cookie = WsCookie}) -> with_cert(#{zone => Zone, client_id => ClientId, + sockname => Sockname, username => Username, peername => Peername, ws_cookie => WsCookie, @@ -239,7 +232,8 @@ stats(#pstate{recv_stats = #{pkt := RecvPkt, msg := RecvMsg}, session(#pstate{session = SPid}) -> SPid. -parser(#pstate{packet_size = Size, proto_ver = Ver}) -> +parser(#pstate{zone = Zone, proto_ver = Ver}) -> + Size = emqx_zone:get_env(Zone, max_packet_size), emqx_frame:initial_state(#{max_packet_size => Size, version => Ver}). %%------------------------------------------------------------------------------ @@ -376,21 +370,22 @@ process(?CONNECT_PACKET( username = Username, password = Password} = ConnPkt), PState) -> - NewClientId = maybe_use_username_as_clientid(ClientId, Username, PState), + %% TODO: Mountpoint... + %% Msg -> emqx_mountpoint:mount(MountPoint, Msg) + PState0 = maybe_use_username_as_clientid(ClientId, + set_username(Username, + PState#pstate{proto_ver = ProtoVer, + proto_name = ProtoName, + clean_start = CleanStart, + keepalive = Keepalive, + conn_props = ConnProps, + is_bridge = IsBridge, + connected_at = os:timestamp()})), + + NewClientId = PState0#pstate.client_id, emqx_logger:set_metadata_client_id(NewClientId), - %% TODO: Mountpoint... - %% Msg -> emqx_mountpoint:mount(MountPoint, Msg) - PState0 = set_username(Username, - PState#pstate{client_id = NewClientId, - proto_ver = ProtoVer, - proto_name = ProtoName, - clean_start = CleanStart, - keepalive = Keepalive, - conn_props = ConnProps, - is_bridge = IsBridge, - connected_at = os:timestamp()}), Credentials = credentials(PState0), PState1 = PState0#pstate{credentials = Credentials}, connack( @@ -424,31 +419,37 @@ process(?CONNECT_PACKET( {ReasonCode, PState1} end); -process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) -> +process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), + PState = #pstate{zone = Zone, proto_ver = ProtoVer}) -> case check_publish(Packet, PState) of ok -> do_publish(Packet, PState); {error, ReasonCode} -> ?LOG(warning, "[Protocol] Cannot publish qos0 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), - do_acl_deny_action(Packet, ReasonCode, PState) + AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore), + ErrorTerm = {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState}, + do_acl_deny_action(AclDenyAction, Packet, ReasonCode, ErrorTerm) end; -process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState) -> +process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), + PState = #pstate{zone = Zone, proto_ver = ProtoVer}) -> case check_publish(Packet, PState) of ok -> do_publish(Packet, PState); {error, ReasonCode} -> - ?LOG(warning, "[Protocol] Cannot publish qos1 message to ~s for ~s", - [Topic, emqx_reason_codes:text(ReasonCode)]), + ?LOG(warning, "[Protocol] Cannot publish qos1 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), case deliver({puback, PacketId, ReasonCode}, PState) of {ok, PState1} -> - do_acl_deny_action(Packet, ReasonCode, PState1); + AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore), + ErrorTerm = {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState1}, + do_acl_deny_action(AclDenyAction, Packet, ReasonCode, ErrorTerm); Error -> Error end end; -process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState) -> +process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), + PState = #pstate{zone = Zone, proto_ver = ProtoVer}) -> case check_publish(Packet, PState) of ok -> do_publish(Packet, PState); @@ -457,7 +458,9 @@ process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState) -> [Topic, emqx_reason_codes:text(ReasonCode)]), case deliver({pubrec, PacketId, ReasonCode}, PState) of {ok, PState1} -> - do_acl_deny_action(Packet, ReasonCode, PState1); + AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore), + ErrorTerm = {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState1}, + do_acl_deny_action(AclDenyAction, Packet, ReasonCode, ErrorTerm); Error -> Error end end; @@ -485,7 +488,7 @@ process(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) {ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState}; process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), - PState = #pstate{session = SPid, credentials = Credentials}) -> + PState = #pstate{zone = Zone, proto_ver = ProtoVer, session = SPid, credentials = Credentials}) -> case check_subscribe(parse_topic_filters(?SUBSCRIBE, raw_topic_filters(PState, RawTopicFilters)), PState) of {ok, TopicFilters} -> TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [Credentials], TopicFilters), @@ -503,7 +506,9 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), [SubTopics, [emqx_reason_codes:text(R) || R <- ReasonCodes]]), case deliver({suback, PacketId, ReasonCodes}, PState) of {ok, PState1} -> - do_acl_deny_action(Packet, ReasonCodes, PState1); + AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore), + ErrorTerm = {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState1}, + do_acl_deny_action(AclDenyAction, Packet, ReasonCodes, ErrorTerm); Error -> Error end @@ -689,6 +694,9 @@ deliver({disconnect, _ReasonCode}, PState) -> -spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}). send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = Send}) -> case Send(Packet, #{version => Ver}) of + ok -> + trace(send, Packet), + {ok, PState}; {ok, Data} -> trace(send, Packet), emqx_metrics:sent(Packet), @@ -701,12 +709,12 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = Send}) %%------------------------------------------------------------------------------ %% Maybe use username replace client id -maybe_use_username_as_clientid(ClientId, undefined, _PState) -> - ClientId; -maybe_use_username_as_clientid(ClientId, Username, #pstate{zone = Zone}) -> +maybe_use_username_as_clientid(ClientId, PState = #pstate{username = undefined}) -> + PState#pstate{client_id = ClientId}; +maybe_use_username_as_clientid(ClientId, PState = #pstate{username = Username, zone = Zone}) -> case emqx_zone:get_env(Zone, use_username_as_clientid, false) of - true -> Username; - false -> ClientId + true -> PState#pstate{client_id = Username}; + false -> PState#pstate{client_id = ClientId} end. %%------------------------------------------------------------------------------ @@ -804,16 +812,13 @@ check_client_id(#mqtt_packet_connect{client_id = ClientId}, #pstate{zone = Zone} check_flapping(#mqtt_packet_connect{}, PState) -> do_flapping_detect(connect, PState). -check_banned(_ConnPkt, #pstate{enable_ban = false}) -> - ok; check_banned(#mqtt_packet_connect{client_id = ClientId, username = Username}, - #pstate{peername = Peername}) -> - case emqx_banned:check(#{client_id => ClientId, - username => Username, - peername => Peername}) of - true -> {error, ?RC_BANNED}; - false -> ok - end. + #pstate{zone = Zone, peername = Peername}) -> + Credentials = #{client_id => ClientId, + username => Username, + peername => Peername}, + EnableBan = emqx_zone:get_env(Zone, enable_ban, false), + do_check_banned(EnableBan, Credentials). check_will_topic(#mqtt_packet_connect{will_flag = false}, _PState) -> ok; @@ -824,14 +829,14 @@ check_will_topic(#mqtt_packet_connect{will_topic = WillTopic} = ConnPkt, PState) {error, ?RC_TOPIC_NAME_INVALID} end. -check_will_acl(_ConnPkt, #pstate{enable_acl = EnableAcl}) when not EnableAcl -> - ok; -check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, #pstate{credentials = Credentials}) -> - case emqx_access_control:check_acl(Credentials, publish, WillTopic) of - allow -> ok; - deny -> +check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, + #pstate{zone = Zone, credentials = Credentials}) -> + EnableAcl = emqx_zone:get_env(Zone, enable_acl, false), + case do_acl_check(EnableAcl, publish, Credentials, WillTopic) of + ok -> ok; + Other -> ?LOG(warning, "[Protocol] Cannot publish will message to ~p for acl denied", [WillTopic]), - {error, ?RC_NOT_AUTHORIZED} + Other end. check_publish(Packet, PState) -> @@ -843,14 +848,13 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Ret #pstate{zone = Zone}) -> emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}). -check_pub_acl(_Packet, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl}) - when IsSuper orelse (not EnableAcl) -> +check_pub_acl(_Packet, #pstate{credentials = #{is_superuser := IsSuper}}) + when IsSuper -> ok; -check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, #pstate{credentials = Credentials}) -> - case emqx_access_control:check_acl(Credentials, publish, Topic) of - allow -> ok; - deny -> {error, ?RC_NOT_AUTHORIZED} - end. +check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, + #pstate{zone = Zone, credentials = Credentials}) -> + EnableAcl = emqx_zone:get_env(Zone, enable_acl, false), + do_acl_check(EnableAcl, publish, Credentials, Topic). run_check_steps([], _Packet, _PState) -> ok; @@ -870,17 +874,18 @@ check_subscribe(TopicFilters, PState = #pstate{zone = Zone}) -> {error, TopicFilter1} end. -check_sub_acl(TopicFilters, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl}) - when IsSuper orelse (not EnableAcl) -> +check_sub_acl(TopicFilters, #pstate{credentials = #{is_superuser := IsSuper}}) + when IsSuper -> {ok, TopicFilters}; -check_sub_acl(TopicFilters, #pstate{credentials = Credentials}) -> +check_sub_acl(TopicFilters, #pstate{zone = Zone, credentials = Credentials}) -> + EnableAcl = emqx_zone:get_env(Zone, enable_acl, false), lists:foldr( - fun({Topic, SubOpts}, {Ok, Acc}) -> - case emqx_access_control:check_acl(Credentials, subscribe, Topic) of - allow -> {Ok, [{Topic, SubOpts}|Acc]}; - deny -> - {error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]} - end + fun({Topic, SubOpts}, {Ok, Acc}) when EnableAcl -> + AllowTerm = {Ok, [{Topic, SubOpts}|Acc]}, + DenyTerm = {error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]}, + do_acl_check(subscribe, Credentials, Topic, AllowTerm, DenyTerm); + (TopicFilter, Acc) -> + {ok, [TopicFilter | Acc]} end, {ok, []}, TopicFilters). trace(recv, Packet) -> @@ -941,52 +946,45 @@ flag(true) -> 1. %% Execute actions in case acl deny do_flapping_detect(Action, #pstate{zone = Zone, - client_id = ClientId, - enable_flapping_detect = true}) -> - BanExpiryInterval = emqx_zone:get_env(Zone, flapping_ban_expiry_interval, 3600000), - Threshold = emqx_zone:get_env(Zone, flapping_threshold, 20), - Until = erlang:system_time(second) + BanExpiryInterval, - case emqx_flapping:check(Action, ClientId, Threshold) of - flapping -> - emqx_banned:add(#banned{who = {client_id, ClientId}, - reason = <<"flapping">>, - by = <<"flapping_checker">>, - until = Until}), - ok; - _Other -> - ok - end; -do_flapping_detect(_Action, _PState) -> - ok. + client_id = ClientId}) -> + ok = case emqx_zone:get_env(Zone, enable_flapping_detect, false) of + true -> + Threshold = emqx_zone:get_env(Zone, flapping_threshold, 20), + case emqx_flapping:check(Action, ClientId, Threshold) of + flapping -> + BanExpiryInterval = emqx_zone:get_env(Zone, flapping_ban_expiry_interval, 3600000), + Until = erlang:system_time(second) + BanExpiryInterval, + emqx_banned:add(#banned{who = {client_id, ClientId}, + reason = <<"flapping">>, + by = <<"flapping_checker">>, + until = Until}), + ok; + _Other -> + ok + end; + _EnableFlappingDetect -> ok + end. -do_acl_deny_action(?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload), - ?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer, - acl_deny_action = disconnect}) -> - {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState}; +do_acl_deny_action(disconnect, ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload), + ?RC_NOT_AUTHORIZED, ErrorTerm) -> + ErrorTerm; -do_acl_deny_action(?PUBLISH_PACKET(?QOS_1, _Topic, _PacketId, _Payload), - ?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer, - acl_deny_action = disconnect}) -> +do_acl_deny_action(disconnect, ?PUBLISH_PACKET(QoS, _Topic, _PacketId, _Payload), + ?RC_NOT_AUTHORIZED, ErrorTerm = {_Error, _CodeName, PState}) + when QoS =:= ?QOS_1; QoS =:= ?QOS_2 -> deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState), - {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState}; + ErrorTerm; -do_acl_deny_action(?PUBLISH_PACKET(?QOS_2, _Topic, _PacketId, _Payload), - ?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer, - acl_deny_action = disconnect}) -> - deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState), - {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState}; - -do_acl_deny_action(?SUBSCRIBE_PACKET(_PacketId, _Properties, _RawTopicFilters), - ReasonCodes, PState = #pstate{proto_ver = ProtoVer, - acl_deny_action = disconnect}) -> +do_acl_deny_action(disconnect, ?SUBSCRIBE_PACKET(_PacketId, _Properties, _RawTopicFilters), + ReasonCodes, ErrorTerm = {_Error, _CodeName, PState}) -> case lists:member(?RC_NOT_AUTHORIZED, ReasonCodes) of true -> deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState), - {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState}; + ErrorTerm; false -> {ok, PState} end; -do_acl_deny_action(_PubSupPacket, _ReasonCode, PState) -> +do_acl_deny_action(_OtherAction, _PubSupPacket, _ReasonCode, {_Error, _CodeName, PState}) -> {ok, PState}. %% Reason code compat @@ -997,9 +995,8 @@ reason_codes_compat(unsuback, _ReasonCodes, _ProtoVer) -> reason_codes_compat(PktType, ReasonCodes, _ProtoVer) -> [emqx_reason_codes:compat(PktType, RC) || RC <- ReasonCodes]. -raw_topic_filters(#pstate{proto_ver = ProtoVer, - is_bridge = IsBridge, - ignore_loop = IgnoreLoop}, RawTopicFilters) -> +raw_topic_filters(#pstate{zone = Zone, proto_ver = ProtoVer, is_bridge = IsBridge}, RawTopicFilters) -> + IgnoreLoop = emqx_zone:get_env(Zone, ignore_loop_deliver, false), case ProtoVer < ?MQTT_PROTO_V5 of true -> IfIgnoreLoop = case IgnoreLoop of true -> 1; false -> 0 end, @@ -1013,3 +1010,23 @@ raw_topic_filters(#pstate{proto_ver = ProtoVer, mountpoint(Credentials) -> maps:get(mountpoint, Credentials, undefined). + +do_check_banned(_EnableBan = true, Credentials) -> + case emqx_banned:check(Credentials) of + true -> {error, ?RC_BANNED}; + false -> ok + end; +do_check_banned(_EnableBan, Credentials) -> ok. + +do_acl_check(_EnableAcl = true, Action, Credentials, Topic) -> + AllowTerm = ok, + DenyTerm = {error, ?RC_NOT_AUTHORIZED}, + do_acl_check(Action, Credentials, Topic, AllowTerm, DenyTerm); +do_acl_check(_EnableAcl, _Action, _Credentials, _Topic) -> + ok. + +do_acl_check(Action, Credentials, Topic, AllowTerm, DenyTerm) -> + case emqx_access_control:check_acl(Credentials, Action, Topic) of + allow -> AllowTerm; + deny -> DenyTerm + end. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index f1aa6d3bd..d38a28bab 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -84,6 +84,9 @@ -import(emqx_zone, [get_env/2, get_env/3]). -record(state, { + %% zone + zone :: atom(), + %% Idle timeout idle_timeout :: pos_integer(), @@ -111,24 +114,15 @@ %% Next packet id of the session next_pkt_id = 1 :: emqx_mqtt_types:packet_id(), - %% Max subscriptions - max_subscriptions :: non_neg_integer(), - %% Client’s Subscriptions. subscriptions :: map(), - %% Upgrade QoS? - upgrade_qos = false :: boolean(), - %% Client <- Broker: Inflight QoS1, QoS2 messages sent to the client but unacked. inflight :: emqx_inflight:inflight(), %% Max Inflight Size. DEPRECATED: Get from inflight %% max_inflight = 32 :: non_neg_integer(), - %% Retry interval for redelivering QoS1/2 messages - retry_interval = 20000 :: timeout(), - %% Retry Timer retry_timer :: maybe(reference()), @@ -141,12 +135,6 @@ %% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel. awaiting_rel :: map(), - %% Max Packets Awaiting PUBREL - max_awaiting_rel = 100 :: non_neg_integer(), - - %% Awaiting PUBREL Timeout - await_rel_timeout = 20000 :: timeout(), - %% Awaiting PUBREL Timer await_rel_timer :: maybe(reference()), @@ -156,9 +144,6 @@ %% Expired Timer expiry_timer :: maybe(reference()), - %% Enable Stats - enable_stats :: boolean(), - %% Stats timer stats_timer :: maybe(reference()), @@ -191,28 +176,24 @@ start_link(SessAttrs) -> info(SPid) when is_pid(SPid) -> gen_server:call(SPid, info, infinity); -info(State = #state{conn_pid = ConnPid, +info(State = #state{zone = Zone, + conn_pid = ConnPid, next_pkt_id = PktId, - max_subscriptions = MaxSubscriptions, subscriptions = Subscriptions, - upgrade_qos = UpgradeQoS, inflight = Inflight, - retry_interval = RetryInterval, mqueue = MQueue, - awaiting_rel = AwaitingRel, - max_awaiting_rel = MaxAwaitingRel, - await_rel_timeout = AwaitRelTimeout}) -> + awaiting_rel = AwaitingRel}) -> attrs(State) ++ [{conn_pid, ConnPid}, {next_pkt_id, PktId}, - {max_subscriptions, MaxSubscriptions}, + {max_subscriptions, get_env(Zone, max_subscriptions, 0)}, {subscriptions, Subscriptions}, - {upgrade_qos, UpgradeQoS}, + {upgrade_qos, get_env(Zone, upgrade_qos, false)}, {inflight, Inflight}, - {retry_interval, RetryInterval}, + {retry_interval, get_env(Zone, retry_interval, 0)}, {mqueue_len, emqx_mqueue:len(MQueue)}, {awaiting_rel, AwaitingRel}, - {max_awaiting_rel, MaxAwaitingRel}, - {await_rel_timeout, AwaitRelTimeout}]. + {max_awaiting_rel, get_env(Zone, max_awaiting_rel)}, + {await_rel_timeout, get_env(Zone, await_rel_timeout)}]. %% @doc Get session attrs -spec(attrs(spid() | #state{}) -> list({atom(), term()})). @@ -236,21 +217,20 @@ attrs(#state{clean_start = CleanStart, stats(SPid) when is_pid(SPid) -> gen_server:call(SPid, stats, infinity); -stats(#state{max_subscriptions = MaxSubscriptions, +stats(#state{zone = Zone, subscriptions = Subscriptions, inflight = Inflight, mqueue = MQueue, - max_awaiting_rel = MaxAwaitingRel, awaiting_rel = AwaitingRel}) -> lists:append(emqx_misc:proc_stats(), - [{max_subscriptions, MaxSubscriptions}, + [{max_subscriptions, get_env(Zone, max_subscriptions, 0)}, {subscriptions_count, maps:size(Subscriptions)}, {max_inflight, emqx_inflight:max_size(Inflight)}, {inflight_len, emqx_inflight:size(Inflight)}, {max_mqueue, emqx_mqueue:max_len(MQueue)}, {mqueue_len, emqx_mqueue:len(MQueue)}, {mqueue_dropped, emqx_mqueue:dropped(MQueue)}, - {max_awaiting_rel, MaxAwaitingRel}, + {max_awaiting_rel, get_env(Zone, max_awaiting_rel)}, {awaiting_rel_len, maps:size(AwaitingRel)}, {deliver_msg, emqx_pd:get_counter(deliver_stats)}, {enqueue_msg, emqx_pd:get_counter(enqueue_stats)}]). @@ -364,23 +344,18 @@ init([Parent, #{zone := Zone, emqx_logger:set_metadata_client_id(ClientId), GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), IdleTimout = get_env(Zone, idle_timeout, 30000), - State = #state{idle_timeout = IdleTimout, + State = #state{zone = Zone, + idle_timeout = IdleTimout, clean_start = CleanStart, deliver_fun = deliver_fun(ConnPid), client_id = ClientId, username = Username, conn_pid = ConnPid, subscriptions = #{}, - max_subscriptions = get_env(Zone, max_subscriptions, 0), - upgrade_qos = get_env(Zone, upgrade_qos, false), inflight = emqx_inflight:new(MaxInflight), mqueue = init_mqueue(Zone), - retry_interval = get_env(Zone, retry_interval, 0), awaiting_rel = #{}, - await_rel_timeout = get_env(Zone, await_rel_timeout), - max_awaiting_rel = get_env(Zone, max_awaiting_rel), expiry_interval = ExpiryInterval, - enable_stats = get_env(Zone, enable_stats, true), gc_state = emqx_gc:init(GcPolicy), created_at = os:timestamp(), will_msg = WillMsg @@ -433,9 +408,10 @@ handle_call({discard, ByPid}, _From, State = #state{client_id = ClientId, conn_p %% PUBLISH: This is only to register packetId to session state. %% The actual message dispatching should be done by the caller (e.g. connection) process. handle_call({register_publish_packet_id, PacketId, Ts}, _From, - State = #state{awaiting_rel = AwaitingRel}) -> + State = #state{zone = Zone, awaiting_rel = AwaitingRel}) -> + MaxAwaitingRel = get_env(Zone, max_awaiting_rel), reply( - case is_awaiting_full(State) of + case is_awaiting_full(MaxAwaitingRel, AwaitingRel) of false -> case maps:is_key(PacketId, AwaitingRel) of true -> @@ -742,7 +718,8 @@ retry_delivery(_Force, [], _Now, State) -> ensure_retry_timer(State); retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now, - State = #state{inflight = Inflight, retry_interval = Interval}) -> + State = #state{zone = Zone, inflight = Inflight}) -> + Interval = get_env(Zone, retry_interval, 0), %% Microseconds -> MilliSeconds Age = timer:now_diff(Now, Ts) div 1000, if @@ -789,7 +766,8 @@ expire_awaiting_rel([], _Now, State) -> State#state{await_rel_timer = undefined}; expire_awaiting_rel([{PacketId, Ts} | More], Now, - State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> + State = #state{zone = Zone, awaiting_rel = AwaitingRel}) -> + Timeout = get_env(Zone, await_rel_timeout), case (timer:now_diff(Now, Ts) div 1000) of Age when Age >= Timeout -> emqx_metrics:trans(inc, 'messages/qos2/expired'), @@ -803,11 +781,10 @@ expire_awaiting_rel([{PacketId, Ts} | More], Now, %% Check awaiting rel %%------------------------------------------------------------------------------ -is_awaiting_full(#state{max_awaiting_rel = 0}) -> +is_awaiting_full(_MaxAwaitingRel = 0, _AwaitingRel) -> false; -is_awaiting_full(#state{awaiting_rel = AwaitingRel, - max_awaiting_rel = MaxLen}) -> - maps:size(AwaitingRel) >= MaxLen. +is_awaiting_full(MaxAwaitingRel, AwaitingRel) -> + maps:size(AwaitingRel) >= MaxAwaitingRel. %%------------------------------------------------------------------------------ %% Dispatch messages @@ -900,10 +877,11 @@ process_subopts([{nl, 1}|_Opts], #message{from = ClientId}, #state{client_id = C ignore; process_subopts([{nl, _}|Opts], Msg, State) -> process_subopts(Opts, Msg, State); -process_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = false}) -> - process_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, State); -process_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = true}) -> - process_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, State); +process_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, State = #state{zone = Zone}) -> + case get_env(Zone, upgrade_qos, false) of + true -> process_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, State); + false -> process_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, State) + end; process_subopts([{rap, _Rap}|Opts], Msg = #message{flags = Flags, headers = #{retained := true}}, State = #state{}) -> process_subopts(Opts, Msg#message{flags = maps:put(retain, true, Flags)}, State); process_subopts([{rap, 0}|Opts], Msg = #message{flags = Flags}, State = #state{}) -> @@ -1053,8 +1031,9 @@ drain_q(Cnt, Msgs, Q) -> %%------------------------------------------------------------------------------ %% Ensure timers -ensure_await_rel_timer(State = #state{await_rel_timer = undefined, - await_rel_timeout = Timeout}) -> +ensure_await_rel_timer(State = #state{zone = Zone, + await_rel_timer = undefined}) -> + Timeout = get_env(Zone, await_rel_timeout), ensure_await_rel_timer(Timeout, State); ensure_await_rel_timer(State) -> State. @@ -1064,8 +1043,8 @@ ensure_await_rel_timer(Timeout, State = #state{await_rel_timer = undefined}) -> ensure_await_rel_timer(_Timeout, State) -> State. -ensure_retry_timer(State = #state{retry_timer = undefined, - retry_interval = Interval}) -> +ensure_retry_timer(State = #state{zone = Zone, retry_timer = undefined}) -> + Interval = get_env(Zone, retry_interval, 0), ensure_retry_timer(Interval, State); ensure_retry_timer(State) -> State. @@ -1087,10 +1066,13 @@ ensure_will_delay_timer(State = #state{will_msg = WillMsg}) -> send_willmsg(WillMsg), State#state{will_msg = undefined}. -ensure_stats_timer(State = #state{enable_stats = true, +ensure_stats_timer(State = #state{zone = Zone, stats_timer = undefined, idle_timeout = IdleTimeout}) -> - State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)}; + case get_env(Zone, enable_stats, true) of + true -> State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)}; + _Other -> State + end; ensure_stats_timer(State) -> State. diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 4319b6ccb..a5c145601 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -223,7 +223,11 @@ set_session_stats(ClientId, SessPid, Stats) when is_binary(ClientId), is_pid(Ses lookup_session_pids(ClientId) -> case emqx_sm_registry:is_enabled() of true -> emqx_sm_registry:lookup_session(ClientId); - false -> emqx_tables:lookup_value(?SESSION_TAB, ClientId, []) + false -> + case emqx_tables:lookup_value(?SESSION_TAB, ClientId) of + undefined -> []; + SessPid when is_pid(SessPid) -> [SessPid] + end end. %% @doc Dispatch a message to the session. diff --git a/src/emqx_types.erl b/src/emqx_types.erl index 021609dc8..50e7f78c0 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -80,11 +80,16 @@ | banned | bad_authentication_method). -type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | none | atom()). --type(credentials() :: #{client_id := client_id(), - username := username(), - peername := peername(), - auth_result := auth_result(), - zone => zone(), +-type(credentials() :: #{zone := zone(), + client_id := client_id(), + username := username(), + sockname := peername(), + peername := peername(), + ws_cookie := undefined | list(), + mountpoint := binary(), + password => binary(), + auth_result => auth_result(), + anonymous => boolean(), atom() => term() }). -type(subscription() :: #subscription{}). diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index 25bace1f2..225d0ec4a 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -62,7 +62,7 @@ get_env(Zone, Key) -> get_env(undefined, Key, Def) -> emqx_config:get_env(Key, Def); get_env(Zone, Key, Def) -> - try ets:lookup_element(?TAB, {Zone, Key}, 2) + try persistent_term:get({Zone, Key}) catch error:badarg -> emqx_config:get_env(Key, Def) end. @@ -84,7 +84,6 @@ stop() -> %%------------------------------------------------------------------------------ init([]) -> - ok = emqx_tables:new(?TAB, [set, {read_concurrency, true}]), {ok, element(2, handle_info(reload, #{timer => undefined}))}. handle_call(force_reload, _From, State) -> @@ -96,7 +95,7 @@ handle_call(Req, _From, State) -> {reply, ignored, State}. handle_cast({set_env, Zone, Key, Val}, State) -> - true = ets:insert(?TAB, {{Zone, Key}, Val}), + persistent_term:put({Zone, Key}, Val), {noreply, State}; handle_cast(Msg, State) -> @@ -122,11 +121,11 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ do_reload() -> - [ets:insert(?TAB, [{{Zone, Key}, Val} || {Key, Val} <- Opts]) + [[persistent_term:put({Zone, Key}, Val) + || {Key, Val} <- Opts] || {Zone, Opts} <- emqx_config:get_env(zones, [])]. ensure_reload_timer(State = #{timer := undefined}) -> State#{timer := erlang:send_after(timer:minutes(5), self(), reload)}; ensure_reload_timer(State) -> State. - diff --git a/test/emqx_listeners_SUITE.erl b/test/emqx_listeners_SUITE.erl index d969699bc..cab8707d5 100644 --- a/test/emqx_listeners_SUITE.erl +++ b/test/emqx_listeners_SUITE.erl @@ -49,9 +49,27 @@ restart_listeners(_) -> ok = emqx_listeners:restart(), ok = emqx_listeners:stop(). +render_config_file() -> + Path = local_path(["etc", "emqx.conf"]), + {ok, Temp} = file:read_file(Path), + Vars0 = mustache_vars(), + Vars = [{atom_to_list(N), iolist_to_binary(V)} || {N, V} <- Vars0], + Targ = bbmustache:render(Temp, Vars), + NewName = Path ++ ".rendered", + ok = file:write_file(NewName, Targ), + NewName. + +mustache_vars() -> + [{platform_data_dir, local_path(["data"])}, + {platform_etc_dir, local_path(["etc"])}, + {platform_log_dir, local_path(["log"])}, + {platform_plugins_dir, local_path(["plugins"])} + ]. + generate_config() -> Schema = cuttlefish_schema:files([local_path(["priv", "emqx.schema"])]), - Conf = conf_parse:file([local_path(["etc", "gen.emqx.conf"])]), + ConfFile = render_config_file(), + Conf = conf_parse:file(ConfFile), cuttlefish_generator:map(Schema, Conf). set_app_env({App, Lists}) -> diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index cd6b0d2ce..9e8107665 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -55,7 +55,8 @@ groups() -> init_per_suite(Config) -> emqx_ct_helpers:start_apps([], fun set_special_configs/1), - emqx_zone:set_env(external, max_topic_alias, 20), + MqttCaps = emqx_zone:get_env(external, '$mqtt_caps'), + emqx_zone:set_env(external, '$mqtt_caps', MqttCaps#{max_topic_alias => 20}), Config. end_per_suite(_Config) -> @@ -85,15 +86,6 @@ with_connection(DoFun, NumberOfConnections) -> with_connection(DoFun) -> with_connection(DoFun, 1). - % {ok, Sock} = emqx_client_sock:connect({127, 0, 0, 1}, 1883, - % [binary, {packet, raw}, - % {active, false}], 3000), - % try - % DoFun(Sock) - % after - % emqx_client_sock:close(Sock) - % end. - handle_followed_packet(_Config) -> ConnPkt = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>, PartialPkt1 = <<50,182,1,0,4,116,101,115,116,0,1,48,48,48,48,48,48,48,48,48,48,48,48,48, @@ -191,16 +183,18 @@ connect_v5(_) -> {ok, Data} = gen_tcp:recv(Sock, 0), {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, Props), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), - ?assertNot(maps:is_key('Response-Information', Props)), - ok + ?assertNot(maps:is_key('Response-Information', Props)) end), % topic alias = 0 with_connection(fun([Sock]) -> + + %% ct:log("emqx_protocol: ~p~n", [emqx_zone:get_zone(external, max_topic_alias)]), emqx_client_sock:send(Sock, raw_send_serialize( ?CONNECT_PACKET( #mqtt_packet_connect{ + client_id = "hello", proto_ver = ?MQTT_PROTO_V5, properties = #{'Topic-Alias-Maximum' => 10}}), @@ -210,7 +204,6 @@ connect_v5(_) -> {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, #{'Topic-Alias-Maximum' := 20}), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), - emqx_client_sock:send(Sock, raw_send_serialize( ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, 1, #{'Topic-Alias' => 0}, <<"hello">>), @@ -383,9 +376,7 @@ connect_v5(_) -> ), {ok, WillData} = gen_tcp:recv(Sock2, 0, 5000), - {ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), _} = raw_recv_parse(WillData, ?MQTT_PROTO_V5), - - emqx_client_sock:close(Sock2) + {ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), _} = raw_recv_parse(WillData, ?MQTT_PROTO_V5) end), % duplicate client id diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index 6a7e8cabd..1bab8b216 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -31,7 +31,7 @@ end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). ignore_loop(_Config) -> - application:set_env(emqx, mqtt_ignore_loop_deliver, true), + emqx_zone:set_env(external, ignore_loop_deliver, true), {ok, Client} = emqx_client:start_link(), {ok, _} = emqx_client:connect(Client), TestTopic = <<"Self">>, @@ -41,7 +41,7 @@ ignore_loop(_Config) -> {ok, _} = emqx_client:publish(Client, TestTopic, <<"testmsg">>, 2), ?assertEqual(0, length(emqx_client_SUITE:receive_messages(3))), ok = emqx_client:disconnect(Client), - application:set_env(emqx, mqtt_ignore_loop_deliver, false). + emqx_zone:set_env(external, ignore_loop_deliver, false). t_session_all(_) -> emqx_zone:set_env(internal, idle_timeout, 1000), diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index d1699a653..ae67cde69 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -251,8 +251,8 @@ ensure_config(Strategy) -> ensure_config(Strategy, _AckEnabled = true). ensure_config(Strategy, AckEnabled) -> - application:set_env(?APPLICATION, shared_subscription_strategy, Strategy), - application:set_env(?APPLICATION, shared_dispatch_ack_enabled, AckEnabled), + application:set_env(emqx, shared_subscription_strategy, Strategy), + application:set_env(emqx, shared_dispatch_ack_enabled, AckEnabled), ok. subscribed(Group, Topic, Pid) -> diff --git a/test/emqx_sm_SUITE.erl b/test/emqx_sm_SUITE.erl index 38a5b5ff0..0c0a398b3 100644 --- a/test/emqx_sm_SUITE.erl +++ b/test/emqx_sm_SUITE.erl @@ -30,23 +30,33 @@ topic_alias_maximum => 0, will_msg => undefined}). -all() -> [{group, sm}]. +all() -> [{group, registry}, {group, ets}]. groups() -> - [{sm, [non_parallel_tests], - [ - t_resume_session, - t_discard_session, - t_register_unregister_session, - t_get_set_session_attrs, - t_get_set_session_stats, - t_lookup_session_pids]}]. + Cases = + [ t_resume_session, + t_discard_session, + t_register_unregister_session, + t_get_set_session_attrs, + t_get_set_session_stats, + t_lookup_session_pids], + [ {registry, [non_parallel_tests], Cases}, + {ets, [non_parallel_tests], Cases}]. init_per_suite(Config) -> - emqx_ct_helpers:start_apps([]), Config. end_per_suite(_Config) -> + ok. + +init_per_group(registry, Config) -> + emqx_ct_helpers:start_apps([], fun enable_session_registry/1), + Config; +init_per_group(ets, Config) -> + emqx_ct_helpers:start_apps([], fun disable_session_registry/1), + Config. + +end_per_group(_, _Config) -> emqx_ct_helpers:stop_apps([]). init_per_testcase(_All, Config) -> @@ -60,6 +70,14 @@ end_per_testcase(_All, Config) -> after 500 -> ct:fail({timeout, wait_session_shutdown}) end. +enable_session_registry(_) -> + application:set_env(emqx, enable_session_registry, true), + ok. + +disable_session_registry(_) -> + application:set_env(emqx, enable_session_registry, false), + ok. + t_resume_session(Config) -> ?assertEqual({ok, ?config(session_pid, Config)}, emqx_sm:resume_session(<<"client">>, ?ATTRS#{conn_pid => self()})). diff --git a/test/emqx_vm_mon_SUITE.erl b/test/emqx_vm_mon_SUITE.erl index 41a717293..3718e3626 100644 --- a/test/emqx_vm_mon_SUITE.erl +++ b/test/emqx_vm_mon_SUITE.erl @@ -23,6 +23,15 @@ -include_lib("common_test/include/ct.hrl"). +-define(WAIT(PATTERN, TIMEOUT), + receive + PATTERN -> + ok + after + TIMEOUT -> + error(timeout) + end). + all() -> [t_api]. init_per_suite(Config) -> @@ -33,18 +42,36 @@ end_per_suite(_Config) -> application:stop(sasl). t_api(_) -> - gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}), - {ok, _} = emqx_vm_mon:start_link([{check_interval, 1}, - {process_high_watermark, 0}, - {process_low_watermark, 0.6}]), - timer:sleep(2000), - ?assertEqual(true, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())), - emqx_vm_mon:set_process_high_watermark(0.8), - emqx_vm_mon:set_process_low_watermark(0.75), - ?assertEqual(0.8, emqx_vm_mon:get_process_high_watermark()), - ?assertEqual(0.75, emqx_vm_mon:get_process_low_watermark()), - timer:sleep(3000), - ?assertEqual(false, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())), - emqx_vm_mon:set_check_interval(20), - ?assertEqual(20, emqx_vm_mon:get_check_interval()), - ok. + meck:new(alarm_handler, [passthrough, no_history]), + Tester = self(), + Ref = make_ref(), + try + meck:expect(alarm_handler, set_alarm, + fun(What) -> + Res = meck:passthrough([What]), + Tester ! {Ref, set_alarm, What}, + Res + end), + meck:expect(alarm_handler, clear_alarm, + fun(What) -> + Res = meck:passthrough([What]), + Tester ! {Ref, clear_alarm, What}, + Res + end), + gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}), + {ok, _} = emqx_vm_mon:start_link([{check_interval, 1}, + {process_high_watermark, 0}, + {process_low_watermark, 0.6}]), + ?WAIT({Ref, set_alarm, {too_many_processes, _Count}}, 2000), + ?assertEqual(true, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())), + emqx_vm_mon:set_process_high_watermark(0.8), + emqx_vm_mon:set_process_low_watermark(0.75), + ?assertEqual(0.8, emqx_vm_mon:get_process_high_watermark()), + ?assertEqual(0.75, emqx_vm_mon:get_process_low_watermark()), + ?WAIT({Ref, clear_alarm, too_many_processes}, 3000), + ?assertEqual(false, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())), + emqx_vm_mon:set_check_interval(20), + ?assertEqual(20, emqx_vm_mon:get_check_interval()) + after + meck:unload(alarm_handler) + end. diff --git a/test/emqx_zone_SUITE.erl b/test/emqx_zone_SUITE.erl index 7f17d5258..983e542f3 100644 --- a/test/emqx_zone_SUITE.erl +++ b/test/emqx_zone_SUITE.erl @@ -25,7 +25,6 @@ all() -> [t_set_get_env]. t_set_get_env(_) -> application:set_env(emqx, zones, [{china, [{language, chinese}]}]), {ok, _} = emqx_zone:start_link(), - ct:print("~p~n", [ets:tab2list(emqx_zone)]), chinese = emqx_zone:get_env(china, language), cn470 = emqx_zone:get_env(china, ism_band, cn470), undefined = emqx_zone:get_env(undefined, delay), diff --git a/vars b/vars index fedd69a45..359c27c22 100644 --- a/vars +++ b/vars @@ -5,5 +5,4 @@ {platform_etc_dir, "etc"}. {platform_lib_dir, "lib"}. {platform_log_dir, "log"}. -{platform_plugins_dir, "plugins"}. - +{platform_plugins_dir, "plugins"}.