Merge pull request #12487 from zhongwencool/rabbitmq-source-test
feat: don't publish mqtt message in rabbitmq's source
This commit is contained in:
commit
2ac4bde102
|
@ -372,21 +372,7 @@ preproc_parameter(#{config_root := actions, parameters := Parameter}) ->
|
||||||
config_root => actions
|
config_root => actions
|
||||||
};
|
};
|
||||||
preproc_parameter(#{config_root := sources, parameters := Parameter, hookpoints := Hooks}) ->
|
preproc_parameter(#{config_root := sources, parameters := Parameter, hookpoints := Hooks}) ->
|
||||||
#{
|
Parameter#{hookpoints => Hooks, config_root => sources}.
|
||||||
payload_template := PayloadTmpl,
|
|
||||||
qos := QosTmpl,
|
|
||||||
topic := TopicTmpl
|
|
||||||
} = Parameter,
|
|
||||||
Parameter#{
|
|
||||||
payload_template => emqx_placeholder:preproc_tmpl(PayloadTmpl),
|
|
||||||
qos => preproc_qos(QosTmpl),
|
|
||||||
topic => emqx_placeholder:preproc_tmpl(TopicTmpl),
|
|
||||||
hookpoints => Hooks,
|
|
||||||
config_root => sources
|
|
||||||
}.
|
|
||||||
|
|
||||||
preproc_qos(Qos) when is_integer(Qos) -> Qos;
|
|
||||||
preproc_qos(Qos) -> emqx_placeholder:preproc_tmpl(Qos).
|
|
||||||
|
|
||||||
delivery_mode(non_persistent) -> 1;
|
delivery_mode(non_persistent) -> 1;
|
||||||
delivery_mode(persistent) -> 2.
|
delivery_mode(persistent) -> 2.
|
||||||
|
|
|
@ -101,6 +101,7 @@ fields(action_parameters) ->
|
||||||
hoconsc:mk(
|
hoconsc:mk(
|
||||||
binary(),
|
binary(),
|
||||||
#{
|
#{
|
||||||
|
default => <<"">>,
|
||||||
desc => ?DESC(?CONNECTOR_SCHEMA, "payload_template")
|
desc => ?DESC(?CONNECTOR_SCHEMA, "payload_template")
|
||||||
}
|
}
|
||||||
)}
|
)}
|
||||||
|
@ -126,39 +127,6 @@ fields(subscriber_source) ->
|
||||||
);
|
);
|
||||||
fields(source_parameters) ->
|
fields(source_parameters) ->
|
||||||
[
|
[
|
||||||
{wait_for_publish_confirmations,
|
|
||||||
hoconsc:mk(
|
|
||||||
boolean(),
|
|
||||||
#{
|
|
||||||
default => true,
|
|
||||||
desc => ?DESC(?CONNECTOR_SCHEMA, "wait_for_publish_confirmations")
|
|
||||||
}
|
|
||||||
)},
|
|
||||||
{topic,
|
|
||||||
?HOCON(
|
|
||||||
binary(),
|
|
||||||
#{
|
|
||||||
required => true,
|
|
||||||
validator => fun emqx_schema:non_empty_string/1,
|
|
||||||
desc => ?DESC("source_topic")
|
|
||||||
}
|
|
||||||
)},
|
|
||||||
{qos,
|
|
||||||
?HOCON(
|
|
||||||
?UNION([emqx_schema:qos(), binary()]),
|
|
||||||
#{
|
|
||||||
default => 0,
|
|
||||||
desc => ?DESC("source_qos")
|
|
||||||
}
|
|
||||||
)},
|
|
||||||
{payload_template,
|
|
||||||
?HOCON(
|
|
||||||
binary(),
|
|
||||||
#{
|
|
||||||
required => false,
|
|
||||||
desc => ?DESC("source_payload_template")
|
|
||||||
}
|
|
||||||
)},
|
|
||||||
{queue,
|
{queue,
|
||||||
?HOCON(
|
?HOCON(
|
||||||
binary(),
|
binary(),
|
||||||
|
@ -167,6 +135,14 @@ fields(source_parameters) ->
|
||||||
desc => ?DESC("source_queue")
|
desc => ?DESC("source_queue")
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
{wait_for_publish_confirmations,
|
||||||
|
hoconsc:mk(
|
||||||
|
boolean(),
|
||||||
|
#{
|
||||||
|
default => true,
|
||||||
|
desc => ?DESC(?CONNECTOR_SCHEMA, "wait_for_publish_confirmations")
|
||||||
|
}
|
||||||
|
)},
|
||||||
{no_ack,
|
{no_ack,
|
||||||
?HOCON(
|
?HOCON(
|
||||||
boolean(),
|
boolean(),
|
||||||
|
@ -260,9 +236,6 @@ source_examples(Method) ->
|
||||||
_ConnectorType = rabbitmq,
|
_ConnectorType = rabbitmq,
|
||||||
#{
|
#{
|
||||||
parameters => #{
|
parameters => #{
|
||||||
topic => <<"${payload.mqtt_topic}">>,
|
|
||||||
qos => <<"${payload.mqtt_qos}">>,
|
|
||||||
payload_template => <<"${payload.mqtt_payload}">>,
|
|
||||||
queue => <<"test_queue">>,
|
queue => <<"test_queue">>,
|
||||||
no_ack => true
|
no_ack => true
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,28 +37,13 @@ handle_cast(_Request, State) ->
|
||||||
handle_info(
|
handle_info(
|
||||||
{#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{
|
{#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{
|
||||||
payload = Payload,
|
payload = Payload,
|
||||||
props = #'P_basic'{message_id = MessageId, headers = Headers}
|
props = PBasic
|
||||||
}},
|
}},
|
||||||
{Channel, InstanceId, Params} = State
|
{Channel, InstanceId, Params} = State
|
||||||
) ->
|
) ->
|
||||||
#{
|
Message = to_map(PBasic, Payload),
|
||||||
hookpoints := Hooks,
|
#{hookpoints := Hooks, no_ack := NoAck} = Params,
|
||||||
payload_template := PayloadTmpl,
|
lists:foreach(fun(Hook) -> emqx_hooks:run(Hook, [Message]) end, Hooks),
|
||||||
qos := QoSTmpl,
|
|
||||||
topic := TopicTmpl,
|
|
||||||
no_ack := NoAck
|
|
||||||
} = Params,
|
|
||||||
MQTTMsg = emqx_message:make(
|
|
||||||
make_message_id(MessageId),
|
|
||||||
InstanceId,
|
|
||||||
render(Payload, QoSTmpl),
|
|
||||||
render(Payload, TopicTmpl),
|
|
||||||
render(Payload, PayloadTmpl),
|
|
||||||
#{},
|
|
||||||
make_headers(Headers)
|
|
||||||
),
|
|
||||||
_ = emqx:publish(MQTTMsg),
|
|
||||||
lists:foreach(fun(Hook) -> emqx_hooks:run(Hook, [MQTTMsg]) end, Hooks),
|
|
||||||
(NoAck =:= false) andalso
|
(NoAck =:= false) andalso
|
||||||
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
|
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
|
||||||
emqx_resource_metrics:received_inc(InstanceId),
|
emqx_resource_metrics:received_inc(InstanceId),
|
||||||
|
@ -68,18 +53,52 @@ handle_info(#'basic.cancel_ok'{}, State) ->
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
to_map(PBasic, Payload) ->
|
||||||
|
#'P_basic'{
|
||||||
|
content_type = ContentType,
|
||||||
|
content_encoding = ContentEncoding,
|
||||||
|
headers = Headers,
|
||||||
|
delivery_mode = DeliveryMode,
|
||||||
|
priority = Priority,
|
||||||
|
correlation_id = CorrelationId,
|
||||||
|
reply_to = ReplyTo,
|
||||||
|
expiration = Expiration,
|
||||||
|
message_id = MessageId,
|
||||||
|
timestamp = Timestamp,
|
||||||
|
type = Type,
|
||||||
|
user_id = UserId,
|
||||||
|
app_id = AppId,
|
||||||
|
cluster_id = ClusterId
|
||||||
|
} = PBasic,
|
||||||
|
Message = #{
|
||||||
|
<<"payload">> => make_payload(Payload),
|
||||||
|
<<"content_type">> => ContentType,
|
||||||
|
<<"content_encoding">> => ContentEncoding,
|
||||||
|
<<"headers">> => make_headers(Headers),
|
||||||
|
<<"delivery_mode">> => DeliveryMode,
|
||||||
|
<<"priority">> => Priority,
|
||||||
|
<<"correlation_id">> => CorrelationId,
|
||||||
|
<<"reply_to">> => ReplyTo,
|
||||||
|
<<"expiration">> => Expiration,
|
||||||
|
<<"message_id">> => MessageId,
|
||||||
|
<<"timestamp">> => Timestamp,
|
||||||
|
<<"type">> => Type,
|
||||||
|
<<"user_id">> => UserId,
|
||||||
|
<<"app_id">> => AppId,
|
||||||
|
<<"cluster_id">> => ClusterId
|
||||||
|
},
|
||||||
|
maps:filtermap(fun(_K, V) -> V =/= undefined andalso V =/= <<"undefined">> end, Message).
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
terminate(_Reason, _State) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
render(_Message, QoS) when is_integer(QoS) -> QoS;
|
|
||||||
render(Message, PayloadTmpl) ->
|
|
||||||
Opts = #{return => full_binary},
|
|
||||||
emqx_placeholder:proc_tmpl(PayloadTmpl, Message, Opts).
|
|
||||||
|
|
||||||
make_message_id(undefined) -> emqx_guid:gen();
|
|
||||||
make_message_id(Id) -> Id.
|
|
||||||
|
|
||||||
make_headers(undefined) ->
|
make_headers(undefined) ->
|
||||||
#{};
|
undefined;
|
||||||
make_headers(Headers) when is_list(Headers) ->
|
make_headers(Headers) when is_list(Headers) ->
|
||||||
maps:from_list([{Key, Value} || {Key, _Type, Value} <- Headers]).
|
maps:from_list([{Key, Value} || {Key, _Type, Value} <- Headers]).
|
||||||
|
|
||||||
|
make_payload(Payload) ->
|
||||||
|
case emqx_utils_json:safe_decode(Payload, [return_maps]) of
|
||||||
|
{ok, Map} -> Map;
|
||||||
|
{error, _} -> Payload
|
||||||
|
end.
|
||||||
|
|
|
@ -1,423 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_bridge_rabbitmq_SUITE).
|
|
||||||
|
|
||||||
-compile(nowarn_export_all).
|
|
||||||
-compile(export_all).
|
|
||||||
|
|
||||||
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
|
||||||
-include_lib("stdlib/include/assert.hrl").
|
|
||||||
-include_lib("amqp_client/include/amqp_client.hrl").
|
|
||||||
|
|
||||||
%% See comment in
|
|
||||||
%% apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl for how to
|
|
||||||
%% run this without bringing up the whole CI infrastructure
|
|
||||||
-define(TYPE, <<"rabbitmq">>).
|
|
||||||
|
|
||||||
rabbit_mq_host() ->
|
|
||||||
<<"rabbitmq">>.
|
|
||||||
|
|
||||||
rabbit_mq_port() ->
|
|
||||||
5672.
|
|
||||||
|
|
||||||
rabbit_mq_exchange() ->
|
|
||||||
<<"messages">>.
|
|
||||||
|
|
||||||
rabbit_mq_queue() ->
|
|
||||||
<<"test_queue">>.
|
|
||||||
|
|
||||||
rabbit_mq_routing_key() ->
|
|
||||||
<<"test_routing_key">>.
|
|
||||||
|
|
||||||
get_channel_connection(Config) ->
|
|
||||||
proplists:get_value(channel_connection, Config).
|
|
||||||
|
|
||||||
get_rabbitmq(Config) ->
|
|
||||||
proplists:get_value(rabbitmq, Config).
|
|
||||||
|
|
||||||
get_tls(Config) ->
|
|
||||||
proplists:get_value(tls, Config).
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% Common Test Setup, Tear down and Testcase List
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
all() ->
|
|
||||||
[
|
|
||||||
{group, tcp},
|
|
||||||
{group, tls}
|
|
||||||
].
|
|
||||||
|
|
||||||
groups() ->
|
|
||||||
AllTCs = emqx_common_test_helpers:all(?MODULE),
|
|
||||||
[
|
|
||||||
{tcp, AllTCs},
|
|
||||||
{tls, AllTCs}
|
|
||||||
].
|
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
|
||||||
Config.
|
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
init_per_group(tcp, Config) ->
|
|
||||||
RabbitMQHost = os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq"),
|
|
||||||
RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")),
|
|
||||||
case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of
|
|
||||||
true ->
|
|
||||||
Config1 = common_init_per_group(#{
|
|
||||||
host => RabbitMQHost, port => RabbitMQPort, tls => false
|
|
||||||
}),
|
|
||||||
Config1 ++ Config;
|
|
||||||
false ->
|
|
||||||
case os:getenv("IS_CI") of
|
|
||||||
"yes" ->
|
|
||||||
throw(no_rabbitmq);
|
|
||||||
_ ->
|
|
||||||
{skip, no_rabbitmq}
|
|
||||||
end
|
|
||||||
end;
|
|
||||||
init_per_group(tls, Config) ->
|
|
||||||
RabbitMQHost = os:getenv("RABBITMQ_TLS_HOST", "rabbitmq"),
|
|
||||||
RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_TLS_PORT", "5671")),
|
|
||||||
case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of
|
|
||||||
true ->
|
|
||||||
Config1 = common_init_per_group(#{
|
|
||||||
host => RabbitMQHost, port => RabbitMQPort, tls => true
|
|
||||||
}),
|
|
||||||
Config1 ++ Config;
|
|
||||||
false ->
|
|
||||||
case os:getenv("IS_CI") of
|
|
||||||
"yes" ->
|
|
||||||
throw(no_rabbitmq);
|
|
||||||
_ ->
|
|
||||||
{skip, no_rabbitmq}
|
|
||||||
end
|
|
||||||
end;
|
|
||||||
init_per_group(_Group, Config) ->
|
|
||||||
Config.
|
|
||||||
|
|
||||||
common_init_per_group(Opts) ->
|
|
||||||
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
|
|
||||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
|
|
||||||
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
|
|
||||||
{ok, _} = application:ensure_all_started(emqx_connector),
|
|
||||||
{ok, _} = application:ensure_all_started(amqp_client),
|
|
||||||
emqx_mgmt_api_test_util:init_suite(),
|
|
||||||
#{host := Host, port := Port, tls := UseTLS} = Opts,
|
|
||||||
ChannelConnection = setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS),
|
|
||||||
[
|
|
||||||
{channel_connection, ChannelConnection},
|
|
||||||
{rabbitmq, #{server => Host, port => Port}},
|
|
||||||
{tls, UseTLS}
|
|
||||||
].
|
|
||||||
|
|
||||||
setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS) ->
|
|
||||||
SSLOptions =
|
|
||||||
case UseTLS of
|
|
||||||
false -> none;
|
|
||||||
true -> emqx_tls_lib:to_client_opts(ssl_options(UseTLS))
|
|
||||||
end,
|
|
||||||
%% Create an exchange and a queue
|
|
||||||
{ok, Connection} =
|
|
||||||
amqp_connection:start(#amqp_params_network{
|
|
||||||
host = Host,
|
|
||||||
port = Port,
|
|
||||||
ssl_options = SSLOptions
|
|
||||||
}),
|
|
||||||
{ok, Channel} = amqp_connection:open_channel(Connection),
|
|
||||||
%% Create an exchange
|
|
||||||
#'exchange.declare_ok'{} =
|
|
||||||
amqp_channel:call(
|
|
||||||
Channel,
|
|
||||||
#'exchange.declare'{
|
|
||||||
exchange = rabbit_mq_exchange(),
|
|
||||||
type = <<"topic">>
|
|
||||||
}
|
|
||||||
),
|
|
||||||
%% Create a queue
|
|
||||||
#'queue.declare_ok'{} =
|
|
||||||
amqp_channel:call(
|
|
||||||
Channel,
|
|
||||||
#'queue.declare'{queue = rabbit_mq_queue()}
|
|
||||||
),
|
|
||||||
%% Bind the queue to the exchange
|
|
||||||
#'queue.bind_ok'{} =
|
|
||||||
amqp_channel:call(
|
|
||||||
Channel,
|
|
||||||
#'queue.bind'{
|
|
||||||
queue = rabbit_mq_queue(),
|
|
||||||
exchange = rabbit_mq_exchange(),
|
|
||||||
routing_key = rabbit_mq_routing_key()
|
|
||||||
}
|
|
||||||
),
|
|
||||||
#{
|
|
||||||
connection => Connection,
|
|
||||||
channel => Channel
|
|
||||||
}.
|
|
||||||
|
|
||||||
end_per_group(_Group, Config) ->
|
|
||||||
#{
|
|
||||||
connection := Connection,
|
|
||||||
channel := Channel
|
|
||||||
} = get_channel_connection(Config),
|
|
||||||
emqx_mgmt_api_test_util:end_suite(),
|
|
||||||
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
|
|
||||||
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
|
|
||||||
_ = application:stop(emqx_connector),
|
|
||||||
_ = application:stop(emqx_bridge),
|
|
||||||
%% Close the channel
|
|
||||||
ok = amqp_channel:close(Channel),
|
|
||||||
%% Close the connection
|
|
||||||
ok = amqp_connection:close(Connection).
|
|
||||||
|
|
||||||
init_per_testcase(_, Config) ->
|
|
||||||
Config.
|
|
||||||
|
|
||||||
end_per_testcase(_, _Config) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
rabbitmq_config(UseTLS, Config) ->
|
|
||||||
BatchSize = maps:get(batch_size, Config, 1),
|
|
||||||
BatchTime = maps:get(batch_time_ms, Config, 0),
|
|
||||||
Name = atom_to_binary(?MODULE),
|
|
||||||
Server = maps:get(server, Config, rabbit_mq_host()),
|
|
||||||
Port = maps:get(port, Config, rabbit_mq_port()),
|
|
||||||
Template = maps:get(payload_template, Config, <<"">>),
|
|
||||||
ConfigString =
|
|
||||||
io_lib:format(
|
|
||||||
"bridges.rabbitmq.~s {\n"
|
|
||||||
" enable = true\n"
|
|
||||||
" ssl = ~s\n"
|
|
||||||
" server = \"~s\"\n"
|
|
||||||
" port = ~p\n"
|
|
||||||
" username = \"guest\"\n"
|
|
||||||
" password = \"guest\"\n"
|
|
||||||
" routing_key = \"~s\"\n"
|
|
||||||
" exchange = \"~s\"\n"
|
|
||||||
" payload_template = \"~s\"\n"
|
|
||||||
" resource_opts = {\n"
|
|
||||||
" batch_size = ~b\n"
|
|
||||||
" batch_time = ~bms\n"
|
|
||||||
" }\n"
|
|
||||||
"}\n",
|
|
||||||
[
|
|
||||||
Name,
|
|
||||||
hocon_pp:do(ssl_options(UseTLS), #{embedded => true}),
|
|
||||||
Server,
|
|
||||||
Port,
|
|
||||||
rabbit_mq_routing_key(),
|
|
||||||
rabbit_mq_exchange(),
|
|
||||||
Template,
|
|
||||||
BatchSize,
|
|
||||||
BatchTime
|
|
||||||
]
|
|
||||||
),
|
|
||||||
ct:pal(ConfigString),
|
|
||||||
parse_and_check(ConfigString, <<"rabbitmq">>, Name).
|
|
||||||
|
|
||||||
ssl_options(true) ->
|
|
||||||
CertsDir = filename:join([
|
|
||||||
emqx_common_test_helpers:proj_root(),
|
|
||||||
".ci",
|
|
||||||
"docker-compose-file",
|
|
||||||
"certs"
|
|
||||||
]),
|
|
||||||
#{
|
|
||||||
enable => true,
|
|
||||||
cacertfile => filename:join([CertsDir, "ca.crt"]),
|
|
||||||
certfile => filename:join([CertsDir, "client.pem"]),
|
|
||||||
keyfile => filename:join([CertsDir, "client.key"])
|
|
||||||
};
|
|
||||||
ssl_options(false) ->
|
|
||||||
#{
|
|
||||||
enable => false
|
|
||||||
}.
|
|
||||||
|
|
||||||
parse_and_check(ConfigString, BridgeType, Name) ->
|
|
||||||
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
|
|
||||||
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
|
|
||||||
#{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf,
|
|
||||||
RetConfig.
|
|
||||||
|
|
||||||
create_bridge(Name, UseTLS, Config) ->
|
|
||||||
BridgeConfig = rabbitmq_config(UseTLS, Config),
|
|
||||||
{ok, _} = emqx_bridge:create(?TYPE, Name, BridgeConfig),
|
|
||||||
emqx_bridge_resource:bridge_id(?TYPE, Name).
|
|
||||||
|
|
||||||
delete_bridge(Name) ->
|
|
||||||
ok = emqx_bridge:remove(?TYPE, Name).
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% Test Cases
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
t_create_delete_bridge(Config) ->
|
|
||||||
Name = atom_to_binary(?FUNCTION_NAME),
|
|
||||||
RabbitMQ = get_rabbitmq(Config),
|
|
||||||
UseTLS = get_tls(Config),
|
|
||||||
create_bridge(Name, UseTLS, RabbitMQ),
|
|
||||||
Bridges = emqx_bridge:list(),
|
|
||||||
Any = fun(#{name := BName}) -> BName =:= Name end,
|
|
||||||
?assert(lists:any(Any, Bridges), Bridges),
|
|
||||||
ok = delete_bridge(Name),
|
|
||||||
BridgesAfterDelete = emqx_bridge:list(),
|
|
||||||
?assertNot(lists:any(Any, BridgesAfterDelete), BridgesAfterDelete),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
t_create_delete_bridge_non_existing_server(Config) ->
|
|
||||||
Name = atom_to_binary(?FUNCTION_NAME),
|
|
||||||
UseTLS = get_tls(Config),
|
|
||||||
create_bridge(Name, UseTLS, #{server => <<"non_existing_server">>, port => 3174}),
|
|
||||||
%% Check that the new bridge is in the list of bridges
|
|
||||||
Bridges = emqx_bridge:list(),
|
|
||||||
Any = fun(#{name := BName}) -> BName =:= Name end,
|
|
||||||
?assert(lists:any(Any, Bridges)),
|
|
||||||
ok = delete_bridge(Name),
|
|
||||||
BridgesAfterDelete = emqx_bridge:list(),
|
|
||||||
?assertNot(lists:any(Any, BridgesAfterDelete)),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
t_send_message_query(Config) ->
|
|
||||||
Name = atom_to_binary(?FUNCTION_NAME),
|
|
||||||
RabbitMQ = get_rabbitmq(Config),
|
|
||||||
UseTLS = get_tls(Config),
|
|
||||||
BridgeID = create_bridge(Name, UseTLS, RabbitMQ#{batch_size => 1}),
|
|
||||||
Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000},
|
|
||||||
%% This will use the SQL template included in the bridge
|
|
||||||
emqx_bridge:send_message(BridgeID, Payload),
|
|
||||||
%% Check that the data got to the database
|
|
||||||
?assertEqual(Payload, receive_simple_test_message(Config)),
|
|
||||||
ok = delete_bridge(Name),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
t_send_message_query_with_template(Config) ->
|
|
||||||
Name = atom_to_binary(?FUNCTION_NAME),
|
|
||||||
RabbitMQ = get_rabbitmq(Config),
|
|
||||||
UseTLS = get_tls(Config),
|
|
||||||
BridgeID = create_bridge(Name, UseTLS, RabbitMQ#{
|
|
||||||
batch_size => 1,
|
|
||||||
payload_template =>
|
|
||||||
<<
|
|
||||||
"{"
|
|
||||||
" \\\"key\\\": ${key},"
|
|
||||||
" \\\"data\\\": \\\"${data}\\\","
|
|
||||||
" \\\"timestamp\\\": ${timestamp},"
|
|
||||||
" \\\"secret\\\": 42"
|
|
||||||
"}"
|
|
||||||
>>
|
|
||||||
}),
|
|
||||||
Payload = #{
|
|
||||||
<<"key">> => 7,
|
|
||||||
<<"data">> => <<"RabbitMQ">>,
|
|
||||||
<<"timestamp">> => 10000
|
|
||||||
},
|
|
||||||
emqx_bridge:send_message(BridgeID, Payload),
|
|
||||||
%% Check that the data got to the database
|
|
||||||
ExpectedResult = Payload#{
|
|
||||||
<<"secret">> => 42
|
|
||||||
},
|
|
||||||
?assertEqual(ExpectedResult, receive_simple_test_message(Config)),
|
|
||||||
ok = delete_bridge(Name),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
t_send_simple_batch(Config) ->
|
|
||||||
Name = atom_to_binary(?FUNCTION_NAME),
|
|
||||||
RabbitMQ = get_rabbitmq(Config),
|
|
||||||
BridgeConf = RabbitMQ#{batch_size => 100},
|
|
||||||
UseTLS = get_tls(Config),
|
|
||||||
BridgeID = create_bridge(Name, UseTLS, BridgeConf),
|
|
||||||
Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000},
|
|
||||||
emqx_bridge:send_message(BridgeID, Payload),
|
|
||||||
?assertEqual(Payload, receive_simple_test_message(Config)),
|
|
||||||
ok = delete_bridge(Name),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
t_send_simple_batch_with_template(Config) ->
|
|
||||||
Name = atom_to_binary(?FUNCTION_NAME),
|
|
||||||
RabbitMQ = get_rabbitmq(Config),
|
|
||||||
UseTLS = get_tls(Config),
|
|
||||||
BridgeConf =
|
|
||||||
RabbitMQ#{
|
|
||||||
batch_size => 100,
|
|
||||||
payload_template =>
|
|
||||||
<<
|
|
||||||
"{"
|
|
||||||
" \\\"key\\\": ${key},"
|
|
||||||
" \\\"data\\\": \\\"${data}\\\","
|
|
||||||
" \\\"timestamp\\\": ${timestamp},"
|
|
||||||
" \\\"secret\\\": 42"
|
|
||||||
"}"
|
|
||||||
>>
|
|
||||||
},
|
|
||||||
BridgeID = create_bridge(Name, UseTLS, BridgeConf),
|
|
||||||
Payload = #{
|
|
||||||
<<"key">> => 7,
|
|
||||||
<<"data">> => <<"RabbitMQ">>,
|
|
||||||
<<"timestamp">> => 10000
|
|
||||||
},
|
|
||||||
emqx_bridge:send_message(BridgeID, Payload),
|
|
||||||
ExpectedResult = Payload#{
|
|
||||||
<<"secret">> => 42
|
|
||||||
},
|
|
||||||
?assertEqual(ExpectedResult, receive_simple_test_message(Config)),
|
|
||||||
ok = delete_bridge(Name),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
t_heavy_batching(Config) ->
|
|
||||||
Name = atom_to_binary(?FUNCTION_NAME),
|
|
||||||
NumberOfMessages = 20000,
|
|
||||||
RabbitMQ = get_rabbitmq(Config),
|
|
||||||
UseTLS = get_tls(Config),
|
|
||||||
BridgeConf = RabbitMQ#{
|
|
||||||
batch_size => 10173,
|
|
||||||
batch_time_ms => 50
|
|
||||||
},
|
|
||||||
BridgeID = create_bridge(Name, UseTLS, BridgeConf),
|
|
||||||
SendMessage = fun(Key) ->
|
|
||||||
Payload = #{<<"key">> => Key},
|
|
||||||
emqx_bridge:send_message(BridgeID, Payload)
|
|
||||||
end,
|
|
||||||
[SendMessage(Key) || Key <- lists:seq(1, NumberOfMessages)],
|
|
||||||
AllMessages = lists:foldl(
|
|
||||||
fun(_, Acc) ->
|
|
||||||
Message = receive_simple_test_message(Config),
|
|
||||||
#{<<"key">> := Key} = Message,
|
|
||||||
Acc#{Key => true}
|
|
||||||
end,
|
|
||||||
#{},
|
|
||||||
lists:seq(1, NumberOfMessages)
|
|
||||||
),
|
|
||||||
?assertEqual(NumberOfMessages, maps:size(AllMessages)),
|
|
||||||
ok = delete_bridge(Name),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
receive_simple_test_message(Config) ->
|
|
||||||
#{channel := Channel} = get_channel_connection(Config),
|
|
||||||
#'basic.consume_ok'{consumer_tag = ConsumerTag} =
|
|
||||||
amqp_channel:call(
|
|
||||||
Channel,
|
|
||||||
#'basic.consume'{
|
|
||||||
queue = rabbit_mq_queue()
|
|
||||||
}
|
|
||||||
),
|
|
||||||
receive
|
|
||||||
%% This is the first message received
|
|
||||||
#'basic.consume_ok'{} ->
|
|
||||||
ok
|
|
||||||
end,
|
|
||||||
receive
|
|
||||||
{#'basic.deliver'{delivery_tag = DeliveryTag}, Content} ->
|
|
||||||
%% Ack the message
|
|
||||||
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
|
|
||||||
%% Cancel the consumer
|
|
||||||
#'basic.cancel_ok'{consumer_tag = ConsumerTag} =
|
|
||||||
amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}),
|
|
||||||
emqx_utils_json:decode(Content#amqp_msg.payload)
|
|
||||||
after 5000 ->
|
|
||||||
?assert(false, "Did not receive message within 5 second")
|
|
||||||
end.
|
|
|
@ -1,5 +1,5 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqx_bridge_rabbitmq_connector_SUITE).
|
-module(emqx_bridge_rabbitmq_connector_SUITE).
|
||||||
|
@ -12,6 +12,18 @@
|
||||||
-include_lib("stdlib/include/assert.hrl").
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
-include_lib("amqp_client/include/amqp_client.hrl").
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||||
|
-import(emqx_bridge_rabbitmq_test_utils, [
|
||||||
|
rabbit_mq_exchange/0,
|
||||||
|
rabbit_mq_routing_key/0,
|
||||||
|
rabbit_mq_queue/0,
|
||||||
|
rabbit_mq_host/0,
|
||||||
|
rabbit_mq_port/0,
|
||||||
|
get_rabbitmq/1,
|
||||||
|
ssl_options/1,
|
||||||
|
get_channel_connection/1,
|
||||||
|
parse_and_check/4,
|
||||||
|
receive_message_from_rabbitmq/1
|
||||||
|
]).
|
||||||
|
|
||||||
%% This test SUITE requires a running RabbitMQ instance. If you don't want to
|
%% This test SUITE requires a running RabbitMQ instance. If you don't want to
|
||||||
%% bring up the whole CI infrastructure with the `scripts/ct/run.sh` script
|
%% bring up the whole CI infrastructure with the `scripts/ct/run.sh` script
|
||||||
|
@ -21,99 +33,24 @@
|
||||||
%%
|
%%
|
||||||
%% docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3.11-management
|
%% docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3.11-management
|
||||||
|
|
||||||
rabbit_mq_host() ->
|
|
||||||
list_to_binary(os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq")).
|
|
||||||
|
|
||||||
rabbit_mq_port() ->
|
|
||||||
list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")).
|
|
||||||
|
|
||||||
rabbit_mq_password() ->
|
|
||||||
<<"guest">>.
|
|
||||||
|
|
||||||
rabbit_mq_exchange() ->
|
|
||||||
<<"test_exchange">>.
|
|
||||||
|
|
||||||
rabbit_mq_queue() ->
|
|
||||||
<<"test_queue">>.
|
|
||||||
|
|
||||||
rabbit_mq_routing_key() ->
|
|
||||||
<<"test_routing_key">>.
|
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
[
|
||||||
|
{group, tcp},
|
||||||
|
{group, tls}
|
||||||
|
].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
groups() ->
|
||||||
Host = rabbit_mq_host(),
|
AllTCs = emqx_common_test_helpers:all(?MODULE),
|
||||||
Port = rabbit_mq_port(),
|
[
|
||||||
ct:pal("rabbitmq:~p~n", [{Host, Port}]),
|
{tcp, AllTCs},
|
||||||
case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
|
{tls, AllTCs}
|
||||||
true ->
|
].
|
||||||
Apps = emqx_cth_suite:start(
|
|
||||||
[emqx_conf, emqx_connector, emqx_bridge_rabbitmq],
|
|
||||||
#{work_dir => emqx_cth_suite:work_dir(Config)}
|
|
||||||
),
|
|
||||||
ChannelConnection = setup_rabbit_mq_exchange_and_queue(Host, Port),
|
|
||||||
[{channel_connection, ChannelConnection}, {suite_apps, Apps} | Config];
|
|
||||||
false ->
|
|
||||||
case os:getenv("IS_CI") of
|
|
||||||
"yes" ->
|
|
||||||
throw(no_rabbitmq);
|
|
||||||
_ ->
|
|
||||||
{skip, no_rabbitmq}
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
setup_rabbit_mq_exchange_and_queue(Host, Port) ->
|
init_per_group(Group, Config) ->
|
||||||
%% Create an exchange and a queue
|
emqx_bridge_rabbitmq_test_utils:init_per_group(Group, Config).
|
||||||
{ok, Connection} =
|
|
||||||
amqp_connection:start(#amqp_params_network{
|
|
||||||
host = binary_to_list(Host),
|
|
||||||
port = Port
|
|
||||||
}),
|
|
||||||
{ok, Channel} = amqp_connection:open_channel(Connection),
|
|
||||||
%% Create an exchange
|
|
||||||
#'exchange.declare_ok'{} =
|
|
||||||
amqp_channel:call(
|
|
||||||
Channel,
|
|
||||||
#'exchange.declare'{
|
|
||||||
exchange = rabbit_mq_exchange(),
|
|
||||||
type = <<"topic">>
|
|
||||||
}
|
|
||||||
),
|
|
||||||
%% Create a queue
|
|
||||||
#'queue.declare_ok'{} =
|
|
||||||
amqp_channel:call(
|
|
||||||
Channel,
|
|
||||||
#'queue.declare'{queue = rabbit_mq_queue()}
|
|
||||||
),
|
|
||||||
%% Bind the queue to the exchange
|
|
||||||
#'queue.bind_ok'{} =
|
|
||||||
amqp_channel:call(
|
|
||||||
Channel,
|
|
||||||
#'queue.bind'{
|
|
||||||
queue = rabbit_mq_queue(),
|
|
||||||
exchange = rabbit_mq_exchange(),
|
|
||||||
routing_key = rabbit_mq_routing_key()
|
|
||||||
}
|
|
||||||
),
|
|
||||||
#{
|
|
||||||
connection => Connection,
|
|
||||||
channel => Channel
|
|
||||||
}.
|
|
||||||
|
|
||||||
get_channel_connection(Config) ->
|
end_per_group(Group, Config) ->
|
||||||
proplists:get_value(channel_connection, Config).
|
emqx_bridge_rabbitmq_test_utils:end_per_group(Group, Config).
|
||||||
|
|
||||||
end_per_suite(Config) ->
|
|
||||||
#{
|
|
||||||
connection := Connection,
|
|
||||||
channel := Channel
|
|
||||||
} = get_channel_connection(Config),
|
|
||||||
%% Close the channel
|
|
||||||
ok = amqp_channel:close(Channel),
|
|
||||||
%% Close the connection
|
|
||||||
ok = amqp_connection:close(Connection),
|
|
||||||
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
|
|
||||||
|
|
||||||
% %%------------------------------------------------------------------------------
|
% %%------------------------------------------------------------------------------
|
||||||
% %% Testcases
|
% %% Testcases
|
||||||
|
@ -143,7 +80,6 @@ t_start_passfile(Config) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) ->
|
perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) ->
|
||||||
#{channel := Channel} = get_channel_connection(TestConfig),
|
|
||||||
CheckedConfig = check_config(InitialConfig),
|
CheckedConfig = check_config(InitialConfig),
|
||||||
#{
|
#{
|
||||||
id := PoolName,
|
id := PoolName,
|
||||||
|
@ -159,7 +95,7 @@ perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) ->
|
||||||
emqx_resource:get_instance(ResourceID),
|
emqx_resource:get_instance(ResourceID),
|
||||||
?assertEqual({ok, connected}, emqx_resource:health_check(ResourceID)),
|
?assertEqual({ok, connected}, emqx_resource:health_check(ResourceID)),
|
||||||
%% Perform query as further check that the resource is working as expected
|
%% Perform query as further check that the resource is working as expected
|
||||||
perform_query(ResourceID, Channel),
|
perform_query(ResourceID, TestConfig),
|
||||||
?assertEqual(ok, emqx_resource:stop(ResourceID)),
|
?assertEqual(ok, emqx_resource:stop(ResourceID)),
|
||||||
%% Resource will be listed still, but state will be changed and healthcheck will fail
|
%% Resource will be listed still, but state will be changed and healthcheck will fail
|
||||||
%% as the worker no longer exists.
|
%% as the worker no longer exists.
|
||||||
|
@ -181,7 +117,7 @@ perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) ->
|
||||||
emqx_resource:get_instance(ResourceID),
|
emqx_resource:get_instance(ResourceID),
|
||||||
?assertEqual({ok, connected}, emqx_resource:health_check(ResourceID)),
|
?assertEqual({ok, connected}, emqx_resource:health_check(ResourceID)),
|
||||||
%% Check that everything is working again by performing a query
|
%% Check that everything is working again by performing a query
|
||||||
perform_query(ResourceID, Channel),
|
perform_query(ResourceID, TestConfig),
|
||||||
% Stop and remove the resource in one go.
|
% Stop and remove the resource in one go.
|
||||||
?assertEqual(ok, emqx_resource:remove_local(ResourceID)),
|
?assertEqual(ok, emqx_resource:remove_local(ResourceID)),
|
||||||
?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)),
|
?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)),
|
||||||
|
@ -214,37 +150,12 @@ perform_query(PoolName, Channel) ->
|
||||||
?assertEqual(ok, emqx_resource_manager:add_channel(PoolName, ChannelId, ActionConfig)),
|
?assertEqual(ok, emqx_resource_manager:add_channel(PoolName, ChannelId, ActionConfig)),
|
||||||
ok = emqx_resource:query(PoolName, {ChannelId, payload()}),
|
ok = emqx_resource:query(PoolName, {ChannelId, payload()}),
|
||||||
%% Get the message from queue:
|
%% Get the message from queue:
|
||||||
ok = receive_simple_test_message(Channel),
|
SendData = test_data(),
|
||||||
|
RecvData = receive_message_from_rabbitmq(Channel),
|
||||||
|
?assertMatch(SendData, RecvData),
|
||||||
?assertEqual(ok, emqx_resource_manager:remove_channel(PoolName, ChannelId)),
|
?assertEqual(ok, emqx_resource_manager:remove_channel(PoolName, ChannelId)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
receive_simple_test_message(Channel) ->
|
|
||||||
#'basic.consume_ok'{consumer_tag = ConsumerTag} =
|
|
||||||
amqp_channel:call(
|
|
||||||
Channel,
|
|
||||||
#'basic.consume'{
|
|
||||||
queue = rabbit_mq_queue()
|
|
||||||
}
|
|
||||||
),
|
|
||||||
receive
|
|
||||||
%% This is the first message received
|
|
||||||
#'basic.consume_ok'{} ->
|
|
||||||
ok
|
|
||||||
end,
|
|
||||||
receive
|
|
||||||
{#'basic.deliver'{delivery_tag = DeliveryTag}, Content} ->
|
|
||||||
Expected = test_data(),
|
|
||||||
?assertEqual(Expected, emqx_utils_json:decode(Content#amqp_msg.payload)),
|
|
||||||
%% Ack the message
|
|
||||||
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
|
|
||||||
%% Cancel the consumer
|
|
||||||
#'basic.cancel_ok'{consumer_tag = ConsumerTag} =
|
|
||||||
amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}),
|
|
||||||
ok
|
|
||||||
after 5000 ->
|
|
||||||
?assert(false, "Did not receive message within 5 second")
|
|
||||||
end.
|
|
||||||
|
|
||||||
rabbitmq_config() ->
|
rabbitmq_config() ->
|
||||||
rabbitmq_config(#{}).
|
rabbitmq_config(#{}).
|
||||||
|
|
||||||
|
@ -278,3 +189,6 @@ rabbitmq_action_config() ->
|
||||||
wait_for_publish_confirmations => true
|
wait_for_publish_confirmations => true
|
||||||
}
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
rabbit_mq_password() ->
|
||||||
|
<<"guest">>.
|
||||||
|
|
|
@ -0,0 +1,203 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_bridge_rabbitmq_test_utils).
|
||||||
|
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||||
|
|
||||||
|
init_per_group(tcp, Config) ->
|
||||||
|
RabbitMQHost = os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq"),
|
||||||
|
RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")),
|
||||||
|
case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of
|
||||||
|
true ->
|
||||||
|
Config1 = common_init_per_group(#{
|
||||||
|
host => RabbitMQHost, port => RabbitMQPort, tls => false
|
||||||
|
}),
|
||||||
|
Config1 ++ Config;
|
||||||
|
false ->
|
||||||
|
case os:getenv("IS_CI") of
|
||||||
|
"yes" ->
|
||||||
|
throw(no_rabbitmq);
|
||||||
|
_ ->
|
||||||
|
{skip, no_rabbitmq}
|
||||||
|
end
|
||||||
|
end;
|
||||||
|
init_per_group(tls, Config) ->
|
||||||
|
RabbitMQHost = os:getenv("RABBITMQ_TLS_HOST", "rabbitmq"),
|
||||||
|
RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_TLS_PORT", "5671")),
|
||||||
|
case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of
|
||||||
|
true ->
|
||||||
|
Config1 = common_init_per_group(#{
|
||||||
|
host => RabbitMQHost, port => RabbitMQPort, tls => true
|
||||||
|
}),
|
||||||
|
Config1 ++ Config;
|
||||||
|
false ->
|
||||||
|
case os:getenv("IS_CI") of
|
||||||
|
"yes" ->
|
||||||
|
throw(no_rabbitmq);
|
||||||
|
_ ->
|
||||||
|
{skip, no_rabbitmq}
|
||||||
|
end
|
||||||
|
end;
|
||||||
|
init_per_group(_Group, Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
|
common_init_per_group(Opts) ->
|
||||||
|
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
|
||||||
|
ok = emqx_common_test_helpers:start_apps([
|
||||||
|
emqx_conf, emqx_bridge, emqx_bridge_rabbitmq, emqx_rule_engine
|
||||||
|
]),
|
||||||
|
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
|
||||||
|
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||||
|
{ok, _} = application:ensure_all_started(amqp_client),
|
||||||
|
emqx_mgmt_api_test_util:init_suite(),
|
||||||
|
#{host := Host, port := Port, tls := UseTLS} = Opts,
|
||||||
|
ChannelConnection = setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS),
|
||||||
|
[
|
||||||
|
{channel_connection, ChannelConnection},
|
||||||
|
{rabbitmq, #{server => Host, port => Port, tls => UseTLS}}
|
||||||
|
].
|
||||||
|
|
||||||
|
setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS) ->
|
||||||
|
SSLOptions =
|
||||||
|
case UseTLS of
|
||||||
|
false -> none;
|
||||||
|
true -> emqx_tls_lib:to_client_opts(ssl_options(UseTLS))
|
||||||
|
end,
|
||||||
|
%% Create an exchange and a queue
|
||||||
|
{ok, Connection} =
|
||||||
|
amqp_connection:start(#amqp_params_network{
|
||||||
|
host = Host,
|
||||||
|
port = Port,
|
||||||
|
ssl_options = SSLOptions
|
||||||
|
}),
|
||||||
|
{ok, Channel} = amqp_connection:open_channel(Connection),
|
||||||
|
%% Create an exchange
|
||||||
|
#'exchange.declare_ok'{} =
|
||||||
|
amqp_channel:call(
|
||||||
|
Channel,
|
||||||
|
#'exchange.declare'{
|
||||||
|
exchange = rabbit_mq_exchange(),
|
||||||
|
type = <<"topic">>
|
||||||
|
}
|
||||||
|
),
|
||||||
|
%% Create a queue
|
||||||
|
#'queue.declare_ok'{} =
|
||||||
|
amqp_channel:call(
|
||||||
|
Channel,
|
||||||
|
#'queue.declare'{queue = rabbit_mq_queue()}
|
||||||
|
),
|
||||||
|
%% Bind the queue to the exchange
|
||||||
|
#'queue.bind_ok'{} =
|
||||||
|
amqp_channel:call(
|
||||||
|
Channel,
|
||||||
|
#'queue.bind'{
|
||||||
|
queue = rabbit_mq_queue(),
|
||||||
|
exchange = rabbit_mq_exchange(),
|
||||||
|
routing_key = rabbit_mq_routing_key()
|
||||||
|
}
|
||||||
|
),
|
||||||
|
#{
|
||||||
|
connection => Connection,
|
||||||
|
channel => Channel
|
||||||
|
}.
|
||||||
|
|
||||||
|
end_per_group(_Group, Config) ->
|
||||||
|
#{
|
||||||
|
connection := Connection,
|
||||||
|
channel := Channel
|
||||||
|
} = get_channel_connection(Config),
|
||||||
|
amqp_channel:call(Channel, #'queue.purge'{queue = rabbit_mq_queue()}),
|
||||||
|
emqx_mgmt_api_test_util:end_suite(),
|
||||||
|
ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_bridge_rabbitmq, emqx_rule_engine]),
|
||||||
|
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
|
||||||
|
_ = application:stop(emqx_connector),
|
||||||
|
_ = application:stop(emqx_bridge),
|
||||||
|
%% Close the channel
|
||||||
|
ok = amqp_channel:close(Channel),
|
||||||
|
%% Close the connection
|
||||||
|
ok = amqp_connection:close(Connection).
|
||||||
|
|
||||||
|
rabbit_mq_host() ->
|
||||||
|
list_to_binary(os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq")).
|
||||||
|
|
||||||
|
rabbit_mq_port() ->
|
||||||
|
list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")).
|
||||||
|
|
||||||
|
rabbit_mq_exchange() ->
|
||||||
|
<<"messages">>.
|
||||||
|
|
||||||
|
rabbit_mq_queue() ->
|
||||||
|
<<"test_queue">>.
|
||||||
|
|
||||||
|
rabbit_mq_routing_key() ->
|
||||||
|
<<"test_routing_key">>.
|
||||||
|
|
||||||
|
get_rabbitmq(Config) ->
|
||||||
|
proplists:get_value(rabbitmq, Config).
|
||||||
|
|
||||||
|
get_channel_connection(Config) ->
|
||||||
|
proplists:get_value(channel_connection, Config).
|
||||||
|
|
||||||
|
ssl_options(true) ->
|
||||||
|
CertsDir = filename:join([
|
||||||
|
emqx_common_test_helpers:proj_root(),
|
||||||
|
".ci",
|
||||||
|
"docker-compose-file",
|
||||||
|
"certs"
|
||||||
|
]),
|
||||||
|
#{
|
||||||
|
enable => true,
|
||||||
|
cacertfile => filename:join([CertsDir, "ca.crt"]),
|
||||||
|
certfile => filename:join([CertsDir, "client.pem"]),
|
||||||
|
keyfile => filename:join([CertsDir, "client.key"])
|
||||||
|
};
|
||||||
|
ssl_options(false) ->
|
||||||
|
#{
|
||||||
|
enable => false
|
||||||
|
}.
|
||||||
|
|
||||||
|
parse_and_check(Key, Mod, Conf, Name) ->
|
||||||
|
ConfStr = hocon_pp:do(Conf, #{}),
|
||||||
|
ct:pal(ConfStr),
|
||||||
|
{ok, RawConf} = hocon:binary(ConfStr, #{format => map}),
|
||||||
|
hocon_tconf:check_plain(Mod, RawConf, #{required => false, atom_key => false}),
|
||||||
|
#{Key := #{<<"rabbitmq">> := #{Name := RetConf}}} = RawConf,
|
||||||
|
RetConf.
|
||||||
|
|
||||||
|
receive_message_from_rabbitmq(Config) ->
|
||||||
|
#{channel := Channel} = get_channel_connection(Config),
|
||||||
|
#'basic.consume_ok'{consumer_tag = ConsumerTag} =
|
||||||
|
amqp_channel:call(
|
||||||
|
Channel,
|
||||||
|
#'basic.consume'{
|
||||||
|
queue = rabbit_mq_queue()
|
||||||
|
}
|
||||||
|
),
|
||||||
|
receive
|
||||||
|
%% This is the first message received
|
||||||
|
#'basic.consume_ok'{} ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
receive
|
||||||
|
{#'basic.deliver'{delivery_tag = DeliveryTag}, Content} ->
|
||||||
|
%% Ack the message
|
||||||
|
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
|
||||||
|
%% Cancel the consumer
|
||||||
|
#'basic.cancel_ok'{consumer_tag = ConsumerTag} =
|
||||||
|
amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}),
|
||||||
|
Payload = Content#amqp_msg.payload,
|
||||||
|
case emqx_utils_json:safe_decode(Payload, [return_maps]) of
|
||||||
|
{ok, Msg} -> Msg;
|
||||||
|
{error, _} -> ?assert(false, {"Failed to decode the message", Payload})
|
||||||
|
end
|
||||||
|
after 5000 ->
|
||||||
|
?assert(false, "Did not receive message within 5 second")
|
||||||
|
end.
|
|
@ -0,0 +1,221 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_bridge_rabbitmq_v1_SUITE).
|
||||||
|
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||||
|
|
||||||
|
%% See comment in
|
||||||
|
%% apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl for how to
|
||||||
|
%% run this without bringing up the whole CI infrastructure
|
||||||
|
-define(TYPE, <<"rabbitmq">>).
|
||||||
|
-import(emqx_bridge_rabbitmq_test_utils, [
|
||||||
|
rabbit_mq_exchange/0,
|
||||||
|
rabbit_mq_routing_key/0,
|
||||||
|
rabbit_mq_queue/0,
|
||||||
|
rabbit_mq_host/0,
|
||||||
|
rabbit_mq_port/0,
|
||||||
|
get_rabbitmq/1,
|
||||||
|
ssl_options/1,
|
||||||
|
get_channel_connection/1,
|
||||||
|
parse_and_check/4,
|
||||||
|
receive_message_from_rabbitmq/1
|
||||||
|
]).
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Common Test Setup, Tear down and Testcase List
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
[
|
||||||
|
{group, tcp},
|
||||||
|
{group, tls}
|
||||||
|
].
|
||||||
|
|
||||||
|
groups() ->
|
||||||
|
AllTCs = emqx_common_test_helpers:all(?MODULE),
|
||||||
|
[
|
||||||
|
{tcp, AllTCs},
|
||||||
|
{tls, AllTCs}
|
||||||
|
].
|
||||||
|
|
||||||
|
init_per_group(Group, Config) ->
|
||||||
|
emqx_bridge_rabbitmq_test_utils:init_per_group(Group, Config).
|
||||||
|
|
||||||
|
end_per_group(Group, Config) ->
|
||||||
|
emqx_bridge_rabbitmq_test_utils:end_per_group(Group, Config).
|
||||||
|
|
||||||
|
create_bridge(Name, Config) ->
|
||||||
|
BridgeConfig = rabbitmq_config(Config),
|
||||||
|
{ok, _} = emqx_bridge:create(?TYPE, Name, BridgeConfig),
|
||||||
|
emqx_bridge_resource:bridge_id(?TYPE, Name).
|
||||||
|
|
||||||
|
delete_bridge(Name) ->
|
||||||
|
ok = emqx_bridge:remove(?TYPE, Name).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Test Cases
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
t_create_delete_bridge(Config) ->
|
||||||
|
Name = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
RabbitMQ = get_rabbitmq(Config),
|
||||||
|
create_bridge(Name, RabbitMQ),
|
||||||
|
Bridges = emqx_bridge:list(),
|
||||||
|
Any = fun(#{name := BName}) -> BName =:= Name end,
|
||||||
|
?assert(lists:any(Any, Bridges), Bridges),
|
||||||
|
ok = delete_bridge(Name),
|
||||||
|
BridgesAfterDelete = emqx_bridge:list(),
|
||||||
|
?assertNot(lists:any(Any, BridgesAfterDelete), BridgesAfterDelete),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_create_delete_bridge_non_existing_server(_Config) ->
|
||||||
|
Name = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
create_bridge(Name, #{server => <<"non_existing_server">>, port => 3174}),
|
||||||
|
%% Check that the new bridge is in the list of bridges
|
||||||
|
Bridges = emqx_bridge:list(),
|
||||||
|
Any = fun(#{name := BName}) -> BName =:= Name end,
|
||||||
|
?assert(lists:any(Any, Bridges)),
|
||||||
|
ok = delete_bridge(Name),
|
||||||
|
BridgesAfterDelete = emqx_bridge:list(),
|
||||||
|
?assertNot(lists:any(Any, BridgesAfterDelete)),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_send_message_query(Config) ->
|
||||||
|
Name = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
RabbitMQ = get_rabbitmq(Config),
|
||||||
|
BridgeID = create_bridge(Name, RabbitMQ#{batch_size => 1}),
|
||||||
|
Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000},
|
||||||
|
%% This will use the SQL template included in the bridge
|
||||||
|
emqx_bridge:send_message(BridgeID, Payload),
|
||||||
|
%% Check that the data got to the database
|
||||||
|
?assertEqual(Payload, receive_message_from_rabbitmq(Config)),
|
||||||
|
ok = delete_bridge(Name),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_send_message_query_with_template(Config) ->
|
||||||
|
Name = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
RabbitMQ = get_rabbitmq(Config),
|
||||||
|
BridgeID = create_bridge(Name, RabbitMQ#{
|
||||||
|
batch_size => 1,
|
||||||
|
payload_template => payload_template()
|
||||||
|
}),
|
||||||
|
Payload = #{
|
||||||
|
<<"key">> => 7,
|
||||||
|
<<"data">> => <<"RabbitMQ">>,
|
||||||
|
<<"timestamp">> => 10000
|
||||||
|
},
|
||||||
|
emqx_bridge:send_message(BridgeID, Payload),
|
||||||
|
%% Check that the data got to the database
|
||||||
|
ExpectedResult = Payload#{
|
||||||
|
<<"secret">> => 42
|
||||||
|
},
|
||||||
|
?assertEqual(ExpectedResult, receive_message_from_rabbitmq(Config)),
|
||||||
|
ok = delete_bridge(Name),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_send_simple_batch(Config) ->
|
||||||
|
Name = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
RabbitMQ = get_rabbitmq(Config),
|
||||||
|
BridgeConf = RabbitMQ#{batch_size => 100},
|
||||||
|
BridgeID = create_bridge(Name, BridgeConf),
|
||||||
|
Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000},
|
||||||
|
emqx_bridge:send_message(BridgeID, Payload),
|
||||||
|
?assertEqual(Payload, receive_message_from_rabbitmq(Config)),
|
||||||
|
ok = delete_bridge(Name),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_send_simple_batch_with_template(Config) ->
|
||||||
|
Name = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
RabbitMQ = get_rabbitmq(Config),
|
||||||
|
BridgeConf =
|
||||||
|
RabbitMQ#{
|
||||||
|
batch_size => 100,
|
||||||
|
payload_template => payload_template()
|
||||||
|
},
|
||||||
|
BridgeID = create_bridge(Name, BridgeConf),
|
||||||
|
Payload = #{
|
||||||
|
<<"key">> => 7,
|
||||||
|
<<"data">> => <<"RabbitMQ">>,
|
||||||
|
<<"timestamp">> => 10000
|
||||||
|
},
|
||||||
|
emqx_bridge:send_message(BridgeID, Payload),
|
||||||
|
ExpectedResult = Payload#{<<"secret">> => 42},
|
||||||
|
?assertEqual(ExpectedResult, receive_message_from_rabbitmq(Config)),
|
||||||
|
ok = delete_bridge(Name),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_heavy_batching(Config) ->
|
||||||
|
Name = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
NumberOfMessages = 20000,
|
||||||
|
RabbitMQ = get_rabbitmq(Config),
|
||||||
|
BridgeConf = RabbitMQ#{
|
||||||
|
batch_size => 10173,
|
||||||
|
batch_time_ms => 50
|
||||||
|
},
|
||||||
|
BridgeID = create_bridge(Name, BridgeConf),
|
||||||
|
SendMessage = fun(Key) ->
|
||||||
|
Payload = #{<<"key">> => Key},
|
||||||
|
emqx_bridge:send_message(BridgeID, Payload)
|
||||||
|
end,
|
||||||
|
[SendMessage(Key) || Key <- lists:seq(1, NumberOfMessages)],
|
||||||
|
AllMessages = lists:foldl(
|
||||||
|
fun(_, Acc) ->
|
||||||
|
Message = receive_message_from_rabbitmq(Config),
|
||||||
|
#{<<"key">> := Key} = Message,
|
||||||
|
Acc#{Key => true}
|
||||||
|
end,
|
||||||
|
#{},
|
||||||
|
lists:seq(1, NumberOfMessages)
|
||||||
|
),
|
||||||
|
?assertEqual(NumberOfMessages, maps:size(AllMessages)),
|
||||||
|
ok = delete_bridge(Name),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
rabbitmq_config(Config) ->
|
||||||
|
UseTLS = maps:get(tls, Config, false),
|
||||||
|
BatchSize = maps:get(batch_size, Config, 1),
|
||||||
|
BatchTime = maps:get(batch_time_ms, Config, 0),
|
||||||
|
Name = atom_to_binary(?MODULE),
|
||||||
|
Server = maps:get(server, Config, rabbit_mq_host()),
|
||||||
|
Port = maps:get(port, Config, rabbit_mq_port()),
|
||||||
|
Template = maps:get(payload_template, Config, <<"">>),
|
||||||
|
Bridge =
|
||||||
|
#{
|
||||||
|
<<"bridges">> => #{
|
||||||
|
<<"rabbitmq">> => #{
|
||||||
|
Name => #{
|
||||||
|
<<"enable">> => true,
|
||||||
|
<<"ssl">> => ssl_options(UseTLS),
|
||||||
|
<<"server">> => Server,
|
||||||
|
<<"port">> => Port,
|
||||||
|
<<"username">> => <<"guest">>,
|
||||||
|
<<"password">> => <<"guest">>,
|
||||||
|
<<"routing_key">> => rabbit_mq_routing_key(),
|
||||||
|
<<"exchange">> => rabbit_mq_exchange(),
|
||||||
|
<<"payload_template">> => Template,
|
||||||
|
<<"resource_opts">> => #{
|
||||||
|
<<"batch_size">> => BatchSize,
|
||||||
|
<<"batch_time">> => BatchTime
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
parse_and_check(<<"bridges">>, emqx_bridge_schema, Bridge, Name).
|
||||||
|
|
||||||
|
payload_template() ->
|
||||||
|
<<
|
||||||
|
"{"
|
||||||
|
" \"key\": ${key},"
|
||||||
|
" \"data\": \"${data}\","
|
||||||
|
" \"timestamp\": ${timestamp},"
|
||||||
|
" \"secret\": 42"
|
||||||
|
"}"
|
||||||
|
>>.
|
|
@ -0,0 +1,261 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_bridge_rabbitmq_v2_SUITE).
|
||||||
|
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||||
|
|
||||||
|
-import(emqx_bridge_rabbitmq_test_utils, [
|
||||||
|
rabbit_mq_exchange/0,
|
||||||
|
rabbit_mq_routing_key/0,
|
||||||
|
rabbit_mq_queue/0,
|
||||||
|
rabbit_mq_host/0,
|
||||||
|
rabbit_mq_port/0,
|
||||||
|
get_rabbitmq/1,
|
||||||
|
get_tls/1,
|
||||||
|
ssl_options/1,
|
||||||
|
get_channel_connection/1,
|
||||||
|
parse_and_check/4,
|
||||||
|
receive_message_from_rabbitmq/1
|
||||||
|
]).
|
||||||
|
-import(emqx_common_test_helpers, [on_exit/1]).
|
||||||
|
|
||||||
|
-define(TYPE, <<"rabbitmq">>).
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
[
|
||||||
|
{group, tcp},
|
||||||
|
{group, tls}
|
||||||
|
].
|
||||||
|
|
||||||
|
groups() ->
|
||||||
|
AllTCs = emqx_common_test_helpers:all(?MODULE),
|
||||||
|
[
|
||||||
|
{tcp, AllTCs},
|
||||||
|
{tls, AllTCs}
|
||||||
|
].
|
||||||
|
|
||||||
|
init_per_group(Group, Config) ->
|
||||||
|
Config1 = emqx_bridge_rabbitmq_test_utils:init_per_group(Group, Config),
|
||||||
|
Name = atom_to_binary(?MODULE),
|
||||||
|
create_connector(Name, get_rabbitmq(Config1)),
|
||||||
|
Config1.
|
||||||
|
|
||||||
|
end_per_group(Group, Config) ->
|
||||||
|
Name = atom_to_binary(?MODULE),
|
||||||
|
delete_connector(Name),
|
||||||
|
emqx_bridge_rabbitmq_test_utils:end_per_group(Group, Config).
|
||||||
|
|
||||||
|
rabbitmq_connector(Config) ->
|
||||||
|
UseTLS = maps:get(tls, Config, false),
|
||||||
|
Name = atom_to_binary(?MODULE),
|
||||||
|
Server = maps:get(server, Config, rabbit_mq_host()),
|
||||||
|
Port = maps:get(port, Config, rabbit_mq_port()),
|
||||||
|
Connector = #{
|
||||||
|
<<"connectors">> => #{
|
||||||
|
<<"rabbitmq">> => #{
|
||||||
|
Name => #{
|
||||||
|
<<"enable">> => true,
|
||||||
|
<<"ssl">> => ssl_options(UseTLS),
|
||||||
|
<<"server">> => Server,
|
||||||
|
<<"port">> => Port,
|
||||||
|
<<"username">> => <<"guest">>,
|
||||||
|
<<"password">> => <<"guest">>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
parse_and_check(<<"connectors">>, emqx_connector_schema, Connector, Name).
|
||||||
|
|
||||||
|
rabbitmq_source() ->
|
||||||
|
Name = atom_to_binary(?MODULE),
|
||||||
|
Source = #{
|
||||||
|
<<"sources">> => #{
|
||||||
|
<<"rabbitmq">> => #{
|
||||||
|
Name => #{
|
||||||
|
<<"enable">> => true,
|
||||||
|
<<"connector">> => Name,
|
||||||
|
<<"parameters">> => #{
|
||||||
|
<<"no_ack">> => true,
|
||||||
|
<<"queue">> => rabbit_mq_queue(),
|
||||||
|
<<"wait_for_publish_confirmations">> => true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
parse_and_check(<<"sources">>, emqx_bridge_v2_schema, Source, Name).
|
||||||
|
|
||||||
|
rabbitmq_action() ->
|
||||||
|
Name = atom_to_binary(?MODULE),
|
||||||
|
Action = #{
|
||||||
|
<<"actions">> => #{
|
||||||
|
<<"rabbitmq">> => #{
|
||||||
|
Name => #{
|
||||||
|
<<"connector">> => Name,
|
||||||
|
<<"enable">> => true,
|
||||||
|
<<"parameters">> => #{
|
||||||
|
<<"exchange">> => rabbit_mq_exchange(),
|
||||||
|
<<"payload_template">> => <<"${.payload}">>,
|
||||||
|
<<"routing_key">> => rabbit_mq_routing_key(),
|
||||||
|
<<"delivery_mode">> => <<"non_persistent">>,
|
||||||
|
<<"publish_confirmation_timeout">> => <<"30s">>,
|
||||||
|
<<"wait_for_publish_confirmations">> => true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
parse_and_check(<<"actions">>, emqx_bridge_v2_schema, Action, Name).
|
||||||
|
|
||||||
|
create_connector(Name, Config) ->
|
||||||
|
Connector = rabbitmq_connector(Config),
|
||||||
|
{ok, _} = emqx_connector:create(?TYPE, Name, Connector).
|
||||||
|
|
||||||
|
delete_connector(Name) ->
|
||||||
|
ok = emqx_connector:remove(?TYPE, Name).
|
||||||
|
|
||||||
|
create_source(Name) ->
|
||||||
|
Source = rabbitmq_source(),
|
||||||
|
{ok, _} = emqx_bridge_v2:create(sources, ?TYPE, Name, Source).
|
||||||
|
|
||||||
|
delete_source(Name) ->
|
||||||
|
ok = emqx_bridge_v2:remove(sources, ?TYPE, Name).
|
||||||
|
|
||||||
|
create_action(Name) ->
|
||||||
|
Action = rabbitmq_action(),
|
||||||
|
{ok, _} = emqx_bridge_v2:create(actions, ?TYPE, Name, Action).
|
||||||
|
|
||||||
|
delete_action(Name) ->
|
||||||
|
ok = emqx_bridge_v2:remove(actions, ?TYPE, Name).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Test Cases
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
t_source(Config) ->
|
||||||
|
Name = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
create_source(Name),
|
||||||
|
Sources = emqx_bridge_v2:list(sources),
|
||||||
|
Any = fun(#{name := BName}) -> BName =:= Name end,
|
||||||
|
?assert(lists:any(Any, Sources), Sources),
|
||||||
|
Topic = <<"tesldkafd">>,
|
||||||
|
{ok, #{id := RuleId}} = emqx_rule_engine:create_rule(
|
||||||
|
#{
|
||||||
|
sql => <<"select * from \"$bridges/rabbitmq:", Name/binary, "\"">>,
|
||||||
|
id => atom_to_binary(?FUNCTION_NAME),
|
||||||
|
actions => [
|
||||||
|
#{
|
||||||
|
args => #{
|
||||||
|
topic => Topic,
|
||||||
|
mqtt_properties => #{},
|
||||||
|
payload => <<"${payload}">>,
|
||||||
|
qos => 0,
|
||||||
|
retain => false,
|
||||||
|
user_properties => []
|
||||||
|
},
|
||||||
|
function => republish
|
||||||
|
}
|
||||||
|
],
|
||||||
|
description => <<"bridge_v2 republish rule">>
|
||||||
|
}
|
||||||
|
),
|
||||||
|
on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end),
|
||||||
|
{ok, C1} = emqtt:start_link([{clean_start, true}]),
|
||||||
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
{ok, #{}, [0]} = emqtt:subscribe(C1, Topic, [{qos, 0}, {rh, 0}]),
|
||||||
|
send_test_message_to_rabbitmq(Config),
|
||||||
|
PayloadBin = emqx_utils_json:encode(payload()),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
dup := false,
|
||||||
|
properties := undefined,
|
||||||
|
topic := Topic,
|
||||||
|
qos := 0,
|
||||||
|
payload := PayloadBin,
|
||||||
|
retain := false
|
||||||
|
}
|
||||||
|
],
|
||||||
|
receive_messages(1)
|
||||||
|
),
|
||||||
|
ok = emqtt:disconnect(C1),
|
||||||
|
ok = delete_source(Name),
|
||||||
|
SourcesAfterDelete = emqx_bridge_v2:list(sources),
|
||||||
|
?assertNot(lists:any(Any, SourcesAfterDelete), SourcesAfterDelete),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_action(Config) ->
|
||||||
|
Name = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
create_action(Name),
|
||||||
|
Actions = emqx_bridge_v2:list(actions),
|
||||||
|
Any = fun(#{name := BName}) -> BName =:= Name end,
|
||||||
|
?assert(lists:any(Any, Actions), Actions),
|
||||||
|
Topic = <<"lkadfdaction">>,
|
||||||
|
{ok, #{id := RuleId}} = emqx_rule_engine:create_rule(
|
||||||
|
#{
|
||||||
|
sql => <<"select * from \"", Topic/binary, "\"">>,
|
||||||
|
id => atom_to_binary(?FUNCTION_NAME),
|
||||||
|
actions => [<<"rabbitmq:", Name/binary>>],
|
||||||
|
description => <<"bridge_v2 send msg to rabbitmq action">>
|
||||||
|
}
|
||||||
|
),
|
||||||
|
on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end),
|
||||||
|
{ok, C1} = emqtt:start_link([{clean_start, true}]),
|
||||||
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
Payload = payload(),
|
||||||
|
PayloadBin = emqx_utils_json:encode(Payload),
|
||||||
|
{ok, _} = emqtt:publish(C1, Topic, #{}, PayloadBin, [{qos, 1}, {retain, false}]),
|
||||||
|
Msg = receive_message_from_rabbitmq(Config),
|
||||||
|
?assertMatch(Payload, Msg),
|
||||||
|
ok = emqtt:disconnect(C1),
|
||||||
|
ok = delete_action(Name),
|
||||||
|
ActionsAfterDelete = emqx_bridge_v2:list(actions),
|
||||||
|
?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
receive_messages(Count) ->
|
||||||
|
receive_messages(Count, []).
|
||||||
|
receive_messages(0, Msgs) ->
|
||||||
|
Msgs;
|
||||||
|
receive_messages(Count, Msgs) ->
|
||||||
|
receive
|
||||||
|
{publish, Msg} ->
|
||||||
|
ct:log("Msg: ~p ~n", [Msg]),
|
||||||
|
receive_messages(Count - 1, [Msg | Msgs]);
|
||||||
|
Other ->
|
||||||
|
ct:log("Other Msg: ~p~n", [Other]),
|
||||||
|
receive_messages(Count, Msgs)
|
||||||
|
after 2000 ->
|
||||||
|
Msgs
|
||||||
|
end.
|
||||||
|
|
||||||
|
payload() ->
|
||||||
|
#{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000}.
|
||||||
|
|
||||||
|
send_test_message_to_rabbitmq(Config) ->
|
||||||
|
#{channel := Channel} = get_channel_connection(Config),
|
||||||
|
MessageProperties = #'P_basic'{
|
||||||
|
headers = [],
|
||||||
|
delivery_mode = 1
|
||||||
|
},
|
||||||
|
Method = #'basic.publish'{
|
||||||
|
exchange = rabbit_mq_exchange(),
|
||||||
|
routing_key = rabbit_mq_routing_key()
|
||||||
|
},
|
||||||
|
amqp_channel:cast(
|
||||||
|
Channel,
|
||||||
|
Method,
|
||||||
|
#amqp_msg{
|
||||||
|
payload = emqx_utils_json:encode(payload()),
|
||||||
|
props = MessageProperties
|
||||||
|
}
|
||||||
|
),
|
||||||
|
ok.
|
|
@ -21,21 +21,6 @@ source_parameters.desc:
|
||||||
source_parameters.label:
|
source_parameters.label:
|
||||||
"""Source Parameters"""
|
"""Source Parameters"""
|
||||||
|
|
||||||
source_topic.desc:
|
|
||||||
"""Topic used for constructing MQTT messages, supporting templates."""
|
|
||||||
source_topic.label:
|
|
||||||
"""Source Topic"""
|
|
||||||
|
|
||||||
source_qos.desc:
|
|
||||||
"""The QoS level of the MQTT message, supporting templates."""
|
|
||||||
source_qos.label:
|
|
||||||
"""QoS"""
|
|
||||||
|
|
||||||
source_payload_template.desc:
|
|
||||||
"""The template used to construct the payload of the MQTT message."""
|
|
||||||
source_payload_template.label:
|
|
||||||
"""Source Payload Template"""
|
|
||||||
|
|
||||||
source_queue.desc:
|
source_queue.desc:
|
||||||
"""The queue name of the RabbitMQ broker."""
|
"""The queue name of the RabbitMQ broker."""
|
||||||
source_queue.label:
|
source_queue.label:
|
||||||
|
|
Loading…
Reference in New Issue