diff --git a/etc/emqx.conf b/etc/emqx.conf index 4703f5083..33c06b5d3 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -434,16 +434,21 @@ acl_nomatch = allow ## Value: File Name acl_file = {{ platform_etc_dir }}/acl.conf -## Whether to enable ACL cache for publish. +## The ACL cache size +## The maximum count of ACL entries allowed for a client. +## Value 0 disables ACL cache ## -## Value: on | off -enable_acl_cache = on +## Value: Integer +## Default: 100 +acl_cache_size = 100 -## The ACL cache age. +## The ACL cache time-to-live. +## The time after which an ACL cache entry will be invalid ## ## Value: Duration -## Default: 5 minute -acl_cache_age = 5m +## Default: 1 minute +acl_cache_ttl = 1m + ##-------------------------------------------------------------------- ## MQTT Protocol @@ -1875,4 +1880,3 @@ sysmon.busy_port = false ## ## Value: true | false sysmon.busy_dist_port = true - diff --git a/priv/emqx.schema b/priv/emqx.schema index a0d2bc0e2..f26c012ce 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -571,23 +571,17 @@ end}. hidden ]}. -%% @doc Enable ACL cache for publish. -{mapping, "enable_acl_cache", "emqx.enable_acl_cache", [ - {default, on}, - {datatype, flag} -]}. - -%% @doc ACL cache age. -{mapping, "acl_cache_age", "emqx.acl_cache_age", [ - {default, "5m"}, +%% @doc ACL cache time-to-live. +{mapping, "acl_cache_ttl", "emqx.acl_cache_ttl", [ + {default, "1m"}, {datatype, {duration, ms}} ]}. %% @doc ACL cache size. -%% {mapping, "acl_cache_size", "emqx.acl_cache_size", [ -%% {default, 0}, -%% {datatype, integer} -%% ]}. +{mapping, "acl_cache_size", "emqx.acl_cache_size", [ + {default, 100}, + {datatype, integer} +]}. %%-------------------------------------------------------------------- %% MQTT Protocol @@ -1703,4 +1697,3 @@ end}. {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)}, {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}] end}. - diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index f43309088..5b52312ef 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -21,10 +21,18 @@ -export([start_link/0]). -export([authenticate/2]). -export([check_acl/3, reload_acl/0, lookup_mods/1]). --export([clean_acl_cache/1, clean_acl_cache/2]). -export([register_mod/3, register_mod/4, unregister_mod/2]). -export([stop/0]). +-export([get_acl_cache/2, + put_acl_cache/3, + delete_acl_cache/2, + cleanup_acl_cache/0, + dump_acl_cache/0, + get_cache_size/0, + get_newest_key/0 + ]). + %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -33,6 +41,7 @@ -define(SERVER, ?MODULE). -type(password() :: undefined | binary()). +-type(acl_result() :: allow | deny). -record(state, {}). @@ -82,16 +91,19 @@ authenticate(Client, Password, [{Mod, State, _Seq} | Mods]) -> %% @doc Check ACL -spec(check_acl(client(), pubsub(), topic()) -> allow | deny). check_acl(Client, PubSub, Topic) when ?PS(PubSub) -> - check_acl(Client, PubSub, Topic, lookup_mods(acl)). + CacheEnabled = (get_cache_max_size() =/= 0), + check_acl(Client, PubSub, Topic, lookup_mods(acl), CacheEnabled). -check_acl(#client{zone = Zone}, _PubSub, _Topic, []) -> - emqx_zone:get_env(Zone, acl_nomatch, deny); - -check_acl(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) -> - case Mod:check_acl({Client, PubSub, Topic}, State) of - allow -> allow; - deny -> deny; - ignore -> check_acl(Client, PubSub, Topic, AclMods) +check_acl(Client, PubSub, Topic, AclMods, false) -> + check_acl_from_plugins(Client, PubSub, Topic, AclMods); +check_acl(Client, PubSub, Topic, AclMods, true) -> + case get_acl_cache(PubSub, Topic) of + not_found -> + AclResult = check_acl_from_plugins(Client, PubSub, Topic, AclMods), + put_acl_cache(PubSub, Topic, AclResult), + AclResult; + AclResult -> + AclResult end. %% @doc Reload ACL Rules. @@ -130,12 +142,6 @@ tab_key(acl) -> acl_modules. stop() -> gen_server:stop(?MODULE, normal, infinity). -%%TODO: Support ACL cache... -clean_acl_cache(_ClientId) -> - ok. -clean_acl_cache(_ClientId, _Topic) -> - ok. - %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -193,6 +199,15 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +check_acl_from_plugins(#client{zone = Zone}, _PubSub, _Topic, []) -> + emqx_zone:get_env(Zone, acl_nomatch, deny); +check_acl_from_plugins(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) -> + case Mod:check_acl({Client, PubSub, Topic}, State) of + allow -> allow; + deny -> deny; + ignore -> check_acl_from_plugins(Client, PubSub, Topic, AclMods) + end. + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- @@ -202,3 +217,188 @@ if_existed(false, Fun) -> if_existed(_Mod, _Fun) -> {error, already_existed}. +%%-------------------------------------------------------------------- +%% ACL cache +%%-------------------------------------------------------------------- + +-spec(get_acl_cache(PubSub :: publish | subscribe, Topic :: topic()) + -> (acl_result() | not_found)). +get_acl_cache(PubSub, Topic) -> + case erlang:get({PubSub, Topic}) of + undefined -> not_found; + {AclResult, CachedAt, _NextK, _PrevK} -> + if_acl_cache_expired(CachedAt, + fun(false) -> + AclResult; + (true) -> + %% this expired entry will get updated in + %% put_acl_cache/3 + not_found + end) + end. + +-spec(put_acl_cache(PubSub :: publish | subscribe, + Topic :: topic(), AclResult :: acl_result()) -> ok). +put_acl_cache(PubSub, Topic, AclResult) -> + MaxSize = get_cache_max_size(), true = (MaxSize =/= 0), + Size = get_cache_size(), + if + Size =:= 0 -> + create_first(PubSub, Topic, AclResult); + Size < MaxSize -> + append(PubSub, Topic, AclResult); + Size =:= MaxSize -> + %% when the cache get full, and also the latest one + %% is expired, we'll perform a cleanup. + NewestK = get_newest_key(), + {_AclResult, CachedAt, OldestK, _PrevK} = erlang:get(NewestK), + if_acl_cache_expired(CachedAt, + fun(true) -> + % try to cleanup first + cleanup_acl_cache(OldestK), + add_cache(PubSub, Topic, AclResult); + (false) -> + % cache full, perform cache replacement + delete_acl_cache(OldestK), + append(PubSub, Topic, AclResult) + end) + end. + +-spec(delete_acl_cache(PubSub :: publish | subscribe, Topic :: topic()) -> ok). +delete_acl_cache(PubSub, Topic) -> + delete_acl_cache(_K = {PubSub, Topic}). +delete_acl_cache(K) -> + case erlang:get(K) of + undefined -> ok; + {_AclResult, _CachedAt, NextK, PrevK} when NextK =:= PrevK -> + %% there is only one entry in the cache + erlang:erase(K), + decr_cache_size(), + set_newest_key(undefined); + {_AclResult, _CachedAt, NextK, PrevK} -> + update_next(PrevK, NextK), + update_prev(NextK, PrevK), + erlang:erase(K), + + decr_cache_size(), + NewestK = get_newest_key(), + if + K =:= NewestK -> set_newest_key(NextK); + true -> ok + end + end. + +%% evict all the exipired cache entries +-spec(cleanup_acl_cache() -> ok). +cleanup_acl_cache() -> + case get_newest_key() of + undefined -> ok; + NewestK -> + {_AclResult, _CachedAt, OldestK, _PrevK} = erlang:get(NewestK), + cleanup_acl_cache(OldestK) + end. +cleanup_acl_cache(FromK) -> + case erlang:get(FromK) of + undefined -> ok; + {_AclResult, CachedAt, NextK, _PrevK} -> + if_acl_cache_expired(CachedAt, + fun(false) -> + ok; + (true) -> + delete_acl_cache(FromK), + cleanup_acl_cache(NextK) + end) + end. + +%% for test only +dump_acl_cache() -> + [R || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish + orelse SubPub =:= subscribe]. + +add_cache(PubSub, Topic, AclResult) -> + Size = get_cache_size(), + MaxSize = get_cache_max_size(), true = (MaxSize =/= 0), + if + Size =:= 0 -> + create_first(PubSub, Topic, AclResult); + Size =:= MaxSize -> + OldestK = get_next_key(get_newest_key()), + delete_acl_cache(OldestK), + case get_cache_size() =:= 0 of + true -> create_first(PubSub, Topic, AclResult); + false -> append(PubSub, Topic, AclResult) + end; + true -> + append(PubSub, Topic, AclResult) + end. + +create_first(PubSub, Topic, AclResult) -> + K = cache_k(PubSub, Topic), + V = cache_v(AclResult, _NextK = K, _PrevK = K), + erlang:put(K, V), + set_cache_size(1), + set_newest_key(K). + +append(PubSub, Topic, AclResult) -> + %% try to update the existing one: + %% - we delete it and then append it at the tail + delete_acl_cache(PubSub, Topic), + + case get_cache_size() =:= 0 of + true -> create_first(PubSub, Topic, AclResult); + false -> + NewestK = get_newest_key(), + OldestK = get_next_key(NewestK), + K = cache_k(PubSub, Topic), + V = cache_v(AclResult, OldestK, NewestK), + erlang:put(K, V), + + update_next(NewestK, K), + update_prev(OldestK, K), + incr_cache_size(), + set_newest_key(K) + end. + +get_next_key(K) -> + erlang:element(3, erlang:get(K)). +update_next(K, NextK) -> + NoNext = erlang:delete_element(3, erlang:get(K)), + erlang:put(K, erlang:insert_element(3, NoNext, NextK)). +update_prev(K, PrevK) -> + NoPrev = erlang:delete_element(4, erlang:get(K)), + erlang:put(K, erlang:insert_element(4, NoPrev, PrevK)). + +cache_k(PubSub, Topic)-> {PubSub, Topic}. +cache_v(AclResult, NextK, PrevK)-> {AclResult, time_now(), NextK, PrevK}. + +get_cache_max_size() -> + application:get_env(emqx, acl_cache_size, 100). + +get_cache_size() -> + case erlang:get(acl_cache_size) of + undefined -> 0; + Size -> Size + end. +incr_cache_size() -> + erlang:put(acl_cache_size, get_cache_size() + 1), ok. +decr_cache_size() -> + erlang:put(acl_cache_size, get_cache_size() - 1), ok. +set_cache_size(N) -> + erlang:put(acl_cache_size, N), ok. + +get_newest_key() -> + erlang:get(acl_cache_newest_key). + +set_newest_key(Key) -> + erlang:put(acl_cache_newest_key, Key), ok. + +time_now() -> erlang:system_time(millisecond). + +if_acl_cache_expired(CachedAt, Fun) -> + TTL = application:get_env(emqx, acl_cache_ttl, 60000), + Now = time_now(), + if (CachedAt + TTL) =< Now -> + Fun(true); + true -> + Fun(false) + end. diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 3faa7781a..941baaa7d 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -662,4 +662,3 @@ feed_var({<<"%u">>, Username}, MountPoint) -> sp(true) -> 1; sp(false) -> 0. - diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl index f206cca05..f2ca9fad6 100644 --- a/test/emqx_access_SUITE.erl +++ b/test/emqx_access_SUITE.erl @@ -29,27 +29,54 @@ all() -> [{group, access_control}, - {group, access_rule}]. + {group, acl_cache}, + {group, access_control_cache_mode}, + {group, access_rule} + ]. groups() -> [{access_control, [sequence], - [reload_acl, - register_mod, - unregister_mod, - check_acl]}, + [reload_acl, + register_mod, + unregister_mod, + check_acl_1, + check_acl_2 + ]}, + {access_control_cache_mode, [], + [ + acl_cache_basic, + acl_cache_expiry, + acl_cache_cleanup, + acl_cache_full + ]}, + {acl_cache, [], [ + put_get_del_cache, + cache_update, + cache_expiry, + cache_full_replacement, + cache_cleanup, + cache_full_cleanup + ]}, {access_rule, [], - [compile_rule, - match_rule]}]. + [compile_rule, + match_rule]}]. -init_per_group(access_control, Config) -> +init_per_group(Group, Config) when Group =:= access_control; + Group =:= access_control_cache_mode -> + prepare_config(Group), application:load(emqx), - prepare_config(), Config; - init_per_group(_Group, Config) -> Config. -prepare_config() -> +prepare_config(Group = access_control) -> + set_acl_config_file(Group), + application:set_env(emqx, acl_cache_size, 0); +prepare_config(Group = access_control_cache_mode) -> + set_acl_config_file(Group), + application:set_env(emqx, acl_cache_size, 100). + +set_acl_config_file(_Group) -> Rules = [{allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]}, {allow, {user, "testuser"}, subscribe, ["a/b/c", "d/e/f/#"]}, {allow, {user, "admin"}, pubsub, ["a/b/c", "d/e/f/#"]}, @@ -59,8 +86,8 @@ prepare_config() -> {deny, all, subscribe, ["$SYS/#", "#"]}, {deny, all}], write_config("access_SUITE_acl.conf", Rules), - application:set_env(emqx, acl_file, "access_SUITE_acl.conf"), - application:set_env(emqx, acl_cache, false). + application:set_env(emqx, acl_file, "access_SUITE_acl.conf"). + write_config(Filename, Terms) -> file:write_file(Filename, [io_lib:format("~tp.~n", [Term]) || Term <- Terms]). @@ -68,24 +95,18 @@ write_config(Filename, Terms) -> end_per_group(_Group, Config) -> Config. -init_per_testcase(TestCase, Config) when TestCase =:= reload_acl; - TestCase =:= register_mod; - TestCase =:= unregister_mod; - TestCase =:= check_acl -> - {ok, _Pid} = ?AC:start_link(), Config; - init_per_testcase(_TestCase, Config) -> + {ok, _Pid} = ?AC:start_link(), Config. - -end_per_testcase(TestCase, _Config) when TestCase =:= reload_acl; - TestCase =:= register_mod; - TestCase =:= unregister_mod; - TestCase =:= check_acl -> - ?AC:stop(); - end_per_testcase(_TestCase, _Config) -> ok. +per_testcase_config(acl_cache_full, Config) -> + Config; +per_testcase_config(_TestCase, Config) -> + Config. + + %%-------------------------------------------------------------------- %% emqx_access_control %%-------------------------------------------------------------------- @@ -115,15 +136,170 @@ unregister_mod(_) -> timer:sleep(5), [] = ?AC:lookup_mods(auth). -check_acl(_) -> - User1 = #client{id = <<"client1">>, username = <<"testuser">>}, - User2 = #client{id = <<"client2">>, username = <<"xyz">>}, - allow = ?AC:check_acl(User1, subscribe, <<"users/testuser/1">>), - allow = ?AC:check_acl(User1, subscribe, <<"clients/client1">>), - deny = ?AC:check_acl(User1, subscribe, <<"clients/client1/x/y">>), - allow = ?AC:check_acl(User1, publish, <<"users/testuser/1">>), - allow = ?AC:check_acl(User1, subscribe, <<"a/b/c">>), - deny = ?AC:check_acl(User2, subscribe, <<"a/b/c">>). +check_acl_1(_) -> + SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, + allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), + allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), + deny = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1/x/y">>), + allow = ?AC:check_acl(SelfUser, publish, <<"users/testuser/1">>), + allow = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>). +check_acl_2(_) -> + SelfUser = #client{id = <<"client2">>, username = <<"xyz">>}, + deny = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>). + +acl_cache_basic(_) -> + SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, + not_found = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>), + not_found = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + + allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), + allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), + + allow = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>), + allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + ok. + +acl_cache_expiry(_) -> + application:set_env(emqx, acl_cache_ttl, 1000), + + SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, + allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), + allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + ct:sleep(1100), + not_found = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + ok. + +acl_cache_full(_) -> + application:set_env(emqx, acl_cache_size, 1), + + SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, + allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), + allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), + + %% the older ones (the <<"users/testuser/1">>) will be evicted first + not_found = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>), + allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + ok. + +acl_cache_cleanup(_) -> + %% The acl cache will try to evict memory, if the size is full and the newest + %% cache entry is expired + application:set_env(emqx, acl_cache_ttl, 1000), + application:set_env(emqx, acl_cache_size, 2), + + SelfUser = #client{id = <<"client1">>, username = <<"testuser">>}, + allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), + allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), + + allow = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>), + allow = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + + ct:sleep(1100), + %% now the cache is full and the newest one - "clients/client1" + %% should be expired, so we'll try to cleanup before putting the next cache entry + deny = ?AC:check_acl(SelfUser, subscribe, <<"#">>), + + not_found = ?AC:get_acl_cache(subscribe, <<"users/testuser/1">>), + not_found = ?AC:get_acl_cache(subscribe, <<"clients/client1">>), + deny = ?AC:get_acl_cache(subscribe, <<"#">>), + ok. + +put_get_del_cache(_) -> + application:set_env(emqx, acl_cache_ttl, 300000), + application:set_env(emqx, acl_cache_size, 30), + + not_found = ?AC:get_acl_cache(publish, <<"a">>), + ok = ?AC:put_acl_cache(publish, <<"a">>, allow), + allow = ?AC:get_acl_cache(publish, <<"a">>), + + not_found = ?AC:get_acl_cache(subscribe, <<"b">>), + ok = ?AC:put_acl_cache(subscribe, <<"b">>, deny), + deny = ?AC:get_acl_cache(subscribe, <<"b">>), + + 2 = ?AC:get_cache_size(), + {subscribe, <<"b">>} = ?AC:get_newest_key(). + +cache_expiry(_) -> + application:set_env(emqx, acl_cache_ttl, 1000), + application:set_env(emqx, acl_cache_size, 30), + ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), + allow = ?AC:get_acl_cache(subscribe, <<"a">>), + + ct:sleep(1100), + not_found = ?AC:get_acl_cache(subscribe, <<"a">>), + + ok = ?AC:put_acl_cache(subscribe, <<"a">>, deny), + deny = ?AC:get_acl_cache(subscribe, <<"a">>), + + ct:sleep(1100), + not_found = ?AC:get_acl_cache(subscribe, <<"a">>). + +cache_update(_) -> + application:set_env(emqx, acl_cache_ttl, 300000), + application:set_env(emqx, acl_cache_size, 30), + [] = ?AC:dump_acl_cache(), + + ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?AC:put_acl_cache(publish, <<"b">>, allow), + ok = ?AC:put_acl_cache(publish, <<"c">>, allow), + 3 = ?AC:get_cache_size(), + {publish, <<"c">>} = ?AC:get_newest_key(), + + %% update the 2nd one + ok = ?AC:put_acl_cache(publish, <<"b">>, allow), + %ct:pal("dump acl cache: ~p~n", [?AC:dump_acl_cache()]), + + 3 = ?AC:get_cache_size(), + {publish, <<"b">>} = ?AC:get_newest_key(). + +cache_full_replacement(_) -> + application:set_env(emqx, acl_cache_ttl, 300000), + application:set_env(emqx, acl_cache_size, 3), + ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?AC:put_acl_cache(publish, <<"b">>, allow), + ok = ?AC:put_acl_cache(publish, <<"c">>, allow), + allow = ?AC:get_acl_cache(subscribe, <<"a">>), + allow = ?AC:get_acl_cache(publish, <<"b">>), + allow = ?AC:get_acl_cache(publish, <<"c">>), + 3 = ?AC:get_cache_size(), + {publish, <<"c">>} = ?AC:get_newest_key(), + + ok = ?AC:put_acl_cache(publish, <<"d">>, deny), + 3 = ?AC:get_cache_size(), + {publish, <<"d">>} = ?AC:get_newest_key(), + + ok = ?AC:put_acl_cache(publish, <<"e">>, deny), + 3 = ?AC:get_cache_size(), + {publish, <<"e">>} = ?AC:get_newest_key(), + + not_found = ?AC:get_acl_cache(subscribe, <<"a">>), + not_found = ?AC:get_acl_cache(publish, <<"b">>), + allow = ?AC:get_acl_cache(publish, <<"c">>). + +cache_cleanup(_) -> + application:set_env(emqx, acl_cache_ttl, 1000), + application:set_env(emqx, acl_cache_size, 30), + ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?AC:put_acl_cache(publish, <<"b">>, allow), + ok = ?AC:put_acl_cache(publish, <<"c">>, allow), + 3 = ?AC:get_cache_size(), + + ct:sleep(1100), + ?AC:cleanup_acl_cache(), + 0 = ?AC:get_cache_size(). + +cache_full_cleanup(_) -> + application:set_env(emqx, acl_cache_ttl, 1000), + application:set_env(emqx, acl_cache_size, 3), + ok = ?AC:put_acl_cache(subscribe, <<"a">>, allow), + ok = ?AC:put_acl_cache(publish, <<"b">>, allow), + ok = ?AC:put_acl_cache(publish, <<"c">>, allow), + 3 = ?AC:get_cache_size(), + + ct:sleep(1100), + %% verify auto cleanup upon cache full + ok = ?AC:put_acl_cache(subscribe, <<"d">>, deny), + 1 = ?AC:get_cache_size(). %%-------------------------------------------------------------------- %% emqx_access_rule