From a573b9b38ca4048a49b3a9d645f5584ef93f7516 Mon Sep 17 00:00:00 2001 From: Paulo Zulato Date: Mon, 12 Jun 2023 21:25:13 -0300 Subject: [PATCH] fix(influxdb): check authentication Checks authentication on bridge start and get status. Also, handle authentication error when sending message. Fixes https://emqx.atlassian.net/browse/EMQX-10213 --- apps/emqx_bridge_influxdb/rebar.config | 2 +- .../src/emqx_bridge_influxdb_connector.erl | 51 +++++-- .../test/emqx_bridge_influxdb_SUITE.erl | 128 ++++++++++++++++++ changes/ee/fix-11031.en.md | 1 + mix.exs | 2 +- 5 files changed, 170 insertions(+), 14 deletions(-) create mode 100644 changes/ee/fix-11031.en.md diff --git a/apps/emqx_bridge_influxdb/rebar.config b/apps/emqx_bridge_influxdb/rebar.config index 0b11423c4..29ae34009 100644 --- a/apps/emqx_bridge_influxdb/rebar.config +++ b/apps/emqx_bridge_influxdb/rebar.config @@ -1,7 +1,7 @@ {erl_opts, [debug_info]}. {deps, [ - {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.9"}}}, + {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.10"}}}, {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index adf61918a..1fe5b4f78 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -150,7 +150,7 @@ on_batch_query_async( end. on_get_status(_InstId, #{client := Client}) -> - case influxdb:is_alive(Client) of + case influxdb:is_alive(Client) andalso ok =:= influxdb:check_auth(Client) of true -> connected; false -> @@ -262,17 +262,32 @@ do_start_client( {ok, Client} -> case influxdb:is_alive(Client, true) of true -> - State = #{ - client => Client, - write_syntax => to_config(Lines, Precision) - }, - ?SLOG(info, #{ - msg => "starting influxdb connector success", - connector => InstId, - client => redact_auth(Client), - state => redact_auth(State) - }), - {ok, State}; + case influxdb:check_auth(Client) of + ok -> + State = #{ + client => Client, + write_syntax => to_config(Lines, Precision) + }, + ?SLOG(info, #{ + msg => "starting influxdb connector success", + connector => InstId, + client => redact_auth(Client), + state => redact_auth(State) + }), + {ok, State}; + Error -> + ?tp(influxdb_connector_start_failed, #{error => auth_error}), + ?SLOG(warning, #{ + msg => "failed_to_start_influxdb_connector", + error => Error, + connector => InstId, + client => redact_auth(Client), + reason => auth_error + }), + %% no leak + _ = influxdb:stop_client(Client), + {error, influxdb_client_auth_error} + end; {false, Reason} -> ?tp(influxdb_connector_start_failed, #{ error => influxdb_client_not_alive, reason => Reason @@ -388,6 +403,14 @@ do_query(InstId, Client, Points) -> connector => InstId, points => Points }); + {error, {401, _, _}} -> + ?tp(influxdb_connector_do_query_failure, #{error => <<"authorization failure">>}), + ?SLOG(error, #{ + msg => "influxdb_authorization_failed", + client => redact_auth(Client), + connector => InstId + }), + {error, {unrecoverable_error, <<"authorization failure">>}}; {error, Reason} = Err -> ?tp(influxdb_connector_do_query_failure, #{error => Reason}), ?SLOG(error, #{ @@ -421,6 +444,10 @@ reply_callback(ReplyFunAndArgs, {error, Reason} = Error) -> Result = {error, {recoverable_error, Reason}}, emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result) end; +reply_callback(ReplyFunAndArgs, {ok, 401, _, _}) -> + ?tp(influxdb_connector_do_query_failure, #{error => <<"authorization failure">>}), + Result = {error, {unrecoverable_error, <<"authorization failure">>}}, + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result); reply_callback(ReplyFunAndArgs, Result) -> emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result). diff --git a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl index 8421f4e21..f97e5e977 100644 --- a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl +++ b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl @@ -1058,3 +1058,131 @@ t_missing_field(Config) -> end ), ok. + +t_authentication_error(Config0) -> + InfluxDBType = ?config(influxdb_type, Config0), + InfluxConfig0 = proplists:get_value(influxdb_config, Config0), + InfluxConfig = + case InfluxDBType of + apiv1 -> InfluxConfig0#{<<"password">> => <<"wrong_password">>}; + apiv2 -> InfluxConfig0#{<<"token">> => <<"wrong_token">>} + end, + Config = lists:keyreplace(influxdb_config, 1, Config0, {influxdb_config, InfluxConfig}), + ?check_trace( + begin + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := influxdb_connector_start_failed}, + 10_000 + ) + end, + fun(Trace) -> + ?assertMatch( + [#{error := auth_error} | _], + ?of_kind(influxdb_connector_start_failed, Trace) + ), + ok + end + ), + ok. + +t_authentication_error_on_get_status(Config0) -> + ResourceId = resource_id(Config0), + + % Fake initialization to simulate credential update after bridge was created. + emqx_common_test_helpers:with_mock( + influxdb, + check_auth, + fun(_) -> + ok + end, + fun() -> + InfluxDBType = ?config(influxdb_type, Config0), + InfluxConfig0 = proplists:get_value(influxdb_config, Config0), + InfluxConfig = + case InfluxDBType of + apiv1 -> InfluxConfig0#{<<"password">> => <<"wrong_password">>}; + apiv2 -> InfluxConfig0#{<<"token">> => <<"wrong_token">>} + end, + Config = lists:keyreplace(influxdb_config, 1, Config0, {influxdb_config, InfluxConfig}), + {ok, _} = create_bridge(Config), + ?retry( + _Sleep = 1_000, + _Attempts = 10, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ) + end + ), + + % Now back to wrong credentials + ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)), + ok. + +t_authentication_error_on_send_message(Config0) -> + ResourceId = resource_id(Config0), + QueryMode = proplists:get_value(query_mode, Config0, sync), + InfluxDBType = ?config(influxdb_type, Config0), + InfluxConfig0 = proplists:get_value(influxdb_config, Config0), + InfluxConfig = + case InfluxDBType of + apiv1 -> InfluxConfig0#{<<"password">> => <<"wrong_password">>}; + apiv2 -> InfluxConfig0#{<<"token">> => <<"wrong_token">>} + end, + Config = lists:keyreplace(influxdb_config, 1, Config0, {influxdb_config, InfluxConfig}), + + % Fake initialization to simulate credential update after bridge was created. + emqx_common_test_helpers:with_mock( + influxdb, + check_auth, + fun(_) -> + ok + end, + fun() -> + {ok, _} = create_bridge(Config), + ?retry( + _Sleep = 1_000, + _Attempts = 10, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ) + end + ), + + % Now back to wrong credentials + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + Payload = #{ + int_key => -123, + bool => true, + float_key => 24.5, + uint_key => 123 + }, + SentData = #{ + <<"clientid">> => ClientId, + <<"topic">> => atom_to_binary(?FUNCTION_NAME), + <<"timestamp">> => erlang:system_time(millisecond), + <<"payload">> => Payload + }, + case QueryMode of + sync -> + ?assertMatch( + {error, {unrecoverable_error, <<"authorization failure">>}}, + send_message(Config, SentData) + ); + async -> + ?check_trace( + begin + ?wait_async_action( + ?assertEqual(ok, send_message(Config, SentData)), + #{?snk_kind := handle_async_reply}, + 1_000 + ) + end, + fun(Trace) -> + ?assertMatch( + [#{error := <<"authorization failure">>} | _], + ?of_kind(influxdb_connector_do_query_failure, Trace) + ), + ok + end + ) + end, + ok. diff --git a/changes/ee/fix-11031.en.md b/changes/ee/fix-11031.en.md new file mode 100644 index 000000000..346078345 --- /dev/null +++ b/changes/ee/fix-11031.en.md @@ -0,0 +1 @@ +Fixed credential validation when creating bridge and checking status for InfluxDB Bridges. diff --git a/mix.exs b/mix.exs index 028a5dfff..e1123e7a1 100644 --- a/mix.exs +++ b/mix.exs @@ -193,7 +193,7 @@ defmodule EMQXUmbrella.MixProject do defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do [ {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"}, - {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.9", override: true}, + {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.10", override: true}, {:wolff, github: "kafka4beam/wolff", tag: "1.7.5"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0"},