diff --git a/.github/workflows/_push-entrypoint.yaml b/.github/workflows/_push-entrypoint.yaml index b1ba0cdeb..e2eff6dc7 100644 --- a/.github/workflows/_push-entrypoint.yaml +++ b/.github/workflows/_push-entrypoint.yaml @@ -11,7 +11,7 @@ on: - 'e*' branches: - 'master' - - 'release-5?' + - 'release-5[0-9]' - 'ci/**' env: diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index f647c660f..7042f5186 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -14,7 +14,7 @@ {emqx_conf,1}. {emqx_conf,2}. {emqx_conf,3}. -{emqx_connector, 1}. +{emqx_connector,1}. {emqx_dashboard,1}. {emqx_delayed,1}. {emqx_delayed,2}. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index 802b29837..2dd4aa241 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -33,7 +33,7 @@ desc/1, types/0, short_paths/0, - short_paths_fields/1 + short_paths_fields/0 ]). -define(KILOBYTE, 1024). @@ -103,11 +103,11 @@ roots() -> ]. fields(limiter) -> - short_paths_fields(?MODULE, ?IMPORTANCE_HIDDEN) ++ + short_paths_fields(?IMPORTANCE_HIDDEN) ++ [ {Type, ?HOCON(?R_REF(node_opts), #{ - desc => ?DESC(Type), + desc => deprecated_desc(Type), importance => ?IMPORTANCE_HIDDEN, required => {false, recursively}, aliases => alias_of_type(Type) @@ -120,7 +120,7 @@ fields(limiter) -> ?HOCON( ?R_REF(client_fields), #{ - desc => ?DESC(client), + desc => deprecated_desc(client), importance => ?IMPORTANCE_HIDDEN, required => {false, recursively}, deprecated => {since, "5.0.25"} @@ -129,10 +129,10 @@ fields(limiter) -> ]; fields(node_opts) -> [ - {rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => <<"infinity">>})}, + {rate, ?HOCON(rate(), #{desc => deprecated_desc(rate), default => <<"infinity">>})}, {burst, ?HOCON(burst_rate(), #{ - desc => ?DESC(burst), + desc => deprecated_desc(burst), default => <<"0">> })} ]; @@ -142,11 +142,12 @@ fields(bucket_opts) -> fields_of_bucket(<<"infinity">>); fields(client_opts) -> [ - {rate, ?HOCON(rate(), #{default => <<"infinity">>, desc => ?DESC(rate)})}, + {rate, ?HOCON(rate(), #{default => <<"infinity">>, desc => deprecated_desc(rate)})}, {initial, ?HOCON(initial(), #{ default => <<"0">>, - desc => ?DESC(initial), + + desc => deprecated_desc(initial), importance => ?IMPORTANCE_HIDDEN })}, %% low_watermark add for emqx_channel and emqx_session @@ -157,14 +158,14 @@ fields(client_opts) -> ?HOCON( initial(), #{ - desc => ?DESC(low_watermark), + desc => deprecated_desc(low_watermark), default => <<"0">>, importance => ?IMPORTANCE_HIDDEN } )}, {burst, ?HOCON(burst(), #{ - desc => ?DESC(burst), + desc => deprecated_desc(burst), default => <<"0">>, importance => ?IMPORTANCE_HIDDEN, aliases => [capacity] @@ -173,7 +174,7 @@ fields(client_opts) -> ?HOCON( boolean(), #{ - desc => ?DESC(divisible), + desc => deprecated_desc(divisible), default => true, importance => ?IMPORTANCE_HIDDEN } @@ -182,7 +183,7 @@ fields(client_opts) -> ?HOCON( emqx_schema:timeout_duration(), #{ - desc => ?DESC(max_retry_time), + desc => deprecated_desc(max_retry_time), default => <<"1h">>, importance => ?IMPORTANCE_HIDDEN } @@ -191,7 +192,7 @@ fields(client_opts) -> ?HOCON( failure_strategy(), #{ - desc => ?DESC(failure_strategy), + desc => deprecated_desc(failure_strategy), default => force, importance => ?IMPORTANCE_HIDDEN } @@ -204,14 +205,14 @@ fields(listener_client_fields) -> fields(Type) -> simple_bucket_field(Type). -short_paths_fields(DesModule) -> - short_paths_fields(DesModule, ?DEFAULT_IMPORTANCE). +short_paths_fields() -> + short_paths_fields(?DEFAULT_IMPORTANCE). -short_paths_fields(DesModule, Importance) -> +short_paths_fields(Importance) -> [ {Name, ?HOCON(rate(), #{ - desc => ?DESC(DesModule, Name), + desc => ?DESC(Name), required => false, importance => Importance, example => Example @@ -381,7 +382,7 @@ simple_bucket_field(Type) when is_atom(Type) -> ?HOCON( ?R_REF(?MODULE, client_opts), #{ - desc => ?DESC(client), + desc => deprecated_desc(client), required => {false, recursively}, importance => importance_of_type(Type), aliases => alias_of_type(Type) @@ -394,7 +395,7 @@ composite_bucket_fields(Types, ClientRef) -> [ {Type, ?HOCON(?R_REF(?MODULE, bucket_opts), #{ - desc => ?DESC(?MODULE, Type), + desc => deprecated_desc(Type), required => {false, recursively}, importance => importance_of_type(Type), aliases => alias_of_type(Type) @@ -406,7 +407,7 @@ composite_bucket_fields(Types, ClientRef) -> ?HOCON( ?R_REF(?MODULE, ClientRef), #{ - desc => ?DESC(client), + desc => deprecated_desc(client), required => {false, recursively} } )} @@ -414,10 +415,10 @@ composite_bucket_fields(Types, ClientRef) -> fields_of_bucket(Default) -> [ - {rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => Default})}, + {rate, ?HOCON(rate(), #{desc => deprecated_desc(rate), default => Default})}, {burst, ?HOCON(burst(), #{ - desc => ?DESC(burst), + desc => deprecated_desc(burst), default => <<"0">>, importance => ?IMPORTANCE_HIDDEN, aliases => [capacity] @@ -425,7 +426,7 @@ fields_of_bucket(Default) -> {initial, ?HOCON(initial(), #{ default => <<"0">>, - desc => ?DESC(initial), + desc => deprecated_desc(initial), importance => ?IMPORTANCE_HIDDEN })} ]. @@ -434,7 +435,7 @@ client_fields(Types) -> [ {Type, ?HOCON(?R_REF(client_opts), #{ - desc => ?DESC(Type), + desc => deprecated_desc(Type), required => false, importance => importance_of_type(Type), aliases => alias_of_type(Type) @@ -457,3 +458,6 @@ alias_of_type(bytes) -> [bytes_in]; alias_of_type(_) -> []. + +deprecated_desc(_Field) -> + <<"Deprecated since v5.0.25">>. diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index d137891a2..d86ca84ad 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -28,6 +28,10 @@ -include("emqx_persistent_session_ds.hrl"). +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. + %%================================================================================ %% Type declarations %%================================================================================ @@ -132,7 +136,7 @@ fetch(_SessionId, Inflight, _Streams = [], _N, Acc) -> {lists:reverse(Acc), Inflight}; fetch(_SessionId, Inflight, _Streams, 0, Acc) -> {lists:reverse(Acc), Inflight}; -fetch(SessionId, Inflight0, [#ds_stream{stream = Stream} | Streams], N, Publishes0) -> +fetch(SessionId, Inflight0, [Stream | Streams], N, Publishes0) -> #inflight{next_seqno = FirstSeqNo, offset_ranges = Ranges0} = Inflight0, ItBegin = get_last_iterator(SessionId, Stream, Ranges0), {ok, ItEnd, Messages} = emqx_ds:next(ItBegin, N), @@ -162,6 +166,7 @@ fetch(SessionId, Inflight0, [#ds_stream{stream = Stream} | Streams], N, Publishe fetch(SessionId, Inflight1, Streams, N, Publishes) end. +-spec update_iterator(emqx_persistent_session_ds:id(), emqx_ds:stream(), emqx_ds:iterator()) -> ok. update_iterator(SessionId, Stream, Iterator) -> mria:dirty_write(?SESSION_ITER_TAB, #ds_iter{id = {SessionId, Stream}, iter = Iterator}). @@ -173,13 +178,20 @@ get_last_iterator(SessionId, Stream, Ranges) -> Next end. +-spec get_iterator(emqx_persistent_session_ds:id(), emqx_ds:stream()) -> emqx_ds:iterator(). get_iterator(SessionId, Stream) -> Id = {SessionId, Stream}, [#ds_iter{iter = It}] = mnesia:dirty_read(?SESSION_ITER_TAB, Id), It. +-spec get_streams(emqx_persistent_session_ds:id()) -> [emqx_ds:stream()]. get_streams(SessionId) -> - mnesia:dirty_read(?SESSION_STREAM_TAB, SessionId). + lists:map( + fun(#ds_stream{stream = Stream}) -> + Stream + end, + mnesia:dirty_read(?SESSION_STREAM_TAB, SessionId) + ). %% Packet ID as defined by MQTT protocol is a 16-bit integer in range %% 1..FFFF. This function translates internal session sequence number diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index f3027f500..5ab7723f7 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -97,8 +97,6 @@ props := map() }. -%% -type session() :: #session{}. - -type timestamp() :: emqx_utils_calendar:epoch_millisecond(). -type topic() :: emqx_types:topic(). -type clientinfo() :: emqx_types:clientinfo(). @@ -344,7 +342,15 @@ deliver(_ClientInfo, _Delivers, Session) -> handle_timeout(_ClientInfo, pull, Session = #{id := Id, inflight := Inflight0}) -> WindowSize = 100, {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(Id, Inflight0, WindowSize), - ensure_timer(pull), + %% TODO: make these values configurable: + Timeout = + case Publishes of + [] -> + 100; + [_ | _] -> + 0 + end, + ensure_timer(pull, Timeout), {ok, Publishes, Session#{inflight => Inflight}}; handle_timeout(_ClientInfo, get_streams, Session = #{id := Id}) -> renew_streams(Id), @@ -714,5 +720,9 @@ ensure_timers() -> -spec ensure_timer(pull | get_streams) -> ok. ensure_timer(Type) -> - _ = emqx_utils:start_timer(100, {emqx_session, Type}), + ensure_timer(Type, 100). + +-spec ensure_timer(pull | get_streams, non_neg_integer()) -> ok. +ensure_timer(Type, Timeout) -> + _ = emqx_utils:start_timer(Timeout, {emqx_session, Type}), ok. diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 54b077795..81b997df5 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -36,6 +36,7 @@ stream :: emqx_ds:stream(), rank :: emqx_ds:stream_rank() }). +-type ds_stream() :: #ds_stream{}. -record(ds_iter, { id :: {emqx_persistent_session_ds:id(), emqx_ds:stream()}, diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 804a3a04c..3848e77b4 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1849,7 +1849,7 @@ base_listener(Bind) -> default => true } )} - ] ++ emqx_limiter_schema:short_paths_fields(?MODULE). + ] ++ emqx_limiter_schema:short_paths_fields(). desc("persistent_session_store") -> "Settings for message persistence."; diff --git a/apps/emqx/test/emqx_cm_SUITE.erl b/apps/emqx/test/emqx_cm_SUITE.erl index 6afdfa478..9bfb4d5e7 100644 --- a/apps/emqx/test/emqx_cm_SUITE.erl +++ b/apps/emqx/test/emqx_cm_SUITE.erl @@ -306,7 +306,7 @@ test_stepdown_session(Action, Reason) -> ok = emqx_cm:register_channel(ClientId, Pid1, ConnInfo), ok = emqx_cm:register_channel(ClientId, Pid1, ConnInfo), ok = emqx_cm:register_channel(ClientId, Pid2, ConnInfo), - ?assertEqual([Pid1, Pid2], lists:sort(emqx_cm:lookup_channels(ClientId))), + ?assertEqual(lists:sort([Pid1, Pid2]), lists:sort(emqx_cm:lookup_channels(ClientId))), case Reason of noproc -> exit(Pid1, kill), diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index d61dfa906..b81f43c4f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -43,30 +43,41 @@ %% Type declarations %%================================================================================ +%% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending +%% records over the wire. + +%% tags: +-define(stream, stream). +-define(it, it). + +%% keys: +-define(tag, 1). +-define(shard, 2). +-define(enc, 3). + -type db() :: emqx_ds:db(). -type shard_id() :: {db(), atom()}. -%% This record enapsulates the stream entity from the replication -%% level. +%% This enapsulates the stream entity from the replication level. %% %% TODO: currently the stream is hardwired to only support the %% internal rocksdb storage. In the future we want to add another %% implementations for emqx_ds, so this type has to take this into %% account. --record(stream, { - shard :: emqx_ds_replication_layer:shard_id(), - enc :: emqx_ds_storage_layer:stream() -}). +-opaque stream() :: + #{ + ?tag := ?stream, + ?shard := emqx_ds_replication_layer:shard_id(), + ?enc := emqx_ds_storage_layer:stream() + }. --opaque stream() :: #stream{}. - --record(iterator, { - shard :: emqx_ds_replication_layer:shard_id(), - enc :: enqx_ds_storage_layer:iterator() -}). - --opaque iterator() :: #iterator{}. +-opaque iterator() :: + #{ + ?tag := ?it, + ?shard := emqx_ds_replication_layer:shard_id(), + ?enc := emqx_ds_storage_layer:iterator() + }. -type message_id() :: emqx_ds_storage_layer:message_id(). @@ -124,9 +135,10 @@ get_streams(DB, TopicFilter, StartTime) -> fun({RankY, Stream}) -> RankX = Shard, Rank = {RankX, RankY}, - {Rank, #stream{ - shard = Shard, - enc = Stream + {Rank, #{ + ?tag => ?stream, + ?shard => Shard, + ?enc => Stream }} end, Streams @@ -138,18 +150,18 @@ get_streams(DB, TopicFilter, StartTime) -> -spec make_iterator(stream(), emqx_ds:topic_filter(), emqx_ds:time()) -> emqx_ds:make_iterator_result(iterator()). make_iterator(Stream, TopicFilter, StartTime) -> - #stream{shard = Shard, enc = StorageStream} = Stream, + #{?tag := ?stream, ?shard := Shard, ?enc := StorageStream} = Stream, Node = node_of_shard(Shard), case emqx_ds_proto_v1:make_iterator(Node, Shard, StorageStream, TopicFilter, StartTime) of {ok, Iter} -> - {ok, #iterator{shard = Shard, enc = Iter}}; + {ok, #{?tag => ?it, ?shard => Shard, ?enc => Iter}}; Err = {error, _} -> Err end. -spec next(iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). next(Iter0, BatchSize) -> - #iterator{shard = Shard, enc = StorageIter0} = Iter0, + #{?tag := ?it, ?shard := Shard, ?enc := StorageIter0} = Iter0, Node = node_of_shard(Shard), %% TODO: iterator can contain information that is useful for %% reconstructing messages sent over the network. For example, @@ -161,7 +173,7 @@ next(Iter0, BatchSize) -> %% replication layer. Or, perhaps, in the logic layer. case emqx_ds_proto_v1:next(Node, Shard, StorageIter0, BatchSize) of {ok, StorageIter, Batch} -> - Iter = #iterator{shard = Shard, enc = StorageIter}, + Iter = Iter0#{?enc := StorageIter}, {ok, Iter, Batch}; Other -> Other @@ -184,14 +196,14 @@ do_drop_shard_v1(Shard) -> emqx_ds_storage_layer:drop_shard(Shard). -spec do_get_streams_v1(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) -> - [{integer(), _Stream}]. + [{integer(), emqx_ds_storage_layer:stream()}]. do_get_streams_v1(Shard, TopicFilter, StartTime) -> emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime). -spec do_make_iterator_v1( shard_id(), emqx_ds_storage_layer:stream(), emqx_ds:topic_filter(), emqx_ds:time() ) -> - {ok, iterator()} | {error, _}. + {ok, emqx_ds_storage_layer:iterator()} | {error, _}. do_make_iterator_v1(Shard, Stream, TopicFilter, StartTime) -> emqx_ds_storage_layer:make_iterator(Shard, Stream, TopicFilter, StartTime). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index d57d8013c..d2c997ae1 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -38,6 +38,20 @@ %% Type declarations %%================================================================================ +%% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending +%% records over the wire. + +%% tags: +-define(stream, stream). +-define(it, it). + +%% keys: +-define(tag, 1). +-define(topic_filter, 2). +-define(start_time, 3). +-define(storage_key, 4). +-define(last_seen_key, 5). + -type options() :: #{ bits_per_wildcard_level => pos_integer(), @@ -65,18 +79,20 @@ -type s() :: #s{}. --record(stream, { - storage_key :: emqx_ds_lts:msg_storage_key() -}). +-type stream() :: + #{ + ?tag := ?stream, + ?storage_key := emqx_ds_lts:msg_storage_key() + }. --record(it, { - topic_filter :: emqx_ds:topic_filter(), - start_time :: emqx_ds:time(), - storage_key :: emqx_ds_lts:msg_storage_key(), - last_seen_key = <<>> :: binary() -}). - --type iterator() :: #it{}. +-type iterator() :: + #{ + ?tag := ?it, + ?topic_filter := emqx_ds:topic_filter(), + ?start_time := emqx_ds:time(), + ?storage_key := emqx_ds_lts:msg_storage_key(), + ?last_seen_key := binary() + }. -define(COUNTER, emqx_ds_storage_bitfield_lts_counter). @@ -170,18 +186,35 @@ store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> Messages ). +-spec get_streams( + emqx_ds_replication_layer:shard_id(), + s(), + emqx_ds:topic_filter(), + emqx_ds:time() +) -> [stream()]. get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) -> Indexes = emqx_ds_lts:match_topics(Trie, TopicFilter), - [#stream{storage_key = I} || I <- Indexes]. + [#{?tag => ?stream, ?storage_key => I} || I <- Indexes]. -make_iterator(_Shard, _Data, #stream{storage_key = StorageKey}, TopicFilter, StartTime) -> +-spec make_iterator( + emqx_ds_replication_layer:shard_id(), + s(), + stream(), + emqx_ds:topic_filter(), + emqx_ds:time() +) -> {ok, iterator()}. +make_iterator( + _Shard, _Data, #{?tag := ?stream, ?storage_key := StorageKey}, TopicFilter, StartTime +) -> %% Note: it's a good idea to keep the iterator structure lean, %% since it can be stored on a remote node that could update its %% code independently from us. - {ok, #it{ - topic_filter = TopicFilter, - start_time = StartTime, - storage_key = StorageKey + {ok, #{ + ?tag => ?it, + ?topic_filter => TopicFilter, + ?start_time => StartTime, + ?storage_key => StorageKey, + ?last_seen_key => <<>> }}. next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) -> @@ -192,16 +225,19 @@ next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) -> SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, next_until(Schema, It, SafeCutoffTime, BatchSize). -next_until(_Schema, It, SafeCutoffTime, _BatchSize) when It#it.start_time >= SafeCutoffTime -> +next_until(_Schema, It = #{?tag := ?it, ?start_time := StartTime}, SafeCutoffTime, _BatchSize) when + StartTime >= SafeCutoffTime +-> %% We're in the middle of the current epoch, so we can't yet iterate over it. %% It would be unsafe otherwise: messages can be stored in the current epoch %% concurrently with iterating over it. They can end up earlier (in the iteration %% order) due to the nature of keymapping, potentially causing us to miss them. {ok, It, []}; next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, BatchSize) -> - #it{ - start_time = StartTime, - storage_key = {TopicIndex, Varying} + #{ + ?tag := ?it, + ?start_time := StartTime, + ?storage_key := {TopicIndex, Varying} } = It, %% Make filter: Inequations = [ @@ -250,7 +286,7 @@ next_loop(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) -> {ok, It, lists:reverse(Acc)}; next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) -> inc_counter(), - #it{last_seen_key = Key0} = It0, + #{?tag := ?it, ?last_seen_key := Key0} = It0, case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of overflow -> {ok, It0, lists:reverse(Acc0)}; @@ -268,7 +304,7 @@ next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) -> end. traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It0, Acc0, N) -> - It = It0#it{last_seen_key = Key}, + It = It0#{?last_seen_key := Key}, case emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) of true -> Msg = deserialize(Val), @@ -310,7 +346,7 @@ check_message( overflow; check_message( _Cutoff, - #it{start_time = StartTime, topic_filter = TopicFilter}, + #{?tag := ?it, ?start_time := StartTime, ?topic_filter := TopicFilter}, #message{timestamp = Timestamp, topic = Topic} ) when Timestamp >= StartTime -> emqx_topic:match(emqx_topic:words(Topic), TopicFilter); diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 57af33d61..c91ac49d5 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -34,6 +34,18 @@ %% Type declarations %%================================================================================ +%% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending +%% records over the wire. + +%% tags: +-define(stream, stream). +-define(it, it). + +%% keys: +-define(tag, 1). +-define(generation, 2). +-define(enc, 3). + -type prototype() :: {emqx_ds_storage_reference, emqx_ds_storage_reference:options()} | {emqx_ds_storage_bitfield_lts, emqx_ds_storage_bitfield_lts:options()}. @@ -44,23 +56,21 @@ -type gen_id() :: 0..16#ffff. -%% Note: this record might be stored permanently on a remote node. --record(stream, { - generation :: gen_id(), - enc :: _EncapsulatedData, - misc = #{} :: map() -}). +%% Note: this might be stored permanently on a remote node. +-opaque stream() :: + #{ + ?tag := ?stream, + ?generation := gen_id(), + ?enc := term() + }. --opaque stream() :: #stream{}. - -%% Note: this record might be stored permanently on a remote node. --record(it, { - generation :: gen_id(), - enc :: _EncapsulatedData, - misc = #{} :: map() -}). - --opaque iterator() :: #it{}. +%% Note: this might be stored permanently on a remote node. +-opaque iterator() :: + #{ + ?tag := ?it, + ?generation := gen_id(), + ?enc := term() + }. %%%% Generation: @@ -154,9 +164,10 @@ get_streams(Shard, TopicFilter, StartTime) -> #{module := Mod, data := GenData} = generation_get(Shard, GenId), Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime), [ - {GenId, #stream{ - generation = GenId, - enc = Stream + {GenId, #{ + ?tag => ?stream, + ?generation => GenId, + ?enc => Stream }} || Stream <- Streams ] @@ -166,13 +177,16 @@ get_streams(Shard, TopicFilter, StartTime) -> -spec make_iterator(shard_id(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) -> emqx_ds:make_iterator_result(iterator()). -make_iterator(Shard, #stream{generation = GenId, enc = Stream}, TopicFilter, StartTime) -> +make_iterator( + Shard, #{?tag := ?stream, ?generation := GenId, ?enc := Stream}, TopicFilter, StartTime +) -> #{module := Mod, data := GenData} = generation_get(Shard, GenId), case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of {ok, Iter} -> - {ok, #it{ - generation = GenId, - enc = Iter + {ok, #{ + ?tag => ?it, + ?generation => GenId, + ?enc => Iter }}; {error, _} = Err -> Err @@ -180,7 +194,7 @@ make_iterator(Shard, #stream{generation = GenId, enc = Stream}, TopicFilter, Sta -spec next(shard_id(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). -next(Shard, Iter = #it{generation = GenId, enc = GenIter0}, BatchSize) -> +next(Shard, Iter = #{?tag := ?it, ?generation := GenId, ?enc := GenIter0}, BatchSize) -> #{module := Mod, data := GenData} = generation_get(Shard, GenId), Current = generation_current(Shard), case Mod:next(Shard, GenData, GenIter0, BatchSize) of @@ -190,7 +204,7 @@ next(Shard, Iter = #it{generation = GenId, enc = GenIter0}, BatchSize) -> %% the stream has been fully replayed. {ok, end_of_stream}; {ok, GenIter, Batch} -> - {ok, Iter#it{enc = GenIter}, Batch}; + {ok, Iter#{?enc := GenIter}, Batch}; Error = {error, _} -> Error end. diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index 17e873ecd..6a79a4a61 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -41,7 +41,7 @@ drop_shard(Node, Shard) -> -spec get_streams( node(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time() ) -> - [{integer(), emqx_ds_replication_layer:stream()}]. + [{integer(), emqx_ds_storage_layer:stream()}]. get_streams(Node, Shard, TopicFilter, Time) -> erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [Shard, TopicFilter, Time]). @@ -52,7 +52,7 @@ get_streams(Node, Shard, TopicFilter, Time) -> emqx_ds:topic_filter(), emqx_ds:time() ) -> - {ok, emqx_ds_replication_layer:iterator()} | {error, _}. + {ok, emqx_ds_storage_layer:iterator()} | {error, _}. make_iterator(Node, Shard, Stream, TopicFilter, StartTime) -> erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [ Shard, Stream, TopicFilter, StartTime diff --git a/rel/i18n/emqx_limiter_schema.hocon b/rel/i18n/emqx_limiter_schema.hocon index b2958ce90..1a0ed5273 100644 --- a/rel/i18n/emqx_limiter_schema.hocon +++ b/rel/i18n/emqx_limiter_schema.hocon @@ -2,114 +2,33 @@ emqx_limiter_schema { max_conn_rate.desc: """Maximum connection rate.
-This is used to limit the connection rate for this node, -once the limit is reached, new connections will be deferred or refused""" +This is used to limit the connection rate for this node. +Once the limit is reached, new connections will be deferred or refused.
+For example:
+- 1000/s :: Only accepts 1000 connections per second
+- 1000/10s :: Only accepts 1000 connections every 10 seconds""" max_conn_rate.label: """Maximum Connection Rate""" messages_rate.desc: """Messages publish rate.
-This is used to limit the inbound message numbers for this node, -once the limit is reached, the restricted client will slow down and even be hung for a while.""" +This is used to limit the inbound message numbers for this node. +Once the limit is reached, the restricted client will slow down and even be hung for a while.
+For example:
+- 500/s :: Only the first 500 messages are sent per second and other messages are buffered.
+- 500/10s :: Only the first 500 messages are sent even 10 second and other messages are buffered.""" messages_rate.label: """Messages Publish Rate""" bytes_rate.desc: """Data publish rate.
-This is used to limit the inbound bytes rate for this node, -once the limit is reached, the restricted client will slow down and even be hung for a while.""" +This is used to limit the inbound bytes rate for this node. +Once the limit is reached, the restricted client will slow down and even be hung for a while.
+The unit of the bytes could be:KB MB GB.
+For example:
+- 500KB/s :: Only the first 500 kilobytes are sent per second and other messages are buffered.
+- 500MB/10s :: Only the first 500 megabytes are sent even 10 second and other messages are buffered.""" bytes_rate.label: """Data Publish Rate""" -bucket_cfg.desc: -"""Bucket Configs""" - -bucket_cfg.label: -"""Buckets""" - -burst.desc: -"""The burst, This value is based on rate.
- This value + rate = the maximum limit that can be achieved when limiter burst.""" - -burst.label: -"""Burst""" - -bytes.desc: -"""The `bytes` limiter. -This is used to limit the inbound bytes rate for this EMQX node. -Once the limit is reached, the restricted client will be slow down even be hung for a while.""" - -bytes.label: -"""Bytes""" - -client.desc: -"""The rate limit for each user of the bucket""" - -client.label: -"""Per Client""" - -connection.desc: -"""The connection limiter. -This is used to limit the connection rate for this EMQX node. -Once the limit is reached, new connections will be refused""" - -connection.label: -"""Connection""" - -divisible.desc: -"""Is it possible to split the number of requested tokens?""" - -divisible.label: -"""Divisible""" - -failure_strategy.desc: -"""The strategy when all the retries failed.""" - -failure_strategy.label: -"""Failure Strategy""" - -initial.desc: -"""The initial number of tokens for this bucket.""" - -initial.label: -"""Initial""" - -internal.desc: -"""Limiter for EMQX internal app.""" - -low_watermark.desc: -"""If the remaining tokens are lower than this value, -the check/consume will succeed, but it will be forced to wait for a short period of time.""" - -low_watermark.label: -"""Low Watermark""" - -max_retry_time.desc: -"""The maximum retry time when acquire failed.""" - -max_retry_time.label: -"""Max Retry Time""" - -message_routing.desc: -"""The message routing limiter. -This is used to limit the forwarding rate for this EMQX node. -Once the limit is reached, new publish will be refused""" - -message_routing.label: -"""Message Routing""" - -messages.desc: -"""The `messages` limiter. -This is used to limit the inbound message numbers for this EMQX node -Once the limit is reached, the restricted client will be slow down even be hung for a while.""" - -messages.label: -"""Messages""" - -rate.desc: -"""Rate for this bucket.""" - -rate.label: -"""Rate""" - } diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index 9ed579994..e1d086197 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1039,27 +1039,6 @@ base_listener_limiter.desc: base_listener_limiter.label: """Type of the rate limit.""" -max_conn_rate.desc: -"""Maximum connection rate.
-This is used to limit the connection rate for this listener, -once the limit is reached, new connections will be deferred or refused""" -max_conn_rate.label: -"""Maximum Connection Rate""" - -messages_rate.desc: -"""Messages publish rate.
-This is used to limit the inbound message numbers for each client connected to this listener, -once the limit is reached, the restricted client will slow down and even be hung for a while.""" -messages_rate.label: -"""Messages Publish Rate""" - -bytes_rate.desc: -"""Data publish rate.
-This is used to limit the inbound bytes rate for each client connected to this listener, -once the limit is reached, the restricted client will slow down and even be hung for a while.""" -bytes_rate.label: -"""Data Publish Rate""" - persistent_session_store_backend.desc: """Database management system used to store information about persistent sessions and messages. - `builtin`: Use the embedded database (mria)"""