Merge pull request #9627 from emqx/1228-sync-changes-from-ee-to-ce
1228 sync changes from ee to ce
This commit is contained in:
commit
953a230abb
|
@ -25,3 +25,14 @@
|
|||
- Fixed EMQX Helm Chart can not set JSON type value for EMQX Broker configuration items [#9504](https://github.com/emqx/emqx/pull/9504).
|
||||
|
||||
- When resource creation is too slow, there may be some temporary probing connections left [#9539](https://github.com/emqx/emqx/pull/9539).
|
||||
|
||||
- After a reconnect, the unacknowledged QoS1/QoS2 messages in non-clean session were not retransmitted periodically as before the reconnect [#9627](https://github.com/emqx/emqx/pull/9627).
|
||||
The configuration `zone.<zone-name>.retry_interval` specifies the retransmission interval of
|
||||
unacknowledged QoS1/QoS2 messages (defaults to 30s).
|
||||
Prior to this fix, unacknowledged messages buffered in the session are re-sent only once after session take-over, but not retried at configured interval.
|
||||
|
||||
- The expired 'awaiting_rel' queue is not cleared after persistent session MQTT client disconnected [#9627](https://github.com/emqx/emqx/pull/9627).
|
||||
Before this change, if the 'awaiting_rel' queue is full when the MQTT client reconnect
|
||||
to the broker and publish a QoS2 message, the client will get disconnected by the broker
|
||||
with reason code RC_RECEIVE_MAXIMUM_EXCEEDED(0x93), even if the packet IDs in the 'awaiting_rel'
|
||||
queue have already expired.
|
||||
|
|
|
@ -26,3 +26,11 @@
|
|||
- 修复 EMQX Helm Chart 无法配置 value 为 JSON 类型的 EMQX Broker 配置项 [#9504](https://github.com/emqx/emqx/pull/9504)。
|
||||
|
||||
- 当创建资源过慢的情况下,有可能会残留一些用来探活的临时的连接 [#9539](https://github.com/emqx/emqx/pull/9539)。
|
||||
|
||||
- 持久会话的 MQTT 客户端重新连接 emqx 之后,未被确认过的 QoS1/QoS2 消息不再周期性重发 [#9627](https://github.com/emqx/emqx/pull/9627)。
|
||||
`zone.<zone-name>.retry_interval` 配置指定了没有被确认过的 QoS1/QoS2 消息的重发间隔,(默认为 30s)。在这个修复之前,
|
||||
当持久会话的 MQTT 客户端重新连接 emqx 之后,emqx 会将队列中缓存的未被确认过的消息重发一次,但是不会按配置的时间间隔重试。
|
||||
|
||||
- 持久会话的 MQTT 客户端断连之后,已经过期的 'awaiting_rel' 队列没有清除 [#9627](https://github.com/emqx/emqx/pull/9627)。
|
||||
在这个改动之前,在客户端重连并且发布 QoS2 消息的时候,如果 'awaiting_rel' 队列已满,此客户端会被服务器以
|
||||
RC_RECEIVE_MAXIMUM_EXCEEDED(0x93) 错误码断开连接,即使这时候 'awaiting_rel' 队列里面的报文 ID 已经过期了。
|
||||
|
|
|
@ -60,9 +60,9 @@
|
|||
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
|
||||
, {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1
|
||||
, {getopt, "1.0.1"}
|
||||
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.1"}}}
|
||||
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.3"}}}
|
||||
, {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}}
|
||||
, {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.16"}}}
|
||||
, {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.17"}}}
|
||||
, {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}}
|
||||
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
|
||||
]}.
|
||||
|
|
|
@ -56,9 +56,12 @@
|
|||
, clear_keepalive/1
|
||||
]).
|
||||
|
||||
%% Exports for CT
|
||||
-export([set_field/3]).
|
||||
|
||||
-ifdef(TEST).
|
||||
-export([ensure_timer/3]).
|
||||
-endif.
|
||||
|
||||
-import(emqx_misc,
|
||||
[ run_fold/3
|
||||
, pipeline/3
|
||||
|
@ -622,20 +625,20 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) ->
|
|||
NChannel = ensure_quota(PubRes, Channel),
|
||||
handle_out(puback, {PacketId, RC}, NChannel);
|
||||
|
||||
do_publish(PacketId, Msg = #message{qos = ?QOS_2},
|
||||
Channel = #channel{session = Session}) ->
|
||||
do_publish(PacketId, Msg = #message{qos = ?QOS_2}, Channel0) ->
|
||||
#channel{session = Session} = NChannel = maybe_clean_expired_awaiting_rel(Channel0),
|
||||
case emqx_session:publish(PacketId, Msg, Session) of
|
||||
{ok, PubRes, NSession} ->
|
||||
RC = puback_reason_code(PubRes),
|
||||
NChannel1 = ensure_timer(await_timer, Channel#channel{session = NSession}),
|
||||
NChannel1 = ensure_timer(await_timer, NChannel#channel{session = NSession}),
|
||||
NChannel2 = ensure_quota(PubRes, NChannel1),
|
||||
handle_out(pubrec, {PacketId, RC}, NChannel2);
|
||||
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
||||
ok = emqx_metrics:inc('packets.publish.inuse'),
|
||||
handle_out(pubrec, {PacketId, RC}, Channel);
|
||||
handle_out(pubrec, {PacketId, RC}, NChannel);
|
||||
{error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} ->
|
||||
ok = emqx_metrics:inc('packets.publish.dropped'),
|
||||
handle_out(disconnect, RC, Channel)
|
||||
handle_out(disconnect, RC, NChannel)
|
||||
end.
|
||||
|
||||
ensure_quota(_, Channel = #channel{quota = undefined}) ->
|
||||
|
@ -841,7 +844,6 @@ handle_out(connack, {?RC_SUCCESS, SP, Props}, Channel = #channel{conninfo = Conn
|
|||
[ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)],
|
||||
AckProps
|
||||
),
|
||||
|
||||
return_connack(?CONNACK_PACKET(?RC_SUCCESS, SP, NAckProps),
|
||||
ensure_keepalive(NAckProps, Channel));
|
||||
|
||||
|
@ -923,7 +925,7 @@ return_connack(AckPacket, Channel) ->
|
|||
},
|
||||
{Packets, NChannel1} = do_deliver(Publishes, NChannel),
|
||||
Outgoing = [{outgoing, Packets} || length(Packets) > 0],
|
||||
{ok, Replies ++ Outgoing, NChannel1}
|
||||
{ok, Replies ++ Outgoing, ensure_timer(retry_timer, NChannel1)}
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -1127,17 +1129,8 @@ handle_timeout(_TRef, retry_delivery,
|
|||
handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel))
|
||||
end;
|
||||
|
||||
handle_timeout(_TRef, expire_awaiting_rel,
|
||||
Channel = #channel{conn_state = disconnected}) ->
|
||||
{ok, Channel};
|
||||
handle_timeout(_TRef, expire_awaiting_rel,
|
||||
Channel = #channel{session = Session}) ->
|
||||
case emqx_session:expire(awaiting_rel, Session) of
|
||||
{ok, NSession} ->
|
||||
{ok, clean_timer(await_timer, Channel#channel{session = NSession})};
|
||||
{ok, Timeout, NSession} ->
|
||||
{ok, reset_timer(await_timer, Timeout, Channel#channel{session = NSession})}
|
||||
end;
|
||||
handle_timeout(_TRef, expire_awaiting_rel, Channel) ->
|
||||
{ok, clean_expired_awaiting_rel(Channel)};
|
||||
|
||||
handle_timeout(_TRef, expire_session, Channel) ->
|
||||
shutdown(expired, Channel);
|
||||
|
@ -1182,6 +1175,26 @@ reset_timer(Name, Time, Channel) ->
|
|||
clean_timer(Name, Channel = #channel{timers = Timers}) ->
|
||||
Channel#channel{timers = maps:remove(Name, Timers)}.
|
||||
|
||||
is_timer_alive(Name, #channel{timers = Timers}) ->
|
||||
case maps:find(Name, Timers) of
|
||||
error -> false;
|
||||
{ok, _TRef} -> true
|
||||
end.
|
||||
|
||||
maybe_clean_expired_awaiting_rel(Channel) ->
|
||||
case is_timer_alive(await_timer, Channel) of
|
||||
true -> Channel;
|
||||
false -> clean_expired_awaiting_rel(Channel)
|
||||
end.
|
||||
|
||||
clean_expired_awaiting_rel(Channel = #channel{session = Session}) ->
|
||||
case emqx_session:expire(awaiting_rel, Session) of
|
||||
{ok, NSession} ->
|
||||
clean_timer(await_timer, Channel#channel{session = NSession});
|
||||
{ok, Timeout, NSession} ->
|
||||
reset_timer(await_timer, Timeout, Channel#channel{session = NSession})
|
||||
end.
|
||||
|
||||
-spec interval(channel_timer(), channel()) -> timeout().
|
||||
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
|
||||
emqx_keepalive:info(interval, KeepAlive);
|
||||
|
@ -1878,10 +1891,6 @@ is_disconnect_event_enabled(discarded) ->
|
|||
is_disconnect_event_enabled(takeovered) ->
|
||||
emqx:get_env(client_disconnect_takeovered, false).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% For CT tests
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
set_field(Name, Value, Channel) ->
|
||||
Pos = emqx_misc:index_of(Name, record_info(fields, channel)),
|
||||
setelement(Pos+1, Channel, Value).
|
||||
|
|
|
@ -558,7 +558,8 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
clean_down({ChanPid, ClientId}) ->
|
||||
do_unregister_channel({ClientId, ChanPid}).
|
||||
do_unregister_channel({ClientId, ChanPid}),
|
||||
?tp(debug, emqx_cm_clean_down, #{client_id => ClientId}).
|
||||
|
||||
stats_fun() ->
|
||||
lists:foreach(fun update_stats/1, ?CHAN_STATS).
|
||||
|
|
|
@ -31,12 +31,14 @@
|
|||
post_release_upgrade(FromRelVsn, _) ->
|
||||
{_, CurrRelVsn} = ?EMQX_RELEASE,
|
||||
?INFO("emqx has been upgraded from ~s to ~s!", [FromRelVsn, CurrRelVsn]),
|
||||
maybe_refresh_jwt_module(FromRelVsn),
|
||||
reload_components().
|
||||
|
||||
%% What to do after downgraded to an old release vsn.
|
||||
post_release_downgrade(ToRelVsn, _) ->
|
||||
{_, CurrRelVsn} = ?EMQX_RELEASE,
|
||||
?INFO("emqx has been downgraded from ~s to ~s!", [CurrRelVsn, ToRelVsn]),
|
||||
maybe_refresh_jwt_module(ToRelVsn),
|
||||
reload_components().
|
||||
|
||||
-ifdef(EMQX_ENTERPRISE).
|
||||
|
@ -73,3 +75,21 @@ load_plugins() ->
|
|||
true -> emqx_plugins:force_load();
|
||||
false -> emqx_plugins:load()
|
||||
end.
|
||||
|
||||
-ifdef(EMQX_ENTERPRISE).
|
||||
maybe_refresh_jwt_module(Release) when Release =:= "4.4.0"
|
||||
orelse Release =:= "4.4.1"
|
||||
orelse Release =:= "4.4.2"
|
||||
orelse Release =:= "4.4.3" ->
|
||||
_ = emqx:unhook('client.authenticate', fun emqx_auth_jwt:check/3),
|
||||
_ = emqx:unhook('client.authenticate', fun emqx_auth_jwt:check_auth/3),
|
||||
emqx_modules:refresh_module(jwt_authentication);
|
||||
maybe_refresh_jwt_module(_) ->
|
||||
ok.
|
||||
|
||||
-else.
|
||||
|
||||
maybe_refresh_jwt_module(_) ->
|
||||
ok.
|
||||
|
||||
-endif.
|
||||
|
|
|
@ -205,7 +205,8 @@ t_handle_in_qos2_publish_with_error_return(_) ->
|
|||
ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
|
||||
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
|
||||
Session = session(#{max_awaiting_rel => 2, awaiting_rel => #{1 => 1}}),
|
||||
Channel = channel(#{conn_state => connected, session => Session}),
|
||||
Channel = emqx_channel:ensure_timer(await_timer, 2000,
|
||||
channel(#{conn_state => connected, session => Session})),
|
||||
Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
|
||||
{ok, ?PUBREC_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), Channel} =
|
||||
emqx_channel:handle_in(Publish1, Channel),
|
||||
|
|
|
@ -41,12 +41,24 @@
|
|||
<<"TopicA/#">>
|
||||
]).
|
||||
|
||||
-define(do_receive_tcp_packet(TIMEOUT, SOCKET, PACKET, EXPRESS),
|
||||
receive
|
||||
{tcp, SOCKET, PACKET} -> EXPRESS
|
||||
after TIMEOUT ->
|
||||
ct:fail({receive_timeout, TIMEOUT})
|
||||
end).
|
||||
|
||||
-define(receive_tcp_packet(TIMEOUT, SOCKET, PACKET, EXPRESS),
|
||||
fun() ->
|
||||
?do_receive_tcp_packet(TIMEOUT, SOCKET, PACKET, EXPRESS)
|
||||
end()).
|
||||
|
||||
all() ->
|
||||
[{group, mqttv3},
|
||||
{group, mqttv4},
|
||||
{group, mqttv5},
|
||||
{group, others}
|
||||
{group, others},
|
||||
{group, bugfixes}
|
||||
].
|
||||
|
||||
groups() ->
|
||||
|
@ -73,6 +85,10 @@ groups() ->
|
|||
t_certcn_as_clientid_default_config_tls,
|
||||
t_certcn_as_clientid_tlsv1_3,
|
||||
t_certcn_as_clientid_tlsv1_2
|
||||
]},
|
||||
{bugfixes, [non_parallel_tests],
|
||||
[ t_qos2_no_pubrel_received
|
||||
, t_retry_timer_after_session_taken_over
|
||||
]}
|
||||
].
|
||||
|
||||
|
@ -84,6 +100,18 @@ init_per_suite(Config) ->
|
|||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
|
||||
init_per_testcase(Func, Cfg) ->
|
||||
maybe_run_fun(setup, Func, Cfg).
|
||||
|
||||
end_per_testcase(Func, Cfg) ->
|
||||
maybe_run_fun(teardown, Func, Cfg).
|
||||
|
||||
maybe_run_fun(Tag, Func, Cfg) ->
|
||||
try ?MODULE:Func(Tag, Cfg)
|
||||
catch
|
||||
error:undef -> Cfg
|
||||
end.
|
||||
|
||||
set_special_confs(emqx) ->
|
||||
emqx_ct_helpers:change_emqx_opts(ssl_twoway, [{peer_cert_as_username, cn}]);
|
||||
set_special_confs(_) ->
|
||||
|
@ -297,12 +325,193 @@ t_certcn_as_clientid_tlsv1_3(_) ->
|
|||
t_certcn_as_clientid_tlsv1_2(_) ->
|
||||
tls_certcn_as_clientid('tlsv1.2').
|
||||
|
||||
t_qos2_no_pubrel_received(setup, Cfg) ->
|
||||
OldMaxAwaitRel = emqx_zone:get_env(external, max_awaiting_rel),
|
||||
OldWaitTimeout = emqx_zone:get_env(external, await_rel_timeout),
|
||||
emqx_zone:set_env(external, max_awaiting_rel, 2),
|
||||
emqx_zone:set_env(external, await_rel_timeout, 1),
|
||||
[{old_max_await_rel, OldMaxAwaitRel},
|
||||
{old_wait_timeout, OldWaitTimeout} | Cfg];
|
||||
t_qos2_no_pubrel_received(teardown, Cfg) ->
|
||||
OldMaxAwaitRel = ?config(old_max_await_rel, Cfg),
|
||||
OldWaitTimeout = ?config(old_wait_timeout, Cfg),
|
||||
emqx_zone:set_env(external, max_awaiting_rel, OldMaxAwaitRel),
|
||||
emqx_zone:set_env(external, await_rel_timeout, OldWaitTimeout).
|
||||
t_qos2_no_pubrel_received(_) ->
|
||||
%% [The Scenario]:
|
||||
%% Client --- CONN: clean_session=false --> EMQX
|
||||
%% Client --- PUB: QoS2 ---> EMQX
|
||||
%% Client --- PUB: QoS2 ---> EMQX
|
||||
%% Client --- ... ---> EMQX
|
||||
%% Client <---- PUBREC ---- EMQX
|
||||
%% Client <---- PUBREC ---- EMQX
|
||||
%% Client <---- ... ---- EMQX
|
||||
%% Client --- PUBREL --X--> EMQX (PUBREL not received)
|
||||
%% Client <--- DISCONN: RC_RECEIVE_MAXIMUM_EXCEEDED --- EMQX
|
||||
%%
|
||||
%% [A few hours later..]:
|
||||
%% Client --- CONN: clean_session=false --> EMQX
|
||||
%% Client --- PUB: QoS2 ---> EMQX
|
||||
%% Client <--- DISCONN: RC_RECEIVE_MAXIMUM_EXCEEDED --- EMQX (we should clear the awaiting_rel queue but it is still full).
|
||||
ct:pal("1. reconnect after awaiting_rel is cleared"),
|
||||
qos2_no_pubrel_received(fun
|
||||
(1) -> timer:sleep(1500); %% reconnect 1.5s later, ensure the await_rel_timeout triggered
|
||||
(2) -> ok
|
||||
end),
|
||||
ct:pal("2. reconnect before awaiting_rel is cleared"),
|
||||
qos2_no_pubrel_received(fun
|
||||
(1) -> ok; %% reconnect as fast as possiable, ensure the await_rel_timeout NOT triggered
|
||||
(2) -> timer:sleep(1500) %% send msgs 1.5s later, ensure the await_rel_timeout triggered
|
||||
end).
|
||||
|
||||
qos2_no_pubrel_received(Actions) ->
|
||||
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||
Topic = <<"t/foo">>,
|
||||
{ok, Sock} = gen_tcp:connect("127.0.0.1", 1883, [], 3000),
|
||||
gen_tcp:send(Sock, make_connect_packet(ClientId, false)),
|
||||
timer:sleep(100),
|
||||
ok = gen_tcp:send(Sock, make_publish_packet(Topic, 2, 100, <<"foo">>)),
|
||||
ok = gen_tcp:send(Sock, make_publish_packet(Topic, 2, 101, <<"foo">>)),
|
||||
%% Send the 3rd publish with id 103 will got disconnected:
|
||||
%% - "Dropped the qos2 packet 103 due to awaiting_rel is full".
|
||||
?assertMatch(ok, gen_tcp:send(Sock, make_publish_packet(Topic, 2, 103, <<"foo">>))),
|
||||
%% not the connections should be closed by the server due to ?RC_RECEIVE_MAXIMUM_EXCEEDED
|
||||
receive
|
||||
{tcp_closed, Sock} -> ok
|
||||
after 500 ->
|
||||
ct:fail({wait_tcp_close_timeout, 500})
|
||||
end,
|
||||
|
||||
%% before reconnecting
|
||||
action_point(Actions, 1),
|
||||
|
||||
{ok, Sock1} = gen_tcp:connect("127.0.0.1", 1883, [], 3000),
|
||||
gen_tcp:send(Sock1, make_connect_packet(ClientId, false)),
|
||||
|
||||
%% after connected and before sending any msgs
|
||||
action_point(Actions, 2),
|
||||
|
||||
ok = gen_tcp:send(Sock1, make_publish_packet(Topic, 2, 104, <<"foo">>)),
|
||||
|
||||
receive
|
||||
{tcp_closed, Sock1} ->
|
||||
%% this is the bug, the wait_rel queue should have expired but not cleared
|
||||
ct:fail(unexpected_disconnect)
|
||||
after 500 -> ok
|
||||
end,
|
||||
gen_tcp:close(Sock1),
|
||||
ok.
|
||||
|
||||
t_retry_timer_after_session_taken_over(setup, Cfg) ->
|
||||
%emqx_logger:set_log_level(debug),
|
||||
OldRetryInterval = emqx_zone:get_env(external, retry_interval),
|
||||
emqx_zone:set_env(external, retry_interval, 1),
|
||||
[{old_retry_interval, OldRetryInterval} | Cfg];
|
||||
t_retry_timer_after_session_taken_over(teardown, Cfg) ->
|
||||
%emqx_logger:set_log_level(warning),
|
||||
OldRetryInterval = ?config(old_retry_interval, Cfg),
|
||||
emqx_zone:set_env(external, retry_interval, OldRetryInterval).
|
||||
t_retry_timer_after_session_taken_over(_) ->
|
||||
%% [The Scenario]:
|
||||
%% Client --- CONN: clean_session=false --> EMQX
|
||||
%% Client <--- PUB: QoS1 --- EMQX
|
||||
%% Client --X-- PUBACK ----> EMQX (the client doesn't send PUBACK)
|
||||
%% Client <---- PUB: QoS1 ---- EMQX (resend the PUBLISH msg)
|
||||
%% Client --- DISCONN ---> EMQX
|
||||
%% [A few seconds later..]:
|
||||
%% Client --- CONN: clean_session=false --> EMQX
|
||||
%% Client <--- PUB: QoS1 --- EMQX (resume session and resend the inflight messages)
|
||||
%% Client --X-- PUBACK ----> EMQX (the client doesn't send PUBACK)
|
||||
TcpOpts = [binary, {active, true}],
|
||||
|
||||
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||
Topic = <<"t/foo">>,
|
||||
Payload = <<"DO NOT REPLY!">>,
|
||||
|
||||
%% CONNECT
|
||||
{ok, Sock1} = gen_tcp:connect("127.0.0.1", 1883, TcpOpts, 3000),
|
||||
ok = gen_tcp:send(Sock1, make_connect_packet(ClientId, false)),
|
||||
?receive_tcp_packet(1000, Sock1, _, ok),
|
||||
|
||||
%% SUBSCRIBE
|
||||
ok = gen_tcp:send(Sock1, make_subscribe_packet(Topic, 2, 1)),
|
||||
?receive_tcp_packet(200, Sock1, _, ok),
|
||||
|
||||
emqx_broker:publish(emqx_message:make(<<"publisher">>, 1, Topic, Payload)),
|
||||
%% note that here we don't reply the publish with puback
|
||||
?receive_tcp_packet(200, Sock1, PubPacket1,
|
||||
begin
|
||||
?assertMatch({ok, ?PUBLISH_PACKET(1, Topic, 1, Payload), <<>>, _},
|
||||
emqx_frame:parse(PubPacket1))
|
||||
end),
|
||||
|
||||
ok = gen_tcp:close(Sock1),
|
||||
timer:sleep(100),
|
||||
|
||||
%% CONNECT again
|
||||
{ok, Sock2} = gen_tcp:connect("127.0.0.1", 1883, TcpOpts, 3000),
|
||||
ok = gen_tcp:send(Sock2, make_connect_packet(ClientId, false)),
|
||||
ConnAckRem = ?do_receive_tcp_packet(200, Sock2, ConnPack2, begin
|
||||
ConnAck = iolist_to_binary(make_connack_packet(?CONNACK_ACCEPT, 1)),
|
||||
ct:pal("--- connack: ~p, got: ~p", [ConnAck, ConnPack2]),
|
||||
<<ConnAck:4/binary, Rem/binary>> = ConnPack2,
|
||||
Rem
|
||||
end),
|
||||
|
||||
%% emqx should resend the non-ACKed messages now
|
||||
PubPacket2 = case ConnAckRem of
|
||||
<<>> ->
|
||||
?receive_tcp_packet(200, Sock2, Packet, Packet);
|
||||
_ ->
|
||||
ConnAckRem
|
||||
end,
|
||||
?assertMatch({ok, ?PUBLISH_PACKET(1, Topic, 1, Payload), <<>>, _},
|
||||
emqx_frame:parse(PubPacket2)),
|
||||
|
||||
%% ... and emqx should resend the message 1s later, as we didn't ACK it.
|
||||
?receive_tcp_packet(1200, Sock2, Packet,
|
||||
begin
|
||||
?assertMatch({ok, ?PUBLISH_PACKET(1, Topic, 1, Payload), <<>>, _},
|
||||
emqx_frame:parse(Packet))
|
||||
end),
|
||||
|
||||
%% we ACK it now, then emqx should stop the resending
|
||||
ok = gen_tcp:send(Sock2, make_connect_packet(ClientId, false)),
|
||||
receive
|
||||
{tcp, Sock2, _PACKET} ->
|
||||
{ok, MqttPacket, <<>>, _} = emqx_frame:parse(_PACKET),
|
||||
?assertNotMatch(?PUBLISH_PACKET(_), MqttPacket)
|
||||
after 1200 -> ok
|
||||
end,
|
||||
ok = gen_tcp:close(Sock1).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helper functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
action_point(Action, N) ->
|
||||
Action(N).
|
||||
|
||||
make_connect_packet(ClientId, CleanStart) ->
|
||||
emqx_frame:serialize(?CONNECT_PACKET(
|
||||
#mqtt_packet_connect{
|
||||
proto_name = <<"MQTT">>,
|
||||
clean_start = CleanStart,
|
||||
keepalive = 0,
|
||||
clientid = ClientId
|
||||
}
|
||||
)).
|
||||
make_connack_packet(Code, SP) ->
|
||||
emqx_frame:serialize(?CONNACK_PACKET(Code, SP)).
|
||||
|
||||
make_publish_packet(Topic, QoS, PacketId, Payload) ->
|
||||
emqx_frame:serialize(
|
||||
?PUBLISH_PACKET(QoS, Topic, PacketId, Payload)).
|
||||
|
||||
make_subscribe_packet(TopicFileter, QoS, PacketId) ->
|
||||
emqx_frame:serialize(
|
||||
?SUBSCRIBE_PACKET(PacketId, [{TopicFileter, #{rh => 0, rap => 0, nl => 0, qos => QoS}}])).
|
||||
|
||||
recv_msgs(Count) ->
|
||||
recv_msgs(Count, []).
|
||||
|
||||
|
|
Loading…
Reference in New Issue