chore(plugins): rm emqx-web-hook and mv webhook action to emqx_rule_actions

This commit is contained in:
Turtle 2021-06-28 16:12:06 +08:00 committed by turtleDeng
parent 5571c54607
commit faad90c9d4
28 changed files with 38 additions and 2029 deletions

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_mqtt,
[{description, "EMQ X Bridge to MQTT Broker"},
{vsn, "4.3.1"}, % strict semver, bump manually!
{vsn, "5.0.0"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [kernel,stdlib,replayq,emqtt]},

View File

@ -1,16 +0,0 @@
%% -*-: erlang -*-
{VSN,
[
{"4.3.0", [
{load_module, emqx_bridge_worker, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
],
[
{"4.3.0", [
{load_module, emqx_bridge_worker, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
]
}.

View File

@ -0,0 +1,11 @@
# emqx_rule_actions
This project contains a collection of rule actions/resources. It is mainly for
making unit test easier. Also it's easier for us to create utils that many
modules depends on it.
## Build
-----
$ rebar3 compile

View File

@ -1,18 +1,25 @@
{plugins, [rebar3_proper]}.
{deps, []}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
debug_info,
{parse_transform}]}.
no_debug_info,
compressed, %% for edge
{parse_transform}
]}.
{overrides, [{add, [{erl_opts, [no_debug_info, compressed]}]}]}.
{edoc_opts, [{preprocess, true}]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions]}.
warnings_as_errors, deprecated_functions
]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{cover_export_enabled, true}.
{plugins, [rebar3_proper]}.

View File

@ -0,0 +1,11 @@
{application, emqx_rule_actions,
[{description, "Rule actions"},
{vsn, "5.0.0"},
{registered, []},
{applications,
[kernel,stdlib]},
{env,[]},
{modules, []},
{licenses, ["Apache 2.0"]},
{links, []}
]}.

View File

@ -1,31 +0,0 @@
.rebar3
_*
.eunit
*.o
*.beam
*.plt
*.swp
*.swo
.erlang.cookie
ebin
log
erl_crash.dump
.rebar
logs
_build
.idea
data
.DS_Store
.erlang.mk/
cover
ct.coverdata
deps
eunit.coverdata
test/ct.cover.spec
emqx_web_hook.d
emq_web_hook.d
rebar.lock
erlang.mk
rebar3.crashdump
etc/emqx_web_hook.conf.rendered
Mnesia.nonode@nohost

View File

@ -1,194 +0,0 @@
# emqx-web-hook
EMQ X WebHook plugin.
Please see: [EMQ X - WebHook](https://docs.emqx.io/broker/latest/en/advanced/webhook.html)
## emqx_web_hook.conf
```properties
## The web services URL for Hook request
##
## Value: String
web.hook.url = http://127.0.0.1:8080
## Encode message payload field
##
## Value: base64 | base62
## web.hook.encode_payload = base64
##--------------------------------------------------------------------
## Hook Rules
## These configuration items represent a list of events should be forwarded
##
## Format:
## web.hook.rule.<HookName>.<No> = <Spec>
web.hook.rule.client.connect.1 = {"action": "on_client_connect"}
web.hook.rule.client.connack.1 = {"action": "on_client_connack"}
web.hook.rule.client.connected.1 = {"action": "on_client_connected"}
web.hook.rule.client.disconnected.1 = {"action": "on_client_disconnected"}
web.hook.rule.client.subscribe.1 = {"action": "on_client_subscribe"}
web.hook.rule.client.unsubscribe.1 = {"action": "on_client_unsubscribe"}
web.hook.rule.session.subscribed.1 = {"action": "on_session_subscribed"}
web.hook.rule.session.unsubscribed.1 = {"action": "on_session_unsubscribed"}
web.hook.rule.session.terminated.1 = {"action": "on_session_terminated"}
web.hook.rule.message.publish.1 = {"action": "on_message_publish"}
web.hook.rule.message.delivered.1 = {"action": "on_message_delivered"}
web.hook.rule.message.acked.1 = {"action": "on_message_acked"}
```
## API
The HTTP request parameter format:
* client.connected
```json
{
"action":"client_connected",
"clientid":"C_1492410235117",
"username":"C_1492410235117",
"keepalive": 60,
"ipaddress": "127.0.0.1",
"proto_ver": 4,
"connected_at": 1556176748,
"conn_ack":0
}
```
* client.disconnected
```json
{
"action":"client_disconnected",
"clientid":"C_1492410235117",
"username":"C_1492410235117",
"reason":"normal"
}
```
* client.subscribe
```json
{
"action":"client_subscribe",
"clientid":"C_1492410235117",
"username":"C_1492410235117",
"topic":"world",
"opts":{
"qos":0
}
}
```
* client.unsubscribe
```json
{
"action":"client_unsubscribe",
"clientid":"C_1492410235117",
"username":"C_1492410235117",
"topic":"world"
}
```
* session.created
```json
{
"action":"session_created",
"clientid":"C_1492410235117",
"username":"C_1492410235117"
}
```
* session.subscribed
```json
{
"action":"session_subscribed",
"clientid":"C_1492410235117",
"username":"C_1492410235117",
"topic":"world",
"opts":{
"qos":0
}
}
```
* session.unsubscribed
```json
{
"action":"session_unsubscribed",
"clientid":"C_1492410235117",
"username":"C_1492410235117",
"topic":"world"
}
```
* session.terminated
```json
{
"action":"session_terminated",
"clientid":"C_1492410235117",
"username":"C_1492410235117",
"reason":"normal"
}
```
* message.publish
```json
{
"action":"message_publish",
"from_client_id":"C_1492410235117",
"from_username":"C_1492410235117",
"topic":"world",
"qos":0,
"retain":true,
"payload":"Hello world!",
"ts":1492412774
}
```
* message.delivered
```json
{
"action":"message_delivered",
"clientid":"C_1492410235117",
"username":"C_1492410235117",
"from_client_id":"C_1492410235117",
"from_username":"C_1492410235117",
"topic":"world",
"qos":0,
"retain":true,
"payload":"Hello world!",
"ts":1492412826
}
```
* message.acked
```json
{
"action":"message_acked",
"clientid":"C_1492410235117",
"username":"C_1492410235117",
"from_client_id":"C_1492410235117",
"from_username":"C_1492410235117",
"topic":"world",
"qos":1,
"retain":true,
"payload":"Hello world!",
"ts":1492412914
}
```
## License
Apache License Version 2.0
## Author
* [Sakib Sami](https://github.com/s4kibs4mi)
## Contributors
* [Deng](https://github.com/turtleDeng)
* [vishr](https://github.com/vishr)
* [emqplus](https://github.com/emqplus)
* [huangdan](https://github.com/huangdan)

View File

@ -1,3 +0,0 @@
1. HTTPS
2. More HTTP Headers and Options
3. MQTT 5.0

View File

@ -1,77 +0,0 @@
##====================================================================
## WebHook
##====================================================================
## Webhook URL
##
## Value: String
web.hook.url = "http://127.0.0.1:80"
## HTTP Headers
##
## Example:
## 1. web.hook.headers.content-type = "application/json"
## 2. web.hook.headers.accept = "*"
##
## Value: String
web.hook.headers.content-type = "application/json"
## The encoding format of the payload field in the HTTP body
## The payload field only appears in the on_message_publish and on_message_delivered actions
##
## Value: plain | base64 | base62
web.hook.body.encoding_of_payload_field = plain
##--------------------------------------------------------------------
## PEM format file of CA's
##
## Value: File
## web.hook.ssl.cacertfile = <PEM format file of CA's>
## Certificate file to use, PEM format assumed
##
## Value: File
## web.hook.ssl.certfile = <Certificate file to use>
## Private key file to use, PEM format assumed
##
## Value: File
## web.hook.ssl.keyfile = <Private key file to use>
## Turn on peer certificate verification
##
## Value: true | false
## web.hook.ssl.verify = false
## If not specified, the server's names returned in server's certificate is validated against
## what's provided `web.hook.url` config's host part.
## Setting to 'disable' will make EMQ X ignore unmatched server names.
## If set with a host name, the server's names returned in server's certificate is validated
## against this value.
##
## Value: String | disable
## web.hook.ssl.server_name_indication = disable
## Connection process pool size
##
## Value: Number
web.hook.pool_size = 32
##--------------------------------------------------------------------
## Hook Rules
## These configuration items represent a list of events should be forwarded
##
## Format:
## web.hook.rule.<HookName>.<No> = <Spec>
#web.hook.rule.client.connect.1 = "{"action": "on_client_connect"}"
#web.hook.rule.client.connack.1 = "{"action": "on_client_connack"}"
#web.hook.rule.client.connected.1 = "{"action": "on_client_connected"}"
#web.hook.rule.client.disconnected.1 = "{"action": "on_client_disconnected"}"
#web.hook.rule.client.subscribe.1 = "{"action": "on_client_subscribe"}"
#web.hook.rule.client.unsubscribe.1 = "{"action": "on_client_unsubscribe"}"
#web.hook.rule.session.subscribed.1 = "{"action": "on_session_subscribed"}"
#web.hook.rule.session.unsubscribed.1 = "{"action": "on_session_unsubscribed"}"
#web.hook.rule.session.terminated.1 = "{"action": "on_session_terminated"}"
#web.hook.rule.message.publish.1 = "{"action": "on_message_publish"}"
#web.hook.rule.message.delivered.1 = "{"action": "on_message_delivered"}"
#web.hook.rule.message.acked.1 = ""{"action": "on_message_acked"}"

View File

@ -1 +0,0 @@
-define(APP, emqx_web_hook).

View File

@ -1,105 +0,0 @@
%%-*- mode: erlang -*-
%% EMQ X R3.0 config mapping
{mapping, "web.hook.url", "emqx_web_hook.url", [
{datatype, string}
]}.
{mapping, "web.hook.headers.$name", "emqx_web_hook.headers", [
{datatype, string}
]}.
{mapping, "web.hook.body.encoding_of_payload_field", "emqx_web_hook.encoding_of_payload_field", [
{default, plain},
{datatype, {enum, [plain, base62, base64]}}
]}.
{mapping, "web.hook.ssl.cacertfile", "emqx_web_hook.cacertfile", [
{default, ""},
{datatype, string}
]}.
{mapping, "web.hook.ssl.certfile", "emqx_web_hook.certfile", [
{default, ""},
{datatype, string}
]}.
{mapping, "web.hook.ssl.keyfile", "emqx_web_hook.keyfile", [
{default, ""},
{datatype, string}
]}.
{mapping, "web.hook.ssl.verify", "emqx_web_hook.verify", [
{default, false},
{datatype, {enum, [true, false]}}
]}.
{mapping, "web.hook.ssl.server_name_indication", "emqx_web_hook.server_name_indication", [
{datatype, string}
]}.
{mapping, "web.hook.pool_size", "emqx_web_hook.pool_size", [
{default, 32},
{datatype, integer}
]}.
{mapping, "web.hook.rule.client.connect.$name", "emqx_web_hook.rules", [
{datatype, string}
]}.
{mapping, "web.hook.rule.client.connack.$name", "emqx_web_hook.rules", [
{datatype, string}
]}.
{mapping, "web.hook.rule.client.connected.$name", "emqx_web_hook.rules", [
{datatype, string}
]}.
{mapping, "web.hook.rule.client.disconnected.$name", "emqx_web_hook.rules", [
{datatype, string}
]}.
{mapping, "web.hook.rule.client.subscribe.$name", "emqx_web_hook.rules", [
{datatype, string}
]}.
{mapping, "web.hook.rule.client.unsubscribe.$name", "emqx_web_hook.rules", [
{datatype, string}
]}.
{mapping, "web.hook.rule.session.subscribed.$name", "emqx_web_hook.rules", [
{datatype, string}
]}.
{mapping, "web.hook.rule.session.unsubscribed.$name", "emqx_web_hook.rules", [
{datatype, string}
]}.
{mapping, "web.hook.rule.session.terminated.$name", "emqx_web_hook.rules", [
{datatype, string}
]}.
{mapping, "web.hook.rule.message.publish.$name", "emqx_web_hook.rules", [
{datatype, string}
]}.
{mapping, "web.hook.rule.message.acked.$name", "emqx_web_hook.rules", [
{datatype, string}
]}.
{mapping, "web.hook.rule.message.delivered.$name", "emqx_web_hook.rules", [
{datatype, string}
]}.
{translation, "emqx_web_hook.headers", fun(Conf) ->
Headers = cuttlefish_variable:filter_by_prefix("web.hook.headers", Conf),
[{K, V} || {[_, _, _, K], V} <- Headers]
end}.
{translation, "emqx_web_hook.rules", fun(Conf) ->
Hooks = cuttlefish_variable:filter_by_prefix("web.hook.rule", Conf),
lists:map(
fun({[_, _, _,Name1,Name2, _], Val}) ->
{lists:concat([Name1,".",Name2]), Val}
end, Hooks)
end}.

View File

@ -1,14 +0,0 @@
{application, emqx_web_hook,
[{description, "EMQ X WebHook Plugin"},
{vsn, "4.3.2"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_web_hook_sup]},
{applications, [kernel,stdlib,ehttpc]},
{mod, {emqx_web_hook_app,[]}},
{env, []},
{licenses, ["Apache-2.0"]},
{maintainers, ["EMQ X Team <contact@emqx.io>"]},
{links, [{"Homepage", "https://emqx.io/"},
{"Github", "https://github.com/emqx/emqx-web-hook"}
]}
]}.

View File

@ -1,18 +0,0 @@
%% -*-: erlang -*-
{VSN,
[
{<<"4.3.[0-1]">>, [
{restart_application, emqx_web_hook},
{apply,{emqx_rule_engine,refresh_resource,[web_hook]}}
]},
{<<".*">>, []}
],
[
{<<"4.3.[0-1]">>, [
{restart_application, emqx_web_hook},
{apply,{emqx_rule_engine,refresh_resource,[web_hook]}}
]},
{<<".*">>, []}
]
}.

View File

@ -1,390 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_web_hook).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-define(APP, emqx_web_hook).
-logger_header("[WebHook]").
-import(inet, [ntoa/1]).
%% APIs
-export([ register_metrics/0
, load/0
, unload/0
]).
%% Hooks callback
-export([ on_client_connect/3
, on_client_connack/4
, on_client_connected/3
, on_client_disconnected/4
, on_client_subscribe/4
, on_client_unsubscribe/4
]).
-export([ on_session_subscribed/4
, on_session_unsubscribed/4
, on_session_terminated/4
]).
-export([ on_message_publish/2
, on_message_delivered/3
, on_message_acked/3
]).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
register_metrics() ->
lists:foreach(fun emqx_metrics:ensure/1,
['webhook.client_connect',
'webhook.client_connack',
'webhook.client_connected',
'webhook.client_disconnected',
'webhook.client_subscribe',
'webhook.client_unsubscribe',
'webhook.session_subscribed',
'webhook.session_unsubscribed',
'webhook.session_terminated',
'webhook.message_publish',
'webhook.message_delivered',
'webhook.message_acked']).
load() ->
lists:foreach(
fun({Hook, Fun, Filter}) ->
emqx:hook(Hook, {?MODULE, Fun, [{Filter}]})
end, parse_rule(application:get_env(?APP, rules, []))).
unload() ->
lists:foreach(
fun({Hook, Fun, _Filter}) ->
emqx:unhook(Hook, {?MODULE, Fun})
end, parse_rule(application:get_env(?APP, rules, []))).
%%--------------------------------------------------------------------
%% Client connect
%%--------------------------------------------------------------------
on_client_connect(ConnInfo = #{clientid := ClientId, username := Username, peername := {Peerhost, _}}, _ConnProp, _Env) ->
emqx_metrics:inc('webhook.client_connect'),
Params = #{ action => client_connect
, node => node()
, clientid => ClientId
, username => maybe(Username)
, ipaddress => iolist_to_binary(ntoa(Peerhost))
, keepalive => maps:get(keepalive, ConnInfo)
, proto_ver => maps:get(proto_ver, ConnInfo)
},
send_http_request(ClientId, Params).
%%--------------------------------------------------------------------
%% Client connack
%%--------------------------------------------------------------------
on_client_connack(ConnInfo = #{clientid := ClientId, username := Username, peername := {Peerhost, _}}, Rc, _AckProp, _Env) ->
emqx_metrics:inc('webhook.client_connack'),
Params = #{ action => client_connack
, node => node()
, clientid => ClientId
, username => maybe(Username)
, ipaddress => iolist_to_binary(ntoa(Peerhost))
, keepalive => maps:get(keepalive, ConnInfo)
, proto_ver => maps:get(proto_ver, ConnInfo)
, conn_ack => Rc
},
send_http_request(ClientId, Params).
%%--------------------------------------------------------------------
%% Client connected
%%--------------------------------------------------------------------
on_client_connected(#{clientid := ClientId, username := Username, peerhost := Peerhost}, ConnInfo, _Env) ->
emqx_metrics:inc('webhook.client_connected'),
Params = #{ action => client_connected
, node => node()
, clientid => ClientId
, username => maybe(Username)
, ipaddress => iolist_to_binary(ntoa(Peerhost))
, keepalive => maps:get(keepalive, ConnInfo)
, proto_ver => maps:get(proto_ver, ConnInfo)
, connected_at => maps:get(connected_at, ConnInfo)
},
send_http_request(ClientId, Params).
%%--------------------------------------------------------------------
%% Client disconnected
%%--------------------------------------------------------------------
on_client_disconnected(ClientInfo, {shutdown, Reason}, ConnInfo, Env) when is_atom(Reason) ->
on_client_disconnected(ClientInfo, Reason, ConnInfo, Env);
on_client_disconnected(#{clientid := ClientId, username := Username}, Reason, ConnInfo, _Env) ->
emqx_metrics:inc('webhook.client_disconnected'),
Params = #{ action => client_disconnected
, node => node()
, clientid => ClientId
, username => maybe(Username)
, reason => stringfy(maybe(Reason))
, disconnected_at => maps:get(disconnected_at, ConnInfo, erlang:system_time(millisecond))
},
send_http_request(ClientId, Params).
%%--------------------------------------------------------------------
%% Client subscribe
%%--------------------------------------------------------------------
on_client_subscribe(#{clientid := ClientId, username := Username}, _Properties, TopicTable, {Filter}) ->
lists:foreach(fun({Topic, Opts}) ->
with_filter(
fun() ->
emqx_metrics:inc('webhook.client_subscribe'),
Params = #{ action => client_subscribe
, node => node()
, clientid => ClientId
, username => maybe(Username)
, topic => Topic
, opts => Opts
},
send_http_request(ClientId, Params)
end, Topic, Filter)
end, TopicTable).
%%--------------------------------------------------------------------
%% Client unsubscribe
%%--------------------------------------------------------------------
on_client_unsubscribe(#{clientid := ClientId, username := Username}, _Properties, TopicTable, {Filter}) ->
lists:foreach(fun({Topic, Opts}) ->
with_filter(
fun() ->
emqx_metrics:inc('webhook.client_unsubscribe'),
Params = #{ action => client_unsubscribe
, node => node()
, clientid => ClientId
, username => maybe(Username)
, topic => Topic
, opts => Opts
},
send_http_request(ClientId, Params)
end, Topic, Filter)
end, TopicTable).
%%--------------------------------------------------------------------
%% Session subscribed
%%--------------------------------------------------------------------
on_session_subscribed(#{clientid := ClientId, username := Username}, Topic, Opts, {Filter}) ->
with_filter(
fun() ->
emqx_metrics:inc('webhook.session_subscribed'),
Params = #{ action => session_subscribed
, node => node()
, clientid => ClientId
, username => maybe(Username)
, topic => Topic
, opts => Opts
},
send_http_request(ClientId, Params)
end, Topic, Filter).
%%--------------------------------------------------------------------
%% Session unsubscribed
%%--------------------------------------------------------------------
on_session_unsubscribed(#{clientid := ClientId, username := Username}, Topic, _Opts, {Filter}) ->
with_filter(
fun() ->
emqx_metrics:inc('webhook.session_unsubscribed'),
Params = #{ action => session_unsubscribed
, node => node()
, clientid => ClientId
, username => maybe(Username)
, topic => Topic
},
send_http_request(ClientId, Params)
end, Topic, Filter).
%%--------------------------------------------------------------------
%% Session terminated
%%--------------------------------------------------------------------
on_session_terminated(Info, {shutdown, Reason}, SessInfo, Env) when is_atom(Reason) ->
on_session_terminated(Info, Reason, SessInfo, Env);
on_session_terminated(#{clientid := ClientId, username := Username}, Reason, _SessInfo, _Env) ->
emqx_metrics:inc('webhook.session_terminated'),
Params = #{ action => session_terminated
, node => node()
, clientid => ClientId
, username => maybe(Username)
, reason => stringfy(maybe(Reason))
},
send_http_request(ClientId, Params).
%%--------------------------------------------------------------------
%% Message publish
%%--------------------------------------------------------------------
on_message_publish(Message = #message{topic = <<"$SYS/", _/binary>>}, _Env) ->
{ok, Message};
on_message_publish(Message = #message{topic = Topic}, {Filter}) ->
with_filter(
fun() ->
emqx_metrics:inc('webhook.message_publish'),
{FromClientId, FromUsername} = parse_from(Message),
Params = #{ action => message_publish
, node => node()
, from_client_id => FromClientId
, from_username => FromUsername
, topic => Message#message.topic
, qos => Message#message.qos
, retain => emqx_message:get_flag(retain, Message)
, payload => encode_payload(Message#message.payload)
, ts => Message#message.timestamp
},
send_http_request(FromClientId, Params),
{ok, Message}
end, Message, Topic, Filter).
%%--------------------------------------------------------------------
%% Message deliver
%%--------------------------------------------------------------------
on_message_delivered(_ClientInfo,#message{topic = <<"$SYS/", _/binary>>}, _Env) ->
ok;
on_message_delivered(#{clientid := ClientId, username := Username},
Message = #message{topic = Topic}, {Filter}) ->
with_filter(
fun() ->
emqx_metrics:inc('webhook.message_delivered'),
{FromClientId, FromUsername} = parse_from(Message),
Params = #{ action => message_delivered
, node => node()
, clientid => ClientId
, username => maybe(Username)
, from_client_id => FromClientId
, from_username => FromUsername
, topic => Message#message.topic
, qos => Message#message.qos
, retain => emqx_message:get_flag(retain, Message)
, payload => encode_payload(Message#message.payload)
, ts => Message#message.timestamp
},
send_http_request(ClientId, Params)
end, Topic, Filter).
%%--------------------------------------------------------------------
%% Message acked
%%--------------------------------------------------------------------
on_message_acked(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}, _Env) ->
ok;
on_message_acked(#{clientid := ClientId, username := Username},
Message = #message{topic = Topic}, {Filter}) ->
with_filter(
fun() ->
emqx_metrics:inc('webhook.message_acked'),
{FromClientId, FromUsername} = parse_from(Message),
Params = #{ action => message_acked
, node => node()
, clientid => ClientId
, username => maybe(Username)
, from_client_id => FromClientId
, from_username => FromUsername
, topic => Message#message.topic
, qos => Message#message.qos
, retain => emqx_message:get_flag(retain, Message)
, payload => encode_payload(Message#message.payload)
, ts => Message#message.timestamp
},
send_http_request(ClientId, Params)
end, Topic, Filter).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
send_http_request(ClientID, Params) ->
{ok, Path} = application:get_env(?APP, path),
Headers = application:get_env(?APP, headers, []),
Body = emqx_json:encode(Params),
?LOG(debug, "Send to: ~0p, params: ~s", [Path, Body]),
case ehttpc:request(ehttpc_pool:pick_worker(?APP, ClientID), post, {Path, Headers, Body}) of
{ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
ok;
{ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 ->
ok;
{ok, StatusCode, _} ->
?LOG(warning, "HTTP request failed with status code: ~p", [StatusCode]),
ok;
{ok, StatusCode, _, _} ->
?LOG(warning, "HTTP request failed with status code: ~p", [StatusCode]),
ok;
{error, Reason} ->
?LOG(error, "HTTP request error: ~p", [Reason]),
ok
end.
parse_rule(Rules) ->
parse_rule(Rules, []).
parse_rule([], Acc) ->
lists:reverse(Acc);
parse_rule([{Rule, Conf} | Rules], Acc) ->
Params = emqx_json:decode(iolist_to_binary(Conf)),
Action = proplists:get_value(<<"action">>, Params),
Filter = proplists:get_value(<<"topic">>, Params),
parse_rule(Rules, [{list_to_atom(Rule), binary_to_existing_atom(Action, utf8), Filter} | Acc]).
with_filter(Fun, _, undefined) ->
Fun(), ok;
with_filter(Fun, Topic, Filter) ->
case emqx_topic:match(Topic, Filter) of
true -> Fun(), ok;
false -> ok
end.
with_filter(Fun, _, _, undefined) ->
Fun();
with_filter(Fun, Msg, Topic, Filter) ->
case emqx_topic:match(Topic, Filter) of
true -> Fun();
false -> {ok, Msg}
end.
parse_from(Message) ->
{emqx_message:from(Message), maybe(emqx_message:get_header(username, Message))}.
encode_payload(Payload) ->
encode_payload(Payload, application:get_env(?APP, encoding_of_payload_field, plain)).
encode_payload(Payload, base62) -> emqx_base62:encode(Payload);
encode_payload(Payload, base64) -> base64:encode(Payload);
encode_payload(Payload, plain) -> Payload.
stringfy(Term) when is_binary(Term) ->
Term;
stringfy(Term) when is_atom(Term) ->
atom_to_binary(Term, utf8);
stringfy(Term) ->
unicode:characters_to_binary((io_lib:format("~0p", [Term]))).
maybe(undefined) -> null;
maybe(Str) -> Str.

View File

@ -1,102 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_web_hook_app).
-behaviour(application).
-emqx_plugin(?MODULE).
-include("emqx_web_hook.hrl").
-export([ start/2
, stop/1
]).
start(_StartType, _StartArgs) ->
translate_env(),
{ok, Sup} = emqx_web_hook_sup:start_link(),
{ok, PoolOpts} = application:get_env(?APP, pool_opts),
{ok, _Pid} = ehttpc_sup:start_pool(?APP, PoolOpts),
emqx_web_hook:register_metrics(),
emqx_web_hook:load(),
{ok, Sup}.
stop(_State) ->
emqx_web_hook:unload(),
ehttpc_sup:stop_pool(?APP).
translate_env() ->
{ok, URL} = application:get_env(?APP, url),
{ok, #{host := Host,
port := Port,
scheme := Scheme} = URIMap} = emqx_http_lib:uri_parse(URL),
Path = path(URIMap),
PoolSize = application:get_env(?APP, pool_size, 32),
MoreOpts = case Scheme of
http ->
[{transport_opts, emqx_misc:ipv6_probe([])}];
https ->
CACertFile = application:get_env(?APP, cacertfile, undefined),
CertFile = application:get_env(?APP, certfile, undefined),
KeyFile = application:get_env(?APP, keyfile, undefined),
{ok, Verify} = application:get_env(?APP, verify),
VerifyType = case Verify of
true -> verify_peer;
false -> verify_none
end,
SNI = case application:get_env(?APP, server_name_indication, undefined) of
"disable" -> disable;
SNI0 -> SNI0
end,
TLSOpts = lists:filter(fun({_K, V}) ->
V /= <<>> andalso V /= undefined andalso V /= "" andalso true
end, [{keyfile, KeyFile},
{certfile, CertFile},
{cacertfile, CACertFile},
{verify, VerifyType},
{server_name_indication, SNI}]),
NTLSOpts = [ {versions, emqx_tls_lib:default_versions()}
, {ciphers, emqx_tls_lib:default_ciphers()}
| TLSOpts
],
[{transport, ssl}, {transport_opts, emqx_misc:ipv6_probe(NTLSOpts)}]
end,
PoolOpts = [{host, Host},
{port, Port},
{pool_size, PoolSize},
{pool_type, hash},
{connect_timeout, 5000},
{retry, 5},
{retry_timeout, 1000}] ++ MoreOpts,
application:set_env(?APP, path, Path),
application:set_env(?APP, pool_opts, PoolOpts),
Headers = application:get_env(?APP, headers, []),
NHeaders = set_content_type(Headers),
application:set_env(?APP, headers, NHeaders).
path(#{path := "", 'query' := Query}) ->
"?" ++ Query;
path(#{path := Path, 'query' := Query}) ->
Path ++ "?" ++ Query;
path(#{path := ""}) ->
"/";
path(#{path := Path}) ->
Path.
set_content_type(Headers) ->
NHeaders = proplists:delete(<<"Content-Type">>, proplists:delete(<<"content-type">>, Headers)),
[{<<"content-type">>, <<"application/json">>} | NHeaders].

View File

@ -1,29 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_web_hook_sup).
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
{ok, {{one_for_all, 0, 1}, []}}.

View File

@ -1,284 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_web_hook_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(HOOK_LOOKUP(H), emqx_hooks:lookup(list_to_atom(H))).
-define(ACTION(Name), #{<<"action">> := Name}).
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
all() ->
[ {group, http}
, {group, https}
, {group, ipv6http}
, {group, ipv6https}
, {group, all}
].
groups() ->
Cases = [test_full_flow],
[ {http, [sequence], Cases}
, {https, [sequence], Cases}
, {ipv6http, [sequence], Cases}
, {ipv6https, [sequence], Cases}
, {all, [sequence], emqx_ct:all(?MODULE)}
].
start_apps(F) -> emqx_ct_helpers:start_apps(apps(), F).
init_per_group(Name, Config) ->
application:ensure_all_started(emqx_management),
set_special_cfgs(),
BasePort =
case Name of
all -> 8801;
http -> 8811;
https -> 8821;
ipv6http -> 8831;
ipv6https -> 8841
end,
CF = case Name of
all -> fun set_special_configs_http/1;
http -> fun set_special_configs_http/1;
https -> fun set_special_configs_https/1;
ipv6http -> fun set_special_configs_ipv6_http/1;
ipv6https -> fun set_special_configs_ipv6_https/1
end,
start_apps(fun(_) -> CF(BasePort) end),
Opts = case atom_to_list(Name) of
"ipv6" ++ _ -> [{ip, {0,0,0,0,0,0,0,1}}, inet6];
_ -> [inet]
end,
[{base_port, BasePort}, {transport_opts, Opts} | Config].
end_per_group(_Name, Config) ->
emqx_ct_helpers:stop_apps(apps()),
Config.
set_special_configs_http(Port) ->
application:set_env(emqx_web_hook, url, "http://127.0.0.1:" ++ integer_to_list(Port)).
set_special_configs_https(Port) ->
set_ssl_configs(),
application:set_env(emqx_web_hook, url, "https://127.0.0.1:" ++ integer_to_list(Port+1)).
set_special_configs_ipv6_http(Port) ->
application:set_env(emqx_web_hook, url, "http://[::1]:" ++ integer_to_list(Port)).
set_special_configs_ipv6_https(Port) ->
set_ssl_configs(),
application:set_env(emqx_web_hook, url, "https://[::1]:" ++ integer_to_list(Port+1)).
set_ssl_configs() ->
Path = emqx_ct_helpers:deps_path(emqx_web_hook, "test/emqx_web_hook_SUITE_data/"),
SslOpts = [{keyfile, Path ++ "/client-key.pem"},
{certfile, Path ++ "/client-cert.pem"},
{cacertfile, Path ++ "/ca.pem"}],
application:set_env(emqx_web_hook, ssl, true),
application:set_env(emqx_web_hook, ssloptions, SslOpts).
set_special_cfgs() ->
AllRules = [{"message.acked", "{\"action\": \"on_message_acked\"}"},
{"message.delivered", "{\"action\": \"on_message_delivered\"}"},
{"message.publish", "{\"action\": \"on_message_publish\"}"},
{"session.terminated", "{\"action\": \"on_session_terminated\"}"},
{"session.unsubscribed", "{\"action\": \"on_session_unsubscribed\"}"},
{"session.subscribed", "{\"action\": \"on_session_subscribed\"}"},
{"client.unsubscribe", "{\"action\": \"on_client_unsubscribe\"}"},
{"client.subscribe", "{\"action\": \"on_client_subscribe\"}"},
{"client.disconnected", "{\"action\": \"on_client_disconnected\"}"},
{"client.connected", "{\"action\": \"on_client_connected\"}"},
{"client.connack", "{\"action\": \"on_client_connack\"}"},
{"client.connect", "{\"action\": \"on_client_connect\"}"}],
application:set_env(emqx_web_hook, rules, AllRules).
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------
test_full_flow(Config) ->
[_|_] = Opts = proplists:get_value(transport_opts, Config),
BasePort = proplists:get_value(base_port, Config),
Tester = self(),
{ok, ServerPid} = http_server:start_link(Tester, BasePort, Opts),
receive {ServerPid, ready} -> ok
after 1000 -> error(timeout)
end,
application:set_env(emqx_web_hook, headers, [{"k1","K1"}, {"k2", "K2"}]),
ClientId = iolist_to_binary(["client-", integer_to_list(erlang:system_time())]),
{ok, C} = emqtt:start_link([ {clientid, ClientId}
, {proto_ver, v5}
, {keepalive, 60}
]),
try
do_test_full_flow(C, ClientId)
after
Ref = erlang:monitor(process, ServerPid),
http_server:stop(ServerPid),
receive {'DOWN', Ref, _, _, _} -> ok
after 5000 -> error(timeout)
end
end.
do_test_full_flow(C, ClientId) ->
{ok, _} = emqtt:connect(C),
{ok, _, _} = emqtt:subscribe(C, <<"TopicA">>, qos2),
{ok, _} = emqtt:publish(C, <<"TopicA">>, <<"Payload...">>, qos2),
{ok, _, _} = emqtt:unsubscribe(C, <<"TopicA">>),
emqtt:disconnect(C),
validate_params_and_headers(undefined, ClientId).
validate_params_and_headers(ClientState, ClientId) ->
receive
{http_server, {Params0, _Bool}, Headers} ->
Params = emqx_json:decode(Params0, [return_maps]),
try
validate_hook_resp(ClientId, Params),
validate_hook_headers(Headers),
case maps:get(<<"action">>, Params) of
<<"session_terminated">> ->
ok;
<<"client_connect">> ->
validate_params_and_headers(connected, ClientId);
_ ->
validate_params_and_headers(ClientState, ClientId) %% continue looping
end
catch
throw : {unknown_client, Other} ->
ct:pal("ignored_event_from_other_client ~p~nexpecting:~p~n~p~n~p",
[Other, ClientId, Params, Headers]),
validate_params_and_headers(ClientState, ClientId) %% continue looping
end
after
5000 ->
case ClientState =:= undefined of
true -> error("client_was_never_connected");
false -> error("terminate_action_is_not_received_in_time")
end
end.
t_check_hooked(_) ->
{ok, Rules} = application:get_env(emqx_web_hook, rules),
lists:foreach(fun({HookName, _Action}) ->
Hooks = ?HOOK_LOOKUP(HookName),
?assertEqual(true, length(Hooks) > 0)
end, Rules).
t_change_config(_) ->
{ok, Rules} = application:get_env(emqx_web_hook, rules),
emqx_web_hook:unload(),
HookRules = lists:keydelete("message.delivered", 1, Rules),
application:set_env(emqx_web_hook, rules, HookRules),
emqx_web_hook:load(),
?assertEqual([], ?HOOK_LOOKUP("message.delivered")),
emqx_web_hook:unload(),
application:set_env(emqx_web_hook, rules, Rules),
emqx_web_hook:load().
%%--------------------------------------------------------------------
%% Utils
%%--------------------------------------------------------------------
validate_hook_headers(Headers) ->
?assertEqual(<<"K1">>, maps:get(<<"k1">>, Headers)),
?assertEqual(<<"K2">>, maps:get(<<"k2">>, Headers)).
validate_hook_resp(ClientId, Body = ?ACTION(<<"client_connect">>)) ->
assert_username_clientid(ClientId, Body),
?assertEqual(5, maps:get(<<"proto_ver">>, Body)),
?assertEqual(60, maps:get(<<"keepalive">>, Body)),
?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)),
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
ok;
validate_hook_resp(ClientId, Body = ?ACTION(<<"client_connack">>)) ->
assert_username_clientid(ClientId, Body),
?assertEqual(5, maps:get(<<"proto_ver">>, Body)),
?assertEqual(60, maps:get(<<"keepalive">>, Body)),
?assertEqual(<<"success">>, maps:get(<<"conn_ack">>, Body)),
?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)),
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
ok;
validate_hook_resp(ClientId, Body = ?ACTION(<<"client_connected">>)) ->
assert_username_clientid(ClientId, Body),
_ = maps:get(<<"connected_at">>, Body),
?assertEqual(5, maps:get(<<"proto_ver">>, Body)),
?assertEqual(60, maps:get(<<"keepalive">>, Body)),
?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)),
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body));
validate_hook_resp(ClientId, Body = ?ACTION(<<"client_disconnected">>)) ->
assert_username_clientid(ClientId, Body),
?assertEqual(<<"normal">>, maps:get(<<"reason">>, Body)),
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body));
validate_hook_resp(ClientId, Body = ?ACTION(<<"client_subscribe">>)) ->
assert_username_clientid(ClientId, Body),
_ = maps:get(<<"opts">>, Body),
?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body));
validate_hook_resp(ClientId, Body = ?ACTION(<<"client_unsubscribe">>)) ->
assert_username_clientid(ClientId, Body),
_ = maps:get(<<"opts">>, Body),
?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body));
validate_hook_resp(ClientId, Body = ?ACTION(<<"session_subscribed">>)) ->
assert_username_clientid(ClientId, Body),
_ = maps:get(<<"opts">>, Body),
?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body));
validate_hook_resp(ClientId, Body = ?ACTION(<<"session_unsubscribed">>)) ->
assert_username_clientid(ClientId, Body),
?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body));
validate_hook_resp(ClientId, Body = ?ACTION(<<"session_terminated">>)) ->
assert_username_clientid(ClientId, Body),
?assertEqual(<<"normal">>, maps:get(<<"reason">>, Body)),
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body));
validate_hook_resp(_ClientId, Body = ?ACTION(<<"message_publish">>)) ->
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
assert_messages_attrs(Body);
validate_hook_resp(_ClientId, Body = ?ACTION(<<"message_delivered">>)) ->
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
assert_messages_attrs(Body);
validate_hook_resp(_ClientId, Body = ?ACTION(<<"message_acked">>)) ->
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
assert_messages_attrs(Body).
assert_username_clientid(ClientId, #{<<"clientid">> := ClientId, <<"username">> := Username}) ->
?assertEqual(null, Username);
assert_username_clientid(_ClientId, #{<<"clientid">> := Other}) ->
throw({unknown_client, Other}).
assert_messages_attrs(#{ <<"ts">> := _
, <<"qos">> := _
, <<"topic">> := _
, <<"retain">> := _
, <<"payload">> := _
, <<"from_username">> := _
, <<"from_client_id">> := _
}) ->
ok.
apps() ->
[emqx_web_hook, emqx_modules, emqx_management, emqx_rule_engine].

View File

@ -1,19 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIDAzCCAeugAwIBAgIBATANBgkqhkiG9w0BAQsFADA8MTowOAYDVQQDDDFNeVNR
TF9TZXJ2ZXJfOC4wLjE5X0F1dG9fR2VuZXJhdGVkX0NBX0NlcnRpZmljYXRlMB4X
DTIwMDYxMTAzMzg0NloXDTMwMDYwOTAzMzg0NlowPDE6MDgGA1UEAwwxTXlTUUxf
U2VydmVyXzguMC4xOV9BdXRvX0dlbmVyYXRlZF9DQV9DZXJ0aWZpY2F0ZTCCASIw
DQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANJBlAYvTQ6euY4HcSn4syH7kq9s
KcG+OMjPUrj+KFEElCzgNuIhaS0f3ORQGB1PNcvVcfdXUI3WX332gWbr9s1b7Xl1
JKJfDXs+26Cm6NhONTE3sPHnbTSmQEFb52hwAtjQmcY3IQs1AgxKFFHJfnCBEWfE
ePBQaiuYk1XDESMdWpMLrPnYQaj9MpAOUxjlmZCayzPWlF0j0IWvfsF5TqZL7tFK
9p5F/DzyZ4n1mqPVEoUmq5ZdSKj2TQkpWTMHBWHEDQQqXbyE1FGJR7zEUFeuG1KT
sVBg7iZEC93SygZTbgUZSQXIwQCsO6xZ8MB2XDJkPbWp/3Wc6c8I6P09F48CAwEA
AaMQMA4wDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEADKz6bIpP5anp
GgLB0jkclRWuMlS4qqIt4itSsMXPJ/ezpHwECixmgW2TIQl6S1woRkUeMxhT2/Ay
Sn/7aKxuzRagyE5NEGOvrOuAP5RO2ZdNJ/X3/Rh533fK1sOTEEbSsWUvW6iSkZef
rsfZBVP32xBhRWkKRdLeLB4W99ADMa0IrTmZPCXHSSE2V4e1o6zWLXcOZeH1Qh8N
SkelBweR+8r1Fbvy1r3s7eH7DCbYoGEDVLQGOLvzHKBisQHmoDnnF5E9g1eeNRdg
o+vhOKfYCOzeNREJIqS42PHcGhdNRk90ycigPmfUJclz1mDHoMjKR2S5oosTpr65
tNPx3CL7GA==
-----END CERTIFICATE-----

View File

@ -1,19 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIDBDCCAeygAwIBAgIBAzANBgkqhkiG9w0BAQsFADA8MTowOAYDVQQDDDFNeVNR
TF9TZXJ2ZXJfOC4wLjE5X0F1dG9fR2VuZXJhdGVkX0NBX0NlcnRpZmljYXRlMB4X
DTIwMDYxMTAzMzg0N1oXDTMwMDYwOTAzMzg0N1owQDE+MDwGA1UEAww1TXlTUUxf
U2VydmVyXzguMC4xOV9BdXRvX0dlbmVyYXRlZF9DbGllbnRfQ2VydGlmaWNhdGUw
ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDVYSWpOvCTupz82fc85Opv
EQ7rkB8X2oOMyBCpkyHKBIr1ZQgRDWBp9UVOASq3GnSElm6+T3Kb1QbOffa8GIlw
sjAueKdq5L2eSkmPIEQ7eoO5kEW+4V866hE1LeL/PmHg2lGP0iqZiJYtElhHNQO8
3y9I7cm3xWMAA3SSWikVtpJRn3qIp2QSrH+tK+/HHbE5QwtPxdir4ULSCSOaM5Yh
Wi5Oto88TZqe1v7SXC864JVvO4LuS7TuSreCdWZyPXTJFBFeCEWSAxonKZrqHbBe
CwKML6/0NuzjaQ51c2tzmVI6xpHj3nnu4cSRx6Jf9WBm+35vm0wk4pohX3ptdzeV
AgMBAAGjDTALMAkGA1UdEwQCMAAwDQYJKoZIhvcNAQELBQADggEBAByQ5zSNeFUH
Aw7JlpZHtHaSEeiiyBHke20ziQ07BK1yi/ms2HAWwQkpZv149sjNuIRH8pkTmkZn
g8PDzSefjLbC9AsWpWV0XNV22T/cdobqLqMBDDZ2+5bsV+jTrOigWd9/AHVZ93PP
IJN8HJn6rtvo2l1bh/CdsX14uVSdofXnuWGabNTydqtMvmCerZsdf6qKqLL+PYwm
RDpgWiRUY7KPBSSlKm/9lJzA+bOe4dHeJzxWFVCJcbpoiTFs1je1V8kKQaHtuW39
ifX6LTKUMlwEECCbDKM8Yq2tm8NjkjCcnFDtKg8zKGPUu+jrFMN5otiC3wnKcP7r
O9EkaPcgYH8=
-----END CERTIFICATE-----

View File

@ -1,27 +0,0 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEA1WElqTrwk7qc/Nn3POTqbxEO65AfF9qDjMgQqZMhygSK9WUI
EQ1gafVFTgEqtxp0hJZuvk9ym9UGzn32vBiJcLIwLninauS9nkpJjyBEO3qDuZBF
vuFfOuoRNS3i/z5h4NpRj9IqmYiWLRJYRzUDvN8vSO3Jt8VjAAN0klopFbaSUZ96
iKdkEqx/rSvvxx2xOUMLT8XYq+FC0gkjmjOWIVouTraPPE2antb+0lwvOuCVbzuC
7ku07kq3gnVmcj10yRQRXghFkgMaJyma6h2wXgsCjC+v9Dbs42kOdXNrc5lSOsaR
49557uHEkceiX/VgZvt+b5tMJOKaIV96bXc3lQIDAQABAoIBAF7yjXmSOn7h6P0y
WCuGiTLG2mbDiLJqj2LTm2Z5i+2Cu/qZ7E76Ls63TxF4v3MemH5vGfQhEhR5ZD/6
GRJ1sKKvB3WGRqjwA9gtojHH39S/nWGy6vYW/vMOOH37XyjIr3EIdIaUtFQBTSHd
Kd71niYrAbVn6fyWHolhADwnVmTMOl5OOAhCdEF4GN3b5aIhIu8BJ7EUzTtHBJIj
CAEfjZFjDs1y1cIgGFJkuIQxMfCpq5recU2qwip7YO6fk//WEjOPu7kSf5IEswL8
jg1dea9rGBV6KaD2xsgsC6Ll6Sb4BbsrHMfflG3K2Lk3RdVqqTFp1Fn1PTLQE/1S
S/SZPYECgYEA9qYcHKHd0+Q5Ty5wgpxKGa4UCWkpwvfvyv4bh8qlmxueB+l2AIdo
ZvkM8gTPagPQ3WypAyC2b9iQu70uOJo1NizTtKnpjDdN1YpDjISJuS/P0x73gZwy
gmoM5AzMtN4D6IbxXtXnPaYICvwLKU80ouEN5ZPM4/ODLUu6gsp0v2UCgYEA3Xgi
zMC4JF0vEKEaK0H6QstaoXUmw/lToZGH3TEojBIkb/2LrHUclygtONh9kJSFb89/
jbmRRLAOrx3HZKCNGUmF4H9k5OQyAIv6OGBinvLGqcbqnyNlI+Le8zxySYwKMlEj
EMrBCLmSyi0CGFrbZ3mlj/oCET/ql9rNvcK+DHECgYAEx5dH3sMjtgp+RFId1dWB
xePRgt4yTwewkVgLO5wV82UOljGZNQaK6Eyd7AXw8f38LHzh+KJQbIvxd2sL4cEi
OaAoohpKg0/Y0YMZl//rPMf0OWdmdZZs/I0fZjgZUSwWN3c59T8z7KG/RL8an9RP
S7kvN7wCttdV61/D5RR6GQKBgDxCe/WKWpBKaovzydMLWLTj7/0Oi0W3iXHkzzr4
LTgvl4qBSofaNbVLUUKuZTv5rXUG2IYPf99YqCYtzBstNDc1MiAriaBeFtzfOW4t
i6gEFtoLLbuvPc3N5Sv5vn8Ug5G9UfU3td5R4AbyyCcoUZqOFuZd+EIJSiOXfXOs
kVmBAoGBAIU9aPAqhU5LX902oq8KsrpdySONqv5mtoStvl3wo95WIqXNEsFY60wO
q02jKQmJJ2MqhkJm2EoF2Mq8+40EZ5sz8LdgeQ/M0yQ9lAhPi4rftwhpe55Ma9dk
SE9X1c/DMCBEaIjJqVXdy0/EeArwpb8sHkguVVAZUWxzD+phm1gs
-----END RSA PRIVATE KEY-----

View File

@ -1,19 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIDBDCCAeygAwIBAgIBAjANBgkqhkiG9w0BAQsFADA8MTowOAYDVQQDDDFNeVNR
TF9TZXJ2ZXJfOC4wLjE5X0F1dG9fR2VuZXJhdGVkX0NBX0NlcnRpZmljYXRlMB4X
DTIwMDYxMTAzMzg0NloXDTMwMDYwOTAzMzg0NlowQDE+MDwGA1UEAww1TXlTUUxf
U2VydmVyXzguMC4xOV9BdXRvX0dlbmVyYXRlZF9TZXJ2ZXJfQ2VydGlmaWNhdGUw
ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCcEnEm5hqP1EbEJycOz8Ua
NWp29QdpFUzTWhkKGhVXk+0msmNTw4NBAFB42moY44OU8wvDideOlJNhPRWveD8z
G2lxzJA91p0UK4et8ia9MmeuCGhdC9jxJ8X69WNlUiPyy0hI/ZsqRq9Z0C2eW0iL
JPXsy4X8Xpw3SFwoXf5pR9RFY5Pb2tuyxqmSestu2VXT/NQjJg4CVDR3mFcHPXZB
4elRzH0WshExEGkgy0bg20MJeRc2Qdb5Xx+EakbmwroDWaCn3NSGqQ7jv6Vw0doy
TGvS6h6RHBxnyqRfRgKGlCoOMG9/5+rFJC00QpCUG2vHXHWGoWlMlJ3foN7rj5v9
AgMBAAGjDTALMAkGA1UdEwQCMAAwDQYJKoZIhvcNAQELBQADggEBAJ5zt2rj4Ag6
zpN59AWC1Fur8g8l41ksHkSpKPp+PtyO/ngvbMqBpfmK1e7JCKZv/68QXfMyWWAI
hwalqZkXXWHKjuz3wE7dE25PXFXtGJtcZAaj10xt98fzdqt8lQSwh2kbfNwZIz1F
sgAStgE7+ZTcqTgvNB76Os1UK0to+/P0VBWktaVFdyub4Nc2SdPVnZNvrRBXBwOD
3V8ViwywDOFoE7DvCvwx/SVsvoC0Z4j3AMMovO6oHicP7uU83qsQgm1Qru3YeoLR
+DoVi7IPHbWvN7MqFYn3YjNlByO2geblY7MR0BlqbFlmFrqLsUfjsh2ys7/U/knC
dN/klu446fI=
-----END CERTIFICATE-----

View File

@ -1,27 +0,0 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAnBJxJuYaj9RGxCcnDs/FGjVqdvUHaRVM01oZChoVV5PtJrJj
U8ODQQBQeNpqGOODlPMLw4nXjpSTYT0Vr3g/MxtpccyQPdadFCuHrfImvTJnrgho
XQvY8SfF+vVjZVIj8stISP2bKkavWdAtnltIiyT17MuF/F6cN0hcKF3+aUfURWOT
29rbssapknrLbtlV0/zUIyYOAlQ0d5hXBz12QeHpUcx9FrIRMRBpIMtG4NtDCXkX
NkHW+V8fhGpG5sK6A1mgp9zUhqkO47+lcNHaMkxr0uoekRwcZ8qkX0YChpQqDjBv
f+fqxSQtNEKQlBtrx1x1hqFpTJSd36De64+b/QIDAQABAoIBAFiah66Dt9SruLkn
WR8piUaFyLlcBib8Nq9OWSTJBhDAJERxxb4KIvvGB+l0ZgNXNp5bFPSfzsZdRwZP
PX5uj8Kd71Dxx3mz211WESMJdEC42u+MSmN4lGLkJ5t/sDwXU91E1vbJM0ve8THV
4/Ag9qA4DX2vVZOeyqT/6YHpSsPNZplqzrbAiwrfHwkctHfgqwOf3QLfhmVQgfCS
VwidBldEUv2whSIiIxh4Rv5St4kA68IBCbJxdpOpyuQBkk6CkxZ7VN9FqOuSd4Pk
Wm7iWyBMZsCmELZh5XAXld4BEt87C5R4CvbPBDZxAv3THk1DNNvpy3PFQfwARRFb
SAToYMECgYEAyL7U8yxpzHDYWd3oCx6vTi9p9N/z0FfAkWrRF6dm4UcSklNiT1Aq
EOnTA+SaW8tV3E64gCWcY23gNP8so/ZseWj6L+peHwtchaP9+KB7yGw2A+05+lOx
VetLTjAOmfpiUXFe5w1q4C1RGhLjZjjzW+GvwdAuchQgUEFaomrV+PUCgYEAxwfH
cmVGFbAktcjU4HSRjKSfawCrut+3YUOLybyku3Q/hP9amG8qkVTFe95CTLjLe2D0
ccaTTpofFEJ32COeck0g0Ujn/qQ+KXRoauOYs4FB1DtqMpqB78wufWEUpDpbd9/h
J+gJdC/IADd4tJW9zA92g8IA7ZtFmqDtiSpQ0ekCgYAQGkaorvJZpN+l7cf0RGTZ
h7IfI2vCVZer0n6tQA9fmLzjoe6r4AlPzAHSOR8sp9XeUy43kUzHKQQoHCPvjw/K
eWJAP7OHF/k2+x2fOPhU7mEy1W+mJdp+wt4Kio5RSaVjVQ3AyPG+w8PSrJszEvRq
dWMMz+851WV2KpfjmWBKlQKBgQC++4j4DZQV5aMkSKV1CIZOBf3vaIJhXKEUFQPD
PmB4fBEjpwCg+zNGp6iktt65zi17o8qMjrb1mtCt2SY04eD932LZUHNFlwcLMmes
Ad+aiDLJ24WJL1f16eDGcOyktlblDZB5gZ/ovJzXEGOkLXglosTfo77OQculmDy2
/UL2WQKBgGeKasmGNfiYAcWio+KXgFkHXWtAXB9B91B1OFnCa40wx+qnl71MIWQH
PQ/CZFNWOfGiNEJIZjrHsfNJoeXkhq48oKcT0AVCDYyLV0VxDO4ejT95mGW6njNd
JpvmhwwAjOvuWVr0tn4iXlSK8irjlJHmwcRjLTJq97vE9fsA2MjI
-----END RSA PRIVATE KEY-----

View File

@ -1,102 +0,0 @@
%%--------------------------------------------------------------------
%% A Simple HTTP Server based cowboy
%%
%% It will deliver the http-request params to initialer process
%%--------------------------------------------------------------------
%%
%% Author:wwhai
%%
-module(http_server).
-behaviour(gen_server).
-export([start_link/3]).
-export([stop/1]).
-export([code_change/3, handle_call/3, handle_cast/2, handle_info/2, init/1, init/2, terminate/2]).
-record(state, {parent :: pid()}).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
start_link(Parent, BasePort, Opts) ->
stop_http(),
stop_https(),
timer:sleep(100),
gen_server:start_link(?MODULE, {Parent, BasePort, Opts}, []).
init({Parent, BasePort, Opts}) ->
ok = start_http(Parent, [{port, BasePort} | Opts]),
ok = start_https(Parent, [{port, BasePort + 1} | Opts]),
Parent ! {self(), ready},
{ok, #state{parent = Parent}}.
handle_call(_Request, _From, State) ->
{reply, ignored, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
stop_http(),
stop_https().
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
stop(Pid) ->
ok = gen_server:stop(Pid).
%%--------------------------------------------------------------------
%% Callbacks
%%--------------------------------------------------------------------
start_http(Parent, Opts) ->
{ok, _Pid1} = cowboy:start_clear(http, Opts, #{
env => #{dispatch => compile_router(Parent)}
}),
Port = proplists:get_value(port, Opts),
io:format(standard_error, "[TEST LOG] Start http server on ~p successfully!~n", [Port]).
start_https(Parent, Opts) ->
Path = emqx_ct_helpers:deps_path(emqx_web_hook, "test/emqx_web_hook_SUITE_data/"),
SslOpts = [{keyfile, Path ++ "/server-key.pem"},
{cacertfile, Path ++ "/ca.pem"},
{certfile, Path ++ "/server-cert.pem"}],
{ok, _Pid2} = cowboy:start_tls(https, Opts ++ SslOpts,
#{env => #{dispatch => compile_router(Parent)}}),
Port = proplists:get_value(port, Opts),
io:format(standard_error, "[TEST LOG] Start https server on ~p successfully!~n", [Port]).
stop_http() ->
cowboy:stop_listener(http),
io:format("[TEST LOG] Stopped http server").
stop_https() ->
cowboy:stop_listener(https),
io:format("[TEST LOG] Stopped https server").
compile_router(Parent) ->
{ok, _} = application:ensure_all_started(cowboy),
cowboy_router:compile([
{'_', [{"/", ?MODULE, #{parent => Parent}}]}
]).
init(Req, #{parent := Parent} = State) ->
Method = cowboy_req:method(Req),
Headers = cowboy_req:headers(Req),
[Params] = case Method of
<<"GET">> -> cowboy_req:parse_qs(Req);
<<"POST">> ->
{ok, PostVals, _} = cowboy_req:read_urlencoded_body(Req),
PostVals
end,
Parent ! {?MODULE, Params, Headers},
{ok, reply(Req, ok), State}.
reply(Req, ok) ->
cowboy_req:reply(200, #{<<"content-type">> => <<"text/plain">>}, <<"ok">>, Req);
reply(Req, error) ->
cowboy_req:reply(404, #{<<"content-type">> => <<"text/plain">>}, <<"deny">>, Req).

View File

@ -1,146 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(prop_webhook_confs).
-include_lib("proper/include/proper.hrl").
-import(emqx_ct_proper_types,
[ url/0
, nof/1
]).
-define(ALL(Vars, Types, Exprs),
?SETUP(fun() ->
State = do_setup(),
fun() -> do_teardown(State) end
end, ?FORALL(Vars, Types, Exprs))).
%%--------------------------------------------------------------------
%% Properties
%%--------------------------------------------------------------------
prop_confs() ->
Schema = cuttlefish_schema:files(filelib:wildcard(code:priv_dir(emqx_web_hook) ++ "/*.schema")),
?ALL({Url, Confs0}, {url(), confs()},
begin
Confs = [{"web.hook.url", Url}|Confs0],
Envs = cuttlefish_generator:map(Schema, cuttlefish_conf_file(Confs)),
assert_confs(Confs, Envs),
set_application_envs(Envs),
{ok, _} = application:ensure_all_started(emqx_web_hook),
application:stop(emqx_web_hook),
unset_application_envs(Envs),
true
end).
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
do_setup() ->
logger:set_primary_config(#{level => warning}),
emqx_ct_helpers:start_apps([], fun set_special_cfgs/1),
ok.
do_teardown(_) ->
emqx_ct_helpers:stop_apps([]),
logger:set_primary_config(#{level => info}),
ok.
set_special_cfgs(_) ->
application:set_env(emqx, plugins_loaded_file, undefined),
application:set_env(emqx, modules_loaded_file, undefined),
ok.
assert_confs([{"web.hook.url", Url}|More], Envs) ->
%% Assert!
Url = deep_get_env("emqx_web_hook.url", Envs),
assert_confs(More, Envs);
assert_confs([{"web.hook.rule." ++ HookName0, Spec}|More], Envs) ->
HookName = re:replace(HookName0, "\\.[0-9]", "", [{return, list}]),
Rules = deep_get_env("emqx_web_hook.rules", Envs),
%% Assert!
Spec = proplists:get_value(HookName, Rules),
assert_confs(More, Envs);
assert_confs([_|More], Envs) ->
assert_confs(More, Envs);
assert_confs([], _) ->
true.
deep_get_env(Path, Envs) ->
lists:foldl(
fun(_K, undefiend) -> undefiend;
(K, Acc) -> proplists:get_value(binary_to_atom(K, utf8), Acc)
end, Envs, re:split(Path, "\\.")).
set_application_envs(Envs) ->
application:set_env(Envs).
unset_application_envs(Envs) ->
lists:foreach(fun({App, Es}) ->
lists:foreach(fun({K, _}) ->
application:unset_env(App, K)
end, Es) end, Envs).
cuttlefish_conf_file(Ls) when is_list(Ls) ->
[cuttlefish_conf_option(K,V) || {K, V} <- Ls].
cuttlefish_conf_option(K, V)
when is_list(K) ->
{re:split(K, "[.]", [{return, list}]), V}.
%%--------------------------------------------------------------------
%% Generators
%%--------------------------------------------------------------------
confs() ->
nof([{"web.hook.headers.content-type",
oneof(["application/json"])},
{"web.hook.body.encoding_of_payload_field",
oneof(["plain", "base64", "base62"])},
{"web.hook.rule.client.connect.1", rule_spec()},
{"web.hook.rule.client.connack.1", rule_spec()},
{"web.hook.rule.client.connected.1", rule_spec()},
{"web.hook.rule.client.disconnected.1", rule_spec()},
{"web.hook.rule.client.subscribe.1", rule_spec()},
{"web.hook.rule.client.unsubscribe.1", rule_spec()},
{"web.hook.rule.session.subscribed.1", rule_spec()},
{"web.hook.rule.session.unsubscribed.1", rule_spec()},
{"web.hook.rule.session.terminated.1", rule_spec()},
{"web.hook.rule.message.publish.1", rule_spec()},
{"web.hook.rule.message.delivered.1", rule_spec()},
{"web.hook.rule.message.acked.1", rule_spec()}
]).
rule_spec() ->
?LET(Action, action_names(),
begin
binary_to_list(emqx_json:encode(#{action => Action}))
end).
action_names() ->
oneof([on_client_connect, on_client_connack, on_client_connected,
on_client_connected, on_client_disconnected, on_client_subscribe, on_client_unsubscribe,
on_session_subscribed, on_session_unsubscribed, on_session_terminated,
on_message_publish, on_message_delivered, on_message_acked]).

View File

@ -1,397 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(prop_webhook_hooks).
-include_lib("proper/include/proper.hrl").
-import(emqx_ct_proper_types,
[ conninfo/0
, clientinfo/0
, sessioninfo/0
, message/0
, connack_return_code/0
, topictab/0
, topic/0
, subopts/0
]).
-define(ALL(Vars, Types, Exprs),
?SETUP(fun() ->
State = do_setup(),
fun() -> do_teardown(State) end
end, ?FORALL(Vars, Types, Exprs))).
%%--------------------------------------------------------------------
%% Properties
%%--------------------------------------------------------------------
prop_client_connect() ->
?ALL({ConnInfo, ConnProps, Env},
{conninfo(), conn_properties(), empty_env()},
begin
ok = emqx_web_hook:on_client_connect(ConnInfo, ConnProps, Env),
Body = receive_http_request_body(),
Body = emqx_json:encode(
#{action => client_connect,
node => stringfy(node()),
clientid => maps:get(clientid, ConnInfo),
username => maybe(maps:get(username, ConnInfo)),
ipaddress => peer2addr(maps:get(peername, ConnInfo)),
keepalive => maps:get(keepalive, ConnInfo),
proto_ver => maps:get(proto_ver, ConnInfo)
}),
true
end).
prop_client_connack() ->
?ALL({ConnInfo, Rc, AckProps, Env},
{conninfo(), connack_return_code(), ack_properties(), empty_env()},
begin
ok = emqx_web_hook:on_client_connack(ConnInfo, Rc, AckProps, Env),
Body = receive_http_request_body(),
Body = emqx_json:encode(
#{action => client_connack,
node => stringfy(node()),
clientid => maps:get(clientid, ConnInfo),
username => maybe(maps:get(username, ConnInfo)),
ipaddress => peer2addr(maps:get(peername, ConnInfo)),
keepalive => maps:get(keepalive, ConnInfo),
proto_ver => maps:get(proto_ver, ConnInfo),
conn_ack => Rc
}),
true
end).
prop_client_connected() ->
?ALL({ClientInfo, ConnInfo, Env},
{clientinfo(), conninfo(), empty_env()},
begin
ok = emqx_web_hook:on_client_connected(ClientInfo, ConnInfo, Env),
Body = receive_http_request_body(),
Body = emqx_json:encode(
#{action => client_connected,
node => stringfy(node()),
clientid => maps:get(clientid, ClientInfo),
username => maybe(maps:get(username, ClientInfo)),
ipaddress => peer2addr(maps:get(peerhost, ClientInfo)),
keepalive => maps:get(keepalive, ConnInfo),
proto_ver => maps:get(proto_ver, ConnInfo),
connected_at => maps:get(connected_at, ConnInfo)
}),
true
end).
prop_client_disconnected() ->
?ALL({ClientInfo, Reason, ConnInfo, Env},
{clientinfo(), shutdown_reason(), disconnected_conninfo(), empty_env()},
begin
ok = emqx_web_hook:on_client_disconnected(ClientInfo, Reason, ConnInfo, Env),
Body = receive_http_request_body(),
Body = emqx_json:encode(
#{action => client_disconnected,
node => stringfy(node()),
clientid => maps:get(clientid, ClientInfo),
username => maybe(maps:get(username, ClientInfo)),
disconnected_at => maps:get(disconnected_at, ConnInfo),
reason => stringfy(Reason)
}),
true
end).
prop_client_subscribe() ->
?ALL({ClientInfo, SubProps, TopicTab, Env},
{clientinfo(), sub_properties(), topictab(), topic_filter_env()},
begin
ok = emqx_web_hook:on_client_subscribe(ClientInfo, SubProps, TopicTab, Env),
Matched = filter_topictab(TopicTab, Env),
lists:foreach(fun({Topic, Opts}) ->
Body = receive_http_request_body(),
Body = emqx_json:encode(
#{action => client_subscribe,
node => stringfy(node()),
clientid => maps:get(clientid, ClientInfo),
username => maybe(maps:get(username, ClientInfo)),
topic => Topic,
opts => Opts})
end, Matched),
true
end).
prop_client_unsubscribe() ->
?ALL({ClientInfo, SubProps, TopicTab, Env},
{clientinfo(), unsub_properties(), topictab(), topic_filter_env()},
begin
ok = emqx_web_hook:on_client_unsubscribe(ClientInfo, SubProps, TopicTab, Env),
Matched = filter_topictab(TopicTab, Env),
lists:foreach(fun({Topic, Opts}) ->
Body = receive_http_request_body(),
Body = emqx_json:encode(
#{action => client_unsubscribe,
node => stringfy(node()),
clientid => maps:get(clientid, ClientInfo),
username => maybe(maps:get(username, ClientInfo)),
topic => Topic,
opts => Opts})
end, Matched),
true
end).
prop_session_subscribed() ->
?ALL({ClientInfo, Topic, SubOpts, Env},
{clientinfo(), topic(), subopts(), topic_filter_env()},
begin
ok = emqx_web_hook:on_session_subscribed(ClientInfo, Topic, SubOpts, Env),
filter_topic_match(Topic, Env) andalso begin
Body = receive_http_request_body(),
Body1 = emqx_json:encode(
#{action => session_subscribed,
node => stringfy(node()),
clientid => maps:get(clientid, ClientInfo),
username => maybe(maps:get(username, ClientInfo)),
topic => Topic,
opts => SubOpts
}),
Body = Body1
end,
true
end).
prop_session_unsubscribed() ->
?ALL({ClientInfo, Topic, SubOpts, Env},
{clientinfo(), topic(), subopts(), empty_env()},
begin
ok = emqx_web_hook:on_session_unsubscribed(ClientInfo, Topic, SubOpts, Env),
filter_topic_match(Topic, Env) andalso begin
Body = receive_http_request_body(),
Body = emqx_json:encode(
#{action => session_unsubscribed,
node => stringfy(node()),
clientid => maps:get(clientid, ClientInfo),
username => maybe(maps:get(username, ClientInfo)),
topic => Topic
})
end,
true
end).
prop_session_terminated() ->
?ALL({ClientInfo, Reason, SessInfo, Env},
{clientinfo(), shutdown_reason(), sessioninfo(), empty_env()},
begin
ok = emqx_web_hook:on_session_terminated(ClientInfo, Reason, SessInfo, Env),
Body = receive_http_request_body(),
Body = emqx_json:encode(
#{action => session_terminated,
node => stringfy(node()),
clientid => maps:get(clientid, ClientInfo),
username => maybe(maps:get(username, ClientInfo)),
reason => stringfy(Reason)
}),
true
end).
prop_message_publish() ->
?ALL({Msg, Env, Encode}, {message(), topic_filter_env(), payload_encode()},
begin
application:set_env(emqx_web_hook, encoding_of_payload_field, Encode),
{ok, Msg} = emqx_web_hook:on_message_publish(Msg, Env),
application:unset_env(emqx_web_hook, encoding_of_payload_field),
(not emqx_message:is_sys(Msg))
andalso filter_topic_match(emqx_message:topic(Msg), Env)
andalso begin
Body = receive_http_request_body(),
Body = emqx_json:encode(
#{action => message_publish,
node => stringfy(node()),
from_client_id => emqx_message:from(Msg),
from_username => maybe(emqx_message:get_header(username, Msg)),
topic => emqx_message:topic(Msg),
qos => emqx_message:qos(Msg),
retain => emqx_message:get_flag(retain, Msg),
payload => encode(emqx_message:payload(Msg), Encode),
ts => emqx_message:timestamp(Msg)
})
end,
true
end).
prop_message_delivered() ->
?ALL({ClientInfo, Msg, Env, Encode}, {clientinfo(), message(), topic_filter_env(), payload_encode()},
begin
application:set_env(emqx_web_hook, encoding_of_payload_field, Encode),
ok = emqx_web_hook:on_message_delivered(ClientInfo, Msg, Env),
application:unset_env(emqx_web_hook, encoding_of_payload_field),
(not emqx_message:is_sys(Msg))
andalso filter_topic_match(emqx_message:topic(Msg), Env)
andalso begin
Body = receive_http_request_body(),
Body = emqx_json:encode(
#{action => message_delivered,
node => stringfy(node()),
clientid => maps:get(clientid, ClientInfo),
username => maybe(maps:get(username, ClientInfo)),
from_client_id => emqx_message:from(Msg),
from_username => maybe(emqx_message:get_header(username, Msg)),
topic => emqx_message:topic(Msg),
qos => emqx_message:qos(Msg),
retain => emqx_message:get_flag(retain, Msg),
payload => encode(emqx_message:payload(Msg), Encode),
ts => emqx_message:timestamp(Msg)
})
end,
true
end).
prop_message_acked() ->
?ALL({ClientInfo, Msg, Env, Encode}, {clientinfo(), message(), empty_env(), payload_encode()},
begin
application:set_env(emqx_web_hook, encoding_of_payload_field, Encode),
ok = emqx_web_hook:on_message_acked(ClientInfo, Msg, Env),
application:unset_env(emqx_web_hook, encoding_of_payload_field),
(not emqx_message:is_sys(Msg))
andalso filter_topic_match(emqx_message:topic(Msg), Env)
andalso begin
Body = receive_http_request_body(),
Body = emqx_json:encode(
#{action => message_acked,
node => stringfy(node()),
clientid => maps:get(clientid, ClientInfo),
username => maybe(maps:get(username, ClientInfo)),
from_client_id => emqx_message:from(Msg),
from_username => maybe(emqx_message:get_header(username, Msg)),
topic => emqx_message:topic(Msg),
qos => emqx_message:qos(Msg),
retain => emqx_message:get_flag(retain, Msg),
payload => encode(emqx_message:payload(Msg), Encode),
ts => emqx_message:timestamp(Msg)
})
end,
true
end).
%%--------------------------------------------------------------------
%% Helper
%%--------------------------------------------------------------------
do_setup() ->
%% Pre-defined envs
application:set_env(emqx_web_hook, path, "path"),
application:set_env(emqx_web_hook, headers, []),
meck:new(ehttpc_pool, [passthrough, no_history]),
meck:expect(ehttpc_pool, pick_worker, fun(_, _) -> ok end),
Self = self(),
meck:new(ehttpc, [passthrough, no_history]),
meck:expect(ehttpc, request,
fun(_ClientId, Method, {Path, Headers, Body}) ->
Self ! {Method, Path, Headers, Body}, {ok, 200, ok}
end),
meck:new(emqx_metrics, [passthrough, no_history]),
meck:expect(emqx_metrics, inc, fun(_) -> ok end),
ok.
do_teardown(_) ->
meck:unload(ehttpc_pool),
meck:unload(ehttpc),
meck:unload(emqx_metrics).
maybe(undefined) -> null;
maybe(T) -> T.
peer2addr({Host, _}) ->
list_to_binary(inet:ntoa(Host));
peer2addr(Host) ->
list_to_binary(inet:ntoa(Host)).
stringfy({shutdown, Reason}) ->
stringfy(Reason);
stringfy(Term) when is_binary(Term) ->
Term;
stringfy(Term) when is_atom(Term) ->
atom_to_binary(Term, utf8);
stringfy(Term) ->
unicode:characters_to_binary(io_lib:format("~0p", [Term])).
receive_http_request_body() ->
receive
{post, _, _, Body} ->
Body
after 100 ->
exit(waiting_message_timeout)
end.
filter_topictab(TopicTab, {undefined}) ->
TopicTab;
filter_topictab(TopicTab, {TopicFilter}) ->
lists:filter(fun({Topic, _}) -> emqx_topic:match(Topic, TopicFilter) end, TopicTab).
filter_topic_match(_Topic, {undefined}) ->
true;
filter_topic_match(Topic, {TopicFilter}) ->
emqx_topic:match(Topic, TopicFilter).
encode(Bin, base64) ->
base64:encode(Bin);
encode(Bin, base62) ->
emqx_base62:encode(Bin);
encode(Bin, _) ->
Bin.
%%--------------------------------------------------------------------
%% Generators
%%--------------------------------------------------------------------
conn_properties() ->
#{}.
ack_properties() ->
#{}.
sub_properties() ->
#{}.
unsub_properties() ->
#{}.
shutdown_reason() ->
oneof([disconnected, not_autherised,
"list_reason", <<"binary_reason">>,
{tuple, reason},
{shutdown, emqx_ct_proper_types:limited_atom()}]).
empty_env() ->
{undefined}.
topic_filter_env() ->
oneof([{<<"#">>}, {undefined}, {topic()}]).
payload_encode() ->
oneof([base62, base64, plain]).
disconnected_conninfo() ->
?LET(Info, conninfo(),
begin
Info#{disconnected_at => erlang:system_time(millisecond)}
end).

View File

@ -286,8 +286,8 @@ relx_plugin_apps(ReleaseType) ->
, emqx_coap
, emqx_stomp
, emqx_authentication
, emqx_web_hook
, emqx_statsd
, emqx_rule_actions
]
++ relx_plugin_apps_per_rel(ReleaseType)
++ relx_plugin_apps_enterprise(is_enterprise())