diff --git a/apps/emqx_retainer/src/emqx_retainer_index.erl b/apps/emqx_retainer/src/emqx_retainer_index.erl
index ec49e7a65..3b4f1e99f 100644
--- a/apps/emqx_retainer/src/emqx_retainer_index.erl
+++ b/apps/emqx_retainer/src/emqx_retainer_index.erl
@@ -88,13 +88,16 @@ select_index(Tokens, Indices) ->
select_index(Tokens, Indices, 0, undefined).
%% @doc For an index and a wildcard topic
-%% returns a matchspec pattern for the corresponding index key.
+%% returns a tuple of:
+%% * matchspec pattern for the corresponding index key
+%% * boolean flag indicating whether the pattern is exact
%%
%% E.g. for `[2, 3]' index and ['+', <<"b">>, '+', <<"d">>]
wildcard topic
%% returns {[2, 3], {[<<"b">>, '_'], ['_', <<"d">>]}}
pattern.
--spec condition(index(), emqx_types:words()) -> match_pattern_part().
+-spec condition(index(), emqx_types:words()) -> {match_pattern_part(), boolean()}.
condition(Index, Tokens) ->
- {Index, condition(Index, Tokens, 1, [], [])}.
+ {Condition, IsExact} = condition(Index, Tokens, 1, [], []),
+ {{Index, Condition}, IsExact}.
%% @doc Returns a matchspec pattern for a wildcard topic.
%%
@@ -103,15 +106,17 @@ condition(Index, Tokens) ->
-spec condition(emqx_types:words()) -> match_pattern_part().
condition(Tokens) ->
Tokens1 = [
- case W =:= '+' of
- true -> '_';
+ case W of
+ '+' -> '_';
_ -> W
end
|| W <- Tokens
],
case length(Tokens1) > 0 andalso lists:last(Tokens1) =:= '#' of
- false -> Tokens1;
- _ -> (Tokens1 -- ['#']) ++ '_'
+ false ->
+ Tokens1;
+ _ ->
+ (Tokens1 -- ['#']) ++ '_'
end.
%% @doc Restores concrete topic from its index key representation.
@@ -162,13 +167,13 @@ select_index(Tokens, [Index | Indices], MaxScore, SelectedIndex) ->
end.
condition([_NIndex | _OtherIndex], ['#' | _OtherTokens], _N, IndexMatch, OtherMatch) ->
- {lists:reverse(IndexMatch) ++ '_', lists:reverse(OtherMatch) ++ '_'};
+ {{lists:reverse(IndexMatch) ++ '_', lists:reverse(OtherMatch) ++ '_'}, false};
condition([], ['#' | _OtherTokens], _N, IndexMatch, OtherMatch) ->
- {lists:reverse(IndexMatch), lists:reverse(OtherMatch) ++ '_'};
+ {{lists:reverse(IndexMatch), lists:reverse(OtherMatch) ++ '_'}, true};
condition([], Tokens, _N, IndexMatch, OtherMatch) ->
- {lists:reverse(IndexMatch), lists:reverse(OtherMatch) ++ condition(Tokens)};
+ {{lists:reverse(IndexMatch), lists:reverse(OtherMatch) ++ condition(Tokens)}, true};
condition([_NIndex | _OtherIndex], [], _N, IndexMatch, OtherMatch) ->
- {lists:reverse(IndexMatch) ++ '_', lists:reverse(OtherMatch)};
+ {{lists:reverse(IndexMatch), lists:reverse(OtherMatch)}, true};
condition([NIndex | OtherIndex], ['+' | OtherTokens], N, IndexMatch, OtherMatch) when
NIndex =:= N
->
diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl
index 73c86fe04..c0baf2351 100644
--- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl
+++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl
@@ -330,15 +330,15 @@ search_table(Tokens, Now) ->
search_table(undefined, Tokens, Now) ->
Ms = make_message_match_spec(Tokens, Now),
ets:table(?TAB_MESSAGE, [{traverse, {select, Ms}}]);
-search_table(Index, Tokens, Now) ->
- Ms = make_index_match_spec(Index, Tokens, Now),
+search_table(Index, FilterTokens, Now) ->
+ {Ms, IsExactMs} = make_index_match_spec(Index, FilterTokens, Now),
Topics = [
emqx_retainer_index:restore_topic(Key)
|| #retained_index{key = Key} <- ets:select(?TAB_INDEX, Ms)
],
RetainedMsgQH = qlc:q([
ets:lookup(?TAB_MESSAGE, TopicTokens)
- || TopicTokens <- Topics
+ || TopicTokens <- Topics, match(IsExactMs, TopicTokens, FilterTokens)
]),
qlc:q([
RetainedMsg
@@ -350,6 +350,9 @@ search_table(Index, Tokens, Now) ->
(ExpiryTime == 0) or (ExpiryTime > Now)
]).
+match(_IsExactMs = true, _TopicTokens, _FilterTokens) -> true;
+match(_IsExactMs = false, TopicTokens, FilterTokens) -> emqx_topic:match(TopicTokens, FilterTokens).
+
clear_batch(Indices, QC) ->
{Result, Rows} = qlc_next_answers(QC, ?CLEAR_BATCH_SIZE),
lists:foreach(
@@ -423,9 +426,9 @@ make_message_match_spec(Tokens, NowMs) ->
[{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}].
make_index_match_spec(Index, Tokens, NowMs) ->
- Cond = emqx_retainer_index:condition(Index, Tokens),
+ {Cond, IsExact} = emqx_retainer_index:condition(Index, Tokens),
MsHd = #retained_index{key = Cond, expiry_time = '$3'},
- [{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}].
+ {[{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}], IsExact}.
is_table_full() ->
Limit = emqx:get_config([retainer, backend, max_retained_messages]),
diff --git a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl
index fb4dea16d..05e4199b7 100644
--- a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl
+++ b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl
@@ -22,8 +22,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-
--define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
+-include_lib("emqx/include/asserts.hrl").
-import(emqx_mgmt_api_test_util, [
request_api/2, request_api/4, request_api/5, api_path/1, auth_header_/0
@@ -33,25 +32,38 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
- application:load(emqx_conf),
- ok = ekka:start(),
- ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
- emqx_retainer_SUITE:load_conf(),
- emqx_mgmt_api_test_util:init_suite([emqx_retainer, emqx_conf]),
+ Apps = emqx_cth_suite:start(
+ [
+ emqx_conf,
+ emqx,
+ emqx_retainer,
+ emqx_management,
+ {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
+ ],
+ #{
+ work_dir => emqx_cth_suite:work_dir(Config)
+ }
+ ),
+ _ = emqx_common_test_http:create_default_app(),
%% make sure no "$SYS/#" topics
- emqx_conf:update([sys_topics], raw_systopic_conf(), #{override_to => cluster}),
- Config.
+ _ = emqx_conf:update([sys_topics], raw_systopic_conf(), #{override_to => cluster}),
+ [{apps, Apps} | Config].
end_per_suite(Config) ->
- ekka:stop(),
- mria:stop(),
- mria_mnesia:delete_schema(),
- emqx_mgmt_api_test_util:end_suite([emqx_retainer, emqx_conf]),
- Config.
+ emqx_common_test_http:delete_default_app(),
+ emqx_cth_suite:stop(?config(apps, Config)).
init_per_testcase(_, Config) ->
- {ok, _} = emqx_cluster_rpc:start_link(),
- Config.
+ snabbkaffe:start_trace(),
+ emqx_retainer:clean(),
+ {ok, C} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+ {ok, _} = emqtt:connect(C),
+ [{client, C} | Config].
+
+end_per_testcase(_, Config) ->
+ ok = emqtt:disconnect(?config(client, Config)),
+ snabbkaffe:stop(),
+ ok.
%%------------------------------------------------------------------------------
%% Test Cases
@@ -65,7 +77,6 @@ t_config(_Config) ->
#{
backend := _,
enable := _,
- flow_control := _,
max_payload_size := _,
msg_clear_interval := _,
msg_expiry_interval := _
@@ -90,28 +101,22 @@ t_config(_Config) ->
UpdateConf(false),
UpdateConf(true).
-t_messages(_) ->
- {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
- {ok, _} = emqtt:connect(C1),
- emqx_retainer:clean(),
+t_messages1(Config) ->
+ C = ?config(client, Config),
Each = fun(I) ->
emqtt:publish(
- C1,
+ C,
<<"retained/", (I + 60)>>,
<<"retained">>,
[{qos, 0}, {retain, true}]
)
end,
- ?check_trace(
- {ok, {ok, _}} =
- ?wait_async_action(
- lists:foreach(Each, lists:seq(1, 5)),
- #{?snk_kind := message_retained, topic := <<"retained/A">>},
- 500
- ),
- []
+ ?assertWaitEvent(
+ lists:foreach(Each, lists:seq(1, 5)),
+ #{?snk_kind := message_retained, topic := <<"retained/A">>},
+ 500
),
{ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
@@ -133,32 +138,120 @@ t_messages(_) ->
from_username := _
},
First
+ ).
+
+t_messages2(Config) ->
+ C = ?config(client, Config),
+
+ ok = lists:foreach(
+ fun(Topic) ->
+ ?assertWaitEvent(
+ emqtt:publish(C, Topic, <<"retained">>, [{qos, 0}, {retain, true}]),
+ #{?snk_kind := message_retained, topic := Topic},
+ 500
+ )
+ end,
+ [<<"c">>, <<"c/1">>, <<"c/1/1">>]
),
- ok = emqtt:disconnect(C1).
+ {ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages?topic=c"])),
-t_messages_page(_) ->
- {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
- {ok, _} = emqtt:connect(C1),
- emqx_retainer:clean(),
+ #{data := Msgs, meta := _} = decode_json(MsgsJson),
+
+ ?assertEqual(1, length(Msgs)).
+
+t_message1(Config) ->
+ C = ?config(client, Config),
+
+ ?assertWaitEvent(
+ emqtt:publish(C, <<"c/1">>, <<"retained">>, [{qos, 0}, {retain, true}]),
+ #{?snk_kind := message_retained, topic := <<"c/1">>},
+ 500
+ ),
+
+ ?assertMatch(
+ {error, {_, 404, _}},
+ request_api(
+ get,
+ api_path(["mqtt", "retainer", "message", "c"])
+ )
+ ),
+
+ {ok, Json} =
+ request_api(
+ get,
+ api_path(["mqtt", "retainer", "message", "c%2F1"])
+ ),
+
+ ?assertMatch(
+ #{
+ topic := <<"c/1">>,
+ payload := <<"cmV0YWluZWQ=">>
+ },
+ decode_json(Json)
+ ).
+
+t_message2(Config) ->
+ C = ?config(client, Config),
+
+ ?assertWaitEvent(
+ emqtt:publish(C, <<"c">>, <<"retained">>, [{qos, 0}, {retain, true}]),
+ #{?snk_kind := message_retained, topic := <<"c">>},
+ 500
+ ),
+
+ ?assertMatch(
+ {error, {_, 404, _}},
+ request_api(
+ get,
+ api_path(["mqtt", "retainer", "message", "c%2F%2B"])
+ )
+ ),
+
+ {ok, Json0} =
+ request_api(
+ get,
+ api_path(["mqtt", "retainer", "message", "c"])
+ ),
+
+ ?assertMatch(
+ #{
+ topic := <<"c">>,
+ payload := <<"cmV0YWluZWQ=">>
+ },
+ decode_json(Json0)
+ ),
+
+ {ok, Json1} =
+ request_api(
+ get,
+ api_path(["mqtt", "retainer", "message", "c%2F%23"])
+ ),
+
+ ?assertMatch(
+ #{
+ topic := <<"c">>,
+ payload := <<"cmV0YWluZWQ=">>
+ },
+ decode_json(Json1)
+ ).
+
+t_messages_page(Config) ->
+ C = ?config(client, Config),
Each = fun(I) ->
emqtt:publish(
- C1,
+ C,
<<"retained/", (I + 60)>>,
<<"retained">>,
[{qos, 0}, {retain, true}]
)
end,
- ?check_trace(
- {ok, {ok, _}} =
- ?wait_async_action(
- lists:foreach(Each, lists:seq(1, 5)),
- #{?snk_kind := message_retained, topic := <<"retained/A">>},
- 500
- ),
- []
+ ?assertWaitEvent(
+ lists:foreach(Each, lists:seq(1, 5)),
+ #{?snk_kind := message_retained, topic := <<"retained/A">>},
+ 500
),
Page = 4,
@@ -187,17 +280,13 @@ t_messages_page(_) ->
from_username := _
},
OnlyOne
- ),
+ ).
- ok = emqtt:disconnect(C1).
-
-t_lookup_and_delete(_) ->
- {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
- {ok, _} = emqtt:connect(C1),
- emqx_retainer:clean(),
+t_lookup_and_delete(Config) ->
+ C = ?config(client, Config),
timer:sleep(300),
- emqtt:publish(C1, <<"retained/api">>, <<"retained">>, [{qos, 0}, {retain, true}]),
+ emqtt:publish(C, <<"retained/api">>, <<"retained">>, [{qos, 0}, {retain, true}]),
timer:sleep(300),
API = api_path(["mqtt", "retainer", "message", "retained%2Fapi"]),
@@ -220,9 +309,7 @@ t_lookup_and_delete(_) ->
{ok, []} = request_api(delete, API),
{error, {"HTTP/1.1", 404, "Not Found"}} = request_api(get, API),
- {error, {"HTTP/1.1", 404, "Not Found"}} = request_api(delete, API),
-
- ok = emqtt:disconnect(C1).
+ {error, {"HTTP/1.1", 404, "Not Found"}} = request_api(delete, API).
t_change_storage_type(_Config) ->
Path = api_path(["mqtt", "retainer"]),
@@ -310,14 +397,12 @@ t_change_storage_type(_Config) ->
ok.
-t_match_and_clean(_) ->
- {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
- {ok, _} = emqtt:connect(C1),
- emqx_retainer:clean(),
+t_match_and_clean(Config) ->
+ C = ?config(client, Config),
timer:sleep(300),
_ = [
- emqtt:publish(C1, <
>, <<"retained">>, [{qos, 0}, {retain, true}]) + emqtt:publish(C, <
>, <<"retained">>, [{qos, 0}, {retain, true}]) || P <- [<<"t">>, <<"f">>], S <- [<<"1">>, <<"2">>, <<"3">>] ], @@ -337,20 +422,16 @@ t_match_and_clean(_) -> {ok, []} = request_api(delete, CleanAPI), {ok, LookupJson2} = request_api(get, API), - ?assertMatch(#{data := []}, decode_json(LookupJson2)), - - ok = emqtt:disconnect(C1). - -%%-------------------------------------------------------------------- -%% HTTP Request -%%-------------------------------------------------------------------- -decode_json(Data) -> - BinJson = emqx_utils_json:decode(Data, [return_maps]), - emqx_utils_maps:unsafe_atom_key_map(BinJson). + ?assertMatch(#{data := []}, decode_json(LookupJson2)). %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- + +decode_json(Data) -> + BinJson = emqx_utils_json:decode(Data, [return_maps]), + emqx_utils_maps:unsafe_atom_key_map(BinJson). + raw_systopic_conf() -> #{ <<"sys_event_messages">> => diff --git a/apps/emqx_retainer/test/emqx_retainer_index_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_index_SUITE.erl index e044f2f5f..afa7c2c2f 100644 --- a/apps/emqx_retainer/test/emqx_retainer_index_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_index_SUITE.erl @@ -57,6 +57,30 @@ t_to_index_key(_Config) -> [1, 4], [<<"a">>, <<"b">>, <<"c">>] ) + ), + + ?assertEqual( + {[1, 2, 3], {[<<"a">>], []}}, + emqx_retainer_index:to_index_key( + [1, 2, 3], + [<<"a">>] + ) + ), + + ?assertEqual( + {[3, 5], {[<<"b">>], [<<"x">>, <<"a">>, <<"y">>]}}, + emqx_retainer_index:to_index_key( + [3, 5], + [<<"x">>, <<"a">>, <<"b">>, <<"y">>] + ) + ), + + ?assertEqual( + {[3, 5], {[<<"b">>, <<"z">>], [<<"x">>, <<"a">>, <<"y">>]}}, + emqx_retainer_index:to_index_key( + [3, 5], + [<<"x">>, <<"a">>, <<"b">>, <<"y">>, <<"z">>] + ) ). t_index_score(_Config) -> @@ -148,7 +172,7 @@ t_condition(_Config) -> t_condition_index(_Config) -> ?assertEqual( - {[2, 3], {[<<"a">>, <<"b">>], ['_', '_']}}, + {{[2, 3], {[<<"a">>, <<"b">>], ['_', '_']}}, true}, emqx_retainer_index:condition( [2, 3], ['+', <<"a">>, <<"b">>, '+'] @@ -156,7 +180,7 @@ t_condition_index(_Config) -> ), ?assertEqual( - {[3, 4], {[<<"b">>, '_'], ['_', <<"a">>]}}, + {{[3, 4], {[<<"b">>, '_'], ['_', <<"a">>]}}, true}, emqx_retainer_index:condition( [3, 4], ['+', <<"a">>, <<"b">>, '+'] @@ -164,7 +188,7 @@ t_condition_index(_Config) -> ), ?assertEqual( - {[3, 5], {[<<"b">> | '_'], ['_', <<"a">>, '_']}}, + {{[3, 5], {[<<"b">>], ['_', <<"a">>, '_']}}, true}, emqx_retainer_index:condition( [3, 5], ['+', <<"a">>, <<"b">>, '+'] @@ -172,7 +196,7 @@ t_condition_index(_Config) -> ), ?assertEqual( - {[3, 5], {[<<"b">> | '_'], ['_', <<"a">> | '_']}}, + {{[3, 5], {[<<"b">> | '_'], ['_', <<"a">> | '_']}}, false}, emqx_retainer_index:condition( [3, 5], ['+', <<"a">>, <<"b">>, '#'] @@ -180,7 +204,7 @@ t_condition_index(_Config) -> ), ?assertEqual( - {[3, 4], {[<<"b">> | '_'], ['_', <<"a">> | '_']}}, + {{[3, 4], {[<<"b">> | '_'], ['_', <<"a">> | '_']}}, false}, emqx_retainer_index:condition( [3, 4], ['+', <<"a">>, <<"b">>, '#'] @@ -188,7 +212,7 @@ t_condition_index(_Config) -> ), ?assertEqual( - {[1], {[<<"a">>], '_'}}, + {{[1], {[<<"a">>], '_'}}, true}, emqx_retainer_index:condition( [1], [<<"a">>, '#'] @@ -196,13 +220,39 @@ t_condition_index(_Config) -> ), ?assertEqual( - {[1, 2, 3], {['', <<"saya">>, '_'], []}}, + {{[1, 2, 3], {['', <<"saya">>, '_'], []}}, true}, emqx_retainer_index:condition( [1, 2, 3], ['', <<"saya">>, '+'] ) + ), + + ?assertEqual( + {{[1, 2, 3], {[<<"c">>], []}}, true}, + emqx_retainer_index:condition( + [1, 2, 3], + [<<"c">>] + ) + ), + + ?assertEqual( + {{[1, 2, 3], {[<<"c">> | '_'], '_'}}, false}, + emqx_retainer_index:condition( + [1, 2, 3], + [<<"c">>, '#'] + ) + ), + + ?assertEqual( + {{[1], {['_'], '_'}}, true}, + emqx_retainer_index:condition( + [1], + ['+', '#'] + ) ). +% {[2],[[<<48>>,<<48>>]],['+','+','#']} + t_restore_topic(_Config) -> ?assertEqual( [<<"x">>, <<"a">>, <<"b">>, <<"y">>], @@ -223,4 +273,11 @@ t_restore_topic(_Config) -> emqx_retainer_index:restore_topic( {[3, 5], {[<<"b">>], [<<"x">>, <<"a">>, <<"y">>]}} ) + ), + + ?assertEqual( + [<<"a">>], + emqx_retainer_index:restore_topic( + {[1, 2, 3], {[<<"a">>], []}} + ) ). diff --git a/apps/emqx_retainer/test/props/prop_emqx_retainer_index.erl b/apps/emqx_retainer/test/props/prop_emqx_retainer_index.erl new file mode 100644 index 000000000..8b9451457 --- /dev/null +++ b/apps/emqx_retainer/test/props/prop_emqx_retainer_index.erl @@ -0,0 +1,106 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(prop_emqx_retainer_index). + +-include_lib("proper/include/proper.hrl"). + +-define(CHARS, 6). +-define(MAX_TOPIC_LEN, 12). +-define(MAX_INDEX_LEN, 4). +-define(MAX_FILTER_LEN, 6). + +%%-------------------------------------------------------------------- +%% Properties +%%-------------------------------------------------------------------- + +prop_index() -> + ?FORALL( + {Index, Topics0, Filter}, + {index_t(), list(topic_t()), filter_t()}, + begin + Topics = lists:usort(Topics0), + + MatchedTopicsDirectly = lists:filter( + fun(Topic) -> + emqx_topic:match(Topic, Filter) + end, + Topics + ), + + Tab = ets:new(?MODULE, [set]), + ok = lists:foreach( + fun(Topic) -> + Key = emqx_retainer_index:to_index_key(Index, Topic), + ets:insert(Tab, {Key, true}) + end, + Topics + ), + + {IndexMs, IsExact} = emqx_retainer_index:condition(Index, Filter), + Ms = [{{IndexMs, '_'}, [], ['$_']}], + MatchedTopixByIndex0 = [ + emqx_retainer_index:restore_topic(Key) + || {Key, _} <- ets:select(Tab, Ms) + ], + MatchedTopixByIndex = + case IsExact of + true -> + MatchedTopixByIndex0; + false -> + lists:filter( + fun(Topic) -> + emqx_topic:match(Topic, Filter) + end, + MatchedTopixByIndex0 + ) + end, + + lists:sort(MatchedTopicsDirectly) =:= lists:sort(MatchedTopixByIndex) + end + ). + +index_t() -> + ?LET( + {Ints, Len}, + {non_empty(list(integer(1, ?MAX_TOPIC_LEN))), integer(1, ?MAX_INDEX_LEN)}, + lists:usort(lists:sublist(Ints, Len)) + ). + +topic_t() -> + ?LET( + {Topic, Len}, + {non_empty(list(topic_segment_t())), integer(1, ?MAX_TOPIC_LEN)}, + lists:sublist(Topic, Len) + ). + +filter_t() -> + ?LET( + {TopicFilter, Len, MLWildcard}, + { + non_empty(list(oneof([topic_segment_t(), '+']))), + integer(1, ?MAX_FILTER_LEN), + oneof([[], ['#']]) + }, + lists:sublist(TopicFilter, Len) ++ MLWildcard + ). + +topic_segment_t() -> + ?LET( + I, + integer(0, ?CHARS - 1), + <<($0 + I)>> + ). diff --git a/changes/ce/fix-12303.en.md b/changes/ce/fix-12303.en.md new file mode 100644 index 000000000..ddbb7ca0e --- /dev/null +++ b/changes/ce/fix-12303.en.md @@ -0,0 +1 @@ +Fix message indexing in retainer. Previously, clients with wildcard subscriptions could receive excess retained messages, not belonging to the topics matching the subscription.