%%-------------------------------------------------------------------- % Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_bridge_rocketmq_SUITE). -compile(nowarn_export_all). -compile(export_all). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). % Bridge defaults -define(TOPIC, "TopicTest"). -define(DENY_TOPIC, "DENY_TOPIC"). -define(ACCESS_KEY, "RocketMQ"). -define(SECRET_KEY, "12345678"). -define(BATCH_SIZE, 10). -define(PAYLOAD, <<"HELLO">>). -define(GET_CONFIG(KEY__, CFG__), proplists:get_value(KEY__, CFG__)). %%------------------------------------------------------------------------------ %% CT boilerplate %%------------------------------------------------------------------------------ all() -> [ {group, async}, {group, sync}, {group, acl} ]. groups() -> TCs = emqx_common_test_helpers:all(?MODULE) -- [t_acl_deny], BatchingGroups = [{group, with_batch}, {group, without_batch}], [ {async, BatchingGroups}, {sync, BatchingGroups}, {with_batch, TCs}, {without_batch, TCs}, {acl, [t_acl_deny]} ]. init_per_group(async, Config) -> [{query_mode, async} | Config]; init_per_group(sync, Config) -> [{query_mode, sync} | Config]; init_per_group(with_batch, Config0) -> Config = [{batch_size, ?BATCH_SIZE} | Config0], common_init(Config); init_per_group(without_batch, Config0) -> Config = [{batch_size, 1} | Config0], common_init(Config); init_per_group(acl, Config0) -> Config = [{batch_size, 1}, {query_mode, sync} | Config0], common_init(Config); init_per_group(_Group, Config) -> Config. end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch -> ProxyHost = ?config(proxy_host, Config), ProxyPort = ?config(proxy_port, Config), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), ok; end_per_group(_Group, _Config) -> ok. init_per_suite(Config) -> Config. end_per_suite(_Config) -> emqx_mgmt_api_test_util:end_suite(), ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]), ok. init_per_testcase(_Testcase, Config) -> delete_bridge(Config), Config. end_per_testcase(_Testcase, Config) -> ProxyHost = ?config(proxy_host, Config), ProxyPort = ?config(proxy_port, Config), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), ok = snabbkaffe:stop(), delete_bridge(Config), ok. %%------------------------------------------------------------------------------ %% Helper fns %%------------------------------------------------------------------------------ common_init(ConfigT) -> BridgeType = <<"rocketmq">>, Host = os:getenv("ROCKETMQ_HOST", "toxiproxy"), Port = list_to_integer(os:getenv("ROCKETMQ_PORT", "9876")), Config0 = [ {host, Host}, {port, Port}, {proxy_name, "rocketmq"} | ConfigT ], case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of true -> % Setup toxiproxy ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), % Ensure enterprise bridge module is loaded ok = emqx_common_test_helpers:start_apps([ emqx_conf, emqx_resource, emqx_bridge, rocketmq ]), _ = emqx_bridge_enterprise:module_info(), emqx_mgmt_api_test_util:init_suite(), {Name, RocketMQConf} = rocketmq_config(BridgeType, Config0), RocketMQSSLConf = RocketMQConf#{ <<"servers">> => <<"rocketmq_namesrv_ssl:9876">>, <<"ssl">> => #{ <<"enable">> => true, <<"verify">> => verify_none } }, Config = [ {rocketmq_config, RocketMQConf}, {rocketmq_config_ssl, RocketMQSSLConf}, {rocketmq_bridge_type, BridgeType}, {rocketmq_name, Name}, {proxy_host, ProxyHost}, {proxy_port, ProxyPort} | Config0 ], Config; false -> case os:getenv("IS_CI") of false -> {skip, no_rocketmq}; _ -> throw(no_rocketmq) end end. rocketmq_config(BridgeType, Config) -> Port = integer_to_list(?GET_CONFIG(port, Config)), Server = ?GET_CONFIG(host, Config) ++ ":" ++ Port, Name = atom_to_binary(?MODULE), BatchSize = ?config(batch_size, Config), QueryMode = ?config(query_mode, Config), ConfigString = io_lib:format( "bridges.~s.~s {\n" " enable = true\n" " servers = ~p\n" " access_key = ~p\n" " secret_key = ~p\n" " topic = ~p\n" " resource_opts = {\n" " request_ttl = 1500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " }\n" "}", [ BridgeType, Name, Server, ?ACCESS_KEY, ?SECRET_KEY, ?TOPIC, BatchSize, QueryMode ] ), {Name, parse_and_check(ConfigString, BridgeType, Name)}. parse_and_check(ConfigString, BridgeType, Name) -> {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf, Config. create_bridge(Config) -> BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), Name = ?GET_CONFIG(rocketmq_name, Config), RocketMQConf = ?GET_CONFIG(rocketmq_config, Config), emqx_bridge:create(BridgeType, Name, RocketMQConf). create_bridge_ssl(Config) -> BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), Name = ?GET_CONFIG(rocketmq_name, Config), RocketMQConf = ?GET_CONFIG(rocketmq_config_ssl, Config), emqx_bridge:create(BridgeType, Name, RocketMQConf). create_bridge_ssl_bad_ssl_opts(Config) -> BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), Name = ?GET_CONFIG(rocketmq_name, Config), RocketMQConf0 = ?GET_CONFIG(rocketmq_config_ssl, Config), %% This config is wrong because we use verify_peer without %% a cert that can be used in the verification. RocketMQConf1 = maps:put( <<"ssl">>, #{ <<"enable">> => true, <<"verify">> => verify_peer }, RocketMQConf0 ), emqx_bridge:create(BridgeType, Name, RocketMQConf1). delete_bridge(Config) -> BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), Name = ?GET_CONFIG(rocketmq_name, Config), emqx_bridge:remove(BridgeType, Name). create_bridge_http(Params) -> Path = emqx_mgmt_api_test_util:api_path(["bridges"]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])}; Error -> Error end. send_message(Config, Payload) -> Name = ?GET_CONFIG(rocketmq_name, Config), BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), ActionId = emqx_bridge_v2:id(BridgeType, Name), emqx_bridge_v2:query(BridgeType, Name, {ActionId, Payload}, #{}). query_resource(Config, Request) -> Name = ?GET_CONFIG(rocketmq_name, Config), BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), ID = emqx_bridge_v2:id(BridgeType, Name), ResID = emqx_connector_resource:resource_id(BridgeType, Name), emqx_resource:query(ID, Request, #{timeout => 500, connector_resource_id => ResID}). %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ t_setup_via_config_and_publish(Config) -> ?assertMatch( {ok, _}, create_bridge(Config) ), SentData = #{payload => ?PAYLOAD}, ?check_trace( begin ?wait_async_action( ?assertEqual(ok, send_message(Config, SentData)), #{?snk_kind := rocketmq_connector_query_return}, 10_000 ), ok end, fun(Trace0) -> Trace = ?of_kind(rocketmq_connector_query_return, Trace0), ?assertMatch([#{result := ok}], Trace), ok end ), ok. t_setup_via_config_and_publish_ssl(Config) -> ?assertMatch( {ok, _}, create_bridge_ssl(Config) ), SentData = #{payload => ?PAYLOAD}, ?check_trace( begin ?wait_async_action( ?assertEqual(ok, send_message(Config, SentData)), #{?snk_kind := rocketmq_connector_query_return}, 10_000 ), ok end, fun(Trace0) -> Trace = ?of_kind(rocketmq_connector_query_return, Trace0), ?assertMatch([#{result := ok}], Trace), ok end ), ok. %% Check that we can not connect to the SSL only RocketMQ instance %% with incorrect SSL options t_setup_via_config_ssl_host_bad_ssl_opts(Config) -> ?assertMatch( {ok, _}, create_bridge_ssl_bad_ssl_opts(Config) ), Name = ?GET_CONFIG(rocketmq_name, Config), BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceID)), ?assertMatch(#{status := disconnected}, emqx_bridge_v2:health_check(BridgeType, Name)), ok. t_setup_via_http_api_and_publish(Config) -> BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), Name = ?GET_CONFIG(rocketmq_name, Config), RocketMQConf = ?GET_CONFIG(rocketmq_config, Config), RocketMQConf2 = RocketMQConf#{ <<"name">> => Name, <<"type">> => BridgeType }, ?assertMatch( {ok, _}, create_bridge_http(RocketMQConf2) ), SentData = #{payload => ?PAYLOAD}, ?check_trace( begin ?wait_async_action( ?assertEqual(ok, send_message(Config, SentData)), #{?snk_kind := rocketmq_connector_query_return}, 10_000 ), ok end, fun(Trace0) -> Trace = ?of_kind(rocketmq_connector_query_return, Trace0), ?assertMatch([#{result := ok}], Trace), ok end ), ok. t_setup_two_actions_via_http_api_and_publish(Config) -> BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), Name = ?GET_CONFIG(rocketmq_name, Config), RocketMQConf = ?GET_CONFIG(rocketmq_config, Config), RocketMQConf2 = RocketMQConf#{ <<"name">> => Name, <<"type">> => BridgeType }, ?assertMatch( {ok, _}, create_bridge_http(RocketMQConf2) ), {ok, #{raw_config := ActionConf}} = emqx_bridge_v2:lookup(actions, BridgeType, Name), Topic2 = <<"Topic2">>, ActionConf2 = emqx_utils_maps:deep_force_put( [<<"parameters">>, <<"topic">>], ActionConf, Topic2 ), Action2Name = atom_to_binary(?FUNCTION_NAME), {ok, _} = emqx_bridge_v2:create(BridgeType, Action2Name, ActionConf2), SentData = #{payload => ?PAYLOAD}, ?check_trace( begin ?wait_async_action( ?assertEqual(ok, send_message(Config, SentData)), #{?snk_kind := rocketmq_connector_query_return}, 10_000 ), ok end, fun(Trace0) -> Trace = ?of_kind(rocketmq_connector_query_return, Trace0), ?assertMatch([#{result := ok}], Trace), ok end ), Config2 = proplists:delete(rocketmq_name, Config), Config3 = [{rocketmq_name, Action2Name} | Config2], ?check_trace( begin ?wait_async_action( ?assertEqual(ok, send_message(Config3, SentData)), #{?snk_kind := rocketmq_connector_query_return}, 10_000 ), ok end, fun(Trace0) -> Trace = ?of_kind(rocketmq_connector_query_return, Trace0), ?assertMatch([#{result := ok}], Trace), ok end ), ok. t_get_status(Config) -> ?assertMatch( {ok, _}, create_bridge(Config) ), Name = ?GET_CONFIG(rocketmq_name, Config), BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)), ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)), ok. t_simple_query(Config) -> ?assertMatch( {ok, _}, create_bridge(Config) ), Type = ?GET_CONFIG(rocketmq_bridge_type, Config), Name = ?GET_CONFIG(rocketmq_name, Config), ActionId = emqx_bridge_v2:id(Type, Name), Request = {ActionId, #{message => <<"Hello">>}}, Result = query_resource(Config, Request), ?assertEqual(ok, Result), ok. t_acl_deny(Config0) -> RocketCfg = ?GET_CONFIG(rocketmq_config, Config0), RocketCfg2 = RocketCfg#{<<"topic">> := ?DENY_TOPIC}, Config = lists:keyreplace(rocketmq_config, 1, Config0, {rocketmq_config, RocketCfg2}), ?assertMatch( {ok, _}, create_bridge(Config) ), SentData = #{payload => ?PAYLOAD}, ?check_trace( begin ?wait_async_action( ?assertMatch({error, #{<<"code">> := 1}}, send_message(Config, SentData)), #{?snk_kind := rocketmq_connector_query_return}, 10_000 ), ok end, fun(Trace0) -> Trace = ?of_kind(rocketmq_connector_query_return, Trace0), ?assertMatch([#{error := #{<<"code">> := 1}}], Trace), ok end ), ok.