Merge pull request #8710 from JimMoen/feat-influxdb-async

feat(influxdb): add async callback
This commit is contained in:
JimMoen 2022-08-12 21:08:00 +08:00 committed by GitHub
commit fdd5df93fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 69 additions and 20 deletions

View File

@ -304,7 +304,7 @@ on_query(
end, end,
Result. Result.
on_query_async(InstId, {send_message, Msg}, ReplyFun, State) -> on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
case maps:get(request, State, undefined) of case maps:get(request, State, undefined) of
undefined -> undefined ->
?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}), ?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}),
@ -320,14 +320,14 @@ on_query_async(InstId, {send_message, Msg}, ReplyFun, State) ->
on_query_async( on_query_async(
InstId, InstId,
{undefined, Method, {Path, Headers, Body}, Timeout}, {undefined, Method, {Path, Headers, Body}, Timeout},
ReplyFun, ReplyFunAndArgs,
State State
) )
end; end;
on_query_async( on_query_async(
InstId, InstId,
{KeyOrNum, Method, Request, Timeout}, {KeyOrNum, Method, Request, Timeout},
ReplyFun, ReplyFunAndArgs,
#{pool_name := PoolName, base_path := BasePath} = State #{pool_name := PoolName, base_path := BasePath} = State
) -> ) ->
?TRACE( ?TRACE(
@ -346,7 +346,7 @@ on_query_async(
Method, Method,
NRequest, NRequest,
Timeout, Timeout,
ReplyFun ReplyFunAndArgs
). ).
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) -> on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->

View File

@ -59,7 +59,7 @@ The auto restart interval after the resource is disconnected, in milliseconds.
zh: """请求模式。可选 '同步/异步',默认为'同步'模式。""" zh: """请求模式。可选 '同步/异步',默认为'同步'模式。"""
} }
label { label {
en: """query_mode""" en: """Query mode"""
zh: """请求模式""" zh: """请求模式"""
} }
} }
@ -70,7 +70,7 @@ The auto restart interval after the resource is disconnected, in milliseconds.
zh: """启用批量模式。""" zh: """启用批量模式。"""
} }
label { label {
en: """enable_batch""" en: """Enable batch"""
zh: """启用批量模式""" zh: """启用批量模式"""
} }
} }
@ -81,7 +81,7 @@ The auto restart interval after the resource is disconnected, in milliseconds.
zh: """启用队列模式。""" zh: """启用队列模式。"""
} }
label { label {
en: """enable_queue""" en: """Enable queue"""
zh: """启用队列模式""" zh: """启用队列模式"""
} }
} }
@ -92,7 +92,7 @@ The auto restart interval after the resource is disconnected, in milliseconds.
zh: """资源不可用时的重试时间""" zh: """资源不可用时的重试时间"""
} }
label { label {
en: """resume_interval""" en: """Resume interval"""
zh: """恢复时间""" zh: """恢复时间"""
} }
} }
@ -103,7 +103,7 @@ The auto restart interval after the resource is disconnected, in milliseconds.
zh: """异步请求飞行队列窗口大小""" zh: """异步请求飞行队列窗口大小"""
} }
label { label {
en: """async_inflight_window""" en: """Async inflight window"""
zh: """异步请求飞行队列窗口""" zh: """异步请求飞行队列窗口"""
} }
} }
@ -114,7 +114,7 @@ The auto restart interval after the resource is disconnected, in milliseconds.
zh: """批量请求大小""" zh: """批量请求大小"""
} }
label { label {
en: """batch_size""" en: """Batch size"""
zh: """批量请求大小""" zh: """批量请求大小"""
} }
} }
@ -125,7 +125,7 @@ The auto restart interval after the resource is disconnected, in milliseconds.
zh: """最大批量请求等待时间。""" zh: """最大批量请求等待时间。"""
} }
label { label {
en: """batch_time""" en: """Batch time"""
zh: """批量等待间隔""" zh: """批量等待间隔"""
} }
} }
@ -136,7 +136,7 @@ The auto restart interval after the resource is disconnected, in milliseconds.
zh: """消息队列的最大长度,以字节计。""" zh: """消息队列的最大长度,以字节计。"""
} }
label { label {
en: """queue_max_bytes""" en: """Queue max bytes"""
zh: """队列最大长度""" zh: """队列最大长度"""
} }
} }

View File

@ -76,16 +76,35 @@
| {resource_down, term()}. | {resource_down, term()}.
-define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024). -define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024).
-define(DEFAULT_QUEUE_SIZE_RAW, <<"1GB">>).
%% count
-define(DEFAULT_BATCH_SIZE, 100). -define(DEFAULT_BATCH_SIZE, 100).
%% milliseconds
-define(DEFAULT_BATCH_TIME, 10). -define(DEFAULT_BATCH_TIME, 10).
-define(DEFAULT_BATCH_TIME_RAW, <<"10ms">>).
%% count
-define(DEFAULT_INFLIGHT, 100). -define(DEFAULT_INFLIGHT, 100).
%% milliseconds
-define(HEALTHCHECK_INTERVAL, 15000). -define(HEALTHCHECK_INTERVAL, 15000).
-define(HEALTHCHECK_INTERVAL_RAW, <<"15s">>). -define(HEALTHCHECK_INTERVAL_RAW, <<"15s">>).
%% milliseconds
-define(RESUME_INTERVAL, 15000). -define(RESUME_INTERVAL, 15000).
-define(RESUME_INTERVAL_RAW, <<"15s">>).
-define(START_AFTER_CREATED, true). -define(START_AFTER_CREATED, true).
%% milliseconds
-define(START_TIMEOUT, 5000). -define(START_TIMEOUT, 5000).
-define(START_TIMEOUT_RAW, <<"5s">>). -define(START_TIMEOUT_RAW, <<"5s">>).
%% milliseconds
-define(AUTO_RESTART_INTERVAL, 60000). -define(AUTO_RESTART_INTERVAL, 60000).
-define(AUTO_RESTART_INTERVAL_RAW, <<"60s">>). -define(AUTO_RESTART_INTERVAL_RAW, <<"60s">>).
-define(TEST_ID_PREFIX, "_test_:"). -define(TEST_ID_PREFIX, "_test_:").
-define(RES_METRICS, resource_metrics). -define(RES_METRICS, resource_metrics).

View File

@ -114,6 +114,7 @@
-optional_callbacks([ -optional_callbacks([
on_query/3, on_query/3,
on_batch_query/3, on_batch_query/3,
on_query_async/4,
on_get_status/2 on_get_status/2
]). ]).
@ -130,6 +131,13 @@
%% when calling emqx_resource:on_batch_query/3 %% when calling emqx_resource:on_batch_query/3
-callback on_batch_query(resource_id(), Request :: term(), resource_state()) -> query_result(). -callback on_batch_query(resource_id(), Request :: term(), resource_state()) -> query_result().
-callback on_query_async(
resource_id(),
Request :: term(),
{ReplyFun :: function(), Args :: list()},
resource_state()
) -> query_result().
%% when calling emqx_resource:health_check/2 %% when calling emqx_resource:health_check/2
-callback on_get_status(resource_id(), resource_state()) -> -callback on_get_status(resource_id(), resource_state()) ->
resource_status() resource_status()

View File

@ -90,7 +90,7 @@ enable_queue(_) -> undefined.
resume_interval(type) -> emqx_schema:duration_ms(); resume_interval(type) -> emqx_schema:duration_ms();
resume_interval(desc) -> ?DESC("resume_interval"); resume_interval(desc) -> ?DESC("resume_interval");
resume_interval(default) -> ?RESUME_INTERVAL; resume_interval(default) -> ?RESUME_INTERVAL_RAW;
resume_interval(required) -> false; resume_interval(required) -> false;
resume_interval(_) -> undefined. resume_interval(_) -> undefined.
@ -108,12 +108,12 @@ batch_size(_) -> undefined.
batch_time(type) -> emqx_schema:duration_ms(); batch_time(type) -> emqx_schema:duration_ms();
batch_time(desc) -> ?DESC("batch_time"); batch_time(desc) -> ?DESC("batch_time");
batch_time(default) -> ?DEFAULT_BATCH_TIME; batch_time(default) -> ?DEFAULT_BATCH_TIME_RAW;
batch_time(required) -> false; batch_time(required) -> false;
batch_time(_) -> undefined. batch_time(_) -> undefined.
queue_max_bytes(type) -> emqx_schema:bytesize(); queue_max_bytes(type) -> emqx_schema:bytesize();
queue_max_bytes(desc) -> ?DESC("queue_max_bytes"); queue_max_bytes(desc) -> ?DESC("queue_max_bytes");
queue_max_bytes(default) -> ?DEFAULT_QUEUE_SIZE; queue_max_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW;
queue_max_bytes(required) -> false; queue_max_bytes(required) -> false;
queue_max_bytes(_) -> undefined. queue_max_bytes(_) -> undefined.

View File

@ -40,7 +40,7 @@ TLDR:
""" """
} }
label { label {
en: "write_syntax" en: "Write Syntax"
zh: "写语句" zh: "写语句"
} }
} }
@ -90,7 +90,7 @@ TLDR:
desc_name { desc_name {
desc { desc {
en: """Bridge name, used as a human-readable description of the bridge.""" en: """Bridge name, used as a human-readable description of the bridge."""
zh: """桥接名字,可读描述""" zh: """桥接名字,人类可读描述信息。"""
} }
label { label {
en: "Bridge Name" en: "Bridge Name"

View File

@ -1,7 +1,7 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {deps, [
{hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}}, {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}},
{influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.3"}}} {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.4"}}}
]}. ]}.
{shell, [ {shell, [

View File

@ -18,6 +18,7 @@
on_stop/2, on_stop/2,
on_query/3, on_query/3,
on_batch_query/3, on_batch_query/3,
on_query_async/4,
on_get_status/2 on_get_status/2
]). ]).
@ -62,6 +63,20 @@ on_batch_query(InstId, BatchData, State = #{write_syntax := SyntaxLines, client
{resource_down, disconnected} {resource_down, disconnected}
end. end.
on_query_async(
InstId,
{send_message, Data},
{ReplayFun, Args},
_State = #{write_syntax := SyntaxLines, client := Client}
) ->
case data_to_points(Data, SyntaxLines) of
{ok, Points} ->
do_async_query(InstId, Client, Points, {ReplayFun, Args});
{error, ErrorPoints} = Err ->
log_error_points(InstId, ErrorPoints),
Err
end.
on_get_status(_InstId, #{client := Client}) -> on_get_status(_InstId, #{client := Client}) ->
case influxdb:is_alive(Client) of case influxdb:is_alive(Client) of
true -> true ->
@ -331,7 +346,6 @@ ssl_config(SSL = #{enable := true}) ->
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% Query %% Query
do_query(InstId, Client, Points) -> do_query(InstId, Client, Points) ->
case influxdb:write(Client, Points) of case influxdb:write(Client, Points) of
ok -> ok ->
@ -349,6 +363,14 @@ do_query(InstId, Client, Points) ->
Err Err
end. end.
do_async_query(InstId, Client, Points, ReplayFunAndArgs) ->
?SLOG(info, #{
msg => "influxdb write point async",
connector => InstId,
points => Points
}),
ok = influxdb:write_async(Client, Points, ReplayFunAndArgs).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% Tags & Fields Config Trans %% Tags & Fields Config Trans