From f79dd16672ebc40e27970190efd79cac62183c4c Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sun, 14 May 2023 23:22:39 +0200 Subject: [PATCH] refactor(replay): zone -> shard Also bump erlang-rocksdb version --- apps/emqx_replay/src/emqx_replay.erl | 4 +- apps/emqx_replay/src/emqx_replay_conf.erl | 29 ++- .../src/emqx_replay_local_store.erl | 182 +++++++++--------- .../src/emqx_replay_local_store_sup.erl | 24 +-- .../src/emqx_replay_message_storage.erl | 10 +- apps/emqx_replay/src/emqx_replay_sup.erl | 8 +- .../test/emqx_replay_local_store_SUITE.erl | 94 ++++----- mix.exs | 2 +- rebar.config | 2 +- 9 files changed, 179 insertions(+), 176 deletions(-) diff --git a/apps/emqx_replay/src/emqx_replay.erl b/apps/emqx_replay/src/emqx_replay.erl index ed790697f..91ce94134 100644 --- a/apps/emqx_replay/src/emqx_replay.erl +++ b/apps/emqx_replay/src/emqx_replay.erl @@ -18,7 +18,7 @@ %% API: -export([]). --export_type([topic/0, time/0]). +-export_type([topic/0, time/0, shard/0]). -export_type([replay_id/0, replay/0]). %%================================================================================ @@ -28,6 +28,8 @@ %% parsed -type topic() :: list(binary()). +-type shard() :: binary(). + %% Timestamp %% Earliest possible timestamp is 0. %% TODO granularity? diff --git a/apps/emqx_replay/src/emqx_replay_conf.erl b/apps/emqx_replay/src/emqx_replay_conf.erl index 46fa53867..45f1b4fa5 100644 --- a/apps/emqx_replay/src/emqx_replay_conf.erl +++ b/apps/emqx_replay/src/emqx_replay_conf.erl @@ -18,9 +18,9 @@ %% TODO: make a proper HOCON schema and all... %% API: --export([zone_config/1, db_options/0]). +-export([shard_config/1, db_options/0]). --export([zone_iteration_options/1]). +-export([shard_iteration_options/1]). -export([default_iteration_options/0]). -type backend_config() :: @@ -35,17 +35,16 @@ -define(APP, emqx_replay). --type zone() :: emqx_types:zone(). +-spec shard_config(emqx_replay:shard()) -> backend_config(). +shard_config(Shard) -> + DefaultShardConfig = application:get_env(?APP, default_shard_config, default_shard_config()), + Shards = application:get_env(?APP, shard_config, #{}), + maps:get(Shard, Shards, DefaultShardConfig). --spec zone_config(zone()) -> backend_config(). -zone_config(Zone) -> - DefaultZoneConfig = application:get_env(?APP, default_zone_config, default_zone_config()), - Zones = application:get_env(?APP, zone_config, #{}), - maps:get(Zone, Zones, DefaultZoneConfig). - --spec zone_iteration_options(zone()) -> emqx_replay_message_storage:iteration_options(). -zone_iteration_options(Zone) -> - case zone_config(Zone) of +-spec shard_iteration_options(emqx_replay:shard()) -> + emqx_replay_message_storage:iteration_options(). +shard_iteration_options(Shard) -> + case shard_config(Shard) of {emqx_replay_message_storage, Config} -> maps:get(iteration, Config, default_iteration_options()); {_Module, _} -> @@ -54,11 +53,11 @@ zone_iteration_options(Zone) -> -spec default_iteration_options() -> emqx_replay_message_storage:iteration_options(). default_iteration_options() -> - {emqx_replay_message_storage, Config} = default_zone_config(), + {emqx_replay_message_storage, Config} = default_shard_config(), maps:get(iteration, Config). --spec default_zone_config() -> backend_config(). -default_zone_config() -> +-spec default_shard_config() -> backend_config(). +default_shard_config() -> {emqx_replay_message_storage, #{ timestamp_bits => 64, topic_bits_per_level => [8, 8, 8, 32, 16], diff --git a/apps/emqx_replay/src/emqx_replay_local_store.erl b/apps/emqx_replay/src/emqx_replay_local_store.erl index 8a74f248e..359db382d 100644 --- a/apps/emqx_replay/src/emqx_replay_local_store.erl +++ b/apps/emqx_replay/src/emqx_replay_local_store.erl @@ -30,7 +30,7 @@ %% behavior callbacks: -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). --export_type([cf_refs/0, gen_id/0, db_write_options/0]). +-export_type([cf_refs/0, gen_id/0, db_write_options/0, state/0, iterator/0]). -compile({inline, [meta_lookup/2]}). @@ -39,7 +39,7 @@ %%================================================================================ %% see rocksdb:db_options() --type options() :: proplists:proplist(). +% -type options() :: proplists:proplist(). -type db_write_options() :: proplists:proplist(). @@ -59,14 +59,14 @@ }. -record(s, { - zone :: emqx_types:zone(), + shard :: emqx_replay:shard(), db :: rocksdb:db_handle(), cf_iterator :: rocksdb:cf_handle(), cf_generations :: cf_refs() }). -record(it, { - zone :: emqx_types:zone(), + shard :: emqx_replay:shard(), gen :: gen_id(), replay :: emqx_replay:replay(), module :: module(), @@ -94,33 +94,35 @@ %% 3. `inplace_update_support`? -define(ITERATOR_CF_OPTS, []). --define(REF(Zone), {via, gproc, {n, l, {?MODULE, Zone}}}). +-define(REF(Shard), {via, gproc, {n, l, {?MODULE, Shard}}}). %%================================================================================ %% API funcions %%================================================================================ --spec start_link(emqx_types:zone()) -> {ok, pid()}. -start_link(Zone) -> - gen_server:start_link(?REF(Zone), ?MODULE, [Zone], []). +-spec start_link(emqx_replay:shard()) -> {ok, pid()}. +start_link(Shard) -> + gen_server:start_link(?REF(Shard), ?MODULE, [Shard], []). --spec create_generation(emqx_types:zone(), emqx_replay:time(), emqx_replay_conf:backend_config()) -> +-spec create_generation(emqx_replay:shard(), emqx_replay:time(), emqx_replay_conf:backend_config()) -> {ok, gen_id()} | {error, nonmonotonic}. -create_generation(Zone, Since, Config = {_Module, _Options}) -> - gen_server:call(?REF(Zone), {create_generation, Since, Config}). +create_generation(Shard, Since, Config = {_Module, _Options}) -> + gen_server:call(?REF(Shard), {create_generation, Since, Config}). --spec store(emqx_types:zone(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary()) -> +-spec store( + emqx_replay:shard(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary() +) -> ok | {error, _TODO}. -store(Zone, GUID, Time, Topic, Msg) -> - {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Zone, Time), +store(Shard, GUID, Time, Topic, Msg) -> + {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time), Mod:store(Data, GUID, Time, Topic, Msg). --spec make_iterator(emqx_types:zone(), emqx_replay:replay()) -> +-spec make_iterator(emqx_replay:shard(), emqx_replay:replay()) -> {ok, iterator()} | {error, _TODO}. -make_iterator(Zone, Replay = {_, StartTime}) -> - {GenId, Gen} = meta_lookup_gen(Zone, StartTime), +make_iterator(Shard, Replay = {_, StartTime}) -> + {GenId, Gen} = meta_lookup_gen(Shard, StartTime), open_iterator(Gen, #it{ - zone = Zone, + shard = Shard, gen = GenId, replay = Replay }). @@ -148,30 +150,30 @@ next(It = #it{module = Mod, data = ItData}) -> preserve_iterator(It = #it{}, ReplayID) -> iterator_put_state(ReplayID, It). --spec restore_iterator(emqx_types:zone(), emqx_replay:replay_id()) -> +-spec restore_iterator(emqx_replay:shard(), emqx_replay:replay_id()) -> {ok, iterator()} | {error, _TODO}. -restore_iterator(Zone, ReplayID) -> - case iterator_get_state(Zone, ReplayID) of +restore_iterator(Shard, ReplayID) -> + case iterator_get_state(Shard, ReplayID) of {ok, Serial} -> - restore_iterator_state(Zone, Serial); + restore_iterator_state(Shard, Serial); not_found -> {error, not_found}; {error, _Reason} = Error -> Error end. --spec discard_iterator(emqx_types:zone(), emqx_replay:replay_id()) -> +-spec discard_iterator(emqx_replay:shard(), emqx_replay:replay_id()) -> ok | {error, _TODO}. -discard_iterator(Zone, ReplayID) -> - iterator_delete(Zone, ReplayID). +discard_iterator(Shard, ReplayID) -> + iterator_delete(Shard, ReplayID). %%================================================================================ %% behavior callbacks %%================================================================================ -init([Zone]) -> +init([Shard]) -> process_flag(trap_exit, true), - {ok, S0} = open_db(Zone), + {ok, S0} = open_db(Shard), S = ensure_current_generation(S0), ok = populate_metadata(S), {ok, S}. @@ -192,8 +194,8 @@ handle_cast(_Cast, S) -> handle_info(_Info, S) -> {noreply, S}. -terminate(_Reason, #s{db = DB, zone = Zone}) -> - meta_erase(Zone), +terminate(_Reason, #s{db = DB, shard = Shard}) -> + meta_erase(Shard), ok = rocksdb:close(DB). %%================================================================================ @@ -203,21 +205,21 @@ terminate(_Reason, #s{db = DB, zone = Zone}) -> -record(db, {handle :: rocksdb:db_handle(), cf_iterator :: rocksdb:cf_handle()}). -spec populate_metadata(state()) -> ok. -populate_metadata(S = #s{zone = Zone, db = DBHandle, cf_iterator = CFIterator}) -> - ok = meta_put(Zone, db, #db{handle = DBHandle, cf_iterator = CFIterator}), +populate_metadata(S = #s{shard = Shard, db = DBHandle, cf_iterator = CFIterator}) -> + ok = meta_put(Shard, db, #db{handle = DBHandle, cf_iterator = CFIterator}), Current = schema_get_current(DBHandle), lists:foreach(fun(GenId) -> populate_metadata(GenId, S) end, lists:seq(0, Current)). -spec populate_metadata(gen_id(), state()) -> ok. -populate_metadata(GenId, S = #s{zone = Zone, db = DBHandle}) -> +populate_metadata(GenId, S = #s{shard = Shard, db = DBHandle}) -> Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S), - meta_register_gen(Zone, GenId, Gen). + meta_register_gen(Shard, GenId, Gen). -spec ensure_current_generation(state()) -> state(). -ensure_current_generation(S = #s{zone = Zone, db = DBHandle}) -> +ensure_current_generation(S = #s{shard = Shard, db = DBHandle}) -> case schema_get_current(DBHandle) of undefined -> - Config = emqx_replay_conf:zone_config(Zone), + Config = emqx_replay_conf:shard_config(Shard), {ok, _, NS} = create_new_gen(0, Config, S), NS; _GenId -> @@ -226,16 +228,16 @@ ensure_current_generation(S = #s{zone = Zone, db = DBHandle}) -> -spec create_new_gen(emqx_replay:time(), emqx_replay_conf:backend_config(), state()) -> {ok, gen_id(), state()} | {error, nonmonotonic}. -create_new_gen(Since, Config, S = #s{zone = Zone, db = DBHandle}) -> - GenId = get_next_id(meta_get_current(Zone)), +create_new_gen(Since, Config, S = #s{shard = Shard, db = DBHandle}) -> + GenId = get_next_id(meta_get_current(Shard)), GenId = get_next_id(schema_get_current(DBHandle)), - case is_gen_valid(Zone, GenId, Since) of + case is_gen_valid(Shard, GenId, Since) of ok -> {ok, Gen, NS} = create_gen(GenId, Since, Config, S), %% TODO: Transaction? Column family creation can't be transactional, anyway. ok = schema_put_gen(DBHandle, GenId, Gen), ok = schema_put_current(DBHandle, GenId), - ok = meta_register_gen(Zone, GenId, open_gen(GenId, Gen, NS)), + ok = meta_register_gen(Shard, GenId, open_gen(GenId, Gen, NS)), {ok, GenId, NS}; {error, _} = Error -> Error @@ -253,9 +255,9 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations }, {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}. --spec open_db(emqx_types:zone()) -> {ok, state()} | {error, _TODO}. -open_db(Zone) -> - Filename = atom_to_list(Zone), +-spec open_db(emqx_replay:shard()) -> {ok, state()} | {error, _TODO}. +open_db(Shard) -> + Filename = binary_to_list(Shard), DBOptions = [ {create_if_missing, true}, {create_missing_column_families, true} @@ -278,7 +280,7 @@ open_db(Zone) -> {ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} -> {CFNames, _} = lists:unzip(ExistingCFs), {ok, #s{ - zone = Zone, + shard = Shard, db = DBHandle, cf_iterator = CFIterator, cf_generations = lists:zip(CFNames, CFRefs) @@ -291,14 +293,14 @@ open_db(Zone) -> open_gen( GenId, Gen = #{module := Mod, data := Data}, - #s{zone = Zone, db = DBHandle, cf_generations = CFs} + #s{shard = Shard, db = DBHandle, cf_generations = CFs} ) -> - DB = Mod:open(Zone, DBHandle, GenId, CFs, Data), + DB = Mod:open(Shard, DBHandle, GenId, CFs, Data), Gen#{data := DB}. -spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none. -open_next_iterator(It = #it{zone = Zone, gen = GenId}) -> - open_next_iterator(meta_get_gen(Zone, GenId + 1), It#it{gen = GenId + 1}). +open_next_iterator(It = #it{shard = Shard, gen = GenId}) -> + open_next_iterator(meta_get_gen(Shard, GenId + 1), It#it{gen = GenId + 1}). open_next_iterator(undefined, _It) -> none; @@ -331,17 +333,17 @@ open_restore_iterator(#{module := Mod, data := Data}, It = #it{replay = Replay}, -define(ITERATION_WRITE_OPTS, []). -define(ITERATION_READ_OPTS, []). -iterator_get_state(Zone, ReplayID) -> - #db{handle = Handle, cf_iterator = CF} = meta_lookup(Zone, db), +iterator_get_state(Shard, ReplayID) -> + #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db), rocksdb:get(Handle, CF, ?KEY_REPLAY_STATE(ReplayID), ?ITERATION_READ_OPTS). -iterator_put_state(ID, It = #it{zone = Zone}) -> - #db{handle = Handle, cf_iterator = CF} = meta_lookup(Zone, db), +iterator_put_state(ID, It = #it{shard = Shard}) -> + #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db), Serial = preserve_iterator_state(It), rocksdb:put(Handle, CF, ?KEY_REPLAY_STATE(ID), Serial, ?ITERATION_WRITE_OPTS). -iterator_delete(Zone, ID) -> - #db{handle = Handle, cf_iterator = CF} = meta_lookup(Zone, db), +iterator_delete(Shard, ID) -> + #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db), rocksdb:delete(Handle, CF, ?KEY_REPLAY_STATE(ID), ?ITERATION_WRITE_OPTS). preserve_iterator_state(#it{ @@ -358,10 +360,10 @@ preserve_iterator_state(#it{ st => Mod:preserve_iterator(ItData) }). -restore_iterator_state(Zone, Serial) when is_binary(Serial) -> - restore_iterator_state(Zone, binary_to_term(Serial)); +restore_iterator_state(Shard, Serial) when is_binary(Serial) -> + restore_iterator_state(Shard, binary_to_term(Serial)); restore_iterator_state( - Zone, + Shard, #{ v := 1, gen := Gen, @@ -370,8 +372,8 @@ restore_iterator_state( st := State } ) -> - It = #it{zone = Zone, gen = Gen, replay = {TopicFilter, StartTime}}, - open_restore_iterator(meta_get_gen(Zone, Gen), It, State). + It = #it{shard = Shard, gen = Gen, replay = {TopicFilter, StartTime}}, + open_restore_iterator(meta_get_gen(Shard, Gen), It, State). %% Functions for dealing with the metadata stored persistently in rocksdb @@ -409,27 +411,27 @@ schema_gen_key(N) -> -undef(SCHEMA_WRITE_OPTS). -undef(SCHEMA_READ_OPTS). -%% Functions for dealing with the runtime zone metadata: +%% Functions for dealing with the runtime shard metadata: --define(PERSISTENT_TERM(ZONE, GEN), {?MODULE, ZONE, GEN}). +-define(PERSISTENT_TERM(SHARD, GEN), {?MODULE, SHARD, GEN}). --spec meta_register_gen(emqx_types:zone(), gen_id(), generation()) -> ok. -meta_register_gen(Zone, GenId, Gen) -> +-spec meta_register_gen(emqx_replay:shard(), gen_id(), generation()) -> ok. +meta_register_gen(Shard, GenId, Gen) -> Gs = case GenId > 0 of - true -> meta_lookup(Zone, GenId - 1); + true -> meta_lookup(Shard, GenId - 1); false -> [] end, - ok = meta_put(Zone, GenId, [Gen | Gs]), - ok = meta_put(Zone, current, GenId). + ok = meta_put(Shard, GenId, [Gen | Gs]), + ok = meta_put(Shard, current, GenId). --spec meta_lookup_gen(emqx_types:zone(), emqx_replay:time()) -> {gen_id(), generation()}. -meta_lookup_gen(Zone, Time) -> +-spec meta_lookup_gen(emqx_replay:shard(), emqx_replay:time()) -> {gen_id(), generation()}. +meta_lookup_gen(Shard, Time) -> % TODO % Is cheaper persistent term GC on update here worth extra lookup? I'm leaning % towards a "no". - Current = meta_lookup(Zone, current), - Gens = meta_lookup(Zone, Current), + Current = meta_lookup(Shard, current), + Gens = meta_lookup(Shard, Current), find_gen(Time, Current, Gens). find_gen(Time, GenId, [Gen = #{since := Since} | _]) when Time >= Since -> @@ -437,34 +439,34 @@ find_gen(Time, GenId, [Gen = #{since := Since} | _]) when Time >= Since -> find_gen(Time, GenId, [_Gen | Rest]) -> find_gen(Time, GenId - 1, Rest). --spec meta_get_gen(emqx_types:zone(), gen_id()) -> generation() | undefined. -meta_get_gen(Zone, GenId) -> - case meta_lookup(Zone, GenId, []) of +-spec meta_get_gen(emqx_replay:shard(), gen_id()) -> generation() | undefined. +meta_get_gen(Shard, GenId) -> + case meta_lookup(Shard, GenId, []) of [Gen | _Older] -> Gen; [] -> undefined end. --spec meta_get_current(emqx_types:zone()) -> gen_id() | undefined. -meta_get_current(Zone) -> - meta_lookup(Zone, current, undefined). +-spec meta_get_current(emqx_replay:shard()) -> gen_id() | undefined. +meta_get_current(Shard) -> + meta_lookup(Shard, current, undefined). --spec meta_lookup(emqx_types:zone(), _K) -> _V. -meta_lookup(Zone, K) -> - persistent_term:get(?PERSISTENT_TERM(Zone, K)). +-spec meta_lookup(emqx_replay:shard(), _K) -> _V. +meta_lookup(Shard, K) -> + persistent_term:get(?PERSISTENT_TERM(Shard, K)). --spec meta_lookup(emqx_types:zone(), _K, Default) -> _V | Default. -meta_lookup(Zone, K, Default) -> - persistent_term:get(?PERSISTENT_TERM(Zone, K), Default). +-spec meta_lookup(emqx_replay:shard(), _K, Default) -> _V | Default. +meta_lookup(Shard, K, Default) -> + persistent_term:get(?PERSISTENT_TERM(Shard, K), Default). --spec meta_put(emqx_types:zone(), _K, _V) -> ok. -meta_put(Zone, K, V) -> - persistent_term:put(?PERSISTENT_TERM(Zone, K), V). +-spec meta_put(emqx_replay:shard(), _K, _V) -> ok. +meta_put(Shard, K, V) -> + persistent_term:put(?PERSISTENT_TERM(Shard, K), V). --spec meta_erase(emqx_types:zone()) -> ok. -meta_erase(Zone) -> +-spec meta_erase(emqx_replay:shard()) -> ok. +meta_erase(Shard) -> [ persistent_term:erase(K) - || {K = ?PERSISTENT_TERM(Z, _), _} <- persistent_term:get(), Z =:= Zone + || {K = ?PERSISTENT_TERM(Z, _), _} <- persistent_term:get(), Z =:= Shard ], ok. @@ -473,15 +475,15 @@ meta_erase(Zone) -> get_next_id(undefined) -> 0; get_next_id(GenId) -> GenId + 1. -is_gen_valid(Zone, GenId, Since) when GenId > 0 -> - [GenPrev | _] = meta_lookup(Zone, GenId - 1), +is_gen_valid(Shard, GenId, Since) when GenId > 0 -> + [GenPrev | _] = meta_lookup(Shard, GenId - 1), case GenPrev of #{since := SincePrev} when Since > SincePrev -> ok; #{} -> {error, nonmonotonic} end; -is_gen_valid(_Zone, 0, 0) -> +is_gen_valid(_Shard, 0, 0) -> ok. %% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok. diff --git a/apps/emqx_replay/src/emqx_replay_local_store_sup.erl b/apps/emqx_replay/src/emqx_replay_local_store_sup.erl index fb88ef212..6812d0ee9 100644 --- a/apps/emqx_replay/src/emqx_replay_local_store_sup.erl +++ b/apps/emqx_replay/src/emqx_replay_local_store_sup.erl @@ -18,7 +18,7 @@ -behavior(supervisor). %% API: --export([start_link/0, start_zone/1, stop_zone/1]). +-export([start_link/0, start_shard/1, stop_shard/1]). %% behavior callbacks: -export([init/1]). @@ -37,14 +37,14 @@ start_link() -> supervisor:start_link({local, ?SUP}, ?MODULE, []). --spec start_zone(emqx_types:zone()) -> supervisor:startchild_ret(). -start_zone(Zone) -> - supervisor:start_child(?SUP, zone_child_spec(Zone)). +-spec start_shard(emqx_replay:shard()) -> supervisor:startchild_ret(). +start_shard(Shard) -> + supervisor:start_child(?SUP, shard_child_spec(Shard)). --spec stop_zone(emqx_types:zone()) -> ok | {error, _}. -stop_zone(Zone) -> - ok = supervisor:terminate_child(?SUP, Zone), - ok = supervisor:delete_child(?SUP, Zone). +-spec stop_shard(emqx_replay:shard()) -> ok | {error, _}. +stop_shard(Shard) -> + ok = supervisor:terminate_child(?SUP, Shard), + ok = supervisor:delete_child(?SUP, Shard). %%================================================================================ %% behavior callbacks @@ -63,11 +63,11 @@ init([]) -> %% Internal functions %%================================================================================ --spec zone_child_spec(emqx_types:zone()) -> supervisor:child_spec(). -zone_child_spec(Zone) -> +-spec shard_child_spec(emqx_replay:shard()) -> supervisor:child_spec(). +shard_child_spec(Shard) -> #{ - id => Zone, - start => {emqx_replay_local_store, start_link, [Zone]}, + id => Shard, + start => {emqx_replay_local_store, start_link, [Shard]}, shutdown => 5_000, restart => permanent, type => worker diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index fbeb452c9..bfbaf55b3 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -182,7 +182,7 @@ -opaque schema() :: #schema{}. -record(db, { - zone :: emqx_types:zone(), + shard :: emqx_replay:shard(), handle :: rocksdb:db_handle(), cf :: rocksdb:cf_handle(), keymapper :: keymapper(), @@ -244,17 +244,17 @@ create_new(DBHandle, GenId, Options) -> %% Reopen the database -spec open( - emqx_types:zone(), + emqx_replay:shard(), rocksdb:db_handle(), emqx_replay_local_store:gen_id(), emqx_replay_local_store:cf_refs(), schema() ) -> db(). -open(Zone, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) -> +open(Shard, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) -> {value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs), #db{ - zone = Zone, + shard = Shard, handle = DBHandle, cf = CFHandle, keymapper = Keymapper @@ -292,7 +292,7 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, -spec make_iterator(db(), emqx_replay:replay()) -> {ok, iterator()} | {error, _TODO}. make_iterator(DB, Replay) -> - Options = emqx_replay_conf:zone_iteration_options(DB#db.zone), + Options = emqx_replay_conf:shard_iteration_options(DB#db.shard), make_iterator(DB, Replay, Options). -spec make_iterator(db(), emqx_replay:replay(), iteration_options()) -> diff --git a/apps/emqx_replay/src/emqx_replay_sup.erl b/apps/emqx_replay/src/emqx_replay_sup.erl index a5da13c7a..969ce9a49 100644 --- a/apps/emqx_replay/src/emqx_replay_sup.erl +++ b/apps/emqx_replay/src/emqx_replay_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -42,7 +42,7 @@ start_link() -> %%================================================================================ init([]) -> - Children = [zone_sup()], + Children = [shard_sup()], SupFlags = #{ strategy => one_for_all, intensity => 0, @@ -54,9 +54,9 @@ init([]) -> %% Internal functions %%================================================================================ -zone_sup() -> +shard_sup() -> #{ - id => local_store_zone_sup, + id => local_store_shard_sup, start => {emqx_replay_local_store_sup, start_link, []}, restart => permanent, type => supervisor, diff --git a/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl b/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl index afed30b88..cdf8a95e7 100644 --- a/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl +++ b/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl @@ -21,7 +21,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("stdlib/include/assert.hrl"). --define(ZONE, zone(?FUNCTION_NAME)). +-define(SHARD, shard(?FUNCTION_NAME)). -define(DEFAULT_CONFIG, {emqx_replay_message_storage, #{ @@ -44,8 +44,8 @@ %% Smoke test for opening and reopening the database t_open(_Config) -> - ok = emqx_replay_local_store_sup:stop_zone(?ZONE), - {ok, _} = emqx_replay_local_store_sup:start_zone(?ZONE). + ok = emqx_replay_local_store_sup:stop_shard(?SHARD), + {ok, _} = emqx_replay_local_store_sup:start_shard(?SHARD). %% Smoke test of store function t_store(_Config) -> @@ -53,7 +53,7 @@ t_store(_Config) -> PublishedAt = 1000, Topic = [<<"foo">>, <<"bar">>], Payload = <<"message">>, - ?assertMatch(ok, emqx_replay_local_store:store(?ZONE, MessageID, PublishedAt, Topic, Payload)). + ?assertMatch(ok, emqx_replay_local_store:store(?SHARD, MessageID, PublishedAt, Topic, Payload)). %% Smoke test for iteration through a concrete topic t_iterate(_Config) -> @@ -62,7 +62,7 @@ t_iterate(_Config) -> Timestamps = lists:seq(1, 10), [ emqx_replay_local_store:store( - ?ZONE, + ?SHARD, emqx_guid:gen(), PublishedAt, Topic, @@ -73,7 +73,7 @@ t_iterate(_Config) -> %% Iterate through individual topics: [ begin - {ok, It} = emqx_replay_local_store:make_iterator(?ZONE, {Topic, 0}), + {ok, It} = emqx_replay_local_store:make_iterator(?SHARD, {Topic, 0}), Values = iterate(It), ?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values) end @@ -87,50 +87,50 @@ t_iterate_wildcard(_Config) -> Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], Timestamps = lists:seq(1, 10), _ = [ - store(?ZONE, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) + store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) || Topic <- Topics, PublishedAt <- Timestamps ], ?assertEqual( lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- Timestamps]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "#", 0)]) + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 0)]) ), ?assertEqual( [], - lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "#", 10 + 1)]) + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 10 + 1)]) ), ?assertEqual( lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- lists:seq(5, 10)]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "#", 5)]) + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 5)]) ), ?assertEqual( lists:sort([ {Topic, PublishedAt} || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps ]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/#", 0)]) + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)]) ), ?assertEqual( lists:sort([{"foo/bar", PublishedAt} || PublishedAt <- Timestamps]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/+", 0)]) + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+", 0)]) ), ?assertEqual( [], - lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/+/bar", 0)]) + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+/bar", 0)]) ), ?assertEqual( lists:sort([ {Topic, PublishedAt} || Topic <- ["foo/bar", "foo/bar/baz", "a/bar"], PublishedAt <- Timestamps ]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "+/bar/#", 0)]) + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "+/bar/#", 0)]) ), ?assertEqual( lists:sort([{Topic, PublishedAt} || Topic <- ["a", "a/bar"], PublishedAt <- Timestamps]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/#", 0)]) + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 0)]) ), ?assertEqual( [], - lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/+/+", 0)]) + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/+/+", 0)]) ), ok. @@ -139,40 +139,40 @@ t_iterate_long_tail_wildcard(_Config) -> TopicFilter = "b/c/d/e/+/+", Timestamps = lists:seq(1, 100), _ = [ - store(?ZONE, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) + store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) || PublishedAt <- Timestamps ], ?assertEqual( lists:sort([{"b/c/d/e/f/g", PublishedAt} || PublishedAt <- lists:seq(50, 100)]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, TopicFilter, 50)]) + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, TopicFilter, 50)]) ). t_create_gen(_Config) -> - {ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 5, ?DEFAULT_CONFIG), + {ok, 1} = emqx_replay_local_store:create_generation(?SHARD, 5, ?DEFAULT_CONFIG), ?assertEqual( {error, nonmonotonic}, - emqx_replay_local_store:create_generation(?ZONE, 1, ?DEFAULT_CONFIG) + emqx_replay_local_store:create_generation(?SHARD, 1, ?DEFAULT_CONFIG) ), ?assertEqual( {error, nonmonotonic}, - emqx_replay_local_store:create_generation(?ZONE, 5, ?DEFAULT_CONFIG) + emqx_replay_local_store:create_generation(?SHARD, 5, ?DEFAULT_CONFIG) ), - {ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_replay_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG), Topics = ["foo/bar", "foo/bar/baz"], Timestamps = lists:seq(1, 100), [ - ?assertEqual(ok, store(?ZONE, PublishedAt, Topic, <<>>)) + ?assertEqual(ok, store(?SHARD, PublishedAt, Topic, <<>>)) || Topic <- Topics, PublishedAt <- Timestamps ]. t_iterate_multigen(_Config) -> - {ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG), - {ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 50, ?DEFAULT_CONFIG), - {ok, 3} = emqx_replay_local_store:create_generation(?ZONE, 1000, ?DEFAULT_CONFIG), + {ok, 1} = emqx_replay_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_replay_local_store:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), + {ok, 3} = emqx_replay_local_store:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG), Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], Timestamps = lists:seq(1, 100), _ = [ - store(?ZONE, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) + store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) || Topic <- Topics, PublishedAt <- Timestamps ], ?assertEqual( @@ -180,38 +180,38 @@ t_iterate_multigen(_Config) -> {Topic, PublishedAt} || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps ]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/#", 0)]) + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)]) ), ?assertEqual( lists:sort([ {Topic, PublishedAt} || Topic <- ["a", "a/bar"], PublishedAt <- lists:seq(60, 100) ]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/#", 60)]) + lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 60)]) ). t_iterate_multigen_preserve_restore(_Config) -> ReplayID = atom_to_binary(?FUNCTION_NAME), - {ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG), - {ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 50, ?DEFAULT_CONFIG), - {ok, 3} = emqx_replay_local_store:create_generation(?ZONE, 100, ?DEFAULT_CONFIG), + {ok, 1} = emqx_replay_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_replay_local_store:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), + {ok, 3} = emqx_replay_local_store:create_generation(?SHARD, 100, ?DEFAULT_CONFIG), Topics = ["foo/bar", "foo/bar/baz", "a/bar"], Timestamps = lists:seq(1, 100), TopicFilter = "foo/#", TopicsMatching = ["foo/bar", "foo/bar/baz"], _ = [ - store(?ZONE, TS, Topic, term_to_binary({Topic, TS})) + store(?SHARD, TS, Topic, term_to_binary({Topic, TS})) || Topic <- Topics, TS <- Timestamps ], - It0 = iterator(?ZONE, TopicFilter, 0), + It0 = iterator(?SHARD, TopicFilter, 0), {It1, Res10} = iterate(It0, 10), % preserve mid-generation ok = emqx_replay_local_store:preserve_iterator(It1, ReplayID), - {ok, It2} = emqx_replay_local_store:restore_iterator(?ZONE, ReplayID), + {ok, It2} = emqx_replay_local_store:restore_iterator(?SHARD, ReplayID), {It3, Res100} = iterate(It2, 88), % preserve on the generation boundary ok = emqx_replay_local_store:preserve_iterator(It3, ReplayID), - {ok, It4} = emqx_replay_local_store:restore_iterator(?ZONE, ReplayID), + {ok, It4} = emqx_replay_local_store:restore_iterator(?SHARD, ReplayID), {It5, Res200} = iterate(It4, 1000), ?assertEqual(none, It5), ?assertEqual( @@ -220,16 +220,16 @@ t_iterate_multigen_preserve_restore(_Config) -> ), ?assertEqual( ok, - emqx_replay_local_store:discard_iterator(?ZONE, ReplayID) + emqx_replay_local_store:discard_iterator(?SHARD, ReplayID) ), ?assertEqual( {error, not_found}, - emqx_replay_local_store:restore_iterator(?ZONE, ReplayID) + emqx_replay_local_store:restore_iterator(?SHARD, ReplayID) ). -store(Zone, PublishedAt, Topic, Payload) -> +store(Shard, PublishedAt, Topic, Payload) -> ID = emqx_guid:gen(), - emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload). + emqx_replay_local_store:store(Shard, ID, PublishedAt, parse_topic(Topic), Payload). iterate(DB, TopicFilter, StartTime) -> iterate(iterator(DB, TopicFilter, StartTime)). @@ -274,15 +274,15 @@ end_per_suite(_Config) -> ok = application:stop(emqx_replay). init_per_testcase(TC, Config) -> - ok = set_zone_config(zone(TC), ?DEFAULT_CONFIG), - {ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)), + ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG), + {ok, _} = emqx_replay_local_store_sup:start_shard(shard(TC)), Config. end_per_testcase(TC, _Config) -> - ok = emqx_replay_local_store_sup:stop_zone(zone(TC)). + ok = emqx_replay_local_store_sup:stop_shard(shard(TC)). -zone(TC) -> - list_to_atom(lists:concat([?MODULE, "_", TC])). +shard(TC) -> + list_to_binary(lists:concat([?MODULE, "_", TC])). -set_zone_config(Zone, Config) -> - ok = application:set_env(emqx_replay, zone_config, #{Zone => Config}). +set_shard_config(Shard, Config) -> + ok = application:set_env(emqx_replay, shard_config, #{Shard => Config}). diff --git a/mix.exs b/mix.exs index 7b76bdc4b..b68312bb9 100644 --- a/mix.exs +++ b/mix.exs @@ -54,7 +54,7 @@ defmodule EMQXUmbrella.MixProject do {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true}, {:esockd, github: "emqx/esockd", tag: "5.9.6", override: true}, - {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.7.2-emqx-9", override: true}, + {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.7.2-emqx-11", override: true}, {:ekka, github: "emqx/ekka", tag: "0.15.1", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true}, diff --git a/rebar.config b/rebar.config index 0c9a79e24..8b4504a05 100644 --- a/rebar.config +++ b/rebar.config @@ -61,7 +61,7 @@ , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}} - , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.7.2-emqx-9"}}} + , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.7.2-emqx-11"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}