Add hook message.acked
This commit is contained in:
parent
a93f8c6788
commit
a19777c2e6
|
@ -378,6 +378,7 @@ do_publish(PacketId, Msg = #message{timestamp = Ts},
|
||||||
puback(PacketId, Session = #session{inflight = Inflight}) ->
|
puback(PacketId, Session = #session{inflight = Inflight}) ->
|
||||||
case emqx_inflight:lookup(PacketId, Inflight) of
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
||||||
{value, {Msg, _Ts}} when is_record(Msg, message) ->
|
{value, {Msg, _Ts}} when is_record(Msg, message) ->
|
||||||
|
ok = emqx_hooks:run('message.acked', [Msg]),
|
||||||
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
|
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
|
||||||
dequeue(Session#session{inflight = Inflight1});
|
dequeue(Session#session{inflight = Inflight1});
|
||||||
{value, {_OtherPub, _Ts}} ->
|
{value, {_OtherPub, _Ts}} ->
|
||||||
|
@ -398,6 +399,7 @@ puback(PacketId, Session = #session{inflight = Inflight}) ->
|
||||||
pubrec(PacketId, Session = #session{inflight = Inflight}) ->
|
pubrec(PacketId, Session = #session{inflight = Inflight}) ->
|
||||||
case emqx_inflight:lookup(PacketId, Inflight) of
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
||||||
{value, {Msg, _Ts}} when is_record(Msg, message) ->
|
{value, {Msg, _Ts}} when is_record(Msg, message) ->
|
||||||
|
ok = emqx_hooks:run('message.acked', [Msg]),
|
||||||
Inflight1 = emqx_inflight:update(PacketId, {pubrel, os:timestamp()}, Inflight),
|
Inflight1 = emqx_inflight:update(PacketId, {pubrel, os:timestamp()}, Inflight),
|
||||||
{ok, Session#session{inflight = Inflight1}};
|
{ok, Session#session{inflight = Inflight1}};
|
||||||
{value, {pubrel, _Ts}} ->
|
{value, {pubrel, _Ts}} ->
|
||||||
|
|
Loading…
Reference in New Issue