feat: split pgsql, matrix and timescale into connector action

This commit splits the bridges pgsql, matrix and timescale into
connector and action.

Fixes:
https://emqx.atlassian.net/browse/EMQX-11155
This commit is contained in:
Kjell Winblad 2023-11-14 17:16:09 +01:00
parent f8f8cf9f30
commit d5b62eead0
16 changed files with 832 additions and 57 deletions

View File

@ -78,7 +78,11 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_gcp_pubsub_producer_action_info,
emqx_bridge_kafka_action_info,
emqx_bridge_mongodb_action_info,
emqx_bridge_syskeeper_action_info
emqx_bridge_syskeeper_action_info,
emqx_bridge_syskeeper_action_info,
emqx_bridge_pgsql_action_info,
emqx_bridge_timescale_action_info,
emqx_bridge_matrix_action_info
].
-else.
hard_coded_action_info_modules_ee() ->

View File

@ -240,8 +240,8 @@ send_message(BridgeId, Message) ->
{BridgeV1Type, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId),
case emqx_bridge_v2:is_bridge_v2_type(BridgeV1Type) of
true ->
BridgeV2Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
emqx_bridge_v2:send_message(BridgeV2Type, BridgeName, Message, #{});
ActionType = emqx_action_info:bridge_v1_type_to_action_type(BridgeV1Type),
emqx_bridge_v2:send_message(ActionType, BridgeName, Message, #{});
false ->
ResId = emqx_bridge_resource:resource_id(BridgeV1Type, BridgeName),
send_message(BridgeV1Type, BridgeName, ResId, Message, #{})
@ -414,7 +414,7 @@ remove(BridgeType0, BridgeName) ->
}),
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
true ->
emqx_bridge_v2:remove(BridgeType, BridgeName);
emqx_bridge_v2:bridge_v1_remove(BridgeType0, BridgeName);
false ->
remove_v1(BridgeType, BridgeName)
end.

View File

@ -55,6 +55,7 @@
disable_enable/3,
health_check/2,
send_message/4,
query/4,
start/2,
reset_metrics/2,
create_dry_run/2,
@ -116,7 +117,9 @@
bridge_v1_enable_disable/3,
bridge_v1_restart/2,
bridge_v1_stop/2,
bridge_v1_start/2
bridge_v1_start/2,
%% For test cases only
bridge_v1_remove/2
]).
%%====================================================================
@ -547,25 +550,25 @@ get_query_mode(BridgeV2Type, Config) ->
ResourceType = emqx_connector_resource:connector_to_resource_type(ConnectorType),
emqx_resource:query_mode(ResourceType, Config, CreationOpts).
-spec send_message(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) ->
-spec query(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) ->
term() | {error, term()}.
send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
query(BridgeType, BridgeName, Message, QueryOpts0) ->
case lookup_conf(BridgeType, BridgeName) of
#{enable := true} = Config0 ->
Config = combine_connector_and_bridge_v2_config(BridgeType, BridgeName, Config0),
do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config);
do_query_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config);
#{enable := false} ->
{error, bridge_stopped};
_Error ->
{error, bridge_not_found}
end.
do_send_msg_with_enabled_config(
do_query_with_enabled_config(
_BridgeType, _BridgeName, _Message, _QueryOpts0, {error, Reason} = Error
) ->
?SLOG(error, Reason),
Error;
do_send_msg_with_enabled_config(
do_query_with_enabled_config(
BridgeType, BridgeName, Message, QueryOpts0, Config
) ->
QueryMode = get_query_mode(BridgeType, Config),
@ -579,7 +582,17 @@ do_send_msg_with_enabled_config(
}
),
BridgeV2Id = id(BridgeType, BridgeName),
emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts).
case Message of
{send_message, Msg} ->
emqx_resource:query(BridgeV2Id, {BridgeV2Id, Msg}, QueryOpts);
Msg ->
emqx_resource:query(BridgeV2Id, Msg, QueryOpts)
end.
-spec send_message(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) ->
term() | {error, term()}.
send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
query(BridgeType, BridgeName, {send_message, Message}, QueryOpts0).
-spec health_check(BridgeType :: term(), BridgeName :: term()) ->
#{status := emqx_resource:resource_status(), error := term()} | {error, Reason :: term()}.
@ -1325,6 +1338,34 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
} = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf),
create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf).
%% Only called by test cases (may create broken references)
bridge_v1_remove(BridgeV1Type, BridgeName) ->
ActionType = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
bridge_v1_remove(
ActionType,
BridgeName,
lookup_conf(ActionType, BridgeName)
).
bridge_v1_remove(
ActionType,
Name,
#{connector := ConnectorName}
) ->
case remove(ActionType, Name) of
ok ->
ConnectorType = connector_type(ActionType),
emqx_connector:remove(ConnectorType, ConnectorName);
Error ->
Error
end;
bridge_v1_remove(
_ActionType,
_Name,
Error
) ->
Error.
bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) ->
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
bridge_v1_check_deps_and_remove(

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_matrix, [
{description, "EMQX Enterprise MatrixDB Bridge"},
{vsn, "0.1.2"},
{vsn, "0.1.3"},
{registered, []},
{applications, [
kernel,

View File

@ -14,6 +14,12 @@
desc/1
]).
%% Examples
-export([
bridge_v2_examples/1,
connector_examples/1
]).
%% -------------------------------------------------------------------------------------------------
%% api
@ -22,7 +28,7 @@ conn_bridge_examples(Method) ->
#{
<<"matrix">> => #{
summary => <<"Matrix Bridge">>,
value => emqx_bridge_pgsql:values(Method, matrix)
value => emqx_bridge_pgsql_schema:values_conn_bridge_examples(Method, matrix)
}
}
].
@ -35,8 +41,53 @@ roots() -> [].
fields("post") ->
emqx_bridge_pgsql:fields("post", matrix);
fields("config_connector") ->
emqx_bridge_pgsql_schema:fields("config_connector");
fields(action) ->
{matrix,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql_schema, pgsql_action)),
#{
desc => <<"Matrix Action Config">>,
required => false
}
)};
fields("put_bridge_v2") ->
emqx_bridge_pgsql_schema:fields(pgsql_action);
fields("get_bridge_v2") ->
emqx_bridge_pgsql_schema:fields(pgsql_action);
fields("post_bridge_v2") ->
emqx_bridge_pgsql_schema:fields(pgsql_action);
fields("put_connector") ->
emqx_bridge_pgsql_schema:fields("config_connector");
fields("get_connector") ->
emqx_bridge_pgsql_schema:fields("config_connector");
fields("post_connector") ->
emqx_bridge_pgsql_schema:fields("config_connector");
fields(Method) ->
emqx_bridge_pgsql:fields(Method).
desc(_) ->
undefined.
%% Examples
connector_examples(Method) ->
[
#{
<<"matrix">> => #{
summary => <<"Matrix Connector">>,
value => emqx_postgresql_connector_schema:values({Method, connector})
}
}
].
bridge_v2_examples(Method) ->
[
#{
<<"matrix">> => #{
summary => <<"Matrix Action">>,
value => emqx_bridge_pgsql_schema:values({Method, matrix})
}
}
].

View File

@ -0,0 +1,22 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_matrix_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
bridge_v1_type_name() -> matrix.
action_type_name() -> matrix.
connector_type_name() -> matrix.
schema_module() -> emqx_bridge_matrix.

View File

@ -0,0 +1,22 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_pgsql_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
bridge_v1_type_name() -> pgsql.
action_type_name() -> pgsql.
connector_type_name() -> pgsql.
schema_module() -> emqx_bridge_pgsql_schema.

View File

@ -0,0 +1,172 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_pgsql_schema).
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("emqx_postgresql/include/emqx_postgresql.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("epgsql/include/epgsql.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-export([roots/0, fields/1]).
%% Examples
-export([
bridge_v2_examples/1,
conn_bridge_examples/1
]).
%% Exported for timescale and matrix bridges
-export([
values/1,
values_conn_bridge_examples/2
]).
-define(PGSQL_HOST_OPTIONS, #{
default_port => ?PGSQL_DEFAULT_PORT
}).
roots() ->
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
fields("config_connector") ->
emqx_postgresql_connector_schema:fields("config_connector");
fields(config) ->
fields("config_connector") ++
fields(action);
fields(action) ->
{pgsql,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql_schema, pgsql_action)),
#{
desc => <<"PostgreSQL Action Config">>,
required => false
}
)};
fields(action_parameters) ->
[
{sql,
hoconsc:mk(
binary(),
#{desc => ?DESC("sql_template"), default => default_sql(), format => <<"sql">>}
)}
] ++
emqx_connector_schema_lib:prepare_statement_fields();
fields(pgsql_action) ->
emqx_bridge_v2_schema:make_producer_action_schema(hoconsc:ref(?MODULE, action_parameters));
%% TODO: All of these needs to be fixed
fields("put_bridge_v2") ->
fields(pgsql_action);
fields("get_bridge_v2") ->
fields(pgsql_action);
fields("post_bridge_v2") ->
fields(pgsql_action).
default_sql() ->
<<
"insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) "
"values (${id}, ${topic}, ${qos}, ${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
>>.
%% Examples
bridge_v2_examples(Method) ->
[
#{
<<"pgsql">> => #{
summary => <<"PostgreSQL Producer Action">>,
value => values({Method, bridge_v2_producer})
}
}
].
conn_bridge_examples(Method) ->
[
#{
<<"pgsql">> => #{
summary => <<"PostgreSQL Producer Bridge">>,
value => values_conn_bridge_examples(Method, pgsql)
}
}
].
values({get, PostgreSQLType}) ->
maps:merge(
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
},
values({post, PostgreSQLType})
);
values({post, PostgreSQLType}) ->
maps:merge(
#{
name => <<"my_pgsql_action">>,
type => PostgreSQLType
},
values({put, PostgreSQLType})
);
values({put, PostgreSQLType}) ->
maps:merge(
#{
enable => true,
connector => <<"my_pgsql_connector">>,
resource_opts => #{
health_check_interval => "32s"
}
},
values({producer, PostgreSQLType})
);
values({producer, _PostgreSQLType}) ->
#{
<<"enable">> => true,
<<"connector">> => <<"connector_pgsql_test">>,
<<"parameters">> => #{
<<"sql">> =>
<<"INSERT INTO client_events(clientid, event, created_at) VALUES (\n ${clientid},\n ${event},\n TO_TIMESTAMP((${timestamp} :: bigint))\n)">>
},
<<"resource_opts">> => #{
<<"batch_size">> => 1,
<<"batch_time">> => <<"0ms">>,
<<"health_check_interval">> => <<"15s">>,
<<"inflight_window">> => 100,
<<"max_buffer_bytes">> => <<"256MB">>,
<<"query_mode">> => <<"async">>,
<<"request_ttl">> => <<"45s">>,
<<"start_after_created">> => true,
<<"start_timeout">> => <<"5s">>,
<<"worker_pool_size">> => 16
}
}.
values_conn_bridge_examples(_Method, Type) ->
#{
enable => true,
type => Type,
name => <<"foo">>,
server => <<"127.0.0.1:5432">>,
database => <<"mqtt">>,
pool_size => 8,
username => <<"root">>,
password => <<"******">>,
sql => default_sql(),
local_topic => <<"local/topic/#">>,
resource_opts => #{
worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async,
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
}
}.

View File

@ -114,7 +114,7 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite(),
ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]),
ok = emqx_common_test_helpers:stop_apps([emqx, emqx_postgresql, emqx_conf, emqx_bridge]),
ok.
init_per_testcase(_Testcase, Config) ->
@ -147,7 +147,7 @@ common_init(Config0) ->
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
% Ensure enterprise bridge module is loaded
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
ok = emqx_common_test_helpers:start_apps([emqx, emqx_postgresql, emqx_conf, emqx_bridge]),
_ = emqx_bridge_enterprise:module_info(),
emqx_mgmt_api_test_util:init_suite(),
% Connect to pgsql directly and create the table
@ -259,17 +259,16 @@ send_message(Config, Payload) ->
BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
emqx_bridge:send_message(BridgeID, Payload).
query_resource(Config, Request) ->
query_resource(Config, Msg = _Request) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
emqx_bridge_v2:query(BridgeType, Name, Msg, #{timeout => 1_000}).
query_resource_sync(Config, Request) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request).
ActionId = emqx_bridge_v2:id(BridgeType, Name),
emqx_resource_buffer_worker:simple_sync_query(ActionId, Request).
query_resource_async(Config, Request) ->
query_resource_async(Config, Request, _Opts = #{}).
@ -279,9 +278,8 @@ query_resource_async(Config, Request, Opts) ->
BridgeType = ?config(pgsql_bridge_type, Config),
Ref = alias([reply]),
AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
Timeout = maps:get(timeout, Opts, 500),
Return = emqx_resource:query(ResourceID, Request, #{
Return = emqx_bridge_v2:query(BridgeType, Name, Request, #{
timeout => Timeout,
async_reply_fun => {AsyncReplyFun, []}
}),
@ -441,13 +439,12 @@ t_get_status(Config) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)),
?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)),
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
?assertMatch(
{ok, Status} when Status =:= disconnected orelse Status =:= connecting,
emqx_resource_manager:health_check(ResourceID)
#{status := Status} when Status =:= disconnected orelse Status =:= connecting,
emqx_bridge_v2:health_check(BridgeType, Name)
)
end),
ok.
@ -655,7 +652,7 @@ t_nasty_sql_string(Config) ->
t_missing_table(Config) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
% ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
?check_trace(
begin
@ -665,21 +662,20 @@ t_missing_table(Config) ->
_Sleep = 1_000,
_Attempts = 20,
?assertMatch(
{ok, Status} when Status == connecting orelse Status == disconnected,
emqx_resource_manager:health_check(ResourceID)
#{status := Status} when Status == connecting orelse Status == disconnected,
emqx_bridge_v2:health_check(BridgeType, Name)
)
),
Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000},
Timeout = 1000,
?assertMatch(
{error, {resource_error, #{reason := unhealthy_target}}},
query_resource(Config, {send_message, SentData, [], Timeout})
query_resource(Config, {send_message, SentData})
),
ok
end,
fun(Trace) ->
?assertMatch([_], ?of_kind(pgsql_undefined_table, Trace)),
?assertMatch([_ | _], ?of_kind(pgsql_undefined_table, Trace)),
ok
end
),
@ -689,7 +685,7 @@ t_missing_table(Config) ->
t_table_removed(Config) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
%%ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
?check_trace(
begin
connect_and_create_table(Config),
@ -697,13 +693,14 @@ t_table_removed(Config) ->
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name))
),
connect_and_drop_table(Config),
Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000},
case query_resource_sync(Config, {send_message, SentData, []}) of
{error, {unrecoverable_error, {error, error, <<"42P01">>, undefined_table, _, _}}} ->
ActionId = emqx_bridge_v2:id(BridgeType, Name),
case query_resource_sync(Config, {ActionId, SentData}) of
{error, {unrecoverable_error, _}} ->
ok;
?RESOURCE_ERROR_M(not_connected, _) ->
ok;
@ -720,7 +717,6 @@ t_table_removed(Config) ->
t_concurrent_health_checks(Config) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
?check_trace(
begin
connect_and_create_table(Config),
@ -728,11 +724,13 @@ t_concurrent_health_checks(Config) ->
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name))
),
emqx_utils:pmap(
fun(_) ->
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
?assertMatch(
#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)
)
end,
lists:seq(1, 20)
),

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_timescale, [
{description, "EMQX Enterprise TimescaleDB Bridge"},
{vsn, "0.1.2"},
{vsn, "0.1.3"},
{registered, []},
{applications, [kernel, stdlib, emqx_resource]},
{env, []},

View File

@ -14,6 +14,12 @@
desc/1
]).
%% Examples
-export([
bridge_v2_examples/1,
connector_examples/1
]).
%% -------------------------------------------------------------------------------------------------
%% api
@ -22,7 +28,7 @@ conn_bridge_examples(Method) ->
#{
<<"timescale">> => #{
summary => <<"Timescale Bridge">>,
value => emqx_bridge_pgsql:values(Method, timescale)
value => emqx_bridge_pgsql_schema:values_conn_bridge_examples(Method, timescale)
}
}
].
@ -35,8 +41,53 @@ roots() -> [].
fields("post") ->
emqx_bridge_pgsql:fields("post", timescale);
fields("config_connector") ->
emqx_bridge_pgsql_schema:fields("config_connector");
fields(action) ->
{timescale,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql_schema, pgsql_action)),
#{
desc => <<"Timescale Action Config">>,
required => false
}
)};
fields("put_bridge_v2") ->
emqx_bridge_pgsql_schema:fields(pgsql_action);
fields("get_bridge_v2") ->
emqx_bridge_pgsql_schema:fields(pgsql_action);
fields("post_bridge_v2") ->
emqx_bridge_pgsql_schema:fields(pgsql_action);
fields("put_connector") ->
emqx_bridge_pgsql_schema:fields("config_connector");
fields("get_connector") ->
emqx_bridge_pgsql_schema:fields("config_connector");
fields("post_connector") ->
emqx_bridge_pgsql_schema:fields("config_connector");
fields(Method) ->
emqx_bridge_pgsql:fields(Method).
desc(_) ->
undefined.
%% Examples
connector_examples(Method) ->
[
#{
<<"timescale">> => #{
summary => <<"Timescale Connector">>,
value => emqx_postgresql_connector_schema:values({Method, connector})
}
}
].
bridge_v2_examples(Method) ->
[
#{
<<"timescale">> => #{
summary => <<"Timescale Action">>,
value => emqx_bridge_pgsql_schema:values({Method, timescale})
}
}
].

View File

@ -0,0 +1,22 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_timescale_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
bridge_v1_type_name() -> timescale.
action_type_name() -> timescale.
connector_type_name() -> timescale.
schema_module() -> emqx_bridge_timescale.

View File

@ -35,6 +35,12 @@ resource_type(syskeeper_forwarder) ->
emqx_bridge_syskeeper_connector;
resource_type(syskeeper_proxy) ->
emqx_bridge_syskeeper_proxy_server;
resource_type(pgsql) ->
emqx_postgresql;
resource_type(timescale) ->
emqx_postgresql;
resource_type(matrix) ->
emqx_postgresql;
resource_type(Type) ->
error({unknown_connector_type, Type}).
@ -108,6 +114,30 @@ connector_structs() ->
desc => <<"Syskeeper Proxy Connector Config">>,
required => false
}
)},
{pgsql,
mk(
hoconsc:map(name, ref(emqx_postgresql_connector_schema, "config_connector")),
#{
desc => <<"PostgreSQL Connector Config">>,
required => false
}
)},
{timescale,
mk(
hoconsc:map(name, ref(emqx_bridge_timescale, "config_connector")),
#{
desc => <<"Timescale Connector Config">>,
required => false
}
)},
{matrix,
mk(
hoconsc:map(name, ref(emqx_bridge_matrix, "config_connector")),
#{
desc => <<"Matrix Connector Config">>,
required => false
}
)}
].
@ -131,7 +161,10 @@ schema_modules() ->
emqx_bridge_kafka,
emqx_bridge_mongodb,
emqx_bridge_syskeeper_connector,
emqx_bridge_syskeeper_proxy
emqx_bridge_syskeeper_proxy,
emqx_postgresql_connector_schema,
emqx_bridge_timescale,
emqx_bridge_matrix
].
api_schemas(Method) ->
@ -152,7 +185,10 @@ api_schemas(Method) ->
api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"),
api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method)
api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method),
api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"),
api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"),
api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector")
].
api_ref(Module, Type, Method) ->

View File

@ -72,7 +72,10 @@ connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_p
connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer];
connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder];
connector_type_to_bridge_types(syskeeper_proxy) -> [].
connector_type_to_bridge_types(syskeeper_proxy) -> [];
connector_type_to_bridge_types(pgsql) -> [pgsql];
connector_type_to_bridge_types(timescale) -> [timescale];
connector_type_to_bridge_types(matrix) -> [matrix].
actions_config_name() -> <<"actions">>.

View File

@ -34,7 +34,11 @@
on_stop/2,
on_query/3,
on_batch_query/3,
on_get_status/2
on_get_status/2,
on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
on_get_channel_status/3
]).
-export([connect/1]).
@ -136,10 +140,11 @@ on_start(
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
{pool_size, PoolSize}
],
State = parse_prepare_sql(Config),
State1 = parse_prepare_sql(Config, <<"send_message">>),
State2 = State1#{installed_channels => #{}},
case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
ok ->
{ok, init_prepare(State#{pool_name => InstId, prepares => #{}})};
{ok, init_prepare(State2#{pool_name => InstId, prepares => #{}})};
{error, Reason} ->
?tp(
pgsql_connector_start_failed,
@ -148,13 +153,140 @@ on_start(
{error, Reason}
end.
on_stop(InstId, _State) ->
on_stop(InstId, State) ->
?SLOG(info, #{
msg => "stopping_postgresql_connector",
connector => InstId
}),
close_connections(State),
emqx_resource_pool:stop(InstId).
close_connections(#{pool_name := PoolName} = _State) ->
WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
close_connections_with_worker_pids(WorkerPids),
ok.
close_connections_with_worker_pids([WorkerPid | Rest]) ->
%% We ignore errors since any error probably means that the
%% connection is closed already.
try ecpool_worker:client(WorkerPid) of
{ok, Conn} ->
_ = epgsql:close(Conn),
close_connections_with_worker_pids(Rest);
_ ->
close_connections_with_worker_pids(Rest)
catch
_:_ ->
close_connections_with_worker_pids(Rest)
end;
close_connections_with_worker_pids([]) ->
ok.
on_add_channel(
_InstId,
#{
installed_channels := InstalledChannels
} = OldState,
ChannelId,
ChannelConfig
) ->
%% The following will throw an exception if the bridge producers fails to start
{ok, ChannelState} = create_channel_state(ChannelId, OldState, ChannelConfig),
case ChannelState of
#{prepares := {error, Reason}} ->
{error, {unhealthy_target, Reason}};
_ ->
NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
%% Update state
NewState = OldState#{installed_channels => NewInstalledChannels},
{ok, NewState}
end.
create_channel_state(
ChannelId,
#{pool_name := PoolName} = _ConnectorState,
#{parameters := Parameters} = _ChannelConfig
) ->
State1 = parse_prepare_sql(Parameters, ChannelId),
{ok,
init_prepare(State1#{
pool_name => PoolName,
prepare_statement => #{}
})}.
on_remove_channel(
_InstId,
#{
installed_channels := InstalledChannels
} = OldState,
ChannelId
) ->
%% Close prepared statements
ok = close_prepared_statement(ChannelId, OldState),
NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
%% Update state
NewState = OldState#{installed_channels => NewInstalledChannels},
{ok, NewState}.
close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) ->
WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
close_prepared_statement(WorkerPids, ChannelId, State),
ok.
close_prepared_statement([WorkerPid | Rest], ChannelId, State) ->
%% We ignore errors since any error probably means that the
%% prepared statement doesn't exist.
try ecpool_worker:client(WorkerPid) of
{ok, Conn} ->
Statement = get_prepared_statement(ChannelId, State),
_ = epgsql:close(Conn, Statement),
close_prepared_statement(Rest, ChannelId, State);
_ ->
close_prepared_statement(Rest, ChannelId, State)
catch
_:_ ->
close_prepared_statement(Rest, ChannelId, State)
end;
close_prepared_statement([], _ChannelId, _State) ->
ok.
on_get_channel_status(
_ResId,
ChannelId,
#{
pool_name := PoolName,
installed_channels := Channels
} = _State
) ->
ChannelState = maps:get(ChannelId, Channels),
case
do_check_channel_sql(
PoolName,
ChannelId,
ChannelState
)
of
ok ->
connected;
{error, undefined_table} ->
{error, {unhealthy_target, <<"Table does not exist">>}};
{error, _Reason} ->
%% do not log error, it is logged in prepare_sql_to_conn
connecting
end.
do_check_channel_sql(
PoolName,
ChannelId,
#{query_templates := ChannelQueryTemplates} = _ChannelState
) ->
{SQL, _RowTemplate} = maps:get(ChannelId, ChannelQueryTemplates),
WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
validate_table_existence(WorkerPids, SQL).
on_get_channels(ResId) ->
emqx_bridge_v2:get_channels_for_connector(ResId).
on_query(InstId, {TypeOrKey, NameOrSQL}, State) ->
on_query(InstId, {TypeOrKey, NameOrSQL, []}, State);
on_query(
@ -187,10 +319,10 @@ pgsql_query_type(_) ->
on_batch_query(
InstId,
[{Key, _} = Request | _] = BatchReq,
#{pool_name := PoolName, query_templates := Templates, prepares := PrepStatements} = State
#{pool_name := PoolName} = State
) ->
BinKey = to_bin(Key),
case maps:get(BinKey, Templates, undefined) of
case get_template(BinKey, State) of
undefined ->
Log = #{
connector => InstId,
@ -201,7 +333,7 @@ on_batch_query(
?SLOG(error, Log),
{error, {unrecoverable_error, batch_prepare_not_implemented}};
{_Statement, RowTemplate} ->
PrepStatement = maps:get(BinKey, PrepStatements),
PrepStatement = get_prepared_statement(BinKey, State),
Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
case on_sql_query(InstId, PoolName, execute_batch, PrepStatement, Rows) of
{error, _Error} = Result ->
@ -223,15 +355,35 @@ proc_sql_params(query, SQLOrKey, Params, _State) ->
{SQLOrKey, Params};
proc_sql_params(prepared_query, SQLOrKey, Params, _State) ->
{SQLOrKey, Params};
proc_sql_params(TypeOrKey, SQLOrData, Params, #{query_templates := Templates}) ->
Key = to_bin(TypeOrKey),
case maps:get(Key, Templates, undefined) of
proc_sql_params(TypeOrKey, SQLOrData, Params, State) ->
BinKey = to_bin(TypeOrKey),
case get_template(BinKey, State) of
undefined ->
{SQLOrData, Params};
{_Statement, RowTemplate} ->
{Key, render_prepare_sql_row(RowTemplate, SQLOrData)}
{BinKey, render_prepare_sql_row(RowTemplate, SQLOrData)}
end.
get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) ->
BinKey = to_bin(Key),
ChannelState = maps:get(BinKey, Channels),
ChannelQueryTemplates = maps:get(query_templates, ChannelState),
maps:get(BinKey, ChannelQueryTemplates);
get_template(Key, #{query_templates := Templates}) ->
BinKey = to_bin(Key),
maps:get(BinKey, Templates, undefined).
get_prepared_statement(Key, #{installed_channels := Channels} = _State) when
is_map_key(Key, Channels)
->
BinKey = to_bin(Key),
ChannelState = maps:get(BinKey, Channels),
ChannelPreparedStatements = maps:get(prepares, ChannelState),
maps:get(BinKey, ChannelPreparedStatements);
get_prepared_statement(Key, #{prepares := PrepStatements}) ->
BinKey = to_bin(Key),
maps:get(BinKey, PrepStatements).
on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of
{error, Reason} = Result ->
@ -415,13 +567,13 @@ conn_opts([Opt = {ssl_opts, _} | Opts], Acc) ->
conn_opts([_Opt | Opts], Acc) ->
conn_opts(Opts, Acc).
parse_prepare_sql(Config) ->
parse_prepare_sql(Config, SQLID) ->
Queries =
case Config of
#{prepare_statement := Qs} ->
Qs;
#{sql := Query} ->
#{<<"send_message">> => Query};
#{SQLID => Query};
#{} ->
#{}
end,

View File

@ -0,0 +1,201 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_postgresql_connector_schema).
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx_postgresql/include/emqx_postgresql.hrl").
-define(PGSQL_HOST_OPTIONS, #{
default_port => ?PGSQL_DEFAULT_PORT
}).
-export([
roots/0,
fields/1
]).
%% Examples
-export([
connector_examples/1,
values/1
]).
roots() ->
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
fields("config_connector") ->
[{server, server()}] ++
adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++
emqx_connector_schema_lib:ssl_fields();
fields(config) ->
fields("config_connector") ++
fields(action);
fields(action) ->
{pgsql,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql_schema, pgsql_action)),
#{
desc => <<"PostgreSQL Action Config">>,
required => false
}
)};
fields(pgsql_action) ->
emqx_bridge_v2_schema:make_producer_action_schema(hoconsc:ref(?MODULE, action_parameters));
%% TODO: All of these needs to be fixed
fields("put_bridge_v2") ->
fields(pgsql_action);
fields("get_bridge_v2") ->
fields(pgsql_action);
fields("post_bridge_v2") ->
fields(pgsql_action);
fields("put_connector") ->
fields("config_connector");
fields("get_connector") ->
fields("config_connector");
fields("post_connector") ->
fields("config_connector").
server() ->
Meta = #{desc => ?DESC("server")},
emqx_schema:servers_sc(Meta, ?PGSQL_HOST_OPTIONS).
adjust_fields(Fields) ->
lists:map(
fun
({username, Sc}) ->
%% to please dialyzer...
Override = #{type => hocon_schema:field_schema(Sc, type), required => true},
{username, hocon_schema:override(Sc, Override)};
(Field) ->
Field
end,
Fields
).
%% Examples
connector_examples(Method) ->
[
#{
<<"pgsql">> => #{
summary => <<"PostgreSQL Producer Connector">>,
value => values({Method, connector})
}
}
].
%% TODO: All of these needs to be adjusted from Kafka to PostgreSQL
values({get, PostgreSQLType}) ->
maps:merge(
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
},
values({post, PostgreSQLType})
);
values({post, connector}) ->
maps:merge(
#{
name => <<"my_pgsql_connector">>,
type => <<"pgsql">>
},
values(common_config)
);
values({post, PostgreSQLType}) ->
maps:merge(
#{
name => <<"my_pgsql_action">>,
type => <<"pgsql">>
},
values({put, PostgreSQLType})
);
values({put, bridge_v2_producer}) ->
values(bridge_v2_producer);
values({put, connector}) ->
values(common_config);
values({put, PostgreSQLType}) ->
maps:merge(values(common_config), values(PostgreSQLType));
values(bridge_v2_producer) ->
maps:merge(
#{
enable => true,
connector => <<"my_pgsql_connector">>,
resource_opts => #{
health_check_interval => "32s"
}
},
values(producer)
);
values(common_config) ->
#{
authentication => #{
mechanism => <<"plain">>,
username => <<"username">>,
password => <<"******">>
},
bootstrap_hosts => <<"localhost:9092">>,
connect_timeout => <<"5s">>,
enable => true,
metadata_request_timeout => <<"4s">>,
min_metadata_refresh_interval => <<"3s">>,
socket_opts => #{
sndbuf => <<"1024KB">>,
recbuf => <<"1024KB">>,
nodelay => true,
tcp_keepalive => <<"none">>
}
};
values(producer) ->
#{
kafka => #{
topic => <<"kafka-topic">>,
message => #{
key => <<"${.clientid}">>,
value => <<"${.}">>,
timestamp => <<"${.timestamp}">>
},
max_batch_bytes => <<"896KB">>,
compression => <<"no_compression">>,
partition_strategy => <<"random">>,
required_acks => <<"all_isr">>,
partition_count_refresh_interval => <<"60s">>,
kafka_headers => <<"${pub_props}">>,
kafka_ext_headers => [
#{
kafka_ext_header_key => <<"clientid">>,
kafka_ext_header_value => <<"${clientid}">>
},
#{
kafka_ext_header_key => <<"topic">>,
kafka_ext_header_value => <<"${topic}">>
}
],
kafka_header_value_encode_mode => none,
max_inflight => 10,
buffer => #{
mode => <<"hybrid">>,
per_partition_limit => <<"2GB">>,
segment_bytes => <<"100MB">>,
memory_overload_protection => true
}
},
local_topic => <<"mqtt/local/topic">>
}.