commit
bfa9fc675c
|
@ -453,6 +453,8 @@ handle_cast({pubrel, PacketId}, State = #state{awaiting_rel = AwaitingRel}) ->
|
||||||
{noreply,
|
{noreply,
|
||||||
case maps:take(PacketId, AwaitingRel) of
|
case maps:take(PacketId, AwaitingRel) of
|
||||||
{Msg, AwaitingRel1} ->
|
{Msg, AwaitingRel1} ->
|
||||||
|
%% Implement Qos2 by method A [MQTT 4.33]
|
||||||
|
%% Dispatch to subscriber when received PUBREL
|
||||||
spawn(emqttd_server, publish, [Msg]), %%:)
|
spawn(emqttd_server, publish, [Msg]), %%:)
|
||||||
gc(State#state{awaiting_rel = AwaitingRel1});
|
gc(State#state{awaiting_rel = AwaitingRel1});
|
||||||
error ->
|
error ->
|
||||||
|
@ -628,8 +630,10 @@ retry_delivery(Force, [{Type, Msg, Ts} | Msgs], Now,
|
||||||
redeliver(Msg, State),
|
redeliver(Msg, State),
|
||||||
Inflight1 = Inflight:update(PacketId, {publish, Msg, Now}),
|
Inflight1 = Inflight:update(PacketId, {publish, Msg, Now}),
|
||||||
retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1});
|
retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1});
|
||||||
{pubrel, PacketId} -> %% remove 'pubrel' directly?
|
{pubrel, PacketId} ->
|
||||||
retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight:delete(PacketId)})
|
redeliver({pubrel, PacketId}, State),
|
||||||
|
Inflight1 = Inflight:update(PacketId, {pubrel, PacketId, Now}),
|
||||||
|
retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1})
|
||||||
end;
|
end;
|
||||||
true ->
|
true ->
|
||||||
State#state{retry_timer = start_timer(Interval - Diff, retry_delivery)}
|
State#state{retry_timer = start_timer(Interval - Diff, retry_delivery)}
|
||||||
|
@ -649,11 +653,13 @@ expire_awaiting_rel(State = #state{awaiting_rel = AwaitingRel}) ->
|
||||||
expire_awaiting_rel([], _Now, State) ->
|
expire_awaiting_rel([], _Now, State) ->
|
||||||
State#state{await_rel_timer = undefined};
|
State#state{await_rel_timer = undefined};
|
||||||
|
|
||||||
expire_awaiting_rel([{PacketId, #mqtt_message{timestamp = TS}} | Msgs],
|
expire_awaiting_rel([{PacketId, Msg = #mqtt_message{timestamp = TS}} | Msgs],
|
||||||
Now, State = #state{awaiting_rel = AwaitingRel,
|
Now, State = #state{awaiting_rel = AwaitingRel,
|
||||||
await_rel_timeout = Timeout}) ->
|
await_rel_timeout = Timeout}) ->
|
||||||
case (timer:now_diff(Now, TS) div 1000) of
|
case (timer:now_diff(Now, TS) div 1000) of
|
||||||
Diff when Diff >= Timeout ->
|
Diff when Diff >= Timeout ->
|
||||||
|
?LOG(warning, "Dropped Qos2 Message for await_rel_timeout: ~p", [Msg], State),
|
||||||
|
emqttd_metrics:inc('messages/qos2/dropped'),
|
||||||
expire_awaiting_rel(Msgs, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)});
|
expire_awaiting_rel(Msgs, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)});
|
||||||
Diff ->
|
Diff ->
|
||||||
State#state{await_rel_timer = start_timer(Timeout - Diff, check_awaiting_rel)}
|
State#state{await_rel_timer = start_timer(Timeout - Diff, check_awaiting_rel)}
|
||||||
|
@ -714,7 +720,10 @@ enqueue_msg(Msg, State = #state{mqueue = Q}) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
redeliver(Msg = #mqtt_message{qos = QoS}, State) ->
|
redeliver(Msg = #mqtt_message{qos = QoS}, State) ->
|
||||||
deliver(Msg#mqtt_message{dup = if QoS =:= ?QOS2 -> false; true -> true end}, State).
|
deliver(Msg#mqtt_message{dup = if QoS =:= ?QOS2 -> false; true -> true end}, State);
|
||||||
|
|
||||||
|
redeliver({pubrel, PacketId}, #state{client_pid = Pid}) ->
|
||||||
|
Pid ! {redeliver, {?PUBREL, PacketId}}.
|
||||||
|
|
||||||
deliver(Msg, #state{client_pid = Pid}) ->
|
deliver(Msg, #state{client_pid = Pid}) ->
|
||||||
inc_stats(deliver_msg),
|
inc_stats(deliver_msg),
|
||||||
|
|
Loading…
Reference in New Issue