Merge branch 'dev' into dev-hd

This commit is contained in:
huangdan 2015-05-25 10:40:20 +08:00
commit 379426fa82
26 changed files with 524 additions and 212 deletions

View File

@ -32,6 +32,7 @@
open_listeners/1, close_listeners/1, open_listeners/1, close_listeners/1,
load_all_plugins/0, unload_all_plugins/0, load_all_plugins/0, unload_all_plugins/0,
load_plugin/1, unload_plugin/1, load_plugin/1, unload_plugin/1,
load_all_mods/0, is_mod_enabled/1,
loaded_plugins/0, loaded_plugins/0,
is_running/1]). is_running/1]).
@ -202,6 +203,17 @@ unload_app(App) ->
lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason} lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason}
end. end.
load_all_mods() ->
Mods = application:get_env(emqttd, modules, []),
lists:foreach(fun({Name, Opts}) ->
Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)),
Mod:load(Opts),
lager:info("load module ~s successfully", [Name])
end, Mods).
is_mod_enabled(Name) ->
env(modules, Name) =/= undefined.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Is running? %% @doc Is running?
%% @end %% @end

View File

@ -51,8 +51,9 @@ start(_StartType, _StartArgs) ->
emqttd_mnesia:start(), emqttd_mnesia:start(),
{ok, Sup} = emqttd_sup:start_link(), {ok, Sup} = emqttd_sup:start_link(),
start_servers(Sup), start_servers(Sup),
{ok, Listeners} = application:get_env(listeners), emqttd:load_all_mods(),
emqttd:load_all_plugins(), emqttd:load_all_plugins(),
{ok, Listeners} = application:get_env(listeners),
emqttd:open_listeners(Listeners), emqttd:open_listeners(Listeners),
register(emqttd, self()), register(emqttd, self()),
print_vsn(), print_vsn(),
@ -71,13 +72,14 @@ start_servers(Sup) ->
{"emqttd trace", emqttd_trace}, {"emqttd trace", emqttd_trace},
{"emqttd pooler", {supervisor, emqttd_pooler_sup}}, {"emqttd pooler", {supervisor, emqttd_pooler_sup}},
{"emqttd client manager", {supervisor, emqttd_cm_sup}}, {"emqttd client manager", {supervisor, emqttd_cm_sup}},
{"emqttd session manager", emqttd_sm}, {"emqttd session manager", {supervisor, emqttd_sm_sup}},
{"emqttd session supervisor", {supervisor, emqttd_session_sup}}, {"emqttd session supervisor", {supervisor, emqttd_session_sup}},
{"emqttd pubsub", {supervisor, emqttd_pubsub_sup}}, {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}},
{"emqttd stats", emqttd_stats}, {"emqttd stats", emqttd_stats},
{"emqttd metrics", emqttd_metrics}, {"emqttd metrics", emqttd_metrics},
%{"emqttd router", emqttd_router}, %{"emqttd router", emqttd_router},
{"emqttd broker", emqttd_broker}, {"emqttd broker", emqttd_broker},
{"emqttd mode supervisor", emqttd_mod_sup},
{"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}, {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}},
{"emqttd access control", emqttd_access_control}, {"emqttd access control", emqttd_access_control},
{"emqttd system monitor", emqttd_sysmon}], {"emqttd system monitor", emqttd_sysmon}],

View File

@ -42,6 +42,9 @@
%% Event API %% Event API
-export([subscribe/1, notify/2]). -export([subscribe/1, notify/2]).
%% Hook API
-export([hook/3, unhook/2, foreach_hooks/2, foldl_hooks/3]).
%% Broker API %% Broker API
-export([env/1, version/0, uptime/0, datetime/0, sysdescr/0]). -export([env/1, version/0, uptime/0, datetime/0, sysdescr/0]).
@ -127,6 +130,52 @@ datetime() ->
io_lib:format( io_lib:format(
"~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])).
%%------------------------------------------------------------------------------
%% @doc Hook
%% @end
%%------------------------------------------------------------------------------
-spec hook(Hook :: atom(), Name :: any(), MFA :: mfa()) -> ok | {error, any()}.
hook(Hook, Name, MFA) ->
gen_server:call(?MODULE, {hook, Hook, Name, MFA}).
%%------------------------------------------------------------------------------
%% @doc Unhook
%% @end
%%------------------------------------------------------------------------------
-spec unhook(Hook :: atom(), Name :: any()) -> ok | {error, any()}.
unhook(Hook, Name) ->
gen_server:call(?MODULE, {unhook, Hook, Name}).
%%------------------------------------------------------------------------------
%% @doc Foreach hooks
%% @end
%%------------------------------------------------------------------------------
-spec foreach_hooks(Hook :: atom(), Args :: list()) -> any().
foreach_hooks(Hook, Args) ->
case ets:lookup(?BROKER_TAB, {hook, Hook}) of
[{_, Hooks}] ->
lists:foreach(fun({_Name, {M, F, A}}) ->
apply(M, F, Args++A)
end, Hooks);
[] ->
ok
end.
%%------------------------------------------------------------------------------
%% @doc Foldl hooks
%% @end
%%------------------------------------------------------------------------------
-spec foldl_hooks(Hook :: atom(), Args :: list(), Acc0 :: any()) -> any().
foldl_hooks(Hook, Args, Acc0) ->
case ets:lookup(?BROKER_TAB, {hook, Hook}) of
[{_, Hooks}] ->
lists:foldl(fun({_Name, {M, F, A}}, Acc) ->
apply(M, F, [Acc, Args++A])
end, Acc0, Hooks);
[] ->
ok
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Start a tick timer %% @doc Start a tick timer
%% @end %% @end
@ -163,8 +212,33 @@ init([]) ->
handle_call(uptime, _From, State) -> handle_call(uptime, _From, State) ->
{reply, uptime(State), State}; {reply, uptime(State), State};
handle_call({hook, Hook, Name, MFArgs}, _From, State) ->
Key = {hook, Hook}, Reply =
case ets:lookup(?BROKER_TAB, Key) of
[{Key, Hooks}] ->
case lists:keyfind(Name, 1, Hooks) of
{Name, _MFArgs} ->
{error, existed};
false ->
ets:insert(?BROKER_TAB, {Key, Hooks ++ [{Name, MFArgs}]})
end;
[] ->
ets:insert(?BROKER_TAB, {Key, [{Name, MFArgs}]})
end,
{reply, Reply, State};
handle_call({unhook, Name}, _From, State) ->
Key = {hook, Name}, Reply =
case ets:lookup(?BROKER_TAB, Key) of
[{Key, Hooks}] ->
ets:insert(?BROKER_TAB, {Key, lists:keydelete(Name, 1, Hooks)});
[] ->
{error, not_found}
end,
{reply, Reply, State};
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
{reply, error, State}. {reply, {error, unsupport_request}, State}.
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.

View File

@ -121,7 +121,7 @@ handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} =
{ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
{noreply, State#state{proto_state = ProtoState1}}; {noreply, State#state{proto_state = ProtoState1}};
handle_info({force_subscribe, Topic, Qos}, #state{proto_state = ProtoState} = State) -> handle_info({subscribe, Topic, Qos}, #state{proto_state = ProtoState} = State) ->
{ok, ProtoState1} = emqttd_protocol:handle({subscribe, Topic, Qos}, ProtoState), {ok, ProtoState1} = emqttd_protocol:handle({subscribe, Topic, Qos}, ProtoState),
{noreply, State#state{proto_state = ProtoState1}}; {noreply, State#state{proto_state = ProtoState1}};

View File

@ -33,7 +33,7 @@
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
%% API Exports %% API Exports
-export([start_link/3]). -export([start_link/2, pool/0, table/0]).
-export([lookup/1, register/1, unregister/1]). -export([lookup/1, register/1, unregister/1]).
@ -41,9 +41,11 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(state, {tab, statsfun}). -record(state, {id, tab, statsfun}).
-define(POOL, cm_pool). -define(CM_POOL, cm_pool).
-define(CLIENT_TAB, mqtt_client).
%%%============================================================================= %%%=============================================================================
%%% API %%% API
@ -53,12 +55,15 @@
%% @doc Start client manager %% @doc Start client manager
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec start_link(Id, TabId, StatsFun) -> {ok, pid()} | ignore | {error, any()} when -spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when
Id :: pos_integer(), Id :: pos_integer(),
TabId :: ets:tid(),
StatsFun :: fun(). StatsFun :: fun().
start_link(Id, TabId, StatsFun) -> start_link(Id, StatsFun) ->
gen_server:start_link(?MODULE, [Id, TabId, StatsFun], []). gen_server:start_link(?MODULE, [Id, StatsFun], []).
pool() -> ?CM_POOL.
table() -> ?CLIENT_TAB.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Lookup client pid with clientId %% @doc Lookup client pid with clientId
@ -66,7 +71,7 @@ start_link(Id, TabId, StatsFun) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec lookup(ClientId :: binary()) -> pid() | undefined. -spec lookup(ClientId :: binary()) -> pid() | undefined.
lookup(ClientId) when is_binary(ClientId) -> lookup(ClientId) when is_binary(ClientId) ->
case ets:lookup(emqttd_cm_sup:table(), ClientId) of case ets:lookup(?CLIENT_TAB, ClientId) of
[{_, Pid, _}] -> Pid; [{_, Pid, _}] -> Pid;
[] -> undefined [] -> undefined
end. end.
@ -77,7 +82,7 @@ lookup(ClientId) when is_binary(ClientId) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec register(ClientId :: binary()) -> ok. -spec register(ClientId :: binary()) -> ok.
register(ClientId) when is_binary(ClientId) -> register(ClientId) when is_binary(ClientId) ->
CmPid = gproc_pool:pick_worker(?POOL, ClientId), CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId),
gen_server:call(CmPid, {register, ClientId, self()}, infinity). gen_server:call(CmPid, {register, ClientId, self()}, infinity).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -86,19 +91,19 @@ register(ClientId) when is_binary(ClientId) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec unregister(ClientId :: binary()) -> ok. -spec unregister(ClientId :: binary()) -> ok.
unregister(ClientId) when is_binary(ClientId) -> unregister(ClientId) when is_binary(ClientId) ->
CmPid = gproc_pool:pick_worker(?POOL, ClientId), CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId),
gen_server:cast(CmPid, {unregister, ClientId, self()}). gen_server:cast(CmPid, {unregister, ClientId, self()}).
%%%============================================================================= %%%=============================================================================
%%% gen_server callbacks %%% gen_server callbacks
%%%============================================================================= %%%=============================================================================
init([Id, TabId, StatsFun]) -> init([Id, StatsFun]) ->
gproc_pool:connect_worker(?POOL, {?MODULE, Id}), gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}),
{ok, #state{tab = TabId, statsfun = StatsFun}}. {ok, #state{id = Id, statsfun = StatsFun}}.
handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) -> handle_call({register, ClientId, Pid}, _From, State) ->
case ets:lookup(Tab, ClientId) of case ets:lookup(?CLIENT_TAB, ClientId) of
[{_, Pid, _}] -> [{_, Pid, _}] ->
lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]), lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]),
ignore; ignore;
@ -106,9 +111,9 @@ handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) ->
lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]), lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]),
OldPid ! {stop, duplicate_id, Pid}, OldPid ! {stop, duplicate_id, Pid},
erlang:demonitor(MRef), erlang:demonitor(MRef),
ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)}); ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)});
[] -> [] ->
ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)}) ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)})
end, end,
{reply, ok, setstats(State)}; {reply, ok, setstats(State)};
@ -116,11 +121,11 @@ handle_call(Req, _From, State) ->
lager:error("unexpected request: ~p", [Req]), lager:error("unexpected request: ~p", [Req]),
{reply, {error, badreq}, State}. {reply, {error, badreq}, State}.
handle_cast({unregister, ClientId, Pid}, State = #state{tab = TabId}) -> handle_cast({unregister, ClientId, Pid}, State) ->
case ets:lookup(TabId, ClientId) of case ets:lookup(?CLIENT_TAB, ClientId) of
[{_, Pid, MRef}] -> [{_, Pid, MRef}] ->
erlang:demonitor(MRef, [flush]), erlang:demonitor(MRef, [flush]),
ets:delete(TabId, ClientId); ets:delete(?CLIENT_TAB, ClientId);
[_] -> [_] ->
ignore; ignore;
[] -> [] ->
@ -131,15 +136,15 @@ handle_cast({unregister, ClientId, Pid}, State = #state{tab = TabId}) ->
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tab = TabId}) -> handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
ets:match_delete(TabId, {'_', DownPid, MRef}), ets:match_delete(?CLIENT_TAB, {'_', DownPid, MRef}),
{noreply, setstats(State)}; {noreply, setstats(State)};
handle_info(_Info, State) -> handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.
terminate(_Reason, _State) -> terminate(_Reason, #state{id = Id}) ->
ok. gproc_pool:disconnect_worker(?CM_POOL, {?MODULE, Id}), ok.
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
@ -148,6 +153,6 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions %%% Internal functions
%%%============================================================================= %%%=============================================================================
setstats(State = #state{tab = TabId, statsfun = StatsFun}) -> setstats(State = #state{statsfun = StatsFun}) ->
StatsFun(ets:info(TabId, size)), State. StatsFun(ets:info(?CLIENT_TAB, size)), State.

View File

@ -33,29 +33,25 @@
-behaviour(supervisor). -behaviour(supervisor).
%% API %% API
-export([start_link/0, table/0]). -export([start_link/0]).
%% Supervisor callbacks %% Supervisor callbacks
-export([init/1]). -export([init/1]).
-define(CLIENT_TAB, mqtt_client).
start_link() -> start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
table() -> ?CLIENT_TAB.
init([]) -> init([]) ->
TabId = ets:new(?CLIENT_TAB, [set, named_table, public, ets:new(emqttd_cm:table(), [set, named_table, public,
{write_concurrency, true}]), {write_concurrency, true}]),
Schedulers = erlang:system_info(schedulers), Schedulers = erlang:system_info(schedulers),
gproc_pool:new(cm_pool, hash, [{size, Schedulers}]), gproc_pool:new(emqttd_cm:pool(), hash, [{size, Schedulers}]),
StatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'), StatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'),
Children = lists:map( Children = lists:map(
fun(I) -> fun(I) ->
Name = {emqttd_cm, I}, Name = {emqttd_cm, I},
gproc_pool:add_worker(cm_pool, Name, I), gproc_pool:add_worker(emqttd_cm:pool(), Name, I),
{Name, {emqttd_cm, start_link, [I, TabId, StatsFun]}, {Name, {emqttd_cm, start_link, [I, StatsFun]},
permanent, 10000, worker, [emqttd_cm]} permanent, 10000, worker, [emqttd_cm]}
end, lists:seq(1, Schedulers)), end, lists:seq(1, Schedulers)),
{ok, {{one_for_all, 10, 100}, Children}}. {ok, {{one_for_all, 10, 100}, Children}}.

View File

@ -1,5 +1,5 @@
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io> %%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%% %%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal %%% of this software and associated documentation files (the "Software"), to deal
@ -20,23 +20,30 @@
%%% SOFTWARE. %%% SOFTWARE.
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @doc %%% @doc
%%% emqttd demo auth module. %%% emqttd gen_mod behaviour
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
-module(emqttd_plugin_demo_auth). -module(emqttd_gen_mod).
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
-include_lib("emqttd/include/emqttd.hrl"). -include("emqttd.hrl").
-behaviour(emqttd_auth_mod). -ifdef(use_specs).
-export([init/1, check/3, description/0]). -callback load(Opts :: any()) -> {ok, State :: any()}.
init(Opts) -> {ok, Opts}. -callback unload(State :: any()) -> any().
check(_Client, _Password, _Opts) -> ignore. -else.
description() -> "Demo authentication module". -export([behaviour_info/1]).
behaviour_info(callbacks) ->
[{load, 1}, {unload, 1}];
behaviour_info(_Other) ->
undefined.
-endif.

View File

@ -1,5 +1,5 @@
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io> %%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%% %%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal %%% of this software and associated documentation files (the "Software"), to deal
@ -20,26 +20,31 @@
%%% SOFTWARE. %%% SOFTWARE.
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @doc %%% @doc
%%% emqttd demo acl module. %%% emqttd auto subscribe module.
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
-module(emqttd_plugin_demo_acl).
-module(emqttd_mod_autosub).
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
-include_lib("emqttd/include/emqttd.hrl"). -behaviour(emqttd_gen_mod).
-behaviour(emqttd_acl_mod). -export([load/1, subscribe/2, unload/1]).
%% ACL callbacks -record(state, {topics}).
-export([init/1, check_acl/2, reload_acl/1, description/0]).
init(Opts) -> {ok, Opts}. load(Opts) ->
Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2],
emqttd_broker:hook(client_connected, {?MODULE, subscribe},
{?MODULE, subscribe, [Topics]}),
{ok, #state{topics = Topics}}.
check_acl({_Client, _PubSub, _Topic}, _State) -> ignore. subscribe({Client, ClientId}, Topics) ->
F = fun(Topic) -> emqtt_topic:feed_var(<<"$c">>, ClientId, Topic) end,
[Client ! {subscribe, F(Topic), Qos} || {Topic, Qos} <- Topics].
reload_acl(_State) -> ok. unload(_Opts) ->
emqttd_broker:unhook(client_connected, {?MODULE, subscribe}).
description() -> "Demo ACL Module".

View File

@ -0,0 +1,122 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd rewrite module.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_mod_rewrite).
-author("Feng Lee <feng@emqtt.io>").
-include_lib("emqtt/include/emqtt.hrl").
-behaviour(emqttd_gen_mod).
-export([load/1, reload/1, unload/1]).
-export([rewrite/2]).
%%%=============================================================================
%%% API
%%%=============================================================================
load(Opts) ->
File = proplists:get_value(file, Opts),
{ok, Terms} = file:consult(File),
Sections = compile(Terms),
emqttd_broker:hook(client_subscribe, {?MODULE, rewrite_subscribe},
{?MODULE, rewrite, [subscribe, Sections]}),
emqttd_broker:hook(client_unsubscribe, {?MODULE, rewrite_unsubscribe},
{?MODULE, rewrite_unsubscribe, [unsubscribe, Sections]}),
emqttd_broker:hook(client_publish, {?MODULE, rewrite_publish},
{?MODULE, rewrite_publish, [publish, Sections]}).
rewrite(TopicTable, [subscribe, Sections]) ->
[{match_topic(Topic, Sections), Qos} || {Topic, Qos} <- TopicTable];
rewrite(Topics, [unsubscribe, Sections]) ->
[match_topic(Topic, Sections) || Topic <- Topics];
rewrite(Message=#mqtt_message{topic = Topic}, [publish, Sections]) ->
%%TODO: this will not work if the client is always online.
RewriteTopic =
case get({rewrite, Topic}) of
undefined ->
DestTopic = match_topic(Topic, Sections),
put({rewrite, Topic}, DestTopic), DestTopic;
DestTopic ->
DestTopic
end,
Message#mqtt_message{topic = RewriteTopic}.
reload(File) ->
%%TODO: The unload api is not right...
case emqttd:is_mod_enabled(rewrite) of
true ->
unload(state),
load([{file, File}]);
false ->
{error, module_unloaded}
end.
unload(_) ->
emqttd_broker:unhook(client_subscribe, {?MODULE, rewrite_subscribe}),
emqttd_broker:unhook(client_unsubscribe, {?MODULE, rewrite_unsubscribe}),
emqttd_broker:unhook(client_publish, {?MODULE, rewrite_publish}).
%%%=============================================================================
%%% Internal functions
%%%=============================================================================
compile(Sections) ->
C = fun({rewrite, Re, Dest}) ->
{ok, MP} = re:compile(Re),
{rewrite, MP, Dest}
end,
[{topic, Topic, [C(R) || R <- Rules]} || {topic, Topic, Rules} <- Sections].
match_topic(Topic, []) ->
Topic;
match_topic(Topic, [{topic, Filter, Rules}|Sections]) ->
case emqtt_topic:match(Topic, Filter) of
true ->
match_rule(Topic, Rules);
false ->
match_topic(Topic, Sections)
end.
match_rule(Topic, []) ->
Topic;
match_rule(Topic, [{rewrite, MP, Dest}|Rules]) ->
case re:run(Topic, MP, [{captrue, all_but_first, list}]) of
{match, Captured} ->
%%TODO: stupid??? how to replace $1, $2?
Vars = lists:zip(["\\$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))], Captured),
iolist_to_binary(lists:foldl(
fun({Var, Val}, Acc) ->
re:replace(Acc, Var, Val, [global])
end, Dest, Vars));
nomatch ->
match_rule(Topic, Rules)
end.

View File

@ -0,0 +1,67 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd module supervisor.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_mod_sup).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
-behaviour(supervisor).
%% API
-export([start_link/0, start_child/1, start_child/2]).
%% Supervisor callbacks
-export([init/1]).
%% Helper macro for declaring children of supervisor
-define(CHILD(Mod, Type), {Mod, {Mod, start_link, []}, permanent, 5000, Type, [Mod]}).
%%%=============================================================================
%%% API
%%%=============================================================================
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
start_child(ChildSpec) when is_tuple(ChildSpec) ->
supervisor:start_child(?MODULE, ChildSpec).
%%
%% start_child(Mod::atom(), Type::type()) -> {ok, pid()}
%% @type type() = worker | supervisor
%%
start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) ->
supervisor:start_child(?MODULE, ?CHILD(Mod, Type)).
%%%=============================================================================
%%% Supervisor callbacks
%%%=============================================================================
init([]) ->
{ok, {{one_for_one, 10, 3600}, []}}.

View File

@ -134,10 +134,10 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername =
emqttd_cm:register(ClientId1), emqttd_cm:register(ClientId1),
%%Starting session %%Starting session
{ok, Session} = emqttd_session:start({CleanSess, ClientId1, self()}), {ok, Session} = emqttd_session:start({CleanSess, ClientId1, self()}),
%% Force subscriptions
force_subscribe(ClientId1),
%% Start keepalive %% Start keepalive
start_keepalive(KeepAlive), start_keepalive(KeepAlive),
%% Run hooks
emqttd_broker:foreach_hooks(client_connected, [{self(), ClientId1}]),
{?CONNACK_ACCEPT, State1#proto_state{clientid = ClientId1, {?CONNACK_ACCEPT, State1#proto_state{clientid = ClientId1,
session = Session, session = Session,
will_msg = willmsg(Var)}}; will_msg = willmsg(Var)}};
@ -212,12 +212,13 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid =
lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]), lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]),
{ok, State}; {ok, State};
false -> false ->
TopicTable1 = emqttd_broker:foldl_hooks(client_subscribe, [], TopicTable),
%%TODO: GrantedQos should be renamed. %%TODO: GrantedQos should be renamed.
{ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable), {ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable1),
send(?SUBACK_PACKET(PacketId, GrantedQos), State#proto_state{session = NewSession}) send(?SUBACK_PACKET(PacketId, GrantedQos), State#proto_state{session = NewSession})
end; end;
handle({subscribe, Topic, Qos}, State = #proto_state{clientid = ClientId, session = Session}) -> handle({subscribe, Topic, Qos}, State = #proto_state{session = Session}) ->
{ok, NewSession, _GrantedQos} = emqttd_session:subscribe(Session, [{Topic, Qos}]), {ok, NewSession, _GrantedQos} = emqttd_session:subscribe(Session, [{Topic, Qos}]),
{ok, State#proto_state{session = NewSession}}; {ok, State#proto_state{session = NewSession}};
@ -226,7 +227,8 @@ handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
send(?UNSUBACK_PACKET(PacketId), State); send(?UNSUBACK_PACKET(PacketId), State);
handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) -> handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) ->
{ok, NewSession} = emqttd_session:unsubscribe(Session, Topics), Topics1 = emqttd_broker:foldl_hooks(client_unsubscribe, [], Topics),
{ok, NewSession} = emqttd_session:unsubscribe(Session, Topics1),
send(?UNSUBACK_PACKET(PacketId), State#proto_state{session = NewSession}); send(?UNSUBACK_PACKET(PacketId), State#proto_state{session = NewSession});
handle(?PACKET(?PINGREQ), State) -> handle(?PACKET(?PINGREQ), State) ->
@ -298,23 +300,8 @@ send_willmsg(_ClientId, undefined) ->
send_willmsg(ClientId, WillMsg) -> send_willmsg(ClientId, WillMsg) ->
emqttd_pubsub:publish(ClientId, WillMsg). emqttd_pubsub:publish(ClientId, WillMsg).
%%TODO: will be fixed in 0.8
force_subscribe(ClientId) ->
case emqttd_broker:env(forced_subscriptions) of
undefined ->
ingore;
Topics ->
[force_subscribe(ClientId, {Topic, Qos}) || {Topic, Qos} <- Topics]
end.
force_subscribe(ClientId, {Topic, Qos}) when is_list(Topic) ->
force_subscribe(ClientId, {list_to_binary(Topic), Qos});
force_subscribe(ClientId, {Topic, Qos}) when is_binary(Topic) ->
Topic1 = emqtt_topic:feed_var(<<"$c">>, ClientId, Topic),
self() ! {force_subscribe, Topic1, Qos}.
start_keepalive(0) -> ignore; start_keepalive(0) -> ignore;
start_keepalive(Sec) when Sec > 0 -> start_keepalive(Sec) when Sec > 0 ->
self() ! {keepalive, start, round(Sec * 1.5)}. self() ! {keepalive, start, round(Sec * 1.5)}.

View File

@ -44,10 +44,8 @@
-behaviour(gen_server). -behaviour(gen_server).
-define(SERVER, ?MODULE).
%% API Function Exports %% API Function Exports
-export([start_link/0]). -export([start_link/2, pool/0, table/0]).
-export([lookup_session/1, start_session/2, destroy_session/1]). -export([lookup_session/1, start_session/2, destroy_session/1]).
@ -55,16 +53,37 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(state, {tabid, statsfun}). -record(state, {id, tabid, statsfun}).
-define(SM_POOL, sm_pool).
-define(SESSION_TAB, mqtt_session). -define(SESSION_TAB, mqtt_session).
%%%============================================================================= %%%=============================================================================
%%% API %%% API
%%%============================================================================= %%%=============================================================================
-spec start_link() -> {ok, pid()} | ignore | {error, any()}.
start_link() -> %%------------------------------------------------------------------------------
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %% @doc Start a session manager
%% @end
%%------------------------------------------------------------------------------
-spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when
Id :: pos_integer(),
StatsFun :: fun().
start_link(Id, StatsFun) ->
gen_server:start_link(?MODULE, [Id, StatsFun], []).
%%------------------------------------------------------------------------------
%% @doc Pool name.
%% @end
%%------------------------------------------------------------------------------
pool() -> ?SM_POOL.
%%------------------------------------------------------------------------------
%% @doc Table name.
%% @end
%%------------------------------------------------------------------------------
table() -> ?SESSION_TAB.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Lookup Session Pid %% @doc Lookup Session Pid
@ -72,7 +91,7 @@ start_link() ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec lookup_session(binary()) -> pid() | undefined. -spec lookup_session(binary()) -> pid() | undefined.
lookup_session(ClientId) -> lookup_session(ClientId) ->
case ets:lookup(?SESSION_TAB, ClientId) of case ets:lookup(emqttd_sm_sup:table(), ClientId) of
[{_, SessPid, _}] -> SessPid; [{_, SessPid, _}] -> SessPid;
[] -> undefined [] -> undefined
end. end.
@ -83,7 +102,8 @@ lookup_session(ClientId) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec start_session(binary(), pid()) -> {ok, pid()} | {error, any()}. -spec start_session(binary(), pid()) -> {ok, pid()} | {error, any()}.
start_session(ClientId, ClientPid) -> start_session(ClientId, ClientPid) ->
gen_server:call(?SERVER, {start_session, ClientId, ClientPid}). SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId),
gen_server:call(SmPid, {start_session, ClientId, ClientPid}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Destroy a session %% @doc Destroy a session
@ -91,29 +111,27 @@ start_session(ClientId, ClientPid) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec destroy_session(binary()) -> ok. -spec destroy_session(binary()) -> ok.
destroy_session(ClientId) -> destroy_session(ClientId) ->
gen_server:call(?SERVER, {destroy_session, ClientId}). SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId),
gen_server:call(SmPid, {destroy_session, ClientId}).
%%%============================================================================= %%%=============================================================================
%%% gen_server callbacks %%% gen_server callbacks
%%%============================================================================= %%%=============================================================================
init([]) -> init([Id, StatsFun]) ->
process_flag(trap_exit, true), gproc_pool:connect_worker(?SM_POOL, {?MODULE, Id}),
TabId = ets:new(?SESSION_TAB, [set, protected, named_table]), {ok, #state{id = Id, statsfun = StatsFun}}.
StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
{ok, #state{tabid = TabId, statsfun = StatsFun}}.
handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tabid = Tab}) -> handle_call({start_session, ClientId, ClientPid}, _From, State) ->
Reply = Reply =
case ets:lookup(Tab, ClientId) of case ets:lookup(?SESSION_TAB, ClientId) of
[{_, SessPid, _MRef}] -> [{_, SessPid, _MRef}] ->
emqttd_session:resume(SessPid, ClientId, ClientPid), emqttd_session:resume(SessPid, ClientId, ClientPid),
{ok, SessPid}; {ok, SessPid};
[] -> [] ->
case emqttd_session_sup:start_session(ClientId, ClientPid) of case emqttd_session_sup:start_session(ClientId, ClientPid) of
{ok, SessPid} -> {ok, SessPid} ->
ets:insert(Tab, {ClientId, SessPid, ets:insert(?SESSION_TAB, {ClientId, SessPid, erlang:monitor(process, SessPid)}),
erlang:monitor(process, SessPid)}),
{ok, SessPid}; {ok, SessPid};
{error, Error} -> {error, Error} ->
{error, Error} {error, Error}
@ -121,12 +139,12 @@ handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tabid =
end, end,
{reply, Reply, setstats(State)}; {reply, Reply, setstats(State)};
handle_call({destroy_session, ClientId}, _From, State = #state{tabid = Tab}) -> handle_call({destroy_session, ClientId}, _From, State) ->
case ets:lookup(Tab, ClientId) of case ets:lookup(?SESSION_TAB, ClientId) of
[{_, SessPid, MRef}] -> [{_, SessPid, MRef}] ->
emqttd_session:destroy(SessPid, ClientId), emqttd_session:destroy(SessPid, ClientId),
erlang:demonitor(MRef, [flush]), erlang:demonitor(MRef, [flush]),
ets:delete(Tab, ClientId); ets:delete(?SESSION_TAB, ClientId);
[] -> [] ->
ignore ignore
end, end,
@ -145,8 +163,8 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tabid = Ta
handle_info(_Info, State) -> handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.
terminate(_Reason, _State) -> terminate(_Reason, #state{id = Id}) ->
ok. gproc_pool:disconnect_worker(?SM_POOL, {?MODULE, Id}), ok.
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.

View File

@ -0,0 +1,59 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd client manager supervisor.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_sm_sup).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
%% API
-export([start_link/0]).
-behaviour(supervisor).
%% Supervisor callbacks
-export([init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
ets:new(emqttd_sm:table(), [set, named_table, public,
{write_concurrency, true}]),
Schedulers = erlang:system_info(schedulers),
gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]),
StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
Children = lists:map(
fun(I) ->
Name = {emqttd_sm, I},
gproc_pool:add_worker(emqttd_sm:pool(), Name, I),
{Name, {emqttd_sm, start_link, [I, StatsFun]},
permanent, 10000, worker, [emqttd_sm]}
end, lists:seq(1, Schedulers)),
{ok, {{one_for_all, 10, 100}, Children}}.

View File

@ -39,7 +39,7 @@
-export([init/1]). -export([init/1]).
%% Helper macro for declaring children of supervisor %% Helper macro for declaring children of supervisor
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). -define(CHILD(Mod, Type), {Mod, {Mod, start_link, []}, permanent, 5000, Type, [Mod]}).
%%%============================================================================= %%%=============================================================================
%%% API %%% API
@ -63,5 +63,5 @@ start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) ->
%%%============================================================================= %%%=============================================================================
init([]) -> init([]) ->
{ok, {{one_for_all, 10, 100}, []}}. {ok, {{one_for_all, 10, 3600}, []}}.

View File

@ -1,4 +1,3 @@
## Overview ## Overview
Authentication with LDAP. Authentication with LDAP.
@ -16,7 +15,6 @@ Authentication with LDAP.
{"certfile", "ssl.crt"}, {"certfile", "ssl.crt"},
{"keyfile", "ssl.key"}]} {"keyfile", "ssl.key"}]}
]} ]}
``` ```
## Load Plugin ## Load Plugin

View File

@ -1,6 +1,6 @@
{application, emqttd_auth_ldap, {application, emqttd_auth_ldap,
[ [
{description, "emqttd LDA Authentication Plugin"}, {description, "emqttd LDAP Authentication Plugin"},
{vsn, "1.0"}, {vsn, "1.0"},
{registered, []}, {registered, []},
{applications, [ {applications, [

View File

@ -30,6 +30,8 @@
-include_lib("emqttd/include/emqttd.hrl"). -include_lib("emqttd/include/emqttd.hrl").
-import(proplists, [get_value/2, get_value/3]).
-behaviour(emqttd_auth_mod). -behaviour(emqttd_auth_mod).
-export([init/1, check/3, description/0]). -export([init/1, check/3, description/0]).
@ -37,14 +39,14 @@
-record(state, {servers, user_dn, options}). -record(state, {servers, user_dn, options}).
init(Opts) -> init(Opts) ->
Servers = proplists:get_value(servers, Opts, ["localhost"]), Servers = get_value(servers, Opts, ["localhost"]),
Port = proplists:get_value(port, Opts, 389), Port = get_value(port, Opts, 389),
Timeout = proplists:get_value(timeout, Opts, 30), Timeout = get_value(timeout, Opts, 30),
UserDn = proplists:get_value(user_dn, Opts), UserDn = get_value(user_dn, Opts),
LdapOpts = LdapOpts =
case proplists:get_value(ssl, Opts, false) of case get_value(ssl, Opts, false) of
true -> true ->
SslOpts = proplists:get_value(sslopts, Opts), SslOpts = get_value(sslopts, Opts),
[{port, Port}, {timeout, Timeout}, {sslopts, SslOpts}]; [{port, Port}, {timeout, Timeout}, {sslopts, SslOpts}];
false -> false ->
[{port, Port}, {timeout, Timeout}] [{port, Port}, {timeout, Timeout}]
@ -67,8 +69,6 @@ check(#mqtt_client{username = Username}, Password,
{error, Reason} {error, Reason}
end. end.
description() -> "LDAP Authentication Module".
ldap_bind(LDAP, UserDn, Password) -> ldap_bind(LDAP, UserDn, Password) ->
case catch eldap:simple_bind(LDAP, UserDn, Password) of case catch eldap:simple_bind(LDAP, UserDn, Password) of
ok -> ok ->
@ -87,3 +87,6 @@ fill(Username, UserDn) ->
(S) -> S (S) -> S
end, string:tokens(UserDn, ",="))). end, string:tokens(UserDn, ",="))).
description() ->
"LDAP Authentication Module".

View File

@ -20,7 +20,7 @@
%%% SOFTWARE. %%% SOFTWARE.
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @doc %%% @doc
%%% LDAP Authentication APP. %%% LDAP Authentication Plugin.
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
@ -56,4 +56,3 @@ stop(_State) ->
init([]) -> init([]) ->
{ok, { {one_for_one, 5, 10}, []} }. {ok, { {one_for_one, 5, 10}, []} }.

View File

@ -1,11 +1,11 @@
## Overview ## Overview
Authentication with user table of MySQL database. Authentication with user table of MySQL database.
## etc/plugin.config ## etc/plugin.config
```erlang ```
[
{emysql, [ {emysql, [
{pool, 4}, {pool, 4},
{host, "localhost"}, {host, "localhost"},
@ -24,7 +24,6 @@ Authentication with user table of MySQL database.
{password, password} {password, password}
]} ]}
]} ]}
].
``` ```
## Users Table(Demo) ## Users Table(Demo)

View File

@ -1,5 +1,4 @@
[ {emysql, [
{emysql, [
{pool, 4}, {pool, 4},
{host, "localhost"}, {host, "localhost"},
{port, 3306}, {port, 3306},
@ -7,12 +6,11 @@
{password, "public"}, {password, "public"},
{database, "mqtt"}, {database, "mqtt"},
{encoding, utf8} {encoding, utf8}
]}, ]},
{emqttd_auth_mysql, [ {emqttd_auth_mysql, [
{users_table, mqtt_users}, {users_table, mqtt_users},
{field_mapper, [ {field_mapper, [
{username, username}, {username, username},
{password, password, plain} {password, password, plain}
]} ]}
]} ]}
].

View File

@ -1,12 +0,0 @@
{application, emqttd_plugin_demo,
[
{description, "emqttd demo plugin"},
{vsn, "0.1"},
{registered, []},
{applications, [
kernel,
stdlib
]},
{mod, { emqttd_plugin_demo_app, []}},
{env, []}
]}.

View File

@ -1,21 +0,0 @@
-module(emqttd_plugin_demo_app).
-behaviour(application).
%% Application callbacks
-export([start/2, stop/1]).
%% ===================================================================
%% Application callbacks
%% ===================================================================
start(_StartType, _StartArgs) ->
{ok, Sup} = emqttd_plugin_demo_sup:start_link(),
emqttd_access_control:register_mod(auth, emqttd_plugin_demo_auth, []),
emqttd_access_control:register_mod(acl, emqttd_plugin_demo_acl, []),
{ok, Sup}.
stop(_State) ->
emqttd_access_control:unregister_mod(auth, emqttd_plugin_demo_auth),
emqttd_access_control:unregister_mod(acl, emqttd_plugin_demo_acl),
ok.

View File

@ -1,27 +0,0 @@
-module(emqttd_plugin_demo_sup).
-behaviour(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
%% ===================================================================
%% API functions
%% ===================================================================
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
init([]) ->
{ok, { {one_for_one, 5, 10}, []} }.

View File

@ -86,15 +86,12 @@
%% System interval of publishing broker $SYS messages %% System interval of publishing broker $SYS messages
{sys_interval, 60}, {sys_interval, 60},
%% Subscribe these topics automatically when client connected
{forced_subscriptions, [{"$Q/client/$c", 0}]},
%% Retained messages %% Retained messages
{retained, [ {retained, [
%% Max number of retained messages %% Max number of retained messages
{max_message_num, 100000}, {max_message_num, 100000},
%% Max Payload Size of retained message %% Max Payload Size of retained message
{max_playload_size, 4096} {max_playload_size, 65536}
]}, ]},
%% PubSub %% PubSub
{pubsub, [ {pubsub, [
@ -109,6 +106,14 @@
{ping_down_interval, 1} %seconds {ping_down_interval, 1} %seconds
]} ]}
]}, ]},
%% Modules
{modules, [
%% Subscribe topics automatically when client connected
{autosub, [{"$Q/client/$c", 0}]},
%% Rewrite rules
{rewrite, [{file, "etc/rewrite.config"}]}
]},
%% Listeners %% Listeners
{listeners, [ {listeners, [
{mqtt, 1883, [ {mqtt, 1883, [
@ -135,8 +140,8 @@
%% Socket Access Control %% Socket Access Control
{access, [{allow, all}]}, {access, [{allow, all}]},
%% SSL certificate and key files %% SSL certificate and key files
{ssl, [{certfile, "etc/ssl.crt"}, {ssl, [{certfile, "etc/ssl/ssl.crt"},
{keyfile, "etc/ssl.key"}]}, {keyfile, "etc/ssl/ssl.key"}]},
%% Socket Options %% Socket Options
{sockopts, [ {sockopts, [
{backlog, 1024} {backlog, 1024}

14
rel/files/rewrite.config Normal file
View File

@ -0,0 +1,14 @@
%%%-----------------------------------------------------------------------------
%%
%% [Rewrite](https://github.com/emqtt/emqttd/wiki/Rewrite)
%%
%%%-----------------------------------------------------------------------------
{topic, "x/#", [
{rewrite, "^x/y/(.+)$", "z/y/$1"},
{rewrite, "^x/(.+)$", "y/$1"}
]}.
{topic, "y/+/z/#", [
{rewrite, "^y/(.+)/z/(.+)$", "y/z/$2"}
]}.

View File

@ -63,6 +63,7 @@
{overlay, [ {overlay, [
{mkdir, "log/"}, {mkdir, "log/"},
{mkdir, "etc/"}, {mkdir, "etc/"},
{mkdir, "etc/ssl/"},
{mkdir, "data/"}, {mkdir, "data/"},
{mkdir, "plugins/"}, {mkdir, "plugins/"},
{copy, "files/erl", "\{\{erts_vsn\}\}/bin/erl"}, {copy, "files/erl", "\{\{erts_vsn\}\}/bin/erl"},
@ -72,10 +73,11 @@
{template, "files/emqttd.cmd", "bin/emqttd.cmd"}, {template, "files/emqttd.cmd", "bin/emqttd.cmd"},
{copy, "files/start_erl.cmd", "bin/start_erl.cmd"}, {copy, "files/start_erl.cmd", "bin/start_erl.cmd"},
{copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"}, {copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"},
{copy, "files/ssl/ssl.crt", "etc/ssl.crt"}, {copy, "files/ssl/ssl.crt", "etc/ssl/ssl.crt"},
{copy, "files/ssl/ssl.key", "etc/ssl.key"}, {copy, "files/ssl/ssl.key", "etc/ssl/ssl.key"},
{template, "files/emqttd.config", "etc/emqttd.config"}, {template, "files/emqttd.config", "etc/emqttd.config"},
{template, "files/acl.config", "etc/acl.config"}, {template, "files/acl.config", "etc/acl.config"},
{template, "files/rewrite.config", "etc/rewrite.config"},
{template, "files/clients.config", "etc/clients.config"}, {template, "files/clients.config", "etc/clients.config"},
{template, "files/plugins.config", "etc/plugins.config"}, {template, "files/plugins.config", "etc/plugins.config"},
{template, "files/vm.args", "etc/vm.args"} {template, "files/vm.args", "etc/vm.args"}