Merge pull request #8693 from JimMoen/feat-influxdb-batch-query

Feat influxdb batch query
This commit is contained in:
Xinyu Liu 2022-08-12 00:22:23 +08:00 committed by GitHub
commit a05cc20db7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 437 additions and 129 deletions

View File

@ -49,14 +49,14 @@
-export([get_basic_usage_info/0]). -export([get_basic_usage_info/0]).
load() -> load() ->
%% set wait_for_resource_ready => 0 to start resources async
Opts = #{auto_retry_interval => 60000, wait_for_resource_ready => 0},
Bridges = emqx:get_config([bridges], #{}), Bridges = emqx:get_config([bridges], #{}),
lists:foreach( lists:foreach(
fun({Type, NamedConf}) -> fun({Type, NamedConf}) ->
lists:foreach( lists:foreach(
fun({Name, Conf}) -> fun({Name, Conf}) ->
safe_load_bridge(Type, Name, Conf, Opts) %% fetch opts for `emqx_resource_worker`
ResOpts = emqx_resource:fetch_creation_opts(Conf),
safe_load_bridge(Type, Name, Conf, ResOpts)
end, end,
maps:to_list(NamedConf) maps:to_list(NamedConf)
) )
@ -171,9 +171,9 @@ post_config_update(_, _Req, NewConf, OldConf, _AppEnv) ->
diff_confs(NewConf, OldConf), diff_confs(NewConf, OldConf),
%% The config update will be failed if any task in `perform_bridge_changes` failed. %% The config update will be failed if any task in `perform_bridge_changes` failed.
Result = perform_bridge_changes([ Result = perform_bridge_changes([
{fun emqx_bridge_resource:remove/3, Removed}, {fun emqx_bridge_resource:remove/4, Removed},
{fun emqx_bridge_resource:create/3, Added}, {fun emqx_bridge_resource:create/4, Added},
{fun emqx_bridge_resource:update/3, Updated} {fun emqx_bridge_resource:update/4, Updated}
]), ]),
ok = unload_hook(), ok = unload_hook(),
ok = load_hook(NewConf), ok = load_hook(NewConf),
@ -260,8 +260,16 @@ perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) ->
fun fun
({_Type, _Name}, _Conf, {error, Reason}) -> ({_Type, _Name}, _Conf, {error, Reason}) ->
{error, Reason}; {error, Reason};
%% for emqx_bridge_resource:update/4
({Type, Name}, {OldConf, Conf}, _) ->
ResOpts = emqx_resource:fetch_creation_opts(Conf),
case Action(Type, Name, {OldConf, Conf}, ResOpts) of
{error, Reason} -> {error, Reason};
Return -> Return
end;
({Type, Name}, Conf, _) -> ({Type, Name}, Conf, _) ->
case Action(Type, Name, Conf) of ResOpts = emqx_resource:fetch_creation_opts(Conf),
case Action(Type, Name, Conf, ResOpts) of
{error, Reason} -> {error, Reason}; {error, Reason} -> {error, Reason};
Return -> Return Return -> Return
end end

View File

@ -34,9 +34,10 @@
create_dry_run/2, create_dry_run/2,
remove/1, remove/1,
remove/2, remove/2,
remove/3, remove/4,
update/2, update/2,
update/3, update/3,
update/4,
stop/2, stop/2,
restart/2, restart/2,
reset_metrics/1 reset_metrics/1
@ -111,6 +112,9 @@ update(BridgeId, {OldConf, Conf}) ->
update(BridgeType, BridgeName, {OldConf, Conf}). update(BridgeType, BridgeName, {OldConf, Conf}).
update(Type, Name, {OldConf, Conf}) -> update(Type, Name, {OldConf, Conf}) ->
update(Type, Name, {OldConf, Conf}, #{}).
update(Type, Name, {OldConf, Conf}, Opts) ->
%% TODO: sometimes its not necessary to restart the bridge connection. %% TODO: sometimes its not necessary to restart the bridge connection.
%% %%
%% - if the connection related configs like `servers` is updated, we should restart/start %% - if the connection related configs like `servers` is updated, we should restart/start
@ -127,7 +131,7 @@ update(Type, Name, {OldConf, Conf}) ->
name => Name, name => Name,
config => Conf config => Conf
}), }),
case recreate(Type, Name, Conf) of case recreate(Type, Name, Conf, Opts) of
{ok, _} -> {ok, _} ->
maybe_disable_bridge(Type, Name, Conf); maybe_disable_bridge(Type, Name, Conf);
{error, not_found} -> {error, not_found} ->
@ -137,7 +141,7 @@ update(Type, Name, {OldConf, Conf}) ->
name => Name, name => Name,
config => Conf config => Conf
}), }),
create(Type, Name, Conf); create(Type, Name, Conf, Opts);
{error, Reason} -> {error, Reason} ->
{error, {update_bridge_failed, Reason}} {error, {update_bridge_failed, Reason}}
end; end;
@ -158,11 +162,14 @@ recreate(Type, Name) ->
recreate(Type, Name, emqx:get_config([bridges, Type, Name])). recreate(Type, Name, emqx:get_config([bridges, Type, Name])).
recreate(Type, Name, Conf) -> recreate(Type, Name, Conf) ->
recreate(Type, Name, Conf, #{}).
recreate(Type, Name, Conf, Opts) ->
emqx_resource:recreate_local( emqx_resource:recreate_local(
resource_id(Type, Name), resource_id(Type, Name),
bridge_to_resource_type(Type), bridge_to_resource_type(Type),
parse_confs(Type, Name, Conf), parse_confs(Type, Name, Conf),
#{auto_retry_interval => 60000} Opts#{auto_retry_interval => 60000}
). ).
create_dry_run(Type, Conf) -> create_dry_run(Type, Conf) ->
@ -186,13 +193,13 @@ create_dry_run(Type, Conf) ->
remove(BridgeId) -> remove(BridgeId) ->
{BridgeType, BridgeName} = parse_bridge_id(BridgeId), {BridgeType, BridgeName} = parse_bridge_id(BridgeId),
remove(BridgeType, BridgeName, #{}). remove(BridgeType, BridgeName, #{}, #{}).
remove(Type, Name) -> remove(Type, Name) ->
remove(Type, Name, undefined). remove(Type, Name, #{}, #{}).
%% just for perform_bridge_changes/1 %% just for perform_bridge_changes/1
remove(Type, Name, _Conf) -> remove(Type, Name, _Conf, _Opts) ->
?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}), ?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}),
case emqx_resource:remove_local(resource_id(Type, Name)) of case emqx_resource:remove_local(resource_id(Type, Name)) of
ok -> ok; ok -> ok;

View File

@ -656,6 +656,13 @@ typename_to_spec("file()", _Mod) ->
#{type => string, example => <<"/path/to/file">>}; #{type => string, example => <<"/path/to/file">>};
typename_to_spec("ip_port()", _Mod) -> typename_to_spec("ip_port()", _Mod) ->
#{type => string, example => <<"127.0.0.1:80">>}; #{type => string, example => <<"127.0.0.1:80">>};
typename_to_spec("write_syntax()", _Mod) ->
#{
type => string,
example =>
<<"${topic},clientid=${clientid}", " ", "payload=${payload},",
"${clientid}_int_value=${payload.int_key}i,", "bool=${payload.bool}">>
};
typename_to_spec("ip_ports()", _Mod) -> typename_to_spec("ip_ports()", _Mod) ->
#{type => string, example => <<"127.0.0.1:80, 127.0.0.2:80">>}; #{type => string, example => <<"127.0.0.1:80, 127.0.0.2:80">>};
typename_to_spec("url()", _Mod) -> typename_to_spec("url()", _Mod) ->

View File

@ -0,0 +1,91 @@
emqx_resource_schema {
query_mode {
desc {
en: """Query mode. Optional 'sync/async', default 'sync'."""
zh: """请求模式。可选 '同步/异步',默认为'同步'模式。"""
}
label {
en: """query_mode"""
zh: """query_mode"""
}
}
enable_batch {
desc {
en: """Batch mode enabled."""
zh: """启用批量模式。"""
}
label {
en: """enable_batch"""
zh: """启用批量模式"""
}
}
enable_queue {
desc {
en: """Queue mode enabled."""
zh: """启用队列模式。"""
}
label {
en: """enable_queue"""
zh: """启用队列模式"""
}
}
resume_interval {
desc {
en: """Resume time interval when resource down."""
zh: """资源不可用时的重试时间"""
}
label {
en: """resume_interval"""
zh: """恢复时间"""
}
}
async_inflight_window {
desc {
en: """Async queyr inflight window."""
zh: """异步请求飞行队列窗口大小"""
}
label {
en: """async_inflight_window"""
zh: """异步请求飞行队列窗口"""
}
}
batch_size {
desc {
en: """Maximum batch count."""
zh: """批量请求大小"""
}
label {
en: """batch_size"""
zh: """批量请求大小"""
}
}
batch_time {
desc {
en: """Maximum batch waiting interval."""
zh: """最大批量请求等待时间。"""
}
label {
en: """batch_time"""
zh: """批量等待间隔"""
}
}
queue_max_bytes {
desc {
en: """Maximum queue storage size in bytes."""
zh: """消息队列的最大长度,以字节计。"""
}
label {
en: """queue_max_bytes"""
zh: """队列最大长度"""
}
}
}

View File

@ -40,7 +40,7 @@
metrics := emqx_metrics_worker:metrics() metrics := emqx_metrics_worker:metrics()
}. }.
-type resource_group() :: binary(). -type resource_group() :: binary().
-type create_opts() :: #{ -type creation_opts() :: #{
health_check_interval => integer(), health_check_interval => integer(),
health_check_timeout => integer(), health_check_timeout => integer(),
%% We can choose to block the return of emqx_resource:start until %% We can choose to block the return of emqx_resource:start until
@ -52,7 +52,15 @@
start_after_created => boolean(), start_after_created => boolean(),
%% If the resource disconnected, we can set to retry starting the resource %% If the resource disconnected, we can set to retry starting the resource
%% periodically. %% periodically.
auto_retry_interval => integer() auto_retry_interval => integer(),
enable_batch => boolean(),
batch_size => integer(),
batch_time => integer(),
enable_queue => boolean(),
queue_max_bytes => integer(),
query_mode => async | sync | dynamic,
resume_interval => integer(),
async_inflight_window => integer()
}. }.
-type query_result() :: -type query_result() ::
ok ok
@ -60,5 +68,17 @@
| {error, term()} | {error, term()}
| {resource_down, term()}. | {resource_down, term()}.
%% count
-define(DEFAULT_BATCH_SIZE, 100).
%% milliseconds
-define(DEFAULT_BATCH_TIME, 10).
%% bytes
-define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024).
-define(DEFAULT_INFLIGHT, 100).
-define(RESUME_INTERVAL, 15000).
-define(TEST_ID_PREFIX, "_test_:"). -define(TEST_ID_PREFIX, "_test_:").
-define(RES_METRICS, resource_metrics). -define(RES_METRICS, resource_metrics).

View File

@ -102,6 +102,7 @@
list_instances_verbose/0, list_instances_verbose/0,
%% return the data of the instance %% return the data of the instance
get_instance/1, get_instance/1,
fetch_creation_opts/1,
%% return all the instances of the same resource type %% return all the instances of the same resource type
list_instances_by_type/1, list_instances_by_type/1,
generate_id/1, generate_id/1,
@ -126,6 +127,7 @@
%% when calling emqx_resource:query/3 %% when calling emqx_resource:query/3
-callback on_query(resource_id(), Request :: term(), resource_state()) -> query_result(). -callback on_query(resource_id(), Request :: term(), resource_state()) -> query_result().
%% when calling emqx_resource:on_batch_query/3
-callback on_batch_query(resource_id(), Request :: term(), resource_state()) -> query_result(). -callback on_batch_query(resource_id(), Request :: term(), resource_state()) -> query_result().
%% when calling emqx_resource:health_check/2 %% when calling emqx_resource:health_check/2
@ -158,7 +160,7 @@ is_resource_mod(Module) ->
create(ResId, Group, ResourceType, Config) -> create(ResId, Group, ResourceType, Config) ->
create(ResId, Group, ResourceType, Config, #{}). create(ResId, Group, ResourceType, Config, #{}).
-spec create(resource_id(), resource_group(), resource_type(), resource_config(), create_opts()) -> -spec create(resource_id(), resource_group(), resource_type(), resource_config(), creation_opts()) ->
{ok, resource_data() | 'already_created'} | {error, Reason :: term()}. {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
create(ResId, Group, ResourceType, Config, Opts) -> create(ResId, Group, ResourceType, Config, Opts) ->
emqx_resource_proto_v1:create(ResId, Group, ResourceType, Config, Opts). emqx_resource_proto_v1:create(ResId, Group, ResourceType, Config, Opts).
@ -174,7 +176,7 @@ create_local(ResId, Group, ResourceType, Config) ->
resource_group(), resource_group(),
resource_type(), resource_type(),
resource_config(), resource_config(),
create_opts() creation_opts()
) -> ) ->
{ok, resource_data()}. {ok, resource_data()}.
create_local(ResId, Group, ResourceType, Config, Opts) -> create_local(ResId, Group, ResourceType, Config, Opts) ->
@ -195,7 +197,7 @@ create_dry_run_local(ResourceType, Config) ->
recreate(ResId, ResourceType, Config) -> recreate(ResId, ResourceType, Config) ->
recreate(ResId, ResourceType, Config, #{}). recreate(ResId, ResourceType, Config, #{}).
-spec recreate(resource_id(), resource_type(), resource_config(), create_opts()) -> -spec recreate(resource_id(), resource_type(), resource_config(), creation_opts()) ->
{ok, resource_data()} | {error, Reason :: term()}. {ok, resource_data()} | {error, Reason :: term()}.
recreate(ResId, ResourceType, Config, Opts) -> recreate(ResId, ResourceType, Config, Opts) ->
emqx_resource_proto_v1:recreate(ResId, ResourceType, Config, Opts). emqx_resource_proto_v1:recreate(ResId, ResourceType, Config, Opts).
@ -205,7 +207,7 @@ recreate(ResId, ResourceType, Config, Opts) ->
recreate_local(ResId, ResourceType, Config) -> recreate_local(ResId, ResourceType, Config) ->
recreate_local(ResId, ResourceType, Config, #{}). recreate_local(ResId, ResourceType, Config, #{}).
-spec recreate_local(resource_id(), resource_type(), resource_config(), create_opts()) -> -spec recreate_local(resource_id(), resource_type(), resource_config(), creation_opts()) ->
{ok, resource_data()} | {error, Reason :: term()}. {ok, resource_data()} | {error, Reason :: term()}.
recreate_local(ResId, ResourceType, Config, Opts) -> recreate_local(ResId, ResourceType, Config, Opts) ->
emqx_resource_manager:recreate(ResId, ResourceType, Config, Opts). emqx_resource_manager:recreate(ResId, ResourceType, Config, Opts).
@ -248,7 +250,7 @@ simple_async_query(ResId, Request, ReplyFun) ->
start(ResId) -> start(ResId) ->
start(ResId, #{}). start(ResId, #{}).
-spec start(resource_id(), create_opts()) -> ok | {error, Reason :: term()}. -spec start(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}.
start(ResId, Opts) -> start(ResId, Opts) ->
emqx_resource_manager:start(ResId, Opts). emqx_resource_manager:start(ResId, Opts).
@ -256,7 +258,7 @@ start(ResId, Opts) ->
restart(ResId) -> restart(ResId) ->
restart(ResId, #{}). restart(ResId, #{}).
-spec restart(resource_id(), create_opts()) -> ok | {error, Reason :: term()}. -spec restart(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}.
restart(ResId, Opts) -> restart(ResId, Opts) ->
emqx_resource_manager:restart(ResId, Opts). emqx_resource_manager:restart(ResId, Opts).
@ -276,6 +278,25 @@ set_resource_status_connecting(ResId) ->
get_instance(ResId) -> get_instance(ResId) ->
emqx_resource_manager:lookup(ResId). emqx_resource_manager:lookup(ResId).
-spec fetch_creation_opts(map()) -> creation_opts().
fetch_creation_opts(Opts) ->
SupportedOpts = [
health_check_interval,
health_check_timeout,
wait_for_resource_ready,
start_after_created,
auto_retry_interval,
enable_batch,
batch_size,
batch_time,
enable_queue,
queue_max_bytes,
query_mode,
resume_interval,
async_inflight_window
],
maps:with(SupportedOpts, Opts).
-spec list_instances() -> [resource_id()]. -spec list_instances() -> [resource_id()].
list_instances() -> list_instances() ->
[Id || #{id := Id} <- list_instances_verbose()]. [Id || #{id := Id} <- list_instances_verbose()].
@ -340,7 +361,7 @@ check_and_create(ResId, Group, ResourceType, RawConfig) ->
resource_group(), resource_group(),
resource_type(), resource_type(),
raw_resource_config(), raw_resource_config(),
create_opts() creation_opts()
) -> ) ->
{ok, resource_data() | 'already_created'} | {error, term()}. {ok, resource_data() | 'already_created'} | {error, term()}.
check_and_create(ResId, Group, ResourceType, RawConfig, Opts) -> check_and_create(ResId, Group, ResourceType, RawConfig, Opts) ->
@ -365,7 +386,7 @@ check_and_create_local(ResId, Group, ResourceType, RawConfig) ->
resource_group(), resource_group(),
resource_type(), resource_type(),
raw_resource_config(), raw_resource_config(),
create_opts() creation_opts()
) -> {ok, resource_data()} | {error, term()}. ) -> {ok, resource_data()} | {error, term()}.
check_and_create_local(ResId, Group, ResourceType, RawConfig, Opts) -> check_and_create_local(ResId, Group, ResourceType, RawConfig, Opts) ->
check_and_do( check_and_do(
@ -378,7 +399,7 @@ check_and_create_local(ResId, Group, ResourceType, RawConfig, Opts) ->
resource_id(), resource_id(),
resource_type(), resource_type(),
raw_resource_config(), raw_resource_config(),
create_opts() creation_opts()
) -> ) ->
{ok, resource_data()} | {error, term()}. {ok, resource_data()} | {error, term()}.
check_and_recreate(ResId, ResourceType, RawConfig, Opts) -> check_and_recreate(ResId, ResourceType, RawConfig, Opts) ->
@ -392,7 +413,7 @@ check_and_recreate(ResId, ResourceType, RawConfig, Opts) ->
resource_id(), resource_id(),
resource_type(), resource_type(),
raw_resource_config(), raw_resource_config(),
create_opts() creation_opts()
) -> ) ->
{ok, resource_data()} | {error, term()}. {ok, resource_data()} | {error, term()}.
check_and_recreate_local(ResId, ResourceType, RawConfig, Opts) -> check_and_recreate_local(ResId, ResourceType, RawConfig, Opts) ->

View File

@ -85,7 +85,7 @@ manager_id_to_resource_id(MgrId) ->
resource_group(), resource_group(),
resource_type(), resource_type(),
resource_config(), resource_config(),
create_opts() creation_opts()
) -> {ok, resource_data()}. ) -> {ok, resource_data()}.
ensure_resource(ResId, Group, ResourceType, Config, Opts) -> ensure_resource(ResId, Group, ResourceType, Config, Opts) ->
case lookup(ResId) of case lookup(ResId) of
@ -97,7 +97,7 @@ ensure_resource(ResId, Group, ResourceType, Config, Opts) ->
end. end.
%% @doc Called from emqx_resource when recreating a resource which may or may not exist %% @doc Called from emqx_resource when recreating a resource which may or may not exist
-spec recreate(resource_id(), resource_type(), resource_config(), create_opts()) -> -spec recreate(resource_id(), resource_type(), resource_config(), creation_opts()) ->
{ok, resource_data()} | {error, not_found} | {error, updating_to_incorrect_resource_type}. {ok, resource_data()} | {error, not_found} | {error, updating_to_incorrect_resource_type}.
recreate(ResId, ResourceType, NewConfig, Opts) -> recreate(ResId, ResourceType, NewConfig, Opts) ->
case lookup(ResId) of case lookup(ResId) of
@ -166,7 +166,7 @@ remove(ResId, ClearMetrics) when is_binary(ResId) ->
safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION). safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION).
%% @doc Stops and then starts an instance that was already running %% @doc Stops and then starts an instance that was already running
-spec restart(resource_id(), create_opts()) -> ok | {error, Reason :: term()}. -spec restart(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}.
restart(ResId, Opts) when is_binary(ResId) -> restart(ResId, Opts) when is_binary(ResId) ->
case safe_call(ResId, restart, ?T_OPERATION) of case safe_call(ResId, restart, ?T_OPERATION) of
ok -> ok ->
@ -177,7 +177,7 @@ restart(ResId, Opts) when is_binary(ResId) ->
end. end.
%% @doc Start the resource %% @doc Start the resource
-spec start(resource_id(), create_opts()) -> ok | {error, Reason :: term()}. -spec start(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}.
start(ResId, Opts) -> start(ResId, Opts) ->
case safe_call(ResId, start, ?T_OPERATION) of case safe_call(ResId, start, ?T_OPERATION) of
ok -> ok ->

View File

@ -52,13 +52,6 @@
-export([reply_after_query/6, batch_reply_after_query/6]). -export([reply_after_query/6, batch_reply_after_query/6]).
-define(RESUME_INTERVAL, 15000).
%% count
-define(DEFAULT_BATCH_SIZE, 100).
%% milliseconds
-define(DEFAULT_BATCH_TIME, 10).
-define(Q_ITEM(REQUEST), {q_item, REQUEST}). -define(Q_ITEM(REQUEST), {q_item, REQUEST}).
-define(QUERY(FROM, REQUEST), {query, FROM, REQUEST}). -define(QUERY(FROM, REQUEST), {query, FROM, REQUEST}).
@ -69,8 +62,6 @@
{error, {resource_error, #{reason => Reason, msg => iolist_to_binary(Msg)}}} {error, {resource_error, #{reason => Reason, msg => iolist_to_binary(Msg)}}}
). ).
-define(RESOURCE_ERROR_M(Reason, Msg), {error, {resource_error, #{reason := Reason, msg := Msg}}}). -define(RESOURCE_ERROR_M(Reason, Msg), {error, {resource_error, #{reason := Reason, msg := Msg}}}).
-define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024).
-define(DEFAULT_INFLIGHT, 100).
-type id() :: binary(). -type id() :: binary().
-type query() :: {query, from(), request()}. -type query() :: {query, from(), request()}.
@ -122,7 +113,7 @@ init({Id, Index, Opts}) ->
Name = name(Id, Index), Name = name(Id, Index),
BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE), BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
Queue = Queue =
case maps:get(queue_enabled, Opts, false) of case maps:get(enable_queue, Opts, false) of
true -> true ->
replayq:open(#{ replayq:open(#{
dir => disk_queue_dir(Id, Index), dir => disk_queue_dir(Id, Index),
@ -144,7 +135,7 @@ init({Id, Index, Opts}) ->
%% if the resource worker is overloaded %% if the resource worker is overloaded
query_mode => maps:get(query_mode, Opts, sync), query_mode => maps:get(query_mode, Opts, sync),
async_inflight_window => maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT), async_inflight_window => maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
batch_enabled => maps:get(batch_enabled, Opts, false), enable_batch => maps:get(enable_batch, Opts, false),
batch_size => BatchSize, batch_size => BatchSize,
batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
queue => Queue, queue => Queue,
@ -270,14 +261,14 @@ drop_head(Q) ->
ok = replayq:ack(Q1, AckRef), ok = replayq:ack(Q1, AckRef),
Q1. Q1.
query_or_acc(From, Request, #{batch_enabled := true, acc := Acc, acc_left := Left} = St0) -> query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left} = St0) ->
Acc1 = [?QUERY(From, Request) | Acc], Acc1 = [?QUERY(From, Request) | Acc],
St = St0#{acc := Acc1, acc_left := Left - 1}, St = St0#{acc := Acc1, acc_left := Left - 1},
case Left =< 1 of case Left =< 1 of
true -> flush(St); true -> flush(St);
false -> {keep_state, ensure_flush_timer(St)} false -> {keep_state, ensure_flush_timer(St)}
end; end;
query_or_acc(From, Request, #{batch_enabled := false, queue := Q, id := Id, query_mode := QM} = St) -> query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id, query_mode := QM} = St) ->
QueryOpts = #{ QueryOpts = #{
inflight_name => maps:get(name, St), inflight_name => maps:get(name, St),
inflight_window => maps:get(async_inflight_window, St) inflight_window => maps:get(async_inflight_window, St)

View File

@ -38,7 +38,7 @@ introduced_in() ->
resource_group(), resource_group(),
resource_type(), resource_type(),
resource_config(), resource_config(),
create_opts() creation_opts()
) -> ) ->
{ok, resource_data() | 'already_created'} | {error, Reason :: term()}. {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
create(ResId, Group, ResourceType, Config, Opts) -> create(ResId, Group, ResourceType, Config, Opts) ->
@ -58,7 +58,7 @@ create_dry_run(ResourceType, Config) ->
resource_id(), resource_id(),
resource_type(), resource_type(),
resource_config(), resource_config(),
create_opts() creation_opts()
) -> ) ->
{ok, resource_data()} | {error, Reason :: term()}. {ok, resource_data()} | {error, Reason :: term()}.
recreate(ResId, ResourceType, Config, Opts) -> recreate(ResId, ResourceType, Config, Opts) ->

View File

@ -0,0 +1,91 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_resource_schema).
-include("emqx_resource.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-import(hoconsc, [mk/2, enum/1, ref/2]).
-export([namespace/0, roots/0, fields/1]).
%% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions
namespace() -> "resource_schema".
roots() -> [].
fields('creation_opts') ->
[
{query_mode, fun query_mode/1},
{resume_interval, fun resume_interval/1},
{async_inflight_window, fun async_inflight_window/1},
{enable_batch, fun enable_batch/1},
{batch_size, fun batch_size/1},
{batch_time, fun batch_time/1},
{enable_queue, fun enable_queue/1},
{max_queue_bytes, fun queue_max_bytes/1}
].
query_mode(type) -> enum([sync, async]);
query_mode(desc) -> ?DESC("query_mode");
query_mode(default) -> sync;
query_mode(required) -> false;
query_mode(_) -> undefined.
enable_batch(type) -> boolean();
enable_batch(required) -> false;
enable_batch(default) -> false;
enable_batch(desc) -> ?DESC("enable_batch");
enable_batch(_) -> undefined.
enable_queue(type) -> boolean();
enable_queue(required) -> false;
enable_queue(default) -> false;
enable_queue(desc) -> ?DESC("enable_queue");
enable_queue(_) -> undefined.
resume_interval(type) -> emqx_schema:duration_ms();
resume_interval(desc) -> ?DESC("resume_interval");
resume_interval(default) -> ?RESUME_INTERVAL;
resume_interval(required) -> false;
resume_interval(_) -> undefined.
async_inflight_window(type) -> pos_integer();
async_inflight_window(desc) -> ?DESC("async_inflight_window");
async_inflight_window(default) -> ?DEFAULT_INFLIGHT;
async_inflight_window(required) -> false;
async_inflight_window(_) -> undefined.
batch_size(type) -> pos_integer();
batch_size(desc) -> ?DESC("batch_size");
batch_size(default) -> ?DEFAULT_BATCH_SIZE;
batch_size(required) -> false;
batch_size(_) -> undefined.
batch_time(type) -> emqx_schema:duration_ms();
batch_time(desc) -> ?DESC("batch_time");
batch_time(default) -> ?DEFAULT_BATCH_TIME;
batch_time(required) -> false;
batch_time(_) -> undefined.
queue_max_bytes(type) -> emqx_schema:bytesize();
queue_max_bytes(desc) -> ?DESC("queue_max_bytes");
queue_max_bytes(default) -> ?DEFAULT_QUEUE_SIZE;
queue_max_bytes(required) -> false;
queue_max_bytes(_) -> undefined.

View File

@ -211,7 +211,7 @@ t_batch_query_counter(_) ->
?DEFAULT_RESOURCE_GROUP, ?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE, ?TEST_RESOURCE,
#{name => test_resource, register => true}, #{name => test_resource, register => true},
#{batch_enabled => true} #{enable_batch => true}
), ),
?check_trace( ?check_trace(

View File

@ -1,5 +1,5 @@
# Introduction # Introduction
This chart bootstraps an emqx deployment on a Kubernetes cluster using the Helm package manager. This chart bootstraps an emqx deployment on a Kubernetes cluster using the Helm package manager.
# Prerequisites # Prerequisites
+ Kubernetes 1.6+ + Kubernetes 1.6+
@ -8,7 +8,7 @@ This chart bootstraps an emqx deployment on a Kubernetes cluster using the Helm
# Installing the Chart # Installing the Chart
To install the chart with the release name `my-emqx`: To install the chart with the release name `my-emqx`:
+ From github + From github
``` ```
$ git clone https://github.com/emqx/emqx.git $ git clone https://github.com/emqx/emqx.git
$ cd emqx/deploy/charts/emqx $ cd emqx/deploy/charts/emqx
@ -31,58 +31,58 @@ $ helm del my-emqx
# Configuration # Configuration
The following table lists the configurable parameters of the emqx chart and their default values. The following table lists the configurable parameters of the emqx chart and their default values.
| Parameter | Description | Default Value | | Parameter | Description | Default Value |
| --- | --- | --- | |--------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------|
| `replicaCount` | It is recommended to have odd number of nodes in a cluster, otherwise the emqx cluster cannot be automatically healed in case of net-split. |3| | `replicaCount` | It is recommended to have odd number of nodes in a cluster, otherwise the emqx cluster cannot be automatically healed in case of net-split. | 3 |
| `image.repository` | EMQX Image name |emqx/emqx| | `image.repository` | EMQX Image name | emqx/emqx |
| `image.pullPolicy` | The image pull policy |IfNotPresent| | `image.pullPolicy` | The image pull policy | IfNotPresent |
| `image.pullSecrets ` | The image pull secrets |`[]` (does not add image pull secrets to deployed pods)| | `image.pullSecrets ` | The image pull secrets | `[]` (does not add image pull secrets to deployed pods) |
| `envFromSecret` | The name pull a secret in the same kubernetes namespace which contains values that will be added to the environment | nil | | `envFromSecret` | The name pull a secret in the same kubernetes namespace which contains values that will be added to the environment | nil |
| `recreatePods` | Forces the recreation of pods during upgrades, which can be useful to always apply the most recent configuration. | false | | `recreatePods` | Forces the recreation of pods during upgrades, which can be useful to always apply the most recent configuration. | false |
`podAnnotations ` | Annotations for pod | `{}` | `podAnnotations ` | Annotations for pod | `{}` |
`podManagementPolicy`| To redeploy a chart with existing PVC(s), the value must be set to Parallel to avoid deadlock | `Parallel` | `podManagementPolicy` | To redeploy a chart with existing PVC(s), the value must be set to Parallel to avoid deadlock | `Parallel` |
| `persistence.enabled` | Enable EMQX persistence using PVC |false| | `persistence.enabled` | Enable EMQX persistence using PVC | false |
| `persistence.storageClass` | Storage class of backing PVC |`nil` (uses alpha storage class annotation)| | `persistence.storageClass` | Storage class of backing PVC | `nil` (uses alpha storage class annotation) |
| `persistence.existingClaim` | EMQX data Persistent Volume existing claim name, evaluated as a template |""| | `persistence.existingClaim` | EMQX data Persistent Volume existing claim name, evaluated as a template | "" |
| `persistence.accessMode` | PVC Access Mode for EMQX volume |ReadWriteOnce| | `persistence.accessMode` | PVC Access Mode for EMQX volume | ReadWriteOnce |
| `persistence.size` | PVC Storage Request for EMQX volume |20Mi| | `persistence.size` | PVC Storage Request for EMQX volume | 20Mi |
| `initContainers` | Containers that run before the creation of EMQX containers. They can contain utilities or setup scripts. |`{}`| | `initContainers` | Containers that run before the creation of EMQX containers. They can contain utilities or setup scripts. | `{}` |
| `resources` | CPU/Memory resource requests/limits |{}| | `resources` | CPU/Memory resource requests/limits | {} |
| `nodeSelector` | Node labels for pod assignment |`{}`| | `nodeSelector` | Node labels for pod assignment | `{}` |
| `tolerations` | Toleration labels for pod assignment |`[]`| | `tolerations` | Toleration labels for pod assignment | `[]` |
| `affinity` | Map of node/pod affinities |`{}`| | `affinity` | Map of node/pod affinities | `{}` |
| `service.type` | Kubernetes Service type. |ClusterIP| | `service.type` | Kubernetes Service type. | ClusterIP |
| `service.mqtt` | Port for MQTT. |1883| | `service.mqtt` | Port for MQTT. | 1883 |
| `service.mqttssl` | Port for MQTT(SSL). |8883| | `service.mqttssl` | Port for MQTT(SSL). | 8883 |
| `service.mgmt` | Port for mgmt API. |8081| | `service.mgmt` | Port for mgmt API. | 8081 |
| `service.ws` | Port for WebSocket/HTTP. |8083| | `service.ws` | Port for WebSocket/HTTP. | 8083 |
| `service.wss` | Port for WSS/HTTPS. |8084| | `service.wss` | Port for WSS/HTTPS. | 8084 |
| `service.dashboard` | Port for dashboard. |18083| | `service.dashboard` | Port for dashboard. | 18083 |
| `service.nodePorts.mqtt` | Kubernetes node port for MQTT. |nil| | `service.nodePorts.mqtt` | Kubernetes node port for MQTT. | nil |
| `service.nodePorts.mqttssl` | Kubernetes node port for MQTT(SSL). |nil| | `service.nodePorts.mqttssl` | Kubernetes node port for MQTT(SSL). | nil |
| `service.nodePorts.mgmt` | Kubernetes node port for mgmt API. |nil| | `service.nodePorts.mgmt` | Kubernetes node port for mgmt API. | nil |
| `service.nodePorts.ws` | Kubernetes node port for WebSocket/HTTP. |nil| | `service.nodePorts.ws` | Kubernetes node port for WebSocket/HTTP. | nil |
| `service.nodePorts.wss` | Kubernetes node port for WSS/HTTPS. |nil| | `service.nodePorts.wss` | Kubernetes node port for WSS/HTTPS. | nil |
| `service.nodePorts.dashboard` | Kubernetes node port for dashboard. |nil| | `service.nodePorts.dashboard` | Kubernetes node port for dashboard. | nil |
| `service.loadBalancerIP` | loadBalancerIP for Service | nil | | `service.loadBalancerIP` | loadBalancerIP for Service | nil |
| `service.loadBalancerSourceRanges` | Address(es) that are allowed when service is LoadBalancer | [] | | `service.loadBalancerSourceRanges` | Address(es) that are allowed when service is LoadBalancer | [] |
| `service.externalIPs` | ExternalIPs for the service | [] | | `service.externalIPs` | ExternalIPs for the service | [] |
| `service.annotations` | Service annotations | {}(evaluated as a template)| | `service.annotations` | Service annotations | {}(evaluated as a template) |
| `ingress.dashboard.enabled` | Enable ingress for EMQX Dashboard | false | | `ingress.dashboard.enabled` | Enable ingress for EMQX Dashboard | false |
| `ingress.dashboard.ingressClassName` | Set the ingress class for EMQX Dashboard | | | `ingress.dashboard.ingressClassName` | Set the ingress class for EMQX Dashboard | |
| `ingress.dashboard.path` | Ingress path for EMQX Dashboard | / | | `ingress.dashboard.path` | Ingress path for EMQX Dashboard | / |
| `ingress.dashboard.pathType` | Ingress pathType for EMQX Dashboard | `ImplementationSpecific` | `ingress.dashboard.pathType` | Ingress pathType for EMQX Dashboard | `ImplementationSpecific` |
| `ingress.dashboard.hosts` | Ingress hosts for EMQX Mgmt API | dashboard.emqx.local | | `ingress.dashboard.hosts` | Ingress hosts for EMQX Mgmt API | dashboard.emqx.local |
| `ingress.dashboard.tls` | Ingress tls for EMQX Mgmt API | [] | | `ingress.dashboard.tls` | Ingress tls for EMQX Mgmt API | [] |
| `ingress.dashboard.annotations` | Ingress annotations for EMQX Mgmt API | {} | | `ingress.dashboard.annotations` | Ingress annotations for EMQX Mgmt API | {} |
| `ingress.mgmt.enabled` | Enable ingress for EMQX Mgmt API | false | | `ingress.mgmt.enabled` | Enable ingress for EMQX Mgmt API | false |
| `ingress.dashboard.ingressClassName` | Set the ingress class for EMQX Mgmt API | | | `ingress.dashboard.ingressClassName` | Set the ingress class for EMQX Mgmt API | |
| `ingress.mgmt.path` | Ingress path for EMQX Mgmt API | / | | `ingress.mgmt.path` | Ingress path for EMQX Mgmt API | / |
| `ingress.mgmt.hosts` | Ingress hosts for EMQX Mgmt API | api.emqx.local | | `ingress.mgmt.hosts` | Ingress hosts for EMQX Mgmt API | api.emqx.local |
| `ingress.mgmt.tls` | Ingress tls for EMQX Mgmt API | [] | | `ingress.mgmt.tls` | Ingress tls for EMQX Mgmt API | [] |
| `ingress.mgmt.annotations` | Ingress annotations for EMQX Mgmt API | {} | | `ingress.mgmt.annotations` | Ingress annotations for EMQX Mgmt API | {} |
| `metrics.enable` | If set to true, [prometheus-operator](https://github.com/prometheus-operator/prometheus-operator) needs to be installed, and emqx_prometheus needs to enable | false | | `metrics.enable` | If set to true, [prometheus-operator](https://github.com/prometheus-operator/prometheus-operator) needs to be installed, and emqx_prometheus needs to enable | false |
| `metrics.type` | Now we only supported "prometheus" | "prometheus" | | `metrics.type` | Now we only supported "prometheus" | "prometheus" |
## EMQX specific settings ## EMQX specific settings
The following table lists the configurable [EMQX](https://www.emqx.io/)-specific parameters of the chart and their default values. The following table lists the configurable [EMQX](https://www.emqx.io/)-specific parameters of the chart and their default values.

View File

@ -21,6 +21,11 @@
desc/1 desc/1
]). ]).
-type write_syntax() :: list().
-reflect_type([write_syntax/0]).
-typerefl_from_string({write_syntax/0, ?MODULE, to_influx_lines}).
-export([to_influx_lines/1]).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% api %% api
@ -51,7 +56,7 @@ values(Protocol, get) ->
values(Protocol, post) -> values(Protocol, post) ->
case Protocol of case Protocol of
"influxdb_api_v2" -> "influxdb_api_v2" ->
SupportUint = <<"uint_value=${payload.uint_key}u">>; SupportUint = <<"uint_value=${payload.uint_key}u,">>;
_ -> _ ->
SupportUint = <<>> SupportUint = <<>>
end, end,
@ -65,7 +70,10 @@ values(Protocol, post) ->
write_syntax => write_syntax =>
<<"${topic},clientid=${clientid}", " ", "payload=${payload},", <<"${topic},clientid=${clientid}", " ", "payload=${payload},",
"${clientid}_int_value=${payload.int_key}i,", SupportUint/binary, "${clientid}_int_value=${payload.int_key}i,", SupportUint/binary,
"bool=${payload.bool}">> "bool=${payload.bool}">>,
enable_batch => false,
batch_size => 5,
batch_time => <<"1m">>
}; };
values(Protocol, put) -> values(Protocol, put) ->
values(Protocol, post). values(Protocol, post).
@ -104,7 +112,9 @@ fields("get_api_v2") ->
fields(Name) when fields(Name) when
Name == influxdb_udp orelse Name == influxdb_api_v1 orelse Name == influxdb_api_v2 Name == influxdb_udp orelse Name == influxdb_api_v1 orelse Name == influxdb_api_v2
-> ->
fields(basic) ++ connector_field(Name). fields(basic) ++
emqx_resource_schema:fields('creation_opts') ++
connector_field(Name).
method_fileds(post, ConnectorType) -> method_fileds(post, ConnectorType) ->
fields(basic) ++ connector_field(ConnectorType) ++ type_name_field(ConnectorType); fields(basic) ++ connector_field(ConnectorType) ++ type_name_field(ConnectorType);
@ -148,19 +158,21 @@ desc(_) ->
undefined. undefined.
write_syntax(type) -> write_syntax(type) ->
list(); ?MODULE:write_syntax();
write_syntax(required) -> write_syntax(required) ->
true; true;
write_syntax(validator) -> write_syntax(validator) ->
[?NOT_EMPTY("the value of the field 'write_syntax' cannot be empty")]; [?NOT_EMPTY("the value of the field 'write_syntax' cannot be empty")];
write_syntax(converter) -> write_syntax(converter) ->
fun converter_influx_lines/1; fun to_influx_lines/1;
write_syntax(desc) -> write_syntax(desc) ->
?DESC("write_syntax"); ?DESC("write_syntax");
write_syntax(format) ->
<<"sql">>;
write_syntax(_) -> write_syntax(_) ->
undefined. undefined.
converter_influx_lines(RawLines) -> to_influx_lines(RawLines) ->
Lines = string:tokens(str(RawLines), "\n"), Lines = string:tokens(str(RawLines), "\n"),
lists:reverse(lists:foldl(fun converter_influx_line/2, [], Lines)). lists:reverse(lists:foldl(fun converter_influx_line/2, [], Lines)).

View File

@ -17,6 +17,7 @@
on_start/2, on_start/2,
on_stop/2, on_stop/2,
on_query/3, on_query/3,
on_batch_query/3,
on_get_status/2 on_get_status/2
]). ]).
@ -37,8 +38,29 @@ on_start(InstId, Config) ->
on_stop(_InstId, #{client := Client}) -> on_stop(_InstId, #{client := Client}) ->
influxdb:stop_client(Client). influxdb:stop_client(Client).
on_query(InstId, {send_message, Data}, State) -> on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) ->
do_query(InstId, {send_message, Data}, State). case data_to_points(Data, SyntaxLines) of
{ok, Points} ->
do_query(InstId, Client, Points);
{error, ErrorPoints} = Err ->
log_error_points(InstId, ErrorPoints),
Err
end.
%% Once a Batched Data trans to points failed.
%% This batch query failed
on_batch_query(InstId, BatchData, State = #{write_syntax := SyntaxLines, client := Client}) ->
case on_get_status(InstId, State) of
connected ->
case parse_batch_data(InstId, BatchData, SyntaxLines) of
{ok, Points} ->
do_query(InstId, Client, Points);
{error, Reason} ->
{error, Reason}
end;
disconnected ->
{resource_down, disconnected}
end.
on_get_status(_InstId, #{client := Client}) -> on_get_status(_InstId, #{client := Client}) ->
case influxdb:is_alive(Client) of case influxdb:is_alive(Client) of
@ -79,7 +101,7 @@ fields("api_v2_put") ->
fields(basic) -> fields(basic) ->
[ [
{host, {host,
mk(binary(), #{required => true, default => <<"120.0.0.1">>, desc => ?DESC("host")})}, mk(binary(), #{required => true, default => <<"127.0.0.1">>, desc => ?DESC("host")})},
{port, mk(pos_integer(), #{required => true, default => 8086, desc => ?DESC("port")})}, {port, mk(pos_integer(), #{required => true, default => 8086, desc => ?DESC("port")})},
{precision, {precision,
mk(enum([ns, us, ms, s, m, h]), #{ mk(enum([ns, us, ms, s, m, h]), #{
@ -310,18 +332,7 @@ ssl_config(SSL = #{enable := true}) ->
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% Query %% Query
do_query(InstId, {send_message, Data}, State = #{client := Client}) -> do_query(InstId, Client, Points) ->
{Points, Errs} = data_to_points(Data, State),
lists:foreach(
fun({error, Reason}) ->
?SLOG(error, #{
msg => "influxdb trans point failed",
connector => InstId,
reason => Reason
})
end,
Errs
),
case influxdb:write(Client, Points) of case influxdb:write(Client, Points) of
ok -> ok ->
?SLOG(debug, #{ ?SLOG(debug, #{
@ -376,11 +387,45 @@ to_maps_config(K, V, Res) ->
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% Tags & Fields Data Trans %% Tags & Fields Data Trans
data_to_points(Data, #{write_syntax := Lines}) -> parse_batch_data(InstId, BatchData, SyntaxLines) ->
lines_to_points(Data, Lines, [], []). {Points, Errors} = lists:foldl(
fun({send_message, Data}, {ListOfPoints, ErrAccIn}) ->
case data_to_points(Data, SyntaxLines) of
{ok, Points} ->
{[Points | ListOfPoints], ErrAccIn};
{error, ErrorPoints} ->
log_error_points(InstId, ErrorPoints),
{ListOfPoints, ErrAccIn + 1}
end
end,
{[], 0},
BatchData
),
case Errors of
0 ->
{ok, lists:flatten(Points)};
_ ->
?SLOG(error, #{
msg => io_lib:format("InfluxDB trans point failed, count: ~p", [Errors]),
connector => InstId,
reason => points_trans_failed
}),
{error, points_trans_failed}
end.
lines_to_points(_, [], Points, Errs) -> data_to_points(Data, SyntaxLines) ->
{Points, Errs}; lines_to_points(Data, SyntaxLines, [], []).
%% When converting multiple rows data into InfluxDB Line Protocol, they are considered to be strongly correlated.
%% And once a row fails to convert, all of them are considered to have failed.
lines_to_points(_, [], Points, ErrorPoints) ->
case ErrorPoints of
[] ->
{ok, Points};
_ ->
%% ignore trans succeeded points
{error, ErrorPoints}
end;
lines_to_points( lines_to_points(
Data, Data,
[ [
@ -392,8 +437,8 @@ lines_to_points(
} }
| Rest | Rest
], ],
ResAcc, ResultPointsAcc,
ErrAcc ErrorPointsAcc
) -> ) ->
TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, TransOptions = #{return => rawlist, var_trans => fun data_filter/1},
case emqx_plugin_libs_rule:proc_tmpl(Timestamp, Data, TransOptions) of case emqx_plugin_libs_rule:proc_tmpl(Timestamp, Data, TransOptions) of
@ -406,9 +451,11 @@ lines_to_points(
tags => EncodeTags, tags => EncodeTags,
fields => EncodeFields fields => EncodeFields
}, },
lines_to_points(Data, Rest, [Point | ResAcc], ErrAcc); lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc);
BadTimestamp -> BadTimestamp ->
lines_to_points(Data, Rest, ResAcc, [{error, {bad_timestamp, BadTimestamp}} | ErrAcc]) lines_to_points(Data, Rest, ResultPointsAcc, [
{error, {bad_timestamp, BadTimestamp}} | ErrorPointsAcc
])
end. end.
maps_config_to_data(K, V, {Data, Res}) -> maps_config_to_data(K, V, {Data, Res}) ->
@ -461,3 +508,16 @@ data_filter(Bool) when is_boolean(Bool) -> Bool;
data_filter(Data) -> bin(Data). data_filter(Data) -> bin(Data).
bin(Data) -> emqx_plugin_libs_rule:bin(Data). bin(Data) -> emqx_plugin_libs_rule:bin(Data).
%% helper funcs
log_error_points(InstId, Errs) ->
lists:foreach(
fun({error, Reason}) ->
?SLOG(error, #{
msg => "influxdb trans point failed",
connector => InstId,
reason => Reason
})
end,
Errs
).