From f84fb3469239cb95a2fbc7236de4023ca2df8f5f Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 1 Jul 2024 00:54:49 +0200 Subject: [PATCH] feat(ds_lts): New APIs: info, reverse lookups and topic compression --- apps/emqx_durable_storage/src/emqx_ds_lts.erl | 443 +++++++++++++++--- 1 file changed, 385 insertions(+), 58 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_lts.erl index be13591e6..c0625593a 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_lts.erl @@ -26,7 +26,13 @@ trie_copy_learned_paths/2, topic_key/3, match_topics/2, - lookup_topic_key/2 + lookup_topic_key/2, + reverse_lookup/2, + info/2, + info/1, + + compress_topic/3, + decompress_topic/2 ]). %% Debug: @@ -34,18 +40,21 @@ -export_type([ options/0, + level/0, static_key/0, trie/0, - msg_storage_key/0 + msg_storage_key/0, + learned_structure/0 ]). -include_lib("stdlib/include/ms_transform.hrl"). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). --endif. -elvis([{elvis_style, variable_naming_convention, disable}]). +-elvis([{elvis_style, dont_repeat_yourself, disable}]). +-endif. %%================================================================================ %% Type declarations @@ -55,15 +64,22 @@ -define(EOT, []). -define(PLUS, '+'). --type edge() :: binary() | ?EOT | ?PLUS. +-type level() :: binary() | ''. -%% Fixed size binary --type static_key() :: non_neg_integer(). +-type edge() :: level() | ?EOT | ?PLUS. +%% Fixed size binary or integer, depending on the options: +-type static_key() :: non_neg_integer() | binary(). + +%% Trie root: -define(PREFIX, prefix). +%% Special prefix root for reverse lookups: +-define(rlookup, rlookup). +-define(rlookup(STATIC), {?rlookup, STATIC}). + -type state() :: static_key() | ?PREFIX. --type varying() :: [binary() | ?PLUS]. +-type varying() :: [level() | ?PLUS]. -type msg_storage_key() :: {static_key(), varying()}. @@ -71,27 +87,42 @@ -type persist_callback() :: fun((_Key, _Val) -> ok). +-type learned_structure() :: [level() | ?PLUS, ...]. + -type options() :: #{ persist_callback => persist_callback(), - static_key_size => pos_integer() + %% If set, static key is an integer that fits in a given nubmer of bits: + static_key_bits => pos_integer(), + %% If set, static key is a _binary_ of a given length: + static_key_bytes => pos_integer(), + reverse_lookups => boolean() }. -type dump() :: [{_Key, _Val}]. -record(trie, { persist :: persist_callback(), + is_binary_key :: boolean(), static_key_size :: pos_integer(), trie :: ets:tid(), - stats :: ets:tid() + stats :: ets:tid(), + rlookups = false :: boolean() }). -opaque trie() :: #trie{}. --record(trans, { - key :: {state(), edge()}, - next :: state() -}). +-record(trans, {key, next}). + +-type trans() :: + #trans{ + key :: {state(), edge()}, + next :: state() + } + | #trans{ + key :: {?rlookup, static_key()}, + next :: [level() | ?PLUS] + }. %%================================================================================ %% API functions @@ -100,21 +131,31 @@ %% @doc Create an empty trie -spec trie_create(options()) -> trie(). trie_create(UserOpts) -> - Defaults = #{ - persist_callback => fun(_, _) -> ok end, - static_key_size => 8 - }, - #{ - persist_callback := Persist, - static_key_size := StaticKeySize - } = maps:merge(Defaults, UserOpts), + Persist = maps:get( + persist_callback, + UserOpts, + fun(_, _) -> ok end + ), + Rlookups = maps:get(reverse_lookups, UserOpts, false), + IsBinaryKey = + case UserOpts of + #{static_key_bits := StaticKeySize} -> + false; + #{static_key_bytes := StaticKeySize} -> + true; + _ -> + StaticKeySize = 16, + true + end, Trie = ets:new(trie, [{keypos, #trans.key}, set, public]), Stats = ets:new(stats, [{keypos, 1}, set, public]), #trie{ persist = Persist, + is_binary_key = IsBinaryKey, static_key_size = StaticKeySize, trie = Trie, - stats = Stats + stats = Stats, + rlookups = Rlookups }. -spec trie_create() -> trie(). @@ -149,9 +190,21 @@ trie_dump(Trie, Filter) -> all -> Fun = fun(_) -> true end; wildcard -> - Fun = fun contains_wildcard/1 + Fun = fun(L) -> lists:member(?PLUS, L) end end, - lists:append([P || P <- paths(Trie), Fun(P)]). + Paths = lists:filter( + fun(Path) -> + Fun(tokens_of_path(Path)) + end, + paths(Trie) + ), + RlookupIdx = lists:filter( + fun({_, Tokens}) -> + Fun(Tokens) + end, + all_emanating(Trie, ?rlookup) + ), + lists:flatten([Paths, RlookupIdx]). -spec trie_copy_learned_paths(trie(), trie()) -> trie(). trie_copy_learned_paths(OldTrie, NewTrie) -> @@ -164,17 +217,17 @@ trie_copy_learned_paths(OldTrie, NewTrie) -> NewTrie. %% @doc Lookup the topic key. Create a new one, if not found. --spec topic_key(trie(), threshold_fun(), [binary() | '']) -> msg_storage_key(). +-spec topic_key(trie(), threshold_fun(), [level()]) -> msg_storage_key(). topic_key(Trie, ThresholdFun, Tokens) -> - do_topic_key(Trie, ThresholdFun, 0, ?PREFIX, Tokens, []). + do_topic_key(Trie, ThresholdFun, 0, ?PREFIX, Tokens, [], []). %% @doc Return an exisiting topic key if it exists. --spec lookup_topic_key(trie(), [binary()]) -> {ok, msg_storage_key()} | undefined. +-spec lookup_topic_key(trie(), [level()]) -> {ok, msg_storage_key()} | undefined. lookup_topic_key(Trie, Tokens) -> do_lookup_topic_key(Trie, ?PREFIX, Tokens, []). %% @doc Return list of keys of topics that match a given topic filter --spec match_topics(trie(), [binary() | '+' | '#']) -> +-spec match_topics(trie(), [level() | '+' | '#']) -> [msg_storage_key()]. match_topics(Trie, TopicFilter) -> do_match_topics(Trie, ?PREFIX, [], TopicFilter). @@ -206,7 +259,8 @@ dump_to_dot(#trie{trie = Trie, stats = Stats}, Filename) -> {ok, FD} = file:open(Filename, [write]), Print = fun (?PREFIX) -> "prefix"; - (NodeId) -> integer_to_binary(NodeId, 16) + (Bin) when is_binary(Bin) -> Bin; + (NodeId) when is_integer(NodeId) -> integer_to_binary(NodeId, 16) end, io:format(FD, "digraph {~n", []), lists:foreach( @@ -225,11 +279,64 @@ dump_to_dot(#trie{trie = Trie, stats = Stats}, Filename) -> io:format(FD, "}~n", []), file:close(FD). +-spec reverse_lookup(trie(), static_key()) -> {ok, learned_structure()} | undefined. +reverse_lookup(#trie{rlookups = false}, _) -> + error({badarg, reverse_lookups_disabled}); +reverse_lookup(#trie{trie = Trie}, StaticKey) -> + case ets:lookup(Trie, ?rlookup(StaticKey)) of + [#trans{next = Next}] -> + {ok, Next}; + [] -> + undefined + end. + +%% @doc Get information about the trie. +%% +%% Note: `reverse_lookups' must be enabled to get the number of +%% topics. +-spec info(trie(), size | topics) -> _. +info(#trie{rlookups = true, stats = Stats}, topics) -> + case ets:lookup(Stats, ?rlookup) of + [{_, N}] -> N; + [] -> 0 + end; +info(#trie{}, topics) -> + undefined; +info(#trie{trie = T}, size) -> + ets:info(T, size). + +%% @doc Return size of the trie +-spec info(trie()) -> proplists:proplist(). +info(Trie) -> + [ + {size, info(Trie, size)}, + {topics, info(Trie, topics)} + ]. + +%%%%%%%% Topic compression %%%%%%%%%% + +%% @doc Given topic structure for the static LTS index (as returned by +%% `reverse_lookup'), compress a topic filter to exclude static +%% levels: +-spec compress_topic(static_key(), learned_structure(), emqx_ds:topic_filter()) -> + [emqx_ds_lts:level() | '+']. +compress_topic(StaticKey, TopicStructure, TopicFilter) -> + compress_topic(StaticKey, TopicStructure, TopicFilter, []). + +%% @doc Given topic structure and a compressed topic filter, return +%% the original* topic filter. +%% +%% * '#' will be replaced with '+'s +-spec decompress_topic(learned_structure(), [level() | '+']) -> + emqx_ds:topic_filter(). +decompress_topic(TopicStructure, Topic) -> + decompress_topic(TopicStructure, Topic, []). + %%================================================================================ %% Internal exports %%================================================================================ --spec trie_next(trie(), state(), binary() | ?EOT) -> {Wildcard, state()} | undefined when +-spec trie_next(trie(), state(), level() | ?EOT) -> {Wildcard, state()} | undefined when Wildcard :: boolean(). trie_next(#trie{trie = Trie}, State, ?EOT) -> case ets:lookup(Trie, {State, ?EOT}) of @@ -261,16 +368,19 @@ trie_insert(Trie, State, Token) -> %% Internal functions %%================================================================================ --spec trie_insert(trie(), state(), edge(), state()) -> {Updated, state()} when - NChildren :: non_neg_integer(), - Updated :: false | NChildren. +-spec trie_insert + (trie(), state(), edge(), state()) -> {Updated, state()} when + NChildren :: non_neg_integer(), + Updated :: false | NChildren; + (trie(), ?rlookup, static_key(), [level() | '+']) -> + {false | non_neg_integer(), state()}. trie_insert(#trie{trie = Trie, stats = Stats, persist = Persist}, State, Token, NewState) -> Key = {State, Token}, Rec = #trans{ key = Key, next = NewState }, - case ets:insert_new(Trie, Rec) of + case ets_insert_new(Trie, Rec) of true -> ok = Persist(Key, NewState), Inc = @@ -287,7 +397,7 @@ trie_insert(#trie{trie = Trie, stats = Stats, persist = Persist}, State, Token, end. -spec get_id_for_key(trie(), state(), edge()) -> static_key(). -get_id_for_key(#trie{static_key_size = Size}, State, Token) when Size =< 32 -> +get_id_for_key(#trie{is_binary_key = IsBin, static_key_size = Size}, State, Token) -> %% Requirements for the return value: %% %% It should be globally unique for the `{State, Token}` pair. Other @@ -303,11 +413,17 @@ get_id_for_key(#trie{static_key_size = Size}, State, Token) when Size =< 32 -> %% If we want to impress computer science crowd, sorry, I mean to %% minimize storage requirements, we can even employ Huffman coding %% based on the frequency of messages. - <> = crypto:hash(sha256, term_to_binary([State | Token])), - Int. + Hash = crypto:hash(sha256, term_to_binary([State | Token])), + case IsBin of + false -> + <> = Hash, + Int; + true -> + element(1, erlang:split_binary(Hash, Size)) + end. %% erlfmt-ignore --spec do_match_topics(trie(), state(), [binary() | '+'], [binary() | '+' | '#']) -> +-spec do_match_topics(trie(), state(), [level() | '+'], [level() | '+' | '#']) -> list(). do_match_topics(Trie, State, Varying, []) -> case trie_next(Trie, State, ?EOT) of @@ -341,7 +457,7 @@ do_match_topics(Trie, State, Varying, [Level | Rest]) -> Emanating ). --spec do_lookup_topic_key(trie(), state(), [binary()], [binary()]) -> +-spec do_lookup_topic_key(trie(), state(), [level()], [level()]) -> {ok, msg_storage_key()} | undefined. do_lookup_topic_key(Trie, State, [], Varying) -> case trie_next(Trie, State, ?EOT) of @@ -360,29 +476,42 @@ do_lookup_topic_key(Trie, State, [Tok | Rest], Varying) -> undefined end. -do_topic_key(Trie, _, _, State, [], Varying) -> +do_topic_key(Trie, _, _, State, [], Tokens, Varying) -> %% We reached the end of topic. Assert: Trie node that corresponds %% to EOT cannot be a wildcard. - {_, false, Static} = trie_next_(Trie, State, ?EOT), + {Updated, false, Static} = trie_next_(Trie, State, ?EOT), + _ = + case Trie#trie.rlookups andalso Updated of + false -> + ok; + _ -> + trie_insert(Trie, rlookup, Static, lists:reverse(Tokens)) + end, {Static, lists:reverse(Varying)}; -do_topic_key(Trie, ThresholdFun, Depth, State, [Tok | Rest], Varying0) -> +do_topic_key(Trie, ThresholdFun, Depth, State, [Tok | Rest], Tokens, Varying0) -> % TODO: it's not necessary to call it every time. Threshold = ThresholdFun(Depth), + {NChildren, IsWildcard, NextState} = trie_next_(Trie, State, Tok), Varying = - case trie_next_(Trie, State, Tok) of - {NChildren, _, NextState} when is_integer(NChildren), NChildren >= Threshold -> + case IsWildcard of + _ when is_integer(NChildren), NChildren >= Threshold -> %% Number of children for the trie node reached the %% threshold, we need to insert wildcard here. {_, _WildcardState} = trie_insert(Trie, State, ?PLUS), Varying0; - {_, false, NextState} -> + false -> Varying0; - {_, true, NextState} -> + true -> %% This topic level is marked as wildcard in the trie, %% we need to add it to the varying part of the key: [Tok | Varying0] end, - do_topic_key(Trie, ThresholdFun, Depth + 1, NextState, Rest, Varying). + TokOrWildcard = + case IsWildcard of + true -> ?PLUS; + false -> Tok + end, + do_topic_key(Trie, ThresholdFun, Depth + 1, NextState, Rest, [TokOrWildcard | Tokens], Varying). %% @doc Has side effects! Inserts missing elements -spec trie_next_(trie(), state(), binary() | ?EOT) -> {New, Wildcard, state()} when @@ -450,12 +579,51 @@ follow_path(#trie{} = T, State, Path) -> all_emanating(T, State) ). -contains_wildcard([{{_State, ?PLUS}, _Next} | _Rest]) -> - true; -contains_wildcard([_ | Rest]) -> - contains_wildcard(Rest); -contains_wildcard([]) -> - false. +tokens_of_path([{{_State, Token}, _Next} | Rest]) -> + [Token | tokens_of_path(Rest)]; +tokens_of_path([]) -> + []. + +%% Wrapper for type checking only: +-compile({inline, ets_insert_new/2}). +-spec ets_insert_new(ets:tid(), trans()) -> boolean(). +ets_insert_new(Tid, Trans) -> + ets:insert_new(Tid, Trans). + +compress_topic(_StaticKey, [], [], Acc) -> + lists:reverse(Acc); +compress_topic(StaticKey, TStructL0, ['#'], Acc) -> + case TStructL0 of + [] -> + lists:reverse(Acc); + ['+' | TStructL] -> + compress_topic(StaticKey, TStructL, ['#'], ['+' | Acc]); + [_ | TStructL] -> + compress_topic(StaticKey, TStructL, ['#'], Acc) + end; +compress_topic(StaticKey, ['+' | TStructL], [Level | TopicL], Acc) -> + compress_topic(StaticKey, TStructL, TopicL, [Level | Acc]); +compress_topic(StaticKey, [Struct | TStructL], [Level | TopicL], Acc) when + Level =:= '+'; Level =:= Struct +-> + compress_topic(StaticKey, TStructL, TopicL, Acc); +compress_topic(StaticKey, TStructL, TopicL, _Acc) -> + %% Topic is mismatched with the structure. This should never + %% happen. LTS got corrupted? + Err = #{ + msg => 'Topic structure mismatch', + static_key => StaticKey, + input => TopicL, + structure => TStructL + }, + throw({unrecoverable, Err}). + +decompress_topic(['+' | TStructL], [Level | TopicL], Acc) -> + decompress_topic(TStructL, TopicL, [Level | Acc]); +decompress_topic([StaticLevel | TStructL], TopicL, Acc) -> + decompress_topic(TStructL, TopicL, [StaticLevel | Acc]); +decompress_topic([], [], Acc) -> + lists:reverse(Acc). %%================================================================================ %% Tests @@ -658,6 +826,76 @@ topic_match_test() -> dump_to_dot(T, filename:join("_build", atom_to_list(?FUNCTION_NAME) ++ ".dot")) end. +%% erlfmt-ignore +rlookup_test() -> + T = trie_create(#{reverse_lookups => true}), + Threshold = 2, + ThresholdFun = fun(0) -> 1000; + (_) -> Threshold + end, + {S1, []} = test_key(T, ThresholdFun, [1]), + {S11, []} = test_key(T, ThresholdFun, [1, 1]), + {S12, []} = test_key(T, ThresholdFun, [1, 2]), + {S111, []} = test_key(T, ThresholdFun, [1, 1, 1]), + {S11e, []} = test_key(T, ThresholdFun, [1, 1, '']), + %% Now add learned wildcards: + {S21, []} = test_key(T, ThresholdFun, [2, 1]), + {S22, []} = test_key(T, ThresholdFun, [2, 2]), + {S2_, [<<"3">>]} = test_key(T, ThresholdFun, [2, 3]), + {S2_11, [<<"3">>]} = test_key(T, ThresholdFun, [2, 3, 1, 1]), + {S2_12, [<<"4">>]} = test_key(T, ThresholdFun, [2, 4, 1, 2]), + {S2_1_, [<<"3">>, <<"3">>]} = test_key(T, ThresholdFun, [2, 3, 1, 3]), + %% Check reverse matching: + ?assertEqual({ok, [<<"1">>]}, reverse_lookup(T, S1)), + ?assertEqual({ok, [<<"1">>, <<"1">>]}, reverse_lookup(T, S11)), + ?assertEqual({ok, [<<"1">>, <<"2">>]}, reverse_lookup(T, S12)), + ?assertEqual({ok, [<<"1">>, <<"1">>, <<"1">>]}, reverse_lookup(T, S111)), + ?assertEqual({ok, [<<"1">>, <<"1">>, '']}, reverse_lookup(T, S11e)), + ?assertEqual({ok, [<<"2">>, <<"1">>]}, reverse_lookup(T, S21)), + ?assertEqual({ok, [<<"2">>, <<"2">>]}, reverse_lookup(T, S22)), + ?assertEqual({ok, [<<"2">>, '+']}, reverse_lookup(T, S2_)), + ?assertEqual({ok, [<<"2">>, '+', <<"1">>, <<"1">>]}, reverse_lookup(T, S2_11)), + ?assertEqual({ok, [<<"2">>, '+', <<"1">>, <<"2">>]}, reverse_lookup(T, S2_12)), + ?assertEqual({ok, [<<"2">>, '+', <<"1">>, '+']}, reverse_lookup(T, S2_1_)), + %% Dump and restore trie to make sure rlookup still works: + T1 = trie_restore(#{reverse_lookups => true}, trie_dump(T, all)), + destroy(T), + ?assertEqual({ok, [<<"2">>, <<"1">>]}, reverse_lookup(T1, S21)), + ?assertEqual({ok, [<<"2">>, '+', <<"1">>, '+']}, reverse_lookup(T1, S2_1_)). + +n_topics_test() -> + Threshold = 3, + ThresholdFun = fun + (0) -> 1000; + (_) -> Threshold + end, + + T = trie_create(#{reverse_lookups => true}), + ?assertEqual(0, info(T, topics)), + {S11, []} = test_key(T, ThresholdFun, [1, 1]), + {S11, []} = test_key(T, ThresholdFun, [1, 1]), + ?assertEqual(1, info(T, topics)), + + {S12, []} = test_key(T, ThresholdFun, [1, 2]), + {S12, []} = test_key(T, ThresholdFun, [1, 2]), + ?assertEqual(2, info(T, topics)), + + {_S13, []} = test_key(T, ThresholdFun, [1, 3]), + ?assertEqual(3, info(T, topics)), + + {S1_, [_]} = test_key(T, ThresholdFun, [1, 4]), + ?assertEqual(4, info(T, topics)), + + {S1_, [_]} = test_key(T, ThresholdFun, [1, 5]), + {S1_, [_]} = test_key(T, ThresholdFun, [1, 6]), + {S1_, [_]} = test_key(T, ThresholdFun, [1, 7]), + ?assertEqual(4, info(T, topics)), + + ?assertMatch( + [{size, N}, {topics, 4}] when is_integer(N), + info(T) + ). + -define(keys_history, topic_key_history). %% erlfmt-ignore @@ -773,11 +1011,16 @@ paths_test() -> ), %% Test filter function for paths containing wildcards - WildcardPaths = lists:filter(fun contains_wildcard/1, Paths), + WildcardPaths = lists:filter( + fun(Path) -> + lists:member(?PLUS, tokens_of_path(Path)) + end, + Paths + ), FormattedWildcardPaths = lists:map(fun format_path/1, WildcardPaths), ?assertEqual( - sets:from_list(FormattedWildcardPaths, [{version, 2}]), sets:from_list(lists:map(FormatPathSpec, ExpectedWildcardPaths), [{version, 2}]), + sets:from_list(FormattedWildcardPaths, [{version, 2}]), #{ expected => ExpectedWildcardPaths, wildcards => FormattedWildcardPaths @@ -795,13 +1038,97 @@ paths_test() -> #trie{trie = Tab2} = T2, Dump1 = sets:from_list(ets:tab2list(Tab1), [{version, 2}]), Dump2 = sets:from_list(ets:tab2list(Tab2), [{version, 2}]), - ?assertEqual(Dump1, Dump2), - - ok. + ?assertEqual(Dump1, Dump2). format_path([{{_State, Edge}, _Next} | Rest]) -> [Edge | format_path(Rest)]; format_path([]) -> []. +compress_topic_test() -> + %% Structure without wildcards: + ?assertEqual([], compress_topic(42, [], [])), + ?assertEqual([], compress_topic(42, [<<"foo">>, <<"bar">>], [<<"foo">>, <<"bar">>])), + ?assertEqual([], compress_topic(42, [<<"foo">>, ''], [<<"foo">>, ''])), + ?assertEqual([], compress_topic(42, [<<"foo">>, ''], [<<"foo">>, '+'])), + ?assertEqual([], compress_topic(42, [<<"foo">>, ''], ['+', '+'])), + ?assertEqual([], compress_topic(42, [<<"foo">>, <<"bar">>, ''], ['#'])), + ?assertEqual([], compress_topic(42, [<<"foo">>, <<"bar">>, ''], [<<"foo">>, <<"bar">>, '#'])), + ?assertEqual([], compress_topic(42, [<<"foo">>, <<"bar">>, ''], ['+', '#'])), + ?assertEqual( + [], compress_topic(42, [<<"foo">>, <<"bar">>, ''], [<<"foo">>, <<"bar">>, '', '#']) + ), + %% With wildcards: + ?assertEqual( + [<<"1">>], compress_topic(42, [<<"foo">>, '+', <<"bar">>], [<<"foo">>, <<"1">>, <<"bar">>]) + ), + ?assertEqual( + [<<"1">>, <<"2">>], + compress_topic( + 42, + [<<"foo">>, '+', <<"bar">>, '+', <<"baz">>], + [<<"foo">>, <<"1">>, <<"bar">>, <<"2">>, <<"baz">>] + ) + ), + ?assertEqual( + ['+', <<"2">>], + compress_topic( + 42, + [<<"foo">>, '+', <<"bar">>, '+', <<"baz">>], + [<<"foo">>, '+', <<"bar">>, <<"2">>, <<"baz">>] + ) + ), + ?assertEqual( + ['+', '+'], + compress_topic( + 42, + [<<"foo">>, '+', <<"bar">>, '+', <<"baz">>], + [<<"foo">>, '+', <<"bar">>, '+', <<"baz">>] + ) + ), + ?assertEqual( + ['+', '+'], + compress_topic( + 42, + [<<"foo">>, '+', <<"bar">>, '+', <<"baz">>], + ['#'] + ) + ), + ?assertEqual( + ['+', '+'], + compress_topic( + 42, + [<<"foo">>, '+', <<"bar">>, '+', <<"baz">>], + [<<"foo">>, '+', '+', '#'] + ) + ), + %% Mismatch: + ?assertException(_, {unrecoverable, _}, compress_topic(42, [<<"foo">>], [<<"bar">>])), + ?assertException(_, {unrecoverable, _}, compress_topic(42, [], [<<"bar">>])), + ?assertException(_, {unrecoverable, _}, compress_topic(42, [<<"foo">>], [])), + ?assertException(_, {unrecoverable, _}, compress_topic(42, ['', ''], ['', '', ''])), + ?assertException(_, {unrecoverable, _}, compress_topic(42, ['', ''], [<<"foo">>, '#'])), + ?assertException(_, {unrecoverable, _}, compress_topic(42, ['', ''], ['+', '+', '+', '#'])), + ?assertException(_, {unrecoverable, _}, compress_topic(42, ['+'], [<<"bar">>, '+'])), + ?assertException( + _, {unrecoverable, _}, compress_topic(42, [<<"foo">>, '+'], [<<"bar">>, <<"baz">>]) + ). + +decompress_topic_test() -> + %% Structure without wildcards: + ?assertEqual([], decompress_topic([], [])), + ?assertEqual( + [<<"foo">>, '', <<"bar">>], + decompress_topic([<<"foo">>, '', <<"bar">>], []) + ), + %% With wildcards: + ?assertEqual( + [<<"foo">>, '', <<"bar">>, <<"baz">>], + decompress_topic([<<"foo">>, '+', <<"bar">>, '+'], ['', <<"baz">>]) + ), + ?assertEqual( + [<<"foo">>, '+', <<"bar">>, '+', ''], + decompress_topic([<<"foo">>, '+', <<"bar">>, '+', ''], ['+', '+']) + ). + -endif.