From f1ab7c8a7c83d1109d287ba5b8ab95ce64376e9b Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 11 Oct 2023 16:38:16 +0200 Subject: [PATCH] feat(ds): Add persist callback to LTS trie --- apps/emqx_durable_storage/src/emqx_ds_lts.erl | 38 ++++++++++++++----- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_lts.erl index fcc9f2b36..5422979b7 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_lts.erl @@ -17,10 +17,10 @@ -module(emqx_ds_lts). %% API: --export([trie_create/0, topic_key/3, match_topics/2, lookup_topic_key/2, dump_to_dot/2]). +-export([trie_create/1, trie_create/0, topic_key/3, match_topics/2, lookup_topic_key/2]). %% Debug: --export([trie_next/3, trie_insert/3]). +-export([trie_next/3, trie_insert/3, dump_to_dot/2]). -export_type([static_key/0, trie/0]). @@ -46,11 +46,16 @@ -define(PREFIX, prefix). -type state() :: static_key() | ?PREFIX. --type msg_storage_key() :: {static_key(), _Varying :: [binary()]}. +-type varying() :: [binary()]. + +-type msg_storage_key() :: {static_key(), varying()}. -type threshold_fun() :: fun((non_neg_integer()) -> non_neg_integer()). +-type persist_callback() :: fun((_Key, _Val) -> ok). + -record(trie, { + persist :: persist_callback(), trie :: ets:tid(), stats :: ets:tid() }). @@ -67,16 +72,23 @@ %%================================================================================ %% @doc Create an empty trie --spec trie_create() -> trie(). -trie_create() -> +-spec trie_create(persist_callback()) -> trie(). +trie_create(Persist) -> Trie = ets:new(trie, [{keypos, #trans.key}, set]), Stats = ets:new(stats, [{keypos, 1}, set]), #trie{ + persist = Persist, trie = Trie, stats = Stats }. -%% @doc Create a topic key, +-spec trie_create() -> trie(). +trie_create() -> + trie_create(fun(_, _) -> + ok + end). + +%% @doc Lookup the topic key. Create a new one, if not found. -spec topic_key(trie(), threshold_fun(), [binary()]) -> msg_storage_key(). topic_key(Trie, ThresholdFun, Tokens) -> do_topic_key(Trie, ThresholdFun, 0, ?PREFIX, Tokens, []). @@ -161,8 +173,9 @@ trie_next(#trie{trie = Trie}, State, Token) -> end. -spec trie_insert(trie(), state(), edge()) -> {Updated, state()} when - Updated :: false | non_neg_integer(). -trie_insert(#trie{trie = Trie, stats = Stats}, State, Token) -> + NChildren :: non_neg_integer(), + Updated :: false | NChildren. +trie_insert(#trie{trie = Trie, stats = Stats, persist = Persist}, State, Token) -> Key = {State, Token}, NewState = get_id_for_key(State, Token), Rec = #trans{ @@ -171,6 +184,7 @@ trie_insert(#trie{trie = Trie, stats = Stats}, State, Token) -> }, case ets:insert_new(Trie, Rec) of true -> + ok = Persist(Key, NewState), Inc = case Token of ?EOT -> 0; @@ -206,7 +220,7 @@ get_id_for_key(_State, _Token) -> crypto:strong_rand_bytes(8). %% erlfmt-ignore --spec do_match_topics(trie(), state(), non_neg_integer(), [binary() | '+' | '#']) -> +-spec do_match_topics(trie(), state(), [binary() | '+'], [binary() | '+' | '#']) -> list(). do_match_topics(Trie, State, Varying, []) -> case trie_next(Trie, State, ?EOT) of @@ -260,6 +274,8 @@ do_lookup_topic_key(Trie, State, [Tok | Rest], Varying) -> end. do_topic_key(Trie, _, _, State, [], Varying) -> + %% We reached the end of topic. Assert: Trie node that corresponds + %% to EOT cannot be a wildcard. {_, false, Static} = trie_next_(Trie, State, ?EOT), {Static, lists:reverse(Varying)}; do_topic_key(Trie, ThresholdFun, Depth, State, [Tok | Rest], Varying0) -> @@ -268,11 +284,15 @@ do_topic_key(Trie, ThresholdFun, Depth, State, [Tok | Rest], Varying0) -> Varying = case trie_next_(Trie, State, Tok) of {NChildren, _, _DiscardState} when is_integer(NChildren), NChildren > Threshold -> + %% Number of children for the trie node reached the + %% threshold, we need to insert wildcard here: {_, NextState} = trie_insert(Trie, State, ?PLUS), [Tok | Varying0]; {_, false, NextState} -> Varying0; {_, true, NextState} -> + %% This topic level is marked as wildcard in the trie, + %% we need to add it to the varying part of the key: [Tok | Varying0] end, do_topic_key(Trie, ThresholdFun, Depth + 1, NextState, Rest, Varying).