From 5a612657bee357daefa1428287d6e72db3d72003 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 10 Aug 2015 12:48:28 +0800 Subject: [PATCH] session --- src/emqttd_client.erl | 9 ++++- src/emqttd_ctl.erl | 90 ++++++++++++++++++++++++++++++++++++++++-- src/emqttd_plugins.erl | 3 +- src/emqttd_session.erl | 28 +++++++++++-- 4 files changed, 121 insertions(+), 9 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index b55326061..c2c08607f 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_client). -author("Feng Lee "). @@ -33,7 +34,7 @@ -include("emqttd_protocol.hrl"). %% API Function Exports --export([start_link/2, info/1]). +-export([start_link/2, info/1, kick/1]). -behaviour(gen_server). @@ -60,6 +61,9 @@ start_link(SockArgs, PktOpts) -> info(Pid) -> gen_server:call(Pid, info, infinity). +kick(Pid) -> + gen_server:call(Pid, kick). + init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) -> % Transform if ssl. {ok, NewSock} = esockd_connection:accept(SockArgs), @@ -84,6 +88,9 @@ handle_call(info, _From, State = #state{conn_name = ConnName, proto_state = ProtoState}) -> {reply, [{conn_name, ConnName} | emqttd_protocol:info(ProtoState)], State}; +handle_call(kick, _From, State) -> + {stop, {shutdown, kick}, ok, State}; + handle_call(Req, _From, State = #state{peername = Peername}) -> lager:critical("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]), {reply, {error, unsupported_request}, State}. diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index 201641407..b4add6e84 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -42,6 +42,8 @@ stats/1, metrics/1, cluster/1, + clients/1, + sessions/1, listeners/1, bridges/1, plugins/1, @@ -135,6 +137,41 @@ stats([]) -> metrics([]) -> [?PRINT("~s: ~p~n", [Metric, Val]) || {Metric, Val} <- emqttd_metrics:all()]. + +clients(["list"]) -> + dump(client, mqtt_client); + +clients(["show", ClientId]) -> + case emqttd_cm:lookup(list_to_binary(ClientId)) of + undefined -> + ?PRINT_MSG("Not Found.~n"); + Client -> + print(client, Client) + end; + +clients(["kick", ClientId]) -> + case emqttd_cm:lookup(list_to_binary(ClientId)) of + undefined -> + ?PRINT_MSG("Not Found.~n"); + #mqtt_client{client_pid = Pid} -> + emqttd_client:kick(Pid) + end. + +sessions(["list"]) -> + dump(session, mqtt_transient_session), + dump(session, mqtt_persistent_session); + +sessions(["show", ClientId0]) -> + ClientId = list_to_binary(ClientId0), + case {ets:lookup(mqtt_transient_session, ClientId), + ets:lookup(mqtt_persistent_session, ClientId)} of + {[], []} -> + ?PRINT_MSG("Not Found.~n"); + {[SessInfo], _} -> + print(session, SessInfo); + {_, [SessInfo]} -> + print(session, SessInfo) + end. listeners([]) -> lists:foreach(fun({{Protocol, Port}, Pid}) -> @@ -178,9 +215,7 @@ bridges(["stop", SNode, Topic]) -> end. plugins(["list"]) -> - lists:foreach(fun(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> - ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", [Name, Ver, Descr, Active]) - end, emqttd_plugins:list()); + lists:foreach(fun(Plugin) -> print(plugin, Plugin) end, emqttd_plugins:list()); plugins(["load", Name]) -> case emqttd_plugins:load(list_to_atom(Name)) of @@ -223,6 +258,7 @@ stop_trace(Who, Name) -> ?PRINT("stop to trace ~s ~s error: ~p.~n", [Who, Name, Error]) end. + node_name(SNode) -> SNode1 = case string:tokens(SNode, "@") of @@ -260,3 +296,51 @@ parse_opt(bridge, queue, Len) -> parse_opt(_Cmd, Opt, _Val) -> ?PRINT("Bad Option: ~s~n", [Opt]). +dump(Type, Table) -> + dump(Type, Table, ets:first(Table)). + +dump(_Type, _Table, '$end_of_table') -> + ok; +dump(Type, Table, Key) -> + case ets:lookup(Table, Key) of + [Record] -> print(Type, Record); + [] -> ignore + end, + dump(Type, Table, ets:next(Table, Key)). + +print(client, #mqtt_client{client_id = ClientId, clean_sess = CleanSess, + username = Username, peername = Peername, + connected_at = ConnectedAt}) -> + ?PRINT("Client(~s, clean_sess=~s, username=~s, peername=~s, connected_at=~p)~n", + [ClientId, CleanSess, Username, + emqttd_net:format(Peername), + emqttd_util:now_to_secs(ConnectedAt)]); + +print(session, {ClientId, SessInfo}) -> + InfoKeys = [clean_sess, + max_inflight, + inflight_queue, + message_queue, + awaiting_rel, + awaiting_ack, + awaiting_comp, + created_at, + subscriptions], + ?PRINT("Session(~s, clean_sess=~s, max_inflight=~w, inflight_queue=~w, " + "message_queue=~w, awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, " + "created_at=~w, subscriptions=~s)~n", + [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]); + +print(plugin, #mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> + ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", + [Name, Ver, Descr, Active]). + +format(created_at, Val) -> + emqttd_util:now_to_secs(Val); + +format(subscriptions, List) -> + string:join([io_lib:format("~s:~w", [Topic, Qos]) || {Topic, Qos} <- List], ","); + +format(_, Val) -> + Val. + diff --git a/src/emqttd_plugins.erl b/src/emqttd_plugins.erl index b3ccc122d..bc3d8f93e 100644 --- a/src/emqttd_plugins.erl +++ b/src/emqttd_plugins.erl @@ -173,7 +173,8 @@ set_config([{AppName, Envs} | Config]) -> start_app(App, SuccFun) -> case application:ensure_all_started(App) of {ok, Started} -> - lager:info("Started Apps: ~p, load plugin ~p successfully", [Started, App]), + lager:info("Started Apps: ~p", [Started]), + lager:info("Load plugin ~p successfully", [App]), SuccFun(App), {ok, Started}; {error, {ErrApp, Reason}} -> diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index dbae32bfb..38ab89a27 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -126,6 +126,10 @@ expired_after = 172800, expired_timer, + + collect_interval, + + collect_timer, timestamp}). @@ -231,9 +235,11 @@ init([CleanSess, ClientId, ClientPid]) -> await_rel_timeout = emqttd_opts:g(await_rel_timeout, SessEnv), max_awaiting_rel = emqttd_opts:g(max_awaiting_rel, SessEnv), expired_after = emqttd_opts:g(expired_after, SessEnv) * 3600, + collect_interval = emqttd_opts:g(collect_interval, SessEnv, 0), timestamp = os:timestamp()}, emqttd_sm:register_session(CleanSess, ClientId, info(Session)), - {ok, Session, hibernate}. + %% start statistics + {ok, start_collector(Session), hibernate}. handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = ClientId, subscriptions = Subscriptions}) -> @@ -483,6 +489,10 @@ handle_info({timeout, awaiting_comp, PktId}, Session = #session{client_id = Clie noreply(Session) end; +handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id = ClientId}) -> + emqttd_sm:register_session(CleanSess, ClientId, info(Session)), + {noreply, start_collector(Session), hibernate}; + handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true, client_pid = ClientPid}) -> {stop, normal, Session}; @@ -630,7 +640,15 @@ cancel_timer(Ref) -> noreply(State) -> {noreply, State, hibernate}. -info(#session{subscriptions = Subscriptions, +start_collector(Session = #session{collect_interval = 0}) -> + Session; + +start_collector(Session = #session{collect_interval = Interval}) -> + TRef = erlang:send_after(Interval * 1000, self(), collect_info), + Session#session{collect_timer = TRef}. + +info(#session{clean_sess = CleanSess, + subscriptions = Subscriptions, inflight_queue = InflightQueue, max_inflight = MaxInflight, message_queue = MessageQueue, @@ -638,9 +656,11 @@ info(#session{subscriptions = Subscriptions, awaiting_ack = AwaitingAck, awaiting_comp = AwaitingComp, timestamp = CreatedAt}) -> - [{pid, self()}, {subscriptions, Subscriptions}, + [{pid, self()}, + {clean_sess, CleanSess}, + {subscriptions, Subscriptions}, {max_inflight, MaxInflight}, - {inflight_queue, lists:length(InflightQueue)}, + {inflight_queue, length(InflightQueue)}, {message_queue, emqttd_mqueue:len(MessageQueue)}, {awaiting_rel, maps:size(AwaitingRel)}, {awaiting_ack, maps:size(AwaitingAck)},