add module emqx_acl_cache
This commit is contained in:
parent
8cd20744be
commit
9717f9b83e
|
@ -24,17 +24,6 @@
|
||||||
-export([register_mod/3, register_mod/4, unregister_mod/2]).
|
-export([register_mod/3, register_mod/4, unregister_mod/2]).
|
||||||
-export([stop/0]).
|
-export([stop/0]).
|
||||||
|
|
||||||
-export([get_acl_cache/2,
|
|
||||||
put_acl_cache/3,
|
|
||||||
cleanup_acl_cache/0,
|
|
||||||
dump_acl_cache/0,
|
|
||||||
get_cache_size/0,
|
|
||||||
get_newest_key/0,
|
|
||||||
get_oldest_key/0,
|
|
||||||
cache_k/2,
|
|
||||||
cache_v/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
||||||
code_change/3]).
|
code_change/3]).
|
||||||
|
@ -43,7 +32,6 @@
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
-type(password() :: undefined | binary()).
|
-type(password() :: undefined | binary()).
|
||||||
-type(acl_result() :: allow | deny).
|
|
||||||
|
|
||||||
-record(state, {}).
|
-record(state, {}).
|
||||||
|
|
||||||
|
@ -93,16 +81,16 @@ authenticate(Client, Password, [{Mod, State, _Seq} | Mods]) ->
|
||||||
%% @doc Check ACL
|
%% @doc Check ACL
|
||||||
-spec(check_acl(client(), pubsub(), topic()) -> allow | deny).
|
-spec(check_acl(client(), pubsub(), topic()) -> allow | deny).
|
||||||
check_acl(Client, PubSub, Topic) when ?PS(PubSub) ->
|
check_acl(Client, PubSub, Topic) when ?PS(PubSub) ->
|
||||||
CacheEnabled = (get_cache_max_size() =/= 0),
|
CacheEnabled = (emqx_acl_cache:get_cache_max_size() =/= 0),
|
||||||
check_acl(Client, PubSub, Topic, lookup_mods(acl), CacheEnabled).
|
check_acl(Client, PubSub, Topic, lookup_mods(acl), CacheEnabled).
|
||||||
|
|
||||||
check_acl(Client, PubSub, Topic, AclMods, false) ->
|
check_acl(Client, PubSub, Topic, AclMods, false) ->
|
||||||
check_acl_from_plugins(Client, PubSub, Topic, AclMods);
|
check_acl_from_plugins(Client, PubSub, Topic, AclMods);
|
||||||
check_acl(Client, PubSub, Topic, AclMods, true) ->
|
check_acl(Client, PubSub, Topic, AclMods, true) ->
|
||||||
case get_acl_cache(PubSub, Topic) of
|
case emqx_acl_cache:get_acl_cache(PubSub, Topic) of
|
||||||
not_found ->
|
not_found ->
|
||||||
AclResult = check_acl_from_plugins(Client, PubSub, Topic, AclMods),
|
AclResult = check_acl_from_plugins(Client, PubSub, Topic, AclMods),
|
||||||
put_acl_cache(PubSub, Topic, AclResult),
|
emqx_acl_cache:put_acl_cache(PubSub, Topic, AclResult),
|
||||||
AclResult;
|
AclResult;
|
||||||
AclResult ->
|
AclResult ->
|
||||||
AclResult
|
AclResult
|
||||||
|
@ -218,171 +206,3 @@ if_existed(false, Fun) ->
|
||||||
Fun();
|
Fun();
|
||||||
if_existed(_Mod, _Fun) ->
|
if_existed(_Mod, _Fun) ->
|
||||||
{error, already_existed}.
|
{error, already_existed}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% ACL cache
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
%% We'll cleanup the cache before repalcing an expired acl.
|
|
||||||
-spec(get_acl_cache(PubSub :: publish | subscribe, Topic :: topic())
|
|
||||||
-> (acl_result() | not_found)).
|
|
||||||
get_acl_cache(PubSub, Topic) ->
|
|
||||||
case erlang:get(cache_k(PubSub, Topic)) of
|
|
||||||
undefined -> not_found;
|
|
||||||
{AclResult, CachedAt} ->
|
|
||||||
if_expired(CachedAt,
|
|
||||||
fun(false) ->
|
|
||||||
AclResult;
|
|
||||||
(true) ->
|
|
||||||
cleanup_acl_cache(),
|
|
||||||
not_found
|
|
||||||
end)
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% If the cache get full, and also the latest one
|
|
||||||
%% is expired, then delete all the cache entries
|
|
||||||
-spec(put_acl_cache(PubSub :: publish | subscribe,
|
|
||||||
Topic :: topic(), AclResult :: acl_result()) -> ok).
|
|
||||||
put_acl_cache(PubSub, Topic, AclResult) ->
|
|
||||||
MaxSize = get_cache_max_size(), true = (MaxSize =/= 0),
|
|
||||||
Size = get_cache_size(),
|
|
||||||
if
|
|
||||||
Size < MaxSize ->
|
|
||||||
add_acl_cache(PubSub, Topic, AclResult);
|
|
||||||
Size =:= MaxSize ->
|
|
||||||
NewestK = get_newest_key(),
|
|
||||||
{_AclResult, CachedAt} = erlang:get(NewestK),
|
|
||||||
if_expired(CachedAt,
|
|
||||||
fun(true) ->
|
|
||||||
% all cache expired, cleanup first
|
|
||||||
empty_acl_cache(),
|
|
||||||
add_acl_cache(PubSub, Topic, AclResult);
|
|
||||||
(false) ->
|
|
||||||
% cache full, perform cache replacement
|
|
||||||
evict_acl_cache(),
|
|
||||||
add_acl_cache(PubSub, Topic, AclResult)
|
|
||||||
end)
|
|
||||||
end.
|
|
||||||
|
|
||||||
empty_acl_cache() ->
|
|
||||||
map_acl_cache(fun({CacheK, _CacheV}) ->
|
|
||||||
erlang:erase(CacheK)
|
|
||||||
end),
|
|
||||||
set_cache_size(0),
|
|
||||||
set_keys_queue(queue:new()).
|
|
||||||
|
|
||||||
evict_acl_cache() ->
|
|
||||||
{{value, OldestK}, RemKeys} = queue:out(get_keys_queue()),
|
|
||||||
set_keys_queue(RemKeys),
|
|
||||||
erlang:erase(OldestK),
|
|
||||||
decr_cache_size().
|
|
||||||
|
|
||||||
add_acl_cache(PubSub, Topic, AclResult) ->
|
|
||||||
K = cache_k(PubSub, Topic),
|
|
||||||
V = cache_v(AclResult),
|
|
||||||
case get(K) of
|
|
||||||
undefined -> add_new_acl(K, V);
|
|
||||||
{_AclResult, _CachedAt} ->
|
|
||||||
update_acl(K, V)
|
|
||||||
end.
|
|
||||||
|
|
||||||
add_new_acl(K, V) ->
|
|
||||||
erlang:put(K, V),
|
|
||||||
keys_queue_in(K),
|
|
||||||
incr_cache_size().
|
|
||||||
|
|
||||||
update_acl(K, V) ->
|
|
||||||
erlang:put(K, V),
|
|
||||||
keys_queue_update(K).
|
|
||||||
|
|
||||||
%% cleanup all the exipired cache entries
|
|
||||||
-spec(cleanup_acl_cache() -> ok).
|
|
||||||
cleanup_acl_cache() ->
|
|
||||||
set_keys_queue(
|
|
||||||
cleanup_acl_cache(get_keys_queue())).
|
|
||||||
|
|
||||||
cleanup_acl_cache(KeysQ) ->
|
|
||||||
case queue:out(KeysQ) of
|
|
||||||
{{value, OldestK}, RemKeys} ->
|
|
||||||
{_AclResult, CachedAt} = erlang:get(OldestK),
|
|
||||||
if_expired(CachedAt,
|
|
||||||
fun(false) -> KeysQ;
|
|
||||||
(true) ->
|
|
||||||
erlang:erase(OldestK),
|
|
||||||
decr_cache_size(),
|
|
||||||
cleanup_acl_cache(RemKeys)
|
|
||||||
end);
|
|
||||||
{empty, KeysQ} -> KeysQ
|
|
||||||
end.
|
|
||||||
|
|
||||||
get_newest_key() ->
|
|
||||||
get_key(fun(KeysQ) -> queue:get_r(KeysQ) end).
|
|
||||||
|
|
||||||
get_oldest_key() ->
|
|
||||||
get_key(fun(KeysQ) -> queue:get(KeysQ) end).
|
|
||||||
|
|
||||||
get_key(Pick) ->
|
|
||||||
KeysQ = get_keys_queue(),
|
|
||||||
case queue:is_empty(KeysQ) of
|
|
||||||
true -> undefined;
|
|
||||||
false -> Pick(KeysQ)
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% for test only
|
|
||||||
dump_acl_cache() ->
|
|
||||||
map_acl_cache(fun(Cache) -> Cache end).
|
|
||||||
map_acl_cache(Fun) ->
|
|
||||||
[Fun(R) || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish
|
|
||||||
orelse SubPub =:= subscribe].
|
|
||||||
|
|
||||||
|
|
||||||
cache_k(PubSub, Topic)-> {PubSub, Topic}.
|
|
||||||
cache_v(AclResult)-> {AclResult, time_now()}.
|
|
||||||
|
|
||||||
get_cache_max_size() ->
|
|
||||||
application:get_env(emqx, acl_cache_max_size, 0).
|
|
||||||
|
|
||||||
get_cache_size() ->
|
|
||||||
case erlang:get(acl_cache_size) of
|
|
||||||
undefined -> 0;
|
|
||||||
Size -> Size
|
|
||||||
end.
|
|
||||||
incr_cache_size() ->
|
|
||||||
erlang:put(acl_cache_size, get_cache_size() + 1), ok.
|
|
||||||
decr_cache_size() ->
|
|
||||||
erlang:put(acl_cache_size, get_cache_size() - 1), ok.
|
|
||||||
set_cache_size(N) ->
|
|
||||||
erlang:put(acl_cache_size, N), ok.
|
|
||||||
|
|
||||||
keys_queue_in(Key) ->
|
|
||||||
%% delete the key first if exists
|
|
||||||
KeysQ = get_keys_queue(),
|
|
||||||
set_keys_queue(queue:in(Key, KeysQ)).
|
|
||||||
|
|
||||||
keys_queue_update(Key) ->
|
|
||||||
NewKeysQ = remove_key(Key, get_keys_queue()),
|
|
||||||
set_keys_queue(queue:in(Key, NewKeysQ)).
|
|
||||||
|
|
||||||
remove_key(Key, KeysQ) ->
|
|
||||||
queue:filter(fun
|
|
||||||
(K) when K =:= Key -> false; (_) -> true
|
|
||||||
end, KeysQ).
|
|
||||||
|
|
||||||
set_keys_queue(KeysQ) ->
|
|
||||||
erlang:put(acl_keys_q, KeysQ), ok.
|
|
||||||
get_keys_queue() ->
|
|
||||||
case erlang:get(acl_keys_q) of
|
|
||||||
undefined -> queue:new();
|
|
||||||
KeysQ -> KeysQ
|
|
||||||
end.
|
|
||||||
|
|
||||||
time_now() -> erlang:system_time(millisecond).
|
|
||||||
|
|
||||||
if_expired(CachedAt, Fun) ->
|
|
||||||
TTL = application:get_env(emqx, acl_cache_ttl, 60000),
|
|
||||||
Now = time_now(),
|
|
||||||
if (CachedAt + TTL) =< Now ->
|
|
||||||
Fun(true);
|
|
||||||
true ->
|
|
||||||
Fun(false)
|
|
||||||
end.
|
|
||||||
|
|
|
@ -0,0 +1,204 @@
|
||||||
|
-module(emqx_acl_cache).
|
||||||
|
|
||||||
|
-include("emqx.hrl").
|
||||||
|
|
||||||
|
-export([ get_acl_cache/2
|
||||||
|
, put_acl_cache/3
|
||||||
|
, cleanup_acl_cache/0
|
||||||
|
, empty_acl_cache/0
|
||||||
|
, dump_acl_cache/0
|
||||||
|
, get_cache_size/0
|
||||||
|
, get_cache_max_size/0
|
||||||
|
, get_newest_key/0
|
||||||
|
, get_oldest_key/0
|
||||||
|
, cache_k/2
|
||||||
|
, cache_v/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
-type(acl_result() :: allow | deny).
|
||||||
|
|
||||||
|
%% Wrappers for key and value
|
||||||
|
cache_k(PubSub, Topic)-> {PubSub, Topic}.
|
||||||
|
cache_v(AclResult)-> {AclResult, time_now()}.
|
||||||
|
|
||||||
|
%% We'll cleanup the cache before repalcing an expired acl.
|
||||||
|
-spec(get_acl_cache(PubSub :: publish | subscribe, Topic :: topic())
|
||||||
|
-> (acl_result() | not_found)).
|
||||||
|
get_acl_cache(PubSub, Topic) ->
|
||||||
|
case erlang:get(cache_k(PubSub, Topic)) of
|
||||||
|
undefined -> not_found;
|
||||||
|
{AclResult, CachedAt} ->
|
||||||
|
if_expired(CachedAt,
|
||||||
|
fun(false) ->
|
||||||
|
AclResult;
|
||||||
|
(true) ->
|
||||||
|
cleanup_acl_cache(),
|
||||||
|
not_found
|
||||||
|
end)
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% If the cache get full, and also the latest one
|
||||||
|
%% is expired, then delete all the cache entries
|
||||||
|
-spec(put_acl_cache(PubSub :: publish | subscribe,
|
||||||
|
Topic :: topic(), AclResult :: acl_result()) -> ok).
|
||||||
|
put_acl_cache(PubSub, Topic, AclResult) ->
|
||||||
|
MaxSize = get_cache_max_size(), true = (MaxSize =/= 0),
|
||||||
|
Size = get_cache_size(),
|
||||||
|
if
|
||||||
|
Size < MaxSize ->
|
||||||
|
add_acl(PubSub, Topic, AclResult);
|
||||||
|
Size =:= MaxSize ->
|
||||||
|
NewestK = get_newest_key(),
|
||||||
|
{_AclResult, CachedAt} = erlang:get(NewestK),
|
||||||
|
if_expired(CachedAt,
|
||||||
|
fun(true) ->
|
||||||
|
% all cache expired, cleanup first
|
||||||
|
empty_acl_cache(),
|
||||||
|
add_acl(PubSub, Topic, AclResult);
|
||||||
|
(false) ->
|
||||||
|
% cache full, perform cache replacement
|
||||||
|
evict_acl_cache(),
|
||||||
|
add_acl(PubSub, Topic, AclResult)
|
||||||
|
end)
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% delete all the acl entries
|
||||||
|
-spec(empty_acl_cache() -> ok).
|
||||||
|
empty_acl_cache() ->
|
||||||
|
map_acl_cache(fun({CacheK, _CacheV}) ->
|
||||||
|
erlang:erase(CacheK)
|
||||||
|
end),
|
||||||
|
set_cache_size(0),
|
||||||
|
keys_queue_set(queue:new()).
|
||||||
|
|
||||||
|
%% delete the oldest acl entry
|
||||||
|
-spec(evict_acl_cache() -> ok).
|
||||||
|
evict_acl_cache() ->
|
||||||
|
OldestK = keys_queue_out(),
|
||||||
|
erlang:erase(OldestK),
|
||||||
|
decr_cache_size().
|
||||||
|
|
||||||
|
%% cleanup all the exipired cache entries
|
||||||
|
-spec(cleanup_acl_cache() -> ok).
|
||||||
|
cleanup_acl_cache() ->
|
||||||
|
keys_queue_set(
|
||||||
|
cleanup_acl(keys_queue_get())).
|
||||||
|
|
||||||
|
get_oldest_key() ->
|
||||||
|
keys_queue_pick(queue_front()).
|
||||||
|
get_newest_key() ->
|
||||||
|
keys_queue_pick(queue_rear()).
|
||||||
|
|
||||||
|
get_cache_max_size() ->
|
||||||
|
application:get_env(emqx, acl_cache_max_size, 0).
|
||||||
|
|
||||||
|
get_cache_size() ->
|
||||||
|
case erlang:get(acl_cache_size) of
|
||||||
|
undefined -> 0;
|
||||||
|
Size -> Size
|
||||||
|
end.
|
||||||
|
|
||||||
|
dump_acl_cache() ->
|
||||||
|
map_acl_cache(fun(Cache) -> Cache end).
|
||||||
|
map_acl_cache(Fun) ->
|
||||||
|
[Fun(R) || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish
|
||||||
|
orelse SubPub =:= subscribe].
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal functions
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
add_acl(PubSub, Topic, AclResult) ->
|
||||||
|
K = cache_k(PubSub, Topic),
|
||||||
|
V = cache_v(AclResult),
|
||||||
|
case erlang:get(K) of
|
||||||
|
undefined -> add_new_acl(K, V);
|
||||||
|
{_AclResult, _CachedAt} ->
|
||||||
|
update_acl(K, V)
|
||||||
|
end.
|
||||||
|
|
||||||
|
add_new_acl(K, V) ->
|
||||||
|
erlang:put(K, V),
|
||||||
|
keys_queue_in(K),
|
||||||
|
incr_cache_size().
|
||||||
|
|
||||||
|
update_acl(K, V) ->
|
||||||
|
erlang:put(K, V),
|
||||||
|
keys_queue_update(K).
|
||||||
|
|
||||||
|
cleanup_acl(KeysQ) ->
|
||||||
|
case queue:out(KeysQ) of
|
||||||
|
{{value, OldestK}, KeysQ2} ->
|
||||||
|
{_AclResult, CachedAt} = erlang:get(OldestK),
|
||||||
|
if_expired(CachedAt,
|
||||||
|
fun(false) -> KeysQ;
|
||||||
|
(true) ->
|
||||||
|
erlang:erase(OldestK),
|
||||||
|
decr_cache_size(),
|
||||||
|
cleanup_acl(KeysQ2)
|
||||||
|
end);
|
||||||
|
{empty, KeysQ} -> KeysQ
|
||||||
|
end.
|
||||||
|
|
||||||
|
incr_cache_size() ->
|
||||||
|
erlang:put(acl_cache_size, get_cache_size() + 1), ok.
|
||||||
|
decr_cache_size() ->
|
||||||
|
Size = get_cache_size(),
|
||||||
|
if Size > 1 ->
|
||||||
|
erlang:put(acl_cache_size, Size-1);
|
||||||
|
Size =< 1 ->
|
||||||
|
erlang:put(acl_cache_size, 0)
|
||||||
|
end, ok.
|
||||||
|
set_cache_size(N) ->
|
||||||
|
erlang:put(acl_cache_size, N), ok.
|
||||||
|
|
||||||
|
%%% Ordered Keys Q %%%
|
||||||
|
keys_queue_in(Key) ->
|
||||||
|
%% delete the key first if exists
|
||||||
|
KeysQ = keys_queue_get(),
|
||||||
|
keys_queue_set(queue:in(Key, KeysQ)).
|
||||||
|
|
||||||
|
keys_queue_out() ->
|
||||||
|
case queue:out(keys_queue_get()) of
|
||||||
|
{{value, OldestK}, Q2} ->
|
||||||
|
keys_queue_set(Q2), OldestK;
|
||||||
|
{empty, _Q} ->
|
||||||
|
undefined
|
||||||
|
end.
|
||||||
|
|
||||||
|
keys_queue_update(Key) ->
|
||||||
|
NewKeysQ = keys_queue_remove(Key, keys_queue_get()),
|
||||||
|
keys_queue_set(queue:in(Key, NewKeysQ)).
|
||||||
|
|
||||||
|
keys_queue_pick(Pick) ->
|
||||||
|
KeysQ = keys_queue_get(),
|
||||||
|
case queue:is_empty(KeysQ) of
|
||||||
|
true -> undefined;
|
||||||
|
false -> Pick(KeysQ)
|
||||||
|
end.
|
||||||
|
|
||||||
|
keys_queue_remove(Key, KeysQ) ->
|
||||||
|
queue:filter(fun
|
||||||
|
(K) when K =:= Key -> false; (_) -> true
|
||||||
|
end, KeysQ).
|
||||||
|
|
||||||
|
keys_queue_set(KeysQ) ->
|
||||||
|
erlang:put(acl_keys_q, KeysQ), ok.
|
||||||
|
keys_queue_get() ->
|
||||||
|
case erlang:get(acl_keys_q) of
|
||||||
|
undefined -> queue:new();
|
||||||
|
KeysQ -> KeysQ
|
||||||
|
end.
|
||||||
|
|
||||||
|
queue_front() -> fun queue:get/1.
|
||||||
|
queue_rear() -> fun queue:get_r/1.
|
||||||
|
|
||||||
|
time_now() -> erlang:system_time(millisecond).
|
||||||
|
|
||||||
|
if_expired(CachedAt, Fun) ->
|
||||||
|
TTL = application:get_env(emqx, acl_cache_ttl, 60000),
|
||||||
|
Now = time_now(),
|
||||||
|
if (CachedAt + TTL) =< Now ->
|
||||||
|
Fun(true);
|
||||||
|
true ->
|
||||||
|
Fun(false)
|
||||||
|
end.
|
|
@ -25,6 +25,7 @@
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
-define(AC, emqx_access_control).
|
-define(AC, emqx_access_control).
|
||||||
|
-define(CACHE, emqx_acl_cache).
|
||||||
|
|
||||||
-import(emqx_access_rule, [compile/1, match/3]).
|
-import(emqx_access_rule, [compile/1, match/3]).
|
||||||
|
|
||||||
|
@ -150,14 +151,14 @@ check_acl_2(_) ->
|
||||||
|
|
||||||
acl_cache_basic(_) ->
|
acl_cache_basic(_) ->
|
||||||
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
|
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
|
||||||
not_found = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>),
|
not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
|
||||||
not_found = ?AC:get_acl_cache(subscribe, <<"clients/client1">>),
|
not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
|
||||||
|
|
||||||
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
|
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
|
||||||
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
|
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
|
||||||
|
|
||||||
allow = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>),
|
allow = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
|
||||||
allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>),
|
allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
acl_cache_expiry(_) ->
|
acl_cache_expiry(_) ->
|
||||||
|
@ -165,9 +166,9 @@ acl_cache_expiry(_) ->
|
||||||
|
|
||||||
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
|
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
|
||||||
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
|
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
|
||||||
allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>),
|
allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
|
||||||
ct:sleep(1100),
|
ct:sleep(1100),
|
||||||
not_found = ?AC:get_acl_cache(subscribe, <<"clients/client1">>),
|
not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
acl_cache_full(_) ->
|
acl_cache_full(_) ->
|
||||||
|
@ -178,8 +179,8 @@ acl_cache_full(_) ->
|
||||||
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
|
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
|
||||||
|
|
||||||
%% the older ones (the <<"users/testuser/1">>) will be evicted first
|
%% the older ones (the <<"users/testuser/1">>) will be evicted first
|
||||||
not_found = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>),
|
not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
|
||||||
allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>),
|
allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
acl_cache_cleanup(_) ->
|
acl_cache_cleanup(_) ->
|
||||||
|
@ -192,115 +193,115 @@ acl_cache_cleanup(_) ->
|
||||||
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
|
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
|
||||||
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
|
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
|
||||||
|
|
||||||
allow = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>),
|
allow = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
|
||||||
allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>),
|
allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
|
||||||
|
|
||||||
ct:sleep(1100),
|
ct:sleep(1100),
|
||||||
%% now the cache is full and the newest one - "clients/client1"
|
%% now the cache is full and the newest one - "clients/client1"
|
||||||
%% should be expired, so we'll try to cleanup before putting the next cache entry
|
%% should be expired, so we'll try to cleanup before putting the next cache entry
|
||||||
deny = ?AC:check_acl(SelfUser, subscribe, <<"#">>),
|
deny = ?AC:check_acl(SelfUser, subscribe, <<"#">>),
|
||||||
|
|
||||||
not_found = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>),
|
not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
|
||||||
not_found = ?AC:get_acl_cache(subscribe, <<"clients/client1">>),
|
not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
|
||||||
deny = ?AC:get_acl_cache(subscribe, <<"#">>),
|
deny = ?CACHE:get_acl_cache(subscribe, <<"#">>),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
put_get_del_cache(_) ->
|
put_get_del_cache(_) ->
|
||||||
application:set_env(emqx, acl_cache_ttl, 300000),
|
application:set_env(emqx, acl_cache_ttl, 300000),
|
||||||
application:set_env(emqx, acl_cache_max_size, 30),
|
application:set_env(emqx, acl_cache_max_size, 30),
|
||||||
|
|
||||||
not_found = ?AC:get_acl_cache(publish, <<"a">>),
|
not_found = ?CACHE:get_acl_cache(publish, <<"a">>),
|
||||||
ok = ?AC:put_acl_cache(publish, <<"a">>, allow),
|
ok = ?CACHE:put_acl_cache(publish, <<"a">>, allow),
|
||||||
allow = ?AC:get_acl_cache(publish, <<"a">>),
|
allow = ?CACHE:get_acl_cache(publish, <<"a">>),
|
||||||
|
|
||||||
not_found = ?AC:get_acl_cache(subscribe, <<"b">>),
|
not_found = ?CACHE:get_acl_cache(subscribe, <<"b">>),
|
||||||
ok = ?AC:put_acl_cache(subscribe, <<"b">>, deny),
|
ok = ?CACHE:put_acl_cache(subscribe, <<"b">>, deny),
|
||||||
deny = ?AC:get_acl_cache(subscribe, <<"b">>),
|
deny = ?CACHE:get_acl_cache(subscribe, <<"b">>),
|
||||||
|
|
||||||
2 = ?AC:get_cache_size(),
|
2 = ?CACHE:get_cache_size(),
|
||||||
?assertEqual(?AC:cache_k(subscribe, <<"b">>), ?AC:get_newest_key()).
|
?assertEqual(?CACHE:cache_k(subscribe, <<"b">>), ?CACHE:get_newest_key()).
|
||||||
|
|
||||||
cache_expiry(_) ->
|
cache_expiry(_) ->
|
||||||
application:set_env(emqx, acl_cache_ttl, 1000),
|
application:set_env(emqx, acl_cache_ttl, 1000),
|
||||||
application:set_env(emqx, acl_cache_max_size, 30),
|
application:set_env(emqx, acl_cache_max_size, 30),
|
||||||
ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow),
|
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
|
||||||
allow = ?AC:get_acl_cache(subscribe, <<"a">>),
|
allow = ?CACHE:get_acl_cache(subscribe, <<"a">>),
|
||||||
|
|
||||||
ct:sleep(1100),
|
ct:sleep(1100),
|
||||||
not_found = ?AC:get_acl_cache(subscribe, <<"a">>),
|
not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>),
|
||||||
|
|
||||||
ok = ?AC:put_acl_cache(subscribe, <<"a">>, deny),
|
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, deny),
|
||||||
deny = ?AC:get_acl_cache(subscribe, <<"a">>),
|
deny = ?CACHE:get_acl_cache(subscribe, <<"a">>),
|
||||||
|
|
||||||
ct:sleep(1100),
|
ct:sleep(1100),
|
||||||
not_found = ?AC:get_acl_cache(subscribe, <<"a">>).
|
not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>).
|
||||||
|
|
||||||
cache_update(_) ->
|
cache_update(_) ->
|
||||||
application:set_env(emqx, acl_cache_ttl, 300000),
|
application:set_env(emqx, acl_cache_ttl, 300000),
|
||||||
application:set_env(emqx, acl_cache_max_size, 30),
|
application:set_env(emqx, acl_cache_max_size, 30),
|
||||||
[] = ?AC:dump_acl_cache(),
|
[] = ?CACHE:dump_acl_cache(),
|
||||||
|
|
||||||
ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow),
|
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
|
||||||
ok = ?AC:put_acl_cache(publish, <<"b">>, allow),
|
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
|
||||||
ok = ?AC:put_acl_cache(publish, <<"c">>, allow),
|
ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow),
|
||||||
3 = ?AC:get_cache_size(),
|
3 = ?CACHE:get_cache_size(),
|
||||||
?assertEqual(?AC:cache_k(publish, <<"c">>), ?AC:get_newest_key()),
|
?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_newest_key()),
|
||||||
|
|
||||||
%% update the 2nd one
|
%% update the 2nd one
|
||||||
ok = ?AC:put_acl_cache(publish, <<"b">>, allow),
|
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
|
||||||
%ct:pal("dump acl cache: ~p~n", [?AC:dump_acl_cache()]),
|
%ct:pal("dump acl cache: ~p~n", [?CACHE:dump_acl_cache()]),
|
||||||
|
|
||||||
3 = ?AC:get_cache_size(),
|
3 = ?CACHE:get_cache_size(),
|
||||||
?assertEqual(?AC:cache_k(publish, <<"b">>), ?AC:get_newest_key()).
|
?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_newest_key()).
|
||||||
|
|
||||||
cache_full_replacement(_) ->
|
cache_full_replacement(_) ->
|
||||||
application:set_env(emqx, acl_cache_ttl, 300000),
|
application:set_env(emqx, acl_cache_ttl, 300000),
|
||||||
application:set_env(emqx, acl_cache_max_size, 3),
|
application:set_env(emqx, acl_cache_max_size, 3),
|
||||||
ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow),
|
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
|
||||||
ok = ?AC:put_acl_cache(publish, <<"b">>, allow),
|
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
|
||||||
ok = ?AC:put_acl_cache(publish, <<"c">>, allow),
|
ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow),
|
||||||
allow = ?AC:get_acl_cache(subscribe, <<"a">>),
|
allow = ?CACHE:get_acl_cache(subscribe, <<"a">>),
|
||||||
allow = ?AC:get_acl_cache(publish, <<"b">>),
|
allow = ?CACHE:get_acl_cache(publish, <<"b">>),
|
||||||
allow = ?AC:get_acl_cache(publish, <<"c">>),
|
allow = ?CACHE:get_acl_cache(publish, <<"c">>),
|
||||||
3 = ?AC:get_cache_size(),
|
3 = ?CACHE:get_cache_size(),
|
||||||
?assertEqual(?AC:cache_k(publish, <<"c">>), ?AC:get_newest_key()),
|
?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_newest_key()),
|
||||||
|
|
||||||
ok = ?AC:put_acl_cache(publish, <<"d">>, deny),
|
ok = ?CACHE:put_acl_cache(publish, <<"d">>, deny),
|
||||||
3 = ?AC:get_cache_size(),
|
3 = ?CACHE:get_cache_size(),
|
||||||
?assertEqual(?AC:cache_k(publish, <<"d">>), ?AC:get_newest_key()),
|
?assertEqual(?CACHE:cache_k(publish, <<"d">>), ?CACHE:get_newest_key()),
|
||||||
|
|
||||||
ok = ?AC:put_acl_cache(publish, <<"e">>, deny),
|
ok = ?CACHE:put_acl_cache(publish, <<"e">>, deny),
|
||||||
3 = ?AC:get_cache_size(),
|
3 = ?CACHE:get_cache_size(),
|
||||||
?assertEqual(?AC:cache_k(publish, <<"e">>), ?AC:get_newest_key()),
|
?assertEqual(?CACHE:cache_k(publish, <<"e">>), ?CACHE:get_newest_key()),
|
||||||
|
|
||||||
not_found = ?AC:get_acl_cache(subscribe, <<"a">>),
|
not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>),
|
||||||
not_found = ?AC:get_acl_cache(publish, <<"b">>),
|
not_found = ?CACHE:get_acl_cache(publish, <<"b">>),
|
||||||
allow = ?AC:get_acl_cache(publish, <<"c">>).
|
allow = ?CACHE:get_acl_cache(publish, <<"c">>).
|
||||||
|
|
||||||
cache_cleanup(_) ->
|
cache_cleanup(_) ->
|
||||||
application:set_env(emqx, acl_cache_ttl, 1000),
|
application:set_env(emqx, acl_cache_ttl, 1000),
|
||||||
application:set_env(emqx, acl_cache_max_size, 30),
|
application:set_env(emqx, acl_cache_max_size, 30),
|
||||||
ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow),
|
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
|
||||||
ok = ?AC:put_acl_cache(publish, <<"b">>, allow),
|
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
|
||||||
ok = ?AC:put_acl_cache(publish, <<"c">>, allow),
|
ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow),
|
||||||
3 = ?AC:get_cache_size(),
|
3 = ?CACHE:get_cache_size(),
|
||||||
|
|
||||||
ct:sleep(1100),
|
ct:sleep(1100),
|
||||||
?AC:cleanup_acl_cache(),
|
?CACHE:cleanup_acl_cache(),
|
||||||
0 = ?AC:get_cache_size().
|
0 = ?CACHE:get_cache_size().
|
||||||
|
|
||||||
cache_full_cleanup(_) ->
|
cache_full_cleanup(_) ->
|
||||||
application:set_env(emqx, acl_cache_ttl, 1000),
|
application:set_env(emqx, acl_cache_ttl, 1000),
|
||||||
application:set_env(emqx, acl_cache_max_size, 3),
|
application:set_env(emqx, acl_cache_max_size, 3),
|
||||||
ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow),
|
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
|
||||||
ok = ?AC:put_acl_cache(publish, <<"b">>, allow),
|
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
|
||||||
ok = ?AC:put_acl_cache(publish, <<"c">>, allow),
|
ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow),
|
||||||
3 = ?AC:get_cache_size(),
|
3 = ?CACHE:get_cache_size(),
|
||||||
|
|
||||||
ct:sleep(1100),
|
ct:sleep(1100),
|
||||||
%% verify auto cleanup upon cache full
|
%% verify auto cleanup upon cache full
|
||||||
ok = ?AC:put_acl_cache(subscribe, <<"d">>, deny),
|
ok = ?CACHE:put_acl_cache(subscribe, <<"d">>, deny),
|
||||||
1 = ?AC:get_cache_size().
|
1 = ?CACHE:get_cache_size().
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% emqx_access_rule
|
%% emqx_access_rule
|
||||||
|
|
Loading…
Reference in New Issue