'client.acked' hook
This commit is contained in:
parent
4f20f9691a
commit
ed81eb5a9d
|
@ -158,6 +158,7 @@ cast(Msg) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec publish(Msg :: mqtt_message()) -> ok.
|
-spec publish(Msg :: mqtt_message()) -> ok.
|
||||||
publish(#mqtt_message{from = From} = Msg) ->
|
publish(#mqtt_message{from = From} = Msg) ->
|
||||||
|
|
||||||
trace(publish, From, Msg),
|
trace(publish, From, Msg),
|
||||||
|
|
||||||
Msg1 = #mqtt_message{topic = Topic} = emqttd_broker:foldl_hooks('client.publish', [], Msg),
|
Msg1 = #mqtt_message{topic = Topic} = emqttd_broker:foldl_hooks('client.publish', [], Msg),
|
||||||
|
|
|
@ -596,8 +596,15 @@ await(#mqtt_message{pktid = PktId}, Session = #session{awaiting_ack = Awaiting,
|
||||||
Awaiting1 = maps:put(PktId, {{Retries, Timeout}, TRef}, Awaiting),
|
Awaiting1 = maps:put(PktId, {{Retries, Timeout}, TRef}, Awaiting),
|
||||||
Session#session{awaiting_ack = Awaiting1}.
|
Session#session{awaiting_ack = Awaiting1}.
|
||||||
|
|
||||||
acked(PktId, Session = #session{inflight_queue = InflightQ,
|
acked(PktId, Session = #session{client_id = ClientId,
|
||||||
|
inflight_queue = InflightQ,
|
||||||
awaiting_ack = Awaiting}) ->
|
awaiting_ack = Awaiting}) ->
|
||||||
|
case lists:keyfind(PktId, 1, InflightQ) of
|
||||||
|
{_, Msg} ->
|
||||||
|
emqttd_broker:foldl_hooks('client.acked', [ClientId], Msg);
|
||||||
|
false ->
|
||||||
|
lager:error("Session(~s) cannot find acked message: ~p", [PktId])
|
||||||
|
end,
|
||||||
Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ),
|
Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ),
|
||||||
awaiting_ack = maps:remove(PktId, Awaiting)}.
|
awaiting_ack = maps:remove(PktId, Awaiting)}.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue