diff --git a/Makefile b/Makefile index 9aa90f94c..c4e0dc020 100644 --- a/Makefile +++ b/Makefile @@ -39,6 +39,8 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \ emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message +CT_SUITES = emqx_portal + CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) diff --git a/include/emqx_mqtt.hrl b/include/emqx_mqtt.hrl index 3bba42216..1c2ce1a27 100644 --- a/include/emqx_mqtt.hrl +++ b/include/emqx_mqtt.hrl @@ -176,11 +176,6 @@ -define(MAX_PACKET_ID, 16#ffff). -define(MAX_PACKET_SIZE, 16#fffffff). --define(BUMP_PACKET_ID(Base, Bump), - case Base + Bump of - __I__ when __I__ > ?MAX_PACKET_ID -> __I__ - ?MAX_PACKET_ID; - __I__ -> __I__ - end). %%-------------------------------------------------------------------- %% MQTT Frame Mask diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 71527871f..fbcfcc059 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -35,6 +35,7 @@ -export([pubcomp/2, pubcomp/3, pubcomp/4]). -export([subscriptions/1]). -export([info/1, stop/1]). +-export([next_packet_id/1, next_packet_id/2]). %% For test cases -export([pause/1, resume/1]). @@ -421,6 +422,19 @@ disconnect(Client, ReasonCode) -> disconnect(Client, ReasonCode, Properties) -> gen_statem:call(Client, {disconnect, ReasonCode, Properties}). +-spec next_packet_id(packet_id()) -> packet_id(). +next_packet_id(?MAX_PACKET_ID) -> 1; +next_packet_id(Id) -> Id + 1. + +-spec next_packet_id(packet_id(), integer()) -> packet_id(). +next_packet_id(Id, Bump) -> + true = (Bump < ?MAX_PACKET_ID div 2), %% assert + N = Id + Bump, + case N > ?MAX_PACKET_ID of + true -> N - ?MAX_PACKET_ID; + false -> N + end. + %%------------------------------------------------------------------------------ %% For test cases %%------------------------------------------------------------------------------ @@ -1354,7 +1368,7 @@ send(Packet, State = #state{socket = Sock, proto_ver = Ver}) Data = emqx_frame:serialize(Packet, #{version => Ver}), emqx_logger:debug("SEND Data: ~p", [Data]), case emqx_client_sock:send(Sock, Data) of - ok -> {ok, next_packet_id(State)}; + ok -> {ok, bump_last_packet_id(State)}; Error -> Error end. @@ -1397,8 +1411,6 @@ assign_packet_id([H | T], Id) -> assign_packet_id([], _Id) -> []. -next_packet_id(State = #state{last_packet_id = Id}) -> - State#state{last_packet_id = next_packet_id(Id)}; -next_packet_id(16#ffff) -> 1; -next_packet_id(Id) -> Id + 1. +bump_last_packet_id(State = #state{last_packet_id = Id}) -> + State#state{last_packet_id = next_packet_id(Id)}. diff --git a/src/portal/emqx_portal_mqtt.erl b/src/portal/emqx_portal_mqtt.erl index 0817e3fe2..de5bf6042 100644 --- a/src/portal/emqx_portal_mqtt.erl +++ b/src/portal/emqx_portal_mqtt.erl @@ -103,7 +103,7 @@ safe_stop(Pid, StopF, Timeout) -> send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, Batch) -> case emqx_client:publish(ClientPid, Batch) of {ok, BasePktId} -> - LastPktId = ?BUMP_PACKET_ID(BasePktId, length(Batch) - 1), + LastPktId = emqx_client:next_packet_id(BasePktId, length(Batch) - 1), AckCollector ! ?SENT(?RANGE(BasePktId, LastPktId)), %% return last pakcet id as batch reference {ok, LastPktId}; @@ -145,7 +145,7 @@ match_acks_1(Parent, {{value, PktId}, Acked}, [?RANGE(PktId, PktId) | Sent]) -> ok = emqx_portal:handle_ack(Parent, PktId), match_acks(Parent, Acked, Sent); match_acks_1(Parent, {{value, PktId}, Acked}, [?RANGE(PktId, Max) | Sent]) -> - match_acks(Parent, Acked, [?RANGE(PktId + 1, Max) | Sent]). + match_acks(Parent, Acked, [?RANGE(emqx_client:next_packet_id(PktId), Max) | Sent]). %% When puback for QoS-1 message is received from remote MQTT broker %% NOTE: no support for QoS-2 diff --git a/test/emqx_portal_SUITE.erl b/test/emqx_portal_SUITE.erl index c375ee994..96b11fcf9 100644 --- a/test/emqx_portal_SUITE.erl +++ b/test/emqx_portal_SUITE.erl @@ -156,6 +156,7 @@ t_mqtt(Config) when is_list(Config) -> end, Msgs), ok = receive_and_match_messages(Ref, Msgs), ok = emqx_portal:ensure_forward_present(Pid, SendToTopic2), + timer:sleep(200), Msgs2 = lists:seq(Max + 1, Max * 2), lists:foreach(fun(I) -> Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic2, integer_to_binary(I)), @@ -181,8 +182,11 @@ do_receive_and_match_messages(_Ref, []) -> ok; do_receive_and_match_messages(Ref, [I | Rest]) -> receive {Ref, timeout} -> erlang:error(timeout); - {Ref, [#{payload := P}]} -> - ?assertEqual(I, binary_to_integer(P)), + {Ref, [#{payload := P} = Msg]} -> + case I =:= binary_to_integer(P) of + true -> ok; + false -> throw({unexpected, Msg, [I | Rest]}) + end, do_receive_and_match_messages(Ref, Rest) end.