diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index a423fb9b7..4ef423b78 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -1,6 +1,6 @@ {application, emqx_retainer, [{description, "EMQ X Retainer"}, - {vsn, "4.4.0"}, % strict semver, bump manually! + {vsn, "4.4.1"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel,stdlib]}, diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 340e6929d..fcc5652cc 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -78,7 +78,8 @@ dispatch(Pid, Topic) -> false -> read_messages(Topic); true -> match_messages(Topic) end, - [Pid ! {deliver, Topic, Msg} || Msg <- sort_retained(Msgs)]. + Now = erlang:system_time(millisecond), + [Pid ! {deliver, Topic, refresh_timestamp_expiry(Msg, Now)} || Msg <- sort_retained(Msgs)]. %% RETAIN flag set to 1 and payload containing zero bytes on_message_publish(Msg = #message{flags = #{retain := true}, @@ -146,7 +147,7 @@ init([Env]) -> ok end, StatsFun = emqx_stats:statsfun('retained.count', 'retained.max'), - {ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats), + StatsTimer = erlang:send_after(timer:seconds(1), self(), stats), State = #state{stats_fun = StatsFun, stats_timer = StatsTimer}, {ok, start_expire_timer(proplists:get_value(expiry_interval, Env, 0), State)}. @@ -155,7 +156,7 @@ start_expire_timer(0, State) -> start_expire_timer(undefined, State) -> State; start_expire_timer(Ms, State) -> - {ok, Timer} = timer:send_interval(Ms, expire), + Timer = erlang:send_after(Ms, self(), stats), State#state{expiry_timer = Timer}. handle_call(Req, _From, State) -> @@ -194,7 +195,7 @@ sort_retained([]) -> []; sort_retained([Msg]) -> [Msg]; sort_retained(Msgs) -> lists:sort(fun(#message{timestamp = Ts1}, #message{timestamp = Ts2}) -> - Ts1 =< Ts2 + Ts1 =< Ts2 end, Msgs). store_retained(Msg = #message{topic = Topic, payload = Payload}, Env) -> @@ -209,11 +210,13 @@ store_retained(Msg = #message{topic = Topic, payload = Payload}, Env) -> fun() -> case mnesia:read(?TAB, Topic) of [_] -> - mnesia:write(?TAB, #retained{topic = topic2tokens(Topic), - msg = Msg, - expiry_time = get_expiry_time(Msg, Env)}, write); + mnesia:write(?TAB, + #retained{topic = topic2tokens(Topic), + msg = Msg, + expiry_time = get_expiry_time(Msg, Env)}, write); [] -> - ?LOG(error, "Cannot retain message(topic=~s) for table is full!", [Topic]) + ?LOG(error, + "Cannot retain message(topic=~s) for table is full!", [Topic]) end end), ok; @@ -234,7 +237,8 @@ is_too_big(Size, Env) -> get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}, _Env) -> 0; -get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}}, timestamp = Ts}, _Env) -> +get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}}, + timestamp = Ts}, _Env) -> Ts + Interval * 1000; get_expiry_time(#message{timestamp = Ts}, Env) -> case proplists:get_value(expiry_interval, Env, 0) of @@ -303,3 +307,18 @@ condition(Ws) -> false -> Ws1; _ -> (Ws1 -- ['#']) ++ '_' end. + +-spec(refresh_timestamp_expiry(emqx_types:message(), pos_integer()) -> emqx_types:message()). +refresh_timestamp_expiry(Msg = #message{headers = + #{properties := + #{'Message-Expiry-Interval' := Interval} = Props}, + timestamp = CreatedAt}, + Now) -> + Elapsed = max(0, Now - CreatedAt), + Interval1 = max(1, Interval - (Elapsed div 1000)), + emqx_message:set_header(properties, + Props#{'Message-Expiry-Interval' => Interval1}, + Msg#message{timestamp = Now}); + +refresh_timestamp_expiry(Msg, Now) -> + Msg#message{timestamp = Now}.