test(ds): Add new helper functions

- Improve message comparison
- Add set operations
This commit is contained in:
ieQu1 2024-07-01 00:56:26 +02:00
parent 210556e545
commit de48077ac4
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
2 changed files with 61 additions and 5 deletions

View File

@ -64,6 +64,7 @@
-export([work_dir/1]). -export([work_dir/1]).
-export([work_dir/2]). -export([work_dir/2]).
-export([clean_work_dir/1]).
-export([load_apps/1]). -export([load_apps/1]).
-export([start_apps/2]). -export([start_apps/2]).
@ -432,6 +433,16 @@ work_dir(TCName, CTConfig) ->
WorkDir = work_dir(CTConfig), WorkDir = work_dir(CTConfig),
filename:join(WorkDir, TCName). filename:join(WorkDir, TCName).
%% @doc Delete contents of the workdir.
clean_work_dir(WorkDir) ->
ct:pal("Cleaning workdir ~p", [WorkDir]),
case re:run(WorkDir, "./_build/test/logs/") of
{match, _} ->
file:del_dir_r(WorkDir);
nomatch ->
error({unsafe_workdir, WorkDir})
end.
%% %%
start_ekka() -> start_ekka() ->

View File

@ -148,8 +148,7 @@ do_ds_topic_generation_stream(DB, Node, Shard, It0) ->
?ON( ?ON(
Node, Node,
begin begin
Now = emqx_ds_replication_layer:current_timestamp(DB, Shard), emqx_ds_storage_layer:next(Shard, It0, 1, _Now = 1 bsl 63)
emqx_ds_storage_layer:next(Shard, It0, 1, Now)
end end
) )
of of
@ -233,15 +232,60 @@ transitions(Node, DB) ->
end end
). ).
%% Stream comparison %% Message comparison
%% Try to eliminate any ambiguity in the message representation.
message_canonical_form(Msg0 = #message{}) ->
message_canonical_form(emqx_message:to_map(Msg0));
message_canonical_form(#{flags := Flags0, headers := Headers0, payload := Payload0} = Msg) ->
%% Remove flags that are false:
Flags = maps:filter(
fun(_Key, Val) -> Val end,
Flags0
),
Msg#{flags := Flags, payload := iolist_to_binary(Payload0)}.
sublist(L) ->
PrintMax = 20,
case length(L) of
0 ->
[];
N when N > PrintMax ->
lists:sublist(L, 1, PrintMax) ++ ['...', N - PrintMax, 'more'];
_ ->
L
end.
message_set(L) ->
ordsets:from_list([message_canonical_form(I) || I <- L]).
message_set_subtract(A, B) ->
ordsets:subtract(message_set(A), message_set(B)).
assert_same_set(Expected, Got) ->
assert_same_set(Expected, Got, #{}).
assert_same_set(Expected, Got, Comment) ->
SE = message_set(Expected),
SG = message_set(Got),
case {ordsets:subtract(SE, SG), ordsets:subtract(SG, SE)} of
{[], []} ->
ok;
{Missing, Unexpected} ->
error(Comment#{
matching => sublist(ordsets:intersection(SE, SG)),
missing => sublist(Missing),
unexpected => sublist(Unexpected)
})
end.
message_eq(Fields, {_Key, Msg1 = #message{}}, Msg2) -> message_eq(Fields, {_Key, Msg1 = #message{}}, Msg2) ->
message_eq(Fields, Msg1, Msg2); message_eq(Fields, Msg1, Msg2);
message_eq(Fields, Msg1, {_Key, Msg2 = #message{}}) -> message_eq(Fields, Msg1, {_Key, Msg2 = #message{}}) ->
message_eq(Fields, Msg1, Msg2); message_eq(Fields, Msg1, Msg2);
message_eq(Fields, Msg1 = #message{}, Msg2 = #message{}) -> message_eq(Fields, Msg1 = #message{}, Msg2 = #message{}) ->
maps:with(Fields, emqx_message:to_map(Msg1)) =:= maps:with(Fields, message_canonical_form(Msg1)) =:=
maps:with(Fields, emqx_message:to_map(Msg2)). maps:with(Fields, message_canonical_form(Msg2)).
%% Consuming streams and iterators %% Consuming streams and iterators
@ -304,6 +348,7 @@ ds_topic_stream(DB, ClientId, TopicBin, Node) ->
{DBShard, emqx_ds_storage_layer:get_streams(DBShard, Topic, 0)} {DBShard, emqx_ds_storage_layer:get_streams(DBShard, Topic, 0)}
end end
), ),
ct:pal("Streams for ~p, ~p @ ~p:~n ~p", [ClientId, TopicBin, Node, DSStreams]),
%% Sort streams by their rank Y, and chain them together: %% Sort streams by their rank Y, and chain them together:
emqx_utils_stream:chain([ emqx_utils_stream:chain([
ds_topic_generation_stream(DB, Node, ShardId, Topic, S) ds_topic_generation_stream(DB, Node, ShardId, Topic, S)