From 09d524144b412d262e060266b4fc0e9762b89766 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Thu, 11 Jan 2024 13:28:21 +0300 Subject: [PATCH] fix(retainer): fix topic search by index --- .../emqx_retainer/src/emqx_retainer_index.erl | 27 ++- .../src/emqx_retainer_mnesia.erl | 13 +- .../test/emqx_retainer_api_SUITE.erl | 221 ++++++++++++------ .../test/emqx_retainer_index_SUITE.erl | 71 +++++- .../test/props/prop_emqx_retainer_index.erl | 106 +++++++++ changes/ce/fix-12303.en.md | 1 + 6 files changed, 346 insertions(+), 93 deletions(-) create mode 100644 apps/emqx_retainer/test/props/prop_emqx_retainer_index.erl create mode 100644 changes/ce/fix-12303.en.md diff --git a/apps/emqx_retainer/src/emqx_retainer_index.erl b/apps/emqx_retainer/src/emqx_retainer_index.erl index ec49e7a65..3b4f1e99f 100644 --- a/apps/emqx_retainer/src/emqx_retainer_index.erl +++ b/apps/emqx_retainer/src/emqx_retainer_index.erl @@ -88,13 +88,16 @@ select_index(Tokens, Indices) -> select_index(Tokens, Indices, 0, undefined). %% @doc For an index and a wildcard topic -%% returns a matchspec pattern for the corresponding index key. +%% returns a tuple of: +%% * matchspec pattern for the corresponding index key +%% * boolean flag indicating whether the pattern is exact %% %% E.g. for `[2, 3]' index and ['+', <<"b">>, '+', <<"d">>] wildcard topic %% returns {[2, 3], {[<<"b">>, '_'], ['_', <<"d">>]}} pattern. --spec condition(index(), emqx_types:words()) -> match_pattern_part(). +-spec condition(index(), emqx_types:words()) -> {match_pattern_part(), boolean()}. condition(Index, Tokens) -> - {Index, condition(Index, Tokens, 1, [], [])}. + {Condition, IsExact} = condition(Index, Tokens, 1, [], []), + {{Index, Condition}, IsExact}. %% @doc Returns a matchspec pattern for a wildcard topic. %% @@ -103,15 +106,17 @@ condition(Index, Tokens) -> -spec condition(emqx_types:words()) -> match_pattern_part(). condition(Tokens) -> Tokens1 = [ - case W =:= '+' of - true -> '_'; + case W of + '+' -> '_'; _ -> W end || W <- Tokens ], case length(Tokens1) > 0 andalso lists:last(Tokens1) =:= '#' of - false -> Tokens1; - _ -> (Tokens1 -- ['#']) ++ '_' + false -> + Tokens1; + _ -> + (Tokens1 -- ['#']) ++ '_' end. %% @doc Restores concrete topic from its index key representation. @@ -162,13 +167,13 @@ select_index(Tokens, [Index | Indices], MaxScore, SelectedIndex) -> end. condition([_NIndex | _OtherIndex], ['#' | _OtherTokens], _N, IndexMatch, OtherMatch) -> - {lists:reverse(IndexMatch) ++ '_', lists:reverse(OtherMatch) ++ '_'}; + {{lists:reverse(IndexMatch) ++ '_', lists:reverse(OtherMatch) ++ '_'}, false}; condition([], ['#' | _OtherTokens], _N, IndexMatch, OtherMatch) -> - {lists:reverse(IndexMatch), lists:reverse(OtherMatch) ++ '_'}; + {{lists:reverse(IndexMatch), lists:reverse(OtherMatch) ++ '_'}, true}; condition([], Tokens, _N, IndexMatch, OtherMatch) -> - {lists:reverse(IndexMatch), lists:reverse(OtherMatch) ++ condition(Tokens)}; + {{lists:reverse(IndexMatch), lists:reverse(OtherMatch) ++ condition(Tokens)}, true}; condition([_NIndex | _OtherIndex], [], _N, IndexMatch, OtherMatch) -> - {lists:reverse(IndexMatch) ++ '_', lists:reverse(OtherMatch)}; + {{lists:reverse(IndexMatch), lists:reverse(OtherMatch)}, true}; condition([NIndex | OtherIndex], ['+' | OtherTokens], N, IndexMatch, OtherMatch) when NIndex =:= N -> diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 73c86fe04..c0baf2351 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -330,15 +330,15 @@ search_table(Tokens, Now) -> search_table(undefined, Tokens, Now) -> Ms = make_message_match_spec(Tokens, Now), ets:table(?TAB_MESSAGE, [{traverse, {select, Ms}}]); -search_table(Index, Tokens, Now) -> - Ms = make_index_match_spec(Index, Tokens, Now), +search_table(Index, FilterTokens, Now) -> + {Ms, IsExactMs} = make_index_match_spec(Index, FilterTokens, Now), Topics = [ emqx_retainer_index:restore_topic(Key) || #retained_index{key = Key} <- ets:select(?TAB_INDEX, Ms) ], RetainedMsgQH = qlc:q([ ets:lookup(?TAB_MESSAGE, TopicTokens) - || TopicTokens <- Topics + || TopicTokens <- Topics, match(IsExactMs, TopicTokens, FilterTokens) ]), qlc:q([ RetainedMsg @@ -350,6 +350,9 @@ search_table(Index, Tokens, Now) -> (ExpiryTime == 0) or (ExpiryTime > Now) ]). +match(_IsExactMs = true, _TopicTokens, _FilterTokens) -> true; +match(_IsExactMs = false, TopicTokens, FilterTokens) -> emqx_topic:match(TopicTokens, FilterTokens). + clear_batch(Indices, QC) -> {Result, Rows} = qlc_next_answers(QC, ?CLEAR_BATCH_SIZE), lists:foreach( @@ -423,9 +426,9 @@ make_message_match_spec(Tokens, NowMs) -> [{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}]. make_index_match_spec(Index, Tokens, NowMs) -> - Cond = emqx_retainer_index:condition(Index, Tokens), + {Cond, IsExact} = emqx_retainer_index:condition(Index, Tokens), MsHd = #retained_index{key = Cond, expiry_time = '$3'}, - [{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}]. + {[{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}], IsExact}. is_table_full() -> Limit = emqx:get_config([retainer, backend, max_retained_messages]), diff --git a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl index fb4dea16d..05e4199b7 100644 --- a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl @@ -22,8 +22,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). - --define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). +-include_lib("emqx/include/asserts.hrl"). -import(emqx_mgmt_api_test_util, [ request_api/2, request_api/4, request_api/5, api_path/1, auth_header_/0 @@ -33,25 +32,38 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - application:load(emqx_conf), - ok = ekka:start(), - ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), - emqx_retainer_SUITE:load_conf(), - emqx_mgmt_api_test_util:init_suite([emqx_retainer, emqx_conf]), + Apps = emqx_cth_suite:start( + [ + emqx_conf, + emqx, + emqx_retainer, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ], + #{ + work_dir => emqx_cth_suite:work_dir(Config) + } + ), + _ = emqx_common_test_http:create_default_app(), %% make sure no "$SYS/#" topics - emqx_conf:update([sys_topics], raw_systopic_conf(), #{override_to => cluster}), - Config. + _ = emqx_conf:update([sys_topics], raw_systopic_conf(), #{override_to => cluster}), + [{apps, Apps} | Config]. end_per_suite(Config) -> - ekka:stop(), - mria:stop(), - mria_mnesia:delete_schema(), - emqx_mgmt_api_test_util:end_suite([emqx_retainer, emqx_conf]), - Config. + emqx_common_test_http:delete_default_app(), + emqx_cth_suite:stop(?config(apps, Config)). init_per_testcase(_, Config) -> - {ok, _} = emqx_cluster_rpc:start_link(), - Config. + snabbkaffe:start_trace(), + emqx_retainer:clean(), + {ok, C} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C), + [{client, C} | Config]. + +end_per_testcase(_, Config) -> + ok = emqtt:disconnect(?config(client, Config)), + snabbkaffe:stop(), + ok. %%------------------------------------------------------------------------------ %% Test Cases @@ -65,7 +77,6 @@ t_config(_Config) -> #{ backend := _, enable := _, - flow_control := _, max_payload_size := _, msg_clear_interval := _, msg_expiry_interval := _ @@ -90,28 +101,22 @@ t_config(_Config) -> UpdateConf(false), UpdateConf(true). -t_messages(_) -> - {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), - {ok, _} = emqtt:connect(C1), - emqx_retainer:clean(), +t_messages1(Config) -> + C = ?config(client, Config), Each = fun(I) -> emqtt:publish( - C1, + C, <<"retained/", (I + 60)>>, <<"retained">>, [{qos, 0}, {retain, true}] ) end, - ?check_trace( - {ok, {ok, _}} = - ?wait_async_action( - lists:foreach(Each, lists:seq(1, 5)), - #{?snk_kind := message_retained, topic := <<"retained/A">>}, - 500 - ), - [] + ?assertWaitEvent( + lists:foreach(Each, lists:seq(1, 5)), + #{?snk_kind := message_retained, topic := <<"retained/A">>}, + 500 ), {ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages"])), @@ -133,32 +138,120 @@ t_messages(_) -> from_username := _ }, First + ). + +t_messages2(Config) -> + C = ?config(client, Config), + + ok = lists:foreach( + fun(Topic) -> + ?assertWaitEvent( + emqtt:publish(C, Topic, <<"retained">>, [{qos, 0}, {retain, true}]), + #{?snk_kind := message_retained, topic := Topic}, + 500 + ) + end, + [<<"c">>, <<"c/1">>, <<"c/1/1">>] ), - ok = emqtt:disconnect(C1). + {ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages?topic=c"])), -t_messages_page(_) -> - {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), - {ok, _} = emqtt:connect(C1), - emqx_retainer:clean(), + #{data := Msgs, meta := _} = decode_json(MsgsJson), + + ?assertEqual(1, length(Msgs)). + +t_message1(Config) -> + C = ?config(client, Config), + + ?assertWaitEvent( + emqtt:publish(C, <<"c/1">>, <<"retained">>, [{qos, 0}, {retain, true}]), + #{?snk_kind := message_retained, topic := <<"c/1">>}, + 500 + ), + + ?assertMatch( + {error, {_, 404, _}}, + request_api( + get, + api_path(["mqtt", "retainer", "message", "c"]) + ) + ), + + {ok, Json} = + request_api( + get, + api_path(["mqtt", "retainer", "message", "c%2F1"]) + ), + + ?assertMatch( + #{ + topic := <<"c/1">>, + payload := <<"cmV0YWluZWQ=">> + }, + decode_json(Json) + ). + +t_message2(Config) -> + C = ?config(client, Config), + + ?assertWaitEvent( + emqtt:publish(C, <<"c">>, <<"retained">>, [{qos, 0}, {retain, true}]), + #{?snk_kind := message_retained, topic := <<"c">>}, + 500 + ), + + ?assertMatch( + {error, {_, 404, _}}, + request_api( + get, + api_path(["mqtt", "retainer", "message", "c%2F%2B"]) + ) + ), + + {ok, Json0} = + request_api( + get, + api_path(["mqtt", "retainer", "message", "c"]) + ), + + ?assertMatch( + #{ + topic := <<"c">>, + payload := <<"cmV0YWluZWQ=">> + }, + decode_json(Json0) + ), + + {ok, Json1} = + request_api( + get, + api_path(["mqtt", "retainer", "message", "c%2F%23"]) + ), + + ?assertMatch( + #{ + topic := <<"c">>, + payload := <<"cmV0YWluZWQ=">> + }, + decode_json(Json1) + ). + +t_messages_page(Config) -> + C = ?config(client, Config), Each = fun(I) -> emqtt:publish( - C1, + C, <<"retained/", (I + 60)>>, <<"retained">>, [{qos, 0}, {retain, true}] ) end, - ?check_trace( - {ok, {ok, _}} = - ?wait_async_action( - lists:foreach(Each, lists:seq(1, 5)), - #{?snk_kind := message_retained, topic := <<"retained/A">>}, - 500 - ), - [] + ?assertWaitEvent( + lists:foreach(Each, lists:seq(1, 5)), + #{?snk_kind := message_retained, topic := <<"retained/A">>}, + 500 ), Page = 4, @@ -187,17 +280,13 @@ t_messages_page(_) -> from_username := _ }, OnlyOne - ), + ). - ok = emqtt:disconnect(C1). - -t_lookup_and_delete(_) -> - {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), - {ok, _} = emqtt:connect(C1), - emqx_retainer:clean(), +t_lookup_and_delete(Config) -> + C = ?config(client, Config), timer:sleep(300), - emqtt:publish(C1, <<"retained/api">>, <<"retained">>, [{qos, 0}, {retain, true}]), + emqtt:publish(C, <<"retained/api">>, <<"retained">>, [{qos, 0}, {retain, true}]), timer:sleep(300), API = api_path(["mqtt", "retainer", "message", "retained%2Fapi"]), @@ -220,9 +309,7 @@ t_lookup_and_delete(_) -> {ok, []} = request_api(delete, API), {error, {"HTTP/1.1", 404, "Not Found"}} = request_api(get, API), - {error, {"HTTP/1.1", 404, "Not Found"}} = request_api(delete, API), - - ok = emqtt:disconnect(C1). + {error, {"HTTP/1.1", 404, "Not Found"}} = request_api(delete, API). t_change_storage_type(_Config) -> Path = api_path(["mqtt", "retainer"]), @@ -310,14 +397,12 @@ t_change_storage_type(_Config) -> ok. -t_match_and_clean(_) -> - {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), - {ok, _} = emqtt:connect(C1), - emqx_retainer:clean(), +t_match_and_clean(Config) -> + C = ?config(client, Config), timer:sleep(300), _ = [ - emqtt:publish(C1, <

>, <<"retained">>, [{qos, 0}, {retain, true}]) + emqtt:publish(C, <

>, <<"retained">>, [{qos, 0}, {retain, true}]) || P <- [<<"t">>, <<"f">>], S <- [<<"1">>, <<"2">>, <<"3">>] ], @@ -337,20 +422,16 @@ t_match_and_clean(_) -> {ok, []} = request_api(delete, CleanAPI), {ok, LookupJson2} = request_api(get, API), - ?assertMatch(#{data := []}, decode_json(LookupJson2)), - - ok = emqtt:disconnect(C1). - -%%-------------------------------------------------------------------- -%% HTTP Request -%%-------------------------------------------------------------------- -decode_json(Data) -> - BinJson = emqx_utils_json:decode(Data, [return_maps]), - emqx_utils_maps:unsafe_atom_key_map(BinJson). + ?assertMatch(#{data := []}, decode_json(LookupJson2)). %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- + +decode_json(Data) -> + BinJson = emqx_utils_json:decode(Data, [return_maps]), + emqx_utils_maps:unsafe_atom_key_map(BinJson). + raw_systopic_conf() -> #{ <<"sys_event_messages">> => diff --git a/apps/emqx_retainer/test/emqx_retainer_index_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_index_SUITE.erl index e044f2f5f..afa7c2c2f 100644 --- a/apps/emqx_retainer/test/emqx_retainer_index_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_index_SUITE.erl @@ -57,6 +57,30 @@ t_to_index_key(_Config) -> [1, 4], [<<"a">>, <<"b">>, <<"c">>] ) + ), + + ?assertEqual( + {[1, 2, 3], {[<<"a">>], []}}, + emqx_retainer_index:to_index_key( + [1, 2, 3], + [<<"a">>] + ) + ), + + ?assertEqual( + {[3, 5], {[<<"b">>], [<<"x">>, <<"a">>, <<"y">>]}}, + emqx_retainer_index:to_index_key( + [3, 5], + [<<"x">>, <<"a">>, <<"b">>, <<"y">>] + ) + ), + + ?assertEqual( + {[3, 5], {[<<"b">>, <<"z">>], [<<"x">>, <<"a">>, <<"y">>]}}, + emqx_retainer_index:to_index_key( + [3, 5], + [<<"x">>, <<"a">>, <<"b">>, <<"y">>, <<"z">>] + ) ). t_index_score(_Config) -> @@ -148,7 +172,7 @@ t_condition(_Config) -> t_condition_index(_Config) -> ?assertEqual( - {[2, 3], {[<<"a">>, <<"b">>], ['_', '_']}}, + {{[2, 3], {[<<"a">>, <<"b">>], ['_', '_']}}, true}, emqx_retainer_index:condition( [2, 3], ['+', <<"a">>, <<"b">>, '+'] @@ -156,7 +180,7 @@ t_condition_index(_Config) -> ), ?assertEqual( - {[3, 4], {[<<"b">>, '_'], ['_', <<"a">>]}}, + {{[3, 4], {[<<"b">>, '_'], ['_', <<"a">>]}}, true}, emqx_retainer_index:condition( [3, 4], ['+', <<"a">>, <<"b">>, '+'] @@ -164,7 +188,7 @@ t_condition_index(_Config) -> ), ?assertEqual( - {[3, 5], {[<<"b">> | '_'], ['_', <<"a">>, '_']}}, + {{[3, 5], {[<<"b">>], ['_', <<"a">>, '_']}}, true}, emqx_retainer_index:condition( [3, 5], ['+', <<"a">>, <<"b">>, '+'] @@ -172,7 +196,7 @@ t_condition_index(_Config) -> ), ?assertEqual( - {[3, 5], {[<<"b">> | '_'], ['_', <<"a">> | '_']}}, + {{[3, 5], {[<<"b">> | '_'], ['_', <<"a">> | '_']}}, false}, emqx_retainer_index:condition( [3, 5], ['+', <<"a">>, <<"b">>, '#'] @@ -180,7 +204,7 @@ t_condition_index(_Config) -> ), ?assertEqual( - {[3, 4], {[<<"b">> | '_'], ['_', <<"a">> | '_']}}, + {{[3, 4], {[<<"b">> | '_'], ['_', <<"a">> | '_']}}, false}, emqx_retainer_index:condition( [3, 4], ['+', <<"a">>, <<"b">>, '#'] @@ -188,7 +212,7 @@ t_condition_index(_Config) -> ), ?assertEqual( - {[1], {[<<"a">>], '_'}}, + {{[1], {[<<"a">>], '_'}}, true}, emqx_retainer_index:condition( [1], [<<"a">>, '#'] @@ -196,13 +220,39 @@ t_condition_index(_Config) -> ), ?assertEqual( - {[1, 2, 3], {['', <<"saya">>, '_'], []}}, + {{[1, 2, 3], {['', <<"saya">>, '_'], []}}, true}, emqx_retainer_index:condition( [1, 2, 3], ['', <<"saya">>, '+'] ) + ), + + ?assertEqual( + {{[1, 2, 3], {[<<"c">>], []}}, true}, + emqx_retainer_index:condition( + [1, 2, 3], + [<<"c">>] + ) + ), + + ?assertEqual( + {{[1, 2, 3], {[<<"c">> | '_'], '_'}}, false}, + emqx_retainer_index:condition( + [1, 2, 3], + [<<"c">>, '#'] + ) + ), + + ?assertEqual( + {{[1], {['_'], '_'}}, true}, + emqx_retainer_index:condition( + [1], + ['+', '#'] + ) ). +% {[2],[[<<48>>,<<48>>]],['+','+','#']} + t_restore_topic(_Config) -> ?assertEqual( [<<"x">>, <<"a">>, <<"b">>, <<"y">>], @@ -223,4 +273,11 @@ t_restore_topic(_Config) -> emqx_retainer_index:restore_topic( {[3, 5], {[<<"b">>], [<<"x">>, <<"a">>, <<"y">>]}} ) + ), + + ?assertEqual( + [<<"a">>], + emqx_retainer_index:restore_topic( + {[1, 2, 3], {[<<"a">>], []}} + ) ). diff --git a/apps/emqx_retainer/test/props/prop_emqx_retainer_index.erl b/apps/emqx_retainer/test/props/prop_emqx_retainer_index.erl new file mode 100644 index 000000000..8b9451457 --- /dev/null +++ b/apps/emqx_retainer/test/props/prop_emqx_retainer_index.erl @@ -0,0 +1,106 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(prop_emqx_retainer_index). + +-include_lib("proper/include/proper.hrl"). + +-define(CHARS, 6). +-define(MAX_TOPIC_LEN, 12). +-define(MAX_INDEX_LEN, 4). +-define(MAX_FILTER_LEN, 6). + +%%-------------------------------------------------------------------- +%% Properties +%%-------------------------------------------------------------------- + +prop_index() -> + ?FORALL( + {Index, Topics0, Filter}, + {index_t(), list(topic_t()), filter_t()}, + begin + Topics = lists:usort(Topics0), + + MatchedTopicsDirectly = lists:filter( + fun(Topic) -> + emqx_topic:match(Topic, Filter) + end, + Topics + ), + + Tab = ets:new(?MODULE, [set]), + ok = lists:foreach( + fun(Topic) -> + Key = emqx_retainer_index:to_index_key(Index, Topic), + ets:insert(Tab, {Key, true}) + end, + Topics + ), + + {IndexMs, IsExact} = emqx_retainer_index:condition(Index, Filter), + Ms = [{{IndexMs, '_'}, [], ['$_']}], + MatchedTopixByIndex0 = [ + emqx_retainer_index:restore_topic(Key) + || {Key, _} <- ets:select(Tab, Ms) + ], + MatchedTopixByIndex = + case IsExact of + true -> + MatchedTopixByIndex0; + false -> + lists:filter( + fun(Topic) -> + emqx_topic:match(Topic, Filter) + end, + MatchedTopixByIndex0 + ) + end, + + lists:sort(MatchedTopicsDirectly) =:= lists:sort(MatchedTopixByIndex) + end + ). + +index_t() -> + ?LET( + {Ints, Len}, + {non_empty(list(integer(1, ?MAX_TOPIC_LEN))), integer(1, ?MAX_INDEX_LEN)}, + lists:usort(lists:sublist(Ints, Len)) + ). + +topic_t() -> + ?LET( + {Topic, Len}, + {non_empty(list(topic_segment_t())), integer(1, ?MAX_TOPIC_LEN)}, + lists:sublist(Topic, Len) + ). + +filter_t() -> + ?LET( + {TopicFilter, Len, MLWildcard}, + { + non_empty(list(oneof([topic_segment_t(), '+']))), + integer(1, ?MAX_FILTER_LEN), + oneof([[], ['#']]) + }, + lists:sublist(TopicFilter, Len) ++ MLWildcard + ). + +topic_segment_t() -> + ?LET( + I, + integer(0, ?CHARS - 1), + <<($0 + I)>> + ). diff --git a/changes/ce/fix-12303.en.md b/changes/ce/fix-12303.en.md new file mode 100644 index 000000000..ddbb7ca0e --- /dev/null +++ b/changes/ce/fix-12303.en.md @@ -0,0 +1 @@ +Fix message indexing in retainer. Previously, clients with wildcard subscriptions could receive excess retained messages, not belonging to the topics matching the subscription.