fix(retainer): fix topic search by index

This commit is contained in:
Ilya Averyanov 2024-01-11 13:28:21 +03:00
parent 7c8a36fc06
commit 09d524144b
6 changed files with 346 additions and 93 deletions

View File

@ -88,13 +88,16 @@ select_index(Tokens, Indices) ->
select_index(Tokens, Indices, 0, undefined). select_index(Tokens, Indices, 0, undefined).
%% @doc For an index and a wildcard topic %% @doc For an index and a wildcard topic
%% returns a matchspec pattern for the corresponding index key. %% returns a tuple of:
%% * matchspec pattern for the corresponding index key
%% * boolean flag indicating whether the pattern is exact
%% %%
%% E.g. for `[2, 3]' index and <code>['+', <<"b">>, '+', <<"d">>]</code> wildcard topic %% E.g. for `[2, 3]' index and <code>['+', <<"b">>, '+', <<"d">>]</code> wildcard topic
%% returns <code>{[2, 3], {[<<"b">>, '_'], ['_', <<"d">>]}}</code> pattern. %% returns <code>{[2, 3], {[<<"b">>, '_'], ['_', <<"d">>]}}</code> pattern.
-spec condition(index(), emqx_types:words()) -> match_pattern_part(). -spec condition(index(), emqx_types:words()) -> {match_pattern_part(), boolean()}.
condition(Index, Tokens) -> condition(Index, Tokens) ->
{Index, condition(Index, Tokens, 1, [], [])}. {Condition, IsExact} = condition(Index, Tokens, 1, [], []),
{{Index, Condition}, IsExact}.
%% @doc Returns a matchspec pattern for a wildcard topic. %% @doc Returns a matchspec pattern for a wildcard topic.
%% %%
@ -103,15 +106,17 @@ condition(Index, Tokens) ->
-spec condition(emqx_types:words()) -> match_pattern_part(). -spec condition(emqx_types:words()) -> match_pattern_part().
condition(Tokens) -> condition(Tokens) ->
Tokens1 = [ Tokens1 = [
case W =:= '+' of case W of
true -> '_'; '+' -> '_';
_ -> W _ -> W
end end
|| W <- Tokens || W <- Tokens
], ],
case length(Tokens1) > 0 andalso lists:last(Tokens1) =:= '#' of case length(Tokens1) > 0 andalso lists:last(Tokens1) =:= '#' of
false -> Tokens1; false ->
_ -> (Tokens1 -- ['#']) ++ '_' Tokens1;
_ ->
(Tokens1 -- ['#']) ++ '_'
end. end.
%% @doc Restores concrete topic from its index key representation. %% @doc Restores concrete topic from its index key representation.
@ -162,13 +167,13 @@ select_index(Tokens, [Index | Indices], MaxScore, SelectedIndex) ->
end. end.
condition([_NIndex | _OtherIndex], ['#' | _OtherTokens], _N, IndexMatch, OtherMatch) -> condition([_NIndex | _OtherIndex], ['#' | _OtherTokens], _N, IndexMatch, OtherMatch) ->
{lists:reverse(IndexMatch) ++ '_', lists:reverse(OtherMatch) ++ '_'}; {{lists:reverse(IndexMatch) ++ '_', lists:reverse(OtherMatch) ++ '_'}, false};
condition([], ['#' | _OtherTokens], _N, IndexMatch, OtherMatch) -> condition([], ['#' | _OtherTokens], _N, IndexMatch, OtherMatch) ->
{lists:reverse(IndexMatch), lists:reverse(OtherMatch) ++ '_'}; {{lists:reverse(IndexMatch), lists:reverse(OtherMatch) ++ '_'}, true};
condition([], Tokens, _N, IndexMatch, OtherMatch) -> condition([], Tokens, _N, IndexMatch, OtherMatch) ->
{lists:reverse(IndexMatch), lists:reverse(OtherMatch) ++ condition(Tokens)}; {{lists:reverse(IndexMatch), lists:reverse(OtherMatch) ++ condition(Tokens)}, true};
condition([_NIndex | _OtherIndex], [], _N, IndexMatch, OtherMatch) -> condition([_NIndex | _OtherIndex], [], _N, IndexMatch, OtherMatch) ->
{lists:reverse(IndexMatch) ++ '_', lists:reverse(OtherMatch)}; {{lists:reverse(IndexMatch), lists:reverse(OtherMatch)}, true};
condition([NIndex | OtherIndex], ['+' | OtherTokens], N, IndexMatch, OtherMatch) when condition([NIndex | OtherIndex], ['+' | OtherTokens], N, IndexMatch, OtherMatch) when
NIndex =:= N NIndex =:= N
-> ->

View File

@ -330,15 +330,15 @@ search_table(Tokens, Now) ->
search_table(undefined, Tokens, Now) -> search_table(undefined, Tokens, Now) ->
Ms = make_message_match_spec(Tokens, Now), Ms = make_message_match_spec(Tokens, Now),
ets:table(?TAB_MESSAGE, [{traverse, {select, Ms}}]); ets:table(?TAB_MESSAGE, [{traverse, {select, Ms}}]);
search_table(Index, Tokens, Now) -> search_table(Index, FilterTokens, Now) ->
Ms = make_index_match_spec(Index, Tokens, Now), {Ms, IsExactMs} = make_index_match_spec(Index, FilterTokens, Now),
Topics = [ Topics = [
emqx_retainer_index:restore_topic(Key) emqx_retainer_index:restore_topic(Key)
|| #retained_index{key = Key} <- ets:select(?TAB_INDEX, Ms) || #retained_index{key = Key} <- ets:select(?TAB_INDEX, Ms)
], ],
RetainedMsgQH = qlc:q([ RetainedMsgQH = qlc:q([
ets:lookup(?TAB_MESSAGE, TopicTokens) ets:lookup(?TAB_MESSAGE, TopicTokens)
|| TopicTokens <- Topics || TopicTokens <- Topics, match(IsExactMs, TopicTokens, FilterTokens)
]), ]),
qlc:q([ qlc:q([
RetainedMsg RetainedMsg
@ -350,6 +350,9 @@ search_table(Index, Tokens, Now) ->
(ExpiryTime == 0) or (ExpiryTime > Now) (ExpiryTime == 0) or (ExpiryTime > Now)
]). ]).
match(_IsExactMs = true, _TopicTokens, _FilterTokens) -> true;
match(_IsExactMs = false, TopicTokens, FilterTokens) -> emqx_topic:match(TopicTokens, FilterTokens).
clear_batch(Indices, QC) -> clear_batch(Indices, QC) ->
{Result, Rows} = qlc_next_answers(QC, ?CLEAR_BATCH_SIZE), {Result, Rows} = qlc_next_answers(QC, ?CLEAR_BATCH_SIZE),
lists:foreach( lists:foreach(
@ -423,9 +426,9 @@ make_message_match_spec(Tokens, NowMs) ->
[{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}]. [{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}].
make_index_match_spec(Index, Tokens, NowMs) -> make_index_match_spec(Index, Tokens, NowMs) ->
Cond = emqx_retainer_index:condition(Index, Tokens), {Cond, IsExact} = emqx_retainer_index:condition(Index, Tokens),
MsHd = #retained_index{key = Cond, expiry_time = '$3'}, MsHd = #retained_index{key = Cond, expiry_time = '$3'},
[{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}]. {[{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}], IsExact}.
is_table_full() -> is_table_full() ->
Limit = emqx:get_config([retainer, backend, max_retained_messages]), Limit = emqx:get_config([retainer, backend, max_retained_messages]),

View File

@ -22,8 +22,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/asserts.hrl").
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
-import(emqx_mgmt_api_test_util, [ -import(emqx_mgmt_api_test_util, [
request_api/2, request_api/4, request_api/5, api_path/1, auth_header_/0 request_api/2, request_api/4, request_api/5, api_path/1, auth_header_/0
@ -33,25 +32,38 @@ all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
application:load(emqx_conf), Apps = emqx_cth_suite:start(
ok = ekka:start(), [
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), emqx_conf,
emqx_retainer_SUITE:load_conf(), emqx,
emqx_mgmt_api_test_util:init_suite([emqx_retainer, emqx_conf]), emqx_retainer,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{
work_dir => emqx_cth_suite:work_dir(Config)
}
),
_ = emqx_common_test_http:create_default_app(),
%% make sure no "$SYS/#" topics %% make sure no "$SYS/#" topics
emqx_conf:update([sys_topics], raw_systopic_conf(), #{override_to => cluster}), _ = emqx_conf:update([sys_topics], raw_systopic_conf(), #{override_to => cluster}),
Config. [{apps, Apps} | Config].
end_per_suite(Config) -> end_per_suite(Config) ->
ekka:stop(), emqx_common_test_http:delete_default_app(),
mria:stop(), emqx_cth_suite:stop(?config(apps, Config)).
mria_mnesia:delete_schema(),
emqx_mgmt_api_test_util:end_suite([emqx_retainer, emqx_conf]),
Config.
init_per_testcase(_, Config) -> init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(), snabbkaffe:start_trace(),
Config. emqx_retainer:clean(),
{ok, C} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C),
[{client, C} | Config].
end_per_testcase(_, Config) ->
ok = emqtt:disconnect(?config(client, Config)),
snabbkaffe:stop(),
ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Test Cases %% Test Cases
@ -65,7 +77,6 @@ t_config(_Config) ->
#{ #{
backend := _, backend := _,
enable := _, enable := _,
flow_control := _,
max_payload_size := _, max_payload_size := _,
msg_clear_interval := _, msg_clear_interval := _,
msg_expiry_interval := _ msg_expiry_interval := _
@ -90,28 +101,22 @@ t_config(_Config) ->
UpdateConf(false), UpdateConf(false),
UpdateConf(true). UpdateConf(true).
t_messages(_) -> t_messages1(Config) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), C = ?config(client, Config),
{ok, _} = emqtt:connect(C1),
emqx_retainer:clean(),
Each = fun(I) -> Each = fun(I) ->
emqtt:publish( emqtt:publish(
C1, C,
<<"retained/", (I + 60)>>, <<"retained/", (I + 60)>>,
<<"retained">>, <<"retained">>,
[{qos, 0}, {retain, true}] [{qos, 0}, {retain, true}]
) )
end, end,
?check_trace( ?assertWaitEvent(
{ok, {ok, _}} = lists:foreach(Each, lists:seq(1, 5)),
?wait_async_action( #{?snk_kind := message_retained, topic := <<"retained/A">>},
lists:foreach(Each, lists:seq(1, 5)), 500
#{?snk_kind := message_retained, topic := <<"retained/A">>},
500
),
[]
), ),
{ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages"])), {ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
@ -133,32 +138,120 @@ t_messages(_) ->
from_username := _ from_username := _
}, },
First First
).
t_messages2(Config) ->
C = ?config(client, Config),
ok = lists:foreach(
fun(Topic) ->
?assertWaitEvent(
emqtt:publish(C, Topic, <<"retained">>, [{qos, 0}, {retain, true}]),
#{?snk_kind := message_retained, topic := Topic},
500
)
end,
[<<"c">>, <<"c/1">>, <<"c/1/1">>]
), ),
ok = emqtt:disconnect(C1). {ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages?topic=c"])),
t_messages_page(_) -> #{data := Msgs, meta := _} = decode_json(MsgsJson),
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1), ?assertEqual(1, length(Msgs)).
emqx_retainer:clean(),
t_message1(Config) ->
C = ?config(client, Config),
?assertWaitEvent(
emqtt:publish(C, <<"c/1">>, <<"retained">>, [{qos, 0}, {retain, true}]),
#{?snk_kind := message_retained, topic := <<"c/1">>},
500
),
?assertMatch(
{error, {_, 404, _}},
request_api(
get,
api_path(["mqtt", "retainer", "message", "c"])
)
),
{ok, Json} =
request_api(
get,
api_path(["mqtt", "retainer", "message", "c%2F1"])
),
?assertMatch(
#{
topic := <<"c/1">>,
payload := <<"cmV0YWluZWQ=">>
},
decode_json(Json)
).
t_message2(Config) ->
C = ?config(client, Config),
?assertWaitEvent(
emqtt:publish(C, <<"c">>, <<"retained">>, [{qos, 0}, {retain, true}]),
#{?snk_kind := message_retained, topic := <<"c">>},
500
),
?assertMatch(
{error, {_, 404, _}},
request_api(
get,
api_path(["mqtt", "retainer", "message", "c%2F%2B"])
)
),
{ok, Json0} =
request_api(
get,
api_path(["mqtt", "retainer", "message", "c"])
),
?assertMatch(
#{
topic := <<"c">>,
payload := <<"cmV0YWluZWQ=">>
},
decode_json(Json0)
),
{ok, Json1} =
request_api(
get,
api_path(["mqtt", "retainer", "message", "c%2F%23"])
),
?assertMatch(
#{
topic := <<"c">>,
payload := <<"cmV0YWluZWQ=">>
},
decode_json(Json1)
).
t_messages_page(Config) ->
C = ?config(client, Config),
Each = fun(I) -> Each = fun(I) ->
emqtt:publish( emqtt:publish(
C1, C,
<<"retained/", (I + 60)>>, <<"retained/", (I + 60)>>,
<<"retained">>, <<"retained">>,
[{qos, 0}, {retain, true}] [{qos, 0}, {retain, true}]
) )
end, end,
?check_trace( ?assertWaitEvent(
{ok, {ok, _}} = lists:foreach(Each, lists:seq(1, 5)),
?wait_async_action( #{?snk_kind := message_retained, topic := <<"retained/A">>},
lists:foreach(Each, lists:seq(1, 5)), 500
#{?snk_kind := message_retained, topic := <<"retained/A">>},
500
),
[]
), ),
Page = 4, Page = 4,
@ -187,17 +280,13 @@ t_messages_page(_) ->
from_username := _ from_username := _
}, },
OnlyOne OnlyOne
), ).
ok = emqtt:disconnect(C1). t_lookup_and_delete(Config) ->
C = ?config(client, Config),
t_lookup_and_delete(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
emqx_retainer:clean(),
timer:sleep(300), timer:sleep(300),
emqtt:publish(C1, <<"retained/api">>, <<"retained">>, [{qos, 0}, {retain, true}]), emqtt:publish(C, <<"retained/api">>, <<"retained">>, [{qos, 0}, {retain, true}]),
timer:sleep(300), timer:sleep(300),
API = api_path(["mqtt", "retainer", "message", "retained%2Fapi"]), API = api_path(["mqtt", "retainer", "message", "retained%2Fapi"]),
@ -220,9 +309,7 @@ t_lookup_and_delete(_) ->
{ok, []} = request_api(delete, API), {ok, []} = request_api(delete, API),
{error, {"HTTP/1.1", 404, "Not Found"}} = request_api(get, API), {error, {"HTTP/1.1", 404, "Not Found"}} = request_api(get, API),
{error, {"HTTP/1.1", 404, "Not Found"}} = request_api(delete, API), {error, {"HTTP/1.1", 404, "Not Found"}} = request_api(delete, API).
ok = emqtt:disconnect(C1).
t_change_storage_type(_Config) -> t_change_storage_type(_Config) ->
Path = api_path(["mqtt", "retainer"]), Path = api_path(["mqtt", "retainer"]),
@ -310,14 +397,12 @@ t_change_storage_type(_Config) ->
ok. ok.
t_match_and_clean(_) -> t_match_and_clean(Config) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), C = ?config(client, Config),
{ok, _} = emqtt:connect(C1),
emqx_retainer:clean(),
timer:sleep(300), timer:sleep(300),
_ = [ _ = [
emqtt:publish(C1, <<P/binary, "/", S/binary>>, <<"retained">>, [{qos, 0}, {retain, true}]) emqtt:publish(C, <<P/binary, "/", S/binary>>, <<"retained">>, [{qos, 0}, {retain, true}])
|| P <- [<<"t">>, <<"f">>], S <- [<<"1">>, <<"2">>, <<"3">>] || P <- [<<"t">>, <<"f">>], S <- [<<"1">>, <<"2">>, <<"3">>]
], ],
@ -337,20 +422,16 @@ t_match_and_clean(_) ->
{ok, []} = request_api(delete, CleanAPI), {ok, []} = request_api(delete, CleanAPI),
{ok, LookupJson2} = request_api(get, API), {ok, LookupJson2} = request_api(get, API),
?assertMatch(#{data := []}, decode_json(LookupJson2)), ?assertMatch(#{data := []}, decode_json(LookupJson2)).
ok = emqtt:disconnect(C1).
%%--------------------------------------------------------------------
%% HTTP Request
%%--------------------------------------------------------------------
decode_json(Data) ->
BinJson = emqx_utils_json:decode(Data, [return_maps]),
emqx_utils_maps:unsafe_atom_key_map(BinJson).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal funcs %% Internal funcs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
decode_json(Data) ->
BinJson = emqx_utils_json:decode(Data, [return_maps]),
emqx_utils_maps:unsafe_atom_key_map(BinJson).
raw_systopic_conf() -> raw_systopic_conf() ->
#{ #{
<<"sys_event_messages">> => <<"sys_event_messages">> =>

View File

@ -57,6 +57,30 @@ t_to_index_key(_Config) ->
[1, 4], [1, 4],
[<<"a">>, <<"b">>, <<"c">>] [<<"a">>, <<"b">>, <<"c">>]
) )
),
?assertEqual(
{[1, 2, 3], {[<<"a">>], []}},
emqx_retainer_index:to_index_key(
[1, 2, 3],
[<<"a">>]
)
),
?assertEqual(
{[3, 5], {[<<"b">>], [<<"x">>, <<"a">>, <<"y">>]}},
emqx_retainer_index:to_index_key(
[3, 5],
[<<"x">>, <<"a">>, <<"b">>, <<"y">>]
)
),
?assertEqual(
{[3, 5], {[<<"b">>, <<"z">>], [<<"x">>, <<"a">>, <<"y">>]}},
emqx_retainer_index:to_index_key(
[3, 5],
[<<"x">>, <<"a">>, <<"b">>, <<"y">>, <<"z">>]
)
). ).
t_index_score(_Config) -> t_index_score(_Config) ->
@ -148,7 +172,7 @@ t_condition(_Config) ->
t_condition_index(_Config) -> t_condition_index(_Config) ->
?assertEqual( ?assertEqual(
{[2, 3], {[<<"a">>, <<"b">>], ['_', '_']}}, {{[2, 3], {[<<"a">>, <<"b">>], ['_', '_']}}, true},
emqx_retainer_index:condition( emqx_retainer_index:condition(
[2, 3], [2, 3],
['+', <<"a">>, <<"b">>, '+'] ['+', <<"a">>, <<"b">>, '+']
@ -156,7 +180,7 @@ t_condition_index(_Config) ->
), ),
?assertEqual( ?assertEqual(
{[3, 4], {[<<"b">>, '_'], ['_', <<"a">>]}}, {{[3, 4], {[<<"b">>, '_'], ['_', <<"a">>]}}, true},
emqx_retainer_index:condition( emqx_retainer_index:condition(
[3, 4], [3, 4],
['+', <<"a">>, <<"b">>, '+'] ['+', <<"a">>, <<"b">>, '+']
@ -164,7 +188,7 @@ t_condition_index(_Config) ->
), ),
?assertEqual( ?assertEqual(
{[3, 5], {[<<"b">> | '_'], ['_', <<"a">>, '_']}}, {{[3, 5], {[<<"b">>], ['_', <<"a">>, '_']}}, true},
emqx_retainer_index:condition( emqx_retainer_index:condition(
[3, 5], [3, 5],
['+', <<"a">>, <<"b">>, '+'] ['+', <<"a">>, <<"b">>, '+']
@ -172,7 +196,7 @@ t_condition_index(_Config) ->
), ),
?assertEqual( ?assertEqual(
{[3, 5], {[<<"b">> | '_'], ['_', <<"a">> | '_']}}, {{[3, 5], {[<<"b">> | '_'], ['_', <<"a">> | '_']}}, false},
emqx_retainer_index:condition( emqx_retainer_index:condition(
[3, 5], [3, 5],
['+', <<"a">>, <<"b">>, '#'] ['+', <<"a">>, <<"b">>, '#']
@ -180,7 +204,7 @@ t_condition_index(_Config) ->
), ),
?assertEqual( ?assertEqual(
{[3, 4], {[<<"b">> | '_'], ['_', <<"a">> | '_']}}, {{[3, 4], {[<<"b">> | '_'], ['_', <<"a">> | '_']}}, false},
emqx_retainer_index:condition( emqx_retainer_index:condition(
[3, 4], [3, 4],
['+', <<"a">>, <<"b">>, '#'] ['+', <<"a">>, <<"b">>, '#']
@ -188,7 +212,7 @@ t_condition_index(_Config) ->
), ),
?assertEqual( ?assertEqual(
{[1], {[<<"a">>], '_'}}, {{[1], {[<<"a">>], '_'}}, true},
emqx_retainer_index:condition( emqx_retainer_index:condition(
[1], [1],
[<<"a">>, '#'] [<<"a">>, '#']
@ -196,13 +220,39 @@ t_condition_index(_Config) ->
), ),
?assertEqual( ?assertEqual(
{[1, 2, 3], {['', <<"saya">>, '_'], []}}, {{[1, 2, 3], {['', <<"saya">>, '_'], []}}, true},
emqx_retainer_index:condition( emqx_retainer_index:condition(
[1, 2, 3], [1, 2, 3],
['', <<"saya">>, '+'] ['', <<"saya">>, '+']
) )
),
?assertEqual(
{{[1, 2, 3], {[<<"c">>], []}}, true},
emqx_retainer_index:condition(
[1, 2, 3],
[<<"c">>]
)
),
?assertEqual(
{{[1, 2, 3], {[<<"c">> | '_'], '_'}}, false},
emqx_retainer_index:condition(
[1, 2, 3],
[<<"c">>, '#']
)
),
?assertEqual(
{{[1], {['_'], '_'}}, true},
emqx_retainer_index:condition(
[1],
['+', '#']
)
). ).
% {[2],[[<<48>>,<<48>>]],['+','+','#']}
t_restore_topic(_Config) -> t_restore_topic(_Config) ->
?assertEqual( ?assertEqual(
[<<"x">>, <<"a">>, <<"b">>, <<"y">>], [<<"x">>, <<"a">>, <<"b">>, <<"y">>],
@ -223,4 +273,11 @@ t_restore_topic(_Config) ->
emqx_retainer_index:restore_topic( emqx_retainer_index:restore_topic(
{[3, 5], {[<<"b">>], [<<"x">>, <<"a">>, <<"y">>]}} {[3, 5], {[<<"b">>], [<<"x">>, <<"a">>, <<"y">>]}}
) )
),
?assertEqual(
[<<"a">>],
emqx_retainer_index:restore_topic(
{[1, 2, 3], {[<<"a">>], []}}
)
). ).

View File

@ -0,0 +1,106 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(prop_emqx_retainer_index).
-include_lib("proper/include/proper.hrl").
-define(CHARS, 6).
-define(MAX_TOPIC_LEN, 12).
-define(MAX_INDEX_LEN, 4).
-define(MAX_FILTER_LEN, 6).
%%--------------------------------------------------------------------
%% Properties
%%--------------------------------------------------------------------
prop_index() ->
?FORALL(
{Index, Topics0, Filter},
{index_t(), list(topic_t()), filter_t()},
begin
Topics = lists:usort(Topics0),
MatchedTopicsDirectly = lists:filter(
fun(Topic) ->
emqx_topic:match(Topic, Filter)
end,
Topics
),
Tab = ets:new(?MODULE, [set]),
ok = lists:foreach(
fun(Topic) ->
Key = emqx_retainer_index:to_index_key(Index, Topic),
ets:insert(Tab, {Key, true})
end,
Topics
),
{IndexMs, IsExact} = emqx_retainer_index:condition(Index, Filter),
Ms = [{{IndexMs, '_'}, [], ['$_']}],
MatchedTopixByIndex0 = [
emqx_retainer_index:restore_topic(Key)
|| {Key, _} <- ets:select(Tab, Ms)
],
MatchedTopixByIndex =
case IsExact of
true ->
MatchedTopixByIndex0;
false ->
lists:filter(
fun(Topic) ->
emqx_topic:match(Topic, Filter)
end,
MatchedTopixByIndex0
)
end,
lists:sort(MatchedTopicsDirectly) =:= lists:sort(MatchedTopixByIndex)
end
).
index_t() ->
?LET(
{Ints, Len},
{non_empty(list(integer(1, ?MAX_TOPIC_LEN))), integer(1, ?MAX_INDEX_LEN)},
lists:usort(lists:sublist(Ints, Len))
).
topic_t() ->
?LET(
{Topic, Len},
{non_empty(list(topic_segment_t())), integer(1, ?MAX_TOPIC_LEN)},
lists:sublist(Topic, Len)
).
filter_t() ->
?LET(
{TopicFilter, Len, MLWildcard},
{
non_empty(list(oneof([topic_segment_t(), '+']))),
integer(1, ?MAX_FILTER_LEN),
oneof([[], ['#']])
},
lists:sublist(TopicFilter, Len) ++ MLWildcard
).
topic_segment_t() ->
?LET(
I,
integer(0, ?CHARS - 1),
<<($0 + I)>>
).

View File

@ -0,0 +1 @@
Fix message indexing in retainer. Previously, clients with wildcard subscriptions could receive excess retained messages, not belonging to the topics matching the subscription.