From a4992ef1b52b722b424bae8fe61aa8e22baf0686 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Thu, 28 Jul 2022 16:47:30 +0800 Subject: [PATCH 1/2] fix: hstreamdb connector conf & api --- apps/emqx_bridge/src/emqx_bridge_resource.erl | 4 +- .../emqx_connector/src/emqx_connector_api.erl | 16 +++- .../src/emqx_connector_schema.erl | 24 +++++- .../i18n/emqx_ee_bridge_hstream.conf | 10 +++ lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl | 6 +- .../src/emqx_ee_bridge_hstream.erl | 40 +++++----- .../i18n/emqx_ee_connector_hstream.conf | 21 +++++ .../src/emqx_ee_connector.erl | 32 ++++++++ .../src/emqx_ee_connector_hstream.erl | 79 ++++++++++++++----- 9 files changed, 179 insertions(+), 53 deletions(-) create mode 100644 lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index cbd337970..6bb30e6b6 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -287,9 +287,7 @@ parse_confs(Type, Name, #{connector := ConnectorConfs, direction := Direction} = maps:without([connector, direction], Conf), Type, Name - ); -parse_confs(_Type, _Name, Conf) -> - Conf. + ). make_resource_confs(ingress, ConnectorConfs, BridgeConf, Type, Name) -> BName = bridge_id(Type, Name), diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index 18409fafd..8dcd3a4aa 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -80,7 +80,7 @@ connector_info_array_example(Method) -> [Config || #{value := Config} <- maps:values(connector_info_examples(Method))]. connector_info_examples(Method) -> - lists:foldl( + Fun = fun(Type, Acc) -> SType = atom_to_list(Type), maps:merge(Acc, #{ @@ -90,9 +90,17 @@ connector_info_examples(Method) -> } }) end, - #{}, - ?CONN_TYPES - ). + Broker = lists:foldl(Fun, #{}, ?CONN_TYPES), + EE = ee_example(Method), + maps:merge(Broker, EE). + +-if(?EMQX_RELEASE_EDITION == ee). +ee_example(Method) -> + emqx_ee_connector:connector_examples(Method). +-else. +ee_example(_Method) -> + #{}. +-endif. info_example(Type, Method) -> maps:merge( diff --git a/apps/emqx_connector/src/emqx_connector_schema.erl b/apps/emqx_connector/src/emqx_connector_schema.erl index b0f20924f..65d51443b 100644 --- a/apps/emqx_connector/src/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/emqx_connector_schema.erl @@ -44,7 +44,9 @@ post_request() -> http_schema("post"). http_schema(Method) -> - Schemas = [?R_REF(schema_mod(Type), Method) || Type <- ?CONN_TYPES], + Broker = [?R_REF(schema_mod(Type), Method) || Type <- ?CONN_TYPES], + EE = [?R_REF(Module, Method) || Module <- schema_modules()], + Schemas = Broker ++ EE, ?UNION(Schemas). %%====================================================================================== @@ -57,13 +59,29 @@ roots() -> ["connectors"]. fields(connectors) -> fields("connectors"); fields("connectors") -> - [ + Broker = [ {mqtt, ?HOCON( ?MAP(name, ?R_REF(emqx_connector_mqtt_schema, "connector")), #{desc => ?DESC("mqtt")} )} - ]. + ], + EE = ee_fields_connectors(), + Broker ++ EE. + +-if(?EMQX_RELEASE_EDITION == ee). +schema_modules() -> + emqx_ee_connector:schema_modules(). + +ee_fields_connectors() -> + emqx_ee_connector:fields(connectors). +-else. +ee_fields_connectors() -> + []. + +schema_modules() -> + []. +-endif. desc(Record) when Record =:= connectors; diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstream.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstream.conf index a110dbf27..20b189293 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstream.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstream.conf @@ -81,4 +81,14 @@ will be forwarded. zh: "Bridge 名字" } } + desc_connector { + desc { + en: """Generic configuration for the connector.""" + zh: """连接器的通用配置。""" + } + label: { + en: "Connector Generic Configuration" + zh: "连接器通用配置。" + } + } } diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index 7f051a158..0ed8ee1fe 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -23,12 +23,12 @@ conn_bridge_examples(Method) -> end, lists:foldl(Fun, #{}, schema_modules()). -resource_type(hstream) -> emqx_ee_connector_hstream; -resource_type(<<"hstream">>) -> emqx_ee_connector_hstream. +resource_type(hstreamdb) -> emqx_ee_connector_hstream; +resource_type(<<"hstreamdb">>) -> emqx_ee_connector_hstream. fields(bridges) -> [ - {hstream, + {hstreamdb, mk( hoconsc:map(name, ref(emqx_ee_bridge_hstream, "config")), #{desc => <<"EMQX Enterprise Config">>} diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl index 56bac456e..73f7a20eb 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl @@ -7,7 +7,7 @@ -include_lib("hocon/include/hoconsc.hrl"). -include("emqx_ee_bridge.hrl"). --import(hoconsc, [mk/2, enum/1]). +-import(hoconsc, [mk/2, enum/1, ref/2]). -export([ conn_bridge_example/1 @@ -25,7 +25,7 @@ conn_bridge_example(Method) -> #{ - <<"hstream">> => #{ + <<"hstreamdb">> => #{ summary => <<"HStreamDB Bridge">>, value => values(Method) } @@ -35,12 +35,9 @@ values(get) -> maps:merge(values(post), ?METRICS_EXAMPLE); values(post) -> #{ - type => hstream, + type => hstreamdb, name => <<"demo">>, - url => <<"http://127.0.0.1:6570">>, - stream => <<"stream1">>, - ordering_key => <<"some_key">>, - pool_size => 8, + connector => <<"hstreamdb:connector">>, enable => true, direction => egress, local_topic => <<"local/topic/#">>, @@ -56,11 +53,13 @@ namespace() -> "bridge". roots() -> []. fields("config") -> - ExtConfig = [ + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})}, {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, - {payload, mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("payload")})} - ], - basic_config() ++ ExtConfig; + {payload, mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("payload")})}, + {connector, field(connector)} + ]; fields("post") -> [type_field(), name_field() | fields("config")]; fields("put") -> @@ -68,6 +67,16 @@ fields("put") -> fields("get") -> emqx_bridge_schema:metrics_status_fields() ++ fields("post"). +field(connector) -> + mk( + hoconsc:union([binary(), ref(emqx_ee_connector_hstream, config)]), + #{ + required => true, + example => <<"hstreamdb:demo">>, + desc => ?DESC("desc_connector") + } + ). + desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> @@ -75,17 +84,10 @@ desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(_) -> undefined. -basic_config() -> - Basic = [ - {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, - {direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})} - ], - emqx_ee_connector_hstream:fields(config) ++ Basic. - %% ------------------------------------------------------------------------------------------------- %% internal type_field() -> - {type, mk(enum([hstream]), #{required => true, desc => ?DESC("desc_type")})}. + {type, mk(enum([hstreamdb]), #{required => true, desc => ?DESC("desc_type")})}. name_field() -> {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstream.conf b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstream.conf index fce00baed..af9eda92a 100644 --- a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstream.conf +++ b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstream.conf @@ -1,5 +1,26 @@ emqx_ee_connector_hstream { + type { + desc { + en: "The Connector Type." + zh: "连接器类型。" + } + label: { + en: "Connector Type" + zh: "连接器类型" + } + } + + name { + desc { + en: "Connector name, used as a human-readable description of the connector." + zh: "连接器名称,人类可读的连接器描述。" + } + label: { + en: "Connector Name" + zh: "连接器名称" + } + } url { desc { en: """HStream Server URL""" diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl new file mode 100644 index 000000000..7d175f0eb --- /dev/null +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl @@ -0,0 +1,32 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_connector). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + schema_modules/0, + fields/1, + connector_examples/1 +]). + +schema_modules() -> + [emqx_ee_connector_hstream]. + +fields(connectors) -> + [ + {hstreamdb, + mk( + hoconsc:map(name, ref(emqx_ee_connector_hstream, config)), + #{desc => <<"EMQX Enterprise Config">>} + )} + ]. + +connector_examples(Method) -> + Fun = + fun(Module, Examples) -> + Example = erlang:apply(Module, connector_example, [Method]), + maps:merge(Examples, Example) + end, + lists:foldl(Fun, #{}, schema_modules()). diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl index fa8439851..00c7e6f61 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl @@ -25,7 +25,8 @@ -export([ roots/0, - fields/1 + fields/1, + connector_example/1 ]). %% ------------------------------------------------------------------------------------------------- @@ -38,7 +39,7 @@ on_stop(InstId, #{client := Client, producer := Producer}) -> StopClientRes = hstreamdb:stop_client(Client), StopProducerRes = hstreamdb:stop_producer(Producer), ?SLOG(info, #{ - msg => "stop hstream connector", + msg => "stop hstreamdb connector", connector => InstId, client => Client, producer => Producer, @@ -64,7 +65,7 @@ on_get_status(_InstId, #{client := Client}) -> end. %% ------------------------------------------------------------------------------------------------- -%% hstream batch callback +%% hstreamdb batch callback %% TODO: maybe remove it after disk cache is ready on_flush_result({{flush, _Stream, _Records}, {ok, _Resp}}) -> @@ -82,9 +83,41 @@ fields(config) -> [ {url, mk(binary(), #{required => true, desc => ?DESC("url")})}, {stream, mk(binary(), #{required => true, desc => ?DESC("stream_name")})}, - {ordering_key, mk(binary(), #{required => true, desc => ?DESC("ordering_key")})}, + {ordering_key, mk(binary(), #{required => false, desc => ?DESC("ordering_key")})}, {pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})} - ]. + ]; +fields("get") -> + fields("post"); +fields("put") -> + fields(config); +fields("post") -> + [ + {type, mk(hstreamdb, #{required => true, desc => ?DESC("type")})}, + {name, mk(binary(), #{required => true, desc => ?DESC("name")})} + ] ++ fields("put"). + +connector_example(Method) -> + #{ + <<"hstreamdb">> => #{ + summary => <<"HStreamDB Connector">>, + value => values(Method) + } + }. + +values(post) -> + maps:merge(values(put), #{name => <<"connector">>}); +values(get) -> + values(post); +values(put) -> + #{ + type => hstreamdb, + url => <<"http://127.0.0.1:6570">>, + stream => <<"stream1">>, + ordering_key => <<"some_key">>, + pool_size => 8 + }; +values(_) -> + #{}. %% ------------------------------------------------------------------------------------------------- %% internal functions @@ -94,7 +127,7 @@ start_client(InstId, Config) -> catch E:R:S -> ?SLOG(error, #{ - msg => "start hstream connector error", + msg => "start hstreamdb connector error", connector => InstId, error => E, reason => R, @@ -104,7 +137,7 @@ start_client(InstId, Config) -> do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) -> ?SLOG(info, #{ - msg => "starting hstream connector: client", + msg => "starting hstreamdb connector: client", connector => InstId, config => Config }), @@ -118,21 +151,21 @@ do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) -> case is_alive(Client) of true -> ?SLOG(info, #{ - msg => "hstream connector: client started", + msg => "hstreamdb connector: client started", connector => InstId, client => Client }), start_producer(InstId, Client, Config); _ -> ?SLOG(error, #{ - msg => "hstream connector: client not alive", + msg => "hstreamdb connector: client not alive", connector => InstId }), {error, connect_failed} end; {error, {already_started, Pid}} -> ?SLOG(info, #{ - msg => "starting hstream connector: client, find old client. restart client", + msg => "starting hstreamdb connector: client, find old client. restart client", old_client_pid => Pid, old_client_name => ClientName }), @@ -140,7 +173,7 @@ do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) -> start_client(InstId, Config); {error, Error} -> ?SLOG(error, #{ - msg => "hstream connector: client failed", + msg => "hstreamdb connector: client failed", connector => InstId, reason => Error }), @@ -155,7 +188,11 @@ is_alive(Client) -> false end. -start_producer(InstId, Client, Options = #{stream := Stream, pool_size := PoolSize}) -> +start_producer( + InstId, + Client, + Options = #{stream := Stream, pool_size := PoolSize, egress := #{payload := PayloadBin}} +) -> %% TODO: change these batch options after we have better disk cache. BatchSize = maps:get(batch_size, Options, 100), Interval = maps:get(batch_interval, Options, 1000), @@ -168,16 +205,15 @@ start_producer(InstId, Client, Options = #{stream := Stream, pool_size := PoolSi ], Name = produce_name(InstId), ?SLOG(info, #{ - msg => "starting hstream connector: producer", + msg => "starting hstreamdb connector: producer", connector => InstId }), case hstreamdb:start_producer(Client, Name, ProducerOptions) of {ok, Producer} -> ?SLOG(info, #{ - msg => "hstream connector: producer started" + msg => "hstreamdb connector: producer started" }), EnableBatch = maps:get(enable_batch, Options, false), - PayloadBin = maps:get(payload, Options, <<"">>), Payload = emqx_plugin_libs_rule:preproc_tmpl(PayloadBin), OrderingKeyBin = maps:get(ordering_key, Options, <<"">>), OrderingKey = emqx_plugin_libs_rule:preproc_tmpl(OrderingKeyBin), @@ -191,7 +227,8 @@ start_producer(InstId, Client, Options = #{stream := Stream, pool_size := PoolSi {ok, State}; {error, {already_started, Pid}} -> ?SLOG(info, #{ - msg => "starting hstream connector: producer, find old producer. restart producer", + msg => + "starting hstreamdb connector: producer, find old producer. restart producer", old_producer_pid => Pid, old_producer_name => Name }), @@ -199,7 +236,7 @@ start_producer(InstId, Client, Options = #{stream := Stream, pool_size := PoolSi start_producer(InstId, Client, Options); {error, Reason} -> ?SLOG(error, #{ - msg => "starting hstream connector: producer, failed", + msg => "starting hstreamdb connector: producer, failed", reason => Reason }), {error, Reason} @@ -223,13 +260,13 @@ do_append(AfterQuery, Producer, Record) -> % case hstreamdb:append(Producer, Record) of % ok -> % ?SLOG(debug, #{ -% msg => "hstream producer async append success", +% msg => "hstreamdb producer async append success", % record => Record % }), % emqx_resource:query_success(AfterQuery); % {error, Reason} -> % ?SLOG(error, #{ -% msg => "hstream producer async append failed", +% msg => "hstreamdb producer async append failed", % reason => Reason, % record => Record % }), @@ -241,13 +278,13 @@ do_append(AfterQuery, false, Producer, Record) -> case hstreamdb:append_flush(Producer, Record) of {ok, _} -> ?SLOG(debug, #{ - msg => "hstream producer sync append success", + msg => "hstreamdb producer sync append success", record => Record }), emqx_resource:query_success(AfterQuery); {error, Reason} -> ?SLOG(error, #{ - msg => "hstream producer sync append failed", + msg => "hstreamdb producer sync append failed", reason => Reason, record => Record }), From b7c245c5b0074b607456dc839503c69d8d2ca2cf Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Thu, 28 Jul 2022 16:50:34 +0800 Subject: [PATCH 2/2] fix: hstreamdb zh docs --- .../i18n/emqx_ee_bridge_hstream.conf | 18 +++++------ .../i18n/emqx_ee_connector_hstream.conf | 32 +++++++++---------- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstream.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstream.conf index 20b189293..ad07ad377 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstream.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstream.conf @@ -31,32 +31,32 @@ will be forwarded. config_enable { desc { en: """Enable or disable this bridge""" - zh: """启用/禁用 Bridge""" + zh: """启用/禁用桥接""" } label { en: "Enable Or Disable Bridge" - zh: "启用/禁用 Bridge" + zh: "启用/禁用桥接" } } config_direction { desc { en: """The direction of this bridge, MUST be 'egress'""" - zh: """Bridge 的方向, 必须是 egress""" + zh: """桥接的方向, 必须是 egress""" } label { en: "Bridge Direction" - zh: "Bridge 方向" + zh: "桥接方向" } } desc_config { desc { en: """Configuration for an HStreamDB bridge.""" - zh: """HStreamDB Bridge 配置""" + zh: """HStreamDB 桥接配置""" } label: { en: "HStreamDB Bridge Configuration" - zh: "HStreamDB Bridge 配置" + zh: "HStreamDB 桥接配置" } } @@ -67,18 +67,18 @@ will be forwarded. } label { en: "Bridge Type" - zh: "Bridge 类型" + zh: "桥接类型" } } desc_name { desc { en: """Bridge name, used as a human-readable description of the bridge.""" - zh: """Bridge 名字,Bridge 的可读描述""" + zh: """桥接名字,可读描述""" } label { en: "Bridge Name" - zh: "Bridge 名字" + zh: "桥接名字" } } desc_connector { diff --git a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstream.conf b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstream.conf index af9eda92a..080076065 100644 --- a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstream.conf +++ b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstream.conf @@ -23,42 +23,42 @@ emqx_ee_connector_hstream { } url { desc { - en: """HStream Server URL""" - zh: """HStream 服务器 URL""" + en: """HStreamDB Server URL""" + zh: """HStreamDB 服务器 URL""" } label { - en: """HStream Server URL""" - zh: """HStream 服务器 URL""" + en: """HStreamDB Server URL""" + zh: """HStreamDB 服务器 URL""" } } stream_name { desc { - en: """HStream Stream Name""" - zh: """HStream 流名称""" + en: """HStreamDB Stream Name""" + zh: """HStreamDB 流名称""" } label { - en: """HStream Stream Name""" - zh: """HStream 流名称""" + en: """HStreamDB Stream Name""" + zh: """HStreamDB 流名称""" } } ordering_key { desc { - en: """HStream Ordering Key""" - zh: """HStream 分区键""" + en: """HStreamDB Ordering Key""" + zh: """HStreamDB 分区键""" } label { - en: """HStream Ordering Key""" - zh: """HStream 分区键""" + en: """HStreamDB Ordering Key""" + zh: """HStreamDB 分区键""" } } pool_size { desc { - en: """HStream Pool Size""" - zh: """HStream 连接池大小""" + en: """HStreamDB Pool Size""" + zh: """HStreamDB 连接池大小""" } label { - en: """HStream Pool Size""" - zh: """HStream 连接池大小""" + en: """HStreamDB Pool Size""" + zh: """HStreamDB 连接池大小""" } } }