From 7c0fd642bb3573ed6e3eedd4b4aa9223db12c635 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 6 Jul 2021 18:36:40 +0800 Subject: [PATCH] feat(acl): make acl cache work with the new config --- apps/emqx/src/emqx_access_control.erl | 10 ++--- apps/emqx/src/emqx_acl_cache.erl | 64 +++++++++++++-------------- apps/emqx/src/emqx_channel.erl | 2 - 3 files changed, 37 insertions(+), 39 deletions(-) diff --git a/apps/emqx/src/emqx_access_control.erl b/apps/emqx/src/emqx_access_control.erl index a2a11bd15..4aa8eb505 100644 --- a/apps/emqx/src/emqx_access_control.erl +++ b/apps/emqx/src/emqx_access_control.erl @@ -39,17 +39,17 @@ authenticate(ClientInfo = #{zone := Zone, listener := Listener}) -> %% @doc Check ACL -spec(check_acl(emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic()) -> allow | deny). -check_acl(ClientInfo, PubSub, Topic) -> - case emqx_acl_cache:is_enabled() of +check_acl(ClientInfo = #{zone := Zone, listener := Listener}, PubSub, Topic) -> + case emqx_acl_cache:is_enabled(Zone, Listener) of true -> check_acl_cache(ClientInfo, PubSub, Topic); false -> do_check_acl(ClientInfo, PubSub, Topic) end. -check_acl_cache(ClientInfo, PubSub, Topic) -> - case emqx_acl_cache:get_acl_cache(PubSub, Topic) of +check_acl_cache(ClientInfo = #{zone := Zone, listener := Listener}, PubSub, Topic) -> + case emqx_acl_cache:get_acl_cache(Zone, Listener, PubSub, Topic) of not_found -> AclResult = do_check_acl(ClientInfo, PubSub, Topic), - emqx_acl_cache:put_acl_cache(PubSub, Topic, AclResult), + emqx_acl_cache:put_acl_cache(Zone, Listener, PubSub, Topic, AclResult), AclResult; AclResult -> AclResult end. diff --git a/apps/emqx/src/emqx_acl_cache.erl b/apps/emqx/src/emqx_acl_cache.erl index 4cbe6b06a..a49f42c83 100644 --- a/apps/emqx/src/emqx_acl_cache.erl +++ b/apps/emqx/src/emqx_acl_cache.erl @@ -19,14 +19,14 @@ -include("emqx.hrl"). -export([ list_acl_cache/0 - , get_acl_cache/2 - , put_acl_cache/3 - , cleanup_acl_cache/0 + , get_acl_cache/4 + , put_acl_cache/5 + , cleanup_acl_cache/2 , empty_acl_cache/0 , dump_acl_cache/0 - , get_cache_max_size/0 - , get_cache_ttl/0 - , is_enabled/0 + , get_cache_max_size/2 + , get_cache_ttl/2 + , is_enabled/2 , drain_cache/0 ]). @@ -50,43 +50,44 @@ cache_k(PubSub, Topic)-> {PubSub, Topic}. cache_v(AclResult)-> {AclResult, time_now()}. drain_k() -> {?MODULE, drain_timestamp}. --spec(is_enabled() -> boolean()). -is_enabled() -> - application:get_env(emqx, enable_acl_cache, true). +-spec(is_enabled(atom(), atom()) -> boolean()). +is_enabled(Zone, Listener) -> + emqx_config:get_listener_conf(Zone, Listener, [acl, cache, enable]). --spec(get_cache_max_size() -> integer()). -get_cache_max_size() -> - application:get_env(emqx, acl_cache_max_size, 32). +-spec(get_cache_max_size(atom(), atom()) -> integer()). +get_cache_max_size(Zone, Listener) -> + emqx_config:get_listener_conf(Zone, Listener, [acl, cache, max_size]). --spec(get_cache_ttl() -> integer()). -get_cache_ttl() -> - application:get_env(emqx, acl_cache_ttl, 60000). +-spec(get_cache_ttl(atom(), atom()) -> integer()). +get_cache_ttl(Zone, Listener) -> + emqx_config:get_listener_conf(Zone, Listener, [acl, cache, ttl]). -spec(list_acl_cache() -> [acl_cache_entry()]). list_acl_cache() -> - cleanup_acl_cache(), map_acl_cache(fun(Cache) -> Cache end). %% We'll cleanup the cache before replacing an expired acl. --spec(get_acl_cache(emqx_types:pubsub(), emqx_topic:topic()) -> (acl_result() | not_found)). -get_acl_cache(PubSub, Topic) -> +-spec get_acl_cache(atom(), atom(), emqx_types:pubsub(), emqx_topic:topic()) -> + acl_result() | not_found. +get_acl_cache(Zone, Listener, PubSub, Topic) -> case erlang:get(cache_k(PubSub, Topic)) of undefined -> not_found; {AclResult, CachedAt} -> - if_expired(CachedAt, + if_expired(get_cache_ttl(Zone, Listener), CachedAt, fun(false) -> AclResult; (true) -> - cleanup_acl_cache(), + cleanup_acl_cache(Zone, Listener), not_found end) end. %% If the cache get full, and also the latest one %% is expired, then delete all the cache entries --spec(put_acl_cache(emqx_types:pubsub(), emqx_topic:topic(), acl_result()) -> ok). -put_acl_cache(PubSub, Topic, AclResult) -> - MaxSize = get_cache_max_size(), true = (MaxSize =/= 0), +-spec put_acl_cache(atom(), atom(), emqx_types:pubsub(), emqx_topic:topic(), acl_result()) + -> ok. +put_acl_cache(Zone, Listener, PubSub, Topic, AclResult) -> + MaxSize = get_cache_max_size(Zone, Listener), true = (MaxSize =/= 0), Size = get_cache_size(), case Size < MaxSize of true -> @@ -94,7 +95,7 @@ put_acl_cache(PubSub, Topic, AclResult) -> false -> NewestK = get_newest_key(), {_AclResult, CachedAt} = erlang:get(NewestK), - if_expired(CachedAt, + if_expired(get_cache_ttl(Zone, Listener), CachedAt, fun(true) -> % all cache expired, cleanup first empty_acl_cache(), @@ -121,10 +122,10 @@ evict_acl_cache() -> decr_cache_size(). %% cleanup all the expired cache entries --spec(cleanup_acl_cache() -> ok). -cleanup_acl_cache() -> +-spec(cleanup_acl_cache(atom(), atom()) -> ok). +cleanup_acl_cache(Zone, Listener) -> keys_queue_set( - cleanup_acl(keys_queue_get())). + cleanup_acl(get_cache_ttl(Zone, Listener), keys_queue_get())). get_oldest_key() -> keys_queue_pick(queue_front()). @@ -174,16 +175,16 @@ update_acl(K, V) -> erlang:put(K, V), keys_queue_update(K). -cleanup_acl(KeysQ) -> +cleanup_acl(TTL, KeysQ) -> case queue:out(KeysQ) of {{value, OldestK}, KeysQ2} -> {_AclResult, CachedAt} = erlang:get(OldestK), - if_expired(CachedAt, + if_expired(TTL, CachedAt, fun(false) -> KeysQ; (true) -> erlang:erase(OldestK), decr_cache_size(), - cleanup_acl(KeysQ2) + cleanup_acl(TTL, KeysQ2) end); {empty, KeysQ} -> KeysQ end. @@ -246,8 +247,7 @@ queue_rear() -> fun queue:get_r/1. time_now() -> erlang:system_time(millisecond). -if_expired(CachedAt, Fun) -> - TTL = get_cache_ttl(), +if_expired(TTL, CachedAt, Fun) -> Now = time_now(), CurrentEvictTimestamp = persistent_term:get(drain_k(), 0), case CachedAt =< CurrentEvictTimestamp orelse (CachedAt + TTL) =< Now of diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index bbc48e709..4c9ee7682 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1622,8 +1622,6 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) -> %%-------------------------------------------------------------------- %% Is ACL enabled? - --compile({inline, [is_acl_enabled/1]}). is_acl_enabled(#{zone := Zone, listener := Listener, is_superuser := IsSuperuser}) -> (not IsSuperuser) andalso emqx_config:get_listener_conf(Zone, Listener, [acl, enable]).