diff --git a/apps/emqx_bridge/i18n/emqx_bridge_mqtt_schema.conf b/apps/emqx_bridge/i18n/emqx_bridge_mqtt_schema.conf index c0f549db3..ab8c97ce7 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_mqtt_schema.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_mqtt_schema.conf @@ -1,16 +1,4 @@ emqx_bridge_mqtt_schema { - - desc_rec { - desc { - en: """Configuration for MQTT bridge.""" - zh: """MQTT Bridge 配置""" - } - label: { - en: "MQTT Bridge Configuration" - zh: "MQTT Bridge 配置" - } - } - desc_type { desc { en: """The bridge type.""" diff --git a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf index b1a8e56f9..704fd7bd7 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf @@ -11,24 +11,6 @@ emqx_bridge_schema { } } - desc_connector { - desc { - en: """ -The ID or the configs of the connector to be used for this bridge. Connector IDs must be of format: -{type}:{name}.
-In config files, you can find the corresponding config entry for a connector by such path: -'connectors.{type}.{name}'.
-""" - zh: """ -Bridge 使用的 Connector 的 ID 或者配置。Connector ID 的格式必须为:{type}:{name}.
-在配置文件中,您可以通过以下路径找到 Connector 的相应配置条目:'connector.{type}.{name}'。
""" - } - label: { - en: "Connector ID" - zh: "Connector ID" - } - } - desc_metrics { desc { en: """The metrics of the bridge""" @@ -85,7 +67,7 @@ Bridge 使用的 Connector 的 ID 或者配置。Connector ID 的格式必须为 } - bridges_name { + bridges_mqtt { desc { en: """MQTT bridges to/from another MQTT broker""" zh: """桥接到另一个 MQTT Broker 的 MQTT Bridge""" diff --git a/apps/emqx_bridge/i18n/emqx_bridge_webhook_schema.conf b/apps/emqx_bridge/i18n/emqx_bridge_webhook_schema.conf index fcc817bef..9e89b5f0c 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_webhook_schema.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_webhook_schema.conf @@ -11,17 +11,6 @@ emqx_bridge_webhook_schema { } } - config_direction { - desc { - en: """The direction of this bridge, MUST be 'egress'""" - zh: """Bridge 的方向, 必须是 egress""" - } - label: { - en: "Bridge Direction" - zh: "Bridge 方向" - } - } - config_url { desc { en: """ diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 354c4faee..109b1df86 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -37,8 +37,7 @@ create/3, disable_enable/3, remove/2, - list/0, - list_bridges_by_connector/1 + list/0 ]). -export([send_message/2]). @@ -48,6 +47,8 @@ %% exported for `emqx_telemetry' -export([get_basic_usage_info/0]). +-define(EGRESS_DIR_BRIDGES(T), T == webhook; T == mysql). + load() -> Bridges = emqx:get_config([bridges], #{}), lists:foreach( @@ -93,10 +94,10 @@ load_hook() -> load_hook(Bridges) -> lists:foreach( - fun({_Type, Bridge}) -> + fun({Type, Bridge}) -> lists:foreach( fun({_Name, BridgeConf}) -> - do_load_hook(BridgeConf) + do_load_hook(Type, BridgeConf) end, maps:to_list(Bridge) ) @@ -104,12 +105,13 @@ load_hook(Bridges) -> maps:to_list(Bridges) ). -do_load_hook(#{local_topic := _} = Conf) -> - case maps:get(direction, Conf, egress) of - egress -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE); - ingress -> ok - end; -do_load_hook(_Conf) -> +do_load_hook(Type, #{local_topic := _}) when ?EGRESS_DIR_BRIDGES(Type) -> + emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE); +do_load_hook(mqtt, #{egress := #{local := #{topic := _}}}) -> + emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE); +do_load_hook(kafka, #{producer := #{mqtt := #{topic := _}}}) -> + emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE); +do_load_hook(_Type, _Conf) -> ok. unload_hook() -> @@ -197,13 +199,6 @@ list() -> maps:to_list(emqx:get_raw_config([bridges], #{})) ). -list_bridges_by_connector(ConnectorId) -> - [ - B - || B = #{raw_config := #{<<"connector">> := Id}} <- list(), - ConnectorId =:= Id - ]. - lookup(Id) -> {Type, Name} = emqx_bridge_resource:parse_bridge_id(Id), lookup(Type, Name). @@ -303,13 +298,8 @@ get_matched_bridges(Topic) -> maps:fold( fun(BType, Conf, Acc0) -> maps:fold( - fun - %% Confs for MQTT, Kafka bridges have the `direction` flag - (_BName, #{direction := ingress}, Acc1) -> - Acc1; - (BName, #{direction := egress} = Egress, Acc1) -> - %% WebHook, MySQL bridges only have egress direction - get_matched_bridge_id(Egress, Topic, BType, BName, Acc1) + fun(BName, BConf, Acc1) -> + get_matched_bridge_id(BType, BConf, Topic, BName, Acc1) end, Acc0, Conf @@ -319,9 +309,18 @@ get_matched_bridges(Topic) -> Bridges ). -get_matched_bridge_id(#{enable := false}, _Topic, _BType, _BName, Acc) -> +get_matched_bridge_id(_BType, #{enable := false}, _Topic, _BName, Acc) -> Acc; -get_matched_bridge_id(#{local_topic := Filter}, Topic, BType, BName, Acc) -> +get_matched_bridge_id(BType, #{local_topic := Filter}, Topic, BName, Acc) when + ?EGRESS_DIR_BRIDGES(BType) +-> + do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc); +get_matched_bridge_id(mqtt, #{egress := #{local := #{topic := Filter}}}, Topic, BName, Acc) -> + do_get_matched_bridge_id(Topic, Filter, mqtt, BName, Acc); +get_matched_bridge_id(kafka, #{producer := #{mqtt := #{topic := Filter}}}, Topic, BName, Acc) -> + do_get_matched_bridge_id(Topic, Filter, kafka, BName, Acc). + +do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc) -> case emqx_topic:match(Topic, Filter) of true -> [emqx_bridge_resource:bridge_id(BType, BName) | Acc]; false -> Acc diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index e48833f78..5fced2467 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -42,8 +42,6 @@ -export([lookup_from_local_node/2]). --define(CONN_TYPES, [mqtt]). - -define(TRY_PARSE_ID(ID, EXPR), try emqx_bridge_resource:parse_bridge_id(Id) of {BridgeType, BridgeName} -> @@ -146,7 +144,7 @@ param_path_id() -> #{ in => path, required => true, - example => <<"webhook:my_webhook">>, + example => <<"webhook:webhook_example">>, desc => ?DESC("desc_param_path_id") } )}. @@ -155,66 +153,45 @@ bridge_info_array_example(Method) -> [Config || #{value := Config} <- maps:values(bridge_info_examples(Method))]. bridge_info_examples(Method) -> - maps:merge(conn_bridge_examples(Method), #{ - <<"my_webhook">> => #{ - summary => <<"WebHook">>, - value => info_example(webhook, awesome, Method) - } - }). - -conn_bridge_examples(Method) -> - Fun = - fun(Type, Acc) -> - SType = atom_to_list(Type), - KeyIngress = bin(SType ++ "_ingress"), - KeyEgress = bin(SType ++ "_egress"), - maps:merge(Acc, #{ - KeyIngress => #{ - summary => bin(string:uppercase(SType) ++ " Ingress Bridge"), - value => info_example(Type, ingress, Method) - }, - KeyEgress => #{ - summary => bin(string:uppercase(SType) ++ " Egress Bridge"), - value => info_example(Type, egress, Method) - } - }) - end, - Broker = lists:foldl(Fun, #{}, ?CONN_TYPES), - EE = ee_conn_bridge_examples(Method), - maps:merge(Broker, EE). - --if(?EMQX_RELEASE_EDITION == ee). -ee_conn_bridge_examples(Method) -> - emqx_ee_bridge:conn_bridge_examples(Method). --else. -ee_conn_bridge_examples(_Method) -> - #{}. --endif. - -info_example(Type, Direction, Method) -> maps:merge( - info_example_basic(Type, Direction), - method_example(Type, Direction, Method) + #{ + <<"webhook_example">> => #{ + summary => <<"WebHook">>, + value => info_example(webhook, Method) + }, + <<"mqtt_example">> => #{ + summary => <<"MQTT Bridge">>, + value => info_example(mqtt, Method) + } + }, + ee_bridge_examples(Method) ). -method_example(Type, Direction, Method) when Method == get; Method == post -> +ee_bridge_examples(Method) -> + case erlang:function_exported(emqx_ee_bridge, examples, 1) of + true -> emqx_ee_bridge:examples(Method); + false -> #{} + end. + +info_example(Type, Method) -> + maps:merge( + info_example_basic(Type), + method_example(Type, Method) + ). + +method_example(Type, Method) when Method == get; Method == post -> SType = atom_to_list(Type), - SDir = atom_to_list(Direction), - SName = - case Type of - webhook -> "my_" ++ SType; - _ -> "my_" ++ SDir ++ "_" ++ SType ++ "_bridge" - end, - TypeNameExamp = #{ + SName = SType ++ "_example", + TypeNameExam = #{ type => bin(SType), name => bin(SName) }, - maybe_with_metrics_example(TypeNameExamp, Method); -method_example(_Type, _Direction, put) -> + maybe_with_metrics_example(TypeNameExam, Method); +method_example(_Type, put) -> #{}. -maybe_with_metrics_example(TypeNameExamp, get) -> - TypeNameExamp#{ +maybe_with_metrics_example(TypeNameExam, get) -> + TypeNameExam#{ metrics => ?METRICS(0, 0, 0, 0, 0, 0), node_metrics => [ #{ @@ -223,10 +200,10 @@ maybe_with_metrics_example(TypeNameExamp, get) -> } ] }; -maybe_with_metrics_example(TypeNameExamp, _) -> - TypeNameExamp. +maybe_with_metrics_example(TypeNameExam, _) -> + TypeNameExam. -info_example_basic(webhook, _) -> +info_example_basic(webhook) -> #{ enable => true, url => <<"http://localhost:9901/messages/${topic}">>, @@ -241,28 +218,52 @@ info_example_basic(webhook, _) -> method => post, body => <<"${payload}">> }; -info_example_basic(mqtt, ingress) -> +info_example_basic(mqtt) -> + (mqtt_main_example())#{ + egress => mqtt_egress_example(), + ingress => mqtt_ingress_example() + }. + +mqtt_main_example() -> #{ enable => true, - connector => <<"mqtt:my_mqtt_connector">>, - direction => ingress, - remote_topic => <<"aws/#">>, - remote_qos => 1, - local_topic => <<"from_aws/${topic}">>, - local_qos => <<"${qos}">>, - payload => <<"${payload}">>, - retain => <<"${retain}">> - }; -info_example_basic(mqtt, egress) -> + mode => cluster_shareload, + server => <<"127.0.0.1:1883">>, + proto_ver => <<"v4">>, + username => <<"foo">>, + password => <<"bar">>, + clean_start => true, + keepalive => <<"300s">>, + retry_interval => <<"15s">>, + max_inflight => 100, + ssl => #{ + enable => false + } + }. +mqtt_egress_example() -> #{ - enable => true, - connector => <<"mqtt:my_mqtt_connector">>, - direction => egress, - local_topic => <<"emqx/#">>, - remote_topic => <<"from_emqx/${topic}">>, - remote_qos => <<"${qos}">>, - payload => <<"${payload}">>, - retain => false + local => #{ + topic => <<"emqx/#">> + }, + remote => #{ + topic => <<"from_emqx/${topic}">>, + qos => <<"${qos}">>, + payload => <<"${payload}">>, + retain => false + } + }. +mqtt_ingress_example() -> + #{ + remote => #{ + topic => <<"aws/#">>, + qos => 1 + }, + local => #{ + topic => <<"from_aws/${topic}">>, + qos => <<"${qos}">>, + payload => <<"${payload}">>, + retain => <<"${retain}">> + } }. schema("/bridges") -> diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index cac6ab1e6..46f5d6729 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -45,7 +45,6 @@ stop(_State) -> -if(?EMQX_RELEASE_EDITION == ee). start_ee_apps() -> {ok, _} = application:ensure_all_started(emqx_ee_bridge), - {ok, _} = application:ensure_all_started(emqx_ee_connector), ok. -else. start_ee_apps() -> diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index f7aeec30d..f0773a8ea 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -43,6 +43,9 @@ reset_metrics/1 ]). +%% bi-directional bridge with producer/consumer or ingress/egress configs +-define(IS_BI_DIR_BRIDGE(TYPE), TYPE == <<"mqtt">>; TYPE == <<"kafka">>). + -if(?EMQX_RELEASE_EDITION == ee). bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt; bridge_to_resource_type(mqtt) -> emqx_connector_mqtt; @@ -102,7 +105,7 @@ create(Type, Name, Conf, Opts) -> resource_id(Type, Name), <<"emqx_bridge">>, bridge_to_resource_type(Type), - parse_confs(Type, Name, Conf), + parse_confs(bin(Type), Name, Conf), Opts ), maybe_disable_bridge(Type, Name, Conf). @@ -168,27 +171,21 @@ recreate(Type, Name, Conf, Opts) -> emqx_resource:recreate_local( resource_id(Type, Name), bridge_to_resource_type(Type), - parse_confs(Type, Name, Conf), + parse_confs(bin(Type), Name, Conf), Opts ). create_dry_run(Type, Conf) -> - Conf0 = fill_dry_run_conf(Conf), - case emqx_resource:check_config(bridge_to_resource_type(Type), Conf0) of - {ok, Conf1} -> - TmpPath = iolist_to_binary(["bridges-create-dry-run:", emqx_misc:gen_id(8)]), - case emqx_connector_ssl:convert_certs(TmpPath, Conf1) of - {error, Reason} -> - {error, Reason}; - {ok, ConfNew} -> - Res = emqx_resource:create_dry_run_local( - bridge_to_resource_type(Type), ConfNew - ), - _ = maybe_clear_certs(TmpPath, ConfNew), - Res - end; - {error, _} = Error -> - Error + TmpPath = iolist_to_binary(["bridges-create-dry-run:", emqx_misc:gen_id(8)]), + case emqx_connector_ssl:convert_certs(TmpPath, Conf) of + {error, Reason} -> + {error, Reason}; + {ok, ConfNew} -> + Res = emqx_resource:create_dry_run_local( + bridge_to_resource_type(Type), ConfNew + ), + _ = maybe_clear_certs(TmpPath, ConfNew), + Res end. remove(BridgeId) -> @@ -213,19 +210,6 @@ maybe_disable_bridge(Type, Name, Conf) -> true -> ok end. -fill_dry_run_conf(Conf) -> - Conf#{ - <<"egress">> => - #{ - <<"remote_topic">> => <<"t">>, - <<"remote_qos">> => 0, - <<"retain">> => true, - <<"payload">> => <<"val">> - }, - <<"ingress">> => - #{<<"remote_topic">> => <<"t">>} - }. - maybe_clear_certs(TmpPath, #{ssl := SslConf} = Conf) -> %% don't remove the cert files if they are in use case is_tmp_path_conf(TmpPath, SslConf) of @@ -245,8 +229,9 @@ is_tmp_path_conf(_TmpPath, _Conf) -> is_tmp_path(TmpPath, File) -> string:str(str(File), str(TmpPath)) > 0. +%% convert bridge configs to what the connector modules want parse_confs( - Type, + <<"webhook">>, _Name, #{ url := Url, @@ -256,7 +241,7 @@ parse_confs( request_timeout := ReqTimeout, max_retries := Retry } = Conf -) when Type == webhook orelse Type == <<"webhook">> -> +) -> {BaseUrl, Path} = parse_url(Url), {ok, BaseUrl2} = emqx_http_lib:uri_parse(BaseUrl), Conf#{ @@ -271,42 +256,14 @@ parse_confs( max_retries => Retry } }; -parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf) when - is_binary(ConnId) --> - case emqx_connector:parse_connector_id(ConnId) of - {Type, ConnName} -> - ConnectorConfs = emqx:get_config([connectors, Type, ConnName]), - make_resource_confs( - Direction, - ConnectorConfs, - maps:without([connector, direction], Conf), - Type, - Name - ); - {_ConnType, _ConnName} -> - error({cannot_use_connector_with_different_type, ConnId}) - end; -parse_confs(Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf) when - is_map(ConnectorConfs) --> - make_resource_confs( - Direction, - ConnectorConfs, - maps:without([connector, direction], Conf), - Type, - Name - ). - -make_resource_confs(ingress, ConnectorConfs, BridgeConf, Type, Name) -> +parse_confs(Type, Name, Conf) when ?IS_BI_DIR_BRIDGE(Type) -> + %% For some drivers that can be used as data-sources, we need to provide a + %% hookpoint. The underlying driver will run `emqx_hooks:run/3` when it + %% receives a message from the external database. BName = bridge_id(Type, Name), - ConnectorConfs#{ - ingress => BridgeConf#{hookpoint => <<"$bridges/", BName/binary>>} - }; -make_resource_confs(egress, ConnectorConfs, BridgeConf, _Type, _Name) -> - ConnectorConfs#{ - egress => BridgeConf - }. + Conf#{hookpoint => <<"$bridges/", BName/binary>>}; +parse_confs(_Type, _Name, Conf) -> + Conf. parse_url(Url) -> case string:split(Url, "//", leading) of diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl index 9fc06ec0e..1bf5565e9 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl @@ -3,43 +3,28 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --import(hoconsc, [mk/2]). +-import(hoconsc, [mk/2, ref/2]). --export([roots/0, fields/1, desc/1]). +-export([roots/0, fields/1, desc/1, namespace/0]). %%====================================================================================== %% Hocon Schema Definitions +namespace() -> "bridge_mqtt". + roots() -> []. - -fields("ingress") -> - [emqx_bridge_schema:direction_field(ingress, emqx_connector_mqtt_schema:ingress_desc())] ++ - emqx_bridge_schema:common_bridge_fields(mqtt_connector_ref()) ++ - proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress")); -fields("egress") -> - [emqx_bridge_schema:direction_field(egress, emqx_connector_mqtt_schema:egress_desc())] ++ - emqx_bridge_schema:common_bridge_fields(mqtt_connector_ref()) ++ - emqx_connector_mqtt_schema:fields("egress"); -fields("post_ingress") -> +fields("config") -> + emqx_bridge_schema:common_bridge_fields() ++ + emqx_connector_mqtt_schema:fields("config"); +fields("post") -> [ type_field(), name_field() - ] ++ proplists:delete(enable, fields("ingress")); -fields("post_egress") -> - [ - type_field(), - name_field() - ] ++ proplists:delete(enable, fields("egress")); -fields("put_ingress") -> - proplists:delete(enable, fields("ingress")); -fields("put_egress") -> - proplists:delete(enable, fields("egress")); -fields("get_ingress") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post_ingress"); -fields("get_egress") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post_egress"). + ] ++ emqx_connector_mqtt_schema:fields("config"); +fields("put") -> + emqx_connector_mqtt_schema:fields("config"); +fields("get") -> + emqx_bridge_schema:metrics_status_fields() ++ fields("config"). -desc(Rec) when Rec =:= "ingress"; Rec =:= "egress" -> - ?DESC("desc_rec"); desc(_) -> undefined. @@ -63,6 +48,3 @@ name_field() -> desc => ?DESC("desc_name") } )}. - -mqtt_connector_ref() -> - ?R_REF(emqx_connector_mqtt_schema, "connector"). diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index 4343dc223..aedfcaa03 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -14,16 +14,13 @@ ]). -export([ - common_bridge_fields/1, - metrics_status_fields/0, - direction_field/2 + common_bridge_fields/0, + metrics_status_fields/0 ]). %%====================================================================================== %% Hocon Schema Definitions --define(CONN_TYPES, [mqtt]). - %%====================================================================================== %% For HTTP APIs get_response() -> @@ -36,34 +33,26 @@ post_request() -> api_schema("post"). api_schema(Method) -> - Broker = - lists:flatmap( - fun(Type) -> - [ - ref(schema_mod(Type), Method ++ "_ingress"), - ref(schema_mod(Type), Method ++ "_egress") - ] - end, - ?CONN_TYPES - ) ++ [ref(Module, Method) || Module <- [emqx_bridge_webhook_schema]], + Broker = [ + ref(Mod, Method) + || Mod <- [emqx_bridge_webhook_schema, emqx_bridge_mqtt_schema] + ], EE = ee_api_schemas(Method), hoconsc:union(Broker ++ EE). --if(?EMQX_RELEASE_EDITION == ee). ee_api_schemas(Method) -> - emqx_ee_bridge:api_schemas(Method). + case erlang:function_exported(emqx_ee_bridge, api_schemas, 1) of + true -> emqx_ee_bridge:api_schemas(Method); + false -> [] + end. ee_fields_bridges() -> - emqx_ee_bridge:fields(bridges). --else. -ee_api_schemas(_) -> - []. + case erlang:function_exported(emqx_ee_bridge, fields, 1) of + true -> emqx_ee_bridge:fields(bridges); + false -> [] + end. -ee_fields_bridges() -> - []. --endif. - -common_bridge_fields(ConnectorRef) -> +common_bridge_fields() -> [ {enable, mk( @@ -72,15 +61,6 @@ common_bridge_fields(ConnectorRef) -> desc => ?DESC("desc_enable"), default => true } - )}, - {connector, - mk( - hoconsc:union([binary(), ConnectorRef]), - #{ - required => true, - example => <<"mqtt:my_mqtt_connector">>, - desc => ?DESC("desc_connector") - } )} ]. @@ -100,18 +80,6 @@ metrics_status_fields() -> )} ]. -direction_field(Dir, Desc) -> - {direction, - mk( - Dir, - #{ - required => true, - default => egress, - desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.
" ++ - Desc - } - )}. - %%====================================================================================== %% For config files @@ -125,22 +93,13 @@ fields(bridges) -> mk( hoconsc:map(name, ref(emqx_bridge_webhook_schema, "config")), #{desc => ?DESC("bridges_webhook")} + )}, + {mqtt, + mk( + hoconsc:map(name, ref(emqx_bridge_mqtt_schema, "config")), + #{desc => ?DESC("bridges_mqtt")} )} - ] ++ - [ - {T, - mk( - hoconsc:map( - name, - hoconsc:union([ - ref(schema_mod(T), "ingress"), - ref(schema_mod(T), "egress") - ]) - ), - #{desc => ?DESC("bridges_name")} - )} - || T <- ?CONN_TYPES - ] ++ ee_fields_bridges(); + ] ++ ee_fields_bridges(); fields("metrics") -> [ {"matched", mk(integer(), #{desc => ?DESC("metric_matched")})}, @@ -181,6 +140,3 @@ status() -> node_name() -> {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}. - -schema_mod(Type) -> - list_to_atom(lists:concat(["emqx_bridge_", Type, "_schema"])). diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl index 7c19e9e55..0f692f195 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl @@ -50,14 +50,6 @@ basic_config() -> desc => ?DESC("config_enable"), default => true } - )}, - {direction, - mk( - egress, - #{ - desc => ?DESC("config_direction"), - default => egress - } )} ] ++ webhook_creation_opts() ++ proplists:delete( diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index ce54695a5..25aa82d76 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -165,7 +165,6 @@ gen_schema_json(Dir, I18nFile, SchemaModule) -> gen_api_schema_json(Dir, I18nFile, Lang) -> emqx_dashboard:init_i18n(I18nFile, Lang), gen_api_schema_json_hotconf(Dir, Lang), - gen_api_schema_json_connector(Dir, Lang), gen_api_schema_json_bridge(Dir, Lang), emqx_dashboard:clear_i18n(). @@ -174,11 +173,6 @@ gen_api_schema_json_hotconf(Dir, Lang) -> File = schema_filename(Dir, "hot-config-schema-", Lang), ok = do_gen_api_schema_json(File, emqx_mgmt_api_configs, SchemaInfo). -gen_api_schema_json_connector(Dir, Lang) -> - SchemaInfo = #{title => <<"EMQX Connector API Schema">>, version => <<"0.1.0">>}, - File = schema_filename(Dir, "connector-api-", Lang), - ok = do_gen_api_schema_json(File, emqx_connector_api, SchemaInfo). - gen_api_schema_json_bridge(Dir, Lang) -> SchemaInfo = #{title => <<"EMQX Data Bridge API Schema">>, version => <<"0.1.0">>}, File = schema_filename(Dir, "bridge-api-", Lang), diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 1b24a0a38..726416568 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -60,7 +60,6 @@ emqx_exhook_schema, emqx_psk_schema, emqx_limiter_schema, - emqx_connector_schema, emqx_slow_subs_schema ]). diff --git a/apps/emqx_connector/i18n/emqx_connector_mqtt.conf b/apps/emqx_connector/i18n/emqx_connector_mqtt.conf index 1005d68dc..5ade54670 100644 --- a/apps/emqx_connector/i18n/emqx_connector_mqtt.conf +++ b/apps/emqx_connector/i18n/emqx_connector_mqtt.conf @@ -1,5 +1,4 @@ emqx_connector_mqtt { - num_of_bridges { desc { en: "The current number of bridges that are using this connector." diff --git a/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf b/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf index c9f227383..6dfd0ef27 100644 --- a/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf +++ b/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf @@ -1,4 +1,85 @@ emqx_connector_mqtt_schema { + ingress_desc { + desc { + en: """The ingress config defines how this bridge receive messages from the remote MQTT broker, and then + send them to the local broker.
+ Template with variables is allowed in 'remote.qos', 'local.topic', 'local.qos', 'local.retain', 'local.payload'.
+ NOTE: if this bridge is used as the input of a rule, and also 'local.topic' is + configured, then messages got from the remote broker will be sent to both the 'local.topic' and + the rule.""" + zh: """入口配置定义了该桥接如何从远程 MQTT Broker 接收消息,然后将消息发送到本地 Broker。
+ 以下字段中允许使用带有变量的模板:'remote.qos', 'local.topic', 'local.qos', 'local.retain', 'local.payload'。
+ 注意:如果此桥接被用作规则的输入,并且配置了 'local.topic',则从远程代理获取的消息将同时被发送到 'local.topic' 和规则。 + """ + } + label: { + en: "Ingress Configs" + zh: "入方向配置" + } + } + + egress_desc { + desc { + en: """The egress config defines how this bridge forwards messages from the local broker to the remote broker.
+Template with variables is allowed in 'remote.topic', 'local.qos', 'local.retain', 'local.payload'.
+NOTE: if this bridge is used as the action of a rule, and also 'local.topic' +is configured, then both the data got from the rule and the MQTT messages that matches +'local.topic' will be forwarded.""" + zh: """出口配置定义了该桥接如何将消息从本地 Broker 转发到远程 Broker。 +以下字段中允许使用带有变量的模板:'remote.topic', 'local.qos', 'local.retain', 'local.payload'。
+注意:如果此桥接被用作规则的动作,并且配置了 'local.topic',则从规则输出的数据以及匹配到 'local.topic' 的 MQTT 消息都会被转发。 + """ + } + label: { + en: "Egress Configs" + zh: "出方向配置" + } + } + + ingress_remote { + desc { + en: """The configs about subscribing to the remote broker.""" + zh: """订阅远程 Broker 相关的配置。""" + } + label: { + en: "Remote Configs" + zh: "远程配置" + } + } + + ingress_local { + desc { + en: """The configs about sending message to the local broker.""" + zh: """发送消息到本地 Broker 相关的配置。""" + } + label: { + en: "Local Configs" + zh: "本地配置" + } + } + + egress_remote { + desc { + en: """The configs about sending message to the remote broker.""" + zh: """发送消息到远程 Broker 相关的配置。""" + } + label: { + en: "Remote Configs" + zh: "远程配置" + } + } + + egress_local { + desc { + en: """The configs about receiving messages from ben.""" + zh: """收取本地 Broker 消息相关的配置。""" + } + label: { + en: "Local Configs" + zh: "本地配置" + } + } + mode { desc { en: """ @@ -9,7 +90,7 @@ In 'cluster_shareload' mode, the incoming load from the remote broker is shared using shared subscription.
Note that the 'clientid' is suffixed by the node name, this is to avoid clientid conflicts between different nodes. And we can only use shared subscription -topic filters for 'remote_topic' of ingress connections. +topic filters for 'remote.topic' of ingress connections. """ zh: """ MQTT 桥的模式。
@@ -17,7 +98,7 @@ MQTT 桥的模式。
- cluster_shareload:在 emqx 集群的每个节点上创建一个 MQTT 连接。
在“cluster_shareload”模式下,来自远程代理的传入负载通过共享订阅的方式接收。
请注意,“clientid”以节点名称为后缀,这是为了避免不同节点之间的clientid冲突。 -而且对于入口连接的“remote_topic”,我们只能使用共享订阅主题过滤器。 +而且对于入口连接的“remote.topic”,我们只能使用共享订阅主题过滤器。 """ } label: { @@ -166,17 +247,6 @@ Template with variables is allowed. } } - ingress_hookpoint { - desc { - en: "The hook point will be triggered when there's any message received from the remote broker." - zh: "当从远程borker收到任何消息时,将触发钩子。" - } - label: { - en: "Hookpoint" - zh: "挂载点" - } - } - egress_local_topic { desc { en: "The local topic to be forwarded to the remote broker" @@ -222,59 +292,6 @@ Template with variables is allowed. } } - dir { - desc { - en: """ -The dir where the replayq file saved.
-Set to 'false' disables the replayq feature. -""" - zh: """ -replayq 文件保存的目录。
-设置为 'false' 会禁用 replayq 功能。 -""" - } - label: { - en: "Replyq file Save Dir" - zh: "Replyq 文件保存目录" - } - } - - seg_bytes { - desc { - en: """ -The size in bytes of a single segment.
-A segment is mapping to a file in the replayq dir. If the current segment is full, a new segment -(file) will be opened to write. -""" - zh: """ -单个段的大小(以字节为单位)。
-一个段映射到 replayq 目录中的一个文件。 如果当前段已满,则新段(文件)将被打开写入。 -""" - } - label: { - en: "Segment Size" - zh: "Segment 大小" - } - } - - offload { - desc { - en: """ -In offload mode, the disk queue is only used to offload queue tail segments.
-The messages are cached in the memory first, then it writes to the replayq files after the size of -the memory cache reaches 'seg_bytes'. -""" - zh: """ -在Offload模式下,磁盘队列仅用于卸载队列尾段。
-消息首先缓存在内存中,然后写入replayq文件。内存缓大小为“seg_bytes” 指定的值。 -""" - } - label: { - en: "Offload Mode" - zh: "Offload 模式" - } - } - retain { desc { en: """ @@ -309,66 +326,15 @@ Template with variables is allowed. } } - desc_connector { + server_configs { desc { - en: """Generic configuration for the connector.""" - zh: """连接器的通用配置。""" + en: """Configs related to the server.""" + zh: """服务器相关的配置。""" } label: { - en: "Connector Generic Configuration" - zh: "连接器通用配置。" + en: "Server Configs" + zh: "服务配置。" } } - desc_ingress { - desc { - en: """ -The ingress config defines how this bridge receive messages from the remote MQTT broker, and then send them to the local broker.
-Template with variables is allowed in 'local_topic', 'remote_qos', 'qos', 'retain', 'payload'.
-NOTE: if this bridge is used as the input of a rule (emqx rule engine), and also local_topic is configured, then messages got from the remote broker will be sent to both the 'local_topic' and the rule. -""" - zh: """ -Ingress 模式定义了这个 bridge 如何从远程 MQTT broker 接收消息,然后将它们发送到本地 broker 。
-允许带有的模板变量: 'local_topic'、'remote_qos'、'qos'、'retain'、'payload' 。
-注意:如果这个 bridge 被用作规则的输入(emqx 规则引擎),并且还配置了 local_topic,那么从远程 broker 获取的消息将同时被发送到 'local_topic' 和规则引擎。 -""" - } - label: { - en: "Ingress Config" - zh: "Ingress 模式配置" - } - } - - desc_egress { - desc { - en: """ -The egress config defines how this bridge forwards messages from the local broker to the remote broker.
-Template with variables is allowed in 'remote_topic', 'qos', 'retain', 'payload'.
-NOTE: if this bridge is used as the action of a rule (emqx rule engine), and also local_topic is configured, then both the data got from the rule and the MQTT messages that matches local_topic will be forwarded. -""" - zh: """ -Egress 模式定义了 bridge 如何将消息从本地 broker 转发到远程 broker。
-允许带有的模板变量: 'remote_topic'、'qos'、'retain'、'payload' 。
-注意:如果这个 bridge 作为规则(emqx 规则引擎)的输出,并且还配置了 local_topic,那么从规则引擎中获取的数据和匹配 local_topic 的 MQTT 消息都会被转发到远程 broker 。 -""" - } - label: { - en: "Egress Config" - zh: "Egress 模式配置" - } - } - - desc_replayq { - desc { - en: """Queue messages in disk files.""" - zh: """本地磁盘消息队列""" - } - label: { - en: "Replayq" - zh: "本地磁盘消息队列" - } - } - - - } diff --git a/apps/emqx_connector/i18n/emqx_connector_schema.conf b/apps/emqx_connector/i18n/emqx_connector_schema.conf deleted file mode 100644 index 1f6fd5381..000000000 --- a/apps/emqx_connector/i18n/emqx_connector_schema.conf +++ /dev/null @@ -1,31 +0,0 @@ -emqx_connector_schema { - - mqtt { - desc { - en: "MQTT bridges." - zh: "MQTT bridges。" - } - label: { - en: "MQTT bridges" - zh: "MQTT bridges" - } - } - - desc_connector { - desc { - en: """ -Configuration for EMQX connectors.
-A connector maintains the data related to the external resources, such as MySQL database. -""" - zh: """ -EMQX 连接器的配置。
-连接器维护与外部资源相关的数据,比如 MySQL 数据库。 -""" - } - label: { - en: "Connector" - zh: "连接器" - } - } - -} diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl deleted file mode 100644 index d85959698..000000000 --- a/apps/emqx_connector/src/emqx_connector.erl +++ /dev/null @@ -1,165 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- --module(emqx_connector). - --export([ - config_key_path/0, - pre_config_update/3, - post_config_update/5 -]). - --export([ - parse_connector_id/1, - connector_id/2 -]). - --export([ - list_raw/0, - lookup_raw/1, - lookup_raw/2, - create_dry_run/2, - update/2, - update/3, - delete/1, - delete/2 -]). - -config_key_path() -> - [connectors]. - -pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) -> - emqx_connector_ssl:convert_certs(filename:join(Path), Conf). - --dialyzer([{nowarn_function, [post_config_update/5]}, error_handling]). -post_config_update([connectors, Type, Name] = Path, '$remove', _, OldConf, _AppEnvs) -> - ConnId = connector_id(Type, Name), - try - foreach_linked_bridges(ConnId, fun(#{type := BType, name := BName}) -> - throw({dependency_bridges_exist, emqx_bridge_resource:bridge_id(BType, BName)}) - end), - _ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf) - catch - throw:Error -> {error, Error} - end; -post_config_update([connectors, Type, Name], _Req, NewConf, OldConf, _AppEnvs) -> - ConnId = connector_id(Type, Name), - foreach_linked_bridges( - ConnId, - fun(#{type := BType, name := BName}) -> - BridgeConf = emqx:get_config([bridges, BType, BName]), - case - emqx_bridge_resource:update( - BType, - BName, - {BridgeConf#{connector => OldConf}, BridgeConf#{connector => NewConf}} - ) - of - ok -> ok; - {error, Reason} -> error({update_bridge_error, Reason}) - end - end - ). - -connector_id(Type0, Name0) -> - Type = bin(Type0), - Name = bin(Name0), - <>. - -parse_connector_id(ConnectorId) -> - case string:split(bin(ConnectorId), ":", all) of - [Type, Name] -> {binary_to_atom(Type, utf8), binary_to_atom(Name, utf8)}; - _ -> error({invalid_connector_id, ConnectorId}) - end. - -list_raw() -> - case get_raw_connector_conf() of - not_found -> - []; - Config -> - lists:foldl( - fun({Type, NameAndConf}, Connectors) -> - lists:foldl( - fun({Name, RawConf}, Acc) -> - [RawConf#{<<"type">> => Type, <<"name">> => Name} | Acc] - end, - Connectors, - maps:to_list(NameAndConf) - ) - end, - [], - maps:to_list(Config) - ) - end. - -lookup_raw(Id) when is_binary(Id) -> - {Type, Name} = parse_connector_id(Id), - lookup_raw(Type, Name). - -lookup_raw(Type, Name) -> - Path = [bin(P) || P <- [Type, Name]], - case get_raw_connector_conf() of - not_found -> - {error, not_found}; - Conf -> - case emqx_map_lib:deep_get(Path, Conf, not_found) of - not_found -> {error, not_found}; - Conf1 -> {ok, Conf1#{<<"type">> => Type, <<"name">> => Name}} - end - end. - --spec create_dry_run(module(), binary() | #{binary() => term()} | [#{binary() => term()}]) -> - ok | {error, Reason :: term()}. -create_dry_run(Type, Conf) -> - emqx_bridge_resource:create_dry_run(Type, Conf). - -update(Id, Conf) when is_binary(Id) -> - {Type, Name} = parse_connector_id(Id), - update(Type, Name, Conf). - -update(Type, Name, Conf) -> - emqx_conf:update(config_key_path() ++ [Type, Name], Conf, #{override_to => cluster}). - -delete(Id) when is_binary(Id) -> - {Type, Name} = parse_connector_id(Id), - delete(Type, Name). - -delete(Type, Name) -> - emqx_conf:remove(config_key_path() ++ [Type, Name], #{override_to => cluster}). - -get_raw_connector_conf() -> - case emqx:get_raw_config(config_key_path(), not_found) of - not_found -> - not_found; - RawConf -> - #{<<"connectors">> := Conf} = - emqx_config:fill_defaults(#{<<"connectors">> => RawConf}), - Conf - end. - -bin(Bin) when is_binary(Bin) -> Bin; -bin(Str) when is_list(Str) -> list_to_binary(Str); -bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). - -foreach_linked_bridges(ConnId, Do) -> - lists:foreach( - fun - (#{raw_config := #{<<"connector">> := ConnId0}} = Bridge) when ConnId0 == ConnId -> - Do(Bridge); - (_) -> - ok - end, - emqx_bridge:list() - ). diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl deleted file mode 100644 index 8dcd3a4aa..000000000 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ /dev/null @@ -1,339 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_connector_api). - --behaviour(minirest_api). - --include("emqx_connector.hrl"). - --include_lib("typerefl/include/types.hrl"). --include_lib("hocon/include/hoconsc.hrl"). - --import(hoconsc, [mk/2, ref/2, array/1, enum/1]). - -%% Swagger specs from hocon schema --export([api_spec/0, paths/0, schema/1, namespace/0]). - -%% API callbacks --export(['/connectors_test'/2, '/connectors'/2, '/connectors/:id'/2]). - --define(CONN_TYPES, [mqtt]). - --define(TRY_PARSE_ID(ID, EXPR), - try emqx_connector:parse_connector_id(Id) of - {ConnType, ConnName} -> - _ = ConnName, - EXPR - catch - error:{invalid_connector_id, Id0} -> - {400, #{ - code => 'INVALID_ID', - message => - <<"invalid_connector_id: ", Id0/binary, - ". Connector Ids must be of format {type}:{name}">> - }} - end -). - -namespace() -> "connector". - -api_spec() -> - emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}). - -paths() -> ["/connectors_test", "/connectors", "/connectors/:id"]. - -error_schema(Codes, Message) when is_list(Message) -> - error_schema(Codes, list_to_binary(Message)); -error_schema(Codes, Message) when is_binary(Message) -> - emqx_dashboard_swagger:error_codes(Codes, Message). - -put_request_body_schema() -> - emqx_dashboard_swagger:schema_with_examples( - emqx_connector_schema:put_request(), connector_info_examples(put) - ). - -post_request_body_schema() -> - emqx_dashboard_swagger:schema_with_examples( - emqx_connector_schema:post_request(), connector_info_examples(post) - ). - -get_response_body_schema() -> - emqx_dashboard_swagger:schema_with_examples( - emqx_connector_schema:get_response(), connector_info_examples(get) - ). - -connector_info_array_example(Method) -> - [Config || #{value := Config} <- maps:values(connector_info_examples(Method))]. - -connector_info_examples(Method) -> - Fun = - fun(Type, Acc) -> - SType = atom_to_list(Type), - maps:merge(Acc, #{ - Type => #{ - summary => bin(string:uppercase(SType) ++ " Connector"), - value => info_example(Type, Method) - } - }) - end, - Broker = lists:foldl(Fun, #{}, ?CONN_TYPES), - EE = ee_example(Method), - maps:merge(Broker, EE). - --if(?EMQX_RELEASE_EDITION == ee). -ee_example(Method) -> - emqx_ee_connector:connector_examples(Method). --else. -ee_example(_Method) -> - #{}. --endif. - -info_example(Type, Method) -> - maps:merge( - info_example_basic(Type), - method_example(Type, Method) - ). - -method_example(Type, Method) when Method == get; Method == post -> - SType = atom_to_list(Type), - SName = "my_" ++ SType ++ "_connector", - #{ - type => bin(SType), - name => bin(SName) - }; -method_example(_Type, put) -> - #{}. - -info_example_basic(mqtt) -> - #{ - mode => cluster_shareload, - server => <<"127.0.0.1:1883">>, - reconnect_interval => <<"15s">>, - proto_ver => <<"v4">>, - username => <<"foo">>, - password => <<"bar">>, - clientid => <<"foo">>, - clean_start => true, - keepalive => <<"300s">>, - retry_interval => <<"15s">>, - max_inflight => 100, - ssl => #{ - enable => false - } - }. - -param_path_id() -> - [ - {id, - mk( - binary(), - #{ - in => path, - example => <<"mqtt:my_mqtt_connector">>, - desc => ?DESC("id") - } - )} - ]. - -schema("/connectors_test") -> - #{ - 'operationId' => '/connectors_test', - post => #{ - tags => [<<"connectors">>], - desc => ?DESC("conn_test_post"), - summary => <<"Test creating connector">>, - 'requestBody' => post_request_body_schema(), - responses => #{ - 204 => <<"Test connector OK">>, - 400 => error_schema(['TEST_FAILED'], "connector test failed") - } - } - }; -schema("/connectors") -> - #{ - 'operationId' => '/connectors', - get => #{ - tags => [<<"connectors">>], - desc => ?DESC("conn_get"), - summary => <<"List connectors">>, - responses => #{ - 200 => emqx_dashboard_swagger:schema_with_example( - array(emqx_connector_schema:get_response()), - connector_info_array_example(get) - ) - } - }, - post => #{ - tags => [<<"connectors">>], - desc => ?DESC("conn_post"), - summary => <<"Create connector">>, - 'requestBody' => post_request_body_schema(), - responses => #{ - 201 => get_response_body_schema(), - 400 => error_schema(['ALREADY_EXISTS'], "connector already exists") - } - } - }; -schema("/connectors/:id") -> - #{ - 'operationId' => '/connectors/:id', - get => #{ - tags => [<<"connectors">>], - desc => ?DESC("conn_id_get"), - summary => <<"Get connector">>, - parameters => param_path_id(), - responses => #{ - 200 => get_response_body_schema(), - 404 => error_schema(['NOT_FOUND'], "Connector not found"), - 400 => error_schema(['INVALID_ID'], "Bad connector ID") - } - }, - put => #{ - tags => [<<"connectors">>], - desc => ?DESC("conn_id_put"), - summary => <<"Update connector">>, - parameters => param_path_id(), - 'requestBody' => put_request_body_schema(), - responses => #{ - 200 => get_response_body_schema(), - 404 => error_schema(['NOT_FOUND'], "Connector not found"), - 400 => error_schema(['INVALID_ID'], "Bad connector ID") - } - }, - delete => #{ - tags => [<<"connectors">>], - desc => ?DESC("conn_id_delete"), - summary => <<"Delete connector">>, - parameters => param_path_id(), - responses => #{ - 204 => <<"Delete connector successfully">>, - 403 => error_schema(['DEPENDENCY_EXISTS'], "Cannot remove dependent connector"), - 404 => error_schema(['NOT_FOUND'], "Delete failed, not found"), - 400 => error_schema(['INVALID_ID'], "Bad connector ID") - } - } - }. - -'/connectors_test'(post, #{body := #{<<"type">> := ConnType} = Params}) -> - case emqx_connector:create_dry_run(ConnType, maps:remove(<<"type">>, Params)) of - ok -> - {204}; - {error, Error} -> - {400, error_msg(['TEST_FAILED'], Error)} - end. - -'/connectors'(get, _Request) -> - {200, [format_resp(Conn) || Conn <- emqx_connector:list_raw()]}; -'/connectors'(post, #{body := #{<<"type">> := ConnType, <<"name">> := ConnName} = Params}) -> - case emqx_connector:lookup_raw(ConnType, ConnName) of - {ok, _} -> - {400, error_msg('ALREADY_EXISTS', <<"connector already exists">>)}; - {error, not_found} -> - case - emqx_connector:update( - ConnType, - ConnName, - filter_out_request_body(Params) - ) - of - {ok, #{raw_config := RawConf}} -> - {201, - format_resp(RawConf#{ - <<"type">> => ConnType, - <<"name">> => ConnName - })}; - {error, Error} -> - {400, error_msg('BAD_REQUEST', Error)} - end - end; -'/connectors'(post, _) -> - {400, error_msg('BAD_REQUEST', <<"missing some required fields: [name, type]">>)}. - -'/connectors/:id'(get, #{bindings := #{id := Id}}) -> - ?TRY_PARSE_ID( - Id, - case emqx_connector:lookup_raw(ConnType, ConnName) of - {ok, Conf} -> - {200, format_resp(Conf)}; - {error, not_found} -> - {404, error_msg('NOT_FOUND', <<"connector not found">>)} - end - ); -'/connectors/:id'(put, #{bindings := #{id := Id}, body := Params0}) -> - Params = filter_out_request_body(Params0), - ?TRY_PARSE_ID( - Id, - case emqx_connector:lookup_raw(ConnType, ConnName) of - {ok, _} -> - case emqx_connector:update(ConnType, ConnName, Params) of - {ok, #{raw_config := RawConf}} -> - {200, - format_resp(RawConf#{ - <<"type">> => ConnType, - <<"name">> => ConnName - })}; - {error, Error} -> - {500, error_msg('INTERNAL_ERROR', Error)} - end; - {error, not_found} -> - {404, error_msg('NOT_FOUND', <<"connector not found">>)} - end - ); -'/connectors/:id'(delete, #{bindings := #{id := Id}}) -> - ?TRY_PARSE_ID( - Id, - case emqx_connector:lookup_raw(ConnType, ConnName) of - {ok, _} -> - case emqx_connector:delete(ConnType, ConnName) of - {ok, _} -> - {204}; - {error, {post_config_update, _, {dependency_bridges_exist, BridgeID}}} -> - {403, - error_msg( - 'DEPENDENCY_EXISTS', - <<"Cannot remove the connector as it's in use by a bridge: ", - BridgeID/binary>> - )}; - {error, Error} -> - {500, error_msg('INTERNAL_ERROR', Error)} - end; - {error, not_found} -> - {404, error_msg('NOT_FOUND', <<"connector not found">>)} - end - ). - -error_msg(Code, Msg) -> - #{code => Code, message => emqx_misc:readable_error_msg(Msg)}. - -format_resp(#{<<"type">> := ConnType, <<"name">> := ConnName} = RawConf) -> - NumOfBridges = length( - emqx_bridge:list_bridges_by_connector( - emqx_connector:connector_id(ConnType, ConnName) - ) - ), - RawConf#{ - <<"type">> => ConnType, - <<"name">> => ConnName, - <<"num_of_bridges">> => NumOfBridges - }. - -filter_out_request_body(Conf) -> - ExtraConfs = [<<"clientid">>, <<"num_of_bridges">>, <<"type">>, <<"name">>], - maps:without(ExtraConfs, Conf). - -bin(S) when is_list(S) -> - list_to_binary(S). diff --git a/apps/emqx_connector/src/emqx_connector_app.erl b/apps/emqx_connector/src/emqx_connector_app.erl index b6f5b8623..62167dc18 100644 --- a/apps/emqx_connector/src/emqx_connector_app.erl +++ b/apps/emqx_connector/src/emqx_connector_app.erl @@ -20,15 +20,10 @@ -export([start/2, stop/1]). --define(CONF_HDLR_PATH, (emqx_connector:config_key_path() ++ ['?', '?'])). - start(_StartType, _StartArgs) -> - ok = emqx_config_handler:add_handler(?CONF_HDLR_PATH, emqx_connector), - emqx_connector_mqtt_worker:register_metrics(), emqx_connector_sup:start_link(). stop(_State) -> - emqx_config_handler:remove_handler(?CONF_HDLR_PATH), ok. %% internal functions diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index e37f6a9a2..b57a94b36 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -67,7 +67,7 @@ fields("get") -> )} ] ++ fields("post"); fields("put") -> - emqx_connector_mqtt_schema:fields("connector"); + emqx_connector_mqtt_schema:fields("server_configs"); fields("post") -> [ {type, @@ -236,11 +236,9 @@ basic_config(#{ keepalive := KeepAlive, retry_interval := RetryIntv, max_inflight := MaxInflight, - replayq := ReplayQ, ssl := #{enable := EnableSsl} = Ssl }) -> #{ - replayq => ReplayQ, %% connection opts server => Server, %% 30s diff --git a/apps/emqx_connector/src/emqx_connector_schema.erl b/apps/emqx_connector/src/emqx_connector_schema.erl deleted file mode 100644 index f0c9479de..000000000 --- a/apps/emqx_connector/src/emqx_connector_schema.erl +++ /dev/null @@ -1,95 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- --module(emqx_connector_schema). - --behaviour(hocon_schema). - --include_lib("typerefl/include/types.hrl"). --include_lib("hocon/include/hoconsc.hrl"). - --export([namespace/0, roots/0, fields/1, desc/1]). - --export([ - get_response/0, - put_request/0, - post_request/0 -]). - -%% the config for webhook bridges do not need connectors --define(CONN_TYPES, [mqtt]). - -%%====================================================================================== -%% For HTTP APIs - -get_response() -> - http_schema("get"). - -put_request() -> - http_schema("put"). - -post_request() -> - http_schema("post"). - -http_schema(Method) -> - Broker = [?R_REF(schema_mod(Type), Method) || Type <- ?CONN_TYPES], - EE = ee_schemas(Method), - Schemas = Broker ++ EE, - ?UNION(Schemas). - -%%====================================================================================== -%% Hocon Schema Definitions - -namespace() -> connector. - -roots() -> ["connectors"]. - -fields(connectors) -> - fields("connectors"); -fields("connectors") -> - Broker = [ - {mqtt, - ?HOCON( - ?MAP(name, ?R_REF(emqx_connector_mqtt_schema, "connector")), - #{desc => ?DESC("mqtt")} - )} - ], - EE = ee_fields_connectors(), - Broker ++ EE. - --if(?EMQX_RELEASE_EDITION == ee). -ee_schemas(Method) -> - emqx_ee_connector:api_schemas(Method). - -ee_fields_connectors() -> - emqx_ee_connector:fields(connectors). --else. -ee_fields_connectors() -> - []. - -ee_schemas(_) -> - []. --endif. - -desc(Record) when - Record =:= connectors; - Record =:= "connectors" --> - ?DESC("desc_connector"); -desc(_) -> - undefined. - -schema_mod(Type) -> - list_to_atom(lists:concat(["emqx_connector_", Type])). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index d0251104b..372405b59 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -207,7 +207,7 @@ make_hdlr(Parent, Vars, Opts) -> sub_remote_topics(_ClientPid, undefined) -> ok; -sub_remote_topics(ClientPid, #{remote_topic := FromTopic, remote_qos := QoS}) -> +sub_remote_topics(ClientPid, #{remote := #{topic := FromTopic, qos := QoS}}) -> case emqtt:subscribe(ClientPid, FromTopic, QoS) of {ok, _, _} -> ok; Error -> throw(Error) @@ -217,12 +217,10 @@ process_config(Config) -> maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config). maybe_publish_to_local_broker(Msg, Vars, Props) -> - case maps:get(local_topic, Vars, undefined) of - undefined -> - %% local topic is not set, discard it - ok; - _ -> - _ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars, Props)) + case emqx_map_lib:deep_get([local, topic], Vars, undefined) of + %% local topic is not set, discard it + undefined -> ok; + _ -> emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars, Props)) end. format_msg_received( diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl index 43700506b..7198521f2 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -38,14 +38,16 @@ -type msg() :: emqx_types:message(). -type exp_msg() :: emqx_types:message() | #mqtt_msg{}. - --type variables() :: #{ - mountpoint := undefined | binary(), - remote_topic := binary(), - remote_qos := original | integer(), +-type remote_config() :: #{ + topic := binary(), + qos := original | integer(), retain := original | boolean(), payload := binary() }. +-type variables() :: #{ + mountpoint := undefined | binary(), + remote := remote_config() +}. make_pub_vars(_, undefined) -> undefined; @@ -67,10 +69,12 @@ to_remote_msg(#message{flags = Flags0} = Msg, Vars) -> MapMsg = maps:put(retain, Retain0, Columns), to_remote_msg(MapMsg, Vars); to_remote_msg(MapMsg, #{ - remote_topic := TopicToken, - payload := PayloadToken, - remote_qos := QoSToken, - retain := RetainToken, + remote := #{ + topic := TopicToken, + payload := PayloadToken, + qos := QoSToken, + retain := RetainToken + }, mountpoint := Mountpoint }) when is_map(MapMsg) -> Topic = replace_vars_in_str(TopicToken, MapMsg), @@ -91,11 +95,13 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) -> to_broker_msg( #{dup := Dup} = MapMsg, #{ - local_topic := TopicToken, - payload := PayloadToken, - local_qos := QoSToken, - retain := RetainToken, - mountpoint := Mountpoint + local := #{ + topic := TopicToken, + payload := PayloadToken, + qos := QoSToken, + retain := RetainToken, + mountpoint := Mountpoint + } }, Props ) -> diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index 4d35583a3..040e5f392 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -28,25 +28,33 @@ desc/1 ]). --export([ - ingress_desc/0, - egress_desc/0 -]). - -import(emqx_schema, [mk_duration/2]). +-import(hoconsc, [mk/2, ref/2]). + namespace() -> "connector-mqtt". roots() -> fields("config"). fields("config") -> - fields("connector") ++ - topic_mappings(); -fields("connector") -> + fields("server_configs") ++ + [ + {ingress, + mk( + ref(?MODULE, "ingress"), + #{default => #{}} + )}, + {egress, + mk( + ref(?MODULE, "egress"), + #{default => #{}} + )} + ]; +fields("server_configs") -> [ {mode, - sc( + mk( hoconsc:enum([cluster_shareload]), #{ default => cluster_shareload, @@ -54,7 +62,7 @@ fields("connector") -> } )}, {server, - sc( + mk( emqx_schema:ip_port(), #{ required => true, @@ -68,7 +76,7 @@ fields("connector") -> #{default => "15s"} )}, {proto_ver, - sc( + mk( hoconsc:enum([v3, v4, v5]), #{ default => v4, @@ -76,7 +84,7 @@ fields("connector") -> } )}, {bridge_mode, - sc( + mk( boolean(), #{ default => false, @@ -84,7 +92,7 @@ fields("connector") -> } )}, {username, - sc( + mk( binary(), #{ default => "emqx", @@ -92,7 +100,7 @@ fields("connector") -> } )}, {password, - sc( + mk( binary(), #{ default => "emqx", @@ -101,7 +109,7 @@ fields("connector") -> } )}, {clean_start, - sc( + mk( boolean(), #{ default => true, @@ -116,20 +124,31 @@ fields("connector") -> #{default => "15s"} )}, {max_inflight, - sc( + mk( non_neg_integer(), #{ default => 32, desc => ?DESC("max_inflight") } - )}, - {replayq, sc(ref("replayq"), #{})} + )} ] ++ emqx_connector_schema_lib:ssl_fields(); fields("ingress") -> - %% the message maybe subscribed by rules, in this case 'local_topic' is not necessary [ - {remote_topic, - sc( + {"remote", + mk( + ref(?MODULE, "ingress_remote"), + #{desc => ?DESC(emqx_connector_mqtt_schema, "ingress_remote")} + )}, + {"local", + mk( + ref(?MODULE, "ingress_local"), + #{desc => ?DESC(emqx_connector_mqtt_schema, "ingress_local")} + )} + ]; +fields("ingress_remote") -> + [ + {topic, + mk( binary(), #{ required => true, @@ -137,47 +156,43 @@ fields("ingress") -> desc => ?DESC("ingress_remote_topic") } )}, - {remote_qos, - sc( + {qos, + mk( qos(), #{ default => 1, desc => ?DESC("ingress_remote_qos") } - )}, - {local_topic, - sc( + )} + ]; +fields("ingress_local") -> + [ + {topic, + mk( binary(), #{ validator => fun emqx_schema:non_empty_string/1, desc => ?DESC("ingress_local_topic") } )}, - {local_qos, - sc( + {qos, + mk( qos(), #{ default => <<"${qos}">>, desc => ?DESC("ingress_local_qos") } )}, - {hookpoint, - sc( - binary(), - #{desc => ?DESC("ingress_hookpoint")} - )}, - {retain, - sc( + mk( hoconsc:union([boolean(), binary()]), #{ default => <<"${retain}">>, desc => ?DESC("retain") } )}, - {payload, - sc( + mk( binary(), #{ default => <<"${payload}">>, @@ -186,18 +201,33 @@ fields("ingress") -> )} ]; fields("egress") -> - %% the message maybe sent from rules, in this case 'local_topic' is not necessary [ - {local_topic, - sc( + {"local", + mk( + ref(?MODULE, "egress_local"), + #{desc => ?DESC(emqx_connector_mqtt_schema, "egress_local")} + )}, + {"remote", + mk( + ref(?MODULE, "egress_remote"), + #{desc => ?DESC(emqx_connector_mqtt_schema, "egress_remote")} + )} + ]; +fields("egress_local") -> + [ + {topic, + mk( binary(), #{ desc => ?DESC("egress_local_topic"), validator => fun emqx_schema:non_empty_string/1 } - )}, - {remote_topic, - sc( + )} + ]; +fields("egress_remote") -> + [ + {topic, + mk( binary(), #{ required => true, @@ -205,104 +235,48 @@ fields("egress") -> desc => ?DESC("egress_remote_topic") } )}, - {remote_qos, - sc( + {qos, + mk( qos(), #{ required => true, desc => ?DESC("egress_remote_qos") } )}, - {retain, - sc( + mk( hoconsc:union([boolean(), binary()]), #{ required => true, desc => ?DESC("retain") } )}, - {payload, - sc( + mk( binary(), #{ required => true, desc => ?DESC("payload") } )} - ]; -fields("replayq") -> - [ - {dir, - sc( - hoconsc:union([boolean(), string()]), - #{desc => ?DESC("dir")} - )}, - {seg_bytes, - sc( - emqx_schema:bytesize(), - #{ - default => "100MB", - desc => ?DESC("seg_bytes") - } - )}, - {offload, - sc( - boolean(), - #{ - default => false, - desc => ?DESC("offload") - } - )} ]. -desc("connector") -> - ?DESC("desc_connector"); +desc("server_configs") -> + ?DESC("server_configs"); desc("ingress") -> - ingress_desc(); + ?DESC("ingress_desc"); +desc("ingress_remote") -> + ?DESC("ingress_remote"); +desc("ingress_local") -> + ?DESC("ingress_local"); desc("egress") -> - egress_desc(); -desc("replayq") -> - ?DESC("desc_replayq"); + ?DESC("egress_desc"); +desc("egress_remote") -> + ?DESC("egress_remote"); +desc("egress_local") -> + ?DESC("egress_local"); desc(_) -> undefined. -topic_mappings() -> - [ - {ingress, - sc( - ref("ingress"), - #{default => #{}} - )}, - {egress, - sc( - ref("egress"), - #{default => #{}} - )} - ]. - -ingress_desc() -> - "\n" - "The ingress config defines how this bridge receive messages from the remote MQTT broker, and then\n" - "send them to the local broker.
\n" - "Template with variables is allowed in 'local_topic', 'remote_qos', 'qos', 'retain',\n" - "'payload'.
\n" - "NOTE: if this bridge is used as the input of a rule (emqx rule engine), and also local_topic is\n" - "configured, then messages got from the remote broker will be sent to both the 'local_topic' and\n" - "the rule.\n". - -egress_desc() -> - "\n" - "The egress config defines how this bridge forwards messages from the local broker to the remote\n" - "broker.
\n" - "Template with variables is allowed in 'remote_topic', 'qos', 'retain', 'payload'.
\n" - "NOTE: if this bridge is used as the action of a rule (emqx rule engine), and also local_topic\n" - "is configured, then both the data got from the rule and the MQTT messages that matches\n" - "local_topic will be forwarded.\n". - qos() -> hoconsc:union([emqx_schema:qos(), binary()]). - -sc(Type, Meta) -> hoconsc:mk(Type, Meta). -ref(Field) -> hoconsc:ref(?MODULE, Field). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index db795a4cf..af5f5d3e7 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -68,7 +68,6 @@ %% APIs -export([ start_link/1, - register_metrics/0, stop/1 ]). @@ -247,18 +246,19 @@ pre_process_opts(#{subscriptions := InConf, forwards := OutConf} = ConnectOpts) pre_process_in_out(_, undefined) -> undefined; +pre_process_in_out(in, #{local := LC} = Conf) when is_map(Conf) -> + Conf#{local => pre_process_in_out_common(LC)}; pre_process_in_out(in, Conf) when is_map(Conf) -> - Conf1 = pre_process_conf(local_topic, Conf), - Conf2 = pre_process_conf(local_qos, Conf1), - pre_process_in_out_common(Conf2); -pre_process_in_out(out, Conf) when is_map(Conf) -> - Conf1 = pre_process_conf(remote_topic, Conf), - Conf2 = pre_process_conf(remote_qos, Conf1), - pre_process_in_out_common(Conf2). + %% have no 'local' field in the config + Conf; +pre_process_in_out(out, #{remote := RC} = Conf) when is_map(Conf) -> + Conf#{remote => pre_process_in_out_common(RC)}. -pre_process_in_out_common(Conf) -> - Conf1 = pre_process_conf(payload, Conf), - pre_process_conf(retain, Conf1). +pre_process_in_out_common(Conf0) -> + Conf1 = pre_process_conf(topic, Conf0), + Conf2 = pre_process_conf(qos, Conf1), + Conf3 = pre_process_conf(payload, Conf2), + pre_process_conf(retain, Conf3). pre_process_conf(Key, Conf) -> case maps:find(Key, Conf) of @@ -450,7 +450,6 @@ do_send( ) -> Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards), ExportMsg = fun(Message) -> - emqx_metrics:inc('bridge.mqtt.message_sent_to_remote'), emqx_connector_mqtt_msg:to_remote_msg(Message, Vars) end, ?SLOG(debug, #{ @@ -551,15 +550,6 @@ format_mountpoint(Prefix) -> name(Id) -> list_to_atom(str(Id)). -register_metrics() -> - lists:foreach( - fun emqx_metrics:ensure/1, - [ - 'bridge.mqtt.message_sent_to_remote', - 'bridge.mqtt.message_received_from_remote' - ] - ). - obfuscate(Map) -> maps:fold( fun(K, V, Acc) -> diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index ebc06d211..ad1dfaee8 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -7,7 +7,7 @@ -export([ api_schemas/1, - conn_bridge_examples/1, + examples/1, resource_type/1, fields/1 ]). @@ -28,7 +28,7 @@ schema_modules() -> emqx_ee_bridge_mysql ]. -conn_bridge_examples(Method) -> +examples(Method) -> MergeFun = fun(Example, Examples) -> maps:merge(Examples, Example) diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl deleted file mode 100644 index 6846ea740..000000000 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl +++ /dev/null @@ -1,57 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- --module(emqx_ee_connector). - --import(hoconsc, [mk/2, enum/1, ref/2]). - --export([ - fields/1, - connector_examples/1, - api_schemas/1 -]). - -api_schemas(Method) -> - [ - ref(emqx_ee_connector_hstreamdb, Method), - ref(emqx_ee_connector_influxdb, "udp_" ++ Method), - ref(emqx_ee_connector_influxdb, "api_v1_" ++ Method), - ref(emqx_ee_connector_influxdb, "api_v2_" ++ Method) - ]. - -fields(connectors) -> - [ - {hstreamdb, - mk( - hoconsc:map(name, ref(emqx_ee_connector_hstreamdb, config)), - #{desc => <<"EMQX Enterprise Config">>} - )} - ] ++ fields(influxdb); -fields(influxdb) -> - [ - { - Protocol, - mk(hoconsc:map(name, ref(emqx_ee_connector_influxdb, Protocol)), #{ - desc => <<"EMQX Enterprise Config">> - }) - } - || Protocol <- [influxdb_udp, influxdb_api_v1, influxdb_api_v2] - ]. - -connector_examples(Method) -> - MergeFun = - fun(Example, Examples) -> - maps:merge(Examples, Example) - end, - Fun = - fun(Module, Examples) -> - ConnectorExamples = erlang:apply(Module, connector_examples, [Method]), - lists:foldl(MergeFun, Examples, ConnectorExamples) - end, - lists:foldl(Fun, #{}, schema_modules()). - -schema_modules() -> - [ - emqx_ee_connector_hstreamdb, - emqx_ee_connector_influxdb - ].