diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index 609b0139d..3f38b4030 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -23,9 +23,7 @@ %% Message persistence -export([ - persist/1, - serialize/1, - deserialize/1 + persist/1 ]). %% FIXME @@ -83,18 +81,9 @@ needs_persistence(Msg) -> not (emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg)). store_message(Msg) -> - ID = emqx_message:id(Msg), - Timestamp = emqx_guid:timestamp(ID), - Topic = emqx_topic:words(emqx_message:topic(Msg)), - emqx_ds_storage_layer:store(?DS_SHARD, ID, Timestamp, Topic, serialize(Msg)). + emqx_ds:message_store([Msg]). has_subscribers(#message{topic = Topic}) -> emqx_persistent_session_ds_router:has_any_route(Topic). %% - -serialize(Msg) -> - term_to_binary(emqx_message:to_map(Msg)). - -deserialize(Bin) -> - emqx_message:from_map(binary_to_term(Bin)). diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index ad6a07330..762478932 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -19,7 +19,7 @@ -export([create_db/2]). %% Message storage API: --export([message_store/3, message_store/2]). +-export([message_store/1, message_store/2, message_store/3]). %% Message replay API: -export([get_streams/3, open_iterator/2, next/2]). @@ -53,7 +53,7 @@ %% implementations for emqx_ds, so this type has to take this into %% account. -record(stream, - { shard :: emqx_ds:shard() + { shard :: emqx_ds_replication_layer:shard() , enc :: emqx_ds_replication_layer:stream() }). @@ -64,7 +64,7 @@ %% This record encapsulates the iterator entity from the replication %% level. -record(iterator, - { shard :: emqx_ds:shard() + { shard :: emqx_ds_replication_layer:shard() , enc :: enqx_ds_replication_layer:iterator() }). @@ -80,7 +80,9 @@ -type create_db_opts() :: #{}. --type message_id() :: binary(). +-type message_id() :: emqx_ds_replication_layer:message_id(). + +-define(DEFAULT_DB, <<"default">>). %%================================================================================ %% API funcions @@ -90,6 +92,11 @@ create_db(DB, Opts) -> emqx_ds_replication_layer:create_db(DB, Opts). +-spec message_store([emqx_types:message()]) -> + {ok, [message_id()]} | {error, _}. +message_store(Msgs) -> + message_store(?DEFAULT_DB, Msgs, #{}). + -spec message_store(db(), [emqx_types:message()], message_store_opts()) -> {ok, [message_id()]} | {error, _}. message_store(DB, Msgs, Opts) -> @@ -143,7 +150,7 @@ open_iterator(#stream{shard = Shard, enc = Stream}, StartTime) -> Err end. --spec next(iterator(), non_neg_integer()) -> {ok, iterator(), [emqx_types:message()]} | end_of_stream. +-spec next(iterator(), pos_integer()) -> {ok, iterator(), [emqx_types:message()]} | end_of_stream. next(#iterator{shard = Shard, enc = Iter0}, BatchSize) -> case emqx_ds_replication_layer:next(Shard, Iter0, BatchSize) of {ok, Iter, Batch} -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 9fe08e0a2..af6087188 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -28,11 +28,11 @@ %% internal exports: -export([ do_create_shard_v1/2, do_get_streams_v1/3, - do_open_iterator/3, + do_open_iterator_v1/3, do_next_v1/3 ]). --export_type([shard/0, stream/0, iterator/0]). +-export_type([shard/0, stream/0, iterator/0, message_id/0]). %%================================================================================ %% Type declarations @@ -44,6 +44,8 @@ -opaque iterator() :: emqx_ds_storage_layer:iterator(). +-type message_id() :: emqx_ds_storage_layer:message_id(). + %%================================================================================ %% API functions %%================================================================================ @@ -83,10 +85,18 @@ open_iterator(Shard, Stream, StartTime) -> Node = node_of_shard(Shard), emqx_ds_proto_v1:open_iterator(Node, Shard, Stream, StartTime). --spec next(shard(), iterator(), non_neg_integer()) -> +-spec next(shard(), iterator(), pos_integer()) -> {ok, iterator(), [emqx_types:message()]} | end_of_stream. next(Shard, Iter, BatchSize) -> Node = node_of_shard(Shard), + %% TODO: iterator can contain information that is useful for + %% reconstructing messages sent over the network. For example, + %% when we send messages with the learned topic index, we could + %% send the static part of topic once, and append it to the + %% messages on the receiving node, hence saving some network. + %% + %% This kind of trickery should be probably done here in the + %% replication layer. Or, perhaps, in the logic lary. emqx_ds_proto_v1:next(Node, Shard, Iter, BatchSize). %%================================================================================ @@ -107,7 +117,7 @@ do_get_streams_v1(Shard, TopicFilter, StartTime) -> error({todo, Shard, TopicFilter, StartTime}). -spec do_open_iterator_v1(shard(), stream(), emqx_ds:time()) -> iterator(). -do_open_iterator_v1(Shard, Stream, Time) -> +do_open_iterator_v1(Shard, Stream, StartTime) -> error({todo, Shard, Stream, StartTime}). -spec do_next_v1(shard(), iterator(), non_neg_integer()) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 7a96cab51..f4dbbe6f4 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -10,7 +10,7 @@ -export([create_generation/3]). -export([get_streams/3]). --export([store/5]). +-export([message_store/3]). -export([delete/4]). -export([make_iterator/2, next/1, next/2]). @@ -33,11 +33,13 @@ -compile({inline, [meta_lookup/2]}). +-include_lib("emqx/include/emqx.hrl"). + %%================================================================================ %% Type declarations %%================================================================================ --opaque stream() :: {term()}. +-type stream() :: term(). %% Opaque term returned by the generation callback module -type options() :: #{ dir => file:filename() @@ -101,7 +103,7 @@ %% 3. `inplace_update_support`? -define(ITERATOR_CF_OPTS, []). --define(REF(Keyspace, ShardId), {via, gproc, {n, l, {?MODULE, Keyspace, ShardId}}}). +-define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}). %%================================================================================ %% Callbacks @@ -149,30 +151,34 @@ -spec start_link(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> {ok, pid()}. -start_link(Shard = {Keyspace, ShardId}, Options) -> - gen_server:start_link(?REF(Keyspace, ShardId), ?MODULE, {Shard, Options}, []). +start_link(Shard, Options) -> + gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []). --spec get_streams(emqx_ds:keyspace(), emqx_ds:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) -> [stream()]. -get_streams(KeySpace, TopicFilter, StartTime) -> - %% FIXME: messages can be potentially stored in multiple - %% generations. This function should return the results from all - %% of them! - %% Otherwise we could LOSE messages when generations are switched. - {GenId, #{module := Mod, }} = meta_lookup_gen(Shard, StartTime), +-spec get_streams(emqx_ds:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) -> [_Stream]. +get_streams(_ShardId, _TopicFilter, _StartTime) -> + []. -spec create_generation( emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config() ) -> {ok, gen_id()} | {error, nonmonotonic}. -create_generation({Keyspace, ShardId}, Since, Config = {_Module, _Options}) -> - gen_server:call(?REF(Keyspace, ShardId), {create_generation, Since, Config}). +create_generation(ShardId, Since, Config = {_Module, _Options}) -> + gen_server:call(?REF(ShardId), {create_generation, Since, Config}). --spec store(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic(), binary()) -> - ok | {error, _}. -store(Shard, GUID, Time, Topic, Msg) -> - {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time), - Mod:store(Data, GUID, Time, Topic, Msg). +-spec message_store(emqx_ds:shard(), [emqx_types:message()], emqx_ds:message_store_opts()) -> + {ok, _MessageId} | {error, _}. +message_store(Shard, Msgs, _Opts) -> + {ok, lists:map( + fun(Msg) -> + GUID = emqx_message:id(Msg), + Timestamp = Msg#message.timestamp, + {_GenId, #{module := Mod, data := ModState}} = meta_lookup_gen(Shard, Timestamp), + Topic = emqx_topic:words(emqx_message:topic(Msg)), + Payload = serialize(Msg), + Mod:store(ModState, GUID, Timestamp, Topic, Payload) + end, + Msgs)}. -spec delete(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic()) -> ok | {error, _}. @@ -212,7 +218,8 @@ do_next(It, N, Acc) when N =< 0 -> {ok, It, lists:reverse(Acc)}; do_next(It = #it{module = Mod, data = ItData}, N, Acc) -> case Mod:next(ItData) of - {value, Val, ItDataNext} -> + {value, Bin, ItDataNext} -> + Val = deserialize(Bin), do_next(It#it{data = ItDataNext}, N - 1, [Val | Acc]); {error, _} = _Error -> %% todo: log? @@ -663,6 +670,14 @@ is_gen_valid(Shard, GenId, Since) when GenId > 0 -> is_gen_valid(_Shard, 0, 0) -> ok. +serialize(Msg) -> + %% TODO: remove topic, GUID, etc. from the stored message. + term_to_binary(emqx_message:to_map(Msg)). + +deserialize(Bin) -> + emqx_message:from_map(binary_to_term(Bin)). + + %% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok. %% store_cfs(DBHandle, CFRefs) -> %% lists:foreach( diff --git a/apps/emqx_durable_storage/src/emqx_ds_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_sup.erl index ca939e892..d371a2346 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_sup.erl @@ -30,7 +30,7 @@ start_link() -> %%================================================================================ init([]) -> - Children = [shard_sup()], + Children = [storage_layer_sup()], SupFlags = #{ strategy => one_for_all, intensity => 0, @@ -42,7 +42,7 @@ init([]) -> %% Internal functions %%================================================================================ -shard_sup() -> +storage_layer_sup() -> #{ id => local_store_shard_sup, start => {emqx_ds_storage_layer_sup, start_link, []}, diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index f5d802003..79285fe16 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -34,15 +34,15 @@ create_shard(Node, Shard, Opts) -> -spec get_streams(node(), emqx_ds_replication_layer:shard(), emqx_ds:topic_filter(), emqx_ds:time()) -> [emqx_ds_replication_layer:stream()]. -get_streams(Shard, TopicFilter, Time) -> +get_streams(Node, Shard, TopicFilter, Time) -> erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [Shard, TopicFilter, Time]). -spec open_iterator(node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:stream(), emqx_ds:time()) -> {ok, emqx_ds_replication_layer:iterator()} | {error, _}. open_iterator(Node, Shard, Stream, StartTime) -> - erpc:call(Node, emqx_ds_replication_layer, do_open_iterator_v1, [Shard, Stream, Time]). + erpc:call(Node, emqx_ds_replication_layer, do_open_iterator_v1, [Shard, Stream, StartTime]). --spec next(node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:iterator(), non_neg_integer()) -> +-spec next(node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:iterator(), pos_integer()) -> {ok, emqx_ds_replication_layer:iterator(), [emqx_types:messages()]} | end_of_stream. next(Node, Shard, Iter, BatchSize) -> erpc:call(Node, emqx_ds_replication_layer, do_next_v1, [Shard, Iter, BatchSize]). diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl index 10596e216..981f1062a 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl @@ -6,6 +6,7 @@ -compile(export_all). -compile(nowarn_export_all). +-include_lib("emqx/include/emqx.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("stdlib/include/assert.hrl"). @@ -39,19 +40,24 @@ t_open(_Config) -> t_store(_Config) -> MessageID = emqx_guid:gen(), PublishedAt = 1000, - Topic = [<<"foo">>, <<"bar">>], + Topic = <<"foo/bar">>, Payload = <<"message">>, - ?assertMatch(ok, emqx_ds_storage_layer:store(?SHARD, MessageID, PublishedAt, Topic, Payload)). + Msg = #message{ + id = MessageID, + topic = Topic, + payload = Payload, + timestamp = PublishedAt + }, + ?assertMatch({ok, [_]}, emqx_ds_storage_layer:message_store(?SHARD, [Msg], #{})). %% Smoke test for iteration through a concrete topic t_iterate(_Config) -> %% Prepare data: - Topics = [[<<"foo">>, <<"bar">>], [<<"foo">>, <<"bar">>, <<"baz">>], [<<"a">>]], + Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>], Timestamps = lists:seq(1, 10), [ - emqx_ds_storage_layer:store( + store( ?SHARD, - emqx_guid:gen(), PublishedAt, Topic, integer_to_binary(PublishedAt) @@ -61,7 +67,7 @@ t_iterate(_Config) -> %% Iterate through individual topics: [ begin - {ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, {Topic, 0}), + {ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, {parse_topic(Topic), 0}), Values = iterate(It), ?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values) end @@ -149,7 +155,7 @@ t_create_gen(_Config) -> Topics = ["foo/bar", "foo/bar/baz"], Timestamps = lists:seq(1, 100), [ - ?assertEqual(ok, store(?SHARD, PublishedAt, Topic, <<>>)) + ?assertMatch({ok, [_]}, store(?SHARD, PublishedAt, Topic, <<>>)) || Topic <- Topics, PublishedAt <- Timestamps ]. @@ -215,16 +221,24 @@ t_iterate_multigen_preserve_restore(_Config) -> emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID) ). +store(Shard, PublishedAt, TopicL, Payload) when is_list(TopicL) -> + store(Shard, PublishedAt, list_to_binary(TopicL), Payload); store(Shard, PublishedAt, Topic, Payload) -> ID = emqx_guid:gen(), - emqx_ds_storage_layer:store(Shard, ID, PublishedAt, parse_topic(Topic), Payload). + Msg = #message{ + id = ID, + topic = Topic, + timestamp = PublishedAt, + payload = Payload + }, + emqx_ds_storage_layer:message_store(Shard, [Msg], #{}). iterate(DB, TopicFilter, StartTime) -> iterate(iterator(DB, TopicFilter, StartTime)). iterate(It) -> case emqx_ds_storage_layer:next(It) of - {ok, ItNext, [Payload]} -> + {ok, ItNext, [#message{payload = Payload}]} -> [Payload | iterate(ItNext)]; end_of_stream -> [] @@ -234,8 +248,8 @@ iterate(end_of_stream, _N) -> {end_of_stream, []}; iterate(It, N) -> case emqx_ds_storage_layer:next(It, N) of - {ok, ItFinal, Payloads} -> - {ItFinal, Payloads}; + {ok, ItFinal, Messages} -> + {ItFinal, [Payload || #message{payload = Payload} <- Messages]}; end_of_stream -> {end_of_stream, []} end.