diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src index 83ce7a759..385c89965 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_mqtt, [{description, "EMQ X Bridge to MQTT Broker"}, - {vsn, "4.3.1"}, % strict semver, bump manually! + {vsn, "5.0.0"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,replayq,emqtt]}, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src deleted file mode 100644 index 03e6119ae..000000000 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src +++ /dev/null @@ -1,16 +0,0 @@ -%% -*-: erlang -*- - -{VSN, - [ - {"4.3.0", [ - {load_module, emqx_bridge_worker, brutal_purge, soft_purge, []} - ]}, - {<<".*">>, []} - ], - [ - {"4.3.0", [ - {load_module, emqx_bridge_worker, brutal_purge, soft_purge, []} - ]}, - {<<".*">>, []} - ] -}. diff --git a/apps/emqx_rule_actions/README.md b/apps/emqx_rule_actions/README.md new file mode 100644 index 000000000..c17e1a34a --- /dev/null +++ b/apps/emqx_rule_actions/README.md @@ -0,0 +1,11 @@ +# emqx_rule_actions + +This project contains a collection of rule actions/resources. It is mainly for + making unit test easier. Also it's easier for us to create utils that many + modules depends on it. + +## Build +----- + + $ rebar3 compile + diff --git a/apps/emqx_web_hook/rebar.config b/apps/emqx_rule_actions/rebar.config similarity index 57% rename from apps/emqx_web_hook/rebar.config rename to apps/emqx_rule_actions/rebar.config index 387972c9f..097c18a3d 100644 --- a/apps/emqx_web_hook/rebar.config +++ b/apps/emqx_rule_actions/rebar.config @@ -1,18 +1,25 @@ -{plugins, [rebar3_proper]}. - {deps, []}. -{edoc_opts, [{preprocess, true}]}. {erl_opts, [warn_unused_vars, warn_shadow_vars, warn_unused_import, warn_obsolete_guard, - debug_info, - {parse_transform}]}. + no_debug_info, + compressed, %% for edge + {parse_transform} + ]}. + +{overrides, [{add, [{erl_opts, [no_debug_info, compressed]}]}]}. + +{edoc_opts, [{preprocess, true}]}. {xref_checks, [undefined_function_calls, undefined_functions, locals_not_used, deprecated_function_calls, - warnings_as_errors, deprecated_functions]}. + warnings_as_errors, deprecated_functions + ]}. + {cover_enabled, true}. {cover_opts, [verbose]}. -{cover_export_enabled, true}. \ No newline at end of file +{cover_export_enabled, true}. + +{plugins, [rebar3_proper]}. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl b/apps/emqx_rule_actions/src/emqx_bridge_mqtt_actions.erl similarity index 100% rename from apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl rename to apps/emqx_rule_actions/src/emqx_bridge_mqtt_actions.erl diff --git a/apps/emqx_rule_actions/src/emqx_rule_actions.app.src b/apps/emqx_rule_actions/src/emqx_rule_actions.app.src new file mode 100644 index 000000000..fd95c3572 --- /dev/null +++ b/apps/emqx_rule_actions/src/emqx_rule_actions.app.src @@ -0,0 +1,11 @@ +{application, emqx_rule_actions, + [{description, "Rule actions"}, + {vsn, "5.0.0"}, + {registered, []}, + {applications, + [kernel,stdlib]}, + {env,[]}, + {modules, []}, + {licenses, ["Apache 2.0"]}, + {links, []} + ]}. diff --git a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl b/apps/emqx_rule_actions/src/emqx_web_hook_actions.erl similarity index 100% rename from apps/emqx_web_hook/src/emqx_web_hook_actions.erl rename to apps/emqx_rule_actions/src/emqx_web_hook_actions.erl diff --git a/apps/emqx_web_hook/.gitignore b/apps/emqx_web_hook/.gitignore deleted file mode 100644 index e6348348a..000000000 --- a/apps/emqx_web_hook/.gitignore +++ /dev/null @@ -1,31 +0,0 @@ -.rebar3 -_* -.eunit -*.o -*.beam -*.plt -*.swp -*.swo -.erlang.cookie -ebin -log -erl_crash.dump -.rebar -logs -_build -.idea -data -.DS_Store -.erlang.mk/ -cover -ct.coverdata -deps -eunit.coverdata -test/ct.cover.spec -emqx_web_hook.d -emq_web_hook.d -rebar.lock -erlang.mk -rebar3.crashdump -etc/emqx_web_hook.conf.rendered -Mnesia.nonode@nohost diff --git a/apps/emqx_web_hook/README.md b/apps/emqx_web_hook/README.md deleted file mode 100644 index c76c2936d..000000000 --- a/apps/emqx_web_hook/README.md +++ /dev/null @@ -1,194 +0,0 @@ - -# emqx-web-hook - -EMQ X WebHook plugin. - -Please see: [EMQ X - WebHook](https://docs.emqx.io/broker/latest/en/advanced/webhook.html) - -## emqx_web_hook.conf - -```properties -## The web services URL for Hook request -## -## Value: String -web.hook.url = http://127.0.0.1:8080 - -## Encode message payload field -## -## Value: base64 | base62 -## web.hook.encode_payload = base64 - -##-------------------------------------------------------------------- -## Hook Rules - -## These configuration items represent a list of events should be forwarded -## -## Format: -## web.hook.rule.. = -web.hook.rule.client.connect.1 = {"action": "on_client_connect"} -web.hook.rule.client.connack.1 = {"action": "on_client_connack"} -web.hook.rule.client.connected.1 = {"action": "on_client_connected"} -web.hook.rule.client.disconnected.1 = {"action": "on_client_disconnected"} -web.hook.rule.client.subscribe.1 = {"action": "on_client_subscribe"} -web.hook.rule.client.unsubscribe.1 = {"action": "on_client_unsubscribe"} -web.hook.rule.session.subscribed.1 = {"action": "on_session_subscribed"} -web.hook.rule.session.unsubscribed.1 = {"action": "on_session_unsubscribed"} -web.hook.rule.session.terminated.1 = {"action": "on_session_terminated"} -web.hook.rule.message.publish.1 = {"action": "on_message_publish"} -web.hook.rule.message.delivered.1 = {"action": "on_message_delivered"} -web.hook.rule.message.acked.1 = {"action": "on_message_acked"} -``` - -## API - -The HTTP request parameter format: - -* client.connected -```json -{ - "action":"client_connected", - "clientid":"C_1492410235117", - "username":"C_1492410235117", - "keepalive": 60, - "ipaddress": "127.0.0.1", - "proto_ver": 4, - "connected_at": 1556176748, - "conn_ack":0 -} -``` - -* client.disconnected -```json -{ - "action":"client_disconnected", - "clientid":"C_1492410235117", - "username":"C_1492410235117", - "reason":"normal" -} -``` - -* client.subscribe -```json -{ - "action":"client_subscribe", - "clientid":"C_1492410235117", - "username":"C_1492410235117", - "topic":"world", - "opts":{ - "qos":0 - } -} -``` - -* client.unsubscribe -```json -{ - "action":"client_unsubscribe", - "clientid":"C_1492410235117", - "username":"C_1492410235117", - "topic":"world" -} -``` - -* session.created -```json -{ - "action":"session_created", - "clientid":"C_1492410235117", - "username":"C_1492410235117" -} -``` - -* session.subscribed -```json -{ - "action":"session_subscribed", - "clientid":"C_1492410235117", - "username":"C_1492410235117", - "topic":"world", - "opts":{ - "qos":0 - } -} -``` - -* session.unsubscribed -```json -{ - "action":"session_unsubscribed", - "clientid":"C_1492410235117", - "username":"C_1492410235117", - "topic":"world" -} -``` - -* session.terminated -```json -{ - "action":"session_terminated", - "clientid":"C_1492410235117", - "username":"C_1492410235117", - "reason":"normal" -} -``` - -* message.publish -```json -{ - "action":"message_publish", - "from_client_id":"C_1492410235117", - "from_username":"C_1492410235117", - "topic":"world", - "qos":0, - "retain":true, - "payload":"Hello world!", - "ts":1492412774 -} -``` - -* message.delivered -```json -{ - "action":"message_delivered", - "clientid":"C_1492410235117", - "username":"C_1492410235117", - "from_client_id":"C_1492410235117", - "from_username":"C_1492410235117", - "topic":"world", - "qos":0, - "retain":true, - "payload":"Hello world!", - "ts":1492412826 -} -``` - -* message.acked -```json -{ - "action":"message_acked", - "clientid":"C_1492410235117", - "username":"C_1492410235117", - "from_client_id":"C_1492410235117", - "from_username":"C_1492410235117", - "topic":"world", - "qos":1, - "retain":true, - "payload":"Hello world!", - "ts":1492412914 -} -``` - -## License - -Apache License Version 2.0 - -## Author - -* [Sakib Sami](https://github.com/s4kibs4mi) - -## Contributors - -* [Deng](https://github.com/turtleDeng) -* [vishr](https://github.com/vishr) -* [emqplus](https://github.com/emqplus) -* [huangdan](https://github.com/huangdan) diff --git a/apps/emqx_web_hook/TODO b/apps/emqx_web_hook/TODO deleted file mode 100644 index 31bf5a2ad..000000000 --- a/apps/emqx_web_hook/TODO +++ /dev/null @@ -1,3 +0,0 @@ -1. HTTPS -2. More HTTP Headers and Options -3. MQTT 5.0 diff --git a/apps/emqx_web_hook/etc/emqx_web_hook.conf b/apps/emqx_web_hook/etc/emqx_web_hook.conf deleted file mode 100644 index 6707e4673..000000000 --- a/apps/emqx_web_hook/etc/emqx_web_hook.conf +++ /dev/null @@ -1,77 +0,0 @@ -##==================================================================== -## WebHook -##==================================================================== - -## Webhook URL -## -## Value: String -web.hook.url = "http://127.0.0.1:80" - -## HTTP Headers -## -## Example: -## 1. web.hook.headers.content-type = "application/json" -## 2. web.hook.headers.accept = "*" -## -## Value: String -web.hook.headers.content-type = "application/json" - -## The encoding format of the payload field in the HTTP body -## The payload field only appears in the on_message_publish and on_message_delivered actions -## -## Value: plain | base64 | base62 -web.hook.body.encoding_of_payload_field = plain - -##-------------------------------------------------------------------- -## PEM format file of CA's -## -## Value: File -## web.hook.ssl.cacertfile = - -## Certificate file to use, PEM format assumed -## -## Value: File -## web.hook.ssl.certfile = - -## Private key file to use, PEM format assumed -## -## Value: File -## web.hook.ssl.keyfile = - -## Turn on peer certificate verification -## -## Value: true | false -## web.hook.ssl.verify = false - -## If not specified, the server's names returned in server's certificate is validated against -## what's provided `web.hook.url` config's host part. -## Setting to 'disable' will make EMQ X ignore unmatched server names. -## If set with a host name, the server's names returned in server's certificate is validated -## against this value. -## -## Value: String | disable -## web.hook.ssl.server_name_indication = disable - -## Connection process pool size -## -## Value: Number -web.hook.pool_size = 32 - -##-------------------------------------------------------------------- -## Hook Rules -## These configuration items represent a list of events should be forwarded -## -## Format: -## web.hook.rule.. = -#web.hook.rule.client.connect.1 = "{"action": "on_client_connect"}" -#web.hook.rule.client.connack.1 = "{"action": "on_client_connack"}" -#web.hook.rule.client.connected.1 = "{"action": "on_client_connected"}" -#web.hook.rule.client.disconnected.1 = "{"action": "on_client_disconnected"}" -#web.hook.rule.client.subscribe.1 = "{"action": "on_client_subscribe"}" -#web.hook.rule.client.unsubscribe.1 = "{"action": "on_client_unsubscribe"}" -#web.hook.rule.session.subscribed.1 = "{"action": "on_session_subscribed"}" -#web.hook.rule.session.unsubscribed.1 = "{"action": "on_session_unsubscribed"}" -#web.hook.rule.session.terminated.1 = "{"action": "on_session_terminated"}" -#web.hook.rule.message.publish.1 = "{"action": "on_message_publish"}" -#web.hook.rule.message.delivered.1 = "{"action": "on_message_delivered"}" -#web.hook.rule.message.acked.1 = ""{"action": "on_message_acked"}" diff --git a/apps/emqx_web_hook/include/emqx_web_hook.hrl b/apps/emqx_web_hook/include/emqx_web_hook.hrl deleted file mode 100644 index 73019ec8c..000000000 --- a/apps/emqx_web_hook/include/emqx_web_hook.hrl +++ /dev/null @@ -1 +0,0 @@ --define(APP, emqx_web_hook). diff --git a/apps/emqx_web_hook/priv/emqx_web_hook.schema b/apps/emqx_web_hook/priv/emqx_web_hook.schema deleted file mode 100644 index 8ba1cc0fd..000000000 --- a/apps/emqx_web_hook/priv/emqx_web_hook.schema +++ /dev/null @@ -1,105 +0,0 @@ -%%-*- mode: erlang -*- -%% EMQ X R3.0 config mapping - -{mapping, "web.hook.url", "emqx_web_hook.url", [ - {datatype, string} -]}. - -{mapping, "web.hook.headers.$name", "emqx_web_hook.headers", [ - {datatype, string} -]}. - -{mapping, "web.hook.body.encoding_of_payload_field", "emqx_web_hook.encoding_of_payload_field", [ - {default, plain}, - {datatype, {enum, [plain, base62, base64]}} -]}. - -{mapping, "web.hook.ssl.cacertfile", "emqx_web_hook.cacertfile", [ - {default, ""}, - {datatype, string} -]}. - -{mapping, "web.hook.ssl.certfile", "emqx_web_hook.certfile", [ - {default, ""}, - {datatype, string} -]}. - -{mapping, "web.hook.ssl.keyfile", "emqx_web_hook.keyfile", [ - {default, ""}, - {datatype, string} -]}. - -{mapping, "web.hook.ssl.verify", "emqx_web_hook.verify", [ - {default, false}, - {datatype, {enum, [true, false]}} -]}. - -{mapping, "web.hook.ssl.server_name_indication", "emqx_web_hook.server_name_indication", [ - {datatype, string} -]}. - -{mapping, "web.hook.pool_size", "emqx_web_hook.pool_size", [ - {default, 32}, - {datatype, integer} -]}. - -{mapping, "web.hook.rule.client.connect.$name", "emqx_web_hook.rules", [ - {datatype, string} -]}. - -{mapping, "web.hook.rule.client.connack.$name", "emqx_web_hook.rules", [ - {datatype, string} -]}. - -{mapping, "web.hook.rule.client.connected.$name", "emqx_web_hook.rules", [ - {datatype, string} -]}. - -{mapping, "web.hook.rule.client.disconnected.$name", "emqx_web_hook.rules", [ - {datatype, string} -]}. - -{mapping, "web.hook.rule.client.subscribe.$name", "emqx_web_hook.rules", [ - {datatype, string} -]}. - -{mapping, "web.hook.rule.client.unsubscribe.$name", "emqx_web_hook.rules", [ - {datatype, string} -]}. - -{mapping, "web.hook.rule.session.subscribed.$name", "emqx_web_hook.rules", [ - {datatype, string} -]}. - -{mapping, "web.hook.rule.session.unsubscribed.$name", "emqx_web_hook.rules", [ - {datatype, string} -]}. - -{mapping, "web.hook.rule.session.terminated.$name", "emqx_web_hook.rules", [ - {datatype, string} -]}. - -{mapping, "web.hook.rule.message.publish.$name", "emqx_web_hook.rules", [ - {datatype, string} -]}. - -{mapping, "web.hook.rule.message.acked.$name", "emqx_web_hook.rules", [ - {datatype, string} -]}. - -{mapping, "web.hook.rule.message.delivered.$name", "emqx_web_hook.rules", [ - {datatype, string} -]}. - -{translation, "emqx_web_hook.headers", fun(Conf) -> - Headers = cuttlefish_variable:filter_by_prefix("web.hook.headers", Conf), - [{K, V} || {[_, _, _, K], V} <- Headers] -end}. - -{translation, "emqx_web_hook.rules", fun(Conf) -> - Hooks = cuttlefish_variable:filter_by_prefix("web.hook.rule", Conf), - lists:map( - fun({[_, _, _,Name1,Name2, _], Val}) -> - {lists:concat([Name1,".",Name2]), Val} - end, Hooks) -end}. diff --git a/apps/emqx_web_hook/src/emqx_web_hook.app.src b/apps/emqx_web_hook/src/emqx_web_hook.app.src deleted file mode 100644 index e1cfda173..000000000 --- a/apps/emqx_web_hook/src/emqx_web_hook.app.src +++ /dev/null @@ -1,14 +0,0 @@ -{application, emqx_web_hook, - [{description, "EMQ X WebHook Plugin"}, - {vsn, "4.3.2"}, % strict semver, bump manually! - {modules, []}, - {registered, [emqx_web_hook_sup]}, - {applications, [kernel,stdlib,ehttpc]}, - {mod, {emqx_web_hook_app,[]}}, - {env, []}, - {licenses, ["Apache-2.0"]}, - {maintainers, ["EMQ X Team "]}, - {links, [{"Homepage", "https://emqx.io/"}, - {"Github", "https://github.com/emqx/emqx-web-hook"} - ]} - ]}. diff --git a/apps/emqx_web_hook/src/emqx_web_hook.appup.src b/apps/emqx_web_hook/src/emqx_web_hook.appup.src deleted file mode 100644 index ae6e9e1ae..000000000 --- a/apps/emqx_web_hook/src/emqx_web_hook.appup.src +++ /dev/null @@ -1,18 +0,0 @@ -%% -*-: erlang -*- - -{VSN, - [ - {<<"4.3.[0-1]">>, [ - {restart_application, emqx_web_hook}, - {apply,{emqx_rule_engine,refresh_resource,[web_hook]}} - ]}, - {<<".*">>, []} - ], - [ - {<<"4.3.[0-1]">>, [ - {restart_application, emqx_web_hook}, - {apply,{emqx_rule_engine,refresh_resource,[web_hook]}} - ]}, - {<<".*">>, []} - ] -}. diff --git a/apps/emqx_web_hook/src/emqx_web_hook.erl b/apps/emqx_web_hook/src/emqx_web_hook.erl deleted file mode 100644 index 7af83d749..000000000 --- a/apps/emqx_web_hook/src/emqx_web_hook.erl +++ /dev/null @@ -1,390 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_web_hook). - --include_lib("emqx/include/emqx.hrl"). --include_lib("emqx/include/logger.hrl"). - --define(APP, emqx_web_hook). - --logger_header("[WebHook]"). - --import(inet, [ntoa/1]). - -%% APIs --export([ register_metrics/0 - , load/0 - , unload/0 - ]). - -%% Hooks callback --export([ on_client_connect/3 - , on_client_connack/4 - , on_client_connected/3 - , on_client_disconnected/4 - , on_client_subscribe/4 - , on_client_unsubscribe/4 - ]). - --export([ on_session_subscribed/4 - , on_session_unsubscribed/4 - , on_session_terminated/4 - ]). --export([ on_message_publish/2 - , on_message_delivered/3 - , on_message_acked/3 - ]). - -%%-------------------------------------------------------------------- -%% APIs -%%-------------------------------------------------------------------- - -register_metrics() -> - lists:foreach(fun emqx_metrics:ensure/1, - ['webhook.client_connect', - 'webhook.client_connack', - 'webhook.client_connected', - 'webhook.client_disconnected', - 'webhook.client_subscribe', - 'webhook.client_unsubscribe', - 'webhook.session_subscribed', - 'webhook.session_unsubscribed', - 'webhook.session_terminated', - 'webhook.message_publish', - 'webhook.message_delivered', - 'webhook.message_acked']). - -load() -> - lists:foreach( - fun({Hook, Fun, Filter}) -> - emqx:hook(Hook, {?MODULE, Fun, [{Filter}]}) - end, parse_rule(application:get_env(?APP, rules, []))). - -unload() -> - lists:foreach( - fun({Hook, Fun, _Filter}) -> - emqx:unhook(Hook, {?MODULE, Fun}) - end, parse_rule(application:get_env(?APP, rules, []))). - -%%-------------------------------------------------------------------- -%% Client connect -%%-------------------------------------------------------------------- - -on_client_connect(ConnInfo = #{clientid := ClientId, username := Username, peername := {Peerhost, _}}, _ConnProp, _Env) -> - emqx_metrics:inc('webhook.client_connect'), - Params = #{ action => client_connect - , node => node() - , clientid => ClientId - , username => maybe(Username) - , ipaddress => iolist_to_binary(ntoa(Peerhost)) - , keepalive => maps:get(keepalive, ConnInfo) - , proto_ver => maps:get(proto_ver, ConnInfo) - }, - send_http_request(ClientId, Params). - -%%-------------------------------------------------------------------- -%% Client connack -%%-------------------------------------------------------------------- - -on_client_connack(ConnInfo = #{clientid := ClientId, username := Username, peername := {Peerhost, _}}, Rc, _AckProp, _Env) -> - emqx_metrics:inc('webhook.client_connack'), - Params = #{ action => client_connack - , node => node() - , clientid => ClientId - , username => maybe(Username) - , ipaddress => iolist_to_binary(ntoa(Peerhost)) - , keepalive => maps:get(keepalive, ConnInfo) - , proto_ver => maps:get(proto_ver, ConnInfo) - , conn_ack => Rc - }, - send_http_request(ClientId, Params). - -%%-------------------------------------------------------------------- -%% Client connected -%%-------------------------------------------------------------------- - -on_client_connected(#{clientid := ClientId, username := Username, peerhost := Peerhost}, ConnInfo, _Env) -> - emqx_metrics:inc('webhook.client_connected'), - Params = #{ action => client_connected - , node => node() - , clientid => ClientId - , username => maybe(Username) - , ipaddress => iolist_to_binary(ntoa(Peerhost)) - , keepalive => maps:get(keepalive, ConnInfo) - , proto_ver => maps:get(proto_ver, ConnInfo) - , connected_at => maps:get(connected_at, ConnInfo) - }, - send_http_request(ClientId, Params). - -%%-------------------------------------------------------------------- -%% Client disconnected -%%-------------------------------------------------------------------- - -on_client_disconnected(ClientInfo, {shutdown, Reason}, ConnInfo, Env) when is_atom(Reason) -> - on_client_disconnected(ClientInfo, Reason, ConnInfo, Env); -on_client_disconnected(#{clientid := ClientId, username := Username}, Reason, ConnInfo, _Env) -> - emqx_metrics:inc('webhook.client_disconnected'), - Params = #{ action => client_disconnected - , node => node() - , clientid => ClientId - , username => maybe(Username) - , reason => stringfy(maybe(Reason)) - , disconnected_at => maps:get(disconnected_at, ConnInfo, erlang:system_time(millisecond)) - }, - send_http_request(ClientId, Params). - -%%-------------------------------------------------------------------- -%% Client subscribe -%%-------------------------------------------------------------------- - -on_client_subscribe(#{clientid := ClientId, username := Username}, _Properties, TopicTable, {Filter}) -> - lists:foreach(fun({Topic, Opts}) -> - with_filter( - fun() -> - emqx_metrics:inc('webhook.client_subscribe'), - Params = #{ action => client_subscribe - , node => node() - , clientid => ClientId - , username => maybe(Username) - , topic => Topic - , opts => Opts - }, - send_http_request(ClientId, Params) - end, Topic, Filter) - end, TopicTable). - -%%-------------------------------------------------------------------- -%% Client unsubscribe -%%-------------------------------------------------------------------- - -on_client_unsubscribe(#{clientid := ClientId, username := Username}, _Properties, TopicTable, {Filter}) -> - lists:foreach(fun({Topic, Opts}) -> - with_filter( - fun() -> - emqx_metrics:inc('webhook.client_unsubscribe'), - Params = #{ action => client_unsubscribe - , node => node() - , clientid => ClientId - , username => maybe(Username) - , topic => Topic - , opts => Opts - }, - send_http_request(ClientId, Params) - end, Topic, Filter) - end, TopicTable). - -%%-------------------------------------------------------------------- -%% Session subscribed -%%-------------------------------------------------------------------- - -on_session_subscribed(#{clientid := ClientId, username := Username}, Topic, Opts, {Filter}) -> - with_filter( - fun() -> - emqx_metrics:inc('webhook.session_subscribed'), - Params = #{ action => session_subscribed - , node => node() - , clientid => ClientId - , username => maybe(Username) - , topic => Topic - , opts => Opts - }, - send_http_request(ClientId, Params) - end, Topic, Filter). - -%%-------------------------------------------------------------------- -%% Session unsubscribed -%%-------------------------------------------------------------------- - -on_session_unsubscribed(#{clientid := ClientId, username := Username}, Topic, _Opts, {Filter}) -> - with_filter( - fun() -> - emqx_metrics:inc('webhook.session_unsubscribed'), - Params = #{ action => session_unsubscribed - , node => node() - , clientid => ClientId - , username => maybe(Username) - , topic => Topic - }, - send_http_request(ClientId, Params) - end, Topic, Filter). - -%%-------------------------------------------------------------------- -%% Session terminated -%%-------------------------------------------------------------------- - -on_session_terminated(Info, {shutdown, Reason}, SessInfo, Env) when is_atom(Reason) -> - on_session_terminated(Info, Reason, SessInfo, Env); -on_session_terminated(#{clientid := ClientId, username := Username}, Reason, _SessInfo, _Env) -> - emqx_metrics:inc('webhook.session_terminated'), - Params = #{ action => session_terminated - , node => node() - , clientid => ClientId - , username => maybe(Username) - , reason => stringfy(maybe(Reason)) - }, - send_http_request(ClientId, Params). - -%%-------------------------------------------------------------------- -%% Message publish -%%-------------------------------------------------------------------- - -on_message_publish(Message = #message{topic = <<"$SYS/", _/binary>>}, _Env) -> - {ok, Message}; -on_message_publish(Message = #message{topic = Topic}, {Filter}) -> - with_filter( - fun() -> - emqx_metrics:inc('webhook.message_publish'), - {FromClientId, FromUsername} = parse_from(Message), - Params = #{ action => message_publish - , node => node() - , from_client_id => FromClientId - , from_username => FromUsername - , topic => Message#message.topic - , qos => Message#message.qos - , retain => emqx_message:get_flag(retain, Message) - , payload => encode_payload(Message#message.payload) - , ts => Message#message.timestamp - }, - send_http_request(FromClientId, Params), - {ok, Message} - end, Message, Topic, Filter). - -%%-------------------------------------------------------------------- -%% Message deliver -%%-------------------------------------------------------------------- - -on_message_delivered(_ClientInfo,#message{topic = <<"$SYS/", _/binary>>}, _Env) -> - ok; -on_message_delivered(#{clientid := ClientId, username := Username}, - Message = #message{topic = Topic}, {Filter}) -> - with_filter( - fun() -> - emqx_metrics:inc('webhook.message_delivered'), - {FromClientId, FromUsername} = parse_from(Message), - Params = #{ action => message_delivered - , node => node() - , clientid => ClientId - , username => maybe(Username) - , from_client_id => FromClientId - , from_username => FromUsername - , topic => Message#message.topic - , qos => Message#message.qos - , retain => emqx_message:get_flag(retain, Message) - , payload => encode_payload(Message#message.payload) - , ts => Message#message.timestamp - }, - send_http_request(ClientId, Params) - end, Topic, Filter). - -%%-------------------------------------------------------------------- -%% Message acked -%%-------------------------------------------------------------------- - -on_message_acked(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}, _Env) -> - ok; -on_message_acked(#{clientid := ClientId, username := Username}, - Message = #message{topic = Topic}, {Filter}) -> - with_filter( - fun() -> - emqx_metrics:inc('webhook.message_acked'), - {FromClientId, FromUsername} = parse_from(Message), - Params = #{ action => message_acked - , node => node() - , clientid => ClientId - , username => maybe(Username) - , from_client_id => FromClientId - , from_username => FromUsername - , topic => Message#message.topic - , qos => Message#message.qos - , retain => emqx_message:get_flag(retain, Message) - , payload => encode_payload(Message#message.payload) - , ts => Message#message.timestamp - }, - send_http_request(ClientId, Params) - end, Topic, Filter). - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -send_http_request(ClientID, Params) -> - {ok, Path} = application:get_env(?APP, path), - Headers = application:get_env(?APP, headers, []), - Body = emqx_json:encode(Params), - ?LOG(debug, "Send to: ~0p, params: ~s", [Path, Body]), - case ehttpc:request(ehttpc_pool:pick_worker(?APP, ClientID), post, {Path, Headers, Body}) of - {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 -> - ok; - {ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 -> - ok; - {ok, StatusCode, _} -> - ?LOG(warning, "HTTP request failed with status code: ~p", [StatusCode]), - ok; - {ok, StatusCode, _, _} -> - ?LOG(warning, "HTTP request failed with status code: ~p", [StatusCode]), - ok; - {error, Reason} -> - ?LOG(error, "HTTP request error: ~p", [Reason]), - ok - end. - -parse_rule(Rules) -> - parse_rule(Rules, []). -parse_rule([], Acc) -> - lists:reverse(Acc); -parse_rule([{Rule, Conf} | Rules], Acc) -> - Params = emqx_json:decode(iolist_to_binary(Conf)), - Action = proplists:get_value(<<"action">>, Params), - Filter = proplists:get_value(<<"topic">>, Params), - parse_rule(Rules, [{list_to_atom(Rule), binary_to_existing_atom(Action, utf8), Filter} | Acc]). - -with_filter(Fun, _, undefined) -> - Fun(), ok; -with_filter(Fun, Topic, Filter) -> - case emqx_topic:match(Topic, Filter) of - true -> Fun(), ok; - false -> ok - end. - -with_filter(Fun, _, _, undefined) -> - Fun(); -with_filter(Fun, Msg, Topic, Filter) -> - case emqx_topic:match(Topic, Filter) of - true -> Fun(); - false -> {ok, Msg} - end. - -parse_from(Message) -> - {emqx_message:from(Message), maybe(emqx_message:get_header(username, Message))}. - -encode_payload(Payload) -> - encode_payload(Payload, application:get_env(?APP, encoding_of_payload_field, plain)). - -encode_payload(Payload, base62) -> emqx_base62:encode(Payload); -encode_payload(Payload, base64) -> base64:encode(Payload); -encode_payload(Payload, plain) -> Payload. - -stringfy(Term) when is_binary(Term) -> - Term; -stringfy(Term) when is_atom(Term) -> - atom_to_binary(Term, utf8); -stringfy(Term) -> - unicode:characters_to_binary((io_lib:format("~0p", [Term]))). - -maybe(undefined) -> null; -maybe(Str) -> Str. - diff --git a/apps/emqx_web_hook/src/emqx_web_hook_app.erl b/apps/emqx_web_hook/src/emqx_web_hook_app.erl deleted file mode 100644 index 580742c47..000000000 --- a/apps/emqx_web_hook/src/emqx_web_hook_app.erl +++ /dev/null @@ -1,102 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_web_hook_app). - --behaviour(application). - --emqx_plugin(?MODULE). - --include("emqx_web_hook.hrl"). - --export([ start/2 - , stop/1 - ]). - -start(_StartType, _StartArgs) -> - translate_env(), - {ok, Sup} = emqx_web_hook_sup:start_link(), - {ok, PoolOpts} = application:get_env(?APP, pool_opts), - {ok, _Pid} = ehttpc_sup:start_pool(?APP, PoolOpts), - emqx_web_hook:register_metrics(), - emqx_web_hook:load(), - {ok, Sup}. - -stop(_State) -> - emqx_web_hook:unload(), - ehttpc_sup:stop_pool(?APP). - -translate_env() -> - {ok, URL} = application:get_env(?APP, url), - {ok, #{host := Host, - port := Port, - scheme := Scheme} = URIMap} = emqx_http_lib:uri_parse(URL), - Path = path(URIMap), - PoolSize = application:get_env(?APP, pool_size, 32), - MoreOpts = case Scheme of - http -> - [{transport_opts, emqx_misc:ipv6_probe([])}]; - https -> - CACertFile = application:get_env(?APP, cacertfile, undefined), - CertFile = application:get_env(?APP, certfile, undefined), - KeyFile = application:get_env(?APP, keyfile, undefined), - {ok, Verify} = application:get_env(?APP, verify), - VerifyType = case Verify of - true -> verify_peer; - false -> verify_none - end, - SNI = case application:get_env(?APP, server_name_indication, undefined) of - "disable" -> disable; - SNI0 -> SNI0 - end, - TLSOpts = lists:filter(fun({_K, V}) -> - V /= <<>> andalso V /= undefined andalso V /= "" andalso true - end, [{keyfile, KeyFile}, - {certfile, CertFile}, - {cacertfile, CACertFile}, - {verify, VerifyType}, - {server_name_indication, SNI}]), - NTLSOpts = [ {versions, emqx_tls_lib:default_versions()} - , {ciphers, emqx_tls_lib:default_ciphers()} - | TLSOpts - ], - [{transport, ssl}, {transport_opts, emqx_misc:ipv6_probe(NTLSOpts)}] - end, - PoolOpts = [{host, Host}, - {port, Port}, - {pool_size, PoolSize}, - {pool_type, hash}, - {connect_timeout, 5000}, - {retry, 5}, - {retry_timeout, 1000}] ++ MoreOpts, - application:set_env(?APP, path, Path), - application:set_env(?APP, pool_opts, PoolOpts), - Headers = application:get_env(?APP, headers, []), - NHeaders = set_content_type(Headers), - application:set_env(?APP, headers, NHeaders). - -path(#{path := "", 'query' := Query}) -> - "?" ++ Query; -path(#{path := Path, 'query' := Query}) -> - Path ++ "?" ++ Query; -path(#{path := ""}) -> - "/"; -path(#{path := Path}) -> - Path. - -set_content_type(Headers) -> - NHeaders = proplists:delete(<<"Content-Type">>, proplists:delete(<<"content-type">>, Headers)), - [{<<"content-type">>, <<"application/json">>} | NHeaders]. diff --git a/apps/emqx_web_hook/src/emqx_web_hook_sup.erl b/apps/emqx_web_hook/src/emqx_web_hook_sup.erl deleted file mode 100644 index ec46efaa0..000000000 --- a/apps/emqx_web_hook/src/emqx_web_hook_sup.erl +++ /dev/null @@ -1,29 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_web_hook_sup). - --behaviour(supervisor). - --export([start_link/0]). - --export([init/1]). - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -init([]) -> - {ok, {{one_for_all, 0, 1}, []}}. diff --git a/apps/emqx_web_hook/test/emqx_web_hook_SUITE.erl b/apps/emqx_web_hook/test/emqx_web_hook_SUITE.erl deleted file mode 100644 index 864e1b150..000000000 --- a/apps/emqx_web_hook/test/emqx_web_hook_SUITE.erl +++ /dev/null @@ -1,284 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_web_hook_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("emqx/include/emqx.hrl"). --include_lib("eunit/include/eunit.hrl"). --include_lib("common_test/include/ct.hrl"). - --define(HOOK_LOOKUP(H), emqx_hooks:lookup(list_to_atom(H))). --define(ACTION(Name), #{<<"action">> := Name}). - -%%-------------------------------------------------------------------- -%% Setups -%%-------------------------------------------------------------------- - -all() -> - [ {group, http} - , {group, https} - , {group, ipv6http} - , {group, ipv6https} - , {group, all} - ]. - -groups() -> - Cases = [test_full_flow], - [ {http, [sequence], Cases} - , {https, [sequence], Cases} - , {ipv6http, [sequence], Cases} - , {ipv6https, [sequence], Cases} - , {all, [sequence], emqx_ct:all(?MODULE)} - ]. - -start_apps(F) -> emqx_ct_helpers:start_apps(apps(), F). - -init_per_group(Name, Config) -> - application:ensure_all_started(emqx_management), - set_special_cfgs(), - BasePort = - case Name of - all -> 8801; - http -> 8811; - https -> 8821; - ipv6http -> 8831; - ipv6https -> 8841 - end, - CF = case Name of - all -> fun set_special_configs_http/1; - http -> fun set_special_configs_http/1; - https -> fun set_special_configs_https/1; - ipv6http -> fun set_special_configs_ipv6_http/1; - ipv6https -> fun set_special_configs_ipv6_https/1 - end, - start_apps(fun(_) -> CF(BasePort) end), - Opts = case atom_to_list(Name) of - "ipv6" ++ _ -> [{ip, {0,0,0,0,0,0,0,1}}, inet6]; - _ -> [inet] - end, - [{base_port, BasePort}, {transport_opts, Opts} | Config]. - -end_per_group(_Name, Config) -> - emqx_ct_helpers:stop_apps(apps()), - Config. - -set_special_configs_http(Port) -> - application:set_env(emqx_web_hook, url, "http://127.0.0.1:" ++ integer_to_list(Port)). - -set_special_configs_https(Port) -> - set_ssl_configs(), - application:set_env(emqx_web_hook, url, "https://127.0.0.1:" ++ integer_to_list(Port+1)). - -set_special_configs_ipv6_http(Port) -> - application:set_env(emqx_web_hook, url, "http://[::1]:" ++ integer_to_list(Port)). - -set_special_configs_ipv6_https(Port) -> - set_ssl_configs(), - application:set_env(emqx_web_hook, url, "https://[::1]:" ++ integer_to_list(Port+1)). - -set_ssl_configs() -> - Path = emqx_ct_helpers:deps_path(emqx_web_hook, "test/emqx_web_hook_SUITE_data/"), - SslOpts = [{keyfile, Path ++ "/client-key.pem"}, - {certfile, Path ++ "/client-cert.pem"}, - {cacertfile, Path ++ "/ca.pem"}], - application:set_env(emqx_web_hook, ssl, true), - application:set_env(emqx_web_hook, ssloptions, SslOpts). - -set_special_cfgs() -> - AllRules = [{"message.acked", "{\"action\": \"on_message_acked\"}"}, - {"message.delivered", "{\"action\": \"on_message_delivered\"}"}, - {"message.publish", "{\"action\": \"on_message_publish\"}"}, - {"session.terminated", "{\"action\": \"on_session_terminated\"}"}, - {"session.unsubscribed", "{\"action\": \"on_session_unsubscribed\"}"}, - {"session.subscribed", "{\"action\": \"on_session_subscribed\"}"}, - {"client.unsubscribe", "{\"action\": \"on_client_unsubscribe\"}"}, - {"client.subscribe", "{\"action\": \"on_client_subscribe\"}"}, - {"client.disconnected", "{\"action\": \"on_client_disconnected\"}"}, - {"client.connected", "{\"action\": \"on_client_connected\"}"}, - {"client.connack", "{\"action\": \"on_client_connack\"}"}, - {"client.connect", "{\"action\": \"on_client_connect\"}"}], - application:set_env(emqx_web_hook, rules, AllRules). - -%%-------------------------------------------------------------------- -%% Test cases -%%-------------------------------------------------------------------- - -test_full_flow(Config) -> - [_|_] = Opts = proplists:get_value(transport_opts, Config), - BasePort = proplists:get_value(base_port, Config), - Tester = self(), - {ok, ServerPid} = http_server:start_link(Tester, BasePort, Opts), - receive {ServerPid, ready} -> ok - after 1000 -> error(timeout) - end, - application:set_env(emqx_web_hook, headers, [{"k1","K1"}, {"k2", "K2"}]), - ClientId = iolist_to_binary(["client-", integer_to_list(erlang:system_time())]), - {ok, C} = emqtt:start_link([ {clientid, ClientId} - , {proto_ver, v5} - , {keepalive, 60} - ]), - try - do_test_full_flow(C, ClientId) - after - Ref = erlang:monitor(process, ServerPid), - http_server:stop(ServerPid), - receive {'DOWN', Ref, _, _, _} -> ok - after 5000 -> error(timeout) - end - end. - -do_test_full_flow(C, ClientId) -> - {ok, _} = emqtt:connect(C), - {ok, _, _} = emqtt:subscribe(C, <<"TopicA">>, qos2), - {ok, _} = emqtt:publish(C, <<"TopicA">>, <<"Payload...">>, qos2), - {ok, _, _} = emqtt:unsubscribe(C, <<"TopicA">>), - emqtt:disconnect(C), - validate_params_and_headers(undefined, ClientId). - -validate_params_and_headers(ClientState, ClientId) -> - receive - {http_server, {Params0, _Bool}, Headers} -> - Params = emqx_json:decode(Params0, [return_maps]), - try - validate_hook_resp(ClientId, Params), - validate_hook_headers(Headers), - case maps:get(<<"action">>, Params) of - <<"session_terminated">> -> - ok; - <<"client_connect">> -> - validate_params_and_headers(connected, ClientId); - _ -> - validate_params_and_headers(ClientState, ClientId) %% continue looping - end - catch - throw : {unknown_client, Other} -> - ct:pal("ignored_event_from_other_client ~p~nexpecting:~p~n~p~n~p", - [Other, ClientId, Params, Headers]), - validate_params_and_headers(ClientState, ClientId) %% continue looping - end - after - 5000 -> - case ClientState =:= undefined of - true -> error("client_was_never_connected"); - false -> error("terminate_action_is_not_received_in_time") - end - end. - -t_check_hooked(_) -> - {ok, Rules} = application:get_env(emqx_web_hook, rules), - lists:foreach(fun({HookName, _Action}) -> - Hooks = ?HOOK_LOOKUP(HookName), - ?assertEqual(true, length(Hooks) > 0) - end, Rules). - -t_change_config(_) -> - {ok, Rules} = application:get_env(emqx_web_hook, rules), - emqx_web_hook:unload(), - HookRules = lists:keydelete("message.delivered", 1, Rules), - application:set_env(emqx_web_hook, rules, HookRules), - emqx_web_hook:load(), - ?assertEqual([], ?HOOK_LOOKUP("message.delivered")), - emqx_web_hook:unload(), - application:set_env(emqx_web_hook, rules, Rules), - emqx_web_hook:load(). - -%%-------------------------------------------------------------------- -%% Utils -%%-------------------------------------------------------------------- - -validate_hook_headers(Headers) -> - ?assertEqual(<<"K1">>, maps:get(<<"k1">>, Headers)), - ?assertEqual(<<"K2">>, maps:get(<<"k2">>, Headers)). - -validate_hook_resp(ClientId, Body = ?ACTION(<<"client_connect">>)) -> - assert_username_clientid(ClientId, Body), - ?assertEqual(5, maps:get(<<"proto_ver">>, Body)), - ?assertEqual(60, maps:get(<<"keepalive">>, Body)), - ?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)), - ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)), - ok; -validate_hook_resp(ClientId, Body = ?ACTION(<<"client_connack">>)) -> - assert_username_clientid(ClientId, Body), - ?assertEqual(5, maps:get(<<"proto_ver">>, Body)), - ?assertEqual(60, maps:get(<<"keepalive">>, Body)), - ?assertEqual(<<"success">>, maps:get(<<"conn_ack">>, Body)), - ?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)), - ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)), - ok; -validate_hook_resp(ClientId, Body = ?ACTION(<<"client_connected">>)) -> - assert_username_clientid(ClientId, Body), - _ = maps:get(<<"connected_at">>, Body), - ?assertEqual(5, maps:get(<<"proto_ver">>, Body)), - ?assertEqual(60, maps:get(<<"keepalive">>, Body)), - ?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)), - ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)); -validate_hook_resp(ClientId, Body = ?ACTION(<<"client_disconnected">>)) -> - assert_username_clientid(ClientId, Body), - ?assertEqual(<<"normal">>, maps:get(<<"reason">>, Body)), - ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)); -validate_hook_resp(ClientId, Body = ?ACTION(<<"client_subscribe">>)) -> - assert_username_clientid(ClientId, Body), - _ = maps:get(<<"opts">>, Body), - ?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)), - ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)); -validate_hook_resp(ClientId, Body = ?ACTION(<<"client_unsubscribe">>)) -> - assert_username_clientid(ClientId, Body), - _ = maps:get(<<"opts">>, Body), - ?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)), - ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)); -validate_hook_resp(ClientId, Body = ?ACTION(<<"session_subscribed">>)) -> - assert_username_clientid(ClientId, Body), - _ = maps:get(<<"opts">>, Body), - ?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)), - ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)); -validate_hook_resp(ClientId, Body = ?ACTION(<<"session_unsubscribed">>)) -> - assert_username_clientid(ClientId, Body), - ?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)), - ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)); -validate_hook_resp(ClientId, Body = ?ACTION(<<"session_terminated">>)) -> - assert_username_clientid(ClientId, Body), - ?assertEqual(<<"normal">>, maps:get(<<"reason">>, Body)), - ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)); -validate_hook_resp(_ClientId, Body = ?ACTION(<<"message_publish">>)) -> - ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)), - assert_messages_attrs(Body); -validate_hook_resp(_ClientId, Body = ?ACTION(<<"message_delivered">>)) -> - ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)), - assert_messages_attrs(Body); -validate_hook_resp(_ClientId, Body = ?ACTION(<<"message_acked">>)) -> - ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)), - assert_messages_attrs(Body). - -assert_username_clientid(ClientId, #{<<"clientid">> := ClientId, <<"username">> := Username}) -> - ?assertEqual(null, Username); -assert_username_clientid(_ClientId, #{<<"clientid">> := Other}) -> - throw({unknown_client, Other}). - -assert_messages_attrs(#{ <<"ts">> := _ - , <<"qos">> := _ - , <<"topic">> := _ - , <<"retain">> := _ - , <<"payload">> := _ - , <<"from_username">> := _ - , <<"from_client_id">> := _ - }) -> - ok. - -apps() -> - [emqx_web_hook, emqx_modules, emqx_management, emqx_rule_engine]. diff --git a/apps/emqx_web_hook/test/emqx_web_hook_SUITE_data/ca.pem b/apps/emqx_web_hook/test/emqx_web_hook_SUITE_data/ca.pem deleted file mode 100644 index 00b31d8a4..000000000 --- a/apps/emqx_web_hook/test/emqx_web_hook_SUITE_data/ca.pem +++ /dev/null @@ -1,19 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDAzCCAeugAwIBAgIBATANBgkqhkiG9w0BAQsFADA8MTowOAYDVQQDDDFNeVNR -TF9TZXJ2ZXJfOC4wLjE5X0F1dG9fR2VuZXJhdGVkX0NBX0NlcnRpZmljYXRlMB4X -DTIwMDYxMTAzMzg0NloXDTMwMDYwOTAzMzg0NlowPDE6MDgGA1UEAwwxTXlTUUxf -U2VydmVyXzguMC4xOV9BdXRvX0dlbmVyYXRlZF9DQV9DZXJ0aWZpY2F0ZTCCASIw -DQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANJBlAYvTQ6euY4HcSn4syH7kq9s -KcG+OMjPUrj+KFEElCzgNuIhaS0f3ORQGB1PNcvVcfdXUI3WX332gWbr9s1b7Xl1 -JKJfDXs+26Cm6NhONTE3sPHnbTSmQEFb52hwAtjQmcY3IQs1AgxKFFHJfnCBEWfE -ePBQaiuYk1XDESMdWpMLrPnYQaj9MpAOUxjlmZCayzPWlF0j0IWvfsF5TqZL7tFK -9p5F/DzyZ4n1mqPVEoUmq5ZdSKj2TQkpWTMHBWHEDQQqXbyE1FGJR7zEUFeuG1KT -sVBg7iZEC93SygZTbgUZSQXIwQCsO6xZ8MB2XDJkPbWp/3Wc6c8I6P09F48CAwEA -AaMQMA4wDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEADKz6bIpP5anp -GgLB0jkclRWuMlS4qqIt4itSsMXPJ/ezpHwECixmgW2TIQl6S1woRkUeMxhT2/Ay -Sn/7aKxuzRagyE5NEGOvrOuAP5RO2ZdNJ/X3/Rh533fK1sOTEEbSsWUvW6iSkZef -rsfZBVP32xBhRWkKRdLeLB4W99ADMa0IrTmZPCXHSSE2V4e1o6zWLXcOZeH1Qh8N -SkelBweR+8r1Fbvy1r3s7eH7DCbYoGEDVLQGOLvzHKBisQHmoDnnF5E9g1eeNRdg -o+vhOKfYCOzeNREJIqS42PHcGhdNRk90ycigPmfUJclz1mDHoMjKR2S5oosTpr65 -tNPx3CL7GA== ------END CERTIFICATE----- diff --git a/apps/emqx_web_hook/test/emqx_web_hook_SUITE_data/client-cert.pem b/apps/emqx_web_hook/test/emqx_web_hook_SUITE_data/client-cert.pem deleted file mode 100644 index aad1404ca..000000000 --- a/apps/emqx_web_hook/test/emqx_web_hook_SUITE_data/client-cert.pem +++ /dev/null @@ -1,19 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDBDCCAeygAwIBAgIBAzANBgkqhkiG9w0BAQsFADA8MTowOAYDVQQDDDFNeVNR -TF9TZXJ2ZXJfOC4wLjE5X0F1dG9fR2VuZXJhdGVkX0NBX0NlcnRpZmljYXRlMB4X -DTIwMDYxMTAzMzg0N1oXDTMwMDYwOTAzMzg0N1owQDE+MDwGA1UEAww1TXlTUUxf -U2VydmVyXzguMC4xOV9BdXRvX0dlbmVyYXRlZF9DbGllbnRfQ2VydGlmaWNhdGUw -ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDVYSWpOvCTupz82fc85Opv -EQ7rkB8X2oOMyBCpkyHKBIr1ZQgRDWBp9UVOASq3GnSElm6+T3Kb1QbOffa8GIlw -sjAueKdq5L2eSkmPIEQ7eoO5kEW+4V866hE1LeL/PmHg2lGP0iqZiJYtElhHNQO8 -3y9I7cm3xWMAA3SSWikVtpJRn3qIp2QSrH+tK+/HHbE5QwtPxdir4ULSCSOaM5Yh -Wi5Oto88TZqe1v7SXC864JVvO4LuS7TuSreCdWZyPXTJFBFeCEWSAxonKZrqHbBe -CwKML6/0NuzjaQ51c2tzmVI6xpHj3nnu4cSRx6Jf9WBm+35vm0wk4pohX3ptdzeV -AgMBAAGjDTALMAkGA1UdEwQCMAAwDQYJKoZIhvcNAQELBQADggEBAByQ5zSNeFUH -Aw7JlpZHtHaSEeiiyBHke20ziQ07BK1yi/ms2HAWwQkpZv149sjNuIRH8pkTmkZn -g8PDzSefjLbC9AsWpWV0XNV22T/cdobqLqMBDDZ2+5bsV+jTrOigWd9/AHVZ93PP -IJN8HJn6rtvo2l1bh/CdsX14uVSdofXnuWGabNTydqtMvmCerZsdf6qKqLL+PYwm -RDpgWiRUY7KPBSSlKm/9lJzA+bOe4dHeJzxWFVCJcbpoiTFs1je1V8kKQaHtuW39 -ifX6LTKUMlwEECCbDKM8Yq2tm8NjkjCcnFDtKg8zKGPUu+jrFMN5otiC3wnKcP7r -O9EkaPcgYH8= ------END CERTIFICATE----- diff --git a/apps/emqx_web_hook/test/emqx_web_hook_SUITE_data/client-key.pem b/apps/emqx_web_hook/test/emqx_web_hook_SUITE_data/client-key.pem deleted file mode 100644 index 6789d0291..000000000 --- a/apps/emqx_web_hook/test/emqx_web_hook_SUITE_data/client-key.pem +++ /dev/null @@ -1,27 +0,0 @@ ------BEGIN RSA PRIVATE KEY----- -MIIEowIBAAKCAQEA1WElqTrwk7qc/Nn3POTqbxEO65AfF9qDjMgQqZMhygSK9WUI -EQ1gafVFTgEqtxp0hJZuvk9ym9UGzn32vBiJcLIwLninauS9nkpJjyBEO3qDuZBF -vuFfOuoRNS3i/z5h4NpRj9IqmYiWLRJYRzUDvN8vSO3Jt8VjAAN0klopFbaSUZ96 -iKdkEqx/rSvvxx2xOUMLT8XYq+FC0gkjmjOWIVouTraPPE2antb+0lwvOuCVbzuC -7ku07kq3gnVmcj10yRQRXghFkgMaJyma6h2wXgsCjC+v9Dbs42kOdXNrc5lSOsaR -49557uHEkceiX/VgZvt+b5tMJOKaIV96bXc3lQIDAQABAoIBAF7yjXmSOn7h6P0y -WCuGiTLG2mbDiLJqj2LTm2Z5i+2Cu/qZ7E76Ls63TxF4v3MemH5vGfQhEhR5ZD/6 -GRJ1sKKvB3WGRqjwA9gtojHH39S/nWGy6vYW/vMOOH37XyjIr3EIdIaUtFQBTSHd -Kd71niYrAbVn6fyWHolhADwnVmTMOl5OOAhCdEF4GN3b5aIhIu8BJ7EUzTtHBJIj -CAEfjZFjDs1y1cIgGFJkuIQxMfCpq5recU2qwip7YO6fk//WEjOPu7kSf5IEswL8 -jg1dea9rGBV6KaD2xsgsC6Ll6Sb4BbsrHMfflG3K2Lk3RdVqqTFp1Fn1PTLQE/1S -S/SZPYECgYEA9qYcHKHd0+Q5Ty5wgpxKGa4UCWkpwvfvyv4bh8qlmxueB+l2AIdo -ZvkM8gTPagPQ3WypAyC2b9iQu70uOJo1NizTtKnpjDdN1YpDjISJuS/P0x73gZwy -gmoM5AzMtN4D6IbxXtXnPaYICvwLKU80ouEN5ZPM4/ODLUu6gsp0v2UCgYEA3Xgi -zMC4JF0vEKEaK0H6QstaoXUmw/lToZGH3TEojBIkb/2LrHUclygtONh9kJSFb89/ -jbmRRLAOrx3HZKCNGUmF4H9k5OQyAIv6OGBinvLGqcbqnyNlI+Le8zxySYwKMlEj -EMrBCLmSyi0CGFrbZ3mlj/oCET/ql9rNvcK+DHECgYAEx5dH3sMjtgp+RFId1dWB -xePRgt4yTwewkVgLO5wV82UOljGZNQaK6Eyd7AXw8f38LHzh+KJQbIvxd2sL4cEi -OaAoohpKg0/Y0YMZl//rPMf0OWdmdZZs/I0fZjgZUSwWN3c59T8z7KG/RL8an9RP -S7kvN7wCttdV61/D5RR6GQKBgDxCe/WKWpBKaovzydMLWLTj7/0Oi0W3iXHkzzr4 -LTgvl4qBSofaNbVLUUKuZTv5rXUG2IYPf99YqCYtzBstNDc1MiAriaBeFtzfOW4t -i6gEFtoLLbuvPc3N5Sv5vn8Ug5G9UfU3td5R4AbyyCcoUZqOFuZd+EIJSiOXfXOs -kVmBAoGBAIU9aPAqhU5LX902oq8KsrpdySONqv5mtoStvl3wo95WIqXNEsFY60wO -q02jKQmJJ2MqhkJm2EoF2Mq8+40EZ5sz8LdgeQ/M0yQ9lAhPi4rftwhpe55Ma9dk -SE9X1c/DMCBEaIjJqVXdy0/EeArwpb8sHkguVVAZUWxzD+phm1gs ------END RSA PRIVATE KEY----- diff --git a/apps/emqx_web_hook/test/emqx_web_hook_SUITE_data/server-cert.pem b/apps/emqx_web_hook/test/emqx_web_hook_SUITE_data/server-cert.pem deleted file mode 100644 index a2f9688df..000000000 --- a/apps/emqx_web_hook/test/emqx_web_hook_SUITE_data/server-cert.pem +++ /dev/null @@ -1,19 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDBDCCAeygAwIBAgIBAjANBgkqhkiG9w0BAQsFADA8MTowOAYDVQQDDDFNeVNR -TF9TZXJ2ZXJfOC4wLjE5X0F1dG9fR2VuZXJhdGVkX0NBX0NlcnRpZmljYXRlMB4X -DTIwMDYxMTAzMzg0NloXDTMwMDYwOTAzMzg0NlowQDE+MDwGA1UEAww1TXlTUUxf -U2VydmVyXzguMC4xOV9BdXRvX0dlbmVyYXRlZF9TZXJ2ZXJfQ2VydGlmaWNhdGUw -ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCcEnEm5hqP1EbEJycOz8Ua -NWp29QdpFUzTWhkKGhVXk+0msmNTw4NBAFB42moY44OU8wvDideOlJNhPRWveD8z -G2lxzJA91p0UK4et8ia9MmeuCGhdC9jxJ8X69WNlUiPyy0hI/ZsqRq9Z0C2eW0iL -JPXsy4X8Xpw3SFwoXf5pR9RFY5Pb2tuyxqmSestu2VXT/NQjJg4CVDR3mFcHPXZB -4elRzH0WshExEGkgy0bg20MJeRc2Qdb5Xx+EakbmwroDWaCn3NSGqQ7jv6Vw0doy -TGvS6h6RHBxnyqRfRgKGlCoOMG9/5+rFJC00QpCUG2vHXHWGoWlMlJ3foN7rj5v9 -AgMBAAGjDTALMAkGA1UdEwQCMAAwDQYJKoZIhvcNAQELBQADggEBAJ5zt2rj4Ag6 -zpN59AWC1Fur8g8l41ksHkSpKPp+PtyO/ngvbMqBpfmK1e7JCKZv/68QXfMyWWAI -hwalqZkXXWHKjuz3wE7dE25PXFXtGJtcZAaj10xt98fzdqt8lQSwh2kbfNwZIz1F -sgAStgE7+ZTcqTgvNB76Os1UK0to+/P0VBWktaVFdyub4Nc2SdPVnZNvrRBXBwOD -3V8ViwywDOFoE7DvCvwx/SVsvoC0Z4j3AMMovO6oHicP7uU83qsQgm1Qru3YeoLR -+DoVi7IPHbWvN7MqFYn3YjNlByO2geblY7MR0BlqbFlmFrqLsUfjsh2ys7/U/knC -dN/klu446fI= ------END CERTIFICATE----- diff --git a/apps/emqx_web_hook/test/emqx_web_hook_SUITE_data/server-key.pem b/apps/emqx_web_hook/test/emqx_web_hook_SUITE_data/server-key.pem deleted file mode 100644 index a1dfd5f78..000000000 --- a/apps/emqx_web_hook/test/emqx_web_hook_SUITE_data/server-key.pem +++ /dev/null @@ -1,27 +0,0 @@ ------BEGIN RSA PRIVATE KEY----- -MIIEowIBAAKCAQEAnBJxJuYaj9RGxCcnDs/FGjVqdvUHaRVM01oZChoVV5PtJrJj -U8ODQQBQeNpqGOODlPMLw4nXjpSTYT0Vr3g/MxtpccyQPdadFCuHrfImvTJnrgho -XQvY8SfF+vVjZVIj8stISP2bKkavWdAtnltIiyT17MuF/F6cN0hcKF3+aUfURWOT -29rbssapknrLbtlV0/zUIyYOAlQ0d5hXBz12QeHpUcx9FrIRMRBpIMtG4NtDCXkX -NkHW+V8fhGpG5sK6A1mgp9zUhqkO47+lcNHaMkxr0uoekRwcZ8qkX0YChpQqDjBv -f+fqxSQtNEKQlBtrx1x1hqFpTJSd36De64+b/QIDAQABAoIBAFiah66Dt9SruLkn -WR8piUaFyLlcBib8Nq9OWSTJBhDAJERxxb4KIvvGB+l0ZgNXNp5bFPSfzsZdRwZP -PX5uj8Kd71Dxx3mz211WESMJdEC42u+MSmN4lGLkJ5t/sDwXU91E1vbJM0ve8THV -4/Ag9qA4DX2vVZOeyqT/6YHpSsPNZplqzrbAiwrfHwkctHfgqwOf3QLfhmVQgfCS -VwidBldEUv2whSIiIxh4Rv5St4kA68IBCbJxdpOpyuQBkk6CkxZ7VN9FqOuSd4Pk -Wm7iWyBMZsCmELZh5XAXld4BEt87C5R4CvbPBDZxAv3THk1DNNvpy3PFQfwARRFb -SAToYMECgYEAyL7U8yxpzHDYWd3oCx6vTi9p9N/z0FfAkWrRF6dm4UcSklNiT1Aq -EOnTA+SaW8tV3E64gCWcY23gNP8so/ZseWj6L+peHwtchaP9+KB7yGw2A+05+lOx -VetLTjAOmfpiUXFe5w1q4C1RGhLjZjjzW+GvwdAuchQgUEFaomrV+PUCgYEAxwfH -cmVGFbAktcjU4HSRjKSfawCrut+3YUOLybyku3Q/hP9amG8qkVTFe95CTLjLe2D0 -ccaTTpofFEJ32COeck0g0Ujn/qQ+KXRoauOYs4FB1DtqMpqB78wufWEUpDpbd9/h -J+gJdC/IADd4tJW9zA92g8IA7ZtFmqDtiSpQ0ekCgYAQGkaorvJZpN+l7cf0RGTZ -h7IfI2vCVZer0n6tQA9fmLzjoe6r4AlPzAHSOR8sp9XeUy43kUzHKQQoHCPvjw/K -eWJAP7OHF/k2+x2fOPhU7mEy1W+mJdp+wt4Kio5RSaVjVQ3AyPG+w8PSrJszEvRq -dWMMz+851WV2KpfjmWBKlQKBgQC++4j4DZQV5aMkSKV1CIZOBf3vaIJhXKEUFQPD -PmB4fBEjpwCg+zNGp6iktt65zi17o8qMjrb1mtCt2SY04eD932LZUHNFlwcLMmes -Ad+aiDLJ24WJL1f16eDGcOyktlblDZB5gZ/ovJzXEGOkLXglosTfo77OQculmDy2 -/UL2WQKBgGeKasmGNfiYAcWio+KXgFkHXWtAXB9B91B1OFnCa40wx+qnl71MIWQH -PQ/CZFNWOfGiNEJIZjrHsfNJoeXkhq48oKcT0AVCDYyLV0VxDO4ejT95mGW6njNd -JpvmhwwAjOvuWVr0tn4iXlSK8irjlJHmwcRjLTJq97vE9fsA2MjI ------END RSA PRIVATE KEY----- diff --git a/apps/emqx_web_hook/test/http_server.erl b/apps/emqx_web_hook/test/http_server.erl deleted file mode 100644 index 791f725d1..000000000 --- a/apps/emqx_web_hook/test/http_server.erl +++ /dev/null @@ -1,102 +0,0 @@ -%%-------------------------------------------------------------------- -%% A Simple HTTP Server based cowboy -%% -%% It will deliver the http-request params to initialer process -%%-------------------------------------------------------------------- -%% -%% Author:wwhai -%% --module(http_server). --behaviour(gen_server). - --export([start_link/3]). --export([stop/1]). --export([code_change/3, handle_call/3, handle_cast/2, handle_info/2, init/1, init/2, terminate/2]). --record(state, {parent :: pid()}). -%%-------------------------------------------------------------------- -%% APIs -%%-------------------------------------------------------------------- - -start_link(Parent, BasePort, Opts) -> - stop_http(), - stop_https(), - timer:sleep(100), - gen_server:start_link(?MODULE, {Parent, BasePort, Opts}, []). - -init({Parent, BasePort, Opts}) -> - ok = start_http(Parent, [{port, BasePort} | Opts]), - ok = start_https(Parent, [{port, BasePort + 1} | Opts]), - Parent ! {self(), ready}, - {ok, #state{parent = Parent}}. - -handle_call(_Request, _From, State) -> - {reply, ignored, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - stop_http(), - stop_https(). - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -stop(Pid) -> - ok = gen_server:stop(Pid). - -%%-------------------------------------------------------------------- -%% Callbacks -%%-------------------------------------------------------------------- - -start_http(Parent, Opts) -> - {ok, _Pid1} = cowboy:start_clear(http, Opts, #{ - env => #{dispatch => compile_router(Parent)} - }), - Port = proplists:get_value(port, Opts), - io:format(standard_error, "[TEST LOG] Start http server on ~p successfully!~n", [Port]). - -start_https(Parent, Opts) -> - Path = emqx_ct_helpers:deps_path(emqx_web_hook, "test/emqx_web_hook_SUITE_data/"), - SslOpts = [{keyfile, Path ++ "/server-key.pem"}, - {cacertfile, Path ++ "/ca.pem"}, - {certfile, Path ++ "/server-cert.pem"}], - - {ok, _Pid2} = cowboy:start_tls(https, Opts ++ SslOpts, - #{env => #{dispatch => compile_router(Parent)}}), - Port = proplists:get_value(port, Opts), - io:format(standard_error, "[TEST LOG] Start https server on ~p successfully!~n", [Port]). - -stop_http() -> - cowboy:stop_listener(http), - io:format("[TEST LOG] Stopped http server"). - -stop_https() -> - cowboy:stop_listener(https), - io:format("[TEST LOG] Stopped https server"). - -compile_router(Parent) -> - {ok, _} = application:ensure_all_started(cowboy), - cowboy_router:compile([ - {'_', [{"/", ?MODULE, #{parent => Parent}}]} - ]). - -init(Req, #{parent := Parent} = State) -> - Method = cowboy_req:method(Req), - Headers = cowboy_req:headers(Req), - [Params] = case Method of - <<"GET">> -> cowboy_req:parse_qs(Req); - <<"POST">> -> - {ok, PostVals, _} = cowboy_req:read_urlencoded_body(Req), - PostVals - end, - Parent ! {?MODULE, Params, Headers}, - {ok, reply(Req, ok), State}. - -reply(Req, ok) -> - cowboy_req:reply(200, #{<<"content-type">> => <<"text/plain">>}, <<"ok">>, Req); -reply(Req, error) -> - cowboy_req:reply(404, #{<<"content-type">> => <<"text/plain">>}, <<"deny">>, Req). diff --git a/apps/emqx_web_hook/test/props/prop_webhook_confs.erl b/apps/emqx_web_hook/test/props/prop_webhook_confs.erl deleted file mode 100644 index 8946ce1d2..000000000 --- a/apps/emqx_web_hook/test/props/prop_webhook_confs.erl +++ /dev/null @@ -1,146 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(prop_webhook_confs). --include_lib("proper/include/proper.hrl"). - --import(emqx_ct_proper_types, - [ url/0 - , nof/1 - ]). - --define(ALL(Vars, Types, Exprs), - ?SETUP(fun() -> - State = do_setup(), - fun() -> do_teardown(State) end - end, ?FORALL(Vars, Types, Exprs))). - -%%-------------------------------------------------------------------- -%% Properties -%%-------------------------------------------------------------------- - -prop_confs() -> - Schema = cuttlefish_schema:files(filelib:wildcard(code:priv_dir(emqx_web_hook) ++ "/*.schema")), - ?ALL({Url, Confs0}, {url(), confs()}, - begin - Confs = [{"web.hook.url", Url}|Confs0], - Envs = cuttlefish_generator:map(Schema, cuttlefish_conf_file(Confs)), - - assert_confs(Confs, Envs), - - set_application_envs(Envs), - {ok, _} = application:ensure_all_started(emqx_web_hook), - application:stop(emqx_web_hook), - unset_application_envs(Envs), - true - end). - -%%-------------------------------------------------------------------- -%% Helpers -%%-------------------------------------------------------------------- - -do_setup() -> - logger:set_primary_config(#{level => warning}), - emqx_ct_helpers:start_apps([], fun set_special_cfgs/1), - ok. - -do_teardown(_) -> - emqx_ct_helpers:stop_apps([]), - logger:set_primary_config(#{level => info}), - ok. - -set_special_cfgs(_) -> - application:set_env(emqx, plugins_loaded_file, undefined), - application:set_env(emqx, modules_loaded_file, undefined), - ok. - -assert_confs([{"web.hook.url", Url}|More], Envs) -> - %% Assert! - Url = deep_get_env("emqx_web_hook.url", Envs), - assert_confs(More, Envs); - -assert_confs([{"web.hook.rule." ++ HookName0, Spec}|More], Envs) -> - HookName = re:replace(HookName0, "\\.[0-9]", "", [{return, list}]), - Rules = deep_get_env("emqx_web_hook.rules", Envs), - - %% Assert! - Spec = proplists:get_value(HookName, Rules), - - assert_confs(More, Envs); - -assert_confs([_|More], Envs) -> - assert_confs(More, Envs); - -assert_confs([], _) -> - true. - -deep_get_env(Path, Envs) -> - lists:foldl( - fun(_K, undefiend) -> undefiend; - (K, Acc) -> proplists:get_value(binary_to_atom(K, utf8), Acc) - end, Envs, re:split(Path, "\\.")). - -set_application_envs(Envs) -> - application:set_env(Envs). - -unset_application_envs(Envs) -> - lists:foreach(fun({App, Es}) -> - lists:foreach(fun({K, _}) -> - application:unset_env(App, K) - end, Es) end, Envs). - -cuttlefish_conf_file(Ls) when is_list(Ls) -> - [cuttlefish_conf_option(K,V) || {K, V} <- Ls]. - -cuttlefish_conf_option(K, V) - when is_list(K) -> - {re:split(K, "[.]", [{return, list}]), V}. - -%%-------------------------------------------------------------------- -%% Generators -%%-------------------------------------------------------------------- - -confs() -> - nof([{"web.hook.headers.content-type", - oneof(["application/json"])}, - {"web.hook.body.encoding_of_payload_field", - oneof(["plain", "base64", "base62"])}, - {"web.hook.rule.client.connect.1", rule_spec()}, - {"web.hook.rule.client.connack.1", rule_spec()}, - {"web.hook.rule.client.connected.1", rule_spec()}, - {"web.hook.rule.client.disconnected.1", rule_spec()}, - {"web.hook.rule.client.subscribe.1", rule_spec()}, - {"web.hook.rule.client.unsubscribe.1", rule_spec()}, - {"web.hook.rule.session.subscribed.1", rule_spec()}, - {"web.hook.rule.session.unsubscribed.1", rule_spec()}, - {"web.hook.rule.session.terminated.1", rule_spec()}, - {"web.hook.rule.message.publish.1", rule_spec()}, - {"web.hook.rule.message.delivered.1", rule_spec()}, - {"web.hook.rule.message.acked.1", rule_spec()} - ]). - -rule_spec() -> - ?LET(Action, action_names(), - begin - binary_to_list(emqx_json:encode(#{action => Action})) - end). - -action_names() -> - oneof([on_client_connect, on_client_connack, on_client_connected, - on_client_connected, on_client_disconnected, on_client_subscribe, on_client_unsubscribe, - on_session_subscribed, on_session_unsubscribed, on_session_terminated, - on_message_publish, on_message_delivered, on_message_acked]). - diff --git a/apps/emqx_web_hook/test/props/prop_webhook_hooks.erl b/apps/emqx_web_hook/test/props/prop_webhook_hooks.erl deleted file mode 100644 index 311585287..000000000 --- a/apps/emqx_web_hook/test/props/prop_webhook_hooks.erl +++ /dev/null @@ -1,397 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(prop_webhook_hooks). - --include_lib("proper/include/proper.hrl"). - --import(emqx_ct_proper_types, - [ conninfo/0 - , clientinfo/0 - , sessioninfo/0 - , message/0 - , connack_return_code/0 - , topictab/0 - , topic/0 - , subopts/0 - ]). - --define(ALL(Vars, Types, Exprs), - ?SETUP(fun() -> - State = do_setup(), - fun() -> do_teardown(State) end - end, ?FORALL(Vars, Types, Exprs))). - -%%-------------------------------------------------------------------- -%% Properties -%%-------------------------------------------------------------------- - -prop_client_connect() -> - ?ALL({ConnInfo, ConnProps, Env}, - {conninfo(), conn_properties(), empty_env()}, - begin - ok = emqx_web_hook:on_client_connect(ConnInfo, ConnProps, Env), - Body = receive_http_request_body(), - Body = emqx_json:encode( - #{action => client_connect, - node => stringfy(node()), - clientid => maps:get(clientid, ConnInfo), - username => maybe(maps:get(username, ConnInfo)), - ipaddress => peer2addr(maps:get(peername, ConnInfo)), - keepalive => maps:get(keepalive, ConnInfo), - proto_ver => maps:get(proto_ver, ConnInfo) - }), - true - end). - -prop_client_connack() -> - ?ALL({ConnInfo, Rc, AckProps, Env}, - {conninfo(), connack_return_code(), ack_properties(), empty_env()}, - begin - ok = emqx_web_hook:on_client_connack(ConnInfo, Rc, AckProps, Env), - Body = receive_http_request_body(), - Body = emqx_json:encode( - #{action => client_connack, - node => stringfy(node()), - clientid => maps:get(clientid, ConnInfo), - username => maybe(maps:get(username, ConnInfo)), - ipaddress => peer2addr(maps:get(peername, ConnInfo)), - keepalive => maps:get(keepalive, ConnInfo), - proto_ver => maps:get(proto_ver, ConnInfo), - conn_ack => Rc - }), - true - end). - -prop_client_connected() -> - ?ALL({ClientInfo, ConnInfo, Env}, - {clientinfo(), conninfo(), empty_env()}, - begin - ok = emqx_web_hook:on_client_connected(ClientInfo, ConnInfo, Env), - Body = receive_http_request_body(), - Body = emqx_json:encode( - #{action => client_connected, - node => stringfy(node()), - clientid => maps:get(clientid, ClientInfo), - username => maybe(maps:get(username, ClientInfo)), - ipaddress => peer2addr(maps:get(peerhost, ClientInfo)), - keepalive => maps:get(keepalive, ConnInfo), - proto_ver => maps:get(proto_ver, ConnInfo), - connected_at => maps:get(connected_at, ConnInfo) - }), - true - end). - -prop_client_disconnected() -> - ?ALL({ClientInfo, Reason, ConnInfo, Env}, - {clientinfo(), shutdown_reason(), disconnected_conninfo(), empty_env()}, - begin - ok = emqx_web_hook:on_client_disconnected(ClientInfo, Reason, ConnInfo, Env), - Body = receive_http_request_body(), - Body = emqx_json:encode( - #{action => client_disconnected, - node => stringfy(node()), - clientid => maps:get(clientid, ClientInfo), - username => maybe(maps:get(username, ClientInfo)), - disconnected_at => maps:get(disconnected_at, ConnInfo), - reason => stringfy(Reason) - }), - true - end). - -prop_client_subscribe() -> - ?ALL({ClientInfo, SubProps, TopicTab, Env}, - {clientinfo(), sub_properties(), topictab(), topic_filter_env()}, - begin - ok = emqx_web_hook:on_client_subscribe(ClientInfo, SubProps, TopicTab, Env), - - Matched = filter_topictab(TopicTab, Env), - - lists:foreach(fun({Topic, Opts}) -> - Body = receive_http_request_body(), - Body = emqx_json:encode( - #{action => client_subscribe, - node => stringfy(node()), - clientid => maps:get(clientid, ClientInfo), - username => maybe(maps:get(username, ClientInfo)), - topic => Topic, - opts => Opts}) - end, Matched), - true - end). - -prop_client_unsubscribe() -> - ?ALL({ClientInfo, SubProps, TopicTab, Env}, - {clientinfo(), unsub_properties(), topictab(), topic_filter_env()}, - begin - ok = emqx_web_hook:on_client_unsubscribe(ClientInfo, SubProps, TopicTab, Env), - - Matched = filter_topictab(TopicTab, Env), - - lists:foreach(fun({Topic, Opts}) -> - Body = receive_http_request_body(), - Body = emqx_json:encode( - #{action => client_unsubscribe, - node => stringfy(node()), - clientid => maps:get(clientid, ClientInfo), - username => maybe(maps:get(username, ClientInfo)), - topic => Topic, - opts => Opts}) - end, Matched), - true - end). - -prop_session_subscribed() -> - ?ALL({ClientInfo, Topic, SubOpts, Env}, - {clientinfo(), topic(), subopts(), topic_filter_env()}, - begin - ok = emqx_web_hook:on_session_subscribed(ClientInfo, Topic, SubOpts, Env), - filter_topic_match(Topic, Env) andalso begin - Body = receive_http_request_body(), - Body1 = emqx_json:encode( - #{action => session_subscribed, - node => stringfy(node()), - clientid => maps:get(clientid, ClientInfo), - username => maybe(maps:get(username, ClientInfo)), - topic => Topic, - opts => SubOpts - }), - Body = Body1 - end, - true - end). - -prop_session_unsubscribed() -> - ?ALL({ClientInfo, Topic, SubOpts, Env}, - {clientinfo(), topic(), subopts(), empty_env()}, - begin - ok = emqx_web_hook:on_session_unsubscribed(ClientInfo, Topic, SubOpts, Env), - filter_topic_match(Topic, Env) andalso begin - Body = receive_http_request_body(), - Body = emqx_json:encode( - #{action => session_unsubscribed, - node => stringfy(node()), - clientid => maps:get(clientid, ClientInfo), - username => maybe(maps:get(username, ClientInfo)), - topic => Topic - }) - end, - true - end). - -prop_session_terminated() -> - ?ALL({ClientInfo, Reason, SessInfo, Env}, - {clientinfo(), shutdown_reason(), sessioninfo(), empty_env()}, - begin - ok = emqx_web_hook:on_session_terminated(ClientInfo, Reason, SessInfo, Env), - Body = receive_http_request_body(), - Body = emqx_json:encode( - #{action => session_terminated, - node => stringfy(node()), - clientid => maps:get(clientid, ClientInfo), - username => maybe(maps:get(username, ClientInfo)), - reason => stringfy(Reason) - }), - true - end). - -prop_message_publish() -> - ?ALL({Msg, Env, Encode}, {message(), topic_filter_env(), payload_encode()}, - begin - application:set_env(emqx_web_hook, encoding_of_payload_field, Encode), - {ok, Msg} = emqx_web_hook:on_message_publish(Msg, Env), - application:unset_env(emqx_web_hook, encoding_of_payload_field), - - (not emqx_message:is_sys(Msg)) - andalso filter_topic_match(emqx_message:topic(Msg), Env) - andalso begin - Body = receive_http_request_body(), - Body = emqx_json:encode( - #{action => message_publish, - node => stringfy(node()), - from_client_id => emqx_message:from(Msg), - from_username => maybe(emqx_message:get_header(username, Msg)), - topic => emqx_message:topic(Msg), - qos => emqx_message:qos(Msg), - retain => emqx_message:get_flag(retain, Msg), - payload => encode(emqx_message:payload(Msg), Encode), - ts => emqx_message:timestamp(Msg) - }) - end, - true - end). - -prop_message_delivered() -> - ?ALL({ClientInfo, Msg, Env, Encode}, {clientinfo(), message(), topic_filter_env(), payload_encode()}, - begin - application:set_env(emqx_web_hook, encoding_of_payload_field, Encode), - ok = emqx_web_hook:on_message_delivered(ClientInfo, Msg, Env), - application:unset_env(emqx_web_hook, encoding_of_payload_field), - - (not emqx_message:is_sys(Msg)) - andalso filter_topic_match(emqx_message:topic(Msg), Env) - andalso begin - Body = receive_http_request_body(), - Body = emqx_json:encode( - #{action => message_delivered, - node => stringfy(node()), - clientid => maps:get(clientid, ClientInfo), - username => maybe(maps:get(username, ClientInfo)), - from_client_id => emqx_message:from(Msg), - from_username => maybe(emqx_message:get_header(username, Msg)), - topic => emqx_message:topic(Msg), - qos => emqx_message:qos(Msg), - retain => emqx_message:get_flag(retain, Msg), - payload => encode(emqx_message:payload(Msg), Encode), - ts => emqx_message:timestamp(Msg) - }) - end, - true - end). - -prop_message_acked() -> - ?ALL({ClientInfo, Msg, Env, Encode}, {clientinfo(), message(), empty_env(), payload_encode()}, - begin - application:set_env(emqx_web_hook, encoding_of_payload_field, Encode), - ok = emqx_web_hook:on_message_acked(ClientInfo, Msg, Env), - application:unset_env(emqx_web_hook, encoding_of_payload_field), - - (not emqx_message:is_sys(Msg)) - andalso filter_topic_match(emqx_message:topic(Msg), Env) - andalso begin - Body = receive_http_request_body(), - Body = emqx_json:encode( - #{action => message_acked, - node => stringfy(node()), - clientid => maps:get(clientid, ClientInfo), - username => maybe(maps:get(username, ClientInfo)), - from_client_id => emqx_message:from(Msg), - from_username => maybe(emqx_message:get_header(username, Msg)), - topic => emqx_message:topic(Msg), - qos => emqx_message:qos(Msg), - retain => emqx_message:get_flag(retain, Msg), - payload => encode(emqx_message:payload(Msg), Encode), - ts => emqx_message:timestamp(Msg) - }) - end, - true - end). - -%%-------------------------------------------------------------------- -%% Helper -%%-------------------------------------------------------------------- -do_setup() -> - %% Pre-defined envs - application:set_env(emqx_web_hook, path, "path"), - application:set_env(emqx_web_hook, headers, []), - - meck:new(ehttpc_pool, [passthrough, no_history]), - meck:expect(ehttpc_pool, pick_worker, fun(_, _) -> ok end), - - Self = self(), - meck:new(ehttpc, [passthrough, no_history]), - meck:expect(ehttpc, request, - fun(_ClientId, Method, {Path, Headers, Body}) -> - Self ! {Method, Path, Headers, Body}, {ok, 200, ok} - end), - - meck:new(emqx_metrics, [passthrough, no_history]), - meck:expect(emqx_metrics, inc, fun(_) -> ok end), - ok. - -do_teardown(_) -> - meck:unload(ehttpc_pool), - meck:unload(ehttpc), - meck:unload(emqx_metrics). - -maybe(undefined) -> null; -maybe(T) -> T. - -peer2addr({Host, _}) -> - list_to_binary(inet:ntoa(Host)); -peer2addr(Host) -> - list_to_binary(inet:ntoa(Host)). - -stringfy({shutdown, Reason}) -> - stringfy(Reason); -stringfy(Term) when is_binary(Term) -> - Term; -stringfy(Term) when is_atom(Term) -> - atom_to_binary(Term, utf8); -stringfy(Term) -> - unicode:characters_to_binary(io_lib:format("~0p", [Term])). - -receive_http_request_body() -> - receive - {post, _, _, Body} -> - Body - after 100 -> - exit(waiting_message_timeout) - end. - -filter_topictab(TopicTab, {undefined}) -> - TopicTab; -filter_topictab(TopicTab, {TopicFilter}) -> - lists:filter(fun({Topic, _}) -> emqx_topic:match(Topic, TopicFilter) end, TopicTab). - -filter_topic_match(_Topic, {undefined}) -> - true; -filter_topic_match(Topic, {TopicFilter}) -> - emqx_topic:match(Topic, TopicFilter). - -encode(Bin, base64) -> - base64:encode(Bin); -encode(Bin, base62) -> - emqx_base62:encode(Bin); -encode(Bin, _) -> - Bin. - -%%-------------------------------------------------------------------- -%% Generators -%%-------------------------------------------------------------------- - -conn_properties() -> - #{}. - -ack_properties() -> - #{}. - -sub_properties() -> - #{}. - -unsub_properties() -> - #{}. - -shutdown_reason() -> - oneof([disconnected, not_autherised, - "list_reason", <<"binary_reason">>, - {tuple, reason}, - {shutdown, emqx_ct_proper_types:limited_atom()}]). - -empty_env() -> - {undefined}. - -topic_filter_env() -> - oneof([{<<"#">>}, {undefined}, {topic()}]). - -payload_encode() -> - oneof([base62, base64, plain]). - -disconnected_conninfo() -> - ?LET(Info, conninfo(), - begin - Info#{disconnected_at => erlang:system_time(millisecond)} - end). diff --git a/rebar.config.erl b/rebar.config.erl index f28b43856..901749d72 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -286,8 +286,8 @@ relx_plugin_apps(ReleaseType) -> , emqx_coap , emqx_stomp , emqx_authentication - , emqx_web_hook , emqx_statsd + , emqx_rule_actions ] ++ relx_plugin_apps_per_rel(ReleaseType) ++ relx_plugin_apps_enterprise(is_enterprise())