diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 80a83c0a4..c219a5a63 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -282,6 +282,34 @@ t_publish_as_persistent(_Config) -> emqtt:stop(Pub) end. +t_publish_empty_topic_levels(_Config) -> + Sub = connect(<>, true, 30), + Pub = connect(<>, true, 30), + try + {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Sub, <<"t//+//#">>, qos1), + Messages = [ + {<<"t//1">>, <<"1">>}, + {<<"t//1/">>, <<"2">>}, + {<<"t//2//">>, <<"3">>}, + {<<"t//2//foo">>, <<"4">>}, + {<<"t//2/foo">>, <<"5">>}, + {<<"t/3/bar">>, <<"6">>} + ], + [emqtt:publish(Pub, Topic, Payload, ?QOS_1) || {Topic, Payload} <- Messages], + Received = receive_messages(length(Messages), 1_500), + ?assertMatch( + [ + #{topic := <<"t//1/">>, payload := <<"2">>}, + #{topic := <<"t//2//">>, payload := <<"3">>}, + #{topic := <<"t//2//foo">>, payload := <<"4">>} + ], + lists:sort(emqx_utils_maps:key_comparer(payload), Received) + ) + after + emqtt:stop(Sub), + emqtt:stop(Pub) + end. + %% connect(ClientId, CleanStart, EI) -> @@ -322,15 +350,18 @@ consume(It) -> end. receive_messages(Count) -> - lists:reverse(receive_messages(Count, [])). + receive_messages(Count, 5_000). -receive_messages(0, Msgs) -> +receive_messages(Count, Timeout) -> + lists:reverse(receive_messages(Count, [], Timeout)). + +receive_messages(0, Msgs, _Timeout) -> Msgs; -receive_messages(Count, Msgs) -> +receive_messages(Count, Msgs, Timeout) -> receive {publish, Msg} -> - receive_messages(Count - 1, [Msg | Msgs]) - after 5_000 -> + receive_messages(Count - 1, [Msg | Msgs], Timeout) + after Timeout -> Msgs end. diff --git a/apps/emqx_utils/src/emqx_utils_maps.erl b/apps/emqx_utils/src/emqx_utils_maps.erl index a3b6961f0..043ab5210 100644 --- a/apps/emqx_utils/src/emqx_utils_maps.erl +++ b/apps/emqx_utils/src/emqx_utils_maps.erl @@ -35,7 +35,8 @@ if_only_to_toggle_enable/2, update_if_present/3, put_if/4, - rename/3 + rename/3, + key_comparer/1 ]). -export_type([config_key/0, config_key_path/0]). @@ -318,3 +319,16 @@ rename(OldKey, NewKey, Map) -> error -> Map end. + +-spec key_comparer(K) -> fun((M, M) -> boolean()) when M :: #{K => _V}. +key_comparer(K) -> + fun + (#{K := V1}, #{K := V2}) -> + V1 < V2; + (#{K := _}, _) -> + false; + (_, #{K := _}) -> + true; + (M1, M2) -> + M1 < M2 + end. diff --git a/apps/emqx_utils/test/emqx_utils_maps_tests.erl b/apps/emqx_utils/test/emqx_utils_maps_tests.erl index 506851f0a..a9f39536e 100644 --- a/apps/emqx_utils/test/emqx_utils_maps_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_maps_tests.erl @@ -110,3 +110,22 @@ best_effort_recursive_sum_test_() -> ) ) ]. + +key_comparer_test() -> + Comp = emqx_utils_maps:key_comparer(foo), + ?assertEqual( + [ + #{}, + #{baz => 42}, + #{foo => 1}, + #{foo => 42}, + #{foo => bar, baz => 42} + ], + lists:sort(Comp, [ + #{foo => 42}, + #{baz => 42}, + #{foo => bar, baz => 42}, + #{foo => 1}, + #{} + ]) + ).