diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl index ec2da3237..829ce2282 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl @@ -284,7 +284,7 @@ replace_placeholders([{K, V0} | More], Credential, Acc) -> undefined -> error({cannot_get_variable, V0}); V -> - replace_placeholders(More, Credential, [{K, emqx_authn_utils:bin(V)} | Acc]) + replace_placeholders(More, Credential, [{K, to_bin(V)} | Acc]) end. append_query(Path, []) -> diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl index 2bcb7971d..0b5534608 100644 --- a/apps/emqx_authz/src/emqx_authz.erl +++ b/apps/emqx_authz/src/emqx_authz.erl @@ -207,7 +207,12 @@ check_dup_types([Source | Sources], Checked) -> create_dry_run(T, Source) -> case is_connector_source(T) of true -> - [NSource] = check_sources([Source]), + [CheckedSource] = check_sources([Source]), + case T of + http -> + URIMap = maps:get(url, CheckedSource), + NSource = maps:put(base_url, maps:remove(query, URIMap), CheckedSource) + end, emqx_resource:create_dry_run(connector_module(T), NSource); false -> ok diff --git a/apps/emqx_authz/src/emqx_authz_api_sources.erl b/apps/emqx_authz/src/emqx_authz_api_sources.erl index 23c6077fa..df5a6c819 100644 --- a/apps/emqx_authz/src/emqx_authz_api_sources.erl +++ b/apps/emqx_authz/src/emqx_authz_api_sources.erl @@ -440,14 +440,15 @@ read_certs(#{<<"ssl">> := SSL} = Source) -> {error, Reason} -> ?SLOG(error, Reason#{msg => failed_to_readd_ssl_file}), throw(failed_to_readd_ssl_file); - NewSSL -> + {ok, NewSSL} -> Source#{<<"ssl">> => NewSSL} end; read_certs(Source) -> Source. maybe_write_certs(#{<<"ssl">> := #{<<"enable">> := true} = SSL} = Source) -> Type = maps:get(<<"type">>, Source), - emqx_tls_lib:ensure_ssl_files(filename:join(["authz", Type]), SSL); + {ok, Return} = emqx_tls_lib:ensure_ssl_files(filename:join(["authz", Type]), SSL), + maps:put(<<"ssl">>, Return, Source); maybe_write_certs(Source) -> Source. write_file(Filename, Bytes0) -> diff --git a/apps/emqx_authz/src/emqx_authz_http.erl b/apps/emqx_authz/src/emqx_authz_http.erl index 4c17f90ec..6d1324c47 100644 --- a/apps/emqx_authz/src/emqx_authz_http.erl +++ b/apps/emqx_authz/src/emqx_authz_http.erl @@ -24,6 +24,7 @@ %% AuthZ Callbacks -export([ authorize/4 , description/0 + , parse_url/1 ]). -ifdef(TEST). @@ -36,7 +37,7 @@ description() -> authorize(Client, PubSub, Topic, #{type := http, - url := #{path := Path} = Url, + url := #{path := Path} = URL, headers := Headers, method := Method, request_timeout := RequestTimeout, @@ -44,7 +45,7 @@ authorize(Client, PubSub, Topic, } = Source) -> Request = case Method of get -> - Query = maps:get(query, Url, ""), + Query = maps:get(query, URL, ""), Path1 = replvar(Path ++ "?" ++ Query, PubSub, Topic, Client), {Path1, maps:to_list(Headers)}; _ -> @@ -56,10 +57,32 @@ authorize(Client, PubSub, Topic, Path1 = replvar(Path, PubSub, Topic, Client), {Path1, maps:to_list(Headers), Body1} end, - case emqx_resource:query(ResourceID, {Method, Request, RequestTimeout}) of - {ok, 204, _Headers} -> {matched, allow}; - {ok, 200, _Headers, _Body} -> {matched, allow}; - _ -> nomatch + case emqx_resource:query(ResourceID, {Method, Request, RequestTimeout}) of + {ok, 200, _Headers} -> + {matched, allow}; + {ok, 204, _Headers} -> + {matched, allow}; + {ok, 200, _Headers, _Body} -> + {matched, allow}; + {ok, _Status, _Headers, _Body} -> + nomatch; + {error, Reason} -> + ?SLOG(error, #{msg => "http_server_query_failed", + resource => ResourceID, + reason => Reason}), + ignore + end. + +parse_url(URL) + when URL =:= undefined -> + #{}; +parse_url(URL) -> + {ok, URIMap} = emqx_http_lib:uri_parse(URL), + case maps:get(query, URIMap, undefined) of + undefined -> + URIMap#{query => ""}; + _ -> + URIMap end. query_string(Body) -> @@ -88,7 +111,7 @@ replvar(Str0, PubSub, Topic, is_binary(Str0) -> NTopic = emqx_http_lib:uri_encode(Topic), Str1 = re:replace( Str0, emqx_authz:ph_to_re(?PH_S_CLIENTID) - , Clientid, [global, {return, binary}]), + , bin(Clientid), [global, {return, binary}]), Str2 = re:replace( Str1, emqx_authz:ph_to_re(?PH_S_USERNAME) , bin(Username), [global, {return, binary}]), Str3 = re:replace( Str2, emqx_authz:ph_to_re(?PH_S_HOST) @@ -96,9 +119,9 @@ replvar(Str0, PubSub, Topic, Str4 = re:replace( Str3, emqx_authz:ph_to_re(?PH_S_PROTONAME) , bin(Protocol), [global, {return, binary}]), Str5 = re:replace( Str4, emqx_authz:ph_to_re(?PH_S_MOUNTPOINT) - , Mountpoint, [global, {return, binary}]), + , bin(Mountpoint), [global, {return, binary}]), Str6 = re:replace( Str5, emqx_authz:ph_to_re(?PH_S_TOPIC) - , NTopic, [global, {return, binary}]), + , bin(NTopic), [global, {return, binary}]), Str7 = re:replace( Str6, emqx_authz:ph_to_re(?PH_S_ACTION) , bin(PubSub), [global, {return, binary}]), Str7. diff --git a/apps/emqx_authz/src/emqx_authz_schema.erl b/apps/emqx_authz/src/emqx_authz_schema.erl index 884f6e82b..4f7788849 100644 --- a/apps/emqx_authz/src/emqx_authz_schema.erl +++ b/apps/emqx_authz/src/emqx_authz_schema.erl @@ -32,10 +32,15 @@ -export([ namespace/0 , roots/0 , fields/1 + , validations/0 ]). -import(emqx_schema, [mk_duration/2]). +%%-------------------------------------------------------------------- +%% Hocon Schema +%%-------------------------------------------------------------------- + namespace() -> authz. %% @doc authorization schema is not exported @@ -98,92 +103,24 @@ and the new rules will override all rules from the old config file. }} ]; fields(http_get) -> - [ {type, #{type => http}} - , {enable, #{type => boolean(), - default => true}} - , {url, #{type => url()}} - , {method, #{type => get, default => get }} - , {headers, #{type => map(), - default => #{ <<"accept">> => <<"application/json">> - , <<"cache-control">> => <<"no-cache">> - , <<"connection">> => <<"keep-alive">> - , <<"keep-alive">> => <<"timeout=5">> - }, - converter => fun (Headers0) -> - Headers1 = maps:fold(fun(K0, V, AccIn) -> - K1 = iolist_to_binary(string:to_lower(to_list(K0))), - maps:put(K1, V, AccIn) - end, #{}, Headers0), - maps:merge(#{ <<"accept">> => <<"application/json">> - , <<"cache-control">> => <<"no-cache">> - , <<"connection">> => <<"keep-alive">> - , <<"keep-alive">> => <<"timeout=5">> - }, Headers1) - end - } - } - , {request_timeout, mk_duration("request timeout", #{default => "30s"})} - ] ++ proplists:delete(base_url, emqx_connector_http:fields(config)); + [ {method, #{type => get, default => post}} + , {headers, fun headers_no_content_type/1} + ] ++ http_common_fields(); fields(http_post) -> - [ {type, #{type => http}} - , {enable, #{type => boolean(), - default => true}} - , {url, #{type => url()}} - , {method, #{type => post, - default => get}} - , {headers, #{type => map(), - default => #{ <<"accept">> => <<"application/json">> - , <<"cache-control">> => <<"no-cache">> - , <<"connection">> => <<"keep-alive">> - , <<"content-type">> => <<"application/json">> - , <<"keep-alive">> => <<"timeout=5">> - }, - converter => fun (Headers0) -> - Headers1 = maps:fold(fun(K0, V, AccIn) -> - K1 = iolist_to_binary(string:to_lower(binary_to_list(K0))), - maps:put(K1, V, AccIn) - end, #{}, Headers0), - maps:merge(#{ <<"accept">> => <<"application/json">> - , <<"cache-control">> => <<"no-cache">> - , <<"connection">> => <<"keep-alive">> - , <<"content-type">> => <<"application/json">> - , <<"keep-alive">> => <<"timeout=5">> - }, Headers1) - end - } - } - , {request_timeout, mk_duration("request timeout", #{default => "30s"})} - , {body, #{type => map(), - nullable => true - } - } - ] ++ proplists:delete(base_url, emqx_connector_http:fields(config)); + [ {method, #{type => post, default => post}} + , {headers, fun headers/1} + ] ++ http_common_fields(); fields(mnesia) -> [ {type, #{type => 'built-in-database'}} , {enable, #{type => boolean(), default => true}} ]; fields(mongo_single) -> - [ {collection, #{type => atom()}} - , {selector, #{type => map()}} - , {type, #{type => mongodb}} - , {enable, #{type => boolean(), - default => true}} - ] ++ emqx_connector_mongo:fields(single); + mongo_common_fields() ++ emqx_connector_mongo:fields(single); fields(mongo_rs) -> - [ {collection, #{type => atom()}} - , {selector, #{type => map()}} - , {type, #{type => mongodb}} - , {enable, #{type => boolean(), - default => true}} - ] ++ emqx_connector_mongo:fields(rs); + mongo_common_fields() ++ emqx_connector_mongo:fields(rs); fields(mongo_sharded) -> - [ {collection, #{type => atom()}} - , {selector, #{type => map()}} - , {type, #{type => mongodb}} - , {enable, #{type => boolean(), - default => true}} - ] ++ emqx_connector_mongo:fields(sharded); + mongo_common_fields() ++ emqx_connector_mongo:fields(sharded); fields(mysql) -> connector_fields(mysql) ++ [ {query, query()} ]; @@ -203,10 +140,87 @@ fields(redis_cluster) -> connector_fields(redis, cluster) ++ [ {cmd, query()} ]. +http_common_fields() -> + [ {type, #{type => http}} + , {enable, #{type => boolean(), default => true}} + , {url, #{type => url()}} + , {request_timeout, mk_duration("request timeout", #{default => "30s"})} + , {body, #{type => map(), nullable => true}} + ] ++ proplists:delete(base_url, emqx_connector_http:fields(config)). + +mongo_common_fields() -> + [ {collection, #{type => atom()}} + , {selector, #{type => map()}} + , {type, #{type => mongodb}} + , {enable, #{type => boolean(), + default => true}} + ]. + +validations() -> + [ {check_ssl_opts, fun check_ssl_opts/1} + , {check_headers, fun check_headers/1} + ]. + +headers(type) -> map(); +headers(converter) -> + fun(Headers) -> + maps:merge(default_headers(), transform_header_name(Headers)) + end; +headers(default) -> default_headers(); +headers(_) -> undefined. + +headers_no_content_type(type) -> map(); +headers_no_content_type(converter) -> + fun(Headers) -> + maps:merge(default_headers_no_content_type(), transform_header_name(Headers)) + end; +headers_no_content_type(default) -> default_headers_no_content_type(); +headers_no_content_type(_) -> undefined. + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- +default_headers() -> + maps:put(<<"content-type">>, + <<"application/json">>, + default_headers_no_content_type()). + +default_headers_no_content_type() -> + #{ <<"accept">> => <<"application/json">> + , <<"cache-control">> => <<"no-cache">> + , <<"connection">> => <<"keep-alive">> + , <<"keep-alive">> => <<"timeout=5">> + }. + +transform_header_name(Headers) -> + maps:fold(fun(K0, V, Acc) -> + K = list_to_binary(string:to_lower(to_list(K0))), + maps:put(K, V, Acc) + end, #{}, Headers). + +check_ssl_opts(Conf) + when Conf =:= #{} -> + true; +check_ssl_opts(Conf) -> + case emqx_authz_http:parse_url(hocon_schema:get_value("config.url", Conf)) of + #{scheme := https} -> + case hocon_schema:get_value("config.ssl.enable", Conf) of + true -> ok; + false -> false + end; + #{scheme := http} -> + ok + end. + +check_headers(Conf) + when Conf =:= #{} -> + true; +check_headers(Conf) -> + Method = to_bin(hocon_schema:get_value("config.method", Conf)), + Headers = hocon_schema:get_value("config.headers", Conf), + Method =:= <<"post">> orelse (not maps:is_key(<<"content-type">>, Headers)). + union_array(Item) when is_list(Item) -> hoconsc:array(hoconsc:union(Item)). @@ -229,15 +243,22 @@ connector_fields(DB, Fields) -> catch error:badarg -> list_to_atom(Mod0); - Error -> - erlang:error(Error) + error:Reason -> + erlang:error(Reason) end, [ {type, #{type => DB}} , {enable, #{type => boolean(), default => true}} - ] ++ Mod:fields(Fields). + ] ++ erlang:apply(Mod, fields, [Fields]). to_list(A) when is_atom(A) -> atom_to_list(A); to_list(B) when is_binary(B) -> binary_to_list(B). + +to_bin(A) when is_atom(A) -> + atom_to_binary(A); +to_bin(B) when is_binary(B) -> + B; +to_bin(L) when is_list(L) -> + list_to_binary(L). diff --git a/apps/emqx_authz/test/emqx_authz_SUITE.erl b/apps/emqx_authz/test/emqx_authz_SUITE.erl index 130e266fb..d965affee 100644 --- a/apps/emqx_authz/test/emqx_authz_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_SUITE.erl @@ -36,7 +36,8 @@ init_per_suite(Config) -> meck:expect(emqx_resource, remove, fun(_) -> ok end ), ok = emqx_common_test_helpers:start_apps( - [emqx_conf, emqx_authz], fun set_special_configs/1), + [emqx_connector, emqx_conf, emqx_authz], + fun set_special_configs/1), Config. end_per_suite(_Config) -> diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index c121bb4c6..6a1b15e57 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -269,7 +269,7 @@ srv_record(_) -> undefined. parse_servers(Type, Servers) when is_binary(Servers) -> parse_servers(Type, binary_to_list(Servers)); parse_servers(Type, Servers) when is_list(Servers) -> - case string:split(Servers, ",", trailing) of + case string:split(Servers, ",", all) of [Host | _] when Type =:= single -> [Host]; Hosts ->