%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_bridge_rabbitmq_SUITE). -compile(nowarn_export_all). -compile(export_all). -include_lib("emqx_connector/include/emqx_connector.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("stdlib/include/assert.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). %% See comment in %% lib-ee/emqx_ee_connector/test/ee_connector_rabbitmq_SUITE.erl for how to %% run this without bringing up the whole CI infrastucture rabbit_mq_host() -> <<"rabbitmq">>. rabbit_mq_port() -> 5672. rabbit_mq_exchange() -> <<"messages">>. rabbit_mq_queue() -> <<"test_queue">>. rabbit_mq_routing_key() -> <<"test_routing_key">>. get_channel_connection(Config) -> proplists:get_value(channel_connection, Config). %%------------------------------------------------------------------------------ %% Common Test Setup, Teardown and Testcase List %%------------------------------------------------------------------------------ init_per_suite(Config) -> % snabbkaffe:fix_ct_logging(), case emqx_common_test_helpers:is_tcp_server_available( erlang:binary_to_list(rabbit_mq_host()), rabbit_mq_port() ) of true -> emqx_common_test_helpers:render_and_load_app_config(emqx_conf), ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), ok = emqx_connector_test_helpers:start_apps([emqx_resource]), {ok, _} = application:ensure_all_started(emqx_connector), {ok, _} = application:ensure_all_started(emqx_ee_connector), {ok, _} = application:ensure_all_started(emqx_ee_bridge), {ok, _} = application:ensure_all_started(amqp_client), emqx_mgmt_api_test_util:init_suite(), ChannelConnection = setup_rabbit_mq_exchange_and_queue(), [{channel_connection, ChannelConnection} | Config]; false -> case os:getenv("IS_CI") of "yes" -> throw(no_rabbitmq); _ -> {skip, no_rabbitmq} end end. setup_rabbit_mq_exchange_and_queue() -> %% Create an exachange and a queue {ok, Connection} = amqp_connection:start(#amqp_params_network{ host = erlang:binary_to_list(rabbit_mq_host()), port = rabbit_mq_port() }), {ok, Channel} = amqp_connection:open_channel(Connection), %% Create an exchange #'exchange.declare_ok'{} = amqp_channel:call( Channel, #'exchange.declare'{ exchange = rabbit_mq_exchange(), type = <<"topic">> } ), %% Create a queue #'queue.declare_ok'{} = amqp_channel:call( Channel, #'queue.declare'{queue = rabbit_mq_queue()} ), %% Bind the queue to the exchange #'queue.bind_ok'{} = amqp_channel:call( Channel, #'queue.bind'{ queue = rabbit_mq_queue(), exchange = rabbit_mq_exchange(), routing_key = rabbit_mq_routing_key() } ), #{ connection => Connection, channel => Channel }. end_per_suite(Config) -> #{ connection := Connection, channel := Channel } = get_channel_connection(Config), emqx_mgmt_api_test_util:end_suite(), ok = emqx_common_test_helpers:stop_apps([emqx_conf]), ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), _ = application:stop(emqx_connector), _ = application:stop(emqx_ee_connector), _ = application:stop(emqx_bridge), %% Close the channel ok = amqp_channel:close(Channel), %% Close the connection ok = amqp_connection:close(Connection). init_per_testcase(_, Config) -> Config. end_per_testcase(_, _Config) -> ok. all() -> emqx_common_test_helpers:all(?MODULE). rabbitmq_config(Config) -> %%SQL = maps:get(sql, Config, sql_insert_template_for_bridge()), BatchSize = maps:get(batch_size, Config, 1), BatchTime = maps:get(batch_time_ms, Config, 0), Name = atom_to_binary(?MODULE), Server = maps:get(server, Config, rabbit_mq_host()), Port = maps:get(port, Config, rabbit_mq_port()), Template = maps:get(payload_template, Config, <<"">>), ConfigString = io_lib:format( "bridges.rabbitmq.~s {\n" " enable = true\n" " server = \"~s\"\n" " port = ~p\n" " username = \"guest\"\n" " password = \"guest\"\n" " routing_key = \"~s\"\n" " exchange = \"~s\"\n" " payload_template = \"~s\"\n" " resource_opts = {\n" " batch_size = ~b\n" " batch_time = ~bms\n" " }\n" "}\n", [ Name, Server, Port, rabbit_mq_routing_key(), rabbit_mq_exchange(), Template, BatchSize, BatchTime ] ), ct:pal(ConfigString), parse_and_check(ConfigString, <<"rabbitmq">>, Name). parse_and_check(ConfigString, BridgeType, Name) -> {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), #{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf, RetConfig. make_bridge(Config) -> Type = <<"rabbitmq">>, Name = atom_to_binary(?MODULE), BridgeConfig = rabbitmq_config(Config), {ok, _} = emqx_bridge:create( Type, Name, BridgeConfig ), emqx_bridge_resource:bridge_id(Type, Name). delete_bridge() -> Type = <<"rabbitmq">>, Name = atom_to_binary(?MODULE), {ok, _} = emqx_bridge:remove(Type, Name), ok. %%------------------------------------------------------------------------------ %% Test Cases %%------------------------------------------------------------------------------ t_make_delete_bridge(_Config) -> make_bridge(#{}), %% Check that the new brige is in the list of bridges Bridges = emqx_bridge:list(), Name = atom_to_binary(?MODULE), IsRightName = fun (#{name := BName}) when BName =:= Name -> true; (_) -> false end, ?assert(lists:any(IsRightName, Bridges)), delete_bridge(), BridgesAfterDelete = emqx_bridge:list(), ?assertNot(lists:any(IsRightName, BridgesAfterDelete)), ok. t_make_delete_bridge_non_existing_server(_Config) -> make_bridge(#{server => <<"non_existing_server">>, port => 3174}), %% Check that the new brige is in the list of bridges Bridges = emqx_bridge:list(), Name = atom_to_binary(?MODULE), IsRightName = fun (#{name := BName}) when BName =:= Name -> true; (_) -> false end, ?assert(lists:any(IsRightName, Bridges)), delete_bridge(), BridgesAfterDelete = emqx_bridge:list(), ?assertNot(lists:any(IsRightName, BridgesAfterDelete)), ok. t_send_message_query(Config) -> BridgeID = make_bridge(#{batch_size => 1}), Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000}, %% This will use the SQL template included in the bridge emqx_bridge:send_message(BridgeID, Payload), %% Check that the data got to the database ?assertEqual(Payload, receive_simple_test_message(Config)), delete_bridge(), ok. t_send_message_query_with_template(Config) -> BridgeID = make_bridge(#{ batch_size => 1, payload_template => << "{" " \\\"key\\\": ${key}," " \\\"data\\\": \\\"${data}\\\"," " \\\"timestamp\\\": ${timestamp}," " \\\"secret\\\": 42" "}" >> }), Payload = #{ <<"key">> => 7, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000 }, emqx_bridge:send_message(BridgeID, Payload), %% Check that the data got to the database ExpectedResult = Payload#{ <<"secret">> => 42 }, ?assertEqual(ExpectedResult, receive_simple_test_message(Config)), delete_bridge(), ok. t_send_simple_batch(Config) -> BridgeConf = #{ batch_size => 100 }, BridgeID = make_bridge(BridgeConf), Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000}, emqx_bridge:send_message(BridgeID, Payload), ?assertEqual(Payload, receive_simple_test_message(Config)), delete_bridge(), ok. t_send_simple_batch_with_template(Config) -> BridgeConf = #{ batch_size => 100, payload_template => << "{" " \\\"key\\\": ${key}," " \\\"data\\\": \\\"${data}\\\"," " \\\"timestamp\\\": ${timestamp}," " \\\"secret\\\": 42" "}" >> }, BridgeID = make_bridge(BridgeConf), Payload = #{ <<"key">> => 7, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000 }, emqx_bridge:send_message(BridgeID, Payload), ExpectedResult = Payload#{ <<"secret">> => 42 }, ?assertEqual(ExpectedResult, receive_simple_test_message(Config)), delete_bridge(), ok. t_heavy_batching(Config) -> NumberOfMessages = 20000, BridgeConf = #{ batch_size => 10173, batch_time_ms => 50 }, BridgeID = make_bridge(BridgeConf), SendMessage = fun(Key) -> Payload = #{ <<"key">> => Key }, emqx_bridge:send_message(BridgeID, Payload) end, [SendMessage(Key) || Key <- lists:seq(1, NumberOfMessages)], AllMessages = lists:foldl( fun(_, Acc) -> Message = receive_simple_test_message(Config), #{<<"key">> := Key} = Message, Acc#{Key => true} end, #{}, lists:seq(1, NumberOfMessages) ), ?assertEqual(NumberOfMessages, maps:size(AllMessages)), delete_bridge(), ok. receive_simple_test_message(Config) -> #{channel := Channel} = get_channel_connection(Config), #'basic.consume_ok'{consumer_tag = ConsumerTag} = amqp_channel:call( Channel, #'basic.consume'{ queue = rabbit_mq_queue() } ), receive %% This is the first message received #'basic.consume_ok'{} -> ok end, receive {#'basic.deliver'{delivery_tag = DeliveryTag}, Content} -> %% Ack the message amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DeliveryTag}), %% Cancel the consumer #'basic.cancel_ok'{consumer_tag = ConsumerTag} = amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}), emqx_utils_json:decode(Content#amqp_msg.payload) end. rabbitmq_config() -> Config = #{ server => rabbit_mq_host(), port => 5672, exchange => rabbit_mq_exchange(), routing_key => rabbit_mq_routing_key() }, #{<<"config">> => Config}. test_data() -> #{<<"msg_field">> => <<"Hello">>}.