chore(emqx_retainer): refresh the timestamp when dispatch retained message (#6155)
1. refresh the timestamp when dispatch retained message 2. fix some elvis style error
This commit is contained in:
parent
8bf6668e4c
commit
ad4d3fc652
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_retainer,
|
{application, emqx_retainer,
|
||||||
[{description, "EMQ X Retainer"},
|
[{description, "EMQ X Retainer"},
|
||||||
{vsn, "4.4.0"}, % strict semver, bump manually!
|
{vsn, "4.4.1"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_retainer_sup]},
|
{registered, [emqx_retainer_sup]},
|
||||||
{applications, [kernel,stdlib]},
|
{applications, [kernel,stdlib]},
|
||||||
|
|
|
@ -78,7 +78,8 @@ dispatch(Pid, Topic) ->
|
||||||
false -> read_messages(Topic);
|
false -> read_messages(Topic);
|
||||||
true -> match_messages(Topic)
|
true -> match_messages(Topic)
|
||||||
end,
|
end,
|
||||||
[Pid ! {deliver, Topic, Msg} || Msg <- sort_retained(Msgs)].
|
Now = erlang:system_time(millisecond),
|
||||||
|
[Pid ! {deliver, Topic, refresh_timestamp_expiry(Msg, Now)} || Msg <- sort_retained(Msgs)].
|
||||||
|
|
||||||
%% RETAIN flag set to 1 and payload containing zero bytes
|
%% RETAIN flag set to 1 and payload containing zero bytes
|
||||||
on_message_publish(Msg = #message{flags = #{retain := true},
|
on_message_publish(Msg = #message{flags = #{retain := true},
|
||||||
|
@ -146,7 +147,7 @@ init([Env]) ->
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
StatsFun = emqx_stats:statsfun('retained.count', 'retained.max'),
|
StatsFun = emqx_stats:statsfun('retained.count', 'retained.max'),
|
||||||
{ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats),
|
StatsTimer = erlang:send_after(timer:seconds(1), self(), stats),
|
||||||
State = #state{stats_fun = StatsFun, stats_timer = StatsTimer},
|
State = #state{stats_fun = StatsFun, stats_timer = StatsTimer},
|
||||||
{ok, start_expire_timer(proplists:get_value(expiry_interval, Env, 0), State)}.
|
{ok, start_expire_timer(proplists:get_value(expiry_interval, Env, 0), State)}.
|
||||||
|
|
||||||
|
@ -155,7 +156,7 @@ start_expire_timer(0, State) ->
|
||||||
start_expire_timer(undefined, State) ->
|
start_expire_timer(undefined, State) ->
|
||||||
State;
|
State;
|
||||||
start_expire_timer(Ms, State) ->
|
start_expire_timer(Ms, State) ->
|
||||||
{ok, Timer} = timer:send_interval(Ms, expire),
|
Timer = erlang:send_after(Ms, self(), stats),
|
||||||
State#state{expiry_timer = Timer}.
|
State#state{expiry_timer = Timer}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
|
@ -194,7 +195,7 @@ sort_retained([]) -> [];
|
||||||
sort_retained([Msg]) -> [Msg];
|
sort_retained([Msg]) -> [Msg];
|
||||||
sort_retained(Msgs) ->
|
sort_retained(Msgs) ->
|
||||||
lists:sort(fun(#message{timestamp = Ts1}, #message{timestamp = Ts2}) ->
|
lists:sort(fun(#message{timestamp = Ts1}, #message{timestamp = Ts2}) ->
|
||||||
Ts1 =< Ts2
|
Ts1 =< Ts2
|
||||||
end, Msgs).
|
end, Msgs).
|
||||||
|
|
||||||
store_retained(Msg = #message{topic = Topic, payload = Payload}, Env) ->
|
store_retained(Msg = #message{topic = Topic, payload = Payload}, Env) ->
|
||||||
|
@ -209,11 +210,13 @@ store_retained(Msg = #message{topic = Topic, payload = Payload}, Env) ->
|
||||||
fun() ->
|
fun() ->
|
||||||
case mnesia:read(?TAB, Topic) of
|
case mnesia:read(?TAB, Topic) of
|
||||||
[_] ->
|
[_] ->
|
||||||
mnesia:write(?TAB, #retained{topic = topic2tokens(Topic),
|
mnesia:write(?TAB,
|
||||||
msg = Msg,
|
#retained{topic = topic2tokens(Topic),
|
||||||
expiry_time = get_expiry_time(Msg, Env)}, write);
|
msg = Msg,
|
||||||
|
expiry_time = get_expiry_time(Msg, Env)}, write);
|
||||||
[] ->
|
[] ->
|
||||||
?LOG(error, "Cannot retain message(topic=~s) for table is full!", [Topic])
|
?LOG(error,
|
||||||
|
"Cannot retain message(topic=~s) for table is full!", [Topic])
|
||||||
end
|
end
|
||||||
end),
|
end),
|
||||||
ok;
|
ok;
|
||||||
|
@ -234,7 +237,8 @@ is_too_big(Size, Env) ->
|
||||||
|
|
||||||
get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}, _Env) ->
|
get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}, _Env) ->
|
||||||
0;
|
0;
|
||||||
get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}}, timestamp = Ts}, _Env) ->
|
get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}},
|
||||||
|
timestamp = Ts}, _Env) ->
|
||||||
Ts + Interval * 1000;
|
Ts + Interval * 1000;
|
||||||
get_expiry_time(#message{timestamp = Ts}, Env) ->
|
get_expiry_time(#message{timestamp = Ts}, Env) ->
|
||||||
case proplists:get_value(expiry_interval, Env, 0) of
|
case proplists:get_value(expiry_interval, Env, 0) of
|
||||||
|
@ -303,3 +307,18 @@ condition(Ws) ->
|
||||||
false -> Ws1;
|
false -> Ws1;
|
||||||
_ -> (Ws1 -- ['#']) ++ '_'
|
_ -> (Ws1 -- ['#']) ++ '_'
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec(refresh_timestamp_expiry(emqx_types:message(), pos_integer()) -> emqx_types:message()).
|
||||||
|
refresh_timestamp_expiry(Msg = #message{headers =
|
||||||
|
#{properties :=
|
||||||
|
#{'Message-Expiry-Interval' := Interval} = Props},
|
||||||
|
timestamp = CreatedAt},
|
||||||
|
Now) ->
|
||||||
|
Elapsed = max(0, Now - CreatedAt),
|
||||||
|
Interval1 = max(1, Interval - (Elapsed div 1000)),
|
||||||
|
emqx_message:set_header(properties,
|
||||||
|
Props#{'Message-Expiry-Interval' => Interval1},
|
||||||
|
Msg#message{timestamp = Now});
|
||||||
|
|
||||||
|
refresh_timestamp_expiry(Msg, Now) ->
|
||||||
|
Msg#message{timestamp = Now}.
|
||||||
|
|
Loading…
Reference in New Issue