diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 1ee80fbbf..871e3c7f2 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -435,7 +435,7 @@ deliver({connack, ReasonCode, SP}, PState) -> send(?CONNACK_PACKET(ReasonCode, SP), PState); deliver({publish, PacketId, Msg}, PState = #pstate{is_bridge = IsBridge, mountpoint = MountPoint}) -> - _ = emqx_hooks:run('message.delivered', credentials(PState), Msg), + _ = emqx_hooks:run('message.delivered', [credentials(PState)], Msg), Msg1 = emqx_mountpoint:unmount(MountPoint, clean_retain(IsBridge, Msg)), send(emqx_packet:from_message(PacketId, Msg1), PState); diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 33661e2c3..220e3efd8 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -761,7 +761,7 @@ await(PacketId, Msg, State = #state{inflight = Inflight, acked(puback, PacketId, State = #state{client_id = ClientId, inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of - {value, {publish, Msg, _Ts}} -> + {value, {publish, {_, Msg}, _Ts}} -> emqx_hooks:run('message.acked', [#{client_id =>ClientId}], Msg), State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}; none -> @@ -771,7 +771,7 @@ acked(puback, PacketId, State = #state{client_id = ClientId, inflight = Infligh acked(pubrec, PacketId, State = #state{client_id = ClientId, inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of - {value, {publish, Msg, _Ts}} -> + {value, {publish, {_, Msg}, _Ts}} -> emqx_hooks:run('message.acked', [ClientId], Msg), State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)}; {value, {pubrel, PacketId, _Ts}} ->