From 3678673124ec72aabd3564386c0cab0374fb4a57 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 12 Aug 2022 10:44:47 +0800 Subject: [PATCH 1/5] fix: schema default value using raw type before convert --- apps/emqx_resource/include/emqx_resource.hrl | 19 +++++++++++++++++++ .../src/schema/emqx_resource_schema.erl | 6 +++--- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 190c278ae..8ec57a00e 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -76,16 +76,35 @@ | {resource_down, term()}. -define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024). +-define(DEFAULT_QUEUE_SIZE_RAW, <<"1GB">>). + +%% count -define(DEFAULT_BATCH_SIZE, 100). + +%% milliseconds -define(DEFAULT_BATCH_TIME, 10). +-define(DEFAULT_BATCH_TIME_RAW, <<"10ms">>). + +%% count -define(DEFAULT_INFLIGHT, 100). + +%% milliseconds -define(HEALTHCHECK_INTERVAL, 15000). -define(HEALTHCHECK_INTERVAL_RAW, <<"15s">>). + +%% milliseconds -define(RESUME_INTERVAL, 15000). +-define(RESUME_INTERVAL_RAW, <<"15s">>). + -define(START_AFTER_CREATED, true). + +%% milliseconds -define(START_TIMEOUT, 5000). -define(START_TIMEOUT_RAW, <<"5s">>). + +%% milliseconds -define(AUTO_RESTART_INTERVAL, 60000). -define(AUTO_RESTART_INTERVAL_RAW, <<"60s">>). + -define(TEST_ID_PREFIX, "_test_:"). -define(RES_METRICS, resource_metrics). diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index ccc31a707..6111543d2 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -90,7 +90,7 @@ enable_queue(_) -> undefined. resume_interval(type) -> emqx_schema:duration_ms(); resume_interval(desc) -> ?DESC("resume_interval"); -resume_interval(default) -> ?RESUME_INTERVAL; +resume_interval(default) -> ?RESUME_INTERVAL_RAW; resume_interval(required) -> false; resume_interval(_) -> undefined. @@ -108,12 +108,12 @@ batch_size(_) -> undefined. batch_time(type) -> emqx_schema:duration_ms(); batch_time(desc) -> ?DESC("batch_time"); -batch_time(default) -> ?DEFAULT_BATCH_TIME; +batch_time(default) -> ?DEFAULT_BATCH_TIME_RAW; batch_time(required) -> false; batch_time(_) -> undefined. queue_max_bytes(type) -> emqx_schema:bytesize(); queue_max_bytes(desc) -> ?DESC("queue_max_bytes"); -queue_max_bytes(default) -> ?DEFAULT_QUEUE_SIZE; +queue_max_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW; queue_max_bytes(required) -> false; queue_max_bytes(_) -> undefined. From fa5e8f142291547e901e42b87b31c016fe4ba45c Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 12 Aug 2022 11:31:21 +0800 Subject: [PATCH 2/5] chore: refine i18n label --- .../i18n/emqx_resource_schema_i18n.conf | 16 ++++++++-------- .../i18n/emqx_ee_bridge_influxdb.conf | 6 +++--- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index 3ec170ebf..aa6579bbd 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -59,7 +59,7 @@ The auto restart interval after the resource is disconnected, in milliseconds. zh: """请求模式。可选 '同步/异步',默认为'同步'模式。""" } label { - en: """query_mode""" + en: """Query mode""" zh: """请求模式""" } } @@ -70,7 +70,7 @@ The auto restart interval after the resource is disconnected, in milliseconds. zh: """启用批量模式。""" } label { - en: """enable_batch""" + en: """Enable batch""" zh: """启用批量模式""" } } @@ -81,7 +81,7 @@ The auto restart interval after the resource is disconnected, in milliseconds. zh: """启用队列模式。""" } label { - en: """enable_queue""" + en: """Enable queue""" zh: """启用队列模式""" } } @@ -92,7 +92,7 @@ The auto restart interval after the resource is disconnected, in milliseconds. zh: """资源不可用时的重试时间""" } label { - en: """resume_interval""" + en: """Resume interval""" zh: """恢复时间""" } } @@ -103,7 +103,7 @@ The auto restart interval after the resource is disconnected, in milliseconds. zh: """异步请求飞行队列窗口大小""" } label { - en: """async_inflight_window""" + en: """Async inflight window""" zh: """异步请求飞行队列窗口""" } } @@ -114,7 +114,7 @@ The auto restart interval after the resource is disconnected, in milliseconds. zh: """批量请求大小""" } label { - en: """batch_size""" + en: """Batch size""" zh: """批量请求大小""" } } @@ -125,7 +125,7 @@ The auto restart interval after the resource is disconnected, in milliseconds. zh: """最大批量请求等待时间。""" } label { - en: """batch_time""" + en: """Batch time""" zh: """批量等待间隔""" } } @@ -136,7 +136,7 @@ The auto restart interval after the resource is disconnected, in milliseconds. zh: """消息队列的最大长度,以字节计。""" } label { - en: """queue_max_bytes""" + en: """Queue max bytes""" zh: """队列最大长度""" } } diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf index ffd0b66a0..412922408 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf @@ -40,7 +40,7 @@ TLDR: """ } label { - en: "write_syntax" + en: "Write Syntax" zh: "写语句" } } @@ -57,7 +57,7 @@ TLDR: config_direction { desc { en: """The direction of this bridge, MUST be 'egress'""" - zh: """桥接的方向, 必须是 egress""" + zh: """桥接的方向,必须是 egress""" } label { en: "Bridge Direction" @@ -90,7 +90,7 @@ TLDR: desc_name { desc { en: """Bridge name, used as a human-readable description of the bridge.""" - zh: """桥接名字,可读描述""" + zh: """桥接名字,人类可读的描述信息。""" } label { en: "Bridge Name" From dc7953c3e10e1c0dc792389fd15c45300ad4b764 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 12 Aug 2022 17:28:56 +0800 Subject: [PATCH 3/5] chore: refine async query variable name --- apps/emqx_connector/src/emqx_connector_http.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 11030fef6..b7fd21c5f 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -304,7 +304,7 @@ on_query( end, Result. -on_query_async(InstId, {send_message, Msg}, ReplyFun, State) -> +on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) -> case maps:get(request, State, undefined) of undefined -> ?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}), @@ -320,14 +320,14 @@ on_query_async(InstId, {send_message, Msg}, ReplyFun, State) -> on_query_async( InstId, {undefined, Method, {Path, Headers, Body}, Timeout}, - ReplyFun, + ReplyFunAndArgs, State ) end; on_query_async( InstId, {KeyOrNum, Method, Request, Timeout}, - ReplyFun, + ReplyFunAndArgs, #{pool_name := PoolName, base_path := BasePath} = State ) -> ?TRACE( @@ -346,7 +346,7 @@ on_query_async( Method, NRequest, Timeout, - ReplyFun + ReplyFunAndArgs ). on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) -> From 441d8c9d5750674db210e32fd188fcd2b3c59970 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 12 Aug 2022 17:44:38 +0800 Subject: [PATCH 4/5] chore: bump `influxdb-client-erl` vsn --- lib-ee/emqx_ee_connector/rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib-ee/emqx_ee_connector/rebar.config b/lib-ee/emqx_ee_connector/rebar.config index 485b0120d..5963b7ab0 100644 --- a/lib-ee/emqx_ee_connector/rebar.config +++ b/lib-ee/emqx_ee_connector/rebar.config @@ -1,7 +1,7 @@ {erl_opts, [debug_info]}. {deps, [ {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}}, - {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.3"}}} + {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.4"}}} ]}. {shell, [ From 594d071c05ed243e1c6fb016fdd2e73e06f5dbf2 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 12 Aug 2022 17:45:20 +0800 Subject: [PATCH 5/5] feat(influxdb): add async callback --- apps/emqx_resource/src/emqx_resource.erl | 8 +++++++ .../src/emqx_ee_connector_influxdb.erl | 24 ++++++++++++++++++- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 60f0dd360..b79650904 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -114,6 +114,7 @@ -optional_callbacks([ on_query/3, on_batch_query/3, + on_query_async/4, on_get_status/2 ]). @@ -130,6 +131,13 @@ %% when calling emqx_resource:on_batch_query/3 -callback on_batch_query(resource_id(), Request :: term(), resource_state()) -> query_result(). +-callback on_query_async( + resource_id(), + Request :: term(), + {ReplyFun :: function(), Args :: list()}, + resource_state() +) -> query_result(). + %% when calling emqx_resource:health_check/2 -callback on_get_status(resource_id(), resource_state()) -> resource_status() diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 0a2bbb638..d0c17b6d5 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -18,6 +18,7 @@ on_stop/2, on_query/3, on_batch_query/3, + on_query_async/4, on_get_status/2 ]). @@ -62,6 +63,20 @@ on_batch_query(InstId, BatchData, State = #{write_syntax := SyntaxLines, client {resource_down, disconnected} end. +on_query_async( + InstId, + {send_message, Data}, + {ReplayFun, Args}, + _State = #{write_syntax := SyntaxLines, client := Client} +) -> + case data_to_points(Data, SyntaxLines) of + {ok, Points} -> + do_async_query(InstId, Client, Points, {ReplayFun, Args}); + {error, ErrorPoints} = Err -> + log_error_points(InstId, ErrorPoints), + Err + end. + on_get_status(_InstId, #{client := Client}) -> case influxdb:is_alive(Client) of true -> @@ -331,7 +346,6 @@ ssl_config(SSL = #{enable := true}) -> %% ------------------------------------------------------------------------------------------------- %% Query - do_query(InstId, Client, Points) -> case influxdb:write(Client, Points) of ok -> @@ -349,6 +363,14 @@ do_query(InstId, Client, Points) -> Err end. +do_async_query(InstId, Client, Points, ReplayFunAndArgs) -> + ?SLOG(info, #{ + msg => "influxdb write point async", + connector => InstId, + points => Points + }), + ok = influxdb:write_async(Client, Points, ReplayFunAndArgs). + %% ------------------------------------------------------------------------------------------------- %% Tags & Fields Config Trans