From d5b62eead0152e5221bd667571ea7116987ddd14 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 14 Nov 2023 17:16:09 +0100 Subject: [PATCH 01/18] feat: split pgsql, matrix and timescale into connector action This commit splits the bridges pgsql, matrix and timescale into connector and action. Fixes: https://emqx.atlassian.net/browse/EMQX-11155 --- apps/emqx_bridge/src/emqx_action_info.erl | 6 +- apps/emqx_bridge/src/emqx_bridge.erl | 6 +- apps/emqx_bridge/src/emqx_bridge_v2.erl | 55 ++++- .../src/emqx_bridge_matrix.app.src | 2 +- .../src/emqx_bridge_matrix.erl | 53 ++++- .../src/emqx_bridge_matrix_action_info.erl | 22 ++ .../src/emqx_bridge_pgsql_action_info.erl | 22 ++ .../src/schema/emqx_bridge_pgsql_schema.erl | 172 +++++++++++++++ .../test/emqx_bridge_pgsql_SUITE.erl | 50 +++-- .../src/emqx_bridge_timescale.app.src | 2 +- .../src/emqx_bridge_timescale.erl | 53 ++++- .../src/emqx_bridge_timescale_action_info.erl | 22 ++ .../src/schema/emqx_connector_ee_schema.erl | 40 +++- .../src/schema/emqx_connector_schema.erl | 5 +- apps/emqx_postgresql/src/emqx_postgresql.erl | 178 ++++++++++++++-- .../emqx_postgresql_connector_schema.erl | 201 ++++++++++++++++++ 16 files changed, 832 insertions(+), 57 deletions(-) create mode 100644 apps/emqx_bridge_matrix/src/emqx_bridge_matrix_action_info.erl create mode 100644 apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl create mode 100644 apps/emqx_bridge_pgsql/src/schema/emqx_bridge_pgsql_schema.erl create mode 100644 apps/emqx_bridge_timescale/src/emqx_bridge_timescale_action_info.erl create mode 100644 apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 129142f24..74fd38811 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -78,7 +78,11 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_gcp_pubsub_producer_action_info, emqx_bridge_kafka_action_info, emqx_bridge_mongodb_action_info, - emqx_bridge_syskeeper_action_info + emqx_bridge_syskeeper_action_info, + emqx_bridge_syskeeper_action_info, + emqx_bridge_pgsql_action_info, + emqx_bridge_timescale_action_info, + emqx_bridge_matrix_action_info ]. -else. hard_coded_action_info_modules_ee() -> diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index f557210ed..9e14c0c9a 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -240,8 +240,8 @@ send_message(BridgeId, Message) -> {BridgeV1Type, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId), case emqx_bridge_v2:is_bridge_v2_type(BridgeV1Type) of true -> - BridgeV2Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), - emqx_bridge_v2:send_message(BridgeV2Type, BridgeName, Message, #{}); + ActionType = emqx_action_info:bridge_v1_type_to_action_type(BridgeV1Type), + emqx_bridge_v2:send_message(ActionType, BridgeName, Message, #{}); false -> ResId = emqx_bridge_resource:resource_id(BridgeV1Type, BridgeName), send_message(BridgeV1Type, BridgeName, ResId, Message, #{}) @@ -414,7 +414,7 @@ remove(BridgeType0, BridgeName) -> }), case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of true -> - emqx_bridge_v2:remove(BridgeType, BridgeName); + emqx_bridge_v2:bridge_v1_remove(BridgeType0, BridgeName); false -> remove_v1(BridgeType, BridgeName) end. diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 54ccf1b24..63874d67e 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -55,6 +55,7 @@ disable_enable/3, health_check/2, send_message/4, + query/4, start/2, reset_metrics/2, create_dry_run/2, @@ -116,7 +117,9 @@ bridge_v1_enable_disable/3, bridge_v1_restart/2, bridge_v1_stop/2, - bridge_v1_start/2 + bridge_v1_start/2, + %% For test cases only + bridge_v1_remove/2 ]). %%==================================================================== @@ -547,25 +550,25 @@ get_query_mode(BridgeV2Type, Config) -> ResourceType = emqx_connector_resource:connector_to_resource_type(ConnectorType), emqx_resource:query_mode(ResourceType, Config, CreationOpts). --spec send_message(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) -> +-spec query(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) -> term() | {error, term()}. -send_message(BridgeType, BridgeName, Message, QueryOpts0) -> +query(BridgeType, BridgeName, Message, QueryOpts0) -> case lookup_conf(BridgeType, BridgeName) of #{enable := true} = Config0 -> Config = combine_connector_and_bridge_v2_config(BridgeType, BridgeName, Config0), - do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config); + do_query_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config); #{enable := false} -> {error, bridge_stopped}; _Error -> {error, bridge_not_found} end. -do_send_msg_with_enabled_config( +do_query_with_enabled_config( _BridgeType, _BridgeName, _Message, _QueryOpts0, {error, Reason} = Error ) -> ?SLOG(error, Reason), Error; -do_send_msg_with_enabled_config( +do_query_with_enabled_config( BridgeType, BridgeName, Message, QueryOpts0, Config ) -> QueryMode = get_query_mode(BridgeType, Config), @@ -579,7 +582,17 @@ do_send_msg_with_enabled_config( } ), BridgeV2Id = id(BridgeType, BridgeName), - emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts). + case Message of + {send_message, Msg} -> + emqx_resource:query(BridgeV2Id, {BridgeV2Id, Msg}, QueryOpts); + Msg -> + emqx_resource:query(BridgeV2Id, Msg, QueryOpts) + end. + +-spec send_message(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) -> + term() | {error, term()}. +send_message(BridgeType, BridgeName, Message, QueryOpts0) -> + query(BridgeType, BridgeName, {send_message, Message}, QueryOpts0). -spec health_check(BridgeType :: term(), BridgeName :: term()) -> #{status := emqx_resource:resource_status(), error := term()} | {error, Reason :: term()}. @@ -1325,6 +1338,34 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) -> } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf), create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf). +%% Only called by test cases (may create broken references) +bridge_v1_remove(BridgeV1Type, BridgeName) -> + ActionType = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), + bridge_v1_remove( + ActionType, + BridgeName, + lookup_conf(ActionType, BridgeName) + ). + +bridge_v1_remove( + ActionType, + Name, + #{connector := ConnectorName} +) -> + case remove(ActionType, Name) of + ok -> + ConnectorType = connector_type(ActionType), + emqx_connector:remove(ConnectorType, ConnectorName); + Error -> + Error + end; +bridge_v1_remove( + _ActionType, + _Name, + Error +) -> + Error. + bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) -> BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), bridge_v1_check_deps_and_remove( diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src index 14aca1f75..e2a63a01e 100644 --- a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src +++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_matrix, [ {description, "EMQX Enterprise MatrixDB Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl index abd98adb6..acfd86ded 100644 --- a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl +++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl @@ -14,6 +14,12 @@ desc/1 ]). +%% Examples +-export([ + bridge_v2_examples/1, + connector_examples/1 +]). + %% ------------------------------------------------------------------------------------------------- %% api @@ -22,7 +28,7 @@ conn_bridge_examples(Method) -> #{ <<"matrix">> => #{ summary => <<"Matrix Bridge">>, - value => emqx_bridge_pgsql:values(Method, matrix) + value => emqx_bridge_pgsql_schema:values_conn_bridge_examples(Method, matrix) } } ]. @@ -35,8 +41,53 @@ roots() -> []. fields("post") -> emqx_bridge_pgsql:fields("post", matrix); +fields("config_connector") -> + emqx_bridge_pgsql_schema:fields("config_connector"); +fields(action) -> + {matrix, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql_schema, pgsql_action)), + #{ + desc => <<"Matrix Action Config">>, + required => false + } + )}; +fields("put_bridge_v2") -> + emqx_bridge_pgsql_schema:fields(pgsql_action); +fields("get_bridge_v2") -> + emqx_bridge_pgsql_schema:fields(pgsql_action); +fields("post_bridge_v2") -> + emqx_bridge_pgsql_schema:fields(pgsql_action); +fields("put_connector") -> + emqx_bridge_pgsql_schema:fields("config_connector"); +fields("get_connector") -> + emqx_bridge_pgsql_schema:fields("config_connector"); +fields("post_connector") -> + emqx_bridge_pgsql_schema:fields("config_connector"); fields(Method) -> emqx_bridge_pgsql:fields(Method). desc(_) -> undefined. + +%% Examples + +connector_examples(Method) -> + [ + #{ + <<"matrix">> => #{ + summary => <<"Matrix Connector">>, + value => emqx_postgresql_connector_schema:values({Method, connector}) + } + } + ]. + +bridge_v2_examples(Method) -> + [ + #{ + <<"matrix">> => #{ + summary => <<"Matrix Action">>, + value => emqx_bridge_pgsql_schema:values({Method, matrix}) + } + } + ]. diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix_action_info.erl b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix_action_info.erl new file mode 100644 index 000000000..4eae13415 --- /dev/null +++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix_action_info.erl @@ -0,0 +1,22 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_matrix_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +bridge_v1_type_name() -> matrix. + +action_type_name() -> matrix. + +connector_type_name() -> matrix. + +schema_module() -> emqx_bridge_matrix. diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl new file mode 100644 index 000000000..e353eb440 --- /dev/null +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl @@ -0,0 +1,22 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_pgsql_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +bridge_v1_type_name() -> pgsql. + +action_type_name() -> pgsql. + +connector_type_name() -> pgsql. + +schema_module() -> emqx_bridge_pgsql_schema. diff --git a/apps/emqx_bridge_pgsql/src/schema/emqx_bridge_pgsql_schema.erl b/apps/emqx_bridge_pgsql/src/schema/emqx_bridge_pgsql_schema.erl new file mode 100644 index 000000000..a3ad4bb11 --- /dev/null +++ b/apps/emqx_bridge_pgsql/src/schema/emqx_bridge_pgsql_schema.erl @@ -0,0 +1,172 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_pgsql_schema). + +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("emqx_postgresql/include/emqx_postgresql.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("epgsql/include/epgsql.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-export([roots/0, fields/1]). + +%% Examples +-export([ + bridge_v2_examples/1, + conn_bridge_examples/1 +]). + +%% Exported for timescale and matrix bridges +-export([ + values/1, + values_conn_bridge_examples/2 +]). + +-define(PGSQL_HOST_OPTIONS, #{ + default_port => ?PGSQL_DEFAULT_PORT +}). + +roots() -> + [{config, #{type => hoconsc:ref(?MODULE, config)}}]. + +fields("config_connector") -> + emqx_postgresql_connector_schema:fields("config_connector"); +fields(config) -> + fields("config_connector") ++ + fields(action); +fields(action) -> + {pgsql, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql_schema, pgsql_action)), + #{ + desc => <<"PostgreSQL Action Config">>, + required => false + } + )}; +fields(action_parameters) -> + [ + {sql, + hoconsc:mk( + binary(), + #{desc => ?DESC("sql_template"), default => default_sql(), format => <<"sql">>} + )} + ] ++ + emqx_connector_schema_lib:prepare_statement_fields(); +fields(pgsql_action) -> + emqx_bridge_v2_schema:make_producer_action_schema(hoconsc:ref(?MODULE, action_parameters)); +%% TODO: All of these needs to be fixed +fields("put_bridge_v2") -> + fields(pgsql_action); +fields("get_bridge_v2") -> + fields(pgsql_action); +fields("post_bridge_v2") -> + fields(pgsql_action). + +default_sql() -> + << + "insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) " + "values (${id}, ${topic}, ${qos}, ${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))" + >>. + +%% Examples + +bridge_v2_examples(Method) -> + [ + #{ + <<"pgsql">> => #{ + summary => <<"PostgreSQL Producer Action">>, + value => values({Method, bridge_v2_producer}) + } + } + ]. + +conn_bridge_examples(Method) -> + [ + #{ + <<"pgsql">> => #{ + summary => <<"PostgreSQL Producer Bridge">>, + value => values_conn_bridge_examples(Method, pgsql) + } + } + ]. + +values({get, PostgreSQLType}) -> + maps:merge( + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + }, + values({post, PostgreSQLType}) + ); +values({post, PostgreSQLType}) -> + maps:merge( + #{ + name => <<"my_pgsql_action">>, + type => PostgreSQLType + }, + values({put, PostgreSQLType}) + ); +values({put, PostgreSQLType}) -> + maps:merge( + #{ + enable => true, + connector => <<"my_pgsql_connector">>, + resource_opts => #{ + health_check_interval => "32s" + } + }, + values({producer, PostgreSQLType}) + ); +values({producer, _PostgreSQLType}) -> + #{ + <<"enable">> => true, + <<"connector">> => <<"connector_pgsql_test">>, + <<"parameters">> => #{ + <<"sql">> => + <<"INSERT INTO client_events(clientid, event, created_at) VALUES (\n ${clientid},\n ${event},\n TO_TIMESTAMP((${timestamp} :: bigint))\n)">> + }, + <<"resource_opts">> => #{ + <<"batch_size">> => 1, + <<"batch_time">> => <<"0ms">>, + <<"health_check_interval">> => <<"15s">>, + <<"inflight_window">> => 100, + <<"max_buffer_bytes">> => <<"256MB">>, + <<"query_mode">> => <<"async">>, + <<"request_ttl">> => <<"45s">>, + <<"start_after_created">> => true, + <<"start_timeout">> => <<"5s">>, + <<"worker_pool_size">> => 16 + } + }. + +values_conn_bridge_examples(_Method, Type) -> + #{ + enable => true, + type => Type, + name => <<"foo">>, + server => <<"127.0.0.1:5432">>, + database => <<"mqtt">>, + pool_size => 8, + username => <<"root">>, + password => <<"******">>, + sql => default_sql(), + local_topic => <<"local/topic/#">>, + resource_opts => #{ + worker_pool_size => 8, + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + batch_size => ?DEFAULT_BATCH_SIZE, + batch_time => ?DEFAULT_BATCH_TIME, + query_mode => async, + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES + } + }. diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index 722489ba6..58aaa7d71 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -114,7 +114,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]), + ok = emqx_common_test_helpers:stop_apps([emqx, emqx_postgresql, emqx_conf, emqx_bridge]), ok. init_per_testcase(_Testcase, Config) -> @@ -147,7 +147,7 @@ common_init(Config0) -> ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), % Ensure enterprise bridge module is loaded - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + ok = emqx_common_test_helpers:start_apps([emqx, emqx_postgresql, emqx_conf, emqx_bridge]), _ = emqx_bridge_enterprise:module_info(), emqx_mgmt_api_test_util:init_suite(), % Connect to pgsql directly and create the table @@ -259,17 +259,16 @@ send_message(Config, Payload) -> BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name), emqx_bridge:send_message(BridgeID, Payload). -query_resource(Config, Request) -> +query_resource(Config, Msg = _Request) -> Name = ?config(pgsql_name, Config), BridgeType = ?config(pgsql_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - emqx_resource:query(ResourceID, Request, #{timeout => 1_000}). + emqx_bridge_v2:query(BridgeType, Name, Msg, #{timeout => 1_000}). query_resource_sync(Config, Request) -> Name = ?config(pgsql_name, Config), BridgeType = ?config(pgsql_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request). + ActionId = emqx_bridge_v2:id(BridgeType, Name), + emqx_resource_buffer_worker:simple_sync_query(ActionId, Request). query_resource_async(Config, Request) -> query_resource_async(Config, Request, _Opts = #{}). @@ -279,9 +278,8 @@ query_resource_async(Config, Request, Opts) -> BridgeType = ?config(pgsql_bridge_type, Config), Ref = alias([reply]), AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end, - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), Timeout = maps:get(timeout, Opts, 500), - Return = emqx_resource:query(ResourceID, Request, #{ + Return = emqx_bridge_v2:query(BridgeType, Name, Request, #{ timeout => Timeout, async_reply_fun => {AsyncReplyFun, []} }), @@ -441,13 +439,12 @@ t_get_status(Config) -> Name = ?config(pgsql_name, Config), BridgeType = ?config(pgsql_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)), + ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)), emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> ?assertMatch( - {ok, Status} when Status =:= disconnected orelse Status =:= connecting, - emqx_resource_manager:health_check(ResourceID) + #{status := Status} when Status =:= disconnected orelse Status =:= connecting, + emqx_bridge_v2:health_check(BridgeType, Name) ) end), ok. @@ -655,7 +652,7 @@ t_nasty_sql_string(Config) -> t_missing_table(Config) -> Name = ?config(pgsql_name, Config), BridgeType = ?config(pgsql_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + % ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ?check_trace( begin @@ -665,21 +662,20 @@ t_missing_table(Config) -> _Sleep = 1_000, _Attempts = 20, ?assertMatch( - {ok, Status} when Status == connecting orelse Status == disconnected, - emqx_resource_manager:health_check(ResourceID) + #{status := Status} when Status == connecting orelse Status == disconnected, + emqx_bridge_v2:health_check(BridgeType, Name) ) ), Val = integer_to_binary(erlang:unique_integer()), SentData = #{payload => Val, timestamp => 1668602148000}, - Timeout = 1000, ?assertMatch( {error, {resource_error, #{reason := unhealthy_target}}}, - query_resource(Config, {send_message, SentData, [], Timeout}) + query_resource(Config, {send_message, SentData}) ), ok end, fun(Trace) -> - ?assertMatch([_], ?of_kind(pgsql_undefined_table, Trace)), + ?assertMatch([_ | _], ?of_kind(pgsql_undefined_table, Trace)), ok end ), @@ -689,7 +685,7 @@ t_missing_table(Config) -> t_table_removed(Config) -> Name = ?config(pgsql_name, Config), BridgeType = ?config(pgsql_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + %%ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ?check_trace( begin connect_and_create_table(Config), @@ -697,13 +693,14 @@ t_table_removed(Config) -> ?retry( _Sleep = 1_000, _Attempts = 20, - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) + ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)) ), connect_and_drop_table(Config), Val = integer_to_binary(erlang:unique_integer()), SentData = #{payload => Val, timestamp => 1668602148000}, - case query_resource_sync(Config, {send_message, SentData, []}) of - {error, {unrecoverable_error, {error, error, <<"42P01">>, undefined_table, _, _}}} -> + ActionId = emqx_bridge_v2:id(BridgeType, Name), + case query_resource_sync(Config, {ActionId, SentData}) of + {error, {unrecoverable_error, _}} -> ok; ?RESOURCE_ERROR_M(not_connected, _) -> ok; @@ -720,7 +717,6 @@ t_table_removed(Config) -> t_concurrent_health_checks(Config) -> Name = ?config(pgsql_name, Config), BridgeType = ?config(pgsql_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ?check_trace( begin connect_and_create_table(Config), @@ -728,11 +724,13 @@ t_concurrent_health_checks(Config) -> ?retry( _Sleep = 1_000, _Attempts = 20, - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) + ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)) ), emqx_utils:pmap( fun(_) -> - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) + ?assertMatch( + #{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name) + ) end, lists:seq(1, 20) ), diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src index adb024591..4842f5c93 100644 --- a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src +++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_timescale, [ {description, "EMQX Enterprise TimescaleDB Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource]}, {env, []}, diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl index c4dedf07c..edeef26d4 100644 --- a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl +++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl @@ -14,6 +14,12 @@ desc/1 ]). +%% Examples +-export([ + bridge_v2_examples/1, + connector_examples/1 +]). + %% ------------------------------------------------------------------------------------------------- %% api @@ -22,7 +28,7 @@ conn_bridge_examples(Method) -> #{ <<"timescale">> => #{ summary => <<"Timescale Bridge">>, - value => emqx_bridge_pgsql:values(Method, timescale) + value => emqx_bridge_pgsql_schema:values_conn_bridge_examples(Method, timescale) } } ]. @@ -35,8 +41,53 @@ roots() -> []. fields("post") -> emqx_bridge_pgsql:fields("post", timescale); +fields("config_connector") -> + emqx_bridge_pgsql_schema:fields("config_connector"); +fields(action) -> + {timescale, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql_schema, pgsql_action)), + #{ + desc => <<"Timescale Action Config">>, + required => false + } + )}; +fields("put_bridge_v2") -> + emqx_bridge_pgsql_schema:fields(pgsql_action); +fields("get_bridge_v2") -> + emqx_bridge_pgsql_schema:fields(pgsql_action); +fields("post_bridge_v2") -> + emqx_bridge_pgsql_schema:fields(pgsql_action); +fields("put_connector") -> + emqx_bridge_pgsql_schema:fields("config_connector"); +fields("get_connector") -> + emqx_bridge_pgsql_schema:fields("config_connector"); +fields("post_connector") -> + emqx_bridge_pgsql_schema:fields("config_connector"); fields(Method) -> emqx_bridge_pgsql:fields(Method). desc(_) -> undefined. + +%% Examples + +connector_examples(Method) -> + [ + #{ + <<"timescale">> => #{ + summary => <<"Timescale Connector">>, + value => emqx_postgresql_connector_schema:values({Method, connector}) + } + } + ]. + +bridge_v2_examples(Method) -> + [ + #{ + <<"timescale">> => #{ + summary => <<"Timescale Action">>, + value => emqx_bridge_pgsql_schema:values({Method, timescale}) + } + } + ]. diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale_action_info.erl b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale_action_info.erl new file mode 100644 index 000000000..fff74b578 --- /dev/null +++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale_action_info.erl @@ -0,0 +1,22 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_timescale_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +bridge_v1_type_name() -> timescale. + +action_type_name() -> timescale. + +connector_type_name() -> timescale. + +schema_module() -> emqx_bridge_timescale. diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 535917e4e..1ffe306e9 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -35,6 +35,12 @@ resource_type(syskeeper_forwarder) -> emqx_bridge_syskeeper_connector; resource_type(syskeeper_proxy) -> emqx_bridge_syskeeper_proxy_server; +resource_type(pgsql) -> + emqx_postgresql; +resource_type(timescale) -> + emqx_postgresql; +resource_type(matrix) -> + emqx_postgresql; resource_type(Type) -> error({unknown_connector_type, Type}). @@ -108,6 +114,30 @@ connector_structs() -> desc => <<"Syskeeper Proxy Connector Config">>, required => false } + )}, + {pgsql, + mk( + hoconsc:map(name, ref(emqx_postgresql_connector_schema, "config_connector")), + #{ + desc => <<"PostgreSQL Connector Config">>, + required => false + } + )}, + {timescale, + mk( + hoconsc:map(name, ref(emqx_bridge_timescale, "config_connector")), + #{ + desc => <<"Timescale Connector Config">>, + required => false + } + )}, + {matrix, + mk( + hoconsc:map(name, ref(emqx_bridge_matrix, "config_connector")), + #{ + desc => <<"Matrix Connector Config">>, + required => false + } )} ]. @@ -131,7 +161,10 @@ schema_modules() -> emqx_bridge_kafka, emqx_bridge_mongodb, emqx_bridge_syskeeper_connector, - emqx_bridge_syskeeper_proxy + emqx_bridge_syskeeper_proxy, + emqx_postgresql_connector_schema, + emqx_bridge_timescale, + emqx_bridge_matrix ]. api_schemas(Method) -> @@ -152,7 +185,10 @@ api_schemas(Method) -> api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"), api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"), api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method), - api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method) + api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method), + api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"), + api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"), + api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector") ]. api_ref(Module, Type, Method) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 765a693e2..cd99f0fe6 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -72,7 +72,10 @@ connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_p connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single]; connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder]; -connector_type_to_bridge_types(syskeeper_proxy) -> []. +connector_type_to_bridge_types(syskeeper_proxy) -> []; +connector_type_to_bridge_types(pgsql) -> [pgsql]; +connector_type_to_bridge_types(timescale) -> [timescale]; +connector_type_to_bridge_types(matrix) -> [matrix]. actions_config_name() -> <<"actions">>. diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index ba1ad4be5..68fa1da00 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -34,7 +34,11 @@ on_stop/2, on_query/3, on_batch_query/3, - on_get_status/2 + on_get_status/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). -export([connect/1]). @@ -136,10 +140,11 @@ on_start( {auto_reconnect, ?AUTO_RECONNECT_INTERVAL}, {pool_size, PoolSize} ], - State = parse_prepare_sql(Config), + State1 = parse_prepare_sql(Config, <<"send_message">>), + State2 = State1#{installed_channels => #{}}, case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of ok -> - {ok, init_prepare(State#{pool_name => InstId, prepares => #{}})}; + {ok, init_prepare(State2#{pool_name => InstId, prepares => #{}})}; {error, Reason} -> ?tp( pgsql_connector_start_failed, @@ -148,13 +153,140 @@ on_start( {error, Reason} end. -on_stop(InstId, _State) -> +on_stop(InstId, State) -> ?SLOG(info, #{ msg => "stopping_postgresql_connector", connector => InstId }), + close_connections(State), emqx_resource_pool:stop(InstId). +close_connections(#{pool_name := PoolName} = _State) -> + WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], + close_connections_with_worker_pids(WorkerPids), + ok. + +close_connections_with_worker_pids([WorkerPid | Rest]) -> + %% We ignore errors since any error probably means that the + %% connection is closed already. + try ecpool_worker:client(WorkerPid) of + {ok, Conn} -> + _ = epgsql:close(Conn), + close_connections_with_worker_pids(Rest); + _ -> + close_connections_with_worker_pids(Rest) + catch + _:_ -> + close_connections_with_worker_pids(Rest) + end; +close_connections_with_worker_pids([]) -> + ok. + +on_add_channel( + _InstId, + #{ + installed_channels := InstalledChannels + } = OldState, + ChannelId, + ChannelConfig +) -> + %% The following will throw an exception if the bridge producers fails to start + {ok, ChannelState} = create_channel_state(ChannelId, OldState, ChannelConfig), + case ChannelState of + #{prepares := {error, Reason}} -> + {error, {unhealthy_target, Reason}}; + _ -> + NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState} + end. + +create_channel_state( + ChannelId, + #{pool_name := PoolName} = _ConnectorState, + #{parameters := Parameters} = _ChannelConfig +) -> + State1 = parse_prepare_sql(Parameters, ChannelId), + {ok, + init_prepare(State1#{ + pool_name => PoolName, + prepare_statement => #{} + })}. + +on_remove_channel( + _InstId, + #{ + installed_channels := InstalledChannels + } = OldState, + ChannelId +) -> + %% Close prepared statements + ok = close_prepared_statement(ChannelId, OldState), + NewInstalledChannels = maps:remove(ChannelId, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState}. + +close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) -> + WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], + close_prepared_statement(WorkerPids, ChannelId, State), + ok. + +close_prepared_statement([WorkerPid | Rest], ChannelId, State) -> + %% We ignore errors since any error probably means that the + %% prepared statement doesn't exist. + try ecpool_worker:client(WorkerPid) of + {ok, Conn} -> + Statement = get_prepared_statement(ChannelId, State), + _ = epgsql:close(Conn, Statement), + close_prepared_statement(Rest, ChannelId, State); + _ -> + close_prepared_statement(Rest, ChannelId, State) + catch + _:_ -> + close_prepared_statement(Rest, ChannelId, State) + end; +close_prepared_statement([], _ChannelId, _State) -> + ok. + +on_get_channel_status( + _ResId, + ChannelId, + #{ + pool_name := PoolName, + installed_channels := Channels + } = _State +) -> + ChannelState = maps:get(ChannelId, Channels), + case + do_check_channel_sql( + PoolName, + ChannelId, + ChannelState + ) + of + ok -> + connected; + {error, undefined_table} -> + {error, {unhealthy_target, <<"Table does not exist">>}}; + {error, _Reason} -> + %% do not log error, it is logged in prepare_sql_to_conn + connecting + end. + +do_check_channel_sql( + PoolName, + ChannelId, + #{query_templates := ChannelQueryTemplates} = _ChannelState +) -> + {SQL, _RowTemplate} = maps:get(ChannelId, ChannelQueryTemplates), + WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], + validate_table_existence(WorkerPids, SQL). + +on_get_channels(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId). + on_query(InstId, {TypeOrKey, NameOrSQL}, State) -> on_query(InstId, {TypeOrKey, NameOrSQL, []}, State); on_query( @@ -187,10 +319,10 @@ pgsql_query_type(_) -> on_batch_query( InstId, [{Key, _} = Request | _] = BatchReq, - #{pool_name := PoolName, query_templates := Templates, prepares := PrepStatements} = State + #{pool_name := PoolName} = State ) -> BinKey = to_bin(Key), - case maps:get(BinKey, Templates, undefined) of + case get_template(BinKey, State) of undefined -> Log = #{ connector => InstId, @@ -201,7 +333,7 @@ on_batch_query( ?SLOG(error, Log), {error, {unrecoverable_error, batch_prepare_not_implemented}}; {_Statement, RowTemplate} -> - PrepStatement = maps:get(BinKey, PrepStatements), + PrepStatement = get_prepared_statement(BinKey, State), Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq], case on_sql_query(InstId, PoolName, execute_batch, PrepStatement, Rows) of {error, _Error} = Result -> @@ -223,15 +355,35 @@ proc_sql_params(query, SQLOrKey, Params, _State) -> {SQLOrKey, Params}; proc_sql_params(prepared_query, SQLOrKey, Params, _State) -> {SQLOrKey, Params}; -proc_sql_params(TypeOrKey, SQLOrData, Params, #{query_templates := Templates}) -> - Key = to_bin(TypeOrKey), - case maps:get(Key, Templates, undefined) of +proc_sql_params(TypeOrKey, SQLOrData, Params, State) -> + BinKey = to_bin(TypeOrKey), + case get_template(BinKey, State) of undefined -> {SQLOrData, Params}; {_Statement, RowTemplate} -> - {Key, render_prepare_sql_row(RowTemplate, SQLOrData)} + {BinKey, render_prepare_sql_row(RowTemplate, SQLOrData)} end. +get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) -> + BinKey = to_bin(Key), + ChannelState = maps:get(BinKey, Channels), + ChannelQueryTemplates = maps:get(query_templates, ChannelState), + maps:get(BinKey, ChannelQueryTemplates); +get_template(Key, #{query_templates := Templates}) -> + BinKey = to_bin(Key), + maps:get(BinKey, Templates, undefined). + +get_prepared_statement(Key, #{installed_channels := Channels} = _State) when + is_map_key(Key, Channels) +-> + BinKey = to_bin(Key), + ChannelState = maps:get(BinKey, Channels), + ChannelPreparedStatements = maps:get(prepares, ChannelState), + maps:get(BinKey, ChannelPreparedStatements); +get_prepared_statement(Key, #{prepares := PrepStatements}) -> + BinKey = to_bin(Key), + maps:get(BinKey, PrepStatements). + on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) -> try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of {error, Reason} = Result -> @@ -415,13 +567,13 @@ conn_opts([Opt = {ssl_opts, _} | Opts], Acc) -> conn_opts([_Opt | Opts], Acc) -> conn_opts(Opts, Acc). -parse_prepare_sql(Config) -> +parse_prepare_sql(Config, SQLID) -> Queries = case Config of #{prepare_statement := Qs} -> Qs; #{sql := Query} -> - #{<<"send_message">> => Query}; + #{SQLID => Query}; #{} -> #{} end, diff --git a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl new file mode 100644 index 000000000..d709f87c3 --- /dev/null +++ b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl @@ -0,0 +1,201 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_postgresql_connector_schema). + +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_postgresql/include/emqx_postgresql.hrl"). + +-define(PGSQL_HOST_OPTIONS, #{ + default_port => ?PGSQL_DEFAULT_PORT +}). + +-export([ + roots/0, + fields/1 +]). + +%% Examples +-export([ + connector_examples/1, + values/1 +]). + +roots() -> + [{config, #{type => hoconsc:ref(?MODULE, config)}}]. + +fields("config_connector") -> + [{server, server()}] ++ + adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++ + emqx_connector_schema_lib:ssl_fields(); +fields(config) -> + fields("config_connector") ++ + fields(action); +fields(action) -> + {pgsql, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql_schema, pgsql_action)), + #{ + desc => <<"PostgreSQL Action Config">>, + required => false + } + )}; +fields(pgsql_action) -> + emqx_bridge_v2_schema:make_producer_action_schema(hoconsc:ref(?MODULE, action_parameters)); +%% TODO: All of these needs to be fixed +fields("put_bridge_v2") -> + fields(pgsql_action); +fields("get_bridge_v2") -> + fields(pgsql_action); +fields("post_bridge_v2") -> + fields(pgsql_action); +fields("put_connector") -> + fields("config_connector"); +fields("get_connector") -> + fields("config_connector"); +fields("post_connector") -> + fields("config_connector"). + +server() -> + Meta = #{desc => ?DESC("server")}, + emqx_schema:servers_sc(Meta, ?PGSQL_HOST_OPTIONS). + +adjust_fields(Fields) -> + lists:map( + fun + ({username, Sc}) -> + %% to please dialyzer... + Override = #{type => hocon_schema:field_schema(Sc, type), required => true}, + {username, hocon_schema:override(Sc, Override)}; + (Field) -> + Field + end, + Fields + ). + +%% Examples +connector_examples(Method) -> + [ + #{ + <<"pgsql">> => #{ + summary => <<"PostgreSQL Producer Connector">>, + value => values({Method, connector}) + } + } + ]. + +%% TODO: All of these needs to be adjusted from Kafka to PostgreSQL +values({get, PostgreSQLType}) -> + maps:merge( + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + }, + values({post, PostgreSQLType}) + ); +values({post, connector}) -> + maps:merge( + #{ + name => <<"my_pgsql_connector">>, + type => <<"pgsql">> + }, + values(common_config) + ); +values({post, PostgreSQLType}) -> + maps:merge( + #{ + name => <<"my_pgsql_action">>, + type => <<"pgsql">> + }, + values({put, PostgreSQLType}) + ); +values({put, bridge_v2_producer}) -> + values(bridge_v2_producer); +values({put, connector}) -> + values(common_config); +values({put, PostgreSQLType}) -> + maps:merge(values(common_config), values(PostgreSQLType)); +values(bridge_v2_producer) -> + maps:merge( + #{ + enable => true, + connector => <<"my_pgsql_connector">>, + resource_opts => #{ + health_check_interval => "32s" + } + }, + values(producer) + ); +values(common_config) -> + #{ + authentication => #{ + mechanism => <<"plain">>, + username => <<"username">>, + password => <<"******">> + }, + bootstrap_hosts => <<"localhost:9092">>, + connect_timeout => <<"5s">>, + enable => true, + metadata_request_timeout => <<"4s">>, + min_metadata_refresh_interval => <<"3s">>, + socket_opts => #{ + sndbuf => <<"1024KB">>, + recbuf => <<"1024KB">>, + nodelay => true, + tcp_keepalive => <<"none">> + } + }; +values(producer) -> + #{ + kafka => #{ + topic => <<"kafka-topic">>, + message => #{ + key => <<"${.clientid}">>, + value => <<"${.}">>, + timestamp => <<"${.timestamp}">> + }, + max_batch_bytes => <<"896KB">>, + compression => <<"no_compression">>, + partition_strategy => <<"random">>, + required_acks => <<"all_isr">>, + partition_count_refresh_interval => <<"60s">>, + kafka_headers => <<"${pub_props}">>, + kafka_ext_headers => [ + #{ + kafka_ext_header_key => <<"clientid">>, + kafka_ext_header_value => <<"${clientid}">> + }, + #{ + kafka_ext_header_key => <<"topic">>, + kafka_ext_header_value => <<"${topic}">> + } + ], + kafka_header_value_encode_mode => none, + max_inflight => 10, + buffer => #{ + mode => <<"hybrid">>, + per_partition_limit => <<"2GB">>, + segment_bytes => <<"100MB">>, + memory_overload_protection => true + } + }, + local_topic => <<"mqtt/local/topic">> + }. From dbe73c70b12120ab3c68dbdfcda7b98760206598 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 23 Nov 2023 17:29:57 +0100 Subject: [PATCH 02/18] fix: dialyzer problem --- apps/emqx_postgresql/src/emqx_postgresql.erl | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index 68fa1da00..ce62fa30b 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -269,10 +269,7 @@ on_get_channel_status( ok -> connected; {error, undefined_table} -> - {error, {unhealthy_target, <<"Table does not exist">>}}; - {error, _Reason} -> - %% do not log error, it is logged in prepare_sql_to_conn - connecting + {error, {unhealthy_target, <<"Table does not exist">>}} end. do_check_channel_sql( From 2e3028a8f8800d3cacd67a8055eddd28f98a9fa6 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 23 Nov 2023 17:41:28 +0100 Subject: [PATCH 03/18] fix(emqx_postgresql): fix lifecycle test --- apps/emqx_postgresql/test/emqx_postgresql_SUITE.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/emqx_postgresql/test/emqx_postgresql_SUITE.erl b/apps/emqx_postgresql/test/emqx_postgresql_SUITE.erl index 5a93a0578..1b41e2dd0 100644 --- a/apps/emqx_postgresql/test/emqx_postgresql_SUITE.erl +++ b/apps/emqx_postgresql/test/emqx_postgresql_SUITE.erl @@ -61,13 +61,15 @@ end_per_testcase(_, _Config) -> t_lifecycle(_Config) -> perform_lifecycle_check( - <<"emqx_postgresql_SUITE">>, + <<"connector:pgsql:emqx_postgresql_SUITE">>, pgsql_config() ). perform_lifecycle_check(ResourceId, InitialConfig) -> + x:show(initial_config, InitialConfig), {ok, #{config := CheckedConfig}} = emqx_resource:check_config(?PGSQL_RESOURCE_MOD, InitialConfig), + x:show(check_config_ok, CheckedConfig), {ok, #{ state := #{pool_name := PoolName} = State, status := InitialStatus From b8f510d956469775d2278646348d3488949b8e10 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 23 Nov 2023 18:02:54 +0100 Subject: [PATCH 04/18] fix: add env variables about action info --- apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src | 2 +- apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src | 2 +- apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src index e2a63a01e..9175e5d4e 100644 --- a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src +++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src @@ -7,7 +7,7 @@ stdlib, emqx_resource ]}, - {env, []}, + {env, [{emqx_action_info_module, emqx_bridge_matrix_action_info}]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src index 7a17652e0..58d093f6f 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src @@ -8,7 +8,7 @@ emqx_resource, emqx_postgresql ]}, - {env, []}, + {env, [{emqx_action_info_module, emqx_bridge_pgsql_action_info}]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src index 4842f5c93..53302a21f 100644 --- a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src +++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src @@ -3,6 +3,7 @@ {vsn, "0.1.3"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource]}, + {env, [{emqx_action_info_module, emqx_bridge_timescale_action_info}]}, {env, []}, {modules, []}, {links, []} From 64c015cf6fe9a49e81334846b0d85f2085687eb8 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 23 Nov 2023 18:50:19 +0100 Subject: [PATCH 05/18] fix: unify the schema modules in emqx_bridge_pgsql --- .../src/emqx_bridge_matrix.erl | 20 +- .../src/emqx_bridge_pgsql.erl | 229 +++++++++++++----- .../src/emqx_bridge_pgsql_action_info.erl | 2 +- .../src/schema/emqx_bridge_pgsql_schema.erl | 172 ------------- .../src/emqx_bridge_timescale.erl | 20 +- .../emqx_postgresql_connector_schema.erl | 2 +- 6 files changed, 193 insertions(+), 252 deletions(-) delete mode 100644 apps/emqx_bridge_pgsql/src/schema/emqx_bridge_pgsql_schema.erl diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl index acfd86ded..78810bc9e 100644 --- a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl +++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl @@ -28,7 +28,7 @@ conn_bridge_examples(Method) -> #{ <<"matrix">> => #{ summary => <<"Matrix Bridge">>, - value => emqx_bridge_pgsql_schema:values_conn_bridge_examples(Method, matrix) + value => emqx_bridge_pgsql:values_conn_bridge_examples(Method, matrix) } } ]. @@ -42,28 +42,28 @@ roots() -> []. fields("post") -> emqx_bridge_pgsql:fields("post", matrix); fields("config_connector") -> - emqx_bridge_pgsql_schema:fields("config_connector"); + emqx_bridge_pgsql:fields("config_connector"); fields(action) -> {matrix, hoconsc:mk( - hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql_schema, pgsql_action)), + hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql, pgsql_action)), #{ desc => <<"Matrix Action Config">>, required => false } )}; fields("put_bridge_v2") -> - emqx_bridge_pgsql_schema:fields(pgsql_action); + emqx_bridge_pgsql:fields(pgsql_action); fields("get_bridge_v2") -> - emqx_bridge_pgsql_schema:fields(pgsql_action); + emqx_bridge_pgsql:fields(pgsql_action); fields("post_bridge_v2") -> - emqx_bridge_pgsql_schema:fields(pgsql_action); + emqx_bridge_pgsql:fields(pgsql_action); fields("put_connector") -> - emqx_bridge_pgsql_schema:fields("config_connector"); + emqx_bridge_pgsql:fields("config_connector"); fields("get_connector") -> - emqx_bridge_pgsql_schema:fields("config_connector"); + emqx_bridge_pgsql:fields("config_connector"); fields("post_connector") -> - emqx_bridge_pgsql_schema:fields("config_connector"); + emqx_bridge_pgsql:fields("config_connector"); fields(Method) -> emqx_bridge_pgsql:fields(Method). @@ -87,7 +87,7 @@ bridge_v2_examples(Method) -> #{ <<"matrix">> => #{ summary => <<"Matrix Action">>, - value => emqx_bridge_pgsql_schema:values({Method, matrix}) + value => emqx_bridge_pgsql:values({Method, matrix}) } } ]. diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl index bb15dfad9..fc0680aba 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl @@ -1,83 +1,91 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- + -module(emqx_bridge_pgsql). +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("emqx_postgresql/include/emqx_postgresql.hrl"). -include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). -include_lib("hocon/include/hoconsc.hrl"). +-include_lib("epgsql/include/epgsql.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). --import(hoconsc, [mk/2, enum/1, ref/2]). - --export([ - conn_bridge_examples/1, - values/2, - fields/2 -]). - -export([ namespace/0, roots/0, fields/1, - desc/1 + desc/1, + values/2, + fields/2 ]). --define(DEFAULT_SQL, << - "insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) " - "values (${id}, ${topic}, ${qos}, ${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))" ->>). +%% Examples +-export([ + bridge_v2_examples/1, + conn_bridge_examples/1 +]). -%% ------------------------------------------------------------------------------------------------- -%% api +%% Exported for timescale and matrix bridges +-export([ + values/1, + values_conn_bridge_examples/2 +]). -conn_bridge_examples(Method) -> - [ - #{ - <<"pgsql">> => #{ - summary => <<"PostgreSQL Bridge">>, - value => values(Method, pgsql) - } - } - ]. +-define(PGSQL_HOST_OPTIONS, #{ + default_port => ?PGSQL_DEFAULT_PORT +}). -values(_Method, Type) -> - #{ - enable => true, - type => Type, - name => <<"foo">>, - server => <<"127.0.0.1:5432">>, - database => <<"mqtt">>, - pool_size => 8, - username => <<"root">>, - password => <<"******">>, - sql => ?DEFAULT_SQL, - local_topic => <<"local/topic/#">>, - resource_opts => #{ - worker_pool_size => 8, - health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - batch_size => ?DEFAULT_BATCH_SIZE, - batch_time => ?DEFAULT_BATCH_TIME, - query_mode => async, - max_buffer_bytes => ?DEFAULT_BUFFER_BYTES - } - }. - -%% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions namespace() -> "bridge_pgsql". -roots() -> []. +roots() -> + []. +fields("config_connector") -> + emqx_postgresql_connector_schema:fields("config_connector"); +fields(config) -> + fields("config_connector") ++ + fields(action); +fields(action) -> + {pgsql, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql, pgsql_action)), + #{ + desc => <<"PostgreSQL Action Config">>, + required => false + } + )}; +fields(action_parameters) -> + [ + {sql, + hoconsc:mk( + binary(), + #{desc => ?DESC("sql_template"), default => default_sql(), format => <<"sql">>} + )} + ] ++ + emqx_connector_schema_lib:prepare_statement_fields(); +fields(pgsql_action) -> + emqx_bridge_v2_schema:make_producer_action_schema(hoconsc:ref(?MODULE, action_parameters)); +%% TODO: All of these needs to be fixed +fields("put_bridge_v2") -> + fields(pgsql_action); +fields("get_bridge_v2") -> + fields(pgsql_action); +fields("post_bridge_v2") -> + fields(pgsql_action); fields("config") -> [ - {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {enable, hoconsc:mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {sql, - mk( + hoconsc:mk( binary(), - #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} + #{desc => ?DESC("sql_template"), default => default_sql(), format => <<"sql">>} )}, {local_topic, - mk( + hoconsc:mk( binary(), #{desc => ?DESC("local_topic"), default => undefined} )} @@ -94,6 +102,12 @@ fields("get") -> fields("post", Type) -> [type_field(Type), name_field() | fields("config")]. +type_field(Type) -> + {type, hoconsc:mk(hoconsc:enum([Type]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, hoconsc:mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. + desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> @@ -101,10 +115,109 @@ desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(_) -> undefined. -%% ------------------------------------------------------------------------------------------------- +default_sql() -> + << + "insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) " + "values (${id}, ${topic}, ${qos}, ${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))" + >>. -type_field(Type) -> - {type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}. +%% Examples -name_field() -> - {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. +bridge_v2_examples(Method) -> + [ + #{ + <<"pgsql">> => #{ + summary => <<"PostgreSQL Producer Action">>, + value => values({Method, bridge_v2_producer}) + } + } + ]. + +conn_bridge_examples(Method) -> + [ + #{ + <<"pgsql">> => #{ + summary => <<"PostgreSQL Producer Bridge">>, + value => values_conn_bridge_examples(Method, pgsql) + } + } + ]. + +values({get, PostgreSQLType}) -> + maps:merge( + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + }, + values({post, PostgreSQLType}) + ); +values({post, PostgreSQLType}) -> + maps:merge( + #{ + name => <<"my_pgsql_action">>, + type => PostgreSQLType + }, + values({put, PostgreSQLType}) + ); +values({put, PostgreSQLType}) -> + maps:merge( + #{ + enable => true, + connector => <<"my_pgsql_connector">>, + resource_opts => #{ + health_check_interval => "32s" + } + }, + values({producer, PostgreSQLType}) + ); +values({producer, _PostgreSQLType}) -> + #{ + <<"enable">> => true, + <<"connector">> => <<"connector_pgsql_test">>, + <<"parameters">> => #{ + <<"sql">> => + <<"INSERT INTO client_events(clientid, event, created_at) VALUES (\n ${clientid},\n ${event},\n TO_TIMESTAMP((${timestamp} :: bigint))\n)">> + }, + <<"resource_opts">> => #{ + <<"batch_size">> => 1, + <<"batch_time">> => <<"0ms">>, + <<"health_check_interval">> => <<"15s">>, + <<"inflight_window">> => 100, + <<"max_buffer_bytes">> => <<"256MB">>, + <<"query_mode">> => <<"async">>, + <<"request_ttl">> => <<"45s">>, + <<"start_after_created">> => true, + <<"start_timeout">> => <<"5s">>, + <<"worker_pool_size">> => 16 + } + }. + +values_conn_bridge_examples(_Method, Type) -> + #{ + enable => true, + type => Type, + name => <<"foo">>, + server => <<"127.0.0.1:5432">>, + database => <<"mqtt">>, + pool_size => 8, + username => <<"root">>, + password => <<"******">>, + sql => default_sql(), + local_topic => <<"local/topic/#">>, + resource_opts => #{ + worker_pool_size => 8, + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + batch_size => ?DEFAULT_BATCH_SIZE, + batch_time => ?DEFAULT_BATCH_TIME, + query_mode => async, + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES + } + }. + +values(Method, Type) -> + values_conn_bridge_examples(Method, Type). diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl index e353eb440..c702b396b 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl @@ -19,4 +19,4 @@ action_type_name() -> pgsql. connector_type_name() -> pgsql. -schema_module() -> emqx_bridge_pgsql_schema. +schema_module() -> emqx_bridge_pgsql. diff --git a/apps/emqx_bridge_pgsql/src/schema/emqx_bridge_pgsql_schema.erl b/apps/emqx_bridge_pgsql/src/schema/emqx_bridge_pgsql_schema.erl deleted file mode 100644 index a3ad4bb11..000000000 --- a/apps/emqx_bridge_pgsql/src/schema/emqx_bridge_pgsql_schema.erl +++ /dev/null @@ -1,172 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - --module(emqx_bridge_pgsql_schema). - --include_lib("emqx_connector/include/emqx_connector.hrl"). --include_lib("emqx_postgresql/include/emqx_postgresql.hrl"). --include_lib("typerefl/include/types.hrl"). --include_lib("emqx/include/logger.hrl"). --include_lib("hocon/include/hoconsc.hrl"). --include_lib("epgsql/include/epgsql.hrl"). --include_lib("snabbkaffe/include/snabbkaffe.hrl"). --include_lib("emqx_resource/include/emqx_resource.hrl"). - --export([roots/0, fields/1]). - -%% Examples --export([ - bridge_v2_examples/1, - conn_bridge_examples/1 -]). - -%% Exported for timescale and matrix bridges --export([ - values/1, - values_conn_bridge_examples/2 -]). - --define(PGSQL_HOST_OPTIONS, #{ - default_port => ?PGSQL_DEFAULT_PORT -}). - -roots() -> - [{config, #{type => hoconsc:ref(?MODULE, config)}}]. - -fields("config_connector") -> - emqx_postgresql_connector_schema:fields("config_connector"); -fields(config) -> - fields("config_connector") ++ - fields(action); -fields(action) -> - {pgsql, - hoconsc:mk( - hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql_schema, pgsql_action)), - #{ - desc => <<"PostgreSQL Action Config">>, - required => false - } - )}; -fields(action_parameters) -> - [ - {sql, - hoconsc:mk( - binary(), - #{desc => ?DESC("sql_template"), default => default_sql(), format => <<"sql">>} - )} - ] ++ - emqx_connector_schema_lib:prepare_statement_fields(); -fields(pgsql_action) -> - emqx_bridge_v2_schema:make_producer_action_schema(hoconsc:ref(?MODULE, action_parameters)); -%% TODO: All of these needs to be fixed -fields("put_bridge_v2") -> - fields(pgsql_action); -fields("get_bridge_v2") -> - fields(pgsql_action); -fields("post_bridge_v2") -> - fields(pgsql_action). - -default_sql() -> - << - "insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) " - "values (${id}, ${topic}, ${qos}, ${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))" - >>. - -%% Examples - -bridge_v2_examples(Method) -> - [ - #{ - <<"pgsql">> => #{ - summary => <<"PostgreSQL Producer Action">>, - value => values({Method, bridge_v2_producer}) - } - } - ]. - -conn_bridge_examples(Method) -> - [ - #{ - <<"pgsql">> => #{ - summary => <<"PostgreSQL Producer Bridge">>, - value => values_conn_bridge_examples(Method, pgsql) - } - } - ]. - -values({get, PostgreSQLType}) -> - maps:merge( - #{ - status => <<"connected">>, - node_status => [ - #{ - node => <<"emqx@localhost">>, - status => <<"connected">> - } - ] - }, - values({post, PostgreSQLType}) - ); -values({post, PostgreSQLType}) -> - maps:merge( - #{ - name => <<"my_pgsql_action">>, - type => PostgreSQLType - }, - values({put, PostgreSQLType}) - ); -values({put, PostgreSQLType}) -> - maps:merge( - #{ - enable => true, - connector => <<"my_pgsql_connector">>, - resource_opts => #{ - health_check_interval => "32s" - } - }, - values({producer, PostgreSQLType}) - ); -values({producer, _PostgreSQLType}) -> - #{ - <<"enable">> => true, - <<"connector">> => <<"connector_pgsql_test">>, - <<"parameters">> => #{ - <<"sql">> => - <<"INSERT INTO client_events(clientid, event, created_at) VALUES (\n ${clientid},\n ${event},\n TO_TIMESTAMP((${timestamp} :: bigint))\n)">> - }, - <<"resource_opts">> => #{ - <<"batch_size">> => 1, - <<"batch_time">> => <<"0ms">>, - <<"health_check_interval">> => <<"15s">>, - <<"inflight_window">> => 100, - <<"max_buffer_bytes">> => <<"256MB">>, - <<"query_mode">> => <<"async">>, - <<"request_ttl">> => <<"45s">>, - <<"start_after_created">> => true, - <<"start_timeout">> => <<"5s">>, - <<"worker_pool_size">> => 16 - } - }. - -values_conn_bridge_examples(_Method, Type) -> - #{ - enable => true, - type => Type, - name => <<"foo">>, - server => <<"127.0.0.1:5432">>, - database => <<"mqtt">>, - pool_size => 8, - username => <<"root">>, - password => <<"******">>, - sql => default_sql(), - local_topic => <<"local/topic/#">>, - resource_opts => #{ - worker_pool_size => 8, - health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - batch_size => ?DEFAULT_BATCH_SIZE, - batch_time => ?DEFAULT_BATCH_TIME, - query_mode => async, - max_buffer_bytes => ?DEFAULT_BUFFER_BYTES - } - }. diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl index edeef26d4..9cefabc15 100644 --- a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl +++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl @@ -28,7 +28,7 @@ conn_bridge_examples(Method) -> #{ <<"timescale">> => #{ summary => <<"Timescale Bridge">>, - value => emqx_bridge_pgsql_schema:values_conn_bridge_examples(Method, timescale) + value => emqx_bridge_pgsql:values_conn_bridge_examples(Method, timescale) } } ]. @@ -42,28 +42,28 @@ roots() -> []. fields("post") -> emqx_bridge_pgsql:fields("post", timescale); fields("config_connector") -> - emqx_bridge_pgsql_schema:fields("config_connector"); + emqx_bridge_pgsql:fields("config_connector"); fields(action) -> {timescale, hoconsc:mk( - hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql_schema, pgsql_action)), + hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql, pgsql_action)), #{ desc => <<"Timescale Action Config">>, required => false } )}; fields("put_bridge_v2") -> - emqx_bridge_pgsql_schema:fields(pgsql_action); + emqx_bridge_pgsql:fields(pgsql_action); fields("get_bridge_v2") -> - emqx_bridge_pgsql_schema:fields(pgsql_action); + emqx_bridge_pgsql:fields(pgsql_action); fields("post_bridge_v2") -> - emqx_bridge_pgsql_schema:fields(pgsql_action); + emqx_bridge_pgsql:fields(pgsql_action); fields("put_connector") -> - emqx_bridge_pgsql_schema:fields("config_connector"); + emqx_bridge_pgsql:fields("config_connector"); fields("get_connector") -> - emqx_bridge_pgsql_schema:fields("config_connector"); + emqx_bridge_pgsql:fields("config_connector"); fields("post_connector") -> - emqx_bridge_pgsql_schema:fields("config_connector"); + emqx_bridge_pgsql:fields("config_connector"); fields(Method) -> emqx_bridge_pgsql:fields(Method). @@ -87,7 +87,7 @@ bridge_v2_examples(Method) -> #{ <<"timescale">> => #{ summary => <<"Timescale Action">>, - value => emqx_bridge_pgsql_schema:values({Method, timescale}) + value => emqx_bridge_pgsql:values({Method, timescale}) } } ]. diff --git a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl index d709f87c3..1d6949856 100644 --- a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl +++ b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl @@ -47,7 +47,7 @@ fields(config) -> fields(action) -> {pgsql, hoconsc:mk( - hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql_schema, pgsql_action)), + hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql, pgsql_action)), #{ desc => <<"PostgreSQL Action Config">>, required => false From f7296d549f9faf8ca28a68be8d50b0395d4b4278 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 23 Nov 2023 18:58:01 +0100 Subject: [PATCH 06/18] fix: elvis problem --- apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl index fc0680aba..c32504124 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl @@ -181,7 +181,14 @@ values({producer, _PostgreSQLType}) -> <<"connector">> => <<"connector_pgsql_test">>, <<"parameters">> => #{ <<"sql">> => - <<"INSERT INTO client_events(clientid, event, created_at) VALUES (\n ${clientid},\n ${event},\n TO_TIMESTAMP((${timestamp} :: bigint))\n)">> + << + "INSERT INTO client_events(clientid, event, created_at)" + "VALUES (\n" + " ${clientid},\n" + " ${event},\n" + " TO_TIMESTAMP((${timestamp} :: bigint))\n" + ")" + >> }, <<"resource_opts">> => #{ <<"batch_size">> => 1, From 6ef9c6fe4ae5ad31db4ce3f6509e8d3e8176d4fa Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 23 Nov 2023 19:15:02 +0100 Subject: [PATCH 07/18] fix: ops --- apps/emqx_postgresql/test/emqx_postgresql_SUITE.erl | 2 -- 1 file changed, 2 deletions(-) diff --git a/apps/emqx_postgresql/test/emqx_postgresql_SUITE.erl b/apps/emqx_postgresql/test/emqx_postgresql_SUITE.erl index 1b41e2dd0..c7aee3019 100644 --- a/apps/emqx_postgresql/test/emqx_postgresql_SUITE.erl +++ b/apps/emqx_postgresql/test/emqx_postgresql_SUITE.erl @@ -66,10 +66,8 @@ t_lifecycle(_Config) -> ). perform_lifecycle_check(ResourceId, InitialConfig) -> - x:show(initial_config, InitialConfig), {ok, #{config := CheckedConfig}} = emqx_resource:check_config(?PGSQL_RESOURCE_MOD, InitialConfig), - x:show(check_config_ok, CheckedConfig), {ok, #{ state := #{pool_name := PoolName} = State, status := InitialStatus From c5e281b84b3dd2a5fed2120c54e894778d3871c8 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 24 Nov 2023 11:01:44 +0100 Subject: [PATCH 08/18] fix: emqx_auth_postgresql test suites --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 29 ++++++++++++------- .../test/emqx_postgresql_SUITE.erl | 2 +- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 63874d67e..1863ed84b 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -798,17 +798,24 @@ parse_id(Id) -> end. get_channels_for_connector(ConnectorId) -> - {ConnectorType, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId), - RootConf = maps:keys(emqx:get_config([?ROOT_KEY], #{})), - RelevantBridgeV2Types = [ - Type - || Type <- RootConf, - connector_type(Type) =:= ConnectorType - ], - lists:flatten([ - get_channels_for_connector(ConnectorName, BridgeV2Type) - || BridgeV2Type <- RelevantBridgeV2Types - ]). + try emqx_connector_resource:parse_connector_id(ConnectorId) of + {ConnectorType, ConnectorName} -> + RootConf = maps:keys(emqx:get_config([?ROOT_KEY], #{})), + RelevantBridgeV2Types = [ + Type + || Type <- RootConf, + connector_type(Type) =:= ConnectorType + ], + lists:flatten([ + get_channels_for_connector(ConnectorName, BridgeV2Type) + || BridgeV2Type <- RelevantBridgeV2Types + ]) + catch + _:_ -> + %% ConnectorId is not a valid connector id so we assume the connector + %% has no channels (e.g. it is a a connector for authn or authz) + [] + end. get_channels_for_connector(ConnectorName, BridgeV2Type) -> BridgeV2s = emqx:get_config([?ROOT_KEY, BridgeV2Type], #{}), diff --git a/apps/emqx_postgresql/test/emqx_postgresql_SUITE.erl b/apps/emqx_postgresql/test/emqx_postgresql_SUITE.erl index c7aee3019..5a93a0578 100644 --- a/apps/emqx_postgresql/test/emqx_postgresql_SUITE.erl +++ b/apps/emqx_postgresql/test/emqx_postgresql_SUITE.erl @@ -61,7 +61,7 @@ end_per_testcase(_, _Config) -> t_lifecycle(_Config) -> perform_lifecycle_check( - <<"connector:pgsql:emqx_postgresql_SUITE">>, + <<"emqx_postgresql_SUITE">>, pgsql_config() ). From 3dca83c854e7867ba02ffde72f6926c8166ab30c Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 24 Nov 2023 15:20:07 +0100 Subject: [PATCH 09/18] fix: all missing descriptions --- .../src/emqx_bridge_matrix.erl | 4 ++++ .../src/emqx_bridge_pgsql.erl | 6 ++++++ .../src/emqx_bridge_timescale.erl | 4 ++++ .../src/schema/emqx_connector_ee_schema.erl | 2 +- .../emqx_postgresql_connector_schema.erl | 10 ++++++++-- rel/i18n/emqx_bridge_pgsql.hocon | 13 +++++++++++++ rel/i18n/emqx_postgresql.hocon | 6 ++++++ .../emqx_postgresql_connector_schema.hocon | 18 ++++++++++++++++++ 8 files changed, 60 insertions(+), 3 deletions(-) create mode 100644 rel/i18n/emqx_postgresql_connector_schema.hocon diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl index 78810bc9e..fb8ee9d4d 100644 --- a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl +++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl @@ -3,6 +3,8 @@ %%-------------------------------------------------------------------- -module(emqx_bridge_matrix). +-include_lib("hocon/include/hoconsc.hrl"). + -export([ conn_bridge_examples/1 ]). @@ -67,6 +69,8 @@ fields("post_connector") -> fields(Method) -> emqx_bridge_pgsql:fields(Method). +desc("config_connector") -> + ?DESC(emqx_postgresql_connector_schema, "config_connector"); desc(_) -> undefined. diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl index c32504124..2fe3960d5 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl @@ -112,6 +112,12 @@ desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for PostgreSQL using `", string:to_upper(Method), "` method."]; +desc(pgsql_action) -> + ?DESC("pgsql_action"); +desc(action_parameters) -> + ?DESC("action_parameters"); +desc("config_connector") -> + ?DESC(emqx_postgresql_connector_schema, "config_connector"); desc(_) -> undefined. diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl index 9cefabc15..759f69ed7 100644 --- a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl +++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl @@ -3,6 +3,8 @@ %%-------------------------------------------------------------------- -module(emqx_bridge_timescale). +-include_lib("hocon/include/hoconsc.hrl"). + -export([ conn_bridge_examples/1 ]). @@ -67,6 +69,8 @@ fields("post_connector") -> fields(Method) -> emqx_bridge_pgsql:fields(Method). +desc("config_connector") -> + ?DESC(emqx_postgresql_connector_schema, "config_connector"); desc(_) -> undefined. diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 1ffe306e9..c2ce2568c 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -117,7 +117,7 @@ connector_structs() -> )}, {pgsql, mk( - hoconsc:map(name, ref(emqx_postgresql_connector_schema, "config_connector")), + hoconsc:map(name, ref(emqx_bridge_pgsql, "config_connector")), #{ desc => <<"PostgreSQL Connector Config">>, required => false diff --git a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl index 1d6949856..366d9c71d 100644 --- a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl +++ b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl @@ -25,7 +25,8 @@ -export([ roots/0, - fields/1 + fields/1, + desc/1 ]). %% Examples @@ -35,7 +36,7 @@ ]). roots() -> - [{config, #{type => hoconsc:ref(?MODULE, config)}}]. + []. fields("config_connector") -> [{server, server()}] ++ @@ -199,3 +200,8 @@ values(producer) -> }, local_topic => <<"mqtt/local/topic">> }. + +desc("config_connector") -> + ?DESC("config_connector"); +desc(_) -> + undefined. diff --git a/rel/i18n/emqx_bridge_pgsql.hocon b/rel/i18n/emqx_bridge_pgsql.hocon index 0a5ca2b04..8fcf9139e 100644 --- a/rel/i18n/emqx_bridge_pgsql.hocon +++ b/rel/i18n/emqx_bridge_pgsql.hocon @@ -40,4 +40,17 @@ sql_template.desc: sql_template.label: """SQL Template""" +pgsql_action.desc: +"""Configuration for PostgreSQL Action""" + +pgsql_action.label: +"""PostgreSQL Action Configuration""" + + +action_parameters.desc: +"""Configuration Parameters Specific to the PostgreSQL Action""" + +action_parameters.label: +"""Action Parameters""" + } diff --git a/rel/i18n/emqx_postgresql.hocon b/rel/i18n/emqx_postgresql.hocon index c6d2581c1..9740b0814 100644 --- a/rel/i18n/emqx_postgresql.hocon +++ b/rel/i18n/emqx_postgresql.hocon @@ -8,4 +8,10 @@ The PostgreSQL default port 5432 is used if `[:Port]` is not specified.""" server.label: """Server Host""" +config_connector.desc: +"""The configuration for the PostgreSQL connector.""" + +config_connector.label: +"""PostgreSQL Connector Config""" + } diff --git a/rel/i18n/emqx_postgresql_connector_schema.hocon b/rel/i18n/emqx_postgresql_connector_schema.hocon new file mode 100644 index 000000000..8ecfb958a --- /dev/null +++ b/rel/i18n/emqx_postgresql_connector_schema.hocon @@ -0,0 +1,18 @@ + +emqx_postgresql_connector_schema { + +server.desc: +"""The IPv4 or IPv6 address or the hostname to connect to.
+A host entry has the following form: `Host[:Port]`.
+The PostgreSQL default port 5432 is used if `[:Port]` is not specified.""" + +server.label: +"""Server Host""" + +config_connector.desc: +"""The configuration for the PostgreSQL connector.""" + +config_connector.label: +"""PostgreSQL Connector Config""" + +} From e920160805b0b7f1ebb3d6f605bbf693bf33af06 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 24 Nov 2023 15:42:51 +0100 Subject: [PATCH 10/18] fix: add enable and description fields to PostgreSQL connector --- .../src/schema/emqx_postgresql_connector_schema.erl | 9 ++++++++- rel/i18n/emqx_postgresql_connector_schema.hocon | 6 ++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl index 366d9c71d..ee92c62fc 100644 --- a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl +++ b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl @@ -38,10 +38,17 @@ roots() -> []. -fields("config_connector") -> +fields("connection_fields") -> [{server, server()}] ++ adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++ emqx_connector_schema_lib:ssl_fields(); +fields("config_connector") -> + fields("connection_fields") ++ fields(enable_and_desc); +fields(enable_and_desc) -> + [ + {enable, hoconsc:mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {description, emqx_schema:description_schema()} + ]; fields(config) -> fields("config_connector") ++ fields(action); diff --git a/rel/i18n/emqx_postgresql_connector_schema.hocon b/rel/i18n/emqx_postgresql_connector_schema.hocon index 8ecfb958a..056e66d09 100644 --- a/rel/i18n/emqx_postgresql_connector_schema.hocon +++ b/rel/i18n/emqx_postgresql_connector_schema.hocon @@ -15,4 +15,10 @@ config_connector.desc: config_connector.label: """PostgreSQL Connector Config""" +config_enable.desc: +"""Enable (true) or disable (false) this Kafka bridge.""" + +config_enable.label: +"""Enable or Disable""" + } From fc7bedb81a37eeae5b3d93f15280c4070f168324 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 24 Nov 2023 17:37:58 +0100 Subject: [PATCH 11/18] fix: remove duplicated entry --- apps/emqx_bridge/src/emqx_action_info.erl | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 74fd38811..f206c664d 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -79,7 +79,6 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_kafka_action_info, emqx_bridge_mongodb_action_info, emqx_bridge_syskeeper_action_info, - emqx_bridge_syskeeper_action_info, emqx_bridge_pgsql_action_info, emqx_bridge_timescale_action_info, emqx_bridge_matrix_action_info From d03674a50549a2d7db63bdc5dfdd15377f85ad90 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 24 Nov 2023 17:59:56 +0100 Subject: [PATCH 12/18] fix: duplicate key in example --- .../src/emqx_bridge_pgsql.erl | 21 ++++++------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl index 2fe3960d5..4e05c2b1d 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl @@ -176,15 +176,18 @@ values({put, PostgreSQLType}) -> enable => true, connector => <<"my_pgsql_connector">>, resource_opts => #{ - health_check_interval => "32s" + batch_size => 1, + batch_time => <<"50ms">>, + inflight_window => 100, + max_buffer_bytes => <<"256MB">>, + request_ttl => <<"45s">>, + worker_pool_size => 16 } }, values({producer, PostgreSQLType}) ); values({producer, _PostgreSQLType}) -> #{ - <<"enable">> => true, - <<"connector">> => <<"connector_pgsql_test">>, <<"parameters">> => #{ <<"sql">> => << @@ -195,18 +198,6 @@ values({producer, _PostgreSQLType}) -> " TO_TIMESTAMP((${timestamp} :: bigint))\n" ")" >> - }, - <<"resource_opts">> => #{ - <<"batch_size">> => 1, - <<"batch_time">> => <<"0ms">>, - <<"health_check_interval">> => <<"15s">>, - <<"inflight_window">> => 100, - <<"max_buffer_bytes">> => <<"256MB">>, - <<"query_mode">> => <<"async">>, - <<"request_ttl">> => <<"45s">>, - <<"start_after_created">> => true, - <<"start_timeout">> => <<"5s">>, - <<"worker_pool_size">> => 16 } }. From f070d80b1a6b5283e7660b47a0f080ee7bb554fa Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 24 Nov 2023 18:50:52 +0100 Subject: [PATCH 13/18] fix: swagger examples for PostgreSQL, Matrix and Timescale --- .../src/emqx_bridge_matrix.erl | 2 +- .../src/emqx_bridge_pgsql.erl | 41 ++++--- .../src/emqx_bridge_timescale.erl | 2 +- .../emqx_postgresql_connector_schema.erl | 104 ++++-------------- 4 files changed, 48 insertions(+), 101 deletions(-) diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl index fb8ee9d4d..f74e18d3b 100644 --- a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl +++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl @@ -81,7 +81,7 @@ connector_examples(Method) -> #{ <<"matrix">> => #{ summary => <<"Matrix Connector">>, - value => emqx_postgresql_connector_schema:values({Method, connector}) + value => emqx_postgresql_connector_schema:values({Method, <<"matrix">>}) } } ]. diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl index 4e05c2b1d..534570ac9 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl @@ -18,7 +18,6 @@ roots/0, fields/1, desc/1, - values/2, fields/2 ]). @@ -133,8 +132,8 @@ bridge_v2_examples(Method) -> [ #{ <<"pgsql">> => #{ - summary => <<"PostgreSQL Producer Action">>, - value => values({Method, bridge_v2_producer}) + summary => <<"PostgreSQL Action">>, + value => values({Method, pgsql}) } } ]. @@ -143,7 +142,7 @@ conn_bridge_examples(Method) -> [ #{ <<"pgsql">> => #{ - summary => <<"PostgreSQL Producer Bridge">>, + summary => <<"PostgreSQL Bridge">>, value => values_conn_bridge_examples(Method, pgsql) } } @@ -160,21 +159,17 @@ values({get, PostgreSQLType}) -> } ] }, - values({post, PostgreSQLType}) - ); -values({post, PostgreSQLType}) -> - maps:merge( - #{ - name => <<"my_pgsql_action">>, - type => PostgreSQLType - }, values({put, PostgreSQLType}) ); +values({post, PostgreSQLType}) -> + values({put, PostgreSQLType}); values({put, PostgreSQLType}) -> maps:merge( #{ + name => <<"my_action">>, + type => PostgreSQLType, enable => true, - connector => <<"my_pgsql_connector">>, + connector => <<"my_connector">>, resource_opts => #{ batch_size => 1, batch_time => <<"50ms">>, @@ -184,9 +179,9 @@ values({put, PostgreSQLType}) -> worker_pool_size => 16 } }, - values({producer, PostgreSQLType}) + values(parameters) ); -values({producer, _PostgreSQLType}) -> +values(parameters) -> #{ <<"parameters">> => #{ <<"sql">> => @@ -201,6 +196,19 @@ values({producer, _PostgreSQLType}) -> } }. +values_conn_bridge_examples(get, Type) -> + maps:merge( + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + }, + values_conn_bridge_examples(post, Type) + ); values_conn_bridge_examples(_Method, Type) -> #{ enable => true, @@ -222,6 +230,3 @@ values_conn_bridge_examples(_Method, Type) -> max_buffer_bytes => ?DEFAULT_BUFFER_BYTES } }. - -values(Method, Type) -> - values_conn_bridge_examples(Method, Type). diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl index 759f69ed7..796d9d9f6 100644 --- a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl +++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl @@ -81,7 +81,7 @@ connector_examples(Method) -> #{ <<"timescale">> => #{ summary => <<"Timescale Connector">>, - value => emqx_postgresql_connector_schema:values({Method, connector}) + value => emqx_postgresql_connector_schema:values({Method, <<"timescale">>}) } } ]. diff --git a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl index ee92c62fc..ffdc3771c 100644 --- a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl +++ b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl @@ -99,8 +99,8 @@ connector_examples(Method) -> [ #{ <<"pgsql">> => #{ - summary => <<"PostgreSQL Producer Connector">>, - value => values({Method, connector}) + summary => <<"PostgreSQL Connector">>, + value => values({Method, pgsql}) } } ]. @@ -119,93 +119,35 @@ values({get, PostgreSQLType}) -> }, values({post, PostgreSQLType}) ); -values({post, connector}) -> - maps:merge( - #{ - name => <<"my_pgsql_connector">>, - type => <<"pgsql">> - }, - values(common_config) - ); values({post, PostgreSQLType}) -> - maps:merge( - #{ - name => <<"my_pgsql_action">>, - type => <<"pgsql">> - }, - values({put, PostgreSQLType}) - ); -values({put, bridge_v2_producer}) -> - values(bridge_v2_producer); -values({put, connector}) -> - values(common_config); + values({put, PostgreSQLType}); values({put, PostgreSQLType}) -> - maps:merge(values(common_config), values(PostgreSQLType)); -values(bridge_v2_producer) -> maps:merge( #{ - enable => true, - connector => <<"my_pgsql_connector">>, - resource_opts => #{ - health_check_interval => "32s" - } + name => <<"my_action">>, + type => PostgreSQLType }, - values(producer) + values(common) ); -values(common_config) -> +values(common) -> #{ - authentication => #{ - mechanism => <<"plain">>, - username => <<"username">>, - password => <<"******">> + <<"database">> => <<"emqx_data">>, + <<"enable">> => true, + <<"password">> => <<"public">>, + <<"pool_size">> => 8, + <<"server">> => <<"127.0.0.1:5432">>, + <<"ssl">> => #{ + <<"ciphers">> => [], + <<"depth">> => 10, + <<"enable">> => false, + <<"hibernate_after">> => <<"5s">>, + <<"log_level">> => <<"notice">>, + <<"reuse_sessions">> => true, + <<"secure_renegotiate">> => true, + <<"verify">> => <<"verify_peer">>, + <<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>] }, - bootstrap_hosts => <<"localhost:9092">>, - connect_timeout => <<"5s">>, - enable => true, - metadata_request_timeout => <<"4s">>, - min_metadata_refresh_interval => <<"3s">>, - socket_opts => #{ - sndbuf => <<"1024KB">>, - recbuf => <<"1024KB">>, - nodelay => true, - tcp_keepalive => <<"none">> - } - }; -values(producer) -> - #{ - kafka => #{ - topic => <<"kafka-topic">>, - message => #{ - key => <<"${.clientid}">>, - value => <<"${.}">>, - timestamp => <<"${.timestamp}">> - }, - max_batch_bytes => <<"896KB">>, - compression => <<"no_compression">>, - partition_strategy => <<"random">>, - required_acks => <<"all_isr">>, - partition_count_refresh_interval => <<"60s">>, - kafka_headers => <<"${pub_props}">>, - kafka_ext_headers => [ - #{ - kafka_ext_header_key => <<"clientid">>, - kafka_ext_header_value => <<"${clientid}">> - }, - #{ - kafka_ext_header_key => <<"topic">>, - kafka_ext_header_value => <<"${topic}">> - } - ], - kafka_header_value_encode_mode => none, - max_inflight => 10, - buffer => #{ - mode => <<"hybrid">>, - per_partition_limit => <<"2GB">>, - segment_bytes => <<"100MB">>, - memory_overload_protection => true - } - }, - local_topic => <<"mqtt/local/topic">> + <<"username">> => <<"postgres">> }. desc("config_connector") -> From f79d38983dd287950cb57e4b3a2a9be72154e0d5 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 24 Nov 2023 18:56:57 +0100 Subject: [PATCH 14/18] docs: add changelog entry --- changes/ee/feat-12013.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ee/feat-12013.en.md diff --git a/changes/ee/feat-12013.en.md b/changes/ee/feat-12013.en.md new file mode 100644 index 000000000..b72b7b5be --- /dev/null +++ b/changes/ee/feat-12013.en.md @@ -0,0 +1 @@ +The bridges for PostgreSQL, Timescale and Matrix have been split so they are available via the connectors and actions APIs. They are still backwards compatible with the old bridge API. From 30e248061fcb6677534de083bbcb7722079074bc Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 24 Nov 2023 20:07:37 +0100 Subject: [PATCH 15/18] fix: problems found by @thalesmg in code review Co-authored-by: Thales Macedo Garitezi --- apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src | 2 +- apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src | 2 +- rel/i18n/emqx_postgresql_connector_schema.hocon | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src index 9175e5d4e..479aa13df 100644 --- a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src +++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src @@ -7,7 +7,7 @@ stdlib, emqx_resource ]}, - {env, [{emqx_action_info_module, emqx_bridge_matrix_action_info}]}, + {env, [{emqx_action_info_modules, [emqx_bridge_matrix_action_info]}]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src index 58d093f6f..614747254 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src @@ -8,7 +8,7 @@ emqx_resource, emqx_postgresql ]}, - {env, [{emqx_action_info_module, emqx_bridge_pgsql_action_info}]}, + {env, [{emqx_action_info_modules, [emqx_bridge_pgsql_action_info]}]}, {modules, []}, {links, []} ]}. diff --git a/rel/i18n/emqx_postgresql_connector_schema.hocon b/rel/i18n/emqx_postgresql_connector_schema.hocon index 056e66d09..4546b86ef 100644 --- a/rel/i18n/emqx_postgresql_connector_schema.hocon +++ b/rel/i18n/emqx_postgresql_connector_schema.hocon @@ -16,7 +16,7 @@ config_connector.label: """PostgreSQL Connector Config""" config_enable.desc: -"""Enable (true) or disable (false) this Kafka bridge.""" +"""Enable (true) or disable (false) this PostgreSQL bridge.""" config_enable.label: """Enable or Disable""" From 66945dcc5c54ca10b335fc655dffa9ff08c84a32 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 24 Nov 2023 20:31:29 +0100 Subject: [PATCH 16/18] fix: address more comments from @thalesmg --- apps/emqx_bridge/src/emqx_action_info.erl | 6 +++--- apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl | 3 +-- .../src/schema/emqx_connector_ee_schema.erl | 16 ++++++++-------- .../src/schema/emqx_connector_schema.erl | 6 +++--- .../schema/emqx_postgresql_connector_schema.erl | 7 +------ rel/i18n/emqx_postgresql_connector_schema.hocon | 6 ------ 6 files changed, 16 insertions(+), 28 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index f206c664d..4f195b417 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -77,11 +77,11 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_confluent_producer_action_info, emqx_bridge_gcp_pubsub_producer_action_info, emqx_bridge_kafka_action_info, + emqx_bridge_matrix_action_info, emqx_bridge_mongodb_action_info, - emqx_bridge_syskeeper_action_info, emqx_bridge_pgsql_action_info, - emqx_bridge_timescale_action_info, - emqx_bridge_matrix_action_info + emqx_bridge_syskeeper_action_info, + emqx_bridge_timescale_action_info ]. -else. hard_coded_action_info_modules_ee() -> diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl index 534570ac9..949016336 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_bridge_pgsql). @@ -68,7 +68,6 @@ fields(action_parameters) -> emqx_connector_schema_lib:prepare_statement_fields(); fields(pgsql_action) -> emqx_bridge_v2_schema:make_producer_action_schema(hoconsc:ref(?MODULE, action_parameters)); -%% TODO: All of these needs to be fixed fields("put_bridge_v2") -> fields(pgsql_action); fields("get_bridge_v2") -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index c2ce2568c..389623b0a 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -29,18 +29,18 @@ resource_type(gcp_pubsub_producer) -> emqx_bridge_gcp_pubsub_impl_producer; resource_type(kafka_producer) -> emqx_bridge_kafka_impl_producer; +resource_type(matrix) -> + emqx_postgresql; resource_type(mongodb) -> emqx_bridge_mongodb_connector; +resource_type(pgsql) -> + emqx_postgresql; resource_type(syskeeper_forwarder) -> emqx_bridge_syskeeper_connector; resource_type(syskeeper_proxy) -> emqx_bridge_syskeeper_proxy_server; -resource_type(pgsql) -> - emqx_postgresql; resource_type(timescale) -> emqx_postgresql; -resource_type(matrix) -> - emqx_postgresql; resource_type(Type) -> error({unknown_connector_type, Type}). @@ -159,12 +159,12 @@ schema_modules() -> emqx_bridge_confluent_producer, emqx_bridge_gcp_pubsub_producer_schema, emqx_bridge_kafka, + emqx_bridge_matrix, emqx_bridge_mongodb, emqx_bridge_syskeeper_connector, emqx_bridge_syskeeper_proxy, - emqx_postgresql_connector_schema, emqx_bridge_timescale, - emqx_bridge_matrix + emqx_postgresql_connector_schema ]. api_schemas(Method) -> @@ -183,12 +183,12 @@ api_schemas(Method) -> Method ++ "_connector" ), api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"), + api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"), api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"), api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method), api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method), - api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"), api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"), - api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector") + api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector") ]. api_ref(Module, Type, Method) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index cd99f0fe6..a7de0cf52 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -70,12 +70,12 @@ connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_pro connector_type_to_bridge_types(confluent_producer) -> [confluent_producer]; connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_producer]; connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; +connector_type_to_bridge_types(matrix) -> [matrix]; connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single]; +connector_type_to_bridge_types(pgsql) -> [pgsql]; connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder]; connector_type_to_bridge_types(syskeeper_proxy) -> []; -connector_type_to_bridge_types(pgsql) -> [pgsql]; -connector_type_to_bridge_types(timescale) -> [timescale]; -connector_type_to_bridge_types(matrix) -> [matrix]. +connector_type_to_bridge_types(timescale) -> [timescale]. actions_config_name() -> <<"actions">>. diff --git a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl index ffdc3771c..d9ccbdc79 100644 --- a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl +++ b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl @@ -43,12 +43,7 @@ fields("connection_fields") -> adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++ emqx_connector_schema_lib:ssl_fields(); fields("config_connector") -> - fields("connection_fields") ++ fields(enable_and_desc); -fields(enable_and_desc) -> - [ - {enable, hoconsc:mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, - {description, emqx_schema:description_schema()} - ]; + fields("connection_fields") ++ emqx_connector_schema:common_fields(); fields(config) -> fields("config_connector") ++ fields(action); diff --git a/rel/i18n/emqx_postgresql_connector_schema.hocon b/rel/i18n/emqx_postgresql_connector_schema.hocon index 4546b86ef..8ecfb958a 100644 --- a/rel/i18n/emqx_postgresql_connector_schema.hocon +++ b/rel/i18n/emqx_postgresql_connector_schema.hocon @@ -15,10 +15,4 @@ config_connector.desc: config_connector.label: """PostgreSQL Connector Config""" -config_enable.desc: -"""Enable (true) or disable (false) this PostgreSQL bridge.""" - -config_enable.label: -"""Enable or Disable""" - } From c6c1d886f0007ca360bb87f9edfc3e708bfeff24 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 24 Nov 2023 21:19:26 +0100 Subject: [PATCH 17/18] fix: make pgsql action schema properly --- apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl index 949016336..4c0efe269 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl @@ -67,7 +67,15 @@ fields(action_parameters) -> ] ++ emqx_connector_schema_lib:prepare_statement_fields(); fields(pgsql_action) -> - emqx_bridge_v2_schema:make_producer_action_schema(hoconsc:ref(?MODULE, action_parameters)); + emqx_bridge_v2_schema:make_producer_action_schema( + hoconsc:mk( + hoconsc:ref(?MODULE, action_parameters), + #{ + required => true, + desc => ?DESC("action_parameters") + } + ) + ); fields("put_bridge_v2") -> fields(pgsql_action); fields("get_bridge_v2") -> From c85004b7ef6fb05b0d490a1584fa32105d82968e Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 24 Nov 2023 21:22:27 +0100 Subject: [PATCH 18/18] chore: remove obsolete TODO --- .../src/schema/emqx_postgresql_connector_schema.erl | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl index d9ccbdc79..74591beee 100644 --- a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl +++ b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl @@ -58,7 +58,6 @@ fields(action) -> )}; fields(pgsql_action) -> emqx_bridge_v2_schema:make_producer_action_schema(hoconsc:ref(?MODULE, action_parameters)); -%% TODO: All of these needs to be fixed fields("put_bridge_v2") -> fields(pgsql_action); fields("get_bridge_v2") ->