chore(persistent_sessions): address review comments
This commit is contained in:
parent
64787f4ccd
commit
f39ccfb304
|
@ -89,7 +89,7 @@
|
||||||
|
|
||||||
-record(route, {
|
-record(route, {
|
||||||
topic :: binary(),
|
topic :: binary(),
|
||||||
dest :: node() | {binary(), node()} | binary()
|
dest :: node() | {binary(), node()} | emqx_session:sessionID()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -37,6 +37,9 @@
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
%% TODO: Maybe these should be configurable?
|
||||||
|
-define(MARKER_GRACE_PERIOD, 60000000).
|
||||||
|
-define(ABANDONED_GRACE_PERIOD, 300000000).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% API
|
%% API
|
||||||
|
@ -97,10 +100,6 @@ session_gc_timeout(_Ref, State) ->
|
||||||
session_gc_worker() ->
|
session_gc_worker() ->
|
||||||
ok = emqx_persistent_session:gc_session_messages(fun session_gc_worker/2).
|
ok = emqx_persistent_session:gc_session_messages(fun session_gc_worker/2).
|
||||||
|
|
||||||
%% TODO: Maybe these should be configurable?
|
|
||||||
-define(MARKER_GRACE_PERIOD, 60000000).
|
|
||||||
-define(ABANDONED_GRACE_PERIOD, 300000000).
|
|
||||||
|
|
||||||
session_gc_worker(delete, Key) ->
|
session_gc_worker(delete, Key) ->
|
||||||
emqx_persistent_session:delete_session_message(Key);
|
emqx_persistent_session:delete_session_message(Key);
|
||||||
session_gc_worker(marker, Key) ->
|
session_gc_worker(marker, Key) ->
|
||||||
|
|
|
@ -259,7 +259,7 @@ stats(Session) -> info(?STATS_KEYS, Session).
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
ignore_local(Delivers, Subscriber, Session) ->
|
ignore_local(Delivers, Subscriber, Session) ->
|
||||||
Subs = emqx_session:info(subscriptions, Session),
|
Subs = info(subscriptions, Session),
|
||||||
lists:dropwhile(fun({deliver, Topic, #message{from = Publisher}}) ->
|
lists:dropwhile(fun({deliver, Topic, #message{from = Publisher}}) ->
|
||||||
case maps:find(Topic, Subs) of
|
case maps:find(Topic, Subs) of
|
||||||
{ok, #{nl := 1}} when Subscriber =:= Publisher ->
|
{ok, #{nl := 1}} when Subscriber =:= Publisher ->
|
||||||
|
|
Loading…
Reference in New Issue