chore: fix ignored dialyzer warnings (part 1) (4.3)
This commit is contained in:
parent
b6f24b3ffe
commit
d72ca84af0
|
@ -147,4 +147,3 @@ safe_get(K, ClientInfo) ->
|
||||||
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
|
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
|
||||||
bin(B) when is_binary(B) -> B;
|
bin(B) when is_binary(B) -> B;
|
||||||
bin(X) -> X.
|
bin(X) -> X.
|
||||||
|
|
||||||
|
|
|
@ -46,7 +46,7 @@
|
||||||
channel :: pid(),
|
channel :: pid(),
|
||||||
%% Registered hook names and options
|
%% Registered hook names and options
|
||||||
hookspec :: #{hookpoint() => map()},
|
hookspec :: #{hookpoint() => map()},
|
||||||
%% Metrcis name prefix
|
%% Metrics name prefix
|
||||||
prefix :: list()
|
prefix :: list()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
@ -75,8 +75,6 @@
|
||||||
|
|
||||||
-export_type([server/0]).
|
-export_type([server/0]).
|
||||||
|
|
||||||
-dialyzer({nowarn_function, [inc_metrics/2]}).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Load/Unload APIs
|
%% Load/Unload APIs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -249,10 +247,6 @@ call(Hookpoint, Req, #server{name = ChannName, options = ReqOpts,
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
inc_metrics(IncFun, Name) when is_function(IncFun) ->
|
|
||||||
%% BACKW: e4.2.0-e4.2.2
|
|
||||||
{env, [Prefix|_]} = erlang:fun_info(IncFun, env),
|
|
||||||
inc_metrics(Prefix, Name);
|
|
||||||
inc_metrics(Prefix, Name) when is_list(Prefix) ->
|
inc_metrics(Prefix, Name) when is_list(Prefix) ->
|
||||||
emqx_metrics:inc(list_to_atom(Prefix ++ atom_to_list(Name))).
|
emqx_metrics:inc(list_to_atom(Prefix ++ atom_to_list(Name))).
|
||||||
|
|
||||||
|
|
|
@ -91,14 +91,6 @@
|
||||||
|
|
||||||
-define(ENABLED(X), (X =/= undefined)).
|
-define(ENABLED(X), (X =/= undefined)).
|
||||||
|
|
||||||
-dialyzer({nowarn_function,
|
|
||||||
[ system_terminate/4
|
|
||||||
, handle_call/3
|
|
||||||
, handle_msg/2
|
|
||||||
, shutdown/3
|
|
||||||
, stop/3
|
|
||||||
]}).
|
|
||||||
|
|
||||||
%% udp
|
%% udp
|
||||||
start_link(Socket = {udp, _SockPid, _Sock}, Peername, Options) ->
|
start_link(Socket = {udp, _SockPid, _Sock}, Peername, Options) ->
|
||||||
Args = [self(), Socket, Peername, Options],
|
Args = [self(), Socket, Peername, Options],
|
||||||
|
@ -501,6 +493,7 @@ terminate(Reason, State = #state{channel = Channel}) ->
|
||||||
system_continue(Parent, _Debug, State) ->
|
system_continue(Parent, _Debug, State) ->
|
||||||
recvloop(Parent, State).
|
recvloop(Parent, State).
|
||||||
|
|
||||||
|
-spec system_terminate(atom(), term(), term(), state()) -> no_return().
|
||||||
system_terminate(Reason, _Parent, _Debug, State) ->
|
system_terminate(Reason, _Parent, _Debug, State) ->
|
||||||
terminate(Reason, State).
|
terminate(Reason, State).
|
||||||
|
|
||||||
|
|
|
@ -185,7 +185,6 @@ confs_to_binary(Confs) ->
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
-dialyzer([{nowarn_function, [import_rules/1, import_rule/1]}]).
|
|
||||||
import_rule(#{<<"id">> := RuleId,
|
import_rule(#{<<"id">> := RuleId,
|
||||||
<<"rawsql">> := RawSQL,
|
<<"rawsql">> := RawSQL,
|
||||||
<<"actions">> := Actions,
|
<<"actions">> := Actions,
|
||||||
|
@ -537,7 +536,6 @@ do_import_acl_mnesia(Acls) ->
|
||||||
end, Acls).
|
end, Acls).
|
||||||
|
|
||||||
-ifdef(EMQX_ENTERPRISE).
|
-ifdef(EMQX_ENTERPRISE).
|
||||||
-dialyzer({nowarn_function, [import_modules/1]}).
|
|
||||||
import_modules(Modules) ->
|
import_modules(Modules) ->
|
||||||
case ets:info(emqx_modules) of
|
case ets:info(emqx_modules) of
|
||||||
undefined ->
|
undefined ->
|
||||||
|
|
|
@ -207,7 +207,7 @@ create_rule(Params = #{rawsql := Sql, actions := ActArgs}) ->
|
||||||
Reason -> {error, Reason}
|
Reason -> {error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec(update_rule(#{id := binary(), _=>_}) -> {ok, rule()} | {error, {not_found, rule_id()}}).
|
-spec(update_rule(#{id := binary(), _=>_}) -> {ok, rule()} | {error, {not_found, rule_id()} | term()}).
|
||||||
update_rule(Params = #{id := RuleId}) ->
|
update_rule(Params = #{id := RuleId}) ->
|
||||||
case emqx_rule_registry:get_rule(RuleId) of
|
case emqx_rule_registry:get_rule(RuleId) of
|
||||||
{ok, Rule0} ->
|
{ok, Rule0} ->
|
||||||
|
@ -336,7 +336,6 @@ start_resource(ResId) ->
|
||||||
{error, {resource_not_found, ResId}}
|
{error, {resource_not_found, ResId}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-dialyzer([{nowarn_function, test_resource/1}]).
|
|
||||||
-spec(test_resource(#{type := _, config := _, _ => _}) -> ok | {error, Reason :: term()}).
|
-spec(test_resource(#{type := _, config := _, _ => _}) -> ok | {error, Reason :: term()}).
|
||||||
test_resource(#{type := Type} = Params) ->
|
test_resource(#{type := Type} = Params) ->
|
||||||
case emqx_rule_registry:find_resource_type(Type) of
|
case emqx_rule_registry:find_resource_type(Type) of
|
||||||
|
|
|
@ -207,12 +207,6 @@
|
||||||
<<"Bad Arguments: ", R0/binary>>
|
<<"Bad Arguments: ", R0/binary>>
|
||||||
end).
|
end).
|
||||||
|
|
||||||
-dialyzer({nowarn_function, [create_rule/2,
|
|
||||||
test_rule_sql/1,
|
|
||||||
do_create_rule/1,
|
|
||||||
update_rule/2
|
|
||||||
]}).
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Rules API
|
%% Rules API
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -96,7 +96,6 @@ unload() ->
|
||||||
%%-----------------------------------------------------------------------------
|
%%-----------------------------------------------------------------------------
|
||||||
%% 'rules' command
|
%% 'rules' command
|
||||||
%%-----------------------------------------------------------------------------
|
%%-----------------------------------------------------------------------------
|
||||||
-dialyzer([{nowarn_function, [rules/1]}]).
|
|
||||||
rules(["list"]) ->
|
rules(["list"]) ->
|
||||||
print_all(emqx_rule_registry:get_rules_ordered_by_ts());
|
print_all(emqx_rule_registry:get_rules_ordered_by_ts());
|
||||||
|
|
||||||
|
|
|
@ -180,7 +180,6 @@ select_and_collect([Field|More], Input, {Output, LastKV}) ->
|
||||||
{nested_put(Key, Val, Output), LastKV}).
|
{nested_put(Key, Val, Output), LastKV}).
|
||||||
|
|
||||||
%% Filter each item got from FOREACH
|
%% Filter each item got from FOREACH
|
||||||
-dialyzer({nowarn_function, filter_collection/4}).
|
|
||||||
filter_collection(Input, InCase, DoEach, {CollKey, CollVal}) ->
|
filter_collection(Input, InCase, DoEach, {CollKey, CollVal}) ->
|
||||||
lists:filtermap(
|
lists:filtermap(
|
||||||
fun(Item) ->
|
fun(Item) ->
|
||||||
|
|
|
@ -20,16 +20,6 @@
|
||||||
-export([ test/1
|
-export([ test/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% Dialyzer gives up on the generated code.
|
|
||||||
%% probably due to stack depth, or inlines.
|
|
||||||
-dialyzer({nowarn_function, [test/1,
|
|
||||||
test_rule/4,
|
|
||||||
flatten/1,
|
|
||||||
sql_test_action/0,
|
|
||||||
fill_default_values/2,
|
|
||||||
envs_examp/1
|
|
||||||
]}).
|
|
||||||
|
|
||||||
-spec(test(#{}) -> {ok, map() | list()} | {error, term()}).
|
-spec(test(#{}) -> {ok, map() | list()} | {error, term()}).
|
||||||
test(#{<<"rawsql">> := Sql, <<"ctx">> := Context}) ->
|
test(#{<<"rawsql">> := Sql, <<"ctx">> := Context}) ->
|
||||||
{ok, Select} = emqx_rule_sqlparser:parse_select(Sql),
|
{ok, Select} = emqx_rule_sqlparser:parse_select(Sql),
|
||||||
|
|
|
@ -91,13 +91,6 @@
|
||||||
|
|
||||||
-define(ENABLED(X), (X =/= undefined)).
|
-define(ENABLED(X), (X =/= undefined)).
|
||||||
|
|
||||||
-dialyzer({nowarn_function, [ ensure_stats_timer/2
|
|
||||||
]}).
|
|
||||||
|
|
||||||
-dialyzer({no_return, [ init/1
|
|
||||||
, init_state/3
|
|
||||||
]}).
|
|
||||||
|
|
||||||
start_link(Transport, Sock, ProtoEnv) ->
|
start_link(Transport, Sock, ProtoEnv) ->
|
||||||
{ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock, ProtoEnv]])}.
|
{ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock, ProtoEnv]])}.
|
||||||
|
|
||||||
|
@ -143,6 +136,7 @@ call(Pid, Req) ->
|
||||||
call(Pid, Req, Timeout) ->
|
call(Pid, Req, Timeout) ->
|
||||||
gen_server:call(Pid, Req, Timeout).
|
gen_server:call(Pid, Req, Timeout).
|
||||||
|
|
||||||
|
-spec init([term()]) -> no_return().
|
||||||
init([Transport, RawSocket, ProtoEnv]) ->
|
init([Transport, RawSocket, ProtoEnv]) ->
|
||||||
case Transport:wait(RawSocket) of
|
case Transport:wait(RawSocket) of
|
||||||
{ok, Socket} ->
|
{ok, Socket} ->
|
||||||
|
@ -152,6 +146,7 @@ init([Transport, RawSocket, ProtoEnv]) ->
|
||||||
exit_on_sock_error(Reason)
|
exit_on_sock_error(Reason)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec init_state(module(), port(), [proplists:property()]) -> no_return().
|
||||||
init_state(Transport, Socket, ProtoEnv) ->
|
init_state(Transport, Socket, ProtoEnv) ->
|
||||||
{ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]),
|
{ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]),
|
||||||
{ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
|
{ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
|
||||||
|
|
|
@ -50,9 +50,29 @@
|
||||||
, handle_recv_nack_frame/2
|
, handle_recv_nack_frame/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-type stomp_conninfo() :: #{socktype := emqx_types:socktype(),
|
||||||
|
sockname := emqx_types:peername(),
|
||||||
|
peername := emqx_types:peername(),
|
||||||
|
peercert := nossl | undefined | esockd_peercert:peercert(),
|
||||||
|
conn_mod := module(),
|
||||||
|
proto_name => binary(),
|
||||||
|
proto_ver => emqx_types:ver(),
|
||||||
|
clean_start => boolean(),
|
||||||
|
clientid => emqx_types:clientid(),
|
||||||
|
username => emqx_types:username(),
|
||||||
|
conn_props => emqx_types:properties(),
|
||||||
|
connected => boolean(),
|
||||||
|
connected_at => undefined | non_neg_integer(),
|
||||||
|
disconnected_at => non_neg_integer(),
|
||||||
|
keepalive => undefined | 0..16#FFFF,
|
||||||
|
receive_maximum => non_neg_integer(),
|
||||||
|
expiry_interval => non_neg_integer(),
|
||||||
|
atom() => term()
|
||||||
|
}.
|
||||||
|
|
||||||
-record(pstate, {
|
-record(pstate, {
|
||||||
%% Stomp ConnInfo
|
%% Stomp ConnInfo
|
||||||
conninfo :: emqx_types:conninfo(),
|
conninfo :: stomp_conninfo(),
|
||||||
%% Stomp ClientInfo
|
%% Stomp ClientInfo
|
||||||
clientinfo :: emqx_types:clientinfo(),
|
clientinfo :: emqx_types:clientinfo(),
|
||||||
%% Stomp Heartbeats
|
%% Stomp Heartbeats
|
||||||
|
@ -104,10 +124,6 @@
|
||||||
awaiting_rel_max
|
awaiting_rel_max
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-dialyzer({nowarn_function, [ check_acl/3
|
|
||||||
, init/2
|
|
||||||
]}).
|
|
||||||
|
|
||||||
-type(pstate() :: #pstate{}).
|
-type(pstate() :: #pstate{}).
|
||||||
|
|
||||||
%% @doc Init protocol
|
%% @doc Init protocol
|
||||||
|
@ -687,12 +703,8 @@ backoff({Cx, Cy}) ->
|
||||||
parse_topic_filters(TopicFilters) ->
|
parse_topic_filters(TopicFilters) ->
|
||||||
lists:map(fun emqx_topic:parse/1, TopicFilters).
|
lists:map(fun emqx_topic:parse/1, TopicFilters).
|
||||||
|
|
||||||
check_acl(PubSub, Topic, State = #pstate{clientinfo = ClientInfo}) ->
|
check_acl(PubSub, Topic, #pstate{clientinfo = ClientInfo}) ->
|
||||||
case is_acl_enabled(State) andalso
|
emqx_access_control:check_acl(ClientInfo, PubSub, Topic).
|
||||||
emqx_access_control:check_acl(ClientInfo, PubSub, Topic) of
|
|
||||||
false -> allow;
|
|
||||||
Res -> Res
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_subscribe(TopicFilter, SubOpts,
|
do_subscribe(TopicFilter, SubOpts,
|
||||||
State = #pstate{clientinfo = ClientInfo, subscriptions = Subs}) ->
|
State = #pstate{clientinfo = ClientInfo, subscriptions = Subs}) ->
|
||||||
|
@ -728,10 +740,6 @@ find_sub_by_id(Id, Subs) ->
|
||||||
[Sub | _] -> Sub
|
[Sub | _] -> Sub
|
||||||
end.
|
end.
|
||||||
|
|
||||||
is_acl_enabled(_) ->
|
|
||||||
%% TODO: configs from somewhere
|
|
||||||
true.
|
|
||||||
|
|
||||||
%% automaticly fill the next sub-id and ack if sub-id is absent
|
%% automaticly fill the next sub-id and ack if sub-id is absent
|
||||||
enrich_sub_opts(SubOpts0, Subs) ->
|
enrich_sub_opts(SubOpts0, Subs) ->
|
||||||
SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0),
|
SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0),
|
||||||
|
|
|
@ -85,9 +85,13 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-record(route, {
|
-record(route, {
|
||||||
topic :: binary(),
|
topic,
|
||||||
dest :: node() | {binary(), node()}
|
dest
|
||||||
}).
|
}).
|
||||||
|
-type route() :: #route{
|
||||||
|
topic :: binary(),
|
||||||
|
dest :: node() | {binary(), node()}
|
||||||
|
}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Plugin
|
%% Plugin
|
||||||
|
@ -132,4 +136,3 @@
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
|
|
|
@ -133,12 +133,6 @@ get_telemetry() ->
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% This is to suppress dialyzer warnings for mnesia:dirty_write and
|
|
||||||
%% dirty_read race condition. Given that the init function is not evaluated
|
|
||||||
%% concurrently in one node, it should be free of race condition.
|
|
||||||
%% Given the chance of having two nodes bootstraping with the write
|
|
||||||
%% is very small, it should be safe to ignore.
|
|
||||||
-dialyzer([{nowarn_function, [init/1]}]).
|
|
||||||
init([Opts]) ->
|
init([Opts]) ->
|
||||||
State = #state{url = get_value(url, Opts),
|
State = #state{url = get_value(url, Opts),
|
||||||
report_interval = timer:seconds(get_value(report_interval, Opts))},
|
report_interval = timer:seconds(get_value(report_interval, Opts))},
|
||||||
|
|
|
@ -191,10 +191,8 @@ init([Opts]) ->
|
||||||
size_limit = SizeLimit,
|
size_limit = SizeLimit,
|
||||||
validity_period = ValidityPeriod})}.
|
validity_period = ValidityPeriod})}.
|
||||||
|
|
||||||
%% suppress dialyzer warning due to dirty read/write race condition.
|
|
||||||
%% TODO: change from dirty_read/write to transactional.
|
%% TODO: change from dirty_read/write to transactional.
|
||||||
%% TODO: handle mnesia write errors.
|
%% TODO: handle mnesia write errors.
|
||||||
-dialyzer([{nowarn_function, [handle_call/3]}]).
|
|
||||||
handle_call({activate_alarm, Name, Details}, _From, State = #state{actions = Actions}) ->
|
handle_call({activate_alarm, Name, Details}, _From, State = #state{actions = Actions}) ->
|
||||||
case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of
|
case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of
|
||||||
[#activated_alarm{name = Name}] ->
|
[#activated_alarm{name = Name}] ->
|
||||||
|
|
|
@ -87,7 +87,7 @@
|
||||||
%% Quota checkers
|
%% Quota checkers
|
||||||
quota :: maybe(emqx_limiter:limiter()),
|
quota :: maybe(emqx_limiter:limiter()),
|
||||||
%% Timers
|
%% Timers
|
||||||
timers :: #{atom() => disabled | maybe(reference())},
|
timers :: #{channel_timer() => disabled | maybe(reference())},
|
||||||
%% Conn State
|
%% Conn State
|
||||||
conn_state :: conn_state(),
|
conn_state :: conn_state(),
|
||||||
%% Takeover
|
%% Takeover
|
||||||
|
@ -109,6 +109,13 @@
|
||||||
|
|
||||||
-type(replies() :: emqx_types:packet() | reply() | [reply()]).
|
-type(replies() :: emqx_types:packet() | reply() | [reply()]).
|
||||||
|
|
||||||
|
-type(channel_timer() :: alive_timer
|
||||||
|
| retry_timer
|
||||||
|
| await_timer
|
||||||
|
| expire_timer
|
||||||
|
| will_timer
|
||||||
|
| quota_timer).
|
||||||
|
|
||||||
-define(IS_MQTT_V5, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}).
|
-define(IS_MQTT_V5, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}).
|
||||||
|
|
||||||
-define(TIMER_TABLE, #{
|
-define(TIMER_TABLE, #{
|
||||||
|
@ -122,8 +129,6 @@
|
||||||
|
|
||||||
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
|
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
|
||||||
|
|
||||||
-dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Info, Attrs and Caps
|
%% Info, Attrs and Caps
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -244,7 +249,6 @@ setting_peercert_infos(Peercert, ClientInfo, Options) ->
|
||||||
ClientId = peer_cert_as(peer_cert_as_clientid, Options, Peercert, DN, CN),
|
ClientId = peer_cert_as(peer_cert_as_clientid, Options, Peercert, DN, CN),
|
||||||
ClientInfo#{username => Username, clientid => ClientId, dn => DN, cn => CN}.
|
ClientInfo#{username => Username, clientid => ClientId, dn => DN, cn => CN}.
|
||||||
|
|
||||||
-dialyzer([{nowarn_function, [peer_cert_as/5]}]).
|
|
||||||
% esockd_peercert:peercert is opaque
|
% esockd_peercert:peercert is opaque
|
||||||
% https://github.com/emqx/esockd/blob/master/src/esockd_peercert.erl
|
% https://github.com/emqx/esockd/blob/master/src/esockd_peercert.erl
|
||||||
peer_cert_as(Key, Options, Peercert, DN, CN) ->
|
peer_cert_as(Key, Options, Peercert, DN, CN) ->
|
||||||
|
@ -1113,12 +1117,8 @@ handle_timeout(_TRef, Msg, Channel) ->
|
||||||
%% Ensure timers
|
%% Ensure timers
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
ensure_timer([Name], Channel) ->
|
-spec ensure_timer(channel_timer(), channel()) -> channel().
|
||||||
ensure_timer(Name, Channel);
|
ensure_timer(Name, Channel = #channel{timers = Timers}) when is_atom(Name) ->
|
||||||
ensure_timer([Name | Rest], Channel) ->
|
|
||||||
ensure_timer(Rest, ensure_timer(Name, Channel));
|
|
||||||
|
|
||||||
ensure_timer(Name, Channel = #channel{timers = Timers}) ->
|
|
||||||
TRef = maps:get(Name, Timers, undefined),
|
TRef = maps:get(Name, Timers, undefined),
|
||||||
Time = interval(Name, Channel),
|
Time = interval(Name, Channel),
|
||||||
case TRef == undefined andalso Time > 0 of
|
case TRef == undefined andalso Time > 0 of
|
||||||
|
@ -1126,6 +1126,7 @@ ensure_timer(Name, Channel = #channel{timers = Timers}) ->
|
||||||
false -> Channel %% Timer disabled or exists
|
false -> Channel %% Timer disabled or exists
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec ensure_timer(channel_timer(), timeout(), channel()) -> channel().
|
||||||
ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
|
ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
|
||||||
Msg = maps:get(Name, ?TIMER_TABLE),
|
Msg = maps:get(Name, ?TIMER_TABLE),
|
||||||
TRef = emqx_misc:start_timer(Time, Msg),
|
TRef = emqx_misc:start_timer(Time, Msg),
|
||||||
|
@ -1140,16 +1141,13 @@ reset_timer(Name, Time, Channel) ->
|
||||||
clean_timer(Name, Channel = #channel{timers = Timers}) ->
|
clean_timer(Name, Channel = #channel{timers = Timers}) ->
|
||||||
Channel#channel{timers = maps:remove(Name, Timers)}.
|
Channel#channel{timers = maps:remove(Name, Timers)}.
|
||||||
|
|
||||||
|
-spec interval(channel_timer(), channel()) -> timeout().
|
||||||
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
|
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
|
||||||
emqx_keepalive:info(interval, KeepAlive);
|
emqx_keepalive:info(interval, KeepAlive);
|
||||||
interval(retry_timer, #channel{session = Session}) ->
|
interval(retry_timer, #channel{session = Session}) ->
|
||||||
timer:seconds(emqx_session:info(retry_interval, Session));
|
timer:seconds(emqx_session:info(retry_interval, Session));
|
||||||
interval(await_timer, #channel{session = Session}) ->
|
interval(await_timer, #channel{session = Session}) ->
|
||||||
timer:seconds(emqx_session:info(await_rel_timeout, Session));
|
timer:seconds(emqx_session:info(await_rel_timeout, Session)).
|
||||||
interval(expire_timer, #channel{conninfo = ConnInfo}) ->
|
|
||||||
timer:seconds(maps:get(expiry_interval, ConnInfo));
|
|
||||||
interval(will_timer, #channel{will_msg = WillMsg}) ->
|
|
||||||
timer:seconds(will_delay_interval(WillMsg)).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Terminate
|
%% Terminate
|
||||||
|
@ -1757,8 +1755,6 @@ shutdown(success, Reply, Channel) ->
|
||||||
shutdown(Reason, Reply, Channel) ->
|
shutdown(Reason, Reply, Channel) ->
|
||||||
{shutdown, Reason, Reply, Channel}.
|
{shutdown, Reason, Reply, Channel}.
|
||||||
|
|
||||||
shutdown(success, Reply, Packet, Channel) ->
|
|
||||||
shutdown(normal, Reply, Packet, Channel);
|
|
||||||
shutdown(Reason, Reply, Packet, Channel) ->
|
shutdown(Reason, Reply, Packet, Channel) ->
|
||||||
{shutdown, Reason, Reply, Packet, Channel}.
|
{shutdown, Reason, Reply, Packet, Channel}.
|
||||||
|
|
||||||
|
|
|
@ -125,12 +125,6 @@
|
||||||
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
|
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
|
||||||
|
|
||||||
-dialyzer({no_match, [info/2]}).
|
-dialyzer({no_match, [info/2]}).
|
||||||
-dialyzer({nowarn_function, [ init/4
|
|
||||||
, init_state/3
|
|
||||||
, run_loop/2
|
|
||||||
, system_terminate/4
|
|
||||||
, system_code_change/4
|
|
||||||
]}).
|
|
||||||
|
|
||||||
-spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist())
|
-spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist())
|
||||||
-> {ok, pid()}).
|
-> {ok, pid()}).
|
||||||
|
@ -286,8 +280,8 @@ run_loop(Parent, State = #state{transport = Transport,
|
||||||
peername = Peername,
|
peername = Peername,
|
||||||
channel = Channel}) ->
|
channel = Channel}) ->
|
||||||
emqx_logger:set_metadata_peername(esockd:format(Peername)),
|
emqx_logger:set_metadata_peername(esockd:format(Peername)),
|
||||||
emqx_misc:tune_heap_size(emqx_zone:oom_policy(
|
_ = emqx_misc:tune_heap_size(emqx_zone:oom_policy(
|
||||||
emqx_channel:info(zone, Channel))),
|
emqx_channel:info(zone, Channel))),
|
||||||
case activate_socket(State) of
|
case activate_socket(State) of
|
||||||
{ok, NState} -> hibernate(Parent, NState);
|
{ok, NState} -> hibernate(Parent, NState);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
|
|
@ -78,8 +78,6 @@
|
||||||
%% 16#0D,16#0A, 16#0D,16#0A,16#00,16#0D,16#0A,16#51,16#55,16#49,16#54,16#0A
|
%% 16#0D,16#0A, 16#0D,16#0A,16#00,16#0D,16#0A,16#51,16#55,16#49,16#54,16#0A
|
||||||
-define(PPV2_HEADER_SIG, "\r\n\r\n\0\r\nQUIT\n").
|
-define(PPV2_HEADER_SIG, "\r\n\r\n\0\r\nQUIT\n").
|
||||||
|
|
||||||
-dialyzer({no_match, [serialize_utf8_string/2]}).
|
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-export([parse_variable_byte_integer/1]).
|
-export([parse_variable_byte_integer/1]).
|
||||||
-endif.
|
-endif.
|
||||||
|
@ -788,8 +786,6 @@ serialize_utf8_pair({Name, Value}) ->
|
||||||
serialize_binary_data(Bin) ->
|
serialize_binary_data(Bin) ->
|
||||||
[<<(byte_size(Bin)):16/big-unsigned-integer>>, Bin].
|
[<<(byte_size(Bin)):16/big-unsigned-integer>>, Bin].
|
||||||
|
|
||||||
serialize_utf8_string(undefined, false) ->
|
|
||||||
error(utf8_string_undefined);
|
|
||||||
serialize_utf8_string(undefined, true) ->
|
serialize_utf8_string(undefined, true) ->
|
||||||
<<>>;
|
<<>>;
|
||||||
serialize_utf8_string(String, _AllowNull) ->
|
serialize_utf8_string(String, _AllowNull) ->
|
||||||
|
|
|
@ -55,8 +55,6 @@
|
||||||
|
|
||||||
-type(limiter() :: #limiter{}).
|
-type(limiter() :: #limiter{}).
|
||||||
|
|
||||||
-dialyzer({nowarn_function, [consume/3]}).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% APIs
|
%% APIs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -41,10 +41,6 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
-dialyzer({no_match, [ plugin_loaded/2
|
|
||||||
, plugin_unloaded/2
|
|
||||||
]}).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% APIs
|
%% APIs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -105,7 +101,7 @@ unload(PluginName) when is_atom(PluginName) ->
|
||||||
?LOG(error, "Plugin ~s is not started", [PluginName]),
|
?LOG(error, "Plugin ~s is not started", [PluginName]),
|
||||||
{error, not_started};
|
{error, not_started};
|
||||||
{_, _} ->
|
{_, _} ->
|
||||||
unload_plugin(PluginName, true)
|
unload_plugin(PluginName)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
reload(PluginName) when is_atom(PluginName)->
|
reload(PluginName) when is_atom(PluginName)->
|
||||||
|
@ -384,10 +380,11 @@ start_app(App, SuccFun) ->
|
||||||
{error, {ErrApp, Reason}}
|
{error, {ErrApp, Reason}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
unload_plugin(App, Persistent) ->
|
unload_plugin(App) ->
|
||||||
case stop_app(App) of
|
case stop_app(App) of
|
||||||
ok ->
|
ok ->
|
||||||
_ = plugin_unloaded(App, Persistent), ok;
|
_ = plugin_unloaded(App),
|
||||||
|
ok;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
@ -428,9 +425,7 @@ plugin_loaded(Name, true) ->
|
||||||
?LOG(error, "Cannot read loaded plugins: ~p", [Error])
|
?LOG(error, "Cannot read loaded plugins: ~p", [Error])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
plugin_unloaded(_Name, false) ->
|
plugin_unloaded(Name) ->
|
||||||
ok;
|
|
||||||
plugin_unloaded(Name, true) ->
|
|
||||||
case read_loaded() of
|
case read_loaded() of
|
||||||
{ok, Names0} ->
|
{ok, Names0} ->
|
||||||
Names = filter_plugins(Names0),
|
Names = filter_plugins(Names0),
|
||||||
|
|
|
@ -266,9 +266,6 @@ maybe_trans(Fun, Args) ->
|
||||||
end, [])
|
end, [])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% The created fun only terminates with explicit exception
|
|
||||||
-dialyzer({nowarn_function, [trans/2]}).
|
|
||||||
|
|
||||||
-spec(trans(function(), list(any())) -> ok | {error, term()}).
|
-spec(trans(function(), list(any())) -> ok | {error, term()}).
|
||||||
trans(Fun, Args) ->
|
trans(Fun, Args) ->
|
||||||
{WPid, RefMon} =
|
{WPid, RefMon} =
|
||||||
|
@ -277,13 +274,7 @@ trans(Fun, Args) ->
|
||||||
%% are caught by mnesia:transaction/2.
|
%% are caught by mnesia:transaction/2.
|
||||||
%% Future changes should keep in mind that this process
|
%% Future changes should keep in mind that this process
|
||||||
%% always exit with database write result.
|
%% always exit with database write result.
|
||||||
fun() ->
|
make_trans(Fun, Args)),
|
||||||
Res = case mnesia:transaction(Fun, Args) of
|
|
||||||
{atomic, Ok} -> Ok;
|
|
||||||
{aborted, Reason} -> {error, Reason}
|
|
||||||
end,
|
|
||||||
exit({shutdown, Res})
|
|
||||||
end),
|
|
||||||
%% Receive a 'shutdown' exit to pass result from the short-lived process.
|
%% Receive a 'shutdown' exit to pass result from the short-lived process.
|
||||||
%% so the receive below can be receive-mark optimized by the compiler.
|
%% so the receive below can be receive-mark optimized by the compiler.
|
||||||
%%
|
%%
|
||||||
|
@ -300,6 +291,16 @@ trans(Fun, Args) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec make_trans(fun((...) -> term()), [term()]) -> fun(() -> no_return()).
|
||||||
|
make_trans(Fun, Args) ->
|
||||||
|
fun() ->
|
||||||
|
Res = case mnesia:transaction(Fun, Args) of
|
||||||
|
{atomic, Ok} -> Ok;
|
||||||
|
{aborted, Reason} -> {error, Reason}
|
||||||
|
end,
|
||||||
|
exit({shutdown, Res})
|
||||||
|
end.
|
||||||
|
|
||||||
lock_router() ->
|
lock_router() ->
|
||||||
%% if Retry is not 0, global:set_lock could sleep a random time up to 8s.
|
%% if Retry is not 0, global:set_lock could sleep a random time up to 8s.
|
||||||
%% Considering we have a limited number of brokers, it is safe to use sleep 1 ms.
|
%% Considering we have a limited number of brokers, it is safe to use sleep 1 ms.
|
||||||
|
|
|
@ -53,8 +53,6 @@
|
||||||
-define(ROUTING_NODE, emqx_routing_node).
|
-define(ROUTING_NODE, emqx_routing_node).
|
||||||
-define(LOCK, {?MODULE, cleanup_routes}).
|
-define(LOCK, {?MODULE, cleanup_routes}).
|
||||||
|
|
||||||
-dialyzer({nowarn_function, [cleanup_routes/1]}).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Mnesia bootstrap
|
%% Mnesia bootstrap
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -176,4 +174,3 @@ cleanup_routes(Node) ->
|
||||||
#route{_ = '_', dest = {'_', Node}}],
|
#route{_ = '_', dest = {'_', Node}}],
|
||||||
[mnesia:delete_object(?ROUTE, Route, write)
|
[mnesia:delete_object(?ROUTE, Route, write)
|
||||||
|| Pat <- Patterns, Route <- mnesia:match_object(?ROUTE, Pat, write)].
|
|| Pat <- Patterns, Route <- mnesia:match_object(?ROUTE, Pat, write)].
|
||||||
|
|
||||||
|
|
|
@ -59,8 +59,6 @@
|
||||||
L =:= info orelse
|
L =:= info orelse
|
||||||
L =:= debug).
|
L =:= debug).
|
||||||
|
|
||||||
-dialyzer({nowarn_function, [install_trace_handler/3]}).
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% APIs
|
%% APIs
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -201,7 +201,6 @@
|
||||||
-type(deliver_result() :: ok | {ok, non_neg_integer()} | {error, term()}).
|
-type(deliver_result() :: ok | {ok, non_neg_integer()} | {error, term()}).
|
||||||
-type(publish_result() :: [{node(), topic(), deliver_result()} |
|
-type(publish_result() :: [{node(), topic(), deliver_result()} |
|
||||||
{share, topic(), deliver_result()}]).
|
{share, topic(), deliver_result()}]).
|
||||||
-type(route() :: #route{}).
|
|
||||||
-type(sub_group() :: tuple() | binary()).
|
-type(sub_group() :: tuple() | binary()).
|
||||||
-type(route_entry() :: {topic(), node()} | {topic, sub_group()}).
|
-type(route_entry() :: {topic(), node()} | {topic, sub_group()}).
|
||||||
-type(plugin() :: #plugin{}).
|
-type(plugin() :: #plugin{}).
|
||||||
|
@ -215,4 +214,3 @@
|
||||||
-type(oom_policy() :: #{message_queue_len => non_neg_integer(),
|
-type(oom_policy() :: #{message_queue_len => non_neg_integer(),
|
||||||
max_heap_size => non_neg_integer()
|
max_heap_size => non_neg_integer()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
|
|
@ -100,7 +100,6 @@
|
||||||
-define(ENABLED(X), (X =/= undefined)).
|
-define(ENABLED(X), (X =/= undefined)).
|
||||||
|
|
||||||
-dialyzer({no_match, [info/2]}).
|
-dialyzer({no_match, [info/2]}).
|
||||||
-dialyzer({nowarn_function, [websocket_init/1]}).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Info, Stats
|
%% Info, Stats
|
||||||
|
@ -304,7 +303,7 @@ websocket_init([Req, Opts]) ->
|
||||||
%% MQTT Idle Timeout
|
%% MQTT Idle Timeout
|
||||||
IdleTimeout = emqx_zone:idle_timeout(Zone),
|
IdleTimeout = emqx_zone:idle_timeout(Zone),
|
||||||
IdleTimer = start_timer(IdleTimeout, idle_timeout),
|
IdleTimer = start_timer(IdleTimeout, idle_timeout),
|
||||||
emqx_misc:tune_heap_size(emqx_zone:oom_policy(Zone)),
|
_ = emqx_misc:tune_heap_size(emqx_zone:oom_policy(Zone)),
|
||||||
emqx_logger:set_metadata_peername(esockd:format(Peername)),
|
emqx_logger:set_metadata_peername(esockd:format(Peername)),
|
||||||
{ok, #state{peername = Peername,
|
{ok, #state{peername = Peername,
|
||||||
sockname = Sockname,
|
sockname = Sockname,
|
||||||
|
@ -777,4 +776,3 @@ get_peer(Req, Opts) ->
|
||||||
set_field(Name, Value, State) ->
|
set_field(Name, Value, State) ->
|
||||||
Pos = emqx_misc:index_of(Name, record_info(fields, state)),
|
Pos = emqx_misc:index_of(Name, record_info(fields, state)),
|
||||||
setelement(Pos+1, State, Value).
|
setelement(Pos+1, State, Value).
|
||||||
|
|
||||||
|
|
|
@ -160,8 +160,8 @@ t_plugin_loaded(_) ->
|
||||||
?assertEqual(ok, emqx_plugins:plugin_loaded(emqx_mini_plugin, true)).
|
?assertEqual(ok, emqx_plugins:plugin_loaded(emqx_mini_plugin, true)).
|
||||||
|
|
||||||
t_plugin_unloaded(_) ->
|
t_plugin_unloaded(_) ->
|
||||||
?assertEqual(ok, emqx_plugins:plugin_unloaded(emqx_mini_plugin, false)),
|
?assertEqual(ok, emqx_plugins:plugin_unloaded(emqx_mini_plugin)),
|
||||||
?assertEqual(ok, emqx_plugins:plugin_unloaded(emqx_mini_plugin, true)).
|
?assertEqual(ok, emqx_plugins:plugin_unloaded(emqx_mini_plugin)).
|
||||||
|
|
||||||
t_plugin(_) ->
|
t_plugin(_) ->
|
||||||
try
|
try
|
||||||
|
@ -199,8 +199,8 @@ t_unload_plugin(_) ->
|
||||||
(error_app) -> {error, error};
|
(error_app) -> {error, error};
|
||||||
(_) -> ok end),
|
(_) -> ok end),
|
||||||
|
|
||||||
?assertEqual(ok, emqx_plugins:unload_plugin(not_started_app, true)),
|
?assertEqual(ok, emqx_plugins:unload_plugin(not_started_app)),
|
||||||
?assertEqual(ok, emqx_plugins:unload_plugin(normal, true)),
|
?assertEqual(ok, emqx_plugins:unload_plugin(normal)),
|
||||||
?assertEqual({error,error}, emqx_plugins:unload_plugin(error_app, true)),
|
?assertEqual({error,error}, emqx_plugins:unload_plugin(error_app)),
|
||||||
|
|
||||||
ok = meck:unload(application).
|
ok = meck:unload(application).
|
||||||
|
|
Loading…
Reference in New Issue