From 737fe193317e1ed05f7f3cff6ab2b3a6c0c82312 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Tue, 14 Aug 2018 18:02:27 +0800 Subject: [PATCH 1/5] update acl test cases --- src/emqx.app.src | 3 +- src/emqx_acl_internal.erl | 5 ++-- test/emqx_SUITE.erl | 6 ++-- test/emqx_access_SUITE.erl | 22 ++++++--------- test/emqx_broker_SUITE.erl | 2 +- test/emqx_mqueue_SUITE.erl | 58 +++++++++++++++++++------------------- 6 files changed, 47 insertions(+), 49 deletions(-) diff --git a/src/emqx.app.src b/src/emqx.app.src index d963e3662..c4a8b5c2e 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -3,7 +3,8 @@ {vsn,"3.0"}, {modules,[]}, {registered,[emqx_sup]}, - {applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,cowboy]}, + {applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,cowboy, + minirest]}, {env,[]}, {mod,{emqx_app,[]}}, {maintainers,["Feng Lee "]}, diff --git a/src/emqx_acl_internal.erl b/src/emqx_acl_internal.erl index 65a3199ae..23042dcbe 100644 --- a/src/emqx_acl_internal.erl +++ b/src/emqx_acl_internal.erl @@ -109,11 +109,12 @@ reload_acl(#state{config = undefined}) -> reload_acl(State) -> case catch load_rules_from_file(State) of {'EXIT', Error} -> {error, Error}; - true -> io:format("~s~n", ["reload acl_internal successfully"]), ok + #state{config=File} -> + io:format("reload acl_internal successfully: ~p~n", [File]), + ok end. %% @doc ACL Module Description -spec(description() -> string()). description() -> "Internal ACL with etc/acl.conf". - diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index e3e0bbb38..f2f00e55e 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -19,7 +19,7 @@ -compile(export_all). -compile(nowarn_export_all). --include_lib("emqttc/include/emqttc_packet.hrl"). +-include("emqx_mqtt.hrl"). -define(APP, emqx). @@ -79,7 +79,7 @@ mqtt_connect_with_tcp(_) -> Packet = raw_send_serialise(?CLIENT), gen_tcp:send(Sock, Packet), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data), + {ok, ?CONNACK_PACKET(0), _} = raw_recv_pase(Data), gen_tcp:close(Sock). mqtt_connect_with_ssl_oneway(_) -> @@ -133,7 +133,7 @@ mqtt_connect_with_ws(_Config) -> Packet = raw_send_serialise(?CLIENT), ok = rfc6455_client:send_binary(WS, Packet), {binary, P} = rfc6455_client:recv(WS), - {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(P), + {ok, ?CONNACK_PACKET(0), _} = raw_recv_pase(P), {close, _} = rfc6455_client:close(WS), ok. diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl index 270d55f57..f206cca05 100644 --- a/test/emqx_access_SUITE.erl +++ b/test/emqx_access_SUITE.erl @@ -59,11 +59,8 @@ prepare_config() -> {deny, all, subscribe, ["$SYS/#", "#"]}, {deny, all}], write_config("access_SUITE_acl.conf", Rules), - Config = [{auth, anonymous, []}, - {acl, internal, [{config, "access_SUITE_acl.conf"}, - {nomatch, allow}]}], - write_config("access_SUITE_emqx.conf", Config), - application:set_env(emqx, conf, "access_SUITE_emqx.conf"). + application:set_env(emqx, acl_file, "access_SUITE_acl.conf"), + application:set_env(emqx, acl_cache, false). write_config(Filename, Terms) -> file:write_file(Filename, [io_lib:format("~tp.~n", [Term]) || Term <- Terms]). @@ -119,14 +116,14 @@ unregister_mod(_) -> [] = ?AC:lookup_mods(auth). check_acl(_) -> - User1 = #client{client_id = <<"client1">>, username = <<"testuser">>}, - User2 = #client{client_id = <<"client2">>, username = <<"xyz">>}, + User1 = #client{id = <<"client1">>, username = <<"testuser">>}, + User2 = #client{id = <<"client2">>, username = <<"xyz">>}, allow = ?AC:check_acl(User1, subscribe, <<"users/testuser/1">>), allow = ?AC:check_acl(User1, subscribe, <<"clients/client1">>), - allow = ?AC:check_acl(User1, subscribe, <<"clients/client1/x/y">>), + deny = ?AC:check_acl(User1, subscribe, <<"clients/client1/x/y">>), allow = ?AC:check_acl(User1, publish, <<"users/testuser/1">>), allow = ?AC:check_acl(User1, subscribe, <<"a/b/c">>), - allow = ?AC:check_acl(User2, subscribe, <<"a/b/c">>). + deny = ?AC:check_acl(User2, subscribe, <<"a/b/c">>). %%-------------------------------------------------------------------- %% emqx_access_rule @@ -159,9 +156,9 @@ compile_rule(_) -> {deny, all} = compile({deny, all}). match_rule(_) -> - User = #client{peername = {{127,0,0,1}, 2948}, client_id = <<"testClient">>, username = <<"TestUser">>}, - User2 = #client{peername = {{192,168,0,10}, 3028}, client_id = <<"testClient">>, username = <<"TestUser">>}, - + User = #client{peername = {{127,0,0,1}, 2948}, id = <<"testClient">>, username = <<"TestUser">>}, + User2 = #client{peername = {{192,168,0,10}, 3028}, id = <<"testClient">>, username = <<"TestUser">>}, + {matched, allow} = match(User, <<"Test/Topic">>, {allow, all}), {matched, deny} = match(User, <<"Test/Topic">>, {deny, all}), {matched, allow} = match(User, <<"Test/Topic">>, compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]})), @@ -179,4 +176,3 @@ match_rule(_) -> {matched, allow} = match(User, <<"Topic">>, AndRule), OrRule = compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, ["Topic"]}), {matched, allow} = match(User, <<"Topic">>, OrRule). - diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index e77d689d4..af5c64949 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -149,7 +149,7 @@ start_session(_) -> {ok, ClientPid} = emqx_mock_client:start_link(<<"clientId">>), {ok, SessPid} = emqx_mock_client:start_session(ClientPid), Message = emqx_message:make(<<"clientId">>, 2, <<"topic">>, <<"hello">>), - Message1 = Message#mqtt_message{packet_id = 1}, + Message1 = Message#message{id = 1}, emqx_session:publish(SessPid, Message1), emqx_session:pubrel(SessPid, 1), emqx_session:subscribe(SessPid, [{<<"topic/session">>, [{qos, 2}]}]), diff --git a/test/emqx_mqueue_SUITE.erl b/test/emqx_mqueue_SUITE.erl index d174d980e..5ab510633 100644 --- a/test/emqx_mqueue_SUITE.erl +++ b/test/emqx_mqueue_SUITE.erl @@ -35,22 +35,22 @@ t_in(_) -> {store_qos0, true}], Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()), ?assert(?Q:is_empty(Q)), - Q1 = ?Q:in(#mqtt_message{}, Q), + Q1 = ?Q:in(#message{}, Q), ?assertEqual(1, ?Q:len(Q1)), - Q2 = ?Q:in(#mqtt_message{qos = 1}, Q1), + Q2 = ?Q:in(#message{qos = 1}, Q1), ?assertEqual(2, ?Q:len(Q2)), - Q3 = ?Q:in(#mqtt_message{qos = 2}, Q2), - Q4 = ?Q:in(#mqtt_message{}, Q3), - Q5 = ?Q:in(#mqtt_message{}, Q4), + Q3 = ?Q:in(#message{qos = 2}, Q2), + Q4 = ?Q:in(#message{}, Q3), + Q5 = ?Q:in(#message{}, Q4), ?assertEqual(5, ?Q:len(Q5)). t_in_qos0(_) -> Opts = [{max_length, 5}, {store_qos0, false}], Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()), - Q1 = ?Q:in(#mqtt_message{}, Q), + Q1 = ?Q:in(#message{}, Q), ?assert(?Q:is_empty(Q1)), - Q2 = ?Q:in(#mqtt_message{qos = 0}, Q1), + Q2 = ?Q:in(#message{qos = 0}, Q1), ?assert(?Q:is_empty(Q2)). t_out(_) -> @@ -58,10 +58,10 @@ t_out(_) -> {store_qos0, true}], Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()), {empty, Q} = ?Q:out(Q), - Q1 = ?Q:in(#mqtt_message{}, Q), + Q1 = ?Q:in(#message{}, Q), {Value, Q2} = ?Q:out(Q1), ?assertEqual(0, ?Q:len(Q2)), - ?assertEqual({value, #mqtt_message{}}, Value). + ?assertEqual({value, #message{}}, Value). t_simple_mqueue(_) -> Opts = [{type, simple}, @@ -74,13 +74,13 @@ t_simple_mqueue(_) -> ?assertEqual(3, ?Q:max_len(Q)), ?assertEqual(<<"simple_queue">>, ?Q:name(Q)), ?assert(?Q:is_empty(Q)), - Q1 = ?Q:in(#mqtt_message{qos = 1, payload = <<"1">>}, Q), - Q2 = ?Q:in(#mqtt_message{qos = 1, payload = <<"2">>}, Q1), - Q3 = ?Q:in(#mqtt_message{qos = 1, payload = <<"3">>}, Q2), - Q4 = ?Q:in(#mqtt_message{qos = 1, payload = <<"4">>}, Q3), + Q1 = ?Q:in(#message{qos = 1, payload = <<"1">>}, Q), + Q2 = ?Q:in(#message{qos = 1, payload = <<"2">>}, Q1), + Q3 = ?Q:in(#message{qos = 1, payload = <<"3">>}, Q2), + Q4 = ?Q:in(#message{qos = 1, payload = <<"4">>}, Q3), ?assertEqual(3, ?Q:len(Q4)), {{value, Msg}, Q5} = ?Q:out(Q4), - ?assertEqual(<<"2">>, Msg#mqtt_message.payload), + ?assertEqual(<<"2">>, Msg#message.payload), ?assertEqual([{len, 2}, {max_len, 3}, {dropped, 1}], ?Q:stats(Q5)). t_infinity_simple_mqueue(_) -> @@ -93,12 +93,12 @@ t_infinity_simple_mqueue(_) -> ?assert(?Q:is_empty(Q)), ?assertEqual(0, ?Q:max_len(Q)), Qx = lists:foldl(fun(I, AccQ) -> - ?Q:in(#mqtt_message{qos = 1, payload = iolist_to_binary([I])}, AccQ) + ?Q:in(#message{qos = 1, payload = iolist_to_binary([I])}, AccQ) end, Q, lists:seq(1, 255)), ?assertEqual(255, ?Q:len(Qx)), ?assertEqual([{len, 255}, {max_len, 0}, {dropped, 0}], ?Q:stats(Qx)), {{value, V}, _Qy} = ?Q:out(Qx), - ?assertEqual(<<1>>, V#mqtt_message.payload). + ?assertEqual(<<1>>, V#message.payload). t_priority_mqueue(_) -> Opts = [{type, priority}, @@ -113,18 +113,18 @@ t_priority_mqueue(_) -> ?assertEqual(<<"priority_queue">>, ?Q:name(Q)), ?assert(?Q:is_empty(Q)), - Q1 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q), - Q2 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t">>}, Q1), - Q3 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t2">>}, Q2), + Q1 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q), + Q2 = ?Q:in(#message{qos = 1, topic = <<"t">>}, Q1), + Q3 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q2), ?assertEqual(3, ?Q:len(Q3)), - Q4 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q3), + Q4 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q3), ?assertEqual(4, ?Q:len(Q4)), - Q5 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q4), + Q5 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q4), ?assertEqual(5, ?Q:len(Q5)), - Q6 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q5), + Q6 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q5), ?assertEqual(5, ?Q:len(Q6)), {{value, Msg}, _Q7} = ?Q:out(Q6), - ?assertEqual(<<"t">>, Msg#mqtt_message.topic). + ?assertEqual(<<"t">>, Msg#message.topic). t_infinity_priority_mqueue(_) -> Opts = [{type, priority}, @@ -135,8 +135,8 @@ t_infinity_priority_mqueue(_) -> ?assertEqual(0, ?Q:max_len(Q)), Qx = lists:foldl(fun(I, AccQ) -> AccQ1 = - ?Q:in(#mqtt_message{topic = <<"t1">>, qos = 1, payload = iolist_to_binary([I])}, AccQ), - ?Q:in(#mqtt_message{topic = <<"t">>, qos = 1, payload = iolist_to_binary([I])}, AccQ1) + ?Q:in(#message{topic = <<"t1">>, qos = 1, payload = iolist_to_binary([I])}, AccQ), + ?Q:in(#message{topic = <<"t">>, qos = 1, payload = iolist_to_binary([I])}, AccQ1) end, Q, lists:seq(1, 255)), ?assertEqual(510, ?Q:len(Qx)), ?assertEqual([{len, 510}, {max_len, 0}, {dropped, 0}], ?Q:stats(Qx)). @@ -149,10 +149,10 @@ t_priority_mqueue2(_) -> {store_qos0, false}], Q = ?Q:new("priority_queue2_test", Opts, alarm_fun()), ?assertEqual(2, ?Q:max_len(Q)), - Q1 = ?Q:in(#mqtt_message{topic = <<"x">>, qos = 1, payload = <<1>>}, Q), - Q2 = ?Q:in(#mqtt_message{topic = <<"x">>, qos = 1, payload = <<2>>}, Q1), - Q3 = ?Q:in(#mqtt_message{topic = <<"y">>, qos = 1, payload = <<3>>}, Q2), - Q4 = ?Q:in(#mqtt_message{topic = <<"y">>, qos = 1, payload = <<4>>}, Q3), + Q1 = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<1>>}, Q), + Q2 = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<2>>}, Q1), + Q3 = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<3>>}, Q2), + Q4 = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<4>>}, Q3), ?assertEqual(4, ?Q:len(Q4)), {{value, _Val}, Q5} = ?Q:out(Q4), ?assertEqual(3, ?Q:len(Q5)). From a90403197905a3eba2c7584efdc68bc73c555bad Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Sat, 18 Aug 2018 19:47:26 +0800 Subject: [PATCH 2/5] acl cache using proc_dict --- etc/emqx.conf | 18 ++- priv/emqx.schema | 21 +-- src/emqx_access_control.erl | 232 +++++++++++++++++++++++++++++++--- src/emqx_protocol.erl | 1 - test/emqx_access_SUITE.erl | 246 +++++++++++++++++++++++++++++++----- 5 files changed, 445 insertions(+), 73 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 4703f5083..33c06b5d3 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -434,16 +434,21 @@ acl_nomatch = allow ## Value: File Name acl_file = {{ platform_etc_dir }}/acl.conf -## Whether to enable ACL cache for publish. +## The ACL cache size +## The maximum count of ACL entries allowed for a client. +## Value 0 disables ACL cache ## -## Value: on | off -enable_acl_cache = on +## Value: Integer +## Default: 100 +acl_cache_size = 100 -## The ACL cache age. +## The ACL cache time-to-live. +## The time after which an ACL cache entry will be invalid ## ## Value: Duration -## Default: 5 minute -acl_cache_age = 5m +## Default: 1 minute +acl_cache_ttl = 1m + ##-------------------------------------------------------------------- ## MQTT Protocol @@ -1875,4 +1880,3 @@ sysmon.busy_port = false ## ## Value: true | false sysmon.busy_dist_port = true - diff --git a/priv/emqx.schema b/priv/emqx.schema index a0d2bc0e2..f26c012ce 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -571,23 +571,17 @@ end}. hidden ]}. -%% @doc Enable ACL cache for publish. -{mapping, "enable_acl_cache", "emqx.enable_acl_cache", [ - {default, on}, - {datatype, flag} -]}. - -%% @doc ACL cache age. -{mapping, "acl_cache_age", "emqx.acl_cache_age", [ - {default, "5m"}, +%% @doc ACL cache time-to-live. +{mapping, "acl_cache_ttl", "emqx.acl_cache_ttl", [ + {default, "1m"}, {datatype, {duration, ms}} ]}. %% @doc ACL cache size. -%% {mapping, "acl_cache_size", "emqx.acl_cache_size", [ -%% {default, 0}, -%% {datatype, integer} -%% ]}. +{mapping, "acl_cache_size", "emqx.acl_cache_size", [ + {default, 100}, + {datatype, integer} +]}. %%-------------------------------------------------------------------- %% MQTT Protocol @@ -1703,4 +1697,3 @@ end}. {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)}, {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}] end}. - diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index f43309088..5b52312ef 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -21,10 +21,18 @@ -export([start_link/0]). -export([authenticate/2]). -export([check_acl/3, reload_acl/0, lookup_mods/1]). --export([clean_acl_cache/1, clean_acl_cache/2]). -export([register_mod/3, register_mod/4, unregister_mod/2]). -export([stop/0]). +-export([get_acl_cache/2, + put_acl_cache/3, + delete_acl_cache/2, + cleanup_acl_cache/0, + dump_acl_cache/0, + get_cache_size/0, + get_newest_key/0 + ]). + %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -33,6 +41,7 @@ -define(SERVER, ?MODULE). -type(password() :: undefined | binary()). +-type(acl_result() :: allow | deny). -record(state, {}). @@ -82,16 +91,19 @@ authenticate(Client, Password, [{Mod, State, _Seq} | Mods]) -> %% @doc Check ACL -spec(check_acl(client(), pubsub(), topic()) -> allow | deny). check_acl(Client, PubSub, Topic) when ?PS(PubSub) -> - check_acl(Client, PubSub, Topic, lookup_mods(acl)). + CacheEnabled = (get_cache_max_size() =/= 0), + check_acl(Client, PubSub, Topic, lookup_mods(acl), CacheEnabled). -check_acl(#client{zone = Zone}, _PubSub, _Topic, []) -> - emqx_zone:get_env(Zone, acl_nomatch, deny); - -check_acl(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) -> - case Mod:check_acl({Client, PubSub, Topic}, State) of - allow -> allow; - deny -> deny; - ignore -> check_acl(Client, PubSub, Topic, AclMods) +check_acl(Client, PubSub, Topic, AclMods, false) -> + check_acl_from_plugins(Client, PubSub, Topic, AclMods); +check_acl(Client, PubSub, Topic, AclMods, true) -> + case get_acl_cache(PubSub, Topic) of + not_found -> + AclResult = check_acl_from_plugins(Client, PubSub, Topic, AclMods), + put_acl_cache(PubSub, Topic, AclResult), + AclResult; + AclResult -> + AclResult end. %% @doc Reload ACL Rules. @@ -130,12 +142,6 @@ tab_key(acl) -> acl_modules. stop() -> gen_server:stop(?MODULE, normal, infinity). -%%TODO: Support ACL cache... -clean_acl_cache(_ClientId) -> - ok. -clean_acl_cache(_ClientId, _Topic) -> - ok. - %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -193,6 +199,15 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +check_acl_from_plugins(#client{zone = Zone}, _PubSub, _Topic, []) -> + emqx_zone:get_env(Zone, acl_nomatch, deny); +check_acl_from_plugins(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) -> + case Mod:check_acl({Client, PubSub, Topic}, State) of + allow -> allow; + deny -> deny; + ignore -> check_acl_from_plugins(Client, PubSub, Topic, AclMods) + end. + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- @@ -202,3 +217,188 @@ if_existed(false, Fun) -> if_existed(_Mod, _Fun) -> {error, already_existed}. +%%-------------------------------------------------------------------- +%% ACL cache +%%-------------------------------------------------------------------- + +-spec(get_acl_cache(PubSub :: publish | subscribe, Topic :: topic()) + -> (acl_result() | not_found)). +get_acl_cache(PubSub, Topic) -> + case erlang:get({PubSub, Topic}) of + undefined -> not_found; + {AclResult, CachedAt, _NextK, _PrevK} -> + if_acl_cache_expired(CachedAt, + fun(false) -> + AclResult; + (true) -> + %% this expired entry will get updated in + %% put_acl_cache/3 + not_found + end) + end. + +-spec(put_acl_cache(PubSub :: publish | subscribe, + Topic :: topic(), AclResult :: acl_result()) -> ok). +put_acl_cache(PubSub, Topic, AclResult) -> + MaxSize = get_cache_max_size(), true = (MaxSize =/= 0), + Size = get_cache_size(), + if + Size =:= 0 -> + create_first(PubSub, Topic, AclResult); + Size < MaxSize -> + append(PubSub, Topic, AclResult); + Size =:= MaxSize -> + %% when the cache get full, and also the latest one + %% is expired, we'll perform a cleanup. + NewestK = get_newest_key(), + {_AclResult, CachedAt, OldestK, _PrevK} = erlang:get(NewestK), + if_acl_cache_expired(CachedAt, + fun(true) -> + % try to cleanup first + cleanup_acl_cache(OldestK), + add_cache(PubSub, Topic, AclResult); + (false) -> + % cache full, perform cache replacement + delete_acl_cache(OldestK), + append(PubSub, Topic, AclResult) + end) + end. + +-spec(delete_acl_cache(PubSub :: publish | subscribe, Topic :: topic()) -> ok). +delete_acl_cache(PubSub, Topic) -> + delete_acl_cache(_K = {PubSub, Topic}). +delete_acl_cache(K) -> + case erlang:get(K) of + undefined -> ok; + {_AclResult, _CachedAt, NextK, PrevK} when NextK =:= PrevK -> + %% there is only one entry in the cache + erlang:erase(K), + decr_cache_size(), + set_newest_key(undefined); + {_AclResult, _CachedAt, NextK, PrevK} -> + update_next(PrevK, NextK), + update_prev(NextK, PrevK), + erlang:erase(K), + + decr_cache_size(), + NewestK = get_newest_key(), + if + K =:= NewestK -> set_newest_key(NextK); + true -> ok + end + end. + +%% evict all the exipired cache entries +-spec(cleanup_acl_cache() -> ok). +cleanup_acl_cache() -> + case get_newest_key() of + undefined -> ok; + NewestK -> + {_AclResult, _CachedAt, OldestK, _PrevK} = erlang:get(NewestK), + cleanup_acl_cache(OldestK) + end. +cleanup_acl_cache(FromK) -> + case erlang:get(FromK) of + undefined -> ok; + {_AclResult, CachedAt, NextK, _PrevK} -> + if_acl_cache_expired(CachedAt, + fun(false) -> + ok; + (true) -> + delete_acl_cache(FromK), + cleanup_acl_cache(NextK) + end) + end. + +%% for test only +dump_acl_cache() -> + [R || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish + orelse SubPub =:= subscribe]. + +add_cache(PubSub, Topic, AclResult) -> + Size = get_cache_size(), + MaxSize = get_cache_max_size(), true = (MaxSize =/= 0), + if + Size =:= 0 -> + create_first(PubSub, Topic, AclResult); + Size =:= MaxSize -> + OldestK = get_next_key(get_newest_key()), + delete_acl_cache(OldestK), + case get_cache_size() =:= 0 of + true -> create_first(PubSub, Topic, AclResult); + false -> append(PubSub, Topic, AclResult) + end; + true -> + append(PubSub, Topic, AclResult) + end. + +create_first(PubSub, Topic, AclResult) -> + K = cache_k(PubSub, Topic), + V = cache_v(AclResult, _NextK = K, _PrevK = K), + erlang:put(K, V), + set_cache_size(1), + set_newest_key(K). + +append(PubSub, Topic, AclResult) -> + %% try to update the existing one: + %% - we delete it and then append it at the tail + delete_acl_cache(PubSub, Topic), + + case get_cache_size() =:= 0 of + true -> create_first(PubSub, Topic, AclResult); + false -> + NewestK = get_newest_key(), + OldestK = get_next_key(NewestK), + K = cache_k(PubSub, Topic), + V = cache_v(AclResult, OldestK, NewestK), + erlang:put(K, V), + + update_next(NewestK, K), + update_prev(OldestK, K), + incr_cache_size(), + set_newest_key(K) + end. + +get_next_key(K) -> + erlang:element(3, erlang:get(K)). +update_next(K, NextK) -> + NoNext = erlang:delete_element(3, erlang:get(K)), + erlang:put(K, erlang:insert_element(3, NoNext, NextK)). +update_prev(K, PrevK) -> + NoPrev = erlang:delete_element(4, erlang:get(K)), + erlang:put(K, erlang:insert_element(4, NoPrev, PrevK)). + +cache_k(PubSub, Topic)-> {PubSub, Topic}. +cache_v(AclResult, NextK, PrevK)-> {AclResult, time_now(), NextK, PrevK}. + +get_cache_max_size() -> + application:get_env(emqx, acl_cache_size, 100). + +get_cache_size() -> + case erlang:get(acl_cache_size) of + undefined -> 0; + Size -> Size + end. +incr_cache_size() -> + erlang:put(acl_cache_size, get_cache_size() + 1), ok. +decr_cache_size() -> + erlang:put(acl_cache_size, get_cache_size() - 1), ok. +set_cache_size(N) -> + erlang:put(acl_cache_size, N), ok. + +get_newest_key() -> + erlang:get(acl_cache_newest_key). + +set_newest_key(Key) -> + erlang:put(acl_cache_newest_key, Key), ok. + +time_now() -> erlang:system_time(millisecond). + +if_acl_cache_expired(CachedAt, Fun) -> + TTL = application:get_env(emqx, acl_cache_ttl, 60000), + Now = time_now(), + if (CachedAt + TTL) =< Now -> + Fun(true); + true -> + Fun(false) + end. diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 3faa7781a..941baaa7d 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -662,4 +662,3 @@ feed_var({<<"%u">>, Username}, MountPoint) -> sp(true) -> 1; sp(false) -> 0. - diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl index f206cca05..f2ca9fad6 100644 --- a/test/emqx_access_SUITE.erl +++ b/test/emqx_access_SUITE.erl @@ -29,27 +29,54 @@ all() -> [{group, access_control}, - {group, access_rule}]. + {group, acl_cache}, + {group, access_control_cache_mode}, + {group, access_rule} + ]. groups() -> [{access_control, [sequence], - [reload_acl, - register_mod, - unregister_mod, - check_acl]}, + [reload_acl, + register_mod, + unregister_mod, + check_acl_1, + check_acl_2 + ]}, + {access_control_cache_mode, [], + [ + acl_cache_basic, + acl_cache_expiry, + acl_cache_cleanup, + acl_cache_full + ]}, + {acl_cache, [], [ + put_get_del_cache, + cache_update, + cache_expiry, + cache_full_replacement, + cache_cleanup, + cache_full_cleanup + ]}, {access_rule, [], - [compile_rule, - match_rule]}]. + [compile_rule, + match_rule]}]. -init_per_group(access_control, Config) -> +init_per_group(Group, Config) when Group =:= access_control; + Group =:= access_control_cache_mode -> + prepare_config(Group), application:load(emqx), - prepare_config(), Config; - init_per_group(_Group, Config) -> Config. -prepare_config() -> +prepare_config(Group = access_control) -> + set_acl_config_file(Group), + application:set_env(emqx, acl_cache_size, 0); +prepare_config(Group = access_control_cache_mode) -> + set_acl_config_file(Group), + application:set_env(emqx, acl_cache_size, 100). + +set_acl_config_file(_Group) -> Rules = [{allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]}, {allow, {user, "testuser"}, subscribe, ["a/b/c", "d/e/f/#"]}, {allow, {user, "admin"}, pubsub, ["a/b/c", "d/e/f/#"]}, @@ -59,8 +86,8 @@ prepare_config() -> {deny, all, subscribe, ["$SYS/#", "#"]}, {deny, all}], write_config("access_SUITE_acl.conf", Rules), - application:set_env(emqx, acl_file, "access_SUITE_acl.conf"), - application:set_env(emqx, acl_cache, false). + application:set_env(emqx, acl_file, "access_SUITE_acl.conf"). + write_config(Filename, Terms) -> file:write_file(Filename, [io_lib:format("~tp.~n", [Term]) || Term <- Terms]). @@ -68,24 +95,18 @@ write_config(Filename, Terms) -> end_per_group(_Group, Config) -> Config. -init_per_testcase(TestCase, Config) when TestCase =:= reload_acl; - TestCase =:= register_mod; - TestCase =:= unregister_mod; - TestCase =:= check_acl -> - {ok, _Pid} = ?AC:start_link(), Config; - init_per_testcase(_TestCase, Config) -> + {ok, _Pid} = ?AC:start_link(), Config. - -end_per_testcase(TestCase, _Config) when TestCase =:= reload_acl; - TestCase =:= register_mod; - TestCase =:= unregister_mod; - TestCase =:= check_acl -> - ?AC:stop(); - end_per_testcase(_TestCase, _Config) -> ok. +per_testcase_config(acl_cache_full, Config) -> + Config; +per_testcase_config(_TestCase, Config) -> + Config. + + %%-------------------------------------------------------------------- %% emqx_access_control %%-------------------------------------------------------------------- @@ -115,15 +136,170 @@ unregister_mod(_) -> timer:sleep(5), [] = ?AC:lookup_mods(auth). -check_acl(_) -> - User1 = #client{id = <<"client1">>, username = <<"testuser">>}, - User2 = #client{id = <<"client2">>, username = <<"xyz">>}, - allow = ?AC:check_acl(User1, subscribe, <<"users/testuser/1">>), - allow = ?AC:check_acl(User1, subscribe, <<"clients/client1">>), - deny = ?AC:check_acl(User1, subscribe, <<"clients/client1/x/y">>), - allow = ?AC:check_acl(User1, publish, <<"users/testuser/1">>), - allow = ?AC:check_acl(User1, subscribe, <<"a/b/c">>), - deny = ?AC:check_acl(User2, subscribe, <<"a/b/c">>). +check_acl_1(_) -> + SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, + allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), + allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), + deny = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1/x/y">>), + allow = ?AC:check_acl(SelfUser, publish, <<"users/testuser/1">>), + allow = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>). +check_acl_2(_) -> + SelfUser = #client{id = <<"client2">>, username = <<"xyz">>}, + deny = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>). + +acl_cache_basic(_) -> + SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, + not_found = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>), + not_found = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + + allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), + allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), + + allow = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>), + allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + ok. + +acl_cache_expiry(_) -> + application:set_env(emqx, acl_cache_ttl, 1000), + + SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, + allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), + allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + ct:sleep(1100), + not_found = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + ok. + +acl_cache_full(_) -> + application:set_env(emqx, acl_cache_size, 1), + + SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, + allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), + allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), + + %% the older ones (the <<"users/testuser/1">>) will be evicted first + not_found = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>), + allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + ok. + +acl_cache_cleanup(_) -> + %% The acl cache will try to evict memory, if the size is full and the newest + %% cache entry is expired + application:set_env(emqx, acl_cache_ttl, 1000), + application:set_env(emqx, acl_cache_size, 2), + + SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, + allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), + allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), + + allow = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>), + allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + + ct:sleep(1100), + %% now the cache is full and the newest one - "clients/client1" + %% should be expired, so we'll try to cleanup before putting the next cache entry + deny = ?AC:check_acl(SelfUser, subscribe, <<"#">>), + + not_found = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>), + not_found = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + deny = ?AC:get_acl_cache(subscribe, <<"#">>), + ok. + +put_get_del_cache(_) -> + application:set_env(emqx, acl_cache_ttl, 300000), + application:set_env(emqx, acl_cache_size, 30), + + not_found = ?AC:get_acl_cache(publish, <<"a">>), + ok = ?AC:put_acl_cache(publish, <<"a">>, allow), + allow = ?AC:get_acl_cache(publish, <<"a">>), + + not_found = ?AC:get_acl_cache(subscribe, <<"b">>), + ok = ?AC:put_acl_cache(subscribe, <<"b">>, deny), + deny = ?AC:get_acl_cache(subscribe, <<"b">>), + + 2 = ?AC:get_cache_size(), + {subscribe, <<"b">>} = ?AC:get_newest_key(). + +cache_expiry(_) -> + application:set_env(emqx, acl_cache_ttl, 1000), + application:set_env(emqx, acl_cache_size, 30), + ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), + allow = ?AC:get_acl_cache(subscribe, <<"a">>), + + ct:sleep(1100), + not_found = ?AC:get_acl_cache(subscribe, <<"a">>), + + ok = ?AC:put_acl_cache(subscribe, <<"a">>, deny), + deny = ?AC:get_acl_cache(subscribe, <<"a">>), + + ct:sleep(1100), + not_found = ?AC:get_acl_cache(subscribe, <<"a">>). + +cache_update(_) -> + application:set_env(emqx, acl_cache_ttl, 300000), + application:set_env(emqx, acl_cache_size, 30), + [] = ?AC:dump_acl_cache(), + + ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?AC:put_acl_cache(publish, <<"b">>, allow), + ok = ?AC:put_acl_cache(publish, <<"c">>, allow), + 3 = ?AC:get_cache_size(), + {publish, <<"c">>} = ?AC:get_newest_key(), + + %% update the 2nd one + ok = ?AC:put_acl_cache(publish, <<"b">>, allow), + %ct:pal("dump acl cache: ~p~n", [?AC:dump_acl_cache()]), + + 3 = ?AC:get_cache_size(), + {publish, <<"b">>} = ?AC:get_newest_key(). + +cache_full_replacement(_) -> + application:set_env(emqx, acl_cache_ttl, 300000), + application:set_env(emqx, acl_cache_size, 3), + ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?AC:put_acl_cache(publish, <<"b">>, allow), + ok = ?AC:put_acl_cache(publish, <<"c">>, allow), + allow = ?AC:get_acl_cache(subscribe, <<"a">>), + allow = ?AC:get_acl_cache(publish, <<"b">>), + allow = ?AC:get_acl_cache(publish, <<"c">>), + 3 = ?AC:get_cache_size(), + {publish, <<"c">>} = ?AC:get_newest_key(), + + ok = ?AC:put_acl_cache(publish, <<"d">>, deny), + 3 = ?AC:get_cache_size(), + {publish, <<"d">>} = ?AC:get_newest_key(), + + ok = ?AC:put_acl_cache(publish, <<"e">>, deny), + 3 = ?AC:get_cache_size(), + {publish, <<"e">>} = ?AC:get_newest_key(), + + not_found = ?AC:get_acl_cache(subscribe, <<"a">>), + not_found = ?AC:get_acl_cache(publish, <<"b">>), + allow = ?AC:get_acl_cache(publish, <<"c">>). + +cache_cleanup(_) -> + application:set_env(emqx, acl_cache_ttl, 1000), + application:set_env(emqx, acl_cache_size, 30), + ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?AC:put_acl_cache(publish, <<"b">>, allow), + ok = ?AC:put_acl_cache(publish, <<"c">>, allow), + 3 = ?AC:get_cache_size(), + + ct:sleep(1100), + ?AC:cleanup_acl_cache(), + 0 = ?AC:get_cache_size(). + +cache_full_cleanup(_) -> + application:set_env(emqx, acl_cache_ttl, 1000), + application:set_env(emqx, acl_cache_size, 3), + ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?AC:put_acl_cache(publish, <<"b">>, allow), + ok = ?AC:put_acl_cache(publish, <<"c">>, allow), + 3 = ?AC:get_cache_size(), + + ct:sleep(1100), + %% verify auto cleanup upon cache full + ok = ?AC:put_acl_cache(subscribe, <<"d">>, deny), + 1 = ?AC:get_cache_size(). %%-------------------------------------------------------------------- %% emqx_access_rule From 8cd20744bed0ebf4c1ed19587a7d0106e55d0164 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Sun, 26 Aug 2018 12:40:23 +0800 Subject: [PATCH 3/5] improve cache datastruct using keys-queue --- etc/emqx.conf | 5 +- priv/emqx.schema | 4 +- src/emqx_access_control.erl | 218 +++++++++++++++++------------------- test/emqx_access_SUITE.erl | 33 +++--- 4 files changed, 123 insertions(+), 137 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 33c06b5d3..62f94b174 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -436,11 +436,12 @@ acl_file = {{ platform_etc_dir }}/acl.conf ## The ACL cache size ## The maximum count of ACL entries allowed for a client. +## ## Value 0 disables ACL cache ## ## Value: Integer -## Default: 100 -acl_cache_size = 100 +## Default: 32 +acl_cache_max_size = 32 ## The ACL cache time-to-live. ## The time after which an ACL cache entry will be invalid diff --git a/priv/emqx.schema b/priv/emqx.schema index f26c012ce..f6ee7c621 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -578,8 +578,8 @@ end}. ]}. %% @doc ACL cache size. -{mapping, "acl_cache_size", "emqx.acl_cache_size", [ - {default, 100}, +{mapping, "acl_cache_max_size", "emqx.acl_cache_max_size", [ + {default, 32}, {datatype, integer} ]}. diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 5b52312ef..07999b0fb 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -26,11 +26,13 @@ -export([get_acl_cache/2, put_acl_cache/3, - delete_acl_cache/2, cleanup_acl_cache/0, dump_acl_cache/0, get_cache_size/0, - get_newest_key/0 + get_newest_key/0, + get_oldest_key/0, + cache_k/2, + cache_v/1 ]). %% gen_server callbacks @@ -221,158 +223,124 @@ if_existed(_Mod, _Fun) -> %% ACL cache %%-------------------------------------------------------------------- +%% We'll cleanup the cache before repalcing an expired acl. -spec(get_acl_cache(PubSub :: publish | subscribe, Topic :: topic()) -> (acl_result() | not_found)). get_acl_cache(PubSub, Topic) -> - case erlang:get({PubSub, Topic}) of + case erlang:get(cache_k(PubSub, Topic)) of undefined -> not_found; - {AclResult, CachedAt, _NextK, _PrevK} -> - if_acl_cache_expired(CachedAt, + {AclResult, CachedAt} -> + if_expired(CachedAt, fun(false) -> AclResult; (true) -> - %% this expired entry will get updated in - %% put_acl_cache/3 + cleanup_acl_cache(), not_found end) end. +%% If the cache get full, and also the latest one +%% is expired, then delete all the cache entries -spec(put_acl_cache(PubSub :: publish | subscribe, Topic :: topic(), AclResult :: acl_result()) -> ok). put_acl_cache(PubSub, Topic, AclResult) -> MaxSize = get_cache_max_size(), true = (MaxSize =/= 0), Size = get_cache_size(), if - Size =:= 0 -> - create_first(PubSub, Topic, AclResult); Size < MaxSize -> - append(PubSub, Topic, AclResult); + add_acl_cache(PubSub, Topic, AclResult); Size =:= MaxSize -> - %% when the cache get full, and also the latest one - %% is expired, we'll perform a cleanup. NewestK = get_newest_key(), - {_AclResult, CachedAt, OldestK, _PrevK} = erlang:get(NewestK), - if_acl_cache_expired(CachedAt, + {_AclResult, CachedAt} = erlang:get(NewestK), + if_expired(CachedAt, fun(true) -> - % try to cleanup first - cleanup_acl_cache(OldestK), - add_cache(PubSub, Topic, AclResult); + % all cache expired, cleanup first + empty_acl_cache(), + add_acl_cache(PubSub, Topic, AclResult); (false) -> % cache full, perform cache replacement - delete_acl_cache(OldestK), - append(PubSub, Topic, AclResult) + evict_acl_cache(), + add_acl_cache(PubSub, Topic, AclResult) end) end. --spec(delete_acl_cache(PubSub :: publish | subscribe, Topic :: topic()) -> ok). -delete_acl_cache(PubSub, Topic) -> - delete_acl_cache(_K = {PubSub, Topic}). -delete_acl_cache(K) -> - case erlang:get(K) of - undefined -> ok; - {_AclResult, _CachedAt, NextK, PrevK} when NextK =:= PrevK -> - %% there is only one entry in the cache - erlang:erase(K), - decr_cache_size(), - set_newest_key(undefined); - {_AclResult, _CachedAt, NextK, PrevK} -> - update_next(PrevK, NextK), - update_prev(NextK, PrevK), - erlang:erase(K), +empty_acl_cache() -> + map_acl_cache(fun({CacheK, _CacheV}) -> + erlang:erase(CacheK) + end), + set_cache_size(0), + set_keys_queue(queue:new()). - decr_cache_size(), - NewestK = get_newest_key(), - if - K =:= NewestK -> set_newest_key(NextK); - true -> ok - end +evict_acl_cache() -> + {{value, OldestK}, RemKeys} = queue:out(get_keys_queue()), + set_keys_queue(RemKeys), + erlang:erase(OldestK), + decr_cache_size(). + +add_acl_cache(PubSub, Topic, AclResult) -> + K = cache_k(PubSub, Topic), + V = cache_v(AclResult), + case get(K) of + undefined -> add_new_acl(K, V); + {_AclResult, _CachedAt} -> + update_acl(K, V) end. -%% evict all the exipired cache entries +add_new_acl(K, V) -> + erlang:put(K, V), + keys_queue_in(K), + incr_cache_size(). + +update_acl(K, V) -> + erlang:put(K, V), + keys_queue_update(K). + +%% cleanup all the exipired cache entries -spec(cleanup_acl_cache() -> ok). cleanup_acl_cache() -> - case get_newest_key() of - undefined -> ok; - NewestK -> - {_AclResult, _CachedAt, OldestK, _PrevK} = erlang:get(NewestK), - cleanup_acl_cache(OldestK) - end. -cleanup_acl_cache(FromK) -> - case erlang:get(FromK) of - undefined -> ok; - {_AclResult, CachedAt, NextK, _PrevK} -> - if_acl_cache_expired(CachedAt, - fun(false) -> - ok; + set_keys_queue( + cleanup_acl_cache(get_keys_queue())). + +cleanup_acl_cache(KeysQ) -> + case queue:out(KeysQ) of + {{value, OldestK}, RemKeys} -> + {_AclResult, CachedAt} = erlang:get(OldestK), + if_expired(CachedAt, + fun(false) -> KeysQ; (true) -> - delete_acl_cache(FromK), - cleanup_acl_cache(NextK) - end) + erlang:erase(OldestK), + decr_cache_size(), + cleanup_acl_cache(RemKeys) + end); + {empty, KeysQ} -> KeysQ + end. + +get_newest_key() -> + get_key(fun(KeysQ) -> queue:get_r(KeysQ) end). + +get_oldest_key() -> + get_key(fun(KeysQ) -> queue:get(KeysQ) end). + +get_key(Pick) -> + KeysQ = get_keys_queue(), + case queue:is_empty(KeysQ) of + true -> undefined; + false -> Pick(KeysQ) end. %% for test only dump_acl_cache() -> - [R || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish + map_acl_cache(fun(Cache) -> Cache end). +map_acl_cache(Fun) -> + [Fun(R) || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish orelse SubPub =:= subscribe]. -add_cache(PubSub, Topic, AclResult) -> - Size = get_cache_size(), - MaxSize = get_cache_max_size(), true = (MaxSize =/= 0), - if - Size =:= 0 -> - create_first(PubSub, Topic, AclResult); - Size =:= MaxSize -> - OldestK = get_next_key(get_newest_key()), - delete_acl_cache(OldestK), - case get_cache_size() =:= 0 of - true -> create_first(PubSub, Topic, AclResult); - false -> append(PubSub, Topic, AclResult) - end; - true -> - append(PubSub, Topic, AclResult) - end. - -create_first(PubSub, Topic, AclResult) -> - K = cache_k(PubSub, Topic), - V = cache_v(AclResult, _NextK = K, _PrevK = K), - erlang:put(K, V), - set_cache_size(1), - set_newest_key(K). - -append(PubSub, Topic, AclResult) -> - %% try to update the existing one: - %% - we delete it and then append it at the tail - delete_acl_cache(PubSub, Topic), - - case get_cache_size() =:= 0 of - true -> create_first(PubSub, Topic, AclResult); - false -> - NewestK = get_newest_key(), - OldestK = get_next_key(NewestK), - K = cache_k(PubSub, Topic), - V = cache_v(AclResult, OldestK, NewestK), - erlang:put(K, V), - - update_next(NewestK, K), - update_prev(OldestK, K), - incr_cache_size(), - set_newest_key(K) - end. - -get_next_key(K) -> - erlang:element(3, erlang:get(K)). -update_next(K, NextK) -> - NoNext = erlang:delete_element(3, erlang:get(K)), - erlang:put(K, erlang:insert_element(3, NoNext, NextK)). -update_prev(K, PrevK) -> - NoPrev = erlang:delete_element(4, erlang:get(K)), - erlang:put(K, erlang:insert_element(4, NoPrev, PrevK)). cache_k(PubSub, Topic)-> {PubSub, Topic}. -cache_v(AclResult, NextK, PrevK)-> {AclResult, time_now(), NextK, PrevK}. +cache_v(AclResult)-> {AclResult, time_now()}. get_cache_max_size() -> - application:get_env(emqx, acl_cache_size, 100). + application:get_env(emqx, acl_cache_max_size, 0). get_cache_size() -> case erlang:get(acl_cache_size) of @@ -386,15 +354,31 @@ decr_cache_size() -> set_cache_size(N) -> erlang:put(acl_cache_size, N), ok. -get_newest_key() -> - erlang:get(acl_cache_newest_key). +keys_queue_in(Key) -> + %% delete the key first if exists + KeysQ = get_keys_queue(), + set_keys_queue(queue:in(Key, KeysQ)). -set_newest_key(Key) -> - erlang:put(acl_cache_newest_key, Key), ok. +keys_queue_update(Key) -> + NewKeysQ = remove_key(Key, get_keys_queue()), + set_keys_queue(queue:in(Key, NewKeysQ)). + +remove_key(Key, KeysQ) -> + queue:filter(fun + (K) when K =:= Key -> false; (_) -> true + end, KeysQ). + +set_keys_queue(KeysQ) -> + erlang:put(acl_keys_q, KeysQ), ok. +get_keys_queue() -> + case erlang:get(acl_keys_q) of + undefined -> queue:new(); + KeysQ -> KeysQ + end. time_now() -> erlang:system_time(millisecond). -if_acl_cache_expired(CachedAt, Fun) -> +if_expired(CachedAt, Fun) -> TTL = application:get_env(emqx, acl_cache_ttl, 60000), Now = time_now(), if (CachedAt + TTL) =< Now -> diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl index f2ca9fad6..c3907c2db 100644 --- a/test/emqx_access_SUITE.erl +++ b/test/emqx_access_SUITE.erl @@ -22,6 +22,7 @@ -include("emqx.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). -define(AC, emqx_access_control). @@ -71,10 +72,10 @@ init_per_group(_Group, Config) -> prepare_config(Group = access_control) -> set_acl_config_file(Group), - application:set_env(emqx, acl_cache_size, 0); + application:set_env(emqx, acl_cache_max_size, 0); prepare_config(Group = access_control_cache_mode) -> set_acl_config_file(Group), - application:set_env(emqx, acl_cache_size, 100). + application:set_env(emqx, acl_cache_max_size, 100). set_acl_config_file(_Group) -> Rules = [{allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]}, @@ -170,7 +171,7 @@ acl_cache_expiry(_) -> ok. acl_cache_full(_) -> - application:set_env(emqx, acl_cache_size, 1), + application:set_env(emqx, acl_cache_max_size, 1), SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), @@ -185,7 +186,7 @@ acl_cache_cleanup(_) -> %% The acl cache will try to evict memory, if the size is full and the newest %% cache entry is expired application:set_env(emqx, acl_cache_ttl, 1000), - application:set_env(emqx, acl_cache_size, 2), + application:set_env(emqx, acl_cache_max_size, 2), SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), @@ -206,7 +207,7 @@ acl_cache_cleanup(_) -> put_get_del_cache(_) -> application:set_env(emqx, acl_cache_ttl, 300000), - application:set_env(emqx, acl_cache_size, 30), + application:set_env(emqx, acl_cache_max_size, 30), not_found = ?AC:get_acl_cache(publish, <<"a">>), ok = ?AC:put_acl_cache(publish, <<"a">>, allow), @@ -217,11 +218,11 @@ put_get_del_cache(_) -> deny = ?AC:get_acl_cache(subscribe, <<"b">>), 2 = ?AC:get_cache_size(), - {subscribe, <<"b">>} = ?AC:get_newest_key(). + ?assertEqual(?AC:cache_k(subscribe, <<"b">>), ?AC:get_newest_key()). cache_expiry(_) -> application:set_env(emqx, acl_cache_ttl, 1000), - application:set_env(emqx, acl_cache_size, 30), + application:set_env(emqx, acl_cache_max_size, 30), ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), allow = ?AC:get_acl_cache(subscribe, <<"a">>), @@ -236,25 +237,25 @@ cache_expiry(_) -> cache_update(_) -> application:set_env(emqx, acl_cache_ttl, 300000), - application:set_env(emqx, acl_cache_size, 30), + application:set_env(emqx, acl_cache_max_size, 30), [] = ?AC:dump_acl_cache(), ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), ok = ?AC:put_acl_cache(publish, <<"b">>, allow), ok = ?AC:put_acl_cache(publish, <<"c">>, allow), 3 = ?AC:get_cache_size(), - {publish, <<"c">>} = ?AC:get_newest_key(), + ?assertEqual(?AC:cache_k(publish, <<"c">>), ?AC:get_newest_key()), %% update the 2nd one ok = ?AC:put_acl_cache(publish, <<"b">>, allow), %ct:pal("dump acl cache: ~p~n", [?AC:dump_acl_cache()]), 3 = ?AC:get_cache_size(), - {publish, <<"b">>} = ?AC:get_newest_key(). + ?assertEqual(?AC:cache_k(publish, <<"b">>), ?AC:get_newest_key()). cache_full_replacement(_) -> application:set_env(emqx, acl_cache_ttl, 300000), - application:set_env(emqx, acl_cache_size, 3), + application:set_env(emqx, acl_cache_max_size, 3), ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), ok = ?AC:put_acl_cache(publish, <<"b">>, allow), ok = ?AC:put_acl_cache(publish, <<"c">>, allow), @@ -262,15 +263,15 @@ cache_full_replacement(_) -> allow = ?AC:get_acl_cache(publish, <<"b">>), allow = ?AC:get_acl_cache(publish, <<"c">>), 3 = ?AC:get_cache_size(), - {publish, <<"c">>} = ?AC:get_newest_key(), + ?assertEqual(?AC:cache_k(publish, <<"c">>), ?AC:get_newest_key()), ok = ?AC:put_acl_cache(publish, <<"d">>, deny), 3 = ?AC:get_cache_size(), - {publish, <<"d">>} = ?AC:get_newest_key(), + ?assertEqual(?AC:cache_k(publish, <<"d">>), ?AC:get_newest_key()), ok = ?AC:put_acl_cache(publish, <<"e">>, deny), 3 = ?AC:get_cache_size(), - {publish, <<"e">>} = ?AC:get_newest_key(), + ?assertEqual(?AC:cache_k(publish, <<"e">>), ?AC:get_newest_key()), not_found = ?AC:get_acl_cache(subscribe, <<"a">>), not_found = ?AC:get_acl_cache(publish, <<"b">>), @@ -278,7 +279,7 @@ cache_full_replacement(_) -> cache_cleanup(_) -> application:set_env(emqx, acl_cache_ttl, 1000), - application:set_env(emqx, acl_cache_size, 30), + application:set_env(emqx, acl_cache_max_size, 30), ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), ok = ?AC:put_acl_cache(publish, <<"b">>, allow), ok = ?AC:put_acl_cache(publish, <<"c">>, allow), @@ -290,7 +291,7 @@ cache_cleanup(_) -> cache_full_cleanup(_) -> application:set_env(emqx, acl_cache_ttl, 1000), - application:set_env(emqx, acl_cache_size, 3), + application:set_env(emqx, acl_cache_max_size, 3), ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), ok = ?AC:put_acl_cache(publish, <<"b">>, allow), ok = ?AC:put_acl_cache(publish, <<"c">>, allow), From 9717f9b83e48db58172f55864b6b097a95b82199 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Sun, 26 Aug 2018 15:46:22 +0800 Subject: [PATCH 4/5] add module emqx_acl_cache --- src/emqx_access_control.erl | 186 +------------------------------- src/emqx_acl_cache.erl | 204 ++++++++++++++++++++++++++++++++++++ test/emqx_access_SUITE.erl | 133 +++++++++++------------ 3 files changed, 274 insertions(+), 249 deletions(-) create mode 100644 src/emqx_acl_cache.erl diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 07999b0fb..1c5d04b4e 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -24,17 +24,6 @@ -export([register_mod/3, register_mod/4, unregister_mod/2]). -export([stop/0]). --export([get_acl_cache/2, - put_acl_cache/3, - cleanup_acl_cache/0, - dump_acl_cache/0, - get_cache_size/0, - get_newest_key/0, - get_oldest_key/0, - cache_k/2, - cache_v/1 - ]). - %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -43,7 +32,6 @@ -define(SERVER, ?MODULE). -type(password() :: undefined | binary()). --type(acl_result() :: allow | deny). -record(state, {}). @@ -93,16 +81,16 @@ authenticate(Client, Password, [{Mod, State, _Seq} | Mods]) -> %% @doc Check ACL -spec(check_acl(client(), pubsub(), topic()) -> allow | deny). check_acl(Client, PubSub, Topic) when ?PS(PubSub) -> - CacheEnabled = (get_cache_max_size() =/= 0), + CacheEnabled = (emqx_acl_cache:get_cache_max_size() =/= 0), check_acl(Client, PubSub, Topic, lookup_mods(acl), CacheEnabled). check_acl(Client, PubSub, Topic, AclMods, false) -> check_acl_from_plugins(Client, PubSub, Topic, AclMods); check_acl(Client, PubSub, Topic, AclMods, true) -> - case get_acl_cache(PubSub, Topic) of + case emqx_acl_cache:get_acl_cache(PubSub, Topic) of not_found -> AclResult = check_acl_from_plugins(Client, PubSub, Topic, AclMods), - put_acl_cache(PubSub, Topic, AclResult), + emqx_acl_cache:put_acl_cache(PubSub, Topic, AclResult), AclResult; AclResult -> AclResult @@ -218,171 +206,3 @@ if_existed(false, Fun) -> Fun(); if_existed(_Mod, _Fun) -> {error, already_existed}. - -%%-------------------------------------------------------------------- -%% ACL cache -%%-------------------------------------------------------------------- - -%% We'll cleanup the cache before repalcing an expired acl. --spec(get_acl_cache(PubSub :: publish | subscribe, Topic :: topic()) - -> (acl_result() | not_found)). -get_acl_cache(PubSub, Topic) -> - case erlang:get(cache_k(PubSub, Topic)) of - undefined -> not_found; - {AclResult, CachedAt} -> - if_expired(CachedAt, - fun(false) -> - AclResult; - (true) -> - cleanup_acl_cache(), - not_found - end) - end. - -%% If the cache get full, and also the latest one -%% is expired, then delete all the cache entries --spec(put_acl_cache(PubSub :: publish | subscribe, - Topic :: topic(), AclResult :: acl_result()) -> ok). -put_acl_cache(PubSub, Topic, AclResult) -> - MaxSize = get_cache_max_size(), true = (MaxSize =/= 0), - Size = get_cache_size(), - if - Size < MaxSize -> - add_acl_cache(PubSub, Topic, AclResult); - Size =:= MaxSize -> - NewestK = get_newest_key(), - {_AclResult, CachedAt} = erlang:get(NewestK), - if_expired(CachedAt, - fun(true) -> - % all cache expired, cleanup first - empty_acl_cache(), - add_acl_cache(PubSub, Topic, AclResult); - (false) -> - % cache full, perform cache replacement - evict_acl_cache(), - add_acl_cache(PubSub, Topic, AclResult) - end) - end. - -empty_acl_cache() -> - map_acl_cache(fun({CacheK, _CacheV}) -> - erlang:erase(CacheK) - end), - set_cache_size(0), - set_keys_queue(queue:new()). - -evict_acl_cache() -> - {{value, OldestK}, RemKeys} = queue:out(get_keys_queue()), - set_keys_queue(RemKeys), - erlang:erase(OldestK), - decr_cache_size(). - -add_acl_cache(PubSub, Topic, AclResult) -> - K = cache_k(PubSub, Topic), - V = cache_v(AclResult), - case get(K) of - undefined -> add_new_acl(K, V); - {_AclResult, _CachedAt} -> - update_acl(K, V) - end. - -add_new_acl(K, V) -> - erlang:put(K, V), - keys_queue_in(K), - incr_cache_size(). - -update_acl(K, V) -> - erlang:put(K, V), - keys_queue_update(K). - -%% cleanup all the exipired cache entries --spec(cleanup_acl_cache() -> ok). -cleanup_acl_cache() -> - set_keys_queue( - cleanup_acl_cache(get_keys_queue())). - -cleanup_acl_cache(KeysQ) -> - case queue:out(KeysQ) of - {{value, OldestK}, RemKeys} -> - {_AclResult, CachedAt} = erlang:get(OldestK), - if_expired(CachedAt, - fun(false) -> KeysQ; - (true) -> - erlang:erase(OldestK), - decr_cache_size(), - cleanup_acl_cache(RemKeys) - end); - {empty, KeysQ} -> KeysQ - end. - -get_newest_key() -> - get_key(fun(KeysQ) -> queue:get_r(KeysQ) end). - -get_oldest_key() -> - get_key(fun(KeysQ) -> queue:get(KeysQ) end). - -get_key(Pick) -> - KeysQ = get_keys_queue(), - case queue:is_empty(KeysQ) of - true -> undefined; - false -> Pick(KeysQ) - end. - -%% for test only -dump_acl_cache() -> - map_acl_cache(fun(Cache) -> Cache end). -map_acl_cache(Fun) -> - [Fun(R) || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish - orelse SubPub =:= subscribe]. - - -cache_k(PubSub, Topic)-> {PubSub, Topic}. -cache_v(AclResult)-> {AclResult, time_now()}. - -get_cache_max_size() -> - application:get_env(emqx, acl_cache_max_size, 0). - -get_cache_size() -> - case erlang:get(acl_cache_size) of - undefined -> 0; - Size -> Size - end. -incr_cache_size() -> - erlang:put(acl_cache_size, get_cache_size() + 1), ok. -decr_cache_size() -> - erlang:put(acl_cache_size, get_cache_size() - 1), ok. -set_cache_size(N) -> - erlang:put(acl_cache_size, N), ok. - -keys_queue_in(Key) -> - %% delete the key first if exists - KeysQ = get_keys_queue(), - set_keys_queue(queue:in(Key, KeysQ)). - -keys_queue_update(Key) -> - NewKeysQ = remove_key(Key, get_keys_queue()), - set_keys_queue(queue:in(Key, NewKeysQ)). - -remove_key(Key, KeysQ) -> - queue:filter(fun - (K) when K =:= Key -> false; (_) -> true - end, KeysQ). - -set_keys_queue(KeysQ) -> - erlang:put(acl_keys_q, KeysQ), ok. -get_keys_queue() -> - case erlang:get(acl_keys_q) of - undefined -> queue:new(); - KeysQ -> KeysQ - end. - -time_now() -> erlang:system_time(millisecond). - -if_expired(CachedAt, Fun) -> - TTL = application:get_env(emqx, acl_cache_ttl, 60000), - Now = time_now(), - if (CachedAt + TTL) =< Now -> - Fun(true); - true -> - Fun(false) - end. diff --git a/src/emqx_acl_cache.erl b/src/emqx_acl_cache.erl new file mode 100644 index 000000000..7db23fb1e --- /dev/null +++ b/src/emqx_acl_cache.erl @@ -0,0 +1,204 @@ +-module(emqx_acl_cache). + +-include("emqx.hrl"). + +-export([ get_acl_cache/2 + , put_acl_cache/3 + , cleanup_acl_cache/0 + , empty_acl_cache/0 + , dump_acl_cache/0 + , get_cache_size/0 + , get_cache_max_size/0 + , get_newest_key/0 + , get_oldest_key/0 + , cache_k/2 + , cache_v/1 + ]). + +-type(acl_result() :: allow | deny). + +%% Wrappers for key and value +cache_k(PubSub, Topic)-> {PubSub, Topic}. +cache_v(AclResult)-> {AclResult, time_now()}. + +%% We'll cleanup the cache before repalcing an expired acl. +-spec(get_acl_cache(PubSub :: publish | subscribe, Topic :: topic()) + -> (acl_result() | not_found)). +get_acl_cache(PubSub, Topic) -> + case erlang:get(cache_k(PubSub, Topic)) of + undefined -> not_found; + {AclResult, CachedAt} -> + if_expired(CachedAt, + fun(false) -> + AclResult; + (true) -> + cleanup_acl_cache(), + not_found + end) + end. + +%% If the cache get full, and also the latest one +%% is expired, then delete all the cache entries +-spec(put_acl_cache(PubSub :: publish | subscribe, + Topic :: topic(), AclResult :: acl_result()) -> ok). +put_acl_cache(PubSub, Topic, AclResult) -> + MaxSize = get_cache_max_size(), true = (MaxSize =/= 0), + Size = get_cache_size(), + if + Size < MaxSize -> + add_acl(PubSub, Topic, AclResult); + Size =:= MaxSize -> + NewestK = get_newest_key(), + {_AclResult, CachedAt} = erlang:get(NewestK), + if_expired(CachedAt, + fun(true) -> + % all cache expired, cleanup first + empty_acl_cache(), + add_acl(PubSub, Topic, AclResult); + (false) -> + % cache full, perform cache replacement + evict_acl_cache(), + add_acl(PubSub, Topic, AclResult) + end) + end. + +%% delete all the acl entries +-spec(empty_acl_cache() -> ok). +empty_acl_cache() -> + map_acl_cache(fun({CacheK, _CacheV}) -> + erlang:erase(CacheK) + end), + set_cache_size(0), + keys_queue_set(queue:new()). + +%% delete the oldest acl entry +-spec(evict_acl_cache() -> ok). +evict_acl_cache() -> + OldestK = keys_queue_out(), + erlang:erase(OldestK), + decr_cache_size(). + +%% cleanup all the exipired cache entries +-spec(cleanup_acl_cache() -> ok). +cleanup_acl_cache() -> + keys_queue_set( + cleanup_acl(keys_queue_get())). + +get_oldest_key() -> + keys_queue_pick(queue_front()). +get_newest_key() -> + keys_queue_pick(queue_rear()). + +get_cache_max_size() -> + application:get_env(emqx, acl_cache_max_size, 0). + +get_cache_size() -> + case erlang:get(acl_cache_size) of + undefined -> 0; + Size -> Size + end. + +dump_acl_cache() -> + map_acl_cache(fun(Cache) -> Cache end). +map_acl_cache(Fun) -> + [Fun(R) || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish + orelse SubPub =:= subscribe]. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +add_acl(PubSub, Topic, AclResult) -> + K = cache_k(PubSub, Topic), + V = cache_v(AclResult), + case erlang:get(K) of + undefined -> add_new_acl(K, V); + {_AclResult, _CachedAt} -> + update_acl(K, V) + end. + +add_new_acl(K, V) -> + erlang:put(K, V), + keys_queue_in(K), + incr_cache_size(). + +update_acl(K, V) -> + erlang:put(K, V), + keys_queue_update(K). + +cleanup_acl(KeysQ) -> + case queue:out(KeysQ) of + {{value, OldestK}, KeysQ2} -> + {_AclResult, CachedAt} = erlang:get(OldestK), + if_expired(CachedAt, + fun(false) -> KeysQ; + (true) -> + erlang:erase(OldestK), + decr_cache_size(), + cleanup_acl(KeysQ2) + end); + {empty, KeysQ} -> KeysQ + end. + +incr_cache_size() -> + erlang:put(acl_cache_size, get_cache_size() + 1), ok. +decr_cache_size() -> + Size = get_cache_size(), + if Size > 1 -> + erlang:put(acl_cache_size, Size-1); + Size =< 1 -> + erlang:put(acl_cache_size, 0) + end, ok. +set_cache_size(N) -> + erlang:put(acl_cache_size, N), ok. + +%%% Ordered Keys Q %%% +keys_queue_in(Key) -> + %% delete the key first if exists + KeysQ = keys_queue_get(), + keys_queue_set(queue:in(Key, KeysQ)). + +keys_queue_out() -> + case queue:out(keys_queue_get()) of + {{value, OldestK}, Q2} -> + keys_queue_set(Q2), OldestK; + {empty, _Q} -> + undefined + end. + +keys_queue_update(Key) -> + NewKeysQ = keys_queue_remove(Key, keys_queue_get()), + keys_queue_set(queue:in(Key, NewKeysQ)). + +keys_queue_pick(Pick) -> + KeysQ = keys_queue_get(), + case queue:is_empty(KeysQ) of + true -> undefined; + false -> Pick(KeysQ) + end. + +keys_queue_remove(Key, KeysQ) -> + queue:filter(fun + (K) when K =:= Key -> false; (_) -> true + end, KeysQ). + +keys_queue_set(KeysQ) -> + erlang:put(acl_keys_q, KeysQ), ok. +keys_queue_get() -> + case erlang:get(acl_keys_q) of + undefined -> queue:new(); + KeysQ -> KeysQ + end. + +queue_front() -> fun queue:get/1. +queue_rear() -> fun queue:get_r/1. + +time_now() -> erlang:system_time(millisecond). + +if_expired(CachedAt, Fun) -> + TTL = application:get_env(emqx, acl_cache_ttl, 60000), + Now = time_now(), + if (CachedAt + TTL) =< Now -> + Fun(true); + true -> + Fun(false) + end. diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl index c3907c2db..cf859d7c1 100644 --- a/test/emqx_access_SUITE.erl +++ b/test/emqx_access_SUITE.erl @@ -25,6 +25,7 @@ -include_lib("eunit/include/eunit.hrl"). -define(AC, emqx_access_control). +-define(CACHE, emqx_acl_cache). -import(emqx_access_rule, [compile/1, match/3]). @@ -150,14 +151,14 @@ check_acl_2(_) -> acl_cache_basic(_) -> SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, - not_found = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>), - not_found = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), + not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), - allow = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>), - allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + allow = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), + allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), ok. acl_cache_expiry(_) -> @@ -165,9 +166,9 @@ acl_cache_expiry(_) -> SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), - allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), ct:sleep(1100), - not_found = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), ok. acl_cache_full(_) -> @@ -178,8 +179,8 @@ acl_cache_full(_) -> allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), %% the older ones (the <<"users/testuser/1">>) will be evicted first - not_found = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>), - allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), + allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), ok. acl_cache_cleanup(_) -> @@ -192,115 +193,115 @@ acl_cache_cleanup(_) -> allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), - allow = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>), - allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + allow = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), + allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), ct:sleep(1100), %% now the cache is full and the newest one - "clients/client1" %% should be expired, so we'll try to cleanup before putting the next cache entry deny = ?AC:check_acl(SelfUser, subscribe, <<"#">>), - not_found = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>), - not_found = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), - deny = ?AC:get_acl_cache(subscribe, <<"#">>), + not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), + not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), + deny = ?CACHE:get_acl_cache(subscribe, <<"#">>), ok. put_get_del_cache(_) -> application:set_env(emqx, acl_cache_ttl, 300000), application:set_env(emqx, acl_cache_max_size, 30), - not_found = ?AC:get_acl_cache(publish, <<"a">>), - ok = ?AC:put_acl_cache(publish, <<"a">>, allow), - allow = ?AC:get_acl_cache(publish, <<"a">>), + not_found = ?CACHE:get_acl_cache(publish, <<"a">>), + ok = ?CACHE:put_acl_cache(publish, <<"a">>, allow), + allow = ?CACHE:get_acl_cache(publish, <<"a">>), - not_found = ?AC:get_acl_cache(subscribe, <<"b">>), - ok = ?AC:put_acl_cache(subscribe, <<"b">>, deny), - deny = ?AC:get_acl_cache(subscribe, <<"b">>), + not_found = ?CACHE:get_acl_cache(subscribe, <<"b">>), + ok = ?CACHE:put_acl_cache(subscribe, <<"b">>, deny), + deny = ?CACHE:get_acl_cache(subscribe, <<"b">>), - 2 = ?AC:get_cache_size(), - ?assertEqual(?AC:cache_k(subscribe, <<"b">>), ?AC:get_newest_key()). + 2 = ?CACHE:get_cache_size(), + ?assertEqual(?CACHE:cache_k(subscribe, <<"b">>), ?CACHE:get_newest_key()). cache_expiry(_) -> application:set_env(emqx, acl_cache_ttl, 1000), application:set_env(emqx, acl_cache_max_size, 30), - ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), - allow = ?AC:get_acl_cache(subscribe, <<"a">>), + ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), + allow = ?CACHE:get_acl_cache(subscribe, <<"a">>), ct:sleep(1100), - not_found = ?AC:get_acl_cache(subscribe, <<"a">>), + not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>), - ok = ?AC:put_acl_cache(subscribe, <<"a">>, deny), - deny = ?AC:get_acl_cache(subscribe, <<"a">>), + ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, deny), + deny = ?CACHE:get_acl_cache(subscribe, <<"a">>), ct:sleep(1100), - not_found = ?AC:get_acl_cache(subscribe, <<"a">>). + not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>). cache_update(_) -> application:set_env(emqx, acl_cache_ttl, 300000), application:set_env(emqx, acl_cache_max_size, 30), - [] = ?AC:dump_acl_cache(), + [] = ?CACHE:dump_acl_cache(), - ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), - ok = ?AC:put_acl_cache(publish, <<"b">>, allow), - ok = ?AC:put_acl_cache(publish, <<"c">>, allow), - 3 = ?AC:get_cache_size(), - ?assertEqual(?AC:cache_k(publish, <<"c">>), ?AC:get_newest_key()), + ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), + 3 = ?CACHE:get_cache_size(), + ?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_newest_key()), %% update the 2nd one - ok = ?AC:put_acl_cache(publish, <<"b">>, allow), - %ct:pal("dump acl cache: ~p~n", [?AC:dump_acl_cache()]), + ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), + %ct:pal("dump acl cache: ~p~n", [?CACHE:dump_acl_cache()]), - 3 = ?AC:get_cache_size(), - ?assertEqual(?AC:cache_k(publish, <<"b">>), ?AC:get_newest_key()). + 3 = ?CACHE:get_cache_size(), + ?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_newest_key()). cache_full_replacement(_) -> application:set_env(emqx, acl_cache_ttl, 300000), application:set_env(emqx, acl_cache_max_size, 3), - ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), - ok = ?AC:put_acl_cache(publish, <<"b">>, allow), - ok = ?AC:put_acl_cache(publish, <<"c">>, allow), - allow = ?AC:get_acl_cache(subscribe, <<"a">>), - allow = ?AC:get_acl_cache(publish, <<"b">>), - allow = ?AC:get_acl_cache(publish, <<"c">>), - 3 = ?AC:get_cache_size(), - ?assertEqual(?AC:cache_k(publish, <<"c">>), ?AC:get_newest_key()), + ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), + allow = ?CACHE:get_acl_cache(subscribe, <<"a">>), + allow = ?CACHE:get_acl_cache(publish, <<"b">>), + allow = ?CACHE:get_acl_cache(publish, <<"c">>), + 3 = ?CACHE:get_cache_size(), + ?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_newest_key()), - ok = ?AC:put_acl_cache(publish, <<"d">>, deny), - 3 = ?AC:get_cache_size(), - ?assertEqual(?AC:cache_k(publish, <<"d">>), ?AC:get_newest_key()), + ok = ?CACHE:put_acl_cache(publish, <<"d">>, deny), + 3 = ?CACHE:get_cache_size(), + ?assertEqual(?CACHE:cache_k(publish, <<"d">>), ?CACHE:get_newest_key()), - ok = ?AC:put_acl_cache(publish, <<"e">>, deny), - 3 = ?AC:get_cache_size(), - ?assertEqual(?AC:cache_k(publish, <<"e">>), ?AC:get_newest_key()), + ok = ?CACHE:put_acl_cache(publish, <<"e">>, deny), + 3 = ?CACHE:get_cache_size(), + ?assertEqual(?CACHE:cache_k(publish, <<"e">>), ?CACHE:get_newest_key()), - not_found = ?AC:get_acl_cache(subscribe, <<"a">>), - not_found = ?AC:get_acl_cache(publish, <<"b">>), - allow = ?AC:get_acl_cache(publish, <<"c">>). + not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>), + not_found = ?CACHE:get_acl_cache(publish, <<"b">>), + allow = ?CACHE:get_acl_cache(publish, <<"c">>). cache_cleanup(_) -> application:set_env(emqx, acl_cache_ttl, 1000), application:set_env(emqx, acl_cache_max_size, 30), - ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), - ok = ?AC:put_acl_cache(publish, <<"b">>, allow), - ok = ?AC:put_acl_cache(publish, <<"c">>, allow), - 3 = ?AC:get_cache_size(), + ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), + 3 = ?CACHE:get_cache_size(), ct:sleep(1100), - ?AC:cleanup_acl_cache(), - 0 = ?AC:get_cache_size(). + ?CACHE:cleanup_acl_cache(), + 0 = ?CACHE:get_cache_size(). cache_full_cleanup(_) -> application:set_env(emqx, acl_cache_ttl, 1000), application:set_env(emqx, acl_cache_max_size, 3), - ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), - ok = ?AC:put_acl_cache(publish, <<"b">>, allow), - ok = ?AC:put_acl_cache(publish, <<"c">>, allow), - 3 = ?AC:get_cache_size(), + ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), + 3 = ?CACHE:get_cache_size(), ct:sleep(1100), %% verify auto cleanup upon cache full - ok = ?AC:put_acl_cache(subscribe, <<"d">>, deny), - 1 = ?AC:get_cache_size(). + ok = ?CACHE:put_acl_cache(subscribe, <<"d">>, deny), + 1 = ?CACHE:get_cache_size(). %%-------------------------------------------------------------------- %% emqx_access_rule From 9d29dd0e1033c608f53035fcf4683ac295f3149a Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Sun, 26 Aug 2018 18:22:17 +0800 Subject: [PATCH 5/5] use config enable_acl_cache --- etc/emqx.conf | 9 +++-- priv/emqx.schema | 13 ++++++- src/emqx.app.src | 4 +-- src/emqx_access_control.erl | 12 +++---- src/emqx_acl_cache.erl | 7 +++- test/emqx_access_SUITE.erl | 70 ++++++++++++++++++++++++++----------- 6 files changed, 82 insertions(+), 33 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 62f94b174..f3f46589e 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -434,12 +434,17 @@ acl_nomatch = allow ## Value: File Name acl_file = {{ platform_etc_dir }}/acl.conf +## Whether to enable ACL cache for publish. ## The ACL cache size ## The maximum count of ACL entries allowed for a client. ## -## Value 0 disables ACL cache +## Value: on | off +enable_acl_cache = on + +## The ACL cache size +## The maximum count of ACL entries allowed for a client. ## -## Value: Integer +## Value: Integer greater than 0 ## Default: 32 acl_cache_max_size = 32 diff --git a/priv/emqx.schema b/priv/emqx.schema index f6ee7c621..765363607 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -571,6 +571,12 @@ end}. hidden ]}. +%% @doc Enable ACL cache for publish. +{mapping, "enable_acl_cache", "emqx.enable_acl_cache", [ + {default, on}, + {datatype, flag} +]}. + %% @doc ACL cache time-to-live. {mapping, "acl_cache_ttl", "emqx.acl_cache_ttl", [ {default, "1m"}, @@ -580,9 +586,14 @@ end}. %% @doc ACL cache size. {mapping, "acl_cache_max_size", "emqx.acl_cache_max_size", [ {default, 32}, - {datatype, integer} + {datatype, integer}, + {validators, ["range:gt_0"]} ]}. +{validator, "range:gt_0", "must greater than 0", + fun(X) -> X > 0 end +}. + %%-------------------------------------------------------------------- %% MQTT Protocol %%-------------------------------------------------------------------- diff --git a/src/emqx.app.src b/src/emqx.app.src index c4a8b5c2e..39d876797 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -3,8 +3,8 @@ {vsn,"3.0"}, {modules,[]}, {registered,[emqx_sup]}, - {applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,cowboy, - minirest]}, + {applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,cowboy + ]}, {env,[]}, {mod,{emqx_app,[]}}, {maintainers,["Feng Lee "]}, diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 1c5d04b4e..56536501f 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -81,15 +81,15 @@ authenticate(Client, Password, [{Mod, State, _Seq} | Mods]) -> %% @doc Check ACL -spec(check_acl(client(), pubsub(), topic()) -> allow | deny). check_acl(Client, PubSub, Topic) when ?PS(PubSub) -> - CacheEnabled = (emqx_acl_cache:get_cache_max_size() =/= 0), + CacheEnabled = emqx_acl_cache:is_enabled(), check_acl(Client, PubSub, Topic, lookup_mods(acl), CacheEnabled). check_acl(Client, PubSub, Topic, AclMods, false) -> - check_acl_from_plugins(Client, PubSub, Topic, AclMods); + do_check_acl(Client, PubSub, Topic, AclMods); check_acl(Client, PubSub, Topic, AclMods, true) -> case emqx_acl_cache:get_acl_cache(PubSub, Topic) of not_found -> - AclResult = check_acl_from_plugins(Client, PubSub, Topic, AclMods), + AclResult = do_check_acl(Client, PubSub, Topic, AclMods), emqx_acl_cache:put_acl_cache(PubSub, Topic, AclResult), AclResult; AclResult -> @@ -189,13 +189,13 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -check_acl_from_plugins(#client{zone = Zone}, _PubSub, _Topic, []) -> +do_check_acl(#client{zone = Zone}, _PubSub, _Topic, []) -> emqx_zone:get_env(Zone, acl_nomatch, deny); -check_acl_from_plugins(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) -> +do_check_acl(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) -> case Mod:check_acl({Client, PubSub, Topic}, State) of allow -> allow; deny -> deny; - ignore -> check_acl_from_plugins(Client, PubSub, Topic, AclMods) + ignore -> do_check_acl(Client, PubSub, Topic, AclMods) end. %%-------------------------------------------------------------------- diff --git a/src/emqx_acl_cache.erl b/src/emqx_acl_cache.erl index 7db23fb1e..65e1e3305 100644 --- a/src/emqx_acl_cache.erl +++ b/src/emqx_acl_cache.erl @@ -13,6 +13,7 @@ , get_oldest_key/0 , cache_k/2 , cache_v/1 + , is_enabled/0 ]). -type(acl_result() :: allow | deny). @@ -21,6 +22,10 @@ cache_k(PubSub, Topic)-> {PubSub, Topic}. cache_v(AclResult)-> {AclResult, time_now()}. +-spec(is_enabled() -> boolean()). +is_enabled() -> + application:get_env(emqx, enable_acl_cache, true). + %% We'll cleanup the cache before repalcing an expired acl. -spec(get_acl_cache(PubSub :: publish | subscribe, Topic :: topic()) -> (acl_result() | not_found)). @@ -90,7 +95,7 @@ get_newest_key() -> keys_queue_pick(queue_rear()). get_cache_max_size() -> - application:get_env(emqx, acl_cache_max_size, 0). + application:get_env(emqx, acl_cache_max_size, 32). get_cache_size() -> case erlang:get(acl_cache_size) of diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl index cf859d7c1..f88420e56 100644 --- a/test/emqx_access_SUITE.erl +++ b/test/emqx_access_SUITE.erl @@ -55,9 +55,10 @@ groups() -> put_get_del_cache, cache_update, cache_expiry, - cache_full_replacement, + cache_replacement, cache_cleanup, - cache_full_cleanup + cache_auto_emtpy, + cache_auto_cleanup ]}, {access_rule, [], [compile_rule, @@ -73,9 +74,10 @@ init_per_group(_Group, Config) -> prepare_config(Group = access_control) -> set_acl_config_file(Group), - application:set_env(emqx, acl_cache_max_size, 0); + application:set_env(emqx, enable_acl_cache, false); prepare_config(Group = access_control_cache_mode) -> set_acl_config_file(Group), + application:set_env(emqx, enable_acl_cache, true), application:set_env(emqx, acl_cache_max_size, 100). set_acl_config_file(_Group) -> @@ -162,12 +164,12 @@ acl_cache_basic(_) -> ok. acl_cache_expiry(_) -> - application:set_env(emqx, acl_cache_ttl, 1000), + application:set_env(emqx, acl_cache_ttl, 100), SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), - ct:sleep(1100), + ct:sleep(150), not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), ok. @@ -186,7 +188,7 @@ acl_cache_full(_) -> acl_cache_cleanup(_) -> %% The acl cache will try to evict memory, if the size is full and the newest %% cache entry is expired - application:set_env(emqx, acl_cache_ttl, 1000), + application:set_env(emqx, acl_cache_ttl, 100), application:set_env(emqx, acl_cache_max_size, 2), SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, @@ -196,9 +198,10 @@ acl_cache_cleanup(_) -> allow = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), - ct:sleep(1100), + ct:sleep(150), %% now the cache is full and the newest one - "clients/client1" - %% should be expired, so we'll try to cleanup before putting the next cache entry + %% should be expired, so we'll empty the cache before putting + %% the next cache entry deny = ?AC:check_acl(SelfUser, subscribe, <<"#">>), not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), @@ -222,18 +225,18 @@ put_get_del_cache(_) -> ?assertEqual(?CACHE:cache_k(subscribe, <<"b">>), ?CACHE:get_newest_key()). cache_expiry(_) -> - application:set_env(emqx, acl_cache_ttl, 1000), + application:set_env(emqx, acl_cache_ttl, 100), application:set_env(emqx, acl_cache_max_size, 30), ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), allow = ?CACHE:get_acl_cache(subscribe, <<"a">>), - ct:sleep(1100), + ct:sleep(150), not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>), ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, deny), deny = ?CACHE:get_acl_cache(subscribe, <<"a">>), - ct:sleep(1100), + ct:sleep(150), not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>). cache_update(_) -> @@ -249,12 +252,13 @@ cache_update(_) -> %% update the 2nd one ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), - %ct:pal("dump acl cache: ~p~n", [?CACHE:dump_acl_cache()]), + ct:pal("dump acl cache: ~p~n", [?CACHE:dump_acl_cache()]), 3 = ?CACHE:get_cache_size(), - ?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_newest_key()). + ?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_newest_key()), + ?assertEqual(?CACHE:cache_k(subscribe, <<"a">>), ?CACHE:get_oldest_key()). -cache_full_replacement(_) -> +cache_replacement(_) -> application:set_env(emqx, acl_cache_ttl, 300000), application:set_env(emqx, acl_cache_max_size, 3), ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), @@ -269,40 +273,64 @@ cache_full_replacement(_) -> ok = ?CACHE:put_acl_cache(publish, <<"d">>, deny), 3 = ?CACHE:get_cache_size(), ?assertEqual(?CACHE:cache_k(publish, <<"d">>), ?CACHE:get_newest_key()), + ?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_oldest_key()), ok = ?CACHE:put_acl_cache(publish, <<"e">>, deny), 3 = ?CACHE:get_cache_size(), ?assertEqual(?CACHE:cache_k(publish, <<"e">>), ?CACHE:get_newest_key()), + ?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_oldest_key()), not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>), not_found = ?CACHE:get_acl_cache(publish, <<"b">>), allow = ?CACHE:get_acl_cache(publish, <<"c">>). cache_cleanup(_) -> - application:set_env(emqx, acl_cache_ttl, 1000), + application:set_env(emqx, acl_cache_ttl, 100), application:set_env(emqx, acl_cache_max_size, 30), ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), + ct:sleep(150), ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), 3 = ?CACHE:get_cache_size(), - ct:sleep(1100), ?CACHE:cleanup_acl_cache(), - 0 = ?CACHE:get_cache_size(). + ?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_oldest_key()), + 1 = ?CACHE:get_cache_size(). -cache_full_cleanup(_) -> - application:set_env(emqx, acl_cache_ttl, 1000), +cache_auto_emtpy(_) -> + %% verify cache is emptied when cache full and even the newest + %% one is expired. + application:set_env(emqx, acl_cache_ttl, 100), application:set_env(emqx, acl_cache_max_size, 3), ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), 3 = ?CACHE:get_cache_size(), - ct:sleep(1100), - %% verify auto cleanup upon cache full + ct:sleep(150), ok = ?CACHE:put_acl_cache(subscribe, <<"d">>, deny), 1 = ?CACHE:get_cache_size(). +cache_auto_cleanup(_) -> + %% verify we'll cleanup expired entries when we got a exipired acl + %% from cache. + application:set_env(emqx, acl_cache_ttl, 100), + application:set_env(emqx, acl_cache_max_size, 30), + ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), + ct:sleep(150), + ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), + ok = ?CACHE:put_acl_cache(publish, <<"d">>, deny), + 4 = ?CACHE:get_cache_size(), + + %% "a" and "b" expires, while "c" and "d" not + not_found = ?CACHE:get_acl_cache(publish, <<"b">>), + 2 = ?CACHE:get_cache_size(), + + ct:sleep(150), %% now "c" and "d" expires + not_found = ?CACHE:get_acl_cache(publish, <<"c">>), + 0 = ?CACHE:get_cache_size(). + %%-------------------------------------------------------------------- %% emqx_access_rule %%--------------------------------------------------------------------