diff --git a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl index a6ca86f7e..fe0a30489 100644 --- a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl @@ -817,9 +817,10 @@ t_crash_restart_recover(Config) -> DBOpts = opts(#{n_shards => 16, n_sites => 3, replication_factor => 3}), %% Prepare test event stream. - {Stream0, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages( - ?FUNCTION_NAME, _NClients = 8, _NMsgs = 400 - ), + NMsgs = 400, + NClients = 8, + {Stream0, TopicStreams} = + emqx_ds_test_helpers:interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs), Stream1 = emqx_utils_stream:interleave( [ {300, Stream0}, @@ -849,19 +850,59 @@ t_crash_restart_recover(Config) -> %% Apply the test events, including simulated node crashes. NodeStream = emqx_utils_stream:const(N1), emqx_ds_test_helpers:apply_stream(?DB, NodeStream, Stream, 0), - timer:sleep(5000), - %% Verify that all the data is there. - emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams) + %% It's expected to lose few messages when leaders are abruptly killed. + MatchFlushFailed = ?match_event(#{?snk_kind := emqx_ds_buffer_flush_failed}), + {ok, SubRef} = snabbkaffe:subscribe(MatchFlushFailed, NMsgs, _Timeout = 5000, infinity), + {timeout, Events} = snabbkaffe:receive_events(SubRef), + LostMessages = [M || #{batch := Messages} <- Events, M <- Messages], + ct:pal("Some messages were lost: ~p", [LostMessages]), + ?assert(length(LostMessages) < NMsgs div 20), + + %% Verify that all the successfully persisted messages are there. + VerifyClient = fun({ClientId, ExpectedStream}) -> + Topic = emqx_ds_test_helpers:client_topic(?FUNCTION_NAME, ClientId), + ClientNodes = nodes_of_clientid(ClientId, Nodes), + DSStream1 = ds_topic_stream(ClientId, Topic, hd(ClientNodes)), + %% Do nodes contain same messages for a client? + lists:foreach( + fun(ClientNode) -> + DSStream = ds_topic_stream(ClientId, Topic, ClientNode), + ?defer_assert(emqx_ds_test_helpers:diff_messages(DSStream1, DSStream)) + end, + tl(ClientNodes) + ), + %% Does any messages were lost unexpectedly? + {_, DSMessages} = lists:unzip(emqx_utils_stream:consume(DSStream1)), + ExpectedMessages = emqx_utils_stream:consume(ExpectedStream), + MissingMessages = ExpectedMessages -- DSMessages, + ?defer_assert(?assertEqual([], MissingMessages -- LostMessages, DSMessages)) + end, + lists:foreach(VerifyClient, TopicStreams) end, [] ). +nodes_of_clientid(ClientId, Nodes) -> + emqx_ds_test_helpers:nodes_of_clientid(?DB, ClientId, Nodes). + +ds_topic_stream(ClientId, ClientTopic, Node) -> + emqx_ds_test_helpers:ds_topic_stream(?DB, ClientId, ClientTopic, Node). + +is_message_lost(Message, MessagesLost) -> + lists:any( + fun(ML) -> + emqx_ds_test_helpers:message_eq([clientid, topic, payload], Message, ML) + end, + MessagesLost + ). + kill_restart_node_async(Node, Spec, DBOpts) -> erlang:spawn_link(?MODULE, kill_restart_node, [Node, Spec, DBOpts]). kill_restart_node(Node, Spec, DBOpts) -> ok = emqx_cth_peer:kill(Node), + ?tp(test_cluster_node_killed, #{node => Node}), _ = emqx_cth_cluster:restart(Spec), ok = erpc:call(Node, emqx_ds, open_db, [?DB, DBOpts]). diff --git a/apps/emqx_durable_storage/src/emqx_ds_buffer.erl b/apps/emqx_durable_storage/src/emqx_ds_buffer.erl index e93bb33be..dec9eea80 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_buffer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_buffer.erl @@ -314,7 +314,7 @@ do_flush( ?tp( debug, emqx_ds_buffer_flush_failed, - #{db => DB, shard => Shard, error => Err} + #{db => DB, shard => Shard, batch => Messages, error => Err} ), emqx_ds_builtin_metrics:inc_buffer_batches_failed(Metrics), Reply = diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index 0b6b634c5..af41df1ad 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -266,15 +266,18 @@ verify_stream_effects(DB, TestCase, Node, ClientId, ExpectedStream) -> ct:pal("Checking consistency of effects for ~p on ~p", [ClientId, Node]), ?defer_assert( begin - snabbkaffe_diff:assert_lists_eq( + diff_messages( ExpectedStream, - ds_topic_stream(DB, ClientId, client_topic(TestCase, ClientId), Node), - message_diff_options([id, qos, from, flags, headers, topic, payload, extra]) + ds_topic_stream(DB, ClientId, client_topic(TestCase, ClientId), Node) ), ct:pal("Data for client ~p on ~p is consistent.", [ClientId, Node]) end ). +diff_messages(Expected, Got) -> + Fields = [id, qos, from, flags, headers, topic, payload, extra], + diff_messages(Fields, Expected, Got). + diff_messages(Fields, Expected, Got) -> snabbkaffe_diff:assert_lists_eq(Expected, Got, message_diff_options(Fields)).