Add update_expiry/1 function
This commit is contained in:
parent
7b58718280
commit
021d43755f
|
@ -22,7 +22,7 @@
|
||||||
-export([get_flag/2, get_flag/3, set_flag/2, set_flag/3, unset_flag/2]).
|
-export([get_flag/2, get_flag/3, set_flag/2, set_flag/3, unset_flag/2]).
|
||||||
-export([set_headers/2]).
|
-export([set_headers/2]).
|
||||||
-export([get_header/2, get_header/3, set_header/3]).
|
-export([get_header/2, get_header/3, set_header/3]).
|
||||||
-export([is_expired/1, check_expiry/1, check_expiry/2]).
|
-export([is_expired/1, check_expiry/1, check_expiry/2, update_expiry/1]).
|
||||||
-export([format/1]).
|
-export([format/1]).
|
||||||
|
|
||||||
-type(flag() :: atom()).
|
-type(flag() :: atom()).
|
||||||
|
@ -96,7 +96,7 @@ set_header(Hdr, Val, Msg = #message{headers = Headers}) ->
|
||||||
|
|
||||||
-spec(is_expired(emqx_types:message()) -> boolean()).
|
-spec(is_expired(emqx_types:message()) -> boolean()).
|
||||||
is_expired(#message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) ->
|
is_expired(#message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) ->
|
||||||
elapsed(CreatedAt) > Interval;
|
elapsed(CreatedAt) > timer:seconds(Interval);
|
||||||
is_expired(_Msg) ->
|
is_expired(_Msg) ->
|
||||||
false.
|
false.
|
||||||
|
|
||||||
|
@ -108,16 +108,23 @@ check_expiry(_Msg) ->
|
||||||
|
|
||||||
-spec(check_expiry(emqx_types:message(), erlang:timestamp()) -> {ok, pos_integer()} | expired | false).
|
-spec(check_expiry(emqx_types:message(), erlang:timestamp()) -> {ok, pos_integer()} | expired | false).
|
||||||
check_expiry(#message{headers = #{'Message-Expiry-Interval' := Interval}}, Since) ->
|
check_expiry(#message{headers = #{'Message-Expiry-Interval' := Interval}}, Since) ->
|
||||||
case Interval - elapsed(Since) of
|
case Interval - (elapsed(Since) div 1000) of
|
||||||
I when I > 0 -> {ok, I};
|
Timeout when Timeout > 0 -> {ok, Timeout};
|
||||||
_ -> expired
|
_ -> expired
|
||||||
end;
|
end;
|
||||||
check_expiry(_Msg, _Since) ->
|
check_expiry(_Msg, _Since) ->
|
||||||
false.
|
false.
|
||||||
|
|
||||||
|
update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) ->
|
||||||
|
case elapsed(CreatedAt) of
|
||||||
|
Elapsed when Elapsed > 0 ->
|
||||||
|
set_header('Message-Expiry-Interval', max(1, Interval - (Elapsed div 1000)), Msg);
|
||||||
|
_ -> Msg
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% MilliSeconds
|
||||||
elapsed(Since) ->
|
elapsed(Since) ->
|
||||||
Secs = timer:now_diff(os:timestamp(), Since) div 1000,
|
max(0, timer:now_diff(os:timestamp(), Since) div 1000).
|
||||||
if Secs < 0 -> 0; true -> Secs end.
|
|
||||||
|
|
||||||
format(#message{id = Id, qos = QoS, topic = Topic, from = From, flags = Flags, headers = Headers}) ->
|
format(#message{id = Id, qos = QoS, topic = Topic, from = From, flags = Flags, headers = Headers}) ->
|
||||||
io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~s, Flags=~s, Headers=~s)",
|
io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~s, Flags=~s, Headers=~s)",
|
||||||
|
|
Loading…
Reference in New Issue