From 5527edf4426f9ac10d9e556de1444b025423785c Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 15 Aug 2023 16:56:25 +0800 Subject: [PATCH] chore: bump hstreamdb_erl to `0.4.3+v0.16.1` --- apps/emqx_bridge_hstreamdb/rebar.config | 2 +- .../src/emqx_bridge_hstreamdb.app.src | 2 +- .../src/emqx_bridge_hstreamdb_connector.erl | 108 +++++++++--------- .../test/emqx_bridge_hstreamdb_SUITE.erl | 6 +- mix.exs | 2 +- 5 files changed, 63 insertions(+), 57 deletions(-) diff --git a/apps/emqx_bridge_hstreamdb/rebar.config b/apps/emqx_bridge_hstreamdb/rebar.config index 9a70b55f9..44970b75d 100644 --- a/apps/emqx_bridge_hstreamdb/rebar.config +++ b/apps/emqx_bridge_hstreamdb/rebar.config @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {erl_opts, [debug_info]}. {deps, [ - {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.3.1+v0.12.0"}}}, + {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.4.3+v0.16.1"}}}, {emqx, {path, "../../apps/emqx"}}, {emqx_utils, {path, "../../apps/emqx_utils"}} ]}. diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src index 2a800baca..39c8c3258 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_hstreamdb, [ {description, "EMQX Enterprise HStreamDB Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl index 1763b252a..727ea4ad8 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl @@ -75,7 +75,7 @@ on_query( } ) -> try to_record(PartitionKey, HRecordTemplate, Data) of - Record -> append_record(Producer, Record) + Record -> append_record(Producer, Record, false) catch _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE end. @@ -88,7 +88,7 @@ on_batch_query( } ) -> try to_multi_part_records(PartitionKey, HRecordTemplate, BatchList) of - Records -> append_record(Producer, Records) + Records -> append_record(Producer, Records, true) catch _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE end. @@ -156,16 +156,29 @@ start_client(InstId, Config) -> {error, Error} end. -do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) -> +do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize, ssl := SSL}) -> ?SLOG(info, #{ msg => "starting hstreamdb connector: client", connector => InstId, config => Config }), ClientName = client_name(InstId), + RpcOpts = + case maps:get(enable, SSL) of + false -> + #{pool_size => PoolSize}; + true -> + #{ + pool_size => PoolSize, + gun_opts => #{ + transport => tls, + transport_opts => emqx_tls_lib:to_client_opts(SSL) + } + } + end, ClientOptions = [ {url, binary_to_list(Server)}, - {rpc_options, #{pool_size => PoolSize}} + {rpc_options, RpcOpts} ], case hstreamdb:start_client(ClientName, ClientOptions) of {ok, Client} -> @@ -206,12 +219,7 @@ do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) -> end. is_alive(Client) -> - case hstreamdb:echo(Client) of - {ok, _Echo} -> - true; - _ErrorEcho -> - false - end. + hstreamdb_client:echo(Client) =:= ok. start_producer( InstId, @@ -280,54 +288,52 @@ to_record(PartitionKey, RawRecord) -> hstreamdb:to_record(PartitionKey, raw, RawRecord). to_multi_part_records(PartitionKeyTmpl, HRecordTmpl, BatchList) -> - Records0 = lists:map( + lists:map( fun({send_message, Data}) -> to_record(PartitionKeyTmpl, HRecordTmpl, Data) end, BatchList - ), - PartitionKeys = proplists:get_keys(Records0), - [ - {PartitionKey, proplists:get_all_values(PartitionKey, Records0)} - || PartitionKey <- PartitionKeys - ]. + ). -append_record(Producer, MultiPartsRecords) when is_list(MultiPartsRecords) -> - lists:foreach(fun(Record) -> append_record(Producer, Record) end, MultiPartsRecords); -append_record(Producer, Record) when is_tuple(Record) -> - do_append_records(false, Producer, Record). +append_record(Producer, MultiPartsRecords, MaybeBatch) when is_list(MultiPartsRecords) -> + lists:foreach( + fun(Record) -> append_record(Producer, Record, MaybeBatch) end, MultiPartsRecords + ); +append_record(Producer, Record, MaybeBatch) when is_tuple(Record) -> + do_append_records(Producer, Record, MaybeBatch). %% TODO: only sync request supported. implement async request later. -do_append_records(false, Producer, Record) -> - case hstreamdb:append_flush(Producer, Record) of - {ok, _Result} -> - ?tp( - hstreamdb_connector_query_return, - #{result => _Result} - ), - ?SLOG(debug, #{ - msg => "HStreamDB producer sync append success", - record => Record - }); - %% the HStream is warming up or buzy, something are not ready yet, retry after a while - {error, {unavailable, _} = Reason} -> - {error, - {recoverable_error, #{ - msg => "HStreamDB is warming up or buzy, will retry after a moment", - reason => Reason - }}}; - {error, Reason} = Err -> - ?tp( - hstreamdb_connector_query_return, - #{error => Reason} - ), - ?SLOG(error, #{ - msg => "HStreamDB producer sync append failed", - reason => Reason, - record => Record - }), - Err - end. +do_append_records(Producer, Record, true = IsBatch) -> + Result = hstreamdb:append(Producer, Record), + handle_result(Result, Record, IsBatch); +do_append_records(Producer, Record, false = IsBatch) -> + Result = hstreamdb:append_flush(Producer, Record), + handle_result(Result, Record, IsBatch). + +handle_result(ok = Result, Record, IsBatch) -> + handle_result({ok, Result}, Record, IsBatch); +handle_result({ok, Result}, Record, IsBatch) -> + ?tp( + hstreamdb_connector_query_append_return, + #{result => Result, is_batch => IsBatch} + ), + ?SLOG(debug, #{ + msg => "HStreamDB producer sync append success", + record => Record, + is_batch => IsBatch + }); +handle_result({error, Reason} = Err, Record, IsBatch) -> + ?tp( + hstreamdb_connector_query_append_return, + #{error => Reason, is_batch => IsBatch} + ), + ?SLOG(error, #{ + msg => "HStreamDB producer sync append failed", + reason => Reason, + record => Record, + is_batch => IsBatch + }), + Err. client_name(InstId) -> "client:" ++ to_string(InstId). diff --git a/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl b/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl index 430343274..46f28ad40 100644 --- a/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl +++ b/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl @@ -432,7 +432,7 @@ client(Name, Config, N) -> try _ = hstreamdb:stop_client(Name), {ok, Client} = hstreamdb:start_client(Name, default_options(Config)), - {ok, echo} = hstreamdb:echo(Client), + ok = hstreamdb_client:echo(Client), Client catch Class:Error -> @@ -509,7 +509,7 @@ health_check_resource_down(Config) -> % These funs start and then stop the hstreamdb connection connect_and_create_stream(Config) -> ?WITH_CLIENT( - _ = hstreamdb:create_stream( + _ = hstreamdb_client:create_stream( Client, ?STREAM, ?REPLICATION_FACTOR, ?BACKLOG_RETENTION_SECOND, ?SHARD_COUNT ) ), @@ -531,7 +531,7 @@ connect_and_create_stream(Config) -> connect_and_delete_stream(Config) -> ?WITH_CLIENT( - _ = hstreamdb:delete_stream(Client, ?STREAM) + _ = hstreamdb_client:delete_stream(Client, ?STREAM) ). %%-------------------------------------------------------------------- diff --git a/mix.exs b/mix.exs index e6047249e..496f0b0c5 100644 --- a/mix.exs +++ b/mix.exs @@ -227,7 +227,7 @@ defmodule EMQXUmbrella.MixProject do defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do [ - {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.3.1+v0.12.0"}, + {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.3+v0.16.1"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.11", override: true}, {:wolff, github: "kafka4beam/wolff", tag: "1.7.6"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true},