From 82c337a040201b8a9a093d6cb4b10629cde6f719 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 21 Feb 2019 16:04:24 +0800 Subject: [PATCH 1/4] Fix 'drpped' typo - Fix 'drpped' typo - Add specs for exported functions - Add 'group()' type in emqx_topic module --- src/emqx_shared_sub.erl | 12 ++++++++---- src/emqx_topic.erl | 3 ++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 5c4b46217..65f535059 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -75,15 +75,19 @@ mnesia(copy) -> start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-spec(subscribe(emqx_topic:group(), emqx_topic:topic(), pid()) -> ok). subscribe(Group, Topic, SubPid) when is_pid(SubPid) -> gen_server:call(?SERVER, {subscribe, Group, Topic, SubPid}). +-spec(unsubscribe(emqx_topic:group(), emqx_topic:topic(), pid()) -> ok). unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) -> gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}). record(Group, Topic, SubPid) -> #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. +-spec(dispatch(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery()) + -> emqx_types:delivery()). dispatch(Group, Topic, Delivery) -> dispatch(Group, Topic, Delivery, _FailedSubs = []). @@ -173,12 +177,12 @@ get_ack_ref(Msg) -> -spec(is_ack_required(emqx_types:message()) -> boolean()). is_ack_required(Msg) -> ?no_ack =/= get_ack_ref(Msg). -%% @doc Negative ack dropped message due to message queue being full. +%% @doc Negative ack dropped message due to inflight window or message queue being full. -spec(maybe_nack_dropped(emqx_types:message()) -> ok). maybe_nack_dropped(Msg) -> case get_ack_ref(Msg) of ?no_ack -> ok; - {Sender, Ref} -> nack(Sender, Ref, drpped) + {Sender, Ref} -> nack(Sender, Ref, dropped) end. %% @doc Negative ack message due to connection down. @@ -325,9 +329,9 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Internal functions -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% keep track of alive remote pids maybe_insert_alive_tab(Pid) when ?IS_LOCAL_PID(Pid) -> ok; diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index 2a37a873d..5339a7879 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -25,12 +25,13 @@ -export([systop/1]). -export([parse/1, parse/2]). +-type(group() :: binary()). -type(topic() :: binary()). -type(word() :: '' | '+' | '#' | binary()). -type(words() :: list(word())). -type(triple() :: {root | binary(), word(), binary()}). --export_type([topic/0, word/0, triple/0]). +-export_type([group/0, topic/0, word/0, triple/0]). -define(MAX_TOPIC_LEN, 4096). From 565c1a8c856bfac414162be9e2e93b2c52282020 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 22 Feb 2019 07:51:14 +0800 Subject: [PATCH 2/4] Optimize unset_flag/2 and add some specs - Optimize unset_flag/2 - Add some function specs - Add emqx_message_SUITE to Makefile --- Makefile | 2 +- src/emqx_message.erl | 18 +++++++++++++----- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/Makefile b/Makefile index 0d9bfb7c4..6022dbf8f 100644 --- a/Makefile +++ b/Makefile @@ -37,7 +37,7 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \ - emqx_packet emqx_connection emqx_tracer emqx_sys_mon + emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) diff --git a/src/emqx_message.erl b/src/emqx_message.erl index 7e5388778..f2b821884 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -48,11 +48,13 @@ make(From, QoS, Topic, Payload) -> payload = Payload, timestamp = os:timestamp()}. +-spec(set_flags(map(), emqx_types:message()) -> emqx_types:message()). set_flags(Flags, Msg = #message{flags = undefined}) when is_map(Flags) -> Msg#message{flags = Flags}; set_flags(New, Msg = #message{flags = Old}) when is_map(New) -> Msg#message{flags = maps:merge(Old, New)}. +-spec(get_flag(flag(), emqx_types:message()) -> boolean()). get_flag(Flag, Msg) -> get_flag(Flag, Msg, false). get_flag(Flag, #message{flags = Flags}, Default) -> @@ -73,20 +75,26 @@ set_flag(Flag, Val, Msg = #message{flags = Flags}) when is_atom(Flag) -> -spec(unset_flag(flag(), emqx_types:message()) -> emqx_types:message()). unset_flag(Flag, Msg = #message{flags = Flags}) -> - Msg#message{flags = maps:remove(Flag, Flags)}. + case maps:is_key(Flag, Flags) of + true -> + Msg#message{flags = maps:remove(Flag, Flags)}; + false -> Msg + end. +-spec(set_headers(map(), emqx_types:message()) -> emqx_types:message()). set_headers(Headers, Msg = #message{headers = undefined}) when is_map(Headers) -> Msg#message{headers = Headers}; set_headers(New, Msg = #message{headers = Old}) when is_map(New) -> - Msg#message{headers = maps:merge(Old, New)}; -set_headers(_, Msg) -> - Msg. + Msg#message{headers = maps:merge(Old, New)}. +-spec(get_header(term(), emqx_types:message()) -> term()). get_header(Hdr, Msg) -> get_header(Hdr, Msg, undefined). +-spec(get_header(term(), emqx_types:message(), Default :: term()) -> term()). get_header(Hdr, #message{headers = Headers}, Default) -> maps:get(Hdr, Headers, Default). +-spec(set_header(term(), term(), emqx_types:message()) -> emqx_types:message()). set_header(Hdr, Val, Msg = #message{headers = undefined}) -> Msg#message{headers = #{Hdr => Val}}; set_header(Hdr, Val, Msg = #message{headers = Headers}) -> @@ -98,13 +106,13 @@ is_expired(#message{headers = #{'Message-Expiry-Interval' := Interval}, timestam is_expired(_Msg) -> false. +-spec(update_expiry(emqx_types:message()) -> emqx_types:message()). update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) -> case elapsed(CreatedAt) of Elapsed when Elapsed > 0 -> set_header('Message-Expiry-Interval', max(1, Interval - (Elapsed div 1000)), Msg); _ -> Msg end; - update_expiry(Msg) -> Msg. remove_topic_alias(Msg = #message{headers = Headers}) -> From e16f2fe1fd36ef5bbdfae31969db1a28ed4fa347 Mon Sep 17 00:00:00 2001 From: linjun <1045735402@qq.com> Date: Fri, 22 Feb 2019 14:06:44 +0800 Subject: [PATCH 3/4] Increasing coverage for emqx_message --- test/emqx_message_SUITE.erl | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/test/emqx_message_SUITE.erl b/test/emqx_message_SUITE.erl index 9bf862af3..0975585cc 100644 --- a/test/emqx_message_SUITE.erl +++ b/test/emqx_message_SUITE.erl @@ -33,11 +33,13 @@ all() -> ]. message_make(_) -> - Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), + Msg = emqx_message:make(<<"clientid">>, <<"payload">>), ?assertEqual(0, Msg#message.qos), - Msg1 = emqx_message:make(<<"clientid">>, ?QOS_2, <<"topic">>, <<"payload">>), - ?assert(is_binary(Msg1#message.id)), - ?assertEqual(2, Msg1#message.qos). + Msg1 = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), + ?assertEqual(0, Msg1#message.qos), + Msg2 = emqx_message:make(<<"clientid">>, ?QOS_2, <<"topic">>, <<"payload">>), + ?assert(is_binary(Msg2#message.id)), + ?assertEqual(2, Msg2#message.qos). message_flag(_) -> Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), From c8b243ed22a16cbc483b18302ba8f25302ba5b4f Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 21 Feb 2019 10:02:27 +0800 Subject: [PATCH 4/4] Change some '-type' attrs to '-opaque' and improve emqx_gc, emqx_inflight modules - Define 'GCS(St)' macro to improve emqx_gc module - Define 'Inflight(MaxSize, Tree)' macro to improve emqx_inflight module --- src/emqx_batch.erl | 5 +++- src/emqx_frame.erl | 2 +- src/emqx_gc.erl | 21 ++++++++----- src/emqx_inflight.erl | 68 ++++++++++++++++++++++-------------------- src/emqx_keepalive.erl | 2 +- src/emqx_protocol.erl | 5 ++-- src/emqx_topic.erl | 2 +- 7 files changed, 59 insertions(+), 46 deletions(-) diff --git a/src/emqx_batch.erl b/src/emqx_batch.erl index f2592dd93..9fb59eb1f 100644 --- a/src/emqx_batch.erl +++ b/src/emqx_batch.erl @@ -22,6 +22,7 @@ linger_ms => pos_integer(), commit_fun := function() }). + -export_type([options/0]). -record(batch, { @@ -31,7 +32,9 @@ linger_timer :: reference() | undefined, commit_fun :: function() }). --type(batch() :: #batch{}). + +-opaque(batch() :: #batch{}). + -export_type([batch/0]). -spec(init(options()) -> batch()). diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 4d958a036..19c6ca87b 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -24,7 +24,7 @@ -type(options() :: #{max_packet_size => 1..?MAX_PACKET_SIZE, version => emqx_mqtt_types:version()}). --type(parse_state() :: {none, options()} | cont_fun(binary())). +-opaque(parse_state() :: {none, options()} | cont_fun(binary())). -type(cont_fun(Bin) :: fun((Bin) -> {ok, emqx_mqtt_types:packet(), binary()} | {more, cont_fun(Bin)})). diff --git a/src/emqx_gc.erl b/src/emqx_gc.erl index 17bafa8ff..1c438ddf1 100644 --- a/src/emqx_gc.erl +++ b/src/emqx_gc.erl @@ -31,7 +31,11 @@ -type(st() :: #{cnt => {integer(), integer()}, oct => {integer(), integer()}}). --type(gc_state() :: {?MODULE, st()}). +-opaque(gc_state() :: {?MODULE, st()}). + +-export_type([gc_state/0]). + +-define(GCS(St), {?MODULE, St}). -define(disabled, disabled). -define(ENABLED(X), (is_integer(X) andalso X > 0)). @@ -41,14 +45,15 @@ init(#{count := Count, bytes := Bytes}) -> Cnt = [{cnt, {Count, Count}} || ?ENABLED(Count)], Oct = [{oct, {Bytes, Bytes}} || ?ENABLED(Bytes)], - {?MODULE, maps:from_list(Cnt ++ Oct)}; + ?GCS(maps:from_list(Cnt ++ Oct)); init(false) -> undefined. %% @doc Try to run GC based on reduntions of count or bytes. --spec(run(pos_integer(), pos_integer(), gc_state()) -> {boolean(), gc_state()}). -run(Cnt, Oct, {?MODULE, St}) -> +-spec(run(pos_integer(), pos_integer(), gc_state()) + -> {boolean(), gc_state()}). +run(Cnt, Oct, ?GCS(St)) -> {Res, St1} = run([{cnt, Cnt}, {oct, Oct}], St), - {Res, {?MODULE, St1}}; + {Res, ?GCS(St1)}; run(_Cnt, _Oct, undefined) -> {false, undefined}. @@ -64,15 +69,15 @@ run([{K, N}|T], St) -> %% @doc Info of GC state. -spec(info(gc_state()) -> maybe(map())). -info({?MODULE, St}) -> +info(?GCS(St)) -> St; info(undefined) -> undefined. %% @doc Reset counters to zero. -spec(reset(gc_state()) -> gc_state()). -reset({?MODULE, St}) -> - {?MODULE, do_reset(St)}; +reset(?GCS(St)) -> + ?GCS(do_reset(St)); reset(undefined) -> undefined. diff --git a/src/emqx_inflight.erl b/src/emqx_inflight.erl index da80e0b30..7e1fc8a67 100644 --- a/src/emqx_inflight.erl +++ b/src/emqx_inflight.erl @@ -14,11 +14,15 @@ -module(emqx_inflight). --export([new/1, contain/2, lookup/2, insert/3, update/3, update_size/2, delete/2, values/1, - to_list/1, size/1, max_size/1, is_full/1, is_empty/1, window/1]). +-export([new/1, contain/2, lookup/2, insert/3, update/3, update_size/2, delete/2, + values/1, to_list/1, size/1, max_size/1, is_full/1, is_empty/1, window/1]). +-type(key() :: term()). -type(max_size() :: pos_integer()). --type(inflight() :: {?MODULE, max_size(), gb_trees:tree()}). +-opaque(inflight() :: {?MODULE, max_size(), gb_trees:tree()}). + +-define(Inflight(Tree), {?MODULE, _MaxSize, Tree}). +-define(Inflight(MaxSize, Tree), {?MODULE, MaxSize, (Tree)}). -export_type([inflight/0]). @@ -26,68 +30,68 @@ new(MaxSize) when MaxSize >= 0 -> {?MODULE, MaxSize, gb_trees:empty()}. --spec(contain(Key :: term(), inflight()) -> boolean()). -contain(Key, {?MODULE, _MaxSize, Tree}) -> +-spec(contain(key(), inflight()) -> boolean()). +contain(Key, ?Inflight(Tree)) -> gb_trees:is_defined(Key, Tree). --spec(lookup(Key :: term(), inflight()) -> {value, term()} | none). -lookup(Key, {?MODULE, _MaxSize, Tree}) -> +-spec(lookup(key(), inflight()) -> {value, term()} | none). +lookup(Key, ?Inflight(Tree)) -> gb_trees:lookup(Key, Tree). --spec(insert(Key :: term(), Value :: term(), inflight()) -> inflight()). -insert(Key, Value, {?MODULE, MaxSize, Tree}) -> - {?MODULE, MaxSize, gb_trees:insert(Key, Value, Tree)}. +-spec(insert(key(), Val :: term(), inflight()) -> inflight()). +insert(Key, Val, ?Inflight(MaxSize, Tree)) -> + ?Inflight(MaxSize, gb_trees:insert(Key, Val, Tree)). --spec(delete(Key :: term(), inflight()) -> inflight()). -delete(Key, {?MODULE, MaxSize, Tree}) -> - {?MODULE, MaxSize, gb_trees:delete(Key, Tree)}. +-spec(delete(key(), inflight()) -> inflight()). +delete(Key, ?Inflight(MaxSize, Tree)) -> + ?Inflight(MaxSize, gb_trees:delete(Key, Tree)). --spec(update(Key :: term(), Val :: term(), inflight()) -> inflight()). -update(Key, Val, {?MODULE, MaxSize, Tree}) -> - {?MODULE, MaxSize, gb_trees:update(Key, Val, Tree)}. +-spec(update(key(), Val :: term(), inflight()) -> inflight()). +update(Key, Val, ?Inflight(MaxSize, Tree)) -> + ?Inflight(MaxSize, gb_trees:update(Key, Val, Tree)). -spec(update_size(integer(), inflight()) -> inflight()). -update_size(MaxSize, {?MODULE, _OldMaxSize, Tree}) -> - {?MODULE, MaxSize, Tree}. +update_size(MaxSize, ?Inflight(Tree)) -> + ?Inflight(MaxSize, Tree). -spec(is_full(inflight()) -> boolean()). -is_full({?MODULE, 0, _Tree}) -> +is_full(?Inflight(0, _Tree)) -> false; -is_full({?MODULE, MaxSize, Tree}) -> +is_full(?Inflight(MaxSize, Tree)) -> MaxSize =< gb_trees:size(Tree). -spec(is_empty(inflight()) -> boolean()). -is_empty({?MODULE, _MaxSize, Tree}) -> +is_empty(?Inflight(Tree)) -> gb_trees:is_empty(Tree). --spec(smallest(inflight()) -> {K :: term(), V :: term()}). -smallest({?MODULE, _MaxSize, Tree}) -> +-spec(smallest(inflight()) -> {key(), term()}). +smallest(?Inflight(Tree)) -> gb_trees:smallest(Tree). --spec(largest(inflight()) -> {K :: term(), V :: term()}). -largest({?MODULE, _MaxSize, Tree}) -> +-spec(largest(inflight()) -> {key(), term()}). +largest(?Inflight(Tree)) -> gb_trees:largest(Tree). -spec(values(inflight()) -> list()). -values({?MODULE, _MaxSize, Tree}) -> +values(?Inflight(Tree)) -> gb_trees:values(Tree). --spec(to_list(inflight()) -> list({K :: term(), V :: term()})). -to_list({?MODULE, _MaxSize, Tree}) -> +-spec(to_list(inflight()) -> list({key(), term()})). +to_list(?Inflight(Tree)) -> gb_trees:to_list(Tree). -spec(window(inflight()) -> list()). -window(Inflight = {?MODULE, _MaxSize, Tree}) -> +window(Inflight = ?Inflight(Tree)) -> case gb_trees:is_empty(Tree) of - true -> []; + true -> []; false -> [Key || {Key, _Val} <- [smallest(Inflight), largest(Inflight)]] end. -spec(size(inflight()) -> non_neg_integer()). -size({?MODULE, _MaxSize, Tree}) -> +size(?Inflight(Tree)) -> gb_trees:size(Tree). -spec(max_size(inflight()) -> non_neg_integer()). -max_size({?MODULE, MaxSize, _Tree}) -> +max_size(?Inflight(MaxSize, _Tree)) -> MaxSize. diff --git a/src/emqx_keepalive.erl b/src/emqx_keepalive.erl index 2d9b0513f..bd1b9b9ef 100644 --- a/src/emqx_keepalive.erl +++ b/src/emqx_keepalive.erl @@ -18,7 +18,7 @@ -record(keepalive, {statfun, statval, tsec, tmsg, tref, repeat = 0}). --type(keepalive() :: #keepalive{}). +-opaque(keepalive() :: #keepalive{}). -export_type([keepalive/0]). diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index fad3c6f1a..c6260168c 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -34,6 +34,8 @@ -export([send/2]). -export([shutdown/2]). +-export_type([state/0]). + -record(pstate, { zone, sendfun, @@ -66,8 +68,7 @@ topic_alias_maximum }). --type(state() :: #pstate{}). --export_type([state/0]). +-opaque(state() :: #pstate{}). -ifdef(TEST). -compile(export_all). diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index 5339a7879..59f592984 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -29,7 +29,7 @@ -type(topic() :: binary()). -type(word() :: '' | '+' | '#' | binary()). -type(words() :: list(word())). --type(triple() :: {root | binary(), word(), binary()}). +-opaque(triple() :: {root | binary(), word(), binary()}). -export_type([group/0, topic/0, word/0, triple/0]).