diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl new file mode 100644 index 000000000..71929b944 --- /dev/null +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl @@ -0,0 +1,299 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_authn_http). + +-include("emqx_authn.hrl"). +-include_lib("typerefl/include/types.hrl"). + +-behaviour(hocon_schema). + +-export([ structs/0 + , fields/1 + , validations/0 + ]). + +-type accept() :: 'application/json' | 'application/x-www-form-urlencoded'. +-type content_type() :: accept(). + +-reflect_type([ accept/0 + , content_type/0 + ]). + +-export([ create/3 + , update/4 + , authenticate/2 + , destroy/1 + ]). + +%%------------------------------------------------------------------------------ +%% Hocon Schema +%%------------------------------------------------------------------------------ + +structs() -> [""]. + +fields("") -> + [ {config, #{type => hoconsc:union( + [ hoconsc:ref(?MODULE, get) + , hoconsc:ref(?MODULE, post) + ])}} + ]; + +fields(get) -> + [ {method, #{type => get, + default => get}} + ] ++ common_fields(); + +fields(post) -> + [ {method, #{type => post, + default => get}} + , {content_type, fun content_type/1} + ] ++ common_fields(). + +common_fields() -> + [ {url, fun url/1} + , {accept, fun accept/1} + , {headers, fun headers/1} + , {form_data, fun form_data/1} + , {request_timeout, fun request_timeout/1} + ] ++ proplists:delete(base_url, emqx_connector_http:fields(config)). + +validations() -> + [ {check_ssl_opts, fun check_ssl_opts/1} ]. + +url(type) -> binary(); +url(nullable) -> false; +url(validate) -> [fun check_url/1]; +url(_) -> undefined. + +accept(type) -> accept(); +accept(default) -> 'application/json'; +accept(_) -> undefined. + +content_type(type) -> content_type(); +content_type(default) -> 'application/json'; +content_type(_) -> undefined. + +headers(type) -> list(); +headers(default) -> []; +headers(_) -> undefined. + +form_data(type) -> binary(); +form_data(nullable) -> false; +form_data(validate) -> [fun check_form_data/1]; +form_data(_) -> undefined. + +request_timeout(type) -> non_neg_integer(); +request_timeout(default) -> 5000; +request_timeout(_) -> undefined. + +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ + +create(ChainID, AuthenticatorName, + #{method := Method, + url := URL, + accept := Accept, + content_type := ContentType, + headers := Headers, + form_data := FormData, + request_timeout := RequestTimeout} = Config) -> + NHeaders = maps:merge(#{<<"accept">> => atom_to_binary(Accept, utf8), + <<"content-type">> => atom_to_binary(ContentType, utf8)}, Headers), + NFormData = preprocess_form_data(FormData), + #{path := Path, + query := Query} = URIMap = parse_url(URL), + BaseURL = generate_base_url(URIMap), + State = #{method => Method, + path => Path, + base_query => cow_qs:parse_qs(Query), + accept => Accept, + content_type => ContentType, + headers => NHeaders, + form_data => NFormData, + request_timeout => RequestTimeout}, + ResourceID = <>, + case emqx_resource:create_local(ResourceID, emqx_connector_http, Config#{base_url := BaseURL}) of + {ok, _} -> + {ok, State#{resource_id => ResourceID}}; + {error, already_created} -> + {ok, State#{resource_id => ResourceID}}; + {error, Reason} -> + {error, Reason} + end. + +update(_ChainID, _AuthenticatorName, Config, #{resource_id := ResourceID} = State) -> + case emqx_resource:update_local(ResourceID, emqx_connector_http, Config, []) of + {ok, _} -> {ok, State}; + {error, Reason} -> {error, Reason} + end. + +authenticate(ClientInfo, #{resource_id := ResourceID, + method := Method, + request_timeout := RequestTimeout} = State) -> + Request = generate_request(ClientInfo, State), + case emqx_resource:query(ResourceID, {Method, Request, RequestTimeout}) of + {ok, 204, _Headers} -> ok; + {ok, 200, Headers, Body} -> + ContentType = proplists:get_value(<<"content-type">>, Headers, <<"application/json">>), + case safely_parse_body(ContentType, Body) of + {ok, _NBody} -> + %% TODO: Return by user property + ok; + {error, Reason} -> + {stop, Reason} + end; + {error, _Reason} -> + ignore + end. + +destroy(#{resource_id := ResourceID}) -> + _ = emqx_resource:remove_local(ResourceID), + ok. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +check_url(URL) -> + case emqx_http_lib:uri_parse(URL) of + {ok, _} -> true; + {error, _} -> false + end. + +check_form_data(FormData) -> + KVs = binary:split(FormData, [<<"&">>], [global]), + case false =:= lists:any(fun(T) -> T =:= <<>> end, KVs) of + true -> + NKVs = [list_to_tuple(binary:split(KV, [<<"=">>], [global])) || KV <- KVs], + false =:= + lists:any(fun({K, V}) -> + K =:= <<>> orelse V =:= <<>>; + (_) -> + true + end, NKVs); + false -> + false + end. + +check_ssl_opts(Conf) -> + URL = hocon_schema:get_value("url", Conf), + {ok, #{scheme := Scheme}} = emqx_http_lib:uri_parse(URL), + SSLOpts = hocon_schema:get_value("ssl_opts", Conf), + case {Scheme, SSLOpts} of + {http, undefined} -> true; + {http, _} -> false; + {https, undefined} -> false; + {https, _} -> true + end. + +preprocess_form_data(FormData) -> + KVs = binary:split(FormData, [<<"&">>], [global]), + [list_to_tuple(binary:split(KV, [<<"=">>], [global])) || KV <- KVs]. + +parse_url(URL) -> + {ok, URIMap} = emqx_http_lib:uri_parse(URL), + case maps:get(query, URIMap, undefined) of + undefined -> + URIMap#{query => ""}; + _ -> + URIMap + end. + +generate_base_url(#{scheme := Scheme, + host := Host, + port := Port}) -> + iolist_to_binary(io_lib:format("~p://~s:~p", [Scheme, Host, Port])). + +generate_request(ClientInfo, #{method := Method, + path := Path, + base_query := BaseQuery, + content_type := ContentType, + headers := Headers, + form_data := FormData0}) -> + FormData = replace_placeholders(FormData0, ClientInfo), + case Method of + get -> + NPath = append_query(Path, BaseQuery ++ FormData), + {NPath, Headers}; + post -> + NPath = append_query(Path, BaseQuery), + Body = serialize_body(ContentType, FormData), + {NPath, Headers, Body} + end. + +replace_placeholders(FormData0, ClientInfo) -> + FormData = lists:map(fun({K, V0}) -> + case replace_placeholder(V0, ClientInfo) of + undefined -> {K, undefined}; + V -> {K, bin(V)} + end + end, FormData0), + lists:filter(fun({_, V}) -> + V =/= undefined + end, FormData). + +replace_placeholder(<<"${mqtt-username}">>, ClientInfo) -> + maps:get(username, ClientInfo, undefined); +replace_placeholder(<<"${mqtt-clientid}">>, ClientInfo) -> + maps:get(clientid, ClientInfo, undefined); +replace_placeholder(<<"${ip-address}">>, ClientInfo) -> + maps:get(peerhost, ClientInfo, undefined); +replace_placeholder(<<"${cert-subject}">>, ClientInfo) -> + maps:get(dn, ClientInfo, undefined); +replace_placeholder(<<"${cert-common-name}">>, ClientInfo) -> + maps:get(cn, ClientInfo, undefined); +replace_placeholder(Constant, _) -> + Constant. + +append_query(Path, []) -> + Path; +append_query(Path, Query) -> + Path ++ "?" ++ binary_to_list(qs(Query)). + +qs(KVs) -> + qs(KVs, []). + +qs([], Acc) -> + <<$&, Qs/binary>> = iolist_to_binary(lists:reverse(Acc)), + Qs; +qs([{K, V} | More], Acc) -> + qs(More, [["&", emqx_http_lib:uri_encode(K), "=", emqx_http_lib:uri_encode(V)] | Acc]). + +serialize_body('application/json', FormData) -> + emqx_json:encode(FormData); +serialize_body('application/x-www-form-urlencoded', FormData) -> + qs(FormData). + +safely_parse_body(ContentType, Body) -> + try parse_body(ContentType, Body) of + Result -> Result + catch + _Class:_Reason -> + {error, invalid_body} + end. + +parse_body(<<"application/json">>, Body) -> + {ok, emqx_json:decode(Body)}; +parse_body(<<"application/x-www-form-urlencoded">>, Body) -> + {ok, cow_qs:parse_qs(Body)}; +parse_body(ContentType, _) -> + {error, {unsupported_content_type, ContentType}}. + +bin(A) when is_atom(A) -> atom_to_binary(A, utf8); +bin(L) when is_list(L) -> list_to_binary(L); +bin(X) -> X. \ No newline at end of file diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_jwt.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_jwt.erl index f737d5168..8fae45ff4 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_jwt.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_jwt.erl @@ -294,12 +294,16 @@ do_verify_claims(Claims, [{Name, Value} | More]) -> {error, {claims, {Name, Value0}}} end. -check_verify_claims([]) -> +check_verify_claims(Conf) -> + Claims = hocon_schema:get_value("verify_claims", Conf), + do_check_verify_claims(Claims). + +do_check_verify_claims([]) -> false; -check_verify_claims([{Name, Expected} | More]) -> +do_check_verify_claims([{Name, Expected} | More]) -> check_claim_name(Name) andalso check_claim_expected(Expected) andalso - check_verify_claims(More). + do_check_verify_claims(More). check_claim_name(exp) -> false; diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl index 3b5384d9c..cc4445eaf 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl @@ -58,10 +58,11 @@ query_timeout(_) -> undefined. %% APIs %%------------------------------------------------------------------------------ -create(ChainID, ServiceName, #{query := Query0, - password_hash_algorithm := Algorithm} = Config) -> +create(ChainID, AuthenticatorName, + #{query := Query0, + password_hash_algorithm := Algorithm} = Config) -> {Query, PlaceHolders} = parse_query(Query0), - ResourceID = iolist_to_binary(io_lib:format("~s/~s",[ChainID, ServiceName])), + ResourceID = iolist_to_binary(io_lib:format("~s/~s",[ChainID, AuthenticatorName])), State = #{query => Query, placeholders => PlaceHolders, password_hash_algorithm => Algorithm}, @@ -74,7 +75,7 @@ create(ChainID, ServiceName, #{query := Query0, {error, Reason} end. -update(_ChainID, _ServiceName, Config, #{resource_id := ResourceID} = State) -> +update(_ChainID, _AuthenticatorName, Config, #{resource_id := ResourceID} = State) -> case emqx_resource:update_local(ResourceID, emqx_connector_mysql, Config, []) of {ok, _} -> {ok, State}; {error, Reason} -> {error, Reason} diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl new file mode 100644 index 000000000..807243038 --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -0,0 +1,213 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_connector_http). + +-include("emqx_connector.hrl"). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). + +%% callbacks of behaviour emqx_resource +-export([ on_start/2 + , on_stop/2 + , on_query/4 + , on_health_check/2 + ]). + +-export([ structs/0 + , fields/1 + , validations/0]). + +-type connect_timeout() :: non_neg_integer() | infinity. +-type pool_type() :: random | hash. + +-reflect_type([ connect_timeout/0 + , pool_type/0 + ]). + +%%===================================================================== +%% Hocon schema +structs() -> [""]. + +fields("") -> + [{config, #{type => hoconsc:ref(?MODULE, config)}}]; + +fields(config) -> + [ {base_url, fun base_url/1} + , {connect_timeout, fun connect_timeout/1} + , {max_retries, fun max_retries/1} + , {retry_interval, fun retry_interval/1} + , {keepalive, fun keepalive/1} + , {pool_type, fun pool_type/1} + , {pool_size, fun pool_size/1} + , {ssl_opts, #{type => hoconsc:ref(?MODULE, ssl_opts), + nullable => true}} + ]; + +fields(ssl_opts) -> + [ {cacertfile, fun cacertfile/1} + , {keyfile, fun keyfile/1} + , {certfile, fun certfile/1} + , {verify, fun verify/1} + ]. + +validations() -> + [ {check_ssl_opts, fun check_ssl_opts/1} ]. + +base_url(type) -> binary(); +base_url(nullable) -> false; +base_url(validate) -> [fun check_base_url/1]; +base_url(_) -> undefined. + +connect_timeout(type) -> connect_timeout(); +connect_timeout(default) -> 5000; +connect_timeout(_) -> undefined. + +max_retries(type) -> non_neg_integer(); +max_retries(default) -> 5; +max_retries(_) -> undefined. + +retry_interval(type) -> non_neg_integer(); +retry_interval(default) -> 1000; +retry_interval(_) -> undefined. + +keepalive(type) -> non_neg_integer(); +keepalive(default) -> 5000; +keepalive(_) -> undefined. + +pool_type(type) -> pool_type(); +pool_type(default) -> random; +pool_type(_) -> undefined. + +pool_size(type) -> non_neg_integer(); +pool_size(default) -> 8; +pool_size(_) -> undefined. + +cacertfile(type) -> string(); +cacertfile(nullable) -> true; +cacertfile(_) -> undefined. + +keyfile(type) -> string(); +keyfile(nullable) -> true; +keyfile(_) -> undefined. + +certfile(type) -> string(); +certfile(nullable) -> false; +certfile(_) -> undefined. + +verify(type) -> boolean(); +verify(default) -> false; +verify(_) -> undefined. + +%% =================================================================== +on_start(InstId, #{url := URL, + connect_timeout := ConnectTimeout, + max_retries := MaxRetries, + retry_interval := RetryInterval, + keepalive := Keepalive, + pool_type := PoolType, + pool_size := PoolSize} = Config) -> + logger:info("starting http connector: ~p, config: ~p", [InstId, Config]), + {ok, #{scheme := Scheme, + host := Host, + port := Port, + path := BasePath}} = emqx_http_lib:uri_parse(URL), + {Transport, TransportOpts} = case Scheme of + http -> + {tcp, []}; + https -> + SSLOpts = emqx_plugin_libs_ssl:save_files_return_opts( + maps:get(ssl_opts, Config), "connectors", InstId), + {tls, SSLOpts} + end, + NTransportOpts = emqx_misc:ipv6_probe(TransportOpts), + PoolOpts = [ {host, Host} + , {port, Port} + , {connect_timeout, ConnectTimeout} + , {retry, MaxRetries} + , {retry_timeout, RetryInterval} + , {keepalive, Keepalive} + , {pool_type, PoolType} + , {pool_size, PoolSize} + , {transport, Transport} + , {transport, NTransportOpts}], + PoolName = emqx_plugin_libs_pool:pool_name(InstId), + {ok, _} = ehttpc_sup:start_pool(PoolName, PoolOpts), + {ok, #{pool_name => PoolName, + host => Host, + port => Port, + base_path => BasePath}}. + +on_stop(InstId, #{pool_name := PoolName}) -> + logger:info("stopping http connector: ~p", [InstId]), + ehttpc_sup:stop_pool(PoolName). + +on_query(InstId, {Method, Request}, AfterQuery, State) -> + on_query(InstId, {undefined, Method, Request, 5000}, AfterQuery, State); +on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) -> + on_query(InstId, {undefined, Method, Request, Timeout}, AfterQuery, State); +on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery, #{pool_name := PoolName, + base_path := BasePath} = State) -> + logger:debug("http connector ~p received request: ~p, at state: ~p", [InstId, Request, State]), + NRequest = update_path(BasePath, Request), + case Result = ehttpc:request(case KeyOrNum of + undefined -> PoolName; + _ -> {PoolName, KeyOrNum} + end, Method, NRequest, Timeout) of + {error, Reason} -> + logger:debug("http connector ~p do reqeust failed, sql: ~p, reason: ~p", [InstId, NRequest, Reason]), + emqx_resource:query_failed(AfterQuery); + _ -> + emqx_resource:query_success(AfterQuery) + end, + Result. + +on_health_check(_InstId, #{server := {Host, Port}} = State) -> + case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), 3000) of + {ok, Sock} -> + gen_tcp:close(Sock), + {ok, State}; + {error, _Reason} -> + {error, test_query_failed, State} + end. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +check_base_url(URL) -> + case emqx_http_lib:uri_parse(URL) of + {error, _} -> false; + {ok, #{query := _}} -> false; + _ -> true + end. + +check_ssl_opts(Conf) -> + URL = hocon_schema:get_value("url", Conf), + {ok, #{scheme := Scheme}} = emqx_http_lib:uri_parse(URL), + SSLOpts = hocon_schema:get_value("ssl_opts", Conf), + case {Scheme, SSLOpts} of + {http, undefined} -> true; + {http, _} -> false; + {https, undefined} -> false; + {https, _} -> true + end. + +update_path(BasePath, {Path, Headers}) -> + {filename:join(BasePath, Path), Headers}; +update_path(BasePath, {Path, Headers, Body}) -> + {filename:join(BasePath, Path), Headers, Body}. \ No newline at end of file diff --git a/rebar.config b/rebar.config index abdc316b7..aa6b2dd4c 100644 --- a/rebar.config +++ b/rebar.config @@ -43,7 +43,7 @@ {deps, [ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps - , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.6"}}} + , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.7"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}