diff --git a/.gitignore b/.gitignore index 0d3edca80..15f67b8e0 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,5 @@ test/ebin/*.beam plugins/*/ebin log/ *.swp +*.so +examples diff --git a/.gitmodules b/.gitmodules index 61767a9fb..2d15edfc9 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "tests/org.eclipse.paho.mqtt.testing"] path = tests/org.eclipse.paho.mqtt.testing url = git://git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.testing.git +[submodule "plugins/emqttd_dashboard"] + path = plugins/emqttd_dashboard + url = https://github.com/emqtt/emqttd_dashboard.git diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c1610db3..c32652458 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,11 +5,15 @@ emqttd ChangeLog 0.8.1-alpha (2015-05-28) ------------------------- +Client [Presence](https://github.com/emqtt/emqttd/wiki/Presence) Support and [$SYS Topics](https://github.com/emqtt/emqttd/wiki/$SYS-Topics) Redesigned! + Bugfix: issue #138 - when client disconnected normally, broker will not publish disconnected $SYS message Bugfix: fix websocket url in emqttd/priv/www/websocket.html -Improve: issue #136 - $SYS topics result should not include $SYS messages +Improve: etc/emqttd.config to allow websocket connections from any hosts + +Improve: rel/reltool.config to exclude unnecessary apps. 0.8.0-alpha (2015-05-25) diff --git a/apps/emqtt/src/emqtt.app.src b/apps/emqtt/src/emqtt.app.src index ea5191f92..488ebbb8e 100644 --- a/apps/emqtt/src/emqtt.app.src +++ b/apps/emqtt/src/emqtt.app.src @@ -1,7 +1,7 @@ {application, emqtt, [ {description, "Erlang MQTT Common Library"}, - {vsn, "0.8.0"}, + {vsn, "0.8.1"}, {modules, []}, {registered, []}, {applications, [ diff --git a/apps/emqttd/include/emqttd.hrl b/apps/emqttd/include/emqttd.hrl index 9c2ab934a..63ca9a047 100644 --- a/apps/emqttd/include/emqttd.hrl +++ b/apps/emqttd/include/emqttd.hrl @@ -77,9 +77,13 @@ %% MQTT Client %%------------------------------------------------------------------------------ -record(mqtt_client, { - clientid :: binary(), + clientid :: binary() | undefined, username :: binary() | undefined, - ipaddr :: inet:ip_address() + ipaddress :: inet:ip_address(), + client_pid :: pid(), + client_mon :: reference(), + clean_sess :: boolean(), + proto_ver :: 3 | 4 }). -type mqtt_client() :: #mqtt_client{}. diff --git a/apps/emqttd/include/emqttd_systop.hrl b/apps/emqttd/include/emqttd_systop.hrl index d2c929273..9308ba682 100644 --- a/apps/emqttd/include/emqttd_systop.hrl +++ b/apps/emqttd/include/emqttd_systop.hrl @@ -43,8 +43,6 @@ -define(SYSTOP_CLIENTS, [ 'clients/count', % clients connected current 'clients/max' % max clients connected - %'clients/connected', - %'clients/disconnected', ]). %%------------------------------------------------------------------------------ @@ -59,12 +57,12 @@ %% $SYS Topics for Subscribers %%------------------------------------------------------------------------------ -define(SYSTOP_PUBSUB, [ - 'queues/count', % ... - 'queues/max', % ... - 'topics/count', % ... + 'topics/count', % ... 'topics/max', % ... - 'subscribers/count', % ... - 'subscribers/max' % ... + 'subscribers/count', % ... + 'subscribers/max', % ... + 'queues/count', % ... + 'queues/max' % ... ]). %%------------------------------------------------------------------------------ diff --git a/apps/emqttd/src/emqttd.app.src b/apps/emqttd/src/emqttd.app.src index f598c13ba..b7459e519 100644 --- a/apps/emqttd/src/emqttd.app.src +++ b/apps/emqttd/src/emqttd.app.src @@ -1,7 +1,7 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, - {vsn, "0.8.0"}, + {vsn, "0.8.1"}, {modules, []}, {registered, []}, {applications, [kernel, diff --git a/apps/emqttd/src/emqttd_access_rule.erl b/apps/emqttd/src/emqttd_access_rule.erl index 8f7761822..a492136cf 100644 --- a/apps/emqttd/src/emqttd_access_rule.erl +++ b/apps/emqttd/src/emqttd_access_rule.erl @@ -114,9 +114,9 @@ match_who(#mqtt_client{clientid = ClientId}, {client, ClientId}) -> true; match_who(#mqtt_client{username = Username}, {user, Username}) -> true; -match_who(#mqtt_client{ipaddr = undefined}, {ipaddr, _Tup}) -> +match_who(#mqtt_client{ipaddress = undefined}, {ipaddr, _Tup}) -> false; -match_who(#mqtt_client{ipaddr = IP}, {ipaddr, {_CDIR, Start, End}}) -> +match_who(#mqtt_client{ipaddress = IP}, {ipaddr, {_CDIR, Start, End}}) -> I = esockd_access:atoi(IP), I >= Start andalso I =< End; match_who(_Client, _Who) -> diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index 5a8daf60f..138808b08 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -68,8 +68,7 @@ print_vsn() -> ?PRINT("~s ~s is running now~n", [Desc, Vsn]). start_servers(Sup) -> - Servers = [{"emqttd event", emqttd_event}, - {"emqttd trace", emqttd_trace}, + Servers = [{"emqttd trace", emqttd_trace}, {"emqttd pooler", {supervisor, emqttd_pooler_sup}}, {"emqttd client manager", {supervisor, emqttd_cm_sup}}, {"emqttd session manager", {supervisor, emqttd_sm_sup}}, diff --git a/apps/emqttd/src/emqttd_auth_clientid.erl b/apps/emqttd/src/emqttd_auth_clientid.erl index 2f3d51617..e5171a239 100644 --- a/apps/emqttd/src/emqttd_auth_clientid.erl +++ b/apps/emqttd/src/emqttd_auth_clientid.erl @@ -101,10 +101,10 @@ init(Opts) -> check(#mqtt_client{clientid = undefined}, _Password, []) -> {error, "ClientId undefined"}; -check(#mqtt_client{clientid = ClientId, ipaddr = IpAddr}, _Password, []) -> - check_clientid_only(ClientId, IpAddr); -check(#mqtt_client{clientid = ClientId, ipaddr = IpAddr}, _Password, [{password, no}|_]) -> - check_clientid_only(ClientId, IpAddr); +check(#mqtt_client{clientid = ClientId, ipaddress = IpAddress}, _Password, []) -> + check_clientid_only(ClientId, IpAddress); +check(#mqtt_client{clientid = ClientId, ipaddress = IpAddress}, _Password, [{password, no}|_]) -> + check_clientid_only(ClientId, IpAddress); check(_Client, undefined, [{password, yes}|_]) -> {error, "Password undefined"}; check(#mqtt_client{clientid = ClientId}, Password, [{password, yes}|_]) -> diff --git a/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.erl b/apps/emqttd/src/emqttd_auth_ldap.erl similarity index 100% rename from plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.erl rename to apps/emqttd/src/emqttd_auth_ldap.erl diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index bff331d9c..526d824b4 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -39,6 +39,9 @@ %% API Function Exports -export([start_link/0]). +%% Running nodes +-export([running_nodes/0]). + %% Event API -export([subscribe/1, notify/2]). @@ -71,6 +74,13 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +%%------------------------------------------------------------------------------ +%% @doc Get running nodes +%% @end +%%------------------------------------------------------------------------------ +running_nodes() -> + mnesia:system_info(running_db_nodes). + %%------------------------------------------------------------------------------ %% @doc Subscribe broker event %% @end @@ -205,6 +215,7 @@ init([]) -> random:seed(now()), ets:new(?BROKER_TAB, [set, public, named_table]), % Create $SYS Topics + emqttd_pubsub:create(<<"$SYS/brokers">>), [ok = create_topic(Topic) || Topic <- ?SYSTOP_BROKERS], % Tick {ok, #state{started_at = os:timestamp(), tick_tref = start_tick(tick)}, hibernate}. @@ -244,6 +255,7 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info(tick, State) -> + retain(brokers), retain(version, list_to_binary(version())), retain(sysdescr, list_to_binary(sysdescr())), publish(uptime, list_to_binary(uptime(State))), @@ -266,6 +278,10 @@ code_change(_OldVsn, State, _Extra) -> create_topic(Topic) -> emqttd_pubsub:create(emqtt_topic:systop(Topic)). +retain(brokers) -> + Payload = list_to_binary(string:join([atom_to_list(N) || N <- running_nodes()], ",")), + publish(#mqtt_message{retain = true, topic = <<"$SYS/brokers">>, payload = Payload}). + retain(Topic, Payload) when is_binary(Payload) -> publish(#mqtt_message{retain = true, topic = emqtt_topic:systop(Topic), diff --git a/apps/emqttd/src/emqttd_client.erl b/apps/emqttd/src/emqttd_client.erl index 0c9e2413a..de5d65d45 100644 --- a/apps/emqttd/src/emqttd_client.erl +++ b/apps/emqttd/src/emqttd_client.erl @@ -87,22 +87,23 @@ handle_call(info, _From, State = #state{conn_name=ConnName, proto_state = ProtoState}) -> {reply, [{conn_name, ConnName} | emqttd_protocol:info(ProtoState)], State}; -handle_call(Req, _From, State) -> - {stop, {badreq, Req}, State}. +handle_call(Req, _From, State = #state{peername = Peername}) -> + lager:critical("Client ~s: unexpected request - ~p",[emqttd_net:format(Peername), Req]), + {reply, {error, unsupported_request}, State}. -handle_cast(Msg, State) -> - {stop, {badmsg, Msg}, State}. +handle_cast(Msg, State = #state{peername = Peername}) -> + lager:critical("Client ~s: unexpected msg - ~p",[emqttd_net:format(Peername), Msg]), + {noreply, State}. handle_info(timeout, State) -> stop({shutdown, timeout}, State); handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState, conn_name=ConnName}) -> - %% TODO: to... %% need transfer data??? %% emqttd_client:transfer(NewPid, Data), lager:error("Shutdown for duplicate clientid: ~s, conn:~s", - [emqttd_protocol:clientid(ProtoState), ConnName]), + [emqttd_protocol:clientid(ProtoState), ConnName]), stop({shutdown, duplicate_id}, State); %%TODO: ok?? @@ -158,17 +159,16 @@ handle_info({keepalive, timeout}, State = #state{peername = Peername, keepalive handle_info(Info, State = #state{peername = Peername}) -> lager:critical("Client ~s: unexpected info ~p",[emqttd_net:format(Peername), Info]), - {stop, {badinfo, Info}, State}. + {noreply, State}. terminate(Reason, #state{peername = Peername, keepalive = KeepAlive, proto_state = ProtoState}) -> - lager:info("Client ~s: ~p terminated, reason: ~p~n", [emqttd_net:format(Peername), self(), Reason]), - notify(disconnected, Reason, ProtoState), + lager:info("Client ~s terminated, reason: ~p", [emqttd_net:format(Peername), Reason]), emqttd_keepalive:cancel(KeepAlive), case {ProtoState, Reason} of {undefined, _} -> ok; {_, {shutdown, Error}} -> emqttd_protocol:shutdown(Error, ProtoState); - {_, Reason} -> + {_, Reason} -> emqttd_protocol:shutdown(Reason, ProtoState) end. @@ -231,7 +231,7 @@ control_throttle(State = #state{conn_state = Flow, {_, _} -> run_socket(State) end. -stop(Reason, State ) -> +stop(Reason, State) -> {stop, Reason, State}. received_stats(?PACKET(Type)) -> @@ -253,12 +253,3 @@ inc(?DISCONNECT) -> inc(_) -> ignore. -%%TODO: should be moved to emqttd_protocol... for event emitted when protocol shutdown... -notify(disconnected, _Reason, undefined) -> ingore; - -notify(disconnected, {shutdown, Reason}, ProtoState) -> - emqttd_event:notify({disconnected, emqttd_protocol:clientid(ProtoState), Reason}); - -notify(disconnected, Reason, ProtoState) -> - emqttd_event:notify({disconnected, emqttd_protocol:clientid(ProtoState), Reason}). - diff --git a/apps/emqttd/src/emqttd_cluster.erl b/apps/emqttd/src/emqttd_cluster.erl index 242661119..bd5a1a2ac 100644 --- a/apps/emqttd/src/emqttd_cluster.erl +++ b/apps/emqttd/src/emqttd_cluster.erl @@ -34,6 +34,7 @@ %% @doc Get running nodes %% @end %%------------------------------------------------------------------------------ +%%TODO: remove... running_nodes() -> mnesia:system_info(running_db_nodes). diff --git a/apps/emqttd/src/emqttd_cm.erl b/apps/emqttd/src/emqttd_cm.erl index 7758f8ec0..1c3effa31 100644 --- a/apps/emqttd/src/emqttd_cm.erl +++ b/apps/emqttd/src/emqttd_cm.erl @@ -28,6 +28,8 @@ -author("Feng Lee "). +-include("emqttd.hrl"). + -behaviour(gen_server). -define(SERVER, ?MODULE). @@ -69,10 +71,10 @@ table() -> ?CLIENT_TAB. %% @doc Lookup client pid with clientId %% @end %%------------------------------------------------------------------------------ --spec lookup(ClientId :: binary()) -> pid() | undefined. +-spec lookup(ClientId :: binary()) -> mqtt_client() | undefined. lookup(ClientId) when is_binary(ClientId) -> case ets:lookup(?CLIENT_TAB, ClientId) of - [{_, Pid, _}] -> Pid; + [Client] -> Client; [] -> undefined end. @@ -80,10 +82,10 @@ lookup(ClientId) when is_binary(ClientId) -> %% @doc Register clientId with pid. %% @end %%------------------------------------------------------------------------------ --spec register(ClientId :: binary()) -> ok. -register(ClientId) when is_binary(ClientId) -> +-spec register(Client :: mqtt_client()) -> ok. +register(Client = #mqtt_client{clientid = ClientId}) -> CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), - gen_server:call(CmPid, {register, ClientId, self()}, infinity). + gen_server:call(CmPid, {register, Client}, infinity). %%------------------------------------------------------------------------------ %% @doc Unregister clientId with pid. @@ -102,18 +104,18 @@ init([Id, StatsFun]) -> gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}), {ok, #state{id = Id, statsfun = StatsFun}}. -handle_call({register, ClientId, Pid}, _From, State) -> +handle_call({register, Client = #mqtt_client{clientid = ClientId, client_pid = Pid}}, _From, State) -> case ets:lookup(?CLIENT_TAB, ClientId) of - [{_, Pid, _}] -> + [#mqtt_client{client_pid = Pid}] -> lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]), ignore; - [{_, OldPid, MRef}] -> + [#mqtt_client{client_pid = OldPid, client_mon = MRef}] -> lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]), OldPid ! {stop, duplicate_id, Pid}, erlang:demonitor(MRef), - ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)}); + ets:insert(?CLIENT_TAB, Client#mqtt_client{client_mon = erlang:monitor(process, Pid)}); [] -> - ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)}) + ets:insert(?CLIENT_TAB, Client#mqtt_client{client_mon = erlang:monitor(process, Pid)}) end, {reply, ok, setstats(State)}; @@ -123,7 +125,7 @@ handle_call(Req, _From, State) -> handle_cast({unregister, ClientId, Pid}, State) -> case ets:lookup(?CLIENT_TAB, ClientId) of - [{_, Pid, MRef}] -> + [#mqtt_client{client_pid = Pid, client_mon = MRef}] -> erlang:demonitor(MRef, [flush]), ets:delete(?CLIENT_TAB, ClientId); [_] -> @@ -136,8 +138,18 @@ handle_cast({unregister, ClientId, Pid}, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> - ets:match_delete(?CLIENT_TAB, {'_', DownPid, MRef}), +handle_info({'DOWN', MRef, process, DownPid, Reason}, State) -> + case ets:match_object(?CLIENT_TAB, {mqtt_client, '$1', '_', '_', DownPid, MRef, '_', '_'}) of + [] -> + ignore; + Clients -> + lists:foreach( + fun(Client = #mqtt_client{clientid = ClientId}) -> + ets:delete_object(?CLIENT_TAB, Client), + lager:error("Client ~s is Down: ~p", [ClientId, Reason]), + emqttd_broker:foreach_hooks(client_disconnected, [Reason, ClientId]) + end, Clients) + end, {noreply, setstats(State)}; handle_info(_Info, State) -> @@ -156,3 +168,4 @@ code_change(_OldVsn, State, _Extra) -> setstats(State = #state{statsfun = StatsFun}) -> StatsFun(ets:info(?CLIENT_TAB, size)), State. + diff --git a/apps/emqttd/src/emqttd_cm_sup.erl b/apps/emqttd/src/emqttd_cm_sup.erl index 53a338404..739fc08ec 100644 --- a/apps/emqttd/src/emqttd_cm_sup.erl +++ b/apps/emqttd/src/emqttd_cm_sup.erl @@ -42,7 +42,7 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - ets:new(emqttd_cm:table(), [set, named_table, public, + ets:new(emqttd_cm:table(), [set, named_table, public, {keypos, 2}, {write_concurrency, true}]), Schedulers = erlang:system_info(schedulers), gproc_pool:new(emqttd_cm:pool(), hash, [{size, Schedulers}]), diff --git a/apps/emqttd/src/emqttd_event.erl b/apps/emqttd/src/emqttd_event.erl deleted file mode 100644 index e226de5de..000000000 --- a/apps/emqttd/src/emqttd_event.erl +++ /dev/null @@ -1,122 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd event manager. -%%% -%%% @end -%%%----------------------------------------------------------------------------- --module(emqttd_event). - --author("Feng Lee "). - --include_lib("emqtt/include/emqtt.hrl"). - -%% API Function Exports --export([start_link/0, add_handler/2, notify/1]). - -%% gen_event Function Exports --export([init/1, handle_event/2, handle_call/2, handle_info/2, - terminate/2, code_change/3]). - --record(state, {systop}). - -%%------------------------------------------------------------------------------ -%% @doc Start event manager -%% @end -%%------------------------------------------------------------------------------ --spec start_link() -> {ok, pid()} | {error, any()}. -start_link() -> - case gen_event:start_link({local, ?MODULE}) of - {ok, Pid} -> - add_handler(?MODULE, []), - {ok, Pid}; - {error, Reason} -> - {error, Reason} - end. - -add_handler(Handler, Args) -> - gen_event:add_handler(?MODULE, Handler, Args). - -notify(Event) -> - gen_event:notify(?MODULE, Event). - -%%%============================================================================= -%%% gen_event callbacks -%%%============================================================================= - -init([]) -> - SysTop = list_to_binary(lists:concat(["$SYS/brokers/", node(), "/"])), - {ok, #state{systop = SysTop}}. - -handle_event({connected, ClientId, Params}, State = #state{systop = SysTop}) -> - Topic = <>, - Msg = #mqtt_message{topic = Topic, payload = payload(connected, Params)}, - emqttd_pubsub:publish(event, Msg), - {ok, State}; - -%%TODO: Protect from undefined clientId... -handle_event({disconnected, undefined, Reason}, State = #state{systop = SysTop}) -> - {ok, State}; - -handle_event({disconnected, ClientId, Reason}, State = #state{systop = SysTop}) -> - Topic = <>, - Msg = #mqtt_message{topic = Topic, payload = payload(disconnected, Reason)}, - emqttd_pubsub:publish(event, Msg), - {ok, State}; - -handle_event({subscribed, ClientId, TopicTable}, State) -> - lager:error("TODO: subscribed ~s, ~p", [ClientId, TopicTable]), - {ok, State}; - -handle_event({unsubscribed, ClientId, Topics}, State) -> - lager:error("TODO: unsubscribed ~s, ~p", [ClientId, Topics]), - {ok, State}; - -handle_event(_Event, State) -> - {ok, State}. - -handle_call(_Request, State) -> - Reply = ok, - {ok, Reply, State}. - -handle_info(_Info, State) -> - {ok, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%%============================================================================= -%%% Internal functions -%%%============================================================================= - -payload(connected, Params) -> - From = proplists:get_value(from, Params), - Proto = proplists:get_value(protocol, Params), - Sess = proplists:get_value(session, Params), - iolist_to_binary(io_lib:format("from: ~s~nprotocol: ~p~nsession: ~s", [From, Proto, Sess])); - -payload(disconnected, Reason) -> - list_to_binary(io_lib:format("reason: ~p", [Reason])). - diff --git a/apps/emqttd/src/emqttd_metrics.erl b/apps/emqttd/src/emqttd_metrics.erl index e4a7fea15..550d5fcd7 100644 --- a/apps/emqttd/src/emqttd_metrics.erl +++ b/apps/emqttd/src/emqttd_metrics.erl @@ -163,7 +163,7 @@ init([]) -> % Init metrics [create_metric(Metric) || Metric <- Metrics], % $SYS Topics for metrics - [ok = create_topic(Topic) || {_, Topic} <- Metrics], + [ok = emqttd_pubsub:create(metric_topic(Topic)) || {_, Topic} <- Metrics], % Tick to publish metrics {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}. @@ -192,7 +192,7 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================= publish(Metric, Val) -> - emqttd_pubsub:publish(metrics, #mqtt_message{topic = emqtt_topic:systop(Metric), + emqttd_pubsub:publish(metrics, #mqtt_message{topic = metric_topic(Metric), payload = emqttd_util:integer_to_binary(Val)}). create_metric({gauge, Name}) -> @@ -202,7 +202,7 @@ create_metric({counter, Name}) -> Schedulers = lists:seq(1, erlang:system_info(schedulers)), [ets:insert(?METRIC_TAB, {{Name, I}, 0}) || I <- Schedulers]. -create_topic(Topic) -> - emqttd_pubsub:create(emqtt_topic:systop(Topic)). +metric_topic(Metric) -> + emqtt_topic:systop(list_to_binary(lists:concat(['metrics/', Metric]))). diff --git a/apps/emqttd/src/emqttd_mod_autosub.erl b/apps/emqttd/src/emqttd_mod_autosub.erl index 0ed1be5e1..a28db1642 100644 --- a/apps/emqttd/src/emqttd_mod_autosub.erl +++ b/apps/emqttd/src/emqttd_mod_autosub.erl @@ -29,22 +29,32 @@ -author("Feng Lee "). +-include_lib("emqtt/include/emqtt.hrl"). + +-include_lib("emqtt/include/emqtt_packet.hrl"). + +-include("emqttd.hrl"). + -behaviour(emqttd_gen_mod). --export([load/1, subscribe/2, unload/1]). +-export([load/1, client_connected/3, unload/1]). -record(state, {topics}). load(Opts) -> Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2], - emqttd_broker:hook(client_connected, {?MODULE, subscribe}, - {?MODULE, subscribe, [Topics]}), + emqttd_broker:hook(client_connected, {?MODULE, client_connected}, + {?MODULE, client_connected, [Topics]}), {ok, #state{topics = Topics}}. -subscribe({Client, ClientId}, Topics) -> +client_connected(?CONNACK_ACCEPT, #mqtt_client{clientid = ClientId, client_pid = ClientPid}, Topics) -> F = fun(Topic) -> emqtt_topic:feed_var(<<"$c">>, ClientId, Topic) end, - [Client ! {subscribe, F(Topic), Qos} || {Topic, Qos} <- Topics]. + [ClientPid ! {subscribe, F(Topic), Qos} || {Topic, Qos} <- Topics]; + +client_connected(_ConnAck, _Client, _Topics) -> + ignore. unload(_Opts) -> - emqttd_broker:unhook(client_connected, {?MODULE, subscribe}). + emqttd_broker:unhook(client_connected, {?MODULE, client_connected}). + diff --git a/apps/emqttd/src/emqttd_mod_presence.erl b/apps/emqttd/src/emqttd_mod_presence.erl new file mode 100644 index 000000000..981cb4350 --- /dev/null +++ b/apps/emqttd/src/emqttd_mod_presence.erl @@ -0,0 +1,81 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd presence management module. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_mod_presence). + +-include_lib("emqtt/include/emqtt.hrl"). + +-include("emqttd.hrl"). + +-export([load/1, unload/1]). + +-export([client_connected/3, client_disconnected/3]). + +load(Opts) -> + emqttd_broker:hook(client_connected, {?MODULE, client_connected}, {?MODULE, client_connected, [Opts]}), + emqttd_broker:hook(client_disconnected, {?MODULE, client_disconnected}, {?MODULE, client_disconnected, [Opts]}), + {ok, Opts}. + +client_connected(ConnAck, #mqtt_client{clientid = ClientId, + username = Username, + ipaddress = IpAddress, + clean_sess = CleanSess, + proto_ver = ProtoVer}, Opts) -> + Sess = case CleanSess of + true -> false; + false -> true + end, + Json = mochijson2:encode([{username, Username}, + {ipaddress, list_to_binary(emqttd_net:ntoa(IpAddress))}, + {session, Sess}, + {protocol, ProtoVer}, + {connack, ConnAck}, + {ts, emqttd_vm:timestamp()}]), + Message = #mqtt_message{qos = proplists:get_value(qos, Opts, 0), + topic = topic(connected, ClientId), + payload = iolist_to_binary(Json)}, + emqttd_pubsub:publish(presence, Message). + +client_disconnected(Reason, ClientId, Opts) -> + Json = mochijson2:encode([{reason, reason(Reason)}, {ts, emqttd_vm:timestamp()}]), + emqttd_pubsub:publish(presence, #mqtt_message{qos = proplists:get_value(qos, Opts, 0), + topic = topic(disconnected, ClientId), + payload = iolist_to_binary(Json)}). + +unload(_Opts) -> + emqttd_broker:unhook(client_connected, {?MODULE, client_connected}), + emqttd_broker:unhook(client_disconnected, {?MODULE, client_disconnected}). + + +topic(connected, ClientId) -> + emqtt_topic:systop(list_to_binary(["clients/", ClientId, "/connected"])); +topic(disconnected, ClientId) -> + emqtt_topic:systop(list_to_binary(["clients/", ClientId, "/disconnected"])). + +reason(Reason) when is_atom(Reason) -> Reason; +reason({Error, _}) when is_atom(Error) -> Error; +reason(_) -> internal_error. + diff --git a/apps/emqttd/src/emqttd_net.erl b/apps/emqttd/src/emqttd_net.erl index 0b0517f6b..8488fe76e 100644 --- a/apps/emqttd/src/emqttd_net.erl +++ b/apps/emqttd/src/emqttd_net.erl @@ -32,7 +32,7 @@ -export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, getaddr/2, port_to_listeners/1]). --export([peername/1, sockname/1, format/2, format/1, connection_string/2]). +-export([peername/1, sockname/1, format/2, format/1, connection_string/2, ntoa/1]). -define(FIRST_TEST_BIND_PORT, 10000). diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index 08b24674f..d519a182e 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -29,19 +29,18 @@ -author("Feng Lee "). -include_lib("emqtt/include/emqtt.hrl"). + -include_lib("emqtt/include/emqtt_packet.hrl"). -include("emqttd.hrl"). %% API --export([init/3, clientid/1]). +-export([init/3, info/1, clientid/1, client/1]). -export([received/2, send/2, redeliver/2, shutdown/2]). -export([handle/2]). --export([info/1]). - %% Protocol State -record(proto_state, { peername, @@ -49,30 +48,29 @@ connected = false, %received CONNECT action? proto_ver, proto_name, - %packet_id, username, clientid, clean_sess, - session, %% session state or session pid + session, %% session state or session pid will_msg, - max_clientid_len = ?MAX_CLIENTID_LEN + max_clientid_len = ?MAX_CLIENTID_LEN, + client_pid }). -type proto_state() :: #proto_state{}. +%%------------------------------------------------------------------------------ +%% @doc Init protocol +%% @end +%%------------------------------------------------------------------------------ init(Peername, SendFun, Opts) -> MaxLen = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN), #proto_state{ peername = Peername, sendfun = SendFun, - max_clientid_len = MaxLen}. + max_clientid_len = MaxLen, + client_pid = self()}. -clientid(#proto_state{clientid = ClientId}) -> ClientId. - -client(#proto_state{peername = {Addr, _Port}, clientid = ClientId, username = Username}) -> - #mqtt_client{clientid = ClientId, username = Username, ipaddr = Addr}. - -%%SHOULD be registered in emqttd_cm info(#proto_state{proto_ver = ProtoVer, proto_name = ProtoName, clientid = ClientId, @@ -80,11 +78,27 @@ info(#proto_state{proto_ver = ProtoVer, will_msg = WillMsg}) -> [{proto_ver, ProtoVer}, {proto_name, ProtoName}, - {clientid, ClientId}, + {clientid, ClientId}, {clean_sess, CleanSess}, {will_msg, WillMsg}]. -%%CONNECT – Client requests a connection to a Server +clientid(#proto_state{clientid = ClientId}) -> + ClientId. + +client(#proto_state{peername = {Addr, _Port}, + clientid = ClientId, + username = Username, + clean_sess = CleanSess, + proto_ver = ProtoVer, + client_pid = Pid}) -> + #mqtt_client{clientid = ClientId, + username = Username, + ipaddress = Addr, + clean_sess = CleanSess, + proto_ver = ProtoVer, + client_pid = Pid}. + +%% CONNECT – Client requests a connection to a Server %%A Client can only send the CONNECT Packet once over a Network Connection. -spec received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}. @@ -107,42 +121,45 @@ received(Packet = ?PACKET(_Type), State) -> {error, Reason, State} end. -handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = {Addr, _}}) -> +handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}) -> #mqtt_packet_connect{proto_ver = ProtoVer, + proto_name = ProtoName, username = Username, password = Password, clean_sess = CleanSess, keep_alive = KeepAlive, - clientid = ClientId} = Var, + clientid = ClientId} = Var, - trace(recv, Packet, State#proto_state{clientid = ClientId}), %%TODO: fix later... + State1 = State0#proto_state{proto_ver = ProtoVer, + proto_name = ProtoName, + username = Username, + clientid = ClientId, + clean_sess = CleanSess}, - State1 = State#proto_state{proto_ver = ProtoVer, - username = Username, - clientid = ClientId, - clean_sess = CleanSess}, - {ReturnCode1, State2} = - case validate_connect(Var, State) of + trace(recv, Packet, State1), + + {ReturnCode1, State3} = + case validate_connect(Var, State1) of ?CONNACK_ACCEPT -> - Client = #mqtt_client{clientid = ClientId, username = Username, ipaddr = Addr}, - case emqttd_access_control:auth(Client, Password) of + case emqttd_access_control:auth(client(State1), Password) of ok -> - %% Generate one if null - ClientId1 = clientid(ClientId, State), - %% Register clientId - emqttd_cm:register(ClientId1), + %% Generate clientId if null + State2 = State1#proto_state{clientid = clientid(ClientId, State1)}, + + %% Register the client to cm + emqttd_cm:register(client(State2)), + %%Starting session - {ok, Session} = emqttd_session:start({CleanSess, ClientId1, self()}), + {ok, Session} = emqttd_session:start({CleanSess, clientid(State2), self()}), + %% Start keepalive start_keepalive(KeepAlive), - %% Run hooks - emqttd_broker:foreach_hooks(client_connected, [{self(), ClientId1}]), - {?CONNACK_ACCEPT, State1#proto_state{clientid = ClientId1, - session = Session, - will_msg = willmsg(Var)}}; + + %% ACCEPT + {?CONNACK_ACCEPT, State2#proto_state{session = Session, will_msg = willmsg(Var)}}; {error, Reason}-> - lager:error("~s@~s: username '~s' login failed - ~s", + lager:error("~s@~s: username '~s', login failed - ~s", [ClientId, emqttd_net:format(Peername), Username, Reason]), {?CONNACK_CREDENTIALS, State1} @@ -150,9 +167,10 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = ReturnCode -> {ReturnCode, State1} end, - %%TODO: this is not right... - notify(connected, ReturnCode1, State2), - send(?CONNACK_PACKET(ReturnCode1), State2); + %% Run hooks + emqttd_broker:foreach_hooks(client_connected, [ReturnCode1, client(State3)]), + %% Send connack + send(?CONNACK_PACKET(ReturnCode1), State3); handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), State = #proto_state{clientid = ClientId, session = Session}) -> @@ -251,7 +269,6 @@ send({_From, Message = #mqtt_message{qos = ?QOS_0}}, State) -> %% message from session send({_From = SessPid, Message}, State = #proto_state{session = SessPid}) when is_pid(SessPid) -> send(emqtt_message:to_packet(Message), State); - %% message(qos1, qos2) not from session send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session = Session}) when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) -> @@ -279,12 +296,24 @@ trace(send, Packet, #proto_state{peername = Peername, clientid = ClientId}) -> redeliver({?PUBREL, PacketId}, State) -> send(?PUBREL_PACKET(PacketId), State). +shutdown(duplicate_id, _State) -> + quiet; %% + +shutdown(_, #proto_state{clientid = undefined}) -> + ignore; + +shutdown(normal, #proto_state{peername = Peername, clientid = ClientId}) -> + lager:info([{client, ClientId}], "Client ~s@~s: normal shutdown", + [ClientId, emqttd_net:format(Peername)]), + try_unregister(ClientId), + emqttd_broker:foreach_hooks(client_disconnected, [normal, ClientId]); + shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg = WillMsg}) -> - send_willmsg(ClientId, WillMsg), - try_unregister(ClientId, self()), - lager:info([{client, ClientId}], "Protocol ~s@~s Shutdown: ~p", + lager:info([{client, ClientId}], "Protocol ~s@~s: Shutdown for ~p", [ClientId, emqttd_net:format(Peername), Error]), - ok. + send_willmsg(ClientId, WillMsg), + try_unregister(ClientId), + emqttd_broker:foreach_hooks(client_disconnected, [Error, ClientId]). willmsg(Packet) when is_record(Packet, mqtt_packet_connect) -> emqtt_message:from_packet(Packet). @@ -377,8 +406,8 @@ validate_qos(undefined) -> true; validate_qos(Qos) when Qos =< ?QOS_2 -> true; validate_qos(_) -> false. -try_unregister(undefined, _) -> ok; -try_unregister(ClientId, _) -> emqttd_cm:unregister(ClientId). +try_unregister(undefined) -> ok; +try_unregister(ClientId) -> emqttd_cm:unregister(ClientId). %% publish ACL is cached in process dictionary. check_acl(publish, Topic, State) -> @@ -411,18 +440,3 @@ inc(?PINGRESP) -> inc(_) -> ingore. -notify(connected, ReturnCode, #proto_state{peername = Peername, - proto_ver = ProtoVer, - clientid = ClientId, - clean_sess = CleanSess}) -> - Sess = case CleanSess of - true -> false; - false -> true - end, - Params = [{from, emqttd_net:format(Peername)}, - {protocol, ProtoVer}, - {session, Sess}, - {connack, ReturnCode}], - emqttd_event:notify({connected, ClientId, Params}). - - diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 454363f11..15a465d4d 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -360,7 +360,7 @@ handle_info({dispatch, {_From, Message}}, State) -> handle_info({'EXIT', ClientPid, Reason}, State = #session_state{clientid = ClientId, client_pid = ClientPid}) -> - lager:error("Session: client ~s@~p exited, caused by ~p", [ClientId, ClientPid, Reason]), + lager:info("Session: client ~s@~p exited for ~p", [ClientId, ClientPid, Reason]), {noreply, start_expire_timer(State#session_state{client_pid = undefined})}; handle_info({'EXIT', ClientPid0, _Reason}, State = #session_state{client_pid = ClientPid}) -> diff --git a/apps/emqttd/src/emqttd_stats.erl b/apps/emqttd/src/emqttd_stats.erl index d55a5942c..0b5e65522 100644 --- a/apps/emqttd/src/emqttd_stats.erl +++ b/apps/emqttd/src/emqttd_stats.erl @@ -126,7 +126,7 @@ init([]) -> Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB, [ets:insert(?STATS_TAB, {Topic, 0}) || Topic <- Topics], % Create $SYS Topics - [ok = emqttd_pubsub:create(emqtt_topic:systop(Topic)) || Topic <- Topics], + [ok = emqttd_pubsub:create(stats_topic(Topic)) || Topic <- Topics], % Tick to publish stats {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}. @@ -154,6 +154,9 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= publish(Stat, Val) -> - emqttd_pubsub:publish(stats, #mqtt_message{topic = emqtt_topic:systop(Stat), + emqttd_pubsub:publish(stats, #mqtt_message{topic = stats_topic(Stat), payload = emqttd_util:integer_to_binary(Val)}). +stats_topic(Stat) -> + emqtt_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))). + diff --git a/apps/emqttd/src/emqttd_vm.erl b/apps/emqttd/src/emqttd_vm.erl index 1e4624a03..6cc9f7c50 100644 --- a/apps/emqttd/src/emqttd_vm.erl +++ b/apps/emqttd/src/emqttd_vm.erl @@ -24,12 +24,104 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_vm). -author("Feng Lee "). +-export([timestamp/0, microsecs/0]). + -export([loads/0]). +-define(SYSTEM_INFO, [ + allocated_areas, + allocator, + alloc_util_allocators, + build_type, + check_io, + compat_rel, + creation, + debug_compiled, + dist, + dist_ctrl, + driver_version, + elib_malloc, + dist_buf_busy_limit, + %fullsweep_after, % included in garbage_collection + garbage_collection, + %global_heaps_size, % deprecated + heap_sizes, + heap_type, + info, + kernel_poll, + loaded, + logical_processors, + logical_processors_available, + logical_processors_online, + machine, + %min_heap_size, % included in garbage_collection + %min_bin_vheap_size, % included in garbage_collection + modified_timing_level, + multi_scheduling, + multi_scheduling_blockers, + otp_release, + port_count, + process_count, + process_limit, + scheduler_bind_type, + scheduler_bindings, + scheduler_id, + schedulers, + schedulers_online, + smp_support, + system_version, + system_architecture, + threads, + thread_pool_size, + trace_control_word, + update_cpu_info, + version, + wordsize + ]). + +-define(SOCKET_OPTS, [ + active, + broadcast, + delay_send, + dontroute, + exit_on_close, + header, + keepalive, + nodelay, + packet, + packet_size, + read_packets, + recbuf, + reuseaddr, + send_timeout, + send_timeout_close, + sndbuf, + priority, + tos + ]). + + + +-export([loads/0, + get_system_info/0, + % get_statistics/0, + % get_process_info/0, + get_ports_info/0, + get_ets_info/0]). + +timestamp() -> + {MegaSecs, Secs, _MicroSecs} = os:timestamp(), + MegaSecs * 1000000 + Secs. + +microsecs() -> + {Mega, Sec, Micro} = erlang:now(), + (Mega * 1000000 + Sec) * 1000000 + Micro. + loads() -> [{load1, ftos(cpu_sup:avg1()/256)}, {load5, ftos(cpu_sup:avg5()/256)}, @@ -38,3 +130,142 @@ loads() -> ftos(F) -> [S] = io_lib:format("~.2f", [F]), S. +get_system_info() -> + [{Key, format_system_info(Key, get_system_info(Key))} || Key <- ?SYSTEM_INFO]. + +get_system_info(Key) -> + try erlang:system_info(Key) catch + error:badarg->undefined + end. + +%% conversion functions for erlang:system_info(Key) + +format_system_info(allocated_areas, List) -> + [convert_allocated_areas(Value) || Value <- List]; +format_system_info(allocator, {_,_,_,List}) -> + List; +format_system_info(dist_ctrl, List) -> + lists:map(fun({Node, Socket}) -> + {ok, Stats} = inet:getstat(Socket), + {Node, Stats} + end, List); +format_system_info(driver_version, Value) -> + list_to_binary(Value); +format_system_info(machine, Value) -> + list_to_binary(Value); +format_system_info(otp_release, Value) -> + list_to_binary(Value); +format_system_info(scheduler_bindings, Value) -> + tuple_to_list(Value); +format_system_info(system_version, Value) -> + list_to_binary(Value); +format_system_info(system_architecture, Value) -> + list_to_binary(Value); +format_system_info(version, Value) -> + list_to_binary(Value); +format_system_info(_, Value) -> + Value. + +convert_allocated_areas({Key, Value1, Value2}) -> + {Key, [Value1, Value2]}; +convert_allocated_areas({Key, Value}) -> + {Key, Value}. + + +get_ports_info()-> + [{pid_port_fun_to_atom(Port), get_port_info(Port)} || Port <- erlang:ports()]. + +get_port_info(Port) -> + Stat = get_socket_getstat(Port), + SockName = get_socket_sockname(Port), + Opts = get_socket_opts(Port), + Protocol = get_socket_protocol(Port), + Status = get_socket_status(Port), + Type = get_socket_type(Port), + + lists:flatten(lists:append([ + Stat, + SockName, + Opts, + Protocol, + Status, + Type + ])). + +get_socket_getstat(Socket) -> + case catch inet:getstat(Socket) of + {ok, Info} -> + Info; + _ -> + [] + end. + +get_socket_sockname(Socket) -> + case catch inet:sockname(Socket) of + {ok, {Ip, Port}} -> + [{ip, ip_to_binary(Ip)}, {port, Port}]; + _ -> + [] + end. + +ip_to_binary(Tuple) -> + iolist_to_binary(string:join(lists:map(fun integer_to_list/1, tuple_to_list(Tuple)), ".")). + + +get_socket_protocol(Socket) -> + case erlang:port_info(Socket, name) of + {name, "tcp_inet"} -> + [{protocol, tcp}]; + {name, "udp_inet"} -> + [{protocol, udp}]; + {name,"sctp_inet"} -> + [{protocol, sctp}]; + _ -> + [] + end. + +get_socket_status(Socket) -> + case catch prim_inet:getstatus(Socket) of + {ok, Status} -> + [{status, Status}]; + _ -> + [] + end. + +get_socket_type(Socket) -> + case catch prim_inet:gettype(Socket) of + {ok, Type} -> + [{type, tuple_to_list(Type)}]; + _ -> + [] + end. + +get_socket_opts(Socket) -> + [get_socket_opts(Socket, Key) || Key <- ?SOCKET_OPTS]. + +get_socket_opts(Socket, Key) -> + case catch inet:getopts(Socket, [Key]) of + {ok, Opt} -> + Opt; + _ -> + [] + end. + +get_ets_info() -> + [{Tab, get_ets_dets_info(ets, Tab)} || Tab <- ets:all()]. + +get_ets_dets_info(Type, Tab) -> + case Type:info(Tab) of + undefined -> []; + Entries when is_list(Entries) -> + [{Key, pid_port_fun_to_atom(Value)} || {Key, Value} <- Entries] + end. + +pid_port_fun_to_atom(Term) when is_pid(Term) -> + erlang:list_to_atom(pid_to_list(Term)); +pid_port_fun_to_atom(Term) when is_port(Term) -> + erlang:list_to_atom(erlang:port_to_list(Term)); +pid_port_fun_to_atom(Term) when is_function(Term) -> + erlang:list_to_atom(erlang:fun_to_list(Term)); +pid_port_fun_to_atom(Term) -> + Term. diff --git a/plugins/emqttd_amqp/src/emqttd_amqp.app.src b/plugins/emqttd_amqp/src/emqttd_amqp.app.src deleted file mode 100644 index 1664bee18..000000000 --- a/plugins/emqttd_amqp/src/emqttd_amqp.app.src +++ /dev/null @@ -1,12 +0,0 @@ -{application, emqttd_amqp, - [ - {description, ""}, - {vsn, "1"}, - {registered, []}, - {applications, [ - kernel, - stdlib - ]}, - {mod, { emqttd_amqp_app, []}}, - {env, []} - ]}. diff --git a/plugins/emqttd_amqp/src/emqttd_amqp_app.erl b/plugins/emqttd_amqp/src/emqttd_amqp_app.erl deleted file mode 100644 index 0087e7a7d..000000000 --- a/plugins/emqttd_amqp/src/emqttd_amqp_app.erl +++ /dev/null @@ -1,16 +0,0 @@ --module(emqttd_amqp_app). - --behaviour(application). - -%% Application callbacks --export([start/2, stop/1]). - -%% =================================================================== -%% Application callbacks -%% =================================================================== - -start(_StartType, _StartArgs) -> - emqttd_amqp_sup:start_link(). - -stop(_State) -> - ok. diff --git a/plugins/emqttd_amqp/src/emqttd_amqp_sup.erl b/plugins/emqttd_amqp/src/emqttd_amqp_sup.erl deleted file mode 100644 index 79ef00dff..000000000 --- a/plugins/emqttd_amqp/src/emqttd_amqp_sup.erl +++ /dev/null @@ -1,27 +0,0 @@ --module(emqttd_amqp_sup). - --behaviour(supervisor). - -%% API --export([start_link/0]). - -%% Supervisor callbacks --export([init/1]). - -%% Helper macro for declaring children of supervisor --define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). - -%% =================================================================== -%% API functions -%% =================================================================== - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -%% =================================================================== -%% Supervisor callbacks -%% =================================================================== - -init([]) -> - {ok, { {one_for_one, 5, 10}, []} }. - diff --git a/plugins/emqttd_auth_ldap/.placeholder b/plugins/emqttd_auth_ldap/.placeholder deleted file mode 100644 index e69de29bb..000000000 diff --git a/plugins/emqttd_auth_ldap/README.md b/plugins/emqttd_auth_ldap/README.md deleted file mode 100644 index 083b318ac..000000000 --- a/plugins/emqttd_auth_ldap/README.md +++ /dev/null @@ -1,23 +0,0 @@ -## Overview - -Authentication with LDAP. - -## Plugin Config - -``` - {emqttd_auth_ldap, [ - {servers, ["localhost"]}, - {port, 389}, - {timeout, 30}, - {user_dn, "uid=$u,ou=People,dc=example,dc=com"}, - {ssl, fasle}, - {sslopts, [ - {"certfile", "ssl.crt"}, - {"keyfile", "ssl.key"}]} - ]} -``` - -## Load Plugin - -Merge the'etc/plugin.config' to emqttd/etc/plugins.config, and the plugin will be loaded automatically. - diff --git a/plugins/emqttd_auth_ldap/etc/plugin.config b/plugins/emqttd_auth_ldap/etc/plugin.config deleted file mode 100644 index ac582d7d4..000000000 --- a/plugins/emqttd_auth_ldap/etc/plugin.config +++ /dev/null @@ -1,12 +0,0 @@ -[ - {emqttd_auth_ldap, [ - {servers, ["localhost"]}, - {port, 389}, - {timeout, 30}, - {user_dn, "uid=$u,ou=People,dc=example,dc=com"}, - {ssl, fasle}, - {sslopts, [ - {"certfile", "ssl.crt"}, - {"keyfile", "ssl.key"}]} - ]} -]. diff --git a/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.app.src b/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.app.src deleted file mode 100644 index e699fdba3..000000000 --- a/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.app.src +++ /dev/null @@ -1,12 +0,0 @@ -{application, emqttd_auth_ldap, - [ - {description, "emqttd LDAP Authentication Plugin"}, - {vsn, "1.0"}, - {registered, []}, - {applications, [ - kernel, - stdlib - ]}, - {mod, { emqttd_auth_ldap_app, []}}, - {env, []} - ]}. diff --git a/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap_app.erl b/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap_app.erl deleted file mode 100644 index 1cea23075..000000000 --- a/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap_app.erl +++ /dev/null @@ -1,58 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc -%%% LDAP Authentication Plugin. -%%% -%%% @end -%%%----------------------------------------------------------------------------- --module(emqttd_auth_ldap_app). - --behaviour(application). -%% Application callbacks --export([start/2, prep_stop/1, stop/1]). - --behaviour(supervisor). -%% Supervisor callbacks --export([init/1]). - -%%%============================================================================= -%%% Application callbacks -%%%============================================================================= - -start(_StartType, _StartArgs) -> - Env = application:get_all_env(emqttd_auth_ldap), - emqttd_access_control:register_mod(auth, emqttd_auth_ldap, Env), - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -prep_stop(State) -> - emqttd_access_control:unregister_mod(auth, emqttd_auth_ldap), State. - -stop(_State) -> - ok. - -%%%============================================================================= -%%% Supervisor callbacks(Dummy) -%%%============================================================================= - -init([]) -> - {ok, { {one_for_one, 5, 10}, []} }. - diff --git a/plugins/emqttd_dashboard/src/emqttd_dashboard.app.src b/plugins/emqttd_dashboard/src/emqttd_dashboard.app.src deleted file mode 100644 index 12342ddc5..000000000 --- a/plugins/emqttd_dashboard/src/emqttd_dashboard.app.src +++ /dev/null @@ -1,12 +0,0 @@ -{application, emqttd_dashboard, - [ - {description, "emqttd management dashboard"}, - {vsn, "0.1"}, - {registered, []}, - {applications, [ - kernel, - stdlib - ]}, - {mod, {emqttd_dashboard_app, []}}, - {env, []} -]}. diff --git a/plugins/emqttd_dashboard/src/emqttd_dashboard.erl b/plugins/emqttd_dashboard/src/emqttd_dashboard.erl deleted file mode 100644 index fae297b16..000000000 --- a/plugins/emqttd_dashboard/src/emqttd_dashboard.erl +++ /dev/null @@ -1,38 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd management dashboard. -%%% -%%% @end -%%%----------------------------------------------------------------------------- --module(emqttd_dashboard). - --author("Feng Lee "). - --export([handle_request/1]). - -%%TODO... - -handle_request(Req) -> - Req:ok("hello!"). - - diff --git a/plugins/emqttd_dashboard/src/emqttd_dashboard_app.erl b/plugins/emqttd_dashboard/src/emqttd_dashboard_app.erl deleted file mode 100644 index 8e0898679..000000000 --- a/plugins/emqttd_dashboard/src/emqttd_dashboard_app.erl +++ /dev/null @@ -1,27 +0,0 @@ --module(emqttd_dashboard_app). - --behaviour(application). - -%% Application callbacks --export([start/2, stop/1]). - -%% =================================================================== -%% Application callbacks -%% =================================================================== - -start(_StartType, _StartArgs) -> - {ok, Sup} = emqttd_dashboard_sup:start_link(), - open_listener(application:get_env(listener)), - {ok, Sup}. - -stop(_State) -> - ok. - -%% open http port -open_listener({_Http, Port, Options}) -> - MFArgs = {emqttd_dashboard, handle_request, []}, - mochiweb:start_http(Port, Options, MFArgs). - -close_listener(Port) -> - mochiweb:stop_http(Port). - diff --git a/plugins/emqttd_dashboard/src/emqttd_dashboard_sup.erl b/plugins/emqttd_dashboard/src/emqttd_dashboard_sup.erl deleted file mode 100644 index 895f00d83..000000000 --- a/plugins/emqttd_dashboard/src/emqttd_dashboard_sup.erl +++ /dev/null @@ -1,27 +0,0 @@ --module(emqttd_dashboard_sup). - --behaviour(supervisor). - -%% API --export([start_link/0]). - -%% Supervisor callbacks --export([init/1]). - -%% Helper macro for declaring children of supervisor --define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). - -%% =================================================================== -%% API functions -%% =================================================================== - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -%% =================================================================== -%% Supervisor callbacks -%% =================================================================== - -init([]) -> - {ok, { {one_for_one, 5, 10}, []} }. - diff --git a/plugins/emysql/src/emysql.erl b/plugins/emysql/src/emysql.erl index bc7cd352f..f6a620cfd 100644 --- a/plugins/emysql/src/emysql.erl +++ b/plugins/emysql/src/emysql.erl @@ -384,9 +384,15 @@ encode_where({like, Field, Value}) -> encode_where({'<', Field, Value}) -> atom_to_list(Field) ++ " < " ++ encode(Value); +encode_where({'<=', Field, Value}) -> + atom_to_list(Field) ++ " <= " ++ encode(Value); + encode_where({'>', Field, Value}) -> atom_to_list(Field) ++ " > " ++ encode(Value); +encode_where({'>=', Field, Value}) -> + atom_to_list(Field) ++ " >= " ++ encode(Value); + encode_where({'in', Field, Values}) -> InStr = string:join([encode(Value) || Value <- Values], ","), atom_to_list(Field) ++ " in (" ++ InStr ++ ")"; diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index 6a723470c..39e92334e 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -45,8 +45,22 @@ {auth, [ %% Authentication with username, password %{username, []}, + %% Authentication with clientid %{clientid, [{password, no}, {file, "etc/clients.config"}]}, + + %% Authentication with LDAP + % {ldap, [ + % {servers, ["localhost"]}, + % {port, 389}, + % {timeout, 30}, + % {user_dn, "uid=$u,ou=People,dc=example,dc=com"}, + % {ssl, fasle}, + % {sslopts, [ + % {"certfile", "ssl.crt"}, + % {"keyfile", "ssl.key"}]} + % ]}, + %% Allow all {anonymous, []} ]}, @@ -108,8 +122,13 @@ ]}, %% Modules {modules, [ + %% Client presence management module. + %% Publish messages when client connected or disconnected + {presence, [{qos, 0}]}, + %% Subscribe topics automatically when client connected {autosub, [{"$Q/client/$c", 0}]} + %% Rewrite rules %% {rewrite, [{file, "etc/rewrite.config"}]} @@ -155,10 +174,7 @@ %% Maximum number of concurrent clients {max_clients, 512}, %% Socket Access Control - {access, [ - {allow, "127.0.0.1"}, - {deny, all} - ]}, + {access, [{allow, all}]}, %% Socket Options {sockopts, [ {backlog, 1024} diff --git a/rel/reltool.config b/rel/reltool.config index ac5d09e83..ace833401 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -4,19 +4,24 @@ {lib_dirs, ["../apps", "../deps", "../plugins"]}, {erts, [{mod_cond, derived}, {app_file, strip}]}, {app_file, strip}, - {rel, "emqttd", "0.7.0", + {rel, "emqttd", "0.8.1", [ kernel, stdlib, sasl, + asn1, syntax_tools, ssl, crypto, %mnesia, + eldap, + xmerl, os_mon, inets, goldrush, + compiler, lager, + {gen_logger, load}, gproc, esockd, mochiweb, @@ -30,7 +35,7 @@ ]}, {boot_rel, "emqttd"}, {profile, embedded}, - {incl_cond, derived}, + {incl_cond, exclude}, %{mod_cond, derived}, {excl_archive_filters, [".*"]}, %% Do not archive built libs {excl_sys_filters, ["^bin/(?!start_clean.boot)", @@ -40,16 +45,21 @@ {app, kernel, [{incl_cond, include}]}, {app, stdlib, [{incl_cond, include}]}, {app, sasl, [{incl_cond, include}]}, - {app, crypto, [{mod_cond, app}, {incl_cond, include}]}, - {app, ssl, [{mod_cond, app}, {incl_cond, include}]}, - {app, os_mon, [{mod_cond, app}, {incl_cond, include}]}, - {app, syntax_tools, [{mod_cond, app}, {incl_cond, include}]}, - {app, public_key, [{mod_cond, app}, {incl_cond, include}]}, - {app, mnesia, [{mod_cond, app}, {incl_cond, include}]}, - {app, inets, [{mod_cond, app},{incl_cond, include}]}, - {app, goldrush, [{mod_cond, app}, {incl_cond, include}]}, - {app, lager, [{mod_cond, app}, {incl_cond, include}]}, - {app, gproc, [{mod_cond, app}, {incl_cond, include}]}, + {app, asn1, [{incl_cond, include}]}, + {app, crypto, [{incl_cond, include}]}, + {app, ssl, [{incl_cond, include}]}, + {app, xmerl, [{incl_cond, include}]}, + {app, os_mon, [{incl_cond, include}]}, + {app, syntax_tools, [{incl_cond, include}]}, + {app, public_key, [{incl_cond, include}]}, + {app, mnesia, [{incl_cond, include}]}, + {app, eldap, [{incl_cond, include}]}, + {app, inets, [{incl_cond, include}]}, + {app, compiler, [{incl_cond, include}]}, + {app, goldrush, [{incl_cond, include}]}, + {app, gen_logger, [{incl_cond, include}]}, + {app, lager, [{incl_cond, include}]}, + {app, gproc, [{incl_cond, include}]}, {app, esockd, [{mod_cond, app}, {incl_cond, include}]}, {app, mochiweb, [{mod_cond, app}, {incl_cond, include}]}, {app, emqtt, [{mod_cond, app}, {incl_cond, include}]}, diff --git a/tests/benchmarks/high-mqtt.xml b/tests/benchmarks/high-mqtt.xml new file mode 100644 index 000000000..56907bb2b --- /dev/null +++ b/tests/benchmarks/high-mqtt.xml @@ -0,0 +1,64 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + test_message + + + + + + + + + +