use config enable_acl_cache

This commit is contained in:
terry-xiaoyu 2018-08-26 18:22:17 +08:00
parent 9717f9b83e
commit 9d29dd0e10
6 changed files with 82 additions and 33 deletions

View File

@ -434,12 +434,17 @@ acl_nomatch = allow
## Value: File Name ## Value: File Name
acl_file = {{ platform_etc_dir }}/acl.conf acl_file = {{ platform_etc_dir }}/acl.conf
## Whether to enable ACL cache for publish.
## The ACL cache size ## The ACL cache size
## The maximum count of ACL entries allowed for a client. ## The maximum count of ACL entries allowed for a client.
## ##
## Value 0 disables ACL cache ## Value: on | off
enable_acl_cache = on
## The ACL cache size
## The maximum count of ACL entries allowed for a client.
## ##
## Value: Integer ## Value: Integer greater than 0
## Default: 32 ## Default: 32
acl_cache_max_size = 32 acl_cache_max_size = 32

View File

@ -571,6 +571,12 @@ end}.
hidden hidden
]}. ]}.
%% @doc Enable ACL cache for publish.
{mapping, "enable_acl_cache", "emqx.enable_acl_cache", [
{default, on},
{datatype, flag}
]}.
%% @doc ACL cache time-to-live. %% @doc ACL cache time-to-live.
{mapping, "acl_cache_ttl", "emqx.acl_cache_ttl", [ {mapping, "acl_cache_ttl", "emqx.acl_cache_ttl", [
{default, "1m"}, {default, "1m"},
@ -580,9 +586,14 @@ end}.
%% @doc ACL cache size. %% @doc ACL cache size.
{mapping, "acl_cache_max_size", "emqx.acl_cache_max_size", [ {mapping, "acl_cache_max_size", "emqx.acl_cache_max_size", [
{default, 32}, {default, 32},
{datatype, integer} {datatype, integer},
{validators, ["range:gt_0"]}
]}. ]}.
{validator, "range:gt_0", "must greater than 0",
fun(X) -> X > 0 end
}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% MQTT Protocol %% MQTT Protocol
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -3,8 +3,8 @@
{vsn,"3.0"}, {vsn,"3.0"},
{modules,[]}, {modules,[]},
{registered,[emqx_sup]}, {registered,[emqx_sup]},
{applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,cowboy, {applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,cowboy
minirest]}, ]},
{env,[]}, {env,[]},
{mod,{emqx_app,[]}}, {mod,{emqx_app,[]}},
{maintainers,["Feng Lee <feng@emqx.io>"]}, {maintainers,["Feng Lee <feng@emqx.io>"]},

View File

@ -81,15 +81,15 @@ authenticate(Client, Password, [{Mod, State, _Seq} | Mods]) ->
%% @doc Check ACL %% @doc Check ACL
-spec(check_acl(client(), pubsub(), topic()) -> allow | deny). -spec(check_acl(client(), pubsub(), topic()) -> allow | deny).
check_acl(Client, PubSub, Topic) when ?PS(PubSub) -> check_acl(Client, PubSub, Topic) when ?PS(PubSub) ->
CacheEnabled = (emqx_acl_cache:get_cache_max_size() =/= 0), CacheEnabled = emqx_acl_cache:is_enabled(),
check_acl(Client, PubSub, Topic, lookup_mods(acl), CacheEnabled). check_acl(Client, PubSub, Topic, lookup_mods(acl), CacheEnabled).
check_acl(Client, PubSub, Topic, AclMods, false) -> check_acl(Client, PubSub, Topic, AclMods, false) ->
check_acl_from_plugins(Client, PubSub, Topic, AclMods); do_check_acl(Client, PubSub, Topic, AclMods);
check_acl(Client, PubSub, Topic, AclMods, true) -> check_acl(Client, PubSub, Topic, AclMods, true) ->
case emqx_acl_cache:get_acl_cache(PubSub, Topic) of case emqx_acl_cache:get_acl_cache(PubSub, Topic) of
not_found -> not_found ->
AclResult = check_acl_from_plugins(Client, PubSub, Topic, AclMods), AclResult = do_check_acl(Client, PubSub, Topic, AclMods),
emqx_acl_cache:put_acl_cache(PubSub, Topic, AclResult), emqx_acl_cache:put_acl_cache(PubSub, Topic, AclResult),
AclResult; AclResult;
AclResult -> AclResult ->
@ -189,13 +189,13 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
check_acl_from_plugins(#client{zone = Zone}, _PubSub, _Topic, []) -> do_check_acl(#client{zone = Zone}, _PubSub, _Topic, []) ->
emqx_zone:get_env(Zone, acl_nomatch, deny); emqx_zone:get_env(Zone, acl_nomatch, deny);
check_acl_from_plugins(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) -> do_check_acl(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) ->
case Mod:check_acl({Client, PubSub, Topic}, State) of case Mod:check_acl({Client, PubSub, Topic}, State) of
allow -> allow; allow -> allow;
deny -> deny; deny -> deny;
ignore -> check_acl_from_plugins(Client, PubSub, Topic, AclMods) ignore -> do_check_acl(Client, PubSub, Topic, AclMods)
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -13,6 +13,7 @@
, get_oldest_key/0 , get_oldest_key/0
, cache_k/2 , cache_k/2
, cache_v/1 , cache_v/1
, is_enabled/0
]). ]).
-type(acl_result() :: allow | deny). -type(acl_result() :: allow | deny).
@ -21,6 +22,10 @@
cache_k(PubSub, Topic)-> {PubSub, Topic}. cache_k(PubSub, Topic)-> {PubSub, Topic}.
cache_v(AclResult)-> {AclResult, time_now()}. cache_v(AclResult)-> {AclResult, time_now()}.
-spec(is_enabled() -> boolean()).
is_enabled() ->
application:get_env(emqx, enable_acl_cache, true).
%% We'll cleanup the cache before repalcing an expired acl. %% We'll cleanup the cache before repalcing an expired acl.
-spec(get_acl_cache(PubSub :: publish | subscribe, Topic :: topic()) -spec(get_acl_cache(PubSub :: publish | subscribe, Topic :: topic())
-> (acl_result() | not_found)). -> (acl_result() | not_found)).
@ -90,7 +95,7 @@ get_newest_key() ->
keys_queue_pick(queue_rear()). keys_queue_pick(queue_rear()).
get_cache_max_size() -> get_cache_max_size() ->
application:get_env(emqx, acl_cache_max_size, 0). application:get_env(emqx, acl_cache_max_size, 32).
get_cache_size() -> get_cache_size() ->
case erlang:get(acl_cache_size) of case erlang:get(acl_cache_size) of

View File

@ -55,9 +55,10 @@ groups() ->
put_get_del_cache, put_get_del_cache,
cache_update, cache_update,
cache_expiry, cache_expiry,
cache_full_replacement, cache_replacement,
cache_cleanup, cache_cleanup,
cache_full_cleanup cache_auto_emtpy,
cache_auto_cleanup
]}, ]},
{access_rule, [], {access_rule, [],
[compile_rule, [compile_rule,
@ -73,9 +74,10 @@ init_per_group(_Group, Config) ->
prepare_config(Group = access_control) -> prepare_config(Group = access_control) ->
set_acl_config_file(Group), set_acl_config_file(Group),
application:set_env(emqx, acl_cache_max_size, 0); application:set_env(emqx, enable_acl_cache, false);
prepare_config(Group = access_control_cache_mode) -> prepare_config(Group = access_control_cache_mode) ->
set_acl_config_file(Group), set_acl_config_file(Group),
application:set_env(emqx, enable_acl_cache, true),
application:set_env(emqx, acl_cache_max_size, 100). application:set_env(emqx, acl_cache_max_size, 100).
set_acl_config_file(_Group) -> set_acl_config_file(_Group) ->
@ -162,12 +164,12 @@ acl_cache_basic(_) ->
ok. ok.
acl_cache_expiry(_) -> acl_cache_expiry(_) ->
application:set_env(emqx, acl_cache_ttl, 1000), application:set_env(emqx, acl_cache_ttl, 100),
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
ct:sleep(1100), ct:sleep(150),
not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
ok. ok.
@ -186,7 +188,7 @@ acl_cache_full(_) ->
acl_cache_cleanup(_) -> acl_cache_cleanup(_) ->
%% The acl cache will try to evict memory, if the size is full and the newest %% The acl cache will try to evict memory, if the size is full and the newest
%% cache entry is expired %% cache entry is expired
application:set_env(emqx, acl_cache_ttl, 1000), application:set_env(emqx, acl_cache_ttl, 100),
application:set_env(emqx, acl_cache_max_size, 2), application:set_env(emqx, acl_cache_max_size, 2),
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
@ -196,9 +198,10 @@ acl_cache_cleanup(_) ->
allow = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), allow = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
ct:sleep(1100), ct:sleep(150),
%% now the cache is full and the newest one - "clients/client1" %% now the cache is full and the newest one - "clients/client1"
%% should be expired, so we'll try to cleanup before putting the next cache entry %% should be expired, so we'll empty the cache before putting
%% the next cache entry
deny = ?AC:check_acl(SelfUser, subscribe, <<"#">>), deny = ?AC:check_acl(SelfUser, subscribe, <<"#">>),
not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
@ -222,18 +225,18 @@ put_get_del_cache(_) ->
?assertEqual(?CACHE:cache_k(subscribe, <<"b">>), ?CACHE:get_newest_key()). ?assertEqual(?CACHE:cache_k(subscribe, <<"b">>), ?CACHE:get_newest_key()).
cache_expiry(_) -> cache_expiry(_) ->
application:set_env(emqx, acl_cache_ttl, 1000), application:set_env(emqx, acl_cache_ttl, 100),
application:set_env(emqx, acl_cache_max_size, 30), application:set_env(emqx, acl_cache_max_size, 30),
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
allow = ?CACHE:get_acl_cache(subscribe, <<"a">>), allow = ?CACHE:get_acl_cache(subscribe, <<"a">>),
ct:sleep(1100), ct:sleep(150),
not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>), not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>),
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, deny), ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, deny),
deny = ?CACHE:get_acl_cache(subscribe, <<"a">>), deny = ?CACHE:get_acl_cache(subscribe, <<"a">>),
ct:sleep(1100), ct:sleep(150),
not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>). not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>).
cache_update(_) -> cache_update(_) ->
@ -249,12 +252,13 @@ cache_update(_) ->
%% update the 2nd one %% update the 2nd one
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
%ct:pal("dump acl cache: ~p~n", [?CACHE:dump_acl_cache()]), ct:pal("dump acl cache: ~p~n", [?CACHE:dump_acl_cache()]),
3 = ?CACHE:get_cache_size(), 3 = ?CACHE:get_cache_size(),
?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_newest_key()). ?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_newest_key()),
?assertEqual(?CACHE:cache_k(subscribe, <<"a">>), ?CACHE:get_oldest_key()).
cache_full_replacement(_) -> cache_replacement(_) ->
application:set_env(emqx, acl_cache_ttl, 300000), application:set_env(emqx, acl_cache_ttl, 300000),
application:set_env(emqx, acl_cache_max_size, 3), application:set_env(emqx, acl_cache_max_size, 3),
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
@ -269,40 +273,64 @@ cache_full_replacement(_) ->
ok = ?CACHE:put_acl_cache(publish, <<"d">>, deny), ok = ?CACHE:put_acl_cache(publish, <<"d">>, deny),
3 = ?CACHE:get_cache_size(), 3 = ?CACHE:get_cache_size(),
?assertEqual(?CACHE:cache_k(publish, <<"d">>), ?CACHE:get_newest_key()), ?assertEqual(?CACHE:cache_k(publish, <<"d">>), ?CACHE:get_newest_key()),
?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_oldest_key()),
ok = ?CACHE:put_acl_cache(publish, <<"e">>, deny), ok = ?CACHE:put_acl_cache(publish, <<"e">>, deny),
3 = ?CACHE:get_cache_size(), 3 = ?CACHE:get_cache_size(),
?assertEqual(?CACHE:cache_k(publish, <<"e">>), ?CACHE:get_newest_key()), ?assertEqual(?CACHE:cache_k(publish, <<"e">>), ?CACHE:get_newest_key()),
?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_oldest_key()),
not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>), not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>),
not_found = ?CACHE:get_acl_cache(publish, <<"b">>), not_found = ?CACHE:get_acl_cache(publish, <<"b">>),
allow = ?CACHE:get_acl_cache(publish, <<"c">>). allow = ?CACHE:get_acl_cache(publish, <<"c">>).
cache_cleanup(_) -> cache_cleanup(_) ->
application:set_env(emqx, acl_cache_ttl, 1000), application:set_env(emqx, acl_cache_ttl, 100),
application:set_env(emqx, acl_cache_max_size, 30), application:set_env(emqx, acl_cache_max_size, 30),
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
ct:sleep(150),
ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow),
3 = ?CACHE:get_cache_size(), 3 = ?CACHE:get_cache_size(),
ct:sleep(1100),
?CACHE:cleanup_acl_cache(), ?CACHE:cleanup_acl_cache(),
0 = ?CACHE:get_cache_size(). ?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_oldest_key()),
1 = ?CACHE:get_cache_size().
cache_full_cleanup(_) -> cache_auto_emtpy(_) ->
application:set_env(emqx, acl_cache_ttl, 1000), %% verify cache is emptied when cache full and even the newest
%% one is expired.
application:set_env(emqx, acl_cache_ttl, 100),
application:set_env(emqx, acl_cache_max_size, 3), application:set_env(emqx, acl_cache_max_size, 3),
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow),
3 = ?CACHE:get_cache_size(), 3 = ?CACHE:get_cache_size(),
ct:sleep(1100), ct:sleep(150),
%% verify auto cleanup upon cache full
ok = ?CACHE:put_acl_cache(subscribe, <<"d">>, deny), ok = ?CACHE:put_acl_cache(subscribe, <<"d">>, deny),
1 = ?CACHE:get_cache_size(). 1 = ?CACHE:get_cache_size().
cache_auto_cleanup(_) ->
%% verify we'll cleanup expired entries when we got a exipired acl
%% from cache.
application:set_env(emqx, acl_cache_ttl, 100),
application:set_env(emqx, acl_cache_max_size, 30),
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
ct:sleep(150),
ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"d">>, deny),
4 = ?CACHE:get_cache_size(),
%% "a" and "b" expires, while "c" and "d" not
not_found = ?CACHE:get_acl_cache(publish, <<"b">>),
2 = ?CACHE:get_cache_size(),
ct:sleep(150), %% now "c" and "d" expires
not_found = ?CACHE:get_acl_cache(publish, <<"c">>),
0 = ?CACHE:get_cache_size().
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% emqx_access_rule %% emqx_access_rule
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------