diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl
index 129142f24..4f195b417 100644
--- a/apps/emqx_bridge/src/emqx_action_info.erl
+++ b/apps/emqx_bridge/src/emqx_action_info.erl
@@ -77,8 +77,11 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_confluent_producer_action_info,
emqx_bridge_gcp_pubsub_producer_action_info,
emqx_bridge_kafka_action_info,
+ emqx_bridge_matrix_action_info,
emqx_bridge_mongodb_action_info,
- emqx_bridge_syskeeper_action_info
+ emqx_bridge_pgsql_action_info,
+ emqx_bridge_syskeeper_action_info,
+ emqx_bridge_timescale_action_info
].
-else.
hard_coded_action_info_modules_ee() ->
diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl
index f557210ed..9e14c0c9a 100644
--- a/apps/emqx_bridge/src/emqx_bridge.erl
+++ b/apps/emqx_bridge/src/emqx_bridge.erl
@@ -240,8 +240,8 @@ send_message(BridgeId, Message) ->
{BridgeV1Type, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId),
case emqx_bridge_v2:is_bridge_v2_type(BridgeV1Type) of
true ->
- BridgeV2Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
- emqx_bridge_v2:send_message(BridgeV2Type, BridgeName, Message, #{});
+ ActionType = emqx_action_info:bridge_v1_type_to_action_type(BridgeV1Type),
+ emqx_bridge_v2:send_message(ActionType, BridgeName, Message, #{});
false ->
ResId = emqx_bridge_resource:resource_id(BridgeV1Type, BridgeName),
send_message(BridgeV1Type, BridgeName, ResId, Message, #{})
@@ -414,7 +414,7 @@ remove(BridgeType0, BridgeName) ->
}),
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
true ->
- emqx_bridge_v2:remove(BridgeType, BridgeName);
+ emqx_bridge_v2:bridge_v1_remove(BridgeType0, BridgeName);
false ->
remove_v1(BridgeType, BridgeName)
end.
diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl
index 54ccf1b24..1863ed84b 100644
--- a/apps/emqx_bridge/src/emqx_bridge_v2.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl
@@ -55,6 +55,7 @@
disable_enable/3,
health_check/2,
send_message/4,
+ query/4,
start/2,
reset_metrics/2,
create_dry_run/2,
@@ -116,7 +117,9 @@
bridge_v1_enable_disable/3,
bridge_v1_restart/2,
bridge_v1_stop/2,
- bridge_v1_start/2
+ bridge_v1_start/2,
+ %% For test cases only
+ bridge_v1_remove/2
]).
%%====================================================================
@@ -547,25 +550,25 @@ get_query_mode(BridgeV2Type, Config) ->
ResourceType = emqx_connector_resource:connector_to_resource_type(ConnectorType),
emqx_resource:query_mode(ResourceType, Config, CreationOpts).
--spec send_message(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) ->
+-spec query(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) ->
term() | {error, term()}.
-send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
+query(BridgeType, BridgeName, Message, QueryOpts0) ->
case lookup_conf(BridgeType, BridgeName) of
#{enable := true} = Config0 ->
Config = combine_connector_and_bridge_v2_config(BridgeType, BridgeName, Config0),
- do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config);
+ do_query_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config);
#{enable := false} ->
{error, bridge_stopped};
_Error ->
{error, bridge_not_found}
end.
-do_send_msg_with_enabled_config(
+do_query_with_enabled_config(
_BridgeType, _BridgeName, _Message, _QueryOpts0, {error, Reason} = Error
) ->
?SLOG(error, Reason),
Error;
-do_send_msg_with_enabled_config(
+do_query_with_enabled_config(
BridgeType, BridgeName, Message, QueryOpts0, Config
) ->
QueryMode = get_query_mode(BridgeType, Config),
@@ -579,7 +582,17 @@ do_send_msg_with_enabled_config(
}
),
BridgeV2Id = id(BridgeType, BridgeName),
- emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts).
+ case Message of
+ {send_message, Msg} ->
+ emqx_resource:query(BridgeV2Id, {BridgeV2Id, Msg}, QueryOpts);
+ Msg ->
+ emqx_resource:query(BridgeV2Id, Msg, QueryOpts)
+ end.
+
+-spec send_message(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) ->
+ term() | {error, term()}.
+send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
+ query(BridgeType, BridgeName, {send_message, Message}, QueryOpts0).
-spec health_check(BridgeType :: term(), BridgeName :: term()) ->
#{status := emqx_resource:resource_status(), error := term()} | {error, Reason :: term()}.
@@ -785,17 +798,24 @@ parse_id(Id) ->
end.
get_channels_for_connector(ConnectorId) ->
- {ConnectorType, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId),
- RootConf = maps:keys(emqx:get_config([?ROOT_KEY], #{})),
- RelevantBridgeV2Types = [
- Type
- || Type <- RootConf,
- connector_type(Type) =:= ConnectorType
- ],
- lists:flatten([
- get_channels_for_connector(ConnectorName, BridgeV2Type)
- || BridgeV2Type <- RelevantBridgeV2Types
- ]).
+ try emqx_connector_resource:parse_connector_id(ConnectorId) of
+ {ConnectorType, ConnectorName} ->
+ RootConf = maps:keys(emqx:get_config([?ROOT_KEY], #{})),
+ RelevantBridgeV2Types = [
+ Type
+ || Type <- RootConf,
+ connector_type(Type) =:= ConnectorType
+ ],
+ lists:flatten([
+ get_channels_for_connector(ConnectorName, BridgeV2Type)
+ || BridgeV2Type <- RelevantBridgeV2Types
+ ])
+ catch
+ _:_ ->
+ %% ConnectorId is not a valid connector id so we assume the connector
+ %% has no channels (e.g. it is a a connector for authn or authz)
+ []
+ end.
get_channels_for_connector(ConnectorName, BridgeV2Type) ->
BridgeV2s = emqx:get_config([?ROOT_KEY, BridgeV2Type], #{}),
@@ -1325,6 +1345,34 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
} = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf),
create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf).
+%% Only called by test cases (may create broken references)
+bridge_v1_remove(BridgeV1Type, BridgeName) ->
+ ActionType = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
+ bridge_v1_remove(
+ ActionType,
+ BridgeName,
+ lookup_conf(ActionType, BridgeName)
+ ).
+
+bridge_v1_remove(
+ ActionType,
+ Name,
+ #{connector := ConnectorName}
+) ->
+ case remove(ActionType, Name) of
+ ok ->
+ ConnectorType = connector_type(ActionType),
+ emqx_connector:remove(ConnectorType, ConnectorName);
+ Error ->
+ Error
+ end;
+bridge_v1_remove(
+ _ActionType,
+ _Name,
+ Error
+) ->
+ Error.
+
bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) ->
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
bridge_v1_check_deps_and_remove(
diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src
index 14aca1f75..479aa13df 100644
--- a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src
+++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src
@@ -1,13 +1,13 @@
{application, emqx_bridge_matrix, [
{description, "EMQX Enterprise MatrixDB Bridge"},
- {vsn, "0.1.2"},
+ {vsn, "0.1.3"},
{registered, []},
{applications, [
kernel,
stdlib,
emqx_resource
]},
- {env, []},
+ {env, [{emqx_action_info_modules, [emqx_bridge_matrix_action_info]}]},
{modules, []},
{links, []}
]}.
diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl
index abd98adb6..f74e18d3b 100644
--- a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl
+++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl
@@ -3,6 +3,8 @@
%%--------------------------------------------------------------------
-module(emqx_bridge_matrix).
+-include_lib("hocon/include/hoconsc.hrl").
+
-export([
conn_bridge_examples/1
]).
@@ -14,6 +16,12 @@
desc/1
]).
+%% Examples
+-export([
+ bridge_v2_examples/1,
+ connector_examples/1
+]).
+
%% -------------------------------------------------------------------------------------------------
%% api
@@ -22,7 +30,7 @@ conn_bridge_examples(Method) ->
#{
<<"matrix">> => #{
summary => <<"Matrix Bridge">>,
- value => emqx_bridge_pgsql:values(Method, matrix)
+ value => emqx_bridge_pgsql:values_conn_bridge_examples(Method, matrix)
}
}
].
@@ -35,8 +43,55 @@ roots() -> [].
fields("post") ->
emqx_bridge_pgsql:fields("post", matrix);
+fields("config_connector") ->
+ emqx_bridge_pgsql:fields("config_connector");
+fields(action) ->
+ {matrix,
+ hoconsc:mk(
+ hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql, pgsql_action)),
+ #{
+ desc => <<"Matrix Action Config">>,
+ required => false
+ }
+ )};
+fields("put_bridge_v2") ->
+ emqx_bridge_pgsql:fields(pgsql_action);
+fields("get_bridge_v2") ->
+ emqx_bridge_pgsql:fields(pgsql_action);
+fields("post_bridge_v2") ->
+ emqx_bridge_pgsql:fields(pgsql_action);
+fields("put_connector") ->
+ emqx_bridge_pgsql:fields("config_connector");
+fields("get_connector") ->
+ emqx_bridge_pgsql:fields("config_connector");
+fields("post_connector") ->
+ emqx_bridge_pgsql:fields("config_connector");
fields(Method) ->
emqx_bridge_pgsql:fields(Method).
+desc("config_connector") ->
+ ?DESC(emqx_postgresql_connector_schema, "config_connector");
desc(_) ->
undefined.
+
+%% Examples
+
+connector_examples(Method) ->
+ [
+ #{
+ <<"matrix">> => #{
+ summary => <<"Matrix Connector">>,
+ value => emqx_postgresql_connector_schema:values({Method, <<"matrix">>})
+ }
+ }
+ ].
+
+bridge_v2_examples(Method) ->
+ [
+ #{
+ <<"matrix">> => #{
+ summary => <<"Matrix Action">>,
+ value => emqx_bridge_pgsql:values({Method, matrix})
+ }
+ }
+ ].
diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix_action_info.erl b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix_action_info.erl
new file mode 100644
index 000000000..4eae13415
--- /dev/null
+++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix_action_info.erl
@@ -0,0 +1,22 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_matrix_action_info).
+
+-behaviour(emqx_action_info).
+
+-export([
+ bridge_v1_type_name/0,
+ action_type_name/0,
+ connector_type_name/0,
+ schema_module/0
+]).
+
+bridge_v1_type_name() -> matrix.
+
+action_type_name() -> matrix.
+
+connector_type_name() -> matrix.
+
+schema_module() -> emqx_bridge_matrix.
diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src
index 7a17652e0..614747254 100644
--- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src
+++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src
@@ -8,7 +8,7 @@
emqx_resource,
emqx_postgresql
]},
- {env, []},
+ {env, [{emqx_action_info_modules, [emqx_bridge_pgsql_action_info]}]},
{modules, []},
{links, []}
]}.
diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl
index bb15dfad9..4c0efe269 100644
--- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl
+++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl
@@ -1,83 +1,97 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
+
-module(emqx_bridge_pgsql).
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+-include_lib("emqx_postgresql/include/emqx_postgresql.hrl").
-include_lib("typerefl/include/types.hrl").
+-include_lib("emqx/include/logger.hrl").
-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("epgsql/include/epgsql.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
--import(hoconsc, [mk/2, enum/1, ref/2]).
-
--export([
- conn_bridge_examples/1,
- values/2,
- fields/2
-]).
-
-export([
namespace/0,
roots/0,
fields/1,
- desc/1
+ desc/1,
+ fields/2
]).
--define(DEFAULT_SQL, <<
- "insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) "
- "values (${id}, ${topic}, ${qos}, ${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
->>).
+%% Examples
+-export([
+ bridge_v2_examples/1,
+ conn_bridge_examples/1
+]).
-%% -------------------------------------------------------------------------------------------------
-%% api
+%% Exported for timescale and matrix bridges
+-export([
+ values/1,
+ values_conn_bridge_examples/2
+]).
-conn_bridge_examples(Method) ->
- [
- #{
- <<"pgsql">> => #{
- summary => <<"PostgreSQL Bridge">>,
- value => values(Method, pgsql)
- }
- }
- ].
+-define(PGSQL_HOST_OPTIONS, #{
+ default_port => ?PGSQL_DEFAULT_PORT
+}).
-values(_Method, Type) ->
- #{
- enable => true,
- type => Type,
- name => <<"foo">>,
- server => <<"127.0.0.1:5432">>,
- database => <<"mqtt">>,
- pool_size => 8,
- username => <<"root">>,
- password => <<"******">>,
- sql => ?DEFAULT_SQL,
- local_topic => <<"local/topic/#">>,
- resource_opts => #{
- worker_pool_size => 8,
- health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
- batch_size => ?DEFAULT_BATCH_SIZE,
- batch_time => ?DEFAULT_BATCH_TIME,
- query_mode => async,
- max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
- }
- }.
-
-%% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions
namespace() -> "bridge_pgsql".
-roots() -> [].
+roots() ->
+ [].
+fields("config_connector") ->
+ emqx_postgresql_connector_schema:fields("config_connector");
+fields(config) ->
+ fields("config_connector") ++
+ fields(action);
+fields(action) ->
+ {pgsql,
+ hoconsc:mk(
+ hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql, pgsql_action)),
+ #{
+ desc => <<"PostgreSQL Action Config">>,
+ required => false
+ }
+ )};
+fields(action_parameters) ->
+ [
+ {sql,
+ hoconsc:mk(
+ binary(),
+ #{desc => ?DESC("sql_template"), default => default_sql(), format => <<"sql">>}
+ )}
+ ] ++
+ emqx_connector_schema_lib:prepare_statement_fields();
+fields(pgsql_action) ->
+ emqx_bridge_v2_schema:make_producer_action_schema(
+ hoconsc:mk(
+ hoconsc:ref(?MODULE, action_parameters),
+ #{
+ required => true,
+ desc => ?DESC("action_parameters")
+ }
+ )
+ );
+fields("put_bridge_v2") ->
+ fields(pgsql_action);
+fields("get_bridge_v2") ->
+ fields(pgsql_action);
+fields("post_bridge_v2") ->
+ fields(pgsql_action);
fields("config") ->
[
- {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
+ {enable, hoconsc:mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{sql,
- mk(
+ hoconsc:mk(
binary(),
- #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
+ #{desc => ?DESC("sql_template"), default => default_sql(), format => <<"sql">>}
)},
{local_topic,
- mk(
+ hoconsc:mk(
binary(),
#{desc => ?DESC("local_topic"), default => undefined}
)}
@@ -94,17 +108,132 @@ fields("get") ->
fields("post", Type) ->
[type_field(Type), name_field() | fields("config")].
+type_field(Type) ->
+ {type, hoconsc:mk(hoconsc:enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.
+
+name_field() ->
+ {name, hoconsc:mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
+
desc("config") ->
?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for PostgreSQL using `", string:to_upper(Method), "` method."];
+desc(pgsql_action) ->
+ ?DESC("pgsql_action");
+desc(action_parameters) ->
+ ?DESC("action_parameters");
+desc("config_connector") ->
+ ?DESC(emqx_postgresql_connector_schema, "config_connector");
desc(_) ->
undefined.
-%% -------------------------------------------------------------------------------------------------
+default_sql() ->
+ <<
+ "insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) "
+ "values (${id}, ${topic}, ${qos}, ${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
+ >>.
-type_field(Type) ->
- {type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.
+%% Examples
-name_field() ->
- {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
+bridge_v2_examples(Method) ->
+ [
+ #{
+ <<"pgsql">> => #{
+ summary => <<"PostgreSQL Action">>,
+ value => values({Method, pgsql})
+ }
+ }
+ ].
+
+conn_bridge_examples(Method) ->
+ [
+ #{
+ <<"pgsql">> => #{
+ summary => <<"PostgreSQL Bridge">>,
+ value => values_conn_bridge_examples(Method, pgsql)
+ }
+ }
+ ].
+
+values({get, PostgreSQLType}) ->
+ maps:merge(
+ #{
+ status => <<"connected">>,
+ node_status => [
+ #{
+ node => <<"emqx@localhost">>,
+ status => <<"connected">>
+ }
+ ]
+ },
+ values({put, PostgreSQLType})
+ );
+values({post, PostgreSQLType}) ->
+ values({put, PostgreSQLType});
+values({put, PostgreSQLType}) ->
+ maps:merge(
+ #{
+ name => <<"my_action">>,
+ type => PostgreSQLType,
+ enable => true,
+ connector => <<"my_connector">>,
+ resource_opts => #{
+ batch_size => 1,
+ batch_time => <<"50ms">>,
+ inflight_window => 100,
+ max_buffer_bytes => <<"256MB">>,
+ request_ttl => <<"45s">>,
+ worker_pool_size => 16
+ }
+ },
+ values(parameters)
+ );
+values(parameters) ->
+ #{
+ <<"parameters">> => #{
+ <<"sql">> =>
+ <<
+ "INSERT INTO client_events(clientid, event, created_at)"
+ "VALUES (\n"
+ " ${clientid},\n"
+ " ${event},\n"
+ " TO_TIMESTAMP((${timestamp} :: bigint))\n"
+ ")"
+ >>
+ }
+ }.
+
+values_conn_bridge_examples(get, Type) ->
+ maps:merge(
+ #{
+ status => <<"connected">>,
+ node_status => [
+ #{
+ node => <<"emqx@localhost">>,
+ status => <<"connected">>
+ }
+ ]
+ },
+ values_conn_bridge_examples(post, Type)
+ );
+values_conn_bridge_examples(_Method, Type) ->
+ #{
+ enable => true,
+ type => Type,
+ name => <<"foo">>,
+ server => <<"127.0.0.1:5432">>,
+ database => <<"mqtt">>,
+ pool_size => 8,
+ username => <<"root">>,
+ password => <<"******">>,
+ sql => default_sql(),
+ local_topic => <<"local/topic/#">>,
+ resource_opts => #{
+ worker_pool_size => 8,
+ health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
+ batch_size => ?DEFAULT_BATCH_SIZE,
+ batch_time => ?DEFAULT_BATCH_TIME,
+ query_mode => async,
+ max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
+ }
+ }.
diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl
new file mode 100644
index 000000000..c702b396b
--- /dev/null
+++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl
@@ -0,0 +1,22 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_pgsql_action_info).
+
+-behaviour(emqx_action_info).
+
+-export([
+ bridge_v1_type_name/0,
+ action_type_name/0,
+ connector_type_name/0,
+ schema_module/0
+]).
+
+bridge_v1_type_name() -> pgsql.
+
+action_type_name() -> pgsql.
+
+connector_type_name() -> pgsql.
+
+schema_module() -> emqx_bridge_pgsql.
diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl
index 722489ba6..58aaa7d71 100644
--- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl
+++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl
@@ -114,7 +114,7 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite(),
- ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]),
+ ok = emqx_common_test_helpers:stop_apps([emqx, emqx_postgresql, emqx_conf, emqx_bridge]),
ok.
init_per_testcase(_Testcase, Config) ->
@@ -147,7 +147,7 @@ common_init(Config0) ->
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
% Ensure enterprise bridge module is loaded
- ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
+ ok = emqx_common_test_helpers:start_apps([emqx, emqx_postgresql, emqx_conf, emqx_bridge]),
_ = emqx_bridge_enterprise:module_info(),
emqx_mgmt_api_test_util:init_suite(),
% Connect to pgsql directly and create the table
@@ -259,17 +259,16 @@ send_message(Config, Payload) ->
BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
emqx_bridge:send_message(BridgeID, Payload).
-query_resource(Config, Request) ->
+query_resource(Config, Msg = _Request) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
- ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
- emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
+ emqx_bridge_v2:query(BridgeType, Name, Msg, #{timeout => 1_000}).
query_resource_sync(Config, Request) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
- ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
- emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request).
+ ActionId = emqx_bridge_v2:id(BridgeType, Name),
+ emqx_resource_buffer_worker:simple_sync_query(ActionId, Request).
query_resource_async(Config, Request) ->
query_resource_async(Config, Request, _Opts = #{}).
@@ -279,9 +278,8 @@ query_resource_async(Config, Request, Opts) ->
BridgeType = ?config(pgsql_bridge_type, Config),
Ref = alias([reply]),
AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
- ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
Timeout = maps:get(timeout, Opts, 500),
- Return = emqx_resource:query(ResourceID, Request, #{
+ Return = emqx_bridge_v2:query(BridgeType, Name, Request, #{
timeout => Timeout,
async_reply_fun => {AsyncReplyFun, []}
}),
@@ -441,13 +439,12 @@ t_get_status(Config) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
- ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
- ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)),
+ ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)),
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
?assertMatch(
- {ok, Status} when Status =:= disconnected orelse Status =:= connecting,
- emqx_resource_manager:health_check(ResourceID)
+ #{status := Status} when Status =:= disconnected orelse Status =:= connecting,
+ emqx_bridge_v2:health_check(BridgeType, Name)
)
end),
ok.
@@ -655,7 +652,7 @@ t_nasty_sql_string(Config) ->
t_missing_table(Config) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
- ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+ % ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
?check_trace(
begin
@@ -665,21 +662,20 @@ t_missing_table(Config) ->
_Sleep = 1_000,
_Attempts = 20,
?assertMatch(
- {ok, Status} when Status == connecting orelse Status == disconnected,
- emqx_resource_manager:health_check(ResourceID)
+ #{status := Status} when Status == connecting orelse Status == disconnected,
+ emqx_bridge_v2:health_check(BridgeType, Name)
)
),
Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000},
- Timeout = 1000,
?assertMatch(
{error, {resource_error, #{reason := unhealthy_target}}},
- query_resource(Config, {send_message, SentData, [], Timeout})
+ query_resource(Config, {send_message, SentData})
),
ok
end,
fun(Trace) ->
- ?assertMatch([_], ?of_kind(pgsql_undefined_table, Trace)),
+ ?assertMatch([_ | _], ?of_kind(pgsql_undefined_table, Trace)),
ok
end
),
@@ -689,7 +685,7 @@ t_missing_table(Config) ->
t_table_removed(Config) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
- ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+ %%ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
?check_trace(
begin
connect_and_create_table(Config),
@@ -697,13 +693,14 @@ t_table_removed(Config) ->
?retry(
_Sleep = 1_000,
_Attempts = 20,
- ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
+ ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name))
),
connect_and_drop_table(Config),
Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000},
- case query_resource_sync(Config, {send_message, SentData, []}) of
- {error, {unrecoverable_error, {error, error, <<"42P01">>, undefined_table, _, _}}} ->
+ ActionId = emqx_bridge_v2:id(BridgeType, Name),
+ case query_resource_sync(Config, {ActionId, SentData}) of
+ {error, {unrecoverable_error, _}} ->
ok;
?RESOURCE_ERROR_M(not_connected, _) ->
ok;
@@ -720,7 +717,6 @@ t_table_removed(Config) ->
t_concurrent_health_checks(Config) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
- ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
?check_trace(
begin
connect_and_create_table(Config),
@@ -728,11 +724,13 @@ t_concurrent_health_checks(Config) ->
?retry(
_Sleep = 1_000,
_Attempts = 20,
- ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
+ ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name))
),
emqx_utils:pmap(
fun(_) ->
- ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
+ ?assertMatch(
+ #{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)
+ )
end,
lists:seq(1, 20)
),
diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src
index adb024591..53302a21f 100644
--- a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src
+++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src
@@ -1,8 +1,9 @@
{application, emqx_bridge_timescale, [
{description, "EMQX Enterprise TimescaleDB Bridge"},
- {vsn, "0.1.2"},
+ {vsn, "0.1.3"},
{registered, []},
{applications, [kernel, stdlib, emqx_resource]},
+ {env, [{emqx_action_info_module, emqx_bridge_timescale_action_info}]},
{env, []},
{modules, []},
{links, []}
diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl
index c4dedf07c..796d9d9f6 100644
--- a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl
+++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl
@@ -3,6 +3,8 @@
%%--------------------------------------------------------------------
-module(emqx_bridge_timescale).
+-include_lib("hocon/include/hoconsc.hrl").
+
-export([
conn_bridge_examples/1
]).
@@ -14,6 +16,12 @@
desc/1
]).
+%% Examples
+-export([
+ bridge_v2_examples/1,
+ connector_examples/1
+]).
+
%% -------------------------------------------------------------------------------------------------
%% api
@@ -22,7 +30,7 @@ conn_bridge_examples(Method) ->
#{
<<"timescale">> => #{
summary => <<"Timescale Bridge">>,
- value => emqx_bridge_pgsql:values(Method, timescale)
+ value => emqx_bridge_pgsql:values_conn_bridge_examples(Method, timescale)
}
}
].
@@ -35,8 +43,55 @@ roots() -> [].
fields("post") ->
emqx_bridge_pgsql:fields("post", timescale);
+fields("config_connector") ->
+ emqx_bridge_pgsql:fields("config_connector");
+fields(action) ->
+ {timescale,
+ hoconsc:mk(
+ hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql, pgsql_action)),
+ #{
+ desc => <<"Timescale Action Config">>,
+ required => false
+ }
+ )};
+fields("put_bridge_v2") ->
+ emqx_bridge_pgsql:fields(pgsql_action);
+fields("get_bridge_v2") ->
+ emqx_bridge_pgsql:fields(pgsql_action);
+fields("post_bridge_v2") ->
+ emqx_bridge_pgsql:fields(pgsql_action);
+fields("put_connector") ->
+ emqx_bridge_pgsql:fields("config_connector");
+fields("get_connector") ->
+ emqx_bridge_pgsql:fields("config_connector");
+fields("post_connector") ->
+ emqx_bridge_pgsql:fields("config_connector");
fields(Method) ->
emqx_bridge_pgsql:fields(Method).
+desc("config_connector") ->
+ ?DESC(emqx_postgresql_connector_schema, "config_connector");
desc(_) ->
undefined.
+
+%% Examples
+
+connector_examples(Method) ->
+ [
+ #{
+ <<"timescale">> => #{
+ summary => <<"Timescale Connector">>,
+ value => emqx_postgresql_connector_schema:values({Method, <<"timescale">>})
+ }
+ }
+ ].
+
+bridge_v2_examples(Method) ->
+ [
+ #{
+ <<"timescale">> => #{
+ summary => <<"Timescale Action">>,
+ value => emqx_bridge_pgsql:values({Method, timescale})
+ }
+ }
+ ].
diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale_action_info.erl b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale_action_info.erl
new file mode 100644
index 000000000..fff74b578
--- /dev/null
+++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale_action_info.erl
@@ -0,0 +1,22 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_timescale_action_info).
+
+-behaviour(emqx_action_info).
+
+-export([
+ bridge_v1_type_name/0,
+ action_type_name/0,
+ connector_type_name/0,
+ schema_module/0
+]).
+
+bridge_v1_type_name() -> timescale.
+
+action_type_name() -> timescale.
+
+connector_type_name() -> timescale.
+
+schema_module() -> emqx_bridge_timescale.
diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl
index 535917e4e..389623b0a 100644
--- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl
+++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl
@@ -29,12 +29,18 @@ resource_type(gcp_pubsub_producer) ->
emqx_bridge_gcp_pubsub_impl_producer;
resource_type(kafka_producer) ->
emqx_bridge_kafka_impl_producer;
+resource_type(matrix) ->
+ emqx_postgresql;
resource_type(mongodb) ->
emqx_bridge_mongodb_connector;
+resource_type(pgsql) ->
+ emqx_postgresql;
resource_type(syskeeper_forwarder) ->
emqx_bridge_syskeeper_connector;
resource_type(syskeeper_proxy) ->
emqx_bridge_syskeeper_proxy_server;
+resource_type(timescale) ->
+ emqx_postgresql;
resource_type(Type) ->
error({unknown_connector_type, Type}).
@@ -108,6 +114,30 @@ connector_structs() ->
desc => <<"Syskeeper Proxy Connector Config">>,
required => false
}
+ )},
+ {pgsql,
+ mk(
+ hoconsc:map(name, ref(emqx_bridge_pgsql, "config_connector")),
+ #{
+ desc => <<"PostgreSQL Connector Config">>,
+ required => false
+ }
+ )},
+ {timescale,
+ mk(
+ hoconsc:map(name, ref(emqx_bridge_timescale, "config_connector")),
+ #{
+ desc => <<"Timescale Connector Config">>,
+ required => false
+ }
+ )},
+ {matrix,
+ mk(
+ hoconsc:map(name, ref(emqx_bridge_matrix, "config_connector")),
+ #{
+ desc => <<"Matrix Connector Config">>,
+ required => false
+ }
)}
].
@@ -129,9 +159,12 @@ schema_modules() ->
emqx_bridge_confluent_producer,
emqx_bridge_gcp_pubsub_producer_schema,
emqx_bridge_kafka,
+ emqx_bridge_matrix,
emqx_bridge_mongodb,
emqx_bridge_syskeeper_connector,
- emqx_bridge_syskeeper_proxy
+ emqx_bridge_syskeeper_proxy,
+ emqx_bridge_timescale,
+ emqx_postgresql_connector_schema
].
api_schemas(Method) ->
@@ -150,9 +183,12 @@ api_schemas(Method) ->
Method ++ "_connector"
),
api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
+ api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"),
api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"),
api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
- api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method)
+ api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method),
+ api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"),
+ api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector")
].
api_ref(Module, Type, Method) ->
diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl
index 765a693e2..a7de0cf52 100644
--- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl
+++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl
@@ -70,9 +70,12 @@ connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_pro
connector_type_to_bridge_types(confluent_producer) -> [confluent_producer];
connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_producer];
connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer];
+connector_type_to_bridge_types(matrix) -> [matrix];
connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
+connector_type_to_bridge_types(pgsql) -> [pgsql];
connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder];
-connector_type_to_bridge_types(syskeeper_proxy) -> [].
+connector_type_to_bridge_types(syskeeper_proxy) -> [];
+connector_type_to_bridge_types(timescale) -> [timescale].
actions_config_name() -> <<"actions">>.
diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl
index ba1ad4be5..ce62fa30b 100644
--- a/apps/emqx_postgresql/src/emqx_postgresql.erl
+++ b/apps/emqx_postgresql/src/emqx_postgresql.erl
@@ -34,7 +34,11 @@
on_stop/2,
on_query/3,
on_batch_query/3,
- on_get_status/2
+ on_get_status/2,
+ on_add_channel/4,
+ on_remove_channel/3,
+ on_get_channels/1,
+ on_get_channel_status/3
]).
-export([connect/1]).
@@ -136,10 +140,11 @@ on_start(
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
{pool_size, PoolSize}
],
- State = parse_prepare_sql(Config),
+ State1 = parse_prepare_sql(Config, <<"send_message">>),
+ State2 = State1#{installed_channels => #{}},
case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
ok ->
- {ok, init_prepare(State#{pool_name => InstId, prepares => #{}})};
+ {ok, init_prepare(State2#{pool_name => InstId, prepares => #{}})};
{error, Reason} ->
?tp(
pgsql_connector_start_failed,
@@ -148,13 +153,137 @@ on_start(
{error, Reason}
end.
-on_stop(InstId, _State) ->
+on_stop(InstId, State) ->
?SLOG(info, #{
msg => "stopping_postgresql_connector",
connector => InstId
}),
+ close_connections(State),
emqx_resource_pool:stop(InstId).
+close_connections(#{pool_name := PoolName} = _State) ->
+ WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
+ close_connections_with_worker_pids(WorkerPids),
+ ok.
+
+close_connections_with_worker_pids([WorkerPid | Rest]) ->
+ %% We ignore errors since any error probably means that the
+ %% connection is closed already.
+ try ecpool_worker:client(WorkerPid) of
+ {ok, Conn} ->
+ _ = epgsql:close(Conn),
+ close_connections_with_worker_pids(Rest);
+ _ ->
+ close_connections_with_worker_pids(Rest)
+ catch
+ _:_ ->
+ close_connections_with_worker_pids(Rest)
+ end;
+close_connections_with_worker_pids([]) ->
+ ok.
+
+on_add_channel(
+ _InstId,
+ #{
+ installed_channels := InstalledChannels
+ } = OldState,
+ ChannelId,
+ ChannelConfig
+) ->
+ %% The following will throw an exception if the bridge producers fails to start
+ {ok, ChannelState} = create_channel_state(ChannelId, OldState, ChannelConfig),
+ case ChannelState of
+ #{prepares := {error, Reason}} ->
+ {error, {unhealthy_target, Reason}};
+ _ ->
+ NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
+ %% Update state
+ NewState = OldState#{installed_channels => NewInstalledChannels},
+ {ok, NewState}
+ end.
+
+create_channel_state(
+ ChannelId,
+ #{pool_name := PoolName} = _ConnectorState,
+ #{parameters := Parameters} = _ChannelConfig
+) ->
+ State1 = parse_prepare_sql(Parameters, ChannelId),
+ {ok,
+ init_prepare(State1#{
+ pool_name => PoolName,
+ prepare_statement => #{}
+ })}.
+
+on_remove_channel(
+ _InstId,
+ #{
+ installed_channels := InstalledChannels
+ } = OldState,
+ ChannelId
+) ->
+ %% Close prepared statements
+ ok = close_prepared_statement(ChannelId, OldState),
+ NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
+ %% Update state
+ NewState = OldState#{installed_channels => NewInstalledChannels},
+ {ok, NewState}.
+
+close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) ->
+ WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
+ close_prepared_statement(WorkerPids, ChannelId, State),
+ ok.
+
+close_prepared_statement([WorkerPid | Rest], ChannelId, State) ->
+ %% We ignore errors since any error probably means that the
+ %% prepared statement doesn't exist.
+ try ecpool_worker:client(WorkerPid) of
+ {ok, Conn} ->
+ Statement = get_prepared_statement(ChannelId, State),
+ _ = epgsql:close(Conn, Statement),
+ close_prepared_statement(Rest, ChannelId, State);
+ _ ->
+ close_prepared_statement(Rest, ChannelId, State)
+ catch
+ _:_ ->
+ close_prepared_statement(Rest, ChannelId, State)
+ end;
+close_prepared_statement([], _ChannelId, _State) ->
+ ok.
+
+on_get_channel_status(
+ _ResId,
+ ChannelId,
+ #{
+ pool_name := PoolName,
+ installed_channels := Channels
+ } = _State
+) ->
+ ChannelState = maps:get(ChannelId, Channels),
+ case
+ do_check_channel_sql(
+ PoolName,
+ ChannelId,
+ ChannelState
+ )
+ of
+ ok ->
+ connected;
+ {error, undefined_table} ->
+ {error, {unhealthy_target, <<"Table does not exist">>}}
+ end.
+
+do_check_channel_sql(
+ PoolName,
+ ChannelId,
+ #{query_templates := ChannelQueryTemplates} = _ChannelState
+) ->
+ {SQL, _RowTemplate} = maps:get(ChannelId, ChannelQueryTemplates),
+ WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
+ validate_table_existence(WorkerPids, SQL).
+
+on_get_channels(ResId) ->
+ emqx_bridge_v2:get_channels_for_connector(ResId).
+
on_query(InstId, {TypeOrKey, NameOrSQL}, State) ->
on_query(InstId, {TypeOrKey, NameOrSQL, []}, State);
on_query(
@@ -187,10 +316,10 @@ pgsql_query_type(_) ->
on_batch_query(
InstId,
[{Key, _} = Request | _] = BatchReq,
- #{pool_name := PoolName, query_templates := Templates, prepares := PrepStatements} = State
+ #{pool_name := PoolName} = State
) ->
BinKey = to_bin(Key),
- case maps:get(BinKey, Templates, undefined) of
+ case get_template(BinKey, State) of
undefined ->
Log = #{
connector => InstId,
@@ -201,7 +330,7 @@ on_batch_query(
?SLOG(error, Log),
{error, {unrecoverable_error, batch_prepare_not_implemented}};
{_Statement, RowTemplate} ->
- PrepStatement = maps:get(BinKey, PrepStatements),
+ PrepStatement = get_prepared_statement(BinKey, State),
Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
case on_sql_query(InstId, PoolName, execute_batch, PrepStatement, Rows) of
{error, _Error} = Result ->
@@ -223,15 +352,35 @@ proc_sql_params(query, SQLOrKey, Params, _State) ->
{SQLOrKey, Params};
proc_sql_params(prepared_query, SQLOrKey, Params, _State) ->
{SQLOrKey, Params};
-proc_sql_params(TypeOrKey, SQLOrData, Params, #{query_templates := Templates}) ->
- Key = to_bin(TypeOrKey),
- case maps:get(Key, Templates, undefined) of
+proc_sql_params(TypeOrKey, SQLOrData, Params, State) ->
+ BinKey = to_bin(TypeOrKey),
+ case get_template(BinKey, State) of
undefined ->
{SQLOrData, Params};
{_Statement, RowTemplate} ->
- {Key, render_prepare_sql_row(RowTemplate, SQLOrData)}
+ {BinKey, render_prepare_sql_row(RowTemplate, SQLOrData)}
end.
+get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) ->
+ BinKey = to_bin(Key),
+ ChannelState = maps:get(BinKey, Channels),
+ ChannelQueryTemplates = maps:get(query_templates, ChannelState),
+ maps:get(BinKey, ChannelQueryTemplates);
+get_template(Key, #{query_templates := Templates}) ->
+ BinKey = to_bin(Key),
+ maps:get(BinKey, Templates, undefined).
+
+get_prepared_statement(Key, #{installed_channels := Channels} = _State) when
+ is_map_key(Key, Channels)
+->
+ BinKey = to_bin(Key),
+ ChannelState = maps:get(BinKey, Channels),
+ ChannelPreparedStatements = maps:get(prepares, ChannelState),
+ maps:get(BinKey, ChannelPreparedStatements);
+get_prepared_statement(Key, #{prepares := PrepStatements}) ->
+ BinKey = to_bin(Key),
+ maps:get(BinKey, PrepStatements).
+
on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of
{error, Reason} = Result ->
@@ -415,13 +564,13 @@ conn_opts([Opt = {ssl_opts, _} | Opts], Acc) ->
conn_opts([_Opt | Opts], Acc) ->
conn_opts(Opts, Acc).
-parse_prepare_sql(Config) ->
+parse_prepare_sql(Config, SQLID) ->
Queries =
case Config of
#{prepare_statement := Qs} ->
Qs;
#{sql := Query} ->
- #{<<"send_message">> => Query};
+ #{SQLID => Query};
#{} ->
#{}
end,
diff --git a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl
new file mode 100644
index 000000000..74591beee
--- /dev/null
+++ b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl
@@ -0,0 +1,150 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_postgresql_connector_schema).
+
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx_postgresql/include/emqx_postgresql.hrl").
+
+-define(PGSQL_HOST_OPTIONS, #{
+ default_port => ?PGSQL_DEFAULT_PORT
+}).
+
+-export([
+ roots/0,
+ fields/1,
+ desc/1
+]).
+
+%% Examples
+-export([
+ connector_examples/1,
+ values/1
+]).
+
+roots() ->
+ [].
+
+fields("connection_fields") ->
+ [{server, server()}] ++
+ adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++
+ emqx_connector_schema_lib:ssl_fields();
+fields("config_connector") ->
+ fields("connection_fields") ++ emqx_connector_schema:common_fields();
+fields(config) ->
+ fields("config_connector") ++
+ fields(action);
+fields(action) ->
+ {pgsql,
+ hoconsc:mk(
+ hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql, pgsql_action)),
+ #{
+ desc => <<"PostgreSQL Action Config">>,
+ required => false
+ }
+ )};
+fields(pgsql_action) ->
+ emqx_bridge_v2_schema:make_producer_action_schema(hoconsc:ref(?MODULE, action_parameters));
+fields("put_bridge_v2") ->
+ fields(pgsql_action);
+fields("get_bridge_v2") ->
+ fields(pgsql_action);
+fields("post_bridge_v2") ->
+ fields(pgsql_action);
+fields("put_connector") ->
+ fields("config_connector");
+fields("get_connector") ->
+ fields("config_connector");
+fields("post_connector") ->
+ fields("config_connector").
+
+server() ->
+ Meta = #{desc => ?DESC("server")},
+ emqx_schema:servers_sc(Meta, ?PGSQL_HOST_OPTIONS).
+
+adjust_fields(Fields) ->
+ lists:map(
+ fun
+ ({username, Sc}) ->
+ %% to please dialyzer...
+ Override = #{type => hocon_schema:field_schema(Sc, type), required => true},
+ {username, hocon_schema:override(Sc, Override)};
+ (Field) ->
+ Field
+ end,
+ Fields
+ ).
+
+%% Examples
+connector_examples(Method) ->
+ [
+ #{
+ <<"pgsql">> => #{
+ summary => <<"PostgreSQL Connector">>,
+ value => values({Method, pgsql})
+ }
+ }
+ ].
+
+%% TODO: All of these needs to be adjusted from Kafka to PostgreSQL
+values({get, PostgreSQLType}) ->
+ maps:merge(
+ #{
+ status => <<"connected">>,
+ node_status => [
+ #{
+ node => <<"emqx@localhost">>,
+ status => <<"connected">>
+ }
+ ]
+ },
+ values({post, PostgreSQLType})
+ );
+values({post, PostgreSQLType}) ->
+ values({put, PostgreSQLType});
+values({put, PostgreSQLType}) ->
+ maps:merge(
+ #{
+ name => <<"my_action">>,
+ type => PostgreSQLType
+ },
+ values(common)
+ );
+values(common) ->
+ #{
+ <<"database">> => <<"emqx_data">>,
+ <<"enable">> => true,
+ <<"password">> => <<"public">>,
+ <<"pool_size">> => 8,
+ <<"server">> => <<"127.0.0.1:5432">>,
+ <<"ssl">> => #{
+ <<"ciphers">> => [],
+ <<"depth">> => 10,
+ <<"enable">> => false,
+ <<"hibernate_after">> => <<"5s">>,
+ <<"log_level">> => <<"notice">>,
+ <<"reuse_sessions">> => true,
+ <<"secure_renegotiate">> => true,
+ <<"verify">> => <<"verify_peer">>,
+ <<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>]
+ },
+ <<"username">> => <<"postgres">>
+ }.
+
+desc("config_connector") ->
+ ?DESC("config_connector");
+desc(_) ->
+ undefined.
diff --git a/changes/ee/feat-12013.en.md b/changes/ee/feat-12013.en.md
new file mode 100644
index 000000000..b72b7b5be
--- /dev/null
+++ b/changes/ee/feat-12013.en.md
@@ -0,0 +1 @@
+The bridges for PostgreSQL, Timescale and Matrix have been split so they are available via the connectors and actions APIs. They are still backwards compatible with the old bridge API.
diff --git a/rel/i18n/emqx_bridge_pgsql.hocon b/rel/i18n/emqx_bridge_pgsql.hocon
index 0a5ca2b04..8fcf9139e 100644
--- a/rel/i18n/emqx_bridge_pgsql.hocon
+++ b/rel/i18n/emqx_bridge_pgsql.hocon
@@ -40,4 +40,17 @@ sql_template.desc:
sql_template.label:
"""SQL Template"""
+pgsql_action.desc:
+"""Configuration for PostgreSQL Action"""
+
+pgsql_action.label:
+"""PostgreSQL Action Configuration"""
+
+
+action_parameters.desc:
+"""Configuration Parameters Specific to the PostgreSQL Action"""
+
+action_parameters.label:
+"""Action Parameters"""
+
}
diff --git a/rel/i18n/emqx_postgresql.hocon b/rel/i18n/emqx_postgresql.hocon
index c6d2581c1..9740b0814 100644
--- a/rel/i18n/emqx_postgresql.hocon
+++ b/rel/i18n/emqx_postgresql.hocon
@@ -8,4 +8,10 @@ The PostgreSQL default port 5432 is used if `[:Port]` is not specified."""
server.label:
"""Server Host"""
+config_connector.desc:
+"""The configuration for the PostgreSQL connector."""
+
+config_connector.label:
+"""PostgreSQL Connector Config"""
+
}
diff --git a/rel/i18n/emqx_postgresql_connector_schema.hocon b/rel/i18n/emqx_postgresql_connector_schema.hocon
new file mode 100644
index 000000000..8ecfb958a
--- /dev/null
+++ b/rel/i18n/emqx_postgresql_connector_schema.hocon
@@ -0,0 +1,18 @@
+
+emqx_postgresql_connector_schema {
+
+server.desc:
+"""The IPv4 or IPv6 address or the hostname to connect to.
+A host entry has the following form: `Host[:Port]`.
+The PostgreSQL default port 5432 is used if `[:Port]` is not specified."""
+
+server.label:
+"""Server Host"""
+
+config_connector.desc:
+"""The configuration for the PostgreSQL connector."""
+
+config_connector.label:
+"""PostgreSQL Connector Config"""
+
+}