diff --git a/src/emqx_message.erl b/src/emqx_message.erl index 6fbd3bbf5..ca8a433d0 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -38,6 +38,7 @@ %% Flags -export([ get_flag/2 , get_flag/3 + , get_flags/1 , set_flag/2 , set_flag/3 , set_flags/2 @@ -85,6 +86,7 @@ make(From, QoS, Topic, Payload) when ?QOS_0 =< QoS, QoS =< ?QOS_2 -> qos = QoS, from = From, flags = #{dup => false}, + headers = #{}, topic = Topic, payload = Payload, timestamp = os:timestamp()}. @@ -119,6 +121,9 @@ get_flag(Flag, Msg) -> get_flag(Flag, #message{flags = Flags}, Default) -> maps:get(Flag, Flags, Default). +-spec(get_flags(emqx_types:message()) -> maybe(map())). +get_flags(#message{flags = Flags}) -> Flags. + -spec(set_flag(flag(), emqx_types:message()) -> emqx_types:message()). set_flag(Flag, Msg = #message{flags = undefined}) when is_atom(Flag) -> Msg#message{flags = #{Flag => true}}; @@ -144,8 +149,7 @@ unset_flag(Flag, Msg = #message{flags = Flags}) -> set_headers(Headers, Msg = #message{headers = undefined}) when is_map(Headers) -> Msg#message{headers = Headers}; set_headers(New, Msg = #message{headers = Old}) when is_map(New) -> - Msg#message{headers = maps:merge(Old, New)}; -set_headers(undefined, Msg) -> Msg. + Msg#message{headers = maps:merge(Old, New)}. -spec(get_headers(emqx_types:message()) -> map()). get_headers(Msg) ->