diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index b590f0cd4..90e62764a 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -51,11 +51,39 @@ post_request() -> api_schema(Method) -> Broker = [ - ref(Mod, Method) - || Mod <- [emqx_bridge_webhook_schema, emqx_bridge_mqtt_schema] + {Type, ref(Mod, Method)} + || {Type, Mod} <- [ + {<<"webhook">>, emqx_bridge_webhook_schema}, + {<<"mqtt">>, emqx_bridge_mqtt_schema} + ] ], EE = ee_api_schemas(Method), - hoconsc:union(Broker ++ EE). + hoconsc:union(bridge_api_union(Broker ++ EE)). + +bridge_api_union(Refs) -> + Index = maps:from_list(Refs), + fun + (all_union_members) -> + maps:values(Index); + ({value, V}) -> + case V of + #{<<"type">> := T} -> + case maps:get(T, Index, undefined) of + undefined -> + throw(#{ + field_name => type, + reason => <<"unknown bridge type">> + }); + Ref -> + [Ref] + end; + _ -> + throw(#{ + field_name => type, + reason => <<"unknown bridge type">> + }) + end + end. -if(?EMQX_RELEASE_EDITION == ee). ee_api_schemas(Method) -> diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src index 6e2dbcbce..bff42dd98 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src @@ -1,6 +1,6 @@ {application, emqx_ee_bridge, [ {description, "EMQX Enterprise data bridges"}, - {vsn, "0.1.12"}, + {vsn, "0.1.13"}, {registered, []}, {applications, [ kernel, diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index 17ffe9b9b..804e7d814 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -14,33 +14,37 @@ api_schemas(Method) -> [ - ref(emqx_bridge_gcp_pubsub, Method), - ref(emqx_bridge_kafka, Method ++ "_consumer"), - ref(emqx_bridge_kafka, Method ++ "_producer"), - ref(emqx_bridge_cassandra, Method), - ref(emqx_ee_bridge_mysql, Method), - ref(emqx_bridge_pgsql, Method), - ref(emqx_ee_bridge_mongodb, Method ++ "_rs"), - ref(emqx_ee_bridge_mongodb, Method ++ "_sharded"), - ref(emqx_ee_bridge_mongodb, Method ++ "_single"), - ref(emqx_ee_bridge_hstreamdb, Method), - ref(emqx_bridge_influxdb, Method ++ "_api_v1"), - ref(emqx_bridge_influxdb, Method ++ "_api_v2"), - ref(emqx_ee_bridge_redis, Method ++ "_single"), - ref(emqx_ee_bridge_redis, Method ++ "_sentinel"), - ref(emqx_ee_bridge_redis, Method ++ "_cluster"), - ref(emqx_bridge_timescale, Method), - ref(emqx_bridge_matrix, Method), - ref(emqx_bridge_tdengine, Method), - ref(emqx_ee_bridge_clickhouse, Method), - ref(emqx_bridge_dynamo, Method), - ref(emqx_bridge_rocketmq, Method), - ref(emqx_bridge_sqlserver, Method), - ref(emqx_bridge_opents, Method), - ref(emqx_bridge_pulsar, Method ++ "_producer"), - ref(emqx_bridge_oracle, Method), - ref(emqx_bridge_iotdb, Method), - ref(emqx_bridge_rabbitmq, Method) + %% We need to map the `type' field of a request (binary) to a + %% bridge schema module. + api_ref(emqx_bridge_gcp_pubsub, <<"gcp_pubsub">>, Method), + api_ref(emqx_bridge_kafka, <<"kafka_consumer">>, Method ++ "_consumer"), + %% TODO: rename this to `kafka_producer' after alias support is added + %% to hocon; keeping this as just `kafka' for backwards compatibility. + api_ref(emqx_bridge_kafka, <<"kafka">>, Method ++ "_producer"), + api_ref(emqx_bridge_cassandra, <<"cassandra">>, Method), + api_ref(emqx_ee_bridge_mysql, <<"mysql">>, Method), + api_ref(emqx_bridge_pgsql, <<"pgsql">>, Method), + api_ref(emqx_ee_bridge_mongodb, <<"mongodb_rs">>, Method ++ "_rs"), + api_ref(emqx_ee_bridge_mongodb, <<"mongodb_sharded">>, Method ++ "_sharded"), + api_ref(emqx_ee_bridge_mongodb, <<"mongodb_single">>, Method ++ "_single"), + api_ref(emqx_ee_bridge_hstreamdb, <<"hstreamdb">>, Method), + api_ref(emqx_bridge_influxdb, <<"influxdb_api_v1">>, Method ++ "_api_v1"), + api_ref(emqx_bridge_influxdb, <<"influxdb_api_v2">>, Method ++ "_api_v2"), + api_ref(emqx_ee_bridge_redis, <<"redis_single">>, Method ++ "_single"), + api_ref(emqx_ee_bridge_redis, <<"redis_sentinel">>, Method ++ "_sentinel"), + api_ref(emqx_ee_bridge_redis, <<"redis_cluster">>, Method ++ "_cluster"), + api_ref(emqx_bridge_timescale, <<"timescale">>, Method), + api_ref(emqx_bridge_matrix, <<"matrix">>, Method), + api_ref(emqx_bridge_tdengine, <<"tdengine">>, Method), + api_ref(emqx_ee_bridge_clickhouse, <<"clickhouse">>, Method), + api_ref(emqx_bridge_dynamo, <<"dynamo">>, Method), + api_ref(emqx_bridge_rocketmq, <<"rocketmq">>, Method), + api_ref(emqx_bridge_sqlserver, <<"sqlserver">>, Method), + api_ref(emqx_bridge_opents, <<"opents">>, Method), + api_ref(emqx_bridge_pulsar, <<"pulsar_producer">>, Method ++ "_producer"), + api_ref(emqx_bridge_oracle, <<"oracle">>, Method), + api_ref(emqx_bridge_iotdb, <<"iotdb">>, Method), + api_ref(emqx_bridge_rabbitmq, <<"rabbitmq">>, Method) ]. schema_modules() -> @@ -338,3 +342,6 @@ rabbitmq_structs() -> } )} ]. + +api_ref(Module, Type, Method) -> + {Type, ref(Module, Method)}.