refactor: move session stuff from `emqx_ds` to `emqx_persistent_session_ds`
Part of https://emqx.atlassian.net/browse/EMQX-10942
This commit is contained in:
parent
b77e5e880a
commit
2358d67908
|
@ -246,7 +246,7 @@ t_session_subscription_idempotency(Config) ->
|
||||||
?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)),
|
?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, #{}, #{SubTopicFilterWords := #{}}},
|
{ok, #{}, #{SubTopicFilterWords := #{}}},
|
||||||
erpc:call(Node1, emqx_ds, session_open, [ClientId])
|
erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId])
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
),
|
),
|
||||||
|
|
|
@ -57,6 +57,7 @@ init() ->
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
ok = emqx_persistent_session_ds_router:init_tables(),
|
ok = emqx_persistent_session_ds_router:init_tables(),
|
||||||
|
ok = emqx_persistent_session_ds:create_tables(),
|
||||||
ok
|
ok
|
||||||
end).
|
end).
|
||||||
|
|
||||||
|
|
|
@ -55,6 +55,13 @@
|
||||||
terminate/2
|
terminate/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
%% session table operations
|
||||||
|
-export([create_tables/0]).
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
-export([session_open/1]).
|
||||||
|
-endif.
|
||||||
|
|
||||||
%% RPC
|
%% RPC
|
||||||
-export([
|
-export([
|
||||||
ensure_iterator_closed_on_all_shards/1,
|
ensure_iterator_closed_on_all_shards/1,
|
||||||
|
@ -71,8 +78,13 @@
|
||||||
-define(DEFAULT_KEYSPACE, default).
|
-define(DEFAULT_KEYSPACE, default).
|
||||||
-define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}).
|
-define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}).
|
||||||
|
|
||||||
-type id() :: emqx_ds:session_id().
|
%% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be
|
||||||
|
%% an atom, in theory (?).
|
||||||
|
-type id() :: binary().
|
||||||
-type iterator() :: emqx_ds:iterator().
|
-type iterator() :: emqx_ds:iterator().
|
||||||
|
-type iterator_id() :: emqx_ds:iterator_id().
|
||||||
|
-type topic_filter() :: emqx_ds:topic_filter().
|
||||||
|
-type iterators() :: #{topic_filter() => iterator()}.
|
||||||
-type session() :: #{
|
-type session() :: #{
|
||||||
%% Client ID
|
%% Client ID
|
||||||
id := id(),
|
id := id(),
|
||||||
|
@ -92,6 +104,8 @@
|
||||||
-type conninfo() :: emqx_session:conninfo().
|
-type conninfo() :: emqx_session:conninfo().
|
||||||
-type replies() :: emqx_session:replies().
|
-type replies() :: emqx_session:replies().
|
||||||
|
|
||||||
|
-export_type([id/0]).
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
-spec create(clientinfo(), conninfo(), emqx_session:conf()) ->
|
-spec create(clientinfo(), conninfo(), emqx_session:conf()) ->
|
||||||
|
@ -118,11 +132,11 @@ open(#{clientid := ClientID}, _ConnInfo) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
ensure_session(ClientID, Conf) ->
|
ensure_session(ClientID, Conf) ->
|
||||||
{ok, Session, #{}} = emqx_ds:session_ensure_new(ClientID, Conf),
|
{ok, Session, #{}} = session_ensure_new(ClientID, Conf),
|
||||||
Session#{iterators => #{}}.
|
Session#{iterators => #{}}.
|
||||||
|
|
||||||
open_session(ClientID) ->
|
open_session(ClientID) ->
|
||||||
case emqx_ds:session_open(ClientID) of
|
case session_open(ClientID) of
|
||||||
{ok, Session, Iterators} ->
|
{ok, Session, Iterators} ->
|
||||||
Session#{iterators => prep_iterators(Iterators)};
|
Session#{iterators => prep_iterators(Iterators)};
|
||||||
false ->
|
false ->
|
||||||
|
@ -144,7 +158,7 @@ destroy(#{clientid := ClientID}) ->
|
||||||
|
|
||||||
destroy_session(ClientID) ->
|
destroy_session(ClientID) ->
|
||||||
_ = ensure_all_iterators_closed(ClientID),
|
_ = ensure_all_iterators_closed(ClientID),
|
||||||
emqx_ds:session_drop(ClientID).
|
session_drop(ClientID).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Info, Stats
|
%% Info, Stats
|
||||||
|
@ -352,7 +366,7 @@ add_subscription(TopicFilterBin, SubOpts, DSSessionID) ->
|
||||||
% and iterator information can be reconstructed from this table, if needed.
|
% and iterator information can be reconstructed from this table, if needed.
|
||||||
ok = emqx_persistent_session_ds_router:do_add_route(TopicFilterBin, DSSessionID),
|
ok = emqx_persistent_session_ds_router:do_add_route(TopicFilterBin, DSSessionID),
|
||||||
TopicFilter = emqx_topic:words(TopicFilterBin),
|
TopicFilter = emqx_topic:words(TopicFilterBin),
|
||||||
{ok, Iterator, IsNew} = emqx_ds:session_add_iterator(
|
{ok, Iterator, IsNew} = session_add_iterator(
|
||||||
DSSessionID, TopicFilter, SubOpts
|
DSSessionID, TopicFilter, SubOpts
|
||||||
),
|
),
|
||||||
Ctx = #{iterator => Iterator, is_new => IsNew},
|
Ctx = #{iterator => Iterator, is_new => IsNew},
|
||||||
|
@ -368,7 +382,7 @@ add_subscription(TopicFilterBin, SubOpts, DSSessionID) ->
|
||||||
iterator().
|
iterator().
|
||||||
update_subscription(TopicFilterBin, Iterator, SubOpts, DSSessionID) ->
|
update_subscription(TopicFilterBin, Iterator, SubOpts, DSSessionID) ->
|
||||||
TopicFilter = emqx_topic:words(TopicFilterBin),
|
TopicFilter = emqx_topic:words(TopicFilterBin),
|
||||||
{ok, NIterator, false} = emqx_ds:session_add_iterator(
|
{ok, NIterator, false} = session_add_iterator(
|
||||||
DSSessionID, TopicFilter, SubOpts
|
DSSessionID, TopicFilter, SubOpts
|
||||||
),
|
),
|
||||||
ok = ?tp(persistent_session_ds_iterator_updated, #{iterator => Iterator}),
|
ok = ?tp(persistent_session_ds_iterator_updated, #{iterator => Iterator}),
|
||||||
|
@ -415,7 +429,7 @@ del_subscription(TopicFilterBin, #{id := IteratorID}, DSSessionID) ->
|
||||||
?tp_span(
|
?tp_span(
|
||||||
persistent_session_ds_iterator_delete,
|
persistent_session_ds_iterator_delete,
|
||||||
Ctx,
|
Ctx,
|
||||||
emqx_ds:session_del_iterator(DSSessionID, TopicFilter)
|
session_del_iterator(DSSessionID, TopicFilter)
|
||||||
),
|
),
|
||||||
ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilterBin, DSSessionID).
|
ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilterBin, DSSessionID).
|
||||||
|
|
||||||
|
@ -448,3 +462,210 @@ ensure_all_iterators_closed(DSSessionID) ->
|
||||||
do_ensure_all_iterators_closed(DSSessionID) ->
|
do_ensure_all_iterators_closed(DSSessionID) ->
|
||||||
ok = emqx_ds_storage_layer:discard_iterator_prefix(?DS_SHARD, DSSessionID),
|
ok = emqx_ds_storage_layer:discard_iterator_prefix(?DS_SHARD, DSSessionID),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Session tables operations
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(SESSION_TAB, emqx_ds_session).
|
||||||
|
-define(ITERATOR_REF_TAB, emqx_ds_iterator_ref).
|
||||||
|
-define(DS_MRIA_SHARD, emqx_ds_shard).
|
||||||
|
|
||||||
|
-record(session, {
|
||||||
|
%% same as clientid
|
||||||
|
id :: id(),
|
||||||
|
%% creation time
|
||||||
|
created_at :: _Millisecond :: non_neg_integer(),
|
||||||
|
expires_at = never :: _Millisecond :: non_neg_integer() | never,
|
||||||
|
%% for future usage
|
||||||
|
props = #{} :: map()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-record(iterator_ref, {
|
||||||
|
ref_id :: {id(), emqx_ds:topic_filter()},
|
||||||
|
it_id :: emqx_ds:iterator_id(),
|
||||||
|
start_time :: emqx_ds:time(),
|
||||||
|
props = #{} :: map()
|
||||||
|
}).
|
||||||
|
|
||||||
|
create_tables() ->
|
||||||
|
ok = mria:create_table(
|
||||||
|
?SESSION_TAB,
|
||||||
|
[
|
||||||
|
{rlog_shard, ?DS_MRIA_SHARD},
|
||||||
|
{type, set},
|
||||||
|
{storage, storage()},
|
||||||
|
{record_name, session},
|
||||||
|
{attributes, record_info(fields, session)}
|
||||||
|
]
|
||||||
|
),
|
||||||
|
ok = mria:create_table(
|
||||||
|
?ITERATOR_REF_TAB,
|
||||||
|
[
|
||||||
|
{rlog_shard, ?DS_MRIA_SHARD},
|
||||||
|
{type, ordered_set},
|
||||||
|
{storage, storage()},
|
||||||
|
{record_name, iterator_ref},
|
||||||
|
{attributes, record_info(fields, iterator_ref)}
|
||||||
|
]
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
-dialyzer({nowarn_function, storage/0}).
|
||||||
|
storage() ->
|
||||||
|
%% FIXME: This is a temporary workaround to avoid crashes when starting on Windows
|
||||||
|
case mria:rocksdb_backend_available() of
|
||||||
|
true ->
|
||||||
|
rocksdb_copies;
|
||||||
|
_ ->
|
||||||
|
disc_copies
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% @doc Called when a client connects. This function looks up a
|
||||||
|
%% session or returns `false` if previous one couldn't be found.
|
||||||
|
%%
|
||||||
|
%% This function also spawns replay agents for each iterator.
|
||||||
|
%%
|
||||||
|
%% Note: session API doesn't handle session takeovers, it's the job of
|
||||||
|
%% the broker.
|
||||||
|
-spec session_open(id()) ->
|
||||||
|
{ok, session(), iterators()} | false.
|
||||||
|
session_open(SessionId) ->
|
||||||
|
transaction(fun() ->
|
||||||
|
case mnesia:read(?SESSION_TAB, SessionId, write) of
|
||||||
|
[Record = #session{}] ->
|
||||||
|
Session = export_record(Record),
|
||||||
|
IteratorRefs = session_read_iterators(SessionId),
|
||||||
|
Iterators = export_iterators(IteratorRefs),
|
||||||
|
{ok, Session, Iterators};
|
||||||
|
[] ->
|
||||||
|
false
|
||||||
|
end
|
||||||
|
end).
|
||||||
|
|
||||||
|
-spec session_ensure_new(id(), _Props :: map()) ->
|
||||||
|
{ok, session(), iterators()}.
|
||||||
|
session_ensure_new(SessionId, Props) ->
|
||||||
|
transaction(fun() ->
|
||||||
|
ok = session_drop_iterators(SessionId),
|
||||||
|
Session = export_record(session_create(SessionId, Props)),
|
||||||
|
{ok, Session, #{}}
|
||||||
|
end).
|
||||||
|
|
||||||
|
session_create(SessionId, Props) ->
|
||||||
|
Session = #session{
|
||||||
|
id = SessionId,
|
||||||
|
created_at = erlang:system_time(millisecond),
|
||||||
|
expires_at = never,
|
||||||
|
props = Props
|
||||||
|
},
|
||||||
|
ok = mnesia:write(?SESSION_TAB, Session, write),
|
||||||
|
Session.
|
||||||
|
|
||||||
|
%% @doc Called when a client reconnects with `clean session=true' or
|
||||||
|
%% during session GC
|
||||||
|
-spec session_drop(id()) -> ok.
|
||||||
|
session_drop(DSSessionId) ->
|
||||||
|
transaction(fun() ->
|
||||||
|
%% TODO: ensure all iterators from this clientid are closed?
|
||||||
|
ok = session_drop_iterators(DSSessionId),
|
||||||
|
ok = mnesia:delete(?SESSION_TAB, DSSessionId, write)
|
||||||
|
end).
|
||||||
|
|
||||||
|
session_drop_iterators(DSSessionId) ->
|
||||||
|
IteratorRefs = session_read_iterators(DSSessionId),
|
||||||
|
ok = lists:foreach(fun session_del_iterator/1, IteratorRefs).
|
||||||
|
|
||||||
|
%% @doc Called when a client subscribes to a topic. Idempotent.
|
||||||
|
-spec session_add_iterator(id(), topic_filter(), _Props :: map()) ->
|
||||||
|
{ok, iterator(), _IsNew :: boolean()}.
|
||||||
|
session_add_iterator(DSSessionId, TopicFilter, Props) ->
|
||||||
|
IteratorRefId = {DSSessionId, TopicFilter},
|
||||||
|
transaction(fun() ->
|
||||||
|
case mnesia:read(?ITERATOR_REF_TAB, IteratorRefId, write) of
|
||||||
|
[] ->
|
||||||
|
IteratorRef = session_insert_iterator(DSSessionId, TopicFilter, Props),
|
||||||
|
Iterator = export_record(IteratorRef),
|
||||||
|
?tp(
|
||||||
|
ds_session_subscription_added,
|
||||||
|
#{iterator => Iterator, session_id => DSSessionId}
|
||||||
|
),
|
||||||
|
{ok, Iterator, _IsNew = true};
|
||||||
|
[#iterator_ref{} = IteratorRef] ->
|
||||||
|
NIteratorRef = session_update_iterator(IteratorRef, Props),
|
||||||
|
NIterator = export_record(NIteratorRef),
|
||||||
|
?tp(
|
||||||
|
ds_session_subscription_present,
|
||||||
|
#{iterator => NIterator, session_id => DSSessionId}
|
||||||
|
),
|
||||||
|
{ok, NIterator, _IsNew = false}
|
||||||
|
end
|
||||||
|
end).
|
||||||
|
|
||||||
|
session_insert_iterator(DSSessionId, TopicFilter, Props) ->
|
||||||
|
{IteratorId, StartMS} = new_iterator_id(DSSessionId),
|
||||||
|
IteratorRef = #iterator_ref{
|
||||||
|
ref_id = {DSSessionId, TopicFilter},
|
||||||
|
it_id = IteratorId,
|
||||||
|
start_time = StartMS,
|
||||||
|
props = Props
|
||||||
|
},
|
||||||
|
ok = mnesia:write(?ITERATOR_REF_TAB, IteratorRef, write),
|
||||||
|
IteratorRef.
|
||||||
|
|
||||||
|
session_update_iterator(IteratorRef, Props) ->
|
||||||
|
NIteratorRef = IteratorRef#iterator_ref{props = Props},
|
||||||
|
ok = mnesia:write(?ITERATOR_REF_TAB, NIteratorRef, write),
|
||||||
|
NIteratorRef.
|
||||||
|
|
||||||
|
%% @doc Called when a client unsubscribes from a topic.
|
||||||
|
-spec session_del_iterator(id(), topic_filter()) -> ok.
|
||||||
|
session_del_iterator(DSSessionId, TopicFilter) ->
|
||||||
|
IteratorRefId = {DSSessionId, TopicFilter},
|
||||||
|
transaction(fun() ->
|
||||||
|
mnesia:delete(?ITERATOR_REF_TAB, IteratorRefId, write)
|
||||||
|
end).
|
||||||
|
|
||||||
|
session_del_iterator(#iterator_ref{ref_id = IteratorRefId}) ->
|
||||||
|
mnesia:delete(?ITERATOR_REF_TAB, IteratorRefId, write).
|
||||||
|
|
||||||
|
session_read_iterators(DSSessionId) ->
|
||||||
|
% NOTE: somewhat convoluted way to trick dialyzer
|
||||||
|
Pat = erlang:make_tuple(record_info(size, iterator_ref), '_', [
|
||||||
|
{1, iterator_ref},
|
||||||
|
{#iterator_ref.ref_id, {DSSessionId, '_'}}
|
||||||
|
]),
|
||||||
|
mnesia:match_object(?ITERATOR_REF_TAB, Pat, read).
|
||||||
|
|
||||||
|
-spec new_iterator_id(id()) -> {iterator_id(), emqx_ds:time()}.
|
||||||
|
new_iterator_id(DSSessionId) ->
|
||||||
|
NowMS = erlang:system_time(microsecond),
|
||||||
|
IteratorId = <<DSSessionId/binary, (emqx_guid:gen())/binary>>,
|
||||||
|
{IteratorId, NowMS}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
transaction(Fun) ->
|
||||||
|
{atomic, Res} = mria:transaction(?DS_MRIA_SHARD, Fun),
|
||||||
|
Res.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
export_iterators(IteratorRefs) ->
|
||||||
|
lists:foldl(
|
||||||
|
fun(IteratorRef = #iterator_ref{ref_id = {_DSSessionId, TopicFilter}}, Acc) ->
|
||||||
|
Acc#{TopicFilter => export_record(IteratorRef)}
|
||||||
|
end,
|
||||||
|
#{},
|
||||||
|
IteratorRefs
|
||||||
|
).
|
||||||
|
|
||||||
|
export_record(#session{} = Record) ->
|
||||||
|
export_record(Record, #session.id, [id, created_at, expires_at, props], #{});
|
||||||
|
export_record(#iterator_ref{} = Record) ->
|
||||||
|
export_record(Record, #iterator_ref.it_id, [id, start_time, props], #{}).
|
||||||
|
|
||||||
|
export_record(Record, I, [Field | Rest], Acc) ->
|
||||||
|
export_record(Record, I + 1, Rest, Acc#{Field => element(I, Record)});
|
||||||
|
export_record(_, _, [], Acc) ->
|
||||||
|
Acc.
|
||||||
|
|
|
@ -21,7 +21,7 @@
|
||||||
|
|
||||||
-record(ps_route, {
|
-record(ps_route, {
|
||||||
topic :: binary(),
|
topic :: binary(),
|
||||||
dest :: emqx_ds:session_id()
|
dest :: emqx_persistent_session_ds:id()
|
||||||
}).
|
}).
|
||||||
-record(ps_routeidx, {
|
-record(ps_routeidx, {
|
||||||
entry :: emqx_topic_index:key(emqx_persistent_session_ds_router:dest()),
|
entry :: emqx_topic_index:key(emqx_persistent_session_ds_router:dest()),
|
||||||
|
|
|
@ -40,7 +40,7 @@
|
||||||
-export([has_route/2]).
|
-export([has_route/2]).
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
-type dest() :: emqx_ds:session_id().
|
-type dest() :: emqx_persistent_session_ds:id().
|
||||||
|
|
||||||
-export_type([dest/0]).
|
-export_type([dest/0]).
|
||||||
|
|
||||||
|
@ -159,7 +159,7 @@ print_routes(Topic) ->
|
||||||
match_routes(Topic)
|
match_routes(Topic)
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec cleanup_routes(emqx_ds:session_id()) -> ok.
|
-spec cleanup_routes(emqx_persistent_session_ds:id()) -> ok.
|
||||||
cleanup_routes(DSSessionId) ->
|
cleanup_routes(DSSessionId) ->
|
||||||
%% NOTE
|
%% NOTE
|
||||||
%% No point in transaction here because all the operations on filters table are dirty.
|
%% No point in transaction here because all the operations on filters table are dirty.
|
||||||
|
|
|
@ -66,7 +66,7 @@ close_iterator(Nodes, IteratorID) ->
|
||||||
|
|
||||||
-spec close_all_iterators(
|
-spec close_all_iterators(
|
||||||
[node()],
|
[node()],
|
||||||
emqx_ds:session_id()
|
emqx_persistent_session_ds:id()
|
||||||
) ->
|
) ->
|
||||||
emqx_rpc:erpc_multicall(ok).
|
emqx_rpc:erpc_multicall(ok).
|
||||||
close_all_iterators(Nodes, DSSessionID) ->
|
close_all_iterators(Nodes, DSSessionID) ->
|
||||||
|
|
|
@ -24,17 +24,6 @@
|
||||||
-export([message_store/2, message_store/1, message_stats/0]).
|
-export([message_store/2, message_store/1, message_stats/0]).
|
||||||
%% Iterator:
|
%% Iterator:
|
||||||
-export([iterator_update/2, iterator_next/1, iterator_stats/0]).
|
-export([iterator_update/2, iterator_next/1, iterator_stats/0]).
|
||||||
%% Session:
|
|
||||||
-export([
|
|
||||||
session_open/1,
|
|
||||||
session_ensure_new/2,
|
|
||||||
session_drop/1,
|
|
||||||
session_suspend/1,
|
|
||||||
session_add_iterator/3,
|
|
||||||
session_get_iterator_id/2,
|
|
||||||
session_del_iterator/2,
|
|
||||||
session_stats/0
|
|
||||||
]).
|
|
||||||
|
|
||||||
%% internal exports:
|
%% internal exports:
|
||||||
-export([]).
|
-export([]).
|
||||||
|
@ -44,7 +33,6 @@
|
||||||
message_id/0,
|
message_id/0,
|
||||||
message_stats/0,
|
message_stats/0,
|
||||||
message_store_opts/0,
|
message_store_opts/0,
|
||||||
session_id/0,
|
|
||||||
replay/0,
|
replay/0,
|
||||||
replay_id/0,
|
replay_id/0,
|
||||||
iterator_id/0,
|
iterator_id/0,
|
||||||
|
@ -56,27 +44,10 @@
|
||||||
time/0
|
time/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include("emqx_ds_int.hrl").
|
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% Type declarations
|
%% Type declarations
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
%% Session
|
|
||||||
%% See also: `#session{}`.
|
|
||||||
-type session() :: #{
|
|
||||||
id := emqx_ds:session_id(),
|
|
||||||
created_at := _Millisecond :: non_neg_integer(),
|
|
||||||
expires_at := _Millisecond :: non_neg_integer() | never,
|
|
||||||
props := map()
|
|
||||||
}.
|
|
||||||
|
|
||||||
-type iterators() :: #{topic_filter() => iterator()}.
|
|
||||||
|
|
||||||
%% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be
|
|
||||||
%% an atom, in theory (?).
|
|
||||||
-type session_id() :: binary().
|
|
||||||
|
|
||||||
-type iterator() :: term().
|
-type iterator() :: term().
|
||||||
|
|
||||||
-type iterator_id() :: binary().
|
-type iterator_id() :: binary().
|
||||||
|
@ -148,144 +119,6 @@ message_stats() ->
|
||||||
%% Session
|
%% Session
|
||||||
%%--------------------------------------------------------------------------------
|
%%--------------------------------------------------------------------------------
|
||||||
|
|
||||||
%% @doc Called when a client connects. This function looks up a
|
|
||||||
%% session or returns `false` if previous one couldn't be found.
|
|
||||||
%%
|
|
||||||
%% This function also spawns replay agents for each iterator.
|
|
||||||
%%
|
|
||||||
%% Note: session API doesn't handle session takeovers, it's the job of
|
|
||||||
%% the broker.
|
|
||||||
-spec session_open(session_id()) ->
|
|
||||||
{ok, session(), iterators()} | false.
|
|
||||||
session_open(SessionId) ->
|
|
||||||
transaction(fun() ->
|
|
||||||
case mnesia:read(?SESSION_TAB, SessionId, write) of
|
|
||||||
[Record = #session{}] ->
|
|
||||||
Session = export_record(Record),
|
|
||||||
IteratorRefs = session_read_iterators(SessionId),
|
|
||||||
Iterators = export_iterators(IteratorRefs),
|
|
||||||
{ok, Session, Iterators};
|
|
||||||
[] ->
|
|
||||||
false
|
|
||||||
end
|
|
||||||
end).
|
|
||||||
|
|
||||||
-spec session_ensure_new(session_id(), _Props :: map()) ->
|
|
||||||
{ok, session(), iterators()}.
|
|
||||||
session_ensure_new(SessionId, Props) ->
|
|
||||||
transaction(fun() ->
|
|
||||||
ok = session_drop_iterators(SessionId),
|
|
||||||
Session = export_record(session_create(SessionId, Props)),
|
|
||||||
{ok, Session, #{}}
|
|
||||||
end).
|
|
||||||
|
|
||||||
session_create(SessionId, Props) ->
|
|
||||||
Session = #session{
|
|
||||||
id = SessionId,
|
|
||||||
created_at = erlang:system_time(millisecond),
|
|
||||||
expires_at = never,
|
|
||||||
props = Props
|
|
||||||
},
|
|
||||||
ok = mnesia:write(?SESSION_TAB, Session, write),
|
|
||||||
Session.
|
|
||||||
|
|
||||||
%% @doc Called when a client reconnects with `clean session=true' or
|
|
||||||
%% during session GC
|
|
||||||
-spec session_drop(session_id()) -> ok.
|
|
||||||
session_drop(DSSessionId) ->
|
|
||||||
transaction(fun() ->
|
|
||||||
%% TODO: ensure all iterators from this clientid are closed?
|
|
||||||
ok = session_drop_iterators(DSSessionId),
|
|
||||||
ok = mnesia:delete(?SESSION_TAB, DSSessionId, write)
|
|
||||||
end).
|
|
||||||
|
|
||||||
session_drop_iterators(DSSessionId) ->
|
|
||||||
IteratorRefs = session_read_iterators(DSSessionId),
|
|
||||||
ok = lists:foreach(fun session_del_iterator/1, IteratorRefs).
|
|
||||||
|
|
||||||
%% @doc Called when a client disconnects. This function terminates all
|
|
||||||
%% active processes related to the session.
|
|
||||||
-spec session_suspend(session_id()) -> ok | {error, session_not_found}.
|
|
||||||
session_suspend(_SessionId) ->
|
|
||||||
%% TODO
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%% @doc Called when a client subscribes to a topic. Idempotent.
|
|
||||||
-spec session_add_iterator(session_id(), topic_filter(), _Props :: map()) ->
|
|
||||||
{ok, iterator(), _IsNew :: boolean()}.
|
|
||||||
session_add_iterator(DSSessionId, TopicFilter, Props) ->
|
|
||||||
IteratorRefId = {DSSessionId, TopicFilter},
|
|
||||||
transaction(fun() ->
|
|
||||||
case mnesia:read(?ITERATOR_REF_TAB, IteratorRefId, write) of
|
|
||||||
[] ->
|
|
||||||
IteratorRef = session_insert_iterator(DSSessionId, TopicFilter, Props),
|
|
||||||
Iterator = export_record(IteratorRef),
|
|
||||||
?tp(
|
|
||||||
ds_session_subscription_added,
|
|
||||||
#{iterator => Iterator, session_id => DSSessionId}
|
|
||||||
),
|
|
||||||
{ok, Iterator, _IsNew = true};
|
|
||||||
[#iterator_ref{} = IteratorRef] ->
|
|
||||||
NIteratorRef = session_update_iterator(IteratorRef, Props),
|
|
||||||
NIterator = export_record(NIteratorRef),
|
|
||||||
?tp(
|
|
||||||
ds_session_subscription_present,
|
|
||||||
#{iterator => NIterator, session_id => DSSessionId}
|
|
||||||
),
|
|
||||||
{ok, NIterator, _IsNew = false}
|
|
||||||
end
|
|
||||||
end).
|
|
||||||
|
|
||||||
session_insert_iterator(DSSessionId, TopicFilter, Props) ->
|
|
||||||
{IteratorId, StartMS} = new_iterator_id(DSSessionId),
|
|
||||||
IteratorRef = #iterator_ref{
|
|
||||||
ref_id = {DSSessionId, TopicFilter},
|
|
||||||
it_id = IteratorId,
|
|
||||||
start_time = StartMS,
|
|
||||||
props = Props
|
|
||||||
},
|
|
||||||
ok = mnesia:write(?ITERATOR_REF_TAB, IteratorRef, write),
|
|
||||||
IteratorRef.
|
|
||||||
|
|
||||||
session_update_iterator(IteratorRef, Props) ->
|
|
||||||
NIteratorRef = IteratorRef#iterator_ref{props = Props},
|
|
||||||
ok = mnesia:write(?ITERATOR_REF_TAB, NIteratorRef, write),
|
|
||||||
NIteratorRef.
|
|
||||||
|
|
||||||
-spec session_get_iterator_id(session_id(), topic_filter()) ->
|
|
||||||
{ok, iterator_id()} | {error, not_found}.
|
|
||||||
session_get_iterator_id(DSSessionId, TopicFilter) ->
|
|
||||||
IteratorRefId = {DSSessionId, TopicFilter},
|
|
||||||
case mnesia:dirty_read(?ITERATOR_REF_TAB, IteratorRefId) of
|
|
||||||
[] ->
|
|
||||||
{error, not_found};
|
|
||||||
[#iterator_ref{it_id = IteratorId}] ->
|
|
||||||
{ok, IteratorId}
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% @doc Called when a client unsubscribes from a topic.
|
|
||||||
-spec session_del_iterator(session_id(), topic_filter()) -> ok.
|
|
||||||
session_del_iterator(DSSessionId, TopicFilter) ->
|
|
||||||
IteratorRefId = {DSSessionId, TopicFilter},
|
|
||||||
transaction(fun() ->
|
|
||||||
mnesia:delete(?ITERATOR_REF_TAB, IteratorRefId, write)
|
|
||||||
end).
|
|
||||||
|
|
||||||
session_del_iterator(#iterator_ref{ref_id = IteratorRefId}) ->
|
|
||||||
mnesia:delete(?ITERATOR_REF_TAB, IteratorRefId, write).
|
|
||||||
|
|
||||||
session_read_iterators(DSSessionId) ->
|
|
||||||
% NOTE: somewhat convoluted way to trick dialyzer
|
|
||||||
Pat = erlang:make_tuple(record_info(size, iterator_ref), '_', [
|
|
||||||
{1, iterator_ref},
|
|
||||||
{#iterator_ref.ref_id, {DSSessionId, '_'}}
|
|
||||||
]),
|
|
||||||
mnesia:match_object(?ITERATOR_REF_TAB, Pat, read).
|
|
||||||
|
|
||||||
-spec session_stats() -> #{}.
|
|
||||||
session_stats() ->
|
|
||||||
#{}.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------------------
|
%%--------------------------------------------------------------------------------
|
||||||
%% Iterator (pull API)
|
%% Iterator (pull API)
|
||||||
%%--------------------------------------------------------------------------------
|
%%--------------------------------------------------------------------------------
|
||||||
|
@ -309,36 +142,3 @@ iterator_stats() ->
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
-spec new_iterator_id(session_id()) -> {iterator_id(), time()}.
|
|
||||||
new_iterator_id(DSSessionId) ->
|
|
||||||
NowMS = erlang:system_time(microsecond),
|
|
||||||
IteratorId = <<DSSessionId/binary, (emqx_guid:gen())/binary>>,
|
|
||||||
{IteratorId, NowMS}.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
transaction(Fun) ->
|
|
||||||
{atomic, Res} = mria:transaction(?DS_MRIA_SHARD, Fun),
|
|
||||||
Res.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
export_iterators(IteratorRefs) ->
|
|
||||||
lists:foldl(
|
|
||||||
fun(IteratorRef = #iterator_ref{ref_id = {_DSSessionId, TopicFilter}}, Acc) ->
|
|
||||||
Acc#{TopicFilter => export_record(IteratorRef)}
|
|
||||||
end,
|
|
||||||
#{},
|
|
||||||
IteratorRefs
|
|
||||||
).
|
|
||||||
|
|
||||||
export_record(#session{} = Record) ->
|
|
||||||
export_record(Record, #session.id, [id, created_at, expires_at, props], #{});
|
|
||||||
export_record(#iterator_ref{} = Record) ->
|
|
||||||
export_record(Record, #iterator_ref.it_id, [id, start_time, props], #{}).
|
|
||||||
|
|
||||||
export_record(Record, I, [Field | Rest], Acc) ->
|
|
||||||
export_record(Record, I + 1, Rest, Acc#{Field => element(I, Record)});
|
|
||||||
export_record(_, _, [], Acc) ->
|
|
||||||
Acc.
|
|
||||||
|
|
|
@ -4,44 +4,7 @@
|
||||||
|
|
||||||
-module(emqx_ds_app).
|
-module(emqx_ds_app).
|
||||||
|
|
||||||
-dialyzer({nowarn_function, storage/0}).
|
-export([start/2]).
|
||||||
|
|
||||||
-export([start/2, storage/0]).
|
|
||||||
|
|
||||||
-include("emqx_ds_int.hrl").
|
|
||||||
|
|
||||||
start(_Type, _Args) ->
|
start(_Type, _Args) ->
|
||||||
init_mnesia(),
|
|
||||||
emqx_ds_sup:start_link().
|
emqx_ds_sup:start_link().
|
||||||
|
|
||||||
init_mnesia() ->
|
|
||||||
ok = mria:create_table(
|
|
||||||
?SESSION_TAB,
|
|
||||||
[
|
|
||||||
{rlog_shard, ?DS_MRIA_SHARD},
|
|
||||||
{type, set},
|
|
||||||
{storage, storage()},
|
|
||||||
{record_name, session},
|
|
||||||
{attributes, record_info(fields, session)}
|
|
||||||
]
|
|
||||||
),
|
|
||||||
ok = mria:create_table(
|
|
||||||
?ITERATOR_REF_TAB,
|
|
||||||
[
|
|
||||||
{rlog_shard, ?DS_MRIA_SHARD},
|
|
||||||
{type, ordered_set},
|
|
||||||
{storage, storage()},
|
|
||||||
{record_name, iterator_ref},
|
|
||||||
{attributes, record_info(fields, iterator_ref)}
|
|
||||||
]
|
|
||||||
),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
storage() ->
|
|
||||||
%% FIXME: This is a temporary workaround to avoid crashes when starting on Windows
|
|
||||||
case mria:rocksdb_backend_available() of
|
|
||||||
true ->
|
|
||||||
rocksdb_copies;
|
|
||||||
_ ->
|
|
||||||
disc_copies
|
|
||||||
end.
|
|
||||||
|
|
|
@ -1,40 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
-ifndef(EMQX_DS_HRL).
|
|
||||||
-define(EMQX_DS_HRL, true).
|
|
||||||
|
|
||||||
-define(SESSION_TAB, emqx_ds_session).
|
|
||||||
-define(ITERATOR_REF_TAB, emqx_ds_iterator_ref).
|
|
||||||
-define(DS_MRIA_SHARD, emqx_ds_shard).
|
|
||||||
|
|
||||||
-record(session, {
|
|
||||||
%% same as clientid
|
|
||||||
id :: emqx_ds:session_id(),
|
|
||||||
%% creation time
|
|
||||||
created_at :: _Millisecond :: non_neg_integer(),
|
|
||||||
expires_at = never :: _Millisecond :: non_neg_integer() | never,
|
|
||||||
%% for future usage
|
|
||||||
props = #{} :: map()
|
|
||||||
}).
|
|
||||||
|
|
||||||
-record(iterator_ref, {
|
|
||||||
ref_id :: {emqx_ds:session_id(), emqx_ds:topic_filter()},
|
|
||||||
it_id :: emqx_ds:iterator_id(),
|
|
||||||
start_time :: emqx_ds:time(),
|
|
||||||
props = #{} :: map()
|
|
||||||
}).
|
|
||||||
|
|
||||||
-endif.
|
|
Loading…
Reference in New Issue