Improve the process logic of DUP flag (#1319).
This commit is contained in:
parent
f31485f342
commit
cae743803b
|
@ -124,7 +124,7 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length)
|
||||||
_ -> <<Id:16/big, R/binary>> = Rest1,
|
_ -> <<Id:16/big, R/binary>> = Rest1,
|
||||||
{Id, R}
|
{Id, R}
|
||||||
end,
|
end,
|
||||||
wrap(Header, #mqtt_packet_publish{topic_name = TopicName,
|
wrap(fixdup(Header), #mqtt_packet_publish{topic_name = TopicName,
|
||||||
packet_id = PacketId},
|
packet_id = PacketId},
|
||||||
Payload, Rest);
|
Payload, Rest);
|
||||||
{?PUBACK, <<FrameBin:Length/binary, Rest/binary>>} ->
|
{?PUBACK, <<FrameBin:Length/binary, Rest/binary>>} ->
|
||||||
|
@ -222,3 +222,9 @@ fixqos(?SUBSCRIBE, 0) -> 1;
|
||||||
fixqos(?UNSUBSCRIBE, 0) -> 1;
|
fixqos(?UNSUBSCRIBE, 0) -> 1;
|
||||||
fixqos(_Type, QoS) -> QoS.
|
fixqos(_Type, QoS) -> QoS.
|
||||||
|
|
||||||
|
%% Fix Issue#1319
|
||||||
|
fixdup(Header = #mqtt_packet_header{qos = ?QOS0, dup = true}) ->
|
||||||
|
Header#mqtt_packet_header{dup = false};
|
||||||
|
fixdup(Header = #mqtt_packet_header{qos = ?QOS2, dup = true}) ->
|
||||||
|
Header#mqtt_packet_header{dup = false};
|
||||||
|
fixdup(Header) -> Header.
|
||||||
|
|
|
@ -308,9 +308,7 @@ publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId),
|
||||||
username = Username,
|
username = Username,
|
||||||
mountpoint = MountPoint,
|
mountpoint = MountPoint,
|
||||||
session = Session}) ->
|
session = Session}) ->
|
||||||
Msg0 = emqttd_message:from_packet(Username, ClientId, Packet),
|
Msg = emqttd_message:from_packet(Username, ClientId, Packet),
|
||||||
% MQTT 3.3.1-3: Need reset DUP flag when recv publish message
|
|
||||||
Msg = emqttd_message:unset_flag(dup, Msg0),
|
|
||||||
emqttd_session:publish(Session, mount(MountPoint, Msg));
|
emqttd_session:publish(Session, mount(MountPoint, Msg));
|
||||||
|
|
||||||
publish(Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) ->
|
publish(Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) ->
|
||||||
|
@ -324,9 +322,7 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
|
||||||
username = Username,
|
username = Username,
|
||||||
mountpoint = MountPoint,
|
mountpoint = MountPoint,
|
||||||
session = Session}) ->
|
session = Session}) ->
|
||||||
Msg0 = emqttd_message:from_packet(Username, ClientId, Packet),
|
Msg = emqttd_message:from_packet(Username, ClientId, Packet),
|
||||||
% MQTT 3.3.1-3: Need reset DUP flag when recv publish message
|
|
||||||
Msg = emqttd_message:unset_flag(dup, Msg0),
|
|
||||||
case emqttd_session:publish(Session, mount(MountPoint, Msg)) of
|
case emqttd_session:publish(Session, mount(MountPoint, Msg)) of
|
||||||
ok ->
|
ok ->
|
||||||
send(?PUBACK_PACKET(Type, PacketId), State);
|
send(?PUBACK_PACKET(Type, PacketId), State);
|
||||||
|
|
|
@ -535,11 +535,11 @@ handle_info({dispatch, Topic, Msg = #mqtt_message{from = {ClientId, _}}},
|
||||||
ignore_loop_deliver = IgnoreLoopDeliver}) when is_record(Msg, mqtt_message) ->
|
ignore_loop_deliver = IgnoreLoopDeliver}) when is_record(Msg, mqtt_message) ->
|
||||||
case IgnoreLoopDeliver of
|
case IgnoreLoopDeliver of
|
||||||
true -> {noreply, State, hibernate};
|
true -> {noreply, State, hibernate};
|
||||||
false -> {noreply, gc(dispatch(tune_qos(Topic, Msg, State), State)), hibernate}
|
false -> {noreply, handle_dispatch(Topic, Msg, State), hibernate}
|
||||||
end;
|
end;
|
||||||
%% Dispatch Message
|
%% Dispatch Message
|
||||||
handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) ->
|
handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) ->
|
||||||
{noreply, gc(dispatch(tune_qos(Topic, Msg, State), State)), hibernate};
|
{noreply, handle_dispatch(Topic, Msg, State), hibernate};
|
||||||
|
|
||||||
%% Do nothing if the client has been disconnected.
|
%% Do nothing if the client has been disconnected.
|
||||||
handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) ->
|
handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) ->
|
||||||
|
@ -687,6 +687,9 @@ is_awaiting_full(#state{awaiting_rel = AwaitingRel, max_awaiting_rel = MaxLen})
|
||||||
%% Dispatch Messages
|
%% Dispatch Messages
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
handle_dispatch(Topic, Msg, State) ->
|
||||||
|
gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State)).
|
||||||
|
|
||||||
%% Enqueue message if the client has been disconnected
|
%% Enqueue message if the client has been disconnected
|
||||||
dispatch(Msg, State = #state{client_pid = undefined}) ->
|
dispatch(Msg, State = #state{client_pid = undefined}) ->
|
||||||
enqueue_msg(Msg, State);
|
enqueue_msg(Msg, State);
|
||||||
|
@ -801,6 +804,14 @@ tune_qos(Topic, Msg = #mqtt_message{qos = PubQoS},
|
||||||
Msg
|
Msg
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Reset Dup
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
reset_dup(Msg = #mqtt_message{dup = true}) ->
|
||||||
|
Msg#mqtt_message{dup = false};
|
||||||
|
reset_dup(Msg) -> Msg.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Next Msg Id
|
%% Next Msg Id
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue