diff --git a/apps/emqx/src/emqx_access_control.erl b/apps/emqx/src/emqx_access_control.erl index 71531c421..a1ca54680 100644 --- a/apps/emqx/src/emqx_access_control.erl +++ b/apps/emqx/src/emqx_access_control.erl @@ -34,17 +34,17 @@ authenticate(Credential) -> %% @doc Check ACL -spec authorize(emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic()) -> allow | deny. -authorize(ClientInfo = #{zone := Zone, listener := Listener}, PubSub, Topic) -> - case emqx_acl_cache:is_enabled(Zone, Listener) of +authorize(ClientInfo = #{zone := Zone}, PubSub, Topic) -> + case emqx_acl_cache:is_enabled(Zone) of true -> check_authorization_cache(ClientInfo, PubSub, Topic); false -> do_authorize(ClientInfo, PubSub, Topic) end. -check_authorization_cache(ClientInfo = #{zone := Zone, listener := Listener}, PubSub, Topic) -> - case emqx_acl_cache:get_acl_cache(Zone, Listener, PubSub, Topic) of +check_authorization_cache(ClientInfo = #{zone := Zone}, PubSub, Topic) -> + case emqx_acl_cache:get_acl_cache(Zone, PubSub, Topic) of not_found -> AclResult = do_authorize(ClientInfo, PubSub, Topic), - emqx_acl_cache:put_acl_cache(Zone, Listener, PubSub, Topic, AclResult), + emqx_acl_cache:put_acl_cache(Zone, PubSub, Topic, AclResult), AclResult; AclResult -> AclResult end. diff --git a/apps/emqx/src/emqx_acl_cache.erl b/apps/emqx/src/emqx_acl_cache.erl index f10aab711..189faca2f 100644 --- a/apps/emqx/src/emqx_acl_cache.erl +++ b/apps/emqx/src/emqx_acl_cache.erl @@ -18,15 +18,15 @@ -include("emqx.hrl"). --export([ list_acl_cache/2 - , get_acl_cache/4 - , put_acl_cache/5 - , cleanup_acl_cache/2 +-export([ list_acl_cache/1 + , get_acl_cache/3 + , put_acl_cache/4 + , cleanup_acl_cache/1 , empty_acl_cache/0 , dump_acl_cache/0 - , get_cache_max_size/2 - , get_cache_ttl/2 - , is_enabled/2 + , get_cache_max_size/1 + , get_cache_ttl/1 + , is_enabled/1 , drain_cache/0 ]). @@ -50,45 +50,45 @@ cache_k(PubSub, Topic)-> {PubSub, Topic}. cache_v(AclResult)-> {AclResult, time_now()}. drain_k() -> {?MODULE, drain_timestamp}. --spec(is_enabled(atom(), atom()) -> boolean()). -is_enabled(Zone, Listener) -> +-spec(is_enabled(atom()) -> boolean()). +is_enabled(Zone) -> emqx_config:get_zone_conf(Zone, [acl, cache, enable]). --spec(get_cache_max_size(atom(), atom()) -> integer()). -get_cache_max_size(Zone, Listener) -> +-spec(get_cache_max_size(atom()) -> integer()). +get_cache_max_size(Zone) -> emqx_config:get_zone_conf(Zone, [acl, cache, max_size]). --spec(get_cache_ttl(atom(), atom()) -> integer()). -get_cache_ttl(Zone, Listener) -> +-spec(get_cache_ttl(atom()) -> integer()). +get_cache_ttl(Zone) -> emqx_config:get_zone_conf(Zone, [acl, cache, ttl]). --spec(list_acl_cache(atom(), atom()) -> [acl_cache_entry()]). -list_acl_cache(Zone, Listener) -> - cleanup_acl_cache(Zone, Listener), +-spec(list_acl_cache(atom()) -> [acl_cache_entry()]). +list_acl_cache(Zone) -> + cleanup_acl_cache(Zone), map_acl_cache(fun(Cache) -> Cache end). %% We'll cleanup the cache before replacing an expired acl. --spec get_acl_cache(atom(), atom(), emqx_types:pubsub(), emqx_topic:topic()) -> +-spec get_acl_cache(atom(), emqx_types:pubsub(), emqx_topic:topic()) -> acl_result() | not_found. -get_acl_cache(Zone, Listener, PubSub, Topic) -> +get_acl_cache(Zone, PubSub, Topic) -> case erlang:get(cache_k(PubSub, Topic)) of undefined -> not_found; {AclResult, CachedAt} -> - if_expired(get_cache_ttl(Zone, Listener), CachedAt, + if_expired(get_cache_ttl(Zone), CachedAt, fun(false) -> AclResult; (true) -> - cleanup_acl_cache(Zone, Listener), + cleanup_acl_cache(Zone), not_found end) end. %% If the cache get full, and also the latest one %% is expired, then delete all the cache entries --spec put_acl_cache(atom(), atom(), emqx_types:pubsub(), emqx_topic:topic(), acl_result()) +-spec put_acl_cache(atom(), emqx_types:pubsub(), emqx_topic:topic(), acl_result()) -> ok. -put_acl_cache(Zone, Listener, PubSub, Topic, AclResult) -> - MaxSize = get_cache_max_size(Zone, Listener), true = (MaxSize =/= 0), +put_acl_cache(Zone, PubSub, Topic, AclResult) -> + MaxSize = get_cache_max_size(Zone), true = (MaxSize =/= 0), Size = get_cache_size(), case Size < MaxSize of true -> @@ -96,7 +96,7 @@ put_acl_cache(Zone, Listener, PubSub, Topic, AclResult) -> false -> NewestK = get_newest_key(), {_AclResult, CachedAt} = erlang:get(NewestK), - if_expired(get_cache_ttl(Zone, Listener), CachedAt, + if_expired(get_cache_ttl(Zone), CachedAt, fun(true) -> % all cache expired, cleanup first empty_acl_cache(), @@ -123,10 +123,10 @@ evict_acl_cache() -> decr_cache_size(). %% cleanup all the expired cache entries --spec(cleanup_acl_cache(atom(), atom()) -> ok). -cleanup_acl_cache(Zone, Listener) -> +-spec(cleanup_acl_cache(atom()) -> ok). +cleanup_acl_cache(Zone) -> keys_queue_set( - cleanup_acl(get_cache_ttl(Zone, Listener), keys_queue_get())). + cleanup_acl(get_cache_ttl(Zone), keys_queue_get())). get_oldest_key() -> keys_queue_pick(queue_front()). diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 8c59f10c9..2769559c7 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -956,9 +956,9 @@ handle_call({takeover, 'end'}, Channel = #channel{session = Session, AllPendings = lists:append(Delivers, Pendings), disconnect_and_shutdown(takeovered, AllPendings, Channel); -handle_call(list_acl_cache, #channel{clientinfo = #{zone := Zone, listener := Listener}} +handle_call(list_acl_cache, #channel{clientinfo = #{zone := Zone}} = Channel) -> - {reply, emqx_acl_cache:list_acl_cache(Zone, Listener), Channel}; + {reply, emqx_acl_cache:list_acl_cache(Zone), Channel}; handle_call({quota, Policy}, Channel) -> Zone = info(zone, Channel),