Merge pull request #11923 from thalesmg/sync-m-to-r54-20231109
chore: sync `master` to `release-54`
This commit is contained in:
commit
b4e45f0189
|
@ -11,7 +11,7 @@ on:
|
|||
- 'e*'
|
||||
branches:
|
||||
- 'master'
|
||||
- 'release-5?'
|
||||
- 'release-5[0-9]'
|
||||
- 'ci/**'
|
||||
|
||||
env:
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
{emqx_conf,1}.
|
||||
{emqx_conf,2}.
|
||||
{emqx_conf,3}.
|
||||
{emqx_connector, 1}.
|
||||
{emqx_connector,1}.
|
||||
{emqx_dashboard,1}.
|
||||
{emqx_delayed,1}.
|
||||
{emqx_delayed,2}.
|
||||
|
|
|
@ -33,7 +33,7 @@
|
|||
desc/1,
|
||||
types/0,
|
||||
short_paths/0,
|
||||
short_paths_fields/1
|
||||
short_paths_fields/0
|
||||
]).
|
||||
|
||||
-define(KILOBYTE, 1024).
|
||||
|
@ -103,11 +103,11 @@ roots() ->
|
|||
].
|
||||
|
||||
fields(limiter) ->
|
||||
short_paths_fields(?MODULE, ?IMPORTANCE_HIDDEN) ++
|
||||
short_paths_fields(?IMPORTANCE_HIDDEN) ++
|
||||
[
|
||||
{Type,
|
||||
?HOCON(?R_REF(node_opts), #{
|
||||
desc => ?DESC(Type),
|
||||
desc => deprecated_desc(Type),
|
||||
importance => ?IMPORTANCE_HIDDEN,
|
||||
required => {false, recursively},
|
||||
aliases => alias_of_type(Type)
|
||||
|
@ -120,7 +120,7 @@ fields(limiter) ->
|
|||
?HOCON(
|
||||
?R_REF(client_fields),
|
||||
#{
|
||||
desc => ?DESC(client),
|
||||
desc => deprecated_desc(client),
|
||||
importance => ?IMPORTANCE_HIDDEN,
|
||||
required => {false, recursively},
|
||||
deprecated => {since, "5.0.25"}
|
||||
|
@ -129,10 +129,10 @@ fields(limiter) ->
|
|||
];
|
||||
fields(node_opts) ->
|
||||
[
|
||||
{rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => <<"infinity">>})},
|
||||
{rate, ?HOCON(rate(), #{desc => deprecated_desc(rate), default => <<"infinity">>})},
|
||||
{burst,
|
||||
?HOCON(burst_rate(), #{
|
||||
desc => ?DESC(burst),
|
||||
desc => deprecated_desc(burst),
|
||||
default => <<"0">>
|
||||
})}
|
||||
];
|
||||
|
@ -142,11 +142,12 @@ fields(bucket_opts) ->
|
|||
fields_of_bucket(<<"infinity">>);
|
||||
fields(client_opts) ->
|
||||
[
|
||||
{rate, ?HOCON(rate(), #{default => <<"infinity">>, desc => ?DESC(rate)})},
|
||||
{rate, ?HOCON(rate(), #{default => <<"infinity">>, desc => deprecated_desc(rate)})},
|
||||
{initial,
|
||||
?HOCON(initial(), #{
|
||||
default => <<"0">>,
|
||||
desc => ?DESC(initial),
|
||||
|
||||
desc => deprecated_desc(initial),
|
||||
importance => ?IMPORTANCE_HIDDEN
|
||||
})},
|
||||
%% low_watermark add for emqx_channel and emqx_session
|
||||
|
@ -157,14 +158,14 @@ fields(client_opts) ->
|
|||
?HOCON(
|
||||
initial(),
|
||||
#{
|
||||
desc => ?DESC(low_watermark),
|
||||
desc => deprecated_desc(low_watermark),
|
||||
default => <<"0">>,
|
||||
importance => ?IMPORTANCE_HIDDEN
|
||||
}
|
||||
)},
|
||||
{burst,
|
||||
?HOCON(burst(), #{
|
||||
desc => ?DESC(burst),
|
||||
desc => deprecated_desc(burst),
|
||||
default => <<"0">>,
|
||||
importance => ?IMPORTANCE_HIDDEN,
|
||||
aliases => [capacity]
|
||||
|
@ -173,7 +174,7 @@ fields(client_opts) ->
|
|||
?HOCON(
|
||||
boolean(),
|
||||
#{
|
||||
desc => ?DESC(divisible),
|
||||
desc => deprecated_desc(divisible),
|
||||
default => true,
|
||||
importance => ?IMPORTANCE_HIDDEN
|
||||
}
|
||||
|
@ -182,7 +183,7 @@ fields(client_opts) ->
|
|||
?HOCON(
|
||||
emqx_schema:timeout_duration(),
|
||||
#{
|
||||
desc => ?DESC(max_retry_time),
|
||||
desc => deprecated_desc(max_retry_time),
|
||||
default => <<"1h">>,
|
||||
importance => ?IMPORTANCE_HIDDEN
|
||||
}
|
||||
|
@ -191,7 +192,7 @@ fields(client_opts) ->
|
|||
?HOCON(
|
||||
failure_strategy(),
|
||||
#{
|
||||
desc => ?DESC(failure_strategy),
|
||||
desc => deprecated_desc(failure_strategy),
|
||||
default => force,
|
||||
importance => ?IMPORTANCE_HIDDEN
|
||||
}
|
||||
|
@ -204,14 +205,14 @@ fields(listener_client_fields) ->
|
|||
fields(Type) ->
|
||||
simple_bucket_field(Type).
|
||||
|
||||
short_paths_fields(DesModule) ->
|
||||
short_paths_fields(DesModule, ?DEFAULT_IMPORTANCE).
|
||||
short_paths_fields() ->
|
||||
short_paths_fields(?DEFAULT_IMPORTANCE).
|
||||
|
||||
short_paths_fields(DesModule, Importance) ->
|
||||
short_paths_fields(Importance) ->
|
||||
[
|
||||
{Name,
|
||||
?HOCON(rate(), #{
|
||||
desc => ?DESC(DesModule, Name),
|
||||
desc => ?DESC(Name),
|
||||
required => false,
|
||||
importance => Importance,
|
||||
example => Example
|
||||
|
@ -381,7 +382,7 @@ simple_bucket_field(Type) when is_atom(Type) ->
|
|||
?HOCON(
|
||||
?R_REF(?MODULE, client_opts),
|
||||
#{
|
||||
desc => ?DESC(client),
|
||||
desc => deprecated_desc(client),
|
||||
required => {false, recursively},
|
||||
importance => importance_of_type(Type),
|
||||
aliases => alias_of_type(Type)
|
||||
|
@ -394,7 +395,7 @@ composite_bucket_fields(Types, ClientRef) ->
|
|||
[
|
||||
{Type,
|
||||
?HOCON(?R_REF(?MODULE, bucket_opts), #{
|
||||
desc => ?DESC(?MODULE, Type),
|
||||
desc => deprecated_desc(Type),
|
||||
required => {false, recursively},
|
||||
importance => importance_of_type(Type),
|
||||
aliases => alias_of_type(Type)
|
||||
|
@ -406,7 +407,7 @@ composite_bucket_fields(Types, ClientRef) ->
|
|||
?HOCON(
|
||||
?R_REF(?MODULE, ClientRef),
|
||||
#{
|
||||
desc => ?DESC(client),
|
||||
desc => deprecated_desc(client),
|
||||
required => {false, recursively}
|
||||
}
|
||||
)}
|
||||
|
@ -414,10 +415,10 @@ composite_bucket_fields(Types, ClientRef) ->
|
|||
|
||||
fields_of_bucket(Default) ->
|
||||
[
|
||||
{rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => Default})},
|
||||
{rate, ?HOCON(rate(), #{desc => deprecated_desc(rate), default => Default})},
|
||||
{burst,
|
||||
?HOCON(burst(), #{
|
||||
desc => ?DESC(burst),
|
||||
desc => deprecated_desc(burst),
|
||||
default => <<"0">>,
|
||||
importance => ?IMPORTANCE_HIDDEN,
|
||||
aliases => [capacity]
|
||||
|
@ -425,7 +426,7 @@ fields_of_bucket(Default) ->
|
|||
{initial,
|
||||
?HOCON(initial(), #{
|
||||
default => <<"0">>,
|
||||
desc => ?DESC(initial),
|
||||
desc => deprecated_desc(initial),
|
||||
importance => ?IMPORTANCE_HIDDEN
|
||||
})}
|
||||
].
|
||||
|
@ -434,7 +435,7 @@ client_fields(Types) ->
|
|||
[
|
||||
{Type,
|
||||
?HOCON(?R_REF(client_opts), #{
|
||||
desc => ?DESC(Type),
|
||||
desc => deprecated_desc(Type),
|
||||
required => false,
|
||||
importance => importance_of_type(Type),
|
||||
aliases => alias_of_type(Type)
|
||||
|
@ -457,3 +458,6 @@ alias_of_type(bytes) ->
|
|||
[bytes_in];
|
||||
alias_of_type(_) ->
|
||||
[].
|
||||
|
||||
deprecated_desc(_Field) ->
|
||||
<<"Deprecated since v5.0.25">>.
|
||||
|
|
|
@ -28,6 +28,10 @@
|
|||
|
||||
-include("emqx_persistent_session_ds.hrl").
|
||||
|
||||
-ifdef(TEST).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-endif.
|
||||
|
||||
%%================================================================================
|
||||
%% Type declarations
|
||||
%%================================================================================
|
||||
|
@ -132,7 +136,7 @@ fetch(_SessionId, Inflight, _Streams = [], _N, Acc) ->
|
|||
{lists:reverse(Acc), Inflight};
|
||||
fetch(_SessionId, Inflight, _Streams, 0, Acc) ->
|
||||
{lists:reverse(Acc), Inflight};
|
||||
fetch(SessionId, Inflight0, [#ds_stream{stream = Stream} | Streams], N, Publishes0) ->
|
||||
fetch(SessionId, Inflight0, [Stream | Streams], N, Publishes0) ->
|
||||
#inflight{next_seqno = FirstSeqNo, offset_ranges = Ranges0} = Inflight0,
|
||||
ItBegin = get_last_iterator(SessionId, Stream, Ranges0),
|
||||
{ok, ItEnd, Messages} = emqx_ds:next(ItBegin, N),
|
||||
|
@ -162,6 +166,7 @@ fetch(SessionId, Inflight0, [#ds_stream{stream = Stream} | Streams], N, Publishe
|
|||
fetch(SessionId, Inflight1, Streams, N, Publishes)
|
||||
end.
|
||||
|
||||
-spec update_iterator(emqx_persistent_session_ds:id(), emqx_ds:stream(), emqx_ds:iterator()) -> ok.
|
||||
update_iterator(SessionId, Stream, Iterator) ->
|
||||
mria:dirty_write(?SESSION_ITER_TAB, #ds_iter{id = {SessionId, Stream}, iter = Iterator}).
|
||||
|
||||
|
@ -173,13 +178,20 @@ get_last_iterator(SessionId, Stream, Ranges) ->
|
|||
Next
|
||||
end.
|
||||
|
||||
-spec get_iterator(emqx_persistent_session_ds:id(), emqx_ds:stream()) -> emqx_ds:iterator().
|
||||
get_iterator(SessionId, Stream) ->
|
||||
Id = {SessionId, Stream},
|
||||
[#ds_iter{iter = It}] = mnesia:dirty_read(?SESSION_ITER_TAB, Id),
|
||||
It.
|
||||
|
||||
-spec get_streams(emqx_persistent_session_ds:id()) -> [emqx_ds:stream()].
|
||||
get_streams(SessionId) ->
|
||||
mnesia:dirty_read(?SESSION_STREAM_TAB, SessionId).
|
||||
lists:map(
|
||||
fun(#ds_stream{stream = Stream}) ->
|
||||
Stream
|
||||
end,
|
||||
mnesia:dirty_read(?SESSION_STREAM_TAB, SessionId)
|
||||
).
|
||||
|
||||
%% Packet ID as defined by MQTT protocol is a 16-bit integer in range
|
||||
%% 1..FFFF. This function translates internal session sequence number
|
||||
|
|
|
@ -97,8 +97,6 @@
|
|||
props := map()
|
||||
}.
|
||||
|
||||
%% -type session() :: #session{}.
|
||||
|
||||
-type timestamp() :: emqx_utils_calendar:epoch_millisecond().
|
||||
-type topic() :: emqx_types:topic().
|
||||
-type clientinfo() :: emqx_types:clientinfo().
|
||||
|
@ -344,7 +342,15 @@ deliver(_ClientInfo, _Delivers, Session) ->
|
|||
handle_timeout(_ClientInfo, pull, Session = #{id := Id, inflight := Inflight0}) ->
|
||||
WindowSize = 100,
|
||||
{Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(Id, Inflight0, WindowSize),
|
||||
ensure_timer(pull),
|
||||
%% TODO: make these values configurable:
|
||||
Timeout =
|
||||
case Publishes of
|
||||
[] ->
|
||||
100;
|
||||
[_ | _] ->
|
||||
0
|
||||
end,
|
||||
ensure_timer(pull, Timeout),
|
||||
{ok, Publishes, Session#{inflight => Inflight}};
|
||||
handle_timeout(_ClientInfo, get_streams, Session = #{id := Id}) ->
|
||||
renew_streams(Id),
|
||||
|
@ -714,5 +720,9 @@ ensure_timers() ->
|
|||
|
||||
-spec ensure_timer(pull | get_streams) -> ok.
|
||||
ensure_timer(Type) ->
|
||||
_ = emqx_utils:start_timer(100, {emqx_session, Type}),
|
||||
ensure_timer(Type, 100).
|
||||
|
||||
-spec ensure_timer(pull | get_streams, non_neg_integer()) -> ok.
|
||||
ensure_timer(Type, Timeout) ->
|
||||
_ = emqx_utils:start_timer(Timeout, {emqx_session, Type}),
|
||||
ok.
|
||||
|
|
|
@ -36,6 +36,7 @@
|
|||
stream :: emqx_ds:stream(),
|
||||
rank :: emqx_ds:stream_rank()
|
||||
}).
|
||||
-type ds_stream() :: #ds_stream{}.
|
||||
|
||||
-record(ds_iter, {
|
||||
id :: {emqx_persistent_session_ds:id(), emqx_ds:stream()},
|
||||
|
|
|
@ -1849,7 +1849,7 @@ base_listener(Bind) ->
|
|||
default => true
|
||||
}
|
||||
)}
|
||||
] ++ emqx_limiter_schema:short_paths_fields(?MODULE).
|
||||
] ++ emqx_limiter_schema:short_paths_fields().
|
||||
|
||||
desc("persistent_session_store") ->
|
||||
"Settings for message persistence.";
|
||||
|
|
|
@ -306,7 +306,7 @@ test_stepdown_session(Action, Reason) ->
|
|||
ok = emqx_cm:register_channel(ClientId, Pid1, ConnInfo),
|
||||
ok = emqx_cm:register_channel(ClientId, Pid1, ConnInfo),
|
||||
ok = emqx_cm:register_channel(ClientId, Pid2, ConnInfo),
|
||||
?assertEqual([Pid1, Pid2], lists:sort(emqx_cm:lookup_channels(ClientId))),
|
||||
?assertEqual(lists:sort([Pid1, Pid2]), lists:sort(emqx_cm:lookup_channels(ClientId))),
|
||||
case Reason of
|
||||
noproc ->
|
||||
exit(Pid1, kill),
|
||||
|
|
|
@ -43,30 +43,41 @@
|
|||
%% Type declarations
|
||||
%%================================================================================
|
||||
|
||||
%% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending
|
||||
%% records over the wire.
|
||||
|
||||
%% tags:
|
||||
-define(stream, stream).
|
||||
-define(it, it).
|
||||
|
||||
%% keys:
|
||||
-define(tag, 1).
|
||||
-define(shard, 2).
|
||||
-define(enc, 3).
|
||||
|
||||
-type db() :: emqx_ds:db().
|
||||
|
||||
-type shard_id() :: {db(), atom()}.
|
||||
|
||||
%% This record enapsulates the stream entity from the replication
|
||||
%% level.
|
||||
%% This enapsulates the stream entity from the replication level.
|
||||
%%
|
||||
%% TODO: currently the stream is hardwired to only support the
|
||||
%% internal rocksdb storage. In the future we want to add another
|
||||
%% implementations for emqx_ds, so this type has to take this into
|
||||
%% account.
|
||||
-record(stream, {
|
||||
shard :: emqx_ds_replication_layer:shard_id(),
|
||||
enc :: emqx_ds_storage_layer:stream()
|
||||
}).
|
||||
-opaque stream() ::
|
||||
#{
|
||||
?tag := ?stream,
|
||||
?shard := emqx_ds_replication_layer:shard_id(),
|
||||
?enc := emqx_ds_storage_layer:stream()
|
||||
}.
|
||||
|
||||
-opaque stream() :: #stream{}.
|
||||
|
||||
-record(iterator, {
|
||||
shard :: emqx_ds_replication_layer:shard_id(),
|
||||
enc :: enqx_ds_storage_layer:iterator()
|
||||
}).
|
||||
|
||||
-opaque iterator() :: #iterator{}.
|
||||
-opaque iterator() ::
|
||||
#{
|
||||
?tag := ?it,
|
||||
?shard := emqx_ds_replication_layer:shard_id(),
|
||||
?enc := emqx_ds_storage_layer:iterator()
|
||||
}.
|
||||
|
||||
-type message_id() :: emqx_ds_storage_layer:message_id().
|
||||
|
||||
|
@ -124,9 +135,10 @@ get_streams(DB, TopicFilter, StartTime) ->
|
|||
fun({RankY, Stream}) ->
|
||||
RankX = Shard,
|
||||
Rank = {RankX, RankY},
|
||||
{Rank, #stream{
|
||||
shard = Shard,
|
||||
enc = Stream
|
||||
{Rank, #{
|
||||
?tag => ?stream,
|
||||
?shard => Shard,
|
||||
?enc => Stream
|
||||
}}
|
||||
end,
|
||||
Streams
|
||||
|
@ -138,18 +150,18 @@ get_streams(DB, TopicFilter, StartTime) ->
|
|||
-spec make_iterator(stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
|
||||
emqx_ds:make_iterator_result(iterator()).
|
||||
make_iterator(Stream, TopicFilter, StartTime) ->
|
||||
#stream{shard = Shard, enc = StorageStream} = Stream,
|
||||
#{?tag := ?stream, ?shard := Shard, ?enc := StorageStream} = Stream,
|
||||
Node = node_of_shard(Shard),
|
||||
case emqx_ds_proto_v1:make_iterator(Node, Shard, StorageStream, TopicFilter, StartTime) of
|
||||
{ok, Iter} ->
|
||||
{ok, #iterator{shard = Shard, enc = Iter}};
|
||||
{ok, #{?tag => ?it, ?shard => Shard, ?enc => Iter}};
|
||||
Err = {error, _} ->
|
||||
Err
|
||||
end.
|
||||
|
||||
-spec next(iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
|
||||
next(Iter0, BatchSize) ->
|
||||
#iterator{shard = Shard, enc = StorageIter0} = Iter0,
|
||||
#{?tag := ?it, ?shard := Shard, ?enc := StorageIter0} = Iter0,
|
||||
Node = node_of_shard(Shard),
|
||||
%% TODO: iterator can contain information that is useful for
|
||||
%% reconstructing messages sent over the network. For example,
|
||||
|
@ -161,7 +173,7 @@ next(Iter0, BatchSize) ->
|
|||
%% replication layer. Or, perhaps, in the logic layer.
|
||||
case emqx_ds_proto_v1:next(Node, Shard, StorageIter0, BatchSize) of
|
||||
{ok, StorageIter, Batch} ->
|
||||
Iter = #iterator{shard = Shard, enc = StorageIter},
|
||||
Iter = Iter0#{?enc := StorageIter},
|
||||
{ok, Iter, Batch};
|
||||
Other ->
|
||||
Other
|
||||
|
@ -184,14 +196,14 @@ do_drop_shard_v1(Shard) ->
|
|||
emqx_ds_storage_layer:drop_shard(Shard).
|
||||
|
||||
-spec do_get_streams_v1(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) ->
|
||||
[{integer(), _Stream}].
|
||||
[{integer(), emqx_ds_storage_layer:stream()}].
|
||||
do_get_streams_v1(Shard, TopicFilter, StartTime) ->
|
||||
emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime).
|
||||
|
||||
-spec do_make_iterator_v1(
|
||||
shard_id(), emqx_ds_storage_layer:stream(), emqx_ds:topic_filter(), emqx_ds:time()
|
||||
) ->
|
||||
{ok, iterator()} | {error, _}.
|
||||
{ok, emqx_ds_storage_layer:iterator()} | {error, _}.
|
||||
do_make_iterator_v1(Shard, Stream, TopicFilter, StartTime) ->
|
||||
emqx_ds_storage_layer:make_iterator(Shard, Stream, TopicFilter, StartTime).
|
||||
|
||||
|
|
|
@ -38,6 +38,20 @@
|
|||
%% Type declarations
|
||||
%%================================================================================
|
||||
|
||||
%% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending
|
||||
%% records over the wire.
|
||||
|
||||
%% tags:
|
||||
-define(stream, stream).
|
||||
-define(it, it).
|
||||
|
||||
%% keys:
|
||||
-define(tag, 1).
|
||||
-define(topic_filter, 2).
|
||||
-define(start_time, 3).
|
||||
-define(storage_key, 4).
|
||||
-define(last_seen_key, 5).
|
||||
|
||||
-type options() ::
|
||||
#{
|
||||
bits_per_wildcard_level => pos_integer(),
|
||||
|
@ -65,18 +79,20 @@
|
|||
|
||||
-type s() :: #s{}.
|
||||
|
||||
-record(stream, {
|
||||
storage_key :: emqx_ds_lts:msg_storage_key()
|
||||
}).
|
||||
-type stream() ::
|
||||
#{
|
||||
?tag := ?stream,
|
||||
?storage_key := emqx_ds_lts:msg_storage_key()
|
||||
}.
|
||||
|
||||
-record(it, {
|
||||
topic_filter :: emqx_ds:topic_filter(),
|
||||
start_time :: emqx_ds:time(),
|
||||
storage_key :: emqx_ds_lts:msg_storage_key(),
|
||||
last_seen_key = <<>> :: binary()
|
||||
}).
|
||||
|
||||
-type iterator() :: #it{}.
|
||||
-type iterator() ::
|
||||
#{
|
||||
?tag := ?it,
|
||||
?topic_filter := emqx_ds:topic_filter(),
|
||||
?start_time := emqx_ds:time(),
|
||||
?storage_key := emqx_ds_lts:msg_storage_key(),
|
||||
?last_seen_key := binary()
|
||||
}.
|
||||
|
||||
-define(COUNTER, emqx_ds_storage_bitfield_lts_counter).
|
||||
|
||||
|
@ -170,18 +186,35 @@ store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
|
|||
Messages
|
||||
).
|
||||
|
||||
-spec get_streams(
|
||||
emqx_ds_replication_layer:shard_id(),
|
||||
s(),
|
||||
emqx_ds:topic_filter(),
|
||||
emqx_ds:time()
|
||||
) -> [stream()].
|
||||
get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
|
||||
Indexes = emqx_ds_lts:match_topics(Trie, TopicFilter),
|
||||
[#stream{storage_key = I} || I <- Indexes].
|
||||
[#{?tag => ?stream, ?storage_key => I} || I <- Indexes].
|
||||
|
||||
make_iterator(_Shard, _Data, #stream{storage_key = StorageKey}, TopicFilter, StartTime) ->
|
||||
-spec make_iterator(
|
||||
emqx_ds_replication_layer:shard_id(),
|
||||
s(),
|
||||
stream(),
|
||||
emqx_ds:topic_filter(),
|
||||
emqx_ds:time()
|
||||
) -> {ok, iterator()}.
|
||||
make_iterator(
|
||||
_Shard, _Data, #{?tag := ?stream, ?storage_key := StorageKey}, TopicFilter, StartTime
|
||||
) ->
|
||||
%% Note: it's a good idea to keep the iterator structure lean,
|
||||
%% since it can be stored on a remote node that could update its
|
||||
%% code independently from us.
|
||||
{ok, #it{
|
||||
topic_filter = TopicFilter,
|
||||
start_time = StartTime,
|
||||
storage_key = StorageKey
|
||||
{ok, #{
|
||||
?tag => ?it,
|
||||
?topic_filter => TopicFilter,
|
||||
?start_time => StartTime,
|
||||
?storage_key => StorageKey,
|
||||
?last_seen_key => <<>>
|
||||
}}.
|
||||
|
||||
next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) ->
|
||||
|
@ -192,16 +225,19 @@ next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) ->
|
|||
SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset,
|
||||
next_until(Schema, It, SafeCutoffTime, BatchSize).
|
||||
|
||||
next_until(_Schema, It, SafeCutoffTime, _BatchSize) when It#it.start_time >= SafeCutoffTime ->
|
||||
next_until(_Schema, It = #{?tag := ?it, ?start_time := StartTime}, SafeCutoffTime, _BatchSize) when
|
||||
StartTime >= SafeCutoffTime
|
||||
->
|
||||
%% We're in the middle of the current epoch, so we can't yet iterate over it.
|
||||
%% It would be unsafe otherwise: messages can be stored in the current epoch
|
||||
%% concurrently with iterating over it. They can end up earlier (in the iteration
|
||||
%% order) due to the nature of keymapping, potentially causing us to miss them.
|
||||
{ok, It, []};
|
||||
next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, BatchSize) ->
|
||||
#it{
|
||||
start_time = StartTime,
|
||||
storage_key = {TopicIndex, Varying}
|
||||
#{
|
||||
?tag := ?it,
|
||||
?start_time := StartTime,
|
||||
?storage_key := {TopicIndex, Varying}
|
||||
} = It,
|
||||
%% Make filter:
|
||||
Inequations = [
|
||||
|
@ -250,7 +286,7 @@ next_loop(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) ->
|
|||
{ok, It, lists:reverse(Acc)};
|
||||
next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) ->
|
||||
inc_counter(),
|
||||
#it{last_seen_key = Key0} = It0,
|
||||
#{?tag := ?it, ?last_seen_key := Key0} = It0,
|
||||
case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of
|
||||
overflow ->
|
||||
{ok, It0, lists:reverse(Acc0)};
|
||||
|
@ -268,7 +304,7 @@ next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) ->
|
|||
end.
|
||||
|
||||
traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It0, Acc0, N) ->
|
||||
It = It0#it{last_seen_key = Key},
|
||||
It = It0#{?last_seen_key := Key},
|
||||
case emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) of
|
||||
true ->
|
||||
Msg = deserialize(Val),
|
||||
|
@ -310,7 +346,7 @@ check_message(
|
|||
overflow;
|
||||
check_message(
|
||||
_Cutoff,
|
||||
#it{start_time = StartTime, topic_filter = TopicFilter},
|
||||
#{?tag := ?it, ?start_time := StartTime, ?topic_filter := TopicFilter},
|
||||
#message{timestamp = Timestamp, topic = Topic}
|
||||
) when Timestamp >= StartTime ->
|
||||
emqx_topic:match(emqx_topic:words(Topic), TopicFilter);
|
||||
|
|
|
@ -34,6 +34,18 @@
|
|||
%% Type declarations
|
||||
%%================================================================================
|
||||
|
||||
%% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending
|
||||
%% records over the wire.
|
||||
|
||||
%% tags:
|
||||
-define(stream, stream).
|
||||
-define(it, it).
|
||||
|
||||
%% keys:
|
||||
-define(tag, 1).
|
||||
-define(generation, 2).
|
||||
-define(enc, 3).
|
||||
|
||||
-type prototype() ::
|
||||
{emqx_ds_storage_reference, emqx_ds_storage_reference:options()}
|
||||
| {emqx_ds_storage_bitfield_lts, emqx_ds_storage_bitfield_lts:options()}.
|
||||
|
@ -44,23 +56,21 @@
|
|||
|
||||
-type gen_id() :: 0..16#ffff.
|
||||
|
||||
%% Note: this record might be stored permanently on a remote node.
|
||||
-record(stream, {
|
||||
generation :: gen_id(),
|
||||
enc :: _EncapsulatedData,
|
||||
misc = #{} :: map()
|
||||
}).
|
||||
%% Note: this might be stored permanently on a remote node.
|
||||
-opaque stream() ::
|
||||
#{
|
||||
?tag := ?stream,
|
||||
?generation := gen_id(),
|
||||
?enc := term()
|
||||
}.
|
||||
|
||||
-opaque stream() :: #stream{}.
|
||||
|
||||
%% Note: this record might be stored permanently on a remote node.
|
||||
-record(it, {
|
||||
generation :: gen_id(),
|
||||
enc :: _EncapsulatedData,
|
||||
misc = #{} :: map()
|
||||
}).
|
||||
|
||||
-opaque iterator() :: #it{}.
|
||||
%% Note: this might be stored permanently on a remote node.
|
||||
-opaque iterator() ::
|
||||
#{
|
||||
?tag := ?it,
|
||||
?generation := gen_id(),
|
||||
?enc := term()
|
||||
}.
|
||||
|
||||
%%%% Generation:
|
||||
|
||||
|
@ -154,9 +164,10 @@ get_streams(Shard, TopicFilter, StartTime) ->
|
|||
#{module := Mod, data := GenData} = generation_get(Shard, GenId),
|
||||
Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime),
|
||||
[
|
||||
{GenId, #stream{
|
||||
generation = GenId,
|
||||
enc = Stream
|
||||
{GenId, #{
|
||||
?tag => ?stream,
|
||||
?generation => GenId,
|
||||
?enc => Stream
|
||||
}}
|
||||
|| Stream <- Streams
|
||||
]
|
||||
|
@ -166,13 +177,16 @@ get_streams(Shard, TopicFilter, StartTime) ->
|
|||
|
||||
-spec make_iterator(shard_id(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
|
||||
emqx_ds:make_iterator_result(iterator()).
|
||||
make_iterator(Shard, #stream{generation = GenId, enc = Stream}, TopicFilter, StartTime) ->
|
||||
make_iterator(
|
||||
Shard, #{?tag := ?stream, ?generation := GenId, ?enc := Stream}, TopicFilter, StartTime
|
||||
) ->
|
||||
#{module := Mod, data := GenData} = generation_get(Shard, GenId),
|
||||
case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of
|
||||
{ok, Iter} ->
|
||||
{ok, #it{
|
||||
generation = GenId,
|
||||
enc = Iter
|
||||
{ok, #{
|
||||
?tag => ?it,
|
||||
?generation => GenId,
|
||||
?enc => Iter
|
||||
}};
|
||||
{error, _} = Err ->
|
||||
Err
|
||||
|
@ -180,7 +194,7 @@ make_iterator(Shard, #stream{generation = GenId, enc = Stream}, TopicFilter, Sta
|
|||
|
||||
-spec next(shard_id(), iterator(), pos_integer()) ->
|
||||
emqx_ds:next_result(iterator()).
|
||||
next(Shard, Iter = #it{generation = GenId, enc = GenIter0}, BatchSize) ->
|
||||
next(Shard, Iter = #{?tag := ?it, ?generation := GenId, ?enc := GenIter0}, BatchSize) ->
|
||||
#{module := Mod, data := GenData} = generation_get(Shard, GenId),
|
||||
Current = generation_current(Shard),
|
||||
case Mod:next(Shard, GenData, GenIter0, BatchSize) of
|
||||
|
@ -190,7 +204,7 @@ next(Shard, Iter = #it{generation = GenId, enc = GenIter0}, BatchSize) ->
|
|||
%% the stream has been fully replayed.
|
||||
{ok, end_of_stream};
|
||||
{ok, GenIter, Batch} ->
|
||||
{ok, Iter#it{enc = GenIter}, Batch};
|
||||
{ok, Iter#{?enc := GenIter}, Batch};
|
||||
Error = {error, _} ->
|
||||
Error
|
||||
end.
|
||||
|
|
|
@ -41,7 +41,7 @@ drop_shard(Node, Shard) ->
|
|||
-spec get_streams(
|
||||
node(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
|
||||
) ->
|
||||
[{integer(), emqx_ds_replication_layer:stream()}].
|
||||
[{integer(), emqx_ds_storage_layer:stream()}].
|
||||
get_streams(Node, Shard, TopicFilter, Time) ->
|
||||
erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [Shard, TopicFilter, Time]).
|
||||
|
||||
|
@ -52,7 +52,7 @@ get_streams(Node, Shard, TopicFilter, Time) ->
|
|||
emqx_ds:topic_filter(),
|
||||
emqx_ds:time()
|
||||
) ->
|
||||
{ok, emqx_ds_replication_layer:iterator()} | {error, _}.
|
||||
{ok, emqx_ds_storage_layer:iterator()} | {error, _}.
|
||||
make_iterator(Node, Shard, Stream, TopicFilter, StartTime) ->
|
||||
erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [
|
||||
Shard, Stream, TopicFilter, StartTime
|
||||
|
|
|
@ -2,114 +2,33 @@ emqx_limiter_schema {
|
|||
|
||||
max_conn_rate.desc:
|
||||
"""Maximum connection rate.<br/>
|
||||
This is used to limit the connection rate for this node,
|
||||
once the limit is reached, new connections will be deferred or refused"""
|
||||
This is used to limit the connection rate for this node.
|
||||
Once the limit is reached, new connections will be deferred or refused.<br/>
|
||||
For example:<br/>
|
||||
- <code>1000/s</code> :: Only accepts 1000 connections per second<br/>
|
||||
- <code>1000/10s</code> :: Only accepts 1000 connections every 10 seconds"""
|
||||
max_conn_rate.label:
|
||||
"""Maximum Connection Rate"""
|
||||
|
||||
messages_rate.desc:
|
||||
"""Messages publish rate.<br/>
|
||||
This is used to limit the inbound message numbers for this node,
|
||||
once the limit is reached, the restricted client will slow down and even be hung for a while."""
|
||||
This is used to limit the inbound message numbers for this node.
|
||||
Once the limit is reached, the restricted client will slow down and even be hung for a while.<br/>
|
||||
For example:<br/>
|
||||
- <code>500/s</code> :: Only the first 500 messages are sent per second and other messages are buffered.<br/>
|
||||
- <code>500/10s</code> :: Only the first 500 messages are sent even 10 second and other messages are buffered."""
|
||||
messages_rate.label:
|
||||
"""Messages Publish Rate"""
|
||||
|
||||
bytes_rate.desc:
|
||||
"""Data publish rate.<br/>
|
||||
This is used to limit the inbound bytes rate for this node,
|
||||
once the limit is reached, the restricted client will slow down and even be hung for a while."""
|
||||
This is used to limit the inbound bytes rate for this node.
|
||||
Once the limit is reached, the restricted client will slow down and even be hung for a while.<br/>
|
||||
The unit of the bytes could be:KB MB GB.<br/>
|
||||
For example:<br/>
|
||||
- <code>500KB/s</code> :: Only the first 500 kilobytes are sent per second and other messages are buffered.<br/>
|
||||
- <code>500MB/10s</code> :: Only the first 500 megabytes are sent even 10 second and other messages are buffered."""
|
||||
bytes_rate.label:
|
||||
"""Data Publish Rate"""
|
||||
|
||||
bucket_cfg.desc:
|
||||
"""Bucket Configs"""
|
||||
|
||||
bucket_cfg.label:
|
||||
"""Buckets"""
|
||||
|
||||
burst.desc:
|
||||
"""The burst, This value is based on rate.<br/>
|
||||
This value + rate = the maximum limit that can be achieved when limiter burst."""
|
||||
|
||||
burst.label:
|
||||
"""Burst"""
|
||||
|
||||
bytes.desc:
|
||||
"""The `bytes` limiter.
|
||||
This is used to limit the inbound bytes rate for this EMQX node.
|
||||
Once the limit is reached, the restricted client will be slow down even be hung for a while."""
|
||||
|
||||
bytes.label:
|
||||
"""Bytes"""
|
||||
|
||||
client.desc:
|
||||
"""The rate limit for each user of the bucket"""
|
||||
|
||||
client.label:
|
||||
"""Per Client"""
|
||||
|
||||
connection.desc:
|
||||
"""The connection limiter.
|
||||
This is used to limit the connection rate for this EMQX node.
|
||||
Once the limit is reached, new connections will be refused"""
|
||||
|
||||
connection.label:
|
||||
"""Connection"""
|
||||
|
||||
divisible.desc:
|
||||
"""Is it possible to split the number of requested tokens?"""
|
||||
|
||||
divisible.label:
|
||||
"""Divisible"""
|
||||
|
||||
failure_strategy.desc:
|
||||
"""The strategy when all the retries failed."""
|
||||
|
||||
failure_strategy.label:
|
||||
"""Failure Strategy"""
|
||||
|
||||
initial.desc:
|
||||
"""The initial number of tokens for this bucket."""
|
||||
|
||||
initial.label:
|
||||
"""Initial"""
|
||||
|
||||
internal.desc:
|
||||
"""Limiter for EMQX internal app."""
|
||||
|
||||
low_watermark.desc:
|
||||
"""If the remaining tokens are lower than this value,
|
||||
the check/consume will succeed, but it will be forced to wait for a short period of time."""
|
||||
|
||||
low_watermark.label:
|
||||
"""Low Watermark"""
|
||||
|
||||
max_retry_time.desc:
|
||||
"""The maximum retry time when acquire failed."""
|
||||
|
||||
max_retry_time.label:
|
||||
"""Max Retry Time"""
|
||||
|
||||
message_routing.desc:
|
||||
"""The message routing limiter.
|
||||
This is used to limit the forwarding rate for this EMQX node.
|
||||
Once the limit is reached, new publish will be refused"""
|
||||
|
||||
message_routing.label:
|
||||
"""Message Routing"""
|
||||
|
||||
messages.desc:
|
||||
"""The `messages` limiter.
|
||||
This is used to limit the inbound message numbers for this EMQX node
|
||||
Once the limit is reached, the restricted client will be slow down even be hung for a while."""
|
||||
|
||||
messages.label:
|
||||
"""Messages"""
|
||||
|
||||
rate.desc:
|
||||
"""Rate for this bucket."""
|
||||
|
||||
rate.label:
|
||||
"""Rate"""
|
||||
|
||||
}
|
||||
|
|
|
@ -1039,27 +1039,6 @@ base_listener_limiter.desc:
|
|||
base_listener_limiter.label:
|
||||
"""Type of the rate limit."""
|
||||
|
||||
max_conn_rate.desc:
|
||||
"""Maximum connection rate.<br/>
|
||||
This is used to limit the connection rate for this listener,
|
||||
once the limit is reached, new connections will be deferred or refused"""
|
||||
max_conn_rate.label:
|
||||
"""Maximum Connection Rate"""
|
||||
|
||||
messages_rate.desc:
|
||||
"""Messages publish rate.<br/>
|
||||
This is used to limit the inbound message numbers for each client connected to this listener,
|
||||
once the limit is reached, the restricted client will slow down and even be hung for a while."""
|
||||
messages_rate.label:
|
||||
"""Messages Publish Rate"""
|
||||
|
||||
bytes_rate.desc:
|
||||
"""Data publish rate.<br/>
|
||||
This is used to limit the inbound bytes rate for each client connected to this listener,
|
||||
once the limit is reached, the restricted client will slow down and even be hung for a while."""
|
||||
bytes_rate.label:
|
||||
"""Data Publish Rate"""
|
||||
|
||||
persistent_session_store_backend.desc:
|
||||
"""Database management system used to store information about persistent sessions and messages.
|
||||
- `builtin`: Use the embedded database (mria)"""
|
||||
|
|
Loading…
Reference in New Issue