test: rafator rabbitmq test SUITE, delete redundance code

This commit is contained in:
zhongwencool 2024-02-08 12:37:45 +08:00
parent 90ba2977fe
commit a0f8e4f328
6 changed files with 545 additions and 750 deletions

View File

@ -101,6 +101,7 @@ fields(action_parameters) ->
hoconsc:mk(
binary(),
#{
default => <<"">>,
desc => ?DESC(?CONNECTOR_SCHEMA, "payload_template")
}
)}

View File

@ -1,423 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_rabbitmq_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("stdlib/include/assert.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
%% See comment in
%% apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl for how to
%% run this without bringing up the whole CI infrastructure
-define(TYPE, <<"rabbitmq">>).
rabbit_mq_host() ->
<<"rabbitmq">>.
rabbit_mq_port() ->
5672.
rabbit_mq_exchange() ->
<<"messages">>.
rabbit_mq_queue() ->
<<"test_queue">>.
rabbit_mq_routing_key() ->
<<"test_routing_key">>.
get_channel_connection(Config) ->
proplists:get_value(channel_connection, Config).
get_rabbitmq(Config) ->
proplists:get_value(rabbitmq, Config).
get_tls(Config) ->
proplists:get_value(tls, Config).
%%------------------------------------------------------------------------------
%% Common Test Setup, Tear down and Testcase List
%%------------------------------------------------------------------------------
all() ->
[
{group, tcp},
{group, tls}
].
groups() ->
AllTCs = emqx_common_test_helpers:all(?MODULE),
[
{tcp, AllTCs},
{tls, AllTCs}
].
init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
ok.
init_per_group(tcp, Config) ->
RabbitMQHost = os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq"),
RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")),
case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of
true ->
Config1 = common_init_per_group(#{
host => RabbitMQHost, port => RabbitMQPort, tls => false
}),
Config1 ++ Config;
false ->
case os:getenv("IS_CI") of
"yes" ->
throw(no_rabbitmq);
_ ->
{skip, no_rabbitmq}
end
end;
init_per_group(tls, Config) ->
RabbitMQHost = os:getenv("RABBITMQ_TLS_HOST", "rabbitmq"),
RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_TLS_PORT", "5671")),
case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of
true ->
Config1 = common_init_per_group(#{
host => RabbitMQHost, port => RabbitMQPort, tls => true
}),
Config1 ++ Config;
false ->
case os:getenv("IS_CI") of
"yes" ->
throw(no_rabbitmq);
_ ->
{skip, no_rabbitmq}
end
end;
init_per_group(_Group, Config) ->
Config.
common_init_per_group(Opts) ->
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
{ok, _} = application:ensure_all_started(emqx_connector),
{ok, _} = application:ensure_all_started(amqp_client),
emqx_mgmt_api_test_util:init_suite(),
#{host := Host, port := Port, tls := UseTLS} = Opts,
ChannelConnection = setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS),
[
{channel_connection, ChannelConnection},
{rabbitmq, #{server => Host, port => Port}},
{tls, UseTLS}
].
setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS) ->
SSLOptions =
case UseTLS of
false -> none;
true -> emqx_tls_lib:to_client_opts(ssl_options(UseTLS))
end,
%% Create an exchange and a queue
{ok, Connection} =
amqp_connection:start(#amqp_params_network{
host = Host,
port = Port,
ssl_options = SSLOptions
}),
{ok, Channel} = amqp_connection:open_channel(Connection),
%% Create an exchange
#'exchange.declare_ok'{} =
amqp_channel:call(
Channel,
#'exchange.declare'{
exchange = rabbit_mq_exchange(),
type = <<"topic">>
}
),
%% Create a queue
#'queue.declare_ok'{} =
amqp_channel:call(
Channel,
#'queue.declare'{queue = rabbit_mq_queue()}
),
%% Bind the queue to the exchange
#'queue.bind_ok'{} =
amqp_channel:call(
Channel,
#'queue.bind'{
queue = rabbit_mq_queue(),
exchange = rabbit_mq_exchange(),
routing_key = rabbit_mq_routing_key()
}
),
#{
connection => Connection,
channel => Channel
}.
end_per_group(_Group, Config) ->
#{
connection := Connection,
channel := Channel
} = get_channel_connection(Config),
emqx_mgmt_api_test_util:end_suite(),
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
_ = application:stop(emqx_connector),
_ = application:stop(emqx_bridge),
%% Close the channel
ok = amqp_channel:close(Channel),
%% Close the connection
ok = amqp_connection:close(Connection).
init_per_testcase(_, Config) ->
Config.
end_per_testcase(_, _Config) ->
ok.
rabbitmq_config(UseTLS, Config) ->
BatchSize = maps:get(batch_size, Config, 1),
BatchTime = maps:get(batch_time_ms, Config, 0),
Name = atom_to_binary(?MODULE),
Server = maps:get(server, Config, rabbit_mq_host()),
Port = maps:get(port, Config, rabbit_mq_port()),
Template = maps:get(payload_template, Config, <<"">>),
ConfigString =
io_lib:format(
"bridges.rabbitmq.~s {\n"
" enable = true\n"
" ssl = ~s\n"
" server = \"~s\"\n"
" port = ~p\n"
" username = \"guest\"\n"
" password = \"guest\"\n"
" routing_key = \"~s\"\n"
" exchange = \"~s\"\n"
" payload_template = \"~s\"\n"
" resource_opts = {\n"
" batch_size = ~b\n"
" batch_time = ~bms\n"
" }\n"
"}\n",
[
Name,
hocon_pp:do(ssl_options(UseTLS), #{embedded => true}),
Server,
Port,
rabbit_mq_routing_key(),
rabbit_mq_exchange(),
Template,
BatchSize,
BatchTime
]
),
ct:pal(ConfigString),
parse_and_check(ConfigString, <<"rabbitmq">>, Name).
ssl_options(true) ->
CertsDir = filename:join([
emqx_common_test_helpers:proj_root(),
".ci",
"docker-compose-file",
"certs"
]),
#{
enable => true,
cacertfile => filename:join([CertsDir, "ca.crt"]),
certfile => filename:join([CertsDir, "client.pem"]),
keyfile => filename:join([CertsDir, "client.key"])
};
ssl_options(false) ->
#{
enable => false
}.
parse_and_check(ConfigString, BridgeType, Name) ->
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
#{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf,
RetConfig.
create_bridge(Name, UseTLS, Config) ->
BridgeConfig = rabbitmq_config(UseTLS, Config),
{ok, _} = emqx_bridge:create(?TYPE, Name, BridgeConfig),
emqx_bridge_resource:bridge_id(?TYPE, Name).
delete_bridge(Name) ->
ok = emqx_bridge:remove(?TYPE, Name).
%%------------------------------------------------------------------------------
%% Test Cases
%%------------------------------------------------------------------------------
t_create_delete_bridge(Config) ->
Name = atom_to_binary(?FUNCTION_NAME),
RabbitMQ = get_rabbitmq(Config),
UseTLS = get_tls(Config),
create_bridge(Name, UseTLS, RabbitMQ),
Bridges = emqx_bridge:list(),
Any = fun(#{name := BName}) -> BName =:= Name end,
?assert(lists:any(Any, Bridges), Bridges),
ok = delete_bridge(Name),
BridgesAfterDelete = emqx_bridge:list(),
?assertNot(lists:any(Any, BridgesAfterDelete), BridgesAfterDelete),
ok.
t_create_delete_bridge_non_existing_server(Config) ->
Name = atom_to_binary(?FUNCTION_NAME),
UseTLS = get_tls(Config),
create_bridge(Name, UseTLS, #{server => <<"non_existing_server">>, port => 3174}),
%% Check that the new bridge is in the list of bridges
Bridges = emqx_bridge:list(),
Any = fun(#{name := BName}) -> BName =:= Name end,
?assert(lists:any(Any, Bridges)),
ok = delete_bridge(Name),
BridgesAfterDelete = emqx_bridge:list(),
?assertNot(lists:any(Any, BridgesAfterDelete)),
ok.
t_send_message_query(Config) ->
Name = atom_to_binary(?FUNCTION_NAME),
RabbitMQ = get_rabbitmq(Config),
UseTLS = get_tls(Config),
BridgeID = create_bridge(Name, UseTLS, RabbitMQ#{batch_size => 1}),
Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000},
%% This will use the SQL template included in the bridge
emqx_bridge:send_message(BridgeID, Payload),
%% Check that the data got to the database
?assertEqual(Payload, receive_simple_test_message(Config)),
ok = delete_bridge(Name),
ok.
t_send_message_query_with_template(Config) ->
Name = atom_to_binary(?FUNCTION_NAME),
RabbitMQ = get_rabbitmq(Config),
UseTLS = get_tls(Config),
BridgeID = create_bridge(Name, UseTLS, RabbitMQ#{
batch_size => 1,
payload_template =>
<<
"{"
" \\\"key\\\": ${key},"
" \\\"data\\\": \\\"${data}\\\","
" \\\"timestamp\\\": ${timestamp},"
" \\\"secret\\\": 42"
"}"
>>
}),
Payload = #{
<<"key">> => 7,
<<"data">> => <<"RabbitMQ">>,
<<"timestamp">> => 10000
},
emqx_bridge:send_message(BridgeID, Payload),
%% Check that the data got to the database
ExpectedResult = Payload#{
<<"secret">> => 42
},
?assertEqual(ExpectedResult, receive_simple_test_message(Config)),
ok = delete_bridge(Name),
ok.
t_send_simple_batch(Config) ->
Name = atom_to_binary(?FUNCTION_NAME),
RabbitMQ = get_rabbitmq(Config),
BridgeConf = RabbitMQ#{batch_size => 100},
UseTLS = get_tls(Config),
BridgeID = create_bridge(Name, UseTLS, BridgeConf),
Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000},
emqx_bridge:send_message(BridgeID, Payload),
?assertEqual(Payload, receive_simple_test_message(Config)),
ok = delete_bridge(Name),
ok.
t_send_simple_batch_with_template(Config) ->
Name = atom_to_binary(?FUNCTION_NAME),
RabbitMQ = get_rabbitmq(Config),
UseTLS = get_tls(Config),
BridgeConf =
RabbitMQ#{
batch_size => 100,
payload_template =>
<<
"{"
" \\\"key\\\": ${key},"
" \\\"data\\\": \\\"${data}\\\","
" \\\"timestamp\\\": ${timestamp},"
" \\\"secret\\\": 42"
"}"
>>
},
BridgeID = create_bridge(Name, UseTLS, BridgeConf),
Payload = #{
<<"key">> => 7,
<<"data">> => <<"RabbitMQ">>,
<<"timestamp">> => 10000
},
emqx_bridge:send_message(BridgeID, Payload),
ExpectedResult = Payload#{
<<"secret">> => 42
},
?assertEqual(ExpectedResult, receive_simple_test_message(Config)),
ok = delete_bridge(Name),
ok.
t_heavy_batching(Config) ->
Name = atom_to_binary(?FUNCTION_NAME),
NumberOfMessages = 20000,
RabbitMQ = get_rabbitmq(Config),
UseTLS = get_tls(Config),
BridgeConf = RabbitMQ#{
batch_size => 10173,
batch_time_ms => 50
},
BridgeID = create_bridge(Name, UseTLS, BridgeConf),
SendMessage = fun(Key) ->
Payload = #{<<"key">> => Key},
emqx_bridge:send_message(BridgeID, Payload)
end,
[SendMessage(Key) || Key <- lists:seq(1, NumberOfMessages)],
AllMessages = lists:foldl(
fun(_, Acc) ->
Message = receive_simple_test_message(Config),
#{<<"key">> := Key} = Message,
Acc#{Key => true}
end,
#{},
lists:seq(1, NumberOfMessages)
),
?assertEqual(NumberOfMessages, maps:size(AllMessages)),
ok = delete_bridge(Name),
ok.
receive_simple_test_message(Config) ->
#{channel := Channel} = get_channel_connection(Config),
#'basic.consume_ok'{consumer_tag = ConsumerTag} =
amqp_channel:call(
Channel,
#'basic.consume'{
queue = rabbit_mq_queue()
}
),
receive
%% This is the first message received
#'basic.consume_ok'{} ->
ok
end,
receive
{#'basic.deliver'{delivery_tag = DeliveryTag}, Content} ->
%% Ack the message
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
%% Cancel the consumer
#'basic.cancel_ok'{consumer_tag = ConsumerTag} =
amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}),
emqx_utils_json:decode(Content#amqp_msg.payload)
after 5000 ->
?assert(false, "Did not receive message within 5 second")
end.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_rabbitmq_connector_SUITE).
@ -12,6 +12,18 @@
-include_lib("stdlib/include/assert.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-import(emqx_bridge_rabbitmq_test_utils, [
rabbit_mq_exchange/0,
rabbit_mq_routing_key/0,
rabbit_mq_queue/0,
rabbit_mq_host/0,
rabbit_mq_port/0,
get_rabbitmq/1,
ssl_options/1,
get_channel_connection/1,
parse_and_check/4,
receive_message_from_rabbitmq/1
]).
%% This test SUITE requires a running RabbitMQ instance. If you don't want to
%% bring up the whole CI infrastructure with the `scripts/ct/run.sh` script
@ -21,99 +33,24 @@
%%
%% docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3.11-management
rabbit_mq_host() ->
list_to_binary(os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq")).
rabbit_mq_port() ->
list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")).
rabbit_mq_password() ->
<<"guest">>.
rabbit_mq_exchange() ->
<<"test_exchange">>.
rabbit_mq_queue() ->
<<"test_queue">>.
rabbit_mq_routing_key() ->
<<"test_routing_key">>.
all() ->
emqx_common_test_helpers:all(?MODULE).
[
{group, tcp},
{group, tls}
].
init_per_suite(Config) ->
Host = rabbit_mq_host(),
Port = rabbit_mq_port(),
ct:pal("rabbitmq:~p~n", [{Host, Port}]),
case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
true ->
Apps = emqx_cth_suite:start(
[emqx_conf, emqx_connector, emqx_bridge_rabbitmq],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
ChannelConnection = setup_rabbit_mq_exchange_and_queue(Host, Port),
[{channel_connection, ChannelConnection}, {suite_apps, Apps} | Config];
false ->
case os:getenv("IS_CI") of
"yes" ->
throw(no_rabbitmq);
_ ->
{skip, no_rabbitmq}
end
end.
groups() ->
AllTCs = emqx_common_test_helpers:all(?MODULE),
[
{tcp, AllTCs},
{tls, AllTCs}
].
setup_rabbit_mq_exchange_and_queue(Host, Port) ->
%% Create an exchange and a queue
{ok, Connection} =
amqp_connection:start(#amqp_params_network{
host = binary_to_list(Host),
port = Port
}),
{ok, Channel} = amqp_connection:open_channel(Connection),
%% Create an exchange
#'exchange.declare_ok'{} =
amqp_channel:call(
Channel,
#'exchange.declare'{
exchange = rabbit_mq_exchange(),
type = <<"topic">>
}
),
%% Create a queue
#'queue.declare_ok'{} =
amqp_channel:call(
Channel,
#'queue.declare'{queue = rabbit_mq_queue()}
),
%% Bind the queue to the exchange
#'queue.bind_ok'{} =
amqp_channel:call(
Channel,
#'queue.bind'{
queue = rabbit_mq_queue(),
exchange = rabbit_mq_exchange(),
routing_key = rabbit_mq_routing_key()
}
),
#{
connection => Connection,
channel => Channel
}.
init_per_group(Group, Config) ->
emqx_bridge_rabbitmq_test_utils:init_per_group(Group, Config).
get_channel_connection(Config) ->
proplists:get_value(channel_connection, Config).
end_per_suite(Config) ->
#{
connection := Connection,
channel := Channel
} = get_channel_connection(Config),
%% Close the channel
ok = amqp_channel:close(Channel),
%% Close the connection
ok = amqp_connection:close(Connection),
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
end_per_group(Group, Config) ->
emqx_bridge_rabbitmq_test_utils:end_per_group(Group, Config).
% %%------------------------------------------------------------------------------
% %% Testcases
@ -143,7 +80,6 @@ t_start_passfile(Config) ->
).
perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) ->
#{channel := Channel} = get_channel_connection(TestConfig),
CheckedConfig = check_config(InitialConfig),
#{
id := PoolName,
@ -159,7 +95,7 @@ perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) ->
emqx_resource:get_instance(ResourceID),
?assertEqual({ok, connected}, emqx_resource:health_check(ResourceID)),
%% Perform query as further check that the resource is working as expected
perform_query(ResourceID, Channel),
perform_query(ResourceID, TestConfig),
?assertEqual(ok, emqx_resource:stop(ResourceID)),
%% Resource will be listed still, but state will be changed and healthcheck will fail
%% as the worker no longer exists.
@ -181,7 +117,7 @@ perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) ->
emqx_resource:get_instance(ResourceID),
?assertEqual({ok, connected}, emqx_resource:health_check(ResourceID)),
%% Check that everything is working again by performing a query
perform_query(ResourceID, Channel),
perform_query(ResourceID, TestConfig),
% Stop and remove the resource in one go.
?assertEqual(ok, emqx_resource:remove_local(ResourceID)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)),
@ -214,37 +150,12 @@ perform_query(PoolName, Channel) ->
?assertEqual(ok, emqx_resource_manager:add_channel(PoolName, ChannelId, ActionConfig)),
ok = emqx_resource:query(PoolName, {ChannelId, payload()}),
%% Get the message from queue:
ok = receive_simple_test_message(Channel),
SendData = test_data(),
RecvData = receive_message_from_rabbitmq(Channel),
?assertMatch(SendData, RecvData),
?assertEqual(ok, emqx_resource_manager:remove_channel(PoolName, ChannelId)),
ok.
receive_simple_test_message(Channel) ->
#'basic.consume_ok'{consumer_tag = ConsumerTag} =
amqp_channel:call(
Channel,
#'basic.consume'{
queue = rabbit_mq_queue()
}
),
receive
%% This is the first message received
#'basic.consume_ok'{} ->
ok
end,
receive
{#'basic.deliver'{delivery_tag = DeliveryTag}, Content} ->
Expected = test_data(),
?assertEqual(Expected, emqx_utils_json:decode(Content#amqp_msg.payload)),
%% Ack the message
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
%% Cancel the consumer
#'basic.cancel_ok'{consumer_tag = ConsumerTag} =
amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}),
ok
after 5000 ->
?assert(false, "Did not receive message within 5 second")
end.
rabbitmq_config() ->
rabbitmq_config(#{}).
@ -278,3 +189,6 @@ rabbitmq_action_config() ->
wait_for_publish_confirmations => true
}
}.
rabbit_mq_password() ->
<<"guest">>.

View File

@ -0,0 +1,203 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_rabbitmq_test_utils).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("stdlib/include/assert.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
init_per_group(tcp, Config) ->
RabbitMQHost = os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq"),
RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")),
case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of
true ->
Config1 = common_init_per_group(#{
host => RabbitMQHost, port => RabbitMQPort, tls => false
}),
Config1 ++ Config;
false ->
case os:getenv("IS_CI") of
"yes" ->
throw(no_rabbitmq);
_ ->
{skip, no_rabbitmq}
end
end;
init_per_group(tls, Config) ->
RabbitMQHost = os:getenv("RABBITMQ_TLS_HOST", "rabbitmq"),
RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_TLS_PORT", "5671")),
case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of
true ->
Config1 = common_init_per_group(#{
host => RabbitMQHost, port => RabbitMQPort, tls => true
}),
Config1 ++ Config;
false ->
case os:getenv("IS_CI") of
"yes" ->
throw(no_rabbitmq);
_ ->
{skip, no_rabbitmq}
end
end;
init_per_group(_Group, Config) ->
Config.
common_init_per_group(Opts) ->
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
ok = emqx_common_test_helpers:start_apps([
emqx_conf, emqx_bridge, emqx_bridge_rabbitmq, emqx_rule_engine
]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
{ok, _} = application:ensure_all_started(emqx_connector),
{ok, _} = application:ensure_all_started(amqp_client),
emqx_mgmt_api_test_util:init_suite(),
#{host := Host, port := Port, tls := UseTLS} = Opts,
ChannelConnection = setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS),
[
{channel_connection, ChannelConnection},
{rabbitmq, #{server => Host, port => Port, tls => UseTLS}}
].
setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS) ->
SSLOptions =
case UseTLS of
false -> none;
true -> emqx_tls_lib:to_client_opts(ssl_options(UseTLS))
end,
%% Create an exchange and a queue
{ok, Connection} =
amqp_connection:start(#amqp_params_network{
host = Host,
port = Port,
ssl_options = SSLOptions
}),
{ok, Channel} = amqp_connection:open_channel(Connection),
%% Create an exchange
#'exchange.declare_ok'{} =
amqp_channel:call(
Channel,
#'exchange.declare'{
exchange = rabbit_mq_exchange(),
type = <<"topic">>
}
),
%% Create a queue
#'queue.declare_ok'{} =
amqp_channel:call(
Channel,
#'queue.declare'{queue = rabbit_mq_queue()}
),
%% Bind the queue to the exchange
#'queue.bind_ok'{} =
amqp_channel:call(
Channel,
#'queue.bind'{
queue = rabbit_mq_queue(),
exchange = rabbit_mq_exchange(),
routing_key = rabbit_mq_routing_key()
}
),
#{
connection => Connection,
channel => Channel
}.
end_per_group(_Group, Config) ->
#{
connection := Connection,
channel := Channel
} = get_channel_connection(Config),
amqp_channel:call(Channel, #'queue.purge'{queue = rabbit_mq_queue()}),
emqx_mgmt_api_test_util:end_suite(),
ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_bridge_rabbitmq, emqx_rule_engine]),
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
_ = application:stop(emqx_connector),
_ = application:stop(emqx_bridge),
%% Close the channel
ok = amqp_channel:close(Channel),
%% Close the connection
ok = amqp_connection:close(Connection).
rabbit_mq_host() ->
list_to_binary(os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq")).
rabbit_mq_port() ->
list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")).
rabbit_mq_exchange() ->
<<"messages">>.
rabbit_mq_queue() ->
<<"test_queue">>.
rabbit_mq_routing_key() ->
<<"test_routing_key">>.
get_rabbitmq(Config) ->
proplists:get_value(rabbitmq, Config).
get_channel_connection(Config) ->
proplists:get_value(channel_connection, Config).
ssl_options(true) ->
CertsDir = filename:join([
emqx_common_test_helpers:proj_root(),
".ci",
"docker-compose-file",
"certs"
]),
#{
enable => true,
cacertfile => filename:join([CertsDir, "ca.crt"]),
certfile => filename:join([CertsDir, "client.pem"]),
keyfile => filename:join([CertsDir, "client.key"])
};
ssl_options(false) ->
#{
enable => false
}.
parse_and_check(Key, Mod, Conf, Name) ->
ConfStr = hocon_pp:do(Conf, #{}),
ct:pal(ConfStr),
{ok, RawConf} = hocon:binary(ConfStr, #{format => map}),
hocon_tconf:check_plain(Mod, RawConf, #{required => false, atom_key => false}),
#{Key := #{<<"rabbitmq">> := #{Name := RetConf}}} = RawConf,
RetConf.
receive_message_from_rabbitmq(Config) ->
#{channel := Channel} = get_channel_connection(Config),
#'basic.consume_ok'{consumer_tag = ConsumerTag} =
amqp_channel:call(
Channel,
#'basic.consume'{
queue = rabbit_mq_queue()
}
),
receive
%% This is the first message received
#'basic.consume_ok'{} ->
ok
end,
receive
{#'basic.deliver'{delivery_tag = DeliveryTag}, Content} ->
%% Ack the message
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
%% Cancel the consumer
#'basic.cancel_ok'{consumer_tag = ConsumerTag} =
amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}),
Payload = Content#amqp_msg.payload,
case emqx_utils_json:safe_decode(Payload, [return_maps]) of
{ok, Msg} -> Msg;
{error, _} -> ?assert(false, {"Failed to decode the message", Payload})
end
after 5000 ->
?assert(false, "Did not receive message within 5 second")
end.

View File

@ -0,0 +1,221 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_rabbitmq_v1_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("stdlib/include/assert.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
%% See comment in
%% apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl for how to
%% run this without bringing up the whole CI infrastructure
-define(TYPE, <<"rabbitmq">>).
-import(emqx_bridge_rabbitmq_test_utils, [
rabbit_mq_exchange/0,
rabbit_mq_routing_key/0,
rabbit_mq_queue/0,
rabbit_mq_host/0,
rabbit_mq_port/0,
get_rabbitmq/1,
ssl_options/1,
get_channel_connection/1,
parse_and_check/4,
receive_message_from_rabbitmq/1
]).
%%------------------------------------------------------------------------------
%% Common Test Setup, Tear down and Testcase List
%%------------------------------------------------------------------------------
all() ->
[
{group, tcp},
{group, tls}
].
groups() ->
AllTCs = emqx_common_test_helpers:all(?MODULE),
[
{tcp, AllTCs},
{tls, AllTCs}
].
init_per_group(Group, Config) ->
emqx_bridge_rabbitmq_test_utils:init_per_group(Group, Config).
end_per_group(Group, Config) ->
emqx_bridge_rabbitmq_test_utils:end_per_group(Group, Config).
create_bridge(Name, Config) ->
BridgeConfig = rabbitmq_config(Config),
{ok, _} = emqx_bridge:create(?TYPE, Name, BridgeConfig),
emqx_bridge_resource:bridge_id(?TYPE, Name).
delete_bridge(Name) ->
ok = emqx_bridge:remove(?TYPE, Name).
%%------------------------------------------------------------------------------
%% Test Cases
%%------------------------------------------------------------------------------
t_create_delete_bridge(Config) ->
Name = atom_to_binary(?FUNCTION_NAME),
RabbitMQ = get_rabbitmq(Config),
create_bridge(Name, RabbitMQ),
Bridges = emqx_bridge:list(),
Any = fun(#{name := BName}) -> BName =:= Name end,
?assert(lists:any(Any, Bridges), Bridges),
ok = delete_bridge(Name),
BridgesAfterDelete = emqx_bridge:list(),
?assertNot(lists:any(Any, BridgesAfterDelete), BridgesAfterDelete),
ok.
t_create_delete_bridge_non_existing_server(_Config) ->
Name = atom_to_binary(?FUNCTION_NAME),
create_bridge(Name, #{server => <<"non_existing_server">>, port => 3174}),
%% Check that the new bridge is in the list of bridges
Bridges = emqx_bridge:list(),
Any = fun(#{name := BName}) -> BName =:= Name end,
?assert(lists:any(Any, Bridges)),
ok = delete_bridge(Name),
BridgesAfterDelete = emqx_bridge:list(),
?assertNot(lists:any(Any, BridgesAfterDelete)),
ok.
t_send_message_query(Config) ->
Name = atom_to_binary(?FUNCTION_NAME),
RabbitMQ = get_rabbitmq(Config),
BridgeID = create_bridge(Name, RabbitMQ#{batch_size => 1}),
Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000},
%% This will use the SQL template included in the bridge
emqx_bridge:send_message(BridgeID, Payload),
%% Check that the data got to the database
?assertEqual(Payload, receive_message_from_rabbitmq(Config)),
ok = delete_bridge(Name),
ok.
t_send_message_query_with_template(Config) ->
Name = atom_to_binary(?FUNCTION_NAME),
RabbitMQ = get_rabbitmq(Config),
BridgeID = create_bridge(Name, RabbitMQ#{
batch_size => 1,
payload_template => payload_template()
}),
Payload = #{
<<"key">> => 7,
<<"data">> => <<"RabbitMQ">>,
<<"timestamp">> => 10000
},
emqx_bridge:send_message(BridgeID, Payload),
%% Check that the data got to the database
ExpectedResult = Payload#{
<<"secret">> => 42
},
?assertEqual(ExpectedResult, receive_message_from_rabbitmq(Config)),
ok = delete_bridge(Name),
ok.
t_send_simple_batch(Config) ->
Name = atom_to_binary(?FUNCTION_NAME),
RabbitMQ = get_rabbitmq(Config),
BridgeConf = RabbitMQ#{batch_size => 100},
BridgeID = create_bridge(Name, BridgeConf),
Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000},
emqx_bridge:send_message(BridgeID, Payload),
?assertEqual(Payload, receive_message_from_rabbitmq(Config)),
ok = delete_bridge(Name),
ok.
t_send_simple_batch_with_template(Config) ->
Name = atom_to_binary(?FUNCTION_NAME),
RabbitMQ = get_rabbitmq(Config),
BridgeConf =
RabbitMQ#{
batch_size => 100,
payload_template => payload_template()
},
BridgeID = create_bridge(Name, BridgeConf),
Payload = #{
<<"key">> => 7,
<<"data">> => <<"RabbitMQ">>,
<<"timestamp">> => 10000
},
emqx_bridge:send_message(BridgeID, Payload),
ExpectedResult = Payload#{<<"secret">> => 42},
?assertEqual(ExpectedResult, receive_message_from_rabbitmq(Config)),
ok = delete_bridge(Name),
ok.
t_heavy_batching(Config) ->
Name = atom_to_binary(?FUNCTION_NAME),
NumberOfMessages = 20000,
RabbitMQ = get_rabbitmq(Config),
BridgeConf = RabbitMQ#{
batch_size => 10173,
batch_time_ms => 50
},
BridgeID = create_bridge(Name, BridgeConf),
SendMessage = fun(Key) ->
Payload = #{<<"key">> => Key},
emqx_bridge:send_message(BridgeID, Payload)
end,
[SendMessage(Key) || Key <- lists:seq(1, NumberOfMessages)],
AllMessages = lists:foldl(
fun(_, Acc) ->
Message = receive_message_from_rabbitmq(Config),
#{<<"key">> := Key} = Message,
Acc#{Key => true}
end,
#{},
lists:seq(1, NumberOfMessages)
),
?assertEqual(NumberOfMessages, maps:size(AllMessages)),
ok = delete_bridge(Name),
ok.
rabbitmq_config(Config) ->
UseTLS = maps:get(tls, Config, false),
BatchSize = maps:get(batch_size, Config, 1),
BatchTime = maps:get(batch_time_ms, Config, 0),
Name = atom_to_binary(?MODULE),
Server = maps:get(server, Config, rabbit_mq_host()),
Port = maps:get(port, Config, rabbit_mq_port()),
Template = maps:get(payload_template, Config, <<"">>),
Bridge =
#{
<<"bridges">> => #{
<<"rabbitmq">> => #{
Name => #{
<<"enable">> => true,
<<"ssl">> => ssl_options(UseTLS),
<<"server">> => Server,
<<"port">> => Port,
<<"username">> => <<"guest">>,
<<"password">> => <<"guest">>,
<<"routing_key">> => rabbit_mq_routing_key(),
<<"exchange">> => rabbit_mq_exchange(),
<<"payload_template">> => Template,
<<"resource_opts">> => #{
<<"batch_size">> => BatchSize,
<<"batch_time">> => BatchTime
}
}
}
}
},
parse_and_check(<<"bridges">>, emqx_bridge_schema, Bridge, Name).
payload_template() ->
<<
"{"
" \"key\": ${key},"
" \"data\": \"${data}\","
" \"timestamp\": ${timestamp},"
" \"secret\": 42"
"}"
>>.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_rabbitmq_v2_SUITE).
@ -12,203 +12,108 @@
-include_lib("stdlib/include/assert.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-define(TYPE, <<"rabbitmq">>).
-import(emqx_bridge_rabbitmq_test_utils, [
rabbit_mq_exchange/0,
rabbit_mq_routing_key/0,
rabbit_mq_queue/0,
rabbit_mq_host/0,
rabbit_mq_port/0,
get_rabbitmq/1,
get_tls/1,
ssl_options/1,
get_channel_connection/1,
parse_and_check/4,
receive_message_from_rabbitmq/1
]).
-import(emqx_common_test_helpers, [on_exit/1]).
rabbit_mq_host() ->
<<"rabbitmq">>.
rabbit_mq_port() ->
5672.
rabbit_mq_exchange() ->
<<"messages">>.
rabbit_mq_queue() ->
<<"test_queue">>.
rabbit_mq_routing_key() ->
<<"test_routing_key">>.
get_channel_connection(Config) ->
proplists:get_value(channel_connection, Config).
get_rabbitmq(Config) ->
proplists:get_value(rabbitmq, Config).
%%------------------------------------------------------------------------------
%% Common Test Setup, Tear down and Testcase List
%%------------------------------------------------------------------------------
-define(TYPE, <<"rabbitmq">>).
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
RabbitMQHost = os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq"),
RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")),
case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of
true ->
Config1 = common_init(#{
host => RabbitMQHost, port => RabbitMQPort
}),
Name = atom_to_binary(?MODULE),
Config2 = [{connector, Name} | Config1 ++ Config],
create_connector(Name, get_rabbitmq(Config2)),
Config2;
false ->
case os:getenv("IS_CI") of
"yes" ->
throw(no_rabbitmq);
_ ->
{skip, no_rabbitmq}
end
end.
common_init(Opts) ->
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
ok = emqx_common_test_helpers:start_apps([
emqx_conf, emqx_bridge, emqx_bridge_rabbitmq, emqx_rule_engine
]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
{ok, _} = application:ensure_all_started(emqx_connector),
{ok, _} = application:ensure_all_started(amqp_client),
emqx_mgmt_api_test_util:init_suite(),
#{host := Host, port := Port} = Opts,
ChannelConnection = setup_rabbit_mq_exchange_and_queue(Host, Port),
[
{channel_connection, ChannelConnection},
{rabbitmq, #{server => Host, port => Port}}
{group, tcp},
{group, tls}
].
setup_rabbit_mq_exchange_and_queue(Host, Port) ->
%% Create an exchange and a queue
{ok, Connection} =
amqp_connection:start(#amqp_params_network{
host = Host,
port = Port,
ssl_options = none
}),
{ok, Channel} = amqp_connection:open_channel(Connection),
%% Create an exchange
#'exchange.declare_ok'{} =
amqp_channel:call(
Channel,
#'exchange.declare'{
exchange = rabbit_mq_exchange(),
type = <<"topic">>
}
),
%% Create a queue
#'queue.declare_ok'{} =
amqp_channel:call(
Channel,
#'queue.declare'{queue = rabbit_mq_queue()}
),
%% Bind the queue to the exchange
#'queue.bind_ok'{} =
amqp_channel:call(
Channel,
#'queue.bind'{
queue = rabbit_mq_queue(),
exchange = rabbit_mq_exchange(),
routing_key = rabbit_mq_routing_key()
}
),
#{
connection => Connection,
channel => Channel
}.
groups() ->
AllTCs = emqx_common_test_helpers:all(?MODULE),
[
{tcp, AllTCs},
{tls, AllTCs}
].
end_per_suite(Config) ->
delete_connector(proplists:get_value(connector, Config)),
#{
connection := Connection,
channel := Channel
} = get_channel_connection(Config),
emqx_mgmt_api_test_util:end_suite(),
ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_bridge_rabbitmq, emqx_rule_engine]),
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
_ = application:stop(emqx_connector),
_ = application:stop(emqx_bridge),
%% Close the channel
ok = amqp_channel:close(Channel),
%% Close the connection
ok = amqp_connection:close(Connection).
init_per_group(Group, Config) ->
Config1 = emqx_bridge_rabbitmq_test_utils:init_per_group(Group, Config),
Name = atom_to_binary(?MODULE),
create_connector(Name, get_rabbitmq(Config1)),
Config1.
end_per_group(Group, Config) ->
Name = atom_to_binary(?MODULE),
delete_connector(Name),
emqx_bridge_rabbitmq_test_utils:end_per_group(Group, Config).
rabbitmq_connector(Config) ->
UseTLS = maps:get(tls, Config, false),
Name = atom_to_binary(?MODULE),
Server = maps:get(server, Config, rabbit_mq_host()),
Port = maps:get(port, Config, rabbit_mq_port()),
ConfigStr =
io_lib:format(
"connectors.rabbitmq.~s {\n"
" enable = true\n"
" ssl = {enable = false}\n"
" server = \"~s\"\n"
" port = ~p\n"
" username = \"guest\"\n"
" password = \"guest\"\n"
"}\n",
[
Name,
Server,
Port
]
),
ct:pal(ConfigStr),
parse_and_check(<<"connectors">>, emqx_connector_schema, ConfigStr, <<"rabbitmq">>, Name).
Connector = #{
<<"connectors">> => #{
<<"rabbitmq">> => #{
Name => #{
<<"enable">> => true,
<<"ssl">> => ssl_options(UseTLS),
<<"server">> => Server,
<<"port">> => Port,
<<"username">> => <<"guest">>,
<<"password">> => <<"guest">>
}
}
}
},
parse_and_check(<<"connectors">>, emqx_connector_schema, Connector, Name).
rabbitmq_source() ->
Name = atom_to_binary(?MODULE),
ConfigStr =
io_lib:format(
"sources.rabbitmq.~s {\n"
"connector = ~s\n"
"enable = true\n"
"parameters {\n"
"no_ack = true\n"
"queue = ~s\n"
"wait_for_publish_confirmations = true\n"
"}}\n",
[
Name,
Name,
rabbit_mq_queue()
]
),
ct:pal(ConfigStr),
parse_and_check(<<"sources">>, emqx_bridge_v2_schema, ConfigStr, <<"rabbitmq">>, Name).
Source = #{
<<"sources">> => #{
<<"rabbitmq">> => #{
Name => #{
<<"enable">> => true,
<<"connector">> => Name,
<<"parameters">> => #{
<<"no_ack">> => true,
<<"queue">> => rabbit_mq_queue(),
<<"wait_for_publish_confirmations">> => true
}
}
}
}
},
parse_and_check(<<"sources">>, emqx_bridge_v2_schema, Source, Name).
rabbitmq_action() ->
Name = atom_to_binary(?MODULE),
ConfigStr =
io_lib:format(
"actions.rabbitmq.~s {\n"
"connector = ~s\n"
"enable = true\n"
"parameters {\n"
"exchange: ~s\n"
"payload_template: \"${.payload}\"\n"
"routing_key: ~s\n"
"delivery_mode: non_persistent\n"
"publish_confirmation_timeout: 30s\n"
"wait_for_publish_confirmations = true\n"
"}}\n",
[
Name,
Name,
rabbit_mq_exchange(),
rabbit_mq_routing_key()
]
),
ct:pal(ConfigStr),
parse_and_check(<<"actions">>, emqx_bridge_v2_schema, ConfigStr, <<"rabbitmq">>, Name).
parse_and_check(Key, Mod, ConfigStr, Type, Name) ->
{ok, RawConf} = hocon:binary(ConfigStr, #{format => map}),
hocon_tconf:check_plain(Mod, RawConf, #{required => false, atom_key => false}),
#{Key := #{Type := #{Name := RetConfig}}} = RawConf,
RetConfig.
Action = #{
<<"actions">> => #{
<<"rabbitmq">> => #{
Name => #{
<<"connector">> => Name,
<<"enable">> => true,
<<"parameters">> => #{
<<"exchange">> => rabbit_mq_exchange(),
<<"payload_template">> => <<"${.payload}">>,
<<"routing_key">> => rabbit_mq_routing_key(),
<<"delivery_mode">> => <<"non_persistent">>,
<<"publish_confirmation_timeout">> => <<"30s">>,
<<"wait_for_publish_confirmations">> => true
}
}
}
}
},
parse_and_check(<<"actions">>, emqx_bridge_v2_schema, Action, Name).
create_connector(Name, Config) ->
Connector = rabbitmq_connector(Config),
@ -308,7 +213,7 @@ t_action(Config) ->
Payload = payload(),
PayloadBin = emqx_utils_json:encode(Payload),
{ok, _} = emqtt:publish(C1, Topic, #{}, PayloadBin, [{qos, 1}, {retain, false}]),
Msg = receive_test_message_from_rabbitmq(Config),
Msg = receive_message_from_rabbitmq(Config),
?assertMatch(Payload, Msg),
ok = emqtt:disconnect(C1),
ok = delete_action(Name),
@ -354,29 +259,3 @@ send_test_message_to_rabbitmq(Config) ->
}
),
ok.
receive_test_message_from_rabbitmq(Config) ->
#{channel := Channel} = get_channel_connection(Config),
#'basic.consume_ok'{consumer_tag = ConsumerTag} =
amqp_channel:call(
Channel,
#'basic.consume'{
queue = rabbit_mq_queue()
}
),
receive
%% This is the first message received
#'basic.consume_ok'{} ->
ok
end,
receive
{#'basic.deliver'{delivery_tag = DeliveryTag}, Content} ->
%% Ack the message
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
%% Cancel the consumer
#'basic.cancel_ok'{consumer_tag = ConsumerTag} =
amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}),
emqx_utils_json:decode(Content#amqp_msg.payload)
after 5000 ->
?assert(false, "Did not receive message within 5 second")
end.