diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 3822078b7..4c8fc4b67 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -34,7 +34,8 @@ {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}, - {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}} + {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}, + {ra, "2.7.3"} ]}. {plugins, [{rebar3_proper, "0.12.1"}, rebar3_path_deps]}. diff --git a/apps/emqx/src/emqx_ds_schema.erl b/apps/emqx/src/emqx_ds_schema.erl index 5c552404e..bbbd4eafd 100644 --- a/apps/emqx/src/emqx_ds_schema.erl +++ b/apps/emqx/src/emqx_ds_schema.erl @@ -36,12 +36,15 @@ %% API %%================================================================================ -translate_builtin(#{ - backend := builtin, - n_shards := NShards, - replication_factor := ReplFactor, - layout := Layout -}) -> +translate_builtin( + Backend = #{ + backend := builtin, + n_shards := NShards, + n_sites := NSites, + replication_factor := ReplFactor, + layout := Layout + } +) -> Storage = case Layout of #{ @@ -61,7 +64,9 @@ translate_builtin(#{ #{ backend => builtin, n_shards => NShards, + n_sites => NSites, replication_factor => ReplFactor, + replication_options => maps:get(replication_options, Backend, #{}), storage => Storage }. @@ -126,6 +131,16 @@ fields(builtin) -> desc => ?DESC(builtin_n_shards) } )}, + %% TODO: Deprecate once cluster management and rebalancing is implemented. + {"n_sites", + sc( + pos_integer(), + #{ + default => 1, + importance => ?IMPORTANCE_HIDDEN, + desc => ?DESC(builtin_n_sites) + } + )}, {replication_factor, sc( pos_integer(), @@ -134,6 +149,15 @@ fields(builtin) -> importance => ?IMPORTANCE_HIDDEN } )}, + %% TODO: Elaborate. + {"replication_options", + sc( + hoconsc:map(name, any()), + #{ + default => #{}, + importance => ?IMPORTANCE_HIDDEN + } + )}, {local_write_buffer, sc( ref(builtin_local_write_buffer), @@ -201,7 +225,7 @@ fields(layout_builtin_wildcard_optimized) -> sc( range(0, 64), #{ - default => 10, + default => 20, importance => ?IMPORTANCE_HIDDEN, desc => ?DESC(wildcard_optimized_epoch_bits) } diff --git a/apps/emqx/src/emqx_message.erl b/apps/emqx/src/emqx_message.erl index 8ec31f479..f8dd157f7 100644 --- a/apps/emqx/src/emqx_message.erl +++ b/apps/emqx/src/emqx_message.erl @@ -38,7 +38,8 @@ from/1, topic/1, payload/1, - timestamp/1 + timestamp/1, + timestamp/2 ]). %% Flags @@ -79,7 +80,10 @@ estimate_size/1 ]). --export_type([message_map/0]). +-export_type([ + timestamp/0, + message_map/0 +]). -type message_map() :: #{ id := binary(), @@ -89,10 +93,14 @@ headers := emqx_types:headers(), topic := emqx_types:topic(), payload := emqx_types:payload(), - timestamp := integer(), + timestamp := timestamp(), extra := _ }. +%% Message timestamp +%% Granularity: milliseconds. +-type timestamp() :: non_neg_integer(). + -elvis([{elvis_style, god_modules, disable}]). -spec make(emqx_types:topic(), emqx_types:payload()) -> emqx_types:message(). @@ -201,9 +209,14 @@ topic(#message{topic = Topic}) -> Topic. -spec payload(emqx_types:message()) -> emqx_types:payload(). payload(#message{payload = Payload}) -> Payload. --spec timestamp(emqx_types:message()) -> integer(). +-spec timestamp(emqx_types:message()) -> timestamp(). timestamp(#message{timestamp = TS}) -> TS. 
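+%% [Editor's illustration] The timestamp/2 accessor added below converts the
+%% millisecond-granularity record field on the fly; with a hypothetical
+%% TS = 1_700_000_000_123:
+%%   timestamp(Msg, second)      -> 1_700_000_000         (TS div 1000)
+%%   timestamp(Msg, millisecond) -> 1_700_000_000_123     (TS as stored)
+%%   timestamp(Msg, microsecond) -> 1_700_000_000_123_000 (TS * 1000)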
+-spec timestamp(emqx_types:message(), second | millisecond | microsecond) -> non_neg_integer(). +timestamp(#message{timestamp = TS}, second) -> TS div 1000; +timestamp(#message{timestamp = TS}, millisecond) -> TS; +timestamp(#message{timestamp = TS}, microsecond) -> TS * 1000. + -spec is_sys(emqx_types:message()) -> boolean(). is_sys(#message{flags = #{sys := true}}) -> true; @@ -416,7 +429,7 @@ from_map(#{ }. %% @doc Get current timestamp in milliseconds. --spec timestamp_now() -> integer(). +-spec timestamp_now() -> timestamp(). timestamp_now() -> erlang:system_time(millisecond). diff --git a/apps/emqx/src/emqx_rpc.erl b/apps/emqx/src/emqx_rpc.erl index 61aa2a8ca..5cc8ca4df 100644 --- a/apps/emqx/src/emqx_rpc.erl +++ b/apps/emqx/src/emqx_rpc.erl @@ -37,7 +37,6 @@ badrpc/0, call_result/1, call_result/0, - cast_result/0, multicall_result/1, multicall_result/0, erpc/1, diff --git a/apps/emqx/test/emqx_bpapi_static_checks.erl b/apps/emqx/test/emqx_bpapi_static_checks.erl index 5cc8226a4..32008b833 100644 --- a/apps/emqx/test/emqx_bpapi_static_checks.erl +++ b/apps/emqx/test/emqx_bpapi_static_checks.erl @@ -48,7 +48,7 @@ %% Applications and modules we wish to ignore in the analysis: -define(IGNORED_APPS, - "gen_rpc, recon, redbug, observer_cli, snabbkaffe, ekka, mria, amqp_client, rabbit_common, esaml" + "gen_rpc, recon, redbug, observer_cli, snabbkaffe, ekka, mria, amqp_client, rabbit_common, esaml, ra" ). -define(IGNORED_MODULES, "emqx_rpc"). -define(FORCE_DELETED_MODULES, [ diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 14989bdd8..94bc58908 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -33,10 +33,6 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - %% avoid inter-suite flakiness... - %% TODO: remove after other suites start to use `emx_cth_suite' - application:stop(emqx), - application:stop(emqx_durable_storage), Config. end_per_suite(_Config) -> @@ -45,19 +41,33 @@ end_per_suite(_Config) -> init_per_testcase(t_session_subscription_iterators = TestCase, Config) -> Cluster = cluster(), Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}), + _ = wait_shards_online(Nodes), [{nodes, Nodes} | Config]; init_per_testcase(t_message_gc = TestCase, Config) -> Opts = #{ extra_emqx_conf => - "\n session_persistence.message_retention_period = 1s" + "\n session_persistence.message_retention_period = 3s" "\n durable_storage.messages.n_shards = 3" }, common_init_per_testcase(TestCase, [{n_shards, 3} | Config], Opts); +init_per_testcase(t_replication_options = TestCase, Config) -> + Opts = #{ + extra_emqx_conf => + "\n durable_storage.messages.replication_options {" + "\n wal_max_size_bytes = 16000000" + "\n wal_max_batch_size = 1024" + "\n wal_write_strategy = o_sync" + "\n wal_sync_method = datasync" + "\n wal_compute_checksums = false" + "\n snapshot_interval = 64" + "\n resend_window = 60" + "\n}" + }, + common_init_per_testcase(TestCase, Config, Opts); init_per_testcase(TestCase, Config) -> common_init_per_testcase(TestCase, Config, _Opts = #{}). 
common_init_per_testcase(TestCase, Config, Opts) -> - ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB), Apps = emqx_cth_suite:start( app_specs(Opts), #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} @@ -67,14 +77,11 @@ common_init_per_testcase(TestCase, Config, Opts) -> end_per_testcase(t_session_subscription_iterators, Config) -> Nodes = ?config(nodes, Config), emqx_common_test_helpers:call_janitor(60_000), - ok = emqx_cth_cluster:stop(Nodes), - end_per_testcase(common, Config); + ok = emqx_cth_cluster:stop(Nodes); end_per_testcase(_TestCase, Config) -> Apps = proplists:get_value(apps, Config, []), emqx_common_test_helpers:call_janitor(60_000), - clear_db(), - emqx_cth_suite:stop(Apps), - ok. + ok = emqx_cth_suite:stop(Apps). t_messages_persisted(_Config) -> C1 = connect(<>, true, 30), @@ -390,7 +397,7 @@ t_message_gc(Config) -> message(<<"foo/bar">>, <<"1">>, 0), message(<<"foo/baz">>, <<"2">>, 1) ], - ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs0), + ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs0, #{sync => true}), ?tp(inserted_batch, #{}), {ok, _} = ?block_until(#{?snk_kind := ps_message_gc_added_gen}), @@ -399,7 +406,7 @@ t_message_gc(Config) -> message(<<"foo/bar">>, <<"3">>, Now + 100), message(<<"foo/baz">>, <<"4">>, Now + 101) ], - ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs1), + ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs1, #{sync => true}), {ok, _} = snabbkaffe:block_until( ?match_n_events(NShards, #{?snk_kind := message_gc_generation_dropped}), @@ -455,6 +462,33 @@ t_metrics_not_dropped(_Config) -> ok. +t_replication_options(_Config) -> + ?assertMatch( + #{ + backend := builtin, + replication_options := #{ + wal_max_size_bytes := 16000000, + wal_max_batch_size := 1024, + wal_write_strategy := o_sync, + wal_sync_method := datasync, + wal_compute_checksums := false, + snapshot_interval := 64, + resend_window := 60 + } + }, + emqx_ds_replication_layer_meta:get_options(?PERSISTENT_MESSAGE_DB) + ), + ?assertMatch( + #{ + wal_max_size_bytes := 16000000, + wal_max_batch_size := 1024, + wal_write_strategy := o_sync, + wal_compute_checksums := false, + wal_sync_method := datasync + }, + ra_system:fetch(?PERSISTENT_MESSAGE_DB) + ). + %% connect(ClientId, CleanStart, EI) -> @@ -524,22 +558,24 @@ app_specs(Opts) -> ]. cluster() -> - Spec = #{role => core, apps => app_specs()}, + ExtraConf = "\n durable_storage.messages.n_sites = 2", + Spec = #{role => core, apps => app_specs(#{extra_emqx_conf => ExtraConf})}, [ {persistent_messages_SUITE1, Spec}, {persistent_messages_SUITE2, Spec} ]. +wait_shards_online(Nodes = [Node | _]) -> + NShards = erpc:call(Node, emqx_ds_replication_layer_meta, n_shards, [?PERSISTENT_MESSAGE_DB]), + ?retry(500, 10, [?assertEqual(NShards, shards_online(N)) || N <- Nodes]). + +shards_online(Node) -> + length(erpc:call(Node, emqx_ds_builtin_db_sup, which_shards, [?PERSISTENT_MESSAGE_DB])). + get_mqtt_port(Node, Type) -> {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]), Port. -clear_db() -> - ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB), - mria:stop(), - ok = mnesia:delete_schema([node()]), - ok. - message(Topic, Payload, PublishedAt) -> #message{ topic = Topic, diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 4143c9ffd..3053127d3 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -43,6 +43,7 @@ %% Misc. API: -export([count/1]). +-export([timestamp_us/0]). 
 -export_type([
     create_db_opts/0,
@@ -147,9 +148,8 @@
 -type error(Reason) :: {error, recoverable | unrecoverable, Reason}.
 
 %% Timestamp
+%% Each message must have a unique timestamp.
 %% Earliest possible timestamp is 0.
-%% TODO granularity? Currently, we should always use milliseconds, as that's the unit we
-%% use in emqx_guid. Otherwise, the iterators won't match the message timestamps.
 -type time() :: non_neg_integer().
 
 -type message_store_opts() ::
@@ -295,6 +295,7 @@ drop_db(DB) ->
         undefined ->
             ok;
         Module ->
+            _ = persistent_term:erase(?persistent_term(DB)),
             Module:drop_db(DB)
     end.
 
@@ -394,6 +395,10 @@ count(DB) ->
 %% Internal exports
 %%================================================================================
 
+-spec timestamp_us() -> time().
+timestamp_us() ->
+    erlang:system_time(microsecond).
+
 %%================================================================================
 %% Internal functions
 %%================================================================================
diff --git a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl
index e522f2a09..100d7fa1f 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl
@@ -112,7 +112,9 @@
     vector_to_key/2,
     bin_vector_to_key/2,
     key_to_vector/2,
+    key_to_coord/3,
     bin_key_to_vector/2,
+    bin_key_to_coord/3,
     key_to_bitstring/2,
     bitstring_to_key/2,
     make_filter/2,
@@ -297,13 +299,7 @@ bin_vector_to_key(Keymapper = #keymapper{vec_coord_size = DimSizeof, key_size =
 key_to_vector(#keymapper{vec_scanner = Scanner}, Key) ->
     lists:map(
         fun(Actions) ->
-            lists:foldl(
-                fun(Action, Acc) ->
-                    Acc bor extract_inv(Key, Action)
-                end,
-                0,
-                Actions
-            )
+            extract_coord(Actions, Key)
         end,
         Scanner
     ).
@@ -324,6 +320,16 @@ bin_key_to_vector(Keymapper = #keymapper{vec_coord_size = DimSizeof, key_size =
         DimSizeof
     ).
 
+-spec key_to_coord(keymapper(), scalar(), dimension()) -> coord().
+key_to_coord(#keymapper{vec_scanner = Scanner}, Key, Dim) ->
+    Actions = lists:nth(Dim, Scanner),
+    extract_coord(Actions, Key).
+
+-spec bin_key_to_coord(keymapper(), key(), dimension()) -> coord().
+bin_key_to_coord(Keymapper = #keymapper{key_size = Size}, BinKey, Dim) ->
+    <<Key:Size>> = BinKey,
+    key_to_coord(Keymapper, Key, Dim).
+
 %% @doc Transform a bitstring to a key
 -spec bitstring_to_key(keymapper(), bitstring()) -> scalar().
 bitstring_to_key(#keymapper{key_size = Size}, Bin) ->
@@ -680,6 +686,15 @@ extract_inv(Dest, #scan_action{
 }) ->
     ((Dest bsr DestOffset) band SrcBitmask) bsl SrcOffset.
 
+extract_coord(Actions, Key) ->
+    lists:foldl(
+        fun(Action, Acc) ->
+            Acc bor extract_inv(Key, Action)
+        end,
+        0,
+        Actions
+    ).
+
 ones(Bits) ->
     1 bsl Bits - 1.
 
diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl
index 68aa0ee90..a93a94168 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl
@@ -21,7 +21,8 @@
 -behaviour(supervisor).
 
 %% API:
--export([start_db/2, start_shard/1, start_egress/1, stop_shard/1, ensure_shard/1]).
+-export([start_db/2, start_shard/1, start_egress/1, stop_shard/1, ensure_shard/1, ensure_egress/1]).
+-export([which_shards/1]).
 
 %% behaviour callbacks:
 -export([init/1]).
@@ -36,12 +37,14 @@
 -define(via(REC), {via, gproc, {n, l, REC}}).
 
 -define(db_sup, ?MODULE).
--define(shard_sup, emqx_ds_builtin_db_shard_sup).
+-define(shards_sup, emqx_ds_builtin_db_shards_sup). -define(egress_sup, emqx_ds_builtin_db_egress_sup). +-define(shard_sup, emqx_ds_builtin_db_shard_sup). -record(?db_sup, {db}). --record(?shard_sup, {db}). +-record(?shards_sup, {db}). -record(?egress_sup, {db}). +-record(?shard_sup, {db, shard}). %%================================================================================ %% API funcions @@ -53,8 +56,8 @@ start_db(DB, Opts) -> -spec start_shard(emqx_ds_storage_layer:shard_id()) -> supervisor:startchild_ret(). -start_shard(Shard = {DB, _}) -> - supervisor:start_child(?via(#?shard_sup{db = DB}), shard_spec(DB, Shard)). +start_shard({DB, Shard}) -> + supervisor:start_child(?via(#?shards_sup{db = DB}), shard_spec(DB, Shard)). -spec start_egress(emqx_ds_storage_layer:shard_id()) -> supervisor:startchild_ret(). @@ -63,21 +66,24 @@ start_egress({DB, Shard}) -> -spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, _}. stop_shard(Shard = {DB, _}) -> - Sup = ?via(#?shard_sup{db = DB}), + Sup = ?via(#?shards_sup{db = DB}), ok = supervisor:terminate_child(Sup, Shard), ok = supervisor:delete_child(Sup, Shard). -spec ensure_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, _Reason}. ensure_shard(Shard) -> - case start_shard(Shard) of - {ok, _Pid} -> - ok; - {error, {already_started, _Pid}} -> - ok; - {error, Reason} -> - {error, Reason} - end. + ensure_started(start_shard(Shard)). + +-spec ensure_egress(emqx_ds_storage_layer:shard_id()) -> + ok | {error, _Reason}. +ensure_egress(Shard) -> + ensure_started(start_egress(Shard)). + +-spec which_shards(emqx_ds:db()) -> + [_Child]. +which_shards(DB) -> + supervisor:which_children(?via(#?shards_sup{db = DB})). %%================================================================================ %% behaviour callbacks @@ -86,45 +92,78 @@ ensure_shard(Shard) -> init({#?db_sup{db = DB}, DefaultOpts}) -> %% Spec for the top-level supervisor for the database: logger:notice("Starting DS DB ~p", [DB]), - _ = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts), - %% TODO: before the leader election is implemented, we set ourselves as the leader for all shards: - MyShards = emqx_ds_replication_layer_meta:my_shards(DB), - lists:foreach( - fun(Shard) -> - emqx_ds_replication_layer:maybe_set_myself_as_leader(DB, Shard) - end, - MyShards - ), - Children = [sup_spec(#?shard_sup{db = DB}, []), sup_spec(#?egress_sup{db = DB}, [])], + Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts), + ok = start_ra_system(DB, Opts), + Children = [ + sup_spec(#?shards_sup{db = DB}, []), + sup_spec(#?egress_sup{db = DB}, []), + shard_allocator_spec(DB, Opts) + ], SupFlags = #{ strategy => one_for_all, intensity => 0, period => 1 }, {ok, {SupFlags, Children}}; -init({#?shard_sup{db = DB}, _}) -> - %% Spec for the supervisor that manages the worker processes for +init({#?shards_sup{db = _DB}, _}) -> + %% Spec for the supervisor that manages the supervisors for %% each local shard of the DB: - MyShards = emqx_ds_replication_layer_meta:my_shards(DB), - Children = [shard_spec(DB, Shard) || Shard <- MyShards], SupFlags = #{ strategy => one_for_one, intensity => 10, period => 1 }, - {ok, {SupFlags, Children}}; -init({#?egress_sup{db = DB}, _}) -> + {ok, {SupFlags, []}}; +init({#?egress_sup{db = _DB}, _}) -> %% Spec for the supervisor that manages the egress proxy processes %% managing traffic towards each of the shards of the DB: - Shards = emqx_ds_replication_layer_meta:shards(DB), - Children = [egress_spec(DB, Shard) || Shard <- Shards], SupFlags = #{ 
strategy => one_for_one, intensity => 0, period => 1 }, + {ok, {SupFlags, []}}; +init({#?shard_sup{db = DB, shard = Shard}, _}) -> + SupFlags = #{ + strategy => rest_for_one, + intensity => 10, + period => 100 + }, + Opts = emqx_ds_replication_layer_meta:get_options(DB), + Children = [ + shard_storage_spec(DB, Shard, Opts), + shard_replication_spec(DB, Shard, Opts) + ], {ok, {SupFlags, Children}}. +start_ra_system(DB, #{replication_options := ReplicationOpts}) -> + DataDir = filename:join([emqx_ds:base_dir(), DB, dsrepl]), + Config = lists:foldr(fun maps:merge/2, #{}, [ + ra_system:default_config(), + #{ + name => DB, + data_dir => DataDir, + wal_data_dir => DataDir, + names => ra_system:derive_names(DB) + }, + maps:with( + [ + wal_max_size_bytes, + wal_max_batch_size, + wal_write_strategy, + wal_sync_method, + wal_compute_checksums + ], + ReplicationOpts + ) + ]), + case ra_system:start(Config) of + {ok, _System} -> + ok; + {error, {already_started, _System}} -> + ok + end. + %%================================================================================ %% Internal exports %%================================================================================ @@ -145,15 +184,39 @@ sup_spec(Id, Options) -> }. shard_spec(DB, Shard) -> - Options = emqx_ds_replication_layer_meta:get_options(DB), #{ - id => Shard, - start => {emqx_ds_storage_layer, start_link, [{DB, Shard}, Options]}, + id => {shard, Shard}, + start => {?MODULE, start_link_sup, [#?shard_sup{db = DB, shard = Shard}, []]}, + shutdown => infinity, + restart => permanent, + type => supervisor + }. + +shard_storage_spec(DB, Shard, Opts) -> + #{ + id => {Shard, storage}, + start => {emqx_ds_storage_layer, start_link, [{DB, Shard}, Opts]}, shutdown => 5_000, restart => permanent, type => worker }. +shard_replication_spec(DB, Shard, Opts) -> + #{ + id => {Shard, replication}, + start => {emqx_ds_replication_layer_shard, start_link, [DB, Shard, Opts]}, + restart => transient, + type => worker + }. + +shard_allocator_spec(DB, Opts) -> + #{ + id => shard_allocator, + start => {emqx_ds_replication_shard_allocator, start_link, [DB, Opts]}, + restart => permanent, + type => worker + }. + egress_spec(DB, Shard) -> #{ id => Shard, @@ -162,3 +225,13 @@ egress_spec(DB, Shard) -> restart => permanent, type => worker }. + +ensure_started(Res) -> + case Res of + {ok, _Pid} -> + ok; + {error, {already_started, _Pid}} -> + ok; + {error, Reason} -> + {error, Reason} + end. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 9135819f9..72f142b8f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -36,27 +36,36 @@ update_iterator/3, next/3, delete_next/4, - node_of_shard/2, - shard_of_message/3, - maybe_set_myself_as_leader/2 + shard_of_message/3 ]). %% internal exports: -export([ + %% RPC Targets: do_drop_db_v1/1, do_store_batch_v1/4, do_get_streams_v1/4, do_get_streams_v2/4, - do_make_iterator_v1/5, do_make_iterator_v2/5, do_update_iterator_v2/4, do_next_v1/4, - do_add_generation_v2/1, do_list_generations_with_lifetimes_v3/2, - do_drop_generation_v3/3, do_get_delete_streams_v4/4, do_make_delete_iterator_v4/5, - do_delete_next_v4/5 + do_delete_next_v4/5, + %% Unused: + do_drop_generation_v3/3, + %% Obsolete: + do_make_iterator_v1/5, + do_add_generation_v2/1, + + %% Egress API: + ra_store_batch/3 +]). + +-export([ + init/1, + apply/3 ]). 
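[Editor's note] start_ra_system/2 above gives every DB its own named Ra system, with
the user-facing replication_options overriding the WAL defaults. A sketch of the
effective configuration, assuming the DB is `messages` and the options used by the
t_replication_options test case (the data directory shown is hypothetical):

    Config = maps:merge(ra_system:default_config(), #{
        name => messages,
        data_dir => "data/messages/dsrepl",
        wal_data_dir => "data/messages/dsrepl",
        names => ra_system:derive_names(messages),
        %% whitelisted keys copied from replication_options:
        wal_max_size_bytes => 16000000,
        wal_max_batch_size => 1024,
        wal_write_strategy => o_sync,
        wal_sync_method => datasync,
        wal_compute_checksums => false
    }),
    {ok, _} = ra_system:start(Config).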
-export_type([ @@ -85,7 +94,9 @@ backend := builtin, storage := emqx_ds_storage_layer:prototype(), n_shards => pos_integer(), - replication_factor => pos_integer() + n_sites => pos_integer(), + replication_factor => pos_integer(), + replication_options => _TODO :: #{} }. %% This enapsulates the stream entity from the replication level. @@ -150,13 +161,19 @@ open_db(DB, CreateOpts) -> -spec add_generation(emqx_ds:db()) -> ok | {error, _}. add_generation(DB) -> - Nodes = emqx_ds_replication_layer_meta:leader_nodes(DB), - _ = emqx_ds_proto_v4:add_generation(Nodes, DB), - ok. + foreach_shard( + DB, + fun(Shard) -> ok = ra_add_generation(DB, Shard) end + ). -spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}. update_db_config(DB, CreateOpts) -> - emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts). + ok = emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts), + Opts = emqx_ds_replication_layer_meta:get_options(DB), + foreach_shard( + DB, + fun(Shard) -> ok = ra_update_config(DB, Shard, Opts) end + ). -spec list_generations_with_lifetimes(emqx_ds:db()) -> #{generation_rank() => emqx_ds:generation_info()}. @@ -164,13 +181,12 @@ list_generations_with_lifetimes(DB) -> Shards = list_shards(DB), lists:foldl( fun(Shard, GensAcc) -> - Node = node_of_shard(DB, Shard), maps:fold( fun(GenId, Data, AccInner) -> AccInner#{{Shard, GenId} => Data} end, GensAcc, - emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard) + ra_list_generations_with_lifetimes(DB, Shard) ) end, #{}, @@ -179,18 +195,15 @@ list_generations_with_lifetimes(DB) -> -spec drop_generation(emqx_ds:db(), generation_rank()) -> ok | {error, _}. drop_generation(DB, {Shard, GenId}) -> - %% TODO: drop generation in all nodes in the replica set, not only in the leader, - %% after we have proper replication in place. - Node = node_of_shard(DB, Shard), - emqx_ds_proto_v4:drop_generation(Node, DB, Shard, GenId). + ra_drop_generation(DB, Shard, GenId). -spec drop_db(emqx_ds:db()) -> ok | {error, _}. drop_db(DB) -> - Nodes = list_nodes(), - _ = emqx_ds_proto_v4:drop_db(Nodes, DB), - _ = emqx_ds_replication_layer_meta:drop_db(DB), - emqx_ds_builtin_sup:stop_db(DB), - ok. + foreach_shard(DB, fun(Shard) -> + {ok, _} = ra_drop_shard(DB, Shard) + end), + _ = emqx_ds_proto_v4:drop_db(list_nodes(), DB), + emqx_ds_replication_layer_meta:drop_db(DB). -spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) -> emqx_ds:store_batch_result(). @@ -208,10 +221,9 @@ get_streams(DB, TopicFilter, StartTime) -> Shards = list_shards(DB), lists:flatmap( fun(Shard) -> - Node = node_of_shard(DB, Shard), Streams = try - emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, StartTime) + ra_get_streams(DB, Shard, TopicFilter, StartTime) catch error:{erpc, _} -> %% TODO: log? @@ -235,8 +247,7 @@ get_delete_streams(DB, TopicFilter, StartTime) -> Shards = list_shards(DB), lists:flatmap( fun(Shard) -> - Node = node_of_shard(DB, Shard), - Streams = emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, StartTime), + Streams = ra_get_delete_streams(DB, Shard, TopicFilter, StartTime), lists:map( fun(StorageLayerStream) -> ?delete_stream(Shard, StorageLayerStream) @@ -251,8 +262,7 @@ get_delete_streams(DB, TopicFilter, StartTime) -> emqx_ds:make_iterator_result(iterator()). 
make_iterator(DB, Stream, TopicFilter, StartTime) -> ?stream_v2(Shard, StorageStream) = Stream, - Node = node_of_shard(DB, Shard), - try emqx_ds_proto_v4:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of + try ra_make_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of {ok, Iter} -> {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}}; Error = {error, _, _} -> @@ -266,12 +276,7 @@ make_iterator(DB, Stream, TopicFilter, StartTime) -> emqx_ds:make_delete_iterator_result(delete_iterator()). make_delete_iterator(DB, Stream, TopicFilter, StartTime) -> ?delete_stream(Shard, StorageStream) = Stream, - Node = node_of_shard(DB, Shard), - case - emqx_ds_proto_v4:make_delete_iterator( - Node, DB, Shard, StorageStream, TopicFilter, StartTime - ) - of + case ra_make_delete_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of {ok, Iter} -> {ok, #{?tag => ?DELETE_IT, ?shard => Shard, ?enc => Iter}}; Err = {error, _} -> @@ -282,8 +287,7 @@ make_delete_iterator(DB, Stream, TopicFilter, StartTime) -> emqx_ds:make_iterator_result(iterator()). update_iterator(DB, OldIter, DSKey) -> #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter, - Node = node_of_shard(DB, Shard), - try emqx_ds_proto_v4:update_iterator(Node, DB, Shard, StorageIter, DSKey) of + try ra_update_iterator(DB, Shard, StorageIter, DSKey) of {ok, Iter} -> {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}}; Error = {error, _, _} -> @@ -296,7 +300,6 @@ update_iterator(DB, OldIter, DSKey) -> -spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). next(DB, Iter0, BatchSize) -> #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0} = Iter0, - Node = node_of_shard(DB, Shard), %% TODO: iterator can contain information that is useful for %% reconstructing messages sent over the network. For example, %% when we send messages with the learned topic index, we could @@ -305,7 +308,7 @@ next(DB, Iter0, BatchSize) -> %% %% This kind of trickery should be probably done here in the %% replication layer. Or, perhaps, in the logic layer. - case emqx_ds_proto_v4:next(Node, DB, Shard, StorageIter0, BatchSize) of + case ra_next(DB, Shard, StorageIter0, BatchSize) of {ok, StorageIter, Batch} -> Iter = Iter0#{?enc := StorageIter}, {ok, Iter, Batch}; @@ -321,8 +324,7 @@ next(DB, Iter0, BatchSize) -> emqx_ds:delete_next_result(delete_iterator()). delete_next(DB, Iter0, Selector, BatchSize) -> #{?tag := ?DELETE_IT, ?shard := Shard, ?enc := StorageIter0} = Iter0, - Node = node_of_shard(DB, Shard), - case emqx_ds_proto_v4:delete_next(Node, DB, Shard, StorageIter0, Selector, BatchSize) of + case ra_delete_next(DB, Shard, StorageIter0, Selector, BatchSize) of {ok, StorageIter, NumDeleted} -> Iter = Iter0#{?enc := StorageIter}, {ok, Iter, NumDeleted}; @@ -330,21 +332,10 @@ delete_next(DB, Iter0, Selector, BatchSize) -> Other end. --spec node_of_shard(emqx_ds:db(), shard_id()) -> node(). -node_of_shard(DB, Shard) -> - case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of - {ok, Leader} -> - Leader; - {error, no_leader_for_shard} -> - %% TODO: use optvar - timer:sleep(500), - node_of_shard(DB, Shard) - end. - -spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) -> emqx_ds_replication_layer:shard_id(). 
shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) -> - N = emqx_ds_replication_layer_meta:n_shards(DB), + N = emqx_ds_replication_shard_allocator:n_shards(DB), Hash = case SerializeBy of clientid -> erlang:phash2(From, N); @@ -352,18 +343,8 @@ shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) -> end, integer_to_binary(Hash). -%% TODO: there's no real leader election right now --spec maybe_set_myself_as_leader(emqx_ds:db(), shard_id()) -> ok. -maybe_set_myself_as_leader(DB, Shard) -> - Site = emqx_ds_replication_layer_meta:this_site(), - case emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) of - [Site | _] -> - %% Currently the first in-sync replica always becomes the - %% leader - ok = emqx_ds_replication_layer_meta:set_leader(DB, Shard, node()); - _Sites -> - ok - end. +foreach_shard(DB, Fun) -> + lists:foreach(Fun, list_shards(DB)). %%================================================================================ %% behavior callbacks @@ -392,7 +373,8 @@ do_drop_db_v1(DB) -> ) -> emqx_ds:store_batch_result(). do_store_batch_v1(DB, Shard, #{?tag := ?BATCH, ?batch_messages := Messages}, Options) -> - emqx_ds_storage_layer:store_batch({DB, Shard}, Messages, Options). + Batch = [{emqx_message:timestamp(Message), Message} || Message <- Messages], + emqx_ds_storage_layer:store_batch({DB, Shard}, Batch, Options). %% Remove me in EMQX 5.6 -dialyzer({nowarn_function, do_get_streams_v1/4}). @@ -477,15 +459,9 @@ do_next_v1(DB, Shard, Iter, BatchSize) -> do_delete_next_v4(DB, Shard, Iter, Selector, BatchSize) -> emqx_ds_storage_layer:delete_next({DB, Shard}, Iter, Selector, BatchSize). --spec do_add_generation_v2(emqx_ds:db()) -> ok | {error, _}. -do_add_generation_v2(DB) -> - MyShards = emqx_ds_replication_layer_meta:my_owned_shards(DB), - lists:foreach( - fun(ShardId) -> - emqx_ds_storage_layer:add_generation({DB, ShardId}) - end, - MyShards - ). +-spec do_add_generation_v2(emqx_ds:db()) -> no_return(). +do_add_generation_v2(_DB) -> + error(obsolete_api). -spec do_list_generations_with_lifetimes_v3(emqx_ds:db(), shard_id()) -> #{emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info()}. @@ -510,3 +486,188 @@ do_get_delete_streams_v4(DB, Shard, TopicFilter, StartTime) -> list_nodes() -> mria:running_nodes(). + +%% TODO +%% Too large for normal operation, need better backpressure mechanism. +-define(RA_TIMEOUT, 60 * 1000). + +ra_store_batch(DB, Shard, Messages) -> + Command = #{ + ?tag => ?BATCH, + ?batch_messages => Messages + }, + Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), + case ra:process_command(Servers, Command, ?RA_TIMEOUT) of + {ok, Result, _Leader} -> + Result; + Error -> + Error + end. + +ra_add_generation(DB, Shard) -> + Command = #{ + ?tag => add_generation, + ?since => emqx_ds:timestamp_us() + }, + Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), + case ra:process_command(Servers, Command, ?RA_TIMEOUT) of + {ok, Result, _Leader} -> + Result; + Error -> + error(Error, [DB, Shard]) + end. + +ra_update_config(DB, Shard, Opts) -> + Command = #{ + ?tag => update_config, + ?config => Opts, + ?since => emqx_ds:timestamp_us() + }, + Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), + case ra:process_command(Servers, Command, ?RA_TIMEOUT) of + {ok, Result, _Leader} -> + Result; + Error -> + error(Error, [DB, Shard]) + end. 
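+%% [Editor's note] Every ra_* command helper in this block has the same shape:
+%% build a command map keyed by the ?tag macro, resolve the shard's server
+%% list with the last known leader first, and let ra:process_command/3 drive
+%% it through consensus; the reply is whatever apply/3 (further below) returns
+%% on the leader. Roughly:
+%%   Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
+%%   {ok, Result, _Leader} = ra:process_command(Servers, Command, ?RA_TIMEOUT).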
+
+ra_drop_generation(DB, Shard, GenId) ->
+    Command = #{?tag => drop_generation, ?generation => GenId},
+    Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
+    case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
+        {ok, Result, _Leader} ->
+            Result;
+        Error ->
+            error(Error, [DB, Shard])
+    end.
+
+ra_get_streams(DB, Shard, TopicFilter, Time) ->
+    {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
+    TimestampUs = timestamp_to_timeus(Time),
+    emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs).
+
+ra_get_delete_streams(DB, Shard, TopicFilter, Time) ->
+    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
+    emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time).
+
+ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
+    {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
+    TimestampUs = timestamp_to_timeus(StartTime),
+    emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimestampUs).
+
+ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
+    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
+    emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime).
+
+ra_update_iterator(DB, Shard, Iter, DSKey) ->
+    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
+    emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey).
+
+ra_next(DB, Shard, Iter, BatchSize) ->
+    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
+    emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize).
+
+ra_delete_next(DB, Shard, Iter, Selector, BatchSize) ->
+    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
+    emqx_ds_proto_v4:delete_next(Node, DB, Shard, Iter, Selector, BatchSize).
+
+ra_list_generations_with_lifetimes(DB, Shard) ->
+    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
+    Gens = emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard),
+    maps:map(
+        fun(_GenId, Data = #{since := Since, until := Until}) ->
+            Data#{
+                since := timeus_to_timestamp(Since),
+                until := emqx_maybe:apply(fun timeus_to_timestamp/1, Until)
+            }
+        end,
+        Gens
+    ).
+
+ra_drop_shard(DB, Shard) ->
+    ra:delete_cluster(emqx_ds_replication_layer_shard:shard_servers(DB, Shard), ?RA_TIMEOUT).
+
+%%
+
+init(#{db := DB, shard := Shard}) ->
+    #{db_shard => {DB, Shard}, latest => 0}.
+
+apply(
+    #{index := RaftIdx},
+    #{
+        ?tag := ?BATCH,
+        ?batch_messages := MessagesIn
+    },
+    #{db_shard := DBShard, latest := Latest} = State
+) ->
+    %% NOTE
+    %% Unique timestamp tracking real time closely.
+    %% With microsecond granularity it should be nearly impossible for it to run
+    %% too far ahead of the real time clock.
+    {NLatest, Messages} = assign_timestamps(Latest, MessagesIn),
+    %% TODO
+    %% Batch is now reversed, but it should not make a lot of difference.
+    %% Even if it were in order, it's still possible to write messages far away
+    %% in the past, i.e. when a replica catches up with the leader. The storage
+    %% layer currently relies on wall clock time to decide if it's safe to iterate
+    %% over the next epoch, which is likely wrong. Ideally it should rely on
+    %% consensus clock time instead.
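+    %% [Editor's illustration, hypothetical values] With Latest = 100 and
+    %% incoming message timestamps [99, 105, 105] (µs), assign_timestamps/2
+    %% above assigns 101, 105 and 106 respectively (returning the batch in
+    %% reverse order), and NLatest = 106; hence no two entries in the shard
+    %% ever share a key timestamp.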
+ Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}), + NState = State#{latest := NLatest}, + %% TODO: Need to measure effects of changing frequency of `release_cursor`. + Effect = {release_cursor, RaftIdx, NState}, + {NState, Result, Effect}; +apply( + _RaftMeta, + #{?tag := add_generation, ?since := Since}, + #{db_shard := DBShard, latest := Latest} = State +) -> + {Timestamp, NLatest} = ensure_monotonic_timestamp(Since, Latest), + Result = emqx_ds_storage_layer:add_generation(DBShard, Timestamp), + NState = State#{latest := NLatest}, + {NState, Result}; +apply( + _RaftMeta, + #{?tag := update_config, ?since := Since, ?config := Opts}, + #{db_shard := DBShard, latest := Latest} = State +) -> + {Timestamp, NLatest} = ensure_monotonic_timestamp(Since, Latest), + Result = emqx_ds_storage_layer:update_config(DBShard, Timestamp, Opts), + NState = State#{latest := NLatest}, + {NState, Result}; +apply( + _RaftMeta, + #{?tag := drop_generation, ?generation := GenId}, + #{db_shard := DBShard} = State +) -> + Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId), + {State, Result}. + +assign_timestamps(Latest, Messages) -> + assign_timestamps(Latest, Messages, []). + +assign_timestamps(Latest, [MessageIn | Rest], Acc) -> + case emqx_message:timestamp(MessageIn, microsecond) of + TimestampUs when TimestampUs > Latest -> + Message = assign_timestamp(TimestampUs, MessageIn), + assign_timestamps(TimestampUs, Rest, [Message | Acc]); + _Earlier -> + Message = assign_timestamp(Latest + 1, MessageIn), + assign_timestamps(Latest + 1, Rest, [Message | Acc]) + end; +assign_timestamps(Latest, [], Acc) -> + {Latest, Acc}. + +assign_timestamp(TimestampUs, Message) -> + {TimestampUs, Message}. + +ensure_monotonic_timestamp(TimestampUs, Latest) when TimestampUs > Latest -> + {TimestampUs, TimestampUs + 1}; +ensure_monotonic_timestamp(_TimestampUs, Latest) -> + {Latest, Latest + 1}. + +timestamp_to_timeus(TimestampMs) -> + TimestampMs * 1000. + +timeus_to_timestamp(TimestampUs) -> + TimestampUs div 1000. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl index b3a57d442..70812fa18 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl @@ -29,6 +29,16 @@ -define(tag, 1). -define(shard, 2). -define(enc, 3). + +%% ?BATCH -define(batch_messages, 2). +-define(timestamp, 3). + +%% add_generation / update_config +-define(config, 2). +-define(since, 3). + +%% drop_generation +-define(generation, 2). -endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 8b1a9a835..6d879dbb6 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -40,7 +40,6 @@ -export_type([]). --include("emqx_ds_replication_layer.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). 
%%================================================================================ @@ -109,7 +108,6 @@ store_batch(DB, Messages, Opts) -> -record(s, { db :: emqx_ds:db(), shard :: emqx_ds_replication_layer:shard_id(), - leader :: node(), n = 0 :: non_neg_integer(), tref :: reference(), batch = [] :: [emqx_types:message()], @@ -119,12 +117,9 @@ store_batch(DB, Messages, Opts) -> init([DB, Shard]) -> process_flag(trap_exit, true), process_flag(message_queue_data, off_heap), - %% TODO: adjust leader dynamically - Leader = shard_leader(DB, Shard), S = #s{ db = DB, shard = Shard, - leader = Leader, tref = start_timer() }, {ok, S}. @@ -156,16 +151,32 @@ terminate(_Reason, _S) -> %% Internal functions %%================================================================================ +-define(COOLDOWN_MIN, 1000). +-define(COOLDOWN_MAX, 5000). + do_flush(S = #s{batch = []}) -> S#s{tref = start_timer()}; do_flush( - S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard, leader = Leader} + S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard} ) -> - Batch = #{?tag => ?BATCH, ?batch_messages => lists:reverse(Messages)}, - ok = emqx_ds_proto_v2:store_batch(Leader, DB, Shard, Batch, #{}), - [gen_server:reply(From, ok) || From <- lists:reverse(Replies)], - ?tp(emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard, batch => Messages}), - erlang:garbage_collect(), + case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of + ok -> + lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), + true = erlang:garbage_collect(), + ?tp( + emqx_ds_replication_layer_egress_flush, + #{db => DB, shard => Shard, batch => Messages} + ); + Error -> + true = erlang:garbage_collect(), + ?tp( + warning, + emqx_ds_replication_layer_egress_flush_failed, + #{db => DB, shard => Shard, reason => Error} + ), + Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), + ok = timer:sleep(Cooldown) + end, S#s{ n = 0, batch = [], @@ -212,13 +223,3 @@ do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies start_timer() -> Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), erlang:send_after(Interval, self(), ?flush). - -shard_leader(DB, Shard) -> - %% TODO: use optvar - case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of - {ok, Leader} -> - Leader; - {error, no_leader_for_shard} -> - timer:sleep(500), - shard_leader(DB, Shard) - end. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index 597c8bc0d..f84863c03 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -29,19 +29,15 @@ -export([ shards/1, my_shards/1, - my_owned_shards/1, - leader_nodes/1, + allocate_shards/2, replica_set/2, - in_sync_replicas/2, sites/0, + node/1, open_db/2, get_options/1, update_db_config/2, drop_db/1, - shard_leader/2, this_site/0, - set_leader/3, - is_leader/1, print_status/0 ]). @@ -51,12 +47,10 @@ %% internal exports: -export([ open_db_trans/2, + allocate_shards_trans/2, update_db_config_trans/2, drop_db_trans/1, claim_site/2, - in_sync_replicas_trans/2, - set_leader_trans/3, - is_leader_trans/1, n_shards/1 ]). 
@@ -95,9 +89,6 @@ %% Sites that should contain the data when the cluster is in the %% stable state (no nodes are being added or removed from it): replica_set :: [site()], - %% Sites that contain the actual data: - in_sync_replicas :: [site()], - leader :: node() | undefined, misc = #{} :: map() }). @@ -107,13 +98,24 @@ %% Peristent term key: -define(emqx_ds_builtin_site, emqx_ds_builtin_site). +%% Make Dialyzer happy +-define(NODE_PAT(), + %% Equivalent of `#?NODE_TAB{_ = '_'}`: + erlang:make_tuple(record_info(size, ?NODE_TAB), '_') +). + +-define(SHARD_PAT(SHARD), + %% Equivalent of `#?SHARD_TAB{shard = SHARD, _ = '_'}` + erlang:make_tuple(record_info(size, ?SHARD_TAB), '_', [{#?SHARD_TAB.shard, SHARD}]) +). + %%================================================================================ %% API funcions %%================================================================================ -spec print_status() -> ok. print_status() -> - io:format("THIS SITE:~n~s~n", [base64:encode(this_site())]), + io:format("THIS SITE:~n~s~n", [this_site()]), io:format("~nSITES:~n", []), Nodes = [node() | nodes()], lists:foreach( @@ -123,28 +125,18 @@ print_status() -> true -> up; false -> down end, - io:format("~s ~p ~p~n", [base64:encode(Site), Node, Status]) + io:format("~s ~p ~p~n", [Site, Node, Status]) end, eval_qlc(mnesia:table(?NODE_TAB)) ), io:format( - "~nSHARDS:~nId Leader Status~n", [] + "~nSHARDS:~nId Replicas~n", [] ), lists:foreach( - fun(#?SHARD_TAB{shard = {DB, Shard}, leader = Leader}) -> + fun(#?SHARD_TAB{shard = {DB, Shard}, replica_set = RS}) -> ShardStr = string:pad(io_lib:format("~p/~s", [DB, Shard]), 30), - LeaderStr = string:pad(atom_to_list(Leader), 33), - Status = - case lists:member(Leader, Nodes) of - true -> - case node() of - Leader -> "up *"; - _ -> "up" - end; - false -> - "down" - end, - io:format("~s ~s ~s~n", [ShardStr, LeaderStr, Status]) + ReplicasStr = string:pad(io_lib:format("~p", [RS]), 40), + io:format("~s ~s~n", [ShardStr, ReplicasStr]) end, eval_qlc(mnesia:table(?SHARD_TAB)) ). @@ -169,30 +161,19 @@ shards(DB) -> -spec my_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()]. my_shards(DB) -> Site = this_site(), - filter_shards(DB, fun(#?SHARD_TAB{replica_set = ReplicaSet, in_sync_replicas = InSync}) -> - lists:member(Site, ReplicaSet) orelse lists:member(Site, InSync) + filter_shards(DB, fun(#?SHARD_TAB{replica_set = ReplicaSet}) -> + lists:member(Site, ReplicaSet) end). --spec my_owned_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()]. -my_owned_shards(DB) -> - Self = node(), - filter_shards(DB, fun(#?SHARD_TAB{leader = Leader}) -> - Self =:= Leader - end). - --spec leader_nodes(emqx_ds:db()) -> [node()]. -leader_nodes(DB) -> - lists:uniq( - filter_shards( - DB, - fun(#?SHARD_TAB{leader = Leader}) -> - Leader =/= undefined - end, - fun(#?SHARD_TAB{leader = Leader}) -> - Leader - end - ) - ). +allocate_shards(DB, Opts) -> + case mria:transaction(?SHARD, fun ?MODULE:allocate_shards_trans/2, [DB, Opts]) of + {atomic, Shards} -> + {ok, Shards}; + {aborted, {shards_already_allocated, Shards}} -> + {ok, Shards}; + {aborted, {insufficient_sites_online, Needed, Sites}} -> + {error, #{reason => insufficient_sites_online, needed => Needed, sites => Sites}} + end. -spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> {ok, [site()]} | {error, _}. @@ -204,46 +185,27 @@ replica_set(DB, Shard) -> {error, no_shard} end. --spec in_sync_replicas(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> - [site()]. 
-in_sync_replicas(DB, ShardId) -> - {atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:in_sync_replicas_trans/2, [DB, ShardId]), - case Result of - {ok, InSync} -> - InSync; - {error, _} -> - [] - end. - -spec sites() -> [site()]. sites() -> eval_qlc(qlc:q([Site || #?NODE_TAB{site = Site} <- mnesia:table(?NODE_TAB)])). --spec shard_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> - {ok, node()} | {error, no_leader_for_shard}. -shard_leader(DB, Shard) -> - case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of - [#?SHARD_TAB{leader = Leader}] when Leader =/= undefined -> - {ok, Leader}; - _ -> - {error, no_leader_for_shard} +-spec node(site()) -> node() | undefined. +node(Site) -> + case mnesia:dirty_read(?NODE_TAB, Site) of + [#?NODE_TAB{node = Node}] -> + Node; + [] -> + undefined end. --spec set_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), node()) -> - ok. -set_leader(DB, Shard, Node) -> - {atomic, _} = mria:transaction(?SHARD, fun ?MODULE:set_leader_trans/3, [DB, Shard, Node]), - ok. - --spec is_leader(node()) -> boolean(). -is_leader(Node) -> - {atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:is_leader_trans/1, [Node]), - Result. - -spec get_options(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts(). get_options(DB) -> - {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, undefined]), - Opts. + case mnesia:dirty_read(?META_TAB, DB) of + [#?META_TAB{db_props = Opts}] -> + Opts; + [] -> + #{} + end. -spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> emqx_ds_replication_layer:builtin_db_opts(). @@ -275,7 +237,6 @@ init([]) -> logger:set_process_metadata(#{domain => [ds, meta]}), ensure_tables(), ensure_site(), - {ok, _} = mnesia:subscribe({table, ?META_TAB, detailed}), S = #s{}, {ok, S}. @@ -285,18 +246,6 @@ handle_call(_Call, _From, S) -> handle_cast(_Cast, S) -> {noreply, S}. -handle_info( - {mnesia_table_event, {write, ?META_TAB, #?META_TAB{db = DB, db_props = Options}, [_], _}}, S -) -> - MyShards = my_owned_shards(DB), - - lists:foreach( - fun(ShardId) -> - emqx_ds_storage_layer:update_config({DB, ShardId}, Options) - end, - MyShards - ), - {noreply, S}; handle_info(_Info, S) -> {noreply, S}. @@ -308,20 +257,60 @@ terminate(_Reason, #s{}) -> %% Internal exports %%================================================================================ --spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts() | undefined) -> +-spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> emqx_ds_replication_layer:builtin_db_opts(). open_db_trans(DB, CreateOpts) -> case mnesia:wread({?META_TAB, DB}) of - [] when is_map(CreateOpts) -> - NShards = maps:get(n_shards, CreateOpts), - ReplicationFactor = maps:get(replication_factor, CreateOpts), + [] -> mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}), - create_shards(DB, NShards, ReplicationFactor), CreateOpts; [#?META_TAB{db_props = Opts}] -> Opts end. +-spec allocate_shards_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> [_Shard]. 
+allocate_shards_trans(DB, Opts) -> + NShards = maps:get(n_shards, Opts), + NSites = maps:get(n_sites, Opts), + ReplicationFactor = maps:get(replication_factor, Opts), + NReplicas = min(NSites, ReplicationFactor), + Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)], + AllSites = mnesia:match_object(?NODE_TAB, ?NODE_PAT(), read), + case length(AllSites) of + N when N >= NSites -> + ok; + _ -> + mnesia:abort({insufficient_sites_online, NSites, AllSites}) + end, + case mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write) of + [] -> + ok; + Records -> + ShardsAllocated = [Shard || #?SHARD_TAB{shard = {_DB, Shard}} <- Records], + mnesia:abort({shards_already_allocated, ShardsAllocated}) + end, + {Allocation, _} = lists:mapfoldl( + fun(Shard, SSites) -> + {Sites, _} = emqx_utils_stream:consume(NReplicas, SSites), + {_, SRest} = emqx_utils_stream:consume(1, SSites), + {{Shard, Sites}, SRest} + end, + emqx_utils_stream:repeat(emqx_utils_stream:list(AllSites)), + Shards + ), + lists:map( + fun({Shard, Sites}) -> + ReplicaSet = [Site || #?NODE_TAB{site = Site} <- Sites], + Record = #?SHARD_TAB{ + shard = {DB, Shard}, + replica_set = ReplicaSet + }, + ok = mnesia:write(Record), + Shard + end, + Allocation + ). + -spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> ok | {error, database}. update_db_config_trans(DB, CreateOpts) -> @@ -357,51 +346,13 @@ drop_db_trans(DB) -> claim_site(Site, Node) -> mnesia:write(#?NODE_TAB{site = Site, node = Node}). --spec in_sync_replicas_trans(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> - {ok, [site()]} | {error, no_shard}. -in_sync_replicas_trans(DB, Shard) -> - case mnesia:read(?SHARD_TAB, {DB, Shard}) of - [#?SHARD_TAB{in_sync_replicas = InSync}] -> - {ok, InSync}; - [] -> - {error, no_shard} - end. - --spec set_leader_trans(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), node()) -> - ok. -set_leader_trans(DB, Shard, Node) -> - [Record0] = mnesia:wread({?SHARD_TAB, {DB, Shard}}), - Record = Record0#?SHARD_TAB{leader = Node}, - mnesia:write(Record). - --spec is_leader_trans(node) -> boolean(). -is_leader_trans(Node) -> - case - mnesia:select( - ?SHARD_TAB, - ets:fun2ms(fun(#?SHARD_TAB{leader = Leader}) -> - Leader =:= Node - end), - 1, - read - ) - of - {[_ | _], _Cont} -> - true; - _ -> - false - end. 
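[Editor's note] The stream-based round robin in allocate_shards_trans/2 above takes
NReplicas sites off a cyclic site stream for each shard, then advances the stream by
one. A worked trace under assumed inputs -- sites [s1, s2, s3], n_shards = 4,
replication_factor = 2:

    %% shard <<"0">> -> [s1, s2]
    %% shard <<"1">> -> [s2, s3]
    %% shard <<"2">> -> [s3, s1]
    %% shard <<"3">> -> [s1, s2]

Each site ends up carrying a near-equal share of replicas without any hashing involved.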
- %%================================================================================ %% Internal functions %%================================================================================ ensure_tables() -> - %% TODO: seems like it may introduce flakiness - Majority = false, ok = mria:create_table(?META_TAB, [ {rlog_shard, ?SHARD}, - {majority, Majority}, {type, ordered_set}, {storage, disc_copies}, {record_name, ?META_TAB}, @@ -409,7 +360,6 @@ ensure_tables() -> ]), ok = mria:create_table(?NODE_TAB, [ {rlog_shard, ?SHARD}, - {majority, Majority}, {type, ordered_set}, {storage, disc_copies}, {record_name, ?NODE_TAB}, @@ -417,7 +367,6 @@ ensure_tables() -> ]), ok = mria:create_table(?SHARD_TAB, [ {rlog_shard, ?SHARD}, - {majority, Majority}, {type, ordered_set}, {storage, disc_copies}, {record_name, ?SHARD_TAB}, @@ -431,8 +380,8 @@ ensure_site() -> {ok, [Site]} -> ok; _ -> - Site = crypto:strong_rand_bytes(8), - logger:notice("Creating a new site with ID=~s", [base64:encode(Site)]), + Site = binary:encode_hex(crypto:strong_rand_bytes(8)), + logger:notice("Creating a new site with ID=~s", [Site]), ok = filelib:ensure_dir(Filename), {ok, FD} = file:open(Filename, [write]), io:format(FD, "~p.", [Site]), @@ -442,30 +391,6 @@ ensure_site() -> persistent_term:put(?emqx_ds_builtin_site, Site), ok. --spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok. -create_shards(DB, NShards, ReplicationFactor) -> - Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)], - AllSites = sites(), - lists:foreach( - fun(Shard) -> - Hashes0 = [{hash(Shard, Site), Site} || Site <- AllSites], - Hashes = lists:sort(Hashes0), - {_, Sites} = lists:unzip(Hashes), - [First | ReplicaSet] = lists:sublist(Sites, 1, ReplicationFactor), - Record = #?SHARD_TAB{ - shard = {DB, Shard}, - replica_set = ReplicaSet, - in_sync_replicas = [First] - }, - mnesia:write(Record) - end, - Shards - ). - --spec hash(emqx_ds_replication_layer:shard_id(), site()) -> any(). -hash(Shard, Site) -> - erlang:phash2({Shard, Site}). - eval_qlc(Q) -> case mnesia:is_transaction() of true -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl new file mode 100644 index 000000000..7540e01bb --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -0,0 +1,207 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ds_replication_layer_shard). + +-export([start_link/3]). + +%% Static server configuration +-export([ + shard_servers/2, + local_server/2 +]). + +%% Dynamic server location API +-export([ + servers/3, + server/3 +]). + +-behaviour(gen_server). +-export([ + init/1, + handle_call/3, + handle_cast/2, + terminate/2 +]). 
+
+%%
+
+start_link(DB, Shard, Opts) ->
+    gen_server:start_link(?MODULE, {DB, Shard, Opts}, []).
+
+shard_servers(DB, Shard) ->
+    {ok, ReplicaSet} = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
+    [
+        {server_name(DB, Shard, Site), emqx_ds_replication_layer_meta:node(Site)}
+     || Site <- ReplicaSet
+    ].
+
+local_server(DB, Shard) ->
+    Site = emqx_ds_replication_layer_meta:this_site(),
+    {server_name(DB, Shard, Site), node()}.
+
+cluster_name(DB, Shard) ->
+    iolist_to_binary(io_lib:format("~s_~s", [DB, Shard])).
+
+server_name(DB, Shard, Site) ->
+    DBBin = atom_to_binary(DB),
+    binary_to_atom(<<"ds_", DBBin/binary, Shard/binary, "_", Site/binary>>).
+
+%%
+
+servers(DB, Shard, _Order = leader_preferred) ->
+    get_servers_leader_preferred(DB, Shard);
+servers(DB, Shard, _Order = undefined) ->
+    get_shard_servers(DB, Shard).
+
+server(DB, Shard, _Which = local_preferred) ->
+    get_server_local_preferred(DB, Shard).
+
+get_servers_leader_preferred(DB, Shard) ->
+    %% NOTE: Contact last known leader first, then rest of shard servers.
+    ClusterName = get_cluster_name(DB, Shard),
+    case ra_leaderboard:lookup_leader(ClusterName) of
+        Leader when Leader /= undefined ->
+            Servers = ra_leaderboard:lookup_members(ClusterName),
+            [Leader | lists:delete(Leader, Servers)];
+        undefined ->
+            %% TODO: Dynamic membership.
+            get_shard_servers(DB, Shard)
+    end.
+
+get_server_local_preferred(DB, Shard) ->
+    %% NOTE: Contact either a local replica or a random one.
+    %% TODO: Replica may be down, so we may need to retry.
+    ClusterName = get_cluster_name(DB, Shard),
+    case ra_leaderboard:lookup_members(ClusterName) of
+        Servers when is_list(Servers) ->
+            pick_local(Servers);
+        undefined ->
+            %% TODO
+            %% Leader is unknown if there are no servers of this group on the
+            %% local node. We want to pick a replica in that case as well.
+            %% TODO: Dynamic membership.
+            pick_random(get_shard_servers(DB, Shard))
+    end.
+
+pick_local(Servers) ->
+    case lists:dropwhile(fun({_Name, Node}) -> Node =/= node() end, Servers) of
+        [Local | _] ->
+            Local;
+        [] ->
+            pick_random(Servers)
+    end.
+
+pick_random(Servers) ->
+    lists:nth(rand:uniform(length(Servers)), Servers).
+
+get_cluster_name(DB, Shard) ->
+    memoize(fun cluster_name/2, [DB, Shard]).
+
+get_local_server(DB, Shard) ->
+    memoize(fun local_server/2, [DB, Shard]).
+
+get_shard_servers(DB, Shard) ->
+    maps:get(servers, emqx_ds_replication_shard_allocator:shard_meta(DB, Shard)).
+
+%%
+
+init({DB, Shard, Opts}) ->
+    _ = process_flag(trap_exit, true),
+    _Meta = start_shard(DB, Shard, Opts),
+    {ok, {DB, Shard}}.
+
+handle_call(_Call, _From, State) ->
+    {reply, ignored, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+terminate(_Reason, {DB, Shard}) ->
+    LocalServer = get_local_server(DB, Shard),
+    ok = ra:stop_server(DB, LocalServer).
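+%% [Editor's note] Naming scheme, for reference, assuming DB = messages,
+%% Shard = <<"0">> and Site = <<"5BB26F279C3B0A33">>:
+%%   cluster_name/2 -> <<"messages_0">>
+%%   server_name/3  -> 'ds_messages0_5BB26F279C3B0A33'
+%% A Ra server id is {ServerName, Node}, so the same shard runs a distinctly
+%% named server on every site in its replica set.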
+
+%%
+
+start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
+    Site = emqx_ds_replication_layer_meta:this_site(),
+    ClusterName = cluster_name(DB, Shard),
+    LocalServer = local_server(DB, Shard),
+    Servers = shard_servers(DB, Shard),
+    case ra:restart_server(DB, LocalServer) of
+        ok ->
+            Bootstrap = false;
+        {error, name_not_registered} ->
+            Bootstrap = true,
+            ok = ra:start_server(DB, #{
+                id => LocalServer,
+                uid => <<ClusterName/binary, "_", Site/binary>>,
+                cluster_name => ClusterName,
+                initial_members => Servers,
+                machine => {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
+                log_init_args => maps:with(
+                    [
+                        snapshot_interval,
+                        resend_window
+                    ],
+                    ReplicationOpts
+                )
+            })
+    end,
+    case Servers of
+        [LocalServer | _] ->
+            %% TODO
+            %% Not super robust, but we probably don't expect nodes to be down
+            %% when we bring up a fresh consensus group. Triggering election
+            %% is not really required otherwise.
+            %% TODO
+            %% Ensure that doing that on node restart does not disrupt consensus.
+            %% Edit: looks like it doesn't, this could actually be quite useful
+            %% to "steal" leadership from nodes that have too much leader load.
+            %% TODO
+            %% It doesn't really work that way. There's `ra:transfer_leadership/2`
+            %% for that.
+            try
+                ra:trigger_election(LocalServer, _Timeout = 1_000)
+            catch
+                %% TODO
+                %% Tolerating exceptions because server might be occupied with log
+                %% replay for a while.
+                exit:{timeout, _} when not Bootstrap ->
+                    ok
+            end;
+        _ ->
+            ok
+    end,
+    #{
+        cluster_name => ClusterName,
+        servers => Servers,
+        local_server => LocalServer
+    }.
+
+%%
+
+memoize(Fun, Args) ->
+    %% NOTE: Assuming that the function is pure and never returns `undefined`.
+    case persistent_term:get([Fun | Args], undefined) of
+        undefined ->
+            Result = erlang:apply(Fun, Args),
+            _ = persistent_term:put([Fun | Args], Result),
+            Result;
+        Result ->
+            Result
+    end.
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl
new file mode 100644
index 000000000..6da33f09f
--- /dev/null
+++ b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl
@@ -0,0 +1,154 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_replication_shard_allocator).
+
+-export([start_link/2]).
+
+-export([n_shards/1]).
+-export([shard_meta/2]).
+
+-behaviour(gen_server).
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2
+]).
+
+-define(db_meta(DB), {?MODULE, DB}).
+-define(shard_meta(DB, SHARD), {?MODULE, DB, SHARD}).
+
+%%
+
+start_link(DB, Opts) ->
+    gen_server:start_link(?MODULE, {DB, Opts}, []).
+
+n_shards(DB) ->
+    Meta = persistent_term:get(?db_meta(DB)),
+    maps:get(n_shards, Meta).
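+%% [Editor's note] n_shards/1 above and shard_meta/2 below read metadata that
+%% the allocator publishes via persistent_term once allocation succeeds (see
+%% save_db_meta/2 and save_shard_meta/2 further below), so hot paths such as
+%% shard_of_message/3 never touch mnesia. E.g., assuming a `messages` DB with
+%% 4 shards:
+%%   n_shards(messages)            -> 4
+%%   shard_meta(messages, <<"0">>) -> #{servers => [{Name, Node}, ...]}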
+ +shard_meta(DB, Shard) -> + persistent_term:get(?shard_meta(DB, Shard)). + +%% + +-define(ALLOCATE_RETRY_TIMEOUT, 1_000). + +init({DB, Opts}) -> + _ = erlang:process_flag(trap_exit, true), + _ = logger:set_process_metadata(#{db => DB, domain => [ds, db, shard_allocator]}), + State = #{db => DB, opts => Opts, status => allocating}, + case allocate_shards(State) of + {ok, NState} -> + {ok, NState}; + {error, Data} -> + _ = logger:notice( + Data#{ + msg => "Shard allocation still in progress", + retry_in => ?ALLOCATE_RETRY_TIMEOUT + } + ), + {ok, State, ?ALLOCATE_RETRY_TIMEOUT} + end. + +handle_call(_Call, _From, State) -> + {reply, ignored, State}. + +handle_cast(_Cast, State) -> + {noreply, State}. + +handle_info(timeout, State) -> + case allocate_shards(State) of + {ok, NState} -> + {noreply, NState}; + {error, Data} -> + _ = logger:notice( + Data#{ + msg => "Shard allocation still in progress", + retry_in => ?ALLOCATE_RETRY_TIMEOUT + } + ), + {noreply, State, ?ALLOCATE_RETRY_TIMEOUT} + end; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, #{db := DB, shards := Shards}) -> + erase_db_meta(DB), + erase_shards_meta(DB, Shards); +terminate(_Reason, #{}) -> + ok. + +%% + +allocate_shards(State = #{db := DB, opts := Opts}) -> + case emqx_ds_replication_layer_meta:allocate_shards(DB, Opts) of + {ok, Shards} -> + logger:notice(#{msg => "Shards allocated", shards => Shards}), + ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB)), + ok = start_egresses(DB, Shards), + ok = save_db_meta(DB, Shards), + ok = save_shards_meta(DB, Shards), + {ok, State#{shards => Shards, status := ready}}; + {error, Reason} -> + {error, Reason} + end. + +start_shards(DB, Shards) -> + ok = lists:foreach( + fun(Shard) -> + ok = emqx_ds_builtin_db_sup:ensure_shard({DB, Shard}) + end, + Shards + ), + ok = logger:info(#{msg => "Shards started", shards => Shards}), + ok. + +start_egresses(DB, Shards) -> + ok = lists:foreach( + fun(Shard) -> + ok = emqx_ds_builtin_db_sup:ensure_egress({DB, Shard}) + end, + Shards + ), + logger:info(#{msg => "Egresses started", shards => Shards}), + ok. + +save_db_meta(DB, Shards) -> + persistent_term:put(?db_meta(DB), #{ + shards => Shards, + n_shards => length(Shards) + }). + +save_shards_meta(DB, Shards) -> + lists:foreach(fun(Shard) -> save_shard_meta(DB, Shard) end, Shards). + +save_shard_meta(DB, Shard) -> + Servers = emqx_ds_replication_layer_shard:shard_servers(DB, Shard), + persistent_term:put(?shard_meta(DB, Shard), #{ + servers => Servers + }). + +erase_db_meta(DB) -> + persistent_term:erase(?db_meta(DB)). + +erase_shards_meta(DB, Shards) -> + lists:foreach(fun(Shard) -> erase_shard_meta(DB, Shard) end, Shards). + +erase_shard_meta(DB, Shard) -> + persistent_term:erase(?shard_meta(DB, Shard)). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 64984e1d8..594854d21 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -137,6 +137,9 @@ -include("emqx_ds_bitmask.hrl"). +-define(DIM_TOPIC, 1). +-define(DIM_TS, 2). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -endif. 
@@ -160,8 +163,8 @@ create(_ShardId, DBHandle, GenId, Options) -> %% Get options: BitsPerTopicLevel = maps:get(bits_per_wildcard_level, Options, 64), TopicIndexBytes = maps:get(topic_index_bytes, Options, 4), - %% 10 bits -> 1024 ms -> ~1 sec - TSOffsetBits = maps:get(epoch_bits, Options, 10), + %% 20 bits -> 1048576 us -> ~1 sec + TSOffsetBits = maps:get(epoch_bits, Options, 20), %% Create column families: DataCFName = data_cf(GenId), TrieCFName = trie_cf(GenId), @@ -242,16 +245,19 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{}) -> ok. -spec store_batch( - emqx_ds_storage_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts() + emqx_ds_storage_layer:shard_id(), + s(), + [{emqx_ds:time(), emqx_types:message()}], + emqx_ds:message_store_opts() ) -> emqx_ds:store_batch_result(). store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> {ok, Batch} = rocksdb:batch(), lists:foreach( - fun(Msg) -> - {Key, _} = make_key(S, Msg), + fun({Timestamp, Msg}) -> + {Key, _} = make_key(S, Timestamp, Msg), Val = serialize(Msg), - rocksdb:batch_put(Batch, Data, Key, Val) + rocksdb:put(DB, Data, Key, Val, []) end, Messages ), @@ -345,7 +351,7 @@ next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) -> %% Compute safe cutoff time. %% It's the point in time where the last complete epoch ends, so we need to know %% the current time to compute it. - Now = emqx_message:timestamp_now(), + Now = emqx_ds:timestamp_us(), SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, next_until(Schema, It, SafeCutoffTime, BatchSize). @@ -436,9 +442,7 @@ prepare_loop_context(DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Key %% Make filter: Inequations = [ {'=', TopicIndex}, - {StartTime, '..', SafeCutoffTime - 1}, - %% Unique integer: - any + {StartTime, '..', SafeCutoffTime - 1} %% Varying topic levels: | lists:map( fun @@ -483,39 +487,44 @@ next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) -> true = Key1 > Key0, case rocksdb:iterator_move(ITHandle, {seek, Key1}) of {ok, Key, Val} -> - {N, It, Acc} = - traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It0, Acc0, N0), + {N, It, Acc} = traverse_interval( + ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N0 + ), next_loop(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N); {error, invalid_iterator} -> {ok, It0, lists:reverse(Acc0)} end end. -traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It0, Acc0, N) -> +traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N) -> It = It0#{?last_seen_key := Key}, - case emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) of + Timestamp = emqx_ds_bitmask_keymapper:bin_key_to_coord(KeyMapper, Key, ?DIM_TS), + case + emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) andalso + check_timestamp(Cutoff, It, Timestamp) + of true -> Msg = deserialize(Val), - case check_message(Cutoff, It, Msg) of + case check_message(It, Msg) of true -> Acc = [{Key, Msg} | Acc0], - traverse_interval(ITHandle, Filter, Cutoff, It, Acc, N - 1); + traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N - 1); false -> - traverse_interval(ITHandle, Filter, Cutoff, It, Acc0, N); - overflow -> - {0, It0, Acc0} + traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc0, N) end; + overflow -> + {0, It0, Acc0}; false -> {N, It, Acc0} end. 
-traverse_interval(_ITHandle, _Filter, _Cutoff, It, Acc, 0) -> +traverse_interval(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) -> {0, It, Acc}; -traverse_interval(ITHandle, Filter, Cutoff, It, Acc, N) -> +traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N) -> inc_counter(), case rocksdb:iterator_move(ITHandle, next) of {ok, Key, Val} -> - traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It, Acc, N); + traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It, Acc, N); {error, invalid_iterator} -> {0, It, Acc} end. @@ -564,6 +573,7 @@ delete_traverse_interval(LoopContext0) -> storage_iter := It0, current_key := Key, current_val := Val, + keymapper := KeyMapper, filter := Filter, safe_cutoff_time := Cutoff, selector := Selector, @@ -574,10 +584,14 @@ delete_traverse_interval(LoopContext0) -> remaining := Remaining0 } = LoopContext0, It = It0#{?last_seen_key := Key}, - case emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) of + Timestamp = emqx_ds_bitmask_keymapper:bin_key_to_coord(KeyMapper, Key, ?DIM_TS), + case + emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) andalso + check_timestamp(Cutoff, It, Timestamp) + of true -> Msg = deserialize(Val), - case check_message(Cutoff, It, Msg) of + case check_message(It, Msg) of true -> case Selector(Msg) of true -> @@ -590,10 +604,10 @@ delete_traverse_interval(LoopContext0) -> delete_traverse_interval1(LoopContext0#{remaining := Remaining0 - 1}) end; false -> - delete_traverse_interval1(LoopContext0); - overflow -> - {0, It0, AccDel0, AccIter0} + delete_traverse_interval1(LoopContext0) end; + overflow -> + {0, It0, AccDel0, AccIter0}; false -> {Remaining0, It, AccDel0, AccIter0} end. @@ -621,39 +635,28 @@ delete_traverse_interval1(LoopContext0) -> {0, It, AccDel, AccIter} end. --spec check_message(emqx_ds:time(), iterator() | delete_iterator(), emqx_types:message()) -> +-spec check_timestamp(emqx_ds:time(), iterator() | delete_iterator(), emqx_ds:time()) -> true | false | overflow. -check_message( - Cutoff, - _It, - #message{timestamp = Timestamp} -) when Timestamp >= Cutoff -> +check_timestamp(Cutoff, _It, Timestamp) when Timestamp >= Cutoff -> %% We hit the current epoch, we can't continue iterating over it yet. %% It would be unsafe otherwise: messages can be stored in the current epoch %% concurrently with iterating over it. They can end up earlier (in the iteration %% order) due to the nature of keymapping, potentially causing us to miss them. overflow; -check_message( - _Cutoff, - #{?tag := ?IT, ?start_time := StartTime, ?topic_filter := TopicFilter}, - #message{timestamp = Timestamp, topic = Topic} -) when Timestamp >= StartTime -> - emqx_topic:match(emqx_topic:tokens(Topic), TopicFilter); -check_message( - _Cutoff, - #{?tag := ?DELETE_IT, ?start_time := StartTime, ?topic_filter := TopicFilter}, - #message{timestamp = Timestamp, topic = Topic} -) when Timestamp >= StartTime -> - emqx_topic:match(emqx_topic:tokens(Topic), TopicFilter); -check_message(_Cutoff, _It, _Msg) -> - false. +check_timestamp(_Cutoff, #{?start_time := StartTime}, Timestamp) -> + Timestamp >= StartTime. + +-spec check_message(iterator() | delete_iterator(), emqx_types:message()) -> + true | false. +check_message(#{?topic_filter := TopicFilter}, #message{topic = Topic}) -> + emqx_topic:match(emqx_topic:tokens(Topic), TopicFilter). format_key(KeyMapper, Key) -> Vec = [integer_to_list(I, 16) || I <- emqx_ds_bitmask_keymapper:key_to_vector(KeyMapper, Key)], lists:flatten(io_lib:format("~.16B (~s)", [Key, string:join(Vec, ",")])). 
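
Editor's note: the `overflow` contract in `check_timestamp/3` is easiest to see with concrete numbers. A self-contained illustration of the safe-cutoff arithmetic used in `next/4` (this helper is not part of the patch):

    %% With the new default of 20 epoch bits and microsecond timestamps, an
    %% epoch spans 2^20 us (~1.05 s). The safe cutoff is "now" rounded down
    %% to the start of the current epoch; check_timestamp/3 reports
    %% `overflow` at or past it, so an epoch is never iterated while it can
    %% still accept writes.
    safe_cutoff_example() ->
        TSOffsetBits = 20,
        Now = erlang:system_time(microsecond),
        SafeCutoff = (Now bsr TSOffsetBits) bsl TSOffsetBits,
        true = Now - SafeCutoff < (1 bsl TSOffsetBits),
        SafeCutoff.
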
--spec make_key(s(), emqx_types:message()) -> {binary(), [binary()]}. -make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestamp, topic = TopicBin}) -> +-spec make_key(s(), emqx_ds:time(), emqx_types:message()) -> {binary(), [binary()]}. +make_key(#s{keymappers = KeyMappers, trie = Trie}, Timestamp, #message{topic = TopicBin}) -> Tokens = emqx_topic:words(TopicBin), {TopicIndex, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens), VaryingHashes = [hash_topic_level(I) || I <- Varying], @@ -666,11 +669,10 @@ make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestam ]) -> binary(). make_key(KeyMapper, TopicIndex, Timestamp, Varying) -> - UniqueInteger = erlang:unique_integer([monotonic, positive]), emqx_ds_bitmask_keymapper:key_to_bitstring( KeyMapper, emqx_ds_bitmask_keymapper:vector_to_key(KeyMapper, [ - TopicIndex, Timestamp, UniqueInteger | Varying + TopicIndex, Timestamp | Varying ]) ). @@ -723,13 +725,12 @@ deserialize(Blob) -> %% erlfmt-ignore make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) -> Bitsources = - %% Dimension Offset Bitsize - [{1, 0, TopicIndexBytes * ?BYTE_SIZE}, %% Topic index - {2, TSOffsetBits, TSBits - TSOffsetBits }] ++ %% Timestamp epoch - [{3 + I, 0, BitsPerTopicLevel } %% Varying topic levels + %% Dimension Offset Bitsize + [{?DIM_TOPIC, 0, TopicIndexBytes * ?BYTE_SIZE}, %% Topic index + {?DIM_TS, TSOffsetBits, TSBits - TSOffsetBits }] ++ %% Timestamp epoch + [{?DIM_TS + I, 0, BitsPerTopicLevel } %% Varying topic levels || I <- lists:seq(1, N)] ++ - [{2, 0, TSOffsetBits }, %% Timestamp offset - {3, 0, 64 }], %% Unique integer + [{?DIM_TS, 0, TSOffsetBits }], %% Timestamp offset Keymapper = emqx_ds_bitmask_keymapper:make_keymapper(lists:reverse(Bitsources)), %% Assert: case emqx_ds_bitmask_keymapper:bitsize(Keymapper) rem 8 of diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 1ab0df580..69f5b8231 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -29,8 +29,8 @@ update_iterator/3, next/3, delete_next/4, - update_config/2, - add_generation/1, + update_config/3, + add_generation/2, list_generations_with_lifetimes/1, drop_generation/2 ]). @@ -133,7 +133,7 @@ cf_refs := cf_refs(), %% Time at which this was created. Might differ from `since', in particular for the %% first generation. - created_at := emqx_ds:time(), + created_at := emqx_message:timestamp(), %% When should this generation become active? %% This generation should only contain messages timestamped no earlier than that. %% The very first generation will have `since` equal 0. @@ -194,7 +194,12 @@ -callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) -> ok | {error, _Reason}. --callback store_batch(shard_id(), _Data, [emqx_types:message()], emqx_ds:message_store_opts()) -> +-callback store_batch( + shard_id(), + _Data, + [{emqx_ds:time(), emqx_types:message()}], + emqx_ds:message_store_opts() +) -> emqx_ds:store_batch_result(). -callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) -> @@ -219,6 +224,9 @@ %% API for the replication layer %%================================================================================ +%% Note: we specify gen_server requests as records to make use of Dialyzer: +-record(call_add_generation, {since :: emqx_ds:time()}). 
+-record(call_update_config, {options :: emqx_ds:create_db_opts(), since :: emqx_ds:time()}). -record(call_list_generations_with_lifetimes, {}). -record(call_drop_generation, {gen_id :: gen_id()}). @@ -230,7 +238,11 @@ open_shard(Shard, Options) -> drop_shard(Shard) -> ok = rocksdb:destroy(db_dir(Shard), []). --spec store_batch(shard_id(), [emqx_types:message()], emqx_ds:message_store_opts()) -> +-spec store_batch( + shard_id(), + [{emqx_ds:time(), emqx_types:message()}], + emqx_ds:message_store_opts() +) -> emqx_ds:store_batch_result(). store_batch(Shard, Messages, Options) -> %% We always store messages in the current generation: @@ -398,13 +410,16 @@ delete_next( {ok, end_of_stream} end. --spec update_config(shard_id(), emqx_ds:create_db_opts()) -> ok. -update_config(ShardId, Options) -> - gen_server:call(?REF(ShardId), {?FUNCTION_NAME, Options}, infinity). +-spec update_config(shard_id(), emqx_ds:time(), emqx_ds:create_db_opts()) -> + ok | {error, overlaps_existing_generations}. +update_config(ShardId, Since, Options) -> + Call = #call_update_config{since = Since, options = Options}, + gen_server:call(?REF(ShardId), Call, infinity). --spec add_generation(shard_id()) -> ok. -add_generation(ShardId) -> - gen_server:call(?REF(ShardId), add_generation, infinity). +-spec add_generation(shard_id(), emqx_ds:time()) -> + ok | {error, overlaps_existing_generations}. +add_generation(ShardId, Since) -> + gen_server:call(?REF(ShardId), #call_add_generation{since = Since}, infinity). -spec list_generations_with_lifetimes(shard_id()) -> #{ @@ -438,9 +453,6 @@ start_link(Shard = {_, _}, Options) -> shard :: shard() }). -%% Note: we specify gen_server requests as records to make use of Dialyzer: --record(call_create_generation, {since :: emqx_ds:time()}). - -type server_state() :: #s{}. -define(DEFAULT_CF, "default"). @@ -470,18 +482,22 @@ init({ShardId, Options}) -> commit_metadata(S), {ok, S}. -handle_call({update_config, Options}, _From, #s{schema = Schema} = S0) -> - Prototype = maps:get(storage, Options), - S1 = S0#s{schema = Schema#{prototype := Prototype}}, - Since = emqx_message:timestamp_now(), - S = add_generation(S1, Since), - commit_metadata(S), - {reply, ok, S}; -handle_call(add_generation, _From, S0) -> - Since = emqx_message:timestamp_now(), - S = add_generation(S0, Since), - commit_metadata(S), - {reply, ok, S}; +handle_call(#call_update_config{since = Since, options = Options}, _From, S0) -> + case handle_update_config(S0, Since, Options) of + S = #s{} -> + commit_metadata(S), + {reply, ok, S}; + Error = {error, _} -> + {reply, Error, S0} + end; +handle_call(#call_add_generation{since = Since}, _From, S0) -> + case handle_add_generation(S0, Since) of + S = #s{} -> + commit_metadata(S), + {reply, ok, S}; + Error = {error, _} -> + {reply, Error, S0} + end; handle_call(#call_list_generations_with_lifetimes{}, _From, S) -> Generations = handle_list_generations_with_lifetimes(S), {reply, Generations, S}; @@ -489,10 +505,6 @@ handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) -> {Reply, S} = handle_drop_generation(S0, GenId), commit_metadata(S), {reply, Reply, S}; -handle_call(#call_create_generation{since = Since}, _From, S0) -> - S = add_generation(S0, Since), - commit_metadata(S), - {reply, ok, S}; handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. @@ -528,11 +540,10 @@ open_shard(ShardId, DB, CFRefs, ShardSchema) -> ShardSchema ). --spec add_generation(server_state(), emqx_ds:time()) -> server_state(). 
-add_generation(S0, Since) -> +-spec handle_add_generation(server_state(), emqx_ds:time()) -> + server_state() | {error, overlaps_existing_generations}. +handle_add_generation(S0, Since) -> #s{shard_id = ShardId, db = DB, schema = Schema0, shard = Shard0, cf_refs = CFRefs0} = S0, - Schema1 = update_last_until(Schema0, Since), - Shard1 = update_last_until(Shard0, Since), #{current_generation := OldGenId, prototype := {CurrentMod, _ModConf}} = Schema0, OldKey = ?GEN_KEY(OldGenId), @@ -540,39 +551,53 @@ add_generation(S0, Since) -> #{cf_refs := OldCFRefs} = OldGenSchema, #{OldKey := #{module := OldMod, data := OldGenData}} = Shard0, - {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Since), + Schema1 = update_last_until(Schema0, Since), + Shard1 = update_last_until(Shard0, Since), - CFRefs = NewCFRefs ++ CFRefs0, - Key = ?GEN_KEY(GenId), - Generation0 = - #{data := NewGenData0} = - open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)), + case Schema1 of + _Updated = #{} -> + {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Since), + CFRefs = NewCFRefs ++ CFRefs0, + Key = ?GEN_KEY(GenId), + Generation0 = + #{data := NewGenData0} = + open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)), + %% When the new generation's module is the same as the last one, we might want to + %% perform actions like inheriting some of the previous (meta)data. + NewGenData = + run_post_creation_actions( + #{ + shard_id => ShardId, + db => DB, + new_gen_id => GenId, + old_gen_id => OldGenId, + new_cf_refs => NewCFRefs, + old_cf_refs => OldCFRefs, + new_gen_runtime_data => NewGenData0, + old_gen_runtime_data => OldGenData, + new_module => CurrentMod, + old_module => OldMod + } + ), + Generation = Generation0#{data := NewGenData}, + Shard = Shard1#{current_generation := GenId, Key => Generation}, + S0#s{ + cf_refs = CFRefs, + schema = Schema, + shard = Shard + }; + {error, exists} -> + S0; + {error, Reason} -> + {error, Reason} + end. - %% When the new generation's module is the same as the last one, we might want to - %% perform actions like inheriting some of the previous (meta)data. - NewGenData = - run_post_creation_actions( - #{ - shard_id => ShardId, - db => DB, - new_gen_id => GenId, - old_gen_id => OldGenId, - new_cf_refs => NewCFRefs, - old_cf_refs => OldCFRefs, - new_gen_runtime_data => NewGenData0, - old_gen_runtime_data => OldGenData, - new_module => CurrentMod, - old_module => OldMod - } - ), - Generation = Generation0#{data := NewGenData}, - - Shard = Shard1#{current_generation := GenId, Key => Generation}, - S0#s{ - cf_refs = CFRefs, - schema = Schema, - shard = Shard - }. +-spec handle_update_config(server_state(), emqx_ds:time(), emqx_ds:create_db_opts()) -> + server_state() | {error, overlaps_existing_generations}. +handle_update_config(S0 = #s{schema = Schema}, Since, Options) -> + Prototype = maps:get(storage, Options), + S = S0#s{schema = Schema#{prototype := Prototype}}, + handle_add_generation(S, Since). -spec handle_list_generations_with_lifetimes(server_state()) -> #{gen_id() => map()}. 
 handle_list_generations_with_lifetimes(#s{schema = ShardSchema}) ->
@@ -652,7 +677,7 @@ new_generation(ShardId, DB, Schema0, Since) ->
         module => Mod,
         data => GenData,
         cf_refs => NewCFRefs,
-        created_at => emqx_message:timestamp_now(),
+        created_at => erlang:system_time(millisecond),
         since => Since,
         until => undefined
     },
@@ -703,12 +728,19 @@ rocksdb_open(Shard, Options) ->
 db_dir({DB, ShardId}) ->
     filename:join([emqx_ds:base_dir(), atom_to_list(DB), binary_to_list(ShardId)]).
 
--spec update_last_until(Schema, emqx_ds:time()) -> Schema when Schema :: shard_schema() | shard().
-update_last_until(Schema, Until) ->
-    #{current_generation := GenId} = Schema,
-    GenData0 = maps:get(?GEN_KEY(GenId), Schema),
-    GenData = GenData0#{until := Until},
-    Schema#{?GEN_KEY(GenId) := GenData}.
+-spec update_last_until(Schema, emqx_ds:time()) ->
+    Schema | {error, exists | overlaps_existing_generations}
+when
+    Schema :: shard_schema() | shard().
+update_last_until(Schema = #{current_generation := GenId}, Until) ->
+    case maps:get(?GEN_KEY(GenId), Schema) of
+        GenData = #{since := CurrentSince} when CurrentSince < Until ->
+            Schema#{?GEN_KEY(GenId) := GenData#{until := Until}};
+        #{since := Until} ->
+            {error, exists};
+        #{since := CurrentSince} when CurrentSince > Until ->
+            {error, overlaps_existing_generations}
+    end.
 
 run_post_creation_actions(
     #{
diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl
index c9b5bad60..7aa54b9f3 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl
@@ -117,9 +117,8 @@ store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options = #{atomic := tru
     Res;
 store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) ->
     lists:foreach(
-        fun(Msg) ->
-            Id = erlang:unique_integer([monotonic]),
-            Key = <<Id:64>>,
+        fun({Timestamp, Msg}) ->
+            Key = <<Timestamp:64>>,
             Val = term_to_binary(Msg),
             rocksdb:put(DB, CF, Key, Val, [])
         end,
@@ -210,8 +209,8 @@ do_next(_, _, _, _, 0, Key, Acc) ->
     {Key, Acc};
 do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) ->
     case rocksdb:iterator_move(IT, Action) of
-        {ok, Key, Blob} ->
-            Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob),
+        {ok, Key = <<TS:64>>, Blob} ->
+            Msg = #message{topic = Topic} = binary_to_term(Blob),
             TopicWords = emqx_topic:words(Topic),
             case emqx_topic:match(TopicWords, TopicFilter) andalso TS >= StartTime of
                 true ->
diff --git a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src
index 2d9d932c0..191ed9a2d 100644
--- a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src
+++ b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src
@@ -5,7 +5,7 @@
     {vsn, "0.1.12"},
     {modules, []},
     {registered, []},
-    {applications, [kernel, stdlib, rocksdb, gproc, mria, emqx_utils]},
+    {applications, [kernel, stdlib, rocksdb, gproc, mria, ra, emqx_utils]},
     {mod, {emqx_ds_app, []}},
     {env, []}
 ]}.
diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl
index 082ed2dff..64d81307c 100644
--- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl
+++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl
@@ -31,7 +31,9 @@ opts() ->
         backend => builtin,
         storage => {emqx_ds_storage_reference, #{}},
         n_shards => ?N_SHARDS,
-        replication_factor => 3
+        n_sites => 1,
+        replication_factor => 3,
+        replication_options => #{}
     }.
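
Editor's note: with the storage-layer changes above, generation rotation takes an explicit `Since` timestamp and can now be rejected. A sketch of the calling convention (the `rotate_generation/1` wrapper is assumed, not part of this patch; `emqx_ds:timestamp_us/0` is the clock the iterator code above already uses):

    %% Rotate to a new generation starting "now"; the request fails when
    %% `Since` does not fall strictly after the current generation's start.
    rotate_generation(ShardId) ->
        Since = emqx_ds:timestamp_us(),
        case emqx_ds_storage_layer:add_generation(ShardId, Since) of
            ok ->
                ok;
            {error, overlaps_existing_generations} = Error ->
                %% Retry with a later timestamp.
                Error
        end.
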
%% A simple smoke test that verifies that opening/closing the DB @@ -51,13 +53,8 @@ t_00_smoke_open_drop(_Config) -> lists:foreach( fun(Shard) -> ?assertEqual( - {ok, []}, emqx_ds_replication_layer_meta:replica_set(DB, Shard) - ), - ?assertEqual( - [Site], emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) - ), - %% Check that the leader is eleected; - ?assertEqual({ok, node()}, emqx_ds_replication_layer_meta:shard_leader(DB, Shard)) + {ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard) + ) end, Shards ), diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index 9a47b2b5f..636b57b89 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -29,7 +29,9 @@ backend => builtin, storage => {emqx_ds_storage_bitfield_lts, #{}}, n_shards => 1, - replication_factor => 1 + n_sites => 1, + replication_factor => 1, + replication_options => #{} }). -define(COMPACT_CONFIG, #{ @@ -54,7 +56,7 @@ t_store(_Config) -> payload = Payload, timestamp = PublishedAt }, - ?assertMatch(ok, emqx_ds_storage_layer:store_batch(?SHARD, [Msg], #{})). + ?assertMatch(ok, emqx_ds_storage_layer:store_batch(?SHARD, [{PublishedAt, Msg}], #{})). %% Smoke test for iteration through a concrete topic t_iterate(_Config) -> @@ -62,7 +64,7 @@ t_iterate(_Config) -> Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>], Timestamps = lists:seq(1, 10), Batch = [ - make_message(PublishedAt, Topic, integer_to_binary(PublishedAt)) + {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} || Topic <- Topics, PublishedAt <- Timestamps ], ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), @@ -90,7 +92,7 @@ t_delete(_Config) -> Topics = [<<"foo/bar">>, TopicToDelete, <<"a">>], Timestamps = lists:seq(1, 10), Batch = [ - make_message(PublishedAt, Topic, integer_to_binary(PublishedAt)) + {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} || Topic <- Topics, PublishedAt <- Timestamps ], ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), @@ -121,7 +123,7 @@ t_get_streams(_Config) -> Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>], Timestamps = lists:seq(1, 10), Batch = [ - make_message(PublishedAt, Topic, integer_to_binary(PublishedAt)) + {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} || Topic <- Topics, PublishedAt <- Timestamps ], ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), @@ -147,7 +149,7 @@ t_get_streams(_Config) -> NewBatch = [ begin B = integer_to_binary(I), - make_message(100, <<"foo/bar/", B/binary>>, <<"filler", B/binary>>) + {100, make_message(100, <<"foo/bar/", B/binary>>, <<"filler", B/binary>>)} end || I <- lists:seq(1, 200) ], @@ -176,12 +178,8 @@ t_new_generation_inherit_trie(_Config) -> Timestamps = lists:seq(1, 10_000, 100), Batch = [ begin - B = integer_to_binary(I), - make_message( - TS, - <<"wildcard/", B/binary, "/suffix/", Suffix/binary>>, - integer_to_binary(TS) - ) + Topic = emqx_topic:join(["wildcard", integer_to_binary(I), "suffix", Suffix]), + {TS, make_message(TS, Topic, integer_to_binary(TS))} end || I <- lists:seq(1, 200), TS <- Timestamps, @@ -190,7 +188,7 @@ t_new_generation_inherit_trie(_Config) -> ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), %% Now we create a new generation with the same LTS module. It should inherit the %% learned trie. 
- ok = emqx_ds_storage_layer:add_generation(?SHARD), + ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1000), ok end, fun(Trace) -> @@ -205,23 +203,21 @@ t_replay(_Config) -> Topics = [<<"foo/bar">>, <<"foo/bar/baz">>], Timestamps = lists:seq(1, 10_000, 100), Batch1 = [ - make_message(PublishedAt, Topic, integer_to_binary(PublishedAt)) + {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} || Topic <- Topics, PublishedAt <- Timestamps ], ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []), %% Create wildcard topics `wildcard/+/suffix/foo' and `wildcard/+/suffix/bar': Batch2 = [ begin - B = integer_to_binary(I), - make_message( - TS, <<"wildcard/", B/binary, "/suffix/", Suffix/binary>>, integer_to_binary(TS) - ) + Topic = emqx_topic:join(["wildcard", integer_to_list(I), "suffix", Suffix]), + {TS, make_message(TS, Topic, integer_to_binary(TS))} end || I <- lists:seq(1, 200), TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>] ], ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []), %% Check various topic filters: - Messages = Batch1 ++ Batch2, + Messages = [M || {_TS, M} <- Batch1 ++ Batch2], %% Missing topics (no ghost messages): ?assertNot(check(?SHARD, <<"missing/foo/bar">>, 0, Messages)), %% Regular topics: @@ -479,18 +475,6 @@ make_message(PublishedAt, Topic, Payload) when is_binary(Topic) -> payload = Payload }. -store(Shard, PublishedAt, TopicL, Payload) when is_list(TopicL) -> - store(Shard, PublishedAt, list_to_binary(TopicL), Payload); -store(Shard, PublishedAt, Topic, Payload) -> - ID = emqx_guid:gen(), - Msg = #message{ - id = ID, - topic = Topic, - timestamp = PublishedAt, - payload = Payload - }, - emqx_ds_storage_layer:message_store(Shard, [Msg], #{}). - payloads(Messages) -> lists:map( fun(#message{payload = P}) -> diff --git a/apps/emqx_machine/src/emqx_machine_boot.erl b/apps/emqx_machine/src/emqx_machine_boot.erl index a1bd9650e..777ad2959 100644 --- a/apps/emqx_machine/src/emqx_machine_boot.erl +++ b/apps/emqx_machine/src/emqx_machine_boot.erl @@ -31,7 +31,7 @@ -endif. %% These apps are always (re)started by emqx_machine: --define(BASIC_REBOOT_APPS, [gproc, esockd, ranch, cowboy, emqx]). +-define(BASIC_REBOOT_APPS, [gproc, esockd, ranch, cowboy, emqx_durable_storage, emqx]). %% If any of these applications crash, the entire EMQX node shuts down: -define(BASIC_PERMANENT_APPS, [mria, ekka, esockd, emqx]). diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index 791a4de16..fac536532 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -22,7 +22,9 @@ list/1, mqueue/1, map/2, - chain/2 + transpose/1, + chain/2, + repeat/1 ]). %% Evaluating @@ -91,6 +93,31 @@ map(F, S) -> end end. +%% @doc Transpose a list of streams into a stream producing lists of their respective values. +%% The resulting stream is as long as the shortest of the input streams. +-spec transpose([stream(X)]) -> stream([X]). +transpose([S]) -> + map(fun(X) -> [X] end, S); +transpose([S | Streams]) -> + transpose_tail(S, transpose(Streams)); +transpose([]) -> + empty(). + +transpose_tail(S, Tail) -> + fun() -> + case next(S) of + [X | SRest] -> + case next(Tail) of + [Xs | TailRest] -> + [[X | Xs] | transpose_tail(SRest, TailRest)]; + [] -> + [] + end; + [] -> + [] + end + end. + %% @doc Make a stream by chaining (concatenating) two streams. %% The second stream begins to produce values only after the first one is exhausted. 
-spec chain(stream(X), stream(Y)) -> stream(X | Y). @@ -104,6 +131,19 @@ chain(SFirst, SThen) -> end end. +%% @doc Make an infinite stream out of repeats of given stream. +%% If the given stream is empty, the resulting stream is also empty. +-spec repeat(stream(X)) -> stream(X). +repeat(S) -> + fun() -> + case next(S) of + [X | SRest] -> + [X | chain(SRest, repeat(S))]; + [] -> + [] + end + end. + %% %% @doc Produce the next value from the stream. diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl index 814d6fe0d..60b67a4ff 100644 --- a/apps/emqx_utils/test/emqx_utils_stream_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl @@ -74,6 +74,80 @@ chain_list_map_test() -> emqx_utils_stream:consume(S) ). +transpose_test() -> + S = emqx_utils_stream:transpose([ + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:list([4, 5, 6, 7]) + ]), + ?assertEqual( + [[1, 4], [2, 5], [3, 6]], + emqx_utils_stream:consume(S) + ). + +transpose_none_test() -> + ?assertEqual( + [], + emqx_utils_stream:consume(emqx_utils_stream:transpose([])) + ). + +transpose_one_test() -> + S = emqx_utils_stream:transpose([emqx_utils_stream:list([1, 2, 3])]), + ?assertEqual( + [[1], [2], [3]], + emqx_utils_stream:consume(S) + ). + +transpose_many_test() -> + S = emqx_utils_stream:transpose([ + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:list([4, 5, 6, 7]), + emqx_utils_stream:list([8, 9]) + ]), + ?assertEqual( + [[1, 4, 8], [2, 5, 9]], + emqx_utils_stream:consume(S) + ). + +transpose_many_empty_test() -> + S = emqx_utils_stream:transpose([ + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:list([4, 5, 6, 7]), + emqx_utils_stream:empty() + ]), + ?assertEqual( + [], + emqx_utils_stream:consume(S) + ). + +repeat_test() -> + S = emqx_utils_stream:repeat(emqx_utils_stream:list([1, 2, 3])), + ?assertMatch( + {[1, 2, 3, 1, 2, 3, 1, 2], _}, + emqx_utils_stream:consume(8, S) + ), + {_, SRest} = emqx_utils_stream:consume(8, S), + ?assertMatch( + {[3, 1, 2, 3, 1, 2, 3, 1], _}, + emqx_utils_stream:consume(8, SRest) + ). + +repeat_empty_test() -> + S = emqx_utils_stream:repeat(emqx_utils_stream:list([])), + ?assertEqual( + [], + emqx_utils_stream:consume(8, S) + ). + +transpose_repeat_test() -> + S = emqx_utils_stream:transpose([ + emqx_utils_stream:repeat(emqx_utils_stream:list([1, 2])), + emqx_utils_stream:list([4, 5, 6, 7, 8]) + ]), + ?assertEqual( + [[1, 4], [2, 5], [1, 6], [2, 7], [1, 8]], + emqx_utils_stream:consume(S) + ). 
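
Editor's note: taken together, `repeat/1` and `transpose/1` form a round-robin pairing primitive, as the last test demonstrates. A shell-style usage sketch (the site/shard values are made up for illustration):

    %% Pair a finite list with a repeating cycle, round-robin style.
    Shards = emqx_utils_stream:list([<<"0">>, <<"1">>, <<"2">>, <<"3">>]),
    Sites = emqx_utils_stream:repeat(emqx_utils_stream:list([s1, s2])),
    [[s1, <<"0">>], [s2, <<"1">>], [s1, <<"2">>], [s2, <<"3">>]] =
        emqx_utils_stream:consume(emqx_utils_stream:transpose([Sites, Shards])).
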
+
 mqueue_test() ->
     _ = erlang:send_after(1, self(), 1),
     _ = erlang:send_after(100, self(), 2),
diff --git a/mix.exs b/mix.exs
index a2f4de17e..251a1c5f4 100644
--- a/mix.exs
+++ b/mix.exs
@@ -100,7 +100,8 @@ defmodule EMQXUmbrella.MixProject do
       {:rfc3339, github: "emqx/rfc3339", tag: "0.2.3", override: true},
       {:bcrypt, github: "emqx/erlang-bcrypt", tag: "0.6.2", override: true},
       {:uuid, github: "okeuday/uuid", tag: "v2.0.6", override: true},
-      {:quickrand, github: "okeuday/quickrand", tag: "v2.0.6", override: true}
+      {:quickrand, github: "okeuday/quickrand", tag: "v2.0.6", override: true},
+      {:ra, "2.7.3", override: true}
     ] ++
       emqx_apps(profile_info, version) ++
       enterprise_deps(profile_info) ++ jq_dep() ++ quicer_dep()
diff --git a/rebar.config b/rebar.config
index 126ce9532..a38aa85b6 100644
--- a/rebar.config
+++ b/rebar.config
@@ -110,7 +110,8 @@
     {uuid, {git, "https://github.com/okeuday/uuid.git", {tag, "v2.0.6"}}},
     {ssl_verify_fun, "1.1.7"},
     {rfc3339, {git, "https://github.com/emqx/rfc3339.git", {tag, "0.2.3"}}},
-    {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.2"}}}
+    {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.2"}}},
+    {ra, "2.7.3"}
 ]}.
 
 {xref_ignores,
diff --git a/rel/i18n/emqx_ds_schema.hocon b/rel/i18n/emqx_ds_schema.hocon
index 89a276275..11d25ebe4 100644
--- a/rel/i18n/emqx_ds_schema.hocon
+++ b/rel/i18n/emqx_ds_schema.hocon
@@ -30,6 +30,15 @@ builtin_n_shards.desc:
     Please note that it takes effect only during the initialization of the durable storage database.
     Changing this configuration parameter after the database has been already created won't take any effect.~"""
 
+builtin_n_sites.label: "Initial number of sites"
+builtin_n_sites.desc:
+    """~
+    The number of storage sites that share responsibility for the set of storage shards.
+    In this context, sites are essentially EMQX nodes that have message durability enabled.
+    Please note that it takes effect only during the initialization of the durable storage database.
+    At least this many sites must come online during this phase so that shards can be distributed among them; until that happens, message storage is unavailable.
+    Once initialization is complete, sites may go offline; how this affects availability depends on the number of offline sites and the replication factor.~"""
+
 builtin_local_write_buffer.label: "Local write buffer"
 builtin_local_write_buffer.desc:
     """~