diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index f7a1e533f..134ba15b6 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -372,21 +372,7 @@ preproc_parameter(#{config_root := actions, parameters := Parameter}) -> config_root => actions }; preproc_parameter(#{config_root := sources, parameters := Parameter, hookpoints := Hooks}) -> - #{ - payload_template := PayloadTmpl, - qos := QosTmpl, - topic := TopicTmpl - } = Parameter, - Parameter#{ - payload_template => emqx_placeholder:preproc_tmpl(PayloadTmpl), - qos => preproc_qos(QosTmpl), - topic => emqx_placeholder:preproc_tmpl(TopicTmpl), - hookpoints => Hooks, - config_root => sources - }. - -preproc_qos(Qos) when is_integer(Qos) -> Qos; -preproc_qos(Qos) -> emqx_placeholder:preproc_tmpl(Qos). + Parameter#{hookpoints => Hooks, config_root => sources}. delivery_mode(non_persistent) -> 1; delivery_mode(persistent) -> 2. diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl index 34c945937..3fb00632c 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl @@ -101,6 +101,7 @@ fields(action_parameters) -> hoconsc:mk( binary(), #{ + default => <<"">>, desc => ?DESC(?CONNECTOR_SCHEMA, "payload_template") } )} @@ -126,39 +127,6 @@ fields(subscriber_source) -> ); fields(source_parameters) -> [ - {wait_for_publish_confirmations, - hoconsc:mk( - boolean(), - #{ - default => true, - desc => ?DESC(?CONNECTOR_SCHEMA, "wait_for_publish_confirmations") - } - )}, - {topic, - ?HOCON( - binary(), - #{ - required => true, - validator => fun emqx_schema:non_empty_string/1, - desc => ?DESC("source_topic") - } - )}, - {qos, - ?HOCON( - ?UNION([emqx_schema:qos(), binary()]), - #{ - default => 0, - desc => ?DESC("source_qos") - } - )}, - {payload_template, - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("source_payload_template") - } - )}, {queue, ?HOCON( binary(), @@ -167,6 +135,14 @@ fields(source_parameters) -> desc => ?DESC("source_queue") } )}, + {wait_for_publish_confirmations, + hoconsc:mk( + boolean(), + #{ + default => true, + desc => ?DESC(?CONNECTOR_SCHEMA, "wait_for_publish_confirmations") + } + )}, {no_ack, ?HOCON( boolean(), @@ -260,9 +236,6 @@ source_examples(Method) -> _ConnectorType = rabbitmq, #{ parameters => #{ - topic => <<"${payload.mqtt_topic}">>, - qos => <<"${payload.mqtt_qos}">>, - payload_template => <<"${payload.mqtt_payload}">>, queue => <<"test_queue">>, no_ack => true } diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_source_worker.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_source_worker.erl index b102faf5d..d0d43641b 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_source_worker.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_source_worker.erl @@ -37,28 +37,13 @@ handle_cast(_Request, State) -> handle_info( {#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{ payload = Payload, - props = #'P_basic'{message_id = MessageId, headers = Headers} + props = PBasic }}, {Channel, InstanceId, Params} = State ) -> - #{ - hookpoints := Hooks, - payload_template := PayloadTmpl, - qos := QoSTmpl, - topic := TopicTmpl, - no_ack := NoAck - } = Params, - MQTTMsg = emqx_message:make( - make_message_id(MessageId), - InstanceId, - render(Payload, QoSTmpl), - render(Payload, TopicTmpl), - render(Payload, PayloadTmpl), - #{}, - make_headers(Headers) - ), - _ = emqx:publish(MQTTMsg), - lists:foreach(fun(Hook) -> emqx_hooks:run(Hook, [MQTTMsg]) end, Hooks), + Message = to_map(PBasic, Payload), + #{hookpoints := Hooks, no_ack := NoAck} = Params, + lists:foreach(fun(Hook) -> emqx_hooks:run(Hook, [Message]) end, Hooks), (NoAck =:= false) andalso amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}), emqx_resource_metrics:received_inc(InstanceId), @@ -68,18 +53,52 @@ handle_info(#'basic.cancel_ok'{}, State) -> handle_info(_Info, State) -> {noreply, State}. +to_map(PBasic, Payload) -> + #'P_basic'{ + content_type = ContentType, + content_encoding = ContentEncoding, + headers = Headers, + delivery_mode = DeliveryMode, + priority = Priority, + correlation_id = CorrelationId, + reply_to = ReplyTo, + expiration = Expiration, + message_id = MessageId, + timestamp = Timestamp, + type = Type, + user_id = UserId, + app_id = AppId, + cluster_id = ClusterId + } = PBasic, + Message = #{ + <<"payload">> => make_payload(Payload), + <<"content_type">> => ContentType, + <<"content_encoding">> => ContentEncoding, + <<"headers">> => make_headers(Headers), + <<"delivery_mode">> => DeliveryMode, + <<"priority">> => Priority, + <<"correlation_id">> => CorrelationId, + <<"reply_to">> => ReplyTo, + <<"expiration">> => Expiration, + <<"message_id">> => MessageId, + <<"timestamp">> => Timestamp, + <<"type">> => Type, + <<"user_id">> => UserId, + <<"app_id">> => AppId, + <<"cluster_id">> => ClusterId + }, + maps:filtermap(fun(_K, V) -> V =/= undefined andalso V =/= <<"undefined">> end, Message). + terminate(_Reason, _State) -> ok. -render(_Message, QoS) when is_integer(QoS) -> QoS; -render(Message, PayloadTmpl) -> - Opts = #{return => full_binary}, - emqx_placeholder:proc_tmpl(PayloadTmpl, Message, Opts). - -make_message_id(undefined) -> emqx_guid:gen(); -make_message_id(Id) -> Id. - make_headers(undefined) -> - #{}; + undefined; make_headers(Headers) when is_list(Headers) -> maps:from_list([{Key, Value} || {Key, _Type, Value} <- Headers]). + +make_payload(Payload) -> + case emqx_utils_json:safe_decode(Payload, [return_maps]) of + {ok, Map} -> Map; + {error, _} -> Payload + end. diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_SUITE.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_SUITE.erl deleted file mode 100644 index 7698608d3..000000000 --- a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_SUITE.erl +++ /dev/null @@ -1,423 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - --module(emqx_bridge_rabbitmq_SUITE). - --compile(nowarn_export_all). --compile(export_all). - --include_lib("emqx_connector/include/emqx_connector.hrl"). --include_lib("eunit/include/eunit.hrl"). --include_lib("stdlib/include/assert.hrl"). --include_lib("amqp_client/include/amqp_client.hrl"). - -%% See comment in -%% apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl for how to -%% run this without bringing up the whole CI infrastructure --define(TYPE, <<"rabbitmq">>). - -rabbit_mq_host() -> - <<"rabbitmq">>. - -rabbit_mq_port() -> - 5672. - -rabbit_mq_exchange() -> - <<"messages">>. - -rabbit_mq_queue() -> - <<"test_queue">>. - -rabbit_mq_routing_key() -> - <<"test_routing_key">>. - -get_channel_connection(Config) -> - proplists:get_value(channel_connection, Config). - -get_rabbitmq(Config) -> - proplists:get_value(rabbitmq, Config). - -get_tls(Config) -> - proplists:get_value(tls, Config). - -%%------------------------------------------------------------------------------ -%% Common Test Setup, Tear down and Testcase List -%%------------------------------------------------------------------------------ - -all() -> - [ - {group, tcp}, - {group, tls} - ]. - -groups() -> - AllTCs = emqx_common_test_helpers:all(?MODULE), - [ - {tcp, AllTCs}, - {tls, AllTCs} - ]. - -init_per_suite(Config) -> - Config. - -end_per_suite(_Config) -> - ok. - -init_per_group(tcp, Config) -> - RabbitMQHost = os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq"), - RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")), - case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of - true -> - Config1 = common_init_per_group(#{ - host => RabbitMQHost, port => RabbitMQPort, tls => false - }), - Config1 ++ Config; - false -> - case os:getenv("IS_CI") of - "yes" -> - throw(no_rabbitmq); - _ -> - {skip, no_rabbitmq} - end - end; -init_per_group(tls, Config) -> - RabbitMQHost = os:getenv("RABBITMQ_TLS_HOST", "rabbitmq"), - RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_TLS_PORT", "5671")), - case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of - true -> - Config1 = common_init_per_group(#{ - host => RabbitMQHost, port => RabbitMQPort, tls => true - }), - Config1 ++ Config; - false -> - case os:getenv("IS_CI") of - "yes" -> - throw(no_rabbitmq); - _ -> - {skip, no_rabbitmq} - end - end; -init_per_group(_Group, Config) -> - Config. - -common_init_per_group(Opts) -> - emqx_common_test_helpers:render_and_load_app_config(emqx_conf), - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), - ok = emqx_connector_test_helpers:start_apps([emqx_resource]), - {ok, _} = application:ensure_all_started(emqx_connector), - {ok, _} = application:ensure_all_started(amqp_client), - emqx_mgmt_api_test_util:init_suite(), - #{host := Host, port := Port, tls := UseTLS} = Opts, - ChannelConnection = setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS), - [ - {channel_connection, ChannelConnection}, - {rabbitmq, #{server => Host, port => Port}}, - {tls, UseTLS} - ]. - -setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS) -> - SSLOptions = - case UseTLS of - false -> none; - true -> emqx_tls_lib:to_client_opts(ssl_options(UseTLS)) - end, - %% Create an exchange and a queue - {ok, Connection} = - amqp_connection:start(#amqp_params_network{ - host = Host, - port = Port, - ssl_options = SSLOptions - }), - {ok, Channel} = amqp_connection:open_channel(Connection), - %% Create an exchange - #'exchange.declare_ok'{} = - amqp_channel:call( - Channel, - #'exchange.declare'{ - exchange = rabbit_mq_exchange(), - type = <<"topic">> - } - ), - %% Create a queue - #'queue.declare_ok'{} = - amqp_channel:call( - Channel, - #'queue.declare'{queue = rabbit_mq_queue()} - ), - %% Bind the queue to the exchange - #'queue.bind_ok'{} = - amqp_channel:call( - Channel, - #'queue.bind'{ - queue = rabbit_mq_queue(), - exchange = rabbit_mq_exchange(), - routing_key = rabbit_mq_routing_key() - } - ), - #{ - connection => Connection, - channel => Channel - }. - -end_per_group(_Group, Config) -> - #{ - connection := Connection, - channel := Channel - } = get_channel_connection(Config), - emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps([emqx_conf]), - ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), - _ = application:stop(emqx_connector), - _ = application:stop(emqx_bridge), - %% Close the channel - ok = amqp_channel:close(Channel), - %% Close the connection - ok = amqp_connection:close(Connection). - -init_per_testcase(_, Config) -> - Config. - -end_per_testcase(_, _Config) -> - ok. - -rabbitmq_config(UseTLS, Config) -> - BatchSize = maps:get(batch_size, Config, 1), - BatchTime = maps:get(batch_time_ms, Config, 0), - Name = atom_to_binary(?MODULE), - Server = maps:get(server, Config, rabbit_mq_host()), - Port = maps:get(port, Config, rabbit_mq_port()), - Template = maps:get(payload_template, Config, <<"">>), - ConfigString = - io_lib:format( - "bridges.rabbitmq.~s {\n" - " enable = true\n" - " ssl = ~s\n" - " server = \"~s\"\n" - " port = ~p\n" - " username = \"guest\"\n" - " password = \"guest\"\n" - " routing_key = \"~s\"\n" - " exchange = \"~s\"\n" - " payload_template = \"~s\"\n" - " resource_opts = {\n" - " batch_size = ~b\n" - " batch_time = ~bms\n" - " }\n" - "}\n", - [ - Name, - hocon_pp:do(ssl_options(UseTLS), #{embedded => true}), - Server, - Port, - rabbit_mq_routing_key(), - rabbit_mq_exchange(), - Template, - BatchSize, - BatchTime - ] - ), - ct:pal(ConfigString), - parse_and_check(ConfigString, <<"rabbitmq">>, Name). - -ssl_options(true) -> - CertsDir = filename:join([ - emqx_common_test_helpers:proj_root(), - ".ci", - "docker-compose-file", - "certs" - ]), - #{ - enable => true, - cacertfile => filename:join([CertsDir, "ca.crt"]), - certfile => filename:join([CertsDir, "client.pem"]), - keyfile => filename:join([CertsDir, "client.key"]) - }; -ssl_options(false) -> - #{ - enable => false - }. - -parse_and_check(ConfigString, BridgeType, Name) -> - {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), - hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), - #{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf, - RetConfig. - -create_bridge(Name, UseTLS, Config) -> - BridgeConfig = rabbitmq_config(UseTLS, Config), - {ok, _} = emqx_bridge:create(?TYPE, Name, BridgeConfig), - emqx_bridge_resource:bridge_id(?TYPE, Name). - -delete_bridge(Name) -> - ok = emqx_bridge:remove(?TYPE, Name). - -%%------------------------------------------------------------------------------ -%% Test Cases -%%------------------------------------------------------------------------------ - -t_create_delete_bridge(Config) -> - Name = atom_to_binary(?FUNCTION_NAME), - RabbitMQ = get_rabbitmq(Config), - UseTLS = get_tls(Config), - create_bridge(Name, UseTLS, RabbitMQ), - Bridges = emqx_bridge:list(), - Any = fun(#{name := BName}) -> BName =:= Name end, - ?assert(lists:any(Any, Bridges), Bridges), - ok = delete_bridge(Name), - BridgesAfterDelete = emqx_bridge:list(), - ?assertNot(lists:any(Any, BridgesAfterDelete), BridgesAfterDelete), - ok. - -t_create_delete_bridge_non_existing_server(Config) -> - Name = atom_to_binary(?FUNCTION_NAME), - UseTLS = get_tls(Config), - create_bridge(Name, UseTLS, #{server => <<"non_existing_server">>, port => 3174}), - %% Check that the new bridge is in the list of bridges - Bridges = emqx_bridge:list(), - Any = fun(#{name := BName}) -> BName =:= Name end, - ?assert(lists:any(Any, Bridges)), - ok = delete_bridge(Name), - BridgesAfterDelete = emqx_bridge:list(), - ?assertNot(lists:any(Any, BridgesAfterDelete)), - ok. - -t_send_message_query(Config) -> - Name = atom_to_binary(?FUNCTION_NAME), - RabbitMQ = get_rabbitmq(Config), - UseTLS = get_tls(Config), - BridgeID = create_bridge(Name, UseTLS, RabbitMQ#{batch_size => 1}), - Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000}, - %% This will use the SQL template included in the bridge - emqx_bridge:send_message(BridgeID, Payload), - %% Check that the data got to the database - ?assertEqual(Payload, receive_simple_test_message(Config)), - ok = delete_bridge(Name), - ok. - -t_send_message_query_with_template(Config) -> - Name = atom_to_binary(?FUNCTION_NAME), - RabbitMQ = get_rabbitmq(Config), - UseTLS = get_tls(Config), - BridgeID = create_bridge(Name, UseTLS, RabbitMQ#{ - batch_size => 1, - payload_template => - << - "{" - " \\\"key\\\": ${key}," - " \\\"data\\\": \\\"${data}\\\"," - " \\\"timestamp\\\": ${timestamp}," - " \\\"secret\\\": 42" - "}" - >> - }), - Payload = #{ - <<"key">> => 7, - <<"data">> => <<"RabbitMQ">>, - <<"timestamp">> => 10000 - }, - emqx_bridge:send_message(BridgeID, Payload), - %% Check that the data got to the database - ExpectedResult = Payload#{ - <<"secret">> => 42 - }, - ?assertEqual(ExpectedResult, receive_simple_test_message(Config)), - ok = delete_bridge(Name), - ok. - -t_send_simple_batch(Config) -> - Name = atom_to_binary(?FUNCTION_NAME), - RabbitMQ = get_rabbitmq(Config), - BridgeConf = RabbitMQ#{batch_size => 100}, - UseTLS = get_tls(Config), - BridgeID = create_bridge(Name, UseTLS, BridgeConf), - Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000}, - emqx_bridge:send_message(BridgeID, Payload), - ?assertEqual(Payload, receive_simple_test_message(Config)), - ok = delete_bridge(Name), - ok. - -t_send_simple_batch_with_template(Config) -> - Name = atom_to_binary(?FUNCTION_NAME), - RabbitMQ = get_rabbitmq(Config), - UseTLS = get_tls(Config), - BridgeConf = - RabbitMQ#{ - batch_size => 100, - payload_template => - << - "{" - " \\\"key\\\": ${key}," - " \\\"data\\\": \\\"${data}\\\"," - " \\\"timestamp\\\": ${timestamp}," - " \\\"secret\\\": 42" - "}" - >> - }, - BridgeID = create_bridge(Name, UseTLS, BridgeConf), - Payload = #{ - <<"key">> => 7, - <<"data">> => <<"RabbitMQ">>, - <<"timestamp">> => 10000 - }, - emqx_bridge:send_message(BridgeID, Payload), - ExpectedResult = Payload#{ - <<"secret">> => 42 - }, - ?assertEqual(ExpectedResult, receive_simple_test_message(Config)), - ok = delete_bridge(Name), - ok. - -t_heavy_batching(Config) -> - Name = atom_to_binary(?FUNCTION_NAME), - NumberOfMessages = 20000, - RabbitMQ = get_rabbitmq(Config), - UseTLS = get_tls(Config), - BridgeConf = RabbitMQ#{ - batch_size => 10173, - batch_time_ms => 50 - }, - BridgeID = create_bridge(Name, UseTLS, BridgeConf), - SendMessage = fun(Key) -> - Payload = #{<<"key">> => Key}, - emqx_bridge:send_message(BridgeID, Payload) - end, - [SendMessage(Key) || Key <- lists:seq(1, NumberOfMessages)], - AllMessages = lists:foldl( - fun(_, Acc) -> - Message = receive_simple_test_message(Config), - #{<<"key">> := Key} = Message, - Acc#{Key => true} - end, - #{}, - lists:seq(1, NumberOfMessages) - ), - ?assertEqual(NumberOfMessages, maps:size(AllMessages)), - ok = delete_bridge(Name), - ok. - -receive_simple_test_message(Config) -> - #{channel := Channel} = get_channel_connection(Config), - #'basic.consume_ok'{consumer_tag = ConsumerTag} = - amqp_channel:call( - Channel, - #'basic.consume'{ - queue = rabbit_mq_queue() - } - ), - receive - %% This is the first message received - #'basic.consume_ok'{} -> - ok - end, - receive - {#'basic.deliver'{delivery_tag = DeliveryTag}, Content} -> - %% Ack the message - amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DeliveryTag}), - %% Cancel the consumer - #'basic.cancel_ok'{consumer_tag = ConsumerTag} = - amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}), - emqx_utils_json:decode(Content#amqp_msg.payload) - after 5000 -> - ?assert(false, "Did not receive message within 5 second") - end. diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl index ee5a2609b..56cdc8b0d 100644 --- a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl +++ b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_bridge_rabbitmq_connector_SUITE). @@ -12,6 +12,18 @@ -include_lib("stdlib/include/assert.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-import(emqx_bridge_rabbitmq_test_utils, [ + rabbit_mq_exchange/0, + rabbit_mq_routing_key/0, + rabbit_mq_queue/0, + rabbit_mq_host/0, + rabbit_mq_port/0, + get_rabbitmq/1, + ssl_options/1, + get_channel_connection/1, + parse_and_check/4, + receive_message_from_rabbitmq/1 +]). %% This test SUITE requires a running RabbitMQ instance. If you don't want to %% bring up the whole CI infrastructure with the `scripts/ct/run.sh` script @@ -21,99 +33,24 @@ %% %% docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3.11-management -rabbit_mq_host() -> - list_to_binary(os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq")). - -rabbit_mq_port() -> - list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")). - -rabbit_mq_password() -> - <<"guest">>. - -rabbit_mq_exchange() -> - <<"test_exchange">>. - -rabbit_mq_queue() -> - <<"test_queue">>. - -rabbit_mq_routing_key() -> - <<"test_routing_key">>. - all() -> - emqx_common_test_helpers:all(?MODULE). + [ + {group, tcp}, + {group, tls} + ]. -init_per_suite(Config) -> - Host = rabbit_mq_host(), - Port = rabbit_mq_port(), - ct:pal("rabbitmq:~p~n", [{Host, Port}]), - case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of - true -> - Apps = emqx_cth_suite:start( - [emqx_conf, emqx_connector, emqx_bridge_rabbitmq], - #{work_dir => emqx_cth_suite:work_dir(Config)} - ), - ChannelConnection = setup_rabbit_mq_exchange_and_queue(Host, Port), - [{channel_connection, ChannelConnection}, {suite_apps, Apps} | Config]; - false -> - case os:getenv("IS_CI") of - "yes" -> - throw(no_rabbitmq); - _ -> - {skip, no_rabbitmq} - end - end. +groups() -> + AllTCs = emqx_common_test_helpers:all(?MODULE), + [ + {tcp, AllTCs}, + {tls, AllTCs} + ]. -setup_rabbit_mq_exchange_and_queue(Host, Port) -> - %% Create an exchange and a queue - {ok, Connection} = - amqp_connection:start(#amqp_params_network{ - host = binary_to_list(Host), - port = Port - }), - {ok, Channel} = amqp_connection:open_channel(Connection), - %% Create an exchange - #'exchange.declare_ok'{} = - amqp_channel:call( - Channel, - #'exchange.declare'{ - exchange = rabbit_mq_exchange(), - type = <<"topic">> - } - ), - %% Create a queue - #'queue.declare_ok'{} = - amqp_channel:call( - Channel, - #'queue.declare'{queue = rabbit_mq_queue()} - ), - %% Bind the queue to the exchange - #'queue.bind_ok'{} = - amqp_channel:call( - Channel, - #'queue.bind'{ - queue = rabbit_mq_queue(), - exchange = rabbit_mq_exchange(), - routing_key = rabbit_mq_routing_key() - } - ), - #{ - connection => Connection, - channel => Channel - }. +init_per_group(Group, Config) -> + emqx_bridge_rabbitmq_test_utils:init_per_group(Group, Config). -get_channel_connection(Config) -> - proplists:get_value(channel_connection, Config). - -end_per_suite(Config) -> - #{ - connection := Connection, - channel := Channel - } = get_channel_connection(Config), - %% Close the channel - ok = amqp_channel:close(Channel), - %% Close the connection - ok = amqp_connection:close(Connection), - ok = emqx_cth_suite:stop(?config(suite_apps, Config)). +end_per_group(Group, Config) -> + emqx_bridge_rabbitmq_test_utils:end_per_group(Group, Config). % %%------------------------------------------------------------------------------ % %% Testcases @@ -143,7 +80,6 @@ t_start_passfile(Config) -> ). perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) -> - #{channel := Channel} = get_channel_connection(TestConfig), CheckedConfig = check_config(InitialConfig), #{ id := PoolName, @@ -159,7 +95,7 @@ perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) -> emqx_resource:get_instance(ResourceID), ?assertEqual({ok, connected}, emqx_resource:health_check(ResourceID)), %% Perform query as further check that the resource is working as expected - perform_query(ResourceID, Channel), + perform_query(ResourceID, TestConfig), ?assertEqual(ok, emqx_resource:stop(ResourceID)), %% Resource will be listed still, but state will be changed and healthcheck will fail %% as the worker no longer exists. @@ -181,7 +117,7 @@ perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) -> emqx_resource:get_instance(ResourceID), ?assertEqual({ok, connected}, emqx_resource:health_check(ResourceID)), %% Check that everything is working again by performing a query - perform_query(ResourceID, Channel), + perform_query(ResourceID, TestConfig), % Stop and remove the resource in one go. ?assertEqual(ok, emqx_resource:remove_local(ResourceID)), ?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)), @@ -214,37 +150,12 @@ perform_query(PoolName, Channel) -> ?assertEqual(ok, emqx_resource_manager:add_channel(PoolName, ChannelId, ActionConfig)), ok = emqx_resource:query(PoolName, {ChannelId, payload()}), %% Get the message from queue: - ok = receive_simple_test_message(Channel), + SendData = test_data(), + RecvData = receive_message_from_rabbitmq(Channel), + ?assertMatch(SendData, RecvData), ?assertEqual(ok, emqx_resource_manager:remove_channel(PoolName, ChannelId)), ok. -receive_simple_test_message(Channel) -> - #'basic.consume_ok'{consumer_tag = ConsumerTag} = - amqp_channel:call( - Channel, - #'basic.consume'{ - queue = rabbit_mq_queue() - } - ), - receive - %% This is the first message received - #'basic.consume_ok'{} -> - ok - end, - receive - {#'basic.deliver'{delivery_tag = DeliveryTag}, Content} -> - Expected = test_data(), - ?assertEqual(Expected, emqx_utils_json:decode(Content#amqp_msg.payload)), - %% Ack the message - amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DeliveryTag}), - %% Cancel the consumer - #'basic.cancel_ok'{consumer_tag = ConsumerTag} = - amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}), - ok - after 5000 -> - ?assert(false, "Did not receive message within 5 second") - end. - rabbitmq_config() -> rabbitmq_config(#{}). @@ -278,3 +189,6 @@ rabbitmq_action_config() -> wait_for_publish_confirmations => true } }. + +rabbit_mq_password() -> + <<"guest">>. diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_test_utils.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_test_utils.erl new file mode 100644 index 000000000..47df47976 --- /dev/null +++ b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_test_utils.erl @@ -0,0 +1,203 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_rabbitmq_test_utils). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("stdlib/include/assert.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +init_per_group(tcp, Config) -> + RabbitMQHost = os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq"), + RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")), + case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of + true -> + Config1 = common_init_per_group(#{ + host => RabbitMQHost, port => RabbitMQPort, tls => false + }), + Config1 ++ Config; + false -> + case os:getenv("IS_CI") of + "yes" -> + throw(no_rabbitmq); + _ -> + {skip, no_rabbitmq} + end + end; +init_per_group(tls, Config) -> + RabbitMQHost = os:getenv("RABBITMQ_TLS_HOST", "rabbitmq"), + RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_TLS_PORT", "5671")), + case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of + true -> + Config1 = common_init_per_group(#{ + host => RabbitMQHost, port => RabbitMQPort, tls => true + }), + Config1 ++ Config; + false -> + case os:getenv("IS_CI") of + "yes" -> + throw(no_rabbitmq); + _ -> + {skip, no_rabbitmq} + end + end; +init_per_group(_Group, Config) -> + Config. + +common_init_per_group(Opts) -> + emqx_common_test_helpers:render_and_load_app_config(emqx_conf), + ok = emqx_common_test_helpers:start_apps([ + emqx_conf, emqx_bridge, emqx_bridge_rabbitmq, emqx_rule_engine + ]), + ok = emqx_connector_test_helpers:start_apps([emqx_resource]), + {ok, _} = application:ensure_all_started(emqx_connector), + {ok, _} = application:ensure_all_started(amqp_client), + emqx_mgmt_api_test_util:init_suite(), + #{host := Host, port := Port, tls := UseTLS} = Opts, + ChannelConnection = setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS), + [ + {channel_connection, ChannelConnection}, + {rabbitmq, #{server => Host, port => Port, tls => UseTLS}} + ]. + +setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS) -> + SSLOptions = + case UseTLS of + false -> none; + true -> emqx_tls_lib:to_client_opts(ssl_options(UseTLS)) + end, + %% Create an exchange and a queue + {ok, Connection} = + amqp_connection:start(#amqp_params_network{ + host = Host, + port = Port, + ssl_options = SSLOptions + }), + {ok, Channel} = amqp_connection:open_channel(Connection), + %% Create an exchange + #'exchange.declare_ok'{} = + amqp_channel:call( + Channel, + #'exchange.declare'{ + exchange = rabbit_mq_exchange(), + type = <<"topic">> + } + ), + %% Create a queue + #'queue.declare_ok'{} = + amqp_channel:call( + Channel, + #'queue.declare'{queue = rabbit_mq_queue()} + ), + %% Bind the queue to the exchange + #'queue.bind_ok'{} = + amqp_channel:call( + Channel, + #'queue.bind'{ + queue = rabbit_mq_queue(), + exchange = rabbit_mq_exchange(), + routing_key = rabbit_mq_routing_key() + } + ), + #{ + connection => Connection, + channel => Channel + }. + +end_per_group(_Group, Config) -> + #{ + connection := Connection, + channel := Channel + } = get_channel_connection(Config), + amqp_channel:call(Channel, #'queue.purge'{queue = rabbit_mq_queue()}), + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_bridge_rabbitmq, emqx_rule_engine]), + ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), + _ = application:stop(emqx_connector), + _ = application:stop(emqx_bridge), + %% Close the channel + ok = amqp_channel:close(Channel), + %% Close the connection + ok = amqp_connection:close(Connection). + +rabbit_mq_host() -> + list_to_binary(os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq")). + +rabbit_mq_port() -> + list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")). + +rabbit_mq_exchange() -> + <<"messages">>. + +rabbit_mq_queue() -> + <<"test_queue">>. + +rabbit_mq_routing_key() -> + <<"test_routing_key">>. + +get_rabbitmq(Config) -> + proplists:get_value(rabbitmq, Config). + +get_channel_connection(Config) -> + proplists:get_value(channel_connection, Config). + +ssl_options(true) -> + CertsDir = filename:join([ + emqx_common_test_helpers:proj_root(), + ".ci", + "docker-compose-file", + "certs" + ]), + #{ + enable => true, + cacertfile => filename:join([CertsDir, "ca.crt"]), + certfile => filename:join([CertsDir, "client.pem"]), + keyfile => filename:join([CertsDir, "client.key"]) + }; +ssl_options(false) -> + #{ + enable => false + }. + +parse_and_check(Key, Mod, Conf, Name) -> + ConfStr = hocon_pp:do(Conf, #{}), + ct:pal(ConfStr), + {ok, RawConf} = hocon:binary(ConfStr, #{format => map}), + hocon_tconf:check_plain(Mod, RawConf, #{required => false, atom_key => false}), + #{Key := #{<<"rabbitmq">> := #{Name := RetConf}}} = RawConf, + RetConf. + +receive_message_from_rabbitmq(Config) -> + #{channel := Channel} = get_channel_connection(Config), + #'basic.consume_ok'{consumer_tag = ConsumerTag} = + amqp_channel:call( + Channel, + #'basic.consume'{ + queue = rabbit_mq_queue() + } + ), + receive + %% This is the first message received + #'basic.consume_ok'{} -> + ok + end, + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, Content} -> + %% Ack the message + amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DeliveryTag}), + %% Cancel the consumer + #'basic.cancel_ok'{consumer_tag = ConsumerTag} = + amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}), + Payload = Content#amqp_msg.payload, + case emqx_utils_json:safe_decode(Payload, [return_maps]) of + {ok, Msg} -> Msg; + {error, _} -> ?assert(false, {"Failed to decode the message", Payload}) + end + after 5000 -> + ?assert(false, "Did not receive message within 5 second") + end. diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v1_SUITE.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v1_SUITE.erl new file mode 100644 index 000000000..48756c616 --- /dev/null +++ b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v1_SUITE.erl @@ -0,0 +1,221 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_rabbitmq_v1_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("stdlib/include/assert.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +%% See comment in +%% apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl for how to +%% run this without bringing up the whole CI infrastructure +-define(TYPE, <<"rabbitmq">>). +-import(emqx_bridge_rabbitmq_test_utils, [ + rabbit_mq_exchange/0, + rabbit_mq_routing_key/0, + rabbit_mq_queue/0, + rabbit_mq_host/0, + rabbit_mq_port/0, + get_rabbitmq/1, + ssl_options/1, + get_channel_connection/1, + parse_and_check/4, + receive_message_from_rabbitmq/1 +]). +%%------------------------------------------------------------------------------ +%% Common Test Setup, Tear down and Testcase List +%%------------------------------------------------------------------------------ + +all() -> + [ + {group, tcp}, + {group, tls} + ]. + +groups() -> + AllTCs = emqx_common_test_helpers:all(?MODULE), + [ + {tcp, AllTCs}, + {tls, AllTCs} + ]. + +init_per_group(Group, Config) -> + emqx_bridge_rabbitmq_test_utils:init_per_group(Group, Config). + +end_per_group(Group, Config) -> + emqx_bridge_rabbitmq_test_utils:end_per_group(Group, Config). + +create_bridge(Name, Config) -> + BridgeConfig = rabbitmq_config(Config), + {ok, _} = emqx_bridge:create(?TYPE, Name, BridgeConfig), + emqx_bridge_resource:bridge_id(?TYPE, Name). + +delete_bridge(Name) -> + ok = emqx_bridge:remove(?TYPE, Name). + +%%------------------------------------------------------------------------------ +%% Test Cases +%%------------------------------------------------------------------------------ + +t_create_delete_bridge(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + RabbitMQ = get_rabbitmq(Config), + create_bridge(Name, RabbitMQ), + Bridges = emqx_bridge:list(), + Any = fun(#{name := BName}) -> BName =:= Name end, + ?assert(lists:any(Any, Bridges), Bridges), + ok = delete_bridge(Name), + BridgesAfterDelete = emqx_bridge:list(), + ?assertNot(lists:any(Any, BridgesAfterDelete), BridgesAfterDelete), + ok. + +t_create_delete_bridge_non_existing_server(_Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + create_bridge(Name, #{server => <<"non_existing_server">>, port => 3174}), + %% Check that the new bridge is in the list of bridges + Bridges = emqx_bridge:list(), + Any = fun(#{name := BName}) -> BName =:= Name end, + ?assert(lists:any(Any, Bridges)), + ok = delete_bridge(Name), + BridgesAfterDelete = emqx_bridge:list(), + ?assertNot(lists:any(Any, BridgesAfterDelete)), + ok. + +t_send_message_query(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + RabbitMQ = get_rabbitmq(Config), + BridgeID = create_bridge(Name, RabbitMQ#{batch_size => 1}), + Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000}, + %% This will use the SQL template included in the bridge + emqx_bridge:send_message(BridgeID, Payload), + %% Check that the data got to the database + ?assertEqual(Payload, receive_message_from_rabbitmq(Config)), + ok = delete_bridge(Name), + ok. + +t_send_message_query_with_template(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + RabbitMQ = get_rabbitmq(Config), + BridgeID = create_bridge(Name, RabbitMQ#{ + batch_size => 1, + payload_template => payload_template() + }), + Payload = #{ + <<"key">> => 7, + <<"data">> => <<"RabbitMQ">>, + <<"timestamp">> => 10000 + }, + emqx_bridge:send_message(BridgeID, Payload), + %% Check that the data got to the database + ExpectedResult = Payload#{ + <<"secret">> => 42 + }, + ?assertEqual(ExpectedResult, receive_message_from_rabbitmq(Config)), + ok = delete_bridge(Name), + ok. + +t_send_simple_batch(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + RabbitMQ = get_rabbitmq(Config), + BridgeConf = RabbitMQ#{batch_size => 100}, + BridgeID = create_bridge(Name, BridgeConf), + Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000}, + emqx_bridge:send_message(BridgeID, Payload), + ?assertEqual(Payload, receive_message_from_rabbitmq(Config)), + ok = delete_bridge(Name), + ok. + +t_send_simple_batch_with_template(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + RabbitMQ = get_rabbitmq(Config), + BridgeConf = + RabbitMQ#{ + batch_size => 100, + payload_template => payload_template() + }, + BridgeID = create_bridge(Name, BridgeConf), + Payload = #{ + <<"key">> => 7, + <<"data">> => <<"RabbitMQ">>, + <<"timestamp">> => 10000 + }, + emqx_bridge:send_message(BridgeID, Payload), + ExpectedResult = Payload#{<<"secret">> => 42}, + ?assertEqual(ExpectedResult, receive_message_from_rabbitmq(Config)), + ok = delete_bridge(Name), + ok. + +t_heavy_batching(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + NumberOfMessages = 20000, + RabbitMQ = get_rabbitmq(Config), + BridgeConf = RabbitMQ#{ + batch_size => 10173, + batch_time_ms => 50 + }, + BridgeID = create_bridge(Name, BridgeConf), + SendMessage = fun(Key) -> + Payload = #{<<"key">> => Key}, + emqx_bridge:send_message(BridgeID, Payload) + end, + [SendMessage(Key) || Key <- lists:seq(1, NumberOfMessages)], + AllMessages = lists:foldl( + fun(_, Acc) -> + Message = receive_message_from_rabbitmq(Config), + #{<<"key">> := Key} = Message, + Acc#{Key => true} + end, + #{}, + lists:seq(1, NumberOfMessages) + ), + ?assertEqual(NumberOfMessages, maps:size(AllMessages)), + ok = delete_bridge(Name), + ok. + +rabbitmq_config(Config) -> + UseTLS = maps:get(tls, Config, false), + BatchSize = maps:get(batch_size, Config, 1), + BatchTime = maps:get(batch_time_ms, Config, 0), + Name = atom_to_binary(?MODULE), + Server = maps:get(server, Config, rabbit_mq_host()), + Port = maps:get(port, Config, rabbit_mq_port()), + Template = maps:get(payload_template, Config, <<"">>), + Bridge = + #{ + <<"bridges">> => #{ + <<"rabbitmq">> => #{ + Name => #{ + <<"enable">> => true, + <<"ssl">> => ssl_options(UseTLS), + <<"server">> => Server, + <<"port">> => Port, + <<"username">> => <<"guest">>, + <<"password">> => <<"guest">>, + <<"routing_key">> => rabbit_mq_routing_key(), + <<"exchange">> => rabbit_mq_exchange(), + <<"payload_template">> => Template, + <<"resource_opts">> => #{ + <<"batch_size">> => BatchSize, + <<"batch_time">> => BatchTime + } + } + } + } + }, + parse_and_check(<<"bridges">>, emqx_bridge_schema, Bridge, Name). + +payload_template() -> + << + "{" + " \"key\": ${key}," + " \"data\": \"${data}\"," + " \"timestamp\": ${timestamp}," + " \"secret\": 42" + "}" + >>. diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl new file mode 100644 index 000000000..8b11f732a --- /dev/null +++ b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl @@ -0,0 +1,261 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_rabbitmq_v2_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("stdlib/include/assert.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-import(emqx_bridge_rabbitmq_test_utils, [ + rabbit_mq_exchange/0, + rabbit_mq_routing_key/0, + rabbit_mq_queue/0, + rabbit_mq_host/0, + rabbit_mq_port/0, + get_rabbitmq/1, + get_tls/1, + ssl_options/1, + get_channel_connection/1, + parse_and_check/4, + receive_message_from_rabbitmq/1 +]). +-import(emqx_common_test_helpers, [on_exit/1]). + +-define(TYPE, <<"rabbitmq">>). + +all() -> + [ + {group, tcp}, + {group, tls} + ]. + +groups() -> + AllTCs = emqx_common_test_helpers:all(?MODULE), + [ + {tcp, AllTCs}, + {tls, AllTCs} + ]. + +init_per_group(Group, Config) -> + Config1 = emqx_bridge_rabbitmq_test_utils:init_per_group(Group, Config), + Name = atom_to_binary(?MODULE), + create_connector(Name, get_rabbitmq(Config1)), + Config1. + +end_per_group(Group, Config) -> + Name = atom_to_binary(?MODULE), + delete_connector(Name), + emqx_bridge_rabbitmq_test_utils:end_per_group(Group, Config). + +rabbitmq_connector(Config) -> + UseTLS = maps:get(tls, Config, false), + Name = atom_to_binary(?MODULE), + Server = maps:get(server, Config, rabbit_mq_host()), + Port = maps:get(port, Config, rabbit_mq_port()), + Connector = #{ + <<"connectors">> => #{ + <<"rabbitmq">> => #{ + Name => #{ + <<"enable">> => true, + <<"ssl">> => ssl_options(UseTLS), + <<"server">> => Server, + <<"port">> => Port, + <<"username">> => <<"guest">>, + <<"password">> => <<"guest">> + } + } + } + }, + parse_and_check(<<"connectors">>, emqx_connector_schema, Connector, Name). + +rabbitmq_source() -> + Name = atom_to_binary(?MODULE), + Source = #{ + <<"sources">> => #{ + <<"rabbitmq">> => #{ + Name => #{ + <<"enable">> => true, + <<"connector">> => Name, + <<"parameters">> => #{ + <<"no_ack">> => true, + <<"queue">> => rabbit_mq_queue(), + <<"wait_for_publish_confirmations">> => true + } + } + } + } + }, + parse_and_check(<<"sources">>, emqx_bridge_v2_schema, Source, Name). + +rabbitmq_action() -> + Name = atom_to_binary(?MODULE), + Action = #{ + <<"actions">> => #{ + <<"rabbitmq">> => #{ + Name => #{ + <<"connector">> => Name, + <<"enable">> => true, + <<"parameters">> => #{ + <<"exchange">> => rabbit_mq_exchange(), + <<"payload_template">> => <<"${.payload}">>, + <<"routing_key">> => rabbit_mq_routing_key(), + <<"delivery_mode">> => <<"non_persistent">>, + <<"publish_confirmation_timeout">> => <<"30s">>, + <<"wait_for_publish_confirmations">> => true + } + } + } + } + }, + parse_and_check(<<"actions">>, emqx_bridge_v2_schema, Action, Name). + +create_connector(Name, Config) -> + Connector = rabbitmq_connector(Config), + {ok, _} = emqx_connector:create(?TYPE, Name, Connector). + +delete_connector(Name) -> + ok = emqx_connector:remove(?TYPE, Name). + +create_source(Name) -> + Source = rabbitmq_source(), + {ok, _} = emqx_bridge_v2:create(sources, ?TYPE, Name, Source). + +delete_source(Name) -> + ok = emqx_bridge_v2:remove(sources, ?TYPE, Name). + +create_action(Name) -> + Action = rabbitmq_action(), + {ok, _} = emqx_bridge_v2:create(actions, ?TYPE, Name, Action). + +delete_action(Name) -> + ok = emqx_bridge_v2:remove(actions, ?TYPE, Name). + +%%------------------------------------------------------------------------------ +%% Test Cases +%%------------------------------------------------------------------------------ + +t_source(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + create_source(Name), + Sources = emqx_bridge_v2:list(sources), + Any = fun(#{name := BName}) -> BName =:= Name end, + ?assert(lists:any(Any, Sources), Sources), + Topic = <<"tesldkafd">>, + {ok, #{id := RuleId}} = emqx_rule_engine:create_rule( + #{ + sql => <<"select * from \"$bridges/rabbitmq:", Name/binary, "\"">>, + id => atom_to_binary(?FUNCTION_NAME), + actions => [ + #{ + args => #{ + topic => Topic, + mqtt_properties => #{}, + payload => <<"${payload}">>, + qos => 0, + retain => false, + user_properties => [] + }, + function => republish + } + ], + description => <<"bridge_v2 republish rule">> + } + ), + on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end), + {ok, C1} = emqtt:start_link([{clean_start, true}]), + {ok, _} = emqtt:connect(C1), + {ok, #{}, [0]} = emqtt:subscribe(C1, Topic, [{qos, 0}, {rh, 0}]), + send_test_message_to_rabbitmq(Config), + PayloadBin = emqx_utils_json:encode(payload()), + ?assertMatch( + [ + #{ + dup := false, + properties := undefined, + topic := Topic, + qos := 0, + payload := PayloadBin, + retain := false + } + ], + receive_messages(1) + ), + ok = emqtt:disconnect(C1), + ok = delete_source(Name), + SourcesAfterDelete = emqx_bridge_v2:list(sources), + ?assertNot(lists:any(Any, SourcesAfterDelete), SourcesAfterDelete), + ok. + +t_action(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + create_action(Name), + Actions = emqx_bridge_v2:list(actions), + Any = fun(#{name := BName}) -> BName =:= Name end, + ?assert(lists:any(Any, Actions), Actions), + Topic = <<"lkadfdaction">>, + {ok, #{id := RuleId}} = emqx_rule_engine:create_rule( + #{ + sql => <<"select * from \"", Topic/binary, "\"">>, + id => atom_to_binary(?FUNCTION_NAME), + actions => [<<"rabbitmq:", Name/binary>>], + description => <<"bridge_v2 send msg to rabbitmq action">> + } + ), + on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end), + {ok, C1} = emqtt:start_link([{clean_start, true}]), + {ok, _} = emqtt:connect(C1), + Payload = payload(), + PayloadBin = emqx_utils_json:encode(Payload), + {ok, _} = emqtt:publish(C1, Topic, #{}, PayloadBin, [{qos, 1}, {retain, false}]), + Msg = receive_message_from_rabbitmq(Config), + ?assertMatch(Payload, Msg), + ok = emqtt:disconnect(C1), + ok = delete_action(Name), + ActionsAfterDelete = emqx_bridge_v2:list(actions), + ?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete), + ok. + +receive_messages(Count) -> + receive_messages(Count, []). +receive_messages(0, Msgs) -> + Msgs; +receive_messages(Count, Msgs) -> + receive + {publish, Msg} -> + ct:log("Msg: ~p ~n", [Msg]), + receive_messages(Count - 1, [Msg | Msgs]); + Other -> + ct:log("Other Msg: ~p~n", [Other]), + receive_messages(Count, Msgs) + after 2000 -> + Msgs + end. + +payload() -> + #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000}. + +send_test_message_to_rabbitmq(Config) -> + #{channel := Channel} = get_channel_connection(Config), + MessageProperties = #'P_basic'{ + headers = [], + delivery_mode = 1 + }, + Method = #'basic.publish'{ + exchange = rabbit_mq_exchange(), + routing_key = rabbit_mq_routing_key() + }, + amqp_channel:cast( + Channel, + Method, + #amqp_msg{ + payload = emqx_utils_json:encode(payload()), + props = MessageProperties + } + ), + ok. diff --git a/rel/i18n/emqx_bridge_rabbitmq_pubsub_schema.hocon b/rel/i18n/emqx_bridge_rabbitmq_pubsub_schema.hocon index a73394386..82f1781e9 100644 --- a/rel/i18n/emqx_bridge_rabbitmq_pubsub_schema.hocon +++ b/rel/i18n/emqx_bridge_rabbitmq_pubsub_schema.hocon @@ -21,21 +21,6 @@ source_parameters.desc: source_parameters.label: """Source Parameters""" -source_topic.desc: -"""Topic used for constructing MQTT messages, supporting templates.""" -source_topic.label: -"""Source Topic""" - -source_qos.desc: -"""The QoS level of the MQTT message, supporting templates.""" -source_qos.label: -"""QoS""" - -source_payload_template.desc: -"""The template used to construct the payload of the MQTT message.""" -source_payload_template.label: -"""Source Payload Template""" - source_queue.desc: """The queue name of the RabbitMQ broker.""" source_queue.label: