Improve the 'info/1', 'attrs/1' and 'stats/1' APIs

This commit is contained in:
Feng Lee 2019-07-25 09:26:36 +08:00
parent b4a0a1c228
commit 64148ac0e8
3 changed files with 128 additions and 80 deletions

View File

@ -27,11 +27,6 @@
-export([ info/1
, info/2
, attrs/1
, stats/1
]).
-export([ client_id/1
, session/1
]).
-export([ init/2
@ -54,39 +49,66 @@
will_msg :: emqx_types:message(),
enable_acl :: boolean(),
is_bridge :: boolean(),
connected :: boolean(),
connected_at :: erlang:timestamp(),
topic_aliases :: map(),
alias_maximum :: map()
}).
-opaque(proto_state() :: #protocol{}).
info(#protocol{client = Client, session = Session}) ->
lists:append([maps:to_list(Client), emqx_session:info(Session)]).
-spec(info(proto_state()) -> emqx_types:infos()).
info(#protocol{proto_name = ProtoName,
proto_ver = ProtoVer,
client = Client,
session = Session,
keepalive = Keepalive,
will_msg = WillMsg,
enable_acl = EnableAcl,
is_bridge = IsBridge,
topic_aliases = Aliases}) ->
#{proto_name => ProtoName,
proto_ver => ProtoVer,
client => Client,
session => emqx_session:info(Session),
keepalive => Keepalive,
will_msg => WillMsg,
enable_acl => EnableAcl,
is_bridge => IsBridge,
topic_aliases => Aliases
}.
info(zone, #protocol{client = #{zone := Zone}}) ->
Zone;
-spec(info(atom(), proto_state()) -> term()).
info(proto_name, #protocol{proto_name = ProtoName}) ->
ProtoName;
info(proto_ver, #protocol{proto_ver = ProtoVer}) ->
ProtoVer;
info(client, #protocol{client = Client}) ->
Client;
info(zone, #protocol{client = #{zone := Zone}}) ->
Zone;
info(client_id, #protocol{client = #{client_id := ClientId}}) ->
ClientId;
info(session, #protocol{session = Session}) ->
Session;
info(keepalive, #protocol{keepalive = Keepalive}) ->
Keepalive.
Keepalive;
info(is_bridge, #protocol{is_bridge = IsBridge}) ->
IsBridge;
info(topic_aliases, #protocol{topic_aliases = Aliases}) ->
Aliases.
attrs(#protocol{}) ->
#{}.
stats(#protocol{}) ->
[].
-spec(client_id(proto_state()) -> emqx_types:client_id()).
client_id(#protocol{client = #{client_id := ClientId}}) ->
ClientId.
-spec(session(proto_state()) -> emqx_session:session()).
session(#protocol{session = Session}) ->
Session.
attrs(#protocol{proto_name = ProtoName,
proto_ver = ProtoVer,
client = Client,
session = Session,
keepalive = Keepalive,
is_bridge = IsBridge}) ->
#{proto_name => ProtoName,
proto_ver => ProtoVer,
client => Client,
session => emqx_session:attrs(Session),
keepalive => Keepalive,
is_bridge => IsBridge
}.
-spec(init(map(), proplists:proplist()) -> proto_state()).
init(ConnInfo, Options) ->
@ -105,8 +127,7 @@ init(ConnInfo, Options) ->
#protocol{client = Client,
mountfun = MountFun,
enable_acl = EnableAcl,
is_bridge = false,
connected = false
is_bridge = false
}.
peer_cert_as_username(Peercert, Options) ->
@ -382,10 +403,7 @@ handle_connect(#mqtt_packet_connect{proto_name = ProtoName,
case open_session(ConnPkt, PState) of
{ok, Session, SP} ->
PState1 = PState#protocol{client = Client1,
session = Session,
connected = true,
connected_at = os:timestamp()
},
session = Session},
ok = emqx_cm:register_channel(ClientId),
{ok, SP, PState1};
{error, Error} ->

View File

@ -48,9 +48,12 @@
-include("logger.hrl").
-include("types.hrl").
-logger_header("[Session]").
-export([init/1]).
-export([ info/1
, attrs/1
, stats/1
]).
@ -80,12 +83,12 @@
%% Clean Start Flag
clean_start :: boolean(),
%% Max subscriptions allowed
max_subscriptions :: non_neg_integer(),
%% Clients Subscriptions.
subscriptions :: map(),
%% Max subscriptions allowed
max_subscriptions :: non_neg_integer(),
%% Upgrade QoS?
upgrade_qos :: boolean(),
@ -112,12 +115,12 @@
%% Inflight QoS2 messages received from client and waiting for pubrel.
awaiting_rel :: map(),
%% Awaiting PUBREL Timer
await_rel_timer :: maybe(reference()),
%% Max Packets Awaiting PUBREL
max_awaiting_rel :: non_neg_integer(),
%% Awaiting PUBREL Timer
await_rel_timer :: maybe(reference()),
%% Awaiting PUBREL Timeout
await_rel_timeout :: timeout(),
@ -133,8 +136,6 @@
-opaque(session() :: #session{}).
-logger_header("[Session]").
-define(DEFAULT_BATCH_N, 1000).
%%--------------------------------------------------------------------
@ -170,10 +171,10 @@ init_mqueue(Zone) ->
}).
%%--------------------------------------------------------------------
%% Info, Stats of Session
%% Infos of the session
%%--------------------------------------------------------------------
-spec(info(session()) -> proplists:proplist()).
-spec(info(session()) -> emqx_types:infos()).
info(#session{clean_start = CleanStart,
max_subscriptions = MaxSubscriptions,
subscriptions = Subscriptions,
@ -187,37 +188,58 @@ info(#session{clean_start = CleanStart,
await_rel_timeout = AwaitRelTimeout,
expiry_interval = ExpiryInterval,
created_at = CreatedAt}) ->
[{clean_start, CleanStart},
{max_subscriptions, MaxSubscriptions},
{subscriptions, Subscriptions},
{upgrade_qos, UpgradeQoS},
{inflight, Inflight},
{retry_interval, RetryInterval},
{mqueue_len, emqx_mqueue:len(MQueue)},
{next_pkt_id, PacketId},
{awaiting_rel, AwaitingRel},
{max_awaiting_rel, MaxAwaitingRel},
{await_rel_timeout, AwaitRelTimeout},
{expiry_interval, ExpiryInterval div 1000},
{created_at, CreatedAt}].
#{clean_start => CleanStart,
subscriptions => Subscriptions,
max_subscriptions => MaxSubscriptions,
upgrade_qos => UpgradeQoS,
inflight => emqx_inflight:size(Inflight),
max_inflight => emqx_inflight:max_size(Inflight),
retry_interval => RetryInterval,
mqueue_len => emqx_mqueue:len(MQueue),
max_mqueue => emqx_mqueue:max_len(MQueue),
mqueue_dropped => emqx_mqueue:dropped(MQueue),
next_pkt_id => PacketId,
awaiting_rel => maps:size(AwaitingRel),
max_awaiting_rel => MaxAwaitingRel,
await_rel_timeout => AwaitRelTimeout,
expiry_interval => ExpiryInterval div 1000,
created_at => CreatedAt
}.
%% @doc Get session stats.
-spec(stats(session()) -> list({atom(), non_neg_integer()})).
stats(#session{max_subscriptions = MaxSubscriptions,
subscriptions = Subscriptions,
%%--------------------------------------------------------------------
%% Attrs of the session
%%--------------------------------------------------------------------
-spec(attrs(session()) -> emqx_types:attrs()).
attrs(#session{clean_start = CleanStart,
expiry_interval = ExpiryInterval,
created_at = CreatedAt}) ->
#{clean_start => CleanStart,
expiry_interval => ExpiryInterval,
created_at => CreatedAt
}.
%%--------------------------------------------------------------------
%% Stats of the session
%%--------------------------------------------------------------------
%% @doc Get stats of the session.
-spec(stats(session()) -> emqx_types:stats()).
stats(#session{subscriptions = Subscriptions,
max_subscriptions = MaxSubscriptions,
inflight = Inflight,
mqueue = MQueue,
max_awaiting_rel = MaxAwaitingRel,
awaiting_rel = AwaitingRel}) ->
[{max_subscriptions, MaxSubscriptions},
{subscriptions_count, maps:size(Subscriptions)},
awaiting_rel = AwaitingRel,
max_awaiting_rel = MaxAwaitingRel}) ->
[{subscriptions, maps:size(Subscriptions)},
{max_subscriptions, MaxSubscriptions},
{inflight, emqx_inflight:size(Inflight)},
{max_inflight, emqx_inflight:max_size(Inflight)},
{inflight_len, emqx_inflight:size(Inflight)},
{max_mqueue, emqx_mqueue:max_len(MQueue)},
{mqueue_len, emqx_mqueue:len(MQueue)},
{max_mqueue, emqx_mqueue:max_len(MQueue)},
{mqueue_dropped, emqx_mqueue:dropped(MQueue)},
{max_awaiting_rel, MaxAwaitingRel},
{awaiting_rel_len, maps:size(AwaitingRel)}].
{awaiting_rel, maps:size(AwaitingRel)},
{max_awaiting_rel, MaxAwaitingRel}].
%%--------------------------------------------------------------------
%% Client -> Broker: SUBSCRIBE

View File

@ -72,6 +72,11 @@
, command/0
]).
-export_type([ infos/0
, attrs/0
, stats/0
]).
-type(zone() :: emqx_zone:zone()).
-type(ver() :: ?MQTT_PROTO_V3 | ?MQTT_PROTO_V4 | ?MQTT_PROTO_V5).
-type(qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2).
@ -87,7 +92,6 @@
rap := 0 | 1,
nl := 0 | 1,
qos := qos(),
rc => reason_code(),
share => binary(),
atom() => term()
}).
@ -143,3 +147,7 @@
-type(plugin() :: #plugin{}).
-type(command() :: #command{}).
-type(infos() :: #{atom() => term()}).
-type(attrs() :: #{atom() => term()}).
-type(stats() :: list({atom(), non_neg_integer()})).