Merge pull request #13034 from thalesmg/test-flaky-mq-r57-20240513
test(client mgmt api): attempt to fix flaky test
This commit is contained in:
commit
15acd86b31
|
@ -1193,7 +1193,7 @@ t_mqueue_messages(Config) ->
|
||||||
ClientId = atom_to_binary(?FUNCTION_NAME),
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||||
Topic = <<"t/test_mqueue_msgs">>,
|
Topic = <<"t/test_mqueue_msgs">>,
|
||||||
Count = emqx_mgmt:default_row_limit(),
|
Count = emqx_mgmt:default_row_limit(),
|
||||||
{ok, _Client} = client_with_mqueue(ClientId, Topic, Count),
|
ok = client_with_mqueue(ClientId, Topic, Count),
|
||||||
Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "mqueue_messages"]),
|
Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "mqueue_messages"]),
|
||||||
?assert(Count =< emqx:get_config([mqtt, max_mqueue_len])),
|
?assert(Count =< emqx:get_config([mqtt, max_mqueue_len])),
|
||||||
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
@ -1244,14 +1244,16 @@ client_with_mqueue(ClientId, Topic, Count) ->
|
||||||
{ok, Client} = emqtt:start_link([
|
{ok, Client} = emqtt:start_link([
|
||||||
{proto_ver, v5},
|
{proto_ver, v5},
|
||||||
{clientid, ClientId},
|
{clientid, ClientId},
|
||||||
{clean_start, false},
|
{clean_start, true},
|
||||||
{properties, #{'Session-Expiry-Interval' => 120}}
|
{properties, #{'Session-Expiry-Interval' => 120}}
|
||||||
]),
|
]),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
{ok, _, _} = emqtt:subscribe(Client, Topic, 1),
|
{ok, _, _} = emqtt:subscribe(Client, Topic, 1),
|
||||||
|
ct:sleep(300),
|
||||||
ok = emqtt:disconnect(Client),
|
ok = emqtt:disconnect(Client),
|
||||||
|
ct:sleep(100),
|
||||||
publish_msgs(Topic, Count),
|
publish_msgs(Topic, Count),
|
||||||
{ok, Client}.
|
ok.
|
||||||
|
|
||||||
client_with_inflight(ClientId, Topic, Count) ->
|
client_with_inflight(ClientId, Topic, Count) ->
|
||||||
{ok, Client} = emqtt:start_link([
|
{ok, Client} = emqtt:start_link([
|
||||||
|
@ -1275,13 +1277,18 @@ publish_msgs(Topic, Count) ->
|
||||||
|
|
||||||
test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding, IsMqueue) ->
|
test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding, IsMqueue) ->
|
||||||
Qs0 = io_lib:format("payload=~s", [PayloadEncoding]),
|
Qs0 = io_lib:format("payload=~s", [PayloadEncoding]),
|
||||||
{ok, MsgsResp} = emqx_mgmt_api_test_util:request_api(get, Path, Qs0, AuthHeader),
|
|
||||||
#{<<"meta">> := Meta, <<"data">> := Msgs} = emqx_utils_json:decode(MsgsResp),
|
|
||||||
#{<<"start">> := StartPos, <<"position">> := Pos} = Meta,
|
|
||||||
|
|
||||||
?assertEqual(StartPos, msg_pos(hd(Msgs), IsMqueue)),
|
{Msgs, StartPos, Pos} = ?retry(500, 10, begin
|
||||||
?assertEqual(Pos, msg_pos(lists:last(Msgs), IsMqueue)),
|
{ok, MsgsResp} = emqx_mgmt_api_test_util:request_api(get, Path, Qs0, AuthHeader),
|
||||||
?assertEqual(length(Msgs), Count),
|
#{<<"meta">> := Meta, <<"data">> := Msgs} = emqx_utils_json:decode(MsgsResp),
|
||||||
|
#{<<"start">> := StartPos, <<"position">> := Pos} = Meta,
|
||||||
|
|
||||||
|
?assertEqual(StartPos, msg_pos(hd(Msgs), IsMqueue)),
|
||||||
|
?assertEqual(Pos, msg_pos(lists:last(Msgs), IsMqueue)),
|
||||||
|
?assertEqual(length(Msgs), Count),
|
||||||
|
|
||||||
|
{Msgs, StartPos, Pos}
|
||||||
|
end),
|
||||||
|
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun({Seq, #{<<"payload">> := P} = M}) ->
|
fun({Seq, #{<<"payload">> := P} = M}) ->
|
||||||
|
|
Loading…
Reference in New Issue