From fcfcbf139d3d2de95ca27d9ff9705eb5e38151f5 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Wed, 24 Feb 2021 23:40:34 +0100 Subject: [PATCH] chore(webhook): merge enterprise to opensource --- apps/emqx_web_hook/etc/emqx_web_hook.conf | 4 +- apps/emqx_web_hook/include/emqx_web_hook.hrl | 2 +- apps/emqx_web_hook/priv/emqx_web_hook.schema | 5 +- apps/emqx_web_hook/rebar.config | 11 - .../emqx_web_hook/src/emqx_web_hook.appup.src | 10 + .../src/emqx_web_hook_actions.erl | 311 +++++++------ .../emqx_web_hook/test/prop_webhook_confs.erl | 142 ++++++ .../emqx_web_hook/test/prop_webhook_hooks.erl | 409 ++++++++++++++++++ .../test/props/prop_webhook_confs.erl | 6 +- 9 files changed, 743 insertions(+), 157 deletions(-) create mode 100644 apps/emqx_web_hook/src/emqx_web_hook.appup.src create mode 100644 apps/emqx_web_hook/test/prop_webhook_confs.erl create mode 100644 apps/emqx_web_hook/test/prop_webhook_hooks.erl diff --git a/apps/emqx_web_hook/etc/emqx_web_hook.conf b/apps/emqx_web_hook/etc/emqx_web_hook.conf index 159769394..218718079 100644 --- a/apps/emqx_web_hook/etc/emqx_web_hook.conf +++ b/apps/emqx_web_hook/etc/emqx_web_hook.conf @@ -5,10 +5,10 @@ ## Webhook URL ## ## Value: String -web.hook.url = http://127.0.0.1:80 +web.hook.api.url = http://127.0.0.1:80 ## HTTP Headers -## +## ## Example: ## 1. web.hook.headers.content-type = application/json ## 2. web.hook.headers.accept = * diff --git a/apps/emqx_web_hook/include/emqx_web_hook.hrl b/apps/emqx_web_hook/include/emqx_web_hook.hrl index 4666b4d27..73019ec8c 100644 --- a/apps/emqx_web_hook/include/emqx_web_hook.hrl +++ b/apps/emqx_web_hook/include/emqx_web_hook.hrl @@ -1 +1 @@ --define(APP, emqx_web_hook). \ No newline at end of file +-define(APP, emqx_web_hook). diff --git a/apps/emqx_web_hook/priv/emqx_web_hook.schema b/apps/emqx_web_hook/priv/emqx_web_hook.schema index 9610ae094..beed1b107 100644 --- a/apps/emqx_web_hook/priv/emqx_web_hook.schema +++ b/apps/emqx_web_hook/priv/emqx_web_hook.schema @@ -1,7 +1,7 @@ %%-*- mode: erlang -*- %% EMQ X R3.0 config mapping -{mapping, "web.hook.url", "emqx_web_hook.url", [ +{mapping, "web.hook.api.url", "emqx_web_hook.url", [ {datatype, string} ]}. @@ -15,14 +15,17 @@ ]}. {mapping, "web.hook.ssl.cacertfile", "emqx_web_hook.cacertfile", [ + {default, ""}, {datatype, string} ]}. {mapping, "web.hook.ssl.certfile", "emqx_web_hook.certfile", [ + {default, ""}, {datatype, string} ]}. {mapping, "web.hook.ssl.keyfile", "emqx_web_hook.keyfile", [ + {default, ""}, {datatype, string} ]}. diff --git a/apps/emqx_web_hook/rebar.config b/apps/emqx_web_hook/rebar.config index 3684b78b0..65d1434df 100644 --- a/apps/emqx_web_hook/rebar.config +++ b/apps/emqx_web_hook/rebar.config @@ -18,14 +18,3 @@ {cover_enabled, true}. {cover_opts, [verbose]}. {cover_export_enabled, true}. - -{profiles, - [{test, - [{erl_opts, [export_all, nowarn_export_all]}, - {deps, - [{emqx_ct_helper, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.3.0"}}}, - {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}, - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3"}}} - ]} - ]} - ]}. diff --git a/apps/emqx_web_hook/src/emqx_web_hook.appup.src b/apps/emqx_web_hook/src/emqx_web_hook.appup.src new file mode 100644 index 000000000..0c7b8ebf3 --- /dev/null +++ b/apps/emqx_web_hook/src/emqx_web_hook.appup.src @@ -0,0 +1,10 @@ +%% -*-: erlang -*- + +{VSN, + [ + {<<".*">>, []} + ], + [ + {<<".*">>, []} + ] +}. diff --git a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl index f5d27f09c..4d5fed64b 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl @@ -17,95 +17,111 @@ %% Define the default actions. -module(emqx_web_hook_actions). +-export([ on_resource_create/2 + , on_get_resource_status/2 + , on_resource_destroy/2 + ]). + +-export([ on_action_create_data_to_webserver/2 + , on_action_data_to_webserver/2 + ]). + +-export_type([action_fun/0]). + -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_rule_engine/include/rule_actions.hrl"). --include("emqx_web_hook.hrl"). + +-type(action_fun() :: fun((Data :: map(), Envs :: map()) -> Result :: any())). + +-type(url() :: binary()). -define(RESOURCE_TYPE_WEBHOOK, 'web_hook'). -define(RESOURCE_CONFIG_SPEC, #{ - url => #{ - order => 1, + method => #{order => 1, type => string, - format => url, - required => true, - title => #{en => <<"URL">>, - zh => <<"URL"/utf8>>}, - description => #{en => <<"The URL of the server that will receive the Webhook requests.">>, - zh => <<"用于接收 Webhook 请求的服务器的 URL。"/utf8>>} - }, - connect_timeout => #{ - order => 2, - type => number, - default => 5, - title => #{en => <<"Connect Timeout">>, - zh => <<"连接超时时间"/utf8>>}, - description => #{en => <<"Connect timeout in seconds">>, - zh => <<"连接超时时间,单位秒"/utf8>>}}, - request_timeout => #{ - order => 3, - type => number, - default => 5, - title => #{en => <<"Request Timeout">>, - zh => <<"请求超时时间时间"/utf8>>}, - description => #{en => <<"Request timeout in seconds">>, - zh => <<"请求超时时间,单位秒"/utf8>>}}, - cacertfile => #{ - order => 4, - type => file, - default => <<>>, - title => #{en => <<"CA Certificate File">>, - zh => <<"CA 证书文件"/utf8>>}, - description => #{en => <<"CA certificate file.">>, - zh => <<"CA 证书文件。"/utf8>>} - }, - certfile => #{ - order => 5, - type => file, - default => <<>>, - title => #{en => <<"Certificate File">>, - zh => <<"证书文件"/utf8>>}, - description => #{en => <<"Certificate file.">>, - zh => <<"证书文件。"/utf8>>} - }, - keyfile => #{ - order => 6, - type => file, - default => <<>>, - title => #{en => <<"Private Key File">>, - zh => <<"私钥文件"/utf8>>}, - description => #{en => <<"Private key file.">>, - zh => <<"私钥文件。"/utf8>>} - }, - verify => #{ - order => 7, + enum => [<<"PUT">>,<<"POST">>], + default => <<"POST">>, + title => #{en => <<"Request Method">>, + zh => <<"请求方法"/utf8>>}, + description => #{en => <<"Request Method">>, + zh => <<"请求方法"/utf8>>}}, + url => #{order => 2, + type => string, + format => url, + required => true, + title => #{en => <<"Request URL">>, + zh => <<"请求 URL"/utf8>>}, + description => #{en => <<"The URL of the server that will receive the Webhook requests.">>, + zh => <<"用于接收 Webhook 请求的服务器的 URL。"/utf8>>}}, + headers => #{order => 3, + type => object, + schema => #{}, + default => #{}, + title => #{en => <<"Request Header">>, + zh => <<"请求头"/utf8>>}, + description => #{en => <<"Request Header">>, + zh => <<"请求头"/utf8>>}}, + connect_timeout => #{order => 4, + type => string, + default => <<"5s">>, + title => #{en => <<"Connect Timeout">>, + zh => <<"连接超时时间"/utf8>>}, + description => #{en => <<"Connect Timeout In Seconds">>, + zh => <<"连接超时时间"/utf8>>}}, + request_timeout => #{order => 5, + type => string, + default => <<"5s">>, + title => #{en => <<"Request Timeout">>, + zh => <<"请求超时时间时间"/utf8>>}, + description => #{en => <<"Request Timeout In Seconds">>, + zh => <<"请求超时时间"/utf8>>}}, + pool_size => #{order => 6, + type => number, + default => 8, + title => #{en => <<"Pool Size">>, zh => <<"连接池大小"/utf8>>}, + description => #{en => <<"Connection Pool">>, + zh => <<"连接池大小"/utf8>>} + }, + cacertfile => #{order => 7, + type => file, + default => <<"">>, + title => #{en => <<"CA Certificate File">>, + zh => <<"CA 证书文件"/utf8>>}, + description => #{en => <<"CA Certificate file">>, + zh => <<"CA 证书文件"/utf8>>}}, + keyfile => #{order => 8, + type => file, + default => <<"">>, + title =>#{en => <<"SSL Key">>, + zh => <<"SSL Key"/utf8>>}, + description => #{en => <<"Your ssl keyfile">>, + zh => <<"SSL 私钥"/utf8>>}}, + certfile => #{order => 9, + type => file, + default => <<"">>, + title =>#{en => <<"SSL Cert">>, + zh => <<"SSL Cert"/utf8>>}, + description => #{en => <<"Your ssl certfile">>, + zh => <<"SSL 证书"/utf8>>}}, + verify => #{order => 10, type => boolean, default => false, - title => #{en => <<"Verify">>, - zh => <<"Verify"/utf8>>}, - description => #{en => <<"Turn on peer certificate verification.">>, - zh => <<"是否开启对端证书验证。"/utf8>>} - }, - pool_size => #{ - order => 8, - type => number, - default => 32, - title => #{en => <<"Pool Size">>, - zh => <<"连接池大小"/utf8>>}, - description => #{en => <<"Pool size for HTTP server.">>, - zh => <<"HTTP server 连接池大小。"/utf8>>} - } - }). + title =>#{en => <<"Verify Server Certfile">>, + zh => <<"校验服务器证书"/utf8>>}, + description => #{en => <<"Whether to verify the server certificate. By default, the client will not verify the server's certificate. If verification is required, please set it to true.">>, + zh => <<"是否校验服务器证书。 默认客户端不会去校验服务器的证书,如果需要校验,请设置成true。"/utf8>>}} +}). -define(ACTION_PARAM_RESOURCE, #{ - order => 0, - type => string, - required => true, - title => #{en => <<"Resource ID">>, - zh => <<"资源 ID"/utf8>>}, - description => #{en => <<"Bind a resource to this action.">>, - zh => <<"给动作绑定一个资源。"/utf8>>} - }). + order => 0, + type => string, + required => true, + title => #{en => <<"Resource ID">>, + zh => <<"资源 ID"/utf8>>}, + description => #{en => <<"Bind a resource to this action">>, + zh => <<"给动作绑定一个资源"/utf8>>} +}). -define(ACTION_DATA_SPEC, #{ '$resource' => ?ACTION_PARAM_RESOURCE, @@ -153,39 +169,29 @@ "默认 HTTP 请求体的内容为规则输出的所有字段的键和值构成的 JSON 字符串。"/utf8>>}} }). --resource_type(#{name => ?RESOURCE_TYPE_WEBHOOK, - create => on_resource_create, - status => on_get_resource_status, - destroy => on_resource_destroy, - params => ?RESOURCE_CONFIG_SPEC, - title => #{en => <<"WebHook">>, - zh => <<"WebHook"/utf8>>}, - description => #{en => <<"WebHook">>, - zh => <<"WebHook"/utf8>>} - }). +-resource_type( + #{name => ?RESOURCE_TYPE_WEBHOOK, + create => on_resource_create, + status => on_get_resource_status, + destroy => on_resource_destroy, + params => ?RESOURCE_CONFIG_SPEC, + title => #{en => <<"WebHook">>, + zh => <<"WebHook"/utf8>>}, + description => #{en => <<"WebHook">>, + zh => <<"WebHook"/utf8>>} +}). -rule_action(#{name => data_to_webserver, - category => data_forward, - for => '$any', - create => on_action_create_data_to_webserver, - params => ?ACTION_DATA_SPEC, - types => [?RESOURCE_TYPE_WEBHOOK], - title => #{en => <<"Data to Web Server">>, - zh => <<"发送数据到 Web 服务"/utf8>>}, - description => #{en => <<"Forward Messages to Web Server">>, - zh => <<"将数据转发给 Web 服务"/utf8>>} - }). - --type(url() :: binary()). - --export([ on_resource_create/2 - , on_get_resource_status/2 - , on_resource_destroy/2 - ]). - --export([ on_action_create_data_to_webserver/2 - , on_action_data_to_webserver/2 - ]). + category => data_forward, + for => '$any', + create => on_action_create_data_to_webserver, + params => ?ACTION_DATA_SPEC, + types => [?RESOURCE_TYPE_WEBHOOK], + title => #{en => <<"Data to Web Server">>, + zh => <<"发送数据到 Web 服务"/utf8>>}, + description => #{en => <<"Forward Messages to Web Server">>, + zh => <<"将数据转发给 Web 服务"/utf8>>} +}). %%------------------------------------------------------------------------------ %% Actions for web hook @@ -194,7 +200,7 @@ -spec(on_resource_create(binary(), map()) -> map()). on_resource_create(ResId, Conf) -> {ok, _} = application:ensure_all_started(ehttpc), - Options = pool_opts(Conf), + Options = pool_opts(Conf, ResId), PoolName = pool_name(ResId), case test_http_connect(Conf) of true -> ok; @@ -299,7 +305,7 @@ parse_action_params(Params = #{<<"url">> := URL}) -> path => path(filename:join(CommonPath, maps:get(<<"path">>, Params, <<>>))), headers => NHeaders, body => maps:get(<<"body">>, Params, <<>>), - request_timeout => timer:seconds(maps:get(<<"request_timeout">>, Params, 5)), + request_timeout => cuttlefish_duration:parse(str(maps:get(<<"request_timeout">>, Params, <<"5s">>))), pool => maps:get(<<"pool">>, Params)} catch _:_ -> throw({invalid_params, Params}) @@ -328,50 +334,77 @@ str(Str) when is_list(Str) -> Str; str(Atom) when is_atom(Atom) -> atom_to_list(Atom); str(Bin) when is_binary(Bin) -> binary_to_list(Bin). -pool_opts(Params = #{<<"url">> := URL}) -> - #{host := Host0, - scheme := Scheme} = URIMap = uri_string:parse(binary_to_list(URL)), +add_default_scheme(<<"http://", _/binary>> = URL) -> + URL; +add_default_scheme(<<"https://", _/binary>> = URL) -> + URL; +add_default_scheme(URL) -> + <<"http://", URL/binary>>. + +pool_opts(Params = #{<<"url">> := URL}, ResId) -> + #{host := Host0, scheme := Scheme} = URIMap = uri_string:parse(binary_to_list(add_default_scheme(URL))), Port = maps:get(port, URIMap, case Scheme of "https" -> 443; _ -> 80 end), PoolSize = maps:get(<<"pool_size">>, Params, 32), - ConnectTimeout = timer:seconds(maps:get(<<"connect_timeout">>, Params, 5)), + ConnectTimeout = cuttlefish_duration:parse(str(maps:get(<<"connect_timeout">>, Params, <<"5s">>))), {Inet, Host} = parse_host(Host0), - MoreOpts = case Scheme of - "http" -> - [{transport_opts, [Inet]}]; - "https" -> - KeyFile = maps:get(<<"keyfile">>, Params), - CertFile = maps:get(<<"certfile">>, Params), - CACertFile = maps:get(<<"cacertfile">>, Params), - VerifyType = case maps:get(<<"verify">>, Params) of - true -> verify_peer; - false -> verify_none - end, - TLSOpts = lists:filter(fun({_K, V}) when V =:= <<>> -> - false; - (_) -> - true - end, [{keyfile, KeyFile}, {certfile, CertFile}, {cacertfile, CACertFile}]), - NTLSOpts = [ {verify, VerifyType} - , {versions, emqx_tls_lib:default_versions()} - , {ciphers, emqx_tls_lib:default_ciphers()} - | TLSOpts - ], - [{transport, ssl}, {transport_opts, [Inet | NTLSOpts]}] - end, + SslOpts = get_ssl_options(Params, ResId, add_default_scheme(URL)), [{host, Host}, {port, Port}, {pool_size, PoolSize}, {pool_type, hash}, {connect_timeout, ConnectTimeout}, {retry, 5}, - {retry_timeout, 1000}] ++ MoreOpts. + {retry_timeout, 1000}, + {transport_opts, [Inet] ++ SslOpts}]. pool_name(ResId) -> list_to_atom("webhook:" ++ str(ResId)). +get_ssl_options(Config, ResId, <<"https://", _URL/binary>>) -> + [{transport, ssl}, {transport_opts, get_ssl_opts(Config, ResId)}]; +get_ssl_options(_Config, _ResId, _URL) -> + []. + +get_ssl_opts(Opts, ResId) -> + KeyFile = maps:get(<<"keyfile">>, Opts, undefined), + CertFile = maps:get(<<"certfile">>, Opts, undefined), + CAFile = maps:get(<<"cacertfile">>, Opts, undefined), + Filter = fun(Opts1) -> + [{K, V} || {K, V} <- Opts1, + V =/= undefined, + V =/= <<>>, + V =/= "" ] + end, + Key = save_upload_file(KeyFile, ResId), + Cert = save_upload_file(CertFile, ResId), + CA = save_upload_file(CAFile, ResId), + Verify = case maps:get(<<"verify">>, Opts, false) of + false -> verify_none; + true -> verify_peer + end, + case Filter([{keyfile, Key}, {certfile, Cert}, {cacertfile, CA}]) of + [] -> [{verify, Verify}]; + SslOpts -> + [{verify, Verify} | SslOpts] + end. + +save_upload_file(#{<<"file">> := <<>>, <<"filename">> := <<>>}, _ResId) -> ""; +save_upload_file(FilePath, _) when is_binary(FilePath) -> binary_to_list(FilePath); +save_upload_file(#{<<"file">> := File, <<"filename">> := FileName}, ResId) -> + FullFilename = filename:join([emqx:get_env(data_dir), rules, ResId, FileName]), + ok = filelib:ensure_dir(FullFilename), + case file:write_file(FullFilename, File) of + ok -> + binary_to_list(FullFilename); + {error, Reason} -> + logger:error("Store file failed, ResId: ~p, ~0p", [ResId, Reason]), + error({ResId, store_file_fail}) + end; +save_upload_file(_, _) -> "". + parse_host(Host) -> case inet:parse_address(Host) of {ok, Addr} when size(Addr) =:= 4 -> {inet, Addr}; diff --git a/apps/emqx_web_hook/test/prop_webhook_confs.erl b/apps/emqx_web_hook/test/prop_webhook_confs.erl new file mode 100644 index 000000000..bfe170239 --- /dev/null +++ b/apps/emqx_web_hook/test/prop_webhook_confs.erl @@ -0,0 +1,142 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(prop_webhook_confs). +-include_lib("proper/include/proper.hrl"). + +-import(emqx_ct_proper_types, + [ url/0 + , nof/1 + ]). + +-define(ALL(Vars, Types, Exprs), + ?SETUP(fun() -> + State = do_setup(), + fun() -> do_teardown(State) end + end, ?FORALL(Vars, Types, Exprs))). + +%%-------------------------------------------------------------------- +%% Properties +%%-------------------------------------------------------------------- + +prop_confs() -> + Schema = cuttlefish_schema:files(filelib:wildcard(code:priv_dir(emqx_web_hook) ++ "/*.schema")), + ?ALL({Url, Confs0}, {url(), confs()}, + begin + Confs = [{"web.hook.api.url", Url}|Confs0], + Envs = cuttlefish_generator:map(Schema, cuttlefish_conf_file(Confs)), + + assert_confs(Confs, Envs), + + set_application_envs(Envs), + {ok, _} = application:ensure_all_started(emqx_web_hook), + application:stop(emqx_web_hook), + unset_application_envs(Envs), + true + end). + +%%-------------------------------------------------------------------- +%% Helpers +%%-------------------------------------------------------------------- + +do_setup() -> + application:set_env(kernel, logger_level, error), + emqx_ct_helpers:start_apps([], fun set_special_cfgs/1), + ok. + +do_teardown(_) -> + emqx_ct_helpers:stop_apps([]), + ok. + +set_special_cfgs(_) -> + application:set_env(emqx, plugins_loaded_file, undefined), + application:set_env(emqx, modules_loaded_file, undefined), + ok. + +assert_confs([{"web.hook.api.url", Url}|More], Envs) -> + %% Assert! + Url = deep_get_env("emqx_web_hook.url", Envs), + assert_confs(More, Envs); + +assert_confs([{"web.hook.rule." ++ HookName0, Spec}|More], Envs) -> + HookName = re:replace(HookName0, "\\.[0-9]", "", [{return, list}]), + Rules = deep_get_env("emqx_web_hook.rules", Envs), + + %% Assert! + Spec = proplists:get_value(HookName, Rules), + + assert_confs(More, Envs); + +assert_confs([_|More], Envs) -> + assert_confs(More, Envs); + +assert_confs([], _) -> + true. + +deep_get_env(Path, Envs) -> + lists:foldl( + fun(_K, undefiend) -> undefiend; + (K, Acc) -> proplists:get_value(binary_to_atom(K, utf8), Acc) + end, Envs, re:split(Path, "\\.")). + +set_application_envs(Envs) -> + application:set_env(Envs). + +unset_application_envs(Envs) -> + lists:foreach(fun({App, Es}) -> + lists:foreach(fun({K, _}) -> + application:unset_env(App, K) + end, Es) end, Envs). + +cuttlefish_conf_file(Ls) when is_list(Ls) -> + [cuttlefish_conf_option(K,V) || {K, V} <- Ls]. + +cuttlefish_conf_option(K, V) + when is_list(K) -> + {re:split(K, "[.]", [{return, list}]), V}. + +%%-------------------------------------------------------------------- +%% Generators +%%-------------------------------------------------------------------- + +confs() -> + nof([{"web.hook.encode_payload", oneof(["base64", "base62"])}, + {"web.hook.rule.client.connect.1", rule_spec()}, + {"web.hook.rule.client.connack.1", rule_spec()}, + {"web.hook.rule.client.connected.1", rule_spec()}, + {"web.hook.rule.client.disconnected.1", rule_spec()}, + {"web.hook.rule.client.subscribe.1", rule_spec()}, + {"web.hook.rule.client.unsubscribe.1", rule_spec()}, + {"web.hook.rule.session.subscribed.1", rule_spec()}, + {"web.hook.rule.session.unsubscribed.1", rule_spec()}, + {"web.hook.rule.session.terminated.1", rule_spec()}, + {"web.hook.rule.message.publish.1", rule_spec()}, + {"web.hook.rule.message.delivered.1", rule_spec()}, + {"web.hook.rule.message.acked.1", rule_spec()} + ]). + +rule_spec() -> + ?LET(Action, action_names(), + begin + binary_to_list(emqx_json:encode(#{action => Action})) + end). + +action_names() -> + oneof([on_client_connect, on_client_connack, on_client_connected, + on_client_connected, on_client_disconnected, on_client_subscribe, on_client_unsubscribe, + on_session_subscribed, on_session_unsubscribed, on_session_terminated, + on_message_publish, on_message_delivered, on_message_acked]). + diff --git a/apps/emqx_web_hook/test/prop_webhook_hooks.erl b/apps/emqx_web_hook/test/prop_webhook_hooks.erl new file mode 100644 index 000000000..4e51573a6 --- /dev/null +++ b/apps/emqx_web_hook/test/prop_webhook_hooks.erl @@ -0,0 +1,409 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(prop_webhook_hooks). + +-include_lib("proper/include/proper.hrl"). + +-import(emqx_ct_proper_types, + [ conninfo/0 + , clientinfo/0 + , sessioninfo/0 + , message/0 + , connack_return_code/0 + , topictab/0 + , topic/0 + , subopts/0 + ]). + +-define(ALL(Vars, Types, Exprs), + ?SETUP(fun() -> + State = do_setup(), + fun() -> do_teardown(State) end + end, ?FORALL(Vars, Types, Exprs))). + +%%-------------------------------------------------------------------- +%% Properties +%%-------------------------------------------------------------------- + +prop_client_connect() -> + ?ALL({ConnInfo, ConnProps, Env}, + {conninfo(), conn_properties(), empty_env()}, + begin + ok = emqx_web_hook:on_client_connect(ConnInfo, ConnProps, Env), + Body = receive_http_request_body(), + Body = emqx_json:encode( + #{action => client_connect, + node => stringfy(node()), + clientid => maps:get(clientid, ConnInfo), + username => maybe(maps:get(username, ConnInfo)), + ipaddress => peer2addr(maps:get(peername, ConnInfo)), + keepalive => maps:get(keepalive, ConnInfo), + proto_ver => maps:get(proto_ver, ConnInfo) + }), + true + end). + +prop_client_connack() -> + ?ALL({ConnInfo, Rc, AckProps, Env}, + {conninfo(), connack_return_code(), ack_properties(), empty_env()}, + begin + ok = emqx_web_hook:on_client_connack(ConnInfo, Rc, AckProps, Env), + Body = receive_http_request_body(), + Body = emqx_json:encode( + #{action => client_connack, + node => stringfy(node()), + clientid => maps:get(clientid, ConnInfo), + username => maybe(maps:get(username, ConnInfo)), + ipaddress => peer2addr(maps:get(peername, ConnInfo)), + keepalive => maps:get(keepalive, ConnInfo), + proto_ver => maps:get(proto_ver, ConnInfo), + conn_ack => Rc + }), + true + end). + +prop_client_connected() -> + ?ALL({ClientInfo, ConnInfo, Env}, + {clientinfo(), conninfo(), empty_env()}, + begin + ok = emqx_web_hook:on_client_connected(ClientInfo, ConnInfo, Env), + Body = receive_http_request_body(), + Body = emqx_json:encode( + #{action => client_connected, + node => stringfy(node()), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo)), + ipaddress => peer2addr(maps:get(peerhost, ClientInfo)), + keepalive => maps:get(keepalive, ConnInfo), + proto_ver => maps:get(proto_ver, ConnInfo), + connected_at => maps:get(connected_at, ConnInfo) + }), + true + end). + +prop_client_disconnected() -> + ?ALL({ClientInfo, Reason, ConnInfo, Env}, + {clientinfo(), shutdown_reason(), disconnected_conninfo(), empty_env()}, + begin + ok = emqx_web_hook:on_client_disconnected(ClientInfo, Reason, ConnInfo, Env), + Body = receive_http_request_body(), + Body = emqx_json:encode( + #{action => client_disconnected, + node => stringfy(node()), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo)), + disconnected_at => maps:get(disconnected_at, ConnInfo), + reason => stringfy(Reason) + }), + true + end). + +prop_client_subscribe() -> + ?ALL({ClientInfo, SubProps, TopicTab, Env}, + {clientinfo(), sub_properties(), topictab(), topic_filter_env()}, + begin + ok = emqx_web_hook:on_client_subscribe(ClientInfo, SubProps, TopicTab, Env), + + Matched = filter_topictab(TopicTab, Env), + + lists:foreach(fun({Topic, Opts}) -> + Body = receive_http_request_body(), + Body = emqx_json:encode( + #{action => client_subscribe, + node => stringfy(node()), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo)), + topic => Topic, + opts => Opts}) + end, Matched), + true + end). + +prop_client_unsubscribe() -> + ?ALL({ClientInfo, SubProps, TopicTab, Env}, + {clientinfo(), unsub_properties(), topictab(), topic_filter_env()}, + begin + ok = emqx_web_hook:on_client_unsubscribe(ClientInfo, SubProps, TopicTab, Env), + + Matched = filter_topictab(TopicTab, Env), + + lists:foreach(fun({Topic, Opts}) -> + Body = receive_http_request_body(), + Body = emqx_json:encode( + #{action => client_unsubscribe, + node => stringfy(node()), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo)), + topic => Topic, + opts => Opts}) + end, Matched), + true + end). + +prop_session_subscribed() -> + ?ALL({ClientInfo, Topic, SubOpts, Env}, + {clientinfo(), topic(), subopts(), topic_filter_env()}, + begin + ok = emqx_web_hook:on_session_subscribed(ClientInfo, Topic, SubOpts, Env), + filter_topic_match(Topic, Env) andalso begin + Body = receive_http_request_body(), + Body1 = emqx_json:encode( + #{action => session_subscribed, + node => stringfy(node()), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo)), + topic => Topic, + opts => SubOpts + }), + Body = Body1 + end, + true + end). + +prop_session_unsubscribed() -> + ?ALL({ClientInfo, Topic, SubOpts, Env}, + {clientinfo(), topic(), subopts(), empty_env()}, + begin + ok = emqx_web_hook:on_session_unsubscribed(ClientInfo, Topic, SubOpts, Env), + filter_topic_match(Topic, Env) andalso begin + Body = receive_http_request_body(), + Body = emqx_json:encode( + #{action => session_unsubscribed, + node => stringfy(node()), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo)), + topic => Topic + }) + end, + true + end). + +prop_session_terminated() -> + ?ALL({ClientInfo, Reason, SessInfo, Env}, + {clientinfo(), shutdown_reason(), sessioninfo(), empty_env()}, + begin + ok = emqx_web_hook:on_session_terminated(ClientInfo, Reason, SessInfo, Env), + Body = receive_http_request_body(), + Body = emqx_json:encode( + #{action => session_terminated, + node => stringfy(node()), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo)), + reason => stringfy(Reason) + }), + true + end). + +prop_message_publish() -> + ?ALL({Msg, Env, Encode}, {message(), topic_filter_env(), payload_encode()}, + begin + application:set_env(emqx_web_hook, encode_payload, Encode), + {ok, Msg} = emqx_web_hook:on_message_publish(Msg, Env), + application:unset_env(emqx_web_hook, encode_payload), + + (not emqx_message:is_sys(Msg)) + andalso filter_topic_match(emqx_message:topic(Msg), Env) + andalso begin + Body = receive_http_request_body(), + Body = emqx_json:encode( + #{action => message_publish, + node => stringfy(node()), + from_client_id => emqx_message:from(Msg), + from_username => maybe(emqx_message:get_header(username, Msg)), + topic => emqx_message:topic(Msg), + qos => emqx_message:qos(Msg), + retain => emqx_message:get_flag(retain, Msg), + payload => encode(emqx_message:payload(Msg), Encode), + ts => emqx_message:timestamp(Msg) + }) + end, + true + end). + +prop_message_delivered() -> + ?ALL({ClientInfo, Msg, Env, Encode}, {clientinfo(), message(), topic_filter_env(), payload_encode()}, + begin + application:set_env(emqx_web_hook, encode_payload, Encode), + ok = emqx_web_hook:on_message_delivered(ClientInfo, Msg, Env), + application:unset_env(emqx_web_hook, encode_payload), + + (not emqx_message:is_sys(Msg)) + andalso filter_topic_match(emqx_message:topic(Msg), Env) + andalso begin + Body = receive_http_request_body(), + Body = emqx_json:encode( + #{action => message_delivered, + node => stringfy(node()), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo)), + from_client_id => emqx_message:from(Msg), + from_username => maybe(emqx_message:get_header(username, Msg)), + topic => emqx_message:topic(Msg), + qos => emqx_message:qos(Msg), + retain => emqx_message:get_flag(retain, Msg), + payload => encode(emqx_message:payload(Msg), Encode), + ts => emqx_message:timestamp(Msg) + }) + end, + true + end). + +prop_message_acked() -> + ?ALL({ClientInfo, Msg, Env, Encode}, {clientinfo(), message(), empty_env(), payload_encode()}, + begin + application:set_env(emqx_web_hook, encode_payload, Encode), + ok = emqx_web_hook:on_message_acked(ClientInfo, Msg, Env), + application:unset_env(emqx_web_hook, encode_payload), + + (not emqx_message:is_sys(Msg)) + andalso filter_topic_match(emqx_message:topic(Msg), Env) + andalso begin + Body = receive_http_request_body(), + Body = emqx_json:encode( + #{action => message_acked, + node => stringfy(node()), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo)), + from_client_id => emqx_message:from(Msg), + from_username => maybe(emqx_message:get_header(username, Msg)), + topic => emqx_message:topic(Msg), + qos => emqx_message:qos(Msg), + retain => emqx_message:get_flag(retain, Msg), + payload => encode(emqx_message:payload(Msg), Encode), + ts => emqx_message:timestamp(Msg) + }) + end, + true + end). + +%%-------------------------------------------------------------------- +%% Helper +%%-------------------------------------------------------------------- +do_setup() -> + %% Pre-defined envs + application:set_env(emqx_web_hook, path, "path"), + application:set_env(emqx_web_hook, headers, []), + + meck:new(ehttpc_pool, [passthrough, no_history]), + meck:expect(ehttpc_pool, pick_worker, fun(_, _) -> ok end), + + Self = self(), + meck:new(ehttpc, [passthrough, no_history]), + meck:expect(ehttpc, request, + fun(_ClientId, Method, {Path, Headers, Body}) -> + Self ! {Method, Path, Headers, Body}, {ok, ok, ok} + end), + + meck:new(emqx_metrics, [passthrough, no_history]), + meck:expect(emqx_metrics, inc, fun(_) -> ok end), + ok. + +do_teardown(_) -> + meck:unload(ehttpc_pool), + meck:unload(ehttpc), + meck:unload(emqx_metrics). + +maybe(undefined) -> null; +maybe(T) -> T. + +peer2addr({Host, _}) -> + list_to_binary(inet:ntoa(Host)); +peer2addr(Host) -> + list_to_binary(inet:ntoa(Host)). + +ensure_to_binary(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8); +ensure_to_binary(Bin) when is_binary(Bin) -> Bin. + +stringfy({shutdown, Reason}) -> + stringfy(Reason); +stringfy(Term) when is_atom(Term); is_binary(Term) -> + Term; +stringfy(Term) -> + unicode:characters_to_binary(io_lib:format("~0p", [Term])). + +receive_http_request_body() -> + receive + {post, _, _, Body} -> + Body + after 100 -> + exit(waiting_message_timeout) + end. + +receive_http_request_bodys() -> + receive_http_request_bodys_([]). + +receive_http_request_bodys_(Acc) -> + receive + {post, _, _, Body} -> + receive_http_request_bodys_([Body|Acc]) + after 1000 -> + lists:reverse(Acc) + end. + +filter_topictab(TopicTab, {undefined}) -> + TopicTab; +filter_topictab(TopicTab, {TopicFilter}) -> + lists:filter(fun({Topic, _}) -> emqx_topic:match(Topic, TopicFilter) end, TopicTab). + +filter_topic_match(_Topic, {undefined}) -> + true; +filter_topic_match(Topic, {TopicFilter}) -> + emqx_topic:match(Topic, TopicFilter). + +encode(Bin, base64) -> + base64:encode(Bin); +encode(Bin, base62) -> + emqx_base62:encode(Bin); +encode(Bin, _) -> + Bin. + +%%-------------------------------------------------------------------- +%% Generators +%%-------------------------------------------------------------------- + +conn_properties() -> + #{}. + +ack_properties() -> + #{}. + +sub_properties() -> + #{}. + +unsub_properties() -> + #{}. + +shutdown_reason() -> + oneof([any(), {shutdown, atom()}]). + +empty_env() -> + {undefined}. + +topic_filter_env() -> + oneof([{<<"#">>}, {undefined}, {topic()}]). + +payload_encode() -> + oneof([base62, base64, undefined]). + +http_code() -> + oneof([socket_closed_remotely, others]). + +disconnected_conninfo() -> + ?LET(Info, conninfo(), + begin + Info#{disconnected_at => erlang:system_time(millisecond)} + end). diff --git a/apps/emqx_web_hook/test/props/prop_webhook_confs.erl b/apps/emqx_web_hook/test/props/prop_webhook_confs.erl index 24903ddec..bfe170239 100644 --- a/apps/emqx_web_hook/test/props/prop_webhook_confs.erl +++ b/apps/emqx_web_hook/test/props/prop_webhook_confs.erl @@ -34,8 +34,9 @@ prop_confs() -> Schema = cuttlefish_schema:files(filelib:wildcard(code:priv_dir(emqx_web_hook) ++ "/*.schema")), - ?ALL(Confs, confs(), + ?ALL({Url, Confs0}, {url(), confs()}, begin + Confs = [{"web.hook.api.url", Url}|Confs0], Envs = cuttlefish_generator:map(Schema, cuttlefish_conf_file(Confs)), assert_confs(Confs, Envs), @@ -112,8 +113,7 @@ cuttlefish_conf_option(K, V) %%-------------------------------------------------------------------- confs() -> - nof([{"web.hook.api.url", url()}, - {"web.hook.encode_payload", oneof(["base64", "base62"])}, + nof([{"web.hook.encode_payload", oneof(["base64", "base62"])}, {"web.hook.rule.client.connect.1", rule_spec()}, {"web.hook.rule.client.connack.1", rule_spec()}, {"web.hook.rule.client.connected.1", rule_spec()},