diff --git a/Makefile b/Makefile index ed10a09fd..8e8f4b493 100644 --- a/Makefile +++ b/Makefile @@ -85,7 +85,7 @@ $(REL_PROFILES:%=%-compile): $(REBAR) merge-config .PHONY: ct ct: $(REBAR) merge-config - ENABLE_COVER_COMPILE=1 $(REBAR) ct --name $(CT_NODE_NAME) -c -v --cover_export_name $(CT_COVER_EXPORT_PREFIX)-ct + @ENABLE_COVER_COMPILE=1 $(REBAR) ct --name $(CT_NODE_NAME) -c -v --cover_export_name $(CT_COVER_EXPORT_PREFIX)-ct ## only check bpapi for enterprise profile because it's a super-set. .PHONY: static_checks diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 47967cb1e..f647c660f 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -18,6 +18,7 @@ {emqx_dashboard,1}. {emqx_delayed,1}. {emqx_delayed,2}. +{emqx_ds,1}. {emqx_eviction_agent,1}. {emqx_eviction_agent,2}. {emqx_exhook,1}. diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index c99b8c947..abecb72a2 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -61,6 +61,13 @@ %% session table operations -export([create_tables/0]). +%% Remove me later (satisfy checks for an unused BPAPI) +-export([ + do_open_iterator/3, + do_ensure_iterator_closed/1, + do_ensure_all_iterators_closed/1 +]). + -ifdef(TEST). -export([session_open/1]). -endif. @@ -268,13 +275,17 @@ get_subscription(TopicFilter, #{iterators := Iters}) -> {ok, emqx_types:publish_result(), replies(), session()} | {error, emqx_types:reason_code()}. publish(_PacketId, Msg, Session) -> - ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg]), - {ok, persisted, [], Session}. + %% TODO: + Result = emqx_broker:publish(Msg), + {ok, Result, [], Session}. %%-------------------------------------------------------------------- %% Client -> Broker: PUBACK %%-------------------------------------------------------------------- +%% FIXME: parts of the commit offset function are mocked +-dialyzer({nowarn_function, puback/3}). + -spec puback(clientinfo(), emqx_types:packet_id(), session()) -> {ok, emqx_types:message(), replies(), session()} | {error, emqx_types:reason_code()}. @@ -323,17 +334,16 @@ pubcomp(_ClientInfo, _PacketId, _Session = #{}) -> %%-------------------------------------------------------------------- -spec deliver(clientinfo(), [emqx_types:deliver()], session()) -> - {ok, emqx_types:message(), replies(), session()}. + {ok, replies(), session()}. deliver(_ClientInfo, _Delivers, Session) -> - %% This may be triggered for the system messages. FIXME. + %% TODO: QoS0 and system messages end up here. {ok, [], Session}. --spec handle_timeout(clientinfo(), emqx_session:common_timer_name(), session()) -> +-spec handle_timeout(clientinfo(), _Timeout, session()) -> {ok, replies(), session()} | {ok, replies(), timeout(), session()}. handle_timeout(_ClientInfo, pull, Session = #{id := Id, inflight := Inflight0}) -> WindowSize = 100, {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(Id, Inflight0, WindowSize), - %%logger:warning("Inflight: ~p", [Inflight]), ensure_timer(pull), {ok, Publishes, Session#{inflight => Inflight}}; handle_timeout(_ClientInfo, get_streams, Session = #{id := Id}) -> @@ -601,6 +611,26 @@ new_subscription_id(DSSessionId, TopicFilter) -> DSSubId = {DSSessionId, TopicFilter}, {DSSubId, NowMS}. +%%-------------------------------------------------------------------- +%% RPC targets (v1) +%%-------------------------------------------------------------------- + +%% RPC target. +-spec do_open_iterator(emqx_types:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> + {ok, emqx_ds_storage_layer:iterator()} | {error, _Reason}. +do_open_iterator(_TopicFilter, _StartMS, _IteratorID) -> + {error, not_implemented}. + +%% RPC target. +-spec do_ensure_iterator_closed(emqx_ds:iterator_id()) -> ok. +do_ensure_iterator_closed(_IteratorID) -> + ok. + +%% RPC target. +-spec do_ensure_all_iterators_closed(id()) -> ok. +do_ensure_all_iterators_closed(_DSSessionID) -> + ok. + %%-------------------------------------------------------------------- %% Reading batches %%-------------------------------------------------------------------- @@ -677,5 +707,5 @@ export_record(_, _, [], Acc) -> -spec ensure_timer(pull | get_streams) -> ok. ensure_timer(Type) -> - emqx_utils:start_timer(100, {emqx_session, Type}), + _ = emqx_utils:start_timer(100, {emqx_session, Type}), ok. diff --git a/apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl b/apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl index d35ccd963..e879b495c 100644 --- a/apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl @@ -20,6 +20,7 @@ -export([ introduced_in/0, + deprecated_since/0, open_iterator/4, close_iterator/2, @@ -31,9 +32,11 @@ -define(TIMEOUT, 30_000). introduced_in() -> - %% FIXME "5.3.0". +deprecated_since() -> + "5.4.0". + -spec open_iterator( [node()], emqx_types:words(), diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index c8199239f..1e7f88367 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -30,9 +30,6 @@ %% Message replay API: -export([get_streams/3, make_iterator/3, next/2]). -%% Iterator storage API: --export([save_iterator/3, get_iterator/2]). - %% Misc. API: -export([]). @@ -101,8 +98,6 @@ -type message_id() :: emqx_ds_replication_layer:message_id(). --type iterator_id() :: term(). - -type get_iterator_result(Iterator) :: {ok, Iterator} | undefined. %%================================================================================ @@ -182,14 +177,6 @@ make_iterator(Stream, TopicFilter, StartTime) -> next(Iter, BatchSize) -> emqx_ds_replication_layer:next(Iter, BatchSize). --spec save_iterator(db(), iterator_id(), iterator()) -> ok. -save_iterator(DB, ITRef, Iterator) -> - emqx_ds_replication_layer:save_iterator(DB, ITRef, Iterator). - --spec get_iterator(db(), iterator_id()) -> get_iterator_result(iterator()). -get_iterator(DB, ITRef) -> - emqx_ds_replication_layer:get_iterator(DB, ITRef). - %%================================================================================ %% Internal exports %%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 34bb66031..d61dfa906 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -25,9 +25,7 @@ store_batch/3, get_streams/3, make_iterator/3, - next/2, - save_iterator/3, - get_iterator/2 + next/2 ]). %% internal exports: @@ -169,14 +167,6 @@ next(Iter0, BatchSize) -> Other end. --spec save_iterator(db(), emqx_ds:iterator_id(), iterator()) -> ok. -save_iterator(_DB, _ITRef, _Iterator) -> - error(todo). - --spec get_iterator(db(), emqx_ds:iterator_id()) -> emqx_ds:get_iterator_result(iterator()). -get_iterator(_DB, _ITRef) -> - error(todo). - %%================================================================================ %% behavior callbacks %%================================================================================ @@ -198,12 +188,15 @@ do_drop_shard_v1(Shard) -> do_get_streams_v1(Shard, TopicFilter, StartTime) -> emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime). --spec do_make_iterator_v1(shard_id(), _Stream, emqx_ds:topic_filter(), emqx_ds:time()) -> +-spec do_make_iterator_v1( + shard_id(), emqx_ds_storage_layer:stream(), emqx_ds:topic_filter(), emqx_ds:time() +) -> {ok, iterator()} | {error, _}. do_make_iterator_v1(Shard, Stream, TopicFilter, StartTime) -> emqx_ds_storage_layer:make_iterator(Shard, Stream, TopicFilter, StartTime). --spec do_next_v1(shard_id(), Iter, pos_integer()) -> emqx_ds:next_result(Iter). +-spec do_next_v1(shard_id(), emqx_ds_storage_layer:iterator(), pos_integer()) -> + emqx_ds:next_result(emqx_ds_storage_layer:iterator()). do_next_v1(Shard, Iter, BatchSize) -> emqx_ds_storage_layer:next(Shard, Iter, BatchSize). diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index c79f94377..c974b253f 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -28,25 +28,29 @@ %% API funcions %%================================================================================ --spec open_shard(node(), emqx_ds_replication_layer:shard(), emqx_ds:create_db_opts()) -> +-spec open_shard(node(), emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts()) -> ok. open_shard(Node, Shard, Opts) -> erpc:call(Node, emqx_ds_replication_layer, do_open_shard_v1, [Shard, Opts]). --spec drop_shard(node(), emqx_ds_replication_layer:shard()) -> +-spec drop_shard(node(), emqx_ds_replication_layer:shard_id()) -> ok. drop_shard(Node, Shard) -> erpc:call(Node, emqx_ds_replication_layer, do_drop_shard_v1, [Shard]). -spec get_streams( - node(), emqx_ds_replication_layer:shard(), emqx_ds:topic_filter(), emqx_ds:time() + node(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time() ) -> [{integer(), emqx_ds_replication_layer:stream()}]. get_streams(Node, Shard, TopicFilter, Time) -> erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [Shard, TopicFilter, Time]). -spec make_iterator( - node(), emqx_ds_replication_layer:shard(), _Stream, emqx_ds:topic_filter(), emqx_ds:time() + node(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:stream(), + emqx_ds:topic_filter(), + emqx_ds:time() ) -> {ok, emqx_ds_replication_layer:iterator()} | {error, _}. make_iterator(Node, Shard, Stream, TopicFilter, StartTime) -> @@ -55,9 +59,9 @@ make_iterator(Node, Shard, Stream, TopicFilter, StartTime) -> ]). -spec next( - node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:iterator(), pos_integer() + node(), emqx_ds_replication_layer:shard_id(), emqx_ds_storage_layer:iterator(), pos_integer() ) -> - {ok, emqx_ds_replication_layer:iterator(), [emqx_types:messages()]} + {ok, emqx_ds_storage_layer:iterator(), [emqx_types:messages()]} | {ok, end_of_stream} | {error, _}. next(Node, Shard, Iter, BatchSize) -> diff --git a/topic_match_test.png b/topic_match_test.png deleted file mode 100644 index 6ff1a8911..000000000 Binary files a/topic_match_test.png and /dev/null differ