refactor(ds): Refactor storage layer
This commit is contained in:
parent
c91df2f5cd
commit
7095cb8583
|
@ -23,9 +23,7 @@
|
|||
|
||||
%% Message persistence
|
||||
-export([
|
||||
persist/1,
|
||||
serialize/1,
|
||||
deserialize/1
|
||||
persist/1
|
||||
]).
|
||||
|
||||
%% FIXME
|
||||
|
@ -83,18 +81,9 @@ needs_persistence(Msg) ->
|
|||
not (emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg)).
|
||||
|
||||
store_message(Msg) ->
|
||||
ID = emqx_message:id(Msg),
|
||||
Timestamp = emqx_guid:timestamp(ID),
|
||||
Topic = emqx_topic:words(emqx_message:topic(Msg)),
|
||||
emqx_ds_storage_layer:store(?DS_SHARD, ID, Timestamp, Topic, serialize(Msg)).
|
||||
emqx_ds:message_store([Msg]).
|
||||
|
||||
has_subscribers(#message{topic = Topic}) ->
|
||||
emqx_persistent_session_ds_router:has_any_route(Topic).
|
||||
|
||||
%%
|
||||
|
||||
serialize(Msg) ->
|
||||
term_to_binary(emqx_message:to_map(Msg)).
|
||||
|
||||
deserialize(Bin) ->
|
||||
emqx_message:from_map(binary_to_term(Bin)).
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
-export([create_db/2]).
|
||||
|
||||
%% Message storage API:
|
||||
-export([message_store/3, message_store/2]).
|
||||
-export([message_store/1, message_store/2, message_store/3]).
|
||||
|
||||
%% Message replay API:
|
||||
-export([get_streams/3, open_iterator/2, next/2]).
|
||||
|
@ -53,7 +53,7 @@
|
|||
%% implementations for emqx_ds, so this type has to take this into
|
||||
%% account.
|
||||
-record(stream,
|
||||
{ shard :: emqx_ds:shard()
|
||||
{ shard :: emqx_ds_replication_layer:shard()
|
||||
, enc :: emqx_ds_replication_layer:stream()
|
||||
}).
|
||||
|
||||
|
@ -64,7 +64,7 @@
|
|||
%% This record encapsulates the iterator entity from the replication
|
||||
%% level.
|
||||
-record(iterator,
|
||||
{ shard :: emqx_ds:shard()
|
||||
{ shard :: emqx_ds_replication_layer:shard()
|
||||
, enc :: enqx_ds_replication_layer:iterator()
|
||||
}).
|
||||
|
||||
|
@ -80,7 +80,9 @@
|
|||
|
||||
-type create_db_opts() :: #{}.
|
||||
|
||||
-type message_id() :: binary().
|
||||
-type message_id() :: emqx_ds_replication_layer:message_id().
|
||||
|
||||
-define(DEFAULT_DB, <<"default">>).
|
||||
|
||||
%%================================================================================
|
||||
%% API funcions
|
||||
|
@ -90,6 +92,11 @@
|
|||
create_db(DB, Opts) ->
|
||||
emqx_ds_replication_layer:create_db(DB, Opts).
|
||||
|
||||
-spec message_store([emqx_types:message()]) ->
|
||||
{ok, [message_id()]} | {error, _}.
|
||||
message_store(Msgs) ->
|
||||
message_store(?DEFAULT_DB, Msgs, #{}).
|
||||
|
||||
-spec message_store(db(), [emqx_types:message()], message_store_opts()) ->
|
||||
{ok, [message_id()]} | {error, _}.
|
||||
message_store(DB, Msgs, Opts) ->
|
||||
|
@ -143,7 +150,7 @@ open_iterator(#stream{shard = Shard, enc = Stream}, StartTime) ->
|
|||
Err
|
||||
end.
|
||||
|
||||
-spec next(iterator(), non_neg_integer()) -> {ok, iterator(), [emqx_types:message()]} | end_of_stream.
|
||||
-spec next(iterator(), pos_integer()) -> {ok, iterator(), [emqx_types:message()]} | end_of_stream.
|
||||
next(#iterator{shard = Shard, enc = Iter0}, BatchSize) ->
|
||||
case emqx_ds_replication_layer:next(Shard, Iter0, BatchSize) of
|
||||
{ok, Iter, Batch} ->
|
||||
|
|
|
@ -28,11 +28,11 @@
|
|||
%% internal exports:
|
||||
-export([ do_create_shard_v1/2,
|
||||
do_get_streams_v1/3,
|
||||
do_open_iterator/3,
|
||||
do_open_iterator_v1/3,
|
||||
do_next_v1/3
|
||||
]).
|
||||
|
||||
-export_type([shard/0, stream/0, iterator/0]).
|
||||
-export_type([shard/0, stream/0, iterator/0, message_id/0]).
|
||||
|
||||
%%================================================================================
|
||||
%% Type declarations
|
||||
|
@ -44,6 +44,8 @@
|
|||
|
||||
-opaque iterator() :: emqx_ds_storage_layer:iterator().
|
||||
|
||||
-type message_id() :: emqx_ds_storage_layer:message_id().
|
||||
|
||||
%%================================================================================
|
||||
%% API functions
|
||||
%%================================================================================
|
||||
|
@ -83,10 +85,18 @@ open_iterator(Shard, Stream, StartTime) ->
|
|||
Node = node_of_shard(Shard),
|
||||
emqx_ds_proto_v1:open_iterator(Node, Shard, Stream, StartTime).
|
||||
|
||||
-spec next(shard(), iterator(), non_neg_integer()) ->
|
||||
-spec next(shard(), iterator(), pos_integer()) ->
|
||||
{ok, iterator(), [emqx_types:message()]} | end_of_stream.
|
||||
next(Shard, Iter, BatchSize) ->
|
||||
Node = node_of_shard(Shard),
|
||||
%% TODO: iterator can contain information that is useful for
|
||||
%% reconstructing messages sent over the network. For example,
|
||||
%% when we send messages with the learned topic index, we could
|
||||
%% send the static part of topic once, and append it to the
|
||||
%% messages on the receiving node, hence saving some network.
|
||||
%%
|
||||
%% This kind of trickery should be probably done here in the
|
||||
%% replication layer. Or, perhaps, in the logic lary.
|
||||
emqx_ds_proto_v1:next(Node, Shard, Iter, BatchSize).
|
||||
|
||||
%%================================================================================
|
||||
|
@ -107,7 +117,7 @@ do_get_streams_v1(Shard, TopicFilter, StartTime) ->
|
|||
error({todo, Shard, TopicFilter, StartTime}).
|
||||
|
||||
-spec do_open_iterator_v1(shard(), stream(), emqx_ds:time()) -> iterator().
|
||||
do_open_iterator_v1(Shard, Stream, Time) ->
|
||||
do_open_iterator_v1(Shard, Stream, StartTime) ->
|
||||
error({todo, Shard, Stream, StartTime}).
|
||||
|
||||
-spec do_next_v1(shard(), iterator(), non_neg_integer()) ->
|
||||
|
|
|
@ -10,7 +10,7 @@
|
|||
-export([create_generation/3]).
|
||||
|
||||
-export([get_streams/3]).
|
||||
-export([store/5]).
|
||||
-export([message_store/3]).
|
||||
-export([delete/4]).
|
||||
|
||||
-export([make_iterator/2, next/1, next/2]).
|
||||
|
@ -33,11 +33,13 @@
|
|||
|
||||
-compile({inline, [meta_lookup/2]}).
|
||||
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
|
||||
%%================================================================================
|
||||
%% Type declarations
|
||||
%%================================================================================
|
||||
|
||||
-opaque stream() :: {term()}.
|
||||
-type stream() :: term(). %% Opaque term returned by the generation callback module
|
||||
|
||||
-type options() :: #{
|
||||
dir => file:filename()
|
||||
|
@ -101,7 +103,7 @@
|
|||
%% 3. `inplace_update_support`?
|
||||
-define(ITERATOR_CF_OPTS, []).
|
||||
|
||||
-define(REF(Keyspace, ShardId), {via, gproc, {n, l, {?MODULE, Keyspace, ShardId}}}).
|
||||
-define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
|
||||
|
||||
%%================================================================================
|
||||
%% Callbacks
|
||||
|
@ -149,30 +151,34 @@
|
|||
|
||||
-spec start_link(emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
|
||||
{ok, pid()}.
|
||||
start_link(Shard = {Keyspace, ShardId}, Options) ->
|
||||
gen_server:start_link(?REF(Keyspace, ShardId), ?MODULE, {Shard, Options}, []).
|
||||
start_link(Shard, Options) ->
|
||||
gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []).
|
||||
|
||||
-spec get_streams(emqx_ds:keyspace(), emqx_ds:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) -> [stream()].
|
||||
get_streams(KeySpace, TopicFilter, StartTime) ->
|
||||
%% FIXME: messages can be potentially stored in multiple
|
||||
%% generations. This function should return the results from all
|
||||
%% of them!
|
||||
%% Otherwise we could LOSE messages when generations are switched.
|
||||
{GenId, #{module := Mod, }} = meta_lookup_gen(Shard, StartTime),
|
||||
-spec get_streams(emqx_ds:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) -> [_Stream].
|
||||
get_streams(_ShardId, _TopicFilter, _StartTime) ->
|
||||
[].
|
||||
|
||||
|
||||
-spec create_generation(
|
||||
emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()
|
||||
) ->
|
||||
{ok, gen_id()} | {error, nonmonotonic}.
|
||||
create_generation({Keyspace, ShardId}, Since, Config = {_Module, _Options}) ->
|
||||
gen_server:call(?REF(Keyspace, ShardId), {create_generation, Since, Config}).
|
||||
create_generation(ShardId, Since, Config = {_Module, _Options}) ->
|
||||
gen_server:call(?REF(ShardId), {create_generation, Since, Config}).
|
||||
|
||||
-spec store(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic(), binary()) ->
|
||||
ok | {error, _}.
|
||||
store(Shard, GUID, Time, Topic, Msg) ->
|
||||
{_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time),
|
||||
Mod:store(Data, GUID, Time, Topic, Msg).
|
||||
-spec message_store(emqx_ds:shard(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
|
||||
{ok, _MessageId} | {error, _}.
|
||||
message_store(Shard, Msgs, _Opts) ->
|
||||
{ok, lists:map(
|
||||
fun(Msg) ->
|
||||
GUID = emqx_message:id(Msg),
|
||||
Timestamp = Msg#message.timestamp,
|
||||
{_GenId, #{module := Mod, data := ModState}} = meta_lookup_gen(Shard, Timestamp),
|
||||
Topic = emqx_topic:words(emqx_message:topic(Msg)),
|
||||
Payload = serialize(Msg),
|
||||
Mod:store(ModState, GUID, Timestamp, Topic, Payload)
|
||||
end,
|
||||
Msgs)}.
|
||||
|
||||
-spec delete(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic()) ->
|
||||
ok | {error, _}.
|
||||
|
@ -212,7 +218,8 @@ do_next(It, N, Acc) when N =< 0 ->
|
|||
{ok, It, lists:reverse(Acc)};
|
||||
do_next(It = #it{module = Mod, data = ItData}, N, Acc) ->
|
||||
case Mod:next(ItData) of
|
||||
{value, Val, ItDataNext} ->
|
||||
{value, Bin, ItDataNext} ->
|
||||
Val = deserialize(Bin),
|
||||
do_next(It#it{data = ItDataNext}, N - 1, [Val | Acc]);
|
||||
{error, _} = _Error ->
|
||||
%% todo: log?
|
||||
|
@ -663,6 +670,14 @@ is_gen_valid(Shard, GenId, Since) when GenId > 0 ->
|
|||
is_gen_valid(_Shard, 0, 0) ->
|
||||
ok.
|
||||
|
||||
serialize(Msg) ->
|
||||
%% TODO: remove topic, GUID, etc. from the stored message.
|
||||
term_to_binary(emqx_message:to_map(Msg)).
|
||||
|
||||
deserialize(Bin) ->
|
||||
emqx_message:from_map(binary_to_term(Bin)).
|
||||
|
||||
|
||||
%% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok.
|
||||
%% store_cfs(DBHandle, CFRefs) ->
|
||||
%% lists:foreach(
|
||||
|
|
|
@ -30,7 +30,7 @@ start_link() ->
|
|||
%%================================================================================
|
||||
|
||||
init([]) ->
|
||||
Children = [shard_sup()],
|
||||
Children = [storage_layer_sup()],
|
||||
SupFlags = #{
|
||||
strategy => one_for_all,
|
||||
intensity => 0,
|
||||
|
@ -42,7 +42,7 @@ init([]) ->
|
|||
%% Internal functions
|
||||
%%================================================================================
|
||||
|
||||
shard_sup() ->
|
||||
storage_layer_sup() ->
|
||||
#{
|
||||
id => local_store_shard_sup,
|
||||
start => {emqx_ds_storage_layer_sup, start_link, []},
|
||||
|
|
|
@ -34,15 +34,15 @@ create_shard(Node, Shard, Opts) ->
|
|||
|
||||
-spec get_streams(node(), emqx_ds_replication_layer:shard(), emqx_ds:topic_filter(), emqx_ds:time()) ->
|
||||
[emqx_ds_replication_layer:stream()].
|
||||
get_streams(Shard, TopicFilter, Time) ->
|
||||
get_streams(Node, Shard, TopicFilter, Time) ->
|
||||
erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [Shard, TopicFilter, Time]).
|
||||
|
||||
-spec open_iterator(node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:stream(), emqx_ds:time()) ->
|
||||
{ok, emqx_ds_replication_layer:iterator()} | {error, _}.
|
||||
open_iterator(Node, Shard, Stream, StartTime) ->
|
||||
erpc:call(Node, emqx_ds_replication_layer, do_open_iterator_v1, [Shard, Stream, Time]).
|
||||
erpc:call(Node, emqx_ds_replication_layer, do_open_iterator_v1, [Shard, Stream, StartTime]).
|
||||
|
||||
-spec next(node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:iterator(), non_neg_integer()) ->
|
||||
-spec next(node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:iterator(), pos_integer()) ->
|
||||
{ok, emqx_ds_replication_layer:iterator(), [emqx_types:messages()]} | end_of_stream.
|
||||
next(Node, Shard, Iter, BatchSize) ->
|
||||
erpc:call(Node, emqx_ds_replication_layer, do_next_v1, [Shard, Iter, BatchSize]).
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("stdlib/include/assert.hrl").
|
||||
|
||||
|
@ -39,19 +40,24 @@ t_open(_Config) ->
|
|||
t_store(_Config) ->
|
||||
MessageID = emqx_guid:gen(),
|
||||
PublishedAt = 1000,
|
||||
Topic = [<<"foo">>, <<"bar">>],
|
||||
Topic = <<"foo/bar">>,
|
||||
Payload = <<"message">>,
|
||||
?assertMatch(ok, emqx_ds_storage_layer:store(?SHARD, MessageID, PublishedAt, Topic, Payload)).
|
||||
Msg = #message{
|
||||
id = MessageID,
|
||||
topic = Topic,
|
||||
payload = Payload,
|
||||
timestamp = PublishedAt
|
||||
},
|
||||
?assertMatch({ok, [_]}, emqx_ds_storage_layer:message_store(?SHARD, [Msg], #{})).
|
||||
|
||||
%% Smoke test for iteration through a concrete topic
|
||||
t_iterate(_Config) ->
|
||||
%% Prepare data:
|
||||
Topics = [[<<"foo">>, <<"bar">>], [<<"foo">>, <<"bar">>, <<"baz">>], [<<"a">>]],
|
||||
Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>],
|
||||
Timestamps = lists:seq(1, 10),
|
||||
[
|
||||
emqx_ds_storage_layer:store(
|
||||
store(
|
||||
?SHARD,
|
||||
emqx_guid:gen(),
|
||||
PublishedAt,
|
||||
Topic,
|
||||
integer_to_binary(PublishedAt)
|
||||
|
@ -61,7 +67,7 @@ t_iterate(_Config) ->
|
|||
%% Iterate through individual topics:
|
||||
[
|
||||
begin
|
||||
{ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, {Topic, 0}),
|
||||
{ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, {parse_topic(Topic), 0}),
|
||||
Values = iterate(It),
|
||||
?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values)
|
||||
end
|
||||
|
@ -149,7 +155,7 @@ t_create_gen(_Config) ->
|
|||
Topics = ["foo/bar", "foo/bar/baz"],
|
||||
Timestamps = lists:seq(1, 100),
|
||||
[
|
||||
?assertEqual(ok, store(?SHARD, PublishedAt, Topic, <<>>))
|
||||
?assertMatch({ok, [_]}, store(?SHARD, PublishedAt, Topic, <<>>))
|
||||
|| Topic <- Topics, PublishedAt <- Timestamps
|
||||
].
|
||||
|
||||
|
@ -215,16 +221,24 @@ t_iterate_multigen_preserve_restore(_Config) ->
|
|||
emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID)
|
||||
).
|
||||
|
||||
store(Shard, PublishedAt, TopicL, Payload) when is_list(TopicL) ->
|
||||
store(Shard, PublishedAt, list_to_binary(TopicL), Payload);
|
||||
store(Shard, PublishedAt, Topic, Payload) ->
|
||||
ID = emqx_guid:gen(),
|
||||
emqx_ds_storage_layer:store(Shard, ID, PublishedAt, parse_topic(Topic), Payload).
|
||||
Msg = #message{
|
||||
id = ID,
|
||||
topic = Topic,
|
||||
timestamp = PublishedAt,
|
||||
payload = Payload
|
||||
},
|
||||
emqx_ds_storage_layer:message_store(Shard, [Msg], #{}).
|
||||
|
||||
iterate(DB, TopicFilter, StartTime) ->
|
||||
iterate(iterator(DB, TopicFilter, StartTime)).
|
||||
|
||||
iterate(It) ->
|
||||
case emqx_ds_storage_layer:next(It) of
|
||||
{ok, ItNext, [Payload]} ->
|
||||
{ok, ItNext, [#message{payload = Payload}]} ->
|
||||
[Payload | iterate(ItNext)];
|
||||
end_of_stream ->
|
||||
[]
|
||||
|
@ -234,8 +248,8 @@ iterate(end_of_stream, _N) ->
|
|||
{end_of_stream, []};
|
||||
iterate(It, N) ->
|
||||
case emqx_ds_storage_layer:next(It, N) of
|
||||
{ok, ItFinal, Payloads} ->
|
||||
{ItFinal, Payloads};
|
||||
{ok, ItFinal, Messages} ->
|
||||
{ItFinal, [Payload || #message{payload = Payload} <- Messages]};
|
||||
end_of_stream ->
|
||||
{end_of_stream, []}
|
||||
end.
|
||||
|
|
Loading…
Reference in New Issue