fix(clients mgmt api): set `durable` flag for DS sessions and subscriptions
This commit is contained in:
parent
02c58b67f0
commit
0c7b221c42
|
@ -703,7 +703,7 @@ list_client_subscriptions(ClientId) ->
|
||||||
maps:fold(
|
maps:fold(
|
||||||
fun(Topic, #{current_state := CS}, Acc) ->
|
fun(Topic, #{current_state := CS}, Acc) ->
|
||||||
#{subopts := SubOpts} = maps:get(CS, SStates),
|
#{subopts := SubOpts} = maps:get(CS, SStates),
|
||||||
Elem = {Topic, SubOpts},
|
Elem = {Topic, SubOpts#{durable => true}},
|
||||||
[Elem | Acc]
|
[Elem | Acc]
|
||||||
end,
|
end,
|
||||||
[],
|
[],
|
||||||
|
|
|
@ -1753,6 +1753,7 @@ format_persistent_session_info(
|
||||||
}),
|
}),
|
||||||
Info0#{
|
Info0#{
|
||||||
connected => false,
|
connected => false,
|
||||||
|
durable => true,
|
||||||
is_persistent => true,
|
is_persistent => true,
|
||||||
subscriptions_cnt => maps:size(maps:get(subscriptions, PSInfo, #{}))
|
subscriptions_cnt => maps:size(maps:get(subscriptions, PSInfo, #{}))
|
||||||
};
|
};
|
||||||
|
@ -1773,6 +1774,7 @@ format_persistent_session_info(ClientId, PSInfo0) ->
|
||||||
clientid => ClientId,
|
clientid => ClientId,
|
||||||
connected => false,
|
connected => false,
|
||||||
connected_at => CreatedAt,
|
connected_at => CreatedAt,
|
||||||
|
durable => true,
|
||||||
ip_address => IpAddress,
|
ip_address => IpAddress,
|
||||||
is_persistent => true,
|
is_persistent => true,
|
||||||
port => Port,
|
port => Port,
|
||||||
|
|
|
@ -171,21 +171,32 @@ subscriptions(get, #{query_string := QString}) ->
|
||||||
{200, Result}
|
{200, Result}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
format(WhichNode, {{Topic, _Subscriber}, SubOpts}) ->
|
format(WhichNode, {{Topic, Subscriber}, SubOpts}) ->
|
||||||
|
FallbackClientId =
|
||||||
|
case is_binary(Subscriber) of
|
||||||
|
true ->
|
||||||
|
Subscriber;
|
||||||
|
false ->
|
||||||
|
%% e.g.: could be a pid...
|
||||||
|
null
|
||||||
|
end,
|
||||||
maps:merge(
|
maps:merge(
|
||||||
#{
|
#{
|
||||||
topic => emqx_topic:maybe_format_share(Topic),
|
topic => emqx_topic:maybe_format_share(Topic),
|
||||||
clientid => maps:get(subid, SubOpts, null),
|
clientid => maps:get(subid, SubOpts, FallbackClientId),
|
||||||
node => WhichNode,
|
node => convert_null(WhichNode),
|
||||||
durable => false
|
durable => false
|
||||||
},
|
},
|
||||||
maps:with([qos, nl, rap, rh], SubOpts)
|
maps:with([qos, nl, rap, rh, durable], SubOpts)
|
||||||
).
|
).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
convert_null(undefined) -> null;
|
||||||
|
convert_null(Val) -> Val.
|
||||||
|
|
||||||
check_match_topic(#{<<"match_topic">> := MatchTopic}) ->
|
check_match_topic(#{<<"match_topic">> := MatchTopic}) ->
|
||||||
try emqx_topic:parse(MatchTopic) of
|
try emqx_topic:parse(MatchTopic) of
|
||||||
{#share{}, _} -> {error, invalid_match_topic};
|
{#share{}, _} -> {error, invalid_match_topic};
|
||||||
|
|
|
@ -49,6 +49,7 @@ persistent_session_testcases() ->
|
||||||
t_persistent_sessions3,
|
t_persistent_sessions3,
|
||||||
t_persistent_sessions4,
|
t_persistent_sessions4,
|
||||||
t_persistent_sessions5,
|
t_persistent_sessions5,
|
||||||
|
t_persistent_sessions_subscriptions1,
|
||||||
t_list_clients_v2
|
t_list_clients_v2
|
||||||
].
|
].
|
||||||
client_msgs_testcases() ->
|
client_msgs_testcases() ->
|
||||||
|
@ -333,7 +334,7 @@ t_persistent_sessions2(Config) ->
|
||||||
%% 2) Client connects to the same node and takes over, listed only once.
|
%% 2) Client connects to the same node and takes over, listed only once.
|
||||||
C2 = connect_client(#{port => Port1, clientid => ClientId}),
|
C2 = connect_client(#{port => Port1, clientid => ClientId}),
|
||||||
assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
|
assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
|
||||||
ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0}),
|
disconnect_and_destroy_session(C2),
|
||||||
?retry(
|
?retry(
|
||||||
100,
|
100,
|
||||||
20,
|
20,
|
||||||
|
@ -377,7 +378,7 @@ t_persistent_sessions3(Config) ->
|
||||||
list_request(APIPort, "node=" ++ atom_to_list(N1))
|
list_request(APIPort, "node=" ++ atom_to_list(N1))
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0})
|
disconnect_and_destroy_session(C2)
|
||||||
end,
|
end,
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
|
@ -417,7 +418,7 @@ t_persistent_sessions4(Config) ->
|
||||||
list_request(APIPort, "node=" ++ atom_to_list(N1))
|
list_request(APIPort, "node=" ++ atom_to_list(N1))
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0})
|
disconnect_and_destroy_session(C2)
|
||||||
end,
|
end,
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
|
@ -552,6 +553,63 @@ t_persistent_sessions5(Config) ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% Check that the output of `/clients/:clientid/subscriptions' has the expected keys.
|
||||||
|
t_persistent_sessions_subscriptions1(Config) ->
|
||||||
|
[N1, _N2] = ?config(nodes, Config),
|
||||||
|
APIPort = 18084,
|
||||||
|
Port1 = get_mqtt_port(N1, tcp),
|
||||||
|
|
||||||
|
?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)),
|
||||||
|
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
ClientId = <<"c1">>,
|
||||||
|
C1 = connect_client(#{port => Port1, clientid => ClientId}),
|
||||||
|
{ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(C1, <<"topic/1">>, 1),
|
||||||
|
?assertMatch(
|
||||||
|
{ok,
|
||||||
|
{{_, 200, _}, _, [
|
||||||
|
#{
|
||||||
|
<<"durable">> := true,
|
||||||
|
<<"node">> := <<_/binary>>,
|
||||||
|
<<"clientid">> := ClientId,
|
||||||
|
<<"qos">> := 1,
|
||||||
|
<<"rap">> := 0,
|
||||||
|
<<"rh">> := 0,
|
||||||
|
<<"nl">> := 0,
|
||||||
|
<<"topic">> := <<"topic/1">>
|
||||||
|
}
|
||||||
|
]}},
|
||||||
|
get_subscriptions_request(APIPort, ClientId)
|
||||||
|
),
|
||||||
|
|
||||||
|
%% Just disconnect
|
||||||
|
ok = emqtt:disconnect(C1),
|
||||||
|
?assertMatch(
|
||||||
|
{ok,
|
||||||
|
{{_, 200, _}, _, [
|
||||||
|
#{
|
||||||
|
<<"durable">> := true,
|
||||||
|
<<"node">> := null,
|
||||||
|
<<"clientid">> := ClientId,
|
||||||
|
<<"qos">> := 1,
|
||||||
|
<<"rap">> := 0,
|
||||||
|
<<"rh">> := 0,
|
||||||
|
<<"nl">> := 0,
|
||||||
|
<<"topic">> := <<"topic/1">>
|
||||||
|
}
|
||||||
|
]}},
|
||||||
|
get_subscriptions_request(APIPort, ClientId)
|
||||||
|
),
|
||||||
|
|
||||||
|
C2 = connect_client(#{port => Port1, clientid => ClientId}),
|
||||||
|
disconnect_and_destroy_session(C2),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_clients_bad_value_type(_) ->
|
t_clients_bad_value_type(_) ->
|
||||||
%% get /clients
|
%% get /clients
|
||||||
AuthHeader = [emqx_common_test_http:default_auth_header()],
|
AuthHeader = [emqx_common_test_http:default_auth_header()],
|
||||||
|
@ -1800,6 +1858,11 @@ maybe_json_decode(X) ->
|
||||||
{error, _} -> X
|
{error, _} -> X
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
get_subscriptions_request(APIPort, ClientId) ->
|
||||||
|
Host = "http://127.0.0.1:" ++ integer_to_list(APIPort),
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId, "subscriptions"]),
|
||||||
|
request(get, Path, []).
|
||||||
|
|
||||||
get_client_request(Port, ClientId) ->
|
get_client_request(Port, ClientId) ->
|
||||||
Host = "http://127.0.0.1:" ++ integer_to_list(Port),
|
Host = "http://127.0.0.1:" ++ integer_to_list(Port),
|
||||||
Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId]),
|
Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId]),
|
||||||
|
@ -1955,3 +2018,6 @@ do_traverse_in_reverse_v2(APIPort, QueryParams0, [Cursor | Rest], DirectOrderCli
|
||||||
{ok, {{_, 200, _}, _, #{<<"data">> := Rows}}} = Res0,
|
{ok, {{_, 200, _}, _, #{<<"data">> := Rows}}} = Res0,
|
||||||
ClientIds = [ClientId || #{<<"clientid">> := ClientId} <- Rows],
|
ClientIds = [ClientId || #{<<"clientid">> := ClientId} <- Rows],
|
||||||
do_traverse_in_reverse_v2(APIPort, QueryParams0, Rest, DirectOrderClientIds, ClientIds ++ Acc).
|
do_traverse_in_reverse_v2(APIPort, QueryParams0, Rest, DirectOrderClientIds, ClientIds ++ Acc).
|
||||||
|
|
||||||
|
disconnect_and_destroy_session(Client) ->
|
||||||
|
ok = emqtt:disconnect(Client, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0}).
|
||||||
|
|
Loading…
Reference in New Issue