From 5527edf4426f9ac10d9e556de1444b025423785c Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 15 Aug 2023 16:56:25 +0800 Subject: [PATCH 1/4] chore: bump hstreamdb_erl to `0.4.3+v0.16.1` --- apps/emqx_bridge_hstreamdb/rebar.config | 2 +- .../src/emqx_bridge_hstreamdb.app.src | 2 +- .../src/emqx_bridge_hstreamdb_connector.erl | 108 +++++++++--------- .../test/emqx_bridge_hstreamdb_SUITE.erl | 6 +- mix.exs | 2 +- 5 files changed, 63 insertions(+), 57 deletions(-) diff --git a/apps/emqx_bridge_hstreamdb/rebar.config b/apps/emqx_bridge_hstreamdb/rebar.config index 9a70b55f9..44970b75d 100644 --- a/apps/emqx_bridge_hstreamdb/rebar.config +++ b/apps/emqx_bridge_hstreamdb/rebar.config @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {erl_opts, [debug_info]}. {deps, [ - {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.3.1+v0.12.0"}}}, + {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.4.3+v0.16.1"}}}, {emqx, {path, "../../apps/emqx"}}, {emqx_utils, {path, "../../apps/emqx_utils"}} ]}. diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src index 2a800baca..39c8c3258 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_hstreamdb, [ {description, "EMQX Enterprise HStreamDB Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl index 1763b252a..727ea4ad8 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl @@ -75,7 +75,7 @@ on_query( } ) -> try to_record(PartitionKey, HRecordTemplate, Data) of - Record -> append_record(Producer, Record) + Record -> append_record(Producer, Record, false) catch _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE end. @@ -88,7 +88,7 @@ on_batch_query( } ) -> try to_multi_part_records(PartitionKey, HRecordTemplate, BatchList) of - Records -> append_record(Producer, Records) + Records -> append_record(Producer, Records, true) catch _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE end. @@ -156,16 +156,29 @@ start_client(InstId, Config) -> {error, Error} end. -do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) -> +do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize, ssl := SSL}) -> ?SLOG(info, #{ msg => "starting hstreamdb connector: client", connector => InstId, config => Config }), ClientName = client_name(InstId), + RpcOpts = + case maps:get(enable, SSL) of + false -> + #{pool_size => PoolSize}; + true -> + #{ + pool_size => PoolSize, + gun_opts => #{ + transport => tls, + transport_opts => emqx_tls_lib:to_client_opts(SSL) + } + } + end, ClientOptions = [ {url, binary_to_list(Server)}, - {rpc_options, #{pool_size => PoolSize}} + {rpc_options, RpcOpts} ], case hstreamdb:start_client(ClientName, ClientOptions) of {ok, Client} -> @@ -206,12 +219,7 @@ do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) -> end. is_alive(Client) -> - case hstreamdb:echo(Client) of - {ok, _Echo} -> - true; - _ErrorEcho -> - false - end. + hstreamdb_client:echo(Client) =:= ok. start_producer( InstId, @@ -280,54 +288,52 @@ to_record(PartitionKey, RawRecord) -> hstreamdb:to_record(PartitionKey, raw, RawRecord). to_multi_part_records(PartitionKeyTmpl, HRecordTmpl, BatchList) -> - Records0 = lists:map( + lists:map( fun({send_message, Data}) -> to_record(PartitionKeyTmpl, HRecordTmpl, Data) end, BatchList - ), - PartitionKeys = proplists:get_keys(Records0), - [ - {PartitionKey, proplists:get_all_values(PartitionKey, Records0)} - || PartitionKey <- PartitionKeys - ]. + ). -append_record(Producer, MultiPartsRecords) when is_list(MultiPartsRecords) -> - lists:foreach(fun(Record) -> append_record(Producer, Record) end, MultiPartsRecords); -append_record(Producer, Record) when is_tuple(Record) -> - do_append_records(false, Producer, Record). +append_record(Producer, MultiPartsRecords, MaybeBatch) when is_list(MultiPartsRecords) -> + lists:foreach( + fun(Record) -> append_record(Producer, Record, MaybeBatch) end, MultiPartsRecords + ); +append_record(Producer, Record, MaybeBatch) when is_tuple(Record) -> + do_append_records(Producer, Record, MaybeBatch). %% TODO: only sync request supported. implement async request later. -do_append_records(false, Producer, Record) -> - case hstreamdb:append_flush(Producer, Record) of - {ok, _Result} -> - ?tp( - hstreamdb_connector_query_return, - #{result => _Result} - ), - ?SLOG(debug, #{ - msg => "HStreamDB producer sync append success", - record => Record - }); - %% the HStream is warming up or buzy, something are not ready yet, retry after a while - {error, {unavailable, _} = Reason} -> - {error, - {recoverable_error, #{ - msg => "HStreamDB is warming up or buzy, will retry after a moment", - reason => Reason - }}}; - {error, Reason} = Err -> - ?tp( - hstreamdb_connector_query_return, - #{error => Reason} - ), - ?SLOG(error, #{ - msg => "HStreamDB producer sync append failed", - reason => Reason, - record => Record - }), - Err - end. +do_append_records(Producer, Record, true = IsBatch) -> + Result = hstreamdb:append(Producer, Record), + handle_result(Result, Record, IsBatch); +do_append_records(Producer, Record, false = IsBatch) -> + Result = hstreamdb:append_flush(Producer, Record), + handle_result(Result, Record, IsBatch). + +handle_result(ok = Result, Record, IsBatch) -> + handle_result({ok, Result}, Record, IsBatch); +handle_result({ok, Result}, Record, IsBatch) -> + ?tp( + hstreamdb_connector_query_append_return, + #{result => Result, is_batch => IsBatch} + ), + ?SLOG(debug, #{ + msg => "HStreamDB producer sync append success", + record => Record, + is_batch => IsBatch + }); +handle_result({error, Reason} = Err, Record, IsBatch) -> + ?tp( + hstreamdb_connector_query_append_return, + #{error => Reason, is_batch => IsBatch} + ), + ?SLOG(error, #{ + msg => "HStreamDB producer sync append failed", + reason => Reason, + record => Record, + is_batch => IsBatch + }), + Err. client_name(InstId) -> "client:" ++ to_string(InstId). diff --git a/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl b/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl index 430343274..46f28ad40 100644 --- a/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl +++ b/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl @@ -432,7 +432,7 @@ client(Name, Config, N) -> try _ = hstreamdb:stop_client(Name), {ok, Client} = hstreamdb:start_client(Name, default_options(Config)), - {ok, echo} = hstreamdb:echo(Client), + ok = hstreamdb_client:echo(Client), Client catch Class:Error -> @@ -509,7 +509,7 @@ health_check_resource_down(Config) -> % These funs start and then stop the hstreamdb connection connect_and_create_stream(Config) -> ?WITH_CLIENT( - _ = hstreamdb:create_stream( + _ = hstreamdb_client:create_stream( Client, ?STREAM, ?REPLICATION_FACTOR, ?BACKLOG_RETENTION_SECOND, ?SHARD_COUNT ) ), @@ -531,7 +531,7 @@ connect_and_create_stream(Config) -> connect_and_delete_stream(Config) -> ?WITH_CLIENT( - _ = hstreamdb:delete_stream(Client, ?STREAM) + _ = hstreamdb_client:delete_stream(Client, ?STREAM) ). %%-------------------------------------------------------------------- diff --git a/mix.exs b/mix.exs index e6047249e..496f0b0c5 100644 --- a/mix.exs +++ b/mix.exs @@ -227,7 +227,7 @@ defmodule EMQXUmbrella.MixProject do defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do [ - {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.3.1+v0.12.0"}, + {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.3+v0.16.1"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.11", override: true}, {:wolff, github: "kafka4beam/wolff", tag: "1.7.6"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true}, From 649647190e686ef17c0f2b805f1697fed9e88225 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 18 Aug 2023 18:10:29 +0800 Subject: [PATCH 2/4] test: new batch mechanism of hstreamdb_erl --- .../test/emqx_bridge_hstreamdb_SUITE.erl | 71 ++++++++++++++----- 1 file changed, 53 insertions(+), 18 deletions(-) diff --git a/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl b/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl index 46f28ad40..14ea202be 100644 --- a/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl +++ b/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl @@ -13,8 +13,13 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). % SQL definitions --define(STREAM, "stream"). + +-define(STREAM, "demo_stream"). +%% could not be "stream" in Production Environment +%% especially not in hstreamdb_sql CLI client + -define(REPLICATION_FACTOR, 1). + %% in seconds -define(BACKLOG_RETENTION_SECOND, (24 * 60 * 60)). -define(SHARD_COUNT, 1). @@ -146,16 +151,23 @@ t_setup_via_config_and_publish(Config) -> begin ?wait_async_action( ?assertEqual(ok, send_message(Config, Data)), - #{?snk_kind := hstreamdb_connector_query_return}, + #{?snk_kind := hstreamdb_connector_query_append_return}, 10_000 ), ok end, fun(Trace0) -> - Trace = ?of_kind(hstreamdb_connector_query_return, Trace0), + Trace = ?of_kind(hstreamdb_connector_query_append_return, Trace0), lists:foreach( fun(EachTrace) -> - ?assertMatch(#{result := #{streamName := <>}}, EachTrace) + case ?config(enable_batch, Config) of + true -> + ?assertMatch(#{result := ok, is_batch := true}, EachTrace); + false -> + ?assertMatch( + #{result := #{'batchId' := _}, is_batch := false}, EachTrace + ) + end end, Trace ), @@ -181,16 +193,26 @@ t_setup_via_http_api_and_publish(Config) -> begin ?wait_async_action( ?assertEqual(ok, send_message(Config, Data)), - #{?snk_kind := hstreamdb_connector_query_return}, + #{?snk_kind := hstreamdb_connector_query_append_return}, 10_000 ), ok end, fun(Trace) -> - ?assertMatch( - [#{result := #{streamName := <>}}], - ?of_kind(hstreamdb_connector_query_return, Trace) - ) + lists:foreach( + fun(EachTrace) -> + case ?config(enable_batch, Config) of + true -> + ?assertMatch(#{result := ok, is_batch := true}, EachTrace); + false -> + ?assertMatch( + #{result := #{'batchId' := _}, is_batch := false}, EachTrace + ) + end + end, + ?of_kind(hstreamdb_connector_query_append_return, Trace) + ), + ok end ), ok. @@ -240,6 +262,7 @@ t_write_failure(Config) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), QueryMode = ?config(query_mode, Config), + EnableBatch = ?config(enable_batch, Config), Data = rand_data(), {{ok, _}, {ok, _}} = ?wait_async_action( @@ -251,10 +274,16 @@ t_write_failure(Config) -> health_check_resource_down(Config), case QueryMode of sync -> - ?assertMatch( - {error, {resource_error, #{msg := "call resource timeout", reason := timeout}}}, - send_message(Config, Data) - ); + case EnableBatch of + true -> + %% append to batch always returns ok + ?assertMatch(ok, send_message(Config, Data)); + false -> + ?assertMatch( + {error, {cannot_list_shards, {<>, econnrefused}}}, + send_message(Config, Data) + ) + end; async -> %% TODO: async mode is not supported yet, %% but it will return ok if calling emqx_resource_buffer_worker:async_query/3, @@ -282,17 +311,23 @@ t_simple_query(Config) -> end, Requests ), - #{?snk_kind := hstreamdb_connector_query_return}, + #{?snk_kind := hstreamdb_connector_query_append_return}, 10_000 ) end, - fun(Trace0) -> - Trace = ?of_kind(hstreamdb_connector_query_return, Trace0), + fun(Trace) -> lists:foreach( fun(EachTrace) -> - ?assertMatch(#{result := #{streamName := <>}}, EachTrace) + case ?config(enable_batch, Config) of + true -> + ?assertMatch(#{result := ok, is_batch := true}, EachTrace); + false -> + ?assertMatch( + #{result := #{'batchId' := _}, is_batch := false}, EachTrace + ) + end end, - Trace + ?of_kind(hstreamdb_connector_query_append_return, Trace) ), ok end From f45b5fac6f3cb844b093b219a53447167660e17c Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 18 Aug 2023 18:20:37 +0800 Subject: [PATCH 3/4] chore: unhidden hstreamdb bridge for e5.2.0 --- apps/emqx_bridge/src/emqx_bridge.app.src | 2 +- apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl | 6 ++---- changes/ee/feat-11478.en.md | 1 + 3 files changed, 4 insertions(+), 5 deletions(-) create mode 100644 changes/ee/feat-11478.en.md diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index 96d953e34..3a76843a1 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.1.26"}, + {vsn, "0.1.27"}, {registered, [emqx_bridge_sup]}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl index 07f858979..a6bd4a754 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl @@ -32,8 +32,7 @@ api_schemas(Method) -> api_ref(emqx_bridge_mongodb, <<"mongodb_rs">>, Method ++ "_rs"), api_ref(emqx_bridge_mongodb, <<"mongodb_sharded">>, Method ++ "_sharded"), api_ref(emqx_bridge_mongodb, <<"mongodb_single">>, Method ++ "_single"), - %% TODO: un-hide for e5.2.0... - %%api_ref(emqx_bridge_hstreamdb, <<"hstreamdb">>, Method), + api_ref(emqx_bridge_hstreamdb, <<"hstreamdb">>, Method), api_ref(emqx_bridge_influxdb, <<"influxdb_api_v1">>, Method ++ "_api_v1"), api_ref(emqx_bridge_influxdb, <<"influxdb_api_v2">>, Method ++ "_api_v2"), api_ref(emqx_bridge_redis, <<"redis_single">>, Method ++ "_single"), @@ -147,8 +146,7 @@ fields(bridges) -> hoconsc:map(name, ref(emqx_bridge_hstreamdb, "config")), #{ desc => <<"HStreamDB Bridge Config">>, - required => false, - importance => ?IMPORTANCE_HIDDEN + required => false } )}, {mysql, diff --git a/changes/ee/feat-11478.en.md b/changes/ee/feat-11478.en.md new file mode 100644 index 000000000..42761c77e --- /dev/null +++ b/changes/ee/feat-11478.en.md @@ -0,0 +1 @@ +Add HStreamDB bridge support (both TCP and TLS connection allowed), adapted to the HStreamDB `v0.16.1`. From f559ac3410818af7eb059c609dfffcf85baeb3a6 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 21 Aug 2023 17:17:19 +0800 Subject: [PATCH 4/4] ci: bump hstreamdb & zookeeper image vsn --- .ci/docker-compose-file/.env | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.ci/docker-compose-file/.env b/.ci/docker-compose-file/.env index b7033caae..e65600f06 100644 --- a/.ci/docker-compose-file/.env +++ b/.ci/docker-compose-file/.env @@ -10,7 +10,7 @@ CASSANDRA_TAG=3.11.6 MINIO_TAG=RELEASE.2023-03-20T20-16-18Z OPENTS_TAG=9aa7f88 KINESIS_TAG=2.1 -HSTREAMDB_TAG=v0.15.0 +HSTREAMDB_TAG=v0.16.1 HSTREAMDB_ZK_TAG=3.8.1 MS_IMAGE_ADDR=mcr.microsoft.com/mssql/server