diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl index 775acf7b0..bc8c0b04f 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl @@ -40,15 +40,16 @@ init_per_group(Type = rs, Config) -> MongoPort = list_to_integer(os:getenv("MONGO_RS_PORT", "27017")), case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of true -> - _ = application:load(emqx_ee_bridge), - _ = emqx_ee_bridge:module_info(), + ensure_loaded(), ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), emqx_mgmt_api_test_util:init_suite(), - MongoConfig = mongo_config(MongoHost, MongoPort, Type), + {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type), [ {mongo_host, MongoHost}, {mongo_port, MongoPort}, - {mongo_config, MongoConfig} + {mongo_config, MongoConfig}, + {mongo_type, Type}, + {mongo_name, Name} | Config ]; false -> @@ -59,15 +60,16 @@ init_per_group(Type = sharded, Config) -> MongoPort = list_to_integer(os:getenv("MONGO_SHARDED_PORT", "27017")), case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of true -> - _ = application:load(emqx_ee_bridge), - _ = emqx_ee_bridge:module_info(), + ensure_loaded(), ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), emqx_mgmt_api_test_util:init_suite(), - MongoConfig = mongo_config(MongoHost, MongoPort, Type), + {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type), [ {mongo_host, MongoHost}, {mongo_port, MongoPort}, - {mongo_config, MongoConfig} + {mongo_config, MongoConfig}, + {mongo_type, Type}, + {mongo_name, Name} | Config ]; false -> @@ -78,15 +80,16 @@ init_per_group(Type = single, Config) -> MongoPort = list_to_integer(os:getenv("MONGO_SINGLE_PORT", "27017")), case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of true -> - _ = application:load(emqx_ee_bridge), - _ = emqx_ee_bridge:module_info(), + ensure_loaded(), ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), emqx_mgmt_api_test_util:init_suite(), - MongoConfig = mongo_config(MongoHost, MongoPort, Type), + {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type), [ {mongo_host, MongoHost}, {mongo_port, MongoPort}, - {mongo_config, MongoConfig} + {mongo_config, MongoConfig}, + {mongo_type, Type}, + {mongo_name, Name} | Config ]; false -> @@ -118,65 +121,84 @@ end_per_testcase(_Testcase, Config) -> %% Helper fns %%------------------------------------------------------------------------------ -mongo_config(MongoHost0, MongoPort0, rs) -> - MongoHost = list_to_binary(MongoHost0), - MongoPort = integer_to_binary(MongoPort0), - Servers = <>, - Name = atom_to_binary(?MODULE), - #{ - <<"type">> => <<"mongodb_rs">>, - <<"name">> => Name, - <<"enable">> => true, - <<"collection">> => <<"mycol">>, - <<"servers">> => Servers, - <<"database">> => <<"mqtt">>, - <<"w_mode">> => <<"safe">>, - <<"replica_set_name">> => <<"rs0">> - }; -mongo_config(MongoHost0, MongoPort0, sharded) -> - MongoHost = list_to_binary(MongoHost0), - MongoPort = integer_to_binary(MongoPort0), - Servers = <>, - Name = atom_to_binary(?MODULE), - #{ - <<"type">> => <<"mongodb_sharded">>, - <<"name">> => Name, - <<"enable">> => true, - <<"collection">> => <<"mycol">>, - <<"servers">> => Servers, - <<"database">> => <<"mqtt">>, - <<"w_mode">> => <<"safe">> - }; -mongo_config(MongoHost0, MongoPort0, single) -> - MongoHost = list_to_binary(MongoHost0), - MongoPort = integer_to_binary(MongoPort0), - Server = <>, - Name = atom_to_binary(?MODULE), - #{ - <<"type">> => <<"mongodb_single">>, - <<"name">> => Name, - <<"enable">> => true, - <<"collection">> => <<"mycol">>, - <<"server">> => Server, - <<"database">> => <<"mqtt">>, - <<"w_mode">> => <<"safe">> - }. +ensure_loaded() -> + _ = application:load(emqx_ee_bridge), + _ = emqx_ee_bridge:module_info(), + ok. -create_bridge(Config0 = #{<<"type">> := Type, <<"name">> := Name}) -> - Config = maps:without( - [ - <<"type">>, - <<"name">> - ], - Config0 - ), - emqx_bridge:create(Type, Name, Config). +mongo_type_bin(rs) -> + <<"mongodb_rs">>; +mongo_type_bin(sharded) -> + <<"mongodb_sharded">>; +mongo_type_bin(single) -> + <<"mongodb_single">>. + +mongo_config(MongoHost, MongoPort0, rs = Type) -> + MongoPort = integer_to_list(MongoPort0), + Servers = MongoHost ++ ":" ++ MongoPort, + Name = atom_to_binary(?MODULE), + ConfigString = + io_lib:format( + "bridges.mongodb_rs.~s {\n" + " enable = true\n" + " collection = mycol\n" + " replica_set_name = rs0\n" + " servers = [~p]\n" + " w_mode = safe\n" + " database = mqtt\n" + "}", + [Name, Servers] + ), + {Name, parse_and_check(ConfigString, Type, Name)}; +mongo_config(MongoHost, MongoPort0, sharded = Type) -> + MongoPort = integer_to_list(MongoPort0), + Servers = MongoHost ++ ":" ++ MongoPort, + Name = atom_to_binary(?MODULE), + ConfigString = + io_lib:format( + "bridges.mongodb_sharded.~s {\n" + " enable = true\n" + " collection = mycol\n" + " servers = [~p]\n" + " w_mode = safe\n" + " database = mqtt\n" + "}", + [Name, Servers] + ), + {Name, parse_and_check(ConfigString, Type, Name)}; +mongo_config(MongoHost, MongoPort0, single = Type) -> + MongoPort = integer_to_list(MongoPort0), + Server = MongoHost ++ ":" ++ MongoPort, + Name = atom_to_binary(?MODULE), + ConfigString = + io_lib:format( + "bridges.mongodb_single.~s {\n" + " enable = true\n" + " collection = mycol\n" + " server = ~p\n" + " w_mode = safe\n" + " database = mqtt\n" + "}", + [Name, Server] + ), + {Name, parse_and_check(ConfigString, Type, Name)}. + +parse_and_check(ConfigString, Type, Name) -> + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + TypeBin = mongo_type_bin(Type), + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf, + Config. + +create_bridge(Config) -> + Type = mongo_type_bin(?config(mongo_type, Config)), + Name = ?config(mongo_name, Config), + MongoConfig = ?config(mongo_config, Config), + emqx_bridge:create(Type, Name, MongoConfig). delete_bridge(Config) -> - #{ - <<"type">> := Type, - <<"name">> := Name - } = ?config(mongo_config, Config), + Type = mongo_type_bin(?config(mongo_type, Config)), + Name = ?config(mongo_name, Config), emqx_bridge:remove(Type, Name). create_bridge_http(Params) -> @@ -188,11 +210,9 @@ create_bridge_http(Params) -> end. clear_db(Config) -> - #{ - <<"name">> := Name, - <<"type">> := Type, - <<"collection">> := Collection - } = ?config(mongo_config, Config), + Type = mongo_type_bin(?config(mongo_type, Config)), + Name = ?config(mongo_name, Config), + #{<<"collection">> := Collection} = ?config(mongo_config, Config), ResourceID = emqx_bridge_resource:resource_id(Type, Name), {ok, _, #{state := #{poolname := PoolName}}} = emqx_resource:get_instance(ResourceID), Selector = #{}, @@ -202,19 +222,15 @@ clear_db(Config) -> ok. find_all(Config) -> - #{ - <<"name">> := Name, - <<"type">> := Type, - <<"collection">> := Collection - } = ?config(mongo_config, Config), + Type = mongo_type_bin(?config(mongo_type, Config)), + Name = ?config(mongo_name, Config), + #{<<"collection">> := Collection} = ?config(mongo_config, Config), ResourceID = emqx_bridge_resource:resource_id(Type, Name), emqx_resource:query(ResourceID, {find, Collection, #{}, #{}}). send_message(Config, Payload) -> - #{ - <<"name">> := Name, - <<"type">> := Type - } = ?config(mongo_config, Config), + Name = ?config(mongo_name, Config), + Type = mongo_type_bin(?config(mongo_type, Config)), BridgeID = emqx_bridge_resource:bridge_id(Type, Name), emqx_bridge:send_message(BridgeID, Payload). @@ -223,10 +239,9 @@ send_message(Config, Payload) -> %%------------------------------------------------------------------------------ t_setup_via_config_and_publish(Config) -> - MongoConfig = ?config(mongo_config, Config), ?assertMatch( {ok, _}, - create_bridge(MongoConfig) + create_bridge(Config) ), Val = erlang:unique_integer(), ok = send_message(Config, #{key => Val}), @@ -237,7 +252,13 @@ t_setup_via_config_and_publish(Config) -> ok. t_setup_via_http_api_and_publish(Config) -> - MongoConfig = ?config(mongo_config, Config), + Type = mongo_type_bin(?config(mongo_type, Config)), + Name = ?config(mongo_name, Config), + MongoConfig0 = ?config(mongo_config, Config), + MongoConfig = MongoConfig0#{ + <<"name">> => Name, + <<"type">> => Type + }, ?assertMatch( {ok, _}, create_bridge_http(MongoConfig)