From 0dbbe7d0df5441e52e3ec7ef617055e6bbe9ce1a Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 21 Jul 2016 17:23:34 +0800 Subject: [PATCH] 2.0 - read config from etc/emqttd.conf --- etc/client.conf | 6 +-- etc/emqttd.conf | 13 +++-- etc/passwd.conf | 4 +- src/emqttd_access_control.erl | 31 +++++------ src/emqttd_acl_anonymous.erl | 35 +++++++++++++ src/emqttd_acl_internal.erl | 27 ++++++---- src/emqttd_app.erl | 41 +++++++-------- src/emqttd_auth_clientid.erl | 34 ++++-------- src/emqttd_auth_username.erl | 26 ++++++--- src/emqttd_bridge_sup.erl | 2 +- src/emqttd_broker.erl | 8 +-- src/emqttd_client.erl | 10 ++-- src/emqttd_conf.erl | 99 +++++++++++++++++++++++++++++++++++ src/emqttd_mod_rewrite.erl | 15 +++--- src/emqttd_pubsub_sup.erl | 4 +- src/emqttd_retainer.erl | 2 +- src/emqttd_session.erl | 7 ++- src/emqttd_sysmon_sup.erl | 12 ++--- src/emqttd_ws.erl | 2 +- src/emqttd_ws_client.erl | 6 +-- src/emqttd_ws_client_sup.erl | 2 +- 21 files changed, 257 insertions(+), 129 deletions(-) create mode 100644 src/emqttd_acl_anonymous.erl create mode 100644 src/emqttd_conf.erl diff --git a/etc/client.conf b/etc/client.conf index 2c880c365..45d62d440 100644 --- a/etc/client.conf +++ b/etc/client.conf @@ -1,3 +1,3 @@ -testclientid0 -testclientid1 127.0.0.1 -testclientid2 192.168.0.1/24 +"testclientid0". +{"testclientid1", "127.0.0.1"}. +{"testclientid2", "192.168.0.1/24"}. diff --git a/etc/emqttd.conf b/etc/emqttd.conf index 927bc1d78..6e195fb3b 100644 --- a/etc/emqttd.conf +++ b/etc/emqttd.conf @@ -6,7 +6,7 @@ %% %% {}: Tuple, usually {Key, Value} %% []: List, seperated by comma -%% %%: comment +%% %%: Comment %% %%=================================================================== @@ -31,11 +31,10 @@ {auth, anonymous, []}. %% Authentication with username, password -%% Passwd Hash: plain | md5 | sha | sha256 -{auth, username, [{passwd, "etc/passwd.conf"}, {passwd_hash, plain}]}. +{auth, username, [{passwd, "etc/passwd.conf"}]}. %% Authentication with clientId -{auth, clientid, [{config, "etc/client.config"}, {password, no}]}. +{auth, clientid, [{config, "etc/client.conf"}, {password, no}]}. %%-------------------------------------------------------------------- %% ACL @@ -45,6 +44,9 @@ {acl, internal, [{config, "etc/acl.conf"}, {nomatch, allow}]}. +%% Cache ACL result for PUBLISH +{cache_acl, true}. + %%-------------------------------------------------------------------- %% Broker %%-------------------------------------------------------------------- @@ -220,6 +222,9 @@ %% Plugins %%------------------------------------------------------------------- +%% Dir of plugins' config +{plugins_etc_dir, "etc/plugins/"}. + %% File to store loaded plugin names. {plugins_loaded_file, "data/loaded_plugins"}. diff --git a/etc/passwd.conf b/etc/passwd.conf index e6998746d..89ac8ffe2 100644 --- a/etc/passwd.conf +++ b/etc/passwd.conf @@ -1,2 +1,2 @@ -user1:passwd1 -user2:passwd2 +{"user1", "passwd1"}. +{"user2", "passwd2"}. diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index 47d6417d4..079bdbec3 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -23,7 +23,7 @@ -define(SERVER, ?MODULE). %% API Function Exports --export([start_link/0, start_link/1, +-export([start_link/0, auth/2, % authentication check_acl/3, % acl check reload_acl/0, % reload acl @@ -48,11 +48,8 @@ %% @doc Start access control server. -spec(start_link() -> {ok, pid()} | ignore | {error, any()}). -start_link() -> start_link(emqttd:env(access)). - --spec(start_link(Opts :: list()) -> {ok, pid()} | ignore | {error, any()}). -start_link(Opts) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %% @doc Authenticate MQTT Client. -spec(auth(Client :: mqtt_client(), Password :: password()) -> ok | {error, any()}). @@ -125,17 +122,14 @@ stop() -> gen_server:call(?MODULE, stop). %% gen_server callbacks %%-------------------------------------------------------------------- -init([Opts]) -> +init([]) -> ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]), - ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(auth, proplists:get_value(auth, Opts))}), - ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(acl, proplists:get_value(acl, Opts))}), + ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(gen_conf:list(emqttd, auth))}), + ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(gen_conf:list(emqttd, acl))}), {ok, #state{}}. -init_mods(auth, AuthMods) -> - [init_mod(authmod(Name), Opts) || {Name, Opts} <- AuthMods]; - -init_mods(acl, AclMods) -> - [init_mod(aclmod(Name), Opts) || {Name, Opts} <- AclMods]. +init_mods(Mods) -> + [init_mod(mod_name(Type, Name), Opts) || {Type, Name, Opts} <- Mods]. init_mod(Mod, Opts) -> {ok, State} = Mod:init(Opts), {Mod, State, 0}. @@ -191,15 +185,14 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- -authmod(Name) when is_atom(Name) -> - mod(emqttd_auth_, Name). - -aclmod(Name) when is_atom(Name) -> - mod(emqttd_acl_, Name). +mod_name(auth, Name) -> mod(emqttd_auth_, Name); +mod_name(acl, Name) -> mod(emqttd_acl_, Name). + mod(Prefix, Name) -> list_to_atom(lists:concat([Prefix, Name])). if_existed(false, Fun) -> Fun(); + if_existed(_Mod, _Fun) -> {error, already_existed}. diff --git a/src/emqttd_acl_anonymous.erl b/src/emqttd_acl_anonymous.erl new file mode 100644 index 000000000..ef80457fd --- /dev/null +++ b/src/emqttd_acl_anonymous.erl @@ -0,0 +1,35 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2012-2016 Feng Lee . +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqttd_acl_anonymous). + +-behaviour(emqttd_acl_mod). + +%% ACL callbacks +-export([init/1, check_acl/2, reload_acl/1, description/0]). + +init(Opts) -> + {ok, Opts}. + +check_acl(_Who, _State) -> + allow. + +reload_acl(_State) -> + ok. + +description() -> + "Anonymous ACL". + diff --git a/src/emqttd_acl_internal.erl b/src/emqttd_acl_internal.erl index eba5bd905..ff1efcd16 100644 --- a/src/emqttd_acl_internal.erl +++ b/src/emqttd_acl_internal.erl @@ -27,7 +27,7 @@ -define(ACL_RULE_TAB, mqtt_acl_rule). --record(state, {acl_file, nomatch = allow}). +-record(state, {config, nomatch = allow}). %%-------------------------------------------------------------------- %% API @@ -46,16 +46,20 @@ all_rules() -> %%-------------------------------------------------------------------- %% @doc Init internal ACL --spec(init(AclOpts :: list()) -> {ok, State :: any()}). -init(AclOpts) -> +-spec(init(Opts :: list()) -> {ok, State :: any()}). +init(Opts) -> ets:new(?ACL_RULE_TAB, [set, public, named_table, {read_concurrency, true}]), - AclFile = proplists:get_value(file, AclOpts), - Default = proplists:get_value(nomatch, AclOpts, allow), - State = #state{acl_file = AclFile, nomatch = Default}, - true = load_rules_from_file(State), - {ok, State}. + case proplists:get_value(config, Opts) of + undefined -> + {ok, #state{}}; + File -> + Default = proplists:get_value(nomatch, Opts, allow), + State = #state{config = File, nomatch = Default}, + true = load_rules_from_file(State), + {ok, State} + end. -load_rules_from_file(#state{acl_file = AclFile}) -> +load_rules_from_file(#state{config = AclFile}) -> {ok, Terms} = file:consult(AclFile), Rules = [emqttd_access_rule:compile(Term) || Term <- Terms], lists:foreach(fun(PubSub) -> @@ -83,6 +87,8 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) -> PubSub :: pubsub(), Topic :: binary(), State :: #state{}). +check_acl(_Who, #state{config = undefined}) -> + allow; check_acl({Client, PubSub, Topic}, #state{nomatch = Default}) -> case match(Client, Topic, lookup(PubSub)) of {matched, allow} -> allow; @@ -115,5 +121,6 @@ reload_acl(State) -> %% @doc ACL Module Description -spec(description() -> string()). -description() -> "Internal ACL with etc/acl.config". +description() -> + "Internal ACL with etc/acl.conf". diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 277d0fd4d..870a349a6 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -1,4 +1,4 @@ -%%-------------------------------------------------------------------- +%-------------------------------------------------------------------- %% Copyright (c) 2012-2016 Feng Lee . %% %% Licensed under the Apache License, Version 2.0 (the "License"); @@ -26,12 +26,8 @@ -export([start_listener/1, stop_listener/1, is_mod_enabled/1]). %% MQTT SockOpts --define(MQTT_SOCKOPTS, [ - binary, - {packet, raw}, - {reuseaddr, true}, - {backlog, 512}, - {nodelay, true}]). +-define(MQTT_SOCKOPTS, [binary, {packet, raw}, {reuseaddr, true}, + {backlog, 512}, {nodelay, true}]). -type listener() :: {atom(), esockd:listen_on(), [esockd:option()]}. @@ -102,17 +98,17 @@ start_servers(Sup) -> start_server(_Sup, {Name, F}) when is_function(F) -> ?PRINT("~s is starting...", [Name]), F(), - ?PRINT_MSG("[done]~n"); + ?PRINT_MSG("[ok]~n"); start_server(Sup, {Name, Server}) -> ?PRINT("~s is starting...", [Name]), start_child(Sup, Server), - ?PRINT_MSG("[done]~n"); + ?PRINT_MSG("[ok]~n"); start_server(Sup, {Name, Server, Opts}) -> ?PRINT("~s is starting...", [ Name]), start_child(Sup, Server, Opts), - ?PRINT_MSG("[done]~n"). + ?PRINT_MSG("[ok]~n"). start_child(Sup, {supervisor, Module}) -> supervisor:start_child(Sup, supervisor_spec(Module)); @@ -150,9 +146,9 @@ worker_spec(M, F, A) -> %% @doc load all modules load_all_mods() -> - lists:foreach(fun load_mod/1, emqttd:env(modules)). + lists:foreach(fun load_mod/1, gen_conf:list(emqttd, module)). -load_mod({Name, Opts}) -> +load_mod({module, Name, Opts}) -> Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)), case catch Mod:load(Opts) of ok -> lager:info("Load module ~s successfully", [Name]); @@ -162,7 +158,8 @@ load_mod({Name, Opts}) -> %% @doc Is module enabled? -spec(is_mod_enabled(Name :: atom()) -> boolean()). -is_mod_enabled(Name) -> emqttd:env(modules, Name) =/= undefined. +is_mod_enabled(Name) -> + lists:keyfind(Name, 2, gen_conf:list(emqttd, module)). %%-------------------------------------------------------------------- %% Start Listeners @@ -170,25 +167,27 @@ is_mod_enabled(Name) -> emqttd:env(modules, Name) =/= undefined. %% @doc Start Listeners of the broker. -spec(start_listeners() -> any()). -start_listeners() -> lists:foreach(fun start_listener/1, emqttd:env(listeners)). +start_listeners() -> lists:foreach(fun start_listener/1, gen_conf:list(emqttd, listener)). %% Start mqtt listener -spec(start_listener(listener()) -> any()). -start_listener({mqtt, ListenOn, Opts}) -> start_listener(mqtt, ListenOn, Opts); +start_listener({listener, mqtt, ListenOn, Opts}) -> + start_listener(mqtt, ListenOn, Opts); %% Start mqtt(SSL) listener -start_listener({mqtts, ListenOn, Opts}) -> start_listener(mqtts, ListenOn, Opts); +start_listener({listener, mqtts, ListenOn, Opts}) -> + start_listener(mqtts, ListenOn, Opts); %% Start http listener -start_listener({http, ListenOn, Opts}) -> +start_listener({listener, http, ListenOn, Opts}) -> mochiweb:start_http(http, ListenOn, Opts, {emqttd_http, handle_request, []}); %% Start https listener -start_listener({https, ListenOn, Opts}) -> +start_listener({listener, https, ListenOn, Opts}) -> mochiweb:start_http(https, ListenOn, Opts, {emqttd_http, handle_request, []}). start_listener(Protocol, ListenOn, Opts) -> - MFArgs = {emqttd_client, start_link, [emqttd:env(mqtt)]}, + MFArgs = {emqttd_client, start_link, [emqttd_conf:mqtt()]}, {ok, _} = esockd:open(Protocol, ListenOn, merge_sockopts(Opts), MFArgs). merge_sockopts(Options) -> @@ -201,8 +200,8 @@ merge_sockopts(Options) -> %%-------------------------------------------------------------------- %% @doc Stop Listeners -stop_listeners() -> lists:foreach(fun stop_listener/1, emqttd:env(listeners)). +stop_listeners() -> lists:foreach(fun stop_listener/1, gen_conf:list(listener)). %% @private -stop_listener({Protocol, ListenOn, _Opts}) -> esockd:close(Protocol, ListenOn). +stop_listener({listener, Protocol, ListenOn, _Opts}) -> esockd:close(Protocol, ListenOn). diff --git a/src/emqttd_auth_clientid.erl b/src/emqttd_auth_clientid.erl index 35b71035b..15a751ea8 100644 --- a/src/emqttd_auth_clientid.erl +++ b/src/emqttd_auth_clientid.erl @@ -69,7 +69,8 @@ init(Opts) -> {ram_copies, [node()]}, {attributes, record_info(fields, ?AUTH_CLIENTID_TAB)}]), mnesia:add_table_copy(?AUTH_CLIENTID_TAB, node(), ram_copies), - load(proplists:get_value(file, Opts)), + Clients = load_client_from(proplists:get_value(config, Opts)), + mnesia:transaction(fun() -> [mnesia:write(C) || C<- Clients] end), {ok, Opts}. check(#mqtt_client{client_id = undefined}, _Password, _Opts) -> @@ -93,32 +94,19 @@ description() -> "ClientId authentication module". %% Internal functions %%-------------------------------------------------------------------- -load(undefined) -> +load_client_from(undefined) -> ok; -load(File) -> - {ok, Fd} = file:open(File, [read]), - load(Fd, file:read_line(Fd), []). +load_client_from(File) -> + {ok, Clients} = file:consult(File), + [client(Client) || Client <- Clients]. -load(Fd, {ok, Line}, Clients) when is_list(Line) -> - Clients1 = - case string:tokens(Line, " ") of - [ClientIdS] -> - ClientId = list_to_binary(string:strip(ClientIdS, right, $\n)), - [#mqtt_auth_clientid{client_id = ClientId} | Clients]; - [ClientId, IpAddr0] -> - IpAddr = string:strip(IpAddr0, right, $\n), - [#mqtt_auth_clientid{client_id = list_to_binary(ClientId), - ipaddr = esockd_cidr:parse(IpAddr, true)} | Clients]; - BadLine -> - lager:error("BadLine in clients.config: ~s", [BadLine]), - Clients - end, - load(Fd, file:read_line(Fd), Clients1); +client(ClientId) when is_list(ClientId) -> + #mqtt_auth_clientid{client_id = list_to_binary(ClientId)}; -load(Fd, eof, Clients) -> - mnesia:transaction(fun() -> [mnesia:write(C) || C<- Clients] end), - file:close(Fd). +client({ClientId, IpAddr}) when is_list(ClientId) -> + #mqtt_auth_clientid{client_id = iolist_to_binary(ClientId), + ipaddr = esockd_cidr:parse(IpAddr, true)}. check_clientid_only(ClientId, IpAddr) -> case mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId) of diff --git a/src/emqttd_auth_username.erl b/src/emqttd_auth_username.erl index 3c59b7c20..6a0b1d17f 100644 --- a/src/emqttd_auth_username.erl +++ b/src/emqttd_auth_username.erl @@ -68,7 +68,7 @@ if_enabled(Fun) -> end. hint() -> - ?PRINT_MSG("Please enable '{username, []}' authentication in etc/emqttd.config first.~n"). + ?PRINT_MSG("Please enable '{auth, username, []}' in etc/emqttd.conf first.~n"). %%-------------------------------------------------------------------- %% API @@ -81,7 +81,13 @@ is_enabled() -> -spec(add_user(binary(), binary()) -> ok | {error, any()}). add_user(Username, Password) -> User = #?AUTH_USERNAME_TAB{username = Username, password = hash(Password)}, - ret(mnesia:transaction(fun mnesia:write/1, [User])). + ret(mnesia:transaction(fun insert_user/1, [User])). + +insert_user(User = #?AUTH_USERNAME_TAB{username = Username}) -> + case mnesia:read(?AUTH_USERNAME_TAB, Username) of + [] -> mnesia:write(User); + [_|_] -> mnesia:abort(existed) + end. add_default_user(Username, Password) when is_atom(Username) -> add_default_user(atom_to_list(Username), Password); @@ -110,16 +116,20 @@ all_users() -> mnesia:dirty_all_keys(?AUTH_USERNAME_TAB). %% emqttd_auth_mod callbacks %%-------------------------------------------------------------------- -init(DefautUsers) -> +init(Opts) -> mnesia:create_table(?AUTH_USERNAME_TAB, [ {disc_copies, [node()]}, {attributes, record_info(fields, ?AUTH_USERNAME_TAB)}]), mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), disc_copies), - lists:foreach(fun({Username, Password}) -> - add_default_user(Username, Password) - end, DefautUsers), + case proplists:get_value(passwd, Opts) of + undefined -> ok; + File -> {ok, DefaultUsers} = file:consult(File), + lists:foreach(fun({Username, Password}) -> + add_default_user(Username, Password) + end, DefaultUsers) + end, emqttd_ctl:register_cmd(users, {?MODULE, cli}, []), - {ok, []}. + {ok, Opts}. check(#mqtt_client{username = undefined}, _Password, _Opts) -> {error, username_undefined}; @@ -127,7 +137,7 @@ check(_User, undefined, _Opts) -> {error, password_undefined}; check(#mqtt_client{username = Username}, Password, _Opts) -> case mnesia:dirty_read(?AUTH_USERNAME_TAB, Username) of - [] -> + [] -> {error, username_not_found}; [#?AUTH_USERNAME_TAB{password = <>}] -> case Hash =:= md5_hash(Salt, Password) of diff --git a/src/emqttd_bridge_sup.erl b/src/emqttd_bridge_sup.erl index 2d7da927c..dca66a8b6 100644 --- a/src/emqttd_bridge_sup.erl +++ b/src/emqttd_bridge_sup.erl @@ -47,7 +47,7 @@ start_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) -> start_bridge(Node, _Topic, _Options) when Node =:= node() -> {error, bridge_to_self}; start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) -> - Options1 = emqttd_opts:merge(emqttd_broker:env(bridge), Options), + Options1 = emqttd_opts:merge(emqttd_conf:bridge(), Options), supervisor:start_child(?MODULE, bridge_spec(Node, Topic, Options1)). %% @doc Stop a bridge diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index d95800954..0bc27afa1 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -29,7 +29,7 @@ -export([subscribe/1, notify/2]). %% Broker API --export([env/1, version/0, uptime/0, datetime/0, sysdescr/0]). +-export([version/0, uptime/0, datetime/0, sysdescr/0]). %% Tick API -export([start_tick/1, stop_tick/1]). @@ -71,10 +71,6 @@ subscribe(EventType) -> notify(EventType, Event) -> gproc:send({p, l, {broker, EventType}}, {notify, EventType, self(), Event}). -%% @doc Get broker env -env(Name) -> - proplists:get_value(Name, emqttd:env(broker)). - %% @doc Get broker version -spec(version() -> string()). version() -> @@ -99,7 +95,7 @@ datetime() -> %% @doc Start a tick timer start_tick(Msg) -> - start_tick(timer:seconds(env(sys_interval)), Msg). + start_tick(timer:seconds(emqttd:conf(broker_sys_interval, 60)), Msg). start_tick(0, _Msg) -> undefined; diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index 6a3bee488..abca547c7 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -94,9 +94,8 @@ init([OriginConn, MqttEnv]) -> error:Error -> Self ! {shutdown, Error} end end, - PktOpts = proplists:get_value(packet, MqttEnv), - ParserFun = emqttd_parser:new(PktOpts), - ProtoState = emqttd_protocol:init(PeerName, SendFun, PktOpts), + ParserFun = emqttd_parser:new(MqttEnv), + ProtoState = emqttd_protocol:init(PeerName, SendFun, MqttEnv), RateLimit = proplists:get_value(rate_limit, Connection:opts()), State = run_socket(#client_state{connection = Connection, connname = ConnName, @@ -108,9 +107,8 @@ init([OriginConn, MqttEnv]) -> rate_limit = RateLimit, parser_fun = ParserFun, proto_state = ProtoState, - packet_opts = PktOpts}), - ClientOpts = proplists:get_value(client, MqttEnv), - IdleTimout = proplists:get_value(idle_timeout, ClientOpts, 10), + packet_opts = MqttEnv}), + IdleTimout = proplists:get_value(client_idle_timeout, MqttEnv, 30), gen_server:enter_loop(?MODULE, [], State, timer:seconds(IdleTimout)). handle_call(session, _From, State = #client_state{proto_state = ProtoState}) -> diff --git a/src/emqttd_conf.erl b/src/emqttd_conf.erl new file mode 100644 index 000000000..c4a78fd9b --- /dev/null +++ b/src/emqttd_conf.erl @@ -0,0 +1,99 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2012-2016 Feng Lee . +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqttd_conf). + +-export([mqtt/0, retained/0, session/0, queue/0, bridge/0, pubsub/0]). + +mqtt() -> + [ + %% Max ClientId Length Allowed. + {max_clientid_len, emqttd:conf(mqtt_max_clientid_len, 512)}, + %% Max Packet Size Allowed, 64K by default. + {max_packet_size, emqttd:conf(mqtt_max_packet_size, 65536)}, + %% Client Idle Timeout. + {client_idle_timeout, emqttd:conf(mqtt_client_idle_timeout, 30)} + ]. + +retained() -> + [ + %% Expired after seconds, never expired if 0 + {expired_after, emqttd:conf(retained_expired_after, 0)}, + %% Max number of retained messages + {max_message_num, emqttd:conf(retained_max_message_num, 100000)}, + %% Max Payload Size of retained message + {max_playload_size, emqttd:conf(retained_max_playload_size, 65536)} + ]. + +session() -> + [ + %% Max number of QoS 1 and 2 messages that can be “inflight” at one time. + %% 0 means no limit + {max_inflight, emqttd:conf(session_max_inflight, 100)}, + + %% Retry interval for redelivering QoS1/2 messages. + {unack_retry_interval, emqttd:conf(session_unack_retry_interval, 60)}, + + %% Awaiting PUBREL Timeout + {await_rel_timeout, emqttd:conf(session_await_rel_timeout, 20)}, + + %% Max Packets that Awaiting PUBREL, 0 means no limit + {max_awaiting_rel, emqttd:conf(session_max_awaiting_rel, 0)}, + + %% Statistics Collection Interval(seconds) + {collect_interval, emqttd:conf(session_collect_interval, 0)}, + + %% Expired after 2 day (unit: minute) + {expired_after, emqttd:conf(session_expired_after, 2880)} + ]. + +queue() -> + [ + %% Type: simple | priority + {type, emqttd:conf(queue_type, simple)}, + + %% Topic Priority: 0~255, Default is 0 + {priority, emqttd:conf(queue_priority, [])}, + + %% Max queue length. Enqueued messages when persistent client disconnected, + %% or inflight window is full. + {max_length, emqttd:conf(queue_max_length, infinity)}, + + %% Low-water mark of queued messages + {low_watermark, emqttd:conf(queue_low_watermark, 0.2)}, + + %% High-water mark of queued messages + {high_watermark, emqttd:conf(queue_high_watermark, 0.6)}, + + %% Queue Qos0 messages? + {queue_qos0, emqttd:conf(queue_qos0, true)} + ]. + +bridge() -> + [ + %% TODO: Bridge Queue Size + {max_queue_len, emqttd:conf(bridge_max_queue_len, 10000)}, + + %% Ping Interval of bridge node + {ping_down_interval, emqttd:conf(bridge_ping_down_interval, 1)} + ]. + +pubsub() -> + [ + %% PubSub and Router. Default should be scheduler numbers. + {pool_size, emqttd:conf(pubsub_pool_size, 8)} + ]. + diff --git a/src/emqttd_mod_rewrite.erl b/src/emqttd_mod_rewrite.erl index 9109d2155..d23654bdc 100644 --- a/src/emqttd_mod_rewrite.erl +++ b/src/emqttd_mod_rewrite.erl @@ -30,12 +30,15 @@ %%-------------------------------------------------------------------- load(Opts) -> - File = proplists:get_value(file, Opts), - {ok, Terms} = file:consult(File), - Sections = compile(Terms), - emqttd:hook('client.subscribe', fun ?MODULE:rewrite_subscribe/3, [Sections]), - emqttd:hook('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/3, [Sections]), - emqttd:hook('message.publish', fun ?MODULE:rewrite_publish/2, [Sections]). + case proplists:get_value(config, Opts) of + undefined -> + ok; + File -> + {ok, Terms} = file:consult(File), Sections = compile(Terms), + emqttd:hook('client.subscribe', fun ?MODULE:rewrite_subscribe/3, [Sections]), + emqttd:hook('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/3, [Sections]), + emqttd:hook('message.publish', fun ?MODULE:rewrite_publish/2, [Sections]) + end. rewrite_subscribe(_ClientId, TopicTable, Sections) -> lager:info("Rewrite subscribe: ~p", [TopicTable]), diff --git a/src/emqttd_pubsub_sup.erl b/src/emqttd_pubsub_sup.erl index 28ff8033b..1677cec8d 100644 --- a/src/emqttd_pubsub_sup.erl +++ b/src/emqttd_pubsub_sup.erl @@ -30,10 +30,10 @@ -export([init/1]). start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_broker:env(pubsub)]). + supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_conf:pubsub()]). pubsub_pool() -> - hd([Pid|| {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]). + hd([Pid || {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]). init([Env]) -> diff --git a/src/emqttd_retainer.erl b/src/emqttd_retainer.erl index e0bb631ba..0b239bec5 100644 --- a/src/emqttd_retainer.erl +++ b/src/emqttd_retainer.erl @@ -71,7 +71,7 @@ limit(payload) -> env(max_playload_size). env(Key) -> case get({retained, Key}) of undefined -> - Env = emqttd_broker:env(retained), + Env = emqttd_conf:retained(), Val = proplists:get_value(Key, Env), put({retained, Key}, Val), Val; Val -> diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index ac7af465b..6e0586b94 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -214,8 +214,7 @@ unsubscribe(SessPid, Topics) -> init([CleanSess, ClientId, ClientPid]) -> process_flag(trap_exit, true), true = link(ClientPid), - QEnv = emqttd:env(mqtt, queue), - SessEnv = emqttd:env(mqtt, session), + SessEnv = emqttd_conf:session(), Session = #session{ clean_sess = CleanSess, client_id = ClientId, @@ -223,7 +222,7 @@ init([CleanSess, ClientId, ClientPid]) -> subscriptions = dict:new(), inflight_queue = [], max_inflight = get_value(max_inflight, SessEnv, 0), - message_queue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()), + message_queue = emqttd_mqueue:new(ClientId, emqttd_conf:queue(), emqttd_alarm:alarm_fun()), awaiting_rel = #{}, awaiting_ack = #{}, awaiting_comp = #{}, @@ -234,7 +233,7 @@ init([CleanSess, ClientId, ClientPid]) -> collect_interval = get_value(collect_interval, SessEnv, 0), timestamp = os:timestamp()}, emqttd_sm:register_session(CleanSess, ClientId, sess_info(Session)), - %% start statistics + %% Start statistics {ok, start_collector(Session), hibernate}. prioritise_call(Msg, _From, _Len, _State) -> diff --git a/src/emqttd_sysmon_sup.erl b/src/emqttd_sysmon_sup.erl index 883dab20e..3ed8e36a5 100644 --- a/src/emqttd_sysmon_sup.erl +++ b/src/emqttd_sysmon_sup.erl @@ -33,12 +33,10 @@ init([]) -> {ok, {{one_for_one, 10, 100}, [Sysmon]}}. opts() -> - Opts = [{long_gc, config(sysmon_long_gc)}, - {long_schedule, config(sysmon_long_schedule)}, - {large_heap, config(sysmon_large_heap)}, - {busy_port, config(busy_port)}, - {busy_dist_port, config(sysmon_busy_dist_port)}], + Opts = [{long_gc, emqttd:conf(sysmon_long_gc)}, + {long_schedule, emqttd:conf(sysmon_long_schedule)}, + {large_heap, emqttd:conf(sysmon_large_heap)}, + {busy_port, emqttd:conf(busy_port)}, + {busy_dist_port, emqttd:conf(sysmon_busy_dist_port)}], [{Key, Val} || {Key, {ok, Val}} <- Opts]. -config(Key) -> gen_conf:value(emqttd, Key). - diff --git a/src/emqttd_ws.erl b/src/emqttd_ws.erl index 60371457f..56a4d92d9 100644 --- a/src/emqttd_ws.erl +++ b/src/emqttd_ws.erl @@ -31,7 +31,7 @@ %% @doc Handle WebSocket Request. handle_request(Req) -> Peer = Req:get(peer), - PktOpts = emqttd:env(mqtt, packet), + PktOpts = emqttd_conf:mqtt(), ParserFun = emqttd_parser:new(PktOpts), {ReentryWs, ReplyChannel} = upgrade(Req), {ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel), diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index ff4c16e79..6803c668c 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -66,17 +66,15 @@ init([MqttEnv, WsPid, Req, ReplyChannel]) -> {ok, Peername} = Req:get(peername), Headers = mochiweb_headers:to_list( mochiweb_request:get(headers, Req)), - PktOpts = proplists:get_value(packet, MqttEnv), SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end, ProtoState = emqttd_protocol:init(Peername, SendFun, - [{ws_initial_headers, Headers} | PktOpts]), + [{ws_initial_headers, Headers} | MqttEnv]), {ok, #wsclient_state{ws_pid = WsPid, peer = Req:get(peer), connection = Req:get(connection), proto_state = ProtoState}, idle_timeout(MqttEnv)}. idle_timeout(MqttEnv) -> - ClientOpts = proplists:get_value(client, MqttEnv), - timer:seconds(proplists:get_value(idle_timeout, ClientOpts, 10)). + timer:seconds(proplists:get_value(client_idle_timeout, MqttEnv, 10)). handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) -> {reply, emqttd_protocol:session(ProtoState), State}; diff --git a/src/emqttd_ws_client_sup.erl b/src/emqttd_ws_client_sup.erl index 3577527a6..33983fd8c 100644 --- a/src/emqttd_ws_client_sup.erl +++ b/src/emqttd_ws_client_sup.erl @@ -27,7 +27,7 @@ %% @doc Start websocket client supervisor -spec(start_link() -> {ok, pid()}). start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd:env(mqtt)]). + supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_conf:mqtt()]). %% @doc Start a WebSocket Client -spec(start_client(pid(), mochiweb_request:request(), fun()) -> {ok, pid()}).