Merge pull request #9534 from thalesmg/fix-influxdb-no-fields-50

fix(influxdb): check if fields are empty before sending
This commit is contained in:
JimMoen 2022-12-20 10:43:15 +08:00 committed by GitHub
commit 71d40c0490
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 142 additions and 17 deletions

View File

@ -54,8 +54,10 @@ init_per_suite(Config) ->
end_per_suite(_Config) -> end_per_suite(_Config) ->
delete_all_bridges(), delete_all_bridges(),
ok = emqx_common_test_helpers:stop_apps([emqx_conf]), emqx_mgmt_api_test_util:end_suite(),
ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource]), ok = emqx_connector_test_helpers:stop_apps([
emqx_conf, emqx_bridge, emqx_resource, emqx_rule_engine
]),
_ = application:stop(emqx_connector), _ = application:stop(emqx_connector),
ok. ok.
@ -92,6 +94,7 @@ init_per_group(InfluxDBType, Config0) when
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
ok = start_apps(), ok = start_apps(),
{ok, _} = application:ensure_all_started(emqx_connector), {ok, _} = application:ensure_all_started(emqx_connector),
emqx_mgmt_api_test_util:init_suite(),
Config = [{use_tls, UseTLS} | Config0], Config = [{use_tls, UseTLS} | Config0],
{Name, ConfigString, InfluxDBConfig} = influxdb_config( {Name, ConfigString, InfluxDBConfig} = influxdb_config(
apiv1, InfluxDBHost, InfluxDBPort, Config apiv1, InfluxDBHost, InfluxDBPort, Config
@ -160,6 +163,7 @@ init_per_group(InfluxDBType, Config0) when
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
ok = start_apps(), ok = start_apps(),
{ok, _} = application:ensure_all_started(emqx_connector), {ok, _} = application:ensure_all_started(emqx_connector),
emqx_mgmt_api_test_util:init_suite(),
Config = [{use_tls, UseTLS} | Config0], Config = [{use_tls, UseTLS} | Config0],
{Name, ConfigString, InfluxDBConfig} = influxdb_config( {Name, ConfigString, InfluxDBConfig} = influxdb_config(
apiv2, InfluxDBHost, InfluxDBPort, Config apiv2, InfluxDBHost, InfluxDBPort, Config
@ -223,6 +227,7 @@ end_per_group(_Group, _Config) ->
ok. ok.
init_per_testcase(_Testcase, Config) -> init_per_testcase(_Testcase, Config) ->
delete_all_rules(),
delete_all_bridges(), delete_all_bridges(),
Config. Config.
@ -231,6 +236,7 @@ end_per_testcase(_Testcase, Config) ->
ProxyPort = ?config(proxy_port, Config), ProxyPort = ?config(proxy_port, Config),
ok = snabbkaffe:stop(), ok = snabbkaffe:stop(),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
delete_all_rules(),
delete_all_bridges(), delete_all_bridges(),
ok. ok.
@ -238,6 +244,14 @@ end_per_testcase(_Testcase, Config) ->
%% Helper fns %% Helper fns
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
start_apps() ->
%% some configs in emqx_conf app are mandatory
%% we want to make sure they are loaded before
%% ekka start in emqx_common_test_helpers:start_apps/1
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]).
example_write_syntax() -> example_write_syntax() ->
%% N.B.: this single space character is relevant %% N.B.: this single space character is relevant
<<"${topic},clientid=${clientid}", " ", "payload=${payload},", <<"${topic},clientid=${clientid}", " ", "payload=${payload},",
@ -248,6 +262,7 @@ example_write_syntax() ->
influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) -> influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) ->
EnableBatch = proplists:get_value(enable_batch, Config, true), EnableBatch = proplists:get_value(enable_batch, Config, true),
BatchSize = proplists:get_value(batch_size, Config, 100),
QueryMode = proplists:get_value(query_mode, Config, sync), QueryMode = proplists:get_value(query_mode, Config, sync),
UseTLS = proplists:get_value(use_tls, Config, false), UseTLS = proplists:get_value(use_tls, Config, false),
Name = atom_to_binary(?MODULE), Name = atom_to_binary(?MODULE),
@ -265,17 +280,28 @@ influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) ->
" resource_opts = {\n" " resource_opts = {\n"
" enable_batch = ~p\n" " enable_batch = ~p\n"
" query_mode = ~s\n" " query_mode = ~s\n"
" batch_size = ~b\n"
" }\n" " }\n"
" ssl {\n" " ssl {\n"
" enable = ~p\n" " enable = ~p\n"
" verify = verify_none\n" " verify = verify_none\n"
" }\n" " }\n"
"}\n", "}\n",
[Name, InfluxDBHost, InfluxDBPort, WriteSyntax, EnableBatch, QueryMode, UseTLS] [
Name,
InfluxDBHost,
InfluxDBPort,
WriteSyntax,
EnableBatch,
QueryMode,
BatchSize,
UseTLS
]
), ),
{Name, ConfigString, parse_and_check(ConfigString, Type, Name)}; {Name, ConfigString, parse_and_check(ConfigString, Type, Name)};
influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) -> influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) ->
EnableBatch = proplists:get_value(enable_batch, Config, true), EnableBatch = proplists:get_value(enable_batch, Config, true),
BatchSize = proplists:get_value(batch_size, Config, 100),
QueryMode = proplists:get_value(query_mode, Config, sync), QueryMode = proplists:get_value(query_mode, Config, sync),
UseTLS = proplists:get_value(use_tls, Config, false), UseTLS = proplists:get_value(use_tls, Config, false),
Name = atom_to_binary(?MODULE), Name = atom_to_binary(?MODULE),
@ -293,13 +319,23 @@ influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) ->
" resource_opts = {\n" " resource_opts = {\n"
" enable_batch = ~p\n" " enable_batch = ~p\n"
" query_mode = ~s\n" " query_mode = ~s\n"
" batch_size = ~b\n"
" }\n" " }\n"
" ssl {\n" " ssl {\n"
" enable = ~p\n" " enable = ~p\n"
" verify = verify_none\n" " verify = verify_none\n"
" }\n" " }\n"
"}\n", "}\n",
[Name, InfluxDBHost, InfluxDBPort, WriteSyntax, EnableBatch, QueryMode, UseTLS] [
Name,
InfluxDBHost,
InfluxDBPort,
WriteSyntax,
EnableBatch,
QueryMode,
BatchSize,
UseTLS
]
), ),
{Name, ConfigString, parse_and_check(ConfigString, Type, Name)}. {Name, ConfigString, parse_and_check(ConfigString, Type, Name)}.
@ -316,9 +352,13 @@ influxdb_type_bin(apiv2) ->
<<"influxdb_api_v2">>. <<"influxdb_api_v2">>.
create_bridge(Config) -> create_bridge(Config) ->
create_bridge(Config, _Overrides = #{}).
create_bridge(Config, Overrides) ->
Type = influxdb_type_bin(?config(influxdb_type, Config)), Type = influxdb_type_bin(?config(influxdb_type, Config)),
Name = ?config(influxdb_name, Config), Name = ?config(influxdb_name, Config),
InfluxDBConfig = ?config(influxdb_config, Config), InfluxDBConfig0 = ?config(influxdb_config, Config),
InfluxDBConfig = emqx_map_lib:deep_merge(InfluxDBConfig0, Overrides),
emqx_bridge:create(Type, Name, InfluxDBConfig). emqx_bridge:create(Type, Name, InfluxDBConfig).
delete_bridge(Config) -> delete_bridge(Config) ->
@ -334,6 +374,34 @@ delete_all_bridges() ->
emqx_bridge:list() emqx_bridge:list()
). ).
delete_all_rules() ->
lists:foreach(
fun(#{id := RuleId}) ->
ok = emqx_rule_engine:delete_rule(RuleId)
end,
emqx_rule_engine:get_rules()
).
create_rule_and_action_http(Config) ->
create_rule_and_action_http(Config, _Overrides = #{}).
create_rule_and_action_http(Config, Overrides) ->
InfluxDBName = ?config(influxdb_name, Config),
Type = influxdb_type_bin(?config(influxdb_type, Config)),
BridgeId = emqx_bridge_resource:bridge_id(Type, InfluxDBName),
Params0 = #{
enable => true,
sql => <<"SELECT * FROM \"t/topic\"">>,
actions => [BridgeId]
},
Params = emqx_map_lib:deep_merge(Params0, Overrides),
Path = emqx_mgmt_api_test_util:api_path(["rules"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
{ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])};
Error -> Error
end.
send_message(Config, Payload) -> send_message(Config, Payload) ->
Name = ?config(influxdb_name, Config), Name = ?config(influxdb_name, Config),
Type = influxdb_type_bin(?config(influxdb_type, Config)), Type = influxdb_type_bin(?config(influxdb_type, Config)),
@ -851,10 +919,59 @@ t_write_failure(Config) ->
), ),
ok. ok.
start_apps() -> t_missing_field(Config) ->
%% some configs in emqx_conf app are mandatory QueryMode = ?config(query_mode, Config),
%% we want to make sure they are loaded before EnableBatch = ?config(enable_batch, Config),
%% ekka start in emqx_common_test_helpers:start_apps/1 {ok, _} =
emqx_common_test_helpers:render_and_load_app_config(emqx_conf), create_bridge(
ok = emqx_common_test_helpers:start_apps([emqx_conf]), Config,
ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge]). #{
<<"resource_opts">> => #{<<"batch_size">> => 1},
<<"write_syntax">> => <<"${clientid} foo=${foo}i">>
}
),
%% note: we don't select foo here, but we interpolate it in the
%% fields, so it'll become undefined.
{ok, _} = create_rule_and_action_http(Config, #{sql => <<"select * from \"t/topic\"">>}),
ClientId0 = emqx_guid:to_hexstr(emqx_guid:gen()),
ClientId1 = emqx_guid:to_hexstr(emqx_guid:gen()),
%% Message with the field that we "forgot" to select in the rule
Msg0 = emqx_message:make(ClientId0, <<"t/topic">>, emqx_json:encode(#{foo => 123})),
%% Message without any fields
Msg1 = emqx_message:make(ClientId1, <<"t/topic">>, emqx_json:encode(#{})),
?check_trace(
begin
emqx:publish(Msg0),
emqx:publish(Msg1),
{ok, _} =
snabbkaffe:block_until(
?match_n_events(2, #{
?snk_kind := influxdb_connector_send_query_error,
mode := QueryMode
}),
_Timeout1 = 10_000
),
ok
end,
fun(Trace) ->
PersistedData0 = query_by_clientid(ClientId0, Config),
PersistedData1 = query_by_clientid(ClientId1, Config),
case EnableBatch of
true ->
?assertMatch(
[#{error := points_trans_failed}, #{error := points_trans_failed} | _],
?of_kind(influxdb_connector_send_query_error, Trace)
);
false ->
?assertMatch(
[#{error := [{error, no_fields}]}, #{error := [{error, no_fields}]} | _],
?of_kind(influxdb_connector_send_query_error, Trace)
)
end,
%% nothing should have been persisted
?assertEqual(#{}, PersistedData0),
?assertEqual(#{}, PersistedData1),
ok
end
),
ok.

View File

@ -446,15 +446,23 @@ lines_to_points(
TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, TransOptions = #{return => rawlist, var_trans => fun data_filter/1},
case emqx_plugin_libs_rule:proc_tmpl(Timestamp, Data, TransOptions) of case emqx_plugin_libs_rule:proc_tmpl(Timestamp, Data, TransOptions) of
[TimestampInt] when is_integer(TimestampInt) -> [TimestampInt] when is_integer(TimestampInt) ->
{_, EncodeTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags), {_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags),
{_, EncodeFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields), {_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields),
Point = #{ Point = #{
measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data), measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data),
timestamp => TimestampInt, timestamp => TimestampInt,
tags => EncodeTags, tags => EncodedTags,
fields => EncodeFields fields => EncodedFields
}, },
lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc); case map_size(EncodedFields) =:= 0 of
true ->
%% influxdb client doesn't like empty field maps...
lines_to_points(Data, Rest, ResultPointsAcc, [
{error, no_fields} | ErrorPointsAcc
]);
false ->
lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc)
end;
BadTimestamp -> BadTimestamp ->
lines_to_points(Data, Rest, ResultPointsAcc, [ lines_to_points(Data, Rest, ResultPointsAcc, [
{error, {bad_timestamp, BadTimestamp}} | ErrorPointsAcc {error, {bad_timestamp, BadTimestamp}} | ErrorPointsAcc