Merge pull request #12013 from kjellwinblad/kjell/postgresql_conn_action_3/EMQX-11155

split pgsql, matrix and timescale into connector action
This commit is contained in:
Kjell Winblad 2023-11-25 10:23:53 +01:00 committed by GitHub
commit 7161f9d181
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 858 additions and 127 deletions

View File

@ -77,8 +77,11 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_confluent_producer_action_info,
emqx_bridge_gcp_pubsub_producer_action_info,
emqx_bridge_kafka_action_info,
emqx_bridge_matrix_action_info,
emqx_bridge_mongodb_action_info,
emqx_bridge_syskeeper_action_info
emqx_bridge_pgsql_action_info,
emqx_bridge_syskeeper_action_info,
emqx_bridge_timescale_action_info
].
-else.
hard_coded_action_info_modules_ee() ->

View File

@ -240,8 +240,8 @@ send_message(BridgeId, Message) ->
{BridgeV1Type, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId),
case emqx_bridge_v2:is_bridge_v2_type(BridgeV1Type) of
true ->
BridgeV2Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
emqx_bridge_v2:send_message(BridgeV2Type, BridgeName, Message, #{});
ActionType = emqx_action_info:bridge_v1_type_to_action_type(BridgeV1Type),
emqx_bridge_v2:send_message(ActionType, BridgeName, Message, #{});
false ->
ResId = emqx_bridge_resource:resource_id(BridgeV1Type, BridgeName),
send_message(BridgeV1Type, BridgeName, ResId, Message, #{})
@ -414,7 +414,7 @@ remove(BridgeType0, BridgeName) ->
}),
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
true ->
emqx_bridge_v2:remove(BridgeType, BridgeName);
emqx_bridge_v2:bridge_v1_remove(BridgeType0, BridgeName);
false ->
remove_v1(BridgeType, BridgeName)
end.

View File

@ -55,6 +55,7 @@
disable_enable/3,
health_check/2,
send_message/4,
query/4,
start/2,
reset_metrics/2,
create_dry_run/2,
@ -116,7 +117,9 @@
bridge_v1_enable_disable/3,
bridge_v1_restart/2,
bridge_v1_stop/2,
bridge_v1_start/2
bridge_v1_start/2,
%% For test cases only
bridge_v1_remove/2
]).
%%====================================================================
@ -547,25 +550,25 @@ get_query_mode(BridgeV2Type, Config) ->
ResourceType = emqx_connector_resource:connector_to_resource_type(ConnectorType),
emqx_resource:query_mode(ResourceType, Config, CreationOpts).
-spec send_message(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) ->
-spec query(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) ->
term() | {error, term()}.
send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
query(BridgeType, BridgeName, Message, QueryOpts0) ->
case lookup_conf(BridgeType, BridgeName) of
#{enable := true} = Config0 ->
Config = combine_connector_and_bridge_v2_config(BridgeType, BridgeName, Config0),
do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config);
do_query_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config);
#{enable := false} ->
{error, bridge_stopped};
_Error ->
{error, bridge_not_found}
end.
do_send_msg_with_enabled_config(
do_query_with_enabled_config(
_BridgeType, _BridgeName, _Message, _QueryOpts0, {error, Reason} = Error
) ->
?SLOG(error, Reason),
Error;
do_send_msg_with_enabled_config(
do_query_with_enabled_config(
BridgeType, BridgeName, Message, QueryOpts0, Config
) ->
QueryMode = get_query_mode(BridgeType, Config),
@ -579,7 +582,17 @@ do_send_msg_with_enabled_config(
}
),
BridgeV2Id = id(BridgeType, BridgeName),
emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts).
case Message of
{send_message, Msg} ->
emqx_resource:query(BridgeV2Id, {BridgeV2Id, Msg}, QueryOpts);
Msg ->
emqx_resource:query(BridgeV2Id, Msg, QueryOpts)
end.
-spec send_message(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) ->
term() | {error, term()}.
send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
query(BridgeType, BridgeName, {send_message, Message}, QueryOpts0).
-spec health_check(BridgeType :: term(), BridgeName :: term()) ->
#{status := emqx_resource:resource_status(), error := term()} | {error, Reason :: term()}.
@ -785,17 +798,24 @@ parse_id(Id) ->
end.
get_channels_for_connector(ConnectorId) ->
{ConnectorType, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId),
RootConf = maps:keys(emqx:get_config([?ROOT_KEY], #{})),
RelevantBridgeV2Types = [
Type
|| Type <- RootConf,
connector_type(Type) =:= ConnectorType
],
lists:flatten([
get_channels_for_connector(ConnectorName, BridgeV2Type)
|| BridgeV2Type <- RelevantBridgeV2Types
]).
try emqx_connector_resource:parse_connector_id(ConnectorId) of
{ConnectorType, ConnectorName} ->
RootConf = maps:keys(emqx:get_config([?ROOT_KEY], #{})),
RelevantBridgeV2Types = [
Type
|| Type <- RootConf,
connector_type(Type) =:= ConnectorType
],
lists:flatten([
get_channels_for_connector(ConnectorName, BridgeV2Type)
|| BridgeV2Type <- RelevantBridgeV2Types
])
catch
_:_ ->
%% ConnectorId is not a valid connector id so we assume the connector
%% has no channels (e.g. it is a a connector for authn or authz)
[]
end.
get_channels_for_connector(ConnectorName, BridgeV2Type) ->
BridgeV2s = emqx:get_config([?ROOT_KEY, BridgeV2Type], #{}),
@ -1325,6 +1345,34 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
} = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf),
create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf).
%% Only called by test cases (may create broken references)
bridge_v1_remove(BridgeV1Type, BridgeName) ->
ActionType = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
bridge_v1_remove(
ActionType,
BridgeName,
lookup_conf(ActionType, BridgeName)
).
bridge_v1_remove(
ActionType,
Name,
#{connector := ConnectorName}
) ->
case remove(ActionType, Name) of
ok ->
ConnectorType = connector_type(ActionType),
emqx_connector:remove(ConnectorType, ConnectorName);
Error ->
Error
end;
bridge_v1_remove(
_ActionType,
_Name,
Error
) ->
Error.
bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) ->
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
bridge_v1_check_deps_and_remove(

View File

@ -1,13 +1,13 @@
{application, emqx_bridge_matrix, [
{description, "EMQX Enterprise MatrixDB Bridge"},
{vsn, "0.1.2"},
{vsn, "0.1.3"},
{registered, []},
{applications, [
kernel,
stdlib,
emqx_resource
]},
{env, []},
{env, [{emqx_action_info_modules, [emqx_bridge_matrix_action_info]}]},
{modules, []},
{links, []}
]}.

View File

@ -3,6 +3,8 @@
%%--------------------------------------------------------------------
-module(emqx_bridge_matrix).
-include_lib("hocon/include/hoconsc.hrl").
-export([
conn_bridge_examples/1
]).
@ -14,6 +16,12 @@
desc/1
]).
%% Examples
-export([
bridge_v2_examples/1,
connector_examples/1
]).
%% -------------------------------------------------------------------------------------------------
%% api
@ -22,7 +30,7 @@ conn_bridge_examples(Method) ->
#{
<<"matrix">> => #{
summary => <<"Matrix Bridge">>,
value => emqx_bridge_pgsql:values(Method, matrix)
value => emqx_bridge_pgsql:values_conn_bridge_examples(Method, matrix)
}
}
].
@ -35,8 +43,55 @@ roots() -> [].
fields("post") ->
emqx_bridge_pgsql:fields("post", matrix);
fields("config_connector") ->
emqx_bridge_pgsql:fields("config_connector");
fields(action) ->
{matrix,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql, pgsql_action)),
#{
desc => <<"Matrix Action Config">>,
required => false
}
)};
fields("put_bridge_v2") ->
emqx_bridge_pgsql:fields(pgsql_action);
fields("get_bridge_v2") ->
emqx_bridge_pgsql:fields(pgsql_action);
fields("post_bridge_v2") ->
emqx_bridge_pgsql:fields(pgsql_action);
fields("put_connector") ->
emqx_bridge_pgsql:fields("config_connector");
fields("get_connector") ->
emqx_bridge_pgsql:fields("config_connector");
fields("post_connector") ->
emqx_bridge_pgsql:fields("config_connector");
fields(Method) ->
emqx_bridge_pgsql:fields(Method).
desc("config_connector") ->
?DESC(emqx_postgresql_connector_schema, "config_connector");
desc(_) ->
undefined.
%% Examples
connector_examples(Method) ->
[
#{
<<"matrix">> => #{
summary => <<"Matrix Connector">>,
value => emqx_postgresql_connector_schema:values({Method, <<"matrix">>})
}
}
].
bridge_v2_examples(Method) ->
[
#{
<<"matrix">> => #{
summary => <<"Matrix Action">>,
value => emqx_bridge_pgsql:values({Method, matrix})
}
}
].

View File

@ -0,0 +1,22 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_matrix_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
bridge_v1_type_name() -> matrix.
action_type_name() -> matrix.
connector_type_name() -> matrix.
schema_module() -> emqx_bridge_matrix.

View File

@ -8,7 +8,7 @@
emqx_resource,
emqx_postgresql
]},
{env, []},
{env, [{emqx_action_info_modules, [emqx_bridge_pgsql_action_info]}]},
{modules, []},
{links, []}
]}.

View File

@ -1,83 +1,97 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_pgsql).
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("emqx_postgresql/include/emqx_postgresql.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("epgsql/include/epgsql.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-import(hoconsc, [mk/2, enum/1, ref/2]).
-export([
conn_bridge_examples/1,
values/2,
fields/2
]).
-export([
namespace/0,
roots/0,
fields/1,
desc/1
desc/1,
fields/2
]).
-define(DEFAULT_SQL, <<
"insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) "
"values (${id}, ${topic}, ${qos}, ${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
>>).
%% Examples
-export([
bridge_v2_examples/1,
conn_bridge_examples/1
]).
%% -------------------------------------------------------------------------------------------------
%% api
%% Exported for timescale and matrix bridges
-export([
values/1,
values_conn_bridge_examples/2
]).
conn_bridge_examples(Method) ->
[
#{
<<"pgsql">> => #{
summary => <<"PostgreSQL Bridge">>,
value => values(Method, pgsql)
}
}
].
-define(PGSQL_HOST_OPTIONS, #{
default_port => ?PGSQL_DEFAULT_PORT
}).
values(_Method, Type) ->
#{
enable => true,
type => Type,
name => <<"foo">>,
server => <<"127.0.0.1:5432">>,
database => <<"mqtt">>,
pool_size => 8,
username => <<"root">>,
password => <<"******">>,
sql => ?DEFAULT_SQL,
local_topic => <<"local/topic/#">>,
resource_opts => #{
worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async,
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
}
}.
%% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions
namespace() -> "bridge_pgsql".
roots() -> [].
roots() ->
[].
fields("config_connector") ->
emqx_postgresql_connector_schema:fields("config_connector");
fields(config) ->
fields("config_connector") ++
fields(action);
fields(action) ->
{pgsql,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql, pgsql_action)),
#{
desc => <<"PostgreSQL Action Config">>,
required => false
}
)};
fields(action_parameters) ->
[
{sql,
hoconsc:mk(
binary(),
#{desc => ?DESC("sql_template"), default => default_sql(), format => <<"sql">>}
)}
] ++
emqx_connector_schema_lib:prepare_statement_fields();
fields(pgsql_action) ->
emqx_bridge_v2_schema:make_producer_action_schema(
hoconsc:mk(
hoconsc:ref(?MODULE, action_parameters),
#{
required => true,
desc => ?DESC("action_parameters")
}
)
);
fields("put_bridge_v2") ->
fields(pgsql_action);
fields("get_bridge_v2") ->
fields(pgsql_action);
fields("post_bridge_v2") ->
fields(pgsql_action);
fields("config") ->
[
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{enable, hoconsc:mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{sql,
mk(
hoconsc:mk(
binary(),
#{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
#{desc => ?DESC("sql_template"), default => default_sql(), format => <<"sql">>}
)},
{local_topic,
mk(
hoconsc:mk(
binary(),
#{desc => ?DESC("local_topic"), default => undefined}
)}
@ -94,17 +108,132 @@ fields("get") ->
fields("post", Type) ->
[type_field(Type), name_field() | fields("config")].
type_field(Type) ->
{type, hoconsc:mk(hoconsc:enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.
name_field() ->
{name, hoconsc:mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
desc("config") ->
?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for PostgreSQL using `", string:to_upper(Method), "` method."];
desc(pgsql_action) ->
?DESC("pgsql_action");
desc(action_parameters) ->
?DESC("action_parameters");
desc("config_connector") ->
?DESC(emqx_postgresql_connector_schema, "config_connector");
desc(_) ->
undefined.
%% -------------------------------------------------------------------------------------------------
default_sql() ->
<<
"insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) "
"values (${id}, ${topic}, ${qos}, ${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
>>.
type_field(Type) ->
{type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.
%% Examples
name_field() ->
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
bridge_v2_examples(Method) ->
[
#{
<<"pgsql">> => #{
summary => <<"PostgreSQL Action">>,
value => values({Method, pgsql})
}
}
].
conn_bridge_examples(Method) ->
[
#{
<<"pgsql">> => #{
summary => <<"PostgreSQL Bridge">>,
value => values_conn_bridge_examples(Method, pgsql)
}
}
].
values({get, PostgreSQLType}) ->
maps:merge(
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
},
values({put, PostgreSQLType})
);
values({post, PostgreSQLType}) ->
values({put, PostgreSQLType});
values({put, PostgreSQLType}) ->
maps:merge(
#{
name => <<"my_action">>,
type => PostgreSQLType,
enable => true,
connector => <<"my_connector">>,
resource_opts => #{
batch_size => 1,
batch_time => <<"50ms">>,
inflight_window => 100,
max_buffer_bytes => <<"256MB">>,
request_ttl => <<"45s">>,
worker_pool_size => 16
}
},
values(parameters)
);
values(parameters) ->
#{
<<"parameters">> => #{
<<"sql">> =>
<<
"INSERT INTO client_events(clientid, event, created_at)"
"VALUES (\n"
" ${clientid},\n"
" ${event},\n"
" TO_TIMESTAMP((${timestamp} :: bigint))\n"
")"
>>
}
}.
values_conn_bridge_examples(get, Type) ->
maps:merge(
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
},
values_conn_bridge_examples(post, Type)
);
values_conn_bridge_examples(_Method, Type) ->
#{
enable => true,
type => Type,
name => <<"foo">>,
server => <<"127.0.0.1:5432">>,
database => <<"mqtt">>,
pool_size => 8,
username => <<"root">>,
password => <<"******">>,
sql => default_sql(),
local_topic => <<"local/topic/#">>,
resource_opts => #{
worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async,
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
}
}.

View File

@ -0,0 +1,22 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_pgsql_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
bridge_v1_type_name() -> pgsql.
action_type_name() -> pgsql.
connector_type_name() -> pgsql.
schema_module() -> emqx_bridge_pgsql.

View File

@ -114,7 +114,7 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite(),
ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]),
ok = emqx_common_test_helpers:stop_apps([emqx, emqx_postgresql, emqx_conf, emqx_bridge]),
ok.
init_per_testcase(_Testcase, Config) ->
@ -147,7 +147,7 @@ common_init(Config0) ->
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
% Ensure enterprise bridge module is loaded
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
ok = emqx_common_test_helpers:start_apps([emqx, emqx_postgresql, emqx_conf, emqx_bridge]),
_ = emqx_bridge_enterprise:module_info(),
emqx_mgmt_api_test_util:init_suite(),
% Connect to pgsql directly and create the table
@ -259,17 +259,16 @@ send_message(Config, Payload) ->
BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
emqx_bridge:send_message(BridgeID, Payload).
query_resource(Config, Request) ->
query_resource(Config, Msg = _Request) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
emqx_bridge_v2:query(BridgeType, Name, Msg, #{timeout => 1_000}).
query_resource_sync(Config, Request) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request).
ActionId = emqx_bridge_v2:id(BridgeType, Name),
emqx_resource_buffer_worker:simple_sync_query(ActionId, Request).
query_resource_async(Config, Request) ->
query_resource_async(Config, Request, _Opts = #{}).
@ -279,9 +278,8 @@ query_resource_async(Config, Request, Opts) ->
BridgeType = ?config(pgsql_bridge_type, Config),
Ref = alias([reply]),
AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
Timeout = maps:get(timeout, Opts, 500),
Return = emqx_resource:query(ResourceID, Request, #{
Return = emqx_bridge_v2:query(BridgeType, Name, Request, #{
timeout => Timeout,
async_reply_fun => {AsyncReplyFun, []}
}),
@ -441,13 +439,12 @@ t_get_status(Config) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)),
?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)),
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
?assertMatch(
{ok, Status} when Status =:= disconnected orelse Status =:= connecting,
emqx_resource_manager:health_check(ResourceID)
#{status := Status} when Status =:= disconnected orelse Status =:= connecting,
emqx_bridge_v2:health_check(BridgeType, Name)
)
end),
ok.
@ -655,7 +652,7 @@ t_nasty_sql_string(Config) ->
t_missing_table(Config) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
% ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
?check_trace(
begin
@ -665,21 +662,20 @@ t_missing_table(Config) ->
_Sleep = 1_000,
_Attempts = 20,
?assertMatch(
{ok, Status} when Status == connecting orelse Status == disconnected,
emqx_resource_manager:health_check(ResourceID)
#{status := Status} when Status == connecting orelse Status == disconnected,
emqx_bridge_v2:health_check(BridgeType, Name)
)
),
Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000},
Timeout = 1000,
?assertMatch(
{error, {resource_error, #{reason := unhealthy_target}}},
query_resource(Config, {send_message, SentData, [], Timeout})
query_resource(Config, {send_message, SentData})
),
ok
end,
fun(Trace) ->
?assertMatch([_], ?of_kind(pgsql_undefined_table, Trace)),
?assertMatch([_ | _], ?of_kind(pgsql_undefined_table, Trace)),
ok
end
),
@ -689,7 +685,7 @@ t_missing_table(Config) ->
t_table_removed(Config) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
%%ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
?check_trace(
begin
connect_and_create_table(Config),
@ -697,13 +693,14 @@ t_table_removed(Config) ->
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name))
),
connect_and_drop_table(Config),
Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000},
case query_resource_sync(Config, {send_message, SentData, []}) of
{error, {unrecoverable_error, {error, error, <<"42P01">>, undefined_table, _, _}}} ->
ActionId = emqx_bridge_v2:id(BridgeType, Name),
case query_resource_sync(Config, {ActionId, SentData}) of
{error, {unrecoverable_error, _}} ->
ok;
?RESOURCE_ERROR_M(not_connected, _) ->
ok;
@ -720,7 +717,6 @@ t_table_removed(Config) ->
t_concurrent_health_checks(Config) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
?check_trace(
begin
connect_and_create_table(Config),
@ -728,11 +724,13 @@ t_concurrent_health_checks(Config) ->
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name))
),
emqx_utils:pmap(
fun(_) ->
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
?assertMatch(
#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)
)
end,
lists:seq(1, 20)
),

View File

@ -1,8 +1,9 @@
{application, emqx_bridge_timescale, [
{description, "EMQX Enterprise TimescaleDB Bridge"},
{vsn, "0.1.2"},
{vsn, "0.1.3"},
{registered, []},
{applications, [kernel, stdlib, emqx_resource]},
{env, [{emqx_action_info_module, emqx_bridge_timescale_action_info}]},
{env, []},
{modules, []},
{links, []}

View File

@ -3,6 +3,8 @@
%%--------------------------------------------------------------------
-module(emqx_bridge_timescale).
-include_lib("hocon/include/hoconsc.hrl").
-export([
conn_bridge_examples/1
]).
@ -14,6 +16,12 @@
desc/1
]).
%% Examples
-export([
bridge_v2_examples/1,
connector_examples/1
]).
%% -------------------------------------------------------------------------------------------------
%% api
@ -22,7 +30,7 @@ conn_bridge_examples(Method) ->
#{
<<"timescale">> => #{
summary => <<"Timescale Bridge">>,
value => emqx_bridge_pgsql:values(Method, timescale)
value => emqx_bridge_pgsql:values_conn_bridge_examples(Method, timescale)
}
}
].
@ -35,8 +43,55 @@ roots() -> [].
fields("post") ->
emqx_bridge_pgsql:fields("post", timescale);
fields("config_connector") ->
emqx_bridge_pgsql:fields("config_connector");
fields(action) ->
{timescale,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql, pgsql_action)),
#{
desc => <<"Timescale Action Config">>,
required => false
}
)};
fields("put_bridge_v2") ->
emqx_bridge_pgsql:fields(pgsql_action);
fields("get_bridge_v2") ->
emqx_bridge_pgsql:fields(pgsql_action);
fields("post_bridge_v2") ->
emqx_bridge_pgsql:fields(pgsql_action);
fields("put_connector") ->
emqx_bridge_pgsql:fields("config_connector");
fields("get_connector") ->
emqx_bridge_pgsql:fields("config_connector");
fields("post_connector") ->
emqx_bridge_pgsql:fields("config_connector");
fields(Method) ->
emqx_bridge_pgsql:fields(Method).
desc("config_connector") ->
?DESC(emqx_postgresql_connector_schema, "config_connector");
desc(_) ->
undefined.
%% Examples
connector_examples(Method) ->
[
#{
<<"timescale">> => #{
summary => <<"Timescale Connector">>,
value => emqx_postgresql_connector_schema:values({Method, <<"timescale">>})
}
}
].
bridge_v2_examples(Method) ->
[
#{
<<"timescale">> => #{
summary => <<"Timescale Action">>,
value => emqx_bridge_pgsql:values({Method, timescale})
}
}
].

View File

@ -0,0 +1,22 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_timescale_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
bridge_v1_type_name() -> timescale.
action_type_name() -> timescale.
connector_type_name() -> timescale.
schema_module() -> emqx_bridge_timescale.

View File

@ -29,12 +29,18 @@ resource_type(gcp_pubsub_producer) ->
emqx_bridge_gcp_pubsub_impl_producer;
resource_type(kafka_producer) ->
emqx_bridge_kafka_impl_producer;
resource_type(matrix) ->
emqx_postgresql;
resource_type(mongodb) ->
emqx_bridge_mongodb_connector;
resource_type(pgsql) ->
emqx_postgresql;
resource_type(syskeeper_forwarder) ->
emqx_bridge_syskeeper_connector;
resource_type(syskeeper_proxy) ->
emqx_bridge_syskeeper_proxy_server;
resource_type(timescale) ->
emqx_postgresql;
resource_type(Type) ->
error({unknown_connector_type, Type}).
@ -108,6 +114,30 @@ connector_structs() ->
desc => <<"Syskeeper Proxy Connector Config">>,
required => false
}
)},
{pgsql,
mk(
hoconsc:map(name, ref(emqx_bridge_pgsql, "config_connector")),
#{
desc => <<"PostgreSQL Connector Config">>,
required => false
}
)},
{timescale,
mk(
hoconsc:map(name, ref(emqx_bridge_timescale, "config_connector")),
#{
desc => <<"Timescale Connector Config">>,
required => false
}
)},
{matrix,
mk(
hoconsc:map(name, ref(emqx_bridge_matrix, "config_connector")),
#{
desc => <<"Matrix Connector Config">>,
required => false
}
)}
].
@ -129,9 +159,12 @@ schema_modules() ->
emqx_bridge_confluent_producer,
emqx_bridge_gcp_pubsub_producer_schema,
emqx_bridge_kafka,
emqx_bridge_matrix,
emqx_bridge_mongodb,
emqx_bridge_syskeeper_connector,
emqx_bridge_syskeeper_proxy
emqx_bridge_syskeeper_proxy,
emqx_bridge_timescale,
emqx_postgresql_connector_schema
].
api_schemas(Method) ->
@ -150,9 +183,12 @@ api_schemas(Method) ->
Method ++ "_connector"
),
api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"),
api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"),
api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method)
api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method),
api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"),
api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector")
].
api_ref(Module, Type, Method) ->

View File

@ -70,9 +70,12 @@ connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_pro
connector_type_to_bridge_types(confluent_producer) -> [confluent_producer];
connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_producer];
connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer];
connector_type_to_bridge_types(matrix) -> [matrix];
connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
connector_type_to_bridge_types(pgsql) -> [pgsql];
connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder];
connector_type_to_bridge_types(syskeeper_proxy) -> [].
connector_type_to_bridge_types(syskeeper_proxy) -> [];
connector_type_to_bridge_types(timescale) -> [timescale].
actions_config_name() -> <<"actions">>.

View File

@ -34,7 +34,11 @@
on_stop/2,
on_query/3,
on_batch_query/3,
on_get_status/2
on_get_status/2,
on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
on_get_channel_status/3
]).
-export([connect/1]).
@ -136,10 +140,11 @@ on_start(
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
{pool_size, PoolSize}
],
State = parse_prepare_sql(Config),
State1 = parse_prepare_sql(Config, <<"send_message">>),
State2 = State1#{installed_channels => #{}},
case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
ok ->
{ok, init_prepare(State#{pool_name => InstId, prepares => #{}})};
{ok, init_prepare(State2#{pool_name => InstId, prepares => #{}})};
{error, Reason} ->
?tp(
pgsql_connector_start_failed,
@ -148,13 +153,137 @@ on_start(
{error, Reason}
end.
on_stop(InstId, _State) ->
on_stop(InstId, State) ->
?SLOG(info, #{
msg => "stopping_postgresql_connector",
connector => InstId
}),
close_connections(State),
emqx_resource_pool:stop(InstId).
close_connections(#{pool_name := PoolName} = _State) ->
WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
close_connections_with_worker_pids(WorkerPids),
ok.
close_connections_with_worker_pids([WorkerPid | Rest]) ->
%% We ignore errors since any error probably means that the
%% connection is closed already.
try ecpool_worker:client(WorkerPid) of
{ok, Conn} ->
_ = epgsql:close(Conn),
close_connections_with_worker_pids(Rest);
_ ->
close_connections_with_worker_pids(Rest)
catch
_:_ ->
close_connections_with_worker_pids(Rest)
end;
close_connections_with_worker_pids([]) ->
ok.
on_add_channel(
_InstId,
#{
installed_channels := InstalledChannels
} = OldState,
ChannelId,
ChannelConfig
) ->
%% The following will throw an exception if the bridge producers fails to start
{ok, ChannelState} = create_channel_state(ChannelId, OldState, ChannelConfig),
case ChannelState of
#{prepares := {error, Reason}} ->
{error, {unhealthy_target, Reason}};
_ ->
NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
%% Update state
NewState = OldState#{installed_channels => NewInstalledChannels},
{ok, NewState}
end.
create_channel_state(
ChannelId,
#{pool_name := PoolName} = _ConnectorState,
#{parameters := Parameters} = _ChannelConfig
) ->
State1 = parse_prepare_sql(Parameters, ChannelId),
{ok,
init_prepare(State1#{
pool_name => PoolName,
prepare_statement => #{}
})}.
on_remove_channel(
_InstId,
#{
installed_channels := InstalledChannels
} = OldState,
ChannelId
) ->
%% Close prepared statements
ok = close_prepared_statement(ChannelId, OldState),
NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
%% Update state
NewState = OldState#{installed_channels => NewInstalledChannels},
{ok, NewState}.
close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) ->
WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
close_prepared_statement(WorkerPids, ChannelId, State),
ok.
close_prepared_statement([WorkerPid | Rest], ChannelId, State) ->
%% We ignore errors since any error probably means that the
%% prepared statement doesn't exist.
try ecpool_worker:client(WorkerPid) of
{ok, Conn} ->
Statement = get_prepared_statement(ChannelId, State),
_ = epgsql:close(Conn, Statement),
close_prepared_statement(Rest, ChannelId, State);
_ ->
close_prepared_statement(Rest, ChannelId, State)
catch
_:_ ->
close_prepared_statement(Rest, ChannelId, State)
end;
close_prepared_statement([], _ChannelId, _State) ->
ok.
on_get_channel_status(
_ResId,
ChannelId,
#{
pool_name := PoolName,
installed_channels := Channels
} = _State
) ->
ChannelState = maps:get(ChannelId, Channels),
case
do_check_channel_sql(
PoolName,
ChannelId,
ChannelState
)
of
ok ->
connected;
{error, undefined_table} ->
{error, {unhealthy_target, <<"Table does not exist">>}}
end.
do_check_channel_sql(
PoolName,
ChannelId,
#{query_templates := ChannelQueryTemplates} = _ChannelState
) ->
{SQL, _RowTemplate} = maps:get(ChannelId, ChannelQueryTemplates),
WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
validate_table_existence(WorkerPids, SQL).
on_get_channels(ResId) ->
emqx_bridge_v2:get_channels_for_connector(ResId).
on_query(InstId, {TypeOrKey, NameOrSQL}, State) ->
on_query(InstId, {TypeOrKey, NameOrSQL, []}, State);
on_query(
@ -187,10 +316,10 @@ pgsql_query_type(_) ->
on_batch_query(
InstId,
[{Key, _} = Request | _] = BatchReq,
#{pool_name := PoolName, query_templates := Templates, prepares := PrepStatements} = State
#{pool_name := PoolName} = State
) ->
BinKey = to_bin(Key),
case maps:get(BinKey, Templates, undefined) of
case get_template(BinKey, State) of
undefined ->
Log = #{
connector => InstId,
@ -201,7 +330,7 @@ on_batch_query(
?SLOG(error, Log),
{error, {unrecoverable_error, batch_prepare_not_implemented}};
{_Statement, RowTemplate} ->
PrepStatement = maps:get(BinKey, PrepStatements),
PrepStatement = get_prepared_statement(BinKey, State),
Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
case on_sql_query(InstId, PoolName, execute_batch, PrepStatement, Rows) of
{error, _Error} = Result ->
@ -223,15 +352,35 @@ proc_sql_params(query, SQLOrKey, Params, _State) ->
{SQLOrKey, Params};
proc_sql_params(prepared_query, SQLOrKey, Params, _State) ->
{SQLOrKey, Params};
proc_sql_params(TypeOrKey, SQLOrData, Params, #{query_templates := Templates}) ->
Key = to_bin(TypeOrKey),
case maps:get(Key, Templates, undefined) of
proc_sql_params(TypeOrKey, SQLOrData, Params, State) ->
BinKey = to_bin(TypeOrKey),
case get_template(BinKey, State) of
undefined ->
{SQLOrData, Params};
{_Statement, RowTemplate} ->
{Key, render_prepare_sql_row(RowTemplate, SQLOrData)}
{BinKey, render_prepare_sql_row(RowTemplate, SQLOrData)}
end.
get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) ->
BinKey = to_bin(Key),
ChannelState = maps:get(BinKey, Channels),
ChannelQueryTemplates = maps:get(query_templates, ChannelState),
maps:get(BinKey, ChannelQueryTemplates);
get_template(Key, #{query_templates := Templates}) ->
BinKey = to_bin(Key),
maps:get(BinKey, Templates, undefined).
get_prepared_statement(Key, #{installed_channels := Channels} = _State) when
is_map_key(Key, Channels)
->
BinKey = to_bin(Key),
ChannelState = maps:get(BinKey, Channels),
ChannelPreparedStatements = maps:get(prepares, ChannelState),
maps:get(BinKey, ChannelPreparedStatements);
get_prepared_statement(Key, #{prepares := PrepStatements}) ->
BinKey = to_bin(Key),
maps:get(BinKey, PrepStatements).
on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of
{error, Reason} = Result ->
@ -415,13 +564,13 @@ conn_opts([Opt = {ssl_opts, _} | Opts], Acc) ->
conn_opts([_Opt | Opts], Acc) ->
conn_opts(Opts, Acc).
parse_prepare_sql(Config) ->
parse_prepare_sql(Config, SQLID) ->
Queries =
case Config of
#{prepare_statement := Qs} ->
Qs;
#{sql := Query} ->
#{<<"send_message">> => Query};
#{SQLID => Query};
#{} ->
#{}
end,

View File

@ -0,0 +1,150 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_postgresql_connector_schema).
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx_postgresql/include/emqx_postgresql.hrl").
-define(PGSQL_HOST_OPTIONS, #{
default_port => ?PGSQL_DEFAULT_PORT
}).
-export([
roots/0,
fields/1,
desc/1
]).
%% Examples
-export([
connector_examples/1,
values/1
]).
roots() ->
[].
fields("connection_fields") ->
[{server, server()}] ++
adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++
emqx_connector_schema_lib:ssl_fields();
fields("config_connector") ->
fields("connection_fields") ++ emqx_connector_schema:common_fields();
fields(config) ->
fields("config_connector") ++
fields(action);
fields(action) ->
{pgsql,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql, pgsql_action)),
#{
desc => <<"PostgreSQL Action Config">>,
required => false
}
)};
fields(pgsql_action) ->
emqx_bridge_v2_schema:make_producer_action_schema(hoconsc:ref(?MODULE, action_parameters));
fields("put_bridge_v2") ->
fields(pgsql_action);
fields("get_bridge_v2") ->
fields(pgsql_action);
fields("post_bridge_v2") ->
fields(pgsql_action);
fields("put_connector") ->
fields("config_connector");
fields("get_connector") ->
fields("config_connector");
fields("post_connector") ->
fields("config_connector").
server() ->
Meta = #{desc => ?DESC("server")},
emqx_schema:servers_sc(Meta, ?PGSQL_HOST_OPTIONS).
adjust_fields(Fields) ->
lists:map(
fun
({username, Sc}) ->
%% to please dialyzer...
Override = #{type => hocon_schema:field_schema(Sc, type), required => true},
{username, hocon_schema:override(Sc, Override)};
(Field) ->
Field
end,
Fields
).
%% Examples
connector_examples(Method) ->
[
#{
<<"pgsql">> => #{
summary => <<"PostgreSQL Connector">>,
value => values({Method, pgsql})
}
}
].
%% TODO: All of these needs to be adjusted from Kafka to PostgreSQL
values({get, PostgreSQLType}) ->
maps:merge(
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
},
values({post, PostgreSQLType})
);
values({post, PostgreSQLType}) ->
values({put, PostgreSQLType});
values({put, PostgreSQLType}) ->
maps:merge(
#{
name => <<"my_action">>,
type => PostgreSQLType
},
values(common)
);
values(common) ->
#{
<<"database">> => <<"emqx_data">>,
<<"enable">> => true,
<<"password">> => <<"public">>,
<<"pool_size">> => 8,
<<"server">> => <<"127.0.0.1:5432">>,
<<"ssl">> => #{
<<"ciphers">> => [],
<<"depth">> => 10,
<<"enable">> => false,
<<"hibernate_after">> => <<"5s">>,
<<"log_level">> => <<"notice">>,
<<"reuse_sessions">> => true,
<<"secure_renegotiate">> => true,
<<"verify">> => <<"verify_peer">>,
<<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>]
},
<<"username">> => <<"postgres">>
}.
desc("config_connector") ->
?DESC("config_connector");
desc(_) ->
undefined.

View File

@ -0,0 +1 @@
The bridges for PostgreSQL, Timescale and Matrix have been split so they are available via the connectors and actions APIs. They are still backwards compatible with the old bridge API.

View File

@ -40,4 +40,17 @@ sql_template.desc:
sql_template.label:
"""SQL Template"""
pgsql_action.desc:
"""Configuration for PostgreSQL Action"""
pgsql_action.label:
"""PostgreSQL Action Configuration"""
action_parameters.desc:
"""Configuration Parameters Specific to the PostgreSQL Action"""
action_parameters.label:
"""Action Parameters"""
}

View File

@ -8,4 +8,10 @@ The PostgreSQL default port 5432 is used if `[:Port]` is not specified."""
server.label:
"""Server Host"""
config_connector.desc:
"""The configuration for the PostgreSQL connector."""
config_connector.label:
"""PostgreSQL Connector Config"""
}

View File

@ -0,0 +1,18 @@
emqx_postgresql_connector_schema {
server.desc:
"""The IPv4 or IPv6 address or the hostname to connect to.<br/>
A host entry has the following form: `Host[:Port]`.<br/>
The PostgreSQL default port 5432 is used if `[:Port]` is not specified."""
server.label:
"""Server Host"""
config_connector.desc:
"""The configuration for the PostgreSQL connector."""
config_connector.label:
"""PostgreSQL Connector Config"""
}