Merge pull request #4314 from z8674558/to-message
feat(emqx_message): add from_map
This commit is contained in:
commit
d2bb7d0371
|
@ -67,10 +67,25 @@
|
|||
-export([ to_packet/2
|
||||
, to_map/1
|
||||
, to_list/1
|
||||
, from_map/1
|
||||
]).
|
||||
|
||||
-export_type([message_map/0]).
|
||||
|
||||
-type(message_map() :: #{id := binary(),
|
||||
qos := 0 | 1 | 2,
|
||||
from := atom() | binary(),
|
||||
flags := emqx_types:flags(),
|
||||
headers := emqx_types:headers(),
|
||||
topic := emqx_types:topic(),
|
||||
payload := emqx_types:payload(),
|
||||
timestamp := integer()}
|
||||
).
|
||||
|
||||
-export([format/1]).
|
||||
|
||||
-elvis([{elvis_style, god_modules, disable}]).
|
||||
|
||||
-spec(make(emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()).
|
||||
make(Topic, Payload) ->
|
||||
make(undefined, Topic, Payload).
|
||||
|
@ -228,8 +243,9 @@ is_expired(#message{headers = #{properties := #{'Message-Expiry-Interval' := Int
|
|||
is_expired(_Msg) -> false.
|
||||
|
||||
-spec(update_expiry(emqx_types:message()) -> emqx_types:message()).
|
||||
update_expiry(Msg = #message{headers = #{properties := Props = #{'Message-Expiry-Interval' := Interval}},
|
||||
update_expiry(Msg = #message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}},
|
||||
timestamp = CreatedAt}) ->
|
||||
Props = maps:get(properties, Msg#message.headers),
|
||||
case elapsed(CreatedAt) of
|
||||
Elapsed when Elapsed > 0 ->
|
||||
Interval1 = max(1, Interval - (Elapsed div 1000)),
|
||||
|
@ -250,7 +266,8 @@ to_packet(PacketId, Msg = #message{qos = QoS, headers = Headers,
|
|||
},
|
||||
variable = #mqtt_packet_publish{topic_name = Topic,
|
||||
packet_id = PacketId,
|
||||
properties = filter_pub_props(maps:get(properties, Headers, #{}))
|
||||
properties = filter_pub_props(
|
||||
maps:get(properties, Headers, #{}))
|
||||
},
|
||||
payload = Payload
|
||||
}.
|
||||
|
@ -266,7 +283,7 @@ filter_pub_props(Props) ->
|
|||
], Props).
|
||||
|
||||
%% @doc Message to map
|
||||
-spec(to_map(emqx_types:message()) -> map()).
|
||||
-spec(to_map(emqx_types:message()) -> message_map()).
|
||||
to_map(#message{
|
||||
id = Id,
|
||||
qos = QoS,
|
||||
|
@ -292,11 +309,38 @@ to_map(#message{
|
|||
to_list(Msg) ->
|
||||
lists:zip(record_info(fields, message), tl(tuple_to_list(Msg))).
|
||||
|
||||
%% @doc Map to message
|
||||
-spec(from_map(message_map()) -> emqx_types:message()).
|
||||
from_map(#{id := Id,
|
||||
qos := QoS,
|
||||
from := From,
|
||||
flags := Flags,
|
||||
headers := Headers,
|
||||
topic := Topic,
|
||||
payload := Payload,
|
||||
timestamp := Timestamp
|
||||
}) ->
|
||||
#message{
|
||||
id = Id,
|
||||
qos = QoS,
|
||||
from = From,
|
||||
flags = Flags,
|
||||
headers = Headers,
|
||||
topic = Topic,
|
||||
payload = Payload,
|
||||
timestamp = Timestamp
|
||||
}.
|
||||
|
||||
%% MilliSeconds
|
||||
elapsed(Since) ->
|
||||
max(0, erlang:system_time(millisecond) - Since).
|
||||
|
||||
format(#message{id = Id, qos = QoS, topic = Topic, from = From, flags = Flags, headers = Headers}) ->
|
||||
format(#message{id = Id,
|
||||
qos = QoS,
|
||||
topic = Topic,
|
||||
from = From,
|
||||
flags = Flags,
|
||||
headers = Headers}) ->
|
||||
io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~p, Flags=~s, Headers=~s)",
|
||||
[Id, QoS, Topic, From, format(flags, Flags), format(headers, Headers)]).
|
||||
|
||||
|
|
|
@ -210,3 +210,15 @@ t_to_map(_) ->
|
|||
?assertEqual(List, emqx_message:to_list(Msg)),
|
||||
?assertEqual(maps:from_list(List), emqx_message:to_map(Msg)).
|
||||
|
||||
t_from_map(_) ->
|
||||
Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"topic">>, <<"payload">>),
|
||||
Map = #{id => emqx_message:id(Msg),
|
||||
qos => ?QOS_1,
|
||||
from => <<"clientid">>,
|
||||
flags => #{},
|
||||
headers => #{},
|
||||
topic => <<"topic">>,
|
||||
payload => <<"payload">>,
|
||||
timestamp => emqx_message:timestamp(Msg)},
|
||||
?assertEqual(Map, emqx_message:to_map(Msg)),
|
||||
?assertEqual(Msg, emqx_message:from_map(emqx_message:to_map(Msg))).
|
||||
|
|
Loading…
Reference in New Issue