diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index ac9d297de..e0d1685e8 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -23,7 +23,6 @@ -define(SHARED_SUB_SHARD, emqx_shared_sub_shard). -define(CM_SHARD, emqx_cm_shard). -define(ROUTE_SHARD, route_shard). --define(PERSISTENT_SESSION_SHARD, emqx_persistent_session_shard). %% Banner %%-------------------------------------------------------------------- @@ -92,7 +91,7 @@ -record(route, { topic :: binary(), - dest :: node() | {binary(), node()} | emqx_session:sessionID() + dest :: node() | {binary(), node()} | emqx_session:session_id() }). %%-------------------------------------------------------------------- diff --git a/apps/emqx/include/emqx_session.hrl b/apps/emqx/include/emqx_session.hrl new file mode 100644 index 000000000..3fea157ed --- /dev/null +++ b/apps/emqx/include/emqx_session.hrl @@ -0,0 +1,55 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-ifndef(EMQX_SESSION_HRL). +-define(EMQX_SESSION_HRL, true). + +-record(session, { + %% Client's id + clientid :: emqx_types:clientid(), + id :: emqx_session:session_id(), + %% Is this session a persistent session i.e. was it started with Session-Expiry > 0 + is_persistent :: boolean(), + %% Client’s Subscriptions. + subscriptions :: map(), + %% Max subscriptions allowed + max_subscriptions :: non_neg_integer() | infinity, + %% Upgrade QoS? + upgrade_qos :: boolean(), + %% Client <- Broker: QoS1/2 messages sent to the client but + %% have not been unacked. + inflight :: emqx_inflight:inflight(), + %% All QoS1/2 messages published to when client is disconnected, + %% or QoS1/2 messages pending transmission to the Client. + %% + %% Optionally, QoS0 messages pending transmission to the Client. + mqueue :: emqx_mqueue:mqueue(), + %% Next packet id of the session + next_pkt_id = 1 :: emqx_types:packet_id(), + %% Retry interval for redelivering QoS1/2 messages (Unit: millisecond) + retry_interval :: timeout(), + %% Client -> Broker: QoS2 messages received from the client, but + %% have not been completely acknowledged + awaiting_rel :: map(), + %% Maximum number of awaiting QoS2 messages allowed + max_awaiting_rel :: non_neg_integer() | infinity, + %% Awaiting PUBREL Timeout (Unit: millisecond) + await_rel_timeout :: timeout(), + %% Created at + created_at :: pos_integer() +}). + +-endif. diff --git a/apps/emqx/integration_test/emqx_ds_SUITE.erl b/apps/emqx/integration_test/emqx_ds_SUITE.erl new file mode 100644 index 000000000..842782e35 --- /dev/null +++ b/apps/emqx/integration_test/emqx_ds_SUITE.erl @@ -0,0 +1,210 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ds_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("stdlib/include/assert.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-define(DS_SHARD, <<"local">>). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + TCApps = emqx_cth_suite:start( + app_specs(), + #{work_dir => ?config(priv_dir, Config)} + ), + [{tc_apps, TCApps} | Config]. + +end_per_suite(Config) -> + TCApps = ?config(tc_apps, Config), + emqx_cth_suite:stop(TCApps), + ok. + +init_per_testcase(t_session_subscription_idempotency, Config) -> + Cluster = cluster(#{n => 1}), + ClusterOpts = #{work_dir => ?config(priv_dir, Config)}, + NodeSpecs = emqx_cth_cluster:mk_nodespecs(Cluster, ClusterOpts), + Nodes = emqx_cth_cluster:start(Cluster, ClusterOpts), + [ + {cluster, Cluster}, + {node_specs, NodeSpecs}, + {cluster_opts, ClusterOpts}, + {nodes, Nodes} + | Config + ]; +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(t_session_subscription_idempotency, Config) -> + Nodes = ?config(nodes, Config), + ok = emqx_cth_cluster:stop(Nodes), + ok; +end_per_testcase(_TestCase, _Config) -> + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +cluster(#{n := N}) -> + Node1 = ds_SUITE1, + Spec = #{ + role => core, + join_to => emqx_cth_cluster:node_name(Node1), + apps => app_specs() + }, + [ + {Node1, Spec} + | lists:map( + fun(M) -> + Name = binary_to_atom(<<"ds_SUITE", (integer_to_binary(M))/binary>>), + {Name, Spec} + end, + lists:seq(2, N) + ) + ]. + +app_specs() -> + [ + emqx_durable_storage, + {emqx, #{ + before_start => fun() -> + emqx_app:set_config_loader(?MODULE) + end, + config => #{persistent_session_store => #{ds => true}}, + override_env => [{boot_modules, [broker, listeners]}] + }} + ]. + +get_mqtt_port(Node, Type) -> + {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]), + Port. + +get_all_iterator_ids(Node) -> + Fn = fun(K, _V, Acc) -> [K | Acc] end, + erpc:call(Node, fun() -> + emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, []) + end). + +wait_nodeup(Node) -> + ?retry( + _Sleep0 = 500, + _Attempts0 = 50, + pong = net_adm:ping(Node) + ). + +wait_gen_rpc_down(_NodeSpec = #{apps := Apps}) -> + #{override_env := Env} = proplists:get_value(gen_rpc, Apps), + Port = proplists:get_value(tcp_server_port, Env), + ?retry( + _Sleep0 = 500, + _Attempts0 = 50, + false = emqx_common_test_helpers:is_tcp_server_available("127.0.0.1", Port) + ). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_session_subscription_idempotency(Config) -> + [Node1Spec | _] = ?config(node_specs, Config), + [Node1] = ?config(nodes, Config), + Port = get_mqtt_port(Node1, tcp), + SubTopicFilter = <<"t/+">>, + ClientId = <<"myclientid">>, + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := persistent_session_ds_iterator_added}, + _NEvents0 = 1, + #{?snk_kind := will_restart_node}, + _Guard0 = true + ), + ?force_ordering( + #{?snk_kind := restarted_node}, + _NEvents1 = 1, + #{?snk_kind := persistent_session_ds_open_iterators, ?snk_span := start}, + _Guard1 = true + ), + + spawn_link(fun() -> + ?tp(will_restart_node, #{}), + ct:pal("restarting node ~p", [Node1]), + true = monitor_node(Node1, true), + ok = erpc:call(Node1, init, restart, []), + receive + {nodedown, Node1} -> + ok + after 10_000 -> + ct:fail("node ~p didn't stop", [Node1]) + end, + ct:pal("waiting for nodeup ~p", [Node1]), + wait_nodeup(Node1), + wait_gen_rpc_down(Node1Spec), + ct:pal("restarting apps on ~p", [Node1]), + Apps = maps:get(apps, Node1Spec), + ok = erpc:call(Node1, emqx_cth_suite, load_apps, [Apps]), + _ = erpc:call(Node1, emqx_cth_suite, start_apps, [Apps, Node1Spec]), + %% have to re-inject this so that we may stop the node succesfully at the + %% end.... + ok = emqx_cth_cluster:set_node_opts(Node1, Node1Spec), + ct:pal("node ~p restarted", [Node1]), + ?tp(restarted_node, #{}), + ok + end), + + ct:pal("starting 1"), + {ok, Client0} = emqtt:start_link([ + {port, Port}, + {clientid, ClientId}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(Client0), + ct:pal("subscribing 1"), + process_flag(trap_exit, true), + catch emqtt:subscribe(Client0, SubTopicFilter, qos2), + receive + {'EXIT', {shutdown, _}} -> + ok + after 0 -> ok + end, + process_flag(trap_exit, false), + + {ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000), + ct:pal("starting 2"), + {ok, Client1} = emqtt:start_link([ + {port, Port}, + {clientid, ClientId}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(Client1), + ct:pal("subscribing 2"), + {ok, _, [2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2), + + ok = emqtt:stop(Client1), + + ok + end, + fun(Trace) -> + ct:pal("trace:\n ~p", [Trace]), + %% Exactly one iterator should have been opened. + ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)), + ?assertMatch( + {_IsNew = false, ClientId}, + erpc:call(Node1, emqx_ds, session_open, [ClientId]) + ), + ok + end + ), + ok. diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index e13f60654..68d42ee01 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -41,6 +41,7 @@ {emqx_node_rebalance_evacuation,1}. {emqx_node_rebalance_status,1}. {emqx_persistent_session,1}. +{emqx_persistent_session_ds,1}. {emqx_plugins,1}. {emqx_prometheus,1}. {emqx_resource,1}. diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index c680560fb..2cc2b72b4 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -21,6 +21,7 @@ -include("emqx.hrl"). -include("emqx_cm.hrl"). +-include("emqx_session.hrl"). -include("logger.hrl"). -include("types.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -301,7 +302,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> create_session(ClientInfo, ConnInfo) -> Options = get_session_confs(ClientInfo, ConnInfo), - Session = emqx_session:init(Options), + Session = emqx_session:init_and_open(Options), ok = emqx_metrics:inc('session.created'), ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]), Session. diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 27b4f0950..dc615fd5b 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -16,15 +16,24 @@ -module(emqx_persistent_session_ds). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + -export([init/0]). --export([persist_message/1]). +-export([ + persist_message/1, + open_session/1, + add_subscription/2 +]). -export([ serialize_message/1, deserialize_message/1 ]). +%% RPC +-export([do_open_iterator/3]). + %% FIXME -define(DS_SHARD, <<"local">>). @@ -72,6 +81,55 @@ store_message(Msg) -> find_subscribers(_Msg) -> [node()]. +open_session(ClientID) -> + ?WHEN_ENABLED(emqx_ds:session_open(ClientID)). + +-spec add_subscription(emqx_types:topic(), emqx_ds:session_id()) -> + {ok, emqx_ds:iterator_id(), IsNew :: boolean()} | {skipped, disabled}. +add_subscription(TopicFilterBin, DSSessionID) -> + ?WHEN_ENABLED( + begin + TopicFilter = emqx_topic:words(TopicFilterBin), + {ok, IteratorID, StartMS, IsNew} = emqx_ds:session_add_iterator( + DSSessionID, TopicFilter + ), + Ctx = #{ + iterator_id => IteratorID, + start_time => StartMS, + is_new => IsNew + }, + ?tp(persistent_session_ds_iterator_added, Ctx), + ?tp_span( + persistent_session_ds_open_iterators, + Ctx, + ok = open_iterator_on_all_shards(TopicFilter, StartMS, IteratorID) + ), + {ok, IteratorID, IsNew} + end + ). + +-spec open_iterator_on_all_shards(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok. +open_iterator_on_all_shards(TopicFilter, StartMS, IteratorID) -> + ?tp(persistent_session_ds_will_open_iterators, #{ + iterator_id => IteratorID, + start_time => StartMS + }), + %% Note: currently, shards map 1:1 to nodes, but this will change in the future. + Nodes = emqx:running_nodes(), + Results = emqx_persistent_session_ds_proto_v1:open_iterator( + Nodes, TopicFilter, StartMS, IteratorID + ), + %% TODO: handle errors + true = lists:all(fun(Res) -> Res =:= {ok, ok} end, Results), + ok. + +%% RPC target. +-spec do_open_iterator(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok. +do_open_iterator(TopicFilter, StartMS, IteratorID) -> + Replay = {TopicFilter, StartMS}, + {ok, _It} = emqx_ds_storage_layer:ensure_iterator(?DS_SHARD, IteratorID, Replay), + ok. + %% serialize_message(Msg) -> diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index d838e95d0..0c051f002 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -44,6 +44,7 @@ -module(emqx_session). -include("emqx.hrl"). +-include("emqx_session.hrl"). -include("emqx_mqtt.hrl"). -include("logger.hrl"). -include("types.hrl"). @@ -59,7 +60,7 @@ unpersist/1 ]). --export([init/1]). +-export([init/1, init_and_open/1]). -export([ info/1, @@ -101,49 +102,13 @@ %% Export for CT -export([set_field/3]). --type sessionID() :: emqx_guid:guid(). +-type session_id() :: emqx_guid:guid(). -export_type([ session/0, - sessionID/0 + session_id/0 ]). --record(session, { - %% Client's id - clientid :: emqx_types:clientid(), - id :: sessionID(), - %% Is this session a persistent session i.e. was it started with Session-Expiry > 0 - is_persistent :: boolean(), - %% Client’s Subscriptions. - subscriptions :: map(), - %% Max subscriptions allowed - max_subscriptions :: non_neg_integer() | infinity, - %% Upgrade QoS? - upgrade_qos :: boolean(), - %% Client <- Broker: QoS1/2 messages sent to the client but - %% have not been unacked. - inflight :: emqx_inflight:inflight(), - %% All QoS1/2 messages published to when client is disconnected, - %% or QoS1/2 messages pending transmission to the Client. - %% - %% Optionally, QoS0 messages pending transmission to the Client. - mqueue :: emqx_mqueue:mqueue(), - %% Next packet id of the session - next_pkt_id = 1 :: emqx_types:packet_id(), - %% Retry interval for redelivering QoS1/2 messages (Unit: millisecond) - retry_interval :: timeout(), - %% Client -> Broker: QoS2 messages received from the client, but - %% have not been completely acknowledged - awaiting_rel :: map(), - %% Maximum number of awaiting QoS2 messages allowed - max_awaiting_rel :: non_neg_integer() | infinity, - %% Awaiting PUBREL Timeout (Unit: millisecond) - await_rel_timeout :: timeout(), - %% Created at - created_at :: pos_integer() - %% Message deliver latency stats -}). - -type inflight_data_phase() :: wait_ack | wait_comp. -record(inflight_data, { @@ -201,6 +166,13 @@ %% Init a Session %%-------------------------------------------------------------------- +-spec init_and_open(options()) -> session(). +init_and_open(Options) -> + #{clientid := ClientID} = Options, + Session0 = emqx_session:init(Options), + _ = emqx_persistent_session_ds:open_session(ClientID), + Session0. + -spec init(options()) -> session(). init(Opts) -> MaxInflight = maps:get(max_inflight, Opts), @@ -324,11 +296,13 @@ subscribe( case IsNew andalso is_subscriptions_full(Session) of false -> ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts), + Session1 = Session#session{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}, + Session2 = add_persistent_subscription(TopicFilter, ClientId, Session1), ok = emqx_hooks:run( 'session.subscribed', [ClientInfo, TopicFilter, SubOpts#{is_new => IsNew}] ), - {ok, Session#session{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}}; + {ok, Session2}; true -> {error, ?RC_QUOTA_EXCEEDED} end. @@ -341,6 +315,12 @@ is_subscriptions_full(#session{ }) -> maps:size(Subs) >= MaxLimit. +-spec add_persistent_subscription(emqx_types:topic(), emqx_types:clientid(), session()) -> + session(). +add_persistent_subscription(TopicFilterBin, ClientId, Session) -> + _ = emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId), + Session. + %%-------------------------------------------------------------------- %% Client -> Broker: UNSUBSCRIBE %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_session_router.erl b/apps/emqx/src/emqx_session_router.erl index 1567f9e62..25484bdf0 100644 --- a/apps/emqx/src/emqx_session_router.erl +++ b/apps/emqx/src/emqx_session_router.erl @@ -21,6 +21,7 @@ -include("emqx.hrl"). -include("logger.hrl"). -include("types.hrl"). +-include("persistent_session/emqx_persistent_session.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). diff --git a/apps/emqx/src/persistent_session/emqx_persistent_session.erl b/apps/emqx/src/persistent_session/emqx_persistent_session.erl index 111154571..d85e13d67 100644 --- a/apps/emqx/src/persistent_session/emqx_persistent_session.erl +++ b/apps/emqx/src/persistent_session/emqx_persistent_session.erl @@ -115,10 +115,10 @@ storage_backend() -> %% Session message ADT API %%-------------------------------------------------------------------- --spec session_message_info('timestamp' | 'sessionID', sess_msg_key()) -> term(). +-spec session_message_info('timestamp' | 'session_id', sess_msg_key()) -> term(). session_message_info(timestamp, {_, <<>>, <>, ?ABANDONED}) -> TS; session_message_info(timestamp, {_, GUID, _, _}) -> emqx_guid:timestamp(GUID); -session_message_info(sessionID, {SessionID, _, _, _}) -> SessionID. +session_message_info(session_id, {SessionID, _, _, _}) -> SessionID. %%-------------------------------------------------------------------- %% DB API @@ -243,7 +243,7 @@ discard_opt(true, ClientID, Session) -> emqx_session_router:delete_routes(SessionID, Subscriptions), emqx_session:set_field(is_persistent, false, Session). --spec mark_resume_begin(emqx_session:sessionID()) -> emqx_guid:guid(). +-spec mark_resume_begin(emqx_session:session_id()) -> emqx_guid:guid(). mark_resume_begin(SessionID) -> MarkerID = emqx_guid:gen(), put_session_message({SessionID, MarkerID, <<>>, ?MARKER}), @@ -396,12 +396,12 @@ do_mark_as_delivered(SessionID, [{deliver, STopic, Msg} | Left]) -> do_mark_as_delivered(_SessionID, []) -> ok. --spec pending(emqx_session:sessionID()) -> +-spec pending(emqx_session:session_id()) -> [{emqx_types:message(), STopic :: binary()}]. pending(SessionID) -> pending_messages_in_db(SessionID, []). --spec pending(emqx_session:sessionID(), MarkerIDs :: [emqx_guid:guid()]) -> +-spec pending(emqx_session:session_id(), MarkerIDs :: [emqx_guid:guid()]) -> [{emqx_types:message(), STopic :: binary()}]. pending(SessionID, MarkerIds) -> %% TODO: Handle lost MarkerIDs @@ -460,8 +460,8 @@ read_pending_msgs([], Acc) -> lists:reverse(Acc). %% The keys are ordered by -%% {sessionID(), <<>>, bin_timestamp(), ?ABANDONED} For abandoned sessions (clean started or expired). -%% {sessionID(), emqx_guid:guid(), STopic :: binary(), ?DELIVERED | ?UNDELIVERED | ?MARKER} +%% {session_id(), <<>>, bin_timestamp(), ?ABANDONED} For abandoned sessions (clean started or expired). +%% {session_id(), emqx_guid:guid(), STopic :: binary(), ?DELIVERED | ?UNDELIVERED | ?MARKER} %% where %% <<>> < emqx_guid:guid() %% <<>> < bin_timestamp() @@ -491,7 +491,7 @@ pending_messages({SessionID, PrevMsgId, PrevSTopic, PrevTag} = PrevKey, Acc, Mar false -> pending_messages(Key, Acc, MarkerIds); true -> pending_messages(Key, [{PrevMsgId, PrevSTopic} | Acc], MarkerIds) end; - %% Next sessionID or '$end_of_table' + %% Next session_id or '$end_of_table' _What -> case PrevTag =:= ?UNDELIVERED of false -> {lists:reverse(Acc), MarkerIds}; diff --git a/apps/emqx/src/persistent_session/emqx_persistent_session.hrl b/apps/emqx/src/persistent_session/emqx_persistent_session.hrl index eb4224116..5476d8daf 100644 --- a/apps/emqx/src/persistent_session/emqx_persistent_session.hrl +++ b/apps/emqx/src/persistent_session/emqx_persistent_session.hrl @@ -14,6 +14,8 @@ %% limitations under the License. %%-------------------------------------------------------------------- +-define(PERSISTENT_SESSION_SHARD, emqx_persistent_session_shard). + -record(session_store, { client_id :: binary(), expiry_interval :: non_neg_integer(), diff --git a/apps/emqx/src/persistent_session/emqx_persistent_session_gc.erl b/apps/emqx/src/persistent_session/emqx_persistent_session_gc.erl index a4c4e5422..4aa59cdef 100644 --- a/apps/emqx/src/persistent_session/emqx_persistent_session_gc.erl +++ b/apps/emqx/src/persistent_session/emqx_persistent_session_gc.erl @@ -56,6 +56,7 @@ start_link() -> init([]) -> process_flag(trap_exit, true), + mria_rlog:ensure_shard(?PERSISTENT_SESSION_SHARD), {ok, start_message_gc_timer(start_session_gc_timer(#{}))}. handle_call(_Request, _From, State) -> diff --git a/apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl b/apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl new file mode 100644 index 000000000..cd348cc2c --- /dev/null +++ b/apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl @@ -0,0 +1,49 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_persistent_session_ds_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + open_iterator/4 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +-define(TIMEOUT, 30_000). + +introduced_in() -> + %% FIXME + "5.3.0". + +-spec open_iterator( + [node()], + emqx_topic:words(), + emqx_ds:time(), + emqx_ds:iterator_id() +) -> + emqx_rpc:erpc_multicall(ok). +open_iterator(Nodes, TopicFilter, StartMS, IteratorID) -> + erpc:multicall( + Nodes, + emqx_persistent_session_ds, + do_open_iterator, + [TopicFilter, StartMS, IteratorID], + ?TIMEOUT + ). diff --git a/apps/emqx/test/emqx_crl_cache_SUITE.erl b/apps/emqx/test/emqx_crl_cache_SUITE.erl index 6c6337038..248013ce9 100644 --- a/apps/emqx/test/emqx_crl_cache_SUITE.erl +++ b/apps/emqx/test/emqx_crl_cache_SUITE.erl @@ -41,6 +41,7 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> + emqx_config:erase_all(), ok. init_per_testcase(TestCase, Config) when diff --git a/apps/emqx/test/emqx_cth_cluster.erl b/apps/emqx/test/emqx_cth_cluster.erl index 5e8bd4103..e24600181 100644 --- a/apps/emqx/test/emqx_cth_cluster.erl +++ b/apps/emqx/test/emqx_cth_cluster.erl @@ -17,9 +17,11 @@ -module(emqx_cth_cluster). -export([start/2]). --export([stop/1]). +-export([stop/1, stop_node/1]). -export([share_load_module/2]). +-export([node_name/1, mk_nodespecs/2]). +-export([start_apps/2, set_node_opts/2]). -define(APPS_CLUSTERING, [gen_rpc, mria, ekka]). @@ -83,7 +85,7 @@ when }. start(Nodes, ClusterOpts) -> NodeSpecs = mk_nodespecs(Nodes, ClusterOpts), - ct:pal("Starting cluster: ~p", [NodeSpecs]), + ct:pal("Starting cluster:\n ~p", [NodeSpecs]), % 1. Start bare nodes with only basic applications running _ = emqx_utils:pmap(fun start_node_init/1, NodeSpecs, ?TIMEOUT_NODE_START_MS), % 2. Start applications needed to enable clustering @@ -237,6 +239,8 @@ default_appspec(emqx_conf, Spec, _NodeSpecs) -> listeners => allocate_listener_ports([tcp, ssl, ws, wss], Spec) } }; +default_appspec(emqx, Spec, _NodeSpecs) -> + #{config => #{listeners => allocate_listener_ports([tcp, ssl, ws, wss], Spec)}}; default_appspec(_App, _, _) -> #{}. @@ -285,17 +289,20 @@ load_apps(Node, #{apps := Apps}) -> erpc:call(Node, emqx_cth_suite, load_apps, [Apps]). start_apps_clustering(Node, #{apps := Apps} = Spec) -> - SuiteOpts = maps:with([work_dir], Spec), + SuiteOpts = suite_opts(Spec), AppsClustering = [lists:keyfind(App, 1, Apps) || App <- ?APPS_CLUSTERING], _Started = erpc:call(Node, emqx_cth_suite, start, [AppsClustering, SuiteOpts]), ok. start_apps(Node, #{apps := Apps} = Spec) -> - SuiteOpts = maps:with([work_dir], Spec), + SuiteOpts = suite_opts(Spec), AppsRest = [AppSpec || AppSpec = {App, _} <- Apps, not lists:member(App, ?APPS_CLUSTERING)], _Started = erpc:call(Node, emqx_cth_suite, start_apps, [AppsRest, SuiteOpts]), ok. +suite_opts(Spec) -> + maps:with([work_dir], Spec). + maybe_join_cluster(_Node, #{role := replicant}) -> ok; maybe_join_cluster(Node, Spec) -> diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl index 9b3e58da4..80b3a578c 100644 --- a/apps/emqx/test/emqx_cth_suite.erl +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -101,7 +101,13 @@ when %% function will raise an error. work_dir := file:name() }. -start(Apps, SuiteOpts = #{work_dir := WorkDir}) -> +start(Apps, SuiteOpts0 = #{work_dir := WorkDir0}) -> + %% when running CT on the whole app, it seems like `priv_dir` is the same on all + %% suites and leads to the "clean slate" verificatin to fail. + WorkDir = binary_to_list( + filename:join([WorkDir0, emqx_guid:to_hexstr(emqx_guid:gen())]) + ), + SuiteOpts = SuiteOpts0#{work_dir := WorkDir}, % 1. Prepare appspec instructions AppSpecs = [mk_appspec(App, SuiteOpts) || App <- Apps], % 2. Load every app so that stuff scanning attributes of loaded modules works diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index b818e3fec..db22b19e6 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -17,32 +17,46 @@ -module(emqx_persistent_messages_SUITE). -include_lib("stdlib/include/assert.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -compile(export_all). -compile(nowarn_export_all). --define(NOW, - (calendar:system_time_to_rfc3339(erlang:system_time(millisecond), [{unit, millisecond}])) -). +-define(DS_SHARD, <<"local">>). all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - {ok, _} = application:ensure_all_started(emqx_durable_storage), - ok = emqx_common_test_helpers:start_apps([], fun - (emqx) -> - emqx_common_test_helpers:boot_modules(all), - emqx_config:init_load(emqx_schema, <<"persistent_session_store.ds = true">>), - emqx_app:set_config_loader(?MODULE); - (_) -> - ok - end), + %% avoid inter-suite flakiness... + %% TODO: remove after other suites start to use `emx_cth_suite' + application:stop(emqx), + application:stop(emqx_durable_storage), + WorkDir = ?config(priv_dir, Config), + TCApps = emqx_cth_suite:start( + app_specs(), + #{work_dir => WorkDir} + ), + [{tc_apps, TCApps} | Config]. + +end_per_suite(Config) -> + TCApps = ?config(tc_apps, Config), + emqx_cth_suite:stop(TCApps), + ok. + +init_per_testcase(t_session_subscription_iterators, Config) -> + Cluster = cluster(), + Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => ?config(priv_dir, Config)}), + [{nodes, Nodes} | Config]; +init_per_testcase(_TestCase, Config) -> Config. -end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([]), - application:stop(emqx_durable_storage), +end_per_testcase(t_session_subscription_iterators, Config) -> + Nodes = ?config(nodes, Config), + ok = emqx_cth_cluster:stop(Nodes), + ok; +end_per_testcase(_TestCase, _Config) -> ok. t_messages_persisted(_Config) -> @@ -76,7 +90,7 @@ t_messages_persisted(_Config) -> ct:pal("Results = ~p", [Results]), - Persisted = consume(<<"local">>, {['#'], 0}), + Persisted = consume(?DS_SHARD, {['#'], 0}), ct:pal("Persisted = ~p", [Persisted]), @@ -88,6 +102,97 @@ t_messages_persisted(_Config) -> ok. +%% TODO: test quic and ws too +t_session_subscription_iterators(Config) -> + [Node1, Node2] = ?config(nodes, Config), + Port = get_mqtt_port(Node1, tcp), + Topic = <<"t/topic">>, + SubTopicFilter = <<"t/+">>, + AnotherTopic = <<"u/another-topic">>, + ClientId = <<"myclientid">>, + ?check_trace( + begin + [ + Payload1, + Payload2, + Payload3, + Payload4 + ] = lists:map( + fun(N) -> <<"hello", (integer_to_binary(N))/binary>> end, + lists:seq(1, 4) + ), + ct:pal("starting"), + {ok, Client} = emqtt:start_link([ + {port, Port}, + {clientid, ClientId}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(Client), + ct:pal("publishing 1"), + Message1 = emqx_message:make(Topic, Payload1), + publish(Node1, Message1), + ct:pal("subscribing 1"), + {ok, _, [2]} = emqtt:subscribe(Client, SubTopicFilter, qos2), + ct:pal("publishing 2"), + Message2 = emqx_message:make(Topic, Payload2), + publish(Node1, Message2), + [_] = receive_messages(1), + ct:pal("subscribing 2"), + {ok, _, [1]} = emqtt:subscribe(Client, SubTopicFilter, qos1), + ct:pal("publishing 3"), + Message3 = emqx_message:make(Topic, Payload3), + publish(Node1, Message3), + [_] = receive_messages(1), + ct:pal("publishing 4"), + Message4 = emqx_message:make(AnotherTopic, Payload4), + publish(Node1, Message4), + emqtt:stop(Client), + #{ + messages => [Message1, Message2, Message3, Message4] + } + end, + fun(Results, Trace) -> + ct:pal("trace:\n ~p", [Trace]), + #{ + messages := [_Message1, Message2, Message3 | _] + } = Results, + case ?of_kind(ds_session_subscription_added, Trace) of + [] -> + %% Since `emqx_durable_storage' is a dependency of `emqx', it gets + %% compiled in "prod" mode when running emqx standalone tests. + ok; + [_ | _] -> + ?assertMatch( + [ + #{?snk_kind := ds_session_subscription_added}, + #{?snk_kind := ds_session_subscription_present} + ], + ?of_kind( + [ + ds_session_subscription_added, + ds_session_subscription_present + ], + Trace + ) + ), + ok + end, + ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)), + {ok, [IteratorId]} = get_all_iterator_ids(Node1), + ?assertMatch({ok, [IteratorId]}, get_all_iterator_ids(Node2)), + ReplayMessages1 = erpc:call(Node1, fun() -> consume(?DS_SHARD, IteratorId) end), + ExpectedMessages = [Message2, Message3], + %% Note: it is expected that this will break after replayers are in place. + %% They might have consumed all the messages by this time. + ?assertEqual(ExpectedMessages, ReplayMessages1), + %% Different DS shard + ReplayMessages2 = erpc:call(Node2, fun() -> consume(?DS_SHARD, IteratorId) end), + ?assertEqual([], ReplayMessages2), + ok + end + ), + ok. + %% connect(ClientId, CleanStart, EI) -> @@ -103,8 +208,11 @@ connect(ClientId, CleanStart, EI) -> {ok, _} = emqtt:connect(Client), Client. -consume(Shard, Replay) -> +consume(Shard, Replay = {_TopicFiler, _StartMS}) -> {ok, It} = emqx_ds_storage_layer:make_iterator(Shard, Replay), + consume(It); +consume(Shard, IteratorId) when is_binary(IteratorId) -> + {ok, It} = emqx_ds_storage_layer:restore_iterator(Shard, IteratorId), consume(It). consume(It) -> @@ -114,3 +222,54 @@ consume(It) -> none -> [] end. + +receive_messages(Count) -> + receive_messages(Count, []). + +receive_messages(0, Msgs) -> + Msgs; +receive_messages(Count, Msgs) -> + receive + {publish, Msg} -> + receive_messages(Count - 1, [Msg | Msgs]) + after 5_000 -> + Msgs + end. + +publish(Node, Message) -> + erpc:call(Node, emqx, publish, [Message]). + +get_iterator_ids(Node, ClientId) -> + Channel = erpc:call(Node, fun() -> + [ConnPid] = emqx_cm:lookup_channels(ClientId), + sys:get_state(ConnPid) + end), + emqx_connection:info({channel, {session, iterators}}, Channel). + +app_specs() -> + [ + emqx_durable_storage, + {emqx, #{ + config => #{persistent_session_store => #{ds => true}}, + override_env => [{boot_modules, [broker, listeners]}] + }} + ]. + +cluster() -> + Node1 = persistent_messages_SUITE1, + Spec = #{ + role => core, + join_to => emqx_cth_cluster:node_name(Node1), + apps => app_specs() + }, + [ + {Node1, Spec}, + {persistent_messages_SUITE2, Spec} + ]. + +get_mqtt_port(Node, Type) -> + {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]), + Port. + +get_all_iterator_ids(Node) -> + erpc:call(Node, emqx_ds_storage_layer, list_iterator_prefix, [?DS_SHARD, <<>>]). diff --git a/apps/emqx/test/emqx_proper_types.erl b/apps/emqx/test/emqx_proper_types.erl index 0e9d3032c..ab1720754 100644 --- a/apps/emqx/test/emqx_proper_types.erl +++ b/apps/emqx/test/emqx_proper_types.erl @@ -20,6 +20,7 @@ -include_lib("proper/include/proper.hrl"). -include("emqx.hrl"). +-include("emqx_session.hrl"). -include("emqx_access_control.hrl"). %% High level Types @@ -132,33 +133,22 @@ clientinfo() -> sessioninfo() -> ?LET( Session, - {session, clientid(), - % id - sessionid(), - % is_persistent - boolean(), - % subscriptions - subscriptions(), - % max_subscriptions - non_neg_integer(), - % upgrade_qos - boolean(), - % emqx_inflight:inflight() - inflight(), - % emqx_mqueue:mqueue() - mqueue(), - % next_pkt_id - packet_id(), - % retry_interval - safty_timeout(), - % awaiting_rel - awaiting_rel(), - % max_awaiting_rel - non_neg_integer(), - % await_rel_timeout - safty_timeout(), - % created_at - timestamp()}, + #session{ + clientid = clientid(), + id = sessionid(), + is_persistent = boolean(), + subscriptions = subscriptions(), + max_subscriptions = non_neg_integer(), + upgrade_qos = boolean(), + inflight = inflight(), + mqueue = mqueue(), + next_pkt_id = packet_id(), + retry_interval = safty_timeout(), + awaiting_rel = awaiting_rel(), + max_awaiting_rel = non_neg_integer(), + await_rel_timeout = safty_timeout(), + created_at = timestamp() + }, emqx_session:info(Session) ). diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 9eccf8c16..889e7ea24 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -15,6 +15,9 @@ %%-------------------------------------------------------------------- -module(emqx_ds). +-include_lib("stdlib/include/ms_transform.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + %% API: -export([ensure_shard/2]). %% Messages: @@ -39,6 +42,8 @@ message_stats/0, message_store_opts/0, session_id/0, + replay/0, + replay_id/0, iterator_id/0, iterator/0, shard/0, @@ -56,7 +61,7 @@ -type iterator() :: term(). --opaque iterator_id() :: binary(). +-type iterator_id() :: binary(). %%-type session() :: #session{}. @@ -73,9 +78,17 @@ %% Timestamp %% Earliest possible timestamp is 0. -%% TODO granularity? +%% TODO granularity? Currently, we should always use micro second, as that's the unit we +%% use in emqx_guid. Otherwise, the iterators won't match the message timestamps. -type time() :: non_neg_integer(). +-type replay_id() :: binary(). + +-type replay() :: { + _TopicFilter :: emqx_topic:words(), + _StartTime :: time() +}. + %%================================================================================ %% API funcions %%================================================================================ @@ -121,23 +134,20 @@ message_stats() -> %% %% Note: session API doesn't handle session takeovers, it's the job of %% the broker. --spec session_open(emqx_types:clientid()) -> {_New :: boolean(), session_id(), [iterator_id()]}. +-spec session_open(emqx_types:clientid()) -> {_New :: boolean(), session_id()}. session_open(ClientID) -> - {atomic, Ret} = - mria:transaction( - ?DS_SHARD, - fun() -> - case mnesia:read(?SESSION_TAB, ClientID) of - [#session{iterators = Iterators}] -> - {false, ClientID, Iterators}; - [] -> - Session = #session{id = ClientID, iterators = []}, - mnesia:write(?SESSION_TAB, Session, write), - {true, ClientID, []} - end + {atomic, Res} = + mria:transaction(?DS_SHARD, fun() -> + case mnesia:read(?SESSION_TAB, ClientID, write) of + [#session{}] -> + {false, ClientID}; + [] -> + Session = #session{id = ClientID}, + mnesia:write(?SESSION_TAB, Session, write), + {true, ClientID} end - ), - Ret. + end), + Res. %% @doc Called when a client reconnects with `clean session=true' or %% during session GC @@ -160,10 +170,36 @@ session_suspend(_SessionId) -> %% @doc Called when a client subscribes to a topic. Idempotent. -spec session_add_iterator(session_id(), emqx_topic:words()) -> - {ok, iterator_id()} | {error, session_not_found}. -session_add_iterator(_SessionId, _TopicFilter) -> - %% TODO - {ok, <<"">>}. + {ok, iterator_id(), time(), _IsNew :: boolean()}. +session_add_iterator(DSSessionId, TopicFilter) -> + IteratorRefId = {DSSessionId, TopicFilter}, + {atomic, Res} = + mria:transaction(?DS_SHARD, fun() -> + case mnesia:read(?ITERATOR_REF_TAB, IteratorRefId, write) of + [] -> + {IteratorId, StartMS} = new_iterator_id(DSSessionId), + IteratorRef = #iterator_ref{ + ref_id = IteratorRefId, + it_id = IteratorId, + start_time = StartMS + }, + ok = mnesia:write(?ITERATOR_REF_TAB, IteratorRef, write), + ?tp( + ds_session_subscription_added, + #{iterator_id => IteratorId, session_id => DSSessionId} + ), + IsNew = true, + {ok, IteratorId, StartMS, IsNew}; + [#iterator_ref{it_id = IteratorId, start_time = StartMS}] -> + ?tp( + ds_session_subscription_present, + #{iterator_id => IteratorId, session_id => DSSessionId} + ), + IsNew = false, + {ok, IteratorId, StartMS, IsNew} + end + end), + Res. %% @doc Called when a client unsubscribes from a topic. Returns `true' %% if the session contained the subscription or `false' if it wasn't @@ -201,3 +237,9 @@ iterator_stats() -> %%================================================================================ %% Internal functions %%================================================================================ + +-spec new_iterator_id(session_id()) -> {iterator_id(), time()}. +new_iterator_id(DSSessionId) -> + NowMS = erlang:system_time(microsecond), + IteratorId = <>, + {IteratorId, NowMS}. diff --git a/apps/emqx_durable_storage/src/emqx_ds_app.erl b/apps/emqx_durable_storage/src/emqx_ds_app.erl index 216e979ee..cbcdb0b8c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_app.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_app.erl @@ -25,7 +25,18 @@ init_mnesia() -> {record_name, session}, {attributes, record_info(fields, session)} ] - ). + ), + ok = mria:create_table( + ?ITERATOR_REF_TAB, + [ + {rlog_shard, ?DS_SHARD}, + {type, ordered_set}, + {storage, storage()}, + {record_name, iterator_ref}, + {attributes, record_info(fields, iterator_ref)} + ] + ), + ok. storage() -> case mria:rocksdb_backend_available() of diff --git a/apps/emqx_durable_storage/src/emqx_ds_int.hrl b/apps/emqx_durable_storage/src/emqx_ds_int.hrl index 96688ede6..47493bd0b 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_int.hrl +++ b/apps/emqx_durable_storage/src/emqx_ds_int.hrl @@ -17,11 +17,20 @@ -define(EMQX_DS_HRL, true). -define(SESSION_TAB, emqx_ds_session). +-define(ITERATOR_REF_TAB, emqx_ds_iterator_ref). -define(DS_SHARD, emqx_ds_shard). -record(session, { + %% same as clientid id :: emqx_ds:session_id(), - iterators :: [{emqx_topic:words(), emqx_ds:iterator_id()}] + %% for future usage + props = #{} :: map() +}). + +-record(iterator_ref, { + ref_id :: {emqx_ds:session_id(), emqx_topic:words()}, + it_id :: emqx_ds:iterator_id(), + start_time :: emqx_ds:time() }). -endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replay.erl b/apps/emqx_durable_storage/src/emqx_ds_replay.erl deleted file mode 100644 index a66cee7fd..000000000 --- a/apps/emqx_durable_storage/src/emqx_ds_replay.erl +++ /dev/null @@ -1,36 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- --module(emqx_ds_replay). - -%% API: --export([]). - --export_type([replay_id/0, replay/0]). - -%%================================================================================ -%% Type declarations -%%================================================================================ - --type replay_id() :: binary(). - --type replay() :: { - _TopicFilter :: emqx_ds:topic(), - _StartTime :: emqx_ds:time() -}. - -%%================================================================================ -%% API funcions -%%================================================================================ - -%%================================================================================ -%% behaviour callbacks -%%================================================================================ - -%%================================================================================ -%% Internal exports -%%================================================================================ - -%%================================================================================ -%% Internal functions -%%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 017423b02..47c29e170 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -13,7 +13,15 @@ -export([make_iterator/2, next/1]). --export([preserve_iterator/2, restore_iterator/2, discard_iterator/2]). +-export([ + preserve_iterator/2, + restore_iterator/2, + discard_iterator/2, + ensure_iterator/3, + discard_iterator_prefix/2, + list_iterator_prefix/2, + foldl_iterator_prefix/4 +]). %% behaviour callbacks: -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). @@ -160,10 +168,10 @@ next(It = #it{module = Mod, data = ItData}) -> end end. --spec preserve_iterator(iterator(), emqx_ds:replay_id()) -> +-spec preserve_iterator(iterator(), emqx_ds:iterator_id()) -> ok | {error, _TODO}. -preserve_iterator(It = #it{}, ReplayID) -> - iterator_put_state(ReplayID, It). +preserve_iterator(It = #it{}, IteratorID) -> + iterator_put_state(IteratorID, It). -spec restore_iterator(emqx_ds:shard(), emqx_ds:replay_id()) -> {ok, iterator()} | {error, _TODO}. @@ -177,11 +185,50 @@ restore_iterator(Shard, ReplayID) -> Error end. +-spec ensure_iterator(emqx_ds:shard(), emqx_ds:iterator_id(), emqx_ds:replay()) -> + {ok, iterator()} | {error, _TODO}. +ensure_iterator(Shard, IteratorID, Replay = {_TopicFilter, _StartMS}) -> + case restore_iterator(Shard, IteratorID) of + {ok, It} -> + {ok, It}; + {error, not_found} -> + {ok, It} = make_iterator(Shard, Replay), + ok = emqx_ds_storage_layer:preserve_iterator(It, IteratorID), + {ok, It}; + Error -> + Error + end. + -spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) -> ok | {error, _TODO}. discard_iterator(Shard, ReplayID) -> iterator_delete(Shard, ReplayID). +-spec discard_iterator_prefix(emqx_ds:shard(), binary()) -> + ok | {error, _TODO}. +discard_iterator_prefix(Shard, KeyPrefix) -> + case do_discard_iterator_prefix(Shard, KeyPrefix) of + {ok, _} -> ok; + Error -> Error + end. + +-spec list_iterator_prefix( + emqx_ds:shard(), + binary() +) -> {ok, [emqx_ds:iterator_id()]} | {error, _TODO}. +list_iterator_prefix(Shard, KeyPrefix) -> + do_list_iterator_prefix(Shard, KeyPrefix). + +-spec foldl_iterator_prefix( + emqx_ds:shard(), + binary(), + fun((_Key :: binary(), _Value :: binary(), Acc) -> Acc), + Acc +) -> {ok, Acc} | {error, _TODO} when + Acc :: term(). +foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) -> + do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc). + %%================================================================================ %% behaviour callbacks %%================================================================================ @@ -344,7 +391,11 @@ open_restore_iterator(#{module := Mod, data := Data}, It = #it{replay = Replay}, %% --define(KEY_REPLAY_STATE(ReplayID), <<(ReplayID)/binary, "rs">>). +-define(KEY_REPLAY_STATE(IteratorId), <<(IteratorId)/binary, "rs">>). +-define(KEY_REPLAY_STATE_PAT(KeyReplayState), begin + <> = (KeyReplayState), + IteratorId +end). -define(ITERATION_WRITE_OPTS, []). -define(ITERATION_READ_OPTS, []). @@ -391,6 +442,44 @@ restore_iterator_state( It = #it{shard = Shard, gen = Gen, replay = {TopicFilter, StartTime}}, open_restore_iterator(meta_get_gen(Shard, Gen), It, State). +do_list_iterator_prefix(Shard, KeyPrefix) -> + Fn = fun(K0, _V, Acc) -> + K = ?KEY_REPLAY_STATE_PAT(K0), + [K | Acc] + end, + do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, []). + +do_discard_iterator_prefix(Shard, KeyPrefix) -> + #db{handle = DBHandle, cf_iterator = CF} = meta_lookup(Shard, db), + Fn = fun(K, _V, _Acc) -> ok = rocksdb:delete(DBHandle, CF, K, ?ITERATION_WRITE_OPTS) end, + do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, ok). + +do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) -> + #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db), + case rocksdb:iterator(Handle, CF, ?ITERATION_READ_OPTS) of + {ok, It} -> + NextAction = {seek, KeyPrefix}, + do_foldl_iterator_prefix(Handle, CF, It, KeyPrefix, NextAction, Fn, Acc); + Error -> + Error + end. + +do_foldl_iterator_prefix(DBHandle, CF, It, KeyPrefix, NextAction, Fn, Acc) -> + case rocksdb:iterator_move(It, NextAction) of + {ok, K = <>, V} -> + NewAcc = Fn(K, V, Acc), + do_foldl_iterator_prefix(DBHandle, CF, It, KeyPrefix, next, Fn, NewAcc); + {ok, _K, _V} -> + ok = rocksdb:iterator_close(It), + {ok, Acc}; + {error, invalid_iterator} -> + ok = rocksdb:iterator_close(It), + {ok, Acc}; + Error -> + ok = rocksdb:iterator_close(It), + Error + end. + %% Functions for dealing with the metadata stored persistently in rocksdb -define(CURRENT_GEN, <<"current">>). diff --git a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src index ecf9dd270..367ade691 100644 --- a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src +++ b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src @@ -2,7 +2,7 @@ {application, emqx_durable_storage, [ {description, "Message persistence and subscription replays for EMQX"}, % strict semver, bump manually! - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {modules, []}, {registered, []}, {applications, [kernel, stdlib, rocksdb, gproc, mria]}, diff --git a/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl b/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl index 59668ca01..e9daf2581 100644 --- a/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl +++ b/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl @@ -14,6 +14,8 @@ -opaque t() :: ets:tid(). +-export_type([t/0]). + -spec open() -> t(). open() -> ets:new(?MODULE, [ordered_set, {keypos, 1}]). diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 18ac65ae6..d9b6d9bd5 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -927,7 +927,7 @@ format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) -> retry_interval, upgrade_qos, zone, - %% sessionID, defined in emqx_session.erl + %% session_id, defined in emqx_session.erl id ], TimesKeys = [created_at, connected_at, disconnected_at], diff --git a/apps/emqx_management/src/emqx_mgmt_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_data_backup.erl index b83a46903..b677da2b2 100644 --- a/apps/emqx_management/src/emqx_mgmt_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_data_backup.erl @@ -525,7 +525,6 @@ do_import_conf(RawConf, Opts) -> Errors = lists:foldr( fun(Module, ErrorsAcc) -> - Module:import_config(RawConf), case Module:import_config(RawConf) of {ok, #{changed := Changed}} -> maybe_print_changed(Changed, Opts), diff --git a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl index f9b9ef766..381862995 100644 --- a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl @@ -440,8 +440,8 @@ create_test_tab(Attributes) -> apps_to_start() -> [ - {emqx_conf, "dashboard.listeners.http.bind = 0"}, {emqx, #{override_env => [{boot_modules, [broker, router]}]}}, + {emqx_conf, #{config => #{dashboard => #{listeners => #{http => #{bind => <<"0">>}}}}}}, emqx_psk, emqx_management, emqx_dashboard,