From c0b208dd9e9b9b117deef73fb3aaf15429af692d Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 12 Dec 2022 15:41:22 -0300 Subject: [PATCH] fix(influxdb): check if fields are empty before sending Related Issue: https://emqx.atlassian.net/browse/EMQX-8461 Currently, the InfluxDB client raises an error if an empty `fields` map is passed to it for pushing data. ``` 14:03:35.563 [error] [InfluxDB] Encode [ %{ fields: %{}, measurement: "t/topic", tags: %{}, timestamp: 1670864615563 } ] failed: :error :missing_field [ {:influxdb_line, :encode_fields, 1, [ file: '/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb_line.erl', line: 60 ]}, {:influxdb_line, :encode_, 1, [ file: '/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb_line.erl', line: 44 ]}, {:influxdb_line, :"-encode_/1-fun-0-", 2, [ file: '/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb_line.erl', line: 38 ]}, {:influxdb, :write, 2, [file: '/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb.erl', line: 79]}, {:emqx_ee_connector_influxdb, :do_query, 3, [ file: '/home/thales/dev/emqx/emqx/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl', line: 322 ]}, {:emqx_resource_worker, :apply_query_fun, 6, [ file: '/home/thales/dev/emqx/emqx/apps/emqx_resource/src/emqx_resource_worker.erl', line: 454 ]}, {:emqx_resource_worker, :query_or_acc, 3, [ file: '/home/thales/dev/emqx/emqx/apps/emqx_resource/src/emqx_resource_worker.erl', line: 306 ]}, {:gen_statem, :loop_state_callback, 11, [file: 'gen_statem.erl', line: 1203]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]} ] 2022-12-12T14:03:35.563607-03:00 [error] [InfluxDB] Encode [#{fields => #{},measurement => <<"t/topic">>,tags => #{},timestamp => 1670864615563}] failed: error missing_field [{influxdb_line,encode_fields,1,[{file,"/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb_line.erl"},{line,60}]},{influxdb_line,encode_,1,[{file,"/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb_line.erl"},{line,44}]},{influxdb_line,'-encode_/1-fun-0-',2,[{file,"/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb_line.erl"},{line,38}]},{influxdb,write,2,[{file,"/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb.erl"},{line,79}]},{emqx_ee_connector_influxdb,do_query,3,[{file,"/home/thales/dev/emqx/emqx/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl"},{line,322}]},{emqx_resource_worker,apply_query_fun,6,[{file,"/home/thales/dev/emqx/emqx/apps/emqx_resource/src/emqx_resource_worker.erl"},{line,454}]},{emqx_resource_worker,query_or_acc,3,[{file,"/home/thales/dev/emqx/emqx/apps/emqx_resource/src/emqx_resource_worker.erl"},{line,306}]},{gen_statem,loop_state_callback,11,[{file,"gen_statem.erl"},{line,1203}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}] 14:03:35.565 [error] [connector: "bridge:influxdb_api_v2:aaa", msg: 'influxdb write point failed', reason: :missing_field] 2022-12-12T14:03:35.565345-03:00 [error] connector: <<"bridge:influxdb_api_v2:aaa">>, line: 335, mfa: emqx_ee_connector_influxdb:do_query/3, msg: influxdb write point failed, reason: missing_field [] 14:03:35.565 [error] [id: "bridge:influxdb_api_v2:aaa", msg: :send_error, reason: :missing_field] iex(emqx@127.0.0.1)2> 2022-12-12T14:03:35.565913-03:00 [error] id: <<"bridge:influxdb_api_v2:aaa">>, line: 396, mfa: emqx_resource_worker:handle_query_result/4, msg: send_error, reason: missing_field ``` Instead of raising, we check if the interpolation resulted in an empty map due to lack of context and just fail the push more gracefully. Related to this, the original issue _appears_ to be related to a frontend issue (to be confirmed and fixed separately), where the it is not encoding the field types: https://emqx.atlassian.net/browse/EMQX-8461?focusedCommentId=24805 --- .../test/emqx_ee_bridge_influxdb_SUITE.erl | 141 ++++++++++++++++-- .../src/emqx_ee_connector_influxdb.erl | 18 ++- 2 files changed, 142 insertions(+), 17 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl index 3a7339e27..80b3d1c09 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl @@ -54,8 +54,10 @@ init_per_suite(Config) -> end_per_suite(_Config) -> delete_all_bridges(), - ok = emqx_common_test_helpers:stop_apps([emqx_conf]), - ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource]), + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_connector_test_helpers:stop_apps([ + emqx_conf, emqx_bridge, emqx_resource, emqx_rule_engine + ]), _ = application:stop(emqx_connector), ok. @@ -92,6 +94,7 @@ init_per_group(InfluxDBType, Config0) when emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), ok = start_apps(), {ok, _} = application:ensure_all_started(emqx_connector), + emqx_mgmt_api_test_util:init_suite(), Config = [{use_tls, UseTLS} | Config0], {Name, ConfigString, InfluxDBConfig} = influxdb_config( apiv1, InfluxDBHost, InfluxDBPort, Config @@ -160,6 +163,7 @@ init_per_group(InfluxDBType, Config0) when emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), ok = start_apps(), {ok, _} = application:ensure_all_started(emqx_connector), + emqx_mgmt_api_test_util:init_suite(), Config = [{use_tls, UseTLS} | Config0], {Name, ConfigString, InfluxDBConfig} = influxdb_config( apiv2, InfluxDBHost, InfluxDBPort, Config @@ -223,6 +227,7 @@ end_per_group(_Group, _Config) -> ok. init_per_testcase(_Testcase, Config) -> + delete_all_rules(), delete_all_bridges(), Config. @@ -231,6 +236,7 @@ end_per_testcase(_Testcase, Config) -> ProxyPort = ?config(proxy_port, Config), ok = snabbkaffe:stop(), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + delete_all_rules(), delete_all_bridges(), ok. @@ -238,6 +244,14 @@ end_per_testcase(_Testcase, Config) -> %% Helper fns %%------------------------------------------------------------------------------ +start_apps() -> + %% some configs in emqx_conf app are mandatory + %% we want to make sure they are loaded before + %% ekka start in emqx_common_test_helpers:start_apps/1 + emqx_common_test_helpers:render_and_load_app_config(emqx_conf), + ok = emqx_common_test_helpers:start_apps([emqx_conf]), + ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]). + example_write_syntax() -> %% N.B.: this single space character is relevant <<"${topic},clientid=${clientid}", " ", "payload=${payload},", @@ -248,6 +262,7 @@ example_write_syntax() -> influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) -> EnableBatch = proplists:get_value(enable_batch, Config, true), + BatchSize = proplists:get_value(batch_size, Config, 100), QueryMode = proplists:get_value(query_mode, Config, sync), UseTLS = proplists:get_value(use_tls, Config, false), Name = atom_to_binary(?MODULE), @@ -265,17 +280,28 @@ influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) -> " resource_opts = {\n" " enable_batch = ~p\n" " query_mode = ~s\n" + " batch_size = ~b\n" " }\n" " ssl {\n" " enable = ~p\n" " verify = verify_none\n" " }\n" "}\n", - [Name, InfluxDBHost, InfluxDBPort, WriteSyntax, EnableBatch, QueryMode, UseTLS] + [ + Name, + InfluxDBHost, + InfluxDBPort, + WriteSyntax, + EnableBatch, + QueryMode, + BatchSize, + UseTLS + ] ), {Name, ConfigString, parse_and_check(ConfigString, Type, Name)}; influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) -> EnableBatch = proplists:get_value(enable_batch, Config, true), + BatchSize = proplists:get_value(batch_size, Config, 100), QueryMode = proplists:get_value(query_mode, Config, sync), UseTLS = proplists:get_value(use_tls, Config, false), Name = atom_to_binary(?MODULE), @@ -293,13 +319,23 @@ influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) -> " resource_opts = {\n" " enable_batch = ~p\n" " query_mode = ~s\n" + " batch_size = ~b\n" " }\n" " ssl {\n" " enable = ~p\n" " verify = verify_none\n" " }\n" "}\n", - [Name, InfluxDBHost, InfluxDBPort, WriteSyntax, EnableBatch, QueryMode, UseTLS] + [ + Name, + InfluxDBHost, + InfluxDBPort, + WriteSyntax, + EnableBatch, + QueryMode, + BatchSize, + UseTLS + ] ), {Name, ConfigString, parse_and_check(ConfigString, Type, Name)}. @@ -316,9 +352,13 @@ influxdb_type_bin(apiv2) -> <<"influxdb_api_v2">>. create_bridge(Config) -> + create_bridge(Config, _Overrides = #{}). + +create_bridge(Config, Overrides) -> Type = influxdb_type_bin(?config(influxdb_type, Config)), Name = ?config(influxdb_name, Config), - InfluxDBConfig = ?config(influxdb_config, Config), + InfluxDBConfig0 = ?config(influxdb_config, Config), + InfluxDBConfig = emqx_map_lib:deep_merge(InfluxDBConfig0, Overrides), emqx_bridge:create(Type, Name, InfluxDBConfig). delete_bridge(Config) -> @@ -334,6 +374,34 @@ delete_all_bridges() -> emqx_bridge:list() ). +delete_all_rules() -> + lists:foreach( + fun(#{id := RuleId}) -> + ok = emqx_rule_engine:delete_rule(RuleId) + end, + emqx_rule_engine:get_rules() + ). + +create_rule_and_action_http(Config) -> + create_rule_and_action_http(Config, _Overrides = #{}). + +create_rule_and_action_http(Config, Overrides) -> + InfluxDBName = ?config(influxdb_name, Config), + Type = influxdb_type_bin(?config(influxdb_type, Config)), + BridgeId = emqx_bridge_resource:bridge_id(Type, InfluxDBName), + Params0 = #{ + enable => true, + sql => <<"SELECT * FROM \"t/topic\"">>, + actions => [BridgeId] + }, + Params = emqx_map_lib:deep_merge(Params0, Overrides), + Path = emqx_mgmt_api_test_util:api_path(["rules"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])}; + Error -> Error + end. + send_message(Config, Payload) -> Name = ?config(influxdb_name, Config), Type = influxdb_type_bin(?config(influxdb_type, Config)), @@ -851,10 +919,59 @@ t_write_failure(Config) -> ), ok. -start_apps() -> - %% some configs in emqx_conf app are mandatory - %% we want to make sure they are loaded before - %% ekka start in emqx_common_test_helpers:start_apps/1 - emqx_common_test_helpers:render_and_load_app_config(emqx_conf), - ok = emqx_common_test_helpers:start_apps([emqx_conf]), - ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge]). +t_missing_field(Config) -> + QueryMode = ?config(query_mode, Config), + EnableBatch = ?config(enable_batch, Config), + {ok, _} = + create_bridge( + Config, + #{ + <<"resource_opts">> => #{<<"batch_size">> => 1}, + <<"write_syntax">> => <<"${clientid} foo=${foo}i">> + } + ), + %% note: we don't select foo here, but we interpolate it in the + %% fields, so it'll become undefined. + {ok, _} = create_rule_and_action_http(Config, #{sql => <<"select * from \"t/topic\"">>}), + ClientId0 = emqx_guid:to_hexstr(emqx_guid:gen()), + ClientId1 = emqx_guid:to_hexstr(emqx_guid:gen()), + %% Message with the field that we "forgot" to select in the rule + Msg0 = emqx_message:make(ClientId0, <<"t/topic">>, emqx_json:encode(#{foo => 123})), + %% Message without any fields + Msg1 = emqx_message:make(ClientId1, <<"t/topic">>, emqx_json:encode(#{})), + ?check_trace( + begin + emqx:publish(Msg0), + emqx:publish(Msg1), + {ok, _} = + snabbkaffe:block_until( + ?match_n_events(2, #{ + ?snk_kind := influxdb_connector_send_query_error, + mode := QueryMode + }), + _Timeout1 = 10_000 + ), + ok + end, + fun(Trace) -> + PersistedData0 = query_by_clientid(ClientId0, Config), + PersistedData1 = query_by_clientid(ClientId1, Config), + case EnableBatch of + true -> + ?assertMatch( + [#{error := points_trans_failed}, #{error := points_trans_failed} | _], + ?of_kind(influxdb_connector_send_query_error, Trace) + ); + false -> + ?assertMatch( + [#{error := [{error, no_fields}]}, #{error := [{error, no_fields}]} | _], + ?of_kind(influxdb_connector_send_query_error, Trace) + ) + end, + %% nothing should have been persisted + ?assertEqual(#{}, PersistedData0), + ?assertEqual(#{}, PersistedData1), + ok + end + ), + ok. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 2c5b8a8fc..3ef9fc352 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -446,15 +446,23 @@ lines_to_points( TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, case emqx_plugin_libs_rule:proc_tmpl(Timestamp, Data, TransOptions) of [TimestampInt] when is_integer(TimestampInt) -> - {_, EncodeTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags), - {_, EncodeFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields), + {_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags), + {_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields), Point = #{ measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data), timestamp => TimestampInt, - tags => EncodeTags, - fields => EncodeFields + tags => EncodedTags, + fields => EncodedFields }, - lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc); + case map_size(EncodedFields) =:= 0 of + true -> + %% influxdb client doesn't like empty field maps... + lines_to_points(Data, Rest, ResultPointsAcc, [ + {error, no_fields} | ErrorPointsAcc + ]); + false -> + lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc) + end; BadTimestamp -> lines_to_points(Data, Rest, ResultPointsAcc, [ {error, {bad_timestamp, BadTimestamp}} | ErrorPointsAcc