From 0c7b221c4265c55e48aeda63f9a12b54698f6f15 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 7 May 2024 13:00:50 -0300 Subject: [PATCH] fix(clients mgmt api): set `durable` flag for DS sessions and subscriptions --- apps/emqx/src/emqx_persistent_session_ds.erl | 2 +- .../src/emqx_mgmt_api_clients.erl | 2 + .../src/emqx_mgmt_api_subscriptions.erl | 19 +++-- .../test/emqx_mgmt_api_clients_SUITE.erl | 72 ++++++++++++++++++- 4 files changed, 87 insertions(+), 8 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 0cdb700af..28c370ba9 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -703,7 +703,7 @@ list_client_subscriptions(ClientId) -> maps:fold( fun(Topic, #{current_state := CS}, Acc) -> #{subopts := SubOpts} = maps:get(CS, SStates), - Elem = {Topic, SubOpts}, + Elem = {Topic, SubOpts#{durable => true}}, [Elem | Acc] end, [], diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 8ef4ab98b..a32397bb1 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -1753,6 +1753,7 @@ format_persistent_session_info( }), Info0#{ connected => false, + durable => true, is_persistent => true, subscriptions_cnt => maps:size(maps:get(subscriptions, PSInfo, #{})) }; @@ -1773,6 +1774,7 @@ format_persistent_session_info(ClientId, PSInfo0) -> clientid => ClientId, connected => false, connected_at => CreatedAt, + durable => true, ip_address => IpAddress, is_persistent => true, port => Port, diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index b1a8fbce2..cc0e96ed6 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -171,21 +171,32 @@ subscriptions(get, #{query_string := QString}) -> {200, Result} end. -format(WhichNode, {{Topic, _Subscriber}, SubOpts}) -> +format(WhichNode, {{Topic, Subscriber}, SubOpts}) -> + FallbackClientId = + case is_binary(Subscriber) of + true -> + Subscriber; + false -> + %% e.g.: could be a pid... + null + end, maps:merge( #{ topic => emqx_topic:maybe_format_share(Topic), - clientid => maps:get(subid, SubOpts, null), - node => WhichNode, + clientid => maps:get(subid, SubOpts, FallbackClientId), + node => convert_null(WhichNode), durable => false }, - maps:with([qos, nl, rap, rh], SubOpts) + maps:with([qos, nl, rap, rh, durable], SubOpts) ). %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- +convert_null(undefined) -> null; +convert_null(Val) -> Val. + check_match_topic(#{<<"match_topic">> := MatchTopic}) -> try emqx_topic:parse(MatchTopic) of {#share{}, _} -> {error, invalid_match_topic}; diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 481129389..39d775a7a 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -49,6 +49,7 @@ persistent_session_testcases() -> t_persistent_sessions3, t_persistent_sessions4, t_persistent_sessions5, + t_persistent_sessions_subscriptions1, t_list_clients_v2 ]. client_msgs_testcases() -> @@ -333,7 +334,7 @@ t_persistent_sessions2(Config) -> %% 2) Client connects to the same node and takes over, listed only once. C2 = connect_client(#{port => Port1, clientid => ClientId}), assert_single_client(O#{node => N1, clientid => ClientId, status => connected}), - ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0}), + disconnect_and_destroy_session(C2), ?retry( 100, 20, @@ -377,7 +378,7 @@ t_persistent_sessions3(Config) -> list_request(APIPort, "node=" ++ atom_to_list(N1)) ) ), - ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0}) + disconnect_and_destroy_session(C2) end, [] ), @@ -417,7 +418,7 @@ t_persistent_sessions4(Config) -> list_request(APIPort, "node=" ++ atom_to_list(N1)) ) ), - ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0}) + disconnect_and_destroy_session(C2) end, [] ), @@ -552,6 +553,63 @@ t_persistent_sessions5(Config) -> ), ok. +%% Check that the output of `/clients/:clientid/subscriptions' has the expected keys. +t_persistent_sessions_subscriptions1(Config) -> + [N1, _N2] = ?config(nodes, Config), + APIPort = 18084, + Port1 = get_mqtt_port(N1, tcp), + + ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)), + + ?check_trace( + begin + ClientId = <<"c1">>, + C1 = connect_client(#{port => Port1, clientid => ClientId}), + {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(C1, <<"topic/1">>, 1), + ?assertMatch( + {ok, + {{_, 200, _}, _, [ + #{ + <<"durable">> := true, + <<"node">> := <<_/binary>>, + <<"clientid">> := ClientId, + <<"qos">> := 1, + <<"rap">> := 0, + <<"rh">> := 0, + <<"nl">> := 0, + <<"topic">> := <<"topic/1">> + } + ]}}, + get_subscriptions_request(APIPort, ClientId) + ), + + %% Just disconnect + ok = emqtt:disconnect(C1), + ?assertMatch( + {ok, + {{_, 200, _}, _, [ + #{ + <<"durable">> := true, + <<"node">> := null, + <<"clientid">> := ClientId, + <<"qos">> := 1, + <<"rap">> := 0, + <<"rh">> := 0, + <<"nl">> := 0, + <<"topic">> := <<"topic/1">> + } + ]}}, + get_subscriptions_request(APIPort, ClientId) + ), + + C2 = connect_client(#{port => Port1, clientid => ClientId}), + disconnect_and_destroy_session(C2), + ok + end, + [] + ), + ok. + t_clients_bad_value_type(_) -> %% get /clients AuthHeader = [emqx_common_test_http:default_auth_header()], @@ -1800,6 +1858,11 @@ maybe_json_decode(X) -> {error, _} -> X end. +get_subscriptions_request(APIPort, ClientId) -> + Host = "http://127.0.0.1:" ++ integer_to_list(APIPort), + Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId, "subscriptions"]), + request(get, Path, []). + get_client_request(Port, ClientId) -> Host = "http://127.0.0.1:" ++ integer_to_list(Port), Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId]), @@ -1955,3 +2018,6 @@ do_traverse_in_reverse_v2(APIPort, QueryParams0, [Cursor | Rest], DirectOrderCli {ok, {{_, 200, _}, _, #{<<"data">> := Rows}}} = Res0, ClientIds = [ClientId || #{<<"clientid">> := ClientId} <- Rows], do_traverse_in_reverse_v2(APIPort, QueryParams0, Rest, DirectOrderClientIds, ClientIds ++ Acc). + +disconnect_and_destroy_session(Client) -> + ok = emqtt:disconnect(Client, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0}).