Merge pull request #8595 from DDDHuang/hstream_connector

fix: hstreamdb connector conf & api
This commit is contained in:
DDDHuang 2022-07-28 17:22:25 +08:00 committed by GitHub
commit c4f76a4e4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 204 additions and 78 deletions

View File

@ -287,9 +287,7 @@ parse_confs(Type, Name, #{connector := ConnectorConfs, direction := Direction} =
maps:without([connector, direction], Conf),
Type,
Name
);
parse_confs(_Type, _Name, Conf) ->
Conf.
).
make_resource_confs(ingress, ConnectorConfs, BridgeConf, Type, Name) ->
BName = bridge_id(Type, Name),

View File

@ -80,7 +80,7 @@ connector_info_array_example(Method) ->
[Config || #{value := Config} <- maps:values(connector_info_examples(Method))].
connector_info_examples(Method) ->
lists:foldl(
Fun =
fun(Type, Acc) ->
SType = atom_to_list(Type),
maps:merge(Acc, #{
@ -90,9 +90,17 @@ connector_info_examples(Method) ->
}
})
end,
#{},
?CONN_TYPES
).
Broker = lists:foldl(Fun, #{}, ?CONN_TYPES),
EE = ee_example(Method),
maps:merge(Broker, EE).
-if(?EMQX_RELEASE_EDITION == ee).
ee_example(Method) ->
emqx_ee_connector:connector_examples(Method).
-else.
ee_example(_Method) ->
#{}.
-endif.
info_example(Type, Method) ->
maps:merge(

View File

@ -44,7 +44,9 @@ post_request() ->
http_schema("post").
http_schema(Method) ->
Schemas = [?R_REF(schema_mod(Type), Method) || Type <- ?CONN_TYPES],
Broker = [?R_REF(schema_mod(Type), Method) || Type <- ?CONN_TYPES],
EE = [?R_REF(Module, Method) || Module <- schema_modules()],
Schemas = Broker ++ EE,
?UNION(Schemas).
%%======================================================================================
@ -57,13 +59,29 @@ roots() -> ["connectors"].
fields(connectors) ->
fields("connectors");
fields("connectors") ->
[
Broker = [
{mqtt,
?HOCON(
?MAP(name, ?R_REF(emqx_connector_mqtt_schema, "connector")),
#{desc => ?DESC("mqtt")}
)}
].
],
EE = ee_fields_connectors(),
Broker ++ EE.
-if(?EMQX_RELEASE_EDITION == ee).
schema_modules() ->
emqx_ee_connector:schema_modules().
ee_fields_connectors() ->
emqx_ee_connector:fields(connectors).
-else.
ee_fields_connectors() ->
[].
schema_modules() ->
[].
-endif.
desc(Record) when
Record =:= connectors;

View File

@ -31,32 +31,32 @@ will be forwarded.
config_enable {
desc {
en: """Enable or disable this bridge"""
zh: """启用/禁用 Bridge"""
zh: """启用/禁用桥接"""
}
label {
en: "Enable Or Disable Bridge"
zh: "启用/禁用 Bridge"
zh: "启用/禁用桥接"
}
}
config_direction {
desc {
en: """The direction of this bridge, MUST be 'egress'"""
zh: """Bridge 的方向, 必须是 egress"""
zh: """桥接的方向, 必须是 egress"""
}
label {
en: "Bridge Direction"
zh: "Bridge 方向"
zh: "桥接方向"
}
}
desc_config {
desc {
en: """Configuration for an HStreamDB bridge."""
zh: """HStreamDB Bridge 配置"""
zh: """HStreamDB 桥接配置"""
}
label: {
en: "HStreamDB Bridge Configuration"
zh: "HStreamDB Bridge 配置"
zh: "HStreamDB 桥接配置"
}
}
@ -67,18 +67,28 @@ will be forwarded.
}
label {
en: "Bridge Type"
zh: "Bridge 类型"
zh: "桥接类型"
}
}
desc_name {
desc {
en: """Bridge name, used as a human-readable description of the bridge."""
zh: """Bridge 名字Bridge 的可读描述"""
zh: """桥接名字,可读描述"""
}
label {
en: "Bridge Name"
zh: "Bridge 名字"
zh: "桥接名字"
}
}
desc_connector {
desc {
en: """Generic configuration for the connector."""
zh: """连接器的通用配置。"""
}
label: {
en: "Connector Generic Configuration"
zh: "连接器通用配置。"
}
}
}

View File

@ -23,12 +23,12 @@ conn_bridge_examples(Method) ->
end,
lists:foldl(Fun, #{}, schema_modules()).
resource_type(hstream) -> emqx_ee_connector_hstream;
resource_type(<<"hstream">>) -> emqx_ee_connector_hstream.
resource_type(hstreamdb) -> emqx_ee_connector_hstream;
resource_type(<<"hstreamdb">>) -> emqx_ee_connector_hstream.
fields(bridges) ->
[
{hstream,
{hstreamdb,
mk(
hoconsc:map(name, ref(emqx_ee_bridge_hstream, "config")),
#{desc => <<"EMQX Enterprise Config">>}

View File

@ -7,7 +7,7 @@
-include_lib("hocon/include/hoconsc.hrl").
-include("emqx_ee_bridge.hrl").
-import(hoconsc, [mk/2, enum/1]).
-import(hoconsc, [mk/2, enum/1, ref/2]).
-export([
conn_bridge_example/1
@ -25,7 +25,7 @@
conn_bridge_example(Method) ->
#{
<<"hstream">> => #{
<<"hstreamdb">> => #{
summary => <<"HStreamDB Bridge">>,
value => values(Method)
}
@ -35,12 +35,9 @@ values(get) ->
maps:merge(values(post), ?METRICS_EXAMPLE);
values(post) ->
#{
type => hstream,
type => hstreamdb,
name => <<"demo">>,
url => <<"http://127.0.0.1:6570">>,
stream => <<"stream1">>,
ordering_key => <<"some_key">>,
pool_size => 8,
connector => <<"hstreamdb:connector">>,
enable => true,
direction => egress,
local_topic => <<"local/topic/#">>,
@ -56,11 +53,13 @@ namespace() -> "bridge".
roots() -> [].
fields("config") ->
ExtConfig = [
[
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})},
{local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
{payload, mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("payload")})}
],
basic_config() ++ ExtConfig;
{payload, mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("payload")})},
{connector, field(connector)}
];
fields("post") ->
[type_field(), name_field() | fields("config")];
fields("put") ->
@ -68,6 +67,16 @@ fields("put") ->
fields("get") ->
emqx_bridge_schema:metrics_status_fields() ++ fields("post").
field(connector) ->
mk(
hoconsc:union([binary(), ref(emqx_ee_connector_hstream, config)]),
#{
required => true,
example => <<"hstreamdb:demo">>,
desc => ?DESC("desc_connector")
}
).
desc("config") ->
?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
@ -75,17 +84,10 @@ desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
desc(_) ->
undefined.
basic_config() ->
Basic = [
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})}
],
emqx_ee_connector_hstream:fields(config) ++ Basic.
%% -------------------------------------------------------------------------------------------------
%% internal
type_field() ->
{type, mk(enum([hstream]), #{required => true, desc => ?DESC("desc_type")})}.
{type, mk(enum([hstreamdb]), #{required => true, desc => ?DESC("desc_type")})}.
name_field() ->
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.

View File

@ -1,43 +1,64 @@
emqx_ee_connector_hstream {
type {
desc {
en: "The Connector Type."
zh: "连接器类型。"
}
label: {
en: "Connector Type"
zh: "连接器类型"
}
}
name {
desc {
en: "Connector name, used as a human-readable description of the connector."
zh: "连接器名称,人类可读的连接器描述。"
}
label: {
en: "Connector Name"
zh: "连接器名称"
}
}
url {
desc {
en: """HStream Server URL"""
zh: """HStream 服务器 URL"""
en: """HStreamDB Server URL"""
zh: """HStreamDB 服务器 URL"""
}
label {
en: """HStream Server URL"""
zh: """HStream 服务器 URL"""
en: """HStreamDB Server URL"""
zh: """HStreamDB 服务器 URL"""
}
}
stream_name {
desc {
en: """HStream Stream Name"""
zh: """HStream 流名称"""
en: """HStreamDB Stream Name"""
zh: """HStreamDB 流名称"""
}
label {
en: """HStream Stream Name"""
zh: """HStream 流名称"""
en: """HStreamDB Stream Name"""
zh: """HStreamDB 流名称"""
}
}
ordering_key {
desc {
en: """HStream Ordering Key"""
zh: """HStream 分区键"""
en: """HStreamDB Ordering Key"""
zh: """HStreamDB 分区键"""
}
label {
en: """HStream Ordering Key"""
zh: """HStream 分区键"""
en: """HStreamDB Ordering Key"""
zh: """HStreamDB 分区键"""
}
}
pool_size {
desc {
en: """HStream Pool Size"""
zh: """HStream 连接池大小"""
en: """HStreamDB Pool Size"""
zh: """HStreamDB 连接池大小"""
}
label {
en: """HStream Pool Size"""
zh: """HStream 连接池大小"""
en: """HStreamDB Pool Size"""
zh: """HStreamDB 连接池大小"""
}
}
}

View File

@ -0,0 +1,32 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_connector).
-import(hoconsc, [mk/2, enum/1, ref/2]).
-export([
schema_modules/0,
fields/1,
connector_examples/1
]).
schema_modules() ->
[emqx_ee_connector_hstream].
fields(connectors) ->
[
{hstreamdb,
mk(
hoconsc:map(name, ref(emqx_ee_connector_hstream, config)),
#{desc => <<"EMQX Enterprise Config">>}
)}
].
connector_examples(Method) ->
Fun =
fun(Module, Examples) ->
Example = erlang:apply(Module, connector_example, [Method]),
maps:merge(Examples, Example)
end,
lists:foldl(Fun, #{}, schema_modules()).

View File

@ -25,7 +25,8 @@
-export([
roots/0,
fields/1
fields/1,
connector_example/1
]).
%% -------------------------------------------------------------------------------------------------
@ -38,7 +39,7 @@ on_stop(InstId, #{client := Client, producer := Producer}) ->
StopClientRes = hstreamdb:stop_client(Client),
StopProducerRes = hstreamdb:stop_producer(Producer),
?SLOG(info, #{
msg => "stop hstream connector",
msg => "stop hstreamdb connector",
connector => InstId,
client => Client,
producer => Producer,
@ -64,7 +65,7 @@ on_get_status(_InstId, #{client := Client}) ->
end.
%% -------------------------------------------------------------------------------------------------
%% hstream batch callback
%% hstreamdb batch callback
%% TODO: maybe remove it after disk cache is ready
on_flush_result({{flush, _Stream, _Records}, {ok, _Resp}}) ->
@ -82,9 +83,41 @@ fields(config) ->
[
{url, mk(binary(), #{required => true, desc => ?DESC("url")})},
{stream, mk(binary(), #{required => true, desc => ?DESC("stream_name")})},
{ordering_key, mk(binary(), #{required => true, desc => ?DESC("ordering_key")})},
{ordering_key, mk(binary(), #{required => false, desc => ?DESC("ordering_key")})},
{pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})}
].
];
fields("get") ->
fields("post");
fields("put") ->
fields(config);
fields("post") ->
[
{type, mk(hstreamdb, #{required => true, desc => ?DESC("type")})},
{name, mk(binary(), #{required => true, desc => ?DESC("name")})}
] ++ fields("put").
connector_example(Method) ->
#{
<<"hstreamdb">> => #{
summary => <<"HStreamDB Connector">>,
value => values(Method)
}
}.
values(post) ->
maps:merge(values(put), #{name => <<"connector">>});
values(get) ->
values(post);
values(put) ->
#{
type => hstreamdb,
url => <<"http://127.0.0.1:6570">>,
stream => <<"stream1">>,
ordering_key => <<"some_key">>,
pool_size => 8
};
values(_) ->
#{}.
%% -------------------------------------------------------------------------------------------------
%% internal functions
@ -94,7 +127,7 @@ start_client(InstId, Config) ->
catch
E:R:S ->
?SLOG(error, #{
msg => "start hstream connector error",
msg => "start hstreamdb connector error",
connector => InstId,
error => E,
reason => R,
@ -104,7 +137,7 @@ start_client(InstId, Config) ->
do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) ->
?SLOG(info, #{
msg => "starting hstream connector: client",
msg => "starting hstreamdb connector: client",
connector => InstId,
config => Config
}),
@ -118,21 +151,21 @@ do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) ->
case is_alive(Client) of
true ->
?SLOG(info, #{
msg => "hstream connector: client started",
msg => "hstreamdb connector: client started",
connector => InstId,
client => Client
}),
start_producer(InstId, Client, Config);
_ ->
?SLOG(error, #{
msg => "hstream connector: client not alive",
msg => "hstreamdb connector: client not alive",
connector => InstId
}),
{error, connect_failed}
end;
{error, {already_started, Pid}} ->
?SLOG(info, #{
msg => "starting hstream connector: client, find old client. restart client",
msg => "starting hstreamdb connector: client, find old client. restart client",
old_client_pid => Pid,
old_client_name => ClientName
}),
@ -140,7 +173,7 @@ do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) ->
start_client(InstId, Config);
{error, Error} ->
?SLOG(error, #{
msg => "hstream connector: client failed",
msg => "hstreamdb connector: client failed",
connector => InstId,
reason => Error
}),
@ -155,7 +188,11 @@ is_alive(Client) ->
false
end.
start_producer(InstId, Client, Options = #{stream := Stream, pool_size := PoolSize}) ->
start_producer(
InstId,
Client,
Options = #{stream := Stream, pool_size := PoolSize, egress := #{payload := PayloadBin}}
) ->
%% TODO: change these batch options after we have better disk cache.
BatchSize = maps:get(batch_size, Options, 100),
Interval = maps:get(batch_interval, Options, 1000),
@ -168,16 +205,15 @@ start_producer(InstId, Client, Options = #{stream := Stream, pool_size := PoolSi
],
Name = produce_name(InstId),
?SLOG(info, #{
msg => "starting hstream connector: producer",
msg => "starting hstreamdb connector: producer",
connector => InstId
}),
case hstreamdb:start_producer(Client, Name, ProducerOptions) of
{ok, Producer} ->
?SLOG(info, #{
msg => "hstream connector: producer started"
msg => "hstreamdb connector: producer started"
}),
EnableBatch = maps:get(enable_batch, Options, false),
PayloadBin = maps:get(payload, Options, <<"">>),
Payload = emqx_plugin_libs_rule:preproc_tmpl(PayloadBin),
OrderingKeyBin = maps:get(ordering_key, Options, <<"">>),
OrderingKey = emqx_plugin_libs_rule:preproc_tmpl(OrderingKeyBin),
@ -191,7 +227,8 @@ start_producer(InstId, Client, Options = #{stream := Stream, pool_size := PoolSi
{ok, State};
{error, {already_started, Pid}} ->
?SLOG(info, #{
msg => "starting hstream connector: producer, find old producer. restart producer",
msg =>
"starting hstreamdb connector: producer, find old producer. restart producer",
old_producer_pid => Pid,
old_producer_name => Name
}),
@ -199,7 +236,7 @@ start_producer(InstId, Client, Options = #{stream := Stream, pool_size := PoolSi
start_producer(InstId, Client, Options);
{error, Reason} ->
?SLOG(error, #{
msg => "starting hstream connector: producer, failed",
msg => "starting hstreamdb connector: producer, failed",
reason => Reason
}),
{error, Reason}
@ -223,13 +260,13 @@ do_append(AfterQuery, Producer, Record) ->
% case hstreamdb:append(Producer, Record) of
% ok ->
% ?SLOG(debug, #{
% msg => "hstream producer async append success",
% msg => "hstreamdb producer async append success",
% record => Record
% }),
% emqx_resource:query_success(AfterQuery);
% {error, Reason} ->
% ?SLOG(error, #{
% msg => "hstream producer async append failed",
% msg => "hstreamdb producer async append failed",
% reason => Reason,
% record => Record
% }),
@ -241,13 +278,13 @@ do_append(AfterQuery, false, Producer, Record) ->
case hstreamdb:append_flush(Producer, Record) of
{ok, _} ->
?SLOG(debug, #{
msg => "hstream producer sync append success",
msg => "hstreamdb producer sync append success",
record => Record
}),
emqx_resource:query_success(AfterQuery);
{error, Reason} ->
?SLOG(error, #{
msg => "hstream producer sync append failed",
msg => "hstreamdb producer sync append failed",
reason => Reason,
record => Record
}),