Merge pull request #1744 from terry-xiaoyu/emqx_30_acl_cache_v2

ACL Cache v2 for EMQ X R30
This commit is contained in:
Feng Lee 2018-08-27 09:43:08 +08:00 committed by GitHub
commit 3ba8c90864
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 541 additions and 103 deletions

View File

@ -435,15 +435,26 @@ acl_nomatch = allow
acl_file = {{ platform_etc_dir }}/acl.conf
## Whether to enable ACL cache for publish.
## The ACL cache size
## The maximum count of ACL entries allowed for a client.
##
## Value: on | off
enable_acl_cache = on
## The ACL cache age.
## The ACL cache size
## The maximum count of ACL entries allowed for a client.
##
## Value: Integer greater than 0
## Default: 32
acl_cache_max_size = 32
## The ACL cache time-to-live.
## The time after which an ACL cache entry will be invalid
##
## Value: Duration
## Default: 5 minute
acl_cache_age = 5m
## Default: 1 minute
acl_cache_ttl = 1m
##--------------------------------------------------------------------
## MQTT Protocol
@ -1875,4 +1886,3 @@ sysmon.busy_port = false
##
## Value: true | false
sysmon.busy_dist_port = true

View File

@ -577,17 +577,22 @@ end}.
{datatype, flag}
]}.
%% @doc ACL cache age.
{mapping, "acl_cache_age", "emqx.acl_cache_age", [
{default, "5m"},
%% @doc ACL cache time-to-live.
{mapping, "acl_cache_ttl", "emqx.acl_cache_ttl", [
{default, "1m"},
{datatype, {duration, ms}}
]}.
%% @doc ACL cache size.
%% {mapping, "acl_cache_size", "emqx.acl_cache_size", [
%% {default, 0},
%% {datatype, integer}
%% ]}.
{mapping, "acl_cache_max_size", "emqx.acl_cache_max_size", [
{default, 32},
{datatype, integer},
{validators, ["range:gt_0"]}
]}.
{validator, "range:gt_0", "must greater than 0",
fun(X) -> X > 0 end
}.
%%--------------------------------------------------------------------
%% MQTT Protocol
@ -1703,4 +1708,3 @@ end}.
{busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)},
{busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}]
end}.

View File

@ -3,7 +3,8 @@
{vsn,"3.0"},
{modules,[]},
{registered,[emqx_sup]},
{applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,cowboy]},
{applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,cowboy
]},
{env,[]},
{mod,{emqx_app,[]}},
{maintainers,["Feng Lee <feng@emqx.io>"]},

View File

@ -86,19 +86,22 @@ authenticate(Credentials, Password, [{Mod, State, _Seq} | Mods]) ->
{error, Error}
end.
%% @doc Check ACL
-spec(check_acl(credentials(), pubsub(), topic()) -> allow | deny).
check_acl(Credentials, PubSub, Topic) when ?PS(PubSub) ->
check_acl(Credentials, PubSub, Topic, lookup_mods(acl)).
CacheEnabled = emqx_acl_cache:is_enabled(),
check_acl(Credentials, PubSub, Topic, lookup_mods(acl), CacheEnabled).
check_acl(Credentials, _PubSub, _Topic, []) ->
Zone = maps:get(zone, Credentials, undefined),
emqx_zone:get_env(Zone, acl_nomatch, deny);
check_acl(Credentials, PubSub, Topic, [{Mod, State, _Seq}|Mods]) ->
case Mod:check_acl({Credentials, PubSub, Topic}, State) of
ignore -> check_acl(Credentials, PubSub, Topic, Mods);
allow -> allow;
deny -> deny
check_acl(Credentials, PubSub, Topic, AclMods, false) ->
do_check_acl(Credentials, PubSub, Topic, AclMods);
check_acl(Credentials, PubSub, Topic, AclMods, true) ->
case emqx_acl_cache:get_acl_cache(PubSub, Topic) of
not_found ->
AclResult = do_check_acl(Credentials, PubSub, Topic, AclMods),
emqx_acl_cache:put_acl_cache(PubSub, Topic, AclResult),
AclResult;
AclResult ->
AclResult
end.
-spec(reload_acl() -> list(ok | {error, term()})).
@ -191,6 +194,15 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
do_check_acl(#client{zone = Zone}, _PubSub, _Topic, []) ->
emqx_zone:get_env(Zone, acl_nomatch, deny);
do_check_acl(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) ->
case Mod:check_acl({Client, PubSub, Topic}, State) of
allow -> allow;
deny -> deny;
ignore -> do_check_acl(Client, PubSub, Topic, AclMods)
end.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

209
src/emqx_acl_cache.erl Normal file
View File

@ -0,0 +1,209 @@
-module(emqx_acl_cache).
-include("emqx.hrl").
-export([ get_acl_cache/2
, put_acl_cache/3
, cleanup_acl_cache/0
, empty_acl_cache/0
, dump_acl_cache/0
, get_cache_size/0
, get_cache_max_size/0
, get_newest_key/0
, get_oldest_key/0
, cache_k/2
, cache_v/1
, is_enabled/0
]).
-type(acl_result() :: allow | deny).
%% Wrappers for key and value
cache_k(PubSub, Topic)-> {PubSub, Topic}.
cache_v(AclResult)-> {AclResult, time_now()}.
-spec(is_enabled() -> boolean()).
is_enabled() ->
application:get_env(emqx, enable_acl_cache, true).
%% We'll cleanup the cache before repalcing an expired acl.
-spec(get_acl_cache(PubSub :: publish | subscribe, Topic :: topic())
-> (acl_result() | not_found)).
get_acl_cache(PubSub, Topic) ->
case erlang:get(cache_k(PubSub, Topic)) of
undefined -> not_found;
{AclResult, CachedAt} ->
if_expired(CachedAt,
fun(false) ->
AclResult;
(true) ->
cleanup_acl_cache(),
not_found
end)
end.
%% If the cache get full, and also the latest one
%% is expired, then delete all the cache entries
-spec(put_acl_cache(PubSub :: publish | subscribe,
Topic :: topic(), AclResult :: acl_result()) -> ok).
put_acl_cache(PubSub, Topic, AclResult) ->
MaxSize = get_cache_max_size(), true = (MaxSize =/= 0),
Size = get_cache_size(),
if
Size < MaxSize ->
add_acl(PubSub, Topic, AclResult);
Size =:= MaxSize ->
NewestK = get_newest_key(),
{_AclResult, CachedAt} = erlang:get(NewestK),
if_expired(CachedAt,
fun(true) ->
% all cache expired, cleanup first
empty_acl_cache(),
add_acl(PubSub, Topic, AclResult);
(false) ->
% cache full, perform cache replacement
evict_acl_cache(),
add_acl(PubSub, Topic, AclResult)
end)
end.
%% delete all the acl entries
-spec(empty_acl_cache() -> ok).
empty_acl_cache() ->
map_acl_cache(fun({CacheK, _CacheV}) ->
erlang:erase(CacheK)
end),
set_cache_size(0),
keys_queue_set(queue:new()).
%% delete the oldest acl entry
-spec(evict_acl_cache() -> ok).
evict_acl_cache() ->
OldestK = keys_queue_out(),
erlang:erase(OldestK),
decr_cache_size().
%% cleanup all the exipired cache entries
-spec(cleanup_acl_cache() -> ok).
cleanup_acl_cache() ->
keys_queue_set(
cleanup_acl(keys_queue_get())).
get_oldest_key() ->
keys_queue_pick(queue_front()).
get_newest_key() ->
keys_queue_pick(queue_rear()).
get_cache_max_size() ->
application:get_env(emqx, acl_cache_max_size, 32).
get_cache_size() ->
case erlang:get(acl_cache_size) of
undefined -> 0;
Size -> Size
end.
dump_acl_cache() ->
map_acl_cache(fun(Cache) -> Cache end).
map_acl_cache(Fun) ->
[Fun(R) || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish
orelse SubPub =:= subscribe].
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
add_acl(PubSub, Topic, AclResult) ->
K = cache_k(PubSub, Topic),
V = cache_v(AclResult),
case erlang:get(K) of
undefined -> add_new_acl(K, V);
{_AclResult, _CachedAt} ->
update_acl(K, V)
end.
add_new_acl(K, V) ->
erlang:put(K, V),
keys_queue_in(K),
incr_cache_size().
update_acl(K, V) ->
erlang:put(K, V),
keys_queue_update(K).
cleanup_acl(KeysQ) ->
case queue:out(KeysQ) of
{{value, OldestK}, KeysQ2} ->
{_AclResult, CachedAt} = erlang:get(OldestK),
if_expired(CachedAt,
fun(false) -> KeysQ;
(true) ->
erlang:erase(OldestK),
decr_cache_size(),
cleanup_acl(KeysQ2)
end);
{empty, KeysQ} -> KeysQ
end.
incr_cache_size() ->
erlang:put(acl_cache_size, get_cache_size() + 1), ok.
decr_cache_size() ->
Size = get_cache_size(),
if Size > 1 ->
erlang:put(acl_cache_size, Size-1);
Size =< 1 ->
erlang:put(acl_cache_size, 0)
end, ok.
set_cache_size(N) ->
erlang:put(acl_cache_size, N), ok.
%%% Ordered Keys Q %%%
keys_queue_in(Key) ->
%% delete the key first if exists
KeysQ = keys_queue_get(),
keys_queue_set(queue:in(Key, KeysQ)).
keys_queue_out() ->
case queue:out(keys_queue_get()) of
{{value, OldestK}, Q2} ->
keys_queue_set(Q2), OldestK;
{empty, _Q} ->
undefined
end.
keys_queue_update(Key) ->
NewKeysQ = keys_queue_remove(Key, keys_queue_get()),
keys_queue_set(queue:in(Key, NewKeysQ)).
keys_queue_pick(Pick) ->
KeysQ = keys_queue_get(),
case queue:is_empty(KeysQ) of
true -> undefined;
false -> Pick(KeysQ)
end.
keys_queue_remove(Key, KeysQ) ->
queue:filter(fun
(K) when K =:= Key -> false; (_) -> true
end, KeysQ).
keys_queue_set(KeysQ) ->
erlang:put(acl_keys_q, KeysQ), ok.
keys_queue_get() ->
case erlang:get(acl_keys_q) of
undefined -> queue:new();
KeysQ -> KeysQ
end.
queue_front() -> fun queue:get/1.
queue_rear() -> fun queue:get_r/1.
time_now() -> erlang:system_time(millisecond).
if_expired(CachedAt, Fun) ->
TTL = application:get_env(emqx, acl_cache_ttl, 60000),
Now = time_now(),
if (CachedAt + TTL) =< Now ->
Fun(true);
true ->
Fun(false)
end.

View File

@ -103,12 +103,13 @@ reload_acl(#state{acl_file = undefined}) ->
ok;
reload_acl(State) ->
case catch load_rules_from_file(State) of
{'EXIT', Error} ->
{error, Error};
true -> ok
{'EXIT', Error} -> {error, Error};
#state{config=File} ->
io:format("reload acl_internal successfully: ~p~n", [File]),
ok
end.
-spec(description() -> string()).
description() ->
"Internal ACL with etc/acl.conf".

View File

@ -672,4 +672,3 @@ feed_var({<<"%u">>, Username}, MountPoint) ->
sp(true) -> 1;
sp(false) -> 0.

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqttc/include/emqttc_packet.hrl").
-include("emqx_mqtt.hrl").
-define(APP, emqx).
@ -79,7 +79,7 @@ mqtt_connect_with_tcp(_) ->
Packet = raw_send_serialise(?CLIENT),
gen_tcp:send(Sock, Packet),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data),
{ok, ?CONNACK_PACKET(0), _} = raw_recv_pase(Data),
gen_tcp:close(Sock).
mqtt_connect_with_ssl_oneway(_) ->
@ -133,7 +133,7 @@ mqtt_connect_with_ws(_Config) ->
Packet = raw_send_serialise(?CLIENT),
ok = rfc6455_client:send_binary(WS, Packet),
{binary, P} = rfc6455_client:recv(WS),
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(P),
{ok, ?CONNACK_PACKET(0), _} = raw_recv_pase(P),
{close, _} = rfc6455_client:close(WS),
ok.

View File

@ -22,34 +22,65 @@
-include("emqx.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(AC, emqx_access_control).
-define(CACHE, emqx_acl_cache).
-import(emqx_access_rule, [compile/1, match/3]).
all() ->
[{group, access_control},
{group, access_rule}].
{group, acl_cache},
{group, access_control_cache_mode},
{group, access_rule}
].
groups() ->
[{access_control, [sequence],
[reload_acl,
register_mod,
unregister_mod,
check_acl]},
[reload_acl,
register_mod,
unregister_mod,
check_acl_1,
check_acl_2
]},
{access_control_cache_mode, [],
[
acl_cache_basic,
acl_cache_expiry,
acl_cache_cleanup,
acl_cache_full
]},
{acl_cache, [], [
put_get_del_cache,
cache_update,
cache_expiry,
cache_replacement,
cache_cleanup,
cache_auto_emtpy,
cache_auto_cleanup
]},
{access_rule, [],
[compile_rule,
match_rule]}].
[compile_rule,
match_rule]}].
init_per_group(access_control, Config) ->
init_per_group(Group, Config) when Group =:= access_control;
Group =:= access_control_cache_mode ->
prepare_config(Group),
application:load(emqx),
prepare_config(),
Config;
init_per_group(_Group, Config) ->
Config.
prepare_config() ->
prepare_config(Group = access_control) ->
set_acl_config_file(Group),
application:set_env(emqx, enable_acl_cache, false);
prepare_config(Group = access_control_cache_mode) ->
set_acl_config_file(Group),
application:set_env(emqx, enable_acl_cache, true),
application:set_env(emqx, acl_cache_max_size, 100).
set_acl_config_file(_Group) ->
Rules = [{allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]},
{allow, {user, "testuser"}, subscribe, ["a/b/c", "d/e/f/#"]},
{allow, {user, "admin"}, pubsub, ["a/b/c", "d/e/f/#"]},
@ -59,11 +90,8 @@ prepare_config() ->
{deny, all, subscribe, ["$SYS/#", "#"]},
{deny, all}],
write_config("access_SUITE_acl.conf", Rules),
Config = [{auth, anonymous, []},
{acl, internal, [{config, "access_SUITE_acl.conf"},
{nomatch, allow}]}],
write_config("access_SUITE_emqx.conf", Config),
application:set_env(emqx, conf, "access_SUITE_emqx.conf").
application:set_env(emqx, acl_file, "access_SUITE_acl.conf").
write_config(Filename, Terms) ->
file:write_file(Filename, [io_lib:format("~tp.~n", [Term]) || Term <- Terms]).
@ -71,24 +99,18 @@ write_config(Filename, Terms) ->
end_per_group(_Group, Config) ->
Config.
init_per_testcase(TestCase, Config) when TestCase =:= reload_acl;
TestCase =:= register_mod;
TestCase =:= unregister_mod;
TestCase =:= check_acl ->
{ok, _Pid} = ?AC:start_link(), Config;
init_per_testcase(_TestCase, Config) ->
{ok, _Pid} = ?AC:start_link(),
Config.
end_per_testcase(TestCase, _Config) when TestCase =:= reload_acl;
TestCase =:= register_mod;
TestCase =:= unregister_mod;
TestCase =:= check_acl ->
?AC:stop();
end_per_testcase(_TestCase, _Config) ->
ok.
per_testcase_config(acl_cache_full, Config) ->
Config;
per_testcase_config(_TestCase, Config) ->
Config.
%%--------------------------------------------------------------------
%% emqx_access_control
%%--------------------------------------------------------------------
@ -118,15 +140,196 @@ unregister_mod(_) ->
timer:sleep(5),
[] = ?AC:lookup_mods(auth).
check_acl(_) ->
User1 = #client{client_id = <<"client1">>, username = <<"testuser">>},
User2 = #client{client_id = <<"client2">>, username = <<"xyz">>},
allow = ?AC:check_acl(User1, subscribe, <<"users/testuser/1">>),
allow = ?AC:check_acl(User1, subscribe, <<"clients/client1">>),
allow = ?AC:check_acl(User1, subscribe, <<"clients/client1/x/y">>),
allow = ?AC:check_acl(User1, publish, <<"users/testuser/1">>),
allow = ?AC:check_acl(User1, subscribe, <<"a/b/c">>),
allow = ?AC:check_acl(User2, subscribe, <<"a/b/c">>).
check_acl_1(_) ->
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
deny = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1/x/y">>),
allow = ?AC:check_acl(SelfUser, publish, <<"users/testuser/1">>),
allow = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>).
check_acl_2(_) ->
SelfUser = #client{id = <<"client2">>, username = <<"xyz">>},
deny = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>).
acl_cache_basic(_) ->
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
allow = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
ok.
acl_cache_expiry(_) ->
application:set_env(emqx, acl_cache_ttl, 100),
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
ct:sleep(150),
not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
ok.
acl_cache_full(_) ->
application:set_env(emqx, acl_cache_max_size, 1),
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
%% the older ones (the <<"users/testuser/1">>) will be evicted first
not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
ok.
acl_cache_cleanup(_) ->
%% The acl cache will try to evict memory, if the size is full and the newest
%% cache entry is expired
application:set_env(emqx, acl_cache_ttl, 100),
application:set_env(emqx, acl_cache_max_size, 2),
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
allow = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
ct:sleep(150),
%% now the cache is full and the newest one - "clients/client1"
%% should be expired, so we'll empty the cache before putting
%% the next cache entry
deny = ?AC:check_acl(SelfUser, subscribe, <<"#">>),
not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
deny = ?CACHE:get_acl_cache(subscribe, <<"#">>),
ok.
put_get_del_cache(_) ->
application:set_env(emqx, acl_cache_ttl, 300000),
application:set_env(emqx, acl_cache_max_size, 30),
not_found = ?CACHE:get_acl_cache(publish, <<"a">>),
ok = ?CACHE:put_acl_cache(publish, <<"a">>, allow),
allow = ?CACHE:get_acl_cache(publish, <<"a">>),
not_found = ?CACHE:get_acl_cache(subscribe, <<"b">>),
ok = ?CACHE:put_acl_cache(subscribe, <<"b">>, deny),
deny = ?CACHE:get_acl_cache(subscribe, <<"b">>),
2 = ?CACHE:get_cache_size(),
?assertEqual(?CACHE:cache_k(subscribe, <<"b">>), ?CACHE:get_newest_key()).
cache_expiry(_) ->
application:set_env(emqx, acl_cache_ttl, 100),
application:set_env(emqx, acl_cache_max_size, 30),
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
allow = ?CACHE:get_acl_cache(subscribe, <<"a">>),
ct:sleep(150),
not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>),
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, deny),
deny = ?CACHE:get_acl_cache(subscribe, <<"a">>),
ct:sleep(150),
not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>).
cache_update(_) ->
application:set_env(emqx, acl_cache_ttl, 300000),
application:set_env(emqx, acl_cache_max_size, 30),
[] = ?CACHE:dump_acl_cache(),
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow),
3 = ?CACHE:get_cache_size(),
?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_newest_key()),
%% update the 2nd one
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
ct:pal("dump acl cache: ~p~n", [?CACHE:dump_acl_cache()]),
3 = ?CACHE:get_cache_size(),
?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_newest_key()),
?assertEqual(?CACHE:cache_k(subscribe, <<"a">>), ?CACHE:get_oldest_key()).
cache_replacement(_) ->
application:set_env(emqx, acl_cache_ttl, 300000),
application:set_env(emqx, acl_cache_max_size, 3),
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow),
allow = ?CACHE:get_acl_cache(subscribe, <<"a">>),
allow = ?CACHE:get_acl_cache(publish, <<"b">>),
allow = ?CACHE:get_acl_cache(publish, <<"c">>),
3 = ?CACHE:get_cache_size(),
?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_newest_key()),
ok = ?CACHE:put_acl_cache(publish, <<"d">>, deny),
3 = ?CACHE:get_cache_size(),
?assertEqual(?CACHE:cache_k(publish, <<"d">>), ?CACHE:get_newest_key()),
?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_oldest_key()),
ok = ?CACHE:put_acl_cache(publish, <<"e">>, deny),
3 = ?CACHE:get_cache_size(),
?assertEqual(?CACHE:cache_k(publish, <<"e">>), ?CACHE:get_newest_key()),
?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_oldest_key()),
not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>),
not_found = ?CACHE:get_acl_cache(publish, <<"b">>),
allow = ?CACHE:get_acl_cache(publish, <<"c">>).
cache_cleanup(_) ->
application:set_env(emqx, acl_cache_ttl, 100),
application:set_env(emqx, acl_cache_max_size, 30),
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
ct:sleep(150),
ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow),
3 = ?CACHE:get_cache_size(),
?CACHE:cleanup_acl_cache(),
?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_oldest_key()),
1 = ?CACHE:get_cache_size().
cache_auto_emtpy(_) ->
%% verify cache is emptied when cache full and even the newest
%% one is expired.
application:set_env(emqx, acl_cache_ttl, 100),
application:set_env(emqx, acl_cache_max_size, 3),
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow),
3 = ?CACHE:get_cache_size(),
ct:sleep(150),
ok = ?CACHE:put_acl_cache(subscribe, <<"d">>, deny),
1 = ?CACHE:get_cache_size().
cache_auto_cleanup(_) ->
%% verify we'll cleanup expired entries when we got a exipired acl
%% from cache.
application:set_env(emqx, acl_cache_ttl, 100),
application:set_env(emqx, acl_cache_max_size, 30),
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
ct:sleep(150),
ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"d">>, deny),
4 = ?CACHE:get_cache_size(),
%% "a" and "b" expires, while "c" and "d" not
not_found = ?CACHE:get_acl_cache(publish, <<"b">>),
2 = ?CACHE:get_cache_size(),
ct:sleep(150), %% now "c" and "d" expires
not_found = ?CACHE:get_acl_cache(publish, <<"c">>),
0 = ?CACHE:get_cache_size().
%%--------------------------------------------------------------------
%% emqx_access_rule
@ -159,9 +362,9 @@ compile_rule(_) ->
{deny, all} = compile({deny, all}).
match_rule(_) ->
User = #client{peername = {{127,0,0,1}, 2948}, client_id = <<"testClient">>, username = <<"TestUser">>},
User2 = #client{peername = {{192,168,0,10}, 3028}, client_id = <<"testClient">>, username = <<"TestUser">>},
User = #client{peername = {{127,0,0,1}, 2948}, id = <<"testClient">>, username = <<"TestUser">>},
User2 = #client{peername = {{192,168,0,10}, 3028}, id = <<"testClient">>, username = <<"TestUser">>},
{matched, allow} = match(User, <<"Test/Topic">>, {allow, all}),
{matched, deny} = match(User, <<"Test/Topic">>, {deny, all}),
{matched, allow} = match(User, <<"Test/Topic">>, compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]})),
@ -179,4 +382,3 @@ match_rule(_) ->
{matched, allow} = match(User, <<"Topic">>, AndRule),
OrRule = compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, ["Topic"]}),
{matched, allow} = match(User, <<"Topic">>, OrRule).

View File

@ -149,7 +149,7 @@ start_session(_) ->
{ok, ClientPid} = emqx_mock_client:start_link(<<"clientId">>),
{ok, SessPid} = emqx_mock_client:start_session(ClientPid),
Message = emqx_message:make(<<"clientId">>, 2, <<"topic">>, <<"hello">>),
Message1 = Message#mqtt_message{packet_id = 1},
Message1 = Message#message{id = 1},
emqx_session:publish(SessPid, Message1),
emqx_session:pubrel(SessPid, 1),
emqx_session:subscribe(SessPid, [{<<"topic/session">>, [{qos, 2}]}]),

View File

@ -35,22 +35,22 @@ t_in(_) ->
{store_qos0, true}],
Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()),
?assert(?Q:is_empty(Q)),
Q1 = ?Q:in(#mqtt_message{}, Q),
Q1 = ?Q:in(#message{}, Q),
?assertEqual(1, ?Q:len(Q1)),
Q2 = ?Q:in(#mqtt_message{qos = 1}, Q1),
Q2 = ?Q:in(#message{qos = 1}, Q1),
?assertEqual(2, ?Q:len(Q2)),
Q3 = ?Q:in(#mqtt_message{qos = 2}, Q2),
Q4 = ?Q:in(#mqtt_message{}, Q3),
Q5 = ?Q:in(#mqtt_message{}, Q4),
Q3 = ?Q:in(#message{qos = 2}, Q2),
Q4 = ?Q:in(#message{}, Q3),
Q5 = ?Q:in(#message{}, Q4),
?assertEqual(5, ?Q:len(Q5)).
t_in_qos0(_) ->
Opts = [{max_length, 5},
{store_qos0, false}],
Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()),
Q1 = ?Q:in(#mqtt_message{}, Q),
Q1 = ?Q:in(#message{}, Q),
?assert(?Q:is_empty(Q1)),
Q2 = ?Q:in(#mqtt_message{qos = 0}, Q1),
Q2 = ?Q:in(#message{qos = 0}, Q1),
?assert(?Q:is_empty(Q2)).
t_out(_) ->
@ -58,10 +58,10 @@ t_out(_) ->
{store_qos0, true}],
Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()),
{empty, Q} = ?Q:out(Q),
Q1 = ?Q:in(#mqtt_message{}, Q),
Q1 = ?Q:in(#message{}, Q),
{Value, Q2} = ?Q:out(Q1),
?assertEqual(0, ?Q:len(Q2)),
?assertEqual({value, #mqtt_message{}}, Value).
?assertEqual({value, #message{}}, Value).
t_simple_mqueue(_) ->
Opts = [{type, simple},
@ -74,13 +74,13 @@ t_simple_mqueue(_) ->
?assertEqual(3, ?Q:max_len(Q)),
?assertEqual(<<"simple_queue">>, ?Q:name(Q)),
?assert(?Q:is_empty(Q)),
Q1 = ?Q:in(#mqtt_message{qos = 1, payload = <<"1">>}, Q),
Q2 = ?Q:in(#mqtt_message{qos = 1, payload = <<"2">>}, Q1),
Q3 = ?Q:in(#mqtt_message{qos = 1, payload = <<"3">>}, Q2),
Q4 = ?Q:in(#mqtt_message{qos = 1, payload = <<"4">>}, Q3),
Q1 = ?Q:in(#message{qos = 1, payload = <<"1">>}, Q),
Q2 = ?Q:in(#message{qos = 1, payload = <<"2">>}, Q1),
Q3 = ?Q:in(#message{qos = 1, payload = <<"3">>}, Q2),
Q4 = ?Q:in(#message{qos = 1, payload = <<"4">>}, Q3),
?assertEqual(3, ?Q:len(Q4)),
{{value, Msg}, Q5} = ?Q:out(Q4),
?assertEqual(<<"2">>, Msg#mqtt_message.payload),
?assertEqual(<<"2">>, Msg#message.payload),
?assertEqual([{len, 2}, {max_len, 3}, {dropped, 1}], ?Q:stats(Q5)).
t_infinity_simple_mqueue(_) ->
@ -93,12 +93,12 @@ t_infinity_simple_mqueue(_) ->
?assert(?Q:is_empty(Q)),
?assertEqual(0, ?Q:max_len(Q)),
Qx = lists:foldl(fun(I, AccQ) ->
?Q:in(#mqtt_message{qos = 1, payload = iolist_to_binary([I])}, AccQ)
?Q:in(#message{qos = 1, payload = iolist_to_binary([I])}, AccQ)
end, Q, lists:seq(1, 255)),
?assertEqual(255, ?Q:len(Qx)),
?assertEqual([{len, 255}, {max_len, 0}, {dropped, 0}], ?Q:stats(Qx)),
{{value, V}, _Qy} = ?Q:out(Qx),
?assertEqual(<<1>>, V#mqtt_message.payload).
?assertEqual(<<1>>, V#message.payload).
t_priority_mqueue(_) ->
Opts = [{type, priority},
@ -113,18 +113,18 @@ t_priority_mqueue(_) ->
?assertEqual(<<"priority_queue">>, ?Q:name(Q)),
?assert(?Q:is_empty(Q)),
Q1 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q),
Q2 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t">>}, Q1),
Q3 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t2">>}, Q2),
Q1 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q),
Q2 = ?Q:in(#message{qos = 1, topic = <<"t">>}, Q1),
Q3 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q2),
?assertEqual(3, ?Q:len(Q3)),
Q4 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q3),
Q4 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q3),
?assertEqual(4, ?Q:len(Q4)),
Q5 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q4),
Q5 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q4),
?assertEqual(5, ?Q:len(Q5)),
Q6 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q5),
Q6 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q5),
?assertEqual(5, ?Q:len(Q6)),
{{value, Msg}, _Q7} = ?Q:out(Q6),
?assertEqual(<<"t">>, Msg#mqtt_message.topic).
?assertEqual(<<"t">>, Msg#message.topic).
t_infinity_priority_mqueue(_) ->
Opts = [{type, priority},
@ -135,8 +135,8 @@ t_infinity_priority_mqueue(_) ->
?assertEqual(0, ?Q:max_len(Q)),
Qx = lists:foldl(fun(I, AccQ) ->
AccQ1 =
?Q:in(#mqtt_message{topic = <<"t1">>, qos = 1, payload = iolist_to_binary([I])}, AccQ),
?Q:in(#mqtt_message{topic = <<"t">>, qos = 1, payload = iolist_to_binary([I])}, AccQ1)
?Q:in(#message{topic = <<"t1">>, qos = 1, payload = iolist_to_binary([I])}, AccQ),
?Q:in(#message{topic = <<"t">>, qos = 1, payload = iolist_to_binary([I])}, AccQ1)
end, Q, lists:seq(1, 255)),
?assertEqual(510, ?Q:len(Qx)),
?assertEqual([{len, 510}, {max_len, 0}, {dropped, 0}], ?Q:stats(Qx)).
@ -149,10 +149,10 @@ t_priority_mqueue2(_) ->
{store_qos0, false}],
Q = ?Q:new("priority_queue2_test", Opts, alarm_fun()),
?assertEqual(2, ?Q:max_len(Q)),
Q1 = ?Q:in(#mqtt_message{topic = <<"x">>, qos = 1, payload = <<1>>}, Q),
Q2 = ?Q:in(#mqtt_message{topic = <<"x">>, qos = 1, payload = <<2>>}, Q1),
Q3 = ?Q:in(#mqtt_message{topic = <<"y">>, qos = 1, payload = <<3>>}, Q2),
Q4 = ?Q:in(#mqtt_message{topic = <<"y">>, qos = 1, payload = <<4>>}, Q3),
Q1 = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<1>>}, Q),
Q2 = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<2>>}, Q1),
Q3 = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<3>>}, Q2),
Q4 = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<4>>}, Q3),
?assertEqual(4, ?Q:len(Q4)),
{{value, _Val}, Q5} = ?Q:out(Q4),
?assertEqual(3, ?Q:len(Q5)).