From 2351b41f119835d7e9f4d1e07a47375a48367a50 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 30 Aug 2018 11:40:53 +0800 Subject: [PATCH] Add is_expired/1, check_expiry/1, check_expiry/2 --- src/emqx_message.erl | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/emqx_message.erl b/src/emqx_message.erl index 8b49d44d6..d700c2aee 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -22,6 +22,7 @@ -export([get_flag/2, get_flag/3, set_flag/2, set_flag/3, unset_flag/2]). -export([set_headers/2]). -export([get_header/2, get_header/3, set_header/3]). +-export([is_expired/1, check_expiry/1, check_expiry/2]). -export([format/1]). -type(flag() :: atom()). @@ -93,6 +94,31 @@ set_header(Hdr, Val, Msg = #message{headers = undefined}) -> set_header(Hdr, Val, Msg = #message{headers = Headers}) -> Msg#message{headers = maps:put(Hdr, Val, Headers)}. +-spec(is_expired(emqx_types:message()) -> boolean()). +is_expired(#message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) -> + elapsed(CreatedAt) > Interval; +is_expired(_Msg) -> + false. + +-spec(check_expiry(emqx_types:message()) -> {ok, pos_integer()} | expired | false). +check_expiry(Msg = #message{timestamp = CreatedAt}) -> + check_expiry(Msg, CreatedAt); +check_expiry(_Msg) -> + false. + +-spec(check_expiry(emqx_types:message(), erlang:timestamp()) -> {ok, pos_integer()} | expired | false). +check_expiry(#message{headers = #{'Message-Expiry-Interval' := Interval}}, Since) -> + case Interval - elapsed(Since) of + I when I > 0 -> {ok, I}; + _ -> expired + end; +check_expiry(_Msg, _Since) -> + false. + +elapsed(Since) -> + Secs = timer:now_diff(os:timestamp(), Since) div 1000, + if Secs < 0 -> 0; true -> Secs end. + format(#message{id = Id, qos = QoS, topic = Topic, from = From, flags = Flags, headers = Headers}) -> io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~s, Flags=~s, Headers=~s)", [Id, QoS, Topic, From, format(flags, Flags), format(headers, Headers)]).