From a153d758c3a8c7130727563440f089df32d97959 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 12 Feb 2024 12:42:16 +0100 Subject: [PATCH] feat: refactor HStreamDB bridge to connector and action This commit also upgrades the hstreamdb_erl driver library and changes the action/bridge to use the new hstreamdb_erl. Much of the code for the new API is copied from: https://github.com/emqx/emqx-enterprise/blob/be1a1604dd5c4c06ea6be15b702f35d9bdec48a7/lib-ee/emqx_rule_actions/src/emqx_backend_hstreamdb_actions.erl Fixes: https://emqx.atlassian.net/browse/EMQX-11458 --- .ci/docker-compose-file/.env | 2 +- apps/emqx_bridge/src/emqx_action_info.erl | 1 + apps/emqx_bridge_hstreamdb/rebar.config | 2 +- .../src/emqx_bridge_hstreamdb.app.src | 4 +- .../src/emqx_bridge_hstreamdb.erl | 157 ++++++- .../src/emqx_bridge_hstreamdb_action_info.erl | 88 ++++ .../src/emqx_bridge_hstreamdb_connector.erl | 412 +++++++++++------- .../test/emqx_bridge_hstreamdb_SUITE.erl | 153 ++++++- .../src/schema/emqx_connector_ee_schema.erl | 12 + .../src/schema/emqx_connector_schema.erl | 2 + mix.exs | 2 +- 11 files changed, 629 insertions(+), 206 deletions(-) create mode 100644 apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_action_info.erl diff --git a/.ci/docker-compose-file/.env b/.ci/docker-compose-file/.env index 73ec47d00..1b837aea3 100644 --- a/.ci/docker-compose-file/.env +++ b/.ci/docker-compose-file/.env @@ -10,7 +10,7 @@ CASSANDRA_TAG=3.11 MINIO_TAG=RELEASE.2023-03-20T20-16-18Z OPENTS_TAG=9aa7f88 KINESIS_TAG=2.1 -HSTREAMDB_TAG=v0.16.1 +HSTREAMDB_TAG=v0.19.3 HSTREAMDB_ZK_TAG=3.8.1 MS_IMAGE_ADDR=mcr.microsoft.com/mssql/server diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 3a9e13f94..a57b7eed0 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -91,6 +91,7 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_gcp_pubsub_producer_action_info, emqx_bridge_kafka_action_info, 
emqx_bridge_kinesis_action_info, + emqx_bridge_hstreamdb_action_info, emqx_bridge_matrix_action_info, emqx_bridge_mongodb_action_info, emqx_bridge_oracle_action_info, diff --git a/apps/emqx_bridge_hstreamdb/rebar.config b/apps/emqx_bridge_hstreamdb/rebar.config index eab7bcb3f..c2e3194ac 100644 --- a/apps/emqx_bridge_hstreamdb/rebar.config +++ b/apps/emqx_bridge_hstreamdb/rebar.config @@ -3,7 +3,7 @@ {erl_opts, [debug_info]}. {deps, [ {hstreamdb_erl, - {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.4.5+v0.16.1"}}}, + {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.5.18+v0.18.1"}}}, {emqx, {path, "../../apps/emqx"}}, {emqx_utils, {path, "../../apps/emqx_utils"}} ]}. diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src index f9825e3dd..84c09fe3a 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_hstreamdb, [ {description, "EMQX Enterprise HStreamDB Bridge"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [ kernel, @@ -8,7 +8,7 @@ emqx_resource, hstreamdb_erl ]}, - {env, []}, + {env, [{emqx_action_info_modules, [emqx_bridge_hstreamdb_action_info]}]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl index 7052e0120..ee0baaa4c 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl @@ -6,10 +6,12 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --import(hoconsc, [mk/2, enum/1, ref/2]). +-import(hoconsc, [mk/2, enum/1]). -export([ - conn_bridge_examples/1 + conn_bridge_examples/1, + bridge_v2_examples/1, + connector_examples/1 ]). -export([ @@ -19,6 +21,11 @@ desc/1 ]). 
+-define(CONNECTOR_TYPE, hstreamdb). +-define(ACTION_TYPE, ?CONNECTOR_TYPE). +-define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>). +-define(DEFAULT_GRPC_FLUSH_TIMEOUT_RAW, <<"10s">>). + %% ------------------------------------------------------------------------------------------------- %% api @@ -27,16 +34,16 @@ conn_bridge_examples(Method) -> #{ <<"hstreamdb">> => #{ summary => <<"HStreamDB Bridge">>, - value => values(Method) + value => conn_bridge_example_values(Method) } } ]. -values(get) -> - values(post); -values(put) -> - values(post); -values(post) -> +conn_bridge_example_values(get) -> + conn_bridge_example_values(post); +conn_bridge_example_values(put) -> + conn_bridge_example_values(post); +conn_bridge_example_values(post) -> #{ type => <<"hstreamdb">>, name => <<"demo">>, @@ -55,15 +62,135 @@ values(post) -> }, ssl => #{enable => false} }; -values(_) -> +conn_bridge_example_values(_) -> #{}. +connector_examples(Method) -> + [ + #{ + <<"hstreamdb">> => + #{ + summary => <<"HStreamDB Connector">>, + value => emqx_connector_schema:connector_values( + Method, ?CONNECTOR_TYPE, connector_values() + ) + } + } + ]. + +connector_values() -> + #{ + <<"url">> => <<"http://127.0.0.1:6570">>, + <<"grpc_timeout">> => <<"30s">>, + <<"ssl">> => + #{ + <<"enable">> => false, + <<"verify">> => <<"verify_peer">> + }, + <<"resource_opts">> => + #{ + <<"health_check_interval">> => <<"15s">>, + <<"start_timeout">> => <<"5s">> + } + }. + +bridge_v2_examples(Method) -> + [ + #{ + <<"hstreamdb">> => + #{ + summary => <<"HStreamDB Action">>, + value => emqx_bridge_v2_schema:action_values( + Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values() + ) + } + } + ]. + +action_values() -> + #{ + <<"parameters">> => #{ + <<"aggregation_pool_size">> => 8, + <<"partition_key">> => <<"hej">>, + <<"record_template">> => <<"${payload}">>, + <<"stream">> => <<"mqtt_message">>, + <<"writer_pool_size">> => 8 + } + }. 
+ %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions namespace() -> "bridge_hstreamdb". roots() -> []. +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + Fields = + fields(connector_fields) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts), + emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields); +fields(Field) when + Field == "get_bridge_v2"; + Field == "post_bridge_v2"; + Field == "put_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(hstreamdb_action)); +fields(action) -> + {?ACTION_TYPE, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(?MODULE, hstreamdb_action)), + #{ + desc => <<"HStreamDB Action Config">>, + required => false + } + )}; +fields(hstreamdb_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + hoconsc:mk( + hoconsc:ref(?MODULE, action_parameters), + #{ + required => true, + desc => ?DESC("action_parameters") + } + ) + ); +fields(action_parameters) -> + [ + {stream, mk(binary(), #{required => true, desc => ?DESC("stream_name")})}, + + {partition_key, mk(binary(), #{required => false, desc => ?DESC("partition_key")})}, + + {grpc_flush_timeout, fun grpc_flush_timeout/1}, + {record_template, + mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("record_template")})}, + {aggregation_pool_size, + mk(integer(), #{default => 8, desc => ?DESC("aggregation_pool_size")})}, + {max_batches, mk(integer(), #{default => 500, desc => ?DESC("max_batches")})}, + {writer_pool_size, mk(integer(), #{default => 8, desc => ?DESC("writer_pool_size")})}, + {batch_size, mk(integer(), #{default => 100, desc => ?DESC("batch_size")})}, + {batch_interval, + mk(emqx_schema:timeout_duration_ms(), #{ + default => <<"500ms">>, desc => ?DESC("batch_interval") + })} + ]; +fields(connector_fields) -> + [ + {url, + mk(binary(), #{ + required => true, desc => 
?DESC("url"), default => <<"http://127.0.0.1:6570">> + })}, + {grpc_timeout, fun grpc_timeout/1} + ] ++ emqx_connector_schema_lib:ssl_fields(); +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + fields(connector_fields) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); fields("config") -> hstream_bridge_common_fields() ++ connector_fields(); @@ -80,6 +207,18 @@ fields("put") -> hstream_bridge_common_fields() ++ connector_fields(). +grpc_timeout(type) -> emqx_schema:timeout_duration_ms(); +grpc_timeout(desc) -> ?DESC("grpc_timeout"); +grpc_timeout(default) -> ?DEFAULT_GRPC_TIMEOUT_RAW; +grpc_timeout(required) -> false; +grpc_timeout(_) -> undefined. + +grpc_flush_timeout(type) -> emqx_schema:timeout_duration_ms(); +grpc_flush_timeout(desc) -> ?DESC("grpc_timeout"); +grpc_flush_timeout(default) -> ?DEFAULT_GRPC_FLUSH_TIMEOUT_RAW; +grpc_flush_timeout(required) -> false; +grpc_flush_timeout(_) -> undefined. + hstream_bridge_common_fields() -> emqx_bridge_schema:common_bridge_fields() ++ [ diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_action_info.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_action_info.erl new file mode 100644 index 000000000..66188110f --- /dev/null +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_action_info.erl @@ -0,0 +1,88 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_hstreamdb_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0, + bridge_v1_config_to_connector_config/1, + bridge_v1_config_to_action_config/2, + connector_action_config_to_bridge_v1_config/2 +]). 
+ +bridge_v1_type_name() -> hstreamdb. + +action_type_name() -> hstreamdb. + +connector_type_name() -> hstreamdb. + +schema_module() -> emqx_bridge_hstreamdb. + +bridge_v1_config_to_connector_config(BridgeV1Conf) -> + ConnectorSchema = emqx_bridge_hstreamdb:fields(connector_fields), + ConnectorAtomKeys = lists:foldl(fun({K, _}, Acc) -> [K | Acc] end, [], ConnectorSchema), + ConnectorBinKeys = [atom_to_binary(K) || K <- ConnectorAtomKeys] ++ [<<"resource_opts">>], + ConnectorConfig0 = maps:with(ConnectorBinKeys, BridgeV1Conf), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_connector_schema:project_to_connector_resource_opts/1, + ConnectorConfig0 + ). + +bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) -> + Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config( + BridgeV1Conf, ConnectorName, emqx_bridge_hstreamdb, "config_connector" + ), + %% Remove fields no longer relevant for the action + Config1 = lists:foldl( + fun(Field, Acc) -> + emqx_utils_maps:deep_remove(Field, Acc) + end, + Config0, + [ + [<<"parameters">>, <<"pool_size">>], + [<<"parameters">>, <<"direction">>] + ] + ), + %% Move pool_size to aggregation_pool_size and writer_pool_size + PoolSize = maps:get(<<"pool_size">>, BridgeV1Conf, 8), + Config2 = emqx_utils_maps:deep_put( + [<<"parameters">>, <<"aggregation_pool_size">>], + Config1, + PoolSize + ), + Config3 = emqx_utils_maps:deep_put( + [<<"parameters">>, <<"writer_pool_size">>], + Config2, + PoolSize + ), + Config3. 
+ +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig), + BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1), + BridgeV1Config3 = maps:remove(<<"parameters">>, BridgeV1Config2), + %% Pick out pool_size from aggregation_pool_size + PoolSize = emqx_utils_maps:deep_get( + [<<"parameters">>, <<"aggregation_pool_size">>], ActionConfig, 8 + ), + BridgeV1Config4 = maps:put(<<"pool_size">>, PoolSize, BridgeV1Config3), + + %% Move the fields stream, partition_key and record_template from parameters in ActionConfig to the top level in BridgeV1Config + lists:foldl( + fun(Field, Acc) -> + emqx_utils_maps:deep_put( + [Field], + Acc, + emqx_utils_maps:deep_get([<<"parameters">>, Field], ActionConfig, <<>>) + ) + end, + BridgeV1Config4, + [<<"stream">>, <<"partition_key">>, <<"record_template">>] + ). diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl index fdb80b1e1..bb65089d7 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl @@ -7,8 +7,9 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). --import(hoconsc, [mk/2, enum/1]). +-import(hoconsc, [mk/2]). -behaviour(emqx_resource). @@ -19,7 +20,11 @@ on_stop/2, on_query/3, on_batch_query/3, - on_get_status/2 + on_get_status/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). -export([ @@ -38,67 +43,132 @@ -define(DEFAULT_GRPC_TIMEOUT, timer:seconds(30)). -define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>). +-define(DEFAULT_GRPC_FLUSH_TIMEOUT, 10000). +-define(DEFAULT_MAX_BATCHES, 500). +-define(DEFAULT_BATCH_INTERVAL, 500). 
+-define(DEFAULT_AGG_POOL_SIZE, 8). +-define(DEFAULT_WRITER_POOL_SIZE, 8). %% ------------------------------------------------------------------------------------------------- %% resource callback callback_mode() -> always_sync. on_start(InstId, Config) -> - start_client(InstId, Config). + try + do_on_start(InstId, Config) + catch + E:R:S -> + Error = #{ + msg => "start_hstreamdb_connector_error", + connector => InstId, + error => E, + reason => R, + stack => S + }, + ?SLOG(error, Error), + {error, Error} + end. + +on_add_channel( + _InstId, + #{ + installed_channels := InstalledChannels, + client_options := ClientOptions + } = OldState, + ChannelId, + ChannelConfig +) -> + %{ok, ChannelState} = create_channel_state(ChannelId, PoolName, ChannelConfig), + Parameters0 = maps:get(parameters, ChannelConfig), + Parameters = Parameters0#{client_options => ClientOptions}, + PartitionKey = emqx_placeholder:preproc_tmpl(maps:get(partition_key, Parameters, <<"">>)), + try + ChannelState = #{ + producer => start_producer(ChannelId, Parameters), + record_template => record_template(Parameters), + partition_key => PartitionKey + }, + NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState} + catch + Error:Reason:Stack -> + {error, {Error, Reason, Stack}} + end. + +on_remove_channel( + _InstId, + #{ + installed_channels := InstalledChannels + } = OldState, + ChannelId +) -> + #{ + producer := Producer + } = maps:get(ChannelId, InstalledChannels), + _ = hstreamdb:stop_producer(Producer), + NewInstalledChannels = maps:remove(ChannelId, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState}. + +on_get_channel_status( + _ResId, + _ChannelId, + _State +) -> + ?status_connected. + +on_get_channels(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId). 
on_stop(InstId, _State) -> - case emqx_resource:get_allocated_resources(InstId) of - #{?hstreamdb_client := #{client := Client, producer := Producer}} -> - StopClientRes = hstreamdb:stop_client(Client), - StopProducerRes = hstreamdb:stop_producer(Producer), - ?SLOG(info, #{ - msg => "stop_hstreamdb_connector", - connector => InstId, - client => Client, - producer => Producer, - stop_client => StopClientRes, - stop_producer => StopProducerRes - }); - _ -> - ok - end. + ?tp( + hstreamdb_connector_on_stop, + #{instance_id => InstId} + ). -define(FAILED_TO_APPLY_HRECORD_TEMPLATE, {error, {unrecoverable_error, failed_to_apply_hrecord_template}} ). on_query( - _InstId, - {send_message, Data}, - _State = #{ - producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate - } + InstId, + {ChannelID, Data}, + #{installed_channels := Channels} = _State ) -> + #{ + producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate + } = maps:get(ChannelID, Channels), try to_record(PartitionKey, HRecordTemplate, Data) of - Record -> append_record(Producer, Record, false) + Record -> append_record(InstId, Producer, Record, false) catch _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE end. on_batch_query( - _InstId, - BatchList, - _State = #{ - producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate - } + InstId, + [{ChannelID, _Data} | _] = BatchList, + #{installed_channels := Channels} = _State ) -> + #{ + producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate + } = maps:get(ChannelID, Channels), try to_multi_part_records(PartitionKey, HRecordTemplate, BatchList) of - Records -> append_record(Producer, Records, true) + Records -> append_record(InstId, Producer, Records, true) catch _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE end. 
-on_get_status(_InstId, #{client := Client}) -> - case is_alive(Client) of - true -> - connected; - false -> - disconnected +on_get_status(_InstId, State) -> + case check_status(State) of + ok -> + ?status_connected; + Error -> + %% We set it to ?status_connecting so that the channels are not deleted. + %% The producers in the channels contains buffers so we don't want to delete them. + {?status_connecting, State, Error} end. %% ------------------------------------------------------------------------------------------------- @@ -140,142 +210,149 @@ desc(config) -> %% ------------------------------------------------------------------------------------------------- %% internal functions -start_client(InstId, Config) -> - try - do_start_client(InstId, Config) - catch - E:R:S -> - Error = #{ - msg => "start_hstreamdb_connector_error", - connector => InstId, - error => E, - reason => R, - stack => S - }, - ?SLOG(error, Error), - {error, Error} - end. -do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize, ssl := SSL}) -> +do_on_start(InstId, Config) -> ?SLOG(info, #{ msg => "starting_hstreamdb_connector_client", connector => InstId, config => Config }), - ClientName = client_name(InstId), + {ok, _} = application:ensure_all_started(hstreamdb_erl), + ClientOptions = client_options(Config), + State = #{ + client_options => ClientOptions, + installed_channels => #{} + }, + case check_status(State) of + ok -> + ?SLOG(info, #{ + msg => "hstreamdb_connector_client_started", + connector => InstId + }), + {ok, State}; + Error -> + ?tp( + hstreamdb_connector_start_failed, + #{error => client_not_alive} + ), + ?SLOG(error, #{ + msg => "hstreamdb_connector_client_not_alive", + connector => InstId, + error => Error + }), + {error, {connect_failed, Error}} + end. 
+ +client_options(Config = #{url := ServerURL, ssl := SSL}) -> + GRPCTimeout = maps:get(<<"grpc_timeout">>, Config, ?DEFAULT_GRPC_TIMEOUT), + EnableSSL = maps:get(enable, SSL), RpcOpts = - case maps:get(enable, SSL) of + case EnableSSL of false -> - #{pool_size => PoolSize}; + #{pool_size => 1}; true -> #{ - pool_size => PoolSize, + pool_size => 1, gun_opts => #{ transport => tls, - transport_opts => emqx_tls_lib:to_client_opts(SSL) + transport_opts => + emqx_tls_lib:to_client_opts(SSL) } } end, - ClientOptions = [ - {url, binary_to_list(Server)}, - {rpc_options, RpcOpts} - ], - case hstreamdb:start_client(ClientName, ClientOptions) of + ClientOptions = #{ + url => to_string(ServerURL), + grpc_timeout => GRPCTimeout, + rpc_options => RpcOpts + }, + ClientOptions. + +check_status(ConnectorState) -> + try start_client(ConnectorState) of {ok, Client} -> - case is_alive(Client) of - true -> - ?SLOG(info, #{ - msg => "hstreamdb_connector_client_started", - connector => InstId, - client => Client - }), - start_producer(InstId, Client, Config); - _ -> - ?tp( - hstreamdb_connector_start_failed, - #{error => client_not_alive} - ), - ?SLOG(error, #{ - msg => "hstreamdb_connector_client_not_alive", - connector => InstId - }), - {error, connect_failed} + try hstreamdb_client:echo(Client) of + ok -> ok; + {error, _} = ErrorEcho -> ErrorEcho + after + _ = hstreamdb:stop_client(Client) end; - {error, {already_started, Pid}} -> - ?SLOG(info, #{ - msg => "starting_hstreamdb_connector_client_find_old_client_restart_client", - old_client_pid => Pid, - old_client_name => ClientName - }), - _ = hstreamdb:stop_client(ClientName), - start_client(InstId, Config); + {error, _} = StartClientError -> + StartClientError + catch + ErrorType:Reason:_ST -> + {error, {ErrorType, Reason}} + end. 
+ +start_client(Opts) -> + ClientOptions = maps:get(client_options, Opts), + case hstreamdb:start_client(ClientOptions) of + {ok, Client} -> + {ok, Client}; {error, Error} -> - ?SLOG(error, #{ - msg => "hstreamdb_connector_client_failed", - connector => InstId, - reason => Error - }), {error, Error} end. -is_alive(Client) -> - hstreamdb_client:echo(Client) =:= ok. - start_producer( - InstId, - Client, - Options = #{stream := Stream, pool_size := PoolSize} + ActionId, + #{ + stream := Stream, + batch_size := BatchSize, + batch_interval := Interval + } = Opts ) -> - %% TODO: change these batch options after we have better disk cache. - BatchSize = maps:get(batch_size, Options, 100), - Interval = maps:get(batch_interval, Options, 1000), - ProducerOptions = [ - {stream, Stream}, - {callback, {?MODULE, on_flush_result, []}}, - {max_records, BatchSize}, - {interval, Interval}, - {pool_size, PoolSize}, - {grpc_timeout, maps:get(grpc_timeout, Options, ?DEFAULT_GRPC_TIMEOUT)} - ], - Name = produce_name(InstId), - ?SLOG(info, #{ - msg => "starting_hstreamdb_connector_producer", - connector => InstId - }), - case hstreamdb:start_producer(Client, Name, ProducerOptions) of - {ok, Producer} -> - ?SLOG(info, #{ - msg => "hstreamdb_connector_producer_started" - }), - State = #{ - client => Client, - producer => Producer, - enable_batch => maps:get(enable_batch, Options, false), - partition_key => emqx_placeholder:preproc_tmpl( - maps:get(partition_key, Options, <<"">>) - ), - record_template => record_template(Options) - }, - ok = emqx_resource:allocate_resource(InstId, ?hstreamdb_client, #{ - client => Client, producer => Producer - }), - {ok, State}; - {error, {already_started, Pid}} -> - ?SLOG(info, #{ - msg => - "starting_hstreamdb_connector_producer_find_old_producer_restart_producer", - old_producer_pid => Pid, - old_producer_name => Name - }), - _ = hstreamdb:stop_producer(Name), - start_producer(InstId, Client, Options); + MaxBatches = maps:get(max_batches, Opts, 
?DEFAULT_MAX_BATCHES), + AggPoolSize = maps:get(aggregation_pool_size, Opts, ?DEFAULT_AGG_POOL_SIZE), + WriterPoolSize = maps:get(writer_pool_size, Opts, ?DEFAULT_WRITER_POOL_SIZE), + GRPCTimeout = maps:get(grpc_flush_timeout, Opts, ?DEFAULT_GRPC_FLUSH_TIMEOUT), + ClientOptions = maps:get(client_options, Opts), + ProducerOptions = #{ + stream => to_string(Stream), + buffer_options => #{ + interval => Interval, + callback => {?MODULE, on_flush_result, [ActionId]}, + max_records => BatchSize, + max_batches => MaxBatches + }, + buffer_pool_size => AggPoolSize, + writer_options => #{ + grpc_timeout => GRPCTimeout + }, + writer_pool_size => WriterPoolSize, + client_options => ClientOptions + }, + Name = produce_name(ActionId), + ensure_start_producer(Name, ProducerOptions). + +ensure_start_producer(ProducerName, ProducerOptions) -> + case hstreamdb:start_producer(ProducerName, ProducerOptions) of + ok -> + ok; + {error, {already_started, _Pid}} -> + %% HStreamDB producer already started, restart it + _ = hstreamdb:stop_producer(ProducerName), + %% the pool might have been leaked after relup + _ = ecpool:stop_sup_pool(ProducerName), + ok = hstreamdb:start_producer(ProducerName, ProducerOptions); + {error, { + {shutdown, + {failed_to_start_child, {pool_sup, Pool}, + {shutdown, + {failed_to_start_child, worker_sup, + {shutdown, {failed_to_start_child, _, {badarg, _}}}}}}}, + _ + }} -> + %% HStreamDB producer was not properly cleared, restart it + %% the badarg error in gproc maybe caused by the pool is leaked after relup + _ = ecpool:stop_sup_pool(Pool), + ok = hstreamdb:start_producer(ProducerName, ProducerOptions); {error, Reason} -> - ?SLOG(error, #{ - msg => "starting_hstreamdb_connector_producer_failed", - reason => Reason - }), - {error, Reason} - end. + %% HStreamDB start producer failed + throw({start_producer_failed, Reason}) + end, + ProducerName. + +produce_name(ActionId) -> + list_to_binary("backend_hstream_producer:" ++ to_string(ActionId)). 
to_record(PartitionKeyTmpl, HRecordTmpl, Data) -> PartitionKey = emqx_placeholder:proc_tmpl(PartitionKeyTmpl, Data), @@ -289,43 +366,46 @@ to_record(PartitionKey, RawRecord) -> to_multi_part_records(PartitionKeyTmpl, HRecordTmpl, BatchList) -> lists:map( - fun({send_message, Data}) -> + fun({_, Data}) -> to_record(PartitionKeyTmpl, HRecordTmpl, Data) end, BatchList ). -append_record(Producer, MultiPartsRecords, MaybeBatch) when is_list(MultiPartsRecords) -> +append_record(ResourceId, Producer, MultiPartsRecords, MaybeBatch) when + is_list(MultiPartsRecords) +-> lists:foreach( - fun(Record) -> append_record(Producer, Record, MaybeBatch) end, MultiPartsRecords + fun(Record) -> append_record(ResourceId, Producer, Record, MaybeBatch) end, + MultiPartsRecords ); -append_record(Producer, Record, MaybeBatch) when is_tuple(Record) -> - do_append_records(Producer, Record, MaybeBatch). +append_record(ResourceId, Producer, Record, MaybeBatch) when is_tuple(Record) -> + do_append_records(ResourceId, Producer, Record, MaybeBatch). %% TODO: only sync request supported. implement async request later. -do_append_records(Producer, Record, true = IsBatch) -> +do_append_records(ResourceId, Producer, Record, true = IsBatch) -> Result = hstreamdb:append(Producer, Record), - handle_result(Result, Record, IsBatch); -do_append_records(Producer, Record, false = IsBatch) -> + handle_result(ResourceId, Result, Record, IsBatch); +do_append_records(ResourceId, Producer, Record, false = IsBatch) -> Result = hstreamdb:append_flush(Producer, Record), - handle_result(Result, Record, IsBatch). + handle_result(ResourceId, Result, Record, IsBatch). 
-handle_result(ok = Result, Record, IsBatch) -> - handle_result({ok, Result}, Record, IsBatch); -handle_result({ok, Result}, Record, IsBatch) -> +handle_result(ResourceId, ok = Result, Record, IsBatch) -> + handle_result(ResourceId, {ok, Result}, Record, IsBatch); +handle_result(ResourceId, {ok, Result}, Record, IsBatch) -> ?tp( hstreamdb_connector_query_append_return, - #{result => Result, is_batch => IsBatch} + #{result => Result, is_batch => IsBatch, instance_id => ResourceId} ), ?SLOG(debug, #{ msg => "hstreamdb_producer_sync_append_success", record => Record, is_batch => IsBatch }); -handle_result({error, Reason} = Err, Record, IsBatch) -> +handle_result(ResourceId, {error, Reason} = Err, Record, IsBatch) -> ?tp( hstreamdb_connector_query_append_return, - #{error => Reason, is_batch => IsBatch} + #{error => Reason, is_batch => IsBatch, instance_id => ResourceId} ), ?SLOG(error, #{ msg => "hstreamdb_producer_sync_append_failed", @@ -335,12 +415,6 @@ handle_result({error, Reason} = Err, Record, IsBatch) -> }), Err. -client_name(InstId) -> - "client:" ++ to_string(InstId). - -produce_name(ActionId) -> - list_to_atom("producer:" ++ to_string(ActionId)). - record_template(#{record_template := RawHRecordTemplate}) -> emqx_placeholder:preproc_tmpl(RawHRecordTemplate); record_template(_) -> diff --git a/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl b/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl index 4d165c03d..1ac489334 100644 --- a/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl +++ b/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl @@ -117,16 +117,21 @@ end_per_suite(_Config) -> ok. 
init_per_testcase(t_to_hrecord_failed, Config) -> + init_per_testcase_common(), meck:new([hstreamdb], [passthrough, no_history, no_link]), meck:expect(hstreamdb, to_record, fun(_, _, _) -> error(trans_to_hrecord_failed) end), Config; init_per_testcase(_Testcase, Config) -> + init_per_testcase_common(), %% drop stream and will create a new one in common_init/1 %% TODO: create a new stream for each test case delete_bridge(Config), snabbkaffe:start_trace(), Config. +init_per_testcase_common() -> + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(). + end_per_testcase(t_to_hrecord_failed, _Config) -> meck:unload([hstreamdb]); end_per_testcase(_Testcase, Config) -> @@ -301,7 +306,10 @@ t_simple_query(Config) -> {ok, _}, create_bridge(Config) ), - Requests = gen_batch_req(BatchSize), + Type = ?config(hstreamdb_bridge_type, Config), + Name = ?config(hstreamdb_name, Config), + ActionId = emqx_bridge_v2:id(Type, Name), + Requests = gen_batch_req(BatchSize, ActionId), ?check_trace( begin ?wait_async_action( @@ -351,6 +359,24 @@ t_to_hrecord_failed(Config) -> end, ok. +%% Connector Action Tests + +t_action_on_get_status(Config) -> + emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}). + +t_action_create_via_http(Config) -> + emqx_bridge_v2_testlib:t_create_via_http(Config). + +t_action_sync_query(Config) -> + MakeMessageFun = fun() -> rand_data() end, + IsSuccessCheck = fun(Result) -> ?assertEqual(ok, Result) end, + TracePoint = hstreamdb_connector_query_append_return, + emqx_bridge_v2_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint). + +t_action_start_stop(Config) -> + StopTracePoint = hstreamdb_connector_on_stop, + emqx_bridge_v2_testlib:t_start_stop(Config, StopTracePoint). 
+ %%------------------------------------------------------------------------------ %% Helper fns %%------------------------------------------------------------------------------ @@ -362,6 +388,10 @@ common_init(ConfigT) -> URL = "http://" ++ Host ++ ":" ++ RawPort, Config0 = [ + {bridge_type, <<"hstreamdb">>}, + {bridge_name, <<"my_hstreamdb_action">>}, + {connector_type, <<"hstreamdb">>}, + {connector_name, <<"my_hstreamdb_connector">>}, {hstreamdb_host, Host}, {hstreamdb_port, Port}, {hstreamdb_url, URL}, @@ -393,6 +423,8 @@ common_init(ConfigT) -> {hstreamdb_config, HStreamDBConf}, {hstreamdb_bridge_type, BridgeType}, {hstreamdb_name, Name}, + {bridge_config, action_config(Config0)}, + {connector_config, connector_config(Config0)}, {proxy_host, ProxyHost}, {proxy_port, ProxyPort} | Config0 @@ -424,7 +456,7 @@ hstreamdb_config(BridgeType, Config) -> " resource_opts = {\n" %% always sync " query_mode = sync\n" - " request_ttl = 500ms\n" + " request_ttl = 10000ms\n" " batch_size = ~b\n" " worker_pool_size = ~b\n" " }\n" @@ -443,6 +475,45 @@ hstreamdb_config(BridgeType, Config) -> ), {Name, parse_and_check(ConfigString, BridgeType, Name)}. +action_config(Config) -> + ConnectorName = ?config(connector_name, Config), + BatchSize = batch_size(Config), + #{ + <<"connector">> => ConnectorName, + <<"enable">> => true, + <<"parameters">> => + #{ + <<"aggregation_pool_size">> => ?POOL_SIZE, + <<"record_template">> => ?RECORD_TEMPLATE, + <<"stream">> => ?STREAM, + <<"writer_pool_size">> => ?POOL_SIZE + }, + <<"resource_opts">> => + #{ + <<"batch_size">> => BatchSize, + <<"health_check_interval">> => <<"15s">>, + <<"inflight_window">> => 100, + <<"max_buffer_bytes">> => <<"256MB">>, + <<"query_mode">> => <<"sync">>, + <<"request_ttl">> => <<"45s">>, + <<"worker_pool_size">> => ?POOL_SIZE + } + }. 
+ +connector_config(Config) -> + Port = integer_to_list(?config(hstreamdb_port, Config)), + URL = "http://" ++ ?config(hstreamdb_host, Config) ++ ":" ++ Port, + #{ + <<"url">> => URL, + <<"ssl">> => + #{<<"enable">> => false, <<"verify">> => <<"verify_peer">>}, + <<"grpc_timeout">> => <<"30s">>, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"15s">>, + <<"start_timeout">> => <<"5s">> + } + }. + parse_and_check(ConfigString, BridgeType, Name) -> {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), @@ -454,10 +525,10 @@ parse_and_check(ConfigString, BridgeType, Name) -> -define(CONN_ATTEMPTS, 10). default_options(Config) -> - [ - {url, ?config(hstreamdb_url, Config)}, - {rpc_options, ?RPC_OPTIONS} - ]. + #{ + url => ?config(hstreamdb_url, Config), + rpc_options => ?RPC_OPTIONS + }. connect_direct_hstream(Name, Config) -> client(Name, Config, ?CONN_ATTEMPTS). @@ -511,8 +582,9 @@ send_message(Config, Data) -> query_resource(Config, Request) -> Name = ?config(hstreamdb_name, Config), BridgeType = ?config(hstreamdb_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - emqx_resource:query(ResourceID, Request, #{timeout => 1_000}). + ID = emqx_bridge_v2:id(BridgeType, Name), + ResID = emqx_connector_resource:resource_id(BridgeType, Name), + emqx_resource:query(ID, Request, #{timeout => 1_000, connector_resource_id => ResID}). restart_resource(Config) -> BridgeName = ?config(hstreamdb_name, Config), @@ -526,8 +598,16 @@ resource_id(Config) -> BridgeType = ?config(hstreamdb_bridge_type, Config), _ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName). +action_id(Config) -> + ActionName = ?config(hstreamdb_name, Config), + ActionType = ?config(hstreamdb_bridge_type, Config), + _ActionID = emqx_bridge_v2:id(ActionType, ActionName). 
+ health_check_resource_ok(Config) -> - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(resource_id(Config))). + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(resource_id(Config))), + ActionName = ?config(hstreamdb_name, Config), + ActionType = ?config(hstreamdb_bridge_type, Config), + ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(ActionType, ActionName)). health_check_resource_down(Config) -> case emqx_resource_manager:health_check(resource_id(Config)) of @@ -539,6 +619,19 @@ health_check_resource_down(Config) -> ?assert( false, lists:flatten(io_lib:format("invalid health check result:~p~n", [Other])) ) + end, + ActionName = ?config(hstreamdb_name, Config), + ActionType = ?config(hstreamdb_bridge_type, Config), + #{status := StatusV2} = emqx_bridge_v2:health_check(ActionType, ActionName), + case StatusV2 of + disconnected -> + ok; + connecting -> + ok; + OtherV2 -> + ?assert( + false, lists:flatten(io_lib:format("invalid health check result:~p~n", [OtherV2])) + ) end. 
% These funs start and then stop the hstreamdb connection @@ -548,22 +641,36 @@ connect_and_create_stream(Config) -> Client, ?STREAM, ?REPLICATION_FACTOR, ?BACKLOG_RETENTION_SECOND, ?SHARD_COUNT ) ), - %% force write to stream to make it created and ready to be written data for rest cases - ProducerOptions = [ - {pool_size, 4}, - {stream, ?STREAM}, - {callback, fun(_) -> ok end}, - {max_records, 10}, - {interval, 1000} - ], + %% force write to stream to make it created and ready to be written data for test cases + ProducerOptions = #{ + stream => ?STREAM, + buffer_options => #{ + interval => 1000, + callback => {?MODULE, on_flush_result, [<<"WHAT">>]}, + max_records => 1, + max_batches => 1 + }, + buffer_pool_size => 1, + writer_options => #{ + grpc_timeout => 100 + }, + writer_pool_size => 1, + client_options => default_options(Config) + }, + ?WITH_CLIENT( begin - {ok, Producer} = hstreamdb:start_producer(Client, test_producer, ProducerOptions), - _ = hstreamdb:append_flush(Producer, hstreamdb:to_record([], raw, rand_payload())), - _ = hstreamdb:stop_producer(Producer) + ok = hstreamdb:start_producer(test_producer, ProducerOptions), + _ = hstreamdb:append_flush(test_producer, hstreamdb:to_record([], raw, rand_payload())), + _ = hstreamdb:stop_producer(test_producer) end ). +on_flush_result({{flush, _Stream, _Records}, {ok, _Resp}}) -> + ok; +on_flush_result({{flush, _Stream, _Records}, {error, _Reason}}) -> + ok. + connect_and_delete_stream(Config) -> ?WITH_CLIENT( _ = hstreamdb_client:delete_stream(Client, ?STREAM) @@ -593,11 +700,11 @@ rand_payload() -> temperature => rand:uniform(40), humidity => rand:uniform(100) }). 
-gen_batch_req(Count) when +gen_batch_req(Count, ActionId) when is_integer(Count) andalso Count > 0 -> - [{send_message, rand_data()} || _Val <- lists:seq(1, Count)]; -gen_batch_req(Count) -> + [{ActionId, rand_data()} || _Val <- lists:seq(1, Count)]; +gen_batch_req(Count, _ActionId) -> ct:pal("Gen batch requests failed with unexpected Count: ~p", [Count]). str(List) when is_list(List) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index f8800cc10..4ccdc193b 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -28,6 +28,8 @@ resource_type(confluent_producer) -> emqx_bridge_kafka_impl_producer; resource_type(gcp_pubsub_producer) -> emqx_bridge_gcp_pubsub_impl_producer; +resource_type(hstreamdb) -> + emqx_bridge_hstreamdb_connector; resource_type(kafka_producer) -> emqx_bridge_kafka_impl_producer; resource_type(kinesis) -> @@ -122,6 +124,14 @@ connector_structs() -> required => false } )}, + {hstreamdb, + mk( + hoconsc:map(name, ref(emqx_bridge_hstreamdb, "config_connector")), + #{ + desc => <<"HStreamDB Connector Config">>, + required => false + } + )}, {kafka_producer, mk( hoconsc:map(name, ref(emqx_bridge_kafka, "config_connector")), @@ -298,6 +308,7 @@ schema_modules() -> emqx_bridge_azure_event_hub, emqx_bridge_confluent_producer, emqx_bridge_gcp_pubsub_producer_schema, + emqx_bridge_hstreamdb, emqx_bridge_kafka, emqx_bridge_kinesis, emqx_bridge_matrix, @@ -336,6 +347,7 @@ api_schemas(Method) -> <<"gcp_pubsub_producer">>, Method ++ "_connector" ), + api_ref(emqx_bridge_hstreamdb, <<"hstreamdb">>, Method ++ "_connector"), api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"), api_ref(emqx_bridge_kinesis, <<"kinesis">>, Method ++ "_connector"), api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"), diff --git 
a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 751efa3d9..189b9e2cf 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -128,6 +128,8 @@ connector_type_to_bridge_types(confluent_producer) -> [confluent_producer]; connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_producer]; +connector_type_to_bridge_types(hstreamdb) -> + [hstreamdb]; connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; connector_type_to_bridge_types(kinesis) -> diff --git a/mix.exs b/mix.exs index c2ef491e9..113fb8e2c 100644 --- a/mix.exs +++ b/mix.exs @@ -200,7 +200,7 @@ defmodule EMQXUmbrella.MixProject do defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do [ - {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.5+v0.16.1"}, + {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}, {:wolff, github: "kafka4beam/wolff", tag: "1.10.2"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true},