From 30adfc18e684df3e9f470fc8943aa165e0cfd9da Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 1 Nov 2019 08:08:38 +0800 Subject: [PATCH] Remove 'gc_state' and 'pub_stats' from channel's state --- src/emqx_channel.erl | 127 ++++++++++--------------------------------- 1 file changed, 29 insertions(+), 98 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 99a595929..eedeacf06 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -40,15 +40,11 @@ , handle_in/2 , handle_out/2 , handle_call/2 - , handle_info/2 , handle_timeout/3 + , handle_info/2 , terminate/2 ]). --export([ recvd/2 - , sent/2 - ]). - %% export for ct -export([set_field/3]). @@ -75,14 +71,10 @@ topic_aliases :: maybe(map()), %% MQTT Topic Alias Maximum alias_maximum :: maybe(map()), - %% Publish Stats - pub_stats :: emqx_types:stats(), %% Timers timers :: #{atom() => disabled | maybe(reference())}, %% Conn State conn_state :: conn_state(), - %% GC State - gc_state :: maybe(emqx_gc:gc_state()), %% Takeover takeover :: boolean(), %% Resume @@ -103,7 +95,6 @@ -type(output() :: emqx_types:packet() | action() | [action()]). -define(TIMER_TABLE, #{ - stats_timer => emit_stats, alive_timer => keepalive, retry_timer => retry_delivery, await_timer => expire_awaiting_rel, @@ -113,8 +104,7 @@ -define(ATTR_KEYS, [conninfo, clientinfo, session, conn_state]). --define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, will_msg, topic_aliases, - alias_maximum, gc_state]). +-define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, will_msg, topic_aliases, alias_maximum]). %%-------------------------------------------------------------------- %% Info, Attrs and Caps @@ -146,12 +136,8 @@ info(will_msg, #channel{will_msg = undefined}) -> undefined; info(will_msg, #channel{will_msg = WillMsg}) -> emqx_message:to_map(WillMsg); -info(pub_stats, #channel{pub_stats = PubStats}) -> - PubStats; info(timers, #channel{timers = Timers}) -> - Timers; -info(gc_state, #channel{gc_state = GcState}) -> - maybe_apply(fun emqx_gc:info/1, GcState). + Timers. %% @doc Get attrs of the channel. -spec(attrs(channel()) -> emqx_types:attrs()). @@ -164,10 +150,8 @@ attrs(session, #channel{session = Session}) -> attrs(Key, Channel) -> info(Key, Channel). -spec(stats(channel()) -> emqx_types:stats()). -stats(#channel{pub_stats = PubStats, session = undefined}) -> - maps:to_list(PubStats); -stats(#channel{pub_stats = PubStats, session = Session}) -> - maps:to_list(PubStats) ++ emqx_session:stats(Session). +stats(#channel{session = Session})-> + emqx_session:stats(Session). -spec(caps(channel()) -> emqx_types:caps()). caps(#channel{clientinfo = #{zone := Zone}}) -> @@ -194,7 +178,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) -> _ -> undefined end, Protocol = maps:get(protocol, ConnInfo, mqtt), - MountPoint = emqx_zone:get_env(Zone, mountpoint), + MountPoint = emqx_zone:mountpoint(Zone), ClientInfo = #{zone => Zone, protocol => Protocol, peerhost => PeerHost, @@ -205,16 +189,10 @@ init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) -> is_bridge => false, is_superuser => false }, - StatsTimer = case emqx_zone:enable_stats(Zone) of - true -> undefined; - false -> disabled - end, #channel{conninfo = ConnInfo, clientinfo = ClientInfo, - pub_stats = #{}, - timers = #{stats_timer => StatsTimer}, + timers = #{}, conn_state = idle, - gc_state = init_gc_state(Zone), takeover = false, resuming = false, pendings = [] @@ -223,17 +201,10 @@ init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) -> peer_cert_as_username(Options) -> proplists:get_value(peer_cert_as_username, Options). -init_gc_state(Zone) -> - maybe_apply(fun emqx_gc:init/1, emqx_zone:force_gc_policy(Zone)). - %%-------------------------------------------------------------------- %% Handle incoming packet %%-------------------------------------------------------------------- --spec(recvd(pos_integer(), channel()) -> channel()). -recvd(Bytes, Channel) -> - ensure_timer(stats_timer, maybe_gc_and_check_oom(Bytes, Channel)). - -spec(handle_in(emqx_types:packet(), channel()) -> {ok, channel()} | {ok, output(), channel()} @@ -258,74 +229,69 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> end; handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> - NChannel = inc_pub_stats(publish_in, Channel), case emqx_packet:check(Packet) of - ok -> handle_publish(Packet, NChannel); + ok -> handle_publish(Packet, Channel); {error, ReasonCode} -> - handle_out(disconnect, ReasonCode, NChannel) + handle_out(disconnect, ReasonCode, Channel) end; handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), Channel = #channel{clientinfo = ClientInfo, session = Session}) -> - NChannel = inc_pub_stats(puback_in, Channel), case emqx_session:puback(PacketId, Session) of {ok, Msg, Publishes, NSession} -> ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), - handle_out({publish, Publishes}, NChannel#channel{session = NSession}); + handle_out({publish, Publishes}, Channel#channel{session = NSession}); {ok, Msg, NSession} -> ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), - {ok, NChannel#channel{session = NSession}}; + {ok, Channel#channel{session = NSession}}; {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> ?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]), ok = emqx_metrics:inc('packets.puback.inuse'), - {ok, NChannel}; + {ok, Channel}; {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]), ok = emqx_metrics:inc('packets.puback.missed'), - {ok, NChannel} + {ok, Channel} end; handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), Channel = #channel{clientinfo = ClientInfo, session = Session}) -> - Channel1 = inc_pub_stats(pubrec_in, Channel), case emqx_session:pubrec(PacketId, Session) of {ok, Msg, NSession} -> ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), - NChannel = Channel1#channel{session = NSession}, + NChannel = Channel#channel{session = NSession}, handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel); {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> ?LOG(warning, "The PUBREC PacketId ~w is inuse.", [PacketId]), ok = emqx_metrics:inc('packets.pubrec.inuse'), - handle_out(pubrel, {PacketId, RC}, Channel1); + handle_out(pubrel, {PacketId, RC}, Channel); {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> ?LOG(warning, "The PUBREC ~w is not found.", [PacketId]), ok = emqx_metrics:inc('packets.pubrec.missed'), - handle_out(pubrel, {PacketId, RC}, Channel1) + handle_out(pubrel, {PacketId, RC}, Channel) end; handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) -> - Channel1 = inc_pub_stats(pubrel_in, Channel), case emqx_session:pubrel(PacketId, Session) of {ok, NSession} -> - Channel2 = Channel1#channel{session = NSession}, - handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, Channel2); + NChannel = Channel#channel{session = NSession}, + handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel); {error, NotFound} -> ok = emqx_metrics:inc('packets.pubrel.missed'), ?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]), - handle_out(pubcomp, {PacketId, NotFound}, Channel1) + handle_out(pubcomp, {PacketId, NotFound}, Channel) end; handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) -> - Channel1 = inc_pub_stats(pubcomp_in, Channel), case emqx_session:pubcomp(PacketId, Session) of {ok, Publishes, NSession} -> - handle_out({publish, Publishes}, Channel1#channel{session = NSession}); + handle_out({publish, Publishes}, Channel#channel{session = NSession}); {ok, NSession} -> - {ok, Channel1#channel{session = NSession}}; + {ok, Channel#channel{session = NSession}}; {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> ?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId]), ok = emqx_metrics:inc('packets.pubcomp.missed'), - {ok, Channel1} + {ok, Channel} end; handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), @@ -422,11 +388,6 @@ process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart}, %% Process Publish %%-------------------------------------------------------------------- -inc_pub_stats(Key, Channel) -> inc_pub_stats(Key, 1, Channel). -inc_pub_stats(Key, I, Channel = #channel{pub_stats = PubStats}) -> - NPubStats = maps:update_with(Key, fun(V) -> V+I end, I, PubStats), - Channel#channel{pub_stats = NPubStats}. - handle_publish(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId), Channel = #channel{conninfo = #{proto_ver := ProtoVer}}) -> case pipeline([fun process_alias/2, @@ -540,10 +501,6 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel = %% Handle outgoing packet %%-------------------------------------------------------------------- --spec(sent(pos_integer(), channel()) -> channel()). -sent(Bytes, Channel) -> - ensure_timer(stats_timer, maybe_gc_and_check_oom(Bytes, Channel)). - -spec(handle_out(term(), channel()) -> {ok, channel()} | {ok, output(), channel()} @@ -578,8 +535,7 @@ handle_out({publish, Publishes}, Channel) when is_list(Publishes) -> {ok, _Ch} -> Acc end end, [], Publishes), - NChannel = inc_pub_stats(publish_out, length(Packets), Channel), - {ok, {outgoing, lists:reverse(Packets)}, NChannel}; + {ok, {outgoing, lists:reverse(Packets)}, Channel}; %% Ignore loop deliver handle_out({publish, _PacketId, #message{from = ClientId, @@ -635,16 +591,16 @@ handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnIn shutdown(Reason, ?CONNACK_PACKET(ReasonCode1), Channel); handle_out(puback, {PacketId, ReasonCode}, Channel) -> - {ok, ?PUBACK_PACKET(PacketId, ReasonCode), inc_pub_stats(puback_out, Channel)}; + {ok, ?PUBACK_PACKET(PacketId, ReasonCode), Channel}; handle_out(pubrec, {PacketId, ReasonCode}, Channel) -> - {ok, ?PUBREC_PACKET(PacketId, ReasonCode), inc_pub_stats(pubrec_out, Channel)}; + {ok, ?PUBREC_PACKET(PacketId, ReasonCode), Channel}; handle_out(pubrel, {PacketId, ReasonCode}, Channel) -> - {ok, ?PUBREL_PACKET(PacketId, ReasonCode), inc_pub_stats(pubrel_out, Channel)}; + {ok, ?PUBREL_PACKET(PacketId, ReasonCode), Channel}; handle_out(pubcomp, {PacketId, ReasonCode}, Channel) -> - {ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), inc_pub_stats(pubcomp_out, Channel)}; + {ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), Channel}; handle_out(suback, {PacketId, ReasonCodes}, Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) -> @@ -747,11 +703,6 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientI {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), {ok, NChannel}; -handle_info({register, Attrs, Stats}, #channel{clientinfo = #{clientid := ClientId}}) -> - ok = emqx_cm:register_channel(ClientId), - emqx_cm:set_chan_attrs(ClientId, Attrs), - emqx_cm:set_chan_stats(ClientId, Stats); - handle_info({sock_closed, _Reason}, Channel = #channel{conn_state = disconnected}) -> {ok, Channel}; @@ -788,12 +739,6 @@ handle_info(Info, Channel) -> -> {ok, channel()} | {ok, Result :: term(), channel()} | {shutdown, Reason :: term(), channel()}). -handle_timeout(TRef, {emit_stats, Stats}, - Channel = #channel{clientinfo = #{clientid := ClientId}, - timers = #{stats_timer := TRef}}) -> - ok = emqx_cm:set_chan_stats(ClientId, Stats), - {ok, clean_timer(stats_timer, Channel)}; - handle_timeout(TRef, {keepalive, StatVal}, Channel = #channel{keepalive = Keepalive, timers = #{alive_timer := TRef}}) -> @@ -873,8 +818,6 @@ reset_timer(Name, Time, Channel) -> clean_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. -interval(stats_timer, #channel{clientinfo = #{zone := Zone}}) -> - emqx_zone:get_env(Zone, idle_timeout, 30000); interval(alive_timer, #channel{keepalive = KeepAlive}) -> emqx_keepalive:info(interval, KeepAlive); interval(retry_timer, #channel{session = Session}) -> @@ -912,6 +855,7 @@ publish_will_msg(undefined) -> publish_will_msg(Msg) -> emqx_broker:publish(Msg). + %% @doc Enrich MQTT Connect Info. enrich_conninfo(#mqtt_packet_connect{ proto_name = ProtoName, @@ -1171,7 +1115,7 @@ ensure_keepalive(_AckProps, Channel = #channel{conninfo = ConnInfo}) -> ensure_keepalive_timer(0, Channel) -> Channel; ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) -> - Backoff = emqx_zone:get_env(Zone, keepalive_backoff, 0.75), + Backoff = emqx_zone:keepalive_backoff(Zone), Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)), ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). @@ -1197,19 +1141,6 @@ is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) -> parse_topic_filters(TopicFilters) -> lists:map(fun emqx_topic:parse/1, TopicFilters). -%%-------------------------------------------------------------------- -%% Maybe GC and Check OOM -%%-------------------------------------------------------------------- - -maybe_gc_and_check_oom(_Oct, Channel = #channel{gc_state = undefined}) -> - Channel; -maybe_gc_and_check_oom(Oct, Channel = #channel{clientinfo = #{zone := Zone}, - gc_state = GCSt}) -> - {IsGC, GCSt1} = emqx_gc:run(1, Oct, GCSt), - IsGC andalso emqx_metrics:inc('channel.gc.cnt'), - IsGC andalso emqx_zone:check_oom(Zone, fun(Shutdown) -> self() ! Shutdown end), - Channel#channel{gc_state = GCSt1}. - %%-------------------------------------------------------------------- %% Helper functions %%--------------------------------------------------------------------