Merge pull request #141 from kimiscircle/master

add mysql acl, auth plugin
This commit is contained in:
Feng Lee 2015-05-27 13:13:37 +08:00
commit 4282c135d5
69 changed files with 2149 additions and 391 deletions

View File

@ -2,6 +2,44 @@
emqttd ChangeLog
==================
0.8.1-alpha (2015-05-28)
-------------------------
Bugfix: issue #138 - when client disconnected normally, broker will not publish disconnected $SYS message
Improve: issue #136 - $SYS topics result should not include $SYS messages
0.8.0-alpha (2015-05-25)
-------------------------
[Hooks](https://github.com/emqtt/emqttd/wiki/Hooks%20Design), Modules and [Plugins](https://github.com/emqtt/emqttd/wiki/Plugin%20Design) to extend the broker Now!
Plugin: emqttd_auth_mysql - MySQL authentication plugin (issues #116, #120)
Plugin: emqttd_auth_ldap - LDAP authentication plugin
Feature: emqttd_broker to support Hooks API
Feature: issue #111 - Support 'Forced Subscriptions' by emqttd_mod_autosub module
Feature: issue #126 - Support 'Rewrite rules' by emqttd_mod_rewrite module
Improve: Support hooks, modules to extend the broker
Improve: issue #76 - dialyzer check
Improve: 'Get Started', 'User Guide', 'Developer Guide' Wiki
Improve: emqtt_topic to add join/1, feed_var/3, is_queue/1
Improve: emqttd_pooler to execute common tasks
Improve: add emqttd_sm_sup module, and use 'hash' gproc_pool to manage sessions
Tests: add more test cases for 'emqttd' app
0.7.1-alpha (2015-05-04)
-------------------------

View File

@ -33,7 +33,7 @@ emqttd is aimed to provide a solid, enterprise grade, extensible open-source MQT
* Cluster brokers on several servers.
* Bridge brokers locally or remotelly
* 500K+ concurrent clients connections per server
* Extensible architecture with plugin support
* Extensible architecture with Hooks, Modules and Plugins
* Passed eclipse paho interoperability tests

12
TODO
View File

@ -2,16 +2,22 @@
v0.9.0-alpha (2015-05-30)
-------------------------
Presence Management....
Dashboard
Presence Management....
v0.8.0-alpha (2015-05-10)
-------------------------
Force Subscriptions...
Documents...
MySQL Auth
Dashboard
AMQP
Bridge Test
@ -20,8 +26,6 @@ Bridge Test
0.8.0 (2015-05-10)
-------------------------
Presence Management....
Force Subscription...
Point2Point Queue...

View File

@ -1,7 +1,7 @@
{application, emqtt,
[
{description, "Erlang MQTT Common Library"},
{vsn, "0.7.1"},
{vsn, "0.8.0"},
{modules, []},
{registered, []},
{applications, [

View File

@ -1,7 +1,7 @@
{application, emqttd,
[
{description, "Erlang MQTT Broker"},
{vsn, "0.7.1"},
{vsn, "0.8.0"},
{modules, []},
{registered, []},
{applications, [kernel,

View File

@ -32,6 +32,7 @@
open_listeners/1, close_listeners/1,
load_all_plugins/0, unload_all_plugins/0,
load_plugin/1, unload_plugin/1,
load_all_mods/0, is_mod_enabled/1,
loaded_plugins/0,
is_running/1]).
@ -202,6 +203,17 @@ unload_app(App) ->
lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason}
end.
load_all_mods() ->
Mods = application:get_env(emqttd, modules, []),
lists:foreach(fun({Name, Opts}) ->
Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)),
Mod:load(Opts),
lager:info("load module ~s successfully", [Name])
end, Mods).
is_mod_enabled(Name) ->
env(modules, Name) =/= undefined.
%%------------------------------------------------------------------------------
%% @doc Is running?
%% @end

View File

@ -152,7 +152,7 @@ stop() ->
%%%=============================================================================
init([]) ->
{ok, AcOpts} = application:get_env(access),
{ok, AcOpts} = application:get_env(emqttd, access),
ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]),
ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(auth, proplists:get_value(auth, AcOpts))}),
ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(acl, proplists:get_value(acl, AcOpts))}),

View File

@ -51,8 +51,9 @@ start(_StartType, _StartArgs) ->
emqttd_mnesia:start(),
{ok, Sup} = emqttd_sup:start_link(),
start_servers(Sup),
{ok, Listeners} = application:get_env(listeners),
emqttd:load_all_mods(),
emqttd:load_all_plugins(),
{ok, Listeners} = application:get_env(listeners),
emqttd:open_listeners(Listeners),
register(emqttd, self()),
print_vsn(),
@ -71,13 +72,14 @@ start_servers(Sup) ->
{"emqttd trace", emqttd_trace},
{"emqttd pooler", {supervisor, emqttd_pooler_sup}},
{"emqttd client manager", {supervisor, emqttd_cm_sup}},
{"emqttd session manager", emqttd_sm},
{"emqttd session manager", {supervisor, emqttd_sm_sup}},
{"emqttd session supervisor", {supervisor, emqttd_session_sup}},
{"emqttd pubsub", {supervisor, emqttd_pubsub_sup}},
{"emqttd stats", emqttd_stats},
{"emqttd metrics", emqttd_metrics},
%{"emqttd router", emqttd_router},
{"emqttd broker", emqttd_broker},
{"emqttd mode supervisor", emqttd_mod_sup},
{"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}},
{"emqttd access control", emqttd_access_control},
{"emqttd system monitor", emqttd_sysmon}],

View File

@ -42,6 +42,9 @@
%% Event API
-export([subscribe/1, notify/2]).
%% Hook API
-export([hook/3, unhook/2, foreach_hooks/2, foldl_hooks/3]).
%% Broker API
-export([env/1, version/0, uptime/0, datetime/0, sysdescr/0]).
@ -127,6 +130,52 @@ datetime() ->
io_lib:format(
"~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])).
%%------------------------------------------------------------------------------
%% @doc Hook
%% @end
%%------------------------------------------------------------------------------
-spec hook(Hook :: atom(), Name :: any(), MFA :: mfa()) -> ok | {error, any()}.
hook(Hook, Name, MFA) ->
gen_server:call(?MODULE, {hook, Hook, Name, MFA}).
%%------------------------------------------------------------------------------
%% @doc Unhook
%% @end
%%------------------------------------------------------------------------------
-spec unhook(Hook :: atom(), Name :: any()) -> ok | {error, any()}.
unhook(Hook, Name) ->
gen_server:call(?MODULE, {unhook, Hook, Name}).
%%------------------------------------------------------------------------------
%% @doc Foreach hooks
%% @end
%%------------------------------------------------------------------------------
-spec foreach_hooks(Hook :: atom(), Args :: list()) -> any().
foreach_hooks(Hook, Args) ->
case ets:lookup(?BROKER_TAB, {hook, Hook}) of
[{_, Hooks}] ->
lists:foreach(fun({_Name, {M, F, A}}) ->
apply(M, F, Args++A)
end, Hooks);
[] ->
ok
end.
%%------------------------------------------------------------------------------
%% @doc Foldl hooks
%% @end
%%------------------------------------------------------------------------------
-spec foldl_hooks(Hook :: atom(), Args :: list(), Acc0 :: any()) -> any().
foldl_hooks(Hook, Args, Acc0) ->
case ets:lookup(?BROKER_TAB, {hook, Hook}) of
[{_, Hooks}] ->
lists:foldl(fun({_Name, {M, F, A}}, Acc) ->
apply(M, F, [Acc, Args++A])
end, Acc0, Hooks);
[] ->
Acc0
end.
%%------------------------------------------------------------------------------
%% @doc Start a tick timer
%% @end
@ -163,8 +212,33 @@ init([]) ->
handle_call(uptime, _From, State) ->
{reply, uptime(State), State};
handle_call({hook, Hook, Name, MFArgs}, _From, State) ->
Key = {hook, Hook}, Reply =
case ets:lookup(?BROKER_TAB, Key) of
[{Key, Hooks}] ->
case lists:keyfind(Name, 1, Hooks) of
{Name, _MFArgs} ->
{error, existed};
false ->
ets:insert(?BROKER_TAB, {Key, Hooks ++ [{Name, MFArgs}]})
end;
[] ->
ets:insert(?BROKER_TAB, {Key, [{Name, MFArgs}]})
end,
{reply, Reply, State};
handle_call({unhook, Name}, _From, State) ->
Key = {hook, Name}, Reply =
case ets:lookup(?BROKER_TAB, Key) of
[{Key, Hooks}] ->
ets:insert(?BROKER_TAB, {Key, lists:keydelete(Name, 1, Hooks)});
[] ->
{error, not_found}
end,
{reply, Reply, State};
handle_call(_Request, _From, State) ->
{reply, error, State}.
{reply, {error, unsupport_request}, State}.
handle_cast(_Msg, State) ->
{noreply, State}.

View File

@ -121,7 +121,7 @@ handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} =
{ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
{noreply, State#state{proto_state = ProtoState1}};
handle_info({force_subscribe, Topic, Qos}, #state{proto_state = ProtoState} = State) ->
handle_info({subscribe, Topic, Qos}, #state{proto_state = ProtoState} = State) ->
{ok, ProtoState1} = emqttd_protocol:handle({subscribe, Topic, Qos}, ProtoState),
{noreply, State#state{proto_state = ProtoState1}};
@ -161,15 +161,15 @@ handle_info(Info, State = #state{peername = Peername}) ->
{stop, {badinfo, Info}, State}.
terminate(Reason, #state{peername = Peername, keepalive = KeepAlive, proto_state = ProtoState}) ->
lager:debug("Client ~s: ~p terminated, reason: ~p~n", [emqttd_net:format(Peername), self(), Reason]),
lager:info("Client ~s: ~p terminated, reason: ~p~n", [emqttd_net:format(Peername), self(), Reason]),
notify(disconnected, Reason, ProtoState),
emqttd_keepalive:cancel(KeepAlive),
case {ProtoState, Reason} of
{undefined, _} -> ok;
{_, {shutdown, Error}} ->
emqttd_protocol:shutdown(Error, ProtoState);
{_, _} ->
ok
{_, Reason} ->
emqttd_protocol:shutdown(Reason, ProtoState)
end.
code_change(_OldVsn, State, _Extra) ->
@ -257,7 +257,8 @@ inc(_) ->
notify(disconnected, _Reason, undefined) -> ingore;
notify(disconnected, {shutdown, Reason}, ProtoState) ->
emqttd_event:notify({disconnected, emqttd_protocol:clientid(ProtoState), [{reason, Reason}]});
emqttd_event:notify({disconnected, emqttd_protocol:clientid(ProtoState), Reason});
notify(disconnected, Reason, ProtoState) ->
emqttd_event:notify({disconnected, emqttd_protocol:clientid(ProtoState), [{reason, Reason}]}).
emqttd_event:notify({disconnected, emqttd_protocol:clientid(ProtoState), Reason}).

View File

@ -33,7 +33,7 @@
-define(SERVER, ?MODULE).
%% API Exports
-export([start_link/3]).
-export([start_link/2, pool/0, table/0]).
-export([lookup/1, register/1, unregister/1]).
@ -41,9 +41,11 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {tab, statsfun}).
-record(state, {id, tab, statsfun}).
-define(POOL, cm_pool).
-define(CM_POOL, cm_pool).
-define(CLIENT_TAB, mqtt_client).
%%%=============================================================================
%%% API
@ -53,12 +55,15 @@
%% @doc Start client manager
%% @end
%%------------------------------------------------------------------------------
-spec start_link(Id, TabId, StatsFun) -> {ok, pid()} | ignore | {error, any()} when
-spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when
Id :: pos_integer(),
TabId :: ets:tid(),
StatsFun :: fun().
start_link(Id, TabId, StatsFun) ->
gen_server:start_link(?MODULE, [Id, TabId, StatsFun], []).
start_link(Id, StatsFun) ->
gen_server:start_link(?MODULE, [Id, StatsFun], []).
pool() -> ?CM_POOL.
table() -> ?CLIENT_TAB.
%%------------------------------------------------------------------------------
%% @doc Lookup client pid with clientId
@ -66,7 +71,7 @@ start_link(Id, TabId, StatsFun) ->
%%------------------------------------------------------------------------------
-spec lookup(ClientId :: binary()) -> pid() | undefined.
lookup(ClientId) when is_binary(ClientId) ->
case ets:lookup(emqttd_cm_sup:table(), ClientId) of
case ets:lookup(?CLIENT_TAB, ClientId) of
[{_, Pid, _}] -> Pid;
[] -> undefined
end.
@ -77,7 +82,7 @@ lookup(ClientId) when is_binary(ClientId) ->
%%------------------------------------------------------------------------------
-spec register(ClientId :: binary()) -> ok.
register(ClientId) when is_binary(ClientId) ->
CmPid = gproc_pool:pick_worker(?POOL, ClientId),
CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId),
gen_server:call(CmPid, {register, ClientId, self()}, infinity).
%%------------------------------------------------------------------------------
@ -86,19 +91,19 @@ register(ClientId) when is_binary(ClientId) ->
%%------------------------------------------------------------------------------
-spec unregister(ClientId :: binary()) -> ok.
unregister(ClientId) when is_binary(ClientId) ->
CmPid = gproc_pool:pick_worker(?POOL, ClientId),
CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId),
gen_server:cast(CmPid, {unregister, ClientId, self()}).
%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
init([Id, TabId, StatsFun]) ->
gproc_pool:connect_worker(?POOL, {?MODULE, Id}),
{ok, #state{tab = TabId, statsfun = StatsFun}}.
init([Id, StatsFun]) ->
gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}),
{ok, #state{id = Id, statsfun = StatsFun}}.
handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) ->
case ets:lookup(Tab, ClientId) of
handle_call({register, ClientId, Pid}, _From, State) ->
case ets:lookup(?CLIENT_TAB, ClientId) of
[{_, Pid, _}] ->
lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]),
ignore;
@ -106,9 +111,9 @@ handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) ->
lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]),
OldPid ! {stop, duplicate_id, Pid},
erlang:demonitor(MRef),
ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)});
ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)});
[] ->
ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)})
ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)})
end,
{reply, ok, setstats(State)};
@ -116,11 +121,11 @@ handle_call(Req, _From, State) ->
lager:error("unexpected request: ~p", [Req]),
{reply, {error, badreq}, State}.
handle_cast({unregister, ClientId, Pid}, State = #state{tab = TabId}) ->
case ets:lookup(TabId, ClientId) of
handle_cast({unregister, ClientId, Pid}, State) ->
case ets:lookup(?CLIENT_TAB, ClientId) of
[{_, Pid, MRef}] ->
erlang:demonitor(MRef, [flush]),
ets:delete(TabId, ClientId);
ets:delete(?CLIENT_TAB, ClientId);
[_] ->
ignore;
[] ->
@ -131,15 +136,15 @@ handle_cast({unregister, ClientId, Pid}, State = #state{tab = TabId}) ->
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tab = TabId}) ->
ets:match_delete(TabId, {'_', DownPid, MRef}),
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
ets:match_delete(?CLIENT_TAB, {'_', DownPid, MRef}),
{noreply, setstats(State)};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
terminate(_Reason, #state{id = Id}) ->
gproc_pool:disconnect_worker(?CM_POOL, {?MODULE, Id}), ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -148,6 +153,6 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%=============================================================================
setstats(State = #state{tab = TabId, statsfun = StatsFun}) ->
StatsFun(ets:info(TabId, size)), State.
setstats(State = #state{statsfun = StatsFun}) ->
StatsFun(ets:info(?CLIENT_TAB, size)), State.

View File

@ -33,30 +33,26 @@
-behaviour(supervisor).
%% API
-export([start_link/0, table/0]).
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
-define(CLIENT_TAB, mqtt_client).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
table() -> ?CLIENT_TAB.
init([]) ->
TabId = ets:new(?CLIENT_TAB, [set, named_table, public,
{write_concurrency, true}]),
ets:new(emqttd_cm:table(), [set, named_table, public,
{write_concurrency, true}]),
Schedulers = erlang:system_info(schedulers),
gproc_pool:new(cm_pool, hash, [{size, Schedulers}]),
gproc_pool:new(emqttd_cm:pool(), hash, [{size, Schedulers}]),
StatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'),
Children = lists:map(
fun(I) ->
Name = {emqttd_cm, I},
gproc_pool:add_worker(cm_pool, Name, I),
{Name, {emqttd_cm, start_link, [I, TabId, StatsFun]},
permanent, 10000, worker, [emqttd_cm]}
gproc_pool:add_worker(emqttd_cm:pool(), Name, I),
{Name, {emqttd_cm, start_link, [I, StatsFun]},
permanent, 10000, worker, [emqttd_cm]}
end, lists:seq(1, Schedulers)),
{ok, {{one_for_all, 10, 100}, Children}}.

View File

@ -95,7 +95,7 @@ cluster([SNode]) ->
end.
%%------------------------------------------------------------------------------
%% @doc Add usern
%% @doc Add user
%% @end
%%------------------------------------------------------------------------------
useradd([Username, Password]) ->

View File

@ -73,7 +73,11 @@ handle_event({connected, ClientId, Params}, State = #state{systop = SysTop}) ->
emqttd_pubsub:publish(event, Msg),
{ok, State};
handle_event({disconnectd, ClientId, Reason}, State = #state{systop = SysTop}) ->
%%TODO: Protect from undefined clientId...
handle_event({disconnected, undefined, Reason}, State = #state{systop = SysTop}) ->
{ok, State};
handle_event({disconnected, ClientId, Reason}, State = #state{systop = SysTop}) ->
Topic = <<SysTop/binary, "clients/", ClientId/binary, "/disconnected">>,
Msg = #mqtt_message{topic = Topic, payload = payload(disconnected, Reason)},
emqttd_pubsub:publish(event, Msg),

View File

@ -1,5 +1,5 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
@ -20,26 +20,30 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd demo acl module.
%%% emqttd gen_mod behaviour
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_plugin_demo_acl).
-module(emqttd_gen_mod).
-author("Feng Lee <feng@emqtt.io>").
-include_lib("emqttd/include/emqttd.hrl").
-include("emqttd.hrl").
-behaviour(emqttd_acl_mod).
-ifdef(use_specs).
%% ACL callbacks
-export([init/1, check_acl/2, reload_acl/1, description/0]).
-callback load(Opts :: any()) -> {ok, State :: any()}.
init(Opts) -> {ok, Opts}.
-callback unload(State :: any()) -> any().
check_acl({_Client, _PubSub, _Topic}, _State) -> ignore.
-else.
reload_acl(_State) -> ok.
-export([behaviour_info/1]).
description() -> "Demo ACL Module".
behaviour_info(callbacks) ->
[{load, 1}, {unload, 1}];
behaviour_info(_Other) ->
undefined.
-endif.

View File

@ -1,5 +1,5 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
@ -20,23 +20,31 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% presence manager application
%%% emqttd auto subscribe module.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_presence_app).
-behaviour(application).
-module(emqttd_mod_autosub).
%% Application callbacks
-export([start/2, stop/1]).
-author("Feng Lee <feng@emqtt.io>").
%% ===================================================================
%% Application callbacks
%% ===================================================================
-behaviour(emqttd_gen_mod).
start(_StartType, _StartArgs) ->
emqttd_presence_sup:start_link().
-export([load/1, subscribe/2, unload/1]).
-record(state, {topics}).
load(Opts) ->
Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2],
emqttd_broker:hook(client_connected, {?MODULE, subscribe},
{?MODULE, subscribe, [Topics]}),
{ok, #state{topics = Topics}}.
subscribe({Client, ClientId}, Topics) ->
F = fun(Topic) -> emqtt_topic:feed_var(<<"$c">>, ClientId, Topic) end,
[Client ! {subscribe, F(Topic), Qos} || {Topic, Qos} <- Topics].
unload(_Opts) ->
emqttd_broker:unhook(client_connected, {?MODULE, subscribe}).
stop(_State) ->
ok.

View File

@ -0,0 +1,127 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd rewrite module.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_mod_rewrite).
-author("Feng Lee <feng@emqtt.io>").
-include_lib("emqtt/include/emqtt.hrl").
-behaviour(emqttd_gen_mod).
-export([load/1, reload/1, unload/1]).
-export([rewrite/2]).
%%%=============================================================================
%%% API
%%%=============================================================================
load(Opts) ->
File = proplists:get_value(file, Opts),
{ok, Terms} = file:consult(File),
Sections = compile(Terms),
emqttd_broker:hook(client_subscribe, {?MODULE, rewrite_subscribe},
{?MODULE, rewrite, [subscribe, Sections]}),
emqttd_broker:hook(client_unsubscribe, {?MODULE, rewrite_unsubscribe},
{?MODULE, rewrite, [unsubscribe, Sections]}),
emqttd_broker:hook(client_publish, {?MODULE, rewrite_publish},
{?MODULE, rewrite, [publish, Sections]}).
rewrite(TopicTable, [subscribe, Sections]) ->
lager:info("rewrite subscribe: ~p", [TopicTable]),
[{match_topic(Topic, Sections), Qos} || {Topic, Qos} <- TopicTable];
rewrite(Topics, [unsubscribe, Sections]) ->
lager:info("rewrite unsubscribe: ~p", [Topics]),
[match_topic(Topic, Sections) || Topic <- Topics];
rewrite(Message=#mqtt_message{topic = Topic}, [publish, Sections]) ->
%%TODO: this will not work if the client is always online.
RewriteTopic =
case get({rewrite, Topic}) of
undefined ->
DestTopic = match_topic(Topic, Sections),
put({rewrite, Topic}, DestTopic), DestTopic;
DestTopic ->
DestTopic
end,
Message#mqtt_message{topic = RewriteTopic}.
reload(File) ->
%%TODO: The unload api is not right...
case emqttd:is_mod_enabled(rewrite) of
true ->
unload(state),
load([{file, File}]);
false ->
{error, module_unloaded}
end.
unload(_) ->
emqttd_broker:unhook(client_subscribe, {?MODULE, rewrite_subscribe}),
emqttd_broker:unhook(client_unsubscribe, {?MODULE, rewrite_unsubscribe}),
emqttd_broker:unhook(client_publish, {?MODULE, rewrite_publish}).
%%%=============================================================================
%%% Internal functions
%%%=============================================================================
compile(Sections) ->
C = fun({rewrite, Re, Dest}) ->
{ok, MP} = re:compile(Re),
{rewrite, MP, Dest}
end,
F = fun({topic, Topic, Rules}) ->
{topic, list_to_binary(Topic), [C(R) || R <- Rules]}
end,
[F(Section) || Section <- Sections].
match_topic(Topic, []) ->
Topic;
match_topic(Topic, [{topic, Filter, Rules} | Sections]) ->
case emqtt_topic:match(Topic, Filter) of
true ->
match_rule(Topic, Rules);
false ->
match_topic(Topic, Sections)
end.
match_rule(Topic, []) ->
Topic;
match_rule(Topic, [{rewrite, MP, Dest} | Rules]) ->
case re:run(Topic, MP, [{capture, all_but_first, list}]) of
{match, Captured} ->
%%TODO: stupid??? how to replace $1, $2?
Vars = lists:zip(["\\$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))], Captured),
iolist_to_binary(lists:foldl(
fun({Var, Val}, Acc) ->
re:replace(Acc, Var, Val, [global])
end, Dest, Vars));
nomatch ->
match_rule(Topic, Rules)
end.

View File

@ -0,0 +1,67 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd module supervisor.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_mod_sup).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
-behaviour(supervisor).
%% API
-export([start_link/0, start_child/1, start_child/2]).
%% Supervisor callbacks
-export([init/1]).
%% Helper macro for declaring children of supervisor
-define(CHILD(Mod, Type), {Mod, {Mod, start_link, []}, permanent, 5000, Type, [Mod]}).
%%%=============================================================================
%%% API
%%%=============================================================================
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
start_child(ChildSpec) when is_tuple(ChildSpec) ->
supervisor:start_child(?MODULE, ChildSpec).
%%
%% start_child(Mod::atom(), Type::type()) -> {ok, pid()}
%% @type type() = worker | supervisor
%%
start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) ->
supervisor:start_child(?MODULE, ?CHILD(Mod, Type)).
%%%=============================================================================
%%% Supervisor callbacks
%%%=============================================================================
init([]) ->
{ok, {{one_for_one, 10, 3600}, []}}.

View File

@ -134,10 +134,10 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername =
emqttd_cm:register(ClientId1),
%%Starting session
{ok, Session} = emqttd_session:start({CleanSess, ClientId1, self()}),
%% Force subscriptions
force_subscribe(ClientId1),
%% Start keepalive
start_keepalive(KeepAlive),
%% Run hooks
emqttd_broker:foreach_hooks(client_connected, [{self(), ClientId1}]),
{?CONNACK_ACCEPT, State1#proto_state{clientid = ClientId1,
session = Session,
will_msg = willmsg(Var)}};
@ -158,7 +158,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload),
State = #proto_state{clientid = ClientId, session = Session}) ->
case check_acl(publish, Topic, State) of
allow ->
emqttd_session:publish(Session, ClientId, {?QOS_0, emqtt_message:from_packet(Packet)});
do_publish(Session, ClientId, ?QOS_0, Packet);
deny ->
lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic])
end,
@ -168,7 +168,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload),
State = #proto_state{clientid = ClientId, session = Session}) ->
case check_acl(publish, Topic, State) of
allow ->
emqttd_session:publish(Session, ClientId, {?QOS_1, emqtt_message:from_packet(Packet)}),
do_publish(Session, ClientId, ?QOS_1, Packet),
send(?PUBACK_PACKET(?PUBACK, PacketId), State);
deny ->
lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]),
@ -179,7 +179,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload),
State = #proto_state{clientid = ClientId, session = Session}) ->
case check_acl(publish, Topic, State) of
allow ->
NewSession = emqttd_session:publish(Session, ClientId, {?QOS_2, emqtt_message:from_packet(Packet)}),
NewSession = do_publish(Session, ClientId, ?QOS_2, Packet),
send(?PUBACK_PACKET(?PUBREC, PacketId), State#proto_state{session = NewSession});
deny ->
lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]),
@ -212,12 +212,13 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid =
lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]),
{ok, State};
false ->
TopicTable1 = emqttd_broker:foldl_hooks(client_subscribe, [], TopicTable),
%%TODO: GrantedQos should be renamed.
{ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable),
{ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable1),
send(?SUBACK_PACKET(PacketId, GrantedQos), State#proto_state{session = NewSession})
end;
handle({subscribe, Topic, Qos}, State = #proto_state{clientid = ClientId, session = Session}) ->
handle({subscribe, Topic, Qos}, State = #proto_state{session = Session}) ->
{ok, NewSession, _GrantedQos} = emqttd_session:subscribe(Session, [{Topic, Qos}]),
{ok, State#proto_state{session = NewSession}};
@ -226,7 +227,8 @@ handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
send(?UNSUBACK_PACKET(PacketId), State);
handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) ->
{ok, NewSession} = emqttd_session:unsubscribe(Session, Topics),
Topics1 = emqttd_broker:foldl_hooks(client_unsubscribe, [], Topics),
{ok, NewSession} = emqttd_session:unsubscribe(Session, Topics1),
send(?UNSUBACK_PACKET(PacketId), State#proto_state{session = NewSession});
handle(?PACKET(?PINGREQ), State) ->
@ -237,6 +239,10 @@ handle(?PACKET(?DISCONNECT), State) ->
% clean willmsg
{stop, normal, State#proto_state{will_msg = undefined}}.
do_publish(Session, ClientId, Qos, Packet) ->
Message = emqttd_broker:foldl_hooks(client_publish, [], emqtt_message:from_packet(Packet)),
emqttd_session:publish(Session, ClientId, {Qos, Message}).
-spec send({pid() | tuple(), mqtt_message()} | mqtt_packet(), proto_state()) -> {ok, proto_state()}.
%% qos0 message
send({_From, Message = #mqtt_message{qos = ?QOS_0}}, State) ->
@ -298,23 +304,8 @@ send_willmsg(_ClientId, undefined) ->
send_willmsg(ClientId, WillMsg) ->
emqttd_pubsub:publish(ClientId, WillMsg).
%%TODO: will be fixed in 0.8
force_subscribe(ClientId) ->
case emqttd_broker:env(forced_subscriptions) of
undefined ->
ingore;
Topics ->
[force_subscribe(ClientId, {Topic, Qos}) || {Topic, Qos} <- Topics]
end.
force_subscribe(ClientId, {Topic, Qos}) when is_list(Topic) ->
force_subscribe(ClientId, {list_to_binary(Topic), Qos});
force_subscribe(ClientId, {Topic, Qos}) when is_binary(Topic) ->
Topic1 = emqtt_topic:feed_var(<<"$c">>, ClientId, Topic),
self() ! {force_subscribe, Topic1, Qos}.
start_keepalive(0) -> ignore;
start_keepalive(Sec) when Sec > 0 ->
self() ! {keepalive, start, round(Sec * 1.5)}.

View File

@ -159,7 +159,7 @@ cast(Msg) ->
%%------------------------------------------------------------------------------
-spec publish(From :: mqtt_clientid() | atom(), Msg :: mqtt_message()) -> ok.
publish(From, #mqtt_message{topic=Topic} = Msg) ->
lager:info([{client, From}, {topic, Topic}], "~s PUBLISH to ~s", [From, Topic]),
trace(publish, From, Msg),
%% Retain message first. Don't create retained topic.
case emqttd_msg_store:retain(Msg) of
ok ->
@ -169,7 +169,7 @@ publish(From, #mqtt_message{topic=Topic} = Msg) ->
publish(From, Topic, Msg)
end.
publish(_From, <<"$Q/", _/binary>> = Queue, #mqtt_message{qos = Qos} = Msg) ->
publish(From, <<"$Q/", _/binary>> = Queue, #mqtt_message{qos = Qos} = Msg) ->
lists:foreach(
fun(#mqtt_queue{subpid = SubPid, qos = SubQos}) ->
Msg1 = if
@ -450,3 +450,16 @@ setstats(dropped, false) ->
setstats(dropped, true) ->
emqttd_metrics:inc('messages/dropped').
%%%=============================================================================
%%% Trace functions
%%%=============================================================================
trace(publish, From, _Msg) when is_atom(From) ->
%%dont' trace broker publish
ignore;
trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) ->
lager:info([{client, From}, {topic, Topic}],
"~s PUBLISH to ~s: ~p", [From, Topic, Payload]).

View File

@ -44,10 +44,8 @@
-behaviour(gen_server).
-define(SERVER, ?MODULE).
%% API Function Exports
-export([start_link/0]).
-export([start_link/2, pool/0, table/0]).
-export([lookup_session/1, start_session/2, destroy_session/1]).
@ -55,16 +53,37 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {tabid, statsfun}).
-record(state, {id, tabid, statsfun}).
-define(SM_POOL, sm_pool).
-define(SESSION_TAB, mqtt_session).
%%%=============================================================================
%%% API
%%%=============================================================================
-spec start_link() -> {ok, pid()} | ignore | {error, any()}.
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%------------------------------------------------------------------------------
%% @doc Start a session manager
%% @end
%%------------------------------------------------------------------------------
-spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when
Id :: pos_integer(),
StatsFun :: fun().
start_link(Id, StatsFun) ->
gen_server:start_link(?MODULE, [Id, StatsFun], []).
%%------------------------------------------------------------------------------
%% @doc Pool name.
%% @end
%%------------------------------------------------------------------------------
pool() -> ?SM_POOL.
%%------------------------------------------------------------------------------
%% @doc Table name.
%% @end
%%------------------------------------------------------------------------------
table() -> ?SESSION_TAB.
%%------------------------------------------------------------------------------
%% @doc Lookup Session Pid
@ -72,7 +91,7 @@ start_link() ->
%%------------------------------------------------------------------------------
-spec lookup_session(binary()) -> pid() | undefined.
lookup_session(ClientId) ->
case ets:lookup(?SESSION_TAB, ClientId) of
case ets:lookup(emqttd_sm_sup:table(), ClientId) of
[{_, SessPid, _}] -> SessPid;
[] -> undefined
end.
@ -83,7 +102,8 @@ lookup_session(ClientId) ->
%%------------------------------------------------------------------------------
-spec start_session(binary(), pid()) -> {ok, pid()} | {error, any()}.
start_session(ClientId, ClientPid) ->
gen_server:call(?SERVER, {start_session, ClientId, ClientPid}).
SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId),
gen_server:call(SmPid, {start_session, ClientId, ClientPid}).
%%------------------------------------------------------------------------------
%% @doc Destroy a session
@ -91,29 +111,27 @@ start_session(ClientId, ClientPid) ->
%%------------------------------------------------------------------------------
-spec destroy_session(binary()) -> ok.
destroy_session(ClientId) ->
gen_server:call(?SERVER, {destroy_session, ClientId}).
SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId),
gen_server:call(SmPid, {destroy_session, ClientId}).
%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
init([]) ->
process_flag(trap_exit, true),
TabId = ets:new(?SESSION_TAB, [set, protected, named_table]),
StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
{ok, #state{tabid = TabId, statsfun = StatsFun}}.
init([Id, StatsFun]) ->
gproc_pool:connect_worker(?SM_POOL, {?MODULE, Id}),
{ok, #state{id = Id, statsfun = StatsFun}}.
handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tabid = Tab}) ->
handle_call({start_session, ClientId, ClientPid}, _From, State) ->
Reply =
case ets:lookup(Tab, ClientId) of
case ets:lookup(?SESSION_TAB, ClientId) of
[{_, SessPid, _MRef}] ->
emqttd_session:resume(SessPid, ClientId, ClientPid),
{ok, SessPid};
[] ->
case emqttd_session_sup:start_session(ClientId, ClientPid) of
{ok, SessPid} ->
ets:insert(Tab, {ClientId, SessPid,
erlang:monitor(process, SessPid)}),
ets:insert(?SESSION_TAB, {ClientId, SessPid, erlang:monitor(process, SessPid)}),
{ok, SessPid};
{error, Error} ->
{error, Error}
@ -121,12 +139,12 @@ handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tabid =
end,
{reply, Reply, setstats(State)};
handle_call({destroy_session, ClientId}, _From, State = #state{tabid = Tab}) ->
case ets:lookup(Tab, ClientId) of
handle_call({destroy_session, ClientId}, _From, State) ->
case ets:lookup(?SESSION_TAB, ClientId) of
[{_, SessPid, MRef}] ->
emqttd_session:destroy(SessPid, ClientId),
erlang:demonitor(MRef, [flush]),
ets:delete(Tab, ClientId);
ets:delete(?SESSION_TAB, ClientId);
[] ->
ignore
end,
@ -145,8 +163,8 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tabid = Ta
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
terminate(_Reason, #state{id = Id}) ->
gproc_pool:disconnect_worker(?SM_POOL, {?MODULE, Id}), ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -0,0 +1,59 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd client manager supervisor.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_sm_sup).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
%% API
-export([start_link/0]).
-behaviour(supervisor).
%% Supervisor callbacks
-export([init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
ets:new(emqttd_sm:table(), [set, named_table, public,
{write_concurrency, true}]),
Schedulers = erlang:system_info(schedulers),
gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]),
StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
Children = lists:map(
fun(I) ->
Name = {emqttd_sm, I},
gproc_pool:add_worker(emqttd_sm:pool(), Name, I),
{Name, {emqttd_sm, start_link, [I, StatsFun]},
permanent, 10000, worker, [emqttd_sm]}
end, lists:seq(1, Schedulers)),
{ok, {{one_for_all, 10, 100}, Children}}.

View File

@ -39,7 +39,7 @@
-export([init/1]).
%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
-define(CHILD(Mod, Type), {Mod, {Mod, start_link, []}, permanent, 5000, Type, [Mod]}).
%%%=============================================================================
%%% API
@ -63,5 +63,5 @@ start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) ->
%%%=============================================================================
init([]) ->
{ok, {{one_for_all, 10, 100}, []}}.
{ok, {{one_for_all, 10, 3600}, []}}.

View File

@ -0,0 +1,107 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd_access_control tests.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_access_control_tests).
-include("emqttd.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
reload_acl_test() ->
with_acl(
fun() ->
?assertEqual([ok], emqttd_access_control:reload_acl())
end).
register_mod_test() ->
with_acl(
fun() ->
emqttd_access_control:register_mod(acl, emqttd_acl_test_mod, []),
?assertMatch([{emqttd_acl_test_mod, _}, {emqttd_acl_internal, _}],
emqttd_access_control:lookup_mods(acl)),
emqttd_access_control:register_mod(auth, emqttd_auth_anonymous_test_mod,[]),
?assertMatch([{emqttd_auth_anonymous_test_mod, _}, {emqttd_auth_anonymous, _}],
emqttd_access_control:lookup_mods(auth))
end).
unregister_mod_test() ->
with_acl(
fun() ->
emqttd_access_control:register_mod(acl,emqttd_acl_test_mod, []),
?assertMatch([{emqttd_acl_test_mod, _}, {emqttd_acl_internal, _}],
emqttd_access_control:lookup_mods(acl)),
emqttd_access_control:unregister_mod(acl, emqttd_acl_test_mod),
timer:sleep(5),
?assertMatch([{emqttd_acl_internal, _}], emqttd_access_control:lookup_mods(acl)),
emqttd_access_control:register_mod(auth, emqttd_auth_anonymous_test_mod,[]),
?assertMatch([{emqttd_auth_anonymous_test_mod, _}, {emqttd_auth_anonymous, _}],
emqttd_access_control:lookup_mods(auth)),
emqttd_access_control:unregister_mod(auth, emqttd_auth_anonymous_test_mod),
timer:sleep(5),
?assertMatch([{emqttd_auth_anonymous, _}], emqttd_access_control:lookup_mods(auth))
end).
check_acl_test() ->
with_acl(
fun() ->
User1 = #mqtt_client{clientid = <<"client1">>, username = <<"testuser">>},
User2 = #mqtt_client{clientid = <<"client2">>, username = <<"xyz">>},
?assertEqual(allow, emqttd_access_control:check_acl(User1, subscribe, <<"users/testuser/1">>)),
?assertEqual(allow, emqttd_access_control:check_acl(User1, subscribe, <<"clients/client1">>)),
?assertEqual(deny, emqttd_access_control:check_acl(User1, subscribe, <<"clients/client1/x/y">>)),
?assertEqual(allow, emqttd_access_control:check_acl(User1, publish, <<"users/testuser/1">>)),
?assertEqual(allow, emqttd_access_control:check_acl(User1, subscribe, <<"a/b/c">>)),
?assertEqual(deny, emqttd_access_control:check_acl(User2, subscribe, <<"a/b/c">>))
end).
with_acl(Fun) ->
process_flag(trap_exit, true),
AclOpts = [
{auth, [
%% Authentication with username, password
%{username, []},
%% Authentication with clientid
%{clientid, [{password, no}, {file, "etc/clients.config"}]},
%% Allow all
{anonymous, []}
]},
%% ACL config
{acl, [
%% Internal ACL module
{internal, [{file, "../test/test_acl.config"}, {nomatch, allow}]}
]}
],
application:set_env(emqttd, access, AclOpts),
emqttd_access_control:start_link(),
Fun(),
emqttd_access_control:stop().
-endif.

View File

@ -53,8 +53,8 @@ compile_test() ->
?assertEqual({deny, all}, compile({deny, all})).
match_test() ->
User = #mqtt_user{ipaddr = {127,0,0,1}, clientid = <<"testClient">>, username = <<"TestUser">>},
User2 = #mqtt_user{ipaddr = {192,168,0,10}, clientid = <<"testClient">>, username = <<"TestUser">>},
User = #mqtt_client{ipaddr = {127,0,0,1}, clientid = <<"testClient">>, username = <<"TestUser">>},
User2 = #mqtt_client{ipaddr = {192,168,0,10}, clientid = <<"testClient">>, username = <<"TestUser">>},
?assertEqual({matched, allow}, match(User, <<"Test/Topic">>, {allow, all})),
?assertEqual({matched, deny}, match(User, <<"Test/Topic">>, {deny, all})),
@ -68,7 +68,7 @@ match_test() ->
compile({allow, {client, "testClient"}, publish, ["testTopics/testClient"]}))),
?assertMatch({matched, allow}, match(User, <<"clients/testClient">>,
compile({allow, all, pubsub, ["clients/$c"]}))),
?assertMatch({matched, allow}, match(#mqtt_user{username = <<"user2">>}, <<"users/user2/abc/def">>,
?assertMatch({matched, allow}, match(#mqtt_client{username = <<"user2">>}, <<"users/user2/abc/def">>,
compile({allow, all, subscribe, ["users/$u/#"]}))),
?assertMatch({matched, deny},
match(User, <<"d/e/f">>,

View File

@ -20,23 +20,20 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd demo auth module.
%%% Test ACL Module.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_plugin_demo_auth).
-author("Feng Lee <feng@emqtt.io>").
-include_lib("emqttd/include/emqttd.hrl").
-behaviour(emqttd_auth_mod).
-module(emqttd_auth_anonymous_test_mod).
%% ACL callbacks
-export([init/1, check/3, description/0]).
init(Opts) -> {ok, Opts}.
init(AclOpts) ->
{ok, AclOpts}.
check(_Client, _Password, _Opts) -> ignore.
description() -> "Demo authentication module".
check(_Client, _Password, _Opts) ->
allow.
description() ->
"Test emqttd_auth_anonymous Mod".

View File

@ -0,0 +1,23 @@
## Overview
Authentication with LDAP.
## Plugin Config
```
{emqttd_auth_ldap, [
{servers, ["localhost"]},
{port, 389},
{timeout, 30},
{user_dn, "uid=$u,ou=People,dc=example,dc=com"},
{ssl, fasle},
{sslopts, [
{"certfile", "ssl.crt"},
{"keyfile", "ssl.key"}]}
]}
```
## Load Plugin
Merge the'etc/plugin.config' to emqttd/etc/plugins.config, and the plugin will be loaded automatically.

View File

@ -0,0 +1,12 @@
[
{emqttd_auth_ldap, [
{servers, ["localhost"]},
{port, 389},
{timeout, 30},
{user_dn, "uid=$u,ou=People,dc=example,dc=com"},
{ssl, fasle},
{sslopts, [
{"certfile", "ssl.crt"},
{"keyfile", "ssl.key"}]}
]}
].

View File

@ -0,0 +1,12 @@
{application, emqttd_auth_ldap,
[
{description, "emqttd LDAP Authentication Plugin"},
{vsn, "1.0"},
{registered, []},
{applications, [
kernel,
stdlib
]},
{mod, { emqttd_auth_ldap_app, []}},
{env, []}
]}.

View File

@ -0,0 +1,92 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% LDAP Authentication Module.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_auth_ldap).
-author("Feng Lee <feng@emqtt.io>").
-include_lib("emqttd/include/emqttd.hrl").
-import(proplists, [get_value/2, get_value/3]).
-behaviour(emqttd_auth_mod).
-export([init/1, check/3, description/0]).
-record(state, {servers, user_dn, options}).
init(Opts) ->
Servers = get_value(servers, Opts, ["localhost"]),
Port = get_value(port, Opts, 389),
Timeout = get_value(timeout, Opts, 30),
UserDn = get_value(user_dn, Opts),
LdapOpts =
case get_value(ssl, Opts, false) of
true ->
SslOpts = get_value(sslopts, Opts),
[{port, Port}, {timeout, Timeout}, {sslopts, SslOpts}];
false ->
[{port, Port}, {timeout, Timeout}]
end,
{ok, #state{servers = Servers, user_dn = UserDn, options = LdapOpts}}.
check(#mqtt_client{username = undefined}, _Password, _State) ->
{error, "Username undefined"};
check(_Client, undefined, _State) ->
{error, "Password undefined"};
check(_Client, <<>>, _State) ->
{error, "Password undefined"};
check(#mqtt_client{username = Username}, Password,
#state{servers = Servers, user_dn = UserDn, options = Options}) ->
case eldap:open(Servers, Options) of
{ok, LDAP} ->
UserDn1 = fill(binary_to_list(Username), UserDn),
ldap_bind(LDAP, UserDn1, binary_to_list(Password));
{error, Reason} ->
{error, Reason}
end.
ldap_bind(LDAP, UserDn, Password) ->
case catch eldap:simple_bind(LDAP, UserDn, Password) of
ok ->
ok;
{error, invalidCredentials} ->
{error, "LDAP Invalid Credentials"};
{error, Error} ->
{error, Error};
{'EXIT', Reason} ->
{error, Reason}
end.
fill(Username, UserDn) ->
lists:append(lists:map(
fun("$u") -> Username;
(S) -> S
end, string:tokens(UserDn, ",="))).
description() ->
"LDAP Authentication Module".

View File

@ -1,5 +1,5 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
@ -20,35 +20,39 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% presence manager supervisor.
%%% LDAP Authentication Plugin.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_presence_sup).
-module(emqttd_auth_ldap_app).
-behaviour(application).
%% Application callbacks
-export([start/2, prep_stop/1, stop/1]).
-behaviour(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
%%%=============================================================================
%%% Application callbacks
%%%=============================================================================
%% ===================================================================
%% API functions
%% ===================================================================
start_link() ->
start(_StartType, _StartArgs) ->
Env = application:get_all_env(emqttd_auth_ldap),
emqttd_access_control:register_mod(auth, emqttd_auth_ldap, Env),
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
prep_stop(State) ->
emqttd_access_control:unregister_mod(auth, emqttd_auth_ldap), State.
stop(_State) ->
ok.
%%%=============================================================================
%%% Supervisor callbacks(Dummy)
%%%=============================================================================
init([]) ->
{ok, { {one_for_one, 5, 10}, []} }.

View File

@ -1,8 +1,32 @@
## Overview
## Overview
Authentication with user table of MySQL database.
## User Table
## etc/plugin.config
```
{emysql, [
{pool, 4},
{host, "localhost"},
{port, 3306},
{username, ""},
{password, ""},
{database, "mqtt"},
{encoding, utf8}
]},
{emqttd_auth_mysql, [
{user_table, mqtt_users},
%% plain password only
{password_hash, plain},
{field_mapper, [
{username, username},
{password, password}
]}
]}
```
## Users Table(Demo)
Notice: This is a demo table. You could authenticate with any user tables.
@ -18,8 +42,7 @@ CREATE TABLE `mqtt_users` (
) ENGINE=MyISAM AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
```
## Plugins config
Please configure 'etc/plugins.config' to loade emysql and emqttd_auth_mysql plugins.
## Load Plugin
Merge the'etc/plugin.config' to emqttd/etc/plugins.config, and the plugin will be loaded by the broker.

View File

@ -0,0 +1,16 @@
{emysql, [
{pool, 4},
{host, "localhost"},
{port, 3306},
{username, "root"},
{password, "public"},
{database, "mqtt"},
{encoding, utf8}
]},
{emqttd_auth_mysql, [
{users_table, mqtt_users},
{field_mapper, [
{username, username},
{password, password, plain}
]}
]}

View File

@ -1,7 +1,7 @@
{application, emqttd_auth_mysql,
[
{description, ""},
{vsn, "0.1"},
{description, "emqttd MySQL Authentication Plugin"},
{vsn, "1.0"},
{registered, []},
{applications, [
kernel,

View File

@ -20,7 +20,7 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd authentication by mysql user table.
%%% emqttd authentication by mysql 'user' table.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
@ -34,22 +34,42 @@
-export([init/1, check/3, description/0]).
-record(state, {user_tab}).
-record(state, {user_table, name_field, pass_field, pass_hash}).
init(Opts) ->
UserTab = proplists:get_value(user_table, Opts, mqtt_users),
{ok, #state{user_tab = UserTab}}.
Mapper = proplists:get_value(field_mapper, Opts),
{ok, #state{user_table = proplists:get_value(user_table, Opts, mqtt_users),
name_field = proplists:get_value(username, Mapper),
pass_field = proplists:get_value(password, Mapper),
pass_hash = proplists:get_value(Opts, password_hash)}}.
check(#mqtt_client{username = undefined}, _Password, _State) ->
{error, "Username undefined"};
check(_Client, undefined, _State) ->
{error, "Password undefined"};
check(#mqtt_client{username = Username}, Password, #state{user_tab = UserTab}) ->
%%TODO: hash password...
case emysql:select(UserTab, {'and', {username, Username}, {password, Password}}) of
{ok, []} -> {error, "Username or Password not match"};
check(#mqtt_client{username = Username}, Password,
#state{user_table = UserTab, pass_hash = Type,
name_field = NameField, pass_field = PassField}) ->
Where = {'and', {NameField, Username}, {PassField, hash(Type, Password)}},
case emysql:select(UserTab, Where) of
{ok, []} -> {error, "Username or Password "};
{ok, _Record} -> ok
end.
description() -> "Authentication by MySQL".
hash(plain, Password) ->
Password;
hash(md5, Password) ->
hexstring(crypto:hash(md5, Password));
hash(sha, Password) ->
hexstring(crypto:hash(sha, Password)).
hexstring(<<X:128/big-unsigned-integer>>) ->
lists:flatten(io_lib:format("~32.16.0b", [X]));
hexstring(<<X:160/big-unsigned-integer>>) ->
lists:flatten(io_lib:format("~40.16.0b", [X])).

View File

@ -20,27 +20,40 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% mysql authentication app.
%%% emqttd mysql authentication app.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_auth_mysql_app).
-behaviour(application).
%% Application callbacks
-export([start/2, stop/1]).
-export([start/2, prep_stop/1, stop/1]).
%% ===================================================================
%% Application callbacks
%% ===================================================================
-behaviour(supervisor).
%% Supervisor callbacks
-export([init/1]).
%%%=============================================================================
%%% Application callbacks
%%%=============================================================================
start(_StartType, _StartArgs) ->
{ok, Sup} = emqttd_auth_mysql_sup:start_link(),
Env = application:get_all_env(),
emqttd_access_control:register_mod(auth, emqttd_auth_mysql, Env),
{ok, Sup}.
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
prep_stop(State) ->
emqttd_access_control:unregister_mod(auth, emqttd_auth_mysql), State.
stop(_State) ->
emqttd_access_control:unregister_mod(auth, emqttd_auth_mysql),
ok.
%%%=============================================================================
%%% Supervisor callbacks(Dummy)
%%%=============================================================================
init([]) ->
{ok, { {one_for_one, 5, 10}, []} }.

View File

@ -1,27 +0,0 @@
-module(emqttd_auth_mysql_sup).
-behaviour(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
%% ===================================================================
%% API functions
%% ===================================================================
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
init([]) ->
{ok, { {one_for_one, 5, 10}, []} }.

View File

@ -28,6 +28,11 @@
-author("Feng Lee <feng@emqtt.io>").
-export([handle_request/1]).
%%TODO...
handle_request(Req) ->
Req:ok("hello!").

View File

@ -10,7 +10,18 @@
%% ===================================================================
start(_StartType, _StartArgs) ->
emqttd_dashboard_sup:start_link().
{ok, Sup} = emqttd_dashboard_sup:start_link(),
open_listener(application:get_env(listener)),
{ok, Sup}.
stop(_State) ->
ok.
%% open http port
open_listener({_Http, Port, Options}) ->
MFArgs = {emqttd_dashboard, handle_request, []},
mochiweb:start_http(Port, Options, MFArgs).
close_listener(Port) ->
mochiweb:stop_http(Port).

View File

@ -1,12 +0,0 @@
{application, emqttd_plugin_demo,
[
{description, "emqttd demo plugin"},
{vsn, "0.1"},
{registered, []},
{applications, [
kernel,
stdlib
]},
{mod, { emqttd_plugin_demo_app, []}},
{env, []}
]}.

View File

@ -1,21 +0,0 @@
-module(emqttd_plugin_demo_app).
-behaviour(application).
%% Application callbacks
-export([start/2, stop/1]).
%% ===================================================================
%% Application callbacks
%% ===================================================================
start(_StartType, _StartArgs) ->
{ok, Sup} = emqttd_plugin_demo_sup:start_link(),
emqttd_access_control:register_mod(auth, emqttd_plugin_demo_auth, []),
emqttd_access_control:register_mod(acl, emqttd_plugin_demo_acl, []),
{ok, Sup}.
stop(_State) ->
emqttd_access_control:unregister_mod(auth, emqttd_plugin_demo_auth),
emqttd_access_control:unregister_mod(acl, emqttd_plugin_demo_acl),
ok.

View File

@ -1,27 +0,0 @@
-module(emqttd_plugin_demo_sup).
-behaviour(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
%% ===================================================================
%% API functions
%% ===================================================================
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
init([]) ->
{ok, { {one_for_one, 5, 10}, []} }.

View File

View File

@ -0,0 +1,32 @@
REBAR?=./rebar
all: build
clean:
$(REBAR) clean
rm -rf logs
rm -rf .eunit
rm -f test/*.beam
distclean: clean
git clean -fxd
build: depends
$(REBAR) compile
eunit:
$(REBAR) eunit skip_deps=true
check: build eunit
%.beam: %.erl
erlc -o test/ $<
.PHONY: all clean distclean depends build eunit check

View File

@ -0,0 +1,49 @@
## Overview
Authentication with user table of MySQL database.
## etc/plugin.config
```erlang
[
{emysql, [
{pool, 4},
{host, "localhost"},
{port, 3306},
{username, ""},
{password, ""},
{database, "mqtt"},
{encoding, utf8}
]},
{emqttd_auth_mysql, [
{user_table, mqtt_users},
%% plain password only
{password_hash, plain},
{field_mapper, [
{username, username},
{password, password}
]}
]}
].
```
## Users Table(Demo)
Notice: This is a demo table. You could authenticate with any user tables.
```
CREATE TABLE `mqtt_users` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`username` varchar(60) DEFAULT NULL,
`password` varchar(60) DEFAULT NULL,
`salt` varchar(20) DEFAULT NULL,
`created` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `mqtt_users_username` (`username`)
) ENGINE=MyISAM AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
```
## Load Plugin
Merge the'etc/plugin.config' to emqttd/etc/plugins.config, and the plugin will be loaded by the broker.

View File

@ -0,0 +1,151 @@
/*
* Copyright (c) 1995, 1996, 1997 Kungliga Tekniska Hgskolan
* (Royal Institute of Technology, Stockholm, Sweden).
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. All advertising materials mentioning features or use of this software
* must display the following acknowledgement:
* This product includes software developed by the Kungliga Tekniska
* Hgskolan and its contributors.
*
* 4. Neither the name of the Institute nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE INSTITUTE AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE INSTITUTE OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#ifdef HAVE_CONFIG_H
#include <config.h>
/*RCSID("$Id: base64.c,v 1.1 2005/02/11 07:34:35 jpm Exp jpm $");*/
#endif
#include <stdlib.h>
#include <string.h>
#include "base64.h"
static char base64[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
static int pos(char c)
{
char *p;
for(p = base64; *p; p++)
if(*p == c)
return p - base64;
return -1;
}
int base64_encode(const void *data, int size, char **str)
{
char *s, *p;
int i;
int c;
unsigned char *q;
p = s = (char*)malloc(size*4/3+4);
if (p == NULL)
return -1;
q = (unsigned char*)data;
i=0;
for(i = 0; i < size;){
c=q[i++];
c*=256;
if(i < size)
c+=q[i];
i++;
c*=256;
if(i < size)
c+=q[i];
i++;
p[0]=base64[(c&0x00fc0000) >> 18];
p[1]=base64[(c&0x0003f000) >> 12];
p[2]=base64[(c&0x00000fc0) >> 6];
p[3]=base64[(c&0x0000003f) >> 0];
if(i > size)
p[3]='=';
if(i > size+1)
p[2]='=';
p+=4;
}
*p=0;
*str = s;
return strlen(s);
}
int base64_decode(const char *str, void *data)
{
const char *p;
unsigned char *q;
int c;
int x;
int done = 0;
q=(unsigned char*)data;
for(p=str; *p && !done; p+=4){
x = pos(p[0]);
if(x >= 0)
c = x;
else{
done = 3;
break;
}
c*=64;
x = pos(p[1]);
if(x >= 0)
c += x;
else
return -1;
c*=64;
if(p[2] == '=')
done++;
else{
x = pos(p[2]);
if(x >= 0)
c += x;
else
return -1;
}
c*=64;
if(p[3] == '=')
done++;
else{
if(done)
return -1;
x = pos(p[3]);
if(x >= 0)
c += x;
else
return -1;
}
if(done < 3)
*q++=(c&0x00ff0000)>>16;
if(done < 2)
*q++=(c&0x0000ff00)>>8;
if(done < 1)
*q++=(c&0x000000ff)>>0;
}
return q - (unsigned char*)data;
}

View File

@ -0,0 +1,47 @@
/*
* Copyright (c) 1995, 1996, 1997 Kungliga Tekniska Hgskolan
* (Royal Institute of Technology, Stockholm, Sweden).
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. All advertising materials mentioning features or use of this software
* must display the following acknowledgement:
* This product includes software developed by the Kungliga Tekniska
* Hgskolan and its contributors.
*
* 4. Neither the name of the Institute nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE INSTITUTE AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE INSTITUTE OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
/* $Id: base64.h,v 1.1 2005/02/11 07:34:35 jpm Exp jpm $ */
#ifndef _BASE64_H_
#define _BASE64_H_
int base64_encode(const void *data, int size, char **str);
int base64_decode(const char *str, void *data);
#endif

View File

@ -0,0 +1,60 @@
// This file is part of Jiffy released under the MIT license.
// See the LICENSE file for more information.
#include "emqttd_plugin_mysql_app.h"
static int
load(ErlNifEnv* env, void** priv, ERL_NIF_TERM info)
{
emqttd_plugin_mysql_app_st* st = enif_alloc(sizeof(emqttd_plugin_mysql_app_st));
if(st == NULL) {
return 1;
}
st->atom_ok = make_atom(env, "ok");
st->atom_error = make_atom(env, "error");
st->atom_null = make_atom(env, "null");
st->atom_true = make_atom(env, "true");
st->atom_false = make_atom(env, "false");
st->atom_bignum = make_atom(env, "bignum");
st->atom_bignum_e = make_atom(env, "bignum_e");
st->atom_bigdbl = make_atom(env, "bigdbl");
st->atom_partial = make_atom(env, "partial");
st->atom_uescape = make_atom(env, "uescape");
st->atom_pretty = make_atom(env, "pretty");
st->atom_force_utf8 = make_atom(env, "force_utf8");
// Markers used in encoding
st->ref_object = make_atom(env, "$object_ref$");
st->ref_array = make_atom(env, "$array_ref$");
*priv = (void*) st;
return 0;
}
static int
reload(ErlNifEnv* env, void** priv, ERL_NIF_TERM info)
{
return 0;
}
static int
upgrade(ErlNifEnv* env, void** priv, void** old_priv, ERL_NIF_TERM info)
{
return load(env, priv, info);
}
static void
unload(ErlNifEnv* env, void* priv)
{
enif_free(priv);
return;
}
static ErlNifFunc funcs[] =
{
{"nif_pbkdf2_check", 2, pbkdf2_check}
};
ERL_NIF_INIT(emqttd_plugin_mysql_app, funcs, &load, &reload, &upgrade, &unload);

View File

@ -0,0 +1,44 @@
// This file is part of Jiffy released under the MIT license.
// See the LICENSE file for more information.
#ifndef EMQTTD_PLUGIN_MYSQL_APP_H
#define EMQTTD_PLUGIN_MYSQL_APP_H
#include "erl_nif.h"
typedef struct {
ERL_NIF_TERM atom_ok;
ERL_NIF_TERM atom_error;
ERL_NIF_TERM atom_null;
ERL_NIF_TERM atom_true;
ERL_NIF_TERM atom_false;
ERL_NIF_TERM atom_bignum;
ERL_NIF_TERM atom_bignum_e;
ERL_NIF_TERM atom_bigdbl;
ERL_NIF_TERM atom_partial;
ERL_NIF_TERM atom_uescape;
ERL_NIF_TERM atom_pretty;
ERL_NIF_TERM atom_force_utf8;
ERL_NIF_TERM ref_object;
ERL_NIF_TERM ref_array;
} emqttd_plugin_mysql_app_st;
ERL_NIF_TERM make_atom(ErlNifEnv* env, const char* name);
ERL_NIF_TERM make_ok(emqttd_plugin_mysql_app_st* st, ErlNifEnv* env, ERL_NIF_TERM data);
ERL_NIF_TERM make_error(emqttd_plugin_mysql_app_st* st, ErlNifEnv* env, const char* error);
ERL_NIF_TERM pbkdf2_check(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
int int_from_hex(const unsigned char* p);
int int_to_hex(int val, char* p);
int utf8_len(int c);
int utf8_esc_len(int c);
int utf8_validate(unsigned char* data, size_t size);
int utf8_to_unicode(unsigned char* buf, size_t size);
int unicode_to_utf8(int c, unsigned char* buf);
int unicode_from_pair(int hi, int lo);
int unicode_uescape(int c, char* buf);
int double_to_shortest(char *buf, size_t size, size_t* len, double val);
#endif // Included EMQTTD_PLUGIN_MYSQL_APP_H

View File

@ -0,0 +1,278 @@
/*
* Copyright (c) 2013 Jan-Piet Mens <jpmens()gmail.com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the name of mosquitto nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <openssl/evp.h>
#include <openssl/rand.h>
#include "base64.h"
#include "erl_nif.h"
#include "emqttd_plugin_mysql_app.h"
#define KEY_LENGTH 24
#define SEPARATOR "$"
#define SEPARATOR1 "_"
#define TRUE (1)
#define FALSE (0)
/*
* Split PBKDF2$... string into their components. The caller must free()
* the strings.
*/
static int detoken(char *pbkstr, char **sha, int *iter, char **salt, char **key)
{
char *p, *s, *save;
int rc = 1;
save = s = strdup(pbkstr);
if ((p = strsep(&s, SEPARATOR1)) == NULL)
goto out;
if (strcmp(p, "pbkdf2") != 0)
goto out;
if ((p = strsep(&s, SEPARATOR)) == NULL)
goto out;
*sha = strdup(p);
if ((p = strsep(&s, SEPARATOR)) == NULL)
goto out;
*iter = atoi(p);
if ((p = strsep(&s, SEPARATOR)) == NULL)
goto out;
*salt = strdup(p);
if ((p = strsep(&s, SEPARATOR)) == NULL)
goto out;
*key = strdup(p);
rc = 0;
out:
free(save);
return rc;
}
ERL_NIF_TERM
pbkdf2_check(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
ERL_NIF_TERM ret;
ErlNifBinary binps, binhash;
emqttd_plugin_mysql_app_st* st = enif_alloc(sizeof(emqttd_plugin_mysql_app_st));
if(st == NULL) {
return make_atom(env, "alloc_error");
}
st->atom_ok = make_atom(env, "ok");
st->atom_error = make_atom(env, "error");
st->atom_null = make_atom(env, "null");
st->atom_true = make_atom(env, "true");
st->atom_false = make_atom(env, "false");
st->atom_bignum = make_atom(env, "bignum");
st->atom_bignum_e = make_atom(env, "bignum_e");
st->atom_bigdbl = make_atom(env, "bigdbl");
st->atom_partial = make_atom(env, "partial");
st->atom_uescape = make_atom(env, "uescape");
st->atom_pretty = make_atom(env, "pretty");
st->atom_force_utf8 = make_atom(env, "force_utf8");
// Markers used in encoding
st->ref_object = make_atom(env, "$object_ref$");
st->ref_array = make_atom(env, "$array_ref$");
if(argc != 2) {
return make_error(st, env, "Bad args");
} else if(!enif_inspect_binary(env, argv[0], &binps)|!enif_inspect_binary(env, argv[1], &binhash)) {
return make_error(st, env, "Bad args password or username inspect error");
}
char* password = (char*)binps.data;
char* hash = (char*)binhash.data;
static char *sha, *salt, *h_pw;
int iterations, saltlen, blen;
char *b64, *keybuf;
unsigned char *out;
int match = FALSE;
const EVP_MD *evpmd;
int keylen, rc;
if (detoken(hash, &sha, &iterations, &salt, &h_pw) != 0)
return match;
/* Determine key length by decoding base64 */
if ((keybuf = malloc(strlen(h_pw) + 1)) == NULL) {
return make_error(st, env, "internal_error: Out Of memory");
}
keylen = base64_decode(h_pw, keybuf);
if (keylen < 1) {
free(keybuf);
return make_atom(env, "false");
}
free(keybuf);
if ((out = malloc(keylen)) == NULL) {
return make_error(st, env, "Cannot allocate out; out of memory\n");
}
#ifdef PWDEBUG
fprintf(stderr, "sha =[%s]\n", sha);
fprintf(stderr, "iterations =%d\n", iterations);
fprintf(stderr, "salt =[%s]\n", salt);
fprintf(stderr, "h_pw =[%s]\n", h_pw);
fprintf(stderr, "kenlen =[%d]\n", keylen);
#endif
saltlen = strlen((char *)salt);
evpmd = EVP_sha256();
if (strcmp(sha, "sha1") == 0) {
evpmd = EVP_sha1();
} else if (strcmp(sha, "sha512") == 0) {
evpmd = EVP_sha512();
}
rc = PKCS5_PBKDF2_HMAC(password, strlen(password),
(unsigned char *)salt, saltlen,
iterations,
evpmd, keylen, out);
if (rc != 1) {
goto out;
}
blen = base64_encode(out, keylen, &b64);
if (blen > 0) {
int i, diff = 0, hlen = strlen(h_pw);
#ifdef PWDEBUG
fprintf(stderr, "HMAC b64 =[%s]\n", b64);
#endif
/* "manual" strcmp() to ensure constant time */
for (i = 0; (i < blen) && (i < hlen); i++) {
diff |= h_pw[i] ^ b64[i];
}
match = diff == 0;
if (hlen != blen)
match = 0;
free(b64);
}
out:
free(sha);
free(salt);
free(h_pw);
free(out);
if(match == 0){
ret = make_atom(env, "false");
}else{
ret = make_atom(env, "true");
}
return ret;
}
int pbkdf2_check_native(char *password, char *hash)
{
static char *sha, *salt, *h_pw;
int iterations, saltlen, blen;
char *b64;
unsigned char key[128];
int match = FALSE;
const EVP_MD *evpmd;
if (detoken(hash, &sha, &iterations, &salt, &h_pw) != 0)
return match;
#ifdef PWDEBUG
fprintf(stderr, "sha =[%s]\n", sha);
fprintf(stderr, "iterations =%d\n", iterations);
fprintf(stderr, "salt =[%s]\n", salt);
fprintf(stderr, "h_pw =[%s]\n", h_pw);
#endif
saltlen = strlen((char *)salt);
evpmd = EVP_sha256();
if (strcmp(sha, "sha1") == 0) {
evpmd = EVP_sha1();
} else if (strcmp(sha, "sha512") == 0) {
evpmd = EVP_sha512();
}
PKCS5_PBKDF2_HMAC(password, strlen(password),
(unsigned char *)salt, saltlen,
iterations,
evpmd, KEY_LENGTH, key);
blen = base64_encode(key, KEY_LENGTH, &b64);
if (blen > 0) {
int i, diff = 0, hlen = strlen(h_pw);
#ifdef PWDEBUG
fprintf(stderr, "HMAC b64 =[%s]\n", b64);
#endif
/* "manual" strcmp() to ensure constant time */
for (i = 0; (i < blen) && (i < hlen); i++) {
diff |= h_pw[i] ^ b64[i];
}
match = diff == 0;
if (hlen != blen)
match = 0;
free(b64);
}
free(sha);
free(salt);
free(h_pw);
return match;
}
int main()
{
// char password[] = "hello";
// char PB1[] = "PBKDF2$sha256$10000$eytf9sEo8EprP9P3$2eO6tROHiqI3bm+gg+vpmWooWMpz1zji";
char password[] = "supersecret";
//char PB1[] = "PBKDF2$sha256$10000$YEbSTt8FaMRDq/ib$Kt97+sMCYg00mqMOBAYinqZlnxX8HqHk";
char PB1[] = "pbkdf2_sha256$10000$YEbSTt8FaMRDq/ib$Kt97+sMCYg00mqMOBAYinqZlnxX8HqHk";
// char PB1[] = "PBKDF2$sha1$10000$XWfyPLeC9gsD6SbI$HOnjU4Ux7RpeBHdqYxpIGH1R5qCCtNA1";
// char PB1[] = "PBKDF2$sha512$10000$v/aaCgBZ+VZN5L8n$BpgjSTyb4weVxr9cA2mvQ+jaCyaAPeYe";
int match;
printf("Checking password [%s] for %s\n", password, PB1);
match = pbkdf2_check_native(password, PB1);
printf("match == %d\n", match);
return match;
}

View File

@ -0,0 +1,26 @@
// This file is part of Jiffy released under the MIT license.
// See the LICENSE file for more information.
#include "emqttd_plugin_mysql_app.h"
ERL_NIF_TERM
make_atom(ErlNifEnv* env, const char* name)
{
ERL_NIF_TERM ret;
if(enif_make_existing_atom(env, name, &ret, ERL_NIF_LATIN1)) {
return ret;
}
return enif_make_atom(env, name);
}
ERL_NIF_TERM
make_ok(emqttd_plugin_mysql_app_st* st, ErlNifEnv* env, ERL_NIF_TERM value)
{
return enif_make_tuple2(env, st->atom_ok, value);
}
ERL_NIF_TERM
make_error(emqttd_plugin_mysql_app_st* st, ErlNifEnv* env, const char* error)
{
return enif_make_tuple2(env, st->atom_error, make_atom(env, error));
}

View File

@ -0,0 +1,23 @@
[
{emysql, [
{pool, 4},
{host, "localhost"},
{port, 3306},
{username, "root"},
{password, "root"},
{database, "emqtt"},
{encoding, utf8}
]},
{emqttd_plugin_mysql, [
{users_table, auth_user},
{acls_table, auth_acl},
{field_mapper, [
{username, username},
{password, password, pbkdf2},
{user_super, is_super_user},
{acl_username, username},
{acl_rw, rw},
{acl_topic, topic}
]}
]}
].

View File

@ -0,0 +1,23 @@
[
{emysql, [
{pool, 4},
{host, "59.188.253.198"},
{port, 3306},
{username, "root"},
{password, "lhroot."},
{database, "musicfield"},
{encoding, utf8}
]},
{emqttd_plugin_mysql, [
{users_table, auth_user},
{acls_table, auth_acl},
{field_mapper, [
{username, username},
{password, password, pbkdf2},
{user_super, is_super_user},
{acl_username, username},
{acl_rw, rw},
{acl_topic, topic}
]}
]}
].

View File

@ -0,0 +1,112 @@
%%------------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
%%% @doc
%%% MQTT Broker Header.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Banner
%%------------------------------------------------------------------------------
-define(COPYRIGHT, "Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>").
-define(LICENSE_MESSAGE, "Licensed under MIT").
-define(PROTOCOL_VERSION, "MQTT/3.1.1").
-define(ERTS_MINIMUM, "6.0").
%%------------------------------------------------------------------------------
%% PubSub
%%------------------------------------------------------------------------------
-type pubsub() :: publish | subscribe.
%%------------------------------------------------------------------------------
%% MQTT Topic
%%------------------------------------------------------------------------------
-record(mqtt_topic, {
topic :: binary(),
node :: node()
}).
-type mqtt_topic() :: #mqtt_topic{}.
%%------------------------------------------------------------------------------
%% MQTT Subscriber
%%------------------------------------------------------------------------------
-record(mqtt_subscriber, {
topic :: binary(),
qos = 0 :: 0 | 1 | 2,
pid :: pid()
}).
-type mqtt_subscriber() :: #mqtt_subscriber{}.
%%------------------------------------------------------------------------------
%% P2P Queue Subscriber
%%------------------------------------------------------------------------------
-record(mqtt_queue, {
name :: binary(),
subpid :: pid(),
qos = 0 :: 0 | 1 | 2
}).
-type mqtt_queue() :: #mqtt_queue{}.
%%------------------------------------------------------------------------------
%% MQTT Client
%%------------------------------------------------------------------------------
-record(mqtt_client, {
clientid :: binary(),
username :: binary() | undefined,
ipaddr :: inet:ip_address()
}).
-type mqtt_client() :: #mqtt_client{}.
%%------------------------------------------------------------------------------
%% MQTT Session
%%------------------------------------------------------------------------------
-record(mqtt_session, {
clientid,
session_pid,
subscriptions = [],
awaiting_ack,
awaiting_rel
}).
-type mqtt_session() :: #mqtt_session{}.
%%------------------------------------------------------------------------------
%% MQTT Plugin
%%------------------------------------------------------------------------------
-record(mqtt_plugin, {
name,
version,
attrs,
description
}).
-type mqtt_plugin() :: #mqtt_plugin{}.

BIN
plugins/emqttd_plugin_mysql/rebar Executable file

Binary file not shown.

View File

@ -0,0 +1,32 @@
{port_specs, [
{"priv/emqttd_plugin_mysql_app.so", [
"c_src/*.c"
]}
]}.
{port_env, [
{".*", "CXXFLAGS", "$CXXFLAGS -g -Wall -Werror -O3"},
{"(linux|solaris|freebsd|netbsd|openbsd|dragonfly|darwin)",
"LDFLAGS", "$LDFLAGS -lstdc++ -lcrypto"},
%% OS X Leopard flags for 64-bit
{"darwin9.*-64$", "CXXFLAGS", "-m64"},
{"darwin9.*-64$", "LDFLAGS", "-arch x86_64"},
%% OS X Snow Leopard flags for 32-bit
{"darwin10.*-32$", "CXXFLAGS", "-m32"},
{"darwin10.*-32$", "LDFLAGS", "-arch i386"},
%% This will merge into basho/rebar/rebar.config eventually
{"win32", "CFLAGS", "/Wall /DWIN32 /D_WINDOWS /D_WIN32 /DWINDOWS"},
{"win32", "CXXFLAGS", "-g -Wall -O3"}
]}.
{eunit_opts, [
verbose,
{report, {
eunit_surefire, [{dir,"."}]
}}
]}.

View File

@ -0,0 +1,70 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd demo acl module.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_acl_mysql).
-include("emqttd.hrl").
-behaviour(emqttd_acl_mod).
%% ACL callbacks
-export([init/1, check_acl/2, reload_acl/1, description/0]).
-record(state, {user_table, acl_table, acl_username_field, acl_topic_field, acl_rw_field, user_name_field, user_super_field}).
init(Opts) ->
Mapper = proplists:get_value(field_mapper, Opts),
State =
#state{
user_table = proplists:get_value(users_table, Opts, auth_user),
user_super_field = proplists:get_value(is_super, Mapper, is_superuser),
user_name_field = proplists:get_value(username, Mapper, username),
acl_table = proplists:get_value(acls_table, Opts, auth_acl),
acl_username_field = proplists:get_value(acl_username, Mapper, username),
acl_rw_field = proplists:get_value(acl_rw, Mapper, rw),
acl_topic_field = proplists:get_value(acl_topic, Mapper, topic)
},
{ok, State}.
check_acl({#mqtt_client{username = Username}, PubSub, Topic}, #state{user_table = UserTab, acl_table = AclTab, user_name_field = UsernameField, user_super_field = SuperField, acl_topic_field = TopicField, acl_username_field = AclUserField, acl_rw_field = AclRwField}) ->
Flag = case PubSub of publish -> 2; subscribe -> 1; pubsub -> 2 end,
Where = {'and', {'>=', AclRwField, Flag}, {TopicField, Topic}},
Where1 = {'or', {AclUserField, Username}, {AclUserField, "*"}},
Where2 = {'and', Where, Where1},
case emysql:select(UserTab, {'and', {UsernameField, Username}, {SuperField, 1}}) of
{ok, []} ->
case emysql:select(UserTab, {UsernameField, Username}) of
{ok, []} -> ignore;
{ok, _} -> case emysql:select(AclTab, Where2) of
{ok, []} -> deny;
{ok, _Record} -> allow
end
end;
{ok, _} -> allow
end.
reload_acl(_State) -> ok.
description() -> "ACL Module by Mysql".

View File

@ -0,0 +1,110 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd authentication by mysql 'user' table.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_auth_mysql).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
-behaviour(emqttd_auth_mod).
-export([init/1, check/3, description/0]).
-define(NOT_LOADED, not_loaded(?LINE)).
-record(state, {user_table, name_field, pass_field, pass_hash}).
init(Opts) ->
Mapper = proplists:get_value(field_mapper, Opts),
{ok, #state{user_table = proplists:get_value(user_table, Opts, auth_user),
name_field = proplists:get_value(username, Mapper),
pass_field = proplists:get_value(password, Mapper),
pass_hash = proplists:get_value(Opts, password_hash)}}.
check(#mqtt_client{username = undefined}, _Password, _State) ->
{error, "Username undefined"};
check(_Client, undefined, _State) ->
{error, "Password undefined"};
check(#mqtt_client{username = Username}, Password,
#state{user_table = UserTab, pass_hash = Type,
name_field = NameField, pass_field = PassField}) ->
Where = {'and', {NameField, Username}, {PassField, hash(Type, Password)}},
if Type =:= pbkdf2 ->
case emysql:select(UserTab, [PassField], {NameField, Username}) of
{ok, []} -> {error, "User not exist"};
{ok, Records} ->
if length(Records) =:= 1 ->
case pbkdf2_check(Password, lists:nth(Records, 1)) of
true ->
{ok, []};
false ->
{error, "UserName or Password is invalid"};
ErrorInfo ->
{error, ErrorInfo}
end;
true ->
{error, "UserName is ambiguous"}
end
end;
true ->
case emysql:select(UserTab, Where) of
{ok, []} -> {error, "Username or Password "};
{ok, _Record} -> ok
end
end.
description() -> "Authentication by MySQL".
hash(plain, Password) ->
Password;
hash(md5, Password) ->
hexstring(crypto:hash(md5, Password));
hash(sha, Password) ->
hexstring(crypto:hash(sha, Password)).
hexstring(<<X:128/big-unsigned-integer>>) ->
lists:flatten(io_lib:format("~32.16.0b", [X]));
hexstring(<<X:160/big-unsigned-integer>>) ->
lists:flatten(io_lib:format("~40.16.0b", [X])).
not_loaded(Line) ->
erlang:nif_error({not_loaded, [{module, ?MODULE}, {line, Line}]}).
pbkdf2_check(Password, Pbkstr) ->
case nif_pbkdf2_check(Password, Pbkstr) of
{error, _} = Error ->
throw(Error);
IOData ->
IOData
end.
nif_pbkdf2_check(Password, Pbkstr) ->
?NOT_LOADED.

View File

@ -0,0 +1,12 @@
{application, emqttd_plugin_mysql,
[
{description, "emqttd MySQL Authentication Plugin"},
{vsn, "1.0"},
{registered, []},
{applications, [
kernel,
stdlib
]},
{mod, {emqttd_plugin_mysql_app, []}},
{env, []}
]}.

View File

@ -0,0 +1,80 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd mysql authentication app.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_plugin_mysql_app).
-on_load(init/0).
-behaviour(application).
%% Application callbacks
-export([start/2, prep_stop/1, stop/1, nif_pbkdf2_check/2]).
-behaviour(supervisor).
%% Supervisor callbacks
-export([init/1]).
-define(NOT_LOADED, not_loaded(?LINE)).
%%%=============================================================================
%%% Application callbacks
%%%=============================================================================
start(_StartType, _StartArgs) ->
Env = application:get_all_env(),
emqttd_access_control:register_mod(auth, emqttd_auth_mysql, Env),
emqttd_access_control:register_mod(acl, emqttd_acl_mysql, Env),
crypto:start(),
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
prep_stop(State) ->
emqttd_access_control:unregister_mod(auth, emqttd_auth_mysql), State,
emqttd_access_control:unregister_mod(acl, emqttd_acl_mysql), State,
crypto:stop().
stop(_State) ->
ok.
init() ->
PrivDir = case code:priv_dir(?MODULE) of
{error, _} ->
EbinDir = filename:dirname(code:which(?MODULE)),
AppPath = filename:dirname(EbinDir),
filename:join(AppPath, "priv");
Path ->
Path
end,
erlang:load_nif(filename:join(PrivDir, "emqttd_plugin_mysql_app"), 0).
%%%=============================================================================
%%% Supervisor callbacks(Dummy)
%%%=============================================================================
init([]) ->
{ok, {{one_for_one, 5, 10}, []}}.
not_loaded(Line) ->
erlang:nif_error({not_loaded, [{module, ?MODULE}, {line, Line}]}).
nif_pbkdf2_check(Password, Hash) ->
?NOT_LOADED.

View File

@ -1,12 +0,0 @@
{application, emqttd_presence,
[
{description, ""},
{vsn, "1"},
{registered, []},
{applications, [
kernel,
stdlib
]},
{mod, { emqttd_presence_app, []}},
{env, []}
]}.

View File

@ -1,78 +0,0 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd client presence manager
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_presence_mgr).
-behaviour(gen_server).
-define(SERVER, ?MODULE).
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([start_link/0]).
%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------
init(Args) ->
{ok, Args}.
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------

View File

@ -384,9 +384,15 @@ encode_where({like, Field, Value}) ->
encode_where({'<', Field, Value}) ->
atom_to_list(Field) ++ " < " ++ encode(Value);
encode_where({'<=', Field, Value}) ->
atom_to_list(Field) ++ " <= " ++ encode(Value);
encode_where({'>', Field, Value}) ->
atom_to_list(Field) ++ " > " ++ encode(Value);
encode_where({'>=', Field, Value}) ->
atom_to_list(Field) ++ " >= " ++ encode(Value);
encode_where({'in', Field, Values}) ->
InStr = string:join([encode(Value) || Value <- Values], ","),
atom_to_list(Field) ++ " in (" ++ InStr ++ ")";

View File

@ -21,7 +21,7 @@ start_link(Opts) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, Opts).
init(Opts) ->
PoolSize = proplists:get_value(pool_size, Opts,
PoolSize = proplists:get_value(pool, Opts,
erlang:system_info(schedulers)),
{ok, {{one_for_one, 10, 10},
[{emysql, {emysql, start_link, [PoolSize]}, transient,
@ -31,3 +31,4 @@ init(Opts) ->
}
}.

View File

@ -62,8 +62,8 @@
{packet, [
%% Max ClientId Length Allowed
{max_clientid_len, 1024},
%% Max Packet Size Allowed, 4K default
{max_packet_size, 4096}
%% Max Packet Size Allowed, 64K default
{max_packet_size, 65536}
]},
%% Client
{client, [
@ -86,15 +86,12 @@
%% System interval of publishing broker $SYS messages
{sys_interval, 60},
%% Subscribe these topics automatically when client connected
{forced_subscriptions, [{"$Q/client/$c", 0}]},
%% Retained messages
{retained, [
%% Max number of retained messages
{max_message_num, 100000},
%% Max Payload Size of retained message
{max_playload_size, 4096}
{max_playload_size, 65536}
]},
%% PubSub
{pubsub, [
@ -109,6 +106,14 @@
{ping_down_interval, 1} %seconds
]}
]},
%% Modules
{modules, [
%% Subscribe topics automatically when client connected
{autosub, [{"$Q/client/$c", 0}]}
%% Rewrite rules
%% {rewrite, [{file, "etc/rewrite.config"}]}
]},
%% Listeners
{listeners, [
{mqtt, 1883, [
@ -135,8 +140,8 @@
%% Socket Access Control
{access, [{allow, all}]},
%% SSL certificate and key files
{ssl, [{certfile, "etc/ssl.crt"},
{keyfile, "etc/ssl.key"}]},
{ssl, [{certfile, "etc/ssl/ssl.crt"},
{keyfile, "etc/ssl/ssl.key"}]},
%% Socket Options
{sockopts, [
{backlog, 1024}

View File

@ -1,20 +1,20 @@
[
{emysql, [
{pool_size, 4},
{host, "localhost"},
{port, 3306},
{username, "root"},
{password, "public"},
{database, "mqtt"},
{encoding, utf8}
]},
{emqttd_auth_mysql, [
{user_table, mqtt_users}
]}
% {emysql, [
% {pool_size, 4},
% {host, "localhost"},
% {port, 3306},
% {username, "root"},
% {password, "public"},
% {database, "mqtt"},
% {encoding, utf8}
% ]},
% {emqttd_auth_mysql, [
% {user_table, mqtt_users}
% ]}
%
% {emqttd_dashboard, [
% {listener,
% {http, 8080, [
% {http, 18083, [
% {acceptors, 4},
% {max_clients, 512}]}}
% ]}

14
rel/files/rewrite.config Normal file
View File

@ -0,0 +1,14 @@
%%%-----------------------------------------------------------------------------
%%
%% [Rewrite](https://github.com/emqtt/emqttd/wiki/Rewrite)
%%
%%%-----------------------------------------------------------------------------
%{topic, "x/#", [
% {rewrite, "^x/y/(.+)$", "z/y/$1"},
% {rewrite, "^x/(.+)$", "y/$1"}
%]}.
%{topic, "y/+/z/#", [
% {rewrite, "^y/(.+)/z/(.+)$", "y/z/$2"}
%]}.

View File

@ -63,6 +63,7 @@
{overlay, [
{mkdir, "log/"},
{mkdir, "etc/"},
{mkdir, "etc/ssl/"},
{mkdir, "data/"},
{mkdir, "plugins/"},
{copy, "files/erl", "\{\{erts_vsn\}\}/bin/erl"},
@ -72,10 +73,11 @@
{template, "files/emqttd.cmd", "bin/emqttd.cmd"},
{copy, "files/start_erl.cmd", "bin/start_erl.cmd"},
{copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"},
{copy, "files/ssl/ssl.crt", "etc/ssl.crt"},
{copy, "files/ssl/ssl.key", "etc/ssl.key"},
{copy, "files/ssl/ssl.crt", "etc/ssl/ssl.crt"},
{copy, "files/ssl/ssl.key", "etc/ssl/ssl.key"},
{template, "files/emqttd.config", "etc/emqttd.config"},
{template, "files/acl.config", "etc/acl.config"},
{template, "files/rewrite.config", "etc/rewrite.config"},
{template, "files/clients.config", "etc/clients.config"},
{template, "files/plugins.config", "etc/plugins.config"},
{template, "files/vm.args", "etc/vm.args"}