2.0 - read config from etc/emqttd.conf

This commit is contained in:
Feng Lee 2016-07-21 17:23:34 +08:00
parent 1602b44267
commit 0dbbe7d0df
21 changed files with 257 additions and 129 deletions

View File

@ -1,3 +1,3 @@
testclientid0
testclientid1 127.0.0.1
testclientid2 192.168.0.1/24
"testclientid0".
{"testclientid1", "127.0.0.1"}.
{"testclientid2", "192.168.0.1/24"}.

View File

@ -6,7 +6,7 @@
%%
%% {}: Tuple, usually {Key, Value}
%% []: List, seperated by comma
%% %%: comment
%% %%: Comment
%%
%%===================================================================
@ -31,11 +31,10 @@
{auth, anonymous, []}.
%% Authentication with username, password
%% Passwd Hash: plain | md5 | sha | sha256
{auth, username, [{passwd, "etc/passwd.conf"}, {passwd_hash, plain}]}.
{auth, username, [{passwd, "etc/passwd.conf"}]}.
%% Authentication with clientId
{auth, clientid, [{config, "etc/client.config"}, {password, no}]}.
{auth, clientid, [{config, "etc/client.conf"}, {password, no}]}.
%%--------------------------------------------------------------------
%% ACL
@ -45,6 +44,9 @@
{acl, internal, [{config, "etc/acl.conf"}, {nomatch, allow}]}.
%% Cache ACL result for PUBLISH
{cache_acl, true}.
%%--------------------------------------------------------------------
%% Broker
%%--------------------------------------------------------------------
@ -220,6 +222,9 @@
%% Plugins
%%-------------------------------------------------------------------
%% Dir of plugins' config
{plugins_etc_dir, "etc/plugins/"}.
%% File to store loaded plugin names.
{plugins_loaded_file, "data/loaded_plugins"}.

View File

@ -1,2 +1,2 @@
user1:passwd1
user2:passwd2
{"user1", "passwd1"}.
{"user2", "passwd2"}.

View File

@ -23,7 +23,7 @@
-define(SERVER, ?MODULE).
%% API Function Exports
-export([start_link/0, start_link/1,
-export([start_link/0,
auth/2, % authentication
check_acl/3, % acl check
reload_acl/0, % reload acl
@ -48,11 +48,8 @@
%% @doc Start access control server.
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
start_link() -> start_link(emqttd:env(access)).
-spec(start_link(Opts :: list()) -> {ok, pid()} | ignore | {error, any()}).
start_link(Opts) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%% @doc Authenticate MQTT Client.
-spec(auth(Client :: mqtt_client(), Password :: password()) -> ok | {error, any()}).
@ -125,17 +122,14 @@ stop() -> gen_server:call(?MODULE, stop).
%% gen_server callbacks
%%--------------------------------------------------------------------
init([Opts]) ->
init([]) ->
ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]),
ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(auth, proplists:get_value(auth, Opts))}),
ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(acl, proplists:get_value(acl, Opts))}),
ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(gen_conf:list(emqttd, auth))}),
ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(gen_conf:list(emqttd, acl))}),
{ok, #state{}}.
init_mods(auth, AuthMods) ->
[init_mod(authmod(Name), Opts) || {Name, Opts} <- AuthMods];
init_mods(acl, AclMods) ->
[init_mod(aclmod(Name), Opts) || {Name, Opts} <- AclMods].
init_mods(Mods) ->
[init_mod(mod_name(Type, Name), Opts) || {Type, Name, Opts} <- Mods].
init_mod(Mod, Opts) ->
{ok, State} = Mod:init(Opts), {Mod, State, 0}.
@ -191,15 +185,14 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%--------------------------------------------------------------------
authmod(Name) when is_atom(Name) ->
mod(emqttd_auth_, Name).
aclmod(Name) when is_atom(Name) ->
mod(emqttd_acl_, Name).
mod_name(auth, Name) -> mod(emqttd_auth_, Name);
mod_name(acl, Name) -> mod(emqttd_acl_, Name).
mod(Prefix, Name) ->
list_to_atom(lists:concat([Prefix, Name])).
if_existed(false, Fun) -> Fun();
if_existed(_Mod, _Fun) -> {error, already_existed}.

View File

@ -0,0 +1,35 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_acl_anonymous).
-behaviour(emqttd_acl_mod).
%% ACL callbacks
-export([init/1, check_acl/2, reload_acl/1, description/0]).
init(Opts) ->
{ok, Opts}.
check_acl(_Who, _State) ->
allow.
reload_acl(_State) ->
ok.
description() ->
"Anonymous ACL".

View File

@ -27,7 +27,7 @@
-define(ACL_RULE_TAB, mqtt_acl_rule).
-record(state, {acl_file, nomatch = allow}).
-record(state, {config, nomatch = allow}).
%%--------------------------------------------------------------------
%% API
@ -46,16 +46,20 @@ all_rules() ->
%%--------------------------------------------------------------------
%% @doc Init internal ACL
-spec(init(AclOpts :: list()) -> {ok, State :: any()}).
init(AclOpts) ->
-spec(init(Opts :: list()) -> {ok, State :: any()}).
init(Opts) ->
ets:new(?ACL_RULE_TAB, [set, public, named_table, {read_concurrency, true}]),
AclFile = proplists:get_value(file, AclOpts),
Default = proplists:get_value(nomatch, AclOpts, allow),
State = #state{acl_file = AclFile, nomatch = Default},
true = load_rules_from_file(State),
{ok, State}.
case proplists:get_value(config, Opts) of
undefined ->
{ok, #state{}};
File ->
Default = proplists:get_value(nomatch, Opts, allow),
State = #state{config = File, nomatch = Default},
true = load_rules_from_file(State),
{ok, State}
end.
load_rules_from_file(#state{acl_file = AclFile}) ->
load_rules_from_file(#state{config = AclFile}) ->
{ok, Terms} = file:consult(AclFile),
Rules = [emqttd_access_rule:compile(Term) || Term <- Terms],
lists:foreach(fun(PubSub) ->
@ -83,6 +87,8 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) ->
PubSub :: pubsub(),
Topic :: binary(),
State :: #state{}).
check_acl(_Who, #state{config = undefined}) ->
allow;
check_acl({Client, PubSub, Topic}, #state{nomatch = Default}) ->
case match(Client, Topic, lookup(PubSub)) of
{matched, allow} -> allow;
@ -115,5 +121,6 @@ reload_acl(State) ->
%% @doc ACL Module Description
-spec(description() -> string()).
description() -> "Internal ACL with etc/acl.config".
description() ->
"Internal ACL with etc/acl.conf".

View File

@ -1,4 +1,4 @@
%%--------------------------------------------------------------------
%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
@ -26,12 +26,8 @@
-export([start_listener/1, stop_listener/1, is_mod_enabled/1]).
%% MQTT SockOpts
-define(MQTT_SOCKOPTS, [
binary,
{packet, raw},
{reuseaddr, true},
{backlog, 512},
{nodelay, true}]).
-define(MQTT_SOCKOPTS, [binary, {packet, raw}, {reuseaddr, true},
{backlog, 512}, {nodelay, true}]).
-type listener() :: {atom(), esockd:listen_on(), [esockd:option()]}.
@ -102,17 +98,17 @@ start_servers(Sup) ->
start_server(_Sup, {Name, F}) when is_function(F) ->
?PRINT("~s is starting...", [Name]),
F(),
?PRINT_MSG("[done]~n");
?PRINT_MSG("[ok]~n");
start_server(Sup, {Name, Server}) ->
?PRINT("~s is starting...", [Name]),
start_child(Sup, Server),
?PRINT_MSG("[done]~n");
?PRINT_MSG("[ok]~n");
start_server(Sup, {Name, Server, Opts}) ->
?PRINT("~s is starting...", [ Name]),
start_child(Sup, Server, Opts),
?PRINT_MSG("[done]~n").
?PRINT_MSG("[ok]~n").
start_child(Sup, {supervisor, Module}) ->
supervisor:start_child(Sup, supervisor_spec(Module));
@ -150,9 +146,9 @@ worker_spec(M, F, A) ->
%% @doc load all modules
load_all_mods() ->
lists:foreach(fun load_mod/1, emqttd:env(modules)).
lists:foreach(fun load_mod/1, gen_conf:list(emqttd, module)).
load_mod({Name, Opts}) ->
load_mod({module, Name, Opts}) ->
Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)),
case catch Mod:load(Opts) of
ok -> lager:info("Load module ~s successfully", [Name]);
@ -162,7 +158,8 @@ load_mod({Name, Opts}) ->
%% @doc Is module enabled?
-spec(is_mod_enabled(Name :: atom()) -> boolean()).
is_mod_enabled(Name) -> emqttd:env(modules, Name) =/= undefined.
is_mod_enabled(Name) ->
lists:keyfind(Name, 2, gen_conf:list(emqttd, module)).
%%--------------------------------------------------------------------
%% Start Listeners
@ -170,25 +167,27 @@ is_mod_enabled(Name) -> emqttd:env(modules, Name) =/= undefined.
%% @doc Start Listeners of the broker.
-spec(start_listeners() -> any()).
start_listeners() -> lists:foreach(fun start_listener/1, emqttd:env(listeners)).
start_listeners() -> lists:foreach(fun start_listener/1, gen_conf:list(emqttd, listener)).
%% Start mqtt listener
-spec(start_listener(listener()) -> any()).
start_listener({mqtt, ListenOn, Opts}) -> start_listener(mqtt, ListenOn, Opts);
start_listener({listener, mqtt, ListenOn, Opts}) ->
start_listener(mqtt, ListenOn, Opts);
%% Start mqtt(SSL) listener
start_listener({mqtts, ListenOn, Opts}) -> start_listener(mqtts, ListenOn, Opts);
start_listener({listener, mqtts, ListenOn, Opts}) ->
start_listener(mqtts, ListenOn, Opts);
%% Start http listener
start_listener({http, ListenOn, Opts}) ->
start_listener({listener, http, ListenOn, Opts}) ->
mochiweb:start_http(http, ListenOn, Opts, {emqttd_http, handle_request, []});
%% Start https listener
start_listener({https, ListenOn, Opts}) ->
start_listener({listener, https, ListenOn, Opts}) ->
mochiweb:start_http(https, ListenOn, Opts, {emqttd_http, handle_request, []}).
start_listener(Protocol, ListenOn, Opts) ->
MFArgs = {emqttd_client, start_link, [emqttd:env(mqtt)]},
MFArgs = {emqttd_client, start_link, [emqttd_conf:mqtt()]},
{ok, _} = esockd:open(Protocol, ListenOn, merge_sockopts(Opts), MFArgs).
merge_sockopts(Options) ->
@ -201,8 +200,8 @@ merge_sockopts(Options) ->
%%--------------------------------------------------------------------
%% @doc Stop Listeners
stop_listeners() -> lists:foreach(fun stop_listener/1, emqttd:env(listeners)).
stop_listeners() -> lists:foreach(fun stop_listener/1, gen_conf:list(listener)).
%% @private
stop_listener({Protocol, ListenOn, _Opts}) -> esockd:close(Protocol, ListenOn).
stop_listener({listener, Protocol, ListenOn, _Opts}) -> esockd:close(Protocol, ListenOn).

View File

@ -69,7 +69,8 @@ init(Opts) ->
{ram_copies, [node()]},
{attributes, record_info(fields, ?AUTH_CLIENTID_TAB)}]),
mnesia:add_table_copy(?AUTH_CLIENTID_TAB, node(), ram_copies),
load(proplists:get_value(file, Opts)),
Clients = load_client_from(proplists:get_value(config, Opts)),
mnesia:transaction(fun() -> [mnesia:write(C) || C<- Clients] end),
{ok, Opts}.
check(#mqtt_client{client_id = undefined}, _Password, _Opts) ->
@ -93,32 +94,19 @@ description() -> "ClientId authentication module".
%% Internal functions
%%--------------------------------------------------------------------
load(undefined) ->
load_client_from(undefined) ->
ok;
load(File) ->
{ok, Fd} = file:open(File, [read]),
load(Fd, file:read_line(Fd), []).
load_client_from(File) ->
{ok, Clients} = file:consult(File),
[client(Client) || Client <- Clients].
load(Fd, {ok, Line}, Clients) when is_list(Line) ->
Clients1 =
case string:tokens(Line, " ") of
[ClientIdS] ->
ClientId = list_to_binary(string:strip(ClientIdS, right, $\n)),
[#mqtt_auth_clientid{client_id = ClientId} | Clients];
[ClientId, IpAddr0] ->
IpAddr = string:strip(IpAddr0, right, $\n),
[#mqtt_auth_clientid{client_id = list_to_binary(ClientId),
ipaddr = esockd_cidr:parse(IpAddr, true)} | Clients];
BadLine ->
lager:error("BadLine in clients.config: ~s", [BadLine]),
Clients
end,
load(Fd, file:read_line(Fd), Clients1);
client(ClientId) when is_list(ClientId) ->
#mqtt_auth_clientid{client_id = list_to_binary(ClientId)};
load(Fd, eof, Clients) ->
mnesia:transaction(fun() -> [mnesia:write(C) || C<- Clients] end),
file:close(Fd).
client({ClientId, IpAddr}) when is_list(ClientId) ->
#mqtt_auth_clientid{client_id = iolist_to_binary(ClientId),
ipaddr = esockd_cidr:parse(IpAddr, true)}.
check_clientid_only(ClientId, IpAddr) ->
case mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId) of

View File

@ -68,7 +68,7 @@ if_enabled(Fun) ->
end.
hint() ->
?PRINT_MSG("Please enable '{username, []}' authentication in etc/emqttd.config first.~n").
?PRINT_MSG("Please enable '{auth, username, []}' in etc/emqttd.conf first.~n").
%%--------------------------------------------------------------------
%% API
@ -81,7 +81,13 @@ is_enabled() ->
-spec(add_user(binary(), binary()) -> ok | {error, any()}).
add_user(Username, Password) ->
User = #?AUTH_USERNAME_TAB{username = Username, password = hash(Password)},
ret(mnesia:transaction(fun mnesia:write/1, [User])).
ret(mnesia:transaction(fun insert_user/1, [User])).
insert_user(User = #?AUTH_USERNAME_TAB{username = Username}) ->
case mnesia:read(?AUTH_USERNAME_TAB, Username) of
[] -> mnesia:write(User);
[_|_] -> mnesia:abort(existed)
end.
add_default_user(Username, Password) when is_atom(Username) ->
add_default_user(atom_to_list(Username), Password);
@ -110,16 +116,20 @@ all_users() -> mnesia:dirty_all_keys(?AUTH_USERNAME_TAB).
%% emqttd_auth_mod callbacks
%%--------------------------------------------------------------------
init(DefautUsers) ->
init(Opts) ->
mnesia:create_table(?AUTH_USERNAME_TAB, [
{disc_copies, [node()]},
{attributes, record_info(fields, ?AUTH_USERNAME_TAB)}]),
mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), disc_copies),
lists:foreach(fun({Username, Password}) ->
add_default_user(Username, Password)
end, DefautUsers),
case proplists:get_value(passwd, Opts) of
undefined -> ok;
File -> {ok, DefaultUsers} = file:consult(File),
lists:foreach(fun({Username, Password}) ->
add_default_user(Username, Password)
end, DefaultUsers)
end,
emqttd_ctl:register_cmd(users, {?MODULE, cli}, []),
{ok, []}.
{ok, Opts}.
check(#mqtt_client{username = undefined}, _Password, _Opts) ->
{error, username_undefined};
@ -127,7 +137,7 @@ check(_User, undefined, _Opts) ->
{error, password_undefined};
check(#mqtt_client{username = Username}, Password, _Opts) ->
case mnesia:dirty_read(?AUTH_USERNAME_TAB, Username) of
[] ->
[] ->
{error, username_not_found};
[#?AUTH_USERNAME_TAB{password = <<Salt:4/binary, Hash/binary>>}] ->
case Hash =:= md5_hash(Salt, Password) of

View File

@ -47,7 +47,7 @@ start_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) ->
start_bridge(Node, _Topic, _Options) when Node =:= node() ->
{error, bridge_to_self};
start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) ->
Options1 = emqttd_opts:merge(emqttd_broker:env(bridge), Options),
Options1 = emqttd_opts:merge(emqttd_conf:bridge(), Options),
supervisor:start_child(?MODULE, bridge_spec(Node, Topic, Options1)).
%% @doc Stop a bridge

View File

@ -29,7 +29,7 @@
-export([subscribe/1, notify/2]).
%% Broker API
-export([env/1, version/0, uptime/0, datetime/0, sysdescr/0]).
-export([version/0, uptime/0, datetime/0, sysdescr/0]).
%% Tick API
-export([start_tick/1, stop_tick/1]).
@ -71,10 +71,6 @@ subscribe(EventType) ->
notify(EventType, Event) ->
gproc:send({p, l, {broker, EventType}}, {notify, EventType, self(), Event}).
%% @doc Get broker env
env(Name) ->
proplists:get_value(Name, emqttd:env(broker)).
%% @doc Get broker version
-spec(version() -> string()).
version() ->
@ -99,7 +95,7 @@ datetime() ->
%% @doc Start a tick timer
start_tick(Msg) ->
start_tick(timer:seconds(env(sys_interval)), Msg).
start_tick(timer:seconds(emqttd:conf(broker_sys_interval, 60)), Msg).
start_tick(0, _Msg) ->
undefined;

View File

@ -94,9 +94,8 @@ init([OriginConn, MqttEnv]) ->
error:Error -> Self ! {shutdown, Error}
end
end,
PktOpts = proplists:get_value(packet, MqttEnv),
ParserFun = emqttd_parser:new(PktOpts),
ProtoState = emqttd_protocol:init(PeerName, SendFun, PktOpts),
ParserFun = emqttd_parser:new(MqttEnv),
ProtoState = emqttd_protocol:init(PeerName, SendFun, MqttEnv),
RateLimit = proplists:get_value(rate_limit, Connection:opts()),
State = run_socket(#client_state{connection = Connection,
connname = ConnName,
@ -108,9 +107,8 @@ init([OriginConn, MqttEnv]) ->
rate_limit = RateLimit,
parser_fun = ParserFun,
proto_state = ProtoState,
packet_opts = PktOpts}),
ClientOpts = proplists:get_value(client, MqttEnv),
IdleTimout = proplists:get_value(idle_timeout, ClientOpts, 10),
packet_opts = MqttEnv}),
IdleTimout = proplists:get_value(client_idle_timeout, MqttEnv, 30),
gen_server:enter_loop(?MODULE, [], State, timer:seconds(IdleTimout)).
handle_call(session, _From, State = #client_state{proto_state = ProtoState}) ->

99
src/emqttd_conf.erl Normal file
View File

@ -0,0 +1,99 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_conf).
-export([mqtt/0, retained/0, session/0, queue/0, bridge/0, pubsub/0]).
mqtt() ->
[
%% Max ClientId Length Allowed.
{max_clientid_len, emqttd:conf(mqtt_max_clientid_len, 512)},
%% Max Packet Size Allowed, 64K by default.
{max_packet_size, emqttd:conf(mqtt_max_packet_size, 65536)},
%% Client Idle Timeout.
{client_idle_timeout, emqttd:conf(mqtt_client_idle_timeout, 30)}
].
retained() ->
[
%% Expired after seconds, never expired if 0
{expired_after, emqttd:conf(retained_expired_after, 0)},
%% Max number of retained messages
{max_message_num, emqttd:conf(retained_max_message_num, 100000)},
%% Max Payload Size of retained message
{max_playload_size, emqttd:conf(retained_max_playload_size, 65536)}
].
session() ->
[
%% Max number of QoS 1 and 2 messages that can be inflight at one time.
%% 0 means no limit
{max_inflight, emqttd:conf(session_max_inflight, 100)},
%% Retry interval for redelivering QoS1/2 messages.
{unack_retry_interval, emqttd:conf(session_unack_retry_interval, 60)},
%% Awaiting PUBREL Timeout
{await_rel_timeout, emqttd:conf(session_await_rel_timeout, 20)},
%% Max Packets that Awaiting PUBREL, 0 means no limit
{max_awaiting_rel, emqttd:conf(session_max_awaiting_rel, 0)},
%% Statistics Collection Interval(seconds)
{collect_interval, emqttd:conf(session_collect_interval, 0)},
%% Expired after 2 day (unit: minute)
{expired_after, emqttd:conf(session_expired_after, 2880)}
].
queue() ->
[
%% Type: simple | priority
{type, emqttd:conf(queue_type, simple)},
%% Topic Priority: 0~255, Default is 0
{priority, emqttd:conf(queue_priority, [])},
%% Max queue length. Enqueued messages when persistent client disconnected,
%% or inflight window is full.
{max_length, emqttd:conf(queue_max_length, infinity)},
%% Low-water mark of queued messages
{low_watermark, emqttd:conf(queue_low_watermark, 0.2)},
%% High-water mark of queued messages
{high_watermark, emqttd:conf(queue_high_watermark, 0.6)},
%% Queue Qos0 messages?
{queue_qos0, emqttd:conf(queue_qos0, true)}
].
bridge() ->
[
%% TODO: Bridge Queue Size
{max_queue_len, emqttd:conf(bridge_max_queue_len, 10000)},
%% Ping Interval of bridge node
{ping_down_interval, emqttd:conf(bridge_ping_down_interval, 1)}
].
pubsub() ->
[
%% PubSub and Router. Default should be scheduler numbers.
{pool_size, emqttd:conf(pubsub_pool_size, 8)}
].

View File

@ -30,12 +30,15 @@
%%--------------------------------------------------------------------
load(Opts) ->
File = proplists:get_value(file, Opts),
{ok, Terms} = file:consult(File),
Sections = compile(Terms),
emqttd:hook('client.subscribe', fun ?MODULE:rewrite_subscribe/3, [Sections]),
emqttd:hook('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/3, [Sections]),
emqttd:hook('message.publish', fun ?MODULE:rewrite_publish/2, [Sections]).
case proplists:get_value(config, Opts) of
undefined ->
ok;
File ->
{ok, Terms} = file:consult(File), Sections = compile(Terms),
emqttd:hook('client.subscribe', fun ?MODULE:rewrite_subscribe/3, [Sections]),
emqttd:hook('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/3, [Sections]),
emqttd:hook('message.publish', fun ?MODULE:rewrite_publish/2, [Sections])
end.
rewrite_subscribe(_ClientId, TopicTable, Sections) ->
lager:info("Rewrite subscribe: ~p", [TopicTable]),

View File

@ -30,10 +30,10 @@
-export([init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_broker:env(pubsub)]).
supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_conf:pubsub()]).
pubsub_pool() ->
hd([Pid|| {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]).
hd([Pid || {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]).
init([Env]) ->

View File

@ -71,7 +71,7 @@ limit(payload) -> env(max_playload_size).
env(Key) ->
case get({retained, Key}) of
undefined ->
Env = emqttd_broker:env(retained),
Env = emqttd_conf:retained(),
Val = proplists:get_value(Key, Env),
put({retained, Key}, Val), Val;
Val ->

View File

@ -214,8 +214,7 @@ unsubscribe(SessPid, Topics) ->
init([CleanSess, ClientId, ClientPid]) ->
process_flag(trap_exit, true),
true = link(ClientPid),
QEnv = emqttd:env(mqtt, queue),
SessEnv = emqttd:env(mqtt, session),
SessEnv = emqttd_conf:session(),
Session = #session{
clean_sess = CleanSess,
client_id = ClientId,
@ -223,7 +222,7 @@ init([CleanSess, ClientId, ClientPid]) ->
subscriptions = dict:new(),
inflight_queue = [],
max_inflight = get_value(max_inflight, SessEnv, 0),
message_queue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()),
message_queue = emqttd_mqueue:new(ClientId, emqttd_conf:queue(), emqttd_alarm:alarm_fun()),
awaiting_rel = #{},
awaiting_ack = #{},
awaiting_comp = #{},
@ -234,7 +233,7 @@ init([CleanSess, ClientId, ClientPid]) ->
collect_interval = get_value(collect_interval, SessEnv, 0),
timestamp = os:timestamp()},
emqttd_sm:register_session(CleanSess, ClientId, sess_info(Session)),
%% start statistics
%% Start statistics
{ok, start_collector(Session), hibernate}.
prioritise_call(Msg, _From, _Len, _State) ->

View File

@ -33,12 +33,10 @@ init([]) ->
{ok, {{one_for_one, 10, 100}, [Sysmon]}}.
opts() ->
Opts = [{long_gc, config(sysmon_long_gc)},
{long_schedule, config(sysmon_long_schedule)},
{large_heap, config(sysmon_large_heap)},
{busy_port, config(busy_port)},
{busy_dist_port, config(sysmon_busy_dist_port)}],
Opts = [{long_gc, emqttd:conf(sysmon_long_gc)},
{long_schedule, emqttd:conf(sysmon_long_schedule)},
{large_heap, emqttd:conf(sysmon_large_heap)},
{busy_port, emqttd:conf(busy_port)},
{busy_dist_port, emqttd:conf(sysmon_busy_dist_port)}],
[{Key, Val} || {Key, {ok, Val}} <- Opts].
config(Key) -> gen_conf:value(emqttd, Key).

View File

@ -31,7 +31,7 @@
%% @doc Handle WebSocket Request.
handle_request(Req) ->
Peer = Req:get(peer),
PktOpts = emqttd:env(mqtt, packet),
PktOpts = emqttd_conf:mqtt(),
ParserFun = emqttd_parser:new(PktOpts),
{ReentryWs, ReplyChannel} = upgrade(Req),
{ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel),

View File

@ -66,17 +66,15 @@ init([MqttEnv, WsPid, Req, ReplyChannel]) ->
{ok, Peername} = Req:get(peername),
Headers = mochiweb_headers:to_list(
mochiweb_request:get(headers, Req)),
PktOpts = proplists:get_value(packet, MqttEnv),
SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end,
ProtoState = emqttd_protocol:init(Peername, SendFun,
[{ws_initial_headers, Headers} | PktOpts]),
[{ws_initial_headers, Headers} | MqttEnv]),
{ok, #wsclient_state{ws_pid = WsPid, peer = Req:get(peer),
connection = Req:get(connection),
proto_state = ProtoState}, idle_timeout(MqttEnv)}.
idle_timeout(MqttEnv) ->
ClientOpts = proplists:get_value(client, MqttEnv),
timer:seconds(proplists:get_value(idle_timeout, ClientOpts, 10)).
timer:seconds(proplists:get_value(client_idle_timeout, MqttEnv, 10)).
handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) ->
{reply, emqttd_protocol:session(ProtoState), State};

View File

@ -27,7 +27,7 @@
%% @doc Start websocket client supervisor
-spec(start_link() -> {ok, pid()}).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd:env(mqtt)]).
supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_conf:mqtt()]).
%% @doc Start a WebSocket Client
-spec(start_client(pid(), mochiweb_request:request(), fun()) -> {ok, pid()}).