Merge pull request #12988 from thalesmg/fix-ds-durable-response-r57-20240507

fix(clients mgmt api): set `durable` flag for DS sessions and subscriptions, add `clientid` to subscriptions
This commit is contained in:
Thales Macedo Garitezi 2024-05-07 18:04:33 -03:00 committed by GitHub
commit 1b2a204e38
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 87 additions and 8 deletions

View File

@ -703,7 +703,7 @@ list_client_subscriptions(ClientId) ->
maps:fold( maps:fold(
fun(Topic, #{current_state := CS}, Acc) -> fun(Topic, #{current_state := CS}, Acc) ->
#{subopts := SubOpts} = maps:get(CS, SStates), #{subopts := SubOpts} = maps:get(CS, SStates),
Elem = {Topic, SubOpts}, Elem = {Topic, SubOpts#{durable => true}},
[Elem | Acc] [Elem | Acc]
end, end,
[], [],

View File

@ -1753,6 +1753,7 @@ format_persistent_session_info(
}), }),
Info0#{ Info0#{
connected => false, connected => false,
durable => true,
is_persistent => true, is_persistent => true,
subscriptions_cnt => maps:size(maps:get(subscriptions, PSInfo, #{})) subscriptions_cnt => maps:size(maps:get(subscriptions, PSInfo, #{}))
}; };
@ -1773,6 +1774,7 @@ format_persistent_session_info(ClientId, PSInfo0) ->
clientid => ClientId, clientid => ClientId,
connected => false, connected => false,
connected_at => CreatedAt, connected_at => CreatedAt,
durable => true,
ip_address => IpAddress, ip_address => IpAddress,
is_persistent => true, is_persistent => true,
port => Port, port => Port,

View File

@ -171,21 +171,32 @@ subscriptions(get, #{query_string := QString}) ->
{200, Result} {200, Result}
end. end.
format(WhichNode, {{Topic, _Subscriber}, SubOpts}) -> format(WhichNode, {{Topic, Subscriber}, SubOpts}) ->
FallbackClientId =
case is_binary(Subscriber) of
true ->
Subscriber;
false ->
%% e.g.: could be a pid...
null
end,
maps:merge( maps:merge(
#{ #{
topic => emqx_topic:maybe_format_share(Topic), topic => emqx_topic:maybe_format_share(Topic),
clientid => maps:get(subid, SubOpts, null), clientid => maps:get(subid, SubOpts, FallbackClientId),
node => WhichNode, node => convert_null(WhichNode),
durable => false durable => false
}, },
maps:with([qos, nl, rap, rh], SubOpts) maps:with([qos, nl, rap, rh, durable], SubOpts)
). ).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
convert_null(undefined) -> null;
convert_null(Val) -> Val.
check_match_topic(#{<<"match_topic">> := MatchTopic}) -> check_match_topic(#{<<"match_topic">> := MatchTopic}) ->
try emqx_topic:parse(MatchTopic) of try emqx_topic:parse(MatchTopic) of
{#share{}, _} -> {error, invalid_match_topic}; {#share{}, _} -> {error, invalid_match_topic};

View File

@ -49,6 +49,7 @@ persistent_session_testcases() ->
t_persistent_sessions3, t_persistent_sessions3,
t_persistent_sessions4, t_persistent_sessions4,
t_persistent_sessions5, t_persistent_sessions5,
t_persistent_sessions_subscriptions1,
t_list_clients_v2 t_list_clients_v2
]. ].
client_msgs_testcases() -> client_msgs_testcases() ->
@ -333,7 +334,7 @@ t_persistent_sessions2(Config) ->
%% 2) Client connects to the same node and takes over, listed only once. %% 2) Client connects to the same node and takes over, listed only once.
C2 = connect_client(#{port => Port1, clientid => ClientId}), C2 = connect_client(#{port => Port1, clientid => ClientId}),
assert_single_client(O#{node => N1, clientid => ClientId, status => connected}), assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0}), disconnect_and_destroy_session(C2),
?retry( ?retry(
100, 100,
20, 20,
@ -377,7 +378,7 @@ t_persistent_sessions3(Config) ->
list_request(APIPort, "node=" ++ atom_to_list(N1)) list_request(APIPort, "node=" ++ atom_to_list(N1))
) )
), ),
ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0}) disconnect_and_destroy_session(C2)
end, end,
[] []
), ),
@ -417,7 +418,7 @@ t_persistent_sessions4(Config) ->
list_request(APIPort, "node=" ++ atom_to_list(N1)) list_request(APIPort, "node=" ++ atom_to_list(N1))
) )
), ),
ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0}) disconnect_and_destroy_session(C2)
end, end,
[] []
), ),
@ -552,6 +553,63 @@ t_persistent_sessions5(Config) ->
), ),
ok. ok.
%% Check that the output of `/clients/:clientid/subscriptions' has the expected keys.
t_persistent_sessions_subscriptions1(Config) ->
[N1, _N2] = ?config(nodes, Config),
APIPort = 18084,
Port1 = get_mqtt_port(N1, tcp),
?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)),
?check_trace(
begin
ClientId = <<"c1">>,
C1 = connect_client(#{port => Port1, clientid => ClientId}),
{ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(C1, <<"topic/1">>, 1),
?assertMatch(
{ok,
{{_, 200, _}, _, [
#{
<<"durable">> := true,
<<"node">> := <<_/binary>>,
<<"clientid">> := ClientId,
<<"qos">> := 1,
<<"rap">> := 0,
<<"rh">> := 0,
<<"nl">> := 0,
<<"topic">> := <<"topic/1">>
}
]}},
get_subscriptions_request(APIPort, ClientId)
),
%% Just disconnect
ok = emqtt:disconnect(C1),
?assertMatch(
{ok,
{{_, 200, _}, _, [
#{
<<"durable">> := true,
<<"node">> := null,
<<"clientid">> := ClientId,
<<"qos">> := 1,
<<"rap">> := 0,
<<"rh">> := 0,
<<"nl">> := 0,
<<"topic">> := <<"topic/1">>
}
]}},
get_subscriptions_request(APIPort, ClientId)
),
C2 = connect_client(#{port => Port1, clientid => ClientId}),
disconnect_and_destroy_session(C2),
ok
end,
[]
),
ok.
t_clients_bad_value_type(_) -> t_clients_bad_value_type(_) ->
%% get /clients %% get /clients
AuthHeader = [emqx_common_test_http:default_auth_header()], AuthHeader = [emqx_common_test_http:default_auth_header()],
@ -1800,6 +1858,11 @@ maybe_json_decode(X) ->
{error, _} -> X {error, _} -> X
end. end.
get_subscriptions_request(APIPort, ClientId) ->
Host = "http://127.0.0.1:" ++ integer_to_list(APIPort),
Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId, "subscriptions"]),
request(get, Path, []).
get_client_request(Port, ClientId) -> get_client_request(Port, ClientId) ->
Host = "http://127.0.0.1:" ++ integer_to_list(Port), Host = "http://127.0.0.1:" ++ integer_to_list(Port),
Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId]), Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId]),
@ -1955,3 +2018,6 @@ do_traverse_in_reverse_v2(APIPort, QueryParams0, [Cursor | Rest], DirectOrderCli
{ok, {{_, 200, _}, _, #{<<"data">> := Rows}}} = Res0, {ok, {{_, 200, _}, _, #{<<"data">> := Rows}}} = Res0,
ClientIds = [ClientId || #{<<"clientid">> := ClientId} <- Rows], ClientIds = [ClientId || #{<<"clientid">> := ClientId} <- Rows],
do_traverse_in_reverse_v2(APIPort, QueryParams0, Rest, DirectOrderClientIds, ClientIds ++ Acc). do_traverse_in_reverse_v2(APIPort, QueryParams0, Rest, DirectOrderClientIds, ClientIds ++ Acc).
disconnect_and_destroy_session(Client) ->
ok = emqtt:disconnect(Client, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0}).