fix(ds): clear bitmask of topic filter tail containing wildcards

This commit is contained in:
Andrew Mayorov 2023-01-06 13:42:28 +03:00 committed by ieQu1
parent ac0935ef91
commit a11e75d189
2 changed files with 39 additions and 5 deletions

View File

@ -457,8 +457,13 @@ compute_topic_bitmask([], [{hash, level, Size} | Rest], Acc) ->
compute_topic_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size));
compute_topic_bitmask([_ | Tail], [{hash, level, Size} | Rest], Acc) ->
compute_topic_bitmask(Tail, Rest, bitwise_concat(Acc, ones(Size), Size));
compute_topic_bitmask(_, [{hash, levels, Size} | Rest], Acc) ->
compute_topic_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size));
compute_topic_bitmask(Tail, [{hash, levels, Size} | Rest], Acc) ->
Mask =
case lists:member('+', Tail) orelse lists:member('#', Tail) of
true -> 0;
false -> ones(Size)
end,
compute_topic_bitmask([], Rest, bitwise_concat(Acc, Mask, Size));
compute_topic_bitmask(_, [], Acc) ->
Acc.

View File

@ -115,6 +115,19 @@ t_iterate_wildcard(_Config) ->
),
ok.
t_iterate_long_tail_wildcard(_Config) ->
Topic = "b/c/d/e/f/g",
TopicFilter = "b/c/d/e/+/+",
Timestamps = lists:seq(1, 100),
_ = [
store(?ZONE, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|| PublishedAt <- Timestamps
],
?assertEqual(
lists:sort([{"b/c/d/e/f/g", PublishedAt} || PublishedAt <- lists:seq(50, 100)]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, TopicFilter, 50)])
).
store(Zone, PublishedAt, Topic, Payload) ->
ID = emqx_guid:gen(),
emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload).
@ -140,13 +153,29 @@ parse_topic(Topic) ->
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_testcase(TC, Config) ->
init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(emqx_replay),
Config.
end_per_suite(_Config) ->
ok = application:stop(emqx_replay).
init_per_testcase(TC, Config) ->
ok = set_zone_config(zone(TC), #{
timestamp_bits => 64,
topic_bits_per_level => [8, 8, 32, 16],
epoch => 5
}),
{ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)),
Config.
end_per_testcase(_TC, _Config) ->
ok = application:stop(emqx_replay).
end_per_testcase(TC, _Config) ->
ok = emqx_replay_local_store_sup:stop_zone(zone(TC)).
zone(TC) ->
list_to_atom(?MODULE_STRING ++ atom_to_list(TC)).
set_zone_config(Zone, Options) ->
ok = application:set_env(emqx_replay, zone_config, #{
Zone => {emqx_replay_message_storage, Options}
}).