feat(acl): make acl cache work with the new config
This commit is contained in:
parent
5efd5c8d3b
commit
7c0fd642bb
|
@ -39,17 +39,17 @@ authenticate(ClientInfo = #{zone := Zone, listener := Listener}) ->
|
||||||
%% @doc Check ACL
|
%% @doc Check ACL
|
||||||
-spec(check_acl(emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic())
|
-spec(check_acl(emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic())
|
||||||
-> allow | deny).
|
-> allow | deny).
|
||||||
check_acl(ClientInfo, PubSub, Topic) ->
|
check_acl(ClientInfo = #{zone := Zone, listener := Listener}, PubSub, Topic) ->
|
||||||
case emqx_acl_cache:is_enabled() of
|
case emqx_acl_cache:is_enabled(Zone, Listener) of
|
||||||
true -> check_acl_cache(ClientInfo, PubSub, Topic);
|
true -> check_acl_cache(ClientInfo, PubSub, Topic);
|
||||||
false -> do_check_acl(ClientInfo, PubSub, Topic)
|
false -> do_check_acl(ClientInfo, PubSub, Topic)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
check_acl_cache(ClientInfo, PubSub, Topic) ->
|
check_acl_cache(ClientInfo = #{zone := Zone, listener := Listener}, PubSub, Topic) ->
|
||||||
case emqx_acl_cache:get_acl_cache(PubSub, Topic) of
|
case emqx_acl_cache:get_acl_cache(Zone, Listener, PubSub, Topic) of
|
||||||
not_found ->
|
not_found ->
|
||||||
AclResult = do_check_acl(ClientInfo, PubSub, Topic),
|
AclResult = do_check_acl(ClientInfo, PubSub, Topic),
|
||||||
emqx_acl_cache:put_acl_cache(PubSub, Topic, AclResult),
|
emqx_acl_cache:put_acl_cache(Zone, Listener, PubSub, Topic, AclResult),
|
||||||
AclResult;
|
AclResult;
|
||||||
AclResult -> AclResult
|
AclResult -> AclResult
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -19,14 +19,14 @@
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
|
|
||||||
-export([ list_acl_cache/0
|
-export([ list_acl_cache/0
|
||||||
, get_acl_cache/2
|
, get_acl_cache/4
|
||||||
, put_acl_cache/3
|
, put_acl_cache/5
|
||||||
, cleanup_acl_cache/0
|
, cleanup_acl_cache/2
|
||||||
, empty_acl_cache/0
|
, empty_acl_cache/0
|
||||||
, dump_acl_cache/0
|
, dump_acl_cache/0
|
||||||
, get_cache_max_size/0
|
, get_cache_max_size/2
|
||||||
, get_cache_ttl/0
|
, get_cache_ttl/2
|
||||||
, is_enabled/0
|
, is_enabled/2
|
||||||
, drain_cache/0
|
, drain_cache/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -50,43 +50,44 @@ cache_k(PubSub, Topic)-> {PubSub, Topic}.
|
||||||
cache_v(AclResult)-> {AclResult, time_now()}.
|
cache_v(AclResult)-> {AclResult, time_now()}.
|
||||||
drain_k() -> {?MODULE, drain_timestamp}.
|
drain_k() -> {?MODULE, drain_timestamp}.
|
||||||
|
|
||||||
-spec(is_enabled() -> boolean()).
|
-spec(is_enabled(atom(), atom()) -> boolean()).
|
||||||
is_enabled() ->
|
is_enabled(Zone, Listener) ->
|
||||||
application:get_env(emqx, enable_acl_cache, true).
|
emqx_config:get_listener_conf(Zone, Listener, [acl, cache, enable]).
|
||||||
|
|
||||||
-spec(get_cache_max_size() -> integer()).
|
-spec(get_cache_max_size(atom(), atom()) -> integer()).
|
||||||
get_cache_max_size() ->
|
get_cache_max_size(Zone, Listener) ->
|
||||||
application:get_env(emqx, acl_cache_max_size, 32).
|
emqx_config:get_listener_conf(Zone, Listener, [acl, cache, max_size]).
|
||||||
|
|
||||||
-spec(get_cache_ttl() -> integer()).
|
-spec(get_cache_ttl(atom(), atom()) -> integer()).
|
||||||
get_cache_ttl() ->
|
get_cache_ttl(Zone, Listener) ->
|
||||||
application:get_env(emqx, acl_cache_ttl, 60000).
|
emqx_config:get_listener_conf(Zone, Listener, [acl, cache, ttl]).
|
||||||
|
|
||||||
-spec(list_acl_cache() -> [acl_cache_entry()]).
|
-spec(list_acl_cache() -> [acl_cache_entry()]).
|
||||||
list_acl_cache() ->
|
list_acl_cache() ->
|
||||||
cleanup_acl_cache(),
|
|
||||||
map_acl_cache(fun(Cache) -> Cache end).
|
map_acl_cache(fun(Cache) -> Cache end).
|
||||||
|
|
||||||
%% We'll cleanup the cache before replacing an expired acl.
|
%% We'll cleanup the cache before replacing an expired acl.
|
||||||
-spec(get_acl_cache(emqx_types:pubsub(), emqx_topic:topic()) -> (acl_result() | not_found)).
|
-spec get_acl_cache(atom(), atom(), emqx_types:pubsub(), emqx_topic:topic()) ->
|
||||||
get_acl_cache(PubSub, Topic) ->
|
acl_result() | not_found.
|
||||||
|
get_acl_cache(Zone, Listener, PubSub, Topic) ->
|
||||||
case erlang:get(cache_k(PubSub, Topic)) of
|
case erlang:get(cache_k(PubSub, Topic)) of
|
||||||
undefined -> not_found;
|
undefined -> not_found;
|
||||||
{AclResult, CachedAt} ->
|
{AclResult, CachedAt} ->
|
||||||
if_expired(CachedAt,
|
if_expired(get_cache_ttl(Zone, Listener), CachedAt,
|
||||||
fun(false) ->
|
fun(false) ->
|
||||||
AclResult;
|
AclResult;
|
||||||
(true) ->
|
(true) ->
|
||||||
cleanup_acl_cache(),
|
cleanup_acl_cache(Zone, Listener),
|
||||||
not_found
|
not_found
|
||||||
end)
|
end)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% If the cache get full, and also the latest one
|
%% If the cache get full, and also the latest one
|
||||||
%% is expired, then delete all the cache entries
|
%% is expired, then delete all the cache entries
|
||||||
-spec(put_acl_cache(emqx_types:pubsub(), emqx_topic:topic(), acl_result()) -> ok).
|
-spec put_acl_cache(atom(), atom(), emqx_types:pubsub(), emqx_topic:topic(), acl_result())
|
||||||
put_acl_cache(PubSub, Topic, AclResult) ->
|
-> ok.
|
||||||
MaxSize = get_cache_max_size(), true = (MaxSize =/= 0),
|
put_acl_cache(Zone, Listener, PubSub, Topic, AclResult) ->
|
||||||
|
MaxSize = get_cache_max_size(Zone, Listener), true = (MaxSize =/= 0),
|
||||||
Size = get_cache_size(),
|
Size = get_cache_size(),
|
||||||
case Size < MaxSize of
|
case Size < MaxSize of
|
||||||
true ->
|
true ->
|
||||||
|
@ -94,7 +95,7 @@ put_acl_cache(PubSub, Topic, AclResult) ->
|
||||||
false ->
|
false ->
|
||||||
NewestK = get_newest_key(),
|
NewestK = get_newest_key(),
|
||||||
{_AclResult, CachedAt} = erlang:get(NewestK),
|
{_AclResult, CachedAt} = erlang:get(NewestK),
|
||||||
if_expired(CachedAt,
|
if_expired(get_cache_ttl(Zone, Listener), CachedAt,
|
||||||
fun(true) ->
|
fun(true) ->
|
||||||
% all cache expired, cleanup first
|
% all cache expired, cleanup first
|
||||||
empty_acl_cache(),
|
empty_acl_cache(),
|
||||||
|
@ -121,10 +122,10 @@ evict_acl_cache() ->
|
||||||
decr_cache_size().
|
decr_cache_size().
|
||||||
|
|
||||||
%% cleanup all the expired cache entries
|
%% cleanup all the expired cache entries
|
||||||
-spec(cleanup_acl_cache() -> ok).
|
-spec(cleanup_acl_cache(atom(), atom()) -> ok).
|
||||||
cleanup_acl_cache() ->
|
cleanup_acl_cache(Zone, Listener) ->
|
||||||
keys_queue_set(
|
keys_queue_set(
|
||||||
cleanup_acl(keys_queue_get())).
|
cleanup_acl(get_cache_ttl(Zone, Listener), keys_queue_get())).
|
||||||
|
|
||||||
get_oldest_key() ->
|
get_oldest_key() ->
|
||||||
keys_queue_pick(queue_front()).
|
keys_queue_pick(queue_front()).
|
||||||
|
@ -174,16 +175,16 @@ update_acl(K, V) ->
|
||||||
erlang:put(K, V),
|
erlang:put(K, V),
|
||||||
keys_queue_update(K).
|
keys_queue_update(K).
|
||||||
|
|
||||||
cleanup_acl(KeysQ) ->
|
cleanup_acl(TTL, KeysQ) ->
|
||||||
case queue:out(KeysQ) of
|
case queue:out(KeysQ) of
|
||||||
{{value, OldestK}, KeysQ2} ->
|
{{value, OldestK}, KeysQ2} ->
|
||||||
{_AclResult, CachedAt} = erlang:get(OldestK),
|
{_AclResult, CachedAt} = erlang:get(OldestK),
|
||||||
if_expired(CachedAt,
|
if_expired(TTL, CachedAt,
|
||||||
fun(false) -> KeysQ;
|
fun(false) -> KeysQ;
|
||||||
(true) ->
|
(true) ->
|
||||||
erlang:erase(OldestK),
|
erlang:erase(OldestK),
|
||||||
decr_cache_size(),
|
decr_cache_size(),
|
||||||
cleanup_acl(KeysQ2)
|
cleanup_acl(TTL, KeysQ2)
|
||||||
end);
|
end);
|
||||||
{empty, KeysQ} -> KeysQ
|
{empty, KeysQ} -> KeysQ
|
||||||
end.
|
end.
|
||||||
|
@ -246,8 +247,7 @@ queue_rear() -> fun queue:get_r/1.
|
||||||
|
|
||||||
time_now() -> erlang:system_time(millisecond).
|
time_now() -> erlang:system_time(millisecond).
|
||||||
|
|
||||||
if_expired(CachedAt, Fun) ->
|
if_expired(TTL, CachedAt, Fun) ->
|
||||||
TTL = get_cache_ttl(),
|
|
||||||
Now = time_now(),
|
Now = time_now(),
|
||||||
CurrentEvictTimestamp = persistent_term:get(drain_k(), 0),
|
CurrentEvictTimestamp = persistent_term:get(drain_k(), 0),
|
||||||
case CachedAt =< CurrentEvictTimestamp orelse (CachedAt + TTL) =< Now of
|
case CachedAt =< CurrentEvictTimestamp orelse (CachedAt + TTL) =< Now of
|
||||||
|
|
|
@ -1622,8 +1622,6 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Is ACL enabled?
|
%% Is ACL enabled?
|
||||||
|
|
||||||
-compile({inline, [is_acl_enabled/1]}).
|
|
||||||
is_acl_enabled(#{zone := Zone, listener := Listener, is_superuser := IsSuperuser}) ->
|
is_acl_enabled(#{zone := Zone, listener := Listener, is_superuser := IsSuperuser}) ->
|
||||||
(not IsSuperuser) andalso emqx_config:get_listener_conf(Zone, Listener, [acl, enable]).
|
(not IsSuperuser) andalso emqx_config:get_listener_conf(Zone, Listener, [acl, enable]).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue