diff --git a/apps/emqx/integration_test/emqx_ds_SUITE.erl b/apps/emqx/integration_test/emqx_ds_SUITE.erl index b042aa87a..34c15b505 100644 --- a/apps/emqx/integration_test/emqx_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_ds_SUITE.erl @@ -246,7 +246,7 @@ t_session_subscription_idempotency(Config) -> ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)), ?assertMatch( {ok, #{}, #{SubTopicFilterWords := #{}}}, - erpc:call(Node1, emqx_ds, session_open, [ClientId]) + erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId]) ) end ), diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index 7146332fc..609b0139d 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -57,6 +57,7 @@ init() -> } ), ok = emqx_persistent_session_ds_router:init_tables(), + ok = emqx_persistent_session_ds:create_tables(), ok end). diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index e56a05484..e456211fc 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -55,6 +55,13 @@ terminate/2 ]). +%% session table operations +-export([create_tables/0]). + +-ifdef(TEST). +-export([session_open/1]). +-endif. + %% RPC -export([ ensure_iterator_closed_on_all_shards/1, @@ -71,8 +78,13 @@ -define(DEFAULT_KEYSPACE, default). -define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}). --type id() :: emqx_ds:session_id(). +%% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be +%% an atom, in theory (?). +-type id() :: binary(). -type iterator() :: emqx_ds:iterator(). +-type iterator_id() :: emqx_ds:iterator_id(). +-type topic_filter() :: emqx_ds:topic_filter(). +-type iterators() :: #{topic_filter() => iterator()}. -type session() :: #{ %% Client ID id := id(), @@ -92,6 +104,8 @@ -type conninfo() :: emqx_session:conninfo(). -type replies() :: emqx_session:replies(). +-export_type([id/0]). + %% -spec create(clientinfo(), conninfo(), emqx_session:conf()) -> @@ -118,11 +132,11 @@ open(#{clientid := ClientID}, _ConnInfo) -> end. ensure_session(ClientID, Conf) -> - {ok, Session, #{}} = emqx_ds:session_ensure_new(ClientID, Conf), + {ok, Session, #{}} = session_ensure_new(ClientID, Conf), Session#{iterators => #{}}. open_session(ClientID) -> - case emqx_ds:session_open(ClientID) of + case session_open(ClientID) of {ok, Session, Iterators} -> Session#{iterators => prep_iterators(Iterators)}; false -> @@ -144,7 +158,7 @@ destroy(#{clientid := ClientID}) -> destroy_session(ClientID) -> _ = ensure_all_iterators_closed(ClientID), - emqx_ds:session_drop(ClientID). + session_drop(ClientID). %%-------------------------------------------------------------------- %% Info, Stats @@ -352,7 +366,7 @@ add_subscription(TopicFilterBin, SubOpts, DSSessionID) -> % and iterator information can be reconstructed from this table, if needed. ok = emqx_persistent_session_ds_router:do_add_route(TopicFilterBin, DSSessionID), TopicFilter = emqx_topic:words(TopicFilterBin), - {ok, Iterator, IsNew} = emqx_ds:session_add_iterator( + {ok, Iterator, IsNew} = session_add_iterator( DSSessionID, TopicFilter, SubOpts ), Ctx = #{iterator => Iterator, is_new => IsNew}, @@ -368,7 +382,7 @@ add_subscription(TopicFilterBin, SubOpts, DSSessionID) -> iterator(). update_subscription(TopicFilterBin, Iterator, SubOpts, DSSessionID) -> TopicFilter = emqx_topic:words(TopicFilterBin), - {ok, NIterator, false} = emqx_ds:session_add_iterator( + {ok, NIterator, false} = session_add_iterator( DSSessionID, TopicFilter, SubOpts ), ok = ?tp(persistent_session_ds_iterator_updated, #{iterator => Iterator}), @@ -415,7 +429,7 @@ del_subscription(TopicFilterBin, #{id := IteratorID}, DSSessionID) -> ?tp_span( persistent_session_ds_iterator_delete, Ctx, - emqx_ds:session_del_iterator(DSSessionID, TopicFilter) + session_del_iterator(DSSessionID, TopicFilter) ), ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilterBin, DSSessionID). @@ -448,3 +462,210 @@ ensure_all_iterators_closed(DSSessionID) -> do_ensure_all_iterators_closed(DSSessionID) -> ok = emqx_ds_storage_layer:discard_iterator_prefix(?DS_SHARD, DSSessionID), ok. + +%%-------------------------------------------------------------------- +%% Session tables operations +%%-------------------------------------------------------------------- + +-define(SESSION_TAB, emqx_ds_session). +-define(ITERATOR_REF_TAB, emqx_ds_iterator_ref). +-define(DS_MRIA_SHARD, emqx_ds_shard). + +-record(session, { + %% same as clientid + id :: id(), + %% creation time + created_at :: _Millisecond :: non_neg_integer(), + expires_at = never :: _Millisecond :: non_neg_integer() | never, + %% for future usage + props = #{} :: map() +}). + +-record(iterator_ref, { + ref_id :: {id(), emqx_ds:topic_filter()}, + it_id :: emqx_ds:iterator_id(), + start_time :: emqx_ds:time(), + props = #{} :: map() +}). + +create_tables() -> + ok = mria:create_table( + ?SESSION_TAB, + [ + {rlog_shard, ?DS_MRIA_SHARD}, + {type, set}, + {storage, storage()}, + {record_name, session}, + {attributes, record_info(fields, session)} + ] + ), + ok = mria:create_table( + ?ITERATOR_REF_TAB, + [ + {rlog_shard, ?DS_MRIA_SHARD}, + {type, ordered_set}, + {storage, storage()}, + {record_name, iterator_ref}, + {attributes, record_info(fields, iterator_ref)} + ] + ), + ok. + +-dialyzer({nowarn_function, storage/0}). +storage() -> + %% FIXME: This is a temporary workaround to avoid crashes when starting on Windows + case mria:rocksdb_backend_available() of + true -> + rocksdb_copies; + _ -> + disc_copies + end. + +%% @doc Called when a client connects. This function looks up a +%% session or returns `false` if previous one couldn't be found. +%% +%% This function also spawns replay agents for each iterator. +%% +%% Note: session API doesn't handle session takeovers, it's the job of +%% the broker. +-spec session_open(id()) -> + {ok, session(), iterators()} | false. +session_open(SessionId) -> + transaction(fun() -> + case mnesia:read(?SESSION_TAB, SessionId, write) of + [Record = #session{}] -> + Session = export_record(Record), + IteratorRefs = session_read_iterators(SessionId), + Iterators = export_iterators(IteratorRefs), + {ok, Session, Iterators}; + [] -> + false + end + end). + +-spec session_ensure_new(id(), _Props :: map()) -> + {ok, session(), iterators()}. +session_ensure_new(SessionId, Props) -> + transaction(fun() -> + ok = session_drop_iterators(SessionId), + Session = export_record(session_create(SessionId, Props)), + {ok, Session, #{}} + end). + +session_create(SessionId, Props) -> + Session = #session{ + id = SessionId, + created_at = erlang:system_time(millisecond), + expires_at = never, + props = Props + }, + ok = mnesia:write(?SESSION_TAB, Session, write), + Session. + +%% @doc Called when a client reconnects with `clean session=true' or +%% during session GC +-spec session_drop(id()) -> ok. +session_drop(DSSessionId) -> + transaction(fun() -> + %% TODO: ensure all iterators from this clientid are closed? + ok = session_drop_iterators(DSSessionId), + ok = mnesia:delete(?SESSION_TAB, DSSessionId, write) + end). + +session_drop_iterators(DSSessionId) -> + IteratorRefs = session_read_iterators(DSSessionId), + ok = lists:foreach(fun session_del_iterator/1, IteratorRefs). + +%% @doc Called when a client subscribes to a topic. Idempotent. +-spec session_add_iterator(id(), topic_filter(), _Props :: map()) -> + {ok, iterator(), _IsNew :: boolean()}. +session_add_iterator(DSSessionId, TopicFilter, Props) -> + IteratorRefId = {DSSessionId, TopicFilter}, + transaction(fun() -> + case mnesia:read(?ITERATOR_REF_TAB, IteratorRefId, write) of + [] -> + IteratorRef = session_insert_iterator(DSSessionId, TopicFilter, Props), + Iterator = export_record(IteratorRef), + ?tp( + ds_session_subscription_added, + #{iterator => Iterator, session_id => DSSessionId} + ), + {ok, Iterator, _IsNew = true}; + [#iterator_ref{} = IteratorRef] -> + NIteratorRef = session_update_iterator(IteratorRef, Props), + NIterator = export_record(NIteratorRef), + ?tp( + ds_session_subscription_present, + #{iterator => NIterator, session_id => DSSessionId} + ), + {ok, NIterator, _IsNew = false} + end + end). + +session_insert_iterator(DSSessionId, TopicFilter, Props) -> + {IteratorId, StartMS} = new_iterator_id(DSSessionId), + IteratorRef = #iterator_ref{ + ref_id = {DSSessionId, TopicFilter}, + it_id = IteratorId, + start_time = StartMS, + props = Props + }, + ok = mnesia:write(?ITERATOR_REF_TAB, IteratorRef, write), + IteratorRef. + +session_update_iterator(IteratorRef, Props) -> + NIteratorRef = IteratorRef#iterator_ref{props = Props}, + ok = mnesia:write(?ITERATOR_REF_TAB, NIteratorRef, write), + NIteratorRef. + +%% @doc Called when a client unsubscribes from a topic. +-spec session_del_iterator(id(), topic_filter()) -> ok. +session_del_iterator(DSSessionId, TopicFilter) -> + IteratorRefId = {DSSessionId, TopicFilter}, + transaction(fun() -> + mnesia:delete(?ITERATOR_REF_TAB, IteratorRefId, write) + end). + +session_del_iterator(#iterator_ref{ref_id = IteratorRefId}) -> + mnesia:delete(?ITERATOR_REF_TAB, IteratorRefId, write). + +session_read_iterators(DSSessionId) -> + % NOTE: somewhat convoluted way to trick dialyzer + Pat = erlang:make_tuple(record_info(size, iterator_ref), '_', [ + {1, iterator_ref}, + {#iterator_ref.ref_id, {DSSessionId, '_'}} + ]), + mnesia:match_object(?ITERATOR_REF_TAB, Pat, read). + +-spec new_iterator_id(id()) -> {iterator_id(), emqx_ds:time()}. +new_iterator_id(DSSessionId) -> + NowMS = erlang:system_time(microsecond), + IteratorId = <>, + {IteratorId, NowMS}. + +%%-------------------------------------------------------------------------------- + +transaction(Fun) -> + {atomic, Res} = mria:transaction(?DS_MRIA_SHARD, Fun), + Res. + +%%-------------------------------------------------------------------------------- + +export_iterators(IteratorRefs) -> + lists:foldl( + fun(IteratorRef = #iterator_ref{ref_id = {_DSSessionId, TopicFilter}}, Acc) -> + Acc#{TopicFilter => export_record(IteratorRef)} + end, + #{}, + IteratorRefs + ). + +export_record(#session{} = Record) -> + export_record(Record, #session.id, [id, created_at, expires_at, props], #{}); +export_record(#iterator_ref{} = Record) -> + export_record(Record, #iterator_ref.it_id, [id, start_time, props], #{}). + +export_record(Record, I, [Field | Rest], Acc) -> + export_record(Record, I + 1, Rest, Acc#{Field => element(I, Record)}); +export_record(_, _, [], Acc) -> + Acc. diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl b/apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl index 7815e25f5..30820b126 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl @@ -21,7 +21,7 @@ -record(ps_route, { topic :: binary(), - dest :: emqx_ds:session_id() + dest :: emqx_persistent_session_ds:id() }). -record(ps_routeidx, { entry :: emqx_topic_index:key(emqx_persistent_session_ds_router:dest()), diff --git a/apps/emqx/src/emqx_persistent_session_ds_router.erl b/apps/emqx/src/emqx_persistent_session_ds_router.erl index 324db991a..cc6332f1b 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_router.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_router.erl @@ -40,7 +40,7 @@ -export([has_route/2]). -endif. --type dest() :: emqx_ds:session_id(). +-type dest() :: emqx_persistent_session_ds:id(). -export_type([dest/0]). @@ -159,7 +159,7 @@ print_routes(Topic) -> match_routes(Topic) ). --spec cleanup_routes(emqx_ds:session_id()) -> ok. +-spec cleanup_routes(emqx_persistent_session_ds:id()) -> ok. cleanup_routes(DSSessionId) -> %% NOTE %% No point in transaction here because all the operations on filters table are dirty. diff --git a/apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl b/apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl index 47c9ed541..d35ccd963 100644 --- a/apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl @@ -66,7 +66,7 @@ close_iterator(Nodes, IteratorID) -> -spec close_all_iterators( [node()], - emqx_ds:session_id() + emqx_persistent_session_ds:id() ) -> emqx_rpc:erpc_multicall(ok). close_all_iterators(Nodes, DSSessionID) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index b311d2550..feaa37bc0 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -24,17 +24,6 @@ -export([message_store/2, message_store/1, message_stats/0]). %% Iterator: -export([iterator_update/2, iterator_next/1, iterator_stats/0]). -%% Session: --export([ - session_open/1, - session_ensure_new/2, - session_drop/1, - session_suspend/1, - session_add_iterator/3, - session_get_iterator_id/2, - session_del_iterator/2, - session_stats/0 -]). %% internal exports: -export([]). @@ -44,7 +33,6 @@ message_id/0, message_stats/0, message_store_opts/0, - session_id/0, replay/0, replay_id/0, iterator_id/0, @@ -56,27 +44,10 @@ time/0 ]). --include("emqx_ds_int.hrl"). - %%================================================================================ %% Type declarations %%================================================================================ -%% Session -%% See also: `#session{}`. --type session() :: #{ - id := emqx_ds:session_id(), - created_at := _Millisecond :: non_neg_integer(), - expires_at := _Millisecond :: non_neg_integer() | never, - props := map() -}. - --type iterators() :: #{topic_filter() => iterator()}. - -%% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be -%% an atom, in theory (?). --type session_id() :: binary(). - -type iterator() :: term(). -type iterator_id() :: binary(). @@ -148,144 +119,6 @@ message_stats() -> %% Session %%-------------------------------------------------------------------------------- -%% @doc Called when a client connects. This function looks up a -%% session or returns `false` if previous one couldn't be found. -%% -%% This function also spawns replay agents for each iterator. -%% -%% Note: session API doesn't handle session takeovers, it's the job of -%% the broker. --spec session_open(session_id()) -> - {ok, session(), iterators()} | false. -session_open(SessionId) -> - transaction(fun() -> - case mnesia:read(?SESSION_TAB, SessionId, write) of - [Record = #session{}] -> - Session = export_record(Record), - IteratorRefs = session_read_iterators(SessionId), - Iterators = export_iterators(IteratorRefs), - {ok, Session, Iterators}; - [] -> - false - end - end). - --spec session_ensure_new(session_id(), _Props :: map()) -> - {ok, session(), iterators()}. -session_ensure_new(SessionId, Props) -> - transaction(fun() -> - ok = session_drop_iterators(SessionId), - Session = export_record(session_create(SessionId, Props)), - {ok, Session, #{}} - end). - -session_create(SessionId, Props) -> - Session = #session{ - id = SessionId, - created_at = erlang:system_time(millisecond), - expires_at = never, - props = Props - }, - ok = mnesia:write(?SESSION_TAB, Session, write), - Session. - -%% @doc Called when a client reconnects with `clean session=true' or -%% during session GC --spec session_drop(session_id()) -> ok. -session_drop(DSSessionId) -> - transaction(fun() -> - %% TODO: ensure all iterators from this clientid are closed? - ok = session_drop_iterators(DSSessionId), - ok = mnesia:delete(?SESSION_TAB, DSSessionId, write) - end). - -session_drop_iterators(DSSessionId) -> - IteratorRefs = session_read_iterators(DSSessionId), - ok = lists:foreach(fun session_del_iterator/1, IteratorRefs). - -%% @doc Called when a client disconnects. This function terminates all -%% active processes related to the session. --spec session_suspend(session_id()) -> ok | {error, session_not_found}. -session_suspend(_SessionId) -> - %% TODO - ok. - -%% @doc Called when a client subscribes to a topic. Idempotent. --spec session_add_iterator(session_id(), topic_filter(), _Props :: map()) -> - {ok, iterator(), _IsNew :: boolean()}. -session_add_iterator(DSSessionId, TopicFilter, Props) -> - IteratorRefId = {DSSessionId, TopicFilter}, - transaction(fun() -> - case mnesia:read(?ITERATOR_REF_TAB, IteratorRefId, write) of - [] -> - IteratorRef = session_insert_iterator(DSSessionId, TopicFilter, Props), - Iterator = export_record(IteratorRef), - ?tp( - ds_session_subscription_added, - #{iterator => Iterator, session_id => DSSessionId} - ), - {ok, Iterator, _IsNew = true}; - [#iterator_ref{} = IteratorRef] -> - NIteratorRef = session_update_iterator(IteratorRef, Props), - NIterator = export_record(NIteratorRef), - ?tp( - ds_session_subscription_present, - #{iterator => NIterator, session_id => DSSessionId} - ), - {ok, NIterator, _IsNew = false} - end - end). - -session_insert_iterator(DSSessionId, TopicFilter, Props) -> - {IteratorId, StartMS} = new_iterator_id(DSSessionId), - IteratorRef = #iterator_ref{ - ref_id = {DSSessionId, TopicFilter}, - it_id = IteratorId, - start_time = StartMS, - props = Props - }, - ok = mnesia:write(?ITERATOR_REF_TAB, IteratorRef, write), - IteratorRef. - -session_update_iterator(IteratorRef, Props) -> - NIteratorRef = IteratorRef#iterator_ref{props = Props}, - ok = mnesia:write(?ITERATOR_REF_TAB, NIteratorRef, write), - NIteratorRef. - --spec session_get_iterator_id(session_id(), topic_filter()) -> - {ok, iterator_id()} | {error, not_found}. -session_get_iterator_id(DSSessionId, TopicFilter) -> - IteratorRefId = {DSSessionId, TopicFilter}, - case mnesia:dirty_read(?ITERATOR_REF_TAB, IteratorRefId) of - [] -> - {error, not_found}; - [#iterator_ref{it_id = IteratorId}] -> - {ok, IteratorId} - end. - -%% @doc Called when a client unsubscribes from a topic. --spec session_del_iterator(session_id(), topic_filter()) -> ok. -session_del_iterator(DSSessionId, TopicFilter) -> - IteratorRefId = {DSSessionId, TopicFilter}, - transaction(fun() -> - mnesia:delete(?ITERATOR_REF_TAB, IteratorRefId, write) - end). - -session_del_iterator(#iterator_ref{ref_id = IteratorRefId}) -> - mnesia:delete(?ITERATOR_REF_TAB, IteratorRefId, write). - -session_read_iterators(DSSessionId) -> - % NOTE: somewhat convoluted way to trick dialyzer - Pat = erlang:make_tuple(record_info(size, iterator_ref), '_', [ - {1, iterator_ref}, - {#iterator_ref.ref_id, {DSSessionId, '_'}} - ]), - mnesia:match_object(?ITERATOR_REF_TAB, Pat, read). - --spec session_stats() -> #{}. -session_stats() -> - #{}. - %%-------------------------------------------------------------------------------- %% Iterator (pull API) %%-------------------------------------------------------------------------------- @@ -309,36 +142,3 @@ iterator_stats() -> %%================================================================================ %% Internal functions %%================================================================================ - --spec new_iterator_id(session_id()) -> {iterator_id(), time()}. -new_iterator_id(DSSessionId) -> - NowMS = erlang:system_time(microsecond), - IteratorId = <>, - {IteratorId, NowMS}. - -%%-------------------------------------------------------------------------------- - -transaction(Fun) -> - {atomic, Res} = mria:transaction(?DS_MRIA_SHARD, Fun), - Res. - -%%-------------------------------------------------------------------------------- - -export_iterators(IteratorRefs) -> - lists:foldl( - fun(IteratorRef = #iterator_ref{ref_id = {_DSSessionId, TopicFilter}}, Acc) -> - Acc#{TopicFilter => export_record(IteratorRef)} - end, - #{}, - IteratorRefs - ). - -export_record(#session{} = Record) -> - export_record(Record, #session.id, [id, created_at, expires_at, props], #{}); -export_record(#iterator_ref{} = Record) -> - export_record(Record, #iterator_ref.it_id, [id, start_time, props], #{}). - -export_record(Record, I, [Field | Rest], Acc) -> - export_record(Record, I + 1, Rest, Acc#{Field => element(I, Record)}); -export_record(_, _, [], Acc) -> - Acc. diff --git a/apps/emqx_durable_storage/src/emqx_ds_app.erl b/apps/emqx_durable_storage/src/emqx_ds_app.erl index 09856df3c..858855b6f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_app.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_app.erl @@ -4,44 +4,7 @@ -module(emqx_ds_app). --dialyzer({nowarn_function, storage/0}). - --export([start/2, storage/0]). - --include("emqx_ds_int.hrl"). +-export([start/2]). start(_Type, _Args) -> - init_mnesia(), emqx_ds_sup:start_link(). - -init_mnesia() -> - ok = mria:create_table( - ?SESSION_TAB, - [ - {rlog_shard, ?DS_MRIA_SHARD}, - {type, set}, - {storage, storage()}, - {record_name, session}, - {attributes, record_info(fields, session)} - ] - ), - ok = mria:create_table( - ?ITERATOR_REF_TAB, - [ - {rlog_shard, ?DS_MRIA_SHARD}, - {type, ordered_set}, - {storage, storage()}, - {record_name, iterator_ref}, - {attributes, record_info(fields, iterator_ref)} - ] - ), - ok. - -storage() -> - %% FIXME: This is a temporary workaround to avoid crashes when starting on Windows - case mria:rocksdb_backend_available() of - true -> - rocksdb_copies; - _ -> - disc_copies - end. diff --git a/apps/emqx_durable_storage/src/emqx_ds_int.hrl b/apps/emqx_durable_storage/src/emqx_ds_int.hrl deleted file mode 100644 index 162d14b83..000000000 --- a/apps/emqx_durable_storage/src/emqx_ds_int.hrl +++ /dev/null @@ -1,40 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- --ifndef(EMQX_DS_HRL). --define(EMQX_DS_HRL, true). - --define(SESSION_TAB, emqx_ds_session). --define(ITERATOR_REF_TAB, emqx_ds_iterator_ref). --define(DS_MRIA_SHARD, emqx_ds_shard). - --record(session, { - %% same as clientid - id :: emqx_ds:session_id(), - %% creation time - created_at :: _Millisecond :: non_neg_integer(), - expires_at = never :: _Millisecond :: non_neg_integer() | never, - %% for future usage - props = #{} :: map() -}). - --record(iterator_ref, { - ref_id :: {emqx_ds:session_id(), emqx_ds:topic_filter()}, - it_id :: emqx_ds:iterator_id(), - start_time :: emqx_ds:time(), - props = #{} :: map() -}). - --endif.