chore(elvis): fix elvis warnings
This commit is contained in:
parent
f2190dd5b5
commit
d437f9f228
13
elvis.config
13
elvis.config
|
@ -7,11 +7,14 @@
|
||||||
[
|
[
|
||||||
#{dirs => ["apps/**/src", "src"],
|
#{dirs => ["apps/**/src", "src"],
|
||||||
filter => "*.erl",
|
filter => "*.erl",
|
||||||
ruleset => erl_files
|
ruleset => erl_files,
|
||||||
%rules => [
|
rules => [
|
||||||
% {elvis_style, max_module_length, #{}},
|
{elvis_style, state_record_and_type, disable},
|
||||||
% {elvis_style, no_common_caveats_call, #{}}
|
{elvis_style, no_common_caveats_call, #{}},
|
||||||
% ]
|
{elvis_style, no_debug_call, #{ debug_functions => [ {ct, pal}
|
||||||
|
, {ct, print}
|
||||||
|
]}}
|
||||||
|
]
|
||||||
},
|
},
|
||||||
#{dirs => ["apps/**/test", "test"],
|
#{dirs => ["apps/**/test", "test"],
|
||||||
filter => "*.erl",
|
filter => "*.erl",
|
||||||
|
|
|
@ -112,6 +112,8 @@
|
||||||
node_id :: trie_node_id()
|
node_id :: trie_node_id()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
-type(trie_node() :: #trie_node{}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Plugin
|
%% Plugin
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -116,7 +116,8 @@ handle_cast(Msg, State) ->
|
||||||
?LOG(error, "Unexpected cast: ~p", [Msg]),
|
?LOG(error, "Unexpected cast: ~p", [Msg]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}}, State = #{nodes := Nodes}) ->
|
handle_info({mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}},
|
||||||
|
State = #{nodes := Nodes}) ->
|
||||||
case ekka:is_member(Node) orelse lists:member(Node, Nodes) of
|
case ekka:is_member(Node) orelse lists:member(Node, Nodes) of
|
||||||
true -> {noreply, State};
|
true -> {noreply, State};
|
||||||
false ->
|
false ->
|
||||||
|
|
|
@ -63,10 +63,10 @@
|
||||||
-define(SHARED_SUBS, emqx_shared_subscriber).
|
-define(SHARED_SUBS, emqx_shared_subscriber).
|
||||||
-define(ALIVE_SUBS, emqx_alive_shared_subscribers).
|
-define(ALIVE_SUBS, emqx_alive_shared_subscribers).
|
||||||
-define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5).
|
-define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5).
|
||||||
-define(ack, shared_sub_ack).
|
|
||||||
-define(nack(Reason), {shared_sub_nack, Reason}).
|
|
||||||
-define(IS_LOCAL_PID(Pid), (is_pid(Pid) andalso node(Pid) =:= node())).
|
-define(IS_LOCAL_PID(Pid), (is_pid(Pid) andalso node(Pid) =:= node())).
|
||||||
-define(no_ack, no_ack).
|
-define(ACK, shared_sub_ack).
|
||||||
|
-define(NACK(Reason), {shared_sub_nack, Reason}).
|
||||||
|
-define(NO_ACK, no_ack).
|
||||||
|
|
||||||
-record(state, {pmon}).
|
-record(state, {pmon}).
|
||||||
|
|
||||||
|
@ -168,9 +168,9 @@ dispatch_with_ack(SubPid, Topic, Msg) ->
|
||||||
end,
|
end,
|
||||||
try
|
try
|
||||||
receive
|
receive
|
||||||
{Ref, ?ack} ->
|
{Ref, ?ACK} ->
|
||||||
ok;
|
ok;
|
||||||
{Ref, ?nack(Reason)} ->
|
{Ref, ?NACK(Reason)} ->
|
||||||
%% the receive session may nack this message when its queue is full
|
%% the receive session may nack this message when its queue is full
|
||||||
{error, Reason};
|
{error, Reason};
|
||||||
{'DOWN', Ref, process, SubPid, Reason} ->
|
{'DOWN', Ref, process, SubPid, Reason} ->
|
||||||
|
@ -187,19 +187,19 @@ with_ack_ref(Msg, SenderRef) ->
|
||||||
emqx_message:set_headers(#{shared_dispatch_ack => SenderRef}, Msg).
|
emqx_message:set_headers(#{shared_dispatch_ack => SenderRef}, Msg).
|
||||||
|
|
||||||
without_ack_ref(Msg) ->
|
without_ack_ref(Msg) ->
|
||||||
emqx_message:set_headers(#{shared_dispatch_ack => ?no_ack}, Msg).
|
emqx_message:set_headers(#{shared_dispatch_ack => ?NO_ACK}, Msg).
|
||||||
|
|
||||||
get_ack_ref(Msg) ->
|
get_ack_ref(Msg) ->
|
||||||
emqx_message:get_header(shared_dispatch_ack, Msg, ?no_ack).
|
emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).
|
||||||
|
|
||||||
-spec(is_ack_required(emqx_types:message()) -> boolean()).
|
-spec(is_ack_required(emqx_types:message()) -> boolean()).
|
||||||
is_ack_required(Msg) -> ?no_ack =/= get_ack_ref(Msg).
|
is_ack_required(Msg) -> ?NO_ACK =/= get_ack_ref(Msg).
|
||||||
|
|
||||||
%% @doc Negative ack dropped message due to inflight window or message queue being full.
|
%% @doc Negative ack dropped message due to inflight window or message queue being full.
|
||||||
-spec(maybe_nack_dropped(emqx_types:message()) -> ok).
|
-spec(maybe_nack_dropped(emqx_types:message()) -> ok).
|
||||||
maybe_nack_dropped(Msg) ->
|
maybe_nack_dropped(Msg) ->
|
||||||
case get_ack_ref(Msg) of
|
case get_ack_ref(Msg) of
|
||||||
?no_ack -> ok;
|
?NO_ACK -> ok;
|
||||||
{Sender, Ref} -> nack(Sender, Ref, dropped)
|
{Sender, Ref} -> nack(Sender, Ref, dropped)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -213,16 +213,16 @@ nack_no_connection(Msg) ->
|
||||||
|
|
||||||
-spec(nack(pid(), reference(), dropped | no_connection) -> ok).
|
-spec(nack(pid(), reference(), dropped | no_connection) -> ok).
|
||||||
nack(Sender, Ref, Reason) ->
|
nack(Sender, Ref, Reason) ->
|
||||||
erlang:send(Sender, {Ref, ?nack(Reason)}),
|
erlang:send(Sender, {Ref, ?NACK(Reason)}),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-spec(maybe_ack(emqx_types:message()) -> emqx_types:message()).
|
-spec(maybe_ack(emqx_types:message()) -> emqx_types:message()).
|
||||||
maybe_ack(Msg) ->
|
maybe_ack(Msg) ->
|
||||||
case get_ack_ref(Msg) of
|
case get_ack_ref(Msg) of
|
||||||
?no_ack ->
|
?NO_ACK ->
|
||||||
Msg;
|
Msg;
|
||||||
{Sender, Ref} ->
|
{Sender, Ref} ->
|
||||||
erlang:send(Sender, {Ref, ?ack}),
|
erlang:send(Sender, {Ref, ?ACK}),
|
||||||
without_ack_ref(Msg)
|
without_ack_ref(Msg)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -307,10 +307,7 @@ handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon
|
||||||
handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) ->
|
handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) ->
|
||||||
mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)),
|
mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)),
|
||||||
true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
|
true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
|
||||||
case ets:member(?SHARED_SUBS, {Group, Topic}) of
|
delete_route_if_needed({Group, Topic}),
|
||||||
true -> ok;
|
|
||||||
false -> ok = emqx_router:do_delete_route(Topic, {Group, node()})
|
|
||||||
end,
|
|
||||||
{reply, ok, State};
|
{reply, ok, State};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
|
@ -361,14 +358,14 @@ cleanup_down(SubPid) ->
|
||||||
fun(Record = #emqx_shared_subscription{topic = Topic, group = Group}) ->
|
fun(Record = #emqx_shared_subscription{topic = Topic, group = Group}) ->
|
||||||
ok = mnesia:dirty_delete_object(?TAB, Record),
|
ok = mnesia:dirty_delete_object(?TAB, Record),
|
||||||
true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
|
true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
|
||||||
case ets:member(?SHARED_SUBS, {Group, Topic}) of
|
delete_route_if_needed({Group, Topic})
|
||||||
true -> ok;
|
|
||||||
false -> ok = emqx_router:do_delete_route(Topic, {Group, node()})
|
|
||||||
end
|
|
||||||
end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})).
|
end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})).
|
||||||
|
|
||||||
update_stats(State) ->
|
update_stats(State) ->
|
||||||
emqx_stats:setstat('subscriptions.shared.count', 'subscriptions.shared.max', ets:info(?TAB, size)),
|
emqx_stats:setstat('subscriptions.shared.count',
|
||||||
|
'subscriptions.shared.max',
|
||||||
|
ets:info(?TAB, size)
|
||||||
|
),
|
||||||
State.
|
State.
|
||||||
|
|
||||||
%% Return 'true' if the subscriber process is alive AND not in the failed list
|
%% Return 'true' if the subscriber process is alive AND not in the failed list
|
||||||
|
@ -381,3 +378,8 @@ is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) ->
|
||||||
is_alive_sub(Pid) ->
|
is_alive_sub(Pid) ->
|
||||||
[] =/= ets:lookup(?ALIVE_SUBS, Pid).
|
[] =/= ets:lookup(?ALIVE_SUBS, Pid).
|
||||||
|
|
||||||
|
delete_route_if_needed({Group, Topic}) ->
|
||||||
|
case ets:member(?SHARED_SUBS, {Group, Topic}) of
|
||||||
|
true -> ok;
|
||||||
|
false -> ok = emqx_router:do_delete_route(Topic, {Group, node()})
|
||||||
|
end.
|
||||||
|
|
|
@ -44,6 +44,8 @@
|
||||||
-define(TRIE_TAB, emqx_trie).
|
-define(TRIE_TAB, emqx_trie).
|
||||||
-define(TRIE_NODE_TAB, emqx_trie_node).
|
-define(TRIE_NODE_TAB, emqx_trie_node).
|
||||||
|
|
||||||
|
-elvis([{elvis_style, function_naming_convention, disable}]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Mnesia bootstrap
|
%% Mnesia bootstrap
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -99,7 +101,7 @@ match(Topic) when is_binary(Topic) ->
|
||||||
[Name || #trie_node{topic = Name} <- TrieNodes, Name =/= undefined].
|
[Name || #trie_node{topic = Name} <- TrieNodes, Name =/= undefined].
|
||||||
|
|
||||||
%% @doc Lookup a trie node.
|
%% @doc Lookup a trie node.
|
||||||
-spec(lookup(NodeId :: binary()) -> [#trie_node{}]).
|
-spec(lookup(NodeId :: binary()) -> [trie_node()]).
|
||||||
lookup(NodeId) ->
|
lookup(NodeId) ->
|
||||||
mnesia:read(?TRIE_NODE_TAB, NodeId).
|
mnesia:read(?TRIE_NODE_TAB, NodeId).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue