From 6a99e1535fb7a396b8ec9012cda7b84df11687cb Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 21 Jul 2021 10:49:12 +0800 Subject: [PATCH] chore(gw): fix dialyzer warnings --- apps/emqx_gateway/src/emqx_gateway_cm.erl | 6 +- apps/emqx_gateway/src/emqx_gateway_ctx.erl | 7 +- apps/emqx_gateway/src/emqx_gateway_utils.erl | 4 +- .../src/mqttsn/emqx_sn_channel.erl | 104 ++++++++++++------ apps/emqx_gateway/src/mqttsn/emqx_sn_conn.erl | 20 ++-- .../src/mqttsn/include/emqx_sn.hrl | 14 --- 6 files changed, 95 insertions(+), 60 deletions(-) diff --git a/apps/emqx_gateway/src/emqx_gateway_cm.erl b/apps/emqx_gateway/src/emqx_gateway_cm.erl index f8ca18c1a..270d7de4b 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm.erl @@ -209,8 +209,10 @@ connection_closed(Type, ClientId) -> -spec open_session(Type :: atom(), CleanStart :: boolean(), ClientInfo :: emqx_types:clientinfo(), ConnInfo :: emqx_types:conninfo(), - CreateSessionFun :: function()) - -> {ok, #{session := map(), + CreateSessionFun :: fun((emqx_types:clientinfo(), + emqx_types:conninfo()) -> Session + )) + -> {ok, #{session := Session, present := boolean(), pendings => list() }} diff --git a/apps/emqx_gateway/src/emqx_gateway_ctx.erl b/apps/emqx_gateway/src/emqx_gateway_ctx.erl index 3af6fde20..722d4e549 100644 --- a/apps/emqx_gateway/src/emqx_gateway_ctx.erl +++ b/apps/emqx_gateway/src/emqx_gateway_ctx.erl @@ -86,8 +86,11 @@ authenticate(_Ctx, ClientInfo) -> %% This function should be called after the client has authenticated %% successfully so that the client can be managed in the cluster. -spec open_session(context(), boolean(), emqx_types:clientinfo(), - emqx_types:conninfo(), function()) - -> {ok, #{session := any(), + emqx_types:conninfo(), + fun((emqx_types:clientinfo(), + emqx_types:conninfo()) -> Session) + ) + -> {ok, #{session := Session, present := boolean(), pendings => list() }} diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index b7e6658d1..b3292c895 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -43,6 +43,8 @@ -define(ACTIVE_N, 100). -define(DEFAULT_IDLE_TIMEOUT, 30000). +-define(DEFAULT_OOM_POLICY, #{max_heap_size => 4194304, + message_queue_len => 32000}). -spec childspec(supervisor:worker(), Mod :: atom()) -> supervisor:child_spec(). @@ -160,7 +162,7 @@ force_gc_policy(Options) -> -spec oom_policy(map()) -> emqx_types:oom_policy(). oom_policy(Options) -> - maps:get(force_shutdown_policy, Options). + maps:get(force_shutdown_policy, Options, ?DEFAULT_OOM_POLICY). -spec stats_timer(map()) -> undefined | disabled. stats_timer(Options) -> diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index c1588f364..23d5fc030 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -47,7 +47,7 @@ %% Context ctx :: emqx_gateway_ctx:context(), %% Registry - registry :: pid(), + registry :: emqx_sn_registry:registry(), %% Gateway Id gateway_id :: integer(), %% Enable QoS3 @@ -91,7 +91,6 @@ alive_timer => keepalive, retry_timer => retry_delivery, await_timer => expire_awaiting_rel, - expire_timer => expire_session, asleep_timer => expire_asleep }). @@ -190,7 +189,7 @@ set_conn_state(ConnState, Channel) -> enrich_conninfo(?SN_CONNECT_MSG(_Flags, _ProtoId, Duration, _ClientId), Channel = #channel{conninfo = ConnInfo}) -> NConnInfo = ConnInfo#{ proto_name => <<"MQTT-SN">> - , proto_ver => "1.2" + , proto_ver => <<"1.2">> , clean_start => true , keepalive => Duration , expiry_interval => 0 @@ -299,12 +298,7 @@ process_connect(Channel = #channel{ conninfo = ConnInfo, clientinfo = ClientInfo }) -> - SessFun = fun(_,_) -> - %% TODO: - emqx_session:init(#{zone => undefined}, - #{receive_maximum => 100} - ) - end, + SessFun = fun(_,_) -> emqx_session:init(#{max_inflight => 1}) end, case emqx_gateway_ctx:open_session( Ctx, true, @@ -336,7 +330,7 @@ ensure_keepalive_timer(Interval, Channel) -> %% Handle incoming packet %%-------------------------------------------------------------------- --spec handle_in(emqx_types:packet(), channel()) +-spec handle_in(emqx_types:packet() | {frame_error, any()}, channel()) -> {ok, channel()} | {ok, replies(), channel()} | {shutdown, Reason :: term(), channel()} @@ -357,16 +351,15 @@ handle_in(?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1, TopicId, _MsgId, Data), Channel = #channel{conn_state = idle, registry = Registry}) -> %% FIXME: check enable_qos3 ?? - ClientId = undefined, TopicName = case (TopicIdType =:= ?SN_SHORT_TOPIC) of + true -> + <>; false -> emqx_sn_registry:lookup_topic( Registry, - ClientId, + ?NEG_QOS_CLIENT_ID, TopicId - ); - true -> - <> + ) end, _ = case TopicName =/= undefined of true -> @@ -381,7 +374,7 @@ handle_in(?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1, ok end, ?LOG(debug, "Client id=~p receives a publish with QoS=-1 in idle mode!", - [ClientId]), + [?NEG_QOS_CLIENT_ID]), {ok, Channel}; handle_in(Pkt = #mqtt_sn_message{type = Type}, @@ -409,8 +402,8 @@ handle_in(?SN_WILLTOPIC_EMPTY_MSG, case auth_connect(fake_packet, Channel#channel{will_msg = undefined}) of {ok, NChannel} -> process_connect(ensure_connected(NChannel)); - {error, ReasonCode, NChannel} -> - handle_out(connack, ReasonCode, NChannel) + {error, ReasonCode} -> + handle_out(connack, ReasonCode, Channel) end; handle_in(?SN_WILLTOPIC_MSG(Flags, Topic), @@ -429,8 +422,8 @@ handle_in(?SN_WILLMSG_MSG(Payload), case auth_connect(fake_packet, Channel#channel{will_msg = NWillMsg}) of {ok, NChannel} -> process_connect(ensure_connected(NChannel)); - {error, ReasonCode, NChannel} -> - handle_out(connack, ReasonCode, NChannel) + {error, ReasonCode} -> + handle_out(connack, ReasonCode, Channel) end; handle_in(Packet = ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId), @@ -892,9 +885,9 @@ do_unsubscribe(TopicName, session = Session, clientinfo = ClientInfo = #{mountpoint := Mountpoint}}) -> - SubOpts = #{}, NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName), - case emqx_session:unsubscribe(ClientInfo, NTopicName, SubOpts, Session) of + case emqx_session:unsubscribe(ClientInfo, NTopicName, + ?DEFAULT_SUBOPTS, Session) of {ok, NSession} -> {ok, Channel#channel{session = NSession}}; {error, ?RC_NO_SUBSCRIPTION_EXISTED} -> @@ -943,7 +936,7 @@ asleep(Duration, Channel = #channel{conn_state = connected}) -> handle_out(connack, ?SN_RC_ACCEPTED, Channel = #channel{ctx = Ctx, conninfo = ConnInfo}) -> _ = run_hooks(Ctx, 'client.connack', - [ConnInfo, ?SN_RC_NAME(?SN_RC_ACCEPTED)], + [ConnInfo, returncode_name(?SN_RC_ACCEPTED)], #{} ), return_connack(?SN_CONNACK_MSG(?SN_RC_ACCEPTED), @@ -951,7 +944,7 @@ handle_out(connack, ?SN_RC_ACCEPTED, handle_out(connack, ReasonCode, Channel = #channel{ctx = Ctx, conninfo = ConnInfo}) -> - Reason = ?SN_RC_NAME(ReasonCode), + Reason = returncode_name(ReasonCode), _ = run_hooks(Ctx, 'client.connack', [ConnInfo, Reason], #{}), AckPacket = ?SN_CONNACK_MSG(ReasonCode), shutdown(Reason, AckPacket, Channel); @@ -1281,7 +1274,45 @@ handle_timeout(_TRef, {keepalive, StatVal}, {ok, reset_timer(alive_timer, NChannel)}; {error, timeout} -> handle_out(disconnect, ?RC_KEEP_ALIVE_TIMEOUT, Channel) - end. + end; + +handle_timeout(_TRef, retry_delivery, + Channel = #channel{conn_state = disconnected}) -> + {ok, Channel}; +handle_timeout(_TRef, retry_delivery, + Channel = #channel{conn_state = asleep}) -> + {ok, reset_timer(retry_timer, Channel)}; +handle_timeout(_TRef, retry_delivery, + Channel = #channel{session = Session}) -> + case emqx_session:retry(Session) of + {ok, NSession} -> + {ok, clean_timer(retry_timer, Channel#channel{session = NSession})}; + {ok, Publishes, Timeout, NSession} -> + NChannel = Channel#channel{session = NSession}, + handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel)) + end; + +handle_timeout(_TRef, expire_awaiting_rel, + Channel = #channel{conn_state = disconnected}) -> + {ok, Channel}; +handle_timeout(_TRef, expire_awaiting_rel, + Channel = #channel{conn_state = asleep}) -> + {ok, reset_timer(await_timer, Channel)}; +handle_timeout(_TRef, expire_awaiting_rel, + Channel = #channel{session = Session}) -> + case emqx_session:expire(awaiting_rel, Session) of + {ok, NSession} -> + {ok, clean_timer(await_timer, Channel#channel{session = NSession})}; + {ok, Timeout, NSession} -> + {ok, reset_timer(await_timer, Timeout, Channel#channel{session = NSession})} + end; + +handle_timeout(_TRef, expire_asleep, Channel) -> + shutdown(asleep_timeout, Channel); + +handle_timeout(_TRef, Msg, Channel) -> + ?LOG(error, "Unexpected timeout: ~p~n", [Msg]), + {ok, Channel}. %%-------------------------------------------------------------------- %% Terminate @@ -1329,11 +1360,6 @@ cancel_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:without([Name], Timers)} end. -ensure_timer([Name], Channel) -> - ensure_timer(Name, Channel); -ensure_timer([Name | Rest], Channel) -> - ensure_timer(Rest, ensure_timer(Name, Channel)); - ensure_timer(Name, Channel = #channel{timers = Timers}) -> TRef = maps:get(Name, Timers, undefined), Time = interval(Name, Channel), @@ -1350,6 +1376,9 @@ ensure_timer(Name, Time, Channel = #channel{timers = Timers}) -> reset_timer(Name, Channel) -> ensure_timer(Name, clean_timer(Name, Channel)). +reset_timer(Name, Time, Channel) -> + ensure_timer(Name, Time, clean_timer(Name, Channel)). + clean_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. @@ -1358,9 +1387,7 @@ interval(alive_timer, #channel{keepalive = KeepAlive}) -> interval(retry_timer, #channel{session = Session}) -> timer:seconds(emqx_session:info(retry_interval, Session)); interval(await_timer, #channel{session = Session}) -> - timer:seconds(emqx_session:info(await_rel_timeout, Session)); -interval(expire_timer, #channel{conninfo = ConnInfo}) -> - timer:seconds(maps:get(expiry_interval, ConnInfo)). + timer:seconds(emqx_session:info(await_rel_timeout, Session)). %%-------------------------------------------------------------------- %% Helper functions @@ -1382,3 +1409,14 @@ run_hooks_without_metrics(_Ctx, Name, Args, Acc) -> metrics_inc(Name, #channel{ctx = Ctx}) -> emqx_gateway_ctx:metrics_inc(Ctx, Name). + +returncode_name(?SN_RC_ACCEPTED) -> accepted; +returncode_name(?SN_RC_CONGESTION) -> rejected_congestion; +returncode_name(?SN_RC_INVALID_TOPIC_ID) -> rejected_invaild_topic_id; +returncode_name(?SN_RC_NOT_SUPPORTED) -> rejected_not_supported; +returncode_name(?SN_RC_NOT_AUTHORIZE) -> rejected_not_authorize; +returncode_name(?SN_RC_FAILED_SESSION) -> rejected_failed_open_session; +returncode_name(?SN_EXCEED_LIMITATION) -> rejected_exceed_limitation; +returncode_name(_) -> accepted. + + diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_conn.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_conn.erl index a8780b5e0..cf2ac4011 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_conn.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_conn.erl @@ -79,7 +79,9 @@ %% Idle Timeout idle_timeout :: integer(), %% Idle Timer - idle_timer :: maybe(reference()) + idle_timer :: maybe(reference()), + %% OOM Policy + oom_policy :: maybe(emqx_types:oom_policy()) }). -type(state() :: #state{}). @@ -92,8 +94,6 @@ -define(DEFAULT_GC_OPTS, #{count => 1000, bytes => 1024*1024}). -define(DEFAULT_IDLE_TIMEOUT, 30000). --define(DEFAULT_OOM_POLICY, #{max_heap_size => 4194304, - message_queue_len => 32000}). -dialyzer({nowarn_function, [ system_terminate/4 @@ -101,6 +101,7 @@ , handle_msg/2 , shutdown/3 , stop/3 + , parse_incoming/3 ]}). %% udp @@ -252,6 +253,7 @@ init_state(WrappedSock, Peername, Options) -> GcState = emqx_gateway_utils:init_gc_state(Options), StatsTimer = emqx_gateway_utils:stats_timer(Options), IdleTimeout = emqx_gateway_utils:idle_timeout(Options), + OomPolicy = emqx_gateway_utils:oom_policy(Options), IdleTimer = start_timer(IdleTimeout, idle_timeout), #state{socket = WrappedSock, peername = Peername, @@ -265,13 +267,16 @@ init_state(WrappedSock, Peername, Options) -> gc_state = GcState, stats_timer = StatsTimer, idle_timeout = IdleTimeout, - idle_timer = IdleTimer + idle_timer = IdleTimer, + oom_policy = OomPolicy }. run_loop(Parent, State = #state{socket = Socket, - peername = Peername}) -> + peername = Peername, + oom_policy = OomPolicy + }) -> emqx_logger:set_metadata_peername(esockd:format(Peername)), - _ = emqx_misc:tune_heap_size(?DEFAULT_OOM_POLICY), + _ = emqx_misc:tune_heap_size(OomPolicy), case activate_socket(State) of {ok, NState} -> hibernate(Parent, NState); @@ -713,8 +718,7 @@ run_gc(Stats, State = #state{gc_state = GcSt}) -> State#state{gc_state = GcSt1} end. -check_oom(State) -> - OomPolicy = ?DEFAULT_OOM_POLICY, +check_oom(State = #state{oom_policy = OomPolicy}) -> case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of Shutdown = {shutdown, _Reason} -> erlang:send(self(), Shutdown); diff --git a/apps/emqx_gateway/src/mqttsn/include/emqx_sn.hrl b/apps/emqx_gateway/src/mqttsn/include/emqx_sn.hrl index c7d9ce6b7..ca0433011 100644 --- a/apps/emqx_gateway/src/mqttsn/include/emqx_sn.hrl +++ b/apps/emqx_gateway/src/mqttsn/include/emqx_sn.hrl @@ -58,20 +58,6 @@ -define(SN_RC_FAILED_SESSION, 16#05). -define(SN_EXCEED_LIMITATION, 16#06). --define(SN_RC_NAME(Rc), - (begin - case Rc of - ?SN_RC_ACCEPTED -> accepted; - ?SN_RC_CONGESTION -> rejected_congestion; - ?SN_RC_INVALID_TOPIC_ID -> rejected_invaild_topic_id; - ?SN_RC_NOT_SUPPORTED -> rejected_not_supported; - ?SN_RC_NOT_AUTHORIZE -> rejected_not_authorize; - ?SN_RC_FAILED_SESSION -> rejected_failed_open_session; - ?SN_EXCEED_LIMITATION -> rejected_exceed_limitation; - _ -> reserved - end - end)). - -define(QOS_NEG1, 3). -type(mqtt_sn_return_code() :: ?SN_RC_ACCEPTED .. ?SN_EXCEED_LIMITATION).