%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ds_lts).

%% API:
-export([
    trie_create/1, trie_create/0,
    destroy/1,
    trie_dump/2,
    trie_restore/2,
    trie_update/2,
    trie_copy_learned_paths/2,
    topic_key/3,
    match_topics/2,
    lookup_topic_key/2,
    reverse_lookup/2,
    info/2,
    info/1,

    compress_topic/3,
    decompress_topic/2
]).

%% Debug:
-export([trie_next/3, trie_insert/3, dump_to_dot/2]).

-export_type([
    options/0,
    level/0,
    static_key/0,
    trie/0,
    msg_storage_key/0,
    learned_structure/0
]).

%% Needed for `ets:fun2ms/1':
-include_lib("stdlib/include/ms_transform.hrl").

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-elvis([{elvis_style, variable_naming_convention, disable}]).
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
-endif.

%%================================================================================
%% Type declarations
%%================================================================================

%% End Of Topic
-define(EOT, []).
%% Edge label of a wildcard transition in the trie:
-define(PLUS, '+').

%% A single topic level:
-type level() :: binary() | ''.

%% Label of a transition between two trie states:
-type edge() :: level() | ?EOT | ?PLUS.

%% Fixed size binary or integer, depending on the options:
-type static_key() :: non_neg_integer() | binary().

%% Trie root:
-define(PREFIX, prefix).
%% Special prefix root for reverse lookups:
-define(rlookup, rlookup).
%% Key of a reverse lookup record for the given static key:
-define(rlookup(STATIC), {?rlookup, STATIC}).

-type state() :: static_key() | ?PREFIX.

-type varying() :: [level() | ?PLUS].

-type msg_storage_key() :: {static_key(), varying()}.

-type threshold_fun() :: fun((non_neg_integer()) -> non_neg_integer()).

-type persist_callback() :: fun((_Key, _Val) -> ok).

-type learned_structure() :: [level() | ?PLUS, ...].

-type options() ::
    #{
        persist_callback => persist_callback(),
        %% If set, static key is an integer that fits in a given number of bits:
        static_key_bits => pos_integer(),
        %% If set, static key is a _binary_ of a given length:
        static_key_bytes => pos_integer(),
        reverse_lookups => boolean()
    }.

-type dump() :: [{_Key, _Val}].

-record(trie, {
    persist :: persist_callback(),
    is_binary_key :: boolean(),
    static_key_size :: pos_integer(),
    trie :: ets:tid(),
    stats :: ets:tid(),
    rlookups = false :: boolean()
}).

-opaque trie() :: #trie{}.

-record(trans, {key, next}).

-type trans() ::
    #trans{
        key :: {state(), edge()},
        next :: state()
    }
    | #trans{
        key :: {?rlookup, static_key()},
        next :: [level() | ?PLUS]
    }.

%%================================================================================
%% API functions
%%================================================================================

%% @doc Create an empty trie.
%%
%% Recognized options:
%% <ul>
%% <li>`persist_callback': called with `(Key, Value)' for every newly
%% inserted transition; defaults to a no-op.</li>
%% <li>`static_key_bits': static keys are integers of the given bit size.</li>
%% <li>`static_key_bytes': static keys are binaries of the given byte size.</li>
%% <li>`reverse_lookups': whether reverse lookup records are maintained;
%% defaults to `false'.</li>
%% </ul>
%%
%% When neither key size option is given, static keys are 16-byte
%% binaries. `static_key_bits' takes precedence when both are present.
-spec trie_create(options()) -> trie().
trie_create(UserOpts) ->
    PersistCallback = maps:get(persist_callback, UserOpts, fun(_, _) -> ok end),
    ReverseLookups = maps:get(reverse_lookups, UserOpts, false),
    {IsBinaryKey, StaticKeySize} =
        case UserOpts of
            #{static_key_bits := NBits} -> {false, NBits};
            #{static_key_bytes := NBytes} -> {true, NBytes};
            _ -> {true, 16}
        end,
    %% Transitions are keyed by `{State, Edge}'; the stats table holds
    %% `{State, Counter}' tuples:
    TransTab = ets:new(trie, [{keypos, #trans.key}, set, public]),
    StatsTab = ets:new(stats, [{keypos, 1}, set, public]),
    #trie{
        persist = PersistCallback,
        is_binary_key = IsBinaryKey,
        static_key_size = StaticKeySize,
        trie = TransTab,
        stats = StatsTab,
        rlookups = ReverseLookups
    }.

%% @doc Create an empty trie with the default options.
-spec trie_create() -> trie().
trie_create() ->
    trie_create(#{}).

-spec destroy(trie()) -> ok.
%% Delete both ETS tables backing the trie. Failures (e.g. a table
%% that is already gone) are deliberately ignored.
destroy(#trie{trie = TransTab, stats = StatsTab}) ->
    lists:foreach(
        fun(Tab) -> catch ets:delete(Tab) end,
        [TransTab, StatsTab]
    ),
    ok.

%% @doc Restore trie from a dump: create a fresh trie with the given
%% options and replay the dump into it.
-spec trie_restore(options(), dump()) -> trie().
trie_restore(Options, Dump) ->
    Fresh = trie_create(Options),
    trie_update(Fresh, Dump).

%% @doc Update a trie with a dump of operations (used for replication).
%% Every `{{StateFrom, Token}, StateTo}' element is inserted as a
%% transition; the same trie is returned.
-spec trie_update(trie(), dump()) -> trie().
trie_update(Trie, Dump) ->
    Replay = fun({{From, Edge}, To}) -> trie_insert(Trie, From, Edge, To) end,
    lists:foreach(Replay, Dump),
    Trie.

%% @doc Dump the contents of the trie as a list of `{Key, Value}' pairs.
%%
%% With the `all' filter every path and every reverse lookup record is
%% included; with `wildcard' only those that contain a `+' token.
%% Transitions shared by several paths appear once per path.
-spec trie_dump(trie(), _Filter :: all | wildcard) -> dump().
trie_dump(Trie, Filter) ->
    Keep =
        case Filter of
            all -> fun(_) -> true end;
            wildcard -> fun(Tokens) -> lists:member(?PLUS, Tokens) end
        end,
    Paths = [Path || Path <- paths(Trie), Keep(tokens_of_path(Path))],
    RlookupIdx = [
        Entry
     || {_, Tokens} = Entry <- all_emanating(Trie, ?rlookup), Keep(Tokens)
    ],
    %% Each path is a flat list of transitions, so concatenating the
    %% paths yields the flat dump:
    lists:append(Paths) ++ RlookupIdx.

%% @doc Copy all paths and reverse lookup records that contain a
%% wildcard from `OldTrie' into `NewTrie'. Returns `NewTrie'.
-spec trie_copy_learned_paths(trie(), trie()) -> trie().
trie_copy_learned_paths(OldTrie, NewTrie) ->
    trie_update(NewTrie, trie_dump(OldTrie, wildcard)).

%% @doc Lookup the topic key. Create a new one, if not found.
-spec topic_key(trie(), threshold_fun(), [level()]) -> msg_storage_key().
topic_key(Trie, ThresholdFun, Tokens) ->
    %% Start at depth 0 from the trie root with empty accumulators:
    do_topic_key(Trie, ThresholdFun, 0, ?PREFIX, Tokens, [], []).

%% @doc Return an existing topic key if it exists.
-spec lookup_topic_key(trie(), [level()]) -> {ok, msg_storage_key()} | undefined.
lookup_topic_key(Trie, Tokens) ->
    do_lookup_topic_key(Trie, ?PREFIX, Tokens, []).

%% @doc Return list of keys of topics that match a given topic filter
-spec match_topics(trie(), [level() | '+' | '#']) ->
    [msg_storage_key()].
match_topics(Trie, TopicFilter) ->
    do_match_topics(Trie, ?PREFIX, [], TopicFilter).

%% @doc Dump trie to graphviz format for debugging
-spec dump_to_dot(trie(), file:filename()) -> ok.
%% Writes one graph node per trie state (labelled with its counter
%% from the stats table) and one graph edge per transition.
%%
%% Reverse lookup records live in the same table as transitions, but
%% they are not part of the graph: their "source" is the `rlookup'
%% atom and their "target" is a token list, neither of which can be
%% printed as a node. They are skipped.
dump_to_dot(#trie{trie = Trie, stats = Stats}, Filename) ->
    {Nodes0, Edges} =
        lists:foldl(
            fun
                (#trans{key = {?rlookup, _StaticKey}}, Acc) ->
                    Acc;
                (#trans{key = {From, Label}, next = To}, {AccN, AccEdge}) ->
                    {[From, To | AccN], [{From, To, Label} | AccEdge]}
            end,
            {[], []},
            ets:tab2list(Trie)
        ),
    Nodes =
        lists:map(
            fun(Node) ->
                case ets:lookup(Stats, Node) of
                    [{_, NChildren}] -> {Node, NChildren};
                    [] -> {Node, 0}
                end
            end,
            lists:usort(Nodes0)
        ),
    Print = fun
        (?PREFIX) -> "prefix";
        (Bin) when is_binary(Bin) -> Bin;
        (NodeId) when is_integer(NodeId) -> integer_to_binary(NodeId, 16)
    end,
    {ok, FD} = file:open(Filename, [write]),
    %% Make sure the file is closed even if formatting fails:
    try
        io:format(FD, "digraph {~n", []),
        lists:foreach(
            fun({Node, NChildren}) ->
                Id = Print(Node),
                io:format(FD, "   \"~s\" [label=\"~s : ~p\"];~n", [Id, Id, NChildren])
            end,
            Nodes
        ),
        lists:foreach(
            fun({From, To, Label}) ->
                io:format(FD, "   \"~s\" -> \"~s\" [label=\"~s\"];~n", [
                    Print(From), Print(To), Label
                ])
            end,
            Edges
        ),
        io:format(FD, "}~n", [])
    after
        file:close(FD)
    end.

%% @doc Return the topic structure that was recorded for the static
%% key, or `undefined' when there is no record for it.
%%
%% Raises `error({badarg, reverse_lookups_disabled})' when the trie
%% was created without `reverse_lookups'.
-spec reverse_lookup(trie(), static_key()) -> {ok, learned_structure()} | undefined.
reverse_lookup(#trie{rlookups = false}, _) ->
    error({badarg, reverse_lookups_disabled});
reverse_lookup(#trie{trie = Trie}, StaticKey) ->
    case ets:lookup(Trie, ?rlookup(StaticKey)) of
        [#trans{next = Next}] ->
            {ok, Next};
        [] ->
            undefined
    end.

%% @doc Get information about the trie.
%%
%% `size' is the number of records in the trie table. `topics' is the
%% counter kept for reverse lookup records.
%%
%% Note: `reverse_lookups' must be enabled to get the number of
%% topics; otherwise `undefined' is returned.
-spec info(trie(), size | topics) -> _.
info(#trie{rlookups = true, stats = Stats}, topics) ->
    case ets:lookup(Stats, ?rlookup) of
        [{_, N}] -> N;
        [] -> 0
    end;
info(#trie{}, topics) ->
    undefined;
info(#trie{trie = T}, size) ->
    ets:info(T, size).

%% @doc Return all available information about the trie (`size' and
%% `topics') as a proplist.
-spec info(trie()) -> proplists:proplist().
info(Trie) ->
    [
        {size, info(Trie, size)},
        {topics, info(Trie, topics)}
    ].
%%%%%%%% Topic compression %%%%%%%%%%

%% @doc Given topic structure for the static LTS index (as returned by
%% `reverse_lookup'), compress a topic filter to exclude static
%% levels.
%%
%% Throws `{unrecoverable, Info}' when the filter does not fit the
%% structure.
-spec compress_topic(static_key(), learned_structure(), emqx_ds:topic_filter()) ->
    [emqx_ds_lts:level() | '+'].
compress_topic(StaticKey, TopicStructure, TopicFilter) ->
    Acc0 = [],
    compress_topic(StaticKey, TopicStructure, TopicFilter, Acc0).

%% @doc Given topic structure and a compressed topic filter, return
%% the original* topic filter.
%%
%% * '#' will be replaced with '+'s
-spec decompress_topic(learned_structure(), [level() | '+']) ->
    emqx_ds:topic_filter().
decompress_topic(TopicStructure, Topic) ->
    Acc0 = [],
    decompress_topic(TopicStructure, Topic, Acc0).

%%================================================================================
%% Internal exports
%%================================================================================

%% Follow a single edge from `State'. Returns `{IsWildcard, NextState}'
%% or `undefined' when there is no suitable transition. The `?EOT'
%% edge is only ever matched exactly.
-spec trie_next(trie(), state(), level() | ?EOT) -> {Wildcard, state()} | undefined when
    Wildcard :: boolean().
trie_next(#trie{trie = Tab}, State, Token) ->
    %% NOTE: it's crucial to return the original (non-wildcard) index
    %% for the topic, if found. Otherwise messages from the same topic
    %% will end up in different streams, once the wildcard is learned,
    %% and their replay order will become undefined:
    case ets:lookup(Tab, {State, Token}) of
        [#trans{next = Exact}] ->
            {false, Exact};
        [] when Token =:= ?EOT ->
            undefined;
        [] ->
            case ets:lookup(Tab, {State, ?PLUS}) of
                [#trans{next = ViaWildcard}] -> {true, ViaWildcard};
                [] -> undefined
            end
    end.

%% Insert the edge `Token' from `State', deriving the ID of the target
%% state from the `{State, Token}' pair.
-spec trie_insert(trie(), state(), edge()) -> {Updated, state()} when
    NChildren :: non_neg_integer(),
    Updated :: false | NChildren.
trie_insert(Trie, State, Token) ->
    NewState = get_id_for_key(Trie, State, Token),
    trie_insert(Trie, State, Token, NewState).
%%================================================================================
%% Internal functions
%%================================================================================

%% Insert a transition (or a reverse lookup record) unless one with
%% the same key already exists.
%%
%% On insertion the persist callback is invoked, the counter of the
%% source state is bumped (except for `?EOT' and `?PLUS' edges) and
%% `{Counter, NewState}' is returned. If the key is already present,
%% nothing is changed and `{false, ExistingNext}' is returned.
-spec trie_insert
    (trie(), state(), edge(), state()) -> {Updated, state()} when
        NChildren :: non_neg_integer(), Updated :: false | NChildren;
    (trie(), ?rlookup, static_key(), [level() | '+']) ->
        {false | non_neg_integer(), state()}.
trie_insert(#trie{trie = Trie, stats = Stats, persist = Persist}, State, Token, NewState) ->
    Key = {State, Token},
    Rec = #trans{
        key = Key,
        next = NewState
    },
    case ets_insert_new(Trie, Rec) of
        true ->
            ok = Persist(Key, NewState),
            Inc =
                case Token of
                    ?EOT -> 0;
                    ?PLUS -> 0;
                    _ -> 1
                end,
            NChildren = ets:update_counter(Stats, State, {2, Inc}, {State, 0}),
            {NChildren, NewState};
        false ->
            [#trans{next = NextState}] = ets:lookup(Trie, Key),
            {false, NextState}
    end.

%% Derive the ID of the state reached from `State' via `Token'. The
%% ID is a prefix of the SHA-256 hash of the pair: the first `Size'
%% bytes as a binary, or the first `Size' bits as an integer.
-spec get_id_for_key(trie(), state(), edge()) -> static_key().
get_id_for_key(#trie{is_binary_key = IsBin, static_key_size = Size}, State, Token) ->
    %% Requirements for the return value:
    %%
    %% It should be globally unique for the `{State, Token}` pair. Other
    %% than that, there's no requirements. The return value doesn't even
    %% have to be deterministic, since the states are saved in the trie.
    %% Yet, it helps a lot if it is, so that applying the same sequence
    %% of topics to different tries will result in the same trie state.
    %%
    %% The generated value becomes the ID of the topic in the durable
    %% storage. Its size should be relatively small to reduce the
    %% overhead of storing messages.
    %%
    %% If we want to impress computer science crowd, sorry, I mean to
    %% minimize storage requirements, we can even employ Huffman coding
    %% based on the frequency of messages.
    Hash = crypto:hash(sha256, term_to_binary([State | Token])),
    case IsBin of
        false ->
            %% Note: for backward compatibility with bitstream_lts
            %% layout we allow the key to be an integer. But this also
            %% changes the semantics of `static_key_size` from number
            %% of bytes to bits. The remainder is matched as a
            %% bitstring, so `Size' doesn't have to be a multiple of 8:
            <<Int:Size, _/bitstring>> = Hash,
            Int;
        true ->
            element(1, erlang:split_binary(Hash, Size))
    end.

%% Walk the trie along the topic filter and collect the storage keys
%% of all matching topics. `Varying' accumulates (in reverse) the
%% levels that correspond to wildcard edges of the trie.
%% erlfmt-ignore
-spec do_match_topics(trie(), state(), [level() | '+'], [level() | '+' | '#']) ->
          list().
do_match_topics(Trie, State, Varying, []) ->
    case trie_next(Trie, State, ?EOT) of
        {false, Static} -> [{Static, lists:reverse(Varying)}];
        undefined       -> []
    end;
do_match_topics(Trie, State, Varying, ['#']) ->
    Emanating = emanating(Trie, State, ?PLUS),
    lists:flatmap(
      fun
          ({?EOT, Static}) ->
              [{Static, lists:reverse(Varying)}];
          ({?PLUS, NextState}) ->
              do_match_topics(Trie, NextState, [?PLUS | Varying], ['#']);
          ({_, NextState}) ->
              do_match_topics(Trie, NextState, Varying, ['#'])
      end,
      Emanating
     );
do_match_topics(Trie, State, Varying, [Level | Rest]) ->
    Emanating = emanating(Trie, State, Level),
    lists:flatmap(
      fun
          ({?EOT, _NextState}) ->
              [];
          ({?PLUS, NextState}) ->
              do_match_topics(Trie, NextState, [Level | Varying], Rest);
          ({_, NextState}) ->
              do_match_topics(Trie, NextState, Varying, Rest)
      end,
      Emanating
     ).

%% Read-only counterpart of `do_topic_key': follow the existing
%% transitions and return `undefined' as soon as one is missing.
-spec do_lookup_topic_key(trie(), state(), [level()], [level()]) ->
    {ok, msg_storage_key()} | undefined.
do_lookup_topic_key(Trie, State, [], Varying) ->
    case trie_next(Trie, State, ?EOT) of
        {false, Static} ->
            {ok, {Static, lists:reverse(Varying)}};
        undefined ->
            undefined
    end;
do_lookup_topic_key(Trie, State, [Tok | Rest], Varying) ->
    case trie_next(Trie, State, Tok) of
        {true, NextState} ->
            do_lookup_topic_key(Trie, NextState, Rest, [Tok | Varying]);
        {false, NextState} ->
            do_lookup_topic_key(Trie, NextState, Rest, Varying);
        undefined ->
            undefined
    end.

%% Walk the trie along the topic, inserting missing transitions.
%% `Tokens' accumulates (in reverse) the structure of the topic as
%% seen by the trie, `Varying' accumulates (in reverse) the levels
%% that were matched by wildcard edges.
do_topic_key(Trie, _, _, State, [], Tokens, Varying) ->
    %% We reached the end of topic. Assert: Trie node that corresponds
    %% to EOT cannot be a wildcard.
    {Updated, false, Static} = trie_next_(Trie, State, ?EOT),
    _ =
        case Trie#trie.rlookups andalso Updated of
            false ->
                ok;
            _ ->
                %% A new static key has been created, record its
                %% structure for reverse lookups:
                trie_insert(Trie, ?rlookup, Static, lists:reverse(Tokens))
        end,
    {Static, lists:reverse(Varying)};
do_topic_key(Trie, ThresholdFun, Depth, State, [Tok | Rest], Tokens, Varying0) ->
    % TODO: it's not necessary to call it every time.
    Threshold = ThresholdFun(Depth),
    {NChildren, IsWildcard, NextState} = trie_next_(Trie, State, Tok),
    Varying =
        case IsWildcard of
            _ when is_integer(NChildren), NChildren >= Threshold ->
                %% Number of children for the trie node reached the
                %% threshold, we need to insert wildcard here.
                {_, _WildcardState} = trie_insert(Trie, State, ?PLUS),
                Varying0;
            false ->
                Varying0;
            true ->
                %% This topic level is marked as wildcard in the trie,
                %% we need to add it to the varying part of the key:
                [Tok | Varying0]
        end,
    TokOrWildcard =
        case IsWildcard of
            true -> ?PLUS;
            false -> Tok
        end,
    do_topic_key(Trie, ThresholdFun, Depth + 1, NextState, Rest, [TokOrWildcard | Tokens], Varying).

%% @doc Has side effects! Inserts missing elements
-spec trie_next_(trie(), state(), binary() | ?EOT) -> {New, Wildcard, state()} when
    New :: false | non_neg_integer(),
    Wildcard :: boolean().
trie_next_(Trie, State, Token) ->
    case trie_next(Trie, State, Token) of
        {Wildcard, NextState} ->
            {false, Wildcard, NextState};
        undefined ->
            {Updated, NextState} = trie_insert(Trie, State, Token),
            {Updated, false, NextState}
    end.

%% @doc Return all edges emanating from a node:
%% erlfmt-ignore
-spec emanating(trie(), state(), edge()) -> [{edge(), state()}].
%% With `?PLUS' every edge of the node is returned; with `?EOT' only
%% the end-of-topic edge; with a concrete level the wildcard edge (if
%% any) followed by the edge for that level (if any).
emanating(#trie{trie = Table}, State, ?PLUS) ->
    MatchSpec = ets:fun2ms(
        fun(#trans{key = {S, Edge}, next = Next}) when S == State ->
            {Edge, Next}
        end
    ),
    ets:select(Table, MatchSpec);
emanating(#trie{trie = Table}, State, ?EOT) ->
    case ets:lookup(Table, {State, ?EOT}) of
        [] -> [];
        [#trans{next = Static}] -> [{?EOT, Static}]
    end;
emanating(#trie{trie = Table}, State, Token) when is_binary(Token); Token =:= '' ->
    ViaWildcard = ets:lookup(Table, {State, ?PLUS}),
    ViaToken = ets:lookup(Table, {State, Token}),
    [
        {Edge, Next}
     || #trans{key = {_, Edge}, next = Next} <- ViaWildcard ++ ViaToken
    ].

%% Return every record whose key starts with `State' as a
%% `{{State, Edge}, Next}' pair.
all_emanating(#trie{trie = Table}, State) ->
    MatchSpec = ets:fun2ms(
        fun(#trans{key = {S, Edge}, next = Next}) when S == State ->
            {{S, Edge}, Next}
        end
    ),
    ets:select(Table, MatchSpec).

%% Return all complete paths of the trie, i.e. sequences of
%% transitions that start at the root and end with an `?EOT' edge.
paths(#trie{} = T) ->
    lists:append([
        follow_path(T, Next, [Root])
     || {_Key, Next} = Root <- all_emanating(T, ?PREFIX)
    ]).

%% Extend the (reversed) path `Path' with every edge of `State',
%% emitting a complete path for each `?EOT' edge that is reached.
follow_path(#trie{} = T, State, Path) ->
    Extend = fun
        ({{_State, ?EOT}, _Next} = Last) ->
            [lists:reverse([Last | Path])];
        ({_Edge, Next} = Step) ->
            follow_path(T, Next, [Step | Path])
    end,
    lists:flatmap(Extend, all_emanating(T, State)).

%% Extract the edge labels of a path, in order.
tokens_of_path(Path) ->
    lists:map(fun({{_State, Token}, _Next}) -> Token end, Path).

%% Wrapper for type checking only:
-compile({inline, ets_insert_new/2}).
-spec ets_insert_new(ets:tid(), trans()) -> boolean().
ets_insert_new(Tid, Trans) ->
    ets:insert_new(Tid, Trans).
%% Walk the topic structure and the topic filter in lockstep. Levels
%% of the filter that line up with a '+' of the structure are kept,
%% the static ones are dropped. A trailing '#' consumes the rest of
%% the structure and contributes one '+' per remaining wildcard.
compress_topic(_StaticKey, [], [], Kept) ->
    lists:reverse(Kept);
compress_topic(StaticKey, Structure, ['#'], Kept) ->
    case Structure of
        [] ->
            lists:reverse(Kept);
        ['+' | StructureRest] ->
            compress_topic(StaticKey, StructureRest, ['#'], ['+' | Kept]);
        [_ | StructureRest] ->
            compress_topic(StaticKey, StructureRest, ['#'], Kept)
    end;
compress_topic(StaticKey, ['+' | StructureRest], [Level | FilterRest], Kept) ->
    compress_topic(StaticKey, StructureRest, FilterRest, [Level | Kept]);
compress_topic(StaticKey, [Static | StructureRest], [Level | FilterRest], Kept) when
    Level =:= '+'; Level =:= Static
->
    compress_topic(StaticKey, StructureRest, FilterRest, Kept);
compress_topic(StaticKey, Structure, Filter, _Kept) ->
    %% Topic is mismatched with the structure. This should never
    %% happen. LTS got corrupted?
    throw(
        {unrecoverable, #{
            msg => 'Topic structure mismatch',
            static_key => StaticKey,
            input => Filter,
            structure => Structure
        }}
    ).

%% Rebuild the topic from its structure: each '+' of the structure is
%% substituted with the next level of the compressed topic (as long as
%% there is one), all other levels are copied from the structure.
decompress_topic(['+' | StructureRest], [Level | TopicRest], Out) ->
    decompress_topic(StructureRest, TopicRest, [Level | Out]);
decompress_topic([Static | StructureRest], Topic, Out) ->
    decompress_topic(StructureRest, Topic, [Static | Out]);
decompress_topic([], [], Out) ->
    lists:reverse(Out).

%%================================================================================
%% Tests
%%================================================================================

-ifdef(TEST).
%% Exercise `trie_next/3' and `trie_insert/3' directly: a transition
%% is only visible after it has been inserted, repeated insertion
%% returns `false' together with the existing state, and the first
%% element of a successful insert is the per-state child counter.
trie_basic_test() ->
    T = trie_create(),
    ?assertMatch(undefined, trie_next(T, ?PREFIX, <<"foo">>)),
    {1, S1} = trie_insert(T, ?PREFIX, <<"foo">>),
    ?assertMatch({false, S1}, trie_insert(T, ?PREFIX, <<"foo">>)),
    ?assertMatch({false, S1}, trie_next(T, ?PREFIX, <<"foo">>)),

    ?assertMatch(undefined, trie_next(T, ?PREFIX, <<"bar">>)),
    {2, S2} = trie_insert(T, ?PREFIX, <<"bar">>),
    ?assertMatch({false, S2}, trie_insert(T, ?PREFIX, <<"bar">>)),

    %% Second level, children of S1:
    ?assertMatch(undefined, trie_next(T, S1, <<"foo">>)),
    ?assertMatch(undefined, trie_next(T, S1, <<"bar">>)),
    {1, S11} = trie_insert(T, S1, <<"foo">>),
    {2, S12} = trie_insert(T, S1, <<"bar">>),
    ?assertMatch({false, S11}, trie_next(T, S1, <<"foo">>)),
    ?assertMatch({false, S12}, trie_next(T, S1, <<"bar">>)),

    %% Third level, child of S11:
    ?assertMatch(undefined, trie_next(T, S11, <<"bar">>)),
    {1, S111} = trie_insert(T, S11, <<"bar">>),
    ?assertMatch({false, S111}, trie_next(T, S11, <<"bar">>)).

%% Check that `lookup_topic_key/2' only finds topics that are
%% terminated with an `?EOT' edge.
lookup_key_test() ->
    T = trie_create(),
    {_, S1} = trie_insert(T, ?PREFIX, <<"foo">>),
    {_, S11} = trie_insert(T, S1, <<"foo">>),
    %% Topics don't match until we insert ?EOT:
    ?assertMatch(
        undefined,
        lookup_topic_key(T, [<<"foo">>])
    ),
    ?assertMatch(
        undefined,
        lookup_topic_key(T, [<<"foo">>, <<"foo">>])
    ),
    {_, S10} = trie_insert(T, S1, ?EOT),
    {_, S110} = trie_insert(T, S11, ?EOT),
    ?assertMatch(
        {ok, {S10, []}},
        lookup_topic_key(T, [<<"foo">>])
    ),
    ?assertMatch(
        {ok, {S110, []}},
        lookup_topic_key(T, [<<"foo">>, <<"foo">>])
    ),
    %% The rest of keys still don't match:
    ?assertMatch(
        undefined,
        lookup_topic_key(T, [<<"bar">>])
    ),
    ?assertMatch(
        undefined,
        lookup_topic_key(T, [<<"bar">>, <<"foo">>])
    ).
%% Build the path `foo/+/foo' by hand and check that lookups through
%% the wildcard edge report the matched level as the varying part of
%% the key.
wildcard_lookup_test() ->
    T = trie_create(),
    {1, S1} = trie_insert(T, ?PREFIX, <<"foo">>),
    %% Plus doesn't increase the number of children
    {0, S11} = trie_insert(T, S1, ?PLUS),
    {1, S111} = trie_insert(T, S11, <<"foo">>),
    %% ?EOT doesn't increase the number of children
    {0, S1110} = trie_insert(T, S111, ?EOT),
    ?assertMatch(
        {ok, {S1110, [<<"bar">>]}},
        lookup_topic_key(T, [<<"foo">>, <<"bar">>, <<"foo">>])
    ),
    ?assertMatch(
        {ok, {S1110, [<<"quux">>]}},
        lookup_topic_key(T, [<<"foo">>, <<"quux">>, <<"foo">>])
    ),
    ?assertMatch(
        undefined,
        lookup_topic_key(T, [<<"foo">>])
    ),
    ?assertMatch(
        undefined,
        lookup_topic_key(T, [<<"foo">>, <<"bar">>])
    ),
    ?assertMatch(
        undefined,
        lookup_topic_key(T, [<<"foo">>, <<"bar">>, <<"bar">>])
    ),
    ?assertMatch(
        undefined,
        lookup_topic_key(T, [<<"bar">>, <<"foo">>, <<"foo">>])
    ),
    {_, S10} = trie_insert(T, S1, ?EOT),
    ?assertMatch(
        {ok, {S10, []}},
        lookup_topic_key(T, [<<"foo">>])
    ).

%% Check how `topic_key/3' learns wildcards once the number of
%% children of a node reaches the threshold. The resulting trie is
%% always dumped to a .dot file under `_build' for inspection.
%% erlfmt-ignore
topic_key_test() ->
    T = trie_create(),
    try
        Threshold = 4,
        ThresholdFun = fun(0) -> 1000;
                          (_) -> Threshold
                       end,
        %% Test that bottom layer threshold is high:
        lists:foreach(
          fun(I) ->
                  {_, []} = test_key(T, ThresholdFun, [I, 99999, 999999, 99999])
          end,
          lists:seq(1, 10)),
        %% Test adding children on the 2nd level:
        lists:foreach(
          fun(I) ->
                  case test_key(T, ThresholdFun, [1, I, 1]) of
                      {_, []} ->
                          ?assert(I < Threshold, {I, '<', Threshold}),
                          ok;
                      {_, [Var]} ->
                          ?assert(I >= Threshold, {I, '>=', Threshold}),
                          ?assertEqual(Var, integer_to_binary(I))
                  end
          end,
          lists:seq(1, 100)),
        %% This doesn't affect 2nd level with a different prefix:
        ?assertMatch({_, []}, test_key(T, ThresholdFun, [2, 1, 1])),
        ?assertMatch({_, []}, test_key(T, ThresholdFun, [2, 10, 1])),
        %% This didn't retroactively change the indexes that were
        %% created prior to reaching the threshold:
        ?assertMatch({_, []}, test_key(T, ThresholdFun, [1, 1, 1])),
        ?assertMatch({_, []}, test_key(T, ThresholdFun, [1, 2, 1])),
        %% Now create another level of +:
        lists:foreach(
          fun(I) ->
                  case test_key(T, ThresholdFun, [1, 42, 1, I, 42]) of
                      {_, [<<"42">>]} when I =< Threshold -> %% TODO: off by 1 error
                          ok;
                      {_, [<<"42">>, Var]} ->
                          ?assertEqual(Var, integer_to_binary(I));
                      Ret ->
                          error({Ret, I})
                  end
          end,
          lists:seq(1, 100))
    after
        dump_to_dot(T, filename:join("_build", atom_to_list(?FUNCTION_NAME) ++ ".dot"))
    end.

%% Check `match_topics/2' against concrete topics, `+' and `#'
%% filters, before and after wildcards have been learned. The
%% resulting trie is always dumped to a .dot file under `_build'.
%% erlfmt-ignore
topic_match_test() ->
    T = trie_create(),
    try
        Threshold = 2,
        ThresholdFun = fun(0) -> 1000;
                          (_) -> Threshold
                       end,
        {S1, []} = test_key(T, ThresholdFun, [1]),
        {S11, []} = test_key(T, ThresholdFun, [1, 1]),
        {S12, []} = test_key(T, ThresholdFun, [1, 2]),
        {S111, []} = test_key(T, ThresholdFun, [1, 1, 1]),
        {S11e, []} = test_key(T, ThresholdFun, [1, 1, '']),
        %% Match concrete topics:
        assert_match_topics(T, [1], [{S1, []}]),
        assert_match_topics(T, [1, 1], [{S11, []}]),
        assert_match_topics(T, [1, 1, 1], [{S111, []}]),
        %% Match topics with +:
        assert_match_topics(T, [1, '+'], [{S11, []}, {S12, []}]),
        assert_match_topics(T, [1, '+', 1], [{S111, []}]),
        assert_match_topics(T, [1, '+', ''], [{S11e, []}]),
        %% Match topics with #:
        assert_match_topics(T, [1, '#'],
                            [{S1, []},
                             {S11, []}, {S12, []},
                             {S111, []}, {S11e, []}]),
        assert_match_topics(T, [1, 1, '#'],
                            [{S11, []},
                             {S111, []},
                             {S11e, []}]),
        %% Now add learned wildcards:
        {S21, []} = test_key(T, ThresholdFun, [2, 1]),
        {S22, []} = test_key(T, ThresholdFun, [2, 2]),
        {S2_, [<<"3">>]} = test_key(T, ThresholdFun, [2, 3]),
        {S2_11, [<<"3">>]} = test_key(T, ThresholdFun, [2, 3, 1, 1]),
        {S2_12, [<<"4">>]} = test_key(T, ThresholdFun, [2, 4, 1, 2]),
        {S2_1_, [<<"3">>, <<"3">>]} = test_key(T, ThresholdFun, [2, 3, 1, 3]),
        %% Check matching:
        assert_match_topics(T, [2, 2],
                            [{S22, []}, {S2_, [<<"2">>]}]),
        assert_match_topics(T, [2, '+'],
                            [{S22, []}, {S21, []}, {S2_, ['+']}]),
        assert_match_topics(T, [2, '#'],
                            [{S21, []}, {S22, []},
                             {S2_, ['+']},
                             {S2_11, ['+']}, {S2_12, ['+']}, {S2_1_, ['+', '+']}]),
        ok
    after
        dump_to_dot(T, filename:join("_build", atom_to_list(?FUNCTION_NAME) ++ ".dot"))
    end.
%% Check that, with `reverse_lookups' enabled, the structure of every
%% created topic can be recovered from its static key, and that this
%% still holds for a trie restored from a full dump.
%% erlfmt-ignore
rlookup_test() ->
    T = trie_create(#{reverse_lookups => true}),
    Threshold = 2,
    ThresholdFun = fun(0) -> 1000;
                      (_) -> Threshold
                   end,
    {S1, []} = test_key(T, ThresholdFun, [1]),
    {S11, []} = test_key(T, ThresholdFun, [1, 1]),
    {S12, []} = test_key(T, ThresholdFun, [1, 2]),
    {S111, []} = test_key(T, ThresholdFun, [1, 1, 1]),
    {S11e, []} = test_key(T, ThresholdFun, [1, 1, '']),
    %% Now add learned wildcards:
    {S21, []} = test_key(T, ThresholdFun, [2, 1]),
    {S22, []} = test_key(T, ThresholdFun, [2, 2]),
    {S2_, [<<"3">>]} = test_key(T, ThresholdFun, [2, 3]),
    {S2_11, [<<"3">>]} = test_key(T, ThresholdFun, [2, 3, 1, 1]),
    {S2_12, [<<"4">>]} = test_key(T, ThresholdFun, [2, 4, 1, 2]),
    {S2_1_, [<<"3">>, <<"3">>]} = test_key(T, ThresholdFun, [2, 3, 1, 3]),
    %% Check reverse matching:
    ?assertEqual({ok, [<<"1">>]}, reverse_lookup(T, S1)),
    ?assertEqual({ok, [<<"1">>, <<"1">>]}, reverse_lookup(T, S11)),
    ?assertEqual({ok, [<<"1">>, <<"2">>]}, reverse_lookup(T, S12)),
    ?assertEqual({ok, [<<"1">>, <<"1">>, <<"1">>]}, reverse_lookup(T, S111)),
    ?assertEqual({ok, [<<"1">>, <<"1">>, '']}, reverse_lookup(T, S11e)),
    ?assertEqual({ok, [<<"2">>, <<"1">>]}, reverse_lookup(T, S21)),
    ?assertEqual({ok, [<<"2">>, <<"2">>]}, reverse_lookup(T, S22)),
    ?assertEqual({ok, [<<"2">>, '+']}, reverse_lookup(T, S2_)),
    ?assertEqual({ok, [<<"2">>, '+', <<"1">>, <<"1">>]}, reverse_lookup(T, S2_11)),
    ?assertEqual({ok, [<<"2">>, '+', <<"1">>, <<"2">>]}, reverse_lookup(T, S2_12)),
    ?assertEqual({ok, [<<"2">>, '+', <<"1">>, '+']}, reverse_lookup(T, S2_1_)),
    %% Dump and restore trie to make sure rlookup still works:
    T1 = trie_restore(#{reverse_lookups => true}, trie_dump(T, all)),
    destroy(T),
    ?assertEqual({ok, [<<"2">>, <<"1">>]}, reverse_lookup(T1, S21)),
    ?assertEqual({ok, [<<"2">>, '+', <<"1">>, '+']}, reverse_lookup(T1, S2_1_)).
%% Check that `info(T, topics)' grows by one for every new static key
%% and stays the same when an existing key is returned.
n_topics_test() ->
    Threshold = 3,
    ThresholdFun = fun
        (0) -> 1000;
        (_) -> Threshold
    end,

    T = trie_create(#{reverse_lookups => true}),
    ?assertEqual(0, info(T, topics)),
    {S11, []} = test_key(T, ThresholdFun, [1, 1]),
    {S11, []} = test_key(T, ThresholdFun, [1, 1]),
    ?assertEqual(1, info(T, topics)),

    {S12, []} = test_key(T, ThresholdFun, [1, 2]),
    {S12, []} = test_key(T, ThresholdFun, [1, 2]),
    ?assertEqual(2, info(T, topics)),

    {_S13, []} = test_key(T, ThresholdFun, [1, 3]),
    ?assertEqual(3, info(T, topics)),

    %% From here on the topics share the key of the learned wildcard:
    {S1_, [_]} = test_key(T, ThresholdFun, [1, 4]),
    ?assertEqual(4, info(T, topics)),

    {S1_, [_]} = test_key(T, ThresholdFun, [1, 5]),
    {S1_, [_]} = test_key(T, ThresholdFun, [1, 6]),
    {S1_, [_]} = test_key(T, ThresholdFun, [1, 7]),
    ?assertEqual(4, info(T, topics)),

    ?assertMatch(
        [{size, N}, {topics, 4}] when is_integer(N),
        info(T)
    ).

%% Process dictionary key under which `test_key/3' keeps the map of
%% all keys generated so far:
-define(keys_history, topic_key_history).

%% Assert that `match_topics/2' returns exactly the expected keys, in
%% any order. Integers in the filter are converted to binaries.
%% erlfmt-ignore
assert_match_topics(Trie, Filter0, Expected) ->
    Filter = lists:map(fun(I) when is_integer(I) -> integer_to_binary(I);
                          (I) -> I
                       end,
                       Filter0),
    Matched = match_topics(Trie, Filter),
    ?assertMatch( #{missing := [], unexpected := []}
                , #{ missing    => Expected -- Matched
                   , unexpected => Matched -- Expected
                   }
                , Filter
                ).

%% Create (or look up) the key for the topic and return it. Integers
%% in the topic are converted to binaries. Along the way it is checked
%% that `topic_key/3' is idempotent, that the key hasn't been handed
%% out for a different topic before, and that `lookup_topic_key/2'
%% agrees with the result.
%% erlfmt-ignore
test_key(Trie, Threshold, Topic0) ->
    Topic = lists:map(fun('') -> '';
                         (I) -> integer_to_binary(I)
                      end,
                      Topic0),
    Ret = topic_key(Trie, Threshold, Topic),
    %% Test idempotency:
    Ret1 = topic_key(Trie, Threshold, Topic),
    ?assertEqual(Ret, Ret1, Topic),
    %% Add new key to the history:
    case get(?keys_history) of
        undefined  -> OldHistory = #{};
        OldHistory -> ok
    end,
    %% Test that the generated keys are always unique for the topic:
    History = maps:update_with(
                Ret,
                fun(Old) ->
                        case Old =:= Topic of
                            true  -> Old;
                            false -> error(#{ '$msg' => "Duplicate key!"
                                            , key => Ret
                                            , old_topic => Old
                                            , new_topic => Topic
                                            })
                        end
                end,
                Topic,
                OldHistory),
    put(?keys_history, History),
    {ok, Ret} = lookup_topic_key(Trie, Topic),
    Ret.
%% Check that `paths/1' produces the inserted paths (including the
%% learned wildcard ones), that wildcard paths can be filtered out of
%% its output, and that an identical trie can be rebuilt from it.
paths_test() ->
    T = trie_create(),
    Threshold = 4,
    ThresholdFun = fun
        (0) -> 1000;
        (_) -> Threshold
    end,
    PathsToInsert =
        [
            [''],
            [1],
            [2, 2],
            [3, 3, 3],
            [2, 3, 4]
        ] ++ [[4, I, 4] || I <- lists:seq(1, Threshold + 2)] ++
            [['', I, ''] || I <- lists:seq(1, Threshold + 2)],
    lists:foreach(
        fun(PathSpec) ->
            test_key(T, ThresholdFun, PathSpec)
        end,
        PathsToInsert
    ),

    %% Test that the paths we've inserted are produced in the output
    Paths = paths(T),
    FormattedPaths = lists:map(fun format_path/1, Paths),
    ExpectedWildcardPaths =
        [
            [4, '+', 4],
            ['', '+', '']
        ],
    ExpectedPaths =
        [
            [''],
            [1],
            [2, 2],
            [3, 3, 3]
        ] ++ [[4, I, 4] || I <- lists:seq(1, Threshold)] ++
            [['', I, ''] || I <- lists:seq(1, Threshold)] ++
            ExpectedWildcardPaths,
    %% Convert a path spec to the list of edge labels, `?EOT' included:
    FormatPathSpec =
        fun(PathSpec) ->
            lists:map(
                fun
                    (I) when is_integer(I) -> integer_to_binary(I);
                    (A) -> A
                end,
                PathSpec
            ) ++ [?EOT]
        end,
    lists:foreach(
        fun(PathSpec) ->
            Path = FormatPathSpec(PathSpec),
            ?assert(
                lists:member(Path, FormattedPaths),
                #{
                    paths => FormattedPaths,
                    expected_path => Path
                }
            )
        end,
        ExpectedPaths
    ),

    %% Test filter function for paths containing wildcards
    WildcardPaths = lists:filter(
        fun(Path) ->
            lists:member(?PLUS, tokens_of_path(Path))
        end,
        Paths
    ),
    FormattedWildcardPaths = lists:map(fun format_path/1, WildcardPaths),
    ?assertEqual(
        sets:from_list(lists:map(FormatPathSpec, ExpectedWildcardPaths), [{version, 2}]),
        sets:from_list(FormattedWildcardPaths, [{version, 2}]),
        #{
            expected => ExpectedWildcardPaths,
            wildcards => FormattedWildcardPaths
        }
    ),

    %% Test that we're able to reconstruct the same trie from the paths
    T2 = trie_create(),
    [
        trie_insert(T2, State, Edge, Next)
     || Path <- Paths,
        {{State, Edge}, Next} <- Path
    ],
    #trie{trie = Tab1} = T,
    #trie{trie = Tab2} = T2,
    Dump1 = sets:from_list(ets:tab2list(Tab1), [{version, 2}]),
    Dump2 = sets:from_list(ets:tab2list(Tab2), [{version, 2}]),
    ?assertEqual(Dump1, Dump2).

%% Reduce a path to the list of its edge labels.
format_path([{{_State, Edge}, _Next} | Rest]) ->
    [Edge | format_path(Rest)];
format_path([]) ->
    [].
%% Check `compress_topic/3' for structures with and without
%% wildcards, and that mismatching filters are rejected with an
%% `unrecoverable' exception.
compress_topic_test() ->
    %% Structure without wildcards:
    ?assertEqual([], compress_topic(42, [], [])),
    ?assertEqual([], compress_topic(42, [<<"foo">>, <<"bar">>], [<<"foo">>, <<"bar">>])),
    ?assertEqual([], compress_topic(42, [<<"foo">>, ''], [<<"foo">>, ''])),
    ?assertEqual([], compress_topic(42, [<<"foo">>, ''], [<<"foo">>, '+'])),
    ?assertEqual([], compress_topic(42, [<<"foo">>, ''], ['+', '+'])),
    ?assertEqual([], compress_topic(42, [<<"foo">>, <<"bar">>, ''], ['#'])),
    ?assertEqual([], compress_topic(42, [<<"foo">>, <<"bar">>, ''], [<<"foo">>, <<"bar">>, '#'])),
    ?assertEqual([], compress_topic(42, [<<"foo">>, <<"bar">>, ''], ['+', '#'])),
    ?assertEqual(
        [], compress_topic(42, [<<"foo">>, <<"bar">>, ''], [<<"foo">>, <<"bar">>, '', '#'])
    ),
    %% With wildcards:
    ?assertEqual(
        [<<"1">>], compress_topic(42, [<<"foo">>, '+', <<"bar">>], [<<"foo">>, <<"1">>, <<"bar">>])
    ),
    ?assertEqual(
        [<<"1">>, <<"2">>],
        compress_topic(
            42,
            [<<"foo">>, '+', <<"bar">>, '+', <<"baz">>],
            [<<"foo">>, <<"1">>, <<"bar">>, <<"2">>, <<"baz">>]
        )
    ),
    ?assertEqual(
        ['+', <<"2">>],
        compress_topic(
            42,
            [<<"foo">>, '+', <<"bar">>, '+', <<"baz">>],
            [<<"foo">>, '+', <<"bar">>, <<"2">>, <<"baz">>]
        )
    ),
    ?assertEqual(
        ['+', '+'],
        compress_topic(
            42,
            [<<"foo">>, '+', <<"bar">>, '+', <<"baz">>],
            [<<"foo">>, '+', <<"bar">>, '+', <<"baz">>]
        )
    ),
    ?assertEqual(
        ['+', '+'],
        compress_topic(
            42,
            [<<"foo">>, '+', <<"bar">>, '+', <<"baz">>],
            ['#']
        )
    ),
    ?assertEqual(
        ['+', '+'],
        compress_topic(
            42,
            [<<"foo">>, '+', <<"bar">>, '+', <<"baz">>],
            [<<"foo">>, '+', '+', '#']
        )
    ),
    %% Mismatch:
    ?assertException(_, {unrecoverable, _}, compress_topic(42, [<<"foo">>], [<<"bar">>])),
    ?assertException(_, {unrecoverable, _}, compress_topic(42, [], [<<"bar">>])),
    ?assertException(_, {unrecoverable, _}, compress_topic(42, [<<"foo">>], [])),
    ?assertException(_, {unrecoverable, _}, compress_topic(42, ['', ''], ['', '', ''])),
    ?assertException(_, {unrecoverable, _}, compress_topic(42, ['', ''], [<<"foo">>, '#'])),
    ?assertException(_, {unrecoverable, _}, compress_topic(42, ['', ''], ['+', '+', '+', '#'])),
    ?assertException(_, {unrecoverable, _}, compress_topic(42, ['+'], [<<"bar">>, '+'])),
    ?assertException(
        _, {unrecoverable, _}, compress_topic(42, [<<"foo">>, '+'], [<<"bar">>, <<"baz">>])
    ).

%% Check that `decompress_topic/2' puts the levels of a compressed
%% topic back in place of the wildcards of the structure.
decompress_topic_test() ->
    %% Structure without wildcards:
    ?assertEqual([], decompress_topic([], [])),
    ?assertEqual(
        [<<"foo">>, '', <<"bar">>],
        decompress_topic([<<"foo">>, '', <<"bar">>], [])
    ),
    %% With wildcards:
    ?assertEqual(
        [<<"foo">>, '', <<"bar">>, <<"baz">>],
        decompress_topic([<<"foo">>, '+', <<"bar">>, '+'], ['', <<"baz">>])
    ),
    ?assertEqual(
        [<<"foo">>, '+', <<"bar">>, '+', ''],
        decompress_topic([<<"foo">>, '+', <<"bar">>, '+', ''], ['+', '+'])
    ).

-endif.