1139 lines
38 KiB
Erlang
1139 lines
38 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_ds_lts).
|
|
|
|
%% API:
|
|
-export([
|
|
trie_create/1, trie_create/0,
|
|
destroy/1,
|
|
trie_dump/2,
|
|
trie_restore/2,
|
|
trie_update/2,
|
|
trie_copy_learned_paths/2,
|
|
topic_key/3,
|
|
match_topics/2,
|
|
lookup_topic_key/2,
|
|
reverse_lookup/2,
|
|
info/2,
|
|
info/1,
|
|
|
|
compress_topic/3,
|
|
decompress_topic/2
|
|
]).
|
|
|
|
%% Debug:
|
|
-export([trie_next/3, trie_insert/3, dump_to_dot/2]).
|
|
|
|
-export_type([
|
|
options/0,
|
|
level/0,
|
|
static_key/0,
|
|
trie/0,
|
|
msg_storage_key/0,
|
|
learned_structure/0
|
|
]).
|
|
|
|
-include_lib("stdlib/include/ms_transform.hrl").
|
|
|
|
-ifdef(TEST).
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
-elvis([{elvis_style, variable_naming_convention, disable}]).
|
|
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
|
|
-endif.
|
|
|
|
%%================================================================================
|
|
%% Type declarations
|
|
%%================================================================================
|
|
|
|
%% End Of Topic: the edge that terminates a complete topic in the trie.
-define(EOT, []).
%% Edge label of a learned wildcard level.
-define(PLUS, '+').

-type level() :: binary() | ''.

-type edge() :: level() | ?EOT | ?PLUS.

%% Fixed size binary or integer, depending on the options:
-type static_key() :: non_neg_integer() | binary().

%% Trie root:
-define(PREFIX, prefix).
%% Special prefix root for reverse lookups:
-define(rlookup, rlookup).
-define(rlookup(STATIC), {?rlookup, STATIC}).

-type state() :: static_key() | ?PREFIX.

-type varying() :: [level() | ?PLUS].

-type msg_storage_key() :: {static_key(), varying()}.

%% Called with the depth of a trie node; the result is compared with
%% the number of children of that node (see `do_topic_key/7').
-type threshold_fun() :: fun((non_neg_integer()) -> non_neg_integer()).

%% Invoked for every transition that is newly added to the trie.
-type persist_callback() :: fun((_Key, _Val) -> ok).

-type learned_structure() :: [level() | ?PLUS, ...].

-type options() ::
    #{
        persist_callback => persist_callback(),
        %% If set, static key is an integer that fits in a given number of bits:
        static_key_bits => pos_integer(),
        %% If set, static key is a _binary_ of a given length:
        static_key_bytes => pos_integer(),
        reverse_lookups => boolean()
    }.

-type dump() :: [{_Key, _Val}].

-record(trie, {
    persist :: persist_callback(),
    is_binary_key :: boolean(),
    static_key_size :: pos_integer(),
    trie :: ets:tid(),
    stats :: ets:tid(),
    rlookups = false :: boolean()
}).

-opaque trie() :: #trie{}.

-record(trans, {key, next}).

%% A row of the trie table: either a regular transition between two
%% states, or a reverse lookup entry keyed by the static key.
-type trans() ::
    #trans{
        key :: {state(), edge()},
        next :: state()
    }
    | #trans{
        key :: {?rlookup, static_key()},
        next :: [level() | ?PLUS]
    }.
|
|
|
|
%%================================================================================
|
|
%% API functions
|
|
%%================================================================================
|
|
|
|
%% @doc Create an empty trie.
%%
%% Options that are not set fall back to defaults: a no-op persist
%% callback, reverse lookups disabled and a 16-byte binary static key.
%% When both `static_key_bits' and `static_key_bytes' are present,
%% `static_key_bits' wins.
-spec trie_create(options()) -> trie().
trie_create(UserOpts) ->
    Persist = maps:get(persist_callback, UserOpts, fun(_, _) -> ok end),
    Rlookups = maps:get(reverse_lookups, UserOpts, false),
    {IsBinaryKey, StaticKeySize} =
        case UserOpts of
            #{static_key_bits := NBits} -> {false, NBits};
            #{static_key_bytes := NBytes} -> {true, NBytes};
            _ -> {true, 16}
        end,
    #trie{
        persist = Persist,
        is_binary_key = IsBinaryKey,
        static_key_size = StaticKeySize,
        %% Transitions are keyed by `{State, Edge}':
        trie = ets:new(trie, [{keypos, #trans.key}, set, public]),
        %% Per-state child counters, stored as `{State, NChildren}':
        stats = ets:new(stats, [{keypos, 1}, set, public]),
        rlookups = Rlookups
    }.
|
|
|
|
%% @doc Create an empty trie with the default options.
-spec trie_create() -> trie().
trie_create() ->
    DefaultOpts = #{},
    trie_create(DefaultOpts).
|
|
|
|
%% @doc Delete the ETS tables that back the trie.
%%
%% Deleting a table that is already gone is tolerated, so the call is
%% safe to repeat. Only the `badarg' raised by `ets:delete/1' for a
%% missing table is suppressed; any other failure propagates instead
%% of being silently swallowed by an old-style `catch'.
-spec destroy(trie()) -> ok.
destroy(#trie{trie = Trie, stats = Stats}) ->
    lists:foreach(
        fun(Tab) ->
            try
                ets:delete(Tab)
            catch
                error:badarg -> true
            end
        end,
        [Trie, Stats]
    ).
|
|
|
|
%% @doc Restore trie from a dump: create a fresh trie with the given
%% options and replay every dumped transition into it.
-spec trie_restore(options(), dump()) -> trie().
trie_restore(Options, Dump) ->
    Empty = trie_create(Options),
    trie_update(Empty, Dump).
|
|
|
|
%% @doc Update a trie with a dump of operations (used for replication).
%%
%% Each dump element `{{StateFrom, Token}, StateTo}' is inserted as a
%% transition. The same trie is returned for convenience.
-spec trie_update(trie(), dump()) -> trie().
trie_update(Trie, Dump) ->
    Replay = fun({{StateFrom, Token}, StateTo}) ->
        trie_insert(Trie, StateFrom, Token, StateTo)
    end,
    ok = lists:foreach(Replay, Dump),
    Trie.
|
|
|
|
%% @doc Dump the trie as a flat list of `{Key, Value}' pairs that can be
%% fed to `trie_restore/2' or `trie_update/2'.
%%
%% With `all' every path and every reverse lookup entry is dumped;
%% with `wildcard' only those that contain a `+' level.
-spec trie_dump(trie(), _Filter :: all | wildcard) -> dump().
trie_dump(Trie, Filter) ->
    Keep =
        case Filter of
            all -> fun(_) -> true end;
            wildcard -> fun(L) -> lists:member(?PLUS, L) end
        end,
    Paths = [Path || Path <- paths(Trie), Keep(tokens_of_path(Path))],
    RlookupIdx = [
        Entry
     || Entry <- all_emanating(Trie, ?rlookup), Keep(element(2, Entry))
    ],
    %% Paths is a list of lists of transitions; flatten it so that the
    %% result is a single list of transitions and rlookup entries:
    lists:flatten([Paths, RlookupIdx]).
|
|
|
|
%% @doc Copy every path of `OldTrie' that contains a learned wildcard
%% (together with the matching reverse lookup entries) into `NewTrie'.
%% Returns `NewTrie'.
-spec trie_copy_learned_paths(trie(), trie()) -> trie().
trie_copy_learned_paths(OldTrie, NewTrie) ->
    trie_update(NewTrie, trie_dump(OldTrie, wildcard)).
|
|
|
|
%% @doc Lookup the topic key. Create a new one, if not found.
%%
%% The walk starts at the trie root with depth 0 and empty
%% accumulators for the learned structure and the varying levels.
-spec topic_key(trie(), threshold_fun(), [level()]) -> msg_storage_key().
topic_key(Trie, ThresholdFun, Tokens) ->
    Depth0 = 0,
    do_topic_key(Trie, ThresholdFun, Depth0, ?PREFIX, Tokens, [], []).
|
|
|
|
%% @doc Return an existing topic key, or `undefined' if the topic is
%% not present in the trie. Unlike `topic_key/3', this function never
%% modifies the trie.
-spec lookup_topic_key(trie(), [level()]) -> {ok, msg_storage_key()} | undefined.
lookup_topic_key(Trie, Tokens) ->
    Varying0 = [],
    do_lookup_topic_key(Trie, ?PREFIX, Tokens, Varying0).
|
|
|
|
%% @doc Return list of keys of topics that match a given topic filter.
%% Matching starts at the trie root.
-spec match_topics(trie(), [level() | '+' | '#']) ->
    [msg_storage_key()].
match_topics(Trie, TopicFilter) ->
    Varying0 = [],
    do_match_topics(Trie, ?PREFIX, Varying0, TopicFilter).
|
|
|
|
%% @doc Dump trie to graphviz format for debugging.
%%
%% Every state becomes a node labelled with its id and the child count
%% recorded in the stats table (0 when there is no record); every
%% transition becomes a labelled edge.
%%
%% Reverse lookup records are skipped: they are an index rather than a
%% part of the graph, and neither their "states" (the `rlookup' atom
%% and token lists) nor their integer labels can be printed here.
%%
%% The file is closed even when writing fails.
-spec dump_to_dot(trie(), file:filename()) -> ok.
dump_to_dot(#trie{trie = Trie, stats = Stats}, Filename) ->
    {Nodes0, Edges} =
        lists:foldl(
            fun
                (#trans{key = {?rlookup, _}}, Acc) ->
                    Acc;
                (#trans{key = {From, Label}, next = To}, {AccN, AccEdge}) ->
                    {[From, To | AccN], [{From, To, Label} | AccEdge]}
            end,
            {[], []},
            ets:tab2list(Trie)
        ),
    Nodes =
        lists:map(
            fun(Node) ->
                case ets:lookup(Stats, Node) of
                    [{_, NChildren}] -> {Node, NChildren};
                    [] -> {Node, 0}
                end
            end,
            lists:usort(Nodes0)
        ),
    Print = fun
        (?PREFIX) -> "prefix";
        (Bin) when is_binary(Bin) -> Bin;
        (NodeId) when is_integer(NodeId) -> integer_to_binary(NodeId, 16)
    end,
    {ok, FD} = file:open(Filename, [write]),
    try
        io:format(FD, "digraph {~n", []),
        lists:foreach(
            fun({Node, NChildren}) ->
                Id = Print(Node),
                io:format(FD, " \"~s\" [label=\"~s : ~p\"];~n", [Id, Id, NChildren])
            end,
            Nodes
        ),
        lists:foreach(
            fun({From, To, Label}) ->
                io:format(FD, " \"~s\" -> \"~s\" [label=\"~s\"];~n", [Print(From), Print(To), Label])
            end,
            Edges
        ),
        io:format(FD, "}~n", [])
    after
        file:close(FD)
    end.
|
|
|
|
%% @doc Return the learned topic structure that corresponds to a static
%% key, or `undefined' when the key is unknown.
%%
%% Raises `{badarg, reverse_lookups_disabled}' if the trie was created
%% without reverse lookups.
-spec reverse_lookup(trie(), static_key()) -> {ok, learned_structure()} | undefined.
reverse_lookup(#trie{rlookups = false}, _StaticKey) ->
    error({badarg, reverse_lookups_disabled});
reverse_lookup(#trie{trie = Tab}, StaticKey) ->
    case ets:lookup(Tab, ?rlookup(StaticKey)) of
        [] -> undefined;
        [#trans{next = Structure}] -> {ok, Structure}
    end.
|
|
|
|
%% @doc Get information about the trie.
%%
%% `size' is the number of records in the trie table. `topics' is the
%% counter kept for the reverse lookup root.
%%
%% Note: `reverse_lookups' must be enabled to get the number of
%% topics, otherwise `undefined' is returned.
-spec info(trie(), size | topics) -> _.
info(#trie{trie = Tab}, size) ->
    ets:info(Tab, size);
info(#trie{rlookups = true, stats = Stats}, topics) ->
    case ets:lookup(Stats, ?rlookup) of
        [] -> 0;
        [{_, NTopics}] -> NTopics
    end;
info(#trie{}, topics) ->
    undefined.
|
|
|
|
%% @doc Return a proplist with all the items known to `info/2':
%% `size' followed by `topics'.
-spec info(trie()) -> proplists:proplist().
info(Trie) ->
    [{Item, info(Trie, Item)} || Item <- [size, topics]].
|
|
|
|
%%%%%%%% Topic compression %%%%%%%%%%
|
|
|
|
%% @doc Given topic structure for the static LTS index (as returned by
%% `reverse_lookup'), compress a topic filter to exclude static
%% levels: only the levels that line up with `+' in the structure are
%% kept.
%%
%% Throws `{unrecoverable, Info}' when the filter does not fit the
%% structure.
-spec compress_topic(static_key(), learned_structure(), emqx_ds:topic_filter()) ->
    [emqx_ds_lts:level() | '+'].
compress_topic(StaticKey, TopicStructure, TopicFilter) ->
    Acc0 = [],
    compress_topic(StaticKey, TopicStructure, TopicFilter, Acc0).
|
|
|
|
%% @doc Given topic structure and a compressed topic filter, return
%% the original* topic filter.
%%
%% * '#' will be replaced with '+'s
-spec decompress_topic(learned_structure(), [level() | '+']) ->
    emqx_ds:topic_filter().
decompress_topic(TopicStructure, Topic) ->
    Acc0 = [],
    decompress_topic(TopicStructure, Topic, Acc0).
|
|
|
|
%%================================================================================
|
|
%% Internal exports
|
|
%%================================================================================
|
|
|
|
%% Follow a single edge from `State'. The first element of the result
%% tells whether the transition was made via a learned wildcard.
%% `?EOT' is only ever matched exactly.
-spec trie_next(trie(), state(), level() | ?EOT) -> {Wildcard, state()} | undefined when
    Wildcard :: boolean().
trie_next(#trie{trie = Tab}, State, Token) ->
    %% NOTE: it's crucial to return the original (non-wildcard) index
    %% for the topic, if found. Otherwise messages from the same topic
    %% will end up in different streams, once the wildcard is learned,
    %% and their replay order will become undefined:
    case ets:lookup(Tab, {State, Token}) of
        [#trans{next = Exact}] ->
            {false, Exact};
        [] when Token =:= ?EOT ->
            undefined;
        [] ->
            case ets:lookup(Tab, {State, ?PLUS}) of
                [#trans{next = Wild}] -> {true, Wild};
                [] -> undefined
            end
    end.
|
|
|
|
%% Insert an edge whose target state id is derived from the
%% `{State, Token}' pair. See `trie_insert/4' for the return value.
-spec trie_insert(trie(), state(), edge()) -> {Updated, state()} when
    NChildren :: non_neg_integer(),
    Updated :: false | NChildren.
trie_insert(Trie, State, Token) ->
    NewState = get_id_for_key(Trie, State, Token),
    trie_insert(Trie, State, Token, NewState).
|
|
|
|
%%================================================================================
|
|
%% Internal functions
|
|
%%================================================================================
|
|
|
|
%% Insert the transition `{State, Token} -> NewState'.
%%
%% When the transition is new, it is handed to the persist callback
%% and the child counter of `State' is bumped (`?EOT' and `?PLUS'
%% edges do not count as children); the updated counter is returned
%% together with `NewState'. When the transition already exists, the
%% trie is left intact and `{false, ExistingState}' is returned.
-spec trie_insert
    (trie(), state(), edge(), state()) -> {Updated, state()} when
        NChildren :: non_neg_integer(),
        Updated :: false | NChildren;
    (trie(), ?rlookup, static_key(), [level() | '+']) ->
        {false | non_neg_integer(), state()}.
trie_insert(#trie{trie = Tab, stats = Stats, persist = Persist}, State, Token, NewState) ->
    Key = {State, Token},
    case ets_insert_new(Tab, #trans{key = Key, next = NewState}) of
        false ->
            [#trans{next = Existing}] = ets:lookup(Tab, Key),
            {false, Existing};
        true ->
            ok = Persist(Key, NewState),
            Inc =
                case Token =:= ?EOT orelse Token =:= ?PLUS of
                    true -> 0;
                    false -> 1
                end,
            {ets:update_counter(Stats, State, {2, Inc}, {State, 0}), NewState}
    end.
|
|
|
|
%% Derive the id of the state reached from `State' via `Token'.
-spec get_id_for_key(trie(), state(), edge()) -> static_key().
get_id_for_key(#trie{is_binary_key = IsBin, static_key_size = Size}, State, Token) ->
    %% Requirements for the return value:
    %%
    %% It should be globally unique for the `{State, Token}` pair. Other
    %% than that, there's no requirements. The return value doesn't even
    %% have to be deterministic, since the states are saved in the trie.
    %% Yet, it helps a lot if it is, so that applying the same sequence
    %% of topics to different tries will result in the same trie state.
    %%
    %% The generated value becomes the ID of the topic in the durable
    %% storage. Its size should be relatively small to reduce the
    %% overhead of storing messages.
    %%
    %% If we want to impress computer science crowd, sorry, I mean to
    %% minimize storage requirements, we can even employ Huffman coding
    %% based on the frequency of messages.
    Hash = crypto:hash(sha256, term_to_binary([State | Token])),
    case IsBin of
        false ->
            %% Note: for backward compatibility with bitstream_lts
            %% layout we allow the key to be an integer. But this also
            %% changes the semantics of `static_key_size` from number
            %% of bytes to bits.
            %%
            %% The tail is matched as a bitstring: when `Size' is not
            %% a multiple of 8 the remainder of the hash is not
            %% byte-aligned, and a `bytes' tail would fail to match.
            <<Int:Size, _/bitstring>> = Hash,
            Int;
        true ->
            %% Take the first `Size' bytes of the hash:
            element(1, erlang:split_binary(Hash, Size))
    end.
|
|
|
|
%% Walk the trie along a topic filter and collect the keys of all
%% matching topics. `Varying' accumulates (in reverse order) the filter
%% levels that were consumed by learned wildcard edges.
%% erlfmt-ignore
-spec do_match_topics(trie(), state(), [level() | '+'], [level() | '+' | '#']) ->
          list().
do_match_topics(Trie, State, Varying, []) ->
    %% The filter is exhausted: it matches only if a topic ends here.
    case trie_next(Trie, State, ?EOT) of
        undefined -> [];
        {false, Static} -> [{Static, lists:reverse(Varying)}]
    end;
do_match_topics(Trie, State, Varying, ['#'] = Filter) ->
    %% '#' matches everything below this state, so follow every edge
    %% and keep the filter as is:
    Descend = fun
        ({?EOT, Static}) ->
            [{Static, lists:reverse(Varying)}];
        ({?PLUS, NextState}) ->
            do_match_topics(Trie, NextState, [?PLUS | Varying], Filter);
        ({_, NextState}) ->
            do_match_topics(Trie, NextState, Varying, Filter)
    end,
    lists:flatmap(Descend, emanating(Trie, State, ?PLUS));
do_match_topics(Trie, State, Varying, [Level | Rest]) ->
    Descend = fun
        ({?EOT, _NextState}) ->
            %% The topic ends here, but the filter does not:
            [];
        ({?PLUS, NextState}) ->
            do_match_topics(Trie, NextState, [Level | Varying], Rest);
        ({_, NextState}) ->
            do_match_topics(Trie, NextState, Varying, Rest)
    end,
    lists:flatmap(Descend, emanating(Trie, State, Level)).
|
|
|
|
%% Read-only walk of the trie along a concrete topic. Levels consumed
%% by wildcard edges are accumulated (in reverse order) in `Varying'.
-spec do_lookup_topic_key(trie(), state(), [level()], [level()]) ->
    {ok, msg_storage_key()} | undefined.
do_lookup_topic_key(Trie, State, [], Varying) ->
    case trie_next(Trie, State, ?EOT) of
        undefined ->
            undefined;
        {false, Static} ->
            {ok, {Static, lists:reverse(Varying)}}
    end;
do_lookup_topic_key(Trie, State, [Tok | Rest], Varying) ->
    case trie_next(Trie, State, Tok) of
        undefined ->
            undefined;
        {false, NextState} ->
            do_lookup_topic_key(Trie, NextState, Rest, Varying);
        {true, NextState} ->
            do_lookup_topic_key(Trie, NextState, Rest, [Tok | Varying])
    end.
|
|
|
|
%% Walk the trie along a concrete topic, inserting the missing
%% transitions. `Tokens' accumulates (in reverse order) the learned
%% structure of the topic, `Varying' the levels consumed by wildcards.
do_topic_key(Trie, _, _, State, [], Tokens, Varying) ->
    %% We reached the end of topic. Assert: Trie node that corresponds
    %% to EOT cannot be a wildcard.
    {Updated, false, Static} = trie_next_(Trie, State, ?EOT),
    %% Record the learned structure for a newly created static key,
    %% provided that reverse lookups are enabled:
    _ =
        Trie#trie.rlookups andalso Updated =/= false andalso
            trie_insert(Trie, ?rlookup, Static, lists:reverse(Tokens)),
    {Static, lists:reverse(Varying)};
do_topic_key(Trie, ThresholdFun, Depth, State, [Tok | Rest], Tokens, Varying0) ->
    % TODO: it's not necessary to call it every time.
    Threshold = ThresholdFun(Depth),
    {NChildren, IsWildcard, NextState} = trie_next_(Trie, State, Tok),
    _ =
        case is_integer(NChildren) andalso NChildren >= Threshold of
            true ->
                %% Number of children for the trie node reached the
                %% threshold, we need to insert wildcard here.
                {_, _WildcardState} = trie_insert(Trie, State, ?PLUS);
            false ->
                ok
        end,
    {TokOrWildcard, Varying} =
        case IsWildcard of
            true ->
                %% This topic level is marked as wildcard in the trie,
                %% we need to add it to the varying part of the key:
                {?PLUS, [Tok | Varying0]};
            false ->
                {Tok, Varying0}
        end,
    do_topic_key(Trie, ThresholdFun, Depth + 1, NextState, Rest, [TokOrWildcard | Tokens], Varying).
|
|
|
|
%% @doc Has side effects! Inserts missing elements.
%%
%% Same as `trie_next/3', but a missing transition is created on the
%% fly. The first element of the result is `false' for an existing
%% transition, or the value returned by `trie_insert/3' for a new one.
-spec trie_next_(trie(), state(), binary() | ?EOT) -> {New, Wildcard, state()} when
    New :: false | non_neg_integer(),
    Wildcard :: boolean().
trie_next_(Trie, State, Token) ->
    case trie_next(Trie, State, Token) of
        undefined ->
            {Updated, Inserted} = trie_insert(Trie, State, Token),
            {Updated, false, Inserted};
        {Wildcard, Existing} ->
            {false, Wildcard, Existing}
    end.
|
|
|
|
%% @doc Return the edges emanating from a node that are compatible
%% with the given filter level:
%%
%% - `?PLUS': every edge of the node;
%% - `?EOT': only the end-of-topic edge, if present;
%% - a concrete level: the learned wildcard edge (if present) followed
%%   by the edge with exactly this label (if present).
%% erlfmt-ignore
-spec emanating(trie(), state(), edge()) -> [{edge(), state()}].
emanating(#trie{trie = Tab}, State, ?PLUS) ->
    MatchSpec = ets:fun2ms(
        fun(#trans{key = {S, Edge}, next = Next}) when S == State ->
            {Edge, Next}
        end
    ),
    ets:select(Tab, MatchSpec);
emanating(#trie{trie = Tab}, State, ?EOT) ->
    case ets:lookup(Tab, {State, ?EOT}) of
        [] -> [];
        [#trans{next = Next}] -> [{?EOT, Next}]
    end;
emanating(#trie{trie = Tab}, State, Token) when is_binary(Token); Token =:= '' ->
    Found = ets:lookup(Tab, {State, ?PLUS}) ++ ets:lookup(Tab, {State, Token}),
    lists:map(
        fun(#trans{key = {_, Edge}, next = Next}) -> {Edge, Next} end,
        Found
    ).
|
|
|
|
%% Return every transition that starts at `State', in the
%% `{{State, Edge}, Next}' form used by the dumps.
all_emanating(#trie{trie = Tab}, State) ->
    MatchSpec = ets:fun2ms(
        fun(#trans{key = {S, Edge}, next = Next}) when S == State ->
            {{S, Edge}, Next}
        end
    ),
    ets:select(Tab, MatchSpec).
|
|
|
|
%% Enumerate the complete paths of the trie, i.e. the paths that lead
%% from the root to an end-of-topic edge. Each path is a list of
%% `{{State, Edge}, Next}' transitions ordered from the root.
paths(#trie{} = T) ->
    lists:append([
        follow_path(T, Next, [Root])
     || {_Key, Next} = Root <- all_emanating(T, ?PREFIX)
    ]).
|
|
|
|
%% Depth-first continuation of `paths/1'. `Path' holds the transitions
%% visited so far in reverse order; a path is emitted once an
%% end-of-topic edge is reached.
follow_path(#trie{} = T, State, Path) ->
    Step = fun
        ({{_State, ?EOT}, _Next} = Last) ->
            [lists:reverse([Last | Path])];
        ({_Edge, Next} = Segment) ->
            follow_path(T, Next, [Segment | Path])
    end,
    lists:flatmap(Step, all_emanating(T, State)).
|
|
|
|
%% Extract the edge labels from a path, preserving their order.
tokens_of_path(Path) ->
    lists:map(fun({{_State, Token}, _Next}) -> Token end, Path).
|
|
|
|
%% Wrapper for type checking only: it gives the inserted record a
%% `trans()' type in the spec. Inlined by the compiler.
-compile({inline, ets_insert_new/2}).
-spec ets_insert_new(ets:tid(), trans()) -> boolean().
ets_insert_new(Tab, Record) ->
    ets:insert_new(Tab, Record).
|
|
|
|
%% Worker for `compress_topic/3'. Walks the structure and the filter in
%% lockstep, accumulating (in reverse order) the filter levels that sit
%% at the wildcard positions of the structure.
compress_topic(_StaticKey, [], [], Acc) ->
    lists:reverse(Acc);
%% A trailing '#' covers the rest of the structure; every remaining
%% wildcard position is filled with '+':
compress_topic(_StaticKey, [], ['#'], Acc) ->
    lists:reverse(Acc);
compress_topic(StaticKey, ['+' | StructRest], ['#'] = Filter, Acc) ->
    compress_topic(StaticKey, StructRest, Filter, ['+' | Acc]);
compress_topic(StaticKey, [_ | StructRest], ['#'] = Filter, Acc) ->
    compress_topic(StaticKey, StructRest, Filter, Acc);
%% Wildcard position: the filter level is kept as is.
compress_topic(StaticKey, ['+' | StructRest], [Level | FilterRest], Acc) ->
    compress_topic(StaticKey, StructRest, FilterRest, [Level | Acc]);
%% Static position: the filter level must agree with the structure and
%% is dropped.
compress_topic(StaticKey, [Static | StructRest], ['+' | FilterRest], Acc) ->
    compress_topic(StaticKey, StructRest, FilterRest, Acc);
compress_topic(StaticKey, [Static | StructRest], [Static | FilterRest], Acc) ->
    compress_topic(StaticKey, StructRest, FilterRest, Acc);
compress_topic(StaticKey, StructRest, FilterRest, _Acc) ->
    %% Topic is mismatched with the structure. This should never
    %% happen. LTS got corrupted?
    throw(
        {unrecoverable, #{
            msg => 'Topic structure mismatch',
            static_key => StaticKey,
            input => FilterRest,
            structure => StructRest
        }}
    ).
|
|
|
|
%% Worker for `decompress_topic/2'. Rebuilds the filter by taking the
%% next compressed level for every wildcard position of the structure
%% and the structure's own level otherwise.
decompress_topic([], [], Acc) ->
    lists:reverse(Acc);
decompress_topic(['+' | StructRest], [Level | Rest], Acc) ->
    decompress_topic(StructRest, Rest, [Level | Acc]);
decompress_topic([Static | StructRest], Compressed, Acc) ->
    decompress_topic(StructRest, Compressed, [Static | Acc]).
|
|
|
|
%%================================================================================
|
|
%% Tests
|
|
%%================================================================================
|
|
|
|
-ifdef(TEST).
|
|
|
|
%% Low-level checks of `trie_insert/3' and `trie_next/3': insertion is
%% idempotent and child counters are kept per parent state.
trie_basic_test() ->
    Trie = trie_create(),
    %% First child of the root:
    ?assertMatch(undefined, trie_next(Trie, ?PREFIX, <<"foo">>)),
    {1, Foo} = trie_insert(Trie, ?PREFIX, <<"foo">>),
    ?assertMatch({false, Foo}, trie_insert(Trie, ?PREFIX, <<"foo">>)),
    ?assertMatch({false, Foo}, trie_next(Trie, ?PREFIX, <<"foo">>)),

    %% Second child of the root:
    ?assertMatch(undefined, trie_next(Trie, ?PREFIX, <<"bar">>)),
    {2, Bar} = trie_insert(Trie, ?PREFIX, <<"bar">>),
    ?assertMatch({false, Bar}, trie_insert(Trie, ?PREFIX, <<"bar">>)),

    %% Children of `foo':
    ?assertMatch(undefined, trie_next(Trie, Foo, <<"foo">>)),
    ?assertMatch(undefined, trie_next(Trie, Foo, <<"bar">>)),
    {1, FooFoo} = trie_insert(Trie, Foo, <<"foo">>),
    {2, FooBar} = trie_insert(Trie, Foo, <<"bar">>),
    ?assertMatch({false, FooFoo}, trie_next(Trie, Foo, <<"foo">>)),
    ?assertMatch({false, FooBar}, trie_next(Trie, Foo, <<"bar">>)),

    %% Third level:
    ?assertMatch(undefined, trie_next(Trie, FooFoo, <<"bar">>)),
    {1, FooFooBar} = trie_insert(Trie, FooFoo, <<"bar">>),
    ?assertMatch({false, FooFooBar}, trie_next(Trie, FooFoo, <<"bar">>)).
|
|
|
|
%% `lookup_topic_key/2' finds a topic only after its end-of-topic edge
%% has been inserted.
lookup_key_test() ->
    Trie = trie_create(),
    {_, Foo} = trie_insert(Trie, ?PREFIX, <<"foo">>),
    {_, FooFoo} = trie_insert(Trie, Foo, <<"foo">>),
    %% Topics don't match until we insert ?EOT:
    ?assertMatch(undefined, lookup_topic_key(Trie, [<<"foo">>])),
    ?assertMatch(undefined, lookup_topic_key(Trie, [<<"foo">>, <<"foo">>])),
    {_, FooEnd} = trie_insert(Trie, Foo, ?EOT),
    {_, FooFooEnd} = trie_insert(Trie, FooFoo, ?EOT),
    ?assertMatch({ok, {FooEnd, []}}, lookup_topic_key(Trie, [<<"foo">>])),
    ?assertMatch({ok, {FooFooEnd, []}}, lookup_topic_key(Trie, [<<"foo">>, <<"foo">>])),
    %% The rest of keys still don't match:
    ?assertMatch(undefined, lookup_topic_key(Trie, [<<"bar">>])),
    ?assertMatch(undefined, lookup_topic_key(Trie, [<<"bar">>, <<"foo">>])).
|
|
|
|
%% Lookups through a manually inserted wildcard edge: the level that
%% was consumed by the wildcard ends up in the varying part of the key.
wildcard_lookup_test() ->
    Trie = trie_create(),
    {1, Foo} = trie_insert(Trie, ?PREFIX, <<"foo">>),
    %% Plus doesn't increase the number of children
    {0, FooAny} = trie_insert(Trie, Foo, ?PLUS),
    {1, FooAnyFoo} = trie_insert(Trie, FooAny, <<"foo">>),
    %% ?EOT doesn't increase the number of children
    {0, FooAnyFooEnd} = trie_insert(Trie, FooAnyFoo, ?EOT),
    %% Topics that go through the wildcard:
    ?assertMatch(
        {ok, {FooAnyFooEnd, [<<"bar">>]}},
        lookup_topic_key(Trie, [<<"foo">>, <<"bar">>, <<"foo">>])
    ),
    ?assertMatch(
        {ok, {FooAnyFooEnd, [<<"quux">>]}},
        lookup_topic_key(Trie, [<<"foo">>, <<"quux">>, <<"foo">>])
    ),
    %% Incomplete or diverging topics are not found:
    ?assertMatch(undefined, lookup_topic_key(Trie, [<<"foo">>])),
    ?assertMatch(undefined, lookup_topic_key(Trie, [<<"foo">>, <<"bar">>])),
    ?assertMatch(undefined, lookup_topic_key(Trie, [<<"foo">>, <<"bar">>, <<"bar">>])),
    ?assertMatch(undefined, lookup_topic_key(Trie, [<<"bar">>, <<"foo">>, <<"foo">>])),
    %% Terminating `foo' makes it a topic of its own:
    {_, FooEnd} = trie_insert(Trie, Foo, ?EOT),
    ?assertMatch({ok, {FooEnd, []}}, lookup_topic_key(Trie, [<<"foo">>])).
|
|
|
|
%% Wildcards are learned once the number of children of a node reaches
%% the threshold; keys created before that point stay unchanged.
%% erlfmt-ignore
topic_key_test() ->
    Trie = trie_create(),
    try
        Threshold = 4,
        ThresholdFun = fun(0) -> 1000;
                          (_) -> Threshold
                       end,
        %% Test that bottom layer threshold is high:
        lists:foreach(
          fun(N) ->
                  {_, []} = test_key(Trie, ThresholdFun, [N, 99999, 999999, 99999])
          end,
          lists:seq(1, 10)),
        %% Test adding children on the 2nd level:
        lists:foreach(
          fun(N) ->
                  case test_key(Trie, ThresholdFun, [1, N, 1]) of
                      {_, []} ->
                          ?assert(N < Threshold, {N, '<', Threshold}),
                          ok;
                      {_, [Var]} ->
                          ?assert(N >= Threshold, {N, '>=', Threshold}),
                          ?assertEqual(Var, integer_to_binary(N))
                  end
          end,
          lists:seq(1, 100)),
        %% This doesn't affect 2nd level with a different prefix:
        ?assertMatch({_, []}, test_key(Trie, ThresholdFun, [2, 1, 1])),
        ?assertMatch({_, []}, test_key(Trie, ThresholdFun, [2, 10, 1])),
        %% This didn't retroactively change the indexes that were
        %% created prior to reaching the threshold:
        ?assertMatch({_, []}, test_key(Trie, ThresholdFun, [1, 1, 1])),
        ?assertMatch({_, []}, test_key(Trie, ThresholdFun, [1, 2, 1])),
        %% Now create another level of +:
        lists:foreach(
          fun(N) ->
                  case test_key(Trie, ThresholdFun, [1, 42, 1, N, 42]) of
                      {_, [<<"42">>]} when N =< Threshold -> %% TODO: off by 1 error
                          ok;
                      {_, [<<"42">>, Var]} ->
                          ?assertEqual(Var, integer_to_binary(N));
                      Ret ->
                          error({Ret, N})
                  end
          end,
          lists:seq(1, 100))
    after
        dump_to_dot(Trie, filename:join("_build", atom_to_list(?FUNCTION_NAME) ++ ".dot"))
    end.
|
|
|
|
%% `match_topics/2' against concrete filters, `+' and `#', before and
%% after wildcards have been learned.
%% erlfmt-ignore
topic_match_test() ->
    Trie = trie_create(),
    try
        Threshold = 2,
        ThresholdFun = fun(0) -> 1000;
                          (_) -> Threshold
                       end,
        {S1, []} = test_key(Trie, ThresholdFun, [1]),
        {S11, []} = test_key(Trie, ThresholdFun, [1, 1]),
        {S12, []} = test_key(Trie, ThresholdFun, [1, 2]),
        {S111, []} = test_key(Trie, ThresholdFun, [1, 1, 1]),
        {S11e, []} = test_key(Trie, ThresholdFun, [1, 1, '']),
        %% Match concrete topics:
        assert_match_topics(Trie, [1], [{S1, []}]),
        assert_match_topics(Trie, [1, 1], [{S11, []}]),
        assert_match_topics(Trie, [1, 1, 1], [{S111, []}]),
        %% Match topics with +:
        assert_match_topics(Trie, [1, '+'], [{S11, []}, {S12, []}]),
        assert_match_topics(Trie, [1, '+', 1], [{S111, []}]),
        assert_match_topics(Trie, [1, '+', ''], [{S11e, []}]),
        %% Match topics with #:
        assert_match_topics(Trie, [1, '#'],
                            [{S1, []},
                             {S11, []}, {S12, []},
                             {S111, []}, {S11e, []}]),
        assert_match_topics(Trie, [1, 1, '#'],
                            [{S11, []},
                             {S111, []},
                             {S11e, []}]),
        %% Now add learned wildcards:
        {S21, []} = test_key(Trie, ThresholdFun, [2, 1]),
        {S22, []} = test_key(Trie, ThresholdFun, [2, 2]),
        {S2_, [<<"3">>]} = test_key(Trie, ThresholdFun, [2, 3]),
        {S2_11, [<<"3">>]} = test_key(Trie, ThresholdFun, [2, 3, 1, 1]),
        {S2_12, [<<"4">>]} = test_key(Trie, ThresholdFun, [2, 4, 1, 2]),
        {S2_1_, [<<"3">>, <<"3">>]} = test_key(Trie, ThresholdFun, [2, 3, 1, 3]),
        %% %% Check matching:
        assert_match_topics(Trie, [2, 2],
                            [{S22, []}, {S2_, [<<"2">>]}]),
        assert_match_topics(Trie, [2, '+'],
                            [{S22, []}, {S21, []}, {S2_, ['+']}]),
        assert_match_topics(Trie, [2, '#'],
                            [{S21, []}, {S22, []},
                             {S2_, ['+']},
                             {S2_11, ['+']}, {S2_12, ['+']}, {S2_1_, ['+', '+']}]),
        ok
    after
        dump_to_dot(Trie, filename:join("_build", atom_to_list(?FUNCTION_NAME) ++ ".dot"))
    end.
|
|
|
|
%% Reverse lookups return the learned structure for every static key,
%% and keep working after the trie is dumped and restored.
%% erlfmt-ignore
rlookup_test() ->
    Trie = trie_create(#{reverse_lookups => true}),
    Threshold = 2,
    ThresholdFun = fun(0) -> 1000;
                      (_) -> Threshold
                   end,
    {S1, []} = test_key(Trie, ThresholdFun, [1]),
    {S11, []} = test_key(Trie, ThresholdFun, [1, 1]),
    {S12, []} = test_key(Trie, ThresholdFun, [1, 2]),
    {S111, []} = test_key(Trie, ThresholdFun, [1, 1, 1]),
    {S11e, []} = test_key(Trie, ThresholdFun, [1, 1, '']),
    %% Now add learned wildcards:
    {S21, []} = test_key(Trie, ThresholdFun, [2, 1]),
    {S22, []} = test_key(Trie, ThresholdFun, [2, 2]),
    {S2_, [<<"3">>]} = test_key(Trie, ThresholdFun, [2, 3]),
    {S2_11, [<<"3">>]} = test_key(Trie, ThresholdFun, [2, 3, 1, 1]),
    {S2_12, [<<"4">>]} = test_key(Trie, ThresholdFun, [2, 4, 1, 2]),
    {S2_1_, [<<"3">>, <<"3">>]} = test_key(Trie, ThresholdFun, [2, 3, 1, 3]),
    %% Check reverse matching:
    ?assertEqual({ok, [<<"1">>]}, reverse_lookup(Trie, S1)),
    ?assertEqual({ok, [<<"1">>, <<"1">>]}, reverse_lookup(Trie, S11)),
    ?assertEqual({ok, [<<"1">>, <<"2">>]}, reverse_lookup(Trie, S12)),
    ?assertEqual({ok, [<<"1">>, <<"1">>, <<"1">>]}, reverse_lookup(Trie, S111)),
    ?assertEqual({ok, [<<"1">>, <<"1">>, '']}, reverse_lookup(Trie, S11e)),
    ?assertEqual({ok, [<<"2">>, <<"1">>]}, reverse_lookup(Trie, S21)),
    ?assertEqual({ok, [<<"2">>, <<"2">>]}, reverse_lookup(Trie, S22)),
    ?assertEqual({ok, [<<"2">>, '+']}, reverse_lookup(Trie, S2_)),
    ?assertEqual({ok, [<<"2">>, '+', <<"1">>, <<"1">>]}, reverse_lookup(Trie, S2_11)),
    ?assertEqual({ok, [<<"2">>, '+', <<"1">>, <<"2">>]}, reverse_lookup(Trie, S2_12)),
    ?assertEqual({ok, [<<"2">>, '+', <<"1">>, '+']}, reverse_lookup(Trie, S2_1_)),
    %% Dump and restore trie to make sure rlookup still works:
    Restored = trie_restore(#{reverse_lookups => true}, trie_dump(Trie, all)),
    destroy(Trie),
    ?assertEqual({ok, [<<"2">>, <<"1">>]}, reverse_lookup(Restored, S21)),
    ?assertEqual({ok, [<<"2">>, '+', <<"1">>, '+']}, reverse_lookup(Restored, S2_1_)).
|
|
|
|
%% The `topics' counter grows with every new static key and stays put
%% when a topic is absorbed by an existing wildcard.
n_topics_test() ->
    Threshold = 3,
    ThresholdFun = fun
        (0) -> 1000;
        (_) -> Threshold
    end,

    Trie = trie_create(#{reverse_lookups => true}),
    ?assertEqual(0, info(Trie, topics)),
    %% Repeating the same topic yields the same key and no new topic:
    {S11, []} = test_key(Trie, ThresholdFun, [1, 1]),
    {S11, []} = test_key(Trie, ThresholdFun, [1, 1]),
    ?assertEqual(1, info(Trie, topics)),

    {S12, []} = test_key(Trie, ThresholdFun, [1, 2]),
    {S12, []} = test_key(Trie, ThresholdFun, [1, 2]),
    ?assertEqual(2, info(Trie, topics)),

    {_S13, []} = test_key(Trie, ThresholdFun, [1, 3]),
    ?assertEqual(3, info(Trie, topics)),

    %% From now on the second level is a wildcard:
    {S1_, [_]} = test_key(Trie, ThresholdFun, [1, 4]),
    ?assertEqual(4, info(Trie, topics)),

    {S1_, [_]} = test_key(Trie, ThresholdFun, [1, 5]),
    {S1_, [_]} = test_key(Trie, ThresholdFun, [1, 6]),
    {S1_, [_]} = test_key(Trie, ThresholdFun, [1, 7]),
    ?assertEqual(4, info(Trie, topics)),

    ?assertMatch(
        [{size, N}, {topics, 4}] when is_integer(N),
        info(Trie)
    ).
|
|
|
|
-define(keys_history, topic_key_history).
|
|
|
|
%% Test helper: assert that `match_topics/2' returns exactly the
%% expected keys (in any order). Integers in the filter are converted
%% to binaries; the filter is attached to the assertion as a comment.
%% erlfmt-ignore
assert_match_topics(Trie, Filter0, Expected) ->
    ToLevel = fun(I) when is_integer(I) -> integer_to_binary(I);
                 (Other) -> Other
              end,
    Filter = lists:map(ToLevel, Filter0),
    Matched = match_topics(Trie, Filter),
    Diff = #{ missing => Expected -- Matched
            , unexpected => Matched -- Expected
            },
    ?assertMatch(#{missing := [], unexpected := []}, Diff, Filter).
|
|
|
|
%% Test helper around `topic_key/3'. Converts integer levels to
%% binaries and checks that:
%% - the call is idempotent;
%% - the returned key was never handed out for a different topic (the
%%   history is kept in the process dictionary);
%% - `lookup_topic_key/2' agrees with the result.
%% erlfmt-ignore
test_key(Trie, Threshold, Topic0) ->
    Topic = lists:map(fun('') -> '';
                         (I) -> integer_to_binary(I)
                      end,
                      Topic0),
    Ret = topic_key(Trie, Threshold, Topic),
    %% Test idempotency:
    Ret1 = topic_key(Trie, Threshold, Topic),
    ?assertEqual(Ret, Ret1, Topic),
    %% Add new key to the history:
    OldHistory =
        case get(?keys_history) of
            undefined -> #{};
            Saved -> Saved
        end,
    %% Test that the generated keys are always unique for the topic:
    CheckOwner =
        fun(Old) when Old =:= Topic ->
                Old;
           (Old) ->
                error(#{ '$msg' => "Duplicate key!"
                       , key => Ret
                       , old_topic => Old
                       , new_topic => Topic
                       })
        end,
    History = maps:update_with(Ret, CheckOwner, Topic, OldHistory),
    put(?keys_history, History),
    {ok, Ret} = lookup_topic_key(Trie, Topic),
    Ret.
|
|
|
|
%% `paths/1' enumerates every inserted topic, the wildcard filter picks
%% only the learned paths, and the trie can be rebuilt from its paths.
paths_test() ->
    Trie = trie_create(),
    Threshold = 4,
    ThresholdFun = fun
        (0) -> 1000;
        (_) -> Threshold
    end,
    PathsToInsert =
        [
            [''],
            [1],
            [2, 2],
            [3, 3, 3],
            [2, 3, 4]
        ] ++ [[4, I, 4] || I <- lists:seq(1, Threshold + 2)] ++
            [['', I, ''] || I <- lists:seq(1, Threshold + 2)],
    lists:foreach(
        fun(PathSpec) -> test_key(Trie, ThresholdFun, PathSpec) end,
        PathsToInsert
    ),

    %% Test that the paths we've inserted are produced in the output
    Paths = paths(Trie),
    FormattedPaths = lists:map(fun format_path/1, Paths),
    ExpectedWildcardPaths =
        [
            [4, '+', 4],
            ['', '+', '']
        ],
    ExpectedPaths =
        [
            [''],
            [1],
            [2, 2],
            [3, 3, 3]
        ] ++ [[4, I, 4] || I <- lists:seq(1, Threshold)] ++
            [['', I, ''] || I <- lists:seq(1, Threshold)] ++
            ExpectedWildcardPaths,
    %% Convert a path spec to the list of edges of the matching path:
    FormatPathSpec =
        fun(PathSpec) ->
            Levels = lists:map(
                fun
                    (I) when is_integer(I) -> integer_to_binary(I);
                    (A) -> A
                end,
                PathSpec
            ),
            Levels ++ [?EOT]
        end,
    lists:foreach(
        fun(PathSpec) ->
            Path = FormatPathSpec(PathSpec),
            ?assert(
                lists:member(Path, FormattedPaths),
                #{
                    paths => FormattedPaths,
                    expected_path => Path
                }
            )
        end,
        ExpectedPaths
    ),

    %% Test filter function for paths containing wildcards
    WildcardPaths = lists:filter(
        fun(Path) -> lists:member(?PLUS, tokens_of_path(Path)) end,
        Paths
    ),
    FormattedWildcardPaths = lists:map(fun format_path/1, WildcardPaths),
    ?assertEqual(
        sets:from_list(lists:map(FormatPathSpec, ExpectedWildcardPaths), [{version, 2}]),
        sets:from_list(FormattedWildcardPaths, [{version, 2}]),
        #{
            expected => ExpectedWildcardPaths,
            wildcards => FormattedWildcardPaths
        }
    ),

    %% Test that we're able to reconstruct the same trie from the paths
    Rebuilt = trie_create(),
    [
        trie_insert(Rebuilt, State, Edge, Next)
     || Path <- Paths,
        {{State, Edge}, Next} <- Path
    ],
    #trie{trie = Tab1} = Trie,
    #trie{trie = Tab2} = Rebuilt,
    Dump1 = sets:from_list(ets:tab2list(Tab1), [{version, 2}]),
    Dump2 = sets:from_list(ets:tab2list(Tab2), [{version, 2}]),
    ?assertEqual(Dump1, Dump2).
|
|
|
|
%% Project a path onto its edge labels.
%%
%% Each path element has the shape `{{State, Edge}, Next}'; only the
%% `Edge' component is kept, in the original order. An empty path yields
%% an empty list. An element of any other shape raises `function_clause'.
format_path(Path) ->
    lists:map(
        fun({{_From, Label}, _To}) -> Label end,
        Path
    ).
|
|
|
|
%% Table-driven checks for compress_topic/3.
%%
%% The first argument is the constant 42 in every case (presumably a
%% static key; its value is never reflected in the expected results
%% here). The second argument is the structure and the third is the
%% topic that is compressed against it.
compress_topic_test() ->
    Key = 42,
    Foo = <<"foo">>,
    Bar = <<"bar">>,
    Baz = <<"baz">>,
    %% Structure with two wildcard levels, shared by several cases below.
    TwoVarying = [Foo, '+', Bar, '+', Baz],
    %% Rows are {ExpectedResult, Structure, Topic}.
    Matching = [
        %% Structure without wildcards:
        {[], [], []},
        {[], [Foo, Bar], [Foo, Bar]},
        {[], [Foo, ''], [Foo, '']},
        {[], [Foo, ''], [Foo, '+']},
        {[], [Foo, ''], ['+', '+']},
        {[], [Foo, Bar, ''], ['#']},
        {[], [Foo, Bar, ''], [Foo, Bar, '#']},
        {[], [Foo, Bar, ''], ['+', '#']},
        {[], [Foo, Bar, ''], [Foo, Bar, '', '#']},
        %% With wildcards:
        {[<<"1">>], [Foo, '+', Bar], [Foo, <<"1">>, Bar]},
        {[<<"1">>, <<"2">>], TwoVarying, [Foo, <<"1">>, Bar, <<"2">>, Baz]},
        {['+', <<"2">>], TwoVarying, [Foo, '+', Bar, <<"2">>, Baz]},
        {['+', '+'], TwoVarying, [Foo, '+', Bar, '+', Baz]},
        {['+', '+'], TwoVarying, ['#']},
        {['+', '+'], TwoVarying, [Foo, '+', '+', '#']}
    ],
    lists:foreach(
        fun({Expected, Structure, Topic}) ->
            ?assertEqual(
                Expected,
                compress_topic(Key, Structure, Topic),
                #{structure => Structure, topic => Topic}
            )
        end,
        Matching
    ),
    %% Mismatch: rows are {Structure, Topic}; each call must raise an
    %% exception (of any class) whose term matches {unrecoverable, _}.
    Mismatching = [
        {[Foo], [Bar]},
        {[], [Bar]},
        {[Foo], []},
        {['', ''], ['', '', '']},
        {['', ''], [Foo, '#']},
        {['', ''], ['+', '+', '+', '#']},
        {['+'], [Bar, '+']},
        {[Foo, '+'], [Bar, Baz]}
    ],
    lists:foreach(
        fun({Structure, Topic}) ->
            ?assertException(
                _,
                {unrecoverable, _},
                compress_topic(Key, Structure, Topic),
                #{structure => Structure, topic => Topic}
            )
        end,
        Mismatching
    ).
|
|
|
|
%% Table-driven checks for decompress_topic/2.
%%
%% Each row is {ExpectedTopic, Structure, Varying}: in the expected
%% results below, the '+' levels of Structure are replaced, left to
%% right, by the elements of Varying, while all other levels are kept
%% as they are.
decompress_topic_test() ->
    Foo = <<"foo">>,
    Bar = <<"bar">>,
    Cases = [
        %% Structure without wildcards:
        {[], [], []},
        {[Foo, '', Bar], [Foo, '', Bar], []},
        %% With wildcards:
        {[Foo, '', Bar, <<"baz">>], [Foo, '+', Bar, '+'], ['', <<"baz">>]},
        {[Foo, '+', Bar, '+', ''], [Foo, '+', Bar, '+', ''], ['+', '+']}
    ],
    lists:foreach(
        fun({Expected, Structure, Varying}) ->
            ?assertEqual(
                Expected,
                decompress_topic(Structure, Varying),
                #{structure => Structure, varying => Varying}
            )
        end,
        Cases
    ).
|
|
|
|
-endif.
|