diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index ccacb3ee7..45ad2beab 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,11 +16,21 @@ -module(emqx_replay_message_storage). --export([open/2]). +%% API: +-export([open/2, close/1]). -export([store/5]). -export([make_iterator/3]). +%% Debug/troubleshooting: +-export([make_message_key/3, compute_topic_hash/1, hash/2, combine/3]). + +-export_type([db/0, iterator/0]). + +%%================================================================================ +%% Type declarations +%%================================================================================ + %% see rocksdb:db_options() -type options() :: proplists:proplist(). @@ -34,29 +44,64 @@ db :: rocksdb:db_handle() }). +-record(it, { + handle :: rocksdb:itr_handle(), + topic_filter :: emqx_topic:words(), + bitmask :: integer(), + start_time :: time() +}). + -opaque db() :: #db{}. --opaque iterator() :: _TODO. + +-opaque iterator() :: #it{}. + +%%================================================================================ +%% API funcions +%%================================================================================ -spec open(file:filename_all(), options()) -> {ok, db()} | {error, _TODO}. open(Filename, Options) -> - #db{ - db = rocksdb:open(Filename, Options) - }. + case rocksdb:open(Filename, [{create_if_missing, true}, Options]) of + {ok, Handle} -> + {ok, #db{db = Handle}}; + Error -> + Error + end. + +-spec close(db()) -> ok | {error, _}. +close(#db{db = DB}) -> + rocksdb:close(DB). -spec store(db(), emqx_guid:guid(), time(), topic(), binary()) -> ok. -store(DB, MessageID, PublishedAt, Topic, MessagePayload) -> +store(#db{db = DB}, MessageID, PublishedAt, Topic, MessagePayload) -> Key = make_message_key(MessageID, Topic, PublishedAt), Value = make_message_value(Topic, MessagePayload), rocksdb:put(DB, Key, Value, [{sync, true}]). --spec make_iterator(db(), emqx_topic:words(), time()) -> +-spec make_iterator(db(), emqx_topic:words(), time() | earliest) -> {ok, iterator()} | {error, invalid_start_time}. -make_iterator(DB, TopicFilter, StartTime) -> +make_iterator(#db{db = DBHandle}, TopicFilter, StartTime) -> + case rocksdb:iterator(DBHandle, []) of + {ok, ITHandle} -> + #it{ + handle = ITHandle, + topic_filter = TopicFilter, + start_time = StartTime, + bitmask = make_bitmask(TopicFilter) + }; + Err -> + Err + end. + +-spec next(iterator()) -> {value, binary()} | none. +next(It) -> error(noimpl). -%% +%%================================================================================ +%% Internal exports +%%================================================================================ -define(TOPIC_LEVELS_ENTROPY_BITS, [8, 8, 32, 16]). @@ -72,16 +117,53 @@ combine(TopicHash, PublishedAt, MessageID) -> compute_topic_hash(Topic) -> compute_topic_hash(Topic, ?TOPIC_LEVELS_ENTROPY_BITS, 0). -compute_topic_hash([], [Bits | BitsRest], Acc) -> - Hash = hash(<<"/">>, Bits), - compute_topic_hash([], BitsRest, Acc bsl Bits + Hash); +hash(Input, Bits) -> + % at most 32 bits + erlang:phash2(Input, 1 bsl Bits). + +-spec make_bitmask(emqx_topic:words()) -> integer(). +make_bitmask(TopicFilter) -> + make_bitmask(TopicFilter, ?TOPIC_LEVELS_ENTROPY_BITS). + +%%================================================================================ +%% Internal functions +%%================================================================================ + compute_topic_hash(LevelsRest, [Bits], Acc) -> Hash = hash(LevelsRest, Bits), Acc bsl Bits + Hash; +compute_topic_hash([], [Bits | BitsRest], Acc) -> + Hash = hash(<<"/">>, Bits), + compute_topic_hash([], BitsRest, Acc bsl Bits + Hash); compute_topic_hash([Level | LevelsRest], [Bits | BitsRest], Acc) -> Hash = hash(Level, Bits), compute_topic_hash(LevelsRest, BitsRest, Acc bsl Bits + Hash). -hash(Input, Bits) -> - % at most 32 bits - erlang:phash2(Input, 1 bsl Bits). +make_bitmask(LevelsRest, [Bits], Acc) -> + Hash = hash(LevelsRest, Bits), + Acc bsl Bits + Hash; +make_bitmask([], [Bits | BitsRest], Acc) -> + Hash = hash(<<"/">>, Bits), + make_bitmask([], BitsRest, Acc bsl Bits + Hash); +make_bitmask([Level | LevelsRest], [Bits | BitsRest], Acc) -> + Hash = case Level of + '+' -> + 0; + Bin when is_binary(Bin) -> + 1 bsl Bits - 1; + + Hash = hash(Level, Bits), + make_bitmask(LevelsRest, BitsRest, Acc bsl Bits + Hash). + +%% |123|345|678| +%% foo bar baz + +%% |123|000|678| - |123|fff|678| + +%% foo + baz + +%% |fff|000|fff| + +%% |123|000|678| + +%% |123|056|678| & |fff|000|fff| = |123|000|678|.