diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index cde8bbe3b..4b565a614 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -20,6 +20,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("epgsql/include/epgsql.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -export([roots/0, fields/1]). @@ -31,6 +32,7 @@ on_start/2, on_stop/2, on_query/3, + on_batch_query/3, on_get_status/2 ]). @@ -38,7 +40,8 @@ -export([ query/3, - prepared_query/3 + prepared_query/3, + execute_batch/3 ]). -export([do_get_status/1]). @@ -47,6 +50,18 @@ default_port => ?PGSQL_DEFAULT_PORT }). +-type prepares() :: #{atom() => binary()}. +-type params_tokens() :: #{atom() => list()}. + +-type state() :: + #{ + poolname := atom(), + auto_reconnect := boolean(), + prepare_sql := prepares(), + params_tokens := params_tokens(), + prepare_statement := epgsql:statement() + }. + %%===================================================================== roots() -> @@ -65,6 +80,7 @@ server() -> %% =================================================================== callback_mode() -> always_sync. +-spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}. on_start( InstId, #{ @@ -87,7 +103,7 @@ on_start( case maps:get(enable, SSL) of true -> [ - {ssl, true}, + {ssl, required}, {ssl_opts, emqx_tls_lib:to_client_opts(SSL)} ]; false -> @@ -100,13 +116,21 @@ on_start( {password, emqx_secret:wrap(Password)}, {database, DB}, {auto_reconnect, reconn_interval(AutoReconn)}, - {pool_size, PoolSize}, - {prepare_statement, maps:to_list(maps:get(prepare_statement, Config, #{}))} + {pool_size, PoolSize} ], PoolName = emqx_plugin_libs_pool:pool_name(InstId), + Prepares = parse_prepare_sql(Config), + InitState = #{poolname => PoolName, auto_reconnect => AutoReconn, prepare_statement => #{}}, + State = maps:merge(InitState, Prepares), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of - ok -> {ok, #{poolname => PoolName, auto_reconnect => AutoReconn}}; - {error, Reason} -> {error, Reason} + ok -> + {ok, init_prepare(State)}; + {error, Reason} -> + ?tp( + pgsql_connector_start_failed, + #{error => Reason} + ), + {error, Reason} end. on_stop(InstId, #{poolname := PoolName}) -> @@ -116,37 +140,145 @@ on_stop(InstId, #{poolname := PoolName}) -> }), emqx_plugin_libs_pool:stop_pool(PoolName). -on_query(InstId, {Type, NameOrSQL}, #{poolname := _PoolName} = State) -> - on_query(InstId, {Type, NameOrSQL, []}, State); -on_query(InstId, {Type, NameOrSQL, Params}, #{poolname := PoolName} = State) -> +on_query(InstId, {TypeOrKey, NameOrSQL}, #{poolname := _PoolName} = State) -> + on_query(InstId, {TypeOrKey, NameOrSQL, []}, State); +on_query( + InstId, + {TypeOrKey, NameOrSQL, Params}, + #{poolname := PoolName} = State +) -> ?SLOG(debug, #{ msg => "postgresql connector received sql query", connector => InstId, + type => TypeOrKey, sql => NameOrSQL, state => State }), - case Result = ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Params]}, no_handover) of + Type = pgsql_query_type(TypeOrKey), + {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State), + on_sql_query(InstId, PoolName, Type, NameOrSQL2, Data). + +pgsql_query_type(sql) -> + query; +pgsql_query_type(query) -> + query; +pgsql_query_type(prepared_query) -> + prepared_query; +%% for bridge +pgsql_query_type(_) -> + pgsql_query_type(prepared_query). + +on_batch_query( + InstId, + BatchReq, + #{poolname := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State +) -> + case BatchReq of + [{Key, _} = Request | _] -> + BinKey = to_bin(Key), + case maps:get(BinKey, Tokens, undefined) of + undefined -> + Log = #{ + connector => InstId, + first_request => Request, + state => State, + msg => "batch prepare not implemented" + }, + ?SLOG(error, Log), + {error, batch_prepare_not_implemented}; + TokenList -> + {_, Datas} = lists:unzip(BatchReq), + Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas], + St = maps:get(BinKey, Sts), + {_Column, Results} = on_sql_query(InstId, PoolName, execute_batch, St, Datas2), + %% this local function only suits for the result of batch insert + TransResult = fun + Trans([{ok, Count} | T], Acc) -> + Trans(T, Acc + Count); + Trans([{error, _} = Error | _], _Acc) -> + Error; + Trans([], Acc) -> + {ok, Acc} + end, + + TransResult(Results, 0) + end; + _ -> + Log = #{ + connector => InstId, + request => BatchReq, + state => State, + msg => "invalid request" + }, + ?SLOG(error, Log), + {error, invalid_request} + end. + +proc_sql_params(query, SQLOrKey, Params, _State) -> + {SQLOrKey, Params}; +proc_sql_params(prepared_query, SQLOrKey, Params, _State) -> + {SQLOrKey, Params}; +proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) -> + Key = to_bin(TypeOrKey), + case maps:get(Key, ParamsTokens, undefined) of + undefined -> + {SQLOrData, Params}; + Tokens -> + {Key, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)} + end. + +on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) -> + Result = ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover), + case Result of {error, Reason} -> ?SLOG(error, #{ msg => "postgresql connector do sql query failed", connector => InstId, + type => Type, sql => NameOrSQL, reason => Reason }); _ -> + ?tp( + pgsql_connector_query_return, + #{result => Result} + ), ok end, Result. -on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn}) -> +on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State) -> case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of - true -> connected; - false -> conn_status(AutoReconn) + true -> + case do_check_prepares(State) of + ok -> + connected; + {ok, NState} -> + %% return new state with prepared statements + {connected, NState}; + false -> + %% do not log error, it is logged in prepare_sql_to_conn + conn_status(AutoReconn) + end; + false -> + conn_status(AutoReconn) end. do_get_status(Conn) -> ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")). +do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) -> + ok; +do_check_prepares(State = #{poolname := PoolName, prepare_sql := {error, Prepares}}) -> + %% retry to prepare + case prepare_sql(Prepares, PoolName) of + {ok, Sts} -> + %% remove the error + {ok, State#{prepare_sql => Prepares, prepare_statement := Sts}}; + _Error -> + false + end. + %% =================================================================== conn_status(_AutoReconn = true) -> connecting; conn_status(_AutoReconn = false) -> disconnected. @@ -158,13 +290,9 @@ connect(Opts) -> Host = proplists:get_value(host, Opts), Username = proplists:get_value(username, Opts), Password = emqx_secret:unwrap(proplists:get_value(password, Opts)), - PrepareStatement = proplists:get_value(prepare_statement, Opts), case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of - {ok, Conn} -> - case parse(Conn, PrepareStatement) of - ok -> {ok, Conn}; - {error, Reason} -> {error, Reason} - end; + {ok, _Conn} = Ok -> + Ok; {error, Reason} -> {error, Reason} end. @@ -175,15 +303,8 @@ query(Conn, SQL, Params) -> prepared_query(Conn, Name, Params) -> epgsql:prepared_query2(Conn, Name, Params). -parse(_Conn, []) -> - ok; -parse(Conn, [{Name, Query} | More]) -> - case epgsql:parse2(Conn, Name, Query, []) of - {ok, _Statement} -> - parse(Conn, More); - Other -> - Other - end. +execute_batch(Conn, Statement, Params) -> + epgsql:execute_batch(Conn, Statement, Params). conn_opts(Opts) -> conn_opts(Opts, []). @@ -206,3 +327,91 @@ conn_opts([Opt = {ssl_opts, _} | Opts], Acc) -> conn_opts(Opts, [Opt | Acc]); conn_opts([_Opt | Opts], Acc) -> conn_opts(Opts, Acc). + +parse_prepare_sql(Config) -> + SQL = + case maps:get(prepare_statement, Config, undefined) of + undefined -> + case maps:get(sql, Config, undefined) of + undefined -> #{}; + Template -> #{<<"send_message">> => Template} + end; + Any -> + Any + end, + parse_prepare_sql(maps:to_list(SQL), #{}, #{}). + +parse_prepare_sql([{Key, H} | T], Prepares, Tokens) -> + {PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, '$n'), + parse_prepare_sql( + T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens} + ); +parse_prepare_sql([], Prepares, Tokens) -> + #{ + prepare_sql => Prepares, + params_tokens => Tokens + }. + +init_prepare(State = #{prepare_sql := Prepares, poolname := PoolName}) -> + case maps:size(Prepares) of + 0 -> + State; + _ -> + case prepare_sql(Prepares, PoolName) of + {ok, Sts} -> + State#{prepare_statement := Sts}; + Error -> + LogMeta = #{ + msg => <<"PostgreSQL init prepare statement failed">>, error => Error + }, + ?SLOG(error, LogMeta), + %% mark the prepare_sqlas failed + State#{prepare_sql => {error, Prepares}} + end + end. + +prepare_sql(Prepares, PoolName) when is_map(Prepares) -> + prepare_sql(maps:to_list(Prepares), PoolName); +prepare_sql(Prepares, PoolName) -> + case do_prepare_sql(Prepares, PoolName) of + {ok, _Sts} = Ok -> + %% prepare for reconnect + ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}), + Ok; + Error -> + Error + end. + +do_prepare_sql(Prepares, PoolName) -> + do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, #{}). + +do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, _LastSts) -> + {ok, Conn} = ecpool_worker:client(Worker), + case prepare_sql_to_conn(Conn, Prepares) of + {ok, Sts} -> + do_prepare_sql(T, Prepares, PoolName, Sts); + Error -> + Error + end; +do_prepare_sql([], _Prepares, _PoolName, LastSts) -> + {ok, LastSts}. + +prepare_sql_to_conn(Conn, Prepares) -> + prepare_sql_to_conn(Conn, Prepares, #{}). + +prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements}; +prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) -> + LogMeta = #{msg => "PostgreSQL Prepare Statement", name => Key, prepare_sql => SQL}, + ?SLOG(info, LogMeta), + case epgsql:parse2(Conn, Key, SQL, []) of + {ok, Statement} -> + prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => Statement}); + {error, Error} = Other -> + ?SLOG(error, LogMeta#{msg => "PostgreSQL parse failed", error => Error}), + Other + end. + +to_bin(Bin) when is_binary(Bin) -> + Bin; +to_bin(Atom) when is_atom(Atom) -> + erlang:atom_to_binary(Atom). diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_pgsql.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_pgsql.conf new file mode 100644 index 000000000..0f80e1a1b --- /dev/null +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_pgsql.conf @@ -0,0 +1,74 @@ +emqx_ee_bridge_pgsql { + + local_topic { + desc { + en: """The MQTT topic filter to be forwarded to PostgreSQL. All MQTT 'PUBLISH' messages with the topic +matching the local_topic will be forwarded.
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is +configured, then both the data got from the rule and the MQTT messages that match local_topic +will be forwarded. +""" + zh: """发送到 'local_topic' 的消息都会转发到 PostgreSQL。
+注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。 +""" + } + label { + en: "Local Topic" + zh: "本地 Topic" + } + } + + sql_template { + desc { + en: """SQL Template""" + zh: """SQL 模板""" + } + label { + en: "SQL Template" + zh: "SQL 模板" + } + } + config_enable { + desc { + en: """Enable or disable this bridge""" + zh: """启用/禁用桥接""" + } + label { + en: "Enable Or Disable Bridge" + zh: "启用/禁用桥接" + } + } + + desc_config { + desc { + en: """Configuration for an PostgreSQL bridge.""" + zh: """PostgreSQL 桥接配置""" + } + label: { + en: "PostgreSQL Bridge Configuration" + zh: "PostgreSQL 桥接配置" + } + } + + desc_type { + desc { + en: """The Bridge Type""" + zh: """Bridge 类型""" + } + label { + en: "Bridge Type" + zh: "桥接类型" + } + } + + desc_name { + desc { + en: """Bridge name.""" + zh: """桥接名字""" + } + label { + en: "Bridge Name" + zh: "桥接名字" + } + } +} diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index 2e93a313c..d0099db1c 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -17,6 +17,7 @@ api_schemas(Method) -> ref(emqx_ee_bridge_gcp_pubsub, Method), ref(emqx_ee_bridge_kafka, Method), ref(emqx_ee_bridge_mysql, Method), + ref(emqx_ee_bridge_pgsql, Method), ref(emqx_ee_bridge_mongodb, Method ++ "_rs"), ref(emqx_ee_bridge_mongodb, Method ++ "_sharded"), ref(emqx_ee_bridge_mongodb, Method ++ "_single"), @@ -36,7 +37,8 @@ schema_modules() -> emqx_ee_bridge_influxdb, emqx_ee_bridge_mongodb, emqx_ee_bridge_mysql, - emqx_ee_bridge_redis + emqx_ee_bridge_redis, + emqx_ee_bridge_pgsql ]. examples(Method) -> @@ -63,7 +65,8 @@ resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb; resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb; resource_type(redis_single) -> emqx_ee_connector_redis; resource_type(redis_sentinel) -> emqx_ee_connector_redis; -resource_type(redis_cluster) -> emqx_ee_connector_redis. +resource_type(redis_cluster) -> emqx_ee_connector_redis; +resource_type(pgsql) -> emqx_connector_pgsql. fields(bridges) -> [ @@ -98,6 +101,14 @@ fields(bridges) -> desc => <<"MySQL Bridge Config">>, required => false } + )}, + {pgsql, + mk( + hoconsc:map(name, ref(emqx_ee_bridge_pgsql, "config")), + #{ + desc => <<"PostgreSQL Bridge Config">>, + required => false + } )} ] ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs(). diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl new file mode 100644 index 000000000..1f9a005c9 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl @@ -0,0 +1,130 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_bridge_pgsql). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + conn_bridge_examples/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +-define(DEFAULT_SQL, << + "insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) " + "values (${id}, ${topic}, ${qos}, ${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))" +>>). + +%% ------------------------------------------------------------------------------------------------- +%% api + +conn_bridge_examples(Method) -> + [ + #{ + <<"pgsql">> => #{ + summary => <<"PostgreSQL Bridge">>, + value => values(Method) + } + } + ]. + +values(get) -> + maps:merge(values(post), ?METRICS_EXAMPLE); +values(post) -> + #{ + enable => true, + type => pgsql, + name => <<"foo">>, + server => <<"127.0.0.1:5432">>, + database => <<"mqtt">>, + pool_size => 8, + username => <<"root">>, + password => <<"public">>, + auto_reconnect => true, + sql => ?DEFAULT_SQL, + local_topic => <<"local/topic/#">>, + resource_opts => #{ + worker_pool_size => 8, + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, + batch_size => ?DEFAULT_BATCH_SIZE, + batch_time => ?DEFAULT_BATCH_TIME, + query_mode => async, + max_queue_bytes => ?DEFAULT_QUEUE_SIZE + } + }; +values(put) -> + values(post). + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions +namespace() -> "bridge_pgsql". + +roots() -> []. + +fields("config") -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {sql, + mk( + binary(), + #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} + )}, + {local_topic, + mk( + binary(), + #{desc => ?DESC("local_topic"), default => undefined} + )}, + {resource_opts, + mk( + ref(?MODULE, "creation_opts"), + #{ + required => false, + default => #{}, + desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) + } + )} + ] ++ + emqx_connector_mysql:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields(); +fields("creation_opts") -> + Opts = emqx_resource_schema:fields("creation_opts"), + [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)]; +fields("post") -> + [type_field(), name_field() | fields("config")]; +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:metrics_status_fields() ++ fields("post"). + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for PostgreSQL using `", string:to_upper(Method), "` method."]; +desc("creation_opts" = Name) -> + emqx_resource_schema:desc(Name); +desc(_) -> + undefined. + +%% ------------------------------------------------------------------------------------------------- +%% internal +is_hidden_opts(Field) -> + lists:member(Field, [ + async_inflight_window + ]). + +type_field() -> + {type, mk(enum([mysql]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.