From bb417e44986f444349143ee9aa26b83bf277f15b Mon Sep 17 00:00:00 2001 From: zhanghongtong Date: Tue, 13 Jul 2021 16:15:52 +0800 Subject: [PATCH] feat(authz): support http Signed-off-by: zhanghongtong --- apps/emqx/rebar.config | 2 +- apps/emqx_authz/etc/emqx_authz.conf | 10 ++ apps/emqx_authz/src/emqx_authz.erl | 12 ++- apps/emqx_authz/src/emqx_authz_http.erl | 99 +++++++++++++++++++ apps/emqx_authz/src/emqx_authz_schema.erl | 84 +++++++++++++++- .../emqx_authz/test/emqx_authz_http_SUITE.erl | 94 ++++++++++++++++++ .../src/emqx_connector_http.erl | 40 +++----- rebar.config | 4 +- 8 files changed, 313 insertions(+), 32 deletions(-) create mode 100644 apps/emqx_authz/src/emqx_authz_http.erl create mode 100644 apps/emqx_authz/test/emqx_authz_http_SUITE.erl diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 3aa80c416..92ff5a9b3 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -15,7 +15,7 @@ , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.3"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.9.6"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.10.3"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}} diff --git a/apps/emqx_authz/etc/emqx_authz.conf b/apps/emqx_authz/etc/emqx_authz.conf index e91a68a63..b984a19e6 100644 --- a/apps/emqx_authz/etc/emqx_authz.conf +++ b/apps/emqx_authz/etc/emqx_authz.conf @@ -1,5 +1,15 @@ emqx_authz:{ rules: [ + # { + # type: http + # config: { + # url: "https://emqx.com" + # headers: { + # Accept: "application/json" + # Content-Type: "application/json" + # } + # } + # }, # { # type: mysql # config: { diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl index 78aa47d91..e4006c6db 100644 --- a/apps/emqx_authz/src/emqx_authz.erl +++ b/apps/emqx_authz/src/emqx_authz.erl @@ -88,6 +88,14 @@ compile(#{topics := Topics, topics => NTopics }; +compile(#{principal := Principal, + type := http, + config := #{url := Url} = Config + } = Rule) -> + NConfig = maps:merge(Config, #{base_url => maps:remove(query, Url)}), + NRule = create_resource(Rule#{config := NConfig}), + NRule#{principal => compile_principal(Principal)}; + compile(#{principal := Principal, type := DB } = Rule) when DB =:= redis; @@ -150,8 +158,8 @@ b2l(B) when is_binary(B) -> binary_to_list(B). -spec(authorize(emqx_types:clientinfo(), emqx_types:all(), emqx_topic:topic(), emqx_permission_rule:acl_result(), rules()) -> {stop, allow} | {ok, deny}). authorize(#{username := Username, - peerhost := IpAddress - } = Client, PubSub, Topic, _DefaultResult, Rules) -> + peerhost := IpAddress + } = Client, PubSub, Topic, _DefaultResult, Rules) -> case do_authorize(Client, PubSub, Topic, Rules) of {matched, allow} -> ?LOG(info, "Client succeeded authorization: Username: ~p, IP: ~p, Topic: ~p, Permission: allow", [Username, IpAddress, Topic]), diff --git a/apps/emqx_authz/src/emqx_authz_http.erl b/apps/emqx_authz/src/emqx_authz_http.erl new file mode 100644 index 000000000..e39a8ef0b --- /dev/null +++ b/apps/emqx_authz/src/emqx_authz_http.erl @@ -0,0 +1,99 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_authz_http). + +-include("emqx_authz.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). + +%% AuthZ Callbacks +-export([ authorize/4 + , description/0 + ]). + +-ifdef(TEST). +-compile(export_all). +-compile(nowarn_export_all). +-endif. + +description() -> + "AuthZ with http". + +authorize(Client, PubSub, Topic, + #{resource_id := ResourceID, + type := http, + config := #{url := #{path := Path} = Url, + headers := Headers, + method := Method, + request_timeout := RequestTimeout} = Config + }) -> + Request = case Method of + get -> + Query = maps:get(query, Url, ""), + Path1 = replvar(Path ++ "?" ++ Query, PubSub, Topic, Client), + {Path1, maps:to_list(Headers)}; + _ -> + Body0 = serialize_body( + maps:get('Accept', Headers, <<"application/json">>), + maps:get(body, Config, #{}) + ), + Body1 = replvar(Body0, PubSub, Topic, Client), + Path1 = replvar(Path, PubSub, Topic, Client), + {Path1, maps:to_list(Headers), Body1} + end, + case emqx_resource:query(ResourceID, {Method, Request, RequestTimeout}) of + {ok, 204, _Headers} -> {matched, allow}; + {ok, 200, _Headers, _Body} -> {matched, allow}; + _ -> nomatch + end. + +query_string(Body) -> + query_string(maps:to_list(Body), []). + +query_string([], Acc) -> + <<$&, Str/binary>> = iolist_to_binary(lists:reverse(Acc)), + Str; +query_string([{K, V} | More], Acc) -> + query_string(More, [["&", emqx_http_lib:uri_encode(K), "=", emqx_http_lib:uri_encode(V)] | Acc]). + +serialize_body(<<"application/json">>, Body) -> + jsx:encode(Body); +serialize_body(<<"application/x-www-form-urlencoded">>, Body) -> + query_string(Body). + +replvar(Str0, PubSub, Topic, + #{username := Username, + clientid := Clientid, + peerhost := IpAddress, + protocol := Protocol, + mountpoint := Mountpoint + }) when is_list(Str0); + is_binary(Str0) -> + NTopic = emqx_http_lib:uri_encode(Topic), + Str1 = re:replace(Str0, "%c", Clientid, [global, {return, binary}]), + Str2 = re:replace(Str1, "%u", Username, [global, {return, binary}]), + Str3 = re:replace(Str2, "%a", inet_parse:ntoa(IpAddress), [global, {return, binary}]), + Str4 = re:replace(Str3, "%r", bin(Protocol), [global, {return, binary}]), + Str5 = re:replace(Str4, "%m", Mountpoint, [global, {return, binary}]), + Str6 = re:replace(Str5, "%t", NTopic, [global, {return, binary}]), + Str7 = re:replace(Str6, "%A", bin(PubSub), [global, {return, binary}]), + Str7. + +bin(A) when is_atom(A) -> atom_to_binary(A, utf8); +bin(B) when is_binary(B) -> B; +bin(L) when is_list(L) -> list_to_binary(L); +bin(X) -> X. diff --git a/apps/emqx_authz/src/emqx_authz_schema.erl b/apps/emqx_authz/src/emqx_authz_schema.erl index f1a79db25..9554aade4 100644 --- a/apps/emqx_authz/src/emqx_authz_schema.erl +++ b/apps/emqx_authz/src/emqx_authz_schema.erl @@ -4,18 +4,87 @@ -type action() :: publish | subscribe | all. -type permission() :: allow | deny. +-type url() :: emqx_http_lib:uri_map(). -reflect_type([ permission/0 , action/0 + , url/0 ]). --export([structs/0, fields/1]). +-typerefl_from_string({url/0, emqx_http_lib, uri_parse}). + +-export([ structs/0 + , fields/1 + ]). structs() -> ["emqx_authz"]. fields("emqx_authz") -> [ {rules, rules()} ]; +fields(http) -> + [ {principal, principal()} + , {type, #{type => http}} + , {config, #{type => hoconsc:union([ hoconsc:ref(?MODULE, http_get) + , hoconsc:ref(?MODULE, http_post) + ])} + } + ]; +fields(http_get) -> + [ {url, #{type => url()}} + , {headers, #{type => map(), + default => #{ <<"accept">> => <<"application/json">> + , <<"cache-control">> => <<"no-cache">> + , <<"connection">> => <<"keep-alive">> + , <<"keep-alive">> => <<"timeout=5">> + }, + converter => fun (Headers0) -> + Headers1 = maps:fold(fun(K0, V, AccIn) -> + K1 = iolist_to_binary(string:to_lower(binary_to_list(K0))), + maps:put(K1, V, AccIn) + end, #{}, Headers0), + maps:merge(#{ <<"accept">> => <<"application/json">> + , <<"cache-control">> => <<"no-cache">> + , <<"connection">> => <<"keep-alive">> + , <<"keep-alive">> => <<"timeout=5">> + }, Headers1) + end + } + } + , {method, #{type => get, + default => get + }} + ] ++ proplists:delete(base_url, emqx_connector_http:fields(config)); +fields(http_post) -> + [ {url, #{type => url()}} + , {headers, #{type => map(), + default => #{ <<"accept">> => <<"application/json">> + , <<"cache-control">> => <<"no-cache">> + , <<"connection">> => <<"keep-alive">> + , <<"content-type">> => <<"application/json">> + , <<"keep-alive">> => <<"timeout=5">> + }, + converter => fun (Headers0) -> + Headers1 = maps:fold(fun(K0, V, AccIn) -> + K1 = iolist_to_binary(string:to_lower(binary_to_list(K0))), + maps:put(K1, V, AccIn) + end, #{}, Headers0), + maps:merge(#{ <<"accept">> => <<"application/json">> + , <<"cache-control">> => <<"no-cache">> + , <<"connection">> => <<"keep-alive">> + , <<"content-type">> => <<"application/json">> + , <<"keep-alive">> => <<"timeout=5">> + }, Headers1) + end + } + } + , {method, #{type => hoconsc:enum([post, put]), + default => get}} + , {body, #{type => map(), + nullable => true + } + } + ] ++ proplists:delete(base_url, emqx_connector_http:fields(config)); fields(mongo) -> connector_fields(mongo) ++ [ {collection, #{type => atom()}} @@ -75,9 +144,10 @@ fields(eq_topic) -> union_array(Item) when is_list(Item) -> hoconsc:array(hoconsc:union(Item)). -rules() -> +rules() -> #{type => union_array( [ hoconsc:ref(?MODULE, simple_rule) + , hoconsc:ref(?MODULE, http) , hoconsc:ref(?MODULE, mysql) , hoconsc:ref(?MODULE, pgsql) , hoconsc:ref(?MODULE, redis) @@ -108,7 +178,15 @@ query() -> }. connector_fields(DB) -> - Mod = list_to_existing_atom(io_lib:format("~s_~s",[emqx_connector, DB])), + Mod0 = io_lib:format("~s_~s",[emqx_connector, DB]), + Mod = try + list_to_existing_atom(Mod0) + catch + error:badarg -> + list_to_atom(Mod0); + Error -> + erlang:error(Error) + end, [ {principal, principal()} , {type, #{type => DB}} ] ++ Mod:fields(""). diff --git a/apps/emqx_authz/test/emqx_authz_http_SUITE.erl b/apps/emqx_authz/test/emqx_authz_http_SUITE.erl new file mode 100644 index 000000000..77f78bf89 --- /dev/null +++ b/apps/emqx_authz/test/emqx_authz_http_SUITE.erl @@ -0,0 +1,94 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_authz_http_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include("emqx_authz.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +all() -> + emqx_ct:all(?MODULE). + +groups() -> + []. + +init_per_suite(Config) -> + meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), + meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ), + ok = emqx_ct_helpers:start_apps([emqx_authz], fun set_special_configs/1), + Config. + +end_per_suite(_Config) -> + file:delete(filename:join(emqx:get_env(plugins_etc_dir), 'authz.conf')), + emqx_ct_helpers:stop_apps([emqx_authz, emqx_resource]), + meck:unload(emqx_resource). + +set_special_configs(emqx) -> + application:set_env(emqx, allow_anonymous, true), + application:set_env(emqx, enable_acl_cache, false), + application:set_env(emqx, acl_nomatch, deny), + application:set_env(emqx, plugins_loaded_file, + emqx_ct_helpers:deps_path(emqx, "test/loaded_plguins")), + ok; +set_special_configs(emqx_authz) -> + Rules = [#{config =>#{ + url => #{host => "fake.com", + path => "/", + port => 443, + scheme => https}, + headers => #{}, + method => get, + request_timeout => 5000 + }, + principal => all, + type => http} + ], + emqx_config:put([emqx_authz], #{rules => Rules}), + ok; +set_special_configs(_App) -> + ok. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_authz(_) -> + ClientInfo = #{clientid => <<"clientid">>, + username => <<"username">>, + peerhost => {127,0,0,1}, + protocol => mqtt, + mountpoint => <<"fake">> + }, + + meck:expect(emqx_resource, query, fun(_, _) -> {ok, 204, fake_headers} end), + ?assertEqual(allow, + emqx_access_control:authorize(ClientInfo, subscribe, <<"#">>)), + + meck:expect(emqx_resource, query, fun(_, _) -> {ok, 200, fake_headers, fake_body} end), + ?assertEqual(allow, + emqx_access_control:authorize(ClientInfo, publish, <<"#">>)), + + + meck:expect(emqx_resource, query, fun(_, _) -> {error, other} end), + ?assertEqual(deny, + emqx_access_control:authorize(ClientInfo, subscribe, <<"+">>)), + ?assertEqual(deny, + emqx_access_control:authorize(ClientInfo, publish, <<"+">>)), + ok. + diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 94940a065..d6d30eb76 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -28,6 +28,10 @@ , on_health_check/2 ]). +-type url() :: emqx_http_lib:uri_map(). +-reflect_type([url/0]). +-typerefl_from_string({url/0, emqx_http_lib, uri_parse}). + -export([ structs/0 , fields/1 , validations/0]). @@ -53,7 +57,6 @@ fields(config) -> , {connect_timeout, fun connect_timeout/1} , {max_retries, fun max_retries/1} , {retry_interval, fun retry_interval/1} - , {keepalive, fun keepalive/1} , {pool_type, fun pool_type/1} , {pool_size, fun pool_size/1} , {ssl_opts, #{type => hoconsc:ref(?MODULE, ssl_opts), @@ -70,9 +73,12 @@ fields(ssl_opts) -> validations() -> [ {check_ssl_opts, fun check_ssl_opts/1} ]. -base_url(type) -> binary(); +base_url(type) -> url(); base_url(nullable) -> false; -base_url(validate) -> [fun check_base_url/1]; +base_url(validate) -> fun (#{query := _Query}) -> + {error, "There must be no query in the base_url"}; + (_) -> ok + end; base_url(_) -> undefined. connect_timeout(type) -> connect_timeout(); @@ -87,10 +93,6 @@ retry_interval(type) -> non_neg_integer(); retry_interval(default) -> 1000; retry_interval(_) -> undefined. -keepalive(type) -> non_neg_integer(); -keepalive(default) -> 5000; -keepalive(_) -> undefined. - pool_type(type) -> pool_type(); pool_type(default) -> random; pool_type(_) -> undefined. @@ -117,18 +119,16 @@ verify(default) -> false; verify(_) -> undefined. %% =================================================================== -on_start(InstId, #{url := URL, +on_start(InstId, #{base_url := #{scheme := Scheme, + host := Host, + port := Port, + path := BasePath}, connect_timeout := ConnectTimeout, max_retries := MaxRetries, retry_interval := RetryInterval, - keepalive := Keepalive, pool_type := PoolType, pool_size := PoolSize} = Config) -> logger:info("starting http connector: ~p, config: ~p", [InstId, Config]), - {ok, #{scheme := Scheme, - host := Host, - port := Port, - path := BasePath}} = emqx_http_lib:uri_parse(URL), {Transport, TransportOpts} = case Scheme of http -> {tcp, []}; @@ -143,7 +143,7 @@ on_start(InstId, #{url := URL, , {connect_timeout, ConnectTimeout} , {retry, MaxRetries} , {retry_timeout, RetryInterval} - , {keepalive, Keepalive} + , {keepalive, 5000} , {pool_type, PoolType} , {pool_size, PoolSize} , {transport, Transport} @@ -192,19 +192,11 @@ on_health_check(_InstId, #{host := Host, port := Port} = State) -> %% Internal functions %%-------------------------------------------------------------------- -check_base_url(URL) -> - case emqx_http_lib:uri_parse(URL) of - {error, _} -> false; - {ok, #{query := _}} -> false; - _ -> true - end. - check_ssl_opts(Conf) -> check_ssl_opts("base_url", Conf). check_ssl_opts(URLFrom, Conf) -> - URL = hocon_schema:get_value(URLFrom, Conf), - {ok, #{scheme := Scheme}} = emqx_http_lib:uri_parse(URL), + #{schema := Scheme} = hocon_schema:get_value(URLFrom, Conf), SSLOpts = hocon_schema:get_value("ssl_opts", Conf), case {Scheme, maps:size(SSLOpts)} of {http, 0} -> true; @@ -216,4 +208,4 @@ check_ssl_opts(URLFrom, Conf) -> update_path(BasePath, {Path, Headers}) -> {filename:join(BasePath, Path), Headers}; update_path(BasePath, {Path, Headers, Body}) -> - {filename:join(BasePath, Path), Headers, Body}. \ No newline at end of file + {filename:join(BasePath, Path), Headers, Body}. diff --git a/rebar.config b/rebar.config index 77e98ce68..56cf8c00b 100644 --- a/rebar.config +++ b/rebar.config @@ -60,8 +60,8 @@ , {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1 , {getopt, "1.0.1"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.9.6"}}} - , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.2.1"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.10.3"}}} + , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.3.0"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.1.0"}}} ]}.