diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 9bbb63bf4..ea052f05f 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -27,11 +27,6 @@ -export([ info/1 , info/2 , attrs/1 - , stats/1 - ]). - --export([ client_id/1 - , session/1 ]). -export([ init/2 @@ -45,48 +40,75 @@ -export_type([proto_state/0]). -record(protocol, { - proto_name :: binary(), - proto_ver :: emqx_types:version(), - client :: emqx_types:client(), - session :: emqx_session:session(), - mountfun :: fun((emqx_topic:topic()) -> emqx_topic:topic()), - keepalive :: non_neg_integer(), - will_msg :: emqx_types:message(), - enable_acl :: boolean(), - is_bridge :: boolean(), - connected :: boolean(), - connected_at :: erlang:timestamp(), + proto_name :: binary(), + proto_ver :: emqx_types:version(), + client :: emqx_types:client(), + session :: emqx_session:session(), + mountfun :: fun((emqx_topic:topic()) -> emqx_topic:topic()), + keepalive :: non_neg_integer(), + will_msg :: emqx_types:message(), + enable_acl :: boolean(), + is_bridge :: boolean(), topic_aliases :: map(), alias_maximum :: map() - }). + }). -opaque(proto_state() :: #protocol{}). -info(#protocol{client = Client, session = Session}) -> - lists:append([maps:to_list(Client), emqx_session:info(Session)]). +-spec(info(proto_state()) -> emqx_types:infos()). +info(#protocol{proto_name = ProtoName, + proto_ver = ProtoVer, + client = Client, + session = Session, + keepalive = Keepalive, + will_msg = WillMsg, + enable_acl = EnableAcl, + is_bridge = IsBridge, + topic_aliases = Aliases}) -> + #{proto_name => ProtoName, + proto_ver => ProtoVer, + client => Client, + session => emqx_session:info(Session), + keepalive => Keepalive, + will_msg => WillMsg, + enable_acl => EnableAcl, + is_bridge => IsBridge, + topic_aliases => Aliases + }. -info(zone, #protocol{client = #{zone := Zone}}) -> - Zone; +-spec(info(atom(), proto_state()) -> term()). info(proto_name, #protocol{proto_name = ProtoName}) -> ProtoName; info(proto_ver, #protocol{proto_ver = ProtoVer}) -> ProtoVer; +info(client, #protocol{client = Client}) -> + Client; +info(zone, #protocol{client = #{zone := Zone}}) -> + Zone; +info(client_id, #protocol{client = #{client_id := ClientId}}) -> + ClientId; +info(session, #protocol{session = Session}) -> + Session; info(keepalive, #protocol{keepalive = Keepalive}) -> - Keepalive. + Keepalive; +info(is_bridge, #protocol{is_bridge = IsBridge}) -> + IsBridge; +info(topic_aliases, #protocol{topic_aliases = Aliases}) -> + Aliases. -attrs(#protocol{}) -> - #{}. - -stats(#protocol{}) -> - []. - --spec(client_id(proto_state()) -> emqx_types:client_id()). -client_id(#protocol{client = #{client_id := ClientId}}) -> - ClientId. - --spec(session(proto_state()) -> emqx_session:session()). -session(#protocol{session = Session}) -> - Session. +attrs(#protocol{proto_name = ProtoName, + proto_ver = ProtoVer, + client = Client, + session = Session, + keepalive = Keepalive, + is_bridge = IsBridge}) -> + #{proto_name => ProtoName, + proto_ver => ProtoVer, + client => Client, + session => emqx_session:attrs(Session), + keepalive => Keepalive, + is_bridge => IsBridge + }. -spec(init(map(), proplists:proplist()) -> proto_state()). init(ConnInfo, Options) -> @@ -105,8 +127,7 @@ init(ConnInfo, Options) -> #protocol{client = Client, mountfun = MountFun, enable_acl = EnableAcl, - is_bridge = false, - connected = false + is_bridge = false }. peer_cert_as_username(Peercert, Options) -> @@ -382,10 +403,7 @@ handle_connect(#mqtt_packet_connect{proto_name = ProtoName, case open_session(ConnPkt, PState) of {ok, Session, SP} -> PState1 = PState#protocol{client = Client1, - session = Session, - connected = true, - connected_at = os:timestamp() - }, + session = Session}, ok = emqx_cm:register_channel(ClientId), {ok, SP, PState1}; {error, Error} -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index e5978142b..95f8b3727 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -48,9 +48,12 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Session]"). + -export([init/1]). -export([ info/1 + , attrs/1 , stats/1 ]). @@ -80,12 +83,12 @@ %% Clean Start Flag clean_start :: boolean(), - %% Max subscriptions allowed - max_subscriptions :: non_neg_integer(), - %% Client’s Subscriptions. subscriptions :: map(), + %% Max subscriptions allowed + max_subscriptions :: non_neg_integer(), + %% Upgrade QoS? upgrade_qos :: boolean(), @@ -112,12 +115,12 @@ %% Inflight QoS2 messages received from client and waiting for pubrel. awaiting_rel :: map(), - %% Awaiting PUBREL Timer - await_rel_timer :: maybe(reference()), - %% Max Packets Awaiting PUBREL max_awaiting_rel :: non_neg_integer(), + %% Awaiting PUBREL Timer + await_rel_timer :: maybe(reference()), + %% Awaiting PUBREL Timeout await_rel_timeout :: timeout(), @@ -133,8 +136,6 @@ -opaque(session() :: #session{}). --logger_header("[Session]"). - -define(DEFAULT_BATCH_N, 1000). %%-------------------------------------------------------------------- @@ -170,10 +171,10 @@ init_mqueue(Zone) -> }). %%-------------------------------------------------------------------- -%% Info, Stats of Session +%% Infos of the session %%-------------------------------------------------------------------- --spec(info(session()) -> proplists:proplist()). +-spec(info(session()) -> emqx_types:infos()). info(#session{clean_start = CleanStart, max_subscriptions = MaxSubscriptions, subscriptions = Subscriptions, @@ -187,37 +188,58 @@ info(#session{clean_start = CleanStart, await_rel_timeout = AwaitRelTimeout, expiry_interval = ExpiryInterval, created_at = CreatedAt}) -> - [{clean_start, CleanStart}, - {max_subscriptions, MaxSubscriptions}, - {subscriptions, Subscriptions}, - {upgrade_qos, UpgradeQoS}, - {inflight, Inflight}, - {retry_interval, RetryInterval}, - {mqueue_len, emqx_mqueue:len(MQueue)}, - {next_pkt_id, PacketId}, - {awaiting_rel, AwaitingRel}, - {max_awaiting_rel, MaxAwaitingRel}, - {await_rel_timeout, AwaitRelTimeout}, - {expiry_interval, ExpiryInterval div 1000}, - {created_at, CreatedAt}]. + #{clean_start => CleanStart, + subscriptions => Subscriptions, + max_subscriptions => MaxSubscriptions, + upgrade_qos => UpgradeQoS, + inflight => emqx_inflight:size(Inflight), + max_inflight => emqx_inflight:max_size(Inflight), + retry_interval => RetryInterval, + mqueue_len => emqx_mqueue:len(MQueue), + max_mqueue => emqx_mqueue:max_len(MQueue), + mqueue_dropped => emqx_mqueue:dropped(MQueue), + next_pkt_id => PacketId, + awaiting_rel => maps:size(AwaitingRel), + max_awaiting_rel => MaxAwaitingRel, + await_rel_timeout => AwaitRelTimeout, + expiry_interval => ExpiryInterval div 1000, + created_at => CreatedAt + }. -%% @doc Get session stats. --spec(stats(session()) -> list({atom(), non_neg_integer()})). -stats(#session{max_subscriptions = MaxSubscriptions, - subscriptions = Subscriptions, +%%-------------------------------------------------------------------- +%% Attrs of the session +%%-------------------------------------------------------------------- + +-spec(attrs(session()) -> emqx_types:attrs()). +attrs(#session{clean_start = CleanStart, + expiry_interval = ExpiryInterval, + created_at = CreatedAt}) -> + #{clean_start => CleanStart, + expiry_interval => ExpiryInterval, + created_at => CreatedAt + }. + +%%-------------------------------------------------------------------- +%% Stats of the session +%%-------------------------------------------------------------------- + +%% @doc Get stats of the session. +-spec(stats(session()) -> emqx_types:stats()). +stats(#session{subscriptions = Subscriptions, + max_subscriptions = MaxSubscriptions, inflight = Inflight, mqueue = MQueue, - max_awaiting_rel = MaxAwaitingRel, - awaiting_rel = AwaitingRel}) -> - [{max_subscriptions, MaxSubscriptions}, - {subscriptions_count, maps:size(Subscriptions)}, - {max_inflight, emqx_inflight:max_size(Inflight)}, - {inflight_len, emqx_inflight:size(Inflight)}, - {max_mqueue, emqx_mqueue:max_len(MQueue)}, - {mqueue_len, emqx_mqueue:len(MQueue)}, - {mqueue_dropped, emqx_mqueue:dropped(MQueue)}, - {max_awaiting_rel, MaxAwaitingRel}, - {awaiting_rel_len, maps:size(AwaitingRel)}]. + awaiting_rel = AwaitingRel, + max_awaiting_rel = MaxAwaitingRel}) -> + [{subscriptions, maps:size(Subscriptions)}, + {max_subscriptions, MaxSubscriptions}, + {inflight, emqx_inflight:size(Inflight)}, + {max_inflight, emqx_inflight:max_size(Inflight)}, + {mqueue_len, emqx_mqueue:len(MQueue)}, + {max_mqueue, emqx_mqueue:max_len(MQueue)}, + {mqueue_dropped, emqx_mqueue:dropped(MQueue)}, + {awaiting_rel, maps:size(AwaitingRel)}, + {max_awaiting_rel, MaxAwaitingRel}]. %%-------------------------------------------------------------------- %% Client -> Broker: SUBSCRIBE diff --git a/src/emqx_types.erl b/src/emqx_types.erl index e4ec9ec75..490284039 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -72,6 +72,11 @@ , command/0 ]). +-export_type([ infos/0 + , attrs/0 + , stats/0 + ]). + -type(zone() :: emqx_zone:zone()). -type(ver() :: ?MQTT_PROTO_V3 | ?MQTT_PROTO_V4 | ?MQTT_PROTO_V5). -type(qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2). @@ -87,7 +92,6 @@ rap := 0 | 1, nl := 0 | 1, qos := qos(), - rc => reason_code(), share => binary(), atom() => term() }). @@ -143,3 +147,7 @@ -type(plugin() :: #plugin{}). -type(command() :: #command{}). +-type(infos() :: #{atom() => term()}). +-type(attrs() :: #{atom() => term()}). +-type(stats() :: list({atom(), non_neg_integer()})). +