diff --git a/apps/emqx/src/emqx_message.erl b/apps/emqx/src/emqx_message.erl index 7e899fe2d..f8dd157f7 100644 --- a/apps/emqx/src/emqx_message.erl +++ b/apps/emqx/src/emqx_message.erl @@ -38,7 +38,8 @@ from/1, topic/1, payload/1, - timestamp/1 + timestamp/1, + timestamp/2 ]). %% Flags @@ -66,7 +67,6 @@ -export([ is_expired/2, - set_timestamp/2, update_expiry/1, timestamp_now/0 ]). @@ -80,7 +80,10 @@ estimate_size/1 ]). --export_type([message_map/0]). +-export_type([ + timestamp/0, + message_map/0 +]). -type message_map() :: #{ id := binary(), @@ -90,10 +93,14 @@ headers := emqx_types:headers(), topic := emqx_types:topic(), payload := emqx_types:payload(), - timestamp := integer(), + timestamp := timestamp(), extra := _ }. +%% Message timestamp +%% Granularity: milliseconds. +-type timestamp() :: non_neg_integer(). + -elvis([{elvis_style, god_modules, disable}]). -spec make(emqx_types:topic(), emqx_types:payload()) -> emqx_types:message(). @@ -202,9 +209,14 @@ topic(#message{topic = Topic}) -> Topic. -spec payload(emqx_types:message()) -> emqx_types:payload(). payload(#message{payload = Payload}) -> Payload. --spec timestamp(emqx_types:message()) -> integer(). +-spec timestamp(emqx_types:message()) -> timestamp(). timestamp(#message{timestamp = TS}) -> TS. +-spec timestamp(emqx_types:message(), second | millisecond | microsecond) -> non_neg_integer(). +timestamp(#message{timestamp = TS}, second) -> TS div 1000; +timestamp(#message{timestamp = TS}, millisecond) -> TS; +timestamp(#message{timestamp = TS}, microsecond) -> TS * 1000. + -spec is_sys(emqx_types:message()) -> boolean(). is_sys(#message{flags = #{sys := true}}) -> true; @@ -289,10 +301,6 @@ is_expired(#message{timestamp = CreatedAt}, Zone) -> Interval -> elapsed(CreatedAt) > Interval end. --spec set_timestamp(integer(), emqx_types:message()) -> emqx_types:message(). -set_timestamp(Timestamp, Msg) -> - Msg#message{timestamp = Timestamp}. - -spec update_expiry(emqx_types:message()) -> emqx_types:message(). update_expiry( Msg = #message{ @@ -421,7 +429,7 @@ from_map(#{ }. %% @doc Get current timestamp in milliseconds. --spec timestamp_now() -> integer(). +-spec timestamp_now() -> timestamp(). timestamp_now() -> erlang:system_time(millisecond). diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index ac374b8a9..8cf3cb284 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -323,7 +323,7 @@ subscribe( ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID), {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), Subscription = #{ - start_time => emqx_ds:timestamp_us(), + start_time => now_ms(), props => SubOpts, id => SubId, deleted => false diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index a470d7281..24a8667b6 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -150,9 +150,6 @@ %% Timestamp %% Each message must have unique timestamp. %% Earliest possible timestamp is 0. -%% Granularity: microsecond. -%% TODO: Currently, we should always use milliseconds, as that's the unit we -%% use in emqx_guid. Otherwise, the iterators won't match the message timestamps. -type time() :: non_neg_integer(). -type message_store_opts() :: diff --git a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl index e98e235aa..100d7fa1f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl @@ -320,7 +320,7 @@ bin_key_to_vector(Keymapper = #keymapper{vec_coord_size = DimSizeof, key_size = DimSizeof ). --spec key_to_coord(keymapper(), key(), dimension()) -> coord(). +-spec key_to_coord(keymapper(), scalar(), dimension()) -> coord(). key_to_coord(#keymapper{vec_scanner = Scanner}, Key, Dim) -> Actions = lists:nth(Dim, Scanner), extract_coord(Actions, Key). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index c34c8d49d..39d5d7d68 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -371,7 +371,8 @@ do_drop_db_v1(DB) -> ) -> emqx_ds:store_batch_result(). do_store_batch_v1(DB, Shard, #{?tag := ?BATCH, ?batch_messages := Messages}, Options) -> - emqx_ds_storage_layer:store_batch({DB, Shard}, Messages, Options). + Batch = [{emqx_message:timestamp(Message), Message} || Message <- Messages], + emqx_ds_storage_layer:store_batch({DB, Shard}, Batch, Options). %% Remove me in EMQX 5.6 -dialyzer({nowarn_function, do_get_streams_v1/4}). @@ -462,7 +463,7 @@ do_add_generation_v2(DB) -> MyShards = [], lists:foreach( fun(ShardId) -> - emqx_ds_storage_layer:add_generation({DB, ShardId}) + emqx_ds_storage_layer:add_generation({DB, ShardId}, emqx_ds:timestamp_us()) end, MyShards ). @@ -511,7 +512,10 @@ ra_store_batch(DB, Shard, Messages) -> end. ra_add_generation(DB, Shard) -> - Command = #{?tag => add_generation}, + Command = #{ + ?tag => add_generation, + ?since => emqx_ds:timestamp_us() + }, Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), case ra:process_command(Servers, Command, ?RA_TIMEOUT) of {ok, Result, _Leader} -> @@ -521,7 +525,11 @@ ra_add_generation(DB, Shard) -> end. ra_update_config(DB, Shard, Opts) -> - Command = #{?tag => update_config, ?config => Opts}, + Command = #{ + ?tag => update_config, + ?config => Opts, + ?since => emqx_ds:timestamp_us() + }, Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), case ra:process_command(Servers, Command, ?RA_TIMEOUT) of {ok, Result, _Leader} -> @@ -541,16 +549,18 @@ ra_drop_generation(DB, Shard, GenId) -> end. ra_get_streams(DB, Shard, TopicFilter, Time) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), - emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, Time). + {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), + TimestampUs = timestamp_to_timeus(Time), + emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs). ra_get_delete_streams(DB, Shard, TopicFilter, Time) -> {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time). ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), - emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime). + {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), + TimestampUs = timestamp_to_timeus(StartTime), + emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimestampUs). ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) -> {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), @@ -570,7 +580,16 @@ ra_delete_next(DB, Shard, Iter, Selector, BatchSize) -> ra_list_generations_with_lifetimes(DB, Shard) -> {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), - emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard). + Gens = emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard), + maps:map( + fun(_GenId, Data = #{since := Since, until := Until}) -> + Data#{ + since := timeus_to_timestamp(Since), + until := emqx_maybe:apply(fun timeus_to_timestamp/1, Until) + } + end, + Gens + ). ra_drop_shard(DB, Shard) -> LocalServer = emqx_ds_replication_layer_shard:server(DB, Shard, local), @@ -608,18 +627,22 @@ apply( {NState, Result, Effect}; apply( _RaftMeta, - #{?tag := add_generation}, - #{db_shard := DBShard} = State + #{?tag := add_generation, ?since := Since}, + #{db_shard := DBShard, latest := Latest} = State ) -> - Result = emqx_ds_storage_layer:add_generation(DBShard), - {State, Result}; + {Timestamp, NLatest} = ensure_monotonic_timestamp(Since, Latest), + Result = emqx_ds_storage_layer:add_generation(DBShard, Timestamp), + NState = State#{latest := NLatest}, + {NState, Result}; apply( _RaftMeta, - #{?tag := update_config, ?config := Opts}, - #{db_shard := DBShard} = State + #{?tag := update_config, ?since := Since, ?config := Opts}, + #{db_shard := DBShard, latest := Latest} = State ) -> - Result = emqx_ds_storage_layer:update_config(DBShard, Opts), - {State, Result}; + {Timestamp, NLatest} = ensure_monotonic_timestamp(Since, Latest), + Result = emqx_ds_storage_layer:update_config(DBShard, Timestamp, Opts), + NState = State#{latest := NLatest}, + {NState, Result}; apply( _RaftMeta, #{?tag := drop_generation, ?generation := GenId}, @@ -632,12 +655,27 @@ assign_timestamps(Latest, Messages) -> assign_timestamps(Latest, Messages, []). assign_timestamps(Latest, [MessageIn | Rest], Acc) -> - case emqx_message:timestamp(MessageIn) of - Later when Later > Latest -> - assign_timestamps(Later, Rest, [MessageIn | Acc]); + case emqx_message:timestamp(MessageIn, microsecond) of + TimestampUs when TimestampUs > Latest -> + Message = assign_timestamp(TimestampUs, MessageIn), + assign_timestamps(TimestampUs, Rest, [Message | Acc]); _Earlier -> - Message = emqx_message:set_timestamp(Latest + 1, MessageIn), + Message = assign_timestamp(Latest + 1, MessageIn), assign_timestamps(Latest + 1, Rest, [Message | Acc]) end; assign_timestamps(Latest, [], Acc) -> {Latest, Acc}. + +assign_timestamp(TimestampUs, Message) -> + {TimestampUs, Message}. + +ensure_monotonic_timestamp(TimestampUs, Latest) when TimestampUs > Latest -> + {TimestampUs, TimestampUs + 1}; +ensure_monotonic_timestamp(_TimestampUs, Latest) -> + {Latest, Latest + 1}. + +timestamp_to_timeus(TimestampMs) -> + TimestampMs * 1000. + +timeus_to_timestamp(TimestampUs) -> + TimestampUs div 1000. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl index 89d615cbb..70812fa18 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl @@ -34,8 +34,9 @@ -define(batch_messages, 2). -define(timestamp, 3). -%% update_config +%% add_generation / update_config -define(config, 2). +-define(since, 3). %% drop_generation -define(generation, 2). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 080eda937..d3dcd887d 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -67,9 +67,8 @@ store_batch(DB, Messages, Opts) -> case maps:get(atomic, Opts, false) of false -> lists:foreach( - fun(MessageIn) -> - Shard = emqx_ds_replication_layer:shard_of_message(DB, MessageIn, clientid), - Message = emqx_message:set_timestamp(emqx_ds:timestamp_us(), MessageIn), + fun(Message) -> + Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), gen_server:call( ?via(DB, Shard), #enqueue_req{ @@ -83,9 +82,7 @@ store_batch(DB, Messages, Opts) -> ); true -> maps:foreach( - fun(Shard, BatchIn) -> - Timestamp = emqx_ds:timestamp_us(), - Batch = [emqx_message:set_timestamp(Timestamp, Message) || Message <- BatchIn], + fun(Shard, Batch) -> gen_server:call( ?via(DB, Shard), #enqueue_atomic_req{ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 326926d20..594854d21 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -245,16 +245,19 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{}) -> ok. -spec store_batch( - emqx_ds_storage_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts() + emqx_ds_storage_layer:shard_id(), + s(), + [{emqx_ds:time(), emqx_types:message()}], + emqx_ds:message_store_opts() ) -> emqx_ds:store_batch_result(). store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> {ok, Batch} = rocksdb:batch(), lists:foreach( - fun(Msg) -> - {Key, _} = make_key(S, Msg), + fun({Timestamp, Msg}) -> + {Key, _} = make_key(S, Timestamp, Msg), Val = serialize(Msg), - rocksdb:batch_put(Batch, Data, Key, Val) + rocksdb:put(DB, Data, Key, Val, []) end, Messages ), @@ -652,8 +655,8 @@ format_key(KeyMapper, Key) -> Vec = [integer_to_list(I, 16) || I <- emqx_ds_bitmask_keymapper:key_to_vector(KeyMapper, Key)], lists:flatten(io_lib:format("~.16B (~s)", [Key, string:join(Vec, ",")])). --spec make_key(s(), emqx_types:message()) -> {binary(), [binary()]}. -make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestamp, topic = TopicBin}) -> +-spec make_key(s(), emqx_ds:time(), emqx_types:message()) -> {binary(), [binary()]}. +make_key(#s{keymappers = KeyMappers, trie = Trie}, Timestamp, #message{topic = TopicBin}) -> Tokens = emqx_topic:words(TopicBin), {TopicIndex, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens), VaryingHashes = [hash_topic_level(I) || I <- Varying], diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 1ab0df580..06e852dcd 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -29,8 +29,8 @@ update_iterator/3, next/3, delete_next/4, - update_config/2, - add_generation/1, + update_config/3, + add_generation/2, list_generations_with_lifetimes/1, drop_generation/2 ]). @@ -133,7 +133,7 @@ cf_refs := cf_refs(), %% Time at which this was created. Might differ from `since', in particular for the %% first generation. - created_at := emqx_ds:time(), + created_at := emqx_message:timestamp(), %% When should this generation become active? %% This generation should only contain messages timestamped no earlier than that. %% The very first generation will have `since` equal 0. @@ -194,7 +194,12 @@ -callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) -> ok | {error, _Reason}. --callback store_batch(shard_id(), _Data, [emqx_types:message()], emqx_ds:message_store_opts()) -> +-callback store_batch( + shard_id(), + _Data, + [{emqx_ds:time(), emqx_types:message()}], + emqx_ds:message_store_opts() +) -> emqx_ds:store_batch_result(). -callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) -> @@ -219,6 +224,9 @@ %% API for the replication layer %%================================================================================ +%% Note: we specify gen_server requests as records to make use of Dialyzer: +-record(call_add_generation, {since :: emqx_ds:time()}). +-record(call_update_config, {options :: emqx_ds:create_db_opts(), since :: emqx_ds:time()}). -record(call_list_generations_with_lifetimes, {}). -record(call_drop_generation, {gen_id :: gen_id()}). @@ -230,7 +238,11 @@ open_shard(Shard, Options) -> drop_shard(Shard) -> ok = rocksdb:destroy(db_dir(Shard), []). --spec store_batch(shard_id(), [emqx_types:message()], emqx_ds:message_store_opts()) -> +-spec store_batch( + shard_id(), + [{emqx_ds:time(), emqx_types:message()}], + emqx_ds:message_store_opts() +) -> emqx_ds:store_batch_result(). store_batch(Shard, Messages, Options) -> %% We always store messages in the current generation: @@ -398,13 +410,14 @@ delete_next( {ok, end_of_stream} end. --spec update_config(shard_id(), emqx_ds:create_db_opts()) -> ok. -update_config(ShardId, Options) -> - gen_server:call(?REF(ShardId), {?FUNCTION_NAME, Options}, infinity). +-spec update_config(shard_id(), emqx_ds:time(), emqx_ds:create_db_opts()) -> ok. +update_config(ShardId, Since, Options) -> + Call = #call_update_config{since = Since, options = Options}, + gen_server:call(?REF(ShardId), Call, infinity). --spec add_generation(shard_id()) -> ok. -add_generation(ShardId) -> - gen_server:call(?REF(ShardId), add_generation, infinity). +-spec add_generation(shard_id(), emqx_ds:time()) -> ok. +add_generation(ShardId, Since) -> + gen_server:call(?REF(ShardId), #call_add_generation{since = Since}, infinity). -spec list_generations_with_lifetimes(shard_id()) -> #{ @@ -438,9 +451,6 @@ start_link(Shard = {_, _}, Options) -> shard :: shard() }). -%% Note: we specify gen_server requests as records to make use of Dialyzer: --record(call_create_generation, {since :: emqx_ds:time()}). - -type server_state() :: #s{}. -define(DEFAULT_CF, "default"). @@ -470,16 +480,12 @@ init({ShardId, Options}) -> commit_metadata(S), {ok, S}. -handle_call({update_config, Options}, _From, #s{schema = Schema} = S0) -> - Prototype = maps:get(storage, Options), - S1 = S0#s{schema = Schema#{prototype := Prototype}}, - Since = emqx_message:timestamp_now(), - S = add_generation(S1, Since), +handle_call(#call_update_config{since = Since, options = Options}, _From, S0) -> + S = #s{} = handle_update_config(S0, Since, Options), commit_metadata(S), {reply, ok, S}; -handle_call(add_generation, _From, S0) -> - Since = emqx_message:timestamp_now(), - S = add_generation(S0, Since), +handle_call(#call_add_generation{since = Since}, _From, S0) -> + S = #s{} = handle_add_generation(S0, Since), commit_metadata(S), {reply, ok, S}; handle_call(#call_list_generations_with_lifetimes{}, _From, S) -> @@ -489,10 +495,6 @@ handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) -> {Reply, S} = handle_drop_generation(S0, GenId), commit_metadata(S), {reply, Reply, S}; -handle_call(#call_create_generation{since = Since}, _From, S0) -> - S = add_generation(S0, Since), - commit_metadata(S), - {reply, ok, S}; handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. @@ -528,11 +530,10 @@ open_shard(ShardId, DB, CFRefs, ShardSchema) -> ShardSchema ). --spec add_generation(server_state(), emqx_ds:time()) -> server_state(). -add_generation(S0, Since) -> +-spec handle_add_generation(server_state(), emqx_ds:time()) -> + server_state() | {error, nonmonotonic}. +handle_add_generation(S0, Since) -> #s{shard_id = ShardId, db = DB, schema = Schema0, shard = Shard0, cf_refs = CFRefs0} = S0, - Schema1 = update_last_until(Schema0, Since), - Shard1 = update_last_until(Shard0, Since), #{current_generation := OldGenId, prototype := {CurrentMod, _ModConf}} = Schema0, OldKey = ?GEN_KEY(OldGenId), @@ -540,39 +541,53 @@ add_generation(S0, Since) -> #{cf_refs := OldCFRefs} = OldGenSchema, #{OldKey := #{module := OldMod, data := OldGenData}} = Shard0, - {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Since), + Schema1 = update_last_until(Schema0, Since), + Shard1 = update_last_until(Shard0, Since), - CFRefs = NewCFRefs ++ CFRefs0, - Key = ?GEN_KEY(GenId), - Generation0 = - #{data := NewGenData0} = - open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)), + case Schema1 of + _Updated = #{} -> + {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Since), + CFRefs = NewCFRefs ++ CFRefs0, + Key = ?GEN_KEY(GenId), + Generation0 = + #{data := NewGenData0} = + open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)), + %% When the new generation's module is the same as the last one, we might want to + %% perform actions like inheriting some of the previous (meta)data. + NewGenData = + run_post_creation_actions( + #{ + shard_id => ShardId, + db => DB, + new_gen_id => GenId, + old_gen_id => OldGenId, + new_cf_refs => NewCFRefs, + old_cf_refs => OldCFRefs, + new_gen_runtime_data => NewGenData0, + old_gen_runtime_data => OldGenData, + new_module => CurrentMod, + old_module => OldMod + } + ), + Generation = Generation0#{data := NewGenData}, + Shard = Shard1#{current_generation := GenId, Key => Generation}, + S0#s{ + cf_refs = CFRefs, + schema = Schema, + shard = Shard + }; + {error, exists} -> + S0; + {error, Reason} -> + {error, Reason} + end. - %% When the new generation's module is the same as the last one, we might want to - %% perform actions like inheriting some of the previous (meta)data. - NewGenData = - run_post_creation_actions( - #{ - shard_id => ShardId, - db => DB, - new_gen_id => GenId, - old_gen_id => OldGenId, - new_cf_refs => NewCFRefs, - old_cf_refs => OldCFRefs, - new_gen_runtime_data => NewGenData0, - old_gen_runtime_data => OldGenData, - new_module => CurrentMod, - old_module => OldMod - } - ), - Generation = Generation0#{data := NewGenData}, - - Shard = Shard1#{current_generation := GenId, Key => Generation}, - S0#s{ - cf_refs = CFRefs, - schema = Schema, - shard = Shard - }. +-spec handle_update_config(server_state(), emqx_ds:time(), emqx_ds:create_db_opts()) -> + server_state(). +handle_update_config(S0 = #s{schema = Schema}, Since, Options) -> + Prototype = maps:get(storage, Options), + S = S0#s{schema = Schema#{prototype := Prototype}}, + handle_add_generation(S, Since). -spec handle_list_generations_with_lifetimes(server_state()) -> #{gen_id() => map()}. handle_list_generations_with_lifetimes(#s{schema = ShardSchema}) -> @@ -652,7 +667,7 @@ new_generation(ShardId, DB, Schema0, Since) -> module => Mod, data => GenData, cf_refs => NewCFRefs, - created_at => emqx_message:timestamp_now(), + created_at => erlang:system_time(millisecond), since => Since, until => undefined }, @@ -703,12 +718,19 @@ rocksdb_open(Shard, Options) -> db_dir({DB, ShardId}) -> filename:join([emqx_ds:base_dir(), atom_to_list(DB), binary_to_list(ShardId)]). --spec update_last_until(Schema, emqx_ds:time()) -> Schema when Schema :: shard_schema() | shard(). -update_last_until(Schema, Until) -> - #{current_generation := GenId} = Schema, - GenData0 = maps:get(?GEN_KEY(GenId), Schema), - GenData = GenData0#{until := Until}, - Schema#{?GEN_KEY(GenId) := GenData}. +-spec update_last_until(Schema, emqx_ds:time()) -> + Schema | {error, exists | nonmonotonic} +when + Schema :: shard_schema() | shard(). +update_last_until(Schema = #{current_generation := GenId}, Until) -> + case maps:get(?GEN_KEY(GenId), Schema) of + GenData = #{since := CurrentSince} when CurrentSince < Until -> + Schema#{?GEN_KEY(GenId) := GenData#{until := Until}}; + #{since := Until} -> + {error, exists}; + #{since := CurrentSince} when CurrentSince > Until -> + {error, nonmonotonic} + end. run_post_creation_actions( #{ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index 9c217ef48..7aa54b9f3 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -117,8 +117,8 @@ store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options = #{atomic := tru Res; store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) -> lists:foreach( - fun(Msg) -> - Key = <<(emqx_message:timestamp(Msg)):64>>, + fun({Timestamp, Msg}) -> + Key = <>, Val = term_to_binary(Msg), rocksdb:put(DB, CF, Key, Val, []) end, @@ -209,8 +209,8 @@ do_next(_, _, _, _, 0, Key, Acc) -> {Key, Acc}; do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) -> case rocksdb:iterator_move(IT, Action) of - {ok, Key, Blob} -> - Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob), + {ok, Key = <>, Blob} -> + Msg = #message{topic = Topic} = binary_to_term(Blob), TopicWords = emqx_topic:words(Topic), case emqx_topic:match(TopicWords, TopicFilter) andalso TS >= StartTime of true -> diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index 6e7d8629e..636b57b89 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -56,7 +56,7 @@ t_store(_Config) -> payload = Payload, timestamp = PublishedAt }, - ?assertMatch(ok, emqx_ds_storage_layer:store_batch(?SHARD, [Msg], #{})). + ?assertMatch(ok, emqx_ds_storage_layer:store_batch(?SHARD, [{PublishedAt, Msg}], #{})). %% Smoke test for iteration through a concrete topic t_iterate(_Config) -> @@ -64,7 +64,7 @@ t_iterate(_Config) -> Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>], Timestamps = lists:seq(1, 10), Batch = [ - make_message(PublishedAt, Topic, integer_to_binary(PublishedAt)) + {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} || Topic <- Topics, PublishedAt <- Timestamps ], ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), @@ -92,7 +92,7 @@ t_delete(_Config) -> Topics = [<<"foo/bar">>, TopicToDelete, <<"a">>], Timestamps = lists:seq(1, 10), Batch = [ - make_message(PublishedAt, Topic, integer_to_binary(PublishedAt)) + {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} || Topic <- Topics, PublishedAt <- Timestamps ], ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), @@ -123,7 +123,7 @@ t_get_streams(_Config) -> Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>], Timestamps = lists:seq(1, 10), Batch = [ - make_message(PublishedAt, Topic, integer_to_binary(PublishedAt)) + {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} || Topic <- Topics, PublishedAt <- Timestamps ], ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), @@ -149,7 +149,7 @@ t_get_streams(_Config) -> NewBatch = [ begin B = integer_to_binary(I), - make_message(100, <<"foo/bar/", B/binary>>, <<"filler", B/binary>>) + {100, make_message(100, <<"foo/bar/", B/binary>>, <<"filler", B/binary>>)} end || I <- lists:seq(1, 200) ], @@ -178,12 +178,8 @@ t_new_generation_inherit_trie(_Config) -> Timestamps = lists:seq(1, 10_000, 100), Batch = [ begin - B = integer_to_binary(I), - make_message( - TS, - <<"wildcard/", B/binary, "/suffix/", Suffix/binary>>, - integer_to_binary(TS) - ) + Topic = emqx_topic:join(["wildcard", integer_to_binary(I), "suffix", Suffix]), + {TS, make_message(TS, Topic, integer_to_binary(TS))} end || I <- lists:seq(1, 200), TS <- Timestamps, @@ -192,7 +188,7 @@ t_new_generation_inherit_trie(_Config) -> ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), %% Now we create a new generation with the same LTS module. It should inherit the %% learned trie. - ok = emqx_ds_storage_layer:add_generation(?SHARD), + ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1000), ok end, fun(Trace) -> @@ -207,23 +203,21 @@ t_replay(_Config) -> Topics = [<<"foo/bar">>, <<"foo/bar/baz">>], Timestamps = lists:seq(1, 10_000, 100), Batch1 = [ - make_message(PublishedAt, Topic, integer_to_binary(PublishedAt)) + {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} || Topic <- Topics, PublishedAt <- Timestamps ], ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []), %% Create wildcard topics `wildcard/+/suffix/foo' and `wildcard/+/suffix/bar': Batch2 = [ begin - B = integer_to_binary(I), - make_message( - TS, <<"wildcard/", B/binary, "/suffix/", Suffix/binary>>, integer_to_binary(TS) - ) + Topic = emqx_topic:join(["wildcard", integer_to_list(I), "suffix", Suffix]), + {TS, make_message(TS, Topic, integer_to_binary(TS))} end || I <- lists:seq(1, 200), TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>] ], ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []), %% Check various topic filters: - Messages = Batch1 ++ Batch2, + Messages = [M || {_TS, M} <- Batch1 ++ Batch2], %% Missing topics (no ghost messages): ?assertNot(check(?SHARD, <<"missing/foo/bar">>, 0, Messages)), %% Regular topics: @@ -481,18 +475,6 @@ make_message(PublishedAt, Topic, Payload) when is_binary(Topic) -> payload = Payload }. -store(Shard, PublishedAt, TopicL, Payload) when is_list(TopicL) -> - store(Shard, PublishedAt, list_to_binary(TopicL), Payload); -store(Shard, PublishedAt, Topic, Payload) -> - ID = emqx_guid:gen(), - Msg = #message{ - id = ID, - topic = Topic, - timestamp = PublishedAt, - payload = Payload - }, - emqx_ds_storage_layer:message_store(Shard, [Msg], #{}). - payloads(Messages) -> lists:map( fun(#message{payload = P}) ->