Merge pull request #574 from emqtt/emq10

1.1 - Support IPv6 and add more plugins
This commit is contained in:
Feng Lee 2016-05-31 21:24:11 +08:00
commit cd77f42cbf
24 changed files with 140 additions and 56 deletions

6
.gitmodules vendored
View File

@ -25,3 +25,9 @@
[submodule "plugins/emqttd_reloader"] [submodule "plugins/emqttd_reloader"]
path = plugins/emqttd_reloader path = plugins/emqttd_reloader
url = https://github.com/emqtt/emqttd_reloader.git url = https://github.com/emqtt/emqttd_reloader.git
[submodule "plugins/emqttd_plugin_mongo"]
path = plugins/emqttd_plugin_mongo
url = https://github.com/emqtt/emqttd_plugin_mongo.git
[submodule "plugins/emqttd_auth_http"]
path = plugins/emqttd_auth_http
url = https://github.com/emqtt/emqttd_auth_http.git

@ -0,0 +1 @@
Subproject commit 4711caa703f84775ff0a1256587788430494eecd

@ -1 +1 @@
Subproject commit db9ed84da2c5f578f5762a6a69f715b7edafa02f Subproject commit f841ab9b4ca40f47575b5f98c6707c6598ef0f91

@ -0,0 +1 @@
Subproject commit 168a70890b6c204e0629fc915a6da4b0247c326e

@ -1 +1 @@
Subproject commit dfc35e6960a5ddbba8619cf92e72bf14dda4a54c Subproject commit ff967b4046ee7d4e568ebbdb248289bd3724b1f6

@ -1 +1 @@
Subproject commit c81f663312ae371808fe10a5997ea78c3033eae7 Subproject commit 6ec5fb063f5070766f6a1f911b3c5771890bb878

@ -1 +1 @@
Subproject commit fa2b98ffa808242850fe118660ab77be3b6ea3ba Subproject commit fc342cc9164c91f6a64e7de2f9f1b0604d767159

@ -1 +1 @@
Subproject commit 653e37d8e472b121c454fa2acd51f696ff144bd5 Subproject commit 418bb1d4385a0fd21a57918c2b2b4fdcc9751c1c

@ -1 +1 @@
Subproject commit eb4a03d90f932839bdad9aa5703de7731169ce66 Subproject commit f946d5ff188a0668734fdc850149bb3d5cf1e798

@ -1 +1 @@
Subproject commit 4577550a309dad9907205d9cb4fe1277d75ec142 Subproject commit 1cc503573439da213b435bc1c5d2dc7eb3891052

@ -1 +1 @@
Subproject commit 39650c4685a38cecbdac785c3aafcd4c669aa301 Subproject commit 4bd649c137defacacbaba685c3adf042e3f70700

View File

@ -41,8 +41,8 @@
{deps, [ {deps, [
{gproc, ".*", {git, "git://github.com/uwiger/gproc.git", {branch, "master"}}}, {gproc, ".*", {git, "git://github.com/uwiger/gproc.git", {branch, "master"}}},
{lager, ".*", {git, "git://github.com/basho/lager.git", {branch, "master"}}}, {lager, ".*", {git, "git://github.com/basho/lager.git", {branch, "master"}}},
{esockd, ".*", {git, "git://github.com/emqtt/esockd.git", {tag, "3.2"}}}, {esockd, ".*", {git, "git://github.com/emqtt/esockd.git", {tag, "4.0"}}},
{mochiweb, "4.*", {git, "git://github.com/emqtt/mochiweb.git", {tag, "4.0.1"}}} {mochiweb, "4.*", {git, "git://github.com/emqtt/mochiweb.git", {tag, "4.1"}}}
]}. ]}.
{recursive_cmds, [ct, eunit, clean]}. {recursive_cmds, [ct, eunit, clean]}.

View File

@ -16,7 +16,7 @@
{error_logger_redirect, false}, {error_logger_redirect, false},
{crash_log, "log/emqttd_crash.log"}, {crash_log, "log/emqttd_crash.log"},
{handlers, [ {handlers, [
%%{lager_console_backend, info}, {lager_console_backend, error},
%%NOTICE: Level >= error %%NOTICE: Level >= error
%%{lager_emqtt_backend, error}, %%{lager_emqtt_backend, error},
{lager_file_backend, [ {lager_file_backend, [

View File

@ -20,6 +20,7 @@
goldrush, goldrush,
compiler, compiler,
runtime_tools, runtime_tools,
{observer, load},
lager, lager,
gen_logger, gen_logger,
gproc, gproc,
@ -56,6 +57,7 @@
{app, inets, [{incl_cond, include}]}, {app, inets, [{incl_cond, include}]},
{app, compiler, [{incl_cond, include}]}, {app, compiler, [{incl_cond, include}]},
{app, runtime_tools, [{incl_cond, include}]}, {app, runtime_tools, [{incl_cond, include}]},
{app, observer, [{incl_cond, include}]},
{app, goldrush, [{incl_cond, include}]}, {app, goldrush, [{incl_cond, include}]},
{app, gen_logger, [{incl_cond, include}]}, {app, gen_logger, [{incl_cond, include}]},
{app, lager, [{incl_cond, include}]}, {app, lager, [{incl_cond, include}]},

View File

@ -1,7 +1,7 @@
{application, emqttd, {application, emqttd,
[ [
{description, "Erlang MQTT Broker"}, {description, "Erlang MQTT Broker"},
{vsn, "1.0.2"}, {vsn, "1.1"},
{id, "emqttd"}, {id, "emqttd"},
{modules, []}, {modules, []},
{registered, []}, {registered, []},

View File

@ -89,7 +89,7 @@ check_acl(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) ->
end. end.
%% @doc Reload ACL Rules. %% @doc Reload ACL Rules.
-spec(reload_acl() -> list(ok | {error, any()})). -spec(reload_acl() -> list(ok | {error, already_existed})).
reload_acl() -> reload_acl() ->
[Mod:reload_acl(State) || {Mod, State, _Seq} <- lookup_mods(acl)]. [Mod:reload_acl(State) || {Mod, State, _Seq} <- lookup_mods(acl)].
@ -201,5 +201,5 @@ mod(Prefix, Name) ->
list_to_atom(lists:concat([Prefix, Name])). list_to_atom(lists:concat([Prefix, Name])).
if_existed(false, Fun) -> Fun(); if_existed(false, Fun) -> Fun();
if_existed(true, _Fun) -> {error, existed}. if_existed(_Mod, _Fun) -> {error, already_existed}.

View File

@ -19,7 +19,7 @@
-include("emqttd.hrl"). -include("emqttd.hrl").
-type who() :: all | binary() | -type who() :: all | binary() |
{ipaddr, esockd_access:cidr()} | {ipaddr, esockd_cidr:cidr_string()} |
{client, binary()} | {client, binary()} |
{user, binary()}. {user, binary()}.
@ -51,8 +51,7 @@ compile({A, Who, Access, TopicFilters}) when ?ALLOW_DENY(A) ->
compile(who, all) -> compile(who, all) ->
all; all;
compile(who, {ipaddr, CIDR}) -> compile(who, {ipaddr, CIDR}) ->
{Start, End} = esockd_access:range(CIDR), {ipaddr, esockd_cidr:parse(CIDR, true)};
{ipaddr, {CIDR, Start, End}};
compile(who, {client, all}) -> compile(who, {client, all}) ->
{client, all}; {client, all};
compile(who, {client, ClientId}) -> compile(who, {client, ClientId}) ->
@ -107,9 +106,8 @@ match_who(#mqtt_client{username = Username}, {user, Username}) ->
true; true;
match_who(#mqtt_client{peername = undefined}, {ipaddr, _Tup}) -> match_who(#mqtt_client{peername = undefined}, {ipaddr, _Tup}) ->
false; false;
match_who(#mqtt_client{peername = {IP, _}}, {ipaddr, {_CDIR, Start, End}}) -> match_who(#mqtt_client{peername = {IP, _}}, {ipaddr, CIDR}) ->
I = esockd_access:atoi(IP), esockd_cidr:match(IP, CIDR);
I >= Start andalso I =< End;
match_who(Client, {'and', Conds}) when is_list(Conds) -> match_who(Client, {'and', Conds}) when is_list(Conds) ->
lists:foldl(fun(Who, Allow) -> lists:foldl(fun(Who, Allow) ->
match_who(Client, Who) andalso Allow match_who(Client, Who) andalso Allow

View File

@ -33,7 +33,7 @@
{backlog, 512}, {backlog, 512},
{nodelay, true}]). {nodelay, true}]).
-type listener() :: {atom(), inet:port_number(), [esockd:option()]}. -type listener() :: {atom(), esockd:listen_on(), [esockd:option()]}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Application callbacks %% Application callbacks
@ -172,22 +172,22 @@ start_listeners() -> lists:foreach(fun start_listener/1, emqttd:env(listeners)).
%% Start mqtt listener %% Start mqtt listener
-spec(start_listener(listener()) -> any()). -spec(start_listener(listener()) -> any()).
start_listener({mqtt, Port, Opts}) -> start_listener(mqtt, Port, Opts); start_listener({mqtt, ListenOn, Opts}) -> start_listener(mqtt, ListenOn, Opts);
%% Start mqtt(SSL) listener %% Start mqtt(SSL) listener
start_listener({mqtts, Port, Opts}) -> start_listener(mqtts, Port, Opts); start_listener({mqtts, ListenOn, Opts}) -> start_listener(mqtts, ListenOn, Opts);
%% Start http listener %% Start http listener
start_listener({http, Port, Opts}) -> start_listener({http, ListenOn, Opts}) ->
mochiweb:start_http(Port, Opts, {emqttd_http, handle_request, []}); mochiweb:start_http(http, ListenOn, Opts, {emqttd_http, handle_request, []});
%% Start https listener %% Start https listener
start_listener({https, Port, Opts}) -> start_listener({https, ListenOn, Opts}) ->
mochiweb:start_http(Port, Opts, {emqttd_http, handle_request, []}). mochiweb:start_http(https, ListenOn, Opts, {emqttd_http, handle_request, []}).
start_listener(Protocol, Port, Opts) -> start_listener(Protocol, ListenOn, Opts) ->
MFArgs = {emqttd_client, start_link, [emqttd:env(mqtt)]}, MFArgs = {emqttd_client, start_link, [emqttd:env(mqtt)]},
esockd:open(Protocol, Port, merge_sockopts(Opts), MFArgs). esockd:open(Protocol, ListenOn, merge_sockopts(Opts), MFArgs).
merge_sockopts(Options) -> merge_sockopts(Options) ->
SockOpts = emqttd_opts:merge(?MQTT_SOCKOPTS, SockOpts = emqttd_opts:merge(?MQTT_SOCKOPTS,
@ -201,5 +201,6 @@ merge_sockopts(Options) ->
%% @doc Stop Listeners %% @doc Stop Listeners
stop_listeners() -> lists:foreach(fun stop_listener/1, emqttd:env(listeners)). stop_listeners() -> lists:foreach(fun stop_listener/1, emqttd:env(listeners)).
stop_listener({Protocol, Port, _Opts}) -> esockd:close({Protocol, Port}). %% @private
stop_listener({Protocol, ListenOn, _Opts}) -> esockd:close(Protocol, ListenOn).

View File

@ -108,9 +108,8 @@ load(Fd, {ok, Line}, Clients) when is_list(Line) ->
[#mqtt_auth_clientid{client_id = ClientId} | Clients]; [#mqtt_auth_clientid{client_id = ClientId} | Clients];
[ClientId, IpAddr0] -> [ClientId, IpAddr0] ->
IpAddr = string:strip(IpAddr0, right, $\n), IpAddr = string:strip(IpAddr0, right, $\n),
Range = esockd_access:range(IpAddr),
[#mqtt_auth_clientid{client_id = list_to_binary(ClientId), [#mqtt_auth_clientid{client_id = list_to_binary(ClientId),
ipaddr = {IpAddr, Range}}|Clients]; ipaddr = esockd_cidr:parse(IpAddr, true)} | Clients];
BadLine -> BadLine ->
lager:error("BadLine in clients.config: ~s", [BadLine]), lager:error("BadLine in clients.config: ~s", [BadLine]),
Clients Clients
@ -123,11 +122,12 @@ load(Fd, eof, Clients) ->
check_clientid_only(ClientId, IpAddr) -> check_clientid_only(ClientId, IpAddr) ->
case mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId) of case mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId) of
[] -> {error, clientid_not_found}; [] ->
[#?AUTH_CLIENTID_TAB{ipaddr = undefined}] -> ok; {error, clientid_not_found};
[#?AUTH_CLIENTID_TAB{ipaddr = {_, {Start, End}}}] -> [#?AUTH_CLIENTID_TAB{ipaddr = undefined}] ->
I = esockd_access:atoi(IpAddr), ok;
case I >= Start andalso I =< End of [#?AUTH_CLIENTID_TAB{ipaddr = CIDR}] ->
case esockd_cidr:match(IpAddr, CIDR) of
true -> ok; true -> ok;
false -> {error, wrong_ipaddr} false -> {error, wrong_ipaddr}
end end

58
src/emqttd_base62.erl Normal file
View File

@ -0,0 +1,58 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_base62).
-export([encode/1, decode/1]).
%% @doc Encode an integer to base62 string
-spec(encode(non_neg_integer()) -> binary()).
encode(I) when is_integer(I) andalso I > 0 ->
list_to_binary(encode(I, [])).
encode(I, Acc) when I < 62 ->
[char(I) | Acc];
encode(I, Acc) ->
encode(I div 62, [char(I rem 62) | Acc]).
char(I) when I < 10 ->
$0 + I;
char(I) when I < 36 ->
$A + I - 10;
char(I) when I < 62 ->
$a + I - 36.
%% @doc Decode base62 string to an integer
-spec(decode(string() | binary()) -> integer()).
decode(B) when is_binary(B) ->
decode(binary_to_list(B));
decode(S) when is_list(S) ->
decode(S, 0).
decode([], I) ->
I;
decode([C|S], I) ->
decode(S, I * 62 + byte(C)).
byte(C) when $0 =< C andalso C =< $9 ->
C - $0;
byte(C) when $A =< C andalso C =< $Z ->
C - $A + 10;
byte(C) when $a =< C andalso C =< $z ->
C - $a + 36.

View File

@ -453,12 +453,12 @@ trace_off(Who, Name) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Listeners Command %% @doc Listeners Command
listeners([]) -> listeners([]) ->
foreach(fun({{Protocol, Port}, Pid}) -> foreach(fun({{Protocol, ListenOn}, Pid}) ->
Info = [{acceptors, esockd:get_acceptors(Pid)}, Info = [{acceptors, esockd:get_acceptors(Pid)},
{max_clients, esockd:get_max_clients(Pid)}, {max_clients, esockd:get_max_clients(Pid)},
{current_clients,esockd:get_current_clients(Pid)}, {current_clients,esockd:get_current_clients(Pid)},
{shutdown_count, esockd:get_shutdown_count(Pid)}], {shutdown_count, esockd:get_shutdown_count(Pid)}],
?PRINT("listener on ~s:~w~n", [Protocol, Port]), ?PRINT("listener on ~s:~s~n", [Protocol, esockd:to_string(ListenOn)]),
foreach(fun({Key, Val}) -> foreach(fun({Key, Val}) ->
?PRINT(" ~-16s: ~w~n", [Key, Val]) ?PRINT(" ~-16s: ~w~n", [Key, Val])
end, Info) end, Info)

View File

@ -117,8 +117,7 @@ call(SM, Req) ->
init([Pool, Id]) -> init([Pool, Id]) ->
?GPROC_POOL(join, Pool, Id), ?GPROC_POOL(join, Pool, Id),
{ok, #state{pool = Pool, id = Id, {ok, #state{pool = Pool, id = Id, monitors = dict:new()}}.
monitors = dict:new()}}.
prioritise_call(_Msg, _From, _Len, _State) -> prioritise_call(_Msg, _From, _Len, _State) ->
1. 1.
@ -175,7 +174,7 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
[_Sess] -> ok [_Sess] -> ok
end end
end), end),
{noreply, erase_monitor(MRef, State)}; {noreply, erase_monitor(MRef, State), hibernate};
error -> error ->
lager:error("MRef of session ~p not found", [DownPid]), lager:error("MRef of session ~p not found", [DownPid]),
{noreply, State} {noreply, State}

View File

@ -78,6 +78,7 @@ reload_acl(_) ->
register_mod(_) -> register_mod(_) ->
ok = ?AC:register_mod(acl, emqttd_acl_test_mod, []), ok = ?AC:register_mod(acl, emqttd_acl_test_mod, []),
{error, already_existed} = ?AC:register_mod(acl, emqttd_acl_test_mod, []),
[{emqttd_acl_test_mod, _, 0}, [{emqttd_acl_test_mod, _, 0},
{emqttd_acl_internal, _, 0}] = ?AC:lookup_mods(acl), {emqttd_acl_internal, _, 0}] = ?AC:lookup_mods(acl),
ok = ?AC:register_mod(auth, emqttd_auth_anonymous_test_mod,[]), ok = ?AC:register_mod(auth, emqttd_auth_anonymous_test_mod,[]),
@ -117,14 +118,14 @@ check_acl(_) ->
compile_rule(_) -> compile_rule(_) ->
{allow, {'and', [{ipaddr, {"127.0.0.1", _I, _I}}, {allow, {'and', [{ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}},
{user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} = {user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]}), compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]}),
{allow, {'or', [{ipaddr, {"127.0.0.1", _I, _I}}, {allow, {'or', [{ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}},
{user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} = {user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]}), compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]}),
{allow, {ipaddr, {"127.0.0.1", _I, _I}}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} = {allow, {ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]}), compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]}),
{allow, {user, <<"testuser">>}, subscribe, [ [<<"a">>, <<"b">>, <<"c">>], [<<"d">>, <<"e">>, <<"f">>, '#'] ]} = {allow, {user, <<"testuser">>}, subscribe, [ [<<"a">>, <<"b">>, <<"c">>], [<<"d">>, <<"e">>, <<"f">>, '#'] ]} =
compile({allow, {user, "testuser"}, subscribe, ["a/b/c", "d/e/f/#"]}), compile({allow, {user, "testuser"}, subscribe, ["a/b/c", "d/e/f/#"]}),

View File

@ -28,9 +28,11 @@
-define(PQ, priority_queue). -define(PQ, priority_queue).
-define(BASE62, emqttd_base62).
all() -> [{group, guid}, {group, opts}, all() -> [{group, guid}, {group, opts},
{group, ?PQ}, {group, time}, {group, ?PQ}, {group, time},
{group, node}]. {group, node}, {group, base62}].
groups() -> groups() ->
[{guid, [], [guid_gen]}, [{guid, [], [guid_gen]},
@ -38,7 +40,8 @@ groups() ->
{?PQ, [], [priority_queue_plen, {?PQ, [], [priority_queue_plen,
priority_queue_out2]}, priority_queue_out2]},
{time, [], [time_now_to_]}, {time, [], [time_now_to_]},
{node, [], [node_is_aliving, node_parse_name]}]. {node, [], [node_is_aliving, node_parse_name]},
{base62, [], [base62_encode]}].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% emqttd_guid %% emqttd_guid
@ -144,3 +147,17 @@ node_parse_name(_) ->
'a@127.0.0.1' = emqttd_node:parse_name("a@127.0.0.1"), 'a@127.0.0.1' = emqttd_node:parse_name("a@127.0.0.1"),
'b@127.0.0.1' = emqttd_node:parse_name("b"). 'b@127.0.0.1' = emqttd_node:parse_name("b").
%%--------------------------------------------------------------------
%% base62 encode decode
%%--------------------------------------------------------------------
base62_encode(_) ->
10 = ?BASE62:decode(?BASE62:encode(10)),
100 = ?BASE62:decode(?BASE62:encode(100)),
9999 = ?BASE62:decode(?BASE62:encode(9999)),
65535 = ?BASE62:decode(?BASE62:encode(65535)),
<<X:128/unsigned-big-integer>> = emqttd_guid:gen(),
<<Y:128/unsigned-big-integer>> = emqttd_guid:gen(),
X = ?BASE62:decode(?BASE62:encode(X)),
Y = ?BASE62:decode(?BASE62:encode(Y)).