From a153d758c3a8c7130727563440f089df32d97959 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 12 Feb 2024 12:42:16 +0100 Subject: [PATCH 1/6] feat: refactor HStreamDB bridge to connector and action This commit also upgrades the hstreamdb_erl driver library and change the action/bridge to use the new hstreamdb_erl. Much of the code for the new API is copied from: https://github.com/emqx/emqx-enterprise/blob/be1a1604dd5c4c06ea6be15b702f35d9bdec48a7/lib-ee/emqx_rule_actions/src/emqx_backend_hstreamdb_actions.erl Fixes: https://emqx.atlassian.net/browse/EMQX-11458 --- .ci/docker-compose-file/.env | 2 +- apps/emqx_bridge/src/emqx_action_info.erl | 1 + apps/emqx_bridge_hstreamdb/rebar.config | 2 +- .../src/emqx_bridge_hstreamdb.app.src | 4 +- .../src/emqx_bridge_hstreamdb.erl | 157 ++++++- .../src/emqx_bridge_hstreamdb_action_info.erl | 88 ++++ .../src/emqx_bridge_hstreamdb_connector.erl | 412 +++++++++++------- .../test/emqx_bridge_hstreamdb_SUITE.erl | 153 ++++++- .../src/schema/emqx_connector_ee_schema.erl | 12 + .../src/schema/emqx_connector_schema.erl | 2 + mix.exs | 2 +- 11 files changed, 629 insertions(+), 206 deletions(-) create mode 100644 apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_action_info.erl diff --git a/.ci/docker-compose-file/.env b/.ci/docker-compose-file/.env index 73ec47d00..1b837aea3 100644 --- a/.ci/docker-compose-file/.env +++ b/.ci/docker-compose-file/.env @@ -10,7 +10,7 @@ CASSANDRA_TAG=3.11 MINIO_TAG=RELEASE.2023-03-20T20-16-18Z OPENTS_TAG=9aa7f88 KINESIS_TAG=2.1 -HSTREAMDB_TAG=v0.16.1 +HSTREAMDB_TAG=v0.19.3 HSTREAMDB_ZK_TAG=3.8.1 MS_IMAGE_ADDR=mcr.microsoft.com/mssql/server diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 3a9e13f94..a57b7eed0 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -91,6 +91,7 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_gcp_pubsub_producer_action_info, emqx_bridge_kafka_action_info, emqx_bridge_kinesis_action_info, + emqx_bridge_hstreamdb_action_info, emqx_bridge_matrix_action_info, emqx_bridge_mongodb_action_info, emqx_bridge_oracle_action_info, diff --git a/apps/emqx_bridge_hstreamdb/rebar.config b/apps/emqx_bridge_hstreamdb/rebar.config index eab7bcb3f..c2e3194ac 100644 --- a/apps/emqx_bridge_hstreamdb/rebar.config +++ b/apps/emqx_bridge_hstreamdb/rebar.config @@ -3,7 +3,7 @@ {erl_opts, [debug_info]}. {deps, [ {hstreamdb_erl, - {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.4.5+v0.16.1"}}}, + {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.5.18+v0.18.1"}}}, {emqx, {path, "../../apps/emqx"}}, {emqx_utils, {path, "../../apps/emqx_utils"}} ]}. diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src index f9825e3dd..84c09fe3a 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_hstreamdb, [ {description, "EMQX Enterprise HStreamDB Bridge"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [ kernel, @@ -8,7 +8,7 @@ emqx_resource, hstreamdb_erl ]}, - {env, []}, + {env, [{emqx_action_info_modules, [emqx_bridge_hstreamdb_action_info]}]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl index 7052e0120..ee0baaa4c 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl @@ -6,10 +6,12 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --import(hoconsc, [mk/2, enum/1, ref/2]). +-import(hoconsc, [mk/2, enum/1]). -export([ - conn_bridge_examples/1 + conn_bridge_examples/1, + bridge_v2_examples/1, + connector_examples/1 ]). -export([ @@ -19,6 +21,11 @@ desc/1 ]). +-define(CONNECTOR_TYPE, hstreamdb). +-define(ACTION_TYPE, ?CONNECTOR_TYPE). +-define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>). +-define(DEFAULT_GRPC_FLUSH_TIMEOUT_RAW, <<"10s">>). + %% ------------------------------------------------------------------------------------------------- %% api @@ -27,16 +34,16 @@ conn_bridge_examples(Method) -> #{ <<"hstreamdb">> => #{ summary => <<"HStreamDB Bridge">>, - value => values(Method) + value => conn_bridge_example_values(Method) } } ]. -values(get) -> - values(post); -values(put) -> - values(post); -values(post) -> +conn_bridge_example_values(get) -> + conn_bridge_example_values(post); +conn_bridge_example_values(put) -> + conn_bridge_example_values(post); +conn_bridge_example_values(post) -> #{ type => <<"hstreamdb">>, name => <<"demo">>, @@ -55,15 +62,135 @@ values(post) -> }, ssl => #{enable => false} }; -values(_) -> +conn_bridge_example_values(_) -> #{}. +connector_examples(Method) -> + [ + #{ + <<"hstreamdb">> => + #{ + summary => <<"HStreamDB Connector">>, + value => emqx_connector_schema:connector_values( + Method, ?CONNECTOR_TYPE, connector_values() + ) + } + } + ]. + +connector_values() -> + #{ + <<"url">> => <<"http://127.0.0.1:6570">>, + <<"grpc_timeout">> => <<"30s">>, + <<"ssl">> => + #{ + <<"enable">> => false, + <<"verify">> => <<"verify_peer">> + }, + <<"resource_opts">> => + #{ + <<"health_check_interval">> => <<"15s">>, + <<"start_timeout">> => <<"5s">> + } + }. + +bridge_v2_examples(Method) -> + [ + #{ + <<"hstreamdb">> => + #{ + summary => <<"HStreamDB Action">>, + value => emqx_bridge_v2_schema:action_values( + Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values() + ) + } + } + ]. + +action_values() -> + #{ + <<"parameters">> => #{ + <<"aggregation_pool_size">> => 8, + <<"partition_key">> => <<"hej">>, + <<"record_template">> => <<"${payload}">>, + <<"stream">> => <<"mqtt_message">>, + <<"writer_pool_size">> => 8 + } + }. + %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions namespace() -> "bridge_hstreamdb". roots() -> []. +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + Fields = + fields(connector_fields) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts), + emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields); +fields(Field) when + Field == "get_bridge_v2"; + Field == "post_bridge_v2"; + Field == "put_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(hstreamdb_action)); +fields(action) -> + {?ACTION_TYPE, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(?MODULE, hstreamdb_action)), + #{ + desc => <<"HStreamDB Action Config">>, + required => false + } + )}; +fields(hstreamdb_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + hoconsc:mk( + hoconsc:ref(?MODULE, action_parameters), + #{ + required => true, + desc => ?DESC("action_parameters") + } + ) + ); +fields(action_parameters) -> + [ + {stream, mk(binary(), #{required => true, desc => ?DESC("stream_name")})}, + + {partition_key, mk(binary(), #{required => false, desc => ?DESC("partition_key")})}, + + {grpc_flush_timeout, fun grpc_flush_timeout/1}, + {record_template, + mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("record_template")})}, + {aggregation_pool_size, + mk(integer(), #{default => 8, desc => ?DESC("aggregation_pool_size")})}, + {max_batches, mk(integer(), #{default => 500, desc => ?DESC("max_batches")})}, + {writer_pool_size, mk(integer(), #{default => 8, desc => ?DESC("writer_pool_size")})}, + {batch_size, mk(integer(), #{default => 100, desc => ?DESC("batch_size")})}, + {batch_interval, + mk(emqx_schema:timeout_duration_ms(), #{ + default => <<"500ms">>, desc => ?DESC("batch_interval") + })} + ]; +fields(connector_fields) -> + [ + {url, + mk(binary(), #{ + required => true, desc => ?DESC("url"), default => <<"http://127.0.0.1:6570">> + })}, + {grpc_timeout, fun grpc_timeout/1} + ] ++ emqx_connector_schema_lib:ssl_fields(); +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + fields(connector_fields) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); fields("config") -> hstream_bridge_common_fields() ++ connector_fields(); @@ -80,6 +207,18 @@ fields("put") -> hstream_bridge_common_fields() ++ connector_fields(). +grpc_timeout(type) -> emqx_schema:timeout_duration_ms(); +grpc_timeout(desc) -> ?DESC("grpc_timeout"); +grpc_timeout(default) -> ?DEFAULT_GRPC_TIMEOUT_RAW; +grpc_timeout(required) -> false; +grpc_timeout(_) -> undefined. + +grpc_flush_timeout(type) -> emqx_schema:timeout_duration_ms(); +grpc_flush_timeout(desc) -> ?DESC("grpc_timeout"); +grpc_flush_timeout(default) -> ?DEFAULT_GRPC_FLUSH_TIMEOUT_RAW; +grpc_flush_timeout(required) -> false; +grpc_flush_timeout(_) -> undefined. + hstream_bridge_common_fields() -> emqx_bridge_schema:common_bridge_fields() ++ [ diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_action_info.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_action_info.erl new file mode 100644 index 000000000..66188110f --- /dev/null +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_action_info.erl @@ -0,0 +1,88 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_hstreamdb_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0, + bridge_v1_config_to_connector_config/1, + bridge_v1_config_to_action_config/2, + connector_action_config_to_bridge_v1_config/2 +]). + +bridge_v1_type_name() -> hstreamdb. + +action_type_name() -> hstreamdb. + +connector_type_name() -> hstreamdb. + +schema_module() -> emqx_bridge_hstreamdb. + +bridge_v1_config_to_connector_config(BridgeV1Conf) -> + ConnectorSchema = emqx_bridge_hstreamdb:fields(connector_fields), + ConnectorAtomKeys = lists:foldl(fun({K, _}, Acc) -> [K | Acc] end, [], ConnectorSchema), + ConnectorBinKeys = [atom_to_binary(K) || K <- ConnectorAtomKeys] ++ [<<"resource_opts">>], + ConnectorConfig0 = maps:with(ConnectorBinKeys, BridgeV1Conf), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_connector_schema:project_to_connector_resource_opts/1, + ConnectorConfig0 + ). + +bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) -> + Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config( + BridgeV1Conf, ConnectorName, emqx_bridge_hstreamdb, "config_connector" + ), + %% Remove fields no longer relevant for the action + Config1 = lists:foldl( + fun(Field, Acc) -> + emqx_utils_maps:deep_remove(Field, Acc) + end, + Config0, + [ + [<<"parameters">>, <<"pool_size">>], + [<<"parameters">>, <<"direction">>] + ] + ), + %% Move pool_size to aggregation_pool_size and writer_pool_size + PoolSize = maps:get(<<"pool_size">>, BridgeV1Conf, 8), + Config2 = emqx_utils_maps:deep_put( + [<<"parameters">>, <<"aggregation_pool_size">>], + Config1, + PoolSize + ), + Config3 = emqx_utils_maps:deep_put( + [<<"parameters">>, <<"writer_pool_size">>], + Config2, + PoolSize + ), + Config3. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig), + BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1), + BridgeV1Config3 = maps:remove(<<"parameters">>, BridgeV1Config2), + %% Pick out pool_size from aggregation_pool_size + PoolSize = emqx_utils_maps:deep_get( + [<<"parameters">>, <<"aggregation_pool_size">>], ActionConfig, 8 + ), + BridgeV1Config4 = maps:put(<<"pool_size">>, PoolSize, BridgeV1Config3), + + %% Move the fields stream, partition_key and record_template from parameters in ActionConfig to the top level in BridgeV1Config + lists:foldl( + fun(Field, Acc) -> + emqx_utils_maps:deep_put( + [Field], + Acc, + emqx_utils_maps:deep_get([<<"parameters">>, Field], ActionConfig, <<>>) + ) + end, + BridgeV1Config4, + [<<"stream">>, <<"partition_key">>, <<"record_template">>] + ). diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl index fdb80b1e1..bb65089d7 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl @@ -7,8 +7,9 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). --import(hoconsc, [mk/2, enum/1]). +-import(hoconsc, [mk/2]). -behaviour(emqx_resource). @@ -19,7 +20,11 @@ on_stop/2, on_query/3, on_batch_query/3, - on_get_status/2 + on_get_status/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). -export([ @@ -38,67 +43,132 @@ -define(DEFAULT_GRPC_TIMEOUT, timer:seconds(30)). -define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>). +-define(DEFAULT_GRPC_FLUSH_TIMEOUT, 10000). +-define(DEFAULT_MAX_BATCHES, 500). +-define(DEFAULT_BATCH_INTERVAL, 500). +-define(DEFAULT_AGG_POOL_SIZE, 8). +-define(DEFAULT_WRITER_POOL_SIZE, 8). %% ------------------------------------------------------------------------------------------------- %% resource callback callback_mode() -> always_sync. on_start(InstId, Config) -> - start_client(InstId, Config). + try + do_on_start(InstId, Config) + catch + E:R:S -> + Error = #{ + msg => "start_hstreamdb_connector_error", + connector => InstId, + error => E, + reason => R, + stack => S + }, + ?SLOG(error, Error), + {error, Error} + end. + +on_add_channel( + _InstId, + #{ + installed_channels := InstalledChannels, + client_options := ClientOptions + } = OldState, + ChannelId, + ChannelConfig +) -> + %{ok, ChannelState} = create_channel_state(ChannelId, PoolName, ChannelConfig), + Parameters0 = maps:get(parameters, ChannelConfig), + Parameters = Parameters0#{client_options => ClientOptions}, + PartitionKey = emqx_placeholder:preproc_tmpl(maps:get(partition_key, Parameters, <<"">>)), + try + ChannelState = #{ + producer => start_producer(ChannelId, Parameters), + record_template => record_template(Parameters), + partition_key => PartitionKey + }, + NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState} + catch + Error:Reason:Stack -> + {error, {Error, Reason, Stack}} + end. + +on_remove_channel( + _InstId, + #{ + installed_channels := InstalledChannels + } = OldState, + ChannelId +) -> + #{ + producer := Producer + } = maps:get(ChannelId, InstalledChannels), + _ = hstreamdb:stop_producer(Producer), + NewInstalledChannels = maps:remove(ChannelId, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState}. + +on_get_channel_status( + _ResId, + _ChannelId, + _State +) -> + ?status_connected. + +on_get_channels(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId). on_stop(InstId, _State) -> - case emqx_resource:get_allocated_resources(InstId) of - #{?hstreamdb_client := #{client := Client, producer := Producer}} -> - StopClientRes = hstreamdb:stop_client(Client), - StopProducerRes = hstreamdb:stop_producer(Producer), - ?SLOG(info, #{ - msg => "stop_hstreamdb_connector", - connector => InstId, - client => Client, - producer => Producer, - stop_client => StopClientRes, - stop_producer => StopProducerRes - }); - _ -> - ok - end. + ?tp( + hstreamdb_connector_on_stop, + #{instance_id => InstId} + ). -define(FAILED_TO_APPLY_HRECORD_TEMPLATE, {error, {unrecoverable_error, failed_to_apply_hrecord_template}} ). on_query( - _InstId, - {send_message, Data}, - _State = #{ - producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate - } + InstId, + {ChannelID, Data}, + #{installed_channels := Channels} = _State ) -> + #{ + producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate + } = maps:get(ChannelID, Channels), try to_record(PartitionKey, HRecordTemplate, Data) of - Record -> append_record(Producer, Record, false) + Record -> append_record(InstId, Producer, Record, false) catch _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE end. on_batch_query( - _InstId, - BatchList, - _State = #{ - producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate - } + InstId, + [{ChannelID, _Data} | _] = BatchList, + #{installed_channels := Channels} = _State ) -> + #{ + producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate + } = maps:get(ChannelID, Channels), try to_multi_part_records(PartitionKey, HRecordTemplate, BatchList) of - Records -> append_record(Producer, Records, true) + Records -> append_record(InstId, Producer, Records, true) catch _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE end. -on_get_status(_InstId, #{client := Client}) -> - case is_alive(Client) of - true -> - connected; - false -> - disconnected +on_get_status(_InstId, State) -> + case check_status(State) of + ok -> + ?status_connected; + Error -> + %% We set it to ?status_connecting so that the channels are not deleted. + %% The producers in the channels contains buffers so we don't want to delete them. + {?status_connecting, State, Error} end. %% ------------------------------------------------------------------------------------------------- @@ -140,142 +210,149 @@ desc(config) -> %% ------------------------------------------------------------------------------------------------- %% internal functions -start_client(InstId, Config) -> - try - do_start_client(InstId, Config) - catch - E:R:S -> - Error = #{ - msg => "start_hstreamdb_connector_error", - connector => InstId, - error => E, - reason => R, - stack => S - }, - ?SLOG(error, Error), - {error, Error} - end. -do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize, ssl := SSL}) -> +do_on_start(InstId, Config) -> ?SLOG(info, #{ msg => "starting_hstreamdb_connector_client", connector => InstId, config => Config }), - ClientName = client_name(InstId), + {ok, _} = application:ensure_all_started(hstreamdb_erl), + ClientOptions = client_options(Config), + State = #{ + client_options => ClientOptions, + installed_channels => #{} + }, + case check_status(State) of + ok -> + ?SLOG(info, #{ + msg => "hstreamdb_connector_client_started", + connector => InstId + }), + {ok, State}; + Error -> + ?tp( + hstreamdb_connector_start_failed, + #{error => client_not_alive} + ), + ?SLOG(error, #{ + msg => "hstreamdb_connector_client_not_alive", + connector => InstId, + error => Error + }), + {error, {connect_failed, Error}} + end. + +client_options(Config = #{url := ServerURL, ssl := SSL}) -> + GRPCTimeout = maps:get(<<"grpc_timeout">>, Config, ?DEFAULT_GRPC_TIMEOUT), + EnableSSL = maps:get(enable, SSL), RpcOpts = - case maps:get(enable, SSL) of + case EnableSSL of false -> - #{pool_size => PoolSize}; + #{pool_size => 1}; true -> #{ - pool_size => PoolSize, + pool_size => 1, gun_opts => #{ transport => tls, - transport_opts => emqx_tls_lib:to_client_opts(SSL) + transport_opts => + emqx_tls_lib:to_client_opts(SSL) } } end, - ClientOptions = [ - {url, binary_to_list(Server)}, - {rpc_options, RpcOpts} - ], - case hstreamdb:start_client(ClientName, ClientOptions) of + ClientOptions = #{ + url => to_string(ServerURL), + grpc_timeout => GRPCTimeout, + rpc_options => RpcOpts + }, + ClientOptions. + +check_status(ConnectorState) -> + try start_client(ConnectorState) of {ok, Client} -> - case is_alive(Client) of - true -> - ?SLOG(info, #{ - msg => "hstreamdb_connector_client_started", - connector => InstId, - client => Client - }), - start_producer(InstId, Client, Config); - _ -> - ?tp( - hstreamdb_connector_start_failed, - #{error => client_not_alive} - ), - ?SLOG(error, #{ - msg => "hstreamdb_connector_client_not_alive", - connector => InstId - }), - {error, connect_failed} + try hstreamdb_client:echo(Client) of + ok -> ok; + {error, _} = ErrorEcho -> ErrorEcho + after + _ = hstreamdb:stop_client(Client) end; - {error, {already_started, Pid}} -> - ?SLOG(info, #{ - msg => "starting_hstreamdb_connector_client_find_old_client_restart_client", - old_client_pid => Pid, - old_client_name => ClientName - }), - _ = hstreamdb:stop_client(ClientName), - start_client(InstId, Config); + {error, _} = StartClientError -> + StartClientError + catch + ErrorType:Reason:_ST -> + {error, {ErrorType, Reason}} + end. + +start_client(Opts) -> + ClientOptions = maps:get(client_options, Opts), + case hstreamdb:start_client(ClientOptions) of + {ok, Client} -> + {ok, Client}; {error, Error} -> - ?SLOG(error, #{ - msg => "hstreamdb_connector_client_failed", - connector => InstId, - reason => Error - }), {error, Error} end. -is_alive(Client) -> - hstreamdb_client:echo(Client) =:= ok. - start_producer( - InstId, - Client, - Options = #{stream := Stream, pool_size := PoolSize} + ActionId, + #{ + stream := Stream, + batch_size := BatchSize, + batch_interval := Interval + } = Opts ) -> - %% TODO: change these batch options after we have better disk cache. - BatchSize = maps:get(batch_size, Options, 100), - Interval = maps:get(batch_interval, Options, 1000), - ProducerOptions = [ - {stream, Stream}, - {callback, {?MODULE, on_flush_result, []}}, - {max_records, BatchSize}, - {interval, Interval}, - {pool_size, PoolSize}, - {grpc_timeout, maps:get(grpc_timeout, Options, ?DEFAULT_GRPC_TIMEOUT)} - ], - Name = produce_name(InstId), - ?SLOG(info, #{ - msg => "starting_hstreamdb_connector_producer", - connector => InstId - }), - case hstreamdb:start_producer(Client, Name, ProducerOptions) of - {ok, Producer} -> - ?SLOG(info, #{ - msg => "hstreamdb_connector_producer_started" - }), - State = #{ - client => Client, - producer => Producer, - enable_batch => maps:get(enable_batch, Options, false), - partition_key => emqx_placeholder:preproc_tmpl( - maps:get(partition_key, Options, <<"">>) - ), - record_template => record_template(Options) - }, - ok = emqx_resource:allocate_resource(InstId, ?hstreamdb_client, #{ - client => Client, producer => Producer - }), - {ok, State}; - {error, {already_started, Pid}} -> - ?SLOG(info, #{ - msg => - "starting_hstreamdb_connector_producer_find_old_producer_restart_producer", - old_producer_pid => Pid, - old_producer_name => Name - }), - _ = hstreamdb:stop_producer(Name), - start_producer(InstId, Client, Options); + MaxBatches = maps:get(max_batches, Opts, ?DEFAULT_MAX_BATCHES), + AggPoolSize = maps:get(aggregation_pool_size, Opts, ?DEFAULT_AGG_POOL_SIZE), + WriterPoolSize = maps:get(writer_pool_size, Opts, ?DEFAULT_WRITER_POOL_SIZE), + GRPCTimeout = maps:get(grpc_flush_timeout, Opts, ?DEFAULT_GRPC_FLUSH_TIMEOUT), + ClientOptions = maps:get(client_options, Opts), + ProducerOptions = #{ + stream => to_string(Stream), + buffer_options => #{ + interval => Interval, + callback => {?MODULE, on_flush_result, [ActionId]}, + max_records => BatchSize, + max_batches => MaxBatches + }, + buffer_pool_size => AggPoolSize, + writer_options => #{ + grpc_timeout => GRPCTimeout + }, + writer_pool_size => WriterPoolSize, + client_options => ClientOptions + }, + Name = produce_name(ActionId), + ensure_start_producer(Name, ProducerOptions). + +ensure_start_producer(ProducerName, ProducerOptions) -> + case hstreamdb:start_producer(ProducerName, ProducerOptions) of + ok -> + ok; + {error, {already_started, _Pid}} -> + %% HStreamDB producer already started, restart it + _ = hstreamdb:stop_producer(ProducerName), + %% the pool might have been leaked after relup + _ = ecpool:stop_sup_pool(ProducerName), + ok = hstreamdb:start_producer(ProducerName, ProducerOptions); + {error, { + {shutdown, + {failed_to_start_child, {pool_sup, Pool}, + {shutdown, + {failed_to_start_child, worker_sup, + {shutdown, {failed_to_start_child, _, {badarg, _}}}}}}}, + _ + }} -> + %% HStreamDB producer was not properly cleared, restart it + %% the badarg error in gproc maybe caused by the pool is leaked after relup + _ = ecpool:stop_sup_pool(Pool), + ok = hstreamdb:start_producer(ProducerName, ProducerOptions); {error, Reason} -> - ?SLOG(error, #{ - msg => "starting_hstreamdb_connector_producer_failed", - reason => Reason - }), - {error, Reason} - end. + %% HStreamDB start producer failed + throw({start_producer_failed, Reason}) + end, + ProducerName. + +produce_name(ActionId) -> + list_to_binary("backend_hstream_producer:" ++ to_string(ActionId)). to_record(PartitionKeyTmpl, HRecordTmpl, Data) -> PartitionKey = emqx_placeholder:proc_tmpl(PartitionKeyTmpl, Data), @@ -289,43 +366,46 @@ to_record(PartitionKey, RawRecord) -> to_multi_part_records(PartitionKeyTmpl, HRecordTmpl, BatchList) -> lists:map( - fun({send_message, Data}) -> + fun({_, Data}) -> to_record(PartitionKeyTmpl, HRecordTmpl, Data) end, BatchList ). -append_record(Producer, MultiPartsRecords, MaybeBatch) when is_list(MultiPartsRecords) -> +append_record(ResourceId, Producer, MultiPartsRecords, MaybeBatch) when + is_list(MultiPartsRecords) +-> lists:foreach( - fun(Record) -> append_record(Producer, Record, MaybeBatch) end, MultiPartsRecords + fun(Record) -> append_record(ResourceId, Producer, Record, MaybeBatch) end, + MultiPartsRecords ); -append_record(Producer, Record, MaybeBatch) when is_tuple(Record) -> - do_append_records(Producer, Record, MaybeBatch). +append_record(ResourceId, Producer, Record, MaybeBatch) when is_tuple(Record) -> + do_append_records(ResourceId, Producer, Record, MaybeBatch). %% TODO: only sync request supported. implement async request later. -do_append_records(Producer, Record, true = IsBatch) -> +do_append_records(ResourceId, Producer, Record, true = IsBatch) -> Result = hstreamdb:append(Producer, Record), - handle_result(Result, Record, IsBatch); -do_append_records(Producer, Record, false = IsBatch) -> + handle_result(ResourceId, Result, Record, IsBatch); +do_append_records(ResourceId, Producer, Record, false = IsBatch) -> Result = hstreamdb:append_flush(Producer, Record), - handle_result(Result, Record, IsBatch). + handle_result(ResourceId, Result, Record, IsBatch). -handle_result(ok = Result, Record, IsBatch) -> - handle_result({ok, Result}, Record, IsBatch); -handle_result({ok, Result}, Record, IsBatch) -> +handle_result(ResourceId, ok = Result, Record, IsBatch) -> + handle_result(ResourceId, {ok, Result}, Record, IsBatch); +handle_result(ResourceId, {ok, Result}, Record, IsBatch) -> ?tp( hstreamdb_connector_query_append_return, - #{result => Result, is_batch => IsBatch} + #{result => Result, is_batch => IsBatch, instance_id => ResourceId} ), ?SLOG(debug, #{ msg => "hstreamdb_producer_sync_append_success", record => Record, is_batch => IsBatch }); -handle_result({error, Reason} = Err, Record, IsBatch) -> +handle_result(ResourceId, {error, Reason} = Err, Record, IsBatch) -> ?tp( hstreamdb_connector_query_append_return, - #{error => Reason, is_batch => IsBatch} + #{error => Reason, is_batch => IsBatch, instance_id => ResourceId} ), ?SLOG(error, #{ msg => "hstreamdb_producer_sync_append_failed", @@ -335,12 +415,6 @@ handle_result({error, Reason} = Err, Record, IsBatch) -> }), Err. -client_name(InstId) -> - "client:" ++ to_string(InstId). - -produce_name(ActionId) -> - list_to_atom("producer:" ++ to_string(ActionId)). - record_template(#{record_template := RawHRecordTemplate}) -> emqx_placeholder:preproc_tmpl(RawHRecordTemplate); record_template(_) -> diff --git a/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl b/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl index 4d165c03d..1ac489334 100644 --- a/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl +++ b/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl @@ -117,16 +117,21 @@ end_per_suite(_Config) -> ok. init_per_testcase(t_to_hrecord_failed, Config) -> + init_per_testcase_common(), meck:new([hstreamdb], [passthrough, no_history, no_link]), meck:expect(hstreamdb, to_record, fun(_, _, _) -> error(trans_to_hrecord_failed) end), Config; init_per_testcase(_Testcase, Config) -> + init_per_testcase_common(), %% drop stream and will create a new one in common_init/1 %% TODO: create a new stream for each test case delete_bridge(Config), snabbkaffe:start_trace(), Config. +init_per_testcase_common() -> + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(). + end_per_testcase(t_to_hrecord_failed, _Config) -> meck:unload([hstreamdb]); end_per_testcase(_Testcase, Config) -> @@ -301,7 +306,10 @@ t_simple_query(Config) -> {ok, _}, create_bridge(Config) ), - Requests = gen_batch_req(BatchSize), + Type = ?config(hstreamdb_bridge_type, Config), + Name = ?config(hstreamdb_name, Config), + ActionId = emqx_bridge_v2:id(Type, Name), + Requests = gen_batch_req(BatchSize, ActionId), ?check_trace( begin ?wait_async_action( @@ -351,6 +359,24 @@ t_to_hrecord_failed(Config) -> end, ok. +%% Connector Action Tests + +t_action_on_get_status(Config) -> + emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}). + +t_action_create_via_http(Config) -> + emqx_bridge_v2_testlib:t_create_via_http(Config). + +t_action_sync_query(Config) -> + MakeMessageFun = fun() -> rand_data() end, + IsSuccessCheck = fun(Result) -> ?assertEqual(ok, Result) end, + TracePoint = hstreamdb_connector_query_append_return, + emqx_bridge_v2_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint). + +t_action_start_stop(Config) -> + StopTracePoint = hstreamdb_connector_on_stop, + emqx_bridge_v2_testlib:t_start_stop(Config, StopTracePoint). + %%------------------------------------------------------------------------------ %% Helper fns %%------------------------------------------------------------------------------ @@ -362,6 +388,10 @@ common_init(ConfigT) -> URL = "http://" ++ Host ++ ":" ++ RawPort, Config0 = [ + {bridge_type, <<"hstreamdb">>}, + {bridge_name, <<"my_hstreamdb_action">>}, + {connector_type, <<"hstreamdb">>}, + {connector_name, <<"my_hstreamdb_connector">>}, {hstreamdb_host, Host}, {hstreamdb_port, Port}, {hstreamdb_url, URL}, @@ -393,6 +423,8 @@ common_init(ConfigT) -> {hstreamdb_config, HStreamDBConf}, {hstreamdb_bridge_type, BridgeType}, {hstreamdb_name, Name}, + {bridge_config, action_config(Config0)}, + {connector_config, connector_config(Config0)}, {proxy_host, ProxyHost}, {proxy_port, ProxyPort} | Config0 @@ -424,7 +456,7 @@ hstreamdb_config(BridgeType, Config) -> " resource_opts = {\n" %% always sync " query_mode = sync\n" - " request_ttl = 500ms\n" + " request_ttl = 10000ms\n" " batch_size = ~b\n" " worker_pool_size = ~b\n" " }\n" @@ -443,6 +475,45 @@ hstreamdb_config(BridgeType, Config) -> ), {Name, parse_and_check(ConfigString, BridgeType, Name)}. +action_config(Config) -> + ConnectorName = ?config(connector_name, Config), + BatchSize = batch_size(Config), + #{ + <<"connector">> => ConnectorName, + <<"enable">> => true, + <<"parameters">> => + #{ + <<"aggregation_pool_size">> => ?POOL_SIZE, + <<"record_template">> => ?RECORD_TEMPLATE, + <<"stream">> => ?STREAM, + <<"writer_pool_size">> => ?POOL_SIZE + }, + <<"resource_opts">> => + #{ + <<"batch_size">> => BatchSize, + <<"health_check_interval">> => <<"15s">>, + <<"inflight_window">> => 100, + <<"max_buffer_bytes">> => <<"256MB">>, + <<"query_mode">> => <<"sync">>, + <<"request_ttl">> => <<"45s">>, + <<"worker_pool_size">> => ?POOL_SIZE + } + }. + +connector_config(Config) -> + Port = integer_to_list(?config(hstreamdb_port, Config)), + URL = "http://" ++ ?config(hstreamdb_host, Config) ++ ":" ++ Port, + #{ + <<"url">> => URL, + <<"ssl">> => + #{<<"enable">> => false, <<"verify">> => <<"verify_peer">>}, + <<"grpc_timeout">> => <<"30s">>, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"15s">>, + <<"start_timeout">> => <<"5s">> + } + }. + parse_and_check(ConfigString, BridgeType, Name) -> {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), @@ -454,10 +525,10 @@ parse_and_check(ConfigString, BridgeType, Name) -> -define(CONN_ATTEMPTS, 10). default_options(Config) -> - [ - {url, ?config(hstreamdb_url, Config)}, - {rpc_options, ?RPC_OPTIONS} - ]. + #{ + url => ?config(hstreamdb_url, Config), + rpc_options => ?RPC_OPTIONS + }. connect_direct_hstream(Name, Config) -> client(Name, Config, ?CONN_ATTEMPTS). @@ -511,8 +582,9 @@ send_message(Config, Data) -> query_resource(Config, Request) -> Name = ?config(hstreamdb_name, Config), BridgeType = ?config(hstreamdb_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - emqx_resource:query(ResourceID, Request, #{timeout => 1_000}). + ID = emqx_bridge_v2:id(BridgeType, Name), + ResID = emqx_connector_resource:resource_id(BridgeType, Name), + emqx_resource:query(ID, Request, #{timeout => 1_000, connector_resource_id => ResID}). restart_resource(Config) -> BridgeName = ?config(hstreamdb_name, Config), @@ -526,8 +598,16 @@ resource_id(Config) -> BridgeType = ?config(hstreamdb_bridge_type, Config), _ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName). +action_id(Config) -> + ActionName = ?config(hstreamdb_name, Config), + ActionType = ?config(hstreamdb_bridge_type, Config), + _ActionID = emqx_bridge_v2:id(ActionType, ActionName). + health_check_resource_ok(Config) -> - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(resource_id(Config))). + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(resource_id(Config))), + ActionName = ?config(hstreamdb_name, Config), + ActionType = ?config(hstreamdb_bridge_type, Config), + ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(ActionType, ActionName)). health_check_resource_down(Config) -> case emqx_resource_manager:health_check(resource_id(Config)) of @@ -539,6 +619,19 @@ health_check_resource_down(Config) -> ?assert( false, lists:flatten(io_lib:format("invalid health check result:~p~n", [Other])) ) + end, + ActionName = ?config(hstreamdb_name, Config), + ActionType = ?config(hstreamdb_bridge_type, Config), + #{status := StatusV2} = emqx_bridge_v2:health_check(ActionType, ActionName), + case StatusV2 of + disconnected -> + ok; + connecting -> + ok; + OtherV2 -> + ?assert( + false, lists:flatten(io_lib:format("invalid health check result:~p~n", [OtherV2])) + ) end. % These funs start and then stop the hstreamdb connection @@ -548,22 +641,36 @@ connect_and_create_stream(Config) -> Client, ?STREAM, ?REPLICATION_FACTOR, ?BACKLOG_RETENTION_SECOND, ?SHARD_COUNT ) ), - %% force write to stream to make it created and ready to be written data for rest cases - ProducerOptions = [ - {pool_size, 4}, - {stream, ?STREAM}, - {callback, fun(_) -> ok end}, - {max_records, 10}, - {interval, 1000} - ], + %% force write to stream to make it created and ready to be written data for test cases + ProducerOptions = #{ + stream => ?STREAM, + buffer_options => #{ + interval => 1000, + callback => {?MODULE, on_flush_result, [<<"WHAT">>]}, + max_records => 1, + max_batches => 1 + }, + buffer_pool_size => 1, + writer_options => #{ + grpc_timeout => 100 + }, + writer_pool_size => 1, + client_options => default_options(Config) + }, + ?WITH_CLIENT( begin - {ok, Producer} = hstreamdb:start_producer(Client, test_producer, ProducerOptions), - _ = hstreamdb:append_flush(Producer, hstreamdb:to_record([], raw, rand_payload())), - _ = hstreamdb:stop_producer(Producer) + ok = hstreamdb:start_producer(test_producer, ProducerOptions), + _ = hstreamdb:append_flush(test_producer, hstreamdb:to_record([], raw, rand_payload())), + _ = hstreamdb:stop_producer(test_producer) end ). +on_flush_result({{flush, _Stream, _Records}, {ok, _Resp}}) -> + ok; +on_flush_result({{flush, _Stream, _Records}, {error, _Reason}}) -> + ok. + connect_and_delete_stream(Config) -> ?WITH_CLIENT( _ = hstreamdb_client:delete_stream(Client, ?STREAM) @@ -593,11 +700,11 @@ rand_payload() -> temperature => rand:uniform(40), humidity => rand:uniform(100) }). -gen_batch_req(Count) when +gen_batch_req(Count, ActionId) when is_integer(Count) andalso Count > 0 -> - [{send_message, rand_data()} || _Val <- lists:seq(1, Count)]; -gen_batch_req(Count) -> + [{ActionId, rand_data()} || _Val <- lists:seq(1, Count)]; +gen_batch_req(Count, _ActionId) -> ct:pal("Gen batch requests failed with unexpected Count: ~p", [Count]). str(List) when is_list(List) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index f8800cc10..4ccdc193b 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -28,6 +28,8 @@ resource_type(confluent_producer) -> emqx_bridge_kafka_impl_producer; resource_type(gcp_pubsub_producer) -> emqx_bridge_gcp_pubsub_impl_producer; +resource_type(hstreamdb) -> + emqx_bridge_hstreamdb_connector; resource_type(kafka_producer) -> emqx_bridge_kafka_impl_producer; resource_type(kinesis) -> @@ -122,6 +124,14 @@ connector_structs() -> required => false } )}, + {hstreamdb, + mk( + hoconsc:map(name, ref(emqx_bridge_hstreamdb, "config_connector")), + #{ + desc => <<"HStreamDB Connector Config">>, + required => false + } + )}, {kafka_producer, mk( hoconsc:map(name, ref(emqx_bridge_kafka, "config_connector")), @@ -298,6 +308,7 @@ schema_modules() -> emqx_bridge_azure_event_hub, emqx_bridge_confluent_producer, emqx_bridge_gcp_pubsub_producer_schema, + emqx_bridge_hstreamdb, emqx_bridge_kafka, emqx_bridge_kinesis, emqx_bridge_matrix, @@ -336,6 +347,7 @@ api_schemas(Method) -> <<"gcp_pubsub_producer">>, Method ++ "_connector" ), + api_ref(emqx_bridge_hstreamdb, <<"hstreamdb">>, Method ++ "_connector"), api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"), api_ref(emqx_bridge_kinesis, <<"kinesis">>, Method ++ "_connector"), api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"), diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 751efa3d9..189b9e2cf 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -128,6 +128,8 @@ connector_type_to_bridge_types(confluent_producer) -> [confluent_producer]; connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_producer]; +connector_type_to_bridge_types(hstreamdb) -> + [hstreamdb]; connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; connector_type_to_bridge_types(kinesis) -> diff --git a/mix.exs b/mix.exs index c2ef491e9..113fb8e2c 100644 --- a/mix.exs +++ b/mix.exs @@ -200,7 +200,7 @@ defmodule EMQXUmbrella.MixProject do defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do [ - {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.5+v0.16.1"}, + {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}, {:wolff, github: "kafka4beam/wolff", tag: "1.10.2"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true}, From 4e8dfb48b730b93ee44fdc036062b98b051a117b Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 14 Feb 2024 17:43:48 +0100 Subject: [PATCH 2/6] fix: elvis problems --- .../src/emqx_bridge_hstreamdb_action_info.erl | 3 ++- .../src/emqx_bridge_hstreamdb_connector.erl | 15 +++++++++------ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_action_info.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_action_info.erl index 66188110f..7aa6565fa 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_action_info.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_action_info.erl @@ -74,7 +74,8 @@ connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> ), BridgeV1Config4 = maps:put(<<"pool_size">>, PoolSize, BridgeV1Config3), - %% Move the fields stream, partition_key and record_template from parameters in ActionConfig to the top level in BridgeV1Config + %% Move the fields stream, partition_key and record_template from + %% parameters in ActionConfig to the top level in BridgeV1Config lists:foldl( fun(Field, Acc) -> emqx_utils_maps:deep_put( diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl index bb65089d7..8413e5ecd 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl @@ -270,12 +270,7 @@ client_options(Config = #{url := ServerURL, ssl := SSL}) -> check_status(ConnectorState) -> try start_client(ConnectorState) of {ok, Client} -> - try hstreamdb_client:echo(Client) of - ok -> ok; - {error, _} = ErrorEcho -> ErrorEcho - after - _ = hstreamdb:stop_client(Client) - end; + check_status_with_client(Client); {error, _} = StartClientError -> StartClientError catch @@ -283,6 +278,14 @@ check_status(ConnectorState) -> {error, {ErrorType, Reason}} end. +check_status_with_client(Client) -> + try hstreamdb_client:echo(Client) of + ok -> ok; + {error, _} = ErrorEcho -> ErrorEcho + after + _ = hstreamdb:stop_client(Client) + end. + start_client(Opts) -> ClientOptions = maps:get(client_options, Opts), case hstreamdb:start_client(ClientOptions) of From 5e442fcbca4286d4eddacf0c0b5790afae1a9a43 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 15 Feb 2024 11:08:08 +0100 Subject: [PATCH 3/6] fix: labels and descriptions --- .../src/emqx_bridge_hstreamdb.erl | 30 ++++++++--- rel/i18n/emqx_bridge_hstreamdb.hocon | 54 +++++++++++++++++++ 2 files changed, 78 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl index ee0baaa4c..0556a731d 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl @@ -1,4 +1,4 @@ -%%-------------------------------------------------------------------- +%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_bridge_hstreamdb). @@ -160,9 +160,15 @@ fields(hstreamdb_action) -> ); fields(action_parameters) -> [ - {stream, mk(binary(), #{required => true, desc => ?DESC("stream_name")})}, + {stream, + mk(binary(), #{ + required => true, desc => ?DESC(emqx_bridge_hstreamdb_connector, "stream_name") + })}, - {partition_key, mk(binary(), #{required => false, desc => ?DESC("partition_key")})}, + {partition_key, + mk(binary(), #{ + required => false, desc => ?DESC(emqx_bridge_hstreamdb_connector, "partition_key") + })}, {grpc_flush_timeout, fun grpc_flush_timeout/1}, {record_template, @@ -181,7 +187,9 @@ fields(connector_fields) -> [ {url, mk(binary(), #{ - required => true, desc => ?DESC("url"), default => <<"http://127.0.0.1:6570">> + required => true, + desc => ?DESC(emqx_bridge_hstreamdb_connector, "url"), + default => <<"http://127.0.0.1:6570">> })}, {grpc_timeout, fun grpc_timeout/1} ] ++ emqx_connector_schema_lib:ssl_fields(); @@ -208,13 +216,13 @@ fields("put") -> connector_fields(). grpc_timeout(type) -> emqx_schema:timeout_duration_ms(); -grpc_timeout(desc) -> ?DESC("grpc_timeout"); +grpc_timeout(desc) -> ?DESC(emqx_bridge_hstreamdb_connector, "grpc_timeout"); grpc_timeout(default) -> ?DEFAULT_GRPC_TIMEOUT_RAW; grpc_timeout(required) -> false; grpc_timeout(_) -> undefined. grpc_flush_timeout(type) -> emqx_schema:timeout_duration_ms(); -grpc_flush_timeout(desc) -> ?DESC("grpc_timeout"); +grpc_flush_timeout(desc) -> ?DESC("grpc_flush_timeout"); grpc_flush_timeout(default) -> ?DEFAULT_GRPC_FLUSH_TIMEOUT_RAW; grpc_flush_timeout(required) -> false; grpc_flush_timeout(_) -> undefined. @@ -236,6 +244,16 @@ desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for HStreamDB bridge using `", string:to_upper(Method), "` method."]; +desc("creation_opts") -> + ?DESC(emqx_resource_schema, "creation_opts"); +desc("config_connector") -> + ?DESC("config_connector"); +desc(hstreamdb_action) -> + ?DESC("hstreamdb_action"); +desc(action_parameters) -> + ?DESC("action_parameters"); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc(_) -> undefined. diff --git a/rel/i18n/emqx_bridge_hstreamdb.hocon b/rel/i18n/emqx_bridge_hstreamdb.hocon index de9989953..70375dfbf 100644 --- a/rel/i18n/emqx_bridge_hstreamdb.hocon +++ b/rel/i18n/emqx_bridge_hstreamdb.hocon @@ -47,4 +47,58 @@ NOTE: When you use `raw record` template (which means the data is not a valid JS record_template.label: """HStream Record""" +action_parameters.desc: +"""Action specific configuration.""" + +action_parameters.label: +"""Action""" + +grpc_flush_timeout.desc: +"""Period for flushing gRPC calls to the HStreamDB server""" + +grpc_flush_timeout.label: +"""gRPC Flush Period""" + +aggregation_pool_size.desc: +"""Size of Record Aggregation Pool""" + +aggregation_pool_size.label: +"""Aggregation Pool Size""" + +max_batches.desc: +"""Maximum number of unconfirmed batches in the flush queue""" + +max_batches.label: +"""Max Batches""" + +writer_pool_size.desc: +"""Writer Pool Size""" + +writer_pool_size.label: +"""Writer Pool Size""" + +batch_size.desc: +"""Maximum number of insert data clauses that can be sent in a single request""" + +batch_size.label: +"""Max Batch Append Count""" + +batch_interval.desc: +"""Maximum interval in milliseconds that is allowed between two successive (batch) request""" + +batch_interval.label: +"""Max Batch Interval""" + +hstreamdb_action.desc: +"""Configuration for HStreamDB Action""" + +hstreamdb_action.label: +"""HStreamDB Action Configuration""" + +config_connector.desc: +"""Configuration for an HStreamDB connector.""" + +config_connector.label: +"""HStreamDB Connector Configuration""" + } From 0bf7a121e79cc6f355cd75f61e80bcb112c4407f Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 15 Feb 2024 12:26:45 +0100 Subject: [PATCH 4/6] docs: add change log entry for HStreamDB bridge refactoring --- changes/ee/feat-12512.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ee/feat-12512.en.md diff --git a/changes/ee/feat-12512.en.md b/changes/ee/feat-12512.en.md new file mode 100644 index 000000000..77ea4d2dd --- /dev/null +++ b/changes/ee/feat-12512.en.md @@ -0,0 +1 @@ +The HStreamDB bridge has been split into connector and action components. Old HStreamDB bridges will be upgraded automatically but it is recommended to do the upgrade manually as new fields has been added to the configuration. From 0cb28f5f4019edb49f0d12fd98d7287efbe36770 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 16 Feb 2024 11:50:51 +0100 Subject: [PATCH 5/6] docs: better descriptions and labels for configuration parameters Thanks @zmstone for the suggestions Co-authored-by: Zaiming (Stone) Shi --- rel/i18n/emqx_bridge_hstreamdb.hocon | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/rel/i18n/emqx_bridge_hstreamdb.hocon b/rel/i18n/emqx_bridge_hstreamdb.hocon index 70375dfbf..af2768c4b 100644 --- a/rel/i18n/emqx_bridge_hstreamdb.hocon +++ b/rel/i18n/emqx_bridge_hstreamdb.hocon @@ -54,43 +54,43 @@ action_parameters.label: """Action""" grpc_flush_timeout.desc: -"""Period for flushing gRPC calls to the HStreamDB server""" +"""Time interval for flushing gRPC calls to the HStreamDB server.""" grpc_flush_timeout.label: -"""gRPC Flush Period""" +"""gRPC Flush Interval""" aggregation_pool_size.desc: -"""Size of Record Aggregation Pool""" +"""Size of record aggregation pool.""" aggregation_pool_size.label: """Aggregation Pool Size""" max_batches.desc: -"""Maximum number of unconfirmed batches in the flush queue""" +"""Maximum number of unconfirmed batches in the flush queue.""" max_batches.label: """Max Batches""" writer_pool_size.desc: -"""Writer Pool Size""" +"""The size of the writer pool.""" writer_pool_size.label: """Writer Pool Size""" batch_size.desc: -"""Maximum number of insert data clauses that can be sent in a single request""" +"""Maximum number of insert data clauses that can be sent in a single request.""" batch_size.label: """Max Batch Append Count""" batch_interval.desc: -"""Maximum interval in milliseconds that is allowed between two successive (batch) request""" +"""Maximum interval that is allowed between two successive (batch) request.""" batch_interval.label: """Max Batch Interval""" hstreamdb_action.desc: -"""Configuration for HStreamDB Action""" +"""Configuration for HStreamDB action.""" hstreamdb_action.label: """HStreamDB Action Configuration""" From def95aa22bf44ffc2b1292ca7c1d472bddfd4dca Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 19 Feb 2024 11:33:37 +0100 Subject: [PATCH 6/6] docs(HStreamDB bridge): make pool size descriptions better --- apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl | 2 +- rel/i18n/emqx_bridge_hstreamdb.hocon | 4 ++-- rel/i18n/emqx_bridge_hstreamdb_connector.hocon | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl index 0556a731d..694f459e0 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl @@ -110,10 +110,10 @@ bridge_v2_examples(Method) -> action_values() -> #{ <<"parameters">> => #{ - <<"aggregation_pool_size">> => 8, <<"partition_key">> => <<"hej">>, <<"record_template">> => <<"${payload}">>, <<"stream">> => <<"mqtt_message">>, + <<"aggregation_pool_size">> => 8, <<"writer_pool_size">> => 8 } }. diff --git a/rel/i18n/emqx_bridge_hstreamdb.hocon b/rel/i18n/emqx_bridge_hstreamdb.hocon index af2768c4b..d9eb6cc22 100644 --- a/rel/i18n/emqx_bridge_hstreamdb.hocon +++ b/rel/i18n/emqx_bridge_hstreamdb.hocon @@ -60,7 +60,7 @@ grpc_flush_timeout.label: """gRPC Flush Interval""" aggregation_pool_size.desc: -"""Size of record aggregation pool.""" +"""The size of the record aggregation pool. A larger aggregation pool size can lead to enhanced parallelization but may also result in reduced efficiency due to smaller batch sizes.""" aggregation_pool_size.label: """Aggregation Pool Size""" @@ -72,7 +72,7 @@ max_batches.label: """Max Batches""" writer_pool_size.desc: -"""The size of the writer pool.""" +"""The size of the writer pool. A larger pool may increase parallelization and concurrent write operations, potentially boosting throughput. Trade-offs include greater memory consumption and possible resource contention.""" writer_pool_size.label: """Writer Pool Size""" diff --git a/rel/i18n/emqx_bridge_hstreamdb_connector.hocon b/rel/i18n/emqx_bridge_hstreamdb_connector.hocon index 8f7ac2edb..3122d59da 100644 --- a/rel/i18n/emqx_bridge_hstreamdb_connector.hocon +++ b/rel/i18n/emqx_bridge_hstreamdb_connector.hocon @@ -19,7 +19,7 @@ name.label: """Connector Name""" url.desc: -"""HStreamDB Server URL. Using gRPC http server address.""" +"""HStreamDB Server URL. This URL will be used as the gRPC HTTP server address.""" url.label: """HStreamDB Server URL""" @@ -37,13 +37,13 @@ partition_key.label: """HStreamDB Partition Key""" pool_size.desc: -"""HStreamDB Pool Size.""" +"""The size of the aggregation pool and the writer pool (see the description of the HStreamDB action for more information about these pools). Larger pool sizes can enhance parallelization but may also reduce efficiency due to smaller batch sizes.""" pool_size.label: """HStreamDB Pool Size""" grpc_timeout.desc: -"""HStreamDB gRPC Timeout.""" +"""The timeout for HStreamDB gRPC requests.""" grpc_timeout.label: """HStreamDB gRPC Timeout"""