diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 07999b0fb..1c5d04b4e 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -24,17 +24,6 @@ -export([register_mod/3, register_mod/4, unregister_mod/2]). -export([stop/0]). --export([get_acl_cache/2, - put_acl_cache/3, - cleanup_acl_cache/0, - dump_acl_cache/0, - get_cache_size/0, - get_newest_key/0, - get_oldest_key/0, - cache_k/2, - cache_v/1 - ]). - %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -43,7 +32,6 @@ -define(SERVER, ?MODULE). -type(password() :: undefined | binary()). --type(acl_result() :: allow | deny). -record(state, {}). @@ -93,16 +81,16 @@ authenticate(Client, Password, [{Mod, State, _Seq} | Mods]) -> %% @doc Check ACL -spec(check_acl(client(), pubsub(), topic()) -> allow | deny). check_acl(Client, PubSub, Topic) when ?PS(PubSub) -> - CacheEnabled = (get_cache_max_size() =/= 0), + CacheEnabled = (emqx_acl_cache:get_cache_max_size() =/= 0), check_acl(Client, PubSub, Topic, lookup_mods(acl), CacheEnabled). check_acl(Client, PubSub, Topic, AclMods, false) -> check_acl_from_plugins(Client, PubSub, Topic, AclMods); check_acl(Client, PubSub, Topic, AclMods, true) -> - case get_acl_cache(PubSub, Topic) of + case emqx_acl_cache:get_acl_cache(PubSub, Topic) of not_found -> AclResult = check_acl_from_plugins(Client, PubSub, Topic, AclMods), - put_acl_cache(PubSub, Topic, AclResult), + emqx_acl_cache:put_acl_cache(PubSub, Topic, AclResult), AclResult; AclResult -> AclResult @@ -218,171 +206,3 @@ if_existed(false, Fun) -> Fun(); if_existed(_Mod, _Fun) -> {error, already_existed}. - -%%-------------------------------------------------------------------- -%% ACL cache -%%-------------------------------------------------------------------- - -%% We'll cleanup the cache before repalcing an expired acl. --spec(get_acl_cache(PubSub :: publish | subscribe, Topic :: topic()) - -> (acl_result() | not_found)). -get_acl_cache(PubSub, Topic) -> - case erlang:get(cache_k(PubSub, Topic)) of - undefined -> not_found; - {AclResult, CachedAt} -> - if_expired(CachedAt, - fun(false) -> - AclResult; - (true) -> - cleanup_acl_cache(), - not_found - end) - end. - -%% If the cache get full, and also the latest one -%% is expired, then delete all the cache entries --spec(put_acl_cache(PubSub :: publish | subscribe, - Topic :: topic(), AclResult :: acl_result()) -> ok). -put_acl_cache(PubSub, Topic, AclResult) -> - MaxSize = get_cache_max_size(), true = (MaxSize =/= 0), - Size = get_cache_size(), - if - Size < MaxSize -> - add_acl_cache(PubSub, Topic, AclResult); - Size =:= MaxSize -> - NewestK = get_newest_key(), - {_AclResult, CachedAt} = erlang:get(NewestK), - if_expired(CachedAt, - fun(true) -> - % all cache expired, cleanup first - empty_acl_cache(), - add_acl_cache(PubSub, Topic, AclResult); - (false) -> - % cache full, perform cache replacement - evict_acl_cache(), - add_acl_cache(PubSub, Topic, AclResult) - end) - end. - -empty_acl_cache() -> - map_acl_cache(fun({CacheK, _CacheV}) -> - erlang:erase(CacheK) - end), - set_cache_size(0), - set_keys_queue(queue:new()). - -evict_acl_cache() -> - {{value, OldestK}, RemKeys} = queue:out(get_keys_queue()), - set_keys_queue(RemKeys), - erlang:erase(OldestK), - decr_cache_size(). - -add_acl_cache(PubSub, Topic, AclResult) -> - K = cache_k(PubSub, Topic), - V = cache_v(AclResult), - case get(K) of - undefined -> add_new_acl(K, V); - {_AclResult, _CachedAt} -> - update_acl(K, V) - end. - -add_new_acl(K, V) -> - erlang:put(K, V), - keys_queue_in(K), - incr_cache_size(). - -update_acl(K, V) -> - erlang:put(K, V), - keys_queue_update(K). - -%% cleanup all the exipired cache entries --spec(cleanup_acl_cache() -> ok). -cleanup_acl_cache() -> - set_keys_queue( - cleanup_acl_cache(get_keys_queue())). - -cleanup_acl_cache(KeysQ) -> - case queue:out(KeysQ) of - {{value, OldestK}, RemKeys} -> - {_AclResult, CachedAt} = erlang:get(OldestK), - if_expired(CachedAt, - fun(false) -> KeysQ; - (true) -> - erlang:erase(OldestK), - decr_cache_size(), - cleanup_acl_cache(RemKeys) - end); - {empty, KeysQ} -> KeysQ - end. - -get_newest_key() -> - get_key(fun(KeysQ) -> queue:get_r(KeysQ) end). - -get_oldest_key() -> - get_key(fun(KeysQ) -> queue:get(KeysQ) end). - -get_key(Pick) -> - KeysQ = get_keys_queue(), - case queue:is_empty(KeysQ) of - true -> undefined; - false -> Pick(KeysQ) - end. - -%% for test only -dump_acl_cache() -> - map_acl_cache(fun(Cache) -> Cache end). -map_acl_cache(Fun) -> - [Fun(R) || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish - orelse SubPub =:= subscribe]. - - -cache_k(PubSub, Topic)-> {PubSub, Topic}. -cache_v(AclResult)-> {AclResult, time_now()}. - -get_cache_max_size() -> - application:get_env(emqx, acl_cache_max_size, 0). - -get_cache_size() -> - case erlang:get(acl_cache_size) of - undefined -> 0; - Size -> Size - end. -incr_cache_size() -> - erlang:put(acl_cache_size, get_cache_size() + 1), ok. -decr_cache_size() -> - erlang:put(acl_cache_size, get_cache_size() - 1), ok. -set_cache_size(N) -> - erlang:put(acl_cache_size, N), ok. - -keys_queue_in(Key) -> - %% delete the key first if exists - KeysQ = get_keys_queue(), - set_keys_queue(queue:in(Key, KeysQ)). - -keys_queue_update(Key) -> - NewKeysQ = remove_key(Key, get_keys_queue()), - set_keys_queue(queue:in(Key, NewKeysQ)). - -remove_key(Key, KeysQ) -> - queue:filter(fun - (K) when K =:= Key -> false; (_) -> true - end, KeysQ). - -set_keys_queue(KeysQ) -> - erlang:put(acl_keys_q, KeysQ), ok. -get_keys_queue() -> - case erlang:get(acl_keys_q) of - undefined -> queue:new(); - KeysQ -> KeysQ - end. - -time_now() -> erlang:system_time(millisecond). - -if_expired(CachedAt, Fun) -> - TTL = application:get_env(emqx, acl_cache_ttl, 60000), - Now = time_now(), - if (CachedAt + TTL) =< Now -> - Fun(true); - true -> - Fun(false) - end. diff --git a/src/emqx_acl_cache.erl b/src/emqx_acl_cache.erl new file mode 100644 index 000000000..7db23fb1e --- /dev/null +++ b/src/emqx_acl_cache.erl @@ -0,0 +1,204 @@ +-module(emqx_acl_cache). + +-include("emqx.hrl"). + +-export([ get_acl_cache/2 + , put_acl_cache/3 + , cleanup_acl_cache/0 + , empty_acl_cache/0 + , dump_acl_cache/0 + , get_cache_size/0 + , get_cache_max_size/0 + , get_newest_key/0 + , get_oldest_key/0 + , cache_k/2 + , cache_v/1 + ]). + +-type(acl_result() :: allow | deny). + +%% Wrappers for key and value +cache_k(PubSub, Topic)-> {PubSub, Topic}. +cache_v(AclResult)-> {AclResult, time_now()}. + +%% We'll cleanup the cache before repalcing an expired acl. +-spec(get_acl_cache(PubSub :: publish | subscribe, Topic :: topic()) + -> (acl_result() | not_found)). +get_acl_cache(PubSub, Topic) -> + case erlang:get(cache_k(PubSub, Topic)) of + undefined -> not_found; + {AclResult, CachedAt} -> + if_expired(CachedAt, + fun(false) -> + AclResult; + (true) -> + cleanup_acl_cache(), + not_found + end) + end. + +%% If the cache get full, and also the latest one +%% is expired, then delete all the cache entries +-spec(put_acl_cache(PubSub :: publish | subscribe, + Topic :: topic(), AclResult :: acl_result()) -> ok). +put_acl_cache(PubSub, Topic, AclResult) -> + MaxSize = get_cache_max_size(), true = (MaxSize =/= 0), + Size = get_cache_size(), + if + Size < MaxSize -> + add_acl(PubSub, Topic, AclResult); + Size =:= MaxSize -> + NewestK = get_newest_key(), + {_AclResult, CachedAt} = erlang:get(NewestK), + if_expired(CachedAt, + fun(true) -> + % all cache expired, cleanup first + empty_acl_cache(), + add_acl(PubSub, Topic, AclResult); + (false) -> + % cache full, perform cache replacement + evict_acl_cache(), + add_acl(PubSub, Topic, AclResult) + end) + end. + +%% delete all the acl entries +-spec(empty_acl_cache() -> ok). +empty_acl_cache() -> + map_acl_cache(fun({CacheK, _CacheV}) -> + erlang:erase(CacheK) + end), + set_cache_size(0), + keys_queue_set(queue:new()). + +%% delete the oldest acl entry +-spec(evict_acl_cache() -> ok). +evict_acl_cache() -> + OldestK = keys_queue_out(), + erlang:erase(OldestK), + decr_cache_size(). + +%% cleanup all the exipired cache entries +-spec(cleanup_acl_cache() -> ok). +cleanup_acl_cache() -> + keys_queue_set( + cleanup_acl(keys_queue_get())). + +get_oldest_key() -> + keys_queue_pick(queue_front()). +get_newest_key() -> + keys_queue_pick(queue_rear()). + +get_cache_max_size() -> + application:get_env(emqx, acl_cache_max_size, 0). + +get_cache_size() -> + case erlang:get(acl_cache_size) of + undefined -> 0; + Size -> Size + end. + +dump_acl_cache() -> + map_acl_cache(fun(Cache) -> Cache end). +map_acl_cache(Fun) -> + [Fun(R) || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish + orelse SubPub =:= subscribe]. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +add_acl(PubSub, Topic, AclResult) -> + K = cache_k(PubSub, Topic), + V = cache_v(AclResult), + case erlang:get(K) of + undefined -> add_new_acl(K, V); + {_AclResult, _CachedAt} -> + update_acl(K, V) + end. + +add_new_acl(K, V) -> + erlang:put(K, V), + keys_queue_in(K), + incr_cache_size(). + +update_acl(K, V) -> + erlang:put(K, V), + keys_queue_update(K). + +cleanup_acl(KeysQ) -> + case queue:out(KeysQ) of + {{value, OldestK}, KeysQ2} -> + {_AclResult, CachedAt} = erlang:get(OldestK), + if_expired(CachedAt, + fun(false) -> KeysQ; + (true) -> + erlang:erase(OldestK), + decr_cache_size(), + cleanup_acl(KeysQ2) + end); + {empty, KeysQ} -> KeysQ + end. + +incr_cache_size() -> + erlang:put(acl_cache_size, get_cache_size() + 1), ok. +decr_cache_size() -> + Size = get_cache_size(), + if Size > 1 -> + erlang:put(acl_cache_size, Size-1); + Size =< 1 -> + erlang:put(acl_cache_size, 0) + end, ok. +set_cache_size(N) -> + erlang:put(acl_cache_size, N), ok. + +%%% Ordered Keys Q %%% +keys_queue_in(Key) -> + %% delete the key first if exists + KeysQ = keys_queue_get(), + keys_queue_set(queue:in(Key, KeysQ)). + +keys_queue_out() -> + case queue:out(keys_queue_get()) of + {{value, OldestK}, Q2} -> + keys_queue_set(Q2), OldestK; + {empty, _Q} -> + undefined + end. + +keys_queue_update(Key) -> + NewKeysQ = keys_queue_remove(Key, keys_queue_get()), + keys_queue_set(queue:in(Key, NewKeysQ)). + +keys_queue_pick(Pick) -> + KeysQ = keys_queue_get(), + case queue:is_empty(KeysQ) of + true -> undefined; + false -> Pick(KeysQ) + end. + +keys_queue_remove(Key, KeysQ) -> + queue:filter(fun + (K) when K =:= Key -> false; (_) -> true + end, KeysQ). + +keys_queue_set(KeysQ) -> + erlang:put(acl_keys_q, KeysQ), ok. +keys_queue_get() -> + case erlang:get(acl_keys_q) of + undefined -> queue:new(); + KeysQ -> KeysQ + end. + +queue_front() -> fun queue:get/1. +queue_rear() -> fun queue:get_r/1. + +time_now() -> erlang:system_time(millisecond). + +if_expired(CachedAt, Fun) -> + TTL = application:get_env(emqx, acl_cache_ttl, 60000), + Now = time_now(), + if (CachedAt + TTL) =< Now -> + Fun(true); + true -> + Fun(false) + end. diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl index c3907c2db..cf859d7c1 100644 --- a/test/emqx_access_SUITE.erl +++ b/test/emqx_access_SUITE.erl @@ -25,6 +25,7 @@ -include_lib("eunit/include/eunit.hrl"). -define(AC, emqx_access_control). +-define(CACHE, emqx_acl_cache). -import(emqx_access_rule, [compile/1, match/3]). @@ -150,14 +151,14 @@ check_acl_2(_) -> acl_cache_basic(_) -> SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, - not_found = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>), - not_found = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), + not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), - allow = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>), - allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + allow = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), + allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), ok. acl_cache_expiry(_) -> @@ -165,9 +166,9 @@ acl_cache_expiry(_) -> SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), - allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), ct:sleep(1100), - not_found = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), ok. acl_cache_full(_) -> @@ -178,8 +179,8 @@ acl_cache_full(_) -> allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), %% the older ones (the <<"users/testuser/1">>) will be evicted first - not_found = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>), - allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), + allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), ok. acl_cache_cleanup(_) -> @@ -192,115 +193,115 @@ acl_cache_cleanup(_) -> allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), - allow = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>), - allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + allow = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), + allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), ct:sleep(1100), %% now the cache is full and the newest one - "clients/client1" %% should be expired, so we'll try to cleanup before putting the next cache entry deny = ?AC:check_acl(SelfUser, subscribe, <<"#">>), - not_found = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>), - not_found = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), - deny = ?AC:get_acl_cache(subscribe, <<"#">>), + not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), + not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), + deny = ?CACHE:get_acl_cache(subscribe, <<"#">>), ok. put_get_del_cache(_) -> application:set_env(emqx, acl_cache_ttl, 300000), application:set_env(emqx, acl_cache_max_size, 30), - not_found = ?AC:get_acl_cache(publish, <<"a">>), - ok = ?AC:put_acl_cache(publish, <<"a">>, allow), - allow = ?AC:get_acl_cache(publish, <<"a">>), + not_found = ?CACHE:get_acl_cache(publish, <<"a">>), + ok = ?CACHE:put_acl_cache(publish, <<"a">>, allow), + allow = ?CACHE:get_acl_cache(publish, <<"a">>), - not_found = ?AC:get_acl_cache(subscribe, <<"b">>), - ok = ?AC:put_acl_cache(subscribe, <<"b">>, deny), - deny = ?AC:get_acl_cache(subscribe, <<"b">>), + not_found = ?CACHE:get_acl_cache(subscribe, <<"b">>), + ok = ?CACHE:put_acl_cache(subscribe, <<"b">>, deny), + deny = ?CACHE:get_acl_cache(subscribe, <<"b">>), - 2 = ?AC:get_cache_size(), - ?assertEqual(?AC:cache_k(subscribe, <<"b">>), ?AC:get_newest_key()). + 2 = ?CACHE:get_cache_size(), + ?assertEqual(?CACHE:cache_k(subscribe, <<"b">>), ?CACHE:get_newest_key()). cache_expiry(_) -> application:set_env(emqx, acl_cache_ttl, 1000), application:set_env(emqx, acl_cache_max_size, 30), - ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), - allow = ?AC:get_acl_cache(subscribe, <<"a">>), + ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), + allow = ?CACHE:get_acl_cache(subscribe, <<"a">>), ct:sleep(1100), - not_found = ?AC:get_acl_cache(subscribe, <<"a">>), + not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>), - ok = ?AC:put_acl_cache(subscribe, <<"a">>, deny), - deny = ?AC:get_acl_cache(subscribe, <<"a">>), + ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, deny), + deny = ?CACHE:get_acl_cache(subscribe, <<"a">>), ct:sleep(1100), - not_found = ?AC:get_acl_cache(subscribe, <<"a">>). + not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>). cache_update(_) -> application:set_env(emqx, acl_cache_ttl, 300000), application:set_env(emqx, acl_cache_max_size, 30), - [] = ?AC:dump_acl_cache(), + [] = ?CACHE:dump_acl_cache(), - ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), - ok = ?AC:put_acl_cache(publish, <<"b">>, allow), - ok = ?AC:put_acl_cache(publish, <<"c">>, allow), - 3 = ?AC:get_cache_size(), - ?assertEqual(?AC:cache_k(publish, <<"c">>), ?AC:get_newest_key()), + ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), + 3 = ?CACHE:get_cache_size(), + ?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_newest_key()), %% update the 2nd one - ok = ?AC:put_acl_cache(publish, <<"b">>, allow), - %ct:pal("dump acl cache: ~p~n", [?AC:dump_acl_cache()]), + ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), + %ct:pal("dump acl cache: ~p~n", [?CACHE:dump_acl_cache()]), - 3 = ?AC:get_cache_size(), - ?assertEqual(?AC:cache_k(publish, <<"b">>), ?AC:get_newest_key()). + 3 = ?CACHE:get_cache_size(), + ?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_newest_key()). cache_full_replacement(_) -> application:set_env(emqx, acl_cache_ttl, 300000), application:set_env(emqx, acl_cache_max_size, 3), - ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), - ok = ?AC:put_acl_cache(publish, <<"b">>, allow), - ok = ?AC:put_acl_cache(publish, <<"c">>, allow), - allow = ?AC:get_acl_cache(subscribe, <<"a">>), - allow = ?AC:get_acl_cache(publish, <<"b">>), - allow = ?AC:get_acl_cache(publish, <<"c">>), - 3 = ?AC:get_cache_size(), - ?assertEqual(?AC:cache_k(publish, <<"c">>), ?AC:get_newest_key()), + ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), + allow = ?CACHE:get_acl_cache(subscribe, <<"a">>), + allow = ?CACHE:get_acl_cache(publish, <<"b">>), + allow = ?CACHE:get_acl_cache(publish, <<"c">>), + 3 = ?CACHE:get_cache_size(), + ?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_newest_key()), - ok = ?AC:put_acl_cache(publish, <<"d">>, deny), - 3 = ?AC:get_cache_size(), - ?assertEqual(?AC:cache_k(publish, <<"d">>), ?AC:get_newest_key()), + ok = ?CACHE:put_acl_cache(publish, <<"d">>, deny), + 3 = ?CACHE:get_cache_size(), + ?assertEqual(?CACHE:cache_k(publish, <<"d">>), ?CACHE:get_newest_key()), - ok = ?AC:put_acl_cache(publish, <<"e">>, deny), - 3 = ?AC:get_cache_size(), - ?assertEqual(?AC:cache_k(publish, <<"e">>), ?AC:get_newest_key()), + ok = ?CACHE:put_acl_cache(publish, <<"e">>, deny), + 3 = ?CACHE:get_cache_size(), + ?assertEqual(?CACHE:cache_k(publish, <<"e">>), ?CACHE:get_newest_key()), - not_found = ?AC:get_acl_cache(subscribe, <<"a">>), - not_found = ?AC:get_acl_cache(publish, <<"b">>), - allow = ?AC:get_acl_cache(publish, <<"c">>). + not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>), + not_found = ?CACHE:get_acl_cache(publish, <<"b">>), + allow = ?CACHE:get_acl_cache(publish, <<"c">>). cache_cleanup(_) -> application:set_env(emqx, acl_cache_ttl, 1000), application:set_env(emqx, acl_cache_max_size, 30), - ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), - ok = ?AC:put_acl_cache(publish, <<"b">>, allow), - ok = ?AC:put_acl_cache(publish, <<"c">>, allow), - 3 = ?AC:get_cache_size(), + ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), + 3 = ?CACHE:get_cache_size(), ct:sleep(1100), - ?AC:cleanup_acl_cache(), - 0 = ?AC:get_cache_size(). + ?CACHE:cleanup_acl_cache(), + 0 = ?CACHE:get_cache_size(). cache_full_cleanup(_) -> application:set_env(emqx, acl_cache_ttl, 1000), application:set_env(emqx, acl_cache_max_size, 3), - ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), - ok = ?AC:put_acl_cache(publish, <<"b">>, allow), - ok = ?AC:put_acl_cache(publish, <<"c">>, allow), - 3 = ?AC:get_cache_size(), + ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), + 3 = ?CACHE:get_cache_size(), ct:sleep(1100), %% verify auto cleanup upon cache full - ok = ?AC:put_acl_cache(subscribe, <<"d">>, deny), - 1 = ?AC:get_cache_size(). + ok = ?CACHE:put_acl_cache(subscribe, <<"d">>, deny), + 1 = ?CACHE:get_cache_size(). %%-------------------------------------------------------------------- %% emqx_access_rule