diff --git a/src/emqx_message.erl b/src/emqx_message.erl index decbb0c18..3050ae517 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -67,10 +67,25 @@ -export([ to_packet/2 , to_map/1 , to_list/1 + , from_map/1 ]). +-export_type([message_map/0]). + +-type(message_map() :: #{id := binary(), + qos := 0 | 1 | 2, + from := atom() | binary(), + flags := emqx_types:flags(), + headers := emqx_types:headers(), + topic := emqx_types:topic(), + payload := emqx_types:payload(), + timestamp := integer()} + ). + -export([format/1]). +-elvis([{elvis_style, god_modules, disable}]). + -spec(make(emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()). make(Topic, Payload) -> make(undefined, Topic, Payload). @@ -228,8 +243,9 @@ is_expired(#message{headers = #{properties := #{'Message-Expiry-Interval' := Int is_expired(_Msg) -> false. -spec(update_expiry(emqx_types:message()) -> emqx_types:message()). -update_expiry(Msg = #message{headers = #{properties := Props = #{'Message-Expiry-Interval' := Interval}}, +update_expiry(Msg = #message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}}, timestamp = CreatedAt}) -> + Props = maps:get(properties, Msg#message.headers), case elapsed(CreatedAt) of Elapsed when Elapsed > 0 -> Interval1 = max(1, Interval - (Elapsed div 1000)), @@ -250,7 +266,8 @@ to_packet(PacketId, Msg = #message{qos = QoS, headers = Headers, }, variable = #mqtt_packet_publish{topic_name = Topic, packet_id = PacketId, - properties = filter_pub_props(maps:get(properties, Headers, #{})) + properties = filter_pub_props( + maps:get(properties, Headers, #{})) }, payload = Payload }. @@ -266,7 +283,7 @@ filter_pub_props(Props) -> ], Props). %% @doc Message to map --spec(to_map(emqx_types:message()) -> map()). +-spec(to_map(emqx_types:message()) -> message_map()). to_map(#message{ id = Id, qos = QoS, @@ -292,11 +309,38 @@ to_map(#message{ to_list(Msg) -> lists:zip(record_info(fields, message), tl(tuple_to_list(Msg))). +%% @doc Map to message +-spec(from_map(message_map()) -> emqx_types:message()). +from_map(#{id := Id, + qos := QoS, + from := From, + flags := Flags, + headers := Headers, + topic := Topic, + payload := Payload, + timestamp := Timestamp + }) -> + #message{ + id = Id, + qos = QoS, + from = From, + flags = Flags, + headers = Headers, + topic = Topic, + payload = Payload, + timestamp = Timestamp + }. + %% MilliSeconds elapsed(Since) -> max(0, erlang:system_time(millisecond) - Since). -format(#message{id = Id, qos = QoS, topic = Topic, from = From, flags = Flags, headers = Headers}) -> +format(#message{id = Id, + qos = QoS, + topic = Topic, + from = From, + flags = Flags, + headers = Headers}) -> io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~p, Flags=~s, Headers=~s)", [Id, QoS, Topic, From, format(flags, Flags), format(headers, Headers)]). diff --git a/test/emqx_message_SUITE.erl b/test/emqx_message_SUITE.erl index fec7967c8..bbead8f7e 100644 --- a/test/emqx_message_SUITE.erl +++ b/test/emqx_message_SUITE.erl @@ -210,3 +210,15 @@ t_to_map(_) -> ?assertEqual(List, emqx_message:to_list(Msg)), ?assertEqual(maps:from_list(List), emqx_message:to_map(Msg)). +t_from_map(_) -> + Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"topic">>, <<"payload">>), + Map = #{id => emqx_message:id(Msg), + qos => ?QOS_1, + from => <<"clientid">>, + flags => #{}, + headers => #{}, + topic => <<"topic">>, + payload => <<"payload">>, + timestamp => emqx_message:timestamp(Msg)}, + ?assertEqual(Map, emqx_message:to_map(Msg)), + ?assertEqual(Msg, emqx_message:from_map(emqx_message:to_map(Msg))).