From b5c485cd554d6d2644a15f7959a554e8ea233961 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 4 Dec 2023 12:58:37 +0300 Subject: [PATCH 1/3] test(sessds): add empty level topic subscription testcase --- .../test/emqx_persistent_messages_SUITE.erl | 41 ++++++++++++++++--- apps/emqx_utils/src/emqx_utils_maps.erl | 16 +++++++- .../emqx_utils/test/emqx_utils_maps_tests.erl | 19 +++++++++ 3 files changed, 70 insertions(+), 6 deletions(-) diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 80a83c0a4..c219a5a63 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -282,6 +282,34 @@ t_publish_as_persistent(_Config) -> emqtt:stop(Pub) end. +t_publish_empty_topic_levels(_Config) -> + Sub = connect(<>, true, 30), + Pub = connect(<>, true, 30), + try + {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Sub, <<"t//+//#">>, qos1), + Messages = [ + {<<"t//1">>, <<"1">>}, + {<<"t//1/">>, <<"2">>}, + {<<"t//2//">>, <<"3">>}, + {<<"t//2//foo">>, <<"4">>}, + {<<"t//2/foo">>, <<"5">>}, + {<<"t/3/bar">>, <<"6">>} + ], + [emqtt:publish(Pub, Topic, Payload, ?QOS_1) || {Topic, Payload} <- Messages], + Received = receive_messages(length(Messages), 1_500), + ?assertMatch( + [ + #{topic := <<"t//1/">>, payload := <<"2">>}, + #{topic := <<"t//2//">>, payload := <<"3">>}, + #{topic := <<"t//2//foo">>, payload := <<"4">>} + ], + lists:sort(emqx_utils_maps:key_comparer(payload), Received) + ) + after + emqtt:stop(Sub), + emqtt:stop(Pub) + end. + %% connect(ClientId, CleanStart, EI) -> @@ -322,15 +350,18 @@ consume(It) -> end. receive_messages(Count) -> - lists:reverse(receive_messages(Count, [])). + receive_messages(Count, 5_000). -receive_messages(0, Msgs) -> +receive_messages(Count, Timeout) -> + lists:reverse(receive_messages(Count, [], Timeout)). + +receive_messages(0, Msgs, _Timeout) -> Msgs; -receive_messages(Count, Msgs) -> +receive_messages(Count, Msgs, Timeout) -> receive {publish, Msg} -> - receive_messages(Count - 1, [Msg | Msgs]) - after 5_000 -> + receive_messages(Count - 1, [Msg | Msgs], Timeout) + after Timeout -> Msgs end. diff --git a/apps/emqx_utils/src/emqx_utils_maps.erl b/apps/emqx_utils/src/emqx_utils_maps.erl index a3b6961f0..043ab5210 100644 --- a/apps/emqx_utils/src/emqx_utils_maps.erl +++ b/apps/emqx_utils/src/emqx_utils_maps.erl @@ -35,7 +35,8 @@ if_only_to_toggle_enable/2, update_if_present/3, put_if/4, - rename/3 + rename/3, + key_comparer/1 ]). -export_type([config_key/0, config_key_path/0]). @@ -318,3 +319,16 @@ rename(OldKey, NewKey, Map) -> error -> Map end. + +-spec key_comparer(K) -> fun((M, M) -> boolean()) when M :: #{K => _V}. +key_comparer(K) -> + fun + (#{K := V1}, #{K := V2}) -> + V1 < V2; + (#{K := _}, _) -> + false; + (_, #{K := _}) -> + true; + (M1, M2) -> + M1 < M2 + end. diff --git a/apps/emqx_utils/test/emqx_utils_maps_tests.erl b/apps/emqx_utils/test/emqx_utils_maps_tests.erl index 506851f0a..a9f39536e 100644 --- a/apps/emqx_utils/test/emqx_utils_maps_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_maps_tests.erl @@ -110,3 +110,22 @@ best_effort_recursive_sum_test_() -> ) ) ]. + +key_comparer_test() -> + Comp = emqx_utils_maps:key_comparer(foo), + ?assertEqual( + [ + #{}, + #{baz => 42}, + #{foo => 1}, + #{foo => 42}, + #{foo => bar, baz => 42} + ], + lists:sort(Comp, [ + #{foo => 42}, + #{baz => 42}, + #{foo => bar, baz => 42}, + #{foo => 1}, + #{} + ]) + ). From 92c4b29a4ca2986e5f85630478be628428980096 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 4 Dec 2023 12:59:16 +0300 Subject: [PATCH 2/3] feat(topic): match empty topic levels more loosely So that the result of `emqx_topic:tokens/1` would be perfectly matchable with the result of `emqx_topic:words/1` of a topic filter with empty levels. --- apps/emqx/src/emqx_topic.erl | 8 +++----- apps/emqx/test/emqx_topic_SUITE.erl | 6 ++++++ 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/apps/emqx/src/emqx_topic.erl b/apps/emqx/src/emqx_topic.erl index 76c6ef34e..bc6946a43 100644 --- a/apps/emqx/src/emqx_topic.erl +++ b/apps/emqx/src/emqx_topic.erl @@ -91,13 +91,11 @@ match([H | T1], [H | T2]) -> match(T1, T2); match([_H | T1], ['+' | T2]) -> match(T1, T2); +match([<<>> | T1], ['' | T2]) -> + match(T1, T2); match(_, ['#']) -> true; -match([_H1 | _], [_H2 | _]) -> - false; -match([_H1 | _], []) -> - false; -match([], [_H | _T2]) -> +match(_, _) -> false. -spec match_share(Name, Filter) -> boolean() when diff --git a/apps/emqx/test/emqx_topic_SUITE.erl b/apps/emqx/test/emqx_topic_SUITE.erl index 4761ea17d..7a2130992 100644 --- a/apps/emqx/test/emqx_topic_SUITE.erl +++ b/apps/emqx/test/emqx_topic_SUITE.erl @@ -115,6 +115,12 @@ t_sys_match(_) -> true = match(<<"a/b/$c">>, <<"a/b/#">>), true = match(<<"a/b/$c">>, <<"a/#">>). +t_match_tokens(_) -> + true = match(emqx_topic:tokens(<<"a/b/c">>), words(<<"a/+/c">>)), + true = match(emqx_topic:tokens(<<"a//c">>), words(<<"a/+/c">>)), + false = match(emqx_topic:tokens(<<"a//c/">>), words(<<"a/+/c">>)), + true = match(emqx_topic:tokens(<<"a//c/">>), words(<<"a/+/c/#">>)). + t_match_perf(_) -> true = match(<<"a/b/ccc">>, <<"a/#">>), Name = <<"/abkc/19383/192939/akakdkkdkak/xxxyyuya/akakak">>, From 130a5a54425e8a27f60b001fe9800fdac291cbcd Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 4 Dec 2023 13:12:27 +0300 Subject: [PATCH 3/3] fix(ds): pass topics to `emqx_topic:words/1` before feeding LTS tree So that empty levels in topics will be properly mapped into `''` atoms. --- .../emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 6a69a20f3..c2f533673 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -368,7 +368,7 @@ check_message( #{?tag := ?IT, ?start_time := StartTime, ?topic_filter := TopicFilter}, #message{timestamp = Timestamp, topic = Topic} ) when Timestamp >= StartTime -> - emqx_topic:match(emqx_topic:words(Topic), TopicFilter); + emqx_topic:match(emqx_topic:tokens(Topic), TopicFilter); check_message(_Cutoff, _It, _Msg) -> false. @@ -378,7 +378,7 @@ format_key(KeyMapper, Key) -> -spec make_key(s(), emqx_types:message()) -> {binary(), [binary()]}. make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestamp, topic = TopicBin}) -> - Tokens = emqx_topic:tokens(TopicBin), + Tokens = emqx_topic:words(TopicBin), {TopicIndex, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens), VaryingHashes = [hash_topic_level(I) || I <- Varying], KeyMapper = array:get(length(Varying), KeyMappers),