From 021d43755fa55a9840514c2ef2d040659683498a Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 30 Aug 2018 18:28:02 +0800 Subject: [PATCH] Add update_expiry/1 function --- src/emqx_message.erl | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/src/emqx_message.erl b/src/emqx_message.erl index d700c2aee..9ff8e7511 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -22,7 +22,7 @@ -export([get_flag/2, get_flag/3, set_flag/2, set_flag/3, unset_flag/2]). -export([set_headers/2]). -export([get_header/2, get_header/3, set_header/3]). --export([is_expired/1, check_expiry/1, check_expiry/2]). +-export([is_expired/1, check_expiry/1, check_expiry/2, update_expiry/1]). -export([format/1]). -type(flag() :: atom()). @@ -96,7 +96,7 @@ set_header(Hdr, Val, Msg = #message{headers = Headers}) -> -spec(is_expired(emqx_types:message()) -> boolean()). is_expired(#message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) -> - elapsed(CreatedAt) > Interval; + elapsed(CreatedAt) > timer:seconds(Interval); is_expired(_Msg) -> false. @@ -108,16 +108,23 @@ check_expiry(_Msg) -> -spec(check_expiry(emqx_types:message(), erlang:timestamp()) -> {ok, pos_integer()} | expired | false). check_expiry(#message{headers = #{'Message-Expiry-Interval' := Interval}}, Since) -> - case Interval - elapsed(Since) of - I when I > 0 -> {ok, I}; + case Interval - (elapsed(Since) div 1000) of + Timeout when Timeout > 0 -> {ok, Timeout}; _ -> expired end; check_expiry(_Msg, _Since) -> false. +update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) -> + case elapsed(CreatedAt) of + Elapsed when Elapsed > 0 -> + set_header('Message-Expiry-Interval', max(1, Interval - (Elapsed div 1000)), Msg); + _ -> Msg + end. + +%% MilliSeconds elapsed(Since) -> - Secs = timer:now_diff(os:timestamp(), Since) div 1000, - if Secs < 0 -> 0; true -> Secs end. + max(0, timer:now_diff(os:timestamp(), Since) div 1000). format(#message{id = Id, qos = QoS, topic = Topic, from = From, flags = Flags, headers = Headers}) -> io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~s, Flags=~s, Headers=~s)",