diff --git a/elvis.config b/elvis.config index 0216d2e0f..42f67e40f 100644 --- a/elvis.config +++ b/elvis.config @@ -7,11 +7,14 @@ [ #{dirs => ["apps/**/src", "src"], filter => "*.erl", - ruleset => erl_files - %rules => [ - % {elvis_style, max_module_length, #{}}, - % {elvis_style, no_common_caveats_call, #{}} - % ] + ruleset => erl_files, + rules => [ + {elvis_style, state_record_and_type, disable}, + {elvis_style, no_common_caveats_call, #{}}, + {elvis_style, no_debug_call, #{ debug_functions => [ {ct, pal} + , {ct, print} + ]}} + ] }, #{dirs => ["apps/**/test", "test"], filter => "*.erl", diff --git a/include/emqx.hrl b/include/emqx.hrl index a76a86528..c1e701f12 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -112,6 +112,8 @@ node_id :: trie_node_id() }). +-type(trie_node() :: #trie_node{}). + %%-------------------------------------------------------------------- %% Plugin %%-------------------------------------------------------------------- diff --git a/src/emqx_router_helper.erl b/src/emqx_router_helper.erl index bff7e52cc..5a7c6479f 100644 --- a/src/emqx_router_helper.erl +++ b/src/emqx_router_helper.erl @@ -116,7 +116,8 @@ handle_cast(Msg, State) -> ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}}, State = #{nodes := Nodes}) -> +handle_info({mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}}, + State = #{nodes := Nodes}) -> case ekka:is_member(Node) orelse lists:member(Node, Nodes) of true -> {noreply, State}; false -> diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 3add6dadc..7ec03cd87 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -63,10 +63,10 @@ -define(SHARED_SUBS, emqx_shared_subscriber). -define(ALIVE_SUBS, emqx_alive_shared_subscribers). -define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5). --define(ack, shared_sub_ack). --define(nack(Reason), {shared_sub_nack, Reason}). -define(IS_LOCAL_PID(Pid), (is_pid(Pid) andalso node(Pid) =:= node())). --define(no_ack, no_ack). +-define(ACK, shared_sub_ack). +-define(NACK(Reason), {shared_sub_nack, Reason}). +-define(NO_ACK, no_ack). -record(state, {pmon}). @@ -168,9 +168,9 @@ dispatch_with_ack(SubPid, Topic, Msg) -> end, try receive - {Ref, ?ack} -> + {Ref, ?ACK} -> ok; - {Ref, ?nack(Reason)} -> + {Ref, ?NACK(Reason)} -> %% the receive session may nack this message when its queue is full {error, Reason}; {'DOWN', Ref, process, SubPid, Reason} -> @@ -187,19 +187,19 @@ with_ack_ref(Msg, SenderRef) -> emqx_message:set_headers(#{shared_dispatch_ack => SenderRef}, Msg). without_ack_ref(Msg) -> - emqx_message:set_headers(#{shared_dispatch_ack => ?no_ack}, Msg). + emqx_message:set_headers(#{shared_dispatch_ack => ?NO_ACK}, Msg). get_ack_ref(Msg) -> - emqx_message:get_header(shared_dispatch_ack, Msg, ?no_ack). + emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK). -spec(is_ack_required(emqx_types:message()) -> boolean()). -is_ack_required(Msg) -> ?no_ack =/= get_ack_ref(Msg). +is_ack_required(Msg) -> ?NO_ACK =/= get_ack_ref(Msg). %% @doc Negative ack dropped message due to inflight window or message queue being full. -spec(maybe_nack_dropped(emqx_types:message()) -> ok). maybe_nack_dropped(Msg) -> case get_ack_ref(Msg) of - ?no_ack -> ok; + ?NO_ACK -> ok; {Sender, Ref} -> nack(Sender, Ref, dropped) end. @@ -213,16 +213,16 @@ nack_no_connection(Msg) -> -spec(nack(pid(), reference(), dropped | no_connection) -> ok). nack(Sender, Ref, Reason) -> - erlang:send(Sender, {Ref, ?nack(Reason)}), + erlang:send(Sender, {Ref, ?NACK(Reason)}), ok. -spec(maybe_ack(emqx_types:message()) -> emqx_types:message()). maybe_ack(Msg) -> case get_ack_ref(Msg) of - ?no_ack -> + ?NO_ACK -> Msg; {Sender, Ref} -> - erlang:send(Sender, {Ref, ?ack}), + erlang:send(Sender, {Ref, ?ACK}), without_ack_ref(Msg) end. @@ -307,10 +307,7 @@ handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) -> mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)), true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), - case ets:member(?SHARED_SUBS, {Group, Topic}) of - true -> ok; - false -> ok = emqx_router:do_delete_route(Topic, {Group, node()}) - end, + delete_route_if_needed({Group, Topic}), {reply, ok, State}; handle_call(Req, _From, State) -> @@ -361,14 +358,14 @@ cleanup_down(SubPid) -> fun(Record = #emqx_shared_subscription{topic = Topic, group = Group}) -> ok = mnesia:dirty_delete_object(?TAB, Record), true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), - case ets:member(?SHARED_SUBS, {Group, Topic}) of - true -> ok; - false -> ok = emqx_router:do_delete_route(Topic, {Group, node()}) - end + delete_route_if_needed({Group, Topic}) end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})). update_stats(State) -> - emqx_stats:setstat('subscriptions.shared.count', 'subscriptions.shared.max', ets:info(?TAB, size)), + emqx_stats:setstat('subscriptions.shared.count', + 'subscriptions.shared.max', + ets:info(?TAB, size) + ), State. %% Return 'true' if the subscriber process is alive AND not in the failed list @@ -381,3 +378,8 @@ is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) -> is_alive_sub(Pid) -> [] =/= ets:lookup(?ALIVE_SUBS, Pid). +delete_route_if_needed({Group, Topic}) -> + case ets:member(?SHARED_SUBS, {Group, Topic}) of + true -> ok; + false -> ok = emqx_router:do_delete_route(Topic, {Group, node()}) + end. diff --git a/src/emqx_trie.erl b/src/emqx_trie.erl index d3a22e0b2..ae85d92c2 100644 --- a/src/emqx_trie.erl +++ b/src/emqx_trie.erl @@ -44,6 +44,8 @@ -define(TRIE_TAB, emqx_trie). -define(TRIE_NODE_TAB, emqx_trie_node). +-elvis([{elvis_style, function_naming_convention, disable}]). + %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- @@ -99,7 +101,7 @@ match(Topic) when is_binary(Topic) -> [Name || #trie_node{topic = Name} <- TrieNodes, Name =/= undefined]. %% @doc Lookup a trie node. --spec(lookup(NodeId :: binary()) -> [#trie_node{}]). +-spec(lookup(NodeId :: binary()) -> [trie_node()]). lookup(NodeId) -> mnesia:read(?TRIE_NODE_TAB, NodeId).