diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 854dee0a5..aa1a027a4 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -453,6 +453,8 @@ handle_cast({pubrel, PacketId}, State = #state{awaiting_rel = AwaitingRel}) -> {noreply, case maps:take(PacketId, AwaitingRel) of {Msg, AwaitingRel1} -> + %% Implement Qos2 by method A [MQTT 4.33] + %% Dispatch to subscriber when received PUBREL spawn(emqttd_server, publish, [Msg]), %%:) gc(State#state{awaiting_rel = AwaitingRel1}); error -> @@ -628,8 +630,10 @@ retry_delivery(Force, [{Type, Msg, Ts} | Msgs], Now, redeliver(Msg, State), Inflight1 = Inflight:update(PacketId, {publish, Msg, Now}), retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1}); - {pubrel, PacketId} -> %% remove 'pubrel' directly? - retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight:delete(PacketId)}) + {pubrel, PacketId} -> + redeliver({pubrel, PacketId}, State), + Inflight1 = Inflight:update(PacketId, {pubrel, PacketId, Now}), + retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1}) end; true -> State#state{retry_timer = start_timer(Interval - Diff, retry_delivery)} @@ -649,11 +653,13 @@ expire_awaiting_rel(State = #state{awaiting_rel = AwaitingRel}) -> expire_awaiting_rel([], _Now, State) -> State#state{await_rel_timer = undefined}; -expire_awaiting_rel([{PacketId, #mqtt_message{timestamp = TS}} | Msgs], +expire_awaiting_rel([{PacketId, Msg = #mqtt_message{timestamp = TS}} | Msgs], Now, State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> case (timer:now_diff(Now, TS) div 1000) of Diff when Diff >= Timeout -> + ?LOG(warning, "Dropped Qos2 Message for await_rel_timeout: ~p", [Msg], State), + emqttd_metrics:inc('messages/qos2/dropped'), expire_awaiting_rel(Msgs, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)}); Diff -> State#state{await_rel_timer = start_timer(Timeout - Diff, check_awaiting_rel)} @@ -714,7 +720,10 @@ enqueue_msg(Msg, State = #state{mqueue = Q}) -> %%-------------------------------------------------------------------- redeliver(Msg = #mqtt_message{qos = QoS}, State) -> - deliver(Msg#mqtt_message{dup = if QoS =:= ?QOS2 -> false; true -> true end}, State). + deliver(Msg#mqtt_message{dup = if QoS =:= ?QOS2 -> false; true -> true end}, State); + +redeliver({pubrel, PacketId}, #state{client_pid = Pid}) -> + Pid ! {redeliver, {?PUBREL, PacketId}}. deliver(Msg, #state{client_pid = Pid}) -> inc_stats(deliver_msg),