From bcd15e93889b702aa4c2a1e3a97d409bbae4d7f3 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Thu, 26 Aug 2021 15:11:41 +0800 Subject: [PATCH] fix: delayed message api doc (#5569) * fix: delayed message api doc & add delayed message internal & remaning params --- apps/emqx_modules/src/emqx_delayed.erl | 24 +++++++++++++------ apps/emqx_modules/src/emqx_delayed_api.erl | 9 ++++--- apps/emqx_modules/test/emqx_delayed_SUITE.erl | 2 +- 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index f7de5d69d..b773f04ac 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -50,7 +50,7 @@ , delete_delayed_message/1 ]). --record(delayed_message, {key, msg}). +-record(delayed_message, {key, delayed, msg}). -define(TAB, ?MODULE). -define(SERVER, ?MODULE). @@ -78,19 +78,19 @@ on_message_publish(Msg = #message{ timestamp = Ts }) -> [Delay, Topic1] = binary:split(Topic, <<"/">>), - PubAt = case binary_to_integer(Delay) of + {PubAt, Delayed} = case binary_to_integer(Delay) of Interval when Interval < ?MAX_INTERVAL -> - Interval + erlang:round(Ts / 1000); + {Interval + erlang:round(Ts / 1000), Interval}; Timestamp -> %% Check malicious timestamp? case (Timestamp - erlang:round(Ts / 1000)) > ?MAX_INTERVAL of true -> error(invalid_delayed_timestamp); - false -> Timestamp + false -> {Timestamp, Timestamp - erlang:round(Ts / 1000)} end end, PubMsg = Msg#message{topic = Topic1}, Headers = PubMsg#message.headers, - case store(#delayed_message{key = {PubAt, Id}, msg = PubMsg}) of + case store(#delayed_message{key = {PubAt, Id}, delayed = Delayed, msg = PubMsg}) of ok -> ok; {error, Error} -> ?LOG(error, "Store delayed message fail: ~p", [Error]) @@ -128,15 +128,22 @@ list(Params) -> format_delayed(Delayed) -> format_delayed(Delayed, false). -format_delayed(#delayed_message{key = {TimeStamp, Id}, +format_delayed(#delayed_message{key = {ExpectTimeStamp, Id}, delayed = Delayed, msg = #message{topic = Topic, from = From, headers = #{username := Username}, qos = Qos, + timestamp = PublishTimeStamp, payload = Payload}}, WithPayload) -> + PublishTime = to_rfc3339(PublishTimeStamp div 1000), + ExpectTime = to_rfc3339(ExpectTimeStamp), + RemainingTime = ExpectTimeStamp - erlang:system_time(second), Result = #{ id => emqx_guid:to_hexstr(Id), - publish_time => list_to_binary(calendar:system_time_to_rfc3339(TimeStamp, [{unit, second}])), + publish_at => PublishTime, + delayed_interval => Delayed, + delayed_remaining => RemainingTime, + expected_at => ExpectTime, topic => Topic, qos => Qos, from_clientid => From, @@ -149,6 +156,9 @@ format_delayed(#delayed_message{key = {TimeStamp, Id}, Result end. +to_rfc3339(Timestamp) -> + list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, second}])). + get_delayed_message(Id0) -> Id = emqx_guid:from_hexstr(Id0), Ms = [{{delayed_message,{'_',Id},'_'},[],['$_']}], diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index e99242206..24f4822ab 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -59,12 +59,15 @@ properties() -> [?PAYLOAD_TOO_LARGE, ?MAX_PAYLOAD_LENGTH]), properties([ {id, integer, <<"Message Id (MQTT message id hash)">>}, - {publish_time, string, <<"publish time, rfc 3339">>}, + {publish_at, string, <<"Client publish message time, rfc 3339">>}, + {delayed_interval, integer, <<"Delayed interval, second">>}, + {delayed_remaining, integer, <<"Delayed remaining, second">>}, + {expected_at, string, <<"Expect publish time, rfc 3339">>}, {topic, string, <<"Topic">>}, {qos, string, <<"QoS">>}, {payload, string, iolist_to_binary(PayloadDesc)}, - {form_clientid, string, <<"Form ClientId">>}, - {form_username, string, <<"Form Username">>} + {from_clientid, string, <<"From ClientId">>}, + {from_username, string, <<"From Username">>} ]). parameters() -> diff --git a/apps/emqx_modules/test/emqx_delayed_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_SUITE.erl index a9af83b1d..3e386ea1d 100644 --- a/apps/emqx_modules/test/emqx_delayed_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_SUITE.erl @@ -21,7 +21,7 @@ -compile(export_all). -compile(nowarn_export_all). --record(delayed_message, {key, msg}). +-record(delayed_message, {key, delayed, msg}). -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl").