From 98e94d8619b07ea81580030ae9d46ed3cff2528c Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 9 Jan 2024 19:16:05 +0100 Subject: [PATCH 1/3] docs: fix typo in comments --- apps/emqx/src/emqx_channel.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 92715cf80..cf519fd5d 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -2317,7 +2317,7 @@ shutdown(Reason, Reply, Packet, Channel) -> {shutdown, Reason, Reply, Packet, Channel}. %% process exits with {shutdown, #{shutdown_count := Kind}} will trigger -%% make the connection supervisor (esockd) keep a shutdown-counter groupd by Kind +%% the connection supervisor (esockd) to keep a shutdown-counter grouped by Kind shutdown_count(Kind, Reason) when is_map(Reason) -> Reason#{shutdown_count => Kind}; shutdown_count(Kind, Reason) -> From 9e8a67fd68a003b3f1f811bbefd0fe14770f95a5 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 8 Jan 2024 10:45:14 +0100 Subject: [PATCH 2/3] feat: support authz cache exclusion config now one can configure a list of topic-filters to avoid caching ACL check results for example authorization.cache.excludes = ["nocache/#"] this means ACL check results for topics having 'nocache/' prefix will not be cached --- Makefile | 2 +- apps/emqx/src/emqx_access_control.erl | 2 +- apps/emqx/src/emqx_authz_cache.erl | 29 ++++++--- apps/emqx/src/emqx_schema.erl | 19 +++--- apps/emqx/test/emqx_authz_cache_SUITE.erl | 59 +++++++++++-------- .../emqx_authz_api_settings_SUITE.erl | 7 ++- changes/ce/feat-12289.en.md | 3 + rel/i18n/emqx_schema.hocon | 6 ++ 8 files changed, 84 insertions(+), 43 deletions(-) create mode 100644 changes/ce/feat-12289.en.md diff --git a/Makefile b/Makefile index 8328f9add..facba0c32 100644 --- a/Makefile +++ b/Makefile @@ -317,7 +317,7 @@ $(foreach tt,$(ALL_ELIXIR_TGZS),$(eval $(call gen-elixir-tgz-target,$(tt)))) fmt: $(REBAR) @$(SCRIPTS)/erlfmt -w 'apps/*/{src,include,priv,test,integration_test}/**/*.{erl,hrl,app.src,eterm}' @$(SCRIPTS)/erlfmt -w '**/*.escript' --exclude-files '_build/**' - @$(SCRIPTS)/erlfmt -w '**/rebar.config' + @$(SCRIPTS)/erlfmt -w '**/rebar.config' --exclude-files '_build/**' @$(SCRIPTS)/erlfmt -w 'rebar.config.erl' @$(SCRIPTS)/erlfmt -w 'bin/nodetool' @mix format diff --git a/apps/emqx/src/emqx_access_control.erl b/apps/emqx/src/emqx_access_control.erl index ce594a15f..983d78a64 100644 --- a/apps/emqx/src/emqx_access_control.erl +++ b/apps/emqx/src/emqx_access_control.erl @@ -102,7 +102,7 @@ authorize(ClientInfo, Action, <<"$delayed/", Data/binary>> = RawTopic) -> end; authorize(ClientInfo, Action, Topic) -> Result = - case emqx_authz_cache:is_enabled() of + case emqx_authz_cache:is_enabled(Topic) of true -> check_authorization_cache(ClientInfo, Action, Topic); false -> do_authorize(ClientInfo, Action, Topic) end, diff --git a/apps/emqx/src/emqx_authz_cache.erl b/apps/emqx/src/emqx_authz_cache.erl index af19ecf8f..016c720ed 100644 --- a/apps/emqx/src/emqx_authz_cache.erl +++ b/apps/emqx/src/emqx_authz_cache.erl @@ -24,10 +24,9 @@ put_authz_cache/3, cleanup_authz_cache/0, empty_authz_cache/0, - dump_authz_cache/0, get_cache_max_size/0, get_cache_ttl/0, - is_enabled/0, + is_enabled/1, drain_cache/0, drain_cache/1 ]). @@ -53,9 +52,20 @@ cache_k(PubSub, Topic) -> {PubSub, Topic}. cache_v(AuthzResult) -> {AuthzResult, time_now()}. drain_k() -> {?MODULE, drain_timestamp}. --spec is_enabled() -> boolean(). -is_enabled() -> - emqx:get_config([authorization, cache, enable], false). +%% @doc Check if the authz cache is enabled for the given topic. +-spec is_enabled(emqx_types:topic()) -> boolean(). +is_enabled(Topic) -> + case emqx:get_config([authorization, cache]) of + #{enable := true, excludes := Filters} -> + not is_excluded(Topic, Filters); + #{enable := IsEnabled} -> + IsEnabled + end. + +is_excluded(_Topic, []) -> + false; +is_excluded(Topic, [Filter | Filters]) -> + emqx_topic:match(Topic, Filter) orelse is_excluded(Topic, Filters). -spec get_cache_max_size() -> integer(). get_cache_max_size() -> @@ -153,14 +163,15 @@ get_cache_size() -> Size -> Size end. -dump_authz_cache() -> - map_authz_cache(fun(Cache) -> Cache end). - map_authz_cache(Fun) -> + map_authz_cache(Fun, erlang:get()). + +map_authz_cache(Fun, Dict) -> [ Fun(R) - || R = {{?authz_action, _T}, _Authz} <- erlang:get() + || R = {{?authz_action, _T}, _Authz} <- Dict ]. + foreach_authz_cache(Fun) -> _ = map_authz_cache(Fun), ok. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 2b795d85d..e972c57e0 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -439,9 +439,9 @@ fields("stats") -> ]; fields("authorization") -> authz_fields(); -fields("authz_cache") -> +fields(authz_cache) -> [ - {"enable", + {enable, sc( boolean(), #{ @@ -450,7 +450,7 @@ fields("authz_cache") -> desc => ?DESC(fields_cache_enable) } )}, - {"max_size", + {max_size, sc( range(1, 1048576), #{ @@ -458,14 +458,19 @@ fields("authz_cache") -> desc => ?DESC(fields_cache_max_size) } )}, - {"ttl", + {ttl, sc( duration(), #{ default => <<"1m">>, desc => ?DESC(fields_cache_ttl) } - )} + )}, + {excludes, + sc(hoconsc:array(string()), #{ + default => [], + desc => ?DESC(fields_authz_cache_excludes) + })} ]; fields("mqtt") -> mqtt_general() ++ mqtt_session(); @@ -1994,7 +1999,7 @@ desc("authorization") -> "Settings for client authorization."; desc("mqtt") -> "Global MQTT configuration."; -desc("authz_cache") -> +desc(authz_cache) -> "Settings for the authorization cache."; desc("zone") -> "A `Zone` defines a set of configuration items (such as the maximum number of connections)" @@ -2556,7 +2561,7 @@ authz_fields() -> )}, {"cache", sc( - ref(?MODULE, "authz_cache"), + ref(?MODULE, authz_cache), #{} )} ]. diff --git a/apps/emqx/test/emqx_authz_cache_SUITE.erl b/apps/emqx/test/emqx_authz_cache_SUITE.erl index 53e38c8f1..e2baad9d1 100644 --- a/apps/emqx/test/emqx_authz_cache_SUITE.erl +++ b/apps/emqx/test/emqx_authz_cache_SUITE.erl @@ -20,11 +20,13 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}), + emqx_config:put([authorization, cache, excludes], [<<"nocache/#">>]), [{apps, Apps} | Config]. end_per_suite(Config) -> @@ -34,24 +36,27 @@ end_per_suite(Config) -> %% Test cases %%-------------------------------------------------------------------- +t_cache_exclude(_) -> + ClientId = <<"test-id1">>, + {ok, Client} = emqtt:start_link([{clientid, ClientId}]), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, <<"nocache/+/#">>, 0), + emqtt:publish(Client, <<"nocache/1">>, <<"{\"x\":1}">>, 0), + Caches = list_cache(ClientId), + ?assertEqual([], Caches), + emqtt:stop(Client). + t_clean_authz_cache(_) -> {ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}]), {ok, _} = emqtt:connect(Client), {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0), - ct:sleep(100), - ClientPid = - case emqx_cm:lookup_channels(<<"emqx_c">>) of - Pids when is_list(Pids) -> - lists:last(Pids); - _ -> - {error, not_found} - end, - Caches = gen_server:call(ClientPid, list_authz_cache), + ClientPid = find_client_pid(<<"emqx_c">>), + Caches = list_cache(ClientPid), ct:log("authz caches: ~p", [Caches]), ?assert(length(Caches) > 0), erlang:send(ClientPid, clean_authz_cache), - ?assertEqual(0, length(gen_server:call(ClientPid, list_authz_cache))), + ?assertEqual([], list_cache(ClientPid)), emqtt:stop(Client). t_drain_authz_cache(_) -> @@ -59,22 +64,30 @@ t_drain_authz_cache(_) -> {ok, _} = emqtt:connect(Client), {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0), - ct:sleep(100), - ClientPid = - case emqx_cm:lookup_channels(<<"emqx_c">>) of - [Pid] when is_pid(Pid) -> - Pid; - Pids when is_list(Pids) -> - lists:last(Pids); - _ -> - {error, not_found} - end, - Caches = gen_server:call(ClientPid, list_authz_cache), + ClientPid = find_client_pid(<<"emqx_c">>), + Caches = list_cache(ClientPid), ct:log("authz caches: ~p", [Caches]), ?assert(length(Caches) > 0), emqx_authz_cache:drain_cache(), - ?assertEqual(0, length(gen_server:call(ClientPid, list_authz_cache))), + ?assertEqual([], list_cache(ClientPid)), ct:sleep(100), {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), - ?assert(length(gen_server:call(ClientPid, list_authz_cache)) > 0), + ?assert(length(list_cache(ClientPid)) > 0), emqtt:stop(Client). + +list_cache(ClientId) when is_binary(ClientId) -> + ClientPid = find_client_pid(ClientId), + list_cache(ClientPid); +list_cache(ClientPid) -> + gen_server:call(ClientPid, list_authz_cache). + +find_client_pid(ClientId) -> + ?retry(_Inteval = 100, _Attempts = 10, do_find_client_pid(ClientId)). + +do_find_client_pid(ClientId) -> + case emqx_cm:lookup_channels(ClientId) of + Pids when is_list(Pids) -> + lists:last(Pids); + _ -> + throw({not_found, ClientId}) + end. diff --git a/apps/emqx_auth/test/emqx_authz/emqx_authz_api_settings_SUITE.erl b/apps/emqx_auth/test/emqx_authz/emqx_authz_api_settings_SUITE.erl index ccbe0298b..f5e7eae95 100644 --- a/apps/emqx_auth/test/emqx_authz/emqx_authz_api_settings_SUITE.erl +++ b/apps/emqx_auth/test/emqx_authz/emqx_authz_api_settings_SUITE.erl @@ -70,7 +70,8 @@ t_api(_) -> <<"cache">> => #{ <<"enable">> => false, <<"max_size">> => 32, - <<"ttl">> => <<"60s">> + <<"ttl">> => <<"60s">>, + <<"excludes">> => [<<"nocache/#">>] } }, @@ -90,7 +91,9 @@ t_api(_) -> {ok, 200, Result2} = request(put, uri(["authorization", "settings"]), Settings2), {ok, 200, Result2} = request(get, uri(["authorization", "settings"]), []), - ?assertEqual(Settings2, emqx_utils_json:decode(Result2)), + Cache = maps:get(<<"cache">>, Settings2), + ExpectedSettings2 = Settings2#{<<"cache">> => Cache#{<<"excludes">> => []}}, + ?assertEqual(ExpectedSettings2, emqx_utils_json:decode(Result2)), ok. diff --git a/changes/ce/feat-12289.en.md b/changes/ce/feat-12289.en.md new file mode 100644 index 000000000..dcab2bfc1 --- /dev/null +++ b/changes/ce/feat-12289.en.md @@ -0,0 +1,3 @@ +Add new config `authorization.cache.excludes` to support ACL cache exclusion. + +When configured with a list of topic-filters, the publish or subscribe permission check results for a matching topic or topic filter will not be cached. diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index ea2847341..84305317f 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -920,6 +920,12 @@ fields_cache_ttl.desc: fields_cache_ttl.label: """Time to live for the cached data.""" +fields_authz_cache_excludes.label: +"""Excludes""" + +fields_authz_cache_excludes.desc: +"""Exclude caching ACL check results for topics matching the given patterns.""" + sys_topics.desc: """System topics configuration.""" From 6fcb2fdb054ac67d298c47f912f775a890fe361a Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 10 Jan 2024 11:47:51 +0100 Subject: [PATCH 3/3] test: fix test case flakyness --- apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 32fbfdee5..ece1649bf 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -18,6 +18,7 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). @@ -86,8 +87,11 @@ t_clients(_) -> timeout end, ?assertEqual(ok, Kick), - AfterKickoutResponse2 = emqx_mgmt_api_test_util:request_api(get, Client2Path), - ?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, AfterKickoutResponse2), + %% Client info is cleared after DOWN event + ?retry(_Interval = 100, _Attempts = 5, begin + AfterKickoutResponse2 = emqx_mgmt_api_test_util:request_api(get, Client2Path), + ?assertEqual(AfterKickoutResponse2, {error, {"HTTP/1.1", 404, "Not Found"}}) + end), %% get /clients/:clientid/authorization/cache should have no authz cache Client1AuthzCachePath = emqx_mgmt_api_test_util:api_path([