diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index 45ad2beab..e867ad850 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -21,6 +21,7 @@ -export([store/5]). -export([make_iterator/3]). +-export([next/1]). %% Debug/troubleshooting: -export([make_message_key/3, compute_topic_hash/1, hash/2, combine/3]). @@ -41,13 +42,15 @@ -type time() :: integer(). -record(db, { - db :: rocksdb:db_handle() + handle :: rocksdb:db_handle() }). -record(it, { handle :: rocksdb:itr_handle(), + next_action :: {seek, binary()} | next, topic_filter :: emqx_topic:words(), - bitmask :: integer(), + hash_filter :: integer(), + hash_bitmask :: integer(), start_time :: time() }). @@ -64,40 +67,56 @@ open(Filename, Options) -> case rocksdb:open(Filename, [{create_if_missing, true}, Options]) of {ok, Handle} -> - {ok, #db{db = Handle}}; + {ok, #db{handle = Handle}}; Error -> Error end. -spec close(db()) -> ok | {error, _}. -close(#db{db = DB}) -> +close(#db{handle = DB}) -> rocksdb:close(DB). -spec store(db(), emqx_guid:guid(), time(), topic(), binary()) -> - ok. -store(#db{db = DB}, MessageID, PublishedAt, Topic, MessagePayload) -> + ok | {error, _TODO}. +store(#db{handle = DB}, MessageID, PublishedAt, Topic, MessagePayload) -> Key = make_message_key(MessageID, Topic, PublishedAt), Value = make_message_value(Topic, MessagePayload), rocksdb:put(DB, Key, Value, [{sync, true}]). -spec make_iterator(db(), emqx_topic:words(), time() | earliest) -> - {ok, iterator()} | {error, invalid_start_time}. -make_iterator(#db{db = DBHandle}, TopicFilter, StartTime) -> + % {error, invalid_start_time}? might just start from the beginning of time + % and call it a day: client violated the contract anyway. + {ok, iterator()} | {error, _TODO}. 
+make_iterator(#db{handle = DBHandle}, TopicFilter, StartTime) -> case rocksdb:iterator(DBHandle, []) of {ok, ITHandle} -> + Hash = compute_topic_hash(TopicFilter), + HashBitmask = make_bitmask(TopicFilter), + HashFilter = Hash band HashBitmask, #it{ handle = ITHandle, + next_action = {seek, combine(HashFilter, StartTime, <<>>)}, topic_filter = TopicFilter, start_time = StartTime, - bitmask = make_bitmask(TopicFilter) + hash_filter = HashFilter, + hash_bitmask = HashBitmask }; Err -> Err end. --spec next(iterator()) -> {value, binary()} | none. -next(It) -> - error(noimpl). +-spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}. +next(It = #it{next_action = Action}) -> + case rocksdb:iterator_move(It#it.handle, Action) of + % spec says `{ok, Key}` is also possible but the implementation says it's not + {ok, Key, Value} -> + {TopicHash, PublishedAt} = extract(Key), + match_next(It, TopicHash, PublishedAt, Value); + {error, invalid_iterator} -> + stop_iteration(It); + {error, iterator_closed} -> + {error, closed} + end. %%================================================================================ %% Internal exports @@ -111,9 +130,15 @@ make_message_key(MessageID, Topic, PublishedAt) -> make_message_value(Topic, MessagePayload) -> term_to_binary({Topic, MessagePayload}). +unwrap_message_value(Binary) -> + binary_to_term(Binary). + combine(TopicHash, PublishedAt, MessageID) -> <>. +extract(<>) -> + {TopicHash, PublishedAt}. + compute_topic_hash(Topic) -> compute_topic_hash(Topic, ?TOPIC_LEVELS_ENTROPY_BITS, 0). @@ -123,7 +148,7 @@ hash(Input, Bits) -> -spec make_bitmask(emqx_topic:words()) -> integer(). make_bitmask(TopicFilter) -> - make_bitmask(TopicFilter, ?TOPIC_LEVELS_ENTROPY_BITS). + make_bitmask(TopicFilter, ?TOPIC_LEVELS_ENTROPY_BITS, 0). 
%%================================================================================ %% Internal functions @@ -139,21 +164,21 @@ compute_topic_hash([Level | LevelsRest], [Bits | BitsRest], Acc) -> Hash = hash(Level, Bits), compute_topic_hash(LevelsRest, BitsRest, Acc bsl Bits + Hash). -make_bitmask(LevelsRest, [Bits], Acc) -> - Hash = hash(LevelsRest, Bits), - Acc bsl Bits + Hash; +make_bitmask(['#'], BitsPerLevel, Acc) -> + Acc bsl lists:sum(BitsPerLevel) + 0; +make_bitmask(['+' | LevelsRest], [Bits | BitsRest], Acc) -> + make_bitmask(LevelsRest, BitsRest, Acc bsl Bits + 0); +make_bitmask(_, [Bits], Acc) -> + Acc bsl Bits + ones(Bits); make_bitmask([], [Bits | BitsRest], Acc) -> - Hash = hash(<<"/">>, Bits), - make_bitmask([], BitsRest, Acc bsl Bits + Hash); -make_bitmask([Level | LevelsRest], [Bits | BitsRest], Acc) -> - Hash = case Level of - '+' -> - 0; - Bin when is_binary(Bin) -> - 1 bsl Bits - 1; + make_bitmask([], BitsRest, Acc bsl Bits + ones(Bits)); +make_bitmask([_ | LevelsRest], [Bits | BitsRest], Acc) -> + make_bitmask(LevelsRest, BitsRest, Acc bsl Bits + ones(Bits)); +make_bitmask(_, [], Acc) -> + Acc. - Hash = hash(Level, Bits), - make_bitmask(LevelsRest, BitsRest, Acc bsl Bits + Hash). +ones(Bits) -> + 1 bsl Bits - 1. %% |123|345|678| %% foo bar baz @@ -167,3 +192,223 @@ make_bitmask([Level | LevelsRest], [Bits | BitsRest], Acc) -> %% |123|000|678| %% |123|056|678| & |fff|000|fff| = |123|000|678|. 

%% Worked examples of the "next seek" computation. `***` marks a wildcard
%% level; a leading `1` on the seek value is a carry out of the most
%% significant level, which means end of stream (eos).
%%
%% Filter = |123|***|678|
%% Key1   = |123|011|108| → Seek =  |123|011|678|
%% Key2   = |123|011|679| → Seek =  |123|012|678|
%% Key3   = |123|999|679| → Seek = 1|123|000|678| → eos
%%
%% Filter = |123|***|678|***|
%% Key1   = |123|011|108|121| → Seek =  |123|011|678|000|
%% Key2   = |123|011|679|919| → Seek =  |123|012|678|000|
%% Key3   = |123|999|679|001| → Seek = 1|123|000|678|000| → eos
%% Key4   = |125|999|179|017| → Seek = 1|123|000|678|000| → eos

%% Decide what to do with the record the iterator is currently positioned at.
%%
%% * Hash matches the filter (under the bitmask) and the publish time is not
%%   before the start time: decode the value and match the stored topic
%%   against the topic filter. Returns `{value, Payload, Iterator}` on a
%%   match, otherwise moves on to the following key.
%% * Hash matches but the record is too old: seek to the start time within
%%   the same topic hash.
%% * Hash does not match: seek to the next hash that can match, or close the
%%   iterator and return `none` if there is no such hash.
%%
%% NOTE(review): `make_iterator/3` is specced to accept `earliest` as the
%% start time. For an integer `PublishedAt`, `PublishedAt >= earliest` is
%% always false in Erlang term order (numbers sort before atoms) -- confirm
%% that callers never pass the atom through to here.
match_next(
    It = #it{
        topic_filter = TopicFilter,
        hash_filter = HashFilter,
        hash_bitmask = HashBitmask,
        start_time = StartTime
    },
    TopicHash,
    PublishedAt,
    Value
) ->
    HashMatches = (TopicHash band HashBitmask) =:= HashFilter,
    TimeMatches = PublishedAt >= StartTime,
    case {HashMatches, TimeMatches} of
        {true, true} ->
            % The hash comparison alone is not treated as conclusive: the
            % stored topic itself is matched against the filter.
            {Topic, MessagePayload} = unwrap_message_value(Value),
            case emqx_topic:match(Topic, TopicFilter) of
                true ->
                    {value, MessagePayload, It#it{next_action = next}};
                false ->
                    next(It#it{next_action = next})
            end;
        {true, false} ->
            SeekKey = combine(TopicHash, StartTime, <<>>),
            next(It#it{next_action = {seek, SeekKey}});
        {false, _} ->
            case compute_next_seek(TopicHash, HashFilter, HashBitmask) of
                none ->
                    stop_iteration(It);
                NextHash when is_integer(NextHash) ->
                    SeekKey = combine(NextHash, StartTime, <<>>),
                    next(It#it{next_action = {seek, SeekKey}})
            end
    end.

%% Close the underlying RocksDB iterator and report the end of the stream.
%% Crashes with `badmatch` if closing does not return `ok`.
stop_iteration(It) ->
    ok = rocksdb:iterator_close(It#it.handle),
    none.

%% Same as compute_next_seek/4 with the module-wide per-level bit widths.
compute_next_seek(TopicHash, HashFilter, HashBitmask) ->
    compute_next_seek(TopicHash, HashFilter, HashBitmask, ?TOPIC_LEVELS_ENTROPY_BITS).

%% Compute the smallest topic hash greater than `TopicHash` that agrees with
%% `HashFilter` on every level selected by `HashBitmask`, or `none` if the
%% increment carries out of the most significant level.
%%
%% This is an _increment operation_ on a number written in a variable
%% bit-width base. Each entry of `BitsPerLevel` is one "digit":
%%   * a level whose mask is `0` is a free digit of that many bits;
%%   * a level whose mask is non-zero is a fixed digit with exactly one
%%     allowed value, the corresponding bits of `HashFilter`.
%% Digits are visited from the least significant level to the most
%% significant one, threading `{Carry, Acc}` through, starting from
%% `{1, 0}` (i.e. "add one").
%% TODO make at least remotely readable / optimize later
compute_next_seek(TopicHash, HashFilter, HashBitmask, BitsPerLevel) ->
    Step = fun(LevelHash, Filter, LevelMask, Bits, Shift, {Carry, Acc}) ->
        case LevelMask of
            0 when Carry =:= 0 ->
                % Free digit, nothing to add: keep it as it is.
                {0, Acc + (LevelHash bsl Shift)};
            0 ->
                % Free digit absorbing the carry. Whatever was accumulated
                % for the less significant levels is dropped here; fixed
                % digits are put back by the final `bor HashFilter`.
                Incremented = LevelHash + Carry,
                {Incremented bsr Bits, (Incremented band ones(Bits)) bsl Shift};
            _ when (LevelHash + Carry) =:= Filter ->
                % Fixed digit that lands exactly on its only allowed value.
                {0, Acc + (Filter bsl Shift)};
            _ when (LevelHash + Carry) > Filter ->
                % Fixed digit already past its allowed value: pin it and
                % carry into the next more significant level.
                {1, Filter bsl Shift};
            _ ->
                % Fixed digit below its allowed value: jump forward to it.
                {0, Filter bsl Shift}
        end
    end,
    case zipfoldr3(Step, {1, 0}, TopicHash, HashFilter, HashBitmask, BitsPerLevel) of
        {_TotalBits, {0, Next}} ->
            Next bor HashFilter;
        {_TotalBits, {1, _}} ->
            % we got "carried away" past the range, time to stop iteration
            none
    end.

%% Right fold over three integers split into the same sequence of bit fields.
%%
%% `BitsPerLevel` lists the field widths, most significant field first. The
%% recursion descends to the end of the list first, so `FoldFun` is applied
%% to the least significant field first, as
%% `FoldFun(Field1, Field2, Field3, Bits, Shift, Acc)`, where `Shift` is the
%% bit offset of the field from the right.
%% Returns `{TotalBits, FinalAcc}`.
zipfoldr3(_FoldFun, Acc, _, _, _, []) ->
    {0, Acc};
zipfoldr3(FoldFun, Acc, I1, I2, I3, [Bits | Rest]) ->
    {Shift, AccNext} = zipfoldr3(FoldFun, Acc, I1, I2, I3, Rest),
    FieldMask = ones(Bits),
    {
        Shift + Bits,
        FoldFun(
            (I1 bsr Shift) band FieldMask,
            (I2 bsr Shift) band FieldMask,
            (I3 bsr Shift) band FieldMask,
            Bits,
            Shift,
            AccNext
        )
    }.

-ifdef(TEST).

-include_lib("eunit/include/eunit.hrl").

%% Build a bitmask using small, test-only level widths (3, 4, 5 and 2 bits)
%% so that the expected values can be written out in binary.
make_test_bitmask(TopicFilter) ->
    make_bitmask(TopicFilter, [3, 4, 5, 2], 0).

%% Concrete levels contribute all-ones fields, `+` contributes zeros, and
%% levels missing from the filter are filled with ones.
bitmask_test_() ->
    [
        ?_assertEqual(
            2#111_1111_11111_11,
            make_test_bitmask([<<"foo">>, <<"bar">>])
        ),
        ?_assertEqual(
            2#111_0000_11111_11,
            make_test_bitmask([<<"foo">>, '+'])
        ),
        ?_assertEqual(
            2#111_0000_00000_11,
            make_test_bitmask([<<"foo">>, '+', '+'])
        ),
        ?_assertEqual(
            2#111_0000_11111_00,
            make_test_bitmask([<<"foo">>, '+', <<"bar">>, '+'])
        )
    ].

%% `#` zeroes out every remaining level; once the filter is deeper than the
%% number of hashed levels, the last field stays all-ones.
wildcard_bitmask_test_() ->
    [
        ?_assertEqual(
            2#000_0000_00000_00,
            make_test_bitmask(['#'])
        ),
        ?_assertEqual(
            2#111_0000_00000_00,
            make_test_bitmask([<<"foo">>, '#'])
        ),
        ?_assertEqual(
            2#111_1111_11111_00,
            make_test_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, '#'])
        ),
        ?_assertEqual(
            2#111_1111_11111_11,
            make_test_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, <<>>, '#'])
        )
    ].

%% Schematic of the cases in next_seek_test_/0 (a leading `1` is a carry out
%% of the most significant level, i.e. end of stream):
%%
%% Filter = |123|***|678|***|
%% Mask   = |fff|000|fff|000|
%% Key1   = |123|011|108|121| → Seek = 0 |123|011|678|000|
%% Key2   = |123|011|679|919| → Seek = 0 |123|012|678|000|
%% Key3   = |123|999|679|001| → Seek = 1 |123|000|678|000| → eos
%% Key4   = |125|011|179|017| → Seek = 1 |123|000|678|000| → eos

%% Run compute_next_seek/4 with test-only level widths of 8, 8, 16 and 12
%% bits, which line up with the hex digit groups used in the literals below.
compute_test_next_seek(TopicHash, HashFilter, HashBitmask) ->
    compute_next_seek(TopicHash, HashFilter, HashBitmask, [8, 8, 16, 12]).

%% The expected results are concrete values rather than patterns, so they
%% are compared with ?_assertEqual, like the bitmask tests above.
next_seek_test_() ->
    [
        ?_assertEqual(
            16#FD_11_0678_000,
            compute_test_next_seek(
                16#FD_11_0108_121,
                16#FD_00_0678_000,
                16#FF_00_FFFF_000
            )
        ),
        ?_assertEqual(
            16#FD_12_0678_000,
            compute_test_next_seek(
                16#FD_11_0679_919,
                16#FD_00_0678_000,
                16#FF_00_FFFF_000
            )
        ),
        ?_assertEqual(
            none,
            compute_test_next_seek(
                16#FD_FF_0679_001,
                16#FD_00_0678_000,
                16#FF_00_FFFF_000
            )
        ),
        ?_assertEqual(
            none,
            compute_test_next_seek(
                16#FE_11_0179_017,
                16#FD_00_0678_000,
                16#FF_00_FFFF_000
            )
        )
    ].

-endif.