chore: bump hstreamdb_erl to `0.4.3+v0.16.1`

This commit is contained in:
JimMoen 2023-08-15 16:56:25 +08:00
parent ed67303281
commit 5527edf442
No known key found for this signature in database
GPG Key ID: 87A520B4F76BA86D
5 changed files with 63 additions and 57 deletions

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{erl_opts, [debug_info]}.
{deps, [
{hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.3.1+v0.12.0"}}},
{hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.4.3+v0.16.1"}}},
{emqx, {path, "../../apps/emqx"}},
{emqx_utils, {path, "../../apps/emqx_utils"}}
]}.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_hstreamdb, [
{description, "EMQX Enterprise HStreamDB Bridge"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{applications, [
kernel,

View File

@ -75,7 +75,7 @@ on_query(
}
) ->
try to_record(PartitionKey, HRecordTemplate, Data) of
Record -> append_record(Producer, Record)
Record -> append_record(Producer, Record, false)
catch
_:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE
end.
@ -88,7 +88,7 @@ on_batch_query(
}
) ->
try to_multi_part_records(PartitionKey, HRecordTemplate, BatchList) of
Records -> append_record(Producer, Records)
Records -> append_record(Producer, Records, true)
catch
_:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE
end.
@ -156,16 +156,29 @@ start_client(InstId, Config) ->
{error, Error}
end.
do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) ->
do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize, ssl := SSL}) ->
?SLOG(info, #{
msg => "starting hstreamdb connector: client",
connector => InstId,
config => Config
}),
ClientName = client_name(InstId),
RpcOpts =
case maps:get(enable, SSL) of
false ->
#{pool_size => PoolSize};
true ->
#{
pool_size => PoolSize,
gun_opts => #{
transport => tls,
transport_opts => emqx_tls_lib:to_client_opts(SSL)
}
}
end,
ClientOptions = [
{url, binary_to_list(Server)},
{rpc_options, #{pool_size => PoolSize}}
{rpc_options, RpcOpts}
],
case hstreamdb:start_client(ClientName, ClientOptions) of
{ok, Client} ->
@ -206,12 +219,7 @@ do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) ->
end.
is_alive(Client) ->
case hstreamdb:echo(Client) of
{ok, _Echo} ->
true;
_ErrorEcho ->
false
end.
hstreamdb_client:echo(Client) =:= ok.
start_producer(
InstId,
@ -280,54 +288,52 @@ to_record(PartitionKey, RawRecord) ->
hstreamdb:to_record(PartitionKey, raw, RawRecord).
to_multi_part_records(PartitionKeyTmpl, HRecordTmpl, BatchList) ->
Records0 = lists:map(
lists:map(
fun({send_message, Data}) ->
to_record(PartitionKeyTmpl, HRecordTmpl, Data)
end,
BatchList
),
PartitionKeys = proplists:get_keys(Records0),
[
{PartitionKey, proplists:get_all_values(PartitionKey, Records0)}
|| PartitionKey <- PartitionKeys
].
).
append_record(Producer, MultiPartsRecords) when is_list(MultiPartsRecords) ->
lists:foreach(fun(Record) -> append_record(Producer, Record) end, MultiPartsRecords);
append_record(Producer, Record) when is_tuple(Record) ->
do_append_records(false, Producer, Record).
append_record(Producer, MultiPartsRecords, MaybeBatch) when is_list(MultiPartsRecords) ->
lists:foreach(
fun(Record) -> append_record(Producer, Record, MaybeBatch) end, MultiPartsRecords
);
append_record(Producer, Record, MaybeBatch) when is_tuple(Record) ->
do_append_records(Producer, Record, MaybeBatch).
%% TODO: only sync request supported. implement async request later.
do_append_records(false, Producer, Record) ->
case hstreamdb:append_flush(Producer, Record) of
{ok, _Result} ->
?tp(
hstreamdb_connector_query_return,
#{result => _Result}
),
?SLOG(debug, #{
msg => "HStreamDB producer sync append success",
record => Record
});
%% the HStream is warming up or buzy, something are not ready yet, retry after a while
{error, {unavailable, _} = Reason} ->
{error,
{recoverable_error, #{
msg => "HStreamDB is warming up or buzy, will retry after a moment",
reason => Reason
}}};
{error, Reason} = Err ->
?tp(
hstreamdb_connector_query_return,
#{error => Reason}
),
?SLOG(error, #{
msg => "HStreamDB producer sync append failed",
reason => Reason,
record => Record
}),
Err
end.
do_append_records(Producer, Record, true = IsBatch) ->
Result = hstreamdb:append(Producer, Record),
handle_result(Result, Record, IsBatch);
do_append_records(Producer, Record, false = IsBatch) ->
Result = hstreamdb:append_flush(Producer, Record),
handle_result(Result, Record, IsBatch).
handle_result(ok = Result, Record, IsBatch) ->
handle_result({ok, Result}, Record, IsBatch);
handle_result({ok, Result}, Record, IsBatch) ->
?tp(
hstreamdb_connector_query_append_return,
#{result => Result, is_batch => IsBatch}
),
?SLOG(debug, #{
msg => "HStreamDB producer sync append success",
record => Record,
is_batch => IsBatch
});
handle_result({error, Reason} = Err, Record, IsBatch) ->
?tp(
hstreamdb_connector_query_append_return,
#{error => Reason, is_batch => IsBatch}
),
?SLOG(error, #{
msg => "HStreamDB producer sync append failed",
reason => Reason,
record => Record,
is_batch => IsBatch
}),
Err.
client_name(InstId) ->
"client:" ++ to_string(InstId).

View File

@ -432,7 +432,7 @@ client(Name, Config, N) ->
try
_ = hstreamdb:stop_client(Name),
{ok, Client} = hstreamdb:start_client(Name, default_options(Config)),
{ok, echo} = hstreamdb:echo(Client),
ok = hstreamdb_client:echo(Client),
Client
catch
Class:Error ->
@ -509,7 +509,7 @@ health_check_resource_down(Config) ->
% These funs start and then stop the hstreamdb connection
connect_and_create_stream(Config) ->
?WITH_CLIENT(
_ = hstreamdb:create_stream(
_ = hstreamdb_client:create_stream(
Client, ?STREAM, ?REPLICATION_FACTOR, ?BACKLOG_RETENTION_SECOND, ?SHARD_COUNT
)
),
@ -531,7 +531,7 @@ connect_and_create_stream(Config) ->
connect_and_delete_stream(Config) ->
?WITH_CLIENT(
_ = hstreamdb:delete_stream(Client, ?STREAM)
_ = hstreamdb_client:delete_stream(Client, ?STREAM)
).
%%--------------------------------------------------------------------

View File

@ -227,7 +227,7 @@ defmodule EMQXUmbrella.MixProject do
defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do
[
{:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.3.1+v0.12.0"},
{:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.3+v0.16.1"},
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.11", override: true},
{:wolff, github: "kafka4beam/wolff", tag: "1.7.6"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true},