diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index ac9d297de..e0d1685e8 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -23,7 +23,6 @@ -define(SHARED_SUB_SHARD, emqx_shared_sub_shard). -define(CM_SHARD, emqx_cm_shard). -define(ROUTE_SHARD, route_shard). --define(PERSISTENT_SESSION_SHARD, emqx_persistent_session_shard). %% Banner %%-------------------------------------------------------------------- @@ -92,7 +91,7 @@ -record(route, { topic :: binary(), - dest :: node() | {binary(), node()} | emqx_session:sessionID() + dest :: node() | {binary(), node()} | emqx_session:session_id() }). %%-------------------------------------------------------------------- diff --git a/apps/emqx/include/emqx_session.hrl b/apps/emqx/include/emqx_session.hrl new file mode 100644 index 000000000..fba4cf911 --- /dev/null +++ b/apps/emqx/include/emqx_session.hrl @@ -0,0 +1,57 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-ifndef(EMQX_SESSION_HRL). +-define(EMQX_SESSION_HRL, true). + +-record(session, { + %% Client's id + clientid :: emqx_types:clientid(), + id :: emqx_session:session_id(), + %% Is this session a persistent session i.e. was it started with Session-Expiry > 0 + is_persistent :: boolean(), + %% Client’s Subscriptions. + subscriptions :: map(), + %% Max subscriptions allowed + max_subscriptions :: non_neg_integer() | infinity, + %% Upgrade QoS? + upgrade_qos :: boolean(), + %% Client <- Broker: QoS1/2 messages sent to the client but + %% have not been unacked. + inflight :: emqx_inflight:inflight(), + %% All QoS1/2 messages published to when client is disconnected, + %% or QoS1/2 messages pending transmission to the Client. + %% + %% Optionally, QoS0 messages pending transmission to the Client. + mqueue :: emqx_mqueue:mqueue(), + %% Next packet id of the session + next_pkt_id = 1 :: emqx_types:packet_id(), + %% Retry interval for redelivering QoS1/2 messages (Unit: millisecond) + retry_interval :: timeout(), + %% Client -> Broker: QoS2 messages received from the client, but + %% have not been completely acknowledged + awaiting_rel :: map(), + %% Maximum number of awaiting QoS2 messages allowed + max_awaiting_rel :: non_neg_integer() | infinity, + %% Awaiting PUBREL Timeout (Unit: millisecond) + await_rel_timeout :: timeout(), + %% Created at + created_at :: pos_integer(), + %% Durable storage iterators for existing subscriptions + iterators = [] :: [emqx_ds_replay:replay_id()] +}). + +-endif. diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index e13f60654..b6a4c6e7a 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -15,6 +15,7 @@ {emqx_conf,3}. {emqx_dashboard,1}. {emqx_delayed,1}. +{emqx_ds,1}. {emqx_eviction_agent,1}. {emqx_exhook,1}. {emqx_ft_storage_exporter_fs,1}. diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index c680560fb..b98222959 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -21,6 +21,7 @@ -include("emqx.hrl"). -include("emqx_cm.hrl"). +-include("emqx_session.hrl"). -include("logger.hrl"). -include("types.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -301,7 +302,16 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> create_session(ClientInfo, ConnInfo) -> Options = get_session_confs(ClientInfo, ConnInfo), - Session = emqx_session:init(Options), + #{clientid := ClientID} = ClientInfo, + Session0 = emqx_session:init(Options), + IteratorIDs = + case emqx_persistent_session_ds:open_session(ClientID) of + {skipped, disabled} -> + []; + {_IsNew, _DSSessionID, Iterators0} -> + Iterators0 + end, + Session = Session0#session{iterators = IteratorIDs}, ok = emqx_metrics:inc('session.created'), ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]), Session. diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 27b4f0950..7e9db4707 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -16,15 +16,24 @@ -module(emqx_persistent_session_ds). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + -export([init/0]). --export([persist_message/1]). +-export([ + persist_message/1, + open_session/1, + add_subscription/2 +]). -export([ serialize_message/1, deserialize_message/1 ]). +%% RPC +-export([do_open_iterator/3]). + %% FIXME -define(DS_SHARD, <<"local">>). @@ -72,6 +81,44 @@ store_message(Msg) -> find_subscribers(_Msg) -> [node()]. +open_session(ClientID) -> + ?WHEN_ENABLED(emqx_ds:session_open(ClientID)). + +-spec add_subscription(emqx_types:topic(), emqx_ds:session_id()) -> + {ok, emqx_ds:iterator_id(), _IsNew :: boolean()} | {skipped, disabled}. +add_subscription(TopicFilterBin, DSSessionID) -> + ?WHEN_ENABLED( + begin + TopicFilter = emqx_topic:words(TopicFilterBin), + {ok, IteratorID, StartMS, IsNew} = emqx_ds:session_add_iterator( + DSSessionID, TopicFilter + ), + case IsNew of + true -> + ok = open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID); + false -> + ok + end, + {ok, IteratorID, IsNew} + end + ). + +-spec open_iterator_on_all_nodes(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok. +open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID) -> + Nodes = emqx:running_nodes(), + Results = emqx_ds_proto_v1:open_iterator(Nodes, TopicFilter, StartMS, IteratorID), + %% TODO: handle errors + true = lists:all(fun(Res) -> Res =:= {ok, ok} end, Results), + ok. + +-spec do_open_iterator(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok. +do_open_iterator(TopicFilter, StartMS, IteratorID) -> + Replay = {TopicFilter, StartMS}, + %% FIXME: choose DS shard based on ...? + {ok, It} = emqx_ds_storage_layer:make_iterator(?DS_SHARD, Replay), + ok = emqx_ds_storage_layer:preserve_iterator(It, IteratorID), + ok. + %% serialize_message(Msg) -> diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index d838e95d0..8f1933fc1 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -44,6 +44,7 @@ -module(emqx_session). -include("emqx.hrl"). +-include("emqx_session.hrl"). -include("emqx_mqtt.hrl"). -include("logger.hrl"). -include("types.hrl"). @@ -101,49 +102,13 @@ %% Export for CT -export([set_field/3]). --type sessionID() :: emqx_guid:guid(). +-type session_id() :: emqx_guid:guid(). -export_type([ session/0, - sessionID/0 + session_id/0 ]). --record(session, { - %% Client's id - clientid :: emqx_types:clientid(), - id :: sessionID(), - %% Is this session a persistent session i.e. was it started with Session-Expiry > 0 - is_persistent :: boolean(), - %% Client’s Subscriptions. - subscriptions :: map(), - %% Max subscriptions allowed - max_subscriptions :: non_neg_integer() | infinity, - %% Upgrade QoS? - upgrade_qos :: boolean(), - %% Client <- Broker: QoS1/2 messages sent to the client but - %% have not been unacked. - inflight :: emqx_inflight:inflight(), - %% All QoS1/2 messages published to when client is disconnected, - %% or QoS1/2 messages pending transmission to the Client. - %% - %% Optionally, QoS0 messages pending transmission to the Client. - mqueue :: emqx_mqueue:mqueue(), - %% Next packet id of the session - next_pkt_id = 1 :: emqx_types:packet_id(), - %% Retry interval for redelivering QoS1/2 messages (Unit: millisecond) - retry_interval :: timeout(), - %% Client -> Broker: QoS2 messages received from the client, but - %% have not been completely acknowledged - awaiting_rel :: map(), - %% Maximum number of awaiting QoS2 messages allowed - max_awaiting_rel :: non_neg_integer() | infinity, - %% Awaiting PUBREL Timeout (Unit: millisecond) - await_rel_timeout :: timeout(), - %% Created at - created_at :: pos_integer() - %% Message deliver latency stats -}). - -type inflight_data_phase() :: wait_ack | wait_comp. -record(inflight_data, { @@ -297,7 +262,9 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) -> info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> Timeout; info(created_at, #session{created_at = CreatedAt}) -> - CreatedAt. + CreatedAt; +info(iterators, #session{iterators = IteratorIds}) -> + IteratorIds. %% @doc Get stats of the session. -spec stats(session()) -> emqx_types:stats(). @@ -324,11 +291,13 @@ subscribe( case IsNew andalso is_subscriptions_full(Session) of false -> ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts), + Session1 = Session#session{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}, + Session2 = add_persistent_subscription(TopicFilter, ClientId, Session1), ok = emqx_hooks:run( 'session.subscribed', [ClientInfo, TopicFilter, SubOpts#{is_new => IsNew}] ), - {ok, Session#session{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}}; + {ok, Session2}; true -> {error, ?RC_QUOTA_EXCEEDED} end. @@ -341,6 +310,19 @@ is_subscriptions_full(#session{ }) -> maps:size(Subs) >= MaxLimit. +-spec add_persistent_subscription(emqx_types:topic(), emqx_types:clientid(), session()) -> + session(). +add_persistent_subscription(TopicFilterBin, ClientId, Session) -> + case emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId) of + {skipped, disabled} -> + Session; + {ok, IteratorID, _IsNew = true} -> + Iterators = Session#session.iterators, + Session#session{iterators = [IteratorID | Iterators]}; + {ok, _IteratorID, _IsNew = false} -> + Session + end. + %%-------------------------------------------------------------------- %% Client -> Broker: UNSUBSCRIBE %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_session_router.erl b/apps/emqx/src/emqx_session_router.erl index 1567f9e62..25484bdf0 100644 --- a/apps/emqx/src/emqx_session_router.erl +++ b/apps/emqx/src/emqx_session_router.erl @@ -21,6 +21,7 @@ -include("emqx.hrl"). -include("logger.hrl"). -include("types.hrl"). +-include("persistent_session/emqx_persistent_session.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). diff --git a/apps/emqx/src/persistent_session/emqx_persistent_session.erl b/apps/emqx/src/persistent_session/emqx_persistent_session.erl index 111154571..d85e13d67 100644 --- a/apps/emqx/src/persistent_session/emqx_persistent_session.erl +++ b/apps/emqx/src/persistent_session/emqx_persistent_session.erl @@ -115,10 +115,10 @@ storage_backend() -> %% Session message ADT API %%-------------------------------------------------------------------- --spec session_message_info('timestamp' | 'sessionID', sess_msg_key()) -> term(). +-spec session_message_info('timestamp' | 'session_id', sess_msg_key()) -> term(). session_message_info(timestamp, {_, <<>>, <>, ?ABANDONED}) -> TS; session_message_info(timestamp, {_, GUID, _, _}) -> emqx_guid:timestamp(GUID); -session_message_info(sessionID, {SessionID, _, _, _}) -> SessionID. +session_message_info(session_id, {SessionID, _, _, _}) -> SessionID. %%-------------------------------------------------------------------- %% DB API @@ -243,7 +243,7 @@ discard_opt(true, ClientID, Session) -> emqx_session_router:delete_routes(SessionID, Subscriptions), emqx_session:set_field(is_persistent, false, Session). --spec mark_resume_begin(emqx_session:sessionID()) -> emqx_guid:guid(). +-spec mark_resume_begin(emqx_session:session_id()) -> emqx_guid:guid(). mark_resume_begin(SessionID) -> MarkerID = emqx_guid:gen(), put_session_message({SessionID, MarkerID, <<>>, ?MARKER}), @@ -396,12 +396,12 @@ do_mark_as_delivered(SessionID, [{deliver, STopic, Msg} | Left]) -> do_mark_as_delivered(_SessionID, []) -> ok. --spec pending(emqx_session:sessionID()) -> +-spec pending(emqx_session:session_id()) -> [{emqx_types:message(), STopic :: binary()}]. pending(SessionID) -> pending_messages_in_db(SessionID, []). --spec pending(emqx_session:sessionID(), MarkerIDs :: [emqx_guid:guid()]) -> +-spec pending(emqx_session:session_id(), MarkerIDs :: [emqx_guid:guid()]) -> [{emqx_types:message(), STopic :: binary()}]. pending(SessionID, MarkerIds) -> %% TODO: Handle lost MarkerIDs @@ -460,8 +460,8 @@ read_pending_msgs([], Acc) -> lists:reverse(Acc). %% The keys are ordered by -%% {sessionID(), <<>>, bin_timestamp(), ?ABANDONED} For abandoned sessions (clean started or expired). -%% {sessionID(), emqx_guid:guid(), STopic :: binary(), ?DELIVERED | ?UNDELIVERED | ?MARKER} +%% {session_id(), <<>>, bin_timestamp(), ?ABANDONED} For abandoned sessions (clean started or expired). +%% {session_id(), emqx_guid:guid(), STopic :: binary(), ?DELIVERED | ?UNDELIVERED | ?MARKER} %% where %% <<>> < emqx_guid:guid() %% <<>> < bin_timestamp() @@ -491,7 +491,7 @@ pending_messages({SessionID, PrevMsgId, PrevSTopic, PrevTag} = PrevKey, Acc, Mar false -> pending_messages(Key, Acc, MarkerIds); true -> pending_messages(Key, [{PrevMsgId, PrevSTopic} | Acc], MarkerIds) end; - %% Next sessionID or '$end_of_table' + %% Next session_id or '$end_of_table' _What -> case PrevTag =:= ?UNDELIVERED of false -> {lists:reverse(Acc), MarkerIds}; diff --git a/apps/emqx/src/persistent_session/emqx_persistent_session.hrl b/apps/emqx/src/persistent_session/emqx_persistent_session.hrl index eb4224116..5476d8daf 100644 --- a/apps/emqx/src/persistent_session/emqx_persistent_session.hrl +++ b/apps/emqx/src/persistent_session/emqx_persistent_session.hrl @@ -14,6 +14,8 @@ %% limitations under the License. %%-------------------------------------------------------------------- +-define(PERSISTENT_SESSION_SHARD, emqx_persistent_session_shard). + -record(session_store, { client_id :: binary(), expiry_interval :: non_neg_integer(), diff --git a/apps/emqx/src/persistent_session/emqx_persistent_session_gc.erl b/apps/emqx/src/persistent_session/emqx_persistent_session_gc.erl index a4c4e5422..4aa59cdef 100644 --- a/apps/emqx/src/persistent_session/emqx_persistent_session_gc.erl +++ b/apps/emqx/src/persistent_session/emqx_persistent_session_gc.erl @@ -56,6 +56,7 @@ start_link() -> init([]) -> process_flag(trap_exit, true), + mria_rlog:ensure_shard(?PERSISTENT_SESSION_SHARD), {ok, start_message_gc_timer(start_session_gc_timer(#{}))}. handle_call(_Request, _From, State) -> diff --git a/apps/emqx/src/proto/emqx_ds_proto_v1.erl b/apps/emqx/src/proto/emqx_ds_proto_v1.erl new file mode 100644 index 000000000..2283b7e4e --- /dev/null +++ b/apps/emqx/src/proto/emqx_ds_proto_v1.erl @@ -0,0 +1,49 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ds_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + open_iterator/4 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +-define(TIMEOUT, 30_000). + +introduced_in() -> + %% FIXME + "5.3.0". + +-spec open_iterator( + [node()], + emqx_topic:words(), + emqx_ds:time(), + emqx_ds:iterator_id() +) -> + emqx_rpc:erpc_multicall(ok). +open_iterator(Nodes, TopicFilter, StartMS, IteratorID) -> + erpc:multicall( + Nodes, + emqx_persistent_session_ds, + do_open_iterator, + [TopicFilter, StartMS, IteratorID], + ?TIMEOUT + ). diff --git a/apps/emqx/test/emqx_cth_cluster.erl b/apps/emqx/test/emqx_cth_cluster.erl index 5e8bd4103..ddcb3234c 100644 --- a/apps/emqx/test/emqx_cth_cluster.erl +++ b/apps/emqx/test/emqx_cth_cluster.erl @@ -20,6 +20,7 @@ -export([stop/1]). -export([share_load_module/2]). +-export([node_name/1]). -define(APPS_CLUSTERING, [gen_rpc, mria, ekka]). @@ -83,7 +84,7 @@ when }. start(Nodes, ClusterOpts) -> NodeSpecs = mk_nodespecs(Nodes, ClusterOpts), - ct:pal("Starting cluster: ~p", [NodeSpecs]), + ct:pal("Starting cluster:\n ~p", [NodeSpecs]), % 1. Start bare nodes with only basic applications running _ = emqx_utils:pmap(fun start_node_init/1, NodeSpecs, ?TIMEOUT_NODE_START_MS), % 2. Start applications needed to enable clustering @@ -237,6 +238,8 @@ default_appspec(emqx_conf, Spec, _NodeSpecs) -> listeners => allocate_listener_ports([tcp, ssl, ws, wss], Spec) } }; +default_appspec(emqx, Spec = #{listeners := true}, _NodeSpecs) -> + #{config => #{listeners => allocate_listener_ports([tcp, ssl, ws, wss], Spec)}}; default_appspec(_App, _, _) -> #{}. diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index b818e3fec..62702b3bc 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -17,6 +17,8 @@ -module(emqx_persistent_messages_SUITE). -include_lib("stdlib/include/assert.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -compile(export_all). -compile(nowarn_export_all). @@ -24,25 +26,38 @@ -define(NOW, (calendar:system_time_to_rfc3339(erlang:system_time(millisecond), [{unit, millisecond}])) ). +-define(DS_SHARD, <<"local">>). all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - {ok, _} = application:ensure_all_started(emqx_durable_storage), - ok = emqx_common_test_helpers:start_apps([], fun - (emqx) -> - emqx_common_test_helpers:boot_modules(all), - emqx_config:init_load(emqx_schema, <<"persistent_session_store.ds = true">>), - emqx_app:set_config_loader(?MODULE); - (_) -> - ok - end), + %% avoid inter-suite flakiness... + application:stop(emqx), + application:stop(emqx_durable_storage), + TCApps = emqx_cth_suite:start( + app_specs(), + #{work_dir => ?config(priv_dir, Config)} + ), + [{tc_apps, TCApps} | Config]. + +end_per_suite(Config) -> + TCApps = ?config(tc_apps, Config), + emqx_cth_suite:stop(TCApps), + ok. + +init_per_testcase(t_session_subscription_iterators, Config) -> + Cluster = cluster(), + Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => ?config(priv_dir, Config)}), + [{nodes, Nodes} | Config]; +init_per_testcase(_TestCase, Config) -> Config. -end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([]), - application:stop(emqx_durable_storage), +end_per_testcase(t_session_subscription_iterators, Config) -> + Nodes = ?config(nodes, Config), + ok = emqx_cth_cluster:stop(Nodes), + ok; +end_per_testcase(_TestCase, _Config) -> ok. t_messages_persisted(_Config) -> @@ -76,7 +91,7 @@ t_messages_persisted(_Config) -> ct:pal("Results = ~p", [Results]), - Persisted = consume(<<"local">>, {['#'], 0}), + Persisted = consume(?DS_SHARD, {['#'], 0}), ct:pal("Persisted = ~p", [Persisted]), @@ -88,6 +103,98 @@ t_messages_persisted(_Config) -> ok. +%% TODO: test quic and ws too +t_session_subscription_iterators(Config) -> + [Node1, Node2] = ?config(nodes, Config), + Port = get_mqtt_port(Node1, tcp), + Topic = <<"t/topic">>, + SubTopicFilter = <<"t/+">>, + AnotherTopic = <<"u/another-topic">>, + ClientId = <<"myclientid">>, + ?check_trace( + begin + [ + Payload1, + Payload2, + Payload3, + Payload4 + ] = lists:map( + fun(N) -> <<"hello", (integer_to_binary(N))/binary>> end, + lists:seq(1, 4) + ), + ct:pal("starting"), + {ok, Client} = emqtt:start_link([ + {port, Port}, + {clientid, ClientId}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(Client), + ct:pal("publishing 1"), + Message1 = emqx_message:make(Topic, Payload1), + publish(Node1, Message1), + receive_messages(1), + ct:pal("subscribing 1"), + {ok, _, [2]} = emqtt:subscribe(Client, SubTopicFilter, qos2), + ct:pal("publishing 2"), + Message2 = emqx_message:make(Topic, Payload2), + publish(Node1, Message2), + receive_messages(1), + ct:pal("subscribing 2"), + {ok, _, [1]} = emqtt:subscribe(Client, SubTopicFilter, qos1), + ct:pal("publishing 3"), + Message3 = emqx_message:make(Topic, Payload3), + publish(Node1, Message3), + receive_messages(1), + ct:pal("publishing 4"), + Message4 = emqx_message:make(AnotherTopic, Payload4), + publish(Node1, Message4), + IteratorIds = get_iterator_ids(Node1, ClientId), + emqtt:stop(Client), + #{ + messages => [Message1, Message2, Message3, Message4], + iterator_ids => IteratorIds + } + end, + fun(Results, Trace) -> + ct:pal("trace:\n ~p", [Trace]), + #{ + messages := [_Message1, Message2, Message3 | _], + iterator_ids := IteratorIds + } = Results, + case ?of_kind(ds_session_subscription_added, Trace) of + [] -> + %% Since `emqx_durable_storage' is a dependency of `emqx', it gets + %% compiled in "prod" mode when running emqx standalone tests. + ok; + [_ | _] -> + ?assertMatch( + [ + #{?snk_kind := ds_session_subscription_added}, + #{?snk_kind := ds_session_subscription_present} + ], + ?of_kind( + [ + ds_session_subscription_added, + ds_session_subscription_present + ], + Trace + ) + ), + ok + end, + ?assertMatch([_], IteratorIds), + [IteratorId] = IteratorIds, + ReplayMessages1 = erpc:call(Node1, fun() -> consume(?DS_SHARD, IteratorId) end), + ExpectedMessages = [Message2, Message3], + ?assertEqual(ExpectedMessages, ReplayMessages1), + %% Different DS shard + ReplayMessages2 = erpc:call(Node2, fun() -> consume(?DS_SHARD, IteratorId) end), + ?assertEqual([], ReplayMessages2), + ok + end + ), + ok. + %% connect(ClientId, CleanStart, EI) -> @@ -103,8 +210,11 @@ connect(ClientId, CleanStart, EI) -> {ok, _} = emqtt:connect(Client), Client. -consume(Shard, Replay) -> +consume(Shard, Replay = {_TopicFiler, _StartMS}) -> {ok, It} = emqx_ds_storage_layer:make_iterator(Shard, Replay), + consume(It); +consume(Shard, IteratorId) when is_binary(IteratorId) -> + {ok, It} = emqx_ds_storage_layer:restore_iterator(Shard, IteratorId), consume(It). consume(It) -> @@ -114,3 +224,59 @@ consume(It) -> none -> [] end. + +receive_messages(Count) -> + receive_messages(Count, []). + +receive_messages(0, Msgs) -> + Msgs; +receive_messages(Count, Msgs) -> + receive + {publish, Msg} -> + receive_messages(Count - 1, [Msg | Msgs]); + {deliver, _Topic, Msg} -> + receive_messages(Count - 1, [Msg | Msgs]); + _Other -> + receive_messages(Count, Msgs) + after 5000 -> + Msgs + end. + +publish(Node, Message) -> + erpc:call(Node, emqx, publish, [Message]). + +get_iterator_ids(Node, ClientId) -> + Channel = erpc:call(Node, fun() -> + [ConnPid] = emqx_cm:lookup_channels(ClientId), + sys:get_state(ConnPid) + end), + emqx_connection:info({channel, {session, iterators}}, Channel). + +app_specs() -> + [ + emqx_durable_storage, + {emqx, #{ + before_start => fun() -> + emqx_app:set_config_loader(?MODULE) + end, + config => #{persistent_session_store => #{ds => true}}, + override_env => [{boot_modules, [broker, listeners]}] + }} + ]. + +cluster() -> + Node1 = persistent_messages_SUITE1, + Spec = #{ + role => core, + join_to => emqx_cth_cluster:node_name(Node1), + listeners => true, + apps => app_specs() + }, + [ + {Node1, Spec}, + {persistent_messages_SUITE2, Spec} + ]. + +get_mqtt_port(Node, Type) -> + {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]), + Port. diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 07cfabc70..d8736b918 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -267,6 +267,8 @@ receive_messages(Count, Msgs) -> receive {publish, Msg} -> receive_messages(Count - 1, [Msg | Msgs]); + {deliver, _Topic, Msg} -> + receive_messages(Count - 1, [Msg | Msgs]); _Other -> receive_messages(Count, Msgs) after 5000 -> @@ -373,6 +375,26 @@ do_publish(Payloads = [_ | _], PublishFun, WaitForUnregister) -> do_publish(Payload, PublishFun, WaitForUnregister) -> do_publish([Payload], PublishFun, WaitForUnregister). +get_replay_messages(ReplayID) -> + DSShard = <<"local">>, + case emqx_ds_storage_layer:restore_iterator(DSShard, ReplayID) of + {ok, It} -> + do_get_replay_messages(It, []); + Error -> + error({"error restoring iterator", #{error => Error, replay_id => ReplayID}}) + end. + +do_get_replay_messages(It, Acc) -> + case emqx_ds_storage_layer:next(It) of + {value, Val, NewIt} -> + Msg = erlang:binary_to_term(Val), + do_get_replay_messages(NewIt, [Msg | Acc]); + none -> + {ok, lists:reverse(Acc)}; + {error, Reason} -> + {error, Reason} + end. + %%-------------------------------------------------------------------- %% Test Cases %%-------------------------------------------------------------------- diff --git a/apps/emqx/test/emqx_proper_types.erl b/apps/emqx/test/emqx_proper_types.erl index 0e9d3032c..56e0b23b8 100644 --- a/apps/emqx/test/emqx_proper_types.erl +++ b/apps/emqx/test/emqx_proper_types.erl @@ -20,6 +20,7 @@ -include_lib("proper/include/proper.hrl"). -include("emqx.hrl"). +-include("emqx_session.hrl"). -include("emqx_access_control.hrl"). %% High level Types @@ -132,33 +133,23 @@ clientinfo() -> sessioninfo() -> ?LET( Session, - {session, clientid(), - % id - sessionid(), - % is_persistent - boolean(), - % subscriptions - subscriptions(), - % max_subscriptions - non_neg_integer(), - % upgrade_qos - boolean(), - % emqx_inflight:inflight() - inflight(), - % emqx_mqueue:mqueue() - mqueue(), - % next_pkt_id - packet_id(), - % retry_interval - safty_timeout(), - % awaiting_rel - awaiting_rel(), - % max_awaiting_rel - non_neg_integer(), - % await_rel_timeout - safty_timeout(), - % created_at - timestamp()}, + #session{ + clientid = clientid(), + id = sessionid(), + is_persistent = boolean(), + subscriptions = subscriptions(), + max_subscriptions = non_neg_integer(), + upgrade_qos = boolean(), + inflight = inflight(), + mqueue = mqueue(), + next_pkt_id = packet_id(), + retry_interval = safty_timeout(), + awaiting_rel = awaiting_rel(), + max_awaiting_rel = non_neg_integer(), + await_rel_timeout = safty_timeout(), + created_at = timestamp(), + iterators = [] + }, emqx_session:info(Session) ). diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 9eccf8c16..a69357975 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -15,6 +15,8 @@ %%-------------------------------------------------------------------- -module(emqx_ds). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + %% API: -export([ensure_shard/2]). %% Messages: @@ -56,7 +58,7 @@ -type iterator() :: term(). --opaque iterator_id() :: binary(). +-type iterator_id() :: binary(). %%-type session() :: #session{}. @@ -73,7 +75,8 @@ %% Timestamp %% Earliest possible timestamp is 0. -%% TODO granularity? +%% TODO granularity? Currently, we should always use micro second, as that's the unit we +%% use in emqx_guid. Otherwise, the iterators won't match the message timestamps. -type time() :: non_neg_integer(). %%================================================================================ @@ -129,11 +132,13 @@ session_open(ClientID) -> fun() -> case mnesia:read(?SESSION_TAB, ClientID) of [#session{iterators = Iterators}] -> - {false, ClientID, Iterators}; + IteratorIDs = maps:values(Iterators), + {false, ClientID, IteratorIDs}; [] -> - Session = #session{id = ClientID, iterators = []}, + Iterators = #{}, + Session = #session{id = ClientID, iterators = Iterators}, mnesia:write(?SESSION_TAB, Session, write), - {true, ClientID, []} + {true, ClientID, _IteratorIDs = []} end end ), @@ -160,10 +165,38 @@ session_suspend(_SessionId) -> %% @doc Called when a client subscribes to a topic. Idempotent. -spec session_add_iterator(session_id(), emqx_topic:words()) -> - {ok, iterator_id()} | {error, session_not_found}. -session_add_iterator(_SessionId, _TopicFilter) -> - %% TODO - {ok, <<"">>}. + {ok, iterator_id(), time(), _IsNew :: boolean()} | {error, session_not_found}. +session_add_iterator(DSSessionId, TopicFilter) -> + {atomic, Ret} = + mria:transaction( + ?DS_SHARD, + fun() -> + case mnesia:wread({?SESSION_TAB, DSSessionId}) of + [] -> + {error, session_not_found}; + [#session{iterators = #{TopicFilter := IteratorId}}] -> + StartMS = get_start_ms(IteratorId, DSSessionId), + ?tp( + ds_session_subscription_present, + #{iterator_id => IteratorId, session_id => DSSessionId} + ), + IsNew = false, + {ok, IteratorId, StartMS, IsNew}; + [#session{iterators = Iterators0} = Session0] -> + {IteratorId, StartMS} = new_iterator_id(DSSessionId), + Iterators = Iterators0#{TopicFilter => IteratorId}, + Session = Session0#session{iterators = Iterators}, + mnesia:write(?SESSION_TAB, Session, write), + ?tp( + ds_session_subscription_added, + #{iterator_id => IteratorId, session_id => DSSessionId} + ), + IsNew = true, + {ok, IteratorId, StartMS, IsNew} + end + end + ), + Ret. %% @doc Called when a client unsubscribes from a topic. Returns `true' %% if the session contained the subscription or `false' if it wasn't @@ -201,3 +234,14 @@ iterator_stats() -> %%================================================================================ %% Internal functions %%================================================================================ + +-spec new_iterator_id(session_id()) -> {iterator_id(), time()}. +new_iterator_id(DSSessionId) -> + NowMS = erlang:system_time(microsecond), + NowMSBin = integer_to_binary(NowMS), + {<>, NowMS}. + +-spec get_start_ms(iterator_id(), emqx_session:session_id()) -> time(). +get_start_ms(IteratorId, SessionId) -> + <> = IteratorId, + binary_to_integer(StartMSBin). diff --git a/apps/emqx_durable_storage/src/emqx_ds_int.hrl b/apps/emqx_durable_storage/src/emqx_ds_int.hrl index 96688ede6..fa11a6600 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_int.hrl +++ b/apps/emqx_durable_storage/src/emqx_ds_int.hrl @@ -21,7 +21,7 @@ -record(session, { id :: emqx_ds:session_id(), - iterators :: [{emqx_topic:words(), emqx_ds:iterator_id()}] + iterators :: #{emqx_topic:words() => emqx_ds:iterator_id()} }). -endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replay.erl b/apps/emqx_durable_storage/src/emqx_ds_replay.erl index a66cee7fd..b9ffa32ac 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replay.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replay.erl @@ -15,7 +15,7 @@ -type replay_id() :: binary(). -type replay() :: { - _TopicFilter :: emqx_ds:topic(), + _TopicFilter :: emqx_ds:words(), _StartTime :: emqx_ds:time() }. diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 7c73dafaf..ac4649d94 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -13,7 +13,13 @@ -export([make_iterator/2, next/1]). --export([preserve_iterator/2, restore_iterator/2, discard_iterator/2]). +-export([ + preserve_iterator/2, + restore_iterator/2, + discard_iterator/2, + is_iterator_present/2, + discard_iterator_prefix/2 +]). %% behaviour callbacks: -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). @@ -160,10 +166,10 @@ next(It = #it{module = Mod, data = ItData}) -> end end. --spec preserve_iterator(iterator(), emqx_ds_replay:replay_id()) -> +-spec preserve_iterator(iterator(), emqx_ds:iterator_id()) -> ok | {error, _TODO}. -preserve_iterator(It = #it{}, ReplayID) -> - iterator_put_state(ReplayID, It). +preserve_iterator(It = #it{}, IteratorID) -> + iterator_put_state(IteratorID, It). -spec restore_iterator(emqx_ds:shard(), emqx_ds_replay:replay_id()) -> {ok, iterator()} | {error, _TODO}. @@ -177,11 +183,27 @@ restore_iterator(Shard, ReplayID) -> Error end. --spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) -> +-spec is_iterator_present(emqx_ds:shard(), emqx_ds_replay:replay_id()) -> + boolean(). +is_iterator_present(Shard, ReplayID) -> + %% TODO: use keyMayExist after added to wrapper? + case iterator_get_state(Shard, ReplayID) of + {ok, _} -> + true; + _ -> + false + end. + +-spec discard_iterator(emqx_ds:shard(), emqx_ds_replay:replay_id()) -> ok | {error, _TODO}. discard_iterator(Shard, ReplayID) -> iterator_delete(Shard, ReplayID). +-spec discard_iterator_prefix(emqx_ds:shard(), binary()) -> + ok | {error, _TODO}. +discard_iterator_prefix(Shard, KeyPrefix) -> + do_discard_iterator_prefix(Shard, KeyPrefix). + %%================================================================================ %% behaviour callbacks %%================================================================================ @@ -391,6 +413,32 @@ restore_iterator_state( It = #it{shard = Shard, gen = Gen, replay = {TopicFilter, StartTime}}, open_restore_iterator(meta_get_gen(Shard, Gen), It, State). +do_discard_iterator_prefix(Shard, KeyPrefix) -> + #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db), + case rocksdb:iterator(Handle, CF, ?ITERATION_READ_OPTS) of + {ok, It} -> + NextAction = {seek, KeyPrefix}, + do_discard_iterator_prefix(Handle, CF, It, KeyPrefix, NextAction); + Error -> + Error + end. + +do_discard_iterator_prefix(DBHandle, CF, It, KeyPrefix, NextAction) -> + case rocksdb:iterator_move(It, NextAction) of + {ok, K = <>, _V} -> + ok = rocksdb:delete(DBHandle, CF, K, ?ITERATION_WRITE_OPTS), + do_discard_iterator_prefix(DBHandle, CF, It, KeyPrefix, next); + {ok, _K, _V} -> + ok = rocksdb:iterator_close(It), + ok; + {error, invalid_iterator} -> + ok = rocksdb:iterator_close(It), + ok; + Error -> + ok = rocksdb:iterator_close(It), + Error + end. + %% Functions for dealing with the metadata stored persistently in rocksdb -define(CURRENT_GEN, <<"current">>). diff --git a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src index ecf9dd270..367ade691 100644 --- a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src +++ b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src @@ -2,7 +2,7 @@ {application, emqx_durable_storage, [ {description, "Message persistence and subscription replays for EMQX"}, % strict semver, bump manually! - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {modules, []}, {registered, []}, {applications, [kernel, stdlib, rocksdb, gproc, mria]}, diff --git a/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl b/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl index bbe16f518..10431eb1a 100644 --- a/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl +++ b/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl @@ -14,6 +14,8 @@ -opaque t() :: ets:tid(). +-export_type([t/0]). + -spec open() -> t(). open() -> ets:new(?MODULE, [ordered_set, {keypos, 1}]). diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 18ac65ae6..d9b6d9bd5 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -927,7 +927,7 @@ format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) -> retry_interval, upgrade_qos, zone, - %% sessionID, defined in emqx_session.erl + %% session_id, defined in emqx_session.erl id ], TimesKeys = [created_at, connected_at, disconnected_at],