feat(ds): Add persist callback to LTS trie

This commit is contained in:
ieQu1 2023-10-11 16:38:16 +02:00
parent c149e0e2df
commit f1ab7c8a7c
1 changed files with 29 additions and 9 deletions

View File

@ -17,10 +17,10 @@
-module(emqx_ds_lts).
%% API:
-export([trie_create/0, topic_key/3, match_topics/2, lookup_topic_key/2, dump_to_dot/2]).
-export([trie_create/1, trie_create/0, topic_key/3, match_topics/2, lookup_topic_key/2]).
%% Debug:
-export([trie_next/3, trie_insert/3]).
-export([trie_next/3, trie_insert/3, dump_to_dot/2]).
-export_type([static_key/0, trie/0]).
@ -46,11 +46,16 @@
-define(PREFIX, prefix).
-type state() :: static_key() | ?PREFIX.
-type msg_storage_key() :: {static_key(), _Varying :: [binary()]}.
-type varying() :: [binary()].
-type msg_storage_key() :: {static_key(), varying()}.
-type threshold_fun() :: fun((non_neg_integer()) -> non_neg_integer()).
-type persist_callback() :: fun((_Key, _Val) -> ok).
-record(trie, {
persist :: persist_callback(),
trie :: ets:tid(),
stats :: ets:tid()
}).
@ -67,16 +72,23 @@
%%================================================================================
%% @doc Create an empty trie
-spec trie_create() -> trie().
trie_create() ->
-spec trie_create(persist_callback()) -> trie().
trie_create(Persist) ->
Trie = ets:new(trie, [{keypos, #trans.key}, set]),
Stats = ets:new(stats, [{keypos, 1}, set]),
#trie{
persist = Persist,
trie = Trie,
stats = Stats
}.
%% @doc Create a topic key,
-spec trie_create() -> trie().
trie_create() ->
trie_create(fun(_, _) ->
ok
end).
%% @doc Lookup the topic key. Create a new one, if not found.
-spec topic_key(trie(), threshold_fun(), [binary()]) -> msg_storage_key().
topic_key(Trie, ThresholdFun, Tokens) ->
do_topic_key(Trie, ThresholdFun, 0, ?PREFIX, Tokens, []).
@ -161,8 +173,9 @@ trie_next(#trie{trie = Trie}, State, Token) ->
end.
-spec trie_insert(trie(), state(), edge()) -> {Updated, state()} when
Updated :: false | non_neg_integer().
trie_insert(#trie{trie = Trie, stats = Stats}, State, Token) ->
NChildren :: non_neg_integer(),
Updated :: false | NChildren.
trie_insert(#trie{trie = Trie, stats = Stats, persist = Persist}, State, Token) ->
Key = {State, Token},
NewState = get_id_for_key(State, Token),
Rec = #trans{
@ -171,6 +184,7 @@ trie_insert(#trie{trie = Trie, stats = Stats}, State, Token) ->
},
case ets:insert_new(Trie, Rec) of
true ->
ok = Persist(Key, NewState),
Inc =
case Token of
?EOT -> 0;
@ -206,7 +220,7 @@ get_id_for_key(_State, _Token) ->
crypto:strong_rand_bytes(8).
%% erlfmt-ignore
-spec do_match_topics(trie(), state(), non_neg_integer(), [binary() | '+' | '#']) ->
-spec do_match_topics(trie(), state(), [binary() | '+'], [binary() | '+' | '#']) ->
list().
do_match_topics(Trie, State, Varying, []) ->
case trie_next(Trie, State, ?EOT) of
@ -260,6 +274,8 @@ do_lookup_topic_key(Trie, State, [Tok | Rest], Varying) ->
end.
do_topic_key(Trie, _, _, State, [], Varying) ->
%% We reached the end of topic. Assert: Trie node that corresponds
%% to EOT cannot be a wildcard.
{_, false, Static} = trie_next_(Trie, State, ?EOT),
{Static, lists:reverse(Varying)};
do_topic_key(Trie, ThresholdFun, Depth, State, [Tok | Rest], Varying0) ->
@ -268,11 +284,15 @@ do_topic_key(Trie, ThresholdFun, Depth, State, [Tok | Rest], Varying0) ->
Varying =
case trie_next_(Trie, State, Tok) of
{NChildren, _, _DiscardState} when is_integer(NChildren), NChildren > Threshold ->
%% Number of children for the trie node reached the
%% threshold, we need to insert wildcard here:
{_, NextState} = trie_insert(Trie, State, ?PLUS),
[Tok | Varying0];
{_, false, NextState} ->
Varying0;
{_, true, NextState} ->
%% This topic level is marked as wildcard in the trie,
%% we need to add it to the varying part of the key:
[Tok | Varying0]
end,
do_topic_key(Trie, ThresholdFun, Depth + 1, NextState, Rest, Varying).