From 6836a502ac61f25dd6c11e7b7f97d77bf51a459f Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 27 Sep 2022 13:24:54 +0800 Subject: [PATCH] fix(delayed): Improve time precision of delayed messages Use milliseconds internally in emqx_delayed to store the publish time, improving precision --- apps/emqx_modules/src/emqx_delayed.erl | 22 ++++++------ apps/emqx_modules/test/emqx_delayed_SUITE.erl | 34 +++++++++++++++++++ 2 files changed, 46 insertions(+), 10 deletions(-) diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index ac7f75158..76646bc64 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -91,6 +91,7 @@ -define(SERVER, ?MODULE). -define(MAX_INTERVAL, 4294967). -define(FORMAT_FUN, {?MODULE, format_delayed}). +-define(NOW, erlang:system_time(milli_seconds)). %%-------------------------------------------------------------------- %% Mnesia bootstrap @@ -118,12 +119,13 @@ on_message_publish( {PubAt, Delayed} = case binary_to_integer(Delay) of Interval when Interval < ?MAX_INTERVAL -> - {Interval + erlang:round(Ts / 1000), Interval}; + {Interval * 1000 + Ts, Interval}; Timestamp -> %% Check malicious timestamp? - case (Timestamp - erlang:round(Ts / 1000)) > ?MAX_INTERVAL of + Internal = Timestamp - erlang:round(Ts / 1000), + case Internal > ?MAX_INTERVAL of true -> error(invalid_delayed_timestamp); - false -> {Timestamp, Timestamp - erlang:round(Ts / 1000)} + false -> {Timestamp * 1000, Internal} end end, PubMsg = Msg#message{topic = Topic1}, @@ -189,14 +191,14 @@ format_delayed( WithPayload ) -> PublishTime = to_rfc3339(PublishTimeStamp div 1000), - ExpectTime = to_rfc3339(ExpectTimeStamp), - RemainingTime = ExpectTimeStamp - erlang:system_time(second), + ExpectTime = to_rfc3339(ExpectTimeStamp div 1000), + RemainingTime = ExpectTimeStamp - ?NOW, Result = #{ msgid => emqx_guid:to_hexstr(Id), node => node(), publish_at => PublishTime, delayed_interval => Delayed, - delayed_remaining => RemainingTime, + delayed_remaining => RemainingTime div 1000, expected_at => ExpectTime, topic => Topic, qos => Qos, @@ -296,7 +298,7 @@ handle_cast(Msg, State) -> %% Do Publish... handle_info({timeout, TRef, do_publish}, State = #{publish_timer := TRef}) -> - DeletedKeys = do_publish(mnesia:dirty_first(?TAB), erlang:system_time(seconds)), + DeletedKeys = do_publish(mnesia:dirty_first(?TAB), ?NOW), lists:foreach(fun(Key) -> mria:dirty_delete(?TAB, Key) end, DeletedKeys), {noreply, ensure_publish_timer(State#{publish_timer := undefined, publish_at := 0})}; handle_info(stats, State = #{stats_fun := StatsFun}) -> @@ -347,18 +349,18 @@ ensure_publish_timer(State) -> ensure_publish_timer('$end_of_table', State) -> State#{publish_timer := undefined, publish_at := 0}; ensure_publish_timer({Ts, _Id}, State = #{publish_timer := undefined}) -> - ensure_publish_timer(Ts, erlang:system_time(seconds), State); + ensure_publish_timer(Ts, ?NOW, State); ensure_publish_timer({Ts, _Id}, State = #{publish_timer := TRef, publish_at := PubAt}) when Ts < PubAt -> ok = emqx_misc:cancel_timer(TRef), - ensure_publish_timer(Ts, erlang:system_time(seconds), State); + ensure_publish_timer(Ts, ?NOW, State); ensure_publish_timer(_Key, State) -> State. ensure_publish_timer(Ts, Now, State) -> Interval = max(1, Ts - Now), - TRef = emqx_misc:start_timer(timer:seconds(Interval), do_publish), + TRef = emqx_misc:start_timer(Interval, do_publish), State#{publish_timer := TRef, publish_at := Now + Interval}. do_publish(Key, Now) -> diff --git a/apps/emqx_modules/test/emqx_delayed_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_SUITE.erl index d1af9a064..e5e3db98c 100644 --- a/apps/emqx_modules/test/emqx_delayed_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_SUITE.erl @@ -202,3 +202,37 @@ t_get_basic_usage_info(_Config) -> ), ?assertEqual(#{delayed_message_count => 4}, emqx_delayed:get_basic_usage_info()), ok. + +t_delayed_precision(_) -> + MaxSpan = 1250, + FutureDiff = subscribe_proc(), + DelayedMsg0 = emqx_message:make( + ?MODULE, 1, <<"$delayed/1/delayed/test">>, <<"delayed/test">> + ), + _ = on_message_publish(DelayedMsg0), + ?assert(FutureDiff() =< MaxSpan). + +subscribe_proc() -> + Self = self(), + Ref = erlang:make_ref(), + erlang:spawn(fun() -> + Topic = <<"delayed/+">>, + emqx_broker:subscribe(Topic), + Self ! + {Ref, + receive + {deliver, Topic, Msg} -> + erlang:system_time(milli_seconds) - Msg#message.timestamp + after 2000 -> + 2000 + end}, + emqx_broker:unsubscribe(Topic) + end), + fun() -> + receive + {Ref, Diff} -> + Diff + after 2000 -> + 2000 + end + end.