diff --git a/src/emqx_acl_cache.erl b/src/emqx_acl_cache.erl index b417f16fa..d94c165e8 100644 --- a/src/emqx_acl_cache.erl +++ b/src/emqx_acl_cache.erl @@ -18,21 +18,31 @@ -include("emqx.hrl"). --export([ get_acl_cache/2 +-export([ list_acl_cache/0 + , get_acl_cache/2 , put_acl_cache/3 , cleanup_acl_cache/0 , empty_acl_cache/0 , dump_acl_cache/0 - , get_cache_size/0 , get_cache_max_size/0 - , get_newest_key/0 - , get_oldest_key/0 - , cache_k/2 - , cache_v/1 + , get_cache_ttl/0 , is_enabled/0 ]). +%% export for test +-export([ cache_k/2 + , cache_v/1 + , get_cache_size/0 + , get_newest_key/0 + , get_oldest_key/0 + ]). + -type(acl_result() :: allow | deny). +-type(system_time() :: integer()). +-type(cache_key() :: {emqx_types:pubsub(), emqx_types:topic()}). +-type(cache_val() :: {acl_result(), system_time()}). + +-type(acl_cache_entry() :: {cache_key(), cache_val()}). %% Wrappers for key and value cache_k(PubSub, Topic)-> {PubSub, Topic}. @@ -42,8 +52,21 @@ cache_v(AclResult)-> {AclResult, time_now()}. is_enabled() -> application:get_env(emqx, enable_acl_cache, true). -%% We'll cleanup the cache before repalcing an expired acl. --spec(get_acl_cache(publish | subscribe, emqx_topic:topic()) -> (acl_result() | not_found)). +-spec(get_cache_max_size() -> integer()). +get_cache_max_size() -> + application:get_env(emqx, acl_cache_max_size, 32). + +-spec(get_cache_ttl() -> integer()). +get_cache_ttl() -> + application:get_env(emqx, acl_cache_ttl, 60000). + +-spec(list_acl_cache() -> [acl_cache_entry()]). +list_acl_cache() -> + cleanup_acl_cache(), + map_acl_cache(fun(Cache) -> Cache end). + +%% We'll cleanup the cache before replacing an expired acl. +-spec(get_acl_cache(emqx_types:pubsub(), emqx_topic:topic()) -> (acl_result() | not_found)). get_acl_cache(PubSub, Topic) -> case erlang:get(cache_k(PubSub, Topic)) of undefined -> not_found; @@ -59,7 +82,7 @@ get_acl_cache(PubSub, Topic) -> %% If the cache get full, and also the latest one %% is expired, then delete all the cache entries --spec(put_acl_cache(publish | subscribe, emqx_topic:topic(), acl_result()) -> ok). +-spec(put_acl_cache(emqx_types:pubsub(), emqx_topic:topic(), acl_result()) -> ok). put_acl_cache(PubSub, Topic, AclResult) -> MaxSize = get_cache_max_size(), true = (MaxSize =/= 0), Size = get_cache_size(), @@ -97,7 +120,7 @@ evict_acl_cache() -> erlang:erase(OldestK), decr_cache_size(). -%% cleanup all the exipired cache entries +%% cleanup all the expired cache entries -spec(cleanup_acl_cache() -> ok). cleanup_acl_cache() -> keys_queue_set( @@ -108,9 +131,6 @@ get_oldest_key() -> get_newest_key() -> keys_queue_pick(queue_rear()). -get_cache_max_size() -> - application:get_env(emqx, acl_cache_max_size, 32). - get_cache_size() -> case erlang:get(acl_cache_size) of undefined -> 0; @@ -215,7 +235,7 @@ queue_rear() -> fun queue:get_r/1. time_now() -> erlang:system_time(millisecond). if_expired(CachedAt, Fun) -> - TTL = application:get_env(emqx, acl_cache_ttl, 60000), + TTL = get_cache_ttl(), Now = time_now(), if (CachedAt + TTL) =< Now -> Fun(true); diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 99a595929..e995bd47e 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -722,6 +722,9 @@ handle_call({takeover, 'end'}, Channel = #channel{session = Session, AllPendings = lists:append(Delivers, Pendings), shutdown(takeovered, AllPendings, Channel); +handle_call(list_acl_cache, Channel) -> + {reply, emqx_acl_cache:list_acl_cache(), Channel}; + handle_call(Req, Channel) -> ?LOG(error, "Unexpected call: ~p", [Req]), reply(ignored, Channel). @@ -775,6 +778,11 @@ handle_info({sock_closed, Reason}, Channel = #channel{conninfo = ConnInfo, shutdown(Reason, Channel2) end; +handle_info(clean_acl_cache, Channel) -> + ?LOG(debug, "clear acl cache"), + ok = emqx_acl_cache:empty_acl_cache(), + {ok, Channel}; + handle_info(Info, Channel) -> ?LOG(error, "Unexpected info: ~p~n", [Info]), error(unexpected_info), diff --git a/test/emqx_acl_cache_SUITE.erl b/test/emqx_acl_cache_SUITE.erl index 1971612b3..67fde1df0 100644 --- a/test/emqx_acl_cache_SUITE.erl +++ b/test/emqx_acl_cache_SUITE.erl @@ -23,12 +23,40 @@ all() -> emqx_ct:all(?MODULE). +init_per_suite(Config) -> + emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + init_per_testcase(_TestCase, Config) -> Config. end_per_testcase(_TestCase, Config) -> Config. +t_clean_acl_cache(_Config) -> + {ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}]), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), + emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0), + ct:sleep(100), + ClientPid = case emqx_cm:lookup_channels(<<"emqx_c">>) of + [Pid] when is_pid(Pid) -> + Pid; + Pids when is_list(Pids) -> + lists:last(Pids); + _ -> {error, not_found} + end, + Caches = gen_server:call(ClientPid, list_acl_cache), + ct:log("acl caches: ~p", [Caches]), + ?assert(length(Caches) > 0), + erlang:send(ClientPid, clean_acl_cache), + ?assertEqual(0, length(gen_server:call(ClientPid, list_acl_cache))), + emqtt:stop(Client). + t_cache_k(_) -> error('TODO').