From 1587f038a5c0464f1772b8119b3e897deae226c9 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 3 Jul 2023 18:45:22 +0800 Subject: [PATCH] feat: hstreamdb bridge with batch query --- .../src/emqx_bridge_hstreamdb.erl | 87 +++++---- .../src/emqx_bridge_hstreamdb_connector.erl | 177 ++++++++++-------- changes/ee/feat-10203.en.md | 1 + rel/i18n/emqx_bridge_hstreamdb.hocon | 14 +- .../emqx_bridge_hstreamdb_connector.hocon | 42 +++-- scripts/spellcheck/dicts/emqx.txt | 4 + 6 files changed, 181 insertions(+), 144 deletions(-) create mode 100644 changes/ee/feat-10203.en.md diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl index dacfc3633..7052e0120 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl @@ -32,16 +32,31 @@ conn_bridge_examples(Method) -> } ]. -values(_Method) -> +values(get) -> + values(post); +values(put) -> + values(post); +values(post) -> #{ - type => hstreamdb, + type => <<"hstreamdb">>, name => <<"demo">>, - connector => <<"hstreamdb:connector">>, - enable => true, - direction => egress, - local_topic => <<"local/topic/#">>, - payload => <<"${payload}">> - }. + direction => <<"egress">>, + url => <<"http://127.0.0.1:6570">>, + stream => <<"stream">>, + %% raw HRecord + record_template => + <<"{ \"temperature\": ${payload.temperature}, \"humidity\": ${payload.humidity} }">>, + pool_size => 8, + %% grpc_timeout => <<"1m">> + resource_opts => #{ + query_mode => sync, + batch_size => 100, + batch_time => <<"20ms">> + }, + ssl => #{enable => false} + }; +values(_) -> + #{}. %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions @@ -50,41 +65,45 @@ namespace() -> "bridge_hstreamdb". roots() -> []. fields("config") -> - [ - {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, - {direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})}, - {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, - {payload, mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("payload")})}, - {connector, field(connector)} - ]; + hstream_bridge_common_fields() ++ + connector_fields(); fields("post") -> - [type_field(), name_field() | fields("config")]; -fields("put") -> - fields("config"); + hstream_bridge_common_fields() ++ + connector_fields() ++ + type_name_fields(); fields("get") -> - emqx_bridge_schema:status_fields() ++ fields("post"). + hstream_bridge_common_fields() ++ + connector_fields() ++ + type_name_fields() ++ + emqx_bridge_schema:status_fields(); +fields("put") -> + hstream_bridge_common_fields() ++ + connector_fields(). -field(connector) -> - mk( - hoconsc:union([binary(), ref(emqx_bridge_hstreamdb_connector, config)]), - #{ - required => true, - example => <<"hstreamdb:demo">>, - desc => ?DESC("desc_connector") - } - ). +hstream_bridge_common_fields() -> + emqx_bridge_schema:common_bridge_fields() ++ + [ + {direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})}, + {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, + {record_template, + mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("record_template")})} + ] ++ + emqx_resource_schema:fields("resource_opts"). + +connector_fields() -> + emqx_bridge_hstreamdb_connector:fields(config). desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> - ["Configuration for HStream using `", string:to_upper(Method), "` method."]; + ["Configuration for HStreamDB bridge using `", string:to_upper(Method), "` method."]; desc(_) -> undefined. %% ------------------------------------------------------------------------------------------------- %% internal -type_field() -> - {type, mk(enum([hstreamdb]), #{required => true, desc => ?DESC("desc_type")})}. - -name_field() -> - {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. +type_name_fields() -> + [ + {type, mk(enum([hstreamdb]), #{required => true, desc => ?DESC("desc_type")})}, + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})} + ]. diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl index c2a210271..31627db81 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl @@ -6,6 +6,7 @@ -include_lib("hocon/include/hoconsc.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -import(hoconsc, [mk/2, enum/1]). @@ -17,6 +18,7 @@ on_start/2, on_stop/2, on_query/3, + on_batch_query/3, on_get_status/2 ]). @@ -28,10 +30,12 @@ namespace/0, roots/0, fields/1, - desc/1, - connector_examples/1 + desc/1 ]). +-define(DEFAULT_GRPC_TIMEOUT, timer:seconds(30)). +-define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>). + %% ------------------------------------------------------------------------------------------------- %% resource callback callback_mode() -> always_sync. @@ -51,13 +55,35 @@ on_stop(InstId, #{client := Client, producer := Producer}) -> stop_producer => StopProducerRes }). +-define(FAILED_TO_APPLY_HRECORD_TEMPLATE, + {error, {unrecoverable_error, failed_to_apply_hrecord_template}} +). + on_query( _InstId, {send_message, Data}, - #{producer := Producer, ordering_key := OrderingKey, payload := Payload} + _State = #{ + producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate + } ) -> - Record = to_record(OrderingKey, Payload, Data), - do_append(Producer, Record). + try to_record(PartitionKey, HRecordTemplate, Data) of + Record -> append_record(Producer, Record) + catch + _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE + end. + +on_batch_query( + _InstId, + BatchList, + _State = #{ + producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate + } +) -> + try to_multi_part_records(PartitionKey, HRecordTemplate, BatchList) of + Records -> append_record(Producer, Records) + catch + _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE + end. on_get_status(_InstId, #{client := Client}) -> case is_alive(Client) of @@ -87,43 +113,16 @@ fields(config) -> [ {url, mk(binary(), #{required => true, desc => ?DESC("url")})}, {stream, mk(binary(), #{required => true, desc => ?DESC("stream_name")})}, - {ordering_key, mk(binary(), #{required => false, desc => ?DESC("ordering_key")})}, - {pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})} - ]; -fields("get") -> - fields("post"); -fields("put") -> - fields(config); -fields("post") -> - [ - {type, mk(hstreamdb, #{required => true, desc => ?DESC("type")})}, - {name, mk(binary(), #{required => true, desc => ?DESC("name")})} - ] ++ fields("put"). + {partition_key, mk(binary(), #{required => false, desc => ?DESC("partition_key")})}, + {pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})}, + {grpc_timeout, fun grpc_timeout/1} + ] ++ emqx_connector_schema_lib:ssl_fields(). -connector_examples(Method) -> - [ - #{ - <<"hstreamdb">> => #{ - summary => <<"HStreamDB Connector">>, - value => values(Method) - } - } - ]. - -values(post) -> - maps:merge(values(put), #{name => <<"connector">>}); -values(get) -> - values(post); -values(put) -> - #{ - type => hstreamdb, - url => <<"http://127.0.0.1:6570">>, - stream => <<"stream1">>, - ordering_key => <<"some_key">>, - pool_size => 8 - }; -values(_) -> - #{}. +grpc_timeout(type) -> emqx_schema:timeout_duration_ms(); +grpc_timeout(desc) -> ?DESC("grpc_timeout"); +grpc_timeout(default) -> ?DEFAULT_GRPC_TIMEOUT_RAW; +grpc_timeout(required) -> false; +grpc_timeout(_) -> undefined. desc(config) -> ?DESC("config"). @@ -168,6 +167,10 @@ do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) -> }), start_producer(InstId, Client, Config); _ -> + ?tp( + hstreamdb_connector_start_failed, + #{error => client_not_alive} + ), ?SLOG(error, #{ msg => "hstreamdb connector: client not alive", connector => InstId @@ -202,7 +205,7 @@ is_alive(Client) -> start_producer( InstId, Client, - Options = #{stream := Stream, pool_size := PoolSize, egress := #{payload := PayloadBin}} + Options = #{stream := Stream, pool_size := PoolSize} ) -> %% TODO: change these batch options after we have better disk cache. BatchSize = maps:get(batch_size, Options, 100), @@ -212,7 +215,8 @@ start_producer( {callback, {?MODULE, on_flush_result, []}}, {max_records, BatchSize}, {interval, Interval}, - {pool_size, PoolSize} + {pool_size, PoolSize}, + {grpc_timeout, maps:get(grpc_timeout, Options, ?DEFAULT_GRPC_TIMEOUT)} ], Name = produce_name(InstId), ?SLOG(info, #{ @@ -224,16 +228,14 @@ start_producer( ?SLOG(info, #{ msg => "hstreamdb connector: producer started" }), - EnableBatch = maps:get(enable_batch, Options, false), - Payload = emqx_placeholder:preproc_tmpl(PayloadBin), - OrderingKeyBin = maps:get(ordering_key, Options, <<"">>), - OrderingKey = emqx_placeholder:preproc_tmpl(OrderingKeyBin), State = #{ client => Client, producer => Producer, - enable_batch => EnableBatch, - ordering_key => OrderingKey, - payload => Payload + enable_batch => maps:get(enable_batch, Options, false), + partition_key => emqx_placeholder:preproc_tmpl( + maps:get(partition_key, Options, <<"">>) + ), + record_template => record_template(Options) }, {ok, State}; {error, {already_started, Pid}} -> @@ -253,47 +255,53 @@ start_producer( {error, Reason} end. -to_record(OrderingKeyTmpl, PayloadTmpl, Data) -> - OrderingKey = emqx_placeholder:proc_tmpl(OrderingKeyTmpl, Data), - Payload = emqx_placeholder:proc_tmpl(PayloadTmpl, Data), - to_record(OrderingKey, Payload). +to_record(PartitionKeyTmpl, HRecordTmpl, Data) -> + PartitionKey = emqx_placeholder:proc_tmpl(PartitionKeyTmpl, Data), + RawRecord = emqx_placeholder:proc_tmpl(HRecordTmpl, Data), + to_record(PartitionKey, RawRecord). -to_record(OrderingKey, Payload) when is_binary(OrderingKey) -> - to_record(binary_to_list(OrderingKey), Payload); -to_record(OrderingKey, Payload) -> - hstreamdb:to_record(OrderingKey, raw, Payload). +to_record(PartitionKey, RawRecord) when is_binary(PartitionKey) -> + to_record(binary_to_list(PartitionKey), RawRecord); +to_record(PartitionKey, RawRecord) -> + hstreamdb:to_record(PartitionKey, raw, RawRecord). -do_append(Producer, Record) -> - do_append(false, Producer, Record). +to_multi_part_records(PartitionKeyTmpl, HRecordTmpl, BatchList) -> + Records0 = lists:map( + fun({send_message, Data}) -> + to_record(PartitionKeyTmpl, HRecordTmpl, Data) + end, + BatchList + ), + PartitionKeys = proplists:get_keys(Records0), + [ + {PartitionKey, proplists:get_all_values(PartitionKey, Records0)} + || PartitionKey <- PartitionKeys + ]. -%% TODO: this append is async, remove or change it after we have better disk cache. -% do_append(true, Producer, Record) -> -% case hstreamdb:append(Producer, Record) of -% ok -> -% ?SLOG(debug, #{ -% msg => "hstreamdb producer async append success", -% record => Record -% }); -% {error, Reason} = Err -> -% ?SLOG(error, #{ -% msg => "hstreamdb producer async append failed", -% reason => Reason, -% record => Record -% }), -% Err -% end; -do_append(false, Producer, Record) -> - %% TODO: this append is sync, but it does not support [Record], can only append one Record. - %% Change it after we have better dick cache. +append_record(Producer, MultiPartsRecords) when is_list(MultiPartsRecords) -> + lists:foreach(fun(Record) -> append_record(Producer, Record) end, MultiPartsRecords); +append_record(Producer, Record) when is_tuple(Record) -> + do_append_records(false, Producer, Record). + +%% TODO: only sync request supported. implement async request later. +do_append_records(false, Producer, Record) -> case hstreamdb:append_flush(Producer, Record) of - {ok, _} -> + {ok, _Result} -> + ?tp( + hstreamdb_connector_query_return, + #{result => _Result} + ), ?SLOG(debug, #{ - msg => "hstreamdb producer sync append success", + msg => "HStreamDB producer sync append success", record => Record }); {error, Reason} = Err -> + ?tp( + hstreamdb_connector_query_return, + #{error => Reason} + ), ?SLOG(error, #{ - msg => "hstreamdb producer sync append failed", + msg => "HStreamDB producer sync append failed", reason => Reason, record => Record }), @@ -306,6 +314,11 @@ client_name(InstId) -> produce_name(ActionId) -> list_to_atom("producer:" ++ to_string(ActionId)). +record_template(#{record_template := RawHRecordTemplate}) -> + emqx_placeholder:preproc_tmpl(RawHRecordTemplate); +record_template(_) -> + emqx_placeholder:preproc_tmpl(<<"${payload}">>). + to_string(List) when is_list(List) -> List; to_string(Bin) when is_binary(Bin) -> binary_to_list(Bin); to_string(Atom) when is_atom(Atom) -> atom_to_list(Atom). diff --git a/changes/ee/feat-10203.en.md b/changes/ee/feat-10203.en.md new file mode 100644 index 000000000..a2ff3b3bb --- /dev/null +++ b/changes/ee/feat-10203.en.md @@ -0,0 +1 @@ +Add HStreamDB bridge support, adapted to the HStreamDB `v0.15.0`. diff --git a/rel/i18n/emqx_bridge_hstreamdb.hocon b/rel/i18n/emqx_bridge_hstreamdb.hocon index d9e7f1561..10700d4eb 100644 --- a/rel/i18n/emqx_bridge_hstreamdb.hocon +++ b/rel/i18n/emqx_bridge_hstreamdb.hocon @@ -6,12 +6,6 @@ config_direction.desc: config_direction.label: """Bridge Direction""" -config_enable.desc: -"""Enable or disable this bridge""" - -config_enable.label: -"""Enable Or Disable Bridge""" - desc_config.desc: """Configuration for an HStreamDB bridge.""" @@ -46,10 +40,10 @@ will be forwarded.""" local_topic.label: """Local Topic""" -payload.desc: -"""The payload to be forwarded to the HStreamDB. Placeholders supported.""" +record_template.desc: +"""The HStream Record template to be forwarded to the HStreamDB. Placeholders supported.""" -payload.label: -"""Payload""" +record_template.label: +"""HStream Record""" } diff --git a/rel/i18n/emqx_bridge_hstreamdb_connector.hocon b/rel/i18n/emqx_bridge_hstreamdb_connector.hocon index 001340e9c..c0faa794c 100644 --- a/rel/i18n/emqx_bridge_hstreamdb_connector.hocon +++ b/rel/i18n/emqx_bridge_hstreamdb_connector.hocon @@ -6,16 +6,34 @@ config.desc: config.label: """Connection config""" +type.desc: +"""The Connector Type.""" + +type.label: +"""Connector Type""" + name.desc: """Connector name, used as a human-readable description of the connector.""" name.label: """Connector Name""" -ordering_key.desc: +url.desc: +"""HStreamDB Server URL""" + +url.label: +"""HStreamDB Server URL""" + +stream_name.desc: +"""HStreamDB Stream Name""" + +stream_name.label: +"""HStreamDB Stream Name""" + +partition_key.desc: """HStreamDB Ordering Key""" -ordering_key.label: +partition_key.label: """HStreamDB Ordering Key""" pool_size.desc: @@ -24,22 +42,10 @@ pool_size.desc: pool_size.label: """HStreamDB Pool Size""" -stream_name.desc: -"""HStreamDB Stream Name""" +grpc_timeout.desc: +"""HStreamDB gRPC Timeout""" -stream_name.label: -"""HStreamDB Stream Name""" - -type.desc: -"""The Connector Type.""" - -type.label: -"""Connector Type""" - -url.desc: -"""HStreamDB Server URL""" - -url.label: -"""HStreamDB Server URL""" +grpc_timeout.label: +"""HStreamDB gRPC Timeout""" } diff --git a/scripts/spellcheck/dicts/emqx.txt b/scripts/spellcheck/dicts/emqx.txt index 03587aa54..953b0b762 100644 --- a/scripts/spellcheck/dicts/emqx.txt +++ b/scripts/spellcheck/dicts/emqx.txt @@ -270,6 +270,10 @@ hstream hstreamDB hstream hstreamdb +hrecord +hRecord +Hrecord +HRecord SASL GSSAPI keytab