diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl index 3a7339e27..80b3d1c09 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl @@ -54,8 +54,10 @@ init_per_suite(Config) -> end_per_suite(_Config) -> delete_all_bridges(), - ok = emqx_common_test_helpers:stop_apps([emqx_conf]), - ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource]), + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_connector_test_helpers:stop_apps([ + emqx_conf, emqx_bridge, emqx_resource, emqx_rule_engine + ]), _ = application:stop(emqx_connector), ok. @@ -92,6 +94,7 @@ init_per_group(InfluxDBType, Config0) when emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), ok = start_apps(), {ok, _} = application:ensure_all_started(emqx_connector), + emqx_mgmt_api_test_util:init_suite(), Config = [{use_tls, UseTLS} | Config0], {Name, ConfigString, InfluxDBConfig} = influxdb_config( apiv1, InfluxDBHost, InfluxDBPort, Config @@ -160,6 +163,7 @@ init_per_group(InfluxDBType, Config0) when emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), ok = start_apps(), {ok, _} = application:ensure_all_started(emqx_connector), + emqx_mgmt_api_test_util:init_suite(), Config = [{use_tls, UseTLS} | Config0], {Name, ConfigString, InfluxDBConfig} = influxdb_config( apiv2, InfluxDBHost, InfluxDBPort, Config @@ -223,6 +227,7 @@ end_per_group(_Group, _Config) -> ok. init_per_testcase(_Testcase, Config) -> + delete_all_rules(), delete_all_bridges(), Config. @@ -231,6 +236,7 @@ end_per_testcase(_Testcase, Config) -> ProxyPort = ?config(proxy_port, Config), ok = snabbkaffe:stop(), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + delete_all_rules(), delete_all_bridges(), ok. @@ -238,6 +244,14 @@ end_per_testcase(_Testcase, Config) -> %% Helper fns %%------------------------------------------------------------------------------ +start_apps() -> + %% some configs in emqx_conf app are mandatory + %% we want to make sure they are loaded before + %% ekka start in emqx_common_test_helpers:start_apps/1 + emqx_common_test_helpers:render_and_load_app_config(emqx_conf), + ok = emqx_common_test_helpers:start_apps([emqx_conf]), + ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]). + example_write_syntax() -> %% N.B.: this single space character is relevant <<"${topic},clientid=${clientid}", " ", "payload=${payload},", @@ -248,6 +262,7 @@ example_write_syntax() -> influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) -> EnableBatch = proplists:get_value(enable_batch, Config, true), + BatchSize = proplists:get_value(batch_size, Config, 100), QueryMode = proplists:get_value(query_mode, Config, sync), UseTLS = proplists:get_value(use_tls, Config, false), Name = atom_to_binary(?MODULE), @@ -265,17 +280,28 @@ influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) -> " resource_opts = {\n" " enable_batch = ~p\n" " query_mode = ~s\n" + " batch_size = ~b\n" " }\n" " ssl {\n" " enable = ~p\n" " verify = verify_none\n" " }\n" "}\n", - [Name, InfluxDBHost, InfluxDBPort, WriteSyntax, EnableBatch, QueryMode, UseTLS] + [ + Name, + InfluxDBHost, + InfluxDBPort, + WriteSyntax, + EnableBatch, + QueryMode, + BatchSize, + UseTLS + ] ), {Name, ConfigString, parse_and_check(ConfigString, Type, Name)}; influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) -> EnableBatch = proplists:get_value(enable_batch, Config, true), + BatchSize = proplists:get_value(batch_size, Config, 100), QueryMode = proplists:get_value(query_mode, Config, sync), UseTLS = proplists:get_value(use_tls, Config, false), Name = atom_to_binary(?MODULE), @@ -293,13 +319,23 @@ influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) -> " resource_opts = {\n" " enable_batch = ~p\n" " query_mode = ~s\n" + " batch_size = ~b\n" " }\n" " ssl {\n" " enable = ~p\n" " verify = verify_none\n" " }\n" "}\n", - [Name, InfluxDBHost, InfluxDBPort, WriteSyntax, EnableBatch, QueryMode, UseTLS] + [ + Name, + InfluxDBHost, + InfluxDBPort, + WriteSyntax, + EnableBatch, + QueryMode, + BatchSize, + UseTLS + ] ), {Name, ConfigString, parse_and_check(ConfigString, Type, Name)}. @@ -316,9 +352,13 @@ influxdb_type_bin(apiv2) -> <<"influxdb_api_v2">>. create_bridge(Config) -> + create_bridge(Config, _Overrides = #{}). + +create_bridge(Config, Overrides) -> Type = influxdb_type_bin(?config(influxdb_type, Config)), Name = ?config(influxdb_name, Config), - InfluxDBConfig = ?config(influxdb_config, Config), + InfluxDBConfig0 = ?config(influxdb_config, Config), + InfluxDBConfig = emqx_map_lib:deep_merge(InfluxDBConfig0, Overrides), emqx_bridge:create(Type, Name, InfluxDBConfig). delete_bridge(Config) -> @@ -334,6 +374,34 @@ delete_all_bridges() -> emqx_bridge:list() ). +delete_all_rules() -> + lists:foreach( + fun(#{id := RuleId}) -> + ok = emqx_rule_engine:delete_rule(RuleId) + end, + emqx_rule_engine:get_rules() + ). + +create_rule_and_action_http(Config) -> + create_rule_and_action_http(Config, _Overrides = #{}). + +create_rule_and_action_http(Config, Overrides) -> + InfluxDBName = ?config(influxdb_name, Config), + Type = influxdb_type_bin(?config(influxdb_type, Config)), + BridgeId = emqx_bridge_resource:bridge_id(Type, InfluxDBName), + Params0 = #{ + enable => true, + sql => <<"SELECT * FROM \"t/topic\"">>, + actions => [BridgeId] + }, + Params = emqx_map_lib:deep_merge(Params0, Overrides), + Path = emqx_mgmt_api_test_util:api_path(["rules"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])}; + Error -> Error + end. + send_message(Config, Payload) -> Name = ?config(influxdb_name, Config), Type = influxdb_type_bin(?config(influxdb_type, Config)), @@ -851,10 +919,59 @@ t_write_failure(Config) -> ), ok. -start_apps() -> - %% some configs in emqx_conf app are mandatory - %% we want to make sure they are loaded before - %% ekka start in emqx_common_test_helpers:start_apps/1 - emqx_common_test_helpers:render_and_load_app_config(emqx_conf), - ok = emqx_common_test_helpers:start_apps([emqx_conf]), - ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge]). +t_missing_field(Config) -> + QueryMode = ?config(query_mode, Config), + EnableBatch = ?config(enable_batch, Config), + {ok, _} = + create_bridge( + Config, + #{ + <<"resource_opts">> => #{<<"batch_size">> => 1}, + <<"write_syntax">> => <<"${clientid} foo=${foo}i">> + } + ), + %% note: we don't select foo here, but we interpolate it in the + %% fields, so it'll become undefined. + {ok, _} = create_rule_and_action_http(Config, #{sql => <<"select * from \"t/topic\"">>}), + ClientId0 = emqx_guid:to_hexstr(emqx_guid:gen()), + ClientId1 = emqx_guid:to_hexstr(emqx_guid:gen()), + %% Message with the field that we "forgot" to select in the rule + Msg0 = emqx_message:make(ClientId0, <<"t/topic">>, emqx_json:encode(#{foo => 123})), + %% Message without any fields + Msg1 = emqx_message:make(ClientId1, <<"t/topic">>, emqx_json:encode(#{})), + ?check_trace( + begin + emqx:publish(Msg0), + emqx:publish(Msg1), + {ok, _} = + snabbkaffe:block_until( + ?match_n_events(2, #{ + ?snk_kind := influxdb_connector_send_query_error, + mode := QueryMode + }), + _Timeout1 = 10_000 + ), + ok + end, + fun(Trace) -> + PersistedData0 = query_by_clientid(ClientId0, Config), + PersistedData1 = query_by_clientid(ClientId1, Config), + case EnableBatch of + true -> + ?assertMatch( + [#{error := points_trans_failed}, #{error := points_trans_failed} | _], + ?of_kind(influxdb_connector_send_query_error, Trace) + ); + false -> + ?assertMatch( + [#{error := [{error, no_fields}]}, #{error := [{error, no_fields}]} | _], + ?of_kind(influxdb_connector_send_query_error, Trace) + ) + end, + %% nothing should have been persisted + ?assertEqual(#{}, PersistedData0), + ?assertEqual(#{}, PersistedData1), + ok + end + ), + ok. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 2c5b8a8fc..3ef9fc352 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -446,15 +446,23 @@ lines_to_points( TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, case emqx_plugin_libs_rule:proc_tmpl(Timestamp, Data, TransOptions) of [TimestampInt] when is_integer(TimestampInt) -> - {_, EncodeTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags), - {_, EncodeFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields), + {_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags), + {_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields), Point = #{ measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data), timestamp => TimestampInt, - tags => EncodeTags, - fields => EncodeFields + tags => EncodedTags, + fields => EncodedFields }, - lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc); + case map_size(EncodedFields) =:= 0 of + true -> + %% influxdb client doesn't like empty field maps... + lines_to_points(Data, Rest, ResultPointsAcc, [ + {error, no_fields} | ErrorPointsAcc + ]); + false -> + lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc) + end; BadTimestamp -> lines_to_points(Data, Rest, ResultPointsAcc, [ {error, {bad_timestamp, BadTimestamp}} | ErrorPointsAcc