Merge pull request #11362 from thalesmg/ds-handle-sub-20230725
feat(ds): open iterators when handling `SUBSCRIBE` packets
This commit is contained in:
commit
dec21ffc95
|
@ -23,7 +23,6 @@
|
||||||
-define(SHARED_SUB_SHARD, emqx_shared_sub_shard).
|
-define(SHARED_SUB_SHARD, emqx_shared_sub_shard).
|
||||||
-define(CM_SHARD, emqx_cm_shard).
|
-define(CM_SHARD, emqx_cm_shard).
|
||||||
-define(ROUTE_SHARD, route_shard).
|
-define(ROUTE_SHARD, route_shard).
|
||||||
-define(PERSISTENT_SESSION_SHARD, emqx_persistent_session_shard).
|
|
||||||
|
|
||||||
%% Banner
|
%% Banner
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -92,7 +91,7 @@
|
||||||
|
|
||||||
-record(route, {
|
-record(route, {
|
||||||
topic :: binary(),
|
topic :: binary(),
|
||||||
dest :: node() | {binary(), node()} | emqx_session:sessionID()
|
dest :: node() | {binary(), node()} | emqx_session:session_id()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -0,0 +1,55 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-ifndef(EMQX_SESSION_HRL).
|
||||||
|
-define(EMQX_SESSION_HRL, true).
|
||||||
|
|
||||||
|
-record(session, {
|
||||||
|
%% Client's id
|
||||||
|
clientid :: emqx_types:clientid(),
|
||||||
|
id :: emqx_session:session_id(),
|
||||||
|
%% Is this session a persistent session i.e. was it started with Session-Expiry > 0
|
||||||
|
is_persistent :: boolean(),
|
||||||
|
%% Client’s Subscriptions.
|
||||||
|
subscriptions :: map(),
|
||||||
|
%% Max subscriptions allowed
|
||||||
|
max_subscriptions :: non_neg_integer() | infinity,
|
||||||
|
%% Upgrade QoS?
|
||||||
|
upgrade_qos :: boolean(),
|
||||||
|
%% Client <- Broker: QoS1/2 messages sent to the client but
|
||||||
|
%% have not been unacked.
|
||||||
|
inflight :: emqx_inflight:inflight(),
|
||||||
|
%% All QoS1/2 messages published to when client is disconnected,
|
||||||
|
%% or QoS1/2 messages pending transmission to the Client.
|
||||||
|
%%
|
||||||
|
%% Optionally, QoS0 messages pending transmission to the Client.
|
||||||
|
mqueue :: emqx_mqueue:mqueue(),
|
||||||
|
%% Next packet id of the session
|
||||||
|
next_pkt_id = 1 :: emqx_types:packet_id(),
|
||||||
|
%% Retry interval for redelivering QoS1/2 messages (Unit: millisecond)
|
||||||
|
retry_interval :: timeout(),
|
||||||
|
%% Client -> Broker: QoS2 messages received from the client, but
|
||||||
|
%% have not been completely acknowledged
|
||||||
|
awaiting_rel :: map(),
|
||||||
|
%% Maximum number of awaiting QoS2 messages allowed
|
||||||
|
max_awaiting_rel :: non_neg_integer() | infinity,
|
||||||
|
%% Awaiting PUBREL Timeout (Unit: millisecond)
|
||||||
|
await_rel_timeout :: timeout(),
|
||||||
|
%% Created at
|
||||||
|
created_at :: pos_integer()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-endif.
|
|
@ -0,0 +1,210 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_ds_SUITE).
|
||||||
|
|
||||||
|
-compile(export_all).
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
|
-define(DS_SHARD, <<"local">>).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% CT boilerplate
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
TCApps = emqx_cth_suite:start(
|
||||||
|
app_specs(),
|
||||||
|
#{work_dir => ?config(priv_dir, Config)}
|
||||||
|
),
|
||||||
|
[{tc_apps, TCApps} | Config].
|
||||||
|
|
||||||
|
end_per_suite(Config) ->
|
||||||
|
TCApps = ?config(tc_apps, Config),
|
||||||
|
emqx_cth_suite:stop(TCApps),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
init_per_testcase(t_session_subscription_idempotency, Config) ->
|
||||||
|
Cluster = cluster(#{n => 1}),
|
||||||
|
ClusterOpts = #{work_dir => ?config(priv_dir, Config)},
|
||||||
|
NodeSpecs = emqx_cth_cluster:mk_nodespecs(Cluster, ClusterOpts),
|
||||||
|
Nodes = emqx_cth_cluster:start(Cluster, ClusterOpts),
|
||||||
|
[
|
||||||
|
{cluster, Cluster},
|
||||||
|
{node_specs, NodeSpecs},
|
||||||
|
{cluster_opts, ClusterOpts},
|
||||||
|
{nodes, Nodes}
|
||||||
|
| Config
|
||||||
|
];
|
||||||
|
init_per_testcase(_TestCase, Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_testcase(t_session_subscription_idempotency, Config) ->
|
||||||
|
Nodes = ?config(nodes, Config),
|
||||||
|
ok = emqx_cth_cluster:stop(Nodes),
|
||||||
|
ok;
|
||||||
|
end_per_testcase(_TestCase, _Config) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Helper fns
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
cluster(#{n := N}) ->
|
||||||
|
Node1 = ds_SUITE1,
|
||||||
|
Spec = #{
|
||||||
|
role => core,
|
||||||
|
join_to => emqx_cth_cluster:node_name(Node1),
|
||||||
|
apps => app_specs()
|
||||||
|
},
|
||||||
|
[
|
||||||
|
{Node1, Spec}
|
||||||
|
| lists:map(
|
||||||
|
fun(M) ->
|
||||||
|
Name = binary_to_atom(<<"ds_SUITE", (integer_to_binary(M))/binary>>),
|
||||||
|
{Name, Spec}
|
||||||
|
end,
|
||||||
|
lists:seq(2, N)
|
||||||
|
)
|
||||||
|
].
|
||||||
|
|
||||||
|
app_specs() ->
|
||||||
|
[
|
||||||
|
emqx_durable_storage,
|
||||||
|
{emqx, #{
|
||||||
|
before_start => fun() ->
|
||||||
|
emqx_app:set_config_loader(?MODULE)
|
||||||
|
end,
|
||||||
|
config => #{persistent_session_store => #{ds => true}},
|
||||||
|
override_env => [{boot_modules, [broker, listeners]}]
|
||||||
|
}}
|
||||||
|
].
|
||||||
|
|
||||||
|
get_mqtt_port(Node, Type) ->
|
||||||
|
{_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
|
||||||
|
Port.
|
||||||
|
|
||||||
|
get_all_iterator_ids(Node) ->
|
||||||
|
Fn = fun(K, _V, Acc) -> [K | Acc] end,
|
||||||
|
erpc:call(Node, fun() ->
|
||||||
|
emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, [])
|
||||||
|
end).
|
||||||
|
|
||||||
|
wait_nodeup(Node) ->
|
||||||
|
?retry(
|
||||||
|
_Sleep0 = 500,
|
||||||
|
_Attempts0 = 50,
|
||||||
|
pong = net_adm:ping(Node)
|
||||||
|
).
|
||||||
|
|
||||||
|
wait_gen_rpc_down(_NodeSpec = #{apps := Apps}) ->
|
||||||
|
#{override_env := Env} = proplists:get_value(gen_rpc, Apps),
|
||||||
|
Port = proplists:get_value(tcp_server_port, Env),
|
||||||
|
?retry(
|
||||||
|
_Sleep0 = 500,
|
||||||
|
_Attempts0 = 50,
|
||||||
|
false = emqx_common_test_helpers:is_tcp_server_available("127.0.0.1", Port)
|
||||||
|
).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Testcases
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
t_session_subscription_idempotency(Config) ->
|
||||||
|
[Node1Spec | _] = ?config(node_specs, Config),
|
||||||
|
[Node1] = ?config(nodes, Config),
|
||||||
|
Port = get_mqtt_port(Node1, tcp),
|
||||||
|
SubTopicFilter = <<"t/+">>,
|
||||||
|
ClientId = <<"myclientid">>,
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
?force_ordering(
|
||||||
|
#{?snk_kind := persistent_session_ds_iterator_added},
|
||||||
|
_NEvents0 = 1,
|
||||||
|
#{?snk_kind := will_restart_node},
|
||||||
|
_Guard0 = true
|
||||||
|
),
|
||||||
|
?force_ordering(
|
||||||
|
#{?snk_kind := restarted_node},
|
||||||
|
_NEvents1 = 1,
|
||||||
|
#{?snk_kind := persistent_session_ds_open_iterators, ?snk_span := start},
|
||||||
|
_Guard1 = true
|
||||||
|
),
|
||||||
|
|
||||||
|
spawn_link(fun() ->
|
||||||
|
?tp(will_restart_node, #{}),
|
||||||
|
ct:pal("restarting node ~p", [Node1]),
|
||||||
|
true = monitor_node(Node1, true),
|
||||||
|
ok = erpc:call(Node1, init, restart, []),
|
||||||
|
receive
|
||||||
|
{nodedown, Node1} ->
|
||||||
|
ok
|
||||||
|
after 10_000 ->
|
||||||
|
ct:fail("node ~p didn't stop", [Node1])
|
||||||
|
end,
|
||||||
|
ct:pal("waiting for nodeup ~p", [Node1]),
|
||||||
|
wait_nodeup(Node1),
|
||||||
|
wait_gen_rpc_down(Node1Spec),
|
||||||
|
ct:pal("restarting apps on ~p", [Node1]),
|
||||||
|
Apps = maps:get(apps, Node1Spec),
|
||||||
|
ok = erpc:call(Node1, emqx_cth_suite, load_apps, [Apps]),
|
||||||
|
_ = erpc:call(Node1, emqx_cth_suite, start_apps, [Apps, Node1Spec]),
|
||||||
|
%% have to re-inject this so that we may stop the node succesfully at the
|
||||||
|
%% end....
|
||||||
|
ok = emqx_cth_cluster:set_node_opts(Node1, Node1Spec),
|
||||||
|
ct:pal("node ~p restarted", [Node1]),
|
||||||
|
?tp(restarted_node, #{}),
|
||||||
|
ok
|
||||||
|
end),
|
||||||
|
|
||||||
|
ct:pal("starting 1"),
|
||||||
|
{ok, Client0} = emqtt:start_link([
|
||||||
|
{port, Port},
|
||||||
|
{clientid, ClientId},
|
||||||
|
{proto_ver, v5}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:connect(Client0),
|
||||||
|
ct:pal("subscribing 1"),
|
||||||
|
process_flag(trap_exit, true),
|
||||||
|
catch emqtt:subscribe(Client0, SubTopicFilter, qos2),
|
||||||
|
receive
|
||||||
|
{'EXIT', {shutdown, _}} ->
|
||||||
|
ok
|
||||||
|
after 0 -> ok
|
||||||
|
end,
|
||||||
|
process_flag(trap_exit, false),
|
||||||
|
|
||||||
|
{ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000),
|
||||||
|
ct:pal("starting 2"),
|
||||||
|
{ok, Client1} = emqtt:start_link([
|
||||||
|
{port, Port},
|
||||||
|
{clientid, ClientId},
|
||||||
|
{proto_ver, v5}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:connect(Client1),
|
||||||
|
ct:pal("subscribing 2"),
|
||||||
|
{ok, _, [2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2),
|
||||||
|
|
||||||
|
ok = emqtt:stop(Client1),
|
||||||
|
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
fun(Trace) ->
|
||||||
|
ct:pal("trace:\n ~p", [Trace]),
|
||||||
|
%% Exactly one iterator should have been opened.
|
||||||
|
?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)),
|
||||||
|
?assertMatch(
|
||||||
|
{_IsNew = false, ClientId},
|
||||||
|
erpc:call(Node1, emqx_ds, session_open, [ClientId])
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
),
|
||||||
|
ok.
|
|
@ -41,6 +41,7 @@
|
||||||
{emqx_node_rebalance_evacuation,1}.
|
{emqx_node_rebalance_evacuation,1}.
|
||||||
{emqx_node_rebalance_status,1}.
|
{emqx_node_rebalance_status,1}.
|
||||||
{emqx_persistent_session,1}.
|
{emqx_persistent_session,1}.
|
||||||
|
{emqx_persistent_session_ds,1}.
|
||||||
{emqx_plugins,1}.
|
{emqx_plugins,1}.
|
||||||
{emqx_prometheus,1}.
|
{emqx_prometheus,1}.
|
||||||
{emqx_resource,1}.
|
{emqx_resource,1}.
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
-include("emqx_cm.hrl").
|
-include("emqx_cm.hrl").
|
||||||
|
-include("emqx_session.hrl").
|
||||||
-include("logger.hrl").
|
-include("logger.hrl").
|
||||||
-include("types.hrl").
|
-include("types.hrl").
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
@ -301,7 +302,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
||||||
|
|
||||||
create_session(ClientInfo, ConnInfo) ->
|
create_session(ClientInfo, ConnInfo) ->
|
||||||
Options = get_session_confs(ClientInfo, ConnInfo),
|
Options = get_session_confs(ClientInfo, ConnInfo),
|
||||||
Session = emqx_session:init(Options),
|
Session = emqx_session:init_and_open(Options),
|
||||||
ok = emqx_metrics:inc('session.created'),
|
ok = emqx_metrics:inc('session.created'),
|
||||||
ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]),
|
ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]),
|
||||||
Session.
|
Session.
|
||||||
|
|
|
@ -16,15 +16,24 @@
|
||||||
|
|
||||||
-module(emqx_persistent_session_ds).
|
-module(emqx_persistent_session_ds).
|
||||||
|
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
-export([init/0]).
|
-export([init/0]).
|
||||||
|
|
||||||
-export([persist_message/1]).
|
-export([
|
||||||
|
persist_message/1,
|
||||||
|
open_session/1,
|
||||||
|
add_subscription/2
|
||||||
|
]).
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
serialize_message/1,
|
serialize_message/1,
|
||||||
deserialize_message/1
|
deserialize_message/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
%% RPC
|
||||||
|
-export([do_open_iterator/3]).
|
||||||
|
|
||||||
%% FIXME
|
%% FIXME
|
||||||
-define(DS_SHARD, <<"local">>).
|
-define(DS_SHARD, <<"local">>).
|
||||||
|
|
||||||
|
@ -72,6 +81,55 @@ store_message(Msg) ->
|
||||||
find_subscribers(_Msg) ->
|
find_subscribers(_Msg) ->
|
||||||
[node()].
|
[node()].
|
||||||
|
|
||||||
|
open_session(ClientID) ->
|
||||||
|
?WHEN_ENABLED(emqx_ds:session_open(ClientID)).
|
||||||
|
|
||||||
|
-spec add_subscription(emqx_types:topic(), emqx_ds:session_id()) ->
|
||||||
|
{ok, emqx_ds:iterator_id(), IsNew :: boolean()} | {skipped, disabled}.
|
||||||
|
add_subscription(TopicFilterBin, DSSessionID) ->
|
||||||
|
?WHEN_ENABLED(
|
||||||
|
begin
|
||||||
|
TopicFilter = emqx_topic:words(TopicFilterBin),
|
||||||
|
{ok, IteratorID, StartMS, IsNew} = emqx_ds:session_add_iterator(
|
||||||
|
DSSessionID, TopicFilter
|
||||||
|
),
|
||||||
|
Ctx = #{
|
||||||
|
iterator_id => IteratorID,
|
||||||
|
start_time => StartMS,
|
||||||
|
is_new => IsNew
|
||||||
|
},
|
||||||
|
?tp(persistent_session_ds_iterator_added, Ctx),
|
||||||
|
?tp_span(
|
||||||
|
persistent_session_ds_open_iterators,
|
||||||
|
Ctx,
|
||||||
|
ok = open_iterator_on_all_shards(TopicFilter, StartMS, IteratorID)
|
||||||
|
),
|
||||||
|
{ok, IteratorID, IsNew}
|
||||||
|
end
|
||||||
|
).
|
||||||
|
|
||||||
|
-spec open_iterator_on_all_shards(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok.
|
||||||
|
open_iterator_on_all_shards(TopicFilter, StartMS, IteratorID) ->
|
||||||
|
?tp(persistent_session_ds_will_open_iterators, #{
|
||||||
|
iterator_id => IteratorID,
|
||||||
|
start_time => StartMS
|
||||||
|
}),
|
||||||
|
%% Note: currently, shards map 1:1 to nodes, but this will change in the future.
|
||||||
|
Nodes = emqx:running_nodes(),
|
||||||
|
Results = emqx_persistent_session_ds_proto_v1:open_iterator(
|
||||||
|
Nodes, TopicFilter, StartMS, IteratorID
|
||||||
|
),
|
||||||
|
%% TODO: handle errors
|
||||||
|
true = lists:all(fun(Res) -> Res =:= {ok, ok} end, Results),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%% RPC target.
|
||||||
|
-spec do_open_iterator(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok.
|
||||||
|
do_open_iterator(TopicFilter, StartMS, IteratorID) ->
|
||||||
|
Replay = {TopicFilter, StartMS},
|
||||||
|
{ok, _It} = emqx_ds_storage_layer:ensure_iterator(?DS_SHARD, IteratorID, Replay),
|
||||||
|
ok.
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
serialize_message(Msg) ->
|
serialize_message(Msg) ->
|
||||||
|
|
|
@ -44,6 +44,7 @@
|
||||||
-module(emqx_session).
|
-module(emqx_session).
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
|
-include("emqx_session.hrl").
|
||||||
-include("emqx_mqtt.hrl").
|
-include("emqx_mqtt.hrl").
|
||||||
-include("logger.hrl").
|
-include("logger.hrl").
|
||||||
-include("types.hrl").
|
-include("types.hrl").
|
||||||
|
@ -59,7 +60,7 @@
|
||||||
unpersist/1
|
unpersist/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([init/1]).
|
-export([init/1, init_and_open/1]).
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
info/1,
|
info/1,
|
||||||
|
@ -101,49 +102,13 @@
|
||||||
%% Export for CT
|
%% Export for CT
|
||||||
-export([set_field/3]).
|
-export([set_field/3]).
|
||||||
|
|
||||||
-type sessionID() :: emqx_guid:guid().
|
-type session_id() :: emqx_guid:guid().
|
||||||
|
|
||||||
-export_type([
|
-export_type([
|
||||||
session/0,
|
session/0,
|
||||||
sessionID/0
|
session_id/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-record(session, {
|
|
||||||
%% Client's id
|
|
||||||
clientid :: emqx_types:clientid(),
|
|
||||||
id :: sessionID(),
|
|
||||||
%% Is this session a persistent session i.e. was it started with Session-Expiry > 0
|
|
||||||
is_persistent :: boolean(),
|
|
||||||
%% Client’s Subscriptions.
|
|
||||||
subscriptions :: map(),
|
|
||||||
%% Max subscriptions allowed
|
|
||||||
max_subscriptions :: non_neg_integer() | infinity,
|
|
||||||
%% Upgrade QoS?
|
|
||||||
upgrade_qos :: boolean(),
|
|
||||||
%% Client <- Broker: QoS1/2 messages sent to the client but
|
|
||||||
%% have not been unacked.
|
|
||||||
inflight :: emqx_inflight:inflight(),
|
|
||||||
%% All QoS1/2 messages published to when client is disconnected,
|
|
||||||
%% or QoS1/2 messages pending transmission to the Client.
|
|
||||||
%%
|
|
||||||
%% Optionally, QoS0 messages pending transmission to the Client.
|
|
||||||
mqueue :: emqx_mqueue:mqueue(),
|
|
||||||
%% Next packet id of the session
|
|
||||||
next_pkt_id = 1 :: emqx_types:packet_id(),
|
|
||||||
%% Retry interval for redelivering QoS1/2 messages (Unit: millisecond)
|
|
||||||
retry_interval :: timeout(),
|
|
||||||
%% Client -> Broker: QoS2 messages received from the client, but
|
|
||||||
%% have not been completely acknowledged
|
|
||||||
awaiting_rel :: map(),
|
|
||||||
%% Maximum number of awaiting QoS2 messages allowed
|
|
||||||
max_awaiting_rel :: non_neg_integer() | infinity,
|
|
||||||
%% Awaiting PUBREL Timeout (Unit: millisecond)
|
|
||||||
await_rel_timeout :: timeout(),
|
|
||||||
%% Created at
|
|
||||||
created_at :: pos_integer()
|
|
||||||
%% Message deliver latency stats
|
|
||||||
}).
|
|
||||||
|
|
||||||
-type inflight_data_phase() :: wait_ack | wait_comp.
|
-type inflight_data_phase() :: wait_ack | wait_comp.
|
||||||
|
|
||||||
-record(inflight_data, {
|
-record(inflight_data, {
|
||||||
|
@ -201,6 +166,13 @@
|
||||||
%% Init a Session
|
%% Init a Session
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec init_and_open(options()) -> session().
|
||||||
|
init_and_open(Options) ->
|
||||||
|
#{clientid := ClientID} = Options,
|
||||||
|
Session0 = emqx_session:init(Options),
|
||||||
|
_ = emqx_persistent_session_ds:open_session(ClientID),
|
||||||
|
Session0.
|
||||||
|
|
||||||
-spec init(options()) -> session().
|
-spec init(options()) -> session().
|
||||||
init(Opts) ->
|
init(Opts) ->
|
||||||
MaxInflight = maps:get(max_inflight, Opts),
|
MaxInflight = maps:get(max_inflight, Opts),
|
||||||
|
@ -324,11 +296,13 @@ subscribe(
|
||||||
case IsNew andalso is_subscriptions_full(Session) of
|
case IsNew andalso is_subscriptions_full(Session) of
|
||||||
false ->
|
false ->
|
||||||
ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts),
|
ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts),
|
||||||
|
Session1 = Session#session{subscriptions = maps:put(TopicFilter, SubOpts, Subs)},
|
||||||
|
Session2 = add_persistent_subscription(TopicFilter, ClientId, Session1),
|
||||||
ok = emqx_hooks:run(
|
ok = emqx_hooks:run(
|
||||||
'session.subscribed',
|
'session.subscribed',
|
||||||
[ClientInfo, TopicFilter, SubOpts#{is_new => IsNew}]
|
[ClientInfo, TopicFilter, SubOpts#{is_new => IsNew}]
|
||||||
),
|
),
|
||||||
{ok, Session#session{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}};
|
{ok, Session2};
|
||||||
true ->
|
true ->
|
||||||
{error, ?RC_QUOTA_EXCEEDED}
|
{error, ?RC_QUOTA_EXCEEDED}
|
||||||
end.
|
end.
|
||||||
|
@ -341,6 +315,12 @@ is_subscriptions_full(#session{
|
||||||
}) ->
|
}) ->
|
||||||
maps:size(Subs) >= MaxLimit.
|
maps:size(Subs) >= MaxLimit.
|
||||||
|
|
||||||
|
-spec add_persistent_subscription(emqx_types:topic(), emqx_types:clientid(), session()) ->
|
||||||
|
session().
|
||||||
|
add_persistent_subscription(TopicFilterBin, ClientId, Session) ->
|
||||||
|
_ = emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId),
|
||||||
|
Session.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Client -> Broker: UNSUBSCRIBE
|
%% Client -> Broker: UNSUBSCRIBE
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
-include("logger.hrl").
|
-include("logger.hrl").
|
||||||
-include("types.hrl").
|
-include("types.hrl").
|
||||||
|
-include("persistent_session/emqx_persistent_session.hrl").
|
||||||
|
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
|
|
|
@ -115,10 +115,10 @@ storage_backend() ->
|
||||||
%% Session message ADT API
|
%% Session message ADT API
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec session_message_info('timestamp' | 'sessionID', sess_msg_key()) -> term().
|
-spec session_message_info('timestamp' | 'session_id', sess_msg_key()) -> term().
|
||||||
session_message_info(timestamp, {_, <<>>, <<TS:64>>, ?ABANDONED}) -> TS;
|
session_message_info(timestamp, {_, <<>>, <<TS:64>>, ?ABANDONED}) -> TS;
|
||||||
session_message_info(timestamp, {_, GUID, _, _}) -> emqx_guid:timestamp(GUID);
|
session_message_info(timestamp, {_, GUID, _, _}) -> emqx_guid:timestamp(GUID);
|
||||||
session_message_info(sessionID, {SessionID, _, _, _}) -> SessionID.
|
session_message_info(session_id, {SessionID, _, _, _}) -> SessionID.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% DB API
|
%% DB API
|
||||||
|
@ -243,7 +243,7 @@ discard_opt(true, ClientID, Session) ->
|
||||||
emqx_session_router:delete_routes(SessionID, Subscriptions),
|
emqx_session_router:delete_routes(SessionID, Subscriptions),
|
||||||
emqx_session:set_field(is_persistent, false, Session).
|
emqx_session:set_field(is_persistent, false, Session).
|
||||||
|
|
||||||
-spec mark_resume_begin(emqx_session:sessionID()) -> emqx_guid:guid().
|
-spec mark_resume_begin(emqx_session:session_id()) -> emqx_guid:guid().
|
||||||
mark_resume_begin(SessionID) ->
|
mark_resume_begin(SessionID) ->
|
||||||
MarkerID = emqx_guid:gen(),
|
MarkerID = emqx_guid:gen(),
|
||||||
put_session_message({SessionID, MarkerID, <<>>, ?MARKER}),
|
put_session_message({SessionID, MarkerID, <<>>, ?MARKER}),
|
||||||
|
@ -396,12 +396,12 @@ do_mark_as_delivered(SessionID, [{deliver, STopic, Msg} | Left]) ->
|
||||||
do_mark_as_delivered(_SessionID, []) ->
|
do_mark_as_delivered(_SessionID, []) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-spec pending(emqx_session:sessionID()) ->
|
-spec pending(emqx_session:session_id()) ->
|
||||||
[{emqx_types:message(), STopic :: binary()}].
|
[{emqx_types:message(), STopic :: binary()}].
|
||||||
pending(SessionID) ->
|
pending(SessionID) ->
|
||||||
pending_messages_in_db(SessionID, []).
|
pending_messages_in_db(SessionID, []).
|
||||||
|
|
||||||
-spec pending(emqx_session:sessionID(), MarkerIDs :: [emqx_guid:guid()]) ->
|
-spec pending(emqx_session:session_id(), MarkerIDs :: [emqx_guid:guid()]) ->
|
||||||
[{emqx_types:message(), STopic :: binary()}].
|
[{emqx_types:message(), STopic :: binary()}].
|
||||||
pending(SessionID, MarkerIds) ->
|
pending(SessionID, MarkerIds) ->
|
||||||
%% TODO: Handle lost MarkerIDs
|
%% TODO: Handle lost MarkerIDs
|
||||||
|
@ -460,8 +460,8 @@ read_pending_msgs([], Acc) ->
|
||||||
lists:reverse(Acc).
|
lists:reverse(Acc).
|
||||||
|
|
||||||
%% The keys are ordered by
|
%% The keys are ordered by
|
||||||
%% {sessionID(), <<>>, bin_timestamp(), ?ABANDONED} For abandoned sessions (clean started or expired).
|
%% {session_id(), <<>>, bin_timestamp(), ?ABANDONED} For abandoned sessions (clean started or expired).
|
||||||
%% {sessionID(), emqx_guid:guid(), STopic :: binary(), ?DELIVERED | ?UNDELIVERED | ?MARKER}
|
%% {session_id(), emqx_guid:guid(), STopic :: binary(), ?DELIVERED | ?UNDELIVERED | ?MARKER}
|
||||||
%% where
|
%% where
|
||||||
%% <<>> < emqx_guid:guid()
|
%% <<>> < emqx_guid:guid()
|
||||||
%% <<>> < bin_timestamp()
|
%% <<>> < bin_timestamp()
|
||||||
|
@ -491,7 +491,7 @@ pending_messages({SessionID, PrevMsgId, PrevSTopic, PrevTag} = PrevKey, Acc, Mar
|
||||||
false -> pending_messages(Key, Acc, MarkerIds);
|
false -> pending_messages(Key, Acc, MarkerIds);
|
||||||
true -> pending_messages(Key, [{PrevMsgId, PrevSTopic} | Acc], MarkerIds)
|
true -> pending_messages(Key, [{PrevMsgId, PrevSTopic} | Acc], MarkerIds)
|
||||||
end;
|
end;
|
||||||
%% Next sessionID or '$end_of_table'
|
%% Next session_id or '$end_of_table'
|
||||||
_What ->
|
_What ->
|
||||||
case PrevTag =:= ?UNDELIVERED of
|
case PrevTag =:= ?UNDELIVERED of
|
||||||
false -> {lists:reverse(Acc), MarkerIds};
|
false -> {lists:reverse(Acc), MarkerIds};
|
||||||
|
|
|
@ -14,6 +14,8 @@
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(PERSISTENT_SESSION_SHARD, emqx_persistent_session_shard).
|
||||||
|
|
||||||
-record(session_store, {
|
-record(session_store, {
|
||||||
client_id :: binary(),
|
client_id :: binary(),
|
||||||
expiry_interval :: non_neg_integer(),
|
expiry_interval :: non_neg_integer(),
|
||||||
|
|
|
@ -56,6 +56,7 @@ start_link() ->
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
|
mria_rlog:ensure_shard(?PERSISTENT_SESSION_SHARD),
|
||||||
{ok, start_message_gc_timer(start_session_gc_timer(#{}))}.
|
{ok, start_message_gc_timer(start_session_gc_timer(#{}))}.
|
||||||
|
|
||||||
handle_call(_Request, _From, State) ->
|
handle_call(_Request, _From, State) ->
|
||||||
|
|
|
@ -0,0 +1,49 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_persistent_session_ds_proto_v1).
|
||||||
|
|
||||||
|
-behaviour(emqx_bpapi).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
introduced_in/0,
|
||||||
|
|
||||||
|
open_iterator/4
|
||||||
|
]).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/bpapi.hrl").
|
||||||
|
|
||||||
|
-define(TIMEOUT, 30_000).
|
||||||
|
|
||||||
|
introduced_in() ->
|
||||||
|
%% FIXME
|
||||||
|
"5.3.0".
|
||||||
|
|
||||||
|
-spec open_iterator(
|
||||||
|
[node()],
|
||||||
|
emqx_topic:words(),
|
||||||
|
emqx_ds:time(),
|
||||||
|
emqx_ds:iterator_id()
|
||||||
|
) ->
|
||||||
|
emqx_rpc:erpc_multicall(ok).
|
||||||
|
open_iterator(Nodes, TopicFilter, StartMS, IteratorID) ->
|
||||||
|
erpc:multicall(
|
||||||
|
Nodes,
|
||||||
|
emqx_persistent_session_ds,
|
||||||
|
do_open_iterator,
|
||||||
|
[TopicFilter, StartMS, IteratorID],
|
||||||
|
?TIMEOUT
|
||||||
|
).
|
|
@ -41,6 +41,7 @@ init_per_suite(Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
|
emqx_config:erase_all(),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
init_per_testcase(TestCase, Config) when
|
init_per_testcase(TestCase, Config) when
|
||||||
|
|
|
@ -17,9 +17,11 @@
|
||||||
-module(emqx_cth_cluster).
|
-module(emqx_cth_cluster).
|
||||||
|
|
||||||
-export([start/2]).
|
-export([start/2]).
|
||||||
-export([stop/1]).
|
-export([stop/1, stop_node/1]).
|
||||||
|
|
||||||
-export([share_load_module/2]).
|
-export([share_load_module/2]).
|
||||||
|
-export([node_name/1, mk_nodespecs/2]).
|
||||||
|
-export([start_apps/2, set_node_opts/2]).
|
||||||
|
|
||||||
-define(APPS_CLUSTERING, [gen_rpc, mria, ekka]).
|
-define(APPS_CLUSTERING, [gen_rpc, mria, ekka]).
|
||||||
|
|
||||||
|
@ -83,7 +85,7 @@ when
|
||||||
}.
|
}.
|
||||||
start(Nodes, ClusterOpts) ->
|
start(Nodes, ClusterOpts) ->
|
||||||
NodeSpecs = mk_nodespecs(Nodes, ClusterOpts),
|
NodeSpecs = mk_nodespecs(Nodes, ClusterOpts),
|
||||||
ct:pal("Starting cluster: ~p", [NodeSpecs]),
|
ct:pal("Starting cluster:\n ~p", [NodeSpecs]),
|
||||||
% 1. Start bare nodes with only basic applications running
|
% 1. Start bare nodes with only basic applications running
|
||||||
_ = emqx_utils:pmap(fun start_node_init/1, NodeSpecs, ?TIMEOUT_NODE_START_MS),
|
_ = emqx_utils:pmap(fun start_node_init/1, NodeSpecs, ?TIMEOUT_NODE_START_MS),
|
||||||
% 2. Start applications needed to enable clustering
|
% 2. Start applications needed to enable clustering
|
||||||
|
@ -237,6 +239,8 @@ default_appspec(emqx_conf, Spec, _NodeSpecs) ->
|
||||||
listeners => allocate_listener_ports([tcp, ssl, ws, wss], Spec)
|
listeners => allocate_listener_ports([tcp, ssl, ws, wss], Spec)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
default_appspec(emqx, Spec, _NodeSpecs) ->
|
||||||
|
#{config => #{listeners => allocate_listener_ports([tcp, ssl, ws, wss], Spec)}};
|
||||||
default_appspec(_App, _, _) ->
|
default_appspec(_App, _, _) ->
|
||||||
#{}.
|
#{}.
|
||||||
|
|
||||||
|
@ -285,17 +289,20 @@ load_apps(Node, #{apps := Apps}) ->
|
||||||
erpc:call(Node, emqx_cth_suite, load_apps, [Apps]).
|
erpc:call(Node, emqx_cth_suite, load_apps, [Apps]).
|
||||||
|
|
||||||
start_apps_clustering(Node, #{apps := Apps} = Spec) ->
|
start_apps_clustering(Node, #{apps := Apps} = Spec) ->
|
||||||
SuiteOpts = maps:with([work_dir], Spec),
|
SuiteOpts = suite_opts(Spec),
|
||||||
AppsClustering = [lists:keyfind(App, 1, Apps) || App <- ?APPS_CLUSTERING],
|
AppsClustering = [lists:keyfind(App, 1, Apps) || App <- ?APPS_CLUSTERING],
|
||||||
_Started = erpc:call(Node, emqx_cth_suite, start, [AppsClustering, SuiteOpts]),
|
_Started = erpc:call(Node, emqx_cth_suite, start, [AppsClustering, SuiteOpts]),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
start_apps(Node, #{apps := Apps} = Spec) ->
|
start_apps(Node, #{apps := Apps} = Spec) ->
|
||||||
SuiteOpts = maps:with([work_dir], Spec),
|
SuiteOpts = suite_opts(Spec),
|
||||||
AppsRest = [AppSpec || AppSpec = {App, _} <- Apps, not lists:member(App, ?APPS_CLUSTERING)],
|
AppsRest = [AppSpec || AppSpec = {App, _} <- Apps, not lists:member(App, ?APPS_CLUSTERING)],
|
||||||
_Started = erpc:call(Node, emqx_cth_suite, start_apps, [AppsRest, SuiteOpts]),
|
_Started = erpc:call(Node, emqx_cth_suite, start_apps, [AppsRest, SuiteOpts]),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
suite_opts(Spec) ->
|
||||||
|
maps:with([work_dir], Spec).
|
||||||
|
|
||||||
maybe_join_cluster(_Node, #{role := replicant}) ->
|
maybe_join_cluster(_Node, #{role := replicant}) ->
|
||||||
ok;
|
ok;
|
||||||
maybe_join_cluster(Node, Spec) ->
|
maybe_join_cluster(Node, Spec) ->
|
||||||
|
|
|
@ -101,7 +101,13 @@ when
|
||||||
%% function will raise an error.
|
%% function will raise an error.
|
||||||
work_dir := file:name()
|
work_dir := file:name()
|
||||||
}.
|
}.
|
||||||
start(Apps, SuiteOpts = #{work_dir := WorkDir}) ->
|
start(Apps, SuiteOpts0 = #{work_dir := WorkDir0}) ->
|
||||||
|
%% when running CT on the whole app, it seems like `priv_dir` is the same on all
|
||||||
|
%% suites and leads to the "clean slate" verificatin to fail.
|
||||||
|
WorkDir = binary_to_list(
|
||||||
|
filename:join([WorkDir0, emqx_guid:to_hexstr(emqx_guid:gen())])
|
||||||
|
),
|
||||||
|
SuiteOpts = SuiteOpts0#{work_dir := WorkDir},
|
||||||
% 1. Prepare appspec instructions
|
% 1. Prepare appspec instructions
|
||||||
AppSpecs = [mk_appspec(App, SuiteOpts) || App <- Apps],
|
AppSpecs = [mk_appspec(App, SuiteOpts) || App <- Apps],
|
||||||
% 2. Load every app so that stuff scanning attributes of loaded modules works
|
% 2. Load every app so that stuff scanning attributes of loaded modules works
|
||||||
|
|
|
@ -17,32 +17,46 @@
|
||||||
-module(emqx_persistent_messages_SUITE).
|
-module(emqx_persistent_messages_SUITE).
|
||||||
|
|
||||||
-include_lib("stdlib/include/assert.hrl").
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
-define(NOW,
|
-define(DS_SHARD, <<"local">>).
|
||||||
(calendar:system_time_to_rfc3339(erlang:system_time(millisecond), [{unit, millisecond}]))
|
|
||||||
).
|
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
{ok, _} = application:ensure_all_started(emqx_durable_storage),
|
%% avoid inter-suite flakiness...
|
||||||
ok = emqx_common_test_helpers:start_apps([], fun
|
%% TODO: remove after other suites start to use `emx_cth_suite'
|
||||||
(emqx) ->
|
application:stop(emqx),
|
||||||
emqx_common_test_helpers:boot_modules(all),
|
application:stop(emqx_durable_storage),
|
||||||
emqx_config:init_load(emqx_schema, <<"persistent_session_store.ds = true">>),
|
WorkDir = ?config(priv_dir, Config),
|
||||||
emqx_app:set_config_loader(?MODULE);
|
TCApps = emqx_cth_suite:start(
|
||||||
(_) ->
|
app_specs(),
|
||||||
ok
|
#{work_dir => WorkDir}
|
||||||
end),
|
),
|
||||||
|
[{tc_apps, TCApps} | Config].
|
||||||
|
|
||||||
|
end_per_suite(Config) ->
|
||||||
|
TCApps = ?config(tc_apps, Config),
|
||||||
|
emqx_cth_suite:stop(TCApps),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
init_per_testcase(t_session_subscription_iterators, Config) ->
|
||||||
|
Cluster = cluster(),
|
||||||
|
Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => ?config(priv_dir, Config)}),
|
||||||
|
[{nodes, Nodes} | Config];
|
||||||
|
init_per_testcase(_TestCase, Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_testcase(t_session_subscription_iterators, Config) ->
|
||||||
emqx_common_test_helpers:stop_apps([]),
|
Nodes = ?config(nodes, Config),
|
||||||
application:stop(emqx_durable_storage),
|
ok = emqx_cth_cluster:stop(Nodes),
|
||||||
|
ok;
|
||||||
|
end_per_testcase(_TestCase, _Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_messages_persisted(_Config) ->
|
t_messages_persisted(_Config) ->
|
||||||
|
@ -76,7 +90,7 @@ t_messages_persisted(_Config) ->
|
||||||
|
|
||||||
ct:pal("Results = ~p", [Results]),
|
ct:pal("Results = ~p", [Results]),
|
||||||
|
|
||||||
Persisted = consume(<<"local">>, {['#'], 0}),
|
Persisted = consume(?DS_SHARD, {['#'], 0}),
|
||||||
|
|
||||||
ct:pal("Persisted = ~p", [Persisted]),
|
ct:pal("Persisted = ~p", [Persisted]),
|
||||||
|
|
||||||
|
@ -88,6 +102,97 @@ t_messages_persisted(_Config) ->
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% TODO: test quic and ws too
|
||||||
|
t_session_subscription_iterators(Config) ->
|
||||||
|
[Node1, Node2] = ?config(nodes, Config),
|
||||||
|
Port = get_mqtt_port(Node1, tcp),
|
||||||
|
Topic = <<"t/topic">>,
|
||||||
|
SubTopicFilter = <<"t/+">>,
|
||||||
|
AnotherTopic = <<"u/another-topic">>,
|
||||||
|
ClientId = <<"myclientid">>,
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
[
|
||||||
|
Payload1,
|
||||||
|
Payload2,
|
||||||
|
Payload3,
|
||||||
|
Payload4
|
||||||
|
] = lists:map(
|
||||||
|
fun(N) -> <<"hello", (integer_to_binary(N))/binary>> end,
|
||||||
|
lists:seq(1, 4)
|
||||||
|
),
|
||||||
|
ct:pal("starting"),
|
||||||
|
{ok, Client} = emqtt:start_link([
|
||||||
|
{port, Port},
|
||||||
|
{clientid, ClientId},
|
||||||
|
{proto_ver, v5}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
ct:pal("publishing 1"),
|
||||||
|
Message1 = emqx_message:make(Topic, Payload1),
|
||||||
|
publish(Node1, Message1),
|
||||||
|
ct:pal("subscribing 1"),
|
||||||
|
{ok, _, [2]} = emqtt:subscribe(Client, SubTopicFilter, qos2),
|
||||||
|
ct:pal("publishing 2"),
|
||||||
|
Message2 = emqx_message:make(Topic, Payload2),
|
||||||
|
publish(Node1, Message2),
|
||||||
|
[_] = receive_messages(1),
|
||||||
|
ct:pal("subscribing 2"),
|
||||||
|
{ok, _, [1]} = emqtt:subscribe(Client, SubTopicFilter, qos1),
|
||||||
|
ct:pal("publishing 3"),
|
||||||
|
Message3 = emqx_message:make(Topic, Payload3),
|
||||||
|
publish(Node1, Message3),
|
||||||
|
[_] = receive_messages(1),
|
||||||
|
ct:pal("publishing 4"),
|
||||||
|
Message4 = emqx_message:make(AnotherTopic, Payload4),
|
||||||
|
publish(Node1, Message4),
|
||||||
|
emqtt:stop(Client),
|
||||||
|
#{
|
||||||
|
messages => [Message1, Message2, Message3, Message4]
|
||||||
|
}
|
||||||
|
end,
|
||||||
|
fun(Results, Trace) ->
|
||||||
|
ct:pal("trace:\n ~p", [Trace]),
|
||||||
|
#{
|
||||||
|
messages := [_Message1, Message2, Message3 | _]
|
||||||
|
} = Results,
|
||||||
|
case ?of_kind(ds_session_subscription_added, Trace) of
|
||||||
|
[] ->
|
||||||
|
%% Since `emqx_durable_storage' is a dependency of `emqx', it gets
|
||||||
|
%% compiled in "prod" mode when running emqx standalone tests.
|
||||||
|
ok;
|
||||||
|
[_ | _] ->
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
#{?snk_kind := ds_session_subscription_added},
|
||||||
|
#{?snk_kind := ds_session_subscription_present}
|
||||||
|
],
|
||||||
|
?of_kind(
|
||||||
|
[
|
||||||
|
ds_session_subscription_added,
|
||||||
|
ds_session_subscription_present
|
||||||
|
],
|
||||||
|
Trace
|
||||||
|
)
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)),
|
||||||
|
{ok, [IteratorId]} = get_all_iterator_ids(Node1),
|
||||||
|
?assertMatch({ok, [IteratorId]}, get_all_iterator_ids(Node2)),
|
||||||
|
ReplayMessages1 = erpc:call(Node1, fun() -> consume(?DS_SHARD, IteratorId) end),
|
||||||
|
ExpectedMessages = [Message2, Message3],
|
||||||
|
%% Note: it is expected that this will break after replayers are in place.
|
||||||
|
%% They might have consumed all the messages by this time.
|
||||||
|
?assertEqual(ExpectedMessages, ReplayMessages1),
|
||||||
|
%% Different DS shard
|
||||||
|
ReplayMessages2 = erpc:call(Node2, fun() -> consume(?DS_SHARD, IteratorId) end),
|
||||||
|
?assertEqual([], ReplayMessages2),
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
connect(ClientId, CleanStart, EI) ->
|
connect(ClientId, CleanStart, EI) ->
|
||||||
|
@ -103,8 +208,11 @@ connect(ClientId, CleanStart, EI) ->
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
Client.
|
Client.
|
||||||
|
|
||||||
consume(Shard, Replay) ->
|
consume(Shard, Replay = {_TopicFiler, _StartMS}) ->
|
||||||
{ok, It} = emqx_ds_storage_layer:make_iterator(Shard, Replay),
|
{ok, It} = emqx_ds_storage_layer:make_iterator(Shard, Replay),
|
||||||
|
consume(It);
|
||||||
|
consume(Shard, IteratorId) when is_binary(IteratorId) ->
|
||||||
|
{ok, It} = emqx_ds_storage_layer:restore_iterator(Shard, IteratorId),
|
||||||
consume(It).
|
consume(It).
|
||||||
|
|
||||||
consume(It) ->
|
consume(It) ->
|
||||||
|
@ -114,3 +222,54 @@ consume(It) ->
|
||||||
none ->
|
none ->
|
||||||
[]
|
[]
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
receive_messages(Count) ->
|
||||||
|
receive_messages(Count, []).
|
||||||
|
|
||||||
|
receive_messages(0, Msgs) ->
|
||||||
|
Msgs;
|
||||||
|
receive_messages(Count, Msgs) ->
|
||||||
|
receive
|
||||||
|
{publish, Msg} ->
|
||||||
|
receive_messages(Count - 1, [Msg | Msgs])
|
||||||
|
after 5_000 ->
|
||||||
|
Msgs
|
||||||
|
end.
|
||||||
|
|
||||||
|
publish(Node, Message) ->
|
||||||
|
erpc:call(Node, emqx, publish, [Message]).
|
||||||
|
|
||||||
|
get_iterator_ids(Node, ClientId) ->
|
||||||
|
Channel = erpc:call(Node, fun() ->
|
||||||
|
[ConnPid] = emqx_cm:lookup_channels(ClientId),
|
||||||
|
sys:get_state(ConnPid)
|
||||||
|
end),
|
||||||
|
emqx_connection:info({channel, {session, iterators}}, Channel).
|
||||||
|
|
||||||
|
app_specs() ->
|
||||||
|
[
|
||||||
|
emqx_durable_storage,
|
||||||
|
{emqx, #{
|
||||||
|
config => #{persistent_session_store => #{ds => true}},
|
||||||
|
override_env => [{boot_modules, [broker, listeners]}]
|
||||||
|
}}
|
||||||
|
].
|
||||||
|
|
||||||
|
cluster() ->
|
||||||
|
Node1 = persistent_messages_SUITE1,
|
||||||
|
Spec = #{
|
||||||
|
role => core,
|
||||||
|
join_to => emqx_cth_cluster:node_name(Node1),
|
||||||
|
apps => app_specs()
|
||||||
|
},
|
||||||
|
[
|
||||||
|
{Node1, Spec},
|
||||||
|
{persistent_messages_SUITE2, Spec}
|
||||||
|
].
|
||||||
|
|
||||||
|
get_mqtt_port(Node, Type) ->
|
||||||
|
{_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
|
||||||
|
Port.
|
||||||
|
|
||||||
|
get_all_iterator_ids(Node) ->
|
||||||
|
erpc:call(Node, emqx_ds_storage_layer, list_iterator_prefix, [?DS_SHARD, <<>>]).
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
|
|
||||||
-include_lib("proper/include/proper.hrl").
|
-include_lib("proper/include/proper.hrl").
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
|
-include("emqx_session.hrl").
|
||||||
-include("emqx_access_control.hrl").
|
-include("emqx_access_control.hrl").
|
||||||
|
|
||||||
%% High level Types
|
%% High level Types
|
||||||
|
@ -132,33 +133,22 @@ clientinfo() ->
|
||||||
sessioninfo() ->
|
sessioninfo() ->
|
||||||
?LET(
|
?LET(
|
||||||
Session,
|
Session,
|
||||||
{session, clientid(),
|
#session{
|
||||||
% id
|
clientid = clientid(),
|
||||||
sessionid(),
|
id = sessionid(),
|
||||||
% is_persistent
|
is_persistent = boolean(),
|
||||||
boolean(),
|
subscriptions = subscriptions(),
|
||||||
% subscriptions
|
max_subscriptions = non_neg_integer(),
|
||||||
subscriptions(),
|
upgrade_qos = boolean(),
|
||||||
% max_subscriptions
|
inflight = inflight(),
|
||||||
non_neg_integer(),
|
mqueue = mqueue(),
|
||||||
% upgrade_qos
|
next_pkt_id = packet_id(),
|
||||||
boolean(),
|
retry_interval = safty_timeout(),
|
||||||
% emqx_inflight:inflight()
|
awaiting_rel = awaiting_rel(),
|
||||||
inflight(),
|
max_awaiting_rel = non_neg_integer(),
|
||||||
% emqx_mqueue:mqueue()
|
await_rel_timeout = safty_timeout(),
|
||||||
mqueue(),
|
created_at = timestamp()
|
||||||
% next_pkt_id
|
},
|
||||||
packet_id(),
|
|
||||||
% retry_interval
|
|
||||||
safty_timeout(),
|
|
||||||
% awaiting_rel
|
|
||||||
awaiting_rel(),
|
|
||||||
% max_awaiting_rel
|
|
||||||
non_neg_integer(),
|
|
||||||
% await_rel_timeout
|
|
||||||
safty_timeout(),
|
|
||||||
% created_at
|
|
||||||
timestamp()},
|
|
||||||
emqx_session:info(Session)
|
emqx_session:info(Session)
|
||||||
).
|
).
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,9 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_ds).
|
-module(emqx_ds).
|
||||||
|
|
||||||
|
-include_lib("stdlib/include/ms_transform.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
%% API:
|
%% API:
|
||||||
-export([ensure_shard/2]).
|
-export([ensure_shard/2]).
|
||||||
%% Messages:
|
%% Messages:
|
||||||
|
@ -39,6 +42,8 @@
|
||||||
message_stats/0,
|
message_stats/0,
|
||||||
message_store_opts/0,
|
message_store_opts/0,
|
||||||
session_id/0,
|
session_id/0,
|
||||||
|
replay/0,
|
||||||
|
replay_id/0,
|
||||||
iterator_id/0,
|
iterator_id/0,
|
||||||
iterator/0,
|
iterator/0,
|
||||||
shard/0,
|
shard/0,
|
||||||
|
@ -56,7 +61,7 @@
|
||||||
|
|
||||||
-type iterator() :: term().
|
-type iterator() :: term().
|
||||||
|
|
||||||
-opaque iterator_id() :: binary().
|
-type iterator_id() :: binary().
|
||||||
|
|
||||||
%%-type session() :: #session{}.
|
%%-type session() :: #session{}.
|
||||||
|
|
||||||
|
@ -73,9 +78,17 @@
|
||||||
|
|
||||||
%% Timestamp
|
%% Timestamp
|
||||||
%% Earliest possible timestamp is 0.
|
%% Earliest possible timestamp is 0.
|
||||||
%% TODO granularity?
|
%% TODO granularity? Currently, we should always use micro second, as that's the unit we
|
||||||
|
%% use in emqx_guid. Otherwise, the iterators won't match the message timestamps.
|
||||||
-type time() :: non_neg_integer().
|
-type time() :: non_neg_integer().
|
||||||
|
|
||||||
|
-type replay_id() :: binary().
|
||||||
|
|
||||||
|
-type replay() :: {
|
||||||
|
_TopicFilter :: emqx_topic:words(),
|
||||||
|
_StartTime :: time()
|
||||||
|
}.
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% API funcions
|
%% API funcions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -121,23 +134,20 @@ message_stats() ->
|
||||||
%%
|
%%
|
||||||
%% Note: session API doesn't handle session takeovers, it's the job of
|
%% Note: session API doesn't handle session takeovers, it's the job of
|
||||||
%% the broker.
|
%% the broker.
|
||||||
-spec session_open(emqx_types:clientid()) -> {_New :: boolean(), session_id(), [iterator_id()]}.
|
-spec session_open(emqx_types:clientid()) -> {_New :: boolean(), session_id()}.
|
||||||
session_open(ClientID) ->
|
session_open(ClientID) ->
|
||||||
{atomic, Ret} =
|
{atomic, Res} =
|
||||||
mria:transaction(
|
mria:transaction(?DS_SHARD, fun() ->
|
||||||
?DS_SHARD,
|
case mnesia:read(?SESSION_TAB, ClientID, write) of
|
||||||
fun() ->
|
[#session{}] ->
|
||||||
case mnesia:read(?SESSION_TAB, ClientID) of
|
{false, ClientID};
|
||||||
[#session{iterators = Iterators}] ->
|
[] ->
|
||||||
{false, ClientID, Iterators};
|
Session = #session{id = ClientID},
|
||||||
[] ->
|
mnesia:write(?SESSION_TAB, Session, write),
|
||||||
Session = #session{id = ClientID, iterators = []},
|
{true, ClientID}
|
||||||
mnesia:write(?SESSION_TAB, Session, write),
|
|
||||||
{true, ClientID, []}
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
),
|
end),
|
||||||
Ret.
|
Res.
|
||||||
|
|
||||||
%% @doc Called when a client reconnects with `clean session=true' or
|
%% @doc Called when a client reconnects with `clean session=true' or
|
||||||
%% during session GC
|
%% during session GC
|
||||||
|
@ -160,10 +170,36 @@ session_suspend(_SessionId) ->
|
||||||
|
|
||||||
%% @doc Called when a client subscribes to a topic. Idempotent.
|
%% @doc Called when a client subscribes to a topic. Idempotent.
|
||||||
-spec session_add_iterator(session_id(), emqx_topic:words()) ->
|
-spec session_add_iterator(session_id(), emqx_topic:words()) ->
|
||||||
{ok, iterator_id()} | {error, session_not_found}.
|
{ok, iterator_id(), time(), _IsNew :: boolean()}.
|
||||||
session_add_iterator(_SessionId, _TopicFilter) ->
|
session_add_iterator(DSSessionId, TopicFilter) ->
|
||||||
%% TODO
|
IteratorRefId = {DSSessionId, TopicFilter},
|
||||||
{ok, <<"">>}.
|
{atomic, Res} =
|
||||||
|
mria:transaction(?DS_SHARD, fun() ->
|
||||||
|
case mnesia:read(?ITERATOR_REF_TAB, IteratorRefId, write) of
|
||||||
|
[] ->
|
||||||
|
{IteratorId, StartMS} = new_iterator_id(DSSessionId),
|
||||||
|
IteratorRef = #iterator_ref{
|
||||||
|
ref_id = IteratorRefId,
|
||||||
|
it_id = IteratorId,
|
||||||
|
start_time = StartMS
|
||||||
|
},
|
||||||
|
ok = mnesia:write(?ITERATOR_REF_TAB, IteratorRef, write),
|
||||||
|
?tp(
|
||||||
|
ds_session_subscription_added,
|
||||||
|
#{iterator_id => IteratorId, session_id => DSSessionId}
|
||||||
|
),
|
||||||
|
IsNew = true,
|
||||||
|
{ok, IteratorId, StartMS, IsNew};
|
||||||
|
[#iterator_ref{it_id = IteratorId, start_time = StartMS}] ->
|
||||||
|
?tp(
|
||||||
|
ds_session_subscription_present,
|
||||||
|
#{iterator_id => IteratorId, session_id => DSSessionId}
|
||||||
|
),
|
||||||
|
IsNew = false,
|
||||||
|
{ok, IteratorId, StartMS, IsNew}
|
||||||
|
end
|
||||||
|
end),
|
||||||
|
Res.
|
||||||
|
|
||||||
%% @doc Called when a client unsubscribes from a topic. Returns `true'
|
%% @doc Called when a client unsubscribes from a topic. Returns `true'
|
||||||
%% if the session contained the subscription or `false' if it wasn't
|
%% if the session contained the subscription or `false' if it wasn't
|
||||||
|
@ -201,3 +237,9 @@ iterator_stats() ->
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
|
-spec new_iterator_id(session_id()) -> {iterator_id(), time()}.
|
||||||
|
new_iterator_id(DSSessionId) ->
|
||||||
|
NowMS = erlang:system_time(microsecond),
|
||||||
|
IteratorId = <<DSSessionId/binary, (emqx_guid:gen())/binary>>,
|
||||||
|
{IteratorId, NowMS}.
|
||||||
|
|
|
@ -25,7 +25,18 @@ init_mnesia() ->
|
||||||
{record_name, session},
|
{record_name, session},
|
||||||
{attributes, record_info(fields, session)}
|
{attributes, record_info(fields, session)}
|
||||||
]
|
]
|
||||||
).
|
),
|
||||||
|
ok = mria:create_table(
|
||||||
|
?ITERATOR_REF_TAB,
|
||||||
|
[
|
||||||
|
{rlog_shard, ?DS_SHARD},
|
||||||
|
{type, ordered_set},
|
||||||
|
{storage, storage()},
|
||||||
|
{record_name, iterator_ref},
|
||||||
|
{attributes, record_info(fields, iterator_ref)}
|
||||||
|
]
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
storage() ->
|
storage() ->
|
||||||
case mria:rocksdb_backend_available() of
|
case mria:rocksdb_backend_available() of
|
||||||
|
|
|
@ -17,11 +17,20 @@
|
||||||
-define(EMQX_DS_HRL, true).
|
-define(EMQX_DS_HRL, true).
|
||||||
|
|
||||||
-define(SESSION_TAB, emqx_ds_session).
|
-define(SESSION_TAB, emqx_ds_session).
|
||||||
|
-define(ITERATOR_REF_TAB, emqx_ds_iterator_ref).
|
||||||
-define(DS_SHARD, emqx_ds_shard).
|
-define(DS_SHARD, emqx_ds_shard).
|
||||||
|
|
||||||
-record(session, {
|
-record(session, {
|
||||||
|
%% same as clientid
|
||||||
id :: emqx_ds:session_id(),
|
id :: emqx_ds:session_id(),
|
||||||
iterators :: [{emqx_topic:words(), emqx_ds:iterator_id()}]
|
%% for future usage
|
||||||
|
props = #{} :: map()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-record(iterator_ref, {
|
||||||
|
ref_id :: {emqx_ds:session_id(), emqx_topic:words()},
|
||||||
|
it_id :: emqx_ds:iterator_id(),
|
||||||
|
start_time :: emqx_ds:time()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
|
@ -1,36 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
-module(emqx_ds_replay).
|
|
||||||
|
|
||||||
%% API:
|
|
||||||
-export([]).
|
|
||||||
|
|
||||||
-export_type([replay_id/0, replay/0]).
|
|
||||||
|
|
||||||
%%================================================================================
|
|
||||||
%% Type declarations
|
|
||||||
%%================================================================================
|
|
||||||
|
|
||||||
-type replay_id() :: binary().
|
|
||||||
|
|
||||||
-type replay() :: {
|
|
||||||
_TopicFilter :: emqx_ds:topic(),
|
|
||||||
_StartTime :: emqx_ds:time()
|
|
||||||
}.
|
|
||||||
|
|
||||||
%%================================================================================
|
|
||||||
%% API funcions
|
|
||||||
%%================================================================================
|
|
||||||
|
|
||||||
%%================================================================================
|
|
||||||
%% behaviour callbacks
|
|
||||||
%%================================================================================
|
|
||||||
|
|
||||||
%%================================================================================
|
|
||||||
%% Internal exports
|
|
||||||
%%================================================================================
|
|
||||||
|
|
||||||
%%================================================================================
|
|
||||||
%% Internal functions
|
|
||||||
%%================================================================================
|
|
|
@ -13,7 +13,15 @@
|
||||||
|
|
||||||
-export([make_iterator/2, next/1]).
|
-export([make_iterator/2, next/1]).
|
||||||
|
|
||||||
-export([preserve_iterator/2, restore_iterator/2, discard_iterator/2]).
|
-export([
|
||||||
|
preserve_iterator/2,
|
||||||
|
restore_iterator/2,
|
||||||
|
discard_iterator/2,
|
||||||
|
ensure_iterator/3,
|
||||||
|
discard_iterator_prefix/2,
|
||||||
|
list_iterator_prefix/2,
|
||||||
|
foldl_iterator_prefix/4
|
||||||
|
]).
|
||||||
|
|
||||||
%% behaviour callbacks:
|
%% behaviour callbacks:
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
|
||||||
|
@ -160,10 +168,10 @@ next(It = #it{module = Mod, data = ItData}) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec preserve_iterator(iterator(), emqx_ds:replay_id()) ->
|
-spec preserve_iterator(iterator(), emqx_ds:iterator_id()) ->
|
||||||
ok | {error, _TODO}.
|
ok | {error, _TODO}.
|
||||||
preserve_iterator(It = #it{}, ReplayID) ->
|
preserve_iterator(It = #it{}, IteratorID) ->
|
||||||
iterator_put_state(ReplayID, It).
|
iterator_put_state(IteratorID, It).
|
||||||
|
|
||||||
-spec restore_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->
|
-spec restore_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->
|
||||||
{ok, iterator()} | {error, _TODO}.
|
{ok, iterator()} | {error, _TODO}.
|
||||||
|
@ -177,11 +185,50 @@ restore_iterator(Shard, ReplayID) ->
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec ensure_iterator(emqx_ds:shard(), emqx_ds:iterator_id(), emqx_ds:replay()) ->
|
||||||
|
{ok, iterator()} | {error, _TODO}.
|
||||||
|
ensure_iterator(Shard, IteratorID, Replay = {_TopicFilter, _StartMS}) ->
|
||||||
|
case restore_iterator(Shard, IteratorID) of
|
||||||
|
{ok, It} ->
|
||||||
|
{ok, It};
|
||||||
|
{error, not_found} ->
|
||||||
|
{ok, It} = make_iterator(Shard, Replay),
|
||||||
|
ok = emqx_ds_storage_layer:preserve_iterator(It, IteratorID),
|
||||||
|
{ok, It};
|
||||||
|
Error ->
|
||||||
|
Error
|
||||||
|
end.
|
||||||
|
|
||||||
-spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->
|
-spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->
|
||||||
ok | {error, _TODO}.
|
ok | {error, _TODO}.
|
||||||
discard_iterator(Shard, ReplayID) ->
|
discard_iterator(Shard, ReplayID) ->
|
||||||
iterator_delete(Shard, ReplayID).
|
iterator_delete(Shard, ReplayID).
|
||||||
|
|
||||||
|
-spec discard_iterator_prefix(emqx_ds:shard(), binary()) ->
|
||||||
|
ok | {error, _TODO}.
|
||||||
|
discard_iterator_prefix(Shard, KeyPrefix) ->
|
||||||
|
case do_discard_iterator_prefix(Shard, KeyPrefix) of
|
||||||
|
{ok, _} -> ok;
|
||||||
|
Error -> Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec list_iterator_prefix(
|
||||||
|
emqx_ds:shard(),
|
||||||
|
binary()
|
||||||
|
) -> {ok, [emqx_ds:iterator_id()]} | {error, _TODO}.
|
||||||
|
list_iterator_prefix(Shard, KeyPrefix) ->
|
||||||
|
do_list_iterator_prefix(Shard, KeyPrefix).
|
||||||
|
|
||||||
|
-spec foldl_iterator_prefix(
|
||||||
|
emqx_ds:shard(),
|
||||||
|
binary(),
|
||||||
|
fun((_Key :: binary(), _Value :: binary(), Acc) -> Acc),
|
||||||
|
Acc
|
||||||
|
) -> {ok, Acc} | {error, _TODO} when
|
||||||
|
Acc :: term().
|
||||||
|
foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) ->
|
||||||
|
do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% behaviour callbacks
|
%% behaviour callbacks
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -344,7 +391,11 @@ open_restore_iterator(#{module := Mod, data := Data}, It = #it{replay = Replay},
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
-define(KEY_REPLAY_STATE(ReplayID), <<(ReplayID)/binary, "rs">>).
|
-define(KEY_REPLAY_STATE(IteratorId), <<(IteratorId)/binary, "rs">>).
|
||||||
|
-define(KEY_REPLAY_STATE_PAT(KeyReplayState), begin
|
||||||
|
<<IteratorId:(size(KeyReplayState) - 2)/binary, "rs">> = (KeyReplayState),
|
||||||
|
IteratorId
|
||||||
|
end).
|
||||||
|
|
||||||
-define(ITERATION_WRITE_OPTS, []).
|
-define(ITERATION_WRITE_OPTS, []).
|
||||||
-define(ITERATION_READ_OPTS, []).
|
-define(ITERATION_READ_OPTS, []).
|
||||||
|
@ -391,6 +442,44 @@ restore_iterator_state(
|
||||||
It = #it{shard = Shard, gen = Gen, replay = {TopicFilter, StartTime}},
|
It = #it{shard = Shard, gen = Gen, replay = {TopicFilter, StartTime}},
|
||||||
open_restore_iterator(meta_get_gen(Shard, Gen), It, State).
|
open_restore_iterator(meta_get_gen(Shard, Gen), It, State).
|
||||||
|
|
||||||
|
do_list_iterator_prefix(Shard, KeyPrefix) ->
|
||||||
|
Fn = fun(K0, _V, Acc) ->
|
||||||
|
K = ?KEY_REPLAY_STATE_PAT(K0),
|
||||||
|
[K | Acc]
|
||||||
|
end,
|
||||||
|
do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, []).
|
||||||
|
|
||||||
|
do_discard_iterator_prefix(Shard, KeyPrefix) ->
|
||||||
|
#db{handle = DBHandle, cf_iterator = CF} = meta_lookup(Shard, db),
|
||||||
|
Fn = fun(K, _V, _Acc) -> ok = rocksdb:delete(DBHandle, CF, K, ?ITERATION_WRITE_OPTS) end,
|
||||||
|
do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, ok).
|
||||||
|
|
||||||
|
do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) ->
|
||||||
|
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
|
||||||
|
case rocksdb:iterator(Handle, CF, ?ITERATION_READ_OPTS) of
|
||||||
|
{ok, It} ->
|
||||||
|
NextAction = {seek, KeyPrefix},
|
||||||
|
do_foldl_iterator_prefix(Handle, CF, It, KeyPrefix, NextAction, Fn, Acc);
|
||||||
|
Error ->
|
||||||
|
Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_foldl_iterator_prefix(DBHandle, CF, It, KeyPrefix, NextAction, Fn, Acc) ->
|
||||||
|
case rocksdb:iterator_move(It, NextAction) of
|
||||||
|
{ok, K = <<KeyPrefix:(size(KeyPrefix))/binary, _/binary>>, V} ->
|
||||||
|
NewAcc = Fn(K, V, Acc),
|
||||||
|
do_foldl_iterator_prefix(DBHandle, CF, It, KeyPrefix, next, Fn, NewAcc);
|
||||||
|
{ok, _K, _V} ->
|
||||||
|
ok = rocksdb:iterator_close(It),
|
||||||
|
{ok, Acc};
|
||||||
|
{error, invalid_iterator} ->
|
||||||
|
ok = rocksdb:iterator_close(It),
|
||||||
|
{ok, Acc};
|
||||||
|
Error ->
|
||||||
|
ok = rocksdb:iterator_close(It),
|
||||||
|
Error
|
||||||
|
end.
|
||||||
|
|
||||||
%% Functions for dealing with the metadata stored persistently in rocksdb
|
%% Functions for dealing with the metadata stored persistently in rocksdb
|
||||||
|
|
||||||
-define(CURRENT_GEN, <<"current">>).
|
-define(CURRENT_GEN, <<"current">>).
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
{application, emqx_durable_storage, [
|
{application, emqx_durable_storage, [
|
||||||
{description, "Message persistence and subscription replays for EMQX"},
|
{description, "Message persistence and subscription replays for EMQX"},
|
||||||
% strict semver, bump manually!
|
% strict semver, bump manually!
|
||||||
{vsn, "0.1.2"},
|
{vsn, "0.1.3"},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel, stdlib, rocksdb, gproc, mria]},
|
{applications, [kernel, stdlib, rocksdb, gproc, mria]},
|
||||||
|
|
|
@ -14,6 +14,8 @@
|
||||||
|
|
||||||
-opaque t() :: ets:tid().
|
-opaque t() :: ets:tid().
|
||||||
|
|
||||||
|
-export_type([t/0]).
|
||||||
|
|
||||||
-spec open() -> t().
|
-spec open() -> t().
|
||||||
open() ->
|
open() ->
|
||||||
ets:new(?MODULE, [ordered_set, {keypos, 1}]).
|
ets:new(?MODULE, [ordered_set, {keypos, 1}]).
|
||||||
|
|
|
@ -927,7 +927,7 @@ format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) ->
|
||||||
retry_interval,
|
retry_interval,
|
||||||
upgrade_qos,
|
upgrade_qos,
|
||||||
zone,
|
zone,
|
||||||
%% sessionID, defined in emqx_session.erl
|
%% session_id, defined in emqx_session.erl
|
||||||
id
|
id
|
||||||
],
|
],
|
||||||
TimesKeys = [created_at, connected_at, disconnected_at],
|
TimesKeys = [created_at, connected_at, disconnected_at],
|
||||||
|
|
|
@ -525,7 +525,6 @@ do_import_conf(RawConf, Opts) ->
|
||||||
Errors =
|
Errors =
|
||||||
lists:foldr(
|
lists:foldr(
|
||||||
fun(Module, ErrorsAcc) ->
|
fun(Module, ErrorsAcc) ->
|
||||||
Module:import_config(RawConf),
|
|
||||||
case Module:import_config(RawConf) of
|
case Module:import_config(RawConf) of
|
||||||
{ok, #{changed := Changed}} ->
|
{ok, #{changed := Changed}} ->
|
||||||
maybe_print_changed(Changed, Opts),
|
maybe_print_changed(Changed, Opts),
|
||||||
|
|
|
@ -440,8 +440,8 @@ create_test_tab(Attributes) ->
|
||||||
|
|
||||||
apps_to_start() ->
|
apps_to_start() ->
|
||||||
[
|
[
|
||||||
{emqx_conf, "dashboard.listeners.http.bind = 0"},
|
|
||||||
{emqx, #{override_env => [{boot_modules, [broker, router]}]}},
|
{emqx, #{override_env => [{boot_modules, [broker, router]}]}},
|
||||||
|
{emqx_conf, #{config => #{dashboard => #{listeners => #{http => #{bind => <<"0">>}}}}}},
|
||||||
emqx_psk,
|
emqx_psk,
|
||||||
emqx_management,
|
emqx_management,
|
||||||
emqx_dashboard,
|
emqx_dashboard,
|
||||||
|
|
Loading…
Reference in New Issue