Merge remote-tracking branch 'origin/develop'

This commit is contained in:
zhanghongtong 2019-06-20 17:30:36 +08:00
commit e76451000c
11 changed files with 54 additions and 76 deletions

View File

@ -3,20 +3,17 @@
REBAR_GIT_CLONE_OPTIONS += --depth 1 REBAR_GIT_CLONE_OPTIONS += --depth 1
export REBAR_GIT_CLONE_OPTIONS export REBAR_GIT_CLONE_OPTIONS
# CT_SUITES = emqx_trie emqx_router emqx_frame emqx_mqtt_compat
CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ SUITES_FILES := $(shell find test -name '*_SUITE.erl')
emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \
emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \ CT_SUITES := $(foreach value,$(SUITES_FILES),$(shell val=$$(basename $(value) .erl); echo $${val%_*}))
emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \
emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \
emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_channel \
emqx_packet emqx_channel emqx_tracer emqx_sys_mon emqx_message emqx_os_mon \
emqx_vm_mon emqx_alarm_handler emqx_rpc emqx_flapping
CT_NODE_NAME = emqxct@127.0.0.1 CT_NODE_NAME = emqxct@127.0.0.1
.PHONY: cover
run:
@echo $(CT_TEST_SUITES)
compile: compile:
@rebar3 compile @rebar3 compile

View File

@ -2,10 +2,10 @@
[ {jsx, "2.9.0"} % hex [ {jsx, "2.9.0"} % hex
, {cowboy, "2.6.1"} % hex , {cowboy, "2.6.1"} % hex
, {gproc, "0.8.0"} % hex , {gproc, "0.8.0"} % hex
, {ekka, "0.5.6"} % hex
, {replayq, "0.1.1"} %hex
, {esockd, "5.5.0"} %hex
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.5"}}}
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "v0.1.1"}}}
, {esockd, "5.5.0"}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
]}. ]}.

View File

@ -30,9 +30,8 @@ init([]) ->
shutdown => 1000, shutdown => 1000,
type => worker, type => worker,
modules => [emqx_banned]}, modules => [emqx_banned]},
FlappingOption = emqx_config:get_env(flapping_clean_interval, 3600000),
Flapping = #{id => flapping, Flapping = #{id => flapping,
start => {emqx_flapping, start_link, [FlappingOption]}, start => {emqx_flapping, start_link, []},
restart => permanent, restart => permanent,
shutdown => 1000, shutdown => 1000,
type => worker, type => worker,

View File

@ -19,7 +19,7 @@
-behaviour(gen_statem). -behaviour(gen_statem).
-export([start_link/1]). -export([start_link/0]).
%% This module is used to garbage clean the flapping records %% This module is used to garbage clean the flapping records
@ -33,6 +33,8 @@
-define(FLAPPING_TAB, ?MODULE). -define(FLAPPING_TAB, ?MODULE).
-define(default_flapping_clean_interval, 3600000).
-export([check/3]). -export([check/3]).
-record(flapping, -record(flapping,
@ -96,11 +98,12 @@ check_flapping(Action, CheckCount, _Threshold = {TimesThreshold, TimeInterval},
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_statem callbacks %% gen_statem callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(start_link(TimerInterval :: [integer()]) -> startlink_ret()). -spec(start_link() -> startlink_ret()).
start_link(TimerInterval) -> start_link() ->
gen_statem:start_link({local, ?MODULE}, ?MODULE, [TimerInterval], []). gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).
init([TimerInterval]) -> init([]) ->
TimerInterval = emqx_config:get_env(flapping_clean_interval, ?default_flapping_clean_interval),
TabOpts = [ public TabOpts = [ public
, set , set
, {keypos, 2} , {keypos, 2}

View File

@ -24,6 +24,8 @@
, get_caps/2 , get_caps/2
]). ]).
-export([default_caps/0]).
-type(caps() :: #{max_packet_size => integer(), -type(caps() :: #{max_packet_size => integer(),
max_clientid_len => integer(), max_clientid_len => integer(),
max_topic_alias => integer(), max_topic_alias => integer(),
@ -36,6 +38,7 @@
-export_type([caps/0]). -export_type([caps/0]).
-define(UNLIMITED, 0). -define(UNLIMITED, 0).
-define(DEFAULT_CAPS, [{max_packet_size, ?MAX_PACKET_SIZE}, -define(DEFAULT_CAPS, [{max_packet_size, ?MAX_PACKET_SIZE},
{max_clientid_len, ?MAX_CLIENTID_LEN}, {max_clientid_len, ?MAX_CLIENTID_LEN},
{max_topic_alias, ?UNLIMITED}, {max_topic_alias, ?UNLIMITED},
@ -119,6 +122,9 @@ check_sub(Topic, Opts, [{max_topic_levels, Limit}|Caps]) ->
_ -> check_sub(Topic, Opts, Caps) _ -> check_sub(Topic, Opts, Caps)
end. end.
default_caps() ->
?DEFAULT_CAPS.
get_caps(Zone, publish) -> get_caps(Zone, publish) ->
with_env(Zone, '$mqtt_pub_caps', with_env(Zone, '$mqtt_pub_caps',
fun() -> fun() ->

View File

@ -47,6 +47,11 @@
-define(OS_MON, ?MODULE). -define(OS_MON, ?MODULE).
-define(compat_windows(Expression), case os:type() of
{win32, nt} -> windows;
_Unix -> Expression
end).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% API %% API
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -95,7 +100,7 @@ set_procmem_high_watermark(Float) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
init([Opts]) -> init([Opts]) ->
_ = cpu_sup:util(), _ = ?compat_windows(cpu_sup:util()),
set_mem_check_interval(proplists:get_value(mem_check_interval, Opts, 60)), set_mem_check_interval(proplists:get_value(mem_check_interval, Opts, 60)),
set_sysmem_high_watermark(proplists:get_value(sysmem_high_watermark, Opts, 0.70)), set_sysmem_high_watermark(proplists:get_value(sysmem_high_watermark, Opts, 0.70)),
set_procmem_high_watermark(proplists:get_value(procmem_high_watermark, Opts, 0.05)), set_procmem_high_watermark(proplists:get_value(procmem_high_watermark, Opts, 0.05)),
@ -126,16 +131,18 @@ handle_call(_Request, _From, State) ->
handle_cast(_Request, State) -> handle_cast(_Request, State) ->
{noreply, State}. {noreply, State}.
handle_info({timeout, Timer, check}, State = #{timer := Timer, handle_info({timeout, Timer, check}, State = #{timer := Timer,
cpu_high_watermark := CPUHighWatermark, cpu_high_watermark := CPUHighWatermark,
cpu_low_watermark := CPULowWatermark, cpu_low_watermark := CPULowWatermark,
is_cpu_alarm_set := IsCPUAlarmSet}) -> is_cpu_alarm_set := IsCPUAlarmSet}) ->
case cpu_sup:util() of case ?compat_windows(cpu_sup:util()) of
0 -> 0 ->
{noreply, State#{timer := undefined}}; {noreply, State#{timer := undefined}};
{error, Reason} -> {error, Reason} ->
?LOG(error, "Failed to get cpu utilization: ~p", [Reason]), ?LOG(error, "Failed to get cpu utilization: ~p", [Reason]),
{noreply, ensure_check_timer(State)}; {noreply, ensure_check_timer(State)};
windows ->
{noreply, State};
Busy when Busy / 100 >= CPUHighWatermark -> Busy when Busy / 100 >= CPUHighWatermark ->
alarm_handler:set_alarm({cpu_high_watermark, Busy}), alarm_handler:set_alarm({cpu_high_watermark, Busy}),
{noreply, ensure_check_timer(State#{is_cpu_alarm_set := true})}; {noreply, ensure_check_timer(State#{is_cpu_alarm_set := true})};
@ -163,4 +170,3 @@ call(Req) ->
ensure_check_timer(State = #{cpu_check_interval := Interval}) -> ensure_check_timer(State = #{cpu_check_interval := Interval}) ->
State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}. State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}.

View File

@ -963,7 +963,7 @@ do_flapping_detect(Action, #pstate{zone = Zone,
Threshold = emqx_zone:get_env(Zone, flapping_threshold, {10, 60}), Threshold = emqx_zone:get_env(Zone, flapping_threshold, {10, 60}),
case emqx_flapping:check(Action, ClientId, Threshold) of case emqx_flapping:check(Action, ClientId, Threshold) of
flapping -> flapping ->
BanExpiryInterval = emqx_zone:get_env(Zone, flapping_ban_expiry_interval, 3600000), BanExpiryInterval = emqx_zone:get_env(Zone, flapping_banned_expiry_interval, 3600000),
Until = erlang:system_time(second) + BanExpiryInterval, Until = erlang:system_time(second) + BanExpiryInterval,
emqx_banned:add(#banned{who = {client_id, ClientId}, emqx_banned:add(#banned{who = {client_id, ClientId},
reason = <<"flapping">>, reason = <<"flapping">>,

View File

@ -36,8 +36,6 @@ all() ->
groups() -> groups() ->
[{access_control, [sequence], [{access_control, [sequence],
[reload_acl, [reload_acl,
register_mod,
unregister_mod,
check_acl_1, check_acl_1,
check_acl_2]}, check_acl_2]},
{access_control_cache_mode, [], {access_control_cache_mode, [],
@ -98,58 +96,26 @@ write_config(Filename, Terms) ->
end_per_group(_Group, Config) -> end_per_group(_Group, Config) ->
Config. Config.
init_per_testcase(_TestCase, Config) ->
?AC:start_link(),
Config.
end_per_testcase(_TestCase, _Config) ->
ok.
per_testcase_config(acl_cache_full, Config) ->
Config;
per_testcase_config(_TestCase, Config) ->
Config.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% emqx_access_control %% emqx_access_control
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
reload_acl(_) -> reload_acl(_) ->
[ok] = ?AC:reload_acl(). ok = ?AC:reload_acl().
register_mod(_) ->
ok = ?AC:register_mod(acl, emqx_acl_test_mod, []),
{emqx_acl_test_mod, _, 0} = hd(?AC:lookup_mods(acl)),
ok = ?AC:register_mod(auth, emqx_auth_anonymous_test_mod,[]),
ok = ?AC:register_mod(auth, emqx_auth_dashboard, [], 99),
[{emqx_auth_dashboard, _, 99},
{emqx_auth_anonymous_test_mod, _, 0}] = ?AC:lookup_mods(auth).
unregister_mod(_) ->
ok = ?AC:register_mod(acl, emqx_acl_test_mod, []),
{emqx_acl_test_mod, _, 0} = hd(?AC:lookup_mods(acl)),
ok = ?AC:unregister_mod(acl, emqx_acl_test_mod),
timer:sleep(5),
{emqx_acl_internal, _, 0}= hd(?AC:lookup_mods(acl)),
ok = ?AC:register_mod(auth, emqx_auth_anonymous_test_mod,[]),
[{emqx_auth_anonymous_test_mod, _, 0}] = ?AC:lookup_mods(auth),
ok = ?AC:unregister_mod(auth, emqx_auth_anonymous_test_mod),
timer:sleep(5),
[] = ?AC:lookup_mods(auth).
check_acl_1(_) -> check_acl_1(_) ->
SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>}, SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external},
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
deny = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1/x/y">>), deny = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1/x/y">>),
allow = ?AC:check_acl(SelfUser, publish, <<"users/testuser/1">>), allow = ?AC:check_acl(SelfUser, publish, <<"users/testuser/1">>),
allow = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>). allow = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>).
check_acl_2(_) -> check_acl_2(_) ->
SelfUser = #{client_id => <<"client2">>, username => <<"xyz">>}, SelfUser = #{client_id => <<"client2">>, username => <<"xyz">>, zone => external},
deny = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>). deny = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>).
acl_cache_basic(_) -> acl_cache_basic(_) ->
SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>}, SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external},
not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
@ -162,7 +128,7 @@ acl_cache_basic(_) ->
acl_cache_expiry(_) -> acl_cache_expiry(_) ->
application:set_env(emqx, acl_cache_ttl, 100), application:set_env(emqx, acl_cache_ttl, 100),
SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>}, SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external},
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
ct:sleep(150), ct:sleep(150),
@ -172,7 +138,7 @@ acl_cache_expiry(_) ->
acl_cache_full(_) -> acl_cache_full(_) ->
application:set_env(emqx, acl_cache_max_size, 1), application:set_env(emqx, acl_cache_max_size, 1),
SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>}, SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external},
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
@ -187,7 +153,7 @@ acl_cache_cleanup(_) ->
application:set_env(emqx, acl_cache_ttl, 100), application:set_env(emqx, acl_cache_ttl, 100),
application:set_env(emqx, acl_cache_max_size, 2), application:set_env(emqx, acl_cache_max_size, 2),
SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>}, SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external},
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
@ -357,8 +323,8 @@ compile_rule(_) ->
{deny, all} = compile({deny, all}). {deny, all} = compile({deny, all}).
match_rule(_) -> match_rule(_) ->
User = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{127,0,0,1}, 2948}}, User = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{127,0,0,1}, 2948}, zone => external},
User2 = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{192,168,0,10}, 3028}}, User2 = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{192,168,0,10}, 3028}, zone => external},
{matched, allow} = match(User, <<"Test/Topic">>, {allow, all}), {matched, allow} = match(User, <<"Test/Topic">>, {allow, all}),
{matched, deny} = match(User, <<"Test/Topic">>, {deny, all}), {matched, deny} = match(User, <<"Test/Topic">>, {deny, all}),

View File

@ -68,11 +68,11 @@ groups() -> [{connect, [sequence],
]}]. ]}].
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(), emqx_ct_helpers:start_apps([]),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps(). emqx_ct_helpers:stop_apps([]).
init_per_group(_Group, Config) -> init_per_group(_Group, Config) ->
Config. Config.
@ -85,7 +85,7 @@ case1_protocol_name(_) ->
MqttPacket = serialize(?CASE1_PROTOCOL_NAME), MqttPacket = serialize(?CASE1_PROTOCOL_NAME),
emqx_client_sock:send(Sock, MqttPacket), emqx_client_sock:send(Sock, MqttPacket),
{ok, Data} = gen_tcp:recv(Sock, 0), {ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), _} = raw_recv_pase(Data), {ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), <<>>, _} = raw_recv_pase(Data),
Disconnect = gen_tcp:recv(Sock, 0), Disconnect = gen_tcp:recv(Sock, 0),
?assertEqual({error, closed}, Disconnect). ?assertEqual({error, closed}, Disconnect).
@ -95,7 +95,7 @@ case2_protocol_ver(_) ->
emqx_client_sock:send(Sock, Packet), emqx_client_sock:send(Sock, Packet),
{ok, Data} = gen_tcp:recv(Sock, 0), {ok, Data} = gen_tcp:recv(Sock, 0),
%% case1 Unacceptable protocol version %% case1 Unacceptable protocol version
{ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), _} = raw_recv_pase(Data), {ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), <<>>, _} = raw_recv_pase(Data),
Disconnect = gen_tcp:recv(Sock, 0), Disconnect = gen_tcp:recv(Sock, 0),
?assertEqual({error, closed}, Disconnect). ?assertEqual({error, closed}, Disconnect).

View File

@ -55,7 +55,7 @@ groups() ->
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_ct_helpers:start_apps([], fun set_special_configs/1), emqx_ct_helpers:start_apps([], fun set_special_configs/1),
MqttCaps = emqx_zone:get_env(external, '$mqtt_caps'), MqttCaps = maps:from_list(emqx_mqtt_caps:default_caps()),
emqx_zone:set_env(external, '$mqtt_caps', MqttCaps#{max_topic_alias => 20}), emqx_zone:set_env(external, '$mqtt_caps', MqttCaps#{max_topic_alias => 20}),
Config. Config.

View File

@ -22,10 +22,11 @@
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps([{log_level, error} | Config]). emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps(). emqx_ct_helpers:stop_apps([]).
all() -> all() ->
[request_response]. [request_response].