diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index 920e2528f..15391de6e 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -163,7 +163,7 @@ mk_clientid(Prefix, ID) -> restart_node(Node, NodeSpec) -> ?tp(will_restart_node, #{}), - emqx_cth_cluster:restart(Node, NodeSpec), + emqx_cth_cluster:restart(NodeSpec), wait_nodeup(Node), ?tp(restarted_node, #{}), ok. diff --git a/apps/emqx/test/emqx_cth_cluster.erl b/apps/emqx/test/emqx_cth_cluster.erl index 981b2e5eb..20400a1c4 100644 --- a/apps/emqx/test/emqx_cth_cluster.erl +++ b/apps/emqx/test/emqx_cth_cluster.erl @@ -38,7 +38,7 @@ %% in `end_per_suite/1` or `end_per_group/2`) with the result from step 2. -module(emqx_cth_cluster). --export([start/1, start/2, restart/1, restart/2]). +-export([start/1, start/2, restart/1]). -export([stop/1, stop_node/1]). -export([start_bare_nodes/1, start_bare_nodes/2]). @@ -163,13 +163,13 @@ wait_clustered([Node | Nodes] = All, Check, Deadline) -> wait_clustered(All, Check, Deadline) end. -restart(NodeSpec) -> - restart(maps:get(name, NodeSpec), NodeSpec). - -restart(Node, Spec) -> - ct:pal("Stopping peer node ~p", [Node]), - ok = emqx_cth_peer:stop(Node), - start([Spec#{boot_type => restart}]). +restart(NodeSpecs = [_ | _]) -> + Nodes = [maps:get(name, Spec) || Spec <- NodeSpecs], + ct:pal("Stopping peer nodes: ~p", [Nodes]), + ok = stop(Nodes), + start([Spec#{boot_type => restart} || Spec <- NodeSpecs]); +restart(NodeSpec = #{}) -> + restart([NodeSpec]). mk_nodespecs(Nodes, ClusterOpts) -> NodeSpecs = lists:zipwith( diff --git a/apps/emqx/test/emqx_cth_peer.erl b/apps/emqx/test/emqx_cth_peer.erl index b3849739a..9db595d73 100644 --- a/apps/emqx/test/emqx_cth_peer.erl +++ b/apps/emqx/test/emqx_cth_peer.erl @@ -22,6 +22,7 @@ -export([start/2, start/3, start/4]). -export([start_link/2, start_link/3, start_link/4]). -export([stop/1]). +-export([kill/1]). start(Name, Args) -> start(Name, Args, []). @@ -66,6 +67,32 @@ stop(Node) when is_atom(Node) -> ok end. +%% @doc Kill a node abruptly, through mechanisms provided by OS. +%% Relies on POSIX `kill`. +kill(Node) -> + try erpc:call(Node, os, getpid, []) of + OSPid -> + Pid = whereis(Node), + _ = is_pid(Pid) andalso unlink(Pid), + Result = kill_os_process(OSPid), + %% Either ensure control process stops, or try to stop if not killed. + _ = is_pid(Pid) andalso catch peer:stop(Pid), + Result + catch + error:{erpc, _} = Reason -> + {error, Reason} + end. + +kill_os_process(OSPid) -> + Cmd = "kill -SIGKILL " ++ OSPid, + Port = erlang:open_port({spawn, Cmd}, [binary, exit_status, hide]), + receive + {Port, {exit_status, 0}} -> + ok; + {Port, {exit_status, EC}} -> + {error, EC} + end. + parse_node_name(NodeName) -> case string:tokens(atom_to_list(NodeName), "@") of [Name, Host] -> diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl index 0a1173e70..669abdbf1 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl @@ -57,11 +57,14 @@ ra_store_batch/3 ]). +-behaviour(ra_machine). -export([ init/1, apply/3, tick/2, + state_enter/2, + snapshot_module/0 ]). @@ -143,7 +146,12 @@ %% Core state of the replication, i.e. the state of ra machine. -type ra_state() :: #{ + %% Shard ID. 
db_shard := {emqx_ds:db(), shard_id()}, + + %% Unique timestamp tracking real time closely. + %% With microsecond granularity it should be nearly impossible for it to run + %% too far ahead of the real time clock. latest := timestamp_us() }. @@ -374,7 +382,7 @@ init_buffer(_DB, _Shard, _Options) -> {ok, #bs{}}. -spec flush_buffer(emqx_ds:db(), shard_id(), [emqx_types:message()], egress_state()) -> - {egress_state(), ok | {error, recoverable | unrecoverable, _}}. + {egress_state(), ok | emqx_ds:error(_)}. flush_buffer(DB, Shard, Messages, State) -> case ra_store_batch(DB, Shard, Messages) of {timeout, ServerId} -> @@ -574,6 +582,20 @@ list_nodes() -> %% Too large for normal operation, need better backpressure mechanism. -define(RA_TIMEOUT, 60 * 1000). +%% How often to release Raft logs? +%% Each time we written approximately this number of bytes. +%% Close to the RocksDB's default of 64 MiB. +-define(RA_RELEASE_LOG_APPROX_SIZE, 50_000_000). +%% ...Or at least each N log entries. +-define(RA_RELEASE_LOG_MIN_FREQ, 64_000). + +-ifdef(TEST). +-undef(RA_RELEASE_LOG_APPROX_SIZE). +-undef(RA_RELEASE_LOG_MIN_FREQ). +-define(RA_RELEASE_LOG_APPROX_SIZE, 50_000). +-define(RA_RELEASE_LOG_MIN_FREQ, 1_000). +-endif. + -define(SAFE_ERPC(EXPR), try EXPR @@ -603,18 +625,20 @@ list_nodes() -> ). -spec ra_store_batch(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), [emqx_types:message()]) -> - ok | {timeout, _} | {error, recoverable | unrecoverable, _Err} | _Err. + ok | {timeout, _} | {error, recoverable | unrecoverable, _Err}. ra_store_batch(DB, Shard, Messages) -> Command = #{ ?tag => ?BATCH, ?batch_messages => Messages }, Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), - case ra:process_command(Servers, Command, ?RA_TIMEOUT) of + case emqx_ds_replication_layer_shard:process_command(Servers, Command, ?RA_TIMEOUT) of {ok, Result, _Leader} -> Result; - Error -> - Error + {timeout, _} = Timeout -> + Timeout; + {error, Reason = servers_unreachable} -> + {error, recoverable, Reason} end. ra_add_generation(DB, Shard) -> @@ -741,6 +765,9 @@ ra_drop_shard(DB, Shard) -> %% +-define(pd_ra_idx_need_release, '$emqx_ds_raft_idx_need_release'). +-define(pd_ra_bytes_need_release, '$emqx_ds_raft_bytes_need_release'). + -spec init(_Args :: map()) -> ra_state(). init(#{db := DB, shard := Shard}) -> #{db_shard => {DB, Shard}, latest => 0}. @@ -748,33 +775,29 @@ init(#{db := DB, shard := Shard}) -> -spec apply(ra_machine:command_meta_data(), ra_command(), ra_state()) -> {ra_state(), _Reply, _Effects}. apply( - #{index := RaftIdx}, + RaftMeta, #{ ?tag := ?BATCH, ?batch_messages := MessagesIn }, #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State0 ) -> - %% NOTE - %% Unique timestamp tracking real time closely. - %% With microsecond granularity it should be nearly impossible for it to run - %% too far ahead than the real time clock. - ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, ts => Latest0}), - {Latest, Messages} = assign_timestamps(Latest0, MessagesIn), - Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}), + ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, latest => Latest0}), + {Stats, Latest, Messages} = assign_timestamps(Latest0, MessagesIn), + Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{durable => false}), State = State0#{latest := Latest}, set_ts(DBShard, Latest), - %% TODO: Need to measure effects of changing frequency of `release_cursor`. 
- Effect = {release_cursor, RaftIdx, State}, - {State, Result, Effect}; + Effects = try_release_log(Stats, RaftMeta, State), + Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}), + {State, Result, Effects}; apply( - _RaftMeta, + RaftMeta, #{?tag := add_generation, ?since := Since}, #{db_shard := DBShard, latest := Latest0} = State0 ) -> ?tp( info, - ds_replication_layer_add_generation, + ds_ra_add_generation, #{ shard => DBShard, since => Since @@ -784,15 +807,17 @@ apply( Result = emqx_ds_storage_layer:add_generation(DBShard, Timestamp), State = State0#{latest := Latest}, set_ts(DBShard, Latest), - {State, Result}; + Effects = release_log(RaftMeta, State), + Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}), + {State, Result, Effects}; apply( - _RaftMeta, + RaftMeta, #{?tag := update_config, ?since := Since, ?config := Opts}, #{db_shard := DBShard, latest := Latest0} = State0 ) -> ?tp( notice, - ds_replication_layer_update_config, + ds_ra_update_config, #{ shard => DBShard, config => Opts, @@ -802,7 +827,9 @@ apply( {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0), Result = emqx_ds_storage_layer:update_config(DBShard, Timestamp, Opts), State = State0#{latest := Latest}, - {State, Result}; + Effects = release_log(RaftMeta, State), + Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}), + {State, Result, Effects}; apply( _RaftMeta, #{?tag := drop_generation, ?generation := GenId}, @@ -810,7 +837,7 @@ apply( ) -> ?tp( info, - ds_replication_layer_drop_generation, + ds_ra_drop_generation, #{ shard => DBShard, generation => GenId @@ -827,7 +854,7 @@ apply( set_ts(DBShard, Latest), ?tp( debug, - emqx_ds_replication_layer_storage_event, + ds_ra_storage_event, #{ shard => DBShard, payload => CustomEvent, latest => Latest } @@ -835,27 +862,83 @@ apply( Effects = handle_custom_event(DBShard, Latest, CustomEvent), {State#{latest => Latest}, ok, Effects}. +try_release_log({_N, BatchSize}, RaftMeta = #{index := CurrentIdx}, State) -> + %% NOTE + %% Because cursor release means storage flush (see + %% `emqx_ds_replication_snapshot:write/3`), we should do that not too often + %% (so the storage is happy with L0 SST sizes) and not too rarely (so we don't + %% accumulate huge Raft logs). + case inc_bytes_need_release(BatchSize) of + AccSize when AccSize > ?RA_RELEASE_LOG_APPROX_SIZE -> + release_log(RaftMeta, State); + _NotYet -> + case get_log_need_release(RaftMeta) of + undefined -> + []; + PrevIdx when CurrentIdx - PrevIdx > ?RA_RELEASE_LOG_MIN_FREQ -> + %% Release everything up to the last log entry, but only if there were + %% more than %% `?RA_RELEASE_LOG_MIN_FREQ` new entries since the last + %% release. + release_log(RaftMeta, State); + _ -> + [] + end + end. + +release_log(RaftMeta = #{index := CurrentIdx}, State) -> + %% NOTE + %% Release everything up to the last log entry. This is important: any log entries + %% following `CurrentIdx` should not contribute to `State` (that will be recovered + %% from a snapshot). + update_log_need_release(RaftMeta), + reset_bytes_need_release(), + {release_cursor, CurrentIdx, State}. + +get_log_need_release(RaftMeta) -> + case erlang:get(?pd_ra_idx_need_release) of + undefined -> + update_log_need_release(RaftMeta), + undefined; + LastIdx -> + LastIdx + end. + +update_log_need_release(#{index := CurrentIdx}) -> + erlang:put(?pd_ra_idx_need_release, CurrentIdx). 
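To make the release-cursor policy above easier to follow, here is a minimal, self-contained model of the decision implemented by try_release_log/3, written as a pure function over an explicit accumulator instead of the process dictionary. The module name, state shape and thresholds are illustrative only; they mirror ?RA_RELEASE_LOG_APPROX_SIZE and ?RA_RELEASE_LOG_MIN_FREQ but are not part of this changeset.

-module(release_policy_sketch).
-export([new/0, on_batch/3]).

%% Mirrors ?RA_RELEASE_LOG_APPROX_SIZE / ?RA_RELEASE_LOG_MIN_FREQ above.
-define(APPROX_SIZE, 50_000_000).
-define(MIN_FREQ, 64_000).

new() ->
    #{bytes => 0, last_release_idx => undefined}.

%% Returns {release, St} when a release_cursor effect should be emitted at the
%% current Raft index, otherwise {keep, St}.
on_batch(BatchBytes, RaftIdx, St = #{bytes := Bytes0, last_release_idx := LastIdx}) ->
    Bytes = Bytes0 + BatchBytes,
    Release =
        Bytes > ?APPROX_SIZE orelse
            (is_integer(LastIdx) andalso RaftIdx - LastIdx > ?MIN_FREQ),
    case Release of
        true ->
            {release, St#{bytes := 0, last_release_idx := RaftIdx}};
        false ->
            {keep, St#{bytes := Bytes, last_release_idx := first_seen(LastIdx, RaftIdx)}}
    end.

first_seen(undefined, Idx) -> Idx;
first_seen(LastIdx, _Idx) -> LastIdx.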
+ +get_bytes_need_release() -> + emqx_maybe:define(erlang:get(?pd_ra_bytes_need_release), 0). + +inc_bytes_need_release(Size) -> + Acc = get_bytes_need_release() + Size, + erlang:put(?pd_ra_bytes_need_release, Acc), + Acc. + +reset_bytes_need_release() -> + erlang:put(?pd_ra_bytes_need_release, 0). + -spec tick(integer(), ra_state()) -> ra_machine:effects(). tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) -> %% Leader = emqx_ds_replication_layer_shard:lookup_leader(DB, Shard), {Timestamp, _} = ensure_monotonic_timestamp(timestamp_to_timeus(TimeMs), Latest), - ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, ts => Timestamp}), + ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, timestamp => Timestamp}), handle_custom_event(DBShard, Timestamp, tick). assign_timestamps(Latest, Messages) -> - assign_timestamps(Latest, Messages, []). + assign_timestamps(Latest, Messages, [], 0, 0). -assign_timestamps(Latest, [MessageIn | Rest], Acc) -> - case emqx_message:timestamp(MessageIn, microsecond) of - TimestampUs when TimestampUs > Latest -> - Message = assign_timestamp(TimestampUs, MessageIn), - assign_timestamps(TimestampUs, Rest, [Message | Acc]); +assign_timestamps(Latest0, [Message0 | Rest], Acc, N, Sz) -> + case emqx_message:timestamp(Message0, microsecond) of + TimestampUs when TimestampUs > Latest0 -> + Latest = TimestampUs, + Message = assign_timestamp(TimestampUs, Message0); _Earlier -> - Message = assign_timestamp(Latest + 1, MessageIn), - assign_timestamps(Latest + 1, Rest, [Message | Acc]) - end; -assign_timestamps(Latest, [], Acc) -> - {Latest, lists:reverse(Acc)}. + Latest = Latest0 + 1, + Message = assign_timestamp(Latest, Message0) + end, + assign_timestamps(Latest, Rest, [Message | Acc], N + 1, Sz + approx_message_size(Message0)); +assign_timestamps(Latest, [], Acc, N, Size) -> + {{N, Size}, Latest, lists:reverse(Acc)}. assign_timestamp(TimestampUs, Message) -> {TimestampUs, Message}. @@ -888,3 +971,26 @@ handle_custom_event(DBShard, Latest, Event) -> set_ts({DB, Shard}, TS) -> emqx_ds_builtin_raft_sup:set_gvar(DB, ?gv_timestamp(Shard), TS). + +%% + +-spec state_enter(ra_server:ra_state() | eol, ra_state()) -> ra_machine:effects(). +state_enter(MemberState, #{db_shard := {DB, Shard}, latest := Latest}) -> + ?tp( + ds_ra_state_enter, + #{db => DB, shard => Shard, latest => Latest, state => MemberState} + ), + []. + +%% + +approx_message_size(#message{from = ClientID, topic = Topic, payload = Payload}) -> + %% NOTE: Overhead here is basically few empty maps + 8-byte message id. + %% TODO: Probably need to ask the storage layer about the footprint. + MinOverhead = 40, + MinOverhead + clientid_size(ClientID) + byte_size(Topic) + byte_size(Payload). + +clientid_size(ClientID) when is_binary(ClientID) -> + byte_size(ClientID); +clientid_size(ClientID) -> + erlang:external_size(ClientID). diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl index b43373c43..cdd62d874 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl @@ -4,6 +4,8 @@ -module(emqx_ds_replication_layer_shard). +-include_lib("snabbkaffe/include/trace.hrl"). + %% API: -export([start_link/3]). @@ -19,6 +21,12 @@ servers/3 ]). +%% Safe Process Command API +-export([ + process_command/3, + try_servers/3 +]). 
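For reference, a hedged sketch of how a caller is expected to consume the new safe-command API, in the same shape as ra_store_batch/3 earlier in this diff. The wrapping module and the Command term are invented for illustration; the replication-layer calls are the ones exported above.

-module(shard_command_example).
-export([store/3]).

store(DB, Shard, Command) ->
    Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
    case emqx_ds_replication_layer_shard:process_command(Servers, Command, 30_000) of
        {ok, Result, _Leader} ->
            Result;
        {timeout, _Server} = Timeout ->
            %% Ambiguous outcome: the command may or may not have been applied.
            Timeout;
        {error, servers_unreachable} ->
            {error, recoverable, servers_unreachable}
    end.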
+ %% Membership -export([ add_local_server/2, @@ -37,6 +45,12 @@ -type server() :: ra:server_id(). +-type server_error() :: server_error(none()). +-type server_error(Reason) :: + {timeout, server()} + | {error, server(), Reason} + | {error, servers_unreachable}. + -define(MEMBERSHIP_CHANGE_TIMEOUT, 30_000). %% @@ -146,6 +160,40 @@ local_site() -> %% +-spec process_command([server()], _Command, timeout()) -> + {ok, _Result, _Leader :: server()} | server_error(). +process_command(Servers, Command, Timeout) -> + try_servers(Servers, fun ra:process_command/3, [Command, Timeout]). + +-spec try_servers([server()], function(), [_Arg]) -> + {ok, _Result, _Leader :: server()} | server_error(_Reason). +try_servers([Server | Rest], Fun, Args) -> + case is_server_online(Server) andalso erlang:apply(Fun, [Server | Args]) of + {ok, R, Leader} -> + {ok, R, Leader}; + _Online = false -> + ?tp(emqx_ds_replshard_try_next_servers, #{server => Server, reason => offline}), + try_servers(Rest, Fun, Args); + {error, Reason = noproc} -> + ?tp(emqx_ds_replshard_try_next_servers, #{server => Server, reason => Reason}), + try_servers(Rest, Fun, Args); + {error, Reason} when Reason =:= nodedown orelse Reason =:= shutdown -> + %% NOTE + %% Conceptually, those error conditions basically mean the same as a plain + %% timeout: "it's impossible to tell if operation has succeeded or not". + ?tp(emqx_ds_replshard_try_servers_timeout, #{server => Server, reason => Reason}), + {timeout, Server}; + {timeout, _} = Timeout -> + ?tp(emqx_ds_replshard_try_servers_timeout, #{server => Server, reason => timeout}), + Timeout; + {error, Reason} -> + {error, Server, Reason} + end; +try_servers([], _Fun, _Args) -> + {error, servers_unreachable}. + +%% + %% @doc Add a local server to the shard cluster. %% It's recommended to have the local server running before calling this function. %% This function is idempotent. @@ -174,10 +222,10 @@ add_local_server(DB, Shard) -> } end, Timeout = ?MEMBERSHIP_CHANGE_TIMEOUT, - case ra_try_servers(ShardServers, fun ra:add_member/3, [ServerRecord, Timeout]) of + case try_servers(ShardServers, fun ra:add_member/3, [ServerRecord, Timeout]) of {ok, _, _Leader} -> ok; - {error, already_member} -> + {error, _Server, already_member} -> ok; Error -> {error, recoverable, Error} @@ -208,10 +256,10 @@ drop_local_server(DB, Shard) -> remove_server(DB, Shard, Server) -> ShardServers = shard_servers(DB, Shard), Timeout = ?MEMBERSHIP_CHANGE_TIMEOUT, - case ra_try_servers(ShardServers, fun ra:remove_member/3, [Server, Timeout]) of + case try_servers(ShardServers, fun ra:remove_member/3, [Server, Timeout]) of {ok, _, _Leader} -> ok; - {error, not_member} -> + {error, _Server, not_member} -> ok; Error -> {error, recoverable, Error} @@ -261,20 +309,6 @@ member_readiness(#{status := Status, voter_status := #{membership := Membership} member_readiness(#{}) -> unknown. -%% - -ra_try_servers([Server | Rest], Fun, Args) -> - case erlang:apply(Fun, [Server | Args]) of - {ok, R, Leader} -> - {ok, R, Leader}; - {error, Reason} when Reason == noproc; Reason == nodedown -> - ra_try_servers(Rest, Fun, Args); - ErrorOrTimeout -> - ErrorOrTimeout - end; -ra_try_servers([], _Fun, _Args) -> - {error, servers_unreachable}. 
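The branching in try_servers/3 above can be summarized by a tiny classifier. This helper is purely illustrative; the real decision stays inlined in try_servers/3.

%% noproc (or an offline server) means "try the next replica"; nodedown and
%% shutdown are indistinguishable from a timeout, so the outcome is unknown.
classify_ra_error(noproc)   -> try_next_server;
classify_ra_error(nodedown) -> ambiguous_timeout;
classify_ra_error(shutdown) -> ambiguous_timeout;
classify_ra_error(Other)    -> {error, Other}.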
- ra_overview(Server) -> case ra:member_overview(Server) of {ok, Overview, _Leader} -> diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_snapshot.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_snapshot.erl index 9267aee77..3a62b3b0f 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_snapshot.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_snapshot.erl @@ -69,6 +69,8 @@ prepare(Index, State) -> -spec write(_SnapshotDir :: file:filename(), ra_snapshot:meta(), _State :: ra_state()) -> ok | {ok, _BytesWritten :: non_neg_integer()} | {error, ra_snapshot:file_err()}. write(Dir, Meta, MachineState) -> + ?tp(dsrepl_snapshot_write, #{meta => Meta, state => MachineState}), + ok = emqx_ds_storage_layer:flush(shard_id(MachineState)), ra_log_snapshot:write(Dir, Meta, MachineState). %% Reading a snapshot. @@ -165,6 +167,7 @@ complete_read(RS = #rs{reader = SnapReader, started_at = StartedAt}) -> -spec begin_accept(_SnapshotDir :: file:filename(), ra_snapshot:meta()) -> {ok, ws()}. begin_accept(Dir, Meta) -> + ?tp(dsrepl_snapshot_accept_started, #{meta => Meta}), WS = #ws{ phase = machine_state, started_at = erlang:monotonic_time(millisecond), @@ -207,7 +210,7 @@ complete_accept(Chunk, WS = #ws{phase = storage_snapshot, writer = SnapWriter0}) ?tp(dsrepl_snapshot_write_complete, #{writer => SnapWriter}), _ = emqx_ds_storage_snapshot:release_writer(SnapWriter), Result = complete_accept(WS#ws{writer = SnapWriter}), - ?tp(dsrepl_snapshot_accepted, #{shard => shard_id(WS)}), + ?tp(dsrepl_snapshot_accepted, #{shard => shard_id(WS), state => WS#ws.state}), Result; {error, Reason} -> ?tp(dsrepl_snapshot_write_error, #{reason => Reason, writer => SnapWriter0}), @@ -218,7 +221,7 @@ complete_accept(Chunk, WS = #ws{phase = storage_snapshot, writer = SnapWriter0}) complete_accept(WS = #ws{started_at = StartedAt, writer = SnapWriter}) -> ShardId = shard_id(WS), logger:info(#{ - msg => "dsrepl_snapshot_read_complete", + msg => "dsrepl_snapshot_write_complete", shard => ShardId, duration_ms => erlang:monotonic_time(millisecond) - StartedAt, bytes_written => emqx_ds_storage_snapshot:writer_info(bytes_written, SnapWriter) @@ -227,7 +230,7 @@ complete_accept(WS = #ws{started_at = StartedAt, writer = SnapWriter}) -> write_machine_snapshot(WS). write_machine_snapshot(#ws{dir = Dir, meta = Meta, state = MachineState}) -> - write(Dir, Meta, MachineState). + ra_log_snapshot:write(Dir, Meta, MachineState). %% Restoring machine state from a snapshot. %% This is equivalent to restoring from a log snapshot. diff --git a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl index abe154807..d3eed99df 100644 --- a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl @@ -140,6 +140,7 @@ t_replication_transfers_snapshots(Config) -> %% Stop the DB on the "offline" node. ok = emqx_cth_cluster:stop_node(NodeOffline), + _ = ?block_until(#{?snk_kind := ds_ra_state_enter, state := leader}, 500, 0), %% Fill the storage with messages and few additional generations. 
emqx_ds_test_helpers:apply_stream(?DB, Nodes -- [NodeOffline], Stream), @@ -232,14 +233,14 @@ t_rebalance(Config) -> ], Stream1 = emqx_utils_stream:interleave( [ - {10, Stream0}, + {20, Stream0}, emqx_utils_stream:const(add_generation) ], false ), Stream = emqx_utils_stream:interleave( [ - {50, Stream0}, + {50, Stream1}, emqx_utils_stream:list(Sequence) ], true @@ -604,7 +605,7 @@ t_drop_generation(Config) -> after emqx_cth_cluster:stop(Nodes) end, - fun(Trace) -> + fun(_Trace) -> %% TODO: some idempotency errors still happen %% ?assertMatch([], ?of_kind(ds_storage_layer_failed_to_drop_generation, Trace)), true @@ -794,6 +795,118 @@ t_store_batch_fail(_Config) -> ] ). +t_crash_restart_recover(init, Config) -> + Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft], + Specs = emqx_cth_cluster:mk_nodespecs( + [ + {t_crash_stop_recover1, #{apps => Apps}}, + {t_crash_stop_recover2, #{apps => Apps}}, + {t_crash_stop_recover3, #{apps => Apps}} + ], + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} + ), + Nodes = emqx_cth_cluster:start(Specs), + [{nodes, Nodes}, {nodespecs, Specs} | Config]; +t_crash_restart_recover('end', Config) -> + ok = emqx_cth_cluster:stop(?config(nodes, Config)). + +t_crash_restart_recover(Config) -> + %% This testcase verifies that in the event of abrupt site failure message data is + %% correctly preserved. + Nodes = [N1, N2, N3] = ?config(nodes, Config), + _Specs = [_, NS2, NS3] = ?config(nodespecs, Config), + DBOpts = opts(#{n_shards => 16, n_sites => 3, replication_factor => 3}), + + %% Prepare test event stream. + NMsgs = 400, + NClients = 8, + {Stream0, TopicStreams} = + emqx_ds_test_helpers:interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs), + Stream1 = emqx_utils_stream:interleave( + [ + {300, Stream0}, + emqx_utils_stream:const(add_generation) + ], + false + ), + Stream = emqx_utils_stream:interleave( + [ + {1000, Stream1}, + emqx_utils_stream:list([ + fun() -> kill_restart_node_async(N2, NS2, DBOpts) end, + fun() -> kill_restart_node_async(N3, NS3, DBOpts) end + ]) + ], + true + ), + + ?check_trace( + begin + %% Initialize DB on all nodes. + ?assertEqual( + [{ok, ok} || _ <- Nodes], + erpc:multicall(Nodes, emqx_ds, open_db, [?DB, DBOpts]) + ), + + %% Apply the test events, including simulated node crashes. + NodeStream = emqx_utils_stream:const(N1), + emqx_ds_test_helpers:apply_stream(?DB, NodeStream, Stream, 0), + + %% It's expected to lose few messages when leaders are abruptly killed. + MatchFlushFailed = ?match_event(#{?snk_kind := emqx_ds_buffer_flush_failed}), + {ok, SubRef} = snabbkaffe:subscribe(MatchFlushFailed, NMsgs, _Timeout = 5000, infinity), + {timeout, Events} = snabbkaffe:receive_events(SubRef), + LostMessages = [M || #{batch := Messages} <- Events, M <- Messages], + ct:pal("Some messages were lost: ~p", [LostMessages]), + ?assert(length(LostMessages) < NMsgs div 20), + + %% Verify that all the successfully persisted messages are there. + VerifyClient = fun({ClientId, ExpectedStream}) -> + Topic = emqx_ds_test_helpers:client_topic(?FUNCTION_NAME, ClientId), + ClientNodes = nodes_of_clientid(ClientId, Nodes), + DSStream1 = ds_topic_stream(ClientId, Topic, hd(ClientNodes)), + %% Do nodes contain same messages for a client? + lists:foreach( + fun(ClientNode) -> + DSStream = ds_topic_stream(ClientId, Topic, ClientNode), + ?defer_assert(emqx_ds_test_helpers:diff_messages(DSStream1, DSStream)) + end, + tl(ClientNodes) + ), + %% Does any messages were lost unexpectedly? 
+ {_, DSMessages} = lists:unzip(emqx_utils_stream:consume(DSStream1)), + ExpectedMessages = emqx_utils_stream:consume(ExpectedStream), + MissingMessages = ExpectedMessages -- DSMessages, + ?defer_assert(?assertEqual([], MissingMessages -- LostMessages, DSMessages)) + end, + lists:foreach(VerifyClient, TopicStreams) + end, + [] + ). + +nodes_of_clientid(ClientId, Nodes) -> + emqx_ds_test_helpers:nodes_of_clientid(?DB, ClientId, Nodes). + +ds_topic_stream(ClientId, ClientTopic, Node) -> + emqx_ds_test_helpers:ds_topic_stream(?DB, ClientId, ClientTopic, Node). + +is_message_lost(Message, MessagesLost) -> + lists:any( + fun(ML) -> + emqx_ds_test_helpers:message_eq([clientid, topic, payload], Message, ML) + end, + MessagesLost + ). + +kill_restart_node_async(Node, Spec, DBOpts) -> + erlang:spawn_link(?MODULE, kill_restart_node, [Node, Spec, DBOpts]). + +kill_restart_node(Node, Spec, DBOpts) -> + ok = emqx_cth_peer:kill(Node), + ?tp(test_cluster_node_killed, #{node => Node}), + _ = emqx_cth_cluster:restart(Spec), + ok = erpc:call(Node, emqx_ds, open_db, [?DB, DBOpts]). + %% shard_server_info(Node, DB, Shard, Site, Info) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_buffer.erl b/apps/emqx_durable_storage/src/emqx_ds_buffer.erl index e93bb33be..dec9eea80 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_buffer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_buffer.erl @@ -314,7 +314,7 @@ do_flush( ?tp( debug, emqx_ds_buffer_flush_failed, - #{db => DB, shard => Shard, error => Err} + #{db => DB, shard => Shard, batch => Messages, error => Err} ), emqx_ds_builtin_metrics:inc_buffer_batches_failed(Metrics), Reply = diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index c978e416f..20c3bc087 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -29,7 +29,7 @@ open/5, drop/5, prepare_batch/4, - commit_batch/3, + commit_batch/4, get_streams/4, get_delete_streams/4, make_iterator/5, @@ -270,7 +270,7 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) -> emqx_ds_storage_layer:shard_id(), s(), [{emqx_ds:time(), emqx_types:message()}, ...], - emqx_ds:message_store_opts() + emqx_ds_storage_layer:batch_store_opts() ) -> {ok, cooked_batch()}. prepare_batch(_ShardId, S, Messages, _Options) -> @@ -294,12 +294,14 @@ prepare_batch(_ShardId, S, Messages, _Options) -> -spec commit_batch( emqx_ds_storage_layer:shard_id(), s(), - cooked_batch() + cooked_batch(), + emqx_ds_storage_layer:batch_store_opts() ) -> ok | emqx_ds:error(_). commit_batch( _ShardId, _Data, - #{?cooked_payloads := [], ?cooked_lts_ops := LTS} + #{?cooked_payloads := [], ?cooked_lts_ops := LTS}, + _Options ) -> %% Assert: [] = LTS, @@ -307,7 +309,8 @@ commit_batch( commit_batch( _ShardId, #s{db = DB, data = DataCF, trie = Trie, trie_cf = TrieCF, gvars = Gvars}, - #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs} + #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs}, + Options ) -> {ok, Batch} = rocksdb:batch(), %% Commit LTS trie to the storage: @@ -326,7 +329,7 @@ commit_batch( end, Payloads ), - Result = rocksdb:write_batch(DB, Batch, []), + Result = rocksdb:write_batch(DB, Batch, write_batch_opts(Options)), rocksdb:release_batch(Batch), ets:insert(Gvars, {?IDLE_DETECT, false, MaxTs}), %% NOTE @@ -964,6 +967,13 @@ pop_lts_persist_ops() -> L end. 
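To make the intent of the new store options concrete, here is a hedged sketch of the write path this diff sets up: the Raft machine applies batches without the RocksDB WAL, and durability is re-established by an explicit storage flush when the snapshot is written (see emqx_ds_replication_snapshot:write/3 and try_release_log/3 earlier in this diff). The function below is not part of the changeset; the shard id and batch are placeholders.

%% Assumed usage, not an actual function in this changeset:
walless_write_then_flush(DBShard, Batch) ->
    %% WAL-less write: cheap, but not durable on its own.
    ok = emqx_ds_storage_layer:store_batch(DBShard, Batch, #{durable => false}),
    %% Durability point, normally triggered from the snapshot/release path.
    ok = emqx_ds_storage_layer:flush(DBShard).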
+-spec write_batch_opts(emqx_ds_storage_layer:batch_store_opts()) -> + _RocksDBOpts :: [{atom(), _}]. +write_batch_opts(#{durable := false}) -> + [{disable_wal, true}]; +write_batch_opts(#{}) -> + []. + -ifdef(TEST). serialize(Msg) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 818d0bcb7..fe1d36a35 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -27,7 +27,7 @@ %% Data store_batch/3, prepare_batch/3, - commit_batch/2, + commit_batch/3, get_streams/3, get_delete_streams/3, @@ -44,6 +44,7 @@ drop_generation/2, %% Snapshotting + flush/1, take_snapshot/1, accept_snapshot/1, @@ -69,7 +70,8 @@ shard_id/0, options/0, prototype/0, - cooked_batch/0 + cooked_batch/0, + batch_store_opts/0 ]). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -108,10 +110,28 @@ -type shard_id() :: {emqx_ds:db(), binary()}. --type cf_refs() :: [{string(), rocksdb:cf_handle()}]. +-type cf_ref() :: {string(), rocksdb:cf_handle()}. +-type cf_refs() :: [cf_ref()]. -type gen_id() :: 0..16#ffff. +%% Options affecting how batches should be stored. +%% See also: `emqx_ds:message_store_opts()'. +-type batch_store_opts() :: + #{ + %% Whether the whole batch given to `store_batch' should be inserted atomically as + %% a unit. Default: `false'. + atomic => boolean(), + %% Should the storage make sure that the batch is written durably? Non-durable + %% writes are in general unsafe but require much less resources, i.e. with RocksDB + %% non-durable (WAL-less) writes do not usually involve _any_ disk I/O. + %% Default: `true'. + durable => boolean() + }. + +%% Options affecting how batches should be prepared. +-type batch_prepare_opts() :: #{}. + %% TODO: kept for BPAPI compatibility. Remove me on EMQX v5.6 -opaque stream_v1() :: #{ @@ -159,7 +179,7 @@ %% Module-specific data defined at generation creation time: data := Data, %% Column families used by this generation - cf_refs := cf_refs(), + cf_names := [string()], %% Time at which this was created. Might differ from `since', in particular for the %% first generation. created_at := emqx_message:timestamp(), @@ -225,14 +245,15 @@ shard_id(), generation_data(), [{emqx_ds:time(), emqx_types:message()}, ...], - emqx_ds:message_store_opts() + batch_store_opts() ) -> {ok, term()} | emqx_ds:error(_). -callback commit_batch( shard_id(), generation_data(), - _CookedBatch + _CookedBatch, + batch_store_opts() ) -> ok | emqx_ds:error(_). -callback get_streams( @@ -279,6 +300,7 @@ -record(call_update_config, {options :: emqx_ds:create_db_opts(), since :: emqx_ds:time()}). -record(call_list_generations_with_lifetimes, {}). -record(call_drop_generation, {gen_id :: gen_id()}). +-record(call_flush, {}). -record(call_take_snapshot, {}). -spec drop_shard(shard_id()) -> ok. @@ -288,16 +310,13 @@ drop_shard(Shard) -> -spec store_batch( shard_id(), [{emqx_ds:time(), emqx_types:message()}], - emqx_ds:message_store_opts() + batch_store_opts() ) -> emqx_ds:store_batch_result(). 
store_batch(Shard, Messages, Options) -> - ?tp(emqx_ds_storage_layer_store_batch, #{ - shard => Shard, messages => Messages, options => Options - }), - case prepare_batch(Shard, Messages, Options) of + case prepare_batch(Shard, Messages, #{}) of {ok, CookedBatch} -> - commit_batch(Shard, CookedBatch); + commit_batch(Shard, CookedBatch, Options); ignore -> ok; Error = {error, _, _} -> @@ -307,7 +326,7 @@ store_batch(Shard, Messages, Options) -> -spec prepare_batch( shard_id(), [{emqx_ds:time(), emqx_types:message()}], - emqx_ds:message_store_opts() + batch_prepare_opts() ) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_). prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> %% NOTE @@ -336,11 +355,15 @@ prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> prepare_batch(_Shard, [], _Options) -> ignore. --spec commit_batch(shard_id(), cooked_batch()) -> emqx_ds:store_batch_result(). -commit_batch(Shard, #{?tag := ?COOKED_BATCH, ?generation := GenId, ?enc := CookedBatch}) -> +-spec commit_batch( + shard_id(), + cooked_batch(), + batch_store_opts() +) -> emqx_ds:store_batch_result(). +commit_batch(Shard, #{?tag := ?COOKED_BATCH, ?generation := GenId, ?enc := CookedBatch}, Options) -> #{?GEN_KEY(GenId) := #{module := Mod, data := GenData}} = get_schema_runtime(Shard), T0 = erlang:monotonic_time(microsecond), - Result = Mod:commit_batch(Shard, GenData, CookedBatch), + Result = Mod:commit_batch(Shard, GenData, CookedBatch, Options), T1 = erlang:monotonic_time(microsecond), emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0), Result. @@ -539,6 +562,10 @@ shard_info(ShardId, status) -> error:badarg -> down end. +-spec flush(shard_id()) -> ok | {error, _}. +flush(ShardId) -> + gen_server:call(?REF(ShardId), #call_flush{}, infinity). + -spec take_snapshot(shard_id()) -> {ok, emqx_ds_storage_snapshot:reader()} | {error, _Reason}. take_snapshot(ShardId) -> case gen_server:call(?REF(ShardId), #call_take_snapshot{}, infinity) of @@ -566,6 +593,7 @@ start_link(Shard = {_, _}, Options) -> shard_id :: shard_id(), db :: rocksdb:db_handle(), cf_refs :: cf_refs(), + cf_need_flush :: gen_id(), schema :: shard_schema(), shard :: shard() }). @@ -591,10 +619,12 @@ init({ShardId, Options}) -> {Scm, CFRefs0} end, Shard = open_shard(ShardId, DB, CFRefs, Schema), + CurrentGenId = maps:get(current_generation, Schema), S = #s{ shard_id = ShardId, db = DB, cf_refs = CFRefs, + cf_need_flush = CurrentGenId, schema = Schema, shard = Shard }, @@ -635,6 +665,9 @@ handle_call(#call_list_generations_with_lifetimes{}, _From, S) -> handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) -> {Reply, S} = handle_drop_generation(S0, GenId), {reply, Reply, S}; +handle_call(#call_flush{}, _From, S0) -> + {Reply, S} = handle_flush(S0), + {reply, Reply, S}; handle_call(#call_take_snapshot{}, _From, S) -> Snapshot = handle_take_snapshot(S), {reply, Snapshot, S}; @@ -750,9 +783,9 @@ handle_drop_generation(S0, GenId) -> #s{ shard_id = ShardId, db = DB, - schema = #{?GEN_KEY(GenId) := GenSchema} = OldSchema, - shard = OldShard, - cf_refs = OldCFRefs + schema = #{?GEN_KEY(GenId) := GenSchema} = Schema0, + shard = #{?GEN_KEY(GenId) := #{data := RuntimeData}} = Shard0, + cf_refs = CFRefs0 } = S0, %% 1. Commit the metadata first, so other functions are less %% likely to see stale data, and replicas don't end up @@ -761,16 +794,16 @@ handle_drop_generation(S0, GenId) -> %% %% Note: in theory, this operation may be interrupted in the %% middle. This will leave column families hanging. 
- Shard = maps:remove(?GEN_KEY(GenId), OldShard), - Schema = maps:remove(?GEN_KEY(GenId), OldSchema), + Shard = maps:remove(?GEN_KEY(GenId), Shard0), + Schema = maps:remove(?GEN_KEY(GenId), Schema0), S1 = S0#s{ shard = Shard, schema = Schema }, commit_metadata(S1), %% 2. Now, actually drop the data from RocksDB: - #{module := Mod, cf_refs := GenCFRefs} = GenSchema, - #{?GEN_KEY(GenId) := #{data := RuntimeData}} = OldShard, + #{module := Mod, cf_names := GenCFNames} = GenSchema, + GenCFRefs = [cf_ref(Name, CFRefs0) || Name <- GenCFNames], try Mod:drop(ShardId, DB, GenId, GenCFRefs, RuntimeData) catch @@ -787,7 +820,7 @@ handle_drop_generation(S0, GenId) -> } ) end, - CFRefs = OldCFRefs -- GenCFRefs, + CFRefs = CFRefs0 -- GenCFRefs, S = S1#s{cf_refs = CFRefs}, {ok, S}. @@ -839,7 +872,7 @@ new_generation(ShardId, DB, Schema0, Shard0, Since) -> GenSchema = #{ module => Mod, data => GenData, - cf_refs => NewCFRefs, + cf_names => cf_names(NewCFRefs), created_at => erlang:system_time(millisecond), since => Since, until => undefined @@ -866,6 +899,10 @@ rocksdb_open(Shard, Options) -> DBOptions = [ {create_if_missing, true}, {create_missing_column_families, true}, + %% NOTE + %% With WAL-less writes, it's important to have CFs flushed atomically. + %% For example, bitfield-lts backend needs data + trie CFs to be consistent. + {atomic_flush, true}, {enable_write_thread_adaptive_yield, false} | maps:get(db_options, Options, []) ], @@ -921,6 +958,34 @@ update_last_until(Schema = #{current_generation := GenId}, Until) -> {error, overlaps_existing_generations} end. +handle_flush(S = #s{db = DB, cf_refs = CFRefs, cf_need_flush = NeedFlushGenId, shard = Shard}) -> + %% NOTE + %% There could have been few generations added since the last time `flush/1` was + %% called. Strictly speaking, we don't need to flush them all at once as part of + %% a single atomic flush, but the error handling is a bit easier this way. + CurrentGenId = maps:get(current_generation, Shard), + GenIds = lists:seq(NeedFlushGenId, CurrentGenId), + CFHandles = lists:flatmap( + fun(GenId) -> + case Shard of + #{?GEN_KEY(GenId) := #{cf_names := CFNames}} -> + [cf_handle(N, CFRefs) || N <- CFNames]; + #{} -> + %% Generation was probably dropped. + [] + end + end, + GenIds + ), + case rocksdb:flush(DB, CFHandles, [{wait, true}]) of + ok -> + %% Current generation will always need a flush. + ?tp(ds_storage_flush_complete, #{gens => GenIds, cfs => CFHandles}), + {ok, S#s{cf_need_flush = CurrentGenId}}; + {error, _} = Error -> + {Error, S} + end. + handle_take_snapshot(#s{db = DB, shard_id = ShardId}) -> Name = integer_to_list(erlang:system_time(millisecond)), Dir = checkpoint_dir(ShardId, Name), @@ -954,6 +1019,21 @@ handle_event(Shard, Time, Event) -> GenId = generation_current(Shard), handle_event(Shard, Time, ?mk_storage_event(GenId, Event)). +%%-------------------------------------------------------------------------------- + +-spec cf_names(cf_refs()) -> [string()]. +cf_names(CFRefs) -> + {CFNames, _CFHandles} = lists:unzip(CFRefs), + CFNames. + +-spec cf_ref(_Name :: string(), cf_refs()) -> cf_ref(). +cf_ref(Name, CFRefs) -> + lists:keyfind(Name, 1, CFRefs). + +-spec cf_handle(_Name :: string(), cf_refs()) -> rocksdb:cf_handle(). +cf_handle(Name, CFRefs) -> + element(2, cf_ref(Name, CFRefs)). 
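A short illustration of why the persisted generation schema now carries column-family names rather than handles: handles are only meaningful for the currently open RocksDB instance, so they must be re-resolved against the freshly opened column families after every shard open. The helper below is hypothetical; the real lookups are cf_ref/2 and cf_handle/2 above.

%% Hypothetical: rebuild the {Name, Handle} pairs for one generation from the
%% CF references of the currently open DB.
resolve_generation_cfs(#{cf_names := CFNames}, CFRefs) ->
    [cf_ref(Name, CFRefs) || Name <- CFNames].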
+ %%-------------------------------------------------------------------------------- %% Schema access %%-------------------------------------------------------------------------------- @@ -1041,23 +1121,106 @@ erase_schema_runtime(Shard) -> -undef(PERSISTENT_TERM). --define(ROCKSDB_SCHEMA_KEY, <<"schema_v1">>). +-define(ROCKSDB_SCHEMA_KEY(V), <<"schema_", V>>). + +-define(ROCKSDB_SCHEMA_KEY, ?ROCKSDB_SCHEMA_KEY("v2")). +-define(ROCKSDB_SCHEMA_KEYS, [ + ?ROCKSDB_SCHEMA_KEY, + ?ROCKSDB_SCHEMA_KEY("v1") +]). -spec get_schema_persistent(rocksdb:db_handle()) -> shard_schema() | not_found. get_schema_persistent(DB) -> - case rocksdb:get(DB, ?ROCKSDB_SCHEMA_KEY, []) of + get_schema_persistent(DB, ?ROCKSDB_SCHEMA_KEYS). + +get_schema_persistent(DB, [Key | Rest]) -> + case rocksdb:get(DB, Key, []) of {ok, Blob} -> - Schema = binary_to_term(Blob), - %% Sanity check: - #{current_generation := _, prototype := _} = Schema, - Schema; + deserialize_schema(Key, Blob); not_found -> - not_found - end. + get_schema_persistent(DB, Rest) + end; +get_schema_persistent(_DB, []) -> + not_found. -spec put_schema_persistent(rocksdb:db_handle(), shard_schema()) -> ok. put_schema_persistent(DB, Schema) -> Blob = term_to_binary(Schema), rocksdb:put(DB, ?ROCKSDB_SCHEMA_KEY, Blob, []). +-spec deserialize_schema(_SchemaVsn :: binary(), binary()) -> shard_schema(). +deserialize_schema(SchemaVsn, Blob) -> + %% Sanity check: + Schema = #{current_generation := _, prototype := _} = binary_to_term(Blob), + decode_schema(SchemaVsn, Schema). + +decode_schema(?ROCKSDB_SCHEMA_KEY, Schema) -> + Schema; +decode_schema(?ROCKSDB_SCHEMA_KEY("v1"), Schema) -> + maps:map(fun decode_schema_v1/2, Schema). + +decode_schema_v1(?GEN_KEY(_), Generation = #{}) -> + decode_generation_schema_v1(Generation); +decode_schema_v1(_, V) -> + V. + +decode_generation_schema_v1(SchemaV1 = #{cf_refs := CFRefs}) -> + %% Drop potentially dead CF references from the time generation was created. + Schema = maps:remove(cf_refs, SchemaV1), + Schema#{cf_names => cf_names(CFRefs)}; +decode_generation_schema_v1(Schema = #{}) -> + Schema. + +%%-------------------------------------------------------------------------------- + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +decode_schema_v1_test() -> + SchemaV1 = #{ + current_generation => 42, + prototype => {emqx_ds_storage_reference, #{}}, + ?GEN_KEY(41) => #{ + module => emqx_ds_storage_reference, + data => {schema}, + cf_refs => [{"emqx_ds_storage_reference41", erlang:make_ref()}], + created_at => 12345, + since => 0, + until => 123456 + }, + ?GEN_KEY(42) => #{ + module => emqx_ds_storage_reference, + data => {schema}, + cf_refs => [{"emqx_ds_storage_reference42", erlang:make_ref()}], + created_at => 54321, + since => 123456, + until => undefined + } + }, + ?assertEqual( + #{ + current_generation => 42, + prototype => {emqx_ds_storage_reference, #{}}, + ?GEN_KEY(41) => #{ + module => emqx_ds_storage_reference, + data => {schema}, + cf_names => ["emqx_ds_storage_reference41"], + created_at => 12345, + since => 0, + until => 123456 + }, + ?GEN_KEY(42) => #{ + module => emqx_ds_storage_reference, + data => {schema}, + cf_names => ["emqx_ds_storage_reference42"], + created_at => 54321, + since => 123456, + until => undefined + } + }, + deserialize_schema(?ROCKSDB_SCHEMA_KEY("v1"), term_to_binary(SchemaV1)) + ). + +-endif. + -undef(ROCKSDB_SCHEMA_KEY). 
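Taken together, the behaviour changes above mean a backend's commit callback now receives the store options as a fourth argument. A minimal skeleton under those assumptions (hypothetical backend, generation data as a map with db/cf, cooked batch as key/value pairs) could look like this, honouring durable => false the same way the builtin backends in this diff do:

commit_batch(_ShardId, #{db := DB, cf := CF}, KVs, Options) ->
    {ok, Batch} = rocksdb:batch(),
    ok = lists:foreach(
        fun({Key, Value}) -> ok = rocksdb:batch_put(Batch, CF, Key, Value) end,
        KVs
    ),
    WriteOpts =
        case Options of
            #{durable := false} -> [{disable_wal, true}];
            #{} -> []
        end,
    Result = rocksdb:write_batch(DB, Batch, WriteOpts),
    rocksdb:release_batch(Batch),
    Result.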
diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index b4c3ade3f..ca29c11a8 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -32,7 +32,7 @@ open/5, drop/5, prepare_batch/4, - commit_batch/3, + commit_batch/4, get_streams/4, get_delete_streams/4, make_iterator/5, @@ -105,7 +105,7 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) -> prepare_batch(_ShardId, _Data, Messages, _Options) -> {ok, Messages}. -commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages) -> +commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages, Options) -> {ok, Batch} = rocksdb:batch(), lists:foreach( fun({TS, Msg}) -> @@ -115,7 +115,7 @@ commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages) -> end, Messages ), - Res = rocksdb:write_batch(DB, Batch, _WriteOptions = []), + Res = rocksdb:write_batch(DB, Batch, write_batch_opts(Options)), rocksdb:release_batch(Batch), Res. @@ -284,3 +284,10 @@ do_delete_next( -spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()]. data_cf(GenId) -> "emqx_ds_storage_reference" ++ integer_to_list(GenId). + +-spec write_batch_opts(emqx_ds_storage_layer:batch_store_opts()) -> + _RocksDBOpts :: [{atom(), _}]. +write_batch_opts(#{durable := false}) -> + [{disable_wal, true}]; +write_batch_opts(#{}) -> + []. diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index bd0f382b2..866b4d381 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -64,7 +64,7 @@ t_iterate(_Config) -> {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} || Topic <- Topics, PublishedAt <- Timestamps ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}), %% Iterate through individual topics: [ begin @@ -94,7 +94,7 @@ t_delete(_Config) -> {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} || Topic <- Topics, PublishedAt <- Timestamps ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}), %% Iterate through topics: StartTime = 0, @@ -125,7 +125,7 @@ t_get_streams(_Config) -> {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} || Topic <- Topics, PublishedAt <- Timestamps ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}), GetStream = fun(Topic) -> StartTime = 0, emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), StartTime) @@ -152,7 +152,7 @@ t_get_streams(_Config) -> end || I <- lists:seq(1, 200) ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, NewBatch, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, NewBatch, #{}), %% Check that "foo/bar/baz" topic now appears in two streams: %% "foo/bar/baz" and "foo/bar/+": NewStreams = lists:sort(GetStream("foo/bar/baz")), @@ -180,7 +180,7 @@ t_new_generation_inherit_trie(_Config) -> || I <- lists:seq(1, 200), Suffix <- [<<"foo">>, <<"bar">>] ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, #{}), %% Now we create a new generation with the same LTS module. It should inherit the %% learned trie. 
ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1_000), @@ -194,7 +194,7 @@ t_new_generation_inherit_trie(_Config) -> || I <- lists:seq(1, 200), Suffix <- [<<"foo">>, <<"bar">>] ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, #{}), %% We should get only two streams for wildcard query, for "foo" and for "bar". ?assertMatch( [_Foo, _Bar], @@ -217,13 +217,13 @@ t_replay(_Config) -> {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} || Topic <- Topics, PublishedAt <- Timestamps ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, #{}), %% Create wildcard topics `wildcard/+/suffix/foo' and `wildcard/+/suffix/bar': Batch2 = [ {TS, make_message(TS, make_topic([wildcard, I, suffix, Suffix]), bin(TS))} || I <- lists:seq(1, 200), TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>] ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, #{}), %% Check various topic filters: Messages = [M || {_TS, M} <- Batch1 ++ Batch2], %% Missing topics (no ghost messages): diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index ba9589e97..af41df1ad 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -188,12 +188,14 @@ apply_stream(DB, NodeStream0, Stream0, N) -> ?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})), apply_stream(DB, NodeStream, Stream, N + 1); [add_generation | Stream] -> - %% FIXME: + ?tp(notice, test_add_generation, #{}), [Node | NodeStream] = emqx_utils_stream:next(NodeStream0), ?ON(Node, emqx_ds:add_generation(DB)), apply_stream(DB, NodeStream, Stream, N); [{Node, Operation, Arg} | Stream] when - Operation =:= join_db_site; Operation =:= leave_db_site; Operation =:= assign_db_sites + Operation =:= join_db_site; + Operation =:= leave_db_site; + Operation =:= assign_db_sites -> ?tp(notice, test_apply_operation, #{node => Node, operation => Operation, arg => Arg}), %% Apply the transition. @@ -207,7 +209,12 @@ apply_stream(DB, NodeStream0, Stream0, N) -> %% Give some time for at least one transition to complete. Transitions = transitions(Node, DB), ct:pal("Transitions after ~p: ~p", [Operation, Transitions]), - ?retry(200, 10, ?assertNotEqual(Transitions, transitions(Node, DB))), + case Transitions of + [_ | _] -> + ?retry(200, 10, ?assertNotEqual(Transitions, transitions(Node, DB))); + [] -> + ok + end, apply_stream(DB, NodeStream0, Stream, N); [Fun | Stream] when is_function(Fun) -> Fun(), @@ -259,15 +266,18 @@ verify_stream_effects(DB, TestCase, Node, ClientId, ExpectedStream) -> ct:pal("Checking consistency of effects for ~p on ~p", [ClientId, Node]), ?defer_assert( begin - snabbkaffe_diff:assert_lists_eq( + diff_messages( ExpectedStream, - ds_topic_stream(DB, ClientId, client_topic(TestCase, ClientId), Node), - message_diff_options([id, qos, from, flags, headers, topic, payload, extra]) + ds_topic_stream(DB, ClientId, client_topic(TestCase, ClientId), Node) ), ct:pal("Data for client ~p on ~p is consistent.", [ClientId, Node]) end ). +diff_messages(Expected, Got) -> + Fields = [id, qos, from, flags, headers, topic, payload, extra], + diff_messages(Fields, Expected, Got). 
+ diff_messages(Fields, Expected, Got) -> snabbkaffe_diff:assert_lists_eq(Expected, Got, message_diff_options(Fields)). diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index 510b3e377..bab09b6b3 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -190,7 +190,7 @@ transpose_tail(S, Tail) -> %% @doc Make a stream by concatenating multiple streams. -spec chain([stream(X)]) -> stream(X). chain(L) -> - lists:foldl(fun chain/2, empty(), L). + lists:foldr(fun chain/2, empty(), L). %% @doc Make a stream by chaining (concatenating) two streams. %% The second stream begins to produce values only after the first one is exhausted. diff --git a/mix.exs b/mix.exs index 6a7e6bda7..cc8f1682e 100644 --- a/mix.exs +++ b/mix.exs @@ -54,7 +54,7 @@ defmodule EMQXUmbrella.MixProject do {:jiffy, github: "emqx/jiffy", tag: "1.0.6", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}, {:esockd, github: "emqx/esockd", tag: "5.11.2", override: true}, - {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-5", override: true}, + {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-6", override: true}, {:ekka, github: "emqx/ekka", tag: "0.19.4", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true}, diff --git a/rebar.config b/rebar.config index e1d8d23f3..55ea83fdd 100644 --- a/rebar.config +++ b/rebar.config @@ -82,7 +82,7 @@ {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}}, - {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-5"}}}, + {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-6"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.4"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}},
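Finally, regarding the emqx_utils_stream:chain/1 fix above: folding from the right makes the first stream in the list the outermost chain, so its elements are consumed first. A quick sanity check of the expected behaviour (hypothetical shell session):

%% With foldr: chain([A, B]) == chain(A, chain(B, empty())), so A's elements
%% come out before B's. The previous foldl produced the streams in reverse
%% list order.
1> S = emqx_utils_stream:chain([
       emqx_utils_stream:list([1, 2]),
       emqx_utils_stream:list([3, 4])
   ]).
2> emqx_utils_stream:consume(S).
[1,2,3,4]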