diff --git a/etc/emqx.conf b/etc/emqx.conf index 33c06b5d3..62f94b174 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -436,11 +436,12 @@ acl_file = {{ platform_etc_dir }}/acl.conf ## The ACL cache size ## The maximum count of ACL entries allowed for a client. +## ## Value 0 disables ACL cache ## ## Value: Integer -## Default: 100 -acl_cache_size = 100 +## Default: 32 +acl_cache_max_size = 32 ## The ACL cache time-to-live. ## The time after which an ACL cache entry will be invalid diff --git a/priv/emqx.schema b/priv/emqx.schema index f26c012ce..f6ee7c621 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -578,8 +578,8 @@ end}. ]}. %% @doc ACL cache size. -{mapping, "acl_cache_size", "emqx.acl_cache_size", [ - {default, 100}, +{mapping, "acl_cache_max_size", "emqx.acl_cache_max_size", [ + {default, 32}, {datatype, integer} ]}. diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 5b52312ef..07999b0fb 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -26,11 +26,13 @@ -export([get_acl_cache/2, put_acl_cache/3, - delete_acl_cache/2, cleanup_acl_cache/0, dump_acl_cache/0, get_cache_size/0, - get_newest_key/0 + get_newest_key/0, + get_oldest_key/0, + cache_k/2, + cache_v/1 ]). %% gen_server callbacks @@ -221,158 +223,124 @@ if_existed(_Mod, _Fun) -> %% ACL cache %%-------------------------------------------------------------------- +%% We'll cleanup the cache before repalcing an expired acl. -spec(get_acl_cache(PubSub :: publish | subscribe, Topic :: topic()) -> (acl_result() | not_found)). get_acl_cache(PubSub, Topic) -> - case erlang:get({PubSub, Topic}) of + case erlang:get(cache_k(PubSub, Topic)) of undefined -> not_found; - {AclResult, CachedAt, _NextK, _PrevK} -> - if_acl_cache_expired(CachedAt, + {AclResult, CachedAt} -> + if_expired(CachedAt, fun(false) -> AclResult; (true) -> - %% this expired entry will get updated in - %% put_acl_cache/3 + cleanup_acl_cache(), not_found end) end. +%% If the cache get full, and also the latest one +%% is expired, then delete all the cache entries -spec(put_acl_cache(PubSub :: publish | subscribe, Topic :: topic(), AclResult :: acl_result()) -> ok). put_acl_cache(PubSub, Topic, AclResult) -> MaxSize = get_cache_max_size(), true = (MaxSize =/= 0), Size = get_cache_size(), if - Size =:= 0 -> - create_first(PubSub, Topic, AclResult); Size < MaxSize -> - append(PubSub, Topic, AclResult); + add_acl_cache(PubSub, Topic, AclResult); Size =:= MaxSize -> - %% when the cache get full, and also the latest one - %% is expired, we'll perform a cleanup. NewestK = get_newest_key(), - {_AclResult, CachedAt, OldestK, _PrevK} = erlang:get(NewestK), - if_acl_cache_expired(CachedAt, + {_AclResult, CachedAt} = erlang:get(NewestK), + if_expired(CachedAt, fun(true) -> - % try to cleanup first - cleanup_acl_cache(OldestK), - add_cache(PubSub, Topic, AclResult); + % all cache expired, cleanup first + empty_acl_cache(), + add_acl_cache(PubSub, Topic, AclResult); (false) -> % cache full, perform cache replacement - delete_acl_cache(OldestK), - append(PubSub, Topic, AclResult) + evict_acl_cache(), + add_acl_cache(PubSub, Topic, AclResult) end) end. --spec(delete_acl_cache(PubSub :: publish | subscribe, Topic :: topic()) -> ok). -delete_acl_cache(PubSub, Topic) -> - delete_acl_cache(_K = {PubSub, Topic}). -delete_acl_cache(K) -> - case erlang:get(K) of - undefined -> ok; - {_AclResult, _CachedAt, NextK, PrevK} when NextK =:= PrevK -> - %% there is only one entry in the cache - erlang:erase(K), - decr_cache_size(), - set_newest_key(undefined); - {_AclResult, _CachedAt, NextK, PrevK} -> - update_next(PrevK, NextK), - update_prev(NextK, PrevK), - erlang:erase(K), +empty_acl_cache() -> + map_acl_cache(fun({CacheK, _CacheV}) -> + erlang:erase(CacheK) + end), + set_cache_size(0), + set_keys_queue(queue:new()). - decr_cache_size(), - NewestK = get_newest_key(), - if - K =:= NewestK -> set_newest_key(NextK); - true -> ok - end +evict_acl_cache() -> + {{value, OldestK}, RemKeys} = queue:out(get_keys_queue()), + set_keys_queue(RemKeys), + erlang:erase(OldestK), + decr_cache_size(). + +add_acl_cache(PubSub, Topic, AclResult) -> + K = cache_k(PubSub, Topic), + V = cache_v(AclResult), + case get(K) of + undefined -> add_new_acl(K, V); + {_AclResult, _CachedAt} -> + update_acl(K, V) end. -%% evict all the exipired cache entries +add_new_acl(K, V) -> + erlang:put(K, V), + keys_queue_in(K), + incr_cache_size(). + +update_acl(K, V) -> + erlang:put(K, V), + keys_queue_update(K). + +%% cleanup all the exipired cache entries -spec(cleanup_acl_cache() -> ok). cleanup_acl_cache() -> - case get_newest_key() of - undefined -> ok; - NewestK -> - {_AclResult, _CachedAt, OldestK, _PrevK} = erlang:get(NewestK), - cleanup_acl_cache(OldestK) - end. -cleanup_acl_cache(FromK) -> - case erlang:get(FromK) of - undefined -> ok; - {_AclResult, CachedAt, NextK, _PrevK} -> - if_acl_cache_expired(CachedAt, - fun(false) -> - ok; + set_keys_queue( + cleanup_acl_cache(get_keys_queue())). + +cleanup_acl_cache(KeysQ) -> + case queue:out(KeysQ) of + {{value, OldestK}, RemKeys} -> + {_AclResult, CachedAt} = erlang:get(OldestK), + if_expired(CachedAt, + fun(false) -> KeysQ; (true) -> - delete_acl_cache(FromK), - cleanup_acl_cache(NextK) - end) + erlang:erase(OldestK), + decr_cache_size(), + cleanup_acl_cache(RemKeys) + end); + {empty, KeysQ} -> KeysQ + end. + +get_newest_key() -> + get_key(fun(KeysQ) -> queue:get_r(KeysQ) end). + +get_oldest_key() -> + get_key(fun(KeysQ) -> queue:get(KeysQ) end). + +get_key(Pick) -> + KeysQ = get_keys_queue(), + case queue:is_empty(KeysQ) of + true -> undefined; + false -> Pick(KeysQ) end. %% for test only dump_acl_cache() -> - [R || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish + map_acl_cache(fun(Cache) -> Cache end). +map_acl_cache(Fun) -> + [Fun(R) || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish orelse SubPub =:= subscribe]. -add_cache(PubSub, Topic, AclResult) -> - Size = get_cache_size(), - MaxSize = get_cache_max_size(), true = (MaxSize =/= 0), - if - Size =:= 0 -> - create_first(PubSub, Topic, AclResult); - Size =:= MaxSize -> - OldestK = get_next_key(get_newest_key()), - delete_acl_cache(OldestK), - case get_cache_size() =:= 0 of - true -> create_first(PubSub, Topic, AclResult); - false -> append(PubSub, Topic, AclResult) - end; - true -> - append(PubSub, Topic, AclResult) - end. - -create_first(PubSub, Topic, AclResult) -> - K = cache_k(PubSub, Topic), - V = cache_v(AclResult, _NextK = K, _PrevK = K), - erlang:put(K, V), - set_cache_size(1), - set_newest_key(K). - -append(PubSub, Topic, AclResult) -> - %% try to update the existing one: - %% - we delete it and then append it at the tail - delete_acl_cache(PubSub, Topic), - - case get_cache_size() =:= 0 of - true -> create_first(PubSub, Topic, AclResult); - false -> - NewestK = get_newest_key(), - OldestK = get_next_key(NewestK), - K = cache_k(PubSub, Topic), - V = cache_v(AclResult, OldestK, NewestK), - erlang:put(K, V), - - update_next(NewestK, K), - update_prev(OldestK, K), - incr_cache_size(), - set_newest_key(K) - end. - -get_next_key(K) -> - erlang:element(3, erlang:get(K)). -update_next(K, NextK) -> - NoNext = erlang:delete_element(3, erlang:get(K)), - erlang:put(K, erlang:insert_element(3, NoNext, NextK)). -update_prev(K, PrevK) -> - NoPrev = erlang:delete_element(4, erlang:get(K)), - erlang:put(K, erlang:insert_element(4, NoPrev, PrevK)). cache_k(PubSub, Topic)-> {PubSub, Topic}. -cache_v(AclResult, NextK, PrevK)-> {AclResult, time_now(), NextK, PrevK}. +cache_v(AclResult)-> {AclResult, time_now()}. get_cache_max_size() -> - application:get_env(emqx, acl_cache_size, 100). + application:get_env(emqx, acl_cache_max_size, 0). get_cache_size() -> case erlang:get(acl_cache_size) of @@ -386,15 +354,31 @@ decr_cache_size() -> set_cache_size(N) -> erlang:put(acl_cache_size, N), ok. -get_newest_key() -> - erlang:get(acl_cache_newest_key). +keys_queue_in(Key) -> + %% delete the key first if exists + KeysQ = get_keys_queue(), + set_keys_queue(queue:in(Key, KeysQ)). -set_newest_key(Key) -> - erlang:put(acl_cache_newest_key, Key), ok. +keys_queue_update(Key) -> + NewKeysQ = remove_key(Key, get_keys_queue()), + set_keys_queue(queue:in(Key, NewKeysQ)). + +remove_key(Key, KeysQ) -> + queue:filter(fun + (K) when K =:= Key -> false; (_) -> true + end, KeysQ). + +set_keys_queue(KeysQ) -> + erlang:put(acl_keys_q, KeysQ), ok. +get_keys_queue() -> + case erlang:get(acl_keys_q) of + undefined -> queue:new(); + KeysQ -> KeysQ + end. time_now() -> erlang:system_time(millisecond). -if_acl_cache_expired(CachedAt, Fun) -> +if_expired(CachedAt, Fun) -> TTL = application:get_env(emqx, acl_cache_ttl, 60000), Now = time_now(), if (CachedAt + TTL) =< Now -> diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl index f2ca9fad6..c3907c2db 100644 --- a/test/emqx_access_SUITE.erl +++ b/test/emqx_access_SUITE.erl @@ -22,6 +22,7 @@ -include("emqx.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). -define(AC, emqx_access_control). @@ -71,10 +72,10 @@ init_per_group(_Group, Config) -> prepare_config(Group = access_control) -> set_acl_config_file(Group), - application:set_env(emqx, acl_cache_size, 0); + application:set_env(emqx, acl_cache_max_size, 0); prepare_config(Group = access_control_cache_mode) -> set_acl_config_file(Group), - application:set_env(emqx, acl_cache_size, 100). + application:set_env(emqx, acl_cache_max_size, 100). set_acl_config_file(_Group) -> Rules = [{allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]}, @@ -170,7 +171,7 @@ acl_cache_expiry(_) -> ok. acl_cache_full(_) -> - application:set_env(emqx, acl_cache_size, 1), + application:set_env(emqx, acl_cache_max_size, 1), SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), @@ -185,7 +186,7 @@ acl_cache_cleanup(_) -> %% The acl cache will try to evict memory, if the size is full and the newest %% cache entry is expired application:set_env(emqx, acl_cache_ttl, 1000), - application:set_env(emqx, acl_cache_size, 2), + application:set_env(emqx, acl_cache_max_size, 2), SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), @@ -206,7 +207,7 @@ acl_cache_cleanup(_) -> put_get_del_cache(_) -> application:set_env(emqx, acl_cache_ttl, 300000), - application:set_env(emqx, acl_cache_size, 30), + application:set_env(emqx, acl_cache_max_size, 30), not_found = ?AC:get_acl_cache(publish, <<"a">>), ok = ?AC:put_acl_cache(publish, <<"a">>, allow), @@ -217,11 +218,11 @@ put_get_del_cache(_) -> deny = ?AC:get_acl_cache(subscribe, <<"b">>), 2 = ?AC:get_cache_size(), - {subscribe, <<"b">>} = ?AC:get_newest_key(). + ?assertEqual(?AC:cache_k(subscribe, <<"b">>), ?AC:get_newest_key()). cache_expiry(_) -> application:set_env(emqx, acl_cache_ttl, 1000), - application:set_env(emqx, acl_cache_size, 30), + application:set_env(emqx, acl_cache_max_size, 30), ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), allow = ?AC:get_acl_cache(subscribe, <<"a">>), @@ -236,25 +237,25 @@ cache_expiry(_) -> cache_update(_) -> application:set_env(emqx, acl_cache_ttl, 300000), - application:set_env(emqx, acl_cache_size, 30), + application:set_env(emqx, acl_cache_max_size, 30), [] = ?AC:dump_acl_cache(), ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), ok = ?AC:put_acl_cache(publish, <<"b">>, allow), ok = ?AC:put_acl_cache(publish, <<"c">>, allow), 3 = ?AC:get_cache_size(), - {publish, <<"c">>} = ?AC:get_newest_key(), + ?assertEqual(?AC:cache_k(publish, <<"c">>), ?AC:get_newest_key()), %% update the 2nd one ok = ?AC:put_acl_cache(publish, <<"b">>, allow), %ct:pal("dump acl cache: ~p~n", [?AC:dump_acl_cache()]), 3 = ?AC:get_cache_size(), - {publish, <<"b">>} = ?AC:get_newest_key(). + ?assertEqual(?AC:cache_k(publish, <<"b">>), ?AC:get_newest_key()). cache_full_replacement(_) -> application:set_env(emqx, acl_cache_ttl, 300000), - application:set_env(emqx, acl_cache_size, 3), + application:set_env(emqx, acl_cache_max_size, 3), ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), ok = ?AC:put_acl_cache(publish, <<"b">>, allow), ok = ?AC:put_acl_cache(publish, <<"c">>, allow), @@ -262,15 +263,15 @@ cache_full_replacement(_) -> allow = ?AC:get_acl_cache(publish, <<"b">>), allow = ?AC:get_acl_cache(publish, <<"c">>), 3 = ?AC:get_cache_size(), - {publish, <<"c">>} = ?AC:get_newest_key(), + ?assertEqual(?AC:cache_k(publish, <<"c">>), ?AC:get_newest_key()), ok = ?AC:put_acl_cache(publish, <<"d">>, deny), 3 = ?AC:get_cache_size(), - {publish, <<"d">>} = ?AC:get_newest_key(), + ?assertEqual(?AC:cache_k(publish, <<"d">>), ?AC:get_newest_key()), ok = ?AC:put_acl_cache(publish, <<"e">>, deny), 3 = ?AC:get_cache_size(), - {publish, <<"e">>} = ?AC:get_newest_key(), + ?assertEqual(?AC:cache_k(publish, <<"e">>), ?AC:get_newest_key()), not_found = ?AC:get_acl_cache(subscribe, <<"a">>), not_found = ?AC:get_acl_cache(publish, <<"b">>), @@ -278,7 +279,7 @@ cache_full_replacement(_) -> cache_cleanup(_) -> application:set_env(emqx, acl_cache_ttl, 1000), - application:set_env(emqx, acl_cache_size, 30), + application:set_env(emqx, acl_cache_max_size, 30), ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), ok = ?AC:put_acl_cache(publish, <<"b">>, allow), ok = ?AC:put_acl_cache(publish, <<"c">>, allow), @@ -290,7 +291,7 @@ cache_cleanup(_) -> cache_full_cleanup(_) -> application:set_env(emqx, acl_cache_ttl, 1000), - application:set_env(emqx, acl_cache_size, 3), + application:set_env(emqx, acl_cache_max_size, 3), ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), ok = ?AC:put_acl_cache(publish, <<"b">>, allow), ok = ?AC:put_acl_cache(publish, <<"c">>, allow),