Add emqx_connection:attrs/1, emqx_protocol:attrs/1, emqx_session:attrs/1 APIs

This commit is contained in:
Feng Lee 2018-08-30 03:03:19 +08:00
parent 98824a56c2
commit 53a2f93b7e
6 changed files with 215 additions and 178 deletions

View File

@ -20,7 +20,9 @@
-include("emqx_mqtt.hrl").
-export([start_link/3]).
-export([info/1, stats/1, kick/1]).
-export([info/1, attrs/1]).
-export([stats/1]).
-export([kick/1]).
-export([session/1]).
%% gen_server callbacks
@ -49,7 +51,7 @@
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
-define(LOG(Level, Format, Args, State),
emqx_logger:Level("Client(~s): " ++ Format,
emqx_logger:Level("MQTT(~s): " ++ Format,
[esockd_net:format(State#state.peername) | Args])).
start_link(Transport, Socket, Options) ->
@ -59,17 +61,58 @@ start_link(Transport, Socket, Options) ->
%% API
%%------------------------------------------------------------------------------
info(CPid) ->
call(CPid, info).
%% for debug
info(CPid) when is_pid(CPid) ->
call(CPid, info);
stats(CPid) ->
call(CPid, stats).
info(#state{transport = Transport,
socket = Socket,
peername = Peername,
sockname = Sockname,
conn_state = ConnState,
await_recv = AwaitRecv,
rate_limit = RateLimit,
publish_limit = PubLimit,
proto_state = ProtoState}) ->
ConnInfo = [{socktype, Transport:type(Socket)},
{peername, Peername},
{sockname, Sockname},
{conn_state, ConnState},
{await_recv, AwaitRecv},
{rate_limit, esockd_rate_limit:info(RateLimit)},
{publish_limit, esockd_rate_limit:info(PubLimit)}],
ProtoInfo = emqx_protocol:info(ProtoState),
lists:usort(lists:append(ConnInfo, ProtoInfo)).
kick(CPid) ->
call(CPid, kick).
%% for dashboard
attrs(CPid) when is_pid(CPid) ->
call(CPid, attrs);
session(CPid) ->
call(CPid, session).
attrs(#state{peername = Peername,
sockname = Sockname,
proto_state = ProtoState}) ->
SockAttrs = [{peername, Peername},
{sockname, Sockname}],
ProtoAttrs = emqx_protocol:attrs(ProtoState),
lists:usort(lists:append(SockAttrs, ProtoAttrs)).
%% Conn stats
stats(CPid) when is_pid(CPid) ->
call(CPid, stats);
stats(#state{transport = Transport,
socket = Socket,
proto_state = ProtoState}) ->
lists:append([emqx_misc:proc_stats(),
emqx_protocol:stats(ProtoState),
case Transport:getstat(Socket, ?SOCK_STATS) of
{ok, Ss} -> Ss;
{error, _} -> []
end]).
kick(CPid) -> call(CPid, kick).
session(CPid) -> call(CPid, session).
call(CPid, Req) ->
gen_server:call(CPid, Req, infinity).
@ -131,38 +174,17 @@ send_fun(Transport, Socket, Peername) ->
end
end.
handle_call(info, _From, State = #state{transport = Transport,
socket = Socket,
peername = Peername,
sockname = Sockname,
conn_state = ConnState,
await_recv = AwaitRecv,
rate_limit = RateLimit,
publish_limit = PubLimit,
proto_state = ProtoState}) ->
ConnInfo = [{socktype, Transport:type(Socket)},
{peername, Peername},
{sockname, Sockname},
{conn_state, ConnState},
{await_recv, AwaitRecv},
{rate_limit, esockd_rate_limit:info(RateLimit)},
{publish_limit, esockd_rate_limit:info(PubLimit)}],
ProtoInfo = emqx_protocol:info(ProtoState),
{reply, lists:usort(lists:append([ConnInfo, ProtoInfo])), State};
handle_call(info, _From, State) ->
{reply, info(State), State};
handle_call(stats, _From, State = #state{transport = Transport,
socket = Socket,
proto_state = ProtoState}) ->
ProcStats = emqx_misc:proc_stats(),
ProtoStats = emqx_protocol:stats(ProtoState),
SockStats = case Transport:getstat(Socket, ?SOCK_STATS) of
{ok, Ss} -> Ss;
{error, _} -> []
end,
{reply, lists:append([ProcStats, ProtoStats, SockStats]), State};
handle_call(attrs, _From, State) ->
{reply, attrs(State), State};
handle_call(stats, _From, State) ->
{reply, stats(State), State};
handle_call(kick, _From, State) ->
{stop, {shutdown, kick}, ok, State};
{stop, {shutdown, kicked}, ok, State};
handle_call(session, _From, State = #state{proto_state = ProtoState}) ->
{reply, emqx_protocol:session(ProtoState), State};
@ -186,8 +208,7 @@ handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
end;
handle_info(emit_stats, State = #state{proto_state = ProtoState}) ->
Stats = element(2, handle_call(stats, undefined, State)),
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), Stats),
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
{noreply, State#state{stats_timer = undefined}, hibernate};
handle_info(timeout, State) ->

View File

@ -47,7 +47,7 @@ banned(ClientId) ->
%%--------------------------------------------------------------------
init([]) ->
_ = ets:new(banned, [public, ordered_set, named_table]),
%% ets:new(banned, [public, ordered_set, named_table]),
{ok, #state{}}.
handle_call(_Request, _From, State) ->

View File

@ -19,8 +19,6 @@
%% Memory: (10, 100, 1000)
%%
%%-record
-export([conn_max_gc_count/0, reset_conn_gc_count/2, maybe_force_gc/2,
maybe_force_gc/3]).

View File

@ -28,8 +28,8 @@
load(Topics) ->
emqx_hooks:add('session.created', fun ?MODULE:on_session_created/3, [Topics]).
on_session_created(#{client_id := ClientId}, SessInfo, Topics) ->
Username = proplists:get_value(username, SessInfo),
on_session_created(#{client_id := ClientId}, SessAttrs, Topics) ->
Username = proplists:get_value(username, SessAttrs),
Replace = fun(Topic) ->
rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic))
end,

View File

@ -17,7 +17,11 @@
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-export([init/2, info/1, caps/1, stats/1]).
-export([init/2]).
-export([info/1]).
-export([attrs/1]).
-export([caps/1]).
-export([stats/1]).
-export([client_id/1]).
-export([credentials/1]).
-export([parser/1]).
@ -28,22 +32,6 @@
-export([send/2]).
-export([shutdown/2]).
%%-record(mqtt_client, {
%% client_id :: binary() | undefined,
%% client_pid :: pid(),
%% username :: binary() | undefined,
%% peername :: {inet:ip_address(), inet:port_number()},
%% clean_start :: boolean(),
%% proto_ver :: emqx_mqtt_types:version(),
%% keepalive = 0 :: non_neg_integer(),
%% will_topic :: undefined | binary(),
%% mountpoint :: undefined | binary(),
%% connected_at :: erlang:timestamp(),
%% attributes :: map()
%% }).
-record(pstate, {
zone,
sendfun,
@ -61,6 +49,7 @@
clean_start,
topic_aliases,
packet_size,
will_topic,
will_msg,
keepalive,
mountpoint,
@ -81,7 +70,7 @@
-endif.
-define(LOG(Level, Format, Args, PState),
emqx_logger:Level([{client, PState#pstate.client_id}], "Client(~s@~s): " ++ Format,
emqx_logger:Level([{client, PState#pstate.client_id}], "MQTT(~s@~s): " ++ Format,
[PState#pstate.client_id, esockd_net:format(PState#pstate.peername) | Args])).
%%------------------------------------------------------------------------------
@ -127,33 +116,46 @@ set_username(_Username, PState) ->
%% API
%%------------------------------------------------------------------------------
info(#pstate{zone = Zone,
client_id = ClientId,
username = Username,
peername = Peername,
proto_ver = ProtoVer,
proto_name = ProtoName,
clean_start = CleanStart,
conn_props = ConnProps,
keepalive = Keepalive,
mountpoint = Mountpoint,
is_super = IsSuper,
is_bridge = IsBridge,
connected = Connected,
connected_at = ConnectedAt}) ->
info(PState = #pstate{conn_props = ConnProps,
ack_props = AclProps,
session = Session,
topic_aliases = Aliases,
will_msg = WillMsg,
enable_acl = EnableAcl}) ->
attrs(PState) ++ [{conn_props, ConnProps},
{ack_props, AclProps},
{session, Session},
{topic_aliases, Aliases},
{will_msg, WillMsg},
{enable_acl, EnableAcl}].
attrs(#pstate{zone = Zone,
client_id = ClientId,
username = Username,
peername = Peername,
peercert = Peercert,
clean_start = CleanStart,
proto_ver = ProtoVer,
proto_name = ProtoName,
keepalive = Keepalive,
will_topic = WillTopic,
mountpoint = Mountpoint,
is_super = IsSuper,
is_bridge = IsBridge,
connected_at = ConnectedAt}) ->
[{zone, Zone},
{client_id, ClientId},
{username, Username},
{peername, Peername},
{peercert, Peercert},
{proto_ver, ProtoVer},
{proto_name, ProtoName},
{conn_props, ConnProps},
{clean_start, CleanStart},
{keepalive, Keepalive},
{will_topic, WillTopic},
{mountpoint, Mountpoint},
{is_super, IsSuper},
{is_bridge, IsBridge},
{connected, Connected},
{connected_at, ConnectedAt}].
caps(#pstate{zone = Zone}) ->
@ -254,6 +256,7 @@ process_packet(?CONNECT_PACKET(
clean_start = CleanStart,
keepalive = Keepalive,
properties = ConnProps,
will_topic = WillTopic,
client_id = ClientId,
username = Username,
password = Password} = Connect), PState) ->
@ -269,9 +272,9 @@ process_packet(?CONNECT_PACKET(
clean_start = CleanStart,
keepalive = Keepalive,
conn_props = ConnProps,
will_topic = WillTopic,
will_msg = WillMsg,
is_bridge = IsBridge,
connected = true,
connected_at = os:timestamp()}),
connack(
@ -284,8 +287,8 @@ process_packet(?CONNECT_PACKET(
%% Open session
case try_open_session(PState3) of
{ok, SPid, SP} ->
PState4 = PState3#pstate{session = SPid},
ok = emqx_cm:register_connection(client_id(PState4), info(PState4)),
PState4 = PState3#pstate{session = SPid, connected = true},
ok = emqx_cm:register_connection(client_id(PState4), attrs(PState4)),
%% Start keepalive
start_keepalive(Keepalive, PState4),
%% Success
@ -521,7 +524,8 @@ try_open_session(#pstate{zone = Zone,
username => Username,
clean_start => CleanStart,
conn_props => ConnProps}) of
{ok, SPid} -> {ok, SPid, false};
{ok, SPid} ->
{ok, SPid, false};
Other -> Other
end.

View File

@ -44,7 +44,8 @@
-include("emqx_mqtt.hrl").
-export([start_link/1]).
-export([info/1, stats/1]).
-export([info/1, attrs/1]).
-export([stats/1]).
-export([resume/2, discard/2]).
-export([subscribe/2, subscribe/4]).
-export([publish/3]).
@ -94,8 +95,8 @@
%% Client <- Broker: Inflight QoS1, QoS2 messages sent to the client but unacked.
inflight :: emqx_inflight:inflight(),
%% Max Inflight Size
max_inflight = 32 :: non_neg_integer(),
%% Max Inflight Size. DEPRECATED: Get from inflight
%% max_inflight = 32 :: non_neg_integer(),
%% Retry interval for redelivering QoS1/2 messages
retry_interval = 20000 :: timeout(),
@ -145,11 +146,6 @@
-define(TIMEOUT, 60000).
-define(INFO_KEYS, [clean_start, client_id, username, binding, conn_pid, old_conn_pid,
next_pkt_id, max_subscriptions, subscriptions, upgrade_qos, inflight,
max_inflight, retry_interval, mqueue, awaiting_rel, max_awaiting_rel,
await_rel_timeout, expiry_interval, enable_stats, created_at]).
-define(LOG(Level, Format, Args, State),
emqx_logger:Level([{client, State#state.client_id}],
"Session(~s): " ++ Format, [State#state.client_id | Args])).
@ -160,6 +156,77 @@ start_link(SessAttrs) ->
IdleTimeout = maps:get(idle_timeout, SessAttrs, 30000),
gen_server:start_link(?MODULE, SessAttrs, [{hibernate_after, IdleTimeout}]).
%% @doc Get session info
-spec(info(pid() | #state{}) -> list({atom(), term()})).
info(SPid) when is_pid(SPid) ->
gen_server:call(SPid, info, infinity);
info(State = #state{conn_pid = ConnPid,
next_pkt_id = PktId,
max_subscriptions = MaxSubscriptions,
subscriptions = Subscriptions,
upgrade_qos = UpgradeQoS,
inflight = Inflight,
retry_interval = RetryInterval,
mqueue = MQueue,
awaiting_rel = AwaitingRel,
max_awaiting_rel = MaxAwaitingRel,
await_rel_timeout = AwaitRelTimeout}) ->
attrs(State) ++ [{conn_pid, ConnPid},
{next_pkt_id, PktId},
{max_subscriptions, MaxSubscriptions},
{subscriptions, Subscriptions},
{upgrade_qos, UpgradeQoS},
{inflight, Inflight},
{retry_interval, RetryInterval},
{mqueue_len, MQueue},
{awaiting_rel, AwaitingRel},
{max_awaiting_rel, MaxAwaitingRel},
{await_rel_timeout, AwaitRelTimeout}].
%% @doc Get session attrs
-spec(attrs(pid() | #state{}) -> list({atom(), term()})).
attrs(SPid) when is_pid(SPid) ->
gen_server:call(SPid, attrs, infinity);
attrs(#state{clean_start = CleanStart,
binding = Binding,
client_id = ClientId,
username = Username,
expiry_interval = ExpiryInterval,
created_at = CreatedAt}) ->
[{clean_start, CleanStart},
{binding, Binding},
{client_id, ClientId},
{username, Username},
{expiry_interval, ExpiryInterval div 1000},
{created_at, CreatedAt}].
-spec(stats(pid() | #state{}) -> list({atom(), non_neg_integer()})).
stats(SPid) when is_pid(SPid) ->
gen_server:call(SPid, stats, infinity);
stats(#state{max_subscriptions = MaxSubscriptions,
subscriptions = Subscriptions,
inflight = Inflight,
mqueue = MQueue,
max_awaiting_rel = MaxAwaitingRel,
awaiting_rel = AwaitingRel,
deliver_stats = DeliverMsg,
enqueue_stats = EnqueueMsg}) ->
lists:append(emqx_misc:proc_stats(),
[{max_subscriptions, MaxSubscriptions},
{subscriptions_num, maps:size(Subscriptions)},
{max_inflight, emqx_inflight:max_size(Inflight)},
{inflight_len, emqx_inflight:size(Inflight)},
{max_mqueue, emqx_mqueue:max_len(MQueue)},
{mqueue_len, emqx_mqueue:len(MQueue)},
{mqueue_dropped, emqx_mqueue:dropped(MQueue)},
{max_awaiting_rel, MaxAwaitingRel},
{awaiting_rel_len, maps:size(AwaitingRel)},
{deliver_msg, DeliverMsg},
{enqueue_msg, EnqueueMsg}]).
%%------------------------------------------------------------------------------
%% PubSub API
%%------------------------------------------------------------------------------
@ -229,70 +296,6 @@ unsubscribe(SPid, PacketId, Properties, TopicFilters) ->
resume(SPid, ConnPid) ->
gen_server:cast(SPid, {resume, ConnPid}).
%% @doc Get session info
-spec(info(pid() | #state{}) -> list(tuple())).
info(SPid) when is_pid(SPid) ->
gen_server:call(SPid, info);
info(#state{clean_start = CleanStart,
binding = Binding,
client_id = ClientId,
username = Username,
max_subscriptions = MaxSubscriptions,
subscriptions = Subscriptions,
upgrade_qos = UpgradeQoS,
inflight = Inflight,
max_inflight = MaxInflight,
retry_interval = RetryInterval,
mqueue = MQueue,
awaiting_rel = AwaitingRel,
max_awaiting_rel = MaxAwaitingRel,
await_rel_timeout = AwaitRelTimeout,
expiry_interval = ExpiryInterval,
created_at = CreatedAt}) ->
[{clean_start, CleanStart},
{binding, Binding},
{client_id, ClientId},
{username, Username},
{max_subscriptions, MaxSubscriptions},
{subscriptions, maps:size(Subscriptions)},
{upgrade_qos, UpgradeQoS},
{inflight, emqx_inflight:size(Inflight)},
{max_inflight, MaxInflight},
{retry_interval, RetryInterval},
{mqueue_len, emqx_mqueue:len(MQueue)},
{awaiting_rel, maps:size(AwaitingRel)},
{max_awaiting_rel, MaxAwaitingRel},
{await_rel_timeout, AwaitRelTimeout},
{expiry_interval, ExpiryInterval},
{created_at, CreatedAt}].
-spec(stats(pid() | #state{}) -> list({atom(), non_neg_integer()})).
stats(SPid) when is_pid(SPid) ->
gen_server:call(SPid, stats, infinity);
stats(#state{max_subscriptions = MaxSubscriptions,
subscriptions = Subscriptions,
inflight = Inflight,
max_inflight = MaxInflight,
mqueue = MQueue,
max_awaiting_rel = MaxAwaitingRel,
awaiting_rel = AwaitingRel,
deliver_stats = DeliverMsg,
enqueue_stats = EnqueueMsg}) ->
lists:append(emqx_misc:proc_stats(),
[{max_subscriptions, MaxSubscriptions},
{subscriptions, maps:size(Subscriptions)},
{max_inflight, MaxInflight},
{inflight_len, emqx_inflight:size(Inflight)},
{max_mqueue, emqx_mqueue:max_len(MQueue)},
{mqueue_len, emqx_mqueue:len(MQueue)},
{mqueue_dropped, emqx_mqueue:dropped(MQueue)},
{max_awaiting_rel, MaxAwaitingRel},
{awaiting_rel_len, maps:size(AwaitingRel)},
{deliver_msg, DeliverMsg},
{enqueue_msg, EnqueueMsg}]).
%% @doc Discard the session
-spec(discard(pid(), emqx_types:client_id()) -> ok).
discard(SPid, ClientId) ->
@ -311,7 +314,7 @@ init(#{zone := Zone,
username := Username,
conn_pid := ConnPid,
clean_start := CleanStart,
conn_props := _ConnProps}) ->
conn_props := ConnProps}) ->
process_flag(trap_exit, true),
true = link(ConnPid),
MaxInflight = get_env(Zone, max_inflight),
@ -323,21 +326,26 @@ init(#{zone := Zone,
subscriptions = #{},
max_subscriptions = get_env(Zone, max_subscriptions, 0),
upgrade_qos = get_env(Zone, upgrade_qos, false),
max_inflight = MaxInflight,
inflight = emqx_inflight:new(MaxInflight),
mqueue = init_mqueue(Zone, ClientId),
retry_interval = get_env(Zone, retry_interval, 0),
awaiting_rel = #{},
await_rel_timeout = get_env(Zone, await_rel_timeout),
max_awaiting_rel = get_env(Zone, max_awaiting_rel),
expiry_interval = get_env(Zone, session_expiry_interval),
expiry_interval = expire_interval(Zone, ConnProps),
enable_stats = get_env(Zone, enable_stats, true),
deliver_stats = 0,
enqueue_stats = 0,
deliver_stats = 0,
enqueue_stats = 0,
created_at = os:timestamp()},
emqx_sm:register_session(ClientId, info(State)),
emqx_sm:register_session(ClientId, [{zone, Zone} | attrs(State)]),
emqx_sm:set_session_stats(ClientId, stats(State)),
emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
{ok, ensure_stats_timer(State), hibernate}.
{ok, State}.
expire_interval(_Zone, #{'Session-Expiry-Interval' := I}) ->
I * 1000;
expire_interval(Zone, _ConnProps) -> %% Maybe v3.1.1
get_env(Zone, session_expiry_interval, 0).
init_mqueue(Zone, ClientId) ->
emqx_mqueue:new(ClientId, #{type => simple,
@ -399,6 +407,9 @@ handle_call({pubrel, PacketId, _ReasonCode}, _From,
handle_call(info, _From, State) ->
reply(info(State), State);
handle_call(attrs, _From, State) ->
reply(attrs(State), State);
handle_call(stats, _From, State) ->
reply(stats(State), State);
@ -501,7 +512,7 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId,
%% Clean Session: true -> false?
CleanStart andalso emqx_sm:set_session_attrs(ClientId, info(State1)),
emqx_hooks:run('session.resumed', [#{client_id => ClientId}, info(State)]),
emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]),
%% Replay delivery and Dequeue pending messages
{noreply, ensure_stats_timer(dequeue(retry_delivery(true, State1)))};
@ -541,20 +552,18 @@ handle_info({timeout, _Timer, expired}, State) ->
?LOG(info, "Expired, shutdown now.", [], State),
shutdown(expired, State);
handle_info({'EXIT', ConnPid, _Reason}, State = #state{clean_start= true, conn_pid = ConnPid}) ->
{stop, normal, State};
handle_info({'EXIT', ConnPid, Reason}, State = #state{clean_start = true, conn_pid = ConnPid}) ->
{stop, Reason, State};
handle_info({'EXIT', ConnPid, Reason}, State = #state{clean_start = false,
conn_pid = ConnPid,
expiry_interval = Interval}) ->
?LOG(info, "Connection ~p EXIT for ~p", [ConnPid, Reason], State),
ExpireTimer = emqx_misc:start_timer(Interval, expired),
State1 = State#state{conn_pid = undefined, expiry_timer = ExpireTimer},
{noreply, State1};
handle_info({'EXIT', ConnPid, Reason}, State = #state{expiry_interval = 0, conn_pid = ConnPid}) ->
{stop, Reason, State};
handle_info({'EXIT', Pid, _Reason}, State = #state{old_conn_pid = Pid}) ->
handle_info({'EXIT', ConnPid, _Reason}, State = #state{clean_start = false, conn_pid = ConnPid}) ->
{noreply, ensure_expire_timer(State#state{conn_pid = undefined})};
handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) ->
%% ignore
{noreply, State, hibernate};
{noreply, State#state{old_conn_pid = undefined}, hibernate};
handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) ->
?LOG(error, "unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p",
@ -571,6 +580,7 @@ handle_info(Info, State) ->
terminate(Reason, #state{client_id = ClientId}) ->
emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]),
%%TODO: notify conn_pid to shutdown?
emqx_sm:unregister_session(ClientId).
code_change(_OldVsn, State, _Extra) ->
@ -819,6 +829,11 @@ ensure_await_rel_timer(State = #state{await_rel_timer = undefined, await_rel_tim
ensure_await_rel_timer(State) ->
State.
ensure_expire_timer(State = #state{expiry_interval = Interval}) when Interval > 0 ->
State#state{expiry_timer = emqx_misc:start_timer(Interval, expired)};
ensure_expire_timer(State) ->
State.
%%------------------------------------------------------------------------------
%% Reset Dup
@ -837,8 +852,7 @@ next_pkt_id(State = #state{next_pkt_id = Id}) ->
%%------------------------------------------------------------------------------
%% Ensure stats timer
ensure_stats_timer(State = #state{enable_stats = true,
stats_timer = undefined}) ->
ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined}) ->
State#state{stats_timer = erlang:send_after(30000, self(), emit_stats)};
ensure_stats_timer(State) ->
State.
@ -857,5 +871,5 @@ reply(Reply, State) ->
shutdown(Reason, State) ->
{stop, {shutdown, Reason}, State}.
%%TODO: maybe_gc(State) -> State.
%% TODO: maybe_gc(State) -> State.