Compare commits
2 Commits
master
...
dev/ilia/0
Author | SHA1 | Date |
---|---|---|
![]() |
13166e6ca8 | |
![]() |
a92869c8cf |
|
@ -2,7 +2,7 @@
|
|||
{application, emqx_retainer, [
|
||||
{description, "EMQX Retainer"},
|
||||
% strict semver, bump manually!
|
||||
{vsn, "5.0.21"},
|
||||
{vsn, "5.0.22"},
|
||||
{modules, []},
|
||||
{registered, [emqx_retainer_sup]},
|
||||
{applications, [kernel, stdlib, emqx, emqx_ctl]},
|
||||
|
|
|
@ -255,6 +255,8 @@ deliver([], Context, Pid, Topic, Cursor, Limiter) ->
|
|||
deliver(Result, Context, Pid, Topic, Cursor, Limiter) ->
|
||||
case erlang:is_process_alive(Pid) of
|
||||
false ->
|
||||
ok = close_cursor(Cursor),
|
||||
?tp(debug, retainer_dispatcher_no_receiver, #{topic => Topic}),
|
||||
{ok, Limiter};
|
||||
_ ->
|
||||
DeliverNum = emqx_conf:get([retainer, flow_control, batch_deliver_number], undefined),
|
||||
|
@ -272,6 +274,11 @@ deliver(Result, Context, Pid, Topic, Cursor, Limiter) ->
|
|||
end
|
||||
end.
|
||||
|
||||
close_cursor({{qlc_cursor, _} = Cursor, _}) ->
|
||||
qlc:delete_cursor(Cursor);
|
||||
close_cursor(_Cursor) ->
|
||||
ok.
|
||||
|
||||
do_deliver([], _DeliverNum, _Pid, _Topic, Limiter) ->
|
||||
{ok, Limiter};
|
||||
do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) ->
|
||||
|
|
|
@ -22,7 +22,6 @@
|
|||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
-include_lib("stdlib/include/ms_transform.hrl").
|
||||
-include_lib("stdlib/include/qlc.hrl").
|
||||
|
||||
%% emqx_retainer callbacks
|
||||
-export([
|
||||
|
@ -60,7 +59,6 @@
|
|||
|
||||
-define(META_KEY, index_meta).
|
||||
|
||||
-define(CLEAR_BATCH_SIZE, 1000).
|
||||
-define(REINDEX_BATCH_SIZE, 1000).
|
||||
-define(REINDEX_DISPATCH_WAIT, 30000).
|
||||
-define(REINDEX_RPC_RETRY_INTERVAL, 1000).
|
||||
|
@ -167,15 +165,20 @@ clear_expired(_) ->
|
|||
|
||||
clear_expired() ->
|
||||
NowMs = erlang:system_time(millisecond),
|
||||
QH = qlc:q([
|
||||
RetainedMsg
|
||||
|| #retained_message{
|
||||
expiry_time = ExpiryTime
|
||||
} = RetainedMsg <- ets:table(?TAB_MESSAGE),
|
||||
(ExpiryTime =/= 0) and (ExpiryTime < NowMs)
|
||||
]),
|
||||
QC = qlc:cursor(QH),
|
||||
clear_batch(dirty_indices(write), QC).
|
||||
S0 = ets_stream(?TAB_MESSAGE),
|
||||
S1 = stream_filter(
|
||||
fun(#retained_message{expiry_time = ExpiryTime}) ->
|
||||
ExpiryTime =/= 0 andalso ExpiryTime < NowMs
|
||||
end,
|
||||
S0
|
||||
),
|
||||
DirtyWriteIndices = dirty_indices(write),
|
||||
stream_foreach(
|
||||
fun(RetainedMsg) ->
|
||||
delete_message_with_indices(RetainedMsg, DirtyWriteIndices)
|
||||
end,
|
||||
S1
|
||||
).
|
||||
|
||||
delete_message(_State, Topic) ->
|
||||
Tokens = topic_to_tokens(Topic),
|
||||
|
@ -183,13 +186,11 @@ delete_message(_State, Topic) ->
|
|||
false ->
|
||||
ok = delete_message_by_topic(Tokens, dirty_indices(write));
|
||||
true ->
|
||||
QH = search_table(Tokens, 0),
|
||||
qlc:fold(
|
||||
fun(RetainedMsg, _) ->
|
||||
ok = delete_message_with_indices(RetainedMsg, dirty_indices(write))
|
||||
end,
|
||||
undefined,
|
||||
QH
|
||||
S = search_stream(Tokens, 0),
|
||||
DirtyWriteIndices = dirty_indices(write),
|
||||
stream_foreach(
|
||||
fun(RetainedMsg) -> delete_message_with_indices(RetainedMsg, DirtyWriteIndices) end,
|
||||
S
|
||||
)
|
||||
end.
|
||||
|
||||
|
@ -199,54 +200,49 @@ read_message(_State, Topic) ->
|
|||
match_messages(_State, Topic, undefined) ->
|
||||
Tokens = topic_to_tokens(Topic),
|
||||
Now = erlang:system_time(millisecond),
|
||||
QH = msg_table(search_table(Tokens, Now)),
|
||||
S = msg_stream(search_stream(Tokens, Now)),
|
||||
case batch_read_number() of
|
||||
all_remaining ->
|
||||
{ok, qlc:eval(QH), undefined};
|
||||
{ok, emqx_utils_stream:consume(S), undefined};
|
||||
BatchNum when is_integer(BatchNum) ->
|
||||
Cursor = qlc:cursor(QH),
|
||||
match_messages(undefined, Topic, {Cursor, BatchNum})
|
||||
match_messages(undefined, Topic, {S, BatchNum})
|
||||
end;
|
||||
match_messages(_State, _Topic, {Cursor, BatchNum}) ->
|
||||
case qlc_next_answers(Cursor, BatchNum) of
|
||||
{closed, Rows} ->
|
||||
{ok, Rows, undefined};
|
||||
{more, Rows} ->
|
||||
{ok, Rows, {Cursor, BatchNum}}
|
||||
match_messages(_State, _Topic, {S0, BatchNum}) ->
|
||||
case emqx_utils_stream:consume(BatchNum, S0) of
|
||||
{Rows, S1} ->
|
||||
{ok, Rows, {S1, BatchNum}};
|
||||
Rows when is_list(Rows) ->
|
||||
{ok, Rows, undefined}
|
||||
end.
|
||||
|
||||
page_read(_State, Topic, Page, Limit) ->
|
||||
Now = erlang:system_time(millisecond),
|
||||
QH =
|
||||
S0 =
|
||||
case Topic of
|
||||
undefined ->
|
||||
msg_table(search_table(undefined, ['#'], Now));
|
||||
msg_stream(search_stream(undefined, ['#'], Now));
|
||||
_ ->
|
||||
Tokens = topic_to_tokens(Topic),
|
||||
msg_table(search_table(Tokens, Now))
|
||||
msg_stream(search_stream(Tokens, Now))
|
||||
end,
|
||||
OrderedQH = qlc:sort(QH, {order, fun compare_message/2}),
|
||||
Cursor = qlc:cursor(OrderedQH),
|
||||
%% This is very inefficient, but we are limited with inherited API
|
||||
S1 = emqx_utils_stream:list(
|
||||
lists:sort(
|
||||
fun compare_message/2,
|
||||
emqx_utils_stream:consume(S0)
|
||||
)
|
||||
),
|
||||
NSkip = (Page - 1) * Limit,
|
||||
SkipResult =
|
||||
case NSkip > 0 of
|
||||
true ->
|
||||
{Result, _} = qlc_next_answers(Cursor, NSkip),
|
||||
Result;
|
||||
false ->
|
||||
more
|
||||
end,
|
||||
case SkipResult of
|
||||
closed ->
|
||||
{ok, false, []};
|
||||
more ->
|
||||
case qlc_next_answers(Cursor, Limit) of
|
||||
{closed, Rows} ->
|
||||
{ok, false, Rows};
|
||||
{more, Rows} ->
|
||||
qlc:delete_cursor(Cursor),
|
||||
{ok, true, Rows}
|
||||
end
|
||||
case emqx_utils_stream:consume(NSkip, S1) of
|
||||
{_, S2} ->
|
||||
case emqx_utils_stream:consume(Limit, S2) of
|
||||
{Rows, _S3} ->
|
||||
{ok, true, Rows};
|
||||
Rows when is_list(Rows) ->
|
||||
{ok, false, Rows}
|
||||
end;
|
||||
Rows when is_list(Rows) ->
|
||||
{ok, false, []}
|
||||
end.
|
||||
|
||||
clean(_) ->
|
||||
|
@ -318,58 +314,63 @@ do_store_retained_index(Key, ExpiryTime) ->
|
|||
},
|
||||
mnesia:write(?TAB_INDEX, RetainedIndex, write).
|
||||
|
||||
msg_table(SearchTable) ->
|
||||
qlc:q([
|
||||
Msg
|
||||
|| #retained_message{
|
||||
msg = Msg
|
||||
} <- SearchTable
|
||||
]).
|
||||
msg_stream(SearchStream) ->
|
||||
emqx_utils_stream:map(
|
||||
fun(#retained_message{msg = Msg}) -> Msg end,
|
||||
SearchStream
|
||||
).
|
||||
|
||||
search_table(Tokens, Now) ->
|
||||
search_stream(Tokens, Now) ->
|
||||
Indices = dirty_indices(read),
|
||||
Index = emqx_retainer_index:select_index(Tokens, Indices),
|
||||
search_table(Index, Tokens, Now).
|
||||
search_stream(Index, Tokens, Now).
|
||||
|
||||
search_table(undefined, Tokens, Now) ->
|
||||
search_stream(undefined, Tokens, Now) ->
|
||||
Ms = make_message_match_spec(Tokens, Now),
|
||||
ets:table(?TAB_MESSAGE, [{traverse, {select, Ms}}]);
|
||||
search_table(Index, FilterTokens, Now) ->
|
||||
emqx_utils_stream:ets(
|
||||
fun
|
||||
(undefined) -> ets:select(?TAB_MESSAGE, Ms, 1);
|
||||
(Cont) -> ets:select(Cont)
|
||||
end
|
||||
);
|
||||
search_stream(Index, FilterTokens, Now) ->
|
||||
{Ms, IsExactMs} = make_index_match_spec(Index, FilterTokens, Now),
|
||||
Topics = [
|
||||
emqx_retainer_index:restore_topic(Key)
|
||||
|| #retained_index{key = Key} <- ets:select(?TAB_INDEX, Ms)
|
||||
],
|
||||
RetainedMsgQH = qlc:q([
|
||||
ets:lookup(?TAB_MESSAGE, TopicTokens)
|
||||
|| TopicTokens <- Topics, match(IsExactMs, TopicTokens, FilterTokens)
|
||||
]),
|
||||
qlc:q([
|
||||
RetainedMsg
|
||||
|| [
|
||||
#retained_message{
|
||||
expiry_time = ExpiryTime
|
||||
} = RetainedMsg
|
||||
] <- RetainedMsgQH,
|
||||
(ExpiryTime == 0) or (ExpiryTime > Now)
|
||||
]).
|
||||
IndexRecordStream = emqx_utils_stream:ets(
|
||||
fun
|
||||
(undefined) -> ets:select(?TAB_INDEX, Ms, 1);
|
||||
(Cont) -> ets:select(Cont)
|
||||
end
|
||||
),
|
||||
TopicStream = emqx_utils_stream:map(
|
||||
fun(#retained_index{key = Key}) -> emqx_retainer_index:restore_topic(Key) end,
|
||||
IndexRecordStream
|
||||
),
|
||||
MatchingTopicStream = stream_filter(
|
||||
fun(TopicTokens) -> match(IsExactMs, TopicTokens, FilterTokens) end,
|
||||
TopicStream
|
||||
),
|
||||
LookedUpRetainMsgStream = emqx_utils_stream:map(
|
||||
fun(TopicTokens) -> ets:lookup(?TAB_MESSAGE, TopicTokens) end,
|
||||
MatchingTopicStream
|
||||
),
|
||||
FoundAndValidLookedUpRetainMsgStream = stream_filter(
|
||||
fun
|
||||
([#retained_message{expiry_time = ExpiryTime}]) ->
|
||||
ExpiryTime =:= 0 orelse ExpiryTime >= Now;
|
||||
([]) ->
|
||||
false
|
||||
end,
|
||||
LookedUpRetainMsgStream
|
||||
),
|
||||
RetainMsgStream = emqx_utils_stream:map(
|
||||
fun([RetainedMsg]) -> RetainedMsg end,
|
||||
FoundAndValidLookedUpRetainMsgStream
|
||||
),
|
||||
RetainMsgStream.
|
||||
|
||||
match(_IsExactMs = true, _TopicTokens, _FilterTokens) -> true;
|
||||
match(_IsExactMs = false, TopicTokens, FilterTokens) -> emqx_topic:match(TopicTokens, FilterTokens).
|
||||
|
||||
clear_batch(Indices, QC) ->
|
||||
{Result, Rows} = qlc_next_answers(QC, ?CLEAR_BATCH_SIZE),
|
||||
lists:foreach(
|
||||
fun(RetainedMsg) ->
|
||||
delete_message_with_indices(RetainedMsg, Indices)
|
||||
end,
|
||||
Rows
|
||||
),
|
||||
case Result of
|
||||
closed -> ok;
|
||||
more -> clear_batch(Indices, QC)
|
||||
end.
|
||||
|
||||
delete_message_by_topic(TopicTokens, Indices) ->
|
||||
case mnesia:dirty_read(?TAB_MESSAGE, TopicTokens) of
|
||||
[] -> ok;
|
||||
|
@ -409,21 +410,6 @@ read_messages(Topic) ->
|
|||
end
|
||||
end.
|
||||
|
||||
qlc_next_answers(QC, N) ->
|
||||
case qlc:next_answers(QC, N) of
|
||||
NextAnswers when
|
||||
is_list(NextAnswers) andalso
|
||||
length(NextAnswers) < N
|
||||
->
|
||||
qlc:delete_cursor(QC),
|
||||
{closed, NextAnswers};
|
||||
NextAnswers when is_list(NextAnswers) ->
|
||||
{more, NextAnswers};
|
||||
{error, Module, Reason} ->
|
||||
qlc:delete_cursor(QC),
|
||||
error({qlc_error, Module, Reason})
|
||||
end.
|
||||
|
||||
make_message_match_spec(Tokens, NowMs) ->
|
||||
Cond = emqx_retainer_index:condition(Tokens),
|
||||
MsHd = #retained_message{topic = Cond, msg = '_', expiry_time = '$3'},
|
||||
|
@ -552,8 +538,11 @@ reindex(NewIndices, Force, StatusFun) when
|
|||
{atomic, ok} = mria:clear_table(?TAB_INDEX),
|
||||
|
||||
%% Fill index records in batches.
|
||||
QH = qlc:q([Topic || #retained_message{topic = Topic} <- ets:table(?TAB_MESSAGE)]),
|
||||
ok = reindex_batch(qlc:cursor(QH), 0, StatusFun),
|
||||
TopicStream = emqx_utils_stream:map(
|
||||
fun(#retained_message{topic = Topic}) -> Topic end,
|
||||
ets_stream(?TAB_MESSAGE)
|
||||
),
|
||||
ok = reindex_batch(TopicStream, 0, StatusFun),
|
||||
|
||||
%% Enable read indices and unlock reindexing.
|
||||
finalize_reindex();
|
||||
|
@ -631,12 +620,12 @@ reindex_topic(Indices, Topic) ->
|
|||
ok
|
||||
end.
|
||||
|
||||
reindex_batch(QC, Done, StatusFun) ->
|
||||
case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_reindex_batch/2, [QC, Done]) of
|
||||
{atomic, {more, NewDone}} ->
|
||||
reindex_batch(Stream0, Done, StatusFun) ->
|
||||
case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_reindex_batch/2, [Stream0, Done]) of
|
||||
{atomic, {more, NewDone, Stream1}} ->
|
||||
_ = StatusFun(NewDone),
|
||||
reindex_batch(QC, NewDone, StatusFun);
|
||||
{atomic, {closed, NewDone}} ->
|
||||
reindex_batch(Stream1, NewDone, StatusFun);
|
||||
{atomic, {done, NewDone}} ->
|
||||
_ = StatusFun(NewDone),
|
||||
ok;
|
||||
{aborted, Reason} ->
|
||||
|
@ -647,14 +636,26 @@ reindex_batch(QC, Done, StatusFun) ->
|
|||
{error, Reason}
|
||||
end.
|
||||
|
||||
do_reindex_batch(QC, Done) ->
|
||||
do_reindex_batch(Stream0, Done) ->
|
||||
Indices = db_indices(write),
|
||||
{Status, Topics} = qlc_next_answers(QC, ?REINDEX_BATCH_SIZE),
|
||||
Result = emqx_utils_stream:consume(?REINDEX_BATCH_SIZE, Stream0),
|
||||
Topics =
|
||||
case Result of
|
||||
{Rows, _Stream1} ->
|
||||
Rows;
|
||||
Rows when is_list(Rows) ->
|
||||
Rows
|
||||
end,
|
||||
ok = lists:foreach(
|
||||
fun(Topic) -> reindex_topic(Indices, Topic) end,
|
||||
Topics
|
||||
),
|
||||
{Status, Done + length(Topics)}.
|
||||
case Result of
|
||||
{_Rows, Stream1} ->
|
||||
{more, Done + length(Topics), Stream1};
|
||||
_Rows ->
|
||||
{done, Done + length(Topics)}
|
||||
end.
|
||||
|
||||
wait_dispatch_complete(Timeout) ->
|
||||
Nodes = mria:running_nodes(),
|
||||
|
@ -690,3 +691,37 @@ are_indices_updated(Indices) ->
|
|||
_ ->
|
||||
false
|
||||
end.
|
||||
|
||||
ets_stream(Tab) ->
|
||||
emqx_utils_stream:ets(
|
||||
fun
|
||||
(undefined) -> ets:match_object(Tab, '$1', 1);
|
||||
(Cont) -> ets:match_object(Cont)
|
||||
end
|
||||
).
|
||||
|
||||
stream_foreach(F, S) ->
|
||||
case emqx_utils_stream:next(S) of
|
||||
[X | Rest] ->
|
||||
F(X),
|
||||
stream_foreach(F, Rest);
|
||||
[] ->
|
||||
ok
|
||||
end.
|
||||
|
||||
%% TODO: move to emqx_utils_stream
|
||||
stream_filter(F, S) ->
|
||||
FilterNext = fun FilterNext(St) ->
|
||||
case emqx_utils_stream:next(St) of
|
||||
[X | Rest] ->
|
||||
case F(X) of
|
||||
true ->
|
||||
[X | stream_filter(F, Rest)];
|
||||
false ->
|
||||
FilterNext(Rest)
|
||||
end;
|
||||
[] ->
|
||||
[]
|
||||
end
|
||||
end,
|
||||
fun() -> FilterNext(S) end.
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
|
||||
-include("emqx_retainer.hrl").
|
||||
|
||||
-include_lib("emqx/include/asserts.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
@ -77,31 +78,25 @@ end_per_suite(Config) ->
|
|||
emqx_cth_suite:stop(?config(suite_apps, Config)).
|
||||
|
||||
init_per_group(mnesia_without_indices, Config) ->
|
||||
mnesia:clear_table(?TAB_INDEX_META),
|
||||
mnesia:clear_table(?TAB_INDEX),
|
||||
mnesia:clear_table(?TAB_MESSAGE),
|
||||
Config;
|
||||
[{index, false} | Config];
|
||||
init_per_group(mnesia_reindex, Config) ->
|
||||
emqx_retainer_mnesia:populate_index_meta(),
|
||||
mnesia:clear_table(?TAB_INDEX),
|
||||
mnesia:clear_table(?TAB_MESSAGE),
|
||||
Config;
|
||||
init_per_group(_, Config) ->
|
||||
emqx_retainer_mnesia:populate_index_meta(),
|
||||
mnesia:clear_table(?TAB_INDEX),
|
||||
mnesia:clear_table(?TAB_MESSAGE),
|
||||
Config.
|
||||
|
||||
end_per_group(_Group, Config) ->
|
||||
emqx_retainer_mnesia:populate_index_meta(),
|
||||
Config.
|
||||
|
||||
init_per_testcase(t_get_basic_usage_info, Config) ->
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
case ?config(index, Config) of
|
||||
false ->
|
||||
mnesia:clear_table(?TAB_INDEX_META);
|
||||
_ ->
|
||||
emqx_retainer_mnesia:populate_index_meta()
|
||||
end,
|
||||
mnesia:clear_table(?TAB_INDEX),
|
||||
mnesia:clear_table(?TAB_MESSAGE),
|
||||
emqx_retainer_mnesia:populate_index_meta(),
|
||||
Config;
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
Config.
|
||||
|
||||
app_spec() ->
|
||||
|
@ -309,7 +304,7 @@ t_message_expiry(Config) ->
|
|||
|
||||
ok = emqtt:disconnect(C1)
|
||||
end,
|
||||
with_conf(ConfMod, Case).
|
||||
with_conf(Config, ConfMod, Case).
|
||||
|
||||
t_message_expiry_2(Config) ->
|
||||
ConfMod = fun(Conf) ->
|
||||
|
@ -331,9 +326,9 @@ t_message_expiry_2(Config) ->
|
|||
|
||||
ok = emqtt:disconnect(C1)
|
||||
end,
|
||||
with_conf(ConfMod, Case).
|
||||
with_conf(Config, ConfMod, Case).
|
||||
|
||||
t_table_full(_) ->
|
||||
t_table_full(Config) ->
|
||||
ConfMod = fun(Conf) ->
|
||||
Conf#{<<"backend">> => #{<<"max_retained_messages">> => <<"1">>}}
|
||||
end,
|
||||
|
@ -350,7 +345,7 @@ t_table_full(_) ->
|
|||
|
||||
ok = emqtt:disconnect(C1)
|
||||
end,
|
||||
with_conf(ConfMod, Case).
|
||||
with_conf(Config, ConfMod, Case).
|
||||
|
||||
t_clean(Config) ->
|
||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||
|
@ -445,7 +440,7 @@ t_flow_control(_) ->
|
|||
Diff = End - Begin,
|
||||
|
||||
?assert(
|
||||
Diff > timer:seconds(2.5) andalso Diff < timer:seconds(3.9),
|
||||
Diff > timer:seconds(2.5) andalso Diff < timer:seconds(4),
|
||||
lists:flatten(io_lib:format("Diff is :~p~n", [Diff]))
|
||||
),
|
||||
|
||||
|
@ -461,7 +456,7 @@ t_flow_control(_) ->
|
|||
}),
|
||||
ok.
|
||||
|
||||
t_clear_expired(_) ->
|
||||
t_clear_expired(Config) ->
|
||||
ConfMod = fun(Conf) ->
|
||||
Conf#{
|
||||
<<"msg_clear_interval">> := <<"1s">>,
|
||||
|
@ -497,9 +492,9 @@ t_clear_expired(_) ->
|
|||
|
||||
ok = emqtt:disconnect(C1)
|
||||
end,
|
||||
with_conf(ConfMod, Case).
|
||||
with_conf(Config, ConfMod, Case).
|
||||
|
||||
t_max_payload_size(_) ->
|
||||
t_max_payload_size(Config) ->
|
||||
ConfMod = fun(Conf) -> Conf#{<<"max_payload_size">> := <<"1kb">>} end,
|
||||
Case = fun() ->
|
||||
emqx_retainer:clean(),
|
||||
|
@ -528,7 +523,7 @@ t_max_payload_size(_) ->
|
|||
|
||||
ok = emqtt:disconnect(C1)
|
||||
end,
|
||||
with_conf(ConfMod, Case).
|
||||
with_conf(Config, ConfMod, Case).
|
||||
|
||||
t_page_read(_) ->
|
||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||
|
@ -835,10 +830,11 @@ receive_messages(Count, Msgs) ->
|
|||
Msgs
|
||||
end.
|
||||
|
||||
with_conf(ConfMod, Case) ->
|
||||
with_conf(CTConfig, ConfMod, Case) ->
|
||||
Conf = emqx:get_raw_config([retainer]),
|
||||
NewConf = ConfMod(Conf),
|
||||
emqx_retainer:update_config(NewConf),
|
||||
?config(index, CTConfig) =:= false andalso mria:clear_table(?TAB_INDEX_META),
|
||||
try
|
||||
Case(),
|
||||
{ok, _} = emqx_retainer:update_config(Conf)
|
||||
|
|
Loading…
Reference in New Issue