From aa7def425decb24d0de557e1d39c324fe0db5bdf Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 28 Feb 2024 08:45:06 +0800 Subject: [PATCH] fix: get grpc_timeout from hstreamdb connector config failed --- .../include/emqx_bridge_hstreamdb.hrl | 9 +++++++++ .../src/emqx_bridge_hstreamdb.erl | 19 +++++++++++-------- .../src/emqx_bridge_hstreamdb_connector.erl | 15 +++------------ 3 files changed, 23 insertions(+), 20 deletions(-) diff --git a/apps/emqx_bridge_hstreamdb/include/emqx_bridge_hstreamdb.hrl b/apps/emqx_bridge_hstreamdb/include/emqx_bridge_hstreamdb.hrl index 72f1edcea..047b1f297 100644 --- a/apps/emqx_bridge_hstreamdb/include/emqx_bridge_hstreamdb.hrl +++ b/apps/emqx_bridge_hstreamdb/include/emqx_bridge_hstreamdb.hrl @@ -3,3 +3,12 @@ %%-------------------------------------------------------------------- -define(HSTREAMDB_DEFAULT_PORT, 6570). + +-define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>). +-define(DEFAULT_GRPC_FLUSH_TIMEOUT, 10000). +-define(DEFAULT_MAX_BATCHES, 500). +-define(DEFAULT_BATCH_INTERVAL, 500). +-define(DEFAULT_BATCH_INTERVAL_RAW, <<"500ms">>). +-define(DEFAULT_AGG_POOL_SIZE, 8). +-define(DEFAULT_WRITER_POOL_SIZE, 8). +-define(DEFAULT_GRPC_FLUSH_TIMEOUT_RAW, <<"10s">>). diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl index d7e30581b..d66077171 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl @@ -5,6 +5,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). +-include("emqx_bridge_hstreamdb.hrl"). -import(hoconsc, [mk/2, enum/1]). @@ -23,8 +24,6 @@ -define(CONNECTOR_TYPE, hstreamdb). -define(ACTION_TYPE, ?CONNECTOR_TYPE). --define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>). --define(DEFAULT_GRPC_FLUSH_TIMEOUT_RAW, <<"10s">>). %% ------------------------------------------------------------------------------------------------- %% api @@ -113,8 +112,8 @@ action_values() -> <<"partition_key">> => <<"hej">>, <<"record_template">> => <<"${payload}">>, <<"stream">> => <<"mqtt_message">>, - <<"aggregation_pool_size">> => 8, - <<"writer_pool_size">> => 8 + <<"aggregation_pool_size">> => ?DEFAULT_AGG_POOL_SIZE, + <<"writer_pool_size">> => ?DEFAULT_WRITER_POOL_SIZE } }. @@ -174,13 +173,17 @@ fields(action_parameters) -> {record_template, mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("record_template")})}, {aggregation_pool_size, - mk(integer(), #{default => 8, desc => ?DESC("aggregation_pool_size")})}, - {max_batches, mk(integer(), #{default => 500, desc => ?DESC("max_batches")})}, - {writer_pool_size, mk(integer(), #{default => 8, desc => ?DESC("writer_pool_size")})}, + mk(integer(), #{ + default => ?DEFAULT_AGG_POOL_SIZE, desc => ?DESC("aggregation_pool_size") + })}, + {max_batches, + mk(integer(), #{default => ?DEFAULT_MAX_BATCHES, desc => ?DESC("max_batches")})}, + {writer_pool_size, + mk(integer(), #{default => ?DEFAULT_WRITER_POOL_SIZE, desc => ?DESC("writer_pool_size")})}, {batch_size, mk(integer(), #{default => 100, desc => ?DESC("batch_size")})}, {batch_interval, mk(emqx_schema:timeout_duration_ms(), #{ - default => <<"500ms">>, desc => ?DESC("batch_interval") + default => ?DEFAULT_BATCH_INTERVAL_RAW, desc => ?DESC("batch_interval") })} ]; fields(connector_fields) -> diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl index 26e66ad3e..cf195fc9f 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl @@ -8,6 +8,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). +-include("emqx_bridge_hstreamdb.hrl"). -import(hoconsc, [mk/2]). @@ -41,14 +42,6 @@ %% Allocatable resources -define(hstreamdb_client, hstreamdb_client). --define(DEFAULT_GRPC_TIMEOUT, timer:seconds(30)). --define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>). --define(DEFAULT_GRPC_FLUSH_TIMEOUT, 10000). --define(DEFAULT_MAX_BATCHES, 500). --define(DEFAULT_BATCH_INTERVAL, 500). --define(DEFAULT_AGG_POOL_SIZE, 8). --define(DEFAULT_WRITER_POOL_SIZE, 8). - %% ------------------------------------------------------------------------------------------------- %% resource callback callback_mode() -> always_sync. @@ -243,11 +236,9 @@ do_on_start(InstId, Config) -> {error, {connect_failed, Error}} end. -client_options(Config = #{url := ServerURL, ssl := SSL}) -> - GRPCTimeout = maps:get(<<"grpc_timeout">>, Config, ?DEFAULT_GRPC_TIMEOUT), - EnableSSL = maps:get(enable, SSL), +client_options(#{url := ServerURL, ssl := SSL, grpc_timeout := GRPCTimeout}) -> RpcOpts = - case EnableSSL of + case maps:get(enable, SSL) of false -> #{pool_size => 1}; true ->