diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl index 8f5a552a7..4228d23d5 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl @@ -165,7 +165,7 @@ roots() -> []. fields("config_connector") -> emqx_connector_schema:common_fields() ++ - connection_fields() ++ + emqx_bridge_influxdb_connector:fields("connector") ++ emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); fields("post_api_v1") -> method_fields(post, influxdb_api_v1); @@ -204,7 +204,7 @@ fields(Field) when Field == "post_connector" -> Fields = - connection_fields() ++ + emqx_bridge_influxdb_connector:fields("connector") ++ emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts), emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields); fields(Field) when @@ -219,19 +219,6 @@ fields(Type) when influxdb_bridge_common_fields() ++ connector_fields(Type). -connection_fields() -> - [ - emqx_bridge_influxdb_connector:server_field(), - {parameters, - mk( - hoconsc:union([ - ref(emqx_bridge_influxdb_connector, "connector_" ++ T) - || T <- ["influxdb_api_v1", "influxdb_api_v2"] - ]), - #{required => true, desc => ?DESC("influxdb_parameters")} - )} - ] ++ emqx_connector_schema_lib:ssl_fields(). - method_fields(post, ConnectorType) -> influxdb_bridge_common_fields() ++ connector_fields(ConnectorType) ++ diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 8e7ca8a73..2b406011f 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -40,7 +40,7 @@ -export([transform_bridge_v1_config_to_connector_config/1]). --export([precision_field/0, server_field/0]). +-export([precision_field/0]). %% only for test -export([is_unrecoverable_error/1]). @@ -232,28 +232,30 @@ namespace() -> connector_influxdb. roots() -> [ {config, #{ - type => hoconsc:union( - [ - hoconsc:ref(?MODULE, influxdb_api_v1), - hoconsc:ref(?MODULE, influxdb_api_v2) - ] - ) + type => hoconsc:ref(?MODULE, "connector") }} ]. -fields(common) -> +fields("connector") -> [ server_field(), - precision_field() - ]; + parameter_field() + ] ++ emqx_connector_schema_lib:ssl_fields(); fields("connector_influxdb_api_v1") -> [influxdb_type_field(influxdb_api_v1) | influxdb_api_v1_fields()]; fields("connector_influxdb_api_v2") -> [influxdb_type_field(influxdb_api_v2) | influxdb_api_v2_fields()]; +%% ============ begin: schema for old bridge configs ============ fields(influxdb_api_v1) -> - fields(common) ++ influxdb_api_v1_fields() ++ emqx_connector_schema_lib:ssl_fields(); + fields(common) ++ influxdb_api_v1_fields(); fields(influxdb_api_v2) -> - fields(common) ++ influxdb_api_v2_fields() ++ emqx_connector_schema_lib:ssl_fields(). + fields(common) ++ influxdb_api_v2_fields(); +fields(common) -> + [ + server_field(), + precision_field() + ] ++ emqx_connector_schema_lib:ssl_fields(). +%% ============ end: schema for old bridge configs ============ influxdb_type_field(Type) -> {influxdb_type, #{ @@ -262,6 +264,7 @@ influxdb_type_field(Type) -> default => Type, desc => ?DESC(atom_to_list(Type)) }}. + server_field() -> {server, server()}. @@ -275,6 +278,16 @@ precision_field() -> required => false, default => ms, desc => ?DESC("precision") })}. +parameter_field() -> + {parameters, + mk( + hoconsc:union([ + ref(?MODULE, "connector_" ++ T) + || T <- ["influxdb_api_v1", "influxdb_api_v2"] + ]), + #{required => true, desc => ?DESC("influxdb_parameters")} + )}. + influxdb_api_v1_fields() -> [ {database, mk(binary(), #{required => true, desc => ?DESC("database")})}, @@ -304,6 +317,8 @@ desc(influxdb_api_v1) -> ?DESC("influxdb_api_v1"); desc(influxdb_api_v2) -> ?DESC("influxdb_api_v2"); +desc("connector") -> + ?DESC("connector"); desc("connector_influxdb_api_v1") -> ?DESC("influxdb_api_v1"); desc("connector_influxdb_api_v2") -> @@ -411,8 +426,7 @@ client_config( {host, str(Host)}, {port, Port}, {pool_size, erlang:system_info(schedulers)}, - {pool, InstId}, - {precision, atom_to_binary(maps:get(precision, Config, ms), utf8)} + {pool, InstId} ] ++ protocol_config(Config). %% api v1 config diff --git a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl index 02bfa60fa..d79139f17 100644 --- a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl +++ b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl @@ -943,6 +943,7 @@ t_create_disconnected(Config) -> ?of_kind(influxdb_connector_start_failed, Trace), case Reason of econnrefused -> ok; + closed -> ok; {closed, _} -> ok; _ -> ct:fail("influxdb_client_not_alive with wrong reason: ~p", [Reason]) end, diff --git a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_connector_SUITE.erl b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_connector_SUITE.erl index 94e8e3fad..915662b24 100644 --- a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_connector_SUITE.erl +++ b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_connector_SUITE.erl @@ -66,7 +66,7 @@ t_lifecycle(Config) -> Port = ?config(influxdb_tcp_port, Config), perform_lifecycle_check( <<"emqx_bridge_influxdb_connector_SUITE">>, - influxdb_config(Host, Port, false, <<"verify_none">>) + influxdb_connector_config(Host, Port, false, <<"verify_none">>) ). perform_lifecycle_check(PoolName, InitialConfig) -> @@ -76,6 +76,7 @@ perform_lifecycle_check(PoolName, InitialConfig) -> % expects this FullConfig = CheckedConfig#{write_syntax => influxdb_write_syntax()}, {ok, #{ + id := ResourceId, state := #{client := #{pool := ReturnedPoolName}} = State, status := InitialStatus }} = emqx_resource:create_local( @@ -93,8 +94,18 @@ perform_lifecycle_check(PoolName, InitialConfig) -> }} = emqx_resource:get_instance(PoolName), ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), + %% install actions to the connector + ActionConfig = influxdb_action_config(), + ChannelId = <<"test_channel">>, + ?assertEqual( + ok, + emqx_resource_manager:add_channel( + ResourceId, ChannelId, ActionConfig + ) + ), + ?assertMatch(#{status := connected}, emqx_resource:channel_health_check(ResourceId, ChannelId)), % % Perform query as further check that the resource is working as expected - ?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query())), + ?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query(ChannelId))), ?assertEqual(ok, emqx_resource:stop(PoolName)), % Resource will be listed still, but state will be changed and healthcheck will fail % as the worker no longer exists. @@ -116,7 +127,15 @@ perform_lifecycle_check(PoolName, InitialConfig) -> {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = emqx_resource:get_instance(PoolName), ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), - ?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query())), + ChannelId = <<"test_channel">>, + ?assertEqual( + ok, + emqx_resource_manager:add_channel( + ResourceId, ChannelId, ActionConfig + ) + ), + ?assertMatch(#{status := connected}, emqx_resource:channel_health_check(ResourceId, ChannelId)), + ?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query(ChannelId))), % Stop and remove the resource in one go. ?assertEqual(ok, emqx_resource:remove_local(PoolName)), ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), @@ -127,7 +146,7 @@ t_tls_verify_none(Config) -> PoolName = <<"testpool-1">>, Host = ?config(influxdb_tls_host, Config), Port = ?config(influxdb_tls_port, Config), - InitialConfig = influxdb_config(Host, Port, true, <<"verify_none">>), + InitialConfig = influxdb_connector_config(Host, Port, true, <<"verify_none">>), ValidStatus = perform_tls_opts_check(PoolName, InitialConfig, valid), ?assertEqual(connected, ValidStatus), InvalidStatus = perform_tls_opts_check(PoolName, InitialConfig, fail), @@ -138,7 +157,7 @@ t_tls_verify_peer(Config) -> PoolName = <<"testpool-2">>, Host = ?config(influxdb_tls_host, Config), Port = ?config(influxdb_tls_port, Config), - InitialConfig = influxdb_config(Host, Port, true, <<"verify_peer">>), + InitialConfig = influxdb_connector_config(Host, Port, true, <<"verify_peer">>), %% This works without a CA-cert & friends since we are using a mock ValidStatus = perform_tls_opts_check(PoolName, InitialConfig, valid), ?assertEqual(connected, ValidStatus), @@ -191,19 +210,30 @@ perform_tls_opts_check(PoolName, InitialConfig, VerifyReturn) -> % %% Helpers % %%------------------------------------------------------------------------------ -influxdb_config(Host, Port, SslEnabled, Verify) -> +influxdb_connector_config(Host, Port, SslEnabled, Verify) -> Server = list_to_binary(io_lib:format("~s:~b", [Host, Port])), - ResourceConfig = #{ - <<"bucket">> => <<"mqtt">>, - <<"org">> => <<"emqx">>, - <<"token">> => <<"abcdefg">>, + ConnectorConf = #{ + <<"parameters">> => #{ + <<"influxdb_type">> => <<"influxdb_api_v2">>, + <<"bucket">> => <<"mqtt">>, + <<"org">> => <<"emqx">>, + <<"token">> => <<"abcdefg">> + }, <<"server">> => Server, <<"ssl">> => #{ <<"enable">> => SslEnabled, <<"verify">> => Verify } }, - #{<<"config">> => ResourceConfig}. + #{<<"config">> => ConnectorConf}. + +influxdb_action_config() -> + #{ + parameters => #{ + write_syntax => influxdb_write_syntax(), + precision => ms + } + }. custom_verify() -> fun @@ -227,8 +257,8 @@ influxdb_write_syntax() -> } ]. -test_query() -> - {send_message, #{ +test_query(ChannelId) -> + {ChannelId, #{ <<"clientid">> => <<"something">>, <<"payload">> => #{bool => true}, <<"topic">> => <<"connector_test">>, diff --git a/rel/i18n/emqx_bridge_influxdb.hocon b/rel/i18n/emqx_bridge_influxdb.hocon index 44226214f..412203cf9 100644 --- a/rel/i18n/emqx_bridge_influxdb.hocon +++ b/rel/i18n/emqx_bridge_influxdb.hocon @@ -52,6 +52,11 @@ action_parameters.label: action_parameters.desc: """Additional parameters specific to this action type""" +connector.label: +"""InfluxDB Connector""" +connector.desc: +"""InfluxDB Connector Configs""" + influxdb_action.label: """InfluxDB Action""" influxdb_action.desc: