From dca292f5387e621676aae0acc241e323c8271eb8 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 27 Aug 2018 15:54:41 +0800 Subject: [PATCH 1/4] Replace 'Client' with 'Credentials' map --- src/emqx_access_control.erl | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 3bb31057e..bd35d7ebc 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -32,8 +32,6 @@ -define(TAB, ?MODULE). -define(SERVER, ?MODULE). --record(state, {}). - %%------------------------------------------------------------------------------ %% API %%------------------------------------------------------------------------------ @@ -104,6 +102,15 @@ check_acl(Credentials, PubSub, Topic, AclMods, true) -> AclResult end. +do_check_acl(#{zone := Zone}, _PubSub, _Topic, []) -> + emqx_zone:get_env(Zone, acl_nomatch, deny); +do_check_acl(Credentials, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) -> + case Mod:check_acl({Credentials, PubSub, Topic}, State) of + allow -> allow; + deny -> deny; + ignore -> do_check_acl(Credentials, PubSub, Topic, AclMods) + end. + -spec(reload_acl() -> list(ok | {error, term()})). reload_acl() -> [Mod:reload_acl(State) || {Mod, State, _Seq} <- lookup_mods(acl)]. @@ -143,7 +150,7 @@ stop() -> init([]) -> _ = emqx_tables:new(?TAB, [set, protected, {read_concurrency, true}]), - {ok, #state{}}. + {ok, #{}}. handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) -> Mods = lookup_mods(Type), @@ -194,15 +201,6 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -do_check_acl(#client{zone = Zone}, _PubSub, _Topic, []) -> - emqx_zone:get_env(Zone, acl_nomatch, deny); -do_check_acl(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) -> - case Mod:check_acl({Client, PubSub, Topic}, State) of - allow -> allow; - deny -> deny; - ignore -> do_check_acl(Client, PubSub, Topic, AclMods) - end. - %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- From 42288ac4129004f52bdf85072fbac50452d9a64a Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 27 Aug 2018 15:57:13 +0800 Subject: [PATCH 2/4] The ACL file should not be undefined --- src/emqx_acl_internal.erl | 14 ++--- src/emqx_listeners.erl | 113 +++++++++++++++++++++++++------------- 2 files changed, 80 insertions(+), 47 deletions(-) diff --git a/src/emqx_acl_internal.erl b/src/emqx_acl_internal.erl index d226fb496..54f944416 100644 --- a/src/emqx_acl_internal.erl +++ b/src/emqx_acl_internal.erl @@ -70,11 +70,8 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) -> false. %% @doc Check ACL --spec(check_acl({credentials(), pubsub(), topic()}, #{}) - -> allow | deny | ignore). -check_acl(_Who, #{acl_file := undefined}) -> - allow; -check_acl({Credentials, PubSub, Topic}, #{}) -> +-spec(check_acl({credentials(), pubsub(), topic()}, #{}) -> allow | deny | ignore). +check_acl({Credentials, PubSub, Topic}, _State) -> case match(Credentials, Topic, lookup(PubSub)) of {matched, allow} -> allow; {matched, deny} -> deny; @@ -98,12 +95,11 @@ match(Credentials, Topic, [Rule|Rules]) -> end. -spec(reload_acl(#{}) -> ok | {error, term()}). -reload_acl(#{acl_file := undefined}) -> - ok; reload_acl(#{acl_file := AclFile}) -> case catch load_rules_from_file(AclFile) of - true -> emqx_logger:error("Reload acl_file ~s successfully", [AclFile]), - ok; + true -> + emqx_logger:info("Reload acl_file ~s successfully", [AclFile]), + ok; {'EXIT', Error} -> {error, Error} end. diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 084ffe7c2..16cd06b4f 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -18,49 +18,64 @@ -include("emqx_mqtt.hrl"). -export([start/0, restart/0, stop/0]). --export([start_listener/1, stop_listener/1, restart_listener/1]). +-export([start_listener/1, start_listener/3]). +-export([restart_listener/1, restart_listener/3]). +-export([stop_listener/1, stop_listener/3]). --type(listener() :: {atom(), esockd:listen_on(), [esockd:option()]}). +-type(listener() :: {esockd:proto(), esockd:listen_on(), [esockd:option()]}). -%% @doc Start all listeners +%% @doc Start all listeners. -spec(start() -> ok). start() -> lists:foreach(fun start_listener/1, emqx_config:get_env(listeners, [])). -%% Start MQTT/TCP listener -spec(start_listener(listener()) -> {ok, pid()} | {error, term()}). -start_listener({tcp, ListenOn, Options}) -> +start_listener({Proto, ListenOn, Options}) -> + case start_listener(Proto, ListenOn, Options) of + {ok, _} -> + io:format("Start mqtt:~s listener on ~s successfully~n", [Proto, format(ListenOn)]); + {error, Reason} -> + io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~p", + [Proto, format(ListenOn), Reason]) + end. + +%% Start MQTT/TCP listener +-spec(start_listener(esockd:proto(), esockd:listen_on(), [esockd:option()]) + -> {ok, pid()} | {error, term()}). +start_listener(tcp, ListenOn, Options) -> start_mqtt_listener('mqtt:tcp', ListenOn, Options); %% Start MQTT/TLS listener -start_listener({Proto, ListenOn, Options}) when Proto == ssl; Proto == tls -> +start_listener(Proto, ListenOn, Options) when Proto == ssl; Proto == tls -> start_mqtt_listener('mqtt:ssl', ListenOn, Options); %% Start MQTT/WS listener -start_listener({Proto, ListenOn, Options}) when Proto == http; Proto == ws -> +start_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws -> Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws_connection, Options}]}]), - NumAcceptors = proplists:get_value(acceptors, Options, 4), - MaxConnections = proplists:get_value(max_connections, Options, 1024), - TcpOptions = proplists:get_value(tcp_options, Options, []), - RanchOpts = [{num_acceptors, NumAcceptors}, - {max_connections, MaxConnections} | TcpOptions], - cowboy:start_clear('mqtt:ws', with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}}); + start_http_listener(fun cowboy:start_clear/3, 'mqtt:ws', ListenOn, ranch_opts(Options), Dispatch); %% Start MQTT/WSS listener -start_listener({Proto, ListenOn, Options}) when Proto == https; Proto == wss -> - Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws, []}]}]), - NumAcceptors = proplists:get_value(acceptors, Options, 4), - MaxConnections = proplists:get_value(max_clients, Options, 1024), - TcpOptions = proplists:get_value(tcp_options, Options, []), - SslOptions = proplists:get_value(ssl_options, Options, []), - RanchOpts = [{num_acceptors, NumAcceptors}, - {max_connections, MaxConnections} | TcpOptions ++ SslOptions], - cowboy:start_tls('mqtt:wss', with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}}). +start_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss -> + Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws_connection, Options}]}]), + start_http_listener(fun cowboy:start_tls/3, 'mqtt:wss', ListenOn, ranch_opts(Options), Dispatch). start_mqtt_listener(Name, ListenOn, Options) -> SockOpts = esockd:parse_opt(Options), - MFA = {emqx_connection, start_link, [Options -- SockOpts]}, - {ok, _} = esockd:open(Name, ListenOn, merge_default(SockOpts), MFA). + esockd:open(Name, ListenOn, merge_default(SockOpts), + {emqx_connection, start_link, [Options -- SockOpts]}). + +start_http_listener(Start, Name, ListenOn, RanchOpts, Dispatch) -> + Start(Name, with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}}). + +ranch_opts(Options) -> + NumAcceptors = proplists:get_value(acceptors, Options, 4), + MaxConnections = proplists:get_value(max_connections, Options, 1024), + TcpOptions = proplists:get_value(tcp_options, Options, []), + RanchOpts = [{num_acceptors, NumAcceptors}, {max_connections, MaxConnections} | TcpOptions], + case proplists:get_value(ssl_options, Options) of + undefined -> RanchOpts; + SslOptions -> RanchOpts ++ SslOptions + end. with_port(Port, Opts) when is_integer(Port) -> [{port, Port}|Opts]; @@ -73,34 +88,49 @@ restart() -> lists:foreach(fun restart_listener/1, emqx_config:get_env(listeners, [])). -spec(restart_listener(listener()) -> any()). -restart_listener({tcp, ListenOn, _Options}) -> +restart_listener({Proto, ListenOn, Options}) -> + restart_listener(Proto, ListenOn, Options). + +-spec(restart_listener(esockd:proto(), esockd:listen_on(), [esockd:option()]) -> any()). +restart_listener(tcp, ListenOn, _Options) -> esockd:reopen('mqtt:tcp', ListenOn); -restart_listener({Proto, ListenOn, _Options}) when Proto == ssl; Proto == tls -> +restart_listener(Proto, ListenOn, _Options) when Proto == ssl; Proto == tls -> esockd:reopen('mqtt:ssl', ListenOn); -restart_listener({Proto, ListenOn, Options}) when Proto == http; Proto == ws -> +restart_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws -> cowboy:stop_listener('mqtt:ws'), - start_listener({Proto, ListenOn, Options}); -restart_listener({Proto, ListenOn, Options}) when Proto == https; Proto == wss -> + start_listener(Proto, ListenOn, Options); +restart_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss -> cowboy:stop_listener('mqtt:wss'), - start_listener({Proto, ListenOn, Options}); -restart_listener({Proto, ListenOn, _Opts}) -> + start_listener(Proto, ListenOn, Options); +restart_listener(Proto, ListenOn, _Opts) -> esockd:reopen(Proto, ListenOn). -%% @doc Stop all listeners +%% @doc Stop all listeners. -spec(stop() -> ok). stop() -> lists:foreach(fun stop_listener/1, emqx_config:get_env(listeners, [])). --spec(stop_listener(listener()) -> ok | {error, any()}). -stop_listener({tcp, ListenOn, _Opts}) -> +-spec(stop_listener(listener()) -> ok | {error, term()}). +stop_listener({Proto, ListenOn, Opts}) -> + case stop_listener(Proto, ListenOn, Opts) of + ok -> + io:format("Stop mqtt:~s listener on ~s successfully~n", [Proto, format(ListenOn)]); + {error, Reason} -> + io:format(standard_error, "Failed to stop mqtt:~s listener on ~s - ~p", + [Proto, format(ListenOn), Reason]) + end. + +-spec(stop_listener(esockd:proto(), esockd:listen_on(), [esockd:option()]) + -> ok | {error, term()}). +stop_listener(tcp, ListenOn, _Opts) -> esockd:close('mqtt:tcp', ListenOn); -stop_listener({Proto, ListenOn, _Opts}) when Proto == ssl; Proto == tls -> +stop_listener(Proto, ListenOn, _Opts) when Proto == ssl; Proto == tls -> esockd:close('mqtt:ssl', ListenOn); -stop_listener({Proto, _ListenOn, _Opts}) when Proto == http; Proto == ws -> +stop_listener(Proto, _ListenOn, _Opts) when Proto == http; Proto == ws -> cowboy:stop_listener('mqtt:ws'); -stop_listener({Proto, _ListenOn, _Opts}) when Proto == https; Proto == wss -> +stop_listener(Proto, _ListenOn, _Opts) when Proto == https; Proto == wss -> cowboy:stop_listener('mqtt:wss'); -stop_listener({Proto, ListenOn, _Opts}) -> +stop_listener(Proto, ListenOn, _Opts) -> esockd:close(Proto, ListenOn). merge_default(Options) -> @@ -111,3 +141,10 @@ merge_default(Options) -> [{tcp_options, ?MQTT_SOCKOPTS} | Options] end. +format(Port) when is_integer(Port) -> + io_lib:format("0.0.0.0:~w", [Port]); +format({Addr, Port}) when is_list(Addr) -> + io_lib:format("~s:~w", [Addr, Port]); +format({Addr, Port}) when is_tuple(Addr) -> + io_lib:format("~s:~w", [esockd_net:ntoab(Addr), Port]). + From e5b2e584e9df62347791670571ef4a41da33d935 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 27 Aug 2018 15:57:36 +0800 Subject: [PATCH 3/4] Remove 'TODO:' --- etc/emqx.conf | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index f3f46589e..c35b1d0f7 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -424,7 +424,7 @@ log.syslog = on ## Value: true | false allow_anonymous = true -## TODO: Allow or deny if no ACL rules match. +## Allow or deny if no ACL rules matched. ## ## Value: allow | deny acl_nomatch = allow @@ -455,7 +455,6 @@ acl_cache_max_size = 32 ## Default: 1 minute acl_cache_ttl = 1m - ##-------------------------------------------------------------------- ## MQTT Protocol ##-------------------------------------------------------------------- From a19daee3538851317793d4d69a585f7d40e6a84c Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 27 Aug 2018 16:19:58 +0800 Subject: [PATCH 4/4] Improve the print of listener startup --- src/emqx_listeners.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 16cd06b4f..421304f3a 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -33,9 +33,9 @@ start() -> start_listener({Proto, ListenOn, Options}) -> case start_listener(Proto, ListenOn, Options) of {ok, _} -> - io:format("Start mqtt:~s listener on ~s successfully~n", [Proto, format(ListenOn)]); + io:format("Start mqtt:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]); {error, Reason} -> - io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~p", + io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~p!", [Proto, format(ListenOn), Reason]) end. @@ -114,9 +114,9 @@ stop() -> stop_listener({Proto, ListenOn, Opts}) -> case stop_listener(Proto, ListenOn, Opts) of ok -> - io:format("Stop mqtt:~s listener on ~s successfully~n", [Proto, format(ListenOn)]); + io:format("Stop mqtt:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]); {error, Reason} -> - io:format(standard_error, "Failed to stop mqtt:~s listener on ~s - ~p", + io:format(standard_error, "Failed to stop mqtt:~s listener on ~s - ~p.", [Proto, format(ListenOn), Reason]) end.