emqx/apps/emqx_web_hook/test/emqx_web_hook_SUITE.erl

218 lines
9.4 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_web_hook_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(HOOK_LOOKUP(H), emqx_hooks:lookup(list_to_atom(H))).
-define(ACTION(Name), #{<<"action">> := Name}).
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
all() ->
[{group, http},
{group, https},
{group, ipv6http},
{group, ipv6https}].
groups() ->
Cases = emqx_ct:all(?MODULE),
[{http, [sequence], Cases},
{https, [sequence], Cases},
{ipv6http, [sequence], Cases},
{ipv6https, [sequence], Cases}].
init_per_group(Name, Config) ->
set_special_cfgs(),
case Name of
http ->
emqx_ct_helpers:start_apps([emqx_web_hook], fun set_special_configs_http/1);
https ->
emqx_ct_helpers:start_apps([emqx_web_hook], fun set_special_configs_https/1);
ipv6http ->
emqx_ct_helpers:start_apps([emqx_web_hook], fun set_special_configs_ipv6_http/1);
ipv6https ->
emqx_ct_helpers:start_apps([emqx_web_hook], fun set_special_configs_ipv6_https/1)
end,
Config.
end_per_group(_Name, Config) ->
emqx_ct_helpers:stop_apps([emqx_web_hook]),
Config.
set_special_configs_http(_) ->
application:set_env(emqx_web_hook, url, "http://127.0.0.1:9999").
set_special_configs_https(_) ->
Path = emqx_ct_helpers:deps_path(emqx_web_hook, "test/emqx_web_hook_SUITE_data/"),
SslOpts = [{keyfile, Path ++ "/client-key.pem"},
{certfile, Path ++ "/client-cert.pem"},
{cafile, Path ++ "/ca.pem"}],
application:set_env(emqx_web_hook, ssl, true),
application:set_env(emqx_web_hook, ssloptions, SslOpts),
application:set_env(emqx_web_hook, url, "https://127.0.0.1:8888").
set_special_configs_ipv6_http(_) ->
application:set_env(emqx_web_hook, url, "http://[::1]:9999").
set_special_configs_ipv6_https(N) ->
set_special_configs_https(N),
application:set_env(emqx_web_hook, url, "https://[::1]:8888").
set_special_cfgs() ->
AllRules = [{"message.acked", "{\"action\": \"on_message_acked\"}"},
{"message.delivered", "{\"action\": \"on_message_delivered\"}"},
{"message.publish", "{\"action\": \"on_message_publish\"}"},
{"session.terminated", "{\"action\": \"on_session_terminated\"}"},
{"session.unsubscribed", "{\"action\": \"on_session_unsubscribed\"}"},
{"session.subscribed", "{\"action\": \"on_session_subscribed\"}"},
{"client.unsubscribe", "{\"action\": \"on_client_unsubscribe\"}"},
{"client.subscribe", "{\"action\": \"on_client_subscribe\"}"},
{"client.disconnected", "{\"action\": \"on_client_disconnected\"}"},
{"client.connected", "{\"action\": \"on_client_connected\"}"},
{"client.connack", "{\"action\": \"on_client_connack\"}"},
{"client.connect", "{\"action\": \"on_client_connect\"}"}],
application:set_env(emqx_web_hook, rules, AllRules).
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------
t_valid(Config) ->
{ok, ServerPid} = http_server:start_link(),
application:set_env(emqx_web_hook, headers, [{"k1","K1"}, {"k2", "K2"}]),
{ok, C} = emqtt:start_link([ {clientid, <<"simpleClient">>}
, {proto_ver, v5}
, {keepalive, 60}
]),
{ok, _} = emqtt:connect(C),
emqtt:subscribe(C, <<"TopicA">>, qos2),
emqtt:publish(C, <<"TopicA">>, <<"Payload...">>, qos2),
emqtt:unsubscribe(C, <<"TopicA">>),
emqtt:disconnect(C),
[begin
Maps = emqx_json:decode(P, [return_maps]),
validate_hook_resp(Maps),
validate_hook_headers(Headers)
end
|| {{P, _Bool}, Headers} <- http_server:get_received_data()],
http_server:stop(ServerPid),
Config.
t_check_hooked(_) ->
{ok, Rules} = application:get_env(emqx_web_hook, rules),
lists:foreach(fun({HookName, _Action}) ->
Hooks = ?HOOK_LOOKUP(HookName),
?assertEqual(true, length(Hooks) > 0)
end, Rules).
t_change_config(_) ->
{ok, Rules} = application:get_env(emqx_web_hook, rules),
emqx_web_hook:unload(),
HookRules = lists:keydelete("message.delivered", 1, Rules),
application:set_env(emqx_web_hook, rules, HookRules),
emqx_web_hook:load(),
?assertEqual([], ?HOOK_LOOKUP("message.delivered")),
emqx_web_hook:unload(),
application:set_env(emqx_web_hook, rules, Rules),
emqx_web_hook:load().
%%--------------------------------------------------------------------
%% Utils
%%--------------------------------------------------------------------
validate_hook_headers(Headers) ->
?assertEqual(<<"K1">>, maps:get(<<"k1">>, Headers)),
?assertEqual(<<"K2">>, maps:get(<<"k2">>, Headers)).
validate_hook_resp(Body = ?ACTION(<<"client_connect">>)) ->
?assertEqual(5, maps:get(<<"proto_ver">>, Body)),
?assertEqual(60, maps:get(<<"keepalive">>, Body)),
?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)),
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
assert_username_clientid(Body);
validate_hook_resp(Body = ?ACTION(<<"client_connack">>)) ->
?assertEqual(5, maps:get(<<"proto_ver">>, Body)),
?assertEqual(60, maps:get(<<"keepalive">>, Body)),
?assertEqual(<<"success">>, maps:get(<<"conn_ack">>, Body)),
?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)),
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
assert_username_clientid(Body);
validate_hook_resp(Body = ?ACTION(<<"client_connected">>)) ->
_ = maps:get(<<"connected_at">>, Body),
?assertEqual(5, maps:get(<<"proto_ver">>, Body)),
?assertEqual(60, maps:get(<<"keepalive">>, Body)),
?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)),
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
assert_username_clientid(Body);
validate_hook_resp(Body = ?ACTION(<<"client_disconnected">>)) ->
?assertEqual(<<"normal">>, maps:get(<<"reason">>, Body)),
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
assert_username_clientid(Body);
validate_hook_resp(Body = ?ACTION(<<"client_subscribe">>)) ->
_ = maps:get(<<"opts">>, Body),
?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
assert_username_clientid(Body);
validate_hook_resp(Body = ?ACTION(<<"client_unsubscribe">>)) ->
_ = maps:get(<<"opts">>, Body),
?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
assert_username_clientid(Body);
validate_hook_resp(Body = ?ACTION(<<"session_subscribed">>)) ->
_ = maps:get(<<"opts">>, Body),
?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
assert_username_clientid(Body);
validate_hook_resp(Body = ?ACTION(<<"session_unsubscribed">>)) ->
?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
assert_username_clientid(Body);
validate_hook_resp(Body = ?ACTION(<<"session_terminated">>)) ->
?assertEqual(<<"normal">>, maps:get(<<"reason">>, Body)),
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
assert_username_clientid(Body);
validate_hook_resp(Body = ?ACTION(<<"message_publish">>)) ->
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
assert_messages_attrs(Body);
validate_hook_resp(Body = ?ACTION(<<"message_delivered">>)) ->
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
assert_messages_attrs(Body);
validate_hook_resp(Body = ?ACTION(<<"message_acked">>)) ->
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
assert_messages_attrs(Body).
assert_username_clientid(#{<<"clientid">> := ClientId, <<"username">> := Username}) ->
?assertEqual(<<"simpleClient">>, ClientId),
?assertEqual(null, Username).
assert_messages_attrs(#{ <<"ts">> := _
, <<"qos">> := _
, <<"topic">> := _
, <<"retain">> := _
, <<"payload">> := _
, <<"from_username">> := _
, <<"from_client_id">> := _
}) ->
ok.