From 5fd5fc76e515cad1e2187a47cb12896f88ab1f1a Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 14 Jun 2024 14:16:05 +0200 Subject: [PATCH] fix(dsstore): ensure backward compatibility --- .../src/emqx_ds_storage_layer.erl | 99 +++++++++++++++++-- 1 file changed, 91 insertions(+), 8 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index e0e0256c4..6d1744a07 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -1133,23 +1133,106 @@ erase_schema_runtime(Shard) -> -undef(PERSISTENT_TERM). --define(ROCKSDB_SCHEMA_KEY, <<"schema_v1">>). +-define(ROCKSDB_SCHEMA_KEY(V), <<"schema_", V>>). + +-define(ROCKSDB_SCHEMA_KEY, ?ROCKSDB_SCHEMA_KEY("v2")). +-define(ROCKSDB_SCHEMA_KEYS, [ + ?ROCKSDB_SCHEMA_KEY, + ?ROCKSDB_SCHEMA_KEY("v1") +]). -spec get_schema_persistent(rocksdb:db_handle()) -> shard_schema() | not_found. get_schema_persistent(DB) -> - case rocksdb:get(DB, ?ROCKSDB_SCHEMA_KEY, []) of + get_schema_persistent(DB, ?ROCKSDB_SCHEMA_KEYS). + +get_schema_persistent(DB, [Key | Rest]) -> + case rocksdb:get(DB, Key, []) of {ok, Blob} -> - Schema = binary_to_term(Blob), - %% Sanity check: - #{current_generation := _, prototype := _} = Schema, - Schema; + deserialize_schema(Key, Blob); not_found -> - not_found - end. + get_schema_persistent(DB, Rest) + end; +get_schema_persistent(_DB, []) -> + not_found. -spec put_schema_persistent(rocksdb:db_handle(), shard_schema()) -> ok. put_schema_persistent(DB, Schema) -> Blob = term_to_binary(Schema), rocksdb:put(DB, ?ROCKSDB_SCHEMA_KEY, Blob, []). +-spec deserialize_schema(_SchemaVsn :: binary(), binary()) -> shard_schema(). +deserialize_schema(SchemaVsn, Blob) -> + %% Sanity check: + Schema = #{current_generation := _, prototype := _} = binary_to_term(Blob), + decode_schema(SchemaVsn, Schema). + +decode_schema(?ROCKSDB_SCHEMA_KEY, Schema) -> + Schema; +decode_schema(?ROCKSDB_SCHEMA_KEY("v1"), Schema) -> + maps:map(fun decode_schema_v1/2, Schema). + +decode_schema_v1(?GEN_KEY(_), Generation = #{}) -> + decode_generation_schema_v1(Generation); +decode_schema_v1(_, V) -> + V. + +decode_generation_schema_v1(SchemaV1 = #{cf_refs := CFRefs}) -> + %% Drop potentially dead CF references from the time generation was created. + Schema = maps:remove(cf_refs, SchemaV1), + Schema#{cf_names => cf_names(CFRefs)}; +decode_generation_schema_v1(Schema = #{}) -> + Schema. + +%%-------------------------------------------------------------------------------- + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +decode_schema_v1_test() -> + SchemaV1 = #{ + current_generation => 42, + prototype => {emqx_ds_storage_reference, #{}}, + ?GEN_KEY(41) => #{ + module => emqx_ds_storage_reference, + data => {schema}, + cf_refs => [{"emqx_ds_storage_reference41", erlang:make_ref()}], + created_at => 12345, + since => 0, + until => 123456 + }, + ?GEN_KEY(42) => #{ + module => emqx_ds_storage_reference, + data => {schema}, + cf_refs => [{"emqx_ds_storage_reference42", erlang:make_ref()}], + created_at => 54321, + since => 123456, + until => undefined + } + }, + ?assertEqual( + #{ + current_generation => 42, + prototype => {emqx_ds_storage_reference, #{}}, + ?GEN_KEY(41) => #{ + module => emqx_ds_storage_reference, + data => {schema}, + cf_names => ["emqx_ds_storage_reference41"], + created_at => 12345, + since => 0, + until => 123456 + }, + ?GEN_KEY(42) => #{ + module => emqx_ds_storage_reference, + data => {schema}, + cf_names => ["emqx_ds_storage_reference42"], + created_at => 54321, + since => 123456, + until => undefined + } + }, + deserialize_schema(?ROCKSDB_SCHEMA_KEY("v1"), term_to_binary(SchemaV1)) + ). + +-endif. + -undef(ROCKSDB_SCHEMA_KEY).