Remove 'gc_state' and 'pub_stats' from channel's state

This commit is contained in:
Feng Lee 2019-11-01 08:08:38 +08:00
parent 605a03453e
commit 30adfc18e6
1 changed files with 29 additions and 98 deletions

View File

@ -40,15 +40,11 @@
, handle_in/2 , handle_in/2
, handle_out/2 , handle_out/2
, handle_call/2 , handle_call/2
, handle_info/2
, handle_timeout/3 , handle_timeout/3
, handle_info/2
, terminate/2 , terminate/2
]). ]).
-export([ recvd/2
, sent/2
]).
%% export for ct %% export for ct
-export([set_field/3]). -export([set_field/3]).
@ -75,14 +71,10 @@
topic_aliases :: maybe(map()), topic_aliases :: maybe(map()),
%% MQTT Topic Alias Maximum %% MQTT Topic Alias Maximum
alias_maximum :: maybe(map()), alias_maximum :: maybe(map()),
%% Publish Stats
pub_stats :: emqx_types:stats(),
%% Timers %% Timers
timers :: #{atom() => disabled | maybe(reference())}, timers :: #{atom() => disabled | maybe(reference())},
%% Conn State %% Conn State
conn_state :: conn_state(), conn_state :: conn_state(),
%% GC State
gc_state :: maybe(emqx_gc:gc_state()),
%% Takeover %% Takeover
takeover :: boolean(), takeover :: boolean(),
%% Resume %% Resume
@ -103,7 +95,6 @@
-type(output() :: emqx_types:packet() | action() | [action()]). -type(output() :: emqx_types:packet() | action() | [action()]).
-define(TIMER_TABLE, #{ -define(TIMER_TABLE, #{
stats_timer => emit_stats,
alive_timer => keepalive, alive_timer => keepalive,
retry_timer => retry_delivery, retry_timer => retry_delivery,
await_timer => expire_awaiting_rel, await_timer => expire_awaiting_rel,
@ -113,8 +104,7 @@
-define(ATTR_KEYS, [conninfo, clientinfo, session, conn_state]). -define(ATTR_KEYS, [conninfo, clientinfo, session, conn_state]).
-define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, will_msg, topic_aliases, -define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, will_msg, topic_aliases, alias_maximum]).
alias_maximum, gc_state]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Info, Attrs and Caps %% Info, Attrs and Caps
@ -146,12 +136,8 @@ info(will_msg, #channel{will_msg = undefined}) ->
undefined; undefined;
info(will_msg, #channel{will_msg = WillMsg}) -> info(will_msg, #channel{will_msg = WillMsg}) ->
emqx_message:to_map(WillMsg); emqx_message:to_map(WillMsg);
info(pub_stats, #channel{pub_stats = PubStats}) ->
PubStats;
info(timers, #channel{timers = Timers}) -> info(timers, #channel{timers = Timers}) ->
Timers; Timers.
info(gc_state, #channel{gc_state = GcState}) ->
maybe_apply(fun emqx_gc:info/1, GcState).
%% @doc Get attrs of the channel. %% @doc Get attrs of the channel.
-spec(attrs(channel()) -> emqx_types:attrs()). -spec(attrs(channel()) -> emqx_types:attrs()).
@ -164,10 +150,8 @@ attrs(session, #channel{session = Session}) ->
attrs(Key, Channel) -> info(Key, Channel). attrs(Key, Channel) -> info(Key, Channel).
-spec(stats(channel()) -> emqx_types:stats()). -spec(stats(channel()) -> emqx_types:stats()).
stats(#channel{pub_stats = PubStats, session = undefined}) -> stats(#channel{session = Session})->
maps:to_list(PubStats); emqx_session:stats(Session).
stats(#channel{pub_stats = PubStats, session = Session}) ->
maps:to_list(PubStats) ++ emqx_session:stats(Session).
-spec(caps(channel()) -> emqx_types:caps()). -spec(caps(channel()) -> emqx_types:caps()).
caps(#channel{clientinfo = #{zone := Zone}}) -> caps(#channel{clientinfo = #{zone := Zone}}) ->
@ -194,7 +178,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) ->
_ -> undefined _ -> undefined
end, end,
Protocol = maps:get(protocol, ConnInfo, mqtt), Protocol = maps:get(protocol, ConnInfo, mqtt),
MountPoint = emqx_zone:get_env(Zone, mountpoint), MountPoint = emqx_zone:mountpoint(Zone),
ClientInfo = #{zone => Zone, ClientInfo = #{zone => Zone,
protocol => Protocol, protocol => Protocol,
peerhost => PeerHost, peerhost => PeerHost,
@ -205,16 +189,10 @@ init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) ->
is_bridge => false, is_bridge => false,
is_superuser => false is_superuser => false
}, },
StatsTimer = case emqx_zone:enable_stats(Zone) of
true -> undefined;
false -> disabled
end,
#channel{conninfo = ConnInfo, #channel{conninfo = ConnInfo,
clientinfo = ClientInfo, clientinfo = ClientInfo,
pub_stats = #{}, timers = #{},
timers = #{stats_timer => StatsTimer},
conn_state = idle, conn_state = idle,
gc_state = init_gc_state(Zone),
takeover = false, takeover = false,
resuming = false, resuming = false,
pendings = [] pendings = []
@ -223,17 +201,10 @@ init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) ->
peer_cert_as_username(Options) -> peer_cert_as_username(Options) ->
proplists:get_value(peer_cert_as_username, Options). proplists:get_value(peer_cert_as_username, Options).
init_gc_state(Zone) ->
maybe_apply(fun emqx_gc:init/1, emqx_zone:force_gc_policy(Zone)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle incoming packet %% Handle incoming packet
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(recvd(pos_integer(), channel()) -> channel()).
recvd(Bytes, Channel) ->
ensure_timer(stats_timer, maybe_gc_and_check_oom(Bytes, Channel)).
-spec(handle_in(emqx_types:packet(), channel()) -spec(handle_in(emqx_types:packet(), channel())
-> {ok, channel()} -> {ok, channel()}
| {ok, output(), channel()} | {ok, output(), channel()}
@ -258,74 +229,69 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
end; end;
handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
NChannel = inc_pub_stats(publish_in, Channel),
case emqx_packet:check(Packet) of case emqx_packet:check(Packet) of
ok -> handle_publish(Packet, NChannel); ok -> handle_publish(Packet, Channel);
{error, ReasonCode} -> {error, ReasonCode} ->
handle_out(disconnect, ReasonCode, NChannel) handle_out(disconnect, ReasonCode, Channel)
end; end;
handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), handle_in(?PUBACK_PACKET(PacketId, _ReasonCode),
Channel = #channel{clientinfo = ClientInfo, session = Session}) -> Channel = #channel{clientinfo = ClientInfo, session = Session}) ->
NChannel = inc_pub_stats(puback_in, Channel),
case emqx_session:puback(PacketId, Session) of case emqx_session:puback(PacketId, Session) of
{ok, Msg, Publishes, NSession} -> {ok, Msg, Publishes, NSession} ->
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
handle_out({publish, Publishes}, NChannel#channel{session = NSession}); handle_out({publish, Publishes}, Channel#channel{session = NSession});
{ok, Msg, NSession} -> {ok, Msg, NSession} ->
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
{ok, NChannel#channel{session = NSession}}; {ok, Channel#channel{session = NSession}};
{error, ?RC_PACKET_IDENTIFIER_IN_USE} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]), ?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]),
ok = emqx_metrics:inc('packets.puback.inuse'), ok = emqx_metrics:inc('packets.puback.inuse'),
{ok, NChannel}; {ok, Channel};
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]), ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]),
ok = emqx_metrics:inc('packets.puback.missed'), ok = emqx_metrics:inc('packets.puback.missed'),
{ok, NChannel} {ok, Channel}
end; end;
handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), handle_in(?PUBREC_PACKET(PacketId, _ReasonCode),
Channel = #channel{clientinfo = ClientInfo, session = Session}) -> Channel = #channel{clientinfo = ClientInfo, session = Session}) ->
Channel1 = inc_pub_stats(pubrec_in, Channel),
case emqx_session:pubrec(PacketId, Session) of case emqx_session:pubrec(PacketId, Session) of
{ok, Msg, NSession} -> {ok, Msg, NSession} ->
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
NChannel = Channel1#channel{session = NSession}, NChannel = Channel#channel{session = NSession},
handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel); handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel);
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
?LOG(warning, "The PUBREC PacketId ~w is inuse.", [PacketId]), ?LOG(warning, "The PUBREC PacketId ~w is inuse.", [PacketId]),
ok = emqx_metrics:inc('packets.pubrec.inuse'), ok = emqx_metrics:inc('packets.pubrec.inuse'),
handle_out(pubrel, {PacketId, RC}, Channel1); handle_out(pubrel, {PacketId, RC}, Channel);
{error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?LOG(warning, "The PUBREC ~w is not found.", [PacketId]), ?LOG(warning, "The PUBREC ~w is not found.", [PacketId]),
ok = emqx_metrics:inc('packets.pubrec.missed'), ok = emqx_metrics:inc('packets.pubrec.missed'),
handle_out(pubrel, {PacketId, RC}, Channel1) handle_out(pubrel, {PacketId, RC}, Channel)
end; end;
handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) -> handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
Channel1 = inc_pub_stats(pubrel_in, Channel),
case emqx_session:pubrel(PacketId, Session) of case emqx_session:pubrel(PacketId, Session) of
{ok, NSession} -> {ok, NSession} ->
Channel2 = Channel1#channel{session = NSession}, NChannel = Channel#channel{session = NSession},
handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, Channel2); handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel);
{error, NotFound} -> {error, NotFound} ->
ok = emqx_metrics:inc('packets.pubrel.missed'), ok = emqx_metrics:inc('packets.pubrel.missed'),
?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]), ?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]),
handle_out(pubcomp, {PacketId, NotFound}, Channel1) handle_out(pubcomp, {PacketId, NotFound}, Channel)
end; end;
handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) -> handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
Channel1 = inc_pub_stats(pubcomp_in, Channel),
case emqx_session:pubcomp(PacketId, Session) of case emqx_session:pubcomp(PacketId, Session) of
{ok, Publishes, NSession} -> {ok, Publishes, NSession} ->
handle_out({publish, Publishes}, Channel1#channel{session = NSession}); handle_out({publish, Publishes}, Channel#channel{session = NSession});
{ok, NSession} -> {ok, NSession} ->
{ok, Channel1#channel{session = NSession}}; {ok, Channel#channel{session = NSession}};
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId]), ?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId]),
ok = emqx_metrics:inc('packets.pubcomp.missed'), ok = emqx_metrics:inc('packets.pubcomp.missed'),
{ok, Channel1} {ok, Channel}
end; end;
handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
@ -422,11 +388,6 @@ process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart},
%% Process Publish %% Process Publish
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
inc_pub_stats(Key, Channel) -> inc_pub_stats(Key, 1, Channel).
inc_pub_stats(Key, I, Channel = #channel{pub_stats = PubStats}) ->
NPubStats = maps:update_with(Key, fun(V) -> V+I end, I, PubStats),
Channel#channel{pub_stats = NPubStats}.
handle_publish(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId), handle_publish(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId),
Channel = #channel{conninfo = #{proto_ver := ProtoVer}}) -> Channel = #channel{conninfo = #{proto_ver := ProtoVer}}) ->
case pipeline([fun process_alias/2, case pipeline([fun process_alias/2,
@ -540,10 +501,6 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel =
%% Handle outgoing packet %% Handle outgoing packet
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(sent(pos_integer(), channel()) -> channel()).
sent(Bytes, Channel) ->
ensure_timer(stats_timer, maybe_gc_and_check_oom(Bytes, Channel)).
-spec(handle_out(term(), channel()) -spec(handle_out(term(), channel())
-> {ok, channel()} -> {ok, channel()}
| {ok, output(), channel()} | {ok, output(), channel()}
@ -578,8 +535,7 @@ handle_out({publish, Publishes}, Channel) when is_list(Publishes) ->
{ok, _Ch} -> Acc {ok, _Ch} -> Acc
end end
end, [], Publishes), end, [], Publishes),
NChannel = inc_pub_stats(publish_out, length(Packets), Channel), {ok, {outgoing, lists:reverse(Packets)}, Channel};
{ok, {outgoing, lists:reverse(Packets)}, NChannel};
%% Ignore loop deliver %% Ignore loop deliver
handle_out({publish, _PacketId, #message{from = ClientId, handle_out({publish, _PacketId, #message{from = ClientId,
@ -635,16 +591,16 @@ handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnIn
shutdown(Reason, ?CONNACK_PACKET(ReasonCode1), Channel); shutdown(Reason, ?CONNACK_PACKET(ReasonCode1), Channel);
handle_out(puback, {PacketId, ReasonCode}, Channel) -> handle_out(puback, {PacketId, ReasonCode}, Channel) ->
{ok, ?PUBACK_PACKET(PacketId, ReasonCode), inc_pub_stats(puback_out, Channel)}; {ok, ?PUBACK_PACKET(PacketId, ReasonCode), Channel};
handle_out(pubrec, {PacketId, ReasonCode}, Channel) -> handle_out(pubrec, {PacketId, ReasonCode}, Channel) ->
{ok, ?PUBREC_PACKET(PacketId, ReasonCode), inc_pub_stats(pubrec_out, Channel)}; {ok, ?PUBREC_PACKET(PacketId, ReasonCode), Channel};
handle_out(pubrel, {PacketId, ReasonCode}, Channel) -> handle_out(pubrel, {PacketId, ReasonCode}, Channel) ->
{ok, ?PUBREL_PACKET(PacketId, ReasonCode), inc_pub_stats(pubrel_out, Channel)}; {ok, ?PUBREL_PACKET(PacketId, ReasonCode), Channel};
handle_out(pubcomp, {PacketId, ReasonCode}, Channel) -> handle_out(pubcomp, {PacketId, ReasonCode}, Channel) ->
{ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), inc_pub_stats(pubcomp_out, Channel)}; {ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), Channel};
handle_out(suback, {PacketId, ReasonCodes}, handle_out(suback, {PacketId, ReasonCodes},
Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) -> Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) ->
@ -747,11 +703,6 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientI
{_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
{ok, NChannel}; {ok, NChannel};
handle_info({register, Attrs, Stats}, #channel{clientinfo = #{clientid := ClientId}}) ->
ok = emqx_cm:register_channel(ClientId),
emqx_cm:set_chan_attrs(ClientId, Attrs),
emqx_cm:set_chan_stats(ClientId, Stats);
handle_info({sock_closed, _Reason}, Channel = #channel{conn_state = disconnected}) -> handle_info({sock_closed, _Reason}, Channel = #channel{conn_state = disconnected}) ->
{ok, Channel}; {ok, Channel};
@ -788,12 +739,6 @@ handle_info(Info, Channel) ->
-> {ok, channel()} -> {ok, channel()}
| {ok, Result :: term(), channel()} | {ok, Result :: term(), channel()}
| {shutdown, Reason :: term(), channel()}). | {shutdown, Reason :: term(), channel()}).
handle_timeout(TRef, {emit_stats, Stats},
Channel = #channel{clientinfo = #{clientid := ClientId},
timers = #{stats_timer := TRef}}) ->
ok = emqx_cm:set_chan_stats(ClientId, Stats),
{ok, clean_timer(stats_timer, Channel)};
handle_timeout(TRef, {keepalive, StatVal}, handle_timeout(TRef, {keepalive, StatVal},
Channel = #channel{keepalive = Keepalive, Channel = #channel{keepalive = Keepalive,
timers = #{alive_timer := TRef}}) -> timers = #{alive_timer := TRef}}) ->
@ -873,8 +818,6 @@ reset_timer(Name, Time, Channel) ->
clean_timer(Name, Channel = #channel{timers = Timers}) -> clean_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}. Channel#channel{timers = maps:remove(Name, Timers)}.
interval(stats_timer, #channel{clientinfo = #{zone := Zone}}) ->
emqx_zone:get_env(Zone, idle_timeout, 30000);
interval(alive_timer, #channel{keepalive = KeepAlive}) -> interval(alive_timer, #channel{keepalive = KeepAlive}) ->
emqx_keepalive:info(interval, KeepAlive); emqx_keepalive:info(interval, KeepAlive);
interval(retry_timer, #channel{session = Session}) -> interval(retry_timer, #channel{session = Session}) ->
@ -912,6 +855,7 @@ publish_will_msg(undefined) ->
publish_will_msg(Msg) -> publish_will_msg(Msg) ->
emqx_broker:publish(Msg). emqx_broker:publish(Msg).
%% @doc Enrich MQTT Connect Info. %% @doc Enrich MQTT Connect Info.
enrich_conninfo(#mqtt_packet_connect{ enrich_conninfo(#mqtt_packet_connect{
proto_name = ProtoName, proto_name = ProtoName,
@ -1171,7 +1115,7 @@ ensure_keepalive(_AckProps, Channel = #channel{conninfo = ConnInfo}) ->
ensure_keepalive_timer(0, Channel) -> Channel; ensure_keepalive_timer(0, Channel) -> Channel;
ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) -> ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) ->
Backoff = emqx_zone:get_env(Zone, keepalive_backoff, 0.75), Backoff = emqx_zone:keepalive_backoff(Zone),
Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)), Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)),
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
@ -1197,19 +1141,6 @@ is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) ->
parse_topic_filters(TopicFilters) -> parse_topic_filters(TopicFilters) ->
lists:map(fun emqx_topic:parse/1, TopicFilters). lists:map(fun emqx_topic:parse/1, TopicFilters).
%%--------------------------------------------------------------------
%% Maybe GC and Check OOM
%%--------------------------------------------------------------------
maybe_gc_and_check_oom(_Oct, Channel = #channel{gc_state = undefined}) ->
Channel;
maybe_gc_and_check_oom(Oct, Channel = #channel{clientinfo = #{zone := Zone},
gc_state = GCSt}) ->
{IsGC, GCSt1} = emqx_gc:run(1, Oct, GCSt),
IsGC andalso emqx_metrics:inc('channel.gc.cnt'),
IsGC andalso emqx_zone:check_oom(Zone, fun(Shutdown) -> self() ! Shutdown end),
Channel#channel{gc_state = GCSt1}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------