From 565c1a8c856bfac414162be9e2e93b2c52282020 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 22 Feb 2019 07:51:14 +0800 Subject: [PATCH] Optimize unset_flag/2 and add some specs - Optimize unset_flag/2 - Add some function specs - Add emqx_message_SUITE to Makefile --- Makefile | 2 +- src/emqx_message.erl | 18 +++++++++++++----- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/Makefile b/Makefile index 0d9bfb7c4..6022dbf8f 100644 --- a/Makefile +++ b/Makefile @@ -37,7 +37,7 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \ - emqx_packet emqx_connection emqx_tracer emqx_sys_mon + emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) diff --git a/src/emqx_message.erl b/src/emqx_message.erl index 7e5388778..f2b821884 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -48,11 +48,13 @@ make(From, QoS, Topic, Payload) -> payload = Payload, timestamp = os:timestamp()}. +-spec(set_flags(map(), emqx_types:message()) -> emqx_types:message()). set_flags(Flags, Msg = #message{flags = undefined}) when is_map(Flags) -> Msg#message{flags = Flags}; set_flags(New, Msg = #message{flags = Old}) when is_map(New) -> Msg#message{flags = maps:merge(Old, New)}. +-spec(get_flag(flag(), emqx_types:message()) -> boolean()). get_flag(Flag, Msg) -> get_flag(Flag, Msg, false). get_flag(Flag, #message{flags = Flags}, Default) -> @@ -73,20 +75,26 @@ set_flag(Flag, Val, Msg = #message{flags = Flags}) when is_atom(Flag) -> -spec(unset_flag(flag(), emqx_types:message()) -> emqx_types:message()). unset_flag(Flag, Msg = #message{flags = Flags}) -> - Msg#message{flags = maps:remove(Flag, Flags)}. + case maps:is_key(Flag, Flags) of + true -> + Msg#message{flags = maps:remove(Flag, Flags)}; + false -> Msg + end. +-spec(set_headers(map(), emqx_types:message()) -> emqx_types:message()). set_headers(Headers, Msg = #message{headers = undefined}) when is_map(Headers) -> Msg#message{headers = Headers}; set_headers(New, Msg = #message{headers = Old}) when is_map(New) -> - Msg#message{headers = maps:merge(Old, New)}; -set_headers(_, Msg) -> - Msg. + Msg#message{headers = maps:merge(Old, New)}. +-spec(get_header(term(), emqx_types:message()) -> term()). get_header(Hdr, Msg) -> get_header(Hdr, Msg, undefined). +-spec(get_header(term(), emqx_types:message(), Default :: term()) -> term()). get_header(Hdr, #message{headers = Headers}, Default) -> maps:get(Hdr, Headers, Default). +-spec(set_header(term(), term(), emqx_types:message()) -> emqx_types:message()). set_header(Hdr, Val, Msg = #message{headers = undefined}) -> Msg#message{headers = #{Hdr => Val}}; set_header(Hdr, Val, Msg = #message{headers = Headers}) -> @@ -98,13 +106,13 @@ is_expired(#message{headers = #{'Message-Expiry-Interval' := Interval}, timestam is_expired(_Msg) -> false. +-spec(update_expiry(emqx_types:message()) -> emqx_types:message()). update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) -> case elapsed(CreatedAt) of Elapsed when Elapsed > 0 -> set_header('Message-Expiry-Interval', max(1, Interval - (Elapsed div 1000)), Msg); _ -> Msg end; - update_expiry(Msg) -> Msg. remove_topic_alias(Msg = #message{headers = Headers}) ->