diff --git a/Makefile b/Makefile index 254a4b0f9..ed10a09fd 100644 --- a/Makefile +++ b/Makefile @@ -85,7 +85,7 @@ $(REL_PROFILES:%=%-compile): $(REBAR) merge-config .PHONY: ct ct: $(REBAR) merge-config - @$(REBAR) ct --name $(CT_NODE_NAME) -c -v --cover_export_name $(CT_COVER_EXPORT_PREFIX)-ct + ENABLE_COVER_COMPILE=1 $(REBAR) ct --name $(CT_NODE_NAME) -c -v --cover_export_name $(CT_COVER_EXPORT_PREFIX)-ct ## only check bpapi for enterprise profile because it's a super-set. .PHONY: static_checks diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl new file mode 100644 index 000000000..ce57eaa80 --- /dev/null +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -0,0 +1,207 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc This module implements the routines for replaying streams of +%% messages. +-module(emqx_persistent_message_ds_replayer). + +%% API: +-export([new/0, next_packet_id/1, replay/2, commit_offset/3, poll/3]). + +%% internal exports: +-export([]). + +-export_type([inflight/0]). + +-include("emqx_persistent_session_ds.hrl"). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +%% Note: sequence numbers are monotonic; they don't wrap around: +-type seqno() :: non_neg_integer(). + +-record(range, { + stream :: emqx_ds:stream(), + first :: seqno(), + last :: seqno(), + iterator_next :: emqx_ds:iterator() | undefined +}). + +-type range() :: #range{}. + +-record(inflight, { + next_seqno = 0 :: seqno(), + acked_seqno = 0 :: seqno(), + offset_ranges = [] :: [range()] +}). + +-opaque inflight() :: #inflight{}. + +%%================================================================================ +%% API funcions +%%================================================================================ + +-spec new() -> inflight(). +new() -> + #inflight{}. + +-spec next_packet_id(inflight()) -> {emqx_types:packet_id(), inflight()}. +next_packet_id(Inflight0 = #inflight{next_seqno = LastSeqno}) -> + Inflight = Inflight0#inflight{next_seqno = LastSeqno + 1}, + {seqno_to_packet_id(LastSeqno), Inflight}. + +-spec replay(emqx_persistent_session_ds:id(), inflight()) -> + emqx_session:replies(). +replay(_SessionId, _Inflight = #inflight{offset_ranges = _Ranges}) -> + []. + +-spec commit_offset(emqx_persistent_session_ds:id(), emqx_types:packet_id(), inflight()) -> + {_IsValidOffset :: boolean(), inflight()}. +commit_offset(SessionId, PacketId, Inflight0 = #inflight{acked_seqno = AckedSeqno0, next_seqno = NextSeqNo, offset_ranges = Ranges0}) -> + AckedSeqno = packet_id_to_seqno(NextSeqNo, PacketId), + true = AckedSeqno0 < AckedSeqno, + Ranges = lists:filter( + fun(#range{stream = Stream, last = LastSeqno, iterator_next = ItNext}) -> + case LastSeqno =< AckedSeqno of + true -> + %% This range has been fully + %% acked. Remove it and replace saved + %% iterator with the trailing iterator. + update_iterator(SessionId, Stream, ItNext), + false; + false -> + %% This range still has unacked + %% messages: + true + end + end, + Ranges0 + ), + Inflight = Inflight0#inflight{acked_seqno = AckedSeqno, offset_ranges = Ranges}, + {true, Inflight}. + +-spec poll(emqx_persistent_session_ds:id(), inflight(), pos_integer()) -> + {emqx_session:replies(), inflight()}. +poll(SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSize < 16#7fff -> + #inflight{next_seqno = NextSeqNo0, acked_seqno = AckedSeqno} = + Inflight0, + FetchThreshold = max(1, WindowSize div 2), + FreeSpace = AckedSeqno + WindowSize - NextSeqNo0, + case FreeSpace >= FetchThreshold of + false -> + %% TODO: this branch is meant to avoid fetching data from + %% the DB in chunks that are too small. However, this + %% logic is not exactly good for the latency. Can the + %% client get stuck even? + {[], Inflight0}; + true -> + Streams = shuffle(get_streams(SessionId)), + fetch(SessionId, Inflight0, Streams, FreeSpace, []) + end. + +%%================================================================================ +%% Internal exports +%%================================================================================ + +%%================================================================================ +%% Internal functions +%%================================================================================ + +fetch(_SessionId, Inflight, _Streams = [], _N, Acc) -> + {lists:reverse(Acc), Inflight}; +fetch(_SessionId, Inflight, _Streams, 0, Acc) -> + {lists:reverse(Acc), Inflight}; +fetch(SessionId, Inflight0, [#ds_stream{stream = Stream} | Streams], N, Publishes0) -> + #inflight{next_seqno = FirstSeqNo, offset_ranges = Ranges0} = Inflight0, + ItBegin = get_last_iterator(SessionId, Stream, Ranges0), + {ok, ItEnd, Messages} = emqx_ds:next(ItBegin, N), + {Publishes, Inflight1} = + lists:foldl( + fun(Msg, {PubAcc0, InflightAcc0}) -> + {PacketId, InflightAcc} = next_packet_id(InflightAcc0), + PubAcc = [{PacketId, Msg} | PubAcc0], + {PubAcc, InflightAcc} + end, + {Publishes0, Inflight0}, + Messages + ), + #inflight{next_seqno = LastSeqNo} = Inflight1, + NMessages = LastSeqNo - FirstSeqNo, + case NMessages > 0 of + true -> + Range = #range{ + first = FirstSeqNo, + last = LastSeqNo - 1, + stream = Stream, + iterator_next = ItEnd + }, + Inflight = Inflight1#inflight{offset_ranges = Ranges0 ++ [Range]}, + fetch(SessionId, Inflight, Streams, N - NMessages, Publishes); + false -> + fetch(SessionId, Inflight1, Streams, N, Publishes) + end. + +update_iterator(SessionId, Stream, Iterator) -> + mria:dirty_write(?SESSION_ITER_TAB, #ds_iter{id = {SessionId, Stream}, iter = Iterator}). + +get_last_iterator(SessionId, Stream, Ranges) -> + case lists:keyfind(Stream, #range.stream, lists:reverse(Ranges)) of + false -> + get_iterator(SessionId, Stream); + #range{iterator_next = Next} -> + Next + end. + +get_iterator(SessionId, Stream) -> + Id = {SessionId, Stream}, + [#ds_iter{iter = It}] = mnesia:dirty_read(?SESSION_ITER_TAB, Id), + It. + +get_streams(SessionId) -> + mnesia:dirty_read(?SESSION_STREAM_TAB, SessionId). + +%% Packet ID as defined by MQTT protocol is a 16-bit integer in range +%% 1..FFFF. This function translates internal session sequence number +%% to MQTT packet ID by chopping off most significant bits and adding +%% 1. This assumes that there's never more FFFF in-flight packets at +%% any time: +-spec seqno_to_packet_id(non_neg_integer()) -> emqx_types:packet_id(). +seqno_to_packet_id(Counter) -> + Counter rem 16#ffff + 1. + +%% Reconstruct session counter by adding most significant bits from +%% the current counter to the packet id. +-spec packet_id_to_seqno(non_neg_integer(), emqx_types:packet_id()) -> non_neg_integer(). +packet_id_to_seqno(NextSeqNo, PacketId) -> + N = ((NextSeqNo bsr 16) bsl 16) + PacketId, + case N > NextSeqNo of + true -> N - 16#10000; + false -> N + end. + +-spec shuffle([A]) -> [A]. +shuffle(L0) -> + L1 = lists:map( + fun(A) -> + {rand:uniform(), A} + end, + L0 + ), + L2 = lists:sort(L1), + {_, L} = lists:unzip(L2), + L. diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 9bc9e0b91..b8afc771f 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -18,9 +18,12 @@ -include("emqx.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). -include("emqx_mqtt.hrl"). +-include("emqx_persistent_session_ds.hrl"). + %% Session API -export([ create/3, @@ -50,7 +53,7 @@ -export([ deliver/3, replay/3, - % handle_timeout/3, + handle_timeout/3, disconnect/1, terminate/2 ]). @@ -81,10 +84,14 @@ expires_at := timestamp() | never, %% Client’s Subscriptions. iterators := #{topic() => subscription()}, + %% Inflight messages + inflight := emqx_persistent_message_ds_replayer:inflight(), %% props := map() }. +%% -type session() :: #session{}. + -type timestamp() :: emqx_utils_calendar:epoch_millisecond(). -type topic() :: emqx_types:topic(). -type clientinfo() :: emqx_types:clientinfo(). @@ -113,6 +120,8 @@ open(#{clientid := ClientID}, _ConnInfo) -> %% somehow isolate those idling not-yet-expired sessions into a separate process %% space, and move this call back into `emqx_cm` where it belongs. ok = emqx_cm:discard_session(ClientID), + ensure_timer(pull), + ensure_timer(get_streams), case open_session(ClientID) of Session = #{} -> {true, Session, []}; @@ -259,8 +268,8 @@ get_subscription(TopicFilter, #{iterators := Iters}) -> {ok, emqx_types:publish_result(), replies(), session()} | {error, emqx_types:reason_code()}. publish(_PacketId, Msg, Session) -> - % TODO: stub - {ok, emqx_broker:publish(Msg), [], Session}. + ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg]), + {ok, persisted, [], Session}. %%-------------------------------------------------------------------- %% Client -> Broker: PUBACK @@ -269,9 +278,14 @@ publish(_PacketId, Msg, Session) -> -spec puback(clientinfo(), emqx_types:packet_id(), session()) -> {ok, emqx_types:message(), replies(), session()} | {error, emqx_types:reason_code()}. -puback(_ClientInfo, _PacketId, _Session = #{}) -> - % TODO: stub - {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}. +puback(_ClientInfo, PacketId, Session = #{id := Id, inflight := Inflight0}) -> + case emqx_persistent_message_ds_replayer:commit_offset(Id, PacketId, Inflight0) of + {true, Inflight} -> + Msg = #message{}, %% TODO + {ok, Msg, [], Session#{inflight => Inflight}}; + {false, _} -> + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} + end. %%-------------------------------------------------------------------- %% Client -> Broker: PUBREC @@ -308,10 +322,23 @@ pubcomp(_ClientInfo, _PacketId, _Session = #{}) -> %%-------------------------------------------------------------------- -spec deliver(clientinfo(), [emqx_types:deliver()], session()) -> - no_return(). -deliver(_ClientInfo, _Delivers, _Session = #{}) -> - % TODO: ensure it's unreachable somehow - error(unexpected). + {ok, emqx_types:message(), replies(), session()}. +deliver(_ClientInfo, _Delivers, Session) -> + %% This may be triggered for the system messages. FIXME. + {ok, [], Session}. + +-spec handle_timeout(clientinfo(), emqx_session:common_timer_name(), session()) -> + {ok, replies(), session()} | {ok, replies(), timeout(), session()}. +handle_timeout(_ClientInfo, pull, Session = #{id := Id, inflight := Inflight0}) -> + WindowSize = 100, + {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(Id, Inflight0, WindowSize), + %%logger:warning("Inflight: ~p", [Inflight]), + ensure_timer(pull), + {ok, Publishes, Session#{inflight => Inflight}}; +handle_timeout(_ClientInfo, get_streams, Session = #{id := Id}) -> + renew_streams(Id), + ensure_timer(get_streams), + {ok, [], Session}. -spec replay(clientinfo(), [], session()) -> {ok, replies(), session()}. @@ -390,29 +417,11 @@ del_subscription(TopicFilterBin, DSSessionId) -> %% Session tables operations %%-------------------------------------------------------------------- --define(SESSION_TAB, emqx_ds_session). --define(SESSION_SUBSCRIPTIONS_TAB, emqx_ds_session_subscriptions). --define(DS_MRIA_SHARD, emqx_ds_session_shard). - --record(session, { - %% same as clientid - id :: id(), - %% creation time - created_at :: _Millisecond :: non_neg_integer(), - expires_at = never :: _Millisecond :: non_neg_integer() | never, - %% for future usage - props = #{} :: map() -}). - --record(ds_sub, { - id :: subscription_id(), - start_time :: emqx_ds:time(), - props = #{} :: map(), - extra = #{} :: map() -}). --type ds_sub() :: #ds_sub{}. - create_tables() -> + ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{ + backend => builtin, + storage => {emqx_ds_storage_bitfield_lts, #{}} + }), ok = mria:create_table( ?SESSION_TAB, [ @@ -433,7 +442,29 @@ create_tables() -> {attributes, record_info(fields, ds_sub)} ] ), - ok = mria:wait_for_tables([?SESSION_TAB, ?SESSION_SUBSCRIPTIONS_TAB]), + ok = mria:create_table( + ?SESSION_STREAM_TAB, + [ + {rlog_shard, ?DS_MRIA_SHARD}, + {type, bag}, + {storage, storage()}, + {record_name, ds_stream}, + {attributes, record_info(fields, ds_stream)} + ] + ), + ok = mria:create_table( + ?SESSION_ITER_TAB, + [ + {rlog_shard, ?DS_MRIA_SHARD}, + {type, set}, + {storage, storage()}, + {record_name, ds_iter}, + {attributes, record_info(fields, ds_iter)} + ] + ), + ok = mria:wait_for_tables([ + ?SESSION_TAB, ?SESSION_SUBSCRIPTIONS_TAB, ?SESSION_STREAM_TAB, ?SESSION_ITER_TAB + ]), ok. -dialyzer({nowarn_function, storage/0}). @@ -482,7 +513,8 @@ session_create(SessionId, Props) -> id = SessionId, created_at = erlang:system_time(millisecond), expires_at = never, - props = Props + props = Props, + inflight = emqx_persistent_message_ds_replayer:new() }, ok = mnesia:write(?SESSION_TAB, Session, write), Session. @@ -555,12 +587,12 @@ session_del_subscription(#ds_sub{id = DSSubId}) -> mnesia:delete(?SESSION_SUBSCRIPTIONS_TAB, DSSubId, write). session_read_subscriptions(DSSessionId) -> - % NOTE: somewhat convoluted way to trick dialyzer - Pat = erlang:make_tuple(record_info(size, ds_sub), '_', [ - {1, ds_sub}, - {#ds_sub.id, {DSSessionId, '_'}} - ]), - mnesia:match_object(?SESSION_SUBSCRIPTIONS_TAB, Pat, read). + MS = ets:fun2ms( + fun(Sub = #ds_sub{id = {Sess, _}}) when Sess =:= DSSessionId -> + Sub + end + ), + mnesia:select(?SESSION_SUBSCRIPTIONS_TAB, MS, read). -spec new_subscription_id(id(), topic_filter()) -> {subscription_id(), emqx_ds:time()}. new_subscription_id(DSSessionId, TopicFilter) -> @@ -568,12 +600,58 @@ new_subscription_id(DSSessionId, TopicFilter) -> DSSubId = {DSSessionId, TopicFilter}, {DSSubId, NowMS}. +%%-------------------------------------------------------------------- +%% Reading batches +%%-------------------------------------------------------------------- + +renew_streams(Id) -> + Subscriptions = ro_transaction(fun() -> session_read_subscriptions(Id) end), + ExistingStreams = ro_transaction(fun() -> mnesia:read(?SESSION_STREAM_TAB, Id) end), + lists:foreach( + fun(#ds_sub{id = {_, TopicFilter}, start_time = StartTime}) -> + renew_streams(Id, ExistingStreams, TopicFilter, StartTime) + end, + Subscriptions + ). + +renew_streams(Id, ExistingStreams, TopicFilter, StartTime) -> + AllStreams = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime), + transaction( + fun() -> + lists:foreach( + fun({Rank, Stream}) -> + Rec = #ds_stream{ + session = Id, + topic_filter = TopicFilter, + stream = Stream, + rank = Rank + }, + case lists:member(Rec, ExistingStreams) of + true -> + ok; + false -> + mnesia:write(?SESSION_STREAM_TAB, Rec, write), + % StartTime), + {ok, Iterator} = emqx_ds:make_iterator(Stream, TopicFilter, 0), + IterRec = #ds_iter{id = {Id, Stream}, iter = Iterator}, + mnesia:write(?SESSION_ITER_TAB, IterRec, write) + end + end, + AllStreams + ) + end + ). + %%-------------------------------------------------------------------------------- transaction(Fun) -> {atomic, Res} = mria:transaction(?DS_MRIA_SHARD, Fun), Res. +ro_transaction(Fun) -> + {atomic, Res} = mria:ro_transaction(?DS_MRIA_SHARD, Fun), + Res. + %%-------------------------------------------------------------------------------- export_subscriptions(DSSubs) -> @@ -586,7 +664,7 @@ export_subscriptions(DSSubs) -> ). export_session(#session{} = Record) -> - export_record(Record, #session.id, [id, created_at, expires_at, props], #{}). + export_record(Record, #session.id, [id, created_at, expires_at, inflight, props], #{}). export_subscription(#ds_sub{} = Record) -> export_record(Record, #ds_sub.start_time, [start_time, props, extra], #{}). @@ -595,3 +673,8 @@ export_record(Record, I, [Field | Rest], Acc) -> export_record(Record, I + 1, Rest, Acc#{Field => element(I, Record)}); export_record(_, _, [], Acc) -> Acc. + +-spec ensure_timer(pull | get_streams) -> ok. +ensure_timer(Type) -> + emqx_utils:start_timer(100, {emqx_session, Type}), + ok. diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl new file mode 100644 index 000000000..54b077795 --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -0,0 +1,56 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-ifndef(EMQX_PERSISTENT_SESSION_DS_HRL_HRL). +-define(EMQX_PERSISTENT_SESSION_DS_HRL_HRL, true). + +-define(SESSION_TAB, emqx_ds_session). +-define(SESSION_SUBSCRIPTIONS_TAB, emqx_ds_session_subscriptions). +-define(SESSION_STREAM_TAB, emqx_ds_stream_tab). +-define(SESSION_ITER_TAB, emqx_ds_iter_tab). +-define(DS_MRIA_SHARD, emqx_ds_session_shard). + +-record(ds_sub, { + id :: emqx_persistent_session_ds:subscription_id(), + start_time :: emqx_ds:time(), + props = #{} :: map(), + extra = #{} :: map() +}). +-type ds_sub() :: #ds_sub{}. + +-record(ds_stream, { + session :: emqx_persistent_session_ds:id(), + topic_filter :: emqx_ds:topic_filter(), + stream :: emqx_ds:stream(), + rank :: emqx_ds:stream_rank() +}). + +-record(ds_iter, { + id :: {emqx_persistent_session_ds:id(), emqx_ds:stream()}, + iter :: emqx_ds:iterator() +}). + +-record(session, { + %% same as clientid + id :: emqx_persistent_session_ds:id(), + %% creation time + created_at :: _Millisecond :: non_neg_integer(), + expires_at = never :: _Millisecond :: non_neg_integer() | never, + inflight :: emqx_persistent_message_ds_replayer:inflight(), + %% for future usage + props = #{} :: map() +}). + +-endif. diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 32e59a114..db025a457 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -103,8 +103,8 @@ t_messages_persisted(_Config) -> ct:pal("Persisted = ~p", [Persisted]), ?assertEqual( - [M1, M2, M5, M7, M9, M10], - [{emqx_message:topic(M), emqx_message:payload(M)} || M <- Persisted] + lists:sort([M1, M2, M5, M7, M9, M10]), + lists:sort([{emqx_message:topic(M), emqx_message:payload(M)} || M <- Persisted]) ), ok. @@ -146,11 +146,11 @@ t_messages_persisted_2(_Config) -> ct:pal("Persisted = ~p", [Persisted]), ?assertEqual( - [ + lists:sort([ {T(<<"client/1/topic">>), <<"4">>}, {T(<<"client/2/topic">>), <<"5">>} - ], - [{emqx_message:topic(M), emqx_message:payload(M)} || M <- Persisted] + ]), + lists:sort([{emqx_message:topic(M), emqx_message:payload(M)} || M <- Persisted]) ), ok. @@ -252,9 +252,13 @@ connect(Opts0 = #{}) -> Client. consume(TopicFiler, StartMS) -> - [{_, Stream}] = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFiler, StartMS), - {ok, It} = emqx_ds:make_iterator(Stream, StartMS), - consume(It). + lists:flatmap( + fun({_Rank, Stream}) -> + {ok, It} = emqx_ds:make_iterator(Stream, StartMS, 0), + consume(It) + end, + emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFiler, StartMS) + ). consume(It) -> case emqx_ds:next(It, 100) of diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index be3bf6e6a..008305671 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -510,6 +510,48 @@ t_process_dies_session_expires(Config) -> emqtt:disconnect(Client2). +t_publish_while_client_is_gone_qos1(Config) -> + %% A persistent session should receive messages in its + %% subscription even if the process owning the session dies. + ConnFun = ?config(conn_fun, Config), + Topic = ?config(topic, Config), + STopic = ?config(stopic, Config), + Payload1 = <<"hello1">>, + Payload2 = <<"hello2">>, + ClientId = ?config(client_id, Config), + {ok, Client1} = emqtt:start_link([ + {proto_ver, v5}, + {clientid, ClientId}, + {properties, #{'Session-Expiry-Interval' => 30}}, + {clean_start, true} + | Config + ]), + {ok, _} = emqtt:ConnFun(Client1), + {ok, _, [1]} = emqtt:subscribe(Client1, STopic, qos1), + + ok = emqtt:disconnect(Client1), + maybe_kill_connection_process(ClientId, Config), + + ok = publish(Topic, [Payload1, Payload2]), + + {ok, Client2} = emqtt:start_link([ + {proto_ver, v5}, + {clientid, ClientId}, + {properties, #{'Session-Expiry-Interval' => 30}}, + {clean_start, false} + | Config + ]), + {ok, _} = emqtt:ConnFun(Client2), + Msgs = receive_messages(2), + ?assertMatch([_, _], Msgs), + [Msg2, Msg1] = Msgs, + ?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)), + ?assertEqual({ok, 1}, maps:find(qos, Msg1)), + ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)), + ?assertEqual({ok, 1}, maps:find(qos, Msg2)), + + ok = emqtt:disconnect(Client2). + t_publish_while_client_is_gone(init, Config) -> skip_ds_tc(Config); t_publish_while_client_is_gone('end', _Config) -> ok. t_publish_while_client_is_gone(Config) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 941573bf8..c8199239f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -30,6 +30,9 @@ %% Message replay API: -export([get_streams/3, make_iterator/3, next/2]). +%% Iterator storage API: +-export([save_iterator/3, get_iterator/2]). + %% Misc. API: -export([]). @@ -46,7 +49,8 @@ message_id/0, next_result/1, next_result/0, store_batch_result/0, - make_iterator_result/1, make_iterator_result/0 + make_iterator_result/1, make_iterator_result/0, + get_iterator_result/1 ]). %%================================================================================ @@ -97,6 +101,10 @@ -type message_id() :: emqx_ds_replication_layer:message_id(). +-type iterator_id() :: term(). + +-type get_iterator_result(Iterator) :: {ok, Iterator} | undefined. + %%================================================================================ %% API funcions %%================================================================================ @@ -174,6 +182,14 @@ make_iterator(Stream, TopicFilter, StartTime) -> next(Iter, BatchSize) -> emqx_ds_replication_layer:next(Iter, BatchSize). +-spec save_iterator(db(), iterator_id(), iterator()) -> ok. +save_iterator(DB, ITRef, Iterator) -> + emqx_ds_replication_layer:save_iterator(DB, ITRef, Iterator). + +-spec get_iterator(db(), iterator_id()) -> get_iterator_result(iterator()). +get_iterator(DB, ITRef) -> + emqx_ds_replication_layer:get_iterator(DB, ITRef). + %%================================================================================ %% Internal exports %%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_helper.erl b/apps/emqx_durable_storage/src/emqx_ds_helper.erl new file mode 100644 index 000000000..5b55831d1 --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_helper.erl @@ -0,0 +1,73 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_ds_helper). + +%% API: +-export([create_rr/1]). + +%% internal exports: +-export([]). + +-export_type([rr/0]). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-type item() :: {emqx_ds:stream_rank(), emqx_ds:stream()}. + +-type rr() :: #{ + queue := #{term() => [{integer(), emqx_ds:stream()}]}, + active_ring := {[item()], [item()]} +}. + +%%================================================================================ +%% API funcions +%%================================================================================ + +-spec create_rr([item()]) -> rr(). +create_rr(Streams) -> + RR0 = #{latest_rank => #{}, active_ring => {[], []}}, + add_streams(RR0, Streams). + +-spec add_streams(rr(), [item()]) -> rr(). +add_streams(#{queue := Q0, active_ring := R0}, Streams) -> + Q1 = lists:foldl( + fun({{RankX, RankY}, Stream}, Acc) -> + maps:update_with(RankX, fun(L) -> [{RankY, Stream} | L] end, Acc) + end, + Q0, + Streams + ), + Q2 = maps:map( + fun(_RankX, Streams1) -> + lists:usort(Streams1) + end, + Q1 + ), + #{queue => Q2, active_ring => R0}. + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +%%================================================================================ +%% Internal exports +%%================================================================================ + +%%================================================================================ +%% Internal functions +%%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 06cead725..9b1ff5c7c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -22,7 +22,9 @@ store_batch/3, get_streams/3, make_iterator/3, - next/2 + next/2, + save_iterator/3, + get_iterator/2 ]). %% internal exports: @@ -42,7 +44,7 @@ -type db() :: emqx_ds:db(). --type shard_id() :: {emqx_ds:db(), atom()}. +-type shard_id() :: {db(), atom()}. %% This record enapsulates the stream entity from the replication %% level. @@ -71,7 +73,7 @@ %% API functions %%================================================================================ --spec list_shards(emqx_ds:db()) -> [shard_id()]. +-spec list_shards(db()) -> [shard_id()]. list_shards(DB) -> %% TODO: milestone 5 lists:map( @@ -81,7 +83,7 @@ list_shards(DB) -> list_nodes() ). --spec open_db(emqx_ds:db(), emqx_ds:create_db_opts()) -> ok | {error, _}. +-spec open_db(db(), emqx_ds:create_db_opts()) -> ok | {error, _}. open_db(DB, Opts) -> %% TODO: improve error reporting, don't just crash lists:foreach( @@ -92,7 +94,7 @@ open_db(DB, Opts) -> list_nodes() ). --spec drop_db(emqx_ds:db()) -> ok | {error, _}. +-spec drop_db(db()) -> ok | {error, _}. drop_db(DB) -> lists:foreach( fun(Node) -> @@ -102,7 +104,7 @@ drop_db(DB) -> list_nodes() ). --spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> +-spec store_batch(db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> emqx_ds:store_batch_result(). store_batch(DB, Msg, Opts) -> %% TODO: Currently we store messages locally. @@ -112,7 +114,7 @@ store_batch(DB, Msg, Opts) -> -spec get_streams(db(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{emqx_ds:stream_rank(), stream()}]. get_streams(DB, TopicFilter, StartTime) -> - Shards = emqx_ds_replication_layer:list_shards(DB), + Shards = list_shards(DB), lists:flatmap( fun(Shard) -> Node = node_of_shard(Shard), @@ -164,6 +166,14 @@ next(Iter0, BatchSize) -> Other end. +-spec save_iterator(db(), emqx_ds:iterator_id(), iterator()) -> ok. +save_iterator(_DB, _ITRef, _Iterator) -> + error(todo). + +-spec get_iterator(db(), emqx_ds:iterator_id()) -> emqx_ds:get_iterator_result(iterator()). +get_iterator(_DB, _ITRef) -> + error(todo). + %%================================================================================ %% behavior callbacks %%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index bce976559..8b2e3cc61 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -368,7 +368,7 @@ rocksdb_open(Shard, Options) -> -spec db_dir(shard_id()) -> file:filename(). db_dir({DB, ShardId}) -> - lists:flatten([atom_to_list(DB), $:, atom_to_list(ShardId)]). + filename:join("data", lists:flatten([atom_to_list(DB), $:, atom_to_list(ShardId)])). %%-------------------------------------------------------------------------------- %% Schema access diff --git a/tdd b/tdd new file mode 100755 index 000000000..197891df6 --- /dev/null +++ b/tdd @@ -0,0 +1,13 @@ +#!/bin/bash + +make fmt > /dev/null &>1 & + +./rebar3 ct --name ct@127.0.0.1 --readable=true --suite ./_build/test/lib/emqx/test/emqx_persistent_session_SUITE.beam --case t_publish_while_client_is_gone_qos1 --group tcp + +suites=$(cat <