chore(gw): fix dialyzer warnings

This commit is contained in:
JianBo He 2021-07-21 20:37:24 +08:00 committed by turtleDeng
parent fd2c3fe37b
commit 46ae179a7a
9 changed files with 87 additions and 76 deletions

View File

@ -71,7 +71,9 @@ start_link() ->
, state => any() , state => any()
}. }.
-spec load(gateway_type(), registry_options(), gateway_options()) -> ok | {error, any()}. -spec load(gateway_type(), registry_options(), gateway_options())
-> ok
| {error, any()}.
load(Type, RgOpts, GwOpts) -> load(Type, RgOpts, GwOpts) ->
CbMod = proplists:get_value(cbkmod, RgOpts, Type), CbMod = proplists:get_value(cbkmod, RgOpts, Type),
Dscrptr = #{ cbkmod => CbMod Dscrptr = #{ cbkmod => CbMod

View File

@ -163,12 +163,21 @@ init(ConnInfo = #{socktype := Socktype,
try_dispatch(on_socket_created, wrap(Req), Channel). try_dispatch(on_socket_created, wrap(Req), Channel).
%% @private %% @private
peercert(nossl, ConnInfo) -> peercert(NoSsl, ConnInfo) when NoSsl == nossl;
NoSsl == undefined ->
ConnInfo; ConnInfo;
peercert(Peercert, ConnInfo) -> peercert(Peercert, ConnInfo) ->
ConnInfo#{peercert => Fn = fun(_, V) -> V =/= undefined end,
Infos = maps:filter(Fn,
#{cn => esockd_peercert:common_name(Peercert), #{cn => esockd_peercert:common_name(Peercert),
dn => esockd_peercert:subject(Peercert)}}. dn => esockd_peercert:subject(Peercert)}
),
case maps:size(Infos) of
0 ->
ConnInfo;
_ ->
ConnInfo#{peercert => Infos}
end.
%% @private %% @private
socktype(tcp) -> 'TCP'; socktype(tcp) -> 'TCP';

View File

@ -106,12 +106,8 @@ start_link(Socket = {udp, _SockPid, _Sock}, Peername, Options) ->
%% tcp/ssl/dtls %% tcp/ssl/dtls
start_link(esockd_transport, Sock, Options) -> start_link(esockd_transport, Sock, Options) ->
Socket = {esockd_transport, Sock}, Socket = {esockd_transport, Sock},
case esockd_transport:peername(Sock) of Args = [self(), Socket, undefined, Options],
{ok, Peername} -> {ok, proc_lib:spawn_link(?MODULE, init, Args)}.
Args = [self(), Socket, Peername, Options],
{ok, proc_lib:spawn_link(?MODULE, init, Args)};
R = {error, _} -> R
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
@ -170,6 +166,12 @@ stop(Pid) ->
%% Wrapped funcs %% Wrapped funcs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
esockd_peername({udp, _SockPid, _Sock}, Peername) ->
Peername;
esockd_peername({esockd_transport, Sock}, _Peername) ->
{ok, Peername} = esockd_transport:ensure_ok_or_exit(peername, [Sock]),
Peername.
esockd_wait(Socket = {udp, _SockPid, _Sock}) -> esockd_wait(Socket = {udp, _SockPid, _Sock}) ->
{ok, Socket}; {ok, Socket};
esockd_wait({esockd_transport, Sock}) -> esockd_wait({esockd_transport, Sock}) ->
@ -222,9 +224,10 @@ send(Data, #state{socket = {esockd_transport, Sock}}) ->
-define(DEFAULT_OOM_POLICY, #{enable => true, max_heap_size => 4194304, -define(DEFAULT_OOM_POLICY, #{enable => true, max_heap_size => 4194304,
max_message_queue_len => 32000}). max_message_queue_len => 32000}).
init(Parent, WrappedSock, Peername, Options) -> init(Parent, WrappedSock, Peername0, Options) ->
case esockd_wait(WrappedSock) of case esockd_wait(WrappedSock) of
{ok, NWrappedSock} -> {ok, NWrappedSock} ->
Peername = esockd_peername(NWrappedSock, Peername0),
run_loop(Parent, init_state(NWrappedSock, Peername, Options)); run_loop(Parent, init_state(NWrappedSock, Peername, Options));
{error, Reason} -> {error, Reason} ->
ok = esockd_close(WrappedSock), ok = esockd_close(WrappedSock),

View File

@ -720,8 +720,9 @@ run_gc(Stats, State = #state{gc_state = GcSt}) ->
check_oom(State = #state{oom_policy = OomPolicy}) -> check_oom(State = #state{oom_policy = OomPolicy}) ->
case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of
Shutdown = {shutdown, _Reason} -> {shutdown, Reason} ->
erlang:send(self(), Shutdown); %% triggers terminate/2 callback immediately
erlang:exit({shutdown, Reason});
_Other -> ok _Other -> ok
end, end,
State. State.

View File

@ -79,7 +79,7 @@
| {event, conn_state()|updated} | {event, conn_state()|updated}
| {close, Reason :: atom()}). | {close, Reason :: atom()}).
-type(replies() :: emqx_stomp_frame:packet() | reply() | [reply()]). -type(replies() :: stomp_frame() | reply() | [reply()]).
-define(TIMER_TABLE, #{ -define(TIMER_TABLE, #{
incoming_timer => incoming, incoming_timer => incoming,
@ -97,11 +97,6 @@
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]). -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
-dialyzer({nowarn_function, [init/2,enrich_conninfo/2,ensure_connected/1,
process_connect/1,handle_in/2,handle_info/2,
ensure_disconnected/2,reverse_heartbeats/1,
negotiate_version/2]}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Init the channel %% Init the channel
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -136,6 +131,7 @@ init(ConnInfo = #{peername := {PeerHost, _},
, clientinfo_override = Override , clientinfo_override = Override
, timers = #{} , timers = #{}
, transaction = #{} , transaction = #{}
, conn_state = idle
}. }.
setting_peercert_infos(NoSSL, ClientInfo) setting_peercert_infos(NoSSL, ClientInfo)
@ -180,7 +176,7 @@ enrich_conninfo(_Packet,
Channel = #channel{conninfo = ConnInfo}) -> Channel = #channel{conninfo = ConnInfo}) ->
%% XXX: How enrich more infos? %% XXX: How enrich more infos?
NConnInfo = ConnInfo#{ proto_name => <<"STOMP">> NConnInfo = ConnInfo#{ proto_name => <<"STOMP">>
, proto_ver => undefined , proto_ver => <<"1.2">>
, clean_start => true , clean_start => true
, keepalive => 0 , keepalive => 0
, expiry_interval => 0 , expiry_interval => 0
@ -320,7 +316,7 @@ process_connect(Channel = #channel{
%% Handle incoming packet %% Handle incoming packet
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec handle_in(emqx_types:packet(), channel()) -spec handle_in(stomp_frame() | {frame_error, any()}, channel())
-> {ok, channel()} -> {ok, channel()}
| {ok, replies(), channel()} | {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()} | {shutdown, Reason :: term(), channel()}
@ -467,12 +463,12 @@ handle_in(?PACKET(?CMD_COMMIT, Headers), Channel) ->
case trans_pipeline(lists:reverse(Actions), [], Chann0) of case trans_pipeline(lists:reverse(Actions), [], Chann0) of
{ok, Outgoings, Chann1} -> {ok, Outgoings, Chann1} ->
maybe_outgoing_receipt(receipt_id(Headers), Outgoings, Chann1); maybe_outgoing_receipt(receipt_id(Headers), Outgoings, Chann1);
{error, Reason} -> {error, Reason, Chann1} ->
%% FIXME: atomic for transaction ?? %% FIXME: atomic for transaction ??
ErrMsg = io_lib:format("Execute transaction ~s falied: ~0p", ErrMsg = io_lib:format("Execute transaction ~s falied: ~0p",
[TxId, Reason] [TxId, Reason]
), ),
handle_out(error, {receipt_id(Headers), ErrMsg}, Chann0) handle_out(error, {receipt_id(Headers), ErrMsg}, Chann1)
end end
end); end);
@ -563,13 +559,15 @@ handle_out(receipt, ReceiptId, Channel) ->
-spec(handle_call(Req :: term(), channel()) -spec(handle_call(Req :: term(), channel())
-> {reply, Reply :: term(), channel()} -> {reply, Reply :: term(), channel()}
| {shutdown, Reason :: term(), Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), channel()}
| {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}). | {shutdown, Reason :: term(), Reply :: term(), stomp_frame(), channel()}).
handle_call(kick, Channel) -> handle_call(kick, Channel) ->
NChannel = ensure_disconnected(kicked, Channel), NChannel = ensure_disconnected(kicked, Channel),
shutdown_and_reply(kicked, ok, NChannel); Frame = error_frame(undefined, <<"Kicked out">>),
shutdown_and_reply(kicked, ok, Frame, NChannel);
handle_call(discard, Channel) -> handle_call(discard, Channel) ->
shutdown_and_reply(discarded, ok, Channel); Frame = error_frame(undefined, <<"Discarded">>),
shutdown_and_reply(discarded, ok, Frame, Channel);
%% XXX: No Session Takeover %% XXX: No Session Takeover
%handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) -> %handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
@ -785,8 +783,11 @@ shutdown_with_recepit(Reason, ReceiptId, Channel) ->
shutdown(Reason, AckFrame, Channel) -> shutdown(Reason, AckFrame, Channel) ->
{shutdown, Reason, AckFrame, Channel}. {shutdown, Reason, AckFrame, Channel}.
shutdown_and_reply(Reason, Reply, Channel) -> %shutdown_and_reply(Reason, Reply, Channel) ->
{shutdown, Reason, Reply, Channel}. % {shutdown, Reason, Reply, Channel}.
shutdown_and_reply(Reason, Reply, OutPkt, Channel) ->
{shutdown, Reason, Reply, OutPkt, Channel}.
do_negotiate_version(undefined) -> do_negotiate_version(undefined) ->
{ok, <<"1.0">>}; {ok, <<"1.0">>};

View File

@ -87,11 +87,13 @@
%% GC State %% GC State
gc_state :: emqx_gc:gc_state() | undefined, gc_state :: emqx_gc:gc_state() | undefined,
%% Stats Timer %% Stats Timer
stats_timer :: disabled | reference(), stats_timer :: disabled | reference() | undefined,
%% Idle Timeout %% Idle Timeout
idle_timeout :: integer(), idle_timeout :: integer(),
%% Idle Timer %% Idle Timer
idle_timer :: reference() | undefined idle_timer :: reference() | undefined,
%% OOM Policy
oom_policy :: emqx_types:oom_policy() | undefined
}). }).
-type(state() :: #state{}). -type(state() :: #state{}).
@ -121,11 +123,9 @@
, system_code_change/4 , system_code_change/4
]}). ]}).
-dialyzer({nowarn_function, [ensure_stats_timer/2,cancel_stats_timer/1, -dialyzer({no_match, [ handle_call/3
terminate/2,handle_call/3,handle_timeout/3, , serialize_and_inc_stats_fun/1
parse_incoming/3,serialize_and_inc_stats_fun/1, ]}).
check_oom/1,inc_incoming_stats/1,
inc_outgoing_stats/1]}).
-spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist()) -spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist())
-> {ok, pid()}). -> {ok, pid()}).
@ -244,9 +244,7 @@ init_state(Transport, Socket, Options) ->
peername => Peername, peername => Peername,
sockname => Sockname, sockname => Sockname,
peercert => Peercert, peercert => Peercert,
conn_mod => ?MODULE, conn_mod => ?MODULE
zone => default,
listener => mqtt_tcp
}, },
ActiveN = emqx_gateway_utils:active_n(Options), ActiveN = emqx_gateway_utils:active_n(Options),
%% TODO: RateLimit ? How ? %% TODO: RateLimit ? How ?
@ -260,6 +258,7 @@ init_state(Transport, Socket, Options) ->
GcState = emqx_gateway_utils:init_gc_state(Options), GcState = emqx_gateway_utils:init_gc_state(Options),
StatsTimer = emqx_gateway_utils:stats_timer(Options), StatsTimer = emqx_gateway_utils:stats_timer(Options),
IdleTimeout = emqx_gateway_utils:idle_timeout(Options), IdleTimeout = emqx_gateway_utils:idle_timeout(Options),
OomPolicy = emqx_gateway_utils:oom_policy(Options),
IdleTimer = emqx_misc:start_timer(IdleTimeout, idle_timeout), IdleTimer = emqx_misc:start_timer(IdleTimeout, idle_timeout),
#state{transport = Transport, #state{transport = Transport,
socket = Socket, socket = Socket,
@ -274,17 +273,16 @@ init_state(Transport, Socket, Options) ->
gc_state = GcState, gc_state = GcState,
stats_timer = StatsTimer, stats_timer = StatsTimer,
idle_timeout = IdleTimeout, idle_timeout = IdleTimeout,
idle_timer = IdleTimer idle_timer = IdleTimer,
oom_policy = OomPolicy
}. }.
run_loop(Parent, State = #state{transport = Transport, run_loop(Parent, State = #state{transport = Transport,
socket = Socket, socket = Socket,
peername = Peername, peername = Peername,
channel = _Channel}) -> oom_policy = OomPolicy}) ->
emqx_logger:set_metadata_peername(esockd:format(Peername)), emqx_logger:set_metadata_peername(esockd:format(Peername)),
% TODO: How yo get oom_policy ??? _ = emqx_misc:tune_heap_size(OomPolicy),
%emqx_misc:tune_heap_size(emqx_gateway_utils:oom_policy(
% emqx_stomp_channel:info(zone, Channel))),
case activate_socket(State) of case activate_socket(State) of
{ok, NState} -> hibernate(Parent, NState); {ok, NState} -> hibernate(Parent, NState);
{error, Reason} -> {error, Reason} ->
@ -806,16 +804,12 @@ run_gc(Stats, State = #state{gc_state = GcSt}) ->
State#state{gc_state = GcSt1} State#state{gc_state = GcSt1}
end. end.
check_oom(State = #state{channel = Channel}) -> check_oom(State = #state{oom_policy = OomPolicy}) ->
Zone = emqx_stomp_channel:info(zone, Channel),
OomPolicy = emqx_gateway_utils:oom_policy(Zone),
?tp(debug, check_oom, #{policy => OomPolicy}),
case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of
{shutdown, Reason} -> {shutdown, Reason} ->
%% triggers terminate/2 callback immediately %% triggers terminate/2 callback immediately
erlang:exit({shutdown, Reason}); erlang:exit({shutdown, Reason});
_Other -> _Other -> ok
ok
end, end,
State. State.
@ -847,28 +841,29 @@ close_socket(State = #state{transport = Transport, socket = Socket}) ->
%% Inc incoming/outgoing stats %% Inc incoming/outgoing stats
%% XXX: Other packet type? %% XXX: Other packet type?
inc_incoming_stats(Packet = ?PACKET(Type)) -> inc_incoming_stats(_Packet) ->
inc_counter(recv_pkt, 1), inc_counter(recv_pkt, 1),
case Type =:= ?CMD_SEND of ok.
true -> %case Type =:= ?CMD_SEND of
inc_counter(recv_msg, 1), % true ->
inc_counter(incoming_pubs, 1); % inc_counter(recv_msg, 1),
false -> % inc_counter(incoming_pubs, 1);
ok % false ->
end, % ok
%% FIXME: %end,
emqx_metrics:inc_recv(Packet). %emqx_metrics:inc_recv(Packet).
inc_outgoing_stats(Packet = ?PACKET(Type)) -> inc_outgoing_stats(_Packet) ->
inc_counter(send_pkt, 1), inc_counter(send_pkt, 1),
case Type =:= ?CMD_MESSAGE of ok.
true -> %case Type =:= ?CMD_MESSAGE of
inc_counter(send_msg, 1), % true ->
inc_counter(outgoing_pubs, 1); % inc_counter(send_msg, 1),
false -> % inc_counter(outgoing_pubs, 1);
ok % false ->
end, % ok
emqx_metrics:inc_sent(Packet). %end,
%emqx_metrics:inc_sent(Packet).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions

View File

@ -98,7 +98,7 @@
-record(frame_limit, {max_header_num, max_header_length, max_body_length}). -record(frame_limit, {max_header_num, max_header_length, max_body_length}).
-type(parse_result() :: {ok, stomp_frame(), binary()} -type(parse_result() :: {ok, stomp_frame(), binary(), parse_state()}
| {more, parse_state()}). | {more, parse_state()}).
-type(parse_state() :: -type(parse_state() ::
@ -107,7 +107,7 @@
state := #parser_state{} state := #parser_state{}
}). }).
-dialyzer({nowarn_function, [serialize_pkt/2,make/1]}). %-dialyzer({nowarn_function, [serialize_pkt/2,make/1]}).
%% @doc Initialize a parser %% @doc Initialize a parser
-spec initial_parse_state(map()) -> parse_state(). -spec initial_parse_state(map()) -> parse_state().

View File

@ -33,20 +33,19 @@
-define(TCP_OPTS, [binary, {packet, raw}, {reuseaddr, true}, {nodelay, true}]). -define(TCP_OPTS, [binary, {packet, raw}, {reuseaddr, true}, {nodelay, true}]).
-dialyzer({nowarn_function, [load/0]}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% APIs %% APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec load() -> ok | {error, any()}.
load() -> load() ->
RegistryOptions = [ {cbkmod, ?MODULE} RegistryOptions = [ {cbkmod, ?MODULE}
, {schema, emqx_stomp_schema}
], ],
YourOptions = [param1, param2], YourOptions = [param1, param2],
emqx_gateway_registry:load(stomp, RegistryOptions, YourOptions). emqx_gateway_registry:load(stomp, RegistryOptions, YourOptions).
-spec unload() -> ok | {error, any()}.
unload() -> unload() ->
emqx_gateway_registry:unload(stomp). emqx_gateway_registry:unload(stomp).

View File

@ -44,15 +44,16 @@
-define(CMD_RECEIPT, <<"RECEIPT">>). -define(CMD_RECEIPT, <<"RECEIPT">>).
-define(CMD_ERROR, <<"ERROR">>). -define(CMD_ERROR, <<"ERROR">>).
-type client_command() :: binary().
%-type client_command() :: ?CMD_SEND | ?CMD_SUBSCRIBE | ?CMD_UNSUBSCRIBE %-type client_command() :: ?CMD_SEND | ?CMD_SUBSCRIBE | ?CMD_UNSUBSCRIBE
% | ?CMD_BEGIN | ?CMD_COMMIT | ?CMD_ABORT | ?CMD_ACK % | ?CMD_BEGIN | ?CMD_COMMIT | ?CMD_ABORT | ?CMD_ACK
% | ?CMD_NACK | ?CMD_DISCONNECT | ?CMD_CONNECT % | ?CMD_NACK | ?CMD_DISCONNECT | ?CMD_CONNECT
% | ?CMD_STOMP. % | ?CMD_STOMP.
% %
-type server_command() :: binary(). -type client_command() :: binary().
%-type server_command() :: ?CMD_CONNECTED | ?CMD_MESSAGE | ?CMD_RECEIPT %-type server_command() :: ?CMD_CONNECTED | ?CMD_MESSAGE | ?CMD_RECEIPT
% | ?CMD_ERROR. % | ?CMD_ERROR.
-type server_command() :: binary() | heartbeat.
-record(stomp_frame, { -record(stomp_frame, {
command :: client_command() | server_command(), command :: client_command() | server_command(),