From 64c015cf6fe9a49e81334846b0d85f2085687eb8 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 23 Nov 2023 18:50:19 +0100 Subject: [PATCH] fix: unify the schema modules in emqx_bridge_pgsql --- .../src/emqx_bridge_matrix.erl | 20 +- .../src/emqx_bridge_pgsql.erl | 229 +++++++++++++----- .../src/emqx_bridge_pgsql_action_info.erl | 2 +- .../src/schema/emqx_bridge_pgsql_schema.erl | 172 ------------- .../src/emqx_bridge_timescale.erl | 20 +- .../emqx_postgresql_connector_schema.erl | 2 +- 6 files changed, 193 insertions(+), 252 deletions(-) delete mode 100644 apps/emqx_bridge_pgsql/src/schema/emqx_bridge_pgsql_schema.erl diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl index acfd86ded..78810bc9e 100644 --- a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl +++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl @@ -28,7 +28,7 @@ conn_bridge_examples(Method) -> #{ <<"matrix">> => #{ summary => <<"Matrix Bridge">>, - value => emqx_bridge_pgsql_schema:values_conn_bridge_examples(Method, matrix) + value => emqx_bridge_pgsql:values_conn_bridge_examples(Method, matrix) } } ]. @@ -42,28 +42,28 @@ roots() -> []. fields("post") -> emqx_bridge_pgsql:fields("post", matrix); fields("config_connector") -> - emqx_bridge_pgsql_schema:fields("config_connector"); + emqx_bridge_pgsql:fields("config_connector"); fields(action) -> {matrix, hoconsc:mk( - hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql_schema, pgsql_action)), + hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql, pgsql_action)), #{ desc => <<"Matrix Action Config">>, required => false } )}; fields("put_bridge_v2") -> - emqx_bridge_pgsql_schema:fields(pgsql_action); + emqx_bridge_pgsql:fields(pgsql_action); fields("get_bridge_v2") -> - emqx_bridge_pgsql_schema:fields(pgsql_action); + emqx_bridge_pgsql:fields(pgsql_action); fields("post_bridge_v2") -> - emqx_bridge_pgsql_schema:fields(pgsql_action); + emqx_bridge_pgsql:fields(pgsql_action); fields("put_connector") -> - emqx_bridge_pgsql_schema:fields("config_connector"); + emqx_bridge_pgsql:fields("config_connector"); fields("get_connector") -> - emqx_bridge_pgsql_schema:fields("config_connector"); + emqx_bridge_pgsql:fields("config_connector"); fields("post_connector") -> - emqx_bridge_pgsql_schema:fields("config_connector"); + emqx_bridge_pgsql:fields("config_connector"); fields(Method) -> emqx_bridge_pgsql:fields(Method). @@ -87,7 +87,7 @@ bridge_v2_examples(Method) -> #{ <<"matrix">> => #{ summary => <<"Matrix Action">>, - value => emqx_bridge_pgsql_schema:values({Method, matrix}) + value => emqx_bridge_pgsql:values({Method, matrix}) } } ]. diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl index bb15dfad9..fc0680aba 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl @@ -1,83 +1,91 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- + -module(emqx_bridge_pgsql). +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("emqx_postgresql/include/emqx_postgresql.hrl"). -include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). -include_lib("hocon/include/hoconsc.hrl"). +-include_lib("epgsql/include/epgsql.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). --import(hoconsc, [mk/2, enum/1, ref/2]). - --export([ - conn_bridge_examples/1, - values/2, - fields/2 -]). - -export([ namespace/0, roots/0, fields/1, - desc/1 + desc/1, + values/2, + fields/2 ]). --define(DEFAULT_SQL, << - "insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) " - "values (${id}, ${topic}, ${qos}, ${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))" ->>). +%% Examples +-export([ + bridge_v2_examples/1, + conn_bridge_examples/1 +]). -%% ------------------------------------------------------------------------------------------------- -%% api +%% Exported for timescale and matrix bridges +-export([ + values/1, + values_conn_bridge_examples/2 +]). -conn_bridge_examples(Method) -> - [ - #{ - <<"pgsql">> => #{ - summary => <<"PostgreSQL Bridge">>, - value => values(Method, pgsql) - } - } - ]. +-define(PGSQL_HOST_OPTIONS, #{ + default_port => ?PGSQL_DEFAULT_PORT +}). -values(_Method, Type) -> - #{ - enable => true, - type => Type, - name => <<"foo">>, - server => <<"127.0.0.1:5432">>, - database => <<"mqtt">>, - pool_size => 8, - username => <<"root">>, - password => <<"******">>, - sql => ?DEFAULT_SQL, - local_topic => <<"local/topic/#">>, - resource_opts => #{ - worker_pool_size => 8, - health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - batch_size => ?DEFAULT_BATCH_SIZE, - batch_time => ?DEFAULT_BATCH_TIME, - query_mode => async, - max_buffer_bytes => ?DEFAULT_BUFFER_BYTES - } - }. - -%% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions namespace() -> "bridge_pgsql". -roots() -> []. +roots() -> + []. +fields("config_connector") -> + emqx_postgresql_connector_schema:fields("config_connector"); +fields(config) -> + fields("config_connector") ++ + fields(action); +fields(action) -> + {pgsql, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql, pgsql_action)), + #{ + desc => <<"PostgreSQL Action Config">>, + required => false + } + )}; +fields(action_parameters) -> + [ + {sql, + hoconsc:mk( + binary(), + #{desc => ?DESC("sql_template"), default => default_sql(), format => <<"sql">>} + )} + ] ++ + emqx_connector_schema_lib:prepare_statement_fields(); +fields(pgsql_action) -> + emqx_bridge_v2_schema:make_producer_action_schema(hoconsc:ref(?MODULE, action_parameters)); +%% TODO: All of these needs to be fixed +fields("put_bridge_v2") -> + fields(pgsql_action); +fields("get_bridge_v2") -> + fields(pgsql_action); +fields("post_bridge_v2") -> + fields(pgsql_action); fields("config") -> [ - {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {enable, hoconsc:mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {sql, - mk( + hoconsc:mk( binary(), - #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} + #{desc => ?DESC("sql_template"), default => default_sql(), format => <<"sql">>} )}, {local_topic, - mk( + hoconsc:mk( binary(), #{desc => ?DESC("local_topic"), default => undefined} )} @@ -94,6 +102,12 @@ fields("get") -> fields("post", Type) -> [type_field(Type), name_field() | fields("config")]. +type_field(Type) -> + {type, hoconsc:mk(hoconsc:enum([Type]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, hoconsc:mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. + desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> @@ -101,10 +115,109 @@ desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(_) -> undefined. -%% ------------------------------------------------------------------------------------------------- +default_sql() -> + << + "insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) " + "values (${id}, ${topic}, ${qos}, ${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))" + >>. -type_field(Type) -> - {type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}. +%% Examples -name_field() -> - {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. +bridge_v2_examples(Method) -> + [ + #{ + <<"pgsql">> => #{ + summary => <<"PostgreSQL Producer Action">>, + value => values({Method, bridge_v2_producer}) + } + } + ]. + +conn_bridge_examples(Method) -> + [ + #{ + <<"pgsql">> => #{ + summary => <<"PostgreSQL Producer Bridge">>, + value => values_conn_bridge_examples(Method, pgsql) + } + } + ]. + +values({get, PostgreSQLType}) -> + maps:merge( + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + }, + values({post, PostgreSQLType}) + ); +values({post, PostgreSQLType}) -> + maps:merge( + #{ + name => <<"my_pgsql_action">>, + type => PostgreSQLType + }, + values({put, PostgreSQLType}) + ); +values({put, PostgreSQLType}) -> + maps:merge( + #{ + enable => true, + connector => <<"my_pgsql_connector">>, + resource_opts => #{ + health_check_interval => "32s" + } + }, + values({producer, PostgreSQLType}) + ); +values({producer, _PostgreSQLType}) -> + #{ + <<"enable">> => true, + <<"connector">> => <<"connector_pgsql_test">>, + <<"parameters">> => #{ + <<"sql">> => + <<"INSERT INTO client_events(clientid, event, created_at) VALUES (\n ${clientid},\n ${event},\n TO_TIMESTAMP((${timestamp} :: bigint))\n)">> + }, + <<"resource_opts">> => #{ + <<"batch_size">> => 1, + <<"batch_time">> => <<"0ms">>, + <<"health_check_interval">> => <<"15s">>, + <<"inflight_window">> => 100, + <<"max_buffer_bytes">> => <<"256MB">>, + <<"query_mode">> => <<"async">>, + <<"request_ttl">> => <<"45s">>, + <<"start_after_created">> => true, + <<"start_timeout">> => <<"5s">>, + <<"worker_pool_size">> => 16 + } + }. + +values_conn_bridge_examples(_Method, Type) -> + #{ + enable => true, + type => Type, + name => <<"foo">>, + server => <<"127.0.0.1:5432">>, + database => <<"mqtt">>, + pool_size => 8, + username => <<"root">>, + password => <<"******">>, + sql => default_sql(), + local_topic => <<"local/topic/#">>, + resource_opts => #{ + worker_pool_size => 8, + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + batch_size => ?DEFAULT_BATCH_SIZE, + batch_time => ?DEFAULT_BATCH_TIME, + query_mode => async, + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES + } + }. + +values(Method, Type) -> + values_conn_bridge_examples(Method, Type). diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl index e353eb440..c702b396b 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl @@ -19,4 +19,4 @@ action_type_name() -> pgsql. connector_type_name() -> pgsql. -schema_module() -> emqx_bridge_pgsql_schema. +schema_module() -> emqx_bridge_pgsql. diff --git a/apps/emqx_bridge_pgsql/src/schema/emqx_bridge_pgsql_schema.erl b/apps/emqx_bridge_pgsql/src/schema/emqx_bridge_pgsql_schema.erl deleted file mode 100644 index a3ad4bb11..000000000 --- a/apps/emqx_bridge_pgsql/src/schema/emqx_bridge_pgsql_schema.erl +++ /dev/null @@ -1,172 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - --module(emqx_bridge_pgsql_schema). - --include_lib("emqx_connector/include/emqx_connector.hrl"). --include_lib("emqx_postgresql/include/emqx_postgresql.hrl"). --include_lib("typerefl/include/types.hrl"). --include_lib("emqx/include/logger.hrl"). --include_lib("hocon/include/hoconsc.hrl"). --include_lib("epgsql/include/epgsql.hrl"). --include_lib("snabbkaffe/include/snabbkaffe.hrl"). --include_lib("emqx_resource/include/emqx_resource.hrl"). - --export([roots/0, fields/1]). - -%% Examples --export([ - bridge_v2_examples/1, - conn_bridge_examples/1 -]). - -%% Exported for timescale and matrix bridges --export([ - values/1, - values_conn_bridge_examples/2 -]). - --define(PGSQL_HOST_OPTIONS, #{ - default_port => ?PGSQL_DEFAULT_PORT -}). - -roots() -> - [{config, #{type => hoconsc:ref(?MODULE, config)}}]. - -fields("config_connector") -> - emqx_postgresql_connector_schema:fields("config_connector"); -fields(config) -> - fields("config_connector") ++ - fields(action); -fields(action) -> - {pgsql, - hoconsc:mk( - hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql_schema, pgsql_action)), - #{ - desc => <<"PostgreSQL Action Config">>, - required => false - } - )}; -fields(action_parameters) -> - [ - {sql, - hoconsc:mk( - binary(), - #{desc => ?DESC("sql_template"), default => default_sql(), format => <<"sql">>} - )} - ] ++ - emqx_connector_schema_lib:prepare_statement_fields(); -fields(pgsql_action) -> - emqx_bridge_v2_schema:make_producer_action_schema(hoconsc:ref(?MODULE, action_parameters)); -%% TODO: All of these needs to be fixed -fields("put_bridge_v2") -> - fields(pgsql_action); -fields("get_bridge_v2") -> - fields(pgsql_action); -fields("post_bridge_v2") -> - fields(pgsql_action). - -default_sql() -> - << - "insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) " - "values (${id}, ${topic}, ${qos}, ${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))" - >>. - -%% Examples - -bridge_v2_examples(Method) -> - [ - #{ - <<"pgsql">> => #{ - summary => <<"PostgreSQL Producer Action">>, - value => values({Method, bridge_v2_producer}) - } - } - ]. - -conn_bridge_examples(Method) -> - [ - #{ - <<"pgsql">> => #{ - summary => <<"PostgreSQL Producer Bridge">>, - value => values_conn_bridge_examples(Method, pgsql) - } - } - ]. - -values({get, PostgreSQLType}) -> - maps:merge( - #{ - status => <<"connected">>, - node_status => [ - #{ - node => <<"emqx@localhost">>, - status => <<"connected">> - } - ] - }, - values({post, PostgreSQLType}) - ); -values({post, PostgreSQLType}) -> - maps:merge( - #{ - name => <<"my_pgsql_action">>, - type => PostgreSQLType - }, - values({put, PostgreSQLType}) - ); -values({put, PostgreSQLType}) -> - maps:merge( - #{ - enable => true, - connector => <<"my_pgsql_connector">>, - resource_opts => #{ - health_check_interval => "32s" - } - }, - values({producer, PostgreSQLType}) - ); -values({producer, _PostgreSQLType}) -> - #{ - <<"enable">> => true, - <<"connector">> => <<"connector_pgsql_test">>, - <<"parameters">> => #{ - <<"sql">> => - <<"INSERT INTO client_events(clientid, event, created_at) VALUES (\n ${clientid},\n ${event},\n TO_TIMESTAMP((${timestamp} :: bigint))\n)">> - }, - <<"resource_opts">> => #{ - <<"batch_size">> => 1, - <<"batch_time">> => <<"0ms">>, - <<"health_check_interval">> => <<"15s">>, - <<"inflight_window">> => 100, - <<"max_buffer_bytes">> => <<"256MB">>, - <<"query_mode">> => <<"async">>, - <<"request_ttl">> => <<"45s">>, - <<"start_after_created">> => true, - <<"start_timeout">> => <<"5s">>, - <<"worker_pool_size">> => 16 - } - }. - -values_conn_bridge_examples(_Method, Type) -> - #{ - enable => true, - type => Type, - name => <<"foo">>, - server => <<"127.0.0.1:5432">>, - database => <<"mqtt">>, - pool_size => 8, - username => <<"root">>, - password => <<"******">>, - sql => default_sql(), - local_topic => <<"local/topic/#">>, - resource_opts => #{ - worker_pool_size => 8, - health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - batch_size => ?DEFAULT_BATCH_SIZE, - batch_time => ?DEFAULT_BATCH_TIME, - query_mode => async, - max_buffer_bytes => ?DEFAULT_BUFFER_BYTES - } - }. diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl index edeef26d4..9cefabc15 100644 --- a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl +++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl @@ -28,7 +28,7 @@ conn_bridge_examples(Method) -> #{ <<"timescale">> => #{ summary => <<"Timescale Bridge">>, - value => emqx_bridge_pgsql_schema:values_conn_bridge_examples(Method, timescale) + value => emqx_bridge_pgsql:values_conn_bridge_examples(Method, timescale) } } ]. @@ -42,28 +42,28 @@ roots() -> []. fields("post") -> emqx_bridge_pgsql:fields("post", timescale); fields("config_connector") -> - emqx_bridge_pgsql_schema:fields("config_connector"); + emqx_bridge_pgsql:fields("config_connector"); fields(action) -> {timescale, hoconsc:mk( - hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql_schema, pgsql_action)), + hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql, pgsql_action)), #{ desc => <<"Timescale Action Config">>, required => false } )}; fields("put_bridge_v2") -> - emqx_bridge_pgsql_schema:fields(pgsql_action); + emqx_bridge_pgsql:fields(pgsql_action); fields("get_bridge_v2") -> - emqx_bridge_pgsql_schema:fields(pgsql_action); + emqx_bridge_pgsql:fields(pgsql_action); fields("post_bridge_v2") -> - emqx_bridge_pgsql_schema:fields(pgsql_action); + emqx_bridge_pgsql:fields(pgsql_action); fields("put_connector") -> - emqx_bridge_pgsql_schema:fields("config_connector"); + emqx_bridge_pgsql:fields("config_connector"); fields("get_connector") -> - emqx_bridge_pgsql_schema:fields("config_connector"); + emqx_bridge_pgsql:fields("config_connector"); fields("post_connector") -> - emqx_bridge_pgsql_schema:fields("config_connector"); + emqx_bridge_pgsql:fields("config_connector"); fields(Method) -> emqx_bridge_pgsql:fields(Method). @@ -87,7 +87,7 @@ bridge_v2_examples(Method) -> #{ <<"timescale">> => #{ summary => <<"Timescale Action">>, - value => emqx_bridge_pgsql_schema:values({Method, timescale}) + value => emqx_bridge_pgsql:values({Method, timescale}) } } ]. diff --git a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl index d709f87c3..1d6949856 100644 --- a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl +++ b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl @@ -47,7 +47,7 @@ fields(config) -> fields(action) -> {pgsql, hoconsc:mk( - hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql_schema, pgsql_action)), + hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql, pgsql_action)), #{ desc => <<"PostgreSQL Action Config">>, required => false