From a11e75d1894add20802fc7983e7342c6473ccca1 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 6 Jan 2023 13:42:28 +0300 Subject: [PATCH] fix(ds): clear bitmask of topic filter tail containing wildcards --- .../src/emqx_replay_message_storage.erl | 9 +++-- .../test/emqx_replay_storage_SUITE.erl | 35 +++++++++++++++++-- 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index 1c91066cf..759ddf559 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -457,8 +457,13 @@ compute_topic_bitmask([], [{hash, level, Size} | Rest], Acc) -> compute_topic_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size)); compute_topic_bitmask([_ | Tail], [{hash, level, Size} | Rest], Acc) -> compute_topic_bitmask(Tail, Rest, bitwise_concat(Acc, ones(Size), Size)); -compute_topic_bitmask(_, [{hash, levels, Size} | Rest], Acc) -> - compute_topic_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size)); +compute_topic_bitmask(Tail, [{hash, levels, Size} | Rest], Acc) -> + Mask = + case lists:member('+', Tail) orelse lists:member('#', Tail) of + true -> 0; + false -> ones(Size) + end, + compute_topic_bitmask([], Rest, bitwise_concat(Acc, Mask, Size)); compute_topic_bitmask(_, [], Acc) -> Acc. diff --git a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl index c99063350..3d7e7cb41 100644 --- a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl +++ b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl @@ -115,6 +115,19 @@ t_iterate_wildcard(_Config) -> ), ok. +t_iterate_long_tail_wildcard(_Config) -> + Topic = "b/c/d/e/f/g", + TopicFilter = "b/c/d/e/+/+", + Timestamps = lists:seq(1, 100), + _ = [ + store(?ZONE, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) + || PublishedAt <- Timestamps + ], + ?assertEqual( + lists:sort([{"b/c/d/e/f/g", PublishedAt} || PublishedAt <- lists:seq(50, 100)]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, TopicFilter, 50)]) + ). + store(Zone, PublishedAt, Topic, Payload) -> ID = emqx_guid:gen(), emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload). @@ -140,13 +153,29 @@ parse_topic(Topic) -> all() -> emqx_common_test_helpers:all(?MODULE). -init_per_testcase(TC, Config) -> +init_per_suite(Config) -> {ok, _} = application:ensure_all_started(emqx_replay), + Config. + +end_per_suite(_Config) -> + ok = application:stop(emqx_replay). + +init_per_testcase(TC, Config) -> + ok = set_zone_config(zone(TC), #{ + timestamp_bits => 64, + topic_bits_per_level => [8, 8, 32, 16], + epoch => 5 + }), {ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)), Config. -end_per_testcase(_TC, _Config) -> - ok = application:stop(emqx_replay). +end_per_testcase(TC, _Config) -> + ok = emqx_replay_local_store_sup:stop_zone(zone(TC)). zone(TC) -> list_to_atom(?MODULE_STRING ++ atom_to_list(TC)). + +set_zone_config(Zone, Options) -> + ok = application:set_env(emqx_replay, zone_config, #{ + Zone => {emqx_replay_message_storage, Options} + }).