diff --git a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml index 66e7ec308..ce4f28ba7 100644 --- a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml +++ b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml @@ -15,6 +15,8 @@ services: - 8087:8087 - 13306:3306 - 13307:3307 + - 15432:5432 + - 15433:5433 command: - "-host=0.0.0.0" - "-config=/config/toxiproxy.json" diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json index 34bc5b1db..f4b11116b 100644 --- a/.ci/docker-compose-file/toxiproxy.json +++ b/.ci/docker-compose-file/toxiproxy.json @@ -29,5 +29,17 @@ "listen": "0.0.0.0:6379", "upstream": "redis:6379", "enabled": true + }, + { + "name": "pgsql_tcp", + "listen": "0.0.0.0:5432", + "upstream": "pgsql:5432", + "enabled": true + }, + { + "name": "pgsql_tls", + "listen": "0.0.0.0:5433", + "upstream": "pgsql-tls:5432", + "enabled": true } ] diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index cde8bbe3b..4b565a614 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -20,6 +20,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("epgsql/include/epgsql.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -export([roots/0, fields/1]). @@ -31,6 +32,7 @@ on_start/2, on_stop/2, on_query/3, + on_batch_query/3, on_get_status/2 ]). @@ -38,7 +40,8 @@ -export([ query/3, - prepared_query/3 + prepared_query/3, + execute_batch/3 ]). -export([do_get_status/1]). @@ -47,6 +50,18 @@ default_port => ?PGSQL_DEFAULT_PORT }). +-type prepares() :: #{atom() => binary()}. +-type params_tokens() :: #{atom() => list()}. + +-type state() :: + #{ + poolname := atom(), + auto_reconnect := boolean(), + prepare_sql := prepares(), + params_tokens := params_tokens(), + prepare_statement := epgsql:statement() + }. + %%===================================================================== roots() -> @@ -65,6 +80,7 @@ server() -> %% =================================================================== callback_mode() -> always_sync. +-spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}. on_start( InstId, #{ @@ -87,7 +103,7 @@ on_start( case maps:get(enable, SSL) of true -> [ - {ssl, true}, + {ssl, required}, {ssl_opts, emqx_tls_lib:to_client_opts(SSL)} ]; false -> @@ -100,13 +116,21 @@ on_start( {password, emqx_secret:wrap(Password)}, {database, DB}, {auto_reconnect, reconn_interval(AutoReconn)}, - {pool_size, PoolSize}, - {prepare_statement, maps:to_list(maps:get(prepare_statement, Config, #{}))} + {pool_size, PoolSize} ], PoolName = emqx_plugin_libs_pool:pool_name(InstId), + Prepares = parse_prepare_sql(Config), + InitState = #{poolname => PoolName, auto_reconnect => AutoReconn, prepare_statement => #{}}, + State = maps:merge(InitState, Prepares), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of - ok -> {ok, #{poolname => PoolName, auto_reconnect => AutoReconn}}; - {error, Reason} -> {error, Reason} + ok -> + {ok, init_prepare(State)}; + {error, Reason} -> + ?tp( + pgsql_connector_start_failed, + #{error => Reason} + ), + {error, Reason} end. on_stop(InstId, #{poolname := PoolName}) -> @@ -116,37 +140,145 @@ on_stop(InstId, #{poolname := PoolName}) -> }), emqx_plugin_libs_pool:stop_pool(PoolName). -on_query(InstId, {Type, NameOrSQL}, #{poolname := _PoolName} = State) -> - on_query(InstId, {Type, NameOrSQL, []}, State); -on_query(InstId, {Type, NameOrSQL, Params}, #{poolname := PoolName} = State) -> +on_query(InstId, {TypeOrKey, NameOrSQL}, #{poolname := _PoolName} = State) -> + on_query(InstId, {TypeOrKey, NameOrSQL, []}, State); +on_query( + InstId, + {TypeOrKey, NameOrSQL, Params}, + #{poolname := PoolName} = State +) -> ?SLOG(debug, #{ msg => "postgresql connector received sql query", connector => InstId, + type => TypeOrKey, sql => NameOrSQL, state => State }), - case Result = ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Params]}, no_handover) of + Type = pgsql_query_type(TypeOrKey), + {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State), + on_sql_query(InstId, PoolName, Type, NameOrSQL2, Data). + +pgsql_query_type(sql) -> + query; +pgsql_query_type(query) -> + query; +pgsql_query_type(prepared_query) -> + prepared_query; +%% for bridge +pgsql_query_type(_) -> + pgsql_query_type(prepared_query). + +on_batch_query( + InstId, + BatchReq, + #{poolname := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State +) -> + case BatchReq of + [{Key, _} = Request | _] -> + BinKey = to_bin(Key), + case maps:get(BinKey, Tokens, undefined) of + undefined -> + Log = #{ + connector => InstId, + first_request => Request, + state => State, + msg => "batch prepare not implemented" + }, + ?SLOG(error, Log), + {error, batch_prepare_not_implemented}; + TokenList -> + {_, Datas} = lists:unzip(BatchReq), + Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas], + St = maps:get(BinKey, Sts), + {_Column, Results} = on_sql_query(InstId, PoolName, execute_batch, St, Datas2), + %% this local function only suits for the result of batch insert + TransResult = fun + Trans([{ok, Count} | T], Acc) -> + Trans(T, Acc + Count); + Trans([{error, _} = Error | _], _Acc) -> + Error; + Trans([], Acc) -> + {ok, Acc} + end, + + TransResult(Results, 0) + end; + _ -> + Log = #{ + connector => InstId, + request => BatchReq, + state => State, + msg => "invalid request" + }, + ?SLOG(error, Log), + {error, invalid_request} + end. + +proc_sql_params(query, SQLOrKey, Params, _State) -> + {SQLOrKey, Params}; +proc_sql_params(prepared_query, SQLOrKey, Params, _State) -> + {SQLOrKey, Params}; +proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) -> + Key = to_bin(TypeOrKey), + case maps:get(Key, ParamsTokens, undefined) of + undefined -> + {SQLOrData, Params}; + Tokens -> + {Key, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)} + end. + +on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) -> + Result = ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover), + case Result of {error, Reason} -> ?SLOG(error, #{ msg => "postgresql connector do sql query failed", connector => InstId, + type => Type, sql => NameOrSQL, reason => Reason }); _ -> + ?tp( + pgsql_connector_query_return, + #{result => Result} + ), ok end, Result. -on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn}) -> +on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State) -> case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of - true -> connected; - false -> conn_status(AutoReconn) + true -> + case do_check_prepares(State) of + ok -> + connected; + {ok, NState} -> + %% return new state with prepared statements + {connected, NState}; + false -> + %% do not log error, it is logged in prepare_sql_to_conn + conn_status(AutoReconn) + end; + false -> + conn_status(AutoReconn) end. do_get_status(Conn) -> ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")). +do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) -> + ok; +do_check_prepares(State = #{poolname := PoolName, prepare_sql := {error, Prepares}}) -> + %% retry to prepare + case prepare_sql(Prepares, PoolName) of + {ok, Sts} -> + %% remove the error + {ok, State#{prepare_sql => Prepares, prepare_statement := Sts}}; + _Error -> + false + end. + %% =================================================================== conn_status(_AutoReconn = true) -> connecting; conn_status(_AutoReconn = false) -> disconnected. @@ -158,13 +290,9 @@ connect(Opts) -> Host = proplists:get_value(host, Opts), Username = proplists:get_value(username, Opts), Password = emqx_secret:unwrap(proplists:get_value(password, Opts)), - PrepareStatement = proplists:get_value(prepare_statement, Opts), case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of - {ok, Conn} -> - case parse(Conn, PrepareStatement) of - ok -> {ok, Conn}; - {error, Reason} -> {error, Reason} - end; + {ok, _Conn} = Ok -> + Ok; {error, Reason} -> {error, Reason} end. @@ -175,15 +303,8 @@ query(Conn, SQL, Params) -> prepared_query(Conn, Name, Params) -> epgsql:prepared_query2(Conn, Name, Params). -parse(_Conn, []) -> - ok; -parse(Conn, [{Name, Query} | More]) -> - case epgsql:parse2(Conn, Name, Query, []) of - {ok, _Statement} -> - parse(Conn, More); - Other -> - Other - end. +execute_batch(Conn, Statement, Params) -> + epgsql:execute_batch(Conn, Statement, Params). conn_opts(Opts) -> conn_opts(Opts, []). @@ -206,3 +327,91 @@ conn_opts([Opt = {ssl_opts, _} | Opts], Acc) -> conn_opts(Opts, [Opt | Acc]); conn_opts([_Opt | Opts], Acc) -> conn_opts(Opts, Acc). + +parse_prepare_sql(Config) -> + SQL = + case maps:get(prepare_statement, Config, undefined) of + undefined -> + case maps:get(sql, Config, undefined) of + undefined -> #{}; + Template -> #{<<"send_message">> => Template} + end; + Any -> + Any + end, + parse_prepare_sql(maps:to_list(SQL), #{}, #{}). + +parse_prepare_sql([{Key, H} | T], Prepares, Tokens) -> + {PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, '$n'), + parse_prepare_sql( + T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens} + ); +parse_prepare_sql([], Prepares, Tokens) -> + #{ + prepare_sql => Prepares, + params_tokens => Tokens + }. + +init_prepare(State = #{prepare_sql := Prepares, poolname := PoolName}) -> + case maps:size(Prepares) of + 0 -> + State; + _ -> + case prepare_sql(Prepares, PoolName) of + {ok, Sts} -> + State#{prepare_statement := Sts}; + Error -> + LogMeta = #{ + msg => <<"PostgreSQL init prepare statement failed">>, error => Error + }, + ?SLOG(error, LogMeta), + %% mark the prepare_sqlas failed + State#{prepare_sql => {error, Prepares}} + end + end. + +prepare_sql(Prepares, PoolName) when is_map(Prepares) -> + prepare_sql(maps:to_list(Prepares), PoolName); +prepare_sql(Prepares, PoolName) -> + case do_prepare_sql(Prepares, PoolName) of + {ok, _Sts} = Ok -> + %% prepare for reconnect + ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}), + Ok; + Error -> + Error + end. + +do_prepare_sql(Prepares, PoolName) -> + do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, #{}). + +do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, _LastSts) -> + {ok, Conn} = ecpool_worker:client(Worker), + case prepare_sql_to_conn(Conn, Prepares) of + {ok, Sts} -> + do_prepare_sql(T, Prepares, PoolName, Sts); + Error -> + Error + end; +do_prepare_sql([], _Prepares, _PoolName, LastSts) -> + {ok, LastSts}. + +prepare_sql_to_conn(Conn, Prepares) -> + prepare_sql_to_conn(Conn, Prepares, #{}). + +prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements}; +prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) -> + LogMeta = #{msg => "PostgreSQL Prepare Statement", name => Key, prepare_sql => SQL}, + ?SLOG(info, LogMeta), + case epgsql:parse2(Conn, Key, SQL, []) of + {ok, Statement} -> + prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => Statement}); + {error, Error} = Other -> + ?SLOG(error, LogMeta#{msg => "PostgreSQL parse failed", error => Error}), + Other + end. + +to_bin(Bin) when is_binary(Bin) -> + Bin; +to_bin(Atom) when is_atom(Atom) -> + erlang:atom_to_binary(Atom). diff --git a/lib-ee/emqx_ee_bridge/docker-ct b/lib-ee/emqx_ee_bridge/docker-ct index fba33559e..bf990bd7c 100644 --- a/lib-ee/emqx_ee_bridge/docker-ct +++ b/lib-ee/emqx_ee_bridge/docker-ct @@ -6,3 +6,4 @@ mongo_rs_sharded mysql redis redis_cluster +pgsql diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_pgsql.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_pgsql.conf new file mode 100644 index 000000000..0f80e1a1b --- /dev/null +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_pgsql.conf @@ -0,0 +1,74 @@ +emqx_ee_bridge_pgsql { + + local_topic { + desc { + en: """The MQTT topic filter to be forwarded to PostgreSQL. All MQTT 'PUBLISH' messages with the topic +matching the local_topic will be forwarded.
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is +configured, then both the data got from the rule and the MQTT messages that match local_topic +will be forwarded. +""" + zh: """发送到 'local_topic' 的消息都会转发到 PostgreSQL。
+注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。 +""" + } + label { + en: "Local Topic" + zh: "本地 Topic" + } + } + + sql_template { + desc { + en: """SQL Template""" + zh: """SQL 模板""" + } + label { + en: "SQL Template" + zh: "SQL 模板" + } + } + config_enable { + desc { + en: """Enable or disable this bridge""" + zh: """启用/禁用桥接""" + } + label { + en: "Enable Or Disable Bridge" + zh: "启用/禁用桥接" + } + } + + desc_config { + desc { + en: """Configuration for an PostgreSQL bridge.""" + zh: """PostgreSQL 桥接配置""" + } + label: { + en: "PostgreSQL Bridge Configuration" + zh: "PostgreSQL 桥接配置" + } + } + + desc_type { + desc { + en: """The Bridge Type""" + zh: """Bridge 类型""" + } + label { + en: "Bridge Type" + zh: "桥接类型" + } + } + + desc_name { + desc { + en: """Bridge name.""" + zh: """桥接名字""" + } + label { + en: "Bridge Name" + zh: "桥接名字" + } + } +} diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index 2e93a313c..d0099db1c 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -17,6 +17,7 @@ api_schemas(Method) -> ref(emqx_ee_bridge_gcp_pubsub, Method), ref(emqx_ee_bridge_kafka, Method), ref(emqx_ee_bridge_mysql, Method), + ref(emqx_ee_bridge_pgsql, Method), ref(emqx_ee_bridge_mongodb, Method ++ "_rs"), ref(emqx_ee_bridge_mongodb, Method ++ "_sharded"), ref(emqx_ee_bridge_mongodb, Method ++ "_single"), @@ -36,7 +37,8 @@ schema_modules() -> emqx_ee_bridge_influxdb, emqx_ee_bridge_mongodb, emqx_ee_bridge_mysql, - emqx_ee_bridge_redis + emqx_ee_bridge_redis, + emqx_ee_bridge_pgsql ]. examples(Method) -> @@ -63,7 +65,8 @@ resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb; resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb; resource_type(redis_single) -> emqx_ee_connector_redis; resource_type(redis_sentinel) -> emqx_ee_connector_redis; -resource_type(redis_cluster) -> emqx_ee_connector_redis. +resource_type(redis_cluster) -> emqx_ee_connector_redis; +resource_type(pgsql) -> emqx_connector_pgsql. fields(bridges) -> [ @@ -98,6 +101,14 @@ fields(bridges) -> desc => <<"MySQL Bridge Config">>, required => false } + )}, + {pgsql, + mk( + hoconsc:map(name, ref(emqx_ee_bridge_pgsql, "config")), + #{ + desc => <<"PostgreSQL Bridge Config">>, + required => false + } )} ] ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs(). diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl new file mode 100644 index 000000000..1f9a005c9 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl @@ -0,0 +1,130 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_bridge_pgsql). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + conn_bridge_examples/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +-define(DEFAULT_SQL, << + "insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) " + "values (${id}, ${topic}, ${qos}, ${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))" +>>). + +%% ------------------------------------------------------------------------------------------------- +%% api + +conn_bridge_examples(Method) -> + [ + #{ + <<"pgsql">> => #{ + summary => <<"PostgreSQL Bridge">>, + value => values(Method) + } + } + ]. + +values(get) -> + maps:merge(values(post), ?METRICS_EXAMPLE); +values(post) -> + #{ + enable => true, + type => pgsql, + name => <<"foo">>, + server => <<"127.0.0.1:5432">>, + database => <<"mqtt">>, + pool_size => 8, + username => <<"root">>, + password => <<"public">>, + auto_reconnect => true, + sql => ?DEFAULT_SQL, + local_topic => <<"local/topic/#">>, + resource_opts => #{ + worker_pool_size => 8, + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, + batch_size => ?DEFAULT_BATCH_SIZE, + batch_time => ?DEFAULT_BATCH_TIME, + query_mode => async, + max_queue_bytes => ?DEFAULT_QUEUE_SIZE + } + }; +values(put) -> + values(post). + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions +namespace() -> "bridge_pgsql". + +roots() -> []. + +fields("config") -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {sql, + mk( + binary(), + #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} + )}, + {local_topic, + mk( + binary(), + #{desc => ?DESC("local_topic"), default => undefined} + )}, + {resource_opts, + mk( + ref(?MODULE, "creation_opts"), + #{ + required => false, + default => #{}, + desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) + } + )} + ] ++ + emqx_connector_mysql:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields(); +fields("creation_opts") -> + Opts = emqx_resource_schema:fields("creation_opts"), + [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)]; +fields("post") -> + [type_field(), name_field() | fields("config")]; +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:metrics_status_fields() ++ fields("post"). + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for PostgreSQL using `", string:to_upper(Method), "` method."]; +desc("creation_opts" = Name) -> + emqx_resource_schema:desc(Name); +desc(_) -> + undefined. + +%% ------------------------------------------------------------------------------------------------- +%% internal +is_hidden_opts(Field) -> + lists:member(Field, [ + async_inflight_window + ]). + +type_field() -> + {type, mk(enum([mysql]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl new file mode 100644 index 000000000..c5292a892 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl @@ -0,0 +1,481 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ee_bridge_pgsql_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +% SQL definitions +-define(SQL_BRIDGE, + "INSERT INTO mqtt_test(payload, arrived) " + "VALUES (${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))" +). +-define(SQL_CREATE_TABLE, + "CREATE TABLE IF NOT EXISTS mqtt_test (payload text, arrived timestamp NOT NULL) " +). +-define(SQL_DROP_TABLE, "DROP TABLE mqtt_test"). +-define(SQL_DELETE, "DELETE from mqtt_test"). +-define(SQL_SELECT, "SELECT payload FROM mqtt_test"). + +% DB defaults +-define(PGSQL_DATABASE, "mqtt"). +-define(PGSQL_USERNAME, "root"). +-define(PGSQL_PASSWORD, "public"). +-define(BATCH_SIZE, 10). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + [ + {group, tcp}, + {group, tls} + ]. + +groups() -> + TCs = emqx_common_test_helpers:all(?MODULE), + NonBatchCases = [t_write_timeout], + [ + {tcp, [ + {group, with_batch}, + {group, without_batch} + ]}, + {tls, [ + {group, with_batch}, + {group, without_batch} + ]}, + {with_batch, TCs -- NonBatchCases}, + {without_batch, TCs} + ]. + +init_per_group(tcp, Config) -> + Host = os:getenv("PGSQL_TCP_HOST", "toxiproxy"), + Port = list_to_integer(os:getenv("PGSQL_TCP_PORT", "5432")), + [ + {pgsql_host, Host}, + {pgsql_port, Port}, + {enable_tls, false}, + {query_mode, sync}, + {proxy_name, "pgsql_tcp"} + | Config + ]; +init_per_group(tls, Config) -> + Host = os:getenv("PGSQL_TLS_HOST", "toxiproxy"), + Port = list_to_integer(os:getenv("PGSQL_TLS_PORT", "5433")), + [ + {pgsql_host, Host}, + {pgsql_port, Port}, + {enable_tls, true}, + {query_mode, sync}, + {proxy_name, "pgsql_tls"} + | Config + ]; +init_per_group(with_batch, Config0) -> + Config = [{enable_batch, true} | Config0], + common_init(Config); +init_per_group(without_batch, Config0) -> + Config = [{enable_batch, false} | Config0], + common_init(Config); +init_per_group(_Group, Config) -> + Config. + +end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch -> + connect_and_drop_table(Config), + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + ok; +end_per_group(_Group, _Config) -> + ok. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]), + ok. + +init_per_testcase(_Testcase, Config) -> + connect_and_clear_table(Config), + delete_bridge(Config), + Config. + +end_per_testcase(_Testcase, Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + connect_and_clear_table(Config), + ok = snabbkaffe:stop(), + delete_bridge(Config), + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +common_init(Config0) -> + BridgeType = <<"pgsql">>, + Host = ?config(pgsql_host, Config0), + Port = ?config(pgsql_port, Config0), + case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of + true -> + % Setup toxiproxy + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + % Ensure EE bridge module is loaded + _ = application:load(emqx_ee_bridge), + _ = emqx_ee_bridge:module_info(), + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + emqx_mgmt_api_test_util:init_suite(), + % Connect to pgsql directly and create the table + connect_and_create_table(Config0), + {Name, PGConf} = pgsql_config(BridgeType, Config0), + Config = + [ + {pgsql_config, PGConf}, + {pgsql_bridge_type, BridgeType}, + {pgsql_name, Name}, + {proxy_host, ProxyHost}, + {proxy_port, ProxyPort} + | Config0 + ], + Config; + false -> + case os:getenv("IS_CI") of + "yes" -> + throw(no_pgsql); + _ -> + {skip, no_pgsql} + end + end. + +pgsql_config(BridgeType, Config) -> + Port = integer_to_list(?config(pgsql_port, Config)), + Server = ?config(pgsql_host, Config) ++ ":" ++ Port, + Name = atom_to_binary(?MODULE), + BatchSize = + case ?config(enable_batch, Config) of + true -> ?BATCH_SIZE; + false -> 1 + end, + QueryMode = ?config(query_mode, Config), + TlsEnabled = ?config(enable_tls, Config), + ConfigString = + io_lib:format( + "bridges.~s.~s {\n" + " enable = true\n" + " server = ~p\n" + " database = ~p\n" + " username = ~p\n" + " password = ~p\n" + " sql = ~p\n" + " resource_opts = {\n" + " batch_size = ~b\n" + " query_mode = ~s\n" + " }\n" + " ssl = {\n" + " enable = ~w\n" + " }\n" + "}", + [ + BridgeType, + Name, + Server, + ?PGSQL_DATABASE, + ?PGSQL_USERNAME, + ?PGSQL_PASSWORD, + ?SQL_BRIDGE, + BatchSize, + QueryMode, + TlsEnabled + ] + ), + {Name, parse_and_check(ConfigString, BridgeType, Name)}. + +parse_and_check(ConfigString, BridgeType, Name) -> + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf, + Config. + +create_bridge(Config) -> + BridgeType = ?config(pgsql_bridge_type, Config), + Name = ?config(pgsql_name, Config), + PGConfig = ?config(pgsql_config, Config), + emqx_bridge:create(BridgeType, Name, PGConfig). + +delete_bridge(Config) -> + BridgeType = ?config(pgsql_bridge_type, Config), + Name = ?config(pgsql_name, Config), + emqx_bridge:remove(BridgeType, Name). + +create_bridge_http(Params) -> + Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])}; + Error -> Error + end. + +send_message(Config, Payload) -> + Name = ?config(pgsql_name, Config), + BridgeType = ?config(pgsql_bridge_type, Config), + BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name), + emqx_bridge:send_message(BridgeID, Payload). + +query_resource(Config, Request) -> + Name = ?config(pgsql_name, Config), + BridgeType = ?config(pgsql_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + emqx_resource:query(ResourceID, Request). + +connect_direct_pgsql(Config) -> + Opts = #{ + host => ?config(pgsql_host, Config), + port => ?config(pgsql_port, Config), + username => ?PGSQL_USERNAME, + password => ?PGSQL_PASSWORD, + database => ?PGSQL_DATABASE + }, + + SslOpts = + case ?config(enable_tls, Config) of + true -> + Opts#{ + ssl => true, + ssl_opts => emqx_tls_lib:to_client_opts(#{enable => true}) + }; + false -> + Opts + end, + {ok, Con} = epgsql:connect(SslOpts), + Con. + +% These funs connect and then stop the pgsql connection +connect_and_create_table(Config) -> + Con = connect_direct_pgsql(Config), + {ok, _, _} = epgsql:squery(Con, ?SQL_CREATE_TABLE), + ok = epgsql:close(Con). + +connect_and_drop_table(Config) -> + Con = connect_direct_pgsql(Config), + {ok, _, _} = epgsql:squery(Con, ?SQL_DROP_TABLE), + ok = epgsql:close(Con). + +connect_and_clear_table(Config) -> + Con = connect_direct_pgsql(Config), + {ok, _} = epgsql:squery(Con, ?SQL_DELETE), + ok = epgsql:close(Con). + +connect_and_get_payload(Config) -> + Con = connect_direct_pgsql(Config), + {ok, _, [{Result}]} = epgsql:squery(Con, ?SQL_SELECT), + ok = epgsql:close(Con), + Result. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_setup_via_config_and_publish(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Val = integer_to_binary(erlang:unique_integer()), + SentData = #{payload => Val, timestamp => 1668602148000}, + ?check_trace( + begin + ?wait_async_action( + ?assertEqual({ok, 1}, send_message(Config, SentData)), + #{?snk_kind := pgsql_connector_query_return}, + 10_000 + ), + ?assertMatch( + Val, + connect_and_get_payload(Config) + ), + ok + end, + fun(Trace0) -> + Trace = ?of_kind(pgsql_connector_query_return, Trace0), + case ?config(enable_batch, Config) of + true -> + ?assertMatch([#{result := {_, [{ok, 1}]}}], Trace); + false -> + ?assertMatch([#{result := {ok, 1}}], Trace) + end, + ok + end + ), + ok. + +t_setup_via_http_api_and_publish(Config) -> + BridgeType = ?config(pgsql_bridge_type, Config), + Name = ?config(pgsql_name, Config), + PgsqlConfig0 = ?config(pgsql_config, Config), + PgsqlConfig = PgsqlConfig0#{ + <<"name">> => Name, + <<"type">> => BridgeType + }, + ?assertMatch( + {ok, _}, + create_bridge_http(PgsqlConfig) + ), + Val = integer_to_binary(erlang:unique_integer()), + SentData = #{payload => Val, timestamp => 1668602148000}, + ?check_trace( + begin + ?wait_async_action( + ?assertEqual({ok, 1}, send_message(Config, SentData)), + #{?snk_kind := pgsql_connector_query_return}, + 10_000 + ), + ?assertMatch( + Val, + connect_and_get_payload(Config) + ), + ok + end, + fun(Trace0) -> + Trace = ?of_kind(pgsql_connector_query_return, Trace0), + case ?config(enable_batch, Config) of + true -> + ?assertMatch([#{result := {_, [{ok, 1}]}}], Trace); + false -> + ?assertMatch([#{result := {ok, 1}}], Trace) + end, + ok + end + ), + ok. + +t_get_status(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyName = ?config(proxy_name, Config), + + Name = ?config(pgsql_name, Config), + BridgeType = ?config(pgsql_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)), + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + ?assertMatch( + {ok, Status} when Status =:= disconnected orelse Status =:= connecting, + emqx_resource_manager:health_check(ResourceID) + ) + end), + ok. + +t_create_disconnected(Config) -> + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyName = ?config(proxy_name, Config), + ?check_trace( + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + ?assertMatch({ok, _}, create_bridge(Config)) + end), + fun(Trace) -> + ?assertMatch( + [#{error := {start_pool_failed, _, _}}], + ?of_kind(pgsql_connector_start_failed, Trace) + ), + ok + end + ), + ok. + +t_write_failure(Config) -> + ProxyName = ?config(proxy_name, Config), + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + {ok, _} = create_bridge(Config), + Val = integer_to_binary(erlang:unique_integer()), + SentData = #{payload => Val, timestamp => 1668602148000}, + ?check_trace( + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + send_message(Config, SentData) + end), + fun + ({error, {resource_error, _}}, _Trace) -> + ok; + ({error, {recoverable_error, disconnected}}, _Trace) -> + ok; + (_, _Trace) -> + ?assert(false) + end + ), + ok. + +% This test doesn't work with batch enabled since it is not possible +% to set the timeout directly for batch queries +t_write_timeout(Config) -> + ProxyName = ?config(proxy_name, Config), + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + {ok, _} = create_bridge(Config), + Val = integer_to_binary(erlang:unique_integer()), + SentData = #{payload => Val, timestamp => 1668602148000}, + Timeout = 10, + emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() -> + ?assertMatch( + {error, {resource_error, _}}, + query_resource(Config, {send_message, SentData, [], Timeout}) + ) + end), + ok. + +t_simple_sql_query(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Request = {sql, <<"SELECT count(1) AS T">>}, + Result = query_resource(Config, Request), + case ?config(enable_batch, Config) of + true -> ?assertEqual({error, batch_prepare_not_implemented}, Result); + false -> ?assertMatch({ok, _, [{1}]}, Result) + end, + ok. + +t_missing_data(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Result = send_message(Config, #{}), + ?assertMatch( + {error, {error, error, <<"23502">>, not_null_violation, _, _}}, Result + ), + ok. + +t_bad_sql_parameter(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Request = {sql, <<"">>, [bad_parameter]}, + Result = query_resource(Config, Request), + case ?config(enable_batch, Config) of + true -> + ?assertEqual({error, invalid_request}, Result); + false -> + ?assertMatch( + {error, {resource_error, _}}, Result + ) + end, + ok.