Merge pull request #12707 from ieQu1/dev/store-ds-session-ip
Store peer name in the durable session metadata.
This commit is contained in:
commit
c22735b3f5
|
@ -236,8 +236,9 @@ t_session_subscription_idempotency(Config) ->
|
||||||
end,
|
end,
|
||||||
fun(Trace) ->
|
fun(Trace) ->
|
||||||
ct:pal("trace:\n ~p", [Trace]),
|
ct:pal("trace:\n ~p", [Trace]),
|
||||||
|
ConnInfo = #{peername => {undefined, undefined}},
|
||||||
Session = erpc:call(
|
Session = erpc:call(
|
||||||
Node1, emqx_persistent_session_ds, session_open, [ClientId, _ConnInfo = #{}]
|
Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo]
|
||||||
),
|
),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{SubTopicFilter := #{}},
|
#{SubTopicFilter := #{}},
|
||||||
|
@ -312,8 +313,9 @@ t_session_unsubscription_idempotency(Config) ->
|
||||||
end,
|
end,
|
||||||
fun(Trace) ->
|
fun(Trace) ->
|
||||||
ct:pal("trace:\n ~p", [Trace]),
|
ct:pal("trace:\n ~p", [Trace]),
|
||||||
|
ConnInfo = #{peername => {undefined, undefined}},
|
||||||
Session = erpc:call(
|
Session = erpc:call(
|
||||||
Node1, emqx_persistent_session_ds, session_open, [ClientId, _ConnInfo = #{}]
|
Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo]
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
#{},
|
#{},
|
||||||
|
|
|
@ -633,7 +633,10 @@ session_open(SessionId, NewConnInfo) ->
|
||||||
%% New connection being established
|
%% New connection being established
|
||||||
S1 = emqx_persistent_session_ds_state:set_expiry_interval(EI, S0),
|
S1 = emqx_persistent_session_ds_state:set_expiry_interval(EI, S0),
|
||||||
S2 = emqx_persistent_session_ds_state:set_last_alive_at(NowMS, S1),
|
S2 = emqx_persistent_session_ds_state:set_last_alive_at(NowMS, S1),
|
||||||
S = emqx_persistent_session_ds_state:commit(S2),
|
S3 = emqx_persistent_session_ds_state:set_peername(
|
||||||
|
maps:get(peername, NewConnInfo), S2
|
||||||
|
),
|
||||||
|
S = emqx_persistent_session_ds_state:commit(S3),
|
||||||
Inflight = emqx_persistent_session_ds_inflight:new(
|
Inflight = emqx_persistent_session_ds_inflight:new(
|
||||||
receive_maximum(NewConnInfo)
|
receive_maximum(NewConnInfo)
|
||||||
),
|
),
|
||||||
|
|
|
@ -74,5 +74,6 @@
|
||||||
-define(expiry_interval, expiry_interval).
|
-define(expiry_interval, expiry_interval).
|
||||||
%% Unique integer used to create unique identities
|
%% Unique integer used to create unique identities
|
||||||
-define(last_id, last_id).
|
-define(last_id, last_id).
|
||||||
|
-define(peername, peername).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
|
@ -30,6 +30,7 @@
|
||||||
-export([get_created_at/1, set_created_at/2]).
|
-export([get_created_at/1, set_created_at/2]).
|
||||||
-export([get_last_alive_at/1, set_last_alive_at/2]).
|
-export([get_last_alive_at/1, set_last_alive_at/2]).
|
||||||
-export([get_expiry_interval/1, set_expiry_interval/2]).
|
-export([get_expiry_interval/1, set_expiry_interval/2]).
|
||||||
|
-export([get_peername/1, set_peername/2]).
|
||||||
-export([new_id/1]).
|
-export([new_id/1]).
|
||||||
-export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]).
|
-export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]).
|
||||||
-export([get_seqno/2, put_seqno/3]).
|
-export([get_seqno/2, put_seqno/3]).
|
||||||
|
@ -92,7 +93,8 @@
|
||||||
?created_at => emqx_persistent_session_ds:timestamp(),
|
?created_at => emqx_persistent_session_ds:timestamp(),
|
||||||
?last_alive_at => emqx_persistent_session_ds:timestamp(),
|
?last_alive_at => emqx_persistent_session_ds:timestamp(),
|
||||||
?expiry_interval => non_neg_integer(),
|
?expiry_interval => non_neg_integer(),
|
||||||
?last_id => integer()
|
?last_id => integer(),
|
||||||
|
?peername => emqx_types:peername()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-type seqno_type() ::
|
-type seqno_type() ::
|
||||||
|
@ -278,6 +280,14 @@ get_expiry_interval(Rec) ->
|
||||||
set_expiry_interval(Val, Rec) ->
|
set_expiry_interval(Val, Rec) ->
|
||||||
set_meta(?expiry_interval, Val, Rec).
|
set_meta(?expiry_interval, Val, Rec).
|
||||||
|
|
||||||
|
-spec get_peername(t()) -> emqx_types:peername() | undefined.
|
||||||
|
get_peername(Rec) ->
|
||||||
|
get_meta(?peername, Rec).
|
||||||
|
|
||||||
|
-spec set_peername(emqx_types:peername(), t()) -> t().
|
||||||
|
set_peername(Val, Rec) ->
|
||||||
|
set_meta(?peername, Val, Rec).
|
||||||
|
|
||||||
-spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}.
|
-spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}.
|
||||||
new_id(Rec) ->
|
new_id(Rec) ->
|
||||||
LastId =
|
LastId =
|
||||||
|
|
|
@ -1181,14 +1181,21 @@ format_persistent_session_info(ClientId, PSInfo0) ->
|
||||||
Metadata = maps:get(metadata, PSInfo0, #{}),
|
Metadata = maps:get(metadata, PSInfo0, #{}),
|
||||||
PSInfo1 = maps:with([created_at, expiry_interval], Metadata),
|
PSInfo1 = maps:with([created_at, expiry_interval], Metadata),
|
||||||
CreatedAt = maps:get(created_at, PSInfo1),
|
CreatedAt = maps:get(created_at, PSInfo1),
|
||||||
|
case Metadata of
|
||||||
|
#{peername := PeerName} ->
|
||||||
|
{IpAddress, Port} = peername_dispart(PeerName);
|
||||||
|
_ ->
|
||||||
|
IpAddress = undefined,
|
||||||
|
Port = undefined
|
||||||
|
end,
|
||||||
PSInfo2 = convert_expiry_interval_unit(PSInfo1),
|
PSInfo2 = convert_expiry_interval_unit(PSInfo1),
|
||||||
PSInfo3 = PSInfo2#{
|
PSInfo3 = PSInfo2#{
|
||||||
clientid => ClientId,
|
clientid => ClientId,
|
||||||
connected => false,
|
connected => false,
|
||||||
connected_at => CreatedAt,
|
connected_at => CreatedAt,
|
||||||
ip_address => undefined,
|
ip_address => IpAddress,
|
||||||
is_persistent => true,
|
is_persistent => true,
|
||||||
port => undefined
|
port => Port
|
||||||
},
|
},
|
||||||
PSInfo = lists:foldl(
|
PSInfo = lists:foldl(
|
||||||
fun result_format_time_fun/2,
|
fun result_format_time_fun/2,
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Keep IP and port of the durable client sessions in the database.
|
Loading…
Reference in New Issue