diff --git a/etc/emqx.conf b/etc/emqx.conf index 4703f5083..f3f46589e 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -435,15 +435,26 @@ acl_nomatch = allow acl_file = {{ platform_etc_dir }}/acl.conf ## Whether to enable ACL cache for publish. +## The ACL cache size +## The maximum count of ACL entries allowed for a client. ## ## Value: on | off enable_acl_cache = on -## The ACL cache age. +## The ACL cache size +## The maximum count of ACL entries allowed for a client. +## +## Value: Integer greater than 0 +## Default: 32 +acl_cache_max_size = 32 + +## The ACL cache time-to-live. +## The time after which an ACL cache entry will be invalid ## ## Value: Duration -## Default: 5 minute -acl_cache_age = 5m +## Default: 1 minute +acl_cache_ttl = 1m + ##-------------------------------------------------------------------- ## MQTT Protocol @@ -1875,4 +1886,3 @@ sysmon.busy_port = false ## ## Value: true | false sysmon.busy_dist_port = true - diff --git a/priv/emqx.schema b/priv/emqx.schema index a0d2bc0e2..765363607 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -577,17 +577,22 @@ end}. {datatype, flag} ]}. -%% @doc ACL cache age. -{mapping, "acl_cache_age", "emqx.acl_cache_age", [ - {default, "5m"}, +%% @doc ACL cache time-to-live. +{mapping, "acl_cache_ttl", "emqx.acl_cache_ttl", [ + {default, "1m"}, {datatype, {duration, ms}} ]}. %% @doc ACL cache size. -%% {mapping, "acl_cache_size", "emqx.acl_cache_size", [ -%% {default, 0}, -%% {datatype, integer} -%% ]}. +{mapping, "acl_cache_max_size", "emqx.acl_cache_max_size", [ + {default, 32}, + {datatype, integer}, + {validators, ["range:gt_0"]} +]}. + +{validator, "range:gt_0", "must greater than 0", + fun(X) -> X > 0 end +}. %%-------------------------------------------------------------------- %% MQTT Protocol @@ -1703,4 +1708,3 @@ end}. {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)}, {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}] end}. - diff --git a/src/emqx.app.src b/src/emqx.app.src index d963e3662..39d876797 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -3,7 +3,8 @@ {vsn,"3.0"}, {modules,[]}, {registered,[emqx_sup]}, - {applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,cowboy]}, + {applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,cowboy + ]}, {env,[]}, {mod,{emqx_app,[]}}, {maintainers,["Feng Lee "]}, diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 1577ca122..3bb31057e 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -86,19 +86,22 @@ authenticate(Credentials, Password, [{Mod, State, _Seq} | Mods]) -> {error, Error} end. +%% @doc Check ACL -spec(check_acl(credentials(), pubsub(), topic()) -> allow | deny). check_acl(Credentials, PubSub, Topic) when ?PS(PubSub) -> - check_acl(Credentials, PubSub, Topic, lookup_mods(acl)). + CacheEnabled = emqx_acl_cache:is_enabled(), + check_acl(Credentials, PubSub, Topic, lookup_mods(acl), CacheEnabled). -check_acl(Credentials, _PubSub, _Topic, []) -> - Zone = maps:get(zone, Credentials, undefined), - emqx_zone:get_env(Zone, acl_nomatch, deny); - -check_acl(Credentials, PubSub, Topic, [{Mod, State, _Seq}|Mods]) -> - case Mod:check_acl({Credentials, PubSub, Topic}, State) of - ignore -> check_acl(Credentials, PubSub, Topic, Mods); - allow -> allow; - deny -> deny +check_acl(Credentials, PubSub, Topic, AclMods, false) -> + do_check_acl(Credentials, PubSub, Topic, AclMods); +check_acl(Credentials, PubSub, Topic, AclMods, true) -> + case emqx_acl_cache:get_acl_cache(PubSub, Topic) of + not_found -> + AclResult = do_check_acl(Credentials, PubSub, Topic, AclMods), + emqx_acl_cache:put_acl_cache(PubSub, Topic, AclResult), + AclResult; + AclResult -> + AclResult end. -spec(reload_acl() -> list(ok | {error, term()})). @@ -191,6 +194,15 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +do_check_acl(#client{zone = Zone}, _PubSub, _Topic, []) -> + emqx_zone:get_env(Zone, acl_nomatch, deny); +do_check_acl(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) -> + case Mod:check_acl({Client, PubSub, Topic}, State) of + allow -> allow; + deny -> deny; + ignore -> do_check_acl(Client, PubSub, Topic, AclMods) + end. + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- diff --git a/src/emqx_acl_cache.erl b/src/emqx_acl_cache.erl new file mode 100644 index 000000000..65e1e3305 --- /dev/null +++ b/src/emqx_acl_cache.erl @@ -0,0 +1,209 @@ +-module(emqx_acl_cache). + +-include("emqx.hrl"). + +-export([ get_acl_cache/2 + , put_acl_cache/3 + , cleanup_acl_cache/0 + , empty_acl_cache/0 + , dump_acl_cache/0 + , get_cache_size/0 + , get_cache_max_size/0 + , get_newest_key/0 + , get_oldest_key/0 + , cache_k/2 + , cache_v/1 + , is_enabled/0 + ]). + +-type(acl_result() :: allow | deny). + +%% Wrappers for key and value +cache_k(PubSub, Topic)-> {PubSub, Topic}. +cache_v(AclResult)-> {AclResult, time_now()}. + +-spec(is_enabled() -> boolean()). +is_enabled() -> + application:get_env(emqx, enable_acl_cache, true). + +%% We'll cleanup the cache before repalcing an expired acl. +-spec(get_acl_cache(PubSub :: publish | subscribe, Topic :: topic()) + -> (acl_result() | not_found)). +get_acl_cache(PubSub, Topic) -> + case erlang:get(cache_k(PubSub, Topic)) of + undefined -> not_found; + {AclResult, CachedAt} -> + if_expired(CachedAt, + fun(false) -> + AclResult; + (true) -> + cleanup_acl_cache(), + not_found + end) + end. + +%% If the cache get full, and also the latest one +%% is expired, then delete all the cache entries +-spec(put_acl_cache(PubSub :: publish | subscribe, + Topic :: topic(), AclResult :: acl_result()) -> ok). +put_acl_cache(PubSub, Topic, AclResult) -> + MaxSize = get_cache_max_size(), true = (MaxSize =/= 0), + Size = get_cache_size(), + if + Size < MaxSize -> + add_acl(PubSub, Topic, AclResult); + Size =:= MaxSize -> + NewestK = get_newest_key(), + {_AclResult, CachedAt} = erlang:get(NewestK), + if_expired(CachedAt, + fun(true) -> + % all cache expired, cleanup first + empty_acl_cache(), + add_acl(PubSub, Topic, AclResult); + (false) -> + % cache full, perform cache replacement + evict_acl_cache(), + add_acl(PubSub, Topic, AclResult) + end) + end. + +%% delete all the acl entries +-spec(empty_acl_cache() -> ok). +empty_acl_cache() -> + map_acl_cache(fun({CacheK, _CacheV}) -> + erlang:erase(CacheK) + end), + set_cache_size(0), + keys_queue_set(queue:new()). + +%% delete the oldest acl entry +-spec(evict_acl_cache() -> ok). +evict_acl_cache() -> + OldestK = keys_queue_out(), + erlang:erase(OldestK), + decr_cache_size(). + +%% cleanup all the exipired cache entries +-spec(cleanup_acl_cache() -> ok). +cleanup_acl_cache() -> + keys_queue_set( + cleanup_acl(keys_queue_get())). + +get_oldest_key() -> + keys_queue_pick(queue_front()). +get_newest_key() -> + keys_queue_pick(queue_rear()). + +get_cache_max_size() -> + application:get_env(emqx, acl_cache_max_size, 32). + +get_cache_size() -> + case erlang:get(acl_cache_size) of + undefined -> 0; + Size -> Size + end. + +dump_acl_cache() -> + map_acl_cache(fun(Cache) -> Cache end). +map_acl_cache(Fun) -> + [Fun(R) || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish + orelse SubPub =:= subscribe]. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +add_acl(PubSub, Topic, AclResult) -> + K = cache_k(PubSub, Topic), + V = cache_v(AclResult), + case erlang:get(K) of + undefined -> add_new_acl(K, V); + {_AclResult, _CachedAt} -> + update_acl(K, V) + end. + +add_new_acl(K, V) -> + erlang:put(K, V), + keys_queue_in(K), + incr_cache_size(). + +update_acl(K, V) -> + erlang:put(K, V), + keys_queue_update(K). + +cleanup_acl(KeysQ) -> + case queue:out(KeysQ) of + {{value, OldestK}, KeysQ2} -> + {_AclResult, CachedAt} = erlang:get(OldestK), + if_expired(CachedAt, + fun(false) -> KeysQ; + (true) -> + erlang:erase(OldestK), + decr_cache_size(), + cleanup_acl(KeysQ2) + end); + {empty, KeysQ} -> KeysQ + end. + +incr_cache_size() -> + erlang:put(acl_cache_size, get_cache_size() + 1), ok. +decr_cache_size() -> + Size = get_cache_size(), + if Size > 1 -> + erlang:put(acl_cache_size, Size-1); + Size =< 1 -> + erlang:put(acl_cache_size, 0) + end, ok. +set_cache_size(N) -> + erlang:put(acl_cache_size, N), ok. + +%%% Ordered Keys Q %%% +keys_queue_in(Key) -> + %% delete the key first if exists + KeysQ = keys_queue_get(), + keys_queue_set(queue:in(Key, KeysQ)). + +keys_queue_out() -> + case queue:out(keys_queue_get()) of + {{value, OldestK}, Q2} -> + keys_queue_set(Q2), OldestK; + {empty, _Q} -> + undefined + end. + +keys_queue_update(Key) -> + NewKeysQ = keys_queue_remove(Key, keys_queue_get()), + keys_queue_set(queue:in(Key, NewKeysQ)). + +keys_queue_pick(Pick) -> + KeysQ = keys_queue_get(), + case queue:is_empty(KeysQ) of + true -> undefined; + false -> Pick(KeysQ) + end. + +keys_queue_remove(Key, KeysQ) -> + queue:filter(fun + (K) when K =:= Key -> false; (_) -> true + end, KeysQ). + +keys_queue_set(KeysQ) -> + erlang:put(acl_keys_q, KeysQ), ok. +keys_queue_get() -> + case erlang:get(acl_keys_q) of + undefined -> queue:new(); + KeysQ -> KeysQ + end. + +queue_front() -> fun queue:get/1. +queue_rear() -> fun queue:get_r/1. + +time_now() -> erlang:system_time(millisecond). + +if_expired(CachedAt, Fun) -> + TTL = application:get_env(emqx, acl_cache_ttl, 60000), + Now = time_now(), + if (CachedAt + TTL) =< Now -> + Fun(true); + true -> + Fun(false) + end. diff --git a/src/emqx_acl_internal.erl b/src/emqx_acl_internal.erl index 07aada812..1ba5c93df 100644 --- a/src/emqx_acl_internal.erl +++ b/src/emqx_acl_internal.erl @@ -103,12 +103,13 @@ reload_acl(#state{acl_file = undefined}) -> ok; reload_acl(State) -> case catch load_rules_from_file(State) of - {'EXIT', Error} -> - {error, Error}; - true -> ok + + {'EXIT', Error} -> {error, Error}; + #state{config=File} -> + io:format("reload acl_internal successfully: ~p~n", [File]), + ok end. -spec(description() -> string()). description() -> "Internal ACL with etc/acl.conf". - diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index f2451e474..7b4c80754 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -672,4 +672,3 @@ feed_var({<<"%u">>, Username}, MountPoint) -> sp(true) -> 1; sp(false) -> 0. - diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index e3e0bbb38..f2f00e55e 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -19,7 +19,7 @@ -compile(export_all). -compile(nowarn_export_all). --include_lib("emqttc/include/emqttc_packet.hrl"). +-include("emqx_mqtt.hrl"). -define(APP, emqx). @@ -79,7 +79,7 @@ mqtt_connect_with_tcp(_) -> Packet = raw_send_serialise(?CLIENT), gen_tcp:send(Sock, Packet), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data), + {ok, ?CONNACK_PACKET(0), _} = raw_recv_pase(Data), gen_tcp:close(Sock). mqtt_connect_with_ssl_oneway(_) -> @@ -133,7 +133,7 @@ mqtt_connect_with_ws(_Config) -> Packet = raw_send_serialise(?CLIENT), ok = rfc6455_client:send_binary(WS, Packet), {binary, P} = rfc6455_client:recv(WS), - {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(P), + {ok, ?CONNACK_PACKET(0), _} = raw_recv_pase(P), {close, _} = rfc6455_client:close(WS), ok. diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl index 270d55f57..f88420e56 100644 --- a/test/emqx_access_SUITE.erl +++ b/test/emqx_access_SUITE.erl @@ -22,34 +22,65 @@ -include("emqx.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). -define(AC, emqx_access_control). +-define(CACHE, emqx_acl_cache). -import(emqx_access_rule, [compile/1, match/3]). all() -> [{group, access_control}, - {group, access_rule}]. + {group, acl_cache}, + {group, access_control_cache_mode}, + {group, access_rule} + ]. groups() -> [{access_control, [sequence], - [reload_acl, - register_mod, - unregister_mod, - check_acl]}, + [reload_acl, + register_mod, + unregister_mod, + check_acl_1, + check_acl_2 + ]}, + {access_control_cache_mode, [], + [ + acl_cache_basic, + acl_cache_expiry, + acl_cache_cleanup, + acl_cache_full + ]}, + {acl_cache, [], [ + put_get_del_cache, + cache_update, + cache_expiry, + cache_replacement, + cache_cleanup, + cache_auto_emtpy, + cache_auto_cleanup + ]}, {access_rule, [], - [compile_rule, - match_rule]}]. + [compile_rule, + match_rule]}]. -init_per_group(access_control, Config) -> +init_per_group(Group, Config) when Group =:= access_control; + Group =:= access_control_cache_mode -> + prepare_config(Group), application:load(emqx), - prepare_config(), Config; - init_per_group(_Group, Config) -> Config. -prepare_config() -> +prepare_config(Group = access_control) -> + set_acl_config_file(Group), + application:set_env(emqx, enable_acl_cache, false); +prepare_config(Group = access_control_cache_mode) -> + set_acl_config_file(Group), + application:set_env(emqx, enable_acl_cache, true), + application:set_env(emqx, acl_cache_max_size, 100). + +set_acl_config_file(_Group) -> Rules = [{allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]}, {allow, {user, "testuser"}, subscribe, ["a/b/c", "d/e/f/#"]}, {allow, {user, "admin"}, pubsub, ["a/b/c", "d/e/f/#"]}, @@ -59,11 +90,8 @@ prepare_config() -> {deny, all, subscribe, ["$SYS/#", "#"]}, {deny, all}], write_config("access_SUITE_acl.conf", Rules), - Config = [{auth, anonymous, []}, - {acl, internal, [{config, "access_SUITE_acl.conf"}, - {nomatch, allow}]}], - write_config("access_SUITE_emqx.conf", Config), - application:set_env(emqx, conf, "access_SUITE_emqx.conf"). + application:set_env(emqx, acl_file, "access_SUITE_acl.conf"). + write_config(Filename, Terms) -> file:write_file(Filename, [io_lib:format("~tp.~n", [Term]) || Term <- Terms]). @@ -71,24 +99,18 @@ write_config(Filename, Terms) -> end_per_group(_Group, Config) -> Config. -init_per_testcase(TestCase, Config) when TestCase =:= reload_acl; - TestCase =:= register_mod; - TestCase =:= unregister_mod; - TestCase =:= check_acl -> - {ok, _Pid} = ?AC:start_link(), Config; - init_per_testcase(_TestCase, Config) -> + {ok, _Pid} = ?AC:start_link(), Config. - -end_per_testcase(TestCase, _Config) when TestCase =:= reload_acl; - TestCase =:= register_mod; - TestCase =:= unregister_mod; - TestCase =:= check_acl -> - ?AC:stop(); - end_per_testcase(_TestCase, _Config) -> ok. +per_testcase_config(acl_cache_full, Config) -> + Config; +per_testcase_config(_TestCase, Config) -> + Config. + + %%-------------------------------------------------------------------- %% emqx_access_control %%-------------------------------------------------------------------- @@ -118,15 +140,196 @@ unregister_mod(_) -> timer:sleep(5), [] = ?AC:lookup_mods(auth). -check_acl(_) -> - User1 = #client{client_id = <<"client1">>, username = <<"testuser">>}, - User2 = #client{client_id = <<"client2">>, username = <<"xyz">>}, - allow = ?AC:check_acl(User1, subscribe, <<"users/testuser/1">>), - allow = ?AC:check_acl(User1, subscribe, <<"clients/client1">>), - allow = ?AC:check_acl(User1, subscribe, <<"clients/client1/x/y">>), - allow = ?AC:check_acl(User1, publish, <<"users/testuser/1">>), - allow = ?AC:check_acl(User1, subscribe, <<"a/b/c">>), - allow = ?AC:check_acl(User2, subscribe, <<"a/b/c">>). +check_acl_1(_) -> + SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, + allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), + allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), + deny = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1/x/y">>), + allow = ?AC:check_acl(SelfUser, publish, <<"users/testuser/1">>), + allow = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>). +check_acl_2(_) -> + SelfUser = #client{id = <<"client2">>, username = <<"xyz">>}, + deny = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>). + +acl_cache_basic(_) -> + SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, + not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), + not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), + + allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), + allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), + + allow = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), + allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), + ok. + +acl_cache_expiry(_) -> + application:set_env(emqx, acl_cache_ttl, 100), + + SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, + allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), + allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), + ct:sleep(150), + not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), + ok. + +acl_cache_full(_) -> + application:set_env(emqx, acl_cache_max_size, 1), + + SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, + allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), + allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), + + %% the older ones (the <<"users/testuser/1">>) will be evicted first + not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), + allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), + ok. + +acl_cache_cleanup(_) -> + %% The acl cache will try to evict memory, if the size is full and the newest + %% cache entry is expired + application:set_env(emqx, acl_cache_ttl, 100), + application:set_env(emqx, acl_cache_max_size, 2), + + SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, + allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), + allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), + + allow = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), + allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), + + ct:sleep(150), + %% now the cache is full and the newest one - "clients/client1" + %% should be expired, so we'll empty the cache before putting + %% the next cache entry + deny = ?AC:check_acl(SelfUser, subscribe, <<"#">>), + + not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), + not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), + deny = ?CACHE:get_acl_cache(subscribe, <<"#">>), + ok. + +put_get_del_cache(_) -> + application:set_env(emqx, acl_cache_ttl, 300000), + application:set_env(emqx, acl_cache_max_size, 30), + + not_found = ?CACHE:get_acl_cache(publish, <<"a">>), + ok = ?CACHE:put_acl_cache(publish, <<"a">>, allow), + allow = ?CACHE:get_acl_cache(publish, <<"a">>), + + not_found = ?CACHE:get_acl_cache(subscribe, <<"b">>), + ok = ?CACHE:put_acl_cache(subscribe, <<"b">>, deny), + deny = ?CACHE:get_acl_cache(subscribe, <<"b">>), + + 2 = ?CACHE:get_cache_size(), + ?assertEqual(?CACHE:cache_k(subscribe, <<"b">>), ?CACHE:get_newest_key()). + +cache_expiry(_) -> + application:set_env(emqx, acl_cache_ttl, 100), + application:set_env(emqx, acl_cache_max_size, 30), + ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), + allow = ?CACHE:get_acl_cache(subscribe, <<"a">>), + + ct:sleep(150), + not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>), + + ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, deny), + deny = ?CACHE:get_acl_cache(subscribe, <<"a">>), + + ct:sleep(150), + not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>). + +cache_update(_) -> + application:set_env(emqx, acl_cache_ttl, 300000), + application:set_env(emqx, acl_cache_max_size, 30), + [] = ?CACHE:dump_acl_cache(), + + ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), + 3 = ?CACHE:get_cache_size(), + ?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_newest_key()), + + %% update the 2nd one + ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), + ct:pal("dump acl cache: ~p~n", [?CACHE:dump_acl_cache()]), + + 3 = ?CACHE:get_cache_size(), + ?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_newest_key()), + ?assertEqual(?CACHE:cache_k(subscribe, <<"a">>), ?CACHE:get_oldest_key()). + +cache_replacement(_) -> + application:set_env(emqx, acl_cache_ttl, 300000), + application:set_env(emqx, acl_cache_max_size, 3), + ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), + allow = ?CACHE:get_acl_cache(subscribe, <<"a">>), + allow = ?CACHE:get_acl_cache(publish, <<"b">>), + allow = ?CACHE:get_acl_cache(publish, <<"c">>), + 3 = ?CACHE:get_cache_size(), + ?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_newest_key()), + + ok = ?CACHE:put_acl_cache(publish, <<"d">>, deny), + 3 = ?CACHE:get_cache_size(), + ?assertEqual(?CACHE:cache_k(publish, <<"d">>), ?CACHE:get_newest_key()), + ?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_oldest_key()), + + ok = ?CACHE:put_acl_cache(publish, <<"e">>, deny), + 3 = ?CACHE:get_cache_size(), + ?assertEqual(?CACHE:cache_k(publish, <<"e">>), ?CACHE:get_newest_key()), + ?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_oldest_key()), + + not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>), + not_found = ?CACHE:get_acl_cache(publish, <<"b">>), + allow = ?CACHE:get_acl_cache(publish, <<"c">>). + +cache_cleanup(_) -> + application:set_env(emqx, acl_cache_ttl, 100), + application:set_env(emqx, acl_cache_max_size, 30), + ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), + ct:sleep(150), + ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), + 3 = ?CACHE:get_cache_size(), + + ?CACHE:cleanup_acl_cache(), + ?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_oldest_key()), + 1 = ?CACHE:get_cache_size(). + +cache_auto_emtpy(_) -> + %% verify cache is emptied when cache full and even the newest + %% one is expired. + application:set_env(emqx, acl_cache_ttl, 100), + application:set_env(emqx, acl_cache_max_size, 3), + ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), + 3 = ?CACHE:get_cache_size(), + + ct:sleep(150), + ok = ?CACHE:put_acl_cache(subscribe, <<"d">>, deny), + 1 = ?CACHE:get_cache_size(). + +cache_auto_cleanup(_) -> + %% verify we'll cleanup expired entries when we got a exipired acl + %% from cache. + application:set_env(emqx, acl_cache_ttl, 100), + application:set_env(emqx, acl_cache_max_size, 30), + ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), + ct:sleep(150), + ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"d">>, deny), + 4 = ?CACHE:get_cache_size(), + + %% "a" and "b" expires, while "c" and "d" not + not_found = ?CACHE:get_acl_cache(publish, <<"b">>), + 2 = ?CACHE:get_cache_size(), + + ct:sleep(150), %% now "c" and "d" expires + not_found = ?CACHE:get_acl_cache(publish, <<"c">>), + 0 = ?CACHE:get_cache_size(). %%-------------------------------------------------------------------- %% emqx_access_rule @@ -159,9 +362,9 @@ compile_rule(_) -> {deny, all} = compile({deny, all}). match_rule(_) -> - User = #client{peername = {{127,0,0,1}, 2948}, client_id = <<"testClient">>, username = <<"TestUser">>}, - User2 = #client{peername = {{192,168,0,10}, 3028}, client_id = <<"testClient">>, username = <<"TestUser">>}, - + User = #client{peername = {{127,0,0,1}, 2948}, id = <<"testClient">>, username = <<"TestUser">>}, + User2 = #client{peername = {{192,168,0,10}, 3028}, id = <<"testClient">>, username = <<"TestUser">>}, + {matched, allow} = match(User, <<"Test/Topic">>, {allow, all}), {matched, deny} = match(User, <<"Test/Topic">>, {deny, all}), {matched, allow} = match(User, <<"Test/Topic">>, compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]})), @@ -179,4 +382,3 @@ match_rule(_) -> {matched, allow} = match(User, <<"Topic">>, AndRule), OrRule = compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, ["Topic"]}), {matched, allow} = match(User, <<"Topic">>, OrRule). - diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index e77d689d4..af5c64949 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -149,7 +149,7 @@ start_session(_) -> {ok, ClientPid} = emqx_mock_client:start_link(<<"clientId">>), {ok, SessPid} = emqx_mock_client:start_session(ClientPid), Message = emqx_message:make(<<"clientId">>, 2, <<"topic">>, <<"hello">>), - Message1 = Message#mqtt_message{packet_id = 1}, + Message1 = Message#message{id = 1}, emqx_session:publish(SessPid, Message1), emqx_session:pubrel(SessPid, 1), emqx_session:subscribe(SessPid, [{<<"topic/session">>, [{qos, 2}]}]), diff --git a/test/emqx_mqueue_SUITE.erl b/test/emqx_mqueue_SUITE.erl index d174d980e..5ab510633 100644 --- a/test/emqx_mqueue_SUITE.erl +++ b/test/emqx_mqueue_SUITE.erl @@ -35,22 +35,22 @@ t_in(_) -> {store_qos0, true}], Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()), ?assert(?Q:is_empty(Q)), - Q1 = ?Q:in(#mqtt_message{}, Q), + Q1 = ?Q:in(#message{}, Q), ?assertEqual(1, ?Q:len(Q1)), - Q2 = ?Q:in(#mqtt_message{qos = 1}, Q1), + Q2 = ?Q:in(#message{qos = 1}, Q1), ?assertEqual(2, ?Q:len(Q2)), - Q3 = ?Q:in(#mqtt_message{qos = 2}, Q2), - Q4 = ?Q:in(#mqtt_message{}, Q3), - Q5 = ?Q:in(#mqtt_message{}, Q4), + Q3 = ?Q:in(#message{qos = 2}, Q2), + Q4 = ?Q:in(#message{}, Q3), + Q5 = ?Q:in(#message{}, Q4), ?assertEqual(5, ?Q:len(Q5)). t_in_qos0(_) -> Opts = [{max_length, 5}, {store_qos0, false}], Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()), - Q1 = ?Q:in(#mqtt_message{}, Q), + Q1 = ?Q:in(#message{}, Q), ?assert(?Q:is_empty(Q1)), - Q2 = ?Q:in(#mqtt_message{qos = 0}, Q1), + Q2 = ?Q:in(#message{qos = 0}, Q1), ?assert(?Q:is_empty(Q2)). t_out(_) -> @@ -58,10 +58,10 @@ t_out(_) -> {store_qos0, true}], Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()), {empty, Q} = ?Q:out(Q), - Q1 = ?Q:in(#mqtt_message{}, Q), + Q1 = ?Q:in(#message{}, Q), {Value, Q2} = ?Q:out(Q1), ?assertEqual(0, ?Q:len(Q2)), - ?assertEqual({value, #mqtt_message{}}, Value). + ?assertEqual({value, #message{}}, Value). t_simple_mqueue(_) -> Opts = [{type, simple}, @@ -74,13 +74,13 @@ t_simple_mqueue(_) -> ?assertEqual(3, ?Q:max_len(Q)), ?assertEqual(<<"simple_queue">>, ?Q:name(Q)), ?assert(?Q:is_empty(Q)), - Q1 = ?Q:in(#mqtt_message{qos = 1, payload = <<"1">>}, Q), - Q2 = ?Q:in(#mqtt_message{qos = 1, payload = <<"2">>}, Q1), - Q3 = ?Q:in(#mqtt_message{qos = 1, payload = <<"3">>}, Q2), - Q4 = ?Q:in(#mqtt_message{qos = 1, payload = <<"4">>}, Q3), + Q1 = ?Q:in(#message{qos = 1, payload = <<"1">>}, Q), + Q2 = ?Q:in(#message{qos = 1, payload = <<"2">>}, Q1), + Q3 = ?Q:in(#message{qos = 1, payload = <<"3">>}, Q2), + Q4 = ?Q:in(#message{qos = 1, payload = <<"4">>}, Q3), ?assertEqual(3, ?Q:len(Q4)), {{value, Msg}, Q5} = ?Q:out(Q4), - ?assertEqual(<<"2">>, Msg#mqtt_message.payload), + ?assertEqual(<<"2">>, Msg#message.payload), ?assertEqual([{len, 2}, {max_len, 3}, {dropped, 1}], ?Q:stats(Q5)). t_infinity_simple_mqueue(_) -> @@ -93,12 +93,12 @@ t_infinity_simple_mqueue(_) -> ?assert(?Q:is_empty(Q)), ?assertEqual(0, ?Q:max_len(Q)), Qx = lists:foldl(fun(I, AccQ) -> - ?Q:in(#mqtt_message{qos = 1, payload = iolist_to_binary([I])}, AccQ) + ?Q:in(#message{qos = 1, payload = iolist_to_binary([I])}, AccQ) end, Q, lists:seq(1, 255)), ?assertEqual(255, ?Q:len(Qx)), ?assertEqual([{len, 255}, {max_len, 0}, {dropped, 0}], ?Q:stats(Qx)), {{value, V}, _Qy} = ?Q:out(Qx), - ?assertEqual(<<1>>, V#mqtt_message.payload). + ?assertEqual(<<1>>, V#message.payload). t_priority_mqueue(_) -> Opts = [{type, priority}, @@ -113,18 +113,18 @@ t_priority_mqueue(_) -> ?assertEqual(<<"priority_queue">>, ?Q:name(Q)), ?assert(?Q:is_empty(Q)), - Q1 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q), - Q2 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t">>}, Q1), - Q3 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t2">>}, Q2), + Q1 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q), + Q2 = ?Q:in(#message{qos = 1, topic = <<"t">>}, Q1), + Q3 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q2), ?assertEqual(3, ?Q:len(Q3)), - Q4 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q3), + Q4 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q3), ?assertEqual(4, ?Q:len(Q4)), - Q5 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q4), + Q5 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q4), ?assertEqual(5, ?Q:len(Q5)), - Q6 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q5), + Q6 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q5), ?assertEqual(5, ?Q:len(Q6)), {{value, Msg}, _Q7} = ?Q:out(Q6), - ?assertEqual(<<"t">>, Msg#mqtt_message.topic). + ?assertEqual(<<"t">>, Msg#message.topic). t_infinity_priority_mqueue(_) -> Opts = [{type, priority}, @@ -135,8 +135,8 @@ t_infinity_priority_mqueue(_) -> ?assertEqual(0, ?Q:max_len(Q)), Qx = lists:foldl(fun(I, AccQ) -> AccQ1 = - ?Q:in(#mqtt_message{topic = <<"t1">>, qos = 1, payload = iolist_to_binary([I])}, AccQ), - ?Q:in(#mqtt_message{topic = <<"t">>, qos = 1, payload = iolist_to_binary([I])}, AccQ1) + ?Q:in(#message{topic = <<"t1">>, qos = 1, payload = iolist_to_binary([I])}, AccQ), + ?Q:in(#message{topic = <<"t">>, qos = 1, payload = iolist_to_binary([I])}, AccQ1) end, Q, lists:seq(1, 255)), ?assertEqual(510, ?Q:len(Qx)), ?assertEqual([{len, 510}, {max_len, 0}, {dropped, 0}], ?Q:stats(Qx)). @@ -149,10 +149,10 @@ t_priority_mqueue2(_) -> {store_qos0, false}], Q = ?Q:new("priority_queue2_test", Opts, alarm_fun()), ?assertEqual(2, ?Q:max_len(Q)), - Q1 = ?Q:in(#mqtt_message{topic = <<"x">>, qos = 1, payload = <<1>>}, Q), - Q2 = ?Q:in(#mqtt_message{topic = <<"x">>, qos = 1, payload = <<2>>}, Q1), - Q3 = ?Q:in(#mqtt_message{topic = <<"y">>, qos = 1, payload = <<3>>}, Q2), - Q4 = ?Q:in(#mqtt_message{topic = <<"y">>, qos = 1, payload = <<4>>}, Q3), + Q1 = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<1>>}, Q), + Q2 = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<2>>}, Q1), + Q3 = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<3>>}, Q2), + Q4 = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<4>>}, Q3), ?assertEqual(4, ?Q:len(Q4)), {{value, _Val}, Q5} = ?Q:out(Q4), ?assertEqual(3, ?Q:len(Q5)).