From c7b44caa1dc8eca211438b512caab42809950d40 Mon Sep 17 00:00:00 2001 From: Karol Kaczmarek Date: Wed, 17 Mar 2021 16:41:04 +0100 Subject: [PATCH] feat(acl): Add possibility to remove all acl cache --- src/emqx_acl_cache.erl | 30 ++++++++++++++++++++---------- test/emqx_acl_cache_SUITE.erl | 24 ++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 10 deletions(-) diff --git a/src/emqx_acl_cache.erl b/src/emqx_acl_cache.erl index 5e327e210..2ba2bf04d 100644 --- a/src/emqx_acl_cache.erl +++ b/src/emqx_acl_cache.erl @@ -27,6 +27,7 @@ , get_cache_max_size/0 , get_cache_ttl/0 , is_enabled/0 + , drain_cache/0 ]). %% export for test @@ -47,6 +48,7 @@ %% Wrappers for key and value cache_k(PubSub, Topic)-> {PubSub, Topic}. cache_v(AclResult)-> {AclResult, time_now()}. +drain_k() -> {?MODULE, drain_timestamp}. -spec(is_enabled() -> boolean()). is_enabled() -> @@ -86,10 +88,10 @@ get_acl_cache(PubSub, Topic) -> put_acl_cache(PubSub, Topic, AclResult) -> MaxSize = get_cache_max_size(), true = (MaxSize =/= 0), Size = get_cache_size(), - if - Size < MaxSize -> + case Size < MaxSize of + true -> add_acl(PubSub, Topic, AclResult); - Size =:= MaxSize -> + false -> NewestK = get_newest_key(), {_AclResult, CachedAt} = erlang:get(NewestK), if_expired(CachedAt, @@ -145,6 +147,11 @@ foreach_acl_cache(Fun) -> _ = map_acl_cache(Fun), ok. +%% All acl cache entries added before `drain_cache()` invocation will become expired +drain_cache() -> + _ = persistent_term:put(drain_k(), time_now()), + ok. + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- @@ -185,11 +192,14 @@ incr_cache_size() -> erlang:put(acl_cache_size, get_cache_size() + 1), ok. decr_cache_size() -> Size = get_cache_size(), - if Size > 1 -> + case Size > 1 of + true -> erlang:put(acl_cache_size, Size-1); - Size =< 1 -> + false -> erlang:put(acl_cache_size, 0) - end, ok. + end, + ok. + set_cache_size(N) -> erlang:put(acl_cache_size, N), ok. @@ -239,8 +249,8 @@ time_now() -> erlang:system_time(millisecond). if_expired(CachedAt, Fun) -> TTL = get_cache_ttl(), Now = time_now(), - if (CachedAt + TTL) =< Now -> - Fun(true); - true -> - Fun(false) + CurrentEvictTimestamp = persistent_term:get(drain_k(), 0), + case CachedAt =< CurrentEvictTimestamp orelse (CachedAt + TTL) =< Now of + true -> Fun(true); + false -> Fun(false) end. diff --git a/test/emqx_acl_cache_SUITE.erl b/test/emqx_acl_cache_SUITE.erl index 8c7685aa2..8d760c284 100644 --- a/test/emqx_acl_cache_SUITE.erl +++ b/test/emqx_acl_cache_SUITE.erl @@ -55,6 +55,30 @@ t_clean_acl_cache(_) -> ?assertEqual(0, length(gen_server:call(ClientPid, list_acl_cache))), emqtt:stop(Client). + +t_drain_acl_cache(_) -> + {ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}]), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), + emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0), + ct:sleep(100), + ClientPid = case emqx_cm:lookup_channels(<<"emqx_c">>) of + [Pid] when is_pid(Pid) -> + Pid; + Pids when is_list(Pids) -> + lists:last(Pids); + _ -> {error, not_found} + end, + Caches = gen_server:call(ClientPid, list_acl_cache), + ct:log("acl caches: ~p", [Caches]), + ?assert(length(Caches) > 0), + emqx_acl_cache:drain_cache(), + ?assertEqual(0, length(gen_server:call(ClientPid, list_acl_cache))), + ct:sleep(100), + {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), + ?assert(length(gen_server:call(ClientPid, list_acl_cache)) > 0), + emqtt:stop(Client). + % optimize?? t_reload_aclfile_and_cleanall(_Config) ->