improve(http): replace httpc with gun, improve performance and fix httpc unresponsiveness (#3940)

This commit is contained in:
tigercl 2021-01-13 13:47:23 +08:00 committed by GitHub
parent 5427057c2c
commit 372687d79d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 521 additions and 243 deletions

View File

@ -58,7 +58,7 @@ ignore=title-trailing-punctuation, T1, T2, T3, T4, T5, T6, T8, B1, B2, B3, B4, B
# python-style regex that the commit-msg title must match
# Note that the regex can contradict with other rules if not used correctly
# (e.g. title-must-not-contain-word).
regex=^(feat|fix|docs|style|refactor|test|chore|perf)\(.+\): .+
regex=^(feat|feature|fix|docs|style|refactor|test|chore|perf|improve)\(.+\): .+
# [body-max-line-length]
# line-length=72

View File

@ -85,7 +85,7 @@ r(Config) ->
Headers = application:get_env(?APP, headers, []),
Method = proplists:get_value(method, Config, post),
Path = proplists:get_value(path, Config),
NewHeaders = [{<<"content_type">>, proplists:get_value(content_type, Config, <<"application/x-www-form-urlencoded">>)} | Headers],
NewHeaders = [{<<"content-type">>, proplists:get_value(content_type, Config, <<"application/x-www-form-urlencoded">>)} | Headers],
Params = proplists:get_value(params, Config),
{ok, RequestTimeout} = application:get_env(?APP, request_timeout),
#http_request{method = Method, path = Path, headers = NewHeaders, params = Params, request_timeout = RequestTimeout}.
@ -118,9 +118,9 @@ translate_env() ->
URL = proplists:get_value(url, Env),
#{host := Host0,
port := Port,
path := Path} = uri_string:parse(list_to_binary(URL)),
Host = get_addr(binary_to_list(Host0)),
[{Name, {Host, Port, binary_to_list(Path)}} | Acc]
path := Path} = uri_string:parse(URL),
Host = get_addr(Host0),
[{Name, {Host, Port, path(Path)}} | Acc]
end
end, [], [acl_req, auth_req, super_req]),
case same_host_and_port(URLs) of
@ -137,6 +137,9 @@ translate_env() ->
{error, different_server}
end.
path("") -> "/";
path(Path) -> Path.
same_host_and_port([_]) ->
true;
same_host_and_port([{_, {Host, Port, _}}, {_, {Host, Port, _}}]) ->

View File

@ -28,11 +28,11 @@
%%--------------------------------------------------------------------
request(PoolName, get, Path, Headers, Params, Timeout) ->
NewPath = Path ++ "?" ++ cow_qs:qs(bin_kw(Params)),
NewPath = Path ++ "?" ++ binary_to_list(cow_qs:qs(bin_kw(Params))),
reply(emqx_http_client:request(get, PoolName, {NewPath, Headers}, Timeout));
request(PoolName, post, Path, Headers, Params, Timeout) ->
Body = case proplists:get_value(<<"content_type">>, Headers) of
Body = case proplists:get_value(<<"content-type">>, Headers) of
<<"application/x-www-form-urlencoded">> ->
cow_qs:qs(bin_kw(Params));
<<"application/json">> ->

View File

@ -66,13 +66,13 @@ set_special_configs(emqx, _Schmea, _Inet) ->
set_special_configs(emqx_auth_http, Schema, Inet) ->
ServerAddr = http_server(Schema, Inet),
AuthReq = #{method => get,
AuthReq = #{method => post,
url => ServerAddr ++ "/mqtt/auth",
content_type => <<"application/x-www-form-urlencoded">>,
content_type => <<"application/json">>,
params => [{"clientid", "%c"}, {"username", "%u"}, {"password", "%P"}]},
SuperReq = #{method => post,
url => ServerAddr ++ "/mqtt/superuser",
content_type => <<"application/x-www-form-urlencoded">>,
content_type => <<"application/json">>,
params => [{"clientid", "%c"}, {"username", "%u"}]},
AclReq = #{method => post,
url => ServerAddr ++ "/mqtt/acl",

View File

@ -128,6 +128,7 @@
, export_acl_mnesia/0
, import_rules/1
, import_resources/1
, import_resources_and_rules/3
, import_blacklist/1
, import_applications/1
, import_users/1
@ -664,26 +665,31 @@ export_acl_mnesia() ->
end.
import_rules(Rules) ->
lists:foreach(fun(#{<<"id">> := RuleId,
lists:foreach(fun(Rule) ->
import_rule(Rule)
end, Rules).
import_resources(Reources) ->
lists:foreach(fun(Resource) ->
import_resource(Resource)
end, Reources).
import_rule(#{<<"id">> := RuleId,
<<"rawsql">> := RawSQL,
<<"actions">> := Actions,
<<"enabled">> := Enabled,
<<"description">> := Desc}) ->
Rule = #{
id => RuleId,
Rule = #{id => RuleId,
rawsql => RawSQL,
actions => map_to_actions(Actions),
enabled => Enabled,
description => Desc
},
description => Desc},
try emqx_rule_engine:create_rule(Rule)
catch throw:{resource_not_initialized, _ResId} ->
emqx_rule_engine:create_rule(Rule#{enabled => false})
end
end, Rules).
end.
import_resources(Reources) ->
lists:foreach(fun(#{<<"id">> := Id,
import_resource(#{<<"id">> := Id,
<<"type">> := Type,
<<"config">> := Config,
<<"created_at">> := CreatedAt,
@ -696,8 +702,60 @@ import_resources(Reources) ->
type => any_to_atom(Type),
config => Config,
created_at => NCreatedAt,
description => Desc})
end, Reources).
description => Desc}).
import_resources_and_rules(Resources, Rules, FromVersion)
when FromVersion =:= "4.0" orelse FromVersion =:= "4.1" orelse FromVersion =:= "4.2" ->
Configs = lists:foldl(fun(#{<<"id">> := ID,
<<"type">> := <<"web_hook">>,
<<"config">> := #{<<"content_type">> := ContentType,
<<"headers">> := Headers,
<<"method">> := Method,
<<"url">> := URL}} = Resource, Acc) ->
NConfig = #{<<"connect_timeout">> => 5,
<<"request_timeout">> => 5,
<<"cacertfile">> => <<>>,
<<"certfile">> => <<>>,
<<"keyfile">> => <<>>,
<<"pool_size">> => 8,
<<"url">> => URL,
<<"verify">> => true},
NResource = Resource#{<<"config">> := NConfig},
import_resource(NResource),
NHeaders = maps:put(<<"content-type">>, ContentType, Headers),
[{ID, #{headers => NHeaders, method => Method}} | Acc];
(Resource, Acc) ->
import_resource(Resource),
Acc
end, [], Resources),
lists:foreach(fun(#{<<"actions">> := Actions} = Rule) ->
NActions = apply_new_config(Actions, Configs),
import_rule(Rule#{<<"actions">> := NActions})
end, Rules);
import_resources_and_rules(Resources, Rules, _FromVersion) ->
import_resources(Resources),
import_rules(Rules).
apply_new_config(Actions, Configs) ->
apply_new_config(Actions, Configs, []).
apply_new_config([], _Configs, Acc) ->
Acc;
apply_new_config([Action = #{<<"name">> := <<"data_to_webserver">>,
<<"args">> := #{<<"$resource">> := ID,
<<"path">> := Path,
<<"payload_tmpl">> := PayloadTmpl}} | More], Configs, Acc) ->
case proplists:get_value(ID, Configs, undefined) of
undefined ->
apply_new_config(More, Configs, [Action | Acc]);
#{headers := Headers, method := Method} ->
Args = #{<<"$resource">> => ID,
<<"body">> => PayloadTmpl,
<<"headers">> => Headers,
<<"method">> => Method,
<<"path">> => Path},
apply_new_config(More, Configs, [Action#{<<"args">> := Args} | Acc])
end.
import_blacklist(Blacklist) ->
lists:foreach(fun(#{<<"who">> := Who,

View File

@ -174,8 +174,7 @@ do_import(Filename) ->
case lists:member(Version, ?VERSIONS) of
true ->
try
emqx_mgmt:import_resources(maps:get(<<"resources">>, Data, [])),
emqx_mgmt:import_rules(maps:get(<<"rules">>, Data, [])),
emqx_mgmt:import_resources_and_rules(maps:get(<<"resources">>, Data, []), maps:get(<<"rules">>, Data, []), Version),
emqx_mgmt:import_blacklist(maps:get(<<"blacklist">>, Data, [])),
emqx_mgmt:import_applications(maps:get(<<"apps">>, Data, [])),
emqx_mgmt:import_users(maps:get(<<"users">>, Data, [])),

View File

@ -2,49 +2,51 @@
## WebHook
##====================================================================
## The web services URL for Hook request
## Webhook URL
##
## Value: String
web.hook.api.url = http://127.0.0.1:8080
web.hook.url = http://127.0.0.1:8080
##--------------------------------------------------------------------
## HTTP Request Headers
## HTTP Headers
##
## The header params what you extra need
## Format:
## web.hook.headers.<param> = your-param
## Example:
## 1. web.hook.headers.token = your-token
## 2. web.hook.headers.other = others-param
## 1. web.hook.headers.content-type = application/json
## 2. web.hook.headers.accept = *
##
## Value: String
## web.hook.headers.token = your-token
web.hook.headers.content-type = application/json
## The encoding format of the payload field in the HTTP body
## The payload field only appears in the on_message_publish and on_message_delivered actions
##
## Value: plain | base64 | base62
web.hook.body.encoding_of_payload_field = plain
##--------------------------------------------------------------------
## Encode message payload field
## PEM format file of CA's
##
## Value: base64 | base62
## web.hook.encode_payload = base64
## Mysql ssl configuration.
##
## Value: on | off
## web.hook.ssl = off
## Value: File
## web.hook.ssl.cacertfile = <PEM format file of CA's>
##--------------------------------------------------------------------
## CA certificate.
## Certificate file to use, PEM format assumed
##
## Value: File
## web.hook.ssl.cafile = path to your ca file
## Client ssl certificate.
##
## Value: File
## web.hook.ssl.certfile = path to your clientcert file
## web.hook.ssl.certfile = <Certificate file to use>
##--------------------------------------------------------------------
## Client ssl keyfile.
## Private key file to use, PEM format assumed
##
## Value: File
## web.hook.ssl.keyfile = path to your clientkey file
## web.hook.ssl.keyfile = <Private key file to use>
## Turn on peer certificate verification
##
## Value: true | false
## web.hook.ssl.verify = true
## Connection process pool size
##
## Value: Number
web.hook.pool_size = 32
##--------------------------------------------------------------------
## Hook Rules

View File

@ -0,0 +1 @@
-define(APP, emqx_web_hook).

View File

@ -1,33 +1,39 @@
%%-*- mode: erlang -*-
%% EMQ X R3.0 config mapping
{mapping, "web.hook.api.url", "emqx_web_hook.url", [
{mapping, "web.hook.url", "emqx_web_hook.url", [
{datatype, string}
]}.
{mapping, "web.hook.ssl", "emqx_web.hook.ssl", [
{default, off},
{datatype, flag}
]}.
{mapping, "web.hook.ssl.cafile", "emqx_web_hook.ssloptions", [
{default, ""},
{mapping, "web.hook.headers.$name", "emqx_web_hook.headers", [
{datatype, string}
]}.
{mapping, "web.hook.ssl.certfile", "emqx_web_hook.ssloptions", [
{default, ""},
{mapping, "web.hook.body.encoding_of_payload_field", "emqx_web_hook.encoding_of_payload_field", [
{default, plain},
{datatype, {enum, [plain, base62, base64]}}
]}.
{mapping, "web.hook.ssl.cacertfile", "emqx_web_hook.cacertfile", [
{datatype, string}
]}.
{mapping, "web.hook.ssl.keyfile", "emqx_web_hook.ssloptions", [
{default, ""},
{mapping, "web.hook.ssl.certfile", "emqx_web_hook.certfile", [
{datatype, string}
]}.
{mapping, "web.hook.encode_payload", "emqx_web_hook.encode_payload", [
{default, undefined},
{datatype, {enum, [base62, base64]}}
{mapping, "web.hook.ssl.keyfile", "emqx_web_hook.keyfile", [
{datatype, string}
]}.
{mapping, "web.hook.ssl.verify", "emqx_web_hook.verify", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
{mapping, "web.hook.pool_size", "emqx_web_hook.pool_size", [
{default, 32},
{datatype, integer}
]}.
{mapping, "web.hook.rule.client.connect.$name", "emqx_web_hook.rules", [
@ -78,10 +84,6 @@
{datatype, string}
]}.
{mapping, "web.hook.headers.$name", "emqx_web_hook.headers", [
{datatype, string}
]}.
{translation, "emqx_web_hook.headers", fun(Conf) ->
Headers = cuttlefish_variable:filter_by_prefix("web.hook.headers", Conf),
[{K, V} || {[_, _, _, K], V} <- Headers]
@ -94,13 +96,3 @@ end}.
{lists:concat([Name1,".",Name2]), Val}
end, Hooks)
end}.
{translation, "emqx_web_hook.ssloptions", fun(Conf) ->
CA = cuttlefish:conf_get("web.hook.ssl.cafile", Conf),
Cert = cuttlefish:conf_get("web.hook.ssl.certfile", Conf),
Key = cuttlefish:conf_get("web.hook.ssl.keyfile", Conf),
case ((Cert == "") or (Key == "")) of
true -> [{cacertfile, CA}];
_ -> [{cacertfile, CA}, {certfile, Cert}, {keyfile, Key}]
end
end}.

View File

@ -1,7 +1,8 @@
{plugins, [rebar3_proper]}.
{deps,
[{emqx_rule_engine, {git, "https://github.com/emqx/emqx-rule-engine"}}
[{ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.0"}}},
{emqx_rule_engine, {git, "https://github.com/emqx/emqx-rule-engine"}}
]}.
{edoc_opts, [{preprocess, true}]}.

View File

@ -3,7 +3,7 @@
{vsn, "4.3.0"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_web_hook_sup]},
{applications, [kernel,stdlib]},
{applications, [kernel,stdlib,ehttpc]},
{mod, {emqx_web_hook_app,[]}},
{env, []},
{licenses, ["Apache-2.0"]},

View File

@ -94,7 +94,7 @@ on_client_connect(ConnInfo = #{clientid := ClientId, username := Username, peern
, keepalive => maps:get(keepalive, ConnInfo)
, proto_ver => maps:get(proto_ver, ConnInfo)
},
send_http_request(Params).
send_http_request(ClientId, Params).
%%--------------------------------------------------------------------
%% Client connack
@ -111,7 +111,7 @@ on_client_connack(ConnInfo = #{clientid := ClientId, username := Username, peern
, proto_ver => maps:get(proto_ver, ConnInfo)
, conn_ack => Rc
},
send_http_request(Params).
send_http_request(ClientId, Params).
%%--------------------------------------------------------------------
%% Client connected
@ -128,7 +128,7 @@ on_client_connected(#{clientid := ClientId, username := Username, peerhost := Pe
, proto_ver => maps:get(proto_ver, ConnInfo)
, connected_at => maps:get(connected_at, ConnInfo)
},
send_http_request(Params).
send_http_request(ClientId, Params).
%%--------------------------------------------------------------------
%% Client disconnected
@ -145,7 +145,7 @@ on_client_disconnected(#{clientid := ClientId, username := Username}, Reason, Co
, reason => stringfy(maybe(Reason))
, disconnected_at => maps:get(disconnected_at, ConnInfo, erlang:system_time(millisecond))
},
send_http_request(Params).
send_http_request(ClientId, Params).
%%--------------------------------------------------------------------
%% Client subscribe
@ -163,7 +163,7 @@ on_client_subscribe(#{clientid := ClientId, username := Username}, _Properties,
, topic => Topic
, opts => Opts
},
send_http_request(Params)
send_http_request(ClientId, Params)
end, Topic, Filter)
end, TopicTable).
@ -183,7 +183,7 @@ on_client_unsubscribe(#{clientid := ClientId, username := Username}, _Properties
, topic => Topic
, opts => Opts
},
send_http_request(Params)
send_http_request(ClientId, Params)
end, Topic, Filter)
end, TopicTable).
@ -202,7 +202,7 @@ on_session_subscribed(#{clientid := ClientId, username := Username}, Topic, Opts
, topic => Topic
, opts => Opts
},
send_http_request(Params)
send_http_request(ClientId, Params)
end, Topic, Filter).
%%--------------------------------------------------------------------
@ -219,7 +219,7 @@ on_session_unsubscribed(#{clientid := ClientId, username := Username}, Topic, _O
, username => maybe(Username)
, topic => Topic
},
send_http_request(Params)
send_http_request(ClientId, Params)
end, Topic, Filter).
%%--------------------------------------------------------------------
@ -236,7 +236,7 @@ on_session_terminated(#{clientid := ClientId, username := Username}, Reason, _Se
, username => maybe(Username)
, reason => stringfy(maybe(Reason))
},
send_http_request(Params).
send_http_request(ClientId, Params).
%%--------------------------------------------------------------------
%% Message publish
@ -259,7 +259,7 @@ on_message_publish(Message = #message{topic = Topic}, {Filter}) ->
, payload => encode_payload(Message#message.payload)
, ts => Message#message.timestamp
},
send_http_request(Params),
send_http_request(FromClientId, Params),
{ok, Message}
end, Message, Topic, Filter).
@ -287,7 +287,7 @@ on_message_delivered(#{clientid := ClientId, username := Username},
, payload => encode_payload(Message#message.payload)
, ts => Message#message.timestamp
},
send_http_request(Params)
send_http_request(ClientId, Params)
end, Topic, Filter).
%%--------------------------------------------------------------------
@ -314,35 +314,32 @@ on_message_acked(#{clientid := ClientId, username := Username},
, payload => encode_payload(Message#message.payload)
, ts => Message#message.timestamp
},
send_http_request(Params)
send_http_request(ClientId, Params)
end, Topic, Filter).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
send_http_request(Params) ->
Params1 = emqx_json:encode(Params),
Url = application:get_env(?APP, url, "http://127.0.0.1"),
send_http_request(ClientID, Params) ->
{ok, Path} = application:get_env(?APP, path),
Headers = application:get_env(?APP, headers, []),
?LOG(debug, "Send to: ~0p, params: ~0s", [Url, Params1]),
case request_(post, {Url, Headers, "application/json", Params1}, [{timeout, 5000}], [], 0) of
{ok, _} -> ok;
Body = emqx_json:encode(Params),
?LOG(debug, "Send to: ~0p, params: ~0s", [Path, Body]),
case ehttpc:request(ehttpc_pool:pick_worker(?APP, ClientID), post, {Path, Headers, Body}) of
{ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
ok;
{ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 ->
ok;
{ok, StatusCode, _} ->
?LOG(warning, "HTTP request failed with status code: ~p", [StatusCode]),
ok;
{ok, StatusCode, _, _} ->
?LOG(warning, "HTTP request failed with status code: ~p", [StatusCode]),
ok;
{error, Reason} ->
?LOG(error, "HTTP request error: ~p", [Reason]), ok
end.
request_(Method, Req, HTTPOpts, Opts, Times) ->
%% Resend request, when TCP closed by remotely
NHttpOpts = case application:get_env(?APP, ssl, false) of
true -> [{ssl, application:get_env(?APP, ssloptions, [])} | HTTPOpts];
_ -> HTTPOpts
end,
case httpc:request(Method, Req, NHttpOpts, Opts) of
{error, socket_closed_remotely} when Times < 3 ->
timer:sleep(trunc(math:pow(10, Times))),
request_(Method, Req, HTTPOpts, Opts, Times+1);
Other -> Other
?LOG(error, "HTTP request error: ~p", [Reason]),
ok
end.
parse_rule(Rules) ->
@ -375,11 +372,11 @@ parse_from(Message) ->
{emqx_message:from(Message), maybe(emqx_message:get_header(username, Message))}.
encode_payload(Payload) ->
encode_payload(Payload, application:get_env(?APP, encode_payload, undefined)).
encode_payload(Payload, application:get_env(?APP, encoding_of_payload_field, plain)).
encode_payload(Payload, base62) -> emqx_base62:encode(Payload);
encode_payload(Payload, base64) -> base64:encode(Payload);
encode_payload(Payload, _) -> Payload.
encode_payload(Payload, plain) -> Payload.
stringfy(Term) when is_atom(Term); is_binary(Term) ->
Term;

View File

@ -20,43 +20,81 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_rule_engine/include/rule_actions.hrl").
-include("emqx_web_hook.hrl").
-define(RESOURCE_TYPE_WEBHOOK, 'web_hook').
-define(RESOURCE_CONFIG_SPEC, #{
url => #{order => 1,
url => #{
order => 1,
type => string,
format => url,
required => true,
title => #{en => <<"Request URL">>,
zh => <<"请求 URL"/utf8>>},
description => #{en => <<"Request URL">>,
zh => <<"请求 URL"/utf8>>}},
method => #{order => 2,
type => string,
enum => [<<"PUT">>,<<"POST">>,<<"GET">>,<<"DELETE">>],
default => <<"POST">>,
title => #{en => <<"Request Method">>,
zh => <<"请求方法"/utf8>>},
description => #{en => <<"Request Method. \n"
"Note that: the Payload Template of Action will be discarded in case of GET method">>,
zh => <<"请求方法。\n"
"注意:当方法为 GET 时,动作中的 '消息内容模板' 参数会被忽略"/utf8>>}},
content_type => #{order => 3,
type => string,
enum => [<<"application/json">>,<<"text/plain;charset=UTF-8">>],
default => <<"application/json">>,
title => #{en => <<"Content-Type">>,
zh => <<"Content-Type"/utf8>>},
description => #{en => <<"The Content-Type of HTTP Request">>,
zh => <<"HTTP 请求头中的 Content-Type 字段值"/utf8>>}},
headers => #{order => 4,
type => object,
schema => #{},
default => #{},
title => #{en => <<"Request Header">>,
zh => <<"请求头"/utf8>>},
description => #{en => <<"The custom HTTP request headers">>,
zh => <<"自定义的 HTTP 请求头列表"/utf8>>}}
title => #{en => <<"URL">>,
zh => <<"URL"/utf8>>},
description => #{en => <<"The URL of the server that will receive the Webhook requests.">>,
zh => <<"用于接收 Webhook 请求的服务器的 URL。"/utf8>>}
},
connect_timeout => #{
order => 2,
type => number,
default => 5,
title => #{en => <<"Connect Timeout">>,
zh => <<"连接超时时间"/utf8>>},
description => #{en => <<"Connect Timeout In Seconds">>,
zh => <<"连接超时时间,单位秒"/utf8>>}},
request_timeout => #{
order => 3,
type => number,
default => 5,
title => #{en => <<"Request Timeout">>,
zh => <<"请求超时时间时间"/utf8>>},
description => #{en => <<"Request Timeout In Seconds">>,
zh => <<"请求超时时间,单位秒"/utf8>>}},
cacertfile => #{
order => 4,
type => file,
default => <<>>,
title => #{en => <<"CA Certificate File">>,
zh => <<"CA 证书文件"/utf8>>},
description => #{en => <<"CA Certificate File.">>,
zh => <<"CA 证书文件。"/utf8>>}
},
certfile => #{
order => 5,
type => file,
default => <<>>,
title => #{en => <<"Certificate File">>,
zh => <<"证书文件"/utf8>>},
description => #{en => <<"Certificate File.">>,
zh => <<"证书文件。"/utf8>>}
},
keyfile => #{
order => 6,
type => file,
default => <<>>,
title => #{en => <<"Private Key File">>,
zh => <<"私钥文件"/utf8>>},
description => #{en => <<"Private key file.">>,
zh => <<"私钥文件。"/utf8>>}
},
verify => #{
order => 7,
type => boolean,
default => true,
title => #{en => <<"Verify">>,
zh => <<"Verify"/utf8>>},
description => #{en => <<"Turn on peer certificate verification.">>,
zh => <<"是否开启对端证书验证。"/utf8>>}
},
pool_size => #{
order => 8,
type => number,
default => 32,
title => #{en => <<"Pool Size">>,
zh => <<"连接池大小"/utf8>>},
description => #{en => <<"Pool Size for HTTP Server.">>,
zh => <<"HTTP Server 连接池大小。"/utf8>>}
}
}).
-define(ACTION_PARAM_RESOURCE, #{
@ -65,37 +103,54 @@
required => true,
title => #{en => <<"Resource ID">>,
zh => <<"资源 ID"/utf8>>},
description => #{en => <<"Bind a resource to this action">>,
zh => <<"给动作绑定一个资源"/utf8>>}
description => #{en => <<"Bind a resource to this action.">>,
zh => <<"给动作绑定一个资源"/utf8>>}
}).
-define(ACTION_DATA_SPEC, #{
'$resource' => ?ACTION_PARAM_RESOURCE,
path => #{order => 1,
method => #{
order => 1,
type => string,
enum => [<<"POST">>,<<"DELETE">>,<<"PUT">>,<<"GET">>],
default => <<"POST">>,
title => #{en => <<"Method">>,
zh => <<"Method"/utf8>>},
description => #{en => <<"HTTP Method.\n"
"Note that: the Body option in the Action will be discarded in case of GET or DELETE method.">>,
zh => <<"HTTP Method。\n"
"注意:当方法为 GET 或 DELETE 时,动作中的 Body 选项会被忽略。"/utf8>>}},
path => #{
order => 2,
type => string,
required => false,
default => <<>>,
default => <<"">>,
title => #{en => <<"Path">>,
zh => <<"Path"/utf8>>},
description => #{en => <<"A path component, variable interpolation from "
"SQL statement is supported. This value will be "
"concatenated with Request URL.">>,
zh => <<"URL 的路径配置,支持使用 ${} 获取规则输出的字段值。\n"
"例如:${clientid}。该值会与 Request URL 组成一个完整的 URL"/utf8>>}
description => #{en => <<"The path part of the URL, support using ${Var} to get the field value output by the rule.">>,
zh => <<"URL 的路径部分,支持使用 ${Var} 获取规则输出的字段值。\n"/utf8>>}
},
payload_tmpl => #{
order => 2,
headers => #{
order => 3,
type => object,
schema => #{},
default => #{<<"content-type">> => <<"application/json">>},
title => #{en => <<"Headers">>,
zh => <<"Headers"/utf8>>},
description => #{en => <<"HTTP headers.">>,
zh => <<"HTTP headers。"/utf8>>}},
body => #{
order => 5,
type => string,
input => textarea,
required => false,
default => <<"">>,
title => #{en => <<"Payload Template">>,
zh => <<"消息内容模板"/utf8>>},
description => #{en => <<"The payload template, variable interpolation is supported."
"If using empty template (default), then the payload will "
"be all the available vars in JSON format">>,
zh => <<"消息内容模板,支持使用 ${} 获取变量值。"
"默认消息内容为规则输出的所有字段的 JSON 字符串"/utf8>>}}
title => #{en => <<"Body">>,
zh => <<"Body"/utf8>>},
description => #{en => <<"The HTTP body supports the use of ${Var} to obtain the field value output by the rule.\n"
"The content of the default HTTP request body is a JSON string composed of the keys and values of all fields output by the rule.">>,
zh => <<"HTTP 请求体,支持使用 ${Var} 获取规则输出的字段值\n"
"默认 HTTP 请求体的内容为规则输出的所有字段的键和值构成的 JSON 字符串。"/utf8>>}}
}).
-resource_type(#{name => ?RESOURCE_TYPE_WEBHOOK,
@ -137,17 +192,29 @@
%%------------------------------------------------------------------------------
-spec(on_resource_create(binary(), map()) -> map()).
on_resource_create(ResId, Conf = #{<<"url">> := Url}) ->
case emqx_rule_utils:http_connectivity(Url) of
ok -> Conf;
on_resource_create(ResId, Conf) ->
{ok, _} = application:ensure_all_started(ehttpc),
Options = pool_opts(Conf),
PoolName = pool_name(ResId),
start_resource(ResId, PoolName, Options),
Conf#{<<"pool">> => PoolName, options => Options}.
start_resource(ResId, PoolName, Options) ->
case ehttpc_pool:start_pool(PoolName, Options) of
{ok, _} ->
?LOG(info, "Initiated Resource ~p Successfully, ResId: ~p",
[?RESOURCE_TYPE_WEBHOOK, ResId]);
{error, {already_started, _Pid}} ->
on_resource_destroy(ResId, #{<<"pool">> => PoolName}),
start_resource(ResId, PoolName, Options);
{error, Reason} ->
?LOG(error, "Initiate Resource ~p failed, ResId: ~p, ~0p",
[?RESOURCE_TYPE_WEBHOOK, ResId, Reason]),
error({connect_failure, Reason})
error({{?RESOURCE_TYPE_WEBHOOK, ResId}, create_failed})
end.
-spec(on_get_resource_status(binary(), map()) -> map()).
on_get_resource_status(ResId, _Params = #{<<"url">> := Url}) ->
on_get_resource_status(ResId, #{<<"url">> := Url}) ->
#{is_alive =>
case emqx_rule_utils:http_connectivity(Url) of
ok -> true;
@ -158,30 +225,57 @@ on_get_resource_status(ResId, _Params = #{<<"url">> := Url}) ->
end}.
-spec(on_resource_destroy(binary(), map()) -> ok | {error, Reason::term()}).
on_resource_destroy(_ResId, _Params) ->
ok.
on_resource_destroy(ResId, #{<<"pool">> := PoolName}) ->
?LOG(info, "Destroying Resource ~p, ResId: ~p", [?RESOURCE_TYPE_WEBHOOK, ResId]),
case ehttpc_pool:stop_pool(PoolName) of
ok ->
?LOG(info, "Destroyed Resource ~p Successfully, ResId: ~p", [?RESOURCE_TYPE_WEBHOOK, ResId]);
{error, Reason} ->
?LOG(error, "Destroy Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_WEBHOOK, ResId, Reason]),
error({{?RESOURCE_TYPE_WEBHOOK, ResId}, destroy_failed})
end.
%% An action that forwards publish messages to a remote web server.
-spec(on_action_create_data_to_webserver(Id::binary(), #{url() := string()}) -> {bindings(), NewParams :: map()}).
on_action_create_data_to_webserver(Id, Params) ->
#{url := Url, headers := Headers, method := Method, content_type := ContentType, payload_tmpl := PayloadTmpl, path := Path}
= parse_action_params(Params),
PathTks = emqx_rule_utils:preproc_tmpl(Path),
PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl),
#{method := Method,
path := Path,
headers := Headers,
body := Body,
pool := Pool,
request_timeout := RequestTimeout} = parse_action_params(Params),
BodyTokens = emqx_rule_utils:preproc_tmpl(Body),
PathTokens = emqx_rule_utils:preproc_tmpl(Path),
Params.
on_action_data_to_webserver(Selected, _Envs =
#{?BINDING_KEYS := #{
'Id' := Id,
'Url' := Url,
'Headers' := Headers,
'Method' := Method,
'ContentType' := ContentType,
'PathTks' := PathTks,
'PayloadTks' := PayloadTks
}}) ->
FullUrl = Url ++ emqx_rule_utils:proc_tmpl(PathTks, Selected),
http_request(Id, FullUrl, Headers, Method, ContentType, format_msg(PayloadTks, Selected)).
'Headers' := Headers,
'PathTokens' := PathTokens,
'BodyTokens' := BodyTokens,
'Pool' := Pool,
'RequestTimeout' := RequestTimeout},
clientid := ClientID}) ->
NBody = format_msg(BodyTokens, Selected),
NPath = emqx_rule_utils:proc_tmpl(PathTokens, Selected),
Req = create_req(Method, NPath, Headers, NBody),
case ehttpc:request(ehttpc_pool:pick_worker(Pool, ClientID), Method, Req, RequestTimeout) of
{ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
ok;
{ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 ->
ok;
{ok, StatusCode, _} ->
?LOG(warning, "[WebHook Action] HTTP request failed with status code: ~p", [StatusCode]),
ok;
{ok, StatusCode, _, _} ->
?LOG(warning, "[WebHook Action] HTTP request failed with status code: ~p", [StatusCode]),
ok;
{error, Reason} ->
?LOG(error, "[WebHook Action] HTTP request error: ~p", [Reason]),
emqx_rule_metrics:inc_actions_error(Id)
end.
format_msg([], Data) ->
emqx_json:encode(Data);
@ -192,44 +286,28 @@ format_msg(Tokens, Data) ->
%% Internal functions
%%------------------------------------------------------------------------------
create_req(get, Url, Headers, _, _) ->
{(Url), (Headers)};
create_req(Method, Path, Headers, _Body)
when Method =:= get orelse Method =:= delete ->
{Path, Headers};
create_req(_, Path, Headers, Body) ->
{Path, Headers, Body}.
create_req(_, Url, Headers, ContentType, Body) ->
{(Url), (Headers), binary_to_list(ContentType), (Body)}.
http_request(ActId, Url, Headers, Method, ContentType, Params) ->
logger:debug("[WebHook Action] ~s to ~s, headers: ~p, content-type: ~p, body: ~p", [Method, Url, Headers, ContentType, Params]),
case do_http_request(Method, create_req(Method, Url, Headers, ContentType, Params),
[{timeout, 5000}], [], 0) of
{ok, _} ->
emqx_rule_metrics:inc_actions_success(ActId);
{error, Reason} ->
logger:error("[WebHook Action] HTTP request error: ~p", [Reason]),
emqx_rule_metrics:inc_actions_error(ActId)
end.
do_http_request(Method, Req, HTTPOpts, Opts, Times) ->
%% Resend request, when TCP closed by remotely
case httpc:request(Method, Req, HTTPOpts, Opts) of
{error, socket_closed_remotely} when Times < 3 ->
timer:sleep(trunc(math:pow(10, Times))),
do_http_request(Method, Req, HTTPOpts, Opts, Times+1);
Other -> Other
end.
parse_action_params(Params = #{<<"url">> := Url}) ->
parse_action_params(Params = #{<<"url">> := URL}) ->
try
#{url => str(Url),
#{path := CommonPath} = uri_string:parse(URL),
#{method => method(maps:get(<<"method">>, Params, <<"POST">>)),
path => path(filename:join(CommonPath, maps:get(<<"path">>, Params, <<>>))),
headers => headers(maps:get(<<"headers">>, Params, undefined)),
method => method(maps:get(<<"method">>, Params, <<"POST">>)),
content_type => maps:get(<<"content_type">>, Params, <<"application/json">>),
payload_tmpl => maps:get(<<"payload_tmpl">>, Params, <<>>),
path => maps:get(<<"path">>, Params, <<>>)}
body => maps:get(<<"body">>, Params, <<>>),
request_timeout => timer:seconds(maps:get(<<"request_timeout">>, Params, 5)),
pool => maps:get(<<"pool">>, Params)}
catch _:_ ->
throw({invalid_params, Params})
end.
path(<<>>) -> <<"/">>;
path(Path) -> Path.
method(GET) when GET == <<"GET">>; GET == <<"get">> -> get;
method(POST) when POST == <<"POST">>; POST == <<"post">> -> post;
method(PUT) when PUT == <<"PUT">>; PUT == <<"put">> -> put;
@ -245,3 +323,62 @@ headers(Headers) when is_map(Headers) ->
str(Str) when is_list(Str) -> Str;
str(Atom) when is_atom(Atom) -> atom_to_list(Atom);
str(Bin) when is_binary(Bin) -> binary_to_list(Bin).
pool_opts(Params = #{<<"url">> := URL}) ->
#{host := Host0,
port := Port,
scheme := Scheme} = uri_string:parse(URL),
Host = get_addr(binary_to_list(Host0)),
PoolSize = maps:get(<<"pool_size">>, Params, 32),
ConnectTimeout = timer:seconds(maps:get(<<"connect_timeout">>, Params, 5)),
IPv6 = case tuple_size(Host) =:= 8 of
true -> [inet6];
false -> []
end,
MoreOpts = case Scheme of
<<"http">> ->
[{transport_opts, IPv6}];
<<"https">> ->
KeyFile = maps:get(<<"keyfile">>, Params),
CertFile = maps:get(<<"certfile">>, Params),
CACertFile = maps:get(<<"cacertfile">>, Params),
VerifyType = case maps:get(<<"verify">>, Params) of
true -> verify_peer;
false -> verify_none
end,
TLSOpts = lists:filter(fun({_K, V}) when V =:= <<>> ->
false;
(_) ->
true
end, [{keyfile, KeyFile}, {certfile, CertFile}, {cacertfile, CACertFile}]),
TlsVers = ['tlsv1.2','tlsv1.1',tlsv1],
NTLSOpts = [{verify, VerifyType},
{versions, TlsVers},
{ciphers, lists:foldl(fun(TlsVer, Ciphers) ->
Ciphers ++ ssl:cipher_suites(all, TlsVer)
end, [], TlsVers)} | TLSOpts],
[{transport, ssl}, {transport_opts, NTLSOpts ++ IPv6}]
end,
[{host, Host},
{port, Port},
{pool_size, PoolSize},
{pool_type, hash},
{connect_timeout, ConnectTimeout},
{retry, 5},
{retry_timeout, 1000}] ++ MoreOpts.
get_addr(Hostname) ->
case inet:parse_address(Hostname) of
{ok, {_,_,_,_} = Addr} -> Addr;
{ok, {_,_,_,_,_,_,_,_} = Addr} -> Addr;
{error, einval} ->
case inet:getaddr(Hostname, inet) of
{error, _} ->
{ok, Addr} = inet:getaddr(Hostname, inet6),
Addr;
{ok, Addr} -> Addr
end
end.
pool_name(ResId) ->
list_to_atom("webhook:" ++ str(ResId)).

View File

@ -20,15 +20,103 @@
-emqx_plugin(?MODULE).
-include("emqx_web_hook.hrl").
-export([ start/2
, stop/1
]).
start(_StartType, _StartArgs) ->
translate_env(),
{ok, Sup} = emqx_web_hook_sup:start_link(),
{ok, PoolOpts} = application:get_env(?APP, pool_opts),
ehttpc_sup:start_pool(?APP, PoolOpts),
emqx_web_hook:register_metrics(),
emqx_web_hook:load(),
{ok, Sup}.
stop(_State) ->
emqx_web_hook:unload().
emqx_web_hook:unload(),
ehttpc_sup:stop_pool(?APP).
add_default_scheme(URL) when is_list(URL) ->
add_default_scheme(list_to_binary(URL));
add_default_scheme(<<"http://", _/binary>> = URL) ->
URL;
add_default_scheme(<<"https://", _/binary>> = URL) ->
URL;
add_default_scheme(URL) ->
<<"http://", URL/binary>>.
translate_env() ->
{ok, URL} = application:get_env(?APP, url),
#{host := Host0,
port := Port,
path := Path0,
scheme := Scheme} = uri_string:parse(binary_to_list(add_default_scheme(URL))),
Host = get_addr(Host0),
Path = path(Path0),
PoolSize = application:get_env(?APP, pool_size, 8),
IPv6 = case tuple_size(Host) =:= 8 of
true -> [inet6];
false -> []
end,
MoreOpts = case Scheme of
"http" ->
[{transport_opts, IPv6}];
"https" ->
CACertFile = application:get_env(?APP, cacertfile, undefined),
CertFile = application:get_env(?APP, certfile, undefined),
KeyFile = application:get_env(?APP, keyfile, undefined),
{ok, Verify} = application:get_env(?APP, verify),
VerifyType = case Verify of
true -> verify_peer;
false -> verify_none
end,
TLSOpts = lists:filter(fun({_K, V}) when V =:= <<>> ->
false;
(_) ->
true
end, [{keyfile, KeyFile}, {certfile, CertFile}, {cacertfile, CACertFile}]),
TlsVers = ['tlsv1.2','tlsv1.1',tlsv1],
NTLSOpts = [{verify, VerifyType},
{versions, TlsVers},
{ciphers, lists:foldl(fun(TlsVer, Ciphers) ->
Ciphers ++ ssl:cipher_suites(all, TlsVer)
end, [], TlsVers)} | TLSOpts],
[{transport, ssl}, {transport_opts, NTLSOpts ++ IPv6}]
end,
PoolOpts = [{host, Host},
{port, Port},
{pool_size, PoolSize},
{pool_type, hash},
{connect_timeout, 5000},
{retry, 5},
{retry_timeout, 1000}] ++ MoreOpts,
application:set_env(?APP, path, Path),
application:set_env(?APP, pool_opts, PoolOpts),
Headers = application:get_env(?APP, headers, []),
NHeaders = set_content_type(Headers),
application:set_env(?APP, headers, NHeaders).
get_addr(Hostname) ->
case inet:parse_address(Hostname) of
{ok, {_,_,_,_} = Addr} -> Addr;
{ok, {_,_,_,_,_,_,_,_} = Addr} -> Addr;
{error, einval} ->
case inet:getaddr(Hostname, inet) of
{error, _} ->
{ok, Addr} = inet:getaddr(Hostname, inet6),
Addr;
{ok, Addr} -> Addr
end
end.
path("") ->
"/";
path(Path) ->
Path.
set_content_type(Headers) ->
NHeaders = proplists:delete(<<"Content-Type">>, proplists:delete(<<"content-type">>, Headers)),
[{<<"content-type">>, <<"application/json">>} | NHeaders].