improve cache datastruct using keys-queue

This commit is contained in:
terry-xiaoyu 2018-08-26 12:40:23 +08:00
parent a904031979
commit 8cd20744be
4 changed files with 123 additions and 137 deletions

View File

@ -436,11 +436,12 @@ acl_file = {{ platform_etc_dir }}/acl.conf
## The ACL cache size ## The ACL cache size
## The maximum count of ACL entries allowed for a client. ## The maximum count of ACL entries allowed for a client.
##
## Value 0 disables ACL cache ## Value 0 disables ACL cache
## ##
## Value: Integer ## Value: Integer
## Default: 100 ## Default: 32
acl_cache_size = 100 acl_cache_max_size = 32
## The ACL cache time-to-live. ## The ACL cache time-to-live.
## The time after which an ACL cache entry will be invalid ## The time after which an ACL cache entry will be invalid

View File

@ -578,8 +578,8 @@ end}.
]}. ]}.
%% @doc ACL cache size. %% @doc ACL cache size.
{mapping, "acl_cache_size", "emqx.acl_cache_size", [ {mapping, "acl_cache_max_size", "emqx.acl_cache_max_size", [
{default, 100}, {default, 32},
{datatype, integer} {datatype, integer}
]}. ]}.

View File

@ -26,11 +26,13 @@
-export([get_acl_cache/2, -export([get_acl_cache/2,
put_acl_cache/3, put_acl_cache/3,
delete_acl_cache/2,
cleanup_acl_cache/0, cleanup_acl_cache/0,
dump_acl_cache/0, dump_acl_cache/0,
get_cache_size/0, get_cache_size/0,
get_newest_key/0 get_newest_key/0,
get_oldest_key/0,
cache_k/2,
cache_v/1
]). ]).
%% gen_server callbacks %% gen_server callbacks
@ -221,158 +223,124 @@ if_existed(_Mod, _Fun) ->
%% ACL cache %% ACL cache
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% We'll cleanup the cache before repalcing an expired acl.
-spec(get_acl_cache(PubSub :: publish | subscribe, Topic :: topic()) -spec(get_acl_cache(PubSub :: publish | subscribe, Topic :: topic())
-> (acl_result() | not_found)). -> (acl_result() | not_found)).
get_acl_cache(PubSub, Topic) -> get_acl_cache(PubSub, Topic) ->
case erlang:get({PubSub, Topic}) of case erlang:get(cache_k(PubSub, Topic)) of
undefined -> not_found; undefined -> not_found;
{AclResult, CachedAt, _NextK, _PrevK} -> {AclResult, CachedAt} ->
if_acl_cache_expired(CachedAt, if_expired(CachedAt,
fun(false) -> fun(false) ->
AclResult; AclResult;
(true) -> (true) ->
%% this expired entry will get updated in cleanup_acl_cache(),
%% put_acl_cache/3
not_found not_found
end) end)
end. end.
%% If the cache get full, and also the latest one
%% is expired, then delete all the cache entries
-spec(put_acl_cache(PubSub :: publish | subscribe, -spec(put_acl_cache(PubSub :: publish | subscribe,
Topic :: topic(), AclResult :: acl_result()) -> ok). Topic :: topic(), AclResult :: acl_result()) -> ok).
put_acl_cache(PubSub, Topic, AclResult) -> put_acl_cache(PubSub, Topic, AclResult) ->
MaxSize = get_cache_max_size(), true = (MaxSize =/= 0), MaxSize = get_cache_max_size(), true = (MaxSize =/= 0),
Size = get_cache_size(), Size = get_cache_size(),
if if
Size =:= 0 ->
create_first(PubSub, Topic, AclResult);
Size < MaxSize -> Size < MaxSize ->
append(PubSub, Topic, AclResult); add_acl_cache(PubSub, Topic, AclResult);
Size =:= MaxSize -> Size =:= MaxSize ->
%% when the cache get full, and also the latest one
%% is expired, we'll perform a cleanup.
NewestK = get_newest_key(), NewestK = get_newest_key(),
{_AclResult, CachedAt, OldestK, _PrevK} = erlang:get(NewestK), {_AclResult, CachedAt} = erlang:get(NewestK),
if_acl_cache_expired(CachedAt, if_expired(CachedAt,
fun(true) -> fun(true) ->
% try to cleanup first % all cache expired, cleanup first
cleanup_acl_cache(OldestK), empty_acl_cache(),
add_cache(PubSub, Topic, AclResult); add_acl_cache(PubSub, Topic, AclResult);
(false) -> (false) ->
% cache full, perform cache replacement % cache full, perform cache replacement
delete_acl_cache(OldestK), evict_acl_cache(),
append(PubSub, Topic, AclResult) add_acl_cache(PubSub, Topic, AclResult)
end) end)
end. end.
-spec(delete_acl_cache(PubSub :: publish | subscribe, Topic :: topic()) -> ok). empty_acl_cache() ->
delete_acl_cache(PubSub, Topic) -> map_acl_cache(fun({CacheK, _CacheV}) ->
delete_acl_cache(_K = {PubSub, Topic}). erlang:erase(CacheK)
delete_acl_cache(K) -> end),
case erlang:get(K) of set_cache_size(0),
undefined -> ok; set_keys_queue(queue:new()).
{_AclResult, _CachedAt, NextK, PrevK} when NextK =:= PrevK ->
%% there is only one entry in the cache
erlang:erase(K),
decr_cache_size(),
set_newest_key(undefined);
{_AclResult, _CachedAt, NextK, PrevK} ->
update_next(PrevK, NextK),
update_prev(NextK, PrevK),
erlang:erase(K),
decr_cache_size(), evict_acl_cache() ->
NewestK = get_newest_key(), {{value, OldestK}, RemKeys} = queue:out(get_keys_queue()),
if set_keys_queue(RemKeys),
K =:= NewestK -> set_newest_key(NextK); erlang:erase(OldestK),
true -> ok decr_cache_size().
end
add_acl_cache(PubSub, Topic, AclResult) ->
K = cache_k(PubSub, Topic),
V = cache_v(AclResult),
case get(K) of
undefined -> add_new_acl(K, V);
{_AclResult, _CachedAt} ->
update_acl(K, V)
end. end.
%% evict all the exipired cache entries add_new_acl(K, V) ->
erlang:put(K, V),
keys_queue_in(K),
incr_cache_size().
update_acl(K, V) ->
erlang:put(K, V),
keys_queue_update(K).
%% cleanup all the exipired cache entries
-spec(cleanup_acl_cache() -> ok). -spec(cleanup_acl_cache() -> ok).
cleanup_acl_cache() -> cleanup_acl_cache() ->
case get_newest_key() of set_keys_queue(
undefined -> ok; cleanup_acl_cache(get_keys_queue())).
NewestK ->
{_AclResult, _CachedAt, OldestK, _PrevK} = erlang:get(NewestK), cleanup_acl_cache(KeysQ) ->
cleanup_acl_cache(OldestK) case queue:out(KeysQ) of
end. {{value, OldestK}, RemKeys} ->
cleanup_acl_cache(FromK) -> {_AclResult, CachedAt} = erlang:get(OldestK),
case erlang:get(FromK) of if_expired(CachedAt,
undefined -> ok; fun(false) -> KeysQ;
{_AclResult, CachedAt, NextK, _PrevK} ->
if_acl_cache_expired(CachedAt,
fun(false) ->
ok;
(true) -> (true) ->
delete_acl_cache(FromK), erlang:erase(OldestK),
cleanup_acl_cache(NextK) decr_cache_size(),
end) cleanup_acl_cache(RemKeys)
end);
{empty, KeysQ} -> KeysQ
end.
get_newest_key() ->
get_key(fun(KeysQ) -> queue:get_r(KeysQ) end).
get_oldest_key() ->
get_key(fun(KeysQ) -> queue:get(KeysQ) end).
get_key(Pick) ->
KeysQ = get_keys_queue(),
case queue:is_empty(KeysQ) of
true -> undefined;
false -> Pick(KeysQ)
end. end.
%% for test only %% for test only
dump_acl_cache() -> dump_acl_cache() ->
[R || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish map_acl_cache(fun(Cache) -> Cache end).
map_acl_cache(Fun) ->
[Fun(R) || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish
orelse SubPub =:= subscribe]. orelse SubPub =:= subscribe].
add_cache(PubSub, Topic, AclResult) ->
Size = get_cache_size(),
MaxSize = get_cache_max_size(), true = (MaxSize =/= 0),
if
Size =:= 0 ->
create_first(PubSub, Topic, AclResult);
Size =:= MaxSize ->
OldestK = get_next_key(get_newest_key()),
delete_acl_cache(OldestK),
case get_cache_size() =:= 0 of
true -> create_first(PubSub, Topic, AclResult);
false -> append(PubSub, Topic, AclResult)
end;
true ->
append(PubSub, Topic, AclResult)
end.
create_first(PubSub, Topic, AclResult) ->
K = cache_k(PubSub, Topic),
V = cache_v(AclResult, _NextK = K, _PrevK = K),
erlang:put(K, V),
set_cache_size(1),
set_newest_key(K).
append(PubSub, Topic, AclResult) ->
%% try to update the existing one:
%% - we delete it and then append it at the tail
delete_acl_cache(PubSub, Topic),
case get_cache_size() =:= 0 of
true -> create_first(PubSub, Topic, AclResult);
false ->
NewestK = get_newest_key(),
OldestK = get_next_key(NewestK),
K = cache_k(PubSub, Topic),
V = cache_v(AclResult, OldestK, NewestK),
erlang:put(K, V),
update_next(NewestK, K),
update_prev(OldestK, K),
incr_cache_size(),
set_newest_key(K)
end.
get_next_key(K) ->
erlang:element(3, erlang:get(K)).
update_next(K, NextK) ->
NoNext = erlang:delete_element(3, erlang:get(K)),
erlang:put(K, erlang:insert_element(3, NoNext, NextK)).
update_prev(K, PrevK) ->
NoPrev = erlang:delete_element(4, erlang:get(K)),
erlang:put(K, erlang:insert_element(4, NoPrev, PrevK)).
cache_k(PubSub, Topic)-> {PubSub, Topic}. cache_k(PubSub, Topic)-> {PubSub, Topic}.
cache_v(AclResult, NextK, PrevK)-> {AclResult, time_now(), NextK, PrevK}. cache_v(AclResult)-> {AclResult, time_now()}.
get_cache_max_size() -> get_cache_max_size() ->
application:get_env(emqx, acl_cache_size, 100). application:get_env(emqx, acl_cache_max_size, 0).
get_cache_size() -> get_cache_size() ->
case erlang:get(acl_cache_size) of case erlang:get(acl_cache_size) of
@ -386,15 +354,31 @@ decr_cache_size() ->
set_cache_size(N) -> set_cache_size(N) ->
erlang:put(acl_cache_size, N), ok. erlang:put(acl_cache_size, N), ok.
get_newest_key() -> keys_queue_in(Key) ->
erlang:get(acl_cache_newest_key). %% delete the key first if exists
KeysQ = get_keys_queue(),
set_keys_queue(queue:in(Key, KeysQ)).
set_newest_key(Key) -> keys_queue_update(Key) ->
erlang:put(acl_cache_newest_key, Key), ok. NewKeysQ = remove_key(Key, get_keys_queue()),
set_keys_queue(queue:in(Key, NewKeysQ)).
remove_key(Key, KeysQ) ->
queue:filter(fun
(K) when K =:= Key -> false; (_) -> true
end, KeysQ).
set_keys_queue(KeysQ) ->
erlang:put(acl_keys_q, KeysQ), ok.
get_keys_queue() ->
case erlang:get(acl_keys_q) of
undefined -> queue:new();
KeysQ -> KeysQ
end.
time_now() -> erlang:system_time(millisecond). time_now() -> erlang:system_time(millisecond).
if_acl_cache_expired(CachedAt, Fun) -> if_expired(CachedAt, Fun) ->
TTL = application:get_env(emqx, acl_cache_ttl, 60000), TTL = application:get_env(emqx, acl_cache_ttl, 60000),
Now = time_now(), Now = time_now(),
if (CachedAt + TTL) =< Now -> if (CachedAt + TTL) =< Now ->

View File

@ -22,6 +22,7 @@
-include("emqx.hrl"). -include("emqx.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(AC, emqx_access_control). -define(AC, emqx_access_control).
@ -71,10 +72,10 @@ init_per_group(_Group, Config) ->
prepare_config(Group = access_control) -> prepare_config(Group = access_control) ->
set_acl_config_file(Group), set_acl_config_file(Group),
application:set_env(emqx, acl_cache_size, 0); application:set_env(emqx, acl_cache_max_size, 0);
prepare_config(Group = access_control_cache_mode) -> prepare_config(Group = access_control_cache_mode) ->
set_acl_config_file(Group), set_acl_config_file(Group),
application:set_env(emqx, acl_cache_size, 100). application:set_env(emqx, acl_cache_max_size, 100).
set_acl_config_file(_Group) -> set_acl_config_file(_Group) ->
Rules = [{allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]}, Rules = [{allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]},
@ -170,7 +171,7 @@ acl_cache_expiry(_) ->
ok. ok.
acl_cache_full(_) -> acl_cache_full(_) ->
application:set_env(emqx, acl_cache_size, 1), application:set_env(emqx, acl_cache_max_size, 1),
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
@ -185,7 +186,7 @@ acl_cache_cleanup(_) ->
%% The acl cache will try to evict memory, if the size is full and the newest %% The acl cache will try to evict memory, if the size is full and the newest
%% cache entry is expired %% cache entry is expired
application:set_env(emqx, acl_cache_ttl, 1000), application:set_env(emqx, acl_cache_ttl, 1000),
application:set_env(emqx, acl_cache_size, 2), application:set_env(emqx, acl_cache_max_size, 2),
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
@ -206,7 +207,7 @@ acl_cache_cleanup(_) ->
put_get_del_cache(_) -> put_get_del_cache(_) ->
application:set_env(emqx, acl_cache_ttl, 300000), application:set_env(emqx, acl_cache_ttl, 300000),
application:set_env(emqx, acl_cache_size, 30), application:set_env(emqx, acl_cache_max_size, 30),
not_found = ?AC:get_acl_cache(publish, <<"a">>), not_found = ?AC:get_acl_cache(publish, <<"a">>),
ok = ?AC:put_acl_cache(publish, <<"a">>, allow), ok = ?AC:put_acl_cache(publish, <<"a">>, allow),
@ -217,11 +218,11 @@ put_get_del_cache(_) ->
deny = ?AC:get_acl_cache(subscribe, <<"b">>), deny = ?AC:get_acl_cache(subscribe, <<"b">>),
2 = ?AC:get_cache_size(), 2 = ?AC:get_cache_size(),
{subscribe, <<"b">>} = ?AC:get_newest_key(). ?assertEqual(?AC:cache_k(subscribe, <<"b">>), ?AC:get_newest_key()).
cache_expiry(_) -> cache_expiry(_) ->
application:set_env(emqx, acl_cache_ttl, 1000), application:set_env(emqx, acl_cache_ttl, 1000),
application:set_env(emqx, acl_cache_size, 30), application:set_env(emqx, acl_cache_max_size, 30),
ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow),
allow = ?AC:get_acl_cache(subscribe, <<"a">>), allow = ?AC:get_acl_cache(subscribe, <<"a">>),
@ -236,25 +237,25 @@ cache_expiry(_) ->
cache_update(_) -> cache_update(_) ->
application:set_env(emqx, acl_cache_ttl, 300000), application:set_env(emqx, acl_cache_ttl, 300000),
application:set_env(emqx, acl_cache_size, 30), application:set_env(emqx, acl_cache_max_size, 30),
[] = ?AC:dump_acl_cache(), [] = ?AC:dump_acl_cache(),
ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow),
ok = ?AC:put_acl_cache(publish, <<"b">>, allow), ok = ?AC:put_acl_cache(publish, <<"b">>, allow),
ok = ?AC:put_acl_cache(publish, <<"c">>, allow), ok = ?AC:put_acl_cache(publish, <<"c">>, allow),
3 = ?AC:get_cache_size(), 3 = ?AC:get_cache_size(),
{publish, <<"c">>} = ?AC:get_newest_key(), ?assertEqual(?AC:cache_k(publish, <<"c">>), ?AC:get_newest_key()),
%% update the 2nd one %% update the 2nd one
ok = ?AC:put_acl_cache(publish, <<"b">>, allow), ok = ?AC:put_acl_cache(publish, <<"b">>, allow),
%ct:pal("dump acl cache: ~p~n", [?AC:dump_acl_cache()]), %ct:pal("dump acl cache: ~p~n", [?AC:dump_acl_cache()]),
3 = ?AC:get_cache_size(), 3 = ?AC:get_cache_size(),
{publish, <<"b">>} = ?AC:get_newest_key(). ?assertEqual(?AC:cache_k(publish, <<"b">>), ?AC:get_newest_key()).
cache_full_replacement(_) -> cache_full_replacement(_) ->
application:set_env(emqx, acl_cache_ttl, 300000), application:set_env(emqx, acl_cache_ttl, 300000),
application:set_env(emqx, acl_cache_size, 3), application:set_env(emqx, acl_cache_max_size, 3),
ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow),
ok = ?AC:put_acl_cache(publish, <<"b">>, allow), ok = ?AC:put_acl_cache(publish, <<"b">>, allow),
ok = ?AC:put_acl_cache(publish, <<"c">>, allow), ok = ?AC:put_acl_cache(publish, <<"c">>, allow),
@ -262,15 +263,15 @@ cache_full_replacement(_) ->
allow = ?AC:get_acl_cache(publish, <<"b">>), allow = ?AC:get_acl_cache(publish, <<"b">>),
allow = ?AC:get_acl_cache(publish, <<"c">>), allow = ?AC:get_acl_cache(publish, <<"c">>),
3 = ?AC:get_cache_size(), 3 = ?AC:get_cache_size(),
{publish, <<"c">>} = ?AC:get_newest_key(), ?assertEqual(?AC:cache_k(publish, <<"c">>), ?AC:get_newest_key()),
ok = ?AC:put_acl_cache(publish, <<"d">>, deny), ok = ?AC:put_acl_cache(publish, <<"d">>, deny),
3 = ?AC:get_cache_size(), 3 = ?AC:get_cache_size(),
{publish, <<"d">>} = ?AC:get_newest_key(), ?assertEqual(?AC:cache_k(publish, <<"d">>), ?AC:get_newest_key()),
ok = ?AC:put_acl_cache(publish, <<"e">>, deny), ok = ?AC:put_acl_cache(publish, <<"e">>, deny),
3 = ?AC:get_cache_size(), 3 = ?AC:get_cache_size(),
{publish, <<"e">>} = ?AC:get_newest_key(), ?assertEqual(?AC:cache_k(publish, <<"e">>), ?AC:get_newest_key()),
not_found = ?AC:get_acl_cache(subscribe, <<"a">>), not_found = ?AC:get_acl_cache(subscribe, <<"a">>),
not_found = ?AC:get_acl_cache(publish, <<"b">>), not_found = ?AC:get_acl_cache(publish, <<"b">>),
@ -278,7 +279,7 @@ cache_full_replacement(_) ->
cache_cleanup(_) -> cache_cleanup(_) ->
application:set_env(emqx, acl_cache_ttl, 1000), application:set_env(emqx, acl_cache_ttl, 1000),
application:set_env(emqx, acl_cache_size, 30), application:set_env(emqx, acl_cache_max_size, 30),
ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow),
ok = ?AC:put_acl_cache(publish, <<"b">>, allow), ok = ?AC:put_acl_cache(publish, <<"b">>, allow),
ok = ?AC:put_acl_cache(publish, <<"c">>, allow), ok = ?AC:put_acl_cache(publish, <<"c">>, allow),
@ -290,7 +291,7 @@ cache_cleanup(_) ->
cache_full_cleanup(_) -> cache_full_cleanup(_) ->
application:set_env(emqx, acl_cache_ttl, 1000), application:set_env(emqx, acl_cache_ttl, 1000),
application:set_env(emqx, acl_cache_size, 3), application:set_env(emqx, acl_cache_max_size, 3),
ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow),
ok = ?AC:put_acl_cache(publish, <<"b">>, allow), ok = ?AC:put_acl_cache(publish, <<"b">>, allow),
ok = ?AC:put_acl_cache(publish, <<"c">>, allow), ok = ?AC:put_acl_cache(publish, <<"c">>, allow),