From ed81eb5a9d2b916ef5a4775f1098a399626b6719 Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 6 Jul 2015 15:26:30 +0800 Subject: [PATCH] 'client.acked' hook --- src/emqttd_pubsub.erl | 1 + src/emqttd_session.erl | 9 ++++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 83f321fcd..a81ce1af0 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -158,6 +158,7 @@ cast(Msg) -> %%------------------------------------------------------------------------------ -spec publish(Msg :: mqtt_message()) -> ok. publish(#mqtt_message{from = From} = Msg) -> + trace(publish, From, Msg), Msg1 = #mqtt_message{topic = Topic} = emqttd_broker:foldl_hooks('client.publish', [], Msg), diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 6b32b844b..05ceab617 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -596,8 +596,15 @@ await(#mqtt_message{pktid = PktId}, Session = #session{awaiting_ack = Awaiting, Awaiting1 = maps:put(PktId, {{Retries, Timeout}, TRef}, Awaiting), Session#session{awaiting_ack = Awaiting1}. -acked(PktId, Session = #session{inflight_queue = InflightQ, +acked(PktId, Session = #session{client_id = ClientId, + inflight_queue = InflightQ, awaiting_ack = Awaiting}) -> + case lists:keyfind(PktId, 1, InflightQ) of + {_, Msg} -> + emqttd_broker:foldl_hooks('client.acked', [ClientId], Msg); + false -> + lager:error("Session(~s) cannot find acked message: ~p", [PktId]) + end, Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ), awaiting_ack = maps:remove(PktId, Awaiting)}.