acl cache using proc_dict
This commit is contained in:
parent
737fe19331
commit
a904031979
|
@ -434,16 +434,21 @@ acl_nomatch = allow
|
||||||
## Value: File Name
|
## Value: File Name
|
||||||
acl_file = {{ platform_etc_dir }}/acl.conf
|
acl_file = {{ platform_etc_dir }}/acl.conf
|
||||||
|
|
||||||
## Whether to enable ACL cache for publish.
|
## The ACL cache size
|
||||||
|
## The maximum count of ACL entries allowed for a client.
|
||||||
|
## Value 0 disables ACL cache
|
||||||
##
|
##
|
||||||
## Value: on | off
|
## Value: Integer
|
||||||
enable_acl_cache = on
|
## Default: 100
|
||||||
|
acl_cache_size = 100
|
||||||
|
|
||||||
## The ACL cache age.
|
## The ACL cache time-to-live.
|
||||||
|
## The time after which an ACL cache entry will be invalid
|
||||||
##
|
##
|
||||||
## Value: Duration
|
## Value: Duration
|
||||||
## Default: 5 minute
|
## Default: 1 minute
|
||||||
acl_cache_age = 5m
|
acl_cache_ttl = 1m
|
||||||
|
|
||||||
|
|
||||||
##--------------------------------------------------------------------
|
##--------------------------------------------------------------------
|
||||||
## MQTT Protocol
|
## MQTT Protocol
|
||||||
|
@ -1875,4 +1880,3 @@ sysmon.busy_port = false
|
||||||
##
|
##
|
||||||
## Value: true | false
|
## Value: true | false
|
||||||
sysmon.busy_dist_port = true
|
sysmon.busy_dist_port = true
|
||||||
|
|
||||||
|
|
|
@ -571,23 +571,17 @@ end}.
|
||||||
hidden
|
hidden
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
%% @doc Enable ACL cache for publish.
|
%% @doc ACL cache time-to-live.
|
||||||
{mapping, "enable_acl_cache", "emqx.enable_acl_cache", [
|
{mapping, "acl_cache_ttl", "emqx.acl_cache_ttl", [
|
||||||
{default, on},
|
{default, "1m"},
|
||||||
{datatype, flag}
|
|
||||||
]}.
|
|
||||||
|
|
||||||
%% @doc ACL cache age.
|
|
||||||
{mapping, "acl_cache_age", "emqx.acl_cache_age", [
|
|
||||||
{default, "5m"},
|
|
||||||
{datatype, {duration, ms}}
|
{datatype, {duration, ms}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
%% @doc ACL cache size.
|
%% @doc ACL cache size.
|
||||||
%% {mapping, "acl_cache_size", "emqx.acl_cache_size", [
|
{mapping, "acl_cache_size", "emqx.acl_cache_size", [
|
||||||
%% {default, 0},
|
{default, 100},
|
||||||
%% {datatype, integer}
|
{datatype, integer}
|
||||||
%% ]}.
|
]}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% MQTT Protocol
|
%% MQTT Protocol
|
||||||
|
@ -1703,4 +1697,3 @@ end}.
|
||||||
{busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)},
|
{busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)},
|
||||||
{busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}]
|
{busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}]
|
||||||
end}.
|
end}.
|
||||||
|
|
||||||
|
|
|
@ -21,10 +21,18 @@
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
-export([authenticate/2]).
|
-export([authenticate/2]).
|
||||||
-export([check_acl/3, reload_acl/0, lookup_mods/1]).
|
-export([check_acl/3, reload_acl/0, lookup_mods/1]).
|
||||||
-export([clean_acl_cache/1, clean_acl_cache/2]).
|
|
||||||
-export([register_mod/3, register_mod/4, unregister_mod/2]).
|
-export([register_mod/3, register_mod/4, unregister_mod/2]).
|
||||||
-export([stop/0]).
|
-export([stop/0]).
|
||||||
|
|
||||||
|
-export([get_acl_cache/2,
|
||||||
|
put_acl_cache/3,
|
||||||
|
delete_acl_cache/2,
|
||||||
|
cleanup_acl_cache/0,
|
||||||
|
dump_acl_cache/0,
|
||||||
|
get_cache_size/0,
|
||||||
|
get_newest_key/0
|
||||||
|
]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
||||||
code_change/3]).
|
code_change/3]).
|
||||||
|
@ -33,6 +41,7 @@
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
-type(password() :: undefined | binary()).
|
-type(password() :: undefined | binary()).
|
||||||
|
-type(acl_result() :: allow | deny).
|
||||||
|
|
||||||
-record(state, {}).
|
-record(state, {}).
|
||||||
|
|
||||||
|
@ -82,16 +91,19 @@ authenticate(Client, Password, [{Mod, State, _Seq} | Mods]) ->
|
||||||
%% @doc Check ACL
|
%% @doc Check ACL
|
||||||
-spec(check_acl(client(), pubsub(), topic()) -> allow | deny).
|
-spec(check_acl(client(), pubsub(), topic()) -> allow | deny).
|
||||||
check_acl(Client, PubSub, Topic) when ?PS(PubSub) ->
|
check_acl(Client, PubSub, Topic) when ?PS(PubSub) ->
|
||||||
check_acl(Client, PubSub, Topic, lookup_mods(acl)).
|
CacheEnabled = (get_cache_max_size() =/= 0),
|
||||||
|
check_acl(Client, PubSub, Topic, lookup_mods(acl), CacheEnabled).
|
||||||
|
|
||||||
check_acl(#client{zone = Zone}, _PubSub, _Topic, []) ->
|
check_acl(Client, PubSub, Topic, AclMods, false) ->
|
||||||
emqx_zone:get_env(Zone, acl_nomatch, deny);
|
check_acl_from_plugins(Client, PubSub, Topic, AclMods);
|
||||||
|
check_acl(Client, PubSub, Topic, AclMods, true) ->
|
||||||
check_acl(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) ->
|
case get_acl_cache(PubSub, Topic) of
|
||||||
case Mod:check_acl({Client, PubSub, Topic}, State) of
|
not_found ->
|
||||||
allow -> allow;
|
AclResult = check_acl_from_plugins(Client, PubSub, Topic, AclMods),
|
||||||
deny -> deny;
|
put_acl_cache(PubSub, Topic, AclResult),
|
||||||
ignore -> check_acl(Client, PubSub, Topic, AclMods)
|
AclResult;
|
||||||
|
AclResult ->
|
||||||
|
AclResult
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc Reload ACL Rules.
|
%% @doc Reload ACL Rules.
|
||||||
|
@ -130,12 +142,6 @@ tab_key(acl) -> acl_modules.
|
||||||
stop() ->
|
stop() ->
|
||||||
gen_server:stop(?MODULE, normal, infinity).
|
gen_server:stop(?MODULE, normal, infinity).
|
||||||
|
|
||||||
%%TODO: Support ACL cache...
|
|
||||||
clean_acl_cache(_ClientId) ->
|
|
||||||
ok.
|
|
||||||
clean_acl_cache(_ClientId, _Topic) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -193,6 +199,15 @@ terminate(_Reason, _State) ->
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
|
check_acl_from_plugins(#client{zone = Zone}, _PubSub, _Topic, []) ->
|
||||||
|
emqx_zone:get_env(Zone, acl_nomatch, deny);
|
||||||
|
check_acl_from_plugins(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) ->
|
||||||
|
case Mod:check_acl({Client, PubSub, Topic}, State) of
|
||||||
|
allow -> allow;
|
||||||
|
deny -> deny;
|
||||||
|
ignore -> check_acl_from_plugins(Client, PubSub, Topic, AclMods)
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -202,3 +217,188 @@ if_existed(false, Fun) ->
|
||||||
if_existed(_Mod, _Fun) ->
|
if_existed(_Mod, _Fun) ->
|
||||||
{error, already_existed}.
|
{error, already_existed}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% ACL cache
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec(get_acl_cache(PubSub :: publish | subscribe, Topic :: topic())
|
||||||
|
-> (acl_result() | not_found)).
|
||||||
|
get_acl_cache(PubSub, Topic) ->
|
||||||
|
case erlang:get({PubSub, Topic}) of
|
||||||
|
undefined -> not_found;
|
||||||
|
{AclResult, CachedAt, _NextK, _PrevK} ->
|
||||||
|
if_acl_cache_expired(CachedAt,
|
||||||
|
fun(false) ->
|
||||||
|
AclResult;
|
||||||
|
(true) ->
|
||||||
|
%% this expired entry will get updated in
|
||||||
|
%% put_acl_cache/3
|
||||||
|
not_found
|
||||||
|
end)
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec(put_acl_cache(PubSub :: publish | subscribe,
|
||||||
|
Topic :: topic(), AclResult :: acl_result()) -> ok).
|
||||||
|
put_acl_cache(PubSub, Topic, AclResult) ->
|
||||||
|
MaxSize = get_cache_max_size(), true = (MaxSize =/= 0),
|
||||||
|
Size = get_cache_size(),
|
||||||
|
if
|
||||||
|
Size =:= 0 ->
|
||||||
|
create_first(PubSub, Topic, AclResult);
|
||||||
|
Size < MaxSize ->
|
||||||
|
append(PubSub, Topic, AclResult);
|
||||||
|
Size =:= MaxSize ->
|
||||||
|
%% when the cache get full, and also the latest one
|
||||||
|
%% is expired, we'll perform a cleanup.
|
||||||
|
NewestK = get_newest_key(),
|
||||||
|
{_AclResult, CachedAt, OldestK, _PrevK} = erlang:get(NewestK),
|
||||||
|
if_acl_cache_expired(CachedAt,
|
||||||
|
fun(true) ->
|
||||||
|
% try to cleanup first
|
||||||
|
cleanup_acl_cache(OldestK),
|
||||||
|
add_cache(PubSub, Topic, AclResult);
|
||||||
|
(false) ->
|
||||||
|
% cache full, perform cache replacement
|
||||||
|
delete_acl_cache(OldestK),
|
||||||
|
append(PubSub, Topic, AclResult)
|
||||||
|
end)
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec(delete_acl_cache(PubSub :: publish | subscribe, Topic :: topic()) -> ok).
|
||||||
|
delete_acl_cache(PubSub, Topic) ->
|
||||||
|
delete_acl_cache(_K = {PubSub, Topic}).
|
||||||
|
delete_acl_cache(K) ->
|
||||||
|
case erlang:get(K) of
|
||||||
|
undefined -> ok;
|
||||||
|
{_AclResult, _CachedAt, NextK, PrevK} when NextK =:= PrevK ->
|
||||||
|
%% there is only one entry in the cache
|
||||||
|
erlang:erase(K),
|
||||||
|
decr_cache_size(),
|
||||||
|
set_newest_key(undefined);
|
||||||
|
{_AclResult, _CachedAt, NextK, PrevK} ->
|
||||||
|
update_next(PrevK, NextK),
|
||||||
|
update_prev(NextK, PrevK),
|
||||||
|
erlang:erase(K),
|
||||||
|
|
||||||
|
decr_cache_size(),
|
||||||
|
NewestK = get_newest_key(),
|
||||||
|
if
|
||||||
|
K =:= NewestK -> set_newest_key(NextK);
|
||||||
|
true -> ok
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% evict all the exipired cache entries
|
||||||
|
-spec(cleanup_acl_cache() -> ok).
|
||||||
|
cleanup_acl_cache() ->
|
||||||
|
case get_newest_key() of
|
||||||
|
undefined -> ok;
|
||||||
|
NewestK ->
|
||||||
|
{_AclResult, _CachedAt, OldestK, _PrevK} = erlang:get(NewestK),
|
||||||
|
cleanup_acl_cache(OldestK)
|
||||||
|
end.
|
||||||
|
cleanup_acl_cache(FromK) ->
|
||||||
|
case erlang:get(FromK) of
|
||||||
|
undefined -> ok;
|
||||||
|
{_AclResult, CachedAt, NextK, _PrevK} ->
|
||||||
|
if_acl_cache_expired(CachedAt,
|
||||||
|
fun(false) ->
|
||||||
|
ok;
|
||||||
|
(true) ->
|
||||||
|
delete_acl_cache(FromK),
|
||||||
|
cleanup_acl_cache(NextK)
|
||||||
|
end)
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% for test only
|
||||||
|
dump_acl_cache() ->
|
||||||
|
[R || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish
|
||||||
|
orelse SubPub =:= subscribe].
|
||||||
|
|
||||||
|
add_cache(PubSub, Topic, AclResult) ->
|
||||||
|
Size = get_cache_size(),
|
||||||
|
MaxSize = get_cache_max_size(), true = (MaxSize =/= 0),
|
||||||
|
if
|
||||||
|
Size =:= 0 ->
|
||||||
|
create_first(PubSub, Topic, AclResult);
|
||||||
|
Size =:= MaxSize ->
|
||||||
|
OldestK = get_next_key(get_newest_key()),
|
||||||
|
delete_acl_cache(OldestK),
|
||||||
|
case get_cache_size() =:= 0 of
|
||||||
|
true -> create_first(PubSub, Topic, AclResult);
|
||||||
|
false -> append(PubSub, Topic, AclResult)
|
||||||
|
end;
|
||||||
|
true ->
|
||||||
|
append(PubSub, Topic, AclResult)
|
||||||
|
end.
|
||||||
|
|
||||||
|
create_first(PubSub, Topic, AclResult) ->
|
||||||
|
K = cache_k(PubSub, Topic),
|
||||||
|
V = cache_v(AclResult, _NextK = K, _PrevK = K),
|
||||||
|
erlang:put(K, V),
|
||||||
|
set_cache_size(1),
|
||||||
|
set_newest_key(K).
|
||||||
|
|
||||||
|
append(PubSub, Topic, AclResult) ->
|
||||||
|
%% try to update the existing one:
|
||||||
|
%% - we delete it and then append it at the tail
|
||||||
|
delete_acl_cache(PubSub, Topic),
|
||||||
|
|
||||||
|
case get_cache_size() =:= 0 of
|
||||||
|
true -> create_first(PubSub, Topic, AclResult);
|
||||||
|
false ->
|
||||||
|
NewestK = get_newest_key(),
|
||||||
|
OldestK = get_next_key(NewestK),
|
||||||
|
K = cache_k(PubSub, Topic),
|
||||||
|
V = cache_v(AclResult, OldestK, NewestK),
|
||||||
|
erlang:put(K, V),
|
||||||
|
|
||||||
|
update_next(NewestK, K),
|
||||||
|
update_prev(OldestK, K),
|
||||||
|
incr_cache_size(),
|
||||||
|
set_newest_key(K)
|
||||||
|
end.
|
||||||
|
|
||||||
|
get_next_key(K) ->
|
||||||
|
erlang:element(3, erlang:get(K)).
|
||||||
|
update_next(K, NextK) ->
|
||||||
|
NoNext = erlang:delete_element(3, erlang:get(K)),
|
||||||
|
erlang:put(K, erlang:insert_element(3, NoNext, NextK)).
|
||||||
|
update_prev(K, PrevK) ->
|
||||||
|
NoPrev = erlang:delete_element(4, erlang:get(K)),
|
||||||
|
erlang:put(K, erlang:insert_element(4, NoPrev, PrevK)).
|
||||||
|
|
||||||
|
cache_k(PubSub, Topic)-> {PubSub, Topic}.
|
||||||
|
cache_v(AclResult, NextK, PrevK)-> {AclResult, time_now(), NextK, PrevK}.
|
||||||
|
|
||||||
|
get_cache_max_size() ->
|
||||||
|
application:get_env(emqx, acl_cache_size, 100).
|
||||||
|
|
||||||
|
get_cache_size() ->
|
||||||
|
case erlang:get(acl_cache_size) of
|
||||||
|
undefined -> 0;
|
||||||
|
Size -> Size
|
||||||
|
end.
|
||||||
|
incr_cache_size() ->
|
||||||
|
erlang:put(acl_cache_size, get_cache_size() + 1), ok.
|
||||||
|
decr_cache_size() ->
|
||||||
|
erlang:put(acl_cache_size, get_cache_size() - 1), ok.
|
||||||
|
set_cache_size(N) ->
|
||||||
|
erlang:put(acl_cache_size, N), ok.
|
||||||
|
|
||||||
|
get_newest_key() ->
|
||||||
|
erlang:get(acl_cache_newest_key).
|
||||||
|
|
||||||
|
set_newest_key(Key) ->
|
||||||
|
erlang:put(acl_cache_newest_key, Key), ok.
|
||||||
|
|
||||||
|
time_now() -> erlang:system_time(millisecond).
|
||||||
|
|
||||||
|
if_acl_cache_expired(CachedAt, Fun) ->
|
||||||
|
TTL = application:get_env(emqx, acl_cache_ttl, 60000),
|
||||||
|
Now = time_now(),
|
||||||
|
if (CachedAt + TTL) =< Now ->
|
||||||
|
Fun(true);
|
||||||
|
true ->
|
||||||
|
Fun(false)
|
||||||
|
end.
|
||||||
|
|
|
@ -662,4 +662,3 @@ feed_var({<<"%u">>, Username}, MountPoint) ->
|
||||||
|
|
||||||
sp(true) -> 1;
|
sp(true) -> 1;
|
||||||
sp(false) -> 0.
|
sp(false) -> 0.
|
||||||
|
|
||||||
|
|
|
@ -29,27 +29,54 @@
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
[{group, access_control},
|
[{group, access_control},
|
||||||
{group, access_rule}].
|
{group, acl_cache},
|
||||||
|
{group, access_control_cache_mode},
|
||||||
|
{group, access_rule}
|
||||||
|
].
|
||||||
|
|
||||||
groups() ->
|
groups() ->
|
||||||
[{access_control, [sequence],
|
[{access_control, [sequence],
|
||||||
[reload_acl,
|
[reload_acl,
|
||||||
register_mod,
|
register_mod,
|
||||||
unregister_mod,
|
unregister_mod,
|
||||||
check_acl]},
|
check_acl_1,
|
||||||
|
check_acl_2
|
||||||
|
]},
|
||||||
|
{access_control_cache_mode, [],
|
||||||
|
[
|
||||||
|
acl_cache_basic,
|
||||||
|
acl_cache_expiry,
|
||||||
|
acl_cache_cleanup,
|
||||||
|
acl_cache_full
|
||||||
|
]},
|
||||||
|
{acl_cache, [], [
|
||||||
|
put_get_del_cache,
|
||||||
|
cache_update,
|
||||||
|
cache_expiry,
|
||||||
|
cache_full_replacement,
|
||||||
|
cache_cleanup,
|
||||||
|
cache_full_cleanup
|
||||||
|
]},
|
||||||
{access_rule, [],
|
{access_rule, [],
|
||||||
[compile_rule,
|
[compile_rule,
|
||||||
match_rule]}].
|
match_rule]}].
|
||||||
|
|
||||||
init_per_group(access_control, Config) ->
|
init_per_group(Group, Config) when Group =:= access_control;
|
||||||
|
Group =:= access_control_cache_mode ->
|
||||||
|
prepare_config(Group),
|
||||||
application:load(emqx),
|
application:load(emqx),
|
||||||
prepare_config(),
|
|
||||||
Config;
|
Config;
|
||||||
|
|
||||||
init_per_group(_Group, Config) ->
|
init_per_group(_Group, Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
prepare_config() ->
|
prepare_config(Group = access_control) ->
|
||||||
|
set_acl_config_file(Group),
|
||||||
|
application:set_env(emqx, acl_cache_size, 0);
|
||||||
|
prepare_config(Group = access_control_cache_mode) ->
|
||||||
|
set_acl_config_file(Group),
|
||||||
|
application:set_env(emqx, acl_cache_size, 100).
|
||||||
|
|
||||||
|
set_acl_config_file(_Group) ->
|
||||||
Rules = [{allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]},
|
Rules = [{allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]},
|
||||||
{allow, {user, "testuser"}, subscribe, ["a/b/c", "d/e/f/#"]},
|
{allow, {user, "testuser"}, subscribe, ["a/b/c", "d/e/f/#"]},
|
||||||
{allow, {user, "admin"}, pubsub, ["a/b/c", "d/e/f/#"]},
|
{allow, {user, "admin"}, pubsub, ["a/b/c", "d/e/f/#"]},
|
||||||
|
@ -59,8 +86,8 @@ prepare_config() ->
|
||||||
{deny, all, subscribe, ["$SYS/#", "#"]},
|
{deny, all, subscribe, ["$SYS/#", "#"]},
|
||||||
{deny, all}],
|
{deny, all}],
|
||||||
write_config("access_SUITE_acl.conf", Rules),
|
write_config("access_SUITE_acl.conf", Rules),
|
||||||
application:set_env(emqx, acl_file, "access_SUITE_acl.conf"),
|
application:set_env(emqx, acl_file, "access_SUITE_acl.conf").
|
||||||
application:set_env(emqx, acl_cache, false).
|
|
||||||
|
|
||||||
write_config(Filename, Terms) ->
|
write_config(Filename, Terms) ->
|
||||||
file:write_file(Filename, [io_lib:format("~tp.~n", [Term]) || Term <- Terms]).
|
file:write_file(Filename, [io_lib:format("~tp.~n", [Term]) || Term <- Terms]).
|
||||||
|
@ -68,24 +95,18 @@ write_config(Filename, Terms) ->
|
||||||
end_per_group(_Group, Config) ->
|
end_per_group(_Group, Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
init_per_testcase(TestCase, Config) when TestCase =:= reload_acl;
|
|
||||||
TestCase =:= register_mod;
|
|
||||||
TestCase =:= unregister_mod;
|
|
||||||
TestCase =:= check_acl ->
|
|
||||||
{ok, _Pid} = ?AC:start_link(), Config;
|
|
||||||
|
|
||||||
init_per_testcase(_TestCase, Config) ->
|
init_per_testcase(_TestCase, Config) ->
|
||||||
|
{ok, _Pid} = ?AC:start_link(),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_testcase(TestCase, _Config) when TestCase =:= reload_acl;
|
|
||||||
TestCase =:= register_mod;
|
|
||||||
TestCase =:= unregister_mod;
|
|
||||||
TestCase =:= check_acl ->
|
|
||||||
?AC:stop();
|
|
||||||
|
|
||||||
end_per_testcase(_TestCase, _Config) ->
|
end_per_testcase(_TestCase, _Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
per_testcase_config(acl_cache_full, Config) ->
|
||||||
|
Config;
|
||||||
|
per_testcase_config(_TestCase, Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% emqx_access_control
|
%% emqx_access_control
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -115,15 +136,170 @@ unregister_mod(_) ->
|
||||||
timer:sleep(5),
|
timer:sleep(5),
|
||||||
[] = ?AC:lookup_mods(auth).
|
[] = ?AC:lookup_mods(auth).
|
||||||
|
|
||||||
check_acl(_) ->
|
check_acl_1(_) ->
|
||||||
User1 = #client{id = <<"client1">>, username = <<"testuser">>},
|
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
|
||||||
User2 = #client{id = <<"client2">>, username = <<"xyz">>},
|
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
|
||||||
allow = ?AC:check_acl(User1, subscribe, <<"users/testuser/1">>),
|
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
|
||||||
allow = ?AC:check_acl(User1, subscribe, <<"clients/client1">>),
|
deny = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1/x/y">>),
|
||||||
deny = ?AC:check_acl(User1, subscribe, <<"clients/client1/x/y">>),
|
allow = ?AC:check_acl(SelfUser, publish, <<"users/testuser/1">>),
|
||||||
allow = ?AC:check_acl(User1, publish, <<"users/testuser/1">>),
|
allow = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>).
|
||||||
allow = ?AC:check_acl(User1, subscribe, <<"a/b/c">>),
|
check_acl_2(_) ->
|
||||||
deny = ?AC:check_acl(User2, subscribe, <<"a/b/c">>).
|
SelfUser = #client{id = <<"client2">>, username = <<"xyz">>},
|
||||||
|
deny = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>).
|
||||||
|
|
||||||
|
acl_cache_basic(_) ->
|
||||||
|
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
|
||||||
|
not_found = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>),
|
||||||
|
not_found = ?AC:get_acl_cache(subscribe, <<"clients/client1">>),
|
||||||
|
|
||||||
|
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
|
||||||
|
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
|
||||||
|
|
||||||
|
allow = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>),
|
||||||
|
allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
acl_cache_expiry(_) ->
|
||||||
|
application:set_env(emqx, acl_cache_ttl, 1000),
|
||||||
|
|
||||||
|
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
|
||||||
|
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
|
||||||
|
allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>),
|
||||||
|
ct:sleep(1100),
|
||||||
|
not_found = ?AC:get_acl_cache(subscribe, <<"clients/client1">>),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
acl_cache_full(_) ->
|
||||||
|
application:set_env(emqx, acl_cache_size, 1),
|
||||||
|
|
||||||
|
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
|
||||||
|
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
|
||||||
|
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
|
||||||
|
|
||||||
|
%% the older ones (the <<"users/testuser/1">>) will be evicted first
|
||||||
|
not_found = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>),
|
||||||
|
allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
acl_cache_cleanup(_) ->
|
||||||
|
%% The acl cache will try to evict memory, if the size is full and the newest
|
||||||
|
%% cache entry is expired
|
||||||
|
application:set_env(emqx, acl_cache_ttl, 1000),
|
||||||
|
application:set_env(emqx, acl_cache_size, 2),
|
||||||
|
|
||||||
|
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
|
||||||
|
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
|
||||||
|
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
|
||||||
|
|
||||||
|
allow = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>),
|
||||||
|
allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>),
|
||||||
|
|
||||||
|
ct:sleep(1100),
|
||||||
|
%% now the cache is full and the newest one - "clients/client1"
|
||||||
|
%% should be expired, so we'll try to cleanup before putting the next cache entry
|
||||||
|
deny = ?AC:check_acl(SelfUser, subscribe, <<"#">>),
|
||||||
|
|
||||||
|
not_found = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>),
|
||||||
|
not_found = ?AC:get_acl_cache(subscribe, <<"clients/client1">>),
|
||||||
|
deny = ?AC:get_acl_cache(subscribe, <<"#">>),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
put_get_del_cache(_) ->
|
||||||
|
application:set_env(emqx, acl_cache_ttl, 300000),
|
||||||
|
application:set_env(emqx, acl_cache_size, 30),
|
||||||
|
|
||||||
|
not_found = ?AC:get_acl_cache(publish, <<"a">>),
|
||||||
|
ok = ?AC:put_acl_cache(publish, <<"a">>, allow),
|
||||||
|
allow = ?AC:get_acl_cache(publish, <<"a">>),
|
||||||
|
|
||||||
|
not_found = ?AC:get_acl_cache(subscribe, <<"b">>),
|
||||||
|
ok = ?AC:put_acl_cache(subscribe, <<"b">>, deny),
|
||||||
|
deny = ?AC:get_acl_cache(subscribe, <<"b">>),
|
||||||
|
|
||||||
|
2 = ?AC:get_cache_size(),
|
||||||
|
{subscribe, <<"b">>} = ?AC:get_newest_key().
|
||||||
|
|
||||||
|
cache_expiry(_) ->
|
||||||
|
application:set_env(emqx, acl_cache_ttl, 1000),
|
||||||
|
application:set_env(emqx, acl_cache_size, 30),
|
||||||
|
ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow),
|
||||||
|
allow = ?AC:get_acl_cache(subscribe, <<"a">>),
|
||||||
|
|
||||||
|
ct:sleep(1100),
|
||||||
|
not_found = ?AC:get_acl_cache(subscribe, <<"a">>),
|
||||||
|
|
||||||
|
ok = ?AC:put_acl_cache(subscribe, <<"a">>, deny),
|
||||||
|
deny = ?AC:get_acl_cache(subscribe, <<"a">>),
|
||||||
|
|
||||||
|
ct:sleep(1100),
|
||||||
|
not_found = ?AC:get_acl_cache(subscribe, <<"a">>).
|
||||||
|
|
||||||
|
cache_update(_) ->
|
||||||
|
application:set_env(emqx, acl_cache_ttl, 300000),
|
||||||
|
application:set_env(emqx, acl_cache_size, 30),
|
||||||
|
[] = ?AC:dump_acl_cache(),
|
||||||
|
|
||||||
|
ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow),
|
||||||
|
ok = ?AC:put_acl_cache(publish, <<"b">>, allow),
|
||||||
|
ok = ?AC:put_acl_cache(publish, <<"c">>, allow),
|
||||||
|
3 = ?AC:get_cache_size(),
|
||||||
|
{publish, <<"c">>} = ?AC:get_newest_key(),
|
||||||
|
|
||||||
|
%% update the 2nd one
|
||||||
|
ok = ?AC:put_acl_cache(publish, <<"b">>, allow),
|
||||||
|
%ct:pal("dump acl cache: ~p~n", [?AC:dump_acl_cache()]),
|
||||||
|
|
||||||
|
3 = ?AC:get_cache_size(),
|
||||||
|
{publish, <<"b">>} = ?AC:get_newest_key().
|
||||||
|
|
||||||
|
cache_full_replacement(_) ->
|
||||||
|
application:set_env(emqx, acl_cache_ttl, 300000),
|
||||||
|
application:set_env(emqx, acl_cache_size, 3),
|
||||||
|
ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow),
|
||||||
|
ok = ?AC:put_acl_cache(publish, <<"b">>, allow),
|
||||||
|
ok = ?AC:put_acl_cache(publish, <<"c">>, allow),
|
||||||
|
allow = ?AC:get_acl_cache(subscribe, <<"a">>),
|
||||||
|
allow = ?AC:get_acl_cache(publish, <<"b">>),
|
||||||
|
allow = ?AC:get_acl_cache(publish, <<"c">>),
|
||||||
|
3 = ?AC:get_cache_size(),
|
||||||
|
{publish, <<"c">>} = ?AC:get_newest_key(),
|
||||||
|
|
||||||
|
ok = ?AC:put_acl_cache(publish, <<"d">>, deny),
|
||||||
|
3 = ?AC:get_cache_size(),
|
||||||
|
{publish, <<"d">>} = ?AC:get_newest_key(),
|
||||||
|
|
||||||
|
ok = ?AC:put_acl_cache(publish, <<"e">>, deny),
|
||||||
|
3 = ?AC:get_cache_size(),
|
||||||
|
{publish, <<"e">>} = ?AC:get_newest_key(),
|
||||||
|
|
||||||
|
not_found = ?AC:get_acl_cache(subscribe, <<"a">>),
|
||||||
|
not_found = ?AC:get_acl_cache(publish, <<"b">>),
|
||||||
|
allow = ?AC:get_acl_cache(publish, <<"c">>).
|
||||||
|
|
||||||
|
cache_cleanup(_) ->
|
||||||
|
application:set_env(emqx, acl_cache_ttl, 1000),
|
||||||
|
application:set_env(emqx, acl_cache_size, 30),
|
||||||
|
ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow),
|
||||||
|
ok = ?AC:put_acl_cache(publish, <<"b">>, allow),
|
||||||
|
ok = ?AC:put_acl_cache(publish, <<"c">>, allow),
|
||||||
|
3 = ?AC:get_cache_size(),
|
||||||
|
|
||||||
|
ct:sleep(1100),
|
||||||
|
?AC:cleanup_acl_cache(),
|
||||||
|
0 = ?AC:get_cache_size().
|
||||||
|
|
||||||
|
cache_full_cleanup(_) ->
|
||||||
|
application:set_env(emqx, acl_cache_ttl, 1000),
|
||||||
|
application:set_env(emqx, acl_cache_size, 3),
|
||||||
|
ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow),
|
||||||
|
ok = ?AC:put_acl_cache(publish, <<"b">>, allow),
|
||||||
|
ok = ?AC:put_acl_cache(publish, <<"c">>, allow),
|
||||||
|
3 = ?AC:get_cache_size(),
|
||||||
|
|
||||||
|
ct:sleep(1100),
|
||||||
|
%% verify auto cleanup upon cache full
|
||||||
|
ok = ?AC:put_acl_cache(subscribe, <<"d">>, deny),
|
||||||
|
1 = ?AC:get_cache_size().
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% emqx_access_rule
|
%% emqx_access_rule
|
||||||
|
|
Loading…
Reference in New Issue