diff --git a/etc/emqx.conf b/etc/emqx.conf index 62f94b174..f3f46589e 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -434,12 +434,17 @@ acl_nomatch = allow ## Value: File Name acl_file = {{ platform_etc_dir }}/acl.conf +## Whether to enable ACL cache for publish. ## The ACL cache size ## The maximum count of ACL entries allowed for a client. ## -## Value 0 disables ACL cache +## Value: on | off +enable_acl_cache = on + +## The ACL cache size +## The maximum count of ACL entries allowed for a client. ## -## Value: Integer +## Value: Integer greater than 0 ## Default: 32 acl_cache_max_size = 32 diff --git a/priv/emqx.schema b/priv/emqx.schema index f6ee7c621..765363607 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -571,6 +571,12 @@ end}. hidden ]}. +%% @doc Enable ACL cache for publish. +{mapping, "enable_acl_cache", "emqx.enable_acl_cache", [ + {default, on}, + {datatype, flag} +]}. + %% @doc ACL cache time-to-live. {mapping, "acl_cache_ttl", "emqx.acl_cache_ttl", [ {default, "1m"}, @@ -580,9 +586,14 @@ end}. %% @doc ACL cache size. {mapping, "acl_cache_max_size", "emqx.acl_cache_max_size", [ {default, 32}, - {datatype, integer} + {datatype, integer}, + {validators, ["range:gt_0"]} ]}. +{validator, "range:gt_0", "must greater than 0", + fun(X) -> X > 0 end +}. + %%-------------------------------------------------------------------- %% MQTT Protocol %%-------------------------------------------------------------------- diff --git a/src/emqx.app.src b/src/emqx.app.src index c4a8b5c2e..39d876797 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -3,8 +3,8 @@ {vsn,"3.0"}, {modules,[]}, {registered,[emqx_sup]}, - {applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,cowboy, - minirest]}, + {applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,cowboy + ]}, {env,[]}, {mod,{emqx_app,[]}}, {maintainers,["Feng Lee "]}, diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 1c5d04b4e..56536501f 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -81,15 +81,15 @@ authenticate(Client, Password, [{Mod, State, _Seq} | Mods]) -> %% @doc Check ACL -spec(check_acl(client(), pubsub(), topic()) -> allow | deny). check_acl(Client, PubSub, Topic) when ?PS(PubSub) -> - CacheEnabled = (emqx_acl_cache:get_cache_max_size() =/= 0), + CacheEnabled = emqx_acl_cache:is_enabled(), check_acl(Client, PubSub, Topic, lookup_mods(acl), CacheEnabled). check_acl(Client, PubSub, Topic, AclMods, false) -> - check_acl_from_plugins(Client, PubSub, Topic, AclMods); + do_check_acl(Client, PubSub, Topic, AclMods); check_acl(Client, PubSub, Topic, AclMods, true) -> case emqx_acl_cache:get_acl_cache(PubSub, Topic) of not_found -> - AclResult = check_acl_from_plugins(Client, PubSub, Topic, AclMods), + AclResult = do_check_acl(Client, PubSub, Topic, AclMods), emqx_acl_cache:put_acl_cache(PubSub, Topic, AclResult), AclResult; AclResult -> @@ -189,13 +189,13 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -check_acl_from_plugins(#client{zone = Zone}, _PubSub, _Topic, []) -> +do_check_acl(#client{zone = Zone}, _PubSub, _Topic, []) -> emqx_zone:get_env(Zone, acl_nomatch, deny); -check_acl_from_plugins(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) -> +do_check_acl(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) -> case Mod:check_acl({Client, PubSub, Topic}, State) of allow -> allow; deny -> deny; - ignore -> check_acl_from_plugins(Client, PubSub, Topic, AclMods) + ignore -> do_check_acl(Client, PubSub, Topic, AclMods) end. %%-------------------------------------------------------------------- diff --git a/src/emqx_acl_cache.erl b/src/emqx_acl_cache.erl index 7db23fb1e..65e1e3305 100644 --- a/src/emqx_acl_cache.erl +++ b/src/emqx_acl_cache.erl @@ -13,6 +13,7 @@ , get_oldest_key/0 , cache_k/2 , cache_v/1 + , is_enabled/0 ]). -type(acl_result() :: allow | deny). @@ -21,6 +22,10 @@ cache_k(PubSub, Topic)-> {PubSub, Topic}. cache_v(AclResult)-> {AclResult, time_now()}. +-spec(is_enabled() -> boolean()). +is_enabled() -> + application:get_env(emqx, enable_acl_cache, true). + %% We'll cleanup the cache before repalcing an expired acl. -spec(get_acl_cache(PubSub :: publish | subscribe, Topic :: topic()) -> (acl_result() | not_found)). @@ -90,7 +95,7 @@ get_newest_key() -> keys_queue_pick(queue_rear()). get_cache_max_size() -> - application:get_env(emqx, acl_cache_max_size, 0). + application:get_env(emqx, acl_cache_max_size, 32). get_cache_size() -> case erlang:get(acl_cache_size) of diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl index cf859d7c1..f88420e56 100644 --- a/test/emqx_access_SUITE.erl +++ b/test/emqx_access_SUITE.erl @@ -55,9 +55,10 @@ groups() -> put_get_del_cache, cache_update, cache_expiry, - cache_full_replacement, + cache_replacement, cache_cleanup, - cache_full_cleanup + cache_auto_emtpy, + cache_auto_cleanup ]}, {access_rule, [], [compile_rule, @@ -73,9 +74,10 @@ init_per_group(_Group, Config) -> prepare_config(Group = access_control) -> set_acl_config_file(Group), - application:set_env(emqx, acl_cache_max_size, 0); + application:set_env(emqx, enable_acl_cache, false); prepare_config(Group = access_control_cache_mode) -> set_acl_config_file(Group), + application:set_env(emqx, enable_acl_cache, true), application:set_env(emqx, acl_cache_max_size, 100). set_acl_config_file(_Group) -> @@ -162,12 +164,12 @@ acl_cache_basic(_) -> ok. acl_cache_expiry(_) -> - application:set_env(emqx, acl_cache_ttl, 1000), + application:set_env(emqx, acl_cache_ttl, 100), SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), - ct:sleep(1100), + ct:sleep(150), not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), ok. @@ -186,7 +188,7 @@ acl_cache_full(_) -> acl_cache_cleanup(_) -> %% The acl cache will try to evict memory, if the size is full and the newest %% cache entry is expired - application:set_env(emqx, acl_cache_ttl, 1000), + application:set_env(emqx, acl_cache_ttl, 100), application:set_env(emqx, acl_cache_max_size, 2), SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, @@ -196,9 +198,10 @@ acl_cache_cleanup(_) -> allow = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), - ct:sleep(1100), + ct:sleep(150), %% now the cache is full and the newest one - "clients/client1" - %% should be expired, so we'll try to cleanup before putting the next cache entry + %% should be expired, so we'll empty the cache before putting + %% the next cache entry deny = ?AC:check_acl(SelfUser, subscribe, <<"#">>), not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), @@ -222,18 +225,18 @@ put_get_del_cache(_) -> ?assertEqual(?CACHE:cache_k(subscribe, <<"b">>), ?CACHE:get_newest_key()). cache_expiry(_) -> - application:set_env(emqx, acl_cache_ttl, 1000), + application:set_env(emqx, acl_cache_ttl, 100), application:set_env(emqx, acl_cache_max_size, 30), ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), allow = ?CACHE:get_acl_cache(subscribe, <<"a">>), - ct:sleep(1100), + ct:sleep(150), not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>), ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, deny), deny = ?CACHE:get_acl_cache(subscribe, <<"a">>), - ct:sleep(1100), + ct:sleep(150), not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>). cache_update(_) -> @@ -249,12 +252,13 @@ cache_update(_) -> %% update the 2nd one ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), - %ct:pal("dump acl cache: ~p~n", [?CACHE:dump_acl_cache()]), + ct:pal("dump acl cache: ~p~n", [?CACHE:dump_acl_cache()]), 3 = ?CACHE:get_cache_size(), - ?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_newest_key()). + ?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_newest_key()), + ?assertEqual(?CACHE:cache_k(subscribe, <<"a">>), ?CACHE:get_oldest_key()). -cache_full_replacement(_) -> +cache_replacement(_) -> application:set_env(emqx, acl_cache_ttl, 300000), application:set_env(emqx, acl_cache_max_size, 3), ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), @@ -269,40 +273,64 @@ cache_full_replacement(_) -> ok = ?CACHE:put_acl_cache(publish, <<"d">>, deny), 3 = ?CACHE:get_cache_size(), ?assertEqual(?CACHE:cache_k(publish, <<"d">>), ?CACHE:get_newest_key()), + ?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_oldest_key()), ok = ?CACHE:put_acl_cache(publish, <<"e">>, deny), 3 = ?CACHE:get_cache_size(), ?assertEqual(?CACHE:cache_k(publish, <<"e">>), ?CACHE:get_newest_key()), + ?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_oldest_key()), not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>), not_found = ?CACHE:get_acl_cache(publish, <<"b">>), allow = ?CACHE:get_acl_cache(publish, <<"c">>). cache_cleanup(_) -> - application:set_env(emqx, acl_cache_ttl, 1000), + application:set_env(emqx, acl_cache_ttl, 100), application:set_env(emqx, acl_cache_max_size, 30), ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), + ct:sleep(150), ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), 3 = ?CACHE:get_cache_size(), - ct:sleep(1100), ?CACHE:cleanup_acl_cache(), - 0 = ?CACHE:get_cache_size(). + ?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_oldest_key()), + 1 = ?CACHE:get_cache_size(). -cache_full_cleanup(_) -> - application:set_env(emqx, acl_cache_ttl, 1000), +cache_auto_emtpy(_) -> + %% verify cache is emptied when cache full and even the newest + %% one is expired. + application:set_env(emqx, acl_cache_ttl, 100), application:set_env(emqx, acl_cache_max_size, 3), ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), 3 = ?CACHE:get_cache_size(), - ct:sleep(1100), - %% verify auto cleanup upon cache full + ct:sleep(150), ok = ?CACHE:put_acl_cache(subscribe, <<"d">>, deny), 1 = ?CACHE:get_cache_size(). +cache_auto_cleanup(_) -> + %% verify we'll cleanup expired entries when we got a exipired acl + %% from cache. + application:set_env(emqx, acl_cache_ttl, 100), + application:set_env(emqx, acl_cache_max_size, 30), + ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), + ct:sleep(150), + ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"d">>, deny), + 4 = ?CACHE:get_cache_size(), + + %% "a" and "b" expires, while "c" and "d" not + not_found = ?CACHE:get_acl_cache(publish, <<"b">>), + 2 = ?CACHE:get_cache_size(), + + ct:sleep(150), %% now "c" and "d" expires + not_found = ?CACHE:get_acl_cache(publish, <<"c">>), + 0 = ?CACHE:get_cache_size(). + %%-------------------------------------------------------------------- %% emqx_access_rule %%--------------------------------------------------------------------