diff --git a/CHANGELOG.md b/CHANGELOG.md index 664f5bdf4..700532931 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,44 @@ emqttd ChangeLog ================== +0.8.1-alpha (2015-05-28) +------------------------- + +Bugfix: issue #138 - when client disconnected normally, broker will not publish disconnected $SYS message + +Improve: issue #136 - $SYS topics result should not include $SYS messages + + +0.8.0-alpha (2015-05-25) +------------------------- + +[Hooks](https://github.com/emqtt/emqttd/wiki/Hooks%20Design), Modules and [Plugins](https://github.com/emqtt/emqttd/wiki/Plugin%20Design) to extend the broker Now! + +Plugin: emqttd_auth_mysql - MySQL authentication plugin (issues #116, #120) + +Plugin: emqttd_auth_ldap - LDAP authentication plugin + +Feature: emqttd_broker to support Hooks API + +Feature: issue #111 - Support 'Forced Subscriptions' by emqttd_mod_autosub module + +Feature: issue #126 - Support 'Rewrite rules' by emqttd_mod_rewrite module + +Improve: Support hooks, modules to extend the broker + +Improve: issue #76 - dialyzer check + +Improve: 'Get Started', 'User Guide', 'Developer Guide' Wiki + +Improve: emqtt_topic to add join/1, feed_var/3, is_queue/1 + +Improve: emqttd_pooler to execute common tasks + +Improve: add emqttd_sm_sup module, and use 'hash' gproc_pool to manage sessions + +Tests: add more test cases for 'emqttd' app + + 0.7.1-alpha (2015-05-04) ------------------------- diff --git a/README.md b/README.md index f3ad0cd1c..bc86472e7 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ emqttd is aimed to provide a solid, enterprise grade, extensible open-source MQT * Cluster brokers on several servers. * Bridge brokers locally or remotelly * 500K+ concurrent clients connections per server -* Extensible architecture with plugin support +* Extensible architecture with Hooks, Modules and Plugins * Passed eclipse paho interoperability tests diff --git a/TODO b/TODO index 6b39dca41..e43cbfda2 100644 --- a/TODO +++ b/TODO @@ -2,16 +2,22 @@ v0.9.0-alpha (2015-05-30) ------------------------- +Presence Management.... + +Dashboard + +Presence Management.... + v0.8.0-alpha (2015-05-10) ------------------------- +Force Subscriptions... + Documents... MySQL Auth -Dashboard - AMQP Bridge Test @@ -20,8 +26,6 @@ Bridge Test 0.8.0 (2015-05-10) ------------------------- -Presence Management.... - Force Subscription... Point2Point Queue... diff --git a/apps/emqtt/src/emqtt.app.src b/apps/emqtt/src/emqtt.app.src index f156c6e98..ea5191f92 100644 --- a/apps/emqtt/src/emqtt.app.src +++ b/apps/emqtt/src/emqtt.app.src @@ -1,7 +1,7 @@ {application, emqtt, [ {description, "Erlang MQTT Common Library"}, - {vsn, "0.7.1"}, + {vsn, "0.8.0"}, {modules, []}, {registered, []}, {applications, [ diff --git a/apps/emqttd/src/emqttd.app.src b/apps/emqttd/src/emqttd.app.src index deedd416e..f598c13ba 100644 --- a/apps/emqttd/src/emqttd.app.src +++ b/apps/emqttd/src/emqttd.app.src @@ -1,7 +1,7 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, - {vsn, "0.7.1"}, + {vsn, "0.8.0"}, {modules, []}, {registered, []}, {applications, [kernel, diff --git a/apps/emqttd/src/emqttd.erl b/apps/emqttd/src/emqttd.erl index 4de58fbd7..30faa3e2d 100644 --- a/apps/emqttd/src/emqttd.erl +++ b/apps/emqttd/src/emqttd.erl @@ -32,6 +32,7 @@ open_listeners/1, close_listeners/1, load_all_plugins/0, unload_all_plugins/0, load_plugin/1, unload_plugin/1, + load_all_mods/0, is_mod_enabled/1, loaded_plugins/0, is_running/1]). @@ -202,6 +203,17 @@ unload_app(App) -> lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason} end. +load_all_mods() -> + Mods = application:get_env(emqttd, modules, []), + lists:foreach(fun({Name, Opts}) -> + Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)), + Mod:load(Opts), + lager:info("load module ~s successfully", [Name]) + end, Mods). + +is_mod_enabled(Name) -> + env(modules, Name) =/= undefined. + %%------------------------------------------------------------------------------ %% @doc Is running? %% @end diff --git a/apps/emqttd/src/emqttd_access_control.erl b/apps/emqttd/src/emqttd_access_control.erl index eb4344b38..3f1ca1b36 100644 --- a/apps/emqttd/src/emqttd_access_control.erl +++ b/apps/emqttd/src/emqttd_access_control.erl @@ -152,7 +152,7 @@ stop() -> %%%============================================================================= init([]) -> - {ok, AcOpts} = application:get_env(access), + {ok, AcOpts} = application:get_env(emqttd, access), ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]), ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(auth, proplists:get_value(auth, AcOpts))}), ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(acl, proplists:get_value(acl, AcOpts))}), diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index 80bbd968e..5a8daf60f 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -51,8 +51,9 @@ start(_StartType, _StartArgs) -> emqttd_mnesia:start(), {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), - {ok, Listeners} = application:get_env(listeners), + emqttd:load_all_mods(), emqttd:load_all_plugins(), + {ok, Listeners} = application:get_env(listeners), emqttd:open_listeners(Listeners), register(emqttd, self()), print_vsn(), @@ -71,13 +72,14 @@ start_servers(Sup) -> {"emqttd trace", emqttd_trace}, {"emqttd pooler", {supervisor, emqttd_pooler_sup}}, {"emqttd client manager", {supervisor, emqttd_cm_sup}}, - {"emqttd session manager", emqttd_sm}, + {"emqttd session manager", {supervisor, emqttd_sm_sup}}, {"emqttd session supervisor", {supervisor, emqttd_session_sup}}, {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}}, {"emqttd stats", emqttd_stats}, {"emqttd metrics", emqttd_metrics}, %{"emqttd router", emqttd_router}, {"emqttd broker", emqttd_broker}, + {"emqttd mode supervisor", emqttd_mod_sup}, {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}, {"emqttd access control", emqttd_access_control}, {"emqttd system monitor", emqttd_sysmon}], diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index 4a9b55ea6..bff331d9c 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -42,6 +42,9 @@ %% Event API -export([subscribe/1, notify/2]). +%% Hook API +-export([hook/3, unhook/2, foreach_hooks/2, foldl_hooks/3]). + %% Broker API -export([env/1, version/0, uptime/0, datetime/0, sysdescr/0]). @@ -127,6 +130,52 @@ datetime() -> io_lib:format( "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). +%%------------------------------------------------------------------------------ +%% @doc Hook +%% @end +%%------------------------------------------------------------------------------ +-spec hook(Hook :: atom(), Name :: any(), MFA :: mfa()) -> ok | {error, any()}. +hook(Hook, Name, MFA) -> + gen_server:call(?MODULE, {hook, Hook, Name, MFA}). + +%%------------------------------------------------------------------------------ +%% @doc Unhook +%% @end +%%------------------------------------------------------------------------------ +-spec unhook(Hook :: atom(), Name :: any()) -> ok | {error, any()}. +unhook(Hook, Name) -> + gen_server:call(?MODULE, {unhook, Hook, Name}). + +%%------------------------------------------------------------------------------ +%% @doc Foreach hooks +%% @end +%%------------------------------------------------------------------------------ +-spec foreach_hooks(Hook :: atom(), Args :: list()) -> any(). +foreach_hooks(Hook, Args) -> + case ets:lookup(?BROKER_TAB, {hook, Hook}) of + [{_, Hooks}] -> + lists:foreach(fun({_Name, {M, F, A}}) -> + apply(M, F, Args++A) + end, Hooks); + [] -> + ok + end. + +%%------------------------------------------------------------------------------ +%% @doc Foldl hooks +%% @end +%%------------------------------------------------------------------------------ +-spec foldl_hooks(Hook :: atom(), Args :: list(), Acc0 :: any()) -> any(). +foldl_hooks(Hook, Args, Acc0) -> + case ets:lookup(?BROKER_TAB, {hook, Hook}) of + [{_, Hooks}] -> + lists:foldl(fun({_Name, {M, F, A}}, Acc) -> + apply(M, F, [Acc, Args++A]) + end, Acc0, Hooks); + [] -> + Acc0 + end. + %%------------------------------------------------------------------------------ %% @doc Start a tick timer %% @end @@ -163,8 +212,33 @@ init([]) -> handle_call(uptime, _From, State) -> {reply, uptime(State), State}; +handle_call({hook, Hook, Name, MFArgs}, _From, State) -> + Key = {hook, Hook}, Reply = + case ets:lookup(?BROKER_TAB, Key) of + [{Key, Hooks}] -> + case lists:keyfind(Name, 1, Hooks) of + {Name, _MFArgs} -> + {error, existed}; + false -> + ets:insert(?BROKER_TAB, {Key, Hooks ++ [{Name, MFArgs}]}) + end; + [] -> + ets:insert(?BROKER_TAB, {Key, [{Name, MFArgs}]}) + end, + {reply, Reply, State}; + +handle_call({unhook, Name}, _From, State) -> + Key = {hook, Name}, Reply = + case ets:lookup(?BROKER_TAB, Key) of + [{Key, Hooks}] -> + ets:insert(?BROKER_TAB, {Key, lists:keydelete(Name, 1, Hooks)}); + [] -> + {error, not_found} + end, + {reply, Reply, State}; + handle_call(_Request, _From, State) -> - {reply, error, State}. + {reply, {error, unsupport_request}, State}. handle_cast(_Msg, State) -> {noreply, State}. diff --git a/apps/emqttd/src/emqttd_client.erl b/apps/emqttd/src/emqttd_client.erl index 6c5469090..0c9e2413a 100644 --- a/apps/emqttd/src/emqttd_client.erl +++ b/apps/emqttd/src/emqttd_client.erl @@ -121,7 +121,7 @@ handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} = {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), {noreply, State#state{proto_state = ProtoState1}}; -handle_info({force_subscribe, Topic, Qos}, #state{proto_state = ProtoState} = State) -> +handle_info({subscribe, Topic, Qos}, #state{proto_state = ProtoState} = State) -> {ok, ProtoState1} = emqttd_protocol:handle({subscribe, Topic, Qos}, ProtoState), {noreply, State#state{proto_state = ProtoState1}}; @@ -161,15 +161,15 @@ handle_info(Info, State = #state{peername = Peername}) -> {stop, {badinfo, Info}, State}. terminate(Reason, #state{peername = Peername, keepalive = KeepAlive, proto_state = ProtoState}) -> - lager:debug("Client ~s: ~p terminated, reason: ~p~n", [emqttd_net:format(Peername), self(), Reason]), + lager:info("Client ~s: ~p terminated, reason: ~p~n", [emqttd_net:format(Peername), self(), Reason]), notify(disconnected, Reason, ProtoState), emqttd_keepalive:cancel(KeepAlive), case {ProtoState, Reason} of {undefined, _} -> ok; {_, {shutdown, Error}} -> emqttd_protocol:shutdown(Error, ProtoState); - {_, _} -> - ok + {_, Reason} -> + emqttd_protocol:shutdown(Reason, ProtoState) end. code_change(_OldVsn, State, _Extra) -> @@ -257,7 +257,8 @@ inc(_) -> notify(disconnected, _Reason, undefined) -> ingore; notify(disconnected, {shutdown, Reason}, ProtoState) -> - emqttd_event:notify({disconnected, emqttd_protocol:clientid(ProtoState), [{reason, Reason}]}); + emqttd_event:notify({disconnected, emqttd_protocol:clientid(ProtoState), Reason}); notify(disconnected, Reason, ProtoState) -> - emqttd_event:notify({disconnected, emqttd_protocol:clientid(ProtoState), [{reason, Reason}]}). + emqttd_event:notify({disconnected, emqttd_protocol:clientid(ProtoState), Reason}). + diff --git a/apps/emqttd/src/emqttd_cm.erl b/apps/emqttd/src/emqttd_cm.erl index 51b230f7b..7758f8ec0 100644 --- a/apps/emqttd/src/emqttd_cm.erl +++ b/apps/emqttd/src/emqttd_cm.erl @@ -33,7 +33,7 @@ -define(SERVER, ?MODULE). %% API Exports --export([start_link/3]). +-export([start_link/2, pool/0, table/0]). -export([lookup/1, register/1, unregister/1]). @@ -41,9 +41,11 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {tab, statsfun}). +-record(state, {id, tab, statsfun}). --define(POOL, cm_pool). +-define(CM_POOL, cm_pool). + +-define(CLIENT_TAB, mqtt_client). %%%============================================================================= %%% API @@ -53,12 +55,15 @@ %% @doc Start client manager %% @end %%------------------------------------------------------------------------------ --spec start_link(Id, TabId, StatsFun) -> {ok, pid()} | ignore | {error, any()} when +-spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when Id :: pos_integer(), - TabId :: ets:tid(), StatsFun :: fun(). -start_link(Id, TabId, StatsFun) -> - gen_server:start_link(?MODULE, [Id, TabId, StatsFun], []). +start_link(Id, StatsFun) -> + gen_server:start_link(?MODULE, [Id, StatsFun], []). + +pool() -> ?CM_POOL. + +table() -> ?CLIENT_TAB. %%------------------------------------------------------------------------------ %% @doc Lookup client pid with clientId @@ -66,7 +71,7 @@ start_link(Id, TabId, StatsFun) -> %%------------------------------------------------------------------------------ -spec lookup(ClientId :: binary()) -> pid() | undefined. lookup(ClientId) when is_binary(ClientId) -> - case ets:lookup(emqttd_cm_sup:table(), ClientId) of + case ets:lookup(?CLIENT_TAB, ClientId) of [{_, Pid, _}] -> Pid; [] -> undefined end. @@ -77,7 +82,7 @@ lookup(ClientId) when is_binary(ClientId) -> %%------------------------------------------------------------------------------ -spec register(ClientId :: binary()) -> ok. register(ClientId) when is_binary(ClientId) -> - CmPid = gproc_pool:pick_worker(?POOL, ClientId), + CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), gen_server:call(CmPid, {register, ClientId, self()}, infinity). %%------------------------------------------------------------------------------ @@ -86,19 +91,19 @@ register(ClientId) when is_binary(ClientId) -> %%------------------------------------------------------------------------------ -spec unregister(ClientId :: binary()) -> ok. unregister(ClientId) when is_binary(ClientId) -> - CmPid = gproc_pool:pick_worker(?POOL, ClientId), + CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), gen_server:cast(CmPid, {unregister, ClientId, self()}). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([Id, TabId, StatsFun]) -> - gproc_pool:connect_worker(?POOL, {?MODULE, Id}), - {ok, #state{tab = TabId, statsfun = StatsFun}}. +init([Id, StatsFun]) -> + gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}), + {ok, #state{id = Id, statsfun = StatsFun}}. -handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) -> - case ets:lookup(Tab, ClientId) of +handle_call({register, ClientId, Pid}, _From, State) -> + case ets:lookup(?CLIENT_TAB, ClientId) of [{_, Pid, _}] -> lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]), ignore; @@ -106,9 +111,9 @@ handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) -> lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]), OldPid ! {stop, duplicate_id, Pid}, erlang:demonitor(MRef), - ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)}); + ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)}); [] -> - ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)}) + ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)}) end, {reply, ok, setstats(State)}; @@ -116,11 +121,11 @@ handle_call(Req, _From, State) -> lager:error("unexpected request: ~p", [Req]), {reply, {error, badreq}, State}. -handle_cast({unregister, ClientId, Pid}, State = #state{tab = TabId}) -> - case ets:lookup(TabId, ClientId) of +handle_cast({unregister, ClientId, Pid}, State) -> + case ets:lookup(?CLIENT_TAB, ClientId) of [{_, Pid, MRef}] -> erlang:demonitor(MRef, [flush]), - ets:delete(TabId, ClientId); + ets:delete(?CLIENT_TAB, ClientId); [_] -> ignore; [] -> @@ -131,15 +136,15 @@ handle_cast({unregister, ClientId, Pid}, State = #state{tab = TabId}) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tab = TabId}) -> - ets:match_delete(TabId, {'_', DownPid, MRef}), +handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> + ets:match_delete(?CLIENT_TAB, {'_', DownPid, MRef}), {noreply, setstats(State)}; handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> - ok. +terminate(_Reason, #state{id = Id}) -> + gproc_pool:disconnect_worker(?CM_POOL, {?MODULE, Id}), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -148,6 +153,6 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= -setstats(State = #state{tab = TabId, statsfun = StatsFun}) -> - StatsFun(ets:info(TabId, size)), State. +setstats(State = #state{statsfun = StatsFun}) -> + StatsFun(ets:info(?CLIENT_TAB, size)), State. diff --git a/apps/emqttd/src/emqttd_cm_sup.erl b/apps/emqttd/src/emqttd_cm_sup.erl index 666b87548..53a338404 100644 --- a/apps/emqttd/src/emqttd_cm_sup.erl +++ b/apps/emqttd/src/emqttd_cm_sup.erl @@ -33,30 +33,26 @@ -behaviour(supervisor). %% API --export([start_link/0, table/0]). +-export([start_link/0]). %% Supervisor callbacks -export([init/1]). --define(CLIENT_TAB, mqtt_client). - start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). -table() -> ?CLIENT_TAB. - init([]) -> - TabId = ets:new(?CLIENT_TAB, [set, named_table, public, - {write_concurrency, true}]), + ets:new(emqttd_cm:table(), [set, named_table, public, + {write_concurrency, true}]), Schedulers = erlang:system_info(schedulers), - gproc_pool:new(cm_pool, hash, [{size, Schedulers}]), + gproc_pool:new(emqttd_cm:pool(), hash, [{size, Schedulers}]), StatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'), Children = lists:map( fun(I) -> Name = {emqttd_cm, I}, - gproc_pool:add_worker(cm_pool, Name, I), - {Name, {emqttd_cm, start_link, [I, TabId, StatsFun]}, - permanent, 10000, worker, [emqttd_cm]} + gproc_pool:add_worker(emqttd_cm:pool(), Name, I), + {Name, {emqttd_cm, start_link, [I, StatsFun]}, + permanent, 10000, worker, [emqttd_cm]} end, lists:seq(1, Schedulers)), {ok, {{one_for_all, 10, 100}, Children}}. diff --git a/apps/emqttd/src/emqttd_ctl.erl b/apps/emqttd/src/emqttd_ctl.erl index 3499c3855..0a3debac6 100644 --- a/apps/emqttd/src/emqttd_ctl.erl +++ b/apps/emqttd/src/emqttd_ctl.erl @@ -95,7 +95,7 @@ cluster([SNode]) -> end. %%------------------------------------------------------------------------------ -%% @doc Add usern +%% @doc Add user %% @end %%------------------------------------------------------------------------------ useradd([Username, Password]) -> diff --git a/apps/emqttd/src/emqttd_event.erl b/apps/emqttd/src/emqttd_event.erl index 3db7817aa..e226de5de 100644 --- a/apps/emqttd/src/emqttd_event.erl +++ b/apps/emqttd/src/emqttd_event.erl @@ -73,7 +73,11 @@ handle_event({connected, ClientId, Params}, State = #state{systop = SysTop}) -> emqttd_pubsub:publish(event, Msg), {ok, State}; -handle_event({disconnectd, ClientId, Reason}, State = #state{systop = SysTop}) -> +%%TODO: Protect from undefined clientId... +handle_event({disconnected, undefined, Reason}, State = #state{systop = SysTop}) -> + {ok, State}; + +handle_event({disconnected, ClientId, Reason}, State = #state{systop = SysTop}) -> Topic = <>, Msg = #mqtt_message{topic = Topic, payload = payload(disconnected, Reason)}, emqttd_pubsub:publish(event, Msg), diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_acl.erl b/apps/emqttd/src/emqttd_gen_mod.erl similarity index 77% rename from plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_acl.erl rename to apps/emqttd/src/emqttd_gen_mod.erl index 424ee2c86..190971d0e 100644 --- a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_acl.erl +++ b/apps/emqttd/src/emqttd_gen_mod.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -20,26 +20,30 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd demo acl module. +%%% emqttd gen_mod behaviour %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqttd_plugin_demo_acl). +-module(emqttd_gen_mod). -author("Feng Lee "). --include_lib("emqttd/include/emqttd.hrl"). +-include("emqttd.hrl"). --behaviour(emqttd_acl_mod). +-ifdef(use_specs). -%% ACL callbacks --export([init/1, check_acl/2, reload_acl/1, description/0]). +-callback load(Opts :: any()) -> {ok, State :: any()}. -init(Opts) -> {ok, Opts}. +-callback unload(State :: any()) -> any(). -check_acl({_Client, _PubSub, _Topic}, _State) -> ignore. +-else. -reload_acl(_State) -> ok. +-export([behaviour_info/1]). -description() -> "Demo ACL Module". +behaviour_info(callbacks) -> + [{load, 1}, {unload, 1}]; +behaviour_info(_Other) -> + undefined. + +-endif. diff --git a/plugins/emqttd_presence/src/emqttd_presence_app.erl b/apps/emqttd/src/emqttd_mod_autosub.erl similarity index 63% rename from plugins/emqttd_presence/src/emqttd_presence_app.erl rename to apps/emqttd/src/emqttd_mod_autosub.erl index da827c791..0ed1be5e1 100644 --- a/plugins/emqttd_presence/src/emqttd_presence_app.erl +++ b/apps/emqttd/src/emqttd_mod_autosub.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -20,23 +20,31 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% presence manager application +%%% emqttd auto subscribe module. %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqttd_presence_app). --behaviour(application). +-module(emqttd_mod_autosub). -%% Application callbacks --export([start/2, stop/1]). +-author("Feng Lee "). -%% =================================================================== -%% Application callbacks -%% =================================================================== +-behaviour(emqttd_gen_mod). -start(_StartType, _StartArgs) -> - emqttd_presence_sup:start_link(). +-export([load/1, subscribe/2, unload/1]). + +-record(state, {topics}). + +load(Opts) -> + Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2], + emqttd_broker:hook(client_connected, {?MODULE, subscribe}, + {?MODULE, subscribe, [Topics]}), + {ok, #state{topics = Topics}}. + +subscribe({Client, ClientId}, Topics) -> + F = fun(Topic) -> emqtt_topic:feed_var(<<"$c">>, ClientId, Topic) end, + [Client ! {subscribe, F(Topic), Qos} || {Topic, Qos} <- Topics]. + +unload(_Opts) -> + emqttd_broker:unhook(client_connected, {?MODULE, subscribe}). -stop(_State) -> - ok. diff --git a/apps/emqttd/src/emqttd_mod_rewrite.erl b/apps/emqttd/src/emqttd_mod_rewrite.erl new file mode 100644 index 000000000..fc038d95f --- /dev/null +++ b/apps/emqttd/src/emqttd_mod_rewrite.erl @@ -0,0 +1,127 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd rewrite module. +%%% +%%% @end +%%%----------------------------------------------------------------------------- + +-module(emqttd_mod_rewrite). + +-author("Feng Lee "). + +-include_lib("emqtt/include/emqtt.hrl"). + +-behaviour(emqttd_gen_mod). + +-export([load/1, reload/1, unload/1]). + +-export([rewrite/2]). + +%%%============================================================================= +%%% API +%%%============================================================================= + +load(Opts) -> + File = proplists:get_value(file, Opts), + {ok, Terms} = file:consult(File), + Sections = compile(Terms), + emqttd_broker:hook(client_subscribe, {?MODULE, rewrite_subscribe}, + {?MODULE, rewrite, [subscribe, Sections]}), + emqttd_broker:hook(client_unsubscribe, {?MODULE, rewrite_unsubscribe}, + {?MODULE, rewrite, [unsubscribe, Sections]}), + emqttd_broker:hook(client_publish, {?MODULE, rewrite_publish}, + {?MODULE, rewrite, [publish, Sections]}). + +rewrite(TopicTable, [subscribe, Sections]) -> + lager:info("rewrite subscribe: ~p", [TopicTable]), + [{match_topic(Topic, Sections), Qos} || {Topic, Qos} <- TopicTable]; + +rewrite(Topics, [unsubscribe, Sections]) -> + lager:info("rewrite unsubscribe: ~p", [Topics]), + [match_topic(Topic, Sections) || Topic <- Topics]; + +rewrite(Message=#mqtt_message{topic = Topic}, [publish, Sections]) -> + %%TODO: this will not work if the client is always online. + RewriteTopic = + case get({rewrite, Topic}) of + undefined -> + DestTopic = match_topic(Topic, Sections), + put({rewrite, Topic}, DestTopic), DestTopic; + DestTopic -> + DestTopic + end, + Message#mqtt_message{topic = RewriteTopic}. + +reload(File) -> + %%TODO: The unload api is not right... + case emqttd:is_mod_enabled(rewrite) of + true -> + unload(state), + load([{file, File}]); + false -> + {error, module_unloaded} + end. + +unload(_) -> + emqttd_broker:unhook(client_subscribe, {?MODULE, rewrite_subscribe}), + emqttd_broker:unhook(client_unsubscribe, {?MODULE, rewrite_unsubscribe}), + emqttd_broker:unhook(client_publish, {?MODULE, rewrite_publish}). + +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + +compile(Sections) -> + C = fun({rewrite, Re, Dest}) -> + {ok, MP} = re:compile(Re), + {rewrite, MP, Dest} + end, + F = fun({topic, Topic, Rules}) -> + {topic, list_to_binary(Topic), [C(R) || R <- Rules]} + end, + [F(Section) || Section <- Sections]. + +match_topic(Topic, []) -> + Topic; +match_topic(Topic, [{topic, Filter, Rules} | Sections]) -> + case emqtt_topic:match(Topic, Filter) of + true -> + match_rule(Topic, Rules); + false -> + match_topic(Topic, Sections) + end. + +match_rule(Topic, []) -> + Topic; +match_rule(Topic, [{rewrite, MP, Dest} | Rules]) -> + case re:run(Topic, MP, [{capture, all_but_first, list}]) of + {match, Captured} -> + %%TODO: stupid??? how to replace $1, $2? + Vars = lists:zip(["\\$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))], Captured), + iolist_to_binary(lists:foldl( + fun({Var, Val}, Acc) -> + re:replace(Acc, Var, Val, [global]) + end, Dest, Vars)); + nomatch -> + match_rule(Topic, Rules) + end. diff --git a/apps/emqttd/src/emqttd_mod_sup.erl b/apps/emqttd/src/emqttd_mod_sup.erl new file mode 100644 index 000000000..8fa3ced59 --- /dev/null +++ b/apps/emqttd/src/emqttd_mod_sup.erl @@ -0,0 +1,67 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd module supervisor. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_mod_sup). + +-author("Feng Lee "). + +-include("emqttd.hrl"). + +-behaviour(supervisor). + +%% API +-export([start_link/0, start_child/1, start_child/2]). + +%% Supervisor callbacks +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(Mod, Type), {Mod, {Mod, start_link, []}, permanent, 5000, Type, [Mod]}). + +%%%============================================================================= +%%% API +%%%============================================================================= + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +start_child(ChildSpec) when is_tuple(ChildSpec) -> + supervisor:start_child(?MODULE, ChildSpec). + +%% +%% start_child(Mod::atom(), Type::type()) -> {ok, pid()} +%% @type type() = worker | supervisor +%% +start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) -> + supervisor:start_child(?MODULE, ?CHILD(Mod, Type)). + +%%%============================================================================= +%%% Supervisor callbacks +%%%============================================================================= + +init([]) -> + {ok, {{one_for_one, 10, 3600}, []}}. + diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index eb08b62fd..08b24674f 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -134,10 +134,10 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = emqttd_cm:register(ClientId1), %%Starting session {ok, Session} = emqttd_session:start({CleanSess, ClientId1, self()}), - %% Force subscriptions - force_subscribe(ClientId1), %% Start keepalive start_keepalive(KeepAlive), + %% Run hooks + emqttd_broker:foreach_hooks(client_connected, [{self(), ClientId1}]), {?CONNACK_ACCEPT, State1#proto_state{clientid = ClientId1, session = Session, will_msg = willmsg(Var)}}; @@ -158,7 +158,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), State = #proto_state{clientid = ClientId, session = Session}) -> case check_acl(publish, Topic, State) of allow -> - emqttd_session:publish(Session, ClientId, {?QOS_0, emqtt_message:from_packet(Packet)}); + do_publish(Session, ClientId, ?QOS_0, Packet); deny -> lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]) end, @@ -168,7 +168,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), State = #proto_state{clientid = ClientId, session = Session}) -> case check_acl(publish, Topic, State) of allow -> - emqttd_session:publish(Session, ClientId, {?QOS_1, emqtt_message:from_packet(Packet)}), + do_publish(Session, ClientId, ?QOS_1, Packet), send(?PUBACK_PACKET(?PUBACK, PacketId), State); deny -> lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]), @@ -179,7 +179,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), State = #proto_state{clientid = ClientId, session = Session}) -> case check_acl(publish, Topic, State) of allow -> - NewSession = emqttd_session:publish(Session, ClientId, {?QOS_2, emqtt_message:from_packet(Packet)}), + NewSession = do_publish(Session, ClientId, ?QOS_2, Packet), send(?PUBACK_PACKET(?PUBREC, PacketId), State#proto_state{session = NewSession}); deny -> lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]), @@ -212,12 +212,13 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid = lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]), {ok, State}; false -> + TopicTable1 = emqttd_broker:foldl_hooks(client_subscribe, [], TopicTable), %%TODO: GrantedQos should be renamed. - {ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable), + {ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable1), send(?SUBACK_PACKET(PacketId, GrantedQos), State#proto_state{session = NewSession}) end; -handle({subscribe, Topic, Qos}, State = #proto_state{clientid = ClientId, session = Session}) -> +handle({subscribe, Topic, Qos}, State = #proto_state{session = Session}) -> {ok, NewSession, _GrantedQos} = emqttd_session:subscribe(Session, [{Topic, Qos}]), {ok, State#proto_state{session = NewSession}}; @@ -226,7 +227,8 @@ handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> send(?UNSUBACK_PACKET(PacketId), State); handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) -> - {ok, NewSession} = emqttd_session:unsubscribe(Session, Topics), + Topics1 = emqttd_broker:foldl_hooks(client_unsubscribe, [], Topics), + {ok, NewSession} = emqttd_session:unsubscribe(Session, Topics1), send(?UNSUBACK_PACKET(PacketId), State#proto_state{session = NewSession}); handle(?PACKET(?PINGREQ), State) -> @@ -237,6 +239,10 @@ handle(?PACKET(?DISCONNECT), State) -> % clean willmsg {stop, normal, State#proto_state{will_msg = undefined}}. +do_publish(Session, ClientId, Qos, Packet) -> + Message = emqttd_broker:foldl_hooks(client_publish, [], emqtt_message:from_packet(Packet)), + emqttd_session:publish(Session, ClientId, {Qos, Message}). + -spec send({pid() | tuple(), mqtt_message()} | mqtt_packet(), proto_state()) -> {ok, proto_state()}. %% qos0 message send({_From, Message = #mqtt_message{qos = ?QOS_0}}, State) -> @@ -298,23 +304,8 @@ send_willmsg(_ClientId, undefined) -> send_willmsg(ClientId, WillMsg) -> emqttd_pubsub:publish(ClientId, WillMsg). -%%TODO: will be fixed in 0.8 -force_subscribe(ClientId) -> - case emqttd_broker:env(forced_subscriptions) of - undefined -> - ingore; - Topics -> - [force_subscribe(ClientId, {Topic, Qos}) || {Topic, Qos} <- Topics] - end. - -force_subscribe(ClientId, {Topic, Qos}) when is_list(Topic) -> - force_subscribe(ClientId, {list_to_binary(Topic), Qos}); - -force_subscribe(ClientId, {Topic, Qos}) when is_binary(Topic) -> - Topic1 = emqtt_topic:feed_var(<<"$c">>, ClientId, Topic), - self() ! {force_subscribe, Topic1, Qos}. - start_keepalive(0) -> ignore; + start_keepalive(Sec) when Sec > 0 -> self() ! {keepalive, start, round(Sec * 1.5)}. diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index 51a6a217a..b52037d29 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -159,7 +159,7 @@ cast(Msg) -> %%------------------------------------------------------------------------------ -spec publish(From :: mqtt_clientid() | atom(), Msg :: mqtt_message()) -> ok. publish(From, #mqtt_message{topic=Topic} = Msg) -> - lager:info([{client, From}, {topic, Topic}], "~s PUBLISH to ~s", [From, Topic]), + trace(publish, From, Msg), %% Retain message first. Don't create retained topic. case emqttd_msg_store:retain(Msg) of ok -> @@ -169,7 +169,7 @@ publish(From, #mqtt_message{topic=Topic} = Msg) -> publish(From, Topic, Msg) end. -publish(_From, <<"$Q/", _/binary>> = Queue, #mqtt_message{qos = Qos} = Msg) -> +publish(From, <<"$Q/", _/binary>> = Queue, #mqtt_message{qos = Qos} = Msg) -> lists:foreach( fun(#mqtt_queue{subpid = SubPid, qos = SubQos}) -> Msg1 = if @@ -450,3 +450,16 @@ setstats(dropped, false) -> setstats(dropped, true) -> emqttd_metrics:inc('messages/dropped'). + +%%%============================================================================= +%%% Trace functions +%%%============================================================================= + +trace(publish, From, _Msg) when is_atom(From) -> + %%dont' trace broker publish + ignore; + +trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) -> + lager:info([{client, From}, {topic, Topic}], + "~s PUBLISH to ~s: ~p", [From, Topic, Payload]). + diff --git a/apps/emqttd/src/emqttd_sm.erl b/apps/emqttd/src/emqttd_sm.erl index ef26cb44e..05d9e62b0 100644 --- a/apps/emqttd/src/emqttd_sm.erl +++ b/apps/emqttd/src/emqttd_sm.erl @@ -44,10 +44,8 @@ -behaviour(gen_server). --define(SERVER, ?MODULE). - %% API Function Exports --export([start_link/0]). +-export([start_link/2, pool/0, table/0]). -export([lookup_session/1, start_session/2, destroy_session/1]). @@ -55,16 +53,37 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {tabid, statsfun}). +-record(state, {id, tabid, statsfun}). + +-define(SM_POOL, sm_pool). -define(SESSION_TAB, mqtt_session). %%%============================================================================= %%% API %%%============================================================================= --spec start_link() -> {ok, pid()} | ignore | {error, any()}. -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%------------------------------------------------------------------------------ +%% @doc Start a session manager +%% @end +%%------------------------------------------------------------------------------ +-spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when + Id :: pos_integer(), + StatsFun :: fun(). +start_link(Id, StatsFun) -> + gen_server:start_link(?MODULE, [Id, StatsFun], []). + +%%------------------------------------------------------------------------------ +%% @doc Pool name. +%% @end +%%------------------------------------------------------------------------------ +pool() -> ?SM_POOL. + +%%------------------------------------------------------------------------------ +%% @doc Table name. +%% @end +%%------------------------------------------------------------------------------ +table() -> ?SESSION_TAB. %%------------------------------------------------------------------------------ %% @doc Lookup Session Pid @@ -72,7 +91,7 @@ start_link() -> %%------------------------------------------------------------------------------ -spec lookup_session(binary()) -> pid() | undefined. lookup_session(ClientId) -> - case ets:lookup(?SESSION_TAB, ClientId) of + case ets:lookup(emqttd_sm_sup:table(), ClientId) of [{_, SessPid, _}] -> SessPid; [] -> undefined end. @@ -83,7 +102,8 @@ lookup_session(ClientId) -> %%------------------------------------------------------------------------------ -spec start_session(binary(), pid()) -> {ok, pid()} | {error, any()}. start_session(ClientId, ClientPid) -> - gen_server:call(?SERVER, {start_session, ClientId, ClientPid}). + SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId), + gen_server:call(SmPid, {start_session, ClientId, ClientPid}). %%------------------------------------------------------------------------------ %% @doc Destroy a session @@ -91,29 +111,27 @@ start_session(ClientId, ClientPid) -> %%------------------------------------------------------------------------------ -spec destroy_session(binary()) -> ok. destroy_session(ClientId) -> - gen_server:call(?SERVER, {destroy_session, ClientId}). + SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId), + gen_server:call(SmPid, {destroy_session, ClientId}). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([]) -> - process_flag(trap_exit, true), - TabId = ets:new(?SESSION_TAB, [set, protected, named_table]), - StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), - {ok, #state{tabid = TabId, statsfun = StatsFun}}. +init([Id, StatsFun]) -> + gproc_pool:connect_worker(?SM_POOL, {?MODULE, Id}), + {ok, #state{id = Id, statsfun = StatsFun}}. -handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tabid = Tab}) -> +handle_call({start_session, ClientId, ClientPid}, _From, State) -> Reply = - case ets:lookup(Tab, ClientId) of + case ets:lookup(?SESSION_TAB, ClientId) of [{_, SessPid, _MRef}] -> emqttd_session:resume(SessPid, ClientId, ClientPid), {ok, SessPid}; [] -> case emqttd_session_sup:start_session(ClientId, ClientPid) of {ok, SessPid} -> - ets:insert(Tab, {ClientId, SessPid, - erlang:monitor(process, SessPid)}), + ets:insert(?SESSION_TAB, {ClientId, SessPid, erlang:monitor(process, SessPid)}), {ok, SessPid}; {error, Error} -> {error, Error} @@ -121,12 +139,12 @@ handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tabid = end, {reply, Reply, setstats(State)}; -handle_call({destroy_session, ClientId}, _From, State = #state{tabid = Tab}) -> - case ets:lookup(Tab, ClientId) of +handle_call({destroy_session, ClientId}, _From, State) -> + case ets:lookup(?SESSION_TAB, ClientId) of [{_, SessPid, MRef}] -> emqttd_session:destroy(SessPid, ClientId), erlang:demonitor(MRef, [flush]), - ets:delete(Tab, ClientId); + ets:delete(?SESSION_TAB, ClientId); [] -> ignore end, @@ -145,8 +163,8 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tabid = Ta handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> - ok. +terminate(_Reason, #state{id = Id}) -> + gproc_pool:disconnect_worker(?SM_POOL, {?MODULE, Id}), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/apps/emqttd/src/emqttd_sm_sup.erl b/apps/emqttd/src/emqttd_sm_sup.erl new file mode 100644 index 000000000..ece44dd38 --- /dev/null +++ b/apps/emqttd/src/emqttd_sm_sup.erl @@ -0,0 +1,59 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd client manager supervisor. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_sm_sup). + +-author("Feng Lee "). + +-include("emqttd.hrl"). + +%% API +-export([start_link/0]). + +-behaviour(supervisor). + +%% Supervisor callbacks +-export([init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + ets:new(emqttd_sm:table(), [set, named_table, public, + {write_concurrency, true}]), + Schedulers = erlang:system_info(schedulers), + gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]), + StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), + Children = lists:map( + fun(I) -> + Name = {emqttd_sm, I}, + gproc_pool:add_worker(emqttd_sm:pool(), Name, I), + {Name, {emqttd_sm, start_link, [I, StatsFun]}, + permanent, 10000, worker, [emqttd_sm]} + end, lists:seq(1, Schedulers)), + {ok, {{one_for_all, 10, 100}, Children}}. + + diff --git a/apps/emqttd/src/emqttd_sup.erl b/apps/emqttd/src/emqttd_sup.erl index bb460d144..6e57818b7 100644 --- a/apps/emqttd/src/emqttd_sup.erl +++ b/apps/emqttd/src/emqttd_sup.erl @@ -39,7 +39,7 @@ -export([init/1]). %% Helper macro for declaring children of supervisor --define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). +-define(CHILD(Mod, Type), {Mod, {Mod, start_link, []}, permanent, 5000, Type, [Mod]}). %%%============================================================================= %%% API @@ -63,5 +63,5 @@ start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) -> %%%============================================================================= init([]) -> - {ok, {{one_for_all, 10, 100}, []}}. + {ok, {{one_for_all, 10, 3600}, []}}. diff --git a/apps/emqttd/test/emqttd_access_control_tests.erl b/apps/emqttd/test/emqttd_access_control_tests.erl new file mode 100644 index 000000000..5c906804e --- /dev/null +++ b/apps/emqttd/test/emqttd_access_control_tests.erl @@ -0,0 +1,107 @@ +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd_access_control tests. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_access_control_tests). + +-include("emqttd.hrl"). + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +reload_acl_test() -> + with_acl( + fun() -> + ?assertEqual([ok], emqttd_access_control:reload_acl()) + end). + +register_mod_test() -> + with_acl( + fun() -> + emqttd_access_control:register_mod(acl, emqttd_acl_test_mod, []), + ?assertMatch([{emqttd_acl_test_mod, _}, {emqttd_acl_internal, _}], + emqttd_access_control:lookup_mods(acl)), + emqttd_access_control:register_mod(auth, emqttd_auth_anonymous_test_mod,[]), + ?assertMatch([{emqttd_auth_anonymous_test_mod, _}, {emqttd_auth_anonymous, _}], + emqttd_access_control:lookup_mods(auth)) + end). + +unregister_mod_test() -> + with_acl( + fun() -> + emqttd_access_control:register_mod(acl,emqttd_acl_test_mod, []), + ?assertMatch([{emqttd_acl_test_mod, _}, {emqttd_acl_internal, _}], + emqttd_access_control:lookup_mods(acl)), + emqttd_access_control:unregister_mod(acl, emqttd_acl_test_mod), + timer:sleep(5), + ?assertMatch([{emqttd_acl_internal, _}], emqttd_access_control:lookup_mods(acl)), + + emqttd_access_control:register_mod(auth, emqttd_auth_anonymous_test_mod,[]), + ?assertMatch([{emqttd_auth_anonymous_test_mod, _}, {emqttd_auth_anonymous, _}], + emqttd_access_control:lookup_mods(auth)), + + emqttd_access_control:unregister_mod(auth, emqttd_auth_anonymous_test_mod), + timer:sleep(5), + ?assertMatch([{emqttd_auth_anonymous, _}], emqttd_access_control:lookup_mods(auth)) + end). + +check_acl_test() -> + with_acl( + fun() -> + User1 = #mqtt_client{clientid = <<"client1">>, username = <<"testuser">>}, + User2 = #mqtt_client{clientid = <<"client2">>, username = <<"xyz">>}, + ?assertEqual(allow, emqttd_access_control:check_acl(User1, subscribe, <<"users/testuser/1">>)), + ?assertEqual(allow, emqttd_access_control:check_acl(User1, subscribe, <<"clients/client1">>)), + ?assertEqual(deny, emqttd_access_control:check_acl(User1, subscribe, <<"clients/client1/x/y">>)), + ?assertEqual(allow, emqttd_access_control:check_acl(User1, publish, <<"users/testuser/1">>)), + ?assertEqual(allow, emqttd_access_control:check_acl(User1, subscribe, <<"a/b/c">>)), + ?assertEqual(deny, emqttd_access_control:check_acl(User2, subscribe, <<"a/b/c">>)) + end). + +with_acl(Fun) -> + process_flag(trap_exit, true), + AclOpts = [ + {auth, [ + %% Authentication with username, password + %{username, []}, + %% Authentication with clientid + %{clientid, [{password, no}, {file, "etc/clients.config"}]}, + %% Allow all + {anonymous, []} + ]}, + %% ACL config + {acl, [ + %% Internal ACL module + {internal, [{file, "../test/test_acl.config"}, {nomatch, allow}]} + ]} + ], + application:set_env(emqttd, access, AclOpts), + emqttd_access_control:start_link(), + Fun(), + emqttd_access_control:stop(). + +-endif. + diff --git a/apps/emqttd/test/emqttd_access_rule_tests.erl b/apps/emqttd/test/emqttd_access_rule_tests.erl index 66632c663..e0a5427df 100644 --- a/apps/emqttd/test/emqttd_access_rule_tests.erl +++ b/apps/emqttd/test/emqttd_access_rule_tests.erl @@ -53,8 +53,8 @@ compile_test() -> ?assertEqual({deny, all}, compile({deny, all})). match_test() -> - User = #mqtt_user{ipaddr = {127,0,0,1}, clientid = <<"testClient">>, username = <<"TestUser">>}, - User2 = #mqtt_user{ipaddr = {192,168,0,10}, clientid = <<"testClient">>, username = <<"TestUser">>}, + User = #mqtt_client{ipaddr = {127,0,0,1}, clientid = <<"testClient">>, username = <<"TestUser">>}, + User2 = #mqtt_client{ipaddr = {192,168,0,10}, clientid = <<"testClient">>, username = <<"TestUser">>}, ?assertEqual({matched, allow}, match(User, <<"Test/Topic">>, {allow, all})), ?assertEqual({matched, deny}, match(User, <<"Test/Topic">>, {deny, all})), @@ -68,7 +68,7 @@ match_test() -> compile({allow, {client, "testClient"}, publish, ["testTopics/testClient"]}))), ?assertMatch({matched, allow}, match(User, <<"clients/testClient">>, compile({allow, all, pubsub, ["clients/$c"]}))), - ?assertMatch({matched, allow}, match(#mqtt_user{username = <<"user2">>}, <<"users/user2/abc/def">>, + ?assertMatch({matched, allow}, match(#mqtt_client{username = <<"user2">>}, <<"users/user2/abc/def">>, compile({allow, all, subscribe, ["users/$u/#"]}))), ?assertMatch({matched, deny}, match(User, <<"d/e/f">>, diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_auth.erl b/apps/emqttd/test/emqttd_auth_anonymous_test_mod.erl similarity index 83% rename from plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_auth.erl rename to apps/emqttd/test/emqttd_auth_anonymous_test_mod.erl index 3e693335c..a69d24630 100644 --- a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_auth.erl +++ b/apps/emqttd/test/emqttd_auth_anonymous_test_mod.erl @@ -20,23 +20,20 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd demo auth module. +%%% Test ACL Module. %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqttd_plugin_demo_auth). - --author("Feng Lee "). - --include_lib("emqttd/include/emqttd.hrl"). - --behaviour(emqttd_auth_mod). +-module(emqttd_auth_anonymous_test_mod). +%% ACL callbacks -export([init/1, check/3, description/0]). -init(Opts) -> {ok, Opts}. +init(AclOpts) -> + {ok, AclOpts}. -check(_Client, _Password, _Opts) -> ignore. - -description() -> "Demo authentication module". +check(_Client, _Password, _Opts) -> + allow. +description() -> + "Test emqttd_auth_anonymous Mod". diff --git a/plugins/emqttd_auth_ldap/README.md b/plugins/emqttd_auth_ldap/README.md new file mode 100644 index 000000000..083b318ac --- /dev/null +++ b/plugins/emqttd_auth_ldap/README.md @@ -0,0 +1,23 @@ +## Overview + +Authentication with LDAP. + +## Plugin Config + +``` + {emqttd_auth_ldap, [ + {servers, ["localhost"]}, + {port, 389}, + {timeout, 30}, + {user_dn, "uid=$u,ou=People,dc=example,dc=com"}, + {ssl, fasle}, + {sslopts, [ + {"certfile", "ssl.crt"}, + {"keyfile", "ssl.key"}]} + ]} +``` + +## Load Plugin + +Merge the'etc/plugin.config' to emqttd/etc/plugins.config, and the plugin will be loaded automatically. + diff --git a/plugins/emqttd_auth_ldap/etc/plugin.config b/plugins/emqttd_auth_ldap/etc/plugin.config new file mode 100644 index 000000000..ac582d7d4 --- /dev/null +++ b/plugins/emqttd_auth_ldap/etc/plugin.config @@ -0,0 +1,12 @@ +[ + {emqttd_auth_ldap, [ + {servers, ["localhost"]}, + {port, 389}, + {timeout, 30}, + {user_dn, "uid=$u,ou=People,dc=example,dc=com"}, + {ssl, fasle}, + {sslopts, [ + {"certfile", "ssl.crt"}, + {"keyfile", "ssl.key"}]} + ]} +]. diff --git a/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.app.src b/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.app.src new file mode 100644 index 000000000..e699fdba3 --- /dev/null +++ b/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.app.src @@ -0,0 +1,12 @@ +{application, emqttd_auth_ldap, + [ + {description, "emqttd LDAP Authentication Plugin"}, + {vsn, "1.0"}, + {registered, []}, + {applications, [ + kernel, + stdlib + ]}, + {mod, { emqttd_auth_ldap_app, []}}, + {env, []} + ]}. diff --git a/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.erl b/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.erl new file mode 100644 index 000000000..ba1bdca4d --- /dev/null +++ b/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.erl @@ -0,0 +1,92 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% LDAP Authentication Module. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_auth_ldap). + +-author("Feng Lee "). + +-include_lib("emqttd/include/emqttd.hrl"). + +-import(proplists, [get_value/2, get_value/3]). + +-behaviour(emqttd_auth_mod). + +-export([init/1, check/3, description/0]). + +-record(state, {servers, user_dn, options}). + +init(Opts) -> + Servers = get_value(servers, Opts, ["localhost"]), + Port = get_value(port, Opts, 389), + Timeout = get_value(timeout, Opts, 30), + UserDn = get_value(user_dn, Opts), + LdapOpts = + case get_value(ssl, Opts, false) of + true -> + SslOpts = get_value(sslopts, Opts), + [{port, Port}, {timeout, Timeout}, {sslopts, SslOpts}]; + false -> + [{port, Port}, {timeout, Timeout}] + end, + {ok, #state{servers = Servers, user_dn = UserDn, options = LdapOpts}}. + +check(#mqtt_client{username = undefined}, _Password, _State) -> + {error, "Username undefined"}; +check(_Client, undefined, _State) -> + {error, "Password undefined"}; +check(_Client, <<>>, _State) -> + {error, "Password undefined"}; +check(#mqtt_client{username = Username}, Password, + #state{servers = Servers, user_dn = UserDn, options = Options}) -> + case eldap:open(Servers, Options) of + {ok, LDAP} -> + UserDn1 = fill(binary_to_list(Username), UserDn), + ldap_bind(LDAP, UserDn1, binary_to_list(Password)); + {error, Reason} -> + {error, Reason} + end. + +ldap_bind(LDAP, UserDn, Password) -> + case catch eldap:simple_bind(LDAP, UserDn, Password) of + ok -> + ok; + {error, invalidCredentials} -> + {error, "LDAP Invalid Credentials"}; + {error, Error} -> + {error, Error}; + {'EXIT', Reason} -> + {error, Reason} + end. + +fill(Username, UserDn) -> + lists:append(lists:map( + fun("$u") -> Username; + (S) -> S + end, string:tokens(UserDn, ",="))). + +description() -> + "LDAP Authentication Module". + diff --git a/plugins/emqttd_presence/src/emqttd_presence_sup.erl b/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap_app.erl similarity index 64% rename from plugins/emqttd_presence/src/emqttd_presence_sup.erl rename to plugins/emqttd_auth_ldap/src/emqttd_auth_ldap_app.erl index e7dbc7e6d..1cea23075 100644 --- a/plugins/emqttd_presence/src/emqttd_presence_sup.erl +++ b/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap_app.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -20,35 +20,39 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% presence manager supervisor. +%%% LDAP Authentication Plugin. %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqttd_presence_sup). +-module(emqttd_auth_ldap_app). + +-behaviour(application). +%% Application callbacks +-export([start/2, prep_stop/1, stop/1]). -behaviour(supervisor). - -%% API --export([start_link/0]). - %% Supervisor callbacks -export([init/1]). -%% Helper macro for declaring children of supervisor --define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). +%%%============================================================================= +%%% Application callbacks +%%%============================================================================= -%% =================================================================== -%% API functions -%% =================================================================== - -start_link() -> +start(_StartType, _StartArgs) -> + Env = application:get_all_env(emqttd_auth_ldap), + emqttd_access_control:register_mod(auth, emqttd_auth_ldap, Env), supervisor:start_link({local, ?MODULE}, ?MODULE, []). -%% =================================================================== -%% Supervisor callbacks -%% =================================================================== +prep_stop(State) -> + emqttd_access_control:unregister_mod(auth, emqttd_auth_ldap), State. + +stop(_State) -> + ok. + +%%%============================================================================= +%%% Supervisor callbacks(Dummy) +%%%============================================================================= init([]) -> {ok, { {one_for_one, 5, 10}, []} }. - diff --git a/plugins/emqttd_auth_mysql/README.md b/plugins/emqttd_auth_mysql/README.md index 736f0fae2..02db17e10 100644 --- a/plugins/emqttd_auth_mysql/README.md +++ b/plugins/emqttd_auth_mysql/README.md @@ -1,8 +1,32 @@ -## Overview + +## Overview Authentication with user table of MySQL database. -## User Table +## etc/plugin.config + +``` + {emysql, [ + {pool, 4}, + {host, "localhost"}, + {port, 3306}, + {username, ""}, + {password, ""}, + {database, "mqtt"}, + {encoding, utf8} + ]}, + {emqttd_auth_mysql, [ + {user_table, mqtt_users}, + %% plain password only + {password_hash, plain}, + {field_mapper, [ + {username, username}, + {password, password} + ]} + ]} +``` + +## Users Table(Demo) Notice: This is a demo table. You could authenticate with any user tables. @@ -18,8 +42,7 @@ CREATE TABLE `mqtt_users` ( ) ENGINE=MyISAM AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; ``` -## Plugins config - -Please configure 'etc/plugins.config' to loade emysql and emqttd_auth_mysql plugins. +## Load Plugin +Merge the'etc/plugin.config' to emqttd/etc/plugins.config, and the plugin will be loaded by the broker. diff --git a/plugins/emqttd_auth_mysql/etc/plugin.config b/plugins/emqttd_auth_mysql/etc/plugin.config new file mode 100644 index 000000000..a5ef4bc41 --- /dev/null +++ b/plugins/emqttd_auth_mysql/etc/plugin.config @@ -0,0 +1,16 @@ +{emysql, [ + {pool, 4}, + {host, "localhost"}, + {port, 3306}, + {username, "root"}, + {password, "public"}, + {database, "mqtt"}, + {encoding, utf8} +]}, +{emqttd_auth_mysql, [ + {users_table, mqtt_users}, + {field_mapper, [ + {username, username}, + {password, password, plain} + ]} +]} diff --git a/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql.app.src b/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql.app.src index 9094f9e77..965e1825e 100644 --- a/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql.app.src +++ b/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql.app.src @@ -1,7 +1,7 @@ {application, emqttd_auth_mysql, [ - {description, ""}, - {vsn, "0.1"}, + {description, "emqttd MySQL Authentication Plugin"}, + {vsn, "1.0"}, {registered, []}, {applications, [ kernel, diff --git a/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql.erl b/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql.erl index 0ba398249..cf2d32cbc 100644 --- a/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql.erl +++ b/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd authentication by mysql user table. +%%% emqttd authentication by mysql 'user' table. %%% %%% @end %%%----------------------------------------------------------------------------- @@ -34,22 +34,42 @@ -export([init/1, check/3, description/0]). --record(state, {user_tab}). +-record(state, {user_table, name_field, pass_field, pass_hash}). init(Opts) -> - UserTab = proplists:get_value(user_table, Opts, mqtt_users), - {ok, #state{user_tab = UserTab}}. + Mapper = proplists:get_value(field_mapper, Opts), + {ok, #state{user_table = proplists:get_value(user_table, Opts, mqtt_users), + name_field = proplists:get_value(username, Mapper), + pass_field = proplists:get_value(password, Mapper), + pass_hash = proplists:get_value(Opts, password_hash)}}. check(#mqtt_client{username = undefined}, _Password, _State) -> {error, "Username undefined"}; check(_Client, undefined, _State) -> {error, "Password undefined"}; -check(#mqtt_client{username = Username}, Password, #state{user_tab = UserTab}) -> - %%TODO: hash password... - case emysql:select(UserTab, {'and', {username, Username}, {password, Password}}) of - {ok, []} -> {error, "Username or Password not match"}; +check(#mqtt_client{username = Username}, Password, + #state{user_table = UserTab, pass_hash = Type, + name_field = NameField, pass_field = PassField}) -> + Where = {'and', {NameField, Username}, {PassField, hash(Type, Password)}}, + case emysql:select(UserTab, Where) of + {ok, []} -> {error, "Username or Password "}; {ok, _Record} -> ok end. description() -> "Authentication by MySQL". +hash(plain, Password) -> + Password; + +hash(md5, Password) -> + hexstring(crypto:hash(md5, Password)); + +hash(sha, Password) -> + hexstring(crypto:hash(sha, Password)). + +hexstring(<>) -> + lists:flatten(io_lib:format("~32.16.0b", [X])); + +hexstring(<>) -> + lists:flatten(io_lib:format("~40.16.0b", [X])). + diff --git a/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql_app.erl b/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql_app.erl index 80b3f37f2..86881a4bb 100644 --- a/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql_app.erl +++ b/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql_app.erl @@ -20,27 +20,40 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% mysql authentication app. +%%% emqttd mysql authentication app. %%% %%% @end %%%----------------------------------------------------------------------------- -module(emqttd_auth_mysql_app). -behaviour(application). - %% Application callbacks --export([start/2, stop/1]). +-export([start/2, prep_stop/1, stop/1]). -%% =================================================================== -%% Application callbacks -%% =================================================================== +-behaviour(supervisor). +%% Supervisor callbacks +-export([init/1]). + +%%%============================================================================= +%%% Application callbacks +%%%============================================================================= start(_StartType, _StartArgs) -> - {ok, Sup} = emqttd_auth_mysql_sup:start_link(), Env = application:get_all_env(), emqttd_access_control:register_mod(auth, emqttd_auth_mysql, Env), - {ok, Sup}. + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +prep_stop(State) -> + emqttd_access_control:unregister_mod(auth, emqttd_auth_mysql), State. stop(_State) -> - emqttd_access_control:unregister_mod(auth, emqttd_auth_mysql), ok. + +%%%============================================================================= +%%% Supervisor callbacks(Dummy) +%%%============================================================================= + +init([]) -> + {ok, { {one_for_one, 5, 10}, []} }. + + diff --git a/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql_sup.erl b/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql_sup.erl deleted file mode 100644 index f2fc9e0b1..000000000 --- a/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql_sup.erl +++ /dev/null @@ -1,27 +0,0 @@ --module(emqttd_auth_mysql_sup). - --behaviour(supervisor). - -%% API --export([start_link/0]). - -%% Supervisor callbacks --export([init/1]). - -%% Helper macro for declaring children of supervisor --define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). - -%% =================================================================== -%% API functions -%% =================================================================== - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -%% =================================================================== -%% Supervisor callbacks -%% =================================================================== - -init([]) -> - {ok, { {one_for_one, 5, 10}, []} }. - diff --git a/plugins/emqttd_dashboard/src/emqttd_dashboard.erl b/plugins/emqttd_dashboard/src/emqttd_dashboard.erl index 733d4d683..fae297b16 100644 --- a/plugins/emqttd_dashboard/src/emqttd_dashboard.erl +++ b/plugins/emqttd_dashboard/src/emqttd_dashboard.erl @@ -28,6 +28,11 @@ -author("Feng Lee "). +-export([handle_request/1]). + %%TODO... +handle_request(Req) -> + Req:ok("hello!"). + diff --git a/plugins/emqttd_dashboard/src/emqttd_dashboard_app.erl b/plugins/emqttd_dashboard/src/emqttd_dashboard_app.erl index e1f6c9550..8e0898679 100644 --- a/plugins/emqttd_dashboard/src/emqttd_dashboard_app.erl +++ b/plugins/emqttd_dashboard/src/emqttd_dashboard_app.erl @@ -10,7 +10,18 @@ %% =================================================================== start(_StartType, _StartArgs) -> - emqttd_dashboard_sup:start_link(). + {ok, Sup} = emqttd_dashboard_sup:start_link(), + open_listener(application:get_env(listener)), + {ok, Sup}. stop(_State) -> ok. + +%% open http port +open_listener({_Http, Port, Options}) -> + MFArgs = {emqttd_dashboard, handle_request, []}, + mochiweb:start_http(Port, Options, MFArgs). + +close_listener(Port) -> + mochiweb:stop_http(Port). + diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src deleted file mode 100644 index ecc0b1114..000000000 --- a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src +++ /dev/null @@ -1,12 +0,0 @@ -{application, emqttd_plugin_demo, - [ - {description, "emqttd demo plugin"}, - {vsn, "0.1"}, - {registered, []}, - {applications, [ - kernel, - stdlib - ]}, - {mod, { emqttd_plugin_demo_app, []}}, - {env, []} - ]}. diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl deleted file mode 100644 index b3898f7f6..000000000 --- a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl +++ /dev/null @@ -1,21 +0,0 @@ --module(emqttd_plugin_demo_app). - --behaviour(application). - -%% Application callbacks --export([start/2, stop/1]). - -%% =================================================================== -%% Application callbacks -%% =================================================================== - -start(_StartType, _StartArgs) -> - {ok, Sup} = emqttd_plugin_demo_sup:start_link(), - emqttd_access_control:register_mod(auth, emqttd_plugin_demo_auth, []), - emqttd_access_control:register_mod(acl, emqttd_plugin_demo_acl, []), - {ok, Sup}. - -stop(_State) -> - emqttd_access_control:unregister_mod(auth, emqttd_plugin_demo_auth), - emqttd_access_control:unregister_mod(acl, emqttd_plugin_demo_acl), - ok. diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl deleted file mode 100644 index 65dad7a60..000000000 --- a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl +++ /dev/null @@ -1,27 +0,0 @@ --module(emqttd_plugin_demo_sup). - --behaviour(supervisor). - -%% API --export([start_link/0]). - -%% Supervisor callbacks --export([init/1]). - -%% Helper macro for declaring children of supervisor --define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). - -%% =================================================================== -%% API functions -%% =================================================================== - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -%% =================================================================== -%% Supervisor callbacks -%% =================================================================== - -init([]) -> - {ok, { {one_for_one, 5, 10}, []} }. - diff --git a/plugins/emqttd_plugin_mysql/.placehodler b/plugins/emqttd_plugin_mysql/.placehodler new file mode 100644 index 000000000..e69de29bb diff --git a/plugins/emqttd_plugin_mysql/Makefile b/plugins/emqttd_plugin_mysql/Makefile new file mode 100755 index 000000000..086a1fa40 --- /dev/null +++ b/plugins/emqttd_plugin_mysql/Makefile @@ -0,0 +1,32 @@ +REBAR?=./rebar + + +all: build + + +clean: + $(REBAR) clean + rm -rf logs + rm -rf .eunit + rm -f test/*.beam + + +distclean: clean + git clean -fxd + +build: depends + $(REBAR) compile + + +eunit: + $(REBAR) eunit skip_deps=true + + +check: build eunit + + +%.beam: %.erl + erlc -o test/ $< + + +.PHONY: all clean distclean depends build eunit check diff --git a/plugins/emqttd_plugin_mysql/README.md b/plugins/emqttd_plugin_mysql/README.md new file mode 100644 index 000000000..481a8f3e7 --- /dev/null +++ b/plugins/emqttd_plugin_mysql/README.md @@ -0,0 +1,49 @@ +## Overview + +Authentication with user table of MySQL database. + +## etc/plugin.config + +```erlang +[ + {emysql, [ + {pool, 4}, + {host, "localhost"}, + {port, 3306}, + {username, ""}, + {password, ""}, + {database, "mqtt"}, + {encoding, utf8} + ]}, + {emqttd_auth_mysql, [ + {user_table, mqtt_users}, + %% plain password only + {password_hash, plain}, + {field_mapper, [ + {username, username}, + {password, password} + ]} + ]} +]. +``` + +## Users Table(Demo) + +Notice: This is a demo table. You could authenticate with any user tables. + +``` +CREATE TABLE `mqtt_users` ( + `id` int(11) unsigned NOT NULL AUTO_INCREMENT, + `username` varchar(60) DEFAULT NULL, + `password` varchar(60) DEFAULT NULL, + `salt` varchar(20) DEFAULT NULL, + `created` datetime DEFAULT NULL, + PRIMARY KEY (`id`), + UNIQUE KEY `mqtt_users_username` (`username`) +) ENGINE=MyISAM AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; +``` + +## Load Plugin + +Merge the'etc/plugin.config' to emqttd/etc/plugins.config, and the plugin will be loaded by the broker. + diff --git a/plugins/emqttd_plugin_mysql/c_src/base64.c b/plugins/emqttd_plugin_mysql/c_src/base64.c new file mode 100755 index 000000000..457c1a138 --- /dev/null +++ b/plugins/emqttd_plugin_mysql/c_src/base64.c @@ -0,0 +1,151 @@ +/* + * Copyright (c) 1995, 1996, 1997 Kungliga Tekniska Hgskolan + * (Royal Institute of Technology, Stockholm, Sweden). + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. All advertising materials mentioning features or use of this software + * must display the following acknowledgement: + * This product includes software developed by the Kungliga Tekniska + * Hgskolan and its contributors. + * + * 4. Neither the name of the Institute nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE INSTITUTE AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE INSTITUTE OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#ifdef HAVE_CONFIG_H +#include +/*RCSID("$Id: base64.c,v 1.1 2005/02/11 07:34:35 jpm Exp jpm $");*/ +#endif +#include +#include +#include "base64.h" + +static char base64[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + +static int pos(char c) +{ + char *p; + for(p = base64; *p; p++) + if(*p == c) + return p - base64; + return -1; +} + +int base64_encode(const void *data, int size, char **str) +{ + char *s, *p; + int i; + int c; + unsigned char *q; + + p = s = (char*)malloc(size*4/3+4); + if (p == NULL) + return -1; + q = (unsigned char*)data; + i=0; + for(i = 0; i < size;){ + c=q[i++]; + c*=256; + if(i < size) + c+=q[i]; + i++; + c*=256; + if(i < size) + c+=q[i]; + i++; + p[0]=base64[(c&0x00fc0000) >> 18]; + p[1]=base64[(c&0x0003f000) >> 12]; + p[2]=base64[(c&0x00000fc0) >> 6]; + p[3]=base64[(c&0x0000003f) >> 0]; + if(i > size) + p[3]='='; + if(i > size+1) + p[2]='='; + p+=4; + } + *p=0; + *str = s; + return strlen(s); +} + +int base64_decode(const char *str, void *data) +{ + const char *p; + unsigned char *q; + int c; + int x; + int done = 0; + q=(unsigned char*)data; + for(p=str; *p && !done; p+=4){ + x = pos(p[0]); + if(x >= 0) + c = x; + else{ + done = 3; + break; + } + c*=64; + + x = pos(p[1]); + if(x >= 0) + c += x; + else + return -1; + c*=64; + + if(p[2] == '=') + done++; + else{ + x = pos(p[2]); + if(x >= 0) + c += x; + else + return -1; + } + c*=64; + + if(p[3] == '=') + done++; + else{ + if(done) + return -1; + x = pos(p[3]); + if(x >= 0) + c += x; + else + return -1; + } + if(done < 3) + *q++=(c&0x00ff0000)>>16; + + if(done < 2) + *q++=(c&0x0000ff00)>>8; + if(done < 1) + *q++=(c&0x000000ff)>>0; + } + return q - (unsigned char*)data; +} diff --git a/plugins/emqttd_plugin_mysql/c_src/base64.h b/plugins/emqttd_plugin_mysql/c_src/base64.h new file mode 100755 index 000000000..380a31d49 --- /dev/null +++ b/plugins/emqttd_plugin_mysql/c_src/base64.h @@ -0,0 +1,47 @@ +/* + * Copyright (c) 1995, 1996, 1997 Kungliga Tekniska Hgskolan + * (Royal Institute of Technology, Stockholm, Sweden). + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. All advertising materials mentioning features or use of this software + * must display the following acknowledgement: + * This product includes software developed by the Kungliga Tekniska + * Hgskolan and its contributors. + * + * 4. Neither the name of the Institute nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE INSTITUTE AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE INSTITUTE OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +/* $Id: base64.h,v 1.1 2005/02/11 07:34:35 jpm Exp jpm $ */ + +#ifndef _BASE64_H_ +#define _BASE64_H_ + +int base64_encode(const void *data, int size, char **str); +int base64_decode(const char *str, void *data); + +#endif diff --git a/plugins/emqttd_plugin_mysql/c_src/emqttd_plugin_mysql_app.c b/plugins/emqttd_plugin_mysql/c_src/emqttd_plugin_mysql_app.c new file mode 100755 index 000000000..374de99ac --- /dev/null +++ b/plugins/emqttd_plugin_mysql/c_src/emqttd_plugin_mysql_app.c @@ -0,0 +1,60 @@ +// This file is part of Jiffy released under the MIT license. +// See the LICENSE file for more information. + +#include "emqttd_plugin_mysql_app.h" + +static int +load(ErlNifEnv* env, void** priv, ERL_NIF_TERM info) +{ + emqttd_plugin_mysql_app_st* st = enif_alloc(sizeof(emqttd_plugin_mysql_app_st)); + if(st == NULL) { + return 1; + } + + st->atom_ok = make_atom(env, "ok"); + st->atom_error = make_atom(env, "error"); + st->atom_null = make_atom(env, "null"); + st->atom_true = make_atom(env, "true"); + st->atom_false = make_atom(env, "false"); + st->atom_bignum = make_atom(env, "bignum"); + st->atom_bignum_e = make_atom(env, "bignum_e"); + st->atom_bigdbl = make_atom(env, "bigdbl"); + st->atom_partial = make_atom(env, "partial"); + st->atom_uescape = make_atom(env, "uescape"); + st->atom_pretty = make_atom(env, "pretty"); + st->atom_force_utf8 = make_atom(env, "force_utf8"); + + // Markers used in encoding + st->ref_object = make_atom(env, "$object_ref$"); + st->ref_array = make_atom(env, "$array_ref$"); + + *priv = (void*) st; + + return 0; +} + +static int +reload(ErlNifEnv* env, void** priv, ERL_NIF_TERM info) +{ + return 0; +} + +static int +upgrade(ErlNifEnv* env, void** priv, void** old_priv, ERL_NIF_TERM info) +{ + return load(env, priv, info); +} + +static void +unload(ErlNifEnv* env, void* priv) +{ + enif_free(priv); + return; +} + +static ErlNifFunc funcs[] = +{ + {"nif_pbkdf2_check", 2, pbkdf2_check} +}; + +ERL_NIF_INIT(emqttd_plugin_mysql_app, funcs, &load, &reload, &upgrade, &unload); diff --git a/plugins/emqttd_plugin_mysql/c_src/emqttd_plugin_mysql_app.h b/plugins/emqttd_plugin_mysql/c_src/emqttd_plugin_mysql_app.h new file mode 100755 index 000000000..e77ae3b29 --- /dev/null +++ b/plugins/emqttd_plugin_mysql/c_src/emqttd_plugin_mysql_app.h @@ -0,0 +1,44 @@ +// This file is part of Jiffy released under the MIT license. +// See the LICENSE file for more information. + +#ifndef EMQTTD_PLUGIN_MYSQL_APP_H +#define EMQTTD_PLUGIN_MYSQL_APP_H + +#include "erl_nif.h" + +typedef struct { + ERL_NIF_TERM atom_ok; + ERL_NIF_TERM atom_error; + ERL_NIF_TERM atom_null; + ERL_NIF_TERM atom_true; + ERL_NIF_TERM atom_false; + ERL_NIF_TERM atom_bignum; + ERL_NIF_TERM atom_bignum_e; + ERL_NIF_TERM atom_bigdbl; + ERL_NIF_TERM atom_partial; + ERL_NIF_TERM atom_uescape; + ERL_NIF_TERM atom_pretty; + ERL_NIF_TERM atom_force_utf8; + + ERL_NIF_TERM ref_object; + ERL_NIF_TERM ref_array; +} emqttd_plugin_mysql_app_st; + +ERL_NIF_TERM make_atom(ErlNifEnv* env, const char* name); +ERL_NIF_TERM make_ok(emqttd_plugin_mysql_app_st* st, ErlNifEnv* env, ERL_NIF_TERM data); +ERL_NIF_TERM make_error(emqttd_plugin_mysql_app_st* st, ErlNifEnv* env, const char* error); + +ERL_NIF_TERM pbkdf2_check(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); + +int int_from_hex(const unsigned char* p); +int int_to_hex(int val, char* p); +int utf8_len(int c); +int utf8_esc_len(int c); +int utf8_validate(unsigned char* data, size_t size); +int utf8_to_unicode(unsigned char* buf, size_t size); +int unicode_to_utf8(int c, unsigned char* buf); +int unicode_from_pair(int hi, int lo); +int unicode_uescape(int c, char* buf); +int double_to_shortest(char *buf, size_t size, size_t* len, double val); + +#endif // Included EMQTTD_PLUGIN_MYSQL_APP_H diff --git a/plugins/emqttd_plugin_mysql/c_src/pbkdf2_check.c b/plugins/emqttd_plugin_mysql/c_src/pbkdf2_check.c new file mode 100644 index 000000000..0e40b933b --- /dev/null +++ b/plugins/emqttd_plugin_mysql/c_src/pbkdf2_check.c @@ -0,0 +1,278 @@ +/* + * Copyright (c) 2013 Jan-Piet Mens + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of mosquitto nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include +#include "base64.h" +#include "erl_nif.h" +#include "emqttd_plugin_mysql_app.h" + +#define KEY_LENGTH 24 +#define SEPARATOR "$" +#define SEPARATOR1 "_" +#define TRUE (1) +#define FALSE (0) + +/* + * Split PBKDF2$... string into their components. The caller must free() + * the strings. + */ + +static int detoken(char *pbkstr, char **sha, int *iter, char **salt, char **key) +{ + char *p, *s, *save; + int rc = 1; + + save = s = strdup(pbkstr); + + if ((p = strsep(&s, SEPARATOR1)) == NULL) + goto out; + if (strcmp(p, "pbkdf2") != 0) + goto out; + + if ((p = strsep(&s, SEPARATOR)) == NULL) + goto out; + *sha = strdup(p); + + if ((p = strsep(&s, SEPARATOR)) == NULL) + goto out; + *iter = atoi(p); + + if ((p = strsep(&s, SEPARATOR)) == NULL) + goto out; + *salt = strdup(p); + + if ((p = strsep(&s, SEPARATOR)) == NULL) + goto out; + *key = strdup(p); + + rc = 0; + +out: + free(save); + return rc; +} + + ERL_NIF_TERM +pbkdf2_check(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + ERL_NIF_TERM ret; + ErlNifBinary binps, binhash; + emqttd_plugin_mysql_app_st* st = enif_alloc(sizeof(emqttd_plugin_mysql_app_st)); + if(st == NULL) { + return make_atom(env, "alloc_error"); + } + + st->atom_ok = make_atom(env, "ok"); + st->atom_error = make_atom(env, "error"); + st->atom_null = make_atom(env, "null"); + st->atom_true = make_atom(env, "true"); + st->atom_false = make_atom(env, "false"); + st->atom_bignum = make_atom(env, "bignum"); + st->atom_bignum_e = make_atom(env, "bignum_e"); + st->atom_bigdbl = make_atom(env, "bigdbl"); + st->atom_partial = make_atom(env, "partial"); + st->atom_uescape = make_atom(env, "uescape"); + st->atom_pretty = make_atom(env, "pretty"); + st->atom_force_utf8 = make_atom(env, "force_utf8"); + + // Markers used in encoding + st->ref_object = make_atom(env, "$object_ref$"); + st->ref_array = make_atom(env, "$array_ref$"); + + if(argc != 2) { + return make_error(st, env, "Bad args"); + } else if(!enif_inspect_binary(env, argv[0], &binps)|!enif_inspect_binary(env, argv[1], &binhash)) { + return make_error(st, env, "Bad args password or username inspect error"); + } + + char* password = (char*)binps.data; + char* hash = (char*)binhash.data; + static char *sha, *salt, *h_pw; + int iterations, saltlen, blen; + char *b64, *keybuf; + unsigned char *out; + int match = FALSE; + const EVP_MD *evpmd; + int keylen, rc; + + if (detoken(hash, &sha, &iterations, &salt, &h_pw) != 0) + return match; + + /* Determine key length by decoding base64 */ + if ((keybuf = malloc(strlen(h_pw) + 1)) == NULL) { + return make_error(st, env, "internal_error: Out Of memory"); + } + keylen = base64_decode(h_pw, keybuf); + if (keylen < 1) { + free(keybuf); + return make_atom(env, "false"); + } + free(keybuf); + + if ((out = malloc(keylen)) == NULL) { + return make_error(st, env, "Cannot allocate out; out of memory\n"); + } + +#ifdef PWDEBUG + fprintf(stderr, "sha =[%s]\n", sha); + fprintf(stderr, "iterations =%d\n", iterations); + fprintf(stderr, "salt =[%s]\n", salt); + fprintf(stderr, "h_pw =[%s]\n", h_pw); + fprintf(stderr, "kenlen =[%d]\n", keylen); +#endif + + saltlen = strlen((char *)salt); + + evpmd = EVP_sha256(); + if (strcmp(sha, "sha1") == 0) { + evpmd = EVP_sha1(); + } else if (strcmp(sha, "sha512") == 0) { + evpmd = EVP_sha512(); + } + + rc = PKCS5_PBKDF2_HMAC(password, strlen(password), + (unsigned char *)salt, saltlen, + iterations, + evpmd, keylen, out); + if (rc != 1) { + goto out; + } + + blen = base64_encode(out, keylen, &b64); + if (blen > 0) { + int i, diff = 0, hlen = strlen(h_pw); +#ifdef PWDEBUG + fprintf(stderr, "HMAC b64 =[%s]\n", b64); +#endif + + /* "manual" strcmp() to ensure constant time */ + for (i = 0; (i < blen) && (i < hlen); i++) { + diff |= h_pw[i] ^ b64[i]; + } + + match = diff == 0; + if (hlen != blen) + match = 0; + + free(b64); + } + +out: + free(sha); + free(salt); + free(h_pw); + free(out); + + if(match == 0){ + ret = make_atom(env, "false"); + }else{ + ret = make_atom(env, "true"); + } + return ret; +} + +int pbkdf2_check_native(char *password, char *hash) +{ + static char *sha, *salt, *h_pw; + int iterations, saltlen, blen; + char *b64; + unsigned char key[128]; + int match = FALSE; + const EVP_MD *evpmd; + + if (detoken(hash, &sha, &iterations, &salt, &h_pw) != 0) + return match; + +#ifdef PWDEBUG + fprintf(stderr, "sha =[%s]\n", sha); + fprintf(stderr, "iterations =%d\n", iterations); + fprintf(stderr, "salt =[%s]\n", salt); + fprintf(stderr, "h_pw =[%s]\n", h_pw); +#endif + + saltlen = strlen((char *)salt); + + evpmd = EVP_sha256(); + if (strcmp(sha, "sha1") == 0) { + evpmd = EVP_sha1(); + } else if (strcmp(sha, "sha512") == 0) { + evpmd = EVP_sha512(); + } + + PKCS5_PBKDF2_HMAC(password, strlen(password), + (unsigned char *)salt, saltlen, + iterations, + evpmd, KEY_LENGTH, key); + + blen = base64_encode(key, KEY_LENGTH, &b64); + if (blen > 0) { + int i, diff = 0, hlen = strlen(h_pw); +#ifdef PWDEBUG + fprintf(stderr, "HMAC b64 =[%s]\n", b64); +#endif + + /* "manual" strcmp() to ensure constant time */ + for (i = 0; (i < blen) && (i < hlen); i++) { + diff |= h_pw[i] ^ b64[i]; + } + + match = diff == 0; + if (hlen != blen) + match = 0; + + free(b64); + } + + free(sha); + free(salt); + free(h_pw); + + return match; +} +int main() +{ + // char password[] = "hello"; + // char PB1[] = "PBKDF2$sha256$10000$eytf9sEo8EprP9P3$2eO6tROHiqI3bm+gg+vpmWooWMpz1zji"; + char password[] = "supersecret"; + //char PB1[] = "PBKDF2$sha256$10000$YEbSTt8FaMRDq/ib$Kt97+sMCYg00mqMOBAYinqZlnxX8HqHk"; + char PB1[] = "pbkdf2_sha256$10000$YEbSTt8FaMRDq/ib$Kt97+sMCYg00mqMOBAYinqZlnxX8HqHk"; + // char PB1[] = "PBKDF2$sha1$10000$XWfyPLeC9gsD6SbI$HOnjU4Ux7RpeBHdqYxpIGH1R5qCCtNA1"; + // char PB1[] = "PBKDF2$sha512$10000$v/aaCgBZ+VZN5L8n$BpgjSTyb4weVxr9cA2mvQ+jaCyaAPeYe"; + int match; + + printf("Checking password [%s] for %s\n", password, PB1); + + match = pbkdf2_check_native(password, PB1); + printf("match == %d\n", match); + return match; +} diff --git a/plugins/emqttd_plugin_mysql/c_src/util.c b/plugins/emqttd_plugin_mysql/c_src/util.c new file mode 100755 index 000000000..8c0291b22 --- /dev/null +++ b/plugins/emqttd_plugin_mysql/c_src/util.c @@ -0,0 +1,26 @@ +// This file is part of Jiffy released under the MIT license. +// See the LICENSE file for more information. + +#include "emqttd_plugin_mysql_app.h" + +ERL_NIF_TERM +make_atom(ErlNifEnv* env, const char* name) +{ + ERL_NIF_TERM ret; + if(enif_make_existing_atom(env, name, &ret, ERL_NIF_LATIN1)) { + return ret; + } + return enif_make_atom(env, name); +} + +ERL_NIF_TERM +make_ok(emqttd_plugin_mysql_app_st* st, ErlNifEnv* env, ERL_NIF_TERM value) +{ + return enif_make_tuple2(env, st->atom_ok, value); +} + +ERL_NIF_TERM +make_error(emqttd_plugin_mysql_app_st* st, ErlNifEnv* env, const char* error) +{ + return enif_make_tuple2(env, st->atom_error, make_atom(env, error)); +} diff --git a/plugins/emqttd_plugin_mysql/etc/plugin.config b/plugins/emqttd_plugin_mysql/etc/plugin.config new file mode 100644 index 000000000..4b5044bde --- /dev/null +++ b/plugins/emqttd_plugin_mysql/etc/plugin.config @@ -0,0 +1,23 @@ +[ + {emysql, [ + {pool, 4}, + {host, "localhost"}, + {port, 3306}, + {username, "root"}, + {password, "root"}, + {database, "emqtt"}, + {encoding, utf8} + ]}, + {emqttd_plugin_mysql, [ + {users_table, auth_user}, + {acls_table, auth_acl}, + {field_mapper, [ + {username, username}, + {password, password, pbkdf2}, + {user_super, is_super_user}, + {acl_username, username}, + {acl_rw, rw}, + {acl_topic, topic} + ]} + ]} +]. diff --git a/plugins/emqttd_plugin_mysql/etc/plugins.config b/plugins/emqttd_plugin_mysql/etc/plugins.config new file mode 100644 index 000000000..7b132dd3f --- /dev/null +++ b/plugins/emqttd_plugin_mysql/etc/plugins.config @@ -0,0 +1,23 @@ +[ + {emysql, [ + {pool, 4}, + {host, "59.188.253.198"}, + {port, 3306}, + {username, "root"}, + {password, "lhroot."}, + {database, "musicfield"}, + {encoding, utf8} + ]}, + {emqttd_plugin_mysql, [ + {users_table, auth_user}, + {acls_table, auth_acl}, + {field_mapper, [ + {username, username}, + {password, password, pbkdf2}, + {user_super, is_super_user}, + {acl_username, username}, + {acl_rw, rw}, + {acl_topic, topic} + ]} + ]} +]. diff --git a/plugins/emqttd_plugin_mysql/include/emqttd.hrl b/plugins/emqttd_plugin_mysql/include/emqttd.hrl new file mode 100644 index 000000000..9c2ab934a --- /dev/null +++ b/plugins/emqttd_plugin_mysql/include/emqttd.hrl @@ -0,0 +1,112 @@ +%%------------------------------------------------------------------------------ +%% Copyright (c) 2012-2015, Feng Lee +%% +%% Permission is hereby granted, free of charge, to any person obtaining a copy +%% of this software and associated documentation files (the "Software"), to deal +%% in the Software without restriction, including without limitation the rights +%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%% copies of the Software, and to permit persons to whom the Software is +%% furnished to do so, subject to the following conditions: +%% +%% The above copyright notice and this permission notice shall be included in all +%% copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%% SOFTWARE. +%%------------------------------------------------------------------------------ +%%% @doc +%%% MQTT Broker Header. +%%% +%%% @end +%%%----------------------------------------------------------------------------- + +%%------------------------------------------------------------------------------ +%% Banner +%%------------------------------------------------------------------------------ +-define(COPYRIGHT, "Copyright (C) 2012-2015, Feng Lee "). + +-define(LICENSE_MESSAGE, "Licensed under MIT"). + +-define(PROTOCOL_VERSION, "MQTT/3.1.1"). + +-define(ERTS_MINIMUM, "6.0"). + +%%------------------------------------------------------------------------------ +%% PubSub +%%------------------------------------------------------------------------------ +-type pubsub() :: publish | subscribe. + +%%------------------------------------------------------------------------------ +%% MQTT Topic +%%------------------------------------------------------------------------------ +-record(mqtt_topic, { + topic :: binary(), + node :: node() +}). + +-type mqtt_topic() :: #mqtt_topic{}. + +%%------------------------------------------------------------------------------ +%% MQTT Subscriber +%%------------------------------------------------------------------------------ +-record(mqtt_subscriber, { + topic :: binary(), + qos = 0 :: 0 | 1 | 2, + pid :: pid() +}). + +-type mqtt_subscriber() :: #mqtt_subscriber{}. + +%%------------------------------------------------------------------------------ +%% P2P Queue Subscriber +%%------------------------------------------------------------------------------ +-record(mqtt_queue, { + name :: binary(), + subpid :: pid(), + qos = 0 :: 0 | 1 | 2 +}). + +-type mqtt_queue() :: #mqtt_queue{}. + +%%------------------------------------------------------------------------------ +%% MQTT Client +%%------------------------------------------------------------------------------ +-record(mqtt_client, { + clientid :: binary(), + username :: binary() | undefined, + ipaddr :: inet:ip_address() +}). + +-type mqtt_client() :: #mqtt_client{}. + +%%------------------------------------------------------------------------------ +%% MQTT Session +%%------------------------------------------------------------------------------ +-record(mqtt_session, { + clientid, + session_pid, + subscriptions = [], + awaiting_ack, + awaiting_rel +}). + +-type mqtt_session() :: #mqtt_session{}. + +%%------------------------------------------------------------------------------ +%% MQTT Plugin +%%------------------------------------------------------------------------------ +-record(mqtt_plugin, { + name, + version, + attrs, + description +}). + +-type mqtt_plugin() :: #mqtt_plugin{}. + + diff --git a/plugins/emqttd_plugin_mysql/priv/.placeholder b/plugins/emqttd_plugin_mysql/priv/.placeholder new file mode 100644 index 000000000..e69de29bb diff --git a/plugins/emqttd_plugin_mysql/rebar b/plugins/emqttd_plugin_mysql/rebar new file mode 100755 index 000000000..36ef01107 Binary files /dev/null and b/plugins/emqttd_plugin_mysql/rebar differ diff --git a/plugins/emqttd_plugin_mysql/rebar.config b/plugins/emqttd_plugin_mysql/rebar.config new file mode 100755 index 000000000..df10d9593 --- /dev/null +++ b/plugins/emqttd_plugin_mysql/rebar.config @@ -0,0 +1,32 @@ +{port_specs, [ + {"priv/emqttd_plugin_mysql_app.so", [ + "c_src/*.c" + ]} +]}. + +{port_env, [ + {".*", "CXXFLAGS", "$CXXFLAGS -g -Wall -Werror -O3"}, + + {"(linux|solaris|freebsd|netbsd|openbsd|dragonfly|darwin)", + "LDFLAGS", "$LDFLAGS -lstdc++ -lcrypto"}, + + %% OS X Leopard flags for 64-bit + {"darwin9.*-64$", "CXXFLAGS", "-m64"}, + {"darwin9.*-64$", "LDFLAGS", "-arch x86_64"}, + + %% OS X Snow Leopard flags for 32-bit + {"darwin10.*-32$", "CXXFLAGS", "-m32"}, + {"darwin10.*-32$", "LDFLAGS", "-arch i386"}, + + %% This will merge into basho/rebar/rebar.config eventually + {"win32", "CFLAGS", "/Wall /DWIN32 /D_WINDOWS /D_WIN32 /DWINDOWS"}, + {"win32", "CXXFLAGS", "-g -Wall -O3"} +]}. + + +{eunit_opts, [ + verbose, + {report, { + eunit_surefire, [{dir,"."}] + }} +]}. diff --git a/plugins/emqttd_plugin_mysql/src/emqttd_acl_mysql.erl b/plugins/emqttd_plugin_mysql/src/emqttd_acl_mysql.erl new file mode 100644 index 000000000..a2c995367 --- /dev/null +++ b/plugins/emqttd_plugin_mysql/src/emqttd_acl_mysql.erl @@ -0,0 +1,70 @@ +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd demo acl module. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_acl_mysql). + +-include("emqttd.hrl"). + +-behaviour(emqttd_acl_mod). + +%% ACL callbacks +-export([init/1, check_acl/2, reload_acl/1, description/0]). +-record(state, {user_table, acl_table, acl_username_field, acl_topic_field, acl_rw_field, user_name_field, user_super_field}). + +init(Opts) -> + Mapper = proplists:get_value(field_mapper, Opts), + State = + #state{ + user_table = proplists:get_value(users_table, Opts, auth_user), + user_super_field = proplists:get_value(is_super, Mapper, is_superuser), + user_name_field = proplists:get_value(username, Mapper, username), + acl_table = proplists:get_value(acls_table, Opts, auth_acl), + acl_username_field = proplists:get_value(acl_username, Mapper, username), + acl_rw_field = proplists:get_value(acl_rw, Mapper, rw), + acl_topic_field = proplists:get_value(acl_topic, Mapper, topic) + }, + {ok, State}. + +check_acl({#mqtt_client{username = Username}, PubSub, Topic}, #state{user_table = UserTab, acl_table = AclTab, user_name_field = UsernameField, user_super_field = SuperField, acl_topic_field = TopicField, acl_username_field = AclUserField, acl_rw_field = AclRwField}) -> + Flag = case PubSub of publish -> 2; subscribe -> 1; pubsub -> 2 end, + Where = {'and', {'>=', AclRwField, Flag}, {TopicField, Topic}}, + Where1 = {'or', {AclUserField, Username}, {AclUserField, "*"}}, + Where2 = {'and', Where, Where1}, + case emysql:select(UserTab, {'and', {UsernameField, Username}, {SuperField, 1}}) of + {ok, []} -> + case emysql:select(UserTab, {UsernameField, Username}) of + {ok, []} -> ignore; + {ok, _} -> case emysql:select(AclTab, Where2) of + {ok, []} -> deny; + {ok, _Record} -> allow + end + end; + {ok, _} -> allow + end. + +reload_acl(_State) -> ok. + +description() -> "ACL Module by Mysql". diff --git a/plugins/emqttd_plugin_mysql/src/emqttd_auth_mysql.erl b/plugins/emqttd_plugin_mysql/src/emqttd_auth_mysql.erl new file mode 100644 index 000000000..a913dd587 --- /dev/null +++ b/plugins/emqttd_plugin_mysql/src/emqttd_auth_mysql.erl @@ -0,0 +1,110 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd authentication by mysql 'user' table. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_auth_mysql). + +-author("Feng Lee "). + +-include("emqttd.hrl"). + +-behaviour(emqttd_auth_mod). + +-export([init/1, check/3, description/0]). + +-define(NOT_LOADED, not_loaded(?LINE)). + +-record(state, {user_table, name_field, pass_field, pass_hash}). + +init(Opts) -> + Mapper = proplists:get_value(field_mapper, Opts), + {ok, #state{user_table = proplists:get_value(user_table, Opts, auth_user), + name_field = proplists:get_value(username, Mapper), + pass_field = proplists:get_value(password, Mapper), + pass_hash = proplists:get_value(Opts, password_hash)}}. + +check(#mqtt_client{username = undefined}, _Password, _State) -> + {error, "Username undefined"}; +check(_Client, undefined, _State) -> + {error, "Password undefined"}; +check(#mqtt_client{username = Username}, Password, + #state{user_table = UserTab, pass_hash = Type, + name_field = NameField, pass_field = PassField}) -> + Where = {'and', {NameField, Username}, {PassField, hash(Type, Password)}}, + if Type =:= pbkdf2 -> + case emysql:select(UserTab, [PassField], {NameField, Username}) of + {ok, []} -> {error, "User not exist"}; + {ok, Records} -> + if length(Records) =:= 1 -> + case pbkdf2_check(Password, lists:nth(Records, 1)) of + true -> + {ok, []}; + false -> + {error, "UserName or Password is invalid"}; + ErrorInfo -> + {error, ErrorInfo} + end; + true -> + {error, "UserName is ambiguous"} + end + end; + true -> + case emysql:select(UserTab, Where) of + {ok, []} -> {error, "Username or Password "}; + {ok, _Record} -> ok + end + end. + +description() -> "Authentication by MySQL". + +hash(plain, Password) -> + Password; + +hash(md5, Password) -> + hexstring(crypto:hash(md5, Password)); + +hash(sha, Password) -> + hexstring(crypto:hash(sha, Password)). + +hexstring(<>) -> + lists:flatten(io_lib:format("~32.16.0b", [X])); + +hexstring(<>) -> + lists:flatten(io_lib:format("~40.16.0b", [X])). + +not_loaded(Line) -> + erlang:nif_error({not_loaded, [{module, ?MODULE}, {line, Line}]}). + +pbkdf2_check(Password, Pbkstr) -> + case nif_pbkdf2_check(Password, Pbkstr) of + {error, _} = Error -> + throw(Error); + IOData -> + IOData + end. + +nif_pbkdf2_check(Password, Pbkstr) -> + ?NOT_LOADED. + diff --git a/plugins/emqttd_plugin_mysql/src/emqttd_plugin_mysql.app.src b/plugins/emqttd_plugin_mysql/src/emqttd_plugin_mysql.app.src new file mode 100644 index 000000000..389bd33cb --- /dev/null +++ b/plugins/emqttd_plugin_mysql/src/emqttd_plugin_mysql.app.src @@ -0,0 +1,12 @@ +{application, emqttd_plugin_mysql, + [ + {description, "emqttd MySQL Authentication Plugin"}, + {vsn, "1.0"}, + {registered, []}, + {applications, [ + kernel, + stdlib + ]}, + {mod, {emqttd_plugin_mysql_app, []}}, + {env, []} + ]}. diff --git a/plugins/emqttd_plugin_mysql/src/emqttd_plugin_mysql_app.erl b/plugins/emqttd_plugin_mysql/src/emqttd_plugin_mysql_app.erl new file mode 100644 index 000000000..712e75ea6 --- /dev/null +++ b/plugins/emqttd_plugin_mysql/src/emqttd_plugin_mysql_app.erl @@ -0,0 +1,80 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd mysql authentication app. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_plugin_mysql_app). +-on_load(init/0). +-behaviour(application). +%% Application callbacks +-export([start/2, prep_stop/1, stop/1, nif_pbkdf2_check/2]). + +-behaviour(supervisor). +%% Supervisor callbacks +-export([init/1]). +-define(NOT_LOADED, not_loaded(?LINE)). + + +%%%============================================================================= +%%% Application callbacks +%%%============================================================================= + +start(_StartType, _StartArgs) -> + Env = application:get_all_env(), + emqttd_access_control:register_mod(auth, emqttd_auth_mysql, Env), + emqttd_access_control:register_mod(acl, emqttd_acl_mysql, Env), + crypto:start(), + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +prep_stop(State) -> + emqttd_access_control:unregister_mod(auth, emqttd_auth_mysql), State, + emqttd_access_control:unregister_mod(acl, emqttd_acl_mysql), State, + crypto:stop(). + +stop(_State) -> + ok. + +init() -> + PrivDir = case code:priv_dir(?MODULE) of + {error, _} -> + EbinDir = filename:dirname(code:which(?MODULE)), + AppPath = filename:dirname(EbinDir), + filename:join(AppPath, "priv"); + Path -> + Path + end, + erlang:load_nif(filename:join(PrivDir, "emqttd_plugin_mysql_app"), 0). + +%%%============================================================================= +%%% Supervisor callbacks(Dummy) +%%%============================================================================= + +init([]) -> + {ok, {{one_for_one, 5, 10}, []}}. + +not_loaded(Line) -> + erlang:nif_error({not_loaded, [{module, ?MODULE}, {line, Line}]}). + +nif_pbkdf2_check(Password, Hash) -> + ?NOT_LOADED. diff --git a/plugins/emqttd_presence/src/emqttd_presence.app.src b/plugins/emqttd_presence/src/emqttd_presence.app.src deleted file mode 100644 index 1f95fb6aa..000000000 --- a/plugins/emqttd_presence/src/emqttd_presence.app.src +++ /dev/null @@ -1,12 +0,0 @@ -{application, emqttd_presence, - [ - {description, ""}, - {vsn, "1"}, - {registered, []}, - {applications, [ - kernel, - stdlib - ]}, - {mod, { emqttd_presence_app, []}}, - {env, []} - ]}. diff --git a/plugins/emqttd_presence/src/emqttd_presence_mgr.erl b/plugins/emqttd_presence/src/emqttd_presence_mgr.erl deleted file mode 100644 index cf0e12b7a..000000000 --- a/plugins/emqttd_presence/src/emqttd_presence_mgr.erl +++ /dev/null @@ -1,78 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd client presence manager -%%% -%%% @end -%%%----------------------------------------------------------------------------- --module(emqttd_presence_mgr). - --behaviour(gen_server). - --define(SERVER, ?MODULE). - -%% ------------------------------------------------------------------ -%% API Function Exports -%% ------------------------------------------------------------------ - --export([start_link/0]). - -%% ------------------------------------------------------------------ -%% gen_server Function Exports -%% ------------------------------------------------------------------ - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - -%% ------------------------------------------------------------------ -%% API Function Definitions -%% ------------------------------------------------------------------ - -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - -%% ------------------------------------------------------------------ -%% gen_server Function Definitions -%% ------------------------------------------------------------------ - -init(Args) -> - {ok, Args}. - -handle_call(_Request, _From, State) -> - {reply, ok, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%% ------------------------------------------------------------------ -%% Internal Function Definitions -%% ------------------------------------------------------------------ - diff --git a/plugins/emysql/src/emysql.erl b/plugins/emysql/src/emysql.erl index bc7cd352f..f6a620cfd 100644 --- a/plugins/emysql/src/emysql.erl +++ b/plugins/emysql/src/emysql.erl @@ -384,9 +384,15 @@ encode_where({like, Field, Value}) -> encode_where({'<', Field, Value}) -> atom_to_list(Field) ++ " < " ++ encode(Value); +encode_where({'<=', Field, Value}) -> + atom_to_list(Field) ++ " <= " ++ encode(Value); + encode_where({'>', Field, Value}) -> atom_to_list(Field) ++ " > " ++ encode(Value); +encode_where({'>=', Field, Value}) -> + atom_to_list(Field) ++ " >= " ++ encode(Value); + encode_where({'in', Field, Values}) -> InStr = string:join([encode(Value) || Value <- Values], ","), atom_to_list(Field) ++ " in (" ++ InStr ++ ")"; diff --git a/plugins/emysql/src/emysql_sup.erl b/plugins/emysql/src/emysql_sup.erl index fde735c61..b915f3593 100644 --- a/plugins/emysql/src/emysql_sup.erl +++ b/plugins/emysql/src/emysql_sup.erl @@ -21,7 +21,7 @@ start_link(Opts) -> supervisor:start_link({local, ?MODULE}, ?MODULE, Opts). init(Opts) -> - PoolSize = proplists:get_value(pool_size, Opts, + PoolSize = proplists:get_value(pool, Opts, erlang:system_info(schedulers)), {ok, {{one_for_one, 10, 10}, [{emysql, {emysql, start_link, [PoolSize]}, transient, @@ -31,3 +31,4 @@ init(Opts) -> } }. + diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index 6681abbae..6a723470c 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -62,8 +62,8 @@ {packet, [ %% Max ClientId Length Allowed {max_clientid_len, 1024}, - %% Max Packet Size Allowed, 4K default - {max_packet_size, 4096} + %% Max Packet Size Allowed, 64K default + {max_packet_size, 65536} ]}, %% Client {client, [ @@ -86,15 +86,12 @@ %% System interval of publishing broker $SYS messages {sys_interval, 60}, - %% Subscribe these topics automatically when client connected - {forced_subscriptions, [{"$Q/client/$c", 0}]}, - %% Retained messages {retained, [ %% Max number of retained messages {max_message_num, 100000}, %% Max Payload Size of retained message - {max_playload_size, 4096} + {max_playload_size, 65536} ]}, %% PubSub {pubsub, [ @@ -109,6 +106,14 @@ {ping_down_interval, 1} %seconds ]} ]}, + %% Modules + {modules, [ + %% Subscribe topics automatically when client connected + {autosub, [{"$Q/client/$c", 0}]} + %% Rewrite rules + %% {rewrite, [{file, "etc/rewrite.config"}]} + + ]}, %% Listeners {listeners, [ {mqtt, 1883, [ @@ -135,8 +140,8 @@ %% Socket Access Control {access, [{allow, all}]}, %% SSL certificate and key files - {ssl, [{certfile, "etc/ssl.crt"}, - {keyfile, "etc/ssl.key"}]}, + {ssl, [{certfile, "etc/ssl/ssl.crt"}, + {keyfile, "etc/ssl/ssl.key"}]}, %% Socket Options {sockopts, [ {backlog, 1024} diff --git a/rel/files/plugins.config b/rel/files/plugins.config index 9267dae6e..57dcb8abf 100644 --- a/rel/files/plugins.config +++ b/rel/files/plugins.config @@ -1,20 +1,20 @@ [ - {emysql, [ - {pool_size, 4}, - {host, "localhost"}, - {port, 3306}, - {username, "root"}, - {password, "public"}, - {database, "mqtt"}, - {encoding, utf8} - ]}, - {emqttd_auth_mysql, [ - {user_table, mqtt_users} - ]} +% {emysql, [ +% {pool_size, 4}, +% {host, "localhost"}, +% {port, 3306}, +% {username, "root"}, +% {password, "public"}, +% {database, "mqtt"}, +% {encoding, utf8} +% ]}, +% {emqttd_auth_mysql, [ +% {user_table, mqtt_users} +% ]} % % {emqttd_dashboard, [ % {listener, -% {http, 8080, [ +% {http, 18083, [ % {acceptors, 4}, % {max_clients, 512}]}} % ]} diff --git a/rel/files/rewrite.config b/rel/files/rewrite.config new file mode 100644 index 000000000..494a85f74 --- /dev/null +++ b/rel/files/rewrite.config @@ -0,0 +1,14 @@ +%%%----------------------------------------------------------------------------- +%% +%% [Rewrite](https://github.com/emqtt/emqttd/wiki/Rewrite) +%% +%%%----------------------------------------------------------------------------- + +%{topic, "x/#", [ +% {rewrite, "^x/y/(.+)$", "z/y/$1"}, +% {rewrite, "^x/(.+)$", "y/$1"} +%]}. + +%{topic, "y/+/z/#", [ +% {rewrite, "^y/(.+)/z/(.+)$", "y/z/$2"} +%]}. diff --git a/rel/reltool.config b/rel/reltool.config index 933300fd3..ac5d09e83 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -63,6 +63,7 @@ {overlay, [ {mkdir, "log/"}, {mkdir, "etc/"}, + {mkdir, "etc/ssl/"}, {mkdir, "data/"}, {mkdir, "plugins/"}, {copy, "files/erl", "\{\{erts_vsn\}\}/bin/erl"}, @@ -72,10 +73,11 @@ {template, "files/emqttd.cmd", "bin/emqttd.cmd"}, {copy, "files/start_erl.cmd", "bin/start_erl.cmd"}, {copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"}, - {copy, "files/ssl/ssl.crt", "etc/ssl.crt"}, - {copy, "files/ssl/ssl.key", "etc/ssl.key"}, + {copy, "files/ssl/ssl.crt", "etc/ssl/ssl.crt"}, + {copy, "files/ssl/ssl.key", "etc/ssl/ssl.key"}, {template, "files/emqttd.config", "etc/emqttd.config"}, {template, "files/acl.config", "etc/acl.config"}, + {template, "files/rewrite.config", "etc/rewrite.config"}, {template, "files/clients.config", "etc/clients.config"}, {template, "files/plugins.config", "etc/plugins.config"}, {template, "files/vm.args", "etc/vm.args"}