From de48077ac45d5f924727f284d9ddda5c6e479be2 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 1 Jul 2024 00:56:26 +0200 Subject: [PATCH] test(ds): Add new helper functions - Improve message comparison - Add set operations --- apps/emqx/test/emqx_cth_suite.erl | 11 ++++ .../test/emqx_ds_test_helpers.erl | 55 +++++++++++++++++-- 2 files changed, 61 insertions(+), 5 deletions(-) diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl index 39e3ebc45..f4fcd0a75 100644 --- a/apps/emqx/test/emqx_cth_suite.erl +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -64,6 +64,7 @@ -export([work_dir/1]). -export([work_dir/2]). +-export([clean_work_dir/1]). -export([load_apps/1]). -export([start_apps/2]). @@ -432,6 +433,16 @@ work_dir(TCName, CTConfig) -> WorkDir = work_dir(CTConfig), filename:join(WorkDir, TCName). +%% @doc Delete contents of the workdir. +clean_work_dir(WorkDir) -> + ct:pal("Cleaning workdir ~p", [WorkDir]), + case re:run(WorkDir, "./_build/test/logs/") of + {match, _} -> + file:del_dir_r(WorkDir); + nomatch -> + error({unsafe_workdir, WorkDir}) + end. + %% start_ekka() -> diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index af41df1ad..fe903fad2 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -148,8 +148,7 @@ do_ds_topic_generation_stream(DB, Node, Shard, It0) -> ?ON( Node, begin - Now = emqx_ds_replication_layer:current_timestamp(DB, Shard), - emqx_ds_storage_layer:next(Shard, It0, 1, Now) + emqx_ds_storage_layer:next(Shard, It0, 1, _Now = 1 bsl 63) end ) of @@ -233,15 +232,60 @@ transitions(Node, DB) -> end ). -%% Stream comparison +%% Message comparison + +%% Try to eliminate any ambiguity in the message representation. +message_canonical_form(Msg0 = #message{}) -> + message_canonical_form(emqx_message:to_map(Msg0)); +message_canonical_form(#{flags := Flags0, headers := Headers0, payload := Payload0} = Msg) -> + %% Remove flags that are false: + Flags = maps:filter( + fun(_Key, Val) -> Val end, + Flags0 + ), + Msg#{flags := Flags, payload := iolist_to_binary(Payload0)}. + +sublist(L) -> + PrintMax = 20, + case length(L) of + 0 -> + []; + N when N > PrintMax -> + lists:sublist(L, 1, PrintMax) ++ ['...', N - PrintMax, 'more']; + _ -> + L + end. + +message_set(L) -> + ordsets:from_list([message_canonical_form(I) || I <- L]). + +message_set_subtract(A, B) -> + ordsets:subtract(message_set(A), message_set(B)). + +assert_same_set(Expected, Got) -> + assert_same_set(Expected, Got, #{}). + +assert_same_set(Expected, Got, Comment) -> + SE = message_set(Expected), + SG = message_set(Got), + case {ordsets:subtract(SE, SG), ordsets:subtract(SG, SE)} of + {[], []} -> + ok; + {Missing, Unexpected} -> + error(Comment#{ + matching => sublist(ordsets:intersection(SE, SG)), + missing => sublist(Missing), + unexpected => sublist(Unexpected) + }) + end. message_eq(Fields, {_Key, Msg1 = #message{}}, Msg2) -> message_eq(Fields, Msg1, Msg2); message_eq(Fields, Msg1, {_Key, Msg2 = #message{}}) -> message_eq(Fields, Msg1, Msg2); message_eq(Fields, Msg1 = #message{}, Msg2 = #message{}) -> - maps:with(Fields, emqx_message:to_map(Msg1)) =:= - maps:with(Fields, emqx_message:to_map(Msg2)). + maps:with(Fields, message_canonical_form(Msg1)) =:= + maps:with(Fields, message_canonical_form(Msg2)). %% Consuming streams and iterators @@ -304,6 +348,7 @@ ds_topic_stream(DB, ClientId, TopicBin, Node) -> {DBShard, emqx_ds_storage_layer:get_streams(DBShard, Topic, 0)} end ), + ct:pal("Streams for ~p, ~p @ ~p:~n ~p", [ClientId, TopicBin, Node, DSStreams]), %% Sort streams by their rank Y, and chain them together: emqx_utils_stream:chain([ ds_topic_generation_stream(DB, Node, ShardId, Topic, S)