diff --git a/.gitmodules b/.gitmodules index dc7660343..4d56b5203 100644 --- a/.gitmodules +++ b/.gitmodules @@ -25,3 +25,9 @@ [submodule "plugins/emqttd_reloader"] path = plugins/emqttd_reloader url = https://github.com/emqtt/emqttd_reloader.git +[submodule "plugins/emqttd_plugin_mongo"] + path = plugins/emqttd_plugin_mongo + url = https://github.com/emqtt/emqttd_plugin_mongo.git +[submodule "plugins/emqttd_auth_http"] + path = plugins/emqttd_auth_http + url = https://github.com/emqtt/emqttd_auth_http.git diff --git a/plugins/emqttd_auth_http b/plugins/emqttd_auth_http new file mode 160000 index 000000000..4711caa70 --- /dev/null +++ b/plugins/emqttd_auth_http @@ -0,0 +1 @@ +Subproject commit 4711caa703f84775ff0a1256587788430494eecd diff --git a/plugins/emqttd_dashboard b/plugins/emqttd_dashboard index db9ed84da..f841ab9b4 160000 --- a/plugins/emqttd_dashboard +++ b/plugins/emqttd_dashboard @@ -1 +1 @@ -Subproject commit db9ed84da2c5f578f5762a6a69f715b7edafa02f +Subproject commit f841ab9b4ca40f47575b5f98c6707c6598ef0f91 diff --git a/plugins/emqttd_plugin_mongo b/plugins/emqttd_plugin_mongo new file mode 160000 index 000000000..168a70890 --- /dev/null +++ b/plugins/emqttd_plugin_mongo @@ -0,0 +1 @@ +Subproject commit 168a70890b6c204e0629fc915a6da4b0247c326e diff --git a/plugins/emqttd_plugin_mysql b/plugins/emqttd_plugin_mysql index dfc35e696..ff967b404 160000 --- a/plugins/emqttd_plugin_mysql +++ b/plugins/emqttd_plugin_mysql @@ -1 +1 @@ -Subproject commit dfc35e6960a5ddbba8619cf92e72bf14dda4a54c +Subproject commit ff967b4046ee7d4e568ebbdb248289bd3724b1f6 diff --git a/plugins/emqttd_plugin_pgsql b/plugins/emqttd_plugin_pgsql index c81f66331..6ec5fb063 160000 --- a/plugins/emqttd_plugin_pgsql +++ b/plugins/emqttd_plugin_pgsql @@ -1 +1 @@ -Subproject commit c81f663312ae371808fe10a5997ea78c3033eae7 +Subproject commit 6ec5fb063f5070766f6a1f911b3c5771890bb878 diff --git a/plugins/emqttd_plugin_redis b/plugins/emqttd_plugin_redis index fa2b98ffa..fc342cc91 160000 --- a/plugins/emqttd_plugin_redis +++ b/plugins/emqttd_plugin_redis @@ -1 +1 @@ -Subproject commit fa2b98ffa808242850fe118660ab77be3b6ea3ba +Subproject commit fc342cc9164c91f6a64e7de2f9f1b0604d767159 diff --git a/plugins/emqttd_plugin_template b/plugins/emqttd_plugin_template index 653e37d8e..418bb1d43 160000 --- a/plugins/emqttd_plugin_template +++ b/plugins/emqttd_plugin_template @@ -1 +1 @@ -Subproject commit 653e37d8e472b121c454fa2acd51f696ff144bd5 +Subproject commit 418bb1d4385a0fd21a57918c2b2b4fdcc9751c1c diff --git a/plugins/emqttd_recon b/plugins/emqttd_recon index eb4a03d90..f946d5ff1 160000 --- a/plugins/emqttd_recon +++ b/plugins/emqttd_recon @@ -1 +1 @@ -Subproject commit eb4a03d90f932839bdad9aa5703de7731169ce66 +Subproject commit f946d5ff188a0668734fdc850149bb3d5cf1e798 diff --git a/plugins/emqttd_reloader b/plugins/emqttd_reloader index 4577550a3..1cc503573 160000 --- a/plugins/emqttd_reloader +++ b/plugins/emqttd_reloader @@ -1 +1 @@ -Subproject commit 4577550a309dad9907205d9cb4fe1277d75ec142 +Subproject commit 1cc503573439da213b435bc1c5d2dc7eb3891052 diff --git a/plugins/emqttd_stomp b/plugins/emqttd_stomp index 39650c468..4bd649c13 160000 --- a/plugins/emqttd_stomp +++ b/plugins/emqttd_stomp @@ -1 +1 @@ -Subproject commit 39650c4685a38cecbdac785c3aafcd4c669aa301 +Subproject commit 4bd649c137defacacbaba685c3adf042e3f70700 diff --git a/rebar.config b/rebar.config index f41d40129..cc8ab8f54 100644 --- a/rebar.config +++ b/rebar.config @@ -41,8 +41,8 @@ {deps, [ {gproc, ".*", {git, "git://github.com/uwiger/gproc.git", {branch, "master"}}}, {lager, ".*", {git, "git://github.com/basho/lager.git", {branch, "master"}}}, - {esockd, ".*", {git, "git://github.com/emqtt/esockd.git", {tag, "3.2"}}}, - {mochiweb, "4.*", {git, "git://github.com/emqtt/mochiweb.git", {tag, "4.0.1"}}} + {esockd, ".*", {git, "git://github.com/emqtt/esockd.git", {tag, "4.0"}}}, + {mochiweb, "4.*", {git, "git://github.com/emqtt/mochiweb.git", {tag, "4.1"}}} ]}. {recursive_cmds, [ct, eunit, clean]}. diff --git a/rel/files/emqttd.config.production b/rel/files/emqttd.config.production index df0a0da6e..2748856bb 100644 --- a/rel/files/emqttd.config.production +++ b/rel/files/emqttd.config.production @@ -16,7 +16,7 @@ {error_logger_redirect, false}, {crash_log, "log/emqttd_crash.log"}, {handlers, [ - %%{lager_console_backend, info}, + {lager_console_backend, error}, %%NOTICE: Level >= error %%{lager_emqtt_backend, error}, {lager_file_backend, [ diff --git a/rel/reltool.config b/rel/reltool.config index 411543559..c79fa74cd 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -10,21 +10,22 @@ stdlib, sasl, asn1, - syntax_tools, - ssl, - crypto, + syntax_tools, + ssl, + crypto, eldap, xmerl, - os_mon, - inets, - goldrush, + os_mon, + inets, + goldrush, compiler, runtime_tools, - lager, + {observer, load}, + lager, gen_logger, gproc, - esockd, - mochiweb, + esockd, + mochiweb, emqttd ]}, {rel, "start_clean", "", @@ -56,6 +57,7 @@ {app, inets, [{incl_cond, include}]}, {app, compiler, [{incl_cond, include}]}, {app, runtime_tools, [{incl_cond, include}]}, + {app, observer, [{incl_cond, include}]}, {app, goldrush, [{incl_cond, include}]}, {app, gen_logger, [{incl_cond, include}]}, {app, lager, [{incl_cond, include}]}, diff --git a/src/emqttd.app.src b/src/emqttd.app.src index 647c7d49d..5485a8d46 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -1,7 +1,7 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, - {vsn, "1.0.2"}, + {vsn, "1.1"}, {id, "emqttd"}, {modules, []}, {registered, []}, diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index 8fa96a81f..47d6417d4 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -89,7 +89,7 @@ check_acl(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) -> end. %% @doc Reload ACL Rules. --spec(reload_acl() -> list(ok | {error, any()})). +-spec(reload_acl() -> list(ok | {error, already_existed})). reload_acl() -> [Mod:reload_acl(State) || {Mod, State, _Seq} <- lookup_mods(acl)]. @@ -201,5 +201,5 @@ mod(Prefix, Name) -> list_to_atom(lists:concat([Prefix, Name])). if_existed(false, Fun) -> Fun(); -if_existed(true, _Fun) -> {error, existed}. +if_existed(_Mod, _Fun) -> {error, already_existed}. diff --git a/src/emqttd_access_rule.erl b/src/emqttd_access_rule.erl index 0c632de9b..6b0e08d75 100644 --- a/src/emqttd_access_rule.erl +++ b/src/emqttd_access_rule.erl @@ -19,7 +19,7 @@ -include("emqttd.hrl"). -type who() :: all | binary() | - {ipaddr, esockd_access:cidr()} | + {ipaddr, esockd_cidr:cidr_string()} | {client, binary()} | {user, binary()}. @@ -51,8 +51,7 @@ compile({A, Who, Access, TopicFilters}) when ?ALLOW_DENY(A) -> compile(who, all) -> all; compile(who, {ipaddr, CIDR}) -> - {Start, End} = esockd_access:range(CIDR), - {ipaddr, {CIDR, Start, End}}; + {ipaddr, esockd_cidr:parse(CIDR, true)}; compile(who, {client, all}) -> {client, all}; compile(who, {client, ClientId}) -> @@ -107,9 +106,8 @@ match_who(#mqtt_client{username = Username}, {user, Username}) -> true; match_who(#mqtt_client{peername = undefined}, {ipaddr, _Tup}) -> false; -match_who(#mqtt_client{peername = {IP, _}}, {ipaddr, {_CDIR, Start, End}}) -> - I = esockd_access:atoi(IP), - I >= Start andalso I =< End; +match_who(#mqtt_client{peername = {IP, _}}, {ipaddr, CIDR}) -> + esockd_cidr:match(IP, CIDR); match_who(Client, {'and', Conds}) when is_list(Conds) -> lists:foldl(fun(Who, Allow) -> match_who(Client, Who) andalso Allow diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 0086e06f0..42cc35daa 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -33,7 +33,7 @@ {backlog, 512}, {nodelay, true}]). --type listener() :: {atom(), inet:port_number(), [esockd:option()]}. +-type listener() :: {atom(), esockd:listen_on(), [esockd:option()]}. %%-------------------------------------------------------------------- %% Application callbacks @@ -172,22 +172,22 @@ start_listeners() -> lists:foreach(fun start_listener/1, emqttd:env(listeners)). %% Start mqtt listener -spec(start_listener(listener()) -> any()). -start_listener({mqtt, Port, Opts}) -> start_listener(mqtt, Port, Opts); +start_listener({mqtt, ListenOn, Opts}) -> start_listener(mqtt, ListenOn, Opts); %% Start mqtt(SSL) listener -start_listener({mqtts, Port, Opts}) -> start_listener(mqtts, Port, Opts); +start_listener({mqtts, ListenOn, Opts}) -> start_listener(mqtts, ListenOn, Opts); %% Start http listener -start_listener({http, Port, Opts}) -> - mochiweb:start_http(Port, Opts, {emqttd_http, handle_request, []}); +start_listener({http, ListenOn, Opts}) -> + mochiweb:start_http(http, ListenOn, Opts, {emqttd_http, handle_request, []}); %% Start https listener -start_listener({https, Port, Opts}) -> - mochiweb:start_http(Port, Opts, {emqttd_http, handle_request, []}). +start_listener({https, ListenOn, Opts}) -> + mochiweb:start_http(https, ListenOn, Opts, {emqttd_http, handle_request, []}). -start_listener(Protocol, Port, Opts) -> +start_listener(Protocol, ListenOn, Opts) -> MFArgs = {emqttd_client, start_link, [emqttd:env(mqtt)]}, - esockd:open(Protocol, Port, merge_sockopts(Opts), MFArgs). + esockd:open(Protocol, ListenOn, merge_sockopts(Opts), MFArgs). merge_sockopts(Options) -> SockOpts = emqttd_opts:merge(?MQTT_SOCKOPTS, @@ -201,5 +201,6 @@ merge_sockopts(Options) -> %% @doc Stop Listeners stop_listeners() -> lists:foreach(fun stop_listener/1, emqttd:env(listeners)). -stop_listener({Protocol, Port, _Opts}) -> esockd:close({Protocol, Port}). +%% @private +stop_listener({Protocol, ListenOn, _Opts}) -> esockd:close(Protocol, ListenOn). diff --git a/src/emqttd_auth_clientid.erl b/src/emqttd_auth_clientid.erl index 45b81c0f6..35b71035b 100644 --- a/src/emqttd_auth_clientid.erl +++ b/src/emqttd_auth_clientid.erl @@ -108,9 +108,8 @@ load(Fd, {ok, Line}, Clients) when is_list(Line) -> [#mqtt_auth_clientid{client_id = ClientId} | Clients]; [ClientId, IpAddr0] -> IpAddr = string:strip(IpAddr0, right, $\n), - Range = esockd_access:range(IpAddr), [#mqtt_auth_clientid{client_id = list_to_binary(ClientId), - ipaddr = {IpAddr, Range}}|Clients]; + ipaddr = esockd_cidr:parse(IpAddr, true)} | Clients]; BadLine -> lager:error("BadLine in clients.config: ~s", [BadLine]), Clients @@ -123,11 +122,12 @@ load(Fd, eof, Clients) -> check_clientid_only(ClientId, IpAddr) -> case mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId) of - [] -> {error, clientid_not_found}; - [#?AUTH_CLIENTID_TAB{ipaddr = undefined}] -> ok; - [#?AUTH_CLIENTID_TAB{ipaddr = {_, {Start, End}}}] -> - I = esockd_access:atoi(IpAddr), - case I >= Start andalso I =< End of + [] -> + {error, clientid_not_found}; + [#?AUTH_CLIENTID_TAB{ipaddr = undefined}] -> + ok; + [#?AUTH_CLIENTID_TAB{ipaddr = CIDR}] -> + case esockd_cidr:match(IpAddr, CIDR) of true -> ok; false -> {error, wrong_ipaddr} end diff --git a/src/emqttd_base62.erl b/src/emqttd_base62.erl new file mode 100644 index 000000000..1e9d0a1a2 --- /dev/null +++ b/src/emqttd_base62.erl @@ -0,0 +1,58 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2016 Feng Lee . +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqttd_base62). + +-export([encode/1, decode/1]). + +%% @doc Encode an integer to base62 string +-spec(encode(non_neg_integer()) -> binary()). +encode(I) when is_integer(I) andalso I > 0 -> + list_to_binary(encode(I, [])). + +encode(I, Acc) when I < 62 -> + [char(I) | Acc]; +encode(I, Acc) -> + encode(I div 62, [char(I rem 62) | Acc]). + +char(I) when I < 10 -> + $0 + I; + +char(I) when I < 36 -> + $A + I - 10; + +char(I) when I < 62 -> + $a + I - 36. + +%% @doc Decode base62 string to an integer +-spec(decode(string() | binary()) -> integer()). +decode(B) when is_binary(B) -> + decode(binary_to_list(B)); +decode(S) when is_list(S) -> + decode(S, 0). + +decode([], I) -> + I; +decode([C|S], I) -> + decode(S, I * 62 + byte(C)). + +byte(C) when $0 =< C andalso C =< $9 -> + C - $0; +byte(C) when $A =< C andalso C =< $Z -> + C - $A + 10; +byte(C) when $a =< C andalso C =< $z -> + C - $a + 36. + diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index bcc68e5cf..a64f086ed 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -453,12 +453,12 @@ trace_off(Who, Name) -> %%-------------------------------------------------------------------- %% @doc Listeners Command listeners([]) -> - foreach(fun({{Protocol, Port}, Pid}) -> + foreach(fun({{Protocol, ListenOn}, Pid}) -> Info = [{acceptors, esockd:get_acceptors(Pid)}, {max_clients, esockd:get_max_clients(Pid)}, {current_clients,esockd:get_current_clients(Pid)}, {shutdown_count, esockd:get_shutdown_count(Pid)}], - ?PRINT("listener on ~s:~w~n", [Protocol, Port]), + ?PRINT("listener on ~s:~s~n", [Protocol, esockd:to_string(ListenOn)]), foreach(fun({Key, Val}) -> ?PRINT(" ~-16s: ~w~n", [Key, Val]) end, Info) diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index e3e893a58..05aed7b42 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -117,8 +117,7 @@ call(SM, Req) -> init([Pool, Id]) -> ?GPROC_POOL(join, Pool, Id), - {ok, #state{pool = Pool, id = Id, - monitors = dict:new()}}. + {ok, #state{pool = Pool, id = Id, monitors = dict:new()}}. prioritise_call(_Msg, _From, _Len, _State) -> 1. @@ -175,7 +174,7 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> [_Sess] -> ok end end), - {noreply, erase_monitor(MRef, State)}; + {noreply, erase_monitor(MRef, State), hibernate}; error -> lager:error("MRef of session ~p not found", [DownPid]), {noreply, State} diff --git a/test/emqttd_access_SUITE.erl b/test/emqttd_access_SUITE.erl index c12fd00bd..8a8d05766 100644 --- a/test/emqttd_access_SUITE.erl +++ b/test/emqttd_access_SUITE.erl @@ -78,6 +78,7 @@ reload_acl(_) -> register_mod(_) -> ok = ?AC:register_mod(acl, emqttd_acl_test_mod, []), + {error, already_existed} = ?AC:register_mod(acl, emqttd_acl_test_mod, []), [{emqttd_acl_test_mod, _, 0}, {emqttd_acl_internal, _, 0}] = ?AC:lookup_mods(acl), ok = ?AC:register_mod(auth, emqttd_auth_anonymous_test_mod,[]), @@ -117,14 +118,14 @@ check_acl(_) -> compile_rule(_) -> - {allow, {'and', [{ipaddr, {"127.0.0.1", _I, _I}}, + {allow, {'and', [{ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}}, {user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} = compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]}), - {allow, {'or', [{ipaddr, {"127.0.0.1", _I, _I}}, + {allow, {'or', [{ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}}, {user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} = compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]}), - {allow, {ipaddr, {"127.0.0.1", _I, _I}}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} = + {allow, {ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} = compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]}), {allow, {user, <<"testuser">>}, subscribe, [ [<<"a">>, <<"b">>, <<"c">>], [<<"d">>, <<"e">>, <<"f">>, '#'] ]} = compile({allow, {user, "testuser"}, subscribe, ["a/b/c", "d/e/f/#"]}), diff --git a/test/emqttd_lib_SUITE.erl b/test/emqttd_lib_SUITE.erl index 33a5e3547..1a3b1aef6 100644 --- a/test/emqttd_lib_SUITE.erl +++ b/test/emqttd_lib_SUITE.erl @@ -28,9 +28,11 @@ -define(PQ, priority_queue). +-define(BASE62, emqttd_base62). + all() -> [{group, guid}, {group, opts}, {group, ?PQ}, {group, time}, - {group, node}]. + {group, node}, {group, base62}]. groups() -> [{guid, [], [guid_gen]}, @@ -38,7 +40,8 @@ groups() -> {?PQ, [], [priority_queue_plen, priority_queue_out2]}, {time, [], [time_now_to_]}, - {node, [], [node_is_aliving, node_parse_name]}]. + {node, [], [node_is_aliving, node_parse_name]}, + {base62, [], [base62_encode]}]. %%-------------------------------------------------------------------- %% emqttd_guid @@ -144,3 +147,17 @@ node_parse_name(_) -> 'a@127.0.0.1' = emqttd_node:parse_name("a@127.0.0.1"), 'b@127.0.0.1' = emqttd_node:parse_name("b"). +%%-------------------------------------------------------------------- +%% base62 encode decode +%%-------------------------------------------------------------------- + +base62_encode(_) -> + 10 = ?BASE62:decode(?BASE62:encode(10)), + 100 = ?BASE62:decode(?BASE62:encode(100)), + 9999 = ?BASE62:decode(?BASE62:encode(9999)), + 65535 = ?BASE62:decode(?BASE62:encode(65535)), + <> = emqttd_guid:gen(), + <> = emqttd_guid:gen(), + X = ?BASE62:decode(?BASE62:encode(X)), + Y = ?BASE62:decode(?BASE62:encode(Y)). +