From e439a2e0f2513c005e3126c93a36a55b7db10d28 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 16 Apr 2024 12:05:48 +0200 Subject: [PATCH] fix(sessds): Save protocol name and version in the session metadata --- .../emqx_persistent_session_ds_SUITE.erl | 2 +- apps/emqx/src/emqx_persistent_session_ds.erl | 17 +++++++++++++---- apps/emqx/src/emqx_persistent_session_ds.hrl | 4 +++- .../src/emqx_persistent_session_ds_state.erl | 17 +++++++++++++++-- .../src/emqx_mgmt_api_clients.erl | 5 ++++- 5 files changed, 36 insertions(+), 9 deletions(-) diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index 39764af30..ab062bff7 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -184,7 +184,7 @@ list_all_pubranges(Node) -> session_open(Node, ClientId) -> ClientInfo = #{}, - ConnInfo = #{peername => {undefined, undefined}}, + ConnInfo = #{peername => {undefined, undefined}, proto_name => <<"MQTT">>, proto_ver => 5}, WillMsg = undefined, erpc:call( Node, diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index d77372864..908e71bb5 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -767,7 +767,12 @@ sync(ClientId) -> %% the broker. -spec session_open(id(), emqx_types:clientinfo(), emqx_types:conninfo(), emqx_maybe:t(message())) -> session() | false. -session_open(SessionId, ClientInfo, NewConnInfo, MaybeWillMsg) -> +session_open( + SessionId, + ClientInfo, + NewConnInfo = #{proto_name := ProtoName, proto_ver := ProtoVer}, + MaybeWillMsg +) -> NowMS = now_ms(), case emqx_persistent_session_ds_state:open(SessionId) of {ok, S0} -> @@ -787,7 +792,8 @@ session_open(SessionId, ClientInfo, NewConnInfo, MaybeWillMsg) -> ), S4 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S3), S5 = set_clientinfo(ClientInfo, S4), - S = emqx_persistent_session_ds_state:commit(S5), + S6 = emqx_persistent_session_ds_state:set_protocol({ProtoName, ProtoVer}, S5), + S = emqx_persistent_session_ds_state:commit(S6), Inflight = emqx_persistent_session_ds_inflight:new( receive_maximum(NewConnInfo) ), @@ -810,7 +816,9 @@ session_open(SessionId, ClientInfo, NewConnInfo, MaybeWillMsg) -> emqx_session:conf() ) -> session(). -session_ensure_new(Id, ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> +session_ensure_new( + Id, ClientInfo, ConnInfo = #{proto_name := ProtoName, proto_ver := ProtoVer}, MaybeWillMsg, Conf +) -> ?tp(debug, persistent_session_ds_ensure_new, #{id => Id}), Now = now_ms(), S0 = emqx_persistent_session_ds_state:create_new(Id), @@ -834,7 +842,8 @@ session_ensure_new(Id, ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> ), S5 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S4), S6 = set_clientinfo(ClientInfo, S5), - S = emqx_persistent_session_ds_state:commit(S6), + S7 = emqx_persistent_session_ds_state:set_protocol({ProtoName, ProtoVer}, S6), + S = emqx_persistent_session_ds_state:commit(S7), #{ id => Id, props => Conf, diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index e2b52e36d..79920629a 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -74,10 +74,12 @@ -define(created_at, created_at). -define(last_alive_at, last_alive_at). -define(expiry_interval, expiry_interval). -%% Unique integer used to create unique identities +%% Unique integer used to create unique identities: -define(last_id, last_id). +%% Connection info (relevent for the dashboard): -define(peername, peername). -define(will_message, will_message). -define(clientinfo, clientinfo). +-define(protocol, protocol). -endif. diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index bad8352c8..bc603647a 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -33,6 +33,7 @@ -export([get_clientinfo/1, set_clientinfo/2]). -export([get_will_message/1, set_will_message/2, clear_will_message/1, clear_will_message_now/1]). -export([get_peername/1, set_peername/2]). +-export([get_protocol/1, set_protocol/2]). -export([new_id/1]). -export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3, n_streams/1]). -export([get_seqno/2, put_seqno/3]). @@ -66,7 +67,8 @@ seqno_type/0, stream_key/0, rank_key/0, - session_iterator/0 + session_iterator/0, + protocol/0 ]). -include("emqx_mqtt.hrl"). @@ -108,13 +110,16 @@ dirty :: #{K => dirty | del} }. +-type protocol() :: {binary(), emqx_types:proto_ver()}. + -type metadata() :: #{ ?created_at => emqx_persistent_session_ds:timestamp(), ?last_alive_at => emqx_persistent_session_ds:timestamp(), ?expiry_interval => non_neg_integer(), ?last_id => integer(), - ?peername => emqx_types:peername() + ?peername => emqx_types:peername(), + ?protocol => protocol() }. -type seqno_type() :: @@ -321,6 +326,14 @@ get_peername(Rec) -> set_peername(Val, Rec) -> set_meta(?peername, Val, Rec). +-spec get_protocol(t()) -> protocol() | undefined. +get_protocol(Rec) -> + get_meta(?protocol, Rec). + +-spec set_protocol(protocol(), t()) -> t(). +set_protocol(Val, Rec) -> + set_meta(?protocol, Val, Rec). + -spec get_clientinfo(t()) -> emqx_maybe:t(emqx_types:clientinfo()). get_clientinfo(Rec) -> get_meta(?clientinfo, Rec). diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 4d08854af..301d4e47e 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -1747,6 +1747,7 @@ format_channel_info(undefined, {ClientId, PSInfo0 = #{}}, _Opts) -> format_persistent_session_info(ClientId, PSInfo0) -> Metadata = maps:get(metadata, PSInfo0, #{}), + {ProtoName, ProtoVer} = maps:get(protocol, Metadata), PSInfo1 = maps:with([created_at, expiry_interval], Metadata), CreatedAt = maps:get(created_at, PSInfo1), case Metadata of @@ -1765,7 +1766,9 @@ format_persistent_session_info(ClientId, PSInfo0) -> is_persistent => true, port => Port, heap_size => 0, - mqueue_len => 0 + mqueue_len => 0, + proto_name => ProtoName, + proto_ver => ProtoVer }, PSInfo = lists:foldl( fun result_format_time_fun/2,