diff --git a/.github/workflows/.gitlint b/.github/workflows/.gitlint index 1396f5911..5982f6b66 100644 --- a/.github/workflows/.gitlint +++ b/.github/workflows/.gitlint @@ -58,7 +58,7 @@ ignore=title-trailing-punctuation, T1, T2, T3, T4, T5, T6, T8, B1, B2, B3, B4, B # python-style regex that the commit-msg title must match # Note that the regex can contradict with other rules if not used correctly # (e.g. title-must-not-contain-word). -regex=^(feat|fix|docs|style|refactor|test|chore|perf)\(.+\): .+ +regex=^(feat|feature|fix|docs|style|refactor|test|chore|perf|improve)\(.+\): .+ # [body-max-line-length] # line-length=72 diff --git a/apps/emqx_auth_http/src/emqx_auth_http_app.erl b/apps/emqx_auth_http/src/emqx_auth_http_app.erl index 2dc368067..c251962ce 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http_app.erl +++ b/apps/emqx_auth_http/src/emqx_auth_http_app.erl @@ -85,7 +85,7 @@ r(Config) -> Headers = application:get_env(?APP, headers, []), Method = proplists:get_value(method, Config, post), Path = proplists:get_value(path, Config), - NewHeaders = [{<<"content_type">>, proplists:get_value(content_type, Config, <<"application/x-www-form-urlencoded">>)} | Headers], + NewHeaders = [{<<"content-type">>, proplists:get_value(content_type, Config, <<"application/x-www-form-urlencoded">>)} | Headers], Params = proplists:get_value(params, Config), {ok, RequestTimeout} = application:get_env(?APP, request_timeout), #http_request{method = Method, path = Path, headers = NewHeaders, params = Params, request_timeout = RequestTimeout}. @@ -118,9 +118,9 @@ translate_env() -> URL = proplists:get_value(url, Env), #{host := Host0, port := Port, - path := Path} = uri_string:parse(list_to_binary(URL)), - Host = get_addr(binary_to_list(Host0)), - [{Name, {Host, Port, binary_to_list(Path)}} | Acc] + path := Path} = uri_string:parse(URL), + Host = get_addr(Host0), + [{Name, {Host, Port, path(Path)}} | Acc] end end, [], [acl_req, auth_req, super_req]), case same_host_and_port(URLs) of @@ -137,6 +137,9 @@ translate_env() -> {error, different_server} end. +path("") -> "/"; +path(Path) -> Path. + same_host_and_port([_]) -> true; same_host_and_port([{_, {Host, Port, _}}, {_, {Host, Port, _}}]) -> diff --git a/apps/emqx_auth_http/src/emqx_auth_http_cli.erl b/apps/emqx_auth_http/src/emqx_auth_http_cli.erl index 82e493997..32edd6190 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http_cli.erl +++ b/apps/emqx_auth_http/src/emqx_auth_http_cli.erl @@ -28,11 +28,11 @@ %%-------------------------------------------------------------------- request(PoolName, get, Path, Headers, Params, Timeout) -> - NewPath = Path ++ "?" ++ cow_qs:qs(bin_kw(Params)), + NewPath = Path ++ "?" ++ binary_to_list(cow_qs:qs(bin_kw(Params))), reply(emqx_http_client:request(get, PoolName, {NewPath, Headers}, Timeout)); request(PoolName, post, Path, Headers, Params, Timeout) -> - Body = case proplists:get_value(<<"content_type">>, Headers) of + Body = case proplists:get_value(<<"content-type">>, Headers) of <<"application/x-www-form-urlencoded">> -> cow_qs:qs(bin_kw(Params)); <<"application/json">> -> diff --git a/apps/emqx_auth_http/test/emqx_auth_http_SUITE.erl b/apps/emqx_auth_http/test/emqx_auth_http_SUITE.erl index e7496ecbc..fc716f5ae 100644 --- a/apps/emqx_auth_http/test/emqx_auth_http_SUITE.erl +++ b/apps/emqx_auth_http/test/emqx_auth_http_SUITE.erl @@ -66,13 +66,13 @@ set_special_configs(emqx, _Schmea, _Inet) -> set_special_configs(emqx_auth_http, Schema, Inet) -> ServerAddr = http_server(Schema, Inet), - AuthReq = #{method => get, + AuthReq = #{method => post, url => ServerAddr ++ "/mqtt/auth", - content_type => <<"application/x-www-form-urlencoded">>, + content_type => <<"application/json">>, params => [{"clientid", "%c"}, {"username", "%u"}, {"password", "%P"}]}, SuperReq = #{method => post, url => ServerAddr ++ "/mqtt/superuser", - content_type => <<"application/x-www-form-urlencoded">>, + content_type => <<"application/json">>, params => [{"clientid", "%c"}, {"username", "%u"}]}, AclReq = #{method => post, url => ServerAddr ++ "/mqtt/acl", diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 97e23fbe4..e2620bb9d 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -128,6 +128,7 @@ , export_acl_mnesia/0 , import_rules/1 , import_resources/1 + , import_resources_and_rules/3 , import_blacklist/1 , import_applications/1 , import_users/1 @@ -664,41 +665,98 @@ export_acl_mnesia() -> end. import_rules(Rules) -> - lists:foreach(fun(#{<<"id">> := RuleId, - <<"rawsql">> := RawSQL, - <<"actions">> := Actions, - <<"enabled">> := Enabled, - <<"description">> := Desc}) -> - Rule = #{ - id => RuleId, - rawsql => RawSQL, - actions => map_to_actions(Actions), - enabled => Enabled, - description => Desc - }, - try emqx_rule_engine:create_rule(Rule) - catch throw:{resource_not_initialized, _ResId} -> - emqx_rule_engine:create_rule(Rule#{enabled => false}) - end + lists:foreach(fun(Rule) -> + import_rule(Rule) end, Rules). import_resources(Reources) -> - lists:foreach(fun(#{<<"id">> := Id, - <<"type">> := Type, - <<"config">> := Config, - <<"created_at">> := CreatedAt, - <<"description">> := Desc}) -> - NCreatedAt = case CreatedAt of - null -> undefined; - _ -> CreatedAt - end, - emqx_rule_engine:create_resource(#{id => Id, - type => any_to_atom(Type), - config => Config, - created_at => NCreatedAt, - description => Desc}) + lists:foreach(fun(Resource) -> + import_resource(Resource) end, Reources). +import_rule(#{<<"id">> := RuleId, + <<"rawsql">> := RawSQL, + <<"actions">> := Actions, + <<"enabled">> := Enabled, + <<"description">> := Desc}) -> + Rule = #{id => RuleId, + rawsql => RawSQL, + actions => map_to_actions(Actions), + enabled => Enabled, + description => Desc}, + try emqx_rule_engine:create_rule(Rule) + catch throw:{resource_not_initialized, _ResId} -> + emqx_rule_engine:create_rule(Rule#{enabled => false}) + end. + +import_resource(#{<<"id">> := Id, + <<"type">> := Type, + <<"config">> := Config, + <<"created_at">> := CreatedAt, + <<"description">> := Desc}) -> + NCreatedAt = case CreatedAt of + null -> undefined; + _ -> CreatedAt + end, + emqx_rule_engine:create_resource(#{id => Id, + type => any_to_atom(Type), + config => Config, + created_at => NCreatedAt, + description => Desc}). + +import_resources_and_rules(Resources, Rules, FromVersion) + when FromVersion =:= "4.0" orelse FromVersion =:= "4.1" orelse FromVersion =:= "4.2" -> + Configs = lists:foldl(fun(#{<<"id">> := ID, + <<"type">> := <<"web_hook">>, + <<"config">> := #{<<"content_type">> := ContentType, + <<"headers">> := Headers, + <<"method">> := Method, + <<"url">> := URL}} = Resource, Acc) -> + NConfig = #{<<"connect_timeout">> => 5, + <<"request_timeout">> => 5, + <<"cacertfile">> => <<>>, + <<"certfile">> => <<>>, + <<"keyfile">> => <<>>, + <<"pool_size">> => 8, + <<"url">> => URL, + <<"verify">> => true}, + NResource = Resource#{<<"config">> := NConfig}, + import_resource(NResource), + NHeaders = maps:put(<<"content-type">>, ContentType, Headers), + [{ID, #{headers => NHeaders, method => Method}} | Acc]; + (Resource, Acc) -> + import_resource(Resource), + Acc + end, [], Resources), + lists:foreach(fun(#{<<"actions">> := Actions} = Rule) -> + NActions = apply_new_config(Actions, Configs), + import_rule(Rule#{<<"actions">> := NActions}) + end, Rules); +import_resources_and_rules(Resources, Rules, _FromVersion) -> + import_resources(Resources), + import_rules(Rules). + +apply_new_config(Actions, Configs) -> + apply_new_config(Actions, Configs, []). + +apply_new_config([], _Configs, Acc) -> + Acc; +apply_new_config([Action = #{<<"name">> := <<"data_to_webserver">>, + <<"args">> := #{<<"$resource">> := ID, + <<"path">> := Path, + <<"payload_tmpl">> := PayloadTmpl}} | More], Configs, Acc) -> + case proplists:get_value(ID, Configs, undefined) of + undefined -> + apply_new_config(More, Configs, [Action | Acc]); + #{headers := Headers, method := Method} -> + Args = #{<<"$resource">> => ID, + <<"body">> => PayloadTmpl, + <<"headers">> => Headers, + <<"method">> => Method, + <<"path">> => Path}, + apply_new_config(More, Configs, [Action#{<<"args">> := Args} | Acc]) + end. + import_blacklist(Blacklist) -> lists:foreach(fun(#{<<"who">> := Who, <<"by">> := By, diff --git a/apps/emqx_management/src/emqx_mgmt_api_data.erl b/apps/emqx_management/src/emqx_mgmt_api_data.erl index 9527cae6c..723824d99 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_data.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_data.erl @@ -174,8 +174,7 @@ do_import(Filename) -> case lists:member(Version, ?VERSIONS) of true -> try - emqx_mgmt:import_resources(maps:get(<<"resources">>, Data, [])), - emqx_mgmt:import_rules(maps:get(<<"rules">>, Data, [])), + emqx_mgmt:import_resources_and_rules(maps:get(<<"resources">>, Data, []), maps:get(<<"rules">>, Data, []), Version), emqx_mgmt:import_blacklist(maps:get(<<"blacklist">>, Data, [])), emqx_mgmt:import_applications(maps:get(<<"apps">>, Data, [])), emqx_mgmt:import_users(maps:get(<<"users">>, Data, [])), diff --git a/apps/emqx_web_hook/etc/emqx_web_hook.conf b/apps/emqx_web_hook/etc/emqx_web_hook.conf index ff370ce9b..41821fdc9 100644 --- a/apps/emqx_web_hook/etc/emqx_web_hook.conf +++ b/apps/emqx_web_hook/etc/emqx_web_hook.conf @@ -2,49 +2,51 @@ ## WebHook ##==================================================================== -## The web services URL for Hook request +## Webhook URL ## ## Value: String -web.hook.api.url = http://127.0.0.1:8080 +web.hook.url = http://127.0.0.1:8080 -##-------------------------------------------------------------------- -## HTTP Request Headers -## -## The header params what you extra need -## Format: -## web.hook.headers. = your-param +## HTTP Headers +## ## Example: -## 1. web.hook.headers.token = your-token -## 2. web.hook.headers.other = others-param +## 1. web.hook.headers.content-type = application/json +## 2. web.hook.headers.accept = * ## ## Value: String -## web.hook.headers.token = your-token +web.hook.headers.content-type = application/json + +## The encoding format of the payload field in the HTTP body +## The payload field only appears in the on_message_publish and on_message_delivered actions +## +## Value: plain | base64 | base62 +web.hook.body.encoding_of_payload_field = plain ##-------------------------------------------------------------------- -## Encode message payload field +## PEM format file of CA's ## -## Value: base64 | base62 -## web.hook.encode_payload = base64 -## Mysql ssl configuration. -## -## Value: on | off -## web.hook.ssl = off +## Value: File +## web.hook.ssl.cacertfile = -##-------------------------------------------------------------------- -## CA certificate. +## Certificate file to use, PEM format assumed ## ## Value: File -## web.hook.ssl.cafile = path to your ca file -## Client ssl certificate. -## -## Value: File -## web.hook.ssl.certfile = path to your clientcert file +## web.hook.ssl.certfile = -##-------------------------------------------------------------------- -## Client ssl keyfile. +## Private key file to use, PEM format assumed ## ## Value: File -## web.hook.ssl.keyfile = path to your clientkey file +## web.hook.ssl.keyfile = + +## Turn on peer certificate verification +## +## Value: true | false +## web.hook.ssl.verify = true + +## Connection process pool size +## +## Value: Number +web.hook.pool_size = 32 ##-------------------------------------------------------------------- ## Hook Rules diff --git a/apps/emqx_web_hook/include/emqx_web_hook.hrl b/apps/emqx_web_hook/include/emqx_web_hook.hrl new file mode 100644 index 000000000..4666b4d27 --- /dev/null +++ b/apps/emqx_web_hook/include/emqx_web_hook.hrl @@ -0,0 +1 @@ +-define(APP, emqx_web_hook). \ No newline at end of file diff --git a/apps/emqx_web_hook/priv/emqx_web_hook.schema b/apps/emqx_web_hook/priv/emqx_web_hook.schema index 7ee6f7104..62ad804e3 100644 --- a/apps/emqx_web_hook/priv/emqx_web_hook.schema +++ b/apps/emqx_web_hook/priv/emqx_web_hook.schema @@ -1,33 +1,39 @@ %%-*- mode: erlang -*- %% EMQ X R3.0 config mapping -{mapping, "web.hook.api.url", "emqx_web_hook.url", [ +{mapping, "web.hook.url", "emqx_web_hook.url", [ {datatype, string} ]}. -{mapping, "web.hook.ssl", "emqx_web.hook.ssl", [ - {default, off}, - {datatype, flag} -]}. - -{mapping, "web.hook.ssl.cafile", "emqx_web_hook.ssloptions", [ - {default, ""}, +{mapping, "web.hook.headers.$name", "emqx_web_hook.headers", [ {datatype, string} ]}. -{mapping, "web.hook.ssl.certfile", "emqx_web_hook.ssloptions", [ - {default, ""}, +{mapping, "web.hook.body.encoding_of_payload_field", "emqx_web_hook.encoding_of_payload_field", [ + {default, plain}, + {datatype, {enum, [plain, base62, base64]}} +]}. + +{mapping, "web.hook.ssl.cacertfile", "emqx_web_hook.cacertfile", [ {datatype, string} ]}. -{mapping, "web.hook.ssl.keyfile", "emqx_web_hook.ssloptions", [ - {default, ""}, +{mapping, "web.hook.ssl.certfile", "emqx_web_hook.certfile", [ {datatype, string} ]}. -{mapping, "web.hook.encode_payload", "emqx_web_hook.encode_payload", [ - {default, undefined}, - {datatype, {enum, [base62, base64]}} +{mapping, "web.hook.ssl.keyfile", "emqx_web_hook.keyfile", [ + {datatype, string} +]}. + +{mapping, "web.hook.ssl.verify", "emqx_web_hook.verify", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +{mapping, "web.hook.pool_size", "emqx_web_hook.pool_size", [ + {default, 32}, + {datatype, integer} ]}. {mapping, "web.hook.rule.client.connect.$name", "emqx_web_hook.rules", [ @@ -78,10 +84,6 @@ {datatype, string} ]}. -{mapping, "web.hook.headers.$name", "emqx_web_hook.headers", [ - {datatype, string} -]}. - {translation, "emqx_web_hook.headers", fun(Conf) -> Headers = cuttlefish_variable:filter_by_prefix("web.hook.headers", Conf), [{K, V} || {[_, _, _, K], V} <- Headers] @@ -94,13 +96,3 @@ end}. {lists:concat([Name1,".",Name2]), Val} end, Hooks) end}. - -{translation, "emqx_web_hook.ssloptions", fun(Conf) -> - CA = cuttlefish:conf_get("web.hook.ssl.cafile", Conf), - Cert = cuttlefish:conf_get("web.hook.ssl.certfile", Conf), - Key = cuttlefish:conf_get("web.hook.ssl.keyfile", Conf), - case ((Cert == "") or (Key == "")) of - true -> [{cacertfile, CA}]; - _ -> [{cacertfile, CA}, {certfile, Cert}, {keyfile, Key}] - end -end}. diff --git a/apps/emqx_web_hook/rebar.config b/apps/emqx_web_hook/rebar.config index f0e99e681..a923a9906 100644 --- a/apps/emqx_web_hook/rebar.config +++ b/apps/emqx_web_hook/rebar.config @@ -1,7 +1,8 @@ {plugins, [rebar3_proper]}. {deps, - [{emqx_rule_engine, {git, "https://github.com/emqx/emqx-rule-engine"}} + [{ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.0"}}}, + {emqx_rule_engine, {git, "https://github.com/emqx/emqx-rule-engine"}} ]}. {edoc_opts, [{preprocess, true}]}. diff --git a/apps/emqx_web_hook/src/emqx_web_hook.app.src b/apps/emqx_web_hook/src/emqx_web_hook.app.src index d5734bde6..b68816579 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook.app.src +++ b/apps/emqx_web_hook/src/emqx_web_hook.app.src @@ -3,7 +3,7 @@ {vsn, "4.3.0"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_web_hook_sup]}, - {applications, [kernel,stdlib]}, + {applications, [kernel,stdlib,ehttpc]}, {mod, {emqx_web_hook_app,[]}}, {env, []}, {licenses, ["Apache-2.0"]}, diff --git a/apps/emqx_web_hook/src/emqx_web_hook.erl b/apps/emqx_web_hook/src/emqx_web_hook.erl index 3a6f785c7..ec525c759 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook.erl @@ -94,7 +94,7 @@ on_client_connect(ConnInfo = #{clientid := ClientId, username := Username, peern , keepalive => maps:get(keepalive, ConnInfo) , proto_ver => maps:get(proto_ver, ConnInfo) }, - send_http_request(Params). + send_http_request(ClientId, Params). %%-------------------------------------------------------------------- %% Client connack @@ -111,7 +111,7 @@ on_client_connack(ConnInfo = #{clientid := ClientId, username := Username, peern , proto_ver => maps:get(proto_ver, ConnInfo) , conn_ack => Rc }, - send_http_request(Params). + send_http_request(ClientId, Params). %%-------------------------------------------------------------------- %% Client connected @@ -128,7 +128,7 @@ on_client_connected(#{clientid := ClientId, username := Username, peerhost := Pe , proto_ver => maps:get(proto_ver, ConnInfo) , connected_at => maps:get(connected_at, ConnInfo) }, - send_http_request(Params). + send_http_request(ClientId, Params). %%-------------------------------------------------------------------- %% Client disconnected @@ -145,7 +145,7 @@ on_client_disconnected(#{clientid := ClientId, username := Username}, Reason, Co , reason => stringfy(maybe(Reason)) , disconnected_at => maps:get(disconnected_at, ConnInfo, erlang:system_time(millisecond)) }, - send_http_request(Params). + send_http_request(ClientId, Params). %%-------------------------------------------------------------------- %% Client subscribe @@ -163,7 +163,7 @@ on_client_subscribe(#{clientid := ClientId, username := Username}, _Properties, , topic => Topic , opts => Opts }, - send_http_request(Params) + send_http_request(ClientId, Params) end, Topic, Filter) end, TopicTable). @@ -183,7 +183,7 @@ on_client_unsubscribe(#{clientid := ClientId, username := Username}, _Properties , topic => Topic , opts => Opts }, - send_http_request(Params) + send_http_request(ClientId, Params) end, Topic, Filter) end, TopicTable). @@ -202,7 +202,7 @@ on_session_subscribed(#{clientid := ClientId, username := Username}, Topic, Opts , topic => Topic , opts => Opts }, - send_http_request(Params) + send_http_request(ClientId, Params) end, Topic, Filter). %%-------------------------------------------------------------------- @@ -219,7 +219,7 @@ on_session_unsubscribed(#{clientid := ClientId, username := Username}, Topic, _O , username => maybe(Username) , topic => Topic }, - send_http_request(Params) + send_http_request(ClientId, Params) end, Topic, Filter). %%-------------------------------------------------------------------- @@ -236,7 +236,7 @@ on_session_terminated(#{clientid := ClientId, username := Username}, Reason, _Se , username => maybe(Username) , reason => stringfy(maybe(Reason)) }, - send_http_request(Params). + send_http_request(ClientId, Params). %%-------------------------------------------------------------------- %% Message publish @@ -259,7 +259,7 @@ on_message_publish(Message = #message{topic = Topic}, {Filter}) -> , payload => encode_payload(Message#message.payload) , ts => Message#message.timestamp }, - send_http_request(Params), + send_http_request(FromClientId, Params), {ok, Message} end, Message, Topic, Filter). @@ -287,7 +287,7 @@ on_message_delivered(#{clientid := ClientId, username := Username}, , payload => encode_payload(Message#message.payload) , ts => Message#message.timestamp }, - send_http_request(Params) + send_http_request(ClientId, Params) end, Topic, Filter). %%-------------------------------------------------------------------- @@ -314,35 +314,32 @@ on_message_acked(#{clientid := ClientId, username := Username}, , payload => encode_payload(Message#message.payload) , ts => Message#message.timestamp }, - send_http_request(Params) + send_http_request(ClientId, Params) end, Topic, Filter). %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- -send_http_request(Params) -> - Params1 = emqx_json:encode(Params), - Url = application:get_env(?APP, url, "http://127.0.0.1"), +send_http_request(ClientID, Params) -> + {ok, Path} = application:get_env(?APP, path), Headers = application:get_env(?APP, headers, []), - ?LOG(debug, "Send to: ~0p, params: ~0s", [Url, Params1]), - case request_(post, {Url, Headers, "application/json", Params1}, [{timeout, 5000}], [], 0) of - {ok, _} -> ok; + Body = emqx_json:encode(Params), + ?LOG(debug, "Send to: ~0p, params: ~0s", [Path, Body]), + case ehttpc:request(ehttpc_pool:pick_worker(?APP, ClientID), post, {Path, Headers, Body}) of + {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 -> + ok; + {ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 -> + ok; + {ok, StatusCode, _} -> + ?LOG(warning, "HTTP request failed with status code: ~p", [StatusCode]), + ok; + {ok, StatusCode, _, _} -> + ?LOG(warning, "HTTP request failed with status code: ~p", [StatusCode]), + ok; {error, Reason} -> - ?LOG(error, "HTTP request error: ~p", [Reason]), ok - end. - -request_(Method, Req, HTTPOpts, Opts, Times) -> - %% Resend request, when TCP closed by remotely - NHttpOpts = case application:get_env(?APP, ssl, false) of - true -> [{ssl, application:get_env(?APP, ssloptions, [])} | HTTPOpts]; - _ -> HTTPOpts - end, - case httpc:request(Method, Req, NHttpOpts, Opts) of - {error, socket_closed_remotely} when Times < 3 -> - timer:sleep(trunc(math:pow(10, Times))), - request_(Method, Req, HTTPOpts, Opts, Times+1); - Other -> Other + ?LOG(error, "HTTP request error: ~p", [Reason]), + ok end. parse_rule(Rules) -> @@ -375,11 +372,11 @@ parse_from(Message) -> {emqx_message:from(Message), maybe(emqx_message:get_header(username, Message))}. encode_payload(Payload) -> - encode_payload(Payload, application:get_env(?APP, encode_payload, undefined)). + encode_payload(Payload, application:get_env(?APP, encoding_of_payload_field, plain)). encode_payload(Payload, base62) -> emqx_base62:encode(Payload); encode_payload(Payload, base64) -> base64:encode(Payload); -encode_payload(Payload, _) -> Payload. +encode_payload(Payload, plain) -> Payload. stringfy(Term) when is_atom(Term); is_binary(Term) -> Term; diff --git a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl index 045f5e114..784b4c768 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl @@ -20,43 +20,81 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_rule_engine/include/rule_actions.hrl"). +-include("emqx_web_hook.hrl"). -define(RESOURCE_TYPE_WEBHOOK, 'web_hook'). -define(RESOURCE_CONFIG_SPEC, #{ - url => #{order => 1, - type => string, - format => url, - required => true, - title => #{en => <<"Request URL">>, - zh => <<"请求 URL"/utf8>>}, - description => #{en => <<"Request URL">>, - zh => <<"请求 URL"/utf8>>}}, - method => #{order => 2, - type => string, - enum => [<<"PUT">>,<<"POST">>,<<"GET">>,<<"DELETE">>], - default => <<"POST">>, - title => #{en => <<"Request Method">>, - zh => <<"请求方法"/utf8>>}, - description => #{en => <<"Request Method. \n" - "Note that: the Payload Template of Action will be discarded in case of GET method">>, - zh => <<"请求方法。\n" - "注意:当方法为 GET 时,动作中的 '消息内容模板' 参数会被忽略"/utf8>>}}, - content_type => #{order => 3, - type => string, - enum => [<<"application/json">>,<<"text/plain;charset=UTF-8">>], - default => <<"application/json">>, - title => #{en => <<"Content-Type">>, - zh => <<"Content-Type"/utf8>>}, - description => #{en => <<"The Content-Type of HTTP Request">>, - zh => <<"HTTP 请求头中的 Content-Type 字段值"/utf8>>}}, - headers => #{order => 4, - type => object, - schema => #{}, - default => #{}, - title => #{en => <<"Request Header">>, - zh => <<"请求头"/utf8>>}, - description => #{en => <<"The custom HTTP request headers">>, - zh => <<"自定义的 HTTP 请求头列表"/utf8>>}} + url => #{ + order => 1, + type => string, + format => url, + required => true, + title => #{en => <<"URL">>, + zh => <<"URL"/utf8>>}, + description => #{en => <<"The URL of the server that will receive the Webhook requests.">>, + zh => <<"用于接收 Webhook 请求的服务器的 URL。"/utf8>>} + }, + connect_timeout => #{ + order => 2, + type => number, + default => 5, + title => #{en => <<"Connect Timeout">>, + zh => <<"连接超时时间"/utf8>>}, + description => #{en => <<"Connect Timeout In Seconds">>, + zh => <<"连接超时时间,单位秒"/utf8>>}}, + request_timeout => #{ + order => 3, + type => number, + default => 5, + title => #{en => <<"Request Timeout">>, + zh => <<"请求超时时间时间"/utf8>>}, + description => #{en => <<"Request Timeout In Seconds">>, + zh => <<"请求超时时间,单位秒"/utf8>>}}, + cacertfile => #{ + order => 4, + type => file, + default => <<>>, + title => #{en => <<"CA Certificate File">>, + zh => <<"CA 证书文件"/utf8>>}, + description => #{en => <<"CA Certificate File.">>, + zh => <<"CA 证书文件。"/utf8>>} + }, + certfile => #{ + order => 5, + type => file, + default => <<>>, + title => #{en => <<"Certificate File">>, + zh => <<"证书文件"/utf8>>}, + description => #{en => <<"Certificate File.">>, + zh => <<"证书文件。"/utf8>>} + }, + keyfile => #{ + order => 6, + type => file, + default => <<>>, + title => #{en => <<"Private Key File">>, + zh => <<"私钥文件"/utf8>>}, + description => #{en => <<"Private key file.">>, + zh => <<"私钥文件。"/utf8>>} + }, + verify => #{ + order => 7, + type => boolean, + default => true, + title => #{en => <<"Verify">>, + zh => <<"Verify"/utf8>>}, + description => #{en => <<"Turn on peer certificate verification.">>, + zh => <<"是否开启对端证书验证。"/utf8>>} + }, + pool_size => #{ + order => 8, + type => number, + default => 32, + title => #{en => <<"Pool Size">>, + zh => <<"连接池大小"/utf8>>}, + description => #{en => <<"Pool Size for HTTP Server.">>, + zh => <<"HTTP Server 连接池大小。"/utf8>>} + } }). -define(ACTION_PARAM_RESOURCE, #{ @@ -65,37 +103,54 @@ required => true, title => #{en => <<"Resource ID">>, zh => <<"资源 ID"/utf8>>}, - description => #{en => <<"Bind a resource to this action">>, - zh => <<"给动作绑定一个资源"/utf8>>} + description => #{en => <<"Bind a resource to this action.">>, + zh => <<"给动作绑定一个资源。"/utf8>>} }). -define(ACTION_DATA_SPEC, #{ '$resource' => ?ACTION_PARAM_RESOURCE, - path => #{order => 1, - type => string, - required => false, - default => <<>>, - title => #{en => <<"Path">>, - zh => <<"Path"/utf8>>}, - description => #{en => <<"A path component, variable interpolation from " - "SQL statement is supported. This value will be " - "concatenated with Request URL.">>, - zh => <<"URL 的路径配置,支持使用 ${} 获取规则输出的字段值。\n" - "例如:${clientid}。该值会与 Request URL 组成一个完整的 URL"/utf8>>} - }, - payload_tmpl => #{ + method => #{ + order => 1, + type => string, + enum => [<<"POST">>,<<"DELETE">>,<<"PUT">>,<<"GET">>], + default => <<"POST">>, + title => #{en => <<"Method">>, + zh => <<"Method"/utf8>>}, + description => #{en => <<"HTTP Method.\n" + "Note that: the Body option in the Action will be discarded in case of GET or DELETE method.">>, + zh => <<"HTTP Method。\n" + "注意:当方法为 GET 或 DELETE 时,动作中的 Body 选项会被忽略。"/utf8>>}}, + path => #{ order => 2, type => string, + required => false, + default => <<"">>, + title => #{en => <<"Path">>, + zh => <<"Path"/utf8>>}, + description => #{en => <<"The path part of the URL, support using ${Var} to get the field value output by the rule.">>, + zh => <<"URL 的路径部分,支持使用 ${Var} 获取规则输出的字段值。\n"/utf8>>} + }, + headers => #{ + order => 3, + type => object, + schema => #{}, + default => #{<<"content-type">> => <<"application/json">>}, + title => #{en => <<"Headers">>, + zh => <<"Headers"/utf8>>}, + description => #{en => <<"HTTP headers.">>, + zh => <<"HTTP headers。"/utf8>>}}, + body => #{ + order => 5, + type => string, input => textarea, required => false, default => <<"">>, - title => #{en => <<"Payload Template">>, - zh => <<"消息内容模板"/utf8>>}, - description => #{en => <<"The payload template, variable interpolation is supported." - "If using empty template (default), then the payload will " - "be all the available vars in JSON format">>, - zh => <<"消息内容模板,支持使用 ${} 获取变量值。" - "默认消息内容为规则输出的所有字段的 JSON 字符串"/utf8>>}} + title => #{en => <<"Body">>, + zh => <<"Body"/utf8>>}, + description => #{en => <<"The HTTP body supports the use of ${Var} to obtain the field value output by the rule.\n" + "The content of the default HTTP request body is a JSON string composed of the keys and values of all fields output by the rule.">>, + zh => <<"HTTP 请求体,支持使用 ${Var} 获取规则输出的字段值\n" + "默认 HTTP 请求体的内容为规则输出的所有字段的键和值构成的 JSON 字符串。"/utf8>>}} }). -resource_type(#{name => ?RESOURCE_TYPE_WEBHOOK, @@ -137,17 +192,29 @@ %%------------------------------------------------------------------------------ -spec(on_resource_create(binary(), map()) -> map()). -on_resource_create(ResId, Conf = #{<<"url">> := Url}) -> - case emqx_rule_utils:http_connectivity(Url) of - ok -> Conf; +on_resource_create(ResId, Conf) -> + {ok, _} = application:ensure_all_started(ehttpc), + Options = pool_opts(Conf), + PoolName = pool_name(ResId), + start_resource(ResId, PoolName, Options), + Conf#{<<"pool">> => PoolName, options => Options}. + +start_resource(ResId, PoolName, Options) -> + case ehttpc_pool:start_pool(PoolName, Options) of + {ok, _} -> + ?LOG(info, "Initiated Resource ~p Successfully, ResId: ~p", + [?RESOURCE_TYPE_WEBHOOK, ResId]); + {error, {already_started, _Pid}} -> + on_resource_destroy(ResId, #{<<"pool">> => PoolName}), + start_resource(ResId, PoolName, Options); {error, Reason} -> ?LOG(error, "Initiate Resource ~p failed, ResId: ~p, ~0p", - [?RESOURCE_TYPE_WEBHOOK, ResId, Reason]), - error({connect_failure, Reason}) + [?RESOURCE_TYPE_WEBHOOK, ResId, Reason]), + error({{?RESOURCE_TYPE_WEBHOOK, ResId}, create_failed}) end. -spec(on_get_resource_status(binary(), map()) -> map()). -on_get_resource_status(ResId, _Params = #{<<"url">> := Url}) -> +on_get_resource_status(ResId, #{<<"url">> := Url}) -> #{is_alive => case emqx_rule_utils:http_connectivity(Url) of ok -> true; @@ -158,30 +225,57 @@ on_get_resource_status(ResId, _Params = #{<<"url">> := Url}) -> end}. -spec(on_resource_destroy(binary(), map()) -> ok | {error, Reason::term()}). -on_resource_destroy(_ResId, _Params) -> - ok. +on_resource_destroy(ResId, #{<<"pool">> := PoolName}) -> + ?LOG(info, "Destroying Resource ~p, ResId: ~p", [?RESOURCE_TYPE_WEBHOOK, ResId]), + case ehttpc_pool:stop_pool(PoolName) of + ok -> + ?LOG(info, "Destroyed Resource ~p Successfully, ResId: ~p", [?RESOURCE_TYPE_WEBHOOK, ResId]); + {error, Reason} -> + ?LOG(error, "Destroy Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_WEBHOOK, ResId, Reason]), + error({{?RESOURCE_TYPE_WEBHOOK, ResId}, destroy_failed}) + end. %% An action that forwards publish messages to a remote web server. -spec(on_action_create_data_to_webserver(Id::binary(), #{url() := string()}) -> {bindings(), NewParams :: map()}). on_action_create_data_to_webserver(Id, Params) -> - #{url := Url, headers := Headers, method := Method, content_type := ContentType, payload_tmpl := PayloadTmpl, path := Path} - = parse_action_params(Params), - PathTks = emqx_rule_utils:preproc_tmpl(Path), - PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl), + #{method := Method, + path := Path, + headers := Headers, + body := Body, + pool := Pool, + request_timeout := RequestTimeout} = parse_action_params(Params), + BodyTokens = emqx_rule_utils:preproc_tmpl(Body), + PathTokens = emqx_rule_utils:preproc_tmpl(Path), Params. on_action_data_to_webserver(Selected, _Envs = #{?BINDING_KEYS := #{ 'Id' := Id, - 'Url' := Url, - 'Headers' := Headers, 'Method' := Method, - 'ContentType' := ContentType, - 'PathTks' := PathTks, - 'PayloadTks' := PayloadTks - }}) -> - FullUrl = Url ++ emqx_rule_utils:proc_tmpl(PathTks, Selected), - http_request(Id, FullUrl, Headers, Method, ContentType, format_msg(PayloadTks, Selected)). + 'Headers' := Headers, + 'PathTokens' := PathTokens, + 'BodyTokens' := BodyTokens, + 'Pool' := Pool, + 'RequestTimeout' := RequestTimeout}, + clientid := ClientID}) -> + NBody = format_msg(BodyTokens, Selected), + NPath = emqx_rule_utils:proc_tmpl(PathTokens, Selected), + Req = create_req(Method, NPath, Headers, NBody), + case ehttpc:request(ehttpc_pool:pick_worker(Pool, ClientID), Method, Req, RequestTimeout) of + {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 -> + ok; + {ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 -> + ok; + {ok, StatusCode, _} -> + ?LOG(warning, "[WebHook Action] HTTP request failed with status code: ~p", [StatusCode]), + ok; + {ok, StatusCode, _, _} -> + ?LOG(warning, "[WebHook Action] HTTP request failed with status code: ~p", [StatusCode]), + ok; + {error, Reason} -> + ?LOG(error, "[WebHook Action] HTTP request error: ~p", [Reason]), + emqx_rule_metrics:inc_actions_error(Id) + end. format_msg([], Data) -> emqx_json:encode(Data); @@ -192,44 +286,28 @@ format_msg(Tokens, Data) -> %% Internal functions %%------------------------------------------------------------------------------ -create_req(get, Url, Headers, _, _) -> - {(Url), (Headers)}; +create_req(Method, Path, Headers, _Body) + when Method =:= get orelse Method =:= delete -> + {Path, Headers}; +create_req(_, Path, Headers, Body) -> + {Path, Headers, Body}. -create_req(_, Url, Headers, ContentType, Body) -> - {(Url), (Headers), binary_to_list(ContentType), (Body)}. - -http_request(ActId, Url, Headers, Method, ContentType, Params) -> - logger:debug("[WebHook Action] ~s to ~s, headers: ~p, content-type: ~p, body: ~p", [Method, Url, Headers, ContentType, Params]), - case do_http_request(Method, create_req(Method, Url, Headers, ContentType, Params), - [{timeout, 5000}], [], 0) of - {ok, _} -> - emqx_rule_metrics:inc_actions_success(ActId); - {error, Reason} -> - logger:error("[WebHook Action] HTTP request error: ~p", [Reason]), - emqx_rule_metrics:inc_actions_error(ActId) - end. - -do_http_request(Method, Req, HTTPOpts, Opts, Times) -> - %% Resend request, when TCP closed by remotely - case httpc:request(Method, Req, HTTPOpts, Opts) of - {error, socket_closed_remotely} when Times < 3 -> - timer:sleep(trunc(math:pow(10, Times))), - do_http_request(Method, Req, HTTPOpts, Opts, Times+1); - Other -> Other - end. - -parse_action_params(Params = #{<<"url">> := Url}) -> +parse_action_params(Params = #{<<"url">> := URL}) -> try - #{url => str(Url), + #{path := CommonPath} = uri_string:parse(URL), + #{method => method(maps:get(<<"method">>, Params, <<"POST">>)), + path => path(filename:join(CommonPath, maps:get(<<"path">>, Params, <<>>))), headers => headers(maps:get(<<"headers">>, Params, undefined)), - method => method(maps:get(<<"method">>, Params, <<"POST">>)), - content_type => maps:get(<<"content_type">>, Params, <<"application/json">>), - payload_tmpl => maps:get(<<"payload_tmpl">>, Params, <<>>), - path => maps:get(<<"path">>, Params, <<>>)} + body => maps:get(<<"body">>, Params, <<>>), + request_timeout => timer:seconds(maps:get(<<"request_timeout">>, Params, 5)), + pool => maps:get(<<"pool">>, Params)} catch _:_ -> throw({invalid_params, Params}) end. +path(<<>>) -> <<"/">>; +path(Path) -> Path. + method(GET) when GET == <<"GET">>; GET == <<"get">> -> get; method(POST) when POST == <<"POST">>; POST == <<"post">> -> post; method(PUT) when PUT == <<"PUT">>; PUT == <<"put">> -> put; @@ -245,3 +323,62 @@ headers(Headers) when is_map(Headers) -> str(Str) when is_list(Str) -> Str; str(Atom) when is_atom(Atom) -> atom_to_list(Atom); str(Bin) when is_binary(Bin) -> binary_to_list(Bin). + +pool_opts(Params = #{<<"url">> := URL}) -> + #{host := Host0, + port := Port, + scheme := Scheme} = uri_string:parse(URL), + Host = get_addr(binary_to_list(Host0)), + PoolSize = maps:get(<<"pool_size">>, Params, 32), + ConnectTimeout = timer:seconds(maps:get(<<"connect_timeout">>, Params, 5)), + IPv6 = case tuple_size(Host) =:= 8 of + true -> [inet6]; + false -> [] + end, + MoreOpts = case Scheme of + <<"http">> -> + [{transport_opts, IPv6}]; + <<"https">> -> + KeyFile = maps:get(<<"keyfile">>, Params), + CertFile = maps:get(<<"certfile">>, Params), + CACertFile = maps:get(<<"cacertfile">>, Params), + VerifyType = case maps:get(<<"verify">>, Params) of + true -> verify_peer; + false -> verify_none + end, + TLSOpts = lists:filter(fun({_K, V}) when V =:= <<>> -> + false; + (_) -> + true + end, [{keyfile, KeyFile}, {certfile, CertFile}, {cacertfile, CACertFile}]), + TlsVers = ['tlsv1.2','tlsv1.1',tlsv1], + NTLSOpts = [{verify, VerifyType}, + {versions, TlsVers}, + {ciphers, lists:foldl(fun(TlsVer, Ciphers) -> + Ciphers ++ ssl:cipher_suites(all, TlsVer) + end, [], TlsVers)} | TLSOpts], + [{transport, ssl}, {transport_opts, NTLSOpts ++ IPv6}] + end, + [{host, Host}, + {port, Port}, + {pool_size, PoolSize}, + {pool_type, hash}, + {connect_timeout, ConnectTimeout}, + {retry, 5}, + {retry_timeout, 1000}] ++ MoreOpts. + +get_addr(Hostname) -> + case inet:parse_address(Hostname) of + {ok, {_,_,_,_} = Addr} -> Addr; + {ok, {_,_,_,_,_,_,_,_} = Addr} -> Addr; + {error, einval} -> + case inet:getaddr(Hostname, inet) of + {error, _} -> + {ok, Addr} = inet:getaddr(Hostname, inet6), + Addr; + {ok, Addr} -> Addr + end + end. + +pool_name(ResId) -> + list_to_atom("webhook:" ++ str(ResId)). diff --git a/apps/emqx_web_hook/src/emqx_web_hook_app.erl b/apps/emqx_web_hook/src/emqx_web_hook_app.erl index 2b68cfbe2..218002b2b 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_app.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_app.erl @@ -20,15 +20,103 @@ -emqx_plugin(?MODULE). +-include("emqx_web_hook.hrl"). + -export([ start/2 , stop/1 ]). start(_StartType, _StartArgs) -> + translate_env(), {ok, Sup} = emqx_web_hook_sup:start_link(), + {ok, PoolOpts} = application:get_env(?APP, pool_opts), + ehttpc_sup:start_pool(?APP, PoolOpts), emqx_web_hook:register_metrics(), emqx_web_hook:load(), {ok, Sup}. stop(_State) -> - emqx_web_hook:unload(). + emqx_web_hook:unload(), + ehttpc_sup:stop_pool(?APP). + +add_default_scheme(URL) when is_list(URL) -> + add_default_scheme(list_to_binary(URL)); +add_default_scheme(<<"http://", _/binary>> = URL) -> + URL; +add_default_scheme(<<"https://", _/binary>> = URL) -> + URL; +add_default_scheme(URL) -> + <<"http://", URL/binary>>. + +translate_env() -> + {ok, URL} = application:get_env(?APP, url), + #{host := Host0, + port := Port, + path := Path0, + scheme := Scheme} = uri_string:parse(binary_to_list(add_default_scheme(URL))), + Host = get_addr(Host0), + Path = path(Path0), + PoolSize = application:get_env(?APP, pool_size, 8), + IPv6 = case tuple_size(Host) =:= 8 of + true -> [inet6]; + false -> [] + end, + MoreOpts = case Scheme of + "http" -> + [{transport_opts, IPv6}]; + "https" -> + CACertFile = application:get_env(?APP, cacertfile, undefined), + CertFile = application:get_env(?APP, certfile, undefined), + KeyFile = application:get_env(?APP, keyfile, undefined), + {ok, Verify} = application:get_env(?APP, verify), + VerifyType = case Verify of + true -> verify_peer; + false -> verify_none + end, + TLSOpts = lists:filter(fun({_K, V}) when V =:= <<>> -> + false; + (_) -> + true + end, [{keyfile, KeyFile}, {certfile, CertFile}, {cacertfile, CACertFile}]), + TlsVers = ['tlsv1.2','tlsv1.1',tlsv1], + NTLSOpts = [{verify, VerifyType}, + {versions, TlsVers}, + {ciphers, lists:foldl(fun(TlsVer, Ciphers) -> + Ciphers ++ ssl:cipher_suites(all, TlsVer) + end, [], TlsVers)} | TLSOpts], + [{transport, ssl}, {transport_opts, NTLSOpts ++ IPv6}] + end, + PoolOpts = [{host, Host}, + {port, Port}, + {pool_size, PoolSize}, + {pool_type, hash}, + {connect_timeout, 5000}, + {retry, 5}, + {retry_timeout, 1000}] ++ MoreOpts, + application:set_env(?APP, path, Path), + application:set_env(?APP, pool_opts, PoolOpts), + Headers = application:get_env(?APP, headers, []), + NHeaders = set_content_type(Headers), + application:set_env(?APP, headers, NHeaders). + +get_addr(Hostname) -> + case inet:parse_address(Hostname) of + {ok, {_,_,_,_} = Addr} -> Addr; + {ok, {_,_,_,_,_,_,_,_} = Addr} -> Addr; + {error, einval} -> + case inet:getaddr(Hostname, inet) of + {error, _} -> + {ok, Addr} = inet:getaddr(Hostname, inet6), + Addr; + {ok, Addr} -> Addr + end + end. + +path("") -> + "/"; +path(Path) -> + Path. + +set_content_type(Headers) -> + NHeaders = proplists:delete(<<"Content-Type">>, proplists:delete(<<"content-type">>, Headers)), + [{<<"content-type">>, <<"application/json">>} | NHeaders]. \ No newline at end of file