diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index c0631e7ab..99bb05010 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -236,8 +236,9 @@ t_session_subscription_idempotency(Config) -> end, fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), + ConnInfo = #{peername => {undefined, undefined}}, Session = erpc:call( - Node1, emqx_persistent_session_ds, session_open, [ClientId, _ConnInfo = #{}] + Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo] ), ?assertMatch( #{SubTopicFilter := #{}}, @@ -312,8 +313,9 @@ t_session_unsubscription_idempotency(Config) -> end, fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), + ConnInfo = #{peername => {undefined, undefined}}, Session = erpc:call( - Node1, emqx_persistent_session_ds, session_open, [ClientId, _ConnInfo = #{}] + Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo] ), ?assertEqual( #{}, diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 2cbf65b47..bb3c78b48 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -633,7 +633,10 @@ session_open(SessionId, NewConnInfo) -> %% New connection being established S1 = emqx_persistent_session_ds_state:set_expiry_interval(EI, S0), S2 = emqx_persistent_session_ds_state:set_last_alive_at(NowMS, S1), - S = emqx_persistent_session_ds_state:commit(S2), + S3 = emqx_persistent_session_ds_state:set_peername( + maps:get(peername, NewConnInfo), S2 + ), + S = emqx_persistent_session_ds_state:commit(S3), Inflight = emqx_persistent_session_ds_inflight:new( receive_maximum(NewConnInfo) ), diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 8a24be31e..17e5d31e4 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -74,5 +74,6 @@ -define(expiry_interval, expiry_interval). %% Unique integer used to create unique identities -define(last_id, last_id). +-define(peername, peername). -endif. diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index bea8f25e3..167709d8a 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -30,6 +30,7 @@ -export([get_created_at/1, set_created_at/2]). -export([get_last_alive_at/1, set_last_alive_at/2]). -export([get_expiry_interval/1, set_expiry_interval/2]). +-export([get_peername/1, set_peername/2]). -export([new_id/1]). -export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]). -export([get_seqno/2, put_seqno/3]). @@ -92,7 +93,8 @@ ?created_at => emqx_persistent_session_ds:timestamp(), ?last_alive_at => emqx_persistent_session_ds:timestamp(), ?expiry_interval => non_neg_integer(), - ?last_id => integer() + ?last_id => integer(), + ?peername => emqx_types:peername() }. -type seqno_type() :: @@ -278,6 +280,14 @@ get_expiry_interval(Rec) -> set_expiry_interval(Val, Rec) -> set_meta(?expiry_interval, Val, Rec). +-spec get_peername(t()) -> emqx_types:peername() | undefined. +get_peername(Rec) -> + get_meta(?peername, Rec). + +-spec set_peername(emqx_types:peername(), t()) -> t(). +set_peername(Val, Rec) -> + set_meta(?peername, Val, Rec). + -spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}. new_id(Rec) -> LastId = diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index bc07d38bf..e661a8360 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -1181,14 +1181,21 @@ format_persistent_session_info(ClientId, PSInfo0) -> Metadata = maps:get(metadata, PSInfo0, #{}), PSInfo1 = maps:with([created_at, expiry_interval], Metadata), CreatedAt = maps:get(created_at, PSInfo1), + case Metadata of + #{peername := PeerName} -> + {IpAddress, Port} = peername_dispart(PeerName); + _ -> + IpAddress = undefined, + Port = undefined + end, PSInfo2 = convert_expiry_interval_unit(PSInfo1), PSInfo3 = PSInfo2#{ clientid => ClientId, connected => false, connected_at => CreatedAt, - ip_address => undefined, + ip_address => IpAddress, is_persistent => true, - port => undefined + port => Port }, PSInfo = lists:foldl( fun result_format_time_fun/2, diff --git a/changes/ce/fix-12707.en.md b/changes/ce/fix-12707.en.md new file mode 100644 index 000000000..032cdb2d6 --- /dev/null +++ b/changes/ce/fix-12707.en.md @@ -0,0 +1 @@ +Keep IP and port of the durable client sessions in the database.