diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 9ec0d440c..edc8da113 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -620,8 +620,8 @@ lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> ?INTERNAL_ERROR(Reason) end. -lookup_from_local_node(BridgeType, BridgeName) -> - case emqx_bridge:lookup(BridgeType, BridgeName) of +lookup_from_local_node(ActionType, ActionName) -> + case emqx_bridge:lookup(ActionType, ActionName) of {ok, Res} -> {ok, format_resource(Res, node())}; Error -> Error end. @@ -895,25 +895,19 @@ aggregate_metrics( format_resource( #{ - type := Type, + type := ActionType, name := BridgeName, raw_config := RawConf, resource_data := ResourceData }, Node ) -> - RawConfFull = - case emqx_bridge_v2:is_bridge_v2_type(Type) of - true -> - %% The defaults are already filled in - RawConf; - false -> - fill_defaults(Type, RawConf) - end, + BridgeV1Type = downgrade_type(ActionType, emqx_bridge_lib:get_conf(ActionType, BridgeName)), + RawConfFull = fill_defaults(BridgeV1Type, RawConf), redact( maps:merge( RawConfFull#{ - type => downgrade_type(Type, emqx_bridge_lib:get_conf(Type, BridgeName)), + type => BridgeV1Type, name => maps:get(<<"name">>, RawConf, BridgeName), node => Node }, diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl index 55e898ec4..92e295589 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl @@ -235,11 +235,10 @@ mongodb_structs() -> kafka_structs() -> [ - {kafka_producer, + {kafka, mk( hoconsc:map(name, ref(emqx_bridge_kafka, kafka_producer)), #{ - aliases => [kafka], desc => <<"Kafka Producer Bridge Config">>, required => false, converter => fun kafka_producer_converter/2 diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index ac2c3d704..6a1cb7fbc 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -37,7 +37,7 @@ ]). %% for testing only --export([enterprise_api_schemas/1]). +-export([enterprise_api_schemas/1, enterprise_fields_bridges/0]). %%====================================================================================== %% Hocon Schema Definitions @@ -175,11 +175,10 @@ roots() -> [{bridges, ?HOCON(?R_REF(bridges), #{importance => ?IMPORTANCE_LOW})} fields(bridges) -> [ - {http, + {webhook, mk( hoconsc:map(name, ref(emqx_bridge_http_schema, "config")), #{ - aliases => [webhook], desc => ?DESC("bridges_webhook"), required => false, converter => fun http_bridge_converter/2 @@ -198,7 +197,7 @@ fields(bridges) -> end } )} - ] ++ enterprise_fields_bridges(); + ] ++ ?MODULE:enterprise_fields_bridges(); fields("metrics") -> [ {"dropped", mk(integer(), #{desc => ?DESC("metric_dropped")})}, diff --git a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl index b267e9bf7..a7506ddad 100644 --- a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl +++ b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl @@ -21,7 +21,7 @@ empty_config_test() -> Conf1 = #{<<"bridges">> => #{}}, Conf2 = #{<<"bridges">> => #{<<"webhook">> => #{}}}, ?assertEqual(Conf1, check(Conf1)), - ?assertEqual(#{<<"bridges">> => #{<<"http">> => #{}}}, check(Conf2)), + ?assertEqual(#{<<"bridges">> => #{<<"webhook">> => #{}}}, check(Conf2)), ok. %% ensure webhook config can be checked @@ -33,7 +33,7 @@ webhook_config_test() -> ?assertMatch( #{ <<"bridges">> := #{ - <<"http">> := #{ + <<"webhook">> := #{ <<"the_name">> := #{ <<"method">> := get, @@ -48,7 +48,7 @@ webhook_config_test() -> ?assertMatch( #{ <<"bridges">> := #{ - <<"http">> := #{ + <<"webhook">> := #{ <<"the_name">> := #{ <<"method">> := get, @@ -61,7 +61,7 @@ webhook_config_test() -> ), #{ <<"bridges">> := #{ - <<"http">> := #{ + <<"webhook">> := #{ <<"the_name">> := #{ <<"method">> := get, diff --git a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl index 215013a6d..b268c127d 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl @@ -106,7 +106,9 @@ setup_mocks() -> emqx_bridge_v2_schema, registered_api_schemas, 1, - fun(Method) -> [{bridge_type_bin(), hoconsc:ref(?MODULE, "api_" ++ Method)}] end + fun(Method) -> + [{bridge_type_bin(), hoconsc:ref(?MODULE, "api_v2_" ++ Method)}] + end ), catch meck:new(emqx_bridge_schema, MeckOpts), @@ -114,7 +116,24 @@ setup_mocks() -> emqx_bridge_schema, enterprise_api_schemas, 1, - fun(Method) -> [{bridge_type_bin(), hoconsc:ref(?MODULE, "api_" ++ Method)}] end + fun(Method) -> + [{bridge_type_bin(), hoconsc:ref(?MODULE, "api_v1_" ++ Method)}] + end + ), + meck:expect( + emqx_bridge_schema, + enterprise_fields_bridges, + 0, + fun() -> + [ + { + bridge_type_bin(), + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(?MODULE, v1_bridge)), #{} + ) + } + ] + end ), ok. @@ -156,7 +175,7 @@ fields("connector") -> {on_start_fun, hoconsc:mk(binary(), #{})}, {ssl, hoconsc:ref(ssl)} ]; -fields("api_post") -> +fields("api_v2_post") -> [ {connector, hoconsc:mk(binary(), #{})}, {name, hoconsc:mk(binary(), #{})}, @@ -164,6 +183,20 @@ fields("api_post") -> {send_to, hoconsc:mk(atom(), #{})} | fields("connector") ]; +fields("api_v1_post") -> + ConnectorFields = proplists:delete(resource_opts, fields("connector")), + [ + {connector, hoconsc:mk(binary(), #{})}, + {name, hoconsc:mk(binary(), #{})}, + {type, hoconsc:mk(bridge_type(), #{})}, + {send_to, hoconsc:mk(atom(), #{})}, + {resource_opts, hoconsc:mk(hoconsc:ref(?MODULE, v1_resource_opts), #{})} + | ConnectorFields + ]; +fields(v1_bridge) -> + lists:foldl(fun proplists:delete/2, fields("api_v1_post"), [name, type]); +fields(v1_resource_opts) -> + emqx_resource_schema:create_opts(_Overrides = []); fields(ssl) -> emqx_schema:client_ssl_opts_schema(#{required => false}). @@ -333,9 +366,11 @@ get_connector_http(Name) -> create_bridge_http_api_v1(Opts) -> Name = maps:get(name, Opts), Overrides = maps:get(overrides, Opts, #{}), + OverrideFn = maps:get(override_fn, Opts, fun(X) -> X end), BridgeConfig0 = emqx_utils_maps:deep_merge(bridge_config(), Overrides), BridgeConfig = maps:without([<<"connector">>], BridgeConfig0), - Params = BridgeConfig#{<<"type">> => bridge_type_bin(), <<"name">> => Name}, + Params0 = BridgeConfig#{<<"type">> => bridge_type_bin(), <<"name">> => Name}, + Params = OverrideFn(Params0), Path = emqx_mgmt_api_test_util:api_path(["bridges"]), ct:pal("creating bridge (http v1): ~p", [Params]), Res = request(post, Path, Params), @@ -919,3 +954,29 @@ t_obfuscated_secrets_probe(_Config) -> ), ok. + +t_v1_api_fill_defaults(_Config) -> + %% Ensure only one sub-field is used, but we get back the defaults filled in. + BridgeName = ?FUNCTION_NAME, + OverrideFn = fun(Params) -> + ResourceOpts = #{<<"resume_interval">> => 100}, + maps:put(<<"resource_opts">>, ResourceOpts, Params) + end, + ?assertMatch( + {ok, + {{_, 201, _}, _, #{ + <<"resource_opts">> := + #{ + <<"resume_interval">> := _, + <<"query_mode">> := _, + <<"inflight_window">> := _, + <<"start_timeout">> := _, + <<"start_after_created">> := _, + <<"max_buffer_bytes">> := _, + <<"batch_size">> := _ + } + }}}, + create_bridge_http_api_v1(#{name => BridgeName, override_fn => OverrideFn}) + ), + + ok. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl index 64871bf6d..e889962c3 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl @@ -22,7 +22,7 @@ kafka_producer_test() -> #{ <<"bridges">> := #{ - <<"kafka_producer">> := + <<"kafka">> := #{ <<"myproducer">> := #{<<"kafka">> := #{}} @@ -35,7 +35,7 @@ kafka_producer_test() -> #{ <<"bridges">> := #{ - <<"kafka_producer">> := + <<"kafka">> := #{ <<"myproducer">> := #{<<"local_topic">> := _} @@ -48,7 +48,7 @@ kafka_producer_test() -> #{ <<"bridges">> := #{ - <<"kafka_producer">> := + <<"kafka">> := #{ <<"myproducer">> := #{ @@ -64,7 +64,7 @@ kafka_producer_test() -> #{ <<"bridges">> := #{ - <<"kafka_producer">> := + <<"kafka">> := #{ <<"myproducer">> := #{ @@ -166,7 +166,7 @@ message_key_dispatch_validations_test() -> ?assertThrow( {_, [ #{ - path := "bridges.kafka_producer.myproducer.kafka", + path := "bridges.kafka.myproducer.kafka", reason := "Message key cannot be empty when `key_dispatch` strategy is used" } ]}, @@ -175,7 +175,7 @@ message_key_dispatch_validations_test() -> ?assertThrow( {_, [ #{ - path := "bridges.kafka_producer.myproducer.kafka", + path := "bridges.kafka.myproducer.kafka", reason := "Message key cannot be empty when `key_dispatch` strategy is used" } ]}, diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl index 5bab0cb32..96b87bbdf 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl @@ -122,7 +122,9 @@ fields("get_cluster") -> method_fields(get, redis_cluster); %% old bridge v1 schema fields(Type) when - Type == redis_single orelse Type == redis_sentinel orelse Type == redis_cluster + Type == redis_single; + Type == redis_sentinel; + Type == redis_cluster -> redis_bridge_common_fields(Type) ++ connector_fields(Type); diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl index b6b2ba012..690faafac 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl @@ -29,14 +29,12 @@ connector_type_name() -> redis. schema_module() -> ?SCHEMA_MODULE. connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> - fix_v1_type( - maps:merge( - maps:without( - [<<"connector">>], - map_unindent(<<"parameters">>, ActionConfig) - ), - map_unindent(<<"parameters">>, ConnectorConfig) - ) + maps:merge( + maps:without( + [<<"connector">>], + map_unindent(<<"parameters">>, ActionConfig) + ), + map_unindent(<<"parameters">>, ConnectorConfig) ). bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> @@ -77,9 +75,6 @@ bridge_v1_type_name() -> bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"redis_type">> := Type}}, _}) -> v1_type(Type). -fix_v1_type(#{<<"redis_type">> := RedisType} = Conf) -> - Conf#{<<"type">> => v1_type(RedisType)}. - v1_type(<<"single">>) -> redis_single; v1_type(<<"sentinel">>) -> redis_sentinel; v1_type(<<"cluster">>) -> redis_cluster. diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl index 4835e8127..91b39bcce 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl @@ -82,9 +82,13 @@ on_start(InstId, Config) -> end. on_stop(InstId, #{conn_st := RedisConnSt}) -> - emqx_redis:on_stop(InstId, RedisConnSt); + Res = emqx_redis:on_stop(InstId, RedisConnSt), + ?tp(redis_bridge_stopped, #{instance_id => InstId}), + Res; on_stop(InstId, undefined = _State) -> - emqx_redis:on_stop(InstId, undefined). + Res = emqx_redis:on_stop(InstId, undefined), + ?tp(redis_bridge_stopped, #{instance_id => InstId}), + Res. on_get_status(InstId, #{conn_st := RedisConnSt}) -> emqx_redis:on_get_status(InstId, RedisConnSt). diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl index 6a3f1005f..0fb043eda 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl @@ -50,7 +50,6 @@ fields("config_connector") -> #{required => true, desc => ?DESC(redis_parameters)} )} ] ++ - emqx_redis:redis_fields() ++ emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts) ++ emqx_connector_schema_lib:ssl_fields(); fields(connector_resource_opts) -> diff --git a/apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl b/apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl index 67480371d..83743762c 100644 --- a/apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl +++ b/apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl @@ -59,7 +59,11 @@ all() -> [{group, transports}, {group, rest}]. suite() -> [{timetrap, {minutes, 20}}]. groups() -> - ResourceSpecificTCs = [t_create_delete_bridge], + ResourceSpecificTCs = [ + t_create_delete_bridge, + t_create_via_http, + t_start_stop + ], TCs = emqx_common_test_helpers:all(?MODULE) -- ResourceSpecificTCs, TypeGroups = [ {group, redis_single}, @@ -130,10 +134,13 @@ wait_for_ci_redis(Checks, Config) -> emqx_resource, emqx_connector, emqx_bridge, - emqx_rule_engine + emqx_rule_engine, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} ], #{work_dir => emqx_cth_suite:work_dir(Config)} ), + {ok, _Api} = emqx_common_test_http:create_default_app(), [ {apps, Apps}, {proxy_host, ProxyHost}, @@ -177,9 +184,8 @@ init_per_testcase(Testcase, Config0) -> IsBatch = (BatchMode =:= batch_on), BridgeConfig0 = maps:merge(RedisConnConfig, ?COMMON_REDIS_OPTS), BridgeConfig1 = BridgeConfig0#{<<"resource_opts">> => ResourceConfig}, - BridgeType = list_to_atom(atom_to_list(RedisType) ++ "_producer"), [ - {bridge_type, BridgeType}, + {bridge_type, RedisType}, {bridge_config, BridgeConfig1}, {is_batch, IsBatch} | Config @@ -425,6 +431,14 @@ t_create_disconnected(Config) -> ), ok = emqx_bridge:remove(Type, Name). +t_create_via_http(Config) -> + ok = emqx_bridge_testlib:t_create_via_http(Config), + ok. + +t_start_stop(Config) -> + ok = emqx_bridge_testlib:t_start_stop(Config, redis_bridge_stopped), + ok. + %%------------------------------------------------------------------------------ %% Helper functions %%------------------------------------------------------------------------------ diff --git a/apps/emqx_redis/src/emqx_redis.erl b/apps/emqx_redis/src/emqx_redis.erl index 5435b3a9e..9c5d25805 100644 --- a/apps/emqx_redis/src/emqx_redis.erl +++ b/apps/emqx_redis/src/emqx_redis.erl @@ -62,25 +62,22 @@ roots() -> fields(redis_single) -> fields(redis_single_connector) ++ - redis_fields() ++ emqx_connector_schema_lib:ssl_fields(); fields(redis_single_connector) -> [ {server, server()}, redis_type(single) - ]; + ] ++ redis_fields(); fields(redis_cluster) -> fields(redis_cluster_connector) ++ - lists:keydelete(database, 1, redis_fields()) ++ emqx_connector_schema_lib:ssl_fields(); fields(redis_cluster_connector) -> [ {servers, servers()}, redis_type(cluster) - ]; + ] ++ lists:keydelete(database, 1, redis_fields()); fields(redis_sentinel) -> fields(redis_sentinel_connector) ++ - redis_fields() ++ emqx_connector_schema_lib:ssl_fields(); fields(redis_sentinel_connector) -> [ @@ -91,7 +88,7 @@ fields(redis_sentinel_connector) -> required => true, desc => ?DESC("sentinel_desc") }} - ]. + ] ++ redis_fields(). server() -> Meta = #{desc => ?DESC("server")}, diff --git a/apps/emqx_resource/test/emqx_resource_schema_tests.erl b/apps/emqx_resource/test/emqx_resource_schema_tests.erl index b6cda8e97..51575cfe7 100644 --- a/apps/emqx_resource/test/emqx_resource_schema_tests.erl +++ b/apps/emqx_resource/test/emqx_resource_schema_tests.erl @@ -80,7 +80,7 @@ worker_pool_size_test_() -> Conf = emqx_utils_maps:deep_put( [ <<"bridges">>, - <<"http">>, + <<"webhook">>, <<"simple">>, <<"resource_opts">>, <<"worker_pool_size">> @@ -88,7 +88,7 @@ worker_pool_size_test_() -> BaseConf, WorkerPoolSize ), - #{<<"bridges">> := #{<<"http">> := #{<<"simple">> := CheckedConf}}} = check(Conf), + #{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := CheckedConf}}} = check(Conf), #{<<"resource_opts">> := #{<<"worker_pool_size">> := WPS}} = CheckedConf, WPS end, @@ -117,7 +117,7 @@ worker_pool_size_test_() -> %%=========================================================================== parse_and_check_webhook_bridge(Hocon) -> - #{<<"bridges">> := #{<<"http">> := #{<<"simple">> := Conf}}} = check(parse(Hocon)), + #{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := Conf}}} = check(parse(Hocon)), Conf. parse(Hocon) ->