Compare commits

...

21 Commits

Author SHA1 Message Date
Andrew Mayorov b94ed120ed
wip: do not store sessions in rocksdb 2024-02-29 21:15:01 +01:00
Andrew Mayorov 19cc7c34a5
wip: disable rocksdb wal 2024-02-29 21:06:08 +01:00
Andrew Mayorov 475efc93b3
wip: allow to tune select ra options 2024-02-29 17:19:38 +01:00
Andrew Mayorov da6844957e
wip: allocate shards predictably 2024-02-27 21:38:42 +01:00
Andrew Mayorov 41f1fd8ebc
feat(utils-stream): add a few more stream combinators 2024-02-27 21:38:42 +01:00
Andrew Mayorov 574746b190
wip: tune egress draining strategy 2024-02-22 18:41:10 +01:00
Andrew Mayorov 1895b250e9
wip: provide persist feedback to broker + alternative egress impl 2024-02-20 20:33:57 +01:00
Andrew Mayorov 83dd6a2896
wip: handle errors gracefully in shard egress process
Also add cooldown on timeout / unavailability.
2024-02-19 19:09:22 +01:00
Andrew Mayorov 77c65266d0
wip: clarify how to perform leadership transfer in runtime 2024-02-19 19:07:59 +01:00
Andrew Mayorov 0e832db1d4
wip: catch and mask badrpcs in the read path 2024-02-15 16:51:52 +01:00
Andrew Mayorov 58bd42bfc1
wip: prefer local replica in read path 2024-02-12 19:24:42 +01:00
Andrew Mayorov 3a6b4b57d7
wip: require majority for replication-related tables 2024-02-12 19:24:42 +01:00
Andrew Mayorov 5ab036a5d2
wip: tolerate trigger election timeouts for existing servers 2024-02-12 19:24:42 +01:00
Andrew Mayorov 114efb9818
fix(boot): add `emqx_durable_storage` to the list of restarted apps 2024-02-12 19:24:42 +01:00
Andrew Mayorov 9d94ee73de
wip: move shard allocation to separate process
That starts shard and egress processes only when shards are fully
allocated.
2024-02-12 19:24:42 +01:00
Andrew Mayorov 1b107d578c
wip: make db + shard part of machine state
It doesn't feel right, but right now it's the easiest way to have it
in the scope of `apply/3`, because `init/1` isn't actually invoked
for ra machines recovered from the existing log / snapshot.
2024-02-12 19:24:42 +01:00
Andrew Mayorov 5e32fe4355
wip: cache shard metadata in persistent terms 2024-02-12 19:24:41 +01:00
Andrew Mayorov 7fa3bbf176
wip: manage generations / db config through ra machine 2024-02-12 19:24:41 +01:00
Andrew Mayorov 46e8118e1c
wip: allocate shards only when predefined number of sites online 2024-02-12 19:24:41 +01:00
Andrew Mayorov fcb5ed346f
wip: reassign timestamp at the time of submission 2024-02-12 19:24:41 +01:00
Andrew Mayorov bb0cf62879
wip: implement raft-based replication 2024-02-12 19:24:41 +01:00
25 changed files with 957 additions and 335 deletions

View File

@ -250,9 +250,11 @@ persist_publish(Msg) ->
case emqx_persistent_message:persist(Msg) of case emqx_persistent_message:persist(Msg) of
ok -> ok ->
[persisted]; [persisted];
{_SkipOrError, _Reason} -> {skipped, _Reason} ->
[];
{error, _Reason} = Error ->
% TODO: log errors? % TODO: log errors?
[] [Error]
end. end.
%% Called internally %% Called internally

View File

@ -745,6 +745,9 @@ ensure_quota(PubRes, Channel = #channel{quota = Limiter}) ->
pubrec_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS; pubrec_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS;
pubrec_reason_code([_ | _]) -> ?RC_SUCCESS. pubrec_reason_code([_ | _]) -> ?RC_SUCCESS.
puback_reason_code(_PacketId, _Msg, [{error, _Reason}]) ->
%% FIXME
undefined;
puback_reason_code(PacketId, Msg, [] = PubRes) -> puback_reason_code(PacketId, Msg, [] = PubRes) ->
emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_NO_MATCHING_SUBSCRIBERS); emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_NO_MATCHING_SUBSCRIBERS);
puback_reason_code(PacketId, Msg, [_ | _] = PubRes) -> puback_reason_code(PacketId, Msg, [_ | _] = PubRes) ->

View File

@ -66,6 +66,7 @@
-export([ -export([
is_expired/2, is_expired/2,
set_timestamp/2,
update_expiry/1, update_expiry/1,
timestamp_now/0 timestamp_now/0
]). ]).
@ -288,6 +289,10 @@ is_expired(#message{timestamp = CreatedAt}, Zone) ->
Interval -> elapsed(CreatedAt) > Interval Interval -> elapsed(CreatedAt) > Interval
end. end.
-spec set_timestamp(integer(), emqx_types:message()) -> emqx_types:message().
set_timestamp(Timestamp, Msg) ->
Msg#message{timestamp = Timestamp}.
-spec update_expiry(emqx_types:message()) -> emqx_types:message(). -spec update_expiry(emqx_types:message()) -> emqx_types:message().
update_expiry( update_expiry(
Msg = #message{ Msg = #message{

View File

@ -61,9 +61,10 @@ force_ds() ->
emqx_config:get([session_persistence, force_persistence]). emqx_config:get([session_persistence, force_persistence]).
storage_backend(#{ storage_backend(#{
builtin := #{ builtin := Backend = #{
enable := true, enable := true,
n_shards := NShards, n_shards := NShards,
n_sites := NSites,
replication_factor := ReplicationFactor replication_factor := ReplicationFactor
} }
}) -> }) ->
@ -71,7 +72,9 @@ storage_backend(#{
backend => builtin, backend => builtin,
storage => {emqx_ds_storage_bitfield_lts, #{}}, storage => {emqx_ds_storage_bitfield_lts, #{}},
n_shards => NShards, n_shards => NShards,
replication_factor => ReplicationFactor n_sites => NSites,
replication_factor => ReplicationFactor,
replication_options => maps:get(replication_options, Backend, #{})
}; };
storage_backend(#{ storage_backend(#{
fdb := #{enable := true} = FDBConfig fdb := #{enable := true} = FDBConfig
@ -97,7 +100,10 @@ needs_persistence(Msg) ->
-spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result(). -spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result().
store_message(Msg) -> store_message(Msg) ->
emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg], #{sync => false}). case emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg], #{}) of
[ok] -> ok;
[error] -> {error, timeout}
end.
has_subscribers(#message{topic = Topic}) -> has_subscribers(#message{topic = Topic}) ->
emqx_persistent_session_ds_router:has_any_route(Topic). emqx_persistent_session_ds_router:has_any_route(Topic).
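
For orientation: with the schema defaults introduced later in this changeset (n_shards = 4, n_sites = 1, replication_options = #{}), storage_backend/1 above would now return roughly the following map. This is an illustrative sketch, not part of the diff; the replication_factor default is not visible in this hunk.

%% Sketch of the builtin backend options assembled by storage_backend/1,
%% using the defaults from the schema changes further below.
#{
    backend => builtin,
    storage => {emqx_ds_storage_bitfield_lts, #{}},
    n_shards => 4,
    n_sites => 1,
    %% Assumed value; the schema default is elided from this diff.
    replication_factor => 3,
    replication_options => #{}
}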

View File

@ -311,7 +311,7 @@ subscribe(
ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID), ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID),
{SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
Subscription = #{ Subscription = #{
start_time => now_ms(), start_time => emqx_ds:timestamp_us(),
props => SubOpts, props => SubOpts,
id => SubId, id => SubId,
deleted => false deleted => false

View File

@ -135,7 +135,7 @@ create_tables() ->
[ [
{rlog_shard, ?DS_MRIA_SHARD}, {rlog_shard, ?DS_MRIA_SHARD},
{type, ordered_set}, {type, ordered_set},
{storage, rocksdb_copies}, {storage, disc_copies},
{record_name, kv}, {record_name, kv},
{attributes, record_info(fields, kv)} {attributes, record_info(fields, kv)}
] ]
@ -516,7 +516,7 @@ create_kv_pmap_table(Table) ->
mria:create_table(Table, [ mria:create_table(Table, [
{type, ordered_set}, {type, ordered_set},
{rlog_shard, ?DS_MRIA_SHARD}, {rlog_shard, ?DS_MRIA_SHARD},
{storage, rocksdb_copies}, {storage, disc_copies},
{record_name, kv}, {record_name, kv},
{attributes, record_info(fields, kv)} {attributes, record_info(fields, kv)}
]). ]).

View File

@ -208,6 +208,7 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
?SLOG(debug, #{ ?SLOG(debug, #{
msg => new_stream, key => Key, stream => Stream msg => new_stream, key => Key, stream => Stream
}), }),
%% TODO: It's unlikely to fail but it's still possible.
{ok, Iterator} = emqx_ds:make_iterator( {ok, Iterator} = emqx_ds:make_iterator(
?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
), ),

View File

@ -1933,7 +1933,16 @@ fields("session_storage_backend_builtin") ->
pos_integer(), pos_integer(),
#{ #{
desc => ?DESC(session_builtin_n_shards), desc => ?DESC(session_builtin_n_shards),
default => 16 %% FIXME
default => 4
}
)},
%% TODO: Minimum number of sites that will be responsible for the shards
{"n_sites",
sc(
pos_integer(),
#{
default => 1
} }
)}, )},
{"replication_factor", {"replication_factor",
@ -1944,6 +1953,14 @@ fields("session_storage_backend_builtin") ->
importance => ?IMPORTANCE_HIDDEN importance => ?IMPORTANCE_HIDDEN
} }
)}, )},
{"replication_options",
sc(
map(name, any()),
#{
default => #{},
importance => ?IMPORTANCE_HIDDEN
}
)},
{"egress_batch_size", {"egress_batch_size",
sc( sc(
pos_integer(), pos_integer(),

View File

@ -33,10 +33,6 @@ all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
%% avoid inter-suite flakiness...
%% TODO: remove after other suites start to use `emx_cth_suite'
application:stop(emqx),
application:stop(emqx_durable_storage),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
@ -390,7 +386,7 @@ t_message_gc(Config) ->
message(<<"foo/bar">>, <<"1">>, 0), message(<<"foo/bar">>, <<"1">>, 0),
message(<<"foo/baz">>, <<"2">>, 1) message(<<"foo/baz">>, <<"2">>, 1)
], ],
ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs0), [ok, ok] = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs0),
?tp(inserted_batch, #{}), ?tp(inserted_batch, #{}),
{ok, _} = ?block_until(#{?snk_kind := ps_message_gc_added_gen}), {ok, _} = ?block_until(#{?snk_kind := ps_message_gc_added_gen}),
@ -399,7 +395,7 @@ t_message_gc(Config) ->
message(<<"foo/bar">>, <<"3">>, Now + 100), message(<<"foo/bar">>, <<"3">>, Now + 100),
message(<<"foo/baz">>, <<"4">>, Now + 101) message(<<"foo/baz">>, <<"4">>, Now + 101)
], ],
ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs1), [ok, ok] = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs1),
{ok, _} = snabbkaffe:block_until( {ok, _} = snabbkaffe:block_until(
?match_n_events(NShards, #{?snk_kind := message_gc_generation_dropped}), ?match_n_events(NShards, #{?snk_kind := message_gc_generation_dropped}),
@ -524,7 +520,8 @@ app_specs(Opts) ->
]. ].
cluster() -> cluster() ->
Spec = #{role => core, apps => app_specs()}, ExtraConf = "\n session_persistence.storage.builtin.n_sites = 2",
Spec = #{role => core, apps => app_specs(#{extra_emqx_conf => ExtraConf})},
[ [
{persistent_messages_SUITE1, Spec}, {persistent_messages_SUITE1, Spec},
{persistent_messages_SUITE2, Spec} {persistent_messages_SUITE2, Spec}

View File

@ -39,7 +39,7 @@
-export([get_streams/3, make_iterator/4, update_iterator/3, next/3]). -export([get_streams/3, make_iterator/4, update_iterator/3, next/3]).
%% Misc. API: %% Misc. API:
-export([]). -export([timestamp_us/0]).
-export_type([ -export_type([
create_db_opts/0, create_db_opts/0,
@ -115,9 +115,11 @@
-type next_result() :: next_result(iterator()). -type next_result() :: next_result(iterator()).
%% Timestamp %% Timestamp
%% Each message must have a unique timestamp.
%% Earliest possible timestamp is 0. %% Earliest possible timestamp is 0.
%% TODO granularity? Currently, we should always use milliseconds, as that's the unit we %% Granularity: microsecond.
%% use in emqx_guid. Otherwise, the iterators won't match the message timestamps. %% TODO: Currently, we should always use milliseconds, as that's the unit we
%% use in emqx_guid. Otherwise, the iterators won't match the message timestamps.
-type time() :: non_neg_integer(). -type time() :: non_neg_integer().
-type message_store_opts() :: -type message_store_opts() ::
@ -318,6 +320,10 @@ next(DB, Iter, BatchSize) ->
%% Internal exports %% Internal exports
%%================================================================================ %%================================================================================
-spec timestamp_us() -> time().
timestamp_us() ->
erlang:system_time(microsecond).
%%================================================================================ %%================================================================================
%% Internal functions %% Internal functions
%%================================================================================ %%================================================================================

View File

@ -22,13 +22,18 @@
%% API: %% API:
-export([start_db/2, start_shard/1, start_egress/1, stop_shard/1, ensure_shard/1]). -export([start_db/2, start_shard/1, start_egress/1, stop_shard/1, ensure_shard/1]).
-export([status/1]).
%% behaviour callbacks: %% behaviour callbacks:
-export([init/1]). -export([init/1]).
-export([handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
%% internal exports: %% internal exports:
-export([start_link_sup/2]). -export([start_link_sup/2]).
%% FIXME
-export([lookup_shard_meta/2]).
%%================================================================================ %%================================================================================
%% Type declarations %% Type declarations
%%================================================================================ %%================================================================================
@ -43,6 +48,8 @@
-record(?shard_sup, {db}). -record(?shard_sup, {db}).
-record(?egress_sup, {db}). -record(?egress_sup, {db}).
-define(shard_meta(DB, SHARD), {?MODULE, DB, SHARD}).
%%================================================================================ %%================================================================================
%% API functions %% API functions
%%================================================================================ %%================================================================================
@ -79,6 +86,13 @@ ensure_shard(Shard) ->
{error, Reason} {error, Reason}
end. end.
status(DB) ->
State = sys:get_state(?via({allocator, DB})),
maps:get(status, State).
lookup_shard_meta(DB, Shard) ->
persistent_term:get(?shard_meta(DB, Shard)).
%%================================================================================ %%================================================================================
%% behaviour callbacks %% behaviour callbacks
%%================================================================================ %%================================================================================
@ -86,44 +100,87 @@ ensure_shard(Shard) ->
init({#?db_sup{db = DB}, DefaultOpts}) -> init({#?db_sup{db = DB}, DefaultOpts}) ->
%% Spec for the top-level supervisor for the database: %% Spec for the top-level supervisor for the database:
logger:notice("Starting DS DB ~p", [DB]), logger:notice("Starting DS DB ~p", [DB]),
_ = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts), Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts),
%% TODO: before the leader election is implemented, we set ourselves as the leader for all shards: ok = start_ra_system(DB, Opts),
MyShards = emqx_ds_replication_layer_meta:my_shards(DB), Children = [
lists:foreach( sup_spec(#?shard_sup{db = DB}, []),
fun(Shard) -> sup_spec(#?egress_sup{db = DB}, []),
emqx_ds_replication_layer:maybe_set_myself_as_leader(DB, Shard) shard_allocator_spec(DB, Opts)
end, ],
MyShards
),
Children = [sup_spec(#?shard_sup{db = DB}, []), sup_spec(#?egress_sup{db = DB}, [])],
SupFlags = #{ SupFlags = #{
strategy => one_for_all, strategy => one_for_all,
intensity => 0, intensity => 0,
period => 1 period => 1
}, },
{ok, {SupFlags, Children}}; {ok, {SupFlags, Children}};
init({#?shard_sup{db = DB}, _}) -> init({#?shard_sup{db = _DB}, _}) ->
%% Spec for the supervisor that manages the worker processes for %% Spec for the supervisor that manages the worker processes for
%% each local shard of the DB: %% each local shard of the DB:
MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
Children = [shard_spec(DB, Shard) || Shard <- MyShards],
SupFlags = #{ SupFlags = #{
strategy => one_for_one, strategy => one_for_one,
intensity => 10, intensity => 10,
period => 1 period => 1
}, },
{ok, {SupFlags, Children}}; {ok, {SupFlags, []}};
init({#?egress_sup{db = DB}, _}) -> init({#?egress_sup{db = _DB}, _}) ->
%% Spec for the supervisor that manages the egress proxy processes %% Spec for the supervisor that manages the egress proxy processes
%% managing traffic towards each of the shards of the DB: %% managing traffic towards each of the shards of the DB:
Shards = emqx_ds_replication_layer_meta:shards(DB),
Children = [egress_spec(DB, Shard) || Shard <- Shards],
SupFlags = #{ SupFlags = #{
strategy => one_for_one, strategy => one_for_one,
intensity => 0, intensity => 0,
period => 1 period => 1
}, },
{ok, {SupFlags, Children}}. {ok, {SupFlags, []}};
init({allocator, DB, Opts}) ->
_ = erlang:process_flag(trap_exit, true),
_ = logger:set_process_metadata(#{db => DB, domain => [ds, db, shard_allocator]}),
init_allocator(DB, Opts).
start_ra_system(DB, #{replication_options := ReplicationOpts}) ->
DataDir = filename:join([emqx:data_dir(), "dsrepl"]),
Config = lists:foldr(fun maps:merge/2, #{}, [
ra_system:default_config(),
#{
name => DB,
data_dir => DataDir,
wal_data_dir => DataDir
},
maps:with(
[
wal_max_size_bytes,
wal_max_batch_size,
wal_write_strategy,
wal_sync_method,
wal_compute_checksums
],
ReplicationOpts
)
]),
case ra_system:start(Config) of
{ok, _System} ->
ok;
{error, {already_started, _System}} ->
ok
end.
start_shards(DB, Shards, Opts) ->
SupRef = ?via(#?shard_sup{db = DB}),
lists:foreach(
fun(Shard) ->
{ok, _} = supervisor:start_child(SupRef, shard_spec(DB, Shard, Opts)),
{ok, _} = supervisor:start_child(SupRef, shard_replication_spec(DB, Shard, Opts))
end,
Shards
).
start_egresses(DB, Shards) ->
SupRef = ?via(#?egress_sup{db = DB}),
lists:foreach(
fun(Shard) ->
{ok, _} = supervisor:start_child(SupRef, egress_spec(DB, Shard))
end,
Shards
).
%%================================================================================ %%================================================================================
%% Internal exports %% Internal exports
@ -145,15 +202,34 @@ sup_spec(Id, Options) ->
}. }.
shard_spec(DB, Shard) -> shard_spec(DB, Shard) ->
Options = emqx_ds_replication_layer_meta:get_options(DB), shard_spec(DB, Shard, emqx_ds_replication_layer_meta:get_options(DB)).
shard_spec(DB, Shard, Options) ->
#{ #{
id => Shard, id => {Shard, storage},
start => {emqx_ds_storage_layer, start_link, [{DB, Shard}, Options]}, start => {emqx_ds_storage_layer, start_link, [{DB, Shard}, Options]},
shutdown => 5_000, shutdown => 5_000,
restart => permanent, restart => permanent,
type => worker type => worker
}. }.
shard_replication_spec(DB, Shard, Opts) ->
#{
id => {Shard, replication},
start => {emqx_ds_replication_layer_shard, start_link, [DB, Shard, Opts]},
restart => transient,
type => worker
}.
shard_allocator_spec(DB, Opts) ->
#{
id => shard_allocator,
start =>
{gen_server, start_link, [?via({allocator, DB}), ?MODULE, {allocator, DB, Opts}, []]},
restart => permanent,
type => worker
}.
egress_spec(DB, Shard) -> egress_spec(DB, Shard) ->
#{ #{
id => Shard, id => Shard,
@ -162,3 +238,79 @@ egress_spec(DB, Shard) ->
restart => permanent, restart => permanent,
type => worker type => worker
}. }.
%% Allocator
-define(ALLOCATE_RETRY_TIMEOUT, 1_000).
init_allocator(DB, Opts) ->
State = #{db => DB, opts => Opts, status => allocating},
case allocate_shards(State) of
NState = #{} ->
{ok, NState};
{error, Data} ->
_ = logger:notice(
Data#{
msg => "Shard allocation still in progress",
retry_in => ?ALLOCATE_RETRY_TIMEOUT
}
),
{ok, State, ?ALLOCATE_RETRY_TIMEOUT}
end.
handle_call(_Call, _From, State) ->
{reply, ignored, State}.
handle_cast(_Cast, State) ->
{noreply, State}.
handle_info(timeout, State) ->
case allocate_shards(State) of
NState = #{} ->
{noreply, NState};
{error, Data} ->
_ = logger:notice(
Data#{
msg => "Shard allocation still in progress",
retry_in => ?ALLOCATE_RETRY_TIMEOUT
}
),
{noreply, State, ?ALLOCATE_RETRY_TIMEOUT}
end.
terminate(_Reason, #{db := DB, shards := Shards}) ->
%% FIXME
erase_shards_meta(DB, Shards).
%%
allocate_shards(State = #{db := DB, opts := Opts}) ->
case emqx_ds_replication_layer_meta:allocate_shards(DB, Opts) of
{ok, Shards} ->
logger:notice(#{msg => "Shards allocated", shards => Shards}),
ok = save_shards_meta(DB, Shards),
ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB), Opts),
logger:notice(#{
msg => "Shards started", shards => emqx_ds_replication_layer_meta:my_shards(DB)
}),
ok = start_egresses(DB, Shards),
logger:notice(#{msg => "Egresses started", shards => Shards}),
State#{shards => Shards, status := ready};
{error, Reason} ->
{error, Reason}
end.
save_shards_meta(DB, Shards) ->
lists:foreach(fun(Shard) -> save_shard_meta(DB, Shard) end, Shards).
save_shard_meta(DB, Shard) ->
Servers = emqx_ds_replication_layer_shard:shard_servers(DB, Shard),
persistent_term:put(?shard_meta(DB, Shard), #{
servers => Servers
}).
erase_shards_meta(DB, Shards) ->
lists:foreach(fun(Shard) -> erase_shard_meta(DB, Shard) end, Shards).
erase_shard_meta(DB, Shard) ->
persistent_term:erase(?shard_meta(DB, Shard)).

View File

@ -33,9 +33,7 @@
make_iterator/4, make_iterator/4,
update_iterator/3, update_iterator/3,
next/3, next/3,
node_of_shard/2, shard_of_message/3
shard_of_message/3,
maybe_set_myself_as_leader/2
]). ]).
%% internal exports: %% internal exports:
@ -50,7 +48,15 @@
do_next_v1/4, do_next_v1/4,
do_add_generation_v2/1, do_add_generation_v2/1,
do_list_generations_with_lifetimes_v3/2, do_list_generations_with_lifetimes_v3/2,
do_drop_generation_v3/3 do_drop_generation_v3/3,
%% FIXME
ra_store_batch/3
]).
-export([
init/1,
apply/3
]). ]).
-export_type([ -export_type([
@ -71,7 +77,9 @@
backend := builtin, backend := builtin,
storage := emqx_ds_storage_layer:prototype(), storage := emqx_ds_storage_layer:prototype(),
n_shards => pos_integer(), n_shards => pos_integer(),
replication_factor => pos_integer() n_sites => pos_integer(),
replication_factor => pos_integer(),
replication_options => _TODO :: #{}
}. }.
%% This enapsulates the stream entity from the replication level. %% This enapsulates the stream entity from the replication level.
@ -126,13 +134,19 @@ open_db(DB, CreateOpts) ->
-spec add_generation(emqx_ds:db()) -> ok | {error, _}. -spec add_generation(emqx_ds:db()) -> ok | {error, _}.
add_generation(DB) -> add_generation(DB) ->
Nodes = emqx_ds_replication_layer_meta:leader_nodes(DB), foreach_shard(
_ = emqx_ds_proto_v4:add_generation(Nodes, DB), DB,
ok. fun(Shard) -> ok = ra_add_generation(DB, Shard) end
).
-spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}. -spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
update_db_config(DB, CreateOpts) -> update_db_config(DB, CreateOpts) ->
emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts). ok = emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts),
Opts = emqx_ds_replication_layer_meta:get_options(DB),
foreach_shard(
DB,
fun(Shard) -> ok = ra_update_config(DB, Shard, Opts) end
).
-spec list_generations_with_lifetimes(emqx_ds:db()) -> -spec list_generations_with_lifetimes(emqx_ds:db()) ->
#{generation_rank() => emqx_ds:generation_info()}. #{generation_rank() => emqx_ds:generation_info()}.
@ -140,13 +154,12 @@ list_generations_with_lifetimes(DB) ->
Shards = list_shards(DB), Shards = list_shards(DB),
lists:foldl( lists:foldl(
fun(Shard, GensAcc) -> fun(Shard, GensAcc) ->
Node = node_of_shard(DB, Shard),
maps:fold( maps:fold(
fun(GenId, Data, AccInner) -> fun(GenId, Data, AccInner) ->
AccInner#{{Shard, GenId} => Data} AccInner#{{Shard, GenId} => Data}
end, end,
GensAcc, GensAcc,
emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard) ra_list_generations_with_lifetimes(DB, Shard)
) )
end, end,
#{}, #{},
@ -155,10 +168,7 @@ list_generations_with_lifetimes(DB) ->
-spec drop_generation(emqx_ds:db(), generation_rank()) -> ok | {error, _}. -spec drop_generation(emqx_ds:db(), generation_rank()) -> ok | {error, _}.
drop_generation(DB, {Shard, GenId}) -> drop_generation(DB, {Shard, GenId}) ->
%% TODO: drop generation in all nodes in the replica set, not only in the leader, ra_drop_generation(DB, Shard, GenId).
%% after we have proper replication in place.
Node = node_of_shard(DB, Shard),
emqx_ds_proto_v4:drop_generation(Node, DB, Shard, GenId).
-spec drop_db(emqx_ds:db()) -> ok | {error, _}. -spec drop_db(emqx_ds:db()) -> ok | {error, _}.
drop_db(DB) -> drop_db(DB) ->
@ -179,8 +189,7 @@ get_streams(DB, TopicFilter, StartTime) ->
Shards = list_shards(DB), Shards = list_shards(DB),
lists:flatmap( lists:flatmap(
fun(Shard) -> fun(Shard) ->
Node = node_of_shard(DB, Shard), Streams = ra_get_streams(DB, Shard, TopicFilter, StartTime),
Streams = emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, StartTime),
lists:map( lists:map(
fun({RankY, StorageLayerStream}) -> fun({RankY, StorageLayerStream}) ->
RankX = Shard, RankX = Shard,
@ -197,8 +206,7 @@ get_streams(DB, TopicFilter, StartTime) ->
emqx_ds:make_iterator_result(iterator()). emqx_ds:make_iterator_result(iterator()).
make_iterator(DB, Stream, TopicFilter, StartTime) -> make_iterator(DB, Stream, TopicFilter, StartTime) ->
?stream_v2(Shard, StorageStream) = Stream, ?stream_v2(Shard, StorageStream) = Stream,
Node = node_of_shard(DB, Shard), case ra_make_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of
case emqx_ds_proto_v4:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of
{ok, Iter} -> {ok, Iter} ->
{ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}}; {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
Err = {error, _} -> Err = {error, _} ->
@ -213,16 +221,7 @@ make_iterator(DB, Stream, TopicFilter, StartTime) ->
emqx_ds:make_iterator_result(iterator()). emqx_ds:make_iterator_result(iterator()).
update_iterator(DB, OldIter, DSKey) -> update_iterator(DB, OldIter, DSKey) ->
#{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter, #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter,
Node = node_of_shard(DB, Shard), case ra_update_iterator(DB, Shard, StorageIter, DSKey) of
case
emqx_ds_proto_v4:update_iterator(
Node,
DB,
Shard,
StorageIter,
DSKey
)
of
{ok, Iter} -> {ok, Iter} ->
{ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}}; {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
Err = {error, _} -> Err = {error, _} ->
@ -232,7 +231,6 @@ update_iterator(DB, OldIter, DSKey) ->
-spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). -spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
next(DB, Iter0, BatchSize) -> next(DB, Iter0, BatchSize) ->
#{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0} = Iter0, #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0} = Iter0,
Node = node_of_shard(DB, Shard),
%% TODO: iterator can contain information that is useful for %% TODO: iterator can contain information that is useful for
%% reconstructing messages sent over the network. For example, %% reconstructing messages sent over the network. For example,
%% when we send messages with the learned topic index, we could %% when we send messages with the learned topic index, we could
@ -241,7 +239,7 @@ next(DB, Iter0, BatchSize) ->
%% %%
%% This kind of trickery should be probably done here in the %% This kind of trickery should be probably done here in the
%% replication layer. Or, perhaps, in the logic layer. %% replication layer. Or, perhaps, in the logic layer.
case emqx_ds_proto_v4:next(Node, DB, Shard, StorageIter0, BatchSize) of case ra_next(DB, Shard, StorageIter0, BatchSize) of
{ok, StorageIter, Batch} -> {ok, StorageIter, Batch} ->
Iter = Iter0#{?enc := StorageIter}, Iter = Iter0#{?enc := StorageIter},
{ok, Iter, Batch}; {ok, Iter, Batch};
@ -249,17 +247,6 @@ next(DB, Iter0, BatchSize) ->
Other Other
end. end.
-spec node_of_shard(emqx_ds:db(), shard_id()) -> node().
node_of_shard(DB, Shard) ->
case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of
{ok, Leader} ->
Leader;
{error, no_leader_for_shard} ->
%% TODO: use optvar
timer:sleep(500),
node_of_shard(DB, Shard)
end.
-spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) -> -spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) ->
emqx_ds_replication_layer:shard_id(). emqx_ds_replication_layer:shard_id().
shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) -> shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) ->
@ -271,18 +258,8 @@ shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) ->
end, end,
integer_to_binary(Hash). integer_to_binary(Hash).
%% TODO: there's no real leader election right now foreach_shard(DB, Fun) ->
-spec maybe_set_myself_as_leader(emqx_ds:db(), shard_id()) -> ok. lists:foreach(Fun, list_shards(DB)).
maybe_set_myself_as_leader(DB, Shard) ->
Site = emqx_ds_replication_layer_meta:this_site(),
case emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) of
[Site | _] ->
%% Currently the first in-sync replica always becomes the
%% leader
ok = emqx_ds_replication_layer_meta:set_leader(DB, Shard, node());
_Sites ->
ok
end.
%%================================================================================ %%================================================================================
%% behavior callbacks %% behavior callbacks
@ -298,7 +275,8 @@ do_drop_db_v1(DB) ->
emqx_ds_builtin_sup:stop_db(DB), emqx_ds_builtin_sup:stop_db(DB),
lists:foreach( lists:foreach(
fun(Shard) -> fun(Shard) ->
emqx_ds_storage_layer:drop_shard({DB, Shard}) emqx_ds_storage_layer:drop_shard({DB, Shard}),
ra_drop_shard(DB, Shard)
end, end,
MyShards MyShards
). ).
@ -376,7 +354,8 @@ do_next_v1(DB, Shard, Iter, BatchSize) ->
-spec do_add_generation_v2(emqx_ds:db()) -> ok | {error, _}. -spec do_add_generation_v2(emqx_ds:db()) -> ok | {error, _}.
do_add_generation_v2(DB) -> do_add_generation_v2(DB) ->
MyShards = emqx_ds_replication_layer_meta:my_owned_shards(DB), %% FIXME
MyShards = [],
lists:foreach( lists:foreach(
fun(ShardId) -> fun(ShardId) ->
emqx_ds_storage_layer:add_generation({DB, ShardId}) emqx_ds_storage_layer:add_generation({DB, ShardId})
@ -400,3 +379,194 @@ do_drop_generation_v3(DB, ShardId, GenId) ->
list_nodes() -> list_nodes() ->
mria:running_nodes(). mria:running_nodes().
%%
%% TODO
%% Too large for normal operation, need better backpressure mechanism.
-define(RA_TIMEOUT, 60 * 1000).
-define(SAFERPC(SERVER, EXPR), make_safe_rpc(SERVER, fun() -> EXPR end)).
-define(SAFERPC(SERVER, EXPR, RET), make_safe_rpc(SERVER, fun() -> EXPR end, RET)).
-define(GENRPC(SERVER, EXPR), make_gen_rpc(SERVER, fun() -> EXPR end)).
ra_store_batch(DB, Shard, Messages) ->
Command = #{
?tag => ?BATCH,
?batch_messages => Messages
},
Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
{ok, Result, _Leader} ->
Result;
Error ->
Error
end.
ra_add_generation(DB, Shard) ->
Command = #{?tag => add_generation},
Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
{ok, Result, _Leader} ->
Result;
Error ->
error(Error, [DB, Shard])
end.
ra_update_config(DB, Shard, Opts) ->
Command = #{?tag => update_config, ?config => Opts},
Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
{ok, Result, _Leader} ->
Result;
Error ->
error(Error, [DB, Shard])
end.
ra_drop_generation(DB, Shard, GenId) ->
Command = #{?tag => drop_generation, ?generation => GenId},
Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
{ok, Result, _Leader} ->
Result;
Error ->
error(Error, [DB, Shard])
end.
ra_get_streams(DB, Shard, TopicFilter, Time) ->
Server = {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
?SAFERPC(
Server,
emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, Time),
[]
).
ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
Server = {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
?SAFERPC(
Server,
emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime)
).
ra_update_iterator(DB, Shard, Iter, DSKey) ->
Server = {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
?SAFERPC(Server, emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey)).
ra_next(DB, Shard, Iter, BatchSize) ->
Server = {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
?GENRPC(Server, emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize)).
ra_list_generations_with_lifetimes(DB, Shard) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard).
ra_drop_shard(DB, Shard) ->
LocalServer = emqx_ds_replication_layer_shard:server(DB, Shard, local),
ra:force_delete_server(_System = default, LocalServer).
make_safe_rpc(Server, Fun) ->
try
Fun()
catch
C:Reason when C == error orelse C == exit ->
_ = logger:warning(#{
msg => "RPC failed",
server => Server,
exception => C,
context => Reason
}),
{error, Reason}
end.
make_safe_rpc(Server, Fun, Ret) ->
try
Fun()
catch
C:Reason when C == error orelse C == exit ->
_ = logger:warning(#{
msg => "RPC failed",
server => Server,
exception => C,
context => Reason
}),
Ret
end.
make_gen_rpc(Server, Fun) ->
case Fun() of
{badrpc, Reason} ->
_ = logger:warning(#{
msg => "RPC failed",
server => Server,
context => Reason
}),
{error, Reason};
Ret ->
Ret
end.
%%
init(#{db := DB, shard := Shard}) ->
#{db_shard => {DB, Shard}, latest => 0}.
apply(
#{index := RaftIdx},
#{
?tag := ?BATCH,
?batch_messages := MessagesIn
},
#{db_shard := DBShard, latest := Latest} = State
) ->
%% NOTE
%% Unique timestamp tracking real time closely.
%% With microsecond granularity it should be nearly impossible for it to run
%% too far ahead of the real time clock.
{NLatest, Messages} = assign_timestamps(Latest, MessagesIn),
%% TODO
%% Batch is now reversed, but it should not make a lot of difference.
%% Even if it were in order, it's still possible to write messages far away
%% in the past, i.e. when a replica catches up with the leader. The storage layer
%% currently relies on wall clock time to decide if it's safe to iterate over the
%% next epoch, which is likely wrong. Ideally it should rely on consensus clock
%% time instead.
Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}),
NState = State#{latest := NLatest},
%% TODO: Need to measure effects of changing frequency of `release_cursor`.
Effect = {release_cursor, RaftIdx, NState},
{NState, Result, Effect};
apply(
_RaftMeta,
#{?tag := add_generation},
#{db_shard := DBShard} = State
) ->
Result = emqx_ds_storage_layer:add_generation(DBShard),
{State, Result};
apply(
_RaftMeta,
#{?tag := update_config, ?config := Opts},
#{db_shard := DBShard} = State
) ->
Result = emqx_ds_storage_layer:update_config(DBShard, Opts),
{State, Result};
apply(
_RaftMeta,
#{?tag := drop_generation, ?generation := GenId},
#{db_shard := DBShard} = State
) ->
Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId),
{State, Result}.
assign_timestamps(Latest, Messages) ->
assign_timestamps(Latest, Messages, []).
assign_timestamps(Latest, [MessageIn | Rest], Acc) ->
case emqx_message:timestamp(MessageIn) of
Later when Later > Latest ->
assign_timestamps(Later, Rest, [MessageIn | Acc]);
_Earlier ->
Message = emqx_message:set_timestamp(Latest + 1, MessageIn),
assign_timestamps(Latest + 1, Rest, [Message | Acc])
end;
assign_timestamps(Latest, [], Acc) ->
{Latest, Acc}.
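
As an illustration of the timestamp handling above, here is a small standalone sketch (hypothetical module, bare integers instead of #message{} records) of the same bumping logic. It shows that non-increasing timestamps are advanced to Latest + 1 and that the batch is returned in reverse arrival order.

-module(assign_ts_sketch).
-export([demo/0]).

%% Mirrors assign_timestamps/3 above, but over plain integers.
assign([T | Rest], Latest, Acc) when T > Latest ->
    assign(Rest, T, [T | Acc]);
assign([_T | Rest], Latest, Acc) ->
    assign(Rest, Latest + 1, [Latest + 1 | Acc]);
assign([], Latest, Acc) ->
    {Latest, Acc}.

demo() ->
    %% Starting from Latest = 100, arrival timestamps [105, 90, 105] become
    %% [105, 106, 107], and the result list is in reverse arrival order.
    {107, [107, 106, 105]} = assign([105, 90, 105], 100, []),
    ok.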

View File

@ -28,6 +28,15 @@
-define(tag, 1). -define(tag, 1).
-define(shard, 2). -define(shard, 2).
-define(enc, 3). -define(enc, 3).
%% ?BATCH
-define(batch_messages, 2). -define(batch_messages, 2).
-define(timestamp, 3).
%% update_config
-define(config, 2).
%% drop_generation
-define(generation, 2).
-endif. -endif.

View File

@ -27,6 +27,8 @@
%% servers, if needed. %% servers, if needed.
-module(emqx_ds_replication_layer_egress). -module(emqx_ds_replication_layer_egress).
-include_lib("emqx_utils/include/emqx_message.hrl").
-behaviour(gen_server). -behaviour(gen_server).
%% API: %% API:
@ -40,17 +42,19 @@
-export_type([]). -export_type([]).
-include("emqx_ds_replication_layer.hrl").
-include_lib("snabbkaffe/include/trace.hrl"). -include_lib("snabbkaffe/include/trace.hrl").
%%================================================================================ %%================================================================================
%% Type declarations %% Type declarations
%%================================================================================ %%================================================================================
-define(via(DB, Shard), {via, gproc, {n, l, {?MODULE, DB, Shard}}}). -define(ACCUM_TIMEOUT, 4).
-define(flush, flush). -define(DRAIN_TIMEOUT, 1).
-define(COOLDOWN_TIMEOUT_MIN, 1000).
-define(COOLDOWN_TIMEOUT_MAX, 5000).
-record(enqueue_req, {message :: emqx_types:message(), sync :: boolean()}). -define(name(DB, Shard), {n, l, {?MODULE, DB, Shard}}).
-define(via(DB, Shard), {via, gproc, ?name(DB, Shard)}).
%%================================================================================ %%================================================================================
%% API functions %% API functions
@ -61,15 +65,27 @@ start_link(DB, Shard) ->
gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []). gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []).
-spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> -spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
ok. [ok | error].
store_batch(DB, Messages, Opts) -> store_batch(DB, Messages, _Opts) ->
Sync = maps:get(sync, Opts, true), Pid = self(),
lists:foreach( Refs = lists:map(
fun(Message) -> fun(MessageIn) ->
Ref = erlang:make_ref(),
Message = emqx_message:set_timestamp(emqx_ds:timestamp_us(), MessageIn),
Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
gen_server:call(?via(DB, Shard), #enqueue_req{message = Message, sync = Sync}) _ = gproc:send(?name(DB, Shard), {Pid, Ref, Message}),
Ref
end, end,
Messages Messages
),
%% FIXME
lists:map(
fun(Ref) ->
receive
{Ref, Result} -> Result
end
end,
Refs
). ).
%%================================================================================ %%================================================================================
@ -78,37 +94,30 @@ store_batch(DB, Messages, Opts) ->
-record(s, { -record(s, {
db :: emqx_ds:db(), db :: emqx_ds:db(),
shard :: emqx_ds_replication_layer:shard_id(), shard :: emqx_ds_replication_layer:shard_id()
leader :: node(),
n = 0 :: non_neg_integer(),
tref :: reference(),
batch = [] :: [emqx_types:message()],
pending_replies = [] :: [gen_server:from()]
}). }).
init([DB, Shard]) -> init([DB, Shard]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
process_flag(message_queue_data, off_heap), process_flag(message_queue_data, off_heap),
%% TODO: adjust leader dynamically
{ok, Leader} = emqx_ds_replication_layer_meta:shard_leader(DB, Shard),
S = #s{ S = #s{
db = DB, db = DB,
shard = Shard, shard = Shard
leader = Leader,
tref = start_timer()
}, },
{ok, S}. {ok, S}.
handle_call(#enqueue_req{message = Msg, sync = Sync}, From, S) ->
do_enqueue(From, Sync, Msg, S);
handle_call(_Call, _From, S) -> handle_call(_Call, _From, S) ->
{reply, {error, unknown_call}, S}. {reply, {error, unknown_call}, S}.
handle_cast(_Cast, S) -> handle_cast(_Cast, S) ->
{noreply, S}. {noreply, S}.
handle_info(?flush, S) -> handle_info(Req = {_Pid, _Ref, #message{}}, S) ->
{noreply, do_flush(S)}; ok = timer:sleep(?ACCUM_TIMEOUT),
Batch = [Req | drain_requests(1, max_batch_size())],
_ = flush(Batch, S),
true = erlang:garbage_collect(),
{noreply, S};
handle_info(_Info, S) -> handle_info(_Info, S) ->
{noreply, S}. {noreply, S}.
@ -123,53 +132,47 @@ terminate(_Reason, _S) ->
%% Internal functions %% Internal functions
%%================================================================================ %%================================================================================
do_flush(S = #s{batch = []}) -> drain_requests(M, M) ->
S#s{tref = start_timer()}; [M];
do_flush( drain_requests(N, M) ->
S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard, leader = Leader} receive
) -> Req = {_Pid, _Ref, #message{}} ->
Batch = #{?tag => ?BATCH, ?batch_messages => lists:reverse(Messages)}, [Req | drain_requests(N + 1, M)]
ok = emqx_ds_proto_v2:store_batch(Leader, DB, Shard, Batch, #{}), after ?DRAIN_TIMEOUT ->
[gen_server:reply(From, ok) || From <- lists:reverse(Replies)], [N]
?tp(emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard}), end.
erlang:garbage_collect(),
S#s{
n = 0,
batch = [],
pending_replies = [],
tref = start_timer()
}.
do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) -> flush(Batch, #s{db = DB, shard = Shard}) ->
NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000), %% FIXME
S1 = S0#s{n = N + 1, batch = [Msg | Batch]}, Messages = [Message || {_, _, Message} <- Batch],
S2 = case emqx_ds_replication_layer:ra_store_batch(DB, Shard, Messages) of
case N >= NMax of ok ->
true -> Size = reply(ok, Batch),
_ = erlang:cancel_timer(S0#s.tref), ?tp(
do_flush(S1); emqx_ds_replication_layer_egress_flush,
false -> #{db => DB, shard => Shard, size => Size}
S1 ),
end, ok;
%% TODO: later we may want to delay the reply until the message is {error, Reason} ->
%% replicated, but it requies changes to the PUBACK/PUBREC flow to Size = reply(error, Batch),
%% allow for async replies. For now, we ack when the message is ?tp(
%% _buffered_ rather than stored. warning,
%% emqx_ds_replication_layer_egress_flush_failed,
%% Otherwise, the client would freeze for at least flush interval, #{db => DB, shard => Shard, size => Size, reason => Reason}
%% or until the buffer is filled. ),
S = ok = cooldown(),
case Sync of {error, Reason}
true -> end.
S2#s{pending_replies = [From | Replies]};
false ->
gen_server:reply(From, ok),
S2
end,
%% TODO: add a backpressure mechanism for the server to avoid
%% building a long message queue.
{noreply, S}.
start_timer() -> reply(Result, [{Pid, Ref, _Message} | Rest]) ->
Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), erlang:send(Pid, {Ref, Result}),
erlang:send_after(Interval, self(), ?flush). reply(Result, Rest);
reply(_Result, [Size]) when is_integer(Size) ->
Size.
cooldown() ->
Timeout = ?COOLDOWN_TIMEOUT_MIN + rand:uniform(?COOLDOWN_TIMEOUT_MAX - ?COOLDOWN_TIMEOUT_MIN),
timer:sleep(Timeout).
max_batch_size() ->
max(1, application:get_env(emqx_durable_storage, egress_batch_size, 1000)).
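
The store_batch/3 rewrite above replaces the gen_server call with a ref-tagged request/reply exchange between the caller and the per-shard egress process. A minimal standalone sketch of that pattern (hypothetical module, no gproc or sharding involved):

-module(egress_pattern_sketch).
-export([demo/0]).

demo() ->
    Caller = self(),
    Server = spawn(fun() -> server_loop() end),
    %% Tag each message with a unique ref and send it to the server.
    Refs = [begin
                Ref = erlang:make_ref(),
                Server ! {Caller, Ref, Msg},
                Ref
            end || Msg <- [a, b, c]],
    %% Collect exactly one {Ref, Result} reply per message.
    [ok, ok, ok] = [receive {Ref, Result} -> Result end || Ref <- Refs],
    ok.

server_loop() ->
    receive
        {Pid, Ref, _Msg} ->
            Pid ! {Ref, ok},
            server_loop()
    after 1000 ->
        ok
    end.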

View File

@ -29,19 +29,16 @@
-export([ -export([
shards/1, shards/1,
my_shards/1, my_shards/1,
my_owned_shards/1, allocate_shards/2,
leader_nodes/1,
replica_set/2, replica_set/2,
in_sync_replicas/2, in_sync_replicas/2,
sites/0, sites/0,
node/1,
open_db/2, open_db/2,
get_options/1, get_options/1,
update_db_config/2, update_db_config/2,
drop_db/1, drop_db/1,
shard_leader/2,
this_site/0, this_site/0,
set_leader/3,
is_leader/1,
print_status/0 print_status/0
]). ]).
@ -51,12 +48,11 @@
%% internal exports: %% internal exports:
-export([ -export([
open_db_trans/2, open_db_trans/2,
allocate_shards_trans/2,
update_db_config_trans/2, update_db_config_trans/2,
drop_db_trans/1, drop_db_trans/1,
claim_site/2, claim_site/2,
in_sync_replicas_trans/2, in_sync_replicas_trans/2,
set_leader_trans/3,
is_leader_trans/1,
n_shards/1 n_shards/1
]). ]).
@ -97,7 +93,6 @@
replica_set :: [site()], replica_set :: [site()],
%% Sites that contain the actual data: %% Sites that contain the actual data:
in_sync_replicas :: [site()], in_sync_replicas :: [site()],
leader :: node() | undefined,
misc = #{} :: map() misc = #{} :: map()
}). }).
@ -113,7 +108,7 @@
-spec print_status() -> ok. -spec print_status() -> ok.
print_status() -> print_status() ->
io:format("THIS SITE:~n~s~n", [base64:encode(this_site())]), io:format("THIS SITE:~n~s~n", [this_site()]),
io:format("~nSITES:~n", []), io:format("~nSITES:~n", []),
Nodes = [node() | nodes()], Nodes = [node() | nodes()],
lists:foreach( lists:foreach(
@ -123,28 +118,18 @@ print_status() ->
true -> up; true -> up;
false -> down false -> down
end, end,
io:format("~s ~p ~p~n", [base64:encode(Site), Node, Status]) io:format("~s ~p ~p~n", [Site, Node, Status])
end, end,
eval_qlc(mnesia:table(?NODE_TAB)) eval_qlc(mnesia:table(?NODE_TAB))
), ),
io:format( io:format(
"~nSHARDS:~nId Leader Status~n", [] "~nSHARDS:~nId Replicas~n", []
), ),
lists:foreach( lists:foreach(
fun(#?SHARD_TAB{shard = {DB, Shard}, leader = Leader}) -> fun(#?SHARD_TAB{shard = {DB, Shard}, replica_set = RS}) ->
ShardStr = string:pad(io_lib:format("~p/~s", [DB, Shard]), 30), ShardStr = string:pad(io_lib:format("~p/~s", [DB, Shard]), 30),
LeaderStr = string:pad(atom_to_list(Leader), 33), ReplicasStr = string:pad(io_lib:format("~p", [RS]), 40),
Status = io:format("~s ~s~n", [ShardStr, ReplicasStr])
case lists:member(Leader, Nodes) of
true ->
case node() of
Leader -> "up *";
_ -> "up"
end;
false ->
"down"
end,
io:format("~s ~s ~s~n", [ShardStr, LeaderStr, Status])
end, end,
eval_qlc(mnesia:table(?SHARD_TAB)) eval_qlc(mnesia:table(?SHARD_TAB))
). ).
@ -169,30 +154,19 @@ shards(DB) ->
-spec my_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()]. -spec my_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
my_shards(DB) -> my_shards(DB) ->
Site = this_site(), Site = this_site(),
filter_shards(DB, fun(#?SHARD_TAB{replica_set = ReplicaSet, in_sync_replicas = InSync}) -> filter_shards(DB, fun(#?SHARD_TAB{replica_set = ReplicaSet}) ->
lists:member(Site, ReplicaSet) orelse lists:member(Site, InSync) lists:member(Site, ReplicaSet)
end). end).
-spec my_owned_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()]. allocate_shards(DB, Opts) ->
my_owned_shards(DB) -> case mria:transaction(?SHARD, fun ?MODULE:allocate_shards_trans/2, [DB, Opts]) of
Self = node(), {atomic, Shards} ->
filter_shards(DB, fun(#?SHARD_TAB{leader = Leader}) -> {ok, Shards};
Self =:= Leader {aborted, {shards_already_allocated, Shards}} ->
end). {ok, Shards};
{aborted, {insufficient_sites_online, Needed, Sites}} ->
-spec leader_nodes(emqx_ds:db()) -> [node()]. {error, #{reason => insufficient_sites_online, needed => Needed, sites => Sites}}
leader_nodes(DB) -> end.
lists:uniq(
filter_shards(
DB,
fun(#?SHARD_TAB{leader = Leader}) ->
Leader =/= undefined
end,
fun(#?SHARD_TAB{leader = Leader}) ->
Leader
end
)
).
-spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> -spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
{ok, [site()]} | {error, _}. {ok, [site()]} | {error, _}.
@ -219,31 +193,23 @@ in_sync_replicas(DB, ShardId) ->
sites() -> sites() ->
eval_qlc(qlc:q([Site || #?NODE_TAB{site = Site} <- mnesia:table(?NODE_TAB)])). eval_qlc(qlc:q([Site || #?NODE_TAB{site = Site} <- mnesia:table(?NODE_TAB)])).
-spec shard_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> -spec node(site()) -> node() | undefined.
{ok, node()} | {error, no_leader_for_shard}. node(Site) ->
shard_leader(DB, Shard) -> case mnesia:dirty_read(?NODE_TAB, Site) of
case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of [#?NODE_TAB{node = Node}] ->
[#?SHARD_TAB{leader = Leader}] when Leader =/= undefined -> Node;
{ok, Leader}; [] ->
_ -> undefined
{error, no_leader_for_shard}
end. end.
-spec set_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), node()) ->
ok.
set_leader(DB, Shard, Node) ->
{atomic, _} = mria:transaction(?SHARD, fun ?MODULE:set_leader_trans/3, [DB, Shard, Node]),
ok.
-spec is_leader(node()) -> boolean().
is_leader(Node) ->
{atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:is_leader_trans/1, [Node]),
Result.
-spec get_options(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts(). -spec get_options(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts().
get_options(DB) -> get_options(DB) ->
{atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, undefined]), case mnesia:dirty_read(?META_TAB, DB) of
Opts. [#?META_TAB{db_props = Opts}] ->
Opts;
[] ->
#{}
end.
-spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> -spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
emqx_ds_replication_layer:builtin_db_opts(). emqx_ds_replication_layer:builtin_db_opts().
@ -275,7 +241,6 @@ init([]) ->
logger:set_process_metadata(#{domain => [ds, meta]}), logger:set_process_metadata(#{domain => [ds, meta]}),
ensure_tables(), ensure_tables(),
ensure_site(), ensure_site(),
{ok, _} = mnesia:subscribe({table, ?META_TAB, detailed}),
S = #s{}, S = #s{},
{ok, S}. {ok, S}.
@ -285,22 +250,11 @@ handle_call(_Call, _From, S) ->
handle_cast(_Cast, S) -> handle_cast(_Cast, S) ->
{noreply, S}. {noreply, S}.
handle_info(
{mnesia_table_event, {write, ?META_TAB, #?META_TAB{db = DB, db_props = Options}, [_], _}}, S
) ->
MyShards = my_owned_shards(DB),
lists:foreach(
fun(ShardId) ->
emqx_ds_storage_layer:update_config({DB, ShardId}, Options)
end,
MyShards
),
{noreply, S};
handle_info(_Info, S) -> handle_info(_Info, S) ->
{noreply, S}. {noreply, S}.
terminate(_Reason, #s{}) -> terminate(_Reason, #s{}) ->
%% FIXME: persistent_term:erase(?DB_META(...))
persistent_term:erase(?emqx_ds_builtin_site), persistent_term:erase(?emqx_ds_builtin_site),
ok. ok.
@ -308,20 +262,60 @@ terminate(_Reason, #s{}) ->
%% Internal exports %% Internal exports
%%================================================================================ %%================================================================================
-spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts() | undefined) -> -spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
emqx_ds_replication_layer:builtin_db_opts(). emqx_ds_replication_layer:builtin_db_opts().
open_db_trans(DB, CreateOpts) -> open_db_trans(DB, CreateOpts) ->
case mnesia:wread({?META_TAB, DB}) of case mnesia:wread({?META_TAB, DB}) of
[] when is_map(CreateOpts) -> [] ->
NShards = maps:get(n_shards, CreateOpts),
ReplicationFactor = maps:get(replication_factor, CreateOpts),
mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}), mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}),
create_shards(DB, NShards, ReplicationFactor),
CreateOpts; CreateOpts;
[#?META_TAB{db_props = Opts}] -> [#?META_TAB{db_props = Opts}] ->
Opts Opts
end. end.
-spec allocate_shards_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> [_Shard].
allocate_shards_trans(DB, Opts) ->
NShards = maps:get(n_shards, Opts),
NSites = maps:get(n_sites, Opts),
ReplicationFactor = maps:get(replication_factor, Opts),
NReplicas = min(NSites, ReplicationFactor),
Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
AllSites = mnesia:match_object(?NODE_TAB, #?NODE_TAB{_ = '_'}, read),
case length(AllSites) of
N when N >= NSites ->
ok;
_ ->
mnesia:abort({insufficient_sites_online, NSites, AllSites})
end,
case mnesia:match_object(?SHARD_TAB, #?SHARD_TAB{shard = {DB, '_'}, _ = '_'}, write) of
[] ->
ok;
Records ->
ShardsAllocated = [Shard || #?SHARD_TAB{shard = {_DB, Shard}} <- Records],
mnesia:abort({shards_already_allocated, ShardsAllocated})
end,
{Allocation, _} = lists:mapfoldl(
fun(Shard, SSites) ->
{Sites, _} = emqx_utils_stream:consume(NReplicas, SSites),
{_, SRest} = emqx_utils_stream:consume(1, SSites),
{{Shard, Sites}, SRest}
end,
emqx_utils_stream:repeat(emqx_utils_stream:list(AllSites)),
Shards
),
lists:map(
fun({Shard, Sites}) ->
ReplicaSet = [Site || #?NODE_TAB{site = Site} <- Sites],
Record = #?SHARD_TAB{
shard = {DB, Shard},
replica_set = ReplicaSet
},
ok = mnesia:write(Record),
Shard
end,
Allocation
).
-spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> -spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
ok | {error, database}. ok | {error, database}.
update_db_config_trans(DB, CreateOpts) -> update_db_config_trans(DB, CreateOpts) ->
@ -367,41 +361,13 @@ in_sync_replicas_trans(DB, Shard) ->
{error, no_shard} {error, no_shard}
end. end.
-spec set_leader_trans(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), node()) ->
ok.
set_leader_trans(DB, Shard, Node) ->
[Record0] = mnesia:wread({?SHARD_TAB, {DB, Shard}}),
Record = Record0#?SHARD_TAB{leader = Node},
mnesia:write(Record).
-spec is_leader_trans(node) -> boolean().
is_leader_trans(Node) ->
case
mnesia:select(
?SHARD_TAB,
ets:fun2ms(fun(#?SHARD_TAB{leader = Leader}) ->
Leader =:= Node
end),
1,
read
)
of
{[_ | _], _Cont} ->
true;
_ ->
false
end.
%%================================================================================ %%================================================================================
%% Internal functions %% Internal functions
%%================================================================================ %%================================================================================
ensure_tables() -> ensure_tables() ->
%% TODO: seems like it may introduce flakiness
Majority = false,
ok = mria:create_table(?META_TAB, [ ok = mria:create_table(?META_TAB, [
{rlog_shard, ?SHARD}, {rlog_shard, ?SHARD},
{majority, Majority},
{type, ordered_set}, {type, ordered_set},
{storage, disc_copies}, {storage, disc_copies},
{record_name, ?META_TAB}, {record_name, ?META_TAB},
@ -409,7 +375,6 @@ ensure_tables() ->
]), ]),
ok = mria:create_table(?NODE_TAB, [ ok = mria:create_table(?NODE_TAB, [
{rlog_shard, ?SHARD}, {rlog_shard, ?SHARD},
{majority, Majority},
{type, ordered_set}, {type, ordered_set},
{storage, disc_copies}, {storage, disc_copies},
{record_name, ?NODE_TAB}, {record_name, ?NODE_TAB},
@ -417,7 +382,6 @@ ensure_tables() ->
]), ]),
ok = mria:create_table(?SHARD_TAB, [ ok = mria:create_table(?SHARD_TAB, [
{rlog_shard, ?SHARD}, {rlog_shard, ?SHARD},
{majority, Majority},
{type, ordered_set}, {type, ordered_set},
{storage, disc_copies}, {storage, disc_copies},
{record_name, ?SHARD_TAB}, {record_name, ?SHARD_TAB},
@ -431,8 +395,8 @@ ensure_site() ->
{ok, [Site]} -> {ok, [Site]} ->
ok; ok;
_ -> _ ->
Site = crypto:strong_rand_bytes(8), Site = binary:encode_hex(crypto:strong_rand_bytes(4)),
logger:notice("Creating a new site with ID=~s", [base64:encode(Site)]), logger:notice("Creating a new site with ID=~s", [Site]),
ok = filelib:ensure_dir(Filename), ok = filelib:ensure_dir(Filename),
{ok, FD} = file:open(Filename, [write]), {ok, FD} = file:open(Filename, [write]),
io:format(FD, "~p.", [Site]), io:format(FD, "~p.", [Site]),
@ -442,30 +406,6 @@ ensure_site() ->
persistent_term:put(?emqx_ds_builtin_site, Site), persistent_term:put(?emqx_ds_builtin_site, Site),
ok. ok.
-spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok.
create_shards(DB, NShards, ReplicationFactor) ->
Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
AllSites = sites(),
lists:foreach(
fun(Shard) ->
Hashes0 = [{hash(Shard, Site), Site} || Site <- AllSites],
Hashes = lists:sort(Hashes0),
{_, Sites} = lists:unzip(Hashes),
[First | ReplicaSet] = lists:sublist(Sites, 1, ReplicationFactor),
Record = #?SHARD_TAB{
shard = {DB, Shard},
replica_set = ReplicaSet,
in_sync_replicas = [First]
},
mnesia:write(Record)
end,
Shards
).
-spec hash(emqx_ds_replication_layer:shard_id(), site()) -> any().
hash(Shard, Site) ->
erlang:phash2({Shard, Site}).
eval_qlc(Q) -> eval_qlc(Q) ->
case mnesia:is_transaction() of case mnesia:is_transaction() of
true -> true ->
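
The site-to-shard assignment in allocate_shards_trans/2 above walks a repeating sequence of sites, taking the required number of replicas per shard and then advancing the sequence by one. A standalone sketch of that striding pattern (hypothetical module, plain lists instead of emqx_utils_stream):

-module(alloc_sketch).
-export([demo/0]).

allocate(Shards, Sites, NReplicas) ->
    {Alloc, _} = lists:mapfoldl(
        fun(Shard, Offset) ->
            %% Take NReplicas consecutive sites from the cyclic sequence,
            %% then advance the starting offset by one for the next shard.
            ReplicaSet = [cyclic_nth(Offset + I, Sites) || I <- lists:seq(0, NReplicas - 1)],
            {{Shard, ReplicaSet}, Offset + 1}
        end,
        0,
        Shards
    ),
    Alloc.

cyclic_nth(I, List) ->
    lists:nth((I rem length(List)) + 1, List).

demo() ->
    %% 4 shards over 3 sites with 2 replicas each:
    [{<<"0">>, [s1, s2]},
     {<<"1">>, [s2, s3]},
     {<<"2">>, [s3, s1]},
     {<<"3">>, [s1, s2]}] =
        allocate([<<"0">>, <<"1">>, <<"2">>, <<"3">>], [s1, s2, s3], 2),
    ok.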

View File

@ -0,0 +1,199 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ds_replication_layer_shard).
-export([start_link/3]).
-export([shard_servers/2]).
-export([
servers/3,
server/3
]).
-behaviour(gen_server).
-export([
init/1,
handle_call/3,
handle_cast/2,
terminate/2
]).
-define(PTERM(DB, SHARD, L), {?MODULE, DB, SHARD, L}).
-define(MEMOIZE(DB, SHARD, EXPR),
case persistent_term:get(__X_Key = ?PTERM(DB, SHARD, ?LINE), undefined) of
undefined ->
ok = persistent_term:put(__X_Key, __X_Value = (EXPR)),
__X_Value;
__X_Value ->
__X_Value
end
).
%%
start_link(DB, Shard, Opts) ->
gen_server:start_link(?MODULE, {DB, Shard, Opts}, []).
shard_servers(DB, Shard) ->
{ok, ReplicaSet} = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
[
{server_name(DB, Shard, Site), emqx_ds_replication_layer_meta:node(Site)}
|| Site <- ReplicaSet
].
local_server(DB, Shard) ->
Site = emqx_ds_replication_layer_meta:this_site(),
{server_name(DB, Shard, Site), node()}.
cluster_name(DB, Shard) ->
iolist_to_binary(io_lib:format("~s_~s", [DB, Shard])).
server_name(DB, Shard, Site) ->
DBBin = atom_to_binary(DB),
binary_to_atom(<<"ds_", DBBin/binary, Shard/binary, "_", Site/binary>>).
%%
servers(DB, Shard, _Order = leader_preferred) ->
get_servers_leader_preferred(DB, Shard);
servers(DB, Shard, _Order = undefined) ->
get_shard_servers(DB, Shard).
server(DB, Shard, _Which = local_preferred) ->
get_server_local_preferred(DB, Shard).
get_servers_leader_preferred(DB, Shard) ->
%% NOTE: Contact the last known leader first, then the rest of the shard servers.
ClusterName = get_cluster_name(DB, Shard),
case ra_leaderboard:lookup_leader(ClusterName) of
Leader when Leader /= undefined ->
Servers = ra_leaderboard:lookup_members(ClusterName),
[Leader | lists:delete(Leader, Servers)];
undefined ->
%% TODO: Dynamic membership.
get_shard_servers(DB, Shard)
end.
get_server_local_preferred(DB, Shard) ->
%% NOTE: Prefer a replica on the local node; otherwise pick a random one.
%% TODO: Replica may be down, so we may need to retry.
ClusterName = get_cluster_name(DB, Shard),
case ra_leaderboard:lookup_members(ClusterName) of
Servers when is_list(Servers) ->
pick_local(Servers);
undefined ->
%% TODO
%% Leader is unknown if there are no servers of this group on the
%% local node. We want to pick a replica in that case as well.
%% TODO: Dynamic membership.
pick_random(get_shard_servers(DB, Shard))
end.
pick_local(Servers) ->
case lists:dropwhile(fun({_Name, Node}) -> Node =/= node() end, Servers) of
[Local | _] ->
Local;
[] ->
pick_random(Servers)
end.
pick_random(Servers) ->
lists:nth(rand:uniform(length(Servers)), Servers).
get_cluster_name(DB, Shard) ->
?MEMOIZE(DB, Shard, cluster_name(DB, Shard)).
get_local_server(DB, Shard) ->
?MEMOIZE(DB, Shard, local_server(DB, Shard)).
get_shard_servers(DB, Shard) ->
maps:get(servers, emqx_ds_builtin_db_sup:lookup_shard_meta(DB, Shard)).
%%
init({DB, Shard, Opts}) ->
_ = process_flag(trap_exit, true),
_Meta = start_shard(DB, Shard, Opts),
{ok, {DB, Shard}}.
handle_call(_Call, _From, State) ->
{reply, ignored, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
terminate(_Reason, {DB, Shard}) ->
LocalServer = get_local_server(DB, Shard),
ok = ra:stop_server(DB, LocalServer).
%%
start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
Site = emqx_ds_replication_layer_meta:this_site(),
ClusterName = cluster_name(DB, Shard),
LocalServer = local_server(DB, Shard),
Servers = shard_servers(DB, Shard),
case ra:restart_server(DB, LocalServer) of
ok ->
Bootstrap = false;
{error, name_not_registered} ->
Bootstrap = true,
ok = ra:start_server(DB, #{
id => LocalServer,
uid => <<ClusterName/binary, "_", Site/binary>>,
cluster_name => ClusterName,
initial_members => Servers,
machine => {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
log_init_args => maps:with(
[
snapshot_interval,
resend_window
],
ReplicationOpts
)
})
end,
case Servers of
[LocalServer | _] ->
%% TODO
%% Not super robust, but we probably don't expect nodes to be down
%% when we bring up a fresh consensus group. Triggering the election
%% is not strictly required otherwise.
%% Doing this on node restart does not appear to disrupt consensus.
%% It also does not "steal" leadership from nodes carrying too much
%% leader load; `ra:transfer_leadership/2` is the mechanism for that.
try
ra:trigger_election(LocalServer, _Timeout = 1_000)
catch
%% TODO
%% Tolerating exceptions because server might be occupied with log
%% replay for a while.
exit:{timeout, _} when not Bootstrap ->
ok
end;
_ ->
ok
end,
#{
cluster_name => ClusterName,
servers => Servers,
local_server => LocalServer
}.
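%% Sketch (illustrative only, not part of the change): what the naming helpers
%% above would yield for a hypothetical DB 'messages', shard <<"1">> and site
%% <<"AB12CD34">>, assuming they behave as written.
naming_example() ->
    ClusterName = cluster_name(messages, <<"1">>),
    ServerName = server_name(messages, <<"1">>, <<"AB12CD34">>),
    %% ClusterName = <<"messages_1">>,
    %% ServerName  = 'ds_messages1_AB12CD34',
    %% and the corresponding ra server uid is <<"messages_1_AB12CD34">>.
    {ClusterName, ServerName}.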

View File

@ -145,8 +145,8 @@ create(_ShardId, DBHandle, GenId, Options) ->
%% Get options: %% Get options:
BitsPerTopicLevel = maps:get(bits_per_wildcard_level, Options, 64), BitsPerTopicLevel = maps:get(bits_per_wildcard_level, Options, 64),
TopicIndexBytes = maps:get(topic_index_bytes, Options, 4), TopicIndexBytes = maps:get(topic_index_bytes, Options, 4),
%% 10 bits -> 1024 ms -> ~1 sec %% 20 bits -> 1048576 us -> ~1 sec
TSOffsetBits = maps:get(epoch_bits, Options, 10), TSOffsetBits = maps:get(epoch_bits, Options, 20),
%% Create column families: %% Create column families:
DataCFName = data_cf(GenId), DataCFName = data_cf(GenId),
TrieCFName = trie_cf(GenId), TrieCFName = trie_cf(GenId),
@ -235,7 +235,7 @@ store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
fun(Msg) -> fun(Msg) ->
{Key, _} = make_key(S, Msg), {Key, _} = make_key(S, Msg),
Val = serialize(Msg), Val = serialize(Msg),
rocksdb:put(DB, Data, Key, Val, []) rocksdb:put(DB, Data, Key, Val, [{disable_wal, true}])
end, end,
Messages Messages
). ).
@ -288,7 +288,7 @@ next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) ->
%% Compute safe cutoff time. %% Compute safe cutoff time.
%% It's the point in time where the last complete epoch ends, so we need to know %% It's the point in time where the last complete epoch ends, so we need to know
%% the current time to compute it. %% the current time to compute it.
Now = emqx_message:timestamp_now(), Now = emqx_ds:timestamp_us(),
SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset,
next_until(Schema, It, SafeCutoffTime, BatchSize). next_until(Schema, It, SafeCutoffTime, BatchSize).
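%% Worked sketch of the cutoff arithmetic above (hypothetical timestamp; only
%% the bit operations are taken from the code): with TSOffsetBits = 20 an epoch
%% spans 2^20 us (~1.05 s), and
%%   SafeCutoffTime = (Now bsr 20) bsl 20
%% rounds Now down to the start of the current, still incomplete epoch, so
%% iterators only ever see messages strictly older than that boundary.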
@ -309,9 +309,7 @@ next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime,
%% Make filter: %% Make filter:
Inequations = [ Inequations = [
{'=', TopicIndex}, {'=', TopicIndex},
{StartTime, '..', SafeCutoffTime - 1}, {StartTime, '..', SafeCutoffTime - 1}
%% Unique integer:
any
%% Varying topic levels: %% Varying topic levels:
| lists:map( | lists:map(
fun fun
@ -438,11 +436,10 @@ make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestam
]) -> ]) ->
binary(). binary().
make_key(KeyMapper, TopicIndex, Timestamp, Varying) -> make_key(KeyMapper, TopicIndex, Timestamp, Varying) ->
UniqueInteger = erlang:unique_integer([monotonic, positive]),
emqx_ds_bitmask_keymapper:key_to_bitstring( emqx_ds_bitmask_keymapper:key_to_bitstring(
KeyMapper, KeyMapper,
emqx_ds_bitmask_keymapper:vector_to_key(KeyMapper, [ emqx_ds_bitmask_keymapper:vector_to_key(KeyMapper, [
TopicIndex, Timestamp, UniqueInteger | Varying TopicIndex, Timestamp | Varying
]) ])
). ).
@ -498,10 +495,9 @@ make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) ->
%% Dimension Offset Bitsize %% Dimension Offset Bitsize
[{1, 0, TopicIndexBytes * ?BYTE_SIZE}, %% Topic index [{1, 0, TopicIndexBytes * ?BYTE_SIZE}, %% Topic index
{2, TSOffsetBits, TSBits - TSOffsetBits }] ++ %% Timestamp epoch {2, TSOffsetBits, TSBits - TSOffsetBits }] ++ %% Timestamp epoch
[{3 + I, 0, BitsPerTopicLevel } %% Varying topic levels [{2 + I, 0, BitsPerTopicLevel } %% Varying topic levels
|| I <- lists:seq(1, N)] ++ || I <- lists:seq(1, N)] ++
[{2, 0, TSOffsetBits }, %% Timestamp offset [{2, 0, TSOffsetBits }], %% Timestamp offset
{3, 0, 64 }], %% Unique integer
Keymapper = emqx_ds_bitmask_keymapper:make_keymapper(lists:reverse(Bitsources)), Keymapper = emqx_ds_bitmask_keymapper:make_keymapper(lists:reverse(Bitsources)),
%% Assert: %% Assert:
case emqx_ds_bitmask_keymapper:bitsize(Keymapper) rem 8 of case emqx_ds_bitmask_keymapper:bitsize(Keymapper) rem 8 of
@ -515,7 +511,7 @@ make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) ->
-spec restore_trie(pos_integer(), rocksdb:db_handle(), rocksdb:cf_handle()) -> emqx_ds_lts:trie(). -spec restore_trie(pos_integer(), rocksdb:db_handle(), rocksdb:cf_handle()) -> emqx_ds_lts:trie().
restore_trie(TopicIndexBytes, DB, CF) -> restore_trie(TopicIndexBytes, DB, CF) ->
PersistCallback = fun(Key, Val) -> PersistCallback = fun(Key, Val) ->
rocksdb:put(DB, CF, term_to_binary(Key), term_to_binary(Val), []) rocksdb:put(DB, CF, term_to_binary(Key), term_to_binary(Val), [{disable_wal, true}])
end, end,
{ok, IT} = rocksdb:iterator(DB, CF, []), {ok, IT} = rocksdb:iterator(DB, CF, []),
try try

View File

@ -582,7 +582,9 @@ rocksdb_open(Shard, Options) ->
DBOptions = [ DBOptions = [
{create_if_missing, true}, {create_if_missing, true},
{create_missing_column_families, true}, {create_missing_column_families, true},
{enable_write_thread_adaptive_yield, false} % {enable_write_thread_adaptive_yield, false},
{manual_wal_flush, true},
{atomic_flush, true}
| maps:get(db_options, Options, []) | maps:get(db_options, Options, [])
], ],
DBDir = db_dir(Shard), DBDir = db_dir(Shard),
@ -707,6 +709,6 @@ get_schema_persistent(DB) ->
-spec put_schema_persistent(rocksdb:db_handle(), shard_schema()) -> ok. -spec put_schema_persistent(rocksdb:db_handle(), shard_schema()) -> ok.
put_schema_persistent(DB, Schema) -> put_schema_persistent(DB, Schema) ->
Blob = term_to_binary(Schema), Blob = term_to_binary(Schema),
rocksdb:put(DB, ?ROCKSDB_SCHEMA_KEY, Blob, []). rocksdb:put(DB, ?ROCKSDB_SCHEMA_KEY, Blob, [{disable_wal, true}]).
-undef(ROCKSDB_SCHEMA_KEY). -undef(ROCKSDB_SCHEMA_KEY).

View File

@ -93,8 +93,7 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) ->
store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) -> store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) ->
lists:foreach( lists:foreach(
fun(Msg) -> fun(Msg) ->
Id = erlang:unique_integer([monotonic]), Key = <<(emqx_message:timestamp(Msg)):64>>,
Key = <<Id:64>>,
Val = term_to_binary(Msg), Val = term_to_binary(Msg),
rocksdb:put(DB, CF, Key, Val, []) rocksdb:put(DB, CF, Key, Val, [])
end, end,

View File

@ -5,7 +5,7 @@
{vsn, "0.1.11"}, {vsn, "0.1.11"},
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, rocksdb, gproc, mria, emqx_utils]}, {applications, [kernel, stdlib, rocksdb, gproc, mria, ra, emqx_utils]},
{mod, {emqx_ds_app, []}}, {mod, {emqx_ds_app, []}},
{env, []} {env, []}
]}. ]}.

View File

@ -50,13 +50,13 @@ t_00_smoke_open_drop(_Config) ->
lists:foreach( lists:foreach(
fun(Shard) -> fun(Shard) ->
?assertEqual( ?assertEqual(
{ok, []}, emqx_ds_replication_layer_meta:replica_set(DB, Shard) {ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard)
), ),
?assertEqual( ?assertEqual(
[Site], emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) %% FIXME
), undefined,
%% Check that the leader is elected; emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard)
?assertEqual({ok, node()}, emqx_ds_replication_layer_meta:shard_leader(DB, Shard)) )
end, end,
Shards Shards
), ),

View File

@ -31,7 +31,7 @@
-endif. -endif.
%% These apps are always (re)started by emqx_machine: %% These apps are always (re)started by emqx_machine:
-define(BASIC_REBOOT_APPS, [gproc, esockd, ranch, cowboy, emqx]). -define(BASIC_REBOOT_APPS, [gproc, esockd, ranch, cowboy, emqx_durable_storage, emqx]).
%% If any of these applications crash, the entire EMQX node shuts down: %% If any of these applications crash, the entire EMQX node shuts down:
-define(BASIC_PERMANENT_APPS, [mria, ekka, esockd, emqx]). -define(BASIC_PERMANENT_APPS, [mria, ekka, esockd, emqx]).

View File

@ -22,7 +22,9 @@
list/1, list/1,
mqueue/1, mqueue/1,
map/2, map/2,
chain/2 zip/1,
chain/2,
repeat/1
]). ]).
%% Evaluating %% Evaluating
@ -91,6 +93,31 @@ map(F, S) ->
end end
end. end.
%% @doc Zip a list of streams into a stream producing lists of their respective values.
%% The resulting stream is as long as the shortest of the input streams.
-spec zip([stream(X)]) -> stream([X]).
zip([S]) ->
map(fun(X) -> [X] end, S);
zip([S | Streams]) ->
ziptail(S, zip(Streams));
zip([]) ->
empty().
ziptail(S, Tail) ->
fun() ->
case next(S) of
[X | SRest] ->
case next(Tail) of
[Xs | TailRest] ->
[[X | Xs] | ziptail(SRest, TailRest)];
[] ->
[]
end;
[] ->
[]
end
end.
%% @doc Make a stream by chaining (concatenating) two streams. %% @doc Make a stream by chaining (concatenating) two streams.
%% The second stream begins to produce values only after the first one is exhausted. %% The second stream begins to produce values only after the first one is exhausted.
-spec chain(stream(X), stream(Y)) -> stream(X | Y). -spec chain(stream(X), stream(Y)) -> stream(X | Y).
@ -104,6 +131,19 @@ chain(SFirst, SThen) ->
end end
end. end.
%% @doc Make an infinite stream out of repeats of the given stream.
%% If the given stream is empty, the resulting stream is also empty.
-spec repeat(stream(X)) -> stream(X).
repeat(S) ->
fun() ->
case next(S) of
[X | SRest] ->
[X | chain(SRest, repeat(S))];
[] ->
[]
end
end.
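%% Usage sketch (illustrative only): zipping a finite stream with a repeated one
%% yields a round-robin pairing, e.g. for a predictable shard-to-site assignment.
zip_repeat_example() ->
    Shards = list([<<"0">>, <<"1">>, <<"2">>, <<"3">>]),
    Sites = repeat(list([<<"S1">>, <<"S2">>, <<"S3">>])),
    %% => [[<<"0">>,<<"S1">>], [<<"1">>,<<"S2">>], [<<"2">>,<<"S3">>], [<<"3">>,<<"S1">>]]
    consume(zip([Shards, Sites])).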
%% %%
%% @doc Produce the next value from the stream. %% @doc Produce the next value from the stream.

View File

@ -74,6 +74,80 @@ chain_list_map_test() ->
emqx_utils_stream:consume(S) emqx_utils_stream:consume(S)
). ).
zip_test() ->
S = emqx_utils_stream:zip([
emqx_utils_stream:list([1, 2, 3]),
emqx_utils_stream:list([4, 5, 6, 7])
]),
?assertEqual(
[[1, 4], [2, 5], [3, 6]],
emqx_utils_stream:consume(S)
).
zip_none_test() ->
?assertEqual(
[],
emqx_utils_stream:consume(emqx_utils_stream:zip([]))
).
zip_one_test() ->
S = emqx_utils_stream:zip([emqx_utils_stream:list([1, 2, 3])]),
?assertEqual(
[[1], [2], [3]],
emqx_utils_stream:consume(S)
).
zip_many_test() ->
S = emqx_utils_stream:zip([
emqx_utils_stream:list([1, 2, 3]),
emqx_utils_stream:list([4, 5, 6, 7]),
emqx_utils_stream:list([8, 9])
]),
?assertEqual(
[[1, 4, 8], [2, 5, 9]],
emqx_utils_stream:consume(S)
).
zip_many_empty_test() ->
S = emqx_utils_stream:zip([
emqx_utils_stream:list([1, 2, 3]),
emqx_utils_stream:list([4, 5, 6, 7]),
emqx_utils_stream:empty()
]),
?assertEqual(
[],
emqx_utils_stream:consume(S)
).
repeat_test() ->
S = emqx_utils_stream:repeat(emqx_utils_stream:list([1, 2, 3])),
?assertMatch(
{[1, 2, 3, 1, 2, 3, 1, 2], _},
emqx_utils_stream:consume(8, S)
),
{_, SRest} = emqx_utils_stream:consume(8, S),
?assertMatch(
{[3, 1, 2, 3, 1, 2, 3, 1], _},
emqx_utils_stream:consume(8, SRest)
).
repeat_empty_test() ->
S = emqx_utils_stream:repeat(emqx_utils_stream:list([])),
?assertEqual(
[],
emqx_utils_stream:consume(8, S)
).
zip_repeat_test() ->
S = emqx_utils_stream:zip([
emqx_utils_stream:repeat(emqx_utils_stream:list([1, 2])),
emqx_utils_stream:list([4, 5, 6, 7, 8])
]),
?assertEqual(
[[1, 4], [2, 5], [1, 6], [2, 7], [1, 8]],
emqx_utils_stream:consume(S)
).
mqueue_test() -> mqueue_test() ->
_ = erlang:send_after(1, self(), 1), _ = erlang:send_after(1, self(), 1),
_ = erlang:send_after(100, self(), 2), _ = erlang:send_after(100, self(), 2),

View File

@ -110,7 +110,8 @@
{uuid, {git, "https://github.com/okeuday/uuid.git", {tag, "v2.0.6"}}}, {uuid, {git, "https://github.com/okeuday/uuid.git", {tag, "v2.0.6"}}},
{ssl_verify_fun, "1.1.7"}, {ssl_verify_fun, "1.1.7"},
{rfc3339, {git, "https://github.com/emqx/rfc3339.git", {tag, "0.2.3"}}}, {rfc3339, {git, "https://github.com/emqx/rfc3339.git", {tag, "0.2.3"}}},
{bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.1"}}} {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.1"}}},
{ra, "2.7.3"}
]}. ]}.
{xref_ignores, {xref_ignores,