424 lines
14 KiB
Erlang
424 lines
14 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_bridge_rabbitmq_SUITE).
|
|
|
|
-compile(nowarn_export_all).
|
|
-compile(export_all).
|
|
|
|
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("stdlib/include/assert.hrl").
|
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
|
|
|
%% See comment in
|
|
%% apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl for how to
|
|
%% run this without bringing up the whole CI infrastructure
|
|
-define(TYPE, <<"rabbitmq">>).
|
|
|
|
rabbit_mq_host() ->
|
|
<<"rabbitmq">>.
|
|
|
|
rabbit_mq_port() ->
|
|
5672.
|
|
|
|
rabbit_mq_exchange() ->
|
|
<<"messages">>.
|
|
|
|
rabbit_mq_queue() ->
|
|
<<"test_queue">>.
|
|
|
|
rabbit_mq_routing_key() ->
|
|
<<"test_routing_key">>.
|
|
|
|
get_channel_connection(Config) ->
|
|
proplists:get_value(channel_connection, Config).
|
|
|
|
get_rabbitmq(Config) ->
|
|
proplists:get_value(rabbitmq, Config).
|
|
|
|
get_tls(Config) ->
|
|
proplists:get_value(tls, Config).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Common Test Setup, Tear down and Testcase List
|
|
%%------------------------------------------------------------------------------
|
|
|
|
all() ->
|
|
[
|
|
{group, tcp},
|
|
{group, tls}
|
|
].
|
|
|
|
groups() ->
|
|
AllTCs = emqx_common_test_helpers:all(?MODULE),
|
|
[
|
|
{tcp, AllTCs},
|
|
{tls, AllTCs}
|
|
].
|
|
|
|
init_per_suite(Config) ->
|
|
Config.
|
|
|
|
end_per_suite(_Config) ->
|
|
ok.
|
|
|
|
init_per_group(tcp, Config) ->
|
|
RabbitMQHost = os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq"),
|
|
RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")),
|
|
case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of
|
|
true ->
|
|
Config1 = common_init_per_group(#{
|
|
host => RabbitMQHost, port => RabbitMQPort, tls => false
|
|
}),
|
|
Config1 ++ Config;
|
|
false ->
|
|
case os:getenv("IS_CI") of
|
|
"yes" ->
|
|
throw(no_rabbitmq);
|
|
_ ->
|
|
{skip, no_rabbitmq}
|
|
end
|
|
end;
|
|
init_per_group(tls, Config) ->
|
|
RabbitMQHost = os:getenv("RABBITMQ_TLS_HOST", "rabbitmq"),
|
|
RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_TLS_PORT", "5671")),
|
|
case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of
|
|
true ->
|
|
Config1 = common_init_per_group(#{
|
|
host => RabbitMQHost, port => RabbitMQPort, tls => true
|
|
}),
|
|
Config1 ++ Config;
|
|
false ->
|
|
case os:getenv("IS_CI") of
|
|
"yes" ->
|
|
throw(no_rabbitmq);
|
|
_ ->
|
|
{skip, no_rabbitmq}
|
|
end
|
|
end;
|
|
init_per_group(_Group, Config) ->
|
|
Config.
|
|
|
|
common_init_per_group(Opts) ->
|
|
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
|
|
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
|
|
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
|
|
{ok, _} = application:ensure_all_started(emqx_connector),
|
|
{ok, _} = application:ensure_all_started(amqp_client),
|
|
emqx_mgmt_api_test_util:init_suite(),
|
|
#{host := Host, port := Port, tls := UseTLS} = Opts,
|
|
ChannelConnection = setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS),
|
|
[
|
|
{channel_connection, ChannelConnection},
|
|
{rabbitmq, #{server => Host, port => Port}},
|
|
{tls, UseTLS}
|
|
].
|
|
|
|
setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS) ->
|
|
SSLOptions =
|
|
case UseTLS of
|
|
false -> none;
|
|
true -> emqx_tls_lib:to_client_opts(ssl_options(UseTLS))
|
|
end,
|
|
%% Create an exchange and a queue
|
|
{ok, Connection} =
|
|
amqp_connection:start(#amqp_params_network{
|
|
host = Host,
|
|
port = Port,
|
|
ssl_options = SSLOptions
|
|
}),
|
|
{ok, Channel} = amqp_connection:open_channel(Connection),
|
|
%% Create an exchange
|
|
#'exchange.declare_ok'{} =
|
|
amqp_channel:call(
|
|
Channel,
|
|
#'exchange.declare'{
|
|
exchange = rabbit_mq_exchange(),
|
|
type = <<"topic">>
|
|
}
|
|
),
|
|
%% Create a queue
|
|
#'queue.declare_ok'{} =
|
|
amqp_channel:call(
|
|
Channel,
|
|
#'queue.declare'{queue = rabbit_mq_queue()}
|
|
),
|
|
%% Bind the queue to the exchange
|
|
#'queue.bind_ok'{} =
|
|
amqp_channel:call(
|
|
Channel,
|
|
#'queue.bind'{
|
|
queue = rabbit_mq_queue(),
|
|
exchange = rabbit_mq_exchange(),
|
|
routing_key = rabbit_mq_routing_key()
|
|
}
|
|
),
|
|
#{
|
|
connection => Connection,
|
|
channel => Channel
|
|
}.
|
|
|
|
end_per_group(_Group, Config) ->
|
|
#{
|
|
connection := Connection,
|
|
channel := Channel
|
|
} = get_channel_connection(Config),
|
|
emqx_mgmt_api_test_util:end_suite(),
|
|
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
|
|
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
|
|
_ = application:stop(emqx_connector),
|
|
_ = application:stop(emqx_bridge),
|
|
%% Close the channel
|
|
ok = amqp_channel:close(Channel),
|
|
%% Close the connection
|
|
ok = amqp_connection:close(Connection).
|
|
|
|
init_per_testcase(_, Config) ->
|
|
Config.
|
|
|
|
end_per_testcase(_, _Config) ->
|
|
ok.
|
|
|
|
rabbitmq_config(UseTLS, Config) ->
|
|
BatchSize = maps:get(batch_size, Config, 1),
|
|
BatchTime = maps:get(batch_time_ms, Config, 0),
|
|
Name = atom_to_binary(?MODULE),
|
|
Server = maps:get(server, Config, rabbit_mq_host()),
|
|
Port = maps:get(port, Config, rabbit_mq_port()),
|
|
Template = maps:get(payload_template, Config, <<"">>),
|
|
ConfigString =
|
|
io_lib:format(
|
|
"bridges.rabbitmq.~s {\n"
|
|
" enable = true\n"
|
|
" ssl = ~s\n"
|
|
" server = \"~s\"\n"
|
|
" port = ~p\n"
|
|
" username = \"guest\"\n"
|
|
" password = \"guest\"\n"
|
|
" routing_key = \"~s\"\n"
|
|
" exchange = \"~s\"\n"
|
|
" payload_template = \"~s\"\n"
|
|
" resource_opts = {\n"
|
|
" batch_size = ~b\n"
|
|
" batch_time = ~bms\n"
|
|
" }\n"
|
|
"}\n",
|
|
[
|
|
Name,
|
|
hocon_pp:do(ssl_options(UseTLS), #{embedded => true}),
|
|
Server,
|
|
Port,
|
|
rabbit_mq_routing_key(),
|
|
rabbit_mq_exchange(),
|
|
Template,
|
|
BatchSize,
|
|
BatchTime
|
|
]
|
|
),
|
|
ct:pal(ConfigString),
|
|
parse_and_check(ConfigString, <<"rabbitmq">>, Name).
|
|
|
|
ssl_options(true) ->
|
|
CertsDir = filename:join([
|
|
emqx_common_test_helpers:proj_root(),
|
|
".ci",
|
|
"docker-compose-file",
|
|
"certs"
|
|
]),
|
|
#{
|
|
enable => true,
|
|
cacertfile => filename:join([CertsDir, "ca.crt"]),
|
|
certfile => filename:join([CertsDir, "client.pem"]),
|
|
keyfile => filename:join([CertsDir, "client.key"])
|
|
};
|
|
ssl_options(false) ->
|
|
#{
|
|
enable => false
|
|
}.
|
|
|
|
parse_and_check(ConfigString, BridgeType, Name) ->
|
|
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
|
|
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
|
|
#{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf,
|
|
RetConfig.
|
|
|
|
create_bridge(Name, UseTLS, Config) ->
|
|
BridgeConfig = rabbitmq_config(UseTLS, Config),
|
|
{ok, _} = emqx_bridge:create(?TYPE, Name, BridgeConfig),
|
|
emqx_bridge_resource:bridge_id(?TYPE, Name).
|
|
|
|
delete_bridge(Name) ->
|
|
ok = emqx_bridge:remove(?TYPE, Name).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Test Cases
|
|
%%------------------------------------------------------------------------------
|
|
|
|
t_create_delete_bridge(Config) ->
|
|
Name = atom_to_binary(?FUNCTION_NAME),
|
|
RabbitMQ = get_rabbitmq(Config),
|
|
UseTLS = get_tls(Config),
|
|
create_bridge(Name, UseTLS, RabbitMQ),
|
|
Bridges = emqx_bridge:list(),
|
|
Any = fun(#{name := BName}) -> BName =:= Name end,
|
|
?assert(lists:any(Any, Bridges), Bridges),
|
|
ok = delete_bridge(Name),
|
|
BridgesAfterDelete = emqx_bridge:list(),
|
|
?assertNot(lists:any(Any, BridgesAfterDelete), BridgesAfterDelete),
|
|
ok.
|
|
|
|
t_create_delete_bridge_non_existing_server(Config) ->
|
|
Name = atom_to_binary(?FUNCTION_NAME),
|
|
UseTLS = get_tls(Config),
|
|
create_bridge(Name, UseTLS, #{server => <<"non_existing_server">>, port => 3174}),
|
|
%% Check that the new bridge is in the list of bridges
|
|
Bridges = emqx_bridge:list(),
|
|
Any = fun(#{name := BName}) -> BName =:= Name end,
|
|
?assert(lists:any(Any, Bridges)),
|
|
ok = delete_bridge(Name),
|
|
BridgesAfterDelete = emqx_bridge:list(),
|
|
?assertNot(lists:any(Any, BridgesAfterDelete)),
|
|
ok.
|
|
|
|
t_send_message_query(Config) ->
|
|
Name = atom_to_binary(?FUNCTION_NAME),
|
|
RabbitMQ = get_rabbitmq(Config),
|
|
UseTLS = get_tls(Config),
|
|
BridgeID = create_bridge(Name, UseTLS, RabbitMQ#{batch_size => 1}),
|
|
Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000},
|
|
%% This will use the SQL template included in the bridge
|
|
emqx_bridge:send_message(BridgeID, Payload),
|
|
%% Check that the data got to the database
|
|
?assertEqual(Payload, receive_simple_test_message(Config)),
|
|
ok = delete_bridge(Name),
|
|
ok.
|
|
|
|
t_send_message_query_with_template(Config) ->
|
|
Name = atom_to_binary(?FUNCTION_NAME),
|
|
RabbitMQ = get_rabbitmq(Config),
|
|
UseTLS = get_tls(Config),
|
|
BridgeID = create_bridge(Name, UseTLS, RabbitMQ#{
|
|
batch_size => 1,
|
|
payload_template =>
|
|
<<
|
|
"{"
|
|
" \\\"key\\\": ${key},"
|
|
" \\\"data\\\": \\\"${data}\\\","
|
|
" \\\"timestamp\\\": ${timestamp},"
|
|
" \\\"secret\\\": 42"
|
|
"}"
|
|
>>
|
|
}),
|
|
Payload = #{
|
|
<<"key">> => 7,
|
|
<<"data">> => <<"RabbitMQ">>,
|
|
<<"timestamp">> => 10000
|
|
},
|
|
emqx_bridge:send_message(BridgeID, Payload),
|
|
%% Check that the data got to the database
|
|
ExpectedResult = Payload#{
|
|
<<"secret">> => 42
|
|
},
|
|
?assertEqual(ExpectedResult, receive_simple_test_message(Config)),
|
|
ok = delete_bridge(Name),
|
|
ok.
|
|
|
|
t_send_simple_batch(Config) ->
|
|
Name = atom_to_binary(?FUNCTION_NAME),
|
|
RabbitMQ = get_rabbitmq(Config),
|
|
BridgeConf = RabbitMQ#{batch_size => 100},
|
|
UseTLS = get_tls(Config),
|
|
BridgeID = create_bridge(Name, UseTLS, BridgeConf),
|
|
Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000},
|
|
emqx_bridge:send_message(BridgeID, Payload),
|
|
?assertEqual(Payload, receive_simple_test_message(Config)),
|
|
ok = delete_bridge(Name),
|
|
ok.
|
|
|
|
t_send_simple_batch_with_template(Config) ->
|
|
Name = atom_to_binary(?FUNCTION_NAME),
|
|
RabbitMQ = get_rabbitmq(Config),
|
|
UseTLS = get_tls(Config),
|
|
BridgeConf =
|
|
RabbitMQ#{
|
|
batch_size => 100,
|
|
payload_template =>
|
|
<<
|
|
"{"
|
|
" \\\"key\\\": ${key},"
|
|
" \\\"data\\\": \\\"${data}\\\","
|
|
" \\\"timestamp\\\": ${timestamp},"
|
|
" \\\"secret\\\": 42"
|
|
"}"
|
|
>>
|
|
},
|
|
BridgeID = create_bridge(Name, UseTLS, BridgeConf),
|
|
Payload = #{
|
|
<<"key">> => 7,
|
|
<<"data">> => <<"RabbitMQ">>,
|
|
<<"timestamp">> => 10000
|
|
},
|
|
emqx_bridge:send_message(BridgeID, Payload),
|
|
ExpectedResult = Payload#{
|
|
<<"secret">> => 42
|
|
},
|
|
?assertEqual(ExpectedResult, receive_simple_test_message(Config)),
|
|
ok = delete_bridge(Name),
|
|
ok.
|
|
|
|
t_heavy_batching(Config) ->
|
|
Name = atom_to_binary(?FUNCTION_NAME),
|
|
NumberOfMessages = 20000,
|
|
RabbitMQ = get_rabbitmq(Config),
|
|
UseTLS = get_tls(Config),
|
|
BridgeConf = RabbitMQ#{
|
|
batch_size => 10173,
|
|
batch_time_ms => 50
|
|
},
|
|
BridgeID = create_bridge(Name, UseTLS, BridgeConf),
|
|
SendMessage = fun(Key) ->
|
|
Payload = #{<<"key">> => Key},
|
|
emqx_bridge:send_message(BridgeID, Payload)
|
|
end,
|
|
[SendMessage(Key) || Key <- lists:seq(1, NumberOfMessages)],
|
|
AllMessages = lists:foldl(
|
|
fun(_, Acc) ->
|
|
Message = receive_simple_test_message(Config),
|
|
#{<<"key">> := Key} = Message,
|
|
Acc#{Key => true}
|
|
end,
|
|
#{},
|
|
lists:seq(1, NumberOfMessages)
|
|
),
|
|
?assertEqual(NumberOfMessages, maps:size(AllMessages)),
|
|
ok = delete_bridge(Name),
|
|
ok.
|
|
|
|
receive_simple_test_message(Config) ->
|
|
#{channel := Channel} = get_channel_connection(Config),
|
|
#'basic.consume_ok'{consumer_tag = ConsumerTag} =
|
|
amqp_channel:call(
|
|
Channel,
|
|
#'basic.consume'{
|
|
queue = rabbit_mq_queue()
|
|
}
|
|
),
|
|
receive
|
|
%% This is the first message received
|
|
#'basic.consume_ok'{} ->
|
|
ok
|
|
end,
|
|
receive
|
|
{#'basic.deliver'{delivery_tag = DeliveryTag}, Content} ->
|
|
%% Ack the message
|
|
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
|
|
%% Cancel the consumer
|
|
#'basic.cancel_ok'{consumer_tag = ConsumerTag} =
|
|
amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}),
|
|
emqx_utils_json:decode(Content#amqp_msg.payload)
|
|
after 5000 ->
|
|
?assert(false, "Did not receive message within 5 second")
|
|
end.
|