Merge pull request #2573 from emqx/develop
Merge the develop branch into the master
This commit is contained in:
commit
297a385def
|
@ -39,3 +39,4 @@ rebar.lock
|
|||
xrefr
|
||||
erlang.mk
|
||||
*.coverdata
|
||||
etc/emqx.conf.rendered
|
||||
|
|
13
.travis.yml
13
.travis.yml
|
@ -1,18 +1,17 @@
|
|||
language: erlang
|
||||
|
||||
otp_release:
|
||||
- 21.2
|
||||
- 21.3
|
||||
|
||||
before_install:
|
||||
- git clone https://github.com/erlang/rebar3.git; cd rebar3; ./bootstrap; sudo mv rebar3 /usr/local/bin/; cd ..
|
||||
|
||||
script:
|
||||
- make dep-vsn-check
|
||||
- make rebar-compile
|
||||
- make rebar-xref
|
||||
- make rebar-eunit
|
||||
- make rebar-ct
|
||||
- make rebar-cover
|
||||
- make compile
|
||||
- make xref
|
||||
- make eunit
|
||||
- make ct
|
||||
- make cover
|
||||
|
||||
after_success:
|
||||
- make coveralls
|
||||
|
|
154
Makefile
154
Makefile
|
@ -1,37 +1,9 @@
|
|||
.PHONY: plugins tests
|
||||
## shallow clone for speed
|
||||
|
||||
PROJECT = emqx
|
||||
PROJECT_DESCRIPTION = EMQ X Broker
|
||||
REBAR_GIT_CLONE_OPTIONS += --depth 1
|
||||
export REBAR_GIT_CLONE_OPTIONS
|
||||
|
||||
DEPS = jsx gproc gen_rpc ekka esockd cowboy replayq
|
||||
|
||||
dep_jsx = git-emqx https://github.com/talentdeficit/jsx 2.9.0
|
||||
dep_gproc = git-emqx https://github.com/uwiger/gproc 0.8.0
|
||||
dep_gen_rpc = git-emqx https://github.com/emqx/gen_rpc 2.3.1
|
||||
dep_esockd = git-emqx https://github.com/emqx/esockd v5.4.4
|
||||
dep_ekka = git-emqx https://github.com/emqx/ekka v0.5.4
|
||||
dep_cowboy = git-emqx https://github.com/ninenines/cowboy 2.6.1
|
||||
dep_replayq = git-emqx https://github.com/emqx/replayq v0.1.1
|
||||
|
||||
NO_AUTOPATCH = cuttlefish
|
||||
|
||||
ERLC_OPTS += +debug_info -DAPPLICATION=emqx
|
||||
|
||||
BUILD_DEPS = cuttlefish
|
||||
dep_cuttlefish = git-emqx https://github.com/emqx/cuttlefish v2.2.1
|
||||
|
||||
CUR_BRANCH := $(shell git branch | grep -e "^*" | cut -d' ' -f 2)
|
||||
BRANCH := $(if $(filter $(CUR_BRANCH), master develop), $(CUR_BRANCH), develop)
|
||||
|
||||
TEST_DEPS = emqx_ct_helpers
|
||||
dep_emqx_ct_helpers = git-emqx https://github.com/emqx/emqx-ct-helpers.git v1.0
|
||||
|
||||
TEST_ERLC_OPTS += +debug_info -DAPPLICATION=emqx
|
||||
|
||||
EUNIT_OPTS = verbose
|
||||
|
||||
# CT_SUITES = emqx_bridge
|
||||
## emqx_trie emqx_router emqx_frame emqx_mqtt_compat
|
||||
# CT_SUITES = emqx_trie emqx_router emqx_frame emqx_mqtt_compat
|
||||
|
||||
CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
|
||||
emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \
|
||||
|
@ -44,31 +16,68 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
|
|||
emqx_vm_mon emqx_alarm_handler emqx_rpc emqx_flapping
|
||||
|
||||
CT_NODE_NAME = emqxct@127.0.0.1
|
||||
CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)
|
||||
|
||||
COVER = true
|
||||
compile:
|
||||
@rebar3 compile
|
||||
|
||||
PLT_APPS = sasl asn1 ssl syntax_tools runtime_tools crypto xmerl os_mon inets public_key ssl compiler mnesia
|
||||
DIALYZER_DIRS := ebin/
|
||||
DIALYZER_OPTS := --verbose --statistics -Werror_handling -Wrace_conditions #-Wunmatched_returns
|
||||
clean: distclean
|
||||
|
||||
$(shell [ -f erlang.mk ] || curl -s -o erlang.mk https://raw.githubusercontent.com/emqx/erlmk/master/erlang.mk)
|
||||
include erlang.mk
|
||||
## Cuttlefish escript is built by default when cuttlefish app (as dependency) was built
|
||||
CUTTLEFISH_SCRIPT := _build/default/lib/cuttlefish/cuttlefish
|
||||
|
||||
clean:: gen-clean
|
||||
.PHONY: cover
|
||||
cover:
|
||||
@rebar3 cover
|
||||
|
||||
.PHONY: gen-clean
|
||||
gen-clean:
|
||||
@rm -rf bbmustache
|
||||
@rm -f etc/gen.emqx.conf
|
||||
.PHONY: coveralls
|
||||
coveralls:
|
||||
@rebar3 coveralls send
|
||||
|
||||
.PHONY: xref
|
||||
xref:
|
||||
@rebar3 xref
|
||||
|
||||
.PHONY: deps
|
||||
deps:
|
||||
@rebar3 get-deps
|
||||
|
||||
.PHONY: eunit
|
||||
eunit:
|
||||
@rebar3 eunit -v
|
||||
|
||||
.PHONY: ct-setup
|
||||
ct-setup:
|
||||
rebar3 as test compile
|
||||
@mkdir -p data
|
||||
@if [ ! -f data/loaded_plugins ]; then touch data/loaded_plugins; fi
|
||||
@ln -s -f '../../../../etc' _build/test/lib/emqx/
|
||||
@ln -s -f '../../../../data' _build/test/lib/emqx/
|
||||
|
||||
.PHONY: ct
|
||||
ct: ct-setup
|
||||
@rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(shell echo $(foreach var,$(CT_SUITES),test/$(var)_SUITE) | tr ' ' ',')
|
||||
|
||||
## Run one single CT with rebar3
|
||||
## e.g. make ct-one-suite suite=emqx_bridge
|
||||
.PHONY: ct-one-suite
|
||||
ct-one-suite: ct-setup
|
||||
@rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(suite)_SUITE
|
||||
|
||||
.PHONY: app.config
|
||||
app.config: $(CUTTLEFISH_SCRIPT) etc/gen.emqx.conf
|
||||
$(CUTTLEFISH_SCRIPT) -l info -e etc/ -c etc/gen.emqx.conf -i priv/emqx.schema -d data/
|
||||
|
||||
$(CUTTLEFISH_SCRIPT):
|
||||
@rebar3 get-deps
|
||||
@if [ ! -f cuttlefish ]; then make -C _build/default/lib/cuttlefish; fi
|
||||
|
||||
bbmustache:
|
||||
$(verbose) git clone https://github.com/soranoba/bbmustache.git && cd bbmustache && ./rebar3 compile && cd ..
|
||||
@git clone https://github.com/soranoba/bbmustache.git && cd bbmustache && ./rebar3 compile && cd ..
|
||||
|
||||
# This hack is to generate a conf file for testing
|
||||
# relx overlay is used for release
|
||||
etc/gen.emqx.conf: bbmustache etc/emqx.conf
|
||||
$(verbose) erl -noshell -pa bbmustache/_build/default/lib/bbmustache/ebin -eval \
|
||||
@erl -noshell -pa bbmustache/_build/default/lib/bbmustache/ebin -eval \
|
||||
"{ok, Temp} = file:read_file('etc/emqx.conf'), \
|
||||
{ok, Vars0} = file:consult('vars'), \
|
||||
Vars = [{atom_to_list(N), list_to_binary(V)} || {N, V} <- Vars0], \
|
||||
|
@ -76,51 +85,12 @@ etc/gen.emqx.conf: bbmustache etc/emqx.conf
|
|||
ok = file:write_file('etc/gen.emqx.conf', Targ), \
|
||||
halt(0)."
|
||||
|
||||
CUTTLEFISH_SCRIPT = _build/default/lib/cuttlefish/cuttlefish
|
||||
.PHONY: gen-clean
|
||||
gen-clean:
|
||||
@rm -rf bbmustache
|
||||
@rm -f etc/gen.emqx.conf etc/emqx.conf.rendered
|
||||
|
||||
app.config: $(CUTTLEFISH_SCRIPT) etc/gen.emqx.conf
|
||||
$(verbose) $(CUTTLEFISH_SCRIPT) -l info -e etc/ -c etc/gen.emqx.conf -i priv/emqx.schema -d data/
|
||||
|
||||
ct: app.config
|
||||
|
||||
rebar-cover:
|
||||
@rebar3 cover
|
||||
|
||||
coveralls:
|
||||
@rebar3 coveralls send
|
||||
|
||||
|
||||
$(CUTTLEFISH_SCRIPT): rebar-deps
|
||||
@if [ ! -f cuttlefish ]; then make -C _build/default/lib/cuttlefish; fi
|
||||
|
||||
rebar-xref:
|
||||
@rebar3 xref
|
||||
|
||||
rebar-deps:
|
||||
@rebar3 get-deps
|
||||
|
||||
rebar-eunit: $(CUTTLEFISH_SCRIPT)
|
||||
@rebar3 eunit -v
|
||||
|
||||
rebar-compile:
|
||||
@rebar3 compile
|
||||
|
||||
rebar-ct-setup: app.config
|
||||
@rebar3 as test compile
|
||||
@ln -s -f '../../../../etc' _build/test/lib/emqx/
|
||||
@ln -s -f '../../../../data' _build/test/lib/emqx/
|
||||
|
||||
rebar-ct: rebar-ct-setup
|
||||
@rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(shell echo $(foreach var,$(CT_SUITES),test/$(var)_SUITE) | tr ' ' ',')
|
||||
|
||||
## Run one single CT with rebar3
|
||||
## e.g. make ct-one-suite suite=emqx_bridge
|
||||
ct-one-suite: rebar-ct-setup
|
||||
@rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(suite)_SUITE
|
||||
|
||||
rebar-clean:
|
||||
@rebar3 clean
|
||||
|
||||
distclean::
|
||||
.PHONY: distclean
|
||||
distclean: gen-clean
|
||||
@rm -rf _build cover deps logs log data
|
||||
@rm -f rebar.lock compile_commands.json cuttlefish
|
||||
@rm -f rebar.lock compile_commands.json cuttlefish erl_crash.dump
|
||||
|
|
|
@ -723,6 +723,11 @@ zone.external.flapping_banned_expiry_interval = 1h
|
|||
## Default: false
|
||||
zone.external.use_username_as_clientid = false
|
||||
|
||||
## Whether to ignore loop delivery of messages.(for mqtt v3.1.1)
|
||||
##
|
||||
## Value: true | false
|
||||
zone.external.ignore_loop_deliver = false
|
||||
|
||||
##--------------------------------------------------------------------
|
||||
## Internal Zone
|
||||
|
||||
|
@ -818,6 +823,11 @@ zone.internal.flapping_banned_expiry_interval = 1h
|
|||
## Default: false
|
||||
zone.internal.use_username_as_clientid = false
|
||||
|
||||
## Whether to ignore loop delivery of messages.(for mqtt v3.1.1)
|
||||
##
|
||||
## Value: true | false
|
||||
zone.internal.ignore_loop_deliver = false
|
||||
|
||||
##--------------------------------------------------------------------
|
||||
## Listeners
|
||||
##--------------------------------------------------------------------
|
||||
|
@ -863,8 +873,11 @@ listener.tcp.external.zone = external
|
|||
## Rate limit for the external MQTT/TCP connections. Format is 'rate,burst'.
|
||||
##
|
||||
## Value: rate,burst
|
||||
## - rate: The average limit value for per second
|
||||
## - burst: The maximum allowed for each check, To avoid frequent restriction
|
||||
## this value is recommended to be set to `(max_packet_size * active_n)/2`
|
||||
## Unit: Bps
|
||||
## listener.tcp.external.rate_limit = 1024,4096
|
||||
## listener.tcp.external.rate_limit = 1024,52428800
|
||||
|
||||
## The access control rules for the MQTT/TCP listener.
|
||||
##
|
||||
|
@ -994,8 +1007,11 @@ listener.tcp.internal.zone = internal
|
|||
## See: listener.tcp.$name.rate_limit
|
||||
##
|
||||
## Value: rate,burst
|
||||
## - rate: The average limit value for per second
|
||||
## - burst: The maximum allowed for each check, To avoid frequent restriction
|
||||
## this value is recommended to be set to `(max_packet_size * active_n)/2`
|
||||
## Unit: Bps
|
||||
## listener.tcp.internal.rate_limit = 1000000,2000000
|
||||
## listener.tcp.internal.rate_limit = 1000000,524288000
|
||||
|
||||
## The TCP backlog of internal MQTT/TCP Listener.
|
||||
##
|
||||
|
@ -1104,8 +1120,11 @@ listener.ssl.external.access.1 = allow all
|
|||
## Rate limit for the external MQTT/SSL connections.
|
||||
##
|
||||
## Value: rate,burst
|
||||
## - rate: The average limit value for per second
|
||||
## - burst: The maximum allowed for each check, To avoid frequent restriction
|
||||
## this value is recommended to be set to `(max_packet_size * active_n)/2`
|
||||
## Unit: Bps
|
||||
## listener.ssl.external.rate_limit = 1024,4096
|
||||
## listener.ssl.external.rate_limit = 1024,52428800
|
||||
|
||||
## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind
|
||||
## HAProxy or Nginx.
|
||||
|
@ -1338,8 +1357,11 @@ listener.ws.external.max_conn_rate = 1000
|
|||
## Rate limit for the MQTT/WebSocket connections.
|
||||
##
|
||||
## Value: rate,burst
|
||||
## - rate: The average limit value for per second
|
||||
## - burst: The maximum allowed for each check, To avoid frequent restriction
|
||||
## this value is recommended to be set to `(max_packet_size * 1)/2`
|
||||
## Unit: Bps
|
||||
## listener.ws.external.rate_limit = 1024,4096
|
||||
## listener.ws.external.rate_limit = 1024,524288
|
||||
|
||||
## Zone of the external MQTT/WebSocket listener belonged to.
|
||||
##
|
||||
|
@ -1546,8 +1568,11 @@ listener.wss.external.max_conn_rate = 1000
|
|||
## Rate limit for the MQTT/WebSocket/SSL connections.
|
||||
##
|
||||
## Value: rate,burst
|
||||
## - rate: The average limit value for per second
|
||||
## - burst: The maximum allowed for each check, To avoid frequent restriction
|
||||
## this value is recommended to be set to `(max_packet_size * 1)/2`
|
||||
## Unit: Bps
|
||||
## listener.wss.external.rate_limit = 1024,4096
|
||||
## listener.wss.external.rate_limit = 1024,524288
|
||||
|
||||
## Zone of the external MQTT/WebSocket/SSL listener belonged to.
|
||||
##
|
||||
|
|
|
@ -400,7 +400,7 @@ end}.
|
|||
{datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}}
|
||||
]}.
|
||||
|
||||
{mapping, "log.primary_level", "kernel.primary_log_level", [
|
||||
{mapping, "log.primary_log_level", "kernel.logger_level", [
|
||||
{default, error},
|
||||
{datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}}
|
||||
]}.
|
||||
|
@ -462,6 +462,10 @@ end}.
|
|||
hidden
|
||||
]}.
|
||||
|
||||
{translation, "kernel.logger_level", fun(_, _, Conf) ->
|
||||
cuttlefish:conf_get("log.level", Conf)
|
||||
end}.
|
||||
|
||||
{translation, "kernel.logger", fun(Conf) ->
|
||||
LogTo = cuttlefish:conf_get("log.to", Conf),
|
||||
LogLevel = cuttlefish:conf_get("log.level", Conf),
|
||||
|
@ -642,7 +646,7 @@ end}.
|
|||
]}.
|
||||
|
||||
%% @doc Whether to ignore loop delivery of messages.(for mqtt v3.1.1)
|
||||
{mapping, "mqtt.ignore_loop_deliver", "emqx.mqtt_ignore_loop_deliver", [
|
||||
{mapping, "mqtt.ignore_loop_deliver", "emqx.ignore_loop_deliver", [
|
||||
{default, true},
|
||||
{datatype, {enum, [true, false]}}
|
||||
]}.
|
||||
|
|
37
rebar.config
37
rebar.config
|
@ -1,25 +1,20 @@
|
|||
{deps, [{jsx, "2.9.0"},
|
||||
{gproc, "0.8.0"},
|
||||
{cowboy, "2.6.1"}]}.
|
||||
|
||||
%% appended to deps in rebar.config.script
|
||||
{github_emqx_libs,
|
||||
[{gen_rpc, "2.3.1"},
|
||||
{ekka, "v0.5.4"},
|
||||
{replayq, "v0.1.1"},
|
||||
{esockd, "v5.4.4"},
|
||||
{cuttlefish, "v2.2.1"}]}.
|
||||
|
||||
{github_emqx_projects,
|
||||
[{emqx_ct_helpers, "v1.0"}]}.
|
||||
{deps,
|
||||
[ {jsx, "2.9.0"} % hex
|
||||
, {cowboy, "2.6.1"} % hex
|
||||
, {gproc, "0.8.0"} % hex
|
||||
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}}
|
||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.4"}}}
|
||||
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "v0.1.1"}}}
|
||||
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "v5.4.4"}}}
|
||||
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
|
||||
]}.
|
||||
|
||||
{edoc_opts, [{preprocess, true}]}.
|
||||
{erl_opts, [warn_unused_vars,
|
||||
warn_shadow_vars,
|
||||
warn_unused_import,
|
||||
warn_obsolete_guard,
|
||||
debug_info,
|
||||
{d, 'APPLICATION', emqx}]}.
|
||||
debug_info]}.
|
||||
{xref_checks, [undefined_function_calls, undefined_functions,
|
||||
locals_not_used, deprecated_function_calls,
|
||||
warnings_as_errors, deprecated_functions]}.
|
||||
|
@ -28,3 +23,13 @@
|
|||
{cover_export_enabled, true}.
|
||||
|
||||
{plugins, [coveralls]}.
|
||||
|
||||
{profiles,
|
||||
[{test,
|
||||
[{deps,
|
||||
[ {meck, "0.8.13"} % hex
|
||||
, {bbmustache, "1.7.0"} % hex
|
||||
, {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "v1.1.1"}}}
|
||||
]}
|
||||
]}
|
||||
]}.
|
||||
|
|
|
@ -1,11 +1,3 @@
|
|||
CONFIG0 = case os:getenv("REBAR_GIT_CLONE_OPTIONS") of
|
||||
"--depth 1" ->
|
||||
CONFIG;
|
||||
_ ->
|
||||
os:putenv("REBAR_GIT_CLONE_OPTIONS", "--depth 1"),
|
||||
CONFIG
|
||||
end,
|
||||
|
||||
CONFIG1 = case os:getenv("TRAVIS") of
|
||||
"true" ->
|
||||
JobId = os:getenv("TRAVIS_JOB_ID"),
|
||||
|
@ -16,28 +8,4 @@ CONFIG1 = case os:getenv("TRAVIS") of
|
|||
CONFIG
|
||||
end,
|
||||
|
||||
FindDeps = fun(DepsType, Config) ->
|
||||
case lists:keyfind(DepsType, 1, Config) of
|
||||
{_, RawDeps} -> RawDeps;
|
||||
_ -> []
|
||||
end
|
||||
end,
|
||||
Deps = FindDeps(deps, CONFIG1),
|
||||
LibDeps = FindDeps(github_emqx_libs, CONFIG1),
|
||||
ProjDeps = FindDeps(github_emqx_projects, CONFIG1),
|
||||
UrlPrefix = "https://github.com/emqx/",
|
||||
RealName = fun TransName([$_ | Tail], Result) ->
|
||||
TransName(Tail, [$- | Result]);
|
||||
TransName([Head | Tail], Result) ->
|
||||
TransName(Tail, [Head | Result]);
|
||||
TransName([], Result) ->
|
||||
lists:reverse(Result)
|
||||
end,
|
||||
|
||||
NewLibDeps = [{LibName, {git, UrlPrefix ++ atom_to_list(LibName), {branch, Branch}}}
|
||||
|| {LibName, Branch} <- LibDeps],
|
||||
NewProjDeps = [{ProjName, {git, UrlPrefix ++ RealName(atom_to_list(ProjName), []), {branch, Branch}}} || {ProjName, Branch} <- ProjDeps],
|
||||
|
||||
NewDeps = Deps ++ NewLibDeps ++ NewProjDeps,
|
||||
CONFIG2 = lists:keystore(deps, 1, CONFIG1, {deps, NewDeps}),
|
||||
CONFIG2.
|
||||
CONFIG1.
|
||||
|
|
|
@ -28,14 +28,12 @@
|
|||
-spec(authenticate(emqx_types:credentials())
|
||||
-> {ok, emqx_types:credentials()} | {error, term()}).
|
||||
authenticate(Credentials) ->
|
||||
detect_anonymous_permission(Credentials, fun() ->
|
||||
case emqx_hooks:run_fold('client.authenticate', [], init_auth_result(Credentials)) of
|
||||
#{auth_result := success} = NewCredentials ->
|
||||
{ok, NewCredentials};
|
||||
NewCredentials ->
|
||||
{error, maps:get(auth_result, NewCredentials, unknown_error)}
|
||||
end
|
||||
end).
|
||||
case emqx_hooks:run_fold('client.authenticate', [], init_auth_result(Credentials)) of
|
||||
#{auth_result := success} = NewCredentials ->
|
||||
{ok, NewCredentials};
|
||||
NewCredentials ->
|
||||
{error, maps:get(auth_result, NewCredentials, unknown_error)}
|
||||
end.
|
||||
|
||||
%% @doc Check ACL
|
||||
-spec(check_acl(emqx_types:credentials(), emqx_types:pubsub(), emqx_types:topic()) -> allow | deny).
|
||||
|
@ -68,22 +66,8 @@ reload_acl() ->
|
|||
emqx_mod_acl_internal:reload_acl().
|
||||
|
||||
init_auth_result(Credentials) ->
|
||||
case anonymous_permission(Credentials) of
|
||||
true -> Credentials#{auth_result => success};
|
||||
false -> Credentials#{auth_result => not_authorized}
|
||||
case emqx_zone:get_env(maps:get(zone, Credentials, undefined), allow_anonymous, false) of
|
||||
true -> Credentials#{auth_result => success, anonymous => true};
|
||||
false -> Credentials#{auth_result => not_authorized, anonymous => false}
|
||||
end.
|
||||
|
||||
detect_anonymous_permission(#{username := undefined,
|
||||
password := undefined} = Credentials, Fun) ->
|
||||
case anonymous_permission(Credentials) of
|
||||
true -> {ok, Credentials};
|
||||
false -> Fun()
|
||||
end;
|
||||
|
||||
detect_anonymous_permission(_Credentials, Fun) ->
|
||||
Fun().
|
||||
|
||||
anonymous_permission(Credentials) ->
|
||||
emqx_zone:get_env(maps:get(zone, Credentials, undefined),
|
||||
allow_anonymous, false).
|
||||
|
||||
|
|
|
@ -27,12 +27,6 @@
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
start(_Type, _Args) ->
|
||||
%% We'd like to configure the primary logger level here, rather than set the
|
||||
%% kernel config `logger_level` before starting the erlang vm.
|
||||
%% This is because the latter approach an annoying debug msg will be printed out:
|
||||
%% "[debug] got_unexpected_message {'EXIT',<0.1198.0>,normal}"
|
||||
logger:set_primary_config(level, application:get_env(kernel, primary_log_level, error)),
|
||||
|
||||
print_banner(),
|
||||
ekka:start(),
|
||||
{ok, Sup} = emqx_sup:start_link(),
|
||||
|
|
|
@ -45,6 +45,7 @@
|
|||
]).
|
||||
|
||||
-record(state, {
|
||||
zone,
|
||||
transport,
|
||||
socket,
|
||||
peername,
|
||||
|
@ -55,12 +56,10 @@
|
|||
parse_state,
|
||||
gc_state,
|
||||
keepalive,
|
||||
enable_stats,
|
||||
stats_timer,
|
||||
rate_limit,
|
||||
pub_limit,
|
||||
limit_timer,
|
||||
idle_timeout
|
||||
limit_timer
|
||||
}).
|
||||
|
||||
-define(ACTIVE_N, 100).
|
||||
|
@ -152,7 +151,6 @@ init({Transport, RawSocket, Options}) ->
|
|||
RateLimit = init_limiter(proplists:get_value(rate_limit, Options)),
|
||||
PubLimit = init_limiter(emqx_zone:get_env(Zone, publish_limit)),
|
||||
ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N),
|
||||
EnableStats = emqx_zone:get_env(Zone, enable_stats, true),
|
||||
IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
|
||||
SendFun = fun(Packet, SeriaOpts) ->
|
||||
Data = emqx_frame:serialize(Packet, SeriaOpts),
|
||||
|
@ -171,7 +169,8 @@ init({Transport, RawSocket, Options}) ->
|
|||
ParseState = emqx_protocol:parser(ProtoState),
|
||||
GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
|
||||
GcState = emqx_gc:init(GcPolicy),
|
||||
State = #state{transport = Transport,
|
||||
State = #state{zone = Zone,
|
||||
transport = Transport,
|
||||
socket = Socket,
|
||||
peername = Peername,
|
||||
conn_state = running,
|
||||
|
@ -180,9 +179,7 @@ init({Transport, RawSocket, Options}) ->
|
|||
pub_limit = PubLimit,
|
||||
proto_state = ProtoState,
|
||||
parse_state = ParseState,
|
||||
gc_state = GcState,
|
||||
enable_stats = EnableStats,
|
||||
idle_timeout = IdleTimout},
|
||||
gc_state = GcState},
|
||||
ok = emqx_misc:init_proc_mng_policy(Zone),
|
||||
gen_statem:enter_loop(?MODULE, [{hibernate_after, 2 * IdleTimout}],
|
||||
idle, State, self(), [IdleTimout]).
|
||||
|
@ -317,9 +314,15 @@ handle(info, {tcp_passive, _Sock}, State) ->
|
|||
ok = activate_socket(NState),
|
||||
{keep_state, NState};
|
||||
|
||||
handle(info, {ssl_passive, _Sock}, State) ->
|
||||
%% Rate limit here:)
|
||||
NState = ensure_rate_limit(State),
|
||||
ok = activate_socket(NState),
|
||||
{keep_state, NState};
|
||||
|
||||
handle(info, activate_socket, State) ->
|
||||
%% Rate limit timer expired.
|
||||
ok = activate_socket(State),
|
||||
ok = activate_socket(State#state{conn_state = running}),
|
||||
{keep_state, State#state{conn_state = running, limit_timer = undefined}};
|
||||
|
||||
handle(info, {inet_reply, _Sock, ok}, State) ->
|
||||
|
@ -442,6 +445,7 @@ ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) ->
|
|||
{0, Rl1} ->
|
||||
ensure_rate_limit(Limiters, setelement(Pos, State, Rl1));
|
||||
{Pause, Rl1} ->
|
||||
?LOG(debug, "[Connection] Rate limit pause connection ~pms", [Pause]),
|
||||
TRef = erlang:send_after(Pause, self(), activate_socket),
|
||||
setelement(Pos, State#state{conn_state = blocked, limit_timer = TRef}, Rl1)
|
||||
end.
|
||||
|
@ -453,11 +457,7 @@ activate_socket(#state{conn_state = blocked}) ->
|
|||
ok;
|
||||
|
||||
activate_socket(#state{transport = Transport, socket = Socket, active_n = N}) ->
|
||||
TrueOrN = case Transport:is_ssl(Socket) of
|
||||
true -> true; %% Cannot set '{active, N}' for SSL:(
|
||||
false -> N
|
||||
end,
|
||||
case Transport:setopts(Socket, [{active, TrueOrN}]) of
|
||||
case Transport:setopts(Socket, [{active, N}]) of
|
||||
ok -> ok;
|
||||
{error, Reason} ->
|
||||
self() ! {shutdown, Reason},
|
||||
|
@ -467,11 +467,14 @@ activate_socket(#state{transport = Transport, socket = Socket, active_n = N}) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
%% Ensure stats timer
|
||||
|
||||
ensure_stats_timer(State = #state{enable_stats = true,
|
||||
stats_timer = undefined,
|
||||
idle_timeout = IdleTimeout}) ->
|
||||
State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
|
||||
|
||||
ensure_stats_timer(State = #state{zone = Zone, stats_timer = undefined}) ->
|
||||
case emqx_zone:get_env(Zone, enable_stats, true) of
|
||||
true ->
|
||||
IdleTimeout = emqx_zone:get_env(Zone, idle_timeout, 30000),
|
||||
State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
|
||||
false ->
|
||||
State
|
||||
end;
|
||||
ensure_stats_timer(State) -> State.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
|
|
|
@ -108,13 +108,20 @@ ensure_file(File) ->
|
|||
|
||||
with_loaded_file(File, SuccFun) ->
|
||||
case read_loaded(File) of
|
||||
{ok, Names} ->
|
||||
{ok, Names0} ->
|
||||
Names = filter_plugins(Names0),
|
||||
SuccFun(Names);
|
||||
{error, Error} ->
|
||||
?LOG(alert, "[Plugins] Failed to read: ~p, error: ~p", [File, Error]),
|
||||
{error, Error}
|
||||
end.
|
||||
|
||||
filter_plugins(Names) ->
|
||||
lists:filtermap(fun(Name1) when is_atom(Name1) -> {true, Name1};
|
||||
({Name1, true}) -> {true, Name1};
|
||||
({_Name1, false}) -> false
|
||||
end, Names).
|
||||
|
||||
load_plugins(Names, Persistent) ->
|
||||
Plugins = list(), NotFound = Names -- names(Plugins),
|
||||
case NotFound of
|
||||
|
@ -264,7 +271,7 @@ plugin_loaded(Name, true) ->
|
|||
case lists:member(Name, Names) of
|
||||
false ->
|
||||
%% write file if plugin is loaded
|
||||
write_loaded(lists:append(Names, [Name]));
|
||||
write_loaded(lists:append(Names, [{Name, true}]));
|
||||
true ->
|
||||
ignore
|
||||
end;
|
||||
|
@ -277,10 +284,7 @@ plugin_unloaded(_Name, false) ->
|
|||
plugin_unloaded(Name, true) ->
|
||||
case read_loaded() of
|
||||
{ok, Names0} ->
|
||||
Names = lists:filtermap(fun(Name1) when is_atom(Name1) -> {true, Name1};
|
||||
({Name1, true}) -> {true, Name1};
|
||||
({_Name1, false}) -> false
|
||||
end, Names0),
|
||||
Names = filter_plugins(Names0),
|
||||
case lists:member(Name, Names) of
|
||||
true ->
|
||||
write_loaded(lists:delete(Name, Names));
|
||||
|
@ -304,7 +308,7 @@ write_loaded(AppNames) ->
|
|||
case file:open(File, [binary, write]) of
|
||||
{ok, Fd} ->
|
||||
lists:foreach(fun(Name) ->
|
||||
file:write(Fd, iolist_to_binary(io_lib:format("~s.~n", [Name])))
|
||||
file:write(Fd, iolist_to_binary(io_lib:format("~p.~n", [Name])))
|
||||
end, AppNames);
|
||||
{error, Error} ->
|
||||
?LOG(error, "[Plugins] Open File ~p Error: ~p", [File, Error]),
|
||||
|
|
|
@ -40,6 +40,7 @@
|
|||
-record(pstate, {
|
||||
zone,
|
||||
sendfun,
|
||||
sockname,
|
||||
peername,
|
||||
peercert,
|
||||
proto_ver,
|
||||
|
@ -53,20 +54,14 @@
|
|||
session,
|
||||
clean_start,
|
||||
topic_aliases,
|
||||
packet_size,
|
||||
will_topic,
|
||||
will_msg,
|
||||
keepalive,
|
||||
is_bridge,
|
||||
enable_ban,
|
||||
enable_acl,
|
||||
enable_flapping_detect,
|
||||
acl_deny_action,
|
||||
recv_stats,
|
||||
send_stats,
|
||||
connected,
|
||||
connected_at,
|
||||
ignore_loop,
|
||||
topic_alias_maximum,
|
||||
conn_mod,
|
||||
credentials,
|
||||
|
@ -87,12 +82,14 @@
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
-spec(init(map(), list()) -> state()).
|
||||
init(SocketOpts = #{ peername := Peername
|
||||
init(SocketOpts = #{ sockname := Sockname
|
||||
, peername := Peername
|
||||
, peercert := Peercert
|
||||
, sendfun := SendFun}, Options) ->
|
||||
Zone = proplists:get_value(zone, Options),
|
||||
#pstate{zone = Zone,
|
||||
sendfun = SendFun,
|
||||
sockname = Sockname,
|
||||
peername = Peername,
|
||||
peercert = Peercert,
|
||||
proto_ver = ?MQTT_PROTO_V4,
|
||||
|
@ -103,16 +100,10 @@ init(SocketOpts = #{ peername := Peername
|
|||
username = init_username(Peercert, Options),
|
||||
clean_start = false,
|
||||
topic_aliases = #{},
|
||||
packet_size = emqx_zone:get_env(Zone, max_packet_size),
|
||||
is_bridge = false,
|
||||
enable_ban = emqx_zone:get_env(Zone, enable_ban, false),
|
||||
enable_acl = emqx_zone:get_env(Zone, enable_acl),
|
||||
enable_flapping_detect = emqx_zone:get_env(Zone, enable_flapping_detect, false),
|
||||
acl_deny_action = emqx_zone:get_env(Zone, acl_deny_action, ignore),
|
||||
recv_stats = #{msg => 0, pkt => 0},
|
||||
send_stats = #{msg => 0, pkt => 0},
|
||||
connected = false,
|
||||
ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false),
|
||||
topic_alias_maximum = #{to_client => 0, from_client => 0},
|
||||
conn_mod = maps:get(conn_mod, SocketOpts, undefined),
|
||||
credentials = #{},
|
||||
|
@ -135,18 +126,18 @@ set_username(_Username, PState) ->
|
|||
%% API
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
info(PState = #pstate{conn_props = ConnProps,
|
||||
info(PState = #pstate{zone = Zone,
|
||||
conn_props = ConnProps,
|
||||
ack_props = AckProps,
|
||||
session = Session,
|
||||
topic_aliases = Aliases,
|
||||
will_msg = WillMsg,
|
||||
enable_acl = EnableAcl}) ->
|
||||
will_msg = WillMsg}) ->
|
||||
maps:merge(attrs(PState), #{conn_props => ConnProps,
|
||||
ack_props => AckProps,
|
||||
session => Session,
|
||||
topic_aliases => Aliases,
|
||||
will_msg => WillMsg,
|
||||
enable_acl => EnableAcl
|
||||
enable_acl => emqx_zone:get_env(Zone, enable_acl, false)
|
||||
}).
|
||||
|
||||
attrs(#pstate{zone = Zone,
|
||||
|
@ -208,11 +199,13 @@ client_id(#pstate{client_id = ClientId}) ->
|
|||
credentials(#pstate{zone = Zone,
|
||||
client_id = ClientId,
|
||||
username = Username,
|
||||
sockname = Sockname,
|
||||
peername = Peername,
|
||||
peercert = Peercert,
|
||||
ws_cookie = WsCookie}) ->
|
||||
with_cert(#{zone => Zone,
|
||||
client_id => ClientId,
|
||||
sockname => Sockname,
|
||||
username => Username,
|
||||
peername => Peername,
|
||||
ws_cookie => WsCookie,
|
||||
|
@ -239,7 +232,8 @@ stats(#pstate{recv_stats = #{pkt := RecvPkt, msg := RecvMsg},
|
|||
session(#pstate{session = SPid}) ->
|
||||
SPid.
|
||||
|
||||
parser(#pstate{packet_size = Size, proto_ver = Ver}) ->
|
||||
parser(#pstate{zone = Zone, proto_ver = Ver}) ->
|
||||
Size = emqx_zone:get_env(Zone, max_packet_size),
|
||||
emqx_frame:initial_state(#{max_packet_size => Size, version => Ver}).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -376,21 +370,22 @@ process(?CONNECT_PACKET(
|
|||
username = Username,
|
||||
password = Password} = ConnPkt), PState) ->
|
||||
|
||||
NewClientId = maybe_use_username_as_clientid(ClientId, Username, PState),
|
||||
%% TODO: Mountpoint...
|
||||
%% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
|
||||
PState0 = maybe_use_username_as_clientid(ClientId,
|
||||
set_username(Username,
|
||||
PState#pstate{proto_ver = ProtoVer,
|
||||
proto_name = ProtoName,
|
||||
clean_start = CleanStart,
|
||||
keepalive = Keepalive,
|
||||
conn_props = ConnProps,
|
||||
is_bridge = IsBridge,
|
||||
connected_at = os:timestamp()})),
|
||||
|
||||
NewClientId = PState0#pstate.client_id,
|
||||
|
||||
emqx_logger:set_metadata_client_id(NewClientId),
|
||||
|
||||
%% TODO: Mountpoint...
|
||||
%% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
|
||||
PState0 = set_username(Username,
|
||||
PState#pstate{client_id = NewClientId,
|
||||
proto_ver = ProtoVer,
|
||||
proto_name = ProtoName,
|
||||
clean_start = CleanStart,
|
||||
keepalive = Keepalive,
|
||||
conn_props = ConnProps,
|
||||
is_bridge = IsBridge,
|
||||
connected_at = os:timestamp()}),
|
||||
Credentials = credentials(PState0),
|
||||
PState1 = PState0#pstate{credentials = Credentials},
|
||||
connack(
|
||||
|
@ -424,31 +419,37 @@ process(?CONNECT_PACKET(
|
|||
{ReasonCode, PState1}
|
||||
end);
|
||||
|
||||
process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) ->
|
||||
process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload),
|
||||
PState = #pstate{zone = Zone, proto_ver = ProtoVer}) ->
|
||||
case check_publish(Packet, PState) of
|
||||
ok ->
|
||||
do_publish(Packet, PState);
|
||||
{error, ReasonCode} ->
|
||||
?LOG(warning, "[Protocol] Cannot publish qos0 message to ~s for ~s",
|
||||
[Topic, emqx_reason_codes:text(ReasonCode)]),
|
||||
do_acl_deny_action(Packet, ReasonCode, PState)
|
||||
AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore),
|
||||
ErrorTerm = {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState},
|
||||
do_acl_deny_action(AclDenyAction, Packet, ReasonCode, ErrorTerm)
|
||||
end;
|
||||
|
||||
process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState) ->
|
||||
process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload),
|
||||
PState = #pstate{zone = Zone, proto_ver = ProtoVer}) ->
|
||||
case check_publish(Packet, PState) of
|
||||
ok ->
|
||||
do_publish(Packet, PState);
|
||||
{error, ReasonCode} ->
|
||||
?LOG(warning, "[Protocol] Cannot publish qos1 message to ~s for ~s",
|
||||
[Topic, emqx_reason_codes:text(ReasonCode)]),
|
||||
?LOG(warning, "[Protocol] Cannot publish qos1 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]),
|
||||
case deliver({puback, PacketId, ReasonCode}, PState) of
|
||||
{ok, PState1} ->
|
||||
do_acl_deny_action(Packet, ReasonCode, PState1);
|
||||
AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore),
|
||||
ErrorTerm = {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState1},
|
||||
do_acl_deny_action(AclDenyAction, Packet, ReasonCode, ErrorTerm);
|
||||
Error -> Error
|
||||
end
|
||||
end;
|
||||
|
||||
process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState) ->
|
||||
process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload),
|
||||
PState = #pstate{zone = Zone, proto_ver = ProtoVer}) ->
|
||||
case check_publish(Packet, PState) of
|
||||
ok ->
|
||||
do_publish(Packet, PState);
|
||||
|
@ -457,7 +458,9 @@ process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState) ->
|
|||
[Topic, emqx_reason_codes:text(ReasonCode)]),
|
||||
case deliver({pubrec, PacketId, ReasonCode}, PState) of
|
||||
{ok, PState1} ->
|
||||
do_acl_deny_action(Packet, ReasonCode, PState1);
|
||||
AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore),
|
||||
ErrorTerm = {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState1},
|
||||
do_acl_deny_action(AclDenyAction, Packet, ReasonCode, ErrorTerm);
|
||||
Error -> Error
|
||||
end
|
||||
end;
|
||||
|
@ -485,7 +488,7 @@ process(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid})
|
|||
{ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState};
|
||||
|
||||
process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
|
||||
PState = #pstate{session = SPid, credentials = Credentials}) ->
|
||||
PState = #pstate{zone = Zone, proto_ver = ProtoVer, session = SPid, credentials = Credentials}) ->
|
||||
case check_subscribe(parse_topic_filters(?SUBSCRIBE, raw_topic_filters(PState, RawTopicFilters)), PState) of
|
||||
{ok, TopicFilters} ->
|
||||
TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [Credentials], TopicFilters),
|
||||
|
@ -503,7 +506,9 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
|
|||
[SubTopics, [emqx_reason_codes:text(R) || R <- ReasonCodes]]),
|
||||
case deliver({suback, PacketId, ReasonCodes}, PState) of
|
||||
{ok, PState1} ->
|
||||
do_acl_deny_action(Packet, ReasonCodes, PState1);
|
||||
AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore),
|
||||
ErrorTerm = {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState1},
|
||||
do_acl_deny_action(AclDenyAction, Packet, ReasonCodes, ErrorTerm);
|
||||
Error ->
|
||||
Error
|
||||
end
|
||||
|
@ -689,6 +694,9 @@ deliver({disconnect, _ReasonCode}, PState) ->
|
|||
-spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}).
|
||||
send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = Send}) ->
|
||||
case Send(Packet, #{version => Ver}) of
|
||||
ok ->
|
||||
trace(send, Packet),
|
||||
{ok, PState};
|
||||
{ok, Data} ->
|
||||
trace(send, Packet),
|
||||
emqx_metrics:sent(Packet),
|
||||
|
@ -701,12 +709,12 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = Send})
|
|||
%%------------------------------------------------------------------------------
|
||||
%% Maybe use username replace client id
|
||||
|
||||
maybe_use_username_as_clientid(ClientId, undefined, _PState) ->
|
||||
ClientId;
|
||||
maybe_use_username_as_clientid(ClientId, Username, #pstate{zone = Zone}) ->
|
||||
maybe_use_username_as_clientid(ClientId, PState = #pstate{username = undefined}) ->
|
||||
PState#pstate{client_id = ClientId};
|
||||
maybe_use_username_as_clientid(ClientId, PState = #pstate{username = Username, zone = Zone}) ->
|
||||
case emqx_zone:get_env(Zone, use_username_as_clientid, false) of
|
||||
true -> Username;
|
||||
false -> ClientId
|
||||
true -> PState#pstate{client_id = Username};
|
||||
false -> PState#pstate{client_id = ClientId}
|
||||
end.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -804,16 +812,13 @@ check_client_id(#mqtt_packet_connect{client_id = ClientId}, #pstate{zone = Zone}
|
|||
check_flapping(#mqtt_packet_connect{}, PState) ->
|
||||
do_flapping_detect(connect, PState).
|
||||
|
||||
check_banned(_ConnPkt, #pstate{enable_ban = false}) ->
|
||||
ok;
|
||||
check_banned(#mqtt_packet_connect{client_id = ClientId, username = Username},
|
||||
#pstate{peername = Peername}) ->
|
||||
case emqx_banned:check(#{client_id => ClientId,
|
||||
username => Username,
|
||||
peername => Peername}) of
|
||||
true -> {error, ?RC_BANNED};
|
||||
false -> ok
|
||||
end.
|
||||
#pstate{zone = Zone, peername = Peername}) ->
|
||||
Credentials = #{client_id => ClientId,
|
||||
username => Username,
|
||||
peername => Peername},
|
||||
EnableBan = emqx_zone:get_env(Zone, enable_ban, false),
|
||||
do_check_banned(EnableBan, Credentials).
|
||||
|
||||
check_will_topic(#mqtt_packet_connect{will_flag = false}, _PState) ->
|
||||
ok;
|
||||
|
@ -824,14 +829,14 @@ check_will_topic(#mqtt_packet_connect{will_topic = WillTopic} = ConnPkt, PState)
|
|||
{error, ?RC_TOPIC_NAME_INVALID}
|
||||
end.
|
||||
|
||||
check_will_acl(_ConnPkt, #pstate{enable_acl = EnableAcl}) when not EnableAcl ->
|
||||
ok;
|
||||
check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, #pstate{credentials = Credentials}) ->
|
||||
case emqx_access_control:check_acl(Credentials, publish, WillTopic) of
|
||||
allow -> ok;
|
||||
deny ->
|
||||
check_will_acl(#mqtt_packet_connect{will_topic = WillTopic},
|
||||
#pstate{zone = Zone, credentials = Credentials}) ->
|
||||
EnableAcl = emqx_zone:get_env(Zone, enable_acl, false),
|
||||
case do_acl_check(EnableAcl, publish, Credentials, WillTopic) of
|
||||
ok -> ok;
|
||||
Other ->
|
||||
?LOG(warning, "[Protocol] Cannot publish will message to ~p for acl denied", [WillTopic]),
|
||||
{error, ?RC_NOT_AUTHORIZED}
|
||||
Other
|
||||
end.
|
||||
|
||||
check_publish(Packet, PState) ->
|
||||
|
@ -843,14 +848,13 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Ret
|
|||
#pstate{zone = Zone}) ->
|
||||
emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}).
|
||||
|
||||
check_pub_acl(_Packet, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl})
|
||||
when IsSuper orelse (not EnableAcl) ->
|
||||
check_pub_acl(_Packet, #pstate{credentials = #{is_superuser := IsSuper}})
|
||||
when IsSuper ->
|
||||
ok;
|
||||
check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, #pstate{credentials = Credentials}) ->
|
||||
case emqx_access_control:check_acl(Credentials, publish, Topic) of
|
||||
allow -> ok;
|
||||
deny -> {error, ?RC_NOT_AUTHORIZED}
|
||||
end.
|
||||
check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}},
|
||||
#pstate{zone = Zone, credentials = Credentials}) ->
|
||||
EnableAcl = emqx_zone:get_env(Zone, enable_acl, false),
|
||||
do_acl_check(EnableAcl, publish, Credentials, Topic).
|
||||
|
||||
run_check_steps([], _Packet, _PState) ->
|
||||
ok;
|
||||
|
@ -870,17 +874,18 @@ check_subscribe(TopicFilters, PState = #pstate{zone = Zone}) ->
|
|||
{error, TopicFilter1}
|
||||
end.
|
||||
|
||||
check_sub_acl(TopicFilters, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl})
|
||||
when IsSuper orelse (not EnableAcl) ->
|
||||
check_sub_acl(TopicFilters, #pstate{credentials = #{is_superuser := IsSuper}})
|
||||
when IsSuper ->
|
||||
{ok, TopicFilters};
|
||||
check_sub_acl(TopicFilters, #pstate{credentials = Credentials}) ->
|
||||
check_sub_acl(TopicFilters, #pstate{zone = Zone, credentials = Credentials}) ->
|
||||
EnableAcl = emqx_zone:get_env(Zone, enable_acl, false),
|
||||
lists:foldr(
|
||||
fun({Topic, SubOpts}, {Ok, Acc}) ->
|
||||
case emqx_access_control:check_acl(Credentials, subscribe, Topic) of
|
||||
allow -> {Ok, [{Topic, SubOpts}|Acc]};
|
||||
deny ->
|
||||
{error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]}
|
||||
end
|
||||
fun({Topic, SubOpts}, {Ok, Acc}) when EnableAcl ->
|
||||
AllowTerm = {Ok, [{Topic, SubOpts}|Acc]},
|
||||
DenyTerm = {error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]},
|
||||
do_acl_check(subscribe, Credentials, Topic, AllowTerm, DenyTerm);
|
||||
(TopicFilter, Acc) ->
|
||||
{ok, [TopicFilter | Acc]}
|
||||
end, {ok, []}, TopicFilters).
|
||||
|
||||
trace(recv, Packet) ->
|
||||
|
@ -941,52 +946,45 @@ flag(true) -> 1.
|
|||
%% Execute actions in case acl deny
|
||||
|
||||
do_flapping_detect(Action, #pstate{zone = Zone,
|
||||
client_id = ClientId,
|
||||
enable_flapping_detect = true}) ->
|
||||
BanExpiryInterval = emqx_zone:get_env(Zone, flapping_ban_expiry_interval, 3600000),
|
||||
Threshold = emqx_zone:get_env(Zone, flapping_threshold, 20),
|
||||
Until = erlang:system_time(second) + BanExpiryInterval,
|
||||
case emqx_flapping:check(Action, ClientId, Threshold) of
|
||||
flapping ->
|
||||
emqx_banned:add(#banned{who = {client_id, ClientId},
|
||||
reason = <<"flapping">>,
|
||||
by = <<"flapping_checker">>,
|
||||
until = Until}),
|
||||
ok;
|
||||
_Other ->
|
||||
ok
|
||||
end;
|
||||
do_flapping_detect(_Action, _PState) ->
|
||||
ok.
|
||||
client_id = ClientId}) ->
|
||||
ok = case emqx_zone:get_env(Zone, enable_flapping_detect, false) of
|
||||
true ->
|
||||
Threshold = emqx_zone:get_env(Zone, flapping_threshold, 20),
|
||||
case emqx_flapping:check(Action, ClientId, Threshold) of
|
||||
flapping ->
|
||||
BanExpiryInterval = emqx_zone:get_env(Zone, flapping_ban_expiry_interval, 3600000),
|
||||
Until = erlang:system_time(second) + BanExpiryInterval,
|
||||
emqx_banned:add(#banned{who = {client_id, ClientId},
|
||||
reason = <<"flapping">>,
|
||||
by = <<"flapping_checker">>,
|
||||
until = Until}),
|
||||
ok;
|
||||
_Other ->
|
||||
ok
|
||||
end;
|
||||
_EnableFlappingDetect -> ok
|
||||
end.
|
||||
|
||||
do_acl_deny_action(?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload),
|
||||
?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer,
|
||||
acl_deny_action = disconnect}) ->
|
||||
{error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState};
|
||||
do_acl_deny_action(disconnect, ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload),
|
||||
?RC_NOT_AUTHORIZED, ErrorTerm) ->
|
||||
ErrorTerm;
|
||||
|
||||
do_acl_deny_action(?PUBLISH_PACKET(?QOS_1, _Topic, _PacketId, _Payload),
|
||||
?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer,
|
||||
acl_deny_action = disconnect}) ->
|
||||
do_acl_deny_action(disconnect, ?PUBLISH_PACKET(QoS, _Topic, _PacketId, _Payload),
|
||||
?RC_NOT_AUTHORIZED, ErrorTerm = {_Error, _CodeName, PState})
|
||||
when QoS =:= ?QOS_1; QoS =:= ?QOS_2 ->
|
||||
deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState),
|
||||
{error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState};
|
||||
ErrorTerm;
|
||||
|
||||
do_acl_deny_action(?PUBLISH_PACKET(?QOS_2, _Topic, _PacketId, _Payload),
|
||||
?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer,
|
||||
acl_deny_action = disconnect}) ->
|
||||
deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState),
|
||||
{error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState};
|
||||
|
||||
do_acl_deny_action(?SUBSCRIBE_PACKET(_PacketId, _Properties, _RawTopicFilters),
|
||||
ReasonCodes, PState = #pstate{proto_ver = ProtoVer,
|
||||
acl_deny_action = disconnect}) ->
|
||||
do_acl_deny_action(disconnect, ?SUBSCRIBE_PACKET(_PacketId, _Properties, _RawTopicFilters),
|
||||
ReasonCodes, ErrorTerm = {_Error, _CodeName, PState}) ->
|
||||
case lists:member(?RC_NOT_AUTHORIZED, ReasonCodes) of
|
||||
true ->
|
||||
deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState),
|
||||
{error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState};
|
||||
ErrorTerm;
|
||||
false ->
|
||||
{ok, PState}
|
||||
end;
|
||||
do_acl_deny_action(_PubSupPacket, _ReasonCode, PState) ->
|
||||
do_acl_deny_action(_OtherAction, _PubSupPacket, _ReasonCode, {_Error, _CodeName, PState}) ->
|
||||
{ok, PState}.
|
||||
|
||||
%% Reason code compat
|
||||
|
@ -997,9 +995,8 @@ reason_codes_compat(unsuback, _ReasonCodes, _ProtoVer) ->
|
|||
reason_codes_compat(PktType, ReasonCodes, _ProtoVer) ->
|
||||
[emqx_reason_codes:compat(PktType, RC) || RC <- ReasonCodes].
|
||||
|
||||
raw_topic_filters(#pstate{proto_ver = ProtoVer,
|
||||
is_bridge = IsBridge,
|
||||
ignore_loop = IgnoreLoop}, RawTopicFilters) ->
|
||||
raw_topic_filters(#pstate{zone = Zone, proto_ver = ProtoVer, is_bridge = IsBridge}, RawTopicFilters) ->
|
||||
IgnoreLoop = emqx_zone:get_env(Zone, ignore_loop_deliver, false),
|
||||
case ProtoVer < ?MQTT_PROTO_V5 of
|
||||
true ->
|
||||
IfIgnoreLoop = case IgnoreLoop of true -> 1; false -> 0 end,
|
||||
|
@ -1013,3 +1010,23 @@ raw_topic_filters(#pstate{proto_ver = ProtoVer,
|
|||
|
||||
mountpoint(Credentials) ->
|
||||
maps:get(mountpoint, Credentials, undefined).
|
||||
|
||||
do_check_banned(_EnableBan = true, Credentials) ->
|
||||
case emqx_banned:check(Credentials) of
|
||||
true -> {error, ?RC_BANNED};
|
||||
false -> ok
|
||||
end;
|
||||
do_check_banned(_EnableBan, Credentials) -> ok.
|
||||
|
||||
do_acl_check(_EnableAcl = true, Action, Credentials, Topic) ->
|
||||
AllowTerm = ok,
|
||||
DenyTerm = {error, ?RC_NOT_AUTHORIZED},
|
||||
do_acl_check(Action, Credentials, Topic, AllowTerm, DenyTerm);
|
||||
do_acl_check(_EnableAcl, _Action, _Credentials, _Topic) ->
|
||||
ok.
|
||||
|
||||
do_acl_check(Action, Credentials, Topic, AllowTerm, DenyTerm) ->
|
||||
case emqx_access_control:check_acl(Credentials, Action, Topic) of
|
||||
allow -> AllowTerm;
|
||||
deny -> DenyTerm
|
||||
end.
|
||||
|
|
|
@ -84,6 +84,9 @@
|
|||
-import(emqx_zone, [get_env/2, get_env/3]).
|
||||
|
||||
-record(state, {
|
||||
%% zone
|
||||
zone :: atom(),
|
||||
|
||||
%% Idle timeout
|
||||
idle_timeout :: pos_integer(),
|
||||
|
||||
|
@ -111,24 +114,15 @@
|
|||
%% Next packet id of the session
|
||||
next_pkt_id = 1 :: emqx_mqtt_types:packet_id(),
|
||||
|
||||
%% Max subscriptions
|
||||
max_subscriptions :: non_neg_integer(),
|
||||
|
||||
%% Client’s Subscriptions.
|
||||
subscriptions :: map(),
|
||||
|
||||
%% Upgrade QoS?
|
||||
upgrade_qos = false :: boolean(),
|
||||
|
||||
%% Client <- Broker: Inflight QoS1, QoS2 messages sent to the client but unacked.
|
||||
inflight :: emqx_inflight:inflight(),
|
||||
|
||||
%% Max Inflight Size. DEPRECATED: Get from inflight
|
||||
%% max_inflight = 32 :: non_neg_integer(),
|
||||
|
||||
%% Retry interval for redelivering QoS1/2 messages
|
||||
retry_interval = 20000 :: timeout(),
|
||||
|
||||
%% Retry Timer
|
||||
retry_timer :: maybe(reference()),
|
||||
|
||||
|
@ -141,12 +135,6 @@
|
|||
%% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel.
|
||||
awaiting_rel :: map(),
|
||||
|
||||
%% Max Packets Awaiting PUBREL
|
||||
max_awaiting_rel = 100 :: non_neg_integer(),
|
||||
|
||||
%% Awaiting PUBREL Timeout
|
||||
await_rel_timeout = 20000 :: timeout(),
|
||||
|
||||
%% Awaiting PUBREL Timer
|
||||
await_rel_timer :: maybe(reference()),
|
||||
|
||||
|
@ -156,9 +144,6 @@
|
|||
%% Expired Timer
|
||||
expiry_timer :: maybe(reference()),
|
||||
|
||||
%% Enable Stats
|
||||
enable_stats :: boolean(),
|
||||
|
||||
%% Stats timer
|
||||
stats_timer :: maybe(reference()),
|
||||
|
||||
|
@ -191,28 +176,24 @@ start_link(SessAttrs) ->
|
|||
info(SPid) when is_pid(SPid) ->
|
||||
gen_server:call(SPid, info, infinity);
|
||||
|
||||
info(State = #state{conn_pid = ConnPid,
|
||||
info(State = #state{zone = Zone,
|
||||
conn_pid = ConnPid,
|
||||
next_pkt_id = PktId,
|
||||
max_subscriptions = MaxSubscriptions,
|
||||
subscriptions = Subscriptions,
|
||||
upgrade_qos = UpgradeQoS,
|
||||
inflight = Inflight,
|
||||
retry_interval = RetryInterval,
|
||||
mqueue = MQueue,
|
||||
awaiting_rel = AwaitingRel,
|
||||
max_awaiting_rel = MaxAwaitingRel,
|
||||
await_rel_timeout = AwaitRelTimeout}) ->
|
||||
awaiting_rel = AwaitingRel}) ->
|
||||
attrs(State) ++ [{conn_pid, ConnPid},
|
||||
{next_pkt_id, PktId},
|
||||
{max_subscriptions, MaxSubscriptions},
|
||||
{max_subscriptions, get_env(Zone, max_subscriptions, 0)},
|
||||
{subscriptions, Subscriptions},
|
||||
{upgrade_qos, UpgradeQoS},
|
||||
{upgrade_qos, get_env(Zone, upgrade_qos, false)},
|
||||
{inflight, Inflight},
|
||||
{retry_interval, RetryInterval},
|
||||
{retry_interval, get_env(Zone, retry_interval, 0)},
|
||||
{mqueue_len, emqx_mqueue:len(MQueue)},
|
||||
{awaiting_rel, AwaitingRel},
|
||||
{max_awaiting_rel, MaxAwaitingRel},
|
||||
{await_rel_timeout, AwaitRelTimeout}].
|
||||
{max_awaiting_rel, get_env(Zone, max_awaiting_rel)},
|
||||
{await_rel_timeout, get_env(Zone, await_rel_timeout)}].
|
||||
|
||||
%% @doc Get session attrs
|
||||
-spec(attrs(spid() | #state{}) -> list({atom(), term()})).
|
||||
|
@ -236,21 +217,20 @@ attrs(#state{clean_start = CleanStart,
|
|||
stats(SPid) when is_pid(SPid) ->
|
||||
gen_server:call(SPid, stats, infinity);
|
||||
|
||||
stats(#state{max_subscriptions = MaxSubscriptions,
|
||||
stats(#state{zone = Zone,
|
||||
subscriptions = Subscriptions,
|
||||
inflight = Inflight,
|
||||
mqueue = MQueue,
|
||||
max_awaiting_rel = MaxAwaitingRel,
|
||||
awaiting_rel = AwaitingRel}) ->
|
||||
lists:append(emqx_misc:proc_stats(),
|
||||
[{max_subscriptions, MaxSubscriptions},
|
||||
[{max_subscriptions, get_env(Zone, max_subscriptions, 0)},
|
||||
{subscriptions_count, maps:size(Subscriptions)},
|
||||
{max_inflight, emqx_inflight:max_size(Inflight)},
|
||||
{inflight_len, emqx_inflight:size(Inflight)},
|
||||
{max_mqueue, emqx_mqueue:max_len(MQueue)},
|
||||
{mqueue_len, emqx_mqueue:len(MQueue)},
|
||||
{mqueue_dropped, emqx_mqueue:dropped(MQueue)},
|
||||
{max_awaiting_rel, MaxAwaitingRel},
|
||||
{max_awaiting_rel, get_env(Zone, max_awaiting_rel)},
|
||||
{awaiting_rel_len, maps:size(AwaitingRel)},
|
||||
{deliver_msg, emqx_pd:get_counter(deliver_stats)},
|
||||
{enqueue_msg, emqx_pd:get_counter(enqueue_stats)}]).
|
||||
|
@ -364,23 +344,18 @@ init([Parent, #{zone := Zone,
|
|||
emqx_logger:set_metadata_client_id(ClientId),
|
||||
GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
|
||||
IdleTimout = get_env(Zone, idle_timeout, 30000),
|
||||
State = #state{idle_timeout = IdleTimout,
|
||||
State = #state{zone = Zone,
|
||||
idle_timeout = IdleTimout,
|
||||
clean_start = CleanStart,
|
||||
deliver_fun = deliver_fun(ConnPid),
|
||||
client_id = ClientId,
|
||||
username = Username,
|
||||
conn_pid = ConnPid,
|
||||
subscriptions = #{},
|
||||
max_subscriptions = get_env(Zone, max_subscriptions, 0),
|
||||
upgrade_qos = get_env(Zone, upgrade_qos, false),
|
||||
inflight = emqx_inflight:new(MaxInflight),
|
||||
mqueue = init_mqueue(Zone),
|
||||
retry_interval = get_env(Zone, retry_interval, 0),
|
||||
awaiting_rel = #{},
|
||||
await_rel_timeout = get_env(Zone, await_rel_timeout),
|
||||
max_awaiting_rel = get_env(Zone, max_awaiting_rel),
|
||||
expiry_interval = ExpiryInterval,
|
||||
enable_stats = get_env(Zone, enable_stats, true),
|
||||
gc_state = emqx_gc:init(GcPolicy),
|
||||
created_at = os:timestamp(),
|
||||
will_msg = WillMsg
|
||||
|
@ -433,9 +408,10 @@ handle_call({discard, ByPid}, _From, State = #state{client_id = ClientId, conn_p
|
|||
%% PUBLISH: This is only to register packetId to session state.
|
||||
%% The actual message dispatching should be done by the caller (e.g. connection) process.
|
||||
handle_call({register_publish_packet_id, PacketId, Ts}, _From,
|
||||
State = #state{awaiting_rel = AwaitingRel}) ->
|
||||
State = #state{zone = Zone, awaiting_rel = AwaitingRel}) ->
|
||||
MaxAwaitingRel = get_env(Zone, max_awaiting_rel),
|
||||
reply(
|
||||
case is_awaiting_full(State) of
|
||||
case is_awaiting_full(MaxAwaitingRel, AwaitingRel) of
|
||||
false ->
|
||||
case maps:is_key(PacketId, AwaitingRel) of
|
||||
true ->
|
||||
|
@ -742,7 +718,8 @@ retry_delivery(_Force, [], _Now, State) ->
|
|||
ensure_retry_timer(State);
|
||||
|
||||
retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now,
|
||||
State = #state{inflight = Inflight, retry_interval = Interval}) ->
|
||||
State = #state{zone = Zone, inflight = Inflight}) ->
|
||||
Interval = get_env(Zone, retry_interval, 0),
|
||||
%% Microseconds -> MilliSeconds
|
||||
Age = timer:now_diff(Now, Ts) div 1000,
|
||||
if
|
||||
|
@ -789,7 +766,8 @@ expire_awaiting_rel([], _Now, State) ->
|
|||
State#state{await_rel_timer = undefined};
|
||||
|
||||
expire_awaiting_rel([{PacketId, Ts} | More], Now,
|
||||
State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) ->
|
||||
State = #state{zone = Zone, awaiting_rel = AwaitingRel}) ->
|
||||
Timeout = get_env(Zone, await_rel_timeout),
|
||||
case (timer:now_diff(Now, Ts) div 1000) of
|
||||
Age when Age >= Timeout ->
|
||||
emqx_metrics:trans(inc, 'messages/qos2/expired'),
|
||||
|
@ -803,11 +781,10 @@ expire_awaiting_rel([{PacketId, Ts} | More], Now,
|
|||
%% Check awaiting rel
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
is_awaiting_full(#state{max_awaiting_rel = 0}) ->
|
||||
is_awaiting_full(_MaxAwaitingRel = 0, _AwaitingRel) ->
|
||||
false;
|
||||
is_awaiting_full(#state{awaiting_rel = AwaitingRel,
|
||||
max_awaiting_rel = MaxLen}) ->
|
||||
maps:size(AwaitingRel) >= MaxLen.
|
||||
is_awaiting_full(MaxAwaitingRel, AwaitingRel) ->
|
||||
maps:size(AwaitingRel) >= MaxAwaitingRel.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Dispatch messages
|
||||
|
@ -900,10 +877,11 @@ process_subopts([{nl, 1}|_Opts], #message{from = ClientId}, #state{client_id = C
|
|||
ignore;
|
||||
process_subopts([{nl, _}|Opts], Msg, State) ->
|
||||
process_subopts(Opts, Msg, State);
|
||||
process_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = false}) ->
|
||||
process_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, State);
|
||||
process_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = true}) ->
|
||||
process_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, State);
|
||||
process_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, State = #state{zone = Zone}) ->
|
||||
case get_env(Zone, upgrade_qos, false) of
|
||||
true -> process_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, State);
|
||||
false -> process_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, State)
|
||||
end;
|
||||
process_subopts([{rap, _Rap}|Opts], Msg = #message{flags = Flags, headers = #{retained := true}}, State = #state{}) ->
|
||||
process_subopts(Opts, Msg#message{flags = maps:put(retain, true, Flags)}, State);
|
||||
process_subopts([{rap, 0}|Opts], Msg = #message{flags = Flags}, State = #state{}) ->
|
||||
|
@ -1053,8 +1031,9 @@ drain_q(Cnt, Msgs, Q) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
%% Ensure timers
|
||||
|
||||
ensure_await_rel_timer(State = #state{await_rel_timer = undefined,
|
||||
await_rel_timeout = Timeout}) ->
|
||||
ensure_await_rel_timer(State = #state{zone = Zone,
|
||||
await_rel_timer = undefined}) ->
|
||||
Timeout = get_env(Zone, await_rel_timeout),
|
||||
ensure_await_rel_timer(Timeout, State);
|
||||
ensure_await_rel_timer(State) ->
|
||||
State.
|
||||
|
@ -1064,8 +1043,8 @@ ensure_await_rel_timer(Timeout, State = #state{await_rel_timer = undefined}) ->
|
|||
ensure_await_rel_timer(_Timeout, State) ->
|
||||
State.
|
||||
|
||||
ensure_retry_timer(State = #state{retry_timer = undefined,
|
||||
retry_interval = Interval}) ->
|
||||
ensure_retry_timer(State = #state{zone = Zone, retry_timer = undefined}) ->
|
||||
Interval = get_env(Zone, retry_interval, 0),
|
||||
ensure_retry_timer(Interval, State);
|
||||
ensure_retry_timer(State) ->
|
||||
State.
|
||||
|
@ -1087,10 +1066,13 @@ ensure_will_delay_timer(State = #state{will_msg = WillMsg}) ->
|
|||
send_willmsg(WillMsg),
|
||||
State#state{will_msg = undefined}.
|
||||
|
||||
ensure_stats_timer(State = #state{enable_stats = true,
|
||||
ensure_stats_timer(State = #state{zone = Zone,
|
||||
stats_timer = undefined,
|
||||
idle_timeout = IdleTimeout}) ->
|
||||
State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
|
||||
case get_env(Zone, enable_stats, true) of
|
||||
true -> State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
|
||||
_Other -> State
|
||||
end;
|
||||
ensure_stats_timer(State) ->
|
||||
State.
|
||||
|
||||
|
|
|
@ -223,7 +223,11 @@ set_session_stats(ClientId, SessPid, Stats) when is_binary(ClientId), is_pid(Ses
|
|||
lookup_session_pids(ClientId) ->
|
||||
case emqx_sm_registry:is_enabled() of
|
||||
true -> emqx_sm_registry:lookup_session(ClientId);
|
||||
false -> emqx_tables:lookup_value(?SESSION_TAB, ClientId, [])
|
||||
false ->
|
||||
case emqx_tables:lookup_value(?SESSION_TAB, ClientId) of
|
||||
undefined -> [];
|
||||
SessPid when is_pid(SessPid) -> [SessPid]
|
||||
end
|
||||
end.
|
||||
|
||||
%% @doc Dispatch a message to the session.
|
||||
|
|
|
@ -80,11 +80,16 @@
|
|||
| banned
|
||||
| bad_authentication_method).
|
||||
-type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | none | atom()).
|
||||
-type(credentials() :: #{client_id := client_id(),
|
||||
username := username(),
|
||||
peername := peername(),
|
||||
auth_result := auth_result(),
|
||||
zone => zone(),
|
||||
-type(credentials() :: #{zone := zone(),
|
||||
client_id := client_id(),
|
||||
username := username(),
|
||||
sockname := peername(),
|
||||
peername := peername(),
|
||||
ws_cookie := undefined | list(),
|
||||
mountpoint := binary(),
|
||||
password => binary(),
|
||||
auth_result => auth_result(),
|
||||
anonymous => boolean(),
|
||||
atom() => term()
|
||||
}).
|
||||
-type(subscription() :: #subscription{}).
|
||||
|
|
|
@ -62,7 +62,7 @@ get_env(Zone, Key) ->
|
|||
get_env(undefined, Key, Def) ->
|
||||
emqx_config:get_env(Key, Def);
|
||||
get_env(Zone, Key, Def) ->
|
||||
try ets:lookup_element(?TAB, {Zone, Key}, 2)
|
||||
try persistent_term:get({Zone, Key})
|
||||
catch error:badarg ->
|
||||
emqx_config:get_env(Key, Def)
|
||||
end.
|
||||
|
@ -84,7 +84,6 @@ stop() ->
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
init([]) ->
|
||||
ok = emqx_tables:new(?TAB, [set, {read_concurrency, true}]),
|
||||
{ok, element(2, handle_info(reload, #{timer => undefined}))}.
|
||||
|
||||
handle_call(force_reload, _From, State) ->
|
||||
|
@ -96,7 +95,7 @@ handle_call(Req, _From, State) ->
|
|||
{reply, ignored, State}.
|
||||
|
||||
handle_cast({set_env, Zone, Key, Val}, State) ->
|
||||
true = ets:insert(?TAB, {{Zone, Key}, Val}),
|
||||
persistent_term:put({Zone, Key}, Val),
|
||||
{noreply, State};
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
|
@ -122,11 +121,11 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
do_reload() ->
|
||||
[ets:insert(?TAB, [{{Zone, Key}, Val} || {Key, Val} <- Opts])
|
||||
[[persistent_term:put({Zone, Key}, Val)
|
||||
|| {Key, Val} <- Opts]
|
||||
|| {Zone, Opts} <- emqx_config:get_env(zones, [])].
|
||||
|
||||
ensure_reload_timer(State = #{timer := undefined}) ->
|
||||
State#{timer := erlang:send_after(timer:minutes(5), self(), reload)};
|
||||
ensure_reload_timer(State) ->
|
||||
State.
|
||||
|
||||
|
|
|
@ -49,9 +49,27 @@ restart_listeners(_) ->
|
|||
ok = emqx_listeners:restart(),
|
||||
ok = emqx_listeners:stop().
|
||||
|
||||
render_config_file() ->
|
||||
Path = local_path(["etc", "emqx.conf"]),
|
||||
{ok, Temp} = file:read_file(Path),
|
||||
Vars0 = mustache_vars(),
|
||||
Vars = [{atom_to_list(N), iolist_to_binary(V)} || {N, V} <- Vars0],
|
||||
Targ = bbmustache:render(Temp, Vars),
|
||||
NewName = Path ++ ".rendered",
|
||||
ok = file:write_file(NewName, Targ),
|
||||
NewName.
|
||||
|
||||
mustache_vars() ->
|
||||
[{platform_data_dir, local_path(["data"])},
|
||||
{platform_etc_dir, local_path(["etc"])},
|
||||
{platform_log_dir, local_path(["log"])},
|
||||
{platform_plugins_dir, local_path(["plugins"])}
|
||||
].
|
||||
|
||||
generate_config() ->
|
||||
Schema = cuttlefish_schema:files([local_path(["priv", "emqx.schema"])]),
|
||||
Conf = conf_parse:file([local_path(["etc", "gen.emqx.conf"])]),
|
||||
ConfFile = render_config_file(),
|
||||
Conf = conf_parse:file(ConfFile),
|
||||
cuttlefish_generator:map(Schema, Conf).
|
||||
|
||||
set_app_env({App, Lists}) ->
|
||||
|
|
|
@ -55,7 +55,8 @@ groups() ->
|
|||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:start_apps([], fun set_special_configs/1),
|
||||
emqx_zone:set_env(external, max_topic_alias, 20),
|
||||
MqttCaps = emqx_zone:get_env(external, '$mqtt_caps'),
|
||||
emqx_zone:set_env(external, '$mqtt_caps', MqttCaps#{max_topic_alias => 20}),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
|
@ -85,15 +86,6 @@ with_connection(DoFun, NumberOfConnections) ->
|
|||
with_connection(DoFun) ->
|
||||
with_connection(DoFun, 1).
|
||||
|
||||
% {ok, Sock} = emqx_client_sock:connect({127, 0, 0, 1}, 1883,
|
||||
% [binary, {packet, raw},
|
||||
% {active, false}], 3000),
|
||||
% try
|
||||
% DoFun(Sock)
|
||||
% after
|
||||
% emqx_client_sock:close(Sock)
|
||||
% end.
|
||||
|
||||
handle_followed_packet(_Config) ->
|
||||
ConnPkt = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>,
|
||||
PartialPkt1 = <<50,182,1,0,4,116,101,115,116,0,1,48,48,48,48,48,48,48,48,48,48,48,48,48,
|
||||
|
@ -191,16 +183,18 @@ connect_v5(_) ->
|
|||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, Props), _} =
|
||||
raw_recv_parse(Data, ?MQTT_PROTO_V5),
|
||||
?assertNot(maps:is_key('Response-Information', Props)),
|
||||
ok
|
||||
?assertNot(maps:is_key('Response-Information', Props))
|
||||
end),
|
||||
|
||||
% topic alias = 0
|
||||
with_connection(fun([Sock]) ->
|
||||
|
||||
%% ct:log("emqx_protocol: ~p~n", [emqx_zone:get_zone(external, max_topic_alias)]),
|
||||
emqx_client_sock:send(Sock,
|
||||
raw_send_serialize(
|
||||
?CONNECT_PACKET(
|
||||
#mqtt_packet_connect{
|
||||
client_id = "hello",
|
||||
proto_ver = ?MQTT_PROTO_V5,
|
||||
properties =
|
||||
#{'Topic-Alias-Maximum' => 10}}),
|
||||
|
@ -210,7 +204,6 @@ connect_v5(_) ->
|
|||
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0,
|
||||
#{'Topic-Alias-Maximum' := 20}), _} =
|
||||
raw_recv_parse(Data, ?MQTT_PROTO_V5),
|
||||
|
||||
emqx_client_sock:send(Sock,
|
||||
raw_send_serialize(
|
||||
?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, 1, #{'Topic-Alias' => 0}, <<"hello">>),
|
||||
|
@ -383,9 +376,7 @@ connect_v5(_) ->
|
|||
),
|
||||
|
||||
{ok, WillData} = gen_tcp:recv(Sock2, 0, 5000),
|
||||
{ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), _} = raw_recv_parse(WillData, ?MQTT_PROTO_V5),
|
||||
|
||||
emqx_client_sock:close(Sock2)
|
||||
{ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), _} = raw_recv_parse(WillData, ?MQTT_PROTO_V5)
|
||||
end),
|
||||
|
||||
% duplicate client id
|
||||
|
|
|
@ -31,7 +31,7 @@ end_per_suite(_Config) ->
|
|||
emqx_ct_helpers:stop_apps([]).
|
||||
|
||||
ignore_loop(_Config) ->
|
||||
application:set_env(emqx, mqtt_ignore_loop_deliver, true),
|
||||
emqx_zone:set_env(external, ignore_loop_deliver, true),
|
||||
{ok, Client} = emqx_client:start_link(),
|
||||
{ok, _} = emqx_client:connect(Client),
|
||||
TestTopic = <<"Self">>,
|
||||
|
@ -41,7 +41,7 @@ ignore_loop(_Config) ->
|
|||
{ok, _} = emqx_client:publish(Client, TestTopic, <<"testmsg">>, 2),
|
||||
?assertEqual(0, length(emqx_client_SUITE:receive_messages(3))),
|
||||
ok = emqx_client:disconnect(Client),
|
||||
application:set_env(emqx, mqtt_ignore_loop_deliver, false).
|
||||
emqx_zone:set_env(external, ignore_loop_deliver, false).
|
||||
|
||||
t_session_all(_) ->
|
||||
emqx_zone:set_env(internal, idle_timeout, 1000),
|
||||
|
|
|
@ -251,8 +251,8 @@ ensure_config(Strategy) ->
|
|||
ensure_config(Strategy, _AckEnabled = true).
|
||||
|
||||
ensure_config(Strategy, AckEnabled) ->
|
||||
application:set_env(?APPLICATION, shared_subscription_strategy, Strategy),
|
||||
application:set_env(?APPLICATION, shared_dispatch_ack_enabled, AckEnabled),
|
||||
application:set_env(emqx, shared_subscription_strategy, Strategy),
|
||||
application:set_env(emqx, shared_dispatch_ack_enabled, AckEnabled),
|
||||
ok.
|
||||
|
||||
subscribed(Group, Topic, Pid) ->
|
||||
|
|
|
@ -30,23 +30,33 @@
|
|||
topic_alias_maximum => 0,
|
||||
will_msg => undefined}).
|
||||
|
||||
all() -> [{group, sm}].
|
||||
all() -> [{group, registry}, {group, ets}].
|
||||
|
||||
groups() ->
|
||||
[{sm, [non_parallel_tests],
|
||||
[
|
||||
t_resume_session,
|
||||
t_discard_session,
|
||||
t_register_unregister_session,
|
||||
t_get_set_session_attrs,
|
||||
t_get_set_session_stats,
|
||||
t_lookup_session_pids]}].
|
||||
Cases =
|
||||
[ t_resume_session,
|
||||
t_discard_session,
|
||||
t_register_unregister_session,
|
||||
t_get_set_session_attrs,
|
||||
t_get_set_session_stats,
|
||||
t_lookup_session_pids],
|
||||
[ {registry, [non_parallel_tests], Cases},
|
||||
{ets, [non_parallel_tests], Cases}].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
ok.
|
||||
|
||||
init_per_group(registry, Config) ->
|
||||
emqx_ct_helpers:start_apps([], fun enable_session_registry/1),
|
||||
Config;
|
||||
init_per_group(ets, Config) ->
|
||||
emqx_ct_helpers:start_apps([], fun disable_session_registry/1),
|
||||
Config.
|
||||
|
||||
end_per_group(_, _Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
|
||||
init_per_testcase(_All, Config) ->
|
||||
|
@ -60,6 +70,14 @@ end_per_testcase(_All, Config) ->
|
|||
after 500 -> ct:fail({timeout, wait_session_shutdown})
|
||||
end.
|
||||
|
||||
enable_session_registry(_) ->
|
||||
application:set_env(emqx, enable_session_registry, true),
|
||||
ok.
|
||||
|
||||
disable_session_registry(_) ->
|
||||
application:set_env(emqx, enable_session_registry, false),
|
||||
ok.
|
||||
|
||||
t_resume_session(Config) ->
|
||||
?assertEqual({ok, ?config(session_pid, Config)}, emqx_sm:resume_session(<<"client">>, ?ATTRS#{conn_pid => self()})).
|
||||
|
||||
|
|
|
@ -23,6 +23,15 @@
|
|||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
-define(WAIT(PATTERN, TIMEOUT),
|
||||
receive
|
||||
PATTERN ->
|
||||
ok
|
||||
after
|
||||
TIMEOUT ->
|
||||
error(timeout)
|
||||
end).
|
||||
|
||||
all() -> [t_api].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
|
@ -33,18 +42,36 @@ end_per_suite(_Config) ->
|
|||
application:stop(sasl).
|
||||
|
||||
t_api(_) ->
|
||||
gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}),
|
||||
{ok, _} = emqx_vm_mon:start_link([{check_interval, 1},
|
||||
{process_high_watermark, 0},
|
||||
{process_low_watermark, 0.6}]),
|
||||
timer:sleep(2000),
|
||||
?assertEqual(true, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())),
|
||||
emqx_vm_mon:set_process_high_watermark(0.8),
|
||||
emqx_vm_mon:set_process_low_watermark(0.75),
|
||||
?assertEqual(0.8, emqx_vm_mon:get_process_high_watermark()),
|
||||
?assertEqual(0.75, emqx_vm_mon:get_process_low_watermark()),
|
||||
timer:sleep(3000),
|
||||
?assertEqual(false, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())),
|
||||
emqx_vm_mon:set_check_interval(20),
|
||||
?assertEqual(20, emqx_vm_mon:get_check_interval()),
|
||||
ok.
|
||||
meck:new(alarm_handler, [passthrough, no_history]),
|
||||
Tester = self(),
|
||||
Ref = make_ref(),
|
||||
try
|
||||
meck:expect(alarm_handler, set_alarm,
|
||||
fun(What) ->
|
||||
Res = meck:passthrough([What]),
|
||||
Tester ! {Ref, set_alarm, What},
|
||||
Res
|
||||
end),
|
||||
meck:expect(alarm_handler, clear_alarm,
|
||||
fun(What) ->
|
||||
Res = meck:passthrough([What]),
|
||||
Tester ! {Ref, clear_alarm, What},
|
||||
Res
|
||||
end),
|
||||
gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}),
|
||||
{ok, _} = emqx_vm_mon:start_link([{check_interval, 1},
|
||||
{process_high_watermark, 0},
|
||||
{process_low_watermark, 0.6}]),
|
||||
?WAIT({Ref, set_alarm, {too_many_processes, _Count}}, 2000),
|
||||
?assertEqual(true, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())),
|
||||
emqx_vm_mon:set_process_high_watermark(0.8),
|
||||
emqx_vm_mon:set_process_low_watermark(0.75),
|
||||
?assertEqual(0.8, emqx_vm_mon:get_process_high_watermark()),
|
||||
?assertEqual(0.75, emqx_vm_mon:get_process_low_watermark()),
|
||||
?WAIT({Ref, clear_alarm, too_many_processes}, 3000),
|
||||
?assertEqual(false, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())),
|
||||
emqx_vm_mon:set_check_interval(20),
|
||||
?assertEqual(20, emqx_vm_mon:get_check_interval())
|
||||
after
|
||||
meck:unload(alarm_handler)
|
||||
end.
|
||||
|
|
|
@ -25,7 +25,6 @@ all() -> [t_set_get_env].
|
|||
t_set_get_env(_) ->
|
||||
application:set_env(emqx, zones, [{china, [{language, chinese}]}]),
|
||||
{ok, _} = emqx_zone:start_link(),
|
||||
ct:print("~p~n", [ets:tab2list(emqx_zone)]),
|
||||
chinese = emqx_zone:get_env(china, language),
|
||||
cn470 = emqx_zone:get_env(china, ism_band, cn470),
|
||||
undefined = emqx_zone:get_env(undefined, delay),
|
||||
|
|
Loading…
Reference in New Issue