chore(gw): fix dialyzer warnings

This commit is contained in:
JianBo He 2021-07-21 10:49:12 +08:00
parent 623fc67fc9
commit 6a99e1535f
6 changed files with 95 additions and 60 deletions

View File

@ -209,8 +209,10 @@ connection_closed(Type, ClientId) ->
-spec open_session(Type :: atom(), CleanStart :: boolean(),
ClientInfo :: emqx_types:clientinfo(),
ConnInfo :: emqx_types:conninfo(),
CreateSessionFun :: function())
-> {ok, #{session := map(),
CreateSessionFun :: fun((emqx_types:clientinfo(),
emqx_types:conninfo()) -> Session
))
-> {ok, #{session := Session,
present := boolean(),
pendings => list()
}}

View File

@ -86,8 +86,11 @@ authenticate(_Ctx, ClientInfo) ->
%% This function should be called after the client has authenticated
%% successfully so that the client can be managed in the cluster.
-spec open_session(context(), boolean(), emqx_types:clientinfo(),
emqx_types:conninfo(), function())
-> {ok, #{session := any(),
emqx_types:conninfo(),
fun((emqx_types:clientinfo(),
emqx_types:conninfo()) -> Session)
)
-> {ok, #{session := Session,
present := boolean(),
pendings => list()
}}

View File

@ -43,6 +43,8 @@
-define(ACTIVE_N, 100).
-define(DEFAULT_IDLE_TIMEOUT, 30000).
-define(DEFAULT_OOM_POLICY, #{max_heap_size => 4194304,
message_queue_len => 32000}).
-spec childspec(supervisor:worker(), Mod :: atom())
-> supervisor:child_spec().
@ -160,7 +162,7 @@ force_gc_policy(Options) ->
-spec oom_policy(map()) -> emqx_types:oom_policy().
oom_policy(Options) ->
maps:get(force_shutdown_policy, Options).
maps:get(force_shutdown_policy, Options, ?DEFAULT_OOM_POLICY).
-spec stats_timer(map()) -> undefined | disabled.
stats_timer(Options) ->

View File

@ -47,7 +47,7 @@
%% Context
ctx :: emqx_gateway_ctx:context(),
%% Registry
registry :: pid(),
registry :: emqx_sn_registry:registry(),
%% Gateway Id
gateway_id :: integer(),
%% Enable QoS3
@ -91,7 +91,6 @@
alive_timer => keepalive,
retry_timer => retry_delivery,
await_timer => expire_awaiting_rel,
expire_timer => expire_session,
asleep_timer => expire_asleep
}).
@ -190,7 +189,7 @@ set_conn_state(ConnState, Channel) ->
enrich_conninfo(?SN_CONNECT_MSG(_Flags, _ProtoId, Duration, _ClientId),
Channel = #channel{conninfo = ConnInfo}) ->
NConnInfo = ConnInfo#{ proto_name => <<"MQTT-SN">>
, proto_ver => "1.2"
, proto_ver => <<"1.2">>
, clean_start => true
, keepalive => Duration
, expiry_interval => 0
@ -299,12 +298,7 @@ process_connect(Channel = #channel{
conninfo = ConnInfo,
clientinfo = ClientInfo
}) ->
SessFun = fun(_,_) ->
%% TODO:
emqx_session:init(#{zone => undefined},
#{receive_maximum => 100}
)
end,
SessFun = fun(_,_) -> emqx_session:init(#{max_inflight => 1}) end,
case emqx_gateway_ctx:open_session(
Ctx,
true,
@ -336,7 +330,7 @@ ensure_keepalive_timer(Interval, Channel) ->
%% Handle incoming packet
%%--------------------------------------------------------------------
-spec handle_in(emqx_types:packet(), channel())
-spec handle_in(emqx_types:packet() | {frame_error, any()}, channel())
-> {ok, channel()}
| {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()}
@ -357,16 +351,15 @@ handle_in(?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
TopicId, _MsgId, Data),
Channel = #channel{conn_state = idle, registry = Registry}) ->
%% FIXME: check enable_qos3 ??
ClientId = undefined,
TopicName = case (TopicIdType =:= ?SN_SHORT_TOPIC) of
true ->
<<TopicId:16>>;
false ->
emqx_sn_registry:lookup_topic(
Registry,
ClientId,
?NEG_QOS_CLIENT_ID,
TopicId
);
true ->
<<TopicId:16>>
)
end,
_ = case TopicName =/= undefined of
true ->
@ -381,7 +374,7 @@ handle_in(?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
ok
end,
?LOG(debug, "Client id=~p receives a publish with QoS=-1 in idle mode!",
[ClientId]),
[?NEG_QOS_CLIENT_ID]),
{ok, Channel};
handle_in(Pkt = #mqtt_sn_message{type = Type},
@ -409,8 +402,8 @@ handle_in(?SN_WILLTOPIC_EMPTY_MSG,
case auth_connect(fake_packet, Channel#channel{will_msg = undefined}) of
{ok, NChannel} ->
process_connect(ensure_connected(NChannel));
{error, ReasonCode, NChannel} ->
handle_out(connack, ReasonCode, NChannel)
{error, ReasonCode} ->
handle_out(connack, ReasonCode, Channel)
end;
handle_in(?SN_WILLTOPIC_MSG(Flags, Topic),
@ -429,8 +422,8 @@ handle_in(?SN_WILLMSG_MSG(Payload),
case auth_connect(fake_packet, Channel#channel{will_msg = NWillMsg}) of
{ok, NChannel} ->
process_connect(ensure_connected(NChannel));
{error, ReasonCode, NChannel} ->
handle_out(connack, ReasonCode, NChannel)
{error, ReasonCode} ->
handle_out(connack, ReasonCode, Channel)
end;
handle_in(Packet = ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId),
@ -892,9 +885,9 @@ do_unsubscribe(TopicName,
session = Session,
clientinfo = ClientInfo
= #{mountpoint := Mountpoint}}) ->
SubOpts = #{},
NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName),
case emqx_session:unsubscribe(ClientInfo, NTopicName, SubOpts, Session) of
case emqx_session:unsubscribe(ClientInfo, NTopicName,
?DEFAULT_SUBOPTS, Session) of
{ok, NSession} ->
{ok, Channel#channel{session = NSession}};
{error, ?RC_NO_SUBSCRIPTION_EXISTED} ->
@ -943,7 +936,7 @@ asleep(Duration, Channel = #channel{conn_state = connected}) ->
handle_out(connack, ?SN_RC_ACCEPTED,
Channel = #channel{ctx = Ctx, conninfo = ConnInfo}) ->
_ = run_hooks(Ctx, 'client.connack',
[ConnInfo, ?SN_RC_NAME(?SN_RC_ACCEPTED)],
[ConnInfo, returncode_name(?SN_RC_ACCEPTED)],
#{}
),
return_connack(?SN_CONNACK_MSG(?SN_RC_ACCEPTED),
@ -951,7 +944,7 @@ handle_out(connack, ?SN_RC_ACCEPTED,
handle_out(connack, ReasonCode,
Channel = #channel{ctx = Ctx, conninfo = ConnInfo}) ->
Reason = ?SN_RC_NAME(ReasonCode),
Reason = returncode_name(ReasonCode),
_ = run_hooks(Ctx, 'client.connack', [ConnInfo, Reason], #{}),
AckPacket = ?SN_CONNACK_MSG(ReasonCode),
shutdown(Reason, AckPacket, Channel);
@ -1281,7 +1274,45 @@ handle_timeout(_TRef, {keepalive, StatVal},
{ok, reset_timer(alive_timer, NChannel)};
{error, timeout} ->
handle_out(disconnect, ?RC_KEEP_ALIVE_TIMEOUT, Channel)
end.
end;
handle_timeout(_TRef, retry_delivery,
Channel = #channel{conn_state = disconnected}) ->
{ok, Channel};
handle_timeout(_TRef, retry_delivery,
Channel = #channel{conn_state = asleep}) ->
{ok, reset_timer(retry_timer, Channel)};
handle_timeout(_TRef, retry_delivery,
Channel = #channel{session = Session}) ->
case emqx_session:retry(Session) of
{ok, NSession} ->
{ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
{ok, Publishes, Timeout, NSession} ->
NChannel = Channel#channel{session = NSession},
handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel))
end;
handle_timeout(_TRef, expire_awaiting_rel,
Channel = #channel{conn_state = disconnected}) ->
{ok, Channel};
handle_timeout(_TRef, expire_awaiting_rel,
Channel = #channel{conn_state = asleep}) ->
{ok, reset_timer(await_timer, Channel)};
handle_timeout(_TRef, expire_awaiting_rel,
Channel = #channel{session = Session}) ->
case emqx_session:expire(awaiting_rel, Session) of
{ok, NSession} ->
{ok, clean_timer(await_timer, Channel#channel{session = NSession})};
{ok, Timeout, NSession} ->
{ok, reset_timer(await_timer, Timeout, Channel#channel{session = NSession})}
end;
handle_timeout(_TRef, expire_asleep, Channel) ->
shutdown(asleep_timeout, Channel);
handle_timeout(_TRef, Msg, Channel) ->
?LOG(error, "Unexpected timeout: ~p~n", [Msg]),
{ok, Channel}.
%%--------------------------------------------------------------------
%% Terminate
@ -1329,11 +1360,6 @@ cancel_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:without([Name], Timers)}
end.
ensure_timer([Name], Channel) ->
ensure_timer(Name, Channel);
ensure_timer([Name | Rest], Channel) ->
ensure_timer(Rest, ensure_timer(Name, Channel));
ensure_timer(Name, Channel = #channel{timers = Timers}) ->
TRef = maps:get(Name, Timers, undefined),
Time = interval(Name, Channel),
@ -1350,6 +1376,9 @@ ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
reset_timer(Name, Channel) ->
ensure_timer(Name, clean_timer(Name, Channel)).
reset_timer(Name, Time, Channel) ->
ensure_timer(Name, Time, clean_timer(Name, Channel)).
clean_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}.
@ -1358,9 +1387,7 @@ interval(alive_timer, #channel{keepalive = KeepAlive}) ->
interval(retry_timer, #channel{session = Session}) ->
timer:seconds(emqx_session:info(retry_interval, Session));
interval(await_timer, #channel{session = Session}) ->
timer:seconds(emqx_session:info(await_rel_timeout, Session));
interval(expire_timer, #channel{conninfo = ConnInfo}) ->
timer:seconds(maps:get(expiry_interval, ConnInfo)).
timer:seconds(emqx_session:info(await_rel_timeout, Session)).
%%--------------------------------------------------------------------
%% Helper functions
@ -1382,3 +1409,14 @@ run_hooks_without_metrics(_Ctx, Name, Args, Acc) ->
metrics_inc(Name, #channel{ctx = Ctx}) ->
emqx_gateway_ctx:metrics_inc(Ctx, Name).
returncode_name(?SN_RC_ACCEPTED) -> accepted;
returncode_name(?SN_RC_CONGESTION) -> rejected_congestion;
returncode_name(?SN_RC_INVALID_TOPIC_ID) -> rejected_invaild_topic_id;
returncode_name(?SN_RC_NOT_SUPPORTED) -> rejected_not_supported;
returncode_name(?SN_RC_NOT_AUTHORIZE) -> rejected_not_authorize;
returncode_name(?SN_RC_FAILED_SESSION) -> rejected_failed_open_session;
returncode_name(?SN_EXCEED_LIMITATION) -> rejected_exceed_limitation;
returncode_name(_) -> accepted.

View File

@ -79,7 +79,9 @@
%% Idle Timeout
idle_timeout :: integer(),
%% Idle Timer
idle_timer :: maybe(reference())
idle_timer :: maybe(reference()),
%% OOM Policy
oom_policy :: maybe(emqx_types:oom_policy())
}).
-type(state() :: #state{}).
@ -92,8 +94,6 @@
-define(DEFAULT_GC_OPTS, #{count => 1000, bytes => 1024*1024}).
-define(DEFAULT_IDLE_TIMEOUT, 30000).
-define(DEFAULT_OOM_POLICY, #{max_heap_size => 4194304,
message_queue_len => 32000}).
-dialyzer({nowarn_function,
[ system_terminate/4
@ -101,6 +101,7 @@
, handle_msg/2
, shutdown/3
, stop/3
, parse_incoming/3
]}).
%% udp
@ -252,6 +253,7 @@ init_state(WrappedSock, Peername, Options) ->
GcState = emqx_gateway_utils:init_gc_state(Options),
StatsTimer = emqx_gateway_utils:stats_timer(Options),
IdleTimeout = emqx_gateway_utils:idle_timeout(Options),
OomPolicy = emqx_gateway_utils:oom_policy(Options),
IdleTimer = start_timer(IdleTimeout, idle_timeout),
#state{socket = WrappedSock,
peername = Peername,
@ -265,13 +267,16 @@ init_state(WrappedSock, Peername, Options) ->
gc_state = GcState,
stats_timer = StatsTimer,
idle_timeout = IdleTimeout,
idle_timer = IdleTimer
idle_timer = IdleTimer,
oom_policy = OomPolicy
}.
run_loop(Parent, State = #state{socket = Socket,
peername = Peername}) ->
peername = Peername,
oom_policy = OomPolicy
}) ->
emqx_logger:set_metadata_peername(esockd:format(Peername)),
_ = emqx_misc:tune_heap_size(?DEFAULT_OOM_POLICY),
_ = emqx_misc:tune_heap_size(OomPolicy),
case activate_socket(State) of
{ok, NState} ->
hibernate(Parent, NState);
@ -713,8 +718,7 @@ run_gc(Stats, State = #state{gc_state = GcSt}) ->
State#state{gc_state = GcSt1}
end.
check_oom(State) ->
OomPolicy = ?DEFAULT_OOM_POLICY,
check_oom(State = #state{oom_policy = OomPolicy}) ->
case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of
Shutdown = {shutdown, _Reason} ->
erlang:send(self(), Shutdown);

View File

@ -58,20 +58,6 @@
-define(SN_RC_FAILED_SESSION, 16#05).
-define(SN_EXCEED_LIMITATION, 16#06).
-define(SN_RC_NAME(Rc),
(begin
case Rc of
?SN_RC_ACCEPTED -> accepted;
?SN_RC_CONGESTION -> rejected_congestion;
?SN_RC_INVALID_TOPIC_ID -> rejected_invaild_topic_id;
?SN_RC_NOT_SUPPORTED -> rejected_not_supported;
?SN_RC_NOT_AUTHORIZE -> rejected_not_authorize;
?SN_RC_FAILED_SESSION -> rejected_failed_open_session;
?SN_EXCEED_LIMITATION -> rejected_exceed_limitation;
_ -> reserved
end
end)).
-define(QOS_NEG1, 3).
-type(mqtt_sn_return_code() :: ?SN_RC_ACCEPTED .. ?SN_EXCEED_LIMITATION).