diff --git a/CHANGELOG.md b/CHANGELOG.md index 870c3c18a..4a63afae5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,18 @@ emqttd ChangeLog ================== +0.10.0-alpha (2015-08-10) +------------------------- + +New Plugin Architecture + +Web Dashboard + +MySQL Authentication and ACL Plugin + +Session Statistics + + 0.9.3-alpha (2015-07-25) ------------------------- diff --git a/Makefile b/Makefile index b319d3928..a4121ee8c 100644 --- a/Makefile +++ b/Makefile @@ -34,8 +34,11 @@ rel: compile plugins: @for plugin in ./plugins/* ; do \ if [ -d $${plugin} ]; then \ - echo "copy $${plugin}"; \ - cp -R $${plugin} $(DIST)/plugins/ && rm -rf $(DIST)/$${plugin}/src/ ; \ + mkdir -p $(DIST)/$${plugin}/ ; \ + cp -R $${plugin}/ebin $(DIST)/$${plugin}/ ; \ + [ -d "$${plugin}/priv" ] && cp -R $${plugin}/priv $(DIST)/$${plugin}/ ; \ + [ -d "$${plugin}/etc" ] && cp -R $${plugin}/etc $(DIST)/$${plugin}/ ; \ + echo "$${plugin} copied" ; \ fi \ done diff --git a/PLUGIN.md b/PLUGIN.md index 7991de2cd..26993390a 100644 --- a/PLUGIN.md +++ b/PLUGIN.md @@ -1,4 +1,8 @@ +git submodule init + +or + +git submodule update --remote + Please see [Plugin Design](https://github.com/emqtt/emqttd/wiki/Plugin%20Design). - - diff --git a/plugins/emqttd_dashboard b/plugins/emqttd_dashboard index 2d3c9aeab..dd202346f 160000 --- a/plugins/emqttd_dashboard +++ b/plugins/emqttd_dashboard @@ -1 +1 @@ -Subproject commit 2d3c9aeabeb5289b9ae27c503f017ad71bd81174 +Subproject commit dd202346fcfce6b0ae8da76bb7233e91db996bfa diff --git a/plugins/emysql b/plugins/emysql index 38927104b..3305c1ad9 160000 --- a/plugins/emysql +++ b/plugins/emysql @@ -1 +1 @@ -Subproject commit 38927104b44b3f8d237bcf3a2b50f2e0608291b3 +Subproject commit 3305c1ad951e091c198ae10ca852752ca598e5b0 diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index 9ee5fcb29..b4681e448 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -87,9 +87,6 @@ ]}, %% Session {session, [ - %% Expired after 2 days - {expired_after, 48}, - %% Max number of QoS 1 and 2 messages that can be “in flight” at one time. %% 0 means no limit {max_inflight, 100}, @@ -104,7 +101,13 @@ {await_rel_timeout, 8}, %% Max Packets that Awaiting PUBREL, 0 means no limit - {max_awaiting_rel, 0} + {max_awaiting_rel, 0}, + + %% Statistics Collection Interval(seconds) + {collect_interval, 10}, + + %% Expired after 2 days + {expired_after, 48} ]}, %% Session @@ -176,12 +179,12 @@ %% Size of acceptor pool {acceptors, 16}, %% Maximum number of concurrent clients - {max_clients, 1024}, + {max_clients, 512}, %% Socket Access Control {access, [{allow, all}]}, %% Socket Options {sockopts, [ - {backlog, 1024} + {backlog, 512} %Set buffer if hight thoughtput %{recbuf, 4096}, %{sndbuf, 4096} @@ -192,7 +195,7 @@ %% Size of acceptor pool {acceptors, 4}, %% Maximum number of concurrent clients - {max_clients, 1024}, + {max_clients, 512}, %% Socket Access Control {access, [{allow, all}]}, %% SSL certificate and key files @@ -226,7 +229,7 @@ %% Size of acceptor pool {acceptors, 4}, %% Maximum number of concurrent clients - {max_clients, 512}, + {max_clients, 64}, %% Socket Access Control {access, [{allow, all}]}, %% Socket Options diff --git a/rel/files/emqttd_ctl b/rel/files/emqttd_ctl index f98a3da66..41f6d0433 100755 --- a/rel/files/emqttd_ctl +++ b/rel/files/emqttd_ctl @@ -242,6 +242,45 @@ case "$1" in exit 1 fi ;; + clients) + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqttd is not running!" + exit 1 + fi + if [ $# -eq 2 -a $2 = "list" ]; then + $NODETOOL rpc emqttd_ctl clients list + elif [ $# -eq 3 ]; then + shift + $NODETOOL rpc emqttd_ctl clients $@ + else + echo "Usage: " + echo "$SCRIPT clients list" + echo "$SCRIPT clients show " + echo "$SCRIPT clients kick " + exit 1 + fi + ;; + sessions) + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqttd is not running!" + exit 1 + fi + if [ $# -eq 2 -a $2 = "list" ]; then + $NODETOOL rpc emqttd_ctl sessions list + elif [ $# -eq 3 ]; then + shift + $NODETOOL rpc emqttd_ctl sessions $@ + else + echo "Usage: " + echo "$SCRIPT sessions list" + echo "$SCRIPT sessions show " + exit 1 + fi + ;; plugins) # Make sure the local node IS running RES=`$NODETOOL ping` @@ -309,6 +348,12 @@ case "$1" in echo " metrics #query broker metrics" echo " cluster [] #query or cluster nodes" echo " ----------------------------------------------------------------" + echo " clients list #list all clients" + echo " clients show #show a client" + echo " clients kick #kick a client" + echo " session list #list all sessions" + echo " session show #show a sessions" + echo " ----------------------------------------------------------------" echo " plugins list #query loaded plugins" echo " plugins load #load plugin" echo " plugins unload #unload plugin" diff --git a/rel/files/vm.args b/rel/files/vm.args index 3e9784935..0f3894c92 100644 --- a/rel/files/vm.args +++ b/rel/files/vm.args @@ -22,10 +22,10 @@ ## Enable kernel poll and a few async threads +K true -+A 32 ++A 16 ## max process numbers -+P 1000000 ++P 8192 ##------------------------------------------------------------------------- ## Env @@ -36,8 +36,7 @@ -env ERTS_MAX_PORTS 4096 -#-env ERL_MAX_ETS_TABLES 1024 +-env ERL_MAX_ETS_TABLES 1024 ## Tweak GC to run more often -##-env ERL_FULLSWEEP_AFTER 1000 -# +-env ERL_FULLSWEEP_AFTER 1000 diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index b55326061..c2c08607f 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_client). -author("Feng Lee "). @@ -33,7 +34,7 @@ -include("emqttd_protocol.hrl"). %% API Function Exports --export([start_link/2, info/1]). +-export([start_link/2, info/1, kick/1]). -behaviour(gen_server). @@ -60,6 +61,9 @@ start_link(SockArgs, PktOpts) -> info(Pid) -> gen_server:call(Pid, info, infinity). +kick(Pid) -> + gen_server:call(Pid, kick). + init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) -> % Transform if ssl. {ok, NewSock} = esockd_connection:accept(SockArgs), @@ -84,6 +88,9 @@ handle_call(info, _From, State = #state{conn_name = ConnName, proto_state = ProtoState}) -> {reply, [{conn_name, ConnName} | emqttd_protocol:info(ProtoState)], State}; +handle_call(kick, _From, State) -> + {stop, {shutdown, kick}, ok, State}; + handle_call(Req, _From, State = #state{peername = Peername}) -> lager:critical("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]), {reply, {error, unsupported_request}, State}. diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index 201641407..b4add6e84 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -42,6 +42,8 @@ stats/1, metrics/1, cluster/1, + clients/1, + sessions/1, listeners/1, bridges/1, plugins/1, @@ -135,6 +137,41 @@ stats([]) -> metrics([]) -> [?PRINT("~s: ~p~n", [Metric, Val]) || {Metric, Val} <- emqttd_metrics:all()]. + +clients(["list"]) -> + dump(client, mqtt_client); + +clients(["show", ClientId]) -> + case emqttd_cm:lookup(list_to_binary(ClientId)) of + undefined -> + ?PRINT_MSG("Not Found.~n"); + Client -> + print(client, Client) + end; + +clients(["kick", ClientId]) -> + case emqttd_cm:lookup(list_to_binary(ClientId)) of + undefined -> + ?PRINT_MSG("Not Found.~n"); + #mqtt_client{client_pid = Pid} -> + emqttd_client:kick(Pid) + end. + +sessions(["list"]) -> + dump(session, mqtt_transient_session), + dump(session, mqtt_persistent_session); + +sessions(["show", ClientId0]) -> + ClientId = list_to_binary(ClientId0), + case {ets:lookup(mqtt_transient_session, ClientId), + ets:lookup(mqtt_persistent_session, ClientId)} of + {[], []} -> + ?PRINT_MSG("Not Found.~n"); + {[SessInfo], _} -> + print(session, SessInfo); + {_, [SessInfo]} -> + print(session, SessInfo) + end. listeners([]) -> lists:foreach(fun({{Protocol, Port}, Pid}) -> @@ -178,9 +215,7 @@ bridges(["stop", SNode, Topic]) -> end. plugins(["list"]) -> - lists:foreach(fun(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> - ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", [Name, Ver, Descr, Active]) - end, emqttd_plugins:list()); + lists:foreach(fun(Plugin) -> print(plugin, Plugin) end, emqttd_plugins:list()); plugins(["load", Name]) -> case emqttd_plugins:load(list_to_atom(Name)) of @@ -223,6 +258,7 @@ stop_trace(Who, Name) -> ?PRINT("stop to trace ~s ~s error: ~p.~n", [Who, Name, Error]) end. + node_name(SNode) -> SNode1 = case string:tokens(SNode, "@") of @@ -260,3 +296,51 @@ parse_opt(bridge, queue, Len) -> parse_opt(_Cmd, Opt, _Val) -> ?PRINT("Bad Option: ~s~n", [Opt]). +dump(Type, Table) -> + dump(Type, Table, ets:first(Table)). + +dump(_Type, _Table, '$end_of_table') -> + ok; +dump(Type, Table, Key) -> + case ets:lookup(Table, Key) of + [Record] -> print(Type, Record); + [] -> ignore + end, + dump(Type, Table, ets:next(Table, Key)). + +print(client, #mqtt_client{client_id = ClientId, clean_sess = CleanSess, + username = Username, peername = Peername, + connected_at = ConnectedAt}) -> + ?PRINT("Client(~s, clean_sess=~s, username=~s, peername=~s, connected_at=~p)~n", + [ClientId, CleanSess, Username, + emqttd_net:format(Peername), + emqttd_util:now_to_secs(ConnectedAt)]); + +print(session, {ClientId, SessInfo}) -> + InfoKeys = [clean_sess, + max_inflight, + inflight_queue, + message_queue, + awaiting_rel, + awaiting_ack, + awaiting_comp, + created_at, + subscriptions], + ?PRINT("Session(~s, clean_sess=~s, max_inflight=~w, inflight_queue=~w, " + "message_queue=~w, awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, " + "created_at=~w, subscriptions=~s)~n", + [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]); + +print(plugin, #mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> + ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", + [Name, Ver, Descr, Active]). + +format(created_at, Val) -> + emqttd_util:now_to_secs(Val); + +format(subscriptions, List) -> + string:join([io_lib:format("~s:~w", [Topic, Qos]) || {Topic, Qos} <- List], ","); + +format(_, Val) -> + Val. + diff --git a/src/emqttd_mqueue.erl b/src/emqttd_mqueue.erl index d9f3c3c58..803104c5e 100644 --- a/src/emqttd_mqueue.erl +++ b/src/emqttd_mqueue.erl @@ -58,7 +58,8 @@ -export([new/3, name/1, is_empty/1, is_full/1, - len/1, in/2, out/1]). + len/1, max_len/1, + in/2, out/1]). -define(LOW_WM, 0.2). @@ -108,6 +109,8 @@ is_full(_MQ) -> false. len(#mqueue{len = Len}) -> Len. +max_len(#mqueue{max_len= MaxLen}) -> MaxLen. + %%------------------------------------------------------------------------------ %% @doc Queue one message. %% @end diff --git a/src/emqttd_plugins.erl b/src/emqttd_plugins.erl index b3ccc122d..bc3d8f93e 100644 --- a/src/emqttd_plugins.erl +++ b/src/emqttd_plugins.erl @@ -173,7 +173,8 @@ set_config([{AppName, Envs} | Config]) -> start_app(App, SuccFun) -> case application:ensure_all_started(App) of {ok, Started} -> - lager:info("Started Apps: ~p, load plugin ~p successfully", [Started, App]), + lager:info("Started Apps: ~p", [Started]), + lager:info("Load plugin ~p successfully", [App]), SuccFun(App), {ok, Started}; {error, {ErrApp, Reason}} -> diff --git a/src/emqttd_qos.erl b/src/emqttd_qos.erl deleted file mode 100644 index 49e863e17..000000000 --- a/src/emqttd_qos.erl +++ /dev/null @@ -1,47 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd Qos Functions. -%%% -%%% @end -%%%----------------------------------------------------------------------------- --module(emqttd_qos). - --include("emqttd_protocol.hrl"). - --export([a/1, i/1]). - -a(?QOS_0) -> qos0; -a(?QOS_1) -> qos1; -a(?QOS_2) -> qos2; -a(qos0) -> qos0; -a(qos1) -> qos1; -a(qos2) -> qos2. - -i(?QOS_0) -> ?QOS_0; -i(?QOS_1) -> ?QOS_1; -i(?QOS_2) -> ?QOS_2; -i(qos0) -> ?QOS_0; -i(qos1) -> ?QOS_1; -i(qos2) -> ?QOS_2. - - diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index f4f6d1c2d..38ab89a27 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -126,6 +126,10 @@ expired_after = 172800, expired_timer, + + collect_interval, + + collect_timer, timestamp}). @@ -231,8 +235,11 @@ init([CleanSess, ClientId, ClientPid]) -> await_rel_timeout = emqttd_opts:g(await_rel_timeout, SessEnv), max_awaiting_rel = emqttd_opts:g(max_awaiting_rel, SessEnv), expired_after = emqttd_opts:g(expired_after, SessEnv) * 3600, + collect_interval = emqttd_opts:g(collect_interval, SessEnv, 0), timestamp = os:timestamp()}, - {ok, Session, hibernate}. + emqttd_sm:register_session(CleanSess, ClientId, info(Session)), + %% start statistics + {ok, start_collector(Session), hibernate}. handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = ClientId, subscriptions = Subscriptions}) -> @@ -482,6 +489,10 @@ handle_info({timeout, awaiting_comp, PktId}, Session = #session{client_id = Clie noreply(Session) end; +handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id = ClientId}) -> + emqttd_sm:register_session(CleanSess, ClientId, info(Session)), + {noreply, start_collector(Session), hibernate}; + handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true, client_pid = ClientPid}) -> {stop, normal, Session}; @@ -510,8 +521,8 @@ handle_info(Info, Session = #session{client_id = ClientId}) -> lager:critical("Session ~s received unexpected info: ~p", [ClientId, Info]), {noreply, Session}. -terminate(_Reason, _Session) -> - ok. +terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) -> + emqttd_sm:unregister_session(CleanSess, ClientId). code_change(_OldVsn, Session, _Extra) -> {ok, Session}. @@ -629,3 +640,30 @@ cancel_timer(Ref) -> noreply(State) -> {noreply, State, hibernate}. +start_collector(Session = #session{collect_interval = 0}) -> + Session; + +start_collector(Session = #session{collect_interval = Interval}) -> + TRef = erlang:send_after(Interval * 1000, self(), collect_info), + Session#session{collect_timer = TRef}. + +info(#session{clean_sess = CleanSess, + subscriptions = Subscriptions, + inflight_queue = InflightQueue, + max_inflight = MaxInflight, + message_queue = MessageQueue, + awaiting_rel = AwaitingRel, + awaiting_ack = AwaitingAck, + awaiting_comp = AwaitingComp, + timestamp = CreatedAt}) -> + [{pid, self()}, + {clean_sess, CleanSess}, + {subscriptions, Subscriptions}, + {max_inflight, MaxInflight}, + {inflight_queue, length(InflightQueue)}, + {message_queue, emqttd_mqueue:len(MessageQueue)}, + {awaiting_rel, maps:size(AwaitingRel)}, + {awaiting_ack, maps:size(AwaitingAck)}, + {awaiting_comp, maps:size(AwaitingComp)}, + {created_at, CreatedAt}]. + diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index 5becbba1e..f1e3ae567 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -42,6 +42,8 @@ -export([start_session/2, lookup_session/1]). +-export([register_session/3, unregister_session/2]). + -behaviour(gen_server). %% gen_server Function Exports @@ -57,6 +59,7 @@ %%%============================================================================= mnesia(boot) -> + %% global session... ok = emqttd_mnesia:create_table(session, [ {type, ordered_set}, {ram_copies, [node()]}, @@ -107,6 +110,34 @@ lookup_session(ClientId) -> [] -> undefined end. +%%------------------------------------------------------------------------------ +%% @doc Register a session with info. +%% @end +%%------------------------------------------------------------------------------ +-spec register_session(CleanSess, ClientId, Info) -> ok when + CleanSess :: boolean(), + ClientId :: binary(), + Info :: [tuple()]. +register_session(true, ClientId, Info) -> + ets:insert(mqtt_transient_session, {ClientId, Info}); + +register_session(false, ClientId, Info) -> + SM = gproc_pool:pick_worker(?SM_POOL, ClientId), + gen_server:cast(SM, {register, ClientId, Info}). + +%%------------------------------------------------------------------------------ +%% @doc Unregister a session. +%% @end +%%------------------------------------------------------------------------------ +-spec unregister_session(CleanSess, ClientId) -> ok when + CleanSess :: boolean(), + ClientId :: binary(). +unregister_session(true, ClientId) -> + ets:delete(mqtt_transient_session, ClientId); +unregister_session(false, ClientId) -> + SM = gproc_pool:pick_worker(?SM_POOL, ClientId), + gen_server:cast(SM, {unregister, ClientId}). + call(SM, Req) -> gen_server:call(SM, Req, infinity). %%%============================================================================= @@ -143,7 +174,17 @@ handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) -> handle_call(_Request, _From, State) -> {reply, ok, State}. -handle_cast(_Msg, State) -> +%% persistent session +handle_cast({register, ClientId, Info}, State) -> + ets:insert(mqtt_persistent_session, {ClientId, Info}), + {noreply, setstats(State)}; + +handle_cast({unregister, ClientId}, State) -> + ets:delete(mqtt_persistent_session, ClientId), + {noreply, setstats(State)}; + +handle_cast(Msg, State) -> + lager:critical("Unexpected Msg: ~p", [Msg]), {noreply, State}. handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State) -> @@ -275,7 +316,6 @@ remove_session(Session) -> mnesia:delete_object(session, Session, write) end). -setstats(State = #state{statsfun = _StatsFun}) -> - State. - +setstats(State = #state{statsfun = StatsFun}) -> + StatsFun(ets:info(mqtt_persistent_session, size)), State. diff --git a/src/emqttd_sm_sup.erl b/src/emqttd_sm_sup.erl index a597d6dad..468016795 100644 --- a/src/emqttd_sm_sup.erl +++ b/src/emqttd_sm_sup.erl @@ -43,6 +43,7 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> + init_session_ets(), Schedulers = erlang:system_info(schedulers), gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]), StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), @@ -55,3 +56,8 @@ init([]) -> end, lists:seq(1, Schedulers)), {ok, {{one_for_all, 10, 100}, Children}}. +init_session_ets() -> + Tables = [mqtt_transient_session, mqtt_persistent_session], + Attrs = [ordered_set, named_table, public, {write_concurrency, true}], + lists:foreach(fun(Tab) -> ets:new(Tab, Attrs) end, Tables). +