diff --git a/apps/emqx_auth_http/etc/emqx_auth_http.conf b/apps/emqx_auth_http/etc/emqx_auth_http.conf index 0df589169..3d5c45ea7 100644 --- a/apps/emqx_auth_http/etc/emqx_auth_http.conf +++ b/apps/emqx_auth_http/etc/emqx_auth_http.conf @@ -2,24 +2,29 @@ ## HTTP Auth/ACL Plugin ##-------------------------------------------------------------------- -##-------------------------------------------------------------------- -## Authentication request. - -## HTTP URL API path for authentication request +## HTTP URL API path for Auth Request ## ## Value: URL ## -## Examples: http://127.0.0.1:8991/mqtt/auth, https://[::1]:8991/mqtt/auth -auth.http.auth_req = http://127.0.0.1:8991/mqtt/auth +## Examples: http://127.0.0.1:80/mqtt/auth, https://[::1]:80/mqtt/auth +auth.http.auth_req.url = http://127.0.0.1:80/mqtt/auth +## HTTP Request Method for Auth Request +## ## Value: post | get auth.http.auth_req.method = post -## It only works when method=post -## Value: json | x-www-form-urlencoded -auth.http.auth_req.content_type = x-www-form-urlencoded +## HTTP Request Headers for Auth Request, Content-Type header is configured by default. +## The possible values of the Content-Type header: application/x-www-form-urlencoded, application/json +## +## Examples: auth.http.auth_req.headers.accept = */* +auth.http.auth_req.headers.content-type = application/x-www-form-urlencoded -## Variables: +## Parameters used to construct the request body or query string parameters +## When the request method is GET, these parameters will be converted into query string parameters +## When the request method is POST, the final format is determined by content-type +## +## Available Variables: ## - %u: username ## - %c: clientid ## - %a: ipaddress @@ -29,27 +34,32 @@ auth.http.auth_req.content_type = x-www-form-urlencoded ## - %C: common name of client TLS cert ## - %d: subject of client TLS cert ## -## Value: Params +## Value: =,=,... auth.http.auth_req.params = clientid=%c,username=%u,password=%P -##-------------------------------------------------------------------- -## Superuser request. - -## HTTP URL API path for Superuser request +## HTTP URL API path for SuperUser Request ## ## Value: URL ## -## Examples: http://127.0.0.1:8991/mqtt/superuser, https://[::1]:8991/mqtt/superuser -#auth.http.super_req = http://127.0.0.1:8991/mqtt/superuser +## Examples: http://127.0.0.1:80/mqtt/superuser, https://[::1]:80/mqtt/superuser +auth.http.super_req.url = http://127.0.0.1:80/mqtt/superuser +## HTTP Request Method for SuperUser Request +## ## Value: post | get -#auth.http.super_req.method = post +auth.http.super_req.method = post -## It only works when method=pos -## Value: json | x-www-form-urlencoded -#auth.http.super_req.content_type = x-www-form-urlencoded +## HTTP Request Headers for SuperUser Request, Content-Type header is configured by default. +## The possible values of the Content-Type header: application/x-www-form-urlencoded, application/json +## +## Examples: auth.http.super_req.headers.accept = */* +auth.http.super_req.headers.content-type = application/x-www-form-urlencoded -## Variables: +## Parameters used to construct the request body or query string parameters +## When the request method is GET, these parameters will be converted into query string parameters +## When the request method is POST, the final format is determined by content-type +## +## Available Variables: ## - %u: username ## - %c: clientid ## - %a: ipaddress @@ -59,42 +69,45 @@ auth.http.auth_req.params = clientid=%c,username=%u,password=%P ## - %C: common name of client TLS cert ## - %d: subject of client TLS cert ## -## Value: Params -#auth.http.super_req.params = clientid=%c,username=%u +## Value: =,=,... +auth.http.super_req.params = clientid=%c,username=%u -##-------------------------------------------------------------------- -## ACL request. - -## HTTP URL API path for ACL request +## HTTP URL API path for ACL Request ## ## Value: URL ## -## Examples: http://127.0.0.1:8991/mqtt/acl, https://[::1]:8991/mqtt/acl -auth.http.acl_req = http://127.0.0.1:8991/mqtt/acl +## Examples: http://127.0.0.1:80/mqtt/acl, https://[::1]:80/mqtt/acl +auth.http.acl_req.url = http://127.0.0.1:80/mqtt/acl +## HTTP Request Method for ACL Request +## ## Value: post | get -auth.http.acl_req.method = get +auth.http.acl_req.method = post -## It only works when method=post -## Value: json | x-www-form-urlencoded -auth.http.acl_req.content_type = x-www-form-urlencoded +## HTTP Request Headers for ACL Request, Content-Type header is configured by default. +## The possible values of the Content-Type header: application/x-www-form-urlencoded, application/json +## +## Examples: auth.http.acl_req.headers.accept = */* +auth.http.acl_req.headers.content-type = application/x-www-form-urlencoded -## Variables: -## - %A: 1 | 2, 1 = sub, 2 = pub +## Parameters used to construct the request body or query string parameters +## When the request method is GET, these parameters will be converted into query string parameters +## When the request method is POST, the final format is determined by content-type +## +## Available Variables: ## - %u: username ## - %c: clientid ## - %a: ipaddress ## - %r: protocol -## - %m: mountpoint -## - %t: topic +## - %P: password +## - %p: sockport of server accepted +## - %C: common name of client TLS cert +## - %d: subject of client TLS cert ## -## Value: Params +## Value: =,=,... auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t,mountpoint=%m -##------------------------------------------------------------------------------ -## Http Reqeust options - -## Time-out time for the http request, 0 is never timeout. +## Time-out time for the request. ## ## Value: Duration ## -h: hour, e.g. '2h' for 2 hours @@ -102,37 +115,23 @@ auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t, ## -s: second, e.g. '30s' for 30 seconds ## ## Default: 5s -## auth.http.request.timeout = 5s +auth.http.timeout = 5s -## Connection time-out time, used during the initial request -## when the client is connecting to the server +## Connection time-out time, used during the initial request, +## when the client is connecting to the server. ## ## Value: Duration +## -h: hour, e.g. '2h' for 2 hours +## -m: minute, e.g. '5m' for 5 minutes +## -s: second, e.g. '30s' for 30 seconds ## -## Default is same with the timeout option -## auth.http.request.connect_timeout = 0 +## Default: 5s +auth.http.connect_timeout = 5s -## Re-send http reuqest times +## Connection process pool size ## -## Value: integer -## -## Default: 3 -auth.http.request.retry_times = 5 - -## The interval for re-sending the http request -## -## Value: Duration -## -## Default: 1s -auth.http.request.retry_interval = 1s - -## The 'Exponential Backoff' mechanism for re-sending request. The actually -## re-send time interval is `interval * backoff ^ times` -## -## Value: float -## -## Default: 2.0 -auth.http.request.retry_backoff = 2.0 +## Value: Number +auth.http.pool_size = 32 ##------------------------------------------------------------------------------ ## SSL options @@ -152,11 +151,3 @@ auth.http.request.retry_backoff = 2.0 ## ## Value: File ## auth.http.ssl.keyfile = {{ platform_etc_dir }}/certs/client-key.pem - -##-------------------------------------------------------------------- -## HTTP Request Headers -## -## Example: auth.http.header.Accept-Encoding = * -## -## Value: String -## auth.http.header.Accept = */* diff --git a/apps/emqx_auth_http/include/emqx_auth_http.hrl b/apps/emqx_auth_http/include/emqx_auth_http.hrl index 2bbe12827..9c1216357 100644 --- a/apps/emqx_auth_http/include/emqx_auth_http.hrl +++ b/apps/emqx_auth_http/include/emqx_auth_http.hrl @@ -1,8 +1,6 @@ -define(APP, emqx_auth_http). --record(http_request, {method = post, path, headers, params, request_timeout}). - -record(auth_metrics, { success = 'client.auth.success', failure = 'client.auth.failure', diff --git a/apps/emqx_auth_http/priv/emqx_auth_http.schema b/apps/emqx_auth_http/priv/emqx_auth_http.schema index 4f4289db0..afd71cfd9 100644 --- a/apps/emqx_auth_http/priv/emqx_auth_http.schema +++ b/apps/emqx_auth_http/priv/emqx_auth_http.schema @@ -1,6 +1,6 @@ %%-*- mode: erlang -*- %% emqx_auth_http config mapping -{mapping, "auth.http.auth_req", "emqx_auth_http.auth_req", [ +{mapping, "auth.http.auth_req.url", "emqx_auth_http.auth_req", [ {datatype, string} ]}. @@ -9,9 +9,8 @@ {datatype, {enum, [post, get]}} ]}. -{mapping, "auth.http.auth_req.content_type", "emqx_auth_http.auth_req", [ - {default, 'x-www-form-urlencoded'}, - {datatype, {enum, ['json', 'x-www-form-urlencoded']}} +{mapping, "auth.http.auth_req.headers.$field", "emqx_auth_http.auth_req", [ + {datatype, string} ]}. {mapping, "auth.http.auth_req.params", "emqx_auth_http.auth_req", [ @@ -19,18 +18,19 @@ ]}. {translation, "emqx_auth_http.auth_req", fun(Conf) -> - case cuttlefish:conf_get("auth.http.auth_req", Conf) of + case cuttlefish:conf_get("auth.http.auth_req.url", Conf, undefined) of undefined -> cuttlefish:unset(); Url -> + Headers = cuttlefish_variable:filter_by_prefix("auth.http.auth_req.headers", Conf), Params = cuttlefish:conf_get("auth.http.auth_req.params", Conf), [{url, Url}, - {method, cuttlefish:conf_get("auth.http.auth_req.method", Conf)}, - {content_type, list_to_binary("application/" ++ atom_to_list(cuttlefish:conf_get("auth.http.auth_req.content_type", Conf)))}, - {params, [list_to_tuple(string:tokens(S, "=")) || S <- string:tokens(Params, ",")]}] + {method, cuttlefish:conf_get("auth.http.auth_req.method", Conf)}, + {headers, [{K, V} || {[_, _, _, _, K], V} <- Headers]}, + {params, [list_to_tuple(string:tokens(S, "=")) || S <- string:tokens(Params, ",")]}] end end}. -{mapping, "auth.http.super_req", "emqx_auth_http.super_req", [ +{mapping, "auth.http.super_req.url", "emqx_auth_http.super_req", [ {datatype, string} ]}. @@ -39,9 +39,8 @@ end}. {datatype, {enum, [post, get]}} ]}. -{mapping, "auth.http.super_req.content_type", "emqx_auth_http.super_req", [ - {default, 'x-www-form-urlencoded'}, - {datatype, {enum, ['json', 'x-www-form-urlencoded']}} +{mapping, "auth.http.super_req.headers.$field", "emqx_auth_http.super_req", [ + {datatype, string} ]}. {mapping, "auth.http.super_req.params", "emqx_auth_http.super_req", [ @@ -49,17 +48,19 @@ end}. ]}. {translation, "emqx_auth_http.super_req", fun(Conf) -> - case cuttlefish:conf_get("auth.http.super_req", Conf, undefined) of + case cuttlefish:conf_get("auth.http.super_req.url", Conf, undefined) of undefined -> cuttlefish:unset(); - Url -> Params = cuttlefish:conf_get("auth.http.super_req.params", Conf), - [{url, Url}, {method, cuttlefish:conf_get("auth.http.super_req.method", Conf)}, - {content_type, list_to_binary("application/" ++ atom_to_list(cuttlefish:conf_get("auth.http.super_req.content_type", Conf)))}, - {params, [list_to_tuple(string:tokens(S, "=")) || S <- string:tokens(Params, ",")]}] + Url -> + Headers = cuttlefish_variable:filter_by_prefix("auth.http.super_req.headers", Conf), + Params = cuttlefish:conf_get("auth.http.super_req.params", Conf), + [{url, Url}, + {method, cuttlefish:conf_get("auth.http.super_req.method", Conf)}, + {headers, [{K, V} || {[_, _, _, _, K], V} <- Headers]}, + {params, [list_to_tuple(string:tokens(S, "=")) || S <- string:tokens(Params, ",")]}] end end}. -{mapping, "auth.http.acl_req", "emqx_auth_http.acl_req", [ - {default, undefined}, +{mapping, "auth.http.acl_req.url", "emqx_auth_http.acl_req", [ {datatype, string} ]}. @@ -68,9 +69,8 @@ end}. {datatype, {enum, [post, get]}} ]}. -{mapping, "auth.http.acl_req.content_type", "emqx_auth_http.acl_req", [ - {default, 'x-www-form-urlencoded'}, - {datatype, {enum, ['json', 'x-www-form-urlencoded']}} +{mapping, "auth.http.acl_req.headers.$field", "emqx_auth_http.acl_req", [ + {datatype, string} ]}. {mapping, "auth.http.acl_req.params", "emqx_auth_http.acl_req", [ @@ -78,92 +78,41 @@ end}. ]}. {translation, "emqx_auth_http.acl_req", fun(Conf) -> - case cuttlefish:conf_get("auth.http.acl_req", Conf, undefined) of + case cuttlefish:conf_get("auth.http.acl_req.url", Conf, undefined) of undefined -> cuttlefish:unset(); - Url -> Params = cuttlefish:conf_get("auth.http.acl_req.params", Conf), - [{url, Url}, - {method, cuttlefish:conf_get("auth.http.acl_req.method", Conf)}, - {content_type, list_to_binary("application/" ++ atom_to_list(cuttlefish:conf_get("auth.http.acl_req.content_type", Conf)))}, - {params, [list_to_tuple(string:tokens(S, "=")) || S <- string:tokens(Params, ",")]}] + Url -> + Headers = cuttlefish_variable:filter_by_prefix("auth.http.acl_req.headers", Conf), + Params = cuttlefish:conf_get("auth.http.acl_req.params", Conf), + [{url, Url}, + {method, cuttlefish:conf_get("auth.http.acl_req.method", Conf)}, + {headers, [{K, V} || {[_, _, _, _, K], V} <- Headers]}, + {params, [list_to_tuple(string:tokens(S, "=")) || S <- string:tokens(Params, ",")]}] end end}. -{mapping, "auth.http.request.timeout", "emqx_auth_http.request_timeout", [ +{mapping, "auth.http.timeout", "emqx_auth_http.timeout", [ {default, "5s"}, {datatype, [integer, {duration, ms}]} ]}. -{mapping, "auth.http.pool_size", "emqx_auth_http.pool_opts", [ +{mapping, "auth.http.connect_timeout", "emqx_auth_http.connect_timeout", [ + {default, "5s"}, + {datatype, [integer, {duration, ms}]} +]}. + +{mapping, "auth.http.pool_size", "emqx_auth_http.pool_size", [ {default, 8}, {datatype, integer} ]}. -{mapping, "auth.http.request.connect_timeout", "emqx_auth_http.pool_opts", [ - {default, "5s"}, - {datatype, [integer, {duration, ms}]} -]}. - -{mapping, "auth.http.ssl.cacertfile", "emqx_auth_http.pool_opts", [ +{mapping, "auth.http.ssl.cacertfile", "emqx_auth_http.cacertfile", [ {datatype, string} ]}. -{mapping, "auth.http.ssl.certfile", "emqx_auth_http.pool_opts", [ +{mapping, "auth.http.ssl.certfile", "emqx_auth_http.certfile", [ {datatype, string} ]}. -{mapping, "auth.http.ssl.keyfile", "emqx_auth_http.pool_opts", [ +{mapping, "auth.http.ssl.keyfile", "emqx_auth_http.keyfile", [ {datatype, string} ]}. - -{mapping, "auth.http.request.retry_times", "emqx_auth_http.pool_opts", [ - {default, 5}, - {datatype, integer} -]}. - -{mapping, "auth.http.request.retry_interval", "emqx_auth_http.pool_opts", [ - {default, "1s"}, - {datatype, {duration, ms}} -]}. - -{mapping, "auth.http.request.retry_backoff", "emqx_auth_http.pool_opts", [ - {default, 2.0}, - {datatype, float} -]}. - -{translation, "emqx_auth_http.pool_opts", fun(Conf) -> - Filter = fun(L) -> [{K, V} || {K, V} <- L, V =/= undefined] end, - InfinityFun = fun(0) -> infinity; - (Duration) -> Duration - end, - SslOpts = Filter([{cacertfile, cuttlefish:conf_get("auth.http.ssl.cacertfile", Conf, undefined)}, - {certfile, cuttlefish:conf_get("auth.http.ssl.certfile", Conf, undefined)}, - {keyfile, cuttlefish:conf_get("auth.http.ssl.keyfile", Conf, undefined)}]), - Opts = [{pool_size, cuttlefish:conf_get("auth.http.pool_size", Conf)}, - {connect_timeout, InfinityFun(cuttlefish:conf_get("auth.http.request.connect_timeout", Conf))}, - {retry, cuttlefish:conf_get("auth.http.request.retry_times", Conf)}, - {retry_timeout, cuttlefish:conf_get("auth.http.request.retry_interval", Conf)}], - case SslOpts of - [] -> Filter(Opts); - _ -> - TlsVers = ['tlsv1.2','tlsv1.1',tlsv1], - DefaultOpts = [{versions, TlsVers}, - {ciphers, lists:foldl( - fun(TlsVer, Ciphers) -> - Ciphers ++ ssl:cipher_suites(all, TlsVer) - end, [], TlsVers)}], - Filter([{ssl, DefaultOpts ++ SslOpts} | Opts]) - end -end}. - - -{mapping, "auth.http.header.$field", "emqx_auth_http.headers", [ - {datatype, string} -]}. - -{translation, "emqx_auth_http.headers", fun(Conf) -> - lists:map( - fun({["auth", "http", "header", Field], Value}) -> - {Field, Value} - end, - cuttlefish_variable:filter_by_prefix("auth.http.header", Conf)) -end}. \ No newline at end of file diff --git a/apps/emqx_auth_http/rebar.config b/apps/emqx_auth_http/rebar.config index 0f2e48298..1e9f04a08 100644 --- a/apps/emqx_auth_http/rebar.config +++ b/apps/emqx_auth_http/rebar.config @@ -1,6 +1,5 @@ {deps, - [{gun, {git, "https://github.com/emqx/gun", {tag, "1.3.4"}}}, - {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} + [{ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.1"}}} ]}. {edoc_opts, [{preprocess, true}]}. diff --git a/apps/emqx_auth_http/src/emqx_acl_http.erl b/apps/emqx_auth_http/src/emqx_acl_http.erl index a6f60b465..23f6ff62b 100644 --- a/apps/emqx_auth_http/src/emqx_acl_http.erl +++ b/apps/emqx_auth_http/src/emqx_acl_http.erl @@ -42,22 +42,20 @@ register_metrics() -> %% ACL callbacks %%-------------------------------------------------------------------- -check_acl(ClientInfo, PubSub, Topic, AclResult, State) -> +check_acl(ClientInfo, PubSub, Topic, AclResult, Params) -> return_with(fun inc_metrics/1, - do_check_acl(ClientInfo, PubSub, Topic, AclResult, State)). + do_check_acl(ClientInfo, PubSub, Topic, AclResult, Params)). -do_check_acl(#{username := <<$$, _/binary>>}, _PubSub, _Topic, _AclResult, _Config) -> +do_check_acl(#{username := <<$$, _/binary>>}, _PubSub, _Topic, _AclResult, _Params) -> ok; -do_check_acl(ClientInfo, PubSub, Topic, _AclResult, #{acl_req := AclReq, - pool_name := PoolName}) -> +do_check_acl(ClientInfo, PubSub, Topic, _AclResult, #{acl := ACLParams = #{path := Path}}) -> ClientInfo1 = ClientInfo#{access => access(PubSub), topic => Topic}, - case check_acl_request(PoolName, AclReq, ClientInfo1) of + case check_acl_request(ACLParams, ClientInfo1) of {ok, 200, <<"ignore">>} -> ok; {ok, 200, _Body} -> {stop, allow}; {ok, _Code, _Body} -> {stop, deny}; {error, Error} -> - ?LOG(error, "Request ACL path ~s, error: ~p", - [AclReq#http_request.path, Error]), + ?LOG(error, "Request ACL path ~s, error: ~p", [Path, Error]), ok end. @@ -77,12 +75,13 @@ inc_metrics({stop, deny}) -> return_with(Fun, Result) -> Fun(Result), Result. -check_acl_request(PoolName, #http_request{path = Path, - method = Method, - headers = Headers, - params = Params, - request_timeout = RequestTimeout}, ClientInfo) -> - request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), RequestTimeout). +check_acl_request(#{pool_name := PoolName, + path := Path, + method := Method, + headers := Headers, + params := Params, + timeout := Timeout}, ClientInfo) -> + request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout). access(subscribe) -> 1; access(publish) -> 2. diff --git a/apps/emqx_auth_http/src/emqx_auth_http.app.src b/apps/emqx_auth_http/src/emqx_auth_http.app.src index 207b24cf4..b2c3221e6 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http.app.src +++ b/apps/emqx_auth_http/src/emqx_auth_http.app.src @@ -3,7 +3,7 @@ {vsn, "4.3.0"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_auth_http_sup]}, - {applications, [kernel,stdlib,gproc,gun]}, + {applications, [kernel,stdlib,ehttpc]}, {mod, {emqx_auth_http_app, []}}, {env, []}, {licenses, ["Apache-2.0"]}, diff --git a/apps/emqx_auth_http/src/emqx_auth_http.erl b/apps/emqx_auth_http/src/emqx_auth_http.erl index 21fe2d325..e82a37625 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http.erl +++ b/apps/emqx_auth_http/src/emqx_auth_http.erl @@ -29,10 +29,6 @@ , feedvar/2 ]). --type http_request() :: #http_request{method::'get' | 'post',params::[any()]}. -%-type http_opts() :: #{clientid:=_, peerhost:=_, protocol:=_, _=>_}. -%-type retry_opts() :: #{backoff:=_, interval:=_, times:=_, _=>_}. - %% Callbacks -export([ register_metrics/0 , check/3 @@ -43,28 +39,26 @@ register_metrics() -> lists:foreach(fun emqx_metrics:ensure/1, ?AUTH_METRICS). -check(ClientInfo, AuthResult, #{auth_req := AuthReq, - super_req := SuperReq, - pool_name := PoolName}) -> - case authenticate(PoolName, AuthReq, ClientInfo) of +check(ClientInfo, AuthResult, #{auth := AuthParms = #{path := Path}, + super := SuperParams}) -> + case authenticate(AuthParms, ClientInfo) of {ok, 200, <<"ignore">>} -> emqx_metrics:inc(?AUTH_METRICS(ignore)), ok; {ok, 200, Body} -> emqx_metrics:inc(?AUTH_METRICS(success)), - IsSuperuser = is_superuser(PoolName, SuperReq, ClientInfo), + IsSuperuser = is_superuser(SuperParams, ClientInfo), {stop, AuthResult#{is_superuser => IsSuperuser, auth_result => success, anonymous => false, mountpoint => mountpoint(Body, ClientInfo)}}; {ok, Code, _Body} -> ?LOG(error, "Deny connection from path: ~s, response http code: ~p", - [AuthReq#http_request.path, Code]), + [Path, Code]), emqx_metrics:inc(?AUTH_METRICS(failure)), {stop, AuthResult#{auth_result => http_to_connack_error(Code), anonymous => false}}; {error, Error} -> - ?LOG(error, "Request auth path: ~s, error: ~p", - [AuthReq#http_request.path, Error]), + ?LOG(error, "Request auth path: ~s, error: ~p", [Path, Error]), emqx_metrics:inc(?AUTH_METRICS(failure)), %%FIXME later: server_unavailable is not right. {stop, AuthResult#{auth_result => server_unavailable, @@ -77,22 +71,24 @@ description() -> "Authentication by HTTP API". %% Requests %%-------------------------------------------------------------------- -authenticate(PoolName, #http_request{path = Path, - method = Method, - headers = Headers, - params = Params, - request_timeout = RequestTimeout}, ClientInfo) -> - request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), RequestTimeout). +authenticate(#{pool_name := PoolName, + path := Path, + method := Method, + headers := Headers, + params := Params, + timeout := Timeout}, ClientInfo) -> + request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout). --spec(is_superuser(atom(), maybe(http_request()), emqx_types:client()) -> boolean()). -is_superuser(_PoolName, undefined, _ClientInfo) -> +-spec(is_superuser(maybe(map()), emqx_types:client()) -> boolean()). +is_superuser(undefined, _ClientInfo) -> false; -is_superuser(PoolName, #http_request{path = Path, - method = Method, - headers = Headers, - params = Params, - request_timeout = RequestTimeout}, ClientInfo) -> - case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), RequestTimeout) of +is_superuser(#{pool_name := PoolName, + path := Path, + method := Method, + headers := Headers, + params := Params, + timeout := Timeout}, ClientInfo) -> + case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout) of {ok, 200, _Body} -> true; {ok, _Code, _Body} -> false; {error, Error} -> ?LOG(error, "Request superuser path ~s, error: ~p", [Path, Error]), diff --git a/apps/emqx_auth_http/src/emqx_auth_http_app.erl b/apps/emqx_auth_http/src/emqx_auth_http_app.erl index c251962ce..487201e8e 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http_app.erl +++ b/apps/emqx_auth_http/src/emqx_auth_http_app.erl @@ -25,139 +25,153 @@ -export([ start/2 , stop/1 ]). --export([init/1]). %%-------------------------------------------------------------------- %% Application Callbacks %%-------------------------------------------------------------------- start(_StartType, _StartArgs) -> - case translate_env() of - ok -> - {ok, PoolOpts} = application:get_env(?APP, pool_opts), - {ok, Sup} = emqx_http_client_sup:start_link(?APP, ssl(inet(PoolOpts))), - _ = with_env(auth_req, fun load_auth_hook/1), - _ = with_env(acl_req, fun load_acl_hook/1), - {ok, Sup}; - {error, Reason} -> - {error, Reason} - end. - -load_auth_hook(AuthReq) -> - ok = emqx_auth_http:register_metrics(), - SuperReq = r(application:get_env(?APP, super_req, undefined)), - Params = #{auth_req => AuthReq, - super_req => SuperReq, - pool_name => ?APP}, - emqx:hook('client.authenticate', {emqx_auth_http, check, [Params]}). - -load_acl_hook(AclReq) -> - ok = emqx_acl_http:register_metrics(), - Params = #{acl_req => AclReq, - pool_name => ?APP}, - emqx:hook('client.check_acl', {emqx_acl_http, check_acl, [Params]}). + {ok, Sup} = emqx_auth_http_sup:start_link(), + translate_env(), + load_hooks(), + {ok, Sup}. stop(_State) -> - emqx:unhook('client.authenticate', {emqx_auth_http, check}), - emqx:unhook('client.check_acl', {emqx_acl_http, check_acl}), - emqx_http_client_sup:stop_pool(?APP). - -%%-------------------------------------------------------------------- -%% Dummy supervisor -%%-------------------------------------------------------------------- - -init([]) -> - {ok, { {one_for_all, 10, 100}, []} }. + unload_hooks(). %%-------------------------------------------------------------------- %% Internel functions %%-------------------------------------------------------------------- -with_env(Par, Fun) -> - case application:get_env(?APP, Par) of - undefined -> ok; - {ok, Req} -> Fun(r(Req)) - end. - -r(undefined) -> - undefined; -r(Config) -> - Headers = application:get_env(?APP, headers, []), - Method = proplists:get_value(method, Config, post), - Path = proplists:get_value(path, Config), - NewHeaders = [{<<"content-type">>, proplists:get_value(content_type, Config, <<"application/x-www-form-urlencoded">>)} | Headers], - Params = proplists:get_value(params, Config), - {ok, RequestTimeout} = application:get_env(?APP, request_timeout), - #http_request{method = Method, path = Path, headers = NewHeaders, params = Params, request_timeout = RequestTimeout}. - -inet(PoolOpts) -> - case proplists:get_value(host, PoolOpts) of - Host when tuple_size(Host) =:= 8 -> - TransOpts = proplists:get_value(transport_opts, PoolOpts, []), - NewPoolOpts = proplists:delete(transport_opts, PoolOpts), - [{transport_opts, [inet6 | TransOpts]} | NewPoolOpts]; - _ -> - PoolOpts - end. - -ssl(PoolOpts) -> - case proplists:get_value(ssl, PoolOpts, []) of - [] -> - PoolOpts; - SSLOpts -> - TransOpts = proplists:get_value(transport_opts, PoolOpts, []), - NewPoolOpts = proplists:delete(transport_opts, PoolOpts), - [{transport_opts, SSLOpts ++ TransOpts}, {transport, ssl} | NewPoolOpts] - end. - translate_env() -> - URLs = lists:foldl(fun(Name, Acc) -> - case application:get_env(?APP, Name, []) of - [] -> Acc; - Env -> - URL = proplists:get_value(url, Env), - #{host := Host0, - port := Port, - path := Path} = uri_string:parse(URL), - Host = get_addr(Host0), - [{Name, {Host, Port, path(Path)}} | Acc] - end - end, [], [acl_req, auth_req, super_req]), - case same_host_and_port(URLs) of - true -> - [begin - {ok, Req} = application:get_env(?APP, Name), - application:set_env(?APP, Name, [{path, Path} | Req]) - end || {Name, {_, _, Path}} <- URLs], - {_, {Host, Port, _}} = lists:last(URLs), - PoolOpts = application:get_env(?APP, pool_opts, []), - application:set_env(?APP, pool_opts, [{host, Host}, {port, Port} | PoolOpts]), - ok; - false -> - {error, different_server} + lists:foreach(fun translate_env/1, [auth_req, super_req, acl_req]). + +translate_env(EnvName) -> + case application:get_env(?APP, EnvName) of + undefined -> ok; + {ok, Req} -> + {ok, PoolSize} = application:get_env(?APP, pool_size), + {ok, ConnectTimeout} = application:get_env(?APP, connect_timeout), + URL = proplists:get_value(url, Req), + #{host := Host0, + path := Path0, + scheme := Scheme} = URIMap = uri_string:parse(add_default_scheme(URL)), + Port = maps:get(port, URIMap, case Scheme of + "https" -> 443; + _ -> 80 + end), + Path = path(Path0), + Host = case inet:parse_address(Host0) of + {ok, {_,_,_,_} = Addr} -> Addr; + {ok, {_,_,_,_,_,_,_,_} = Addr} -> Addr; + {error, einval} -> Host0 + end, + Inet = case Host of + {_,_,_,_} -> inet; + {_,_,_,_,_,_,_,_} -> inet6; + _ -> + case inet:getaddr(Host, inet6) of + {error, _} -> inet; + {ok, _} -> inet6 + end + end, + MoreOpts = case Scheme of + "http" -> + [{transport_opts, [Inet]}]; + "https" -> + CACertFile = application:get_env(?APP, cafile, undefined), + CertFile = application:get_env(?APP, certfile, undefined), + KeyFile = application:get_env(?APP, keyfile, undefined), + TLSOpts = lists:filter(fun({_K, V}) when V =:= <<>> -> + false; + (_) -> + true + end, [{keyfile, KeyFile}, {certfile, CertFile}, {cacertfile, CACertFile}]), + TlsVers = ['tlsv1.2','tlsv1.1',tlsv1], + NTLSOpts = [{versions, TlsVers}, + {ciphers, lists:foldl(fun(TlsVer, Ciphers) -> + Ciphers ++ ssl:cipher_suites(all, TlsVer) + end, [], TlsVers)} | TLSOpts], + [{transport, ssl}, {transport_opts, [Inet | NTLSOpts]}] + end, + PoolOpts = [{host, Host}, + {port, Port}, + {pool_size, PoolSize}, + {pool_type, random}, + {connect_timeout, ConnectTimeout}, + {retry, 5}, + {retry_timeout, 1000}] ++ MoreOpts, + Method = proplists:get_value(method, Req), + Headers = proplists:get_value(headers, Req), + NHeaders = ensure_content_type_header(Method, to_lower(Headers)), + NReq = lists:keydelete(headers, 1, Req), + {ok, Timeout} = application:get_env(?APP, timeout), + application:set_env(?APP, EnvName, [{path, Path}, + {headers, NHeaders}, + {timeout, Timeout}, + {pool_name, list_to_atom("emqx_auth_http/" ++ atom_to_list(EnvName))}, + {pool_opts, PoolOpts} | NReq]) end. -path("") -> "/"; -path(Path) -> Path. - -same_host_and_port([_]) -> - true; -same_host_and_port([{_, {Host, Port, _}}, {_, {Host, Port, _}}]) -> - true; -same_host_and_port([{_, {Host, Port, _}}, URL = {_, {Host, Port, _}} | Rest]) -> - same_host_and_port([URL | Rest]); -same_host_and_port(_) -> - false. - -get_addr(Hostname) -> - case inet:parse_address(Hostname) of - {ok, {_,_,_,_} = Addr} -> Addr; - {ok, {_,_,_,_,_,_,_,_} = Addr} -> Addr; - {error, einval} -> - case inet:getaddr(Hostname, inet) of - {error, _} -> - {ok, Addr} = inet:getaddr(Hostname, inet6), - Addr; - {ok, Addr} -> Addr +load_hooks() -> + case application:get_env(?APP, auth_req) of + undefined -> ok; + {ok, AuthReq} -> + ok = emqx_auth_http:register_metrics(), + PoolOpts = proplists:get_value(pool_opts, AuthReq), + PoolName = proplists:get_value(pool_name, AuthReq), + ehttpc_sup:start_pool(PoolName, PoolOpts), + case application:get_env(?APP, super_req) of + undefined -> + emqx:hook('client.authenticate', {emqx_auth_http, check, [#{auth => maps:from_list(AuthReq), + super => undefined}]}); + {ok, SuperReq} -> + PoolOpts1 = proplists:get_value(pool_opts, SuperReq), + PoolName1 = proplists:get_value(pool_name, SuperReq), + ehttpc_sup:start_pool(PoolName1, PoolOpts1), + emqx:hook('client.authenticate', {emqx_auth_http, check, [#{auth => maps:from_list(AuthReq), + super => maps:from_list(SuperReq)}]}) end - end. + end, + case application:get_env(?APP, acl_req) of + undefined -> ok; + {ok, ACLReq} -> + ok = emqx_acl_http:register_metrics(), + PoolOpts2 = proplists:get_value(pool_opts, ACLReq), + PoolName2 = proplists:get_value(pool_name, ACLReq), + ehttpc_sup:start_pool(PoolName2, PoolOpts2), + emqx:hook('client.check_acl', {emqx_acl_http, check_acl, [#{acl => maps:from_list(ACLReq)}]}) + end, + ok. + +unload_hooks() -> + emqx:unhook('client.authenticate', {emqx_auth_http, check}), + emqx:unhook('client.check_acl', {emqx_acl_http, check_acl}), + ehttpc_sup:stop_pool('emqx_auth_http/auth_req'), + ehttpc_sup:stop_pool('emqx_auth_http/super_req'), + ehttpc_sup:stop_pool('emqx_auth_http/acl_req'), + ok. + +to_lower(Headers) -> + [{string:to_lower(K), V} || {K, V} <- Headers]. + +ensure_content_type_header(Method, Headers) + when Method =:= post orelse Method =:= put -> + Headers; +ensure_content_type_header(_Method, Headers) -> + lists:keydelete("content-type", 1, Headers). + +add_default_scheme(URL) when is_list(URL) -> + binary_to_list(add_default_scheme(list_to_binary(URL))); +add_default_scheme(<<"http://", _/binary>> = URL) -> + URL; +add_default_scheme(<<"https://", _/binary>> = URL) -> + URL; +add_default_scheme(URL) -> + <<"http://", URL/binary>>. + +path("") -> + "/"; +path(Path) -> + Path. + diff --git a/apps/emqx_auth_http/src/emqx_auth_http_cli.erl b/apps/emqx_auth_http/src/emqx_auth_http_cli.erl index c56e45d49..6ff8c445a 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http_cli.erl +++ b/apps/emqx_auth_http/src/emqx_auth_http_cli.erl @@ -29,16 +29,16 @@ request(PoolName, get, Path, Headers, Params, Timeout) -> NewPath = Path ++ "?" ++ binary_to_list(cow_qs:qs(bin_kw(Params))), - reply(emqx_http_client:request(get, PoolName, {NewPath, Headers}, Timeout)); + reply(ehttpc:request(ehttpc_pool:pick_worker(PoolName), get, {NewPath, Headers}, Timeout)); request(PoolName, post, Path, Headers, Params, Timeout) -> - Body = case proplists:get_value(<<"content-type">>, Headers) of - <<"application/x-www-form-urlencoded">> -> + Body = case proplists:get_value("content-type", Headers) of + "application/x-www-form-urlencoded" -> cow_qs:qs(bin_kw(Params)); - <<"application/json">> -> + "application/json" -> emqx_json:encode(bin_kw(Params)) end, - reply(emqx_http_client:request(post, PoolName, {Path, Headers, Body}, Timeout)). + reply(ehttpc:request(ehttpc_pool:pick_worker(PoolName), post, {Path, Headers, Body}, Timeout)). reply({ok, StatusCode, _Headers}) -> {ok, StatusCode, <<>>}; @@ -80,6 +80,7 @@ feedvar(Params, ClientInfo = #{clientid := ClientId, ({Param, "%A"}) -> {Param, maps:get(access, ClientInfo, null)}; ({Param, "%t"}) -> {Param, maps:get(topic, ClientInfo, null)}; ({Param, "%m"}) -> {Param, maps:get(mountpoint, ClientInfo, null)}; + ({Param, "%k"}) -> {Param, emqx_json:encode(maps:get(ws_cookie, ClientInfo, null))}; ({Param, Var}) -> {Param, Var} end, Params). diff --git a/apps/emqx_auth_http/src/emqx_auth_http_sup.erl b/apps/emqx_auth_http/src/emqx_auth_http_sup.erl new file mode 100644 index 000000000..36b61a224 --- /dev/null +++ b/apps/emqx_auth_http/src/emqx_auth_http_sup.erl @@ -0,0 +1,29 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_auth_http_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + {ok, {{one_for_all, 0, 1}, []}}. diff --git a/apps/emqx_auth_http/src/emqx_http_client.erl b/apps/emqx_auth_http/src/emqx_http_client.erl deleted file mode 100644 index 217ee4b58..000000000 --- a/apps/emqx_auth_http/src/emqx_http_client.erl +++ /dev/null @@ -1,256 +0,0 @@ --module(emqx_http_client). - --behaviour(gen_server). - --include_lib("emqx/include/logger.hrl"). - -%% APIs --export([ start_link/3 - , request/3 - , request/4 - ]). - -%% gen_server callbacks --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). - --record(state, { - pool :: ecpool:poo_name(), - id :: pos_integer(), - client :: pid() | undefined, - mref :: reference() | undefined, - host :: inet:hostname() | inet:ip_address(), - port :: inet:port_number(), - gun_opts :: proplists:proplist(), - gun_state :: down | up, - requests :: map() - }). - -%%-------------------------------------------------------------------- -%% APIs -%%-------------------------------------------------------------------- - -start_link(Pool, Id, Opts) -> - gen_server:start_link(?MODULE, [Pool, Id, Opts], []). - -request(Method, Pool, Req) -> - request(Method, Pool, Req, 5000). - -request(get, Pool, {Path, Headers}, Timeout) -> - call(pick(Pool), {get, {Path, Headers}, Timeout}, Timeout + 1000); -request(Method, Pool, {Path, Headers, Body}, Timeout) -> - call(pick(Pool), {Method, {Path, Headers, Body}, Timeout}, Timeout + 1000). - -%%-------------------------------------------------------------------- -%% gen_server callbacks -%%-------------------------------------------------------------------- - -init([Pool, Id, Opts]) -> - State = #state{pool = Pool, - id = Id, - client = undefined, - mref = undefined, - host = proplists:get_value(host, Opts), - port = proplists:get_value(port, Opts), - gun_opts = gun_opts(Opts), - gun_state = down, - requests = #{}}, - true = gproc_pool:connect_worker(Pool, {Pool, Id}), - {ok, State}. - -handle_call(Req = {_, _, _}, From, State = #state{client = undefined, gun_state = down}) -> - case open(State) of - {ok, NewState} -> - handle_call(Req, From, NewState); - {error, Reason} -> - {reply, {error, Reason}, State} - end; - -handle_call(Req = {_, _, Timeout}, From, State = #state{client = Client, mref = MRef, gun_state = down}) when is_pid(Client) -> - case gun:await_up(Client, Timeout, MRef) of - {ok, _} -> - handle_call(Req, From, State#state{gun_state = up}); - {error, timeout} -> - {reply, {error, timeout}, State}; - {error, Reason} -> - true = erlang:demonitor(MRef, [flush]), - {reply, {error, Reason}, State#state{client = undefined, mref = undefined}} - end; - -handle_call({Method, Request, Timeout}, From, State = #state{client = Client, requests = Requests, gun_state = up}) when is_pid(Client) -> - StreamRef = do_request(Client, Method, Request), - ExpirationTime = erlang:system_time(millisecond) + Timeout, - {noreply, State#state{requests = maps:put(StreamRef, {From, ExpirationTime, undefined}, Requests)}}; - -handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), - {reply, ignored, State}. - -handle_cast(Msg, State) -> - ?LOG(error, "Unexpected cast: ~p", [Msg]), - {noreply, State}. - -handle_info({gun_response, Client, StreamRef, IsFin, StatusCode, Headers}, State = #state{client = Client, requests = Requests}) -> - Now = erlang:system_time(millisecond), - case maps:take(StreamRef, Requests) of - error -> - ?LOG(error, "Received 'gun_response' message from unknown stream ref: ~p", [StreamRef]), - {noreply, State}; - {{_, ExpirationTime, _}, NRequests} when Now > ExpirationTime -> - gun:cancel(Client, StreamRef), - flush_stream(Client, StreamRef), - {noreply, State#state{requests = NRequests}}; - {{From, ExpirationTime, undefined}, NRequests} -> - case IsFin of - fin -> - gen_server:reply(From, {ok, StatusCode, Headers}), - {noreply, State#state{requests = NRequests}}; - nofin -> - {noreply, State#state{requests = NRequests#{StreamRef => {From, ExpirationTime, {StatusCode, Headers, <<>>}}}}} - end; - _ -> - ?LOG(error, "Received 'gun_response' message does not match the state"), - {noreply, State} - end; - -handle_info({gun_data, Client, StreamRef, IsFin, Data}, State = #state{client = Client, requests = Requests}) -> - Now = erlang:system_time(millisecond), - case maps:take(StreamRef, Requests) of - error -> - ?LOG(error, "Received 'gun_data' message from unknown stream ref: ~p", [StreamRef]), - {noreply, State}; - {{_, ExpirationTime, _}, NRequests} when Now > ExpirationTime -> - gun:cancel(Client, StreamRef), - flush_stream(Client, StreamRef), - {noreply, State#state{requests = NRequests}}; - {{From, ExpirationTime, {StatusCode, Headers, Acc}}, NRequests} -> - case IsFin of - fin -> - gen_server:reply(From, {ok, StatusCode, Headers, <>}), - {noreply, State#state{requests = NRequests}}; - nofin -> - {noreply, State#state{requests = NRequests#{StreamRef => {From, ExpirationTime, {StatusCode, Headers, <>}}}}} - end; - _ -> - ?LOG(error, "Received 'gun_data' message does not match the state"), - {noreply, State} - end; - -handle_info({gun_error, Client, StreamRef, Reason}, State = #state{client = Client, requests = Requests}) -> - Now = erlang:system_time(millisecond), - case maps:take(StreamRef, Requests) of - error -> - ?LOG(error, "Received 'gun_error' message from unknown stream ref: ~p~n", [StreamRef]), - {noreply, State}; - {{_, ExpirationTime, _}, NRequests} when Now > ExpirationTime -> - {noreply, State#state{requests = NRequests}}; - {{From, _, _}, NRequests} -> - gen_server:reply(From, {error, Reason}), - {noreply, State#state{requests = NRequests}} - end; - -handle_info({gun_up, Client, _}, State = #state{client = Client}) -> - {noreply, State#state{gun_state = up}}; - -handle_info({gun_down, Client, _, Reason, KilledStreams, _}, State = #state{client = Client, requests = Requests}) -> - Now = erlang:system_time(millisecond), - NRequests = lists:foldl(fun(StreamRef, Acc) -> - case maps:take(StreamRef, Acc) of - error -> Acc; - {{_, ExpirationTime, _}, NAcc} when Now > ExpirationTime -> - NAcc; - {{From, _, _}, NAcc} -> - gen_server:reply(From, {error, Reason}), - NAcc - end - end, Requests, KilledStreams), - {noreply, State#state{gun_state = down, requests = NRequests}}; - -handle_info({'DOWN', MRef, process, Client, Reason}, State = #state{mref = MRef, client = Client, requests = Requests}) -> - true = erlang:demonitor(MRef, [flush]), - Now = erlang:system_time(millisecond), - lists:foreach(fun({_, {_, ExpirationTime, _}}) when Now > ExpirationTime -> - ok; - ({_, {From, _, _}}) -> - gen_server:reply(From, {error, Reason}) - end, maps:to_list(Requests)), - case open(State#state{requests = #{}}) of - {ok, NewState} -> - {noreply, NewState}; - {error, Reason} -> - {noreply, State#state{mref = undefined, client = undefined}} - end; - -handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), - {noreply, State}. - -terminate(_Reason, #state{pool = Pool, id = Id}) -> - gproc_pool:disconnect_worker(Pool, {Pool, Id}), - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -open(State = #state{host = Host, port = Port, gun_opts = GunOpts}) -> - case gun:open(Host, Port, GunOpts) of - {ok, ConnPid} when is_pid(ConnPid) -> - MRef = monitor(process, ConnPid), - {ok, State#state{mref = MRef, client = ConnPid}}; - {error, Reason} -> - {error, Reason} - end. - -gun_opts(Opts) -> - gun_opts(Opts, #{retry => 5, - retry_timeout => 1000, - connect_timeout => 5000, - protocols => [http], - http_opts => #{keepalive => infinity}}). - -gun_opts([], Acc) -> - Acc; -gun_opts([{retry, Retry} | Opts], Acc) -> - gun_opts(Opts, Acc#{retry => Retry}); -gun_opts([{retry_timeout, RetryTimeout} | Opts], Acc) -> - gun_opts(Opts, Acc#{retry_timeout => RetryTimeout}); -gun_opts([{connect_timeout, ConnectTimeout} | Opts], Acc) -> - gun_opts(Opts, Acc#{connect_timeout => ConnectTimeout}); -gun_opts([{transport, Transport} | Opts], Acc) -> - gun_opts(Opts, Acc#{transport => Transport}); -gun_opts([{transport_opts, TransportOpts} | Opts], Acc) -> - gun_opts(Opts, Acc#{transport_opts => TransportOpts}); -gun_opts([_ | Opts], Acc) -> - gun_opts(Opts, Acc). - -call(ChannPid, Msg, Timeout) -> - gen_server:call(ChannPid, Msg, Timeout). - -pick(Pool) -> - gproc_pool:pick_worker(Pool). - -do_request(Client, get, {Path, Headers}) -> - gun:get(Client, Path, Headers); -do_request(Client, post, {Path, Headers, Body}) -> - gun:post(Client, Path, Headers, Body). - -flush_stream(Client, StreamRef) -> - receive - {gun_response, Client, StreamRef, _, _, _} -> - flush_stream(Client, StreamRef); - {gun_data, Client, StreamRef, _, _} -> - flush_stream(Client, StreamRef); - {gun_error, Client, StreamRef, _} -> - flush_stream(Client, StreamRef) - after 0 -> - ok - end. diff --git a/apps/emqx_auth_http/src/emqx_http_client_sup.erl b/apps/emqx_auth_http/src/emqx_http_client_sup.erl deleted file mode 100644 index 1beb1ba36..000000000 --- a/apps/emqx_auth_http/src/emqx_http_client_sup.erl +++ /dev/null @@ -1,48 +0,0 @@ --module(emqx_http_client_sup). - --behaviour(supervisor). - --export([ start_link/2 - , init/1 - , stop_pool/1 - ]). - -start_link(Pool, Opts) -> - supervisor:start_link(?MODULE, [Pool, Opts]). - -init([Pool, Opts]) -> - PoolSize = pool_size(Opts), - ok = ensure_pool(Pool, random, [{size, PoolSize}]), - {ok, {{one_for_one, 10, 100}, [ - begin - ensure_pool_worker(Pool, {Pool, I}, I), - #{id => {Pool, I}, - start => {emqx_http_client, start_link, [Pool, I, Opts]}, - restart => transient, - shutdown => 5000, - type => worker, - modules => [emqx_http_client]} - end || I <- lists:seq(1, PoolSize)]}}. - - -ensure_pool(Pool, Type, Opts) -> - try gproc_pool:new(Pool, Type, Opts) - catch - error:exists -> ok - end. - -ensure_pool_worker(Pool, Name, Slot) -> - try gproc_pool:add_worker(Pool, Name, Slot) - catch - error:exists -> ok - end. - -pool_size(Opts) -> - Schedulers = erlang:system_info(schedulers), - proplists:get_value(pool_size, Opts, Schedulers). - -stop_pool(Name) -> - Workers = gproc_pool:defined_workers(Name), - _ = [gproc_pool:remove_worker(Name, WokerName) || {WokerName, _, _} <- Workers], - gproc_pool:delete(Name), - ok. diff --git a/apps/emqx_auth_http/test/emqx_auth_http_SUITE.erl b/apps/emqx_auth_http/test/emqx_auth_http_SUITE.erl index f9acdc638..235f1b5d8 100644 --- a/apps/emqx_auth_http/test/emqx_auth_http_SUITE.erl +++ b/apps/emqx_auth_http/test/emqx_auth_http_SUITE.erl @@ -68,15 +68,15 @@ set_special_configs(emqx_auth_http, Schema, Inet) -> AuthReq = #{method => post, url => ServerAddr ++ "/mqtt/auth", - content_type => <<"application/json">>, + headers => [{"content-type", "application/json"}], params => [{"clientid", "%c"}, {"username", "%u"}, {"password", "%P"}]}, SuperReq = #{method => post, url => ServerAddr ++ "/mqtt/superuser", - content_type => <<"application/json">>, + headers => [{"content-type", "application/json"}], params => [{"clientid", "%c"}, {"username", "%u"}]}, AclReq = #{method => post, url => ServerAddr ++ "/mqtt/acl", - content_type => <<"application/json">>, + headers => [{"content-type", "application/json"}], params => [{"access", "%A"}, {"username", "%u"}, {"clientid", "%c"}, {"ipaddr", "%a"}, {"topic", "%t"}, {"mountpoint", "%m"}]}, Schema =:= https andalso set_https_client_opts(), @@ -87,9 +87,10 @@ set_special_configs(emqx_auth_http, Schema, Inet) -> %% @private set_https_client_opts() -> - TransportOpts = emqx_ct_helpers:client_ssl_twoway(), - {ok, PoolOpts} = application:get_env(emqx_auth_http, pool_opts), - application:set_env(emqx_auth_http, pool_opts, [{transport_opts, TransportOpts}, {transport, ssl} | PoolOpts]). + SSLOpt = emqx_ct_helpers:client_ssl_twoway(), + application:set_env(emqx_auth_http, cafile, proplists:get_value(cacertfile, SSLOpt, undefined)), + application:set_env(emqx_auth_http, certfile, proplists:get_value(certfile, SSLOpt, undefined)), + application:set_env(emqx_auth_http, keyfile, proplists:get_value(keyfile, SSLOpt, undefined)). %% @private http_server(http, inet) -> "http://127.0.0.1:8991"; @@ -171,3 +172,10 @@ t_comment_config(_) -> ?assertEqual(AuthCount - 1, length(emqx_hooks:lookup('client.authenticate'))), ?assertEqual(AclCount - 1, length(emqx_hooks:lookup('client.check_acl'))). +t_feedvar(_) -> + Params = [{"cookie", "%k"}], + User0 = ?USER(<<"client1">>, <<"testuser">>, mqtt, {127,0,0,1}, external), + ?assertEqual([{"cookie", <<"null">>}], emqx_auth_http_cli:feedvar(Params, User0)), + + User1 = User0#{ws_cookie => [{<<"k">>, <<"v">>}]}, + ?assertEqual([{"cookie", <<"{\"k\":\"v\"}">>}], emqx_auth_http_cli:feedvar(Params, User1)).