feat(ds_lts): New APIs: info, reverse lookups and topic compression
This commit is contained in:
parent
eb80402ccb
commit
f84fb34692
|
@ -26,7 +26,13 @@
|
||||||
trie_copy_learned_paths/2,
|
trie_copy_learned_paths/2,
|
||||||
topic_key/3,
|
topic_key/3,
|
||||||
match_topics/2,
|
match_topics/2,
|
||||||
lookup_topic_key/2
|
lookup_topic_key/2,
|
||||||
|
reverse_lookup/2,
|
||||||
|
info/2,
|
||||||
|
info/1,
|
||||||
|
|
||||||
|
compress_topic/3,
|
||||||
|
decompress_topic/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% Debug:
|
%% Debug:
|
||||||
|
@ -34,18 +40,21 @@
|
||||||
|
|
||||||
-export_type([
|
-export_type([
|
||||||
options/0,
|
options/0,
|
||||||
|
level/0,
|
||||||
static_key/0,
|
static_key/0,
|
||||||
trie/0,
|
trie/0,
|
||||||
msg_storage_key/0
|
msg_storage_key/0,
|
||||||
|
learned_structure/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include_lib("stdlib/include/ms_transform.hrl").
|
-include_lib("stdlib/include/ms_transform.hrl").
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-endif.
|
|
||||||
|
|
||||||
-elvis([{elvis_style, variable_naming_convention, disable}]).
|
-elvis([{elvis_style, variable_naming_convention, disable}]).
|
||||||
|
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
|
||||||
|
-endif.
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% Type declarations
|
%% Type declarations
|
||||||
|
@ -55,15 +64,22 @@
|
||||||
-define(EOT, []).
|
-define(EOT, []).
|
||||||
-define(PLUS, '+').
|
-define(PLUS, '+').
|
||||||
|
|
||||||
-type edge() :: binary() | ?EOT | ?PLUS.
|
-type level() :: binary() | ''.
|
||||||
|
|
||||||
%% Fixed size binary
|
-type edge() :: level() | ?EOT | ?PLUS.
|
||||||
-type static_key() :: non_neg_integer().
|
|
||||||
|
|
||||||
|
%% Fixed size binary or integer, depending on the options:
|
||||||
|
-type static_key() :: non_neg_integer() | binary().
|
||||||
|
|
||||||
|
%% Trie root:
|
||||||
-define(PREFIX, prefix).
|
-define(PREFIX, prefix).
|
||||||
|
%% Special prefix root for reverse lookups:
|
||||||
|
-define(rlookup, rlookup).
|
||||||
|
-define(rlookup(STATIC), {?rlookup, STATIC}).
|
||||||
|
|
||||||
-type state() :: static_key() | ?PREFIX.
|
-type state() :: static_key() | ?PREFIX.
|
||||||
|
|
||||||
-type varying() :: [binary() | ?PLUS].
|
-type varying() :: [level() | ?PLUS].
|
||||||
|
|
||||||
-type msg_storage_key() :: {static_key(), varying()}.
|
-type msg_storage_key() :: {static_key(), varying()}.
|
||||||
|
|
||||||
|
@ -71,27 +87,42 @@
|
||||||
|
|
||||||
-type persist_callback() :: fun((_Key, _Val) -> ok).
|
-type persist_callback() :: fun((_Key, _Val) -> ok).
|
||||||
|
|
||||||
|
-type learned_structure() :: [level() | ?PLUS, ...].
|
||||||
|
|
||||||
-type options() ::
|
-type options() ::
|
||||||
#{
|
#{
|
||||||
persist_callback => persist_callback(),
|
persist_callback => persist_callback(),
|
||||||
static_key_size => pos_integer()
|
%% If set, static key is an integer that fits in a given nubmer of bits:
|
||||||
|
static_key_bits => pos_integer(),
|
||||||
|
%% If set, static key is a _binary_ of a given length:
|
||||||
|
static_key_bytes => pos_integer(),
|
||||||
|
reverse_lookups => boolean()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-type dump() :: [{_Key, _Val}].
|
-type dump() :: [{_Key, _Val}].
|
||||||
|
|
||||||
-record(trie, {
|
-record(trie, {
|
||||||
persist :: persist_callback(),
|
persist :: persist_callback(),
|
||||||
|
is_binary_key :: boolean(),
|
||||||
static_key_size :: pos_integer(),
|
static_key_size :: pos_integer(),
|
||||||
trie :: ets:tid(),
|
trie :: ets:tid(),
|
||||||
stats :: ets:tid()
|
stats :: ets:tid(),
|
||||||
|
rlookups = false :: boolean()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-opaque trie() :: #trie{}.
|
-opaque trie() :: #trie{}.
|
||||||
|
|
||||||
-record(trans, {
|
-record(trans, {key, next}).
|
||||||
|
|
||||||
|
-type trans() ::
|
||||||
|
#trans{
|
||||||
key :: {state(), edge()},
|
key :: {state(), edge()},
|
||||||
next :: state()
|
next :: state()
|
||||||
}).
|
}
|
||||||
|
| #trans{
|
||||||
|
key :: {?rlookup, static_key()},
|
||||||
|
next :: [level() | ?PLUS]
|
||||||
|
}.
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% API functions
|
%% API functions
|
||||||
|
@ -100,21 +131,31 @@
|
||||||
%% @doc Create an empty trie
|
%% @doc Create an empty trie
|
||||||
-spec trie_create(options()) -> trie().
|
-spec trie_create(options()) -> trie().
|
||||||
trie_create(UserOpts) ->
|
trie_create(UserOpts) ->
|
||||||
Defaults = #{
|
Persist = maps:get(
|
||||||
persist_callback => fun(_, _) -> ok end,
|
persist_callback,
|
||||||
static_key_size => 8
|
UserOpts,
|
||||||
},
|
fun(_, _) -> ok end
|
||||||
#{
|
),
|
||||||
persist_callback := Persist,
|
Rlookups = maps:get(reverse_lookups, UserOpts, false),
|
||||||
static_key_size := StaticKeySize
|
IsBinaryKey =
|
||||||
} = maps:merge(Defaults, UserOpts),
|
case UserOpts of
|
||||||
|
#{static_key_bits := StaticKeySize} ->
|
||||||
|
false;
|
||||||
|
#{static_key_bytes := StaticKeySize} ->
|
||||||
|
true;
|
||||||
|
_ ->
|
||||||
|
StaticKeySize = 16,
|
||||||
|
true
|
||||||
|
end,
|
||||||
Trie = ets:new(trie, [{keypos, #trans.key}, set, public]),
|
Trie = ets:new(trie, [{keypos, #trans.key}, set, public]),
|
||||||
Stats = ets:new(stats, [{keypos, 1}, set, public]),
|
Stats = ets:new(stats, [{keypos, 1}, set, public]),
|
||||||
#trie{
|
#trie{
|
||||||
persist = Persist,
|
persist = Persist,
|
||||||
|
is_binary_key = IsBinaryKey,
|
||||||
static_key_size = StaticKeySize,
|
static_key_size = StaticKeySize,
|
||||||
trie = Trie,
|
trie = Trie,
|
||||||
stats = Stats
|
stats = Stats,
|
||||||
|
rlookups = Rlookups
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-spec trie_create() -> trie().
|
-spec trie_create() -> trie().
|
||||||
|
@ -149,9 +190,21 @@ trie_dump(Trie, Filter) ->
|
||||||
all ->
|
all ->
|
||||||
Fun = fun(_) -> true end;
|
Fun = fun(_) -> true end;
|
||||||
wildcard ->
|
wildcard ->
|
||||||
Fun = fun contains_wildcard/1
|
Fun = fun(L) -> lists:member(?PLUS, L) end
|
||||||
end,
|
end,
|
||||||
lists:append([P || P <- paths(Trie), Fun(P)]).
|
Paths = lists:filter(
|
||||||
|
fun(Path) ->
|
||||||
|
Fun(tokens_of_path(Path))
|
||||||
|
end,
|
||||||
|
paths(Trie)
|
||||||
|
),
|
||||||
|
RlookupIdx = lists:filter(
|
||||||
|
fun({_, Tokens}) ->
|
||||||
|
Fun(Tokens)
|
||||||
|
end,
|
||||||
|
all_emanating(Trie, ?rlookup)
|
||||||
|
),
|
||||||
|
lists:flatten([Paths, RlookupIdx]).
|
||||||
|
|
||||||
-spec trie_copy_learned_paths(trie(), trie()) -> trie().
|
-spec trie_copy_learned_paths(trie(), trie()) -> trie().
|
||||||
trie_copy_learned_paths(OldTrie, NewTrie) ->
|
trie_copy_learned_paths(OldTrie, NewTrie) ->
|
||||||
|
@ -164,17 +217,17 @@ trie_copy_learned_paths(OldTrie, NewTrie) ->
|
||||||
NewTrie.
|
NewTrie.
|
||||||
|
|
||||||
%% @doc Lookup the topic key. Create a new one, if not found.
|
%% @doc Lookup the topic key. Create a new one, if not found.
|
||||||
-spec topic_key(trie(), threshold_fun(), [binary() | '']) -> msg_storage_key().
|
-spec topic_key(trie(), threshold_fun(), [level()]) -> msg_storage_key().
|
||||||
topic_key(Trie, ThresholdFun, Tokens) ->
|
topic_key(Trie, ThresholdFun, Tokens) ->
|
||||||
do_topic_key(Trie, ThresholdFun, 0, ?PREFIX, Tokens, []).
|
do_topic_key(Trie, ThresholdFun, 0, ?PREFIX, Tokens, [], []).
|
||||||
|
|
||||||
%% @doc Return an exisiting topic key if it exists.
|
%% @doc Return an exisiting topic key if it exists.
|
||||||
-spec lookup_topic_key(trie(), [binary()]) -> {ok, msg_storage_key()} | undefined.
|
-spec lookup_topic_key(trie(), [level()]) -> {ok, msg_storage_key()} | undefined.
|
||||||
lookup_topic_key(Trie, Tokens) ->
|
lookup_topic_key(Trie, Tokens) ->
|
||||||
do_lookup_topic_key(Trie, ?PREFIX, Tokens, []).
|
do_lookup_topic_key(Trie, ?PREFIX, Tokens, []).
|
||||||
|
|
||||||
%% @doc Return list of keys of topics that match a given topic filter
|
%% @doc Return list of keys of topics that match a given topic filter
|
||||||
-spec match_topics(trie(), [binary() | '+' | '#']) ->
|
-spec match_topics(trie(), [level() | '+' | '#']) ->
|
||||||
[msg_storage_key()].
|
[msg_storage_key()].
|
||||||
match_topics(Trie, TopicFilter) ->
|
match_topics(Trie, TopicFilter) ->
|
||||||
do_match_topics(Trie, ?PREFIX, [], TopicFilter).
|
do_match_topics(Trie, ?PREFIX, [], TopicFilter).
|
||||||
|
@ -206,7 +259,8 @@ dump_to_dot(#trie{trie = Trie, stats = Stats}, Filename) ->
|
||||||
{ok, FD} = file:open(Filename, [write]),
|
{ok, FD} = file:open(Filename, [write]),
|
||||||
Print = fun
|
Print = fun
|
||||||
(?PREFIX) -> "prefix";
|
(?PREFIX) -> "prefix";
|
||||||
(NodeId) -> integer_to_binary(NodeId, 16)
|
(Bin) when is_binary(Bin) -> Bin;
|
||||||
|
(NodeId) when is_integer(NodeId) -> integer_to_binary(NodeId, 16)
|
||||||
end,
|
end,
|
||||||
io:format(FD, "digraph {~n", []),
|
io:format(FD, "digraph {~n", []),
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
|
@ -225,11 +279,64 @@ dump_to_dot(#trie{trie = Trie, stats = Stats}, Filename) ->
|
||||||
io:format(FD, "}~n", []),
|
io:format(FD, "}~n", []),
|
||||||
file:close(FD).
|
file:close(FD).
|
||||||
|
|
||||||
|
-spec reverse_lookup(trie(), static_key()) -> {ok, learned_structure()} | undefined.
|
||||||
|
reverse_lookup(#trie{rlookups = false}, _) ->
|
||||||
|
error({badarg, reverse_lookups_disabled});
|
||||||
|
reverse_lookup(#trie{trie = Trie}, StaticKey) ->
|
||||||
|
case ets:lookup(Trie, ?rlookup(StaticKey)) of
|
||||||
|
[#trans{next = Next}] ->
|
||||||
|
{ok, Next};
|
||||||
|
[] ->
|
||||||
|
undefined
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% @doc Get information about the trie.
|
||||||
|
%%
|
||||||
|
%% Note: `reverse_lookups' must be enabled to get the number of
|
||||||
|
%% topics.
|
||||||
|
-spec info(trie(), size | topics) -> _.
|
||||||
|
info(#trie{rlookups = true, stats = Stats}, topics) ->
|
||||||
|
case ets:lookup(Stats, ?rlookup) of
|
||||||
|
[{_, N}] -> N;
|
||||||
|
[] -> 0
|
||||||
|
end;
|
||||||
|
info(#trie{}, topics) ->
|
||||||
|
undefined;
|
||||||
|
info(#trie{trie = T}, size) ->
|
||||||
|
ets:info(T, size).
|
||||||
|
|
||||||
|
%% @doc Return size of the trie
|
||||||
|
-spec info(trie()) -> proplists:proplist().
|
||||||
|
info(Trie) ->
|
||||||
|
[
|
||||||
|
{size, info(Trie, size)},
|
||||||
|
{topics, info(Trie, topics)}
|
||||||
|
].
|
||||||
|
|
||||||
|
%%%%%%%% Topic compression %%%%%%%%%%
|
||||||
|
|
||||||
|
%% @doc Given topic structure for the static LTS index (as returned by
|
||||||
|
%% `reverse_lookup'), compress a topic filter to exclude static
|
||||||
|
%% levels:
|
||||||
|
-spec compress_topic(static_key(), learned_structure(), emqx_ds:topic_filter()) ->
|
||||||
|
[emqx_ds_lts:level() | '+'].
|
||||||
|
compress_topic(StaticKey, TopicStructure, TopicFilter) ->
|
||||||
|
compress_topic(StaticKey, TopicStructure, TopicFilter, []).
|
||||||
|
|
||||||
|
%% @doc Given topic structure and a compressed topic filter, return
|
||||||
|
%% the original* topic filter.
|
||||||
|
%%
|
||||||
|
%% * '#' will be replaced with '+'s
|
||||||
|
-spec decompress_topic(learned_structure(), [level() | '+']) ->
|
||||||
|
emqx_ds:topic_filter().
|
||||||
|
decompress_topic(TopicStructure, Topic) ->
|
||||||
|
decompress_topic(TopicStructure, Topic, []).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% Internal exports
|
%% Internal exports
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
-spec trie_next(trie(), state(), binary() | ?EOT) -> {Wildcard, state()} | undefined when
|
-spec trie_next(trie(), state(), level() | ?EOT) -> {Wildcard, state()} | undefined when
|
||||||
Wildcard :: boolean().
|
Wildcard :: boolean().
|
||||||
trie_next(#trie{trie = Trie}, State, ?EOT) ->
|
trie_next(#trie{trie = Trie}, State, ?EOT) ->
|
||||||
case ets:lookup(Trie, {State, ?EOT}) of
|
case ets:lookup(Trie, {State, ?EOT}) of
|
||||||
|
@ -261,16 +368,19 @@ trie_insert(Trie, State, Token) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
-spec trie_insert(trie(), state(), edge(), state()) -> {Updated, state()} when
|
-spec trie_insert
|
||||||
|
(trie(), state(), edge(), state()) -> {Updated, state()} when
|
||||||
NChildren :: non_neg_integer(),
|
NChildren :: non_neg_integer(),
|
||||||
Updated :: false | NChildren.
|
Updated :: false | NChildren;
|
||||||
|
(trie(), ?rlookup, static_key(), [level() | '+']) ->
|
||||||
|
{false | non_neg_integer(), state()}.
|
||||||
trie_insert(#trie{trie = Trie, stats = Stats, persist = Persist}, State, Token, NewState) ->
|
trie_insert(#trie{trie = Trie, stats = Stats, persist = Persist}, State, Token, NewState) ->
|
||||||
Key = {State, Token},
|
Key = {State, Token},
|
||||||
Rec = #trans{
|
Rec = #trans{
|
||||||
key = Key,
|
key = Key,
|
||||||
next = NewState
|
next = NewState
|
||||||
},
|
},
|
||||||
case ets:insert_new(Trie, Rec) of
|
case ets_insert_new(Trie, Rec) of
|
||||||
true ->
|
true ->
|
||||||
ok = Persist(Key, NewState),
|
ok = Persist(Key, NewState),
|
||||||
Inc =
|
Inc =
|
||||||
|
@ -287,7 +397,7 @@ trie_insert(#trie{trie = Trie, stats = Stats, persist = Persist}, State, Token,
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec get_id_for_key(trie(), state(), edge()) -> static_key().
|
-spec get_id_for_key(trie(), state(), edge()) -> static_key().
|
||||||
get_id_for_key(#trie{static_key_size = Size}, State, Token) when Size =< 32 ->
|
get_id_for_key(#trie{is_binary_key = IsBin, static_key_size = Size}, State, Token) ->
|
||||||
%% Requirements for the return value:
|
%% Requirements for the return value:
|
||||||
%%
|
%%
|
||||||
%% It should be globally unique for the `{State, Token}` pair. Other
|
%% It should be globally unique for the `{State, Token}` pair. Other
|
||||||
|
@ -303,11 +413,17 @@ get_id_for_key(#trie{static_key_size = Size}, State, Token) when Size =< 32 ->
|
||||||
%% If we want to impress computer science crowd, sorry, I mean to
|
%% If we want to impress computer science crowd, sorry, I mean to
|
||||||
%% minimize storage requirements, we can even employ Huffman coding
|
%% minimize storage requirements, we can even employ Huffman coding
|
||||||
%% based on the frequency of messages.
|
%% based on the frequency of messages.
|
||||||
<<Int:(Size * 8), _/bytes>> = crypto:hash(sha256, term_to_binary([State | Token])),
|
Hash = crypto:hash(sha256, term_to_binary([State | Token])),
|
||||||
Int.
|
case IsBin of
|
||||||
|
false ->
|
||||||
|
<<Int:(Size * 8), _/bytes>> = Hash,
|
||||||
|
Int;
|
||||||
|
true ->
|
||||||
|
element(1, erlang:split_binary(Hash, Size))
|
||||||
|
end.
|
||||||
|
|
||||||
%% erlfmt-ignore
|
%% erlfmt-ignore
|
||||||
-spec do_match_topics(trie(), state(), [binary() | '+'], [binary() | '+' | '#']) ->
|
-spec do_match_topics(trie(), state(), [level() | '+'], [level() | '+' | '#']) ->
|
||||||
list().
|
list().
|
||||||
do_match_topics(Trie, State, Varying, []) ->
|
do_match_topics(Trie, State, Varying, []) ->
|
||||||
case trie_next(Trie, State, ?EOT) of
|
case trie_next(Trie, State, ?EOT) of
|
||||||
|
@ -341,7 +457,7 @@ do_match_topics(Trie, State, Varying, [Level | Rest]) ->
|
||||||
Emanating
|
Emanating
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec do_lookup_topic_key(trie(), state(), [binary()], [binary()]) ->
|
-spec do_lookup_topic_key(trie(), state(), [level()], [level()]) ->
|
||||||
{ok, msg_storage_key()} | undefined.
|
{ok, msg_storage_key()} | undefined.
|
||||||
do_lookup_topic_key(Trie, State, [], Varying) ->
|
do_lookup_topic_key(Trie, State, [], Varying) ->
|
||||||
case trie_next(Trie, State, ?EOT) of
|
case trie_next(Trie, State, ?EOT) of
|
||||||
|
@ -360,29 +476,42 @@ do_lookup_topic_key(Trie, State, [Tok | Rest], Varying) ->
|
||||||
undefined
|
undefined
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_topic_key(Trie, _, _, State, [], Varying) ->
|
do_topic_key(Trie, _, _, State, [], Tokens, Varying) ->
|
||||||
%% We reached the end of topic. Assert: Trie node that corresponds
|
%% We reached the end of topic. Assert: Trie node that corresponds
|
||||||
%% to EOT cannot be a wildcard.
|
%% to EOT cannot be a wildcard.
|
||||||
{_, false, Static} = trie_next_(Trie, State, ?EOT),
|
{Updated, false, Static} = trie_next_(Trie, State, ?EOT),
|
||||||
|
_ =
|
||||||
|
case Trie#trie.rlookups andalso Updated of
|
||||||
|
false ->
|
||||||
|
ok;
|
||||||
|
_ ->
|
||||||
|
trie_insert(Trie, rlookup, Static, lists:reverse(Tokens))
|
||||||
|
end,
|
||||||
{Static, lists:reverse(Varying)};
|
{Static, lists:reverse(Varying)};
|
||||||
do_topic_key(Trie, ThresholdFun, Depth, State, [Tok | Rest], Varying0) ->
|
do_topic_key(Trie, ThresholdFun, Depth, State, [Tok | Rest], Tokens, Varying0) ->
|
||||||
% TODO: it's not necessary to call it every time.
|
% TODO: it's not necessary to call it every time.
|
||||||
Threshold = ThresholdFun(Depth),
|
Threshold = ThresholdFun(Depth),
|
||||||
|
{NChildren, IsWildcard, NextState} = trie_next_(Trie, State, Tok),
|
||||||
Varying =
|
Varying =
|
||||||
case trie_next_(Trie, State, Tok) of
|
case IsWildcard of
|
||||||
{NChildren, _, NextState} when is_integer(NChildren), NChildren >= Threshold ->
|
_ when is_integer(NChildren), NChildren >= Threshold ->
|
||||||
%% Number of children for the trie node reached the
|
%% Number of children for the trie node reached the
|
||||||
%% threshold, we need to insert wildcard here.
|
%% threshold, we need to insert wildcard here.
|
||||||
{_, _WildcardState} = trie_insert(Trie, State, ?PLUS),
|
{_, _WildcardState} = trie_insert(Trie, State, ?PLUS),
|
||||||
Varying0;
|
Varying0;
|
||||||
{_, false, NextState} ->
|
false ->
|
||||||
Varying0;
|
Varying0;
|
||||||
{_, true, NextState} ->
|
true ->
|
||||||
%% This topic level is marked as wildcard in the trie,
|
%% This topic level is marked as wildcard in the trie,
|
||||||
%% we need to add it to the varying part of the key:
|
%% we need to add it to the varying part of the key:
|
||||||
[Tok | Varying0]
|
[Tok | Varying0]
|
||||||
end,
|
end,
|
||||||
do_topic_key(Trie, ThresholdFun, Depth + 1, NextState, Rest, Varying).
|
TokOrWildcard =
|
||||||
|
case IsWildcard of
|
||||||
|
true -> ?PLUS;
|
||||||
|
false -> Tok
|
||||||
|
end,
|
||||||
|
do_topic_key(Trie, ThresholdFun, Depth + 1, NextState, Rest, [TokOrWildcard | Tokens], Varying).
|
||||||
|
|
||||||
%% @doc Has side effects! Inserts missing elements
|
%% @doc Has side effects! Inserts missing elements
|
||||||
-spec trie_next_(trie(), state(), binary() | ?EOT) -> {New, Wildcard, state()} when
|
-spec trie_next_(trie(), state(), binary() | ?EOT) -> {New, Wildcard, state()} when
|
||||||
|
@ -450,12 +579,51 @@ follow_path(#trie{} = T, State, Path) ->
|
||||||
all_emanating(T, State)
|
all_emanating(T, State)
|
||||||
).
|
).
|
||||||
|
|
||||||
contains_wildcard([{{_State, ?PLUS}, _Next} | _Rest]) ->
|
tokens_of_path([{{_State, Token}, _Next} | Rest]) ->
|
||||||
true;
|
[Token | tokens_of_path(Rest)];
|
||||||
contains_wildcard([_ | Rest]) ->
|
tokens_of_path([]) ->
|
||||||
contains_wildcard(Rest);
|
[].
|
||||||
contains_wildcard([]) ->
|
|
||||||
false.
|
%% Wrapper for type checking only:
|
||||||
|
-compile({inline, ets_insert_new/2}).
|
||||||
|
-spec ets_insert_new(ets:tid(), trans()) -> boolean().
|
||||||
|
ets_insert_new(Tid, Trans) ->
|
||||||
|
ets:insert_new(Tid, Trans).
|
||||||
|
|
||||||
|
compress_topic(_StaticKey, [], [], Acc) ->
|
||||||
|
lists:reverse(Acc);
|
||||||
|
compress_topic(StaticKey, TStructL0, ['#'], Acc) ->
|
||||||
|
case TStructL0 of
|
||||||
|
[] ->
|
||||||
|
lists:reverse(Acc);
|
||||||
|
['+' | TStructL] ->
|
||||||
|
compress_topic(StaticKey, TStructL, ['#'], ['+' | Acc]);
|
||||||
|
[_ | TStructL] ->
|
||||||
|
compress_topic(StaticKey, TStructL, ['#'], Acc)
|
||||||
|
end;
|
||||||
|
compress_topic(StaticKey, ['+' | TStructL], [Level | TopicL], Acc) ->
|
||||||
|
compress_topic(StaticKey, TStructL, TopicL, [Level | Acc]);
|
||||||
|
compress_topic(StaticKey, [Struct | TStructL], [Level | TopicL], Acc) when
|
||||||
|
Level =:= '+'; Level =:= Struct
|
||||||
|
->
|
||||||
|
compress_topic(StaticKey, TStructL, TopicL, Acc);
|
||||||
|
compress_topic(StaticKey, TStructL, TopicL, _Acc) ->
|
||||||
|
%% Topic is mismatched with the structure. This should never
|
||||||
|
%% happen. LTS got corrupted?
|
||||||
|
Err = #{
|
||||||
|
msg => 'Topic structure mismatch',
|
||||||
|
static_key => StaticKey,
|
||||||
|
input => TopicL,
|
||||||
|
structure => TStructL
|
||||||
|
},
|
||||||
|
throw({unrecoverable, Err}).
|
||||||
|
|
||||||
|
decompress_topic(['+' | TStructL], [Level | TopicL], Acc) ->
|
||||||
|
decompress_topic(TStructL, TopicL, [Level | Acc]);
|
||||||
|
decompress_topic([StaticLevel | TStructL], TopicL, Acc) ->
|
||||||
|
decompress_topic(TStructL, TopicL, [StaticLevel | Acc]);
|
||||||
|
decompress_topic([], [], Acc) ->
|
||||||
|
lists:reverse(Acc).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% Tests
|
%% Tests
|
||||||
|
@ -658,6 +826,76 @@ topic_match_test() ->
|
||||||
dump_to_dot(T, filename:join("_build", atom_to_list(?FUNCTION_NAME) ++ ".dot"))
|
dump_to_dot(T, filename:join("_build", atom_to_list(?FUNCTION_NAME) ++ ".dot"))
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% erlfmt-ignore
|
||||||
|
rlookup_test() ->
|
||||||
|
T = trie_create(#{reverse_lookups => true}),
|
||||||
|
Threshold = 2,
|
||||||
|
ThresholdFun = fun(0) -> 1000;
|
||||||
|
(_) -> Threshold
|
||||||
|
end,
|
||||||
|
{S1, []} = test_key(T, ThresholdFun, [1]),
|
||||||
|
{S11, []} = test_key(T, ThresholdFun, [1, 1]),
|
||||||
|
{S12, []} = test_key(T, ThresholdFun, [1, 2]),
|
||||||
|
{S111, []} = test_key(T, ThresholdFun, [1, 1, 1]),
|
||||||
|
{S11e, []} = test_key(T, ThresholdFun, [1, 1, '']),
|
||||||
|
%% Now add learned wildcards:
|
||||||
|
{S21, []} = test_key(T, ThresholdFun, [2, 1]),
|
||||||
|
{S22, []} = test_key(T, ThresholdFun, [2, 2]),
|
||||||
|
{S2_, [<<"3">>]} = test_key(T, ThresholdFun, [2, 3]),
|
||||||
|
{S2_11, [<<"3">>]} = test_key(T, ThresholdFun, [2, 3, 1, 1]),
|
||||||
|
{S2_12, [<<"4">>]} = test_key(T, ThresholdFun, [2, 4, 1, 2]),
|
||||||
|
{S2_1_, [<<"3">>, <<"3">>]} = test_key(T, ThresholdFun, [2, 3, 1, 3]),
|
||||||
|
%% Check reverse matching:
|
||||||
|
?assertEqual({ok, [<<"1">>]}, reverse_lookup(T, S1)),
|
||||||
|
?assertEqual({ok, [<<"1">>, <<"1">>]}, reverse_lookup(T, S11)),
|
||||||
|
?assertEqual({ok, [<<"1">>, <<"2">>]}, reverse_lookup(T, S12)),
|
||||||
|
?assertEqual({ok, [<<"1">>, <<"1">>, <<"1">>]}, reverse_lookup(T, S111)),
|
||||||
|
?assertEqual({ok, [<<"1">>, <<"1">>, '']}, reverse_lookup(T, S11e)),
|
||||||
|
?assertEqual({ok, [<<"2">>, <<"1">>]}, reverse_lookup(T, S21)),
|
||||||
|
?assertEqual({ok, [<<"2">>, <<"2">>]}, reverse_lookup(T, S22)),
|
||||||
|
?assertEqual({ok, [<<"2">>, '+']}, reverse_lookup(T, S2_)),
|
||||||
|
?assertEqual({ok, [<<"2">>, '+', <<"1">>, <<"1">>]}, reverse_lookup(T, S2_11)),
|
||||||
|
?assertEqual({ok, [<<"2">>, '+', <<"1">>, <<"2">>]}, reverse_lookup(T, S2_12)),
|
||||||
|
?assertEqual({ok, [<<"2">>, '+', <<"1">>, '+']}, reverse_lookup(T, S2_1_)),
|
||||||
|
%% Dump and restore trie to make sure rlookup still works:
|
||||||
|
T1 = trie_restore(#{reverse_lookups => true}, trie_dump(T, all)),
|
||||||
|
destroy(T),
|
||||||
|
?assertEqual({ok, [<<"2">>, <<"1">>]}, reverse_lookup(T1, S21)),
|
||||||
|
?assertEqual({ok, [<<"2">>, '+', <<"1">>, '+']}, reverse_lookup(T1, S2_1_)).
|
||||||
|
|
||||||
|
n_topics_test() ->
|
||||||
|
Threshold = 3,
|
||||||
|
ThresholdFun = fun
|
||||||
|
(0) -> 1000;
|
||||||
|
(_) -> Threshold
|
||||||
|
end,
|
||||||
|
|
||||||
|
T = trie_create(#{reverse_lookups => true}),
|
||||||
|
?assertEqual(0, info(T, topics)),
|
||||||
|
{S11, []} = test_key(T, ThresholdFun, [1, 1]),
|
||||||
|
{S11, []} = test_key(T, ThresholdFun, [1, 1]),
|
||||||
|
?assertEqual(1, info(T, topics)),
|
||||||
|
|
||||||
|
{S12, []} = test_key(T, ThresholdFun, [1, 2]),
|
||||||
|
{S12, []} = test_key(T, ThresholdFun, [1, 2]),
|
||||||
|
?assertEqual(2, info(T, topics)),
|
||||||
|
|
||||||
|
{_S13, []} = test_key(T, ThresholdFun, [1, 3]),
|
||||||
|
?assertEqual(3, info(T, topics)),
|
||||||
|
|
||||||
|
{S1_, [_]} = test_key(T, ThresholdFun, [1, 4]),
|
||||||
|
?assertEqual(4, info(T, topics)),
|
||||||
|
|
||||||
|
{S1_, [_]} = test_key(T, ThresholdFun, [1, 5]),
|
||||||
|
{S1_, [_]} = test_key(T, ThresholdFun, [1, 6]),
|
||||||
|
{S1_, [_]} = test_key(T, ThresholdFun, [1, 7]),
|
||||||
|
?assertEqual(4, info(T, topics)),
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
[{size, N}, {topics, 4}] when is_integer(N),
|
||||||
|
info(T)
|
||||||
|
).
|
||||||
|
|
||||||
-define(keys_history, topic_key_history).
|
-define(keys_history, topic_key_history).
|
||||||
|
|
||||||
%% erlfmt-ignore
|
%% erlfmt-ignore
|
||||||
|
@ -773,11 +1011,16 @@ paths_test() ->
|
||||||
),
|
),
|
||||||
|
|
||||||
%% Test filter function for paths containing wildcards
|
%% Test filter function for paths containing wildcards
|
||||||
WildcardPaths = lists:filter(fun contains_wildcard/1, Paths),
|
WildcardPaths = lists:filter(
|
||||||
|
fun(Path) ->
|
||||||
|
lists:member(?PLUS, tokens_of_path(Path))
|
||||||
|
end,
|
||||||
|
Paths
|
||||||
|
),
|
||||||
FormattedWildcardPaths = lists:map(fun format_path/1, WildcardPaths),
|
FormattedWildcardPaths = lists:map(fun format_path/1, WildcardPaths),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
sets:from_list(FormattedWildcardPaths, [{version, 2}]),
|
|
||||||
sets:from_list(lists:map(FormatPathSpec, ExpectedWildcardPaths), [{version, 2}]),
|
sets:from_list(lists:map(FormatPathSpec, ExpectedWildcardPaths), [{version, 2}]),
|
||||||
|
sets:from_list(FormattedWildcardPaths, [{version, 2}]),
|
||||||
#{
|
#{
|
||||||
expected => ExpectedWildcardPaths,
|
expected => ExpectedWildcardPaths,
|
||||||
wildcards => FormattedWildcardPaths
|
wildcards => FormattedWildcardPaths
|
||||||
|
@ -795,13 +1038,97 @@ paths_test() ->
|
||||||
#trie{trie = Tab2} = T2,
|
#trie{trie = Tab2} = T2,
|
||||||
Dump1 = sets:from_list(ets:tab2list(Tab1), [{version, 2}]),
|
Dump1 = sets:from_list(ets:tab2list(Tab1), [{version, 2}]),
|
||||||
Dump2 = sets:from_list(ets:tab2list(Tab2), [{version, 2}]),
|
Dump2 = sets:from_list(ets:tab2list(Tab2), [{version, 2}]),
|
||||||
?assertEqual(Dump1, Dump2),
|
?assertEqual(Dump1, Dump2).
|
||||||
|
|
||||||
ok.
|
|
||||||
|
|
||||||
format_path([{{_State, Edge}, _Next} | Rest]) ->
|
format_path([{{_State, Edge}, _Next} | Rest]) ->
|
||||||
[Edge | format_path(Rest)];
|
[Edge | format_path(Rest)];
|
||||||
format_path([]) ->
|
format_path([]) ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
|
compress_topic_test() ->
|
||||||
|
%% Structure without wildcards:
|
||||||
|
?assertEqual([], compress_topic(42, [], [])),
|
||||||
|
?assertEqual([], compress_topic(42, [<<"foo">>, <<"bar">>], [<<"foo">>, <<"bar">>])),
|
||||||
|
?assertEqual([], compress_topic(42, [<<"foo">>, ''], [<<"foo">>, ''])),
|
||||||
|
?assertEqual([], compress_topic(42, [<<"foo">>, ''], [<<"foo">>, '+'])),
|
||||||
|
?assertEqual([], compress_topic(42, [<<"foo">>, ''], ['+', '+'])),
|
||||||
|
?assertEqual([], compress_topic(42, [<<"foo">>, <<"bar">>, ''], ['#'])),
|
||||||
|
?assertEqual([], compress_topic(42, [<<"foo">>, <<"bar">>, ''], [<<"foo">>, <<"bar">>, '#'])),
|
||||||
|
?assertEqual([], compress_topic(42, [<<"foo">>, <<"bar">>, ''], ['+', '#'])),
|
||||||
|
?assertEqual(
|
||||||
|
[], compress_topic(42, [<<"foo">>, <<"bar">>, ''], [<<"foo">>, <<"bar">>, '', '#'])
|
||||||
|
),
|
||||||
|
%% With wildcards:
|
||||||
|
?assertEqual(
|
||||||
|
[<<"1">>], compress_topic(42, [<<"foo">>, '+', <<"bar">>], [<<"foo">>, <<"1">>, <<"bar">>])
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
[<<"1">>, <<"2">>],
|
||||||
|
compress_topic(
|
||||||
|
42,
|
||||||
|
[<<"foo">>, '+', <<"bar">>, '+', <<"baz">>],
|
||||||
|
[<<"foo">>, <<"1">>, <<"bar">>, <<"2">>, <<"baz">>]
|
||||||
|
)
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
['+', <<"2">>],
|
||||||
|
compress_topic(
|
||||||
|
42,
|
||||||
|
[<<"foo">>, '+', <<"bar">>, '+', <<"baz">>],
|
||||||
|
[<<"foo">>, '+', <<"bar">>, <<"2">>, <<"baz">>]
|
||||||
|
)
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
['+', '+'],
|
||||||
|
compress_topic(
|
||||||
|
42,
|
||||||
|
[<<"foo">>, '+', <<"bar">>, '+', <<"baz">>],
|
||||||
|
[<<"foo">>, '+', <<"bar">>, '+', <<"baz">>]
|
||||||
|
)
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
['+', '+'],
|
||||||
|
compress_topic(
|
||||||
|
42,
|
||||||
|
[<<"foo">>, '+', <<"bar">>, '+', <<"baz">>],
|
||||||
|
['#']
|
||||||
|
)
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
['+', '+'],
|
||||||
|
compress_topic(
|
||||||
|
42,
|
||||||
|
[<<"foo">>, '+', <<"bar">>, '+', <<"baz">>],
|
||||||
|
[<<"foo">>, '+', '+', '#']
|
||||||
|
)
|
||||||
|
),
|
||||||
|
%% Mismatch:
|
||||||
|
?assertException(_, {unrecoverable, _}, compress_topic(42, [<<"foo">>], [<<"bar">>])),
|
||||||
|
?assertException(_, {unrecoverable, _}, compress_topic(42, [], [<<"bar">>])),
|
||||||
|
?assertException(_, {unrecoverable, _}, compress_topic(42, [<<"foo">>], [])),
|
||||||
|
?assertException(_, {unrecoverable, _}, compress_topic(42, ['', ''], ['', '', ''])),
|
||||||
|
?assertException(_, {unrecoverable, _}, compress_topic(42, ['', ''], [<<"foo">>, '#'])),
|
||||||
|
?assertException(_, {unrecoverable, _}, compress_topic(42, ['', ''], ['+', '+', '+', '#'])),
|
||||||
|
?assertException(_, {unrecoverable, _}, compress_topic(42, ['+'], [<<"bar">>, '+'])),
|
||||||
|
?assertException(
|
||||||
|
_, {unrecoverable, _}, compress_topic(42, [<<"foo">>, '+'], [<<"bar">>, <<"baz">>])
|
||||||
|
).
|
||||||
|
|
||||||
|
decompress_topic_test() ->
|
||||||
|
%% Structure without wildcards:
|
||||||
|
?assertEqual([], decompress_topic([], [])),
|
||||||
|
?assertEqual(
|
||||||
|
[<<"foo">>, '', <<"bar">>],
|
||||||
|
decompress_topic([<<"foo">>, '', <<"bar">>], [])
|
||||||
|
),
|
||||||
|
%% With wildcards:
|
||||||
|
?assertEqual(
|
||||||
|
[<<"foo">>, '', <<"bar">>, <<"baz">>],
|
||||||
|
decompress_topic([<<"foo">>, '+', <<"bar">>, '+'], ['', <<"baz">>])
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
[<<"foo">>, '+', <<"bar">>, '+', ''],
|
||||||
|
decompress_topic([<<"foo">>, '+', <<"bar">>, '+', ''], ['+', '+'])
|
||||||
|
).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
Loading…
Reference in New Issue