Merge pull request #2999 from emqx/clean_acl_cache

Add API for clean and get acl cache
This commit is contained in:
Shawn 2019-10-28 19:15:06 +08:00 committed by GitHub
commit eb2fec884c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 70 additions and 14 deletions

View File

@ -18,21 +18,31 @@
-include("emqx.hrl"). -include("emqx.hrl").
-export([ get_acl_cache/2 -export([ list_acl_cache/0
, get_acl_cache/2
, put_acl_cache/3 , put_acl_cache/3
, cleanup_acl_cache/0 , cleanup_acl_cache/0
, empty_acl_cache/0 , empty_acl_cache/0
, dump_acl_cache/0 , dump_acl_cache/0
, get_cache_size/0
, get_cache_max_size/0 , get_cache_max_size/0
, get_newest_key/0 , get_cache_ttl/0
, get_oldest_key/0
, cache_k/2
, cache_v/1
, is_enabled/0 , is_enabled/0
]). ]).
%% export for test
-export([ cache_k/2
, cache_v/1
, get_cache_size/0
, get_newest_key/0
, get_oldest_key/0
]).
-type(acl_result() :: allow | deny). -type(acl_result() :: allow | deny).
-type(system_time() :: integer()).
-type(cache_key() :: {emqx_types:pubsub(), emqx_types:topic()}).
-type(cache_val() :: {acl_result(), system_time()}).
-type(acl_cache_entry() :: {cache_key(), cache_val()}).
%% Wrappers for key and value %% Wrappers for key and value
cache_k(PubSub, Topic)-> {PubSub, Topic}. cache_k(PubSub, Topic)-> {PubSub, Topic}.
@ -42,8 +52,21 @@ cache_v(AclResult)-> {AclResult, time_now()}.
is_enabled() -> is_enabled() ->
application:get_env(emqx, enable_acl_cache, true). application:get_env(emqx, enable_acl_cache, true).
%% We'll cleanup the cache before repalcing an expired acl. -spec(get_cache_max_size() -> integer()).
-spec(get_acl_cache(publish | subscribe, emqx_topic:topic()) -> (acl_result() | not_found)). get_cache_max_size() ->
application:get_env(emqx, acl_cache_max_size, 32).
-spec(get_cache_ttl() -> integer()).
get_cache_ttl() ->
application:get_env(emqx, acl_cache_ttl, 60000).
-spec(list_acl_cache() -> [acl_cache_entry()]).
list_acl_cache() ->
cleanup_acl_cache(),
map_acl_cache(fun(Cache) -> Cache end).
%% We'll cleanup the cache before replacing an expired acl.
-spec(get_acl_cache(emqx_types:pubsub(), emqx_topic:topic()) -> (acl_result() | not_found)).
get_acl_cache(PubSub, Topic) -> get_acl_cache(PubSub, Topic) ->
case erlang:get(cache_k(PubSub, Topic)) of case erlang:get(cache_k(PubSub, Topic)) of
undefined -> not_found; undefined -> not_found;
@ -59,7 +82,7 @@ get_acl_cache(PubSub, Topic) ->
%% If the cache get full, and also the latest one %% If the cache get full, and also the latest one
%% is expired, then delete all the cache entries %% is expired, then delete all the cache entries
-spec(put_acl_cache(publish | subscribe, emqx_topic:topic(), acl_result()) -> ok). -spec(put_acl_cache(emqx_types:pubsub(), emqx_topic:topic(), acl_result()) -> ok).
put_acl_cache(PubSub, Topic, AclResult) -> put_acl_cache(PubSub, Topic, AclResult) ->
MaxSize = get_cache_max_size(), true = (MaxSize =/= 0), MaxSize = get_cache_max_size(), true = (MaxSize =/= 0),
Size = get_cache_size(), Size = get_cache_size(),
@ -97,7 +120,7 @@ evict_acl_cache() ->
erlang:erase(OldestK), erlang:erase(OldestK),
decr_cache_size(). decr_cache_size().
%% cleanup all the exipired cache entries %% cleanup all the expired cache entries
-spec(cleanup_acl_cache() -> ok). -spec(cleanup_acl_cache() -> ok).
cleanup_acl_cache() -> cleanup_acl_cache() ->
keys_queue_set( keys_queue_set(
@ -108,9 +131,6 @@ get_oldest_key() ->
get_newest_key() -> get_newest_key() ->
keys_queue_pick(queue_rear()). keys_queue_pick(queue_rear()).
get_cache_max_size() ->
application:get_env(emqx, acl_cache_max_size, 32).
get_cache_size() -> get_cache_size() ->
case erlang:get(acl_cache_size) of case erlang:get(acl_cache_size) of
undefined -> 0; undefined -> 0;
@ -215,7 +235,7 @@ queue_rear() -> fun queue:get_r/1.
time_now() -> erlang:system_time(millisecond). time_now() -> erlang:system_time(millisecond).
if_expired(CachedAt, Fun) -> if_expired(CachedAt, Fun) ->
TTL = application:get_env(emqx, acl_cache_ttl, 60000), TTL = get_cache_ttl(),
Now = time_now(), Now = time_now(),
if (CachedAt + TTL) =< Now -> if (CachedAt + TTL) =< Now ->
Fun(true); Fun(true);

View File

@ -722,6 +722,9 @@ handle_call({takeover, 'end'}, Channel = #channel{session = Session,
AllPendings = lists:append(Delivers, Pendings), AllPendings = lists:append(Delivers, Pendings),
shutdown(takeovered, AllPendings, Channel); shutdown(takeovered, AllPendings, Channel);
handle_call(list_acl_cache, Channel) ->
{reply, emqx_acl_cache:list_acl_cache(), Channel};
handle_call(Req, Channel) -> handle_call(Req, Channel) ->
?LOG(error, "Unexpected call: ~p", [Req]), ?LOG(error, "Unexpected call: ~p", [Req]),
reply(ignored, Channel). reply(ignored, Channel).
@ -775,6 +778,11 @@ handle_info({sock_closed, Reason}, Channel = #channel{conninfo = ConnInfo,
shutdown(Reason, Channel2) shutdown(Reason, Channel2)
end; end;
handle_info(clean_acl_cache, Channel) ->
?LOG(debug, "clear acl cache"),
ok = emqx_acl_cache:empty_acl_cache(),
{ok, Channel};
handle_info(Info, Channel) -> handle_info(Info, Channel) ->
?LOG(error, "Unexpected info: ~p~n", [Info]), ?LOG(error, "Unexpected info: ~p~n", [Info]),
error(unexpected_info), error(unexpected_info),

View File

@ -23,12 +23,40 @@
all() -> emqx_ct:all(?MODULE). all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
Config. Config.
end_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, Config) ->
Config. Config.
t_clean_acl_cache(_Config) ->
{ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}]),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0),
ct:sleep(100),
ClientPid = case emqx_cm:lookup_channels(<<"emqx_c">>) of
[Pid] when is_pid(Pid) ->
Pid;
Pids when is_list(Pids) ->
lists:last(Pids);
_ -> {error, not_found}
end,
Caches = gen_server:call(ClientPid, list_acl_cache),
ct:log("acl caches: ~p", [Caches]),
?assert(length(Caches) > 0),
erlang:send(ClientPid, clean_acl_cache),
?assertEqual(0, length(gen_server:call(ClientPid, list_acl_cache))),
emqtt:stop(Client).
t_cache_k(_) -> t_cache_k(_) ->
error('TODO'). error('TODO').