fix(influxdb): check if fields are empty before sending

Related Issue: https://emqx.atlassian.net/browse/EMQX-8461

Currently, the InfluxDB client raises an error if an empty `fields`
map is passed to it for pushing data.

```
  14:03:35.563 [error] [InfluxDB] Encode [
    %{
      fields: %{},
      measurement: "t/topic",
      tags: %{},
      timestamp: 1670864615563
    }
  ] failed: :error :missing_field [
    {:influxdb_line, :encode_fields, 1,
     [
       file: '/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb_line.erl',
       line: 60
     ]},
    {:influxdb_line, :encode_, 1,
     [
       file: '/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb_line.erl',
       line: 44
     ]},
    {:influxdb_line, :"-encode_/1-fun-0-", 2,
     [
       file: '/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb_line.erl',
       line: 38
     ]},
    {:influxdb, :write, 2,
     [file: '/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb.erl', line: 79]},
    {:emqx_ee_connector_influxdb, :do_query, 3,
     [
       file: '/home/thales/dev/emqx/emqx/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl',
       line: 322
     ]},
    {:emqx_resource_worker, :apply_query_fun, 6,
     [
       file: '/home/thales/dev/emqx/emqx/apps/emqx_resource/src/emqx_resource_worker.erl',
       line: 454
     ]},
    {:emqx_resource_worker, :query_or_acc, 3,
     [
       file: '/home/thales/dev/emqx/emqx/apps/emqx_resource/src/emqx_resource_worker.erl',
       line: 306
     ]},
    {:gen_statem, :loop_state_callback, 11, [file: 'gen_statem.erl', line: 1203]},
    {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}
  ]
  2022-12-12T14:03:35.563607-03:00 [error] [InfluxDB] Encode [#{fields => #{},measurement => <<"t/topic">>,tags => #{},timestamp => 1670864615563}] failed: error missing_field [{influxdb_line,encode_fields,1,[{file,"/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb_line.erl"},{line,60}]},{influxdb_line,encode_,1,[{file,"/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb_line.erl"},{line,44}]},{influxdb_line,'-encode_/1-fun-0-',2,[{file,"/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb_line.erl"},{line,38}]},{influxdb,write,2,[{file,"/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb.erl"},{line,79}]},{emqx_ee_connector_influxdb,do_query,3,[{file,"/home/thales/dev/emqx/emqx/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl"},{line,322}]},{emqx_resource_worker,apply_query_fun,6,[{file,"/home/thales/dev/emqx/emqx/apps/emqx_resource/src/emqx_resource_worker.erl"},{line,454}]},{emqx_resource_worker,query_or_acc,3,[{file,"/home/thales/dev/emqx/emqx/apps/emqx_resource/src/emqx_resource_worker.erl"},{line,306}]},{gen_statem,loop_state_callback,11,[{file,"gen_statem.erl"},{line,1203}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]

  14:03:35.565 [error] [connector: "bridge:influxdb_api_v2:aaa", msg: 'influxdb write point failed', reason: :missing_field]
  2022-12-12T14:03:35.565345-03:00 [error] connector: <<"bridge:influxdb_api_v2:aaa">>, line: 335, mfa: emqx_ee_connector_influxdb:do_query/3, msg: influxdb write point failed, reason: missing_field
  []

  14:03:35.565 [error] [id: "bridge:influxdb_api_v2:aaa", msg: :send_error, reason: :missing_field]
  iex(emqx@127.0.0.1)2> 2022-12-12T14:03:35.565913-03:00 [error] id: <<"bridge:influxdb_api_v2:aaa">>, line: 396, mfa: emqx_resource_worker:handle_query_result/4, msg: send_error, reason: missing_field
```

Instead of raising, we check if the interpolation resulted in an empty
map due to lack of context and just fail the push more gracefully.

Related to this, the original issue _appears_ to be related to a
frontend issue (to be confirmed and fixed separately), where the
it is not encoding the field types:
https://emqx.atlassian.net/browse/EMQX-8461?focusedCommentId=24805
This commit is contained in:
Thales Macedo Garitezi 2022-12-12 15:41:22 -03:00
parent 5b3ad54c1b
commit c0b208dd9e
2 changed files with 142 additions and 17 deletions

View File

@ -54,8 +54,10 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
delete_all_bridges(),
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource]),
emqx_mgmt_api_test_util:end_suite(),
ok = emqx_connector_test_helpers:stop_apps([
emqx_conf, emqx_bridge, emqx_resource, emqx_rule_engine
]),
_ = application:stop(emqx_connector),
ok.
@ -92,6 +94,7 @@ init_per_group(InfluxDBType, Config0) when
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
ok = start_apps(),
{ok, _} = application:ensure_all_started(emqx_connector),
emqx_mgmt_api_test_util:init_suite(),
Config = [{use_tls, UseTLS} | Config0],
{Name, ConfigString, InfluxDBConfig} = influxdb_config(
apiv1, InfluxDBHost, InfluxDBPort, Config
@ -160,6 +163,7 @@ init_per_group(InfluxDBType, Config0) when
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
ok = start_apps(),
{ok, _} = application:ensure_all_started(emqx_connector),
emqx_mgmt_api_test_util:init_suite(),
Config = [{use_tls, UseTLS} | Config0],
{Name, ConfigString, InfluxDBConfig} = influxdb_config(
apiv2, InfluxDBHost, InfluxDBPort, Config
@ -223,6 +227,7 @@ end_per_group(_Group, _Config) ->
ok.
init_per_testcase(_Testcase, Config) ->
delete_all_rules(),
delete_all_bridges(),
Config.
@ -231,6 +236,7 @@ end_per_testcase(_Testcase, Config) ->
ProxyPort = ?config(proxy_port, Config),
ok = snabbkaffe:stop(),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
delete_all_rules(),
delete_all_bridges(),
ok.
@ -238,6 +244,14 @@ end_per_testcase(_Testcase, Config) ->
%% Helper fns
%%------------------------------------------------------------------------------
start_apps() ->
%% some configs in emqx_conf app are mandatory
%% we want to make sure they are loaded before
%% ekka start in emqx_common_test_helpers:start_apps/1
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]).
example_write_syntax() ->
%% N.B.: this single space character is relevant
<<"${topic},clientid=${clientid}", " ", "payload=${payload},",
@ -248,6 +262,7 @@ example_write_syntax() ->
influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) ->
EnableBatch = proplists:get_value(enable_batch, Config, true),
BatchSize = proplists:get_value(batch_size, Config, 100),
QueryMode = proplists:get_value(query_mode, Config, sync),
UseTLS = proplists:get_value(use_tls, Config, false),
Name = atom_to_binary(?MODULE),
@ -265,17 +280,28 @@ influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) ->
" resource_opts = {\n"
" enable_batch = ~p\n"
" query_mode = ~s\n"
" batch_size = ~b\n"
" }\n"
" ssl {\n"
" enable = ~p\n"
" verify = verify_none\n"
" }\n"
"}\n",
[Name, InfluxDBHost, InfluxDBPort, WriteSyntax, EnableBatch, QueryMode, UseTLS]
[
Name,
InfluxDBHost,
InfluxDBPort,
WriteSyntax,
EnableBatch,
QueryMode,
BatchSize,
UseTLS
]
),
{Name, ConfigString, parse_and_check(ConfigString, Type, Name)};
influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) ->
EnableBatch = proplists:get_value(enable_batch, Config, true),
BatchSize = proplists:get_value(batch_size, Config, 100),
QueryMode = proplists:get_value(query_mode, Config, sync),
UseTLS = proplists:get_value(use_tls, Config, false),
Name = atom_to_binary(?MODULE),
@ -293,13 +319,23 @@ influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) ->
" resource_opts = {\n"
" enable_batch = ~p\n"
" query_mode = ~s\n"
" batch_size = ~b\n"
" }\n"
" ssl {\n"
" enable = ~p\n"
" verify = verify_none\n"
" }\n"
"}\n",
[Name, InfluxDBHost, InfluxDBPort, WriteSyntax, EnableBatch, QueryMode, UseTLS]
[
Name,
InfluxDBHost,
InfluxDBPort,
WriteSyntax,
EnableBatch,
QueryMode,
BatchSize,
UseTLS
]
),
{Name, ConfigString, parse_and_check(ConfigString, Type, Name)}.
@ -316,9 +352,13 @@ influxdb_type_bin(apiv2) ->
<<"influxdb_api_v2">>.
create_bridge(Config) ->
create_bridge(Config, _Overrides = #{}).
create_bridge(Config, Overrides) ->
Type = influxdb_type_bin(?config(influxdb_type, Config)),
Name = ?config(influxdb_name, Config),
InfluxDBConfig = ?config(influxdb_config, Config),
InfluxDBConfig0 = ?config(influxdb_config, Config),
InfluxDBConfig = emqx_map_lib:deep_merge(InfluxDBConfig0, Overrides),
emqx_bridge:create(Type, Name, InfluxDBConfig).
delete_bridge(Config) ->
@ -334,6 +374,34 @@ delete_all_bridges() ->
emqx_bridge:list()
).
delete_all_rules() ->
lists:foreach(
fun(#{id := RuleId}) ->
ok = emqx_rule_engine:delete_rule(RuleId)
end,
emqx_rule_engine:get_rules()
).
create_rule_and_action_http(Config) ->
create_rule_and_action_http(Config, _Overrides = #{}).
create_rule_and_action_http(Config, Overrides) ->
InfluxDBName = ?config(influxdb_name, Config),
Type = influxdb_type_bin(?config(influxdb_type, Config)),
BridgeId = emqx_bridge_resource:bridge_id(Type, InfluxDBName),
Params0 = #{
enable => true,
sql => <<"SELECT * FROM \"t/topic\"">>,
actions => [BridgeId]
},
Params = emqx_map_lib:deep_merge(Params0, Overrides),
Path = emqx_mgmt_api_test_util:api_path(["rules"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
{ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])};
Error -> Error
end.
send_message(Config, Payload) ->
Name = ?config(influxdb_name, Config),
Type = influxdb_type_bin(?config(influxdb_type, Config)),
@ -851,10 +919,59 @@ t_write_failure(Config) ->
),
ok.
start_apps() ->
%% some configs in emqx_conf app are mandatory
%% we want to make sure they are loaded before
%% ekka start in emqx_common_test_helpers:start_apps/1
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge]).
t_missing_field(Config) ->
QueryMode = ?config(query_mode, Config),
EnableBatch = ?config(enable_batch, Config),
{ok, _} =
create_bridge(
Config,
#{
<<"resource_opts">> => #{<<"batch_size">> => 1},
<<"write_syntax">> => <<"${clientid} foo=${foo}i">>
}
),
%% note: we don't select foo here, but we interpolate it in the
%% fields, so it'll become undefined.
{ok, _} = create_rule_and_action_http(Config, #{sql => <<"select * from \"t/topic\"">>}),
ClientId0 = emqx_guid:to_hexstr(emqx_guid:gen()),
ClientId1 = emqx_guid:to_hexstr(emqx_guid:gen()),
%% Message with the field that we "forgot" to select in the rule
Msg0 = emqx_message:make(ClientId0, <<"t/topic">>, emqx_json:encode(#{foo => 123})),
%% Message without any fields
Msg1 = emqx_message:make(ClientId1, <<"t/topic">>, emqx_json:encode(#{})),
?check_trace(
begin
emqx:publish(Msg0),
emqx:publish(Msg1),
{ok, _} =
snabbkaffe:block_until(
?match_n_events(2, #{
?snk_kind := influxdb_connector_send_query_error,
mode := QueryMode
}),
_Timeout1 = 10_000
),
ok
end,
fun(Trace) ->
PersistedData0 = query_by_clientid(ClientId0, Config),
PersistedData1 = query_by_clientid(ClientId1, Config),
case EnableBatch of
true ->
?assertMatch(
[#{error := points_trans_failed}, #{error := points_trans_failed} | _],
?of_kind(influxdb_connector_send_query_error, Trace)
);
false ->
?assertMatch(
[#{error := [{error, no_fields}]}, #{error := [{error, no_fields}]} | _],
?of_kind(influxdb_connector_send_query_error, Trace)
)
end,
%% nothing should have been persisted
?assertEqual(#{}, PersistedData0),
?assertEqual(#{}, PersistedData1),
ok
end
),
ok.

View File

@ -446,15 +446,23 @@ lines_to_points(
TransOptions = #{return => rawlist, var_trans => fun data_filter/1},
case emqx_plugin_libs_rule:proc_tmpl(Timestamp, Data, TransOptions) of
[TimestampInt] when is_integer(TimestampInt) ->
{_, EncodeTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags),
{_, EncodeFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields),
{_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags),
{_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields),
Point = #{
measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data),
timestamp => TimestampInt,
tags => EncodeTags,
fields => EncodeFields
tags => EncodedTags,
fields => EncodedFields
},
lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc);
case map_size(EncodedFields) =:= 0 of
true ->
%% influxdb client doesn't like empty field maps...
lines_to_points(Data, Rest, ResultPointsAcc, [
{error, no_fields} | ErrorPointsAcc
]);
false ->
lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc)
end;
BadTimestamp ->
lines_to_points(Data, Rest, ResultPointsAcc, [
{error, {bad_timestamp, BadTimestamp}} | ErrorPointsAcc