fix(delayed): Improve time precision of delayed messages
Use milliseconds internally in emqx_delayed to store the publish time, improving precision
This commit is contained in:
parent
b2d69b85dc
commit
6836a502ac
|
@ -91,6 +91,7 @@
|
|||
-define(SERVER, ?MODULE).
|
||||
-define(MAX_INTERVAL, 4294967).
|
||||
-define(FORMAT_FUN, {?MODULE, format_delayed}).
|
||||
-define(NOW, erlang:system_time(milli_seconds)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Mnesia bootstrap
|
||||
|
@ -118,12 +119,13 @@ on_message_publish(
|
|||
{PubAt, Delayed} =
|
||||
case binary_to_integer(Delay) of
|
||||
Interval when Interval < ?MAX_INTERVAL ->
|
||||
{Interval + erlang:round(Ts / 1000), Interval};
|
||||
{Interval * 1000 + Ts, Interval};
|
||||
Timestamp ->
|
||||
%% Check malicious timestamp?
|
||||
case (Timestamp - erlang:round(Ts / 1000)) > ?MAX_INTERVAL of
|
||||
Internal = Timestamp - erlang:round(Ts / 1000),
|
||||
case Internal > ?MAX_INTERVAL of
|
||||
true -> error(invalid_delayed_timestamp);
|
||||
false -> {Timestamp, Timestamp - erlang:round(Ts / 1000)}
|
||||
false -> {Timestamp * 1000, Internal}
|
||||
end
|
||||
end,
|
||||
PubMsg = Msg#message{topic = Topic1},
|
||||
|
@ -189,14 +191,14 @@ format_delayed(
|
|||
WithPayload
|
||||
) ->
|
||||
PublishTime = to_rfc3339(PublishTimeStamp div 1000),
|
||||
ExpectTime = to_rfc3339(ExpectTimeStamp),
|
||||
RemainingTime = ExpectTimeStamp - erlang:system_time(second),
|
||||
ExpectTime = to_rfc3339(ExpectTimeStamp div 1000),
|
||||
RemainingTime = ExpectTimeStamp - ?NOW,
|
||||
Result = #{
|
||||
msgid => emqx_guid:to_hexstr(Id),
|
||||
node => node(),
|
||||
publish_at => PublishTime,
|
||||
delayed_interval => Delayed,
|
||||
delayed_remaining => RemainingTime,
|
||||
delayed_remaining => RemainingTime div 1000,
|
||||
expected_at => ExpectTime,
|
||||
topic => Topic,
|
||||
qos => Qos,
|
||||
|
@ -296,7 +298,7 @@ handle_cast(Msg, State) ->
|
|||
|
||||
%% Do Publish...
|
||||
handle_info({timeout, TRef, do_publish}, State = #{publish_timer := TRef}) ->
|
||||
DeletedKeys = do_publish(mnesia:dirty_first(?TAB), erlang:system_time(seconds)),
|
||||
DeletedKeys = do_publish(mnesia:dirty_first(?TAB), ?NOW),
|
||||
lists:foreach(fun(Key) -> mria:dirty_delete(?TAB, Key) end, DeletedKeys),
|
||||
{noreply, ensure_publish_timer(State#{publish_timer := undefined, publish_at := 0})};
|
||||
handle_info(stats, State = #{stats_fun := StatsFun}) ->
|
||||
|
@ -347,18 +349,18 @@ ensure_publish_timer(State) ->
|
|||
ensure_publish_timer('$end_of_table', State) ->
|
||||
State#{publish_timer := undefined, publish_at := 0};
|
||||
ensure_publish_timer({Ts, _Id}, State = #{publish_timer := undefined}) ->
|
||||
ensure_publish_timer(Ts, erlang:system_time(seconds), State);
|
||||
ensure_publish_timer(Ts, ?NOW, State);
|
||||
ensure_publish_timer({Ts, _Id}, State = #{publish_timer := TRef, publish_at := PubAt}) when
|
||||
Ts < PubAt
|
||||
->
|
||||
ok = emqx_misc:cancel_timer(TRef),
|
||||
ensure_publish_timer(Ts, erlang:system_time(seconds), State);
|
||||
ensure_publish_timer(Ts, ?NOW, State);
|
||||
ensure_publish_timer(_Key, State) ->
|
||||
State.
|
||||
|
||||
ensure_publish_timer(Ts, Now, State) ->
|
||||
Interval = max(1, Ts - Now),
|
||||
TRef = emqx_misc:start_timer(timer:seconds(Interval), do_publish),
|
||||
TRef = emqx_misc:start_timer(Interval, do_publish),
|
||||
State#{publish_timer := TRef, publish_at := Now + Interval}.
|
||||
|
||||
do_publish(Key, Now) ->
|
||||
|
|
|
@ -202,3 +202,37 @@ t_get_basic_usage_info(_Config) ->
|
|||
),
|
||||
?assertEqual(#{delayed_message_count => 4}, emqx_delayed:get_basic_usage_info()),
|
||||
ok.
|
||||
|
||||
t_delayed_precision(_) ->
|
||||
MaxSpan = 1250,
|
||||
FutureDiff = subscribe_proc(),
|
||||
DelayedMsg0 = emqx_message:make(
|
||||
?MODULE, 1, <<"$delayed/1/delayed/test">>, <<"delayed/test">>
|
||||
),
|
||||
_ = on_message_publish(DelayedMsg0),
|
||||
?assert(FutureDiff() =< MaxSpan).
|
||||
|
||||
subscribe_proc() ->
|
||||
Self = self(),
|
||||
Ref = erlang:make_ref(),
|
||||
erlang:spawn(fun() ->
|
||||
Topic = <<"delayed/+">>,
|
||||
emqx_broker:subscribe(Topic),
|
||||
Self !
|
||||
{Ref,
|
||||
receive
|
||||
{deliver, Topic, Msg} ->
|
||||
erlang:system_time(milli_seconds) - Msg#message.timestamp
|
||||
after 2000 ->
|
||||
2000
|
||||
end},
|
||||
emqx_broker:unsubscribe(Topic)
|
||||
end),
|
||||
fun() ->
|
||||
receive
|
||||
{Ref, Diff} ->
|
||||
Diff
|
||||
after 2000 ->
|
||||
2000
|
||||
end
|
||||
end.
|
||||
|
|
Loading…
Reference in New Issue