diff --git a/.ci/docker-compose-file/.env b/.ci/docker-compose-file/.env index 73ec47d00..1b837aea3 100644 --- a/.ci/docker-compose-file/.env +++ b/.ci/docker-compose-file/.env @@ -10,7 +10,7 @@ CASSANDRA_TAG=3.11 MINIO_TAG=RELEASE.2023-03-20T20-16-18Z OPENTS_TAG=9aa7f88 KINESIS_TAG=2.1 -HSTREAMDB_TAG=v0.16.1 +HSTREAMDB_TAG=v0.19.3 HSTREAMDB_ZK_TAG=3.8.1 MS_IMAGE_ADDR=mcr.microsoft.com/mssql/server diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index e54ef6124..5a62ae139 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -91,6 +91,7 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_gcp_pubsub_producer_action_info, emqx_bridge_kafka_action_info, emqx_bridge_kinesis_action_info, + emqx_bridge_hstreamdb_action_info, emqx_bridge_matrix_action_info, emqx_bridge_mongodb_action_info, emqx_bridge_oracle_action_info, diff --git a/apps/emqx_bridge_hstreamdb/rebar.config b/apps/emqx_bridge_hstreamdb/rebar.config index eab7bcb3f..c2e3194ac 100644 --- a/apps/emqx_bridge_hstreamdb/rebar.config +++ b/apps/emqx_bridge_hstreamdb/rebar.config @@ -3,7 +3,7 @@ {erl_opts, [debug_info]}. {deps, [ {hstreamdb_erl, - {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.4.5+v0.16.1"}}}, + {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.5.18+v0.18.1"}}}, {emqx, {path, "../../apps/emqx"}}, {emqx_utils, {path, "../../apps/emqx_utils"}} ]}. diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src index f9825e3dd..84c09fe3a 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_hstreamdb, [ {description, "EMQX Enterprise HStreamDB Bridge"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [ kernel, @@ -8,7 +8,7 @@ emqx_resource, hstreamdb_erl ]}, - {env, []}, + {env, [{emqx_action_info_modules, [emqx_bridge_hstreamdb_action_info]}]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl index 7052e0120..694f459e0 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl @@ -1,4 +1,4 @@ -%%-------------------------------------------------------------------- +%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_bridge_hstreamdb). @@ -6,10 +6,12 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --import(hoconsc, [mk/2, enum/1, ref/2]). +-import(hoconsc, [mk/2, enum/1]). -export([ - conn_bridge_examples/1 + conn_bridge_examples/1, + bridge_v2_examples/1, + connector_examples/1 ]). -export([ @@ -19,6 +21,11 @@ desc/1 ]). +-define(CONNECTOR_TYPE, hstreamdb). +-define(ACTION_TYPE, ?CONNECTOR_TYPE). +-define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>). +-define(DEFAULT_GRPC_FLUSH_TIMEOUT_RAW, <<"10s">>). + %% ------------------------------------------------------------------------------------------------- %% api @@ -27,16 +34,16 @@ conn_bridge_examples(Method) -> #{ <<"hstreamdb">> => #{ summary => <<"HStreamDB Bridge">>, - value => values(Method) + value => conn_bridge_example_values(Method) } } ]. -values(get) -> - values(post); -values(put) -> - values(post); -values(post) -> +conn_bridge_example_values(get) -> + conn_bridge_example_values(post); +conn_bridge_example_values(put) -> + conn_bridge_example_values(post); +conn_bridge_example_values(post) -> #{ type => <<"hstreamdb">>, name => <<"demo">>, @@ -55,15 +62,143 @@ values(post) -> }, ssl => #{enable => false} }; -values(_) -> +conn_bridge_example_values(_) -> #{}. +connector_examples(Method) -> + [ + #{ + <<"hstreamdb">> => + #{ + summary => <<"HStreamDB Connector">>, + value => emqx_connector_schema:connector_values( + Method, ?CONNECTOR_TYPE, connector_values() + ) + } + } + ]. + +connector_values() -> + #{ + <<"url">> => <<"http://127.0.0.1:6570">>, + <<"grpc_timeout">> => <<"30s">>, + <<"ssl">> => + #{ + <<"enable">> => false, + <<"verify">> => <<"verify_peer">> + }, + <<"resource_opts">> => + #{ + <<"health_check_interval">> => <<"15s">>, + <<"start_timeout">> => <<"5s">> + } + }. + +bridge_v2_examples(Method) -> + [ + #{ + <<"hstreamdb">> => + #{ + summary => <<"HStreamDB Action">>, + value => emqx_bridge_v2_schema:action_values( + Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values() + ) + } + } + ]. + +action_values() -> + #{ + <<"parameters">> => #{ + <<"partition_key">> => <<"hej">>, + <<"record_template">> => <<"${payload}">>, + <<"stream">> => <<"mqtt_message">>, + <<"aggregation_pool_size">> => 8, + <<"writer_pool_size">> => 8 + } + }. + %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions namespace() -> "bridge_hstreamdb". roots() -> []. +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + Fields = + fields(connector_fields) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts), + emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields); +fields(Field) when + Field == "get_bridge_v2"; + Field == "post_bridge_v2"; + Field == "put_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(hstreamdb_action)); +fields(action) -> + {?ACTION_TYPE, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(?MODULE, hstreamdb_action)), + #{ + desc => <<"HStreamDB Action Config">>, + required => false + } + )}; +fields(hstreamdb_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + hoconsc:mk( + hoconsc:ref(?MODULE, action_parameters), + #{ + required => true, + desc => ?DESC("action_parameters") + } + ) + ); +fields(action_parameters) -> + [ + {stream, + mk(binary(), #{ + required => true, desc => ?DESC(emqx_bridge_hstreamdb_connector, "stream_name") + })}, + + {partition_key, + mk(binary(), #{ + required => false, desc => ?DESC(emqx_bridge_hstreamdb_connector, "partition_key") + })}, + + {grpc_flush_timeout, fun grpc_flush_timeout/1}, + {record_template, + mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("record_template")})}, + {aggregation_pool_size, + mk(integer(), #{default => 8, desc => ?DESC("aggregation_pool_size")})}, + {max_batches, mk(integer(), #{default => 500, desc => ?DESC("max_batches")})}, + {writer_pool_size, mk(integer(), #{default => 8, desc => ?DESC("writer_pool_size")})}, + {batch_size, mk(integer(), #{default => 100, desc => ?DESC("batch_size")})}, + {batch_interval, + mk(emqx_schema:timeout_duration_ms(), #{ + default => <<"500ms">>, desc => ?DESC("batch_interval") + })} + ]; +fields(connector_fields) -> + [ + {url, + mk(binary(), #{ + required => true, + desc => ?DESC(emqx_bridge_hstreamdb_connector, "url"), + default => <<"http://127.0.0.1:6570">> + })}, + {grpc_timeout, fun grpc_timeout/1} + ] ++ emqx_connector_schema_lib:ssl_fields(); +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + fields(connector_fields) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); fields("config") -> hstream_bridge_common_fields() ++ connector_fields(); @@ -80,6 +215,18 @@ fields("put") -> hstream_bridge_common_fields() ++ connector_fields(). +grpc_timeout(type) -> emqx_schema:timeout_duration_ms(); +grpc_timeout(desc) -> ?DESC(emqx_bridge_hstreamdb_connector, "grpc_timeout"); +grpc_timeout(default) -> ?DEFAULT_GRPC_TIMEOUT_RAW; +grpc_timeout(required) -> false; +grpc_timeout(_) -> undefined. + +grpc_flush_timeout(type) -> emqx_schema:timeout_duration_ms(); +grpc_flush_timeout(desc) -> ?DESC("grpc_flush_timeout"); +grpc_flush_timeout(default) -> ?DEFAULT_GRPC_FLUSH_TIMEOUT_RAW; +grpc_flush_timeout(required) -> false; +grpc_flush_timeout(_) -> undefined. + hstream_bridge_common_fields() -> emqx_bridge_schema:common_bridge_fields() ++ [ @@ -97,6 +244,16 @@ desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for HStreamDB bridge using `", string:to_upper(Method), "` method."]; +desc("creation_opts") -> + ?DESC(emqx_resource_schema, "creation_opts"); +desc("config_connector") -> + ?DESC("config_connector"); +desc(hstreamdb_action) -> + ?DESC("hstreamdb_action"); +desc(action_parameters) -> + ?DESC("action_parameters"); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc(_) -> undefined. diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_action_info.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_action_info.erl new file mode 100644 index 000000000..7aa6565fa --- /dev/null +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_action_info.erl @@ -0,0 +1,89 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_hstreamdb_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0, + bridge_v1_config_to_connector_config/1, + bridge_v1_config_to_action_config/2, + connector_action_config_to_bridge_v1_config/2 +]). + +bridge_v1_type_name() -> hstreamdb. + +action_type_name() -> hstreamdb. + +connector_type_name() -> hstreamdb. + +schema_module() -> emqx_bridge_hstreamdb. + +bridge_v1_config_to_connector_config(BridgeV1Conf) -> + ConnectorSchema = emqx_bridge_hstreamdb:fields(connector_fields), + ConnectorAtomKeys = lists:foldl(fun({K, _}, Acc) -> [K | Acc] end, [], ConnectorSchema), + ConnectorBinKeys = [atom_to_binary(K) || K <- ConnectorAtomKeys] ++ [<<"resource_opts">>], + ConnectorConfig0 = maps:with(ConnectorBinKeys, BridgeV1Conf), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_connector_schema:project_to_connector_resource_opts/1, + ConnectorConfig0 + ). + +bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) -> + Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config( + BridgeV1Conf, ConnectorName, emqx_bridge_hstreamdb, "config_connector" + ), + %% Remove fields no longer relevant for the action + Config1 = lists:foldl( + fun(Field, Acc) -> + emqx_utils_maps:deep_remove(Field, Acc) + end, + Config0, + [ + [<<"parameters">>, <<"pool_size">>], + [<<"parameters">>, <<"direction">>] + ] + ), + %% Move pool_size to aggregation_pool_size and writer_pool_size + PoolSize = maps:get(<<"pool_size">>, BridgeV1Conf, 8), + Config2 = emqx_utils_maps:deep_put( + [<<"parameters">>, <<"aggregation_pool_size">>], + Config1, + PoolSize + ), + Config3 = emqx_utils_maps:deep_put( + [<<"parameters">>, <<"writer_pool_size">>], + Config2, + PoolSize + ), + Config3. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig), + BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1), + BridgeV1Config3 = maps:remove(<<"parameters">>, BridgeV1Config2), + %% Pick out pool_size from aggregation_pool_size + PoolSize = emqx_utils_maps:deep_get( + [<<"parameters">>, <<"aggregation_pool_size">>], ActionConfig, 8 + ), + BridgeV1Config4 = maps:put(<<"pool_size">>, PoolSize, BridgeV1Config3), + + %% Move the fields stream, partition_key and record_template from + %% parameters in ActionConfig to the top level in BridgeV1Config + lists:foldl( + fun(Field, Acc) -> + emqx_utils_maps:deep_put( + [Field], + Acc, + emqx_utils_maps:deep_get([<<"parameters">>, Field], ActionConfig, <<>>) + ) + end, + BridgeV1Config4, + [<<"stream">>, <<"partition_key">>, <<"record_template">>] + ). diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl index fdb80b1e1..8413e5ecd 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl @@ -7,8 +7,9 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). --import(hoconsc, [mk/2, enum/1]). +-import(hoconsc, [mk/2]). -behaviour(emqx_resource). @@ -19,7 +20,11 @@ on_stop/2, on_query/3, on_batch_query/3, - on_get_status/2 + on_get_status/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). -export([ @@ -38,67 +43,132 @@ -define(DEFAULT_GRPC_TIMEOUT, timer:seconds(30)). -define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>). +-define(DEFAULT_GRPC_FLUSH_TIMEOUT, 10000). +-define(DEFAULT_MAX_BATCHES, 500). +-define(DEFAULT_BATCH_INTERVAL, 500). +-define(DEFAULT_AGG_POOL_SIZE, 8). +-define(DEFAULT_WRITER_POOL_SIZE, 8). %% ------------------------------------------------------------------------------------------------- %% resource callback callback_mode() -> always_sync. on_start(InstId, Config) -> - start_client(InstId, Config). + try + do_on_start(InstId, Config) + catch + E:R:S -> + Error = #{ + msg => "start_hstreamdb_connector_error", + connector => InstId, + error => E, + reason => R, + stack => S + }, + ?SLOG(error, Error), + {error, Error} + end. + +on_add_channel( + _InstId, + #{ + installed_channels := InstalledChannels, + client_options := ClientOptions + } = OldState, + ChannelId, + ChannelConfig +) -> + %{ok, ChannelState} = create_channel_state(ChannelId, PoolName, ChannelConfig), + Parameters0 = maps:get(parameters, ChannelConfig), + Parameters = Parameters0#{client_options => ClientOptions}, + PartitionKey = emqx_placeholder:preproc_tmpl(maps:get(partition_key, Parameters, <<"">>)), + try + ChannelState = #{ + producer => start_producer(ChannelId, Parameters), + record_template => record_template(Parameters), + partition_key => PartitionKey + }, + NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState} + catch + Error:Reason:Stack -> + {error, {Error, Reason, Stack}} + end. + +on_remove_channel( + _InstId, + #{ + installed_channels := InstalledChannels + } = OldState, + ChannelId +) -> + #{ + producer := Producer + } = maps:get(ChannelId, InstalledChannels), + _ = hstreamdb:stop_producer(Producer), + NewInstalledChannels = maps:remove(ChannelId, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState}. + +on_get_channel_status( + _ResId, + _ChannelId, + _State +) -> + ?status_connected. + +on_get_channels(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId). on_stop(InstId, _State) -> - case emqx_resource:get_allocated_resources(InstId) of - #{?hstreamdb_client := #{client := Client, producer := Producer}} -> - StopClientRes = hstreamdb:stop_client(Client), - StopProducerRes = hstreamdb:stop_producer(Producer), - ?SLOG(info, #{ - msg => "stop_hstreamdb_connector", - connector => InstId, - client => Client, - producer => Producer, - stop_client => StopClientRes, - stop_producer => StopProducerRes - }); - _ -> - ok - end. + ?tp( + hstreamdb_connector_on_stop, + #{instance_id => InstId} + ). -define(FAILED_TO_APPLY_HRECORD_TEMPLATE, {error, {unrecoverable_error, failed_to_apply_hrecord_template}} ). on_query( - _InstId, - {send_message, Data}, - _State = #{ - producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate - } + InstId, + {ChannelID, Data}, + #{installed_channels := Channels} = _State ) -> + #{ + producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate + } = maps:get(ChannelID, Channels), try to_record(PartitionKey, HRecordTemplate, Data) of - Record -> append_record(Producer, Record, false) + Record -> append_record(InstId, Producer, Record, false) catch _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE end. on_batch_query( - _InstId, - BatchList, - _State = #{ - producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate - } + InstId, + [{ChannelID, _Data} | _] = BatchList, + #{installed_channels := Channels} = _State ) -> + #{ + producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate + } = maps:get(ChannelID, Channels), try to_multi_part_records(PartitionKey, HRecordTemplate, BatchList) of - Records -> append_record(Producer, Records, true) + Records -> append_record(InstId, Producer, Records, true) catch _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE end. -on_get_status(_InstId, #{client := Client}) -> - case is_alive(Client) of - true -> - connected; - false -> - disconnected +on_get_status(_InstId, State) -> + case check_status(State) of + ok -> + ?status_connected; + Error -> + %% We set it to ?status_connecting so that the channels are not deleted. + %% The producers in the channels contains buffers so we don't want to delete them. + {?status_connecting, State, Error} end. %% ------------------------------------------------------------------------------------------------- @@ -140,142 +210,152 @@ desc(config) -> %% ------------------------------------------------------------------------------------------------- %% internal functions -start_client(InstId, Config) -> - try - do_start_client(InstId, Config) - catch - E:R:S -> - Error = #{ - msg => "start_hstreamdb_connector_error", - connector => InstId, - error => E, - reason => R, - stack => S - }, - ?SLOG(error, Error), - {error, Error} - end. -do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize, ssl := SSL}) -> +do_on_start(InstId, Config) -> ?SLOG(info, #{ msg => "starting_hstreamdb_connector_client", connector => InstId, config => Config }), - ClientName = client_name(InstId), + {ok, _} = application:ensure_all_started(hstreamdb_erl), + ClientOptions = client_options(Config), + State = #{ + client_options => ClientOptions, + installed_channels => #{} + }, + case check_status(State) of + ok -> + ?SLOG(info, #{ + msg => "hstreamdb_connector_client_started", + connector => InstId + }), + {ok, State}; + Error -> + ?tp( + hstreamdb_connector_start_failed, + #{error => client_not_alive} + ), + ?SLOG(error, #{ + msg => "hstreamdb_connector_client_not_alive", + connector => InstId, + error => Error + }), + {error, {connect_failed, Error}} + end. + +client_options(Config = #{url := ServerURL, ssl := SSL}) -> + GRPCTimeout = maps:get(<<"grpc_timeout">>, Config, ?DEFAULT_GRPC_TIMEOUT), + EnableSSL = maps:get(enable, SSL), RpcOpts = - case maps:get(enable, SSL) of + case EnableSSL of false -> - #{pool_size => PoolSize}; + #{pool_size => 1}; true -> #{ - pool_size => PoolSize, + pool_size => 1, gun_opts => #{ transport => tls, - transport_opts => emqx_tls_lib:to_client_opts(SSL) + transport_opts => + emqx_tls_lib:to_client_opts(SSL) } } end, - ClientOptions = [ - {url, binary_to_list(Server)}, - {rpc_options, RpcOpts} - ], - case hstreamdb:start_client(ClientName, ClientOptions) of + ClientOptions = #{ + url => to_string(ServerURL), + grpc_timeout => GRPCTimeout, + rpc_options => RpcOpts + }, + ClientOptions. + +check_status(ConnectorState) -> + try start_client(ConnectorState) of {ok, Client} -> - case is_alive(Client) of - true -> - ?SLOG(info, #{ - msg => "hstreamdb_connector_client_started", - connector => InstId, - client => Client - }), - start_producer(InstId, Client, Config); - _ -> - ?tp( - hstreamdb_connector_start_failed, - #{error => client_not_alive} - ), - ?SLOG(error, #{ - msg => "hstreamdb_connector_client_not_alive", - connector => InstId - }), - {error, connect_failed} - end; - {error, {already_started, Pid}} -> - ?SLOG(info, #{ - msg => "starting_hstreamdb_connector_client_find_old_client_restart_client", - old_client_pid => Pid, - old_client_name => ClientName - }), - _ = hstreamdb:stop_client(ClientName), - start_client(InstId, Config); + check_status_with_client(Client); + {error, _} = StartClientError -> + StartClientError + catch + ErrorType:Reason:_ST -> + {error, {ErrorType, Reason}} + end. + +check_status_with_client(Client) -> + try hstreamdb_client:echo(Client) of + ok -> ok; + {error, _} = ErrorEcho -> ErrorEcho + after + _ = hstreamdb:stop_client(Client) + end. + +start_client(Opts) -> + ClientOptions = maps:get(client_options, Opts), + case hstreamdb:start_client(ClientOptions) of + {ok, Client} -> + {ok, Client}; {error, Error} -> - ?SLOG(error, #{ - msg => "hstreamdb_connector_client_failed", - connector => InstId, - reason => Error - }), {error, Error} end. -is_alive(Client) -> - hstreamdb_client:echo(Client) =:= ok. - start_producer( - InstId, - Client, - Options = #{stream := Stream, pool_size := PoolSize} + ActionId, + #{ + stream := Stream, + batch_size := BatchSize, + batch_interval := Interval + } = Opts ) -> - %% TODO: change these batch options after we have better disk cache. - BatchSize = maps:get(batch_size, Options, 100), - Interval = maps:get(batch_interval, Options, 1000), - ProducerOptions = [ - {stream, Stream}, - {callback, {?MODULE, on_flush_result, []}}, - {max_records, BatchSize}, - {interval, Interval}, - {pool_size, PoolSize}, - {grpc_timeout, maps:get(grpc_timeout, Options, ?DEFAULT_GRPC_TIMEOUT)} - ], - Name = produce_name(InstId), - ?SLOG(info, #{ - msg => "starting_hstreamdb_connector_producer", - connector => InstId - }), - case hstreamdb:start_producer(Client, Name, ProducerOptions) of - {ok, Producer} -> - ?SLOG(info, #{ - msg => "hstreamdb_connector_producer_started" - }), - State = #{ - client => Client, - producer => Producer, - enable_batch => maps:get(enable_batch, Options, false), - partition_key => emqx_placeholder:preproc_tmpl( - maps:get(partition_key, Options, <<"">>) - ), - record_template => record_template(Options) - }, - ok = emqx_resource:allocate_resource(InstId, ?hstreamdb_client, #{ - client => Client, producer => Producer - }), - {ok, State}; - {error, {already_started, Pid}} -> - ?SLOG(info, #{ - msg => - "starting_hstreamdb_connector_producer_find_old_producer_restart_producer", - old_producer_pid => Pid, - old_producer_name => Name - }), - _ = hstreamdb:stop_producer(Name), - start_producer(InstId, Client, Options); + MaxBatches = maps:get(max_batches, Opts, ?DEFAULT_MAX_BATCHES), + AggPoolSize = maps:get(aggregation_pool_size, Opts, ?DEFAULT_AGG_POOL_SIZE), + WriterPoolSize = maps:get(writer_pool_size, Opts, ?DEFAULT_WRITER_POOL_SIZE), + GRPCTimeout = maps:get(grpc_flush_timeout, Opts, ?DEFAULT_GRPC_FLUSH_TIMEOUT), + ClientOptions = maps:get(client_options, Opts), + ProducerOptions = #{ + stream => to_string(Stream), + buffer_options => #{ + interval => Interval, + callback => {?MODULE, on_flush_result, [ActionId]}, + max_records => BatchSize, + max_batches => MaxBatches + }, + buffer_pool_size => AggPoolSize, + writer_options => #{ + grpc_timeout => GRPCTimeout + }, + writer_pool_size => WriterPoolSize, + client_options => ClientOptions + }, + Name = produce_name(ActionId), + ensure_start_producer(Name, ProducerOptions). + +ensure_start_producer(ProducerName, ProducerOptions) -> + case hstreamdb:start_producer(ProducerName, ProducerOptions) of + ok -> + ok; + {error, {already_started, _Pid}} -> + %% HStreamDB producer already started, restart it + _ = hstreamdb:stop_producer(ProducerName), + %% the pool might have been leaked after relup + _ = ecpool:stop_sup_pool(ProducerName), + ok = hstreamdb:start_producer(ProducerName, ProducerOptions); + {error, { + {shutdown, + {failed_to_start_child, {pool_sup, Pool}, + {shutdown, + {failed_to_start_child, worker_sup, + {shutdown, {failed_to_start_child, _, {badarg, _}}}}}}}, + _ + }} -> + %% HStreamDB producer was not properly cleared, restart it + %% the badarg error in gproc maybe caused by the pool is leaked after relup + _ = ecpool:stop_sup_pool(Pool), + ok = hstreamdb:start_producer(ProducerName, ProducerOptions); {error, Reason} -> - ?SLOG(error, #{ - msg => "starting_hstreamdb_connector_producer_failed", - reason => Reason - }), - {error, Reason} - end. + %% HStreamDB start producer failed + throw({start_producer_failed, Reason}) + end, + ProducerName. + +produce_name(ActionId) -> + list_to_binary("backend_hstream_producer:" ++ to_string(ActionId)). to_record(PartitionKeyTmpl, HRecordTmpl, Data) -> PartitionKey = emqx_placeholder:proc_tmpl(PartitionKeyTmpl, Data), @@ -289,43 +369,46 @@ to_record(PartitionKey, RawRecord) -> to_multi_part_records(PartitionKeyTmpl, HRecordTmpl, BatchList) -> lists:map( - fun({send_message, Data}) -> + fun({_, Data}) -> to_record(PartitionKeyTmpl, HRecordTmpl, Data) end, BatchList ). -append_record(Producer, MultiPartsRecords, MaybeBatch) when is_list(MultiPartsRecords) -> +append_record(ResourceId, Producer, MultiPartsRecords, MaybeBatch) when + is_list(MultiPartsRecords) +-> lists:foreach( - fun(Record) -> append_record(Producer, Record, MaybeBatch) end, MultiPartsRecords + fun(Record) -> append_record(ResourceId, Producer, Record, MaybeBatch) end, + MultiPartsRecords ); -append_record(Producer, Record, MaybeBatch) when is_tuple(Record) -> - do_append_records(Producer, Record, MaybeBatch). +append_record(ResourceId, Producer, Record, MaybeBatch) when is_tuple(Record) -> + do_append_records(ResourceId, Producer, Record, MaybeBatch). %% TODO: only sync request supported. implement async request later. -do_append_records(Producer, Record, true = IsBatch) -> +do_append_records(ResourceId, Producer, Record, true = IsBatch) -> Result = hstreamdb:append(Producer, Record), - handle_result(Result, Record, IsBatch); -do_append_records(Producer, Record, false = IsBatch) -> + handle_result(ResourceId, Result, Record, IsBatch); +do_append_records(ResourceId, Producer, Record, false = IsBatch) -> Result = hstreamdb:append_flush(Producer, Record), - handle_result(Result, Record, IsBatch). + handle_result(ResourceId, Result, Record, IsBatch). -handle_result(ok = Result, Record, IsBatch) -> - handle_result({ok, Result}, Record, IsBatch); -handle_result({ok, Result}, Record, IsBatch) -> +handle_result(ResourceId, ok = Result, Record, IsBatch) -> + handle_result(ResourceId, {ok, Result}, Record, IsBatch); +handle_result(ResourceId, {ok, Result}, Record, IsBatch) -> ?tp( hstreamdb_connector_query_append_return, - #{result => Result, is_batch => IsBatch} + #{result => Result, is_batch => IsBatch, instance_id => ResourceId} ), ?SLOG(debug, #{ msg => "hstreamdb_producer_sync_append_success", record => Record, is_batch => IsBatch }); -handle_result({error, Reason} = Err, Record, IsBatch) -> +handle_result(ResourceId, {error, Reason} = Err, Record, IsBatch) -> ?tp( hstreamdb_connector_query_append_return, - #{error => Reason, is_batch => IsBatch} + #{error => Reason, is_batch => IsBatch, instance_id => ResourceId} ), ?SLOG(error, #{ msg => "hstreamdb_producer_sync_append_failed", @@ -335,12 +418,6 @@ handle_result({error, Reason} = Err, Record, IsBatch) -> }), Err. -client_name(InstId) -> - "client:" ++ to_string(InstId). - -produce_name(ActionId) -> - list_to_atom("producer:" ++ to_string(ActionId)). - record_template(#{record_template := RawHRecordTemplate}) -> emqx_placeholder:preproc_tmpl(RawHRecordTemplate); record_template(_) -> diff --git a/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl b/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl index 4d165c03d..1ac489334 100644 --- a/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl +++ b/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl @@ -117,16 +117,21 @@ end_per_suite(_Config) -> ok. init_per_testcase(t_to_hrecord_failed, Config) -> + init_per_testcase_common(), meck:new([hstreamdb], [passthrough, no_history, no_link]), meck:expect(hstreamdb, to_record, fun(_, _, _) -> error(trans_to_hrecord_failed) end), Config; init_per_testcase(_Testcase, Config) -> + init_per_testcase_common(), %% drop stream and will create a new one in common_init/1 %% TODO: create a new stream for each test case delete_bridge(Config), snabbkaffe:start_trace(), Config. +init_per_testcase_common() -> + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(). + end_per_testcase(t_to_hrecord_failed, _Config) -> meck:unload([hstreamdb]); end_per_testcase(_Testcase, Config) -> @@ -301,7 +306,10 @@ t_simple_query(Config) -> {ok, _}, create_bridge(Config) ), - Requests = gen_batch_req(BatchSize), + Type = ?config(hstreamdb_bridge_type, Config), + Name = ?config(hstreamdb_name, Config), + ActionId = emqx_bridge_v2:id(Type, Name), + Requests = gen_batch_req(BatchSize, ActionId), ?check_trace( begin ?wait_async_action( @@ -351,6 +359,24 @@ t_to_hrecord_failed(Config) -> end, ok. +%% Connector Action Tests + +t_action_on_get_status(Config) -> + emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}). + +t_action_create_via_http(Config) -> + emqx_bridge_v2_testlib:t_create_via_http(Config). + +t_action_sync_query(Config) -> + MakeMessageFun = fun() -> rand_data() end, + IsSuccessCheck = fun(Result) -> ?assertEqual(ok, Result) end, + TracePoint = hstreamdb_connector_query_append_return, + emqx_bridge_v2_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint). + +t_action_start_stop(Config) -> + StopTracePoint = hstreamdb_connector_on_stop, + emqx_bridge_v2_testlib:t_start_stop(Config, StopTracePoint). + %%------------------------------------------------------------------------------ %% Helper fns %%------------------------------------------------------------------------------ @@ -362,6 +388,10 @@ common_init(ConfigT) -> URL = "http://" ++ Host ++ ":" ++ RawPort, Config0 = [ + {bridge_type, <<"hstreamdb">>}, + {bridge_name, <<"my_hstreamdb_action">>}, + {connector_type, <<"hstreamdb">>}, + {connector_name, <<"my_hstreamdb_connector">>}, {hstreamdb_host, Host}, {hstreamdb_port, Port}, {hstreamdb_url, URL}, @@ -393,6 +423,8 @@ common_init(ConfigT) -> {hstreamdb_config, HStreamDBConf}, {hstreamdb_bridge_type, BridgeType}, {hstreamdb_name, Name}, + {bridge_config, action_config(Config0)}, + {connector_config, connector_config(Config0)}, {proxy_host, ProxyHost}, {proxy_port, ProxyPort} | Config0 @@ -424,7 +456,7 @@ hstreamdb_config(BridgeType, Config) -> " resource_opts = {\n" %% always sync " query_mode = sync\n" - " request_ttl = 500ms\n" + " request_ttl = 10000ms\n" " batch_size = ~b\n" " worker_pool_size = ~b\n" " }\n" @@ -443,6 +475,45 @@ hstreamdb_config(BridgeType, Config) -> ), {Name, parse_and_check(ConfigString, BridgeType, Name)}. +action_config(Config) -> + ConnectorName = ?config(connector_name, Config), + BatchSize = batch_size(Config), + #{ + <<"connector">> => ConnectorName, + <<"enable">> => true, + <<"parameters">> => + #{ + <<"aggregation_pool_size">> => ?POOL_SIZE, + <<"record_template">> => ?RECORD_TEMPLATE, + <<"stream">> => ?STREAM, + <<"writer_pool_size">> => ?POOL_SIZE + }, + <<"resource_opts">> => + #{ + <<"batch_size">> => BatchSize, + <<"health_check_interval">> => <<"15s">>, + <<"inflight_window">> => 100, + <<"max_buffer_bytes">> => <<"256MB">>, + <<"query_mode">> => <<"sync">>, + <<"request_ttl">> => <<"45s">>, + <<"worker_pool_size">> => ?POOL_SIZE + } + }. + +connector_config(Config) -> + Port = integer_to_list(?config(hstreamdb_port, Config)), + URL = "http://" ++ ?config(hstreamdb_host, Config) ++ ":" ++ Port, + #{ + <<"url">> => URL, + <<"ssl">> => + #{<<"enable">> => false, <<"verify">> => <<"verify_peer">>}, + <<"grpc_timeout">> => <<"30s">>, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"15s">>, + <<"start_timeout">> => <<"5s">> + } + }. + parse_and_check(ConfigString, BridgeType, Name) -> {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), @@ -454,10 +525,10 @@ parse_and_check(ConfigString, BridgeType, Name) -> -define(CONN_ATTEMPTS, 10). default_options(Config) -> - [ - {url, ?config(hstreamdb_url, Config)}, - {rpc_options, ?RPC_OPTIONS} - ]. + #{ + url => ?config(hstreamdb_url, Config), + rpc_options => ?RPC_OPTIONS + }. connect_direct_hstream(Name, Config) -> client(Name, Config, ?CONN_ATTEMPTS). @@ -511,8 +582,9 @@ send_message(Config, Data) -> query_resource(Config, Request) -> Name = ?config(hstreamdb_name, Config), BridgeType = ?config(hstreamdb_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - emqx_resource:query(ResourceID, Request, #{timeout => 1_000}). + ID = emqx_bridge_v2:id(BridgeType, Name), + ResID = emqx_connector_resource:resource_id(BridgeType, Name), + emqx_resource:query(ID, Request, #{timeout => 1_000, connector_resource_id => ResID}). restart_resource(Config) -> BridgeName = ?config(hstreamdb_name, Config), @@ -526,8 +598,16 @@ resource_id(Config) -> BridgeType = ?config(hstreamdb_bridge_type, Config), _ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName). +action_id(Config) -> + ActionName = ?config(hstreamdb_name, Config), + ActionType = ?config(hstreamdb_bridge_type, Config), + _ActionID = emqx_bridge_v2:id(ActionType, ActionName). + health_check_resource_ok(Config) -> - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(resource_id(Config))). + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(resource_id(Config))), + ActionName = ?config(hstreamdb_name, Config), + ActionType = ?config(hstreamdb_bridge_type, Config), + ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(ActionType, ActionName)). health_check_resource_down(Config) -> case emqx_resource_manager:health_check(resource_id(Config)) of @@ -539,6 +619,19 @@ health_check_resource_down(Config) -> ?assert( false, lists:flatten(io_lib:format("invalid health check result:~p~n", [Other])) ) + end, + ActionName = ?config(hstreamdb_name, Config), + ActionType = ?config(hstreamdb_bridge_type, Config), + #{status := StatusV2} = emqx_bridge_v2:health_check(ActionType, ActionName), + case StatusV2 of + disconnected -> + ok; + connecting -> + ok; + OtherV2 -> + ?assert( + false, lists:flatten(io_lib:format("invalid health check result:~p~n", [OtherV2])) + ) end. % These funs start and then stop the hstreamdb connection @@ -548,22 +641,36 @@ connect_and_create_stream(Config) -> Client, ?STREAM, ?REPLICATION_FACTOR, ?BACKLOG_RETENTION_SECOND, ?SHARD_COUNT ) ), - %% force write to stream to make it created and ready to be written data for rest cases - ProducerOptions = [ - {pool_size, 4}, - {stream, ?STREAM}, - {callback, fun(_) -> ok end}, - {max_records, 10}, - {interval, 1000} - ], + %% force write to stream to make it created and ready to be written data for test cases + ProducerOptions = #{ + stream => ?STREAM, + buffer_options => #{ + interval => 1000, + callback => {?MODULE, on_flush_result, [<<"WHAT">>]}, + max_records => 1, + max_batches => 1 + }, + buffer_pool_size => 1, + writer_options => #{ + grpc_timeout => 100 + }, + writer_pool_size => 1, + client_options => default_options(Config) + }, + ?WITH_CLIENT( begin - {ok, Producer} = hstreamdb:start_producer(Client, test_producer, ProducerOptions), - _ = hstreamdb:append_flush(Producer, hstreamdb:to_record([], raw, rand_payload())), - _ = hstreamdb:stop_producer(Producer) + ok = hstreamdb:start_producer(test_producer, ProducerOptions), + _ = hstreamdb:append_flush(test_producer, hstreamdb:to_record([], raw, rand_payload())), + _ = hstreamdb:stop_producer(test_producer) end ). +on_flush_result({{flush, _Stream, _Records}, {ok, _Resp}}) -> + ok; +on_flush_result({{flush, _Stream, _Records}, {error, _Reason}}) -> + ok. + connect_and_delete_stream(Config) -> ?WITH_CLIENT( _ = hstreamdb_client:delete_stream(Client, ?STREAM) @@ -593,11 +700,11 @@ rand_payload() -> temperature => rand:uniform(40), humidity => rand:uniform(100) }). -gen_batch_req(Count) when +gen_batch_req(Count, ActionId) when is_integer(Count) andalso Count > 0 -> - [{send_message, rand_data()} || _Val <- lists:seq(1, Count)]; -gen_batch_req(Count) -> + [{ActionId, rand_data()} || _Val <- lists:seq(1, Count)]; +gen_batch_req(Count, _ActionId) -> ct:pal("Gen batch requests failed with unexpected Count: ~p", [Count]). str(List) when is_list(List) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 4679e1bc4..07986b562 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -28,6 +28,8 @@ resource_type(confluent_producer) -> emqx_bridge_kafka_impl_producer; resource_type(gcp_pubsub_producer) -> emqx_bridge_gcp_pubsub_impl_producer; +resource_type(hstreamdb) -> + emqx_bridge_hstreamdb_connector; resource_type(kafka_producer) -> emqx_bridge_kafka_impl_producer; resource_type(kinesis) -> @@ -126,6 +128,14 @@ connector_structs() -> required => false } )}, + {hstreamdb, + mk( + hoconsc:map(name, ref(emqx_bridge_hstreamdb, "config_connector")), + #{ + desc => <<"HStreamDB Connector Config">>, + required => false + } + )}, {kafka_producer, mk( hoconsc:map(name, ref(emqx_bridge_kafka, "config_connector")), @@ -310,6 +320,7 @@ schema_modules() -> emqx_bridge_azure_event_hub, emqx_bridge_confluent_producer, emqx_bridge_gcp_pubsub_producer_schema, + emqx_bridge_hstreamdb, emqx_bridge_kafka, emqx_bridge_kinesis, emqx_bridge_matrix, @@ -349,6 +360,7 @@ api_schemas(Method) -> <<"gcp_pubsub_producer">>, Method ++ "_connector" ), + api_ref(emqx_bridge_hstreamdb, <<"hstreamdb">>, Method ++ "_connector"), api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"), api_ref(emqx_bridge_kinesis, <<"kinesis">>, Method ++ "_connector"), api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"), diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 27d7f6379..764a16849 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -128,6 +128,8 @@ connector_type_to_bridge_types(confluent_producer) -> [confluent_producer]; connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_producer]; +connector_type_to_bridge_types(hstreamdb) -> + [hstreamdb]; connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; connector_type_to_bridge_types(kinesis) -> diff --git a/changes/ee/feat-12512.en.md b/changes/ee/feat-12512.en.md new file mode 100644 index 000000000..77ea4d2dd --- /dev/null +++ b/changes/ee/feat-12512.en.md @@ -0,0 +1 @@ +The HStreamDB bridge has been split into connector and action components. Old HStreamDB bridges will be upgraded automatically but it is recommended to do the upgrade manually as new fields has been added to the configuration. diff --git a/mix.exs b/mix.exs index abbf5940c..598b770b8 100644 --- a/mix.exs +++ b/mix.exs @@ -200,7 +200,7 @@ defmodule EMQXUmbrella.MixProject do defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do [ - {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.5+v0.16.1"}, + {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}, {:wolff, github: "kafka4beam/wolff", tag: "1.10.2"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, diff --git a/rel/i18n/emqx_bridge_hstreamdb.hocon b/rel/i18n/emqx_bridge_hstreamdb.hocon index de9989953..d9eb6cc22 100644 --- a/rel/i18n/emqx_bridge_hstreamdb.hocon +++ b/rel/i18n/emqx_bridge_hstreamdb.hocon @@ -47,4 +47,58 @@ NOTE: When you use `raw record` template (which means the data is not a valid JS record_template.label: """HStream Record""" +action_parameters.desc: +"""Action specific configuration.""" + +action_parameters.label: +"""Action""" + +grpc_flush_timeout.desc: +"""Time interval for flushing gRPC calls to the HStreamDB server.""" + +grpc_flush_timeout.label: +"""gRPC Flush Interval""" + +aggregation_pool_size.desc: +"""The size of the record aggregation pool. A larger aggregation pool size can lead to enhanced parallelization but may also result in reduced efficiency due to smaller batch sizes.""" + +aggregation_pool_size.label: +"""Aggregation Pool Size""" + +max_batches.desc: +"""Maximum number of unconfirmed batches in the flush queue.""" + +max_batches.label: +"""Max Batches""" + +writer_pool_size.desc: +"""The size of the writer pool. A larger pool may increase parallelization and concurrent write operations, potentially boosting throughput. Trade-offs include greater memory consumption and possible resource contention.""" + +writer_pool_size.label: +"""Writer Pool Size""" + +batch_size.desc: +"""Maximum number of insert data clauses that can be sent in a single request.""" + +batch_size.label: +"""Max Batch Append Count""" + +batch_interval.desc: +"""Maximum interval that is allowed between two successive (batch) request.""" + +batch_interval.label: +"""Max Batch Interval""" + +hstreamdb_action.desc: +"""Configuration for HStreamDB action.""" + +hstreamdb_action.label: +"""HStreamDB Action Configuration""" + +config_connector.desc: +"""Configuration for an HStreamDB connector.""" + +config_connector.label: +"""HStreamDB Connector Configuration""" + } diff --git a/rel/i18n/emqx_bridge_hstreamdb_connector.hocon b/rel/i18n/emqx_bridge_hstreamdb_connector.hocon index 8f7ac2edb..3122d59da 100644 --- a/rel/i18n/emqx_bridge_hstreamdb_connector.hocon +++ b/rel/i18n/emqx_bridge_hstreamdb_connector.hocon @@ -19,7 +19,7 @@ name.label: """Connector Name""" url.desc: -"""HStreamDB Server URL. Using gRPC http server address.""" +"""HStreamDB Server URL. This URL will be used as the gRPC HTTP server address.""" url.label: """HStreamDB Server URL""" @@ -37,13 +37,13 @@ partition_key.label: """HStreamDB Partition Key""" pool_size.desc: -"""HStreamDB Pool Size.""" +"""The size of the aggregation pool and the writer pool (see the description of the HStreamDB action for more information about these pools). Larger pool sizes can enhance parallelization but may also reduce efficiency due to smaller batch sizes.""" pool_size.label: """HStreamDB Pool Size""" grpc_timeout.desc: -"""HStreamDB gRPC Timeout.""" +"""The timeout for HStreamDB gRPC requests.""" grpc_timeout.label: """HStreamDB gRPC Timeout"""