feat: add RabbitMQ bridge
This commit is contained in:
parent
4d1499b747
commit
70cf1533db
|
@ -0,0 +1,17 @@
|
||||||
|
version: '3.9'
|
||||||
|
|
||||||
|
services:
|
||||||
|
rabbitmq:
|
||||||
|
container_name: rabbitmq
|
||||||
|
image: rabbitmq:3.11-management
|
||||||
|
|
||||||
|
restart: always
|
||||||
|
expose:
|
||||||
|
- "15672"
|
||||||
|
- "5672"
|
||||||
|
# We don't want to take ports from the host
|
||||||
|
# ports:
|
||||||
|
# - "15672:15672"
|
||||||
|
# - "5672:5672"
|
||||||
|
networks:
|
||||||
|
- emqx_bridge
|
|
@ -42,7 +42,7 @@
|
||||||
-type bar_separated_list() :: list().
|
-type bar_separated_list() :: list().
|
||||||
-type ip_port() :: tuple() | integer().
|
-type ip_port() :: tuple() | integer().
|
||||||
-type cipher() :: map().
|
-type cipher() :: map().
|
||||||
-type port_number() :: 1..65536.
|
-type port_number() :: 1..65535.
|
||||||
-type server_parse_option() :: #{
|
-type server_parse_option() :: #{
|
||||||
default_port => port_number(),
|
default_port => port_number(),
|
||||||
no_port => boolean(),
|
no_port => boolean(),
|
||||||
|
@ -135,7 +135,8 @@
|
||||||
cipher/0,
|
cipher/0,
|
||||||
comma_separated_atoms/0,
|
comma_separated_atoms/0,
|
||||||
url/0,
|
url/0,
|
||||||
json_binary/0
|
json_binary/0,
|
||||||
|
port_number/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([namespace/0, roots/0, roots/1, fields/1, desc/1, tags/0]).
|
-export([namespace/0, roots/0, roots/1, fields/1, desc/1, tags/0]).
|
||||||
|
|
|
@ -47,7 +47,9 @@
|
||||||
-type param_types() :: #{emqx_bpapi:var_name() => _Type}.
|
-type param_types() :: #{emqx_bpapi:var_name() => _Type}.
|
||||||
|
|
||||||
%% Applications and modules we wish to ignore in the analysis:
|
%% Applications and modules we wish to ignore in the analysis:
|
||||||
-define(IGNORED_APPS, "gen_rpc, recon, redbug, observer_cli, snabbkaffe, ekka, mria").
|
-define(IGNORED_APPS,
|
||||||
|
"gen_rpc, recon, redbug, observer_cli, snabbkaffe, ekka, mria, amqp_client, rabbit_common"
|
||||||
|
).
|
||||||
-define(IGNORED_MODULES, "emqx_rpc").
|
-define(IGNORED_MODULES, "emqx_rpc").
|
||||||
%% List of known RPC backend modules:
|
%% List of known RPC backend modules:
|
||||||
-define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc").
|
-define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc").
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
Business Source License 1.1
|
||||||
|
|
||||||
|
Licensor: Hangzhou EMQ Technologies Co., Ltd.
|
||||||
|
Licensed Work: EMQX Enterprise Edition
|
||||||
|
The Licensed Work is (c) 2023
|
||||||
|
Hangzhou EMQ Technologies Co., Ltd.
|
||||||
|
Additional Use Grant: Students and educators are granted right to copy,
|
||||||
|
modify, and create derivative work for research
|
||||||
|
or education.
|
||||||
|
Change Date: 2027-02-01
|
||||||
|
Change License: Apache License, Version 2.0
|
||||||
|
|
||||||
|
For information about alternative licensing arrangements for the Software,
|
||||||
|
please contact Licensor: https://www.emqx.com/en/contact
|
||||||
|
|
||||||
|
Notice
|
||||||
|
|
||||||
|
The Business Source License (this document, or the “License”) is not an Open
|
||||||
|
Source license. However, the Licensed Work will eventually be made available
|
||||||
|
under an Open Source License, as stated in this License.
|
||||||
|
|
||||||
|
License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
|
||||||
|
“Business Source License” is a trademark of MariaDB Corporation Ab.
|
||||||
|
|
||||||
|
-----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
Business Source License 1.1
|
||||||
|
|
||||||
|
Terms
|
||||||
|
|
||||||
|
The Licensor hereby grants you the right to copy, modify, create derivative
|
||||||
|
works, redistribute, and make non-production use of the Licensed Work. The
|
||||||
|
Licensor may make an Additional Use Grant, above, permitting limited
|
||||||
|
production use.
|
||||||
|
|
||||||
|
Effective on the Change Date, or the fourth anniversary of the first publicly
|
||||||
|
available distribution of a specific version of the Licensed Work under this
|
||||||
|
License, whichever comes first, the Licensor hereby grants you rights under
|
||||||
|
the terms of the Change License, and the rights granted in the paragraph
|
||||||
|
above terminate.
|
||||||
|
|
||||||
|
If your use of the Licensed Work does not comply with the requirements
|
||||||
|
currently in effect as described in this License, you must purchase a
|
||||||
|
commercial license from the Licensor, its affiliated entities, or authorized
|
||||||
|
resellers, or you must refrain from using the Licensed Work.
|
||||||
|
|
||||||
|
All copies of the original and modified Licensed Work, and derivative works
|
||||||
|
of the Licensed Work, are subject to this License. This License applies
|
||||||
|
separately for each version of the Licensed Work and the Change Date may vary
|
||||||
|
for each version of the Licensed Work released by Licensor.
|
||||||
|
|
||||||
|
You must conspicuously display this License on each original or modified copy
|
||||||
|
of the Licensed Work. If you receive the Licensed Work in original or
|
||||||
|
modified form from a third party, the terms and conditions set forth in this
|
||||||
|
License apply to your use of that work.
|
||||||
|
|
||||||
|
Any use of the Licensed Work in violation of this License will automatically
|
||||||
|
terminate your rights under this License for the current and all other
|
||||||
|
versions of the Licensed Work.
|
||||||
|
|
||||||
|
This License does not grant you any right in any trademark or logo of
|
||||||
|
Licensor or its affiliates (provided that you may use a trademark or logo of
|
||||||
|
Licensor as expressly required by this License).
|
||||||
|
|
||||||
|
TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
|
||||||
|
AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
|
||||||
|
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
|
||||||
|
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
|
||||||
|
TITLE.
|
||||||
|
|
||||||
|
MariaDB hereby grants you permission to use this License’s text to license
|
||||||
|
your works, and to refer to it using the trademark “Business Source License”,
|
||||||
|
as long as you comply with the Covenants of Licensor below.
|
||||||
|
|
||||||
|
Covenants of Licensor
|
||||||
|
|
||||||
|
In consideration of the right to use this License’s text and the “Business
|
||||||
|
Source License” name and trademark, Licensor covenants to MariaDB, and to all
|
||||||
|
other recipients of the licensed work to be provided by Licensor:
|
||||||
|
|
||||||
|
1. To specify as the Change License the GPL Version 2.0 or any later version,
|
||||||
|
or a license that is compatible with GPL Version 2.0 or a later version,
|
||||||
|
where “compatible” means that software provided under the Change License can
|
||||||
|
be included in a program with software provided under GPL Version 2.0 or a
|
||||||
|
later version. Licensor may specify additional Change Licenses without
|
||||||
|
limitation.
|
||||||
|
|
||||||
|
2. To either: (a) specify an additional grant of rights to use that does not
|
||||||
|
impose any additional restriction on the right granted in this License, as
|
||||||
|
the Additional Use Grant; or (b) insert the text “None”.
|
||||||
|
|
||||||
|
3. To specify a Change Date.
|
||||||
|
|
||||||
|
4. Not to modify this License in any other way.
|
|
@ -0,0 +1,46 @@
|
||||||
|
# EMQX RabbitMQ Bridge
|
||||||
|
|
||||||
|
[RabbitMQ](https://www.rabbitmq.com/) is a powerful, open-source message broker
|
||||||
|
that facilitates asynchronous communication between different components of an
|
||||||
|
application. Built on the Advanced Message Queuing Protocol (AMQP), RabbitMQ
|
||||||
|
enables the reliable transmission of messages by decoupling the sender and
|
||||||
|
receiver components. This separation allows for increased scalability,
|
||||||
|
robustness, and flexibility in application architecture.
|
||||||
|
|
||||||
|
RabbitMQ is commonly used for a wide range of purposes, such as distributing
|
||||||
|
tasks among multiple workers, enabling event-driven architectures, and
|
||||||
|
implementing publish-subscribe patterns. It is a popular choice for
|
||||||
|
microservices, distributed systems, and real-time applications, providing an
|
||||||
|
efficient way to handle varying workloads and ensuring message delivery in
|
||||||
|
complex environments.
|
||||||
|
|
||||||
|
This application is used to connect EMQX and RabbitMQ. User can create a rule
|
||||||
|
and easily ingest IoT data into RabbitMQ by leveraging
|
||||||
|
[EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html).
|
||||||
|
|
||||||
|
|
||||||
|
# Documentation
|
||||||
|
|
||||||
|
- Refer to the [RabbitMQ bridge documentation](https://docs.emqx.com/en/enterprise/v5.0/data-integration/data-bridge-rabbitmq.html)
|
||||||
|
for how to use EMQX dashboard to ingest IoT data into RabbitMQ.
|
||||||
|
- Refer to [EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html)
|
||||||
|
for an introduction to the EMQX rules engine.
|
||||||
|
|
||||||
|
|
||||||
|
# HTTP APIs
|
||||||
|
|
||||||
|
- Several APIs are provided for bridge management, which includes create bridge,
|
||||||
|
update bridge, get bridge, stop or restart bridge and list bridges etc.
|
||||||
|
|
||||||
|
Refer to [API Docs - Bridges](https://docs.emqx.com/en/enterprise/v5.0/admin/api-docs.html#tag/Bridges) for more detailed information.
|
||||||
|
|
||||||
|
|
||||||
|
# Contributing
|
||||||
|
|
||||||
|
Please see our [contributing.md](../../CONTRIBUTING.md).
|
||||||
|
|
||||||
|
|
||||||
|
# License
|
||||||
|
|
||||||
|
EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
rabbitmq
|
|
@ -0,0 +1,33 @@
|
||||||
|
%% -*- mode: erlang; -*-
|
||||||
|
{erl_opts, [debug_info]}.
|
||||||
|
{deps, [
|
||||||
|
%% The following two are dependencies of rabbit_common
|
||||||
|
{thoas, {git, "https://github.com/emqx/thoas.git", {tag, "v1.0.0"}}}
|
||||||
|
, {credentials_obfuscation, {git, "https://github.com/emqx/credentials-obfuscation.git", {tag, "v3.2.0"}}}
|
||||||
|
%% The v3.11.13_with_app_src tag, employed in the next two dependencies,
|
||||||
|
%% represents a fork of the official RabbitMQ v3.11.13 tag. This fork diverges
|
||||||
|
%% from the official version as it includes app and hrl files
|
||||||
|
%% generated by make files in subdirectories deps/rabbit_common and
|
||||||
|
%% deps/amqp_client (app files are also relocated from the ebin to the src
|
||||||
|
%% directory). This modification ensures compatibility with rebar3, as
|
||||||
|
%% rabbit_common and amqp_client utilize the erlang.mk build tool.
|
||||||
|
%% Similar changes are probably needed when upgrading to newer versions
|
||||||
|
%% of rabbit_common and amqp_client. There are hex packages for rabbit_common and
|
||||||
|
%% amqp_client, but they are not used here as we don't want to depend on
|
||||||
|
%% packages that we don't have control over.
|
||||||
|
, {rabbit_common, {git_subdir,
|
||||||
|
"https://github.com/emqx/rabbitmq-server.git",
|
||||||
|
{tag, "v3.11.13-emqx"},
|
||||||
|
"deps/rabbit_common"}}
|
||||||
|
, {amqp_client, {git_subdir,
|
||||||
|
"https://github.com/emqx/rabbitmq-server.git",
|
||||||
|
{tag, "v3.11.13-emqx"},
|
||||||
|
"deps/amqp_client"}}
|
||||||
|
, {emqx_connector, {path, "../../apps/emqx_connector"}}
|
||||||
|
, {emqx_resource, {path, "../../apps/emqx_resource"}}
|
||||||
|
, {emqx_bridge, {path, "../../apps/emqx_bridge"}}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{shell, [
|
||||||
|
{apps, [emqx_bridge_rabbitmq]}
|
||||||
|
]}.
|
|
@ -0,0 +1,9 @@
|
||||||
|
{application, emqx_bridge_rabbitmq, [
|
||||||
|
{description, "EMQX Enterprise RabbitMQ Bridge"},
|
||||||
|
{vsn, "0.1.0"},
|
||||||
|
{registered, []},
|
||||||
|
{applications, [kernel, stdlib, ecql, rabbit_common, amqp_client]},
|
||||||
|
{env, []},
|
||||||
|
{modules, []},
|
||||||
|
{links, []}
|
||||||
|
]}.
|
|
@ -0,0 +1,124 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_bridge_rabbitmq).
|
||||||
|
|
||||||
|
-include_lib("emqx_bridge/include/emqx_bridge.hrl").
|
||||||
|
-include_lib("typerefl/include/types.hrl").
|
||||||
|
-include_lib("hocon/include/hoconsc.hrl").
|
||||||
|
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||||
|
|
||||||
|
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
conn_bridge_examples/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
namespace/0,
|
||||||
|
roots/0,
|
||||||
|
fields/1,
|
||||||
|
desc/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% -------------------------------------------------------------------------------------------------
|
||||||
|
%% Callback used by HTTP API
|
||||||
|
%% -------------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
conn_bridge_examples(Method) ->
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
<<"rabbitmq">> => #{
|
||||||
|
summary => <<"RabbitMQ Bridge">>,
|
||||||
|
value => values(Method, "rabbitmq")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
].
|
||||||
|
|
||||||
|
values(_Method, Type) ->
|
||||||
|
#{
|
||||||
|
enable => true,
|
||||||
|
type => Type,
|
||||||
|
name => <<"foo">>,
|
||||||
|
server => <<"localhost">>,
|
||||||
|
port => 5672,
|
||||||
|
username => <<"guest">>,
|
||||||
|
password => <<"******">>,
|
||||||
|
pool_size => 8,
|
||||||
|
timeout => 5,
|
||||||
|
virtual_host => <<"/">>,
|
||||||
|
heartbeat => <<"30s">>,
|
||||||
|
auto_reconnect => <<"2s">>,
|
||||||
|
exchange => <<"messages">>,
|
||||||
|
exchange_type => <<"topic">>,
|
||||||
|
routing_key => <<"my_routing_key">>,
|
||||||
|
durable => false,
|
||||||
|
payload_template => <<"">>,
|
||||||
|
resource_opts => #{
|
||||||
|
worker_pool_size => 8,
|
||||||
|
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
||||||
|
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
|
||||||
|
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||||
|
batch_time => ?DEFAULT_BATCH_TIME,
|
||||||
|
query_mode => async,
|
||||||
|
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
|
||||||
|
}
|
||||||
|
}.
|
||||||
|
|
||||||
|
%% -------------------------------------------------------------------------------------------------
|
||||||
|
%% Hocon Schema Definitions
|
||||||
|
%% -------------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
namespace() -> "bridge_rabbitmq".
|
||||||
|
|
||||||
|
roots() -> [].
|
||||||
|
|
||||||
|
fields("config") ->
|
||||||
|
[
|
||||||
|
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||||
|
{local_topic,
|
||||||
|
mk(
|
||||||
|
binary(),
|
||||||
|
#{desc => ?DESC("local_topic"), default => undefined}
|
||||||
|
)},
|
||||||
|
{resource_opts,
|
||||||
|
mk(
|
||||||
|
ref(?MODULE, "creation_opts"),
|
||||||
|
#{
|
||||||
|
required => false,
|
||||||
|
default => #{},
|
||||||
|
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
|
||||||
|
}
|
||||||
|
)}
|
||||||
|
] ++
|
||||||
|
emqx_bridge_rabbitmq_connector:fields(config);
|
||||||
|
fields("creation_opts") ->
|
||||||
|
emqx_resource_schema:fields("creation_opts");
|
||||||
|
fields("post") ->
|
||||||
|
fields("post", clickhouse);
|
||||||
|
fields("put") ->
|
||||||
|
fields("config");
|
||||||
|
fields("get") ->
|
||||||
|
emqx_bridge_schema:status_fields() ++ fields("post").
|
||||||
|
|
||||||
|
fields("post", Type) ->
|
||||||
|
[type_field(Type), name_field() | fields("config")].
|
||||||
|
|
||||||
|
desc("config") ->
|
||||||
|
?DESC("desc_config");
|
||||||
|
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
||||||
|
["Configuration for RabbitMQ using `", string:to_upper(Method), "` method."];
|
||||||
|
desc("creation_opts" = Name) ->
|
||||||
|
emqx_resource_schema:desc(Name);
|
||||||
|
desc(_) ->
|
||||||
|
undefined.
|
||||||
|
|
||||||
|
%% -------------------------------------------------------------------------------------------------
|
||||||
|
%% internal
|
||||||
|
%% -------------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type_field(Type) ->
|
||||||
|
{type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.
|
||||||
|
|
||||||
|
name_field() ->
|
||||||
|
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
|
|
@ -0,0 +1,548 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_bridge_rabbitmq_connector).
|
||||||
|
|
||||||
|
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
||||||
|
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||||
|
-include_lib("typerefl/include/types.hrl").
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("hocon/include/hoconsc.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
|
%% Needed to create RabbitMQ connection
|
||||||
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||||
|
|
||||||
|
-behaviour(emqx_resource).
|
||||||
|
-behaviour(hocon_schema).
|
||||||
|
-behaviour(ecpool_worker).
|
||||||
|
|
||||||
|
%% hocon_schema callbacks
|
||||||
|
-export([roots/0, fields/1]).
|
||||||
|
|
||||||
|
%% HTTP API callbacks
|
||||||
|
-export([values/1]).
|
||||||
|
|
||||||
|
%% emqx_resource callbacks
|
||||||
|
-export([
|
||||||
|
%% Required callbacks
|
||||||
|
on_start/2,
|
||||||
|
on_stop/2,
|
||||||
|
callback_mode/0,
|
||||||
|
%% Optional callbacks
|
||||||
|
on_get_status/2,
|
||||||
|
on_query/3,
|
||||||
|
is_buffer_supported/0,
|
||||||
|
on_batch_query/3
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% callbacks for ecpool_worker
|
||||||
|
-export([connect/1]).
|
||||||
|
|
||||||
|
%% Internal callbacks
|
||||||
|
-export([publish_messages/3]).
|
||||||
|
|
||||||
|
roots() ->
|
||||||
|
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
|
||||||
|
|
||||||
|
fields(config) ->
|
||||||
|
[
|
||||||
|
{server,
|
||||||
|
hoconsc:mk(
|
||||||
|
typerefl:binary(),
|
||||||
|
#{
|
||||||
|
default => <<"localhost">>,
|
||||||
|
desc => ?DESC("server")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{port,
|
||||||
|
hoconsc:mk(
|
||||||
|
emqx_schema:port_number(),
|
||||||
|
#{
|
||||||
|
default => 5672,
|
||||||
|
desc => ?DESC("server")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{username,
|
||||||
|
hoconsc:mk(
|
||||||
|
typerefl:binary(),
|
||||||
|
#{
|
||||||
|
required => true,
|
||||||
|
desc => ?DESC("username")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{password,
|
||||||
|
hoconsc:mk(
|
||||||
|
typerefl:binary(),
|
||||||
|
#{
|
||||||
|
required => true,
|
||||||
|
desc => ?DESC("password")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{pool_size,
|
||||||
|
hoconsc:mk(
|
||||||
|
typerefl:pos_integer(),
|
||||||
|
#{
|
||||||
|
default => 8,
|
||||||
|
desc => ?DESC("pool_size")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{timeout,
|
||||||
|
hoconsc:mk(
|
||||||
|
emqx_schema:duration_ms(),
|
||||||
|
#{
|
||||||
|
default => <<"5s">>,
|
||||||
|
desc => ?DESC("timeout")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{wait_for_publish_confirmations,
|
||||||
|
hoconsc:mk(
|
||||||
|
boolean(),
|
||||||
|
#{
|
||||||
|
default => true,
|
||||||
|
desc => ?DESC("wait_for_publish_confirmations")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{publish_confirmation_timeout,
|
||||||
|
hoconsc:mk(
|
||||||
|
emqx_schema:duration_ms(),
|
||||||
|
#{
|
||||||
|
default => <<"30s">>,
|
||||||
|
desc => ?DESC("timeout")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
|
||||||
|
{virtual_host,
|
||||||
|
hoconsc:mk(
|
||||||
|
typerefl:binary(),
|
||||||
|
#{
|
||||||
|
default => <<"/">>,
|
||||||
|
desc => ?DESC("virtual_host")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{heartbeat,
|
||||||
|
hoconsc:mk(
|
||||||
|
emqx_schema:duration_ms(),
|
||||||
|
#{
|
||||||
|
default => <<"30s">>,
|
||||||
|
desc => ?DESC("heartbeat")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{auto_reconnect,
|
||||||
|
hoconsc:mk(
|
||||||
|
emqx_schema:duration_ms(),
|
||||||
|
#{
|
||||||
|
default => <<"2s">>,
|
||||||
|
desc => ?DESC("auto_reconnect")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
%% Things related to sending messages to RabbitMQ
|
||||||
|
{exchange,
|
||||||
|
hoconsc:mk(
|
||||||
|
typerefl:binary(),
|
||||||
|
#{
|
||||||
|
required => true,
|
||||||
|
desc => ?DESC("exchange")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{routing_key,
|
||||||
|
hoconsc:mk(
|
||||||
|
typerefl:binary(),
|
||||||
|
#{
|
||||||
|
required => true,
|
||||||
|
desc => ?DESC("routing_key")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{delivery_mode,
|
||||||
|
hoconsc:mk(
|
||||||
|
hoconsc:enum([non_persistent, persistent]),
|
||||||
|
#{
|
||||||
|
default => non_persistent,
|
||||||
|
desc => ?DESC("delivery_mode")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{payload_template,
|
||||||
|
hoconsc:mk(
|
||||||
|
binary(),
|
||||||
|
#{
|
||||||
|
default => <<"${.}">>,
|
||||||
|
desc => ?DESC("payload_template")
|
||||||
|
}
|
||||||
|
)}
|
||||||
|
].
|
||||||
|
|
||||||
|
values(post) ->
|
||||||
|
maps:merge(values(put), #{name => <<"connector">>});
|
||||||
|
values(get) ->
|
||||||
|
values(post);
|
||||||
|
values(put) ->
|
||||||
|
#{
|
||||||
|
server => <<"localhost">>,
|
||||||
|
port => 5672,
|
||||||
|
enable => true,
|
||||||
|
pool_size => 8,
|
||||||
|
type => rabbitmq,
|
||||||
|
username => <<"guest">>,
|
||||||
|
password => <<"******">>,
|
||||||
|
routing_key => <<"my_routing_key">>,
|
||||||
|
payload_template => <<"">>
|
||||||
|
};
|
||||||
|
values(_) ->
|
||||||
|
#{}.
|
||||||
|
|
||||||
|
%% ===================================================================
|
||||||
|
%% Callbacks defined in emqx_resource
|
||||||
|
%% ===================================================================
|
||||||
|
|
||||||
|
%% emqx_resource callback
|
||||||
|
|
||||||
|
callback_mode() -> always_sync.
|
||||||
|
|
||||||
|
%% emqx_resource callback
|
||||||
|
|
||||||
|
-spec is_buffer_supported() -> boolean().
|
||||||
|
is_buffer_supported() ->
|
||||||
|
%% We want to make use of EMQX's buffer mechanism
|
||||||
|
false.
|
||||||
|
|
||||||
|
%% emqx_resource callback called when the resource is started
|
||||||
|
|
||||||
|
-spec on_start(resource_id(), term()) -> {ok, resource_state()} | {error, _}.
|
||||||
|
on_start(
|
||||||
|
InstanceID,
|
||||||
|
#{
|
||||||
|
pool_size := PoolSize,
|
||||||
|
payload_template := PayloadTemplate,
|
||||||
|
password := Password,
|
||||||
|
delivery_mode := InitialDeliveryMode
|
||||||
|
} = InitialConfig
|
||||||
|
) ->
|
||||||
|
DeliveryMode =
|
||||||
|
case InitialDeliveryMode of
|
||||||
|
non_persistent -> 1;
|
||||||
|
persistent -> 2
|
||||||
|
end,
|
||||||
|
Config = InitialConfig#{
|
||||||
|
password => emqx_secret:wrap(Password),
|
||||||
|
delivery_mode => DeliveryMode
|
||||||
|
},
|
||||||
|
?SLOG(info, #{
|
||||||
|
msg => "starting_rabbitmq_connector",
|
||||||
|
connector => InstanceID,
|
||||||
|
config => emqx_utils:redact(Config)
|
||||||
|
}),
|
||||||
|
Options = [
|
||||||
|
{config, Config},
|
||||||
|
%% The pool_size is read by ecpool and decides the number of workers in
|
||||||
|
%% the pool
|
||||||
|
{pool_size, PoolSize},
|
||||||
|
{pool, InstanceID}
|
||||||
|
],
|
||||||
|
ProcessedTemplate = emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate),
|
||||||
|
State = #{
|
||||||
|
poolname => InstanceID,
|
||||||
|
processed_payload_template => ProcessedTemplate,
|
||||||
|
config => Config
|
||||||
|
},
|
||||||
|
case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of
|
||||||
|
ok ->
|
||||||
|
{ok, State};
|
||||||
|
{error, Reason} ->
|
||||||
|
LogMessage =
|
||||||
|
#{
|
||||||
|
msg => "rabbitmq_connector_start_failed",
|
||||||
|
error_reason => Reason,
|
||||||
|
config => emqx_utils:redact(Config)
|
||||||
|
},
|
||||||
|
?SLOG(info, LogMessage),
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% emqx_resource callback called when the resource is stopped
|
||||||
|
|
||||||
|
-spec on_stop(resource_id(), resource_state()) -> term().
|
||||||
|
on_stop(
|
||||||
|
ResourceID,
|
||||||
|
#{poolname := PoolName} = _State
|
||||||
|
) ->
|
||||||
|
?SLOG(info, #{
|
||||||
|
msg => "stopping RabbitMQ connector",
|
||||||
|
connector => ResourceID
|
||||||
|
}),
|
||||||
|
Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
|
||||||
|
Clients = [
|
||||||
|
begin
|
||||||
|
{ok, Client} = ecpool_worker:client(Worker),
|
||||||
|
Client
|
||||||
|
end
|
||||||
|
|| Worker <- Workers
|
||||||
|
],
|
||||||
|
%% We need to stop the pool before stopping the workers as the pool monitors the workers
|
||||||
|
StopResult = emqx_resource_pool:stop(PoolName),
|
||||||
|
lists:foreach(fun stop_worker/1, Clients),
|
||||||
|
StopResult.
|
||||||
|
|
||||||
|
stop_worker({Channel, Connection}) ->
|
||||||
|
amqp_channel:close(Channel),
|
||||||
|
amqp_connection:close(Connection).
|
||||||
|
|
||||||
|
%% This is the callback function that is called by ecpool when the pool is
|
||||||
|
%% started
|
||||||
|
|
||||||
|
-spec connect(term()) -> {ok, {pid(), pid()}, map()} | {error, term()}.
|
||||||
|
connect(Options) ->
|
||||||
|
Config = proplists:get_value(config, Options),
|
||||||
|
try
|
||||||
|
create_rabbitmq_connection_and_channel(Config)
|
||||||
|
catch
|
||||||
|
_:{error, Reason} ->
|
||||||
|
?SLOG(error, #{
|
||||||
|
msg => "rabbitmq_connector_connection_failed",
|
||||||
|
error_type => error,
|
||||||
|
error_reason => Reason,
|
||||||
|
config => emqx_utils:redact(Config)
|
||||||
|
}),
|
||||||
|
{error, Reason};
|
||||||
|
Type:Reason ->
|
||||||
|
?SLOG(error, #{
|
||||||
|
msg => "rabbitmq_connector_connection_failed",
|
||||||
|
error_type => Type,
|
||||||
|
error_reason => Reason,
|
||||||
|
config => emqx_utils:redact(Config)
|
||||||
|
}),
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
create_rabbitmq_connection_and_channel(Config) ->
|
||||||
|
#{
|
||||||
|
server := Host,
|
||||||
|
port := Port,
|
||||||
|
username := Username,
|
||||||
|
password := WrappedPassword,
|
||||||
|
timeout := Timeout,
|
||||||
|
virtual_host := VirtualHost,
|
||||||
|
heartbeat := Heartbeat,
|
||||||
|
wait_for_publish_confirmations := WaitForPublishConfirmations
|
||||||
|
} = Config,
|
||||||
|
Password = emqx_secret:unwrap(WrappedPassword),
|
||||||
|
RabbitMQConnectionOptions =
|
||||||
|
#amqp_params_network{
|
||||||
|
host = erlang:binary_to_list(Host),
|
||||||
|
port = Port,
|
||||||
|
username = Username,
|
||||||
|
password = Password,
|
||||||
|
connection_timeout = Timeout,
|
||||||
|
virtual_host = VirtualHost,
|
||||||
|
heartbeat = Heartbeat
|
||||||
|
},
|
||||||
|
{ok, RabbitMQConnection} =
|
||||||
|
case amqp_connection:start(RabbitMQConnectionOptions) of
|
||||||
|
{ok, Connection} ->
|
||||||
|
{ok, Connection};
|
||||||
|
{error, Reason} ->
|
||||||
|
erlang:error({error, Reason})
|
||||||
|
end,
|
||||||
|
{ok, RabbitMQChannel} =
|
||||||
|
case amqp_connection:open_channel(RabbitMQConnection) of
|
||||||
|
{ok, Channel} ->
|
||||||
|
{ok, Channel};
|
||||||
|
{error, OpenChannelErrorReason} ->
|
||||||
|
erlang:error({error, OpenChannelErrorReason})
|
||||||
|
end,
|
||||||
|
%% We need to enable confirmations if we want to wait for them
|
||||||
|
case WaitForPublishConfirmations of
|
||||||
|
true ->
|
||||||
|
case amqp_channel:call(RabbitMQChannel, #'confirm.select'{}) of
|
||||||
|
#'confirm.select_ok'{} ->
|
||||||
|
ok;
|
||||||
|
Error ->
|
||||||
|
ConfirmModeErrorReason =
|
||||||
|
erlang:iolist_to_binary(
|
||||||
|
io_lib:format(
|
||||||
|
"Could not enable RabbitMQ confirmation mode ~p",
|
||||||
|
[Error]
|
||||||
|
)
|
||||||
|
),
|
||||||
|
erlang:error({error, ConfirmModeErrorReason})
|
||||||
|
end;
|
||||||
|
false ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
{ok, {RabbitMQConnection, RabbitMQChannel}, #{
|
||||||
|
supervisees => [RabbitMQConnection, RabbitMQChannel]
|
||||||
|
}}.
|
||||||
|
|
||||||
|
%% emqx_resource callback called to check the status of the resource
|
||||||
|
|
||||||
|
-spec on_get_status(resource_id(), term()) ->
|
||||||
|
{connected, resource_state()} | {disconnected, resource_state(), binary()}.
|
||||||
|
on_get_status(
|
||||||
|
_InstId,
|
||||||
|
#{
|
||||||
|
poolname := PoolName
|
||||||
|
} = State
|
||||||
|
) ->
|
||||||
|
Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
|
||||||
|
Clients = [
|
||||||
|
begin
|
||||||
|
{ok, Client} = ecpool_worker:client(Worker),
|
||||||
|
Client
|
||||||
|
end
|
||||||
|
|| Worker <- Workers
|
||||||
|
],
|
||||||
|
CheckResults = [
|
||||||
|
check_worker(Client)
|
||||||
|
|| Client <- Clients
|
||||||
|
],
|
||||||
|
Connected = length(CheckResults) > 0 andalso lists:all(fun(R) -> R end, CheckResults),
|
||||||
|
case Connected of
|
||||||
|
true ->
|
||||||
|
{connected, State};
|
||||||
|
false ->
|
||||||
|
{disconnected, State, <<"not_connected">>}
|
||||||
|
end;
|
||||||
|
on_get_status(
|
||||||
|
_InstId,
|
||||||
|
State
|
||||||
|
) ->
|
||||||
|
{disconnect, State, <<"not_connected: no connection pool in state">>}.
|
||||||
|
|
||||||
|
check_worker({Channel, Connection}) ->
|
||||||
|
erlang:is_process_alive(Channel) andalso erlang:is_process_alive(Connection).
|
||||||
|
|
||||||
|
%% emqx_resource callback that is called when a non-batch query is received
|
||||||
|
|
||||||
|
-spec on_query(resource_id(), Request, resource_state()) -> query_result() when
|
||||||
|
Request :: {RequestType, Data},
|
||||||
|
RequestType :: send_message,
|
||||||
|
Data :: map().
|
||||||
|
on_query(
|
||||||
|
ResourceID,
|
||||||
|
{RequestType, Data},
|
||||||
|
#{
|
||||||
|
poolname := PoolName,
|
||||||
|
processed_payload_template := PayloadTemplate,
|
||||||
|
config := Config
|
||||||
|
} = State
|
||||||
|
) ->
|
||||||
|
?SLOG(debug, #{
|
||||||
|
msg => "RabbitMQ connector received query",
|
||||||
|
connector => ResourceID,
|
||||||
|
type => RequestType,
|
||||||
|
data => Data,
|
||||||
|
state => emqx_utils:redact(State)
|
||||||
|
}),
|
||||||
|
MessageData = format_data(PayloadTemplate, Data),
|
||||||
|
ecpool:pick_and_do(
|
||||||
|
PoolName,
|
||||||
|
{?MODULE, publish_messages, [Config, [MessageData]]},
|
||||||
|
no_handover
|
||||||
|
).
|
||||||
|
|
||||||
|
%% emqx_resource callback that is called when a batch query is received
|
||||||
|
|
||||||
|
-spec on_batch_query(resource_id(), BatchReq, resource_state()) -> query_result() when
|
||||||
|
BatchReq :: nonempty_list({'send_message', map()}).
|
||||||
|
on_batch_query(
|
||||||
|
ResourceID,
|
||||||
|
BatchReq,
|
||||||
|
State
|
||||||
|
) ->
|
||||||
|
?SLOG(debug, #{
|
||||||
|
msg => "RabbitMQ connector received batch query",
|
||||||
|
connector => ResourceID,
|
||||||
|
data => BatchReq,
|
||||||
|
state => emqx_utils:redact(State)
|
||||||
|
}),
|
||||||
|
%% Currently we only support batch requests with the send_message key
|
||||||
|
{Keys, MessagesToInsert} = lists:unzip(BatchReq),
|
||||||
|
ensure_keys_are_of_type_send_message(Keys),
|
||||||
|
%% Pick out the payload template
|
||||||
|
#{
|
||||||
|
processed_payload_template := PayloadTemplate,
|
||||||
|
poolname := PoolName,
|
||||||
|
config := Config
|
||||||
|
} = State,
|
||||||
|
%% Create batch payload
|
||||||
|
FormattedMessages = [
|
||||||
|
format_data(PayloadTemplate, Data)
|
||||||
|
|| Data <- MessagesToInsert
|
||||||
|
],
|
||||||
|
%% Publish the messages
|
||||||
|
ecpool:pick_and_do(
|
||||||
|
PoolName,
|
||||||
|
{?MODULE, publish_messages, [Config, FormattedMessages]},
|
||||||
|
no_handover
|
||||||
|
).
|
||||||
|
|
||||||
|
publish_messages(
|
||||||
|
{_Connection, Channel},
|
||||||
|
#{
|
||||||
|
delivery_mode := DeliveryMode,
|
||||||
|
routing_key := RoutingKey,
|
||||||
|
exchange := Exchange,
|
||||||
|
wait_for_publish_confirmations := WaitForPublishConfirmations,
|
||||||
|
publish_confirmation_timeout := PublishConfirmationTimeout
|
||||||
|
} = _Config,
|
||||||
|
Messages
|
||||||
|
) ->
|
||||||
|
MessageProperties = #'P_basic'{
|
||||||
|
headers = [],
|
||||||
|
delivery_mode = DeliveryMode
|
||||||
|
},
|
||||||
|
Method = #'basic.publish'{
|
||||||
|
exchange = Exchange,
|
||||||
|
routing_key = RoutingKey
|
||||||
|
},
|
||||||
|
_ = [
|
||||||
|
amqp_channel:cast(
|
||||||
|
Channel,
|
||||||
|
Method,
|
||||||
|
#amqp_msg{
|
||||||
|
payload = Message,
|
||||||
|
props = MessageProperties
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|| Message <- Messages
|
||||||
|
],
|
||||||
|
case WaitForPublishConfirmations of
|
||||||
|
true ->
|
||||||
|
case amqp_channel:wait_for_confirms(Channel, PublishConfirmationTimeout) of
|
||||||
|
true ->
|
||||||
|
ok;
|
||||||
|
false ->
|
||||||
|
erlang:error(
|
||||||
|
{recoverable_error,
|
||||||
|
<<"RabbitMQ: Got NACK when waiting for message acknowledgment.">>}
|
||||||
|
);
|
||||||
|
timeout ->
|
||||||
|
erlang:error(
|
||||||
|
{recoverable_error,
|
||||||
|
<<"RabbitMQ: Timeout when waiting for message acknowledgment.">>}
|
||||||
|
)
|
||||||
|
end;
|
||||||
|
false ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
ensure_keys_are_of_type_send_message(Keys) ->
|
||||||
|
case lists:all(fun is_send_message_atom/1, Keys) of
|
||||||
|
true ->
|
||||||
|
ok;
|
||||||
|
false ->
|
||||||
|
erlang:error(
|
||||||
|
{unrecoverable_error,
|
||||||
|
<<"Unexpected type for batch message (Expected send_message)">>}
|
||||||
|
)
|
||||||
|
end.
|
||||||
|
|
||||||
|
is_send_message_atom(send_message) ->
|
||||||
|
true;
|
||||||
|
is_send_message_atom(_) ->
|
||||||
|
false.
|
||||||
|
|
||||||
|
format_data([], Msg) ->
|
||||||
|
emqx_utils_json:encode(Msg);
|
||||||
|
format_data(Tokens, Msg) ->
|
||||||
|
emqx_plugin_libs_rule:proc_tmpl(Tokens, Msg).
|
|
@ -0,0 +1,371 @@
|
||||||
|
%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_bridge_rabbitmq_SUITE).
|
||||||
|
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||||
|
|
||||||
|
%% See comment in
|
||||||
|
%% lib-ee/emqx_ee_connector/test/ee_connector_rabbitmq_SUITE.erl for how to
|
||||||
|
%% run this without bringing up the whole CI infrastucture
|
||||||
|
|
||||||
|
rabbit_mq_host() ->
|
||||||
|
<<"rabbitmq">>.
|
||||||
|
|
||||||
|
rabbit_mq_port() ->
|
||||||
|
5672.
|
||||||
|
|
||||||
|
rabbit_mq_exchange() ->
|
||||||
|
<<"messages">>.
|
||||||
|
|
||||||
|
rabbit_mq_queue() ->
|
||||||
|
<<"test_queue">>.
|
||||||
|
|
||||||
|
rabbit_mq_routing_key() ->
|
||||||
|
<<"test_routing_key">>.
|
||||||
|
|
||||||
|
get_channel_connection(Config) ->
|
||||||
|
proplists:get_value(channel_connection, Config).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Common Test Setup, Teardown and Testcase List
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
% snabbkaffe:fix_ct_logging(),
|
||||||
|
case
|
||||||
|
emqx_common_test_helpers:is_tcp_server_available(
|
||||||
|
erlang:binary_to_list(rabbit_mq_host()), rabbit_mq_port()
|
||||||
|
)
|
||||||
|
of
|
||||||
|
true ->
|
||||||
|
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
|
||||||
|
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
|
||||||
|
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
|
||||||
|
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||||
|
{ok, _} = application:ensure_all_started(emqx_ee_connector),
|
||||||
|
{ok, _} = application:ensure_all_started(emqx_ee_bridge),
|
||||||
|
{ok, _} = application:ensure_all_started(amqp_client),
|
||||||
|
emqx_mgmt_api_test_util:init_suite(),
|
||||||
|
ChannelConnection = setup_rabbit_mq_exchange_and_queue(),
|
||||||
|
[{channel_connection, ChannelConnection} | Config];
|
||||||
|
false ->
|
||||||
|
case os:getenv("IS_CI") of
|
||||||
|
"yes" ->
|
||||||
|
throw(no_rabbitmq);
|
||||||
|
_ ->
|
||||||
|
{skip, no_rabbitmq}
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
setup_rabbit_mq_exchange_and_queue() ->
|
||||||
|
%% Create an exachange and a queue
|
||||||
|
{ok, Connection} =
|
||||||
|
amqp_connection:start(#amqp_params_network{
|
||||||
|
host = erlang:binary_to_list(rabbit_mq_host()),
|
||||||
|
port = rabbit_mq_port()
|
||||||
|
}),
|
||||||
|
{ok, Channel} = amqp_connection:open_channel(Connection),
|
||||||
|
%% Create an exchange
|
||||||
|
#'exchange.declare_ok'{} =
|
||||||
|
amqp_channel:call(
|
||||||
|
Channel,
|
||||||
|
#'exchange.declare'{
|
||||||
|
exchange = rabbit_mq_exchange(),
|
||||||
|
type = <<"topic">>
|
||||||
|
}
|
||||||
|
),
|
||||||
|
%% Create a queue
|
||||||
|
#'queue.declare_ok'{} =
|
||||||
|
amqp_channel:call(
|
||||||
|
Channel,
|
||||||
|
#'queue.declare'{queue = rabbit_mq_queue()}
|
||||||
|
),
|
||||||
|
%% Bind the queue to the exchange
|
||||||
|
#'queue.bind_ok'{} =
|
||||||
|
amqp_channel:call(
|
||||||
|
Channel,
|
||||||
|
#'queue.bind'{
|
||||||
|
queue = rabbit_mq_queue(),
|
||||||
|
exchange = rabbit_mq_exchange(),
|
||||||
|
routing_key = rabbit_mq_routing_key()
|
||||||
|
}
|
||||||
|
),
|
||||||
|
#{
|
||||||
|
connection => Connection,
|
||||||
|
channel => Channel
|
||||||
|
}.
|
||||||
|
|
||||||
|
end_per_suite(Config) ->
|
||||||
|
#{
|
||||||
|
connection := Connection,
|
||||||
|
channel := Channel
|
||||||
|
} = get_channel_connection(Config),
|
||||||
|
emqx_mgmt_api_test_util:end_suite(),
|
||||||
|
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
|
||||||
|
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
|
||||||
|
_ = application:stop(emqx_connector),
|
||||||
|
_ = application:stop(emqx_ee_connector),
|
||||||
|
_ = application:stop(emqx_bridge),
|
||||||
|
%% Close the channel
|
||||||
|
ok = amqp_channel:close(Channel),
|
||||||
|
%% Close the connection
|
||||||
|
ok = amqp_connection:close(Connection).
|
||||||
|
|
||||||
|
init_per_testcase(_, Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_testcase(_, _Config) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
rabbitmq_config(Config) ->
|
||||||
|
%%SQL = maps:get(sql, Config, sql_insert_template_for_bridge()),
|
||||||
|
BatchSize = maps:get(batch_size, Config, 1),
|
||||||
|
BatchTime = maps:get(batch_time_ms, Config, 0),
|
||||||
|
Name = atom_to_binary(?MODULE),
|
||||||
|
Server = maps:get(server, Config, rabbit_mq_host()),
|
||||||
|
Port = maps:get(port, Config, rabbit_mq_port()),
|
||||||
|
Template = maps:get(payload_template, Config, <<"">>),
|
||||||
|
ConfigString =
|
||||||
|
io_lib:format(
|
||||||
|
"bridges.rabbitmq.~s {\n"
|
||||||
|
" enable = true\n"
|
||||||
|
" server = \"~s\"\n"
|
||||||
|
" port = ~p\n"
|
||||||
|
" username = \"guest\"\n"
|
||||||
|
" password = \"guest\"\n"
|
||||||
|
" routing_key = \"~s\"\n"
|
||||||
|
" exchange = \"~s\"\n"
|
||||||
|
" payload_template = \"~s\"\n"
|
||||||
|
" resource_opts = {\n"
|
||||||
|
" batch_size = ~b\n"
|
||||||
|
" batch_time = ~bms\n"
|
||||||
|
" }\n"
|
||||||
|
"}\n",
|
||||||
|
[
|
||||||
|
Name,
|
||||||
|
Server,
|
||||||
|
Port,
|
||||||
|
rabbit_mq_routing_key(),
|
||||||
|
rabbit_mq_exchange(),
|
||||||
|
Template,
|
||||||
|
BatchSize,
|
||||||
|
BatchTime
|
||||||
|
]
|
||||||
|
),
|
||||||
|
ct:pal(ConfigString),
|
||||||
|
parse_and_check(ConfigString, <<"rabbitmq">>, Name).
|
||||||
|
|
||||||
|
parse_and_check(ConfigString, BridgeType, Name) ->
|
||||||
|
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
|
||||||
|
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
|
||||||
|
#{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf,
|
||||||
|
RetConfig.
|
||||||
|
|
||||||
|
make_bridge(Config) ->
|
||||||
|
Type = <<"rabbitmq">>,
|
||||||
|
Name = atom_to_binary(?MODULE),
|
||||||
|
BridgeConfig = rabbitmq_config(Config),
|
||||||
|
{ok, _} = emqx_bridge:create(
|
||||||
|
Type,
|
||||||
|
Name,
|
||||||
|
BridgeConfig
|
||||||
|
),
|
||||||
|
emqx_bridge_resource:bridge_id(Type, Name).
|
||||||
|
|
||||||
|
delete_bridge() ->
|
||||||
|
Type = <<"rabbitmq">>,
|
||||||
|
Name = atom_to_binary(?MODULE),
|
||||||
|
{ok, _} = emqx_bridge:remove(Type, Name),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Test Cases
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
t_make_delete_bridge(_Config) ->
|
||||||
|
make_bridge(#{}),
|
||||||
|
%% Check that the new brige is in the list of bridges
|
||||||
|
Bridges = emqx_bridge:list(),
|
||||||
|
Name = atom_to_binary(?MODULE),
|
||||||
|
IsRightName =
|
||||||
|
fun
|
||||||
|
(#{name := BName}) when BName =:= Name ->
|
||||||
|
true;
|
||||||
|
(_) ->
|
||||||
|
false
|
||||||
|
end,
|
||||||
|
?assert(lists:any(IsRightName, Bridges)),
|
||||||
|
delete_bridge(),
|
||||||
|
BridgesAfterDelete = emqx_bridge:list(),
|
||||||
|
?assertNot(lists:any(IsRightName, BridgesAfterDelete)),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_make_delete_bridge_non_existing_server(_Config) ->
|
||||||
|
make_bridge(#{server => <<"non_existing_server">>, port => 3174}),
|
||||||
|
%% Check that the new brige is in the list of bridges
|
||||||
|
Bridges = emqx_bridge:list(),
|
||||||
|
Name = atom_to_binary(?MODULE),
|
||||||
|
IsRightName =
|
||||||
|
fun
|
||||||
|
(#{name := BName}) when BName =:= Name ->
|
||||||
|
true;
|
||||||
|
(_) ->
|
||||||
|
false
|
||||||
|
end,
|
||||||
|
?assert(lists:any(IsRightName, Bridges)),
|
||||||
|
delete_bridge(),
|
||||||
|
BridgesAfterDelete = emqx_bridge:list(),
|
||||||
|
?assertNot(lists:any(IsRightName, BridgesAfterDelete)),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_send_message_query(Config) ->
|
||||||
|
BridgeID = make_bridge(#{batch_size => 1}),
|
||||||
|
Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000},
|
||||||
|
%% This will use the SQL template included in the bridge
|
||||||
|
emqx_bridge:send_message(BridgeID, Payload),
|
||||||
|
%% Check that the data got to the database
|
||||||
|
?assertEqual(Payload, receive_simple_test_message(Config)),
|
||||||
|
delete_bridge(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_send_message_query_with_template(Config) ->
|
||||||
|
BridgeID = make_bridge(#{
|
||||||
|
batch_size => 1,
|
||||||
|
payload_template =>
|
||||||
|
<<
|
||||||
|
"{"
|
||||||
|
" \\\"key\\\": ${key},"
|
||||||
|
" \\\"data\\\": \\\"${data}\\\","
|
||||||
|
" \\\"timestamp\\\": ${timestamp},"
|
||||||
|
" \\\"secret\\\": 42"
|
||||||
|
"}"
|
||||||
|
>>
|
||||||
|
}),
|
||||||
|
Payload = #{
|
||||||
|
<<"key">> => 7,
|
||||||
|
<<"data">> => <<"RabbitMQ">>,
|
||||||
|
<<"timestamp">> => 10000
|
||||||
|
},
|
||||||
|
emqx_bridge:send_message(BridgeID, Payload),
|
||||||
|
%% Check that the data got to the database
|
||||||
|
ExpectedResult = Payload#{
|
||||||
|
<<"secret">> => 42
|
||||||
|
},
|
||||||
|
?assertEqual(ExpectedResult, receive_simple_test_message(Config)),
|
||||||
|
delete_bridge(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_send_simple_batch(Config) ->
|
||||||
|
BridgeConf =
|
||||||
|
#{
|
||||||
|
batch_size => 100
|
||||||
|
},
|
||||||
|
BridgeID = make_bridge(BridgeConf),
|
||||||
|
Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000},
|
||||||
|
emqx_bridge:send_message(BridgeID, Payload),
|
||||||
|
?assertEqual(Payload, receive_simple_test_message(Config)),
|
||||||
|
delete_bridge(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_send_simple_batch_with_template(Config) ->
|
||||||
|
BridgeConf =
|
||||||
|
#{
|
||||||
|
batch_size => 100,
|
||||||
|
payload_template =>
|
||||||
|
<<
|
||||||
|
"{"
|
||||||
|
" \\\"key\\\": ${key},"
|
||||||
|
" \\\"data\\\": \\\"${data}\\\","
|
||||||
|
" \\\"timestamp\\\": ${timestamp},"
|
||||||
|
" \\\"secret\\\": 42"
|
||||||
|
"}"
|
||||||
|
>>
|
||||||
|
},
|
||||||
|
BridgeID = make_bridge(BridgeConf),
|
||||||
|
Payload = #{
|
||||||
|
<<"key">> => 7,
|
||||||
|
<<"data">> => <<"RabbitMQ">>,
|
||||||
|
<<"timestamp">> => 10000
|
||||||
|
},
|
||||||
|
emqx_bridge:send_message(BridgeID, Payload),
|
||||||
|
ExpectedResult = Payload#{
|
||||||
|
<<"secret">> => 42
|
||||||
|
},
|
||||||
|
?assertEqual(ExpectedResult, receive_simple_test_message(Config)),
|
||||||
|
delete_bridge(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_heavy_batching(Config) ->
|
||||||
|
NumberOfMessages = 20000,
|
||||||
|
BridgeConf = #{
|
||||||
|
batch_size => 10173,
|
||||||
|
batch_time_ms => 50
|
||||||
|
},
|
||||||
|
BridgeID = make_bridge(BridgeConf),
|
||||||
|
SendMessage = fun(Key) ->
|
||||||
|
Payload = #{
|
||||||
|
<<"key">> => Key
|
||||||
|
},
|
||||||
|
emqx_bridge:send_message(BridgeID, Payload)
|
||||||
|
end,
|
||||||
|
[SendMessage(Key) || Key <- lists:seq(1, NumberOfMessages)],
|
||||||
|
AllMessages = lists:foldl(
|
||||||
|
fun(_, Acc) ->
|
||||||
|
Message = receive_simple_test_message(Config),
|
||||||
|
#{<<"key">> := Key} = Message,
|
||||||
|
Acc#{Key => true}
|
||||||
|
end,
|
||||||
|
#{},
|
||||||
|
lists:seq(1, NumberOfMessages)
|
||||||
|
),
|
||||||
|
?assertEqual(NumberOfMessages, maps:size(AllMessages)),
|
||||||
|
delete_bridge(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
receive_simple_test_message(Config) ->
|
||||||
|
#{channel := Channel} = get_channel_connection(Config),
|
||||||
|
#'basic.consume_ok'{consumer_tag = ConsumerTag} =
|
||||||
|
amqp_channel:call(
|
||||||
|
Channel,
|
||||||
|
#'basic.consume'{
|
||||||
|
queue = rabbit_mq_queue()
|
||||||
|
}
|
||||||
|
),
|
||||||
|
receive
|
||||||
|
%% This is the first message received
|
||||||
|
#'basic.consume_ok'{} ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
receive
|
||||||
|
{#'basic.deliver'{delivery_tag = DeliveryTag}, Content} ->
|
||||||
|
%% Ack the message
|
||||||
|
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
|
||||||
|
%% Cancel the consumer
|
||||||
|
#'basic.cancel_ok'{consumer_tag = ConsumerTag} =
|
||||||
|
amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}),
|
||||||
|
emqx_utils_json:decode(Content#amqp_msg.payload)
|
||||||
|
end.
|
||||||
|
|
||||||
|
rabbitmq_config() ->
|
||||||
|
Config =
|
||||||
|
#{
|
||||||
|
server => rabbit_mq_host(),
|
||||||
|
port => 5672,
|
||||||
|
exchange => rabbit_mq_exchange(),
|
||||||
|
routing_key => rabbit_mq_routing_key()
|
||||||
|
},
|
||||||
|
#{<<"config">> => Config}.
|
||||||
|
|
||||||
|
test_data() ->
|
||||||
|
#{<<"msg_field">> => <<"Hello">>}.
|
|
@ -0,0 +1,232 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_bridge_rabbitmq_connector_SUITE).
|
||||||
|
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include("emqx_connector.hrl").
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||||
|
|
||||||
|
%% This test SUITE requires a running RabbitMQ instance. If you don't want to
|
||||||
|
%% bring up the whole CI infrastuctucture with the `scripts/ct/run.sh` script
|
||||||
|
%% you can create a clickhouse instance with the following command.
|
||||||
|
%% 5672 is the default port for AMQP 0-9-1 and 15672 is the default port for
|
||||||
|
%% the HTTP managament interface.
|
||||||
|
%%
|
||||||
|
%% docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3.11-management
|
||||||
|
|
||||||
|
rabbit_mq_host() ->
|
||||||
|
<<"rabbitmq">>.
|
||||||
|
|
||||||
|
rabbit_mq_port() ->
|
||||||
|
5672.
|
||||||
|
|
||||||
|
rabbit_mq_exchange() ->
|
||||||
|
<<"test_exchange">>.
|
||||||
|
|
||||||
|
rabbit_mq_queue() ->
|
||||||
|
<<"test_queue">>.
|
||||||
|
|
||||||
|
rabbit_mq_routing_key() ->
|
||||||
|
<<"test_routing_key">>.
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
case
|
||||||
|
emqx_common_test_helpers:is_tcp_server_available(
|
||||||
|
erlang:binary_to_list(rabbit_mq_host()), rabbit_mq_port()
|
||||||
|
)
|
||||||
|
of
|
||||||
|
true ->
|
||||||
|
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
|
||||||
|
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
|
||||||
|
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||||
|
{ok, _} = application:ensure_all_started(emqx_ee_connector),
|
||||||
|
{ok, _} = application:ensure_all_started(amqp_client),
|
||||||
|
ChannelConnection = setup_rabbit_mq_exchange_and_queue(),
|
||||||
|
[{channel_connection, ChannelConnection} | Config];
|
||||||
|
false ->
|
||||||
|
case os:getenv("IS_CI") of
|
||||||
|
"yes" ->
|
||||||
|
throw(no_rabbitmq);
|
||||||
|
_ ->
|
||||||
|
{skip, no_rabbitmq}
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
setup_rabbit_mq_exchange_and_queue() ->
|
||||||
|
%% Create an exachange and a queue
|
||||||
|
{ok, Connection} =
|
||||||
|
amqp_connection:start(#amqp_params_network{
|
||||||
|
host = erlang:binary_to_list(rabbit_mq_host()),
|
||||||
|
port = rabbit_mq_port()
|
||||||
|
}),
|
||||||
|
{ok, Channel} = amqp_connection:open_channel(Connection),
|
||||||
|
%% Create an exchange
|
||||||
|
#'exchange.declare_ok'{} =
|
||||||
|
amqp_channel:call(
|
||||||
|
Channel,
|
||||||
|
#'exchange.declare'{
|
||||||
|
exchange = rabbit_mq_exchange(),
|
||||||
|
type = <<"topic">>
|
||||||
|
}
|
||||||
|
),
|
||||||
|
%% Create a queue
|
||||||
|
#'queue.declare_ok'{} =
|
||||||
|
amqp_channel:call(
|
||||||
|
Channel,
|
||||||
|
#'queue.declare'{queue = rabbit_mq_queue()}
|
||||||
|
),
|
||||||
|
%% Bind the queue to the exchange
|
||||||
|
#'queue.bind_ok'{} =
|
||||||
|
amqp_channel:call(
|
||||||
|
Channel,
|
||||||
|
#'queue.bind'{
|
||||||
|
queue = rabbit_mq_queue(),
|
||||||
|
exchange = rabbit_mq_exchange(),
|
||||||
|
routing_key = rabbit_mq_routing_key()
|
||||||
|
}
|
||||||
|
),
|
||||||
|
#{
|
||||||
|
connection => Connection,
|
||||||
|
channel => Channel
|
||||||
|
}.
|
||||||
|
|
||||||
|
get_channel_connection(Config) ->
|
||||||
|
proplists:get_value(channel_connection, Config).
|
||||||
|
|
||||||
|
end_per_suite(Config) ->
|
||||||
|
#{
|
||||||
|
connection := Connection,
|
||||||
|
channel := Channel
|
||||||
|
} = get_channel_connection(Config),
|
||||||
|
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
|
||||||
|
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
|
||||||
|
_ = application:stop(emqx_connector),
|
||||||
|
%% Close the channel
|
||||||
|
ok = amqp_channel:close(Channel),
|
||||||
|
%% Close the connection
|
||||||
|
ok = amqp_connection:close(Connection).
|
||||||
|
|
||||||
|
% %%------------------------------------------------------------------------------
|
||||||
|
% %% Testcases
|
||||||
|
% %%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
t_lifecycle(Config) ->
|
||||||
|
perform_lifecycle_check(
|
||||||
|
erlang:atom_to_binary(?MODULE),
|
||||||
|
rabbitmq_config(),
|
||||||
|
Config
|
||||||
|
).
|
||||||
|
|
||||||
|
perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) ->
|
||||||
|
#{
|
||||||
|
channel := Channel
|
||||||
|
} = get_channel_connection(TestConfig),
|
||||||
|
{ok, #{config := CheckedConfig}} =
|
||||||
|
emqx_resource:check_config(emqx_bridge_rabbitmq_connector, InitialConfig),
|
||||||
|
{ok, #{
|
||||||
|
state := #{poolname := PoolName} = State,
|
||||||
|
status := InitialStatus
|
||||||
|
}} =
|
||||||
|
emqx_resource:create_local(
|
||||||
|
ResourceID,
|
||||||
|
?CONNECTOR_RESOURCE_GROUP,
|
||||||
|
emqx_bridge_rabbitmq_connector,
|
||||||
|
CheckedConfig,
|
||||||
|
#{}
|
||||||
|
),
|
||||||
|
?assertEqual(InitialStatus, connected),
|
||||||
|
%% Instance should match the state and status of the just started resource
|
||||||
|
{ok, ?CONNECTOR_RESOURCE_GROUP, #{
|
||||||
|
state := State,
|
||||||
|
status := InitialStatus
|
||||||
|
}} =
|
||||||
|
emqx_resource:get_instance(ResourceID),
|
||||||
|
?assertEqual({ok, connected}, emqx_resource:health_check(ResourceID)),
|
||||||
|
%% Perform query as further check that the resource is working as expected
|
||||||
|
perform_query(ResourceID, Channel),
|
||||||
|
?assertEqual(ok, emqx_resource:stop(ResourceID)),
|
||||||
|
%% Resource will be listed still, but state will be changed and healthcheck will fail
|
||||||
|
%% as the worker no longer exists.
|
||||||
|
{ok, ?CONNECTOR_RESOURCE_GROUP, #{
|
||||||
|
state := State,
|
||||||
|
status := StoppedStatus
|
||||||
|
}} = emqx_resource:get_instance(ResourceID),
|
||||||
|
?assertEqual(stopped, StoppedStatus),
|
||||||
|
?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(ResourceID)),
|
||||||
|
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
|
||||||
|
?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)),
|
||||||
|
% Can call stop/1 again on an already stopped instance
|
||||||
|
?assertEqual(ok, emqx_resource:stop(ResourceID)),
|
||||||
|
% Make sure it can be restarted and the healthchecks and queries work properly
|
||||||
|
?assertEqual(ok, emqx_resource:restart(ResourceID)),
|
||||||
|
% async restart, need to wait resource
|
||||||
|
timer:sleep(500),
|
||||||
|
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
|
||||||
|
emqx_resource:get_instance(ResourceID),
|
||||||
|
?assertEqual({ok, connected}, emqx_resource:health_check(ResourceID)),
|
||||||
|
%% Check that everything is working again by performing a query
|
||||||
|
perform_query(ResourceID, Channel),
|
||||||
|
% Stop and remove the resource in one go.
|
||||||
|
?assertEqual(ok, emqx_resource:remove_local(ResourceID)),
|
||||||
|
?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)),
|
||||||
|
% Should not even be able to get the resource data out of ets now unlike just stopping.
|
||||||
|
?assertEqual({error, not_found}, emqx_resource:get_instance(ResourceID)).
|
||||||
|
|
||||||
|
% %%------------------------------------------------------------------------------
|
||||||
|
% %% Helpers
|
||||||
|
% %%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
perform_query(PoolName, Channel) ->
|
||||||
|
%% Send message to queue:
|
||||||
|
ok = emqx_resource:query(PoolName, {query, test_data()}),
|
||||||
|
%% Get the message from queue:
|
||||||
|
ok = receive_simple_test_message(Channel).
|
||||||
|
|
||||||
|
receive_simple_test_message(Channel) ->
|
||||||
|
#'basic.consume_ok'{consumer_tag = ConsumerTag} =
|
||||||
|
amqp_channel:call(
|
||||||
|
Channel,
|
||||||
|
#'basic.consume'{
|
||||||
|
queue = rabbit_mq_queue()
|
||||||
|
}
|
||||||
|
),
|
||||||
|
receive
|
||||||
|
%% This is the first message received
|
||||||
|
#'basic.consume_ok'{} ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
receive
|
||||||
|
{#'basic.deliver'{delivery_tag = DeliveryTag}, Content} ->
|
||||||
|
Expected = test_data(),
|
||||||
|
?assertEqual(Expected, emqx_utils_json:decode(Content#amqp_msg.payload)),
|
||||||
|
%% Ack the message
|
||||||
|
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
|
||||||
|
%% Cancel the consumer
|
||||||
|
#'basic.cancel_ok'{consumer_tag = ConsumerTag} =
|
||||||
|
amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}),
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
rabbitmq_config() ->
|
||||||
|
Config =
|
||||||
|
#{
|
||||||
|
server => rabbit_mq_host(),
|
||||||
|
port => 5672,
|
||||||
|
username => <<"guest">>,
|
||||||
|
password => <<"guest">>,
|
||||||
|
exchange => rabbit_mq_exchange(),
|
||||||
|
routing_key => rabbit_mq_routing_key()
|
||||||
|
},
|
||||||
|
#{<<"config">> => Config}.
|
||||||
|
|
||||||
|
test_data() ->
|
||||||
|
#{<<"msg_field">> => <<"Hello">>}.
|
|
@ -846,6 +846,8 @@ typename_to_spec("bucket_name()", _Mod) ->
|
||||||
#{type => string, example => <<"retainer">>};
|
#{type => string, example => <<"retainer">>};
|
||||||
typename_to_spec("json_binary()", _Mod) ->
|
typename_to_spec("json_binary()", _Mod) ->
|
||||||
#{type => string, example => <<"{\"a\": [1,true]}">>};
|
#{type => string, example => <<"{\"a\": [1,true]}">>};
|
||||||
|
typename_to_spec("port_number()", _Mod) ->
|
||||||
|
range("1..65535");
|
||||||
typename_to_spec(Name, Mod) ->
|
typename_to_spec(Name, Mod) ->
|
||||||
Spec = range(Name),
|
Spec = range(Name),
|
||||||
Spec1 = remote_module_type(Spec, Name, Mod),
|
Spec1 = remote_module_type(Spec, Name, Mod),
|
||||||
|
|
|
@ -134,6 +134,9 @@
|
||||||
%% when calling emqx_resource:stop/1
|
%% when calling emqx_resource:stop/1
|
||||||
-callback on_stop(resource_id(), resource_state()) -> term().
|
-callback on_stop(resource_id(), resource_state()) -> term().
|
||||||
|
|
||||||
|
%% when calling emqx_resource:get_callback_mode/1
|
||||||
|
-callback callback_mode() -> callback_mode().
|
||||||
|
|
||||||
%% when calling emqx_resource:query/3
|
%% when calling emqx_resource:query/3
|
||||||
-callback on_query(resource_id(), Request :: term(), resource_state()) -> query_result().
|
-callback on_query(resource_id(), Request :: term(), resource_state()) -> query_result().
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
A RabbitMQ bridge has been added. This bridge makes it possible to forward messages from EMQX to RabbitMQ.
|
|
@ -13,7 +13,8 @@
|
||||||
emqx_bridge_opents,
|
emqx_bridge_opents,
|
||||||
emqx_bridge_pulsar,
|
emqx_bridge_pulsar,
|
||||||
emqx_bridge_sqlserver,
|
emqx_bridge_sqlserver,
|
||||||
emqx_bridge_rocketmq
|
emqx_bridge_rocketmq,
|
||||||
|
emqx_bridge_rabbitmq
|
||||||
]},
|
]},
|
||||||
{env, []},
|
{env, []},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
|
|
|
@ -39,7 +39,8 @@ api_schemas(Method) ->
|
||||||
ref(emqx_bridge_opents, Method),
|
ref(emqx_bridge_opents, Method),
|
||||||
ref(emqx_bridge_pulsar, Method ++ "_producer"),
|
ref(emqx_bridge_pulsar, Method ++ "_producer"),
|
||||||
ref(emqx_bridge_oracle, Method),
|
ref(emqx_bridge_oracle, Method),
|
||||||
ref(emqx_bridge_iotdb, Method)
|
ref(emqx_bridge_iotdb, Method),
|
||||||
|
ref(emqx_bridge_rabbitmq, Method)
|
||||||
].
|
].
|
||||||
|
|
||||||
schema_modules() ->
|
schema_modules() ->
|
||||||
|
@ -63,7 +64,8 @@ schema_modules() ->
|
||||||
emqx_bridge_opents,
|
emqx_bridge_opents,
|
||||||
emqx_bridge_pulsar,
|
emqx_bridge_pulsar,
|
||||||
emqx_bridge_oracle,
|
emqx_bridge_oracle,
|
||||||
emqx_bridge_iotdb
|
emqx_bridge_iotdb,
|
||||||
|
emqx_bridge_rabbitmq
|
||||||
].
|
].
|
||||||
|
|
||||||
examples(Method) ->
|
examples(Method) ->
|
||||||
|
@ -106,7 +108,8 @@ resource_type(sqlserver) -> emqx_bridge_sqlserver_connector;
|
||||||
resource_type(opents) -> emqx_bridge_opents_connector;
|
resource_type(opents) -> emqx_bridge_opents_connector;
|
||||||
resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer;
|
resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer;
|
||||||
resource_type(oracle) -> emqx_oracle;
|
resource_type(oracle) -> emqx_oracle;
|
||||||
resource_type(iotdb) -> emqx_bridge_iotdb_impl.
|
resource_type(iotdb) -> emqx_bridge_iotdb_impl;
|
||||||
|
resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector.
|
||||||
|
|
||||||
fields(bridges) ->
|
fields(bridges) ->
|
||||||
[
|
[
|
||||||
|
@ -192,7 +195,7 @@ fields(bridges) ->
|
||||||
)}
|
)}
|
||||||
] ++ kafka_structs() ++ pulsar_structs() ++ mongodb_structs() ++ influxdb_structs() ++
|
] ++ kafka_structs() ++ pulsar_structs() ++ mongodb_structs() ++ influxdb_structs() ++
|
||||||
redis_structs() ++
|
redis_structs() ++
|
||||||
pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs().
|
pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs() ++ rabbitmq_structs().
|
||||||
|
|
||||||
mongodb_structs() ->
|
mongodb_structs() ->
|
||||||
[
|
[
|
||||||
|
@ -323,3 +326,15 @@ kafka_producer_converter(Map, Opts) ->
|
||||||
end,
|
end,
|
||||||
Map
|
Map
|
||||||
).
|
).
|
||||||
|
|
||||||
|
rabbitmq_structs() ->
|
||||||
|
[
|
||||||
|
{rabbitmq,
|
||||||
|
mk(
|
||||||
|
hoconsc:map(name, ref(emqx_bridge_rabbitmq, "config")),
|
||||||
|
#{
|
||||||
|
desc => <<"RabbitMQ Bridge Config">>,
|
||||||
|
required => false
|
||||||
|
}
|
||||||
|
)}
|
||||||
|
].
|
||||||
|
|
23
mix.exs
23
mix.exs
|
@ -174,7 +174,8 @@ defmodule EMQXUmbrella.MixProject do
|
||||||
:emqx_bridge_sqlserver,
|
:emqx_bridge_sqlserver,
|
||||||
:emqx_bridge_pulsar,
|
:emqx_bridge_pulsar,
|
||||||
:emqx_oracle,
|
:emqx_oracle,
|
||||||
:emqx_bridge_oracle
|
:emqx_bridge_oracle,
|
||||||
|
:emqx_bridge_rabbitmq
|
||||||
])
|
])
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -189,7 +190,22 @@ defmodule EMQXUmbrella.MixProject do
|
||||||
{:snappyer, "1.2.8", override: true},
|
{:snappyer, "1.2.8", override: true},
|
||||||
{:crc32cer, "0.1.8", override: true},
|
{:crc32cer, "0.1.8", override: true},
|
||||||
{:supervisor3, "1.1.12", override: true},
|
{:supervisor3, "1.1.12", override: true},
|
||||||
{:opentsdb, github: "emqx/opentsdb-client-erl", tag: "v0.5.1", override: true}
|
{:opentsdb, github: "emqx/opentsdb-client-erl", tag: "v0.5.1", override: true},
|
||||||
|
# The following two are dependencies of rabbit_common. They are needed here to
|
||||||
|
# make mix not complain about conflicting versions
|
||||||
|
{:thoas, github: "emqx/thoas", tag: "v1.0.0", override: true},
|
||||||
|
{:credentials_obfuscation,
|
||||||
|
github: "emqx/credentials-obfuscation", tag: "v3.2.0", override: true},
|
||||||
|
{:rabbit_common,
|
||||||
|
github: "emqx/rabbitmq-server",
|
||||||
|
tag: "v3.11.13-emqx",
|
||||||
|
sparse: "deps/rabbit_common",
|
||||||
|
override: true},
|
||||||
|
{:amqp_client,
|
||||||
|
github: "emqx/rabbitmq-server",
|
||||||
|
tag: "v3.11.13-emqx",
|
||||||
|
sparse: "deps/amqp_client",
|
||||||
|
override: true}
|
||||||
]
|
]
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -321,7 +337,7 @@ defmodule EMQXUmbrella.MixProject do
|
||||||
emqx_plugin_libs: :load,
|
emqx_plugin_libs: :load,
|
||||||
esasl: :load,
|
esasl: :load,
|
||||||
observer_cli: :permanent,
|
observer_cli: :permanent,
|
||||||
tools: :load,
|
tools: :permanent,
|
||||||
covertool: :load,
|
covertool: :load,
|
||||||
system_monitor: :load,
|
system_monitor: :load,
|
||||||
emqx_utils: :load,
|
emqx_utils: :load,
|
||||||
|
@ -385,6 +401,7 @@ defmodule EMQXUmbrella.MixProject do
|
||||||
emqx_bridge_sqlserver: :permanent,
|
emqx_bridge_sqlserver: :permanent,
|
||||||
emqx_oracle: :permanent,
|
emqx_oracle: :permanent,
|
||||||
emqx_bridge_oracle: :permanent,
|
emqx_bridge_oracle: :permanent,
|
||||||
|
emqx_bridge_rabbitmq: :permanent,
|
||||||
emqx_ee_schema_registry: :permanent
|
emqx_ee_schema_registry: :permanent
|
||||||
],
|
],
|
||||||
else: []
|
else: []
|
||||||
|
|
|
@ -98,6 +98,7 @@ is_community_umbrella_app("apps/emqx_bridge_timescale") -> false;
|
||||||
is_community_umbrella_app("apps/emqx_bridge_oracle") -> false;
|
is_community_umbrella_app("apps/emqx_bridge_oracle") -> false;
|
||||||
is_community_umbrella_app("apps/emqx_bridge_sqlserver") -> false;
|
is_community_umbrella_app("apps/emqx_bridge_sqlserver") -> false;
|
||||||
is_community_umbrella_app("apps/emqx_oracle") -> false;
|
is_community_umbrella_app("apps/emqx_oracle") -> false;
|
||||||
|
is_community_umbrella_app("apps/emqx_bridge_rabbitmq") -> false;
|
||||||
is_community_umbrella_app(_) -> true.
|
is_community_umbrella_app(_) -> true.
|
||||||
|
|
||||||
is_jq_supported() ->
|
is_jq_supported() ->
|
||||||
|
@ -404,7 +405,7 @@ relx_apps(ReleaseType, Edition) ->
|
||||||
{emqx_plugin_libs, load},
|
{emqx_plugin_libs, load},
|
||||||
{esasl, load},
|
{esasl, load},
|
||||||
observer_cli,
|
observer_cli,
|
||||||
{tools, load},
|
tools,
|
||||||
{covertool, load},
|
{covertool, load},
|
||||||
% started by emqx_machine
|
% started by emqx_machine
|
||||||
{system_monitor, load},
|
{system_monitor, load},
|
||||||
|
@ -476,6 +477,7 @@ relx_apps_per_edition(ee) ->
|
||||||
emqx_bridge_sqlserver,
|
emqx_bridge_sqlserver,
|
||||||
emqx_oracle,
|
emqx_oracle,
|
||||||
emqx_bridge_oracle,
|
emqx_bridge_oracle,
|
||||||
|
emqx_bridge_rabbitmq,
|
||||||
emqx_ee_schema_registry
|
emqx_ee_schema_registry
|
||||||
];
|
];
|
||||||
relx_apps_per_edition(ce) ->
|
relx_apps_per_edition(ce) ->
|
||||||
|
|
|
@ -0,0 +1,34 @@
|
||||||
|
emqx_bridge_rabbitmq {
|
||||||
|
|
||||||
|
local_topic.desc:
|
||||||
|
"""The MQTT topic filter to be forwarded to RabbitMQ. All MQTT 'PUBLISH' messages with the topic matching the local_topic will be forwarded.
|
||||||
|
NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is configured, then both the data got from the rule and the MQTT messages that match local_topic will be forwarded."""
|
||||||
|
|
||||||
|
local_topic.label:
|
||||||
|
"""Local Topic"""
|
||||||
|
|
||||||
|
config_enable.desc:
|
||||||
|
"""Enable or disable this bridge"""
|
||||||
|
|
||||||
|
config_enable.label:
|
||||||
|
"""Enable or Disable Bridge"""
|
||||||
|
|
||||||
|
desc_config.desc:
|
||||||
|
"""Configuration for a RabbitMQ bridge."""
|
||||||
|
|
||||||
|
desc_config.label:
|
||||||
|
"""RabbitMQ Bridge Configuration"""
|
||||||
|
|
||||||
|
desc_type.desc:
|
||||||
|
"""The Bridge Type"""
|
||||||
|
|
||||||
|
desc_type.label:
|
||||||
|
"""Bridge Type"""
|
||||||
|
|
||||||
|
desc_name.desc:
|
||||||
|
"""Bridge name."""
|
||||||
|
|
||||||
|
desc_name.label:
|
||||||
|
"""Bridge Name"""
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,100 @@
|
||||||
|
|
||||||
|
emqx_bridge_rabbitmq_connector {
|
||||||
|
|
||||||
|
server.desc:
|
||||||
|
"""The RabbitMQ server address that you want to connect to (for example, localhost)."""
|
||||||
|
|
||||||
|
server.label:
|
||||||
|
"""Server"""
|
||||||
|
|
||||||
|
port.desc:
|
||||||
|
"""The port number on which the RabbitMQ server is listening (default is 5672)."""
|
||||||
|
|
||||||
|
port.label:
|
||||||
|
"""Port"""
|
||||||
|
|
||||||
|
username.desc:
|
||||||
|
"""The username used to authenticate with the RabbitMQ server."""
|
||||||
|
|
||||||
|
username.label:
|
||||||
|
"""Username"""
|
||||||
|
|
||||||
|
password.desc:
|
||||||
|
"""The password used to authenticate with the RabbitMQ server."""
|
||||||
|
|
||||||
|
password.label:
|
||||||
|
"""Password"""
|
||||||
|
|
||||||
|
pool_size.desc:
|
||||||
|
"""The size of the connection pool."""
|
||||||
|
|
||||||
|
pool_size.label:
|
||||||
|
"""Pool Size"""
|
||||||
|
|
||||||
|
timeout.desc:
|
||||||
|
"""The timeout for waiting on the connection to be established."""
|
||||||
|
|
||||||
|
timeout.label:
|
||||||
|
"""Connection Timeout"""
|
||||||
|
|
||||||
|
virtual_host.desc:
|
||||||
|
"""The virtual host to use when connecting to the RabbitMQ server."""
|
||||||
|
|
||||||
|
virtual_host.label:
|
||||||
|
"""Virtual Host"""
|
||||||
|
|
||||||
|
heartbeat.desc:
|
||||||
|
"""The interval for sending heartbeat messages to the RabbitMQ server."""
|
||||||
|
|
||||||
|
heartbeat.label:
|
||||||
|
"""Heartbeat"""
|
||||||
|
|
||||||
|
auto_reconnect.desc:
|
||||||
|
"""The interval for attempting to reconnect to the RabbitMQ server if the connection is lost."""
|
||||||
|
|
||||||
|
auto_reconnect.label:
|
||||||
|
"""Auto Reconnect"""
|
||||||
|
|
||||||
|
exchange.desc:
|
||||||
|
"""The name of the RabbitMQ exchange where the messages will be sent."""
|
||||||
|
|
||||||
|
exchange.label:
|
||||||
|
"""Exchange"""
|
||||||
|
|
||||||
|
exchange_type.desc:
|
||||||
|
"""The type of the RabbitMQ exchange (direct, fanout, or topic)."""
|
||||||
|
|
||||||
|
exchange_type.label:
|
||||||
|
"""Exchange Type"""
|
||||||
|
|
||||||
|
routing_key.desc:
|
||||||
|
"""The routing key used to route messages to the correct queue in the RabbitMQ exchange."""
|
||||||
|
|
||||||
|
routing_key.label:
|
||||||
|
"""Routing Key"""
|
||||||
|
|
||||||
|
delivery_mode.desc:
|
||||||
|
"""The delivery mode for messages published to RabbitMQ. Delivery mode non_persistent (1) is suitable for messages that don't require persistence across RabbitMQ restarts, whereas delivery mode persistent (2) is designed for messages that must survive RabbitMQ restarts."""
|
||||||
|
|
||||||
|
delivery_mode.label:
|
||||||
|
"""Message Delivery Mode"""
|
||||||
|
|
||||||
|
payload_template.desc:
|
||||||
|
"""The template for formatting the payload of the message before sending it to RabbitMQ. Template placeholders, such as ${field1.sub_field}, will be substituted with the respective field's value. When left empty, the entire input message will be used as the payload, formatted as a JSON text. This behavior is equivalent to specifying ${.} as the payload template."""
|
||||||
|
|
||||||
|
payload_template.label:
|
||||||
|
"""Payload Template"""
|
||||||
|
|
||||||
|
publish_confirmation_timeout.desc:
|
||||||
|
"""The timeout for waiting for RabbitMQ to confirm message publication when using publisher confirms."""
|
||||||
|
|
||||||
|
publish_confirmation_timeout.label:
|
||||||
|
"""Publish Confirmation Timeout"""
|
||||||
|
|
||||||
|
wait_for_publish_confirmations.desc:
|
||||||
|
"""A boolean value that indicates whether to wait for RabbitMQ to confirm message publication when using publisher confirms."""
|
||||||
|
|
||||||
|
wait_for_publish_confirmations.label:
|
||||||
|
"""Wait for Publish Confirmations"""
|
||||||
|
|
||||||
|
}
|
|
@ -36,6 +36,9 @@ rebar_deps =
|
||||||
|
|
||||||
{:git, _, {:ref, ref}} ->
|
{:git, _, {:ref, ref}} ->
|
||||||
to_string(ref)
|
to_string(ref)
|
||||||
|
|
||||||
|
{:git_subdir, _, {:ref, ref}, _} ->
|
||||||
|
to_string(ref)
|
||||||
end
|
end
|
||||||
|
|
||||||
{name, ref}
|
{name, ref}
|
||||||
|
|
|
@ -200,6 +200,9 @@ for dep in ${CT_DEPS}; do
|
||||||
iotdb)
|
iotdb)
|
||||||
FILES+=( '.ci/docker-compose-file/docker-compose-iotdb.yaml' )
|
FILES+=( '.ci/docker-compose-file/docker-compose-iotdb.yaml' )
|
||||||
;;
|
;;
|
||||||
|
rabbitmq)
|
||||||
|
FILES+=( '.ci/docker-compose-file/docker-compose-rabbitmq.yaml' )
|
||||||
|
;;
|
||||||
*)
|
*)
|
||||||
echo "unknown_ct_dependency $dep"
|
echo "unknown_ct_dependency $dep"
|
||||||
exit 1
|
exit 1
|
||||||
|
|
Loading…
Reference in New Issue