feat: hstreamdb bridge with batch query

JimMoen 2023-07-03 18:45:22 +08:00
parent b9bfdfd583
commit 1587f038a5
No known key found for this signature in database
GPG Key ID: 87A520B4F76BA86D
6 changed files with 181 additions and 144 deletions


@@ -32,16 +32,31 @@ conn_bridge_examples(Method) ->
}
].
values(_Method) ->
values(get) ->
values(post);
values(put) ->
values(post);
values(post) ->
#{
type => hstreamdb,
type => <<"hstreamdb">>,
name => <<"demo">>,
connector => <<"hstreamdb:connector">>,
enable => true,
direction => egress,
local_topic => <<"local/topic/#">>,
payload => <<"${payload}">>
}.
direction => <<"egress">>,
url => <<"http://127.0.0.1:6570">>,
stream => <<"stream">>,
%% raw HRecord
record_template =>
<<"{ \"temperature\": ${payload.temperature}, \"humidity\": ${payload.humidity} }">>,
pool_size => 8,
%% grpc_timeout => <<"1m">>
resource_opts => #{
query_mode => sync,
batch_size => 100,
batch_time => <<"20ms">>
},
ssl => #{enable => false}
};
values(_) ->
#{}.
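%% A bridge configured with the example values above would look roughly
%% like this in HOCON (a sketch assembled from values(post); not part of
%% this commit):
%%
%% bridges.hstreamdb.demo {
%%   enable = true
%%   direction = egress
%%   url = "http://127.0.0.1:6570"
%%   stream = "stream"
%%   record_template = "{ \"temperature\": ${payload.temperature}, \"humidity\": ${payload.humidity} }"
%%   pool_size = 8
%%   resource_opts { query_mode = sync, batch_size = 100, batch_time = "20ms" }
%%   ssl { enable = false }
%% }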
%% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions
@@ -50,41 +65,45 @@ namespace() -> "bridge_hstreamdb".
roots() -> [].
fields("config") ->
[
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})},
{local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
{payload, mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("payload")})},
{connector, field(connector)}
];
hstream_bridge_common_fields() ++
connector_fields();
fields("post") ->
[type_field(), name_field() | fields("config")];
fields("put") ->
fields("config");
hstream_bridge_common_fields() ++
connector_fields() ++
type_name_fields();
fields("get") ->
emqx_bridge_schema:status_fields() ++ fields("post").
hstream_bridge_common_fields() ++
connector_fields() ++
type_name_fields() ++
emqx_bridge_schema:status_fields();
fields("put") ->
hstream_bridge_common_fields() ++
connector_fields().
field(connector) ->
mk(
hoconsc:union([binary(), ref(emqx_bridge_hstreamdb_connector, config)]),
#{
required => true,
example => <<"hstreamdb:demo">>,
desc => ?DESC("desc_connector")
}
).
hstream_bridge_common_fields() ->
emqx_bridge_schema:common_bridge_fields() ++
[
{direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})},
{local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
{record_template,
mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("record_template")})}
] ++
emqx_resource_schema:fields("resource_opts").
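%% Note: the resource_opts fields pulled in here include the batching
%% knobs shown in the example above (query_mode, batch_size, batch_time);
%% batch_size/batch_time decide when the buffer workers hand a whole
%% batch to the connector's on_batch_query/3.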
connector_fields() ->
emqx_bridge_hstreamdb_connector:fields(config).
desc("config") ->
?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for HStream using `", string:to_upper(Method), "` method."];
["Configuration for HStreamDB bridge using `", string:to_upper(Method), "` method."];
desc(_) ->
undefined.
%% -------------------------------------------------------------------------------------------------
%% internal
type_field() ->
{type, mk(enum([hstreamdb]), #{required => true, desc => ?DESC("desc_type")})}.
name_field() ->
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
type_name_fields() ->
[
{type, mk(enum([hstreamdb]), #{required => true, desc => ?DESC("desc_type")})},
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}
].


@@ -6,6 +6,7 @@
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-import(hoconsc, [mk/2, enum/1]).
@@ -17,6 +18,7 @@
on_start/2,
on_stop/2,
on_query/3,
on_batch_query/3,
on_get_status/2
]).
@@ -28,10 +30,12 @@
namespace/0,
roots/0,
fields/1,
desc/1,
connector_examples/1
desc/1
]).
-define(DEFAULT_GRPC_TIMEOUT, timer:seconds(30)).
-define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>).
%% -------------------------------------------------------------------------------------------------
%% resource callback
callback_mode() -> always_sync.
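%% `always_sync` tells the resource layer that this connector only
%% implements synchronous calls (on_query/3 and on_batch_query/3); no
%% async reply path is used.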
@@ -51,13 +55,35 @@ on_stop(InstId, #{client := Client, producer := Producer}) ->
stop_producer => StopProducerRes
}).
-define(FAILED_TO_APPLY_HRECORD_TEMPLATE,
{error, {unrecoverable_error, failed_to_apply_hrecord_template}}
).
on_query(
_InstId,
{send_message, Data},
#{producer := Producer, ordering_key := OrderingKey, payload := Payload}
_State = #{
producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate
}
) ->
Record = to_record(OrderingKey, Payload, Data),
do_append(Producer, Record).
try to_record(PartitionKey, HRecordTemplate, Data) of
Record -> append_record(Producer, Record)
catch
_:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE
end.
on_batch_query(
_InstId,
BatchList,
_State = #{
producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate
}
) ->
try to_multi_part_records(PartitionKey, HRecordTemplate, BatchList) of
Records -> append_record(Producer, Records)
catch
_:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE
end.
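%% Illustration (shapes assumed from the clauses above, not part of this
%% commit): a batch such as
%%   [{send_message, #{payload => #{temperature => 1}}},
%%    {send_message, #{payload => #{temperature => 2}}}]
%% is rendered through the HRecord template, grouped by partition key via
%% to_multi_part_records/3, and appended with one call per key; if the
%% template cannot be applied, the whole batch fails with
%% {error, {unrecoverable_error, failed_to_apply_hrecord_template}}.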
on_get_status(_InstId, #{client := Client}) ->
case is_alive(Client) of
@@ -87,43 +113,16 @@ fields(config) ->
[
{url, mk(binary(), #{required => true, desc => ?DESC("url")})},
{stream, mk(binary(), #{required => true, desc => ?DESC("stream_name")})},
{ordering_key, mk(binary(), #{required => false, desc => ?DESC("ordering_key")})},
{pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})}
];
fields("get") ->
fields("post");
fields("put") ->
fields(config);
fields("post") ->
[
{type, mk(hstreamdb, #{required => true, desc => ?DESC("type")})},
{name, mk(binary(), #{required => true, desc => ?DESC("name")})}
] ++ fields("put").
{partition_key, mk(binary(), #{required => false, desc => ?DESC("partition_key")})},
{pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})},
{grpc_timeout, fun grpc_timeout/1}
] ++ emqx_connector_schema_lib:ssl_fields().
connector_examples(Method) ->
[
#{
<<"hstreamdb">> => #{
summary => <<"HStreamDB Connector">>,
value => values(Method)
}
}
].
values(post) ->
maps:merge(values(put), #{name => <<"connector">>});
values(get) ->
values(post);
values(put) ->
#{
type => hstreamdb,
url => <<"http://127.0.0.1:6570">>,
stream => <<"stream1">>,
ordering_key => <<"some_key">>,
pool_size => 8
};
values(_) ->
#{}.
grpc_timeout(type) -> emqx_schema:timeout_duration_ms();
grpc_timeout(desc) -> ?DESC("grpc_timeout");
grpc_timeout(default) -> ?DEFAULT_GRPC_TIMEOUT_RAW;
grpc_timeout(required) -> false;
grpc_timeout(_) -> undefined.
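%% hocon "function field" style: each clause supplies one schema attribute
%% (type, desc, default, required); anything else falls through to
%% `undefined` so hocon applies its own defaults.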
desc(config) ->
?DESC("config").
@@ -168,6 +167,10 @@ do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) ->
}),
start_producer(InstId, Client, Config);
_ ->
?tp(
hstreamdb_connector_start_failed,
#{error => client_not_alive}
),
?SLOG(error, #{
msg => "hstreamdb connector: client not alive",
connector => InstId
@@ -202,7 +205,7 @@ is_alive(Client) ->
start_producer(
InstId,
Client,
Options = #{stream := Stream, pool_size := PoolSize, egress := #{payload := PayloadBin}}
Options = #{stream := Stream, pool_size := PoolSize}
) ->
%% TODO: change these batch options after we have a better disk cache.
BatchSize = maps:get(batch_size, Options, 100),
@@ -212,7 +215,8 @@ start_producer(
{callback, {?MODULE, on_flush_result, []}},
{max_records, BatchSize},
{interval, Interval},
{pool_size, PoolSize}
{pool_size, PoolSize},
{grpc_timeout, maps:get(grpc_timeout, Options, ?DEFAULT_GRPC_TIMEOUT)}
],
Name = produce_name(InstId),
?SLOG(info, #{
@@ -224,16 +228,14 @@ start_producer(
?SLOG(info, #{
msg => "hstreamdb connector: producer started"
}),
EnableBatch = maps:get(enable_batch, Options, false),
Payload = emqx_placeholder:preproc_tmpl(PayloadBin),
OrderingKeyBin = maps:get(ordering_key, Options, <<"">>),
OrderingKey = emqx_placeholder:preproc_tmpl(OrderingKeyBin),
State = #{
client => Client,
producer => Producer,
enable_batch => EnableBatch,
ordering_key => OrderingKey,
payload => Payload
enable_batch => maps:get(enable_batch, Options, false),
partition_key => emqx_placeholder:preproc_tmpl(
maps:get(partition_key, Options, <<"">>)
),
record_template => record_template(Options)
},
{ok, State};
{error, {already_started, Pid}} ->
@@ -253,47 +255,53 @@ start_producer(
{error, Reason}
end.
to_record(OrderingKeyTmpl, PayloadTmpl, Data) ->
OrderingKey = emqx_placeholder:proc_tmpl(OrderingKeyTmpl, Data),
Payload = emqx_placeholder:proc_tmpl(PayloadTmpl, Data),
to_record(OrderingKey, Payload).
to_record(PartitionKeyTmpl, HRecordTmpl, Data) ->
PartitionKey = emqx_placeholder:proc_tmpl(PartitionKeyTmpl, Data),
RawRecord = emqx_placeholder:proc_tmpl(HRecordTmpl, Data),
to_record(PartitionKey, RawRecord).
to_record(OrderingKey, Payload) when is_binary(OrderingKey) ->
to_record(binary_to_list(OrderingKey), Payload);
to_record(OrderingKey, Payload) ->
hstreamdb:to_record(OrderingKey, raw, Payload).
to_record(PartitionKey, RawRecord) when is_binary(PartitionKey) ->
to_record(binary_to_list(PartitionKey), RawRecord);
to_record(PartitionKey, RawRecord) ->
hstreamdb:to_record(PartitionKey, raw, RawRecord).
do_append(Producer, Record) ->
do_append(false, Producer, Record).
to_multi_part_records(PartitionKeyTmpl, HRecordTmpl, BatchList) ->
Records0 = lists:map(
fun({send_message, Data}) ->
to_record(PartitionKeyTmpl, HRecordTmpl, Data)
end,
BatchList
),
PartitionKeys = proplists:get_keys(Records0),
[
{PartitionKey, proplists:get_all_values(PartitionKey, Records0)}
|| PartitionKey <- PartitionKeys
].
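%% Worked example (illustrative keys and records, assuming
%% hstreamdb:to_record/3 yields {PartitionKey, Record} pairs, as the
%% proplists calls above imply): an input rendered to
%%   [{"key1", R1}, {"key2", R2}, {"key1", R3}]
%% is regrouped into
%%   [{"key1", [R1, R3]}, {"key2", [R2]}]
%% (key order unspecified), so each partition key gets one multi-part append.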
%% TODO: this append is async; remove or change it after we have a better disk cache.
% do_append(true, Producer, Record) ->
% case hstreamdb:append(Producer, Record) of
% ok ->
% ?SLOG(debug, #{
% msg => "hstreamdb producer async append success",
% record => Record
% });
% {error, Reason} = Err ->
% ?SLOG(error, #{
% msg => "hstreamdb producer async append failed",
% reason => Reason,
% record => Record
% }),
% Err
% end;
do_append(false, Producer, Record) ->
%% TODO: this append is sync, but it does not support [Record]; it can only append one Record at a time.
%% Change it after we have a better disk cache.
append_record(Producer, MultiPartsRecords) when is_list(MultiPartsRecords) ->
lists:foreach(fun(Record) -> append_record(Producer, Record) end, MultiPartsRecords);
append_record(Producer, Record) when is_tuple(Record) ->
do_append_records(false, Producer, Record).
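%% Note: lists:foreach/2 discards per-record return values, so a failed
%% append inside a multi-part batch is logged by do_append_records/3 but
%% not propagated back to the caller of append_record/2.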
%% TODO: only sync requests are supported. Implement async requests later.
do_append_records(false, Producer, Record) ->
case hstreamdb:append_flush(Producer, Record) of
{ok, _} ->
{ok, _Result} ->
?tp(
hstreamdb_connector_query_return,
#{result => _Result}
),
?SLOG(debug, #{
msg => "hstreamdb producer sync append success",
msg => "HStreamDB producer sync append success",
record => Record
});
{error, Reason} = Err ->
?tp(
hstreamdb_connector_query_return,
#{error => Reason}
),
?SLOG(error, #{
msg => "hstreamdb producer sync append failed",
msg => "HStreamDB producer sync append failed",
reason => Reason,
record => Record
}),
@@ -306,6 +314,11 @@ client_name(InstId) ->
produce_name(ActionId) ->
list_to_atom("producer:" ++ to_string(ActionId)).
record_template(#{record_template := RawHRecordTemplate}) ->
emqx_placeholder:preproc_tmpl(RawHRecordTemplate);
record_template(_) ->
emqx_placeholder:preproc_tmpl(<<"${payload}">>).
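%% Sketch of the template round-trip (illustrative data, not from this
%% commit): preproc_tmpl/1 tokenizes the template once at startup, and
%% proc_tmpl/2 renders it per message, e.g.
%%   Tmpl = emqx_placeholder:preproc_tmpl(<<"{\"t\": ${payload.temperature}}">>),
%%   <<"{\"t\": 21}">> =
%%       emqx_placeholder:proc_tmpl(Tmpl, #{payload => #{temperature => 21}}).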
to_string(List) when is_list(List) -> List;
to_string(Bin) when is_binary(Bin) -> binary_to_list(Bin);
to_string(Atom) when is_atom(Atom) -> atom_to_list(Atom).


@@ -0,0 +1 @@
Add HStreamDB bridge support, adapted to HStreamDB `v0.15.0`.


@@ -6,12 +6,6 @@ config_direction.desc:
config_direction.label:
"""Bridge Direction"""
config_enable.desc:
"""Enable or disable this bridge"""
config_enable.label:
"""Enable Or Disable Bridge"""
desc_config.desc:
"""Configuration for an HStreamDB bridge."""
@@ -46,10 +40,10 @@ will be forwarded."""
local_topic.label:
"""Local Topic"""
payload.desc:
"""The payload to be forwarded to the HStreamDB. Placeholders supported."""
record_template.desc:
"""The HStream Record template to be forwarded to the HStreamDB. Placeholders supported."""
payload.label:
"""Payload"""
record_template.label:
"""HStream Record"""
}


@@ -6,16 +6,34 @@ config.desc:
config.label:
"""Connection config"""
type.desc:
"""The Connector Type."""
type.label:
"""Connector Type"""
name.desc:
"""Connector name, used as a human-readable description of the connector."""
name.label:
"""Connector Name"""
ordering_key.desc:
url.desc:
"""HStreamDB Server URL"""
url.label:
"""HStreamDB Server URL"""
stream_name.desc:
"""HStreamDB Stream Name"""
stream_name.label:
"""HStreamDB Stream Name"""
partition_key.desc:
"""HStreamDB Ordering Key"""
ordering_key.label:
partition_key.label:
"""HStreamDB Ordering Key"""
pool_size.desc:
@@ -24,22 +42,10 @@ pool_size.desc:
pool_size.label:
"""HStreamDB Pool Size"""
stream_name.desc:
"""HStreamDB Stream Name"""
grpc_timeout.desc:
"""HStreamDB gRPC Timeout"""
stream_name.label:
"""HStreamDB Stream Name"""
type.desc:
"""The Connector Type."""
type.label:
"""Connector Type"""
url.desc:
"""HStreamDB Server URL"""
url.label:
"""HStreamDB Server URL"""
grpc_timeout.label:
"""HStreamDB gRPC Timeout"""
}


@@ -270,6 +270,10 @@ hstream
hstreamDB
hstream
hstreamdb
hrecord
hRecord
Hrecord
HRecord
SASL
GSSAPI
keytab