Merge pull request #12289 from zmstone/0108-support-acl-cache-masks
0108 support acl cache excludes
This commit is contained in:
commit
7c8a36fc06
2
Makefile
2
Makefile
|
@ -317,7 +317,7 @@ $(foreach tt,$(ALL_ELIXIR_TGZS),$(eval $(call gen-elixir-tgz-target,$(tt))))
|
||||||
fmt: $(REBAR)
|
fmt: $(REBAR)
|
||||||
@$(SCRIPTS)/erlfmt -w 'apps/*/{src,include,priv,test,integration_test}/**/*.{erl,hrl,app.src,eterm}'
|
@$(SCRIPTS)/erlfmt -w 'apps/*/{src,include,priv,test,integration_test}/**/*.{erl,hrl,app.src,eterm}'
|
||||||
@$(SCRIPTS)/erlfmt -w '**/*.escript' --exclude-files '_build/**'
|
@$(SCRIPTS)/erlfmt -w '**/*.escript' --exclude-files '_build/**'
|
||||||
@$(SCRIPTS)/erlfmt -w '**/rebar.config'
|
@$(SCRIPTS)/erlfmt -w '**/rebar.config' --exclude-files '_build/**'
|
||||||
@$(SCRIPTS)/erlfmt -w 'rebar.config.erl'
|
@$(SCRIPTS)/erlfmt -w 'rebar.config.erl'
|
||||||
@$(SCRIPTS)/erlfmt -w 'bin/nodetool'
|
@$(SCRIPTS)/erlfmt -w 'bin/nodetool'
|
||||||
@mix format
|
@mix format
|
||||||
|
|
|
@ -102,7 +102,7 @@ authorize(ClientInfo, Action, <<"$delayed/", Data/binary>> = RawTopic) ->
|
||||||
end;
|
end;
|
||||||
authorize(ClientInfo, Action, Topic) ->
|
authorize(ClientInfo, Action, Topic) ->
|
||||||
Result =
|
Result =
|
||||||
case emqx_authz_cache:is_enabled() of
|
case emqx_authz_cache:is_enabled(Topic) of
|
||||||
true -> check_authorization_cache(ClientInfo, Action, Topic);
|
true -> check_authorization_cache(ClientInfo, Action, Topic);
|
||||||
false -> do_authorize(ClientInfo, Action, Topic)
|
false -> do_authorize(ClientInfo, Action, Topic)
|
||||||
end,
|
end,
|
||||||
|
|
|
@ -24,10 +24,9 @@
|
||||||
put_authz_cache/3,
|
put_authz_cache/3,
|
||||||
cleanup_authz_cache/0,
|
cleanup_authz_cache/0,
|
||||||
empty_authz_cache/0,
|
empty_authz_cache/0,
|
||||||
dump_authz_cache/0,
|
|
||||||
get_cache_max_size/0,
|
get_cache_max_size/0,
|
||||||
get_cache_ttl/0,
|
get_cache_ttl/0,
|
||||||
is_enabled/0,
|
is_enabled/1,
|
||||||
drain_cache/0,
|
drain_cache/0,
|
||||||
drain_cache/1
|
drain_cache/1
|
||||||
]).
|
]).
|
||||||
|
@ -53,9 +52,20 @@ cache_k(PubSub, Topic) -> {PubSub, Topic}.
|
||||||
cache_v(AuthzResult) -> {AuthzResult, time_now()}.
|
cache_v(AuthzResult) -> {AuthzResult, time_now()}.
|
||||||
drain_k() -> {?MODULE, drain_timestamp}.
|
drain_k() -> {?MODULE, drain_timestamp}.
|
||||||
|
|
||||||
-spec is_enabled() -> boolean().
|
%% @doc Check if the authz cache is enabled for the given topic.
|
||||||
is_enabled() ->
|
-spec is_enabled(emqx_types:topic()) -> boolean().
|
||||||
emqx:get_config([authorization, cache, enable], false).
|
is_enabled(Topic) ->
|
||||||
|
case emqx:get_config([authorization, cache]) of
|
||||||
|
#{enable := true, excludes := Filters} ->
|
||||||
|
not is_excluded(Topic, Filters);
|
||||||
|
#{enable := IsEnabled} ->
|
||||||
|
IsEnabled
|
||||||
|
end.
|
||||||
|
|
||||||
|
is_excluded(_Topic, []) ->
|
||||||
|
false;
|
||||||
|
is_excluded(Topic, [Filter | Filters]) ->
|
||||||
|
emqx_topic:match(Topic, Filter) orelse is_excluded(Topic, Filters).
|
||||||
|
|
||||||
-spec get_cache_max_size() -> integer().
|
-spec get_cache_max_size() -> integer().
|
||||||
get_cache_max_size() ->
|
get_cache_max_size() ->
|
||||||
|
@ -153,14 +163,15 @@ get_cache_size() ->
|
||||||
Size -> Size
|
Size -> Size
|
||||||
end.
|
end.
|
||||||
|
|
||||||
dump_authz_cache() ->
|
|
||||||
map_authz_cache(fun(Cache) -> Cache end).
|
|
||||||
|
|
||||||
map_authz_cache(Fun) ->
|
map_authz_cache(Fun) ->
|
||||||
|
map_authz_cache(Fun, erlang:get()).
|
||||||
|
|
||||||
|
map_authz_cache(Fun, Dict) ->
|
||||||
[
|
[
|
||||||
Fun(R)
|
Fun(R)
|
||||||
|| R = {{?authz_action, _T}, _Authz} <- erlang:get()
|
|| R = {{?authz_action, _T}, _Authz} <- Dict
|
||||||
].
|
].
|
||||||
|
|
||||||
foreach_authz_cache(Fun) ->
|
foreach_authz_cache(Fun) ->
|
||||||
_ = map_authz_cache(Fun),
|
_ = map_authz_cache(Fun),
|
||||||
ok.
|
ok.
|
||||||
|
|
|
@ -2317,7 +2317,7 @@ shutdown(Reason, Reply, Packet, Channel) ->
|
||||||
{shutdown, Reason, Reply, Packet, Channel}.
|
{shutdown, Reason, Reply, Packet, Channel}.
|
||||||
|
|
||||||
%% process exits with {shutdown, #{shutdown_count := Kind}} will trigger
|
%% process exits with {shutdown, #{shutdown_count := Kind}} will trigger
|
||||||
%% make the connection supervisor (esockd) keep a shutdown-counter groupd by Kind
|
%% the connection supervisor (esockd) to keep a shutdown-counter grouped by Kind
|
||||||
shutdown_count(Kind, Reason) when is_map(Reason) ->
|
shutdown_count(Kind, Reason) when is_map(Reason) ->
|
||||||
Reason#{shutdown_count => Kind};
|
Reason#{shutdown_count => Kind};
|
||||||
shutdown_count(Kind, Reason) ->
|
shutdown_count(Kind, Reason) ->
|
||||||
|
|
|
@ -439,9 +439,9 @@ fields("stats") ->
|
||||||
];
|
];
|
||||||
fields("authorization") ->
|
fields("authorization") ->
|
||||||
authz_fields();
|
authz_fields();
|
||||||
fields("authz_cache") ->
|
fields(authz_cache) ->
|
||||||
[
|
[
|
||||||
{"enable",
|
{enable,
|
||||||
sc(
|
sc(
|
||||||
boolean(),
|
boolean(),
|
||||||
#{
|
#{
|
||||||
|
@ -450,7 +450,7 @@ fields("authz_cache") ->
|
||||||
desc => ?DESC(fields_cache_enable)
|
desc => ?DESC(fields_cache_enable)
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
{"max_size",
|
{max_size,
|
||||||
sc(
|
sc(
|
||||||
range(1, 1048576),
|
range(1, 1048576),
|
||||||
#{
|
#{
|
||||||
|
@ -458,14 +458,19 @@ fields("authz_cache") ->
|
||||||
desc => ?DESC(fields_cache_max_size)
|
desc => ?DESC(fields_cache_max_size)
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
{"ttl",
|
{ttl,
|
||||||
sc(
|
sc(
|
||||||
duration(),
|
duration(),
|
||||||
#{
|
#{
|
||||||
default => <<"1m">>,
|
default => <<"1m">>,
|
||||||
desc => ?DESC(fields_cache_ttl)
|
desc => ?DESC(fields_cache_ttl)
|
||||||
}
|
}
|
||||||
)}
|
)},
|
||||||
|
{excludes,
|
||||||
|
sc(hoconsc:array(string()), #{
|
||||||
|
default => [],
|
||||||
|
desc => ?DESC(fields_authz_cache_excludes)
|
||||||
|
})}
|
||||||
];
|
];
|
||||||
fields("mqtt") ->
|
fields("mqtt") ->
|
||||||
mqtt_general() ++ mqtt_session();
|
mqtt_general() ++ mqtt_session();
|
||||||
|
@ -1994,7 +1999,7 @@ desc("authorization") ->
|
||||||
"Settings for client authorization.";
|
"Settings for client authorization.";
|
||||||
desc("mqtt") ->
|
desc("mqtt") ->
|
||||||
"Global MQTT configuration.";
|
"Global MQTT configuration.";
|
||||||
desc("authz_cache") ->
|
desc(authz_cache) ->
|
||||||
"Settings for the authorization cache.";
|
"Settings for the authorization cache.";
|
||||||
desc("zone") ->
|
desc("zone") ->
|
||||||
"A `Zone` defines a set of configuration items (such as the maximum number of connections)"
|
"A `Zone` defines a set of configuration items (such as the maximum number of connections)"
|
||||||
|
@ -2556,7 +2561,7 @@ authz_fields() ->
|
||||||
)},
|
)},
|
||||||
{"cache",
|
{"cache",
|
||||||
sc(
|
sc(
|
||||||
ref(?MODULE, "authz_cache"),
|
ref(?MODULE, authz_cache),
|
||||||
#{}
|
#{}
|
||||||
)}
|
)}
|
||||||
].
|
].
|
||||||
|
|
|
@ -20,11 +20,13 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
|
Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
|
||||||
|
emqx_config:put([authorization, cache, excludes], [<<"nocache/#">>]),
|
||||||
[{apps, Apps} | Config].
|
[{apps, Apps} | Config].
|
||||||
|
|
||||||
end_per_suite(Config) ->
|
end_per_suite(Config) ->
|
||||||
|
@ -34,24 +36,27 @@ end_per_suite(Config) ->
|
||||||
%% Test cases
|
%% Test cases
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
t_cache_exclude(_) ->
|
||||||
|
ClientId = <<"test-id1">>,
|
||||||
|
{ok, Client} = emqtt:start_link([{clientid, ClientId}]),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
{ok, _, _} = emqtt:subscribe(Client, <<"nocache/+/#">>, 0),
|
||||||
|
emqtt:publish(Client, <<"nocache/1">>, <<"{\"x\":1}">>, 0),
|
||||||
|
Caches = list_cache(ClientId),
|
||||||
|
?assertEqual([], Caches),
|
||||||
|
emqtt:stop(Client).
|
||||||
|
|
||||||
t_clean_authz_cache(_) ->
|
t_clean_authz_cache(_) ->
|
||||||
{ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}]),
|
{ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}]),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
||||||
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0),
|
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0),
|
||||||
ct:sleep(100),
|
ClientPid = find_client_pid(<<"emqx_c">>),
|
||||||
ClientPid =
|
Caches = list_cache(ClientPid),
|
||||||
case emqx_cm:lookup_channels(<<"emqx_c">>) of
|
|
||||||
Pids when is_list(Pids) ->
|
|
||||||
lists:last(Pids);
|
|
||||||
_ ->
|
|
||||||
{error, not_found}
|
|
||||||
end,
|
|
||||||
Caches = gen_server:call(ClientPid, list_authz_cache),
|
|
||||||
ct:log("authz caches: ~p", [Caches]),
|
ct:log("authz caches: ~p", [Caches]),
|
||||||
?assert(length(Caches) > 0),
|
?assert(length(Caches) > 0),
|
||||||
erlang:send(ClientPid, clean_authz_cache),
|
erlang:send(ClientPid, clean_authz_cache),
|
||||||
?assertEqual(0, length(gen_server:call(ClientPid, list_authz_cache))),
|
?assertEqual([], list_cache(ClientPid)),
|
||||||
emqtt:stop(Client).
|
emqtt:stop(Client).
|
||||||
|
|
||||||
t_drain_authz_cache(_) ->
|
t_drain_authz_cache(_) ->
|
||||||
|
@ -59,22 +64,30 @@ t_drain_authz_cache(_) ->
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
||||||
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0),
|
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0),
|
||||||
ct:sleep(100),
|
ClientPid = find_client_pid(<<"emqx_c">>),
|
||||||
ClientPid =
|
Caches = list_cache(ClientPid),
|
||||||
case emqx_cm:lookup_channels(<<"emqx_c">>) of
|
|
||||||
[Pid] when is_pid(Pid) ->
|
|
||||||
Pid;
|
|
||||||
Pids when is_list(Pids) ->
|
|
||||||
lists:last(Pids);
|
|
||||||
_ ->
|
|
||||||
{error, not_found}
|
|
||||||
end,
|
|
||||||
Caches = gen_server:call(ClientPid, list_authz_cache),
|
|
||||||
ct:log("authz caches: ~p", [Caches]),
|
ct:log("authz caches: ~p", [Caches]),
|
||||||
?assert(length(Caches) > 0),
|
?assert(length(Caches) > 0),
|
||||||
emqx_authz_cache:drain_cache(),
|
emqx_authz_cache:drain_cache(),
|
||||||
?assertEqual(0, length(gen_server:call(ClientPid, list_authz_cache))),
|
?assertEqual([], list_cache(ClientPid)),
|
||||||
ct:sleep(100),
|
ct:sleep(100),
|
||||||
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
||||||
?assert(length(gen_server:call(ClientPid, list_authz_cache)) > 0),
|
?assert(length(list_cache(ClientPid)) > 0),
|
||||||
emqtt:stop(Client).
|
emqtt:stop(Client).
|
||||||
|
|
||||||
|
list_cache(ClientId) when is_binary(ClientId) ->
|
||||||
|
ClientPid = find_client_pid(ClientId),
|
||||||
|
list_cache(ClientPid);
|
||||||
|
list_cache(ClientPid) ->
|
||||||
|
gen_server:call(ClientPid, list_authz_cache).
|
||||||
|
|
||||||
|
find_client_pid(ClientId) ->
|
||||||
|
?retry(_Inteval = 100, _Attempts = 10, do_find_client_pid(ClientId)).
|
||||||
|
|
||||||
|
do_find_client_pid(ClientId) ->
|
||||||
|
case emqx_cm:lookup_channels(ClientId) of
|
||||||
|
Pids when is_list(Pids) ->
|
||||||
|
lists:last(Pids);
|
||||||
|
_ ->
|
||||||
|
throw({not_found, ClientId})
|
||||||
|
end.
|
||||||
|
|
|
@ -70,7 +70,8 @@ t_api(_) ->
|
||||||
<<"cache">> => #{
|
<<"cache">> => #{
|
||||||
<<"enable">> => false,
|
<<"enable">> => false,
|
||||||
<<"max_size">> => 32,
|
<<"max_size">> => 32,
|
||||||
<<"ttl">> => <<"60s">>
|
<<"ttl">> => <<"60s">>,
|
||||||
|
<<"excludes">> => [<<"nocache/#">>]
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
|
@ -90,7 +91,9 @@ t_api(_) ->
|
||||||
|
|
||||||
{ok, 200, Result2} = request(put, uri(["authorization", "settings"]), Settings2),
|
{ok, 200, Result2} = request(put, uri(["authorization", "settings"]), Settings2),
|
||||||
{ok, 200, Result2} = request(get, uri(["authorization", "settings"]), []),
|
{ok, 200, Result2} = request(get, uri(["authorization", "settings"]), []),
|
||||||
?assertEqual(Settings2, emqx_utils_json:decode(Result2)),
|
Cache = maps:get(<<"cache">>, Settings2),
|
||||||
|
ExpectedSettings2 = Settings2#{<<"cache">> => Cache#{<<"excludes">> => []}},
|
||||||
|
?assertEqual(ExpectedSettings2, emqx_utils_json:decode(Result2)),
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
@ -86,8 +87,11 @@ t_clients(_) ->
|
||||||
timeout
|
timeout
|
||||||
end,
|
end,
|
||||||
?assertEqual(ok, Kick),
|
?assertEqual(ok, Kick),
|
||||||
|
%% Client info is cleared after DOWN event
|
||||||
|
?retry(_Interval = 100, _Attempts = 5, begin
|
||||||
AfterKickoutResponse2 = emqx_mgmt_api_test_util:request_api(get, Client2Path),
|
AfterKickoutResponse2 = emqx_mgmt_api_test_util:request_api(get, Client2Path),
|
||||||
?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, AfterKickoutResponse2),
|
?assertEqual(AfterKickoutResponse2, {error, {"HTTP/1.1", 404, "Not Found"}})
|
||||||
|
end),
|
||||||
|
|
||||||
%% get /clients/:clientid/authorization/cache should have no authz cache
|
%% get /clients/:clientid/authorization/cache should have no authz cache
|
||||||
Client1AuthzCachePath = emqx_mgmt_api_test_util:api_path([
|
Client1AuthzCachePath = emqx_mgmt_api_test_util:api_path([
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
Add new config `authorization.cache.excludes` to support ACL cache exclusion.
|
||||||
|
|
||||||
|
When configured with a list of topic-filters, the publish or subscribe permission check results for a matching topic or topic filter will not be cached.
|
|
@ -920,6 +920,12 @@ fields_cache_ttl.desc:
|
||||||
fields_cache_ttl.label:
|
fields_cache_ttl.label:
|
||||||
"""Time to live for the cached data."""
|
"""Time to live for the cached data."""
|
||||||
|
|
||||||
|
fields_authz_cache_excludes.label:
|
||||||
|
"""Excludes"""
|
||||||
|
|
||||||
|
fields_authz_cache_excludes.desc:
|
||||||
|
"""Exclude caching ACL check results for topics matching the given patterns."""
|
||||||
|
|
||||||
sys_topics.desc:
|
sys_topics.desc:
|
||||||
"""System topics configuration."""
|
"""System topics configuration."""
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue